#!/usr/bin/env python3
"""Send arbitrary RCP-TX7 host frame sequences and classify RX windows.

This helper is meant for exploratory multi-frame tests such as:

  primer -> announce/selector -> query
  primer -> short query block
  repeated pseudo-keepalive groups

Each transmitted frame can be followed by a small read window so responses can
be attributed to a specific host frame more easily than in a long raw capture.
"""

from __future__ import annotations

import argparse
import datetime as dt
import sys
import time

try:
    import serial
except ImportError:
    # Do not exit at import time: the pure helpers below (hex parsing, RX
    # classification) do not need pyserial.  main() reports the missing
    # dependency with the same message and exit code 2 when run as a script.
    serial = None

_MISSING_PYSERIAL = (
    "Missing dependency: pyserial\n"
    "Install it with: python -m pip install pyserial"
)

# Repeating byte pattern that RX data is compared against; anything that is
# not a (possibly rotated) run of this pattern is reported as an anomaly.
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")


def parse_hex_bytes(text: str) -> bytes:
    """Parse a whitespace/comma separated list of hex bytes into ``bytes``.

    ``0x``/``0X`` prefixes are stripped before parsing.  Used as an argparse
    ``type=`` callable, so every failure is raised as
    ``argparse.ArgumentTypeError``.

    Raises:
        argparse.ArgumentTypeError: if the list is empty, a token is not
            valid hex, or a value falls outside 0x00..0xFF.
    """
    normalized = text.replace(",", " ").replace("0x", "").replace("0X", "")
    parts = normalized.split()
    if not parts:
        raise argparse.ArgumentTypeError("hex frame cannot be empty")
    try:
        values = [int(part, 16) for part in parts]
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc
    if any(value < 0 or value > 0xFF for value in values):
        raise argparse.ArgumentTypeError("hex values must be bytes")
    return bytes(values)


def hex_preview(data: bytes) -> str:
    """Return *data* as space-separated two-digit uppercase hex bytes."""
    return " ".join(f"{byte:02X}" for byte in data)


def ascii_preview(data: bytes) -> str:
    """Return *data* as text, replacing bytes outside 32..126 with ``.``."""
    return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)


def make_logger(path: str | None):
    """Build the line emitter used for all probe output.

    Args:
        path: log file to append to (UTF-8), or ``None``/empty for stdout only.

    Returns:
        ``(emit, log_file)`` where ``emit(line)`` prints the line and, when a
        log file is open, appends and flushes it.  ``log_file`` is the open
        file object (or ``None``); the caller is responsible for closing it.

    Raises:
        OSError: if the log file cannot be opened.
    """
    log_file = open(path, "a", encoding="utf-8") if path else None

    def emit(line: str) -> None:
        print(line)
        if log_file:
            log_file.write(line + "\n")
            # Flush per line so the log survives an interrupted probe run.
            log_file.flush()

    return emit, log_file


def heartbeat_offset(data: bytes) -> int | None:
    """Return the HEARTBEAT rotation that *data* matches, or ``None``.

    The data matches at rotation ``offset`` when every byte equals
    ``HEARTBEAT[(offset + index) % len(HEARTBEAT)]``.  The lowest matching
    offset wins; empty data trivially matches at offset 0.
    """
    if not data:
        return 0
    for offset in range(len(HEARTBEAT)):
        if all(
            byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)]
            for index, byte in enumerate(data)
        ):
            return offset
    return None


def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
    """Locate the first byte that deviates from HEARTBEAT rotated by *offset*.

    Returns:
        ``(index, actual_byte, expected_byte)`` for the first deviation, or
        ``None`` when all of *data* follows the pattern.
    """
    for index, byte in enumerate(data):
        expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
        if byte != expected:
            return index, byte, expected
    return None


def classify_rx(data: bytes) -> tuple[bool, str]:
    """Classify one RX window as heartbeat-compatible or anomalous.

    Returns:
        ``(is_anomaly, note)``.  ``is_anomaly`` is ``True`` only when *data*
        is non-empty and matches no rotation of HEARTBEAT; ``note`` is the
        human-readable summary written to the log.
    """
    if not data:
        return False, "no RX bytes"

    offset = heartbeat_offset(data)
    if offset is not None:
        full = len(data) // len(HEARTBEAT)
        extra = len(data) % len(HEARTBEAT)
        return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"

    # No rotation matches exactly: report against the rotation with the
    # fewest mismatching bytes (lowest offset wins on ties).
    best_offset = min(
        range(len(HEARTBEAT)),
        key=lambda candidate: sum(
            byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
            for index, byte in enumerate(data)
        ),
    )
    mismatch = first_mismatch(data, best_offset)
    if mismatch is None:
        # Defensive only: heartbeat_offset() returned None above, so every
        # rotation has at least one mismatch.
        return False, "heartbeat-compatible RX"
    index, byte, expected = mismatch
    return (
        True,
        f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
        f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
    )


def read_window(ser: serial.Serial, duration: float) -> bytes:
    """Collect everything received on *ser* for roughly *duration* seconds.

    Reads in chunks of up to 128 bytes until the monotonic deadline passes.
    The deadline is only checked between reads, so the window can overrun by
    up to one ``ser.read`` call.  A non-positive *duration* returns ``b""``
    without reading.
    """
    stop_at = time.monotonic() + duration
    data = bytearray()
    while time.monotonic() < stop_at:
        chunk = ser.read(128)
        if chunk:
            data.extend(chunk)
    return bytes(data)


