#!/usr/bin/env python3 """Button-focused RCP-TX7 serial tests. This helper can: 1. Optionally put the RCP into a latched state with a known primer/query. 2. Listen for the button frames that are known to appear while disconnected. 3. Optionally transmit response frames when CAM POWER or CALL is observed. 4. Optionally mirror CALL high/low events with matching host responses. 5. Optionally transmit follow-up frames when a watched response frame appears. Known RCP-origin button frames: CAM POWER: 00 00 07 80 00 DD CALL on: 00 00 15 80 00 CF CALL off: 00 00 15 00 00 4F """ from __future__ import annotations import argparse import datetime as dt import sys import time try: import serial except ImportError: print( "Missing dependency: pyserial\n" "Install it with: python -m pip install pyserial", file=sys.stderr, ) raise SystemExit(2) HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA") CAM_POWER = bytes.fromhex("00 00 07 80 00 DD") CALL_ON = bytes.fromhex("00 00 15 80 00 CF") CALL_OFF = bytes.fromhex("00 00 15 00 00 4F") KNOWN_PATTERNS = { "heartbeat": HEARTBEAT, "cam-power": CAM_POWER, "call-on": CALL_ON, "call-off": CALL_OFF, } def parse_byte(text: str) -> int: value = int(text, 0) if not 0 <= value <= 0xFF: raise argparse.ArgumentTypeError(f"must be a byte: {text}") return value def parse_hex_bytes(text: str) -> bytes: normalized = text.replace(",", " ").replace("0x", "").replace("0X", "") parts = normalized.split() if not parts: raise argparse.ArgumentTypeError("hex frame cannot be empty") try: values = [int(part, 16) for part in parts] except ValueError as exc: raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc if any(value < 0 or value > 0xFF for value in values): raise argparse.ArgumentTypeError("hex values must be bytes") return bytes(values) def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes: body = bytes([prefix1, prefix2, command, state, value]) checksum = 0x5A for byte in body: checksum ^= byte return body + bytes([checksum]) def hex_preview(data: bytes) -> str: return " ".join(f"{byte:02X}" for byte in data) def ascii_preview(data: bytes) -> str: return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data) def make_logger(path: str | None): log_file = open(path, "a", encoding="utf-8") if path else None def emit(line: str) -> None: print(line) if log_file: log_file.write(line + "\n") log_file.flush() return emit, log_file def read_window(ser: serial.Serial, duration: float) -> bytes: stop_at = time.monotonic() + duration data = bytearray() while time.monotonic() < stop_at: chunk = ser.read(128) if chunk: data.extend(chunk) return bytes(data) def send_frame(ser: serial.Serial, emit, label: str, frame: bytes) -> None: stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} TX {label} frame {len(frame):03d} {hex_preview(frame)}") ser.write(frame) ser.flush() def emit_known_counts(emit, label: str, data: bytes) -> None: if not data: emit(f"{label} no RX bytes") return counts = { name: data.count(pattern) for name, pattern in KNOWN_PATTERNS.items() if data.count(pattern) } count_text = ", ".join(f"{name}={count}" for name, count in counts.items()) or "no known complete frames" emit(f"{label} RX {len(data)} bytes; {count_text}") if counts.get("cam-power") or counts.get("call-on") or counts.get("call-off"): emit(f"{label} raw {hex_preview(data)}") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Listen for CAM POWER/CALL frames and optionally respond to CAM POWER." ) parser.add_argument("--port", required=True, help="serial port, for example COM5") parser.add_argument("--baud", type=int, default=38400) parser.add_argument("--timeout", type=float, default=0.03) parser.add_argument("--duration", type=float, default=30.0) parser.add_argument("--chunk-size", type=int, default=128) parser.add_argument("--log", help="append capture/transmit log to this file") parser.add_argument("--ascii", action="store_true") parser.add_argument("--latch", action="store_true", help="send latch primer/query before listening") parser.add_argument("--latch-primer-command", type=parse_byte, default=0x00) parser.add_argument("--latch-query-command", type=parse_byte, default=0xB5) parser.add_argument("--state", type=parse_byte, default=0x00) parser.add_argument("--value", type=parse_byte, default=0x80) parser.add_argument("--after-latch", type=float, default=1.0) parser.add_argument("--respond-to-cam-power", action="store_true") parser.add_argument("--respond-to-call", action="store_true") parser.add_argument( "--mirror-call", action="store_true", help="respond to CALL high with CALL high and CALL low with CALL low", ) parser.add_argument( "--response-frame", type=parse_hex_bytes, action="append", help="hex frame to send when CAM POWER is seen; can be repeated", ) parser.add_argument( "--watch-frame", type=parse_hex_bytes, action="append", help="hex frame to count when it appears in RX; can be repeated", ) parser.add_argument( "--followup-on-watch-frame", action="store_true", help="send follow-up frame(s) when any watched frame is observed", ) parser.add_argument( "--followup-frame", type=parse_hex_bytes, action="append", help="hex frame to send after a watched frame appears; can be repeated", ) parser.add_argument("--followup-delay", type=float, default=0.05) parser.add_argument("--response-delay", type=float, default=0.05) parser.add_argument("--response-repeat", type=int, default=1) parser.add_argument("--response-interval", type=float, default=0.2) parser.add_argument( "--respond-once", action="store_true", help="only transmit the response on the first matched button frame", ) parser.add_argument( "--mirror-call-once-per-state", action="store_true", help="with --mirror-call, respond once to CALL high and once to CALL low", ) parser.add_argument("--prompt", action="store_true", help="pause before listen so you can prepare button presses") args = parser.parse_args() if args.followup_on_watch_frame and not args.followup_frame: parser.error("--followup-on-watch-frame requires at least one --followup-frame") return args def main() -> int: args = parse_args() emit, log_file = make_logger(args.log) response_frames = args.response_frame or [CAM_POWER] watch_frames = args.watch_frame or [] primer = build_frame(0x00, 0x00, args.latch_primer_command, args.state, args.value) query = build_frame(0x00, 0x00, args.latch_query_command, args.state, args.value) responded = False followup_sent = False mirrored_call_on = False mirrored_call_off = False totals = {name: 0 for name in KNOWN_PATTERNS} watch_totals = {hex_preview(frame): 0 for frame in watch_frames} try: with serial.Serial( port=args.port, baudrate=args.baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=args.timeout, write_timeout=1.0, rtscts=False, dsrdtr=False, xonxoff=False, ) as ser: ser.reset_input_buffer() emit(f"Button test on {ser.port} at {ser.baudrate} 8N1") if args.latch: send_frame(ser, emit, "latch primer", primer) emit_known_counts(emit, "LATCH PRIMER", read_window(ser, args.after_latch)) send_frame(ser, emit, "latch query", query) emit_known_counts(emit, "LATCH QUERY", read_window(ser, args.after_latch)) if args.prompt: input("Ready to listen. Press Enter, then press CAM POWER/CALL on the RCP: ") ser.reset_input_buffer() emit( f"Listening for {args.duration:.1f}s; " f"respond_to_cam_power={args.respond_to_cam_power}, " f"respond_to_call={args.respond_to_call}, " f"mirror_call={args.mirror_call}" ) stop_at = time.monotonic() + args.duration buffer = bytearray() while time.monotonic() < stop_at: data = ser.read(args.chunk_size) if not data: continue stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} RX {len(data):03d} bytes {hex_preview(data)}") if args.ascii: emit(f"{'':14} ASCII {ascii_preview(data)}") buffer.extend(data) for name, pattern in KNOWN_PATTERNS.items(): count = data.count(pattern) if count: totals[name] += count emit(f"{stamp} DETECT {name} x{count}") for frame in watch_frames: count = data.count(frame) if count: key = hex_preview(frame) watch_totals[key] += count emit(f"{stamp} DETECT watch-frame {key} x{count}") if args.followup_on_watch_frame and not followup_sent: followup_sent = True time.sleep(args.followup_delay) for followup in args.followup_frame or []: send_frame(ser, emit, "watch follow-up", followup) if args.mirror_call: mirrored = False if CALL_ON in buffer and not ( args.mirror_call_once_per_state and mirrored_call_on ): send_frame(ser, emit, "CALL high mirror", CALL_ON) mirrored_call_on = True responded = True mirrored = True if CALL_OFF in buffer and not ( args.mirror_call_once_per_state and mirrored_call_off ): send_frame(ser, emit, "CALL low mirror", CALL_OFF) mirrored_call_off = True responded = True mirrored = True if mirrored: buffer.clear() continue should_respond = ( ( (args.respond_to_cam_power and CAM_POWER in buffer) or (args.respond_to_call and (CALL_ON in buffer or CALL_OFF in buffer)) ) and not (args.respond_once and responded) ) if should_respond: responded = True time.sleep(args.response_delay) for _ in range(args.response_repeat): for response in response_frames: send_frame(ser, emit, "button response", response) if args.response_repeat > 1: time.sleep(args.response_interval) buffer.clear() elif len(buffer) > 256: del buffer[:-64] emit( "Totals: " + ", ".join(f"{name}={count}" for name, count in totals.items()) ) if watch_totals: emit( "Watch totals: " + ", ".join(f"{frame}={count}" for frame, count in watch_totals.items()) ) except KeyboardInterrupt: emit("Stopped.") return 0 except serial.SerialException as exc: print(f"Serial error: {exc}", file=sys.stderr) return 1 finally: if log_file: log_file.close() return 0 if __name__ == "__main__": raise SystemExit(main())