1
0

Compare commits

..

2 Commits

Author SHA1 Message Date
Aiden
d1d924c408 new learnigns 2026-05-26 00:59:38 +10:00
Aiden
443789d6ae bench test updates 2026-05-26 00:48:28 +10:00
17 changed files with 2517 additions and 851 deletions

View File

@@ -45,6 +45,8 @@ To start the current emulator harness:
.\.venv\Scripts\python.exe h8536_emulator_rx_probe.py --preset connect-lcd
.\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen
.\.venv\Scripts\python.exe scripts\serial_ack_probe.py --ack-frame "05 00 40 00 00 1F"
.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json
.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json
.\.venv\Scripts\python.exe h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity
```
@@ -92,6 +94,8 @@ The real-device bench helper uses `pyserial`; install repo dependencies with `.\
- Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, can optionally schedule 38400 8N1 byte arrivals at real UART spacing, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects.
- Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs.
- Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating.
- Includes a checksum-resynchronizing bench receiver that scans RX byte streams for valid six-byte frames, avoids common shifted-heartbeat false locks, and can fall back to the old fixed six-byte slicer with `--sync fixed`.
- Includes a JSON scenario bench runner for repeatable multi-step serial tests, including low-latency ACK-aware command-1 probes that can send the current command-5 ACK candidate immediately after the retry frame appears, with explicit max-ACK/max-target guardrails.
- Includes a bench-log replay harness that feeds recorded host TX frames back into the ROM emulator with bench-style UART byte timing by default and asserts parity against the real device's observed response/LCD state.
Current serial observations:
@@ -227,6 +231,9 @@ python h8536_emulator_rx_probe.py --help
- `h8536_emulator_rx_probe.py --uart-timing --uart-baud 38400 "04 00 00 80 00"`: inject all six host bytes with 8N1 wire spacing of about 260 us per byte, letting RXI/TXI/timers interleave; if the ROM has not cleared `RDRF` before the next byte, the SCI model raises `ORER`.
- `h8536_emulator_rx_probe.py --preset connect-lcd`: replay the current CONNECT LCD activation candidates.
- `scripts\serial_table_dump.py --port COM5 --relay-port COM6 --start 0x000 --count 0x200 --log captures\table-read.txt`: read-only command-1 sweep of the firmware-exposed serial table state for EEPROM/shadow inference.
- `scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json`: run the focused `0x000 -> 0x001` retry probe with immediate reactive ACK and a 2 ms poll interval, to test whether command 5 can arrive before the second `07 80 40 20 90 2D` retry.
- `scripts\serial_scenario.py scenarios\early-ack-000-001.json --log captures\early-ack-000-001.txt --result-json captures\early-ack-000-001-result.json`: send the same command-1 pair, then send command-5 ACK immediately without waiting for the retry frame.
- `scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json`: run a repeatable bench scenario that sweeps selectors `0x000-0x07F` and sends `05 00 40 00 00 1F` only after `07 80 40 20 90 2D` appears. The checked-in scenario stops if it reaches 8 ACKs or 32 target hits. Use `--sync fixed` only when comparing against the old non-resyncing receiver.
- `scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen`: power-cycle the bench device, wait for heartbeat readiness, send `04 00 00 40 00 1E`, `04 00 00 80 00 DE`, `04 00 00 C0 00 9E`, log RX/TX, and prompt for observed LCD text.
- `h8536_emulator_bench_replay.py captures\bench-connect-lcd-sequence-20260525-214411.txt --assert-bench-parity`: replay a real bench log into the emulator using timed UART RX by default and intentionally fail while any response/LCD state still diverges from the bench-observed `CONNECT NOT ACT` plus `07 80 C0 60 20 5D` path. Pass `--polite-rx` for the old wait-until-consumed injection mode.
- Current status: boots from `H'1000`, initializes SCI1, models the traced X24164 EEPROM bus on P9, captures P9 byte candidates, can optionally fast-path known P9 EEPROM routines, schedules FRT1/FRT2 OCIA from timer registers and `--clock-hz`, captures the ROM-driven LCD line ` CONNECT:NOT ACT`, and emits the observed heartbeat frame `00 00 00 00 80 DA`.
@@ -256,6 +263,7 @@ python h8536_emulator_rx_probe.py --help
- `h8536/serial_pseudocode.py`: focused RX/TX protocol pseudocode generation from reconstruction metadata.
- `h8536/protocol_trace.py`: raw six-byte protocol frame decoder/checksum validator.
- `h8536/protocol_capture.py`: timestamped serial capture parser, frame recombiner, and cadence/gate-session analyzer.
- `h8536/serial_scenario.py`: JSON-driven bench scenario engine shared by real-device serial scripts.
- `h8536/serial_gate.py`: autonomous TX gate/queue state-machine reconstruction.
- `h8536/report_source_trace.py`: direct `loc_3E54` report enqueue source tracer.
- `h8536/table_xrefs.py`: table/index xrefs and LCD correlation report generation.
@@ -274,3 +282,4 @@ python h8536_emulator_rx_probe.py --help
- `h8536_emulator.py`, `h8536_emulator_probe.py`, `h8536_emulator_rx_probe.py`, `h8536_emulator_bench_replay.py`: emulator CLI wrappers.
- `scripts/bench_connect_lcd_sequence.py`: real-device COM5/COM6 bench runner for the CONNECT LCD sequence.
- `scripts/serial_table_dump.py`: read-only COM5/COM6 command-1 table sweep for inferring live EEPROM-backed parameter state.
- `scripts/serial_scenario.py`: JSON-driven COM5/COM6 bench scenario runner for chained probes, waits, read sweeps, and ACK-on-target experiments.

File diff suppressed because it is too large Load Diff

View File

