1
0

command advance sweep

This commit is contained in:
Aiden
2026-05-26 15:21:52 +10:00
parent 74a2e2fd2c
commit a48fa0ed18
14 changed files with 821 additions and 78 deletions

View File

@@ -49,6 +49,7 @@ To start the current emulator harness:
.\.venv\Scripts\python.exe h8536_emulator_rx_divergence.py --default-frames --uart-timing --wait-heartbeats 2 --summary-only .\.venv\Scripts\python.exe h8536_emulator_rx_divergence.py --default-frames --uart-timing --wait-heartbeats 2 --summary-only
.\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen .\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen
.\.venv\Scripts\python.exe scripts\connect_ok_matrix.py --suite minimal --prompt-observation --result-json captures\connect-ok-minimal-result.json .\.venv\Scripts\python.exe scripts\connect_ok_matrix.py --suite minimal --prompt-observation --result-json captures\connect-ok-minimal-result.json
.\.venv\Scripts\python.exe scripts\connect_ok_advance_sweep.py --suite core --prompt-observation --result-json captures\connect-ok-advance-core-result.json
.\.venv\Scripts\python.exe scripts\serial_ack_probe.py --ack-frame "05 00 40 00 00 1F" .\.venv\Scripts\python.exe scripts\serial_ack_probe.py --ack-frame "05 00 40 00 00 1F"
.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json .\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json
.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json .\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json
@@ -128,6 +129,7 @@ Minimal smoke-test shape:
- Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, can optionally schedule bench-style UART byte arrivals at real spacing, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects. - Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, can optionally schedule bench-style UART byte arrivals at real spacing, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects.
- Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs. - Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs.
- Includes a CONNECT: OK bench matrix runner that power-cycles between cases and tests the known sequence, single frames, primer pairs, order permutations, inter-frame gaps, repeats, and hold time to separate magic-frame, primer, cadence, and latch behavior. - Includes a CONNECT: OK bench matrix runner that power-cycles between cases and tests the known sequence, single frames, primer pairs, order permutations, inter-frame gaps, repeats, and hold time to separate magic-frame, primer, cadence, and latch behavior.
- Includes a CONNECT: OK advance sweep runner that recovers to the known OK cadence, waits for an active RCP report frame, then sends one candidate continuation/ACK frame so ACK-only, selector-zero-refresh, and command-5 special selectors can be compared from the same baseline.
- Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating. - Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating.
- Includes a checksum-resynchronizing bench receiver that scans RX byte streams for valid six-byte frames, avoids common shifted-heartbeat false locks, and can fall back to the old fixed six-byte slicer with `--sync fixed`. - Includes a checksum-resynchronizing bench receiver that scans RX byte streams for valid six-byte frames, avoids common shifted-heartbeat false locks, and can fall back to the old fixed six-byte slicer with `--sync fixed`.
- Includes a JSON scenario bench runner for repeatable multi-step serial tests, including low-latency ACK-aware command-1 probes that can send the current command-5 ACK candidate immediately after the retry frame appears, with explicit max-ACK/max-target guardrails. - Includes a JSON scenario bench runner for repeatable multi-step serial tests, including low-latency ACK-aware command-1 probes that can send the current command-5 ACK candidate immediately after the retry frame appears, with explicit max-ACK/max-target guardrails.
@@ -148,6 +150,7 @@ Current serial observations:
- Bench serial-format finding: real hardware talks `38400 8E1`. Earlier `8N1` captures primarily exercised SCI1 parity/error handling and retry echoes, not the normal command path. After switching bench scripts to even parity, the selector-zero CONNECT path can reach `CONNECT: OK`. - Bench serial-format finding: real hardware talks `38400 8E1`. Earlier `8N1` captures primarily exercised SCI1 parity/error handling and retry echoes, not the normal command path. After switching bench scripts to even parity, the selector-zero CONNECT path can reach `CONNECT: OK`.
- Bench CONNECT recovery finding: `CONNECT:NOT ACT` is recoverable without a power cycle. This makes it a normal no-active-session/cleared-state display rather than a terminal latch; tests can now probe from the idle NOT ACT state directly, then separately check whether OK is held or needs periodic CCU-like refresh traffic. - Bench CONNECT recovery finding: `CONNECT:NOT ACT` is recoverable without a power cycle. This makes it a normal no-active-session/cleared-state display rather than a terminal latch; tests can now probe from the idle NOT ACT state directly, then separately check whether OK is held or needs periodic CCU-like refresh traffic.
- Bench CONNECT cadence finding: the `40 -> 80 -> C0` sequence stayed at `CONNECT:NOT ACT` with 10 ms, 50 ms, and 150 ms gaps, but produced `CONNECT: OK` then returned to `CONNECT:NOT ACT` with 700 ms and 1.5 s gaps. At 700 ms, single `40`/`80`/`C0` frames did not work, but all tested two-frame pairs did. Repeated `80 -> 80` at about 700 ms also worked, so the values do not need to differ. The no-power-cycle NOT ACT recovery capture produced repeated `02 00 02 00 00 5A` OK-path responses before heartbeat traffic resumed. - Bench CONNECT cadence finding: the `40 -> 80 -> C0` sequence stayed at `CONNECT:NOT ACT` with 10 ms, 50 ms, and 150 ms gaps, but produced `CONNECT: OK` then returned to `CONNECT:NOT ACT` with 700 ms and 1.5 s gaps. At 700 ms, single `40`/`80`/`C0` frames did not work, but all tested two-frame pairs did. Repeated `80 -> 80` at about 700 ms also worked, so the values do not need to differ. The no-power-cycle NOT ACT recovery capture produced repeated `02 00 02 00 00 5A` OK-path responses before heartbeat traffic resumed.
- ROM report-source finding: the active `02/01 ...` frames exposed during CONNECT OK attempts are autonomous `F870 -> BAF2 -> BA26` report-queue transmissions, not ordinary command-1 readbacks. The ROM sets `FAA2.3/FAA3.7` after sending them, so the CCU probably needs to answer in that continuation window with command `4`, `5`, or `6` to consume the report queue and keep the session alive.
- Board/P9 finding: traced MCU pin 62 `P91` reaches X24164 pin 6 `SCL`, and MCU pin 68 `P97` reaches the shared X24164 pin 5 `SDA` node. The emulator now treats the ROM's `C121/C08B/C0DB/C10C/C142` P9 routines as an X24164-style two-wire EEPROM bus, with ROM logical addresses `0x000-0x7FF` on the `H'A0/H'A1` control-byte family and `0x800-0xFFF` on `H'E0/H'E1`. - Board/P9 finding: traced MCU pin 62 `P91` reaches X24164 pin 6 `SCL`, and MCU pin 68 `P97` reaches the shared X24164 pin 5 `SDA` node. The emulator now treats the ROM's `C121/C08B/C0DB/C10C/C142` P9 routines as an X24164-style two-wire EEPROM bus, with ROM logical addresses `0x000-0x7FF` on the `H'A0/H'A1` control-byte family and `0x800-0xFFF` on `H'E0/H'E1`.
- EEPROM role finding: `loc_40BB` checks `P7DR.7` and the `F402 == H'6B6F` signature before defaulting EEPROM/shadow tables; `loc_4103` writes ROM default words through `BFE0`, `loc_41D2` reads sixteen 8-byte records into `F7B0-F82F`, and the command-4 path at `BD2B-BD5F` can persist serial table writes when `F76E.7` is set. - EEPROM role finding: `loc_40BB` checks `P7DR.7` and the `F402 == H'6B6F` signature before defaulting EEPROM/shadow tables; `loc_4103` writes ROM default words through `BFE0`, `loc_41D2` reads sixteen 8-byte records into `F7B0-F82F`, and the command-4 path at `BD2B-BD5F` can persist serial table writes when `F76E.7` is set.
- EEPROM layout finding: `build\rom_eeprom_layout.txt` currently identifies the ROM factory table at `H'C964-H'CA63`, the F400 shadow defaults, page 0 offset `0x000-0x007` as the signature/options header (`00 00 6B 6F FE 00 00 00`), pages 1-F offset `0x00-0x07` as blank-by-default record slots, and 89 selector mappings from the `H'C564` table into F400/EEPROM offsets. `F404` defaults to `H'FE00` and is tested as option/feature bits, while `F76E` combines persistence enable, dispatch suppression, and low-nibble EEPROM page selection. - EEPROM layout finding: `build\rom_eeprom_layout.txt` currently identifies the ROM factory table at `H'C964-H'CA63`, the F400 shadow defaults, page 0 offset `0x000-0x007` as the signature/options header (`00 00 6B 6F FE 00 00 00`), pages 1-F offset `0x00-0x07` as blank-by-default record slots, and 89 selector mappings from the `H'C564` table into F400/EEPROM offsets. `F404` defaults to `H'FE00` and is tested as option/feature bits, while `F76E` combines persistence enable, dispatch suppression, and low-nibble EEPROM page selection.
@@ -286,6 +289,7 @@ python h8536_emulator_rx_divergence.py --help
- `--target-frame "00 00 00 00 80 DA"`: compare staged/emitted TX bytes against an expected six-byte frame. - `--target-frame "00 00 00 00 80 DA"`: compare staged/emitted TX bytes against an expected six-byte frame.
- `h8536_emulator_rx_probe.py "04 00 00 80 00"`: append the checksum, inject the host frame through SCI1 RX, and summarize responses. - `h8536_emulator_rx_probe.py "04 00 00 80 00"`: append the checksum, inject the host frame through SCI1 RX, and summarize responses.
- `h8536_emulator_rx_probe.py --uart-timing --uart-baud 38400 "04 00 00 80 00"`: inject all six host bytes with bench-style wire spacing of about 260 us per byte, letting RXI/TXI/timers interleave; if the ROM has not cleared `RDRF` before the next byte, the SCI model raises `ORER`. The real bench link is `8E1`. - `h8536_emulator_rx_probe.py --uart-timing --uart-baud 38400 "04 00 00 80 00"`: inject all six host bytes with bench-style wire spacing of about 260 us per byte, letting RXI/TXI/timers interleave; if the ROM has not cleared `RDRF` before the next byte, the SCI model raises `ORER`. The real bench link is `8E1`.
- `h8536_emulator_rx_probe.py --uart-timing --uart-format 8E1 --tx-wire-timing --wait-heartbeats 2 --post-frame-ms 700 "04 00 00 80 00 DE" "04 00 00 80 00 DE"`: replay the CONNECT refresh shape after heartbeat readiness and keep the emulator running for a bench-scale gap after each frame. The RAM trace now tags interesting accesses with the executing ROM PC, models SCI1 TDRE/TXI at 8E1 character time, and reports whether X24164 EEPROM bytes were written.
- `h8536_emulator_rx_probe.py --preset connect-lcd`: replay the current CONNECT LCD activation candidates. - `h8536_emulator_rx_probe.py --preset connect-lcd`: replay the current CONNECT LCD activation candidates.
- `h8536_emulator_rx_divergence.py --default-frames --uart-timing --wait-heartbeats 2 --summary-only`: run the focused RX divergence trace for the bench mismatch. It flags whether a frame reached cmd0 `BC69`, cmd1 `BCD7`, retry echo, command-7 replay, autonomous `BAF2` report output, or the TX/RX overlap-collapse path. - `h8536_emulator_rx_divergence.py --default-frames --uart-timing --wait-heartbeats 2 --summary-only`: run the focused RX divergence trace for the bench mismatch. It flags whether a frame reached cmd0 `BC69`, cmd1 `BCD7`, retry echo, command-7 replay, autonomous `BAF2` report output, or the TX/RX overlap-collapse path.
- `scripts\serial_table_dump.py --port COM5 --relay-port COM6 --start 0x000 --count 0x200 --log captures\table-read.txt`: read-only command-1 sweep of the firmware-exposed serial table state for EEPROM/shadow inference. Bench serial scripts default to `8E1` because the ROM initializes SCI1 as async 8-bit even parity, 1 stop; pass `--parity N` only when reproducing older 8N1 captures. - `scripts\serial_table_dump.py --port COM5 --relay-port COM6 --start 0x000 --count 0x200 --log captures\table-read.txt`: read-only command-1 sweep of the firmware-exposed serial table state for EEPROM/shadow inference. Bench serial scripts default to `8E1` because the ROM initializes SCI1 as async 8-bit even parity, 1 stop; pass `--parity N` only when reproducing older 8N1 captures.

