250 lines
8.7 KiB
Python
250 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Send arbitrary RCP-TX7 host frame sequences and classify RX windows.
|
|
|
|
This helper is meant for exploratory multi-frame tests such as:
|
|
|
|
primer -> announce/selector -> query
|
|
primer -> short query block
|
|
repeated pseudo-keepalive groups
|
|
|
|
Each transmitted frame can be followed by a small read window so responses can
|
|
be attributed to a specific host frame more easily than in a long raw capture.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
print(
|
|
"Missing dependency: pyserial\n"
|
|
"Install it with: python -m pip install pyserial",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
|
|
|
|
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
|
|
|
|
|
|
def parse_hex_bytes(text: str) -> bytes:
|
|
normalized = text.replace(",", " ").replace("0x", "").replace("0X", "")
|
|
parts = normalized.split()
|
|
if not parts:
|
|
raise argparse.ArgumentTypeError("hex frame cannot be empty")
|
|
try:
|
|
values = [int(part, 16) for part in parts]
|
|
except ValueError as exc:
|
|
raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc
|
|
if any(value < 0 or value > 0xFF for value in values):
|
|
raise argparse.ArgumentTypeError("hex values must be bytes")
|
|
return bytes(values)
|
|
|
|
|
|
def hex_preview(data: bytes) -> str:
|
|
return " ".join(f"{byte:02X}" for byte in data)
|
|
|
|
|
|
def ascii_preview(data: bytes) -> str:
|
|
return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)
|
|
|
|
|
|
def make_logger(path: str | None):
|
|
log_file = open(path, "a", encoding="utf-8") if path else None
|
|
|
|
def emit(line: str) -> None:
|
|
print(line)
|
|
if log_file:
|
|
log_file.write(line + "\n")
|
|
log_file.flush()
|
|
|
|
return emit, log_file
|
|
|
|
|
|
def heartbeat_offset(data: bytes) -> int | None:
|
|
if not data:
|
|
return 0
|
|
for offset in range(len(HEARTBEAT)):
|
|
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
|
|
return offset
|
|
return None
|
|
|
|
|
|
def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
|
|
for index, byte in enumerate(data):
|
|
expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
|
|
if byte != expected:
|
|
return index, byte, expected
|
|
return None
|
|
|
|
|
|
def classify_rx(data: bytes) -> tuple[bool, str]:
|
|
if not data:
|
|
return False, "no RX bytes"
|
|
|
|
offset = heartbeat_offset(data)
|
|
if offset is not None:
|
|
full = len(data) // len(HEARTBEAT)
|
|
extra = len(data) % len(HEARTBEAT)
|
|
return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
|
|
|
|
best_offset = min(
|
|
range(len(HEARTBEAT)),
|
|
key=lambda candidate: sum(
|
|
byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
|
|
for index, byte in enumerate(data)
|
|
),
|
|
)
|
|
mismatch = first_mismatch(data, best_offset)
|
|
if mismatch is None:
|
|
return False, "heartbeat-compatible RX"
|
|
|
|
index, byte, expected = mismatch
|
|
return (
|
|
True,
|
|
f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
|
|
f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
|
|
)
|
|
|
|
|
|
def read_window(ser: serial.Serial, duration: float) -> bytes:
|
|
stop_at = time.monotonic() + duration
|
|
data = bytearray()
|
|
while time.monotonic() < stop_at:
|
|
chunk = ser.read(128)
|
|
if chunk:
|
|
data.extend(chunk)
|
|
return bytes(data)
|
|
|
|
|
|
def emit_rx(emit, label: str, data: bytes, ascii_mode: bool) -> bool:
|
|
is_anomaly, note = classify_rx(data)
|
|
emit(f"{label} {note}")
|
|
if data and ascii_mode:
|
|
emit(f"{'':14} ASCII {ascii_preview(data)}")
|
|
if is_anomaly:
|
|
emit(f"{label} raw {hex_preview(data)}")
|
|
return is_anomaly
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Send an arbitrary host frame sequence and classify RX after each step."
|
|
)
|
|
parser.add_argument("--port", required=True, help="serial port, for example COM5")
|
|
parser.add_argument("--baud", type=int, default=38400)
|
|
parser.add_argument("--timeout", type=float, default=0.03)
|
|
parser.add_argument("--log", help="append probe log to this file")
|
|
parser.add_argument("--ascii", action="store_true")
|
|
parser.add_argument("--prompt", action="store_true", help="pause before starting so you can power-cycle or prepare the panel")
|
|
parser.add_argument("--pre-read", type=float, default=3.0, help="seconds to observe baseline heartbeat before the first frame")
|
|
parser.add_argument("--delay", type=float, default=0.0, help="extra delay after prompt/baseline before the first frame")
|
|
parser.add_argument(
|
|
"--frame",
|
|
type=parse_hex_bytes,
|
|
action="append",
|
|
required=True,
|
|
help="hex frame to send; repeat the flag to build a sequence",
|
|
)
|
|
parser.add_argument("--frame-interval", type=float, default=0.05, help="delay between consecutive frames in the same group")
|
|
parser.add_argument("--read-after-frame", type=float, default=0.8, help="seconds to read/classify after each frame")
|
|
parser.add_argument("--repeat", type=int, default=1, help="how many times to send the full frame group")
|
|
parser.add_argument("--repeat-interval", type=float, default=0.0, help="delay between repeated frame groups")
|
|
parser.add_argument("--read-after-group", type=float, default=0.0, help="extra seconds to read/classify after each full group")
|
|
parser.add_argument(
|
|
"--prompt-screen",
|
|
action="store_true",
|
|
help="prompt for a final screen/light note so visible panel state is captured in the log",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
emit, log_file = make_logger(args.log)
|
|
|
|
try:
|
|
with serial.Serial(
|
|
port=args.port,
|
|
baudrate=args.baud,
|
|
bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
timeout=args.timeout,
|
|
write_timeout=1.0,
|
|
rtscts=False,
|
|
dsrdtr=False,
|
|
xonxoff=False,
|
|
) as ser:
|
|
emit(
|
|
f"Sequence probe: {len(args.frame)} frames x {args.repeat} group(s) "
|
|
f"on {ser.port} at {ser.baudrate} 8N1"
|
|
)
|
|
for index, frame in enumerate(args.frame, start=1):
|
|
emit(f"FRAME {index}: {hex_preview(frame)}")
|
|
|
|
if args.prompt:
|
|
input("Prepare/power-cycle RCP, wait for heartbeat, then press Enter: ")
|
|
|
|
ser.reset_input_buffer()
|
|
emit_rx(emit, "BASELINE", read_window(ser, args.pre_read), args.ascii)
|
|
|
|
if args.delay > 0:
|
|
time.sleep(args.delay)
|
|
|
|
anomaly_count = 0
|
|
for group_index in range(1, args.repeat + 1):
|
|
if group_index > 1 and args.repeat_interval > 0:
|
|
time.sleep(args.repeat_interval)
|
|
|
|
emit(f"BEGIN group {group_index}/{args.repeat}")
|
|
for frame_index, frame in enumerate(args.frame, start=1):
|
|
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
emit(
|
|
f"{stamp} TX group={group_index} frame={frame_index} "
|
|
f"len={len(frame):03d} {hex_preview(frame)}"
|
|
)
|
|
ser.write(frame)
|
|
ser.flush()
|
|
|
|
if args.read_after_frame > 0:
|
|
label = f"{stamp} RX group={group_index} frame={frame_index}"
|
|
if emit_rx(emit, label, read_window(ser, args.read_after_frame), args.ascii):
|
|
anomaly_count += 1
|
|
|
|
if frame_index < len(args.frame) and args.frame_interval > 0:
|
|
time.sleep(args.frame_interval)
|
|
|
|
if args.read_after_group > 0:
|
|
label = f"GROUP {group_index} TAIL"
|
|
if emit_rx(emit, label, read_window(ser, args.read_after_group), args.ascii):
|
|
anomaly_count += 1
|
|
|
|
if args.prompt_screen:
|
|
note = input("Final screen/light note (blank = no note): ").strip()
|
|
if note:
|
|
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
emit(f"{stamp} PANEL {note}")
|
|
|
|
emit(f"Anomalies: {anomaly_count}")
|
|
except KeyboardInterrupt:
|
|
emit("Stopped.")
|
|
return 0
|
|
except serial.SerialException as exc:
|
|
print(f"Serial error: {exc}", file=sys.stderr)
|
|
return 1
|
|
finally:
|
|
if log_file:
|
|
log_file.close()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|