"""Bench-test the emulator CONNECT LCD sequence against the real RCP over RS232.

The module provides:

* six-byte frame helpers (checksum, parsing, formatting, labelling),
* ``FrameDetector``, which splits a received byte stream into frames, and
* ``main``, a command-line driver that optionally power-cycles the device
  through a relay port, waits for heartbeat frames, sends a frame sequence
  and logs everything that is sent and received.
"""

from __future__ import annotations

import argparse
import sys
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Iterable, TextIO

# Seed XOR-ed with the first five bytes of a frame to produce its checksum.
CHECKSUM_SEED = 0x5A
# Every frame is five payload bytes followed by one checksum byte.
FRAME_LENGTH = 6
SERIAL_PARITY_CHOICES = ("N", "E", "O")
# Default frames sent by ``main`` when no ``--frame`` override is given.
CONNECT_LCD_SEQUENCE = (
    bytes.fromhex("04000040001E"),
    bytes.fromhex("0400008000DE"),
    bytes.fromhex("040000C0009E"),
)
COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")

# ASCII hex digits accepted in a byte token.  ``int(text, 16)`` alone is too
# lenient: it also accepts signs ("+f") and non-ASCII decimal digits.
_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

# Exact frame -> label table, built once instead of on every label_frame call.
_KNOWN_FRAME_LABELS = {
    bytes.fromhex("0000000080DA"): "heartbeat",
    bytes.fromhex("02000200005A"): "connect_ok_path_response_candidate",
    bytes.fromhex("010002000059"): "connect_c0_path_response_candidate",
    bytes.fromhex("07804040A07D"): "visible_40A0_family_40",
    bytes.fromhex("07808040A0BD"): "visible_40A0_family_80",
    bytes.fromhex("0780C040A0FD"): "visible_40A0_family_C0",
    bytes.fromhex("07804020902D"): "visible_retry_0040_2090_candidate",
    bytes.fromhex("0780C060205D"): "visible_C0_6020_family_candidate",
}

# Labels that carry no frame-specific information (used for sync scoring).
_GENERIC_LABELS = frozenset({"checksum_ok_unlabeled", "checksum_bad_or_unaligned"})

# Six-byte windows that straddle two heartbeat frames.  They pass the checksum
# test by coincidence, so they are not trusted as an alignment at offset 0.
_SHIFTED_HEARTBEATS = frozenset(
    {
        bytes.fromhex("00000080DA00"),
        bytes.fromhex("000080DA0000"),
        bytes.fromhex("0080DA000000"),
        bytes.fromhex("80DA00000000"),
    }
)


@dataclass
class FrameDetector:
    """Incrementally split a received byte stream into six-byte frames.

    ``sync_mode`` selects the strategy used by :meth:`feed`:

    * ``"fixed"`` cuts the stream every ``FRAME_LENGTH`` bytes.
    * ``"checksum"`` searches for a window with a valid checksum and drops
      the bytes in front of it.
    """

    sync_mode: str = "checksum"
    # Bytes received but not yet consumed as a frame or dropped.
    buffer: bytearray = field(default_factory=bytearray)
    # Every accepted frame, in arrival order.
    frames: list[bytes] = field(default_factory=list)
    # Number of accepted frames per label.
    labels: Counter[str] = field(default_factory=Counter)
    # Total bytes discarded while resynchronising.
    dropped_bytes: int = 0
    # Number of non-empty drops performed.
    resync_events: int = 0

    def feed(self, data: bytes) -> list[tuple[bytes, str]]:
        """Append *data* to the buffer and return the newly detected frames.

        Returns:
            A list of ``(frame, label)`` tuples in detection order.

        Raises:
            ValueError: if ``sync_mode`` is neither ``"fixed"`` nor
                ``"checksum"``.
        """
        self.buffer.extend(data)
        if self.sync_mode == "fixed":
            return self._feed_fixed()
        if self.sync_mode != "checksum":
            raise ValueError(f"unknown frame sync mode {self.sync_mode!r}")
        return self._feed_checksum_resync()

    def _accept_frame(self) -> tuple[bytes, str]:
        """Pop the first ``FRAME_LENGTH`` buffered bytes and record them as a frame."""
        frame = bytes(self.buffer[:FRAME_LENGTH])
        del self.buffer[:FRAME_LENGTH]
        label = label_frame(frame)
        self.frames.append(frame)
        if label:
            self.labels[label] += 1
        return frame, label

    def _feed_fixed(self) -> list[tuple[bytes, str]]:
        """Consume the buffer in fixed ``FRAME_LENGTH`` steps, without checksum checks."""
        detected: list[tuple[bytes, str]] = []
        while len(self.buffer) >= FRAME_LENGTH:
            detected.append(self._accept_frame())
        return detected

    def _feed_checksum_resync(self) -> list[tuple[bytes, str]]:
        """Consume frames at checksum-valid offsets, dropping unaligned bytes."""
        detected: list[tuple[bytes, str]] = []
        while len(self.buffer) >= FRAME_LENGTH:
            offset = _next_sync_offset(self.buffer)
            if offset is None:
                # No valid window: keep only the last FRAME_LENGTH - 1 bytes,
                # which may still be the start of a frame that is incomplete.
                self._drop_unsynced_prefix(len(self.buffer) - (FRAME_LENGTH - 1))
                break
            if offset:
                self._drop_unsynced_prefix(offset)
            frame = bytes(self.buffer[:FRAME_LENGTH])
            if not frame_checksum_ok(frame):
                # Defensive: slide by one byte and search again.
                self._drop_unsynced_prefix(1)
                continue
            detected.append(self._accept_frame())
        return detected

    def _drop_unsynced_prefix(self, count: int) -> None:
        """Discard up to *count* leading buffer bytes and update the drop counters."""
        count = max(0, min(count, len(self.buffer)))
        if not count:
            return
        del self.buffer[:count]
        self.dropped_bytes += count
        self.resync_events += 1


