#!/usr/bin/env python3 """Cautious RCP-TX7 host-frame command sweep. This sends checksum-valid 6-byte frames of the observed shape: prefix1 prefix2 command state value checksum The checksum is the current working hypothesis: checksum = 0x5A xor prefix1 xor prefix2 xor command xor state xor value The script logs TX frames plus any RX frames. It highlights RX frames that differ from the known heartbeat so a run can be scanned quickly. """ from __future__ import annotations import argparse import datetime as dt import sys import time try: import serial except ImportError: print( "Missing dependency: pyserial\n" "Install it with: python -m pip install pyserial", file=sys.stderr, ) raise SystemExit(2) HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA") def parse_int(text: str) -> int: value = int(text, 0) if not 0 <= value <= 0xFF: raise argparse.ArgumentTypeError(f"must be a byte: {text}") return value def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes: body = bytes([prefix1, prefix2, command, state, value]) checksum = 0x5A for byte in body: checksum ^= byte return body + bytes([checksum]) def hex_preview(data: bytes) -> str: return " ".join(f"{byte:02X}" for byte in data) def make_logger(path: str | None): log_file = open(path, "a", encoding="utf-8") if path else None def emit(line: str) -> None: print(line) if log_file: log_file.write(line + "\n") log_file.flush() return emit, log_file def drain_rx(ser: serial.Serial, emit, until: float, frame_size: int) -> int: buffer = bytearray() interesting = 0 while time.monotonic() < until: data = ser.read(64) if not data: continue buffer.extend(data) while len(buffer) >= frame_size: frame = bytes(buffer[:frame_size]) del buffer[:frame_size] stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] marker = "" if frame == HEARTBEAT else " NON_HEARTBEAT" emit(f"{stamp} RX frame {frame_size:03d} {hex_preview(frame)}{marker}") if frame != HEARTBEAT: interesting += 1 return interesting def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Sweep checksum-valid command bytes and log non-heartbeat RX." ) parser.add_argument("--port", required=True, help="serial port, for example COM5") parser.add_argument("--baud", type=int, default=38400) parser.add_argument("--start", type=parse_int, default=0x00) parser.add_argument("--end", type=parse_int, default=0x20) parser.add_argument("--prefix1", type=parse_int, default=0x00) parser.add_argument("--prefix2", type=parse_int, default=0x00) parser.add_argument("--state", type=parse_int, default=0x00) parser.add_argument("--value", type=parse_int, default=0x80) parser.add_argument( "--settle", type=float, default=1.5, help="seconds to listen before the sweep starts", ) parser.add_argument( "--after-each", type=float, default=0.8, help="seconds to listen after each transmitted frame", ) parser.add_argument( "--after", type=float, default=3.0, help="seconds to listen after the whole sweep", ) parser.add_argument("--frame-size", type=int, default=6) parser.add_argument("--log", help="append capture/transmit log to this file") parser.add_argument( "--stop-on-non-heartbeat", action="store_true", help="stop the sweep if the RCP sends any non-heartbeat frame", ) parser.add_argument( "--prompt-screen", action="store_true", help="after each command, prompt for the observed RCP screen state", ) parser.add_argument( "--dry-run", action="store_true", help="print frames without opening the serial port", ) return parser.parse_args() def main() -> int: args = parse_args() if args.end < args.start: raise SystemExit("--end must be >= --start") frames = [ (command, build_frame(args.prefix1, args.prefix2, command, args.state, args.value)) for command in range(args.start, args.end + 1) ] if args.dry_run: for command, frame in frames: print(f"cmd 0x{command:02X}: {hex_preview(frame)}") return 0 emit, log_file = make_logger(args.log) try: with serial.Serial( port=args.port, baudrate=args.baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.05, write_timeout=1.0, rtscts=False, dsrdtr=False, xonxoff=False, ) as ser: ser.reset_input_buffer() emit( f"Sweeping commands 0x{args.start:02X}-0x{args.end:02X} " f"on {ser.port} at {ser.baudrate} 8N1" ) drain_rx(ser, emit, time.monotonic() + args.settle, args.frame_size) for command, frame in frames: stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} TX cmd 0x{command:02X} frame {len(frame):03d} {hex_preview(frame)}") ser.write(frame) ser.flush() interesting = drain_rx( ser, emit, time.monotonic() + args.after_each, args.frame_size, ) if interesting and args.stop_on_non_heartbeat: emit(f"Stopping after cmd 0x{command:02X}: non-heartbeat RX observed") break if args.prompt_screen: screen = input( f"Screen after cmd 0x{command:02X} " "(blank = no change, q = stop): " ).strip() if screen: stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} SCREEN cmd 0x{command:02X} {screen}") if screen.lower() in {"q", "quit", "stop"}: break drain_rx(ser, emit, time.monotonic() + args.after, args.frame_size) except KeyboardInterrupt: print("Stopped.") return 0 except serial.SerialException as exc: print(f"Serial error: {exc}", file=sys.stderr) return 1 finally: if log_file: log_file.close() return 0 if __name__ == "__main__": raise SystemExit(main())