From 7c211f8112e82abc93fdae664969d69e7aac932c Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Tue, 26 May 2026 00:15:25 +1000 Subject: [PATCH] bench ack --- README.md | 2 + h8536/bench_connect_lcd.py | 1 + h8536/serial_ack_probe.py | 188 ++++++++++++++++++++++++++++++++ scripts/serial_ack_probe.py | 14 +++ tests/test_bench_connect_lcd.py | 3 + 5 files changed, 208 insertions(+) create mode 100644 h8536/serial_ack_probe.py create mode 100644 scripts/serial_ack_probe.py diff --git a/README.md b/README.md index a263092..e576894 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ To start the current emulator harness: .\.venv\Scripts\python.exe h8536_emulator_probe.py --max-steps 1000000 --stop-on-tx --p9-fast-path .\.venv\Scripts\python.exe h8536_emulator_rx_probe.py --preset connect-lcd .\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen +.\.venv\Scripts\python.exe scripts\serial_ack_probe.py --ack-frame "05 00 40 00 00 1F" .\.venv\Scripts\python.exe h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity ``` @@ -90,6 +91,7 @@ The real-device bench helper uses `pyserial`; install repo dependencies with `.\ - Includes an emulator probe that reports hot PCs, recent P9/SCI accesses, serial report queue/gate traces, RAM lifecycle watches, final SCI1/TXI state, and captured P9 byte candidates while running the real ROM. - Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, can optionally schedule 38400 8N1 byte arrivals at real UART spacing, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects. - Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs. +- Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating. - Includes a bench-log replay harness that feeds recorded host TX frames back into the ROM emulator with bench-style UART byte timing by default and asserts parity against the real device's observed response/LCD state. Current serial observations: diff --git a/h8536/bench_connect_lcd.py b/h8536/bench_connect_lcd.py index 4418bbc..e6cae60 100644 --- a/h8536/bench_connect_lcd.py +++ b/h8536/bench_connect_lcd.py @@ -106,6 +106,7 @@ def label_frame(frame: bytes) -> str: bytes.fromhex("07804040A07D"): "visible_40A0_family_40", bytes.fromhex("07808040A0BD"): "visible_40A0_family_80", bytes.fromhex("0780C040A0FD"): "visible_40A0_family_C0", + bytes.fromhex("07804020902D"): "visible_retry_0040_2090_candidate", bytes.fromhex("0780C060205D"): "visible_C0_6020_family_candidate", } label = labels.get(frame, "") diff --git a/h8536/serial_ack_probe.py b/h8536/serial_ack_probe.py new file mode 100644 index 0000000..99f873b --- /dev/null +++ b/h8536/serial_ack_probe.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import argparse +import sys +import time +from collections import Counter +from datetime import datetime +from pathlib import Path +from typing import TextIO + +from .bench_connect_lcd import ( + BenchLogger, + FrameDetector, + _import_serial, + _read_for, + _relay_command, + _relay_settle, + _send_frame, + _wait_for_ready, + format_frame, + frame_checksum_ok, + parse_frame, +) + + +DEFAULT_PRIME_FRAME = bytes.fromhex("01000000005B") +DEFAULT_TRIGGER_FRAME = bytes.fromhex("01000100005A") +DEFAULT_TARGET_FRAME = bytes.fromhex("07804020902D") +DEFAULT_ACK_FRAME = bytes.fromhex("05004000001F") + + +def default_log_path() -> Path: + return Path("captures") / f"serial-ack-probe-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" + + +def build_arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Reproduce the command-1 visible retry burst and send one candidate ACK " + "after a target device frame is observed." + ) + ) + parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP") + parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate") + parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port") + parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate") + parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test") + parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power") + parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power") + parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off") + parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port") + parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending") + parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending") + parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed") + parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before priming") + parser.add_argument("--prime-frame", type=parse_frame, default=DEFAULT_PRIME_FRAME, help="first host frame") + parser.add_argument("--prime-gap", type=float, default=1.25, help="seconds to listen after the prime frame") + parser.add_argument("--trigger-frame", type=parse_frame, default=DEFAULT_TRIGGER_FRAME, help="second host frame") + parser.add_argument("--target-frame", type=parse_frame, default=DEFAULT_TARGET_FRAME, help="device frame to wait for") + parser.add_argument("--target-timeout", type=float, default=3.0, help="seconds to wait for target after trigger") + parser.add_argument("--ack-frame", type=parse_frame, default=DEFAULT_ACK_FRAME, help="candidate ACK frame") + parser.add_argument("--ack-guard", type=float, default=0.020, help="seconds to wait after target before ACK") + parser.add_argument("--post-ack-read", type=float, default=3.0, help="seconds to listen after ACK") + parser.add_argument("--log", type=Path, help="capture log path") + parser.add_argument("--dry-run", action="store_true", help="print planned frames without opening serial ports") + return parser + + +def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: + args = build_arg_parser().parse_args(argv) + log_path = args.log or default_log_path() + + if args.dry_run: + _print_plan(args, log_path, stdout) + return 0 + + serial = _import_serial() + logger = BenchLogger(log_path, stdout=stdout) + detector = FrameDetector() + try: + logger.emit("Serial ACK probe") + logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}") + logger.emit(f"log={log_path}") + _emit_plan(args, logger) + + with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device: + relay = None + try: + if not args.no_power_cycle: + relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25) + _relay_settle(relay, args.relay_settle, logger) + _relay_command(relay, args.power_off_command, logger) + time.sleep(args.off_seconds) + device.reset_input_buffer() + detector = FrameDetector() + _relay_command(relay, args.power_on_command, logger) + else: + device.reset_input_buffer() + + ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats) + if args.require_ready and not ready: + logger.event("ABORT ready heartbeat threshold was not observed") + return 2 + if args.pre_drain > 0: + logger.event(f"DRAIN before ACK probe {args.pre_drain:.3f}s") + _read_for(device, detector, logger, args.pre_drain) + + _send_frame(device, args.prime_frame, logger, "prime") + _read_for(device, detector, logger, args.prime_gap) + + before_trigger = len(detector.frames) + _send_frame(device, args.trigger_frame, logger, "trigger") + target_seen = _wait_for_target(device, detector, logger, args.target_frame, args.target_timeout) + before_ack = len(detector.frames) + if not target_seen: + logger.event("TARGET_TIMEOUT no ACK sent") + _read_for(device, detector, logger, args.post_ack_read) + _summary(detector, logger, args.target_frame, before_trigger, before_ack, len(detector.frames)) + return 3 + + if args.ack_guard > 0: + logger.event(f"ACK guard {args.ack_guard:.3f}s") + _read_for(device, detector, logger, args.ack_guard) + before_ack = len(detector.frames) + _send_frame(device, args.ack_frame, logger, "ack") + _read_for(device, detector, logger, args.post_ack_read) + _summary(detector, logger, args.target_frame, before_trigger, before_ack, len(detector.frames)) + finally: + if relay is not None: + relay.close() + return 0 + finally: + logger.close() + + +def _wait_for_target(device, detector: FrameDetector, logger: BenchLogger, target: bytes, timeout_seconds: float) -> bool: + logger.event(f"WAIT target={format_frame(target)} timeout={timeout_seconds:.3f}s") + start_index = len(detector.frames) + deadline = time.monotonic() + max(0.0, timeout_seconds) + while time.monotonic() < deadline: + before = len(detector.frames) + _read_for(device, detector, logger, 0.050) + for frame in detector.frames[max(start_index, before) :]: + if frame == target: + logger.event(f"TARGET seen {format_frame(frame)}") + return True + return False + + +def _summary(detector: FrameDetector, logger: BenchLogger, target: bytes, trigger_start: int, ack_start: int, end: int) -> None: + trigger_frames = detector.frames[trigger_start:ack_start] + post_ack_frames = detector.frames[ack_start:end] + logger.emit() + logger.emit("Summary") + logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}") + logger.emit(f"target_before_ack={sum(1 for frame in trigger_frames if frame == target)}") + logger.emit(f"target_after_ack={sum(1 for frame in post_ack_frames if frame == target)}") + labels = Counter(label for label in detector.labels.elements()) + for label, count in sorted(labels.items()): + logger.emit(f"{label}={count}") + + +def _emit_plan(args: argparse.Namespace, logger: BenchLogger) -> None: + logger.emit(f"prime={format_frame(args.prime_frame)} checksum_ok={int(frame_checksum_ok(args.prime_frame))}") + logger.emit(f"trigger={format_frame(args.trigger_frame)} checksum_ok={int(frame_checksum_ok(args.trigger_frame))}") + logger.emit(f"target={format_frame(args.target_frame)} checksum_ok={int(frame_checksum_ok(args.target_frame))}") + logger.emit(f"ack={format_frame(args.ack_frame)} checksum_ok={int(frame_checksum_ok(args.ack_frame))}") + + +def _print_plan(args: argparse.Namespace, log_path: Path, stdout: TextIO) -> None: + print(f"device={args.port} {args.baud} 8N1", file=stdout) + print(f"relay={args.relay_port} {args.relay_baud}", file=stdout) + print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout) + print(f"prime={format_frame(args.prime_frame)} checksum_ok={int(frame_checksum_ok(args.prime_frame))}", file=stdout) + print(f"trigger={format_frame(args.trigger_frame)} checksum_ok={int(frame_checksum_ok(args.trigger_frame))}", file=stdout) + print(f"target={format_frame(args.target_frame)} checksum_ok={int(frame_checksum_ok(args.target_frame))}", file=stdout) + print(f"ack={format_frame(args.ack_frame)} checksum_ok={int(frame_checksum_ok(args.ack_frame))}", file=stdout) + print(f"log={log_path}", file=stdout) + + +__all__ = [ + "DEFAULT_ACK_FRAME", + "DEFAULT_PRIME_FRAME", + "DEFAULT_TARGET_FRAME", + "DEFAULT_TRIGGER_FRAME", + "build_arg_parser", + "main", +] diff --git a/scripts/serial_ack_probe.py b/scripts/serial_ack_probe.py new file mode 100644 index 0000000..160d5bb --- /dev/null +++ b/scripts/serial_ack_probe.py @@ -0,0 +1,14 @@ +"""Bench ACK probe for the stateful H8/536 serial retry frame.""" + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from h8536.serial_ack_probe import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_bench_connect_lcd.py b/tests/test_bench_connect_lcd.py index 3729eff..1d15a23 100644 --- a/tests/test_bench_connect_lcd.py +++ b/tests/test_bench_connect_lcd.py @@ -49,6 +49,9 @@ class BenchConnectLcdTest(unittest.TestCase): def test_label_frame_marks_real_bench_c0_6020_response(self): self.assertEqual(label_frame(bytes.fromhex("0780C060205D")), "visible_C0_6020_family_candidate") + def test_label_frame_marks_visible_retry_ack_target(self): + self.assertEqual(label_frame(bytes.fromhex("07804020902D")), "visible_retry_0040_2090_candidate") + if __name__ == "__main__": unittest.main()