"""Focused H8/536 RX divergence trace around command dispatch and serial state.

The module boots an ``H8536Emulator``, injects host frames into SCI1 and
reports, per frame, which watched program counters were hit, which watched
memory locations were written, which state bytes changed, and which frames
the emulator transmitted in response.
"""

from __future__ import annotations

import argparse
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable

from ..formatting import h16, parse_int
from .cli import load_rom
from .constants import HEARTBEAT_FRAME
from .memory import MemoryAccess
from .runner import H8536Emulator
from .rx_probe import (
    UartTiming,
    _inject_frame_uart_timed,
    _interrupt_mask,
    _rx_byte_consumed,
    _rx_ready,
    _run_until,
    _sci1_priority,
    format_frame,
    frame_checksum_ok,
    parse_frame,
)

# EEPROM image loaded before boot unless the caller overrides or disables it.
DEFAULT_EEPROM_LOAD = Path("build") / "bench-sync-after-dip-reset.bin"

# Frames appended by ``--default-frames``.
DEFAULT_FRAMES = (
    bytes.fromhex("0000008000DA"),
    bytes.fromhex("01000000005B"),
    bytes.fromhex("07000000005D"),
)

# Known frames -> label appended by ``label_frame`` when a frame matches.
BENCH_SIGNATURES = {
    bytes.fromhex("04000080805E"): "bench_cmd0_echo_80",
    bytes.fromhex("02000200005A"): "bench_connect_ok",
    bytes.fromhex("07804040A07D"): "bench_40a0_retry_echo_candidate_40",
    bytes.fromhex("07808040A0BD"): "bench_40a0_retry_echo_candidate_80",
    bytes.fromhex("0780C040A0FD"): "bench_40a0_retry_echo_candidate_c0",
}

# Program-counter addresses that ``DivergenceContext.record_pc`` counts,
# mapped to the label printed next to each hit count.
WATCH_PCS = {
    0x3FD3: "report_scheduler_gate",
    0x3FEF: "report_scheduler_return",
    0x4007: "resend_or_session_gate",
    0xBBF0: "rx_checksum_compare",
    0xBE29: "rx_checksum_retry_error",
    0xBE4D: "retry_echo_stage",
    0xBC0F: "faa2_split",
    0xBC15: "initial_latch_faa2_bit7",
    0xBC33: "initial_dispatch_fallthrough",
    0xBC3A: "continuation_dispatch",
    0xBC5C: "continuation_clear_queued_ack",
    0xBC67: "continuation_reenter_initial",
    0xBC69: "cmd0_set_value",
    0xBCD7: "cmd1_read_value",
    0xBE05: "cmd7_copy_or_retry",
    0xBE70: "selector_queue_be70",
    0xBA26: "tx_builder_ba26",
    0xBA72: "tx_first_byte",
    0xBA84: "txi_entry_ba84",
    0xBA8A: "txi_faa2_bit3_test",
    0xBA90: "txi_rx_index_test",
    0xBA96: "txi_overlap_collapse",
    0xBA9A: "txi_clear_pending_mask",
    0xBAA2: "txi_disable_tie",
    0xBAB5: "txi_next_byte",
    0xBAF2: "report_dequeue_baf2",
    0xBB00: "report_session_latch",
    0xBB08: "report_selector_read",
    0xBB1C: "report_selector_encode",
    0xBB20: "report_selector_encode_hi",
    0xBB2B: "report_payload_read",
    0xBB35: "report_value_stage",
    0xBB43: "report_send",
    0xBE9E: "session_resend_gate",
    0xBED5: "session_resend_send",
}

# Outcome flag name -> watched PCs; the flag is set when any of them was hit
# (see ``classify_hits``).
WATCH_GROUPS = {
    "cmd0_reached_BC69": (0xBC69,),
    "cmd1_reached_BCD7": (0xBCD7,),
    "retry_echo": (0xBE29, 0xBE4D),
    "cmd7_replay": (0xBE05,),
    "autonomous_report": (0xBAF2, 0xBB00, 0xBB35, 0xBB43),
    "tx_rx_overlap_collapse": (0xBA96, 0xBA9A, 0xBAA2),
}

