1
0
Files
h8-536-decoder/h8536/serial_semantics.py
2026-05-25 16:05:45 +10:00

1263 lines
48 KiB
Python

from __future__ import annotations
from collections.abc import Iterable, Mapping
from typing import Any
# Loose JSON-style object used for every result dictionary in this module.
JsonObject = dict[str, Any]

# RX frame buffer: six bytes at F860..F865 (inclusive); the last byte is
# treated as the checksum byte.
RX_FRAME_START = 0xF860
RX_FRAME_END = 0xF865
RX_CHECKSUM_ADDRESS = RX_FRAME_END
RX_FRAME_LENGTH = 6

# TX staging buffer: five bytes at F850..F854 (inclusive) that handlers write
# before calling the send builder.
TX_STAGING_START = 0xF850
TX_STAGING_END = 0xF854
TX_STAGING_LENGTH = 5

# TX frame buffer at F858..F85D (inclusive); the last byte is treated as the
# checksum byte.
TX_FRAME_START = 0xF858
TX_FRAME_END = 0xF85D
TX_CHECKSUM_ADDRESS = TX_FRAME_END

# Routine treated as the "send builder" (copies staging bytes into the TX frame).
SEND_BUILDER_ADDRESS = 0xBA26
SEND_BUILDER_LABEL = "loc_BA26"

# Routine treated as the logical index decoder candidate.
INDEX_DECODER_ADDRESS = 0x622B
INDEX_DECODER_LABEL = "loc_622B"

# Seed reported for the XOR checksum over frame bytes 0..4.
CHECKSUM_SEED = 0x5A
def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
    """Infer conservative SCI1 frame/command semantics from decompiler JSON.

    Args:
        payload: Decompiler output. The ``"instructions"`` entry is turned into
            an ordered instruction list, and the whole payload is handed to the
            serial-reconstruction lookup to find the RX and TX frame candidates.

    Returns:
        A ``serial_semantics`` object. When either the RX or the TX
        reconstruction candidate is missing, an "empty" result with
        ``confidence == "low"`` is returned. Otherwise the result carries a
        single ``protocol_semantics`` entry and mirrors its most used parts at
        the top level. Both result shapes expose the same top-level keys.
    """
    ordered = _instruction_sequence(payload.get("instructions"))
    reconstruction = _serial_reconstruction(payload)
    rx_candidate = _candidate_by_kind(reconstruction, "candidate_sci1_rx_frame")
    tx_candidate = _candidate_by_kind(reconstruction, "candidate_sci1_tx_frame")
    frame_supported = bool(rx_candidate and tx_candidate)
    if not frame_supported:
        return {
            "kind": "serial_semantics",
            "protocol_semantics": [],
            "fields": [],
            "command_dispatch": None,
            "commands": [],
            "response_candidates": [],
            # Present (as None) so both return shapes expose the same keys.
            "send_builder": None,
            "confidence": "low",
            "confidence_score": 0.0,
            "caveat": "No protocol semantics are emitted without both RX and TX serial reconstruction candidates.",
        }

    dispatch = _find_command_dispatch(ordered)
    responses = _response_candidates(ordered)
    commands = _command_candidates(ordered, dispatch, responses)
    fields = _field_candidates(ordered, dispatch, responses)
    send_builder = _send_builder_candidate(ordered, responses, tx_candidate)
    evidence = _top_level_evidence(ordered, dispatch, responses, rx_candidate, tx_candidate)
    confidence_score = _confidence_score(frame_supported, dispatch, responses, commands)

    protocol = {
        "kind": "serial_semantics",
        "scope": "evidence_supported_sci1_6_byte_frame",
        "confidence": _confidence_label(confidence_score),
        "confidence_score": confidence_score,
        "caveat": (
            "Semantic names are candidates only. The analyzer reports byte roles, command values, "
            "dispatch targets, and response staging patterns observed in code; it does not prove "
            "source-level intent or protocol documentation."
        ),
        "frame_candidate": {
            "channel": "SCI1",
            "rx_frame_start": RX_FRAME_START,
            "rx_frame_start_hex": _h16(RX_FRAME_START),
            "rx_frame_end": RX_FRAME_END,
            "rx_frame_end_hex": _h16(RX_FRAME_END),
            "tx_staging_start": TX_STAGING_START,
            "tx_staging_start_hex": _h16(TX_STAGING_START),
            "tx_staging_end": TX_STAGING_END,
            "tx_staging_end_hex": _h16(TX_STAGING_END),
            "tx_frame_start": TX_FRAME_START,
            "tx_frame_start_hex": _h16(TX_FRAME_START),
            "tx_frame_end": TX_FRAME_END,
            "tx_frame_end_hex": _h16(TX_FRAME_END),
            "frame_length": RX_FRAME_LENGTH,
            "tx_staging_length": TX_STAGING_LENGTH,
            "checksum_seed": CHECKSUM_SEED,
            "checksum_seed_hex": _h16(CHECKSUM_SEED),
            "serial_reconstruction_supported": frame_supported,
            "rx_reconstruction_candidate_id": rx_candidate.get("id") if rx_candidate else None,
            "tx_reconstruction_candidate_id": tx_candidate.get("id") if tx_candidate else None,
        },
        "byte_layout": _byte_layout(),
        "fields": fields,
        "command_dispatch": dispatch,
        "commands": commands,
        "index_decoder": _index_decoder_candidate(ordered),
        "send_builder": send_builder,
        "response_candidates": responses,
        "rx_fields": _rx_field_candidates(ordered, dispatch),
        "response_builders": _response_builder_aliases(responses),
        "evidence": evidence,
    }
    # The top-level keys alias the same objects stored inside ``protocol``.
    return {
        "kind": "serial_semantics",
        "protocol_semantics": [protocol],
        "fields": protocol["fields"],
        "command_dispatch": protocol["command_dispatch"],
        "commands": protocol["commands"],
        "response_candidates": protocol["response_candidates"],
        "send_builder": protocol["send_builder"],
        "confidence": protocol["confidence"],
        "confidence_score": protocol["confidence_score"],
        "caveat": protocol["caveat"],
    }
