464 lines
18 KiB
Python
464 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Iterable, Mapping
|
|
|
|
from ..formatting import h16
|
|
from ..bench_connect_lcd import FrameDetector, format_frame, label_frame
|
|
from .cli import load_rom
|
|
from .errors import UnsupportedInstruction
|
|
from .runner import H8536Emulator
|
|
from .rx_probe import (
|
|
RunContext,
|
|
_interrupt_mask,
|
|
_rx_byte_consumed,
|
|
_rx_ready,
|
|
_run_until,
|
|
_sci1_priority,
|
|
)
|
|
|
|
|
|
BENCH_CHUNK_RE = re.compile(
|
|
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+"
|
|
r"(?P<direction>TX|RX)\s+"
|
|
r"(?P<count>\d+)(?:\s+bytes?)?\s+"
|
|
r"(?P<byte_text>.*?)\s*$",
|
|
re.IGNORECASE,
|
|
)
|
|
SCREEN_RE = re.compile(
|
|
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+SCREEN\s+"
|
|
r"(?P<label>.*?):\s*(?P<note>.*?)\s*$",
|
|
re.IGNORECASE,
|
|
)
|
|
HEX_BYTE_RE = re.compile(r"\b[0-9A-Fa-f]{2}\b")
|
|
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
|
|
CONNECT_OK_RESPONSE = bytes.fromhex("02000200005A")
|
|
BENCH_VISIBLE_C0_6020 = bytes.fromhex("0780C060205D")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BenchChunk:
|
|
timestamp: str
|
|
timestamp_ms: int
|
|
direction: str
|
|
bytes: bytes
|
|
raw_line: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BenchFrame:
|
|
timestamp: str
|
|
timestamp_ms: int
|
|
frame: bytes
|
|
label: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ScreenNote:
|
|
timestamp: str
|
|
timestamp_ms: int
|
|
label: str
|
|
note: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BenchReplayLog:
|
|
chunks: tuple[BenchChunk, ...]
|
|
tx_frames: tuple[BenchFrame, ...]
|
|
rx_frames: tuple[BenchFrame, ...]
|
|
screen_notes: tuple[ScreenNote, ...]
|
|
|
|
@property
|
|
def first_tx_ms(self) -> int | None:
|
|
return self.tx_frames[0].timestamp_ms if self.tx_frames else None
|
|
|
|
@property
|
|
def last_event_ms(self) -> int | None:
|
|
values = [chunk.timestamp_ms for chunk in self.chunks]
|
|
values.extend(note.timestamp_ms for note in self.screen_notes)
|
|
return max(values) if values else None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ReplayFrameResult:
|
|
host_frame: bytes
|
|
host_timestamp: str
|
|
host_delta_ms: int
|
|
steps_before: int
|
|
steps_during_rx: int
|
|
emulator_gap_frames_before: tuple[bytes, ...]
|
|
emulator_new_frames: tuple[bytes, ...]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BenchReplayResult:
|
|
log_path: Path
|
|
rom_path: Path
|
|
boot_summary: str
|
|
host_frames: tuple[BenchFrame, ...]
|
|
observed_device_frames: tuple[BenchFrame, ...]
|
|
replay_frame_results: tuple[ReplayFrameResult, ...]
|
|
emulator_tx_frames: tuple[bytes, ...]
|
|
emulator_lcd_display: str
|
|
emulator_lcd_line_buffer: str
|
|
parity: Mapping[str, Any]
|
|
|
|
def as_dict(self) -> dict[str, Any]:
|
|
return {
|
|
"kind": "h8536_emulator_bench_replay",
|
|
"log_path": str(self.log_path),
|
|
"rom_path": str(self.rom_path),
|
|
"boot_summary": self.boot_summary,
|
|
"host_frames": [_bench_frame_dict(frame) for frame in self.host_frames],
|
|
"observed_device_frames": [_bench_frame_dict(frame) for frame in self.observed_device_frames],
|
|
"replay_frame_results": [
|
|
{
|
|
"host_timestamp": item.host_timestamp,
|
|
"host_delta_ms": item.host_delta_ms,
|
|
"host_frame": format_frame(item.host_frame),
|
|
"steps_before": item.steps_before,
|
|
"steps_during_rx": item.steps_during_rx,
|
|
"emulator_gap_frames_before": [format_frame(frame) for frame in item.emulator_gap_frames_before],
|
|
"emulator_new_frames": [format_frame(frame) for frame in item.emulator_new_frames],
|
|
}
|
|
for item in self.replay_frame_results
|
|
],
|
|
"emulator_tx_frames": [format_frame(frame) for frame in self.emulator_tx_frames],
|
|
"emulator_lcd_display": self.emulator_lcd_display,
|
|
"emulator_lcd_line_buffer": self.emulator_lcd_line_buffer,
|
|
"parity": dict(self.parity),
|
|
}
|
|
|
|
def text_lines(self) -> list[str]:
|
|
lines = [
|
|
f"log={self.log_path}",
|
|
f"rom={self.rom_path}",
|
|
self.boot_summary,
|
|
"bench_host_frames=" + " | ".join(format_frame(frame.frame) for frame in self.host_frames),
|
|
"bench_device_nonheartbeat_frames="
|
|
+ _format_frame_list(frame.frame for frame in self.observed_device_frames if frame.frame != HEARTBEAT_FRAME),
|
|
"emulator_tx_frames=" + _format_frame_list(self.emulator_tx_frames),
|
|
f"emulator_lcd_display={self.emulator_lcd_display!r}",
|
|
f"emulator_lcd_line_buffer={self.emulator_lcd_line_buffer!r}",
|
|
"replay_frames:",
|
|
]
|
|
for index, item in enumerate(self.replay_frame_results):
|
|
lines.append(
|
|
(
|
|
f" [{index}] {item.host_timestamp} delta={item.host_delta_ms}ms "
|
|
f"steps_before={item.steps_before} steps_rx={item.steps_during_rx} "
|
|
f"host={format_frame(item.host_frame)} "
|
|
f"gap_emu={_format_frame_list(item.emulator_gap_frames_before)} "
|
|
f"emu_new={_format_frame_list(item.emulator_new_frames)}"
|
|
)
|
|
)
|
|
lines.append("parity:")
|
|
for key, value in self.parity.items():
|
|
lines.append(f" {key}={value}")
|
|
return lines
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ReplayConfig:
|
|
boot_steps: int = 250_000
|
|
per_byte_steps: int = 5_000
|
|
post_log_steps: int = 50_000
|
|
interval_steps: int = 512
|
|
frt1_ocia_steps: int | None = None
|
|
frt2_ocia_steps: int | None = None
|
|
clock_hz: int = 10_000_000
|
|
p9_fast_path: bool = True
|
|
p9_fast_input: int = 0xFF
|
|
|
|
|
|
def parse_bench_replay_log_text(text: str) -> BenchReplayLog:
|
|
chunks: list[BenchChunk] = []
|
|
tx_frames: list[BenchFrame] = []
|
|
rx_frames: list[BenchFrame] = []
|
|
screen_notes: list[ScreenNote] = []
|
|
rx_detector = FrameDetector()
|
|
|
|
for raw_line in text.splitlines():
|
|
screen_match = SCREEN_RE.match(raw_line)
|
|
if screen_match:
|
|
screen_notes.append(
|
|
ScreenNote(
|
|
timestamp=screen_match.group("time"),
|
|
timestamp_ms=_timestamp_ms(screen_match.group("time")),
|
|
label=screen_match.group("label").strip(),
|
|
note=screen_match.group("note").strip(),
|
|
)
|
|
)
|
|
continue
|
|
|
|
match = BENCH_CHUNK_RE.match(raw_line)
|
|
if not match:
|
|
continue
|
|
timestamp = match.group("time")
|
|
timestamp_ms = _timestamp_ms(timestamp)
|
|
direction = match.group("direction").upper()
|
|
data = bytes(int(token, 16) for token in HEX_BYTE_RE.findall(match.group("byte_text")))
|
|
declared = int(match.group("count"))
|
|
if len(data) != declared:
|
|
# Keep the bytes we could parse; the raw line remains available.
|
|
pass
|
|
chunk = BenchChunk(timestamp, timestamp_ms, direction, data, raw_line)
|
|
chunks.append(chunk)
|
|
if direction == "TX":
|
|
for frame_offset in range(0, len(data), 6):
|
|
frame = data[frame_offset : frame_offset + 6]
|
|
if len(frame) == 6:
|
|
tx_frames.append(BenchFrame(timestamp, timestamp_ms, frame, label_frame(frame)))
|
|
else:
|
|
for frame, label in rx_detector.feed(data):
|
|
rx_frames.append(BenchFrame(timestamp, timestamp_ms, frame, label))
|
|
|
|
return BenchReplayLog(tuple(chunks), tuple(tx_frames), tuple(rx_frames), tuple(screen_notes))
|
|
|
|
|
|
def parse_bench_replay_log(path: Path) -> BenchReplayLog:
|
|
return parse_bench_replay_log_text(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: ReplayConfig = ReplayConfig()) -> BenchReplayResult:
|
|
bench_log = parse_bench_replay_log(log_path)
|
|
rom_bytes, discovered_rom_path = load_rom(rom_path)
|
|
emulator = H8536Emulator(
|
|
rom_bytes,
|
|
interval_steps=config.interval_steps,
|
|
frt1_ocia_steps=config.frt1_ocia_steps,
|
|
frt2_ocia_steps=config.frt2_ocia_steps,
|
|
clock_hz=config.clock_hz,
|
|
p9_fast_path_enabled=config.p9_fast_path,
|
|
p9_fast_default_input_byte=config.p9_fast_input,
|
|
)
|
|
|
|
context = RunContext()
|
|
boot_steps_used, boot_reason = _run_until(emulator, config.boot_steps, _rx_ready, context)
|
|
boot_summary = (
|
|
f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
|
|
f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
|
|
f"rx_serviceable={int(_rx_ready(emulator))} "
|
|
f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} "
|
|
f"clock_hz={emulator.clock_hz} "
|
|
f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
|
|
)
|
|
|
|
replay_results: list[ReplayFrameResult] = []
|
|
previous_tx_ms: int | None = None
|
|
for host in bench_log.tx_frames:
|
|
delta_ms = 0 if previous_tx_ms is None else max(0, host.timestamp_ms - previous_tx_ms)
|
|
tx_frame_start_before_delay = len(emulator.sci1.tx_frames)
|
|
steps_before = _run_cycles_for_ms(emulator, delta_ms, config.clock_hz, context)
|
|
gap_frames = tuple(emulator.sci1.tx_frames[tx_frame_start_before_delay:])
|
|
tx_frame_start = len(emulator.sci1.tx_frames)
|
|
steps_during_rx = _inject_host_frame(emulator, host.frame, config.per_byte_steps, context)
|
|
replay_results.append(
|
|
ReplayFrameResult(
|
|
host_frame=host.frame,
|
|
host_timestamp=host.timestamp,
|
|
host_delta_ms=delta_ms,
|
|
steps_before=steps_before,
|
|
steps_during_rx=steps_during_rx,
|
|
emulator_gap_frames_before=gap_frames,
|
|
emulator_new_frames=tuple(emulator.sci1.tx_frames[tx_frame_start:]),
|
|
)
|
|
)
|
|
previous_tx_ms = host.timestamp_ms
|
|
|
|
_run_steps(emulator, config.post_log_steps, context)
|
|
emulator_lcd_display = emulator.memory.lcd.display_text(lines=4, width=16)
|
|
emulator_lcd_line_buffer = _ascii_window(emulator, 0xFAF0, 16)
|
|
parity = assess_bench_parity(
|
|
bench_log,
|
|
emulator_tx_frames=emulator.sci1.tx_frames,
|
|
emulator_lcd_display=emulator_lcd_display,
|
|
emulator_lcd_line_buffer=emulator_lcd_line_buffer,
|
|
)
|
|
return BenchReplayResult(
|
|
log_path=log_path,
|
|
rom_path=discovered_rom_path,
|
|
boot_summary=boot_summary,
|
|
host_frames=bench_log.tx_frames,
|
|
observed_device_frames=bench_log.rx_frames,
|
|
replay_frame_results=tuple(replay_results),
|
|
emulator_tx_frames=tuple(emulator.sci1.tx_frames),
|
|
emulator_lcd_display=emulator_lcd_display,
|
|
emulator_lcd_line_buffer=emulator_lcd_line_buffer,
|
|
parity=parity,
|
|
)
|
|
|
|
|
|
def assess_bench_parity(
|
|
bench_log: BenchReplayLog,
|
|
*,
|
|
emulator_tx_frames: Iterable[bytes],
|
|
emulator_lcd_display: str,
|
|
emulator_lcd_line_buffer: str,
|
|
) -> dict[str, Any]:
|
|
emulator_frames = list(emulator_tx_frames)
|
|
bench_screen_text = " | ".join(note.note for note in bench_log.screen_notes)
|
|
bench_reached_ok = _looks_like_connect_ok(bench_screen_text)
|
|
emulator_reached_ok = _looks_like_connect_ok(emulator_lcd_display) or _looks_like_connect_ok(emulator_lcd_line_buffer)
|
|
bench_nonheartbeat = [frame.frame for frame in bench_log.rx_frames if frame.frame != HEARTBEAT_FRAME]
|
|
emulator_nonheartbeat = [frame for frame in emulator_frames if frame != HEARTBEAT_FRAME]
|
|
bench_visible_c0_6020 = BENCH_VISIBLE_C0_6020 in bench_nonheartbeat
|
|
emulator_visible_c0_6020 = BENCH_VISIBLE_C0_6020 in emulator_nonheartbeat
|
|
emulator_connect_ok_response = CONNECT_OK_RESPONSE in emulator_nonheartbeat
|
|
|
|
mismatch_reasons: list[str] = []
|
|
if bench_reached_ok != emulator_reached_ok:
|
|
mismatch_reasons.append("lcd_connect_state")
|
|
if bench_visible_c0_6020 and not emulator_visible_c0_6020:
|
|
mismatch_reasons.append("missing_visible_C0_6020_response")
|
|
if not bench_reached_ok and emulator_connect_ok_response:
|
|
mismatch_reasons.append("emulator_emitted_connect_ok_response")
|
|
|
|
return {
|
|
"bench_reached_connect_ok": bench_reached_ok,
|
|
"emulator_reached_connect_ok": emulator_reached_ok,
|
|
"bench_visible_C0_6020": bench_visible_c0_6020,
|
|
"emulator_visible_C0_6020": emulator_visible_c0_6020,
|
|
"emulator_connect_ok_response": emulator_connect_ok_response,
|
|
"bench_nonheartbeat_frames": [format_frame(frame) for frame in bench_nonheartbeat],
|
|
"emulator_nonheartbeat_frames": [format_frame(frame) for frame in emulator_nonheartbeat],
|
|
"matched": not mismatch_reasons,
|
|
"mismatch_reasons": mismatch_reasons,
|
|
}
|
|
|
|
|
|
def build_arg_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(description="Replay a real bench serial log into the H8/536 ROM emulator.")
|
|
parser.add_argument("log", type=Path, help="bench log produced by scripts/bench_connect_lcd_sequence.py")
|
|
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN")
|
|
parser.add_argument("--boot-steps", type=int, default=ReplayConfig.boot_steps)
|
|
parser.add_argument("--per-byte-steps", type=int, default=ReplayConfig.per_byte_steps)
|
|
parser.add_argument("--post-log-steps", type=int, default=ReplayConfig.post_log_steps)
|
|
parser.add_argument("--interval-steps", type=int, default=ReplayConfig.interval_steps)
|
|
parser.add_argument("--clock-hz", type=lambda text: int(text, 0), default=ReplayConfig.clock_hz)
|
|
parser.add_argument("--frt1-ocia-steps", type=int, default=ReplayConfig.frt1_ocia_steps)
|
|
parser.add_argument("--frt2-ocia-steps", type=int, default=ReplayConfig.frt2_ocia_steps)
|
|
parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines")
|
|
parser.add_argument("--p9-fast-input", type=lambda text: int(text, 0), default=ReplayConfig.p9_fast_input)
|
|
parser.add_argument("--assert-bench-parity", action="store_true", help="exit nonzero if emulator behavior diverges from the bench log")
|
|
parser.add_argument("--json", action="store_true", help="emit JSON")
|
|
return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
args = build_arg_parser().parse_args(argv)
|
|
result = run_bench_replay(
|
|
args.log,
|
|
rom_path=args.rom,
|
|
config=ReplayConfig(
|
|
boot_steps=args.boot_steps,
|
|
per_byte_steps=args.per_byte_steps,
|
|
post_log_steps=args.post_log_steps,
|
|
interval_steps=args.interval_steps,
|
|
frt1_ocia_steps=args.frt1_ocia_steps,
|
|
frt2_ocia_steps=args.frt2_ocia_steps,
|
|
clock_hz=args.clock_hz,
|
|
p9_fast_path=not args.no_p9_fast_path,
|
|
p9_fast_input=args.p9_fast_input,
|
|
),
|
|
)
|
|
if args.json:
|
|
print(json.dumps(result.as_dict(), indent=2))
|
|
else:
|
|
for line in result.text_lines():
|
|
print(line)
|
|
if args.assert_bench_parity and not result.parity.get("matched"):
|
|
return 1
|
|
return 0
|
|
|
|
|
|
def _inject_host_frame(emulator: H8536Emulator, frame: bytes, per_byte_steps: int, context: RunContext) -> int:
|
|
steps_total = 0
|
|
for value in frame:
|
|
emulator.inject_sci1_rx_byte(value)
|
|
steps, _reason = _run_until(emulator, per_byte_steps, _rx_byte_consumed, context)
|
|
steps_total += steps
|
|
return steps_total
|
|
|
|
|
|
def _run_cycles_for_ms(emulator: H8536Emulator, delta_ms: int, clock_hz: int, context: RunContext) -> int:
|
|
target_delta_cycles = int((max(0, delta_ms) * max(1, clock_hz)) / 1000)
|
|
target_cycles = emulator.cpu.cycles + target_delta_cycles
|
|
completed = 0
|
|
while emulator.cpu.cycles < target_cycles:
|
|
context.record_pc(emulator.cpu.pc)
|
|
try:
|
|
emulator.step()
|
|
except UnsupportedInstruction as exc:
|
|
context.unsupported = str(exc)
|
|
break
|
|
completed += 1
|
|
return completed
|
|
|
|
|
|
def _run_steps(emulator: H8536Emulator, steps: int, context: RunContext) -> int:
|
|
completed = 0
|
|
for _ in range(max(0, steps)):
|
|
context.record_pc(emulator.cpu.pc)
|
|
try:
|
|
emulator.step()
|
|
except UnsupportedInstruction as exc:
|
|
context.unsupported = str(exc)
|
|
break
|
|
completed += 1
|
|
return completed
|
|
|
|
|
|
def _ascii_window(emulator: H8536Emulator, start: int, length: int) -> str:
|
|
chars = []
|
|
for offset in range(length):
|
|
value = emulator.memory.read8(start + offset)
|
|
chars.append(chr(value) if 0x20 <= value <= 0x7E else ".")
|
|
return "".join(chars)
|
|
|
|
|
|
def _looks_like_connect_ok(text: str) -> bool:
|
|
normalized = " ".join(text.upper().replace(":", " ").split())
|
|
return "CONNECT" in normalized and "OK" in normalized and "NOT ACT" not in normalized
|
|
|
|
|
|
def _timestamp_ms(text: str) -> int:
|
|
hours, minutes, rest = text.split(":")
|
|
if "." in rest:
|
|
seconds, fraction = rest.split(".", 1)
|
|
else:
|
|
seconds, fraction = rest, "0"
|
|
fraction = (fraction + "000")[:3]
|
|
return ((int(hours) * 60 + int(minutes)) * 60 + int(seconds)) * 1000 + int(fraction)
|
|
|
|
|
|
def _bench_frame_dict(frame: BenchFrame) -> dict[str, Any]:
|
|
return {
|
|
"timestamp": frame.timestamp,
|
|
"timestamp_ms": frame.timestamp_ms,
|
|
"frame": format_frame(frame.frame),
|
|
"label": frame.label,
|
|
}
|
|
|
|
|
|
def _format_frame_list(frames: Iterable[bytes]) -> str:
|
|
items = [format_frame(frame) for frame in frames]
|
|
return " | ".join(items) if items else "none"
|
|
|
|
|
|
__all__ = [
|
|
"BENCH_VISIBLE_C0_6020",
|
|
"BenchReplayLog",
|
|
"BenchReplayResult",
|
|
"ReplayConfig",
|
|
"assess_bench_parity",
|
|
"main",
|
|
"parse_bench_replay_log",
|
|
"parse_bench_replay_log_text",
|
|
"run_bench_replay",
|
|
]
|