"""Inject host SCI1 frames into the H8/536 emulator and listen for ROM TX responses.

The module boots the emulator until SCI1 receive interrupts can be serviced,
feeds one or more 6-byte host frames into the SCI1 receiver, and reports what
the ROM transmitted, which watched program counters were hit, and which
watched memory locations changed.
"""

from __future__ import annotations

import argparse
import string
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable

from ..formatting import h16, parse_int
from .cli import load_rom
from .constants import (
    IPRE,
    SCI_SCR_RE,
    SCI_SCR_RIE,
    SCI_SSR_RDRF,
    VECTOR_SCI1_RXI,
)
from .errors import UnsupportedInstruction
from .memory import MemoryAccess
from .runner import H8536Emulator
from .uart import UartTiming

# Seed XOR-ed with the first five frame bytes to form the sixth (checksum) byte.
CHECKSUM_SEED = 0x5A
# Total frame size in bytes, checksum included.
FRAME_LENGTH = 6

# Built-in frame set selected by ``--preset connect-lcd``.
CONNECT_LCD_FRAMES = (
    bytes.fromhex("04000040001E"),
    bytes.fromhex("0400008000DE"),
    bytes.fromhex("040000C0009E"),
)

# Exact program-counter values that are counted under a symbolic label.
WATCH_PCS = {
    0xBB57: "sci1_eri_entry",
    0xBB67: "sci1_rxi_entry",
    0xBBD6: "rx_checksum_seed",
    0xBBF0: "rx_checksum_compare",
    0xBC08: "command_dispatch",
    0xBD0E: "command_04_handler",
    0xBCCD: "command_04_send",
    0xBE05: "command_07_handler",
    0xBE29: "rx_error_or_retry",
    0xBA26: "tx_builder",
    0xBA72: "tx_first_byte",
    0xBA84: "txi_entry",
    0x3ECC: "lcd_line_buffer_entry",
    0x3F28: "lcd_driver_stage",
    0x3F40: "lcd_port_writer",
}

# Inclusive (start, end, label) PC ranges used when no exact WATCH_PCS entry matches.
WATCH_RANGES = (
    (0x2CA6, 0x2D20, "connect_display_window"),
    (0xBB57, 0xBE6F, "sci1_rx_command_window"),
)

# Inclusive (start, end, label) address ranges whose memory accesses are reported.
ACCESS_RANGES = (
    (0xF850, 0xF85D, "tx_staging_or_frame"),
    (0xF860, 0xF86D, "rx_validation_or_capture"),
    (0xF870, 0xF96F, "report_queue"),
    (0xF970, 0xF9AF, "secondary_dispatch_or_table"),
    (0xF9B0, 0xF9C8, "serial_gate_state"),
    (0xFAA2, 0xFAA6, "serial_latches"),
    (0xFAF0, 0xFAFF, "lcd_line_buffer"),
    (0xE000, 0xE001, "primary_table_index_0000"),
    (0xE400, 0xE401, "secondary_table_index_0000"),
    (0xE800, 0xE801, "current_table_index_0000"),
    (0xEC00, 0xEC01, "flag_table_index_0000"),
    (0xF200, 0xF201, "lcd_ports"),
)

# Single bytes captured in every state snapshot, keyed by address.
STATE_BYTES = {
    0xF9B0: "queue_head",
    0xF9B5: "queue_tail",
    0xF9C0: "tx_gate",
    0xF9C1: "rx_interbyte_timeout",
    0xF9C3: "rx_index",
    0xF9C4: "idle_heartbeat_gate",
    0xF9C5: "rx_session_timeout",
    0xF9C6: "resend_period_hi",
    0xF9C7: "resend_period_lo",
    0xF9C8: "resend_countdown",
    0xFAA2: "session_flags",
    0xFAA3: "pending_mask",
    0xFAA4: "rx_error_latch",
    0xFAA5: "retry_or_gate_flags",
    0xFAA6: "retry_counter",
}

