1
0
Files
h8-536-decoder/h8536/serial_pseudocode.py
2026-05-27 21:37:50 +10:00

1588 lines
63 KiB
Python

from __future__ import annotations
import argparse
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .serial_semantics import analyze_serial_semantics
JsonObject = dict[str, Any]
@dataclass(frozen=True)
class SerialPseudocodeOptions:
include_tx: bool = True
include_rx: bool = True
include_evidence: bool = True
include_manual: bool = True
include_board: bool = True
include_semantics: bool = True
def generate_serial_pseudocode(
payload: JsonObject,
*,
source_name: str = "",
options: SerialPseudocodeOptions | None = None,
) -> str:
opts = options or SerialPseudocodeOptions()
tx_candidate = _find_candidate(payload, "candidate_sci1_tx_frame")
rx_candidate = _find_candidate(payload, "candidate_sci1_rx_frame")
serial_semantics = analyze_serial_semantics(payload) if opts.include_semantics else None
lines: list[str] = []
lines.extend(_file_header(source_name, tx_candidate, rx_candidate))
if opts.include_board:
lines.extend(_board_comment_lines(payload))
if opts.include_manual:
lines.extend(_manual_reference_lines(payload))
lines.extend(_sci_link_lines(payload))
lines.extend(_declarations(tx_candidate, rx_candidate))
if opts.include_semantics:
lines.extend(_semantics_lines(serial_semantics, opts))
emitted = False
if opts.include_tx and tx_candidate:
lines.extend(_tx_functions(tx_candidate, opts))
emitted = True
if opts.include_rx and rx_candidate:
lines.extend(_rx_functions(rx_candidate, opts))
emitted = True
if not emitted:
lines.append("/* No requested SCI serial reconstruction candidates were present in the JSON input. */")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def load_serial_pseudocode_input(path: Path) -> JsonObject:
with path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict) or "instructions" not in payload:
raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
return payload
def write_serial_pseudocode(
input_path: Path,
output_path: Path,
options: SerialPseudocodeOptions,
) -> None:
payload = load_serial_pseudocode_input(input_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
generate_serial_pseudocode(payload, source_name=str(input_path), options=options),
encoding="utf-8",
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Generate focused C-like SCI RX/TX pseudocode from h8536_decompiler JSON output.",
)
parser.add_argument(
"input",
nargs="?",
type=Path,
default=Path("build/rom_decompiled.json"),
help="structured JSON emitted by h8536_decompiler.py",
)
parser.add_argument(
"--out",
type=Path,
default=Path("build/rom_serial_pseudocode.c"),
help="focused serial pseudocode output path",
)
mode = parser.add_mutually_exclusive_group()
mode.add_argument("--tx-only", action="store_true", help="emit only the candidate TX path")
mode.add_argument("--rx-only", action="store_true", help="emit only the candidate RX path")
parser.add_argument("--no-evidence", action="store_true", help="omit evidence-address comments")
parser.add_argument("--no-manual", action="store_true", help="omit manual-reference comments")
parser.add_argument("--no-board", action="store_true", help="omit board/MAX202 comments")
parser.add_argument("--no-semantics", action="store_true", help="omit candidate command/field semantics")
args = parser.parse_args(argv)
options = SerialPseudocodeOptions(
include_tx=not args.rx_only,
include_rx=not args.tx_only,
include_evidence=not args.no_evidence,
include_manual=not args.no_manual,
include_board=not args.no_board,
include_semantics=not args.no_semantics,
)
write_serial_pseudocode(args.input, args.out, options)
print(f"wrote {args.out}")
return 0
def _file_header(
source_name: str,
tx_candidate: JsonObject | None,
rx_candidate: JsonObject | None,
) -> list[str]:
source = f" from {source_name}" if source_name else ""
lines = [
"/*",
f" * H8/536 focused SCI RX/TX pseudocode{source}",
" *",
" * This is a protocol-oriented reconstruction from decompiler JSON metadata.",
" * It is intentionally phrased as candidate behavior: it summarizes evidence",
" * from the ROM without claiming source-level intent or a proven packet format.",
" */",
"",
]
if tx_candidate or rx_candidate:
lines.append("/* Candidate summary:")
for candidate in (tx_candidate, rx_candidate):
if not candidate:
continue
lines.append(
" * - "
+ _candidate_label(candidate)
+ f": confidence {candidate.get('confidence', 'unknown')} "
+ f"({candidate.get('confidence_score', 'n/a')})"
)
reason = str(candidate.get("confidence_reason") or "").strip()
if reason:
lines.append(f" * reason: {_comment_text(reason)}")
caveat = str(candidate.get("caveat") or "").strip()
if caveat:
lines.append(f" * caveat: {_comment_text(caveat)}")
lines.append(" */")
lines.append("")
return lines
def _board_comment_lines(payload: JsonObject) -> list[str]:
board = payload.get("board_profile")
if not isinstance(board, dict):
return []
lines = ["/* Board path:"]
summary = str(board.get("summary") or "").strip()
if summary:
lines.append(f" * {summary}")
for trace in _mapping_items(board.get("traces")):
signal = trace.get("signal", "?")
h8_pin = trace.get("h8_pin", "?")
h8_name = trace.get("h8_pin_name", "?")
max202_pin = trace.get("max202_pin", "?")
evidence = str(trace.get("evidence") or "").strip()
lines.append(f" * - {signal}: H8 pin {h8_pin} {h8_name} <-> MAX202 pin {max202_pin}")
if evidence:
lines.append(f" * evidence: {_comment_text(evidence)}")
lines.append(" */")
lines.append("")
return lines
def _manual_reference_lines(payload: JsonObject) -> list[str]:
refs: list[str] = []
for section in ("sci_protocol", "board_profile"):
data = payload.get(section)
if isinstance(data, dict):
refs.extend(str(ref) for ref in data.get("manual_references", []) if ref)
refs = _dedupe(refs)
if not refs:
return []
lines = ["/* Manual anchors used by the decompiler metadata:"]
for ref in refs:
lines.append(f" * - {_comment_text(ref)}")
lines.append(" */")
lines.append("")
return lines
def _sci_link_lines(payload: JsonObject) -> list[str]:
config = _first_sci1_configuration(payload)
dtc_note = _dtc_sci_note(payload)
if not config and not dtc_note:
return []
lines = ["/* SCI1 link layer:"]
if config:
mode_summary = str(config.get("mode_summary") or config.get("mode") or "SCI mode")
char_format = "8E1" if "8-bit even parity 1 stop" in mode_summary else mode_summary
smr = config.get("smr_hex") or _optional_hex(config.get("smr"), width=2)
brr = config.get("brr_hex") or _optional_hex(config.get("brr"), width=2)
scr = config.get("scr_hex") or _optional_hex(config.get("scr"), width=2)
clock_source = config.get("clock_source") or "clock source unknown"
lines.append(
" * - "
+ _comment_text(
f"{char_format} SCI characters carry the 6 protocol bytes; "
f"{mode_summary}, clock {clock_source}, SMR={smr}, BRR={brr}, SCR={scr}",
),
)
baud = config.get("baud_bps")
if isinstance(baud, (int, float)):
lines.append(f" * - candidate baud: {_comment_text(str(baud))} bps")
else:
lines.append(" * - baud is clock-dependent; pass --clock-hz on the decompiler for bps.")
if dtc_note:
lines.append(f" * - {_comment_text(dtc_note)}")
lines.append(" */")
lines.append("")
return lines
def _first_sci1_configuration(payload: JsonObject) -> JsonObject | None:
sci = payload.get("sci")
if not isinstance(sci, dict):
return None
channels = sci.get("channels")
if not isinstance(channels, dict):
return None
sci1 = channels.get("SCI1")
if not isinstance(sci1, dict):
return None
configurations = sci1.get("configurations")
if not isinstance(configurations, list):
return None
for item in configurations:
if isinstance(item, dict):
return item
return None
def _dtc_sci_note(payload: JsonObject) -> str:
vectors = payload.get("dtc_vectors")
if not isinstance(vectors, list):
return ""
sources = {
str(item.get("source", "")).lower()
for item in vectors
if isinstance(item, dict)
}
if "sci1_rxi" in sources or "sci1_txi" in sources:
return "SCI1 DTC vector entries are present; review DTC metadata before treating RX/TX as CPU-only."
return "No SCI1 RXI/TXI DTC vector entries are present in JSON; RX/TX are modeled as CPU ISR paths."
def _declarations(tx_candidate: JsonObject | None, rx_candidate: JsonObject | None) -> list[str]:
candidate = tx_candidate or rx_candidate or {}
channel = str(candidate.get("channel") or "SCI1")
tdr = _int_field(tx_candidate, "tdr_address", 0xFEDB)
rdr = _int_field(rx_candidate, "rdr_address", 0xFEDD)
scr = tdr - 1 if tdr else 0xFEDA
ssr = rdr - 1 if rdr else 0xFEDC
lines = [
"#include <stdbool.h>",
"#include <stdint.h>",
"",
"typedef uint8_t u8;",
"typedef uint16_t u16;",
"",
"#define BIT(n) (1u << (n))",
"extern volatile u8 MEM8[0x10000];",
"",
f"#define {channel}_SCR MEM8[{_c_hex(scr)}]",
f"#define {channel}_TDR MEM8[{_c_hex(tdr)}]",
f"#define {channel}_SSR MEM8[{_c_hex(ssr)}]",
f"#define {channel}_RDR MEM8[{_c_hex(rdr)}]",
"",
"#define SCI_SCR_TIE 0x80u",
"#define SCI_SCR_RIE 0x40u",
"#define SCI_SCR_TE 0x20u",
"#define SCI_SCR_RE 0x10u",
"#define SCI_SSR_TDRE 0x80u",
"#define SCI_SSR_RDRF 0x40u",
"#define SCI_SSR_ORER 0x20u",
"#define SCI_SSR_FER 0x10u",
"#define SCI_SSR_PER 0x08u",
"",
]
if tx_candidate:
tx_start = _int_field(tx_candidate, "buffer_start", 0xF858)
tx_index = _int_field(tx_candidate, "tx_index_address", 0xF9C2)
length = _int_field(tx_candidate, "frame_length", 6)
checksum_index = max(length - 1, 0)
lines.extend(
[
f"#define SCI1_TX_FRAME_LENGTH {length}u",
f"#define SCI1_TX_FRAME_BASE {_c_hex(tx_start)}",
"#define SCI1_TX_FRAME_BYTE(n) MEM8[(u16)(SCI1_TX_FRAME_BASE + (n))]",
f"#define SCI1_TX_FRAME_CHECKSUM SCI1_TX_FRAME_BYTE({checksum_index}u)",
f"#define SCI1_TX_INDEX MEM8[{_c_hex(tx_index)}]",
"#define TX_FRAME_LENGTH SCI1_TX_FRAME_LENGTH",
f"#define TX_FRAME(n) MEM8[(u16)({_c_hex(tx_start)} + (n))]",
"#define TX_INDEX SCI1_TX_INDEX",
"",
],
)
if rx_candidate:
capture_start = _int_field(rx_candidate, "capture_buffer_start", 0xF868)
frame_start = _int_field(rx_candidate, "validation_buffer_start", 0xF860)
rx_index = _int_field(rx_candidate, "rx_index_address", 0xF9C3)
timeout = _int_field(rx_candidate, "interbyte_timeout_address", 0xF9C1)
complete = _int_field(rx_candidate, "complete_timer_address", 0xF9C5)
length = _int_field(rx_candidate, "frame_length", 6)
error_handling = rx_candidate.get("rx_error_handling")
error_latch = 0xFAA4
if isinstance(error_handling, dict):
error_latch = _int_field(error_handling, "error_latch_address", 0xFAA4)
lines.extend(
[
f"#define RX_FRAME_LENGTH {length}u",
f"#define RX_CAPTURE(n) MEM8[(u16)({_c_hex(capture_start)} + (n))]",
f"#define RX_FRAME(n) MEM8[(u16)({_c_hex(frame_start)} + (n))]",
f"#define RX_INDEX MEM8[{_c_hex(rx_index)}]",
f"#define RX_INTERBYTE_TIMEOUT MEM8[{_c_hex(timeout)}]",
f"#define RX_COMPLETE_TIMER MEM8[{_c_hex(complete)}]",
f"#define RX_ERROR_LATCH MEM8[{_c_hex(error_latch)}]",
"#define RX_ERROR_LATCH_PHYSICAL_ERROR 0x80u",
"",
],
)
return lines
def _semantics_lines(
analysis: JsonObject | None,
opts: SerialPseudocodeOptions,
) -> list[str]:
if not isinstance(analysis, dict):
return []
protocols = analysis.get("protocol_semantics")
if not isinstance(protocols, list) or not protocols:
return []
protocol = protocols[0]
if not isinstance(protocol, dict):
return []
lines: list[str] = ["/* Candidate Protocol Semantics"]
lines.append(
f" * confidence: {protocol.get('confidence', 'unknown')} "
f"({protocol.get('confidence_score', 'n/a')})",
)
caveat = str(protocol.get("caveat") or "").strip()
if caveat:
lines.append(f" * caveat: {_comment_text(caveat)}")
layout = protocol.get("byte_layout")
if isinstance(layout, list) and layout:
lines.append(" * byte layout:")
for item in layout:
if not isinstance(item, dict):
continue
offset = item.get("offset", "?")
name = item.get("name_candidate", "byte")
semantic = item.get("semantic", "")
confidence = item.get("confidence", "unknown")
lines.append(f" * - byte{offset}: {name} ({confidence}) - {_comment_text(str(semantic))}")
dispatch = protocol.get("command_dispatch")
if isinstance(dispatch, dict):
values = ", ".join(str(value) for value in dispatch.get("command_values_hex", []))
lines.append(
" * dispatch: command_low3 = RX_FRAME(0) & 0x07"
+ (f"; observed {values}" if values else ""),
)
state_split = dispatch.get("state_split") or dispatch.get("dispatcher_split")
if isinstance(state_split, dict):
initial = ", ".join(str(item) for item in state_split.get("initial_idle_commands_hex", []) if item)
continuation = ", ".join(str(item) for item in state_split.get("continuation_commands_hex", []) if item)
lines.append(
" * dispatcher split: FAA2 == 0 accepts initial/idle commands "
f"{initial or 'none'}; FAA2 != 0 accepts continuation commands {continuation or 'none'}",
)
caveat = str(state_split.get("caveat") or "").strip()
if caveat:
lines.append(f" * dispatcher caveat: {_comment_text(caveat)}")
if opts.include_evidence:
lines.append(f" * dispatch evidence: {_hex_join(dispatch.get('evidence_addresses_hex'))}")
index_decoder = protocol.get("index_decoder")
if isinstance(index_decoder, dict):
lines.append(
" * index decoder: RX[1:2] -> logical index via "
f"{index_decoder.get('label', 'loc_622B')} ({index_decoder.get('confidence', 'unknown')})",
)
commands = [item for item in protocol.get("commands", []) if isinstance(item, dict)]
if commands:
lines.append(" * command candidates:")
for command in commands:
value = command.get("command_value_hex", "??")
name = command.get("name_candidate", "unknown")
summary = _comment_text(str(command.get("summary") or ""))
handler = command.get("handler_start_hex") or "multiple"
responses = ", ".join(str(item) for item in command.get("response_candidates", [])) or "none"
lines.append(f" * - {value} {name}: {summary}; handler {handler}; responses {responses}")
availability = _comment_text(str(command.get("availability_summary") or ""))
if availability:
lines.append(f" * availability: {availability}")
for note in command.get("semantic_notes", []):
lines.append(f" * note: {_comment_text(str(note))}")
lines.extend(_command_effect_comment_lines(protocol.get("command_effects"), opts, prefix=" * "))
lines.extend(_response_schema_comment_lines(_schema_list(protocol), opts, prefix=" * "))
lines.extend(_table_map_comment_lines(_table_map_list(protocol), opts, prefix=" * "))
lines.extend(_panel_selector_comment_lines(protocol.get("panel_selector_semantics"), opts, prefix=" * "))
lines.extend(_state_variable_comment_lines(protocol.get("state_variable_candidates"), opts, prefix=" * "))
lines.extend(_retry_error_comment_lines(protocol.get("retry_error_model"), opts, prefix=" * "))
lines.extend(_gate_queue_comment_lines(protocol.get("gate_queue_model"), opts, prefix=" * "))
lines.extend(_tx_report_comment_lines(protocol.get("tx_report_model"), opts, prefix=" * "))
lines.extend(_periodic_resend_comment_lines(protocol.get("periodic_resend_model"), opts, prefix=" * "))
lines.extend(_timer_architecture_comment_lines(protocol, opts, prefix=" * "))
lines.append(" */")
lines.append("")
lines.extend(
[
"static u8 sci1_rx_candidate_command(void)",
"{",
" return (u8)(RX_FRAME(0) & 0x07u);",
"}",
"",
"static u16 sci1_rx_candidate_value(void)",
"{",
" return (u16)(((u16)RX_FRAME(3) << 8) | RX_FRAME(4));",
"}",
"",
"static u16 sci1_rx_candidate_logical_index(void)",
"{",
" u8 page = RX_FRAME(1);",
" u8 offset = RX_FRAME(2);",
"",
" if (page == 0u && offset <= 0x7Fu) {",
" return offset;",
" }",
" if (page == 1u) {",
" return (u16)(0x0080u + offset);",
" }",
" if (page == 2u && offset <= 0x7Fu) {",
" return (u16)(0x0180u + offset);",
" }",
" return 0x01FFu;",
"}",
"",
],
)
lines.extend(_gate_queue_predicate_function_lines(protocol.get("gate_queue_model")))
lines.extend(_timer_architecture_function_lines(protocol))
lines.extend(_panel_selector_function_lines(protocol.get("panel_selector_semantics")))
lines.extend(_panel_selector_provisional_function_lines(protocol.get("panel_selector_semantics")))
lines.extend(
[
"void sci1_process_candidate_protocol_command(void)",
"{",
" u8 command = sci1_rx_candidate_command();",
" u16 logical_index = sci1_rx_candidate_logical_index();",
" u16 value = sci1_rx_candidate_value();",
"",
" sci1_candidate_panel_selector_annotation(logical_index, value);",
"",
],
)
lines.extend(_command_dispatch_switch_lines(commands, opts))
return lines
def _command_dispatch_switch_lines(commands: list[JsonObject], opts: SerialPseudocodeOptions) -> list[str]:
initial = [
command for command in commands
if _command_has_availability(command, "initial_idle_dispatch")
]
continuation = [
command for command in commands
if _command_has_availability(command, "continuation_dispatch")
]
if not initial and not continuation:
return _flat_command_switch_lines(commands, opts, indent=" ") + ["}", ""]
lines = [
" bool session_active = MEM8[0xFAA2u] != 0u;",
"",
" if (!session_active) {",
" /* Initial/idle dispatcher: valid checksum/no RX error, FAA2 == 0. */",
]
lines.extend(_flat_command_switch_lines(initial, opts, indent=" "))
lines.extend(
[
" return;",
" }",
"",
" /* Continuation dispatcher: FAA2 != 0. */",
],
)
lines.extend(_flat_command_switch_lines(continuation, opts, indent=" "))
lines.extend(["}", ""])
return lines
def _flat_command_switch_lines(
commands: list[JsonObject],
opts: SerialPseudocodeOptions,
*,
indent: str,
) -> list[str]:
lines = [f"{indent}switch (command) {{"]
for command in commands:
value = command.get("command_value")
if not isinstance(value, int):
continue
name = _safe_identifier(str(command.get("name_candidate") or f"command_{value:02X}"))
summary = _comment_text(str(command.get("summary") or "candidate command semantics unknown"))
evidence = _hex_join(command.get("evidence_addresses_hex"))
lines.append(f"{indent}case 0x{value:02X}u:")
lines.append(f"{indent} /* {name}: {summary}")
availability = _comment_text(str(command.get("availability_summary") or ""))
if availability:
lines.append(f"{indent} * availability: {availability}")
for note in command.get("semantic_notes", []):
lines.append(f"{indent} * note: {_comment_text(str(note))}")
for effect_line in _command_effect_switch_lines(command):
lines.append(f"{indent} * {effect_line}")
if opts.include_evidence and evidence:
lines.append(f"{indent} * evidence: {evidence}")
lines.append(f"{indent} */")
if value == 0x01:
lines.append(f"{indent} if ((RX_FRAME(1) & 0x80u) != 0u) {{")
lines.append(f"{indent} candidate_unknown_command(command, logical_index, value);")
lines.append(f"{indent} break;")
lines.append(f"{indent} }}")
lines.append(f"{indent} candidate_{name}(logical_index, value);")
lines.append(f"{indent} break;")
lines.extend(
[
f"{indent}default:",
f"{indent} candidate_unknown_command(command, logical_index, value);",
f"{indent} break;",
f"{indent}}}",
],
)
return lines
def _command_has_availability(command: JsonObject, availability: str) -> bool:
value = command.get("availability")
if value == availability:
return True
if isinstance(value, list) and availability in value:
return True
conditions = " ".join(str(item) for item in command.get("availability_conditions", []))
if availability == "initial_idle_dispatch":
return "FAA2 == 0" in conditions
if availability == "continuation_dispatch":
return "FAA2 != 0" in conditions
return False
def _command_effect_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
effects = [item for item in _object_list(value) if item.get("effects") or item.get("summary")]
if not effects:
return []
lines = [f"{prefix}command effects:"]
for item in effects:
command = item.get("command_value_hex") or _command_hex(item.get("command_value"))
name = item.get("name_candidate") or "unknown_command"
summary = _comment_text(str(item.get("summary") or "candidate effects"))
lines.append(f"{prefix}- {command} {name}: {summary}")
for effect in _object_list(item.get("effects"))[:3]:
effect_text = _effect_summary(effect)
if effect_text:
lines.append(f"{prefix} effect: {effect_text}")
evidence = _hex_join(item.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {evidence}")
return lines
def _response_schema_comment_lines(
schemas: list[JsonObject],
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
if not schemas:
return []
lines = [f"{prefix}response schemas:"]
for schema in schemas[:4]:
response_id = schema.get("response_id") or schema.get("call_address_hex") or "candidate_response"
byte_text = _response_schema_summary(schema)
lines.append(f"{prefix}- {response_id}: {byte_text}")
for note in schema.get("semantic_notes", []):
lines.append(f"{prefix} note: {_comment_text(str(note))}")
evidence = _hex_join(schema.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {evidence}")
if len(schemas) > 4:
lines.append(f"{prefix}- ... {len(schemas) - 4} more candidate response schemas")
return lines
def _table_map_comment_lines(
tables: list[JsonObject],
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
if not tables:
return []
lines = [f"{prefix}table map candidates:"]
for table in tables[:6]:
name = table.get("name_candidate") or "unnamed_table_candidate"
address = table.get("logical_base_address_hex") or table.get("address_hex") or "?"
element = table.get("element_candidate")
accesses = ", ".join(str(item) for item in table.get("observed_accesses", []) if item) or "access?"
detail = f"{name} at {address}"
if element:
detail += f" ({element})"
lines.append(f"{prefix}- {_comment_text(detail)}; observed {accesses}")
evidence = _hex_join(table.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {evidence}")
if len(tables) > 6:
lines.append(f"{prefix}- ... {len(tables) - 6} more table candidates")
return lines
def _panel_selector_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
selectors = _object_list(value)
if not selectors:
return []
lines = [f"{prefix}panel selector semantics:"]
for selector in selectors[:6]:
selector_hex = selector.get("selector_hex") or _selector_hex(selector.get("selector"))
name = selector.get("name") or "panel_selector"
current = selector.get("current_word_address_hex") or "current table"
dispatch = selector.get("dispatch_handler") or "dispatch unknown"
summary = _comment_text(str(selector.get("summary") or "bench/ROM selector annotation"))
lines.append(f"{prefix}- {selector_hex} {name}: {summary}")
lines.append(f"{prefix} current word: {current}; dispatch: {dispatch}")
for effect in _object_list(selector.get("effects"))[:4]:
mask = effect.get("mask_hex") or _selector_hex(effect.get("mask"))
effect_name = effect.get("name") or "effect"
when_set = _comment_text(str(effect.get("when_set") or "set"))
bits = ", ".join(str(item) for item in effect.get("ram_bits", []))
suffix = f"; RAM {bits}" if bits else ""
lines.append(f"{prefix} {mask} -> {effect_name}: {when_set}{suffix}")
meanings = []
for meaning in _object_list(selector.get("value_meanings"))[:4]:
value_hex = meaning.get("value_hex") or _selector_hex(meaning.get("value"))
label = _comment_text(str(meaning.get("meaning") or "known panel value"))
meanings.append(f"{value_hex} {label}")
if meanings:
lines.append(f"{prefix} observed values: {'; '.join(meanings)}")
state_machine = selector.get("state_machine")
if isinstance(state_machine, dict):
name_candidate = state_machine.get("name_candidate") or "selector_state_machine_candidate"
summary = _comment_text(str(state_machine.get("summary") or "bench-proven selector state-machine candidate"))
lines.append(f"{prefix} state machine: {name_candidate}: {summary}")
active = state_machine.get("active_report_frame")
clear = state_machine.get("clear_report_frame")
ack = state_machine.get("ack_frame")
mirror_active = state_machine.get("active_mirror_frame")
mirror_clear = state_machine.get("clear_mirror_frame")
if active or clear or ack:
lines.append(
f"{prefix} frames: active report {active or '?'}; clear report {clear or '?'}; "
f"ACK {ack or '?'}; mirror active {mirror_active or '?'}; mirror clear {mirror_clear or '?'}"
)
triggers = []
for trigger in _object_list(selector.get("local_triggers"))[:3]:
source = trigger.get("source") or trigger.get("handler") or "local path"
summary = _comment_text(str(trigger.get("summary") or "local trigger candidate"))
name_candidate = trigger.get("name_candidate")
prefix_text = f"{name_candidate} " if name_candidate else ""
triggers.append(f"{prefix_text}{source}: {summary}")
if triggers:
lines.append(f"{prefix} local trigger candidates: {'; '.join(triggers)}")
evidence = ", ".join(str(item) for item in selector.get("evidence", []) if item)
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {_comment_text(evidence)}")
if len(selectors) > 6:
lines.append(f"{prefix}- ... {len(selectors) - 6} more panel selector annotations")
return lines
def _state_variable_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
states = sorted(
_object_list(value),
key=lambda item: item.get("address") if isinstance(item.get("address"), int) else 0x10000,
)
if not states:
return []
lines = [f"{prefix}state variable candidates:"]
for state in states[:6]:
name = state.get("name_candidate") or "unnamed_state_candidate"
address = state.get("address_hex") or _command_hex(state.get("address"))
reads = state.get("read_count", "?")
writes = state.get("write_count", "?")
bits = ", ".join(str(item) for item in state.get("bit_candidates", []))
suffix = f"; bits {bits}" if bits else ""
lines.append(f"{prefix}- {_comment_text(str(name))} {address}: reads {reads}, writes {writes}{suffix}")
evidence = _hex_join(state.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {evidence}")
if len(states) > 6:
lines.append(f"{prefix}- ... {len(states) - 6} more state-variable candidates")
return lines
def _retry_error_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
if not isinstance(value, dict):
return []
checksum = value.get("checksum_failure_path")
retry = value.get("retry_path")
command_07 = value.get("command_0x07_path")
lines = [f"{prefix}retry/error model candidate:"]
if isinstance(checksum, dict):
condition = _comment_text(str(checksum.get("condition_candidate") or "checksum failure"))
target = checksum.get("error_target") or checksum.get("error_target_address_hex") or "error target"
lines.append(f"{prefix}- checksum path: {condition} -> {target}")
if isinstance(retry, dict):
counter = retry.get("counter_address_hex") or "counter?"
threshold = retry.get("threshold_candidate", "?")
summary = _comment_text(str(retry.get("summary") or "candidate retry path"))
lines.append(f"{prefix}- retry path: counter {counter}, threshold {threshold}; {summary}")
echo = retry.get("echo_response_candidate")
if isinstance(echo, dict):
staging = _comment_text(str(echo.get("staging_candidate") or "retry/error echo staging"))
caveat = _comment_text(str(echo.get("observed_frame_caveat") or ""))
lines.append(f"{prefix} echo path: {echo.get('entry_label', 'loc_BE4D')}: {staging}")
if caveat:
lines.append(f"{prefix} echo caveat: {caveat}")
if isinstance(command_07, dict):
summary = _comment_text(str(command_07.get("summary") or "candidate command 0x07 path"))
lines.append(f"{prefix}- command 0x07 path: {summary}")
evidence = _hex_join(value.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix}- evidence: {evidence}")
return lines
def _gate_queue_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
if not isinstance(value, dict):
return []
lines = [f"{prefix}gate/queue state machine candidate:"]
for predicate in _object_list(value.get("predicates")):
name = predicate.get("name") or "predicate_candidate"
condition = _comment_text(str(predicate.get("condition_candidate") or "condition unknown"))
summary = _comment_text(str(predicate.get("summary") or "candidate gate"))
lines.append(f"{prefix}- {name}: {condition}; {summary}")
enqueued = predicate.get("enqueued_report_candidate_hex")
if enqueued:
lines.append(f"{prefix} enqueues report {enqueued}")
write_semantics = str(predicate.get("write_semantics_candidate") or "").strip()
if write_semantics:
lines.append(f"{prefix} write semantics: {_comment_text(write_semantics)}")
runtime = predicate.get("runtime_trace_confirmation")
if isinstance(runtime, dict):
frame = runtime.get("emitted_frame_hex")
path = " -> ".join(str(item) for item in runtime.get("dequeue_path", []) if item)
detail = f"runtime-confirmed frame {frame}" if frame else "runtime-confirmed path"
if path:
detail += f" via {path}"
lines.append(f"{prefix} {detail}")
for effect in _object_list(value.get("session_effects")):
name = effect.get("name") or "session_effect_candidate"
summary = _comment_text(str(effect.get("summary") or "candidate session effect"))
commands = ", ".join(str(item) for item in effect.get("command_values_hex", []) if item)
suffix = f"; commands {commands}" if commands else ""
lines.append(f"{prefix}- {name}: {summary}{suffix}")
caveat = str(value.get("caveat") or "").strip()
if caveat:
lines.append(f"{prefix}- caveat: {_comment_text(caveat)}")
evidence = _hex_join(value.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix}- evidence: {evidence}")
return lines
def _gate_queue_predicate_function_lines(value: object) -> list[str]:
if not isinstance(value, dict):
return []
return [
"static bool sci1_candidate_main_report_gate_open(void)",
"{",
" bool session_idle = MEM8[0xFAA2u] == 0u;",
" bool rx_gate_open = (MEM8[0xFAA5u] & 0x80u) == 0u || MEM8[0xF9C3u] == 0u;",
" bool tx_timer_clear = MEM8[0xF9C0u] == 0u;",
"",
" return session_idle && rx_gate_open && tx_timer_clear;",
"}",
"",
"static bool sci1_candidate_report_queue_nonempty(void)",
"{",
" return MEM8[0xF9B5u] != MEM8[0xF9B0u];",
"}",
"",
"static bool sci1_candidate_idle_heartbeat_enqueue_gate_open(void)",
"{",
" bool idle_timer_clear = MEM8[0xF9C4u] == 0u;",
" bool rx_gate_open = (MEM8[0xFAA5u] & 0x80u) == 0u || MEM8[0xF9C3u] == 0u;",
" bool queue_empty = MEM8[0xF9B0u] == MEM8[0xF9B5u];",
"",
" return idle_timer_clear && rx_gate_open && queue_empty;",
"}",
"",
"static void sci1_candidate_enqueue_idle_heartbeat_report(void)",
"{",
" if (!sci1_candidate_idle_heartbeat_enqueue_gate_open()) {",
" return;",
" }",
"",
" /* loc_4067 writes MOV:G.W #H'00, so the queue report id is 0x0000. */",
" candidate_enqueue_report(0x0000u);",
"}",
"",
"static bool sci1_candidate_periodic_resend_gate_open(void)",
"{",
" bool pending = (MEM8[0xFAA5u] & MEM8[0xFAA3u] & 0x80u) != 0u;",
" bool period_elapsed = MEM8[0xF9C6u] == 0u && MEM8[0xF9C7u] == 0u;",
" bool resend_countdown_active = MEM8[0xF9C8u] != 0u;",
"",
" return pending && period_elapsed && resend_countdown_active;",
"}",
"",
]
def _tx_report_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
if not isinstance(value, dict):
return []
entry = value.get("entry_label") or value.get("entry_address_hex") or "TX report path"
source = _comment_text(str(value.get("value_source_candidate") or "current value table"))
lines = [f"{prefix}TX/autonomous report model candidate:"]
lines.append(f"{prefix}- {entry} -> loc_BA26: bytes 0..2 encode candidate logical index/report id; bytes 3..4 come from {source}; byte5 is 0x5A XOR checksum")
overlay = _object_list(value.get("observed_capture_overlay_candidates"))
if overlay:
observed = []
for item in overlay[:3]:
name = item.get("name_candidate") or "observed_report_candidate"
frames = ", ".join(str(frame) for frame in item.get("observed_frames_hex", []) if frame)
if frames:
observed.append(f"{name}: {frames}")
if observed:
lines.append(f"{prefix}- observed overlay candidates: {_comment_text('; '.join(observed))}")
for runtime in _object_list(value.get("runtime_confirmed_paths")):
name = runtime.get("name") or "runtime_confirmation"
frame = runtime.get("emitted_frame_hex") or "frame?"
report = runtime.get("report_id_hex") or "report?"
summary = f"{name}: report {report} emits {frame}"
semantics = runtime.get("queue_write_semantics")
if semantics:
summary += f"; {semantics}"
lines.append(f"{prefix}- runtime confirmation: {_comment_text(summary)}")
checks = _object_list(value.get("consistency_checks"))
for check in checks:
name = check.get("name") or "consistency_check"
status = check.get("status") or "info"
summary = _comment_text(str(check.get("summary") or ""))
lines.append(f"{prefix}- consistency {name}: {status}; {summary}")
caveat = str(value.get("observed_autonomous_output_caveat") or value.get("caveat") or "").strip()
if caveat:
lines.append(f"{prefix}- caveat: {_comment_text(caveat)}")
evidence = _hex_join(value.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix}- evidence: {evidence}")
return lines
def _periodic_resend_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
if not isinstance(value, dict):
return []
lines = [f"{prefix}heartbeat/periodic resend candidate:"]
period = value.get("period_timer")
if isinstance(period, dict):
lines.append(
f"{prefix}- F9C6 reload {period.get('reload_value_hex', '?')}: "
f"{_comment_text(str(period.get('summary') or 'period timer'))}",
)
countdown = value.get("resend_countdown")
if isinstance(countdown, dict):
lines.append(
f"{prefix}- F9C8 reload {countdown.get('reload_value_hex', '?')}: "
f"{_comment_text(str(countdown.get('summary') or 'resend countdown'))}",
)
pending = value.get("pending_mask")
if isinstance(pending, dict):
lines.append(
f"{prefix}- FAA3 mask {pending.get('mask_hex', '?')}: "
f"{_comment_text(str(pending.get('summary') or 'pending mask'))}",
)
resend = value.get("resend_path")
if isinstance(resend, dict):
lines.append(
f"{prefix}- BED5 resend path: {_comment_text(str(resend.get('summary') or 'candidate resend path'))}",
)
evidence = _hex_join(value.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix}- evidence: {evidence}")
return lines
def _timer_architecture_comment_lines(
protocol: JsonObject,
opts: SerialPseudocodeOptions,
*,
prefix: str,
) -> list[str]:
model = _timer_architecture_model(protocol)
if not model:
return []
lines = [f"{prefix}interrupt/timer architecture candidate:"]
sources = _timer_source_models(model)
if sources:
for source_model in sources:
source = str(source_model.get("source") or "timer")
handler = str(source_model.get("handler_address_hex") or source_model.get("vector_address_hex") or "")
details = f" {handler}" if handler else ""
summary = _comment_text(str(source_model.get("summary") or "appears to be a periodic tick ISR for serial counters."))
lines.append(f"{prefix}- {source}{details}: {summary}")
else:
vector = str(model.get("vector_address_hex") or model.get("handler_address_hex") or "H'BEEA")
source = str(model.get("source") or "FRT1 OCIA")
lines.append(
f"{prefix}- {source} {vector} appears to be a periodic tick ISR for serial gate/cadence counters.",
)
counters = _timer_counter_models(model)
for counter in counters:
address = counter.get("address_hex") or _h(_int_field(counter, "address", 0))
name = counter.get("name_candidate") or "counter_candidate"
role = _comment_text(str(counter.get("role") or counter.get("summary") or "candidate decrementing counter"))
lines.append(f"{prefix}- {address} {name}: {role}")
evidence = _hex_join(model.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix}- evidence: {evidence}")
return lines
def _timer_architecture_function_lines(protocol: JsonObject) -> list[str]:
model = _timer_architecture_model(protocol)
if not model:
return []
sources = _timer_source_models(model)
if sources:
lines: list[str] = []
for source_model in sources:
counters = _timer_counter_models(source_model)
if not counters:
continue
source = str(source_model.get("source") or "timer")
handler = str(source_model.get("handler_address_hex") or source_model.get("vector_address_hex") or "")
lines.extend(
_timer_tick_function_lines(
_timer_source_function_name(source),
counters,
f"Candidate periodic tick at {handler or source}: decrement nonzero serial counters.",
)
)
return lines
counters = _timer_counter_models(model)
if not counters:
return []
return _timer_tick_function_lines(
"frt1_ocia_candidate_tick_isr",
counters,
"Candidate periodic tick at H'BEEA: decrement nonzero serial gate/cadence counters.",
)
def _panel_selector_function_lines(value: object) -> list[str]:
selectors = _object_list(value)
if not selectors:
return [
"static void sci1_candidate_panel_selector_annotation(u16 logical_index, u16 value)",
"{",
" (void)logical_index;",
" (void)value;",
"}",
"",
]
lines = [
"static void sci1_candidate_panel_selector_annotation(u16 logical_index, u16 value)",
"{",
" /* Known bench/ROM selector labels. This helper is commentary for the decompile. */",
" switch (logical_index) {",
]
for selector in selectors:
selector_value = selector.get("selector")
if not isinstance(selector_value, int):
continue
selector_hex = selector.get("selector_hex") or f"0x{selector_value:04X}"
name = _comment_text(str(selector.get("name") or "panel selector"))
current = selector.get("current_word_address_hex") or "current table"
dispatch = selector.get("dispatch_handler") or "dispatch unknown"
lines.append(f" case 0x{selector_value & 0x01FF:04X}u:")
lines.append(f" /* {selector_hex} {name}; current word {current}; {dispatch}. */")
for effect in _object_list(selector.get("effects")):
mask = effect.get("mask")
if not isinstance(mask, int):
continue
effect_name = _comment_text(str(effect.get("name") or "panel effect"))
when_set = _comment_text(str(effect.get("when_set") or "set"))
when_clear = _comment_text(str(effect.get("when_clear") or "clear"))
lines.append(f" if ((value & 0x{mask & 0xFFFF:04X}u) != 0u) {{")
lines.append(f" /* {effect_name}: {when_set}. */")
lines.append(" } else {")
lines.append(f" /* {effect_name}: {when_clear}. */")
lines.append(" }")
for meaning in _object_list(selector.get("value_meanings")):
known_value = meaning.get("value")
if not isinstance(known_value, int):
continue
label = _comment_text(str(meaning.get("meaning") or "known panel value"))
lines.append(f" if (value == 0x{known_value & 0xFFFF:04X}u) {{")
lines.append(f" /* {label}. */")
lines.append(" }")
lines.append(" break;")
lines.extend(
[
" default:",
" break;",
" }",
"}",
"",
],
)
return lines
def _panel_selector_provisional_function_lines(value: object) -> list[str]:
selectors = _object_list(value)
lines: list[str] = []
for selector in selectors:
selector_value = selector.get("selector")
if not isinstance(selector_value, int):
continue
state_machine = selector.get("state_machine")
if not isinstance(state_machine, dict):
continue
for trigger in _object_list(selector.get("local_triggers")):
name = str(trigger.get("name_candidate") or "").strip()
if not name:
continue
handler = _comment_text(str(trigger.get("handler") or "handler unknown"))
source = _comment_text(str(trigger.get("source") or "source unknown"))
gate = _comment_text(str(trigger.get("gate") or "gate unknown"))
current_bit = _comment_text(str(trigger.get("current_state_bit") or "current state bit unknown"))
summary = _comment_text(str(trigger.get("summary") or "local trigger candidate"))
active_value = _int_from_object(trigger.get("active_value"), 0x4000)
clear_value = _int_from_object(trigger.get("clear_value"), 0x0000)
active_report = _comment_text(str(state_machine.get("active_report_frame") or "active report unknown"))
clear_report = _comment_text(str(state_machine.get("clear_report_frame") or "clear report unknown"))
ack_frame = _comment_text(str(state_machine.get("ack_frame") or "ACK unknown"))
active_mirror = _comment_text(str(state_machine.get("active_mirror_frame") or "active mirror unknown"))
clear_mirror = _comment_text(str(state_machine.get("clear_mirror_frame") or "clear mirror unknown"))
safe_name = _safe_identifier(name)
lines.extend(
[
f"void {safe_name}(void)",
"{",
f" /* Provisional name for ROM {handler}: {summary} */",
f" /* Source {source}; gate {gate}; current state {current_bit}. */",
" if ((MEM8[0xF6DBu] & BIT(7)) == 0u) {",
" return;",
" }",
" if (MEM8[0xF731u] > 3u) {",
" return;",
" }",
"",
" if ((MEM8[0xF791u] & BIT(5)) == 0u) {",
f" /* Requests selector 0x{selector_value & 0x01FF:04X}=0x{active_value & 0xFFFF:04X}: {active_report}. */",
f" /* CCU should ACK {ack_frame}, then mirror {active_mirror}. */",
" } else {",
f" /* Requests selector 0x{selector_value & 0x01FF:04X}=0x{clear_value & 0xFFFF:04X}: {clear_report}. */",
f" /* CCU should ACK {ack_frame}, then mirror {clear_mirror}. */",
" }",
"}",
"",
],
)
return lines
def _timer_tick_function_lines(function_name: str, counters: list[JsonObject], summary: str) -> list[str]:
lines = [
f"void {function_name}(void)",
"{",
f" /* {_comment_text(summary)} */",
]
for counter in counters:
address = _int_field(counter, "address", 0)
if address == 0:
continue
name = _safe_identifier(str(counter.get("name_candidate") or f"counter_{address:04X}")).upper()
lines.extend(
[
f" /* {name}: {_comment_text(str(counter.get('role') or counter.get('summary') or 'candidate counter'))} */",
f" if (MEM8[{_c_hex(address)}] != 0u) {{",
f" MEM8[{_c_hex(address)}] = (u8)(MEM8[{_c_hex(address)}] - 1u);",
" }",
"",
],
)
lines.extend(["}", ""])
return lines
def _timer_source_models(model: JsonObject) -> list[JsonObject]:
return _object_list(model.get("sources"))
def _timer_source_function_name(source: str) -> str:
root = _safe_identifier(source.lower().replace(" ", "_"))
return f"{root}_candidate_tick_isr"
def _timer_architecture_model(protocol: JsonObject) -> JsonObject:
model = protocol.get("timer_interrupt_model")
if isinstance(model, dict):
return model
if isinstance(protocol.get("gate_queue_model"), dict) or isinstance(protocol.get("periodic_resend_model"), dict):
return {
"source": "FRT1/FRT2 OCIA",
"vector_address_hex": "H'BEEA/H'BF23",
"counters": [
{
"address": 0xF9C0,
"address_hex": "H'F9C0",
"name_candidate": "tx_report_gate_counter_candidate",
"role": "candidate gate counter used before entering the report builder.",
},
{
"address": 0xF9C1,
"address_hex": "H'F9C1",
"name_candidate": "rx_interbyte_timeout_candidate",
"role": "candidate RX interbyte timeout counter.",
},
{
"address": 0xF9C6,
"address_hex": "H'F9C6",
"name_candidate": "periodic_resend_cadence_counter_candidate",
"role": "candidate periodic resend/heartbeat cadence counter.",
},
{
"address": 0xF9C4,
"address_hex": "H'F9C4",
"name_candidate": "idle_heartbeat_gate_countdown_candidate",
"role": "candidate idle/default report enqueue countdown.",
},
],
}
return {}
def _timer_counter_models(model: JsonObject) -> list[JsonObject]:
counters = _object_list(model.get("counters"))
if counters:
return counters
return [
{
"address": 0xF9C0,
"address_hex": "H'F9C0",
"name_candidate": "tx_report_gate_counter_candidate",
"role": "candidate gate counter used before entering the report builder.",
},
{
"address": 0xF9C1,
"address_hex": "H'F9C1",
"name_candidate": "rx_interbyte_timeout_candidate",
"role": "candidate RX interbyte timeout counter.",
},
{
"address": 0xF9C6,
"address_hex": "H'F9C6",
"name_candidate": "periodic_resend_cadence_counter_candidate",
"role": "candidate periodic resend/heartbeat cadence counter.",
},
{
"address": 0xF9C4,
"address_hex": "H'F9C4",
"name_candidate": "idle_heartbeat_gate_countdown_candidate",
"role": "candidate idle/default report enqueue countdown.",
},
]
def _command_effect_switch_lines(command: JsonObject) -> list[str]:
effects = _object_list(command.get("effects"))[:3]
lines = []
for effect in effects:
text = _effect_summary(effect)
if text:
lines.append(f"candidate effect: {text}")
return lines
def _effect_summary(effect: JsonObject) -> str:
kind = str(effect.get("kind") or "effect_candidate")
parts = [kind]
target = effect.get("target_candidate") or effect.get("destination_candidate")
source = effect.get("source_candidate")
operation = effect.get("operation_candidate")
table = effect.get("table_base_hex")
state = effect.get("state_address_hex")
selector = effect.get("selector_without_response_hex")
observed = effect.get("observed_frame_caveat")
if target:
parts.append(f"target {target}")
if source:
parts.append(f"source {source}")
if operation:
parts.append(str(operation))
if table:
parts.append(f"table {table}")
if state:
parts.append(f"state {state}")
if selector:
parts.append(f"selector {selector} has no response")
if observed:
parts.append(str(observed))
return _comment_text("; ".join(parts))
def _schema_list(protocol: JsonObject) -> list[JsonObject]:
schemas = _object_list(protocol.get("response_schemas"))
if schemas:
return schemas
return _object_list(protocol.get("response_schema"))
def _table_map_list(protocol: JsonObject) -> list[JsonObject]:
tables = _object_list(protocol.get("logical_table_map_candidates"))
if tables:
return tables
return _object_list(protocol.get("table_map_candidates"))
def _response_schema_summary(schema: JsonObject) -> str:
parts = []
for item in _object_list(schema.get("bytes")):
label = item.get("byte") or f"byte{item.get('offset', '?')}"
source = item.get("source_expression") or item.get("source_kind") or "unknown"
parts.append(f"{label}={source}")
return _comment_text("; ".join(parts) if parts else "candidate byte sources unknown")
def _object_list(value: object) -> list[JsonObject]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def _command_hex(value: object) -> str:
if isinstance(value, int):
return f"0x{value:02X}"
return "?"
def _selector_hex(value: object) -> str:
if isinstance(value, int):
return f"0x{value & 0xFFFF:04X}"
return "?"
def _int_from_object(value: object, default: int) -> int:
return value if isinstance(value, int) else default
def _tx_functions(candidate: JsonObject, opts: SerialPseudocodeOptions) -> list[str]:
length = _int_field(candidate, "frame_length", 6)
seed = _int_field(candidate, "checksum_seed", 0x5A)
data_length = max(length - 1, 0)
lines = _candidate_comment_block("TX reconstruction evidence", candidate, opts)
lines.extend(
[
"static u8 sci1_tx_candidate_checksum(void)",
"{",
f" u8 checksum = {_c_hex(seed, width=2)};",
],
)
for index in range(data_length):
lines.append(f" checksum ^= TX_FRAME({index});")
lines.extend(
[
" return checksum;",
"}",
"",
"void sci1_tx_start_candidate_frame(void)",
"{",
" /* The ROM appears to have populated TX_FRAME(0..4) before this point. */",
f" TX_FRAME({data_length}) = sci1_tx_candidate_checksum();",
"",
f" while ((SCI1_SSR & SCI_SSR_TDRE) == 0u) {{",
" /* wait for transmit data register empty */",
" }",
"",
" /* First byte is sent synchronously; TIE enables TXI for the remaining bytes. */",
" SCI1_TDR = TX_FRAME(0);",
" TX_INDEX = 1u;",
" SCI1_SSR &= (u8)~SCI_SSR_TDRE;",
" SCI1_SCR |= SCI_SCR_TIE;",
"}",
"",
"void sci1_txi_candidate_isr(void)",
"{",
" /* TXI runs after hardware reasserts SSR.TDRE for the next transmit byte. */",
" if ((SCI1_SSR & SCI_SSR_TDRE) == 0u) {",
" return;",
" }",
"",
" if (TX_INDEX < TX_FRAME_LENGTH) {",
" SCI1_TDR = TX_FRAME(TX_INDEX);",
" TX_INDEX = (u8)(TX_INDEX + 1u);",
" SCI1_SSR &= (u8)~SCI_SSR_TDRE;",
" }",
"",
" if (TX_INDEX >= TX_FRAME_LENGTH) {",
" SCI1_SCR &= (u8)~SCI_SCR_TIE;",
" }",
"}",
"",
],
)
return lines
def _rx_functions(candidate: JsonObject, opts: SerialPseudocodeOptions) -> list[str]:
length = _int_field(candidate, "frame_length", 6)
seed = _int_field(candidate, "checksum_seed", 0x5A)
data_length = max(length - 1, 0)
lines = _candidate_comment_block("RX reconstruction evidence", candidate, opts)
lines.extend(
[
"static u8 sci1_rx_candidate_checksum(void)",
"{",
f" u8 checksum = {_c_hex(seed, width=2)};",
],
)
for index in range(data_length):
lines.append(f" checksum ^= RX_FRAME({index});")
lines.extend(
[
" return checksum;",
"}",
"",
"bool sci1_process_rx_candidate_frame(void)",
"{",
" u8 i;",
"",
" if (RX_INDEX != RX_FRAME_LENGTH) {",
" return false;",
" }",
"",
" for (i = 0u; i < RX_FRAME_LENGTH; i++) {",
" RX_FRAME(i) = RX_CAPTURE(i);",
" }",
"",
f" if (sci1_rx_candidate_checksum() != RX_FRAME({data_length})) {{",
" RX_INDEX = 0u;",
" return false;",
" }",
"",
" RX_INDEX = 0u;",
" return true;",
"}",
"",
"bool sci1_rx_byte_received_candidate_isr(void)",
"{",
" u8 byte;",
"",
" SCI1_SSR &= (u8)~SCI_SSR_RDRF;",
" byte = SCI1_RDR;",
"",
" if (RX_INTERBYTE_TIMEOUT == 0u) {",
" RX_INDEX = 0u;",
" }",
"",
" RX_INTERBYTE_TIMEOUT = 5u;",
" RX_CAPTURE(RX_INDEX) = byte;",
" RX_INDEX = (u8)(RX_INDEX + 1u);",
"",
" if (RX_INDEX == RX_FRAME_LENGTH) {",
" RX_COMPLETE_TIMER = 0x14u;",
" return sci1_process_rx_candidate_frame();",
" }",
"",
" return false;",
"}",
"",
"void sci1_rx_error_candidate_isr(void)",
"{",
" RX_ERROR_LATCH |= RX_ERROR_LATCH_PHYSICAL_ERROR;",
" SCI1_SSR &= (u8)~(SCI_SSR_ORER | SCI_SSR_FER | SCI_SSR_PER);",
" sci1_rx_byte_received_candidate_isr();",
"}",
"",
],
)
return lines
def _candidate_comment_block(
title: str,
candidate: JsonObject,
opts: SerialPseudocodeOptions,
) -> list[str]:
lines = ["/*", f" * {title}"]
comment = str(candidate.get("comment") or candidate.get("short_comment") or "").strip()
if comment:
lines.append(f" * {_comment_text(comment)}")
formula = str(candidate.get("checksum_formula") or "").strip()
if formula:
lines.append(f" * checksum formula: {_comment_text(formula)}")
error_handling = candidate.get("rx_error_handling")
if isinstance(error_handling, dict):
summary = str(error_handling.get("summary") or "").strip()
if summary:
lines.append(f" * RX error handling: {_comment_text(summary)}")
caveat = str(error_handling.get("manual_caveat") or "").strip()
if caveat:
lines.append(f" * RX error caveat: {_comment_text(caveat)}")
if opts.include_evidence:
evidence = candidate.get("evidence_addresses_hex")
if isinstance(evidence, dict):
lines.append(" * evidence addresses:")
for key in sorted(evidence):
addresses = ", ".join(str(item) for item in evidence.get(key, []))
lines.append(f" * - {key}: {addresses}")
lines.append(" */")
return lines
def _find_candidate(payload: JsonObject, kind: str) -> JsonObject | None:
serial = payload.get("serial_reconstruction")
if not isinstance(serial, dict):
return None
candidates = serial.get("candidates")
if not isinstance(candidates, list):
return None
for candidate in candidates:
if isinstance(candidate, dict) and candidate.get("kind") == kind:
return candidate
return None
def _candidate_label(candidate: JsonObject) -> str:
kind = str(candidate.get("kind") or "candidate")
channel = str(candidate.get("channel") or "SCI")
length = candidate.get("frame_length", "?")
if kind.endswith("_tx_frame"):
start = candidate.get("buffer_start_hex") or _h(_int_field(candidate, "buffer_start", 0))
end = candidate.get("buffer_end_hex") or _h(_int_field(candidate, "buffer_end", 0))
return f"{channel} TX {length}-byte frame at {start}-{end}"
if kind.endswith("_rx_frame"):
capture_start = candidate.get("capture_buffer_start_hex") or _h(
_int_field(candidate, "capture_buffer_start", 0),
)
capture_end = candidate.get("capture_buffer_end_hex") or _h(
_int_field(candidate, "capture_buffer_end", 0),
)
return f"{channel} RX {length}-byte frame captured at {capture_start}-{capture_end}"
return f"{channel} {kind}"
def _mapping_items(value: object) -> list[JsonObject]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def _int_field(candidate: JsonObject | None, key: str, default: int) -> int:
if not candidate:
return default
value = candidate.get(key)
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
return default
def _c_hex(value: int, *, width: int = 4) -> str:
return f"0x{value & 0xFFFF:0{width}X}u"
def _optional_hex(value: object, *, width: int = 4) -> str:
if isinstance(value, int):
return f"H'{value & 0xFFFF:0{width}X}"
return "unknown"
def _h(value: int) -> str:
return f"H'{value & 0xFFFF:04X}"
def _dedupe(items: list[str]) -> list[str]:
seen: set[str] = set()
output: list[str] = []
for item in items:
if item in seen:
continue
seen.add(item)
output.append(item)
return output
def _hex_join(value: object) -> str:
if not isinstance(value, list):
return ""
return ", ".join(str(item) for item in value)
def _safe_identifier(value: str) -> str:
cleaned = re.sub(r"[^0-9A-Za-z_]", "_", value.strip())
cleaned = re.sub(r"_+", "_", cleaned).strip("_")
if not cleaned:
return "unknown"
if cleaned[0].isdigit():
return "_" + cleaned
return cleaned
def _comment_text(text: str) -> str:
return text.replace("*/", "* /").replace("\r", " ").replace("\n", " ")
if __name__ == "__main__":
raise SystemExit(main())