#!/usr/bin/env python3
"""Targeted RCP-TX7 host-frame field probe.

This is for the phase after a command sweep finds parser-visible commands. It
keeps the observed 6-byte frame shape:

    prefix1 prefix2 command state value checksum

and tries a small matrix of selected field values with the checksum hypothesis:

    checksum = 0x5A xor prefix1 xor prefix2 xor command xor state xor value

Use narrow ranges and reset the RCP between screen-triggering attempts when you
need independent observations.
"""

from __future__ import annotations

import argparse
import datetime as dt
import itertools
import sys
import time

try:
    import serial
except ImportError:
    # pyserial is only needed once a port is actually opened. Deferring the
    # failure to main() keeps --help, --dry-run and the pure helper functions
    # usable on machines without the dependency.
    serial = None

_MISSING_PYSERIAL_MESSAGE = (
    "Missing dependency: pyserial\n"
    "Install it with: python -m pip install pyserial"
)

# Frame that is logged without the NON_HEARTBEAT marker when received.
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")

# Seed of the XOR checksum hypothesis described in the module docstring.
CHECKSUM_SEED = 0x5A


def parse_byte(text: str) -> int:
    """Parse *text* as an integer in the range 0..255.

    The base is taken from the literal prefix (``0x``, ``0o``, ``0b`` or
    plain decimal).

    Raises:
        argparse.ArgumentTypeError: if *text* is not an integer literal or
            the value does not fit in one byte.
    """
    try:
        value = int(text, 0)
    except ValueError:
        # Report malformed text the same way as out-of-range values so that
        # callers only have one exception type to deal with.
        raise argparse.ArgumentTypeError(f"not an integer: {text!r}") from None
    if not 0 <= value <= 0xFF:
        raise argparse.ArgumentTypeError(f"must be a byte: {text}")
    return value


def parse_byte_set(text: str) -> list[int]:
    """Parse a comma/whitespace separated list of bytes and inclusive ranges.

    Example: ``"0x10-0x12, 5"`` yields ``[0x10, 0x11, 0x12, 5]``. Duplicates
    are dropped while the order of first appearance is preserved.

    Raises:
        argparse.ArgumentTypeError: on an empty set, a malformed or
            out-of-range byte, or a range whose end is below its start.
    """
    values: list[int] = []
    for part in text.replace(",", " ").split():
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start = parse_byte(start_text)
            end = parse_byte(end_text)
            if end < start:
                raise argparse.ArgumentTypeError(f"bad range: {part}")
            values.extend(range(start, end + 1))
        else:
            values.append(parse_byte(part))
    if not values:
        raise argparse.ArgumentTypeError("empty byte set")
    # dict.fromkeys de-duplicates while keeping insertion order.
    return list(dict.fromkeys(values))