# (start, end, label) address ranges, both ends inclusive; writes inside a
# range are reported by ``format_interesting_writes``.
WATCH_WRITE_RANGES = (
    (0xF860, 0xF865, "rx_validation_F860_F865"),
    (0xF868, 0xF86D, "rx_capture_F868_F86D"),
    (0xF850, 0xF85D, "tx_staging_F850_F85D"),
    (0xF870, 0xF96F, "report_queue_F870_F96F"),
    (0xF970, 0xF9AF, "selector_queue_F970_F9AF"),
    (0xF9B0, 0xF9B0, "report_head_F9B0"),
    (0xF9B4, 0xF9B4, "selector_tail_F9B4"),
    (0xF9B5, 0xF9B5, "report_tail_F9B5"),
    (0xF9C0, 0xF9C0, "tx_gate_F9C0"),
    (0xF9C1, 0xF9C1, "rx_interbyte_timeout_F9C1"),
    (0xF9C3, 0xF9C3, "rx_index_F9C3"),
    (0xF9C5, 0xF9C5, "rx_session_timeout_F9C5"),
    (0xFAA2, 0xFAA6, "serial_latches_FAA2_FAA6"),
    (0xE000, 0xE001, "primary_table_E000"),
    (0xE800, 0xE801, "current_table_E800"),
    (0xEC00, 0xEC01, "flag_table_EC00"),
)

# Single bytes captured by ``snapshot_state``: address -> snapshot key.
STATE_BYTES = {
    0xF9B0: "F9B0_report_head",
    0xF9B4: "F9B4_selector_tail",
    0xF9B5: "F9B5_report_tail",
    0xF9C0: "F9C0_tx_gate",
    0xF9C1: "F9C1_rx_interbyte_timeout",
    0xF9C3: "F9C3_rx_index",
    0xF9C5: "F9C5_rx_session_timeout",
    0xFAA2: "FAA2_session_flags",
    0xFAA3: "FAA3_pending_mask",
    0xFAA4: "FAA4_rx_error_latch",
    0xFAA5: "FAA5_retry_gate_flags",
    0xFAA6: "FAA6_retry_counter",
}

# Multi-byte regions captured by ``snapshot_state``:
# snapshot key -> (start address, length in bytes).
STATE_BUFFERS = {
    "F850_F85D_tx_staging": (0xF850, 14),
    "F860_F865_rx_validation": (0xF860, 6),
    "F868_F86D_rx_capture": (0xF868, 6),
    "F870_F87F_report_queue_head": (0xF870, 16),
    "F970_F97F_selector_queue_head": (0xF970, 16),
    "E000_E001_primary_0000": (0xE000, 2),
    "E800_E801_current_0000": (0xE800, 2),
    "EC00_EC01_flags_0000": (0xEC00, 2),
    "E880_E881_current_0040": (0xE880, 2),
    "E900_E901_current_0080": (0xE900, 2),
    "E980_E981_current_00C0": (0xE980, 2),
}


@dataclass
class DivergenceContext:
    """Mutable record of watched program-counter hits for one run.

    Instances are handed to ``_run_until`` / ``_inject_frame_uart_timed``;
    how those helpers drive the context is not visible in this module.
    """

    # Hit count per watched PC.
    pc_hits: Counter[int] = field(default_factory=Counter)
    # First watched PCs in the order they were recorded (capped at 32).
    first_pcs: list[int] = field(default_factory=list)
    # Rendered as ``unsupported=...`` in trace output when set; never assigned
    # in this module -- presumably filled in by the run helpers (verify).
    unsupported: str | None = None

    def record_pc(self, pc: int) -> None:
        """Count *pc* if it is a key of ``WATCH_PCS``; ignore it otherwise."""
        if pc not in WATCH_PCS:
            return
        self.pc_hits[pc] += 1
        if len(self.first_pcs) < 32:
            self.first_pcs.append(pc)


@dataclass(frozen=True)
class RxDivergenceConfig:
    """Immutable settings for ``run_rx_divergence``.

    The step budgets bound the individual ``_run_until`` calls; the clock,
    timer, port and EEPROM-seed fields are forwarded to the
    ``H8536Emulator`` constructor.
    """

    boot_steps: int = 250_000
    wait_heartbeats: int = 0
    wait_heartbeat_steps: int = 500_000
    post_frame_steps: int = 80_000
    per_byte_steps: int = 5_000
    interval_steps: int = 512
    frt1_ocia_steps: int | None = None
    frt2_ocia_steps: int | None = None
    clock_hz: int = 10_000_000
    uart_timing: bool = False
    uart_baud: int = 38_400
    p7_input: int = 0xFF
    p9_fast_path: bool = True
    p9_fast_input: int = 0xFF
    p9_fast_optimistic_wrapper: bool = False
    eeprom_seed: str = "blank"
    eeprom_load: Path | None = DEFAULT_EEPROM_LOAD
    stop_after_tx_frame: bool = False


