responses

This commit is contained in:
Aiden
2026-05-13 12:30:37 +10:00
parent 455886691a
commit f99a60710e
28 changed files with 2709 additions and 1 deletions

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""Summarize fixed-size hex frames from serial_sniff.py logs."""
from __future__ import annotations
import argparse
import collections
import re
from pathlib import Path
FRAME_RE = re.compile(r"\b(?:(RX|TX)\s+)?frame\s+\d+\s+((?:[0-9A-Fa-f]{2}\s*)+)$")
def frames_from_log(path: Path) -> list[tuple[str, str]]:
frames: list[tuple[str, str]] = []
for line in path.read_text(encoding="utf-8").splitlines():
match = FRAME_RE.search(line.strip())
if match:
direction = match.group(1) or "RX"
frame = " ".join(match.group(2).upper().split())
frames.append((direction, frame))
return frames
def checksum_note(frame: str) -> str:
values = [int(part, 16) for part in frame.split()]
if len(values) != 6:
return ""
checksum = 0x5A
for value in values[:5]:
checksum ^= value
if checksum == values[5]:
return " checksum ok"
return f" checksum expected {checksum:02X}"
def main() -> int:
parser = argparse.ArgumentParser(description="Count frames in capture logs.")
parser.add_argument("logs", nargs="+", type=Path)
args = parser.parse_args()
for log in args.logs:
frames = frames_from_log(log)
counts = collections.Counter(frames)
print(f"{log}: {len(frames)} frames, {len(counts)} unique direction/frame pairs")
for (direction, frame), count in counts.most_common():
print(f" {count:5d} {direction:<2} {frame}{checksum_note(frame)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""Listen on RX while sending candidate RCP-TX7 host-response frames.
Windows usually allows only one process to open a COM port, so this script
combines capture and transmit in one pyserial session.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
def parse_hex_bytes(text: str) -> bytes:
normalized = text.replace(",", " ").replace("0x", "").replace("0X", "")
parts = normalized.split()
try:
values = [int(part, 16) for part in parts]
except ValueError as exc:
raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc
if not values:
raise argparse.ArgumentTypeError("hex frame cannot be empty")
if any(value < 0 or value > 0xFF for value in values):
raise argparse.ArgumentTypeError("hex values must be bytes")
return bytes(values)
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def ascii_preview(data: bytes) -> str:
return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Capture RX frames while sending candidate TX responses."
)
parser.add_argument("--port", required=True, help="serial port, for example COM3")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument(
"--tx-frame",
type=parse_hex_bytes,
default=parse_hex_bytes("00 00 00 00 80 DA"),
help="hex frame to send on TXD",
)
parser.add_argument("--repeat", type=int, default=5)
parser.add_argument("--interval", type=float, default=0.2)
parser.add_argument(
"--delay",
type=float,
default=3.0,
help="seconds to listen before first transmit",
)
parser.add_argument(
"--after",
type=float,
default=5.0,
help="seconds to keep listening after the last transmit",
)
parser.add_argument("--frame-size", type=int, default=6)
parser.add_argument("--chunk-size", type=int, default=64)
parser.add_argument("--timeout", type=float, default=0.05)
parser.add_argument("--ascii", action="store_true")
parser.add_argument("--log", help="append capture/transmit log to this file")
return parser.parse_args()
def main() -> int:
args = parse_args()
emit, log_file = make_logger(args.log)
frame_buffer = bytearray()
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
start = time.monotonic()
next_tx = start + args.delay
sent = 0
stop_at = None
emit(
f"Listening on {ser.port} at {ser.baudrate} 8N1; "
f"will send {hex_preview(args.tx_frame)} after {args.delay:.1f}s"
)
while True:
now = time.monotonic()
if sent < args.repeat and now >= next_tx:
ser.write(args.tx_frame)
ser.flush()
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX frame {len(args.tx_frame):03d} {hex_preview(args.tx_frame)}")
sent += 1
next_tx = now + args.interval
if sent == args.repeat:
stop_at = now + args.after
data = ser.read(args.chunk_size)
if data:
if args.frame_size:
frame_buffer.extend(data)
while len(frame_buffer) >= args.frame_size:
frame = bytes(frame_buffer[: args.frame_size])
del frame_buffer[: args.frame_size]
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} RX frame {args.frame_size:03d} {hex_preview(frame)}")
if args.ascii:
emit(f"{'':14} ASCII {ascii_preview(frame)}")
else:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} RX {len(data):03d} bytes {hex_preview(data)}")
if stop_at is not None and now >= stop_at:
break
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""Send one Sony RCP-TX7 candidate serial frame.
Use carefully. This is for controlled experiments on the host-to-RCP line
through an RS-232 adapter. It can either send a complete hex frame or build the
observed 6-byte frame shape and checksum from fields.
"""
from __future__ import annotations
import argparse
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
def parse_hex_bytes(text: str) -> bytes:
normalized = text.replace(",", " ").replace("0x", "").replace("0X", "")
parts = normalized.split()
try:
data = bytes(int(part, 16) for part in parts)
except ValueError as exc:
raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc
if not data:
raise argparse.ArgumentTypeError("hex frame cannot be empty")
if any(part and int(part, 16) > 0xFF for part in parts):
raise argparse.ArgumentTypeError("hex values must be bytes")
return data
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Send an explicit or checksum-built 6-byte RCP-TX7 frame."
)
parser.add_argument("--port", required=True, help="serial port, for example COM3")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument(
"--frame",
type=parse_hex_bytes,
help="complete hex frame, for example \"00 00 00 00 80 DA\"",
)
parser.add_argument("--prefix1", type=lambda s: int(s, 0), default=0)
parser.add_argument("--prefix2", type=lambda s: int(s, 0), default=0)
parser.add_argument("--command", type=lambda s: int(s, 0), default=0)
parser.add_argument("--state", type=lambda s: int(s, 0), default=0)
parser.add_argument("--value", type=lambda s: int(s, 0), default=0x80)
parser.add_argument("--repeat", type=int, default=1)
parser.add_argument("--interval", type=float, default=0.2)
parser.add_argument(
"--dry-run",
action="store_true",
help="print the frame but do not open the serial port",
)
return parser.parse_args()
def validate_byte(name: str, value: int) -> None:
if not 0 <= value <= 0xFF:
raise SystemExit(f"{name} must be a byte, got {value!r}")
def main() -> int:
args = parse_args()
if args.frame:
frame = args.frame
else:
for name in ("prefix1", "prefix2", "command", "state", "value"):
validate_byte(name, getattr(args, name))
frame = build_frame(args.prefix1, args.prefix2, args.command, args.state, args.value)
print(f"Frame: {hex_preview(frame)}")
if args.dry_run:
return 0
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.2,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
for index in range(args.repeat):
ser.write(frame)
ser.flush()
print(f"Sent {index + 1}/{args.repeat}")
if index + 1 < args.repeat:
time.sleep(args.interval)
return 0
if __name__ == "__main__":
raise SystemExit(main())

