1
0
Files
h8-536-decoder/h8536/emulator/rx_probe.py
2026-05-25 22:32:13 +10:00

559 lines
20 KiB
Python

from __future__ import annotations
import argparse
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable
from ..formatting import h16, parse_int
from .cli import load_rom
from .constants import (
IPRE,
SCI_SCR_RE,
SCI_SCR_RIE,
SCI_SSR_RDRF,
VECTOR_SCI1_RXI,
)
from .errors import UnsupportedInstruction
from .memory import MemoryAccess
from .runner import H8536Emulator
from .uart import UartTiming
CHECKSUM_SEED = 0x5A
FRAME_LENGTH = 6
CONNECT_LCD_FRAMES = (
bytes.fromhex("04000040001E"),
bytes.fromhex("0400008000DE"),
bytes.fromhex("040000C0009E"),
)
WATCH_PCS = {
0xBB57: "sci1_eri_entry",
0xBB67: "sci1_rxi_entry",
0xBBD6: "rx_checksum_seed",
0xBBF0: "rx_checksum_compare",
0xBC08: "command_dispatch",
0xBD0E: "command_04_handler",
0xBCCD: "command_04_send",
0xBE05: "command_07_handler",
0xBE29: "rx_error_or_retry",
0xBA26: "tx_builder",
0xBA72: "tx_first_byte",
0xBA84: "txi_entry",
0x3ECC: "lcd_line_buffer_entry",
0x3F28: "lcd_driver_stage",
0x3F40: "lcd_port_writer",
}
WATCH_RANGES = (
(0x2CA6, 0x2D20, "connect_display_window"),
(0xBB57, 0xBE6F, "sci1_rx_command_window"),
)
ACCESS_RANGES = (
(0xF850, 0xF85D, "tx_staging_or_frame"),
(0xF860, 0xF86D, "rx_validation_or_capture"),
(0xF870, 0xF96F, "report_queue"),
(0xF970, 0xF9AF, "secondary_dispatch_or_table"),
(0xF9B0, 0xF9C8, "serial_gate_state"),
(0xFAA2, 0xFAA6, "serial_latches"),
(0xFAF0, 0xFAFF, "lcd_line_buffer"),
(0xE000, 0xE001, "primary_table_index_0000"),
(0xE400, 0xE401, "secondary_table_index_0000"),
(0xE800, 0xE801, "current_table_index_0000"),
(0xEC00, 0xEC01, "flag_table_index_0000"),
(0xF200, 0xF201, "lcd_ports"),
)
STATE_BYTES = {
0xF9B0: "queue_head",
0xF9B5: "queue_tail",
0xF9C0: "tx_gate",
0xF9C1: "rx_interbyte_timeout",
0xF9C3: "rx_index",
0xF9C4: "idle_heartbeat_gate",
0xF9C5: "rx_session_timeout",
0xF9C6: "resend_period_hi",
0xF9C7: "resend_period_lo",
0xF9C8: "resend_countdown",
0xFAA2: "session_flags",
0xFAA3: "pending_mask",
0xFAA4: "rx_error_latch",
0xFAA5: "retry_or_gate_flags",
0xFAA6: "retry_counter",
}
STATE_WORDS = {
0xE000: "E000_index_0000_primary",
0xE400: "E400_index_0000_secondary",
0xE800: "E800_index_0000_current",
0xF860: "rx_frame_01",
0xF862: "rx_frame_23",
0xF864: "rx_frame_45",
0xF970: "F970_selector_zero_dispatch",
}
@dataclass
class RunContext:
pc_hits: Counter[str] = field(default_factory=Counter)
first_pcs: list[tuple[int, str]] = field(default_factory=list)
unsupported: str | None = None
def record_pc(self, pc: int) -> None:
label = WATCH_PCS.get(pc)
if label is None:
for start, end, range_label in WATCH_RANGES:
if start <= pc <= end:
label = range_label
break
if label is None:
return
self.pc_hits[label] += 1
if len(self.first_pcs) < 32:
self.first_pcs.append((pc, label))
@dataclass(frozen=True)
class FrameResult:
input_frame: bytes
checksum_ok: bool
rx_injection: str
uart_byte_cycles: int | None
steps: int
stopped_reason: str
new_tx_bytes: bytes
new_tx_frames: list[bytes]
state_before: dict[str, int | str]
state_after: dict[str, int | str]
accesses: list[MemoryAccess]
context: RunContext
def lines(self, index: int) -> list[str]:
lines = [
f"host_frame[{index}]={format_frame(self.input_frame)} checksum_ok={int(self.checksum_ok)}",
f" rx_injection={self.rx_injection}",
f" stopped={self.stopped_reason} steps={self.steps}",
f" new_tx_bytes={format_frame(self.new_tx_bytes) if self.new_tx_bytes else 'none'}",
]
if self.new_tx_frames:
lines.append(" new_tx_frames=" + " | ".join(format_frame(frame) for frame in self.new_tx_frames))
else:
lines.append(" new_tx_frames=none")
lcd_display = self.state_after.get("lcd_display_ascii")
if isinstance(lcd_display, str):
lines.append(f" lcd_display={lcd_display!r}")
state_changes = _state_change_lines(self.state_before, self.state_after)
if state_changes:
lines.append(" state_changes:")
lines.extend(f" {line}" for line in state_changes)
pc_lines = _pc_hit_lines(self.context)
if pc_lines:
lines.append(" pc_hits:")
lines.extend(f" {line}" for line in pc_lines)
access_lines = _access_lines(self.accesses)
if access_lines:
lines.append(" interesting_accesses:")
lines.extend(f" {line}" for line in access_lines)
if self.context.unsupported:
lines.append(f" unsupported={self.context.unsupported}")
return lines
def parse_frame(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
parts = normalized.split()
if len(parts) == 1:
compact = parts[0]
if compact.lower().startswith("0x"):
compact = compact[2:]
if compact.upper().startswith("H'"):
compact = compact[2:]
if len(compact) % 2:
raise argparse.ArgumentTypeError("frame compact hex must have an even number of digits")
parts = [compact[index : index + 2] for index in range(0, len(compact), 2)]
values = [_parse_byte(part) for part in parts]
if len(values) == FRAME_LENGTH - 1:
values.append(frame_checksum(bytes(values)))
if len(values) != FRAME_LENGTH:
raise argparse.ArgumentTypeError("frame must contain 5 bytes plus computed checksum or exactly 6 bytes")
return bytes(values)
def frame_checksum(data: bytes) -> int:
checksum = CHECKSUM_SEED
for value in data[: FRAME_LENGTH - 1]:
checksum ^= value
return checksum & 0xFF
def frame_checksum_ok(frame: bytes) -> bool:
return len(frame) == FRAME_LENGTH and frame_checksum(frame) == frame[-1]
def format_frame(data: bytes) -> str:
return data.hex(" ").upper()
def run_rx_probe(
frames: Iterable[bytes],
*,
rom_path: Path | None = None,
boot_steps: int = 250_000,
per_byte_steps: int = 5_000,
post_frame_steps: int = 80_000,
uart_timing: bool = False,
uart_baud: int = 38_400,
interval_steps: int = 512,
frt1_ocia_steps: int | None = None,
frt2_ocia_steps: int | None = None,
clock_hz: int = 10_000_000,
p9_fast_path: bool = True,
p9_fast_input: int = 0xFF,
p9_fast_optimistic_wrapper: bool = False,
stop_after_tx_frame: bool = True,
) -> tuple[Path, H8536Emulator, str, list[FrameResult]]:
rom_bytes, discovered_rom_path = load_rom(rom_path)
emulator = H8536Emulator(
rom_bytes,
interval_steps=interval_steps,
frt1_ocia_steps=frt1_ocia_steps,
frt2_ocia_steps=frt2_ocia_steps,
clock_hz=clock_hz,
p9_fast_path_enabled=p9_fast_path,
p9_fast_default_input_byte=p9_fast_input,
p9_fast_default_wrapper_success=p9_fast_optimistic_wrapper,
)
boot_context = RunContext()
boot_steps_used, boot_reason = _run_until(emulator, boot_steps, _rx_ready, boot_context)
boot_summary = (
f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
f"rx_serviceable={int(_rx_ready(emulator))} "
f"clock_hz={emulator.clock_hz} "
f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
)
results = [
_run_frame(
emulator,
frame,
per_byte_steps=per_byte_steps,
post_frame_steps=post_frame_steps,
uart_timing=uart_timing,
uart_baud=uart_baud,
stop_after_tx_frame=stop_after_tx_frame,
)
for frame in frames
]
return discovered_rom_path, emulator, boot_summary, results
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Inject host SCI1 frames into the H8/536 emulator and listen for ROM TX responses.")
parser.add_argument("frames", nargs="*", type=parse_frame, help="host frame hex; 5-byte inputs get a 0x5A-XOR checksum appended")
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN when present")
parser.add_argument("--preset", choices=("connect-lcd",), help="append a built-in host-frame set")
parser.add_argument("--boot-steps", type=int, default=250_000, help="maximum steps to boot until SCI1 RXI is serviceable")
parser.add_argument("--per-byte-steps", type=int, default=5_000, help="polite mode byte-consume limit, or UART mode step limit between byte arrivals")
parser.add_argument("--post-frame-steps", type=int, default=80_000, help="maximum steps after a full injected frame")
parser.add_argument("--uart-timing", action="store_true", help="inject frame bytes at real 8N1 UART inter-byte timing instead of waiting for RDRF consumption")
parser.add_argument("--uart-baud", type=parse_int, default=38_400, help="baud rate for --uart-timing; 38400 gives about 260 us per 8N1 byte")
parser.add_argument("--keep-listening", action="store_true", help="use all post-frame steps instead of stopping at the first new TX frame")
parser.add_argument("--interval-steps", type=int, default=512, help="rough step period for the scaffolded interval timer interrupt")
parser.add_argument("--clock-hz", type=parse_int, default=10_000_000, help="CPU/phi clock in Hz for calibrated FRT timing")
parser.add_argument("--frt1-ocia-steps", type=int, default=None, help="legacy step-period override for FRT1 OCIA")
parser.add_argument("--frt2-ocia-steps", type=int, default=None, help="legacy step-period override for FRT2 OCIA")
parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines")
parser.add_argument("--p9-fast-input", type=parse_int, default=0xFF, help="default byte returned by the P9 fast-path read routine")
parser.add_argument("--p9-fast-optimistic-wrapper", action="store_true", help="make P9 fast-path wrapper calls succeed when no modeled P9 response is queued")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_arg_parser().parse_args(argv)
frames = list(args.frames)
if args.preset == "connect-lcd":
frames.extend(CONNECT_LCD_FRAMES)
if not frames:
raise SystemExit("pass at least one frame or use --preset connect-lcd")
rom_path, emulator, boot_summary, results = run_rx_probe(
frames,
rom_path=args.rom,
boot_steps=args.boot_steps,
per_byte_steps=args.per_byte_steps,
post_frame_steps=args.post_frame_steps,
uart_timing=args.uart_timing,
uart_baud=args.uart_baud,
interval_steps=args.interval_steps,
frt1_ocia_steps=args.frt1_ocia_steps,
frt2_ocia_steps=args.frt2_ocia_steps,
clock_hz=args.clock_hz,
p9_fast_path=not args.no_p9_fast_path,
p9_fast_input=args.p9_fast_input,
p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper,
stop_after_tx_frame=not args.keep_listening,
)
print(f"rom={rom_path}")
print(f"reset_vector={h16(emulator.reset_vector())}")
print(boot_summary)
for index, result in enumerate(results):
for line in result.lines(index):
print(line)
print("total_tx_frames=" + " | ".join(format_frame(frame) for frame in emulator.sci1.tx_frames))
return 0
def _run_frame(
emulator: H8536Emulator,
frame: bytes,
*,
per_byte_steps: int,
post_frame_steps: int,
uart_timing: bool,
uart_baud: int,
stop_after_tx_frame: bool,
) -> FrameResult:
state_before = _state_snapshot(emulator)
log_start = len(emulator.memory.access_log)
tx_byte_start = len(emulator.sci1.tx_bytes)
tx_frame_start = len(emulator.sci1.tx_frames)
context = RunContext()
stopped_reason = "post_frame_steps"
steps_total = 0
timing = UartTiming(baud=uart_baud)
if uart_timing:
steps_total, stopped_reason = _inject_frame_uart_timed(
emulator,
frame,
timing=timing,
max_steps_per_gap=per_byte_steps,
context=context,
)
injected_all_bytes = stopped_reason == "frame_injected_uart_timing"
else:
injected_all_bytes = True
for offset, value in enumerate(frame):
emulator.inject_sci1_rx_byte(value)
steps, reason = _run_until(emulator, per_byte_steps, _rx_byte_consumed, context)
steps_total += steps
if reason != "predicate":
stopped_reason = f"rx_byte_{offset}_{reason}"
injected_all_bytes = False
break
if injected_all_bytes:
target_frame_count = tx_frame_start + 1
def post_predicate(inner: H8536Emulator) -> bool:
return stop_after_tx_frame and len(inner.sci1.tx_frames) >= target_frame_count
steps, reason = _run_until(emulator, post_frame_steps, post_predicate, context)
steps_total += steps
stopped_reason = "tx_frame" if reason == "predicate" and stop_after_tx_frame else reason
log_end = len(emulator.memory.access_log)
state_after = _state_snapshot(emulator)
return FrameResult(
input_frame=frame,
checksum_ok=frame_checksum_ok(frame),
rx_injection=timing.summary(emulator.clock_hz) if uart_timing else "polite_wait_for_rdrf_clear",
uart_byte_cycles=timing.cycles_per_character(emulator.clock_hz) if uart_timing else None,
steps=steps_total,
stopped_reason=stopped_reason,
new_tx_bytes=bytes(emulator.sci1.tx_bytes[tx_byte_start:]),
new_tx_frames=list(emulator.sci1.tx_frames[tx_frame_start:]),
state_before=state_before,
state_after=state_after,
accesses=emulator.memory.access_log[log_start:log_end],
context=context,
)
def _inject_frame_uart_timed(
emulator: H8536Emulator,
frame: bytes,
*,
timing: UartTiming,
max_steps_per_gap: int,
context: RunContext,
) -> tuple[int, str]:
steps_total = 0
start_cycles = emulator.cpu.cycles
byte_cycles = timing.cycles_per_character(emulator.clock_hz)
for offset, value in enumerate(frame):
if offset:
target_cycles = start_cycles + (offset * byte_cycles)
steps, reason = _run_until_cycle(emulator, target_cycles, max_steps_per_gap, context)
steps_total += steps
if reason != "target_cycle":
return steps_total, f"rx_byte_{offset}_{reason}"
emulator.inject_sci1_rx_byte(value)
return steps_total, "frame_injected_uart_timing"
def _run_until(
emulator: H8536Emulator,
max_steps: int,
predicate: Callable[[H8536Emulator], bool],
context: RunContext,
) -> tuple[int, str]:
for index in range(max_steps):
if predicate(emulator):
return index, "predicate"
pc = emulator.cpu.pc
context.record_pc(pc)
try:
emulator.step()
except UnsupportedInstruction as exc:
context.unsupported = str(exc)
return index, "unsupported_instruction"
return max_steps, "max_steps"
def _run_until_cycle(
emulator: H8536Emulator,
target_cycles: int,
max_steps: int,
context: RunContext,
) -> tuple[int, str]:
for index in range(max(0, max_steps)):
if emulator.cpu.cycles >= target_cycles:
return index, "target_cycle"
pc = emulator.cpu.pc
context.record_pc(pc)
try:
emulator.step()
except UnsupportedInstruction as exc:
context.unsupported = str(exc)
return index, "unsupported_instruction"
if emulator.cpu.cycles >= target_cycles:
return max(0, max_steps), "target_cycle"
return max(0, max_steps), "max_steps"
def _rx_ready(emulator: H8536Emulator) -> bool:
if not (emulator.sci1.scr & SCI_SCR_RIE and emulator.sci1.scr & SCI_SCR_RE):
return False
if emulator.vectors.get(VECTOR_SCI1_RXI) is None:
return False
return _sci1_priority(emulator) > _interrupt_mask(emulator)
def _rx_byte_consumed(emulator: H8536Emulator) -> bool:
return not (emulator.sci1.ssr & SCI_SSR_RDRF) and emulator.cpu.interrupt_depth == 0
def _sci1_priority(emulator: H8536Emulator) -> int:
return (emulator.memory.read8(IPRE) >> 4) & 0x07
def _interrupt_mask(emulator: H8536Emulator) -> int:
return (emulator.cpu.sr >> 8) & 0x07
def _state_snapshot(emulator: H8536Emulator) -> dict[str, int | str]:
snapshot: dict[str, int | str] = {}
for address, name in STATE_BYTES.items():
snapshot[name] = emulator.memory.read8(address)
for address, name in STATE_WORDS.items():
snapshot[name] = emulator.memory.read16(address)
snapshot["lcd_line_buffer_ascii"] = _ascii_window(emulator, 0xFAF0, 16)
snapshot["lcd_display_ascii"] = emulator.memory.lcd.display_text(lines=4, width=16)
snapshot["tx_frame_staging"] = format_frame(bytes(emulator.memory.read8(0xF850 + offset) for offset in range(6)))
snapshot["rx_frame_validation"] = format_frame(bytes(emulator.memory.read8(0xF860 + offset) for offset in range(6)))
return snapshot
def _ascii_window(emulator: H8536Emulator, start: int, length: int) -> str:
chars = []
for offset in range(length):
value = emulator.memory.read8(start + offset)
chars.append(chr(value) if 0x20 <= value <= 0x7E else ".")
return "".join(chars)
def _state_change_lines(before: dict[str, int | str], after: dict[str, int | str]) -> list[str]:
lines = []
for key in sorted(after):
if before.get(key) == after[key]:
continue
old = _state_value(before.get(key))
new = _state_value(after[key])
lines.append(f"{key}: {old}->{new}")
return lines
def _state_value(value: int | str | None) -> str:
if isinstance(value, int):
return f"H'{value:04X}" if value > 0xFF else f"H'{value:02X}"
return repr(value)
def _pc_hit_lines(context: RunContext) -> list[str]:
lines = [f"{name}={count}" for name, count in sorted(context.pc_hits.items())]
if context.first_pcs:
first = ", ".join(f"{h16(pc)}:{label}" for pc, label in context.first_pcs[:16])
lines.append(f"first={first}")
return lines
def _access_lines(accesses: list[MemoryAccess]) -> list[str]:
interesting = [access for access in accesses if _interesting_access(access)]
lines = []
for access in interesting[:80]:
label = _access_label(access.address)
lines.append(f"{access.kind:<5} {h16(access.address)} {access.value:02X} {label}")
if len(interesting) > 80:
lines.append(f"... {len(interesting) - 80} more interesting accesses")
return lines
def _interesting_access(access: MemoryAccess) -> bool:
if access.kind == "write":
return _access_label(access.address) != ""
return _access_label(access.address) in {"secondary_dispatch_or_table", "lcd_ports"}
def _access_label(address: int) -> str:
for start, end, label in ACCESS_RANGES:
if start <= address <= end:
return label
return ""
def _parse_byte(text: str) -> int:
token = text.strip()
if token.lower().startswith("0x"):
token = token[2:]
if token.upper().startswith("H'"):
token = token[2:]
if not token or len(token) > 2:
raise argparse.ArgumentTypeError(f"invalid byte token {text!r}")
try:
value = int(token, 16)
except ValueError as exc:
raise argparse.ArgumentTypeError(f"invalid byte token {text!r}") from exc
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"byte out of range {text!r}")
return value
__all__ = [
"CONNECT_LCD_FRAMES",
"format_frame",
"frame_checksum",
"frame_checksum_ok",
"main",
"parse_frame",
"run_rx_probe",
"UartTiming",
]