# 16-bit words captured in every state snapshot, keyed by address.
STATE_WORDS = {
    0xE000: "E000_index_0000_primary",
    0xE400: "E400_index_0000_secondary",
    0xE800: "E800_index_0000_current",
    0xF860: "rx_frame_01",
    0xF862: "rx_frame_23",
    0xF864: "rx_frame_45",
    0xF970: "F970_selector_zero_dispatch",
}


@dataclass
class RunContext:
    """Accumulate watched-PC statistics and the first unsupported-instruction message."""

    # Hit count per watch label.
    pc_hits: Counter[str] = field(default_factory=Counter)
    # First watched (pc, label) pairs in execution order, capped at 32 entries.
    first_pcs: list[tuple[int, str]] = field(default_factory=list)
    # Text of the UnsupportedInstruction that stopped a run, if any.
    unsupported: str | None = None

    def record_pc(self, pc: int) -> None:
        """Count ``pc`` if it matches a watched address or falls in a watched range."""
        label = WATCH_PCS.get(pc)
        if label is None:
            # Exact matches win; otherwise the first enclosing range names the hit.
            for start, end, range_label in WATCH_RANGES:
                if start <= pc <= end:
                    label = range_label
                    break
        if label is None:
            return
        self.pc_hits[label] += 1
        if len(self.first_pcs) < 32:
            self.first_pcs.append((pc, label))


@dataclass(frozen=True)
class FrameResult:
    """Outcome of injecting one host frame: TX output, state deltas, and trace data."""

    input_frame: bytes
    checksum_ok: bool
    rx_injection: str
    uart_byte_cycles: int | None
    steps: int
    stopped_reason: str
    new_tx_bytes: bytes
    new_tx_frames: list[bytes]
    state_before: dict[str, int | str]
    state_after: dict[str, int | str]
    accesses: list[MemoryAccess]
    context: RunContext

    def lines(self, index: int) -> list[str]:
        """Return the human-readable report lines for this frame.

        ``index`` is the position of the frame in the injected sequence and is
        only used to label the first line.
        """
        # NOTE(review): the leading whitespace inside these report strings is
        # reproduced exactly as found; confirm it matches the intended layout.
        lines = [
            f"host_frame[{index}]={format_frame(self.input_frame)} checksum_ok={int(self.checksum_ok)}",
            f" rx_injection={self.rx_injection}",
            f" stopped={self.stopped_reason} steps={self.steps}",
            f" new_tx_bytes={format_frame(self.new_tx_bytes) if self.new_tx_bytes else 'none'}",
        ]
        if self.new_tx_frames:
            lines.append(" new_tx_frames=" + " | ".join(format_frame(frame) for frame in self.new_tx_frames))
        else:
            lines.append(" new_tx_frames=none")
        lcd_display = self.state_after.get("lcd_display_ascii")
        if isinstance(lcd_display, str):
            lines.append(f" lcd_display={lcd_display!r}")
        state_changes = _state_change_lines(self.state_before, self.state_after)
        if state_changes:
            lines.append(" state_changes:")
            lines.extend(f" {line}" for line in state_changes)
        pc_lines = _pc_hit_lines(self.context)
        if pc_lines:
            lines.append(" pc_hits:")
            lines.extend(f" {line}" for line in pc_lines)
        access_lines = _access_lines(self.accesses)
        if access_lines:
            lines.append(" interesting_accesses:")
            lines.extend(f" {line}" for line in access_lines)
        if self.context.unsupported:
            lines.append(f" unsupported={self.context.unsupported}")
        return lines


