#!/usr/bin/env python3
"""Sweep candidate commands that may unlatch the RCP-TX7 response state.

Each candidate test performs:

    latch primer -> latch query -> candidate -> verify primer -> verify query

The verify query checks whether the RCP will answer again without a power
cycle. Use --prompt-power-cycle for clean bench testing.
"""

from __future__ import annotations

import argparse
import datetime as dt
import sys
import time
from collections.abc import Callable
from typing import NamedTuple, TextIO

try:
    import serial
except ImportError:
    # Do not exit at import time: --help, --dry-run and the pure helpers do
    # not need pyserial. main() reports the missing dependency (exit status 2)
    # right before it would open the port.
    serial = None

_MISSING_PYSERIAL = (
    "Missing dependency: pyserial\n"
    "Install it with: python -m pip install pyserial"
)

HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")

# Answers at an interactive prompt that end the sweep early.
_STOP_WORDS = frozenset({"q", "quit", "stop"})


class _SweepFrames(NamedTuple):
    """The four fixed frames sent around every candidate frame."""

    latch_primer: bytes
    latch_query: bytes
    verify_primer: bytes
    verify_query: bytes


def parse_byte(text: str) -> int:
    """Parse *text* as an integer in 0..255.

    The base is inferred from the prefix (``0x``, ``0o``, ``0b`` or plain
    decimal), as done by ``int(text, 0)``.

    Raises:
        argparse.ArgumentTypeError: if *text* is not an integer literal or is
            outside the byte range.
    """
    try:
        value = int(text, 0)
    except ValueError:
        # Convert so argparse prints this message instead of the generic
        # "invalid parse_byte value".
        raise argparse.ArgumentTypeError(
            f"not an integer (use decimal or 0x-prefixed hex): {text!r}"
        ) from None
    if not 0 <= value <= 0xFF:
        raise argparse.ArgumentTypeError(f"must be a byte: {text}")
    return value


def parse_byte_set(text: str) -> list[int]:
    """Parse a comma/space separated list of bytes and inclusive ranges.

    Example: ``"0x10-0x12, 5"`` yields ``[16, 17, 18, 5]``. Order and
    duplicates are preserved.

    Raises:
        argparse.ArgumentTypeError: on an empty list, a descending range, or
            any element that is not a valid byte.
    """
    values: list[int] = []
    for part in text.replace(",", " ").split():
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start = parse_byte(start_text)
            end = parse_byte(end_text)
            if end < start:
                raise argparse.ArgumentTypeError(f"bad range: {part}")
            values.extend(range(start, end + 1))
        else:
            values.append(parse_byte(part))
    if not values:
        raise argparse.ArgumentTypeError("empty byte set")
    return values


def parse_frame(text: str) -> bytes:
    """Parse *text* like :func:`parse_byte_set` and return it as ``bytes``.

    Raises:
        argparse.ArgumentTypeError: propagated from :func:`parse_byte_set`
            (which already rejects empty input).
    """
    return bytes(parse_byte_set(text))


def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
    """Return the five body bytes followed by a one-byte checksum.

    The checksum is 0x5A XORed with every body byte.
    """
    body = bytes([prefix1, prefix2, command, state, value])
    checksum = 0x5A
    for byte in body:
        checksum ^= byte
    return body + bytes([checksum])


def hex_preview(data: bytes) -> str:
    """Format *data* as space-separated two-digit upper-case hex."""
    return " ".join(f"{byte:02X}" for byte in data)


def make_logger(path: str | None) -> tuple[Callable[[str], None], TextIO | None]:
    """Create the line emitter used for all sweep output.

    Returns:
        ``(emit, log_file)``. ``emit(line)`` prints the line and, when *path*
        is given, also appends it to that file and flushes. ``log_file`` is
        the open file (the caller must close it) or ``None``.

    Raises:
        OSError: if *path* cannot be opened for appending.
    """
    log_file = open(path, "a", encoding="utf-8") if path else None

    def emit(line: str) -> None:
        print(line)
        if log_file:
            log_file.write(line + "\n")
            # Flush per line so the log survives an abrupt stop on the bench.
            log_file.flush()

    return emit, log_file


def heartbeat_offset(data: bytes) -> int | None:
    """Return the phase at which *data* matches the repeating HEARTBEAT.

    Returns the smallest offset ``o`` such that every ``data[i]`` equals
    ``HEARTBEAT[(o + i) % len(HEARTBEAT)]``, ``0`` for empty data, or ``None``
    if no offset matches.
    """
    if not data:
        return 0
    for offset in range(len(HEARTBEAT)):
        if first_mismatch(data, offset) is None:
            return offset
    return None


def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
    """Find the first byte of *data* that deviates from the heartbeat.

    Returns:
        ``(index, actual_byte, expected_byte)`` for the first deviation when
        the heartbeat is assumed to start at phase *offset*, or ``None`` when
        every byte matches.
    """
    for index, byte in enumerate(data):
        expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
        if byte != expected:
            return index, byte, expected
    return None


