from __future__ import annotations import re from collections.abc import Iterable, Mapping from typing import Any JsonObject = dict[str, Any] RX_FRAME_START = 0xF860 RX_FRAME_END = 0xF865 RX_CHECKSUM_ADDRESS = RX_FRAME_END RX_FRAME_LENGTH = 6 TX_STAGING_START = 0xF850 TX_STAGING_END = 0xF854 TX_STAGING_LENGTH = 5 TX_FRAME_START = 0xF858 TX_FRAME_END = 0xF85D TX_CHECKSUM_ADDRESS = TX_FRAME_END SEND_BUILDER_ADDRESS = 0xBA26 SEND_BUILDER_LABEL = "loc_BA26" AUTONOMOUS_TX_REPORT_CALL = 0xBB43 AUTONOMOUS_TX_REPORT_LABEL = "loc_BB43" MAIN_REPORT_GATE_ENTRY = 0x3FD3 MAIN_REPORT_GATE_CALL = 0x3FEB SESSION_GATE_ENTRY = 0x3FEF QUEUE_REPORT_ENTRY = 0xBAF2 RESEND_GATE_ENTRY = 0xBE9E PERIODIC_RESEND_ENTRY = 0xBED5 INDEX_DECODER_ADDRESS = 0x622B INDEX_DECODER_LABEL = "loc_622B" CHECKSUM_SEED = 0x5A SERIAL_HANDLER_START = 0xBA26 SERIAL_HANDLER_END = 0xBEFF LOGICAL_TABLES = { 0x2000: { "logical_base_address": 0xE000, "name_candidate": "primary_value_table_candidate", "element_candidate": "word_value", }, 0x1C00: { "logical_base_address": 0xE400, "name_candidate": "secondary_value_table_candidate", "element_candidate": "word_value", }, 0x1800: { "logical_base_address": 0xE800, "name_candidate": "current_value_table_candidate", "element_candidate": "word_value", }, 0x1400: { "logical_base_address": 0xEC00, "name_candidate": "flag_table_candidate", "element_candidate": "bit_flags", }, } DIRECT_TABLE_CANDIDATES = { 0xF900: "primary_value_table_candidate", 0xF920: "current_value_table_candidate", 0xF940: "secondary_value_table_candidate", 0xF980: "flag_table_candidate", } DIRECT_TABLE_TO_LOGICAL_OFFSET = { 0xF900: 0x2000, 0xF920: 0x1800, 0xF940: 0x1C00, 0xF980: 0x1400, } STATE_VARIABLES = { 0xFAA2: "serial_session_flags_candidate", 0xFAA3: "serial_pending_mask_candidate", 0xFAA4: "serial_rx_error_or_retry_gate_candidate", 0xFAA5: "serial_retry_enable_or_mode_flags_candidate", 0xFAA6: "serial_retry_counter_candidate", 0xF9B4: "event_queue_read_cursor_candidate", 0xF9B5: "event_queue_write_or_pending_cursor_candidate", 0xF9B9: "event_queue_base_or_current_slot_candidate", 0xF9C0: "serial_tx_busy_timer_candidate", 0xF9C6: "autonomous_report_period_timer_candidate", 0xF9C8: "autonomous_report_resend_countdown_candidate", } OBSERVED_TX_REPORT_OVERLAY = [ { "logical_index": 0x0000, "name_candidate": "heartbeat_or_idle_report_candidate", "observed_frames_hex": ["00 00 00 00 80 DA"], "observed_period_ms_candidate": 700, }, { "logical_index": 0x0015, "name_candidate": "call_button_report_candidate", "observed_frames_hex": ["00 00 15 80 00 CF", "00 00 15 00 00 4F"], }, { "logical_index": 0x0007, "name_candidate": "camera_power_report_candidate", "observed_frames_hex": ["00 00 07 80 00 DD"], }, ] def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject: """Infer conservative SCI1 frame/command semantics from decompiler JSON.""" ordered = _instruction_sequence(payload.get("instructions")) reconstruction = _serial_reconstruction(payload) rx_candidate = _candidate_by_kind(reconstruction, "candidate_sci1_rx_frame") tx_candidate = _candidate_by_kind(reconstruction, "candidate_sci1_tx_frame") frame_supported = bool(rx_candidate and tx_candidate) if not frame_supported: return { "kind": "serial_semantics", "protocol_semantics": [], "fields": [], "command_dispatch": None, "commands": [], "command_effects": [], "response_candidates": [], "response_schemas": [], "response_schema": [], "logical_table_map_candidates": [], "table_map_candidates": [], "state_variable_candidates": [], "retry_error_model": None, "gate_queue_model": None, "tx_report_model": None, "periodic_resend_model": None, "confidence": "low", "confidence_score": 0.0, "caveat": "No protocol semantics are emitted without both RX and TX serial reconstruction candidates.", } dispatch = _find_command_dispatch(ordered) responses = _response_candidates(ordered) commands = _command_candidates(ordered, dispatch, responses) fields = _field_candidates(ordered, dispatch, responses) send_builder = _send_builder_candidate(ordered, responses, tx_candidate) logical_tables = _logical_table_map_candidates(ordered) state_variables = _state_variable_candidates(ordered) retry_error_model = _retry_error_model(ordered, responses) gate_queue_model = _gate_queue_model(ordered, commands) tx_report_model = _tx_report_model(ordered, responses) periodic_resend_model = _periodic_resend_model(ordered, responses) evidence = _top_level_evidence(ordered, dispatch, responses, rx_candidate, tx_candidate) confidence_score = _confidence_score(frame_supported, dispatch, responses, commands) protocol = { "kind": "serial_semantics", "scope": "evidence_supported_sci1_6_byte_frame", "confidence": _confidence_label(confidence_score), "confidence_score": confidence_score, "caveat": ( "Semantic names are candidates only. The analyzer reports byte roles, command values, " "dispatch targets, and response staging patterns observed in code; it does not prove " "source-level intent or protocol documentation." ), "frame_candidate": { "channel": "SCI1", "rx_frame_start": RX_FRAME_START, "rx_frame_start_hex": _h16(RX_FRAME_START), "rx_frame_end": RX_FRAME_END, "rx_frame_end_hex": _h16(RX_FRAME_END), "tx_staging_start": TX_STAGING_START, "tx_staging_start_hex": _h16(TX_STAGING_START), "tx_staging_end": TX_STAGING_END, "tx_staging_end_hex": _h16(TX_STAGING_END), "tx_frame_start": TX_FRAME_START, "tx_frame_start_hex": _h16(TX_FRAME_START), "tx_frame_end": TX_FRAME_END, "tx_frame_end_hex": _h16(TX_FRAME_END), "frame_length": RX_FRAME_LENGTH, "tx_staging_length": TX_STAGING_LENGTH, "checksum_seed": CHECKSUM_SEED, "checksum_seed_hex": _h16(CHECKSUM_SEED), "serial_reconstruction_supported": frame_supported, "rx_reconstruction_candidate_id": rx_candidate.get("id") if rx_candidate else None, "tx_reconstruction_candidate_id": tx_candidate.get("id") if tx_candidate else None, }, "byte_layout": _byte_layout(), "fields": fields, "command_dispatch": dispatch, "commands": commands, "command_effects": _command_effect_aliases(commands), "index_decoder": _index_decoder_candidate(ordered), "logical_table_map_candidates": logical_tables, "table_map_candidates": logical_tables, "state_variable_candidates": state_variables, "send_builder": send_builder, "response_candidates": responses, "response_schemas": _response_schemas(responses), "response_schema": _response_schemas(responses), "rx_fields": _rx_field_candidates(ordered, dispatch), "response_builders": _response_builder_aliases(responses), "retry_error_model": retry_error_model, "gate_queue_model": gate_queue_model, "tx_report_model": tx_report_model, "periodic_resend_model": periodic_resend_model, "evidence": evidence, } return { "kind": "serial_semantics", "protocol_semantics": [protocol], "fields": protocol["fields"], "command_dispatch": protocol["command_dispatch"], "commands": protocol["commands"], "command_effects": protocol["command_effects"], "response_candidates": protocol["response_candidates"], "response_schemas": protocol["response_schemas"], "response_schema": protocol["response_schema"], "send_builder": protocol["send_builder"], "logical_table_map_candidates": protocol["logical_table_map_candidates"], "table_map_candidates": protocol["table_map_candidates"], "state_variable_candidates": protocol["state_variable_candidates"], "retry_error_model": protocol["retry_error_model"], "gate_queue_model": protocol["gate_queue_model"], "tx_report_model": protocol["tx_report_model"], "periodic_resend_model": protocol["periodic_resend_model"], "confidence": protocol["confidence"], "confidence_score": protocol["confidence_score"], "caveat": protocol["caveat"], } def _field_candidates( ordered: list[JsonObject], dispatch: JsonObject | None, responses: list[JsonObject], ) -> list[JsonObject]: fields: list[JsonObject] = [] response_write_map: dict[int, list[int]] = {} for response in responses: for write in response.get("writes", []): if not isinstance(write, Mapping): continue for address in write.get("addresses", []): if isinstance(address, int): response_write_map.setdefault(address, []).append(int(write["instruction_address"])) rx_reads = { address: [ins["address"] for ins in ordered if _is_read_from_address(ins, address)] for address in range(RX_FRAME_START, RX_FRAME_END + 1) } rx_writes = { address: [ins["address"] for ins in ordered if _is_write_to_address(ins, address)] for address in range(RX_FRAME_START, RX_FRAME_END + 1) } dispatch_addresses = set(dispatch.get("evidence_addresses", []) if dispatch else []) for offset, address in enumerate(range(RX_FRAME_START, RX_FRAME_END + 1)): role = "payload_byte_candidate" caveat = "Role is inferred from reads in command processing." if offset == 0: role = "command_selector_candidate" caveat = "RX[0] is masked with 0x07 before command comparisons." elif address == RX_CHECKSUM_ADDRESS: role = "checksum_byte_candidate" caveat = "RX[5] is compared with a checksum over RX[0..4]." fields.append( { "id": f"rx_{offset}", "kind": "rx_frame_field_candidate", "offset": offset, "address": address, "address_hex": _h16(address), "role_candidate": role, "evidence_addresses": _dedupe_ints( rx_reads[address] + rx_writes[address] + ([addr for addr in dispatch_addresses if offset == 0]) ), "evidence_addresses_hex": _hlist( rx_reads[address] + rx_writes[address] + ([addr for addr in dispatch_addresses if offset == 0]) ), "read_count": len(rx_reads[address]), "write_count": len(rx_writes[address]), "confidence": "medium" if rx_reads[address] else "low", "caveat": caveat, } ) for offset, address in enumerate(range(TX_STAGING_START, TX_STAGING_END + 1)): write_addresses = _dedupe_ints(response_write_map.get(address, [])) fields.append( { "id": f"tx_staging_{offset}", "kind": "tx_staging_field_candidate", "offset": offset, "address": address, "address_hex": _h16(address), "role_candidate": "response_staging_byte_candidate", "evidence_addresses": write_addresses, "evidence_addresses_hex": _hlist(write_addresses), "write_count": len(write_addresses), "confidence": "medium" if write_addresses else "low", "caveat": ( "This byte is staged before calls to loc_BA26; the analyzer does not infer " "a stable field name beyond response position." ), } ) return fields def _rx_field_candidates( ordered: list[JsonObject], dispatch: JsonObject | None, ) -> list[JsonObject]: read_map = { address: [ int(ins["address"]) for ins in ordered if address in _read_addresses_in_range(ins, RX_FRAME_START, RX_FRAME_END) ] for address in range(RX_FRAME_START, RX_FRAME_END + 1) } fields: list[JsonObject] = [] for offset, address in enumerate(range(RX_FRAME_START, RX_FRAME_END + 1)): if offset == 0 and dispatch: name = "command_low3" confidence = "candidate-medium" mask = dispatch.get("mask") evidence = _dedupe_ints(read_map[address] + dispatch.get("evidence_addresses", [])) elif offset in {1, 2}: name = "likely_id_or_index" confidence = "candidate-low" mask = None evidence = read_map[address] elif offset in {3, 4}: name = "likely_value" confidence = "candidate-low" mask = None evidence = read_map[address] else: name = "checksum" confidence = "candidate-medium" mask = None evidence = read_map[address] field = { "offset": offset, "field": f"byte{offset}", "name": name, "address": address, "address_hex": _h16(address), "confidence": confidence, "caveat": "Field name is inferred from access pattern and remains a candidate.", "evidence_addresses": _dedupe_ints(evidence), "evidence_addresses_hex": _hlist(evidence), } if mask is not None: field["mask"] = mask field["mask_hex"] = _h16(int(mask), width=2) fields.append(field) return fields def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None: by_index = {int(ins["address"]): index for index, ins in enumerate(ordered) if "address" in ins} best: JsonObject | None = None for index, ins in enumerate(ordered): if _mnemonic_root(ins.get("mnemonic", "")) != "AND": continue if _immediate_source_value(str(ins.get("operands", ""))) != 0x07: continue _source, selector_reg = _source_destination_operands(str(ins.get("operands", ""))) if not selector_reg: continue read = _find_prior_read(ordered, index, RX_FRAME_START, selector_reg) if read is None: continue comparisons = _dispatch_comparisons(ordered, index + 1, selector_reg) command_values = sorted({int(item["command_value"]) for item in comparisons}) candidate = { "kind": "command_dispatch_candidate", "selector": "rx0_low3_bits", "field": "command_low3", "rx_offset": 0, "rx_address": RX_FRAME_START, "rx_address_hex": _h16(RX_FRAME_START), "source_address": RX_FRAME_START, "source_address_hex": _h16(RX_FRAME_START), "source_field": "byte0", "mask": 0x07, "mask_hex": _h16(0x07), "selector_register": selector_reg, "read_address": int(read["address"]), "read_address_hex": _h16(int(read["address"])), "mask_address": int(ins["address"]), "mask_address_hex": _h16(int(ins["address"])), "command_values": command_values, "command_values_hex": [_h16(value, width=2) for value in command_values], "comparisons": comparisons, "cases": [ { "value": int(item["command_value"]), "value_hex": item["command_value_hex"], "target": int(item["handler_start"]), "target_hex": item["handler_start_hex"], "compare_address": item["compare_address"], "branch_address": item["branch_address"], } for item in comparisons ], "evidence_addresses": _dedupe_ints( [int(read["address"]), int(ins["address"])] + [addr for item in comparisons for addr in item["evidence_addresses"]] ), "confidence": "medium", "caveat": ( "Dispatch is inferred from a read of RX[0], an AND 0x07 mask, and nearby " "compare/branch pairs. Gating state around the dispatch may affect reachability." ), } candidate["evidence_addresses_hex"] = _hlist(candidate["evidence_addresses"]) if best is None or len(comparisons) > len(best["comparisons"]): best = candidate if best: for item in best["comparisons"]: target = item.get("handler_start") if isinstance(target, int) and target in by_index: item["handler_start_index"] = by_index[target] return best def _dispatch_comparisons( ordered: list[JsonObject], start_index: int, selector_reg: str, ) -> list[JsonObject]: comparisons: list[JsonObject] = [] for index in range(start_index, min(len(ordered) - 1, start_index + 96)): ins = ordered[index] address = int(ins.get("address", -1)) if _mnemonic_root(str(ins.get("mnemonic", ""))) not in {"CMP", "CMP:E", "CMP:G", "CMP:I"}: continue if _destination_operand(str(ins.get("operands", ""))).upper() != selector_reg.upper(): continue value = _immediate_source_value(str(ins.get("operands", ""))) if value is None or not 0 <= value <= 7: continue branch = ordered[index + 1] if str(branch.get("mnemonic", "")).upper() != "BEQ": continue targets = _targets(branch) if not targets: continue branch_address = int(branch["address"]) target = int(targets[0]) comparisons.append( { "command_value": value, "command_value_hex": _h16(value, width=2), "compare_address": address, "compare_address_hex": _h16(address), "branch_address": branch_address, "branch_address_hex": _h16(branch_address), "handler_start": target, "handler_start_hex": _h16(target), "evidence_addresses": [address, branch_address], "evidence_addresses_hex": _hlist([address, branch_address]), } ) return comparisons def _command_candidates( ordered: list[JsonObject], dispatch: JsonObject | None, responses: list[JsonObject], ) -> list[JsonObject]: if not dispatch: return [] comparisons = [ item for item in dispatch.get("comparisons", []) if isinstance(item, Mapping) ] starts = sorted({int(item["handler_start"]) for item in comparisons if "handler_start" in item}) ranges = { start: _handler_end(ordered, start, starts) for start in starts } by_value: dict[int, JsonObject] = {} for comparison in comparisons: value = int(comparison["command_value"]) start = int(comparison["handler_start"]) end = ranges.get(start) command = by_value.setdefault( value, { "kind": "command_candidate", "command_value": value, "command_value_hex": _h16(value, width=2), "name_candidate": _command_name_candidate(value), "summary": _command_summary(value), "handler_alternatives": [], "evidence_addresses": [], "response_candidates": [], "rx_reads": [], "confidence": "medium", "caveat": ( "Command value and handler range are inferred from compare/BEQ dispatch. " "No command name or intent is asserted." ), }, ) alternative = { "handler_start": start, "handler_start_hex": _h16(start), "handler_end": end, "handler_end_hex": _h16(end) if end is not None else None, "dispatch_compare_address": comparison["compare_address"], "dispatch_compare_address_hex": comparison["compare_address_hex"], "dispatch_branch_address": comparison["branch_address"], "dispatch_branch_address_hex": comparison["branch_address_hex"], } if alternative not in command["handler_alternatives"]: command["handler_alternatives"].append(alternative) command["evidence_addresses"].extend(dispatch.get("evidence_addresses", [])[:2]) command["evidence_addresses"].extend(comparison.get("evidence_addresses", [])) for command in by_value.values(): alternatives = command["handler_alternatives"] starts_for_command = _dedupe_ints( alt["handler_start"] for alt in alternatives if isinstance(alt["handler_start"], int) ) ends_for_command = _dedupe_ints( alt["handler_end"] for alt in alternatives if isinstance(alt["handler_end"], int) ) command["handler_start"] = starts_for_command[0] if len(starts_for_command) == 1 else None command["handler_start_hex"] = _h16(starts_for_command[0]) if len(starts_for_command) == 1 else None command["handler_end"] = ends_for_command[0] if len(ends_for_command) == 1 else None command["handler_end_hex"] = _h16(ends_for_command[0]) if len(ends_for_command) == 1 else None ranges_for_command = [ (alt["handler_start"], alt["handler_end"]) for alt in alternatives if isinstance(alt["handler_end"], int) ] command["rx_reads"] = _rx_reads_in_ranges(ordered, ranges_for_command) command["response_candidates"] = [ response["id"] for response in responses if _response_in_ranges(response, ranges_for_command) ] command["effects"] = _command_effects( int(command["command_value"]), ordered, ranges_for_command, command["response_candidates"], ) command["effect_summary"] = _command_effect_summary( int(command["command_value"]), command["effects"], ) response_evidence = [ addr for response in responses if response["id"] in command["response_candidates"] for addr in response.get("evidence_addresses", []) ] command["evidence_addresses"] = _dedupe_ints(command["evidence_addresses"] + response_evidence) command["evidence_addresses_hex"] = _hlist(command["evidence_addresses"]) return [by_value[value] for value in sorted(by_value)] def _byte_layout() -> list[JsonObject]: return [ { "offset": 0, "rx_address": RX_FRAME_START, "tx_staging_address": TX_STAGING_START, "name_candidate": "op_flags", "semantic": "low three bits select a command; upper bits are preserved or gated in some paths", "confidence": "medium-high", }, { "offset": 1, "rx_address": RX_FRAME_START + 1, "tx_staging_address": TX_STAGING_START + 1, "name_candidate": "addr_page_flags", "semantic": "candidate high/page byte for logical point/index; bit 7 is tested as a control flag", "confidence": "medium", }, { "offset": 2, "rx_address": RX_FRAME_START + 2, "tx_staging_address": TX_STAGING_START + 2, "name_candidate": "addr_offset", "semantic": "candidate low/offset byte for logical point/index", "confidence": "medium", }, { "offset": 3, "rx_address": RX_FRAME_START + 3, "tx_staging_address": TX_STAGING_START + 3, "name_candidate": "value_hi", "semantic": "candidate high byte of a word value", "confidence": "medium", }, { "offset": 4, "rx_address": RX_FRAME_START + 4, "tx_staging_address": TX_STAGING_START + 4, "name_candidate": "value_lo", "semantic": "candidate low byte of a word value", "confidence": "medium", }, { "offset": 5, "rx_address": RX_CHECKSUM_ADDRESS, "tx_staging_address": None, "name_candidate": "checksum", "semantic": "0x5A-seeded XOR of bytes 0..4", "confidence": "high", }, ] def _command_name_candidate(value: int) -> str: return { 0x00: "set_value_acked", 0x01: "read_value", 0x02: "clear_or_abort", 0x04: "set_value_no_immediate_reply", 0x05: "ack_or_clear_pending", 0x06: "set_secondary_value", 0x07: "retransmit_or_error_reply", }.get(value, f"command_{value:02X}") def _command_summary(value: int) -> str: return { 0x00: "candidate write of RX[3:4] into primary/current tables, followed by a response", 0x01: "candidate read from the primary table, followed by a response carrying the value", 0x02: "candidate clear/abort path with no immediate response builder", 0x04: "candidate write/update path that stores a value without an immediate serial response", 0x05: "candidate pending/event acknowledgement path", 0x06: "candidate secondary-table value write path", 0x07: "candidate retransmit/NAK-style path; error handling also builds command 0x07 responses", }.get(value, "candidate command semantics are unknown") def _command_effect_summary(value: int, effects: list[JsonObject]) -> str: if not effects: return "No structured command effects were inferred." return { 0x00: "Candidate acknowledged set: writes value bytes to primary/current tables, flags the index, and stages an echo-style response.", 0x01: "Candidate read: reads the primary table and stages a value response.", 0x02: "Candidate clear/abort: clears serial session state without an observed immediate response.", 0x04: "Candidate deferred set: writes value bytes and flags the index without an observed immediate response.", 0x05: "Candidate acknowledgement/clear: updates pending/event state without an observed immediate response.", 0x06: "Candidate secondary set: writes value bytes to the secondary table and flags the index.", 0x07: "Candidate retransmit/error reply: reuses prior TX bytes or builds an explicit 0x07 retry/error response.", }.get(value, "Candidate effects are inferred from handler-local writes, reads, calls, and response staging.") def _command_effects( value: int, ordered: list[JsonObject], ranges: list[tuple[int, int]], response_ids: list[str], ) -> list[JsonObject]: effects: list[JsonObject] = [] def add(effect: JsonObject) -> None: effect.setdefault("confidence", "candidate-medium") effect.setdefault( "caveat", "Effect is inferred from local data movement and remains a protocol candidate.", ) evidence = _dedupe_ints( addr for addr in effect.get("evidence_addresses", []) if isinstance(addr, int) ) effect["evidence_addresses"] = evidence effect["evidence_addresses_hex"] = _hlist(evidence) effects.append(effect) table_accesses = _table_accesses_in_ranges(ordered, ranges) state_accesses = _state_accesses_in_ranges(ordered, ranges) if value == 0x00: add( { "kind": "table_write_candidate", "target_candidate": "primary_value_table_candidate", "source_candidate": "RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero", "table_base": 0xE000, "table_base_hex": _h16(0xE000), "evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "write"), } ) add( { "kind": "table_write_candidate", "target_candidate": "current_value_table_candidate", "source_candidate": "same candidate value written to the primary table", "table_base": 0xE800, "table_base_hex": _h16(0xE800), "evidence_addresses": _table_access_addresses(table_accesses, 0x1800, "write"), } ) add( { "kind": "flag_update_candidate", "target_candidate": "per_index_flag_table_candidate", "operation_candidate": "set bit 7", "table_base": 0xEC00, "table_base_hex": _h16(0xEC00), "evidence_addresses": _table_access_addresses(table_accesses, 0x1400, "write"), } ) elif value == 0x01: add( { "kind": "table_read_candidate", "target_candidate": "primary_value_table_candidate", "destination_candidate": "response value bytes", "table_base": 0xE000, "table_base_hex": _h16(0xE000), "evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "read"), } ) elif value == 0x02: add( { "kind": "state_clear_candidate", "target_candidate": STATE_VARIABLES[0xFAA2], "state_address": 0xFAA2, "state_address_hex": _h16(0xFAA2), "operation_candidate": "clear bit 7", "evidence_addresses": _state_access_addresses(state_accesses, 0xFAA2), } ) elif value == 0x04: add( { "kind": "table_write_candidate", "target_candidate": "primary_value_table_candidate", "source_candidate": "RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero", "table_base": 0xE000, "table_base_hex": _h16(0xE000), "evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "write"), } ) add( { "kind": "flag_update_candidate", "target_candidate": "per_index_flag_table_candidate", "operation_candidate": "set bit 7", "table_base": 0xEC00, "table_base_hex": _h16(0xEC00), "evidence_addresses": _table_access_addresses(table_accesses, 0x1400, "write"), } ) elif value == 0x05: add( { "kind": "pending_acknowledgement_candidate", "target_candidate": "selected event/pending state", "operation_candidate": "clear selected pending flags and then clear serial session state", "evidence_addresses": _dedupe_ints( _state_access_addresses(state_accesses, 0xFAA2) + _state_access_addresses(state_accesses, 0xFAA3) + _state_access_addresses(state_accesses, 0xF9B5) ), } ) elif value == 0x06: add( { "kind": "table_write_candidate", "target_candidate": "secondary_value_table_candidate", "source_candidate": "RX[3:4] value bytes", "table_base": 0xE400, "table_base_hex": _h16(0xE400), "evidence_addresses": _table_access_addresses(table_accesses, 0x1C00, "write"), } ) add( { "kind": "flag_update_candidate", "target_candidate": "per_index_flag_table_candidate", "operation_candidate": "set bit 6", "table_base": 0xEC00, "table_base_hex": _h16(0xEC00), "evidence_addresses": _table_access_addresses(table_accesses, 0x1400, "write"), } ) elif value == 0x07: add( { "kind": "retransmit_candidate", "source_candidate": "previous TX frame bytes H'F858-H'F85C", "destination_candidate": "TX staging bytes H'F850-H'F854 before loc_BA26", "response_candidates": [item for item in response_ids if item == "response_at_BE22"], "evidence_addresses": _addresses_in_ranges(ordered, ranges, 0xBE05, 0xBE22), } ) if response_ids: add( { "kind": "response_staging_candidate", "response_candidates": response_ids, "operation_candidate": "stage F850-F854 and call loc_BA26", "evidence_addresses": [ int(response_id.rsplit("_", 1)[1], 16) for response_id in response_ids if response_id.startswith("response_at_") ], } ) return effects def _command_effect_aliases(commands: list[JsonObject]) -> list[JsonObject]: aliases: list[JsonObject] = [] for command in commands: if not isinstance(command, Mapping): continue aliases.append( { "kind": "command_effects_candidate", "command_value": command.get("command_value"), "command_value_hex": command.get("command_value_hex"), "name_candidate": command.get("name_candidate"), "summary": command.get("effect_summary", command.get("summary")), "effects": command.get("effects", []), "response_candidates": command.get("response_candidates", []), "evidence_addresses": command.get("evidence_addresses", []), "evidence_addresses_hex": command.get("evidence_addresses_hex", []), "confidence": command.get("confidence", "medium"), "caveat": command.get("caveat"), } ) return aliases def _index_decoder_candidate(ordered: list[JsonObject]) -> JsonObject | None: calls = [ ins for ins in ordered if _mnemonic_root(str(ins.get("mnemonic", ""))) in {"BSR", "JSR", "PJSR"} and ( INDEX_DECODER_ADDRESS in _targets(ins) or INDEX_DECODER_LABEL.upper() in str(ins.get("operands", "")).upper() ) ] if not calls: return None evidence_addresses = [int(ins["address"]) for ins in calls] return { "kind": "logical_index_decoder_candidate", "label": INDEX_DECODER_LABEL, "address": INDEX_DECODER_ADDRESS, "address_hex": _h16(INDEX_DECODER_ADDRESS), "input_fields": ["addr_page_flags", "addr_offset"], "output_register": "R5", "post_scale_register": "R4", "post_scale": "R4 = R5 << 1", "mapping_candidate": [ {"page": 0, "offset_range": "0x00-0x7F", "index_range": "0x000-0x07F"}, {"page": 1, "offset_range": "0x00-0xFF", "index_range": "0x080-0x17F"}, {"page": 2, "offset_range": "0x00-0x7F", "index_range": "0x180-0x1FF"}, {"page": "other/overflow", "index": "0x1FF"}, ], "evidence_addresses": evidence_addresses, "evidence_addresses_hex": _hlist(evidence_addresses), "confidence": "medium", "caveat": ( "Mapping is inferred from loc_622B behavior and the nearby R4 = R5 << 1 table-index use." ), } def _response_candidates(ordered: list[JsonObject]) -> list[JsonObject]: responses: list[JsonObject] = [] for index, ins in enumerate(ordered): if not _is_send_builder_call(ins): continue window = _response_window(ordered, index) writes = _staging_writes(window) if not writes: continue reads = _rx_reads(window, RX_FRAME_START + 1, RX_FRAME_START + 4) call_address = int(ins["address"]) evidence_addresses = _dedupe_ints( [write["instruction_address"] for write in writes] + [read["instruction_address"] for read in reads] + [call_address] ) response = { "id": f"response_at_{call_address:04X}", "kind": "response_staging_candidate", "call_address": call_address, "call_address_hex": _h16(call_address), "send_builder": SEND_BUILDER_LABEL, "send_builder_address": SEND_BUILDER_ADDRESS, "send_builder_address_hex": _h16(SEND_BUILDER_ADDRESS), "window_start": int(window[0]["address"]) if window else call_address, "window_start_hex": _h16(int(window[0]["address"])) if window else _h16(call_address), "writes": writes, "rx_reads": reads, "evidence_addresses": evidence_addresses, "evidence_addresses_hex": _hlist(evidence_addresses), "confidence": "medium", "caveat": ( "Response candidate means F850-F854 are written shortly before loc_BA26. " "The analyzer does not prove every byte is meaningful for every path." ), } response["schema"] = _response_schema(response) response["byte_schema"] = response["schema"]["bytes"] responses.append(response) return responses def _response_schema(response: Mapping[str, Any]) -> JsonObject: writes = [ write for write in response.get("writes", []) if isinstance(write, Mapping) ] bytes_out: list[JsonObject] = [] for offset, address in enumerate(range(TX_STAGING_START, TX_STAGING_END + 1)): matching = [ write for write in writes if address in [item for item in write.get("addresses", []) if isinstance(item, int)] ] if not matching: bytes_out.append( { "offset": offset, "byte": f"byte{offset}", "tx_byte": f"TX[{offset}]", "tx_staging_byte": f"TX[{offset}]", "address": address, "address_hex": _h16(address), "source_kind": "unknown", "source_expression": "unknown", "source": {"kind": "unknown"}, "evidence_addresses": [], "evidence_addresses_hex": [], "confidence": "candidate-low", "caveat": "No write to this staging byte was observed in the response window.", } ) continue write = matching[-1] byte_source = _response_byte_source(write, address) evidence = [int(write["instruction_address"])] if isinstance(write.get("instruction_address"), int) else [] bytes_out.append( { "offset": offset, "byte": f"byte{offset}", "tx_byte": f"TX[{offset}]", "tx_staging_byte": f"TX[{offset}]", "address": address, "address_hex": _h16(address), "source_kind": _schema_source_kind(byte_source), "source_expression": _source_expression(byte_source), "source": byte_source, "write_instruction_address": write.get("instruction_address"), "write_instruction_address_hex": write.get("instruction_address_hex"), "instruction": write.get("instruction"), "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), "confidence": "candidate-medium", "caveat": ( "Per-byte source is inferred from the final observed write to this staging " "byte before loc_BA26." ), } ) evidence_addresses = _dedupe_ints( addr for item in bytes_out for addr in item.get("evidence_addresses", []) if isinstance(addr, int) ) return { "kind": "response_schema_candidate", "response_id": response.get("id"), "call_address": response.get("call_address"), "call_address_hex": response.get("call_address_hex"), "buffer_start": TX_STAGING_START, "buffer_start_hex": _h16(TX_STAGING_START), "buffer_end": TX_STAGING_END, "buffer_end_hex": _h16(TX_STAGING_END), "bytes": bytes_out, "evidence_addresses": evidence_addresses, "evidence_addresses_hex": _hlist(evidence_addresses), "confidence": "candidate-medium" if evidence_addresses else "candidate-low", "caveat": ( "Response schema is a candidate extracted from writes to F850-F854 in the local " "window before loc_BA26; control-flow alternatives may share a send call." ), } def _response_schemas(responses: list[JsonObject]) -> list[JsonObject]: return [ response["schema"] for response in responses if isinstance(response.get("schema"), Mapping) ] def _response_byte_source(write: Mapping[str, Any], address: int) -> JsonObject: source = write.get("source") if not isinstance(source, Mapping): return {"kind": "computed", "operand": write.get("source_operand")} addresses = [item for item in write.get("addresses", []) if isinstance(item, int)] byte_index = addresses.index(address) if address in addresses else 0 kind = str(source.get("kind", "computed")) if kind in {"rx_frame_byte", "rx_frame_word"}: offsets = source.get("rx_offsets") if isinstance(offsets, list) and byte_index < len(offsets) and isinstance(offsets[byte_index], int): rx_offset = int(offsets[byte_index]) else: rx_offset = int(source.get("rx_offset", 0)) + byte_index rx_address = RX_FRAME_START + rx_offset return { "kind": "rx_frame_byte", "rx_offset": rx_offset, "rx_address": rx_address, "rx_address_hex": _h16(rx_address), "derived_from": dict(source), } if kind in {"tx_frame_byte", "tx_frame_word"}: offsets = source.get("tx_offsets") if isinstance(offsets, list) and byte_index < len(offsets) and isinstance(offsets[byte_index], int): tx_offset = int(offsets[byte_index]) else: tx_offset = int(source.get("tx_offset", 0)) + byte_index tx_address = TX_FRAME_START + tx_offset return { "kind": "tx_frame_byte", "tx_offset": tx_offset, "tx_address": tx_address, "tx_address_hex": _h16(tx_address), "derived_from": dict(source), } if kind == "table": output = dict(source) output["byte_index"] = byte_index output["kind"] = "table" return output if kind == "immediate": output = dict(source) output["byte_index"] = byte_index return output output = dict(source) output.setdefault("kind", "computed") output["byte_index"] = byte_index return output def _schema_source_kind(source: Mapping[str, Any]) -> str: kind = str(source.get("kind", "computed")) if kind == "immediate": return "immediate" if kind == "rx_frame_byte": return "rx_frame_byte" if kind == "table": return "table" if kind in {"tx_frame_byte", "tx_frame_word"}: return "tx_frame_byte" return "computed" def _source_expression(source: Mapping[str, Any]) -> str: kind = str(source.get("kind", "computed")) if kind == "immediate" and isinstance(source.get("value"), int): return f"0x{int(source['value']) & 0xFFFF:02X}".lower() if kind == "rx_frame_byte" and isinstance(source.get("rx_offset"), int): return f"rx[{int(source['rx_offset'])}]" if kind == "tx_frame_byte" and isinstance(source.get("tx_offset"), int): return f"tx[{int(source['tx_offset'])}]" if kind == "table": return str(source.get("name_candidate", "table")) return "computed" def _rx_field_candidates( ordered: list[JsonObject], dispatch: JsonObject | None, ) -> list[JsonObject]: fields: list[JsonObject] = [] dispatch_evidence = [] if isinstance(dispatch, Mapping): dispatch_evidence = [ value for value in dispatch.get("evidence_addresses", []) if isinstance(value, int) ] for offset in range(RX_FRAME_LENGTH): address = RX_FRAME_START + offset read_evidence = [ int(ins["address"]) for ins in ordered if _is_read_from_address(ins, address) ] name = "payload_byte" confidence = "candidate-low" caveat = "role is inferred only from frame position" mask = None if offset == 0: name = "command_low3" confidence = "candidate-high" if dispatch else "candidate-medium" caveat = "RX[0] is masked with 0x07 before command comparisons" mask = 0x07 read_evidence = _dedupe_ints(read_evidence + dispatch_evidence) elif offset in {1, 2}: name = "likely_id_or_index" confidence = "candidate-medium" if read_evidence else "candidate-low" caveat = "RX[1:2] are read near logical point/index and response-echo handling" elif offset in {3, 4}: name = "likely_value" confidence = "candidate-medium" if read_evidence else "candidate-low" caveat = "RX[3:4] are read near table-value write/read response handling" elif offset == 5: name = "checksum" confidence = "candidate-high" caveat = "RX[5] is validated by the serial reconstruction checksum evidence" field: JsonObject = { "kind": "rx_field_semantic_candidate", "offset": offset, "name": name, "address": address, "address_hex": _h16(address), "confidence": confidence, "caveat": caveat, "evidence_addresses": _dedupe_ints(read_evidence), "evidence_addresses_hex": _hlist(read_evidence), } if mask is not None: field["mask"] = mask field["mask_hex"] = _h16(mask, width=2) fields.append(field) return fields def _response_builder_aliases(responses: list[JsonObject]) -> list[JsonObject]: builders: list[JsonObject] = [] for response in responses: writes: list[JsonObject] = [] for write in response.get("writes", []): if not isinstance(write, Mapping): continue for address in write.get("addresses", []): if not isinstance(address, int): continue writes.append( { "address": address, "address_hex": _h16(address), "instruction_address": write.get("instruction_address"), "instruction_address_hex": write.get("instruction_address_hex"), "source": write.get("source"), "instruction": write.get("instruction"), } ) builders.append( { "kind": "response_builder_candidate", "buffer_start": TX_STAGING_START, "buffer_start_hex": _h16(TX_STAGING_START), "buffer_end": TX_STAGING_END, "buffer_end_hex": _h16(TX_STAGING_END), "send_call_target": SEND_BUILDER_ADDRESS, "send_call_target_hex": _h16(SEND_BUILDER_ADDRESS), "call_address": response.get("call_address"), "call_address_hex": response.get("call_address_hex"), "writes": writes, "evidence_addresses": response.get("evidence_addresses", []), "evidence_addresses_hex": response.get("evidence_addresses_hex", []), "confidence": response.get("confidence", "medium"), "caveat": response.get("caveat"), } ) return builders def _logical_table_map_candidates(ordered: list[JsonObject]) -> list[JsonObject]: by_offset: dict[int, list[JsonObject]] = {offset: [] for offset in LOGICAL_TABLES} direct_by_address: dict[int, list[JsonObject]] = {address: [] for address in DIRECT_TABLE_CANDIDATES} for ins in ordered: for operand in _negative_indexed_operands(str(ins.get("operands", ""))): offset = int(operand["negative_offset"]) if offset not in LOGICAL_TABLES: continue access = _operand_access_kind(ins, str(operand["operand"])) by_offset[offset].append( { "instruction_address": int(ins["address"]), "instruction_address_hex": _h16(int(ins["address"])), "operand": operand["operand"], "negative_offset": offset, "negative_offset_hex": _h16(offset), "index_register": operand["index_register"], "access": access, "width": _access_width(str(ins.get("mnemonic", ""))), "instruction": str(ins.get("text", "")), } ) for address in DIRECT_TABLE_CANDIDATES: if not _has_ref_in_range(ins, address, address): continue direct_by_address[address].append( { "instruction_address": int(ins["address"]), "instruction_address_hex": _h16(int(ins["address"])), "address": address, "address_hex": _h16(address), "access": _access_direction(ins, address) or "read_write_candidate", "width": _access_width(str(ins.get("mnemonic", ""))), "instruction": str(ins.get("text", "")), } ) candidates: list[JsonObject] = [] for offset, accesses in by_offset.items(): if not accesses: continue metadata = LOGICAL_TABLES[offset] evidence = _dedupe_ints(access["instruction_address"] for access in accesses) logical_base = int(metadata["logical_base_address"]) candidates.append( { "kind": "logical_table_map_candidate", "name_candidate": metadata["name_candidate"], "element_candidate": metadata["element_candidate"], "logical_base_address": logical_base, "logical_base_address_hex": _h16(logical_base), "negative_offset": offset, "negative_offset_hex": _h16(offset), "observed_index_registers": sorted( { str(access["index_register"]) for access in accesses if access.get("index_register") } ), "observed_accesses": sorted( {str(access["access"]) for access in accesses if access.get("access")} ), "observed_widths": sorted( {int(access["width"]) for access in accesses if isinstance(access.get("width"), int)} ), "accesses": accesses, "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), "confidence": "candidate-medium", "caveat": ( "Logical table base is inferred from negative indexed operands only; " "the table name is a conservative candidate." ), } ) for address, accesses in direct_by_address.items(): if not accesses: continue evidence = _dedupe_ints(access["instruction_address"] for access in accesses) candidates.append( { "kind": "direct_table_map_candidate", "name_candidate": DIRECT_TABLE_CANDIDATES[address], "address": address, "address_hex": _h16(address), "observed_accesses": sorted( {str(access["access"]) for access in accesses if access.get("access")} ), "observed_widths": sorted( {int(access["width"]) for access in accesses if isinstance(access.get("width"), int)} ), "accesses": accesses, "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), "confidence": "candidate-low", "caveat": ( "Direct table candidate is retained for small synthetic traces; the main " "logical table map uses negative indexed operands." ), } ) return candidates def _state_variable_candidates(ordered: list[JsonObject]) -> list[JsonObject]: candidates: list[JsonObject] = [] serial_region = [ ins for ins in ordered if SERIAL_HANDLER_START <= int(ins.get("address", -1)) <= SERIAL_HANDLER_END ] if not any( _has_ref_in_range(ins, min(STATE_VARIABLES), max(STATE_VARIABLES)) for ins in serial_region ): serial_region = ordered for address, name in STATE_VARIABLES.items(): accesses: list[JsonObject] = [] for ins in serial_region: if not _has_ref_in_range(ins, address, address): continue access = _access_direction(ins, address) or "read_write_candidate" source, _destination = _source_destination_operands(str(ins.get("operands", ""))) immediate = _parse_immediate(source) item: JsonObject = { "instruction_address": int(ins["address"]), "instruction_address_hex": _h16(int(ins["address"])), "access": access, "mnemonic": str(ins.get("mnemonic", "")), "instruction": str(ins.get("text", "")), } bit = _bit_number_from_instruction(ins) if bit is not None: item["bit"] = bit if immediate is not None: item["immediate"] = immediate item["immediate_hex"] = _h16(immediate, width=2 if immediate <= 0xFF else 4) accesses.append(item) if not accesses: continue evidence = _dedupe_ints(access["instruction_address"] for access in accesses) candidates.append( { "kind": "serial_state_variable_candidate", "name_candidate": name, "address": address, "address_hex": _h16(address), "access_count": len(accesses), "read_count": sum(1 for access in accesses if str(access["access"]) == "read"), "write_count": sum(1 for access in accesses if str(access["access"]) == "write"), "bit_candidates": sorted( {int(access["bit"]) for access in accesses if isinstance(access.get("bit"), int)} ), "immediate_values": _dedupe_ints( int(access["immediate"]) for access in accesses if isinstance(access.get("immediate"), int) ), "immediate_values_hex": _hlist( int(access["immediate"]) for access in accesses if isinstance(access.get("immediate"), int) ), "accesses": accesses, "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), "confidence": "candidate-medium", "caveat": ( "Role is inferred from references in the serial handler region and remains " "a state-variable candidate." ), } ) return candidates def _generic_checksum_error_branches( ordered: list[JsonObject], responses: list[JsonObject], ) -> JsonObject: response_by_call = { int(response["call_address"]): response for response in responses if isinstance(response.get("call_address"), int) } response_ids_by_call = { call: str(response["id"]) for call, response in response_by_call.items() if isinstance(response.get("id"), str) } response_calls = sorted(response_by_call) error_response_calls = { call for call, response in response_by_call.items() if _response_has_immediate_byte(response, 0, 0x07) } branch_addresses: list[int] = [] branch_targets: list[int] = [] response_ids: list[str] = [] evidence_addresses: list[int] = [] for index, ins in enumerate(ordered): if not _has_ref_in_range(ins, RX_CHECKSUM_ADDRESS, RX_CHECKSUM_ADDRESS): continue if _mnemonic_root(str(ins.get("mnemonic", ""))) not in {"CMP", "CMP:E", "CMP:G", "CMP:I"}: continue evidence_addresses.append(int(ins["address"])) for follower in ordered[index + 1:index + 4]: targets = _targets(follower) if not targets: continue branch_address = int(follower["address"]) branch_addresses.append(branch_address) evidence_addresses.append(branch_address) for target in targets: branch_targets.append(target) response_call = _first_response_call_at_or_after(response_calls, target, max_distance=0x80) if response_call is None: continue if error_response_calls and response_call not in error_response_calls: continue response_ids.append(response_ids_by_call[response_call]) evidence_addresses.extend(_response_evidence_addresses(responses, [response_ids_by_call[response_call]])) break return { "branch_addresses": _dedupe_ints(branch_addresses), "branch_targets": _dedupe_ints(branch_targets), "response_ids": _dedupe_strings(response_ids), "evidence_addresses": _dedupe_ints(evidence_addresses), } def _first_response_call_at_or_after( response_calls: list[int], address: int, *, max_distance: int, ) -> int | None: for call in response_calls: if address <= call <= address + max_distance: return call return None def _responses_with_immediate_byte( responses: list[JsonObject], offset: int, value: int, ) -> list[str]: return _dedupe_strings( str(response["id"]) for response in responses if isinstance(response.get("id"), str) and _response_has_immediate_byte(response, offset, value) ) def _response_has_immediate_byte(response: Mapping[str, Any], offset: int, value: int) -> bool: schema = response.get("schema") if not isinstance(schema, Mapping): return False for item in schema.get("bytes", []): if not isinstance(item, Mapping): continue if item.get("offset") != offset: continue source = item.get("source") return ( isinstance(source, Mapping) and source.get("kind") == "immediate" and source.get("value") == value ) return False def _response_evidence_addresses( responses: list[JsonObject], response_ids: Iterable[str], ) -> list[int]: wanted = set(response_ids) return _dedupe_ints( int(address) for response in responses if response.get("id") in wanted for address in response.get("evidence_addresses", []) if isinstance(address, int) ) def _retry_error_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None: checksum_path = [ ins for ins in ordered if 0xBBD6 <= int(ins.get("address", -1)) <= 0xBBF0 ] retry_path = [ ins for ins in ordered if 0xBE29 <= int(ins.get("address", -1)) <= 0xBE6A ] retransmit_path = [ ins for ins in ordered if 0xBE05 <= int(ins.get("address", -1)) <= 0xBE22 ] checksum_branch = [ int(ins["address"]) for ins in checksum_path if 0xBE29 in _targets(ins) ] generic_checksum_branch = _generic_checksum_error_branches(ordered, responses) checksum_branch = _dedupe_ints(checksum_branch + generic_checksum_branch["branch_addresses"]) retry_counter_evidence = [ int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, 0xFAA6, 0xFAA6) ] explicit_error_response = [ response.get("id") for response in responses if response.get("id") == "response_at_BE6A" ] checksum_error_response = _dedupe_strings( [item for item in explicit_error_response if isinstance(item, str)] + generic_checksum_branch["response_ids"] ) retransmit_response = [ response.get("id") for response in responses if response.get("id") == "response_at_BE22" ] command_0x07_responses = _responses_with_immediate_byte(responses, 0, 0x07) retransmit_response = _dedupe_strings( [item for item in retransmit_response if isinstance(item, str)] + [ item for item in command_0x07_responses if item not in checksum_error_response ] ) if ( not checksum_branch and not retry_counter_evidence and not retransmit_response and not checksum_error_response ): return None checksum_evidence = _dedupe_ints( [int(ins["address"]) for ins in checksum_path if _has_ref_in_range(ins, RX_FRAME_START, RX_FRAME_END)] + checksum_branch + generic_checksum_branch["evidence_addresses"] ) retry_evidence = _dedupe_ints( [int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, 0xFAA2, 0xFAA6)] + [int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, TX_STAGING_START, TX_STAGING_END)] + [int(ins["address"]) for ins in retry_path if _is_send_builder_call(ins)] + _response_evidence_addresses(responses, checksum_error_response) ) retransmit_evidence = _dedupe_ints( [int(ins["address"]) for ins in retransmit_path if _has_ref_in_range(ins, TX_FRAME_START, TX_FRAME_END)] + [int(ins["address"]) for ins in retransmit_path if _has_ref_in_range(ins, TX_STAGING_START, TX_STAGING_END)] + [int(ins["address"]) for ins in retransmit_path if _is_send_builder_call(ins)] + _response_evidence_addresses(responses, retransmit_response) ) evidence = _dedupe_ints(checksum_evidence + retry_evidence + retransmit_evidence) return { "kind": "serial_retry_error_model_candidate", "checksum_failure_path": { "condition_candidate": "0x5A-seeded XOR over RX[0..4] differs from RX[5]", "error_target": "loc_BE29", "error_target_address": 0xBE29, "error_target_address_hex": _h16(0xBE29), "checksum_error_response_candidates": checksum_error_response, "branch_evidence_addresses": checksum_branch, "branch_evidence_addresses_hex": _hlist(checksum_branch), "evidence_addresses": checksum_evidence, "evidence_addresses_hex": _hlist(checksum_evidence), "confidence": "candidate-high" if checksum_branch else "candidate-low", }, "retry_path": { "entry_label": "loc_BE29", "entry_address": 0xBE29, "entry_address_hex": _h16(0xBE29), "counter_address": 0xFAA6, "counter_address_hex": _h16(0xFAA6), "threshold_candidate": 2, "response_candidates": checksum_error_response, "summary": ( "Candidate retry path clears/consults serial flags, increments FAA6, compares it " "with 2, and when still below the apparent limit stages a command 0x07 response." ), "evidence_addresses": retry_evidence, "evidence_addresses_hex": _hlist(retry_evidence), "confidence": "candidate-medium" if retry_counter_evidence else "candidate-low", }, "command_0x07_path": { "entry_label": "loc_BE05", "entry_address": 0xBE05, "entry_address_hex": _h16(0xBE05), "response_candidates": retransmit_response, "summary": ( "Candidate retransmit/explicit command 0x07 path either copies previous TX " "frame bytes back to F850-F854 or stages an observed 0x07 response before loc_BA26." ), "evidence_addresses": retransmit_evidence, "evidence_addresses_hex": _hlist(retransmit_evidence), "confidence": "candidate-medium" if retransmit_response else "candidate-low", }, "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), "confidence": "candidate-medium", "caveat": ( "The retry/error model is inferred from checksum branch targets, retry-counter state, " "and response staging; exact host-visible semantics remain candidate phrasing." ), } def _gate_queue_model(ordered: list[JsonObject], commands: list[JsonObject]) -> JsonObject | None: evidence = _dedupe_ints( _addresses_in_ranges(ordered, [(MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL)], MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL) + _addresses_in_ranges(ordered, [(SESSION_GATE_ENTRY, 0x4007)], SESSION_GATE_ENTRY, 0x4007) + _addresses_in_ranges(ordered, [(QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL)], QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL) + _addresses_in_ranges(ordered, [(RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY)], RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY) ) command_ack_values = [ int(command["command_value"]) for command in commands if command.get("command_value") in {0x05, 0x06} ] if not evidence and not command_ack_values: return None return { "kind": "serial_gate_queue_state_machine_candidate", "summary": ( "Conservative model for autonomous report gating, queue cursor comparison, " "periodic resend, and RX/session side effects." ), "predicates": [ { "name": "main_loop_may_enter_report_builder", "entry_label": "loc_3FD3", "target_label": "loc_BAF2", "condition_candidate": ( "FAA2 == 0 && F9C0 == 0 && ((FAA5.bit7 == 0) || (F9C3 == 0))" ), "summary": "Main-loop report gate; session must be idle, TX busy timer clear, and RX gate open.", "state_addresses_hex": [_h16(0xFAA2), _h16(0xFAA5), _h16(0xF9C3), _h16(0xF9C0)], "evidence_addresses": _addresses_in_ranges( ordered, [(MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL)], MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL, ), }, { "name": "queue_has_pending_report", "entry_label": "loc_BAF2", "condition_candidate": "F9B5 != F9B0", "summary": "Queue/pending cursor gate; non-empty state stages through BB43 before loc_BA26.", "state_addresses_hex": [_h16(0xF9B5), _h16(0xF9B0)], "staging_path": ["loc_BAF2", "loc_BB43", "loc_BA26"], "evidence_addresses": _addresses_in_ranges( ordered, [(QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL)], QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL, ), }, { "name": "periodic_resend_may_fire", "entry_label": "loc_BE9E", "target_label": "loc_BED5", "condition_candidate": ( "(FAA5 & FAA3 & 0x80) != 0 && F9C6 == 0 && F9C8 != 0 after countdown" ), "summary": "Resend gate masks pending state with FAA5, checks F9C6/F9C8, then calls BA26 at BED5.", "state_addresses_hex": [_h16(0xFAA5), _h16(0xFAA3), _h16(0xF9C6), _h16(0xF9C8)], "evidence_addresses": _addresses_in_ranges( ordered, [(RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY)], RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY, ), }, ], "session_effects": [ { "name": "rx_completion_sets_session_timer", "summary": "RX completion sets F9C5 (observed reload H'14) after the sixth byte is captured.", "state_addresses_hex": [_h16(0xF9C5)], "evidence_addresses": _state_immediate_evidence(ordered, 0xF9C5, 0x14), }, { "name": "session_timeout_clears_gate_and_queue", "entry_label": "loc_3FEF", "summary": "When F9C5 is clear, loc_3FEF clears F9B5/F9B0 and clears FAA5.bit7; when nonzero, it sets FAA5.bit7.", "state_addresses_hex": [_h16(0xF9C5), _h16(0xF9B5), _h16(0xF9B0), _h16(0xFAA5)], "evidence_addresses": _addresses_in_ranges( ordered, [(SESSION_GATE_ENTRY, 0x4007)], SESSION_GATE_ENTRY, 0x4007, ), }, { "name": "host_ack_can_advance_queue", "summary": "Commands 0x05/0x06 are modeled as acknowledgement paths that can clear pending state or advance F9B5.", "command_values_hex": [_h16(value, width=2) for value in command_ack_values], "state_addresses_hex": [_h16(0xF9B5)], "evidence_addresses": _dedupe_ints( addr for command in commands if command.get("command_value") in {0x05, 0x06} for addr in command.get("evidence_addresses", []) if isinstance(addr, int) ), }, ], "caveat": ( "Many panel controls may require host/session traffic before reporting. Observed " "autonomous call/camera-power indexes are runtime/capture overlays, not ROM constants." ), "confidence": "candidate-medium", "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), } def _tx_report_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None: report_responses = [ response for response in responses if response.get("call_address") == AUTONOMOUS_TX_REPORT_CALL ] if not report_responses: report_responses = [ response for response in responses if _response_reads_current_value_table(response) and not _response_reads_rx_frame(response) ] if not report_responses: return None response_ids = [ str(response["id"]) for response in report_responses if isinstance(response.get("id"), str) ] evidence = _dedupe_ints( addr for response in report_responses for addr in response.get("evidence_addresses", []) if isinstance(addr, int) ) byte_roles = [ { "offset": 0, "field_candidate": "encoded_logical_index_or_report_id_byte0", "source_candidate": "computed from candidate logical index/report id", }, { "offset": 1, "field_candidate": "encoded_logical_index_or_report_id_byte1", "source_candidate": "computed from candidate logical index/report id", }, { "offset": 2, "field_candidate": "encoded_logical_index_or_report_id_byte2", "source_candidate": "computed from candidate logical index/report id", }, { "offset": 3, "field_candidate": "current_value_hi", "source_candidate": "current_value_table_candidate high byte", "table_candidate": "current_value_table_candidate", }, { "offset": 4, "field_candidate": "current_value_lo", "source_candidate": "current_value_table_candidate low byte", "table_candidate": "current_value_table_candidate", }, { "offset": 5, "field_candidate": "checksum", "source_candidate": "0x5A XOR TX[0..4]", }, ] return { "kind": "bb43_to_ba26_tx_report_model_candidate", "direction": "device_to_host_autonomous_report_candidate", "entry_label": AUTONOMOUS_TX_REPORT_LABEL, "entry_address": AUTONOMOUS_TX_REPORT_CALL, "entry_address_hex": _h16(AUTONOMOUS_TX_REPORT_CALL), "send_builder": SEND_BUILDER_LABEL, "send_builder_address": SEND_BUILDER_ADDRESS, "send_builder_address_hex": _h16(SEND_BUILDER_ADDRESS), "response_candidates": _dedupe_strings(response_ids), "summary": ( "TX report bytes 0..2 are computed encoded logical index/report id bytes, " "bytes 3..4 come from current_value_table_candidate, and byte5 is the " "0x5A XOR checksum." ), "byte_roles": byte_roles, "value_source_candidate": "current_value_table_candidate", "checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4", "observed_capture_overlay_candidates": OBSERVED_TX_REPORT_OVERLAY, "observed_autonomous_output_caveat": ( "Real captures supplied so far show only heartbeat/idle, call, and camera-power " "autonomous TX frames. Other panel controls may require a host/device request or " "state transition before the firmware reports them." ), "confidence": "candidate-medium", "caveat": ( "This is a TX/report model for the BB43 -> BA26 path, separate from RX command " "dispatch. Observed report names are a capture overlay candidate only, not hard-coded " "source truth." ), "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), } def _periodic_resend_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None: del responses period_evidence = _state_immediate_evidence(ordered, 0xF9C6, 0x01F4) countdown_evidence = _state_immediate_evidence(ordered, 0xF9C8, 0x14) pending_evidence = _state_immediate_evidence(ordered, 0xFAA3, 0x80) pending_evidence = _dedupe_ints(pending_evidence + _state_bit_evidence(ordered, 0xFAA3, 7)) resend_evidence = [ int(ins["address"]) for ins in ordered if PERIODIC_RESEND_ENTRY <= int(ins.get("address", -1)) <= SERIAL_HANDLER_END ] resend_send_evidence = [ int(ins["address"]) for ins in ordered if PERIODIC_RESEND_ENTRY <= int(ins.get("address", -1)) <= SERIAL_HANDLER_END and (_is_send_builder_call(ins) or _has_ref_in_range(ins, TX_STAGING_START, TX_FRAME_END)) ] evidence = _dedupe_ints(period_evidence + countdown_evidence + pending_evidence + resend_send_evidence) if not evidence and not resend_evidence: return None return { "kind": "autonomous_periodic_resend_model_candidate", "period_timer": { "address": 0xF9C6, "address_hex": _h16(0xF9C6), "reload_value_candidate": 0x01F4, "reload_value_hex": _h16(0x01F4), "summary": "Candidate periodic report/heartbeat timer reload.", "evidence_addresses": period_evidence, "evidence_addresses_hex": _hlist(period_evidence), }, "resend_countdown": { "address": 0xF9C8, "address_hex": _h16(0xF9C8), "reload_value_candidate": 0x14, "reload_value_hex": _h16(0x14, width=2), "summary": "Candidate periodic resend countdown/retry spacing value.", "evidence_addresses": countdown_evidence, "evidence_addresses_hex": _hlist(countdown_evidence), }, "pending_mask": { "address": 0xFAA3, "address_hex": _h16(0xFAA3), "mask_candidate": 0x80, "mask_hex": _h16(0x80, width=2), "summary": "Candidate bit/mask that marks an autonomous report pending.", "evidence_addresses": pending_evidence, "evidence_addresses_hex": _hlist(pending_evidence), }, "resend_path": { "entry_label": "loc_BED5", "entry_address": PERIODIC_RESEND_ENTRY, "entry_address_hex": _h16(PERIODIC_RESEND_ENTRY), "summary": "Candidate periodic resend path feeding the TX staging/send-builder flow.", "evidence_addresses": _dedupe_ints(resend_send_evidence or resend_evidence), "evidence_addresses_hex": _hlist(resend_send_evidence or resend_evidence), }, "evidence_addresses": evidence, "evidence_addresses_hex": _hlist(evidence), "confidence": "candidate-medium" if evidence else "candidate-low", "caveat": ( "Timer and resend roles are inferred from constants/state references around F9C6, " "F9C8, FAA3, and loc_BED5; exact scheduling units remain candidate phrasing." ), } def _response_reads_current_value_table(response: Mapping[str, Any]) -> bool: schema = response.get("schema") if not isinstance(schema, Mapping): return False return any( isinstance(item, Mapping) and isinstance(item.get("source"), Mapping) and item["source"].get("kind") == "table" and item["source"].get("name_candidate") == "current_value_table_candidate" for item in schema.get("bytes", []) ) def _response_reads_rx_frame(response: Mapping[str, Any]) -> bool: schema = response.get("schema") if not isinstance(schema, Mapping): return False return any( isinstance(item, Mapping) and isinstance(item.get("source"), Mapping) and item["source"].get("kind") == "rx_frame_byte" for item in schema.get("bytes", []) ) def _state_immediate_evidence(ordered: list[JsonObject], state_address: int, value: int) -> list[int]: evidence = [] for ins in ordered: if not _has_ref_in_range(ins, state_address, state_address): continue source, _destination = _source_destination_operands(str(ins.get("operands", ""))) if _parse_immediate(source) == value: evidence.append(int(ins["address"])) return _dedupe_ints(evidence) def _state_bit_evidence(ordered: list[JsonObject], state_address: int, bit: int) -> list[int]: return _dedupe_ints( int(ins["address"]) for ins in ordered if _has_ref_in_range(ins, state_address, state_address) and _bit_number_from_instruction(ins) == bit ) def _send_builder_candidate( ordered: list[JsonObject], responses: list[JsonObject], tx_candidate: Mapping[str, Any] | None, ) -> JsonObject: copies = [] builder_body = [ ins for ins in ordered if SEND_BUILDER_ADDRESS <= int(ins.get("address", -1)) <= 0xBA83 ] for ins in builder_body: source, destination = _source_destination_operands(str(ins.get("operands", ""))) source_address = _first_address_in_range(ins, TX_STAGING_START, TX_STAGING_END, operand=source) destination_address = _first_address_in_range(ins, TX_FRAME_START, TX_FRAME_END, operand=destination) if source_address is None or destination_address is None: continue copies.append( { "instruction_address": int(ins["address"]), "instruction_address_hex": _h16(int(ins["address"])), "source_address": source_address, "source_address_hex": _h16(source_address), "destination_address": destination_address, "destination_address_hex": _h16(destination_address), "instruction": str(ins.get("text", "")), } ) call_addresses = [int(response["call_address"]) for response in responses] evidence_addresses = _dedupe_ints( [int(ins["address"]) for ins in builder_body if _has_ref_in_range(ins, TX_FRAME_START, TX_FRAME_END)] + call_addresses ) return { "kind": "tx_send_builder_candidate", "label": SEND_BUILDER_LABEL, "address": SEND_BUILDER_ADDRESS, "address_hex": _h16(SEND_BUILDER_ADDRESS), "staging_buffer_start": TX_STAGING_START, "staging_buffer_start_hex": _h16(TX_STAGING_START), "staging_buffer_end": TX_STAGING_END, "staging_buffer_end_hex": _h16(TX_STAGING_END), "tx_frame_start": TX_FRAME_START, "tx_frame_start_hex": _h16(TX_FRAME_START), "tx_frame_end": TX_FRAME_END, "tx_frame_end_hex": _h16(TX_FRAME_END), "checksum_address": TX_CHECKSUM_ADDRESS, "checksum_address_hex": _h16(TX_CHECKSUM_ADDRESS), "checksum_seed": CHECKSUM_SEED, "checksum_seed_hex": _h16(CHECKSUM_SEED), "staging_to_frame_copies": copies, "response_call_addresses": call_addresses, "response_call_addresses_hex": _hlist(call_addresses), "serial_reconstruction_candidate_id": tx_candidate.get("id") if tx_candidate else None, "evidence_addresses": evidence_addresses, "evidence_addresses_hex": _hlist(evidence_addresses), "confidence": "high" if copies and tx_candidate else "medium" if copies else "low", "caveat": ( "loc_BA26 is treated as a send builder because it copies F850-F854 into the " "evidence-supported TX frame and then starts SCI1 transmission." ), } def _top_level_evidence( ordered: list[JsonObject], dispatch: JsonObject | None, responses: list[JsonObject], rx_candidate: Mapping[str, Any] | None, tx_candidate: Mapping[str, Any] | None, ) -> list[JsonObject]: evidence: list[JsonObject] = [] if rx_candidate: evidence.append( { "kind": "rx_frame_reconstruction_present", "summary": "serial_reconstruction contains an evidence-supported SCI1 RX frame candidate", "candidate_id": rx_candidate.get("id"), } ) if tx_candidate: evidence.append( { "kind": "tx_frame_reconstruction_present", "summary": "serial_reconstruction contains an evidence-supported SCI1 TX frame candidate", "candidate_id": tx_candidate.get("id"), } ) if dispatch: evidence.append( { "kind": "rx0_masked_command_dispatch", "summary": "RX[0] is read, masked with 0x07, and compared against command values", "addresses": dispatch.get("evidence_addresses", []), "addresses_hex": dispatch.get("evidence_addresses_hex", []), } ) if responses: addresses = _dedupe_ints( [addr for response in responses for addr in response.get("evidence_addresses", [])] ) evidence.append( { "kind": "responses_stage_f850_f854_before_send", "summary": "F850-F854 writes are observed before calls to loc_BA26", "addresses": addresses, "addresses_hex": _hlist(addresses), "response_count": len(responses), } ) tx_report_responses = [ response for response in responses if response.get("call_address") == AUTONOMOUS_TX_REPORT_CALL ] if tx_report_responses: addresses = _dedupe_ints( addr for response in tx_report_responses for addr in response.get("evidence_addresses", []) if isinstance(addr, int) ) evidence.append( { "kind": "bb43_autonomous_tx_report_path", "summary": ( "BB43 stages a candidate device-to-host report before loc_BA26; this is " "separate from RX command dispatch." ), "addresses": addresses, "addresses_hex": _hlist(addresses), } ) rx_payload_reads = [ int(ins["address"]) for ins in ordered if any(_is_read_from_address(ins, address) for address in range(RX_FRAME_START + 1, RX_FRAME_START + 5)) ] if rx_payload_reads: evidence.append( { "kind": "rx_payload_bytes_read", "summary": "RX[1..4] are read in the command-processing region", "addresses": _dedupe_ints(rx_payload_reads), "addresses_hex": _hlist(rx_payload_reads), } ) return evidence def _response_window(ordered: list[JsonObject], call_index: int) -> list[JsonObject]: start = call_index for index in range(call_index - 1, max(-1, call_index - 48), -1): candidate = ordered[index] mnemonic = str(candidate.get("mnemonic", "")).upper() if mnemonic in {"RTS", "RTE"}: break if candidate.get("kind") == "branch" and mnemonic != "BSR": break start = index return ordered[start:call_index] def _staging_writes(window: list[JsonObject]) -> list[JsonObject]: writes: list[JsonObject] = [] for index, ins in enumerate(window): touched = _written_addresses_in_range(ins, TX_STAGING_START, TX_STAGING_END) if not touched: continue source, _destination = _source_destination_operands(str(ins.get("operands", ""))) source_info = _source_info(window, index, source) writes.append( { "instruction_address": int(ins["address"]), "instruction_address_hex": _h16(int(ins["address"])), "addresses": touched, "addresses_hex": _hlist(touched), "source_operand": source, "source": source_info, "instruction": str(ins.get("text", "")), } ) return writes def _source_info(window: list[JsonObject], index: int, source: str) -> JsonObject: return _resolve_source_info(window, index, source, []) def _resolve_source_info( window: list[JsonObject], index: int, source: str, transforms: list[str], ) -> JsonObject: immediate = _parse_immediate(source) if immediate is not None: output = { "kind": "immediate", "value": immediate, "value_hex": _h16(immediate, width=2 if immediate <= 0xFF else 4), } if transforms: output["transforms"] = transforms return output direct = _direct_source_info(window[index] if 0 <= index < len(window) else {}, source) if direct is not None: if transforms: direct = dict(direct) direct["transforms"] = transforms return direct source_upper = source.upper() for prior_index in range(index - 1, max(-1, index - 12), -1): prior = window[prior_index] prior_source, prior_destination = _source_destination_operands(str(prior.get("operands", ""))) if prior_destination.upper() != source_upper: continue root = _mnemonic_root(str(prior.get("mnemonic", ""))) if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "TST"}: continue if root == "SWAP": resolved = _resolve_source_info(window, prior_index, source, transforms + ["swap_bytes"]) elif root not in {"MOV:G", "MOV:S", "MOV:E", "MOV:I", "MOV:L", "MOV:F", "LDC"}: resolved = { "kind": "register_or_computed", "operand": source, "source_category": "computed", "operation": root, } elif prior_source: resolved = _resolve_source_info(window, prior_index, prior_source, transforms) else: resolved = {"kind": "register_or_computed", "operand": source} resolved = dict(resolved) resolved.setdefault("evidence_address", int(prior["address"])) resolved.setdefault("evidence_address_hex", _h16(int(prior["address"]))) resolved.setdefault("instruction", str(prior.get("text", ""))) return resolved return { "kind": "register_or_computed", "operand": source, "source_category": "computed", } def _direct_source_info(ins: Mapping[str, Any], source: str) -> JsonObject | None: rx_address = _first_address_in_range(ins, RX_FRAME_START, RX_FRAME_END, operand=source) if rx_address is not None and _is_read_from_address(ins, rx_address): width = _access_width(str(ins.get("mnemonic", ""))) offsets = [ address - RX_FRAME_START for address in range(rx_address, min(rx_address + width - 1, RX_FRAME_END) + 1) ] return { "kind": "rx_frame_word" if len(offsets) > 1 else "rx_frame_byte", "rx_offset": rx_address - RX_FRAME_START, "rx_offsets": offsets, "rx_address": rx_address, "rx_address_hex": _h16(rx_address), "evidence_address": int(ins["address"]) if isinstance(ins.get("address"), int) else None, "evidence_address_hex": _h16(int(ins["address"])) if isinstance(ins.get("address"), int) else None, "instruction": str(ins.get("text", "")), } tx_address = _first_address_in_range(ins, TX_FRAME_START, TX_FRAME_END, operand=source) if tx_address is not None and _is_read_from_address(ins, tx_address): width = _access_width(str(ins.get("mnemonic", ""))) offsets = [ address - TX_FRAME_START for address in range(tx_address, min(tx_address + width - 1, TX_FRAME_END) + 1) ] return { "kind": "tx_frame_word" if len(offsets) > 1 else "tx_frame_byte", "tx_offset": tx_address - TX_FRAME_START, "tx_offsets": offsets, "tx_address": tx_address, "tx_address_hex": _h16(tx_address), "evidence_address": int(ins["address"]) if isinstance(ins.get("address"), int) else None, "evidence_address_hex": _h16(int(ins["address"])) if isinstance(ins.get("address"), int) else None, "instruction": str(ins.get("text", "")), } table = _table_operand_candidate(source) if table is not None: output = dict(table) output["kind"] = "table" output["access_width"] = _access_width(str(ins.get("mnemonic", ""))) output["evidence_address"] = int(ins["address"]) if isinstance(ins.get("address"), int) else None output["evidence_address_hex"] = _h16(int(ins["address"])) if isinstance(ins.get("address"), int) else None output["instruction"] = str(ins.get("text", "")) return output return None def _rx_reads(window: list[JsonObject], start: int, end: int) -> list[JsonObject]: reads: list[JsonObject] = [] for ins in window: for address in range(start, end + 1): if not _is_read_from_address(ins, address): continue reads.append( { "instruction_address": int(ins["address"]), "instruction_address_hex": _h16(int(ins["address"])), "rx_offset": address - RX_FRAME_START, "rx_address": address, "rx_address_hex": _h16(address), "instruction": str(ins.get("text", "")), } ) return reads def _rx_reads_in_ranges( ordered: list[JsonObject], ranges: list[tuple[int, int]], ) -> list[JsonObject]: reads: list[JsonObject] = [] for ins in ordered: address = int(ins.get("address", -1)) if not any(start <= address <= end for start, end in ranges): continue reads.extend(_rx_reads([ins], RX_FRAME_START + 1, RX_FRAME_START + 4)) seen: set[tuple[int, int]] = set() output: list[JsonObject] = [] for read in reads: key = (int(read["instruction_address"]), int(read["rx_address"])) if key in seen: continue seen.add(key) output.append(read) return output def _response_in_ranges(response: Mapping[str, Any], ranges: list[tuple[int, int]]) -> bool: call_address = int(response.get("call_address", -1)) return any(start <= call_address <= end for start, end in ranges) def _addresses_in_ranges( ordered: list[JsonObject], ranges: list[tuple[int, int]], start: int, end: int, ) -> list[int]: return _dedupe_ints( int(ins["address"]) for ins in ordered if start <= int(ins.get("address", -1)) <= end and any(range_start <= int(ins.get("address", -1)) <= range_end for range_start, range_end in ranges) ) def _table_accesses_in_ranges( ordered: list[JsonObject], ranges: list[tuple[int, int]], ) -> list[JsonObject]: accesses: list[JsonObject] = [] for ins in ordered: address = int(ins.get("address", -1)) if not any(start <= address <= end for start, end in ranges): continue for operand in _negative_indexed_operands(str(ins.get("operands", ""))): offset = int(operand["negative_offset"]) if offset not in LOGICAL_TABLES: continue candidate = _table_operand_candidate(str(operand["operand"])) if candidate is None: continue accesses.append( { "instruction_address": address, "negative_offset": offset, "access": _operand_access_kind(ins, str(operand["operand"])), "candidate": candidate, } ) for direct_address, offset in DIRECT_TABLE_TO_LOGICAL_OFFSET.items(): if not _has_ref_in_range(ins, direct_address, direct_address): continue accesses.append( { "instruction_address": address, "negative_offset": offset, "access": _access_direction(ins, direct_address) or "read_write_candidate", "candidate": DIRECT_TABLE_CANDIDATES[direct_address], "direct_address": direct_address, "direct_address_hex": _h16(direct_address), } ) return accesses def _state_accesses_in_ranges( ordered: list[JsonObject], ranges: list[tuple[int, int]], ) -> list[JsonObject]: accesses: list[JsonObject] = [] for ins in ordered: address = int(ins.get("address", -1)) if not any(start <= address <= end for start, end in ranges): continue for state_address in STATE_VARIABLES: if not _has_ref_in_range(ins, state_address, state_address): continue accesses.append( { "instruction_address": address, "state_address": state_address, "access": _access_direction(ins, state_address) or "read_write_candidate", } ) return accesses def _table_access_addresses( accesses: list[JsonObject], negative_offset: int, access_kind: str | None = None, ) -> list[int]: return _dedupe_ints( int(access["instruction_address"]) for access in accesses if access.get("negative_offset") == negative_offset and (access_kind is None or access.get("access") == access_kind) ) def _state_access_addresses(accesses: list[JsonObject], address: int) -> list[int]: return _dedupe_ints( int(access["instruction_address"]) for access in accesses if access.get("state_address") == address ) def _handler_end( ordered: list[JsonObject], start: int, handler_starts: list[int], ) -> int | None: addresses = [int(ins["address"]) for ins in ordered] try: start_index = addresses.index(start) except ValueError: return None later_starts = [candidate for candidate in handler_starts if candidate > start] if later_starts: next_start = min(later_starts) previous = [address for address in addresses if start <= address < next_start] return previous[-1] if previous else None for ins in ordered[start_index:]: mnemonic = str(ins.get("mnemonic", "")).upper() if mnemonic in {"RTS", "RTE"}: return int(ins["address"]) if mnemonic == "BRA" and SEND_BUILDER_ADDRESS not in _targets(ins): targets = _targets(ins) if targets and targets[0] >= 0xBE6D: return int(ins["address"]) return None def _find_prior_read( ordered: list[JsonObject], index: int, address: int, destination_register: str, ) -> JsonObject | None: for candidate in reversed(ordered[max(0, index - 6) : index]): if not _is_read_from_address(candidate, address): continue if _destination_operand(str(candidate.get("operands", ""))).upper() == destination_register.upper(): return candidate return None def _is_send_builder_call(ins: Mapping[str, Any]) -> bool: mnemonic = str(ins.get("mnemonic", "")).upper() if mnemonic not in {"BSR", "JSR", "PJSR"}: return False if SEND_BUILDER_ADDRESS in _targets(ins): return True return SEND_BUILDER_LABEL.upper() in str(ins.get("operands", "")).upper() def _written_addresses_in_range(ins: Mapping[str, Any], start: int, end: int) -> list[int]: if not _has_ref_in_range(ins, start, end): return [] source, destination = _source_destination_operands(str(ins.get("operands", ""))) del source base = _first_address_in_range(ins, start, end, operand=destination) if base is None or not _is_write_to_address(ins, base): return [] width = _access_width(str(ins.get("mnemonic", ""))) return [address for address in range(base, min(base + width - 1, end) + 1)] def _read_addresses_in_range(ins: Mapping[str, Any], start: int, end: int) -> list[int]: if not _has_ref_in_range(ins, start, end): return [] source, _destination = _source_destination_operands(str(ins.get("operands", ""))) base = _first_address_in_range(ins, start, end, operand=source) if base is None or not _is_read_from_address(ins, base): return [] width = _access_width(str(ins.get("mnemonic", ""))) return [address for address in range(base, min(base + width - 1, end) + 1)] def _is_read_from_address(ins: Mapping[str, Any], address: int) -> bool: source, destination = _source_destination_operands(str(ins.get("operands", ""))) if _operand_mentions_address(source, address): return True if address not in _references(ins): return False if source.startswith("@") and not _operand_mentions_any_reference(destination, _references(ins)): return True return _access_direction(ins, address) == "read" def _is_write_to_address(ins: Mapping[str, Any], address: int) -> bool: _source, destination = _source_destination_operands(str(ins.get("operands", ""))) if _operand_mentions_address(destination, address): return _access_direction(ins, address) == "write" if address not in _references(ins): return False return _access_direction(ins, address) == "write" def _access_direction(ins: Mapping[str, Any], address: int) -> str | None: root = _mnemonic_root(str(ins.get("mnemonic", ""))) if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST"}: return "read" if root in {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}: return "write" if root in {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}: return "write" if root in {"MOV:G", "MOV:S", "MOVTPE"}: source, destination = _source_destination_operands(str(ins.get("operands", ""))) if _operand_mentions_address(destination, address): return "write" if _operand_mentions_address(source, address): return "read" if address in _references(ins): if destination.startswith("@") and not _operand_mentions_any_reference(source, _references(ins)): return "write" if source.startswith("@") and not _operand_mentions_any_reference(destination, _references(ins)): return "read" if root in {"MOV:L", "MOV:F"}: return "read" if root == "STC": return "write" if root == "LDC": return "read" return None def _first_address_in_range( ins: Mapping[str, Any], start: int, end: int, *, operand: str = "", ) -> int | None: if operand: for address in range(start, end + 1): if _operand_mentions_address(operand, address): return address for address in _references(ins): if start <= address <= end: return address return None def _has_ref_in_range(ins: Mapping[str, Any], start: int, end: int) -> bool: return any(start <= address <= end for address in _references(ins)) def _references(ins: Mapping[str, Any]) -> list[int]: references = ins.get("references", []) output: list[int] = [] if not isinstance(references, list): return output for reference in references: if isinstance(reference, Mapping) and isinstance(reference.get("address"), int): output.append(int(reference["address"])) elif isinstance(reference, int): output.append(reference) return output def _targets(ins: Mapping[str, Any]) -> list[int]: targets = ins.get("targets", []) if not isinstance(targets, list): return [] return [int(target) for target in targets if isinstance(target, int)] def _instruction_sequence(value: object) -> list[JsonObject]: if isinstance(value, Mapping): values: Iterable[Any] = value.values() elif isinstance(value, list): values = value else: values = [] return sorted( [item for item in values if isinstance(item, dict) and isinstance(item.get("address"), int)], key=lambda item: int(item["address"]), ) def _serial_reconstruction(payload: Mapping[str, Any]) -> Mapping[str, Any]: serial = payload.get("serial_reconstruction") return serial if isinstance(serial, Mapping) else {} def _candidate_by_kind(serial: Mapping[str, Any], kind: str) -> Mapping[str, Any] | None: candidates = serial.get("candidates") if not isinstance(candidates, list): return None for candidate in candidates: if isinstance(candidate, Mapping) and candidate.get("kind") == kind: return candidate return None def _source_destination_operands(operands: str) -> tuple[str, str]: depth = 0 split_at: int | None = None for index, char in enumerate(operands): if char in "({": depth += 1 elif char in ")}" and depth: depth -= 1 elif char == "," and depth == 0: split_at = index if split_at is None: operand = operands.strip() return "", operand return operands[:split_at].strip(), operands[split_at + 1 :].strip() def _destination_operand(operands: str) -> str: return _source_destination_operands(operands)[1] def _immediate_source_value(operands: str) -> int | None: source, _destination = _source_destination_operands(operands) if not source.startswith("#"): return None return _parse_immediate(source) def _parse_immediate(operand: str) -> int | None: text = operand.strip() if text.startswith("#"): text = text[1:].strip() try: if text.upper().startswith("H'"): return int(text[2:], 16) & 0xFFFF if text.upper().startswith("0X"): return int(text, 16) & 0xFFFF if text.upper().startswith("$"): return int(text[1:], 16) & 0xFFFF return int(text, 10) & 0xFFFF except ValueError: return None def _negative_indexed_operands(operands: str) -> list[JsonObject]: matches: list[JsonObject] = [] for match in re.finditer(r"@\(-H'([0-9A-Fa-f]+),\s*(R[0-7])\)", operands): offset = int(match.group(1), 16) & 0xFFFF matches.append( { "operand": match.group(0), "negative_offset": offset, "negative_offset_hex": _h16(offset), "index_register": match.group(2).upper(), } ) return matches def _table_operand_candidate(operand: str) -> JsonObject | None: operands = _negative_indexed_operands(operand) if not operands: return None first = operands[0] offset = int(first["negative_offset"]) metadata = LOGICAL_TABLES.get(offset) if metadata is None: return None logical_base = int(metadata["logical_base_address"]) return { "name_candidate": metadata["name_candidate"], "element_candidate": metadata["element_candidate"], "logical_base_address": logical_base, "logical_base_address_hex": _h16(logical_base), "negative_offset": offset, "negative_offset_hex": _h16(offset), "index_register": first["index_register"], "operand": first["operand"], } def _operand_access_kind(ins: Mapping[str, Any], operand: str) -> str: root = _mnemonic_root(str(ins.get("mnemonic", ""))) source, destination = _source_destination_operands(str(ins.get("operands", ""))) if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "TST"}: return "read" if root in {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}: return "write" if operand in destination and operand not in source: return "write" if operand in source and operand not in destination: return "read" if root in {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}: return "write" return "read_write_candidate" def _bit_number_from_instruction(ins: Mapping[str, Any]) -> int | None: root = _mnemonic_root(str(ins.get("mnemonic", ""))) if root not in {"BCLR", "BNOT", "BSET", "BTST"}: return None source, _destination = _source_destination_operands(str(ins.get("operands", ""))) value = _parse_immediate(source) if value is None: return None return value & 0x0F def _operand_mentions_any_reference(operand: str, references: list[int]) -> bool: return any(_operand_mentions_address(operand, address) for address in references) def _operand_mentions_address(operand: str, address: int) -> bool: operand_upper = operand.upper().replace(" ", "") names = { TX_STAGING_START: ("TX_STAGING",), TX_FRAME_START: ("TX_FRAME",), TX_CHECKSUM_ADDRESS: ("TX_CHECKSUM",), RX_FRAME_START: ("RX_FRAME",), RX_CHECKSUM_ADDRESS: ("RX_CHECKSUM",), } if any(name in operand_upper for name in names.get(address, ())): return True negative = (0x10000 - address) & 0xFFFF return ( f"H'{address:04X}" in operand_upper or f"0X{address:04X}" in operand_upper or f"${address:04X}" in operand_upper or f"-H'{negative:04X}" in operand_upper or f"-0X{negative:04X}" in operand_upper or f"-${negative:04X}" in operand_upper ) def _mnemonic_root(mnemonic: str) -> str: return mnemonic.rsplit(".", 1)[0].upper() def _access_width(mnemonic: str) -> int: upper = mnemonic.upper() if upper.endswith(".L"): return 4 if upper.endswith(".W"): return 2 return 1 def _confidence_score( frame_supported: bool, dispatch: JsonObject | None, responses: list[JsonObject], commands: list[JsonObject], ) -> float: score = 0.2 if frame_supported: score += 0.25 if dispatch: score += 0.2 if responses: score += min(0.2, 0.04 * len(responses)) if commands: score += min(0.15, 0.02 * len(commands)) return round(min(score, 0.9), 2) def _confidence_label(score: float) -> str: if score >= 0.75: return "medium-high" if score >= 0.5: return "medium" return "low" def _dedupe_ints(values: Iterable[int]) -> list[int]: seen: set[int] = set() output: list[int] = [] for value in values: if value in seen: continue seen.add(value) output.append(value) return output def _dedupe_strings(values: Iterable[str]) -> list[str]: seen: set[str] = set() output: list[str] = [] for value in values: if value in seen: continue seen.add(value) output.append(value) return output def _hlist(values: Iterable[int]) -> list[str]: return [_h16(value) for value in _dedupe_ints(values)] def _h16(value: int, *, width: int = 4) -> str: return f"H'{value & 0xFFFF:0{width}X}" __all__ = ["analyze_serial_semantics"]