def parse_frame(text: str) -> bytes:
    """Parse a host frame given as hex text into exactly ``FRAME_LENGTH`` bytes.

    Bytes may be separated by whitespace, ``,``, ``:``, ``-`` or ``_``, or
    written as one compact hex string (optionally prefixed with ``0x`` or
    ``H'``). A 5-byte input gets its checksum appended.

    Raises:
        argparse.ArgumentTypeError: on an odd-length compact string, an invalid
            byte token, or a byte count other than 5 or 6.
    """
    normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
    parts = normalized.split()
    if len(parts) == 1:
        # A single token is treated as compact hex and split into byte pairs.
        compact = parts[0]
        if compact.lower().startswith("0x"):
            compact = compact[2:]
        if compact.upper().startswith("H'"):
            compact = compact[2:]
        if len(compact) % 2:
            raise argparse.ArgumentTypeError("frame compact hex must have an even number of digits")
        parts = [compact[index : index + 2] for index in range(0, len(compact), 2)]
    values = [_parse_byte(part) for part in parts]
    if len(values) == FRAME_LENGTH - 1:
        values.append(frame_checksum(bytes(values)))
    if len(values) != FRAME_LENGTH:
        raise argparse.ArgumentTypeError("frame must contain 5 bytes plus computed checksum or exactly 6 bytes")
    return bytes(values)


def frame_checksum(data: bytes) -> int:
    """Return ``CHECKSUM_SEED`` XOR-ed with the first ``FRAME_LENGTH - 1`` bytes of ``data``."""
    checksum = CHECKSUM_SEED
    for value in data[: FRAME_LENGTH - 1]:
        checksum ^= value
    return checksum & 0xFF


def frame_checksum_ok(frame: bytes) -> bool:
    """Return True when ``frame`` is full length and its last byte is the expected checksum."""
    return len(frame) == FRAME_LENGTH and frame_checksum(frame) == frame[-1]


def format_frame(data: bytes) -> str:
    """Format ``data`` as upper-case hex byte pairs separated by single spaces."""
    return data.hex(" ").upper()