def _field_candidates(
    ordered: list[JsonObject],
    dispatch: JsonObject | None,
    responses: list[JsonObject],
) -> list[JsonObject]:
    """Describe each RX frame byte and each TX staging byte as a field candidate.

    Args:
        ordered: Instruction objects in program order.
        dispatch: Command dispatch candidate, or ``None`` when none was found.
        responses: Response staging candidates; their ``writes`` supply the
            evidence for the TX staging fields.

    Returns:
        RX frame field candidates (one per byte of the RX frame) followed by
        TX staging field candidates (one per byte of the staging buffer).
    """
    # TX staging address -> instruction addresses that write it.
    staging_writers: dict[int, list[int]] = {}
    for response in responses:
        for write in response.get("writes", []):
            if not isinstance(write, Mapping):
                continue
            for target in write.get("addresses", []):
                if isinstance(target, int):
                    staging_writers.setdefault(target, []).append(int(write["instruction_address"]))

    rx_span = range(RX_FRAME_START, RX_FRAME_END + 1)
    readers = {}
    for rx_address in rx_span:
        readers[rx_address] = [
            ins["address"] for ins in ordered if _is_read_from_address(ins, rx_address)
        ]
    writers = {}
    for rx_address in rx_span:
        writers[rx_address] = [
            ins["address"] for ins in ordered if _is_write_to_address(ins, rx_address)
        ]

    dispatch_addresses = set(dispatch.get("evidence_addresses", []) if dispatch else [])

    described: list[JsonObject] = []
    for offset, rx_address in enumerate(rx_span):
        if offset == 0:
            role = "command_selector_candidate"
            caveat = "RX[0] is masked with 0x07 before command comparisons."
        elif rx_address == RX_CHECKSUM_ADDRESS:
            role = "checksum_byte_candidate"
            caveat = "RX[5] is compared with a checksum over RX[0..4]."
        else:
            role = "payload_byte_candidate"
            caveat = "Role is inferred from reads in command processing."

        # Dispatch evidence is only attached to the command selector byte.
        dispatch_extra = list(dispatch_addresses) if offset == 0 else []
        raw_evidence = readers[rx_address] + writers[rx_address] + dispatch_extra
        described.append(
            {
                "id": f"rx_{offset}",
                "kind": "rx_frame_field_candidate",
                "offset": offset,
                "address": rx_address,
                "address_hex": _h16(rx_address),
                "role_candidate": role,
                "evidence_addresses": _dedupe_ints(raw_evidence),
                "evidence_addresses_hex": _hlist(raw_evidence),
                "read_count": len(readers[rx_address]),
                "write_count": len(writers[rx_address]),
                "confidence": "medium" if readers[rx_address] else "low",
                "caveat": caveat,
            }
        )

    staging_caveat = (
        "This byte is staged before calls to loc_BA26; the analyzer does not infer "
        "a stable field name beyond response position."
    )
    for offset, staging_address in enumerate(range(TX_STAGING_START, TX_STAGING_END + 1)):
        write_addresses = _dedupe_ints(staging_writers.get(staging_address, []))
        described.append(
            {
                "id": f"tx_staging_{offset}",
                "kind": "tx_staging_field_candidate",
                "offset": offset,
                "address": staging_address,
                "address_hex": _h16(staging_address),
                "role_candidate": "response_staging_byte_candidate",
                "evidence_addresses": write_addresses,
                "evidence_addresses_hex": _hlist(write_addresses),
                "write_count": len(write_addresses),
                "confidence": "medium" if write_addresses else "low",
                "caveat": staging_caveat,
            }
        )
    return described
# NOTE(review): a second ``def _rx_field_candidates`` appears later in this
# module with the same signature. Because the later definition rebinds the
# name at import time, this earlier one is never the one that gets called.
# Decide which variant is intended and remove the other.
def _rx_field_candidates(
    ordered: list[JsonObject],
    dispatch: JsonObject | None,
) -> list[JsonObject]:
    """Name each RX frame byte from the way instructions read it.

    Args:
        ordered: Instruction objects in program order.
        dispatch: Command dispatch candidate, or ``None``.

    Returns:
        One field object per RX frame byte, in offset order.
    """
    # RX address -> addresses of instructions whose read covers that byte.
    read_map = {
        address: [
            int(ins["address"])
            for ins in ordered
            if address in _read_addresses_in_range(ins, RX_FRAME_START, RX_FRAME_END)
        ]
        for address in range(RX_FRAME_START, RX_FRAME_END + 1)
    }
    fields: list[JsonObject] = []
    for offset, address in enumerate(range(RX_FRAME_START, RX_FRAME_END + 1)):
        if offset == 0 and dispatch:
            # Byte 0 is only named as the command selector when a dispatch was found.
            name = "command_low3"
            confidence = "candidate-medium"
            mask = dispatch.get("mask")
            evidence = _dedupe_ints(read_map[address] + dispatch.get("evidence_addresses", []))
        elif offset in {1, 2}:
            name = "likely_id_or_index"
            confidence = "candidate-low"
            mask = None
            evidence = read_map[address]
        elif offset in {3, 4}:
            name = "likely_value"
            confidence = "candidate-low"
            mask = None
            evidence = read_map[address]
        else:
            # Reached for offset 5, and also for offset 0 when there is no dispatch.
            name = "checksum"
            confidence = "candidate-medium"
            mask = None
            evidence = read_map[address]
        field = {
            "offset": offset,
            "field": f"byte{offset}",
            "name": name,
            "address": address,
            "address_hex": _h16(address),
            "confidence": confidence,
            "caveat": "Field name is inferred from access pattern and remains a candidate.",
            "evidence_addresses": _dedupe_ints(evidence),
            "evidence_addresses_hex": _hlist(evidence),
        }
        if mask is not None:
            field["mask"] = mask
            field["mask_hex"] = _h16(int(mask), width=2)
        fields.append(field)
    return fields
def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None:
    """Locate the best "read RX[0], AND 0x07, compare/branch" dispatch candidate.

    Every ``AND`` with immediate 0x07 whose destination register was recently
    loaded from the first RX frame byte is considered. The candidate with the
    most compare/branch pairs wins; on a tie the earliest one is kept.

    Args:
        ordered: Instruction objects in program order.

    Returns:
        The winning dispatch candidate, or ``None`` when no candidate exists.
        Comparisons of the winner whose handler address is a known instruction
        address additionally get a ``handler_start_index`` entry.
    """
    position_of = {
        int(entry["address"]): position
        for position, entry in enumerate(ordered)
        if "address" in entry
    }
    winner: JsonObject | None = None

    for position, mask_ins in enumerate(ordered):
        if _mnemonic_root(mask_ins.get("mnemonic", "")) != "AND":
            continue
        operands = str(mask_ins.get("operands", ""))
        if _immediate_source_value(operands) != 0x07:
            continue
        _unused_source, selector_reg = _source_destination_operands(operands)
        if not selector_reg:
            continue
        read = _find_prior_read(ordered, position, RX_FRAME_START, selector_reg)
        if read is None:
            continue

        comparisons = _dispatch_comparisons(ordered, position + 1, selector_reg)
        command_values = sorted({int(entry["command_value"]) for entry in comparisons})
        read_address = int(read["address"])
        mask_address = int(mask_ins["address"])

        cases = []
        for entry in comparisons:
            cases.append(
                {
                    "value": int(entry["command_value"]),
                    "value_hex": entry["command_value_hex"],
                    "target": int(entry["handler_start"]),
                    "target_hex": entry["handler_start_hex"],
                    "compare_address": entry["compare_address"],
                    "branch_address": entry["branch_address"],
                }
            )

        comparison_evidence = []
        for entry in comparisons:
            comparison_evidence.extend(entry["evidence_addresses"])

        candidate = {
            "kind": "command_dispatch_candidate",
            "selector": "rx0_low3_bits",
            "field": "command_low3",
            "rx_offset": 0,
            "rx_address": RX_FRAME_START,
            "rx_address_hex": _h16(RX_FRAME_START),
            "source_address": RX_FRAME_START,
            "source_address_hex": _h16(RX_FRAME_START),
            "source_field": "byte0",
            "mask": 0x07,
            "mask_hex": _h16(0x07),
            "selector_register": selector_reg,
            "read_address": read_address,
            "read_address_hex": _h16(read_address),
            "mask_address": mask_address,
            "mask_address_hex": _h16(mask_address),
            "command_values": command_values,
            "command_values_hex": [_h16(value, width=2) for value in command_values],
            "comparisons": comparisons,
            "cases": cases,
            "evidence_addresses": _dedupe_ints([read_address, mask_address] + comparison_evidence),
            "confidence": "medium",
            "caveat": (
                "Dispatch is inferred from a read of RX[0], an AND 0x07 mask, and nearby "
                "compare/branch pairs. Gating state around the dispatch may affect reachability."
            ),
        }
        candidate["evidence_addresses_hex"] = _hlist(candidate["evidence_addresses"])

        # Strictly-greater comparison keeps the earliest candidate on ties.
        if winner is None or len(comparisons) > len(winner["comparisons"]):
            winner = candidate

    if winner:
        for entry in winner["comparisons"]:
            handler = entry.get("handler_start")
            if isinstance(handler, int) and handler in position_of:
                entry["handler_start_index"] = position_of[handler]
    return winner
