217 lines
6.7 KiB
Python
217 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Cautious RCP-TX7 host-frame command sweep.
|
|
|
|
This sends checksum-valid 6-byte frames of the observed shape:
|
|
|
|
prefix1 prefix2 command state value checksum
|
|
|
|
The checksum is the current working hypothesis:
|
|
|
|
checksum = 0x5A xor prefix1 xor prefix2 xor command xor state xor value
|
|
|
|
The script logs TX frames plus any RX frames. It highlights RX frames that differ
|
|
from the known heartbeat so a run can be scanned quickly.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
print(
|
|
"Missing dependency: pyserial\n"
|
|
"Install it with: python -m pip install pyserial",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
|
|
|
|
# Known heartbeat frame; received frames equal to this are not flagged.
# Its final byte agrees with the checksum hypothesis: 0x5A ^ 0x80 == 0xDA.
HEARTBEAT = bytes((0x00, 0x00, 0x00, 0x00, 0x80, 0xDA))
|
|
|
|
|
|
def parse_int(text: str) -> int:
    """Convert a command-line token into a single byte value.

    The token is parsed with ``int(text, 0)``, so decimal as well as
    prefixed literals (``0x``, ``0o``, ``0b``) are accepted.

    Raises:
        argparse.ArgumentTypeError: the number is outside 0..255.
        ValueError: *text* is not an integer literal (raised by ``int``).
    """
    number = int(text, 0)
    if number < 0 or number > 0xFF:
        raise argparse.ArgumentTypeError(f"must be a byte: {text}")
    return number
|
|
|
|
|
|
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
    """Assemble a 6-byte frame: the five given bytes followed by a checksum.

    The checksum is 0x5A XOR-ed with each of the five payload bytes.

    Raises:
        ValueError: any field is outside 0..255 (raised by ``bytes``).
    """
    # Building the payload first validates that every field fits in a byte.
    payload = bytes((prefix1, prefix2, command, state, value))
    check = 0x5A ^ payload[0] ^ payload[1] ^ payload[2] ^ payload[3] ^ payload[4]
    return payload + check.to_bytes(1, "big")
|
|
|
|
|
|
def hex_preview(data: bytes) -> str:
    """Render *data* as space-separated, two-digit uppercase hex bytes."""
    octets = ["%02X" % octet for octet in data]
    return " ".join(octets)
|
|
|
|
|
|
def make_logger(path: str | None):
    """Create a line logger that prints and optionally appends to a file.

    Args:
        path: log file to append to (UTF-8); falsy means stdout only.

    Returns:
        ``(emit, log_file)``: ``emit(line)`` prints the line and, when a
        file is open, writes it with a trailing newline and flushes.
        ``log_file`` is the open file object (the caller closes it) or None.
    """
    sink = None
    if path:
        sink = open(path, "a", encoding="utf-8")

    def emit(line: str) -> None:
        print(line)
        if not sink:
            return
        sink.write(line + "\n")
        # Flush per line so the log is current if the run is interrupted.
        sink.flush()

    return emit, sink
|
|
|
|
|
|
def _emit_rx_frames(pending: bytearray, emit, frame_size: int) -> int:
    """Log and remove each complete frame at the front of *pending*.

    Returns the number of logged frames that differ from HEARTBEAT.
    """
    unusual = 0
    while len(pending) >= frame_size:
        frame = bytes(pending[:frame_size])
        del pending[:frame_size]
        stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
        is_heartbeat = frame == HEARTBEAT
        marker = "" if is_heartbeat else " NON_HEARTBEAT"
        emit(f"{stamp} RX frame {frame_size:03d} {hex_preview(frame)}{marker}")
        if not is_heartbeat:
            unusual += 1
    return unusual


def drain_rx(ser: serial.Serial, emit, until: float, frame_size: int) -> int:
    """Read from *ser* until the deadline and log the data as fixed-size frames.

    Args:
        ser: open serial port; its read timeout bounds each ``read`` call.
        emit: callable taking one log line.
        until: deadline expressed as a ``time.monotonic()`` value.
        frame_size: number of bytes per logged frame; must be >= 1.

    Returns:
        The number of logged frames that differ from HEARTBEAT.

    Raises:
        ValueError: *frame_size* is less than 1.
    """
    if frame_size < 1:
        # A non-positive size would keep the framing loop from ever finishing.
        raise ValueError(f"frame_size must be >= 1, got {frame_size}")

    pending = bytearray()
    interesting = 0

    while time.monotonic() < until:
        chunk = ser.read(64)
        if not chunk:
            continue
        pending.extend(chunk)
        interesting += _emit_rx_frames(pending, emit, frame_size)

    if pending:
        # The deadline landed mid-frame. Every call starts with an empty
        # buffer, so dropping these bytes would shift the frame boundaries
        # seen by the next call. Make one more read for just the missing
        # bytes to finish the frame.
        pending.extend(ser.read(frame_size - len(pending)))
        interesting += _emit_rx_frames(pending, emit, frame_size)

    if pending:
        # Still incomplete: record the leftover bytes instead of losing them.
        stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
        emit(f"{stamp} RX partial {len(pending):03d} {hex_preview(bytes(pending))}")

    return interesting
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    Options are registered in a fixed order: port and baud, the byte-valued
    frame fields, the listen windows, frame size and log file, then the
    boolean switches.
    """
    cli = argparse.ArgumentParser(
        description="Sweep checksum-valid command bytes and log non-heartbeat RX."
    )
    add = cli.add_argument

    add("--port", required=True, help="serial port, for example COM5")
    add("--baud", type=int, default=38400)

    # Byte-valued options, all validated by parse_int.
    byte_options = (
        ("--start", 0x00),
        ("--end", 0x20),
        ("--prefix1", 0x00),
        ("--prefix2", 0x00),
        ("--state", 0x00),
        ("--value", 0x80),
    )
    for flag, default in byte_options:
        add(flag, type=parse_int, default=default)

    # Listen windows, in seconds.
    listen_options = (
        ("--settle", 1.5, "seconds to listen before the sweep starts"),
        ("--after-each", 0.8, "seconds to listen after each transmitted frame"),
        ("--after", 3.0, "seconds to listen after the whole sweep"),
    )
    for flag, default, text in listen_options:
        add(flag, type=float, default=default, help=text)

    add("--frame-size", type=int, default=6)
    add("--log", help="append capture/transmit log to this file")

    switches = (
        ("--stop-on-non-heartbeat", "stop the sweep if the RCP sends any non-heartbeat frame"),
        ("--prompt-screen", "after each command, prompt for the observed RCP screen state"),
        ("--dry-run", "print frames without opening the serial port"),
    )
    for flag, text in switches:
        add(flag, action="store_true", help=text)

    return cli.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the command sweep described by the command-line arguments.

    Builds one frame per command byte in ``--start``..``--end`` (inclusive).
    With ``--dry-run`` the frames are only printed. Otherwise the serial port
    is opened, RX is logged for the settle window, each frame is written and
    followed by a listen window, and RX is logged once more after the sweep.

    Returns:
        0 on completion or Ctrl-C; 1 on a serial error or when the log file
        cannot be opened.

    Raises:
        SystemExit: ``--end`` is below ``--start`` or ``--frame-size`` is
            less than 1.
    """
    args = parse_args()
    if args.end < args.start:
        raise SystemExit("--end must be >= --start")
    if args.frame_size < 1:
        # RX framing needs at least one byte per frame; reject this up front
        # rather than failing (or never finishing) in the middle of a sweep.
        raise SystemExit("--frame-size must be >= 1")

    frames = [
        (command, build_frame(args.prefix1, args.prefix2, command, args.state, args.value))
        for command in range(args.start, args.end + 1)
    ]

    if args.dry_run:
        for command, frame in frames:
            print(f"cmd 0x{command:02X}: {hex_preview(frame)}")
        return 0

    try:
        emit, log_file = make_logger(args.log)
    except OSError as exc:
        # Report an unwritable log path the same way serial errors are reported.
        print(f"Cannot open log file {args.log!r}: {exc}", file=sys.stderr)
        return 1

    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.05,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            # Start from a clean slate so stale bytes are not logged as RX.
            ser.reset_input_buffer()
            emit(
                f"Sweeping commands 0x{args.start:02X}-0x{args.end:02X} "
                f"on {ser.port} at {ser.baudrate} 8N1"
            )
            drain_rx(ser, emit, time.monotonic() + args.settle, args.frame_size)

            for command, frame in frames:
                stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
                emit(f"{stamp} TX cmd 0x{command:02X} frame {len(frame):03d} {hex_preview(frame)}")
                ser.write(frame)
                ser.flush()
                interesting = drain_rx(
                    ser,
                    emit,
                    time.monotonic() + args.after_each,
                    args.frame_size,
                )
                if interesting and args.stop_on_non_heartbeat:
                    emit(f"Stopping after cmd 0x{command:02X}: non-heartbeat RX observed")
                    break
                if args.prompt_screen:
                    try:
                        screen = input(
                            f"Screen after cmd 0x{command:02X} "
                            "(blank = no change, q = stop): "
                        ).strip()
                    except EOFError:
                        # stdin closed or exhausted: nobody can answer further
                        # prompts, so end the sweep instead of crashing.
                        emit(f"Stopping after cmd 0x{command:02X}: stdin closed")
                        break
                    if screen:
                        stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
                        emit(f"{stamp} SCREEN cmd 0x{command:02X} {screen}")
                    if screen.lower() in {"q", "quit", "stop"}:
                        break

            # Final listen window; also reached after an early break.
            drain_rx(ser, emit, time.monotonic() + args.after, args.frame_size)
    except KeyboardInterrupt:
        print("Stopped.")
        return 0
    except serial.SerialException as exc:
        print(f"Serial error: {exc}", file=sys.stderr)
        return 1
    finally:
        if log_file:
            log_file.close()

    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|