View File

@@ -419,10 +419,17 @@ Current interpretation:
- `CONNECT:NOT ACT` is a normal no-active-session/cleared-state display, not a terminal latch. - `CONNECT:NOT ACT` is a normal no-active-session/cleared-state display, not a terminal latch.
- `CONNECT: OK` is table/state driven, probably selector-zero active/connect state. - `CONNECT: OK` is table/state driven, probably selector-zero active/connect state.
- `0x8080` at selector zero is the strongest known active/connect value. - `0x8080` at selector zero is the strongest known active/connect value.
- The panel likely expects the CCU to keep seeding or refreshing state after entering OK. - The panel likely expects the CCU to consume/ACK report-queue frames and keep seeding or refreshing state after entering OK.
- The working fake-CCU sequence is probably not "three frames as fast as possible"; it appears to need CCU-like cadence or a live session window, roughly on the heartbeat/report timescale. - The working fake-CCU sequence is probably not "three frames as fast as possible"; it appears to need CCU-like cadence or a live session window, roughly on the heartbeat/report timescale.
- A single selector-zero continuation-shaped frame is insufficient in the current tests; two selector-zero writes at the working cadence are enough. They do not need to carry different values, because `80 -> 80` also worked. - A single selector-zero continuation-shaped frame is insufficient in the current tests; two selector-zero writes at the working cadence are enough. They do not need to carry different values, because `80 -> 80` also worked.
ROM report-source update:
- The active `02/01 ...` frames seen during CONNECT OK attempts are best modeled as `F870 -> BAF2 -> BA26` report-queue transmissions.
- `BAF2` dequeues a report word, encodes the first three TX bytes, reads the payload from `E800 + 2*selector`, and `BA26` appends the `0x5A` XOR checksum.
- After sending a queued report, the ROM sets `FAA2.3` and `FAA3.7`; command `4`, `5`, or `6` can then consume/advance the report only while that continuation latch is live.
- This makes a reactive fake-CCU test more valuable than another blind fixed-delay matrix: recover to OK, wait for the first active report, then send one candidate continuation/ACK frame.
## Candidate CCU Seed Values ## Candidate CCU Seed Values
These are syntactically valid host frames produced from ROM table mining. Use them as candidate fake-CCU state seeds, not as final protocol truth. These are syntactically valid host frames produced from ROM table mining. Use them as candidate fake-CCU state seeds, not as final protocol truth.
@@ -513,6 +520,19 @@ Test whether OK is held:
.\.venv\Scripts\python.exe scripts\connect_ok_matrix.py --suite hold --parity E --prompt-observation --result-json captures\connect-ok-hold-result.json .\.venv\Scripts\python.exe scripts\connect_ok_matrix.py --suite hold --parity E --prompt-observation --result-json captures\connect-ok-hold-result.json
``` ```
Sweep strong continuation/ACK candidates after recovering to `CONNECT: OK`:
```powershell
.\.venv\Scripts\python.exe scripts\connect_ok_advance_sweep.py --suite core --parity E --prompt-observation --result-json captures\connect-ok-advance-core-result.json
```
Candidate suites:
- `core`: `05 00 40 00 00 1F` pure command-5 report-consume candidate, then `04 00 00 80 00 DE` selector-zero refresh/consume candidate.
- `special`: command-5 `0x006C/0x006D/0x006E`, which call the ROM's `BE70` queue helper.
- `latch`: command-5 `0x006B/0x0096/0x0097/0x00C6/0x00F8`, which can clear the `F731/F790` latch bits when that path is live.
- `all`: all of the above.
Current matrix result summary: Current matrix result summary:
```text ```text
@@ -554,6 +574,7 @@ This fits the real panel behavior:
- Whether the CCU sends a periodic refresh stream after CONNECT OK. - Whether the CCU sends a periodic refresh stream after CONNECT OK.
- Exact hold time before OK falls back to NOT ACT, if no refresh traffic follows. - Exact hold time before OK falls back to NOT ACT, if no refresh traffic follows.
- Whether command 4 CONNECT success depends on an existing continuation latch, timing, or a side effect created by earlier frames. - Whether command 4 CONNECT success depends on an existing continuation latch, timing, or a side effect created by earlier frames.
- Whether the remaining emulator/bench mismatch is an SCI timing bug or a missing ROM/peripheral behavior. The emulator now has an optional 8E1 TX wire-timing mode; with it enabled, repeated `80` no longer immediately reaches `CONNECT: OK`, which is closer to bench behavior but still needs calibration.
- How EEPROM option bits change selector behavior. - How EEPROM option bits change selector behavior.
- Whether all old visible `07...` families can be reproduced under `8E1`. - Whether all old visible `07...` families can be reproduced under `8E1`.
@@ -561,11 +582,16 @@ This fits the real panel behavior:
1. Finish the CONNECT matrix runs: 1. Finish the CONNECT matrix runs:
- rerun `hold` with 700 ms gaps to measure how long OK remains without refresh traffic. - rerun `hold` with 700 ms gaps to measure how long OK remains without refresh traffic.
2. Test whether periodic `80` refreshes hold CONNECT OK, and find the longest safe refresh interval. 2. Run the reactive advance sweep from OK and compare:
3. Dump selector table state before and after CONNECT OK. - ACK-only `05 00 40 00 00 1F`
4. Seed selectors `0x003`, `0x040`, and `0x0F6` after selector-zero OK and watch lamps/readouts. - refresh/consume `04 00 00 80 00 DE`
5. Mine selector dispatch handlers for known UI text terms: `IRIS`, `GAIN`, `SHUTTER`, `BARS`, `BLACK`, `CALL`, `AUTO`, `DIAG`. - command-5 special selectors `0x006C/0x006D/0x006E`
6. Build a fake-CCU streamer that repeatedly writes a small selector set and logs which RCP reports appear. 3. Test whether periodic report ACKs plus periodic `80` refreshes hold CONNECT OK.
4. Sweep emulator RX phase/gap with `--tx-wire-timing` and compare where `BD0E` is reached or missed.
5. Dump selector table state before and after CONNECT OK.
6. Seed selectors `0x003`, `0x040`, and `0x0F6` after selector-zero OK and watch lamps/readouts.
7. Mine selector dispatch handlers for known UI text terms: `IRIS`, `GAIN`, `SHUTTER`, `BARS`, `BLACK`, `CALL`, `AUTO`, `DIAG`.
8. Build a fake-CCU streamer that repeatedly writes a small selector set and logs which RCP reports appear.
## Source Files And Reports ## Source Files And Reports
@@ -588,5 +614,6 @@ Useful tools:
- `h8536_emulator_state_search.py` - `h8536_emulator_state_search.py`
- `scripts/bench_connect_lcd_sequence.py` - `scripts/bench_connect_lcd_sequence.py`
- `scripts/connect_ok_matrix.py` - `scripts/connect_ok_matrix.py`
- `scripts/connect_ok_advance_sweep.py`
- `scripts/serial_table_dump.py` - `scripts/serial_table_dump.py`
- `scripts/serial_scenario.py` - `scripts/serial_scenario.py`