def _dispatch_comparisons(
    ordered: list[JsonObject],
    start_index: int,
    selector_reg: str,
) -> list[JsonObject]:
    """Collect compare/BEQ pairs on the selector register after the mask.

    Scans at most 96 instruction slots from ``start_index`` (never the final
    instruction, so a following branch always exists) and stops at the first
    instruction whose address is 0xBE70 or above.

    Args:
        ordered: Instruction objects in program order.
        start_index: Index of the first instruction to inspect.
        selector_reg: Register holding the masked command selector.

    Returns:
        One comparison object per compare of the selector register against an
        immediate in 0..7 that is directly followed by a ``BEQ`` with a target.
    """
    compare_roots = {"CMP", "CMP:E", "CMP:G", "CMP:I"}
    stop = min(len(ordered) - 1, start_index + 96)
    found: list[JsonObject] = []

    for position in range(start_index, stop):
        compare = ordered[position]
        compare_address = int(compare.get("address", -1))
        if compare_address >= 0xBE70:
            break
        if _mnemonic_root(str(compare.get("mnemonic", ""))) not in compare_roots:
            continue
        operands = str(compare.get("operands", ""))
        if _destination_operand(operands).upper() != selector_reg.upper():
            continue
        value = _immediate_source_value(operands)
        if value is None or not (0 <= value <= 7):
            continue

        branch = ordered[position + 1]
        if str(branch.get("mnemonic", "")).upper() != "BEQ":
            continue
        targets = _targets(branch)
        if not targets:
            continue

        branch_address = int(branch["address"])
        handler = int(targets[0])
        pair = [compare_address, branch_address]
        found.append(
            {
                "command_value": value,
                "command_value_hex": _h16(value, width=2),
                "compare_address": compare_address,
                "compare_address_hex": _h16(compare_address),
                "branch_address": branch_address,
                "branch_address_hex": _h16(branch_address),
                "handler_start": handler,
                "handler_start_hex": _h16(handler),
                "evidence_addresses": pair,
                "evidence_addresses_hex": _hlist([compare_address, branch_address]),
            }
        )
    return found
def _command_candidates(
    ordered: list[JsonObject],
    dispatch: JsonObject | None,
    responses: list[JsonObject],
) -> list[JsonObject]:
    """Build one command candidate per dispatched command value.

    Args:
        ordered: Instruction objects in program order.
        dispatch: Command dispatch candidate, or ``None``.
        responses: Response staging candidates.

    Returns:
        Command candidates sorted by command value; empty without a dispatch.
        A command reached by several compare/branch pairs lists each as a
        handler alternative, and its single ``handler_start`` / ``handler_end``
        are only set when all alternatives agree.
    """
    if not dispatch:
        return []

    comparisons = [
        entry for entry in dispatch.get("comparisons", []) if isinstance(entry, Mapping)
    ]
    starts = sorted({int(entry["handler_start"]) for entry in comparisons if "handler_start" in entry})
    end_of: dict[int, Any] = {}
    for handler_start in starts:
        end_of[handler_start] = _handler_end(ordered, handler_start, starts)

    by_value: dict[int, JsonObject] = {}
    for comparison in comparisons:
        value = int(comparison["command_value"])
        handler_start = int(comparison["handler_start"])
        handler_end = end_of.get(handler_start)

        fresh = {
            "kind": "command_candidate",
            "command_value": value,
            "command_value_hex": _h16(value, width=2),
            "name_candidate": _command_name_candidate(value),
            "summary": _command_summary(value),
            "handler_alternatives": [],
            "evidence_addresses": [],
            "response_candidates": [],
            "rx_reads": [],
            "confidence": "medium",
            "caveat": (
                "Command value and handler range are inferred from compare/BEQ dispatch. "
                "No command name or intent is asserted."
            ),
        }
        command = by_value.setdefault(value, fresh)

        alternative = {
            "handler_start": handler_start,
            "handler_start_hex": _h16(handler_start),
            "handler_end": handler_end,
            "handler_end_hex": _h16(handler_end) if handler_end is not None else None,
            "dispatch_compare_address": comparison["compare_address"],
            "dispatch_compare_address_hex": comparison["compare_address_hex"],
            "dispatch_branch_address": comparison["branch_address"],
            "dispatch_branch_address_hex": comparison["branch_address_hex"],
        }
        if alternative not in command["handler_alternatives"]:
            command["handler_alternatives"].append(alternative)
        # The first two dispatch evidence entries are the RX[0] read and the mask.
        command["evidence_addresses"].extend(dispatch.get("evidence_addresses", [])[:2])
        command["evidence_addresses"].extend(comparison.get("evidence_addresses", []))

    for command in by_value.values():
        alternatives = command["handler_alternatives"]
        unique_starts = _dedupe_ints(
            alt["handler_start"] for alt in alternatives if isinstance(alt["handler_start"], int)
        )
        unique_ends = _dedupe_ints(
            alt["handler_end"] for alt in alternatives if isinstance(alt["handler_end"], int)
        )
        single_start = len(unique_starts) == 1
        single_end = len(unique_ends) == 1
        command["handler_start"] = unique_starts[0] if single_start else None
        command["handler_start_hex"] = _h16(unique_starts[0]) if single_start else None
        command["handler_end"] = unique_ends[0] if single_end else None
        command["handler_end_hex"] = _h16(unique_ends[0]) if single_end else None

        # Only alternatives with a resolved end contribute an address range.
        spans = [
            (alt["handler_start"], alt["handler_end"])
            for alt in alternatives
            if isinstance(alt["handler_end"], int)
        ]
        command["rx_reads"] = _rx_reads_in_ranges(ordered, spans)

        matched_ids = []
        for response in responses:
            if _response_in_ranges(response, spans):
                matched_ids.append(response["id"])
        command["response_candidates"] = matched_ids

        response_evidence = []
        for response in responses:
            if response["id"] in matched_ids:
                response_evidence.extend(response.get("evidence_addresses", []))
        command["evidence_addresses"] = _dedupe_ints(command["evidence_addresses"] + response_evidence)
        command["evidence_addresses_hex"] = _hlist(command["evidence_addresses"])

    return [by_value[value] for value in sorted(by_value)]
