From 443789d6ae4fcf728e329c4f8e7ebd0c18589331 Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Tue, 26 May 2026 00:48:28 +1000 Subject: [PATCH] bench test updates --- README.md | 9 + h8536/bench_connect_lcd.py | 88 ++++- h8536/serial_ack_probe.py | 1 + h8536/serial_scenario.py | 509 +++++++++++++++++++++++++ h8536/serial_table_dump.py | 1 + scenarios/ack-race-000-001.json | 48 +++ scenarios/early-ack-000-001.json | 41 ++ scenarios/table-sweep-ack-000-07f.json | 46 +++ scripts/serial_scenario.py | 16 + tests/test_bench_connect_lcd.py | 25 ++ tests/test_serial_scenario.py | 50 +++ 11 files changed, 832 insertions(+), 2 deletions(-) create mode 100644 h8536/serial_scenario.py create mode 100644 scenarios/ack-race-000-001.json create mode 100644 scenarios/early-ack-000-001.json create mode 100644 scenarios/table-sweep-ack-000-07f.json create mode 100644 scripts/serial_scenario.py create mode 100644 tests/test_serial_scenario.py diff --git a/README.md b/README.md index e576894..c5392ec 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ To start the current emulator harness: .\.venv\Scripts\python.exe h8536_emulator_rx_probe.py --preset connect-lcd .\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen .\.venv\Scripts\python.exe scripts\serial_ack_probe.py --ack-frame "05 00 40 00 00 1F" +.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json +.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json .\.venv\Scripts\python.exe h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity ``` @@ -92,6 +94,8 @@ The real-device bench helper uses `pyserial`; install repo dependencies with `.\ - Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, can optionally schedule 38400 8N1 byte arrivals at real UART spacing, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects. - Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs. - Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating. +- Includes a checksum-resynchronizing bench receiver that scans RX byte streams for valid six-byte frames, avoids common shifted-heartbeat false locks, and can fall back to the old fixed six-byte slicer with `--sync fixed`. +- Includes a JSON scenario bench runner for repeatable multi-step serial tests, including low-latency ACK-aware command-1 probes that can send the current command-5 ACK candidate immediately after the retry frame appears, with explicit max-ACK/max-target guardrails. - Includes a bench-log replay harness that feeds recorded host TX frames back into the ROM emulator with bench-style UART byte timing by default and asserts parity against the real device's observed response/LCD state. Current serial observations: @@ -227,6 +231,9 @@ python h8536_emulator_rx_probe.py --help - `h8536_emulator_rx_probe.py --uart-timing --uart-baud 38400 "04 00 00 80 00"`: inject all six host bytes with 8N1 wire spacing of about 260 us per byte, letting RXI/TXI/timers interleave; if the ROM has not cleared `RDRF` before the next byte, the SCI model raises `ORER`. - `h8536_emulator_rx_probe.py --preset connect-lcd`: replay the current CONNECT LCD activation candidates. - `scripts\serial_table_dump.py --port COM5 --relay-port COM6 --start 0x000 --count 0x200 --log captures\table-read.txt`: read-only command-1 sweep of the firmware-exposed serial table state for EEPROM/shadow inference. +- `scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json`: run the focused `0x000 -> 0x001` retry probe with immediate reactive ACK and a 2 ms poll interval, to test whether command 5 can arrive before the second `07 80 40 20 90 2D` retry. +- `scripts\serial_scenario.py scenarios\early-ack-000-001.json --log captures\early-ack-000-001.txt --result-json captures\early-ack-000-001-result.json`: send the same command-1 pair, then send command-5 ACK immediately without waiting for the retry frame. +- `scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json`: run a repeatable bench scenario that sweeps selectors `0x000-0x07F` and sends `05 00 40 00 00 1F` only after `07 80 40 20 90 2D` appears. The checked-in scenario stops if it reaches 8 ACKs or 32 target hits. Use `--sync fixed` only when comparing against the old non-resyncing receiver. - `scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen`: power-cycle the bench device, wait for heartbeat readiness, send `04 00 00 40 00 1E`, `04 00 00 80 00 DE`, `04 00 00 C0 00 9E`, log RX/TX, and prompt for observed LCD text. - `h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity`: replay a real bench log into the emulator using timed UART RX by default and intentionally fail while any response/LCD state still diverges from the bench-observed `CONNECT NOT ACT` plus `07 80 C0 60 20 5D` path. Pass `--polite-rx` for the old wait-until-consumed injection mode. - Current status: boots from `H'1000`, initializes SCI1, models the traced X24164 EEPROM bus on P9, captures P9 byte candidates, can optionally fast-path known P9 EEPROM routines, schedules FRT1/FRT2 OCIA from timer registers and `--clock-hz`, captures the ROM-driven LCD line ` CONNECT:NOT ACT`, and emits the observed heartbeat frame `00 00 00 00 80 DA`. @@ -256,6 +263,7 @@ python h8536_emulator_rx_probe.py --help - `h8536/serial_pseudocode.py`: focused RX/TX protocol pseudocode generation from reconstruction metadata. - `h8536/protocol_trace.py`: raw six-byte protocol frame decoder/checksum validator. - `h8536/protocol_capture.py`: timestamped serial capture parser, frame recombiner, and cadence/gate-session analyzer. +- `h8536/serial_scenario.py`: JSON-driven bench scenario engine shared by real-device serial scripts. - `h8536/serial_gate.py`: autonomous TX gate/queue state-machine reconstruction. - `h8536/report_source_trace.py`: direct `loc_3E54` report enqueue source tracer. - `h8536/table_xrefs.py`: table/index xrefs and LCD correlation report generation. @@ -274,3 +282,4 @@ python h8536_emulator_rx_probe.py --help - `h8536_emulator.py`, `h8536_emulator_probe.py`, `h8536_emulator_rx_probe.py`, `h8536_emulator_bench_replay.py`: emulator CLI wrappers. - `scripts/bench_connect_lcd_sequence.py`: real-device COM5/COM6 bench runner for the CONNECT LCD sequence. - `scripts/serial_table_dump.py`: read-only COM5/COM6 command-1 table sweep for inferring live EEPROM-backed parameter state. +- `scripts/serial_scenario.py`: JSON-driven COM5/COM6 bench scenario runner for chained probes, waits, read sweeps, and ACK-on-target experiments. diff --git a/h8536/bench_connect_lcd.py b/h8536/bench_connect_lcd.py index e6cae60..4b9c252 100644 --- a/h8536/bench_connect_lcd.py +++ b/h8536/bench_connect_lcd.py @@ -23,12 +23,22 @@ COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D") @dataclass class FrameDetector: + sync_mode: str = "checksum" buffer: bytearray = field(default_factory=bytearray) frames: list[bytes] = field(default_factory=list) labels: Counter[str] = field(default_factory=Counter) + dropped_bytes: int = 0 + resync_events: int = 0 def feed(self, data: bytes) -> list[tuple[bytes, str]]: self.buffer.extend(data) + if self.sync_mode == "fixed": + return self._feed_fixed() + if self.sync_mode != "checksum": + raise ValueError(f"unknown frame sync mode {self.sync_mode!r}") + return self._feed_checksum_resync() + + def _feed_fixed(self) -> list[tuple[bytes, str]]: detected = [] while len(self.buffer) >= FRAME_LENGTH: frame = bytes(self.buffer[:FRAME_LENGTH]) @@ -40,6 +50,35 @@ class FrameDetector: detected.append((frame, label)) return detected + def _feed_checksum_resync(self) -> list[tuple[bytes, str]]: + detected = [] + while len(self.buffer) >= FRAME_LENGTH: + offset = _next_sync_offset(self.buffer) + if offset is None: + self._drop_unsynced_prefix(len(self.buffer) - (FRAME_LENGTH - 1)) + break + if offset: + self._drop_unsynced_prefix(offset) + frame = bytes(self.buffer[:FRAME_LENGTH]) + if not frame_checksum_ok(frame): + self._drop_unsynced_prefix(1) + continue + del self.buffer[:FRAME_LENGTH] + label = label_frame(frame) + self.frames.append(frame) + if label: + self.labels[label] += 1 + detected.append((frame, label)) + return detected + + def _drop_unsynced_prefix(self, count: int) -> None: + count = max(0, min(count, len(self.buffer))) + if not count: + return + del self.buffer[:count] + self.dropped_bytes += count + self.resync_events += 1 + class BenchLogger: def __init__(self, path: Path, stdout: TextIO = sys.stdout) -> None: @@ -113,10 +152,46 @@ def label_frame(frame: bytes) -> str: if label: return label if frame_checksum_ok(frame): + if frame[0] == 0x04: + return "table_readback_candidate" + if frame[0] == 0x07: + return "visible_report_candidate" return "checksum_ok_unlabeled" return "checksum_bad_or_unaligned" +def _next_sync_offset(buffer: bytearray) -> int | None: + scored_offsets: list[tuple[int, int]] = [] + for offset in range(0, len(buffer) - FRAME_LENGTH + 1): + frame = bytes(buffer[offset : offset + FRAME_LENGTH]) + if not frame_checksum_ok(frame): + continue + if offset == 0 and not _looks_like_shifted_heartbeat(frame): + return 0 + label = label_frame(frame) + scored_offsets.append((_sync_score(frame, label), offset)) + if not scored_offsets: + return None + return min(scored_offsets)[1] + + +def _sync_score(frame: bytes, label: str) -> int: + if label and label not in {"checksum_ok_unlabeled", "checksum_bad_or_unaligned"}: + return 0 + if frame[0] in {0x00, 0x02, 0x04, 0x07}: + return 100 + return 200 + + +def _looks_like_shifted_heartbeat(frame: bytes) -> bool: + return frame in { + bytes.fromhex("00000080DA00"), + bytes.fromhex("000080DA0000"), + bytes.fromhex("0080DA000000"), + bytes.fromhex("80DA00000000"), + } + + def default_log_path() -> Path: return Path("captures") / f"bench-connect-lcd-sequence-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" @@ -141,6 +216,7 @@ def build_arg_parser() -> argparse.ArgumentParser: parser.add_argument("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence") parser.add_argument("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session") parser.add_argument("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable") + parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy") parser.add_argument("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames") parser.add_argument("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence") parser.add_argument("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending") @@ -169,7 +245,7 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: serial = _import_serial() logger = BenchLogger(log_path, stdout=stdout) - detector = FrameDetector() + detector = FrameDetector(sync_mode=args.sync) try: logger.emit("CONNECT LCD bench sequence") logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}") @@ -186,7 +262,7 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: _relay_command(relay, args.power_off_command, logger) time.sleep(args.off_seconds) device.reset_input_buffer() - detector = FrameDetector() + detector = FrameDetector(sync_mode=args.sync) _relay_command(relay, args.power_on_command, logger) else: device.reset_input_buffer() @@ -256,9 +332,16 @@ def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: flo waiting = getattr(device, "in_waiting", 0) data = device.read(waiting or 1) if data: + dropped_before = detector.dropped_bytes logger.chunk("RX", data) for frame, label in detector.feed(data): logger.event(f"DETECT {label} {format_frame(frame)}") + dropped_now = detector.dropped_bytes - dropped_before + if dropped_now: + logger.event( + f"RESYNC dropped_bytes={dropped_now} total_dropped={detector.dropped_bytes} " + f"buffered={len(detector.buffer)}" + ) def _wait_for_ready( @@ -310,6 +393,7 @@ def _summary(detector: FrameDetector, logger: BenchLogger) -> None: logger.emit() logger.emit("Summary") logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}") + logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}") for label, count in sorted(detector.labels.items()): logger.emit(f"{label}={count}") diff --git a/h8536/serial_ack_probe.py b/h8536/serial_ack_probe.py index 99f873b..4aac130 100644 --- a/h8536/serial_ack_probe.py +++ b/h8536/serial_ack_probe.py @@ -153,6 +153,7 @@ def _summary(detector: FrameDetector, logger: BenchLogger, target: bytes, trigge logger.emit() logger.emit("Summary") logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}") + logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}") logger.emit(f"target_before_ack={sum(1 for frame in trigger_frames if frame == target)}") logger.emit(f"target_after_ack={sum(1 for frame in post_ack_frames if frame == target)}") labels = Counter(label for label in detector.labels.elements()) diff --git a/h8536/serial_scenario.py b/h8536/serial_scenario.py new file mode 100644 index 0000000..85c1b05 --- /dev/null +++ b/h8536/serial_scenario.py @@ -0,0 +1,509 @@ +from __future__ import annotations + +import argparse +import json +import sys +import time +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, TextIO + +from .bench_connect_lcd import ( + BenchLogger, + FrameDetector, + _import_serial, + _read_for, + _relay_command, + _relay_settle, + _send_frame, + _wait_for_ready, + format_frame, + frame_checksum_ok, + parse_frame, +) +from .serial_table_dump import build_read_frame, decode_table_read_response + + +DEFAULT_ACK_TARGET = bytes.fromhex("07804020902D") +DEFAULT_ACK_FRAME = bytes.fromhex("05004000001F") + + +@dataclass +class ScenarioContext: + args: argparse.Namespace + logger: BenchLogger + detector: FrameDetector + device: Any + relay: Any | None = None + table_rows: list[dict[str, Any]] = field(default_factory=list) + target_counts: dict[str, int] = field(default_factory=dict) + tx_records: list[dict[str, Any]] = field(default_factory=list) + ack_sent: int = 0 + abort_requested: bool = False + + +def default_log_path(scenario: dict[str, Any]) -> Path: + name = str(scenario.get("name") or "serial-scenario").strip() or "serial-scenario" + safe_name = "".join(char if char.isalnum() or char in "-_" else "-" for char in name) + return Path("captures") / f"{safe_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" + + +def build_arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Run JSON-described serial bench scenarios against the real RCP." + ) + parser.add_argument("scenario", type=Path, help="JSON scenario file") + parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP") + parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate") + parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port") + parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate") + parser.add_argument("--no-power-cycle", action="store_true", help="skip power_cycle actions") + parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power") + parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power") + parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port") + parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy") + parser.add_argument("--log", type=Path, help="capture log path") + parser.add_argument("--result-json", type=Path, help="write machine-readable scenario summary") + parser.add_argument("--dry-run", action="store_true", help="print the plan without opening serial ports") + return parser + + +def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: + args = build_arg_parser().parse_args(argv) + scenario = load_scenario(args.scenario) + log_path = args.log or default_log_path(scenario) + + if args.dry_run: + _print_dry_run(args, scenario, log_path, stdout) + return 0 + + serial = _import_serial() + logger = BenchLogger(log_path, stdout=stdout) + detector = FrameDetector(sync_mode=args.sync) + try: + logger.emit("Serial bench scenario") + logger.emit(f"name={scenario.get('name', args.scenario.stem)}") + logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud} sync={args.sync}") + logger.emit(f"log={log_path}") + with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device: + ctx = ScenarioContext(args=args, logger=logger, detector=detector, device=device) + try: + for index, step in enumerate(_scenario_steps(scenario), start=1): + if ctx.abort_requested: + logger.event("SCENARIO_ABORT requested by prior step") + break + action, spec = _normalize_step(step) + logger.event(f"STEP {index} {action}") + _run_step(ctx, action, spec) + finally: + if ctx.relay is not None: + ctx.relay.close() + _emit_summary(ctx, logger) + if args.result_json: + _write_result_json(args.result_json, scenario, log_path, ctx) + return 0 + finally: + logger.close() + + +def load_scenario(path: Path) -> dict[str, Any]: + with path.open("r", encoding="utf-8") as handle: + scenario = json.load(handle) + if not isinstance(scenario, dict): + raise SystemExit("scenario root must be a JSON object") + if not isinstance(scenario.get("steps"), list): + raise SystemExit("scenario must contain a steps array") + return scenario + + +def _scenario_steps(scenario: dict[str, Any]) -> list[Any]: + steps = scenario.get("steps", []) + if not isinstance(steps, list): + raise SystemExit("scenario steps must be an array") + return steps + + +def _normalize_step(step: Any) -> tuple[str, dict[str, Any]]: + if isinstance(step, str): + return step, {} + if not isinstance(step, dict): + raise SystemExit(f"invalid scenario step {step!r}") + if "action" in step: + spec = dict(step) + action = str(spec.pop("action")) + return action, spec + if len(step) == 1: + action, value = next(iter(step.items())) + if value is None: + return str(action), {} + if isinstance(value, dict): + return str(action), dict(value) + return str(action), {"value": value} + raise SystemExit(f"scenario step needs an action: {step!r}") + + +def _run_step(ctx: ScenarioContext, action: str, spec: dict[str, Any]) -> None: + if action == "power_cycle": + _step_power_cycle(ctx, spec) + elif action == "wait_ready": + ready = _wait_for_ready( + ctx.device, + ctx.detector, + ctx.logger, + float(spec.get("timeout", 10.0)), + int(spec.get("heartbeats", 2)), + ) + if spec.get("require", False) and not ready: + raise SystemExit(2) + elif action in {"drain", "listen", "wait"}: + _listen(ctx, float(spec.get("seconds", spec.get("value", 0.0)))) + elif action == "send": + frame = _parse_required_frame(spec.get("frame")) + label = str(spec.get("label", "send")) + _send_and_record(ctx, frame, label) + if float(spec.get("listen", 0.0)) > 0: + _listen(ctx, float(spec.get("listen", 0.0))) + elif action == "wait_for": + _step_wait_for(ctx, spec) + elif action == "table_sweep": + _step_table_sweep(ctx, spec) + elif action == "repeat": + _step_repeat(ctx, spec) + else: + raise SystemExit(f"unknown scenario action {action!r}") + + +def _step_power_cycle(ctx: ScenarioContext, spec: dict[str, Any]) -> None: + if ctx.args.no_power_cycle: + ctx.logger.event("POWER_CYCLE skipped by --no-power-cycle") + ctx.device.reset_input_buffer() + ctx.detector = FrameDetector(sync_mode=ctx.args.sync) + return + serial = _import_serial() + if ctx.relay is None: + ctx.relay = serial.Serial(ctx.args.relay_port, ctx.args.relay_baud, timeout=0.25) + _relay_settle(ctx.relay, float(spec.get("relay_settle", ctx.args.relay_settle)), ctx.logger) + off_command = str(spec.get("off_command", ctx.args.power_off_command)) + on_command = str(spec.get("on_command", ctx.args.power_on_command)) + _relay_command(ctx.relay, off_command, ctx.logger) + time.sleep(float(spec.get("off_seconds", 1.5))) + ctx.device.reset_input_buffer() + ctx.detector = FrameDetector(sync_mode=ctx.args.sync) + _relay_command(ctx.relay, on_command, ctx.logger) + + +def _step_wait_for(ctx: ScenarioContext, spec: dict[str, Any]) -> None: + targets = _parse_frame_list(spec.get("frames", spec.get("frame"))) + timeout = float(spec.get("timeout", 1.0)) + require = bool(spec.get("require", False)) + ctx.logger.event( + "WAIT_FOR " + + ",".join(format_frame(frame) for frame in targets) + + f" timeout={timeout:.3f}s" + ) + found = _listen_until(ctx, timeout, targets) + if require and not found: + raise SystemExit(3) + + +def _step_repeat(ctx: ScenarioContext, spec: dict[str, Any]) -> None: + count = max(0, int(spec.get("count", 1))) + steps = spec.get("steps", []) + if not isinstance(steps, list): + raise SystemExit("repeat step requires a steps array") + for repeat_index in range(count): + ctx.logger.event(f"REPEAT {repeat_index + 1}/{count}") + for step in steps: + action, child_spec = _normalize_step(step) + _run_step(ctx, action, child_spec) + + +def _step_table_sweep(ctx: ScenarioContext, spec: dict[str, Any]) -> None: + selectors = _selector_list(spec) + gap = float(spec.get("gap", 0.080)) + ack = _ack_config(spec.get("ack_on", {})) + ctx.logger.event( + f"TABLE_SWEEP selectors={len(selectors)} gap={gap:.3f}s " + f"ack_targets={len(ack['targets'])} ack_frame={format_frame(ack['frame'])}" + ) + for selector in selectors: + if ctx.abort_requested: + ctx.logger.event("TABLE_SWEEP_ABORT stopping before next selector") + break + frame = build_read_frame(selector) + ctx.logger.event(f"READ selector=0x{selector:03X} frame={format_frame(frame)}") + _send_and_record(ctx, frame, f"read_0x{selector:03X}") + _listen_with_ack(ctx, gap, selector, ack) + + +def _ack_config(raw: Any) -> dict[str, Any]: + spec = raw if isinstance(raw, dict) else {} + targets = _parse_frame_list(spec.get("frames", spec.get("frame", DEFAULT_ACK_TARGET))) + return { + "targets": set(targets), + "frame": _parse_optional_frame(spec.get("ack_frame"), DEFAULT_ACK_FRAME), + "guard": float(spec.get("ack_guard", 0.020)), + "poll_interval": float(spec.get("poll_interval", 0.005)), + "post_read": float(spec.get("post_ack_read", 0.250)), + "once_per_selector": bool(spec.get("once_per_selector", True)), + "enabled": bool(spec.get("enabled", True)), + "max_acks": _optional_int(spec.get("max_acks")), + "max_target_hits": _optional_int(spec.get("max_target_hits")), + "abort_on_limit": bool(spec.get("abort_on_limit", True)), + } + + +def _listen_with_ack( + ctx: ScenarioContext, + seconds: float, + selector: int, + ack: dict[str, Any], +) -> list[bytes]: + deadline = time.monotonic() + max(0.0, seconds) + observed: list[bytes] = [] + acked_targets: set[bytes] = set() + while time.monotonic() < deadline: + frames = _read_available(ctx, selector=selector) + observed.extend(frames) + if not frames: + sleep_for = min(max(0.001, ack["poll_interval"]), max(0.0, deadline - time.monotonic())) + if sleep_for > 0: + time.sleep(sleep_for) + continue + if not ack["enabled"]: + continue + for frame in frames: + if frame not in ack["targets"]: + continue + _count_target(ctx, frame) + if _ack_limit_reached(ctx, ack): + ctx.logger.event("ACK_LIMIT reached before ACK send") + if ack["abort_on_limit"]: + ctx.abort_requested = True + return observed + continue + if ack["once_per_selector"] and frame in acked_targets: + continue + acked_targets.add(frame) + if ack["guard"] > 0: + observed.extend(_listen(ctx, ack["guard"], selector=selector)) + _send_and_record(ctx, ack["frame"], "ack") + ctx.ack_sent += 1 + if _ack_limit_reached(ctx, ack): + ctx.logger.event("ACK_LIMIT reached after ACK send") + if ack["abort_on_limit"]: + ctx.abort_requested = True + if ack["post_read"] > 0: + observed.extend(_listen(ctx, ack["post_read"], selector=selector)) + if ctx.abort_requested: + return observed + return observed + + +def _listen_until(ctx: ScenarioContext, seconds: float, targets: set[bytes]) -> bool: + deadline = time.monotonic() + max(0.0, seconds) + while time.monotonic() < deadline: + interval = min(0.050, max(0.0, deadline - time.monotonic())) + if interval <= 0: + break + for frame in _listen(ctx, interval): + if frame in targets: + ctx.logger.event(f"WAIT_FOR_MATCH {format_frame(frame)}") + return True + return False + + +def _listen(ctx: ScenarioContext, seconds: float, *, selector: int | None = None) -> list[bytes]: + before = len(ctx.detector.frames) + _read_for(ctx.device, ctx.detector, ctx.logger, seconds) + frames = ctx.detector.frames[before:] + _record_table_rows(ctx, frames, selector) + return frames + + +def _read_available(ctx: ScenarioContext, *, selector: int | None = None) -> list[bytes]: + waiting = getattr(ctx.device, "in_waiting", 0) + if not waiting: + return [] + dropped_before = ctx.detector.dropped_bytes + data = ctx.device.read(waiting) + if not data: + return [] + ctx.logger.chunk("RX", data) + detected = ctx.detector.feed(data) + for frame, label in detected: + ctx.logger.event(f"DETECT {label} {format_frame(frame)}") + dropped_now = ctx.detector.dropped_bytes - dropped_before + if dropped_now: + ctx.logger.event( + f"RESYNC dropped_bytes={dropped_now} total_dropped={ctx.detector.dropped_bytes} " + f"buffered={len(ctx.detector.buffer)}" + ) + frames = [frame for frame, _label in detected] + _record_table_rows(ctx, frames, selector) + return frames + + +def _record_table_rows(ctx: ScenarioContext, frames: list[bytes], selector: int | None) -> None: + for frame in frames: + decoded = decode_table_read_response(frame) + if decoded is None or selector is None: + continue + echo, value = decoded + row = { + "selector": selector, + "echo": echo, + "value": value, + "frame": format_frame(frame), + } + ctx.table_rows.append(row) + ctx.logger.event(f"TABLE selector=0x{selector:03X} echo={echo:02X} value={value:04X}") + + +def _send_and_record(ctx: ScenarioContext, frame: bytes, label: str) -> None: + _send_frame(ctx.device, frame, ctx.logger, label) + ctx.tx_records.append( + { + "label": label, + "frame": format_frame(frame), + "checksum_ok": frame_checksum_ok(frame), + } + ) + + +def _count_target(ctx: ScenarioContext, frame: bytes) -> None: + text = format_frame(frame) + ctx.target_counts[text] = ctx.target_counts.get(text, 0) + 1 + ctx.logger.event(f"ACK_TARGET {text} count={ctx.target_counts[text]}") + + +def _selector_list(spec: dict[str, Any]) -> list[int]: + if "selectors" in spec: + raw_selectors = spec["selectors"] + if not isinstance(raw_selectors, list): + raise SystemExit("table_sweep selectors must be an array") + return [_int_value(selector) & 0x01FF for selector in raw_selectors] + start = _int_value(spec.get("start", 0)) + count = max(0, _int_value(spec.get("count", 0x80))) + return [((start + offset) & 0x01FF) for offset in range(count)] + + +def _parse_frame_list(raw: Any) -> set[bytes]: + if raw is None: + return set() + values = raw if isinstance(raw, list) else [raw] + return {_parse_required_frame(value) for value in values} + + +def _parse_required_frame(raw: Any) -> bytes: + if raw is None: + raise SystemExit("frame is required") + if isinstance(raw, bytes): + return raw + if not isinstance(raw, str): + raise SystemExit(f"frame must be a hex string, got {raw!r}") + return parse_frame(raw) + + +def _parse_optional_frame(raw: Any, default: bytes) -> bytes: + if raw is None: + return default + return _parse_required_frame(raw) + + +def _int_value(raw: Any) -> int: + if isinstance(raw, int): + return raw + if isinstance(raw, str): + return int(raw, 0) + raise SystemExit(f"expected integer value, got {raw!r}") + + +def _optional_int(raw: Any) -> int | None: + if raw is None: + return None + return _int_value(raw) + + +def _ack_limit_reached(ctx: ScenarioContext, ack: dict[str, Any]) -> bool: + max_acks = ack.get("max_acks") + if max_acks is not None and ctx.ack_sent >= max_acks: + return True + max_target_hits = ack.get("max_target_hits") + if max_target_hits is not None and sum(ctx.target_counts.values()) >= max_target_hits: + return True + return False + + +def _print_dry_run(args: argparse.Namespace, scenario: dict[str, Any], log_path: Path, stdout: TextIO) -> None: + print(f"scenario={scenario.get('name', args.scenario.stem)}", file=stdout) + print(f"device={args.port} {args.baud} 8N1", file=stdout) + print(f"relay={args.relay_port} {args.relay_baud}", file=stdout) + print(f"sync={args.sync}", file=stdout) + print(f"log={log_path}", file=stdout) + for index, step in enumerate(_scenario_steps(scenario), start=1): + action, spec = _normalize_step(step) + print(f"step[{index}]={action}", file=stdout) + if action == "send": + frame = _parse_required_frame(spec.get("frame")) + print(f" frame={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout) + elif action == "table_sweep": + selectors = _selector_list(spec) + ack = _ack_config(spec.get("ack_on", {})) + if selectors: + first = selectors[0] + last = selectors[-1] + print(f" selectors={len(selectors)} first=0x{first:03X} last=0x{last:03X}", file=stdout) + else: + print(" selectors=0", file=stdout) + print(f" gap={float(spec.get('gap', 0.080)):.3f}", file=stdout) + for target in sorted(ack["targets"]): + print(f" ack_target={format_frame(target)}", file=stdout) + print(f" ack_frame={format_frame(ack['frame'])}", file=stdout) + print(f" max_acks={ack['max_acks']} max_target_hits={ack['max_target_hits']}", file=stdout) + + +def _emit_summary(ctx: ScenarioContext, logger: BenchLogger) -> None: + logger.emit() + logger.emit("Summary") + logger.emit(f"rx_frames={len(ctx.detector.frames)} trailing_unframed_bytes={len(ctx.detector.buffer)}") + logger.emit(f"resync_events={ctx.detector.resync_events} dropped_bytes={ctx.detector.dropped_bytes}") + logger.emit(f"tx_frames={len(ctx.tx_records)} ack_sent={ctx.ack_sent} table_response_rows={len(ctx.table_rows)}") + logger.emit(f"abort_requested={int(ctx.abort_requested)}") + for target, count in sorted(ctx.target_counts.items()): + logger.emit(f"ack_target {target}={count}") + for label, count in sorted(ctx.detector.labels.items()): + logger.emit(f"{label}={count}") + for row in ctx.table_rows: + logger.emit( + f"table selector=0x{row['selector']:03X} echo=0x{row['echo']:02X} value=0x{row['value']:04X}" + ) + + +def _write_result_json(path: Path, scenario: dict[str, Any], log_path: Path, ctx: ScenarioContext) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + result = { + "scenario": scenario.get("name", ""), + "log": str(log_path), + "rx_frames": len(ctx.detector.frames), + "trailing_unframed_bytes": len(ctx.detector.buffer), + "resync_events": ctx.detector.resync_events, + "dropped_bytes": ctx.detector.dropped_bytes, + "labels": dict(ctx.detector.labels), + "tx_frames": ctx.tx_records, + "ack_sent": ctx.ack_sent, + "abort_requested": ctx.abort_requested, + "ack_targets": ctx.target_counts, + "table_rows": ctx.table_rows, + } + path.write_text(json.dumps(result, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + +__all__ = [ + "DEFAULT_ACK_FRAME", + "DEFAULT_ACK_TARGET", + "build_arg_parser", + "load_scenario", + "main", +] diff --git a/h8536/serial_table_dump.py b/h8536/serial_table_dump.py index 1514208..860b9e5 100644 --- a/h8536/serial_table_dump.py +++ b/h8536/serial_table_dump.py @@ -185,6 +185,7 @@ def _summary( logger.emit() logger.emit("Summary") logger.emit(f"rx_frames={len(detector.frames)} table_response_rows={len(table_rows)}") + logger.emit(f"trailing_unframed_bytes={len(detector.buffer)} resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}") for selector, value in table_rows: logger.emit(f"table selector=0x{selector:03X} value=0x{value:04X}") diff --git a/scenarios/ack-race-000-001.json b/scenarios/ack-race-000-001.json new file mode 100644 index 0000000..d178190 --- /dev/null +++ b/scenarios/ack-race-000-001.json @@ -0,0 +1,48 @@ +{ + "name": "ack-race-000-001", + "notes": [ + "Focused command-1 pair that has produced the 07 80 40 20 90 2D retry frame on the bench.", + "Uses immediate reactive ACK with a 2 ms poll interval to test whether command 5 can beat the second retry." + ], + "steps": [ + { + "action": "power_cycle", + "off_seconds": 1.5 + }, + { + "action": "wait_ready", + "timeout": 10.0, + "heartbeats": 2, + "require": true + }, + { + "action": "drain", + "seconds": 0.25 + }, + { + "action": "table_sweep", + "selectors": [ + "0x000", + "0x001" + ], + "gap": 1.0, + "ack_on": { + "frames": [ + "07 80 40 20 90 2D" + ], + "ack_frame": "05 00 40 00 00 1F", + "ack_guard": 0.0, + "poll_interval": 0.002, + "post_ack_read": 1.0, + "once_per_selector": true, + "max_acks": 4, + "max_target_hits": 8, + "abort_on_limit": true + } + }, + { + "action": "listen", + "seconds": 2.0 + } + ] +} diff --git a/scenarios/early-ack-000-001.json b/scenarios/early-ack-000-001.json new file mode 100644 index 0000000..b0f2d1c --- /dev/null +++ b/scenarios/early-ack-000-001.json @@ -0,0 +1,41 @@ +{ + "name": "early-ack-000-001", + "notes": [ + "Manual early-ACK variant: send the command-1 pair and immediately send command-5 ACK instead of waiting for the retry frame.", + "This tests whether the ACK has to arrive before the ROM emits the second retry." + ], + "steps": [ + { + "action": "power_cycle", + "off_seconds": 1.5 + }, + { + "action": "wait_ready", + "timeout": 10.0, + "heartbeats": 2, + "require": true + }, + { + "action": "drain", + "seconds": 0.25 + }, + { + "action": "send", + "label": "read_0x000", + "frame": "01 00 00 00 00 5B", + "listen": 0.75 + }, + { + "action": "send", + "label": "read_0x001", + "frame": "01 00 01 00 00 5A", + "listen": 0.0 + }, + { + "action": "send", + "label": "early_ack", + "frame": "05 00 40 00 00 1F", + "listen": 2.0 + } + ] +} diff --git a/scenarios/table-sweep-ack-000-07f.json b/scenarios/table-sweep-ack-000-07f.json new file mode 100644 index 0000000..565968b --- /dev/null +++ b/scenarios/table-sweep-ack-000-07f.json @@ -0,0 +1,46 @@ +{ + "name": "table-sweep-ack-000-07f", + "notes": [ + "Read-only command-1 sweep of live E000 table selectors 0x000-0x07F.", + "If the visible retry frame 07 80 40 20 90 2D appears, send command-5 ACK 05 00 40 00 00 1F." + ], + "steps": [ + { + "action": "power_cycle", + "off_seconds": 1.5 + }, + { + "action": "wait_ready", + "timeout": 10.0, + "heartbeats": 2, + "require": true + }, + { + "action": "drain", + "seconds": 0.25 + }, + { + "action": "table_sweep", + "start": "0x000", + "count": "0x080", + "gap": 0.75, + "ack_on": { + "frames": [ + "07 80 40 20 90 2D" + ], + "ack_frame": "05 00 40 00 00 1F", + "ack_guard": 0.0, + "poll_interval": 0.002, + "post_ack_read": 0.35, + "once_per_selector": true, + "max_acks": 8, + "max_target_hits": 32, + "abort_on_limit": true + } + }, + { + "action": "listen", + "seconds": 2.0 + } + ] +} diff --git a/scripts/serial_scenario.py b/scripts/serial_scenario.py new file mode 100644 index 0000000..8f933ca --- /dev/null +++ b/scripts/serial_scenario.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +"""Run JSON-described serial bench scenarios.""" + +import sys +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from h8536.serial_scenario import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_bench_connect_lcd.py b/tests/test_bench_connect_lcd.py index 1d15a23..d6866b6 100644 --- a/tests/test_bench_connect_lcd.py +++ b/tests/test_bench_connect_lcd.py @@ -43,9 +43,34 @@ class BenchConnectLcdTest(unittest.TestCase): ], ) + def test_detector_resyncs_to_checksum_valid_frame(self): + detector = FrameDetector() + + detected = detector.feed(bytes.fromhex("FF0000000080DA")) + + self.assertEqual([(format_frame(frame), label) for frame, label in detected], [ + ("00 00 00 00 80 DA", "heartbeat") + ]) + self.assertEqual(detector.dropped_bytes, 1) + self.assertEqual(detector.resync_events, 1) + + def test_detector_prefers_labeled_heartbeat_over_shifted_valid_window(self): + detector = FrameDetector() + heartbeat = bytes.fromhex("0000000080DA") + + detected = detector.feed(heartbeat[1:] + heartbeat) + + self.assertEqual([(format_frame(frame), label) for frame, label in detected], [ + ("00 00 00 00 80 DA", "heartbeat") + ]) + self.assertEqual(detector.dropped_bytes, 5) + def test_label_frame_marks_unlabeled_checksum_ok_frame(self): self.assertEqual(label_frame(bytes.fromhex("01000000005B")), "checksum_ok_unlabeled") + def test_label_frame_marks_table_readback_candidate(self): + self.assertEqual(label_frame(bytes.fromhex("04001280804C")), "table_readback_candidate") + def test_label_frame_marks_real_bench_c0_6020_response(self): self.assertEqual(label_frame(bytes.fromhex("0780C060205D")), "visible_C0_6020_family_candidate") diff --git a/tests/test_serial_scenario.py b/tests/test_serial_scenario.py new file mode 100644 index 0000000..0bd1902 --- /dev/null +++ b/tests/test_serial_scenario.py @@ -0,0 +1,50 @@ +import io +import json +import tempfile +import unittest +from pathlib import Path + +from h8536.serial_scenario import DEFAULT_ACK_FRAME, DEFAULT_ACK_TARGET, main + + +class SerialScenarioTest(unittest.TestCase): + def test_dry_run_summarizes_ack_aware_table_sweep(self): + scenario = { + "name": "unit-sweep", + "steps": [ + { + "action": "table_sweep", + "start": "0x000", + "count": "0x002", + "gap": 0.75, + "ack_on": { + "frames": ["07 80 40 20 90 2D"], + "ack_frame": "05 00 40 00 00 1F", + "max_acks": 8, + "max_target_hits": 32, + }, + } + ], + } + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "scenario.json" + path.write_text(json.dumps(scenario), encoding="utf-8") + stdout = io.StringIO() + + exit_code = main([str(path), "--dry-run"], stdout=stdout) + + output = stdout.getvalue() + self.assertEqual(exit_code, 0) + self.assertIn("scenario=unit-sweep", output) + self.assertIn("selectors=2 first=0x000 last=0x001", output) + self.assertIn("ack_target=07 80 40 20 90 2D", output) + self.assertIn("ack_frame=05 00 40 00 00 1F", output) + self.assertIn("max_acks=8 max_target_hits=32", output) + + def test_default_ack_frames_match_current_rom_probe_candidate(self): + self.assertEqual(DEFAULT_ACK_TARGET, bytes.fromhex("07804020902D")) + self.assertEqual(DEFAULT_ACK_FRAME, bytes.fromhex("05004000001F")) + + +if __name__ == "__main__": + unittest.main()