from __future__ import annotations import argparse import sys import time from collections import Counter from datetime import datetime from pathlib import Path from typing import TextIO from .bench_connect_lcd import ( BenchLogger, FrameDetector, _import_serial, _read_for, _relay_command, _relay_settle, _send_frame, _wait_for_ready, format_frame, frame_checksum_ok, parse_frame, ) DEFAULT_PRIME_FRAME = bytes.fromhex("01000000005B") DEFAULT_TRIGGER_FRAME = bytes.fromhex("01000100005A") DEFAULT_TARGET_FRAME = bytes.fromhex("07804020902D") DEFAULT_ACK_FRAME = bytes.fromhex("05004000001F") def default_log_path() -> Path: return Path("captures") / f"serial-ack-probe-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description=( "Reproduce the command-1 visible retry burst and send one candidate ACK " "after a target device frame is observed." ) ) parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP") parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate") parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port") parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate") parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test") parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power") parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power") parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off") parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port") parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending") parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending") parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed") parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before priming") parser.add_argument("--prime-frame", type=parse_frame, default=DEFAULT_PRIME_FRAME, help="first host frame") parser.add_argument("--prime-gap", type=float, default=1.25, help="seconds to listen after the prime frame") parser.add_argument("--trigger-frame", type=parse_frame, default=DEFAULT_TRIGGER_FRAME, help="second host frame") parser.add_argument("--target-frame", type=parse_frame, default=DEFAULT_TARGET_FRAME, help="device frame to wait for") parser.add_argument("--target-timeout", type=float, default=3.0, help="seconds to wait for target after trigger") parser.add_argument("--ack-frame", type=parse_frame, default=DEFAULT_ACK_FRAME, help="candidate ACK frame") parser.add_argument("--ack-guard", type=float, default=0.020, help="seconds to wait after target before ACK") parser.add_argument("--post-ack-read", type=float, default=3.0, help="seconds to listen after ACK") parser.add_argument("--log", type=Path, help="capture log path") parser.add_argument("--dry-run", action="store_true", help="print planned frames without opening serial ports") return parser def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: args = build_arg_parser().parse_args(argv) log_path = args.log or default_log_path() if args.dry_run: _print_plan(args, log_path, stdout) return 0 serial = _import_serial() logger = BenchLogger(log_path, stdout=stdout) detector = FrameDetector() try: logger.emit("Serial ACK probe") logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}") logger.emit(f"log={log_path}") _emit_plan(args, logger) with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device: relay = None try: if not args.no_power_cycle: relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25) _relay_settle(relay, args.relay_settle, logger) _relay_command(relay, args.power_off_command, logger) time.sleep(args.off_seconds) device.reset_input_buffer() detector = FrameDetector() _relay_command(relay, args.power_on_command, logger) else: device.reset_input_buffer() ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats) if args.require_ready and not ready: logger.event("ABORT ready heartbeat threshold was not observed") return 2 if args.pre_drain > 0: logger.event(f"DRAIN before ACK probe {args.pre_drain:.3f}s") _read_for(device, detector, logger, args.pre_drain) _send_frame(device, args.prime_frame, logger, "prime") _read_for(device, detector, logger, args.prime_gap) before_trigger = len(detector.frames) _send_frame(device, args.trigger_frame, logger, "trigger") target_seen = _wait_for_target(device, detector, logger, args.target_frame, args.target_timeout) before_ack = len(detector.frames) if not target_seen: logger.event("TARGET_TIMEOUT no ACK sent") _read_for(device, detector, logger, args.post_ack_read) _summary(detector, logger, args.target_frame, before_trigger, before_ack, len(detector.frames)) return 3 if args.ack_guard > 0: logger.event(f"ACK guard {args.ack_guard:.3f}s") _read_for(device, detector, logger, args.ack_guard) before_ack = len(detector.frames) _send_frame(device, args.ack_frame, logger, "ack") _read_for(device, detector, logger, args.post_ack_read) _summary(detector, logger, args.target_frame, before_trigger, before_ack, len(detector.frames)) finally: if relay is not None: relay.close() return 0 finally: logger.close() def _wait_for_target(device, detector: FrameDetector, logger: BenchLogger, target: bytes, timeout_seconds: float) -> bool: logger.event(f"WAIT target={format_frame(target)} timeout={timeout_seconds:.3f}s") start_index = len(detector.frames) deadline = time.monotonic() + max(0.0, timeout_seconds) while time.monotonic() < deadline: before = len(detector.frames) _read_for(device, detector, logger, 0.050) for frame in detector.frames[max(start_index, before) :]: if frame == target: logger.event(f"TARGET seen {format_frame(frame)}") return True return False def _summary(detector: FrameDetector, logger: BenchLogger, target: bytes, trigger_start: int, ack_start: int, end: int) -> None: trigger_frames = detector.frames[trigger_start:ack_start] post_ack_frames = detector.frames[ack_start:end] logger.emit() logger.emit("Summary") logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}") logger.emit(f"target_before_ack={sum(1 for frame in trigger_frames if frame == target)}") logger.emit(f"target_after_ack={sum(1 for frame in post_ack_frames if frame == target)}") labels = Counter(label for label in detector.labels.elements()) for label, count in sorted(labels.items()): logger.emit(f"{label}={count}") def _emit_plan(args: argparse.Namespace, logger: BenchLogger) -> None: logger.emit(f"prime={format_frame(args.prime_frame)} checksum_ok={int(frame_checksum_ok(args.prime_frame))}") logger.emit(f"trigger={format_frame(args.trigger_frame)} checksum_ok={int(frame_checksum_ok(args.trigger_frame))}") logger.emit(f"target={format_frame(args.target_frame)} checksum_ok={int(frame_checksum_ok(args.target_frame))}") logger.emit(f"ack={format_frame(args.ack_frame)} checksum_ok={int(frame_checksum_ok(args.ack_frame))}") def _print_plan(args: argparse.Namespace, log_path: Path, stdout: TextIO) -> None: print(f"device={args.port} {args.baud} 8N1", file=stdout) print(f"relay={args.relay_port} {args.relay_baud}", file=stdout) print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout) print(f"prime={format_frame(args.prime_frame)} checksum_ok={int(frame_checksum_ok(args.prime_frame))}", file=stdout) print(f"trigger={format_frame(args.trigger_frame)} checksum_ok={int(frame_checksum_ok(args.trigger_frame))}", file=stdout) print(f"target={format_frame(args.target_frame)} checksum_ok={int(frame_checksum_ok(args.target_frame))}", file=stdout) print(f"ack={format_frame(args.ack_frame)} checksum_ok={int(frame_checksum_ok(args.ack_frame))}", file=stdout) print(f"log={log_path}", file=stdout) __all__ = [ "DEFAULT_ACK_FRAME", "DEFAULT_PRIME_FRAME", "DEFAULT_TARGET_FRAME", "DEFAULT_TRIGGER_FRAME", "build_arg_parser", "main", ]