# File viewer header residue: 461 lines, 18 KiB, Python.
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
import time
|
|
from collections import Counter
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Iterable, TextIO
|
|
|
|
|
|
# Initial value that frame_checksum() XORs the payload bytes into.
CHECKSUM_SEED = 0x5A
# Length of one frame in bytes; the last byte is compared against the checksum.
FRAME_LENGTH = 6
# Values accepted by the --parity command-line option.
SERIAL_PARITY_CHOICES = ("N", "E", "O")

# Preset frames sent, in this order, when no --frame override is given.
CONNECT_LCD_SEQUENCE = tuple(
    bytes.fromhex(hex_text)
    for hex_text in (
        "04000040001E",
        "0400008000DE",
        "040000C0009E",
    )
)
# Optional probe frame sent when --command7-after is set.
COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")
|
|
|
|
|
|
@dataclass
class FrameDetector:
    """Incrementally split a received byte stream into FRAME_LENGTH-byte frames.

    ``sync_mode`` selects how :meth:`feed` frames the data: ``"fixed"`` slices
    the buffer every ``FRAME_LENGTH`` bytes, while ``"checksum"`` looks for an
    offset whose frame passes the checksum and discards the bytes before it.
    """

    # Framing strategy: "checksum" or "fixed".
    sync_mode: str = "checksum"
    # Received bytes that have not been framed yet.
    buffer: bytearray = field(default_factory=bytearray)
    # Every frame extracted so far, in arrival order.
    frames: list[bytes] = field(default_factory=list)
    # Number of extracted frames per label.
    labels: Counter[str] = field(default_factory=Counter)
    # Total bytes discarded while resynchronising.
    dropped_bytes: int = 0
    # Number of discard operations that removed at least one byte.
    resync_events: int = 0

    def feed(self, data: bytes) -> list[tuple[bytes, str]]:
        """Buffer *data* and return the ``(frame, label)`` pairs now complete.

        Raises:
            ValueError: if ``sync_mode`` is neither ``"fixed"`` nor
                ``"checksum"``. The mode is checked before *data* is buffered,
                so a rejected call leaves the detector unchanged.
        """
        if self.sync_mode == "fixed":
            extract = self._feed_fixed
        elif self.sync_mode == "checksum":
            extract = self._feed_checksum_resync
        else:
            raise ValueError(f"unknown frame sync mode {self.sync_mode!r}")
        self.buffer.extend(data)
        return extract()

    def _feed_fixed(self) -> list[tuple[bytes, str]]:
        """Cut the buffer into consecutive frames without checking alignment."""
        detected = []
        while len(self.buffer) >= FRAME_LENGTH:
            frame = bytes(self.buffer[:FRAME_LENGTH])
            del self.buffer[:FRAME_LENGTH]
            detected.append(self._record(frame))
        return detected

    def _feed_checksum_resync(self) -> list[tuple[bytes, str]]:
        """Extract frames at checksum-valid offsets, dropping bytes in between."""
        detected = []
        while len(self.buffer) >= FRAME_LENGTH:
            offset = _next_sync_offset(self.buffer)
            if offset is None:
                # No valid frame anywhere: keep only the last FRAME_LENGTH - 1
                # bytes, which may be the start of a frame still arriving.
                self._drop_unsynced_prefix(len(self.buffer) - (FRAME_LENGTH - 1))
                break
            if offset:
                self._drop_unsynced_prefix(offset)
            frame = bytes(self.buffer[:FRAME_LENGTH])
            if not frame_checksum_ok(frame):
                # Defensive: slide one byte and search again.
                self._drop_unsynced_prefix(1)
                continue
            del self.buffer[:FRAME_LENGTH]
            detected.append(self._record(frame))
        return detected

    def _record(self, frame: bytes) -> tuple[bytes, str]:
        """Store *frame*, count its label, and return the ``(frame, label)`` pair."""
        label = label_frame(frame)
        self.frames.append(frame)
        if label:
            self.labels[label] += 1
        return frame, label

    def _drop_unsynced_prefix(self, count: int) -> None:
        """Discard up to *count* leading buffer bytes and update the counters.

        *count* is clamped to ``0..len(buffer)``; a clamped count of zero is a
        no-op and is not counted as a resync event.
        """
        count = max(0, min(count, len(self.buffer)))
        if not count:
            return
        del self.buffer[:count]
        self.dropped_bytes += count
        self.resync_events += 1