def _byte_layout() -> list[JsonObject]:
    """Return the static six-entry description of the frame byte layout.

    Each entry pairs an RX frame address with the TX staging address at the
    same offset. The checksum byte (offset 5) has no staging address.
    """
    # (rx_address, tx_staging_address, name_candidate, semantic, confidence)
    rows = (
        (
            RX_FRAME_START,
            TX_STAGING_START,
            "op_flags",
            "low three bits select a command; upper bits are preserved or gated in some paths",
            "medium-high",
        ),
        (
            RX_FRAME_START + 1,
            TX_STAGING_START + 1,
            "addr_page_flags",
            "candidate high/page byte for logical point/index; bit 7 is tested as a control flag",
            "medium",
        ),
        (
            RX_FRAME_START + 2,
            TX_STAGING_START + 2,
            "addr_offset",
            "candidate low/offset byte for logical point/index",
            "medium",
        ),
        (
            RX_FRAME_START + 3,
            TX_STAGING_START + 3,
            "value_hi",
            "candidate high byte of a word value",
            "medium",
        ),
        (
            RX_FRAME_START + 4,
            TX_STAGING_START + 4,
            "value_lo",
            "candidate low byte of a word value",
            "medium",
        ),
        (
            RX_CHECKSUM_ADDRESS,
            None,
            "checksum",
            "0x5A-seeded XOR of bytes 0..4",
            "high",
        ),
    )
    layout: list[JsonObject] = []
    for offset, (rx_address, staging_address, name, semantic, confidence) in enumerate(rows):
        layout.append(
            {
                "offset": offset,
                "rx_address": rx_address,
                "tx_staging_address": staging_address,
                "name_candidate": name,
                "semantic": semantic,
                "confidence": confidence,
            }
        )
    return layout
def _command_name_candidate(value: int) -> str:
return {
0x00: "set_value_acked",
0x01: "read_value",
0x02: "clear_or_abort",
0x04: "set_value_no_immediate_reply",
0x05: "ack_or_clear_pending",
0x06: "set_secondary_value",
0x07: "retransmit_or_error_reply",
}.get(value, f"command_{value:02X}")
def _command_summary(value: int) -> str:
return {
0x00: "candidate write of RX[3:4] into primary/current tables, followed by a response",
0x01: "candidate read from the primary table, followed by a response carrying the value",
0x02: "candidate clear/abort path with no immediate response builder",
0x04: "candidate write/update path that stores a value without an immediate serial response",
0x05: "candidate pending/event acknowledgement path",
0x06: "candidate secondary-table value write path",
0x07: "candidate retransmit/NAK-style path; error handling also builds command 0x07 responses",
}.get(value, "candidate command semantics are unknown")
def _index_decoder_candidate(ordered: list[JsonObject]) -> JsonObject | None:
    """Report the index decoder routine when at least one call to it is seen.

    A call is a ``BSR``/``JSR``/``PJSR`` whose targets include the decoder
    address or whose operand text mentions the decoder label.

    Args:
        ordered: Instruction objects in program order.

    Returns:
        The decoder candidate with the calling instruction addresses as
        evidence, or ``None`` when no call exists. The page/offset mapping in
        the result is a fixed table, not derived from ``ordered``.
    """
    call_roots = {"BSR", "JSR", "PJSR"}
    label_upper = INDEX_DECODER_LABEL.upper()
    callers = []
    for ins in ordered:
        if _mnemonic_root(str(ins.get("mnemonic", ""))) not in call_roots:
            continue
        if INDEX_DECODER_ADDRESS in _targets(ins) or label_upper in str(ins.get("operands", "")).upper():
            callers.append(ins)
    if not callers:
        return None

    evidence_addresses = [int(ins["address"]) for ins in callers]
    mapping_candidate = [
        {"page": 0, "offset_range": "0x00-0x7F", "index_range": "0x000-0x07F"},
        {"page": 1, "offset_range": "0x00-0xFF", "index_range": "0x080-0x17F"},
        {"page": 2, "offset_range": "0x00-0x7F", "index_range": "0x180-0x1FF"},
        {"page": "other/overflow", "index": "0x1FF"},
    ]
    return {
        "kind": "logical_index_decoder_candidate",
        "label": INDEX_DECODER_LABEL,
        "address": INDEX_DECODER_ADDRESS,
        "address_hex": _h16(INDEX_DECODER_ADDRESS),
        "input_fields": ["addr_page_flags", "addr_offset"],
        "output_register": "R5",
        "post_scale_register": "R4",
        "post_scale": "R4 = R5 << 1",
        "mapping_candidate": mapping_candidate,
        "evidence_addresses": evidence_addresses,
        "evidence_addresses_hex": _hlist(evidence_addresses),
        "confidence": "medium",
        "caveat": (
            "Mapping is inferred from loc_622B behavior and the nearby R4 = R5 << 1 table-index use."
        ),
    }
def _response_candidates(ordered: list[JsonObject]) -> list[JsonObject]:
    """Find send-builder calls that are preceded by TX staging writes.

    For each call to the send builder, the instructions in the preceding
    response window are inspected. Calls without any staging write in that
    window are ignored.

    Args:
        ordered: Instruction objects in program order.

    Returns:
        One response staging candidate per qualifying call, in program order.
    """
    caveat = (
        "Response candidate means F850-F854 are written shortly before loc_BA26. "
        "The analyzer does not prove every byte is meaningful for every path."
    )
    found: list[JsonObject] = []
    for position, call in enumerate(ordered):
        if not _is_send_builder_call(call):
            continue
        window = _response_window(ordered, position)
        writes = _staging_writes(window)
        if not writes:
            continue

        # Only the payload bytes RX[1..4] are tracked as response inputs.
        reads = _rx_reads(window, RX_FRAME_START + 1, RX_FRAME_START + 4)
        call_address = int(call["address"])
        write_sites = [write["instruction_address"] for write in writes]
        read_sites = [read["instruction_address"] for read in reads]
        evidence_addresses = _dedupe_ints(write_sites + read_sites + [call_address])

        if window:
            window_start = int(window[0]["address"])
            window_start_hex = _h16(int(window[0]["address"]))
        else:
            window_start = call_address
            window_start_hex = _h16(call_address)

        found.append(
            {
                "id": f"response_at_{call_address:04X}",
                "kind": "response_staging_candidate",
                "call_address": call_address,
                "call_address_hex": _h16(call_address),
                "send_builder": SEND_BUILDER_LABEL,
                "send_builder_address": SEND_BUILDER_ADDRESS,
                "send_builder_address_hex": _h16(SEND_BUILDER_ADDRESS),
                "window_start": window_start,
                "window_start_hex": window_start_hex,
                "writes": writes,
                "rx_reads": reads,
                "evidence_addresses": evidence_addresses,
                "evidence_addresses_hex": _hlist(evidence_addresses),
                "confidence": "medium",
                "caveat": caveat,
            }
        )
    return found
