1
0
This commit is contained in:
Aiden
2026-05-27 21:37:50 +10:00
parent 21f0e455ee
commit 4364d0ed48
54 changed files with 30241 additions and 191 deletions

View File

@@ -154,9 +154,21 @@ def label_frame(frame: bytes) -> str:
bytes.fromhex("0000158000CF"): "known_call_button_active_report",
bytes.fromhex("00001500004F"): "known_call_button_inactive_report",
bytes.fromhex("0000078000DD"): "known_cam_power_button_report",
bytes.fromhex("000013000049"): "known_iris_mblack_link_clear_report_candidate",
bytes.fromhex("000013400009"): "known_iris_mblack_link_active_report_candidate",
bytes.fromhex("0000138000C9"): "known_selector_0013_bit15_report_candidate",
bytes.fromhex("000013C00089"): "known_selector_0013_bit15_plus_iris_mblack_link_report_candidate",
bytes.fromhex("00010F8000D4"): "known_shutter_onoff_bit7_report_candidate",
bytes.fromhex("00010F200074"): "known_shutter_onoff_bit6_report_candidate",
bytes.fromhex("00010F000054"): "known_shutter_onoff_clear_report_candidate",
bytes.fromhex("010013000048"): "queued_iris_mblack_link_clear_report_candidate",
bytes.fromhex("02001300004B"): "queued_iris_mblack_link_clear_report_candidate",
bytes.fromhex("010013400008"): "queued_iris_mblack_link_active_report_candidate",
bytes.fromhex("02001340000B"): "queued_iris_mblack_link_active_report_candidate",
bytes.fromhex("0100138000C8"): "queued_selector_0013_bit15_report_candidate",
bytes.fromhex("0200138000CB"): "queued_selector_0013_bit15_report_candidate",
bytes.fromhex("010013C00088"): "queued_selector_0013_bit15_plus_iris_mblack_link_report_candidate",
bytes.fromhex("020013C0008B"): "queued_selector_0013_bit15_plus_iris_mblack_link_report_candidate",
bytes.fromhex("01010F8000D5"): "queued_shutter_onoff_bit7_report_candidate",
bytes.fromhex("02010F8000D6"): "queued_shutter_onoff_bit7_report_candidate",
bytes.fromhex("01010F200075"): "queued_shutter_onoff_bit6_report_candidate",

View File