def run_rx_probe(
    frames: Iterable[bytes],
    *,
    rom_path: Path | None = None,
    boot_steps: int = 250_000,
    per_byte_steps: int = 5_000,
    post_frame_steps: int = 80_000,
    uart_timing: bool = False,
    uart_baud: int = 38_400,
    interval_steps: int = 512,
    frt1_ocia_steps: int | None = None,
    frt2_ocia_steps: int | None = None,
    clock_hz: int = 10_000_000,
    p9_fast_path: bool = True,
    p9_fast_input: int = 0xFF,
    p9_fast_optimistic_wrapper: bool = False,
    stop_after_tx_frame: bool = True,
) -> tuple[Path, H8536Emulator, str, list[FrameResult]]:
    """Boot the emulator, inject every frame in order, and collect the results.

    Args:
        frames: host frames to inject, one after another, into the same emulator.
        rom_path: ROM image to load; ``None`` lets ``load_rom`` pick its default.
        boot_steps: step budget for reaching an RX-serviceable state.
        per_byte_steps: per-byte step budget (see ``_run_frame``).
        post_frame_steps: step budget after a frame was fully injected.
        uart_timing: inject bytes on a UART-paced cycle schedule instead of
            waiting for each byte to be consumed.
        uart_baud: baud rate used when ``uart_timing`` is set.
        stop_after_tx_frame: stop the post-frame run at the first new TX frame.
        interval_steps, frt1_ocia_steps, frt2_ocia_steps, clock_hz,
        p9_fast_path, p9_fast_input, p9_fast_optimistic_wrapper: forwarded to
            the ``H8536Emulator`` constructor.

    Returns:
        ``(rom_path_used, emulator, boot_summary, results)`` with one
        ``FrameResult`` per input frame.
    """
    rom_bytes, discovered_rom_path = load_rom(rom_path)
    emulator = H8536Emulator(
        rom_bytes,
        interval_steps=interval_steps,
        frt1_ocia_steps=frt1_ocia_steps,
        frt2_ocia_steps=frt2_ocia_steps,
        clock_hz=clock_hz,
        p9_fast_path_enabled=p9_fast_path,
        p9_fast_default_input_byte=p9_fast_input,
        p9_fast_default_wrapper_success=p9_fast_optimistic_wrapper,
    )
    boot_context = RunContext()
    boot_steps_used, boot_reason = _run_until(emulator, boot_steps, _rx_ready, boot_context)
    boot_summary = (
        f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
        f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
        f"rx_serviceable={int(_rx_ready(emulator))} "
        f"clock_hz={emulator.clock_hz} "
        f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
    )
    results = [
        _run_frame(
            emulator,
            frame,
            per_byte_steps=per_byte_steps,
            post_frame_steps=post_frame_steps,
            uart_timing=uart_timing,
            uart_baud=uart_baud,
            stop_after_tx_frame=stop_after_tx_frame,
        )
        for frame in frames
    ]
    return discovered_rom_path, emulator, boot_summary, results


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the RX probe."""
    parser = argparse.ArgumentParser(
        description="Inject host SCI1 frames into the H8/536 emulator and listen for ROM TX responses."
    )
    parser.add_argument(
        "frames",
        nargs="*",
        type=parse_frame,
        help="host frame hex; 5-byte inputs get a 0x5A-XOR checksum appended",
    )
    parser.add_argument(
        "--rom",
        type=Path,
        help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN when present",
    )
    parser.add_argument("--preset", choices=("connect-lcd",), help="append a built-in host-frame set")
    parser.add_argument(
        "--boot-steps",
        type=int,
        default=250_000,
        help="maximum steps to boot until SCI1 RXI is serviceable",
    )
    parser.add_argument(
        "--per-byte-steps",
        type=int,
        default=5_000,
        help="polite mode byte-consume limit, or UART mode step limit between byte arrivals",
    )
    parser.add_argument(
        "--post-frame-steps",
        type=int,
        default=80_000,
        help="maximum steps after a full injected frame",
    )
    parser.add_argument(
        "--uart-timing",
        action="store_true",
        help="inject frame bytes at real 8N1 UART inter-byte timing instead of waiting for RDRF consumption",
    )
    parser.add_argument(
        "--uart-baud",
        type=parse_int,
        default=38_400,
        help="baud rate for --uart-timing; 38400 gives about 260 us per 8N1 byte",
    )
    parser.add_argument(
        "--keep-listening",
        action="store_true",
        help="use all post-frame steps instead of stopping at the first new TX frame",
    )
    parser.add_argument(
        "--interval-steps",
        type=int,
        default=512,
        help="rough step period for the scaffolded interval timer interrupt",
    )
    parser.add_argument(
        "--clock-hz",
        type=parse_int,
        default=10_000_000,
        help="CPU/phi clock in Hz for calibrated FRT timing",
    )
    parser.add_argument(
        "--frt1-ocia-steps",
        type=int,
        default=None,
        help="legacy step-period override for FRT1 OCIA",
    )
    parser.add_argument(
        "--frt2-ocia-steps",
        type=int,
        default=None,
        help="legacy step-period override for FRT2 OCIA",
    )
    parser.add_argument(
        "--no-p9-fast-path",
        action="store_true",
        help="disable shortcut handling for known P9 routines",
    )
    parser.add_argument(
        "--p9-fast-input",
        type=parse_int,
        default=0xFF,
        help="default byte returned by the P9 fast-path read routine",
    )
    parser.add_argument(
        "--p9-fast-optimistic-wrapper",
        action="store_true",
        help="legacy fallback for older wrapper experiments; known BFE0/BFFE wrappers use the X24164 model",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse arguments, run the probe, print the report.

    Returns 0 on completion. Raises ``SystemExit`` when neither a frame nor a
    preset was supplied.
    """
    args = build_arg_parser().parse_args(argv)
    frames = list(args.frames)
    if args.preset == "connect-lcd":
        frames.extend(CONNECT_LCD_FRAMES)
    if not frames:
        raise SystemExit("pass at least one frame or use --preset connect-lcd")
    rom_path, emulator, boot_summary, results = run_rx_probe(
        frames,
        rom_path=args.rom,
        boot_steps=args.boot_steps,
        per_byte_steps=args.per_byte_steps,
        post_frame_steps=args.post_frame_steps,
        uart_timing=args.uart_timing,
        uart_baud=args.uart_baud,
        interval_steps=args.interval_steps,
        frt1_ocia_steps=args.frt1_ocia_steps,
        frt2_ocia_steps=args.frt2_ocia_steps,
        clock_hz=args.clock_hz,
        p9_fast_path=not args.no_p9_fast_path,
        p9_fast_input=args.p9_fast_input,
        p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper,
        stop_after_tx_frame=not args.keep_listening,
    )
    print(f"rom={rom_path}")
    print(f"reset_vector={h16(emulator.reset_vector())}")
    print(boot_summary)
    for index, result in enumerate(results):
        for line in result.lines(index):
            print(line)
    print("total_tx_frames=" + " | ".join(format_frame(frame) for frame in emulator.sci1.tx_frames))
    return 0