170
scripts/serial_sniff.py Normal file
View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""Small serial sniffer for Sony RCP-TX7 restoration work.
This is intended for RX-only capture from a USB serial adapter.
Default settings match the CCU-D50/TX7 notes for RCP-TX7-class remotes:
38400 baud, 8 data bits, no parity, 1 stop bit.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
from serial.tools import list_ports
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
def ascii_preview(data: bytes) -> str:
return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def list_serial_ports() -> None:
ports = list(list_ports.comports())
if not ports:
print("No serial ports found.")
return
for port in ports:
parts = [port.device]
if port.description:
parts.append(port.description)
if port.hwid:
parts.append(port.hwid)
print(" | ".join(parts))
def sniff(args: argparse.Namespace) -> int:
log_file = open(args.log, "a", encoding="utf-8") if args.log else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
print(
f"Listening on {ser.port} at {ser.baudrate} 8N1. "
"Press Ctrl+C to stop."
)
print("Tip: press RCP buttons and watch for new hex bytes.")
last_rx = time.monotonic()
frame_buffer = bytearray()
while True:
data = ser.read(args.chunk_size)
now = time.monotonic()
if data:
if args.frame_size:
frame_buffer.extend(data)
while len(frame_buffer) >= args.frame_size:
frame = bytes(frame_buffer[: args.frame_size])
del frame_buffer[: args.frame_size]
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(
f"{stamp} frame {args.frame_size:03d} "
f"{hex_preview(frame)}"
)
if args.ascii:
emit(f"{'':14} ASCII {ascii_preview(frame)}")
else:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} {len(data):03d} bytes {hex_preview(data)}")
if args.ascii:
emit(f"{'':14} ASCII {ascii_preview(data)}")
last_rx = now
elif args.heartbeat and now - last_rx >= args.heartbeat:
stamp = dt.datetime.now().strftime("%H:%M:%S")
emit(f"{stamp} no data")
last_rx = now
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="RX-only serial sniffer for RCP-TX7 experiments."
)
parser.add_argument(
"--list",
action="store_true",
help="list available serial ports and exit",
)
parser.add_argument(
"--port",
help="serial port, for example COM3 on Windows or /dev/ttyUSB0 on Linux",
)
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--timeout", type=float, default=0.2)
parser.add_argument("--chunk-size", type=int, default=64)
parser.add_argument(
"--ascii",
action="store_true",
help="also print printable ASCII preview",
)
parser.add_argument(
"--heartbeat",
type=float,
default=5.0,
help="print 'no data' every N seconds while idle; set 0 to disable",
)
parser.add_argument(
"--frame-size",
type=int,
default=0,
help="group stream into fixed-size frames, for example 6",
)
parser.add_argument(
"--log",
help="append capture output to this text file",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
if args.list:
list_serial_ports()
return 0
if not args.port:
print("Pass --port COMx, or run --list first.", file=sys.stderr)
return 2
try:
return sniff(args)
except KeyboardInterrupt:
print("\nStopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""Cautious RCP-TX7 host-frame command sweep.
This sends checksum-valid 6-byte frames of the observed shape:
prefix1 prefix2 command state value checksum
The checksum is the current working hypothesis:
checksum = 0x5A xor prefix1 xor prefix2 xor command xor state xor value
The script logs TX frames plus any RX frames. It highlights RX frames that differ
from the known heartbeat so a run can be scanned quickly.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_int(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def drain_rx(ser: serial.Serial, emit, until: float, frame_size: int) -> int:
buffer = bytearray()
interesting = 0
while time.monotonic() < until:
data = ser.read(64)
if not data:
continue
buffer.extend(data)
while len(buffer) >= frame_size:
frame = bytes(buffer[:frame_size])
del buffer[:frame_size]
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
marker = "" if frame == HEARTBEAT else " NON_HEARTBEAT"
emit(f"{stamp} RX frame {frame_size:03d} {hex_preview(frame)}{marker}")
if frame != HEARTBEAT:
interesting += 1
return interesting
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Sweep checksum-valid command bytes and log non-heartbeat RX."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--start", type=parse_int, default=0x00)
parser.add_argument("--end", type=parse_int, default=0x20)
parser.add_argument("--prefix1", type=parse_int, default=0x00)
parser.add_argument("--prefix2", type=parse_int, default=0x00)
parser.add_argument("--state", type=parse_int, default=0x00)
parser.add_argument("--value", type=parse_int, default=0x80)
parser.add_argument(
"--settle",
type=float,
default=1.5,
help="seconds to listen before the sweep starts",
)
parser.add_argument(
"--after-each",
type=float,
default=0.8,
help="seconds to listen after each transmitted frame",
)
parser.add_argument(
"--after",
type=float,
default=3.0,
help="seconds to listen after the whole sweep",
)
parser.add_argument("--frame-size", type=int, default=6)
parser.add_argument("--log", help="append capture/transmit log to this file")
parser.add_argument(
"--stop-on-non-heartbeat",
action="store_true",
help="stop the sweep if the RCP sends any non-heartbeat frame",
)
parser.add_argument(
"--prompt-screen",
action="store_true",
help="after each command, prompt for the observed RCP screen state",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="print frames without opening the serial port",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
if args.end < args.start:
raise SystemExit("--end must be >= --start")
frames = [
(command, build_frame(args.prefix1, args.prefix2, command, args.state, args.value))
for command in range(args.start, args.end + 1)
]
if args.dry_run:
for command, frame in frames:
print(f"cmd 0x{command:02X}: {hex_preview(frame)}")
return 0
emit, log_file = make_logger(args.log)
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.05,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
emit(
f"Sweeping commands 0x{args.start:02X}-0x{args.end:02X} "
f"on {ser.port} at {ser.baudrate} 8N1"
)
drain_rx(ser, emit, time.monotonic() + args.settle, args.frame_size)
for command, frame in frames:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX cmd 0x{command:02X} frame {len(frame):03d} {hex_preview(frame)}")
ser.write(frame)
ser.flush()
interesting = drain_rx(
ser,
emit,
time.monotonic() + args.after_each,
args.frame_size,
)
if interesting and args.stop_on_non_heartbeat:
emit(f"Stopping after cmd 0x{command:02X}: non-heartbeat RX observed")
break
if args.prompt_screen:
screen = input(
f"Screen after cmd 0x{command:02X} "
"(blank = no change, q = stop): "
).strip()
if screen:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} SCREEN cmd 0x{command:02X} {screen}")
if screen.lower() in {"q", "quit", "stop"}:
break
drain_rx(ser, emit, time.monotonic() + args.after, args.frame_size)
except KeyboardInterrupt:
print("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())