class BenchLogger:
    """Write each log line both to a stream and to a capture file."""

    def __init__(self, path: Path, stdout: TextIO = sys.stdout) -> None:
        """Create the parent directory of *path* and open it for writing (UTF-8)."""
        self.path = path
        self.stdout = stdout
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.file = self.path.open("w", encoding="utf-8", newline="\n")

    def close(self) -> None:
        """Close the capture file."""
        self.file.close()

    def emit(self, line: str = "") -> None:
        """Print *line* to the stream and the file, flushing the file each time."""
        print(line, file=self.stdout)
        print(line, file=self.file)
        self.file.flush()

    def chunk(self, direction: str, data: bytes) -> None:
        """Log a timestamped hex dump of *data* tagged with *direction*."""
        self.emit(f"{timestamp()} {direction:<2} {len(data):03d} bytes {format_frame(data)}")

    def event(self, text: str) -> None:
        """Log a timestamped free-text event."""
        self.emit(f"{timestamp()} {text}")


def frame_checksum(data: bytes) -> int:
    """Return ``CHECKSUM_SEED`` XOR-ed with the first ``FRAME_LENGTH - 1`` bytes of *data*."""
    checksum = CHECKSUM_SEED
    for value in data[: FRAME_LENGTH - 1]:
        checksum ^= value
    return checksum & 0xFF


def frame_checksum_ok(frame: bytes) -> bool:
    """Return True when *frame* is ``FRAME_LENGTH`` bytes long and its last byte is the checksum."""
    return len(frame) == FRAME_LENGTH and frame_checksum(frame) == frame[-1]


def parse_frame(text: str) -> bytes:
    """Parse a frame given as hex text (argparse ``type=`` callable).

    Bytes may be separated by whitespace, ``,``, ``:``, ``-`` or ``_``.  A
    single token is treated as compact hex (optionally prefixed with ``0x``
    or ``H'``) and split into two-digit bytes.  When exactly five bytes are
    given the checksum byte is appended; six bytes are returned as written,
    without checking their checksum.

    Raises:
        argparse.ArgumentTypeError: on malformed tokens or a wrong byte count.
    """
    normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
    parts = normalized.split()
    if len(parts) == 1:
        compact = parts[0]
        if compact.lower().startswith("0x"):
            compact = compact[2:]
        if compact.upper().startswith("H'"):
            compact = compact[2:]
        if len(compact) % 2:
            raise argparse.ArgumentTypeError("compact frame hex must contain an even number of digits")
        parts = [compact[index : index + 2] for index in range(0, len(compact), 2)]
    values = [_parse_byte(part) for part in parts]
    if len(values) == FRAME_LENGTH - 1:
        values.append(frame_checksum(bytes(values)))
    if len(values) != FRAME_LENGTH:
        raise argparse.ArgumentTypeError("frame must contain five bytes plus computed checksum, or exactly six bytes")
    return bytes(values)


def format_frame(data: bytes) -> str:
    """Return *data* as upper-case hex bytes separated by single spaces."""
    return data.hex(" ").upper()