def _run_frame(
    emulator: H8536Emulator,
    frame: bytes,
    *,
    per_byte_steps: int,
    post_frame_steps: int,
    uart_timing: bool,
    uart_baud: int,
    stop_after_tx_frame: bool,
) -> FrameResult:
    """Inject one frame, run the emulator afterwards, and package the outcome.

    In polite mode (``uart_timing`` false) each byte is injected and the
    emulator runs until that byte is consumed or ``per_byte_steps`` is spent.
    In UART mode bytes are injected on a cycle schedule and ``per_byte_steps``
    bounds the steps between arrivals. The post-frame run only happens when
    every byte was injected.
    """
    state_before = _state_snapshot(emulator)
    # Remember current log/TX lengths so only this frame's activity is reported.
    log_start = len(emulator.memory.access_log)
    tx_byte_start = len(emulator.sci1.tx_bytes)
    tx_frame_start = len(emulator.sci1.tx_frames)
    context = RunContext()
    stopped_reason = "post_frame_steps"
    steps_total = 0
    timing = UartTiming(baud=uart_baud)

    if uart_timing:
        steps_total, stopped_reason = _inject_frame_uart_timed(
            emulator,
            frame,
            timing=timing,
            max_steps_per_gap=per_byte_steps,
            context=context,
        )
        injected_all_bytes = stopped_reason == "frame_injected_uart_timing"
    else:
        injected_all_bytes = True
        for offset, value in enumerate(frame):
            emulator.inject_sci1_rx_byte(value)
            steps, reason = _run_until(emulator, per_byte_steps, _rx_byte_consumed, context)
            steps_total += steps
            if reason != "predicate":
                stopped_reason = f"rx_byte_{offset}_{reason}"
                injected_all_bytes = False
                break

    if injected_all_bytes:
        target_frame_count = tx_frame_start + 1

        def post_predicate(inner: H8536Emulator) -> bool:
            # Never satisfied when the caller asked to keep listening.
            return stop_after_tx_frame and len(inner.sci1.tx_frames) >= target_frame_count

        steps, reason = _run_until(emulator, post_frame_steps, post_predicate, context)
        steps_total += steps
        stopped_reason = "tx_frame" if reason == "predicate" and stop_after_tx_frame else reason

    log_end = len(emulator.memory.access_log)
    state_after = _state_snapshot(emulator)
    return FrameResult(
        input_frame=frame,
        checksum_ok=frame_checksum_ok(frame),
        rx_injection=timing.summary(emulator.clock_hz) if uart_timing else "polite_wait_for_rdrf_clear",
        uart_byte_cycles=timing.cycles_per_character(emulator.clock_hz) if uart_timing else None,
        steps=steps_total,
        stopped_reason=stopped_reason,
        new_tx_bytes=bytes(emulator.sci1.tx_bytes[tx_byte_start:]),
        new_tx_frames=list(emulator.sci1.tx_frames[tx_frame_start:]),
        state_before=state_before,
        state_after=state_after,
        accesses=emulator.memory.access_log[log_start:log_end],
        context=context,
    )


