#!/usr/bin/env python3
"""Send discovery query, then repeat a candidate host keepalive frame.

This tests the hypothesis that the RCP enters CONNECT NOT ACT because it sees
some host traffic but does not receive the expected ongoing CCU heartbeat.
"""

from __future__ import annotations

import argparse
import datetime as dt
import sys
import time
from typing import Callable, TextIO

try:
    import serial
except ImportError:
    # The missing dependency is reported by main() once a port is actually
    # needed, so --help, --dry-run and importing the pure helpers still work
    # on a machine without pyserial.
    serial = None  # type: ignore[assignment]

_MISSING_PYSERIAL = (
    "Missing dependency: pyserial\n"
    "Install it with: python -m pip install pyserial"
)

# Reference heartbeat frame; classify_rx() treats any cyclic slice of its
# repetition as heartbeat traffic.
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")


def parse_byte(text: str) -> int:
    """argparse ``type=``: parse *text* as an integer in the range 0..0xFF.

    The base is auto-detected by ``int(text, 0)``, so ``0x80`` and ``128``
    are both accepted.

    Raises:
        argparse.ArgumentTypeError: *text* is not an integer or is out of
            the byte range.
    """
    try:
        value = int(text, 0)
    except ValueError:
        raise argparse.ArgumentTypeError(f"must be a byte: {text}") from None
    if not 0 <= value <= 0xFF:
        raise argparse.ArgumentTypeError(f"must be a byte: {text}")
    return value


def parse_hex_bytes(text: str) -> bytes:
    """argparse ``type=``: parse a comma/space separated list of hex bytes.

    Each token may carry an optional ``0x``/``0X`` prefix, for example
    ``"00 00 0x80,DA"``.

    Raises:
        argparse.ArgumentTypeError: the list is empty, a token is not valid
            hex, or a value does not fit in one byte.
    """
    tokens = text.replace(",", " ").split()
    if not tokens:
        raise argparse.ArgumentTypeError("hex frame cannot be empty")

    values = []
    for token in tokens:
        # int(..., 16) already understands the 0x prefix. Parsing token by
        # token (instead of deleting every "0x" from the whole string) keeps
        # a malformed token such as "A0x1" from being read as "A1".
        try:
            value = int(token, 16)
        except ValueError:
            raise argparse.ArgumentTypeError(f"not a hex byte: {token}") from None
        if not 0 <= value <= 0xFF:
            raise argparse.ArgumentTypeError("hex values must be bytes")
        values.append(value)
    return bytes(values)


def _non_negative_float(text: str) -> float:
    """argparse ``type=``: parse a float that must be >= 0."""
    value = float(text)
    if not value >= 0:  # written this way so NaN is rejected as well
        raise argparse.ArgumentTypeError(f"must be >= 0: {text}")
    return value


def _positive_float(text: str) -> float:
    """argparse ``type=``: parse a float that must be > 0."""
    value = float(text)
    if not value > 0:  # written this way so NaN is rejected as well
        raise argparse.ArgumentTypeError(f"must be > 0: {text}")
    return value


def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
    """Return the five body bytes followed by their checksum byte.

    The checksum is 0x5A XOR-ed with every body byte.

    Raises:
        ValueError: an argument does not fit in one byte (from ``bytes()``).
    """
    body = bytes([prefix1, prefix2, command, state, value])
    checksum = 0x5A
    for byte in body:
        checksum ^= byte
    return body + bytes([checksum])


def hex_preview(data: bytes) -> str:
    """Format *data* as space-separated two-digit uppercase hex."""
    return " ".join(f"{byte:02X}" for byte in data)


def make_logger(path: str | None) -> tuple[Callable[[str], None], TextIO | None]:
    """Create a line logger that prints and optionally appends to a file.

    Args:
        path: log file to append to (UTF-8), or ``None`` for stdout only.

    Returns:
        ``(emit, log_file)``. ``emit(line)`` prints *line* and, when a file
        is open, writes and flushes it there too. ``log_file`` is the open
        file (the caller must close it) or ``None``.
    """
    log_file = open(path, "a", encoding="utf-8") if path else None

    def emit(line: str) -> None:
        print(line)
        if log_file:
            log_file.write(line + "\n")
            # Flush per line so the log survives an abrupt stop.
            log_file.flush()

    return emit, log_file


def heartbeat_offset(data: bytes) -> int | None:
    """Find where in the HEARTBEAT cycle *data* could have started.

    Returns:
        The smallest offset ``k`` such that every ``data[i]`` equals
        ``HEARTBEAT[(k + i) % len(HEARTBEAT)]``; 0 for empty data; ``None``
        when no offset fits.
    """
    if not data:
        return 0
    period = len(HEARTBEAT)
    for offset in range(period):
        if all(byte == HEARTBEAT[(offset + index) % period] for index, byte in enumerate(data)):
            return offset
    return None


def classify_rx(data: bytes) -> str:
    """Describe received bytes as empty, heartbeat-compatible, or other."""
    if not data:
        return "no RX bytes"
    offset = heartbeat_offset(data)
    if offset is not None:
        full, extra = divmod(len(data), len(HEARTBEAT))
        return (
            f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, "
            f"{full} frames + {extra} bytes"
        )
    return f"NON_HEARTBEAT RX: {len(data)} bytes {hex_preview(data)}"