def emit_rx(emit, label: str, data: bytes, ascii_mode: bool) -> bool:
    """Log the classification of one RX window and return whether it is anomalous.

    Args:
        emit: callable taking one log line (see ``make_logger``).
        label: prefix identifying which step the window belongs to.
        data: bytes captured during the window.
        ascii_mode: when true and *data* is non-empty, also log an ASCII view.

    Returns:
        ``True`` when ``classify_rx`` flags the window as an anomaly; in that
        case the raw bytes are logged as hex as well.
    """
    is_anomaly, note = classify_rx(data)
    emit(f"{label} {note}")
    if data and ascii_mode:
        emit(f"{'':14} ASCII {ascii_preview(data)}")
    if is_anomaly:
        emit(f"{label} raw {hex_preview(data)}")
    return is_anomaly


def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) for the sequence probe."""
    parser = argparse.ArgumentParser(
        description="Send an arbitrary host frame sequence and classify RX after each step."
    )
    parser.add_argument("--port", required=True, help="serial port, for example COM5")
    parser.add_argument("--baud", type=int, default=38400)
    parser.add_argument("--timeout", type=float, default=0.03)
    parser.add_argument("--log", help="append probe log to this file")
    parser.add_argument("--ascii", action="store_true")
    parser.add_argument(
        "--prompt",
        action="store_true",
        help="pause before starting so you can power-cycle or prepare the panel",
    )
    parser.add_argument(
        "--pre-read",
        type=float,
        default=3.0,
        help="seconds to observe baseline heartbeat before the first frame",
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=0.0,
        help="extra delay after prompt/baseline before the first frame",
    )
    parser.add_argument(
        "--frame",
        type=parse_hex_bytes,
        action="append",
        required=True,
        help="hex frame to send; repeat the flag to build a sequence",
    )
    parser.add_argument(
        "--frame-interval",
        type=float,
        default=0.05,
        help="delay between consecutive frames in the same group",
    )
    parser.add_argument(
        "--read-after-frame",
        type=float,
        default=0.8,
        help="seconds to read/classify after each frame",
    )
    parser.add_argument(
        "--repeat",
        type=int,
        default=1,
        help="how many times to send the full frame group",
    )
    parser.add_argument(
        "--repeat-interval",
        type=float,
        default=0.0,
        help="delay between repeated frame groups",
    )
    parser.add_argument(
        "--read-after-group",
        type=float,
        default=0.0,
        help="extra seconds to read/classify after each full group",
    )
    parser.add_argument(
        "--prompt-screen",
        action="store_true",
        help="prompt for a final screen/light note so visible panel state is captured in the log",
    )
    return parser.parse_args()


def main() -> int:
    """Run the probe: baseline read, then send each frame group and classify RX.

    Returns:
        Process exit code: 0 on completion or Ctrl-C, 1 on a serial or
        log-file error, 2 when pyserial is not installed.
    """
    if serial is None:
        print(_MISSING_PYSERIAL, file=sys.stderr)
        return 2

    args = parse_args()

    try:
        emit, log_file = make_logger(args.log)
    except OSError as exc:
        # An unwritable --log path should be a clean error, not a traceback.
        print(f"Cannot open log file: {exc}", file=sys.stderr)
        return 1

    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=args.timeout,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            emit(
                f"Sequence probe: {len(args.frame)} frames x {args.repeat} group(s) "
                f"on {ser.port} at {ser.baudrate} 8N1"
            )
            for index, frame in enumerate(args.frame, start=1):
                emit(f"FRAME {index}: {hex_preview(frame)}")

            if args.prompt:
                input("Prepare/power-cycle RCP, wait for heartbeat, then press Enter: ")

            # Drop anything buffered while waiting so the baseline window
            # only contains fresh data.
            ser.reset_input_buffer()
            emit_rx(emit, "BASELINE", read_window(ser, args.pre_read), args.ascii)

            if args.delay > 0:
                time.sleep(args.delay)

            anomaly_count = 0
            for group_index in range(1, args.repeat + 1):
                if group_index > 1 and args.repeat_interval > 0:
                    time.sleep(args.repeat_interval)

                emit(f"BEGIN group {group_index}/{args.repeat}")
                for frame_index, frame in enumerate(args.frame, start=1):
                    stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    emit(
                        f"{stamp} TX group={group_index} frame={frame_index} "
                        f"len={len(frame):03d} {hex_preview(frame)}"
                    )
                    ser.write(frame)
                    ser.flush()

                    if args.read_after_frame > 0:
                        # The RX line reuses the TX timestamp so the two log
                        # lines can be paired up.
                        label = f"{stamp} RX group={group_index} frame={frame_index}"
                        if emit_rx(emit, label, read_window(ser, args.read_after_frame), args.ascii):
                            anomaly_count += 1

                    if frame_index < len(args.frame) and args.frame_interval > 0:
                        time.sleep(args.frame_interval)

                if args.read_after_group > 0:
                    label = f"GROUP {group_index} TAIL"
                    if emit_rx(emit, label, read_window(ser, args.read_after_group), args.ascii):
                        anomaly_count += 1

            if args.prompt_screen:
                note = input("Final screen/light note (blank = no note): ").strip()
                if note:
                    stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    emit(f"{stamp} PANEL {note}")

            # The BASELINE window is logged but not included in this count.
            emit(f"Anomalies: {anomaly_count}")
    except KeyboardInterrupt:
        emit("Stopped.")
        return 0
    except serial.SerialException as exc:
        print(f"Serial error: {exc}", file=sys.stderr)
        return 1
    finally:
        if log_file:
            log_file.close()

    return 0


if __name__ == "__main__":
    raise SystemExit(main())