This commit is contained in:
Aiden
2026-05-13 16:24:13 +10:00
parent 7f0ec19798
commit 9d76820ef6
6 changed files with 365 additions and 1 deletions

View File

@@ -6,6 +6,7 @@ This helper can:
1. Optionally put the RCP into a latched state with a known primer/query.
2. Listen for the button frames that are known to appear while disconnected.
3. Optionally transmit response frames when CAM POWER or CALL is observed.
4. Optionally mirror CALL high/low events with matching host responses.
Known RCP-origin button frames:
@@ -144,12 +145,23 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--after-latch", type=float, default=1.0)
parser.add_argument("--respond-to-cam-power", action="store_true")
parser.add_argument("--respond-to-call", action="store_true")
parser.add_argument(
"--mirror-call",
action="store_true",
help="respond to CALL high with CALL high and CALL low with CALL low",
)
parser.add_argument(
"--response-frame",
type=parse_hex_bytes,
action="append",
help="hex frame to send when CAM POWER is seen; can be repeated",
)
parser.add_argument(
"--watch-frame",
type=parse_hex_bytes,
action="append",
help="hex frame to count when it appears in RX; can be repeated",
)
parser.add_argument("--response-delay", type=float, default=0.05)
parser.add_argument("--response-repeat", type=int, default=1)
parser.add_argument("--response-interval", type=float, default=0.2)
@@ -158,6 +170,11 @@ def parse_args() -> argparse.Namespace:
action="store_true",
help="only transmit the response on the first matched button frame",
)
parser.add_argument(
"--mirror-call-once-per-state",
action="store_true",
help="with --mirror-call, respond once to CALL high and once to CALL low",
)
parser.add_argument("--prompt", action="store_true", help="pause before listen so you can prepare button presses")
return parser.parse_args()
@@ -166,10 +183,14 @@ def main() -> int:
args = parse_args()
emit, log_file = make_logger(args.log)
response_frames = args.response_frame or [CAM_POWER]
watch_frames = args.watch_frame or []
primer = build_frame(0x00, 0x00, args.latch_primer_command, args.state, args.value)
query = build_frame(0x00, 0x00, args.latch_query_command, args.state, args.value)
responded = False
mirrored_call_on = False
mirrored_call_off = False
totals = {name: 0 for name in KNOWN_PATTERNS}
watch_totals = {hex_preview(frame): 0 for frame in watch_frames}
try:
with serial.Serial(
@@ -195,11 +216,13 @@ def main() -> int:
if args.prompt:
input("Ready to listen. Press Enter, then press CAM POWER/CALL on the RCP: ")
ser.reset_input_buffer()
emit(
f"Listening for {args.duration:.1f}s; "
f"respond_to_cam_power={args.respond_to_cam_power}, "
f"respond_to_call={args.respond_to_call}"
f"respond_to_call={args.respond_to_call}, "
f"mirror_call={args.mirror_call}"
)
stop_at = time.monotonic() + args.duration
buffer = bytearray()
@@ -219,6 +242,32 @@ def main() -> int:
if count:
totals[name] += count
emit(f"{stamp} DETECT {name} x{count}")
for frame in watch_frames:
count = data.count(frame)
if count:
key = hex_preview(frame)
watch_totals[key] += count
emit(f"{stamp} DETECT watch-frame {key} x{count}")
if args.mirror_call:
mirrored = False
if CALL_ON in buffer and not (
args.mirror_call_once_per_state and mirrored_call_on
):
send_frame(ser, emit, "CALL high mirror", CALL_ON)
mirrored_call_on = True
responded = True
mirrored = True
if CALL_OFF in buffer and not (
args.mirror_call_once_per_state and mirrored_call_off
):
send_frame(ser, emit, "CALL low mirror", CALL_OFF)
mirrored_call_off = True
responded = True
mirrored = True
if mirrored:
buffer.clear()
continue
should_respond = (
(
@@ -243,6 +292,11 @@ def main() -> int:
"Totals: "
+ ", ".join(f"{name}={count}" for name, count in totals.items())
)
if watch_totals:
emit(
"Watch totals: "
+ ", ".join(f"{frame}={count}" for frame, count in watch_totals.items())
)
except KeyboardInterrupt:
emit("Stopped.")
return 0