def label_frame(frame: bytes) -> str:
    """Return a descriptive label for *frame*.

    Exact matches in the known-frame table win.  Otherwise the label is
    derived from the checksum result and, for valid frames, the first byte.
    """
    label = _KNOWN_FRAME_LABELS.get(frame, "")
    if label:
        return label
    if frame_checksum_ok(frame):
        if frame[0] == 0x04:
            return "table_readback_candidate"
        if frame[0] == 0x07:
            return "visible_report_candidate"
        return "checksum_ok_unlabeled"
    return "checksum_bad_or_unaligned"


def _next_sync_offset(buffer: bytearray) -> int | None:
    """Return the buffer offset of the best checksum-valid window, or None.

    Offset 0 is accepted immediately when it is valid and does not look like
    a window straddling two heartbeats.  Otherwise every valid window is
    scored and the lowest ``(score, offset)`` pair wins.
    """
    scored_offsets: list[tuple[int, int]] = []
    for offset in range(0, len(buffer) - FRAME_LENGTH + 1):
        frame = bytes(buffer[offset : offset + FRAME_LENGTH])
        if not frame_checksum_ok(frame):
            continue
        if offset == 0 and not _looks_like_shifted_heartbeat(frame):
            return 0
        label = label_frame(frame)
        scored_offsets.append((_sync_score(frame, label), offset))
    if not scored_offsets:
        return None
    return min(scored_offsets)[1]


def _sync_score(frame: bytes, label: str) -> int:
    """Score a checksum-valid window; lower is a more trustworthy alignment."""
    if label and label not in _GENERIC_LABELS:
        return 0
    if frame[0] in {0x00, 0x02, 0x04, 0x07}:
        return 100
    return 200


def _looks_like_shifted_heartbeat(frame: bytes) -> bool:
    """Return True when *frame* is one of the known misaligned heartbeat windows."""
    return frame in _SHIFTED_HEARTBEATS


def default_log_path() -> Path:
    """Return a timestamped capture path under ``captures/``."""
    return Path("captures") / f"bench-connect-lcd-sequence-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser used by :func:`main`."""
    parser = argparse.ArgumentParser(
        description="Bench-test the emulator CONNECT LCD sequence against the real RCP over RS232."
    )
    parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
    parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
    add_serial_format_args(parser)
    parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
    parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
    parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test")
    parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
    parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
    parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
    parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the Pico relay port")
    parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending")
    parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
    parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")
    parser.add_argument("--frame-gap", type=float, default=0.150, help="seconds to listen between sent frames")
    parser.add_argument("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence")
    parser.add_argument("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session")
    parser.add_argument("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable")
    parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
    parser.add_argument("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames")
    parser.add_argument("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence")
    parser.add_argument("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending")
    parser.add_argument("--prompt-screen", action="store_true", help="prompt for observed LCD text after the sequence")
    parser.add_argument("--prompt-before-send", action="store_true", help="also prompt for LCD text before sending the sequence")
    parser.add_argument("--log", type=Path, help="capture log path")
    parser.add_argument("--dry-run", action="store_true", help="print the planned sequence without opening serial ports")
    return parser


