from __future__ import annotations import argparse from collections import Counter from dataclasses import dataclass, field from pathlib import Path from ..formatting import h16, parse_int from .cli import load_rom from .constants import ( ON_CHIP_RAM_END, ON_CHIP_RAM_START, P9DDR, P9DR, REGISTER_FIELD_END, REGISTER_FIELD_START, SCI1_BRR, SCI1_RDR, SCI1_SCR, SCI1_SMR, SCI1_SSR, SCI1_TDR, SCI_SCR_TE, SCI_SCR_TIE, SCI_SSR_TDRE, VECTOR_SCI1_TXI, ) from .errors import UnsupportedInstruction from .runner import H8536Emulator DEFAULT_WATCH_PCS = (0xC08B, 0xC0DB, 0xC121, 0xBFE0, 0xBFFE, 0xC059) TX_FRAME_WATCH_PCS = { 0xBA26: "builder_entry", 0xBA4E: "checksum_seed", 0xBA64: "checksum_store", 0xBA72: "first_tdr", 0xBAB5: "txi_tdr", } TX_FRAME_SNAPSHOT_START = 0xF850 TX_FRAME_START = 0xF858 TX_FRAME_LENGTH = 6 TX_FRAME_TRACE_START = TX_FRAME_SNAPSHOT_START TX_FRAME_TRACE_END = TX_FRAME_START + TX_FRAME_LENGTH - 1 REPORT_QUEUE_START = 0xF870 REPORT_QUEUE_SLOTS = 0x80 REPORT_QUEUE_END = REPORT_QUEUE_START + (REPORT_QUEUE_SLOTS * 2) - 1 REPORT_QUEUE_HEAD = 0xF9B0 REPORT_QUEUE_TAIL = 0xF9B5 REPORT_QUEUE_ENTRY_PCS = { 0x3E54: "enqueue_entry", 0xBAF2: "dequeue_entry", } REPORT_QUEUE_FIRST_NONZERO_DISPLAY_LIMIT = 16 REPORT_GATE_PCS = { 0x4046: "gate_entry", 0x404A: "f9c4_nonzero_return_branch", 0x404C: "faa5_bit7_test", 0x4050: "faa5_clear_enqueue_branch", 0x4052: "f9c3_test", 0x4056: "f9c3_zero_enqueue_branch", 0x4058: "gate_return_no_enqueue", 0x4059: "queue_empty_check_start", 0x405F: "queue_empty_compare", 0x4063: "queue_not_empty_return_branch", 0x4067: "enqueue_report_zero", 0x406C: "advance_head", 0x4070: "mask_head", 0x4074: "gate_return", } REPORT_GATE_F9C4 = 0xF9C4 REPORT_GATE_F9C3 = 0xF9C3 REPORT_GATE_FAA5 = 0xFAA5 RAM_LIFECYCLE_DEFAULT_WATCHES = { REPORT_GATE_F9C4: "F9C4_report_gate_timer", REPORT_GATE_F9C3: "F9C3_rx_or_activity_gate", REPORT_GATE_FAA5: "FAA5_report_flags", 0xF9C0: "F9C0_tx_or_report_gate", 0xF9C5: "F9C5_queue_reset_gate", REPORT_QUEUE_HEAD: "F9B0_report_queue_head", REPORT_QUEUE_TAIL: "F9B5_report_queue_tail", } SCI1_PROBE_REGISTERS = { SCI1_SCR: "SCR", SCI1_TDR: "TDR", SCI1_SSR: "SSR", SCI1_RDR: "RDR", } def _format_bytes(data: bytes, *, limit: int = 96) -> str: if len(data) <= limit: return data.hex(" ").upper() head = data[: limit // 2].hex(" ").upper() tail = data[-(limit // 2) :].hex(" ").upper() return f"{head} ... {tail} ({len(data)} bytes)" def _format_six_byte_frames(data: bytes, *, limit: int = 8) -> str: frames = [data[index : index + 6] for index in range(0, len(data) - 5, 6)] if not frames: return "" recent = frames[-limit:] return " | ".join(frame.hex(" ").upper() for frame in recent) def parse_tx_frame(text: str) -> bytes: normalized = ( text.strip() .replace(",", " ") .replace(":", " ") .replace("-", " ") .replace("_", " ") ) parts = normalized.split() if len(parts) == 1: compact = parts[0] if compact.lower().startswith("0x"): compact = compact[2:] if compact.upper().startswith("H'"): compact = compact[2:] if len(compact) % 2: raise argparse.ArgumentTypeError("target frame compact hex must have an even number of digits") parts = [compact[index : index + 2] for index in range(0, len(compact), 2)] values: list[int] = [] for part in parts: token = part if token.lower().startswith("0x"): token = token[2:] if token.upper().startswith("H'"): token = token[2:] if not token or len(token) > 2: raise argparse.ArgumentTypeError(f"invalid byte token {part!r}") try: value = int(token, 16) except ValueError as exc: raise argparse.ArgumentTypeError(f"invalid byte token {part!r}") from exc if not 0 <= value <= 0xFF: raise argparse.ArgumentTypeError(f"byte out of range {part!r}") values.append(value) if len(values) != TX_FRAME_LENGTH: raise argparse.ArgumentTypeError(f"target frame must contain exactly {TX_FRAME_LENGTH} bytes") return bytes(values) def _frame_diff_text(current: bytes, target: bytes) -> str: diffs = [ f"{index}:{current[index]:02X}!={target[index]:02X}" for index in range(min(len(current), len(target))) if current[index] != target[index] ] if len(current) != len(target): diffs.append(f"length:{len(current)}!={len(target)}") return "none" if not diffs else " ".join(diffs) def _target_comparison_line(current: bytes, target: bytes) -> str: pre_checksum_diffs = [ str(index) for index in range(TX_FRAME_LENGTH - 1) if index < len(current) and index < len(target) and current[index] != target[index] ] return ( "target_frame=" f"target={target.hex(' ').upper()} current={current.hex(' ').upper()} " f"diffs={_frame_diff_text(current, target)} " f"pre_checksum_diffs={','.join(pre_checksum_diffs) if pre_checksum_diffs else 'none'}" ) @dataclass(frozen=True) class SCI1Snapshot: smr: int brr: int scr: int ssr: int tdr: int rdr: int tx_ready_delay: int | None = None def line(self) -> str: fields = [ f"SMR={self.smr:02X}", f"BRR={self.brr:02X}", f"SCR={self.scr:02X}", f"SSR={self.ssr:02X}", f"TDR={self.tdr:02X}", f"RDR={self.rdr:02X}", ] if self.tx_ready_delay is not None: fields.append(f"tx_ready_delay={self.tx_ready_delay}") return "sci1=" + " ".join(fields) @dataclass(frozen=True) class SCI1TXISummary: tie: bool te: bool tdre: bool vector_target: int | None priority: int interrupt_mask: int interrupt_depth: int def line(self) -> str: pending = self.tie and self.tdre and self.vector_target is not None serviceable = pending and self.interrupt_depth == 0 and self.priority > self.interrupt_mask vector = h16(self.vector_target) if self.vector_target is not None else "none" return ( "sci1_txi=" f"TIE={int(self.tie)} TE={int(self.te)} TDRE={int(self.tdre)} " f"vector={vector} priority={self.priority} mask={self.interrupt_mask} " f"depth={self.interrupt_depth} pending={int(pending)} serviceable={int(serviceable)}" ) @dataclass(frozen=True) class WatchSnapshot: pc: int step: int regs: tuple[int, ...] sp: int stack_words: tuple[tuple[int, int], ...] callers: tuple[tuple[int, int | None], ...] def line(self) -> str: regs = " ".join(f"R{idx}={h16(value)}" for idx, value in enumerate(self.regs)) stack = " ".join(f"{h16(address)}:{h16(value)}" for address, value in self.stack_words) if self.callers: callers = " ".join( f"{h16(return_address)}<-{h16(call_site)}" if call_site is not None else h16(return_address) for return_address, call_site in self.callers ) else: callers = "-" return f"step={self.step} pc={h16(self.pc)} sp={h16(self.sp)} {regs} stack=[{stack}] callers=[{callers}]" @dataclass(frozen=True) class TXFrameSnapshot: step: int pc: int label: str bytes_f850_f85d: bytes computed_checksum: int stored_checksum: int checksum_ok: bool @property def frame_bytes(self) -> bytes: return self.bytes_f850_f85d[TX_FRAME_START - TX_FRAME_SNAPSHOT_START :] def line(self) -> str: return ( f"step={self.step} pc={h16(self.pc)} {self.label} " f"F850-F85D={self.bytes_f850_f85d.hex(' ').upper()} " f"TX={self.frame_bytes.hex(' ').upper()} " f"computed={self.computed_checksum:02X} stored={self.stored_checksum:02X} " f"checksum_ok={int(self.checksum_ok)}" ) @dataclass(frozen=True) class TXFrameWriteTrace: step: int pc: int address: int old_value: int new_value: int frame_after: bytes instruction: str regs: tuple[int, ...] target_value: int | None = None @property def area(self) -> str: if TX_FRAME_START <= self.address < TX_FRAME_START + TX_FRAME_LENGTH: return f"TX[{self.address - TX_FRAME_START}]" return f"stage[{self.address - TX_FRAME_SNAPSHOT_START}]" def line(self) -> str: regs = " ".join(f"R{idx}={h16(value)}" for idx, value in enumerate(self.regs)) target = "" if self.target_value is not None: verdict = "match" if self.new_value == self.target_value else "DIFF" target = f" target={self.target_value:02X} {verdict}" return ( f"step={self.step} pc={h16(self.pc)} {self.area} {h16(self.address)} " f"{self.old_value:02X}->{self.new_value:02X}{target} " f"frame_after={self.frame_after.hex(' ').upper()} regs=[{regs}] " f"instruction={self.instruction}" ) @dataclass(frozen=True) class ReportQueueTrace: step: int pc: int kind: str head: int tail: int instruction: str regs: tuple[int, ...] address: int | None = None queue_index: int | None = None old_value: int | None = None new_value: int | None = None old_word: int | None = None new_word: int | None = None candidate_word: int | None = None def line(self) -> str: parts = [ f"step={self.step}", f"pc={h16(self.pc)}", self.kind, f"head={self.head:02X}", f"tail={self.tail:02X}", f"depth={_report_queue_depth(self.head, self.tail):02X}", ] if self.queue_index is not None: parts.append(f"slot={self.queue_index:02X}") if self.address is not None: parts.append(f"addr={h16(self.address)}") if self.old_value is not None and self.new_value is not None: parts.append(f"byte={self.old_value:02X}->{self.new_value:02X}") if self.old_word is not None: if self.new_word is None: parts.append(f"word={h16(self.old_word)}") else: parts.append(f"word={h16(self.old_word)}->{h16(self.new_word)}") if self.candidate_word is not None: parts.append(f"candidate={h16(self.candidate_word)}") regs = " ".join(f"R{idx}={h16(value)}" for idx, value in enumerate(self.regs)) parts.append(f"regs=[{regs}]") parts.append(f"instruction={self.instruction}") return " ".join(parts) @dataclass(frozen=True) class RAMLifecycleTrace: step: int pc: int address: int name: str old_value: int new_value: int instruction: str regs: tuple[int, ...] def line(self) -> str: regs = " ".join(f"R{idx}={h16(value)}" for idx, value in enumerate(self.regs)) return ( f"step={self.step} pc={h16(self.pc)} {self.name} {h16(self.address)} " f"{self.old_value:02X}->{self.new_value:02X} regs=[{regs}] " f"instruction={self.instruction}" ) @dataclass(frozen=True) class ReportGateTrace: step: int pc: int label: str f9c4: int faa5: int f9c3: int head: int tail: int regs: tuple[int, ...] z: bool c: bool n: bool decision: str f9c4_last_write_step: int | None = None f9c4_last_write_pc: int | None = None f9c4_last_write_value: int | None = None f9c4_last_write_age: int | None = None f9c4_last_nonzero_step: int | None = None f9c4_last_nonzero_pc: int | None = None f9c4_last_nonzero_value: int | None = None f9c4_last_nonzero_age: int | None = None def line(self) -> str: regs = " ".join(f"R{idx}={h16(value)}" for idx, value in enumerate(self.regs)) lifecycle_parts = [] if ( self.f9c4_last_write_step is not None and self.f9c4_last_write_pc is not None and self.f9c4_last_write_value is not None ): lifecycle_parts.append( "F9C4_last=" f"step={self.f9c4_last_write_step}@{h16(self.f9c4_last_write_pc)} " f"value={self.f9c4_last_write_value:02X} age={self.f9c4_last_write_age}" ) if ( self.f9c4_last_nonzero_step is not None and self.f9c4_last_nonzero_pc is not None and self.f9c4_last_nonzero_value is not None ): lifecycle_parts.append( "F9C4_last_nonzero=" f"step={self.f9c4_last_nonzero_step}@{h16(self.f9c4_last_nonzero_pc)} " f"value={self.f9c4_last_nonzero_value:02X} age={self.f9c4_last_nonzero_age}" ) lifecycle = "" if lifecycle_parts: lifecycle = " " + " ".join(lifecycle_parts) return ( f"step={self.step} pc={h16(self.pc)} {self.label} " f"F9C4={self.f9c4:02X} FAA5={self.faa5:02X}/bit7={(self.faa5 >> 7) & 1} " f"F9C3={self.f9c3:02X} head={self.head:02X} tail={self.tail:02X} " f"depth={_report_queue_depth(self.head, self.tail):02X} " f"Z={int(self.z)} C={int(self.c)} N={int(self.n)} " f"decision={self.decision}{lifecycle} regs=[{regs}]" ) @dataclass class ProbeReport: steps: int pc: int stopped_reason: str hot_pcs: Counter[int] = field(default_factory=Counter) tx_bytes: bytes = b"" p9_bytes: list[int] = field(default_factory=list) p9_fast_bytes: list[int] = field(default_factory=list) p9_fast_events: int = 0 p9_accesses: list[str] = field(default_factory=list) sci_accesses: list[str] = field(default_factory=list) sci1: SCI1Snapshot | None = None sci1_txi: SCI1TXISummary | None = None watch_snapshots: list[WatchSnapshot] = field(default_factory=list) tx_frame_snapshots: list[TXFrameSnapshot] = field(default_factory=list) tx_frame_write_traces: list[TXFrameWriteTrace] = field(default_factory=list) tx_target_divergences: list[TXFrameWriteTrace] = field(default_factory=list) report_queue_traces: list[ReportQueueTrace] = field(default_factory=list) report_queue_first_writes: list[ReportQueueTrace] = field(default_factory=list) report_queue_first_nonzero_writes: list[ReportQueueTrace] = field(default_factory=list) report_queue_watch_hits: list[ReportQueueTrace] = field(default_factory=list) report_gate_traces: list[ReportGateTrace] = field(default_factory=list) ram_lifecycle_traces: list[RAMLifecycleTrace] = field(default_factory=list) ram_lifecycle_last_writes: list[RAMLifecycleTrace] = field(default_factory=list) ram_lifecycle_last_nonzero_writes: list[RAMLifecycleTrace] = field(default_factory=list) final_tx_frame: bytes = b"" target_frame: bytes | None = None unsupported: str | None = None def lines(self, hot_limit: int = 12) -> list[str]: lines = [ f"steps={self.steps}", f"pc={h16(self.pc)}", f"stopped={self.stopped_reason}", "tx_bytes=" + _format_bytes(self.tx_bytes), "tx_6byte_recent=" + _format_six_byte_frames(self.tx_bytes), "p9_bytes=" + " ".join(f"{byte:02X}" for byte in self.p9_bytes[-32:]), "p9_fast_bytes=" + " ".join(f"{byte:02X}" for byte in self.p9_fast_bytes[-32:]), f"p9_fast_events={self.p9_fast_events}", "hot_pcs=" + ", ".join(f"{h16(pc)}:{count}" for pc, count in self.hot_pcs.most_common(hot_limit)), ] if self.sci1: lines.append(self.sci1.line()) if self.sci1_txi: lines.append(self.sci1_txi.line()) if self.unsupported: lines.append(f"unsupported={self.unsupported}") if self.target_frame is not None: current = self.final_tx_frame if not current and self.tx_frame_snapshots: current = self.tx_frame_snapshots[-1].frame_bytes lines.append(_target_comparison_line(current, self.target_frame)) if self.p9_accesses: lines.append("recent_p9:") lines.extend(" " + line for line in self.p9_accesses[-24:]) if self.sci_accesses: lines.append("recent_sci:") lines.extend(" " + line for line in self.sci_accesses[-16:]) if self.tx_frame_snapshots: lines.append("recent_tx_frame_snapshots:") lines.extend(" " + snapshot.line() for snapshot in self.tx_frame_snapshots) if self.tx_frame_write_traces: lines.append("recent_tx_frame_writes:") lines.extend(" " + trace.line() for trace in self.tx_frame_write_traces) if self.tx_target_divergences: lines.append("first_target_divergences:") lines.extend(" " + trace.line() for trace in self.tx_target_divergences) if self.report_queue_traces: lines.append("recent_report_queue:") lines.extend(" " + trace.line() for trace in self.report_queue_traces) if self.report_queue_first_nonzero_writes: lines.append("first_report_queue_nonzero_writes:") shown = self.report_queue_first_nonzero_writes[:REPORT_QUEUE_FIRST_NONZERO_DISPLAY_LIMIT] lines.extend(" " + trace.line() for trace in shown) remaining = len(self.report_queue_first_nonzero_writes) - len(shown) if remaining > 0: lines.append(f" ... {remaining} more first nonzero queue writes") if self.report_queue_watch_hits: lines.append("report_queue_watch_hits:") lines.extend(" " + trace.line() for trace in self.report_queue_watch_hits) if self.report_gate_traces: lines.append("recent_report_gates:") lines.extend(" " + trace.line() for trace in self.report_gate_traces) if self.ram_lifecycle_traces: lines.append("recent_ram_lifecycle:") lines.extend(" " + trace.line() for trace in self.ram_lifecycle_traces) if self.ram_lifecycle_last_writes: lines.append("ram_lifecycle_last_writes:") lines.extend(" " + trace.line() for trace in self.ram_lifecycle_last_writes) if self.ram_lifecycle_last_nonzero_writes: lines.append("ram_lifecycle_last_nonzero_writes:") lines.extend(" " + trace.line() for trace in self.ram_lifecycle_last_nonzero_writes) if self.watch_snapshots: lines.append("recent_watch_snapshots:") lines.extend(" " + snapshot.line() for snapshot in self.watch_snapshots) return lines def _sci1_snapshot(emulator: H8536Emulator) -> SCI1Snapshot: sci1 = emulator.sci1 return SCI1Snapshot( smr=sci1.smr, brr=sci1.brr, scr=sci1.scr, ssr=sci1.ssr, tdr=sci1.tdr, rdr=sci1.rdr, tx_ready_delay=getattr(sci1, "tx_ready_delay", None), ) def _sci1_txi_summary(emulator: H8536Emulator) -> SCI1TXISummary: return SCI1TXISummary( tie=bool(emulator.sci1.scr & SCI_SCR_TIE), te=bool(emulator.sci1.scr & SCI_SCR_TE), tdre=bool(emulator.sci1.ssr & SCI_SSR_TDRE), vector_target=emulator._vector_target(VECTOR_SCI1_TXI), priority=emulator._sci1_priority(), interrupt_mask=emulator._interrupt_mask(), interrupt_depth=emulator.cpu.interrupt_depth, ) def parse_watch_pc(text: str) -> int: try: value = parse_int(text) except ValueError: value = int(text, 16) return value & 0xFFFF def _likely_call_site(emulator: H8536Emulator, return_address: int) -> int | None: rom = emulator.memory.rom for size in (2, 3): candidate = (return_address - size) & 0xFFFF if not rom.contains(candidate, size): continue opcode = rom.u8(candidate) if size == 2 and opcode == 0x0E: return candidate if size == 3 and opcode in (0x1E, 0x18): return candidate return None def _watch_snapshot(emulator: H8536Emulator, *, stack_words: int = 6) -> WatchSnapshot: sp = emulator.cpu.regs[7] & 0xFFFF words: list[tuple[int, int]] = [] callers: list[tuple[int, int | None]] = [] seen_callers: set[int] = set() for offset in range(0, stack_words * 2, 2): address = (sp + offset) & 0xFFFF try: value = emulator.memory.read16(address) except Exception: continue words.append((address, value)) if value in seen_callers or not emulator.memory.rom.contains(value): continue seen_callers.add(value) callers.append((value, _likely_call_site(emulator, value))) return WatchSnapshot( pc=emulator.cpu.pc & 0xFFFF, step=emulator.cpu.steps, regs=tuple(register & 0xFFFF for register in emulator.cpu.regs), sp=sp, stack_words=tuple(words), callers=tuple(callers), ) def _ram_window(emulator: H8536Emulator, start: int, length: int) -> bytes: offset = start - ON_CHIP_RAM_START return bytes(emulator.memory.ram[offset : offset + length]) def _tx_frame_checksum(frame: bytes) -> int: checksum = 0x5A for byte in frame[: TX_FRAME_LENGTH - 1]: checksum ^= byte return checksum & 0xFF def _tx_frame_snapshot(emulator: H8536Emulator, label: str) -> TXFrameSnapshot: data = _ram_window(emulator, TX_FRAME_SNAPSHOT_START, 0x0E) frame = data[TX_FRAME_START - TX_FRAME_SNAPSHOT_START :] computed = _tx_frame_checksum(frame) stored = frame[TX_FRAME_LENGTH - 1] return TXFrameSnapshot( step=emulator.cpu.steps, pc=emulator.cpu.pc & 0xFFFF, label=label, bytes_f850_f85d=data, computed_checksum=computed, stored_checksum=stored, checksum_ok=computed == stored, ) def _watched_frame_values(emulator: H8536Emulator) -> dict[int, int]: data = _ram_window(emulator, TX_FRAME_TRACE_START, TX_FRAME_TRACE_END - TX_FRAME_TRACE_START + 1) return {TX_FRAME_TRACE_START + index: value for index, value in enumerate(data)} def _frame_from_watched_values(values: dict[int, int]) -> bytes: return bytes(values.get(TX_FRAME_START + index, 0) & 0xFF for index in range(TX_FRAME_LENGTH)) def _report_queue_depth(head: int, tail: int) -> int: return (head - tail) & 0x7F def _report_queue_index(address: int) -> int | None: if not REPORT_QUEUE_START <= address <= REPORT_QUEUE_END: return None return (address - REPORT_QUEUE_START) // 2 def _watched_report_queue_values(emulator: H8536Emulator) -> dict[int, int]: data = _ram_window(emulator, REPORT_QUEUE_START, REPORT_QUEUE_END - REPORT_QUEUE_START + 1) values = {REPORT_QUEUE_START + index: value for index, value in enumerate(data)} values[REPORT_QUEUE_HEAD] = _ram_window(emulator, REPORT_QUEUE_HEAD, 1)[0] values[REPORT_QUEUE_TAIL] = _ram_window(emulator, REPORT_QUEUE_TAIL, 1)[0] return values def _report_queue_word(values: dict[int, int], queue_index: int) -> int: address = REPORT_QUEUE_START + ((queue_index & 0x7F) * 2) return ((values.get(address, 0) & 0xFF) << 8) | (values.get(address + 1, 0) & 0xFF) def _report_queue_state(values: dict[int, int]) -> tuple[int, int]: return values.get(REPORT_QUEUE_HEAD, 0) & 0x7F, values.get(REPORT_QUEUE_TAIL, 0) & 0x7F def _report_queue_entry_trace(emulator: H8536Emulator, values: dict[int, int], label: str) -> ReportQueueTrace: head, tail = _report_queue_state(values) queue_index = head if label == "enqueue_entry" else tail candidate = emulator.cpu.regs[3] if label == "enqueue_entry" else _report_queue_word(values, queue_index) return ReportQueueTrace( step=emulator.cpu.steps, pc=emulator.cpu.pc & 0xFFFF, kind=label, head=head, tail=tail, queue_index=queue_index, old_word=_report_queue_word(values, queue_index), candidate_word=candidate & 0xFFFF, instruction="", regs=tuple(register & 0xFFFF for register in emulator.cpu.regs), ) def _report_queue_access_traces( emulator: H8536Emulator, *, pc: int, instruction: str, accesses: list, values: dict[int, int], ) -> list[ReportQueueTrace]: queue_accesses: dict[int, dict[str, object]] = {} traces: list[ReportQueueTrace] = [] regs = tuple(register & 0xFFFF for register in emulator.cpu.regs) for access in accesses: queue_index = _report_queue_index(access.address) if queue_index is not None: group = queue_accesses.setdefault( queue_index, { "address": REPORT_QUEUE_START + queue_index * 2, "old_word": _report_queue_word(values, queue_index), "read": False, "write": False, }, ) group[access.kind] = True if access.kind == "write": values[access.address] = access.value & 0xFF continue if access.kind != "write" or access.address not in (REPORT_QUEUE_HEAD, REPORT_QUEUE_TAIL): continue old_value = values.get(access.address, 0) & 0xFF values[access.address] = access.value & 0xFF head, tail = _report_queue_state(values) traces.append( ReportQueueTrace( step=emulator.cpu.steps, pc=pc, kind="cursor_head_write" if access.address == REPORT_QUEUE_HEAD else "cursor_tail_write", address=access.address, old_value=old_value, new_value=access.value & 0xFF, head=head, tail=tail, instruction=instruction, regs=regs, ) ) for queue_index, group in sorted(queue_accesses.items()): head, tail = _report_queue_state(values) write = bool(group["write"]) traces.append( ReportQueueTrace( step=emulator.cpu.steps, pc=pc, kind="queue_write" if write else "queue_read", address=int(group["address"]), queue_index=queue_index, old_word=int(group["old_word"]), new_word=_report_queue_word(values, queue_index) if write else None, head=head, tail=tail, instruction=instruction, regs=regs, ) ) return traces def _report_queue_trace_matches_watch(trace: ReportQueueTrace, watch_ids: set[int]) -> bool: if not watch_ids: return False candidates = (trace.old_word, trace.new_word, trace.candidate_word) return any(value is not None and (value & 0xFFFF) in watch_ids for value in candidates) def _ram_byte(emulator: H8536Emulator, address: int) -> int: return _ram_window(emulator, address, 1)[0] def _raw_memory_byte(emulator: H8536Emulator, address: int) -> int: address &= 0xFFFF if ON_CHIP_RAM_START <= address <= ON_CHIP_RAM_END: return emulator.memory.ram[address - ON_CHIP_RAM_START] if REGISTER_FIELD_START <= address <= REGISTER_FIELD_END: return emulator.memory.registers[address - REGISTER_FIELD_START] if address in emulator.memory.external: return emulator.memory.external[address] if emulator.memory.rom.contains(address): return emulator.memory.rom.u8(address) return 0 def _ram_lifecycle_watch_names(extra_addresses: list[int] | tuple[int, ...]) -> dict[int, str]: names = dict(RAM_LIFECYCLE_DEFAULT_WATCHES) for address in extra_addresses: normalized = address & 0xFFFF names.setdefault(normalized, f"ram_{normalized:04X}") return names def _ram_lifecycle_access_traces( emulator: H8536Emulator, *, pc: int, instruction: str, accesses: list, values: dict[int, int], watch_names: dict[int, str], ) -> list[RAMLifecycleTrace]: traces: list[RAMLifecycleTrace] = [] regs = tuple(register & 0xFFFF for register in emulator.cpu.regs) for access in accesses: address = access.address & 0xFFFF if access.kind != "write" or address not in watch_names: continue old_value = values.get(address, 0) & 0xFF new_value = access.value & 0xFF values[address] = new_value traces.append( RAMLifecycleTrace( step=emulator.cpu.steps, pc=pc, address=address, name=watch_names[address], old_value=old_value, new_value=new_value, instruction=instruction, regs=regs, ) ) return traces def _report_gate_decision(pc: int, *, f9c4: int, faa5: int, f9c3: int, head: int, tail: int, z: bool) -> str: if pc == 0x4046: return "f9c4_zero_continue" if f9c4 == 0 else "f9c4_nonzero_return" if pc == 0x404A: return "return_f9c4_nonzero" if not z else "continue_f9c4_zero" if pc == 0x404C: return "faa5_bit7_clear_skips_f9c3" if not (faa5 & 0x80) else "faa5_bit7_set_check_f9c3" if pc == 0x4050: return "enqueue_candidate_faa5_clear" if z else "check_f9c3_faa5_set" if pc == 0x4052: return "f9c3_zero_enqueue" if f9c3 == 0 else "f9c3_nonzero_return" if pc == 0x4056: return "enqueue_candidate_f9c3_zero" if z else "return_f9c3_nonzero" if pc == 0x4059: return "check_empty_queue" if pc == 0x405F: return "queue_empty" if head == tail else "queue_not_empty" if pc == 0x4063: return "return_queue_not_empty" if not z else "enqueue_zero_report" if pc == 0x4067: return "write_report_0000_to_queue_slot" if pc == 0x406C: return "advance_report_queue_head" if pc == 0x4070: return "mask_report_queue_head_bit7" if pc in (0x4058, 0x4074): return "return" return "snapshot" def _ram_lifecycle_age(emulator: H8536Emulator, trace: RAMLifecycleTrace | None) -> int | None: if trace is None: return None return emulator.cpu.steps - trace.step def _report_gate_trace( emulator: H8536Emulator, label: str, *, last_ram_lifecycle_writes: dict[int, RAMLifecycleTrace] | None = None, last_ram_lifecycle_nonzero_writes: dict[int, RAMLifecycleTrace] | None = None, ) -> ReportGateTrace: f9c4 = _ram_byte(emulator, REPORT_GATE_F9C4) faa5 = _ram_byte(emulator, REPORT_GATE_FAA5) f9c3 = _ram_byte(emulator, REPORT_GATE_F9C3) head = _ram_byte(emulator, REPORT_QUEUE_HEAD) & 0x7F tail = _ram_byte(emulator, REPORT_QUEUE_TAIL) & 0x7F pc = emulator.cpu.pc & 0xFFFF last_f9c4 = (last_ram_lifecycle_writes or {}).get(REPORT_GATE_F9C4) last_nonzero_f9c4 = (last_ram_lifecycle_nonzero_writes or {}).get(REPORT_GATE_F9C4) return ReportGateTrace( step=emulator.cpu.steps, pc=pc, label=label, f9c4=f9c4, faa5=faa5, f9c3=f9c3, head=head, tail=tail, regs=tuple(register & 0xFFFF for register in emulator.cpu.regs), z=emulator.cpu.z, c=emulator.cpu.c, n=emulator.cpu.n, decision=_report_gate_decision( pc, f9c4=f9c4, faa5=faa5, f9c3=f9c3, head=head, tail=tail, z=emulator.cpu.z, ), f9c4_last_write_step=last_f9c4.step if last_f9c4 is not None else None, f9c4_last_write_pc=last_f9c4.pc if last_f9c4 is not None else None, f9c4_last_write_value=last_f9c4.new_value if last_f9c4 is not None else None, f9c4_last_write_age=_ram_lifecycle_age(emulator, last_f9c4), f9c4_last_nonzero_step=last_nonzero_f9c4.step if last_nonzero_f9c4 is not None else None, f9c4_last_nonzero_pc=last_nonzero_f9c4.pc if last_nonzero_f9c4 is not None else None, f9c4_last_nonzero_value=last_nonzero_f9c4.new_value if last_nonzero_f9c4 is not None else None, f9c4_last_nonzero_age=_ram_lifecycle_age(emulator, last_nonzero_f9c4), ) def run_probe( rom_bytes: bytes, *, max_steps: int, interval_steps: int, stop_on_tx: bool, p9_log_limit: int, frt1_ocia_steps: int = 1024, frt2_ocia_steps: int = 1024, p9_fast_path: bool = False, p9_fast_input: int = 0xFF, sci_log_limit: int = 32, watch_pcs: list[int] | tuple[int, ...] | None = None, watch_snapshot_limit: int = 32, watch_pc_limit: int = 8, watch_min_interval: int = 1024, tx_frame_watch: bool = True, tx_frame_snapshot_limit: int = 32, trace_frame_sources: bool = False, frame_write_trace_limit: int = 64, target_frame: bytes | None = None, trace_report_queue: bool = False, report_queue_trace_limit: int = 64, watch_report_ids: list[int] | tuple[int, ...] = (), report_queue_watch_hit_limit: int = 32, trace_report_gates: bool = False, report_gate_trace_limit: int = 64, trace_ram_lifecycle: bool = False, ram_watch_addresses: list[int] | tuple[int, ...] = (), ram_lifecycle_trace_limit: int = 64, ) -> ProbeReport: emulator = H8536Emulator( rom_bytes, interval_steps=interval_steps, frt1_ocia_steps=frt1_ocia_steps, frt2_ocia_steps=frt2_ocia_steps, p9_fast_path_enabled=p9_fast_path, p9_fast_default_input_byte=p9_fast_input, ) hot_pcs: Counter[int] = Counter() p9_accesses: list[str] = [] sci_accesses: list[str] = [] snapshots: list[WatchSnapshot] = [] tx_frame_snapshots: list[TXFrameSnapshot] = [] tx_frame_write_traces: list[TXFrameWriteTrace] = [] first_target_divergences: dict[int, TXFrameWriteTrace] = {} watched_frame_values = _watched_frame_values(emulator) report_queue_traces: list[ReportQueueTrace] = [] first_report_queue_writes: dict[int, ReportQueueTrace] = {} first_report_queue_nonzero_writes: dict[int, ReportQueueTrace] = {} report_queue_watch_hits: list[ReportQueueTrace] = [] report_gate_traces: list[ReportGateTrace] = [] ram_lifecycle_traces: list[RAMLifecycleTrace] = [] last_ram_lifecycle_writes: dict[int, RAMLifecycleTrace] = {} last_ram_lifecycle_nonzero_writes: dict[int, RAMLifecycleTrace] = {} ram_lifecycle_watch_names = _ram_lifecycle_watch_names(ram_watch_addresses) ram_lifecycle_values = { address: _raw_memory_byte(emulator, address) for address in ram_lifecycle_watch_names } watch_report_id_set = {value & 0xFFFF for value in watch_report_ids} watched_report_queue_values = _watched_report_queue_values(emulator) tx_builder_seen = False watch_set = set(DEFAULT_WATCH_PCS if watch_pcs is None else watch_pcs) watch_counts: Counter[int] = Counter() watch_last_step: dict[int, int] = {} stopped_reason = "max_steps" unsupported: str | None = None last_access_index = 0 for _ in range(max_steps): pc = emulator.cpu.pc hot_pcs[pc] += 1 if pc == 0xBA26: tx_builder_seen = True if trace_report_queue and pc in REPORT_QUEUE_ENTRY_PCS: report_queue_traces.append( _report_queue_entry_trace(emulator, watched_report_queue_values, REPORT_QUEUE_ENTRY_PCS[pc]) ) if ( _report_queue_trace_matches_watch(report_queue_traces[-1], watch_report_id_set) and len(report_queue_watch_hits) < report_queue_watch_hit_limit ): report_queue_watch_hits.append(report_queue_traces[-1]) if len(report_queue_traces) > report_queue_trace_limit: del report_queue_traces[: len(report_queue_traces) - report_queue_trace_limit] if trace_report_gates and pc in REPORT_GATE_PCS: report_gate_traces.append( _report_gate_trace( emulator, REPORT_GATE_PCS[pc], last_ram_lifecycle_writes=last_ram_lifecycle_writes, last_ram_lifecycle_nonzero_writes=last_ram_lifecycle_nonzero_writes, ) ) if len(report_gate_traces) > report_gate_trace_limit: del report_gate_traces[: len(report_gate_traces) - report_gate_trace_limit] if tx_frame_watch and pc in TX_FRAME_WATCH_PCS: tx_frame_snapshots.append(_tx_frame_snapshot(emulator, TX_FRAME_WATCH_PCS[pc])) if len(tx_frame_snapshots) > tx_frame_snapshot_limit: del tx_frame_snapshots[: len(tx_frame_snapshots) - tx_frame_snapshot_limit] if pc in watch_set and watch_counts[pc] < watch_pc_limit: last_step = watch_last_step.get(pc) if last_step is None or emulator.cpu.steps - last_step >= watch_min_interval: snapshots.append(_watch_snapshot(emulator)) if len(snapshots) > watch_snapshot_limit: del snapshots[: len(snapshots) - watch_snapshot_limit] watch_counts[pc] += 1 watch_last_step[pc] = emulator.cpu.steps last_access_index = len(emulator.memory.access_log) try: instruction = emulator.step() except UnsupportedInstruction as exc: stopped_reason = "unsupported_instruction" unsupported = str(exc) break recent_accesses = emulator.memory.access_log[last_access_index:] if trace_report_queue: new_report_queue_traces = _report_queue_access_traces( emulator, pc=pc, instruction=instruction, accesses=recent_accesses, values=watched_report_queue_values, ) for trace in new_report_queue_traces: if trace.kind == "queue_write" and trace.queue_index not in first_report_queue_writes: first_report_queue_writes[trace.queue_index] = trace if ( trace.kind == "queue_write" and trace.queue_index not in first_report_queue_nonzero_writes and trace.new_word is not None and trace.new_word != 0 ): first_report_queue_nonzero_writes[trace.queue_index] = trace if ( _report_queue_trace_matches_watch(trace, watch_report_id_set) and len(report_queue_watch_hits) < report_queue_watch_hit_limit ): report_queue_watch_hits.append(trace) report_queue_traces.extend(new_report_queue_traces) if len(report_queue_traces) > report_queue_trace_limit: del report_queue_traces[: len(report_queue_traces) - report_queue_trace_limit] if trace_ram_lifecycle: new_ram_lifecycle_traces = _ram_lifecycle_access_traces( emulator, pc=pc, instruction=instruction, accesses=recent_accesses, values=ram_lifecycle_values, watch_names=ram_lifecycle_watch_names, ) for trace in new_ram_lifecycle_traces: last_ram_lifecycle_writes[trace.address] = trace if trace.new_value != 0: last_ram_lifecycle_nonzero_writes[trace.address] = trace ram_lifecycle_traces.extend(new_ram_lifecycle_traces) if len(ram_lifecycle_traces) > ram_lifecycle_trace_limit: del ram_lifecycle_traces[: len(ram_lifecycle_traces) - ram_lifecycle_trace_limit] for access in recent_accesses: if ( trace_frame_sources and access.kind == "write" and TX_FRAME_TRACE_START <= access.address <= TX_FRAME_TRACE_END ): old_value = watched_frame_values.get(access.address, 0) watched_frame_values[access.address] = access.value & 0xFF target_value = None if target_frame is not None and TX_FRAME_START <= access.address < TX_FRAME_START + TX_FRAME_LENGTH: target_value = target_frame[access.address - TX_FRAME_START] tx_frame_write_traces.append( TXFrameWriteTrace( step=emulator.cpu.steps, pc=pc, address=access.address, old_value=old_value, new_value=access.value, frame_after=_frame_from_watched_values(watched_frame_values), instruction=instruction, regs=tuple(register & 0xFFFF for register in emulator.cpu.regs), target_value=target_value, ) ) trace = tx_frame_write_traces[-1] if ( tx_builder_seen and target_value is not None and access.value != target_value and access.address not in first_target_divergences ): first_target_divergences[access.address] = trace if len(tx_frame_write_traces) > frame_write_trace_limit: del tx_frame_write_traces[: len(tx_frame_write_traces) - frame_write_trace_limit] if access.address in (P9DDR, P9DR): p9_accesses.append(f"{h16(pc)} {access.kind} {h16(access.address)}={access.value:02X}") if len(p9_accesses) > p9_log_limit: del p9_accesses[: len(p9_accesses) - p9_log_limit] elif access.address == SCI1_TDR: sci_accesses.append(f"{h16(pc)} {access.kind} TDR={access.value:02X}") elif access.address in SCI1_PROBE_REGISTERS: name = SCI1_PROBE_REGISTERS[access.address] sci_accesses.append(f"{h16(pc)} {access.kind} {name}={access.value:02X}") if len(sci_accesses) > sci_log_limit: del sci_accesses[: len(sci_accesses) - sci_log_limit] last_access_index = len(emulator.memory.access_log) if stop_on_tx and emulator.sci1.tx_bytes: stopped_reason = "tx" break if emulator.sci1.saw_heartbeat(): stopped_reason = "heartbeat" break return ProbeReport( steps=emulator.cpu.steps, pc=emulator.cpu.pc, stopped_reason=stopped_reason, hot_pcs=hot_pcs, tx_bytes=bytes(emulator.sci1.tx_bytes), p9_bytes=list(emulator.memory.p9_bus.byte_candidates), p9_fast_bytes=list(emulator.p9_fast_path.output_bytes), p9_fast_events=len(emulator.p9_fast_path.events), p9_accesses=p9_accesses, sci_accesses=sci_accesses, sci1=_sci1_snapshot(emulator), sci1_txi=_sci1_txi_summary(emulator), watch_snapshots=snapshots, tx_frame_snapshots=tx_frame_snapshots, tx_frame_write_traces=tx_frame_write_traces, tx_target_divergences=[first_target_divergences[address] for address in sorted(first_target_divergences)], report_queue_traces=report_queue_traces, report_queue_first_writes=[first_report_queue_writes[index] for index in sorted(first_report_queue_writes)], report_queue_first_nonzero_writes=[ first_report_queue_nonzero_writes[index] for index in sorted(first_report_queue_nonzero_writes) ], report_queue_watch_hits=report_queue_watch_hits, report_gate_traces=report_gate_traces, ram_lifecycle_traces=ram_lifecycle_traces, ram_lifecycle_last_writes=[ last_ram_lifecycle_writes[address] for address in sorted(last_ram_lifecycle_writes) ], ram_lifecycle_last_nonzero_writes=[ last_ram_lifecycle_nonzero_writes[address] for address in sorted(last_ram_lifecycle_nonzero_writes) ], final_tx_frame=_ram_window(emulator, TX_FRAME_START, TX_FRAME_LENGTH), target_frame=target_frame, unsupported=unsupported, ) def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Probe H8/536 emulator progress and likely hold-ups") parser.add_argument("--rom", type=Path, help="ROM image path; defaults to the repo ROM image") parser.add_argument("--max-steps", type=int, default=250_000) parser.add_argument("--interval-steps", type=int, default=512) parser.add_argument("--frt1-ocia-steps", type=int, default=1024) parser.add_argument("--frt2-ocia-steps", type=int, default=1024) parser.add_argument("--stop-on-tx", action="store_true", help="stop when SCI1 TDR emits the first byte") parser.add_argument("--p9-fast-path", action="store_true", help="shortcut known P9 bit-banged transfer routines for exploration") parser.add_argument("--p9-fast-input", type=parse_int, default=0xFF) parser.add_argument("--p9-log-limit", type=int, default=80) parser.add_argument("--sci-log-limit", type=int, default=32) parser.add_argument("--hot-limit", type=int, default=12) parser.add_argument( "--watch-pc", action="append", type=parse_watch_pc, default=[], help="additional PC to snapshot when hit, e.g. C08B, 0xC08B, or H'C08B", ) parser.add_argument("--watch-snapshot-limit", type=int, default=32) parser.add_argument("--watch-pc-limit", type=int, default=8) parser.add_argument("--watch-min-interval", type=int, default=1024) parser.add_argument( "--tx-frame-watch", action=argparse.BooleanOptionalAction, default=True, help="snapshot F850-F85D at known TX builder/send PCs", ) parser.add_argument("--tx-frame-snapshot-limit", type=int, default=32) parser.add_argument( "--trace-frame-sources", action="store_true", help="trace writes to TX staging/frame RAM H'F850-H'F85D with PC/register provenance", ) parser.add_argument("--frame-write-trace-limit", type=int, default=64) parser.add_argument( "--target-frame", type=parse_tx_frame, help="6-byte frame to compare against, e.g. \"00 00 00 00 80 DA\" or 0000000080DA", ) parser.add_argument( "--trace-report-queue", action="store_true", help="trace autonomous report queue entries/cursors at H'F870-H'F96F, H'F9B0, and H'F9B5", ) parser.add_argument("--report-queue-trace-limit", type=int, default=64) parser.add_argument( "--watch-report-id", action="append", type=parse_watch_pc, default=[], help="highlight queue traces involving a logical report id, e.g. 0015 or 0x0015", ) parser.add_argument("--report-queue-watch-hit-limit", type=int, default=32) parser.add_argument( "--trace-report-gates", action="store_true", help="trace loc_4046 report enqueue gates: F9C4, FAA5.bit7, F9C3, and F9B0/F9B5", ) parser.add_argument("--report-gate-trace-limit", type=int, default=64) parser.add_argument( "--trace-ram-lifecycle", action="store_true", help="trace writes to key RAM lifecycle bytes such as F9C4/F9C3/FAA5/F9B0/F9B5", ) parser.add_argument( "--ram-watch", action="append", type=parse_watch_pc, default=[], help="additional byte address to include in RAM lifecycle tracing, e.g. F9C4 or 0xF9C4", ) parser.add_argument("--ram-lifecycle-trace-limit", type=int, default=64) return parser def main(argv: list[str] | None = None) -> int: args = build_arg_parser().parse_args(argv) try: rom_bytes, rom_path = load_rom(args.rom) except FileNotFoundError as exc: print(str(exc)) return 2 print(f"rom={rom_path}") report = run_probe( rom_bytes, max_steps=args.max_steps, interval_steps=args.interval_steps, frt1_ocia_steps=args.frt1_ocia_steps, frt2_ocia_steps=args.frt2_ocia_steps, stop_on_tx=args.stop_on_tx, p9_log_limit=args.p9_log_limit, p9_fast_path=args.p9_fast_path, p9_fast_input=args.p9_fast_input, sci_log_limit=args.sci_log_limit, watch_pcs=tuple(dict.fromkeys((*DEFAULT_WATCH_PCS, *args.watch_pc))), watch_snapshot_limit=args.watch_snapshot_limit, watch_pc_limit=args.watch_pc_limit, watch_min_interval=args.watch_min_interval, tx_frame_watch=args.tx_frame_watch, tx_frame_snapshot_limit=args.tx_frame_snapshot_limit, trace_frame_sources=args.trace_frame_sources or args.target_frame is not None, frame_write_trace_limit=args.frame_write_trace_limit, target_frame=args.target_frame, trace_report_queue=args.trace_report_queue, report_queue_trace_limit=args.report_queue_trace_limit, watch_report_ids=tuple(args.watch_report_id), report_queue_watch_hit_limit=args.report_queue_watch_hit_limit, trace_report_gates=args.trace_report_gates, report_gate_trace_limit=args.report_gate_trace_limit, trace_ram_lifecycle=args.trace_ram_lifecycle, ram_watch_addresses=tuple(args.ram_watch), ram_lifecycle_trace_limit=args.ram_lifecycle_trace_limit, ) for line in report.lines(hot_limit=args.hot_limit): print(line) return 0