def _positive_int(text: str) -> int:
    """argparse type: a decimal integer that is at least 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"not an integer: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1: {text}")
    return value


def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
    """Return the five field bytes followed by their XOR checksum.

    The checksum is ``CHECKSUM_SEED`` XOR-ed with every field byte.

    Raises:
        ValueError: if any field is outside 0..255 (raised by ``bytes``).
    """
    body = bytes([prefix1, prefix2, command, state, value])
    checksum = CHECKSUM_SEED
    for byte in body:
        checksum ^= byte
    return body + bytes([checksum])


def hex_preview(data: bytes) -> str:
    """Format *data* as space-separated upper-case hex pairs."""
    return " ".join(f"{byte:02X}" for byte in data)


def make_logger(path: str | None):
    """Create a line logger that prints and optionally appends to a file.

    Args:
        path: log file to append to (UTF-8), or ``None`` for stdout only.

    Returns:
        ``(emit, log_file)`` where ``emit(line)`` prints the line and, when a
        file is open, writes and flushes it. ``log_file`` is the open file
        object or ``None``; the caller is responsible for closing it.
    """
    log_file = open(path, "a", encoding="utf-8") if path else None

    def emit(line: str) -> None:
        print(line)
        if log_file is not None:
            log_file.write(line + "\n")
            # Flush per line so the log survives an abrupt stop.
            log_file.flush()

    return emit, log_file


def _timestamp() -> str:
    """Return the local wall-clock time as ``HH:MM:SS.mmm``."""
    return dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]


def _format_fields(prefix1: int, prefix2: int, command: int, state: int, value: int) -> str:
    """Format the five frame fields the way every log line shows them."""
    return (
        f"p1=0x{prefix1:02X} p2=0x{prefix2:02X} cmd=0x{command:02X} "
        f"state=0x{state:02X} value=0x{value:02X}"
    )


def drain_rx(
    ser: serial.Serial,
    emit,
    until: float,
    frame_size: int,
    buffer: bytearray | None = None,
) -> int:
    """Read from *ser* until the deadline and log every complete frame.

    Received bytes are cut into consecutive *frame_size*-byte frames; no
    resynchronisation is attempted. Frames different from ``HEARTBEAT`` are
    tagged ``NON_HEARTBEAT`` in the log.

    Args:
        ser: object with a ``read(size)`` method returning bytes.
        emit: callable taking one log line.
        until: ``time.monotonic()`` deadline.
        frame_size: number of bytes per frame; must be at least 1.
        buffer: optional carry-over buffer. Bytes of an incomplete trailing
            frame are left in it, so passing the same buffer to the next call
            keeps frame boundaries aligned across listen windows. Without it
            a trailing partial frame is discarded, as before.

    Returns:
        The number of non-heartbeat frames seen.

    Raises:
        ValueError: if *frame_size* is below 1 (the slicing loop would
            otherwise never terminate).
    """
    if frame_size < 1:
        raise ValueError(f"frame_size must be at least 1, got {frame_size}")

    pending = bytearray() if buffer is None else buffer
    interesting = 0
    while time.monotonic() < until:
        data = ser.read(64)
        if not data:
            continue
        pending.extend(data)
        while len(pending) >= frame_size:
            frame = bytes(pending[:frame_size])
            del pending[:frame_size]
            is_heartbeat = frame == HEARTBEAT
            marker = "" if is_heartbeat else " NON_HEARTBEAT"
            emit(f"{_timestamp()} RX frame {frame_size:03d} {hex_preview(frame)}{marker}")
            if not is_heartbeat:
                interesting += 1
    return interesting


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Probe selected RCP-TX7 frame fields and log screen observations."
    )
    parser.add_argument("--port", required=True, help="serial port, for example COM5")
    parser.add_argument("--baud", type=int, default=38400)
    parser.add_argument(
        "--prefix1s", type=parse_byte_set, default=[0x00],
        help="prefix1 byte set, for example 0x00-0x03,0x80",
    )
    parser.add_argument("--prefix2s", type=parse_byte_set, default=[0x00], help="prefix2 byte set")
    parser.add_argument("--commands", type=parse_byte_set, default=[0x15], help="command byte set")
    parser.add_argument("--states", type=parse_byte_set, default=[0x00, 0x80], help="state byte set")
    parser.add_argument("--values", type=parse_byte_set, default=[0x00, 0x80], help="value byte set")
    parser.add_argument(
        "--settle", type=float, default=1.5,
        help="seconds to listen before the first frame is sent",
    )
    parser.add_argument(
        "--after-each", type=float, default=1.0,
        help="seconds to listen after each sent frame",
    )
    parser.add_argument(
        "--after", type=float, default=3.0,
        help="seconds to listen after the last frame",
    )
    parser.add_argument(
        "--frame-size", type=_positive_int, default=6,
        help="number of bytes per received frame (at least 1)",
    )
    parser.add_argument("--log", help="append capture/transmit log to this file")
    parser.add_argument(
        "--prompt-screen", action="store_true",
        help="ask for a screen observation after each frame",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="print the frames that would be sent and exit",
    )
    parser.add_argument(
        "--max-frames",
        type=int,
        default=64,
        help="safety limit for generated frames",
    )
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    """Run the probe and return the process exit code.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        0 on success or user stop, 1 on a serial error, 2 when pyserial is
        missing and a port would have to be opened.

    Raises:
        SystemExit: if the field matrix exceeds ``--max-frames``.
    """
    args = parse_args(argv)
    rows = list(
        itertools.product(
            args.prefix1s,
            args.prefix2s,
            args.commands,
            args.states,
            args.values,
        )
    )
    if len(rows) > args.max_frames:
        raise SystemExit(
            f"Refusing to send {len(rows)} frames; raise --max-frames if intentional"
        )

    # Each entry: the five field values followed by the encoded frame.
    frames = [(*row, build_frame(*row)) for row in rows]

    if args.dry_run:
        for prefix1, prefix2, command, state, value, frame in frames:
            fields = _format_fields(prefix1, prefix2, command, state, value)
            print(f"{fields}: {hex_preview(frame)}")
        return 0

    if serial is None:
        print(_MISSING_PYSERIAL_MESSAGE, file=sys.stderr)
        return 2

    emit, log_file = make_logger(args.log)
    # Shared across all listen windows so a frame split between two windows
    # is reassembled instead of shifting the frame boundaries.
    rx_buffer = bytearray()
    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.05,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            ser.reset_input_buffer()
            emit(f"Probing {len(frames)} field combinations on {ser.port} at {ser.baudrate} 8N1")
            drain_rx(ser, emit, time.monotonic() + args.settle, args.frame_size, rx_buffer)

            for prefix1, prefix2, command, state, value, frame in frames:
                fields = _format_fields(prefix1, prefix2, command, state, value)
                emit(
                    f"{_timestamp()} TX fields {fields} "
                    f"frame {len(frame):03d} {hex_preview(frame)}"
                )
                ser.write(frame)
                ser.flush()
                drain_rx(ser, emit, time.monotonic() + args.after_each, args.frame_size, rx_buffer)

                if not args.prompt_screen:
                    continue
                screen = input(
                    "Screen after "
                    f"cmd 0x{command:02X} state 0x{state:02X} value 0x{value:02X} "
                    "(blank = no change, q = stop): "
                ).strip()
                if screen:
                    emit(f"{_timestamp()} SCREEN {fields} {screen}")
                if screen.lower() in {"q", "quit", "stop"}:
                    break

            drain_rx(ser, emit, time.monotonic() + args.after, args.frame_size, rx_buffer)
    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed while --prompt-screen was waiting.
        print("Stopped.")
        return 0
    except serial.SerialException as exc:
        print(f"Serial error: {exc}", file=sys.stderr)
        return 1
    finally:
        if log_file is not None:
            log_file.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())