305 lines
11 KiB
Python
305 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Sweep candidate commands that may unlatch the RCP-TX7 response state.
|
|
|
|
Each candidate test performs:
|
|
|
|
latch primer -> latch query -> candidate -> verify primer -> verify query
|
|
|
|
The verify query checks whether the RCP will answer again without a power cycle.
|
|
Use --prompt-power-cycle for clean bench testing.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
print(
|
|
"Missing dependency: pyserial\n"
|
|
"Install it with: python -m pip install pyserial",
|
|
file=sys.stderr,
|
|
)
|
|
raise SystemExit(2)
|
|
|
|
|
|
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
|
|
|
|
|
|
def parse_byte(text: str) -> int:
|
|
value = int(text, 0)
|
|
if not 0 <= value <= 0xFF:
|
|
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
|
|
return value
|
|
|
|
|
|
def parse_byte_set(text: str) -> list[int]:
|
|
values: list[int] = []
|
|
for part in text.replace(",", " ").split():
|
|
if "-" in part:
|
|
start_text, end_text = part.split("-", 1)
|
|
start = parse_byte(start_text)
|
|
end = parse_byte(end_text)
|
|
if end < start:
|
|
raise argparse.ArgumentTypeError(f"bad range: {part}")
|
|
values.extend(range(start, end + 1))
|
|
else:
|
|
values.append(parse_byte(part))
|
|
if not values:
|
|
raise argparse.ArgumentTypeError("empty byte set")
|
|
return values
|
|
|
|
|
|
def parse_frame(text: str) -> bytes:
|
|
values = parse_byte_set(text)
|
|
if len(values) < 1:
|
|
raise argparse.ArgumentTypeError("empty frame")
|
|
return bytes(values)
|
|
|
|
|
|
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
|
|
body = bytes([prefix1, prefix2, command, state, value])
|
|
checksum = 0x5A
|
|
for byte in body:
|
|
checksum ^= byte
|
|
return body + bytes([checksum])
|
|
|
|
|
|
def hex_preview(data: bytes) -> str:
|
|
return " ".join(f"{byte:02X}" for byte in data)
|
|
|
|
|
|
def make_logger(path: str | None):
|
|
log_file = open(path, "a", encoding="utf-8") if path else None
|
|
|
|
def emit(line: str) -> None:
|
|
print(line)
|
|
if log_file:
|
|
log_file.write(line + "\n")
|
|
log_file.flush()
|
|
|
|
return emit, log_file
|
|
|
|
|
|
def heartbeat_offset(data: bytes) -> int | None:
|
|
if not data:
|
|
return 0
|
|
for offset in range(len(HEARTBEAT)):
|
|
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
|
|
return offset
|
|
return None
|
|
|
|
|
|
def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
|
|
for index, byte in enumerate(data):
|
|
expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
|
|
if byte != expected:
|
|
return index, byte, expected
|
|
return None
|
|
|
|
|
|
def classify_rx(data: bytes) -> tuple[bool, str]:
|
|
if not data:
|
|
return False, "no RX bytes"
|
|
|
|
offset = heartbeat_offset(data)
|
|
if offset is not None:
|
|
full = len(data) // len(HEARTBEAT)
|
|
extra = len(data) % len(HEARTBEAT)
|
|
return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
|
|
|
|
best_offset = min(
|
|
range(len(HEARTBEAT)),
|
|
key=lambda candidate: sum(
|
|
byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
|
|
for index, byte in enumerate(data)
|
|
),
|
|
)
|
|
mismatch = first_mismatch(data, best_offset)
|
|
if mismatch is None:
|
|
return False, "heartbeat-compatible RX"
|
|
|
|
index, byte, expected = mismatch
|
|
return (
|
|
True,
|
|
f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
|
|
f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
|
|
)
|
|
|
|
|
|
def read_window(ser: serial.Serial, duration: float) -> bytes:
|
|
stop_at = time.monotonic() + duration
|
|
data = bytearray()
|
|
while time.monotonic() < stop_at:
|
|
chunk = ser.read(128)
|
|
if chunk:
|
|
data.extend(chunk)
|
|
return bytes(data)
|
|
|
|
|
|
def emit_rx(emit, label: str, data: bytes) -> bool:
|
|
is_anomaly, note = classify_rx(data)
|
|
emit(f"{label} {note}")
|
|
if is_anomaly:
|
|
emit(f"{label} raw {hex_preview(data)}")
|
|
return is_anomaly
|
|
|
|
|
|
def contains_frame(data: bytes, expected: bytes) -> bool:
|
|
return bool(expected) and expected in data
|
|
|
|
|
|
def send_and_read(ser: serial.Serial, emit, label: str, frame: bytes, duration: float) -> tuple[bool, bytes]:
|
|
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
emit(f"{stamp} TX {label} frame {len(frame):03d} {hex_preview(frame)}")
|
|
ser.write(frame)
|
|
ser.flush()
|
|
data = read_window(ser, duration)
|
|
is_anomaly = emit_rx(emit, f"{stamp} {label.upper()} RX", data)
|
|
return is_anomaly, data
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Sweep candidate unlatch commands and verify whether a second query responds."
|
|
)
|
|
parser.add_argument("--port", required=True, help="serial port, for example COM5")
|
|
parser.add_argument("--baud", type=int, default=38400)
|
|
parser.add_argument("--prefix1", type=parse_byte, default=0x00)
|
|
parser.add_argument("--prefix2", type=parse_byte, default=0x00)
|
|
parser.add_argument("--latch-primer-command", type=parse_byte, default=0x00)
|
|
parser.add_argument("--latch-query-command", type=parse_byte, default=0xB5)
|
|
parser.add_argument("--verify-primer-command", type=parse_byte, default=0x00)
|
|
parser.add_argument("--verify-query-command", type=parse_byte, default=0xB5)
|
|
parser.add_argument("--state", type=parse_byte, default=0x00)
|
|
parser.add_argument("--value", type=parse_byte, default=0x80)
|
|
parser.add_argument("--candidates", type=parse_byte_set, required=True)
|
|
parser.add_argument("--settle", type=float, default=3.0)
|
|
parser.add_argument("--between", type=float, default=0.8)
|
|
parser.add_argument("--after-candidate", type=float, default=1.2)
|
|
parser.add_argument("--after-verify", type=float, default=1.5)
|
|
parser.add_argument("--timeout", type=float, default=0.03)
|
|
parser.add_argument("--log", help="append sweep log to this file")
|
|
parser.add_argument("--dry-run", action="store_true")
|
|
parser.add_argument("--prompt-power-cycle", action="store_true")
|
|
parser.add_argument("--prompt-screen", action="store_true")
|
|
parser.add_argument(
|
|
"--expected-verify-response",
|
|
type=parse_frame,
|
|
help="only count a verify hit when these bytes appear in verify-query RX",
|
|
)
|
|
parser.add_argument("--stop-on-verify-response", action="store_true")
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
latch_primer = build_frame(args.prefix1, args.prefix2, args.latch_primer_command, args.state, args.value)
|
|
latch_query = build_frame(args.prefix1, args.prefix2, args.latch_query_command, args.state, args.value)
|
|
verify_primer = build_frame(args.prefix1, args.prefix2, args.verify_primer_command, args.state, args.value)
|
|
verify_query = build_frame(args.prefix1, args.prefix2, args.verify_query_command, args.state, args.value)
|
|
candidates = [
|
|
(command, build_frame(args.prefix1, args.prefix2, command, args.state, args.value))
|
|
for command in args.candidates
|
|
]
|
|
|
|
if args.dry_run:
|
|
print(f"latch primer 0x{args.latch_primer_command:02X}: {hex_preview(latch_primer)}")
|
|
print(f"latch query 0x{args.latch_query_command:02X}: {hex_preview(latch_query)}")
|
|
print(f"verify primer 0x{args.verify_primer_command:02X}: {hex_preview(verify_primer)}")
|
|
print(f"verify query 0x{args.verify_query_command:02X}: {hex_preview(verify_query)}")
|
|
for command, frame in candidates:
|
|
print(f"candidate 0x{command:02X}: {hex_preview(frame)}")
|
|
return 0
|
|
|
|
emit, log_file = make_logger(args.log)
|
|
verify_hits = 0
|
|
try:
|
|
with serial.Serial(
|
|
port=args.port,
|
|
baudrate=args.baud,
|
|
bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
timeout=args.timeout,
|
|
write_timeout=1.0,
|
|
rtscts=False,
|
|
dsrdtr=False,
|
|
xonxoff=False,
|
|
) as ser:
|
|
emit(
|
|
f"Unlatch sweep: latch {hex_preview(latch_primer)} -> {hex_preview(latch_query)}, "
|
|
f"verify {hex_preview(verify_primer)} -> {hex_preview(verify_query)}, "
|
|
f"{len(candidates)} candidates on {ser.port} at {ser.baudrate} 8N1"
|
|
)
|
|
if args.expected_verify_response:
|
|
emit(f"Expected verify response: {hex_preview(args.expected_verify_response)}")
|
|
|
|
for index, (command, candidate) in enumerate(candidates, start=1):
|
|
if args.prompt_power_cycle:
|
|
answer = input(
|
|
f"Power-cycle RCP for unlatch candidate 0x{command:02X}, "
|
|
"wait for heartbeat, then press Enter (q then Enter to stop): "
|
|
).strip()
|
|
if answer.lower() in {"q", "quit", "stop"}:
|
|
emit("Stopped before next candidate.")
|
|
break
|
|
|
|
ser.reset_input_buffer()
|
|
emit_rx(emit, f"CANDIDATE 0x{command:02X} BASELINE", read_window(ser, args.settle))
|
|
send_and_read(ser, emit, "latch primer", latch_primer, args.between)
|
|
send_and_read(ser, emit, "latch query", latch_query, args.between)
|
|
send_and_read(ser, emit, f"candidate 0x{command:02X}", candidate, args.after_candidate)
|
|
|
|
if args.prompt_screen:
|
|
screen = input(
|
|
f"Screen after candidate 0x{command:02X} "
|
|
"(blank = no change, q = stop): "
|
|
).strip()
|
|
if screen:
|
|
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
emit(f"{stamp} SCREEN candidate=0x{command:02X} {screen}")
|
|
if screen.lower() in {"q", "quit", "stop"}:
|
|
break
|
|
|
|
send_and_read(ser, emit, "verify primer", verify_primer, args.between)
|
|
verify_anomaly, verify_data = send_and_read(ser, emit, "verify query", verify_query, args.after_verify)
|
|
if args.expected_verify_response:
|
|
verify_hit = contains_frame(verify_data, args.expected_verify_response)
|
|
if verify_anomaly and not verify_hit:
|
|
emit(
|
|
f"VERIFY anomaly did not contain expected response "
|
|
f"{hex_preview(args.expected_verify_response)}"
|
|
)
|
|
else:
|
|
verify_hit = verify_anomaly
|
|
if verify_hit:
|
|
verify_hits += 1
|
|
emit(f"VERIFY RESPONSE after candidate 0x{command:02X}")
|
|
if args.stop_on_verify_response:
|
|
emit("Stopping after verify response.")
|
|
break
|
|
|
|
emit(f"Completed candidate {index}/{len(candidates)}")
|
|
|
|
emit(f"Verify responses: {verify_hits}")
|
|
except KeyboardInterrupt:
|
|
emit("Stopped.")
|
|
return 0
|
|
except serial.SerialException as exc:
|
|
print(f"Serial error: {exc}", file=sys.stderr)
|
|
return 1
|
|
finally:
|
|
if log_file:
|
|
log_file.close()
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|