@@ -113,26 +113,45 @@ extern volatile u8 MEM8[0x10000];
* - byte4: value_lo (medium) - candidate low byte of a word value
* - byte5: checksum (high) - 0x5A-seeded XOR of bytes 0..4
* dispatch: command_low3 = RX_FRAME(0) & 0x07; observed H'00, H'01, H'02, H'04, H'05, H'06, H'07
* dispatcher split: FAA2 == 0 accepts initial/idle commands H'00, H'01, H'02, H'07; FAA2 != 0 accepts continuation commands H'04, H'05, H'06, H'07
* dispatcher caveat: Initial dispatch follows checksum validation and RX error handling. Command 1 is only on the initial/idle path and is also gated by F861.bit7 == 0.
* dispatch evidence: H'BC08, H'BC0C, H'BC20, H'BC22, H'BC24, H'BC26, H'BC29, H'BC2B, H'BC2E, H'BC30, H'BC45, H'BC47, H'BC4A, H'BC4C, H'BC4F, H'BC51, H'BC54, H'BC56
* index decoder: RX[1:2] -> logical index via loc_622B (medium)
* command candidates:
* - H'00 set_value_acked: candidate write of RX[3:4] into primary/current tables, followed by a response; handler H'BC69; responses response_at_BCCD
* - H'01 read_value: candidate read from the primary table, followed by a response carrying the value; handler H'BCD7; responses response_at_BCFA
* availability: valid checksum/no RX physical error && FAA2 == 0
* - H'01 read_value: initial/idle-path primary table read only, followed by an odd response staging sequence; handler H'BCD7; responses response_at_BCFA
* availability: valid checksum/no RX physical error && FAA2 == 0 && F861.bit7 == 0
* note: Only accepted on the initial/idle dispatcher path: valid checksum/no RX error, FAA2 == 0, and F861.bit7 == 0.
* note: BCD7 stages F850=0x04, writes F851 from F861 and then overwrites F851 from F862.
* note: BCD7 reads the primary table word at E000 + 2*selector; F854 receives the low byte and F853 receives the high byte.
* note: F852 is not freshly written in the BCD7 handler, so do not describe the response as a fixed 04 00 QQ hi lo frame.
* - H'02 clear_or_abort: candidate clear/abort path with no immediate response builder; handler H'BD04; responses none
* availability: valid checksum/no RX physical error && FAA2 == 0
* - H'04 set_value_no_immediate_reply: candidate write/update path that stores a value without an immediate serial response; handler H'BD0E; responses none
* - H'05 ack_or_clear_pending: candidate pending/event acknowledgement path; handler H'BD80; responses none
* availability: valid checksum/no RX physical error && FAA2 != 0
* - H'05 ack_or_clear_pending: continuation-only conditional acknowledgement/session clear path; handler H'BD80; responses none
* availability: valid checksum/no RX physical error && FAA2 != 0
* note: Only accepted on the continuation dispatcher path when FAA2 != 0.
* note: For selector 0x0040, frame 05 00 40 00 00 1F performs no response staging.
* note: The handler clears FAA3/FAA2; F9B5 advances only when FAA2.bit3 was set from a queued report.
* note: If FAA2 == 0, command 5 falls through the initial dispatcher instead of doing acknowledgement work.
* - H'06 set_secondary_value: candidate secondary-table value write path; handler H'BDDB; responses none
* - H'07 retransmit_or_error_reply: candidate retransmit/NAK-style path; error handling also builds command 0x07 responses; handler H'BE05; responses response_at_BE22
* availability: valid checksum/no RX physical error && FAA2 != 0
* - H'07 retransmit_or_error_reply: candidate retransmit path; retry/error handling also builds a command 0x07 RX-payload echo; handler H'BE05; responses response_at_BE22
* availability: valid checksum/no RX physical error && FAA2 == 0 OR valid checksum/no RX physical error && FAA2 != 0
* note: loc_BE4D is a retry/error echo path: F850=0x07 and F861-F864 are copied into F851-F854 before loc_BA26.
* note: Observed frame 07 80 40 20 90 2D means RX bytes F861-F864 were 80 40 20 90; it is not a table value.
* command effects:
* - H'00 set_value_acked: Candidate acknowledged set: writes value bytes to primary/current tables, flags the index, and stages an echo-style response.
* effect: table_write_candidate; target primary_value_table_candidate; source RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero; table H'E000
* effect: table_write_candidate; target current_value_table_candidate; source same candidate value written to the primary table; table H'E800
* effect: flag_update_candidate; target per_index_flag_table_candidate; set bit 7; table H'EC00
* evidence: H'BC08, H'BC0C, H'BC20, H'BC22, H'BCB0, H'BCB9, H'BCC1, H'BCC9, H'BCB5, H'BCBD, H'BCC5, H'BCCD
* - H'01 read_value: Candidate read: reads the primary table and stages a value response.
* - H'01 read_value: Initial/idle candidate read: reads the primary table and stages an odd value response with F852 possibly stale.
* effect: table_read_candidate; target primary_value_table_candidate; table H'E000
* effect: response_staging_candidate; stage F850-F854 and call loc_BA26
* evidence: H'BC08, H'BC0C, H'BC24, H'BC26, H'BCB0, H'BCB9, H'BCC1, H'BCC9, H'BCD7, H'BCE0, H'BCE8, H'BCF0, H'BCF6, H'BCB5, H'BCBD, H'BCC5, H'BCDC, H'BCE4, H'BCFA
* evidence: H'BC08, H'BC0C, H'BC24, H'BC26, H'BCD7, H'BCE0, H'BCE8, H'BCF0, H'BCF6, H'BCDC, H'BCE4, H'BCFA
* - H'02 clear_or_abort: Candidate clear/abort: clears serial session state without an observed immediate response.
* effect: state_clear_candidate; target serial_session_flags_candidate; clear bit 7; state H'FAA2
* evidence: H'BC08, H'BC0C, H'BC29, H'BC2B
@@ -140,15 +159,16 @@ extern volatile u8 MEM8[0x10000];
* effect: table_write_candidate; target primary_value_table_candidate; source RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero; table H'E000
* effect: flag_update_candidate; target per_index_flag_table_candidate; set bit 7; table H'EC00
* evidence: H'BC08, H'BC0C, H'BC45, H'BC47
* - H'05 ack_or_clear_pending: Candidate acknowledgement/clear: updates pending/event state without an observed immediate response.
* effect: pending_acknowledgement_candidate; target selected event/pending state; clear selected pending flags and then clear serial session state
* - H'05 ack_or_clear_pending: Continuation-only ACK/session clear: clears FAA3/FAA2 and only advances F9B5 when queued-report FAA2.bit3 was set; selector 0x0040 has no response.
* effect: conditional_ack_session_clear_candidate; target selected event/pending state; when FAA2 != 0, clear FAA3/FAA2; advance F9B5 only if FAA2.bit3 was set from queued-report state; selector 0x0040 stages no response; selector H'0040 has no response
* evidence: H'BC08, H'BC0C, H'BC4A, H'BC4C
* - H'06 set_secondary_value: Candidate secondary set: writes value bytes to the secondary table and flags the index.
* effect: table_write_candidate; target secondary_value_table_candidate; source RX[3:4] value bytes; table H'E400
* effect: flag_update_candidate; target per_index_flag_table_candidate; set bit 6; table H'EC00
* evidence: H'BC08, H'BC0C, H'BC4F, H'BC51
* - H'07 retransmit_or_error_reply: Candidate retransmit/error reply: reuses prior TX bytes or builds an explicit 0x07 retry/error response.
* - H'07 retransmit_or_error_reply: Candidate retransmit/error reply: reuses prior TX bytes or copies RX payload bytes behind an explicit 0x07 retry/error echo.
* effect: retransmit_candidate; target TX staging bytes H'F850-H'F854 before loc_BA26; source previous TX frame bytes H'F858-H'F85C
* effect: retry_error_echo_candidate; target F850=0x07, F851-F854=F861-F864 before loc_BA26; source RX payload bytes F861-F864; 07 80 40 20 90 2D echoes RX payload 80 40 20 90; it is not a table value
* effect: response_staging_candidate; stage F850-F854 and call loc_BA26
* evidence: H'BC08, H'BC0C, H'BC2E, H'BC30, H'BC54, H'BC56, H'BE09, H'BE11, H'BE19, H'BE22
* response schemas:
@@ -156,8 +176,11 @@ extern volatile u8 MEM8[0x10000];
* evidence: H'BB1C, H'BB2B, H'BB20, H'BB3F, H'BB39
* - response_at_BCCD: byte0=0x04; byte1=rx[1]; byte2=rx[2]; byte3=rx[3]; byte4=rx[4]
* evidence: H'BCB0, H'BCB9, H'BCC1, H'BCC9
* - response_at_BCFA: byte0=0x04; byte1=rx[2]; byte2=rx[2]; byte3=primary_value_table_candidate; byte4=primary_value_table_candidate
* evidence: H'BCD7, H'BCE8, H'BCC1, H'BCF6, H'BCF0
* - response_at_BCFA: byte0=0x04; byte1=rx[2]; byte2=stale/unchanged; byte3=primary_value_table_candidate; byte4=primary_value_table_candidate
* note: Command 1 BCD7 staging is odd: F850=0x04; F851 is written from F861 then overwritten by F862.
* note: The primary table word is read from E000 + 2*selector; F854/F853 receive low/high value bytes.
* note: F852 may be stale or unchanged in this handler; avoid a fixed 04 00 QQ hi lo response shape.
* evidence: H'BCD7, H'BCE8, H'BCF6, H'BCF0
* - response_at_BE22: byte0=tx[0]; byte1=tx[1]; byte2=tx[2]; byte3=tx[3]; byte4=tx[4]
* evidence: H'BE09, H'BE11, H'BE19
* - ... 1 more candidate response schemas
@@ -186,7 +209,9 @@ extern volatile u8 MEM8[0x10000];
* - ... 7 more state-variable candidates
* retry/error model candidate:
* - checksum path: 0x5A-seeded XOR over RX[0..4] differs from RX[5] -> loc_BE29
* - retry path: counter H'FAA6, threshold 2; Candidate retry path clears/consults serial flags, increments FAA6, compares it with 2, and when still below the apparent limit stages a command 0x07 response.
* - retry path: counter H'FAA6, threshold 2; Candidate retry path clears/consults serial flags, increments FAA6, compares it with 2, and when still below the apparent limit enters loc_BE4D to stage a command 0x07 retry/error echo of RX payload bytes F861-F864.
* echo path: loc_BE4D: F850=0x07; F851-F854=F861-F864
* echo caveat: 07 80 40 20 90 2D echoes RX payload 80 40 20 90 and is not a table value.
* - command 0x07 path: Candidate retransmit/explicit command 0x07 path either copies previous TX frame bytes back to F850-F854 or stages an observed 0x07 response before loc_BA26.
* - evidence: H'BBD8, H'BBDC, H'BBE0, H'BBE4, H'BBE8, H'BBEC, H'BBF0, H'BE4D, H'BE56, H'BE5E, H'BE66, H'BE52, H'BE5A, H'BE62, H'BE6A, H'BE29, H'BE2D, H'BE33, H'BE37, H'BE43, H'BE47, H'BE05, H'BE0D, H'BE15, H'BE09, H'BE11, H'BE19, H'BE22
* gate/queue state machine candidate:
@@ -201,7 +226,7 @@ extern volatile u8 MEM8[0x10000];
* - session_timeout_clears_gate_and_queue: When F9C5 is clear, loc_3FEF clears F9B5/F9B0 and clears FAA5.bit7; when nonzero, it sets FAA5.bit7.
* - idle_heartbeat_gate_initial_delay_loaded: Startup/init loads F9C4 with H'14 before the first idle/default report can be queued.
* - idle_heartbeat_gate_post_send_delay_loaded: loc_BA26 reloads F9C4 with H'07 after each send, matching the observed heartbeat spacing.
* - host_ack_can_advance_queue: Commands 0x05/0x06 are modeled as acknowledgement paths that can clear pending state or advance F9B5.; commands H'05, H'06
* - host_ack_can_advance_queue: Command 0x05 is a continuation-only ACK/session clear path: it clears FAA3/FAA2 and advances F9B5 only when queued-report FAA2.bit3 was set. Selector 0x0040 has no response; if FAA2 == 0 the command falls through instead of doing ACK work.; commands H'05
* - caveat: Many panel controls may require host/session traffic before reporting. Observed autonomous call/camera-power indexes are runtime/capture overlays, not ROM constants.
* - evidence: H'3FD3, H'3FD7, H'3FD9, H'3FDD, H'3FDF, H'3FE3, H'3FE5, H'3FE9, H'3FEB, H'3FEF, H'3FF3, H'3FF5, H'3FF9, H'3FFD, H'4001, H'4003, H'4005, H'4007, H'4046, H'404A, H'404C, H'4050, H'4052, H'4056, H'4058, H'4059, H'405D, H'405F, H'4063, H'4065, H'4067, H'406C, H'4070, H'BAF2, H'BAF6, H'BAF8, H'BAFC, H'BAFE, H'BB00, H'BB04, H'BB06, H'BB08, H'BB0C, H'BB0E, H'BB11, H'BB13, H'BB15, H'BB17, H'BB19, H'BB1C, H'BB20, H'BB24, H'BB26, H'BB29, H'BB2B, H'BB2F, H'BB33, H'BB35, H'BB39, H'BB3D, H'BB3F, H'BB43, H'BE9E, H'BEA2, H'BEA5, H'BEA9, H'BEAD, H'BEAF, H'BEB3, H'BEB5, H'BEB9, H'BEBB, H'BEBF, H'BEC1, H'BEC5, H'BECB, H'BECF, H'BED1, H'BED5
* TX/autonomous report model candidate:
@@ -338,9 +363,14 @@ void sci1_process_candidate_protocol_command(void)
u16 logical_index = sci1_rx_candidate_logical_index();
u16 value = sci1_rx_candidate_value();
bool session_active = MEM8[0xFAA2u] != 0u;
if (!session_active) {
/* Initial/idle dispatcher: valid checksum/no RX error, FAA2 == 0. */
switch (command) {
case 0x00u:
/* set_value_acked: candidate write of RX[3:4] into primary/current tables, followed by a response
* availability: valid checksum/no RX physical error && FAA2 == 0
* candidate effect: table_write_candidate; target primary_value_table_candidate; source RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero; table H'E000
* candidate effect: table_write_candidate; target current_value_table_candidate; source same candidate value written to the primary table; table H'E800
* candidate effect: flag_update_candidate; target per_index_flag_table_candidate; set bit 7; table H'EC00
@@ -349,22 +379,54 @@ void sci1_process_candidate_protocol_command(void)
candidate_set_value_acked(logical_index, value);
break;
case 0x01u:
/* read_value: candidate read from the primary table, followed by a response carrying the value
/* read_value: initial/idle-path primary table read only, followed by an odd response staging sequence
* availability: valid checksum/no RX physical error && FAA2 == 0 && F861.bit7 == 0
* note: Only accepted on the initial/idle dispatcher path: valid checksum/no RX error, FAA2 == 0, and F861.bit7 == 0.
* note: BCD7 stages F850=0x04, writes F851 from F861 and then overwrites F851 from F862.
* note: BCD7 reads the primary table word at E000 + 2*selector; F854 receives the low byte and F853 receives the high byte.
* note: F852 is not freshly written in the BCD7 handler, so do not describe the response as a fixed 04 00 QQ hi lo frame.
* candidate effect: table_read_candidate; target primary_value_table_candidate; table H'E000
* candidate effect: response_staging_candidate; stage F850-F854 and call loc_BA26
* evidence: H'BC08, H'BC0C, H'BC24, H'BC26, H'BCB0, H'BCB9, H'BCC1, H'BCC9, H'BCD7, H'BCE0, H'BCE8, H'BCF0, H'BCF6, H'BCB5, H'BCBD, H'BCC5, H'BCDC, H'BCE4, H'BCFA
* evidence: H'BC08, H'BC0C, H'BC24, H'BC26, H'BCD7, H'BCE0, H'BCE8, H'BCF0, H'BCF6, H'BCDC, H'BCE4, H'BCFA
*/
if ((RX_FRAME(1) & 0x80u) != 0u) {
candidate_unknown_command(command, logical_index, value);
break;
}
candidate_read_value(logical_index, value);
break;
case 0x02u:
/* clear_or_abort: candidate clear/abort path with no immediate response builder
* availability: valid checksum/no RX physical error && FAA2 == 0
* candidate effect: state_clear_candidate; target serial_session_flags_candidate; clear bit 7; state H'FAA2
* evidence: H'BC08, H'BC0C, H'BC29, H'BC2B
*/
candidate_clear_or_abort(logical_index, value);
break;
case 0x07u:
/* retransmit_or_error_reply: candidate retransmit path; retry/error handling also builds a command 0x07 RX-payload echo
* availability: valid checksum/no RX physical error && FAA2 == 0 OR valid checksum/no RX physical error && FAA2 != 0
* note: loc_BE4D is a retry/error echo path: F850=0x07 and F861-F864 are copied into F851-F854 before loc_BA26.
* note: Observed frame 07 80 40 20 90 2D means RX bytes F861-F864 were 80 40 20 90; it is not a table value.
* candidate effect: retransmit_candidate; target TX staging bytes H'F850-H'F854 before loc_BA26; source previous TX frame bytes H'F858-H'F85C
* candidate effect: retry_error_echo_candidate; target F850=0x07, F851-F854=F861-F864 before loc_BA26; source RX payload bytes F861-F864; 07 80 40 20 90 2D echoes RX payload 80 40 20 90; it is not a table value
* candidate effect: response_staging_candidate; stage F850-F854 and call loc_BA26
* evidence: H'BC08, H'BC0C, H'BC2E, H'BC30, H'BC54, H'BC56, H'BE09, H'BE11, H'BE19, H'BE22
*/
candidate_retransmit_or_error_reply(logical_index, value);
break;
default:
candidate_unknown_command(command, logical_index, value);
break;
}
return;
}
/* Continuation dispatcher: FAA2 != 0. */
switch (command) {
case 0x04u:
/* set_value_no_immediate_reply: candidate write/update path that stores a value without an immediate serial response
* availability: valid checksum/no RX physical error && FAA2 != 0
* candidate effect: table_write_candidate; target primary_value_table_candidate; source RX[3:4] value bytes, with an observed 0x80 fallback when decoded index is zero; table H'E000
* candidate effect: flag_update_candidate; target per_index_flag_table_candidate; set bit 7; table H'EC00
* evidence: H'BC08, H'BC0C, H'BC45, H'BC47
@@ -372,14 +434,20 @@ void sci1_process_candidate_protocol_command(void)
candidate_set_value_no_immediate_reply(logical_index, value);
break;
case 0x05u:
/* ack_or_clear_pending: candidate pending/event acknowledgement path
* candidate effect: pending_acknowledgement_candidate; target selected event/pending state; clear selected pending flags and then clear serial session state
/* ack_or_clear_pending: continuation-only conditional acknowledgement/session clear path
* availability: valid checksum/no RX physical error && FAA2 != 0
* note: Only accepted on the continuation dispatcher path when FAA2 != 0.
* note: For selector 0x0040, frame 05 00 40 00 00 1F performs no response staging.
* note: The handler clears FAA3/FAA2; F9B5 advances only when FAA2.bit3 was set from a queued report.
* note: If FAA2 == 0, command 5 falls through the initial dispatcher instead of doing acknowledgement work.
* candidate effect: conditional_ack_session_clear_candidate; target selected event/pending state; when FAA2 != 0, clear FAA3/FAA2; advance F9B5 only if FAA2.bit3 was set from queued-report state; selector 0x0040 stages no response; selector H'0040 has no response
* evidence: H'BC08, H'BC0C, H'BC4A, H'BC4C
*/
candidate_ack_or_clear_pending(logical_index, value);
break;
case 0x06u:
/* set_secondary_value: candidate secondary-table value write path
* availability: valid checksum/no RX physical error && FAA2 != 0
* candidate effect: table_write_candidate; target secondary_value_table_candidate; source RX[3:4] value bytes; table H'E400
* candidate effect: flag_update_candidate; target per_index_flag_table_candidate; set bit 6; table H'EC00
* evidence: H'BC08, H'BC0C, H'BC4F, H'BC51
@@ -387,8 +455,12 @@ void sci1_process_candidate_protocol_command(void)
candidate_set_secondary_value(logical_index, value);
break;
case 0x07u:
/* retransmit_or_error_reply: candidate retransmit/NAK-style path; error handling also builds command 0x07 responses
/* retransmit_or_error_reply: candidate retransmit path; retry/error handling also builds a command 0x07 RX-payload echo
* availability: valid checksum/no RX physical error && FAA2 == 0 OR valid checksum/no RX physical error && FAA2 != 0
* note: loc_BE4D is a retry/error echo path: F850=0x07 and F861-F864 are copied into F851-F854 before loc_BA26.
* note: Observed frame 07 80 40 20 90 2D means RX bytes F861-F864 were 80 40 20 90; it is not a table value.
* candidate effect: retransmit_candidate; target TX staging bytes H'F850-H'F854 before loc_BA26; source previous TX frame bytes H'F858-H'F85C
* candidate effect: retry_error_echo_candidate; target F850=0x07, F851-F854=F861-F864 before loc_BA26; source RX payload bytes F861-F864; 07 80 40 20 90 2D echoes RX payload 80 40 20 90; it is not a table value
* candidate effect: response_staging_candidate; stage F850-F854 and call loc_BA26
* evidence: H'BC08, H'BC0C, H'BC2E, H'BC30, H'BC54, H'BC56, H'BE09, H'BE11, H'BE19, H'BE22
*/

View File

@@ -23,12 +23,22 @@ COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")
@dataclass
class FrameDetector:
sync_mode: str = "checksum"
buffer: bytearray = field(default_factory=bytearray)
frames: list[bytes] = field(default_factory=list)
labels: Counter[str] = field(default_factory=Counter)
dropped_bytes: int = 0
resync_events: int = 0
def feed(self, data: bytes) -> list[tuple[bytes, str]]:
self.buffer.extend(data)
if self.sync_mode == "fixed":
return self._feed_fixed()
if self.sync_mode != "checksum":
raise ValueError(f"unknown frame sync mode {self.sync_mode!r}")
return self._feed_checksum_resync()
def _feed_fixed(self) -> list[tuple[bytes, str]]:
detected = []
while len(self.buffer) >= FRAME_LENGTH:
frame = bytes(self.buffer[:FRAME_LENGTH])
@@ -40,6 +50,35 @@ class FrameDetector:
detected.append((frame, label))
return detected
def _feed_checksum_resync(self) -> list[tuple[bytes, str]]:
detected = []
while len(self.buffer) >= FRAME_LENGTH:
offset = _next_sync_offset(self.buffer)
if offset is None:
self._drop_unsynced_prefix(len(self.buffer) - (FRAME_LENGTH - 1))
break
if offset:
self._drop_unsynced_prefix(offset)
frame = bytes(self.buffer[:FRAME_LENGTH])
if not frame_checksum_ok(frame):
self._drop_unsynced_prefix(1)
continue
del self.buffer[:FRAME_LENGTH]
label = label_frame(frame)
self.frames.append(frame)
if label:
self.labels[label] += 1
detected.append((frame, label))
return detected
def _drop_unsynced_prefix(self, count: int) -> None:
count = max(0, min(count, len(self.buffer)))
if not count:
return
del self.buffer[:count]
self.dropped_bytes += count
self.resync_events += 1
class BenchLogger:
def __init__(self, path: Path, stdout: TextIO = sys.stdout) -> None:
@@ -113,10 +152,46 @@ def label_frame(frame: bytes) -> str:
if label:
return label
if frame_checksum_ok(frame):
if frame[0] == 0x04:
return "table_readback_candidate"
if frame[0] == 0x07:
return "visible_report_candidate"
return "checksum_ok_unlabeled"
return "checksum_bad_or_unaligned"
def _next_sync_offset(buffer: bytearray) -> int | None:
scored_offsets: list[tuple[int, int]] = []
for offset in range(0, len(buffer) - FRAME_LENGTH + 1):
frame = bytes(buffer[offset : offset + FRAME_LENGTH])
if not frame_checksum_ok(frame):
continue
if offset == 0 and not _looks_like_shifted_heartbeat(frame):
return 0
label = label_frame(frame)
scored_offsets.append((_sync_score(frame, label), offset))
if not scored_offsets:
return None
return min(scored_offsets)[1]
def _sync_score(frame: bytes, label: str) -> int:
if label and label not in {"checksum_ok_unlabeled", "checksum_bad_or_unaligned"}:
return 0
if frame[0] in {0x00, 0x02, 0x04, 0x07}:
return 100
return 200
def _looks_like_shifted_heartbeat(frame: bytes) -> bool:
return frame in {
bytes.fromhex("00000080DA00"),
bytes.fromhex("000080DA0000"),
bytes.fromhex("0080DA000000"),
bytes.fromhex("80DA00000000"),
}
def default_log_path() -> Path:
return Path("captures") / f"bench-connect-lcd-sequence-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
@@ -141,6 +216,7 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence")
parser.add_argument("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session")
parser.add_argument("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames")
parser.add_argument("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence")
parser.add_argument("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending")
@@ -169,7 +245,7 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector()
detector = FrameDetector(sync_mode=args.sync)
try:
logger.emit("CONNECT LCD bench sequence")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}")
@@ -186,7 +262,7 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
_relay_command(relay, args.power_off_command, logger)
time.sleep(args.off_seconds)
device.reset_input_buffer()
detector = FrameDetector()
detector = FrameDetector(sync_mode=args.sync)
_relay_command(relay, args.power_on_command, logger)
else:
device.reset_input_buffer()
@@ -256,9 +332,16 @@ def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: flo
waiting = getattr(device, "in_waiting", 0)
data = device.read(waiting or 1)
if data:
dropped_before = detector.dropped_bytes
logger.chunk("RX", data)
for frame, label in detector.feed(data):
logger.event(f"DETECT {label} {format_frame(frame)}")
dropped_now = detector.dropped_bytes - dropped_before
if dropped_now:
logger.event(
f"RESYNC dropped_bytes={dropped_now} total_dropped={detector.dropped_bytes} "
f"buffered={len(detector.buffer)}"
)
def _wait_for_ready(
@@ -310,6 +393,7 @@ def _summary(detector: FrameDetector, logger: BenchLogger) -> None:
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}")
logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
for label, count in sorted(detector.labels.items()):
logger.emit(f"{label}={count}")

