#!/usr/bin/env python3 """Listen on RX while sending candidate RCP-TX7 host-response frames. Windows usually allows only one process to open a COM port, so this script combines capture and transmit in one pyserial session. """ from __future__ import annotations import argparse import datetime as dt import sys import time try: import serial except ImportError: print( "Missing dependency: pyserial\n" "Install it with: python -m pip install pyserial", file=sys.stderr, ) raise SystemExit(2) def parse_hex_bytes(text: str) -> bytes: normalized = text.replace(",", " ").replace("0x", "").replace("0X", "") parts = normalized.split() try: values = [int(part, 16) for part in parts] except ValueError as exc: raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc if not values: raise argparse.ArgumentTypeError("hex frame cannot be empty") if any(value < 0 or value > 0xFF for value in values): raise argparse.ArgumentTypeError("hex values must be bytes") return bytes(values) def hex_preview(data: bytes) -> str: return " ".join(f"{byte:02X}" for byte in data) def ascii_preview(data: bytes) -> str: return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data) def make_logger(path: str | None): log_file = open(path, "a", encoding="utf-8") if path else None def emit(line: str) -> None: print(line) if log_file: log_file.write(line + "\n") log_file.flush() return emit, log_file def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Capture RX frames while sending candidate TX responses." ) parser.add_argument("--port", required=True, help="serial port, for example COM3") parser.add_argument("--baud", type=int, default=38400) parser.add_argument( "--tx-frame", type=parse_hex_bytes, default=parse_hex_bytes("00 00 00 00 80 DA"), help="hex frame to send on TXD", ) parser.add_argument("--repeat", type=int, default=5) parser.add_argument("--interval", type=float, default=0.2) parser.add_argument( "--delay", type=float, default=3.0, help="seconds to listen before first transmit", ) parser.add_argument( "--after", type=float, default=5.0, help="seconds to keep listening after the last transmit", ) parser.add_argument("--frame-size", type=int, default=6) parser.add_argument("--chunk-size", type=int, default=64) parser.add_argument("--timeout", type=float, default=0.05) parser.add_argument("--ascii", action="store_true") parser.add_argument("--log", help="append capture/transmit log to this file") return parser.parse_args() def main() -> int: args = parse_args() emit, log_file = make_logger(args.log) frame_buffer = bytearray() try: with serial.Serial( port=args.port, baudrate=args.baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=args.timeout, write_timeout=1.0, rtscts=False, dsrdtr=False, xonxoff=False, ) as ser: ser.reset_input_buffer() start = time.monotonic() next_tx = start + args.delay sent = 0 stop_at = None emit( f"Listening on {ser.port} at {ser.baudrate} 8N1; " f"will send {hex_preview(args.tx_frame)} after {args.delay:.1f}s" ) while True: now = time.monotonic() if sent < args.repeat and now >= next_tx: ser.write(args.tx_frame) ser.flush() stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} TX frame {len(args.tx_frame):03d} {hex_preview(args.tx_frame)}") sent += 1 next_tx = now + args.interval if sent == args.repeat: stop_at = now + args.after data = ser.read(args.chunk_size) if data: if args.frame_size: frame_buffer.extend(data) while len(frame_buffer) >= args.frame_size: frame = bytes(frame_buffer[: args.frame_size]) del frame_buffer[: args.frame_size] stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} RX frame {args.frame_size:03d} {hex_preview(frame)}") if args.ascii: emit(f"{'':14} ASCII {ascii_preview(frame)}") else: stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} RX {len(data):03d} bytes {hex_preview(data)}") if stop_at is not None and now >= stop_at: break except KeyboardInterrupt: emit("Stopped.") return 0 except serial.SerialException as exc: print(f"Serial error: {exc}", file=sys.stderr) return 1 finally: if log_file: log_file.close() return 0 if __name__ == "__main__": raise SystemExit(main())