def _rx_field_candidates(
    ordered: list[JsonObject],
    dispatch: JsonObject | None,
) -> list[JsonObject]:
    """Assign a semantic candidate name to each byte of the RX frame.

    Args:
        ordered: Instruction objects in program order.
        dispatch: Command dispatch candidate, or ``None``. Its integer
            evidence addresses are added to the evidence for byte 0.

    Returns:
        One ``rx_field_semantic_candidate`` per frame offset. Byte 0 also
        carries the 0x07 mask.
    """
    if isinstance(dispatch, Mapping):
        dispatch_evidence = [
            entry for entry in dispatch.get("evidence_addresses", []) if isinstance(entry, int)
        ]
    else:
        dispatch_evidence = []

    described: list[JsonObject] = []
    for offset in range(RX_FRAME_LENGTH):
        address = RX_FRAME_START + offset
        read_evidence = [
            int(ins["address"]) for ins in ordered if _is_read_from_address(ins, address)
        ]
        # Payload bytes are rated higher once at least one read is observed.
        observed = "candidate-medium" if read_evidence else "candidate-low"
        mask = None

        if offset == 0:
            name = "command_low3"
            confidence = "candidate-high" if dispatch else "candidate-medium"
            caveat = "RX[0] is masked with 0x07 before command comparisons"
            mask = 0x07
            read_evidence = _dedupe_ints(read_evidence + dispatch_evidence)
        elif offset in {1, 2}:
            name = "likely_id_or_index"
            confidence = observed
            caveat = "RX[1:2] are read near logical point/index and response-echo handling"
        elif offset in {3, 4}:
            name = "likely_value"
            confidence = observed
            caveat = "RX[3:4] are read near table-value write/read response handling"
        elif offset == 5:
            name = "checksum"
            confidence = "candidate-high"
            caveat = "RX[5] is validated by the serial reconstruction checksum evidence"
        else:
            name = "payload_byte"
            confidence = "candidate-low"
            caveat = "role is inferred only from frame position"

        entry: JsonObject = {
            "kind": "rx_field_semantic_candidate",
            "offset": offset,
            "name": name,
            "address": address,
            "address_hex": _h16(address),
            "confidence": confidence,
            "caveat": caveat,
            "evidence_addresses": _dedupe_ints(read_evidence),
            "evidence_addresses_hex": _hlist(read_evidence),
        }
        if mask is not None:
            entry["mask"] = mask
            entry["mask_hex"] = _h16(mask, width=2)
        described.append(entry)
    return described
def _response_builder_aliases(responses: list[JsonObject]) -> list[JsonObject]:
    """Re-express response candidates as per-address response builder objects.

    Each write of a response is expanded into one entry per written integer
    address; non-mapping writes and non-integer addresses are dropped.

    Args:
        responses: Response staging candidates.

    Returns:
        One ``response_builder_candidate`` per response, in the same order.
    """
    aliases: list[JsonObject] = []
    for response in responses:
        flattened: list[JsonObject] = []
        mapping_writes = [
            write for write in response.get("writes", []) if isinstance(write, Mapping)
        ]
        for write in mapping_writes:
            for target in write.get("addresses", []):
                if isinstance(target, int):
                    flattened.append(
                        {
                            "address": target,
                            "address_hex": _h16(target),
                            "instruction_address": write.get("instruction_address"),
                            "instruction_address_hex": write.get("instruction_address_hex"),
                            "source": write.get("source"),
                            "instruction": write.get("instruction"),
                        }
                    )
        alias = {
            "kind": "response_builder_candidate",
            "buffer_start": TX_STAGING_START,
            "buffer_start_hex": _h16(TX_STAGING_START),
            "buffer_end": TX_STAGING_END,
            "buffer_end_hex": _h16(TX_STAGING_END),
            "send_call_target": SEND_BUILDER_ADDRESS,
            "send_call_target_hex": _h16(SEND_BUILDER_ADDRESS),
            "call_address": response.get("call_address"),
            "call_address_hex": response.get("call_address_hex"),
            "writes": flattened,
            "evidence_addresses": response.get("evidence_addresses", []),
            "evidence_addresses_hex": response.get("evidence_addresses_hex", []),
            "confidence": response.get("confidence", "medium"),
            "caveat": response.get("caveat"),
        }
        aliases.append(alias)
    return aliases
def _send_builder_candidate(
    ordered: list[JsonObject],
    responses: list[JsonObject],
    tx_candidate: Mapping[str, Any] | None,
) -> JsonObject:
    """Describe the send builder routine and its staging-to-frame copies.

    Instructions whose address lies between the send builder address and
    0xBA83 (inclusive) are treated as the routine body.

    Args:
        ordered: Instruction objects in program order.
        responses: Response staging candidates; their call addresses are
            reported and added to the evidence.
        tx_candidate: TX serial reconstruction candidate, or ``None``.

    Returns:
        The send builder candidate. Confidence is ``high`` when copies were
        found and a TX candidate exists, ``medium`` with copies only, and
        ``low`` otherwise.
    """
    body = []
    for ins in ordered:
        if SEND_BUILDER_ADDRESS <= int(ins.get("address", -1)) <= 0xBA83:
            body.append(ins)

    copies = []
    for ins in body:
        source, destination = _source_destination_operands(str(ins.get("operands", "")))
        staged_from = _first_address_in_range(ins, TX_STAGING_START, TX_STAGING_END, operand=source)
        copied_to = _first_address_in_range(ins, TX_FRAME_START, TX_FRAME_END, operand=destination)
        # A copy needs both a staging source and a TX frame destination.
        if staged_from is None or copied_to is None:
            continue
        site = int(ins["address"])
        copies.append(
            {
                "instruction_address": site,
                "instruction_address_hex": _h16(int(ins["address"])),
                "source_address": staged_from,
                "source_address_hex": _h16(staged_from),
                "destination_address": copied_to,
                "destination_address_hex": _h16(copied_to),
                "instruction": str(ins.get("text", "")),
            }
        )

    call_addresses = [int(response["call_address"]) for response in responses]
    frame_touchers = [
        int(ins["address"]) for ins in body if _has_ref_in_range(ins, TX_FRAME_START, TX_FRAME_END)
    ]
    evidence_addresses = _dedupe_ints(frame_touchers + call_addresses)

    if copies and tx_candidate:
        confidence = "high"
    elif copies:
        confidence = "medium"
    else:
        confidence = "low"

    return {
        "kind": "tx_send_builder_candidate",
        "label": SEND_BUILDER_LABEL,
        "address": SEND_BUILDER_ADDRESS,
        "address_hex": _h16(SEND_BUILDER_ADDRESS),
        "staging_buffer_start": TX_STAGING_START,
        "staging_buffer_start_hex": _h16(TX_STAGING_START),
        "staging_buffer_end": TX_STAGING_END,
        "staging_buffer_end_hex": _h16(TX_STAGING_END),
        "tx_frame_start": TX_FRAME_START,
        "tx_frame_start_hex": _h16(TX_FRAME_START),
        "tx_frame_end": TX_FRAME_END,
        "tx_frame_end_hex": _h16(TX_FRAME_END),
        "checksum_address": TX_CHECKSUM_ADDRESS,
        "checksum_address_hex": _h16(TX_CHECKSUM_ADDRESS),
        "checksum_seed": CHECKSUM_SEED,
        "checksum_seed_hex": _h16(CHECKSUM_SEED),
        "staging_to_frame_copies": copies,
        "response_call_addresses": call_addresses,
        "response_call_addresses_hex": _hlist(call_addresses),
        "serial_reconstruction_candidate_id": tx_candidate.get("id") if tx_candidate else None,
        "evidence_addresses": evidence_addresses,
        "evidence_addresses_hex": _hlist(evidence_addresses),
        "confidence": confidence,
        "caveat": (
            "loc_BA26 is treated as a send builder because it copies F850-F854 into the "
            "evidence-supported TX frame and then starts SCI1 transmission."
        ),
    }