def read_window(ser: serial.Serial, duration: float) -> bytes:
    """Collect everything read from *ser* for about *duration* seconds.

    Each ``ser.read(128)`` call is bounded by the port's own read timeout,
    so the window can overrun by up to one such timeout.
    """
    stop_at = time.monotonic() + duration
    data = bytearray()
    while time.monotonic() < stop_at:
        chunk = ser.read(128)
        if chunk:
            data.extend(chunk)
    return bytes(data)


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Send discovery query, then repeat a candidate keepalive frame."
    )
    parser.add_argument("--port", required=True, help="serial port, for example COM5")
    parser.add_argument("--baud", type=int, default=38400)
    parser.add_argument("--primer-command", type=parse_byte, default=0x00)
    parser.add_argument("--query-command", type=parse_byte, default=0xB5)
    parser.add_argument("--state", type=parse_byte, default=0x00)
    parser.add_argument("--value", type=parse_byte, default=0x80)
    parser.add_argument(
        "--keepalive-frame",
        type=parse_hex_bytes,
        help="explicit keepalive frame; default builds from --keepalive-command",
    )
    parser.add_argument("--keepalive-command", type=parse_byte, default=0x00)
    parser.add_argument("--duration", type=_non_negative_float, default=10.0)
    # A zero interval would leave the keepalive loop without any pacing.
    parser.add_argument("--interval", type=_positive_float, default=0.6)
    parser.add_argument("--settle", type=_non_negative_float, default=3.0)
    parser.add_argument("--after-query", type=_non_negative_float, default=2.0)
    # pyserial rejects a negative read timeout with ValueError.
    parser.add_argument("--timeout", type=_non_negative_float, default=0.03)
    parser.add_argument("--log", help="append log to this file")
    parser.add_argument("--prompt-screen", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    return parser.parse_args(argv)


def _stamp() -> str:
    """Return the local wall-clock time as HH:MM:SS.mmm."""
    return dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]


def _run_session(
    ser: serial.Serial,
    args: argparse.Namespace,
    primer: bytes,
    query: bytes,
    keepalive: bytes,
    emit: Callable[[str], None],
) -> None:
    """Run baseline, primer, query and the keepalive loop on an open port.

    Every RX line reuses the timestamp taken just before the matching
    transmit, i.e. it marks the start of the listen window rather than the
    moment bytes arrived.
    """
    ser.reset_input_buffer()
    emit(f"Keepalive-after-query on {ser.port} at {ser.baudrate} 8N1")
    emit(f"Primer {hex_preview(primer)}; query {hex_preview(query)}; keepalive {hex_preview(keepalive)}")
    emit(f"BASELINE {classify_rx(read_window(ser, args.settle))}")

    stamp = _stamp()
    emit(f"{stamp} TX primer {hex_preview(primer)}")
    ser.write(primer)
    ser.flush()
    emit(f"{stamp} PRIMER RX {classify_rx(read_window(ser, args.interval))}")

    stamp = _stamp()
    emit(f"{stamp} TX query {hex_preview(query)}")
    ser.write(query)
    ser.flush()
    emit(f"{stamp} QUERY RX {classify_rx(read_window(ser, args.after_query))}")

    stop_at = time.monotonic() + args.duration
    count = 0
    while time.monotonic() < stop_at:
        stamp = _stamp()
        ser.write(keepalive)
        ser.flush()
        count += 1
        emit(f"{stamp} TX keepalive {count:03d} {hex_preview(keepalive)}")
        # Listen for one interval, clipped so the last window ends at stop_at.
        remaining = max(0.0, stop_at - time.monotonic())
        rx = read_window(ser, min(args.interval, remaining))
        if rx:
            emit(f"{stamp} KEEPALIVE RX {classify_rx(rx)}")


def main(argv: list[str] | None = None) -> int:
    """Run the keepalive experiment.

    Args:
        argv: argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, dry run, or Ctrl-C; 1 on a serial
        error; 2 when pyserial is not installed.
    """
    args = parse_args(argv)
    primer = build_frame(0x00, 0x00, args.primer_command, args.state, args.value)
    query = build_frame(0x00, 0x00, args.query_command, args.state, args.value)
    keepalive = args.keepalive_frame or build_frame(
        0x00, 0x00, args.keepalive_command, args.state, args.value
    )

    if args.dry_run:
        print(f"primer: {hex_preview(primer)}")
        print(f"query: {hex_preview(query)}")
        print(f"keepalive: {hex_preview(keepalive)} every {args.interval}s for {args.duration}s")
        return 0

    if serial is None:
        print(_MISSING_PYSERIAL, file=sys.stderr)
        return 2

    emit, log_file = make_logger(args.log)
    try:
        with serial.Serial(
            port=args.port,
            baudrate=args.baud,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=args.timeout,
            write_timeout=1.0,
            rtscts=False,
            dsrdtr=False,
            xonxoff=False,
        ) as ser:
            _run_session(ser, args, primer, query, keepalive, emit)
            # The prompt runs while the port is still open, so closing the
            # port cannot change what the operator is asked to read.
            if args.prompt_screen:
                screen = input("Screen after keepalive run (blank = no change): ").strip()
                if screen:
                    emit(f"{_stamp()} SCREEN {screen}")
    except KeyboardInterrupt:
        emit("Stopped.")
        return 0
    except serial.SerialException as exc:
        print(f"Serial error: {exc}", file=sys.stderr)
        if log_file:
            # Keep a record in the log of why the run ended.
            log_file.write(f"Serial error: {exc}\n")
        return 1
    finally:
        if log_file:
            log_file.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())