#!/usr/bin/env python3 """Small serial sniffer for Sony RCP-TX7 restoration work. This is intended for RX-only capture from a USB serial adapter. Default settings match the CCU-D50/TX7 notes for RCP-TX7-class remotes: 38400 baud, 8 data bits, no parity, 1 stop bit. """ from __future__ import annotations import argparse import datetime as dt import sys import time try: import serial from serial.tools import list_ports except ImportError: print( "Missing dependency: pyserial\n" "Install it with: python -m pip install pyserial", file=sys.stderr, ) raise SystemExit(2) def ascii_preview(data: bytes) -> str: return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data) def hex_preview(data: bytes) -> str: return " ".join(f"{byte:02X}" for byte in data) def list_serial_ports() -> None: ports = list(list_ports.comports()) if not ports: print("No serial ports found.") return for port in ports: parts = [port.device] if port.description: parts.append(port.description) if port.hwid: parts.append(port.hwid) print(" | ".join(parts)) def sniff(args: argparse.Namespace) -> int: log_file = open(args.log, "a", encoding="utf-8") if args.log else None def emit(line: str) -> None: print(line) if log_file: log_file.write(line + "\n") log_file.flush() with serial.Serial( port=args.port, baudrate=args.baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=args.timeout, rtscts=False, dsrdtr=False, xonxoff=False, ) as ser: ser.reset_input_buffer() print( f"Listening on {ser.port} at {ser.baudrate} 8N1. " "Press Ctrl+C to stop." ) print("Tip: press RCP buttons and watch for new hex bytes.") last_rx = time.monotonic() frame_buffer = bytearray() while True: data = ser.read(args.chunk_size) now = time.monotonic() if data: if args.frame_size: frame_buffer.extend(data) while len(frame_buffer) >= args.frame_size: frame = bytes(frame_buffer[: args.frame_size]) del frame_buffer[: args.frame_size] stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit( f"{stamp} frame {args.frame_size:03d} " f"{hex_preview(frame)}" ) if args.ascii: emit(f"{'':14} ASCII {ascii_preview(frame)}") else: stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3] emit(f"{stamp} {len(data):03d} bytes {hex_preview(data)}") if args.ascii: emit(f"{'':14} ASCII {ascii_preview(data)}") last_rx = now elif args.heartbeat and now - last_rx >= args.heartbeat: stamp = dt.datetime.now().strftime("%H:%M:%S") emit(f"{stamp} no data") last_rx = now def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="RX-only serial sniffer for RCP-TX7 experiments." ) parser.add_argument( "--list", action="store_true", help="list available serial ports and exit", ) parser.add_argument( "--port", help="serial port, for example COM3 on Windows or /dev/ttyUSB0 on Linux", ) parser.add_argument("--baud", type=int, default=38400) parser.add_argument("--timeout", type=float, default=0.2) parser.add_argument("--chunk-size", type=int, default=64) parser.add_argument( "--ascii", action="store_true", help="also print printable ASCII preview", ) parser.add_argument( "--heartbeat", type=float, default=5.0, help="print 'no data' every N seconds while idle; set 0 to disable", ) parser.add_argument( "--frame-size", type=int, default=0, help="group stream into fixed-size frames, for example 6", ) parser.add_argument( "--log", help="append capture output to this text file", ) return parser.parse_args() def main() -> int: args = parse_args() if args.list: list_serial_ports() return 0 if not args.port: print("Pass --port COMx, or run --list first.", file=sys.stderr) return 2 try: return sniff(args) except KeyboardInterrupt: print("\nStopped.") return 0 except serial.SerialException as exc: print(f"Serial error: {exc}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())