def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
    """Run the bench sequence described by *argv*.

    With ``--dry-run`` the plan is printed to *stdout* and no serial port or
    log file is opened.

    Returns:
        0 on completion, 2 when ``--require-ready`` is set and the heartbeat
        threshold was not observed.
    """
    args = build_arg_parser().parse_args(argv)
    frames = _planned_frames(args)
    log_path = args.log or default_log_path()

    if args.dry_run:
        print(f"device={args.port} {args.baud} {serial_format_label(args)}", file=stdout)
        print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
        print(f"power_cycle={int(not args.no_power_cycle)} off={args.power_off_command!r} on={args.power_on_command!r}", file=stdout)
        for index, frame in enumerate(frames, start=1):
            print(f"frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout)
        if args.command7_after:
            print(f"command7_after={format_frame(COMMAND7_REPEAT_FRAME)} checksum_ok=1", file=stdout)
        print(f"log={log_path}", file=stdout)
        return 0

    serial = _import_serial()
    logger = BenchLogger(log_path, stdout=stdout)
    detector = FrameDetector(sync_mode=args.sync)
    try:
        logger.emit("CONNECT LCD bench sequence")
        logger.emit(
            f"device={args.port} {args.baud} {serial_format_label(args)} "
            f"relay={args.relay_port} {args.relay_baud}"
        )
        logger.emit(f"log={log_path}")
        for index, frame in enumerate(frames, start=1):
            logger.emit(f"plan frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}")

        with open_device_serial(serial, args) as device:
            relay = None
            try:
                if not args.no_power_cycle:
                    relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
                    _relay_settle(relay, args.relay_settle, logger)
                    _relay_command(relay, args.power_off_command, logger)
                    time.sleep(args.off_seconds)
                    # Discard anything received before power is re-applied so
                    # the capture starts from a clean state.
                    device.reset_input_buffer()
                    detector = FrameDetector(sync_mode=args.sync)
                    _relay_command(relay, args.power_on_command, logger)
                else:
                    device.reset_input_buffer()

                ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
                if args.require_ready and not ready:
                    logger.event("ABORT ready heartbeat threshold was not observed")
                    return 2
                if args.prompt_before_send:
                    _prompt_screen("LCD after boot/ready", logger)
                if args.pre_sequence_drain > 0:
                    logger.event(f"DRAIN before sequence {args.pre_sequence_drain:.3f}s")
                    _read_for(device, detector, logger, args.pre_sequence_drain)

                for repeat_index in range(args.repeat):
                    if args.repeat > 1:
                        logger.event(f"BEGIN repeat {repeat_index + 1}/{args.repeat}")
                    for frame_index, frame in enumerate(frames, start=1):
                        _send_frame(device, frame, logger, f"seq{repeat_index + 1}.frame{frame_index}")
                        _read_for(device, detector, logger, args.frame_gap)

                # NOTE(review): the probe, the post-sequence read and the
                # prompt are assumed to run once, after all repeats -- confirm
                # against the intended bench procedure.
                if args.command7_after:
                    _send_frame(device, COMMAND7_REPEAT_FRAME, logger, "command7_after")
                    _read_for(device, detector, logger, args.frame_gap)
                _read_for(device, detector, logger, args.post_sequence_read)
                if args.prompt_screen:
                    _prompt_screen("LCD after CONNECT sequence", logger)
            finally:
                if relay is not None:
                    relay.close()

        # NOTE(review): the summary is assumed to be written only on the
        # normal completion path (not on the ABORT return) -- confirm.
        _summary(detector, logger)
        return 0
    finally:
        logger.close()


def _planned_frames(args: argparse.Namespace) -> list[bytes]:
    """Return the frames to send: ``--frame`` overrides, else the preset (optionally first two)."""
    if args.frame:
        frames = list(args.frame)
    elif args.two_frame:
        frames = list(CONNECT_LCD_SEQUENCE[:2])
    else:
        frames = list(CONNECT_LCD_SEQUENCE)
    if not frames:
        raise SystemExit("no frames selected")
    return frames


def _import_serial():
    """Import and return the ``serial`` module, exiting with a hint when it is missing."""
    try:
        import serial
    except ImportError as exc:  # pragma: no cover - depends on local environment.
        raise SystemExit("pyserial is required; install it with: .\\.venv\\Scripts\\python.exe -m pip install pyserial") from exc
    return serial


def add_serial_format_args(parser: argparse.ArgumentParser) -> None:
    """Add the ``--parity`` option to *parser*."""
    parser.add_argument(
        "--parity",
        choices=SERIAL_PARITY_CHOICES,
        default="E",
        help="serial parity for the RCP link; ROM SCI1 setup uses even parity",
    )


def serial_format_label(args: argparse.Namespace) -> str:
    """Return the serial format as text, e.g. ``8E1`` (8 data bits, parity, 1 stop bit)."""
    return f"8{args.parity}1"


def open_device_serial(serial, args: argparse.Namespace):
    """Open the device port with 8 data bits, the chosen parity, 1 stop bit and a 50 ms read timeout."""
    return serial.Serial(args.port, args.baud, bytesize=8, parity=args.parity, stopbits=1, timeout=0.05)


def _send_frame(device, frame: bytes, logger: BenchLogger, label: str) -> None:
    """Write *frame* to *device*, flush it, and log the transmission under *label*."""
    device.write(frame)
    device.flush()
    logger.chunk("TX", frame)
    logger.event(f"SENT {label} checksum_ok={int(frame_checksum_ok(frame))}")


def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: float) -> None:
    """Read from *device* for *seconds*, logging raw chunks, detected frames and resyncs."""
    deadline = time.monotonic() + max(0.0, seconds)
    while time.monotonic() < deadline:
        waiting = getattr(device, "in_waiting", 0)
        # Read everything pending, or ask for one byte so the port's read
        # timeout paces the loop when nothing is pending.
        data = device.read(waiting or 1)
        if data:
            dropped_before = detector.dropped_bytes
            logger.chunk("RX", data)
            for frame, label in detector.feed(data):
                logger.event(f"DETECT {label} {format_frame(frame)}")
            dropped_now = detector.dropped_bytes - dropped_before
            if dropped_now:
                logger.event(
                    f"RESYNC dropped_bytes={dropped_now} total_dropped={detector.dropped_bytes} "
                    f"buffered={len(detector.buffer)}"
                )


