3198 lines
128 KiB
Python
3198 lines
128 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from collections.abc import Iterable, Mapping
|
|
from typing import Any
|
|
|
|
from .panel_selectors import panel_selector_semantics_payload
|
|
|
|
|
|
JsonObject = dict[str, Any]
|
|
|
|
RX_FRAME_START = 0xF860
|
|
RX_FRAME_END = 0xF865
|
|
RX_CHECKSUM_ADDRESS = RX_FRAME_END
|
|
RX_FRAME_LENGTH = 6
|
|
|
|
TX_STAGING_START = 0xF850
|
|
TX_STAGING_END = 0xF854
|
|
TX_STAGING_LENGTH = 5
|
|
TX_FRAME_START = 0xF858
|
|
TX_FRAME_END = 0xF85D
|
|
TX_CHECKSUM_ADDRESS = TX_FRAME_END
|
|
|
|
SEND_BUILDER_ADDRESS = 0xBA26
|
|
SEND_BUILDER_LABEL = "loc_BA26"
|
|
AUTONOMOUS_TX_REPORT_CALL = 0xBB43
|
|
AUTONOMOUS_TX_REPORT_LABEL = "loc_BB43"
|
|
MAIN_REPORT_GATE_ENTRY = 0x3FD3
|
|
MAIN_REPORT_GATE_CALL = 0x3FEB
|
|
SESSION_GATE_ENTRY = 0x3FEF
|
|
IDLE_REPORT_GATE_ENTRY = 0x4046
|
|
IDLE_REPORT_QUEUE_WRITE = 0x4067
|
|
IDLE_REPORT_GATE_END = 0x4070
|
|
QUEUE_REPORT_ENTRY = 0xBAF2
|
|
RESEND_GATE_ENTRY = 0xBE9E
|
|
PERIODIC_RESEND_ENTRY = 0xBED5
|
|
FRT1_OCIA_ENTRY = 0xBEEA
|
|
FRT2_OCIA_ENTRY = 0xBF23
|
|
INDEX_DECODER_ADDRESS = 0x622B
|
|
INDEX_DECODER_LABEL = "loc_622B"
|
|
CHECKSUM_SEED = 0x5A
|
|
SERIAL_HANDLER_START = 0xBA26
|
|
SERIAL_HANDLER_END = 0xBEFF
|
|
|
|
LOGICAL_TABLES = {
|
|
0x2000: {
|
|
"logical_base_address": 0xE000,
|
|
"name_candidate": "primary_value_table_candidate",
|
|
"element_candidate": "word_value",
|
|
},
|
|
0x1C00: {
|
|
"logical_base_address": 0xE400,
|
|
"name_candidate": "secondary_value_table_candidate",
|
|
"element_candidate": "word_value",
|
|
},
|
|
0x1800: {
|
|
"logical_base_address": 0xE800,
|
|
"name_candidate": "current_value_table_candidate",
|
|
"element_candidate": "word_value",
|
|
},
|
|
0x1400: {
|
|
"logical_base_address": 0xEC00,
|
|
"name_candidate": "flag_table_candidate",
|
|
"element_candidate": "bit_flags",
|
|
},
|
|
}
|
|
|
|
DIRECT_TABLE_CANDIDATES = {
|
|
0xF900: "primary_value_table_candidate",
|
|
0xF920: "current_value_table_candidate",
|
|
0xF940: "secondary_value_table_candidate",
|
|
0xF980: "flag_table_candidate",
|
|
}
|
|
|
|
DIRECT_TABLE_TO_LOGICAL_OFFSET = {
|
|
0xF900: 0x2000,
|
|
0xF920: 0x1800,
|
|
0xF940: 0x1C00,
|
|
0xF980: 0x1400,
|
|
}
|
|
|
|
STATE_VARIABLES = {
|
|
0xFAA2: "serial_session_flags_candidate",
|
|
0xFAA3: "serial_pending_mask_candidate",
|
|
0xFAA4: "serial_rx_error_or_retry_gate_candidate",
|
|
0xFAA5: "serial_retry_enable_or_mode_flags_candidate",
|
|
0xFAA6: "serial_retry_counter_candidate",
|
|
0xF9B4: "event_queue_read_cursor_candidate",
|
|
0xF9B5: "event_queue_write_or_pending_cursor_candidate",
|
|
0xF9B9: "event_queue_base_or_current_slot_candidate",
|
|
0xF9C0: "serial_tx_busy_timer_candidate",
|
|
0xF9C4: "idle_heartbeat_gate_countdown_candidate",
|
|
0xF9C5: "rx_session_timeout_candidate",
|
|
0xF9C6: "autonomous_report_period_timer_candidate",
|
|
0xF9C8: "autonomous_report_resend_countdown_candidate",
|
|
}
|
|
|
|
OBSERVED_TX_REPORT_OVERLAY = [
|
|
{
|
|
"logical_index": 0x0000,
|
|
"name_candidate": "heartbeat_or_idle_report_candidate",
|
|
"observed_frames_hex": ["00 00 00 00 80 DA"],
|
|
"observed_period_ms_candidate": 700,
|
|
},
|
|
{
|
|
"logical_index": 0x0015,
|
|
"name_candidate": "call_button_report_candidate",
|
|
"observed_frames_hex": ["00 00 15 80 00 CF", "00 00 15 00 00 4F"],
|
|
},
|
|
{
|
|
"logical_index": 0x0007,
|
|
"name_candidate": "camera_power_report_candidate",
|
|
"observed_frames_hex": ["00 00 07 80 00 DD"],
|
|
},
|
|
]
|
|
|
|
|
|
def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
|
|
"""Infer conservative SCI1 frame/command semantics from decompiler JSON."""
|
|
ordered = _instruction_sequence(payload.get("instructions"))
|
|
reconstruction = _serial_reconstruction(payload)
|
|
|
|
rx_candidate = _candidate_by_kind(reconstruction, "candidate_sci1_rx_frame")
|
|
tx_candidate = _candidate_by_kind(reconstruction, "candidate_sci1_tx_frame")
|
|
frame_supported = bool(rx_candidate and tx_candidate)
|
|
if not frame_supported:
|
|
return {
|
|
"kind": "serial_semantics",
|
|
"protocol_semantics": [],
|
|
"fields": [],
|
|
"command_dispatch": None,
|
|
"commands": [],
|
|
"command_effects": [],
|
|
"response_candidates": [],
|
|
"response_schemas": [],
|
|
"response_schema": [],
|
|
"logical_table_map_candidates": [],
|
|
"table_map_candidates": [],
|
|
"state_variable_candidates": [],
|
|
"retry_error_model": None,
|
|
"gate_queue_model": None,
|
|
"tx_report_model": None,
|
|
"periodic_resend_model": None,
|
|
"timer_interrupt_model": None,
|
|
"panel_selector_semantics": [],
|
|
"confidence": "low",
|
|
"confidence_score": 0.0,
|
|
"caveat": "No protocol semantics are emitted without both RX and TX serial reconstruction candidates.",
|
|
}
|
|
|
|
dispatch = _find_command_dispatch(ordered)
|
|
responses = _response_candidates(ordered)
|
|
commands = _command_candidates(ordered, dispatch, responses)
|
|
fields = _field_candidates(ordered, dispatch, responses)
|
|
send_builder = _send_builder_candidate(ordered, responses, tx_candidate)
|
|
logical_tables = _logical_table_map_candidates(ordered)
|
|
state_variables = _state_variable_candidates(ordered)
|
|
retry_error_model = _retry_error_model(ordered, responses)
|
|
gate_queue_model = _gate_queue_model(ordered, commands)
|
|
tx_report_model = _tx_report_model(ordered, responses)
|
|
periodic_resend_model = _periodic_resend_model(ordered, responses)
|
|
timer_interrupt_model = _timer_interrupt_model(ordered)
|
|
evidence = _top_level_evidence(ordered, dispatch, responses, rx_candidate, tx_candidate)
|
|
|
|
confidence_score = _confidence_score(frame_supported, dispatch, responses, commands)
|
|
protocol = {
|
|
"kind": "serial_semantics",
|
|
"scope": "evidence_supported_sci1_6_byte_frame",
|
|
"confidence": _confidence_label(confidence_score),
|
|
"confidence_score": confidence_score,
|
|
"caveat": (
|
|
"Semantic names are candidates only. The analyzer reports byte roles, command values, "
|
|
"dispatch targets, and response staging patterns observed in code; it does not prove "
|
|
"source-level intent or protocol documentation."
|
|
),
|
|
"frame_candidate": {
|
|
"channel": "SCI1",
|
|
"rx_frame_start": RX_FRAME_START,
|
|
"rx_frame_start_hex": _h16(RX_FRAME_START),
|
|
"rx_frame_end": RX_FRAME_END,
|
|
"rx_frame_end_hex": _h16(RX_FRAME_END),
|
|
"tx_staging_start": TX_STAGING_START,
|
|
"tx_staging_start_hex": _h16(TX_STAGING_START),
|
|
"tx_staging_end": TX_STAGING_END,
|
|
"tx_staging_end_hex": _h16(TX_STAGING_END),
|
|
"tx_frame_start": TX_FRAME_START,
|
|
"tx_frame_start_hex": _h16(TX_FRAME_START),
|
|
"tx_frame_end": TX_FRAME_END,
|
|
"tx_frame_end_hex": _h16(TX_FRAME_END),
|
|
"frame_length": RX_FRAME_LENGTH,
|
|
"tx_staging_length": TX_STAGING_LENGTH,
|
|
"checksum_seed": CHECKSUM_SEED,
|
|
"checksum_seed_hex": _h16(CHECKSUM_SEED),
|
|
"serial_reconstruction_supported": frame_supported,
|
|
"rx_reconstruction_candidate_id": rx_candidate.get("id") if rx_candidate else None,
|
|
"tx_reconstruction_candidate_id": tx_candidate.get("id") if tx_candidate else None,
|
|
},
|
|
"byte_layout": _byte_layout(),
|
|
"fields": fields,
|
|
"command_dispatch": dispatch,
|
|
"commands": commands,
|
|
"command_effects": _command_effect_aliases(commands),
|
|
"index_decoder": _index_decoder_candidate(ordered),
|
|
"logical_table_map_candidates": logical_tables,
|
|
"table_map_candidates": logical_tables,
|
|
"state_variable_candidates": state_variables,
|
|
"send_builder": send_builder,
|
|
"response_candidates": responses,
|
|
"response_schemas": _response_schemas(responses),
|
|
"response_schema": _response_schemas(responses),
|
|
"rx_fields": _rx_field_candidates(ordered, dispatch),
|
|
"response_builders": _response_builder_aliases(responses),
|
|
"retry_error_model": retry_error_model,
|
|
"gate_queue_model": gate_queue_model,
|
|
"tx_report_model": tx_report_model,
|
|
"periodic_resend_model": periodic_resend_model,
|
|
"timer_interrupt_model": timer_interrupt_model,
|
|
"panel_selector_semantics": panel_selector_semantics_payload(),
|
|
"evidence": evidence,
|
|
}
|
|
return {
|
|
"kind": "serial_semantics",
|
|
"protocol_semantics": [protocol],
|
|
"fields": protocol["fields"],
|
|
"command_dispatch": protocol["command_dispatch"],
|
|
"commands": protocol["commands"],
|
|
"command_effects": protocol["command_effects"],
|
|
"response_candidates": protocol["response_candidates"],
|
|
"response_schemas": protocol["response_schemas"],
|
|
"response_schema": protocol["response_schema"],
|
|
"send_builder": protocol["send_builder"],
|
|
"logical_table_map_candidates": protocol["logical_table_map_candidates"],
|
|
"table_map_candidates": protocol["table_map_candidates"],
|
|
"state_variable_candidates": protocol["state_variable_candidates"],
|
|
"retry_error_model": protocol["retry_error_model"],
|
|
"gate_queue_model": protocol["gate_queue_model"],
|
|
"tx_report_model": protocol["tx_report_model"],
|
|
"periodic_resend_model": protocol["periodic_resend_model"],
|
|
"timer_interrupt_model": protocol["timer_interrupt_model"],
|
|
"panel_selector_semantics": protocol["panel_selector_semantics"],
|
|
"confidence": protocol["confidence"],
|
|
"confidence_score": protocol["confidence_score"],
|
|
"caveat": protocol["caveat"],
|
|
}
|
|
|
|
|
|
def _field_candidates(
|
|
ordered: list[JsonObject],
|
|
dispatch: JsonObject | None,
|
|
responses: list[JsonObject],
|
|
) -> list[JsonObject]:
|
|
fields: list[JsonObject] = []
|
|
response_write_map: dict[int, list[int]] = {}
|
|
for response in responses:
|
|
for write in response.get("writes", []):
|
|
if not isinstance(write, Mapping):
|
|
continue
|
|
for address in write.get("addresses", []):
|
|
if isinstance(address, int):
|
|
response_write_map.setdefault(address, []).append(int(write["instruction_address"]))
|
|
|
|
rx_reads = {
|
|
address: [ins["address"] for ins in ordered if _is_read_from_address(ins, address)]
|
|
for address in range(RX_FRAME_START, RX_FRAME_END + 1)
|
|
}
|
|
rx_writes = {
|
|
address: [ins["address"] for ins in ordered if _is_write_to_address(ins, address)]
|
|
for address in range(RX_FRAME_START, RX_FRAME_END + 1)
|
|
}
|
|
|
|
dispatch_addresses = set(dispatch.get("evidence_addresses", []) if dispatch else [])
|
|
for offset, address in enumerate(range(RX_FRAME_START, RX_FRAME_END + 1)):
|
|
role = "payload_byte_candidate"
|
|
caveat = "Role is inferred from reads in command processing."
|
|
if offset == 0:
|
|
role = "command_selector_candidate"
|
|
caveat = "RX[0] is masked with 0x07 before command comparisons."
|
|
elif address == RX_CHECKSUM_ADDRESS:
|
|
role = "checksum_byte_candidate"
|
|
caveat = "RX[5] is compared with a checksum over RX[0..4]."
|
|
fields.append(
|
|
{
|
|
"id": f"rx_{offset}",
|
|
"kind": "rx_frame_field_candidate",
|
|
"offset": offset,
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"role_candidate": role,
|
|
"evidence_addresses": _dedupe_ints(
|
|
rx_reads[address]
|
|
+ rx_writes[address]
|
|
+ ([addr for addr in dispatch_addresses if offset == 0])
|
|
),
|
|
"evidence_addresses_hex": _hlist(
|
|
rx_reads[address]
|
|
+ rx_writes[address]
|
|
+ ([addr for addr in dispatch_addresses if offset == 0])
|
|
),
|
|
"read_count": len(rx_reads[address]),
|
|
"write_count": len(rx_writes[address]),
|
|
"confidence": "medium" if rx_reads[address] else "low",
|
|
"caveat": caveat,
|
|
}
|
|
)
|
|
|
|
for offset, address in enumerate(range(TX_STAGING_START, TX_STAGING_END + 1)):
|
|
write_addresses = _dedupe_ints(response_write_map.get(address, []))
|
|
fields.append(
|
|
{
|
|
"id": f"tx_staging_{offset}",
|
|
"kind": "tx_staging_field_candidate",
|
|
"offset": offset,
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"role_candidate": "response_staging_byte_candidate",
|
|
"evidence_addresses": write_addresses,
|
|
"evidence_addresses_hex": _hlist(write_addresses),
|
|
"write_count": len(write_addresses),
|
|
"confidence": "medium" if write_addresses else "low",
|
|
"caveat": (
|
|
"This byte is staged before calls to loc_BA26; the analyzer does not infer "
|
|
"a stable field name beyond response position."
|
|
),
|
|
}
|
|
)
|
|
|
|
return fields
|
|
|
|
|
|
def _rx_field_candidates(
|
|
ordered: list[JsonObject],
|
|
dispatch: JsonObject | None,
|
|
) -> list[JsonObject]:
|
|
read_map = {
|
|
address: [
|
|
int(ins["address"])
|
|
for ins in ordered
|
|
if address in _read_addresses_in_range(ins, RX_FRAME_START, RX_FRAME_END)
|
|
]
|
|
for address in range(RX_FRAME_START, RX_FRAME_END + 1)
|
|
}
|
|
fields: list[JsonObject] = []
|
|
for offset, address in enumerate(range(RX_FRAME_START, RX_FRAME_END + 1)):
|
|
if offset == 0 and dispatch:
|
|
name = "command_low3"
|
|
confidence = "candidate-medium"
|
|
mask = dispatch.get("mask")
|
|
evidence = _dedupe_ints(read_map[address] + dispatch.get("evidence_addresses", []))
|
|
elif offset in {1, 2}:
|
|
name = "likely_id_or_index"
|
|
confidence = "candidate-low"
|
|
mask = None
|
|
evidence = read_map[address]
|
|
elif offset in {3, 4}:
|
|
name = "likely_value"
|
|
confidence = "candidate-low"
|
|
mask = None
|
|
evidence = read_map[address]
|
|
else:
|
|
name = "checksum"
|
|
confidence = "candidate-medium"
|
|
mask = None
|
|
evidence = read_map[address]
|
|
field = {
|
|
"offset": offset,
|
|
"field": f"byte{offset}",
|
|
"name": name,
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"confidence": confidence,
|
|
"caveat": "Field name is inferred from access pattern and remains a candidate.",
|
|
"evidence_addresses": _dedupe_ints(evidence),
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
}
|
|
if mask is not None:
|
|
field["mask"] = mask
|
|
field["mask_hex"] = _h16(int(mask), width=2)
|
|
fields.append(field)
|
|
return fields
|
|
|
|
|
|
def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None:
|
|
by_index = {int(ins["address"]): index for index, ins in enumerate(ordered) if "address" in ins}
|
|
best: JsonObject | None = None
|
|
|
|
for index, ins in enumerate(ordered):
|
|
if _mnemonic_root(ins.get("mnemonic", "")) != "AND":
|
|
continue
|
|
if _immediate_source_value(str(ins.get("operands", ""))) != 0x07:
|
|
continue
|
|
_source, selector_reg = _source_destination_operands(str(ins.get("operands", "")))
|
|
if not selector_reg:
|
|
continue
|
|
|
|
read = _find_prior_read(ordered, index, RX_FRAME_START, selector_reg)
|
|
if read is None:
|
|
continue
|
|
|
|
comparisons = _dispatch_comparisons(ordered, index + 1, selector_reg)
|
|
state_split = _dispatcher_state_split(ordered, index + 1, comparisons)
|
|
if state_split:
|
|
_annotate_dispatch_comparison_availability(comparisons, state_split)
|
|
command_values = sorted({int(item["command_value"]) for item in comparisons})
|
|
candidate = {
|
|
"kind": "command_dispatch_candidate",
|
|
"selector": "rx0_low3_bits",
|
|
"field": "command_low3",
|
|
"rx_offset": 0,
|
|
"rx_address": RX_FRAME_START,
|
|
"rx_address_hex": _h16(RX_FRAME_START),
|
|
"source_address": RX_FRAME_START,
|
|
"source_address_hex": _h16(RX_FRAME_START),
|
|
"source_field": "byte0",
|
|
"mask": 0x07,
|
|
"mask_hex": _h16(0x07),
|
|
"selector_register": selector_reg,
|
|
"read_address": int(read["address"]),
|
|
"read_address_hex": _h16(int(read["address"])),
|
|
"mask_address": int(ins["address"]),
|
|
"mask_address_hex": _h16(int(ins["address"])),
|
|
"command_values": command_values,
|
|
"command_values_hex": [_h16(value, width=2) for value in command_values],
|
|
"comparisons": comparisons,
|
|
"state_split": state_split,
|
|
"dispatcher_split": state_split,
|
|
"cases": [
|
|
{
|
|
"value": int(item["command_value"]),
|
|
"value_hex": item["command_value_hex"],
|
|
"target": int(item["handler_start"]),
|
|
"target_hex": item["handler_start_hex"],
|
|
"compare_address": item["compare_address"],
|
|
"branch_address": item["branch_address"],
|
|
"availability": item.get("availability"),
|
|
"availability_conditions": item.get("availability_conditions", []),
|
|
}
|
|
for item in comparisons
|
|
],
|
|
"evidence_addresses": _dedupe_ints(
|
|
[int(read["address"]), int(ins["address"])]
|
|
+ [addr for item in comparisons for addr in item["evidence_addresses"]]
|
|
),
|
|
"confidence": "medium",
|
|
"caveat": (
|
|
"Dispatch is inferred from a read of RX[0], an AND 0x07 mask, and nearby "
|
|
"compare/branch pairs. Gating state around the dispatch may affect reachability."
|
|
),
|
|
}
|
|
candidate["evidence_addresses_hex"] = _hlist(candidate["evidence_addresses"])
|
|
if best is None or len(comparisons) > len(best["comparisons"]):
|
|
best = candidate
|
|
|
|
if best:
|
|
for item in best["comparisons"]:
|
|
target = item.get("handler_start")
|
|
if isinstance(target, int) and target in by_index:
|
|
item["handler_start_index"] = by_index[target]
|
|
return best
|
|
|
|
|
|
def _dispatcher_state_split(
|
|
ordered: list[JsonObject],
|
|
start_index: int,
|
|
comparisons: list[JsonObject],
|
|
) -> JsonObject | None:
|
|
if not comparisons:
|
|
return None
|
|
for index in range(start_index, min(len(ordered) - 1, start_index + 24)):
|
|
ins = ordered[index]
|
|
if not _has_ref_in_range(ins, 0xFAA2, 0xFAA2):
|
|
continue
|
|
if _mnemonic_root(str(ins.get("mnemonic", ""))) != "TST":
|
|
continue
|
|
branch = ordered[index + 1]
|
|
if _mnemonic_root(str(branch.get("mnemonic", ""))) != "BNE":
|
|
continue
|
|
targets = _targets(branch)
|
|
if not targets:
|
|
continue
|
|
continuation_start = int(targets[0])
|
|
initial_values = sorted(
|
|
{
|
|
int(item["command_value"])
|
|
for item in comparisons
|
|
if int(item.get("compare_address", 0)) < continuation_start
|
|
}
|
|
)
|
|
continuation_values = sorted(
|
|
{
|
|
int(item["command_value"])
|
|
for item in comparisons
|
|
if int(item.get("compare_address", 0)) >= continuation_start
|
|
}
|
|
)
|
|
if not initial_values and not continuation_values:
|
|
continue
|
|
return {
|
|
"kind": "serial_command_dispatch_state_split",
|
|
"state_address": 0xFAA2,
|
|
"state_address_hex": _h16(0xFAA2),
|
|
"test_address": int(ins["address"]),
|
|
"test_address_hex": _h16(int(ins["address"])),
|
|
"branch_address": int(branch["address"]),
|
|
"branch_address_hex": _h16(int(branch["address"])),
|
|
"continuation_target": continuation_start,
|
|
"continuation_target_hex": _h16(continuation_start),
|
|
"initial_idle_commands": initial_values,
|
|
"initial_idle_commands_hex": [_h16(value, width=2) for value in initial_values],
|
|
"continuation_commands": continuation_values,
|
|
"continuation_commands_hex": [_h16(value, width=2) for value in continuation_values],
|
|
"summary": (
|
|
"FAA2 == 0 takes the initial/idle dispatcher path; FAA2 != 0 takes "
|
|
"the continuation dispatcher path."
|
|
),
|
|
"caveat": (
|
|
"Initial dispatch follows checksum validation and RX error handling. Command 1 "
|
|
"is only on the initial/idle path and is also gated by F861.bit7 == 0."
|
|
),
|
|
"evidence_addresses": [int(ins["address"]), int(branch["address"])],
|
|
"evidence_addresses_hex": _hlist([int(ins["address"]), int(branch["address"])]),
|
|
}
|
|
return None
|
|
|
|
|
|
def _annotate_dispatch_comparison_availability(
|
|
comparisons: list[JsonObject],
|
|
state_split: Mapping[str, Any],
|
|
) -> None:
|
|
continuation_start = state_split.get("continuation_target")
|
|
if not isinstance(continuation_start, int):
|
|
return
|
|
for item in comparisons:
|
|
compare_address = item.get("compare_address")
|
|
if not isinstance(compare_address, int):
|
|
continue
|
|
command_value = int(item.get("command_value", -1))
|
|
if compare_address < continuation_start:
|
|
availability = "initial_idle_dispatch"
|
|
conditions = [
|
|
"valid checksum/no RX physical error",
|
|
"FAA2 == 0",
|
|
]
|
|
if command_value == 0x01:
|
|
conditions.append("F861.bit7 == 0")
|
|
else:
|
|
availability = "continuation_dispatch"
|
|
conditions = [
|
|
"valid checksum/no RX physical error",
|
|
"FAA2 != 0",
|
|
]
|
|
item["availability"] = availability
|
|
item["availability_conditions"] = conditions
|
|
item["availability_summary"] = " && ".join(conditions)
|
|
|
|
|
|
def _dispatch_comparisons(
|
|
ordered: list[JsonObject],
|
|
start_index: int,
|
|
selector_reg: str,
|
|
) -> list[JsonObject]:
|
|
comparisons: list[JsonObject] = []
|
|
for index in range(start_index, min(len(ordered) - 1, start_index + 96)):
|
|
ins = ordered[index]
|
|
address = int(ins.get("address", -1))
|
|
if _mnemonic_root(str(ins.get("mnemonic", ""))) not in {"CMP", "CMP:E", "CMP:G", "CMP:I"}:
|
|
continue
|
|
if _destination_operand(str(ins.get("operands", ""))).upper() != selector_reg.upper():
|
|
continue
|
|
value = _immediate_source_value(str(ins.get("operands", "")))
|
|
if value is None or not 0 <= value <= 7:
|
|
continue
|
|
branch = ordered[index + 1]
|
|
if str(branch.get("mnemonic", "")).upper() != "BEQ":
|
|
continue
|
|
targets = _targets(branch)
|
|
if not targets:
|
|
continue
|
|
branch_address = int(branch["address"])
|
|
target = int(targets[0])
|
|
comparisons.append(
|
|
{
|
|
"command_value": value,
|
|
"command_value_hex": _h16(value, width=2),
|
|
"compare_address": address,
|
|
"compare_address_hex": _h16(address),
|
|
"branch_address": branch_address,
|
|
"branch_address_hex": _h16(branch_address),
|
|
"handler_start": target,
|
|
"handler_start_hex": _h16(target),
|
|
"evidence_addresses": [address, branch_address],
|
|
"evidence_addresses_hex": _hlist([address, branch_address]),
|
|
}
|
|
)
|
|
return comparisons
|
|
|
|
|
|
def _command_candidates(
|
|
ordered: list[JsonObject],
|
|
dispatch: JsonObject | None,
|
|
responses: list[JsonObject],
|
|
) -> list[JsonObject]:
|
|
if not dispatch:
|
|
return []
|
|
|
|
comparisons = [
|
|
item for item in dispatch.get("comparisons", []) if isinstance(item, Mapping)
|
|
]
|
|
starts = sorted({int(item["handler_start"]) for item in comparisons if "handler_start" in item})
|
|
ranges = {
|
|
start: _handler_end(ordered, start, starts)
|
|
for start in starts
|
|
}
|
|
|
|
by_value: dict[int, JsonObject] = {}
|
|
for comparison in comparisons:
|
|
value = int(comparison["command_value"])
|
|
start = int(comparison["handler_start"])
|
|
end = ranges.get(start)
|
|
command = by_value.setdefault(
|
|
value,
|
|
{
|
|
"kind": "command_candidate",
|
|
"command_value": value,
|
|
"command_value_hex": _h16(value, width=2),
|
|
"name_candidate": _command_name_candidate(value),
|
|
"summary": _command_summary(value),
|
|
"handler_alternatives": [],
|
|
"evidence_addresses": [],
|
|
"response_candidates": [],
|
|
"rx_reads": [],
|
|
"confidence": "medium",
|
|
"caveat": (
|
|
"Command value and handler range are inferred from compare/BEQ dispatch. "
|
|
"No command name or intent is asserted."
|
|
),
|
|
},
|
|
)
|
|
alternative = {
|
|
"handler_start": start,
|
|
"handler_start_hex": _h16(start),
|
|
"handler_end": end,
|
|
"handler_end_hex": _h16(end) if end is not None else None,
|
|
"dispatch_compare_address": comparison["compare_address"],
|
|
"dispatch_compare_address_hex": comparison["compare_address_hex"],
|
|
"dispatch_branch_address": comparison["branch_address"],
|
|
"dispatch_branch_address_hex": comparison["branch_address_hex"],
|
|
"availability": comparison.get("availability"),
|
|
"availability_conditions": comparison.get("availability_conditions", []),
|
|
}
|
|
if alternative not in command["handler_alternatives"]:
|
|
command["handler_alternatives"].append(alternative)
|
|
command["evidence_addresses"].extend(dispatch.get("evidence_addresses", [])[:2])
|
|
command["evidence_addresses"].extend(comparison.get("evidence_addresses", []))
|
|
|
|
for command in by_value.values():
|
|
alternatives = command["handler_alternatives"]
|
|
starts_for_command = _dedupe_ints(
|
|
alt["handler_start"] for alt in alternatives if isinstance(alt["handler_start"], int)
|
|
)
|
|
ends_for_command = _dedupe_ints(
|
|
alt["handler_end"] for alt in alternatives if isinstance(alt["handler_end"], int)
|
|
)
|
|
command["handler_start"] = starts_for_command[0] if len(starts_for_command) == 1 else None
|
|
command["handler_start_hex"] = _h16(starts_for_command[0]) if len(starts_for_command) == 1 else None
|
|
command["handler_end"] = ends_for_command[0] if len(ends_for_command) == 1 else None
|
|
command["handler_end_hex"] = _h16(ends_for_command[0]) if len(ends_for_command) == 1 else None
|
|
availability_conditions = _dedupe_strings(
|
|
str(condition)
|
|
for alt in alternatives
|
|
for condition in alt.get("availability_conditions", [])
|
|
if condition
|
|
)
|
|
availability = _dedupe_strings(
|
|
str(alt["availability"])
|
|
for alt in alternatives
|
|
if alt.get("availability")
|
|
)
|
|
availability_summaries = _dedupe_strings(
|
|
" && ".join(str(condition) for condition in alt.get("availability_conditions", []) if condition)
|
|
for alt in alternatives
|
|
if alt.get("availability_conditions")
|
|
)
|
|
command["availability"] = availability[0] if len(availability) == 1 else availability
|
|
command["availability_conditions"] = availability_conditions
|
|
command["availability_summary"] = (
|
|
availability_summaries[0]
|
|
if len(availability_summaries) == 1
|
|
else " OR ".join(availability_summaries)
|
|
)
|
|
command["semantic_notes"] = _command_semantic_notes(int(command["command_value"]))
|
|
|
|
ranges_for_command = [
|
|
(alt["handler_start"], alt["handler_end"])
|
|
for alt in alternatives
|
|
if isinstance(alt["handler_end"], int)
|
|
]
|
|
command["rx_reads"] = _rx_reads_in_ranges(ordered, ranges_for_command)
|
|
command["response_candidates"] = [
|
|
response["id"]
|
|
for response in responses
|
|
if _response_in_ranges(response, ranges_for_command)
|
|
]
|
|
command["effects"] = _command_effects(
|
|
int(command["command_value"]),
|
|
ordered,
|
|
ranges_for_command,
|
|
command["response_candidates"],
|
|
)
|
|
command["effect_summary"] = _command_effect_summary(
|
|
int(command["command_value"]),
|
|
command["effects"],
|
|
)
|
|
response_evidence = [
|
|
addr
|
|
for response in responses
|
|
if response["id"] in command["response_candidates"]
|
|
for addr in response.get("evidence_addresses", [])
|
|
]
|
|
command["evidence_addresses"] = _dedupe_ints(command["evidence_addresses"] + response_evidence)
|
|
command["evidence_addresses_hex"] = _hlist(command["evidence_addresses"])
|
|
|
|
return [by_value[value] for value in sorted(by_value)]
|
|
|
|
|
|
def _byte_layout() -> list[JsonObject]:
|
|
return [
|
|
{
|
|
"offset": 0,
|
|
"rx_address": RX_FRAME_START,
|
|
"tx_staging_address": TX_STAGING_START,
|
|
"name_candidate": "op_flags",
|
|
"semantic": "low three bits select a command; upper bits are preserved or gated in some paths",
|
|
"confidence": "medium-high",
|
|
},
|
|
{
|
|
"offset": 1,
|
|
"rx_address": RX_FRAME_START + 1,
|
|
"tx_staging_address": TX_STAGING_START + 1,
|
|
"name_candidate": "addr_page_flags",
|
|
"semantic": "candidate high/page byte for logical point/index; bit 7 is tested as a control flag",
|
|
"confidence": "medium",
|
|
},
|
|
{
|
|
"offset": 2,
|
|
"rx_address": RX_FRAME_START + 2,
|
|
"tx_staging_address": TX_STAGING_START + 2,
|
|
"name_candidate": "addr_offset",
|
|
"semantic": "candidate low/offset byte for logical point/index",
|
|
"confidence": "medium",
|
|
},
|
|
{
|
|
"offset": 3,
|
|
"rx_address": RX_FRAME_START + 3,
|
|
"tx_staging_address": TX_STAGING_START + 3,
|
|
"name_candidate": "value_hi",
|
|
"semantic": "candidate high byte of a word value",
|
|
"confidence": "medium",
|
|
},
|
|
{
|
|
"offset": 4,
|
|
"rx_address": RX_FRAME_START + 4,
|
|
"tx_staging_address": TX_STAGING_START + 4,
|
|
"name_candidate": "value_lo",
|
|
"semantic": "candidate low byte of a word value",
|
|
"confidence": "medium",
|
|
},
|
|
{
|
|
"offset": 5,
|
|
"rx_address": RX_CHECKSUM_ADDRESS,
|
|
"tx_staging_address": None,
|
|
"name_candidate": "checksum",
|
|
"semantic": "0x5A-seeded XOR of bytes 0..4",
|
|
"confidence": "high",
|
|
},
|
|
]
|
|
|
|
|
|
def _command_name_candidate(value: int) -> str:
|
|
return {
|
|
0x00: "set_value_acked",
|
|
0x01: "read_value",
|
|
0x02: "clear_or_abort",
|
|
0x04: "set_value_no_immediate_reply",
|
|
0x05: "ack_or_clear_pending",
|
|
0x06: "set_secondary_value",
|
|
0x07: "retransmit_or_error_reply",
|
|
}.get(value, f"command_{value:02X}")
|
|
|
|
|
|
def _command_summary(value: int) -> str:
|
|
return {
|
|
0x00: "candidate write of RX[3:4] into primary/current tables, followed by a response",
|
|
0x01: "initial/idle-path primary table read only, followed by an odd response staging sequence",
|
|
0x02: "candidate clear/abort path with no immediate response builder",
|
|
0x04: "candidate write/update path that stores a value without an immediate serial response",
|
|
0x05: "continuation-only conditional acknowledgement/session clear path",
|
|
0x06: "candidate secondary-table value write path",
|
|
0x07: "candidate retransmit path; retry/error handling also builds a command 0x07 RX-payload echo",
|
|
}.get(value, "candidate command semantics are unknown")
|
|
|
|
|
|
def _command_semantic_notes(value: int) -> list[str]:
|
|
return {
|
|
0x01: [
|
|
"Only accepted on the initial/idle dispatcher path: valid checksum/no RX error, FAA2 == 0, and F861.bit7 == 0.",
|
|
"BCD7 stages F850=0x04, writes F851 from F861 and then overwrites F851 from F862.",
|
|
"BCD7 reads the primary table word at E000 + 2*selector; F854 receives the low byte and F853 receives the high byte.",
|
|
"F852 is not freshly written in the BCD7 handler, so do not describe the response as a fixed 04 00 QQ hi lo frame.",
|
|
],
|
|
0x05: [
|
|
"Only accepted on the continuation dispatcher path when FAA2 != 0.",
|
|
"For selector 0x0040, frame 05 00 40 00 00 1F performs no response staging.",
|
|
"The handler clears FAA3/FAA2; F9B5 advances only when FAA2.bit3 was set from a queued report.",
|
|
"If FAA2 == 0, command 5 falls through the initial dispatcher instead of doing acknowledgement work.",
|
|
],
|
|
0x07: [
|
|
"loc_BE4D is a retry/error echo path: F850=0x07 and F861-F864 are copied into F851-F854 before loc_BA26.",
|
|
"Observed frame 07 80 40 20 90 2D means RX bytes F861-F864 were 80 40 20 90; it is not a table value.",
|
|
],
|
|
}.get(value, [])
|
|
|
|
|
|
def _command_effect_summary(value: int, effects: list[JsonObject]) -> str:
|
|
if not effects:
|
|
return "No structured command effects were inferred."
|
|
return {
|
|
0x00: "Candidate acknowledged set: writes value bytes to primary/current tables, flags the index, and stages an echo-style response.",
|
|
0x01: "Initial/idle candidate read: reads the primary table and stages an odd value response with F852 possibly stale.",
|
|
0x02: "Candidate clear/abort: clears serial session state without an observed immediate response.",
|
|
0x04: "Candidate deferred set: writes value bytes and flags the index without an observed immediate response.",
|
|
0x05: "Continuation-only ACK/session clear: clears FAA3/FAA2 and only advances F9B5 when queued-report FAA2.bit3 was set; selector 0x0040 has no response.",
|
|
0x06: "Candidate secondary set: writes value bytes to the secondary table and flags the index.",
|
|
0x07: "Candidate retransmit/error reply: reuses prior TX bytes or copies RX payload bytes behind an explicit 0x07 retry/error echo.",
|
|
}.get(value, "Candidate effects are inferred from handler-local writes, reads, calls, and response staging.")
|
|
|
|
|
|
def _command_effects(
|
|
value: int,
|
|
ordered: list[JsonObject],
|
|
ranges: list[tuple[int, int]],
|
|
response_ids: list[str],
|
|
) -> list[JsonObject]:
|
|
effects: list[JsonObject] = []
|
|
|
|
def add(effect: JsonObject) -> None:
|
|
effect.setdefault("confidence", "candidate-medium")
|
|
effect.setdefault(
|
|
"caveat",
|
|
"Effect is inferred from local data movement and remains a protocol candidate.",
|
|
)
|
|
evidence = _dedupe_ints(
|
|
addr for addr in effect.get("evidence_addresses", []) if isinstance(addr, int)
|
|
)
|
|
effect["evidence_addresses"] = evidence
|
|
effect["evidence_addresses_hex"] = _hlist(evidence)
|
|
effects.append(effect)
|
|
|
|
table_accesses = _table_accesses_in_ranges(ordered, ranges)
|
|
state_accesses = _state_accesses_in_ranges(ordered, ranges)
|
|
|
|
if value == 0x00:
|
|
add(
|
|
{
|
|
"kind": "table_write_candidate",
|
|
"target_candidate": "primary_value_table_candidate",
|
|
"source_candidate": "RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero",
|
|
"table_base": 0xE000,
|
|
"table_base_hex": _h16(0xE000),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "write"),
|
|
}
|
|
)
|
|
add(
|
|
{
|
|
"kind": "table_write_candidate",
|
|
"target_candidate": "current_value_table_candidate",
|
|
"source_candidate": "same candidate value written to the primary table",
|
|
"table_base": 0xE800,
|
|
"table_base_hex": _h16(0xE800),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x1800, "write"),
|
|
}
|
|
)
|
|
add(
|
|
{
|
|
"kind": "flag_update_candidate",
|
|
"target_candidate": "per_index_flag_table_candidate",
|
|
"operation_candidate": "set bit 7",
|
|
"table_base": 0xEC00,
|
|
"table_base_hex": _h16(0xEC00),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x1400, "write"),
|
|
}
|
|
)
|
|
elif value == 0x01:
|
|
add(
|
|
{
|
|
"kind": "table_read_candidate",
|
|
"target_candidate": "primary_value_table_candidate",
|
|
"destination_candidate": "response value bytes F854/F853, with F852 not freshly written by BCD7",
|
|
"table_base": 0xE000,
|
|
"table_base_hex": _h16(0xE000),
|
|
"address_expression_candidate": "E000 + 2*selector",
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "read"),
|
|
}
|
|
)
|
|
elif value == 0x02:
|
|
add(
|
|
{
|
|
"kind": "state_clear_candidate",
|
|
"target_candidate": STATE_VARIABLES[0xFAA2],
|
|
"state_address": 0xFAA2,
|
|
"state_address_hex": _h16(0xFAA2),
|
|
"operation_candidate": "clear bit 7",
|
|
"evidence_addresses": _state_access_addresses(state_accesses, 0xFAA2),
|
|
}
|
|
)
|
|
elif value == 0x04:
|
|
add(
|
|
{
|
|
"kind": "table_write_candidate",
|
|
"target_candidate": "primary_value_table_candidate",
|
|
"source_candidate": "RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero",
|
|
"table_base": 0xE000,
|
|
"table_base_hex": _h16(0xE000),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "write"),
|
|
}
|
|
)
|
|
add(
|
|
{
|
|
"kind": "flag_update_candidate",
|
|
"target_candidate": "per_index_flag_table_candidate",
|
|
"operation_candidate": "set bit 7",
|
|
"table_base": 0xEC00,
|
|
"table_base_hex": _h16(0xEC00),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x1400, "write"),
|
|
}
|
|
)
|
|
elif value == 0x05:
|
|
add(
|
|
{
|
|
"kind": "conditional_ack_session_clear_candidate",
|
|
"target_candidate": "selected event/pending state",
|
|
"operation_candidate": (
|
|
"when FAA2 != 0, clear FAA3/FAA2; advance F9B5 only if FAA2.bit3 was "
|
|
"set from queued-report state; selector 0x0040 stages no response"
|
|
),
|
|
"selector_without_response_hex": _h16(0x0040),
|
|
"requires": ["FAA2 != 0"],
|
|
"fallthrough_when": "FAA2 == 0",
|
|
"evidence_addresses": _dedupe_ints(
|
|
_state_access_addresses(state_accesses, 0xFAA2)
|
|
+ _state_access_addresses(state_accesses, 0xFAA3)
|
|
+ _state_access_addresses(state_accesses, 0xF9B5)
|
|
),
|
|
}
|
|
)
|
|
elif value == 0x06:
|
|
add(
|
|
{
|
|
"kind": "table_write_candidate",
|
|
"target_candidate": "secondary_value_table_candidate",
|
|
"source_candidate": "RX[3:4] value bytes",
|
|
"table_base": 0xE400,
|
|
"table_base_hex": _h16(0xE400),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x1C00, "write"),
|
|
}
|
|
)
|
|
add(
|
|
{
|
|
"kind": "flag_update_candidate",
|
|
"target_candidate": "per_index_flag_table_candidate",
|
|
"operation_candidate": "set bit 6",
|
|
"table_base": 0xEC00,
|
|
"table_base_hex": _h16(0xEC00),
|
|
"evidence_addresses": _table_access_addresses(table_accesses, 0x1400, "write"),
|
|
}
|
|
)
|
|
elif value == 0x07:
|
|
add(
|
|
{
|
|
"kind": "retransmit_candidate",
|
|
"source_candidate": "previous TX frame bytes H'F858-H'F85C",
|
|
"destination_candidate": "TX staging bytes H'F850-H'F854 before loc_BA26",
|
|
"response_candidates": [item for item in response_ids if item == "response_at_BE22"],
|
|
"evidence_addresses": _addresses_in_ranges(ordered, ranges, 0xBE05, 0xBE22),
|
|
}
|
|
)
|
|
add(
|
|
{
|
|
"kind": "retry_error_echo_candidate",
|
|
"source_candidate": "RX payload bytes F861-F864",
|
|
"destination_candidate": "F850=0x07, F851-F854=F861-F864 before loc_BA26",
|
|
"observed_frame_caveat": "07 80 40 20 90 2D echoes RX payload 80 40 20 90; it is not a table value",
|
|
"response_candidates": [item for item in response_ids if item == "response_at_BE6A"],
|
|
"evidence_addresses": _addresses_in_ranges(ordered, ranges, 0xBE4D, 0xBE6A),
|
|
}
|
|
)
|
|
|
|
if response_ids:
|
|
add(
|
|
{
|
|
"kind": "response_staging_candidate",
|
|
"response_candidates": response_ids,
|
|
"operation_candidate": "stage F850-F854 and call loc_BA26",
|
|
"evidence_addresses": [
|
|
int(response_id.rsplit("_", 1)[1], 16)
|
|
for response_id in response_ids
|
|
if response_id.startswith("response_at_")
|
|
],
|
|
}
|
|
)
|
|
|
|
return effects
|
|
|
|
|
|
def _command_effect_aliases(commands: list[JsonObject]) -> list[JsonObject]:
|
|
aliases: list[JsonObject] = []
|
|
for command in commands:
|
|
if not isinstance(command, Mapping):
|
|
continue
|
|
aliases.append(
|
|
{
|
|
"kind": "command_effects_candidate",
|
|
"command_value": command.get("command_value"),
|
|
"command_value_hex": command.get("command_value_hex"),
|
|
"name_candidate": command.get("name_candidate"),
|
|
"summary": command.get("effect_summary", command.get("summary")),
|
|
"availability": command.get("availability"),
|
|
"availability_conditions": command.get("availability_conditions", []),
|
|
"availability_summary": command.get("availability_summary"),
|
|
"semantic_notes": command.get("semantic_notes", []),
|
|
"effects": command.get("effects", []),
|
|
"response_candidates": command.get("response_candidates", []),
|
|
"evidence_addresses": command.get("evidence_addresses", []),
|
|
"evidence_addresses_hex": command.get("evidence_addresses_hex", []),
|
|
"confidence": command.get("confidence", "medium"),
|
|
"caveat": command.get("caveat"),
|
|
}
|
|
)
|
|
return aliases
|
|
|
|
|
|
def _index_decoder_candidate(ordered: list[JsonObject]) -> JsonObject | None:
|
|
calls = [
|
|
ins for ins in ordered
|
|
if _mnemonic_root(str(ins.get("mnemonic", ""))) in {"BSR", "JSR", "PJSR"}
|
|
and (
|
|
INDEX_DECODER_ADDRESS in _targets(ins)
|
|
or INDEX_DECODER_LABEL.upper() in str(ins.get("operands", "")).upper()
|
|
)
|
|
]
|
|
if not calls:
|
|
return None
|
|
|
|
evidence_addresses = [int(ins["address"]) for ins in calls]
|
|
return {
|
|
"kind": "logical_index_decoder_candidate",
|
|
"label": INDEX_DECODER_LABEL,
|
|
"address": INDEX_DECODER_ADDRESS,
|
|
"address_hex": _h16(INDEX_DECODER_ADDRESS),
|
|
"input_fields": ["addr_page_flags", "addr_offset"],
|
|
"output_register": "R5",
|
|
"post_scale_register": "R4",
|
|
"post_scale": "R4 = R5 << 1",
|
|
"mapping_candidate": [
|
|
{"page": 0, "offset_range": "0x00-0x7F", "index_range": "0x000-0x07F"},
|
|
{"page": 1, "offset_range": "0x00-0xFF", "index_range": "0x080-0x17F"},
|
|
{"page": 2, "offset_range": "0x00-0x7F", "index_range": "0x180-0x1FF"},
|
|
{"page": "other/overflow", "index": "0x1FF"},
|
|
],
|
|
"evidence_addresses": evidence_addresses,
|
|
"evidence_addresses_hex": _hlist(evidence_addresses),
|
|
"confidence": "medium",
|
|
"caveat": (
|
|
"Mapping is inferred from loc_622B behavior and the nearby R4 = R5 << 1 table-index use."
|
|
),
|
|
}
|
|
|
|
|
|
def _response_candidates(ordered: list[JsonObject]) -> list[JsonObject]:
|
|
responses: list[JsonObject] = []
|
|
for index, ins in enumerate(ordered):
|
|
if not _is_send_builder_call(ins):
|
|
continue
|
|
window = _response_window(ordered, index)
|
|
writes = _staging_writes(window)
|
|
if not writes:
|
|
continue
|
|
reads = _rx_reads(window, RX_FRAME_START + 1, RX_FRAME_START + 4)
|
|
call_address = int(ins["address"])
|
|
evidence_addresses = _dedupe_ints(
|
|
[write["instruction_address"] for write in writes]
|
|
+ [read["instruction_address"] for read in reads]
|
|
+ [call_address]
|
|
)
|
|
response = {
|
|
"id": f"response_at_{call_address:04X}",
|
|
"kind": "response_staging_candidate",
|
|
"call_address": call_address,
|
|
"call_address_hex": _h16(call_address),
|
|
"send_builder": SEND_BUILDER_LABEL,
|
|
"send_builder_address": SEND_BUILDER_ADDRESS,
|
|
"send_builder_address_hex": _h16(SEND_BUILDER_ADDRESS),
|
|
"window_start": int(window[0]["address"]) if window else call_address,
|
|
"window_start_hex": _h16(int(window[0]["address"])) if window else _h16(call_address),
|
|
"writes": writes,
|
|
"rx_reads": reads,
|
|
"evidence_addresses": evidence_addresses,
|
|
"evidence_addresses_hex": _hlist(evidence_addresses),
|
|
"confidence": "medium",
|
|
"caveat": (
|
|
"Response candidate means F850-F854 are written shortly before loc_BA26. "
|
|
"The analyzer does not prove every byte is meaningful for every path."
|
|
),
|
|
}
|
|
response["schema"] = _response_schema(response)
|
|
response["byte_schema"] = response["schema"]["bytes"]
|
|
response["semantic_notes"] = response["schema"].get("semantic_notes", [])
|
|
responses.append(response)
|
|
return responses
|
|
|
|
|
|
def _response_schema(response: Mapping[str, Any]) -> JsonObject:
|
|
writes = [
|
|
write for write in response.get("writes", []) if isinstance(write, Mapping)
|
|
]
|
|
bytes_out: list[JsonObject] = []
|
|
for offset, address in enumerate(range(TX_STAGING_START, TX_STAGING_END + 1)):
|
|
matching = [
|
|
write for write in writes
|
|
if address in [item for item in write.get("addresses", []) if isinstance(item, int)]
|
|
]
|
|
if not matching:
|
|
bytes_out.append(
|
|
{
|
|
"offset": offset,
|
|
"byte": f"byte{offset}",
|
|
"tx_byte": f"TX[{offset}]",
|
|
"tx_staging_byte": f"TX[{offset}]",
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"source_kind": "unknown",
|
|
"source_expression": "unknown",
|
|
"source": {"kind": "unknown"},
|
|
"evidence_addresses": [],
|
|
"evidence_addresses_hex": [],
|
|
"confidence": "candidate-low",
|
|
"caveat": "No write to this staging byte was observed in the response window.",
|
|
}
|
|
)
|
|
continue
|
|
write = matching[-1]
|
|
byte_source = _response_byte_source(write, address)
|
|
evidence = [int(write["instruction_address"])] if isinstance(write.get("instruction_address"), int) else []
|
|
bytes_out.append(
|
|
{
|
|
"offset": offset,
|
|
"byte": f"byte{offset}",
|
|
"tx_byte": f"TX[{offset}]",
|
|
"tx_staging_byte": f"TX[{offset}]",
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"source_kind": _schema_source_kind(byte_source),
|
|
"source_expression": _source_expression(byte_source),
|
|
"source": byte_source,
|
|
"write_instruction_address": write.get("instruction_address"),
|
|
"write_instruction_address_hex": write.get("instruction_address_hex"),
|
|
"instruction": write.get("instruction"),
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-medium",
|
|
"caveat": (
|
|
"Per-byte source is inferred from the final observed write to this staging "
|
|
"byte before loc_BA26."
|
|
),
|
|
}
|
|
)
|
|
|
|
evidence_addresses = _dedupe_ints(
|
|
addr
|
|
for item in bytes_out
|
|
for addr in item.get("evidence_addresses", [])
|
|
if isinstance(addr, int)
|
|
)
|
|
semantic_notes = _response_schema_semantic_notes(response, bytes_out)
|
|
return {
|
|
"kind": "response_schema_candidate",
|
|
"response_id": response.get("id"),
|
|
"call_address": response.get("call_address"),
|
|
"call_address_hex": response.get("call_address_hex"),
|
|
"buffer_start": TX_STAGING_START,
|
|
"buffer_start_hex": _h16(TX_STAGING_START),
|
|
"buffer_end": TX_STAGING_END,
|
|
"buffer_end_hex": _h16(TX_STAGING_END),
|
|
"bytes": bytes_out,
|
|
"semantic_notes": semantic_notes,
|
|
"evidence_addresses": evidence_addresses,
|
|
"evidence_addresses_hex": _hlist(evidence_addresses),
|
|
"confidence": "candidate-medium" if evidence_addresses else "candidate-low",
|
|
"caveat": (
|
|
"Response schema is a candidate extracted from writes to F850-F854 in the local "
|
|
"window before loc_BA26; control-flow alternatives may share a send call."
|
|
),
|
|
}
|
|
|
|
|
|
def _response_schema_semantic_notes(
|
|
response: Mapping[str, Any],
|
|
bytes_out: list[JsonObject],
|
|
) -> list[str]:
|
|
call_address = response.get("call_address")
|
|
if call_address == 0xBCFA:
|
|
for item in bytes_out:
|
|
if item.get("offset") == 2 and item.get("source_kind") == "unknown":
|
|
item["source_kind"] = "stale_or_unchanged"
|
|
item["source_expression"] = "stale/unchanged"
|
|
item["caveat"] = "BCD7 does not freshly write F852 before loc_BA26."
|
|
return [
|
|
"Command 1 BCD7 staging is odd: F850=0x04; F851 is written from F861 then overwritten by F862.",
|
|
"The primary table word is read from E000 + 2*selector; F854/F853 receive low/high value bytes.",
|
|
"F852 may be stale or unchanged in this handler; avoid a fixed 04 00 QQ hi lo response shape.",
|
|
]
|
|
if call_address == 0xBE6A:
|
|
return [
|
|
"loc_BE4D retry/error echo stages F850=0x07 and copies F861-F864 into F851-F854 before loc_BA26.",
|
|
"Observed 07 80 40 20 90 2D echoes RX payload bytes 80 40 20 90; it is not a table-derived value.",
|
|
]
|
|
return []
|
|
|
|
|
|
def _response_schemas(responses: list[JsonObject]) -> list[JsonObject]:
|
|
return [
|
|
response["schema"]
|
|
for response in responses
|
|
if isinstance(response.get("schema"), Mapping)
|
|
]
|
|
|
|
|
|
def _response_byte_source(write: Mapping[str, Any], address: int) -> JsonObject:
|
|
source = write.get("source")
|
|
if not isinstance(source, Mapping):
|
|
return {"kind": "computed", "operand": write.get("source_operand")}
|
|
addresses = [item for item in write.get("addresses", []) if isinstance(item, int)]
|
|
byte_index = addresses.index(address) if address in addresses else 0
|
|
kind = str(source.get("kind", "computed"))
|
|
|
|
if kind in {"rx_frame_byte", "rx_frame_word"}:
|
|
offsets = source.get("rx_offsets")
|
|
if isinstance(offsets, list) and byte_index < len(offsets) and isinstance(offsets[byte_index], int):
|
|
rx_offset = int(offsets[byte_index])
|
|
else:
|
|
rx_offset = int(source.get("rx_offset", 0)) + byte_index
|
|
rx_address = RX_FRAME_START + rx_offset
|
|
return {
|
|
"kind": "rx_frame_byte",
|
|
"rx_offset": rx_offset,
|
|
"rx_address": rx_address,
|
|
"rx_address_hex": _h16(rx_address),
|
|
"derived_from": dict(source),
|
|
}
|
|
|
|
if kind in {"tx_frame_byte", "tx_frame_word"}:
|
|
offsets = source.get("tx_offsets")
|
|
if isinstance(offsets, list) and byte_index < len(offsets) and isinstance(offsets[byte_index], int):
|
|
tx_offset = int(offsets[byte_index])
|
|
else:
|
|
tx_offset = int(source.get("tx_offset", 0)) + byte_index
|
|
tx_address = TX_FRAME_START + tx_offset
|
|
return {
|
|
"kind": "tx_frame_byte",
|
|
"tx_offset": tx_offset,
|
|
"tx_address": tx_address,
|
|
"tx_address_hex": _h16(tx_address),
|
|
"derived_from": dict(source),
|
|
}
|
|
|
|
if kind == "table":
|
|
output = dict(source)
|
|
output["byte_index"] = byte_index
|
|
output["kind"] = "table"
|
|
return output
|
|
|
|
if kind == "immediate":
|
|
output = dict(source)
|
|
output["byte_index"] = byte_index
|
|
return output
|
|
|
|
output = dict(source)
|
|
output.setdefault("kind", "computed")
|
|
output["byte_index"] = byte_index
|
|
return output
|
|
|
|
|
|
def _schema_source_kind(source: Mapping[str, Any]) -> str:
|
|
kind = str(source.get("kind", "computed"))
|
|
if kind == "immediate":
|
|
return "immediate"
|
|
if kind == "rx_frame_byte":
|
|
return "rx_frame_byte"
|
|
if kind == "table":
|
|
return "table"
|
|
if kind in {"tx_frame_byte", "tx_frame_word"}:
|
|
return "tx_frame_byte"
|
|
return "computed"
|
|
|
|
|
|
def _source_expression(source: Mapping[str, Any]) -> str:
|
|
kind = str(source.get("kind", "computed"))
|
|
if kind == "immediate" and isinstance(source.get("value"), int):
|
|
return f"0x{int(source['value']) & 0xFFFF:02X}".lower()
|
|
if kind == "rx_frame_byte" and isinstance(source.get("rx_offset"), int):
|
|
return f"rx[{int(source['rx_offset'])}]"
|
|
if kind == "tx_frame_byte" and isinstance(source.get("tx_offset"), int):
|
|
return f"tx[{int(source['tx_offset'])}]"
|
|
if kind == "table":
|
|
return str(source.get("name_candidate", "table"))
|
|
return "computed"
|
|
|
|
|
|
def _rx_field_candidates(
|
|
ordered: list[JsonObject],
|
|
dispatch: JsonObject | None,
|
|
) -> list[JsonObject]:
|
|
fields: list[JsonObject] = []
|
|
dispatch_evidence = []
|
|
if isinstance(dispatch, Mapping):
|
|
dispatch_evidence = [
|
|
value for value in dispatch.get("evidence_addresses", []) if isinstance(value, int)
|
|
]
|
|
|
|
for offset in range(RX_FRAME_LENGTH):
|
|
address = RX_FRAME_START + offset
|
|
read_evidence = [
|
|
int(ins["address"]) for ins in ordered if _is_read_from_address(ins, address)
|
|
]
|
|
name = "payload_byte"
|
|
confidence = "candidate-low"
|
|
caveat = "role is inferred only from frame position"
|
|
mask = None
|
|
|
|
if offset == 0:
|
|
name = "command_low3"
|
|
confidence = "candidate-high" if dispatch else "candidate-medium"
|
|
caveat = "RX[0] is masked with 0x07 before command comparisons"
|
|
mask = 0x07
|
|
read_evidence = _dedupe_ints(read_evidence + dispatch_evidence)
|
|
elif offset in {1, 2}:
|
|
name = "likely_id_or_index"
|
|
confidence = "candidate-medium" if read_evidence else "candidate-low"
|
|
caveat = "RX[1:2] are read near logical point/index and response-echo handling"
|
|
elif offset in {3, 4}:
|
|
name = "likely_value"
|
|
confidence = "candidate-medium" if read_evidence else "candidate-low"
|
|
caveat = "RX[3:4] are read near table-value write/read response handling"
|
|
elif offset == 5:
|
|
name = "checksum"
|
|
confidence = "candidate-high"
|
|
caveat = "RX[5] is validated by the serial reconstruction checksum evidence"
|
|
|
|
field: JsonObject = {
|
|
"kind": "rx_field_semantic_candidate",
|
|
"offset": offset,
|
|
"name": name,
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"confidence": confidence,
|
|
"caveat": caveat,
|
|
"evidence_addresses": _dedupe_ints(read_evidence),
|
|
"evidence_addresses_hex": _hlist(read_evidence),
|
|
}
|
|
if mask is not None:
|
|
field["mask"] = mask
|
|
field["mask_hex"] = _h16(mask, width=2)
|
|
fields.append(field)
|
|
return fields
|
|
|
|
|
|
def _response_builder_aliases(responses: list[JsonObject]) -> list[JsonObject]:
|
|
builders: list[JsonObject] = []
|
|
for response in responses:
|
|
writes: list[JsonObject] = []
|
|
for write in response.get("writes", []):
|
|
if not isinstance(write, Mapping):
|
|
continue
|
|
for address in write.get("addresses", []):
|
|
if not isinstance(address, int):
|
|
continue
|
|
writes.append(
|
|
{
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"instruction_address": write.get("instruction_address"),
|
|
"instruction_address_hex": write.get("instruction_address_hex"),
|
|
"source": write.get("source"),
|
|
"instruction": write.get("instruction"),
|
|
}
|
|
)
|
|
builders.append(
|
|
{
|
|
"kind": "response_builder_candidate",
|
|
"buffer_start": TX_STAGING_START,
|
|
"buffer_start_hex": _h16(TX_STAGING_START),
|
|
"buffer_end": TX_STAGING_END,
|
|
"buffer_end_hex": _h16(TX_STAGING_END),
|
|
"send_call_target": SEND_BUILDER_ADDRESS,
|
|
"send_call_target_hex": _h16(SEND_BUILDER_ADDRESS),
|
|
"call_address": response.get("call_address"),
|
|
"call_address_hex": response.get("call_address_hex"),
|
|
"writes": writes,
|
|
"semantic_notes": response.get("semantic_notes", []),
|
|
"evidence_addresses": response.get("evidence_addresses", []),
|
|
"evidence_addresses_hex": response.get("evidence_addresses_hex", []),
|
|
"confidence": response.get("confidence", "medium"),
|
|
"caveat": response.get("caveat"),
|
|
}
|
|
)
|
|
return builders
|
|
|
|
|
|
def _logical_table_map_candidates(ordered: list[JsonObject]) -> list[JsonObject]:
|
|
by_offset: dict[int, list[JsonObject]] = {offset: [] for offset in LOGICAL_TABLES}
|
|
direct_by_address: dict[int, list[JsonObject]] = {address: [] for address in DIRECT_TABLE_CANDIDATES}
|
|
for ins in ordered:
|
|
for operand in _negative_indexed_operands(str(ins.get("operands", ""))):
|
|
offset = int(operand["negative_offset"])
|
|
if offset not in LOGICAL_TABLES:
|
|
continue
|
|
access = _operand_access_kind(ins, str(operand["operand"]))
|
|
by_offset[offset].append(
|
|
{
|
|
"instruction_address": int(ins["address"]),
|
|
"instruction_address_hex": _h16(int(ins["address"])),
|
|
"operand": operand["operand"],
|
|
"negative_offset": offset,
|
|
"negative_offset_hex": _h16(offset),
|
|
"index_register": operand["index_register"],
|
|
"access": access,
|
|
"width": _access_width(str(ins.get("mnemonic", ""))),
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
)
|
|
for address in DIRECT_TABLE_CANDIDATES:
|
|
if not _has_ref_in_range(ins, address, address):
|
|
continue
|
|
direct_by_address[address].append(
|
|
{
|
|
"instruction_address": int(ins["address"]),
|
|
"instruction_address_hex": _h16(int(ins["address"])),
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"access": _access_direction(ins, address) or "read_write_candidate",
|
|
"width": _access_width(str(ins.get("mnemonic", ""))),
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
)
|
|
|
|
candidates: list[JsonObject] = []
|
|
for offset, accesses in by_offset.items():
|
|
if not accesses:
|
|
continue
|
|
metadata = LOGICAL_TABLES[offset]
|
|
evidence = _dedupe_ints(access["instruction_address"] for access in accesses)
|
|
logical_base = int(metadata["logical_base_address"])
|
|
candidates.append(
|
|
{
|
|
"kind": "logical_table_map_candidate",
|
|
"name_candidate": metadata["name_candidate"],
|
|
"element_candidate": metadata["element_candidate"],
|
|
"logical_base_address": logical_base,
|
|
"logical_base_address_hex": _h16(logical_base),
|
|
"negative_offset": offset,
|
|
"negative_offset_hex": _h16(offset),
|
|
"observed_index_registers": sorted(
|
|
{
|
|
str(access["index_register"])
|
|
for access in accesses
|
|
if access.get("index_register")
|
|
}
|
|
),
|
|
"observed_accesses": sorted(
|
|
{str(access["access"]) for access in accesses if access.get("access")}
|
|
),
|
|
"observed_widths": sorted(
|
|
{int(access["width"]) for access in accesses if isinstance(access.get("width"), int)}
|
|
),
|
|
"accesses": accesses,
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-medium",
|
|
"caveat": (
|
|
"Logical table base is inferred from negative indexed operands only; "
|
|
"the table name is a conservative candidate."
|
|
),
|
|
}
|
|
)
|
|
for address, accesses in direct_by_address.items():
|
|
if not accesses:
|
|
continue
|
|
evidence = _dedupe_ints(access["instruction_address"] for access in accesses)
|
|
candidates.append(
|
|
{
|
|
"kind": "direct_table_map_candidate",
|
|
"name_candidate": DIRECT_TABLE_CANDIDATES[address],
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"observed_accesses": sorted(
|
|
{str(access["access"]) for access in accesses if access.get("access")}
|
|
),
|
|
"observed_widths": sorted(
|
|
{int(access["width"]) for access in accesses if isinstance(access.get("width"), int)}
|
|
),
|
|
"accesses": accesses,
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-low",
|
|
"caveat": (
|
|
"Direct table candidate is retained for small synthetic traces; the main "
|
|
"logical table map uses negative indexed operands."
|
|
),
|
|
}
|
|
)
|
|
return candidates
|
|
|
|
|
|
def _state_variable_candidates(ordered: list[JsonObject]) -> list[JsonObject]:
|
|
candidates: list[JsonObject] = []
|
|
state_regions = [
|
|
(SERIAL_HANDLER_START, SERIAL_HANDLER_END),
|
|
(IDLE_REPORT_GATE_ENTRY, IDLE_REPORT_GATE_END),
|
|
(0x40E0, 0x40E0),
|
|
(FRT1_OCIA_ENTRY, 0xBF08),
|
|
(FRT2_OCIA_ENTRY, 0xBF37),
|
|
]
|
|
serial_region = [
|
|
ins
|
|
for ins in ordered
|
|
if any(start <= int(ins.get("address", -1)) <= end for start, end in state_regions)
|
|
]
|
|
if not any(
|
|
_has_ref_in_range(ins, min(STATE_VARIABLES), max(STATE_VARIABLES))
|
|
for ins in serial_region
|
|
):
|
|
serial_region = ordered
|
|
for address, name in STATE_VARIABLES.items():
|
|
accesses: list[JsonObject] = []
|
|
for ins in serial_region:
|
|
if not _has_ref_in_range(ins, address, address):
|
|
continue
|
|
access = _access_direction(ins, address) or "read_write_candidate"
|
|
source, _destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
immediate = _parse_immediate(source)
|
|
item: JsonObject = {
|
|
"instruction_address": int(ins["address"]),
|
|
"instruction_address_hex": _h16(int(ins["address"])),
|
|
"access": access,
|
|
"mnemonic": str(ins.get("mnemonic", "")),
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
bit = _bit_number_from_instruction(ins)
|
|
if bit is not None:
|
|
item["bit"] = bit
|
|
if immediate is not None:
|
|
item["immediate"] = immediate
|
|
item["immediate_hex"] = _h16(immediate, width=2 if immediate <= 0xFF else 4)
|
|
accesses.append(item)
|
|
if not accesses:
|
|
continue
|
|
evidence = _dedupe_ints(access["instruction_address"] for access in accesses)
|
|
candidates.append(
|
|
{
|
|
"kind": "serial_state_variable_candidate",
|
|
"name_candidate": name,
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"access_count": len(accesses),
|
|
"read_count": sum(1 for access in accesses if str(access["access"]) == "read"),
|
|
"write_count": sum(1 for access in accesses if str(access["access"]) == "write"),
|
|
"bit_candidates": sorted(
|
|
{int(access["bit"]) for access in accesses if isinstance(access.get("bit"), int)}
|
|
),
|
|
"immediate_values": _dedupe_ints(
|
|
int(access["immediate"]) for access in accesses if isinstance(access.get("immediate"), int)
|
|
),
|
|
"immediate_values_hex": _hlist(
|
|
int(access["immediate"]) for access in accesses if isinstance(access.get("immediate"), int)
|
|
),
|
|
"accesses": accesses,
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-medium",
|
|
"caveat": (
|
|
"Role is inferred from references in serial handler, gate, and timer regions "
|
|
"and remains a state-variable candidate."
|
|
),
|
|
}
|
|
)
|
|
return candidates
|
|
|
|
|
|
def _generic_checksum_error_branches(
|
|
ordered: list[JsonObject],
|
|
responses: list[JsonObject],
|
|
) -> JsonObject:
|
|
response_by_call = {
|
|
int(response["call_address"]): response
|
|
for response in responses
|
|
if isinstance(response.get("call_address"), int)
|
|
}
|
|
response_ids_by_call = {
|
|
call: str(response["id"])
|
|
for call, response in response_by_call.items()
|
|
if isinstance(response.get("id"), str)
|
|
}
|
|
response_calls = sorted(response_by_call)
|
|
error_response_calls = {
|
|
call for call, response in response_by_call.items()
|
|
if _response_has_immediate_byte(response, 0, 0x07)
|
|
}
|
|
|
|
branch_addresses: list[int] = []
|
|
branch_targets: list[int] = []
|
|
response_ids: list[str] = []
|
|
evidence_addresses: list[int] = []
|
|
for index, ins in enumerate(ordered):
|
|
if not _has_ref_in_range(ins, RX_CHECKSUM_ADDRESS, RX_CHECKSUM_ADDRESS):
|
|
continue
|
|
if _mnemonic_root(str(ins.get("mnemonic", ""))) not in {"CMP", "CMP:E", "CMP:G", "CMP:I"}:
|
|
continue
|
|
evidence_addresses.append(int(ins["address"]))
|
|
for follower in ordered[index + 1:index + 4]:
|
|
targets = _targets(follower)
|
|
if not targets:
|
|
continue
|
|
branch_address = int(follower["address"])
|
|
branch_addresses.append(branch_address)
|
|
evidence_addresses.append(branch_address)
|
|
for target in targets:
|
|
branch_targets.append(target)
|
|
response_call = _first_response_call_at_or_after(response_calls, target, max_distance=0x80)
|
|
if response_call is None:
|
|
continue
|
|
if error_response_calls and response_call not in error_response_calls:
|
|
continue
|
|
response_ids.append(response_ids_by_call[response_call])
|
|
evidence_addresses.extend(_response_evidence_addresses(responses, [response_ids_by_call[response_call]]))
|
|
break
|
|
|
|
return {
|
|
"branch_addresses": _dedupe_ints(branch_addresses),
|
|
"branch_targets": _dedupe_ints(branch_targets),
|
|
"response_ids": _dedupe_strings(response_ids),
|
|
"evidence_addresses": _dedupe_ints(evidence_addresses),
|
|
}
|
|
|
|
|
|
def _first_response_call_at_or_after(
|
|
response_calls: list[int],
|
|
address: int,
|
|
*,
|
|
max_distance: int,
|
|
) -> int | None:
|
|
for call in response_calls:
|
|
if address <= call <= address + max_distance:
|
|
return call
|
|
return None
|
|
|
|
|
|
def _responses_with_immediate_byte(
|
|
responses: list[JsonObject],
|
|
offset: int,
|
|
value: int,
|
|
) -> list[str]:
|
|
return _dedupe_strings(
|
|
str(response["id"])
|
|
for response in responses
|
|
if isinstance(response.get("id"), str)
|
|
and _response_has_immediate_byte(response, offset, value)
|
|
)
|
|
|
|
|
|
def _response_has_immediate_byte(response: Mapping[str, Any], offset: int, value: int) -> bool:
|
|
schema = response.get("schema")
|
|
if not isinstance(schema, Mapping):
|
|
return False
|
|
for item in schema.get("bytes", []):
|
|
if not isinstance(item, Mapping):
|
|
continue
|
|
if item.get("offset") != offset:
|
|
continue
|
|
source = item.get("source")
|
|
return (
|
|
isinstance(source, Mapping)
|
|
and source.get("kind") == "immediate"
|
|
and source.get("value") == value
|
|
)
|
|
return False
|
|
|
|
|
|
def _response_evidence_addresses(
|
|
responses: list[JsonObject],
|
|
response_ids: Iterable[str],
|
|
) -> list[int]:
|
|
wanted = set(response_ids)
|
|
return _dedupe_ints(
|
|
int(address)
|
|
for response in responses
|
|
if response.get("id") in wanted
|
|
for address in response.get("evidence_addresses", [])
|
|
if isinstance(address, int)
|
|
)
|
|
|
|
|
|
def _retry_error_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None:
|
|
checksum_path = [
|
|
ins
|
|
for ins in ordered
|
|
if 0xBBD6 <= int(ins.get("address", -1)) <= 0xBBF0
|
|
]
|
|
retry_path = [
|
|
ins
|
|
for ins in ordered
|
|
if 0xBE29 <= int(ins.get("address", -1)) <= 0xBE6A
|
|
]
|
|
retransmit_path = [
|
|
ins
|
|
for ins in ordered
|
|
if 0xBE05 <= int(ins.get("address", -1)) <= 0xBE22
|
|
]
|
|
checksum_branch = [
|
|
int(ins["address"])
|
|
for ins in checksum_path
|
|
if 0xBE29 in _targets(ins)
|
|
]
|
|
generic_checksum_branch = _generic_checksum_error_branches(ordered, responses)
|
|
checksum_branch = _dedupe_ints(checksum_branch + generic_checksum_branch["branch_addresses"])
|
|
retry_counter_evidence = [
|
|
int(ins["address"])
|
|
for ins in retry_path
|
|
if _has_ref_in_range(ins, 0xFAA6, 0xFAA6)
|
|
]
|
|
explicit_error_response = [
|
|
response.get("id")
|
|
for response in responses
|
|
if response.get("id") == "response_at_BE6A"
|
|
]
|
|
checksum_error_response = _dedupe_strings(
|
|
[item for item in explicit_error_response if isinstance(item, str)]
|
|
+ generic_checksum_branch["response_ids"]
|
|
)
|
|
retransmit_response = [
|
|
response.get("id")
|
|
for response in responses
|
|
if response.get("id") == "response_at_BE22"
|
|
]
|
|
command_0x07_responses = _responses_with_immediate_byte(responses, 0, 0x07)
|
|
retransmit_response = _dedupe_strings(
|
|
[item for item in retransmit_response if isinstance(item, str)]
|
|
+ [
|
|
item for item in command_0x07_responses
|
|
if item not in checksum_error_response
|
|
]
|
|
)
|
|
if (
|
|
not checksum_branch
|
|
and not retry_counter_evidence
|
|
and not retransmit_response
|
|
and not checksum_error_response
|
|
):
|
|
return None
|
|
|
|
checksum_evidence = _dedupe_ints(
|
|
[int(ins["address"]) for ins in checksum_path if _has_ref_in_range(ins, RX_FRAME_START, RX_FRAME_END)]
|
|
+ checksum_branch
|
|
+ generic_checksum_branch["evidence_addresses"]
|
|
)
|
|
retry_evidence = _dedupe_ints(
|
|
[int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, 0xFAA2, 0xFAA6)]
|
|
+ [int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, TX_STAGING_START, TX_STAGING_END)]
|
|
+ [int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, RX_FRAME_START + 1, RX_FRAME_START + 4)]
|
|
+ [int(ins["address"]) for ins in retry_path if _is_send_builder_call(ins)]
|
|
+ _response_evidence_addresses(responses, checksum_error_response)
|
|
)
|
|
retransmit_evidence = _dedupe_ints(
|
|
[int(ins["address"]) for ins in retransmit_path if _has_ref_in_range(ins, TX_FRAME_START, TX_FRAME_END)]
|
|
+ [int(ins["address"]) for ins in retransmit_path if _has_ref_in_range(ins, TX_STAGING_START, TX_STAGING_END)]
|
|
+ [int(ins["address"]) for ins in retransmit_path if _is_send_builder_call(ins)]
|
|
+ _response_evidence_addresses(responses, retransmit_response)
|
|
)
|
|
evidence = _dedupe_ints(checksum_evidence + retry_evidence + retransmit_evidence)
|
|
|
|
return {
|
|
"kind": "serial_retry_error_model_candidate",
|
|
"checksum_failure_path": {
|
|
"condition_candidate": "0x5A-seeded XOR over RX[0..4] differs from RX[5]",
|
|
"error_target": "loc_BE29",
|
|
"error_target_address": 0xBE29,
|
|
"error_target_address_hex": _h16(0xBE29),
|
|
"checksum_error_response_candidates": checksum_error_response,
|
|
"branch_evidence_addresses": checksum_branch,
|
|
"branch_evidence_addresses_hex": _hlist(checksum_branch),
|
|
"evidence_addresses": checksum_evidence,
|
|
"evidence_addresses_hex": _hlist(checksum_evidence),
|
|
"confidence": "candidate-high" if checksum_branch else "candidate-low",
|
|
},
|
|
"retry_path": {
|
|
"entry_label": "loc_BE29",
|
|
"entry_address": 0xBE29,
|
|
"entry_address_hex": _h16(0xBE29),
|
|
"counter_address": 0xFAA6,
|
|
"counter_address_hex": _h16(0xFAA6),
|
|
"threshold_candidate": 2,
|
|
"response_candidates": checksum_error_response,
|
|
"summary": (
|
|
"Candidate retry path clears/consults serial flags, increments FAA6, compares it "
|
|
"with 2, and when still below the apparent limit enters loc_BE4D to stage a "
|
|
"command 0x07 retry/error echo of RX payload bytes F861-F864."
|
|
),
|
|
"echo_response_candidate": {
|
|
"entry_label": "loc_BE4D",
|
|
"entry_address": 0xBE4D,
|
|
"entry_address_hex": _h16(0xBE4D),
|
|
"staging_candidate": "F850=0x07; F851-F854=F861-F864",
|
|
"observed_frame_caveat": "07 80 40 20 90 2D echoes RX payload 80 40 20 90 and is not a table value.",
|
|
},
|
|
"evidence_addresses": retry_evidence,
|
|
"evidence_addresses_hex": _hlist(retry_evidence),
|
|
"confidence": "candidate-medium" if retry_counter_evidence else "candidate-low",
|
|
},
|
|
"command_0x07_path": {
|
|
"entry_label": "loc_BE05",
|
|
"entry_address": 0xBE05,
|
|
"entry_address_hex": _h16(0xBE05),
|
|
"response_candidates": retransmit_response,
|
|
"summary": (
|
|
"Candidate retransmit/explicit command 0x07 path either copies previous TX "
|
|
"frame bytes back to F850-F854 or stages an observed 0x07 response before loc_BA26."
|
|
),
|
|
"evidence_addresses": retransmit_evidence,
|
|
"evidence_addresses_hex": _hlist(retransmit_evidence),
|
|
"confidence": "candidate-medium" if retransmit_response else "candidate-low",
|
|
},
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-medium",
|
|
"caveat": (
|
|
"The retry/error model is inferred from checksum branch targets, retry-counter state, "
|
|
"and response staging; exact host-visible semantics remain candidate phrasing."
|
|
),
|
|
}
|
|
|
|
|
|
def _gate_queue_model(ordered: list[JsonObject], commands: list[JsonObject]) -> JsonObject | None:
|
|
evidence = _dedupe_ints(
|
|
_addresses_in_ranges(ordered, [(MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL)], MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL)
|
|
+ _addresses_in_ranges(ordered, [(SESSION_GATE_ENTRY, 0x4007)], SESSION_GATE_ENTRY, 0x4007)
|
|
+ _addresses_in_ranges(ordered, [(IDLE_REPORT_GATE_ENTRY, IDLE_REPORT_GATE_END)], IDLE_REPORT_GATE_ENTRY, IDLE_REPORT_GATE_END)
|
|
+ _addresses_in_ranges(ordered, [(QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL)], QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL)
|
|
+ _addresses_in_ranges(ordered, [(RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY)], RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY)
|
|
)
|
|
command_ack_values = [
|
|
int(command["command_value"])
|
|
for command in commands
|
|
if command.get("command_value") == 0x05
|
|
]
|
|
if not evidence and not command_ack_values:
|
|
return None
|
|
|
|
return {
|
|
"kind": "serial_gate_queue_state_machine_candidate",
|
|
"summary": (
|
|
"Conservative model for autonomous report gating, queue cursor comparison, "
|
|
"periodic resend, and RX/session side effects."
|
|
),
|
|
"predicates": [
|
|
{
|
|
"name": "main_loop_may_enter_report_builder",
|
|
"entry_label": "loc_3FD3",
|
|
"target_label": "loc_BAF2",
|
|
"condition_candidate": (
|
|
"FAA2 == 0 && F9C0 == 0 && ((FAA5.bit7 == 0) || (F9C3 == 0))"
|
|
),
|
|
"summary": "Main-loop report gate; session must be idle, TX busy timer clear, and RX gate open.",
|
|
"state_addresses_hex": [_h16(0xFAA2), _h16(0xFAA5), _h16(0xF9C3), _h16(0xF9C0)],
|
|
"evidence_addresses": _addresses_in_ranges(
|
|
ordered,
|
|
[(MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL)],
|
|
MAIN_REPORT_GATE_ENTRY,
|
|
MAIN_REPORT_GATE_CALL,
|
|
),
|
|
},
|
|
{
|
|
"name": "idle_heartbeat_report_may_enqueue",
|
|
"entry_label": "loc_4046",
|
|
"target_label": "loc_4067",
|
|
"condition_candidate": (
|
|
"F9C4 == 0 && ((FAA5.bit7 == 0) || (F9C3 == 0)) && F9B0 == F9B5"
|
|
),
|
|
"summary": (
|
|
"Idle/default report gate; when the FRT2 countdown clears and the queue is "
|
|
"empty, loc_4046 can enqueue H'0000 for the later loc_BAF2 -> loc_BA26 send path."
|
|
),
|
|
"state_addresses_hex": [_h16(0xF9C4), _h16(0xFAA5), _h16(0xF9C3), _h16(0xF9B0), _h16(0xF9B5)],
|
|
"enqueued_report_candidate_hex": _h16(0x0000),
|
|
"write_semantics_candidate": (
|
|
"loc_4067 is MOV:G.W #H'00, @(-H'0790,R2): the byte immediate is "
|
|
"zero-extended by the word destination, so the queue slot becomes H'0000."
|
|
),
|
|
"runtime_trace_confirmation": {
|
|
"source": "h8536_emulator_probe target-frame run",
|
|
"report_id_hex": _h16(0x0000),
|
|
"queue_write_address_hex": _h16(IDLE_REPORT_QUEUE_WRITE),
|
|
"queue_write_semantics": "H'FFFF -> H'0000, not H'00FF",
|
|
"dequeue_path": ["loc_4046", "loc_BAF2", "loc_BB08", "loc_BB1C", "loc_BB20", "loc_BB2B", "loc_BA26"],
|
|
"emitted_frame_hex": "00 00 00 00 80 DA",
|
|
"checksum_seed_hex": _h16(CHECKSUM_SEED, width=2),
|
|
"checksum_hex": "H'DA",
|
|
},
|
|
"evidence_addresses": _addresses_in_ranges(
|
|
ordered,
|
|
[(IDLE_REPORT_GATE_ENTRY, IDLE_REPORT_GATE_END)],
|
|
IDLE_REPORT_GATE_ENTRY,
|
|
IDLE_REPORT_GATE_END,
|
|
),
|
|
},
|
|
{
|
|
"name": "queue_has_pending_report",
|
|
"entry_label": "loc_BAF2",
|
|
"condition_candidate": "F9B5 != F9B0",
|
|
"summary": "Queue/pending cursor gate; non-empty state stages through BB43 before loc_BA26.",
|
|
"state_addresses_hex": [_h16(0xF9B5), _h16(0xF9B0)],
|
|
"staging_path": ["loc_BAF2", "loc_BB43", "loc_BA26"],
|
|
"evidence_addresses": _addresses_in_ranges(
|
|
ordered,
|
|
[(QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL)],
|
|
QUEUE_REPORT_ENTRY,
|
|
AUTONOMOUS_TX_REPORT_CALL,
|
|
),
|
|
},
|
|
{
|
|
"name": "periodic_resend_may_fire",
|
|
"entry_label": "loc_BE9E",
|
|
"target_label": "loc_BED5",
|
|
"condition_candidate": (
|
|
"(FAA5 & FAA3 & 0x80) != 0 && F9C6 == 0 && F9C8 != 0 after countdown"
|
|
),
|
|
"summary": "Resend gate masks pending state with FAA5, checks F9C6/F9C8, then calls BA26 at BED5.",
|
|
"state_addresses_hex": [_h16(0xFAA5), _h16(0xFAA3), _h16(0xF9C6), _h16(0xF9C8)],
|
|
"evidence_addresses": _addresses_in_ranges(
|
|
ordered,
|
|
[(RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY)],
|
|
RESEND_GATE_ENTRY,
|
|
PERIODIC_RESEND_ENTRY,
|
|
),
|
|
},
|
|
],
|
|
"session_effects": [
|
|
{
|
|
"name": "rx_completion_sets_session_timer",
|
|
"summary": "RX completion sets F9C5 (observed reload H'14) after the sixth byte is captured.",
|
|
"state_addresses_hex": [_h16(0xF9C5)],
|
|
"evidence_addresses": _state_immediate_evidence(ordered, 0xF9C5, 0x14),
|
|
},
|
|
{
|
|
"name": "session_timeout_clears_gate_and_queue",
|
|
"entry_label": "loc_3FEF",
|
|
"summary": "When F9C5 is clear, loc_3FEF clears F9B5/F9B0 and clears FAA5.bit7; when nonzero, it sets FAA5.bit7.",
|
|
"state_addresses_hex": [_h16(0xF9C5), _h16(0xF9B5), _h16(0xF9B0), _h16(0xFAA5)],
|
|
"evidence_addresses": _addresses_in_ranges(
|
|
ordered,
|
|
[(SESSION_GATE_ENTRY, 0x4007)],
|
|
SESSION_GATE_ENTRY,
|
|
0x4007,
|
|
),
|
|
},
|
|
{
|
|
"name": "idle_heartbeat_gate_initial_delay_loaded",
|
|
"summary": "Startup/init loads F9C4 with H'14 before the first idle/default report can be queued.",
|
|
"state_addresses_hex": [_h16(0xF9C4)],
|
|
"reload_value_hex": _h16(0x14, width=2),
|
|
"evidence_addresses": _state_immediate_evidence(ordered, 0xF9C4, 0x14),
|
|
},
|
|
{
|
|
"name": "idle_heartbeat_gate_post_send_delay_loaded",
|
|
"summary": "loc_BA26 reloads F9C4 with H'07 after each send, matching the observed heartbeat spacing.",
|
|
"state_addresses_hex": [_h16(0xF9C4)],
|
|
"reload_value_hex": _h16(0x07, width=2),
|
|
"evidence_addresses": _state_immediate_evidence(ordered, 0xF9C4, 0x07),
|
|
},
|
|
{
|
|
"name": "host_ack_can_advance_queue",
|
|
"summary": (
|
|
"Command 0x05 is a continuation-only ACK/session clear path: it clears "
|
|
"FAA3/FAA2 and advances F9B5 only when queued-report FAA2.bit3 was set. "
|
|
"Selector 0x0040 has no response; if FAA2 == 0 the command falls through "
|
|
"instead of doing ACK work."
|
|
),
|
|
"command_values_hex": [_h16(value, width=2) for value in command_ack_values],
|
|
"state_addresses_hex": [_h16(0xFAA2), _h16(0xFAA3), _h16(0xF9B5)],
|
|
"evidence_addresses": _dedupe_ints(
|
|
addr
|
|
for command in commands
|
|
if command.get("command_value") == 0x05
|
|
for addr in command.get("evidence_addresses", [])
|
|
if isinstance(addr, int)
|
|
),
|
|
},
|
|
],
|
|
"caveat": (
|
|
"Many panel controls may require host/session traffic before reporting. Observed "
|
|
"autonomous call/camera-power indexes are runtime/capture overlays, not ROM constants."
|
|
),
|
|
"confidence": "candidate-medium",
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
}
|
|
|
|
|
|
def _timer_interrupt_model(ordered: list[JsonObject]) -> JsonObject | None:
|
|
present_addresses = {int(ins["address"]) for ins in ordered if isinstance(ins.get("address"), int)}
|
|
frt1_evidence = _dedupe_ints(
|
|
_addresses_in_ranges(ordered, [(FRT1_OCIA_ENTRY, 0xBF08)], FRT1_OCIA_ENTRY, 0xBF08)
|
|
)
|
|
frt2_evidence = _dedupe_ints(
|
|
_addresses_in_ranges(ordered, [(FRT2_OCIA_ENTRY, 0xBF37)], FRT2_OCIA_ENTRY, 0xBF37)
|
|
)
|
|
sources: list[JsonObject] = []
|
|
if frt1_evidence:
|
|
frt1_counters = [
|
|
counter
|
|
for counter in (
|
|
_timer_counter(0xF9C0, "tx_report_gate_counter_candidate", "candidate gate counter used before entering the report builder.", 0xBEF4),
|
|
_timer_counter(0xF9C1, "rx_interbyte_timeout_candidate", "candidate RX interbyte timeout counter.", 0xBEFE),
|
|
_timer_counter(0xF9C6, "periodic_resend_cadence_counter_candidate", "candidate periodic resend/heartbeat cadence counter.", 0xBF08),
|
|
)
|
|
if int(counter["evidence_address"]) in present_addresses
|
|
]
|
|
sources.append(
|
|
{
|
|
"source": "FRT1 OCIA",
|
|
"vector_address_hex": _h16(0x0062),
|
|
"handler_address": FRT1_OCIA_ENTRY,
|
|
"handler_address_hex": _h16(FRT1_OCIA_ENTRY),
|
|
"summary": "Candidate periodic tick ISR for serial busy, interbyte, and resend counters.",
|
|
"counters": frt1_counters,
|
|
"evidence_addresses": frt1_evidence,
|
|
"evidence_addresses_hex": _hlist(frt1_evidence),
|
|
}
|
|
)
|
|
if frt2_evidence:
|
|
frt2_counters = [
|
|
counter
|
|
for counter in (
|
|
_timer_counter(0xF9C4, "idle_heartbeat_gate_countdown_candidate", "candidate idle/default report enqueue countdown.", 0xBF2D),
|
|
_timer_counter(0xF9C5, "rx_session_timeout_candidate", "candidate RX/session maintenance timeout counter.", 0xBF37),
|
|
)
|
|
if int(counter["evidence_address"]) in present_addresses
|
|
]
|
|
sources.append(
|
|
{
|
|
"source": "FRT2 OCIA",
|
|
"vector_address_hex": _h16(0x006A),
|
|
"handler_address": FRT2_OCIA_ENTRY,
|
|
"handler_address_hex": _h16(FRT2_OCIA_ENTRY),
|
|
"summary": "Candidate periodic tick ISR for idle heartbeat/report and RX session counters.",
|
|
"clock_select": "CKS1=1 CKS0=0 => phi/32",
|
|
"ocra_value_hex": "H'7A12",
|
|
"manual_reference": "Manual/0900766b802125d0.md:12038 FRT CKS1/CKS0 clock select",
|
|
"counters": frt2_counters,
|
|
"evidence_addresses": frt2_evidence,
|
|
"evidence_addresses_hex": _hlist(frt2_evidence),
|
|
}
|
|
)
|
|
if not sources:
|
|
return None
|
|
|
|
counters = _dedupe_timer_counters(
|
|
counter
|
|
for source in sources
|
|
for counter in source.get("counters", [])
|
|
if isinstance(counter, dict)
|
|
)
|
|
evidence = _dedupe_ints(
|
|
addr
|
|
for source in sources
|
|
for addr in source.get("evidence_addresses", [])
|
|
if isinstance(addr, int)
|
|
)
|
|
return {
|
|
"kind": "timer_interrupt_model_candidate",
|
|
"source": " / ".join(str(source["source"]) for source in sources),
|
|
"summary": "FRT compare-match handlers decrement serial gate, timeout, and cadence counters.",
|
|
"sources": sources,
|
|
"counters": counters,
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-medium",
|
|
}
|
|
|
|
|
|
def _timer_counter(address: int, name: str, role: str, evidence_address: int) -> JsonObject:
|
|
return {
|
|
"address": address,
|
|
"address_hex": _h16(address),
|
|
"name_candidate": name,
|
|
"role": role,
|
|
"evidence_address": evidence_address,
|
|
"evidence_address_hex": _h16(evidence_address),
|
|
}
|
|
|
|
|
|
def _dedupe_timer_counters(counters: Iterable[JsonObject]) -> list[JsonObject]:
|
|
output = []
|
|
seen: set[int] = set()
|
|
for counter in counters:
|
|
address = counter.get("address")
|
|
if not isinstance(address, int) or address in seen:
|
|
continue
|
|
seen.add(address)
|
|
output.append(counter)
|
|
return output
|
|
|
|
|
|
def _tx_report_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None:
|
|
report_responses = [
|
|
response for response in responses
|
|
if response.get("call_address") == AUTONOMOUS_TX_REPORT_CALL
|
|
]
|
|
if not report_responses:
|
|
report_responses = [
|
|
response for response in responses
|
|
if _response_reads_current_value_table(response)
|
|
and not _response_reads_rx_frame(response)
|
|
]
|
|
if not report_responses:
|
|
return None
|
|
|
|
response_ids = [
|
|
str(response["id"])
|
|
for response in report_responses
|
|
if isinstance(response.get("id"), str)
|
|
]
|
|
evidence = _dedupe_ints(
|
|
addr
|
|
for response in report_responses
|
|
for addr in response.get("evidence_addresses", [])
|
|
if isinstance(addr, int)
|
|
)
|
|
byte_roles = [
|
|
{
|
|
"offset": 0,
|
|
"field_candidate": "encoded_logical_index_or_report_id_byte0",
|
|
"source_candidate": "computed from candidate logical index/report id",
|
|
},
|
|
{
|
|
"offset": 1,
|
|
"field_candidate": "encoded_logical_index_or_report_id_byte1",
|
|
"source_candidate": "computed from candidate logical index/report id",
|
|
},
|
|
{
|
|
"offset": 2,
|
|
"field_candidate": "encoded_logical_index_or_report_id_byte2",
|
|
"source_candidate": "computed from candidate logical index/report id",
|
|
},
|
|
{
|
|
"offset": 3,
|
|
"field_candidate": "current_value_hi",
|
|
"source_candidate": "current_value_table_candidate high byte",
|
|
"table_candidate": "current_value_table_candidate",
|
|
},
|
|
{
|
|
"offset": 4,
|
|
"field_candidate": "current_value_lo",
|
|
"source_candidate": "current_value_table_candidate low byte",
|
|
"table_candidate": "current_value_table_candidate",
|
|
},
|
|
{
|
|
"offset": 5,
|
|
"field_candidate": "checksum",
|
|
"source_candidate": "0x5A XOR TX[0..4]",
|
|
},
|
|
]
|
|
return {
|
|
"kind": "bb43_to_ba26_tx_report_model_candidate",
|
|
"direction": "device_to_host_autonomous_report_candidate",
|
|
"entry_label": AUTONOMOUS_TX_REPORT_LABEL,
|
|
"entry_address": AUTONOMOUS_TX_REPORT_CALL,
|
|
"entry_address_hex": _h16(AUTONOMOUS_TX_REPORT_CALL),
|
|
"send_builder": SEND_BUILDER_LABEL,
|
|
"send_builder_address": SEND_BUILDER_ADDRESS,
|
|
"send_builder_address_hex": _h16(SEND_BUILDER_ADDRESS),
|
|
"response_candidates": _dedupe_strings(response_ids),
|
|
"summary": (
|
|
"TX report bytes 0..2 are computed encoded logical index/report id bytes, "
|
|
"bytes 3..4 come from current_value_table_candidate, and byte5 is the "
|
|
"0x5A XOR checksum."
|
|
),
|
|
"byte_roles": byte_roles,
|
|
"value_source_candidate": "current_value_table_candidate",
|
|
"checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4",
|
|
"observed_capture_overlay_candidates": OBSERVED_TX_REPORT_OVERLAY,
|
|
"runtime_confirmed_paths": [
|
|
{
|
|
"name": "idle_heartbeat_report_runtime_confirmation",
|
|
"report_id_hex": _h16(0x0000),
|
|
"queue_write_address_hex": _h16(IDLE_REPORT_QUEUE_WRITE),
|
|
"queue_write_semantics": "MOV:G.W #H'00 writes H'0000 to the queue slot",
|
|
"staging_path": ["loc_4046", "loc_BAF2", "loc_BB08", "loc_BB1C", "loc_BB20", "loc_BB2B", "loc_BA26"],
|
|
"emitted_frame_hex": "00 00 00 00 80 DA",
|
|
"checksum_hex": "H'DA",
|
|
}
|
|
],
|
|
"consistency_checks": [
|
|
{
|
|
"name": "idle_heartbeat_report_id_width",
|
|
"status": "pass",
|
|
"summary": (
|
|
"Decompiler mnemonic MOV:G.W and emulator execution now agree that the "
|
|
"H'00 immediate at loc_4067 is zero-extended to report H'0000."
|
|
),
|
|
}
|
|
],
|
|
"observed_autonomous_output_caveat": (
|
|
"Real captures supplied so far show only heartbeat/idle, call, and camera-power "
|
|
"autonomous TX frames. Other panel controls may require a host/device request or "
|
|
"state transition before the firmware reports them."
|
|
),
|
|
"confidence": "candidate-medium",
|
|
"caveat": (
|
|
"This is a TX/report model for the BB43 -> BA26 path, separate from RX command "
|
|
"dispatch. Observed report names are a capture overlay candidate only, not hard-coded "
|
|
"source truth."
|
|
),
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
}
|
|
|
|
|
|
def _periodic_resend_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None:
|
|
del responses
|
|
period_evidence = _state_immediate_evidence(ordered, 0xF9C6, 0x01F4)
|
|
countdown_evidence = _state_immediate_evidence(ordered, 0xF9C8, 0x14)
|
|
pending_evidence = _state_immediate_evidence(ordered, 0xFAA3, 0x80)
|
|
pending_evidence = _dedupe_ints(pending_evidence + _state_bit_evidence(ordered, 0xFAA3, 7))
|
|
resend_evidence = [
|
|
int(ins["address"])
|
|
for ins in ordered
|
|
if PERIODIC_RESEND_ENTRY <= int(ins.get("address", -1)) <= SERIAL_HANDLER_END
|
|
]
|
|
resend_send_evidence = [
|
|
int(ins["address"])
|
|
for ins in ordered
|
|
if PERIODIC_RESEND_ENTRY <= int(ins.get("address", -1)) <= SERIAL_HANDLER_END
|
|
and (_is_send_builder_call(ins) or _has_ref_in_range(ins, TX_STAGING_START, TX_FRAME_END))
|
|
]
|
|
evidence = _dedupe_ints(period_evidence + countdown_evidence + pending_evidence + resend_send_evidence)
|
|
if not evidence and not resend_evidence:
|
|
return None
|
|
return {
|
|
"kind": "autonomous_periodic_resend_model_candidate",
|
|
"period_timer": {
|
|
"address": 0xF9C6,
|
|
"address_hex": _h16(0xF9C6),
|
|
"reload_value_candidate": 0x01F4,
|
|
"reload_value_hex": _h16(0x01F4),
|
|
"summary": "Candidate periodic report/heartbeat timer reload.",
|
|
"evidence_addresses": period_evidence,
|
|
"evidence_addresses_hex": _hlist(period_evidence),
|
|
},
|
|
"resend_countdown": {
|
|
"address": 0xF9C8,
|
|
"address_hex": _h16(0xF9C8),
|
|
"reload_value_candidate": 0x14,
|
|
"reload_value_hex": _h16(0x14, width=2),
|
|
"summary": "Candidate periodic resend countdown/retry spacing value.",
|
|
"evidence_addresses": countdown_evidence,
|
|
"evidence_addresses_hex": _hlist(countdown_evidence),
|
|
},
|
|
"pending_mask": {
|
|
"address": 0xFAA3,
|
|
"address_hex": _h16(0xFAA3),
|
|
"mask_candidate": 0x80,
|
|
"mask_hex": _h16(0x80, width=2),
|
|
"summary": "Candidate bit/mask that marks an autonomous report pending.",
|
|
"evidence_addresses": pending_evidence,
|
|
"evidence_addresses_hex": _hlist(pending_evidence),
|
|
},
|
|
"resend_path": {
|
|
"entry_label": "loc_BED5",
|
|
"entry_address": PERIODIC_RESEND_ENTRY,
|
|
"entry_address_hex": _h16(PERIODIC_RESEND_ENTRY),
|
|
"summary": "Candidate periodic resend path feeding the TX staging/send-builder flow.",
|
|
"evidence_addresses": _dedupe_ints(resend_send_evidence or resend_evidence),
|
|
"evidence_addresses_hex": _hlist(resend_send_evidence or resend_evidence),
|
|
},
|
|
"evidence_addresses": evidence,
|
|
"evidence_addresses_hex": _hlist(evidence),
|
|
"confidence": "candidate-medium" if evidence else "candidate-low",
|
|
"caveat": (
|
|
"Timer and resend roles are inferred from constants/state references around F9C6, "
|
|
"F9C8, FAA3, and loc_BED5; exact scheduling units remain candidate phrasing."
|
|
),
|
|
}
|
|
|
|
|
|
def _response_reads_current_value_table(response: Mapping[str, Any]) -> bool:
|
|
schema = response.get("schema")
|
|
if not isinstance(schema, Mapping):
|
|
return False
|
|
return any(
|
|
isinstance(item, Mapping)
|
|
and isinstance(item.get("source"), Mapping)
|
|
and item["source"].get("kind") == "table"
|
|
and item["source"].get("name_candidate") == "current_value_table_candidate"
|
|
for item in schema.get("bytes", [])
|
|
)
|
|
|
|
|
|
def _response_reads_rx_frame(response: Mapping[str, Any]) -> bool:
|
|
schema = response.get("schema")
|
|
if not isinstance(schema, Mapping):
|
|
return False
|
|
return any(
|
|
isinstance(item, Mapping)
|
|
and isinstance(item.get("source"), Mapping)
|
|
and item["source"].get("kind") == "rx_frame_byte"
|
|
for item in schema.get("bytes", [])
|
|
)
|
|
|
|
|
|
def _state_immediate_evidence(ordered: list[JsonObject], state_address: int, value: int) -> list[int]:
|
|
evidence = []
|
|
for ins in ordered:
|
|
if not _has_ref_in_range(ins, state_address, state_address):
|
|
continue
|
|
source, _destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
if _parse_immediate(source) == value:
|
|
evidence.append(int(ins["address"]))
|
|
return _dedupe_ints(evidence)
|
|
|
|
|
|
def _state_bit_evidence(ordered: list[JsonObject], state_address: int, bit: int) -> list[int]:
|
|
return _dedupe_ints(
|
|
int(ins["address"])
|
|
for ins in ordered
|
|
if _has_ref_in_range(ins, state_address, state_address)
|
|
and _bit_number_from_instruction(ins) == bit
|
|
)
|
|
|
|
|
|
def _send_builder_candidate(
|
|
ordered: list[JsonObject],
|
|
responses: list[JsonObject],
|
|
tx_candidate: Mapping[str, Any] | None,
|
|
) -> JsonObject:
|
|
copies = []
|
|
builder_body = [
|
|
ins
|
|
for ins in ordered
|
|
if SEND_BUILDER_ADDRESS <= int(ins.get("address", -1)) <= 0xBA83
|
|
]
|
|
for ins in builder_body:
|
|
source, destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
source_address = _first_address_in_range(ins, TX_STAGING_START, TX_STAGING_END, operand=source)
|
|
destination_address = _first_address_in_range(ins, TX_FRAME_START, TX_FRAME_END, operand=destination)
|
|
if source_address is None or destination_address is None:
|
|
continue
|
|
copies.append(
|
|
{
|
|
"instruction_address": int(ins["address"]),
|
|
"instruction_address_hex": _h16(int(ins["address"])),
|
|
"source_address": source_address,
|
|
"source_address_hex": _h16(source_address),
|
|
"destination_address": destination_address,
|
|
"destination_address_hex": _h16(destination_address),
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
)
|
|
|
|
call_addresses = [int(response["call_address"]) for response in responses]
|
|
evidence_addresses = _dedupe_ints(
|
|
[int(ins["address"]) for ins in builder_body if _has_ref_in_range(ins, TX_FRAME_START, TX_FRAME_END)]
|
|
+ call_addresses
|
|
)
|
|
return {
|
|
"kind": "tx_send_builder_candidate",
|
|
"label": SEND_BUILDER_LABEL,
|
|
"address": SEND_BUILDER_ADDRESS,
|
|
"address_hex": _h16(SEND_BUILDER_ADDRESS),
|
|
"staging_buffer_start": TX_STAGING_START,
|
|
"staging_buffer_start_hex": _h16(TX_STAGING_START),
|
|
"staging_buffer_end": TX_STAGING_END,
|
|
"staging_buffer_end_hex": _h16(TX_STAGING_END),
|
|
"tx_frame_start": TX_FRAME_START,
|
|
"tx_frame_start_hex": _h16(TX_FRAME_START),
|
|
"tx_frame_end": TX_FRAME_END,
|
|
"tx_frame_end_hex": _h16(TX_FRAME_END),
|
|
"checksum_address": TX_CHECKSUM_ADDRESS,
|
|
"checksum_address_hex": _h16(TX_CHECKSUM_ADDRESS),
|
|
"checksum_seed": CHECKSUM_SEED,
|
|
"checksum_seed_hex": _h16(CHECKSUM_SEED),
|
|
"staging_to_frame_copies": copies,
|
|
"response_call_addresses": call_addresses,
|
|
"response_call_addresses_hex": _hlist(call_addresses),
|
|
"serial_reconstruction_candidate_id": tx_candidate.get("id") if tx_candidate else None,
|
|
"evidence_addresses": evidence_addresses,
|
|
"evidence_addresses_hex": _hlist(evidence_addresses),
|
|
"confidence": "high" if copies and tx_candidate else "medium" if copies else "low",
|
|
"caveat": (
|
|
"loc_BA26 is treated as a send builder because it copies F850-F854 into the "
|
|
"evidence-supported TX frame and then starts SCI1 transmission."
|
|
),
|
|
}
|
|
|
|
|
|
def _top_level_evidence(
|
|
ordered: list[JsonObject],
|
|
dispatch: JsonObject | None,
|
|
responses: list[JsonObject],
|
|
rx_candidate: Mapping[str, Any] | None,
|
|
tx_candidate: Mapping[str, Any] | None,
|
|
) -> list[JsonObject]:
|
|
evidence: list[JsonObject] = []
|
|
if rx_candidate:
|
|
evidence.append(
|
|
{
|
|
"kind": "rx_frame_reconstruction_present",
|
|
"summary": "serial_reconstruction contains an evidence-supported SCI1 RX frame candidate",
|
|
"candidate_id": rx_candidate.get("id"),
|
|
}
|
|
)
|
|
if tx_candidate:
|
|
evidence.append(
|
|
{
|
|
"kind": "tx_frame_reconstruction_present",
|
|
"summary": "serial_reconstruction contains an evidence-supported SCI1 TX frame candidate",
|
|
"candidate_id": tx_candidate.get("id"),
|
|
}
|
|
)
|
|
if dispatch:
|
|
evidence.append(
|
|
{
|
|
"kind": "rx0_masked_command_dispatch",
|
|
"summary": "RX[0] is read, masked with 0x07, and compared against command values",
|
|
"addresses": dispatch.get("evidence_addresses", []),
|
|
"addresses_hex": dispatch.get("evidence_addresses_hex", []),
|
|
}
|
|
)
|
|
if responses:
|
|
addresses = _dedupe_ints(
|
|
[addr for response in responses for addr in response.get("evidence_addresses", [])]
|
|
)
|
|
evidence.append(
|
|
{
|
|
"kind": "responses_stage_f850_f854_before_send",
|
|
"summary": "F850-F854 writes are observed before calls to loc_BA26",
|
|
"addresses": addresses,
|
|
"addresses_hex": _hlist(addresses),
|
|
"response_count": len(responses),
|
|
}
|
|
)
|
|
tx_report_responses = [
|
|
response for response in responses
|
|
if response.get("call_address") == AUTONOMOUS_TX_REPORT_CALL
|
|
]
|
|
if tx_report_responses:
|
|
addresses = _dedupe_ints(
|
|
addr
|
|
for response in tx_report_responses
|
|
for addr in response.get("evidence_addresses", [])
|
|
if isinstance(addr, int)
|
|
)
|
|
evidence.append(
|
|
{
|
|
"kind": "bb43_autonomous_tx_report_path",
|
|
"summary": (
|
|
"BB43 stages a candidate device-to-host report before loc_BA26; this is "
|
|
"separate from RX command dispatch."
|
|
),
|
|
"addresses": addresses,
|
|
"addresses_hex": _hlist(addresses),
|
|
}
|
|
)
|
|
rx_payload_reads = [
|
|
int(ins["address"])
|
|
for ins in ordered
|
|
if any(_is_read_from_address(ins, address) for address in range(RX_FRAME_START + 1, RX_FRAME_START + 5))
|
|
]
|
|
if rx_payload_reads:
|
|
evidence.append(
|
|
{
|
|
"kind": "rx_payload_bytes_read",
|
|
"summary": "RX[1..4] are read in the command-processing region",
|
|
"addresses": _dedupe_ints(rx_payload_reads),
|
|
"addresses_hex": _hlist(rx_payload_reads),
|
|
}
|
|
)
|
|
return evidence
|
|
|
|
|
|
def _response_window(ordered: list[JsonObject], call_index: int) -> list[JsonObject]:
|
|
start = call_index
|
|
for index in range(call_index - 1, max(-1, call_index - 48), -1):
|
|
candidate = ordered[index]
|
|
mnemonic = str(candidate.get("mnemonic", "")).upper()
|
|
root = _mnemonic_root(mnemonic)
|
|
if root in {"RTS", "RTE"}:
|
|
break
|
|
if root in {
|
|
"BRA",
|
|
"BEQ",
|
|
"BNE",
|
|
"BCS",
|
|
"BCC",
|
|
"BHI",
|
|
"BLS",
|
|
"BGE",
|
|
"BLT",
|
|
"BGT",
|
|
"BLE",
|
|
"BMI",
|
|
"BPL",
|
|
"BVS",
|
|
"BVC",
|
|
}:
|
|
break
|
|
start = index
|
|
return ordered[start:call_index]
|
|
|
|
|
|
def _staging_writes(window: list[JsonObject]) -> list[JsonObject]:
|
|
writes: list[JsonObject] = []
|
|
for index, ins in enumerate(window):
|
|
touched = _written_addresses_in_range(ins, TX_STAGING_START, TX_STAGING_END)
|
|
if not touched:
|
|
continue
|
|
source, _destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
source_info = _source_info(window, index, source)
|
|
writes.append(
|
|
{
|
|
"instruction_address": int(ins["address"]),
|
|
"instruction_address_hex": _h16(int(ins["address"])),
|
|
"addresses": touched,
|
|
"addresses_hex": _hlist(touched),
|
|
"source_operand": source,
|
|
"source": source_info,
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
)
|
|
return writes
|
|
|
|
|
|
def _source_info(window: list[JsonObject], index: int, source: str) -> JsonObject:
|
|
return _resolve_source_info(window, index, source, [])
|
|
|
|
|
|
def _resolve_source_info(
|
|
window: list[JsonObject],
|
|
index: int,
|
|
source: str,
|
|
transforms: list[str],
|
|
) -> JsonObject:
|
|
immediate = _parse_immediate(source)
|
|
if immediate is not None:
|
|
output = {
|
|
"kind": "immediate",
|
|
"value": immediate,
|
|
"value_hex": _h16(immediate, width=2 if immediate <= 0xFF else 4),
|
|
}
|
|
if transforms:
|
|
output["transforms"] = transforms
|
|
return output
|
|
|
|
direct = _direct_source_info(window[index] if 0 <= index < len(window) else {}, source)
|
|
if direct is not None:
|
|
if transforms:
|
|
direct = dict(direct)
|
|
direct["transforms"] = transforms
|
|
return direct
|
|
|
|
source_upper = source.upper()
|
|
for prior_index in range(index - 1, max(-1, index - 12), -1):
|
|
prior = window[prior_index]
|
|
prior_source, prior_destination = _source_destination_operands(str(prior.get("operands", "")))
|
|
if prior_destination.upper() != source_upper:
|
|
continue
|
|
root = _mnemonic_root(str(prior.get("mnemonic", "")))
|
|
if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "TST"}:
|
|
continue
|
|
if root == "SWAP":
|
|
resolved = _resolve_source_info(window, prior_index, source, transforms + ["swap_bytes"])
|
|
elif root not in {"MOV:G", "MOV:S", "MOV:E", "MOV:I", "MOV:L", "MOV:F", "LDC"}:
|
|
resolved = {
|
|
"kind": "register_or_computed",
|
|
"operand": source,
|
|
"source_category": "computed",
|
|
"operation": root,
|
|
}
|
|
elif prior_source:
|
|
resolved = _resolve_source_info(window, prior_index, prior_source, transforms)
|
|
else:
|
|
resolved = {"kind": "register_or_computed", "operand": source}
|
|
resolved = dict(resolved)
|
|
resolved.setdefault("evidence_address", int(prior["address"]))
|
|
resolved.setdefault("evidence_address_hex", _h16(int(prior["address"])))
|
|
resolved.setdefault("instruction", str(prior.get("text", "")))
|
|
return resolved
|
|
return {
|
|
"kind": "register_or_computed",
|
|
"operand": source,
|
|
"source_category": "computed",
|
|
}
|
|
|
|
|
|
def _direct_source_info(ins: Mapping[str, Any], source: str) -> JsonObject | None:
|
|
rx_address = _first_address_in_range(ins, RX_FRAME_START, RX_FRAME_END, operand=source)
|
|
if rx_address is not None and _is_read_from_address(ins, rx_address):
|
|
width = _access_width(str(ins.get("mnemonic", "")))
|
|
offsets = [
|
|
address - RX_FRAME_START
|
|
for address in range(rx_address, min(rx_address + width - 1, RX_FRAME_END) + 1)
|
|
]
|
|
return {
|
|
"kind": "rx_frame_word" if len(offsets) > 1 else "rx_frame_byte",
|
|
"rx_offset": rx_address - RX_FRAME_START,
|
|
"rx_offsets": offsets,
|
|
"rx_address": rx_address,
|
|
"rx_address_hex": _h16(rx_address),
|
|
"evidence_address": int(ins["address"]) if isinstance(ins.get("address"), int) else None,
|
|
"evidence_address_hex": _h16(int(ins["address"])) if isinstance(ins.get("address"), int) else None,
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
|
|
tx_address = _first_address_in_range(ins, TX_FRAME_START, TX_FRAME_END, operand=source)
|
|
if tx_address is not None and _is_read_from_address(ins, tx_address):
|
|
width = _access_width(str(ins.get("mnemonic", "")))
|
|
offsets = [
|
|
address - TX_FRAME_START
|
|
for address in range(tx_address, min(tx_address + width - 1, TX_FRAME_END) + 1)
|
|
]
|
|
return {
|
|
"kind": "tx_frame_word" if len(offsets) > 1 else "tx_frame_byte",
|
|
"tx_offset": tx_address - TX_FRAME_START,
|
|
"tx_offsets": offsets,
|
|
"tx_address": tx_address,
|
|
"tx_address_hex": _h16(tx_address),
|
|
"evidence_address": int(ins["address"]) if isinstance(ins.get("address"), int) else None,
|
|
"evidence_address_hex": _h16(int(ins["address"])) if isinstance(ins.get("address"), int) else None,
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
|
|
table = _table_operand_candidate(source)
|
|
if table is not None:
|
|
output = dict(table)
|
|
output["kind"] = "table"
|
|
output["access_width"] = _access_width(str(ins.get("mnemonic", "")))
|
|
output["evidence_address"] = int(ins["address"]) if isinstance(ins.get("address"), int) else None
|
|
output["evidence_address_hex"] = _h16(int(ins["address"])) if isinstance(ins.get("address"), int) else None
|
|
output["instruction"] = str(ins.get("text", ""))
|
|
return output
|
|
|
|
return None
|
|
|
|
|
|
def _rx_reads(window: list[JsonObject], start: int, end: int) -> list[JsonObject]:
|
|
reads: list[JsonObject] = []
|
|
for ins in window:
|
|
for address in range(start, end + 1):
|
|
if not _is_read_from_address(ins, address):
|
|
continue
|
|
reads.append(
|
|
{
|
|
"instruction_address": int(ins["address"]),
|
|
"instruction_address_hex": _h16(int(ins["address"])),
|
|
"rx_offset": address - RX_FRAME_START,
|
|
"rx_address": address,
|
|
"rx_address_hex": _h16(address),
|
|
"instruction": str(ins.get("text", "")),
|
|
}
|
|
)
|
|
return reads
|
|
|
|
|
|
def _rx_reads_in_ranges(
|
|
ordered: list[JsonObject],
|
|
ranges: list[tuple[int, int]],
|
|
) -> list[JsonObject]:
|
|
reads: list[JsonObject] = []
|
|
for ins in ordered:
|
|
address = int(ins.get("address", -1))
|
|
if not any(start <= address <= end for start, end in ranges):
|
|
continue
|
|
reads.extend(_rx_reads([ins], RX_FRAME_START + 1, RX_FRAME_START + 4))
|
|
seen: set[tuple[int, int]] = set()
|
|
output: list[JsonObject] = []
|
|
for read in reads:
|
|
key = (int(read["instruction_address"]), int(read["rx_address"]))
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
output.append(read)
|
|
return output
|
|
|
|
|
|
def _response_in_ranges(response: Mapping[str, Any], ranges: list[tuple[int, int]]) -> bool:
|
|
call_address = int(response.get("call_address", -1))
|
|
return any(start <= call_address <= end for start, end in ranges)
|
|
|
|
|
|
def _addresses_in_ranges(
|
|
ordered: list[JsonObject],
|
|
ranges: list[tuple[int, int]],
|
|
start: int,
|
|
end: int,
|
|
) -> list[int]:
|
|
return _dedupe_ints(
|
|
int(ins["address"])
|
|
for ins in ordered
|
|
if start <= int(ins.get("address", -1)) <= end
|
|
and any(range_start <= int(ins.get("address", -1)) <= range_end for range_start, range_end in ranges)
|
|
)
|
|
|
|
|
|
def _table_accesses_in_ranges(
|
|
ordered: list[JsonObject],
|
|
ranges: list[tuple[int, int]],
|
|
) -> list[JsonObject]:
|
|
accesses: list[JsonObject] = []
|
|
for ins in ordered:
|
|
address = int(ins.get("address", -1))
|
|
if not any(start <= address <= end for start, end in ranges):
|
|
continue
|
|
for operand in _negative_indexed_operands(str(ins.get("operands", ""))):
|
|
offset = int(operand["negative_offset"])
|
|
if offset not in LOGICAL_TABLES:
|
|
continue
|
|
candidate = _table_operand_candidate(str(operand["operand"]))
|
|
if candidate is None:
|
|
continue
|
|
accesses.append(
|
|
{
|
|
"instruction_address": address,
|
|
"negative_offset": offset,
|
|
"access": _operand_access_kind(ins, str(operand["operand"])),
|
|
"candidate": candidate,
|
|
}
|
|
)
|
|
for direct_address, offset in DIRECT_TABLE_TO_LOGICAL_OFFSET.items():
|
|
if not _has_ref_in_range(ins, direct_address, direct_address):
|
|
continue
|
|
accesses.append(
|
|
{
|
|
"instruction_address": address,
|
|
"negative_offset": offset,
|
|
"access": _access_direction(ins, direct_address) or "read_write_candidate",
|
|
"candidate": DIRECT_TABLE_CANDIDATES[direct_address],
|
|
"direct_address": direct_address,
|
|
"direct_address_hex": _h16(direct_address),
|
|
}
|
|
)
|
|
return accesses
|
|
|
|
|
|
def _state_accesses_in_ranges(
|
|
ordered: list[JsonObject],
|
|
ranges: list[tuple[int, int]],
|
|
) -> list[JsonObject]:
|
|
accesses: list[JsonObject] = []
|
|
for ins in ordered:
|
|
address = int(ins.get("address", -1))
|
|
if not any(start <= address <= end for start, end in ranges):
|
|
continue
|
|
for state_address in STATE_VARIABLES:
|
|
if not _has_ref_in_range(ins, state_address, state_address):
|
|
continue
|
|
accesses.append(
|
|
{
|
|
"instruction_address": address,
|
|
"state_address": state_address,
|
|
"access": _access_direction(ins, state_address) or "read_write_candidate",
|
|
}
|
|
)
|
|
return accesses
|
|
|
|
|
|
def _table_access_addresses(
|
|
accesses: list[JsonObject],
|
|
negative_offset: int,
|
|
access_kind: str | None = None,
|
|
) -> list[int]:
|
|
return _dedupe_ints(
|
|
int(access["instruction_address"])
|
|
for access in accesses
|
|
if access.get("negative_offset") == negative_offset
|
|
and (access_kind is None or access.get("access") == access_kind)
|
|
)
|
|
|
|
|
|
def _state_access_addresses(accesses: list[JsonObject], address: int) -> list[int]:
|
|
return _dedupe_ints(
|
|
int(access["instruction_address"])
|
|
for access in accesses
|
|
if access.get("state_address") == address
|
|
)
|
|
|
|
|
|
def _handler_end(
|
|
ordered: list[JsonObject],
|
|
start: int,
|
|
handler_starts: list[int],
|
|
) -> int | None:
|
|
addresses = [int(ins["address"]) for ins in ordered]
|
|
try:
|
|
start_index = addresses.index(start)
|
|
except ValueError:
|
|
return None
|
|
|
|
later_starts = [candidate for candidate in handler_starts if candidate > start]
|
|
if later_starts:
|
|
next_start = min(later_starts)
|
|
previous = [address for address in addresses if start <= address < next_start]
|
|
return previous[-1] if previous else None
|
|
|
|
for ins in ordered[start_index:]:
|
|
mnemonic = str(ins.get("mnemonic", "")).upper()
|
|
if mnemonic in {"RTS", "RTE"}:
|
|
return int(ins["address"])
|
|
if mnemonic == "BRA" and SEND_BUILDER_ADDRESS not in _targets(ins):
|
|
targets = _targets(ins)
|
|
if targets and targets[0] >= 0xBE6D:
|
|
return int(ins["address"])
|
|
return None
|
|
|
|
|
|
def _find_prior_read(
|
|
ordered: list[JsonObject],
|
|
index: int,
|
|
address: int,
|
|
destination_register: str,
|
|
) -> JsonObject | None:
|
|
for candidate in reversed(ordered[max(0, index - 6) : index]):
|
|
if not _is_read_from_address(candidate, address):
|
|
continue
|
|
if _destination_operand(str(candidate.get("operands", ""))).upper() == destination_register.upper():
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def _is_send_builder_call(ins: Mapping[str, Any]) -> bool:
|
|
mnemonic = str(ins.get("mnemonic", "")).upper()
|
|
if mnemonic not in {"BSR", "JSR", "PJSR"}:
|
|
return False
|
|
if SEND_BUILDER_ADDRESS in _targets(ins):
|
|
return True
|
|
return SEND_BUILDER_LABEL.upper() in str(ins.get("operands", "")).upper()
|
|
|
|
|
|
def _written_addresses_in_range(ins: Mapping[str, Any], start: int, end: int) -> list[int]:
|
|
if not _has_ref_in_range(ins, start, end):
|
|
return []
|
|
source, destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
del source
|
|
base = _first_address_in_range(ins, start, end, operand=destination)
|
|
if base is None or not _is_write_to_address(ins, base):
|
|
return []
|
|
width = _access_width(str(ins.get("mnemonic", "")))
|
|
return [address for address in range(base, min(base + width - 1, end) + 1)]
|
|
|
|
|
|
def _read_addresses_in_range(ins: Mapping[str, Any], start: int, end: int) -> list[int]:
|
|
if not _has_ref_in_range(ins, start, end):
|
|
return []
|
|
source, _destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
base = _first_address_in_range(ins, start, end, operand=source)
|
|
if base is None or not _is_read_from_address(ins, base):
|
|
return []
|
|
width = _access_width(str(ins.get("mnemonic", "")))
|
|
return [address for address in range(base, min(base + width - 1, end) + 1)]
|
|
|
|
|
|
def _is_read_from_address(ins: Mapping[str, Any], address: int) -> bool:
|
|
source, destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
if _operand_mentions_address(source, address):
|
|
return True
|
|
if address not in _references(ins):
|
|
return False
|
|
if source.startswith("@") and not _operand_mentions_any_reference(destination, _references(ins)):
|
|
return True
|
|
return _access_direction(ins, address) == "read"
|
|
|
|
|
|
def _is_write_to_address(ins: Mapping[str, Any], address: int) -> bool:
|
|
_source, destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
if _operand_mentions_address(destination, address):
|
|
return _access_direction(ins, address) == "write"
|
|
if address not in _references(ins):
|
|
return False
|
|
return _access_direction(ins, address) == "write"
|
|
|
|
|
|
def _access_direction(ins: Mapping[str, Any], address: int) -> str | None:
|
|
root = _mnemonic_root(str(ins.get("mnemonic", "")))
|
|
if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST"}:
|
|
return "read"
|
|
if root in {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}:
|
|
return "write"
|
|
if root in {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}:
|
|
return "write"
|
|
if root in {"MOV:G", "MOV:S", "MOVTPE"}:
|
|
source, destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
if _operand_mentions_address(destination, address):
|
|
return "write"
|
|
if _operand_mentions_address(source, address):
|
|
return "read"
|
|
if address in _references(ins):
|
|
if destination.startswith("@") and not _operand_mentions_any_reference(source, _references(ins)):
|
|
return "write"
|
|
if source.startswith("@") and not _operand_mentions_any_reference(destination, _references(ins)):
|
|
return "read"
|
|
if root in {"MOV:L", "MOV:F"}:
|
|
return "read"
|
|
if root == "STC":
|
|
return "write"
|
|
if root == "LDC":
|
|
return "read"
|
|
return None
|
|
|
|
|
|
def _first_address_in_range(
|
|
ins: Mapping[str, Any],
|
|
start: int,
|
|
end: int,
|
|
*,
|
|
operand: str = "",
|
|
) -> int | None:
|
|
if operand:
|
|
for address in range(start, end + 1):
|
|
if _operand_mentions_address(operand, address):
|
|
return address
|
|
for address in _references(ins):
|
|
if start <= address <= end:
|
|
return address
|
|
return None
|
|
|
|
|
|
def _has_ref_in_range(ins: Mapping[str, Any], start: int, end: int) -> bool:
|
|
return any(start <= address <= end for address in _references(ins))
|
|
|
|
|
|
def _references(ins: Mapping[str, Any]) -> list[int]:
|
|
references = ins.get("references", [])
|
|
output: list[int] = []
|
|
if not isinstance(references, list):
|
|
return output
|
|
for reference in references:
|
|
if isinstance(reference, Mapping) and isinstance(reference.get("address"), int):
|
|
output.append(int(reference["address"]))
|
|
elif isinstance(reference, int):
|
|
output.append(reference)
|
|
return output
|
|
|
|
|
|
def _targets(ins: Mapping[str, Any]) -> list[int]:
|
|
targets = ins.get("targets", [])
|
|
if not isinstance(targets, list):
|
|
return []
|
|
return [int(target) for target in targets if isinstance(target, int)]
|
|
|
|
|
|
def _instruction_sequence(value: object) -> list[JsonObject]:
|
|
if isinstance(value, Mapping):
|
|
values: Iterable[Any] = value.values()
|
|
elif isinstance(value, list):
|
|
values = value
|
|
else:
|
|
values = []
|
|
return sorted(
|
|
[item for item in values if isinstance(item, dict) and isinstance(item.get("address"), int)],
|
|
key=lambda item: int(item["address"]),
|
|
)
|
|
|
|
|
|
def _serial_reconstruction(payload: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
serial = payload.get("serial_reconstruction")
|
|
return serial if isinstance(serial, Mapping) else {}
|
|
|
|
|
|
def _candidate_by_kind(serial: Mapping[str, Any], kind: str) -> Mapping[str, Any] | None:
|
|
candidates = serial.get("candidates")
|
|
if not isinstance(candidates, list):
|
|
return None
|
|
for candidate in candidates:
|
|
if isinstance(candidate, Mapping) and candidate.get("kind") == kind:
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def _source_destination_operands(operands: str) -> tuple[str, str]:
|
|
depth = 0
|
|
split_at: int | None = None
|
|
for index, char in enumerate(operands):
|
|
if char in "({":
|
|
depth += 1
|
|
elif char in ")}" and depth:
|
|
depth -= 1
|
|
elif char == "," and depth == 0:
|
|
split_at = index
|
|
if split_at is None:
|
|
operand = operands.strip()
|
|
return "", operand
|
|
return operands[:split_at].strip(), operands[split_at + 1 :].strip()
|
|
|
|
|
|
def _destination_operand(operands: str) -> str:
|
|
return _source_destination_operands(operands)[1]
|
|
|
|
|
|
def _immediate_source_value(operands: str) -> int | None:
|
|
source, _destination = _source_destination_operands(operands)
|
|
if not source.startswith("#"):
|
|
return None
|
|
return _parse_immediate(source)
|
|
|
|
|
|
def _parse_immediate(operand: str) -> int | None:
|
|
text = operand.strip()
|
|
if text.startswith("#"):
|
|
text = text[1:].strip()
|
|
try:
|
|
if text.upper().startswith("H'"):
|
|
return int(text[2:], 16) & 0xFFFF
|
|
if text.upper().startswith("0X"):
|
|
return int(text, 16) & 0xFFFF
|
|
if text.upper().startswith("$"):
|
|
return int(text[1:], 16) & 0xFFFF
|
|
return int(text, 10) & 0xFFFF
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _negative_indexed_operands(operands: str) -> list[JsonObject]:
|
|
matches: list[JsonObject] = []
|
|
for match in re.finditer(r"@\(-H'([0-9A-Fa-f]+),\s*(R[0-7])\)", operands):
|
|
offset = int(match.group(1), 16) & 0xFFFF
|
|
matches.append(
|
|
{
|
|
"operand": match.group(0),
|
|
"negative_offset": offset,
|
|
"negative_offset_hex": _h16(offset),
|
|
"index_register": match.group(2).upper(),
|
|
}
|
|
)
|
|
return matches
|
|
|
|
|
|
def _table_operand_candidate(operand: str) -> JsonObject | None:
|
|
operands = _negative_indexed_operands(operand)
|
|
if not operands:
|
|
return None
|
|
first = operands[0]
|
|
offset = int(first["negative_offset"])
|
|
metadata = LOGICAL_TABLES.get(offset)
|
|
if metadata is None:
|
|
return None
|
|
logical_base = int(metadata["logical_base_address"])
|
|
return {
|
|
"name_candidate": metadata["name_candidate"],
|
|
"element_candidate": metadata["element_candidate"],
|
|
"logical_base_address": logical_base,
|
|
"logical_base_address_hex": _h16(logical_base),
|
|
"negative_offset": offset,
|
|
"negative_offset_hex": _h16(offset),
|
|
"index_register": first["index_register"],
|
|
"operand": first["operand"],
|
|
}
|
|
|
|
|
|
def _operand_access_kind(ins: Mapping[str, Any], operand: str) -> str:
|
|
root = _mnemonic_root(str(ins.get("mnemonic", "")))
|
|
source, destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "TST"}:
|
|
return "read"
|
|
if root in {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}:
|
|
return "write"
|
|
if operand in destination and operand not in source:
|
|
return "write"
|
|
if operand in source and operand not in destination:
|
|
return "read"
|
|
if root in {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}:
|
|
return "write"
|
|
return "read_write_candidate"
|
|
|
|
|
|
def _bit_number_from_instruction(ins: Mapping[str, Any]) -> int | None:
|
|
root = _mnemonic_root(str(ins.get("mnemonic", "")))
|
|
if root not in {"BCLR", "BNOT", "BSET", "BTST"}:
|
|
return None
|
|
source, _destination = _source_destination_operands(str(ins.get("operands", "")))
|
|
value = _parse_immediate(source)
|
|
if value is None:
|
|
return None
|
|
return value & 0x0F
|
|
|
|
|
|
def _operand_mentions_any_reference(operand: str, references: list[int]) -> bool:
|
|
return any(_operand_mentions_address(operand, address) for address in references)
|
|
|
|
|
|
def _operand_mentions_address(operand: str, address: int) -> bool:
|
|
operand_upper = operand.upper().replace(" ", "")
|
|
names = {
|
|
TX_STAGING_START: ("TX_STAGING",),
|
|
TX_FRAME_START: ("TX_FRAME",),
|
|
TX_CHECKSUM_ADDRESS: ("TX_CHECKSUM",),
|
|
RX_FRAME_START: ("RX_FRAME",),
|
|
RX_CHECKSUM_ADDRESS: ("RX_CHECKSUM",),
|
|
}
|
|
if any(name in operand_upper for name in names.get(address, ())):
|
|
return True
|
|
negative = (0x10000 - address) & 0xFFFF
|
|
return (
|
|
f"H'{address:04X}" in operand_upper
|
|
or f"0X{address:04X}" in operand_upper
|
|
or f"${address:04X}" in operand_upper
|
|
or f"-H'{negative:04X}" in operand_upper
|
|
or f"-0X{negative:04X}" in operand_upper
|
|
or f"-${negative:04X}" in operand_upper
|
|
)
|
|
|
|
|
|
def _mnemonic_root(mnemonic: str) -> str:
|
|
return mnemonic.rsplit(".", 1)[0].upper()
|
|
|
|
|
|
def _access_width(mnemonic: str) -> int:
|
|
upper = mnemonic.upper()
|
|
if upper.endswith(".L"):
|
|
return 4
|
|
if upper.endswith(".W"):
|
|
return 2
|
|
return 1
|
|
|
|
|
|
def _confidence_score(
|
|
frame_supported: bool,
|
|
dispatch: JsonObject | None,
|
|
responses: list[JsonObject],
|
|
commands: list[JsonObject],
|
|
) -> float:
|
|
score = 0.2
|
|
if frame_supported:
|
|
score += 0.25
|
|
if dispatch:
|
|
score += 0.2
|
|
if responses:
|
|
score += min(0.2, 0.04 * len(responses))
|
|
if commands:
|
|
score += min(0.15, 0.02 * len(commands))
|
|
return round(min(score, 0.9), 2)
|
|
|
|
|
|
def _confidence_label(score: float) -> str:
|
|
if score >= 0.75:
|
|
return "medium-high"
|
|
if score >= 0.5:
|
|
return "medium"
|
|
return "low"
|
|
|
|
|
|
def _dedupe_ints(values: Iterable[int]) -> list[int]:
|
|
seen: set[int] = set()
|
|
output: list[int] = []
|
|
for value in values:
|
|
if value in seen:
|
|
continue
|
|
seen.add(value)
|
|
output.append(value)
|
|
return output
|
|
|
|
|
|
def _dedupe_strings(values: Iterable[str]) -> list[str]:
|
|
seen: set[str] = set()
|
|
output: list[str] = []
|
|
for value in values:
|
|
if value in seen:
|
|
continue
|
|
seen.add(value)
|
|
output.append(value)
|
|
return output
|
|
|
|
|
|
def _hlist(values: Iterable[int]) -> list[str]:
|
|
return [_h16(value) for value in _dedupe_ints(values)]
|
|
|
|
|
|
def _h16(value: int, *, width: int = 4) -> str:
|
|
return f"H'{value & 0xFFFF:0{width}X}"
|
|
|
|
|
|
__all__ = ["analyze_serial_semantics"]
|