def _inject_frame_uart_timed(
    emulator: H8536Emulator,
    frame: bytes,
    *,
    timing: UartTiming,
    max_steps_per_gap: int,
    context: RunContext,
) -> tuple[int, str]:
    """Inject ``frame`` with one byte every ``cycles_per_character`` CPU cycles.

    The first byte is injected immediately; byte ``n`` is injected once the
    cycle counter reaches ``start + n * byte_cycles``. Returns the steps used
    and either ``"frame_injected_uart_timing"`` or an ``rx_byte_<n>_<reason>``
    string naming the byte that could not be scheduled.
    """
    steps_total = 0
    start_cycles = emulator.cpu.cycles
    byte_cycles = timing.cycles_per_character(emulator.clock_hz)
    for offset, value in enumerate(frame):
        if offset:
            target_cycles = start_cycles + (offset * byte_cycles)
            steps, reason = _run_until_cycle(emulator, target_cycles, max_steps_per_gap, context)
            steps_total += steps
            if reason != "target_cycle":
                return steps_total, f"rx_byte_{offset}_{reason}"
        emulator.inject_sci1_rx_byte(value)
    return steps_total, "frame_injected_uart_timing"


def _run_until(
    emulator: H8536Emulator,
    max_steps: int,
    predicate: Callable[[H8536Emulator], bool],
    context: RunContext,
) -> tuple[int, str]:
    """Step the emulator until ``predicate`` holds or the step budget is spent.

    The predicate is evaluated before every step and once more after the last
    one, so a condition reached on the final allowed step (or already true
    with a zero budget) is reported as ``"predicate"`` rather than
    ``"max_steps"``. A negative ``max_steps`` is treated as zero.

    Returns:
        ``(steps_executed, reason)`` where reason is ``"predicate"``,
        ``"unsupported_instruction"`` or ``"max_steps"``.
    """
    budget = max(0, max_steps)
    for index in range(budget):
        if predicate(emulator):
            return index, "predicate"
        context.record_pc(emulator.cpu.pc)
        try:
            emulator.step()
        except UnsupportedInstruction as exc:
            context.unsupported = str(exc)
            return index, "unsupported_instruction"
    # The last step may be the one that satisfied the predicate.
    if predicate(emulator):
        return budget, "predicate"
    return budget, "max_steps"


def _run_until_cycle(
    emulator: H8536Emulator,
    target_cycles: int,
    max_steps: int,
    context: RunContext,
) -> tuple[int, str]:
    """Step the emulator until its cycle counter reaches ``target_cycles``.

    Returns ``(steps_executed, reason)`` where reason is ``"target_cycle"``,
    ``"unsupported_instruction"`` or ``"max_steps"``.
    """

    def reached(inner: H8536Emulator) -> bool:
        return inner.cpu.cycles >= target_cycles

    steps, reason = _run_until(emulator, max_steps, reached, context)
    return steps, "target_cycle" if reason == "predicate" else reason


def _rx_ready(emulator: H8536Emulator) -> bool:
    """Return True when an SCI1 receive interrupt could currently be serviced.

    Requires both RIE and RE set in SCR, an RXI vector entry, and an SCI1
    priority strictly above the CPU interrupt mask.
    """
    if not (emulator.sci1.scr & SCI_SCR_RIE and emulator.sci1.scr & SCI_SCR_RE):
        return False
    if emulator.vectors.get(VECTOR_SCI1_RXI) is None:
        return False
    return _sci1_priority(emulator) > _interrupt_mask(emulator)


def _rx_byte_consumed(emulator: H8536Emulator) -> bool:
    """Return True when RDRF is clear and the CPU is outside any interrupt handler."""
    return not (emulator.sci1.ssr & SCI_SSR_RDRF) and emulator.cpu.interrupt_depth == 0


def _sci1_priority(emulator: H8536Emulator) -> int:
    """Return the 3-bit field at bits 6-4 of the byte read from ``IPRE``."""
    return (emulator.memory.read8(IPRE) >> 4) & 0x07


def _interrupt_mask(emulator: H8536Emulator) -> int:
    """Return the 3-bit field at bits 10-8 of the CPU status register."""
    return (emulator.cpu.sr >> 8) & 0x07


