1
0

Emula;tor bench mimicing

This commit is contained in:
Aiden
2026-05-25 22:00:25 +10:00
parent 191b72d418
commit 6d4d9f0027
9 changed files with 947 additions and 1 deletions

348
h8536/bench_connect_lcd.py Normal file
View File

@@ -0,0 +1,348 @@
from __future__ import annotations
import argparse
import sys
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Iterable, TextIO
CHECKSUM_SEED = 0x5A
FRAME_LENGTH = 6
CONNECT_LCD_SEQUENCE = (
bytes.fromhex("04000040001E"),
bytes.fromhex("0400008000DE"),
bytes.fromhex("040000C0009E"),
)
COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")
@dataclass
class FrameDetector:
buffer: bytearray = field(default_factory=bytearray)
frames: list[bytes] = field(default_factory=list)
labels: Counter[str] = field(default_factory=Counter)
def feed(self, data: bytes) -> list[tuple[bytes, str]]:
self.buffer.extend(data)
detected = []
while len(self.buffer) >= FRAME_LENGTH:
frame = bytes(self.buffer[:FRAME_LENGTH])
del self.buffer[:FRAME_LENGTH]
label = label_frame(frame)
self.frames.append(frame)
if label:
self.labels[label] += 1
detected.append((frame, label))
return detected
class BenchLogger:
def __init__(self, path: Path, stdout: TextIO = sys.stdout) -> None:
self.path = path
self.stdout = stdout
self.path.parent.mkdir(parents=True, exist_ok=True)
self.file = self.path.open("w", encoding="utf-8", newline="\n")
def close(self) -> None:
self.file.close()
def emit(self, line: str = "") -> None:
print(line, file=self.stdout)
print(line, file=self.file)
self.file.flush()
def chunk(self, direction: str, data: bytes) -> None:
self.emit(f"{timestamp()} {direction:<2} {len(data):03d} bytes {format_frame(data)}")
def event(self, text: str) -> None:
self.emit(f"{timestamp()} {text}")
def frame_checksum(data: bytes) -> int:
checksum = CHECKSUM_SEED
for value in data[: FRAME_LENGTH - 1]:
checksum ^= value
return checksum & 0xFF
def frame_checksum_ok(frame: bytes) -> bool:
return len(frame) == FRAME_LENGTH and frame_checksum(frame) == frame[-1]
def parse_frame(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
parts = normalized.split()
if len(parts) == 1:
compact = parts[0]
if compact.lower().startswith("0x"):
compact = compact[2:]
if compact.upper().startswith("H'"):
compact = compact[2:]
if len(compact) % 2:
raise argparse.ArgumentTypeError("compact frame hex must contain an even number of digits")
parts = [compact[index : index + 2] for index in range(0, len(compact), 2)]
values = [_parse_byte(part) for part in parts]
if len(values) == FRAME_LENGTH - 1:
values.append(frame_checksum(bytes(values)))
if len(values) != FRAME_LENGTH:
raise argparse.ArgumentTypeError("frame must contain five bytes plus computed checksum, or exactly six bytes")
return bytes(values)
def format_frame(data: bytes) -> str:
return data.hex(" ").upper()
def label_frame(frame: bytes) -> str:
labels = {
bytes.fromhex("0000000080DA"): "heartbeat",
bytes.fromhex("02000200005A"): "connect_ok_path_response_candidate",
bytes.fromhex("010002000059"): "connect_c0_path_response_candidate",
bytes.fromhex("07804040A07D"): "visible_40A0_family_40",
bytes.fromhex("07808040A0BD"): "visible_40A0_family_80",
bytes.fromhex("0780C040A0FD"): "visible_40A0_family_C0",
bytes.fromhex("0780C060205D"): "visible_C0_6020_family_candidate",
}
label = labels.get(frame, "")
if label:
return label
if frame_checksum_ok(frame):
return "checksum_ok_unlabeled"
return "checksum_bad_or_unaligned"
def default_log_path() -> Path:
return Path("captures") / f"bench-connect-lcd-sequence-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Bench-test the emulator CONNECT LCD sequence against the real RCP over RS232."
)
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the Pico relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")
parser.add_argument("--frame-gap", type=float, default=0.150, help="seconds to listen between sent frames")
parser.add_argument("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence")
parser.add_argument("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session")
parser.add_argument("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable")
parser.add_argument("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames")
parser.add_argument("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence")
parser.add_argument("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending")
parser.add_argument("--prompt-screen", action="store_true", help="prompt for observed LCD text after the sequence")
parser.add_argument("--prompt-before-send", action="store_true", help="also prompt for LCD text before sending the sequence")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--dry-run", action="store_true", help="print the planned sequence without opening serial ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
frames = _planned_frames(args)
log_path = args.log or default_log_path()
if args.dry_run:
print(f"device={args.port} {args.baud} 8N1", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle={int(not args.no_power_cycle)} off={args.power_off_command!r} on={args.power_on_command!r}", file=stdout)
for index, frame in enumerate(frames, start=1):
print(f"frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout)
if args.command7_after:
print(f"command7_after={format_frame(COMMAND7_REPEAT_FRAME)} checksum_ok=1", file=stdout)
print(f"log={log_path}", file=stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector()
try:
logger.emit("CONNECT LCD bench sequence")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}")
logger.emit(f"log={log_path}")
for index, frame in enumerate(frames, start=1):
logger.emit(f"plan frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}")
with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device:
relay = None
try:
if not args.no_power_cycle:
relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(relay, args.relay_settle, logger)
_relay_command(relay, args.power_off_command, logger)
time.sleep(args.off_seconds)
device.reset_input_buffer()
detector = FrameDetector()
_relay_command(relay, args.power_on_command, logger)
else:
device.reset_input_buffer()
ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
logger.event("ABORT ready heartbeat threshold was not observed")
return 2
if args.prompt_before_send:
_prompt_screen("LCD after boot/ready", logger)
if args.pre_sequence_drain > 0:
logger.event(f"DRAIN before sequence {args.pre_sequence_drain:.3f}s")
_read_for(device, detector, logger, args.pre_sequence_drain)
for repeat_index in range(args.repeat):
if args.repeat > 1:
logger.event(f"BEGIN repeat {repeat_index + 1}/{args.repeat}")
for frame_index, frame in enumerate(frames, start=1):
_send_frame(device, frame, logger, f"seq{repeat_index + 1}.frame{frame_index}")
_read_for(device, detector, logger, args.frame_gap)
if args.command7_after:
_send_frame(device, COMMAND7_REPEAT_FRAME, logger, "command7_after")
_read_for(device, detector, logger, args.frame_gap)
_read_for(device, detector, logger, args.post_sequence_read)
if args.prompt_screen:
_prompt_screen("LCD after CONNECT sequence", logger)
finally:
if relay is not None:
relay.close()
_summary(detector, logger)
return 0
finally:
logger.close()
def _planned_frames(args: argparse.Namespace) -> list[bytes]:
if args.frame:
frames = list(args.frame)
elif args.two_frame:
frames = list(CONNECT_LCD_SEQUENCE[:2])
else:
frames = list(CONNECT_LCD_SEQUENCE)
if not frames:
raise SystemExit("no frames selected")
return frames
def _import_serial():
try:
import serial
except ImportError as exc: # pragma: no cover - depends on local environment.
raise SystemExit("pyserial is required; install it with: .\\.venv\\Scripts\\python.exe -m pip install pyserial") from exc
return serial
def _send_frame(device, frame: bytes, logger: BenchLogger, label: str) -> None:
device.write(frame)
device.flush()
logger.chunk("TX", frame)
logger.event(f"SENT {label} checksum_ok={int(frame_checksum_ok(frame))}")
def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: float) -> None:
deadline = time.monotonic() + max(0.0, seconds)
while time.monotonic() < deadline:
waiting = getattr(device, "in_waiting", 0)
data = device.read(waiting or 1)
if data:
logger.chunk("RX", data)
for frame, label in detector.feed(data):
logger.event(f"DETECT {label} {format_frame(frame)}")
def _wait_for_ready(
device,
detector: FrameDetector,
logger: BenchLogger,
timeout_seconds: float,
ready_heartbeats: int,
) -> bool:
logger.event(f"WAIT ready heartbeat target={ready_heartbeats} timeout={timeout_seconds:.3f}s")
start_count = detector.labels["heartbeat"]
deadline = time.monotonic() + max(0.0, timeout_seconds)
while time.monotonic() < deadline:
_read_for(device, detector, logger, 0.100)
if detector.labels["heartbeat"] - start_count >= ready_heartbeats:
logger.event(f"READY heartbeat_count={detector.labels['heartbeat']}")
return True
logger.event(f"READY_TIMEOUT heartbeat_count={detector.labels['heartbeat']}")
return False
def _relay_settle(relay, seconds: float, logger: BenchLogger) -> None:
time.sleep(max(0.0, seconds))
_read_relay_lines(relay, logger, prefix="RELAY")
def _relay_command(relay, command: str, logger: BenchLogger) -> None:
relay.write((command.strip() + "\n").encode("utf-8"))
relay.flush()
logger.event(f"RELAY_TX {command.strip()}")
_read_relay_lines(relay, logger, prefix="RELAY_RX")
def _read_relay_lines(relay, logger: BenchLogger, prefix: str) -> None:
deadline = time.monotonic() + 2.0
while time.monotonic() < deadline:
line = relay.readline()
if not line:
return
logger.event(f"{prefix} {line.decode('utf-8', errors='replace').strip()}")
def _prompt_screen(label: str, logger: BenchLogger) -> None:
note = input(f"{label}: type observed LCD text, or press Enter to skip: ").strip()
logger.event(f"SCREEN {label}: {note or '(no note)'}")
def _summary(detector: FrameDetector, logger: BenchLogger) -> None:
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}")
for label, count in sorted(detector.labels.items()):
logger.emit(f"{label}={count}")
def _parse_byte(text: str) -> int:
token = text.strip()
if token.lower().startswith("0x"):
token = token[2:]
if token.upper().startswith("H'"):
token = token[2:]
if not token or len(token) > 2:
raise argparse.ArgumentTypeError(f"invalid byte token {text!r}")
try:
value = int(token, 16)
except ValueError as exc:
raise argparse.ArgumentTypeError(f"invalid byte token {text!r}") from exc
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"byte out of range {text!r}")
return value
def timestamp() -> str:
return datetime.now().strftime("%H:%M:%S.%f")[:-3]
__all__ = [
"COMMAND7_REPEAT_FRAME",
"CONNECT_LCD_SEQUENCE",
"FrameDetector",
"build_arg_parser",
"format_frame",
"frame_checksum",
"frame_checksum_ok",
"label_frame",
"main",
"parse_frame",
]

