More tests

This commit is contained in:
Aiden
2026-05-13 14:45:34 +10:00
parent 769e47ed67
commit 9bcc75c2e9
9 changed files with 1253 additions and 0 deletions

View File

@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""Send discovery query, then repeat a candidate host keepalive frame.
This tests the hypothesis that the RCP enters CONNECT NOT ACT because it sees
some host traffic but does not receive the expected ongoing CCU heartbeat.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_hex_bytes(text: str) -> bytes:
parts = text.replace(",", " ").replace("0x", "").replace("0X", "").split()
values = [int(part, 16) for part in parts]
if not values:
raise argparse.ArgumentTypeError("hex frame cannot be empty")
if any(value < 0 or value > 0xFF for value in values):
raise argparse.ArgumentTypeError("hex values must be bytes")
return bytes(values)
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def heartbeat_offset(data: bytes) -> int | None:
if not data:
return 0
for offset in range(len(HEARTBEAT)):
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
return offset
return None
def classify_rx(data: bytes) -> str:
if not data:
return "no RX bytes"
offset = heartbeat_offset(data)
if offset is not None:
full = len(data) // len(HEARTBEAT)
extra = len(data) % len(HEARTBEAT)
return f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
return f"NON_HEARTBEAT RX: {len(data)} bytes {hex_preview(data)}"
def read_window(ser: serial.Serial, duration: float) -> bytes:
stop_at = time.monotonic() + duration
data = bytearray()
while time.monotonic() < stop_at:
chunk = ser.read(128)
if chunk:
data.extend(chunk)
return bytes(data)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Send discovery query, then repeat a candidate keepalive frame."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--primer-command", type=parse_byte, default=0x00)
parser.add_argument("--query-command", type=parse_byte, default=0xB5)
parser.add_argument("--state", type=parse_byte, default=0x00)
parser.add_argument("--value", type=parse_byte, default=0x80)
parser.add_argument(
"--keepalive-frame",
type=parse_hex_bytes,
help="explicit keepalive frame; default builds from --keepalive-command",
)
parser.add_argument("--keepalive-command", type=parse_byte, default=0x00)
parser.add_argument("--duration", type=float, default=10.0)
parser.add_argument("--interval", type=float, default=0.6)
parser.add_argument("--settle", type=float, default=3.0)
parser.add_argument("--after-query", type=float, default=2.0)
parser.add_argument("--timeout", type=float, default=0.03)
parser.add_argument("--log", help="append log to this file")
parser.add_argument("--prompt-screen", action="store_true")
parser.add_argument("--dry-run", action="store_true")
return parser.parse_args()
def main() -> int:
args = parse_args()
primer = build_frame(0x00, 0x00, args.primer_command, args.state, args.value)
query = build_frame(0x00, 0x00, args.query_command, args.state, args.value)
keepalive = args.keepalive_frame or build_frame(0x00, 0x00, args.keepalive_command, args.state, args.value)
if args.dry_run:
print(f"primer: {hex_preview(primer)}")
print(f"query: {hex_preview(query)}")
print(f"keepalive: {hex_preview(keepalive)} every {args.interval}s for {args.duration}s")
return 0
emit, log_file = make_logger(args.log)
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
emit(f"Keepalive-after-query on {ser.port} at {ser.baudrate} 8N1")
emit(f"Primer {hex_preview(primer)}; query {hex_preview(query)}; keepalive {hex_preview(keepalive)}")
emit(f"BASELINE {classify_rx(read_window(ser, args.settle))}")
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX primer {hex_preview(primer)}")
ser.write(primer)
ser.flush()
emit(f"{stamp} PRIMER RX {classify_rx(read_window(ser, args.interval))}")
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX query {hex_preview(query)}")
ser.write(query)
ser.flush()
emit(f"{stamp} QUERY RX {classify_rx(read_window(ser, args.after_query))}")
stop_at = time.monotonic() + args.duration
count = 0
while time.monotonic() < stop_at:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
ser.write(keepalive)
ser.flush()
count += 1
emit(f"{stamp} TX keepalive {count:03d} {hex_preview(keepalive)}")
rx = read_window(ser, min(args.interval, max(0.0, stop_at - time.monotonic())))
if rx:
emit(f"{stamp} KEEPALIVE RX {classify_rx(rx)}")
if args.prompt_screen:
screen = input("Screen after keepalive run (blank = no change): ").strip()
if screen:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} SCREEN {screen}")
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())