def _wait_for_ready(
    device,
    detector: FrameDetector,
    logger: BenchLogger,
    timeout_seconds: float,
    ready_heartbeats: int,
) -> bool:
    """Read until *ready_heartbeats* new heartbeat frames are seen or the timeout expires.

    Returns:
        True when the heartbeat target was reached, False on timeout.
    """
    logger.event(f"WAIT ready heartbeat target={ready_heartbeats} timeout={timeout_seconds:.3f}s")
    start_count = detector.labels["heartbeat"]
    deadline = time.monotonic() + max(0.0, timeout_seconds)
    while time.monotonic() < deadline:
        _read_for(device, detector, logger, 0.100)
        if detector.labels["heartbeat"] - start_count >= ready_heartbeats:
            logger.event(f"READY heartbeat_count={detector.labels['heartbeat']}")
            return True
    logger.event(f"READY_TIMEOUT heartbeat_count={detector.labels['heartbeat']}")
    return False


def _relay_settle(relay, seconds: float, logger: BenchLogger) -> None:
    """Sleep for *seconds*, then log any lines the relay has sent."""
    time.sleep(max(0.0, seconds))
    _read_relay_lines(relay, logger, prefix="RELAY")


def _relay_command(relay, command: str, logger: BenchLogger) -> None:
    """Send *command* (newline-terminated) to the relay and log its reply lines."""
    relay.write((command.strip() + "\n").encode("utf-8"))
    relay.flush()
    logger.event(f"RELAY_TX {command.strip()}")
    _read_relay_lines(relay, logger, prefix="RELAY_RX")


def _read_relay_lines(relay, logger: BenchLogger, prefix: str) -> None:
    """Log relay lines until a read returns nothing or two seconds have passed."""
    deadline = time.monotonic() + 2.0
    while time.monotonic() < deadline:
        line = relay.readline()
        if not line:
            return
        logger.event(f"{prefix} {line.decode('utf-8', errors='replace').strip()}")


def _prompt_screen(label: str, logger: BenchLogger) -> None:
    """Ask the operator for the observed LCD text and log the answer."""
    note = input(f"{label}: type observed LCD text, or press Enter to skip: ").strip()
    logger.event(f"SCREEN {label}: {note or '(no note)'}")


def _summary(detector: FrameDetector, logger: BenchLogger) -> None:
    """Log frame, resync and per-label counters from *detector*."""
    logger.emit()
    logger.emit("Summary")
    logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}")
    logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
    for label, count in sorted(detector.labels.items()):
        logger.emit(f"{label}={count}")


def _parse_byte(text: str) -> int:
    """Parse one byte token: one or two hex digits, optionally prefixed with ``0x`` or ``H'``.

    Raises:
        argparse.ArgumentTypeError: if the token is empty, longer than two
            digits, or contains anything other than ASCII hex digits.
    """
    token = text.strip()
    if token.lower().startswith("0x"):
        token = token[2:]
    if token.upper().startswith("H'"):
        token = token[2:]
    # Validate explicitly: int(token, 16) would also accept a sign ("+f") or
    # non-ASCII digits, silently turning malformed input into a byte.
    if not token or len(token) > 2 or not _HEX_DIGITS.issuperset(token):
        raise argparse.ArgumentTypeError(f"invalid byte token {text!r}")
    return int(token, 16)


def timestamp() -> str:
    """Return the current local time as ``HH:MM:SS.mmm``."""
    return datetime.now().strftime("%H:%M:%S.%f")[:-3]


__all__ = [
    "COMMAND7_REPEAT_FRAME",
    "CONNECT_LCD_SEQUENCE",
    "FrameDetector",
    "build_arg_parser",
    "format_frame",
    "frame_checksum",
    "frame_checksum_ok",
    "label_frame",
    "main",
    "parse_frame",
]