def _top_level_evidence(
    ordered: list[JsonObject],
    dispatch: JsonObject | None,
    responses: list[JsonObject],
    rx_candidate: Mapping[str, Any] | None,
    tx_candidate: Mapping[str, Any] | None,
) -> list[JsonObject]:
    """Summarize which kinds of supporting evidence were observed.

    Args:
        ordered: Instruction objects in program order.
        dispatch: Command dispatch candidate, or ``None``.
        responses: Response staging candidates.
        rx_candidate: RX serial reconstruction candidate, or ``None``.
        tx_candidate: TX serial reconstruction candidate, or ``None``.

    Returns:
        Evidence entries in a fixed order (RX reconstruction, TX
        reconstruction, dispatch, responses, RX payload reads); an entry is
        only present when its input is non-empty.
    """
    summary: list[JsonObject] = []

    reconstruction_entries = (
        (
            rx_candidate,
            "rx_frame_reconstruction_present",
            "serial_reconstruction contains an evidence-supported SCI1 RX frame candidate",
        ),
        (
            tx_candidate,
            "tx_frame_reconstruction_present",
            "serial_reconstruction contains an evidence-supported SCI1 TX frame candidate",
        ),
    )
    for candidate, kind, text in reconstruction_entries:
        if candidate:
            summary.append({"kind": kind, "summary": text, "candidate_id": candidate.get("id")})

    if dispatch:
        summary.append(
            {
                "kind": "rx0_masked_command_dispatch",
                "summary": "RX[0] is read, masked with 0x07, and compared against command values",
                "addresses": dispatch.get("evidence_addresses", []),
                "addresses_hex": dispatch.get("evidence_addresses_hex", []),
            }
        )

    if responses:
        collected = []
        for response in responses:
            collected.extend(response.get("evidence_addresses", []))
        addresses = _dedupe_ints(collected)
        summary.append(
            {
                "kind": "responses_stage_f850_f854_before_send",
                "summary": "F850-F854 writes are observed before calls to loc_BA26",
                "addresses": addresses,
                "addresses_hex": _hlist(addresses),
                "response_count": len(responses),
            }
        )

    # Instructions that read any of RX[1..4].
    payload_span = range(RX_FRAME_START + 1, RX_FRAME_START + 5)
    rx_payload_reads = []
    for ins in ordered:
        if any(_is_read_from_address(ins, address) for address in payload_span):
            rx_payload_reads.append(int(ins["address"]))
    if rx_payload_reads:
        summary.append(
            {
                "kind": "rx_payload_bytes_read",
                "summary": "RX[1..4] are read in the command-processing region",
                "addresses": _dedupe_ints(rx_payload_reads),
                "addresses_hex": _hlist(rx_payload_reads),
            }
        )
    return summary
def _response_window(ordered: list[JsonObject], call_index: int) -> list[JsonObject]:
start = call_index
for index in range(call_index - 1, max(-1, call_index - 48), -1):
candidate = ordered[index]
mnemonic = str(candidate.get("mnemonic", "")).upper()
if mnemonic in {"RTS", "RTE"}:
break
if candidate.get("kind") == "branch" and mnemonic != "BSR":
break
start = index
return ordered[start:call_index]
def _staging_writes(window: list[JsonObject]) -> list[JsonObject]:
    """List the instructions in a window that write the TX staging buffer.

    Args:
        window: Instruction objects preceding a send-builder call.

    Returns:
        One write object per instruction that writes at least one staging
        address, with the written addresses and a description of the source
        operand.
    """
    collected: list[JsonObject] = []
    for position, ins in enumerate(window):
        touched = _written_addresses_in_range(ins, TX_STAGING_START, TX_STAGING_END)
        if touched:
            source, _unused_destination = _source_destination_operands(str(ins.get("operands", "")))
            origin = _source_info(window, position, source)
            site = int(ins["address"])
            collected.append(
                {
                    "instruction_address": site,
                    "instruction_address_hex": _h16(int(ins["address"])),
                    "addresses": touched,
                    "addresses_hex": _hlist(touched),
                    "source_operand": source,
                    "source": origin,
                    "instruction": str(ins.get("text", "")),
                }
            )
    return collected
def _source_info(window: list[JsonObject], index: int, source: str) -> JsonObject:
    """Classify where the source operand of a staging write comes from.

    Args:
        window: Instruction objects containing the write.
        index: Position of the writing instruction inside ``window``.
        source: Source operand text of the writing instruction.

    Returns:
        ``immediate`` info when the operand parses as an immediate;
        ``rx_frame_byte`` info when one of the up to four preceding
        instructions loaded that operand from an RX frame address; otherwise a
        ``register_or_computed`` placeholder.
    """
    immediate = _parse_immediate(source)
    if immediate is not None:
        hex_width = 2 if immediate <= 0xFF else 4
        return {
            "kind": "immediate",
            "value": immediate,
            "value_hex": _h16(immediate, width=hex_width),
        }

    wanted = source.upper()
    lookback = window[max(0, index - 4) : index]
    # Nearest preceding instruction first.
    for prior in reversed(lookback):
        prior_source, prior_destination = _source_destination_operands(str(prior.get("operands", "")))
        if prior_destination.upper() != wanted:
            continue
        rx_address = _first_address_in_range(prior, RX_FRAME_START, RX_FRAME_END, operand=prior_source)
        if rx_address is None or not _is_read_from_address(prior, rx_address):
            continue
        return {
            "kind": "rx_frame_byte",
            "rx_offset": rx_address - RX_FRAME_START,
            "rx_address": rx_address,
            "rx_address_hex": _h16(rx_address),
            "evidence_address": int(prior["address"]),
            "evidence_address_hex": _h16(int(prior["address"])),
            "instruction": str(prior.get("text", "")),
        }
    return {"kind": "register_or_computed", "operand": source}
def _rx_reads(window: list[JsonObject], start: int, end: int) -> list[JsonObject]:
    """List reads of the addresses ``start..end`` (inclusive) within a window.

    Args:
        window: Instruction objects to inspect.
        start: First address of interest.
        end: Last address of interest.

    Returns:
        One read object per (instruction, address) pair, ordered by
        instruction and then by address.
    """
    span = range(start, end + 1)
    observed: list[JsonObject] = []
    for ins in window:
        for rx_address in span:
            if _is_read_from_address(ins, rx_address):
                observed.append(
                    {
                        "instruction_address": int(ins["address"]),
                        "instruction_address_hex": _h16(int(ins["address"])),
                        "rx_offset": rx_address - RX_FRAME_START,
                        "rx_address": rx_address,
                        "rx_address_hex": _h16(rx_address),
                        "instruction": str(ins.get("text", "")),
                    }
                )
    return observed