|
|
|
|
|
|
class BenchLogger:
    """Write every log line to both a console stream and a capture file."""

    def __init__(self, path: Path, stdout: TextIO | None = None) -> None:
        """Create the log file at *path*, creating parent directories as needed.

        Args:
            path: capture file location; an existing file is truncated.
            stdout: console stream. When omitted, ``sys.stdout`` is looked up
                at construction time rather than at import time, so a
                redirected ``sys.stdout`` is honoured.
        """
        self.path = path
        self.stdout = sys.stdout if stdout is None else stdout
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.file = self.path.open("w", encoding="utf-8", newline="\n")

    def close(self) -> None:
        """Close the capture file."""
        self.file.close()

    def emit(self, line: str = "") -> None:
        """Print *line* to the console stream and the file, then flush the file."""
        print(line, file=self.stdout)
        print(line, file=self.file)
        # Flush per line so the capture is complete even if the run is interrupted.
        self.file.flush()

    def chunk(self, direction: str, data: bytes) -> None:
        """Log a timestamped raw data chunk with its direction and byte count."""
        self.emit(f"{timestamp()} {direction:<2} {len(data):03d} bytes {format_frame(data)}")

    def event(self, text: str) -> None:
        """Log a timestamped free-text event."""
        self.emit(f"{timestamp()} {text}")
|
|
|
|
|
|
def frame_checksum(data: bytes) -> int:
    """Return the checksum byte for the first ``FRAME_LENGTH - 1`` bytes of *data*.

    The result is ``CHECKSUM_SEED`` XORed with each of those bytes, masked to
    eight bits. Bytes beyond the payload length are ignored.
    """
    payload = data[: FRAME_LENGTH - 1]
    result = CHECKSUM_SEED
    for byte in payload:
        result = result ^ byte
    return result & 0xFF
|
|
|
|
|
|
def frame_checksum_ok(frame: bytes) -> bool:
    """Return True when *frame* is exactly one frame long and its last byte matches the checksum."""
    if len(frame) != FRAME_LENGTH:
        return False
    return frame[-1] == frame_checksum(frame)
|
|
|
|
|
|
def parse_frame(text: str) -> bytes:
    """Parse a command-line frame specification into ``FRAME_LENGTH`` bytes.

    Accepts byte tokens separated by whitespace, ``,``, ``:``, ``-`` or ``_``,
    or a single compact run of hex digits (optionally prefixed ``0x`` / ``H'``).
    When one byte fewer than a full frame is given, the checksum is appended.

    Raises:
        argparse.ArgumentTypeError: for an odd-length compact string, an
            invalid byte token, or a wrong number of bytes.
    """
    separators_to_spaces = str.maketrans(",:-_", "    ")
    tokens = text.strip().translate(separators_to_spaces).split()

    if len(tokens) == 1:
        # A single token is a compact hex string: strip prefixes, then pair up digits.
        digits = tokens[0]
        if digits.lower().startswith("0x"):
            digits = digits[2:]
        if digits.upper().startswith("H'"):
            digits = digits[2:]
        if len(digits) % 2:
            raise argparse.ArgumentTypeError("compact frame hex must contain an even number of digits")
        tokens = [digits[start : start + 2] for start in range(0, len(digits), 2)]

    byte_values = []
    for token in tokens:
        byte_values.append(_parse_byte(token))

    if len(byte_values) == FRAME_LENGTH - 1:
        byte_values.append(frame_checksum(bytes(byte_values)))
    if len(byte_values) != FRAME_LENGTH:
        raise argparse.ArgumentTypeError("frame must contain five bytes plus computed checksum, or exactly six bytes")
    return bytes(byte_values)
