1
0
Files
h8-536-decoder/h8536/emulator/bench_replay.py
2026-05-26 15:21:52 +10:00

510 lines
20 KiB
Python

from __future__ import annotations
import argparse
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping
from ..formatting import h16
from ..bench_connect_lcd import FrameDetector, format_frame, label_frame
from .cli import load_rom
from .errors import UnsupportedInstruction
from .runner import H8536Emulator
from .rx_probe import (
RunContext,
UartTiming,
_inject_frame_uart_timed,
_interrupt_mask,
_rx_byte_consumed,
_rx_ready,
_run_until,
_sci1_priority,
)
BENCH_CHUNK_RE = re.compile(
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+"
r"(?P<direction>TX|RX)\s+"
r"(?P<count>\d+)(?:\s+bytes?)?\s+"
r"(?P<byte_text>.*?)\s*$",
re.IGNORECASE,
)
SCREEN_RE = re.compile(
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+SCREEN\s+"
r"(?P<label>.*?):\s*(?P<note>.*?)\s*$",
re.IGNORECASE,
)
HEX_BYTE_RE = re.compile(r"\b[0-9A-Fa-f]{2}\b")
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
CONNECT_OK_RESPONSE = bytes.fromhex("02000200005A")
BENCH_VISIBLE_C0_6020 = bytes.fromhex("0780C060205D")
@dataclass(frozen=True)
class BenchChunk:
timestamp: str
timestamp_ms: int
direction: str
bytes: bytes
raw_line: str
@dataclass(frozen=True)
class BenchFrame:
timestamp: str
timestamp_ms: int
frame: bytes
label: str
@dataclass(frozen=True)
class ScreenNote:
timestamp: str
timestamp_ms: int
label: str
note: str
@dataclass(frozen=True)
class BenchReplayLog:
chunks: tuple[BenchChunk, ...]
tx_frames: tuple[BenchFrame, ...]
rx_frames: tuple[BenchFrame, ...]
screen_notes: tuple[ScreenNote, ...]
@property
def first_tx_ms(self) -> int | None:
return self.tx_frames[0].timestamp_ms if self.tx_frames else None
@property
def last_event_ms(self) -> int | None:
values = [chunk.timestamp_ms for chunk in self.chunks]
values.extend(note.timestamp_ms for note in self.screen_notes)
return max(values) if values else None
@dataclass(frozen=True)
class ReplayFrameResult:
host_frame: bytes
host_timestamp: str
host_delta_ms: int
rx_injection: str
steps_before: int
steps_during_rx: int
emulator_gap_frames_before: tuple[bytes, ...]
emulator_new_frames: tuple[bytes, ...]
@dataclass(frozen=True)
class BenchReplayResult:
log_path: Path
rom_path: Path
boot_summary: str
host_frames: tuple[BenchFrame, ...]
observed_device_frames: tuple[BenchFrame, ...]
replay_frame_results: tuple[ReplayFrameResult, ...]
emulator_tx_frames: tuple[bytes, ...]
emulator_lcd_display: str
emulator_lcd_line_buffer: str
parity: Mapping[str, Any]
def as_dict(self) -> dict[str, Any]:
return {
"kind": "h8536_emulator_bench_replay",
"log_path": str(self.log_path),
"rom_path": str(self.rom_path),
"boot_summary": self.boot_summary,
"host_frames": [_bench_frame_dict(frame) for frame in self.host_frames],
"observed_device_frames": [_bench_frame_dict(frame) for frame in self.observed_device_frames],
"replay_frame_results": [
{
"host_timestamp": item.host_timestamp,
"host_delta_ms": item.host_delta_ms,
"host_frame": format_frame(item.host_frame),
"rx_injection": item.rx_injection,
"steps_before": item.steps_before,
"steps_during_rx": item.steps_during_rx,
"emulator_gap_frames_before": [format_frame(frame) for frame in item.emulator_gap_frames_before],
"emulator_new_frames": [format_frame(frame) for frame in item.emulator_new_frames],
}
for item in self.replay_frame_results
],
"emulator_tx_frames": [format_frame(frame) for frame in self.emulator_tx_frames],
"emulator_lcd_display": self.emulator_lcd_display,
"emulator_lcd_line_buffer": self.emulator_lcd_line_buffer,
"parity": dict(self.parity),
}
def text_lines(self) -> list[str]:
lines = [
f"log={self.log_path}",
f"rom={self.rom_path}",
self.boot_summary,
"bench_host_frames=" + " | ".join(format_frame(frame.frame) for frame in self.host_frames),
"bench_device_nonheartbeat_frames="
+ _format_frame_list(frame.frame for frame in self.observed_device_frames if frame.frame != HEARTBEAT_FRAME),
"emulator_tx_frames=" + _format_frame_list(self.emulator_tx_frames),
f"emulator_lcd_display={self.emulator_lcd_display!r}",
f"emulator_lcd_line_buffer={self.emulator_lcd_line_buffer!r}",
"replay_frames:",
]
for index, item in enumerate(self.replay_frame_results):
lines.append(
(
f" [{index}] {item.host_timestamp} delta={item.host_delta_ms}ms "
f"rx_injection={item.rx_injection} "
f"steps_before={item.steps_before} steps_rx={item.steps_during_rx} "
f"host={format_frame(item.host_frame)} "
f"gap_emu={_format_frame_list(item.emulator_gap_frames_before)} "
f"emu_new={_format_frame_list(item.emulator_new_frames)}"
)
)
lines.append("parity:")
for key, value in self.parity.items():
lines.append(f" {key}={value}")
return lines
@dataclass(frozen=True)
class ReplayConfig:
boot_steps: int = 250_000
per_byte_steps: int = 5_000
post_log_steps: int = 50_000
interval_steps: int = 512
frt1_ocia_steps: int | None = None
frt2_ocia_steps: int | None = None
clock_hz: int = 10_000_000
uart_timing: bool = True
uart_baud: int = 38_400
uart_format: str = "8E1"
tx_wire_timing: bool = True
p9_fast_path: bool = True
p9_fast_input: int = 0xFF
p9_fast_optimistic_wrapper: bool = False
p7_input: int = 0xFF
eeprom_seed: str = "blank"
def parse_bench_replay_log_text(text: str) -> BenchReplayLog:
chunks: list[BenchChunk] = []
tx_frames: list[BenchFrame] = []
rx_frames: list[BenchFrame] = []
screen_notes: list[ScreenNote] = []
rx_detector = FrameDetector()
for raw_line in text.splitlines():
screen_match = SCREEN_RE.match(raw_line)
if screen_match:
screen_notes.append(
ScreenNote(
timestamp=screen_match.group("time"),
timestamp_ms=_timestamp_ms(screen_match.group("time")),
label=screen_match.group("label").strip(),
note=screen_match.group("note").strip(),
)
)
continue
match = BENCH_CHUNK_RE.match(raw_line)
if not match:
continue
timestamp = match.group("time")
timestamp_ms = _timestamp_ms(timestamp)
direction = match.group("direction").upper()
data = bytes(int(token, 16) for token in HEX_BYTE_RE.findall(match.group("byte_text")))
declared = int(match.group("count"))
if len(data) != declared:
# Keep the bytes we could parse; the raw line remains available.
pass
chunk = BenchChunk(timestamp, timestamp_ms, direction, data, raw_line)
chunks.append(chunk)
if direction == "TX":
for frame_offset in range(0, len(data), 6):
frame = data[frame_offset : frame_offset + 6]
if len(frame) == 6:
tx_frames.append(BenchFrame(timestamp, timestamp_ms, frame, label_frame(frame)))
else:
for frame, label in rx_detector.feed(data):
rx_frames.append(BenchFrame(timestamp, timestamp_ms, frame, label))
return BenchReplayLog(tuple(chunks), tuple(tx_frames), tuple(rx_frames), tuple(screen_notes))
def parse_bench_replay_log(path: Path) -> BenchReplayLog:
return parse_bench_replay_log_text(path.read_text(encoding="utf-8"))
def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: ReplayConfig = ReplayConfig()) -> BenchReplayResult:
bench_log = parse_bench_replay_log(log_path)
rom_bytes, discovered_rom_path = load_rom(rom_path)
emulator = H8536Emulator(
rom_bytes,
interval_steps=config.interval_steps,
frt1_ocia_steps=config.frt1_ocia_steps,
frt2_ocia_steps=config.frt2_ocia_steps,
clock_hz=config.clock_hz,
p9_fast_path_enabled=config.p9_fast_path,
p9_fast_default_input_byte=config.p9_fast_input,
p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper,
p7_input=config.p7_input,
eeprom_seed=config.eeprom_seed,
sci1_tx_timing=UartTiming.from_format(config.uart_format, baud=config.uart_baud)
if config.tx_wire_timing
else None,
)
context = RunContext()
boot_steps_used, boot_reason = _run_until(emulator, config.boot_steps, _rx_ready, context)
boot_summary = (
f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
f"rx_serviceable={int(_rx_ready(emulator))} "
f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} "
f"clock_hz={emulator.clock_hz} "
f"uart_format={config.uart_format.upper()} tx_wire_timing={int(config.tx_wire_timing)} "
f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
)
replay_results: list[ReplayFrameResult] = []
previous_tx_ms: int | None = None
for host in bench_log.tx_frames:
delta_ms = 0 if previous_tx_ms is None else max(0, host.timestamp_ms - previous_tx_ms)
tx_frame_start_before_delay = len(emulator.sci1.tx_frames)
steps_before = _run_cycles_for_ms(emulator, delta_ms, config.clock_hz, context)
gap_frames = tuple(emulator.sci1.tx_frames[tx_frame_start_before_delay:])
tx_frame_start = len(emulator.sci1.tx_frames)
if config.uart_timing:
timing = UartTiming.from_format(config.uart_format, baud=config.uart_baud)
steps_during_rx, inject_reason = _inject_frame_uart_timed(
emulator,
host.frame,
timing=timing,
max_steps_per_gap=config.per_byte_steps,
context=context,
)
rx_injection = f"{timing.summary(emulator.clock_hz)} reason={inject_reason}"
else:
steps_during_rx = _inject_host_frame(emulator, host.frame, config.per_byte_steps, context)
rx_injection = "polite_wait_for_rdrf_clear"
replay_results.append(
ReplayFrameResult(
host_frame=host.frame,
host_timestamp=host.timestamp,
host_delta_ms=delta_ms,
rx_injection=rx_injection,
steps_before=steps_before,
steps_during_rx=steps_during_rx,
emulator_gap_frames_before=gap_frames,
emulator_new_frames=tuple(emulator.sci1.tx_frames[tx_frame_start:]),
)
)
previous_tx_ms = host.timestamp_ms
_run_steps(emulator, config.post_log_steps, context)
emulator_lcd_display = emulator.memory.lcd.display_text(lines=4, width=16)
emulator_lcd_line_buffer = _ascii_window(emulator, 0xFAF0, 16)
parity = assess_bench_parity(
bench_log,
emulator_tx_frames=emulator.sci1.tx_frames,
emulator_lcd_display=emulator_lcd_display,
emulator_lcd_line_buffer=emulator_lcd_line_buffer,
)
return BenchReplayResult(
log_path=log_path,
rom_path=discovered_rom_path,
boot_summary=boot_summary,
host_frames=bench_log.tx_frames,
observed_device_frames=bench_log.rx_frames,
replay_frame_results=tuple(replay_results),
emulator_tx_frames=tuple(emulator.sci1.tx_frames),
emulator_lcd_display=emulator_lcd_display,
emulator_lcd_line_buffer=emulator_lcd_line_buffer,
parity=parity,
)
def assess_bench_parity(
bench_log: BenchReplayLog,
*,
emulator_tx_frames: Iterable[bytes],
emulator_lcd_display: str,
emulator_lcd_line_buffer: str,
) -> dict[str, Any]:
emulator_frames = list(emulator_tx_frames)
bench_screen_text = " | ".join(note.note for note in bench_log.screen_notes)
bench_reached_ok = _looks_like_connect_ok(bench_screen_text)
emulator_reached_ok = _looks_like_connect_ok(emulator_lcd_display) or _looks_like_connect_ok(emulator_lcd_line_buffer)
bench_nonheartbeat = [frame.frame for frame in bench_log.rx_frames if frame.frame != HEARTBEAT_FRAME]
emulator_nonheartbeat = [frame for frame in emulator_frames if frame != HEARTBEAT_FRAME]
bench_visible_c0_6020 = BENCH_VISIBLE_C0_6020 in bench_nonheartbeat
emulator_visible_c0_6020 = BENCH_VISIBLE_C0_6020 in emulator_nonheartbeat
emulator_connect_ok_response = CONNECT_OK_RESPONSE in emulator_nonheartbeat
mismatch_reasons: list[str] = []
if bench_reached_ok != emulator_reached_ok:
mismatch_reasons.append("lcd_connect_state")
if bench_visible_c0_6020 and not emulator_visible_c0_6020:
mismatch_reasons.append("missing_visible_C0_6020_response")
if not bench_reached_ok and emulator_connect_ok_response:
mismatch_reasons.append("emulator_emitted_connect_ok_response")
return {
"bench_reached_connect_ok": bench_reached_ok,
"emulator_reached_connect_ok": emulator_reached_ok,
"bench_visible_C0_6020": bench_visible_c0_6020,
"emulator_visible_C0_6020": emulator_visible_c0_6020,
"emulator_connect_ok_response": emulator_connect_ok_response,
"bench_nonheartbeat_frames": [format_frame(frame) for frame in bench_nonheartbeat],
"emulator_nonheartbeat_frames": [format_frame(frame) for frame in emulator_nonheartbeat],
"matched": not mismatch_reasons,
"mismatch_reasons": mismatch_reasons,
}
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Replay a real bench serial log into the H8/536 ROM emulator.")
parser.add_argument("log", type=Path, help="bench log produced by scripts/bench_connect_lcd_sequence.py")
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN")
parser.add_argument("--boot-steps", type=int, default=ReplayConfig.boot_steps)
parser.add_argument("--per-byte-steps", type=int, default=ReplayConfig.per_byte_steps, help="UART mode step limit between byte arrivals, or polite mode byte-consume limit")
parser.add_argument("--post-log-steps", type=int, default=ReplayConfig.post_log_steps)
parser.add_argument("--interval-steps", type=int, default=ReplayConfig.interval_steps)
parser.add_argument("--clock-hz", type=lambda text: int(text, 0), default=ReplayConfig.clock_hz)
parser.add_argument("--uart-baud", type=lambda text: int(text, 0), default=ReplayConfig.uart_baud, help="baud rate for bench-style UART injection")
parser.add_argument("--uart-format", default=ReplayConfig.uart_format, help="UART character format for bench-style timing; real RCP link is 8E1")
parser.add_argument("--polite-rx", action="store_true", help="wait for each RX byte to be consumed before injecting the next byte")
parser.add_argument("--no-tx-wire-timing", action="store_true", help="use the legacy tiny TDRE delay instead of modeled UART TX character time")
parser.add_argument("--frt1-ocia-steps", type=int, default=ReplayConfig.frt1_ocia_steps)
parser.add_argument("--frt2-ocia-steps", type=int, default=ReplayConfig.frt2_ocia_steps)
parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines")
parser.add_argument("--p9-fast-input", type=lambda text: int(text, 0), default=ReplayConfig.p9_fast_input)
parser.add_argument("--p9-fast-optimistic-wrapper", action="store_true", help="legacy fallback for older wrapper experiments; known BFE0/BFFE wrappers use the X24164 model")
parser.add_argument("--p7-input", type=lambda text: int(text, 0), default=ReplayConfig.p7_input, help="external P7 pin state for input bits; DIP-off board default is 0xFF")
parser.add_argument("--eeprom-seed", choices=("blank", "factory"), default=ReplayConfig.eeprom_seed, help="initial X24164/shadow state before reset")
parser.add_argument("--assert-bench-parity", action="store_true", help="exit nonzero if emulator behavior diverges from the bench log")
parser.add_argument("--json", action="store_true", help="emit JSON")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_arg_parser().parse_args(argv)
result = run_bench_replay(
args.log,
rom_path=args.rom,
config=ReplayConfig(
boot_steps=args.boot_steps,
per_byte_steps=args.per_byte_steps,
post_log_steps=args.post_log_steps,
interval_steps=args.interval_steps,
frt1_ocia_steps=args.frt1_ocia_steps,
frt2_ocia_steps=args.frt2_ocia_steps,
clock_hz=args.clock_hz,
uart_timing=not args.polite_rx,
uart_baud=args.uart_baud,
uart_format=args.uart_format,
tx_wire_timing=not args.no_tx_wire_timing,
p9_fast_path=not args.no_p9_fast_path,
p9_fast_input=args.p9_fast_input,
p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper,
p7_input=args.p7_input,
eeprom_seed=args.eeprom_seed,
),
)
if args.json:
print(json.dumps(result.as_dict(), indent=2))
else:
for line in result.text_lines():
print(line)
if args.assert_bench_parity and not result.parity.get("matched"):
return 1
return 0
def _inject_host_frame(emulator: H8536Emulator, frame: bytes, per_byte_steps: int, context: RunContext) -> int:
steps_total = 0
for value in frame:
emulator.inject_sci1_rx_byte(value)
steps, _reason = _run_until(emulator, per_byte_steps, _rx_byte_consumed, context)
steps_total += steps
return steps_total
def _run_cycles_for_ms(emulator: H8536Emulator, delta_ms: int, clock_hz: int, context: RunContext) -> int:
target_delta_cycles = int((max(0, delta_ms) * max(1, clock_hz)) / 1000)
target_cycles = emulator.cpu.cycles + target_delta_cycles
completed = 0
while emulator.cpu.cycles < target_cycles:
context.record_pc(emulator.cpu.pc)
try:
emulator.step()
except UnsupportedInstruction as exc:
context.unsupported = str(exc)
break
completed += 1
return completed
def _run_steps(emulator: H8536Emulator, steps: int, context: RunContext) -> int:
completed = 0
for _ in range(max(0, steps)):
context.record_pc(emulator.cpu.pc)
try:
emulator.step()
except UnsupportedInstruction as exc:
context.unsupported = str(exc)
break
completed += 1
return completed
def _ascii_window(emulator: H8536Emulator, start: int, length: int) -> str:
chars = []
for offset in range(length):
value = emulator.memory.read8(start + offset)
chars.append(chr(value) if 0x20 <= value <= 0x7E else ".")
return "".join(chars)
def _looks_like_connect_ok(text: str) -> bool:
normalized = " ".join(text.upper().replace(":", " ").split())
return "CONNECT" in normalized and "OK" in normalized and "NOT ACT" not in normalized
def _timestamp_ms(text: str) -> int:
hours, minutes, rest = text.split(":")
if "." in rest:
seconds, fraction = rest.split(".", 1)
else:
seconds, fraction = rest, "0"
fraction = (fraction + "000")[:3]
return ((int(hours) * 60 + int(minutes)) * 60 + int(seconds)) * 1000 + int(fraction)
def _bench_frame_dict(frame: BenchFrame) -> dict[str, Any]:
return {
"timestamp": frame.timestamp,
"timestamp_ms": frame.timestamp_ms,
"frame": format_frame(frame.frame),
"label": frame.label,
}
def _format_frame_list(frames: Iterable[bytes]) -> str:
items = [format_frame(frame) for frame in frames]
return " | ".join(items) if items else "none"
__all__ = [
"BENCH_VISIBLE_C0_6020",
"BenchReplayLog",
"BenchReplayResult",
"ReplayConfig",
"assess_bench_parity",
"main",
"parse_bench_replay_log",
"parse_bench_replay_log_text",
"run_bench_replay",
]