@dataclass(frozen=True)
class FrameTrace:
    """Everything observed while one host frame was injected and processed."""

    frame: bytes
    checksum_ok: bool
    rx_injection: str
    steps: int
    stopped_reason: str
    tx_frames: tuple[bytes, ...]
    state_changes: tuple[str, ...]
    writes: tuple[str, ...]
    context: DivergenceContext

    def outcome(self) -> str:
        """Return the ``WATCH_GROUPS`` flags as space-separated ``name=0|1``."""
        flags = classify_hits(self.context.pc_hits)
        return " ".join(f"{name}={int(value)}" for name, value in flags.items())

    def lines(self, index: int, *, summary_only: bool = False) -> list[str]:
        """Render this trace as report lines.

        Args:
            index: Position of the frame in the run, used in the header line.
            summary_only: When true, return only the header and ``tx=`` lines.
        """
        lines = [
            (
                f"frame[{index}] host={format_frame(self.frame)} checksum_ok={int(self.checksum_ok)} "
                f"steps={self.steps} stopped={self.stopped_reason} {self.outcome()}"
            )
        ]
        if self.tx_frames:
            lines.append(" tx=" + " | ".join(label_frame(frame) for frame in self.tx_frames))
        else:
            lines.append(" tx=none")
        if summary_only:
            return lines
        lines.append(f" rx_injection={self.rx_injection}")
        hit_lines = format_pc_hits(self.context)
        if hit_lines:
            lines.append(" pcs=" + " ".join(hit_lines))
        if self.state_changes:
            lines.append(" state=" + "; ".join(self.state_changes))
        if self.writes:
            lines.append(" writes:")
            lines.extend(f" {line}" for line in self.writes)
        if self.context.unsupported:
            lines.append(f" unsupported={self.context.unsupported}")
        return lines


@dataclass(frozen=True)
class RxDivergenceResult:
    """Outcome of a whole ``run_rx_divergence`` call."""

    rom_path: Path
    # Path of the EEPROM image that was actually loaded, or None.
    eeprom_load: Path | None
    reset_vector: int
    boot_summary: str
    heartbeat_summary: str | None
    traces: tuple[FrameTrace, ...]
    all_tx_frames: tuple[bytes, ...]

    def lines(self, *, summary_only: bool = False) -> list[str]:
        """Render the full report: run header, per-frame traces, ``all_tx``."""
        lines = [
            f"rom={self.rom_path}",
            f"eeprom_loaded={self.eeprom_load if self.eeprom_load else 'none'}",
            f"reset_vector={h16(self.reset_vector)}",
            self.boot_summary,
            bench_signature_line(),
        ]
        if self.heartbeat_summary is not None:
            lines.append(self.heartbeat_summary)
        for index, trace in enumerate(self.traces):
            lines.extend(trace.lines(index, summary_only=summary_only))
        lines.append("all_tx=" + _format_frame_list(self.all_tx_frames))
        return lines