|
|
|
|
|
|
def format_frame(data: bytes) -> str:
    """Render *data* as space-separated, upper-case, two-digit hex bytes."""
    return " ".join(f"{byte:02X}" for byte in data)
|
|
|
|
|
|
# Exact-match labels for known frames. Built once at import time instead of on
# every call, because label_frame() runs once per candidate offset during resync.
_KNOWN_FRAME_LABELS = {
    bytes.fromhex("0000000080DA"): "heartbeat",
    bytes.fromhex("02000200005A"): "connect_ok_path_response_candidate",
    bytes.fromhex("010002000059"): "connect_c0_path_response_candidate",
    bytes.fromhex("07804040A07D"): "visible_40A0_family_40",
    bytes.fromhex("07808040A0BD"): "visible_40A0_family_80",
    bytes.fromhex("0780C040A0FD"): "visible_40A0_family_C0",
    bytes.fromhex("07804020902D"): "visible_retry_0040_2090_candidate",
    bytes.fromhex("0780C060205D"): "visible_C0_6020_family_candidate",
    bytes.fromhex("0000158000CF"): "known_call_button_active_report",
    bytes.fromhex("00001500004F"): "known_call_button_inactive_report",
    bytes.fromhex("0000078000DD"): "known_cam_power_button_report",
    bytes.fromhex("01000400005F"): "gated_active_0004_response_candidate",
    bytes.fromhex("02000400005C"): "gated_active_0004_transition_candidate",
}


def label_frame(frame: bytes) -> str:
    """Return a descriptive label for *frame*.

    Known frames get their specific label. Any other frame with a valid
    checksum is labelled by its first byte (``0x04`` or ``0x07``) or as
    ``"checksum_ok_unlabeled"``; everything else is
    ``"checksum_bad_or_unaligned"``. The result is never empty.
    """
    label = _KNOWN_FRAME_LABELS.get(frame, "")
    if label:
        return label
    if frame_checksum_ok(frame):
        if frame[0] == 0x04:
            return "table_readback_candidate"
        if frame[0] == 0x07:
            return "visible_report_candidate"
        return "checksum_ok_unlabeled"
    return "checksum_bad_or_unaligned"
|
|
|
|
|
|
def _next_sync_offset(buffer: bytearray) -> int | None:
    """Return the buffer offset at which the next frame should be taken.

    Offset 0 wins immediately when its frame passes the checksum and is not a
    shifted heartbeat. Otherwise every checksum-valid offset is scored with
    ``_sync_score`` and the lowest score wins, ties going to the lowest offset.
    Returns None when no offset holds a checksum-valid frame.
    """
    candidates: list[tuple[int, int]] = []
    last_start = len(buffer) - FRAME_LENGTH
    for start in range(last_start + 1):
        window = bytes(buffer[start : start + FRAME_LENGTH])
        if not frame_checksum_ok(window):
            continue
        if start == 0 and not _looks_like_shifted_heartbeat(window):
            return 0
        score = _sync_score(window, label_frame(window))
        candidates.append((score, start))
    if candidates:
        # Tuples compare by score first, then by offset.
        return min(candidates)[1]
    return None
|
|
|
|
|
|
def _sync_score(frame: bytes, label: str) -> int:
    """Rank a candidate frame for resynchronisation; lower is better.

    Returns 0 for a frame with a specific label, 100 for a generic or empty
    label whose first byte is 0x00, 0x02, 0x04 or 0x07, and 200 otherwise.
    """
    generic_labels = ("checksum_ok_unlabeled", "checksum_bad_or_unaligned")
    if label and label not in generic_labels:
        return 0
    return 100 if frame[0] in (0x00, 0x02, 0x04, 0x07) else 200