View File

@@ -0,0 +1,451 @@
from __future__ import annotations
import argparse
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping
from ..formatting import h16
from ..bench_connect_lcd import FrameDetector, format_frame, label_frame
from .cli import load_rom
from .errors import UnsupportedInstruction
from .runner import H8536Emulator
from .rx_probe import (
RunContext,
_interrupt_mask,
_rx_byte_consumed,
_rx_ready,
_run_until,
_sci1_priority,
)
BENCH_CHUNK_RE = re.compile(
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+"
r"(?P<direction>TX|RX)\s+"
r"(?P<count>\d+)(?:\s+bytes?)?\s+"
r"(?P<byte_text>.*?)\s*$",
re.IGNORECASE,
)
SCREEN_RE = re.compile(
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+SCREEN\s+"
r"(?P<label>.*?):\s*(?P<note>.*?)\s*$",
re.IGNORECASE,
)
HEX_BYTE_RE = re.compile(r"\b[0-9A-Fa-f]{2}\b")
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
CONNECT_OK_RESPONSE = bytes.fromhex("02000200005A")
BENCH_VISIBLE_C0_6020 = bytes.fromhex("0780C060205D")
@dataclass(frozen=True)
class BenchChunk:
timestamp: str
timestamp_ms: int
direction: str
bytes: bytes
raw_line: str
@dataclass(frozen=True)
class BenchFrame:
timestamp: str
timestamp_ms: int
frame: bytes
label: str
@dataclass(frozen=True)
class ScreenNote:
timestamp: str
timestamp_ms: int
label: str
note: str
@dataclass(frozen=True)
class BenchReplayLog:
chunks: tuple[BenchChunk, ...]
tx_frames: tuple[BenchFrame, ...]
rx_frames: tuple[BenchFrame, ...]
screen_notes: tuple[ScreenNote, ...]
@property
def first_tx_ms(self) -> int | None:
return self.tx_frames[0].timestamp_ms if self.tx_frames else None
@property
def last_event_ms(self) -> int | None:
values = [chunk.timestamp_ms for chunk in self.chunks]
values.extend(note.timestamp_ms for note in self.screen_notes)
return max(values) if values else None
@dataclass(frozen=True)
class ReplayFrameResult:
host_frame: bytes
host_timestamp: str
host_delta_ms: int
steps_before: int
steps_during_rx: int
emulator_gap_frames_before: tuple[bytes, ...]
emulator_new_frames: tuple[bytes, ...]
@dataclass(frozen=True)
class BenchReplayResult:
log_path: Path
rom_path: Path
boot_summary: str
host_frames: tuple[BenchFrame, ...]
observed_device_frames: tuple[BenchFrame, ...]
replay_frame_results: tuple[ReplayFrameResult, ...]
emulator_tx_frames: tuple[bytes, ...]
emulator_lcd_display: str
emulator_lcd_line_buffer: str
parity: Mapping[str, Any]
def as_dict(self) -> dict[str, Any]:
return {
"kind": "h8536_emulator_bench_replay",
"log_path": str(self.log_path),
"rom_path": str(self.rom_path),
"boot_summary": self.boot_summary,
"host_frames": [_bench_frame_dict(frame) for frame in self.host_frames],
"observed_device_frames": [_bench_frame_dict(frame) for frame in self.observed_device_frames],
"replay_frame_results": [
{
"host_timestamp": item.host_timestamp,
"host_delta_ms": item.host_delta_ms,
"host_frame": format_frame(item.host_frame),
"steps_before": item.steps_before,
"steps_during_rx": item.steps_during_rx,
"emulator_gap_frames_before": [format_frame(frame) for frame in item.emulator_gap_frames_before],
"emulator_new_frames": [format_frame(frame) for frame in item.emulator_new_frames],
}
for item in self.replay_frame_results
],
"emulator_tx_frames": [format_frame(frame) for frame in self.emulator_tx_frames],
"emulator_lcd_display": self.emulator_lcd_display,
"emulator_lcd_line_buffer": self.emulator_lcd_line_buffer,
"parity": dict(self.parity),
}
def text_lines(self) -> list[str]:
lines = [
f"log={self.log_path}",
f"rom={self.rom_path}",
self.boot_summary,
"bench_host_frames=" + " | ".join(format_frame(frame.frame) for frame in self.host_frames),
"bench_device_nonheartbeat_frames="
+ _format_frame_list(frame.frame for frame in self.observed_device_frames if frame.frame != HEARTBEAT_FRAME),
"emulator_tx_frames=" + _format_frame_list(self.emulator_tx_frames),
f"emulator_lcd_display={self.emulator_lcd_display!r}",
f"emulator_lcd_line_buffer={self.emulator_lcd_line_buffer!r}",
"replay_frames:",
]
for index, item in enumerate(self.replay_frame_results):
lines.append(
(
f" [{index}] {item.host_timestamp} delta={item.host_delta_ms}ms "
f"steps_before={item.steps_before} steps_rx={item.steps_during_rx} "
f"host={format_frame(item.host_frame)} "
f"gap_emu={_format_frame_list(item.emulator_gap_frames_before)} "
f"emu_new={_format_frame_list(item.emulator_new_frames)}"
)
)
lines.append("parity:")
for key, value in self.parity.items():
lines.append(f" {key}={value}")
return lines
@dataclass(frozen=True)
class ReplayConfig:
boot_steps: int = 250_000
per_byte_steps: int = 5_000
steps_per_second: int = 65_000
post_log_steps: int = 50_000
interval_steps: int = 512
frt1_ocia_steps: int = 512
frt2_ocia_steps: int = 512
p9_fast_path: bool = True
p9_fast_input: int = 0xFF
def parse_bench_replay_log_text(text: str) -> BenchReplayLog:
chunks: list[BenchChunk] = []
tx_frames: list[BenchFrame] = []
rx_frames: list[BenchFrame] = []
screen_notes: list[ScreenNote] = []
rx_detector = FrameDetector()
for raw_line in text.splitlines():
screen_match = SCREEN_RE.match(raw_line)
if screen_match:
screen_notes.append(
ScreenNote(
timestamp=screen_match.group("time"),
timestamp_ms=_timestamp_ms(screen_match.group("time")),
label=screen_match.group("label").strip(),
note=screen_match.group("note").strip(),
)
)
continue
match = BENCH_CHUNK_RE.match(raw_line)
if not match:
continue
timestamp = match.group("time")
timestamp_ms = _timestamp_ms(timestamp)
direction = match.group("direction").upper()
data = bytes(int(token, 16) for token in HEX_BYTE_RE.findall(match.group("byte_text")))
declared = int(match.group("count"))
if len(data) != declared:
# Keep the bytes we could parse; the raw line remains available.
pass
chunk = BenchChunk(timestamp, timestamp_ms, direction, data, raw_line)
chunks.append(chunk)
if direction == "TX":
for frame_offset in range(0, len(data), 6):
frame = data[frame_offset : frame_offset + 6]
if len(frame) == 6:
tx_frames.append(BenchFrame(timestamp, timestamp_ms, frame, label_frame(frame)))
else:
for frame, label in rx_detector.feed(data):
rx_frames.append(BenchFrame(timestamp, timestamp_ms, frame, label))
return BenchReplayLog(tuple(chunks), tuple(tx_frames), tuple(rx_frames), tuple(screen_notes))
def parse_bench_replay_log(path: Path) -> BenchReplayLog:
return parse_bench_replay_log_text(path.read_text(encoding="utf-8"))
def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: ReplayConfig = ReplayConfig()) -> BenchReplayResult:
bench_log = parse_bench_replay_log(log_path)
rom_bytes, discovered_rom_path = load_rom(rom_path)
emulator = H8536Emulator(
rom_bytes,
interval_steps=config.interval_steps,
frt1_ocia_steps=config.frt1_ocia_steps,
frt2_ocia_steps=config.frt2_ocia_steps,
p9_fast_path_enabled=config.p9_fast_path,
p9_fast_default_input_byte=config.p9_fast_input,
)
context = RunContext()
boot_steps_used, boot_reason = _run_until(emulator, config.boot_steps, _rx_ready, context)
boot_summary = (
f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
f"rx_serviceable={int(_rx_ready(emulator))} "
f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} "
f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
)
replay_results: list[ReplayFrameResult] = []
previous_tx_ms: int | None = None
for host in bench_log.tx_frames:
delta_ms = 0 if previous_tx_ms is None else max(0, host.timestamp_ms - previous_tx_ms)
tx_frame_start_before_delay = len(emulator.sci1.tx_frames)
steps_before = _run_steps_for_ms(emulator, delta_ms, config.steps_per_second, context)
gap_frames = tuple(emulator.sci1.tx_frames[tx_frame_start_before_delay:])
tx_frame_start = len(emulator.sci1.tx_frames)
steps_during_rx = _inject_host_frame(emulator, host.frame, config.per_byte_steps, context)
replay_results.append(
ReplayFrameResult(
host_frame=host.frame,
host_timestamp=host.timestamp,
host_delta_ms=delta_ms,
steps_before=steps_before,
steps_during_rx=steps_during_rx,
emulator_gap_frames_before=gap_frames,
emulator_new_frames=tuple(emulator.sci1.tx_frames[tx_frame_start:]),
)
)
previous_tx_ms = host.timestamp_ms
_run_steps(emulator, config.post_log_steps, context)
emulator_lcd_display = emulator.memory.lcd.display_text(lines=4, width=16)
emulator_lcd_line_buffer = _ascii_window(emulator, 0xFAF0, 16)
parity = assess_bench_parity(
bench_log,
emulator_tx_frames=emulator.sci1.tx_frames,
emulator_lcd_display=emulator_lcd_display,
emulator_lcd_line_buffer=emulator_lcd_line_buffer,
)
return BenchReplayResult(
log_path=log_path,
rom_path=discovered_rom_path,
boot_summary=boot_summary,
host_frames=bench_log.tx_frames,
observed_device_frames=bench_log.rx_frames,
replay_frame_results=tuple(replay_results),
emulator_tx_frames=tuple(emulator.sci1.tx_frames),
emulator_lcd_display=emulator_lcd_display,
emulator_lcd_line_buffer=emulator_lcd_line_buffer,
parity=parity,
)
def assess_bench_parity(
bench_log: BenchReplayLog,
*,
emulator_tx_frames: Iterable[bytes],
emulator_lcd_display: str,
emulator_lcd_line_buffer: str,
) -> dict[str, Any]:
emulator_frames = list(emulator_tx_frames)
bench_screen_text = " | ".join(note.note for note in bench_log.screen_notes)
bench_reached_ok = _looks_like_connect_ok(bench_screen_text)
emulator_reached_ok = _looks_like_connect_ok(emulator_lcd_display) or _looks_like_connect_ok(emulator_lcd_line_buffer)
bench_nonheartbeat = [frame.frame for frame in bench_log.rx_frames if frame.frame != HEARTBEAT_FRAME]
emulator_nonheartbeat = [frame for frame in emulator_frames if frame != HEARTBEAT_FRAME]
bench_visible_c0_6020 = BENCH_VISIBLE_C0_6020 in bench_nonheartbeat
emulator_visible_c0_6020 = BENCH_VISIBLE_C0_6020 in emulator_nonheartbeat
emulator_connect_ok_response = CONNECT_OK_RESPONSE in emulator_nonheartbeat
mismatch_reasons: list[str] = []
if bench_reached_ok != emulator_reached_ok:
mismatch_reasons.append("lcd_connect_state")
if bench_visible_c0_6020 and not emulator_visible_c0_6020:
mismatch_reasons.append("missing_visible_C0_6020_response")
if not bench_reached_ok and emulator_connect_ok_response:
mismatch_reasons.append("emulator_emitted_connect_ok_response")
return {
"bench_reached_connect_ok": bench_reached_ok,
"emulator_reached_connect_ok": emulator_reached_ok,
"bench_visible_C0_6020": bench_visible_c0_6020,
"emulator_visible_C0_6020": emulator_visible_c0_6020,
"emulator_connect_ok_response": emulator_connect_ok_response,
"bench_nonheartbeat_frames": [format_frame(frame) for frame in bench_nonheartbeat],
"emulator_nonheartbeat_frames": [format_frame(frame) for frame in emulator_nonheartbeat],
"matched": not mismatch_reasons,
"mismatch_reasons": mismatch_reasons,
}
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Replay a real bench serial log into the H8/536 ROM emulator.")
parser.add_argument("log", type=Path, help="bench log produced by scripts/bench_connect_lcd_sequence.py")
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN")
parser.add_argument("--boot-steps", type=int, default=ReplayConfig.boot_steps)
parser.add_argument("--per-byte-steps", type=int, default=ReplayConfig.per_byte_steps)
parser.add_argument("--steps-per-second", type=int, default=ReplayConfig.steps_per_second)
parser.add_argument("--post-log-steps", type=int, default=ReplayConfig.post_log_steps)
parser.add_argument("--interval-steps", type=int, default=ReplayConfig.interval_steps)
parser.add_argument("--frt1-ocia-steps", type=int, default=ReplayConfig.frt1_ocia_steps)
parser.add_argument("--frt2-ocia-steps", type=int, default=ReplayConfig.frt2_ocia_steps)
parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines")
parser.add_argument("--p9-fast-input", type=lambda text: int(text, 0), default=ReplayConfig.p9_fast_input)
parser.add_argument("--assert-bench-parity", action="store_true", help="exit nonzero if emulator behavior diverges from the bench log")
parser.add_argument("--json", action="store_true", help="emit JSON")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_arg_parser().parse_args(argv)
result = run_bench_replay(
args.log,
rom_path=args.rom,
config=ReplayConfig(
boot_steps=args.boot_steps,
per_byte_steps=args.per_byte_steps,
steps_per_second=args.steps_per_second,
post_log_steps=args.post_log_steps,
interval_steps=args.interval_steps,
frt1_ocia_steps=args.frt1_ocia_steps,
frt2_ocia_steps=args.frt2_ocia_steps,
p9_fast_path=not args.no_p9_fast_path,
p9_fast_input=args.p9_fast_input,
),
)
if args.json:
print(json.dumps(result.as_dict(), indent=2))
else:
for line in result.text_lines():
print(line)
if args.assert_bench_parity and not result.parity.get("matched"):
return 1
return 0
def _inject_host_frame(emulator: H8536Emulator, frame: bytes, per_byte_steps: int, context: RunContext) -> int:
steps_total = 0
for value in frame:
emulator.inject_sci1_rx_byte(value)
steps, _reason = _run_until(emulator, per_byte_steps, _rx_byte_consumed, context)
steps_total += steps
return steps_total
def _run_steps_for_ms(emulator: H8536Emulator, delta_ms: int, steps_per_second: int, context: RunContext) -> int:
steps = int((max(0, delta_ms) * max(1, steps_per_second)) / 1000)
return _run_steps(emulator, steps, context)
def _run_steps(emulator: H8536Emulator, steps: int, context: RunContext) -> int:
completed = 0
for _ in range(max(0, steps)):
context.record_pc(emulator.cpu.pc)
try:
emulator.step()
except UnsupportedInstruction as exc:
context.unsupported = str(exc)
break
completed += 1
return completed
def _ascii_window(emulator: H8536Emulator, start: int, length: int) -> str:
chars = []
for offset in range(length):
value = emulator.memory.read8(start + offset)
chars.append(chr(value) if 0x20 <= value <= 0x7E else ".")
return "".join(chars)
def _looks_like_connect_ok(text: str) -> bool:
normalized = " ".join(text.upper().replace(":", " ").split())
return "CONNECT" in normalized and "OK" in normalized and "NOT ACT" not in normalized
def _timestamp_ms(text: str) -> int:
hours, minutes, rest = text.split(":")
if "." in rest:
seconds, fraction = rest.split(".", 1)
else:
seconds, fraction = rest, "0"
fraction = (fraction + "000")[:3]
return ((int(hours) * 60 + int(minutes)) * 60 + int(seconds)) * 1000 + int(fraction)
def _bench_frame_dict(frame: BenchFrame) -> dict[str, Any]:
return {
"timestamp": frame.timestamp,
"timestamp_ms": frame.timestamp_ms,
"frame": format_frame(frame.frame),
"label": frame.label,
}
def _format_frame_list(frames: Iterable[bytes]) -> str:
items = [format_frame(frame) for frame in frames]
return " | ".join(items) if items else "none"
__all__ = [
"BENCH_VISIBLE_C0_6020",
"BenchReplayLog",
"BenchReplayResult",
"ReplayConfig",
"assess_bench_parity",
"main",
"parse_bench_replay_log",
"parse_bench_replay_log_text",
"run_bench_replay",
]