diff --git a/.gitignore b/.gitignore index 0d20b64..93bbd37 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +captures/ diff --git a/README.md b/README.md index f740699..28b2739 100644 --- a/README.md +++ b/README.md @@ -43,8 +43,12 @@ To start the current emulator harness: .\.venv\Scripts\python.exe h8536_emulator_probe.py --max-steps 4000000 --stop-on-tx .\.venv\Scripts\python.exe h8536_emulator_probe.py --max-steps 1000000 --stop-on-tx --p9-fast-path .\.venv\Scripts\python.exe h8536_emulator_rx_probe.py --preset connect-lcd +.\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen +.\.venv\Scripts\python.exe h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity ``` +The real-device bench helper uses `pyserial`; install repo dependencies with `.\.venv\Scripts\python.exe -m pip install -r requirements.txt` if needed. + ## What It Does - Decodes the H8/500 instruction set used by the H8/536. @@ -85,6 +89,8 @@ To start the current emulator harness: - Provides an early H8/536 emulator harness with ROM/RAM/register memory mapping, reset-vector boot, SCI1 transmit capture, MOV condition-code updates, `SCB/F`, stack/call/return support, indirect `JMP/JSR @Rn` dispatch, scaffolded SCI1 RXI/ERI/TXI and interval/FRT1-OCIA/FRT2-OCIA interrupt scheduling, a P9 bit-banged bus model, a 16x4 LCD bus/DDRAM model for `H'F200`/`H'F201`, and an opt-in P9 transfer fast path. - Includes an emulator probe that reports hot PCs, recent P9/SCI accesses, serial report queue/gate traces, RAM lifecycle watches, final SCI1/TXI state, and captured P9 byte candidates while running the real ROM. - Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects. +- Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs. +- Includes a bench-log replay harness that feeds recorded host TX frames back into the ROM emulator and asserts parity against the real device's observed response/LCD state. Current serial observations: @@ -96,6 +102,7 @@ Current serial observations: - Runtime-confirmed heartbeat path: `loc_4067` writes `H'0000` into the queue via a zero-extended word move, `loc_BAF2/loc_BB08` dequeue it, `loc_BB1C/loc_BB20/loc_BB2B` stage the TX bytes, and `loc_BA26` emits `00 00 00 00 80 DA`. - Emulator LCD finding: the ROM writes the boot/no-active-session message to the LCD bus as ` CONNECT:NOT ACT` on line 0 by the time SCI1 RX is serviceable. Valid and invalid six-byte host frames leave that display active while normal serial replies/heartbeats continue. - RX probe finding: the `--preset connect-lcd` sequence reaches the command-`0x04` handler; `04 00 00 80 00 DE` writes table index zero, fills the LCD line buffer with `CONNECT: OK`, and emits `02 00 02 00 00 5A` in the current emulator model. +- Bench follow-up: replaying the emulator CONNECT sequence on the real device did not switch the LCD to OK. The real device answered the `04 00 00 80 00 DE` step with `07 80 C0 60 20 5D` in the captured run and remained at `CONNECT NOT ACT`, which points to a missing gate/session precondition in the emulator. - Observed capture labels such as `cam_power_button_candidate` and `call_button_candidate` are deliberately treated as capture overlays, not protocol facts hard-coded in ROM. The generated listing is written to: @@ -208,6 +215,8 @@ python h8536_emulator_rx_probe.py --help - `--target-frame "00 00 00 00 80 DA"`: compare staged/emitted TX bytes against an expected six-byte frame. - `h8536_emulator_rx_probe.py "04 00 00 80 00"`: append the checksum, inject the host frame through SCI1 RX, and summarize responses. - `h8536_emulator_rx_probe.py --preset connect-lcd`: replay the current CONNECT LCD activation candidates. +- `scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen`: power-cycle the bench device, wait for heartbeat readiness, send `04 00 00 40 00 1E`, `04 00 00 80 00 DE`, `04 00 00 C0 00 9E`, log RX/TX, and prompt for observed LCD text. +- `h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity`: replay a real bench log into the emulator and intentionally fail while the emulator still emits `02 00 02 00 00 5A` instead of the bench-observed `07 80 C0 60 20 5D`. - Current status: boots from `H'1000`, initializes SCI1, models the first P9 bit-banged handshakes, captures P9 byte candidates, can optionally fast-path known P9 routines, schedules FRT1/FRT2 OCIA, captures the ROM-driven LCD line ` CONNECT:NOT ACT`, and emits the observed heartbeat frame `00 00 00 00 80 DA`. ## Code Layout @@ -250,4 +259,5 @@ python h8536_emulator_rx_probe.py --help - `h8536_serial_pseudocode.py`: focused serial pseudocode CLI wrapper. - `h8536_protocol_trace.py`, `h8536_protocol_capture.py`: protocol analysis CLI wrappers. - `h8536_serial_gate.py`, `h8536_report_source_trace.py`, `h8536_table_xrefs.py`, `h8536_consistency.py`: sidecar analysis CLI wrappers. -- `h8536_emulator.py`, `h8536_emulator_probe.py`, `h8536_emulator_rx_probe.py`: emulator CLI wrappers. +- `h8536_emulator.py`, `h8536_emulator_probe.py`, `h8536_emulator_rx_probe.py`, `h8536_emulator_bench_replay.py`: emulator CLI wrappers. +- `scripts/bench_connect_lcd_sequence.py`: real-device COM5/COM6 bench runner for the CONNECT LCD sequence. diff --git a/h8536/bench_connect_lcd.py b/h8536/bench_connect_lcd.py new file mode 100644 index 0000000..4418bbc --- /dev/null +++ b/h8536/bench_connect_lcd.py @@ -0,0 +1,348 @@ +from __future__ import annotations + +import argparse +import sys +import time +from collections import Counter +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Iterable, TextIO + + +CHECKSUM_SEED = 0x5A +FRAME_LENGTH = 6 + +CONNECT_LCD_SEQUENCE = ( + bytes.fromhex("04000040001E"), + bytes.fromhex("0400008000DE"), + bytes.fromhex("040000C0009E"), +) +COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D") + + +@dataclass +class FrameDetector: + buffer: bytearray = field(default_factory=bytearray) + frames: list[bytes] = field(default_factory=list) + labels: Counter[str] = field(default_factory=Counter) + + def feed(self, data: bytes) -> list[tuple[bytes, str]]: + self.buffer.extend(data) + detected = [] + while len(self.buffer) >= FRAME_LENGTH: + frame = bytes(self.buffer[:FRAME_LENGTH]) + del self.buffer[:FRAME_LENGTH] + label = label_frame(frame) + self.frames.append(frame) + if label: + self.labels[label] += 1 + detected.append((frame, label)) + return detected + + +class BenchLogger: + def __init__(self, path: Path, stdout: TextIO = sys.stdout) -> None: + self.path = path + self.stdout = stdout + self.path.parent.mkdir(parents=True, exist_ok=True) + self.file = self.path.open("w", encoding="utf-8", newline="\n") + + def close(self) -> None: + self.file.close() + + def emit(self, line: str = "") -> None: + print(line, file=self.stdout) + print(line, file=self.file) + self.file.flush() + + def chunk(self, direction: str, data: bytes) -> None: + self.emit(f"{timestamp()} {direction:<2} {len(data):03d} bytes {format_frame(data)}") + + def event(self, text: str) -> None: + self.emit(f"{timestamp()} {text}") + + +def frame_checksum(data: bytes) -> int: + checksum = CHECKSUM_SEED + for value in data[: FRAME_LENGTH - 1]: + checksum ^= value + return checksum & 0xFF + + +def frame_checksum_ok(frame: bytes) -> bool: + return len(frame) == FRAME_LENGTH and frame_checksum(frame) == frame[-1] + + +def parse_frame(text: str) -> bytes: + normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ") + parts = normalized.split() + if len(parts) == 1: + compact = parts[0] + if compact.lower().startswith("0x"): + compact = compact[2:] + if compact.upper().startswith("H'"): + compact = compact[2:] + if len(compact) % 2: + raise argparse.ArgumentTypeError("compact frame hex must contain an even number of digits") + parts = [compact[index : index + 2] for index in range(0, len(compact), 2)] + values = [_parse_byte(part) for part in parts] + if len(values) == FRAME_LENGTH - 1: + values.append(frame_checksum(bytes(values))) + if len(values) != FRAME_LENGTH: + raise argparse.ArgumentTypeError("frame must contain five bytes plus computed checksum, or exactly six bytes") + return bytes(values) + + +def format_frame(data: bytes) -> str: + return data.hex(" ").upper() + + +def label_frame(frame: bytes) -> str: + labels = { + bytes.fromhex("0000000080DA"): "heartbeat", + bytes.fromhex("02000200005A"): "connect_ok_path_response_candidate", + bytes.fromhex("010002000059"): "connect_c0_path_response_candidate", + bytes.fromhex("07804040A07D"): "visible_40A0_family_40", + bytes.fromhex("07808040A0BD"): "visible_40A0_family_80", + bytes.fromhex("0780C040A0FD"): "visible_40A0_family_C0", + bytes.fromhex("0780C060205D"): "visible_C0_6020_family_candidate", + } + label = labels.get(frame, "") + if label: + return label + if frame_checksum_ok(frame): + return "checksum_ok_unlabeled" + return "checksum_bad_or_unaligned" + + +def default_log_path() -> Path: + return Path("captures") / f"bench-connect-lcd-sequence-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" + + +def build_arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Bench-test the emulator CONNECT LCD sequence against the real RCP over RS232." + ) + parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP") + parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate") + parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port") + parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate") + parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test") + parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power") + parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power") + parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off") + parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the Pico relay port") + parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending") + parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending") + parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed") + parser.add_argument("--frame-gap", type=float, default=0.150, help="seconds to listen between sent frames") + parser.add_argument("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence") + parser.add_argument("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session") + parser.add_argument("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable") + parser.add_argument("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames") + parser.add_argument("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence") + parser.add_argument("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending") + parser.add_argument("--prompt-screen", action="store_true", help="prompt for observed LCD text after the sequence") + parser.add_argument("--prompt-before-send", action="store_true", help="also prompt for LCD text before sending the sequence") + parser.add_argument("--log", type=Path, help="capture log path") + parser.add_argument("--dry-run", action="store_true", help="print the planned sequence without opening serial ports") + return parser + + +def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: + args = build_arg_parser().parse_args(argv) + frames = _planned_frames(args) + log_path = args.log or default_log_path() + + if args.dry_run: + print(f"device={args.port} {args.baud} 8N1", file=stdout) + print(f"relay={args.relay_port} {args.relay_baud}", file=stdout) + print(f"power_cycle={int(not args.no_power_cycle)} off={args.power_off_command!r} on={args.power_on_command!r}", file=stdout) + for index, frame in enumerate(frames, start=1): + print(f"frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout) + if args.command7_after: + print(f"command7_after={format_frame(COMMAND7_REPEAT_FRAME)} checksum_ok=1", file=stdout) + print(f"log={log_path}", file=stdout) + return 0 + + serial = _import_serial() + logger = BenchLogger(log_path, stdout=stdout) + detector = FrameDetector() + try: + logger.emit("CONNECT LCD bench sequence") + logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}") + logger.emit(f"log={log_path}") + for index, frame in enumerate(frames, start=1): + logger.emit(f"plan frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}") + + with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device: + relay = None + try: + if not args.no_power_cycle: + relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25) + _relay_settle(relay, args.relay_settle, logger) + _relay_command(relay, args.power_off_command, logger) + time.sleep(args.off_seconds) + device.reset_input_buffer() + detector = FrameDetector() + _relay_command(relay, args.power_on_command, logger) + else: + device.reset_input_buffer() + + ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats) + if args.require_ready and not ready: + logger.event("ABORT ready heartbeat threshold was not observed") + return 2 + if args.prompt_before_send: + _prompt_screen("LCD after boot/ready", logger) + if args.pre_sequence_drain > 0: + logger.event(f"DRAIN before sequence {args.pre_sequence_drain:.3f}s") + _read_for(device, detector, logger, args.pre_sequence_drain) + + for repeat_index in range(args.repeat): + if args.repeat > 1: + logger.event(f"BEGIN repeat {repeat_index + 1}/{args.repeat}") + for frame_index, frame in enumerate(frames, start=1): + _send_frame(device, frame, logger, f"seq{repeat_index + 1}.frame{frame_index}") + _read_for(device, detector, logger, args.frame_gap) + if args.command7_after: + _send_frame(device, COMMAND7_REPEAT_FRAME, logger, "command7_after") + _read_for(device, detector, logger, args.frame_gap) + + _read_for(device, detector, logger, args.post_sequence_read) + if args.prompt_screen: + _prompt_screen("LCD after CONNECT sequence", logger) + finally: + if relay is not None: + relay.close() + _summary(detector, logger) + return 0 + finally: + logger.close() + + +def _planned_frames(args: argparse.Namespace) -> list[bytes]: + if args.frame: + frames = list(args.frame) + elif args.two_frame: + frames = list(CONNECT_LCD_SEQUENCE[:2]) + else: + frames = list(CONNECT_LCD_SEQUENCE) + if not frames: + raise SystemExit("no frames selected") + return frames + + +def _import_serial(): + try: + import serial + except ImportError as exc: # pragma: no cover - depends on local environment. + raise SystemExit("pyserial is required; install it with: .\\.venv\\Scripts\\python.exe -m pip install pyserial") from exc + return serial + + +def _send_frame(device, frame: bytes, logger: BenchLogger, label: str) -> None: + device.write(frame) + device.flush() + logger.chunk("TX", frame) + logger.event(f"SENT {label} checksum_ok={int(frame_checksum_ok(frame))}") + + +def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: float) -> None: + deadline = time.monotonic() + max(0.0, seconds) + while time.monotonic() < deadline: + waiting = getattr(device, "in_waiting", 0) + data = device.read(waiting or 1) + if data: + logger.chunk("RX", data) + for frame, label in detector.feed(data): + logger.event(f"DETECT {label} {format_frame(frame)}") + + +def _wait_for_ready( + device, + detector: FrameDetector, + logger: BenchLogger, + timeout_seconds: float, + ready_heartbeats: int, +) -> bool: + logger.event(f"WAIT ready heartbeat target={ready_heartbeats} timeout={timeout_seconds:.3f}s") + start_count = detector.labels["heartbeat"] + deadline = time.monotonic() + max(0.0, timeout_seconds) + while time.monotonic() < deadline: + _read_for(device, detector, logger, 0.100) + if detector.labels["heartbeat"] - start_count >= ready_heartbeats: + logger.event(f"READY heartbeat_count={detector.labels['heartbeat']}") + return True + logger.event(f"READY_TIMEOUT heartbeat_count={detector.labels['heartbeat']}") + return False + + +def _relay_settle(relay, seconds: float, logger: BenchLogger) -> None: + time.sleep(max(0.0, seconds)) + _read_relay_lines(relay, logger, prefix="RELAY") + + +def _relay_command(relay, command: str, logger: BenchLogger) -> None: + relay.write((command.strip() + "\n").encode("utf-8")) + relay.flush() + logger.event(f"RELAY_TX {command.strip()}") + _read_relay_lines(relay, logger, prefix="RELAY_RX") + + +def _read_relay_lines(relay, logger: BenchLogger, prefix: str) -> None: + deadline = time.monotonic() + 2.0 + while time.monotonic() < deadline: + line = relay.readline() + if not line: + return + logger.event(f"{prefix} {line.decode('utf-8', errors='replace').strip()}") + + +def _prompt_screen(label: str, logger: BenchLogger) -> None: + note = input(f"{label}: type observed LCD text, or press Enter to skip: ").strip() + logger.event(f"SCREEN {label}: {note or '(no note)'}") + + +def _summary(detector: FrameDetector, logger: BenchLogger) -> None: + logger.emit() + logger.emit("Summary") + logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}") + for label, count in sorted(detector.labels.items()): + logger.emit(f"{label}={count}") + + +def _parse_byte(text: str) -> int: + token = text.strip() + if token.lower().startswith("0x"): + token = token[2:] + if token.upper().startswith("H'"): + token = token[2:] + if not token or len(token) > 2: + raise argparse.ArgumentTypeError(f"invalid byte token {text!r}") + try: + value = int(token, 16) + except ValueError as exc: + raise argparse.ArgumentTypeError(f"invalid byte token {text!r}") from exc + if not 0 <= value <= 0xFF: + raise argparse.ArgumentTypeError(f"byte out of range {text!r}") + return value + + +def timestamp() -> str: + return datetime.now().strftime("%H:%M:%S.%f")[:-3] + + +__all__ = [ + "COMMAND7_REPEAT_FRAME", + "CONNECT_LCD_SEQUENCE", + "FrameDetector", + "build_arg_parser", + "format_frame", + "frame_checksum", + "frame_checksum_ok", + "label_frame", + "main", + "parse_frame", +] diff --git a/h8536/emulator/bench_replay.py b/h8536/emulator/bench_replay.py new file mode 100644 index 0000000..2a3818d --- /dev/null +++ b/h8536/emulator/bench_replay.py @@ -0,0 +1,451 @@ +from __future__ import annotations + +import argparse +import json +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable, Mapping + +from ..formatting import h16 +from ..bench_connect_lcd import FrameDetector, format_frame, label_frame +from .cli import load_rom +from .errors import UnsupportedInstruction +from .runner import H8536Emulator +from .rx_probe import ( + RunContext, + _interrupt_mask, + _rx_byte_consumed, + _rx_ready, + _run_until, + _sci1_priority, +) + + +BENCH_CHUNK_RE = re.compile( + r"^\s*(?P