@@ -9,6 +9,7 @@ from typing import Any
from .formatting import h16
from .lcd_text import analyze_lcd_text
from .panel_selectors import panel_selector_semantics_payload
from .rom import Rom
from .serial_semantics import OBSERVED_TX_REPORT_OVERLAY
from .table_xrefs import analyze_table_xrefs
@@ -125,6 +126,7 @@ def analyze_ccu_seed_hints(payload: Mapping[str, Any], *, rom_path: Path | None
selector_hints = _selector_hints_from_tables(table_analysis)
_merge_special_selectors(selector_hints)
_merge_observed_reports(selector_hints)
_merge_panel_selector_semantics(selector_hints)
dispatch = _dispatch_table_summary(payload, rom_path)
for entry in dispatch.get("interesting_entries", []):
@@ -362,6 +364,30 @@ def _merge_observed_reports(hints: dict[int, JsonObject]) -> None:
hint["reasons"].append(f"observed RCP autonomous report frame(s): {frames}")
def _merge_panel_selector_semantics(hints: dict[int, JsonObject]) -> None:
for item in panel_selector_semantics_payload():
selector = int(item["selector"])
hint = hints.setdefault(selector, _new_selector_hint(selector))
hint["score"] += 4
hint["name"] = str(item.get("name") or hint["name"])
summary = str(item.get("summary") or "").strip()
if summary:
hint["reasons"].append(summary)
for effect in item.get("effects", []):
if not isinstance(effect, Mapping):
continue
name = effect.get("name") or "panel effect"
mask = effect.get("mask_hex") or "mask?"
when_set = effect.get("when_set") or "set"
hint["reasons"].append(f"{mask} {name}: {when_set}")
for meaning in item.get("value_meanings", []):
if not isinstance(meaning, Mapping):
continue
value = meaning.get("value")
if isinstance(value, int):
_add_seed_value(hint, value)
def _seed_plan(hints: Mapping[int, JsonObject]) -> JsonObject:
planned = [
(0x000, 0x8080, "selector zero active/connect candidate from emulator state search"),
@@ -595,3 +621,7 @@ __all__ = [
"selector_bytes",
"write_ccu_seed_hints",
]
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -105,6 +105,7 @@ INPUT_BYTES = [
KNOWN_REPORTS = {
0x0007: "CAM POWER",
0x0013: "IRIS/M.BLACK LINK",
0x0015: "CALL",
}

213
h8536/panel_selectors.py Normal file
View File

@@ -0,0 +1,213 @@
from __future__ import annotations
from copy import deepcopy
from typing import Any
JsonObject = dict[str, Any]
CURRENT_TABLE_BASE = 0xE800
PRIMARY_TABLE_BASE = 0xE000
SECONDARY_TABLE_BASE = 0xE400
PANEL_SELECTOR_SEMANTICS: tuple[JsonObject, ...] = (
{
"selector": 0x0013,
"selector_hex": "0x0013",
"name": "slave_and_iris_mblack_link_lamps",
"summary": (
"Selector 0x0013 is a two-bit lamp/status word. ROM dispatch H'2E06 "
"reads current table word H'E826 and fans bit 15 and bit 14 into panel latch RAM."
),
"state_machine": {
"name_candidate": "iris_mblack_link_closed_loop_state_candidate",
"summary": (
"Bench-proven closed loop: the RCP reports local IRIS/M.BLACK LINK intent, "
"the CCU ACKs selector 0x0013, then the CCU mirrors the accepted selector "
"state back with command 0. The mirrored state controls the next toggle direction."
),
"active_report_frame": "00 00 13 40 00 09",
"clear_report_frame": "00 00 13 00 00 49",
"ack_frame": "05 00 13 00 00 4C",
"active_mirror_frame": "00 00 13 40 00 09",
"clear_mirror_frame": "00 00 13 00 00 49",
"confidence": "bench-high",
},
"primary_word_address": PRIMARY_TABLE_BASE + 0x0013 * 2,
"primary_word_address_hex": "H'E026",
"current_word_address": CURRENT_TABLE_BASE + 0x0013 * 2,
"current_word_address_hex": "H'E826",
"dispatch_handler": "H'2E06",
"confidence": "high",
"evidence": [
"bench: 00 00 13 80 00 C9 lights far-right SLAVE lamp",
"bench: 00 00 13 40 00 09 lights IRIS/M.BLACK LINK lamp",
"ROM: H'2E06-H'2E32 tests H'E826 bits 15/14 and sets/clears F791/F713/F716 latch bits",
],
"effects": [
{
"bit": 15,
"mask": 0x8000,
"mask_hex": "0x8000",
"name": "SLAVE lamp",
"when_set": "sets F791.6 and F713.4",
"when_clear": "clears F791.6 and F713.4",
"ram_bits": ["F791.6", "F713.4"],
"handler_range": "H'2E06-H'2E1A",
"confidence": "high",
},
{
"bit": 14,
"mask": 0x4000,
"mask_hex": "0x4000",
"name": "IRIS/M.BLACK LINK lamp",
"when_set": "sets F791.5 and F716.7",
"when_clear": "clears F791.5 and F716.7",
"ram_bits": ["F791.5", "F716.7"],
"handler_range": "H'2E1E-H'2E32",
"confidence": "high",
},
],
"local_triggers": [
{
"kind": "panel_input_toggle",
"source": "F006.7 / F6DB.7",
"handler": "H'200E",
"name_candidate": "provisional_iris_mblack_link_button_toggle_report",
"summary": (
"When F6DB.7 is asserted and F731 <= 3, the ROM toggles current-table "
"bit 14 at H'E826 based on F791.5, then queues selector 0x0013 through loc_3E54."
),
"gate": "F731 <= 3",
"current_state_bit": "F791.5",
"active_value": 0x4000,
"active_value_hex": "0x4000",
"clear_value": 0x0000,
"clear_value_hex": "0x0000",
"writes": ["H'E826 bit14"],
"queue_call": "loc_3E54",
"confidence": "medium-high",
},
{
"kind": "local_helper_set_clear",
"source": "H'1FE8/H'1FFB",
"handler": "H'1FE8-H'200D",
"summary": "Adjacent local helpers set or clear current-table bit 15 at H'E826 and queue selector 0x0013.",
"writes": ["H'E826 bit15"],
"queue_call": "loc_3E54",
"confidence": "medium",
},
],
"value_meanings": [
{"value": 0x8000, "value_hex": "0x8000", "meaning": "SLAVE lamp on", "confidence": "high"},
{"value": 0x4000, "value_hex": "0x4000", "meaning": "IRIS/M.BLACK LINK lamp on", "confidence": "high"},
{"value": 0x0000, "value_hex": "0x0000", "meaning": "SLAVE and IRIS/M.BLACK LINK latch bits clear through H'2E06", "confidence": "high"},
],
},
{
"selector": 0x0024,
"selector_hex": "0x0024",
"name": "lcd_selector_button_lamp",
"summary": "Bench-visible LCD selector-button lamp lane.",
"primary_word_address": PRIMARY_TABLE_BASE + 0x0024 * 2,
"primary_word_address_hex": "H'E048",
"current_word_address": CURRENT_TABLE_BASE + 0x0024 * 2,
"current_word_address_hex": "H'E848",
"confidence": "bench-medium",
"value_meanings": [
{"value": 0x8000, "value_hex": "0x8000", "meaning": "LCD selector-button lamp visible", "confidence": "bench-medium"},
{"value": 0x0000, "value_hex": "0x0000", "meaning": "lamp remained visible at 0.5 s in isolation run", "confidence": "bench-low"},
],
},
{
"selector": 0x0082,
"selector_hex": "0x0082",
"name": "iris_readout_lane",
"summary": "Bench-visible IRIS seven-segment/display lane.",
"primary_word_address": PRIMARY_TABLE_BASE + 0x0082 * 2,
"primary_word_address_hex": "H'E104",
"current_word_address": CURRENT_TABLE_BASE + 0x0082 * 2,
"current_word_address_hex": "H'E904",
"confidence": "bench-high",
"value_meanings": [
{"value": 0x8000, "value_hex": "0x8000", "meaning": "IRIS display OP", "confidence": "bench-high"},
{"value": 0x4000, "value_hex": "0x4000", "meaning": "IRIS display 1.4", "confidence": "bench-high"},
{"value": 0x0000, "value_hex": "0x0000", "meaning": "IRIS display blank", "confidence": "bench-high"},
],
},
{
"selector": 0x0083,
"selector_hex": "0x0083",
"name": "combined_iris_shutter_master_gain_status_lane",
"summary": "Bench-visible combined status/readout lane; clear behavior appears latched or copied elsewhere.",
"primary_word_address": PRIMARY_TABLE_BASE + 0x0083 * 2,
"primary_word_address_hex": "H'E106",
"current_word_address": CURRENT_TABLE_BASE + 0x0083 * 2,
"current_word_address_hex": "H'E906",
"confidence": "bench-medium-high",
"value_meanings": [
{"value": 0x8000, "value_hex": "0x8000", "meaning": "IRIS AUTO, SHUTTER OFF, MASTER GAIN -3", "confidence": "bench-medium-high"},
{"value": 0x0000, "value_hex": "0x0000", "meaning": "same visible state remained at 0.5 s", "confidence": "bench-low"},
],
},
{
"selector": 0x0093,
"selector_hex": "0x0093",
"name": "white_balance_black_flare_mode_lane",
"summary": "Bench-visible white-balance and black/flare lamp lane.",
"primary_word_address": PRIMARY_TABLE_BASE + 0x0093 * 2,
"primary_word_address_hex": "H'E126",
"current_word_address": CURRENT_TABLE_BASE + 0x0093 * 2,
"current_word_address_hex": "H'E926",
"confidence": "bench-high",
"value_meanings": [
{"value": 0x8000, "value_hex": "0x8000", "meaning": "BLACK/FLARE MANUAL plus white-balance PRESET", "confidence": "bench-high"},
{"value": 0x4000, "value_hex": "0x4000", "meaning": "BLACK/FLARE MANUAL plus white-balance AUTO", "confidence": "bench-high"},
{"value": 0x2000, "value_hex": "0x2000", "meaning": "BLACK/FLARE MANUAL plus white-balance MANUAL", "confidence": "bench-high"},
{"value": 0x0000, "value_hex": "0x0000", "meaning": "BLACK/FLARE MANUAL plus white-balance MANUAL", "confidence": "bench-high"},
],
},
)
def panel_selector_semantics_payload() -> list[JsonObject]:
return deepcopy(list(PANEL_SELECTOR_SEMANTICS))
def known_panel_selector(selector: int) -> JsonObject | None:
normalized = selector & 0x01FF
for item in PANEL_SELECTOR_SEMANTICS:
if int(item["selector"]) == normalized:
return deepcopy(item)
return None
def selector_word_address(table_base: int, selector: int) -> int:
return (table_base + ((selector & 0x01FF) * 2)) & 0xFFFF
def describe_selector_value(selector: int, value: int) -> list[str]:
item = known_panel_selector(selector)
if item is None:
return []
normalized_value = value & 0xFFFF
lines: list[str] = []
for meaning in item.get("value_meanings", []):
if not isinstance(meaning, dict) or meaning.get("value") != normalized_value:
continue
lines.append(str(meaning.get("meaning") or "known panel selector value"))
for effect in item.get("effects", []):
if not isinstance(effect, dict) or not isinstance(effect.get("mask"), int):
continue
mask = int(effect["mask"]) & 0xFFFF
state = "set" if normalized_value & mask else "clear"
action = effect.get("when_set") if state == "set" else effect.get("when_clear")
name = str(effect.get("name") or f"bit {effect.get('bit', '?')}")
if action:
lines.append(f"{name}: bit {effect.get('bit', '?')} {state}; {action}")
else:
lines.append(f"{name}: bit {effect.get('bit', '?')} {state}")
return lines

View File

@@ -278,6 +278,7 @@ def _declarations(tx_candidate: JsonObject | None, rx_candidate: JsonObject | No
"typedef uint8_t u8;",
"typedef uint16_t u16;",
"",
"#define BIT(n) (1u << (n))",
"extern volatile u8 MEM8[0x10000];",
"",
f"#define {channel}_SCR MEM8[{_c_hex(scr)}]",
@@ -424,6 +425,7 @@ def _semantics_lines(
lines.extend(_command_effect_comment_lines(protocol.get("command_effects"), opts, prefix=" * "))
lines.extend(_response_schema_comment_lines(_schema_list(protocol), opts, prefix=" * "))
lines.extend(_table_map_comment_lines(_table_map_list(protocol), opts, prefix=" * "))
lines.extend(_panel_selector_comment_lines(protocol.get("panel_selector_semantics"), opts, prefix=" * "))
lines.extend(_state_variable_comment_lines(protocol.get("state_variable_candidates"), opts, prefix=" * "))
lines.extend(_retry_error_comment_lines(protocol.get("retry_error_model"), opts, prefix=" * "))
lines.extend(_gate_queue_comment_lines(protocol.get("gate_queue_model"), opts, prefix=" * "))
@@ -466,6 +468,8 @@ def _semantics_lines(
)
lines.extend(_gate_queue_predicate_function_lines(protocol.get("gate_queue_model")))
lines.extend(_timer_architecture_function_lines(protocol))
lines.extend(_panel_selector_function_lines(protocol.get("panel_selector_semantics")))
lines.extend(_panel_selector_provisional_function_lines(protocol.get("panel_selector_semantics")))
lines.extend(
[
"void sci1_process_candidate_protocol_command(void)",
@@ -474,6 +478,8 @@ def _semantics_lines(
" u16 logical_index = sci1_rx_candidate_logical_index();",
" u16 value = sci1_rx_candidate_value();",
"",
" sci1_candidate_panel_selector_annotation(logical_index, value);",
"",
],
)
lines.extend(_command_dispatch_switch_lines(commands, opts))
@@ -644,6 +650,70 @@ def _table_map_comment_lines(
return lines
def _panel_selector_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
selectors = _object_list(value)
if not selectors:
return []
lines = [f"{prefix}panel selector semantics:"]
for selector in selectors[:6]:
selector_hex = selector.get("selector_hex") or _selector_hex(selector.get("selector"))
name = selector.get("name") or "panel_selector"
current = selector.get("current_word_address_hex") or "current table"
dispatch = selector.get("dispatch_handler") or "dispatch unknown"
summary = _comment_text(str(selector.get("summary") or "bench/ROM selector annotation"))
lines.append(f"{prefix}- {selector_hex} {name}: {summary}")
lines.append(f"{prefix} current word: {current}; dispatch: {dispatch}")
for effect in _object_list(selector.get("effects"))[:4]:
mask = effect.get("mask_hex") or _selector_hex(effect.get("mask"))
effect_name = effect.get("name") or "effect"
when_set = _comment_text(str(effect.get("when_set") or "set"))
bits = ", ".join(str(item) for item in effect.get("ram_bits", []))
suffix = f"; RAM {bits}" if bits else ""
lines.append(f"{prefix} {mask} -> {effect_name}: {when_set}{suffix}")
meanings = []
for meaning in _object_list(selector.get("value_meanings"))[:4]:
value_hex = meaning.get("value_hex") or _selector_hex(meaning.get("value"))
label = _comment_text(str(meaning.get("meaning") or "known panel value"))
meanings.append(f"{value_hex} {label}")
if meanings:
lines.append(f"{prefix} observed values: {'; '.join(meanings)}")
state_machine = selector.get("state_machine")
if isinstance(state_machine, dict):
name_candidate = state_machine.get("name_candidate") or "selector_state_machine_candidate"
summary = _comment_text(str(state_machine.get("summary") or "bench-proven selector state-machine candidate"))
lines.append(f"{prefix} state machine: {name_candidate}: {summary}")
active = state_machine.get("active_report_frame")
clear = state_machine.get("clear_report_frame")
ack = state_machine.get("ack_frame")
mirror_active = state_machine.get("active_mirror_frame")
mirror_clear = state_machine.get("clear_mirror_frame")
if active or clear or ack:
lines.append(
f"{prefix} frames: active report {active or '?'}; clear report {clear or '?'}; "
f"ACK {ack or '?'}; mirror active {mirror_active or '?'}; mirror clear {mirror_clear or '?'}"
)
triggers = []
for trigger in _object_list(selector.get("local_triggers"))[:3]:
source = trigger.get("source") or trigger.get("handler") or "local path"
summary = _comment_text(str(trigger.get("summary") or "local trigger candidate"))
name_candidate = trigger.get("name_candidate")
prefix_text = f"{name_candidate} " if name_candidate else ""
triggers.append(f"{prefix_text}{source}: {summary}")
if triggers:
lines.append(f"{prefix} local trigger candidates: {'; '.join(triggers)}")
evidence = ", ".join(str(item) for item in selector.get("evidence", []) if item)
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {_comment_text(evidence)}")
if len(selectors) > 6:
lines.append(f"{prefix}- ... {len(selectors) - 6} more panel selector annotations")
return lines
def _state_variable_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
@@ -955,6 +1025,121 @@ def _timer_architecture_function_lines(protocol: JsonObject) -> list[str]:
)
def _panel_selector_function_lines(value: object) -> list[str]:
selectors = _object_list(value)
if not selectors:
return [
"static void sci1_candidate_panel_selector_annotation(u16 logical_index, u16 value)",
"{",
" (void)logical_index;",
" (void)value;",
"}",
"",
]
lines = [
"static void sci1_candidate_panel_selector_annotation(u16 logical_index, u16 value)",
"{",
" /* Known bench/ROM selector labels. This helper is commentary for the decompile. */",
" switch (logical_index) {",
]
for selector in selectors:
selector_value = selector.get("selector")
if not isinstance(selector_value, int):
continue
selector_hex = selector.get("selector_hex") or f"0x{selector_value:04X}"
name = _comment_text(str(selector.get("name") or "panel selector"))
current = selector.get("current_word_address_hex") or "current table"
dispatch = selector.get("dispatch_handler") or "dispatch unknown"
lines.append(f" case 0x{selector_value & 0x01FF:04X}u:")
lines.append(f" /* {selector_hex} {name}; current word {current}; {dispatch}. */")
for effect in _object_list(selector.get("effects")):
mask = effect.get("mask")
if not isinstance(mask, int):
continue
effect_name = _comment_text(str(effect.get("name") or "panel effect"))
when_set = _comment_text(str(effect.get("when_set") or "set"))
when_clear = _comment_text(str(effect.get("when_clear") or "clear"))
lines.append(f" if ((value & 0x{mask & 0xFFFF:04X}u) != 0u) {{")
lines.append(f" /* {effect_name}: {when_set}. */")
lines.append(" } else {")
lines.append(f" /* {effect_name}: {when_clear}. */")
lines.append(" }")
for meaning in _object_list(selector.get("value_meanings")):
known_value = meaning.get("value")
if not isinstance(known_value, int):
continue
label = _comment_text(str(meaning.get("meaning") or "known panel value"))
lines.append(f" if (value == 0x{known_value & 0xFFFF:04X}u) {{")
lines.append(f" /* {label}. */")
lines.append(" }")
lines.append(" break;")
lines.extend(
[
" default:",
" break;",
" }",
"}",
"",
],
)
return lines
def _panel_selector_provisional_function_lines(value: object) -> list[str]:
selectors = _object_list(value)
lines: list[str] = []
for selector in selectors:
selector_value = selector.get("selector")
if not isinstance(selector_value, int):
continue
state_machine = selector.get("state_machine")
if not isinstance(state_machine, dict):
continue
for trigger in _object_list(selector.get("local_triggers")):
name = str(trigger.get("name_candidate") or "").strip()
if not name:
continue
handler = _comment_text(str(trigger.get("handler") or "handler unknown"))
source = _comment_text(str(trigger.get("source") or "source unknown"))
gate = _comment_text(str(trigger.get("gate") or "gate unknown"))
current_bit = _comment_text(str(trigger.get("current_state_bit") or "current state bit unknown"))
summary = _comment_text(str(trigger.get("summary") or "local trigger candidate"))
active_value = _int_from_object(trigger.get("active_value"), 0x4000)
clear_value = _int_from_object(trigger.get("clear_value"), 0x0000)
active_report = _comment_text(str(state_machine.get("active_report_frame") or "active report unknown"))
clear_report = _comment_text(str(state_machine.get("clear_report_frame") or "clear report unknown"))
ack_frame = _comment_text(str(state_machine.get("ack_frame") or "ACK unknown"))
active_mirror = _comment_text(str(state_machine.get("active_mirror_frame") or "active mirror unknown"))
clear_mirror = _comment_text(str(state_machine.get("clear_mirror_frame") or "clear mirror unknown"))
safe_name = _safe_identifier(name)
lines.extend(
[
f"void {safe_name}(void)",
"{",
f" /* Provisional name for ROM {handler}: {summary} */",
f" /* Source {source}; gate {gate}; current state {current_bit}. */",
" if ((MEM8[0xF6DBu] & BIT(7)) == 0u) {",
" return;",
" }",
" if (MEM8[0xF731u] > 3u) {",
" return;",
" }",
"",
" if ((MEM8[0xF791u] & BIT(5)) == 0u) {",
f" /* Requests selector 0x{selector_value & 0x01FF:04X}=0x{active_value & 0xFFFF:04X}: {active_report}. */",
f" /* CCU should ACK {ack_frame}, then mirror {active_mirror}. */",
" } else {",
f" /* Requests selector 0x{selector_value & 0x01FF:04X}=0x{clear_value & 0xFFFF:04X}: {clear_report}. */",
f" /* CCU should ACK {ack_frame}, then mirror {clear_mirror}. */",
" }",
"}",
"",
],
)
return lines
def _timer_tick_function_lines(function_name: str, counters: list[JsonObject], summary: str) -> list[str]:
lines = [
f"void {function_name}(void)",
@@ -1130,6 +1315,16 @@ def _command_hex(value: object) -> str:
return "?"
def _selector_hex(value: object) -> str:
if isinstance(value, int):
return f"0x{value & 0xFFFF:04X}"
return "?"
def _int_from_object(value: object, default: int) -> int:
return value if isinstance(value, int) else default
def _tx_functions(candidate: JsonObject, opts: SerialPseudocodeOptions) -> list[str]:
length = _int_field(candidate, "frame_length", 6)
seed = _int_field(candidate, "checksum_seed", 0x5A)
@@ -1386,3 +1581,7 @@ def _safe_identifier(value: str) -> str:
def _comment_text(text: str) -> str:
return text.replace("*/", "* /").replace("\r", " ").replace("\n", " ")
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -270,6 +270,8 @@ def _run_step(ctx: ScenarioContext, action: str, spec: dict[str, Any]) -> None:
_listen(ctx, float(spec.get("seconds", spec.get("value", 0.0))))
elif action == "listen_ack":
_step_listen_ack(ctx, spec)
elif action == "listen_ack_until_quiet":
_step_listen_ack_until_quiet(ctx, spec)
elif action == "send":
frame = _parse_required_frame(spec.get("frame"))
label = str(spec.get("label", "send"))
@@ -378,7 +380,38 @@ def _step_table_sweep(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
def _step_listen_ack(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
seconds = float(spec.get("seconds", spec.get("value", 1.0)))
ack = _ack_config(
ack = _ack_config_from_step(spec)
ack_text = (
f"ack_frame={format_frame(ack['frame'])}"
if ack["ack_mode"] == "fixed"
else f"ack_mode={ack['ack_mode']}"
)
ctx.logger.event(
f"LISTEN_ACK seconds={seconds:.3f} target_mode={ack['target_mode']} targets={len(ack['targets'])} "
f"{ack_text} limit_scope={ack['limit_scope']} max_acks={ack['max_acks']}"
)
_listen_with_ack(ctx, seconds, None, ack)
def _step_listen_ack_until_quiet(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
seconds = float(spec.get("seconds", spec.get("value", 10.0)))
quiet_seconds = float(spec.get("quiet_seconds", spec.get("quiet", 0.750)))
ack = _ack_config_from_step(spec)
ack_text = (
f"ack_frame={format_frame(ack['frame'])}"
if ack["ack_mode"] == "fixed"
else f"ack_mode={ack['ack_mode']}"
)
ctx.logger.event(
f"LISTEN_ACK_UNTIL_QUIET seconds={seconds:.3f} quiet={quiet_seconds:.3f} "
f"target_mode={ack['target_mode']} targets={len(ack['targets'])} "
f"{ack_text} limit_scope={ack['limit_scope']} max_acks={ack['max_acks']}"
)
_listen_with_ack(ctx, seconds, None, ack, quiet_seconds=quiet_seconds)
def _ack_config_from_step(spec: dict[str, Any]) -> dict[str, Any]:
return _ack_config(
{
"enabled": spec.get("enabled", True),
"frames": spec.get("frames", spec.get("frame")),
@@ -393,18 +426,9 @@ def _step_listen_ack(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
"ack_mode": spec.get("ack_mode", spec.get("mode", "fixed")),
"target_mode": spec.get("target_mode", spec.get("match", "explicit")),
"limit_scope": spec.get("limit_scope", spec.get("scope", "local")),
"respond_on": spec.get("respond_on", spec.get("send_on", [])),
}
)
ack_text = (
f"ack_frame={format_frame(ack['frame'])}"
if ack["ack_mode"] == "fixed"
else f"ack_mode={ack['ack_mode']}"
)
ctx.logger.event(
f"LISTEN_ACK seconds={seconds:.3f} target_mode={ack['target_mode']} targets={len(ack['targets'])} "
f"{ack_text} limit_scope={ack['limit_scope']} max_acks={ack['max_acks']}"
)
_listen_with_ack(ctx, seconds, None, ack)
def _ack_config(raw: Any) -> dict[str, Any]:
@@ -438,6 +462,7 @@ def _ack_config(raw: Any) -> dict[str, Any]:
"ack_mode": ack_mode,
"target_mode": target_mode,
"limit_scope": limit_scope,
"respond_on": _response_rules(spec.get("respond_on", [])),
}
@@ -458,28 +483,71 @@ def _ack_matches(frame: bytes, ack: dict[str, Any]) -> bool:
return frame[0] in {0x00, 0x01, 0x02} and not (frame[1] & 0x80)
def _response_rules(raw: Any) -> list[dict[str, Any]]:
values = raw if isinstance(raw, list) else ([raw] if raw else [])
rules: list[dict[str, Any]] = []
for index, value in enumerate(values):
if not isinstance(value, dict):
raise SystemExit("respond_on entries must be objects")
targets = _parse_frame_list(value.get("frames", value.get("frame")))
response_frame = _parse_required_frame(value.get("send", value.get("response")))
label = str(value.get("label", f"respond_on_{index + 1}"))
rules.append(
{
"targets": targets,
"frame": response_frame,
"label": label,
"delay": float(value.get("delay", 0.0)),
"listen": float(value.get("listen", 0.0)),
"once": bool(value.get("once", True)),
}
)
return rules
def _listen_with_ack(
ctx: ScenarioContext,
seconds: float,
selector: int,
selector: int | None,
ack: dict[str, Any],
*,
quiet_seconds: float | None = None,
) -> list[bytes]:
deadline = time.monotonic() + max(0.0, seconds)
observed: list[bytes] = []
pending: list[bytes] = []
pending_index = 0
last_activity = time.monotonic()
acked_targets: set[bytes] = set()
fired_responses: set[int] = set()
ack_start = ctx.ack_sent
target_start = sum(ctx.target_counts.values())
def enqueue(frames: list[bytes]) -> None:
nonlocal last_activity
if not frames:
return
observed.extend(frames)
pending.extend(frames)
last_activity = time.monotonic()
while time.monotonic() < deadline:
frames = _read_available(ctx, selector=selector)
observed.extend(frames)
if not frames:
enqueue(frames)
if not frames and pending_index >= len(pending):
if quiet_seconds is not None and time.monotonic() - last_activity >= quiet_seconds:
ctx.logger.event(f"LISTEN_ACK_QUIET quiet={quiet_seconds:.3f}s")
break
sleep_for = min(max(0.001, ack["poll_interval"]), max(0.0, deadline - time.monotonic()))
if sleep_for > 0:
time.sleep(sleep_for)
continue
if not ack["enabled"]:
pending_index = len(pending)
continue
for frame in frames:
while pending_index < len(pending):
frame = pending[pending_index]
pending_index += 1
if not _ack_matches(frame, ack):
continue
_count_target(ctx, frame)
@@ -493,7 +561,7 @@ def _listen_with_ack(
continue
acked_targets.add(frame)
if ack["guard"] > 0:
observed.extend(_listen(ctx, ack["guard"], selector=selector))
enqueue(_listen(ctx, ack["guard"], selector=selector))
_send_and_record(ctx, _ack_frame_for_target(frame, ack), "ack", capture=ctx.args.snapshot_acks)
ctx.ack_sent += 1
if _ack_limit_reached(ctx, ack, ack_start=ack_start, target_start=target_start):
@@ -501,7 +569,22 @@ def _listen_with_ack(
if ack["abort_on_limit"]:
ctx.abort_requested = True
if ack["post_read"] > 0:
observed.extend(_listen(ctx, ack["post_read"], selector=selector))
enqueue(_listen(ctx, ack["post_read"], selector=selector))
for rule_index, rule in enumerate(ack["respond_on"]):
if frame not in rule["targets"]:
continue
if rule["once"] and rule_index in fired_responses:
continue
fired_responses.add(rule_index)
if rule["delay"] > 0:
time.sleep(rule["delay"])
ctx.logger.event(
f"RESPOND_ON target={format_frame(frame)} "
f"send={format_frame(rule['frame'])} label={rule['label']}"
)
_send_and_record(ctx, rule["frame"], rule["label"], capture=ctx.args.snapshot_acks)
if rule["listen"] > 0:
enqueue(_listen(ctx, rule["listen"], selector=selector))
if ctx.abort_requested:
return observed
return observed
@@ -753,6 +836,7 @@ def _quiet_console_line(line: str) -> bool:
"NOTE ",
"SNAPSHOT_SCHEDULE ",
"SNAPSHOT_ERROR ",
"RESPOND_ON ",
"Summary",
"rx_frames=",
"resync_events=",
@@ -760,6 +844,8 @@ def _quiet_console_line(line: str) -> bool:
"abort_requested=",
"known_shutter",
"queued_shutter",
"iris_mblack",
"selector_0013",
)
return any(fragment in line for fragment in keep_fragments)
@@ -772,25 +858,14 @@ def _print_step_dry_run(action: str, spec: dict[str, Any], stdout: TextIO, *, in
print(f"{indent}listen={float(spec.get('listen', 0.0)):.3f}s", file=stdout)
elif action in {"drain", "listen", "wait"}:
print(f"{indent}seconds={float(spec.get('seconds', spec.get('value', 0.0))):.3f}", file=stdout)
elif action == "listen_ack":
ack = _ack_config(
{
"enabled": spec.get("enabled", True),
"frames": spec.get("frames", spec.get("frame")),
"ack_frame": spec.get("ack_frame"),
"ack_guard": spec.get("ack_guard", 0.020),
"poll_interval": spec.get("poll_interval", 0.005),
"post_ack_read": spec.get("post_ack_read", 0.250),
"once_per_selector": spec.get("once_per_frame", False),
"max_acks": spec.get("max_acks"),
"max_target_hits": spec.get("max_target_hits"),
"abort_on_limit": spec.get("abort_on_limit", False),
"ack_mode": spec.get("ack_mode", spec.get("mode", "fixed")),
"target_mode": spec.get("target_mode", spec.get("match", "explicit")),
"limit_scope": spec.get("limit_scope", spec.get("scope", "local")),
}
)
elif action in {"listen_ack", "listen_ack_until_quiet"}:
ack = _ack_config_from_step(spec)
print(f"{indent}seconds={float(spec.get('seconds', spec.get('value', 1.0))):.3f}", file=stdout)
if action == "listen_ack_until_quiet":
print(
f"{indent}quiet={float(spec.get('quiet_seconds', spec.get('quiet', 0.750))):.3f}s",
file=stdout,
)
if not ack["enabled"]:
print(f"{indent}ack=disabled", file=stdout)
else:
@@ -805,6 +880,13 @@ def _print_step_dry_run(action: str, spec: dict[str, Any], stdout: TextIO, *, in
f"max_target_hits={ack['max_target_hits']}",
file=stdout,
)
for rule in ack["respond_on"]:
print(
f"{indent}respond_on={len(rule['targets'])} "
f"send={format_frame(rule['frame'])} label={rule['label']} "
f"once={int(rule['once'])}",
file=stdout,
)
elif action in {"prompt", "note"}:
message = str(spec.get("message", spec.get("value", "Press Enter to continue.")))
print(f"{indent}message={message}", file=stdout)

View File

@@ -31,6 +31,18 @@ KNOWN_FRAME_LABELS = {
"00 00 15 80 00 CF": "known_call_button_active_report",
"00 00 15 00 00 4F": "known_call_button_inactive_report",
"00 00 07 80 00 DD": "known_cam_power_button_report",
"00 00 13 00 00 49": "known_iris_mblack_link_clear_report_candidate",
"00 00 13 40 00 09": "known_iris_mblack_link_active_report_candidate",
"00 00 13 80 00 C9": "known_selector_0013_bit15_report_candidate",
"00 00 13 C0 00 89": "known_selector_0013_bit15_plus_iris_mblack_link_report_candidate",
"01 00 13 00 00 48": "queued_iris_mblack_link_clear_report_candidate",
"02 00 13 00 00 4B": "queued_iris_mblack_link_clear_report_candidate",
"01 00 13 40 00 08": "queued_iris_mblack_link_active_report_candidate",
"02 00 13 40 00 0B": "queued_iris_mblack_link_active_report_candidate",
"01 00 13 80 00 C8": "queued_selector_0013_bit15_report_candidate",
"02 00 13 80 00 CB": "queued_selector_0013_bit15_report_candidate",
"01 00 13 C0 00 88": "queued_selector_0013_bit15_plus_iris_mblack_link_report_candidate",
"02 00 13 C0 00 8B": "queued_selector_0013_bit15_plus_iris_mblack_link_report_candidate",
"01 00 17 80 00 CC": "queued_bars_button_selector_0017_active_candidate",
"02 00 17 80 00 CF": "queued_bars_button_selector_0017_active_candidate",
"01 00 18 80 00 C3": "queued_bars_button_selector_0018_active_candidate",

View File

@@ -4,6 +4,8 @@ import re
from collections.abc import Iterable, Mapping
from typing import Any
from .panel_selectors import panel_selector_semantics_payload
JsonObject = dict[str, Any]
@@ -140,6 +142,7 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"tx_report_model": None,
"periodic_resend_model": None,
"timer_interrupt_model": None,
"panel_selector_semantics": [],
"confidence": "low",
"confidence_score": 0.0,
"caveat": "No protocol semantics are emitted without both RX and TX serial reconstruction candidates.",
@@ -212,6 +215,7 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"tx_report_model": tx_report_model,
"periodic_resend_model": periodic_resend_model,
"timer_interrupt_model": timer_interrupt_model,
"panel_selector_semantics": panel_selector_semantics_payload(),
"evidence": evidence,
}
return {
@@ -233,6 +237,7 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"tx_report_model": protocol["tx_report_model"],
"periodic_resend_model": protocol["periodic_resend_model"],
"timer_interrupt_model": protocol["timer_interrupt_model"],
"panel_selector_semantics": protocol["panel_selector_semantics"],
"confidence": protocol["confidence"],
"confidence_score": protocol["confidence_score"],
"caveat": protocol["caveat"],

View File

@@ -287,6 +287,7 @@ def _logical_operand_accesses(
logical_address: int | None = None
if isinstance(offset, int):
logical_address = (int(table["logical_base_address"]) + offset) & 0xFFFF
selector = _selector_for_table_offset(table, offset)
access = _base_access(ins, functions, semantic_accesses)
access.update(
{
@@ -306,6 +307,9 @@ def _logical_operand_accesses(
if logical_address is not None:
access["logical_address"] = logical_address
access["logical_address_hex"] = h16(logical_address)
if selector is not None:
access["selector"] = selector
access["selector_hex"] = f"0x{selector:03X}"
accesses.append(access)
return accesses
@@ -342,6 +346,7 @@ def _direct_logical_address_access(
) -> JsonObject:
base = int(table["logical_base_address"])
offset = address - base
selector = _selector_for_table_offset(table, offset)
access = _base_access(ins, functions, semantic_accesses)
access.update(
{
@@ -359,6 +364,9 @@ def _direct_logical_address_access(
"access": _access_direction(ins, address) or "read_write_candidate",
}
)
if selector is not None:
access["selector"] = selector
access["selector_hex"] = f"0x{selector:03X}"
return access
@@ -373,6 +381,7 @@ def _direct_candidate_address_access(
offset = address - base
access = _base_access(ins, functions, semantic_accesses)
logical_offset = DIRECT_TABLE_TO_LOGICAL_OFFSET.get(base)
selector = _selector_for_table_offset(table, offset)
access.update(
{
"table": table["name"],
@@ -392,6 +401,9 @@ def _direct_candidate_address_access(
if logical_offset is not None:
access["semantic_negative_offset"] = logical_offset
access["semantic_negative_offset_hex"] = h16(logical_offset)
if selector is not None:
access["selector"] = selector
access["selector_hex"] = f"0x{selector:03X}"
return access
@@ -564,6 +576,8 @@ def _format_access_line(access: Mapping[str, Any]) -> str:
index_text = f"index dynamic via {access.get('index_register')} operand {operand}"
else:
index_text = f"offset {h16(int(index or 0))}"
if access.get("selector_hex"):
index_text += f" selector {access['selector_hex']}"
if access.get("logical_address_hex"):
index_text += f" -> {access['logical_address_hex']}"
elif access.get("direct_address_hex"):
@@ -599,6 +613,19 @@ def _summarize_functions(accesses: Iterable[Mapping[str, Any]]) -> list[JsonObje
return sorted(summaries.values(), key=lambda item: (-int(item["access_count"]), str(item["label"])))
def _selector_for_table_offset(table: Mapping[str, Any], offset: int | str) -> int | None:
if not isinstance(offset, int):
return None
element = str(table.get("element_candidate") or "")
if element == "word_value":
if offset % 2:
return None
return (offset // 2) & 0x01FF
if element == "bit_flags":
return offset & 0x01FF
return None
def _function_ranges(payload: Mapping[str, Any]) -> list[JsonObject]:
call_graph = payload.get("call_graph")
if not isinstance(call_graph, Mapping):