run 3 and 4 and 5

This commit is contained in:
Aiden
2026-05-13 16:15:22 +10:00
parent 659ad33c48
commit 7f0ec19798
12 changed files with 2102 additions and 0 deletions

View File

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
"""Button-focused RCP-TX7 serial tests.
This helper can:
1. Optionally put the RCP into a latched state with a known primer/query.
2. Listen for the button frames that are known to appear while disconnected.
3. Optionally transmit response frames when CAM POWER or CALL is observed.
Known RCP-origin button frames:
CAM POWER: 00 00 07 80 00 DD
CALL on: 00 00 15 80 00 CF
CALL off: 00 00 15 00 00 4F
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
CAM_POWER = bytes.fromhex("00 00 07 80 00 DD")
CALL_ON = bytes.fromhex("00 00 15 80 00 CF")
CALL_OFF = bytes.fromhex("00 00 15 00 00 4F")
KNOWN_PATTERNS = {
"heartbeat": HEARTBEAT,
"cam-power": CAM_POWER,
"call-on": CALL_ON,
"call-off": CALL_OFF,
}
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_hex_bytes(text: str) -> bytes:
normalized = text.replace(",", " ").replace("0x", "").replace("0X", "")
parts = normalized.split()
if not parts:
raise argparse.ArgumentTypeError("hex frame cannot be empty")
try:
values = [int(part, 16) for part in parts]
except ValueError as exc:
raise argparse.ArgumentTypeError(f"invalid hex byte list: {text}") from exc
if any(value < 0 or value > 0xFF for value in values):
raise argparse.ArgumentTypeError("hex values must be bytes")
return bytes(values)
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def ascii_preview(data: bytes) -> str:
return "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def read_window(ser: serial.Serial, duration: float) -> bytes:
stop_at = time.monotonic() + duration
data = bytearray()
while time.monotonic() < stop_at:
chunk = ser.read(128)
if chunk:
data.extend(chunk)
return bytes(data)
def send_frame(ser: serial.Serial, emit, label: str, frame: bytes) -> None:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX {label} frame {len(frame):03d} {hex_preview(frame)}")
ser.write(frame)
ser.flush()
def emit_known_counts(emit, label: str, data: bytes) -> None:
if not data:
emit(f"{label} no RX bytes")
return
counts = {
name: data.count(pattern)
for name, pattern in KNOWN_PATTERNS.items()
if data.count(pattern)
}
count_text = ", ".join(f"{name}={count}" for name, count in counts.items()) or "no known complete frames"
emit(f"{label} RX {len(data)} bytes; {count_text}")
if counts.get("cam-power") or counts.get("call-on") or counts.get("call-off"):
emit(f"{label} raw {hex_preview(data)}")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Listen for CAM POWER/CALL frames and optionally respond to CAM POWER."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--timeout", type=float, default=0.03)
parser.add_argument("--duration", type=float, default=30.0)
parser.add_argument("--chunk-size", type=int, default=128)
parser.add_argument("--log", help="append capture/transmit log to this file")
parser.add_argument("--ascii", action="store_true")
parser.add_argument("--latch", action="store_true", help="send latch primer/query before listening")
parser.add_argument("--latch-primer-command", type=parse_byte, default=0x00)
parser.add_argument("--latch-query-command", type=parse_byte, default=0xB5)
parser.add_argument("--state", type=parse_byte, default=0x00)
parser.add_argument("--value", type=parse_byte, default=0x80)
parser.add_argument("--after-latch", type=float, default=1.0)
parser.add_argument("--respond-to-cam-power", action="store_true")
parser.add_argument("--respond-to-call", action="store_true")
parser.add_argument(
"--response-frame",
type=parse_hex_bytes,
action="append",
help="hex frame to send when CAM POWER is seen; can be repeated",
)
parser.add_argument("--response-delay", type=float, default=0.05)
parser.add_argument("--response-repeat", type=int, default=1)
parser.add_argument("--response-interval", type=float, default=0.2)
parser.add_argument(
"--respond-once",
action="store_true",
help="only transmit the response on the first matched button frame",
)
parser.add_argument("--prompt", action="store_true", help="pause before listen so you can prepare button presses")
return parser.parse_args()
def main() -> int:
args = parse_args()
emit, log_file = make_logger(args.log)
response_frames = args.response_frame or [CAM_POWER]
primer = build_frame(0x00, 0x00, args.latch_primer_command, args.state, args.value)
query = build_frame(0x00, 0x00, args.latch_query_command, args.state, args.value)
responded = False
totals = {name: 0 for name in KNOWN_PATTERNS}
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
emit(f"Button test on {ser.port} at {ser.baudrate} 8N1")
if args.latch:
send_frame(ser, emit, "latch primer", primer)
emit_known_counts(emit, "LATCH PRIMER", read_window(ser, args.after_latch))
send_frame(ser, emit, "latch query", query)
emit_known_counts(emit, "LATCH QUERY", read_window(ser, args.after_latch))
if args.prompt:
input("Ready to listen. Press Enter, then press CAM POWER/CALL on the RCP: ")
emit(
f"Listening for {args.duration:.1f}s; "
f"respond_to_cam_power={args.respond_to_cam_power}, "
f"respond_to_call={args.respond_to_call}"
)
stop_at = time.monotonic() + args.duration
buffer = bytearray()
while time.monotonic() < stop_at:
data = ser.read(args.chunk_size)
if not data:
continue
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} RX {len(data):03d} bytes {hex_preview(data)}")
if args.ascii:
emit(f"{'':14} ASCII {ascii_preview(data)}")
buffer.extend(data)
for name, pattern in KNOWN_PATTERNS.items():
count = data.count(pattern)
if count:
totals[name] += count
emit(f"{stamp} DETECT {name} x{count}")
should_respond = (
(
(args.respond_to_cam_power and CAM_POWER in buffer)
or (args.respond_to_call and (CALL_ON in buffer or CALL_OFF in buffer))
)
and not (args.respond_once and responded)
)
if should_respond:
responded = True
time.sleep(args.response_delay)
for _ in range(args.response_repeat):
for response in response_frames:
send_frame(ser, emit, "button response", response)
if args.response_repeat > 1:
time.sleep(args.response_interval)
buffer.clear()
elif len(buffer) > 256:
del buffer[:-64]
emit(
"Totals: "
+ ", ".join(f"{name}={count}" for name, count in totals.items())
)
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""Sweep candidate commands that may unlatch the RCP-TX7 response state.
Each candidate test performs:
latch primer -> latch query -> candidate -> verify primer -> verify query
The verify query checks whether the RCP will answer again without a power cycle.
Use --prompt-power-cycle for clean bench testing.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_byte_set(text: str) -> list[int]:
values: list[int] = []
for part in text.replace(",", " ").split():
if "-" in part:
start_text, end_text = part.split("-", 1)
start = parse_byte(start_text)
end = parse_byte(end_text)
if end < start:
raise argparse.ArgumentTypeError(f"bad range: {part}")
values.extend(range(start, end + 1))
else:
values.append(parse_byte(part))
if not values:
raise argparse.ArgumentTypeError("empty byte set")
return values
def parse_frame(text: str) -> bytes:
values = parse_byte_set(text)
if len(values) < 1:
raise argparse.ArgumentTypeError("empty frame")
return bytes(values)
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def heartbeat_offset(data: bytes) -> int | None:
if not data:
return 0
for offset in range(len(HEARTBEAT)):
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
return offset
return None
def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
for index, byte in enumerate(data):
expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
if byte != expected:
return index, byte, expected
return None
def classify_rx(data: bytes) -> tuple[bool, str]:
if not data:
return False, "no RX bytes"
offset = heartbeat_offset(data)
if offset is not None:
full = len(data) // len(HEARTBEAT)
extra = len(data) % len(HEARTBEAT)
return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
best_offset = min(
range(len(HEARTBEAT)),
key=lambda candidate: sum(
byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
for index, byte in enumerate(data)
),
)
mismatch = first_mismatch(data, best_offset)
if mismatch is None:
return False, "heartbeat-compatible RX"
index, byte, expected = mismatch
return (
True,
f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
)
def read_window(ser: serial.Serial, duration: float) -> bytes:
stop_at = time.monotonic() + duration
data = bytearray()
while time.monotonic() < stop_at:
chunk = ser.read(128)
if chunk:
data.extend(chunk)
return bytes(data)
def emit_rx(emit, label: str, data: bytes) -> bool:
is_anomaly, note = classify_rx(data)
emit(f"{label} {note}")
if is_anomaly:
emit(f"{label} raw {hex_preview(data)}")
return is_anomaly
def contains_frame(data: bytes, expected: bytes) -> bool:
return bool(expected) and expected in data
def send_and_read(ser: serial.Serial, emit, label: str, frame: bytes, duration: float) -> tuple[bool, bytes]:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX {label} frame {len(frame):03d} {hex_preview(frame)}")
ser.write(frame)
ser.flush()
data = read_window(ser, duration)
is_anomaly = emit_rx(emit, f"{stamp} {label.upper()} RX", data)
return is_anomaly, data
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Sweep candidate unlatch commands and verify whether a second query responds."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--prefix1", type=parse_byte, default=0x00)
parser.add_argument("--prefix2", type=parse_byte, default=0x00)
parser.add_argument("--latch-primer-command", type=parse_byte, default=0x00)
parser.add_argument("--latch-query-command", type=parse_byte, default=0xB5)
parser.add_argument("--verify-primer-command", type=parse_byte, default=0x00)
parser.add_argument("--verify-query-command", type=parse_byte, default=0xB5)
parser.add_argument("--state", type=parse_byte, default=0x00)
parser.add_argument("--value", type=parse_byte, default=0x80)
parser.add_argument("--candidates", type=parse_byte_set, required=True)
parser.add_argument("--settle", type=float, default=3.0)
parser.add_argument("--between", type=float, default=0.8)
parser.add_argument("--after-candidate", type=float, default=1.2)
parser.add_argument("--after-verify", type=float, default=1.5)
parser.add_argument("--timeout", type=float, default=0.03)
parser.add_argument("--log", help="append sweep log to this file")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--prompt-power-cycle", action="store_true")
parser.add_argument("--prompt-screen", action="store_true")
parser.add_argument(
"--expected-verify-response",
type=parse_frame,
help="only count a verify hit when these bytes appear in verify-query RX",
)
parser.add_argument("--stop-on-verify-response", action="store_true")
return parser.parse_args()
def main() -> int:
args = parse_args()
latch_primer = build_frame(args.prefix1, args.prefix2, args.latch_primer_command, args.state, args.value)
latch_query = build_frame(args.prefix1, args.prefix2, args.latch_query_command, args.state, args.value)
verify_primer = build_frame(args.prefix1, args.prefix2, args.verify_primer_command, args.state, args.value)
verify_query = build_frame(args.prefix1, args.prefix2, args.verify_query_command, args.state, args.value)
candidates = [
(command, build_frame(args.prefix1, args.prefix2, command, args.state, args.value))
for command in args.candidates
]
if args.dry_run:
print(f"latch primer 0x{args.latch_primer_command:02X}: {hex_preview(latch_primer)}")
print(f"latch query 0x{args.latch_query_command:02X}: {hex_preview(latch_query)}")
print(f"verify primer 0x{args.verify_primer_command:02X}: {hex_preview(verify_primer)}")
print(f"verify query 0x{args.verify_query_command:02X}: {hex_preview(verify_query)}")
for command, frame in candidates:
print(f"candidate 0x{command:02X}: {hex_preview(frame)}")
return 0
emit, log_file = make_logger(args.log)
verify_hits = 0
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
emit(
f"Unlatch sweep: latch {hex_preview(latch_primer)} -> {hex_preview(latch_query)}, "
f"verify {hex_preview(verify_primer)} -> {hex_preview(verify_query)}, "
f"{len(candidates)} candidates on {ser.port} at {ser.baudrate} 8N1"
)
if args.expected_verify_response:
emit(f"Expected verify response: {hex_preview(args.expected_verify_response)}")
for index, (command, candidate) in enumerate(candidates, start=1):
if args.prompt_power_cycle:
answer = input(
f"Power-cycle RCP for unlatch candidate 0x{command:02X}, "
"wait for heartbeat, then press Enter (q then Enter to stop): "
).strip()
if answer.lower() in {"q", "quit", "stop"}:
emit("Stopped before next candidate.")
break
ser.reset_input_buffer()
emit_rx(emit, f"CANDIDATE 0x{command:02X} BASELINE", read_window(ser, args.settle))
send_and_read(ser, emit, "latch primer", latch_primer, args.between)
send_and_read(ser, emit, "latch query", latch_query, args.between)
send_and_read(ser, emit, f"candidate 0x{command:02X}", candidate, args.after_candidate)
if args.prompt_screen:
screen = input(
f"Screen after candidate 0x{command:02X} "
"(blank = no change, q = stop): "
).strip()
if screen:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} SCREEN candidate=0x{command:02X} {screen}")
if screen.lower() in {"q", "quit", "stop"}:
break
send_and_read(ser, emit, "verify primer", verify_primer, args.between)
verify_anomaly, verify_data = send_and_read(ser, emit, "verify query", verify_query, args.after_verify)
if args.expected_verify_response:
verify_hit = contains_frame(verify_data, args.expected_verify_response)
if verify_anomaly and not verify_hit:
emit(
f"VERIFY anomaly did not contain expected response "
f"{hex_preview(args.expected_verify_response)}"
)
else:
verify_hit = verify_anomaly
if verify_hit:
verify_hits += 1
emit(f"VERIFY RESPONSE after candidate 0x{command:02X}")
if args.stop_on_verify_response:
emit("Stopping after verify response.")
break
emit(f"Completed candidate {index}/{len(candidates)}")
emit(f"Verify responses: {verify_hits}")
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())