def _mismatch_count(data: bytes, offset: int) -> int:
    """Count bytes of *data* that differ from the heartbeat at phase *offset*."""
    period = len(HEARTBEAT)
    return sum(
        byte != HEARTBEAT[(offset + index) % period]
        for index, byte in enumerate(data)
    )


def classify_rx(data: bytes) -> tuple[bool, str]:
    """Classify received bytes as heartbeat-only traffic or an anomaly.

    Returns:
        ``(is_anomaly, note)``. ``is_anomaly`` is ``True`` only when *data* is
        non-empty and matches the repeating heartbeat at no phase; ``note`` is
        a human-readable summary for the log.
    """
    if not data:
        return False, "no RX bytes"

    period = len(HEARTBEAT)
    offset = heartbeat_offset(data)
    if offset is not None:
        full, extra = divmod(len(data), period)
        return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"

    # Report the deviation relative to the phase that fits best (fewest
    # differing bytes; ties resolve to the lowest phase).
    best_offset = min(
        range(period),
        key=lambda candidate: _mismatch_count(data, candidate),
    )
    mismatch = first_mismatch(data, best_offset)
    if mismatch is None:
        # Defensive only: heartbeat_offset() returning None implies every
        # phase has at least one mismatch.
        return False, "heartbeat-compatible RX"
    index, byte, expected = mismatch
    return (
        True,
        f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
        f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
    )


def read_window(ser: serial.Serial, duration: float) -> bytes:
    """Collect everything *ser* delivers during roughly *duration* seconds.

    Reads in chunks of up to 128 bytes until the monotonic deadline passes.
    Each ``ser.read`` call is governed by the port's own timeout, so the
    window can overrun by up to one such timeout.
    """
    stop_at = time.monotonic() + duration
    data = bytearray()
    while time.monotonic() < stop_at:
        chunk = ser.read(128)
        if chunk:
            data.extend(chunk)
    return bytes(data)


def emit_rx(emit, label: str, data: bytes) -> bool:
    """Log the classification of *data*; dump raw hex only for anomalies.

    Returns:
        ``True`` when :func:`classify_rx` flags *data* as an anomaly.
    """
    is_anomaly, note = classify_rx(data)
    emit(f"{label} {note}")
    if is_anomaly:
        emit(f"{label} raw {hex_preview(data)}")
    return is_anomaly


def contains_frame(data: bytes, expected: bytes) -> bool:
    """Return ``True`` when non-empty *expected* occurs anywhere in *data*."""
    return bool(expected) and expected in data


def _timestamp() -> str:
    """Return the local wall-clock time as ``HH:MM:SS.mmm``."""
    return dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]


def send_and_read(ser: serial.Serial, emit, label: str, frame: bytes, duration: float) -> tuple[bool, bytes]:
    """Transmit *frame*, then capture and log the reply window.

    Returns:
        ``(is_anomaly, data)`` where *data* is everything read during
        *duration* seconds after the write was flushed.
    """
    stamp = _timestamp()
    emit(f"{stamp} TX {label} frame {len(frame):03d} {hex_preview(frame)}")
    ser.write(frame)
    ser.flush()
    data = read_window(ser, duration)
    is_anomaly = emit_rx(emit, f"{stamp} {label.upper()} RX", data)
    return is_anomaly, data


def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Sweep candidate unlatch commands and verify whether a second query responds."
    )
    parser.add_argument("--port", required=True, help="serial port, for example COM5")
    parser.add_argument("--baud", type=int, default=38400)
    parser.add_argument("--prefix1", type=parse_byte, default=0x00)
    parser.add_argument("--prefix2", type=parse_byte, default=0x00)
    parser.add_argument("--latch-primer-command", type=parse_byte, default=0x00)
    parser.add_argument("--latch-query-command", type=parse_byte, default=0xB5)
    parser.add_argument("--verify-primer-command", type=parse_byte, default=0x00)
    parser.add_argument("--verify-query-command", type=parse_byte, default=0xB5)
    parser.add_argument("--state", type=parse_byte, default=0x00)
    parser.add_argument("--value", type=parse_byte, default=0x80)
    parser.add_argument("--candidates", type=parse_byte_set, required=True)
    parser.add_argument("--settle", type=float, default=3.0)
    parser.add_argument("--between", type=float, default=0.8)
    parser.add_argument("--after-candidate", type=float, default=1.2)
    parser.add_argument("--after-verify", type=float, default=1.5)
    parser.add_argument("--timeout", type=float, default=0.03)
    parser.add_argument("--log", help="append sweep log to this file")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--prompt-power-cycle", action="store_true")
    parser.add_argument("--prompt-screen", action="store_true")
    parser.add_argument(
        "--expected-verify-response",
        type=parse_frame,
        help="only count a verify hit when these bytes appear in verify-query RX",
    )
    parser.add_argument("--stop-on-verify-response", action="store_true")
    return parser.parse_args()