def run_rx_divergence(
    frames: Iterable[bytes],
    *,
    rom_path: Path | None = None,
    config: RxDivergenceConfig = RxDivergenceConfig(),
) -> RxDivergenceResult:
    """Boot the emulator, inject each frame in order and collect the traces.

    Args:
        frames: Host frames to inject, one trace per frame.
        rom_path: ROM image path passed to ``load_rom``; ``None`` lets
            ``load_rom`` pick its default.
        config: Run settings (frozen, so sharing the default instance is safe).

    Returns:
        The boot summary, optional heartbeat summary, per-frame traces and
        every frame the emulator transmitted during the run.
    """
    rom_bytes, discovered_rom_path = load_rom(rom_path)
    emulator = H8536Emulator(
        rom_bytes,
        interval_steps=config.interval_steps,
        frt1_ocia_steps=config.frt1_ocia_steps,
        frt2_ocia_steps=config.frt2_ocia_steps,
        clock_hz=config.clock_hz,
        p9_fast_path_enabled=config.p9_fast_path,
        p9_fast_default_input_byte=config.p9_fast_input,
        p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper,
        p7_input=config.p7_input,
        eeprom_seed=config.eeprom_seed,
    )

    # A missing image is skipped rather than treated as an error.  Record the
    # path at load time so the report states what was actually loaded, instead
    # of re-checking the filesystem after the (long) emulation run.
    loaded_eeprom: Path | None = None
    eeprom_load = config.eeprom_load
    if eeprom_load is not None and eeprom_load.is_file():
        emulator.memory.load_eeprom_image(eeprom_load.read_bytes())
        loaded_eeprom = eeprom_load

    boot_context = DivergenceContext()
    boot_steps_used, boot_reason = _run_until(emulator, config.boot_steps, _rx_ready, boot_context)
    boot_summary = (
        f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
        f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
        f"rx_serviceable={int(_rx_ready(emulator))} "
        f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} "
        f"clock_hz={emulator.clock_hz} p7_input={config.p7_input:#04x}"
    )

    heartbeat_summary = None
    if config.wait_heartbeats:
        heartbeat_summary = _wait_for_heartbeats(emulator, config.wait_heartbeats, config.wait_heartbeat_steps)

    # Frames share one emulator instance, so each trace starts from the state
    # the previous frame left behind.
    traces = tuple(_trace_frame(emulator, frame, config) for frame in frames)
    return RxDivergenceResult(
        rom_path=discovered_rom_path,
        eeprom_load=loaded_eeprom,
        reset_vector=emulator.reset_vector(),
        boot_summary=boot_summary,
        heartbeat_summary=heartbeat_summary,
        traces=traces,
        all_tx_frames=tuple(emulator.sci1.tx_frames),
    )


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser; defaults mirror ``RxDivergenceConfig``."""
    parser = argparse.ArgumentParser(description="Focused H8/536 RX divergence trace around command dispatch and serial state.")
    parser.add_argument("frames", nargs="*", type=parse_frame, help="host frame hex; 5-byte inputs get a 0x5A-XOR checksum appended")
    parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN")
    parser.add_argument("--default-frames", action="store_true", help="append the known bench-divergence frame trio")
    parser.add_argument("--boot-steps", type=int, default=RxDivergenceConfig.boot_steps)
    parser.add_argument("--eeprom-load", type=Path, default=DEFAULT_EEPROM_LOAD, help="logical EEPROM image loaded before boot")
    parser.add_argument("--no-eeprom-load", action="store_true", help="boot without loading the default bench EEPROM image")
    parser.add_argument("--p7-input", type=parse_int, default=RxDivergenceConfig.p7_input)
    parser.add_argument("--wait-heartbeats", type=int, default=RxDivergenceConfig.wait_heartbeats)
    parser.add_argument("--wait-heartbeat-steps", type=int, default=RxDivergenceConfig.wait_heartbeat_steps)
    parser.add_argument("--uart-timing", action="store_true", help="inject bytes at UART character timing instead of waiting for RDRF clear")
    parser.add_argument("--uart-baud", type=parse_int, default=RxDivergenceConfig.uart_baud)
    parser.add_argument("--post-frame-steps", type=int, default=RxDivergenceConfig.post_frame_steps)
    parser.add_argument("--per-byte-steps", type=int, default=RxDivergenceConfig.per_byte_steps)
    parser.add_argument("--clock-hz", type=parse_int, default=RxDivergenceConfig.clock_hz)
    parser.add_argument("--interval-steps", type=int, default=RxDivergenceConfig.interval_steps)
    parser.add_argument("--frt1-ocia-steps", type=int, default=RxDivergenceConfig.frt1_ocia_steps)
    parser.add_argument("--frt2-ocia-steps", type=int, default=RxDivergenceConfig.frt2_ocia_steps)
    parser.add_argument("--no-p9-fast-path", action="store_true")
    parser.add_argument("--p9-fast-input", type=parse_int, default=RxDivergenceConfig.p9_fast_input)
    parser.add_argument("--p9-fast-optimistic-wrapper", action="store_true")
    parser.add_argument("--eeprom-seed", choices=("blank", "factory"), default=RxDivergenceConfig.eeprom_seed)
    parser.add_argument("--stop-after-tx-frame", action="store_true", help="stop each post-frame run after the first new TX frame")
    parser.add_argument("--summary-only", action="store_true", help="omit detailed state/write trace lines")
    return parser


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse *argv*, run the trace, print the report.

    Returns:
        0 once the report has been printed.

    Raises:
        SystemExit: When neither positional frames nor ``--default-frames``
            were given.
    """
    args = build_arg_parser().parse_args(argv)
    frames = list(args.frames)
    if args.default_frames:
        frames.extend(DEFAULT_FRAMES)
    if not frames:
        raise SystemExit("pass at least one frame, or use --default-frames")
    result = run_rx_divergence(
        frames,
        rom_path=args.rom,
        config=RxDivergenceConfig(
            boot_steps=args.boot_steps,
            wait_heartbeats=args.wait_heartbeats,
            wait_heartbeat_steps=args.wait_heartbeat_steps,
            post_frame_steps=args.post_frame_steps,
            per_byte_steps=args.per_byte_steps,
            interval_steps=args.interval_steps,
            frt1_ocia_steps=args.frt1_ocia_steps,
            frt2_ocia_steps=args.frt2_ocia_steps,
            clock_hz=args.clock_hz,
            uart_timing=args.uart_timing,
            uart_baud=args.uart_baud,
            p7_input=args.p7_input,
            p9_fast_path=not args.no_p9_fast_path,
            p9_fast_input=args.p9_fast_input,
            p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper,
            eeprom_seed=args.eeprom_seed,
            eeprom_load=None if args.no_eeprom_load else args.eeprom_load,
            stop_after_tx_frame=args.stop_after_tx_frame,
        ),
    )
    for line in result.lines(summary_only=args.summary_only):
        print(line)
    return 0