def _rx_reads_in_ranges(
    ordered: list[JsonObject],
    ranges: list[tuple[int, int]],
) -> list[JsonObject]:
    """List RX[1..4] reads made by instructions inside any address range.

    Args:
        ordered: Instruction objects in program order.
        ranges: Inclusive ``(start, end)`` instruction address ranges.

    Returns:
        Read objects in program order with repeated (instruction address,
        RX address) pairs removed; the first occurrence is kept.
    """
    unique: list[JsonObject] = []
    seen_pairs: set[tuple[int, int]] = set()
    for ins in ordered:
        site = int(ins.get("address", -1))
        inside = False
        for low, high in ranges:
            if low <= site <= high:
                inside = True
                break
        if not inside:
            continue
        for read in _rx_reads([ins], RX_FRAME_START + 1, RX_FRAME_START + 4):
            pair = (int(read["instruction_address"]), int(read["rx_address"]))
            if pair not in seen_pairs:
                seen_pairs.add(pair)
                unique.append(read)
    return unique
def _response_in_ranges(response: Mapping[str, Any], ranges: list[tuple[int, int]]) -> bool:
call_address = int(response.get("call_address", -1))
return any(start <= call_address <= end for start, end in ranges)
def _handler_end(
ordered: list[JsonObject],
start: int,
handler_starts: list[int],
) -> int | None:
addresses = [int(ins["address"]) for ins in ordered]
try:
start_index = addresses.index(start)
except ValueError:
return None
later_starts = [candidate for candidate in handler_starts if candidate > start]
if later_starts:
next_start = min(later_starts)
previous = [address for address in addresses if start <= address < next_start]
return previous[-1] if previous else None
for ins in ordered[start_index:]:
mnemonic = str(ins.get("mnemonic", "")).upper()
if mnemonic in {"RTS", "RTE"}:
return int(ins["address"])
if mnemonic == "BRA" and SEND_BUILDER_ADDRESS not in _targets(ins):
targets = _targets(ins)
if targets and targets[0] >= 0xBE6D:
return int(ins["address"])
return None
def _find_prior_read(
    ordered: list[JsonObject],
    index: int,
    address: int,
    destination_register: str,
) -> JsonObject | None:
    """Find the nearest earlier instruction that loads a register from an address.

    Only the up to six instructions directly before ``index`` are inspected,
    nearest first.

    Args:
        ordered: Instruction objects in program order.
        index: Position of the instruction that uses the register.
        address: Memory address the read must come from.
        destination_register: Register the read must load (case-insensitive).

    Returns:
        The matching instruction, or ``None``.
    """
    wanted = destination_register.upper()
    lookback = ordered[max(0, index - 6) : index]
    for candidate in reversed(lookback):
        if _is_read_from_address(candidate, address):
            loaded = _destination_operand(str(candidate.get("operands", ""))).upper()
            if loaded == wanted:
                return candidate
    return None
def _is_send_builder_call(ins: Mapping[str, Any]) -> bool:
    """Tell whether an instruction is a call to the send builder.

    The mnemonic must be ``BSR``, ``JSR`` or ``PJSR`` (case-insensitive), and
    either the targets include the send builder address or the operand text
    mentions the send builder label.
    """
    if str(ins.get("mnemonic", "")).upper() in {"BSR", "JSR", "PJSR"}:
        if SEND_BUILDER_ADDRESS in _targets(ins):
            return True
        operand_text = str(ins.get("operands", "")).upper()
        return SEND_BUILDER_LABEL.upper() in operand_text
    return False
def _written_addresses_in_range(ins: Mapping[str, Any], start: int, end: int) -> list[int]:
    """Return the addresses in ``start..end`` that an instruction writes.

    The first in-range address of the destination operand is the base; the
    result spans the access width from there, clipped to ``end``. Empty when
    the instruction has no in-range reference or does not write the base.
    """
    if not _has_ref_in_range(ins, start, end):
        return []
    _unused_source, destination = _source_destination_operands(str(ins.get("operands", "")))
    base = _first_address_in_range(ins, start, end, operand=destination)
    if base is None:
        return []
    if not _is_write_to_address(ins, base):
        return []
    width = _access_width(str(ins.get("mnemonic", "")))
    last = min(base + width - 1, end)
    return list(range(base, last + 1))
def _read_addresses_in_range(ins: Mapping[str, Any], start: int, end: int) -> list[int]:
    """List the addresses in ``[start, end]`` that ``ins`` reads.

    The base address is resolved from the source operand (falling back to
    the instruction's references). The result covers as many consecutive
    bytes as the mnemonic's access width, clipped at ``end``. An empty list
    is returned when the instruction has no reference in the range or is not
    a read from the resolved base address.
    """
    if not _has_ref_in_range(ins, start, end):
        return []
    source = _source_destination_operands(str(ins.get("operands", "")))[0]
    base = _first_address_in_range(ins, start, end, operand=source)
    if base is None or not _is_read_from_address(ins, base):
        return []
    last = min(base + _access_width(str(ins.get("mnemonic", ""))) - 1, end)
    return list(range(base, last + 1))
def _is_read_from_address(ins: Mapping[str, Any], address: int) -> bool:
    """Tell whether ``ins`` reads from ``address``.

    Checks, in order:
      1. the source operand text mentions ``address`` -> read;
      2. ``address`` is not among the instruction's references -> not a read;
      3. the source is a memory operand (starts with ``@``) and the
         destination mentions none of the references -> read;
      4. otherwise defer to the mnemonic-based access direction.
    """
    source, destination = _source_destination_operands(str(ins.get("operands", "")))
    if _operand_mentions_address(source, address):
        return True
    references = _references(ins)
    if address not in references:
        return False
    indirect_source = source.startswith("@") and not _operand_mentions_any_reference(
        destination, references
    )
    return indirect_source or _access_direction(ins, address) == "read"
def _is_write_to_address(ins: Mapping[str, Any], address: int) -> bool:
    """Tell whether ``ins`` writes to ``address``.

    The address must be tied to the instruction, either by appearing in the
    destination operand text or among its references, and the mnemonic-based
    access direction must be ``"write"``.
    """
    _source, destination = _source_destination_operands(str(ins.get("operands", "")))
    tied_to_instruction = (
        _operand_mentions_address(destination, address) or address in _references(ins)
    )
    return tied_to_instruction and _access_direction(ins, address) == "write"
def _access_direction(ins: Mapping[str, Any], address: int) -> str | None:
    """Classify how ``ins`` accesses ``address``: ``"read"``, ``"write"`` or ``None``.

    Most mnemonics are classified by their root alone (size suffix removed).
    For ``MOV:G``, ``MOV:S`` and ``MOVTPE`` the direction depends on which
    operand refers to ``address``. ``None`` means the direction could not be
    determined.
    """
    root = _mnemonic_root(str(ins.get("mnemonic", "")))

    read_only_roots = ("BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST")
    modifying_roots = (
        "BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT",
        "ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR",
    )
    if root in read_only_roots:
        return "read"
    if root in modifying_roots:
        return "write"

    if root in ("MOV:G", "MOV:S", "MOVTPE"):
        source, destination = _source_destination_operands(str(ins.get("operands", "")))
        # An explicit mention in the operand text decides first.
        if _operand_mentions_address(destination, address):
            return "write"
        if _operand_mentions_address(source, address):
            return "read"
        references = _references(ins)
        if address in references:
            # The address is only known through references: attribute it to
            # the memory operand whose counterpart names no reference itself.
            if destination.startswith("@") and not _operand_mentions_any_reference(source, references):
                return "write"
            if source.startswith("@") and not _operand_mentions_any_reference(destination, references):
                return "read"
        # None of the remaining roots can match one of these three mnemonics.
        return None

    fixed_direction = {"MOV:L": "read", "MOV:F": "read", "STC": "write", "LDC": "read"}
    return fixed_direction.get(root)
