1
0
Files
h8-536-decoder/h8536/serial_ack_probe.py
2026-05-26 00:48:28 +10:00

190 lines
9.5 KiB
Python

from __future__ import annotations
import argparse
import sys
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import TextIO
from .bench_connect_lcd import (
BenchLogger,
FrameDetector,
_import_serial,
_read_for,
_relay_command,
_relay_settle,
_send_frame,
_wait_for_ready,
format_frame,
frame_checksum_ok,
parse_frame,
)
DEFAULT_PRIME_FRAME = bytes.fromhex("01000000005B")
DEFAULT_TRIGGER_FRAME = bytes.fromhex("01000100005A")
DEFAULT_TARGET_FRAME = bytes.fromhex("07804020902D")
DEFAULT_ACK_FRAME = bytes.fromhex("05004000001F")
def default_log_path() -> Path:
return Path("captures") / f"serial-ack-probe-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Reproduce the command-1 visible retry burst and send one candidate ACK "
"after a target device frame is observed."
)
)
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")
parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before priming")
parser.add_argument("--prime-frame", type=parse_frame, default=DEFAULT_PRIME_FRAME, help="first host frame")
parser.add_argument("--prime-gap", type=float, default=1.25, help="seconds to listen after the prime frame")
parser.add_argument("--trigger-frame", type=parse_frame, default=DEFAULT_TRIGGER_FRAME, help="second host frame")
parser.add_argument("--target-frame", type=parse_frame, default=DEFAULT_TARGET_FRAME, help="device frame to wait for")
parser.add_argument("--target-timeout", type=float, default=3.0, help="seconds to wait for target after trigger")
parser.add_argument("--ack-frame", type=parse_frame, default=DEFAULT_ACK_FRAME, help="candidate ACK frame")
parser.add_argument("--ack-guard", type=float, default=0.020, help="seconds to wait after target before ACK")
parser.add_argument("--post-ack-read", type=float, default=3.0, help="seconds to listen after ACK")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--dry-run", action="store_true", help="print planned frames without opening serial ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
log_path = args.log or default_log_path()
if args.dry_run:
_print_plan(args, log_path, stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector()
try:
logger.emit("Serial ACK probe")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}")
logger.emit(f"log={log_path}")
_emit_plan(args, logger)
with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device:
relay = None
try:
if not args.no_power_cycle:
relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(relay, args.relay_settle, logger)
_relay_command(relay, args.power_off_command, logger)
time.sleep(args.off_seconds)
device.reset_input_buffer()
detector = FrameDetector()
_relay_command(relay, args.power_on_command, logger)
else:
device.reset_input_buffer()
ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
logger.event("ABORT ready heartbeat threshold was not observed")
return 2
if args.pre_drain > 0:
logger.event(f"DRAIN before ACK probe {args.pre_drain:.3f}s")
_read_for(device, detector, logger, args.pre_drain)
_send_frame(device, args.prime_frame, logger, "prime")
_read_for(device, detector, logger, args.prime_gap)
before_trigger = len(detector.frames)
_send_frame(device, args.trigger_frame, logger, "trigger")
target_seen = _wait_for_target(device, detector, logger, args.target_frame, args.target_timeout)
before_ack = len(detector.frames)
if not target_seen:
logger.event("TARGET_TIMEOUT no ACK sent")
_read_for(device, detector, logger, args.post_ack_read)
_summary(detector, logger, args.target_frame, before_trigger, before_ack, len(detector.frames))
return 3
if args.ack_guard > 0:
logger.event(f"ACK guard {args.ack_guard:.3f}s")
_read_for(device, detector, logger, args.ack_guard)
before_ack = len(detector.frames)
_send_frame(device, args.ack_frame, logger, "ack")
_read_for(device, detector, logger, args.post_ack_read)
_summary(detector, logger, args.target_frame, before_trigger, before_ack, len(detector.frames))
finally:
if relay is not None:
relay.close()
return 0
finally:
logger.close()
def _wait_for_target(device, detector: FrameDetector, logger: BenchLogger, target: bytes, timeout_seconds: float) -> bool:
logger.event(f"WAIT target={format_frame(target)} timeout={timeout_seconds:.3f}s")
start_index = len(detector.frames)
deadline = time.monotonic() + max(0.0, timeout_seconds)
while time.monotonic() < deadline:
before = len(detector.frames)
_read_for(device, detector, logger, 0.050)
for frame in detector.frames[max(start_index, before) :]:
if frame == target:
logger.event(f"TARGET seen {format_frame(frame)}")
return True
return False
def _summary(detector: FrameDetector, logger: BenchLogger, target: bytes, trigger_start: int, ack_start: int, end: int) -> None:
trigger_frames = detector.frames[trigger_start:ack_start]
post_ack_frames = detector.frames[ack_start:end]
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}")
logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
logger.emit(f"target_before_ack={sum(1 for frame in trigger_frames if frame == target)}")
logger.emit(f"target_after_ack={sum(1 for frame in post_ack_frames if frame == target)}")
labels = Counter(label for label in detector.labels.elements())
for label, count in sorted(labels.items()):
logger.emit(f"{label}={count}")
def _emit_plan(args: argparse.Namespace, logger: BenchLogger) -> None:
logger.emit(f"prime={format_frame(args.prime_frame)} checksum_ok={int(frame_checksum_ok(args.prime_frame))}")
logger.emit(f"trigger={format_frame(args.trigger_frame)} checksum_ok={int(frame_checksum_ok(args.trigger_frame))}")
logger.emit(f"target={format_frame(args.target_frame)} checksum_ok={int(frame_checksum_ok(args.target_frame))}")
logger.emit(f"ack={format_frame(args.ack_frame)} checksum_ok={int(frame_checksum_ok(args.ack_frame))}")
def _print_plan(args: argparse.Namespace, log_path: Path, stdout: TextIO) -> None:
print(f"device={args.port} {args.baud} 8N1", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout)
print(f"prime={format_frame(args.prime_frame)} checksum_ok={int(frame_checksum_ok(args.prime_frame))}", file=stdout)
print(f"trigger={format_frame(args.trigger_frame)} checksum_ok={int(frame_checksum_ok(args.trigger_frame))}", file=stdout)
print(f"target={format_frame(args.target_frame)} checksum_ok={int(frame_checksum_ok(args.target_frame))}", file=stdout)
print(f"ack={format_frame(args.ack_frame)} checksum_ok={int(frame_checksum_ok(args.ack_frame))}", file=stdout)
print(f"log={log_path}", file=stdout)
__all__ = [
"DEFAULT_ACK_FRAME",
"DEFAULT_PRIME_FRAME",
"DEFAULT_TARGET_FRAME",
"DEFAULT_TRIGGER_FRAME",
"build_arg_parser",
"main",
]