def _trace_frame(emulator: H8536Emulator, frame: bytes, config: RxDivergenceConfig) -> FrameTrace:
    """Inject one frame, run the post-frame window and capture what changed."""
    # Baselines taken before injection so only this frame's effects are reported.
    state_before = snapshot_state(emulator)
    log_start = len(emulator.memory.access_log)
    tx_frame_start = len(emulator.sci1.tx_frames)
    context = DivergenceContext()
    steps_total = 0

    if config.uart_timing:
        timing = UartTiming(baud=config.uart_baud)
        steps_total, stopped_reason = _inject_frame_uart_timed(
            emulator,
            frame,
            timing=timing,
            max_steps_per_gap=config.per_byte_steps,
            context=context,
        )
        rx_injection = timing.summary(emulator.clock_hz)
        injected_all_bytes = stopped_reason == "frame_injected_uart_timing"
    else:
        rx_injection = "polite_wait_for_rdrf_clear"
        injected_all_bytes = True
        stopped_reason = "post_frame_steps"
        for offset, value in enumerate(frame):
            emulator.inject_sci1_rx_byte(value)
            steps, reason = _run_until(emulator, config.per_byte_steps, _rx_byte_consumed, context)
            steps_total += steps
            if reason != "predicate":
                # The byte was not consumed within the budget; abandon the frame.
                stopped_reason = f"rx_byte_{offset}_{reason}"
                injected_all_bytes = False
                break

    if injected_all_bytes:
        target_frame_count = tx_frame_start + 1

        def stop_predicate(inner: H8536Emulator) -> bool:
            # Only stops early when the caller asked for it; otherwise the
            # full post-frame budget is run.
            return config.stop_after_tx_frame and len(inner.sci1.tx_frames) >= target_frame_count

        steps, reason = _run_until(emulator, config.post_frame_steps, stop_predicate, context)
        steps_total += steps
        stopped_reason = "tx_frame" if reason == "predicate" and config.stop_after_tx_frame else reason

    log_end = len(emulator.memory.access_log)
    state_after = snapshot_state(emulator)
    return FrameTrace(
        frame=frame,
        checksum_ok=frame_checksum_ok(frame),
        rx_injection=rx_injection,
        steps=steps_total,
        stopped_reason=stopped_reason,
        tx_frames=tuple(emulator.sci1.tx_frames[tx_frame_start:]),
        state_changes=tuple(format_state_changes(state_before, state_after)),
        writes=tuple(format_interesting_writes(emulator.memory.access_log[log_start:log_end])),
        context=context,
    )


def _wait_for_heartbeats(emulator: H8536Emulator, target_count: int, max_steps: int) -> str:
    """Run until *target_count* new heartbeat frames appear or the budget ends.

    Returns a one-line summary of how many were seen and why the run stopped.
    """
    start_count = _heartbeat_count(emulator.sci1.tx_frames)

    def predicate(inner: H8536Emulator) -> bool:
        return _heartbeat_count(inner.sci1.tx_frames) - start_count >= target_count

    context = DivergenceContext()
    steps, reason = _run_until(emulator, max_steps, predicate, context)
    seen = _heartbeat_count(emulator.sci1.tx_frames) - start_count
    return f"wait_heartbeats target={target_count} seen={seen} steps={steps} stopped={reason}"


