Files
Sony-rcp/scripts/serial_sniff.py
2026-05-13 12:30:37 +10:00

171 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""Small serial sniffer for Sony RCP-TX7 restoration work.
This is intended for RX-only capture from a USB serial adapter.
Default settings match the CCU-D50/TX7 notes for RCP-TX7-class remotes:
38400 baud, 8 data bits, no parity, 1 stop bit.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
from serial.tools import list_ports
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
def ascii_preview(data: bytes) -> str:
    """Render *data* as text, one character per byte.

    Bytes in the printable ASCII range 32..126 are shown as themselves;
    every other byte value is shown as a dot.
    """
    rendered = []
    for value in data:
        is_printable = 0x20 <= value <= 0x7E
        rendered.append(chr(value) if is_printable else ".")
    return "".join(rendered)
def hex_preview(data: bytes) -> str:
    """Return *data* as space-separated, two-digit uppercase hex bytes."""
    octets = ["%02X" % value for value in data]
    return " ".join(octets)
def list_serial_ports() -> None:
    """Print one line per serial port reported by pyserial.

    Each line holds the device name followed by the description and the
    hardware id when they are non-empty, joined with " | ". A single notice
    is printed instead when no ports are reported.
    """
    detected = list(list_ports.comports())
    if detected:
        for entry in detected:
            optional_fields = (entry.description, entry.hwid)
            # The device name is always shown; empty extras are left out.
            columns = [entry.device, *(text for text in optional_fields if text)]
            print(" | ".join(columns))
    else:
        print("No serial ports found.")
def sniff(args: argparse.Namespace) -> int:
    """Print bytes received on ``args.port`` as timestamped hex lines.

    The port is opened with 8 data bits, no parity, 1 stop bit and all flow
    control disabled, and is only ever read from. Every output line is
    printed and, when ``args.log`` is set, also appended to that file.

    With a positive ``args.frame_size`` the incoming stream is regrouped into
    fixed-size frames; otherwise each non-empty read is printed as received.
    When ``args.heartbeat`` is non-zero, a "no data" line is emitted after
    that many seconds without traffic.

    The capture loop has no normal exit: the function ends only when an
    exception (for example ``KeyboardInterrupt``) propagates to the caller.
    The log file, if any, is closed on the way out.
    """
    # A zero or negative frame size cannot form frames (a negative slice
    # bound would never drain the buffer and the framing loop would spin
    # forever), so both are treated as "framing disabled".
    frame_size = max(args.frame_size, 0)
    log_file = open(args.log, "a", encoding="utf-8") if args.log else None

    def emit(line: str) -> None:
        """Print *line* and mirror it to the log file when one is open."""
        print(line)
        if log_file is not None:
            log_file.write(line + "\n")
            # Flush after every line so the file is current while capturing.
            log_file.flush()

    def emit_record(label: str, payload: bytes) -> None:
        """Emit one timestamped hex line, plus the ASCII view if requested."""
        stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
        emit(f"{stamp} {label} {hex_preview(payload)}")
        if args.ascii:
            emit(f"{'':14} ASCII {ascii_preview(payload)}")

    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=args.timeout,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            # Drop anything that was queued before the capture started.
            ser.reset_input_buffer()
            print(
                f"Listening on {ser.port} at {ser.baudrate} 8N1. "
                "Press Ctrl+C to stop."
            )
            print("Tip: press RCP buttons and watch for new hex bytes.")
            last_rx = time.monotonic()
            frame_buffer = bytearray()
            while True:
                data = ser.read(args.chunk_size)
                now = time.monotonic()
                if data:
                    if frame_size:
                        # NOTE: bytes that do not complete a frame stay in the
                        # buffer until more data arrives; they are not printed
                        # if the capture stops first.
                        frame_buffer.extend(data)
                        while len(frame_buffer) >= frame_size:
                            frame = bytes(frame_buffer[:frame_size])
                            del frame_buffer[:frame_size]
                            emit_record(f"frame {frame_size:03d}", frame)
                    else:
                        emit_record(f"{len(data):03d} bytes", data)
                    last_rx = now
                elif args.heartbeat and now - last_rx >= args.heartbeat:
                    stamp = dt.datetime.now().strftime("%H:%M:%S")
                    emit(f"{stamp} no data")
                    # Restart the idle timer so the notice repeats per interval.
                    last_rx = now
    finally:
        # Runs for Ctrl+C, serial errors and a failed port open alike, so the
        # log file handle is never left open.
        if log_file is not None:
            log_file.close()