def _first_address_in_range(
    ins: Mapping[str, Any],
    start: int,
    end: int,
    *,
    operand: str = "",
) -> int | None:
    """Return the first address in ``[start, end]`` associated with ``ins``.

    When ``operand`` is non-empty, the lowest address in the range that the
    operand text mentions wins. Otherwise, or when the operand mentions none,
    the first of the instruction's references inside the range is returned.
    ``None`` is returned when neither search finds an address.
    """
    if operand:
        mentioned = next(
            (
                address
                for address in range(start, end + 1)
                if _operand_mentions_address(operand, address)
            ),
            None,
        )
        if mentioned is not None:
            return mentioned
    return next(
        (address for address in _references(ins) if start <= address <= end),
        None,
    )
def _has_ref_in_range(ins: Mapping[str, Any], start: int, end: int) -> bool:
    """Tell whether any reference of ``ins`` lies within ``[start, end]`` (inclusive)."""
    for address in _references(ins):
        if start <= address <= end:
            return True
    return False
def _references(ins: Mapping[str, Any]) -> list[int]:
references = ins.get("references", [])
output: list[int] = []
if not isinstance(references, list):
return output
for reference in references:
if isinstance(reference, Mapping) and isinstance(reference.get("address"), int):
output.append(int(reference["address"]))
elif isinstance(reference, int):
output.append(reference)
return output
def _targets(ins: Mapping[str, Any]) -> list[int]:
targets = ins.get("targets", [])
if not isinstance(targets, list):
return []
return [int(target) for target in targets if isinstance(target, int)]
def _instruction_sequence(value: object) -> list[JsonObject]:
if isinstance(value, Mapping):
values: Iterable[Any] = value.values()
elif isinstance(value, list):
values = value
else:
values = []
return sorted(
[item for item in values if isinstance(item, dict) and isinstance(item.get("address"), int)],
key=lambda item: int(item["address"]),
)
def _serial_reconstruction(payload: Mapping[str, Any]) -> Mapping[str, Any]:
serial = payload.get("serial_reconstruction")
return serial if isinstance(serial, Mapping) else {}
def _candidate_by_kind(serial: Mapping[str, Any], kind: str) -> Mapping[str, Any] | None:
candidates = serial.get("candidates")
if not isinstance(candidates, list):
return None
for candidate in candidates:
if isinstance(candidate, Mapping) and candidate.get("kind") == kind:
return candidate
return None
def _source_destination_operands(operands: str) -> tuple[str, str]:
depth = 0
split_at: int | None = None
for index, char in enumerate(operands):
if char in "({":
depth += 1
elif char in ")}" and depth:
depth -= 1
elif char == "," and depth == 0:
split_at = index
if split_at is None:
operand = operands.strip()
return "", operand
return operands[:split_at].strip(), operands[split_at + 1 :].strip()
def _destination_operand(operands: str) -> str:
    """Return only the destination half of an operand string."""
    _source, destination = _source_destination_operands(operands)
    return destination
def _immediate_source_value(operands: str) -> int | None:
    """Return the numeric value of an immediate (``#...``) source operand.

    ``None`` is returned when the source operand does not start with ``#``
    or its value cannot be parsed.
    """
    source = _source_destination_operands(operands)[0]
    return _parse_immediate(source) if source.startswith("#") else None
def _parse_immediate(operand: str) -> int | None:
text = operand.strip()
if text.startswith("#"):
text = text[1:].strip()
try:
if text.upper().startswith("H'"):
return int(text[2:], 16) & 0xFFFF
if text.upper().startswith("0X"):
return int(text, 16) & 0xFFFF
if text.upper().startswith("$"):
return int(text[1:], 16) & 0xFFFF
return int(text, 10) & 0xFFFF
except ValueError:
return None
def _operand_mentions_any_reference(operand: str, references: list[int]) -> bool:
    """Tell whether the operand text mentions at least one of ``references``."""
    for address in references:
        if _operand_mentions_address(operand, address):
            return True
    return False
def _operand_mentions_address(operand: str, address: int) -> bool:
    """Tell whether the operand text refers to ``address``.

    Matching is done by substring on the upper-cased operand with spaces
    removed. An address is considered mentioned when the operand contains:

    * the symbolic buffer name registered for that exact address, or
    * the address as four hex digits prefixed by ``H'``, ``0X`` or ``$``, or
    * its 16-bit two's-complement negation in the same spellings, preceded
      by ``-``.
    """
    compact = operand.upper().replace(" ", "")

    symbolic_names = {
        TX_STAGING_START: ("TX_STAGING",),
        TX_FRAME_START: ("TX_FRAME",),
        TX_CHECKSUM_ADDRESS: ("TX_CHECKSUM",),
        RX_FRAME_START: ("RX_FRAME",),
        RX_CHECKSUM_ADDRESS: ("RX_CHECKSUM",),
    }
    for name in symbolic_names.get(address, ()):
        if name in compact:
            return True

    positive = f"{address:04X}"
    negated = f"{(0x10000 - address) & 0xFFFF:04X}"
    spellings = (
        "H'" + positive,
        "0X" + positive,
        "$" + positive,
        "-H'" + negated,
        "-0X" + negated,
        "-$" + negated,
    )
    return any(spelling in compact for spelling in spellings)
def _mnemonic_root(mnemonic: str) -> str:
return mnemonic.rsplit(".", 1)[0].upper()
def _access_width(mnemonic: str) -> int:
upper = mnemonic.upper()
if upper.endswith(".L"):
return 4
if upper.endswith(".W"):
return 2
return 1
def _confidence_score(
frame_supported: bool,
dispatch: JsonObject | None,
responses: list[JsonObject],
commands: list[JsonObject],
) -> float:
score = 0.2
if frame_supported:
score += 0.25
if dispatch:
score += 0.2
if responses:
score += min(0.2, 0.04 * len(responses))
if commands:
score += min(0.15, 0.02 * len(commands))
return round(min(score, 0.9), 2)
def _confidence_label(score: float) -> str:
if score >= 0.75:
return "medium-high"
if score >= 0.5:
return "medium"
return "low"
def _dedupe_ints(values: Iterable[int]) -> list[int]:
seen: set[int] = set()
output: list[int] = []
for value in values:
if value in seen:
continue
seen.add(value)
output.append(value)
return output
def _hlist(values: Iterable[int]) -> list[str]:
    """Format the distinct ``values`` (first-seen order) as ``H'....`` strings."""
    return list(map(_h16, _dedupe_ints(values)))
def _h16(value: int, *, width: int = 4) -> str:
return f"H'{value & 0xFFFF:0{width}X}"
# Public API: only the top-level analysis entry point is exported.
__all__ = ["analyze_serial_semantics"]