def snapshot_state(emulator: H8536Emulator) -> dict[str, int | bytes]:
    """Read every ``STATE_BYTES`` byte and ``STATE_BUFFERS`` region by name."""
    state: dict[str, int | bytes] = {}
    for address, name in STATE_BYTES.items():
        state[name] = emulator.memory.read8(address)
    for name, (address, length) in STATE_BUFFERS.items():
        state[name] = bytes(emulator.memory.read8(address + offset) for offset in range(length))
    return state


def format_state_changes(before: dict[str, int | bytes], after: dict[str, int | bytes]) -> list[str]:
    """Return ``key:old->new`` for each key of *after* whose value differs.

    Keys are visited in sorted order; a key missing from *before* is shown
    with the old value ``none``.
    """
    lines = []
    for key in sorted(after):
        if before.get(key) == after[key]:
            continue
        lines.append(f"{key}:{_state_value(before.get(key))}->{_state_value(after[key])}")
    return lines


def format_interesting_writes(accesses: Iterable[MemoryAccess], *, limit: int = 96) -> list[str]:
    """Format writes that land in ``WATCH_WRITE_RANGES``.

    At most *limit* writes are listed; if more matched, a final line states
    how many were omitted.
    """
    lines = []
    total = 0
    for access in accesses:
        if access.kind != "write":
            continue
        label = write_label(access.address)
        if label is None:
            continue
        total += 1
        if len(lines) < limit:
            lines.append(f"{h16(access.address)}={access.value:02X} {label}")
    if total > limit:
        lines.append(f"... {total - limit} more watched writes")
    return lines


def write_label(address: int) -> str | None:
    """Return the label of the first watched range containing *address*."""
    for start, end, label in WATCH_WRITE_RANGES:
        if start <= address <= end:
            return label
    return None


def classify_hits(hits: Counter[int]) -> dict[str, bool]:
    """Map each ``WATCH_GROUPS`` name to whether any of its PCs was hit."""
    return {name: any(hits.get(pc, 0) for pc in pcs) for name, pcs in WATCH_GROUPS.items()}


def format_pc_hits(context: DivergenceContext) -> list[str]:
    """Return ``addr:label=count`` per hit PC plus a ``first=`` ordering entry.

    Only the first 16 recorded PCs are shown in the ``first=`` entry.
    """
    lines = [f"{h16(pc)}:{WATCH_PCS[pc]}={context.pc_hits[pc]}" for pc in sorted(context.pc_hits)]
    if context.first_pcs:
        first = ",".join(h16(pc) for pc in context.first_pcs[:16])
        lines.append(f"first={first}")
    return lines


def label_frame(frame: bytes) -> str:
    """Format *frame*, tagging known bench signatures and the heartbeat."""
    signature = BENCH_SIGNATURES.get(frame)
    if signature:
        return f"{format_frame(frame)} ({signature})"
    if frame == HEARTBEAT_FRAME:
        return f"{format_frame(frame)} (heartbeat)"
    return format_frame(frame)


def bench_signature_line() -> str:
    """Return the report line listing every known bench signature frame."""
    return "bench_signatures=" + " | ".join(label_frame(frame) for frame in BENCH_SIGNATURES)


def _heartbeat_count(frames: Iterable[bytes]) -> int:
    """Count the frames equal to ``HEARTBEAT_FRAME``."""
    return sum(1 for frame in frames if frame == HEARTBEAT_FRAME)


def _state_value(value: int | bytes | None) -> str:
    """Render a snapshot value: buffers as frames, ints as hex, else ``none``."""
    if isinstance(value, bytes):
        return format_frame(value)
    if isinstance(value, int):
        return f"{value:02X}"
    return "none"


def _format_frame_list(frames: Iterable[bytes]) -> str:
    """Join labelled frames with `` | ``, or return ``none`` when empty."""
    items = [label_frame(frame) for frame in frames]
    return " | ".join(items) if items else "none"


__all__ = [
    "BENCH_SIGNATURES",
    "DEFAULT_EEPROM_LOAD",
    "DEFAULT_FRAMES",
    "DivergenceContext",
    "FrameTrace",
    "RxDivergenceConfig",
    "RxDivergenceResult",
    "bench_signature_line",
    "build_arg_parser",
    "classify_hits",
    "format_interesting_writes",
    "format_pc_hits",
    "format_state_changes",
    "label_frame",
    "main",
    "parse_frame",
    "run_rx_divergence",
    "write_label",
]