1
0

Emulator learnings folded back into decompiler

This commit is contained in:
Aiden
2026-05-25 19:11:22 +10:00
parent 1fabf6587d
commit d2e7609bbf
16 changed files with 1468 additions and 87 deletions

View File

@@ -275,6 +275,11 @@ def _serial_reconstruction_lines(serial_reconstruction: dict[str, object] | None
f"checksum {candidate['checksum_address_hex']} seeded by {candidate['checksum_seed_hex']} "
f"(confidence {confidence} {score})",
)
tx_path = candidate.get("tx_path")
if isinstance(tx_path, dict):
lines.append(
f"; TX path: {tx_path.get('summary', 'interrupt-driven TXI path')}",
)
elif kind == "candidate_sci1_rx_frame":
lines.append(
f"; RX candidate: {candidate['frame_length']} bytes "
@@ -286,6 +291,17 @@ def _serial_reconstruction_lines(serial_reconstruction: dict[str, object] | None
caveat = candidate.get("caveat")
if caveat:
lines.append(f"; caveat: {caveat}")
ram_roles = serial_reconstruction.get("ram_roles", [])
if isinstance(ram_roles, list) and ram_roles:
lines.append("; Serial RAM role candidates")
for role in ram_roles:
if not isinstance(role, dict):
continue
address_text = str(role.get("address_hex") or h16(int(role.get("address") or 0)))
lines.append(
f"; {address_text}: "
f"{role.get('name', 'ram_role')} - {role.get('summary', '')}",
)
lines.append("")
return lines

View File

@@ -61,11 +61,26 @@ SSR_FLAGS = {
}
SSR_CLEAR_ACTIONS = {
7: ("clear_tdre", "clear {channel} transmit data register empty flag (TDRE)"),
6: ("clear_rdrf", "clear {channel} receive-data-full flag (RDRF)"),
5: ("clear_orer", "clear {channel} overrun error flag (ORER)"),
4: ("clear_fer", "clear {channel} framing error flag (FER)"),
3: ("clear_per", "clear {channel} parity error flag (PER)"),
7: (
"clear_tdre",
"clear {channel} TDRE after TDR write; TXI can fire again when hardware reasserts TDRE",
),
6: (
"clear_rdrf",
"clear {channel} RDRF with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
),
5: (
"clear_orer",
"clear {channel} ORER with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
),
4: (
"clear_fer",
"clear {channel} FER with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
),
3: (
"clear_per",
"clear {channel} PER with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
),
}
@@ -263,7 +278,10 @@ def _scr_bit_event(
verb = "enable" if enabled else "disable"
action_noun = noun.replace("/", "_").replace(" ", "_").lower()
if bit == 7:
comment = f"{verb} {register.channel} TX interrupt (TIE)"
comment = (
f"{verb} {register.channel} TX interrupt (TIE); "
"gates TXI when hardware sets TDRE"
)
elif bit == 6:
comment = f"{verb} {register.channel} receive and receive-error interrupts (RIE)"
elif bit == 5:

View File

@@ -17,6 +17,7 @@ KEY_STATE_ADDRESSES: tuple[int, ...] = (
0xF9B5,
0xF9B9,
0xF9C0,
0xF9C1,
0xF9C3,
0xF9C5,
0xF9C6,
@@ -51,6 +52,7 @@ def analyze_serial_gate(payload: dict[str, Any]) -> JsonObject:
"queue_send_gate_loc_BAF2": _queue_send_gate(by_address),
"resend_gate_path": _resend_gate_path(by_address),
"rx_session_maintenance": _rx_session_maintenance(by_address),
"timer_tick_evidence": _timer_tick_evidence(payload, by_address),
}
access_summary = _state_access_summary(instructions, labels)
@@ -94,6 +96,11 @@ def format_text_report(analysis: dict[str, Any]) -> str:
lines.append(f" {summary}")
for item in section.get("items", []):
lines.append(f" - {item['address_hex']}: {item['text']}")
roles = section.get("candidate_timer_roles", [])
if roles:
lines.append(" Candidate timer roles:")
for role in roles:
lines.append(f" - {role['address_hex']}: {role['role']}")
lines.extend(["", "State address readers/writers:"])
for entry in analysis.get("state_accesses", []):
@@ -266,6 +273,53 @@ def _rx_session_maintenance(by_address: dict[int, JsonObject]) -> JsonObject:
}
def _timer_tick_evidence(payload: dict[str, Any], by_address: dict[int, JsonObject]) -> JsonObject:
vector = _vector_entry(payload, 0x0062, "frt1_ocia")
handler = _int_field(vector, "target") if vector else None
if handler is None:
handler = 0xBEEA
addresses = [handler, 0xBEEE, 0xBEF4, 0xBEF8, 0xBEFE, 0xBF02, 0xBF08]
has_vector = vector is not None and _int_field(vector, "target") == handler
has_handler_clear = _instruction_mentions(by_address.get(handler), ("FRT1_TCSR", "#5"))
decrement_addresses = (0xBEF4, 0xBEFE, 0xBF08)
has_decrements = all(
address in by_address and _access_kind(by_address[address], state_address) == "read_write"
for address, state_address in zip(decrement_addresses, (0xF9C0, 0xF9C1, 0xF9C6))
)
return {
"title": "FRT1 OCIA periodic tick countdowns",
"present": bool((has_vector or has_handler_clear) and has_decrements),
"summary": (
"Static evidence links vector H'0062 to the FRT1 OCIA handler at H'BEEA; the handler "
"clears FRT1_TCSR.OCFA and conditionally decrements H'F9C0, H'F9C1, and H'F9C6."
),
"vector_address_hex": h16(0x0062),
"handler_address_hex": h16(handler),
"vector_target_label": str(vector.get("target_label", "")) if vector else "",
"items": _items(by_address, addresses),
"candidate_timer_roles": [
{
"address": 0xF9C0,
"address_hex": h16(0xF9C0),
"role": "candidate post-TX/report delay countdown",
"evidence_address_hex": h16(0xBEF4),
},
{
"address": 0xF9C1,
"address_hex": h16(0xF9C1),
"role": "candidate secondary delay countdown",
"evidence_address_hex": h16(0xBEFE),
},
{
"address": 0xF9C6,
"address_hex": h16(0xF9C6),
"role": "candidate periodic report/heartbeat countdown",
"evidence_address_hex": h16(0xBF08),
},
],
}
def _state_access_summary(instructions: list[JsonObject], labels: dict[int, str]) -> list[JsonObject]:
result: list[JsonObject] = []
for state_address in KEY_STATE_ADDRESSES:
@@ -333,6 +387,32 @@ def _has_all(by_address: dict[int, JsonObject], addresses: tuple[int, ...]) -> b
return all(address in by_address for address in addresses)
def _vector_entry(payload: dict[str, Any], address: int, name: str) -> JsonObject | None:
vectors = payload.get("vectors", [])
if not isinstance(vectors, list):
return None
for vector in vectors:
if not isinstance(vector, dict):
continue
if _int_field(vector, "address") == address or str(vector.get("name", "")).lower() == name:
return vector
return None
def _int_field(payload: JsonObject | None, key: str, default: int | None = None) -> int | None:
if not isinstance(payload, dict):
return default
value = payload.get(key)
return value if isinstance(value, int) else default
def _instruction_mentions(ins: JsonObject | None, fragments: tuple[str, ...]) -> bool:
if not isinstance(ins, dict):
return False
text = f"{ins.get('text', '')} {ins.get('operands', '')} {ins.get('comment', '')}".upper()
return all(fragment.upper() in text for fragment in fragments)
def _text(by_address: dict[int, JsonObject], address: int) -> str:
return str(by_address.get(address, {}).get("text", "<missing>"))

View File

@@ -301,11 +301,17 @@ def _declarations(tx_candidate: JsonObject | None, rx_candidate: JsonObject | No
tx_start = _int_field(tx_candidate, "buffer_start", 0xF858)
tx_index = _int_field(tx_candidate, "tx_index_address", 0xF9C2)
length = _int_field(tx_candidate, "frame_length", 6)
checksum_index = max(length - 1, 0)
lines.extend(
[
f"#define TX_FRAME_LENGTH {length}u",
f"#define SCI1_TX_FRAME_LENGTH {length}u",
f"#define SCI1_TX_FRAME_BASE {_c_hex(tx_start)}",
"#define SCI1_TX_FRAME_BYTE(n) MEM8[(u16)(SCI1_TX_FRAME_BASE + (n))]",
f"#define SCI1_TX_FRAME_CHECKSUM SCI1_TX_FRAME_BYTE({checksum_index}u)",
f"#define SCI1_TX_INDEX MEM8[{_c_hex(tx_index)}]",
"#define TX_FRAME_LENGTH SCI1_TX_FRAME_LENGTH",
f"#define TX_FRAME(n) MEM8[(u16)({_c_hex(tx_start)} + (n))]",
f"#define TX_INDEX MEM8[{_c_hex(tx_index)}]",
"#define TX_INDEX SCI1_TX_INDEX",
"",
],
)
@@ -407,6 +413,7 @@ def _semantics_lines(
lines.extend(_gate_queue_comment_lines(protocol.get("gate_queue_model"), opts, prefix=" * "))
lines.extend(_tx_report_comment_lines(protocol.get("tx_report_model"), opts, prefix=" * "))
lines.extend(_periodic_resend_comment_lines(protocol.get("periodic_resend_model"), opts, prefix=" * "))
lines.extend(_timer_architecture_comment_lines(protocol, opts, prefix=" * "))
lines.append(" */")
lines.append("")
@@ -442,6 +449,7 @@ def _semantics_lines(
],
)
lines.extend(_gate_queue_predicate_function_lines(protocol.get("gate_queue_model")))
lines.extend(_timer_architecture_function_lines(protocol))
lines.extend(
[
"void sci1_process_candidate_protocol_command(void)",
@@ -741,6 +749,123 @@ def _periodic_resend_comment_lines(
return lines
def _timer_architecture_comment_lines(
protocol: JsonObject,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
model = _timer_architecture_model(protocol)
if not model:
return []
vector = str(model.get("vector_address_hex") or "H'BEEA")
source = str(model.get("source") or "FRT1 OCIA")
lines = [f"{prefix}interrupt/timer architecture candidate:"]
lines.append(
f"{prefix}- {source} {vector} appears to be a periodic tick ISR for serial gate/cadence counters.",
)
counters = _timer_counter_models(model)
for counter in counters:
address = counter.get("address_hex") or _h(_int_field(counter, "address", 0))
name = counter.get("name_candidate") or "counter_candidate"
role = _comment_text(str(counter.get("role") or counter.get("summary") or "candidate decrementing counter"))
lines.append(f"{prefix}- {address} {name}: {role}")
evidence = _hex_join(model.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix}- evidence: {evidence}")
return lines
def _timer_architecture_function_lines(protocol: JsonObject) -> list[str]:
model = _timer_architecture_model(protocol)
if not model:
return []
counters = _timer_counter_models(model)
if not counters:
return []
lines = [
"void frt1_ocia_candidate_tick_isr(void)",
"{",
" /* Candidate periodic tick at H'BEEA: decrement nonzero serial gate/cadence counters. */",
]
for counter in counters:
address = _int_field(counter, "address", 0)
if address == 0:
continue
name = _safe_identifier(str(counter.get("name_candidate") or f"counter_{address:04X}")).upper()
lines.extend(
[
f" /* {name}: {_comment_text(str(counter.get('role') or counter.get('summary') or 'candidate counter'))} */",
f" if (MEM8[{_c_hex(address)}] != 0u) {{",
f" MEM8[{_c_hex(address)}] = (u8)(MEM8[{_c_hex(address)}] - 1u);",
" }",
"",
],
)
lines.extend(["}", ""])
return lines
def _timer_architecture_model(protocol: JsonObject) -> JsonObject:
model = protocol.get("timer_interrupt_model")
if isinstance(model, dict):
return model
if isinstance(protocol.get("gate_queue_model"), dict) or isinstance(protocol.get("periodic_resend_model"), dict):
return {
"source": "FRT1 OCIA",
"vector_address_hex": "H'BEEA",
"counters": [
{
"address": 0xF9C0,
"address_hex": "H'F9C0",
"name_candidate": "tx_report_gate_counter_candidate",
"role": "candidate gate counter used before entering the report builder.",
},
{
"address": 0xF9C1,
"address_hex": "H'F9C1",
"name_candidate": "rx_interbyte_timeout_candidate",
"role": "candidate RX interbyte timeout counter.",
},
{
"address": 0xF9C6,
"address_hex": "H'F9C6",
"name_candidate": "periodic_resend_cadence_counter_candidate",
"role": "candidate periodic resend/heartbeat cadence counter.",
},
],
}
return {}
def _timer_counter_models(model: JsonObject) -> list[JsonObject]:
counters = _object_list(model.get("counters"))
if counters:
return counters
return [
{
"address": 0xF9C0,
"address_hex": "H'F9C0",
"name_candidate": "tx_report_gate_counter_candidate",
"role": "candidate gate counter used before entering the report builder.",
},
{
"address": 0xF9C1,
"address_hex": "H'F9C1",
"name_candidate": "rx_interbyte_timeout_candidate",
"role": "candidate RX interbyte timeout counter.",
},
{
"address": 0xF9C6,
"address_hex": "H'F9C6",
"name_candidate": "periodic_resend_cadence_counter_candidate",
"role": "candidate periodic resend/heartbeat cadence counter.",
},
]
def _command_effect_switch_lines(command: JsonObject) -> list[str]:
effects = _object_list(command.get("effects"))[:3]
lines = []
@@ -835,6 +960,7 @@ def _tx_functions(candidate: JsonObject, opts: SerialPseudocodeOptions) -> list[
" /* wait for transmit data register empty */",
" }",
"",
" /* First byte is sent synchronously; TIE enables TXI for the remaining bytes. */",
" SCI1_TDR = TX_FRAME(0);",
" TX_INDEX = 1u;",
" SCI1_SSR &= (u8)~SCI_SSR_TDRE;",
@@ -843,6 +969,11 @@ def _tx_functions(candidate: JsonObject, opts: SerialPseudocodeOptions) -> list[
"",
"void sci1_txi_candidate_isr(void)",
"{",
" /* TXI runs after hardware reasserts SSR.TDRE for the next transmit byte. */",
" if ((SCI1_SSR & SCI_SSR_TDRE) == 0u) {",
" return;",
" }",
"",
" if (TX_INDEX < TX_FRAME_LENGTH) {",
" SCI1_TDR = TX_FRAME(TX_INDEX);",
" TX_INDEX = (u8)(TX_INDEX + 1u);",

View File

@@ -15,6 +15,12 @@ TX_BUFFER_END = TX_CHECKSUM_ADDRESS
TX_INDEX_ADDRESS = 0xF9C2
TX_FRAME_LENGTH = 6
CHECKSUM_SEED = 0x5A
FRT1_OCIA_VECTOR_ADDRESS = 0x0062
FRT1_OCIA_ISR_ADDRESS = 0xBEEA
FRT1_TCSR_ADDRESS = 0xFE91
POST_TX_REPORT_DELAY_ADDRESS = 0xF9C0
SECONDARY_TX_REPORT_DELAY_ADDRESS = 0xF9C1
PERIODIC_REPORT_COUNTDOWN_ADDRESS = 0xF9C6
RX_FRAME_START = 0xF860
RX_CHECKSUM_ADDRESS = 0xF865
@@ -60,7 +66,7 @@ def analyze_serial_reconstruction(
) -> dict[str, object]:
"""Reconstruct conservative serial-frame candidates from independent evidence."""
ordered = _instruction_sequence(instructions)
evidence = _collect_tx_evidence(ordered) + _collect_rx_evidence(ordered)
evidence = _collect_tx_evidence(ordered) + _collect_rx_evidence(ordered) + _collect_timer_role_evidence(ordered)
candidates = [
candidate
for candidate in (
@@ -69,6 +75,7 @@ def analyze_serial_reconstruction(
)
if candidate is not None
]
ram_roles = _ram_roles_from_evidence(evidence)
annotations: dict[int, list[str]] = {}
instruction_metadata: dict[int, list[dict[str, object]]] = {}
@@ -88,9 +95,25 @@ def analyze_serial_reconstruction(
_instruction_metadata(candidate, item, address, comment),
)
for role in ram_roles:
for item in role["evidence"]:
if not isinstance(item, Mapping):
continue
comment = _comment_for_ram_role(role, item)
for address in item.get("addresses", []):
if not isinstance(address, int):
continue
annotations.setdefault(address, [])
if comment not in annotations[address]:
annotations[address].append(comment)
instruction_metadata.setdefault(address, []).append(
_ram_role_instruction_metadata(role, item, address, comment),
)
return {
"kind": "serial_reconstruction",
"candidates": candidates,
"ram_roles": ram_roles,
"evidence": evidence,
"required_evidence": {
"tx": list(_TX_REQUIRED_EVIDENCE),
@@ -139,6 +162,7 @@ def serial_reconstruction_json_payload(analysis: Mapping[str, object] | None) ->
return {
"kind": "serial_reconstruction",
"candidates": [],
"ram_roles": [],
"evidence": [],
"required_evidence": {
"tx": list(_TX_REQUIRED_EVIDENCE),
@@ -148,6 +172,7 @@ def serial_reconstruction_json_payload(analysis: Mapping[str, object] | None) ->
return {
"kind": analysis.get("kind", "serial_reconstruction"),
"candidates": analysis.get("candidates", []),
"ram_roles": analysis.get("ram_roles", []),
"evidence": analysis.get("evidence", []),
"required_evidence": analysis.get(
"required_evidence",
@@ -408,6 +433,69 @@ def _collect_rx_evidence(ordered: list[Instruction]) -> list[dict[str, object]]:
return evidence
def _collect_timer_role_evidence(ordered: list[Instruction]) -> list[dict[str, object]]:
evidence: list[dict[str, object]] = []
tick_ack = [
ins
for ins in ordered
if ins.address == FRT1_OCIA_ISR_ADDRESS and _is_bclr_bit(ins, FRT1_TCSR_ADDRESS, 5)
]
if tick_ack:
evidence.append(
_evidence(
"frt1_ocia_periodic_tick_isr",
tick_ack,
summary=(
f"candidate periodic tick ISR at {h16(FRT1_OCIA_ISR_ADDRESS)} "
f"for FRT1 OCIA vector {h16(FRT1_OCIA_VECTOR_ADDRESS)} clears OCFA"
),
vector_address=FRT1_OCIA_VECTOR_ADDRESS,
vector_address_hex=h16(FRT1_OCIA_VECTOR_ADDRESS),
isr_address=FRT1_OCIA_ISR_ADDRESS,
isr_address_hex=h16(FRT1_OCIA_ISR_ADDRESS),
),
)
for role_name, address, width_bits, summary in (
(
"post_tx_report_delay",
POST_TX_REPORT_DELAY_ADDRESS,
8,
"candidate post-TX/report delay timer is decremented by the FRT1 OCIA periodic tick ISR",
),
(
"secondary_tx_report_delay",
SECONDARY_TX_REPORT_DELAY_ADDRESS,
8,
"candidate secondary TX/report delay timer is decremented by the FRT1 OCIA periodic tick ISR",
),
(
"periodic_report_countdown",
PERIODIC_REPORT_COUNTDOWN_ADDRESS,
16,
"candidate periodic report countdown is decremented by the FRT1 OCIA periodic tick ISR",
),
):
sequence = _timer_tick_decrement_sequence(ordered, address)
if sequence:
evidence.append(
_evidence(
f"{role_name}_tick_decrement",
sequence,
summary=summary,
role_name=role_name,
ram_address=address,
ram_address_hex=h16(address),
width_bits=width_bits,
isr_address=FRT1_OCIA_ISR_ADDRESS,
isr_address_hex=h16(FRT1_OCIA_ISR_ADDRESS),
),
)
return evidence
def _tx_candidate_from_evidence(evidence: list[dict[str, object]]) -> dict[str, object] | None:
evidence_by_key = {str(item["kind"]): item for item in evidence}
missing = [key for key in _TX_REQUIRED_EVIDENCE if key not in evidence_by_key]
@@ -436,6 +524,42 @@ def _tx_candidate_from_evidence(evidence: list[dict[str, object]]) -> dict[str,
"checksum_seed": CHECKSUM_SEED,
"checksum_seed_hex": h16(CHECKSUM_SEED),
"checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4",
"roles": [
{
"name": "tx_frame",
"address": TX_BUFFER_START,
"address_hex": h16(TX_BUFFER_START),
"end_address": TX_BUFFER_END,
"end_address_hex": h16(TX_BUFFER_END),
"summary": "evidence-supported candidate SCI1 TX frame buffer",
},
{
"name": "tx_checksum",
"address": TX_CHECKSUM_ADDRESS,
"address_hex": h16(TX_CHECKSUM_ADDRESS),
"checksum_seed": CHECKSUM_SEED,
"checksum_seed_hex": h16(CHECKSUM_SEED),
"summary": "evidence-supported candidate SCI1 TX XOR checksum byte",
},
{
"name": "tx_index",
"address": TX_INDEX_ADDRESS,
"address_hex": h16(TX_INDEX_ADDRESS),
"summary": "evidence-supported candidate SCI1 TX frame index",
},
],
"tx_path": {
"kind": "interrupt_driven_txi",
"initial_tdr_write_address": _last_address(evidence_by_key["initial_send_from_buffer_start"]),
"initial_tdr_write_address_hex": h16(_last_address(evidence_by_key["initial_send_from_buffer_start"])),
"txi_indexed_tdr_write_address": _last_address(evidence_by_key["tx_isr_indexed_send"]),
"txi_indexed_tdr_write_address_hex": h16(_last_address(evidence_by_key["tx_isr_indexed_send"])),
"summary": (
"initial byte is written from the TX frame buffer, then subsequent bytes are sent "
"by the TXI path when TDRE is reasserted"
),
"tdre_caveat": "TDRE reassertion is hardware/emulator timing context; static evidence is the indexed TXI send path.",
},
"confidence": "high",
"confidence_score": 0.95,
"confidence_reason": "all required independent evidence groups were observed",
@@ -462,6 +586,46 @@ def _tx_candidate_from_evidence(evidence: list[dict[str, object]]) -> dict[str,
return candidate
def _ram_roles_from_evidence(evidence: list[dict[str, object]]) -> list[dict[str, object]]:
evidence_by_key = {str(item["kind"]): item for item in evidence}
tick = evidence_by_key.get("frt1_ocia_periodic_tick_isr")
roles: list[dict[str, object]] = []
for role_name, address, width_bits in (
("post_tx_report_delay", POST_TX_REPORT_DELAY_ADDRESS, 8),
("secondary_tx_report_delay", SECONDARY_TX_REPORT_DELAY_ADDRESS, 8),
("periodic_report_countdown", PERIODIC_REPORT_COUNTDOWN_ADDRESS, 16),
):
decrement = evidence_by_key.get(f"{role_name}_tick_decrement")
if decrement is None:
continue
role_evidence = [item for item in (tick, decrement) if isinstance(item, Mapping)]
roles.append(
{
"kind": "candidate_ram_role",
"name": role_name,
"address": address,
"address_hex": h16(address),
"width_bits": width_bits,
"confidence": "candidate/evidence-supported",
"summary": (
f"{role_name} at {h16(address)} is a candidate/evidence-supported RAM timer "
f"role; FRT1 OCIA tick ISR {h16(FRT1_OCIA_ISR_ADDRESS)} decrements it"
),
"caveat": "role name is evidence-supported from static references plus emulator-guided timer behavior, not a proved firmware symbol",
"evidence": role_evidence,
"evidence_addresses": {
str(item["kind"]): list(item["addresses"])
for item in role_evidence
},
"evidence_addresses_hex": {
str(item["kind"]): list(item["addresses_hex"])
for item in role_evidence
},
},
)
return roles
def _rx_candidate_from_evidence(evidence: list[dict[str, object]]) -> dict[str, object] | None:
evidence_by_key = {str(item["kind"]): item for item in evidence}
missing = [key for key in _RX_REQUIRED_EVIDENCE if key not in evidence_by_key]
@@ -572,6 +736,35 @@ def _instruction_metadata(
}
def _comment_for_ram_role(role: Mapping[str, object], item: Mapping[str, object]) -> str:
return (
f"candidate/evidence-supported RAM role {role['name']} at {role['address_hex']}; "
f"evidence: {item['summary']}; confidence {role['confidence']}"
)
def _ram_role_instruction_metadata(
role: Mapping[str, object],
item: Mapping[str, object],
address: int,
comment: str,
) -> dict[str, object]:
return {
"address": address,
"action": "serial_reconstruction_ram_role",
"role_name": role["name"],
"role_kind": role["kind"],
"role_address": role["address"],
"role_address_hex": role["address_hex"],
"evidence": item["kind"],
"evidence_summary": item["summary"],
"evidence_addresses": list(item["addresses"]),
"evidence_addresses_hex": list(item["addresses_hex"]),
"confidence": role["confidence"],
"comment": comment,
}
def _buffer_region_references(ordered: list[Instruction]) -> list[Instruction]:
return [ins for ins in ordered if _buffer_refs(ins)]
@@ -761,6 +954,31 @@ def _rx_xor_checksum_validation(ordered: list[Instruction]) -> list[Instruction]
return []
def _timer_tick_decrement_sequence(ordered: list[Instruction], address: int) -> list[Instruction]:
for index, ins in enumerate(ordered):
if not (
ins.address >= FRT1_OCIA_ISR_ADDRESS
and _mnemonic_root(ins.mnemonic) == "TST"
and address in ins.references
):
continue
window = ordered[index + 1 : index + 5]
decrement = next(
(
candidate
for candidate in window
if candidate.address >= FRT1_OCIA_ISR_ADDRESS
and _is_write_to_address(candidate, address)
and _mnemonic_root(candidate.mnemonic) in {"ADD:Q", "ADD:G", "ADDS", "SUB", "SUBS"}
and _immediate_source_value(candidate.operands) in {0xFFFF, 0xFF, 1}
),
None,
)
if decrement is not None and "-1" in decrement.operands:
return [ins, decrement]
return []
def _rx_rdrf_clear_before_rdr_read(ordered: list[Instruction]) -> list[Instruction]:
for index, ins in enumerate(ordered):
if not _is_bclr_bit(ins, SCI1_SSR_ADDRESS, 6):
@@ -950,6 +1168,13 @@ def _dedupe_ints(values: Iterable[int]) -> list[int]:
return output
def _last_address(item: Mapping[str, object]) -> int:
addresses = item.get("addresses", [])
if not isinstance(addresses, list) or not addresses:
return 0
return int(addresses[-1])
def _instruction_sequence(
instructions: Mapping[int, Instruction] | Iterable[Instruction],
) -> list[Instruction]:
@@ -997,6 +1222,7 @@ def _operand_mentions_address(operand: str, address: int) -> bool:
SCI1_TDR_ADDRESS: ("SCI1_TDR",),
SCI1_SSR_ADDRESS: ("SCI1_SSR",),
SCI1_RDR_ADDRESS: ("SCI1_RDR",),
FRT1_TCSR_ADDRESS: ("FRT1_TCSR",),
TX_BUFFER_START: ("TX_BUFFER",),
TX_CHECKSUM_ADDRESS: ("TX_CHECKSUM",),
TX_INDEX_ADDRESS: ("TX_INDEX",),