171 lines
4.9 KiB
Python
171 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Small serial sniffer for Sony RCP-TX7 restoration work.
|
|
|
|
This is intended for RX-only capture from a USB serial adapter.
|
|
Default settings match the CCU-D50/TX7 notes for RCP-TX7-class remotes:
|
|
38400 baud, 8 data bits, no parity, 1 stop bit.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import serial
|
|
from serial.tools import list_ports
|
|
except ImportError:
|
|
print(
|
|
"Missing dependency: pyserial\n"
|
|
"Install it with: python -m pip install pyserial",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
|
|
|
|
def ascii_preview(data: bytes) -> str:
    """Render *data* as text, masking non-printable bytes with ``.``.

    Bytes in the printable ASCII range 0x20-0x7E are shown as their
    character; every other byte value becomes a single dot, so the result
    always has exactly ``len(data)`` characters.
    """
    rendered = []
    for value in data:
        is_printable = 0x20 <= value <= 0x7E
        rendered.append(chr(value) if is_printable else ".")
    return "".join(rendered)
|
|
|
|
|
|
def hex_preview(data: bytes) -> str:
    """Format *data* as space-separated, two-digit uppercase hex bytes.

    An empty input yields an empty string.
    """
    two_digit_hex = "{:02X}".format
    return " ".join(map(two_digit_hex, data))
|
|
|
|
|
|
def list_serial_ports() -> None:
    """Print one line per serial port reported by pyserial.

    Each line starts with the device name, followed by the port's
    description and hardware ID when those are non-empty, joined with
    ``" | "``. A short notice is printed instead when no ports are found.
    """
    found = list(list_ports.comports())
    if len(found) == 0:
        print("No serial ports found.")
        return

    for entry in found:
        # The device name is always shown; the other fields only if set.
        extras = [text for text in (entry.description, entry.hwid) if text]
        print(" | ".join([entry.device, *extras]))
|
|
|
|
|
|
def sniff(args: argparse.Namespace) -> int:
    """Capture bytes from the serial port and print them until interrupted.

    Opens ``args.port`` at ``args.baud`` with 8 data bits, no parity, one
    stop bit and all flow control disabled, then reads up to
    ``args.chunk_size`` bytes per call in an endless loop.

    Output (stdout, and ``args.log`` when given):
      * without framing, one timestamped hex line per received chunk;
      * with ``args.frame_size`` > 0, one timestamped hex line per complete
        fixed-size frame; bytes still buffered when the capture ends are
        emitted as a final ``partial`` line so nothing is silently lost;
      * an extra ASCII line after each hex line when ``args.ascii`` is set;
      * ``no data`` every ``args.heartbeat`` seconds while idle.

    The startup banner is printed to stdout only, not to the log file.

    Args:
        args: Parsed options providing ``port``, ``baud``, ``timeout``,
            ``chunk_size``, ``frame_size``, ``ascii``, ``heartbeat`` and
            ``log``.

    Returns:
        Never returns normally: the read loop only ends through an
        exception (for example ``KeyboardInterrupt`` or
        ``serial.SerialException``), which propagates to the caller.
    """
    # Non-positive values mean "disabled". Guarding here matters: a negative
    # frame size would make the drain loop below spin forever, and a negative
    # heartbeat would fire on every read timeout.
    frame_size = args.frame_size if args.frame_size and args.frame_size > 0 else 0
    heartbeat = args.heartbeat if args.heartbeat and args.heartbeat > 0 else 0

    frame_buffer = bytearray()
    log_file = open(args.log, "a", encoding="utf-8") if args.log else None

    def emit(line: str) -> None:
        """Print *line* and mirror it to the log file, flushing each line."""
        print(line)
        if log_file:
            log_file.write(line + "\n")
            log_file.flush()

    def timestamp() -> str:
        """Return the local wall-clock time with millisecond precision."""
        return dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]

    def emit_bytes(header: str, payload: bytes) -> None:
        """Emit a hex line for *payload*, plus an ASCII line if requested."""
        emit(f"{header} {hex_preview(payload)}")
        if args.ascii:
            emit(f"{'':14} ASCII {ascii_preview(payload)}")

    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=args.timeout,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            # Drop anything received before we started listening.
            ser.reset_input_buffer()
            print(
                f"Listening on {ser.port} at {ser.baudrate} 8N1. "
                "Press Ctrl+C to stop."
            )
            print("Tip: press RCP buttons and watch for new hex bytes.")

            # Monotonic clock for idle timing; wall clock only for display.
            last_rx = time.monotonic()
            while True:
                data = ser.read(args.chunk_size)
                now = time.monotonic()

                if data:
                    last_rx = now
                    if frame_size:
                        frame_buffer.extend(data)
                        # Drain every complete frame; keep the remainder.
                        while len(frame_buffer) >= frame_size:
                            frame = bytes(frame_buffer[:frame_size])
                            del frame_buffer[:frame_size]
                            emit_bytes(
                                f"{timestamp()} frame {frame_size:03d}", frame
                            )
                    else:
                        emit_bytes(f"{timestamp()} {len(data):03d} bytes", data)
                elif heartbeat and now - last_rx >= heartbeat:
                    stamp = dt.datetime.now().strftime("%H:%M:%S")
                    emit(f"{stamp} no data")
                    # Restart the idle timer so the message repeats at the
                    # heartbeat interval rather than on every timeout.
                    last_rx = now
    finally:
        try:
            # Surface bytes that never completed a frame instead of
            # discarding them when the capture stops.
            if frame_buffer:
                emit_bytes(
                    f"{timestamp()} partial {len(frame_buffer):03d}",
                    bytes(frame_buffer),
                )
        finally:
            if log_file:
                log_file.close()
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the sniffer's command-line options.

    Numeric options are range-checked up front so that bad values produce
    an argparse usage error (exit status 2) instead of a busy loop or an
    exception later during capture.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        Namespace with ``list``, ``port``, ``baud``, ``timeout``,
        ``chunk_size``, ``ascii``, ``heartbeat``, ``frame_size`` and ``log``.
    """

    def at_least(cast, minimum):
        """Build a ``type=`` converter that rejects values below *minimum*."""

        def convert(text: str):
            try:
                value = cast(text)
            except ValueError:
                raise argparse.ArgumentTypeError(
                    f"invalid {cast.__name__} value: {text!r}"
                ) from None
            # "not >=" rather than "<" so that NaN is rejected as well.
            if not value >= minimum:
                raise argparse.ArgumentTypeError(
                    f"must be >= {minimum}, got {text}"
                )
            return value

        return convert

    parser = argparse.ArgumentParser(
        description="RX-only serial sniffer for RCP-TX7 experiments."
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="list available serial ports and exit",
    )
    parser.add_argument(
        "--port",
        help="serial port, for example COM3 on Windows or /dev/ttyUSB0 on Linux",
    )
    parser.add_argument(
        "--baud",
        type=at_least(int, 1),
        default=38400,
        help="baud rate (default: 38400)",
    )
    parser.add_argument(
        "--timeout",
        type=at_least(float, 0.0),
        default=0.2,
        help="serial read timeout in seconds (default: 0.2)",
    )
    parser.add_argument(
        "--chunk-size",
        type=at_least(int, 1),
        default=64,
        help="maximum number of bytes per read (default: 64)",
    )
    parser.add_argument(
        "--ascii",
        action="store_true",
        help="also print printable ASCII preview",
    )
    parser.add_argument(
        "--heartbeat",
        type=at_least(float, 0.0),
        default=5.0,
        help="print 'no data' every N seconds while idle; set 0 to disable",
    )
    parser.add_argument(
        "--frame-size",
        type=at_least(int, 0),
        default=0,
        help="group stream into fixed-size frames, for example 6; 0 disables",
    )
    parser.add_argument(
        "--log",
        help="append capture output to this text file",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def main() -> int:
    """Run the command-line tool and return its process exit status.

    Returns:
        0 after ``--list`` or when the capture is stopped with Ctrl+C,
        1 on a serial or other I/O error (for example an unwritable
        ``--log`` path), and 2 when no ``--port`` was given.
    """
    args = parse_args()
    if args.list:
        list_serial_ports()
        return 0

    if not args.port:
        print("Pass --port COMx, or run --list first.", file=sys.stderr)
        return 2

    try:
        return sniff(args)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to end a capture, so report success.
        print("\nStopped.")
        return 0
    except serial.SerialException as exc:
        # Must precede the OSError handler: pyserial's SerialException is
        # itself an IOError/OSError subclass.
        print(f"Serial error: {exc}", file=sys.stderr)
        return 1
    except OSError as exc:
        # Non-serial I/O failures, e.g. the log file cannot be opened or
        # written; report cleanly instead of dumping a traceback.
        print(f"I/O error: {exc}", file=sys.stderr)
        return 1
|
|
|
|
|
|
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|