View File

@@ -153,6 +153,7 @@ def _summary(detector: FrameDetector, logger: BenchLogger, target: bytes, trigge
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}")
logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
logger.emit(f"target_before_ack={sum(1 for frame in trigger_frames if frame == target)}")
logger.emit(f"target_after_ack={sum(1 for frame in post_ack_frames if frame == target)}")
labels = Counter(label for label in detector.labels.elements())

View File

@@ -385,6 +385,17 @@ def _semantics_lines(
" * dispatch: command_low3 = RX_FRAME(0) & 0x07"
+ (f"; observed {values}" if values else ""),
)
state_split = dispatch.get("state_split") or dispatch.get("dispatcher_split")
if isinstance(state_split, dict):
initial = ", ".join(str(item) for item in state_split.get("initial_idle_commands_hex", []) if item)
continuation = ", ".join(str(item) for item in state_split.get("continuation_commands_hex", []) if item)
lines.append(
" * dispatcher split: FAA2 == 0 accepts initial/idle commands "
f"{initial or 'none'}; FAA2 != 0 accepts continuation commands {continuation or 'none'}",
)
caveat = str(state_split.get("caveat") or "").strip()
if caveat:
lines.append(f" * dispatcher caveat: {_comment_text(caveat)}")
if opts.include_evidence:
lines.append(f" * dispatch evidence: {_hex_join(dispatch.get('evidence_addresses_hex'))}")
@@ -405,6 +416,11 @@ def _semantics_lines(
handler = command.get("handler_start_hex") or "multiple"
responses = ", ".join(str(item) for item in command.get("response_candidates", [])) or "none"
lines.append(f" * - {value} {name}: {summary}; handler {handler}; responses {responses}")
availability = _comment_text(str(command.get("availability_summary") or ""))
if availability:
lines.append(f" * availability: {availability}")
for note in command.get("semantic_notes", []):
lines.append(f" * note: {_comment_text(str(note))}")
lines.extend(_command_effect_comment_lines(protocol.get("command_effects"), opts, prefix=" * "))
lines.extend(_response_schema_comment_lines(_schema_list(protocol), opts, prefix=" * "))
lines.extend(_table_map_comment_lines(_table_map_list(protocol), opts, prefix=" * "))
@@ -458,9 +474,51 @@ def _semantics_lines(
" u16 logical_index = sci1_rx_candidate_logical_index();",
" u16 value = sci1_rx_candidate_value();",
"",
" switch (command) {",
],
)
lines.extend(_command_dispatch_switch_lines(commands, opts))
return lines
def _command_dispatch_switch_lines(commands: list[JsonObject], opts: SerialPseudocodeOptions) -> list[str]:
initial = [
command for command in commands
if _command_has_availability(command, "initial_idle_dispatch")
]
continuation = [
command for command in commands
if _command_has_availability(command, "continuation_dispatch")
]
if not initial and not continuation:
return _flat_command_switch_lines(commands, opts, indent=" ") + ["}", ""]
lines = [
" bool session_active = MEM8[0xFAA2u] != 0u;",
"",
" if (!session_active) {",
" /* Initial/idle dispatcher: valid checksum/no RX error, FAA2 == 0. */",
]
lines.extend(_flat_command_switch_lines(initial, opts, indent=" "))
lines.extend(
[
" return;",
" }",
"",
" /* Continuation dispatcher: FAA2 != 0. */",
],
)
lines.extend(_flat_command_switch_lines(continuation, opts, indent=" "))
lines.extend(["}", ""])
return lines
def _flat_command_switch_lines(
commands: list[JsonObject],
opts: SerialPseudocodeOptions,
*,
indent: str,
) -> list[str]:
lines = [f"{indent}switch (command) {{"]
for command in commands:
value = command.get("command_value")
if not isinstance(value, int):
@@ -468,28 +526,50 @@ def _semantics_lines(
name = _safe_identifier(str(command.get("name_candidate") or f"command_{value:02X}"))
summary = _comment_text(str(command.get("summary") or "candidate command semantics unknown"))
evidence = _hex_join(command.get("evidence_addresses_hex"))
lines.append(f" case 0x{value:02X}u:")
lines.append(f" /* {name}: {summary}")
lines.append(f"{indent}case 0x{value:02X}u:")
lines.append(f"{indent} /* {name}: {summary}")
availability = _comment_text(str(command.get("availability_summary") or ""))
if availability:
lines.append(f"{indent} * availability: {availability}")
for note in command.get("semantic_notes", []):
lines.append(f"{indent} * note: {_comment_text(str(note))}")
for effect_line in _command_effect_switch_lines(command):
lines.append(f" * {effect_line}")
lines.append(f"{indent} * {effect_line}")
if opts.include_evidence and evidence:
lines.append(f" * evidence: {evidence}")
lines.append(" */")
lines.append(f" candidate_{name}(logical_index, value);")
lines.append(" break;")
lines.append(f"{indent} * evidence: {evidence}")
lines.append(f"{indent} */")
if value == 0x01:
lines.append(f"{indent} if ((RX_FRAME(1) & 0x80u) != 0u) {{")
lines.append(f"{indent} candidate_unknown_command(command, logical_index, value);")
lines.append(f"{indent} break;")
lines.append(f"{indent} }}")
lines.append(f"{indent} candidate_{name}(logical_index, value);")
lines.append(f"{indent} break;")
lines.extend(
[
" default:",
" candidate_unknown_command(command, logical_index, value);",
" break;",
" }",
"}",
"",
f"{indent}default:",
f"{indent} candidate_unknown_command(command, logical_index, value);",
f"{indent} break;",
f"{indent}}}",
],
)
return lines
def _command_has_availability(command: JsonObject, availability: str) -> bool:
value = command.get("availability")
if value == availability:
return True
if isinstance(value, list) and availability in value:
return True
conditions = " ".join(str(item) for item in command.get("availability_conditions", []))
if availability == "initial_idle_dispatch":
return "FAA2 == 0" in conditions
if availability == "continuation_dispatch":
return "FAA2 != 0" in conditions
return False
def _command_effect_comment_lines(
value: object,
opts: SerialPseudocodeOptions,
@@ -528,6 +608,8 @@ def _response_schema_comment_lines(
response_id = schema.get("response_id") or schema.get("call_address_hex") or "candidate_response"
byte_text = _response_schema_summary(schema)
lines.append(f"{prefix}- {response_id}: {byte_text}")
for note in schema.get("semantic_notes", []):
lines.append(f"{prefix} note: {_comment_text(str(note))}")
evidence = _hex_join(schema.get("evidence_addresses_hex"))
if opts.include_evidence and evidence:
lines.append(f"{prefix} evidence: {evidence}")
@@ -612,6 +694,13 @@ def _retry_error_comment_lines(
threshold = retry.get("threshold_candidate", "?")
summary = _comment_text(str(retry.get("summary") or "candidate retry path"))
lines.append(f"{prefix}- retry path: counter {counter}, threshold {threshold}; {summary}")
echo = retry.get("echo_response_candidate")
if isinstance(echo, dict):
staging = _comment_text(str(echo.get("staging_candidate") or "retry/error echo staging"))
caveat = _comment_text(str(echo.get("observed_frame_caveat") or ""))
lines.append(f"{prefix} echo path: {echo.get('entry_label', 'loc_BE4D')}: {staging}")
if caveat:
lines.append(f"{prefix} echo caveat: {caveat}")
if isinstance(command_07, dict):
summary = _comment_text(str(command_07.get("summary") or "candidate command 0x07 path"))
lines.append(f"{prefix}- command 0x07 path: {summary}")
@@ -987,6 +1076,8 @@ def _effect_summary(effect: JsonObject) -> str:
operation = effect.get("operation_candidate")
table = effect.get("table_base_hex")
state = effect.get("state_address_hex")
selector = effect.get("selector_without_response_hex")
observed = effect.get("observed_frame_caveat")
if target:
parts.append(f"target {target}")
if source:
@@ -997,6 +1088,10 @@ def _effect_summary(effect: JsonObject) -> str:
parts.append(f"table {table}")
if state:
parts.append(f"state {state}")
if selector:
parts.append(f"selector {selector} has no response")
if observed:
parts.append(str(observed))
return _comment_text("; ".join(parts))

509
h8536/serial_scenario.py Normal file
View File

@@ -0,0 +1,509 @@
from __future__ import annotations
import argparse
import json
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, TextIO
from .bench_connect_lcd import (
BenchLogger,
FrameDetector,
_import_serial,
_read_for,
_relay_command,
_relay_settle,
_send_frame,
_wait_for_ready,
format_frame,
frame_checksum_ok,
parse_frame,
)
from .serial_table_dump import build_read_frame, decode_table_read_response
DEFAULT_ACK_TARGET = bytes.fromhex("07804020902D")
DEFAULT_ACK_FRAME = bytes.fromhex("05004000001F")
@dataclass
class ScenarioContext:
args: argparse.Namespace
logger: BenchLogger
detector: FrameDetector
device: Any
relay: Any | None = None
table_rows: list[dict[str, Any]] = field(default_factory=list)
target_counts: dict[str, int] = field(default_factory=dict)
tx_records: list[dict[str, Any]] = field(default_factory=list)
ack_sent: int = 0
abort_requested: bool = False
def default_log_path(scenario: dict[str, Any]) -> Path:
name = str(scenario.get("name") or "serial-scenario").strip() or "serial-scenario"
safe_name = "".join(char if char.isalnum() or char in "-_" else "-" for char in name)
return Path("captures") / f"{safe_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Run JSON-described serial bench scenarios against the real RCP."
)
parser.add_argument("scenario", type=Path, help="JSON scenario file")
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="skip power_cycle actions")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--result-json", type=Path, help="write machine-readable scenario summary")
parser.add_argument("--dry-run", action="store_true", help="print the plan without opening serial ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
scenario = load_scenario(args.scenario)
log_path = args.log or default_log_path(scenario)
if args.dry_run:
_print_dry_run(args, scenario, log_path, stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector(sync_mode=args.sync)
try:
logger.emit("Serial bench scenario")
logger.emit(f"name={scenario.get('name', args.scenario.stem)}")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud} sync={args.sync}")
logger.emit(f"log={log_path}")
with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device:
ctx = ScenarioContext(args=args, logger=logger, detector=detector, device=device)
try:
for index, step in enumerate(_scenario_steps(scenario), start=1):
if ctx.abort_requested:
logger.event("SCENARIO_ABORT requested by prior step")
break
action, spec = _normalize_step(step)
logger.event(f"STEP {index} {action}")
_run_step(ctx, action, spec)
finally:
if ctx.relay is not None:
ctx.relay.close()
_emit_summary(ctx, logger)
if args.result_json:
_write_result_json(args.result_json, scenario, log_path, ctx)
return 0
finally:
logger.close()
def load_scenario(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
scenario = json.load(handle)
if not isinstance(scenario, dict):
raise SystemExit("scenario root must be a JSON object")
if not isinstance(scenario.get("steps"), list):
raise SystemExit("scenario must contain a steps array")
return scenario
def _scenario_steps(scenario: dict[str, Any]) -> list[Any]:
steps = scenario.get("steps", [])
if not isinstance(steps, list):
raise SystemExit("scenario steps must be an array")
return steps
def _normalize_step(step: Any) -> tuple[str, dict[str, Any]]:
if isinstance(step, str):
return step, {}
if not isinstance(step, dict):
raise SystemExit(f"invalid scenario step {step!r}")
if "action" in step:
spec = dict(step)
action = str(spec.pop("action"))
return action, spec
if len(step) == 1:
action, value = next(iter(step.items()))
if value is None:
return str(action), {}
if isinstance(value, dict):
return str(action), dict(value)
return str(action), {"value": value}
raise SystemExit(f"scenario step needs an action: {step!r}")
def _run_step(ctx: ScenarioContext, action: str, spec: dict[str, Any]) -> None:
if action == "power_cycle":
_step_power_cycle(ctx, spec)
elif action == "wait_ready":
ready = _wait_for_ready(
ctx.device,
ctx.detector,
ctx.logger,
float(spec.get("timeout", 10.0)),
int(spec.get("heartbeats", 2)),
)
if spec.get("require", False) and not ready:
raise SystemExit(2)
elif action in {"drain", "listen", "wait"}:
_listen(ctx, float(spec.get("seconds", spec.get("value", 0.0))))
elif action == "send":
frame = _parse_required_frame(spec.get("frame"))
label = str(spec.get("label", "send"))
_send_and_record(ctx, frame, label)
if float(spec.get("listen", 0.0)) > 0:
_listen(ctx, float(spec.get("listen", 0.0)))
elif action == "wait_for":
_step_wait_for(ctx, spec)
elif action == "table_sweep":
_step_table_sweep(ctx, spec)
elif action == "repeat":
_step_repeat(ctx, spec)
else:
raise SystemExit(f"unknown scenario action {action!r}")
def _step_power_cycle(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
if ctx.args.no_power_cycle:
ctx.logger.event("POWER_CYCLE skipped by --no-power-cycle")
ctx.device.reset_input_buffer()
ctx.detector = FrameDetector(sync_mode=ctx.args.sync)
return
serial = _import_serial()
if ctx.relay is None:
ctx.relay = serial.Serial(ctx.args.relay_port, ctx.args.relay_baud, timeout=0.25)
_relay_settle(ctx.relay, float(spec.get("relay_settle", ctx.args.relay_settle)), ctx.logger)
off_command = str(spec.get("off_command", ctx.args.power_off_command))
on_command = str(spec.get("on_command", ctx.args.power_on_command))
_relay_command(ctx.relay, off_command, ctx.logger)
time.sleep(float(spec.get("off_seconds", 1.5)))
ctx.device.reset_input_buffer()
ctx.detector = FrameDetector(sync_mode=ctx.args.sync)
_relay_command(ctx.relay, on_command, ctx.logger)
def _step_wait_for(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
targets = _parse_frame_list(spec.get("frames", spec.get("frame")))
timeout = float(spec.get("timeout", 1.0))
require = bool(spec.get("require", False))
ctx.logger.event(
"WAIT_FOR "
+ ",".join(format_frame(frame) for frame in targets)
+ f" timeout={timeout:.3f}s"
)
found = _listen_until(ctx, timeout, targets)
if require and not found:
raise SystemExit(3)
def _step_repeat(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
count = max(0, int(spec.get("count", 1)))
steps = spec.get("steps", [])
if not isinstance(steps, list):
raise SystemExit("repeat step requires a steps array")
for repeat_index in range(count):
ctx.logger.event(f"REPEAT {repeat_index + 1}/{count}")
for step in steps:
action, child_spec = _normalize_step(step)
_run_step(ctx, action, child_spec)
def _step_table_sweep(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
selectors = _selector_list(spec)
gap = float(spec.get("gap", 0.080))
ack = _ack_config(spec.get("ack_on", {}))
ctx.logger.event(
f"TABLE_SWEEP selectors={len(selectors)} gap={gap:.3f}s "
f"ack_targets={len(ack['targets'])} ack_frame={format_frame(ack['frame'])}"
)
for selector in selectors:
if ctx.abort_requested:
ctx.logger.event("TABLE_SWEEP_ABORT stopping before next selector")
break
frame = build_read_frame(selector)
ctx.logger.event(f"READ selector=0x{selector:03X} frame={format_frame(frame)}")
_send_and_record(ctx, frame, f"read_0x{selector:03X}")
_listen_with_ack(ctx, gap, selector, ack)
def _ack_config(raw: Any) -> dict[str, Any]:
spec = raw if isinstance(raw, dict) else {}
targets = _parse_frame_list(spec.get("frames", spec.get("frame", DEFAULT_ACK_TARGET)))
return {
"targets": set(targets),
"frame": _parse_optional_frame(spec.get("ack_frame"), DEFAULT_ACK_FRAME),
"guard": float(spec.get("ack_guard", 0.020)),
"poll_interval": float(spec.get("poll_interval", 0.005)),
"post_read": float(spec.get("post_ack_read", 0.250)),
"once_per_selector": bool(spec.get("once_per_selector", True)),
"enabled": bool(spec.get("enabled", True)),
"max_acks": _optional_int(spec.get("max_acks")),
"max_target_hits": _optional_int(spec.get("max_target_hits")),
"abort_on_limit": bool(spec.get("abort_on_limit", True)),
}
def _listen_with_ack(
ctx: ScenarioContext,
seconds: float,
selector: int,
ack: dict[str, Any],
) -> list[bytes]:
deadline = time.monotonic() + max(0.0, seconds)
observed: list[bytes] = []
acked_targets: set[bytes] = set()
while time.monotonic() < deadline:
frames = _read_available(ctx, selector=selector)
observed.extend(frames)
if not frames:
sleep_for = min(max(0.001, ack["poll_interval"]), max(0.0, deadline - time.monotonic()))
if sleep_for > 0:
time.sleep(sleep_for)
continue
if not ack["enabled"]:
continue
for frame in frames:
if frame not in ack["targets"]:
continue
_count_target(ctx, frame)
if _ack_limit_reached(ctx, ack):
ctx.logger.event("ACK_LIMIT reached before ACK send")
if ack["abort_on_limit"]:
ctx.abort_requested = True
return observed
continue
if ack["once_per_selector"] and frame in acked_targets:
continue
acked_targets.add(frame)
if ack["guard"] > 0:
observed.extend(_listen(ctx, ack["guard"], selector=selector))
_send_and_record(ctx, ack["frame"], "ack")
ctx.ack_sent += 1
if _ack_limit_reached(ctx, ack):
ctx.logger.event("ACK_LIMIT reached after ACK send")
if ack["abort_on_limit"]:
ctx.abort_requested = True
if ack["post_read"] > 0:
observed.extend(_listen(ctx, ack["post_read"], selector=selector))
if ctx.abort_requested:
return observed
return observed
def _listen_until(ctx: ScenarioContext, seconds: float, targets: set[bytes]) -> bool:
deadline = time.monotonic() + max(0.0, seconds)
while time.monotonic() < deadline:
interval = min(0.050, max(0.0, deadline - time.monotonic()))
if interval <= 0:
break
for frame in _listen(ctx, interval):
if frame in targets:
ctx.logger.event(f"WAIT_FOR_MATCH {format_frame(frame)}")
return True
return False
def _listen(ctx: ScenarioContext, seconds: float, *, selector: int | None = None) -> list[bytes]:
before = len(ctx.detector.frames)
_read_for(ctx.device, ctx.detector, ctx.logger, seconds)
frames = ctx.detector.frames[before:]
_record_table_rows(ctx, frames, selector)
return frames
def _read_available(ctx: ScenarioContext, *, selector: int | None = None) -> list[bytes]:
waiting = getattr(ctx.device, "in_waiting", 0)
if not waiting:
return []
dropped_before = ctx.detector.dropped_bytes
data = ctx.device.read(waiting)
if not data:
return []
ctx.logger.chunk("RX", data)
detected = ctx.detector.feed(data)
for frame, label in detected:
ctx.logger.event(f"DETECT {label} {format_frame(frame)}")
dropped_now = ctx.detector.dropped_bytes - dropped_before
if dropped_now:
ctx.logger.event(
f"RESYNC dropped_bytes={dropped_now} total_dropped={ctx.detector.dropped_bytes} "
f"buffered={len(ctx.detector.buffer)}"
)
frames = [frame for frame, _label in detected]
_record_table_rows(ctx, frames, selector)
return frames
def _record_table_rows(ctx: ScenarioContext, frames: list[bytes], selector: int | None) -> None:
for frame in frames:
decoded = decode_table_read_response(frame)
if decoded is None or selector is None:
continue
echo, value = decoded
row = {
"selector": selector,
"echo": echo,
"value": value,
"frame": format_frame(frame),
}
ctx.table_rows.append(row)
ctx.logger.event(f"TABLE selector=0x{selector:03X} echo={echo:02X} value={value:04X}")
def _send_and_record(ctx: ScenarioContext, frame: bytes, label: str) -> None:
_send_frame(ctx.device, frame, ctx.logger, label)
ctx.tx_records.append(
{
"label": label,
"frame": format_frame(frame),
"checksum_ok": frame_checksum_ok(frame),
}
)
def _count_target(ctx: ScenarioContext, frame: bytes) -> None:
text = format_frame(frame)
ctx.target_counts[text] = ctx.target_counts.get(text, 0) + 1
ctx.logger.event(f"ACK_TARGET {text} count={ctx.target_counts[text]}")
def _selector_list(spec: dict[str, Any]) -> list[int]:
if "selectors" in spec:
raw_selectors = spec["selectors"]
if not isinstance(raw_selectors, list):
raise SystemExit("table_sweep selectors must be an array")
return [_int_value(selector) & 0x01FF for selector in raw_selectors]
start = _int_value(spec.get("start", 0))
count = max(0, _int_value(spec.get("count", 0x80)))
return [((start + offset) & 0x01FF) for offset in range(count)]
def _parse_frame_list(raw: Any) -> set[bytes]:
if raw is None:
return set()
values = raw if isinstance(raw, list) else [raw]
return {_parse_required_frame(value) for value in values}
def _parse_required_frame(raw: Any) -> bytes:
if raw is None:
raise SystemExit("frame is required")
if isinstance(raw, bytes):
return raw
if not isinstance(raw, str):
raise SystemExit(f"frame must be a hex string, got {raw!r}")
return parse_frame(raw)
def _parse_optional_frame(raw: Any, default: bytes) -> bytes:
if raw is None:
return default
return _parse_required_frame(raw)
def _int_value(raw: Any) -> int:
if isinstance(raw, int):
return raw
if isinstance(raw, str):
return int(raw, 0)
raise SystemExit(f"expected integer value, got {raw!r}")
def _optional_int(raw: Any) -> int | None:
if raw is None:
return None
return _int_value(raw)
def _ack_limit_reached(ctx: ScenarioContext, ack: dict[str, Any]) -> bool:
max_acks = ack.get("max_acks")
if max_acks is not None and ctx.ack_sent >= max_acks:
return True
max_target_hits = ack.get("max_target_hits")
if max_target_hits is not None and sum(ctx.target_counts.values()) >= max_target_hits:
return True
return False
def _print_dry_run(args: argparse.Namespace, scenario: dict[str, Any], log_path: Path, stdout: TextIO) -> None:
print(f"scenario={scenario.get('name', args.scenario.stem)}", file=stdout)
print(f"device={args.port} {args.baud} 8N1", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"sync={args.sync}", file=stdout)
print(f"log={log_path}", file=stdout)
for index, step in enumerate(_scenario_steps(scenario), start=1):
action, spec = _normalize_step(step)
print(f"step[{index}]={action}", file=stdout)
if action == "send":
frame = _parse_required_frame(spec.get("frame"))
print(f" frame={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout)
elif action == "table_sweep":
selectors = _selector_list(spec)
ack = _ack_config(spec.get("ack_on", {}))
if selectors:
first = selectors[0]
last = selectors[-1]
print(f" selectors={len(selectors)} first=0x{first:03X} last=0x{last:03X}", file=stdout)
else:
print(" selectors=0", file=stdout)
print(f" gap={float(spec.get('gap', 0.080)):.3f}", file=stdout)
for target in sorted(ack["targets"]):
print(f" ack_target={format_frame(target)}", file=stdout)
print(f" ack_frame={format_frame(ack['frame'])}", file=stdout)
print(f" max_acks={ack['max_acks']} max_target_hits={ack['max_target_hits']}", file=stdout)
def _emit_summary(ctx: ScenarioContext, logger: BenchLogger) -> None:
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(ctx.detector.frames)} trailing_unframed_bytes={len(ctx.detector.buffer)}")
logger.emit(f"resync_events={ctx.detector.resync_events} dropped_bytes={ctx.detector.dropped_bytes}")
logger.emit(f"tx_frames={len(ctx.tx_records)} ack_sent={ctx.ack_sent} table_response_rows={len(ctx.table_rows)}")
logger.emit(f"abort_requested={int(ctx.abort_requested)}")
for target, count in sorted(ctx.target_counts.items()):
logger.emit(f"ack_target {target}={count}")
for label, count in sorted(ctx.detector.labels.items()):
logger.emit(f"{label}={count}")
for row in ctx.table_rows:
logger.emit(
f"table selector=0x{row['selector']:03X} echo=0x{row['echo']:02X} value=0x{row['value']:04X}"
)
def _write_result_json(path: Path, scenario: dict[str, Any], log_path: Path, ctx: ScenarioContext) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
result = {
"scenario": scenario.get("name", ""),
"log": str(log_path),
"rx_frames": len(ctx.detector.frames),
"trailing_unframed_bytes": len(ctx.detector.buffer),
"resync_events": ctx.detector.resync_events,
"dropped_bytes": ctx.detector.dropped_bytes,
"labels": dict(ctx.detector.labels),
"tx_frames": ctx.tx_records,
"ack_sent": ctx.ack_sent,
"abort_requested": ctx.abort_requested,
"ack_targets": ctx.target_counts,
"table_rows": ctx.table_rows,
}
path.write_text(json.dumps(result, indent=2, sort_keys=True) + "\n", encoding="utf-8")
__all__ = [
"DEFAULT_ACK_FRAME",
"DEFAULT_ACK_TARGET",
"build_arg_parser",
"load_scenario",
"main",
]

View File

@@ -392,6 +392,9 @@ def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None:
continue
comparisons = _dispatch_comparisons(ordered, index + 1, selector_reg)
state_split = _dispatcher_state_split(ordered, index + 1, comparisons)
if state_split:
_annotate_dispatch_comparison_availability(comparisons, state_split)
command_values = sorted({int(item["command_value"]) for item in comparisons})
candidate = {
"kind": "command_dispatch_candidate",
@@ -413,6 +416,8 @@ def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None:
"command_values": command_values,
"command_values_hex": [_h16(value, width=2) for value in command_values],
"comparisons": comparisons,
"state_split": state_split,
"dispatcher_split": state_split,
"cases": [
{
"value": int(item["command_value"]),
@@ -421,6 +426,8 @@ def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None:
"target_hex": item["handler_start_hex"],
"compare_address": item["compare_address"],
"branch_address": item["branch_address"],
"availability": item.get("availability"),
"availability_conditions": item.get("availability_conditions", []),
}
for item in comparisons
],
@@ -446,6 +453,101 @@ def _find_command_dispatch(ordered: list[JsonObject]) -> JsonObject | None:
return best
def _dispatcher_state_split(
ordered: list[JsonObject],
start_index: int,
comparisons: list[JsonObject],
) -> JsonObject | None:
if not comparisons:
return None
for index in range(start_index, min(len(ordered) - 1, start_index + 24)):
ins = ordered[index]
if not _has_ref_in_range(ins, 0xFAA2, 0xFAA2):
continue
if _mnemonic_root(str(ins.get("mnemonic", ""))) != "TST":
continue
branch = ordered[index + 1]
if _mnemonic_root(str(branch.get("mnemonic", ""))) != "BNE":
continue
targets = _targets(branch)
if not targets:
continue
continuation_start = int(targets[0])
initial_values = sorted(
{
int(item["command_value"])
for item in comparisons
if int(item.get("compare_address", 0)) < continuation_start
}
)
continuation_values = sorted(
{
int(item["command_value"])
for item in comparisons
if int(item.get("compare_address", 0)) >= continuation_start
}
)
if not initial_values and not continuation_values:
continue
return {
"kind": "serial_command_dispatch_state_split",
"state_address": 0xFAA2,
"state_address_hex": _h16(0xFAA2),
"test_address": int(ins["address"]),
"test_address_hex": _h16(int(ins["address"])),
"branch_address": int(branch["address"]),
"branch_address_hex": _h16(int(branch["address"])),
"continuation_target": continuation_start,
"continuation_target_hex": _h16(continuation_start),
"initial_idle_commands": initial_values,
"initial_idle_commands_hex": [_h16(value, width=2) for value in initial_values],
"continuation_commands": continuation_values,
"continuation_commands_hex": [_h16(value, width=2) for value in continuation_values],
"summary": (
"FAA2 == 0 takes the initial/idle dispatcher path; FAA2 != 0 takes "
"the continuation dispatcher path."
),
"caveat": (
"Initial dispatch follows checksum validation and RX error handling. Command 1 "
"is only on the initial/idle path and is also gated by F861.bit7 == 0."
),
"evidence_addresses": [int(ins["address"]), int(branch["address"])],
"evidence_addresses_hex": _hlist([int(ins["address"]), int(branch["address"])]),
}
return None
def _annotate_dispatch_comparison_availability(
comparisons: list[JsonObject],
state_split: Mapping[str, Any],
) -> None:
continuation_start = state_split.get("continuation_target")
if not isinstance(continuation_start, int):
return
for item in comparisons:
compare_address = item.get("compare_address")
if not isinstance(compare_address, int):
continue
command_value = int(item.get("command_value", -1))
if compare_address < continuation_start:
availability = "initial_idle_dispatch"
conditions = [
"valid checksum/no RX physical error",
"FAA2 == 0",
]
if command_value == 0x01:
conditions.append("F861.bit7 == 0")
else:
availability = "continuation_dispatch"
conditions = [
"valid checksum/no RX physical error",
"FAA2 != 0",
]
item["availability"] = availability
item["availability_conditions"] = conditions
item["availability_summary"] = " && ".join(conditions)
def _dispatch_comparisons(
ordered: list[JsonObject],
start_index: int,
@@ -537,6 +639,8 @@ def _command_candidates(
"dispatch_compare_address_hex": comparison["compare_address_hex"],
"dispatch_branch_address": comparison["branch_address"],
"dispatch_branch_address_hex": comparison["branch_address_hex"],
"availability": comparison.get("availability"),
"availability_conditions": comparison.get("availability_conditions", []),
}
if alternative not in command["handler_alternatives"]:
command["handler_alternatives"].append(alternative)
@@ -555,6 +659,30 @@ def _command_candidates(
command["handler_start_hex"] = _h16(starts_for_command[0]) if len(starts_for_command) == 1 else None
command["handler_end"] = ends_for_command[0] if len(ends_for_command) == 1 else None
command["handler_end_hex"] = _h16(ends_for_command[0]) if len(ends_for_command) == 1 else None
availability_conditions = _dedupe_strings(
str(condition)
for alt in alternatives
for condition in alt.get("availability_conditions", [])
if condition
)
availability = _dedupe_strings(
str(alt["availability"])
for alt in alternatives
if alt.get("availability")
)
availability_summaries = _dedupe_strings(
" && ".join(str(condition) for condition in alt.get("availability_conditions", []) if condition)
for alt in alternatives
if alt.get("availability_conditions")
)
command["availability"] = availability[0] if len(availability) == 1 else availability
command["availability_conditions"] = availability_conditions
command["availability_summary"] = (
availability_summaries[0]
if len(availability_summaries) == 1
else " OR ".join(availability_summaries)
)
command["semantic_notes"] = _command_semantic_notes(int(command["command_value"]))
ranges_for_command = [
(alt["handler_start"], alt["handler_end"])
@@ -657,26 +785,47 @@ def _command_name_candidate(value: int) -> str:
def _command_summary(value: int) -> str:
return {
0x00: "candidate write of RX[3:4] into primary/current tables, followed by a response",
0x01: "candidate read from the primary table, followed by a response carrying the value",
0x01: "initial/idle-path primary table read only, followed by an odd response staging sequence",
0x02: "candidate clear/abort path with no immediate response builder",
0x04: "candidate write/update path that stores a value without an immediate serial response",
0x05: "candidate pending/event acknowledgement path",
0x05: "continuation-only conditional acknowledgement/session clear path",
0x06: "candidate secondary-table value write path",
0x07: "candidate retransmit/NAK-style path; error handling also builds command 0x07 responses",
0x07: "candidate retransmit path; retry/error handling also builds a command 0x07 RX-payload echo",
}.get(value, "candidate command semantics are unknown")
def _command_semantic_notes(value: int) -> list[str]:
return {
0x01: [
"Only accepted on the initial/idle dispatcher path: valid checksum/no RX error, FAA2 == 0, and F861.bit7 == 0.",
"BCD7 stages F850=0x04, writes F851 from F861 and then overwrites F851 from F862.",
"BCD7 reads the primary table word at E000 + 2*selector; F854 receives the low byte and F853 receives the high byte.",
"F852 is not freshly written in the BCD7 handler, so do not describe the response as a fixed 04 00 QQ hi lo frame.",
],
0x05: [
"Only accepted on the continuation dispatcher path when FAA2 != 0.",
"For selector 0x0040, frame 05 00 40 00 00 1F performs no response staging.",
"The handler clears FAA3/FAA2; F9B5 advances only when FAA2.bit3 was set from a queued report.",
"If FAA2 == 0, command 5 falls through the initial dispatcher instead of doing acknowledgement work.",
],
0x07: [
"loc_BE4D is a retry/error echo path: F850=0x07 and F861-F864 are copied into F851-F854 before loc_BA26.",
"Observed frame 07 80 40 20 90 2D means RX bytes F861-F864 were 80 40 20 90; it is not a table value.",
],
}.get(value, [])
def _command_effect_summary(value: int, effects: list[JsonObject]) -> str:
if not effects:
return "No structured command effects were inferred."
return {
0x00: "Candidate acknowledged set: writes value bytes to primary/current tables, flags the index, and stages an echo-style response.",
0x01: "Candidate read: reads the primary table and stages a value response.",
0x01: "Initial/idle candidate read: reads the primary table and stages an odd value response with F852 possibly stale.",
0x02: "Candidate clear/abort: clears serial session state without an observed immediate response.",
0x04: "Candidate deferred set: writes value bytes and flags the index without an observed immediate response.",
0x05: "Candidate acknowledgement/clear: updates pending/event state without an observed immediate response.",
0x05: "Continuation-only ACK/session clear: clears FAA3/FAA2 and only advances F9B5 when queued-report FAA2.bit3 was set; selector 0x0040 has no response.",
0x06: "Candidate secondary set: writes value bytes to the secondary table and flags the index.",
0x07: "Candidate retransmit/error reply: reuses prior TX bytes or builds an explicit 0x07 retry/error response.",
0x07: "Candidate retransmit/error reply: reuses prior TX bytes or copies RX payload bytes behind an explicit 0x07 retry/error echo.",
}.get(value, "Candidate effects are inferred from handler-local writes, reads, calls, and response staging.")
@@ -740,9 +889,10 @@ def _command_effects(
{
"kind": "table_read_candidate",
"target_candidate": "primary_value_table_candidate",
"destination_candidate": "response value bytes",
"destination_candidate": "response value bytes F854/F853, with F852 not freshly written by BCD7",
"table_base": 0xE000,
"table_base_hex": _h16(0xE000),
"address_expression_candidate": "E000 + 2*selector",
"evidence_addresses": _table_access_addresses(table_accesses, 0x2000, "read"),
}
)
@@ -781,9 +931,15 @@ def _command_effects(
elif value == 0x05:
add(
{
"kind": "pending_acknowledgement_candidate",
"kind": "conditional_ack_session_clear_candidate",
"target_candidate": "selected event/pending state",
"operation_candidate": "clear selected pending flags and then clear serial session state",
"operation_candidate": (
"when FAA2 != 0, clear FAA3/FAA2; advance F9B5 only if FAA2.bit3 was "
"set from queued-report state; selector 0x0040 stages no response"
),
"selector_without_response_hex": _h16(0x0040),
"requires": ["FAA2 != 0"],
"fallthrough_when": "FAA2 == 0",
"evidence_addresses": _dedupe_ints(
_state_access_addresses(state_accesses, 0xFAA2)
+ _state_access_addresses(state_accesses, 0xFAA3)
@@ -822,6 +978,16 @@ def _command_effects(
"evidence_addresses": _addresses_in_ranges(ordered, ranges, 0xBE05, 0xBE22),
}
)
add(
{
"kind": "retry_error_echo_candidate",
"source_candidate": "RX payload bytes F861-F864",
"destination_candidate": "F850=0x07, F851-F854=F861-F864 before loc_BA26",
"observed_frame_caveat": "07 80 40 20 90 2D echoes RX payload 80 40 20 90; it is not a table value",
"response_candidates": [item for item in response_ids if item == "response_at_BE6A"],
"evidence_addresses": _addresses_in_ranges(ordered, ranges, 0xBE4D, 0xBE6A),
}
)
if response_ids:
add(
@@ -852,6 +1018,10 @@ def _command_effect_aliases(commands: list[JsonObject]) -> list[JsonObject]:
"command_value_hex": command.get("command_value_hex"),
"name_candidate": command.get("name_candidate"),
"summary": command.get("effect_summary", command.get("summary")),
"availability": command.get("availability"),
"availability_conditions": command.get("availability_conditions", []),
"availability_summary": command.get("availability_summary"),
"semantic_notes": command.get("semantic_notes", []),
"effects": command.get("effects", []),
"response_candidates": command.get("response_candidates", []),
"evidence_addresses": command.get("evidence_addresses", []),
@@ -938,6 +1108,7 @@ def _response_candidates(ordered: list[JsonObject]) -> list[JsonObject]:
}
response["schema"] = _response_schema(response)
response["byte_schema"] = response["schema"]["bytes"]
response["semantic_notes"] = response["schema"].get("semantic_notes", [])
responses.append(response)
return responses
@@ -1004,6 +1175,7 @@ def _response_schema(response: Mapping[str, Any]) -> JsonObject:
for addr in item.get("evidence_addresses", [])
if isinstance(addr, int)
)
semantic_notes = _response_schema_semantic_notes(response, bytes_out)
return {
"kind": "response_schema_candidate",
"response_id": response.get("id"),
@@ -1014,6 +1186,7 @@ def _response_schema(response: Mapping[str, Any]) -> JsonObject:
"buffer_end": TX_STAGING_END,
"buffer_end_hex": _h16(TX_STAGING_END),
"bytes": bytes_out,
"semantic_notes": semantic_notes,
"evidence_addresses": evidence_addresses,
"evidence_addresses_hex": _hlist(evidence_addresses),
"confidence": "candidate-medium" if evidence_addresses else "candidate-low",
@@ -1024,6 +1197,30 @@ def _response_schema(response: Mapping[str, Any]) -> JsonObject:
}
def _response_schema_semantic_notes(
response: Mapping[str, Any],
bytes_out: list[JsonObject],
) -> list[str]:
call_address = response.get("call_address")
if call_address == 0xBCFA:
for item in bytes_out:
if item.get("offset") == 2 and item.get("source_kind") == "unknown":
item["source_kind"] = "stale_or_unchanged"
item["source_expression"] = "stale/unchanged"
item["caveat"] = "BCD7 does not freshly write F852 before loc_BA26."
return [
"Command 1 BCD7 staging is odd: F850=0x04; F851 is written from F861 then overwritten by F862.",
"The primary table word is read from E000 + 2*selector; F854/F853 receive low/high value bytes.",
"F852 may be stale or unchanged in this handler; avoid a fixed 04 00 QQ hi lo response shape.",
]
if call_address == 0xBE6A:
return [
"loc_BE4D retry/error echo stages F850=0x07 and copies F861-F864 into F851-F854 before loc_BA26.",
"Observed 07 80 40 20 90 2D echoes RX payload bytes 80 40 20 90; it is not a table-derived value.",
]
return []
def _response_schemas(responses: list[JsonObject]) -> list[JsonObject]:
return [
response["schema"]
@@ -1203,6 +1400,7 @@ def _response_builder_aliases(responses: list[JsonObject]) -> list[JsonObject]:
"call_address": response.get("call_address"),
"call_address_hex": response.get("call_address_hex"),
"writes": writes,
"semantic_notes": response.get("semantic_notes", []),
"evidence_addresses": response.get("evidence_addresses", []),
"evidence_addresses_hex": response.get("evidence_addresses_hex", []),
"confidence": response.get("confidence", "medium"),
@@ -1571,6 +1769,7 @@ def _retry_error_model(ordered: list[JsonObject], responses: list[JsonObject]) -
retry_evidence = _dedupe_ints(
[int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, 0xFAA2, 0xFAA6)]
+ [int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, TX_STAGING_START, TX_STAGING_END)]
+ [int(ins["address"]) for ins in retry_path if _has_ref_in_range(ins, RX_FRAME_START + 1, RX_FRAME_START + 4)]
+ [int(ins["address"]) for ins in retry_path if _is_send_builder_call(ins)]
+ _response_evidence_addresses(responses, checksum_error_response)
)
@@ -1606,8 +1805,16 @@ def _retry_error_model(ordered: list[JsonObject], responses: list[JsonObject]) -
"response_candidates": checksum_error_response,
"summary": (
"Candidate retry path clears/consults serial flags, increments FAA6, compares it "
"with 2, and when still below the apparent limit stages a command 0x07 response."
"with 2, and when still below the apparent limit enters loc_BE4D to stage a "
"command 0x07 retry/error echo of RX payload bytes F861-F864."
),
"echo_response_candidate": {
"entry_label": "loc_BE4D",
"entry_address": 0xBE4D,
"entry_address_hex": _h16(0xBE4D),
"staging_candidate": "F850=0x07; F851-F854=F861-F864",
"observed_frame_caveat": "07 80 40 20 90 2D echoes RX payload 80 40 20 90 and is not a table value.",
},
"evidence_addresses": retry_evidence,
"evidence_addresses_hex": _hlist(retry_evidence),
"confidence": "candidate-medium" if retry_counter_evidence else "candidate-low",
@@ -1646,7 +1853,7 @@ def _gate_queue_model(ordered: list[JsonObject], commands: list[JsonObject]) ->
command_ack_values = [
int(command["command_value"])
for command in commands
if command.get("command_value") in {0x05, 0x06}
if command.get("command_value") == 0x05
]
if not evidence and not command_ack_values:
return None
@@ -1774,13 +1981,18 @@ def _gate_queue_model(ordered: list[JsonObject], commands: list[JsonObject]) ->
},
{
"name": "host_ack_can_advance_queue",
"summary": "Commands 0x05/0x06 are modeled as acknowledgement paths that can clear pending state or advance F9B5.",
"summary": (
"Command 0x05 is a continuation-only ACK/session clear path: it clears "
"FAA3/FAA2 and advances F9B5 only when queued-report FAA2.bit3 was set. "
"Selector 0x0040 has no response; if FAA2 == 0 the command falls through "
"instead of doing ACK work."
),
"command_values_hex": [_h16(value, width=2) for value in command_ack_values],
"state_addresses_hex": [_h16(0xF9B5)],
"state_addresses_hex": [_h16(0xFAA2), _h16(0xFAA3), _h16(0xF9B5)],
"evidence_addresses": _dedupe_ints(
addr
for command in commands
if command.get("command_value") in {0x05, 0x06}
if command.get("command_value") == 0x05
for addr in command.get("evidence_addresses", [])
if isinstance(addr, int)
),
@@ -2283,9 +2495,26 @@ def _response_window(ordered: list[JsonObject], call_index: int) -> list[JsonObj
for index in range(call_index - 1, max(-1, call_index - 48), -1):
candidate = ordered[index]
mnemonic = str(candidate.get("mnemonic", "")).upper()
if mnemonic in {"RTS", "RTE"}:
root = _mnemonic_root(mnemonic)
if root in {"RTS", "RTE"}:
break
if candidate.get("kind") == "branch" and mnemonic != "BSR":
if root in {
"BRA",
"BEQ",
"BNE",
"BCS",
"BCC",
"BHI",
"BLS",
"BGE",
"BLT",
"BGT",
"BLE",
"BMI",
"BPL",
"BVS",
"BVC",
}:
break
start = index
return ordered[start:call_index]

View File

@@ -185,6 +185,7 @@ def _summary(
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} table_response_rows={len(table_rows)}")
logger.emit(f"trailing_unframed_bytes={len(detector.buffer)} resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
for selector, value in table_rows:
logger.emit(f"table selector=0x{selector:03X} value=0x{value:04X}")

View File

@@ -0,0 +1,48 @@
{
"name": "ack-race-000-001",
"notes": [
"Focused command-1 pair that has produced the 07 80 40 20 90 2D retry frame on the bench.",
"Uses immediate reactive ACK with a 2 ms poll interval to test whether command 5 can beat the second retry."
],
"steps": [
{
"action": "power_cycle",
"off_seconds": 1.5
},
{
"action": "wait_ready",
"timeout": 10.0,
"heartbeats": 2,
"require": true
},
{
"action": "drain",
"seconds": 0.25
},
{
"action": "table_sweep",
"selectors": [
"0x000",
"0x001"
],
"gap": 1.0,
"ack_on": {
"frames": [
"07 80 40 20 90 2D"
],
"ack_frame": "05 00 40 00 00 1F",
"ack_guard": 0.0,
"poll_interval": 0.002,
"post_ack_read": 1.0,
"once_per_selector": true,
"max_acks": 4,
"max_target_hits": 8,
"abort_on_limit": true
}
},
{
"action": "listen",
"seconds": 2.0
}
]
}

View File

@@ -0,0 +1,41 @@
{
"name": "early-ack-000-001",
"notes": [
"Manual early-ACK variant: send the command-1 pair and immediately send command-5 ACK instead of waiting for the retry frame.",
"This tests whether the ACK has to arrive before the ROM emits the second retry."
],
"steps": [
{
"action": "power_cycle",
"off_seconds": 1.5
},
{
"action": "wait_ready",
"timeout": 10.0,
"heartbeats": 2,
"require": true
},
{
"action": "drain",
"seconds": 0.25
},
{
"action": "send",
"label": "read_0x000",
"frame": "01 00 00 00 00 5B",
"listen": 0.75
},
{
"action": "send",
"label": "read_0x001",
"frame": "01 00 01 00 00 5A",
"listen": 0.0
},
{
"action": "send",
"label": "early_ack",
"frame": "05 00 40 00 00 1F",
"listen": 2.0
}
]
}

View File

@@ -0,0 +1,46 @@
{
"name": "table-sweep-ack-000-07f",
"notes": [
"Read-only command-1 sweep of live E000 table selectors 0x000-0x07F.",
"If the visible retry frame 07 80 40 20 90 2D appears, send command-5 ACK 05 00 40 00 00 1F."
],
"steps": [
{
"action": "power_cycle",
"off_seconds": 1.5
},
{
"action": "wait_ready",
"timeout": 10.0,
"heartbeats": 2,
"require": true
},
{
"action": "drain",
"seconds": 0.25
},
{
"action": "table_sweep",
"start": "0x000",
"count": "0x080",
"gap": 0.75,
"ack_on": {
"frames": [
"07 80 40 20 90 2D"
],
"ack_frame": "05 00 40 00 00 1F",
"ack_guard": 0.0,
"poll_interval": 0.002,
"post_ack_read": 0.35,
"once_per_selector": true,
"max_acks": 8,
"max_target_hits": 32,
"abort_on_limit": true
}
},
{
"action": "listen",
"seconds": 2.0
}
]
}

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
"""Run JSON-described serial bench scenarios."""
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from h8536.serial_scenario import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -43,9 +43,34 @@ class BenchConnectLcdTest(unittest.TestCase):
],
)
def test_detector_resyncs_to_checksum_valid_frame(self):
detector = FrameDetector()
detected = detector.feed(bytes.fromhex("FF0000000080DA"))
self.assertEqual([(format_frame(frame), label) for frame, label in detected], [
("00 00 00 00 80 DA", "heartbeat")
])
self.assertEqual(detector.dropped_bytes, 1)
self.assertEqual(detector.resync_events, 1)
def test_detector_prefers_labeled_heartbeat_over_shifted_valid_window(self):
detector = FrameDetector()
heartbeat = bytes.fromhex("0000000080DA")
detected = detector.feed(heartbeat[1:] + heartbeat)
self.assertEqual([(format_frame(frame), label) for frame, label in detected], [
("00 00 00 00 80 DA", "heartbeat")
])
self.assertEqual(detector.dropped_bytes, 5)
def test_label_frame_marks_unlabeled_checksum_ok_frame(self):
self.assertEqual(label_frame(bytes.fromhex("01000000005B")), "checksum_ok_unlabeled")
def test_label_frame_marks_table_readback_candidate(self):
self.assertEqual(label_frame(bytes.fromhex("04001280804C")), "table_readback_candidate")
def test_label_frame_marks_real_bench_c0_6020_response(self):
self.assertEqual(label_frame(bytes.fromhex("0780C060205D")), "visible_C0_6020_family_candidate")

View File

@@ -196,6 +196,60 @@ class SerialPseudocodeTest(unittest.TestCase):
self.assertIn("case 0x01u:", text)
self.assertIn("candidate_read_value(logical_index, value);", text)
def test_protocol_pseudocode_surfaces_dispatcher_split(self):
payload = candidate_payload()
payload["instructions"] = [
{"address": 0xBC08, "mnemonic": "MOV:G.B", "operands": "@H'F860, R0", "references": [{"address": 0xF860}], "targets": []},
{"address": 0xBC0C, "mnemonic": "AND.B", "operands": "#H'07, R0", "references": [], "targets": []},
{"address": 0xBC0F, "mnemonic": "TST.B", "operands": "@H'FAA2", "references": [{"address": 0xFAA2}], "targets": []},
{"address": 0xBC13, "mnemonic": "BNE", "operands": "loc_BC3A", "references": [], "targets": [0xBC3A]},
{"address": 0xBC19, "mnemonic": "BTST.B", "operands": "#7, @H'F861", "references": [{"address": 0xF861}], "targets": []},
{"address": 0xBC20, "mnemonic": "CMP:E", "operands": "#H'00, R0", "references": [], "targets": []},
{"address": 0xBC22, "mnemonic": "BEQ", "operands": "loc_BC69", "references": [], "targets": [0xBC69]},
{"address": 0xBC24, "mnemonic": "CMP:E", "operands": "#H'01, R0", "references": [], "targets": []},
{"address": 0xBC26, "mnemonic": "BEQ", "operands": "loc_BCD7", "references": [], "targets": [0xBCD7]},
{"address": 0xBC29, "mnemonic": "CMP:E", "operands": "#H'02, R0", "references": [], "targets": []},
{"address": 0xBC2B, "mnemonic": "BEQ", "operands": "loc_BD04", "references": [], "targets": [0xBD04]},
{"address": 0xBC2E, "mnemonic": "CMP:E", "operands": "#H'07, R0", "references": [], "targets": []},
{"address": 0xBC30, "mnemonic": "BEQ", "operands": "loc_BE05", "references": [], "targets": [0xBE05]},
{"address": 0xBC45, "mnemonic": "CMP:E", "operands": "#H'04, R0", "references": [], "targets": []},
{"address": 0xBC47, "mnemonic": "BEQ", "operands": "loc_BD0E", "references": [], "targets": [0xBD0E]},
{"address": 0xBC4A, "mnemonic": "CMP:E", "operands": "#H'05, R0", "references": [], "targets": []},
{"address": 0xBC4C, "mnemonic": "BEQ", "operands": "loc_BD80", "references": [], "targets": [0xBD80]},
{"address": 0xBC4F, "mnemonic": "CMP:E", "operands": "#H'06, R0", "references": [], "targets": []},
{"address": 0xBC51, "mnemonic": "BEQ", "operands": "loc_BDDB", "references": [], "targets": [0xBDDB]},
{"address": 0xBC54, "mnemonic": "CMP:E", "operands": "#H'07, R0", "references": [], "targets": []},
{"address": 0xBC56, "mnemonic": "BEQ", "operands": "loc_BE05", "references": [], "targets": [0xBE05]},
]
text = generate_serial_pseudocode(payload)
self.assertIn("dispatcher split: FAA2 == 0 accepts initial/idle commands H'00, H'01, H'02, H'07; FAA2 != 0 accepts continuation commands H'04, H'05, H'06, H'07", text)
self.assertIn("bool session_active = MEM8[0xFAA2u] != 0u;", text)
self.assertIn("Initial/idle dispatcher", text)
self.assertIn("Continuation dispatcher", text)
self.assertIn("availability: valid checksum/no RX physical error && FAA2 == 0 && F861.bit7 == 0", text)
def test_protocol_pseudocode_surfaces_retry_echo_schema(self):
payload = candidate_payload()
payload["instructions"] = [
{"address": 0xBE4B, "mnemonic": "BRA", "operands": "loc_BE6D", "references": [], "targets": [0xBE6D]},
{"address": 0xBE4D, "mnemonic": "MOV:G.B", "operands": "#H'07, @H'F850", "references": [{"address": 0xF850}], "targets": []},
{"address": 0xBE52, "mnemonic": "MOV:G.B", "operands": "@H'F861, R0", "references": [{"address": 0xF861}], "targets": []},
{"address": 0xBE56, "mnemonic": "MOV:G.B", "operands": "R0, @H'F851", "references": [{"address": 0xF851}], "targets": []},
{"address": 0xBE5A, "mnemonic": "MOV:G.W", "operands": "@H'F862, R0", "references": [{"address": 0xF862}], "targets": []},
{"address": 0xBE5E, "mnemonic": "MOV:G.W", "operands": "R0, @H'F852", "references": [{"address": 0xF852}], "targets": []},
{"address": 0xBE62, "mnemonic": "MOV:G.B", "operands": "@H'F864, R0", "references": [{"address": 0xF864}], "targets": []},
{"address": 0xBE66, "mnemonic": "MOV:G.B", "operands": "R0, @H'F854", "references": [{"address": 0xF854}], "targets": []},
{"address": 0xBE6A, "mnemonic": "BSR", "operands": "loc_BA26", "references": [], "targets": [0xBA26]},
]
text = generate_serial_pseudocode(payload)
self.assertIn("response_at_BE6A: byte0=0x07; byte1=rx[1]; byte2=rx[2]; byte3=rx[3]; byte4=rx[4]", text)
self.assertIn("Observed 07 80 40 20 90 2D echoes RX payload bytes 80 40 20 90", text)
self.assertIn("not a table-derived value", text)
def test_surfaces_refined_semantic_candidates(self):
analysis = {
"protocol_semantics": [

View File

@@ -0,0 +1,50 @@
import io
import json
import tempfile
import unittest
from pathlib import Path
from h8536.serial_scenario import DEFAULT_ACK_FRAME, DEFAULT_ACK_TARGET, main
class SerialScenarioTest(unittest.TestCase):
def test_dry_run_summarizes_ack_aware_table_sweep(self):
scenario = {
"name": "unit-sweep",
"steps": [
{
"action": "table_sweep",
"start": "0x000",
"count": "0x002",
"gap": 0.75,
"ack_on": {
"frames": ["07 80 40 20 90 2D"],
"ack_frame": "05 00 40 00 00 1F",
"max_acks": 8,
"max_target_hits": 32,
},
}
],
}
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "scenario.json"
path.write_text(json.dumps(scenario), encoding="utf-8")
stdout = io.StringIO()
exit_code = main([str(path), "--dry-run"], stdout=stdout)
output = stdout.getvalue()
self.assertEqual(exit_code, 0)
self.assertIn("scenario=unit-sweep", output)
self.assertIn("selectors=2 first=0x000 last=0x001", output)
self.assertIn("ack_target=07 80 40 20 90 2D", output)
self.assertIn("ack_frame=05 00 40 00 00 1F", output)
self.assertIn("max_acks=8 max_target_hits=32", output)
def test_default_ack_frames_match_current_rom_probe_candidate(self):
self.assertEqual(DEFAULT_ACK_TARGET, bytes.fromhex("07804020902D"))
self.assertEqual(DEFAULT_ACK_FRAME, bytes.fromhex("05004000001F"))
if __name__ == "__main__":
unittest.main()

View File

@@ -288,6 +288,8 @@ class SerialSemanticsTest(unittest.TestCase):
command_1_text = semantic_text(command_1)
self.assertIn("read", command_1_text)
self.assertIn("response", command_1_text)
self.assertIn("initial/idle", command_1_text)
self.assertIn("f852", command_1_text)
command_6 = command_item(effects, 0x06)
self.assertIsNotNone(command_6)
@@ -295,6 +297,13 @@ class SerialSemanticsTest(unittest.TestCase):
self.assertIn("write", command_6_text)
self.assertIn("secondary_value_table", command_6_text)
command_5 = command_item(effects, 0x05)
self.assertIsNotNone(command_5)
command_5_text = semantic_text(command_5)
self.assertIn("faa2 != 0", command_5_text)
self.assertIn("0040", command_5_text)
self.assertIn("no response", command_5_text)
def test_planned_response_schema_tracks_immediates_and_rx_copies(self):
semantics = only_semantics(self, planned_semantics_payload())
@@ -391,10 +400,105 @@ class SerialSemanticsTest(unittest.TestCase):
self.assertIn("be9e", gate_text)
self.assertIn("bed5", gate_text)
self.assertIn("f9c5", gate_text)
self.assertIn("commands 0x05", gate_text)
self.assertIn("0x06", gate_text)
self.assertIn("command 0x05", gate_text)
self.assertIn("selector 0x0040 has no response", gate_text)
self.assertIn("faa2 == 0", gate_text)
self.assertIn("not rom constants", gate_text)
def test_actual_dispatch_split_marks_initial_and_continuation_commands(self):
semantics = only_semantics(
self,
base_payload(
[
instruction(0xBC08, "MOV:G.B", "@H'F860, R0", [0xF860]),
instruction(0xBC0C, "AND.B", "#H'07, R0"),
instruction(0xBC0F, "TST.B", "@H'FAA2", [0xFAA2]),
instruction(0xBC13, "BNE", "loc_BC3A", targets=[0xBC3A]),
instruction(0xBC19, "BTST.B", "#7, @H'F861", [0xF861]),
instruction(0xBC20, "CMP:E", "#H'00, R0"),
instruction(0xBC22, "BEQ", "loc_BC69", targets=[0xBC69]),
instruction(0xBC24, "CMP:E", "#H'01, R0"),
instruction(0xBC26, "BEQ", "loc_BCD7", targets=[0xBCD7]),
instruction(0xBC29, "CMP:E", "#H'02, R0"),
instruction(0xBC2B, "BEQ", "loc_BD04", targets=[0xBD04]),
instruction(0xBC2E, "CMP:E", "#H'07, R0"),
instruction(0xBC30, "BEQ", "loc_BE05", targets=[0xBE05]),
instruction(0xBC45, "CMP:E", "#H'04, R0"),
instruction(0xBC47, "BEQ", "loc_BD0E", targets=[0xBD0E]),
instruction(0xBC4A, "CMP:E", "#H'05, R0"),
instruction(0xBC4C, "BEQ", "loc_BD80", targets=[0xBD80]),
instruction(0xBC4F, "CMP:E", "#H'06, R0"),
instruction(0xBC51, "BEQ", "loc_BDDB", targets=[0xBDDB]),
instruction(0xBC54, "CMP:E", "#H'07, R0"),
instruction(0xBC56, "BEQ", "loc_BE05", targets=[0xBE05]),
]
),
)
split = semantics["command_dispatch"]["state_split"]
self.assertEqual(split["initial_idle_commands"], [0, 1, 2, 7])
self.assertEqual(split["continuation_commands"], [4, 5, 6, 7])
command_1 = command_item(semantics["commands"], 0x01)
self.assertIn("FAA2 == 0", command_1["availability_conditions"])
self.assertIn("F861.bit7 == 0", command_1["availability_conditions"])
command_5 = command_item(semantics["commands"], 0x05)
self.assertIn("FAA2 != 0", command_5["availability_conditions"])
def test_actual_bcd7_command1_response_marks_f852_stale(self):
semantics = only_semantics(
self,
base_payload(
[
instruction(0xBCD4, "BRA", "loc_BE6F", targets=[0xBE6F]),
instruction(0xBCD7, "MOV:G.B", "#H'04, @H'F850", [0xF850]),
instruction(0xBCDC, "MOV:G.B", "@H'F861, R0", [0xF861]),
instruction(0xBCE0, "MOV:G.B", "R0, @H'F851", [0xF851]),
instruction(0xBCE4, "MOV:G.B", "@H'F862, R0", [0xF862]),
instruction(0xBCE8, "MOV:G.B", "R0, @H'F851", [0xF851]),
instruction(0xBCEC, "MOV:G.W", "@(-H'2000,R4), R0"),
instruction(0xBCF0, "MOV:G.B", "R0, @H'F854", [0xF854]),
instruction(0xBCF4, "SWAP.B", "R0"),
instruction(0xBCF6, "MOV:G.B", "R0, @H'F853", [0xF853]),
instruction(0xBCFA, "BSR", "loc_BA26", targets=[0xBA26]),
]
),
)
schema = semantics["response_schema"][0]
byte_sources = {item["offset"]: item["source_expression"] for item in schema["bytes"]}
self.assertEqual(byte_sources[0], "0x04")
self.assertEqual(byte_sources[1], "rx[2]")
self.assertEqual(byte_sources[2], "stale/unchanged")
self.assertEqual(byte_sources[3], "primary_value_table_candidate")
self.assertEqual(byte_sources[4], "primary_value_table_candidate")
self.assertIn("avoid a fixed 04 00 qq hi lo", semantic_text(schema))
def test_actual_be4d_retry_error_echo_copies_rx_payload(self):
semantics = only_semantics(
self,
base_payload(
[
instruction(0xBE4B, "BRA", "loc_BE6D", targets=[0xBE6D]),
instruction(0xBE4D, "MOV:G.B", "#H'07, @H'F850", [0xF850]),
instruction(0xBE52, "MOV:G.B", "@H'F861, R0", [0xF861]),
instruction(0xBE56, "MOV:G.B", "R0, @H'F851", [0xF851]),
instruction(0xBE5A, "MOV:G.W", "@H'F862, R0", [0xF862]),
instruction(0xBE5E, "MOV:G.W", "R0, @H'F852", [0xF852]),
instruction(0xBE62, "MOV:G.B", "@H'F864, R0", [0xF864]),
instruction(0xBE66, "MOV:G.B", "R0, @H'F854", [0xF854]),
instruction(0xBE6A, "BSR", "loc_BA26", targets=[0xBA26]),
]
),
)
schema = semantics["response_schema"][0]
byte_sources = {item["offset"]: item["source_expression"] for item in schema["bytes"]}
self.assertEqual(byte_sources, {0: "0x07", 1: "rx[1]", 2: "rx[2]", 3: "rx[3]", 4: "rx[4]"})
self.assertIn("80 40 20 90", semantic_text(schema))
self.assertIn("not a table", semantic_text(schema))
def test_timer_interrupt_model_surfaces_frt2_idle_heartbeat_counter(self):
semantics = only_semantics(
self,