def _number_at_least(convert, minimum, description):
    """Build an argparse ``type=`` callable that enforces a lower bound.

    *convert* is ``int`` or ``float``; values below *minimum*, NaN and
    infinity are rejected with a message that mentions *description*.
    """

    def parse(text: str):
        try:
            value = convert(text)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"expected {description}, got {text!r}"
            ) from None
        # Written as a chained comparison so NaN (which fails every
        # comparison) and infinity are rejected too.
        if not (minimum <= value < float("inf")):
            raise argparse.ArgumentTypeError(
                f"expected {description}, got {text!r}"
            )
        return value

    return parse


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the sniffer's command-line options.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Namespace with ``list``, ``port``, ``baud``, ``timeout``,
        ``chunk_size``, ``ascii``, ``heartbeat``, ``frame_size`` and ``log``.

    Raises:
        SystemExit: argparse exits with status 2 on unknown options or on a
            numeric option that is out of range.
    """
    positive_int = _number_at_least(int, 1, "a positive integer")
    non_negative_int = _number_at_least(int, 0, "a non-negative integer")
    non_negative_float = _number_at_least(
        float, 0, "a finite non-negative number"
    )

    parser = argparse.ArgumentParser(
        description="RX-only serial sniffer for RCP-TX7 experiments."
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="list available serial ports and exit",
    )
    parser.add_argument(
        "--port",
        help="serial port, for example COM3 on Windows or /dev/ttyUSB0 on Linux",
    )
    parser.add_argument(
        "--baud",
        type=positive_int,
        default=38400,
        help="baud rate (default: %(default)s)",
    )
    parser.add_argument(
        "--timeout",
        type=non_negative_float,
        default=0.2,
        help="read timeout in seconds (default: %(default)s)",
    )
    # A chunk size of 0 would make every read return immediately with no
    # data, turning the capture loop into a busy loop, so require >= 1.
    parser.add_argument(
        "--chunk-size",
        type=positive_int,
        default=64,
        help="maximum bytes requested per read (default: %(default)s)",
    )
    parser.add_argument(
        "--ascii",
        action="store_true",
        help="also print printable ASCII preview",
    )
    parser.add_argument(
        "--heartbeat",
        type=non_negative_float,
        default=5.0,
        help="print 'no data' every N seconds while idle; set 0 to disable",
    )
    parser.add_argument(
        "--frame-size",
        type=non_negative_int,
        default=0,
        help="group stream into fixed-size frames, for example 6",
    )
    parser.add_argument(
        "--log",
        help="append capture output to this text file",
    )
    return parser.parse_args(argv)
def main() -> int:
    """Run the command-line tool and return its process exit status.

    Returns 0 after listing ports or after the capture is stopped with
    Ctrl+C, 2 when no port was given, and 1 when pyserial reports a serial
    error.
    """
    options = parse_args()

    if options.list:
        list_serial_ports()
        return 0

    if not options.port:
        print("Pass --port COMx, or run --list first.", file=sys.stderr)
        return 2

    try:
        exit_status = sniff(options)
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to end a capture, not a failure.
        print("\nStopped.")
        exit_status = 0
    except serial.SerialException as error:
        print(f"Serial error: {error}", file=sys.stderr)
        exit_status = 1
    return exit_status
# Run the tool only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())