def _print_dry_run(
    args: argparse.Namespace,
    frames: _SweepFrames,
    candidates: list[tuple[int, bytes]],
) -> None:
    """Print every frame the sweep would transmit, without touching a port."""
    print(f"latch primer 0x{args.latch_primer_command:02X}: {hex_preview(frames.latch_primer)}")
    print(f"latch query 0x{args.latch_query_command:02X}: {hex_preview(frames.latch_query)}")
    print(f"verify primer 0x{args.verify_primer_command:02X}: {hex_preview(frames.verify_primer)}")
    print(f"verify query 0x{args.verify_query_command:02X}: {hex_preview(frames.verify_query)}")
    for command, frame in candidates:
        print(f"candidate 0x{command:02X}: {hex_preview(frame)}")


def _run_sweep(
    ser: serial.Serial,
    emit: Callable[[str], None],
    args: argparse.Namespace,
    frames: _SweepFrames,
    candidates: list[tuple[int, bytes]],
) -> None:
    """Run the latch/candidate/verify sequence for each candidate and log it."""
    verify_hits = 0
    total = len(candidates)

    for index, (command, candidate) in enumerate(candidates, start=1):
        if args.prompt_power_cycle:
            answer = input(
                f"Power-cycle RCP for unlatch candidate 0x{command:02X}, "
                "wait for heartbeat, then press Enter (q then Enter to stop): "
            ).strip()
            if answer.lower() in _STOP_WORDS:
                emit("Stopped before next candidate.")
                break

        # Start from an empty buffer so the baseline window only contains
        # bytes received after this point.
        ser.reset_input_buffer()
        emit_rx(emit, f"CANDIDATE 0x{command:02X} BASELINE", read_window(ser, args.settle))

        send_and_read(ser, emit, "latch primer", frames.latch_primer, args.between)
        send_and_read(ser, emit, "latch query", frames.latch_query, args.between)
        send_and_read(ser, emit, f"candidate 0x{command:02X}", candidate, args.after_candidate)

        if args.prompt_screen:
            screen = input(
                f"Screen after candidate 0x{command:02X} "
                "(blank = no change, q = stop): "
            ).strip()
            if screen:
                emit(f"{_timestamp()} SCREEN candidate=0x{command:02X} {screen}")
            if screen.lower() in _STOP_WORDS:
                break

        send_and_read(ser, emit, "verify primer", frames.verify_primer, args.between)
        verify_anomaly, verify_data = send_and_read(
            ser, emit, "verify query", frames.verify_query, args.after_verify
        )

        if args.expected_verify_response:
            # With an expected response, only its presence counts as a hit.
            verify_hit = contains_frame(verify_data, args.expected_verify_response)
            if verify_anomaly and not verify_hit:
                emit(
                    "VERIFY anomaly did not contain expected response "
                    f"{hex_preview(args.expected_verify_response)}"
                )
        else:
            # Otherwise any non-heartbeat traffic counts as a hit.
            verify_hit = verify_anomaly

        if verify_hit:
            verify_hits += 1
            emit(f"VERIFY RESPONSE after candidate 0x{command:02X}")
            if args.stop_on_verify_response:
                emit("Stopping after verify response.")
                break

        emit(f"Completed candidate {index}/{total}")

    emit(f"Verify responses: {verify_hits}")


def main() -> int:
    """Run the sweep described by the command line.

    Returns:
        Process exit status: ``0`` on completion, dry run, or a user stop
        (Ctrl-C or closed stdin); ``1`` on a serial error or an unopenable log
        file; ``2`` when pyserial is not installed.
    """
    args = parse_args()

    def frame_for(command: int) -> bytes:
        return build_frame(args.prefix1, args.prefix2, command, args.state, args.value)

    frames = _SweepFrames(
        latch_primer=frame_for(args.latch_primer_command),
        latch_query=frame_for(args.latch_query_command),
        verify_primer=frame_for(args.verify_primer_command),
        verify_query=frame_for(args.verify_query_command),
    )
    candidates = [(command, frame_for(command)) for command in args.candidates]

    if args.dry_run:
        _print_dry_run(args, frames, candidates)
        return 0

    if serial is None:
        print(_MISSING_PYSERIAL, file=sys.stderr)
        return 2

    try:
        emit, log_file = make_logger(args.log)
    except OSError as exc:
        print(f"Cannot open log file {args.log}: {exc}", file=sys.stderr)
        return 1

    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=args.timeout,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            emit(
                f"Unlatch sweep: latch {hex_preview(frames.latch_primer)} -> {hex_preview(frames.latch_query)}, "
                f"verify {hex_preview(frames.verify_primer)} -> {hex_preview(frames.verify_query)}, "
                f"{len(candidates)} candidates on {ser.port} at {ser.baudrate} 8N1"
            )
            if args.expected_verify_response:
                emit(f"Expected verify response: {hex_preview(args.expected_verify_response)}")
            _run_sweep(ser, emit, args, frames, candidates)
    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin was closed while waiting at an interactive prompt.
        emit("Stopped.")
        return 0
    except serial.SerialException as exc:
        print(f"Serial error: {exc}", file=sys.stderr)
        return 1
    finally:
        if log_file:
            log_file.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())