|
|
|
|
|
|
def _looks_like_shifted_heartbeat(frame: bytes) -> bool:
    """Return True when *frame* is a misaligned window over repeated heartbeats.

    The heartbeat frame is ``00 00 00 00 80 DA``. A window that starts 1 to 5
    bytes into a run of back-to-back heartbeats is a rotation of that frame,
    and every rotation still passes the XOR checksum, so the checksum alone
    cannot reject it. All five rotations are listed, including
    ``DA 00 00 00 00 80`` (a window starting on the checksum byte).
    """
    return frame in {
        bytes.fromhex("00000080DA00"),
        bytes.fromhex("000080DA0000"),
        bytes.fromhex("0080DA000000"),
        bytes.fromhex("80DA00000000"),
        bytes.fromhex("DA0000000080"),
    }
|
|
|
|
|
|
def default_log_path() -> Path:
    """Return a capture path under ``captures/`` named with the current local date and time."""
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return Path("captures") / f"bench-connect-lcd-sequence-{stamp}.txt"
|
|
|
|
|
|
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the bench script.

    Options are registered in a fixed order, which is also the order they
    appear in ``--help``.
    """
    cli = argparse.ArgumentParser(
        description="Bench-test the emulator CONNECT LCD sequence against the real RCP over RS232."
    )
    add = cli.add_argument

    # Device serial link.
    add("--port", default="COM5", help="RS232 serial port connected to the RCP")
    add("--baud", type=int, default=38400, help="RCP serial baud rate")
    add_serial_format_args(cli)

    # Relay and power cycling.
    add("--relay-port", default="COM6", help="Pico relay serial port")
    add("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
    add("--no-power-cycle", action="store_true", help="do not send relay off/on before the test")
    add("--power-off-command", default="off", help="relay command used to remove DUT power")
    add("--power-on-command", default="on", help="relay command used to apply DUT power")
    add("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
    add("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the Pico relay port")

    # Readiness detection.
    add("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending")
    add("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
    add("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")

    # Sequence content and timing.
    add("--frame-gap", type=float, default=0.150, help="seconds to listen between sent frames")
    add("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence")
    add("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session")
    add("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable")
    add("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
    add("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames")
    add("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence")
    add("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending")

    # Operator prompts, logging and dry run.
    add("--prompt-screen", action="store_true", help="prompt for observed LCD text after the sequence")
    add("--prompt-before-send", action="store_true", help="also prompt for LCD text before sending the sequence")
    add("--log", type=Path, help="capture log path")
    add("--dry-run", action="store_true", help="print the planned sequence without opening serial ports")
    return cli
|
|
|
|
|
|
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
    """Run the CONNECT LCD bench sequence, or print the plan with ``--dry-run``.

    Args:
        argv: command-line arguments; None lets argparse read ``sys.argv``.
        stdout: stream for dry-run output and for the console copy of the log.

    Returns:
        0 when the run (or dry run) completes, 2 when ``--require-ready`` is
        set and the ready heartbeat threshold is not observed.
    """
    # NOTE(review): block nesting was reconstructed from a copy of this file
    # that had lost its indentation. Confirm against the original whether the
    # command7_after probe sits inside the repeat loop and whether _summary()
    # runs inside or after the device ``with`` block.
    args = build_arg_parser().parse_args(argv)
    frames = _planned_frames(args)
    log_path = args.log or default_log_path()

    if args.dry_run:
        # Describe the plan only: no log file is created and no port is opened.
        print(f"device={args.port} {args.baud} {serial_format_label(args)}", file=stdout)
        print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
        print(f"power_cycle={int(not args.no_power_cycle)} off={args.power_off_command!r} on={args.power_on_command!r}", file=stdout)
        for index, frame in enumerate(frames, start=1):
            print(f"frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout)
        if args.command7_after:
            print(f"command7_after={format_frame(COMMAND7_REPEAT_FRAME)} checksum_ok=1", file=stdout)
        print(f"log={log_path}", file=stdout)
        return 0

    serial = _import_serial()
    logger = BenchLogger(log_path, stdout=stdout)
    detector = FrameDetector(sync_mode=args.sync)
    try:
        # Log header: configuration and the planned frames.
        logger.emit("CONNECT LCD bench sequence")
        logger.emit(
            f"device={args.port} {args.baud} {serial_format_label(args)} "
            f"relay={args.relay_port} {args.relay_baud}"
        )
        logger.emit(f"log={log_path}")
        for index, frame in enumerate(frames, start=1):
            logger.emit(f"plan frame[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}")

        with open_device_serial(serial, args) as device:
            relay = None
            try:
                if not args.no_power_cycle:
                    # Power-cycle the DUT through the relay before listening.
                    relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
                    _relay_settle(relay, args.relay_settle, logger)
                    _relay_command(relay, args.power_off_command, logger)
                    time.sleep(args.off_seconds)
                    # Discard input received before power-on and start with a
                    # fresh detector so the counters cover this session only.
                    device.reset_input_buffer()
                    detector = FrameDetector(sync_mode=args.sync)
                    _relay_command(relay, args.power_on_command, logger)
                else:
                    device.reset_input_buffer()

                ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
                if args.require_ready and not ready:
                    logger.event("ABORT ready heartbeat threshold was not observed")
                    return 2
                if args.prompt_before_send:
                    _prompt_screen("LCD after boot/ready", logger)
                if args.pre_sequence_drain > 0:
                    logger.event(f"DRAIN before sequence {args.pre_sequence_drain:.3f}s")
                    _read_for(device, detector, logger, args.pre_sequence_drain)

                for repeat_index in range(args.repeat):
                    if args.repeat > 1:
                        logger.event(f"BEGIN repeat {repeat_index + 1}/{args.repeat}")
                    # Send each frame, then listen for frame_gap seconds.
                    for frame_index, frame in enumerate(frames, start=1):
                        _send_frame(device, frame, logger, f"seq{repeat_index + 1}.frame{frame_index}")
                        _read_for(device, detector, logger, args.frame_gap)
                    if args.command7_after:
                        _send_frame(device, COMMAND7_REPEAT_FRAME, logger, "command7_after")
                        _read_for(device, detector, logger, args.frame_gap)

                _read_for(device, detector, logger, args.post_sequence_read)
                if args.prompt_screen:
                    _prompt_screen("LCD after CONNECT sequence", logger)
            finally:
                # The relay port is only opened when power cycling is enabled.
                if relay is not None:
                    relay.close()
        _summary(detector, logger)
        return 0
    finally:
        # Close the log file on every exit path, including the early return 2.
        logger.close()
|
|
|
|
|
|
def _planned_frames(args: argparse.Namespace) -> list[bytes]:
    """Return the frames to transmit for this run.

    Custom ``--frame`` values take priority; otherwise the preset sequence is
    used, shortened to its first two frames when ``--two-frame`` is set.

    Raises:
        SystemExit: if the selection is empty.
    """
    if args.frame:
        selected = list(args.frame)
    else:
        preset = CONNECT_LCD_SEQUENCE[:2] if args.two_frame else CONNECT_LCD_SEQUENCE
        selected = list(preset)
    if selected:
        return selected
    raise SystemExit("no frames selected")
|
|
|
|
|
|
def _import_serial():
    """Import and return the ``serial`` module, exiting with an install hint if it is missing.

    The import is deferred to call time so ``--dry-run`` works without it.
    """
    try:
        import serial as serial_module
    except ImportError as exc:  # pragma: no cover - depends on local environment.
        raise SystemExit("pyserial is required; install it with: .\\.venv\\Scripts\\python.exe -m pip install pyserial") from exc
    else:
        return serial_module
|
|
|
|
|
|
def add_serial_format_args(parser: argparse.ArgumentParser) -> None:
    """Register the ``--parity`` option (default ``"E"``) on *parser*."""
    parity_option = {
        "choices": SERIAL_PARITY_CHOICES,
        "default": "E",
        "help": "serial parity for the RCP link; ROM SCI1 setup uses even parity",
    }
    parser.add_argument("--parity", **parity_option)
|
|
|
|
|
|
def serial_format_label(args: argparse.Namespace) -> str:
    """Return the serial format as text: ``8``, the parity letter, then ``1``."""
    return "8{}1".format(args.parity)
|
|
|
|
|
|
def open_device_serial(serial, args: argparse.Namespace):
    """Open the device port via ``serial.Serial`` and return the port object.

    Uses 8 data bits, the parity from *args*, 1 stop bit and a 0.05 timeout.
    """
    link_settings = {
        "bytesize": 8,
        "parity": args.parity,
        "stopbits": 1,
        "timeout": 0.05,
    }
    return serial.Serial(args.port, args.baud, **link_settings)
|
|
|
|
|
|
def _send_frame(device, frame: bytes, logger: BenchLogger, label: str) -> None:
    """Write *frame* to *device*, flush it, and log the TX chunk and a SENT event."""
    device.write(frame)
    device.flush()
    logger.chunk("TX", frame)
    checksum_flag = int(frame_checksum_ok(frame))
    logger.event(f"SENT {label} checksum_ok={checksum_flag}")
|
|
|
|
|
|
def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: float) -> None:
    """Read from *device* for *seconds*, logging chunks, detected frames and resyncs.

    Each non-empty read is logged as an RX chunk and fed to *detector*; every
    detected frame gets a DETECT event, and a RESYNC event follows when the
    feed discarded bytes. A non-positive *seconds* performs no read.
    """
    stop_at = time.monotonic() + max(0.0, seconds)
    while time.monotonic() < stop_at:
        # Read everything already pending, or block for one byte until the port timeout.
        pending = getattr(device, "in_waiting", 0)
        received = device.read(pending or 1)
        if not received:
            continue
        dropped_at_start = detector.dropped_bytes
        logger.chunk("RX", received)
        for frame, label in detector.feed(received):
            logger.event(f"DETECT {label} {format_frame(frame)}")
        newly_dropped = detector.dropped_bytes - dropped_at_start
        if newly_dropped:
            logger.event(
                f"RESYNC dropped_bytes={newly_dropped} total_dropped={detector.dropped_bytes} "
                f"buffered={len(detector.buffer)}"
            )
|
|
|
|
|
|
def _wait_for_ready(
    device,
    detector: FrameDetector,
    logger: BenchLogger,
    timeout_seconds: float,
    ready_heartbeats: int,
) -> bool:
    """Wait until *ready_heartbeats* new heartbeat frames have been detected.

    Reads in 0.1 second slices and counts only heartbeats seen after this call
    started. Returns True once the target is reached, or False after logging
    READY_TIMEOUT when *timeout_seconds* elapses first.
    """
    logger.event(f"WAIT ready heartbeat target={ready_heartbeats} timeout={timeout_seconds:.3f}s")
    baseline = detector.labels["heartbeat"]
    stop_at = time.monotonic() + max(0.0, timeout_seconds)
    while time.monotonic() < stop_at:
        _read_for(device, detector, logger, 0.100)
        seen_since_start = detector.labels["heartbeat"] - baseline
        if seen_since_start < ready_heartbeats:
            continue
        logger.event(f"READY heartbeat_count={detector.labels['heartbeat']}")
        return True
    logger.event(f"READY_TIMEOUT heartbeat_count={detector.labels['heartbeat']}")
    return False
|
|
|
|
|
|
def _relay_settle(relay, seconds: float, logger: BenchLogger) -> None:
    """Sleep for *seconds* (never negative), then log any lines the relay sent."""
    pause = max(0.0, seconds)
    time.sleep(pause)
    _read_relay_lines(relay, logger, prefix="RELAY")
|
|
|
|
|
|
def _relay_command(relay, command: str, logger: BenchLogger) -> None:
    """Send the stripped *command* plus a newline to the relay and log its reply lines."""
    command_text = command.strip()
    wire_bytes = (command_text + "\n").encode("utf-8")
    relay.write(wire_bytes)
    relay.flush()
    logger.event(f"RELAY_TX {command_text}")
    _read_relay_lines(relay, logger, prefix="RELAY_RX")
|
|
|
|
|
|
def _read_relay_lines(relay, logger: BenchLogger, prefix: str) -> None:
    """Log lines read from *relay* until an empty read or two seconds have passed.

    Each line is decoded as UTF-8 (undecodable bytes replaced), stripped, and
    logged as an event starting with *prefix*.
    """
    stop_at = time.monotonic() + 2.0
    while time.monotonic() < stop_at:
        raw_line = relay.readline()
        if not raw_line:
            # An empty read means the relay has nothing more to say.
            break
        decoded = raw_line.decode("utf-8", errors="replace").strip()
        logger.event(f"{prefix} {decoded}")
|
|
|
|
|
|
def _prompt_screen(label: str, logger: BenchLogger) -> None:
    """Ask the operator for the observed LCD text and log it as a SCREEN event.

    An empty answer is logged as ``(no note)``.
    """
    prompt = f"{label}: type observed LCD text, or press Enter to skip: "
    note = input(prompt).strip()
    if not note:
        note = "(no note)"
    logger.event(f"SCREEN {label}: {note}")
|
|
|
|
|
|
def _summary(detector: FrameDetector, logger: BenchLogger) -> None:
    """Emit the end-of-run summary: frame and resync totals, then per-label counts sorted by label."""
    logger.emit()
    totals = [
        "Summary",
        f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}",
        f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}",
    ]
    per_label = [f"{name}={total}" for name, total in sorted(detector.labels.items())]
    for line in totals + per_label:
        logger.emit(line)
|
|
|
|
|
|
def _parse_byte(text: str) -> int:
    """Parse one byte written as one or two hex digits, optionally prefixed ``0x`` or ``H'``.

    Only ASCII hex digits are accepted. ``int(token, 16)`` on its own also
    tolerates a sign, surrounding whitespace and non-ASCII digits, which let
    tokens such as ``+f`` or ``0x f`` through as bytes.

    Raises:
        argparse.ArgumentTypeError: if the token is empty, longer than two
            digits, or contains anything other than hex digits.
    """
    token = text.strip()
    if token.lower().startswith("0x"):
        token = token[2:]
    if token.upper().startswith("H'"):
        token = token[2:]
    if not token or len(token) > 2:
        raise argparse.ArgumentTypeError(f"invalid byte token {text!r}")
    if any(char not in "0123456789abcdefABCDEF" for char in token):
        raise argparse.ArgumentTypeError(f"invalid byte token {text!r}")
    # One or two hex digits always fall within 0..0xFF, so no range check is needed.
    return int(token, 16)
|
|
|
|
|
|
def timestamp() -> str:
    """Return the current local time as ``HH:MM:SS.mmm`` (milliseconds truncated, not rounded)."""
    now = datetime.now()
    milliseconds = now.microsecond // 1000
    return f"{now:%H:%M:%S}.{milliseconds:03d}"
|
|
|
|
|
|
# Public API of this module, in the order exported by ``from module import *``.
__all__ = [
    # Frame constants.
    "COMMAND7_REPEAT_FRAME",
    "CONNECT_LCD_SEQUENCE",
    # Frame detection.
    "FrameDetector",
    # Command line and frame helpers.
    "build_arg_parser",
    "format_frame",
    "frame_checksum",
    "frame_checksum_ok",
    "label_frame",
    "main",
    "parse_frame",
]
|