Response from RCP

This commit is contained in:
Aiden
2026-05-13 13:10:25 +10:00
parent f99a60710e
commit f406bc12a2
21 changed files with 2073 additions and 4 deletions

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Summarize fixed-size hex frames from serial_sniff.py logs."""
"""Summarize fixed-size hex frames from serial helper logs."""
from __future__ import annotations
@@ -9,7 +9,11 @@ import re
from pathlib import Path
FRAME_RE = re.compile(r"\b(?:(RX|TX)\s+)?frame\s+\d+\s+((?:[0-9A-Fa-f]{2}\s*)+)$")
FRAME_RE = re.compile(
r"\b(?P<direction>RX|TX)(?:\s+cmd\s+0x[0-9A-Fa-f]{2})?"
r"(?:\s+fields\b.*?)?\s+"
r"frame\s+\d+\s+(?P<frame>(?:[0-9A-Fa-f]{2}\s*)+)(?:\s+\S.*)?$"
)
def frames_from_log(path: Path) -> list[tuple[str, str]]:
@@ -17,8 +21,8 @@ def frames_from_log(path: Path) -> list[tuple[str, str]]:
for line in path.read_text(encoding="utf-8").splitlines():
match = FRAME_RE.search(line.strip())
if match:
direction = match.group(1) or "RX"
frame = " ".join(match.group(2).upper().split())
direction = match.group("direction")
frame = " ".join(match.group("frame").upper().split())
frames.append((direction, frame))
return frames

View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""Sweep RCP-TX7 host frames and flag raw RX bytes beyond heartbeat.
This helper is for unattended response hunting. It sends checksum-valid 6-byte
candidate frames and reads the RCP-to-host line as raw bytes. Instead of forcing
the incoming stream into fixed 6-byte frames, it checks whether the received
bytes can be explained as a contiguous slice of the known repeated heartbeat:
00 00 00 00 80 DA
That avoids false positives such as:
00 00 00 80 DA 00
which is just the normal heartbeat viewed one byte late.
"""
from __future__ import annotations
import argparse
import datetime as dt
import itertools
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_byte_set(text: str) -> list[int]:
values: list[int] = []
for part in text.replace(",", " ").split():
if "-" in part:
start_text, end_text = part.split("-", 1)
start = parse_byte(start_text)
end = parse_byte(end_text)
if end < start:
raise argparse.ArgumentTypeError(f"bad range: {part}")
values.extend(range(start, end + 1))
else:
values.append(parse_byte(part))
if not values:
raise argparse.ArgumentTypeError("empty byte set")
return list(dict.fromkeys(values))
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def heartbeat_offset(data: bytes) -> int | None:
if not data:
return 0
for offset in range(len(HEARTBEAT)):
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
return offset
return None
def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
for index, byte in enumerate(data):
expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
if byte != expected:
return index, byte, expected
return None
def classify_rx(data: bytes) -> tuple[bool, str]:
if not data:
return False, "no RX bytes"
offset = heartbeat_offset(data)
if offset is not None:
full = len(data) // len(HEARTBEAT)
extra = len(data) % len(HEARTBEAT)
return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
best_offset = min(
range(len(HEARTBEAT)),
key=lambda candidate: sum(
byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
for index, byte in enumerate(data)
),
)
mismatch = first_mismatch(data, best_offset)
if mismatch is None:
return False, "heartbeat-compatible RX"
index, byte, expected = mismatch
return (
True,
f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
)
def read_window(ser: serial.Serial, duration: float) -> bytes:
stop_at = time.monotonic() + duration
data = bytearray()
while time.monotonic() < stop_at:
chunk = ser.read(128)
if chunk:
data.extend(chunk)
return bytes(data)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Sweep candidate host frames and stop/log on non-heartbeat raw RX."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--prefix1s", type=parse_byte_set, default=[0x00])
parser.add_argument("--prefix2s", type=parse_byte_set, default=[0x00])
parser.add_argument("--commands", type=parse_byte_set, default=[0x00])
parser.add_argument("--states", type=parse_byte_set, default=[0x00])
parser.add_argument("--values", type=parse_byte_set, default=[0x80])
parser.add_argument("--settle", type=float, default=1.5)
parser.add_argument("--after-each", type=float, default=1.0)
parser.add_argument("--after", type=float, default=2.0)
parser.add_argument("--timeout", type=float, default=0.03)
parser.add_argument("--log", help="append sweep log to this file")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--stop-on-anomaly", action="store_true")
parser.add_argument("--verbose-heartbeat", action="store_true")
parser.add_argument("--cycles", type=int, default=1)
parser.add_argument("--cycle-pause", type=float, default=0.0)
parser.add_argument(
"--max-frames",
type=int,
default=512,
help="safety limit for generated frames",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
if args.cycles < 1:
raise SystemExit("--cycles must be >= 1")
rows = list(
itertools.product(
args.prefix1s,
args.prefix2s,
args.commands,
args.states,
args.values,
)
)
if len(rows) > args.max_frames:
raise SystemExit(
f"Refusing to send {len(rows)} frames; raise --max-frames if intentional"
)
frames = [
(prefix1, prefix2, command, state, value, build_frame(prefix1, prefix2, command, state, value))
for prefix1, prefix2, command, state, value in rows
]
if args.dry_run:
for prefix1, prefix2, command, state, value, frame in frames:
print(
f"p1=0x{prefix1:02X} p2=0x{prefix2:02X} cmd=0x{command:02X} "
f"state=0x{state:02X} value=0x{value:02X}: {hex_preview(frame)}"
)
return 0
emit, log_file = make_logger(args.log)
anomaly_count = 0
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
total_frames = len(frames) * args.cycles
emit(
f"Direct response sweep: {len(frames)} frames x {args.cycles} cycles "
f"({total_frames} total) on {ser.port} at {ser.baudrate} 8N1"
)
baseline = read_window(ser, args.settle)
baseline_anomaly, baseline_note = classify_rx(baseline)
emit(f"BASELINE {baseline_note}")
if baseline_anomaly:
emit(f"BASELINE raw {hex_preview(baseline)}")
should_stop = False
for cycle in range(1, args.cycles + 1):
if args.cycles > 1:
emit(f"CYCLE {cycle}/{args.cycles}")
for prefix1, prefix2, command, state, value, frame in frames:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(
f"{stamp} TX cycle={cycle} p1=0x{prefix1:02X} p2=0x{prefix2:02X} "
f"cmd=0x{command:02X} state=0x{state:02X} value=0x{value:02X} "
f"frame {len(frame):03d} {hex_preview(frame)}"
)
ser.write(frame)
ser.flush()
rx = read_window(ser, args.after_each)
is_anomaly, note = classify_rx(rx)
if is_anomaly:
anomaly_count += 1
emit(f"{stamp} {note}")
emit(f"{stamp} RX raw {hex_preview(rx)}")
if args.stop_on_anomaly:
emit("Stopping after anomaly.")
should_stop = True
break
elif args.verbose_heartbeat:
emit(f"{stamp} {note}")
if should_stop:
break
if cycle < args.cycles and args.cycle_pause > 0:
pause_rx = read_window(ser, args.cycle_pause)
pause_anomaly, pause_note = classify_rx(pause_rx)
emit(f"CYCLE {cycle} PAUSE {pause_note}")
if pause_anomaly:
anomaly_count += 1
emit(f"CYCLE {cycle} PAUSE raw {hex_preview(pause_rx)}")
if args.stop_on_anomaly:
emit("Stopping after anomaly.")
break
final_rx = read_window(ser, args.after)
final_anomaly, final_note = classify_rx(final_rx)
emit(f"FINAL {final_note}")
if final_anomaly:
emit(f"FINAL raw {hex_preview(final_rx)}")
emit(f"Anomalies: {anomaly_count}")
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""Targeted RCP-TX7 host-frame field probe.
This is for the phase after a command sweep finds parser-visible commands. It
keeps the observed 6-byte frame shape:
prefix1 prefix2 command state value checksum
and tries a small matrix of selected field values with the checksum hypothesis:
checksum = 0x5A xor prefix1 xor prefix2 xor command xor state xor value
Use narrow ranges and reset the RCP between screen-triggering attempts when you
need independent observations.
"""
from __future__ import annotations
import argparse
import datetime as dt
import itertools
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_byte_set(text: str) -> list[int]:
values: list[int] = []
for part in text.replace(",", " ").split():
if "-" in part:
start_text, end_text = part.split("-", 1)
start = parse_byte(start_text)
end = parse_byte(end_text)
if end < start:
raise argparse.ArgumentTypeError(f"bad range: {part}")
values.extend(range(start, end + 1))
else:
values.append(parse_byte(part))
if not values:
raise argparse.ArgumentTypeError("empty byte set")
deduped = list(dict.fromkeys(values))
return deduped
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def drain_rx(ser: serial.Serial, emit, until: float, frame_size: int) -> int:
buffer = bytearray()
interesting = 0
while time.monotonic() < until:
data = ser.read(64)
if not data:
continue
buffer.extend(data)
while len(buffer) >= frame_size:
frame = bytes(buffer[:frame_size])
del buffer[:frame_size]
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
marker = "" if frame == HEARTBEAT else " NON_HEARTBEAT"
emit(f"{stamp} RX frame {frame_size:03d} {hex_preview(frame)}{marker}")
if frame != HEARTBEAT:
interesting += 1
return interesting
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Probe selected RCP-TX7 frame fields and log screen observations."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--prefix1s", type=parse_byte_set, default=[0x00])
parser.add_argument("--prefix2s", type=parse_byte_set, default=[0x00])
parser.add_argument("--commands", type=parse_byte_set, default=[0x15])
parser.add_argument("--states", type=parse_byte_set, default=[0x00, 0x80])
parser.add_argument("--values", type=parse_byte_set, default=[0x00, 0x80])
parser.add_argument("--settle", type=float, default=1.5)
parser.add_argument("--after-each", type=float, default=1.0)
parser.add_argument("--after", type=float, default=3.0)
parser.add_argument("--frame-size", type=int, default=6)
parser.add_argument("--log", help="append capture/transmit log to this file")
parser.add_argument("--prompt-screen", action="store_true")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--max-frames",
type=int,
default=64,
help="safety limit for generated frames",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
rows = list(
itertools.product(
args.prefix1s,
args.prefix2s,
args.commands,
args.states,
args.values,
)
)
if len(rows) > args.max_frames:
raise SystemExit(
f"Refusing to send {len(rows)} frames; raise --max-frames if intentional"
)
frames = [
(prefix1, prefix2, command, state, value, build_frame(prefix1, prefix2, command, state, value))
for prefix1, prefix2, command, state, value in rows
]
if args.dry_run:
for prefix1, prefix2, command, state, value, frame in frames:
print(
f"p1=0x{prefix1:02X} p2=0x{prefix2:02X} cmd=0x{command:02X} "
f"state=0x{state:02X} value=0x{value:02X}: {hex_preview(frame)}"
)
return 0
emit, log_file = make_logger(args.log)
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.05,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
ser.reset_input_buffer()
emit(f"Probing {len(frames)} field combinations on {ser.port} at {ser.baudrate} 8N1")
drain_rx(ser, emit, time.monotonic() + args.settle, args.frame_size)
for prefix1, prefix2, command, state, value, frame in frames:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(
f"{stamp} TX fields p1=0x{prefix1:02X} p2=0x{prefix2:02X} "
f"cmd=0x{command:02X} state=0x{state:02X} value=0x{value:02X} "
f"frame {len(frame):03d} {hex_preview(frame)}"
)
ser.write(frame)
ser.flush()
drain_rx(ser, emit, time.monotonic() + args.after_each, args.frame_size)
if args.prompt_screen:
screen = input(
"Screen after "
f"cmd 0x{command:02X} state 0x{state:02X} value 0x{value:02X} "
"(blank = no change, q = stop): "
).strip()
if screen:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(
f"{stamp} SCREEN p1=0x{prefix1:02X} p2=0x{prefix2:02X} "
f"cmd=0x{command:02X} state=0x{state:02X} "
f"value=0x{value:02X} {screen}"
)
if screen.lower() in {"q", "quit", "stop"}:
break
drain_rx(ser, emit, time.monotonic() + args.after, args.frame_size)
except KeyboardInterrupt:
print("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())