new discovery phase

This commit is contained in:
Aiden
2026-05-13 13:57:19 +10:00
parent 5e736bf395
commit 769e47ed67
11 changed files with 970 additions and 1 deletions

View File

@@ -0,0 +1,11 @@
Direct response sweep: 5 frames x 1 cycles (5 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 24 bytes, offset 0, 4 frames + 0 bytes
13:46:41.102 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:46:41.929 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB2 state=0x00 value=0x80 frame 006 00 00 B2 00 80 68
13:46:41.929 ANOMALY 24 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:46:41.929 RX raw 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7
13:46:42.752 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB3 state=0x00 value=0x80 frame 006 00 00 B3 00 80 69
13:46:43.577 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB4 state=0x00 value=0x80 frame 006 00 00 B4 00 80 6E
13:46:44.400 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
FINAL heartbeat-compatible RX: 30 bytes, offset 0, 5 frames + 0 bytes
Anomalies: 1

View File

@@ -0,0 +1,10 @@
Direct response sweep: 4 frames x 1 cycles (4 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 24 bytes, offset 0, 4 frames + 0 bytes
13:51:07.753 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:51:08.574 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
13:51:08.574 ANOMALY 24 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:51:08.574 RX raw 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48
13:51:09.396 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
13:51:10.220 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
FINAL heartbeat-compatible RX: 30 bytes, offset 0, 5 frames + 0 bytes
Anomalies: 1

View File

@@ -0,0 +1,9 @@
Direct response sweep: 2 frames x 1 cycles (2 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 24 bytes, offset 0, 4 frames + 0 bytes
13:47:32.862 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:47:33.688 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
13:47:33.688 ANOMALY 24 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:47:33.688 RX raw 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48
FINAL ANOMALY 36 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
FINAL raw 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 00 00 00 00 80 DA
Anomalies: 1

View File

@@ -0,0 +1,12 @@
Direct response sweep: 6 frames x 1 cycles (6 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 30 bytes, offset 0, 5 frames + 0 bytes
13:51:31.244 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:51:32.067 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
13:51:32.067 ANOMALY 24 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:51:32.067 RX raw 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48
13:51:32.893 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:51:33.720 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
13:51:34.544 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:51:35.366 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
FINAL heartbeat-compatible RX: 30 bytes, offset 0, 5 frames + 0 bytes
Anomalies: 1

View File

@@ -0,0 +1,9 @@
Direct response sweep: 2 frames x 1 cycles (2 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 24 bytes, offset 0, 4 frames + 0 bytes
13:47:48.759 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:47:49.581 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
13:47:49.581 ANOMALY 24 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:47:49.581 RX raw 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48
FINAL ANOMALY 30 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 0 expected 00
FINAL raw 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 07 80 6D 20 D8 48 00 00 00 00 80 DA
Anomalies: 1

View File

@@ -0,0 +1,14 @@
Direct response sweep: 8 frames x 1 cycles (8 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 24 bytes, offset 0, 4 frames + 0 bytes
13:50:40.514 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:50:41.338 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB2 state=0x00 value=0x80 frame 006 00 00 B2 00 80 68
13:50:41.338 ANOMALY 30 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:50:41.338 RX raw 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7
13:50:42.161 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:50:42.989 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB3 state=0x00 value=0x80 frame 006 00 00 B3 00 80 69
13:50:43.813 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:50:44.638 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB4 state=0x00 value=0x80 frame 006 00 00 B4 00 80 6E
13:50:45.458 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:50:46.280 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
FINAL heartbeat-compatible RX: 30 bytes, offset 0, 5 frames + 0 bytes
Anomalies: 1

View File

@@ -0,0 +1,11 @@
Direct response sweep: 5 frames x 1 cycles (5 total) on COM5 at 38400 8N1
BASELINE heartbeat-compatible RX: 24 bytes, offset 0, 4 frames + 0 bytes
13:47:04.045 TX cycle=1 p1=0x00 p2=0x00 cmd=0x00 state=0x00 value=0x80 frame 006 00 00 00 00 80 DA
13:47:04.870 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB2 state=0x00 value=0x80 frame 006 00 00 B2 00 80 68
13:47:04.870 ANOMALY 30 RX bytes; first mismatch at byte 0: got 07, heartbeat offset 3 expected 00
13:47:04.870 RX raw 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7 07 80 36 10 0C F7
13:47:05.694 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB3 state=0x00 value=0x80 frame 006 00 00 B3 00 80 69
13:47:06.518 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB4 state=0x00 value=0x80 frame 006 00 00 B4 00 80 6E
13:47:07.341 TX cycle=1 p1=0x00 p2=0x00 cmd=0xB5 state=0x00 value=0x80 frame 006 00 00 B5 00 80 6F
FINAL heartbeat-compatible RX: 30 bytes, offset 0, 5 frames + 0 bytes
Anomalies: 1

View File

@@ -1413,3 +1413,316 @@ Optional latch test, without power-cycling after the first run:
```powershell ```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.6 --cycles 3 --cycle-pause 2 --log captures/rcp-latch-primer-00-b5-cycles.txt python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.6 --cycles 3 --cycle-pause 2 --log captures/rcp-latch-primer-00-b5-cycles.txt
``` ```
## Current Inferred Behavior
The current evidence suggests the RCP is entering a discovery/query/status-read
phase, not a completed active-control handshake.
Working model:
```text
Host/CCU -> RCP: valid primer frame
Host/CCU -> RCP: selected query/status command
RCP -> Host/CCU: checksum-valid response frame repeated briefly
RCP -> Host/CCU: returns to heartbeat
```
Important details:
- Single selected commands such as `B2`, `B6`, and `B7` do not respond from a
cold panel.
- A preceding valid frame is required. `00 00 00 00 80 DA` works as a generic
primer for several selected commands.
- The second command selects the response payload.
- The LCD can remain `CONNECT NOT ACT` while serial responses vary in a
structured way. Serial response does not yet mean the active control session
is accepted.
- At least some primed queries appear one-shot after power-up. Repeating the
same primed query without power-cycling can produce only heartbeat traffic.
Likely protocol role:
- These `B0`-range commands may be a CCU discovery or capability/status query
phase.
- The CCU may query RCP model/capability/state blocks before sending a later
activation/session command.
- The next unknown is the command or command sequence that follows these
discovery responses and moves the panel from `CONNECT NOT ACT` to active.
## Primer-Candidate Broad Sweep
Use `scripts/serial_primer_candidate_sweep.py` for broader searches based on
the current primer/query model. It sends:
```text
primer frame -> candidate frame -> raw RX classification
```
For clean mapping, use `--prompt-power-cycle` and power-cycle before each
candidate. This avoids the one-shot/latch behavior contaminating later
candidates.
Example dry run:
```powershell
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0xB0-0xB7" --dry-run
```
Continue the known `B` range first:
```powershell
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0xB1 0xB6 0xB7 0xB8 0xB9 0xBA 0xBB 0xBC 0xBD 0xBE 0xBF" --prompt-power-cycle --stop-on-anomaly --log captures/rcp-primer-sweep-b1-bf.txt
```
Because `--stop-on-anomaly` stops at the first response, after each hit:
1. Save the reported candidate and response frame.
2. Power-cycle the panel.
3. Restart the sweep from the next unmapped candidate.
For non-stop mapping, omit `--stop-on-anomaly`, but still power-cycle at each
prompt:
```powershell
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0xB8-0xBF" --prompt-power-cycle --log captures/rcp-primer-sweep-b8-bf.txt
```
Suggested broad ranges after `B0-BF`:
```powershell
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0xA0-0xAF" --prompt-power-cycle --log captures/rcp-primer-sweep-a0-af.txt
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0xC0-0xCF" --prompt-power-cycle --log captures/rcp-primer-sweep-c0-cf.txt
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0x00-0x1F" --prompt-power-cycle --log captures/rcp-primer-sweep-00-1f.txt
```
Recommended first run:
```powershell
python scripts/serial_primer_candidate_sweep.py --port COM5 --candidates "0xB1 0xB6 0xB7 0xB8 0xB9 0xBA 0xBB 0xBC 0xBD 0xBE 0xBF" --prompt-power-cycle --log captures/rcp-primer-sweep-b1-bf.txt
```
## Primer Reuse and Sequential Query Tests
Two open questions:
1. After a cold boot, does the RCP only answer one selected query before it
latches/suppresses further responses?
2. Is a fresh primer required before every selected query, or can one primer
unlock several selected commands in sequence?
Use `scripts/serial_direct_response_sweep.py` for these tests because it can
send arbitrary command sequences without stopping between commands. For each
test below, power-cycle once before starting the script, then do not power-cycle
again until the script exits.
### Test S1: One Primer, Multiple Different Queries
Purpose: test whether one primer can unlock several different selected commands.
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB2 0xB3 0xB4 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-one-primer-b2-b5.txt
```
Interpretation:
- If only `B2` responds, the panel likely allows one selected response per
cold-boot/primer state.
- If `B2`, `B3`, `B4`, and `B5` all respond, one primer can unlock multiple
sequential queries.
- If some respond and some do not, there may be command-group or latch behavior.
### Test S2: Primer Before Every Query, No Power Cycle
Purpose: test whether a new primer can re-arm another selected query without
power-cycling.
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB2 0x00 0xB3 0x00 0xB4 0x00 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-reprimer-b2-b5.txt
```
Interpretation:
- If every selected command responds, a primer is required before each query but
power-cycling is not.
- If only the first selected command responds, power-cycle or another reset-like
command may be required to clear the latch.
### Test S3: Repeat Same Query With and Without Reprimer
Purpose: test whether the same selected query can be repeated in one powered
session.
Without re-primer:
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB5 0xB5 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-repeat-b5-no-reprimer.txt
```
Power-cycle, then with re-primer:
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB5 0x00 0xB5 0x00 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-repeat-b5-reprimer.txt
```
Interpretation:
- If only the first `B5` responds in both tests, the response is one-shot until
power cycle or a yet-unknown reset/ack command.
- If the re-primer version responds repeatedly, the primer re-arms the selected
query.
### 2026-05-13 Sequential Query Test Result
Captures:
- `captures/rcp-seq-one-primer-b2-b5.txt`
- `captures/rcp-seq-reprimer-b2-b5.txt`
- `captures/rcp-seq-repeat-b5-no-reprimer.txt`
- `captures/rcp-seq-repeat-b5-reprimer.txt`
Valid result:
| Test | Intended sequence | Actual sequence sent | Result |
| --- | --- | --- | --- |
| S1 | `00 -> B2 -> B3 -> B4 -> B5` | `00 -> B2 -> B3 -> B4 -> B5` | only `B2` responded: `07 80 36 10 0C F7` |
Tooling caveat:
- The original `serial_direct_response_sweep.py` de-duplicated command lists.
- Because of that, sequences containing repeated commands did not run as
intended.
- `S2`, `S3 no re-primer`, and `S3 re-primer` need to be rerun after the
script fix.
- The script has been updated to preserve repeated command values in explicit
command lists.
Interpretation from S1:
- One primer did not unlock a whole list of feature/status queries.
- After `00 -> B2` returned `07 80 36 10 0C F7`, later `B3`, `B4`, and `B5`
in the same powered session did not produce additional non-heartbeat frames.
- This supports a one-response latch model unless the re-primer test proves that
the primer can re-arm another query.
Rerun these tests after the script fix:
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB2 0x00 0xB3 0x00 0xB4 0x00 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-reprimer-b2-b5-v2.txt
```
Power-cycle, then:
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB5 0xB5 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-repeat-b5-no-reprimer-v2.txt
```
Power-cycle, then:
```powershell
python scripts/serial_direct_response_sweep.py --port COM5 --commands "0x00 0xB5 0x00 0xB5 0x00 0xB5" --states 0x00 --values 0x80 --settle 3 --after-each 0.8 --log captures/rcp-seq-repeat-b5-reprimer-v2.txt
```
### 2026-05-13 Sequential Query Rerun Result
Captures:
- `captures/rcp-seq-reprimer-b2-b5-v2.txt`
- `captures/rcp-seq-repeat-b5-no-reprimer-v2.txt`
- `captures/rcp-seq-repeat-b5-reprimer-v2.txt`
These reruns used the fixed `serial_direct_response_sweep.py`, which preserves
repeated command values in explicit sequences.
Results:
| Test | Sequence | Non-heartbeat response(s) |
| --- | --- | --- |
| Re-primer between different queries | `00 -> B2 -> 00 -> B3 -> 00 -> B4 -> 00 -> B5` | only `B2`: `07 80 36 10 0C F7` |
| Repeat `B5`, no re-primer | `00 -> B5 -> B5 -> B5` | only first `B5`: `07 80 6D 20 D8 48` |
| Repeat `B5`, re-primer each time | `00 -> B5 -> 00 -> B5 -> 00 -> B5` | only first `B5`: `07 80 6D 20 D8 48` |
Interpretation:
- The RCP appears to allow only one selected query response per powered session
in the current `CONNECT NOT ACT` state.
- Sending another primer (`00 00 00 00 80 DA`) after the first response does not
re-arm the query responder.
- Repeating the same selected query does not produce another response.
- This strongly suggests a one-shot discovery/status response followed by a
required next-stage command, acknowledgement, reset, or activation step.
Implication for CCU behavior:
- The CCU may not scan a list of feature queries in the current state. It may
send one discovery/status query, receive one response, then decide what
activation/session command to send next.
- Alternatively, additional feature reads may require an acknowledgement or
state-advance command that has not yet been identified.
Recommended next direction:
- Stop broad feature scanning for the moment.
- Search for the post-discovery acknowledgement/activation command that follows
one known response such as `00 -> B5 => 07 80 6D 20 D8 48`.
- Use a three-step pattern:
```text
primer -> selected query -> candidate activation/ack command
```
Known reproducible setup:
```text
Host -> RCP: 00 00 00 00 80 DA
Host -> RCP: 00 00 B5 00 80 6F
RCP -> Host: 07 80 6D 20 D8 48
Host -> RCP: candidate next-stage command
```
## Post-Discovery Candidate Sweep
Use `scripts/serial_post_discovery_sweep.py` to search for the command that
comes after one known discovery/status response. This is the likely next stage
after the one-shot response behavior.
Default setup:
```text
primer: 00 00 00 00 80 DA
query: 00 00 B5 00 80 6F
RCP: 07 80 6D 20 D8 48
then: candidate next-stage command
```
Recommended first post-discovery sweep:
```powershell
python scripts/serial_post_discovery_sweep.py --port COM5 --candidates "0x00-0x1F" --prompt-power-cycle --prompt-screen --log captures/rcp-post-discovery-b5-candidates-00-1f.txt
```
For each candidate:
1. Power-cycle the RCP.
2. Wait for heartbeat/panel stable.
3. Press Enter at the prompt.
4. Watch for any screen change after the candidate frame.
5. Type the screen state if it changes, or press Enter for no visible change.
Why this range first:
- Earlier frame-length tests showed small command values can change screen state
to `CONNECT NOT ACT`.
- If a simple ACK/activation command exists, it may be in the low command range.
Next ranges if `00-1F` does not change state:
```powershell
python scripts/serial_post_discovery_sweep.py --port COM5 --candidates "0x20-0x3F" --prompt-power-cycle --prompt-screen --log captures/rcp-post-discovery-b5-candidates-20-3f.txt
python scripts/serial_post_discovery_sweep.py --port COM5 --candidates "0x80-0x9F" --prompt-power-cycle --prompt-screen --log captures/rcp-post-discovery-b5-candidates-80-9f.txt
python scripts/serial_post_discovery_sweep.py --port COM5 --candidates "0xB0-0xBF" --prompt-power-cycle --prompt-screen --log captures/rcp-post-discovery-b5-candidates-b0-bf.txt
```
If any candidate changes the screen away from `CONNECT NOT ACT`, or produces a
new RCP response after the candidate stage, retest that candidate alone with
three fresh power cycles.

View File

@@ -58,7 +58,7 @@ def parse_byte_set(text: str) -> list[int]:
values.append(parse_byte(part)) values.append(parse_byte(part))
if not values: if not values:
raise argparse.ArgumentTypeError("empty byte set") raise argparse.ArgumentTypeError("empty byte set")
return list(dict.fromkeys(values)) return values
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes: def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:

View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""Sweep candidate commands after a known RCP-TX7 discovery response.
Current working sequence:
primer -> discovery query -> RCP response -> candidate next-stage command
By default this uses:
primer: 00 00 00 00 80 DA
query: 00 00 B5 00 80 6F
Use --prompt-power-cycle for clean testing because the RCP appears to latch after
one selected query response per powered session.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_byte_set(text: str) -> list[int]:
values: list[int] = []
for part in text.replace(",", " ").split():
if "-" in part:
start_text, end_text = part.split("-", 1)
start = parse_byte(start_text)
end = parse_byte(end_text)
if end < start:
raise argparse.ArgumentTypeError(f"bad range: {part}")
values.extend(range(start, end + 1))
else:
values.append(parse_byte(part))
if not values:
raise argparse.ArgumentTypeError("empty byte set")
return values
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def heartbeat_offset(data: bytes) -> int | None:
if not data:
return 0
for offset in range(len(HEARTBEAT)):
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
return offset
return None
def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
for index, byte in enumerate(data):
expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
if byte != expected:
return index, byte, expected
return None
def classify_rx(data: bytes) -> tuple[bool, str]:
if not data:
return False, "no RX bytes"
offset = heartbeat_offset(data)
if offset is not None:
full = len(data) // len(HEARTBEAT)
extra = len(data) % len(HEARTBEAT)
return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
best_offset = min(
range(len(HEARTBEAT)),
key=lambda candidate: sum(
byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
for index, byte in enumerate(data)
),
)
mismatch = first_mismatch(data, best_offset)
if mismatch is None:
return False, "heartbeat-compatible RX"
index, byte, expected = mismatch
return (
True,
f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
)
def read_window(ser: serial.Serial, duration: float) -> bytes:
stop_at = time.monotonic() + duration
data = bytearray()
while time.monotonic() < stop_at:
chunk = ser.read(128)
if chunk:
data.extend(chunk)
return bytes(data)
def emit_rx(emit, label: str, data: bytes) -> bool:
is_anomaly, note = classify_rx(data)
emit(f"{label} {note}")
if is_anomaly:
emit(f"{label} raw {hex_preview(data)}")
return is_anomaly
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Sweep next-stage commands after a known discovery query."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--prefix1", type=parse_byte, default=0x00)
parser.add_argument("--prefix2", type=parse_byte, default=0x00)
parser.add_argument("--primer-command", type=parse_byte, default=0x00)
parser.add_argument("--primer-state", type=parse_byte, default=0x00)
parser.add_argument("--primer-value", type=parse_byte, default=0x80)
parser.add_argument("--query-command", type=parse_byte, default=0xB5)
parser.add_argument("--query-state", type=parse_byte, default=0x00)
parser.add_argument("--query-value", type=parse_byte, default=0x80)
parser.add_argument("--candidates", type=parse_byte_set, required=True)
parser.add_argument("--candidate-state", type=parse_byte, default=0x00)
parser.add_argument("--candidate-value", type=parse_byte, default=0x80)
parser.add_argument("--settle", type=float, default=3.0)
parser.add_argument("--between", type=float, default=0.8)
parser.add_argument("--after-query", type=float, default=1.2)
parser.add_argument("--after-candidate", type=float, default=2.0)
parser.add_argument("--timeout", type=float, default=0.03)
parser.add_argument("--log", help="append sweep log to this file")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--prompt-power-cycle", action="store_true")
parser.add_argument("--prompt-screen", action="store_true")
parser.add_argument("--stop-on-candidate-anomaly", action="store_true")
return parser.parse_args()
def main() -> int:
args = parse_args()
primer = build_frame(
args.prefix1,
args.prefix2,
args.primer_command,
args.primer_state,
args.primer_value,
)
query = build_frame(
args.prefix1,
args.prefix2,
args.query_command,
args.query_state,
args.query_value,
)
candidates = [
(
command,
build_frame(
args.prefix1,
args.prefix2,
command,
args.candidate_state,
args.candidate_value,
),
)
for command in args.candidates
]
if args.dry_run:
print(f"primer cmd 0x{args.primer_command:02X}: {hex_preview(primer)}")
print(f"query cmd 0x{args.query_command:02X}: {hex_preview(query)}")
for command, frame in candidates:
print(f"candidate 0x{command:02X}: {hex_preview(frame)}")
return 0
emit, log_file = make_logger(args.log)
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
emit(
f"Post-discovery sweep: primer {hex_preview(primer)}, "
f"query {hex_preview(query)}, {len(candidates)} candidates "
f"on {ser.port} at {ser.baudrate} 8N1"
)
for index, (command, candidate) in enumerate(candidates, start=1):
if args.prompt_power_cycle:
answer = input(
f"Power-cycle RCP for candidate 0x{command:02X}, wait for heartbeat, "
"then press Enter (q then Enter to stop): "
).strip()
if answer.lower() in {"q", "quit", "stop"}:
emit("Stopped before next candidate.")
break
ser.reset_input_buffer()
emit_rx(emit, f"CANDIDATE 0x{command:02X} BASELINE", read_window(ser, args.settle))
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX primer frame {len(primer):03d} {hex_preview(primer)}")
ser.write(primer)
ser.flush()
emit_rx(emit, f"{stamp} PRIMER RX", read_window(ser, args.between))
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX query 0x{args.query_command:02X} frame {len(query):03d} {hex_preview(query)}")
ser.write(query)
ser.flush()
emit_rx(emit, f"{stamp} QUERY RX", read_window(ser, args.after_query))
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX candidate 0x{command:02X} frame {len(candidate):03d} {hex_preview(candidate)}")
ser.write(candidate)
ser.flush()
candidate_anomaly = emit_rx(
emit,
f"{stamp} CANDIDATE 0x{command:02X} RX",
read_window(ser, args.after_candidate),
)
if args.prompt_screen:
screen = input(
f"Screen after candidate 0x{command:02X} "
"(blank = no change, q = stop): "
).strip()
if screen:
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} SCREEN candidate=0x{command:02X} {screen}")
if screen.lower() in {"q", "quit", "stop"}:
break
if candidate_anomaly and args.stop_on_candidate_anomaly:
emit("Stopping after candidate anomaly.")
break
emit(f"Completed candidate {index}/{len(candidates)}")
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,278 @@
#!/usr/bin/env python3
"""Sweep RCP-TX7 primer -> candidate query pairs.
The current working model is that a valid host frame primes the RCP and the next
host frame selects a status/query response. This helper sends one primer frame
followed by one candidate frame and classifies raw pin-4 RX against the known
heartbeat stream.
Use --prompt-power-cycle for clean mapping: power-cycle the RCP before each
candidate, press Enter, then the script sends exactly one primer/candidate pair.
"""
from __future__ import annotations
import argparse
import datetime as dt
import sys
import time
try:
import serial
except ImportError:
print(
"Missing dependency: pyserial\n"
"Install it with: python -m pip install pyserial",
file=sys.stderr,
)
raise SystemExit(2)
HEARTBEAT = bytes.fromhex("00 00 00 00 80 DA")
def parse_byte(text: str) -> int:
value = int(text, 0)
if not 0 <= value <= 0xFF:
raise argparse.ArgumentTypeError(f"must be a byte: {text}")
return value
def parse_byte_set(text: str) -> list[int]:
values: list[int] = []
for part in text.replace(",", " ").split():
if "-" in part:
start_text, end_text = part.split("-", 1)
start = parse_byte(start_text)
end = parse_byte(end_text)
if end < start:
raise argparse.ArgumentTypeError(f"bad range: {part}")
values.extend(range(start, end + 1))
else:
values.append(parse_byte(part))
if not values:
raise argparse.ArgumentTypeError("empty byte set")
return list(dict.fromkeys(values))
def build_frame(prefix1: int, prefix2: int, command: int, state: int, value: int) -> bytes:
body = bytes([prefix1, prefix2, command, state, value])
checksum = 0x5A
for byte in body:
checksum ^= byte
return body + bytes([checksum])
def hex_preview(data: bytes) -> str:
return " ".join(f"{byte:02X}" for byte in data)
def make_logger(path: str | None):
log_file = open(path, "a", encoding="utf-8") if path else None
def emit(line: str) -> None:
print(line)
if log_file:
log_file.write(line + "\n")
log_file.flush()
return emit, log_file
def heartbeat_offset(data: bytes) -> int | None:
if not data:
return 0
for offset in range(len(HEARTBEAT)):
if all(byte == HEARTBEAT[(offset + index) % len(HEARTBEAT)] for index, byte in enumerate(data)):
return offset
return None
def first_mismatch(data: bytes, offset: int) -> tuple[int, int, int] | None:
for index, byte in enumerate(data):
expected = HEARTBEAT[(offset + index) % len(HEARTBEAT)]
if byte != expected:
return index, byte, expected
return None
def classify_rx(data: bytes) -> tuple[bool, str]:
if not data:
return False, "no RX bytes"
offset = heartbeat_offset(data)
if offset is not None:
full = len(data) // len(HEARTBEAT)
extra = len(data) % len(HEARTBEAT)
return False, f"heartbeat-compatible RX: {len(data)} bytes, offset {offset}, {full} frames + {extra} bytes"
best_offset = min(
range(len(HEARTBEAT)),
key=lambda candidate: sum(
byte != HEARTBEAT[(candidate + index) % len(HEARTBEAT)]
for index, byte in enumerate(data)
),
)
mismatch = first_mismatch(data, best_offset)
if mismatch is None:
return False, "heartbeat-compatible RX"
index, byte, expected = mismatch
return (
True,
f"ANOMALY {len(data)} RX bytes; first mismatch at byte {index}: "
f"got {byte:02X}, heartbeat offset {best_offset} expected {expected:02X}",
)
def read_window(ser: serial.Serial, duration: float) -> bytes:
stop_at = time.monotonic() + duration
data = bytearray()
while time.monotonic() < stop_at:
chunk = ser.read(128)
if chunk:
data.extend(chunk)
return bytes(data)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Sweep primer->candidate pairs and log non-heartbeat raw RX."
)
parser.add_argument("--port", required=True, help="serial port, for example COM5")
parser.add_argument("--baud", type=int, default=38400)
parser.add_argument("--prefix1", type=parse_byte, default=0x00)
parser.add_argument("--prefix2", type=parse_byte, default=0x00)
parser.add_argument("--primer-command", type=parse_byte, default=0x00)
parser.add_argument("--primer-state", type=parse_byte, default=0x00)
parser.add_argument("--primer-value", type=parse_byte, default=0x80)
parser.add_argument("--candidates", type=parse_byte_set, required=True)
parser.add_argument("--candidate-state", type=parse_byte, default=0x00)
parser.add_argument("--candidate-value", type=parse_byte, default=0x80)
parser.add_argument("--settle", type=float, default=3.0)
parser.add_argument("--between", type=float, default=0.6)
parser.add_argument("--after", type=float, default=1.5)
parser.add_argument("--timeout", type=float, default=0.03)
parser.add_argument("--log", help="append sweep log to this file")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--prompt-power-cycle", action="store_true")
parser.add_argument("--stop-on-anomaly", action="store_true")
return parser.parse_args()
def main() -> int:
args = parse_args()
primer = build_frame(
args.prefix1,
args.prefix2,
args.primer_command,
args.primer_state,
args.primer_value,
)
candidates = [
(
command,
build_frame(
args.prefix1,
args.prefix2,
command,
args.candidate_state,
args.candidate_value,
),
)
for command in args.candidates
]
if args.dry_run:
print(f"primer cmd 0x{args.primer_command:02X}: {hex_preview(primer)}")
for command, frame in candidates:
print(f"candidate 0x{command:02X}: {hex_preview(frame)}")
return 0
emit, log_file = make_logger(args.log)
anomaly_count = 0
try:
with serial.Serial(
port=args.port,
baudrate=args.baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=args.timeout,
write_timeout=1.0,
rtscts=False,
dsrdtr=False,
xonxoff=False,
) as ser:
emit(
f"Primer/candidate sweep: primer {hex_preview(primer)}, "
f"{len(candidates)} candidates on {ser.port} at {ser.baudrate} 8N1"
)
for index, (command, candidate) in enumerate(candidates, start=1):
if args.prompt_power_cycle:
answer = input(
f"Power-cycle RCP for candidate 0x{command:02X}, wait for heartbeat, "
"then press Enter (q then Enter to stop): "
).strip()
if answer.lower() in {"q", "quit", "stop"}:
emit("Stopped before next candidate.")
break
ser.reset_input_buffer()
baseline = read_window(ser, args.settle)
baseline_anomaly, baseline_note = classify_rx(baseline)
emit(f"CANDIDATE 0x{command:02X} BASELINE {baseline_note}")
if baseline_anomaly:
emit(f"CANDIDATE 0x{command:02X} BASELINE raw {hex_preview(baseline)}")
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(f"{stamp} TX primer frame {len(primer):03d} {hex_preview(primer)}")
ser.write(primer)
ser.flush()
primer_rx = read_window(ser, args.between)
primer_anomaly, primer_note = classify_rx(primer_rx)
if primer_anomaly:
anomaly_count += 1
emit(f"{stamp} PRIMER {primer_note}")
emit(f"{stamp} PRIMER raw {hex_preview(primer_rx)}")
if args.stop_on_anomaly:
emit("Stopping after primer anomaly.")
break
stamp = dt.datetime.now().strftime("%H:%M:%S.%f")[:-3]
emit(
f"{stamp} TX candidate 0x{command:02X} "
f"frame {len(candidate):03d} {hex_preview(candidate)}"
)
ser.write(candidate)
ser.flush()
rx = read_window(ser, args.after)
is_anomaly, note = classify_rx(rx)
if is_anomaly:
anomaly_count += 1
emit(f"{stamp} CANDIDATE 0x{command:02X} {note}")
emit(f"{stamp} CANDIDATE 0x{command:02X} raw {hex_preview(rx)}")
if args.stop_on_anomaly:
emit("Stopping after candidate anomaly.")
break
else:
emit(f"{stamp} CANDIDATE 0x{command:02X} {note}")
emit(f"Completed candidate {index}/{len(candidates)}")
emit(f"Anomalies: {anomaly_count}")
except KeyboardInterrupt:
emit("Stopped.")
return 0
except serial.SerialException as exc:
print(f"Serial error: {exc}", file=sys.stderr)
return 1
finally:
if log_file:
log_file.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())