View File

@@ -0,0 +1,442 @@
from __future__ import annotations
import argparse
import json
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, TextIO
from .bench_connect_lcd import (
BenchLogger,
FrameDetector,
add_serial_format_args,
_import_serial,
open_device_serial,
_read_for,
_relay_command,
_relay_settle,
_send_frame,
_wait_for_ready,
format_frame,
frame_checksum_ok,
label_frame,
parse_frame,
serial_format_label,
)
from .connect_ok_matrix import FRAME_80_OK
ACK_0040 = bytes.fromhex("05004000001F")
REFRESH_OK = bytes.fromhex("0400008000DE")
ACK_006B = bytes.fromhex("05006B000034")
ACK_006C = bytes.fromhex("05006C000033")
ACK_006D = bytes.fromhex("05006D000032")
ACK_006E = bytes.fromhex("05006E000031")
ACK_0096 = bytes.fromhex("050116000048")
ACK_0097 = bytes.fromhex("050117000049")
ACK_00C6 = bytes.fromhex("050146000018")
ACK_00F8 = bytes.fromhex("050178000026")
DEFAULT_BASELINE = (FRAME_80_OK, FRAME_80_OK)
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
@dataclass(frozen=True)
class AdvanceCase:
name: str
frame: bytes
note: str = ""
def default_log_path(suite: str) -> Path:
safe_suite = "".join(char if char.isalnum() or char in "-_" else "-" for char in suite)
return Path("captures") / f"connect-ok-advance-{safe_suite}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Recover the RCP to CONNECT: OK, then sweep one candidate continuation/ACK "
"frame from the active report window."
)
)
parser.add_argument("--suite", choices=("core", "special", "latch", "all"), default="core")
parser.add_argument("--case", action="append", help="run only matching case names; repeatable")
parser.add_argument("--limit", type=int, help="run only the first N selected cases")
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
add_serial_format_args(parser)
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not power-cycle before the sweep starts")
parser.add_argument(
"--power-cycle-between-cases",
action="store_true",
help="power-cycle before each candidate instead of recovering from the current state",
)
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold DUT power off")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before a case")
parser.add_argument("--require-ready", action="store_true", help="abort a case if readiness heartbeats are not observed")
parser.add_argument("--pre-case-drain", type=float, default=0.250, help="seconds to drain/log RX before baseline")
parser.add_argument(
"--baseline-frame",
action="append",
type=parse_frame,
help="override baseline with custom frame; repeatable; default is two selector-zero OK frames",
)
parser.add_argument("--baseline-gap", type=float, default=0.700, help="seconds to listen between baseline frames")
parser.add_argument(
"--target-mode",
choices=("active", "connect-ok", "non-heartbeat", "none"),
default="active",
help="which device frame opens the candidate-send window",
)
parser.add_argument("--target-timeout", type=float, default=2.5, help="seconds to wait for the target window")
parser.add_argument("--candidate-guard", type=float, default=0.020, help="seconds to wait after target before candidate")
parser.add_argument(
"--send-on-target-timeout",
action="store_true",
help="send the candidate even if no target-mode frame was observed",
)
parser.add_argument("--post-candidate-read", type=float, default=5.0, help="seconds to listen after candidate")
parser.add_argument("--candidate", action="append", type=_parse_candidate, help="custom candidate as name=frame or frame")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--prompt-observation", action="store_true", help="prompt for observed LCD/lamp state after each case")
parser.add_argument("--pause-between-cases", action="store_true", help="wait for Enter before the next case")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--result-json", type=Path, help="write machine-readable case summary")
parser.add_argument("--dry-run", action="store_true", help="print selected cases without opening serial ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
cases = select_cases(args)
baseline = tuple(args.baseline_frame or DEFAULT_BASELINE)
log_path = args.log or default_log_path(args.suite)
if args.dry_run:
_print_dry_run(args, cases, baseline, log_path, stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
results: list[dict[str, Any]] = []
try:
logger.emit("CONNECT: OK advance sweep")
logger.emit(
f"suite={args.suite} cases={len(cases)} device={args.port} {args.baud} {serial_format_label(args)} "
f"relay={args.relay_port} {args.relay_baud} sync={args.sync}"
)
logger.emit(f"log={log_path}")
_emit_baseline_plan(logger, baseline, args.baseline_gap)
with open_device_serial(serial, args) as device:
relay = None
try:
if not args.no_power_cycle or args.power_cycle_between_cases:
relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(relay, args.relay_settle, logger)
if not args.no_power_cycle and not args.power_cycle_between_cases:
_power_cycle(args, device, relay, logger)
for index, case in enumerate(cases, start=1):
if args.pause_between_cases and index > 1:
input(f"Press Enter to start case {index}/{len(cases)}: {case.name}")
result = _run_case(args, device, relay, logger, case, baseline, index, len(cases))
results.append(result)
if args.require_ready and not result["ready"]:
logger.event("ABORT readiness was required")
break
finally:
if relay is not None:
relay.close()
_emit_summary(logger, results)
if args.result_json:
_write_result_json(args.result_json, log_path, args, baseline, results)
return 0
finally:
logger.close()
def select_cases(args: argparse.Namespace) -> list[AdvanceCase]:
cases = list(args.candidate or build_cases(args.suite))
if args.case:
filters = [item.lower() for item in args.case]
cases = [case for case in cases if any(fragment in case.name.lower() for fragment in filters)]
if args.limit is not None:
cases = cases[: max(0, args.limit)]
if not cases:
raise SystemExit("no advance cases selected")
return cases
def build_cases(suite: str) -> list[AdvanceCase]:
core = [
AdvanceCase("ack-0040", ACK_0040, "pure command-5 continuation ACK candidate"),
AdvanceCase("refresh-ok", REFRESH_OK, "command-4 continuation ACK plus selector-zero 0x8080 refresh"),
]
special = [
AdvanceCase("ack-006c", ACK_006C, "command-5 special BE70 selector candidate"),
AdvanceCase("ack-006d", ACK_006D, "command-5 special BE70 selector candidate"),
AdvanceCase("ack-006e", ACK_006E, "command-5 special BE70 selector candidate"),
]
latch = [
AdvanceCase("ack-006b", ACK_006B, "command-5 latch-clear special selector candidate"),
AdvanceCase("ack-0096", ACK_0096, "command-5 F731/F790 latch-clear candidate"),
AdvanceCase("ack-0097", ACK_0097, "command-5 F731/F790 latch-clear candidate"),
AdvanceCase("ack-00c6", ACK_00C6, "command-5 F731/F790 latch-clear candidate"),
AdvanceCase("ack-00f8", ACK_00F8, "command-5 F731/F790 latch-clear candidate"),
]
suites = {
"core": core,
"special": special,
"latch": latch,
"all": core + special + latch,
}
return suites[suite]
def _run_case(
args: argparse.Namespace,
device: Any,
relay: Any | None,
logger: BenchLogger,
case: AdvanceCase,
baseline: tuple[bytes, ...],
index: int,
total: int,
) -> dict[str, Any]:
detector = FrameDetector(sync_mode=args.sync)
logger.emit()
logger.emit(f"CASE {index}/{total} {case.name}")
logger.emit(f"note={case.note or '(none)'}")
logger.emit(f"candidate={format_frame(case.frame)} checksum_ok={int(frame_checksum_ok(case.frame))}")
logger.emit(
f"target_mode={args.target_mode} target_timeout={args.target_timeout:.3f}s "
f"candidate_guard={args.candidate_guard:.3f}s"
)
if args.power_cycle_between_cases:
_power_cycle(args, device, relay, logger)
else:
device.reset_input_buffer()
logger.event("POWER_CYCLE skipped for case; recovering with baseline")
ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
observation = _prompt_observation(args, logger, case)
return _case_result(case, detector, ready=ready, target=None, candidate_sent=False, observation=observation)
if args.pre_case_drain > 0:
logger.event(f"DRAIN before baseline {args.pre_case_drain:.3f}s")
_read_for(device, detector, logger, args.pre_case_drain)
for frame_index, frame in enumerate(baseline, start=1):
_send_frame(device, frame, logger, f"{case.name}.baseline{frame_index}")
_read_for(device, detector, logger, args.baseline_gap)
target = _wait_for_target_mode(device, detector, logger, args.target_mode, args.target_timeout)
candidate_sent = target is not None or args.target_mode == "none" or args.send_on_target_timeout
if candidate_sent:
if args.candidate_guard > 0:
logger.event(f"CANDIDATE guard {args.candidate_guard:.3f}s")
_read_for(device, detector, logger, args.candidate_guard)
_send_frame(device, case.frame, logger, f"{case.name}.candidate")
else:
logger.event("CANDIDATE skipped because target window was not observed")
if args.post_candidate_read > 0:
logger.event(f"POST_CANDIDATE_READ {args.post_candidate_read:.3f}s")
_read_for(device, detector, logger, args.post_candidate_read)
observation = _prompt_observation(args, logger, case)
result = _case_result(case, detector, ready=ready, target=target, candidate_sent=candidate_sent, observation=observation)
logger.event(
f"CASE_RESULT {case.name} ready={int(ready)} target_seen={int(target is not None)} "
f"candidate_sent={int(candidate_sent)} rx_frames={result['rx_frames']} "
f"labels={json.dumps(result['labels'], sort_keys=True)}"
)
return result
def _wait_for_target_mode(
device: Any,
detector: FrameDetector,
logger: BenchLogger,
mode: str,
timeout_seconds: float,
) -> bytes | None:
if mode == "none":
logger.event("TARGET disabled")
return None
logger.event(f"WAIT target_mode={mode} timeout={timeout_seconds:.3f}s")
start_index = len(detector.frames)
deadline = time.monotonic() + max(0.0, timeout_seconds)
while time.monotonic() < deadline:
before = len(detector.frames)
_read_for(device, detector, logger, 0.050)
for frame in detector.frames[max(start_index, before) :]:
if _matches_target(frame, mode):
logger.event(f"TARGET seen {format_frame(frame)} label={label_frame(frame)}")
return frame
logger.event("TARGET_TIMEOUT")
return None
def _matches_target(frame: bytes, mode: str) -> bool:
label = label_frame(frame)
if mode == "connect-ok":
return label in {"connect_ok_path_response_candidate", "connect_c0_path_response_candidate"}
if mode == "non-heartbeat":
return frame_checksum_ok(frame) and frame != HEARTBEAT_FRAME
if mode == "active":
return frame_checksum_ok(frame) and frame != HEARTBEAT_FRAME and frame[0] in {0x01, 0x02, 0x07}
raise ValueError(f"unknown target mode {mode!r}")
def _power_cycle(args: argparse.Namespace, device: Any, relay: Any | None, logger: BenchLogger) -> None:
if relay is None:
raise SystemExit("relay was not opened")
_relay_command(relay, args.power_off_command, logger)
time.sleep(max(0.0, args.off_seconds))
device.reset_input_buffer()
_relay_command(relay, args.power_on_command, logger)
def _case_result(
case: AdvanceCase,
detector: FrameDetector,
*,
ready: bool,
target: bytes | None,
candidate_sent: bool,
observation: str,
) -> dict[str, Any]:
return {
"name": case.name,
"note": case.note,
"frame": format_frame(case.frame),
"ready": ready,
"target_seen": target is not None,
"target": format_frame(target) if target is not None else "",
"candidate_sent": candidate_sent,
"rx_frames": len(detector.frames),
"labels": dict(detector.labels),
"resync_events": detector.resync_events,
"dropped_bytes": detector.dropped_bytes,
"trailing_unframed_bytes": len(detector.buffer),
"observation": observation,
}
def _prompt_observation(args: argparse.Namespace, logger: BenchLogger, case: AdvanceCase) -> str:
if not args.prompt_observation:
return ""
prompt = f"{case.name}: LCD/lamps/readouts observation, or Enter to skip: "
observation = input(prompt).strip()
logger.event(f"OBSERVATION {case.name}: {observation or '(no note)'}")
return observation
def _emit_baseline_plan(logger: BenchLogger, baseline: tuple[bytes, ...], gap: float) -> None:
logger.emit(f"baseline_gap={gap:.3f}s baseline_frames={len(baseline)}")
for index, frame in enumerate(baseline, start=1):
logger.emit(f"baseline[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}")
def _emit_summary(logger: BenchLogger, results: list[dict[str, Any]]) -> None:
logger.emit()
logger.emit("Advance Sweep Summary")
logger.emit(f"cases={len(results)}")
for result in results:
labels = ", ".join(f"{key}={value}" for key, value in sorted(result["labels"].items())) or "no_rx_frames"
note = result["observation"] or "(no observation)"
logger.emit(
f"{result['name']}: ready={int(result['ready'])} target_seen={int(result['target_seen'])} "
f"candidate_sent={int(result['candidate_sent'])} rx_frames={result['rx_frames']} "
f"{labels} observation={note}"
)
def _write_result_json(
path: Path,
log_path: Path,
args: argparse.Namespace,
baseline: tuple[bytes, ...],
results: list[dict[str, Any]],
) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"suite": args.suite,
"log": str(log_path),
"serial_format": f"{args.baud} {serial_format_label(args)}",
"baseline": [format_frame(frame) for frame in baseline],
"baseline_gap": args.baseline_gap,
"target_mode": args.target_mode,
"candidate_guard": args.candidate_guard,
"cases": results,
}
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _print_dry_run(
args: argparse.Namespace,
cases: list[AdvanceCase],
baseline: tuple[bytes, ...],
log_path: Path,
stdout: TextIO,
) -> None:
print(f"suite={args.suite}", file=stdout)
print(f"cases={len(cases)}", file=stdout)
print(f"device={args.port} {args.baud} {serial_format_label(args)}", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle_start={int(not args.no_power_cycle)}", file=stdout)
print(f"power_cycle_between_cases={int(args.power_cycle_between_cases)}", file=stdout)
print(f"target_mode={args.target_mode} target_timeout={args.target_timeout:.3f}s", file=stdout)
print(f"candidate_guard={args.candidate_guard:.3f}s post_candidate_read={args.post_candidate_read:.3f}s", file=stdout)
print(f"baseline_gap={args.baseline_gap:.3f}s", file=stdout)
for index, frame in enumerate(baseline, start=1):
print(f"baseline[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout)
print(f"log={log_path}", file=stdout)
for index, case in enumerate(cases, start=1):
print(f"case[{index}]={case.name} frame={format_frame(case.frame)} checksum_ok={int(frame_checksum_ok(case.frame))}", file=stdout)
if case.note:
print(f" note={case.note}", file=stdout)
def _parse_candidate(text: str) -> AdvanceCase:
if "=" in text:
name, frame_text = text.split("=", 1)
name = name.strip()
if not name:
raise argparse.ArgumentTypeError("custom candidate name cannot be empty")
else:
name = "custom"
frame_text = text
return AdvanceCase(name=name, frame=parse_frame(frame_text), note="custom candidate")
__all__ = [
"ACK_0040",
"ACK_006B",
"ACK_006C",
"ACK_006D",
"ACK_006E",
"ACK_0096",
"ACK_0097",
"ACK_00C6",
"ACK_00F8",
"AdvanceCase",
"REFRESH_OK",
"build_arg_parser",
"build_cases",
"main",
"select_cases",
]

View File

@@ -178,6 +178,8 @@ class ReplayConfig:
clock_hz: int = 10_000_000 clock_hz: int = 10_000_000
uart_timing: bool = True uart_timing: bool = True
uart_baud: int = 38_400 uart_baud: int = 38_400
uart_format: str = "8E1"
tx_wire_timing: bool = True
p9_fast_path: bool = True p9_fast_path: bool = True
p9_fast_input: int = 0xFF p9_fast_input: int = 0xFF
p9_fast_optimistic_wrapper: bool = False p9_fast_optimistic_wrapper: bool = False
@@ -248,6 +250,9 @@ def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: Re
p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper, p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper,
p7_input=config.p7_input, p7_input=config.p7_input,
eeprom_seed=config.eeprom_seed, eeprom_seed=config.eeprom_seed,
sci1_tx_timing=UartTiming.from_format(config.uart_format, baud=config.uart_baud)
if config.tx_wire_timing
else None,
) )
context = RunContext() context = RunContext()
@@ -258,6 +263,7 @@ def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: Re
f"rx_serviceable={int(_rx_ready(emulator))} " f"rx_serviceable={int(_rx_ready(emulator))} "
f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} " f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} "
f"clock_hz={emulator.clock_hz} " f"clock_hz={emulator.clock_hz} "
f"uart_format={config.uart_format.upper()} tx_wire_timing={int(config.tx_wire_timing)} "
f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}" f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
) )
@@ -270,7 +276,7 @@ def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: Re
gap_frames = tuple(emulator.sci1.tx_frames[tx_frame_start_before_delay:]) gap_frames = tuple(emulator.sci1.tx_frames[tx_frame_start_before_delay:])
tx_frame_start = len(emulator.sci1.tx_frames) tx_frame_start = len(emulator.sci1.tx_frames)
if config.uart_timing: if config.uart_timing:
timing = UartTiming(baud=config.uart_baud) timing = UartTiming.from_format(config.uart_format, baud=config.uart_baud)
steps_during_rx, inject_reason = _inject_frame_uart_timed( steps_during_rx, inject_reason = _inject_frame_uart_timed(
emulator, emulator,
host.frame, host.frame,
@@ -367,7 +373,9 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--interval-steps", type=int, default=ReplayConfig.interval_steps) parser.add_argument("--interval-steps", type=int, default=ReplayConfig.interval_steps)
parser.add_argument("--clock-hz", type=lambda text: int(text, 0), default=ReplayConfig.clock_hz) parser.add_argument("--clock-hz", type=lambda text: int(text, 0), default=ReplayConfig.clock_hz)
parser.add_argument("--uart-baud", type=lambda text: int(text, 0), default=ReplayConfig.uart_baud, help="baud rate for bench-style UART injection") parser.add_argument("--uart-baud", type=lambda text: int(text, 0), default=ReplayConfig.uart_baud, help="baud rate for bench-style UART injection")
parser.add_argument("--uart-format", default=ReplayConfig.uart_format, help="UART character format for bench-style timing; real RCP link is 8E1")
parser.add_argument("--polite-rx", action="store_true", help="wait for each RX byte to be consumed before injecting the next byte") parser.add_argument("--polite-rx", action="store_true", help="wait for each RX byte to be consumed before injecting the next byte")
parser.add_argument("--no-tx-wire-timing", action="store_true", help="use the legacy tiny TDRE delay instead of modeled UART TX character time")
parser.add_argument("--frt1-ocia-steps", type=int, default=ReplayConfig.frt1_ocia_steps) parser.add_argument("--frt1-ocia-steps", type=int, default=ReplayConfig.frt1_ocia_steps)
parser.add_argument("--frt2-ocia-steps", type=int, default=ReplayConfig.frt2_ocia_steps) parser.add_argument("--frt2-ocia-steps", type=int, default=ReplayConfig.frt2_ocia_steps)
parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines") parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines")
@@ -395,6 +403,8 @@ def main(argv: list[str] | None = None) -> int:
clock_hz=args.clock_hz, clock_hz=args.clock_hz,
uart_timing=not args.polite_rx, uart_timing=not args.polite_rx,
uart_baud=args.uart_baud, uart_baud=args.uart_baud,
uart_format=args.uart_format,
tx_wire_timing=not args.no_tx_wire_timing,
p9_fast_path=not args.no_p9_fast_path, p9_fast_path=not args.no_p9_fast_path,
p9_fast_input=args.p9_fast_input, p9_fast_input=args.p9_fast_input,
p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper, p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper,

View File

@@ -35,6 +35,7 @@ class MemoryAccess:
value: int value: int
kind: str kind: str
region: str region: str
pc: int | None = None
class MemoryMap: class MemoryMap:
@@ -48,6 +49,7 @@ class MemoryMap:
self.external: dict[int, int] = {} self.external: dict[int, int] = {}
self.port_inputs: dict[int, int] = {P7DR: p7_input & 0xFF} self.port_inputs: dict[int, int] = {P7DR: p7_input & 0xFF}
self.access_log: list[MemoryAccess] = [] self.access_log: list[MemoryAccess] = []
self.current_pc: int | None = None
self._set_register(SCI1_SMR, self.sci1.smr) self._set_register(SCI1_SMR, self.sci1.smr)
self._set_register(SCI1_BRR, self.sci1.brr) self._set_register(SCI1_BRR, self.sci1.brr)
@@ -185,7 +187,7 @@ class MemoryMap:
return ((latch & ddr) | (pins & ~ddr)) & 0xFF return ((latch & ddr) | (pins & ~ddr)) & 0xFF
def _log(self, kind: str, address: int, size: int, value: int) -> None: def _log(self, kind: str, address: int, size: int, value: int) -> None:
self.access_log.append(MemoryAccess(address, size, value, kind, self.region(address).name)) self.access_log.append(MemoryAccess(address, size, value, kind, self.region(address).name, self.current_pc))
def describe_regions(regions: Iterable[MemoryRegion] = MEMORY_REGIONS) -> str: def describe_regions(regions: Iterable[MemoryRegion] = MEMORY_REGIONS) -> str:

View File

@@ -41,6 +41,7 @@ from .fast_paths import P9FastPath, P9FastPathConfig
from .memory import MemoryMap from .memory import MemoryMap
from .sci import SCI1 from .sci import SCI1
from .timers import FrtOciaScheduler, FrtRegisters from .timers import FrtOciaScheduler, FrtRegisters
from .uart import UartTiming
@dataclass @dataclass
@@ -94,6 +95,7 @@ class H8536Emulator:
p9_fast_default_wrapper_success: bool = False, p9_fast_default_wrapper_success: bool = False,
p7_input: int = 0xFF, p7_input: int = 0xFF,
eeprom_seed: str = "blank", eeprom_seed: str = "blank",
sci1_tx_timing: UartTiming | None = None,
) -> None: ) -> None:
if not rom_bytes: if not rom_bytes:
raise ValueError("ROM image is empty") raise ValueError("ROM image is empty")
@@ -103,6 +105,7 @@ class H8536Emulator:
self.memory = MemoryMap(rom_bytes, self.sci1, p7_input=p7_input) self.memory = MemoryMap(rom_bytes, self.sci1, p7_input=p7_input)
if eeprom_seed == "factory": if eeprom_seed == "factory":
self.memory.seed_factory_eeprom_and_shadow() self.memory.seed_factory_eeprom_and_shadow()
self.sci1.configure_tx_timing(sci1_tx_timing, clock_hz=clock_hz)
self.memory.p9_bus.default_wrapper_success = bool(p9_fast_default_wrapper_success) self.memory.p9_bus.default_wrapper_success = bool(p9_fast_default_wrapper_success)
self.p9_fast_path = p9_fast_path or P9FastPath( self.p9_fast_path = p9_fast_path or P9FastPath(
P9FastPathConfig(enabled=p9_fast_path_enabled, default_input_byte=p9_fast_default_input_byte) P9FastPathConfig(enabled=p9_fast_path_enabled, default_input_byte=p9_fast_default_input_byte)
@@ -134,6 +137,8 @@ class H8536Emulator:
def step(self) -> str: def step(self) -> str:
pc = self.cpu.pc pc = self.cpu.pc
cycles_before = self.cpu.cycles cycles_before = self.cpu.cycles
self.memory.current_pc = pc
try:
if self.p9_fast_path.try_handle(self): if self.p9_fast_path.try_handle(self):
self._tick_peripherals(self.cpu.cycles - cycles_before) self._tick_peripherals(self.cpu.cycles - cycles_before)
return f"{h16(pc)}: {'<p9-fast-path>':<17} P9 fast-path" return f"{h16(pc)}: {'<p9-fast-path>':<17} P9 fast-path"
@@ -194,6 +199,8 @@ class H8536Emulator:
self.cpu.cycles += cycle_delta self.cpu.cycles += cycle_delta
self._tick_peripherals(cycle_delta) self._tick_peripherals(cycle_delta)
return f"{h16(pc)}: {' '.join(f'{byte:02X}' for byte in raw):<17} {text}" return f"{h16(pc)}: {' '.join(f'{byte:02X}' for byte in raw):<17} {text}"
finally:
self.memory.current_pc = None
def run(self, max_steps: int, trace: bool = False, stop_on_heartbeat: bool = False) -> RunReport: def run(self, max_steps: int, trace: bool = False, stop_on_heartbeat: bool = False) -> RunReport:
trace_lines: list[str] = [] trace_lines: list[str] = []
@@ -434,7 +441,7 @@ class H8536Emulator:
return next_pc return next_pc
def _tick_peripherals(self, cycle_delta: int) -> None: def _tick_peripherals(self, cycle_delta: int) -> None:
self.sci1.tick() self.sci1.tick(cycle_delta)
self._interval_counter += 1 self._interval_counter += 1
if self.frt1_ocia_steps is None: if self.frt1_ocia_steps is None:
self.frt1_ocia.tick(self.memory, cycle_delta) self.frt1_ocia.tick(self.memory, cycle_delta)

View File

@@ -163,6 +163,8 @@ class RxDivergenceConfig:
clock_hz: int = 10_000_000 clock_hz: int = 10_000_000
uart_timing: bool = False uart_timing: bool = False
uart_baud: int = 38_400 uart_baud: int = 38_400
uart_format: str = "8E1"
tx_wire_timing: bool = False
p7_input: int = 0xFF p7_input: int = 0xFF
p9_fast_path: bool = True p9_fast_path: bool = True
p9_fast_input: int = 0xFF p9_fast_input: int = 0xFF
@@ -259,6 +261,9 @@ def run_rx_divergence(
p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper, p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper,
p7_input=config.p7_input, p7_input=config.p7_input,
eeprom_seed=config.eeprom_seed, eeprom_seed=config.eeprom_seed,
sci1_tx_timing=UartTiming.from_format(config.uart_format, baud=config.uart_baud)
if config.tx_wire_timing
else None,
) )
eeprom_load = config.eeprom_load eeprom_load = config.eeprom_load
if eeprom_load is not None and eeprom_load.is_file(): if eeprom_load is not None and eeprom_load.is_file():
@@ -271,7 +276,8 @@ def run_rx_divergence(
f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} " f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
f"rx_serviceable={int(_rx_ready(emulator))} " f"rx_serviceable={int(_rx_ready(emulator))} "
f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} " f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} "
f"clock_hz={emulator.clock_hz} p7_input={config.p7_input:#04x}" f"clock_hz={emulator.clock_hz} p7_input={config.p7_input:#04x} "
f"uart_format={config.uart_format.upper()} tx_wire_timing={int(config.tx_wire_timing)}"
) )
heartbeat_summary = None heartbeat_summary = None
@@ -303,6 +309,8 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--wait-heartbeat-steps", type=int, default=RxDivergenceConfig.wait_heartbeat_steps) parser.add_argument("--wait-heartbeat-steps", type=int, default=RxDivergenceConfig.wait_heartbeat_steps)
parser.add_argument("--uart-timing", action="store_true", help="inject bytes at UART character timing instead of waiting for RDRF clear") parser.add_argument("--uart-timing", action="store_true", help="inject bytes at UART character timing instead of waiting for RDRF clear")
parser.add_argument("--uart-baud", type=parse_int, default=RxDivergenceConfig.uart_baud) parser.add_argument("--uart-baud", type=parse_int, default=RxDivergenceConfig.uart_baud)
parser.add_argument("--uart-format", default=RxDivergenceConfig.uart_format, help="UART character format for timing; real RCP link is 8E1")
parser.add_argument("--tx-wire-timing", action="store_true", help="delay SCI1 TDRE/TXI by one modeled UART character after each TDR write")
parser.add_argument("--post-frame-steps", type=int, default=RxDivergenceConfig.post_frame_steps) parser.add_argument("--post-frame-steps", type=int, default=RxDivergenceConfig.post_frame_steps)
parser.add_argument("--per-byte-steps", type=int, default=RxDivergenceConfig.per_byte_steps) parser.add_argument("--per-byte-steps", type=int, default=RxDivergenceConfig.per_byte_steps)
parser.add_argument("--clock-hz", type=parse_int, default=RxDivergenceConfig.clock_hz) parser.add_argument("--clock-hz", type=parse_int, default=RxDivergenceConfig.clock_hz)
@@ -341,6 +349,8 @@ def main(argv: list[str] | None = None) -> int:
clock_hz=args.clock_hz, clock_hz=args.clock_hz,
uart_timing=args.uart_timing, uart_timing=args.uart_timing,
uart_baud=args.uart_baud, uart_baud=args.uart_baud,
uart_format=args.uart_format,
tx_wire_timing=args.tx_wire_timing,
p7_input=args.p7_input, p7_input=args.p7_input,
p9_fast_path=not args.no_p9_fast_path, p9_fast_path=not args.no_p9_fast_path,
p9_fast_input=args.p9_fast_input, p9_fast_input=args.p9_fast_input,
@@ -363,7 +373,7 @@ def _trace_frame(emulator: H8536Emulator, frame: bytes, config: RxDivergenceConf
steps_total = 0 steps_total = 0
if config.uart_timing: if config.uart_timing:
timing = UartTiming(baud=config.uart_baud) timing = UartTiming.from_format(config.uart_format, baud=config.uart_baud)
steps_total, stopped_reason = _inject_frame_uart_timed( steps_total, stopped_reason = _inject_frame_uart_timed(
emulator, emulator,
frame, frame,

View File

@@ -24,6 +24,7 @@ from .uart import UartTiming
CHECKSUM_SEED = 0x5A CHECKSUM_SEED = 0x5A
FRAME_LENGTH = 6 FRAME_LENGTH = 6
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
CONNECT_LCD_FRAMES = ( CONNECT_LCD_FRAMES = (
bytes.fromhex("04000040001E"), bytes.fromhex("04000040001E"),
@@ -66,6 +67,12 @@ ACCESS_RANGES = (
(0xE400, 0xE401, "secondary_table_index_0000"), (0xE400, 0xE401, "secondary_table_index_0000"),
(0xE800, 0xE801, "current_table_index_0000"), (0xE800, 0xE801, "current_table_index_0000"),
(0xEC00, 0xEC01, "flag_table_index_0000"), (0xEC00, 0xEC01, "flag_table_index_0000"),
(0xE000, 0xE3FF, "primary_table_E000"),
(0xE400, 0xE7FF, "secondary_table_E400"),
(0xE800, 0xEBFF, "current_table_E800"),
(0xEC00, 0xEFFF, "flag_table_EC00"),
(0xF400, 0xF4FF, "eeprom_shadow_F400"),
(0xF7B0, 0xF82F, "persistent_record_ram"),
(0xF200, 0xF201, "lcd_ports"), (0xF200, 0xF201, "lcd_ports"),
) )
@@ -85,12 +92,30 @@ STATE_BYTES = {
0xFAA4: "rx_error_latch", 0xFAA4: "rx_error_latch",
0xFAA5: "retry_or_gate_flags", 0xFAA5: "retry_or_gate_flags",
0xFAA6: "retry_counter", 0xFAA6: "retry_counter",
0xEC02: "EC00_flag_index_0002",
0xEC04: "EC00_flag_index_0004",
0xEC12: "EC00_flag_index_0012",
0xEC13: "EC00_flag_index_0013",
0xEC15: "EC00_flag_index_0015",
0xEC82: "EC00_flag_index_0082",
} }
STATE_WORDS = { STATE_WORDS = {
0xE000: "E000_index_0000_primary", 0xE000: "E000_index_0000_primary",
0xE004: "E000_index_0002_primary",
0xE008: "E000_index_0004_primary",
0xE024: "E000_index_0012_primary",
0xE026: "E000_index_0013_primary",
0xE02A: "E000_index_0015_primary",
0xE104: "E000_index_0082_primary",
0xE400: "E400_index_0000_secondary", 0xE400: "E400_index_0000_secondary",
0xE800: "E800_index_0000_current", 0xE800: "E800_index_0000_current",
0xE804: "E800_index_0002_current",
0xE808: "E800_index_0004_current",
0xE824: "E800_index_0012_current",
0xE826: "E800_index_0013_current",
0xE82A: "E800_index_0015_current",
0xE904: "E800_index_0082_current",
0xF860: "rx_frame_01", 0xF860: "rx_frame_01",
0xF862: "rx_frame_23", 0xF862: "rx_frame_23",
0xF864: "rx_frame_45", 0xF864: "rx_frame_45",
@@ -206,8 +231,13 @@ def run_rx_probe(
boot_steps: int = 250_000, boot_steps: int = 250_000,
per_byte_steps: int = 5_000, per_byte_steps: int = 5_000,
post_frame_steps: int = 80_000, post_frame_steps: int = 80_000,
post_frame_ms: int | None = None,
wait_heartbeats: int = 0,
wait_heartbeat_steps: int = 1_500_000,
uart_timing: bool = False, uart_timing: bool = False,
uart_baud: int = 38_400, uart_baud: int = 38_400,
uart_format: str = "8E1",
tx_wire_timing: bool = False,
interval_steps: int = 512, interval_steps: int = 512,
frt1_ocia_steps: int | None = None, frt1_ocia_steps: int | None = None,
frt2_ocia_steps: int | None = None, frt2_ocia_steps: int | None = None,
@@ -232,6 +262,7 @@ def run_rx_probe(
p9_fast_default_wrapper_success=p9_fast_optimistic_wrapper, p9_fast_default_wrapper_success=p9_fast_optimistic_wrapper,
p7_input=p7_input, p7_input=p7_input,
eeprom_seed=eeprom_seed, eeprom_seed=eeprom_seed,
sci1_tx_timing=UartTiming.from_format(uart_format, baud=uart_baud) if tx_wire_timing else None,
) )
if eeprom_image is not None: if eeprom_image is not None:
emulator.memory.load_eeprom_image(eeprom_image) emulator.memory.load_eeprom_image(eeprom_image)
@@ -242,9 +273,24 @@ def run_rx_probe(
f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} " f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} "
f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} " f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} "
f"rx_serviceable={int(_rx_ready(emulator))} " f"rx_serviceable={int(_rx_ready(emulator))} "
f"clock_hz={emulator.clock_hz} " f"clock_hz={emulator.clock_hz} uart_format={uart_format.upper()} "
f"tx_wire_timing={int(tx_wire_timing)} "
f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}" f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
) )
if wait_heartbeats:
initial_heartbeats = sum(1 for frame in emulator.sci1.tx_frames if frame == HEARTBEAT_FRAME)
target_heartbeats = initial_heartbeats + wait_heartbeats
def heartbeat_predicate(inner: H8536Emulator) -> bool:
return sum(1 for frame in inner.sci1.tx_frames if frame == HEARTBEAT_FRAME) >= target_heartbeats
wait_context = RunContext()
wait_steps, wait_reason = _run_until(emulator, wait_heartbeat_steps, heartbeat_predicate, wait_context)
final_heartbeats = sum(1 for frame in emulator.sci1.tx_frames if frame == HEARTBEAT_FRAME)
boot_summary += (
f" wait_heartbeats={wait_heartbeats} wait_reason={wait_reason} "
f"wait_steps={wait_steps} heartbeat_count={final_heartbeats}"
)
results = [ results = [
_run_frame( _run_frame(
@@ -252,8 +298,10 @@ def run_rx_probe(
frame, frame,
per_byte_steps=per_byte_steps, per_byte_steps=per_byte_steps,
post_frame_steps=post_frame_steps, post_frame_steps=post_frame_steps,
post_frame_ms=post_frame_ms,
uart_timing=uart_timing, uart_timing=uart_timing,
uart_baud=uart_baud, uart_baud=uart_baud,
uart_format=uart_format,
stop_after_tx_frame=stop_after_tx_frame, stop_after_tx_frame=stop_after_tx_frame,
) )
for frame in frames for frame in frames
@@ -269,8 +317,13 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--boot-steps", type=int, default=250_000, help="maximum steps to boot until SCI1 RXI is serviceable") parser.add_argument("--boot-steps", type=int, default=250_000, help="maximum steps to boot until SCI1 RXI is serviceable")
parser.add_argument("--per-byte-steps", type=int, default=5_000, help="polite mode byte-consume limit, or UART mode step limit between byte arrivals") parser.add_argument("--per-byte-steps", type=int, default=5_000, help="polite mode byte-consume limit, or UART mode step limit between byte arrivals")
parser.add_argument("--post-frame-steps", type=int, default=80_000, help="maximum steps after a full injected frame") parser.add_argument("--post-frame-steps", type=int, default=80_000, help="maximum steps after a full injected frame")
parser.add_argument("--uart-timing", action="store_true", help="inject frame bytes at real 8N1 UART inter-byte timing instead of waiting for RDRF consumption") parser.add_argument("--post-frame-ms", type=int, help="run this many emulated milliseconds after each injected frame")
parser.add_argument("--wait-heartbeats", type=int, default=0, help="wait for this many heartbeat frames before injecting the first host frame")
parser.add_argument("--wait-heartbeat-steps", type=int, default=1_500_000, help="maximum steps while waiting for pre-injection heartbeat frames")
parser.add_argument("--uart-timing", action="store_true", help="inject frame bytes at real UART inter-byte timing instead of waiting for RDRF consumption")
parser.add_argument("--uart-baud", type=parse_int, default=38_400, help="baud rate for --uart-timing; 38400 gives about 260 us per 8N1 byte") parser.add_argument("--uart-baud", type=parse_int, default=38_400, help="baud rate for --uart-timing; 38400 gives about 260 us per 8N1 byte")
parser.add_argument("--uart-format", default="8E1", help="UART character format for timed RX/TX modeling; real RCP link is 8E1")
parser.add_argument("--tx-wire-timing", action="store_true", help="delay SCI1 TDRE/TXI by one modeled UART character after each TDR write")
parser.add_argument("--keep-listening", action="store_true", help="use all post-frame steps instead of stopping at the first new TX frame") parser.add_argument("--keep-listening", action="store_true", help="use all post-frame steps instead of stopping at the first new TX frame")
parser.add_argument("--interval-steps", type=int, default=512, help="rough step period for the scaffolded interval timer interrupt") parser.add_argument("--interval-steps", type=int, default=512, help="rough step period for the scaffolded interval timer interrupt")
parser.add_argument("--clock-hz", type=parse_int, default=10_000_000, help="CPU/phi clock in Hz for calibrated FRT timing") parser.add_argument("--clock-hz", type=parse_int, default=10_000_000, help="CPU/phi clock in Hz for calibrated FRT timing")
@@ -303,8 +356,13 @@ def main(argv: list[str] | None = None) -> int:
boot_steps=args.boot_steps, boot_steps=args.boot_steps,
per_byte_steps=args.per_byte_steps, per_byte_steps=args.per_byte_steps,
post_frame_steps=args.post_frame_steps, post_frame_steps=args.post_frame_steps,
post_frame_ms=args.post_frame_ms,
wait_heartbeats=args.wait_heartbeats,
wait_heartbeat_steps=args.wait_heartbeat_steps,
uart_timing=args.uart_timing, uart_timing=args.uart_timing,
uart_baud=args.uart_baud, uart_baud=args.uart_baud,
uart_format=args.uart_format,
tx_wire_timing=args.tx_wire_timing,
interval_steps=args.interval_steps, interval_steps=args.interval_steps,
frt1_ocia_steps=args.frt1_ocia_steps, frt1_ocia_steps=args.frt1_ocia_steps,
frt2_ocia_steps=args.frt2_ocia_steps, frt2_ocia_steps=args.frt2_ocia_steps,
@@ -327,6 +385,13 @@ def main(argv: list[str] | None = None) -> int:
for line in result.lines(index): for line in result.lines(index):
print(line) print(line)
print("total_tx_frames=" + " | ".join(format_frame(frame) for frame in emulator.sci1.tx_frames)) print("total_tx_frames=" + " | ".join(format_frame(frame) for frame in emulator.sci1.tx_frames))
eeprom_writes = emulator.memory.p9_bus.x24164_bus.write_log_lines(limit=80)
if eeprom_writes:
print("eeprom_writes:")
for line in eeprom_writes:
print(f" {line}")
else:
print("eeprom_writes=none")
if args.eeprom_save: if args.eeprom_save:
args.eeprom_save.parent.mkdir(parents=True, exist_ok=True) args.eeprom_save.parent.mkdir(parents=True, exist_ok=True)
args.eeprom_save.write_bytes(emulator.memory.dump_eeprom_image()) args.eeprom_save.write_bytes(emulator.memory.dump_eeprom_image())
@@ -352,8 +417,10 @@ def _run_frame(
*, *,
per_byte_steps: int, per_byte_steps: int,
post_frame_steps: int, post_frame_steps: int,
post_frame_ms: int | None,
uart_timing: bool, uart_timing: bool,
uart_baud: int, uart_baud: int,
uart_format: str,
stop_after_tx_frame: bool, stop_after_tx_frame: bool,
) -> FrameResult: ) -> FrameResult:
state_before = _state_snapshot(emulator) state_before = _state_snapshot(emulator)
@@ -363,7 +430,7 @@ def _run_frame(
context = RunContext() context = RunContext()
stopped_reason = "post_frame_steps" stopped_reason = "post_frame_steps"
steps_total = 0 steps_total = 0
timing = UartTiming(baud=uart_baud) timing = UartTiming.from_format(uart_format, baud=uart_baud)
if uart_timing: if uart_timing:
steps_total, stopped_reason = _inject_frame_uart_timed( steps_total, stopped_reason = _inject_frame_uart_timed(
@@ -391,9 +458,13 @@ def _run_frame(
def post_predicate(inner: H8536Emulator) -> bool: def post_predicate(inner: H8536Emulator) -> bool:
return stop_after_tx_frame and len(inner.sci1.tx_frames) >= target_frame_count return stop_after_tx_frame and len(inner.sci1.tx_frames) >= target_frame_count
if post_frame_ms is not None:
steps, reason = _run_cycles_for_ms(emulator, post_frame_ms, context)
stopped_reason = reason
else:
steps, reason = _run_until(emulator, post_frame_steps, post_predicate, context) steps, reason = _run_until(emulator, post_frame_steps, post_predicate, context)
steps_total += steps
stopped_reason = "tx_frame" if reason == "predicate" and stop_after_tx_frame else reason stopped_reason = "tx_frame" if reason == "predicate" and stop_after_tx_frame else reason
steps_total += steps
log_end = len(emulator.memory.access_log) log_end = len(emulator.memory.access_log)
state_after = _state_snapshot(emulator) state_after = _state_snapshot(emulator)
@@ -475,6 +546,22 @@ def _run_until_cycle(
return max(0, max_steps), "max_steps" return max(0, max_steps), "max_steps"
def _run_cycles_for_ms(emulator: H8536Emulator, delta_ms: int, context: RunContext) -> tuple[int, str]:
target_delta_cycles = int((max(0, delta_ms) * max(1, emulator.clock_hz)) / 1000)
target_cycles = emulator.cpu.cycles + target_delta_cycles
completed = 0
while emulator.cpu.cycles < target_cycles:
pc = emulator.cpu.pc
context.record_pc(pc)
try:
emulator.step()
except UnsupportedInstruction as exc:
context.unsupported = str(exc)
return completed, "unsupported_instruction"
completed += 1
return completed, f"post_frame_ms_{delta_ms}"
def _rx_ready(emulator: H8536Emulator) -> bool: def _rx_ready(emulator: H8536Emulator) -> bool:
if not (emulator.sci1.scr & SCI_SCR_RIE and emulator.sci1.scr & SCI_SCR_RE): if not (emulator.sci1.scr & SCI_SCR_RIE and emulator.sci1.scr & SCI_SCR_RE):
return False return False
@@ -546,7 +633,8 @@ def _access_lines(accesses: list[MemoryAccess]) -> list[str]:
lines = [] lines = []
for access in interesting[:80]: for access in interesting[:80]:
label = _access_label(access.address) label = _access_label(access.address)
lines.append(f"{access.kind:<5} {h16(access.address)} {access.value:02X} {label}") pc = f" pc={h16(access.pc)}" if access.pc is not None else ""
lines.append(f"{access.kind:<5} {h16(access.address)} {access.value:02X} {label}{pc}")
if len(interesting) > 80: if len(interesting) > 80:
lines.append(f"... {len(interesting) - 80} more interesting accesses") lines.append(f"... {len(interesting) - 80} more interesting accesses")
return lines return lines

View File

@@ -17,6 +17,7 @@ from .constants import (
SCI_SSR_RDRF, SCI_SSR_RDRF,
SCI_SSR_TDRE, SCI_SSR_TDRE,
) )
from .uart import UartTiming
@dataclass @dataclass
@@ -53,6 +54,8 @@ class SCI1:
_frame_buffer: bytearray = field(default_factory=bytearray) _frame_buffer: bytearray = field(default_factory=bytearray)
tx_ready_delay: int = 0 tx_ready_delay: int = 0
tx_ready_ticks: int = 2 tx_ready_ticks: int = 2
clock_hz: int = 10_000_000
tx_timing: UartTiming | None = None
_tx_ready_pending: bool = False _tx_ready_pending: bool = False
def read(self, address: int) -> int: def read(self, address: int) -> int:
@@ -96,7 +99,7 @@ class SCI1:
if len(self._frame_buffer) == len(HEARTBEAT_FRAME): if len(self._frame_buffer) == len(HEARTBEAT_FRAME):
self.tx_frames.append(bytes(self._frame_buffer)) self.tx_frames.append(bytes(self._frame_buffer))
self._frame_buffer.clear() self._frame_buffer.clear()
self.tx_ready_delay = max(0, self.tx_ready_ticks) self.tx_ready_delay = self._tx_ready_delay()
self._tx_ready_pending = True self._tx_ready_pending = True
self.tx_events.append(SciTxEvent(SCI1_TDR, value, self.scr, self.ssr, emitted)) self.tx_events.append(SciTxEvent(SCI1_TDR, value, self.scr, self.ssr, emitted))
@@ -116,9 +119,22 @@ class SCI1:
def saw_heartbeat(self) -> bool: def saw_heartbeat(self) -> bool:
return HEARTBEAT_FRAME in self.tx_frames return HEARTBEAT_FRAME in self.tx_frames
def tick(self) -> None: def configure_tx_timing(self, timing: UartTiming | None, *, clock_hz: int | None = None) -> None:
self.tx_timing = timing
if clock_hz is not None:
self.clock_hz = max(1, clock_hz)
def tx_busy(self) -> bool:
return self._tx_ready_pending and self.tx_ready_delay > 0
def tick(self, cycles: int = 1) -> None:
if self._tx_ready_pending and self.tx_ready_delay: if self._tx_ready_pending and self.tx_ready_delay:
self.tx_ready_delay -= 1 self.tx_ready_delay = max(0, self.tx_ready_delay - max(1, cycles))
if self._tx_ready_pending and self.tx_ready_delay == 0 and not (self.ssr & SCI_SSR_TDRE): if self._tx_ready_pending and self.tx_ready_delay == 0 and not (self.ssr & SCI_SSR_TDRE):
self.ssr |= SCI_SSR_TDRE self.ssr |= SCI_SSR_TDRE
self._tx_ready_pending = False self._tx_ready_pending = False
def _tx_ready_delay(self) -> int:
if self.tx_timing is None:
return max(0, self.tx_ready_ticks)
return self.tx_timing.cycles_per_character(self.clock_hz)

View File

@@ -7,10 +7,20 @@ from dataclasses import dataclass
class UartTiming: class UartTiming:
baud: int = 38_400 baud: int = 38_400
data_bits: int = 8 data_bits: int = 8
parity_bits: int = 0 parity: str = "N"
stop_bits: int = 1 stop_bits: int = 1
start_bits: int = 1 start_bits: int = 1
def __post_init__(self) -> None:
parity = self.parity.upper()
if parity not in {"N", "E", "O"}:
raise ValueError("parity must be N, E, or O")
object.__setattr__(self, "parity", parity)
@property
def parity_bits(self) -> int:
return 0 if self.parity == "N" else 1
@property @property
def bits_per_character(self) -> int: def bits_per_character(self) -> int:
return self.start_bits + self.data_bits + self.parity_bits + self.stop_bits return self.start_bits + self.data_bits + self.parity_bits + self.stop_bits
@@ -26,10 +36,27 @@ class UartTiming:
def summary(self, clock_hz: int) -> str: def summary(self, clock_hz: int) -> str:
return ( return (
f"uart_{self.data_bits}{'N' if self.parity_bits == 0 else 'P'}{self.stop_bits} " f"uart_{self.data_bits}{self.parity}{self.stop_bits} "
f"baud={self.baud} byte_us={self.micros_per_character():.3f} " f"baud={self.baud} byte_us={self.micros_per_character():.3f} "
f"byte_cycles={self.cycles_per_character(clock_hz)}" f"byte_cycles={self.cycles_per_character(clock_hz)}"
) )
@classmethod
def from_format(cls, text: str, *, baud: int = 38_400) -> "UartTiming":
normalized = text.strip().upper()
if len(normalized) != 3 or normalized[0] not in "78" or normalized[1] not in "NEO" or normalized[2] not in "12":
raise ValueError(f"unsupported UART format {text!r}; expected 8E1, 8N1, 8O1, etc.")
return cls(baud=baud, data_bits=int(normalized[0]), parity=normalized[1], stop_bits=int(normalized[2]))
@classmethod
def from_sci_smr(cls, smr: int, *, baud: int = 38_400) -> "UartTiming":
data_bits = 7 if smr & 0x40 else 8
if smr & 0x20:
parity = "O" if smr & 0x10 else "E"
else:
parity = "N"
stop_bits = 2 if smr & 0x08 else 1
return cls(baud=baud, data_bits=data_bits, parity=parity, stop_bits=stop_bits)
__all__ = ["UartTiming"] __all__ = ["UartTiming"]

View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""Bench runner for CONNECT: OK candidate advance/ACK sweeps."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from h8536.connect_ok_advance_sweep import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,52 @@
import io
import unittest
from h8536.connect_ok_advance_sweep import build_cases, main, _matches_target
class ConnectOkAdvanceSweepTest(unittest.TestCase):
def test_core_suite_starts_with_ack_then_refresh(self):
cases = build_cases("core")
self.assertEqual([case.name for case in cases], ["ack-0040", "refresh-ok"])
self.assertEqual(cases[0].frame.hex().upper(), "05004000001F")
self.assertEqual(cases[1].frame.hex().upper(), "0400008000DE")
def test_latch_suite_includes_special_clear_candidates(self):
names = [case.name for case in build_cases("latch")]
self.assertIn("ack-0096", names)
self.assertIn("ack-00f8", names)
def test_dry_run_defaults_to_reactive_active_report_window(self):
stdout = io.StringIO()
exit_code = main(["--dry-run", "--suite", "core", "--limit", "1"], stdout=stdout)
self.assertEqual(exit_code, 0)
output = stdout.getvalue()
self.assertIn("device=COM5 38400 8E1", output)
self.assertIn("target_mode=active", output)
self.assertIn("baseline[1]=04 00 00 80 00 DE checksum_ok=1", output)
self.assertIn("case[1]=ack-0040 frame=05 00 40 00 00 1F checksum_ok=1", output)
def test_custom_candidate_accepts_five_bytes_and_computes_checksum(self):
stdout = io.StringIO()
exit_code = main(["--dry-run", "--candidate", "probe=05 00 6D 00 00"], stdout=stdout)
self.assertEqual(exit_code, 0)
self.assertIn("case[1]=probe frame=05 00 6D 00 00 32 checksum_ok=1", stdout.getvalue())
def test_active_target_ignores_heartbeat_but_accepts_report(self):
self.assertFalse(_matches_target(bytes.fromhex("0000000080DA"), "active"))
self.assertTrue(_matches_target(bytes.fromhex("02000200005A"), "active"))
self.assertTrue(_matches_target(bytes.fromhex("07804040A07D"), "active"))
def test_connect_ok_target_requires_known_ok_response(self):
self.assertTrue(_matches_target(bytes.fromhex("02000200005A"), "connect-ok"))
self.assertFalse(_matches_target(bytes.fromhex("010012000049"), "connect-ok"))
if __name__ == "__main__":
unittest.main()

View File

@@ -176,6 +176,21 @@ class EmulatorHarnessTest(unittest.TestCase):
self.assertEqual(emulator.memory.read8(ON_CHIP_RAM_START), 0x99) self.assertEqual(emulator.memory.read8(ON_CHIP_RAM_START), 0x99)
def test_memory_access_log_records_executing_pc(self):
rom = rom_with_reset(size=0x1010)
rom[0x1000:0x1005] = bytes([0x15, (ON_CHIP_RAM_START >> 8) & 0xFF, ON_CHIP_RAM_START & 0xFF, 0x06, 0x77])
emulator = H8536Emulator(bytes(rom))
emulator.run(max_steps=1)
writes = [
access
for access in emulator.memory.access_log
if access.kind == "write" and access.address == ON_CHIP_RAM_START
]
self.assertEqual(writes[-1].value, 0x77)
self.assertEqual(writes[-1].pc, 0x1000)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -83,6 +83,35 @@ class SciTimingTest(unittest.TestCase):
self.assertAlmostEqual(timing.micros_per_character(), 260.416666, places=3) self.assertAlmostEqual(timing.micros_per_character(), 260.416666, places=3)
self.assertEqual(timing.cycles_per_character(10_000_000), 2604) self.assertEqual(timing.cycles_per_character(10_000_000), 2604)
def test_uart_8e1_38400_byte_timing_matches_bench_link(self):
timing = UartTiming.from_format("8E1", baud=38_400)
self.assertEqual(timing.bits_per_character, 11)
self.assertAlmostEqual(timing.micros_per_character(), 286.458333, places=3)
self.assertEqual(timing.cycles_per_character(10_000_000), 2865)
self.assertEqual(timing.summary(10_000_000).split()[0], "uart_8E1")
def test_uart_timing_can_be_derived_from_sci_smr(self):
timing = UartTiming.from_sci_smr(0x24, baud=38_400)
self.assertEqual((timing.data_bits, timing.parity, timing.stop_bits), (8, "E", 1))
def test_tdr_write_can_use_uart_character_time_for_tdre(self):
sci = SCI1()
sci.configure_tx_timing(UartTiming.from_format("8E1", baud=38_400), clock_hz=10_000_000)
sci.write(SCI1_SCR, sci.read(SCI1_SCR) | SCI_SCR_TE)
sci.write(SCI1_TDR, 0x42)
sci.write(SCI1_SSR, sci.read(SCI1_SSR) & ~SCI_SSR_TDRE)
self.assertTrue(sci.tx_busy())
self.assertFalse(sci.read(SCI1_SSR) & SCI_SSR_TDRE)
sci.tick(2864)
self.assertFalse(sci.read(SCI1_SSR) & SCI_SSR_TDRE)
sci.tick(1)
self.assertTrue(sci.read(SCI1_SSR) & SCI_SSR_TDRE)
self.assertFalse(sci.tx_busy())
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()