def _state_snapshot(emulator: H8536Emulator) -> dict[str, int | str]:
    """Capture the watched bytes, words, LCD text and frame buffers as a dict."""
    snapshot: dict[str, int | str] = {}
    for address, name in STATE_BYTES.items():
        snapshot[name] = emulator.memory.read8(address)
    for address, name in STATE_WORDS.items():
        snapshot[name] = emulator.memory.read16(address)
    snapshot["lcd_line_buffer_ascii"] = _ascii_window(emulator, 0xFAF0, 16)
    snapshot["lcd_display_ascii"] = emulator.memory.lcd.display_text(lines=4, width=16)
    snapshot["tx_frame_staging"] = format_frame(
        bytes(emulator.memory.read8(0xF850 + offset) for offset in range(6))
    )
    snapshot["rx_frame_validation"] = format_frame(
        bytes(emulator.memory.read8(0xF860 + offset) for offset in range(6))
    )
    return snapshot


def _ascii_window(emulator: H8536Emulator, start: int, length: int) -> str:
    """Render ``length`` bytes from ``start`` as text, with ``.`` for non-printable bytes."""
    chars = []
    for offset in range(length):
        value = emulator.memory.read8(start + offset)
        chars.append(chr(value) if 0x20 <= value <= 0x7E else ".")
    return "".join(chars)


def _state_change_lines(before: dict[str, int | str], after: dict[str, int | str]) -> list[str]:
    """Return ``key: old->new`` lines for every key of ``after`` whose value differs."""
    lines = []
    for key in sorted(after):
        if before.get(key) == after[key]:
            continue
        old = _state_value(before.get(key))
        new = _state_value(after[key])
        lines.append(f"{key}: {old}->{new}")
    return lines


def _state_value(value: int | str | None) -> str:
    """Format an int as ``H'XX`` (``H'XXXX`` above 0xFF); anything else via ``repr``."""
    if isinstance(value, int):
        return f"H'{value:04X}" if value > 0xFF else f"H'{value:02X}"
    return repr(value)


def _pc_hit_lines(context: RunContext) -> list[str]:
    """Return sorted ``label=count`` lines plus a ``first=`` line of up to 16 early hits."""
    lines = [f"{name}={count}" for name, count in sorted(context.pc_hits.items())]
    if context.first_pcs:
        first = ", ".join(f"{h16(pc)}:{label}" for pc, label in context.first_pcs[:16])
        lines.append(f"first={first}")
    return lines


def _access_lines(accesses: list[MemoryAccess]) -> list[str]:
    """Format up to 80 interesting accesses, with a trailer counting any remainder."""
    interesting = [access for access in accesses if _interesting_access(access)]
    lines = []
    for access in interesting[:80]:
        label = _access_label(access.address)
        lines.append(f"{access.kind:<5} {h16(access.address)} {access.value:02X} {label}")
    if len(interesting) > 80:
        lines.append(f"... {len(interesting) - 80} more interesting accesses")
    return lines


def _interesting_access(access: MemoryAccess) -> bool:
    """Return True for writes to any labelled range, or other accesses to two selected ranges."""
    if access.kind == "write":
        return _access_label(access.address) != ""
    return _access_label(access.address) in {"secondary_dispatch_or_table", "lcd_ports"}


def _access_label(address: int) -> str:
    """Return the label of the first ``ACCESS_RANGES`` entry containing ``address``, else ``""``."""
    for start, end, label in ACCESS_RANGES:
        if start <= address <= end:
            return label
    return ""


def _parse_byte(text: str) -> int:
    """Parse one or two hex digits (optionally prefixed ``0x`` or ``H'``) as a byte.

    Raises:
        argparse.ArgumentTypeError: if the token is empty, longer than two
            characters, or contains anything other than ASCII hex digits.
    """
    token = text.strip()
    if token.lower().startswith("0x"):
        token = token[2:]
    if token.upper().startswith("H'"):
        token = token[2:]
    # int(..., 16) alone would also accept a sign, padding whitespace and
    # non-ASCII digits, so validate the characters explicitly.
    if not token or len(token) > 2 or any(char not in string.hexdigits for char in token):
        raise argparse.ArgumentTypeError(f"invalid byte token {text!r}")
    # One or two hex digits always fit in 0..0xFF.
    return int(token, 16)


__all__ = [
    "CONNECT_LCD_FRAMES",
    "format_frame",
    "frame_checksum",
    "frame_checksum_ok",
    "main",
    "parse_frame",
    "run_rx_probe",
    "UartTiming",
]