119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Send one Sony RCP-TX7 candidate serial frame.

Use carefully. This is for controlled experiments on the host-to-RCP line
through an RS-232 adapter. It can either send a complete hex frame or build the
observed 6-byte frame shape and checksum from fields.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
import time
|
|
|
|
# pyserial is the only third-party dependency; fail early with install
# instructions instead of a bare ImportError traceback.
try:
    import serial
except ImportError:
    _missing_serial_message = (
        "Missing dependency: pyserial\n"
        "Install it with: python -m pip install pyserial"
    )
    sys.stderr.write(_missing_serial_message + "\n")
    raise SystemExit(2)
|
|
|
|
|
|
def parse_hex_bytes(text: str) -> bytes:
    """Parse a user-supplied list of hex byte values into ``bytes``.

    Tokens may be separated by whitespace and/or commas. Every ``0x`` / ``0X``
    occurrence is stripped before parsing, so ``"00 80 DA"`` and
    ``"0x00,0x80,0xDA"`` produce the same result.

    Used as the ``type=`` callable for the ``--frame`` option.

    Args:
        text: The raw command-line string.

    Returns:
        The parsed bytes, in the order given.

    Raises:
        argparse.ArgumentTypeError: If a token is not valid hexadecimal, if
            no tokens are present, or if any value is outside ``0x00..0xFF``.
    """
    normalized = text.replace(",", " ").replace("0x", "").replace("0X", "")
    try:
        values = [int(part, 16) for part in normalized.split()]
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc
    if not values:
        raise argparse.ArgumentTypeError("hex frame cannot be empty")
    # Range-check BEFORE calling bytes(): bytes() raises ValueError for ints
    # outside 0..255, which would otherwise be misreported as malformed hex.
    if any(not 0 <= value <= 0xFF for value in values):
        raise argparse.ArgumentTypeError("hex values must be bytes")
    return bytes(values)
|
|
|
|
|
|
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
    """Assemble a 6-byte frame: the five field bytes plus a trailing checksum.

    The checksum is ``0x5A`` XOR-ed with each of the five field bytes.
    ``bytes()`` raises ``ValueError`` if any field is outside ``0..255``.
    """
    payload = bytes((prefix1, prefix2, command, state, value))
    # payload construction above has already rejected out-of-range fields,
    # so the XOR result is guaranteed to fit in one byte.
    check = 0x5A ^ prefix1 ^ prefix2 ^ command ^ state ^ value
    return payload + bytes((check,))
|
|
|
|
|
|
def hex_preview(data: bytes) -> str:
    """Render *data* as space-separated, two-digit uppercase hex values."""
    as_hex = "{:02X}".format
    return " ".join(map(as_hex, data))
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse and validate the command-line arguments.

    The frame is given either whole via ``--frame`` or as individual fields
    (``--prefix1``, ``--prefix2``, ``--command``, ``--state``, ``--value``).
    Field values are converted with ``int(text, 0)``, so decimal and
    ``0x``-prefixed hex literals are both accepted.

    Returns:
        The populated namespace, with ``repeat >= 1`` and ``interval >= 0``.

    Exits via ``parser.error`` (status 2) when an argument is malformed, when
    ``--repeat`` is below 1, or when ``--interval`` is negative or NaN.
    """

    def integer(text: str) -> int:
        # A named converter (instead of a lambda) so argparse reports
        # "invalid integer value" rather than "invalid <lambda> value".
        return int(text, 0)

    field_help = "byte value, decimal or 0x-prefixed hex (default: %(default)s)"

    parser = argparse.ArgumentParser(
        description="Send an explicit or checksum-built 6-byte RCP-TX7 frame."
    )
    parser.add_argument("--port", required=True, help="serial port, for example COM3")
    parser.add_argument("--baud", type=int, default=38400)
    parser.add_argument(
        "--frame",
        type=parse_hex_bytes,
        help="complete hex frame, for example \"00 00 00 00 80 DA\"",
    )
    parser.add_argument("--prefix1", type=integer, default=0, help=field_help)
    parser.add_argument("--prefix2", type=integer, default=0, help=field_help)
    parser.add_argument("--command", type=integer, default=0, help=field_help)
    parser.add_argument("--state", type=integer, default=0, help=field_help)
    parser.add_argument("--value", type=integer, default=0x80, help=field_help)
    parser.add_argument("--repeat", type=int, default=1)
    parser.add_argument("--interval", type=float, default=0.2)
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="print the frame but do not open the serial port",
    )
    args = parser.parse_args()

    # Reject values that would otherwise fail only after the port is open:
    # a repeat count below 1 sends nothing, and a negative interval makes
    # time.sleep() raise after the first frame has already gone out.
    if args.repeat < 1:
        parser.error("--repeat must be at least 1")
    # "not >= 0" (rather than "< 0") also rejects NaN.
    if not args.interval >= 0:
        parser.error("--interval must be a non-negative number of seconds")
    return args
|
|
|
|
|
|
def validate_byte(name: str, value: int) -> None:
    """Exit with an error message unless *value* lies within ``0..0xFF``.

    *name* is the field name used in the error message.
    """
    if 0 <= value <= 0xFF:
        return
    raise SystemExit(f"{name} must be a byte, got {value!r}")
|
|
|
|
|
|
def main() -> int:
    """Build (or accept) the frame, print it, and send it over the serial port.

    When ``--frame`` is given it is sent verbatim; otherwise the five field
    arguments are range-checked and combined with a checksum by
    ``build_frame``. With ``--dry-run`` the frame is only printed.

    Returns:
        0 on success (or dry run), 1 if the serial port could not be opened
        or a write failed.
    """
    args = parse_args()
    if args.frame:
        frame = args.frame
    else:
        for name in ("prefix1", "prefix2", "command", "state", "value"):
            validate_byte(name, getattr(args, name))
        frame = build_frame(args.prefix1, args.prefix2, args.command, args.state, args.value)

    print(f"Frame: {hex_preview(frame)}")
    if args.dry_run:
        return 0

    try:
        # 8N1 with every form of flow control disabled.
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.2,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            for index in range(args.repeat):
                ser.write(frame)
                ser.flush()
                print(f"Sent {index + 1}/{args.repeat}")
                # Pause between frames only, not after the last one.
                if index + 1 < args.repeat:
                    time.sleep(args.interval)
    except serial.SerialException as exc:
        # Covers open failures and write timeouts (SerialTimeoutException is
        # a subclass); report them without a traceback.
        print(f"Serial error on {args.port}: {exc}", file=sys.stderr)
        return 1
    return 0
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|