415 lines
16 KiB
Python
415 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""Button-focused RCP-TX7 serial tests.
|
|
|
|
This helper can:
|
|
|
|
1. Optionally put the RCP into a latched state with a known primer/query.
|
|
2. Listen for the button frames that are known to appear while disconnected.
|
|
3. Optionally transmit response frames when CAM POWER or CALL is observed.
|
|
4. Optionally mirror CALL high/low events with matching host responses.
|
|
5. Optionally transmit follow-up frames when a watched response frame appears.
|
|
6. Optionally transmit startup frames without waiting for a button event.
|
|
7. Optionally repeat the startup-frame group to test retrigger/latch behavior.
|
|
8. Optionally read after each startup-frame group before sending the next one.
|
|
|
|
Known RCP-origin button frames:
|
|
|
|
CAM POWER: 00 00 07 80 00 DD
|
|
CALL on: 00 00 15 80 00 CF
|
|
CALL off: 00 00 15 00 00 4F
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
print(
|
|
"Missing dependency: pyserial\n"
|
|
"Install it with: python -m pip install pyserial",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
|
|
|
|
# Six-byte frames this tool recognises in received data.
HEARTBEAT = bytes((0x00, 0x00, 0x00, 0x00, 0x80, 0xDA))
CAM_POWER = bytes((0x00, 0x00, 0x07, 0x80, 0x00, 0xDD))
CALL_ON = bytes((0x00, 0x00, 0x15, 0x80, 0x00, 0xCF))
CALL_OFF = bytes((0x00, 0x00, 0x15, 0x00, 0x00, 0x4F))

# Display name -> frame bytes; the names appear in DETECT lines and totals.
KNOWN_PATTERNS = dict(
    (
        ("heartbeat", HEARTBEAT),
        ("cam-power", CAM_POWER),
        ("call-on", CALL_ON),
        ("call-off", CALL_OFF),
    )
)
|
|
|
|
|
|
def parse_byte(text: str) -> int:
    """Convert a command-line token into an integer in the range 0..255.

    The token is handed to ``int(text, 0)``, so the base is taken from its
    prefix (plain decimal, ``0x`` hex, ``0o`` octal, ``0b`` binary).

    Raises:
        argparse.ArgumentTypeError: the number is outside 0..0xFF.
        ValueError: propagated from ``int`` when the token is not a number.
    """
    number = int(text, 0)
    if number < 0 or number > 0xFF:
        raise argparse.ArgumentTypeError(f"must be a byte: {text}")
    return number
|
|
|
|
|
|
def parse_hex_bytes(text: str) -> bytes:
    """Turn a textual list of hex bytes into a ``bytes`` frame.

    Commas are treated as separators and every ``0x``/``0X`` occurrence is
    stripped before the text is split on whitespace; each remaining token
    is parsed as base-16.

    Raises:
        argparse.ArgumentTypeError: the text holds no tokens, a token is not
            valid hex, or a token does not fit in one byte.
    """
    cleaned = text
    # Same substitutions, in the same order: commas first, then both prefixes.
    for needle, substitute in ((",", " "), ("0x", ""), ("0X", "")):
        cleaned = cleaned.replace(needle, substitute)

    tokens = cleaned.split()
    if len(tokens) == 0:
        raise argparse.ArgumentTypeError("hex frame cannot be empty")

    octets = []
    try:
        for token in tokens:
            octets.append(int(token, 16))
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc

    if not all(0 <= octet <= 0xFF for octet in octets):
        raise argparse.ArgumentTypeError("hex values must be bytes")
    return bytes(octets)
|
|
|
|
|
|
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
    """Assemble a six-byte frame: the five given bytes plus a checksum.

    The checksum starts at 0x5A and is XOR-ed with each of the five bytes,
    then appended as the last byte.

    Raises:
        ValueError: any argument is outside 0..255.
    """
    frame = bytearray((prefix1, prefix2, command, state, value))
    xor_sum = 0x5A
    for octet in bytes(frame):
        xor_sum ^= octet
    frame.append(xor_sum)
    return bytes(frame)
|
|
|
|
|
|
def hex_preview(data: bytes) -> str:
    """Render *data* as space-separated, two-digit upper-case hex bytes."""
    return " ".join(map("{:02X}".format, data))
|
|
|
|
|
|
def ascii_preview(data: bytes) -> str:
    """Render *data* as text, replacing every byte outside 32..126 with a dot."""
    rendered = []
    for code in data:
        if code < 32 or code > 126:
            rendered.append(".")
        else:
            rendered.append(chr(code))
    return "".join(rendered)
|
|
|
|
|
|
def make_logger(path: str | None):
    """Create the line-output function used throughout the tool.

    Args:
        path: log file to append to (UTF-8), or a falsy value for
            console-only output.

    Returns:
        ``(emit, log_file)``. ``emit(line)`` prints the line and, when a log
        file is open, also writes it with a trailing newline and flushes.
        ``log_file`` is the open file object or ``None``; this function does
        not close it, so the caller has to.
    """
    log_file = None
    if path:
        log_file = open(path, "a", encoding="utf-8")

    def emit(line: str) -> None:
        print(line)
        if not log_file:
            return
        log_file.write(line + "\n")
        # Flush per line so the log is current if the run is interrupted.
        log_file.flush()

    return emit, log_file
|
|
|
|
|
|
def read_window(ser: serial.Serial, duration: float, *, chunk_size: int = 128) -> bytes:
    """Collect everything read from *ser* during a fixed time window.

    Args:
        ser: open port object; only its ``read(size)`` method is used.
        duration: window length in seconds, measured with ``time.monotonic``.
            A zero or negative value returns ``b""`` without reading.
        chunk_size: maximum number of bytes requested per ``read`` call.
            Defaults to 128, the size this function previously hard-coded.

    Returns:
        All bytes received during the window, in arrival order.
    """
    stop_at = time.monotonic() + duration
    data = bytearray()
    # The deadline is only re-checked between reads, so this relies on
    # ser.read() returning promptly (the caller opens the port with a
    # read timeout).
    while time.monotonic() < stop_at:
        chunk = ser.read(chunk_size)
        if chunk:
            data.extend(chunk)
    return bytes(data)
|
|
|
|
|
|
def send_frame(ser: serial.Serial, emit, label: str, frame: bytes) -> None:
    """Log a frame as a timestamped TX line, then write and flush it.

    Args:
        ser: open port; ``write`` and ``flush`` are called on it.
        emit: line-output callable that receives the TX log line.
        label: short description included in the log line.
        frame: raw bytes to transmit.
    """
    # "%f" yields microseconds; dropping three digits leaves milliseconds.
    stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
    size = len(frame)
    rendered = hex_preview(frame)
    emit(f"{stamp} TX {label} frame {size:03d} {rendered}")
    ser.write(frame)
    ser.flush()
|
|
|
|
|
|
def emit_known_counts(emit, label: str, data: bytes) -> None:
    """Summarise how many known frames occur in a captured byte string.

    Emits ``"<label> no RX bytes"`` for empty data. Otherwise emits one line
    with the byte count and the non-zero per-pattern counts, followed by a
    raw hex dump when any button frame (cam-power, call-on, call-off) was
    found.
    """
    if not data:
        emit(f"{label} no RX bytes")
        return

    # Only patterns that actually occur are kept, in KNOWN_PATTERNS order.
    counts = {}
    for name, pattern in KNOWN_PATTERNS.items():
        hits = data.count(pattern)
        if hits:
            counts[name] = hits

    if counts:
        count_text = ", ".join(f"{name}={count}" for name, count in counts.items())
    else:
        count_text = "no known complete frames"
    emit(f"{label} RX {len(data)} bytes; {count_text}")

    if any(counts.get(button) for button in ("cam-power", "call-on", "call-off")):
        emit(f"{label} raw {hex_preview(data)}")
|
|
|
|
|
|
def emit_rx_chunk(emit, args, totals, watch_totals, data: bytes) -> None:
    """Log one received chunk and add its frame counts to the running totals.

    Args:
        emit: line-output callable.
        args: parsed options; ``args.ascii`` and ``args.watch_frame`` are read.
        totals: dict of known-pattern name -> count, updated in place.
        watch_totals: dict of watched-frame hex text -> count, updated in
            place; it must already contain a key for every watched frame.
        data: the bytes just received.
    """
    stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
    emit(f"{stamp} RX {len(data):03d} bytes {hex_preview(data)}")
    if args.ascii:
        emit(f"{'':14} ASCII {ascii_preview(data)}")

    for name, pattern in KNOWN_PATTERNS.items():
        hits = data.count(pattern)
        if not hits:
            continue
        totals[name] += hits
        emit(f"{stamp} DETECT {name} x{hits}")

    # args.watch_frame is None when no --watch-frame option was given.
    watched = args.watch_frame if args.watch_frame else []
    for frame in watched:
        hits = data.count(frame)
        if not hits:
            continue
        key = hex_preview(frame)
        watch_totals[key] += hits
        emit(f"{stamp} DETECT watch-frame {key} x{hits}")
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse and validate the command line for the button test.

    Returns:
        The populated namespace. Repeatable frame options
        (``--response-frame``, ``--watch-frame``, ``--followup-frame``,
        ``--startup-frame``) are lists of ``bytes``, or ``None`` when unused.

    Raises:
        SystemExit: raised by argparse (exit status 2) for unknown or
            malformed options, when ``--followup-on-watch-frame`` is given
            without a ``--followup-frame``, when a delay/interval/timeout is
            negative, or when ``--chunk-size`` is smaller than 1.
    """
    parser = argparse.ArgumentParser(
        description="Listen for CAM POWER/CALL frames and optionally respond to CAM POWER."
    )
    parser.add_argument("--port", required=True, help="serial port, for example COM5")
    parser.add_argument("--baud", type=int, default=38400)
    parser.add_argument("--timeout", type=float, default=0.03)
    parser.add_argument("--duration", type=float, default=30.0)
    parser.add_argument("--chunk-size", type=int, default=128)
    parser.add_argument("--log", help="append capture/transmit log to this file")
    parser.add_argument("--ascii", action="store_true")
    parser.add_argument("--latch", action="store_true", help="send latch primer/query before listening")
    parser.add_argument("--latch-primer-command", type=parse_byte, default=0x00)
    parser.add_argument("--latch-query-command", type=parse_byte, default=0xB5)
    parser.add_argument("--state", type=parse_byte, default=0x00)
    parser.add_argument("--value", type=parse_byte, default=0x80)
    parser.add_argument("--after-latch", type=float, default=1.0)
    parser.add_argument("--respond-to-cam-power", action="store_true")
    parser.add_argument("--respond-to-call", action="store_true")
    parser.add_argument(
        "--mirror-call",
        action="store_true",
        help="respond to CALL high with CALL high and CALL low with CALL low",
    )
    parser.add_argument(
        "--response-frame",
        type=parse_hex_bytes,
        action="append",
        # The same frames answer CAM POWER and, with --respond-to-call, CALL.
        help="hex frame to send when a responded-to button frame (CAM POWER or CALL) is seen; can be repeated",
    )
    parser.add_argument(
        "--watch-frame",
        type=parse_hex_bytes,
        action="append",
        help="hex frame to count when it appears in RX; can be repeated",
    )
    parser.add_argument(
        "--followup-on-watch-frame",
        action="store_true",
        help="send follow-up frame(s) when any watched frame is observed",
    )
    parser.add_argument(
        "--followup-frame",
        type=parse_hex_bytes,
        action="append",
        help="hex frame to send after a watched frame appears; can be repeated",
    )
    parser.add_argument("--followup-delay", type=float, default=0.05)
    parser.add_argument(
        "--startup-frame",
        type=parse_hex_bytes,
        action="append",
        help="hex frame to send after listening starts, without waiting for a button event",
    )
    parser.add_argument("--startup-delay", type=float, default=0.5)
    parser.add_argument(
        "--startup-frame-interval",
        type=float,
        default=0.05,
        help="delay between multiple startup frames",
    )
    parser.add_argument(
        "--startup-repeat",
        type=int,
        default=1,
        help="how many times to send the full startup-frame group",
    )
    parser.add_argument(
        "--startup-repeat-interval",
        type=float,
        default=1.0,
        help="delay between repeated startup-frame groups",
    )
    parser.add_argument(
        "--startup-read-after-group",
        type=float,
        default=0.0,
        help="seconds to read and log RX after each startup-frame group before the next repeat",
    )
    parser.add_argument("--response-delay", type=float, default=0.05)
    parser.add_argument(
        "--response-frame-interval",
        type=float,
        default=0.0,
        help="delay between multiple response frames sent for one button event",
    )
    parser.add_argument("--response-repeat", type=int, default=1)
    parser.add_argument("--response-interval", type=float, default=0.2)
    parser.add_argument(
        "--respond-once",
        action="store_true",
        help="only transmit the response on the first matched button frame",
    )
    parser.add_argument(
        "--mirror-call-once-per-state",
        action="store_true",
        help="with --mirror-call, respond once to CALL high and once to CALL low",
    )
    parser.add_argument("--prompt", action="store_true", help="pause before listen so you can prepare button presses")
    args = parser.parse_args()

    if args.followup_on_watch_frame and not args.followup_frame:
        parser.error("--followup-on-watch-frame requires at least one --followup-frame")

    # These values end up in time.sleep() or as the port's read timeout.
    # Both reject negative numbers with ValueError, which would otherwise
    # surface as a traceback part-way through a session.
    non_negative_options = (
        "timeout",
        "followup_delay",
        "startup_delay",
        "startup_frame_interval",
        "startup_repeat_interval",
        "response_delay",
        "response_frame_interval",
        "response_interval",
    )
    for option in non_negative_options:
        if getattr(args, option) < 0:
            parser.error(f"--{option.replace('_', '-')} must not be negative")

    # A read size below 1 can never return data, so the listener would stay
    # silent for the whole run.
    if args.chunk_size < 1:
        parser.error("--chunk-size must be at least 1")
    return args
|
|
|
|
|
|
def main() -> int:
    """Run one button-test session on the serial port.

    Sequence: parse options, open the port, optionally send the latch
    primer/query, optionally wait for Enter, optionally send the startup
    frame group(s), then read until ``--duration`` has elapsed while
    logging received data and transmitting mirror, follow-up and response
    frames as configured. Totals are emitted when the window ends.

    Returns:
        0 when the window completes or the user presses Ctrl-C,
        1 when pyserial raises ``SerialException``.
    """
    args = parse_args()
    emit, log_file = make_logger(args.log)
    # Without --response-frame, the CAM POWER frame itself is sent as the response.
    response_frames = args.response_frame or [CAM_POWER]
    watch_frames = args.watch_frame or []
    # Both latch frames are built up front but only transmitted under --latch.
    primer = build_frame(0x00, 0x00, args.latch_primer_command, args.state, args.value)
    query = build_frame(0x00, 0x00, args.latch_query_command, args.state, args.value)
    # Session flags: any response/mirror sent, follow-up sent, each CALL state mirrored.
    responded = False
    followup_sent = False
    mirrored_call_on = False
    mirrored_call_off = False
    totals = {name: 0 for name in KNOWN_PATTERNS}
    # Watch totals are keyed by the frame's hex text, matching the DETECT log lines.
    watch_totals = {hex_preview(frame): 0 for frame in watch_frames}

    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=args.timeout,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            # Presumably discards bytes queued before the session starts
            # (pyserial reset_input_buffer) -- confirm against pyserial docs.
            ser.reset_input_buffer()
            emit(f"Button test on {ser.port} at {ser.baudrate} 8N1")

            if args.latch:
                # Each latch frame is followed by a read window of --after-latch seconds.
                send_frame(ser, emit, "latch primer", primer)
                emit_known_counts(emit, "LATCH PRIMER", read_window(ser, args.after_latch))
                send_frame(ser, emit, "latch query", query)
                emit_known_counts(emit, "LATCH QUERY", read_window(ser, args.after_latch))

            if args.prompt:
                if args.startup_frame:
                    input("Ready to listen. Press Enter; startup frames will be sent automatically: ")
                else:
                    input("Ready to listen. Press Enter, then press CAM POWER/CALL on the RCP: ")
                # Reset again after the wait for Enter, before listening starts.
                ser.reset_input_buffer()

            emit(
                f"Listening for {args.duration:.1f}s; "
                f"respond_to_cam_power={args.respond_to_cam_power}, "
                f"respond_to_call={args.respond_to_call}, "
                f"mirror_call={args.mirror_call}"
            )
            # The deadline is fixed before the startup frames go out, so the
            # startup delays and reads below use up part of the listening window.
            stop_at = time.monotonic() + args.duration
            # Accumulates received bytes for the mirror/response checks.
            buffer = bytearray()
            if args.startup_frame:
                time.sleep(args.startup_delay)
                for repeat_index in range(args.startup_repeat):
                    for frame_index, frame in enumerate(args.startup_frame):
                        # No pause before the first frame of a group.
                        if frame_index:
                            time.sleep(args.startup_frame_interval)
                        send_frame(ser, emit, "startup", frame)
                    if args.startup_read_after_group:
                        group_data = read_window(ser, args.startup_read_after_group)
                        if group_data:
                            emit_rx_chunk(emit, args, totals, watch_totals, group_data)
                            # Kept in the buffer so the main loop's checks can see it;
                            # no follow-up or response is sent from here.
                            buffer.extend(group_data)
                    # No pause after the last group.
                    if repeat_index + 1 < args.startup_repeat:
                        time.sleep(args.startup_repeat_interval)

            while time.monotonic() < stop_at:
                data = ser.read(args.chunk_size)
                if not data:
                    continue

                # NOTE(review): the logging and counting below repeat what
                # emit_rx_chunk() does for startup-group data.
                stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
                emit(f"{stamp} RX {len(data):03d} bytes {hex_preview(data)}")
                if args.ascii:
                    emit(f"{'':14} ASCII {ascii_preview(data)}")
                buffer.extend(data)
                # NOTE(review): counts are taken on this chunk only, so a frame
                # split across two reads is not counted; the response checks
                # further down search the accumulated buffer instead.
                for name, pattern in KNOWN_PATTERNS.items():
                    count = data.count(pattern)
                    if count:
                        totals[name] += count
                        emit(f"{stamp} DETECT {name} x{count}")
                for frame in watch_frames:
                    count = data.count(frame)
                    if count:
                        key = hex_preview(frame)
                        watch_totals[key] += count
                        emit(f"{stamp} DETECT watch-frame {key} x{count}")
                        # followup_sent limits the follow-up group to once per run.
                        if args.followup_on_watch_frame and not followup_sent:
                            followup_sent = True
                            time.sleep(args.followup_delay)
                            for followup in args.followup_frame or []:
                                send_frame(ser, emit, "watch follow-up", followup)

                if args.mirror_call:
                    mirrored = False
                    # With --mirror-call-once-per-state each CALL state is echoed
                    # at most once; otherwise on every occurrence in the buffer.
                    if CALL_ON in buffer and not (
                        args.mirror_call_once_per_state and mirrored_call_on
                    ):
                        send_frame(ser, emit, "CALL high mirror", CALL_ON)
                        mirrored_call_on = True
                        responded = True
                        mirrored = True
                    if CALL_OFF in buffer and not (
                        args.mirror_call_once_per_state and mirrored_call_off
                    ):
                        send_frame(ser, emit, "CALL low mirror", CALL_OFF)
                        mirrored_call_off = True
                        responded = True
                        mirrored = True
                    if mirrored:
                        # A mirrored chunk skips the generic response below.
                        buffer.clear()
                        continue

                should_respond = (
                    (
                        (args.respond_to_cam_power and CAM_POWER in buffer)
                        or (args.respond_to_call and (CALL_ON in buffer or CALL_OFF in buffer))
                    )
                    and not (args.respond_once and responded)
                )
                if should_respond:
                    responded = True
                    time.sleep(args.response_delay)
                    for _ in range(args.response_repeat):
                        for response_index, response in enumerate(response_frames):
                            # Pause only between frames, and only if an interval is set.
                            if response_index and args.response_frame_interval:
                                time.sleep(args.response_frame_interval)
                            send_frame(ser, emit, "button response", response)
                        if args.response_repeat > 1:
                            time.sleep(args.response_interval)
                    # Clear so the same frame does not trigger another response.
                    buffer.clear()
                elif len(buffer) > 256:
                    # Cap the buffer: past 256 bytes only the newest 64 are kept.
                    del buffer[:-64]

            emit(
                "Totals: "
                + ", ".join(f"{name}={count}" for name, count in totals.items())
            )
            if watch_totals:
                emit(
                    "Watch totals: "
                    + ", ".join(f"{frame}={count}" for frame, count in watch_totals.items())
                )
    except KeyboardInterrupt:
        # Ctrl-C ends the run as a success; the totals are not emitted on this path.
        emit("Stopped.")
        return 0
    except serial.SerialException as exc:
        # Written to stderr only; the message does not go through emit() or the log file.
        print(f"Serial error: {exc}", file=sys.stderr)
        return 1
    finally:
        # make_logger() leaves closing the log file to its caller.
        if log_file:
            log_file.close()

    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|