From a48fa0ed18ba51328de4b5e4007189957872f3af Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Tue, 26 May 2026 15:21:52 +1000 Subject: [PATCH] command advance sweep --- README.md | 4 + docs/pt2-protocol.md | 39 ++- h8536/connect_ok_advance_sweep.py | 442 +++++++++++++++++++++++++ h8536/emulator/bench_replay.py | 12 +- h8536/emulator/memory.py | 4 +- h8536/emulator/runner.py | 121 +++---- h8536/emulator/rx_divergence.py | 14 +- h8536/emulator/rx_probe.py | 100 +++++- h8536/emulator/sci.py | 22 +- h8536/emulator/uart.py | 31 +- scripts/connect_ok_advance_sweep.py | 14 + tests/test_connect_ok_advance_sweep.py | 52 +++ tests/test_emulator.py | 15 + tests/test_emulator_sci_timing.py | 29 ++ 14 files changed, 821 insertions(+), 78 deletions(-) create mode 100644 h8536/connect_ok_advance_sweep.py create mode 100644 scripts/connect_ok_advance_sweep.py create mode 100644 tests/test_connect_ok_advance_sweep.py diff --git a/README.md b/README.md index a25e352..ac284ec 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ To start the current emulator harness: .\.venv\Scripts\python.exe h8536_emulator_rx_divergence.py --default-frames --uart-timing --wait-heartbeats 2 --summary-only .\.venv\Scripts\python.exe scripts\bench_connect_lcd_sequence.py --port COM5 --relay-port COM6 --prompt-screen .\.venv\Scripts\python.exe scripts\connect_ok_matrix.py --suite minimal --prompt-observation --result-json captures\connect-ok-minimal-result.json +.\.venv\Scripts\python.exe scripts\connect_ok_advance_sweep.py --suite core --prompt-observation --result-json captures\connect-ok-advance-core-result.json .\.venv\Scripts\python.exe scripts\serial_ack_probe.py --ack-frame "05 00 40 00 00 1F" .\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\ack-race-000-001.json --log captures\ack-race-000-001.txt --result-json captures\ack-race-000-001-result.json .\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\table-sweep-ack-000-07f.json --log captures\table-sweep-ack-000-07f.txt --result-json captures\table-sweep-ack-000-07f-result.json @@ -128,6 +129,7 @@ Minimal smoke-test shape: - Includes an RX command probe that boots until SCI1 RXI is serviceable, injects host six-byte frames through RDR/RDRF, can optionally schedule bench-style UART byte arrivals at real spacing, listens for device TX frames, and reports serial latch/table/LCD-buffer and emulated-LCD effects. - Includes a bench helper for replaying the emulator-derived CONNECT LCD frame sequence against the real device through COM5, with optional COM6 relay power cycling and timestamped capture logs. - Includes a CONNECT: OK bench matrix runner that power-cycles between cases and tests the known sequence, single frames, primer pairs, order permutations, inter-frame gaps, repeats, and hold time to separate magic-frame, primer, cadence, and latch behavior. +- Includes a CONNECT: OK advance sweep runner that recovers to the known OK cadence, waits for an active RCP report frame, then sends one candidate continuation/ACK frame so ACK-only, selector-zero-refresh, and command-5 special selectors can be compared from the same baseline. - Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating. - Includes a checksum-resynchronizing bench receiver that scans RX byte streams for valid six-byte frames, avoids common shifted-heartbeat false locks, and can fall back to the old fixed six-byte slicer with `--sync fixed`. - Includes a JSON scenario bench runner for repeatable multi-step serial tests, including low-latency ACK-aware command-1 probes that can send the current command-5 ACK candidate immediately after the retry frame appears, with explicit max-ACK/max-target guardrails. @@ -148,6 +150,7 @@ Current serial observations: - Bench serial-format finding: real hardware talks `38400 8E1`. Earlier `8N1` captures primarily exercised SCI1 parity/error handling and retry echoes, not the normal command path. After switching bench scripts to even parity, the selector-zero CONNECT path can reach `CONNECT: OK`. - Bench CONNECT recovery finding: `CONNECT:NOT ACT` is recoverable without a power cycle. This makes it a normal no-active-session/cleared-state display rather than a terminal latch; tests can now probe from the idle NOT ACT state directly, then separately check whether OK is held or needs periodic CCU-like refresh traffic. - Bench CONNECT cadence finding: the `40 -> 80 -> C0` sequence stayed at `CONNECT:NOT ACT` with 10 ms, 50 ms, and 150 ms gaps, but produced `CONNECT: OK` then returned to `CONNECT:NOT ACT` with 700 ms and 1.5 s gaps. At 700 ms, single `40`/`80`/`C0` frames did not work, but all tested two-frame pairs did. Repeated `80 -> 80` at about 700 ms also worked, so the values do not need to differ. The no-power-cycle NOT ACT recovery capture produced repeated `02 00 02 00 00 5A` OK-path responses before heartbeat traffic resumed. +- ROM report-source finding: the active `02/01 ...` frames exposed during CONNECT OK attempts are autonomous `F870 -> BAF2 -> BA26` report-queue transmissions, not ordinary command-1 readbacks. The ROM sets `FAA2.3/FAA3.7` after sending them, so the CCU probably needs to answer in that continuation window with command `4`, `5`, or `6` to consume the report queue and keep the session alive. - Board/P9 finding: traced MCU pin 62 `P91` reaches X24164 pin 6 `SCL`, and MCU pin 68 `P97` reaches the shared X24164 pin 5 `SDA` node. The emulator now treats the ROM's `C121/C08B/C0DB/C10C/C142` P9 routines as an X24164-style two-wire EEPROM bus, with ROM logical addresses `0x000-0x7FF` on the `H'A0/H'A1` control-byte family and `0x800-0xFFF` on `H'E0/H'E1`. - EEPROM role finding: `loc_40BB` checks `P7DR.7` and the `F402 == H'6B6F` signature before defaulting EEPROM/shadow tables; `loc_4103` writes ROM default words through `BFE0`, `loc_41D2` reads sixteen 8-byte records into `F7B0-F82F`, and the command-4 path at `BD2B-BD5F` can persist serial table writes when `F76E.7` is set. - EEPROM layout finding: `build\rom_eeprom_layout.txt` currently identifies the ROM factory table at `H'C964-H'CA63`, the F400 shadow defaults, page 0 offset `0x000-0x007` as the signature/options header (`00 00 6B 6F FE 00 00 00`), pages 1-F offset `0x00-0x07` as blank-by-default record slots, and 89 selector mappings from the `H'C564` table into F400/EEPROM offsets. `F404` defaults to `H'FE00` and is tested as option/feature bits, while `F76E` combines persistence enable, dispatch suppression, and low-nibble EEPROM page selection. @@ -286,6 +289,7 @@ python h8536_emulator_rx_divergence.py --help - `--target-frame "00 00 00 00 80 DA"`: compare staged/emitted TX bytes against an expected six-byte frame. - `h8536_emulator_rx_probe.py "04 00 00 80 00"`: append the checksum, inject the host frame through SCI1 RX, and summarize responses. - `h8536_emulator_rx_probe.py --uart-timing --uart-baud 38400 "04 00 00 80 00"`: inject all six host bytes with bench-style wire spacing of about 260 us per byte, letting RXI/TXI/timers interleave; if the ROM has not cleared `RDRF` before the next byte, the SCI model raises `ORER`. The real bench link is `8E1`. +- `h8536_emulator_rx_probe.py --uart-timing --uart-format 8E1 --tx-wire-timing --wait-heartbeats 2 --post-frame-ms 700 "04 00 00 80 00 DE" "04 00 00 80 00 DE"`: replay the CONNECT refresh shape after heartbeat readiness and keep the emulator running for a bench-scale gap after each frame. The RAM trace now tags interesting accesses with the executing ROM PC, models SCI1 TDRE/TXI at 8E1 character time, and reports whether X24164 EEPROM bytes were written. - `h8536_emulator_rx_probe.py --preset connect-lcd`: replay the current CONNECT LCD activation candidates. - `h8536_emulator_rx_divergence.py --default-frames --uart-timing --wait-heartbeats 2 --summary-only`: run the focused RX divergence trace for the bench mismatch. It flags whether a frame reached cmd0 `BC69`, cmd1 `BCD7`, retry echo, command-7 replay, autonomous `BAF2` report output, or the TX/RX overlap-collapse path. - `scripts\serial_table_dump.py --port COM5 --relay-port COM6 --start 0x000 --count 0x200 --log captures\table-read.txt`: read-only command-1 sweep of the firmware-exposed serial table state for EEPROM/shadow inference. Bench serial scripts default to `8E1` because the ROM initializes SCI1 as async 8-bit even parity, 1 stop; pass `--parity N` only when reproducing older 8N1 captures. diff --git a/docs/pt2-protocol.md b/docs/pt2-protocol.md index e5f6308..aac6baa 100644 --- a/docs/pt2-protocol.md +++ b/docs/pt2-protocol.md @@ -419,10 +419,17 @@ Current interpretation: - `CONNECT:NOT ACT` is a normal no-active-session/cleared-state display, not a terminal latch. - `CONNECT: OK` is table/state driven, probably selector-zero active/connect state. - `0x8080` at selector zero is the strongest known active/connect value. -- The panel likely expects the CCU to keep seeding or refreshing state after entering OK. +- The panel likely expects the CCU to consume/ACK report-queue frames and keep seeding or refreshing state after entering OK. - The working fake-CCU sequence is probably not "three frames as fast as possible"; it appears to need CCU-like cadence or a live session window, roughly on the heartbeat/report timescale. - A single selector-zero continuation-shaped frame is insufficient in the current tests; two selector-zero writes at the working cadence are enough. They do not need to carry different values, because `80 -> 80` also worked. +ROM report-source update: + +- The active `02/01 ...` frames seen during CONNECT OK attempts are best modeled as `F870 -> BAF2 -> BA26` report-queue transmissions. +- `BAF2` dequeues a report word, encodes the first three TX bytes, reads the payload from `E800 + 2*selector`, and `BA26` appends the `0x5A` XOR checksum. +- After sending a queued report, the ROM sets `FAA2.3` and `FAA3.7`; command `4`, `5`, or `6` can then consume/advance the report only while that continuation latch is live. +- This makes a reactive fake-CCU test more valuable than another blind fixed-delay matrix: recover to OK, wait for the first active report, then send one candidate continuation/ACK frame. + ## Candidate CCU Seed Values These are syntactically valid host frames produced from ROM table mining. Use them as candidate fake-CCU state seeds, not as final protocol truth. @@ -513,6 +520,19 @@ Test whether OK is held: .\.venv\Scripts\python.exe scripts\connect_ok_matrix.py --suite hold --parity E --prompt-observation --result-json captures\connect-ok-hold-result.json ``` +Sweep strong continuation/ACK candidates after recovering to `CONNECT: OK`: + +```powershell +.\.venv\Scripts\python.exe scripts\connect_ok_advance_sweep.py --suite core --parity E --prompt-observation --result-json captures\connect-ok-advance-core-result.json +``` + +Candidate suites: + +- `core`: `05 00 40 00 00 1F` pure command-5 report-consume candidate, then `04 00 00 80 00 DE` selector-zero refresh/consume candidate. +- `special`: command-5 `0x006C/0x006D/0x006E`, which call the ROM's `BE70` queue helper. +- `latch`: command-5 `0x006B/0x0096/0x0097/0x00C6/0x00F8`, which can clear the `F731/F790` latch bits when that path is live. +- `all`: all of the above. + Current matrix result summary: ```text @@ -554,6 +574,7 @@ This fits the real panel behavior: - Whether the CCU sends a periodic refresh stream after CONNECT OK. - Exact hold time before OK falls back to NOT ACT, if no refresh traffic follows. - Whether command 4 CONNECT success depends on an existing continuation latch, timing, or a side effect created by earlier frames. +- Whether the remaining emulator/bench mismatch is an SCI timing bug or a missing ROM/peripheral behavior. The emulator now has an optional 8E1 TX wire-timing mode; with it enabled, repeated `80` no longer immediately reaches `CONNECT: OK`, which is closer to bench behavior but still needs calibration. - How EEPROM option bits change selector behavior. - Whether all old visible `07...` families can be reproduced under `8E1`. @@ -561,11 +582,16 @@ This fits the real panel behavior: 1. Finish the CONNECT matrix runs: - rerun `hold` with 700 ms gaps to measure how long OK remains without refresh traffic. -2. Test whether periodic `80` refreshes hold CONNECT OK, and find the longest safe refresh interval. -3. Dump selector table state before and after CONNECT OK. -4. Seed selectors `0x003`, `0x040`, and `0x0F6` after selector-zero OK and watch lamps/readouts. -5. Mine selector dispatch handlers for known UI text terms: `IRIS`, `GAIN`, `SHUTTER`, `BARS`, `BLACK`, `CALL`, `AUTO`, `DIAG`. -6. Build a fake-CCU streamer that repeatedly writes a small selector set and logs which RCP reports appear. +2. Run the reactive advance sweep from OK and compare: + - ACK-only `05 00 40 00 00 1F` + - refresh/consume `04 00 00 80 00 DE` + - command-5 special selectors `0x006C/0x006D/0x006E` +3. Test whether periodic report ACKs plus periodic `80` refreshes hold CONNECT OK. +4. Sweep emulator RX phase/gap with `--tx-wire-timing` and compare where `BD0E` is reached or missed. +5. Dump selector table state before and after CONNECT OK. +6. Seed selectors `0x003`, `0x040`, and `0x0F6` after selector-zero OK and watch lamps/readouts. +7. Mine selector dispatch handlers for known UI text terms: `IRIS`, `GAIN`, `SHUTTER`, `BARS`, `BLACK`, `CALL`, `AUTO`, `DIAG`. +8. Build a fake-CCU streamer that repeatedly writes a small selector set and logs which RCP reports appear. ## Source Files And Reports @@ -588,5 +614,6 @@ Useful tools: - `h8536_emulator_state_search.py` - `scripts/bench_connect_lcd_sequence.py` - `scripts/connect_ok_matrix.py` +- `scripts/connect_ok_advance_sweep.py` - `scripts/serial_table_dump.py` - `scripts/serial_scenario.py` diff --git a/h8536/connect_ok_advance_sweep.py b/h8536/connect_ok_advance_sweep.py new file mode 100644 index 0000000..ef16fe1 --- /dev/null +++ b/h8536/connect_ok_advance_sweep.py @@ -0,0 +1,442 @@ +from __future__ import annotations + +import argparse +import json +import sys +import time +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, TextIO + +from .bench_connect_lcd import ( + BenchLogger, + FrameDetector, + add_serial_format_args, + _import_serial, + open_device_serial, + _read_for, + _relay_command, + _relay_settle, + _send_frame, + _wait_for_ready, + format_frame, + frame_checksum_ok, + label_frame, + parse_frame, + serial_format_label, +) +from .connect_ok_matrix import FRAME_80_OK + + +ACK_0040 = bytes.fromhex("05004000001F") +REFRESH_OK = bytes.fromhex("0400008000DE") +ACK_006B = bytes.fromhex("05006B000034") +ACK_006C = bytes.fromhex("05006C000033") +ACK_006D = bytes.fromhex("05006D000032") +ACK_006E = bytes.fromhex("05006E000031") +ACK_0096 = bytes.fromhex("050116000048") +ACK_0097 = bytes.fromhex("050117000049") +ACK_00C6 = bytes.fromhex("050146000018") +ACK_00F8 = bytes.fromhex("050178000026") + +DEFAULT_BASELINE = (FRAME_80_OK, FRAME_80_OK) +HEARTBEAT_FRAME = bytes.fromhex("0000000080DA") + + +@dataclass(frozen=True) +class AdvanceCase: + name: str + frame: bytes + note: str = "" + + +def default_log_path(suite: str) -> Path: + safe_suite = "".join(char if char.isalnum() or char in "-_" else "-" for char in suite) + return Path("captures") / f"connect-ok-advance-{safe_suite}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" + + +def build_arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Recover the RCP to CONNECT: OK, then sweep one candidate continuation/ACK " + "frame from the active report window." + ) + ) + parser.add_argument("--suite", choices=("core", "special", "latch", "all"), default="core") + parser.add_argument("--case", action="append", help="run only matching case names; repeatable") + parser.add_argument("--limit", type=int, help="run only the first N selected cases") + parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP") + parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate") + add_serial_format_args(parser) + parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port") + parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate") + parser.add_argument("--no-power-cycle", action="store_true", help="do not power-cycle before the sweep starts") + parser.add_argument( + "--power-cycle-between-cases", + action="store_true", + help="power-cycle before each candidate instead of recovering from the current state", + ) + parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power") + parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power") + parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold DUT power off") + parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port") + parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat") + parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before a case") + parser.add_argument("--require-ready", action="store_true", help="abort a case if readiness heartbeats are not observed") + parser.add_argument("--pre-case-drain", type=float, default=0.250, help="seconds to drain/log RX before baseline") + parser.add_argument( + "--baseline-frame", + action="append", + type=parse_frame, + help="override baseline with custom frame; repeatable; default is two selector-zero OK frames", + ) + parser.add_argument("--baseline-gap", type=float, default=0.700, help="seconds to listen between baseline frames") + parser.add_argument( + "--target-mode", + choices=("active", "connect-ok", "non-heartbeat", "none"), + default="active", + help="which device frame opens the candidate-send window", + ) + parser.add_argument("--target-timeout", type=float, default=2.5, help="seconds to wait for the target window") + parser.add_argument("--candidate-guard", type=float, default=0.020, help="seconds to wait after target before candidate") + parser.add_argument( + "--send-on-target-timeout", + action="store_true", + help="send the candidate even if no target-mode frame was observed", + ) + parser.add_argument("--post-candidate-read", type=float, default=5.0, help="seconds to listen after candidate") + parser.add_argument("--candidate", action="append", type=_parse_candidate, help="custom candidate as name=frame or frame") + parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy") + parser.add_argument("--prompt-observation", action="store_true", help="prompt for observed LCD/lamp state after each case") + parser.add_argument("--pause-between-cases", action="store_true", help="wait for Enter before the next case") + parser.add_argument("--log", type=Path, help="capture log path") + parser.add_argument("--result-json", type=Path, help="write machine-readable case summary") + parser.add_argument("--dry-run", action="store_true", help="print selected cases without opening serial ports") + return parser + + +def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: + args = build_arg_parser().parse_args(argv) + cases = select_cases(args) + baseline = tuple(args.baseline_frame or DEFAULT_BASELINE) + log_path = args.log or default_log_path(args.suite) + + if args.dry_run: + _print_dry_run(args, cases, baseline, log_path, stdout) + return 0 + + serial = _import_serial() + logger = BenchLogger(log_path, stdout=stdout) + results: list[dict[str, Any]] = [] + try: + logger.emit("CONNECT: OK advance sweep") + logger.emit( + f"suite={args.suite} cases={len(cases)} device={args.port} {args.baud} {serial_format_label(args)} " + f"relay={args.relay_port} {args.relay_baud} sync={args.sync}" + ) + logger.emit(f"log={log_path}") + _emit_baseline_plan(logger, baseline, args.baseline_gap) + with open_device_serial(serial, args) as device: + relay = None + try: + if not args.no_power_cycle or args.power_cycle_between_cases: + relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25) + _relay_settle(relay, args.relay_settle, logger) + if not args.no_power_cycle and not args.power_cycle_between_cases: + _power_cycle(args, device, relay, logger) + + for index, case in enumerate(cases, start=1): + if args.pause_between_cases and index > 1: + input(f"Press Enter to start case {index}/{len(cases)}: {case.name}") + result = _run_case(args, device, relay, logger, case, baseline, index, len(cases)) + results.append(result) + if args.require_ready and not result["ready"]: + logger.event("ABORT readiness was required") + break + finally: + if relay is not None: + relay.close() + _emit_summary(logger, results) + if args.result_json: + _write_result_json(args.result_json, log_path, args, baseline, results) + return 0 + finally: + logger.close() + + +def select_cases(args: argparse.Namespace) -> list[AdvanceCase]: + cases = list(args.candidate or build_cases(args.suite)) + if args.case: + filters = [item.lower() for item in args.case] + cases = [case for case in cases if any(fragment in case.name.lower() for fragment in filters)] + if args.limit is not None: + cases = cases[: max(0, args.limit)] + if not cases: + raise SystemExit("no advance cases selected") + return cases + + +def build_cases(suite: str) -> list[AdvanceCase]: + core = [ + AdvanceCase("ack-0040", ACK_0040, "pure command-5 continuation ACK candidate"), + AdvanceCase("refresh-ok", REFRESH_OK, "command-4 continuation ACK plus selector-zero 0x8080 refresh"), + ] + special = [ + AdvanceCase("ack-006c", ACK_006C, "command-5 special BE70 selector candidate"), + AdvanceCase("ack-006d", ACK_006D, "command-5 special BE70 selector candidate"), + AdvanceCase("ack-006e", ACK_006E, "command-5 special BE70 selector candidate"), + ] + latch = [ + AdvanceCase("ack-006b", ACK_006B, "command-5 latch-clear special selector candidate"), + AdvanceCase("ack-0096", ACK_0096, "command-5 F731/F790 latch-clear candidate"), + AdvanceCase("ack-0097", ACK_0097, "command-5 F731/F790 latch-clear candidate"), + AdvanceCase("ack-00c6", ACK_00C6, "command-5 F731/F790 latch-clear candidate"), + AdvanceCase("ack-00f8", ACK_00F8, "command-5 F731/F790 latch-clear candidate"), + ] + suites = { + "core": core, + "special": special, + "latch": latch, + "all": core + special + latch, + } + return suites[suite] + + +def _run_case( + args: argparse.Namespace, + device: Any, + relay: Any | None, + logger: BenchLogger, + case: AdvanceCase, + baseline: tuple[bytes, ...], + index: int, + total: int, +) -> dict[str, Any]: + detector = FrameDetector(sync_mode=args.sync) + logger.emit() + logger.emit(f"CASE {index}/{total} {case.name}") + logger.emit(f"note={case.note or '(none)'}") + logger.emit(f"candidate={format_frame(case.frame)} checksum_ok={int(frame_checksum_ok(case.frame))}") + logger.emit( + f"target_mode={args.target_mode} target_timeout={args.target_timeout:.3f}s " + f"candidate_guard={args.candidate_guard:.3f}s" + ) + + if args.power_cycle_between_cases: + _power_cycle(args, device, relay, logger) + else: + device.reset_input_buffer() + logger.event("POWER_CYCLE skipped for case; recovering with baseline") + + ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats) + if args.require_ready and not ready: + observation = _prompt_observation(args, logger, case) + return _case_result(case, detector, ready=ready, target=None, candidate_sent=False, observation=observation) + + if args.pre_case_drain > 0: + logger.event(f"DRAIN before baseline {args.pre_case_drain:.3f}s") + _read_for(device, detector, logger, args.pre_case_drain) + + for frame_index, frame in enumerate(baseline, start=1): + _send_frame(device, frame, logger, f"{case.name}.baseline{frame_index}") + _read_for(device, detector, logger, args.baseline_gap) + + target = _wait_for_target_mode(device, detector, logger, args.target_mode, args.target_timeout) + candidate_sent = target is not None or args.target_mode == "none" or args.send_on_target_timeout + if candidate_sent: + if args.candidate_guard > 0: + logger.event(f"CANDIDATE guard {args.candidate_guard:.3f}s") + _read_for(device, detector, logger, args.candidate_guard) + _send_frame(device, case.frame, logger, f"{case.name}.candidate") + else: + logger.event("CANDIDATE skipped because target window was not observed") + + if args.post_candidate_read > 0: + logger.event(f"POST_CANDIDATE_READ {args.post_candidate_read:.3f}s") + _read_for(device, detector, logger, args.post_candidate_read) + + observation = _prompt_observation(args, logger, case) + result = _case_result(case, detector, ready=ready, target=target, candidate_sent=candidate_sent, observation=observation) + logger.event( + f"CASE_RESULT {case.name} ready={int(ready)} target_seen={int(target is not None)} " + f"candidate_sent={int(candidate_sent)} rx_frames={result['rx_frames']} " + f"labels={json.dumps(result['labels'], sort_keys=True)}" + ) + return result + + +def _wait_for_target_mode( + device: Any, + detector: FrameDetector, + logger: BenchLogger, + mode: str, + timeout_seconds: float, +) -> bytes | None: + if mode == "none": + logger.event("TARGET disabled") + return None + logger.event(f"WAIT target_mode={mode} timeout={timeout_seconds:.3f}s") + start_index = len(detector.frames) + deadline = time.monotonic() + max(0.0, timeout_seconds) + while time.monotonic() < deadline: + before = len(detector.frames) + _read_for(device, detector, logger, 0.050) + for frame in detector.frames[max(start_index, before) :]: + if _matches_target(frame, mode): + logger.event(f"TARGET seen {format_frame(frame)} label={label_frame(frame)}") + return frame + logger.event("TARGET_TIMEOUT") + return None + + +def _matches_target(frame: bytes, mode: str) -> bool: + label = label_frame(frame) + if mode == "connect-ok": + return label in {"connect_ok_path_response_candidate", "connect_c0_path_response_candidate"} + if mode == "non-heartbeat": + return frame_checksum_ok(frame) and frame != HEARTBEAT_FRAME + if mode == "active": + return frame_checksum_ok(frame) and frame != HEARTBEAT_FRAME and frame[0] in {0x01, 0x02, 0x07} + raise ValueError(f"unknown target mode {mode!r}") + + +def _power_cycle(args: argparse.Namespace, device: Any, relay: Any | None, logger: BenchLogger) -> None: + if relay is None: + raise SystemExit("relay was not opened") + _relay_command(relay, args.power_off_command, logger) + time.sleep(max(0.0, args.off_seconds)) + device.reset_input_buffer() + _relay_command(relay, args.power_on_command, logger) + + +def _case_result( + case: AdvanceCase, + detector: FrameDetector, + *, + ready: bool, + target: bytes | None, + candidate_sent: bool, + observation: str, +) -> dict[str, Any]: + return { + "name": case.name, + "note": case.note, + "frame": format_frame(case.frame), + "ready": ready, + "target_seen": target is not None, + "target": format_frame(target) if target is not None else "", + "candidate_sent": candidate_sent, + "rx_frames": len(detector.frames), + "labels": dict(detector.labels), + "resync_events": detector.resync_events, + "dropped_bytes": detector.dropped_bytes, + "trailing_unframed_bytes": len(detector.buffer), + "observation": observation, + } + + +def _prompt_observation(args: argparse.Namespace, logger: BenchLogger, case: AdvanceCase) -> str: + if not args.prompt_observation: + return "" + prompt = f"{case.name}: LCD/lamps/readouts observation, or Enter to skip: " + observation = input(prompt).strip() + logger.event(f"OBSERVATION {case.name}: {observation or '(no note)'}") + return observation + + +def _emit_baseline_plan(logger: BenchLogger, baseline: tuple[bytes, ...], gap: float) -> None: + logger.emit(f"baseline_gap={gap:.3f}s baseline_frames={len(baseline)}") + for index, frame in enumerate(baseline, start=1): + logger.emit(f"baseline[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}") + + +def _emit_summary(logger: BenchLogger, results: list[dict[str, Any]]) -> None: + logger.emit() + logger.emit("Advance Sweep Summary") + logger.emit(f"cases={len(results)}") + for result in results: + labels = ", ".join(f"{key}={value}" for key, value in sorted(result["labels"].items())) or "no_rx_frames" + note = result["observation"] or "(no observation)" + logger.emit( + f"{result['name']}: ready={int(result['ready'])} target_seen={int(result['target_seen'])} " + f"candidate_sent={int(result['candidate_sent'])} rx_frames={result['rx_frames']} " + f"{labels} observation={note}" + ) + + +def _write_result_json( + path: Path, + log_path: Path, + args: argparse.Namespace, + baseline: tuple[bytes, ...], + results: list[dict[str, Any]], +) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "suite": args.suite, + "log": str(log_path), + "serial_format": f"{args.baud} {serial_format_label(args)}", + "baseline": [format_frame(frame) for frame in baseline], + "baseline_gap": args.baseline_gap, + "target_mode": args.target_mode, + "candidate_guard": args.candidate_guard, + "cases": results, + } + path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + +def _print_dry_run( + args: argparse.Namespace, + cases: list[AdvanceCase], + baseline: tuple[bytes, ...], + log_path: Path, + stdout: TextIO, +) -> None: + print(f"suite={args.suite}", file=stdout) + print(f"cases={len(cases)}", file=stdout) + print(f"device={args.port} {args.baud} {serial_format_label(args)}", file=stdout) + print(f"relay={args.relay_port} {args.relay_baud}", file=stdout) + print(f"power_cycle_start={int(not args.no_power_cycle)}", file=stdout) + print(f"power_cycle_between_cases={int(args.power_cycle_between_cases)}", file=stdout) + print(f"target_mode={args.target_mode} target_timeout={args.target_timeout:.3f}s", file=stdout) + print(f"candidate_guard={args.candidate_guard:.3f}s post_candidate_read={args.post_candidate_read:.3f}s", file=stdout) + print(f"baseline_gap={args.baseline_gap:.3f}s", file=stdout) + for index, frame in enumerate(baseline, start=1): + print(f"baseline[{index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout) + print(f"log={log_path}", file=stdout) + for index, case in enumerate(cases, start=1): + print(f"case[{index}]={case.name} frame={format_frame(case.frame)} checksum_ok={int(frame_checksum_ok(case.frame))}", file=stdout) + if case.note: + print(f" note={case.note}", file=stdout) + + +def _parse_candidate(text: str) -> AdvanceCase: + if "=" in text: + name, frame_text = text.split("=", 1) + name = name.strip() + if not name: + raise argparse.ArgumentTypeError("custom candidate name cannot be empty") + else: + name = "custom" + frame_text = text + return AdvanceCase(name=name, frame=parse_frame(frame_text), note="custom candidate") + + +__all__ = [ + "ACK_0040", + "ACK_006B", + "ACK_006C", + "ACK_006D", + "ACK_006E", + "ACK_0096", + "ACK_0097", + "ACK_00C6", + "ACK_00F8", + "AdvanceCase", + "REFRESH_OK", + "build_arg_parser", + "build_cases", + "main", + "select_cases", +] diff --git a/h8536/emulator/bench_replay.py b/h8536/emulator/bench_replay.py index 1be0d66..6522d8c 100644 --- a/h8536/emulator/bench_replay.py +++ b/h8536/emulator/bench_replay.py @@ -178,6 +178,8 @@ class ReplayConfig: clock_hz: int = 10_000_000 uart_timing: bool = True uart_baud: int = 38_400 + uart_format: str = "8E1" + tx_wire_timing: bool = True p9_fast_path: bool = True p9_fast_input: int = 0xFF p9_fast_optimistic_wrapper: bool = False @@ -248,6 +250,9 @@ def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: Re p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper, p7_input=config.p7_input, eeprom_seed=config.eeprom_seed, + sci1_tx_timing=UartTiming.from_format(config.uart_format, baud=config.uart_baud) + if config.tx_wire_timing + else None, ) context = RunContext() @@ -258,6 +263,7 @@ def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: Re f"rx_serviceable={int(_rx_ready(emulator))} " f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} " f"clock_hz={emulator.clock_hz} " + f"uart_format={config.uart_format.upper()} tx_wire_timing={int(config.tx_wire_timing)} " f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}" ) @@ -270,7 +276,7 @@ def run_bench_replay(log_path: Path, *, rom_path: Path | None = None, config: Re gap_frames = tuple(emulator.sci1.tx_frames[tx_frame_start_before_delay:]) tx_frame_start = len(emulator.sci1.tx_frames) if config.uart_timing: - timing = UartTiming(baud=config.uart_baud) + timing = UartTiming.from_format(config.uart_format, baud=config.uart_baud) steps_during_rx, inject_reason = _inject_frame_uart_timed( emulator, host.frame, @@ -367,7 +373,9 @@ def build_arg_parser() -> argparse.ArgumentParser: parser.add_argument("--interval-steps", type=int, default=ReplayConfig.interval_steps) parser.add_argument("--clock-hz", type=lambda text: int(text, 0), default=ReplayConfig.clock_hz) parser.add_argument("--uart-baud", type=lambda text: int(text, 0), default=ReplayConfig.uart_baud, help="baud rate for bench-style UART injection") + parser.add_argument("--uart-format", default=ReplayConfig.uart_format, help="UART character format for bench-style timing; real RCP link is 8E1") parser.add_argument("--polite-rx", action="store_true", help="wait for each RX byte to be consumed before injecting the next byte") + parser.add_argument("--no-tx-wire-timing", action="store_true", help="use the legacy tiny TDRE delay instead of modeled UART TX character time") parser.add_argument("--frt1-ocia-steps", type=int, default=ReplayConfig.frt1_ocia_steps) parser.add_argument("--frt2-ocia-steps", type=int, default=ReplayConfig.frt2_ocia_steps) parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines") @@ -395,6 +403,8 @@ def main(argv: list[str] | None = None) -> int: clock_hz=args.clock_hz, uart_timing=not args.polite_rx, uart_baud=args.uart_baud, + uart_format=args.uart_format, + tx_wire_timing=not args.no_tx_wire_timing, p9_fast_path=not args.no_p9_fast_path, p9_fast_input=args.p9_fast_input, p9_fast_optimistic_wrapper=args.p9_fast_optimistic_wrapper, diff --git a/h8536/emulator/memory.py b/h8536/emulator/memory.py index fa809fa..e2ce83d 100644 --- a/h8536/emulator/memory.py +++ b/h8536/emulator/memory.py @@ -35,6 +35,7 @@ class MemoryAccess: value: int kind: str region: str + pc: int | None = None class MemoryMap: @@ -48,6 +49,7 @@ class MemoryMap: self.external: dict[int, int] = {} self.port_inputs: dict[int, int] = {P7DR: p7_input & 0xFF} self.access_log: list[MemoryAccess] = [] + self.current_pc: int | None = None self._set_register(SCI1_SMR, self.sci1.smr) self._set_register(SCI1_BRR, self.sci1.brr) @@ -185,7 +187,7 @@ class MemoryMap: return ((latch & ddr) | (pins & ~ddr)) & 0xFF def _log(self, kind: str, address: int, size: int, value: int) -> None: - self.access_log.append(MemoryAccess(address, size, value, kind, self.region(address).name)) + self.access_log.append(MemoryAccess(address, size, value, kind, self.region(address).name, self.current_pc)) def describe_regions(regions: Iterable[MemoryRegion] = MEMORY_REGIONS) -> str: diff --git a/h8536/emulator/runner.py b/h8536/emulator/runner.py index f41b1ba..dc18df2 100644 --- a/h8536/emulator/runner.py +++ b/h8536/emulator/runner.py @@ -41,6 +41,7 @@ from .fast_paths import P9FastPath, P9FastPathConfig from .memory import MemoryMap from .sci import SCI1 from .timers import FrtOciaScheduler, FrtRegisters +from .uart import UartTiming @dataclass @@ -94,6 +95,7 @@ class H8536Emulator: p9_fast_default_wrapper_success: bool = False, p7_input: int = 0xFF, eeprom_seed: str = "blank", + sci1_tx_timing: UartTiming | None = None, ) -> None: if not rom_bytes: raise ValueError("ROM image is empty") @@ -103,6 +105,7 @@ class H8536Emulator: self.memory = MemoryMap(rom_bytes, self.sci1, p7_input=p7_input) if eeprom_seed == "factory": self.memory.seed_factory_eeprom_and_shadow() + self.sci1.configure_tx_timing(sci1_tx_timing, clock_hz=clock_hz) self.memory.p9_bus.default_wrapper_success = bool(p9_fast_default_wrapper_success) self.p9_fast_path = p9_fast_path or P9FastPath( P9FastPathConfig(enabled=p9_fast_path_enabled, default_input_byte=p9_fast_default_input_byte) @@ -134,66 +137,70 @@ class H8536Emulator: def step(self) -> str: pc = self.cpu.pc cycles_before = self.cpu.cycles - if self.p9_fast_path.try_handle(self): - self._tick_peripherals(self.cpu.cycles - cycles_before) - return f"{h16(pc)}: {'':<17} P9 fast-path" + self.memory.current_pc = pc + try: + if self.p9_fast_path.try_handle(self): + self._tick_peripherals(self.cpu.cycles - cycles_before) + return f"{h16(pc)}: {'':<17} P9 fast-path" - decoder = H8536Decoder(self.memory.rom, br=self.cpu.br) - ins = decoder.decode(pc) - if not ins.valid: - raise UnsupportedInstruction(pc, ins.raw, ins.text) + decoder = H8536Decoder(self.memory.rom, br=self.cpu.br) + ins = decoder.decode(pc) + if not ins.valid: + raise UnsupportedInstruction(pc, ins.raw, ins.text) - next_pc = (pc + ins.size) & 0xFFFF - raw = ins.raw - text = ins.text + next_pc = (pc + ins.size) & 0xFFFF + raw = ins.raw + text = ins.text - if raw[0] == 0x00: - pass - elif raw[0] == 0x02 and len(raw) == 2: - self._pop_register_mask(raw[1]) - elif raw[0] == 0x11 and len(raw) >= 2: - next_pc = self._indirect_jump_call(raw, pc, next_pc) - elif raw[0] == 0x12 and len(raw) == 2: - self._push_register_mask(raw[1]) - elif raw[0] in (0x01, 0x06, 0x07) and len(raw) == 3 and 0xB8 <= raw[1] <= 0xBF: - next_pc = self._scb(raw, pc, next_pc) - elif raw[0] in (0x04, 0x0C): - next_pc = self._execute_general(pc, next_pc) - elif 0x40 <= raw[0] <= 0x47 and len(raw) == 2: - reg = raw[0] & 0x07 - self._cmp(self._reg_read(reg, 1), raw[1], 1) - elif 0x48 <= raw[0] <= 0x4F and len(raw) == 3: - reg = raw[0] & 0x07 - self._cmp(self.cpu.regs[reg], int.from_bytes(raw[1:3], "big"), 2) - elif 0x50 <= raw[0] <= 0x57 and len(raw) == 2: - self._reg_write(raw[0] & 0x07, raw[1], 1) - self._set_logic_flags(raw[1], 1) - elif 0x58 <= raw[0] <= 0x5F and len(raw) == 3: - self.cpu.regs[raw[0] & 0x07] = int.from_bytes(raw[1:3], "big") - self._set_logic_flags(self.cpu.regs[raw[0] & 0x07], 2) - elif raw[0] in (0x0E, 0x1E, 0x18): - next_pc = self._direct_call(raw, next_pc) - elif raw[0] == 0x19: - next_pc = self._pop16() - elif raw[0] == 0x0A: - next_pc = self._return_from_interrupt() - elif raw[0] in (0x15, 0x1D) and len(raw) >= 4: - next_pc = self._execute_general(pc, next_pc) - elif raw[0] in range(0xA0, 0x100): - next_pc = self._execute_general(pc, next_pc) - elif raw[0] in range(0x20, 0x30) and len(raw) == 2: - next_pc = self._branch8(raw, pc, next_pc) - elif raw[0] in range(0x30, 0x40) and len(raw) == 3: - next_pc = self._branch16(raw, pc, next_pc) - else: - raise UnsupportedInstruction(pc, raw, text) + if raw[0] == 0x00: + pass + elif raw[0] == 0x02 and len(raw) == 2: + self._pop_register_mask(raw[1]) + elif raw[0] == 0x11 and len(raw) >= 2: + next_pc = self._indirect_jump_call(raw, pc, next_pc) + elif raw[0] == 0x12 and len(raw) == 2: + self._push_register_mask(raw[1]) + elif raw[0] in (0x01, 0x06, 0x07) and len(raw) == 3 and 0xB8 <= raw[1] <= 0xBF: + next_pc = self._scb(raw, pc, next_pc) + elif raw[0] in (0x04, 0x0C): + next_pc = self._execute_general(pc, next_pc) + elif 0x40 <= raw[0] <= 0x47 and len(raw) == 2: + reg = raw[0] & 0x07 + self._cmp(self._reg_read(reg, 1), raw[1], 1) + elif 0x48 <= raw[0] <= 0x4F and len(raw) == 3: + reg = raw[0] & 0x07 + self._cmp(self.cpu.regs[reg], int.from_bytes(raw[1:3], "big"), 2) + elif 0x50 <= raw[0] <= 0x57 and len(raw) == 2: + self._reg_write(raw[0] & 0x07, raw[1], 1) + self._set_logic_flags(raw[1], 1) + elif 0x58 <= raw[0] <= 0x5F and len(raw) == 3: + self.cpu.regs[raw[0] & 0x07] = int.from_bytes(raw[1:3], "big") + self._set_logic_flags(self.cpu.regs[raw[0] & 0x07], 2) + elif raw[0] in (0x0E, 0x1E, 0x18): + next_pc = self._direct_call(raw, next_pc) + elif raw[0] == 0x19: + next_pc = self._pop16() + elif raw[0] == 0x0A: + next_pc = self._return_from_interrupt() + elif raw[0] in (0x15, 0x1D) and len(raw) >= 4: + next_pc = self._execute_general(pc, next_pc) + elif raw[0] in range(0xA0, 0x100): + next_pc = self._execute_general(pc, next_pc) + elif raw[0] in range(0x20, 0x30) and len(raw) == 2: + next_pc = self._branch8(raw, pc, next_pc) + elif raw[0] in range(0x30, 0x40) and len(raw) == 3: + next_pc = self._branch16(raw, pc, next_pc) + else: + raise UnsupportedInstruction(pc, raw, text) - self.cpu.pc = next_pc - self.cpu.steps += 1 - cycle_delta = self._rough_cycles(raw) - self.cpu.cycles += cycle_delta - self._tick_peripherals(cycle_delta) - return f"{h16(pc)}: {' '.join(f'{byte:02X}' for byte in raw):<17} {text}" + self.cpu.pc = next_pc + self.cpu.steps += 1 + cycle_delta = self._rough_cycles(raw) + self.cpu.cycles += cycle_delta + self._tick_peripherals(cycle_delta) + return f"{h16(pc)}: {' '.join(f'{byte:02X}' for byte in raw):<17} {text}" + finally: + self.memory.current_pc = None def run(self, max_steps: int, trace: bool = False, stop_on_heartbeat: bool = False) -> RunReport: trace_lines: list[str] = [] @@ -434,7 +441,7 @@ class H8536Emulator: return next_pc def _tick_peripherals(self, cycle_delta: int) -> None: - self.sci1.tick() + self.sci1.tick(cycle_delta) self._interval_counter += 1 if self.frt1_ocia_steps is None: self.frt1_ocia.tick(self.memory, cycle_delta) diff --git a/h8536/emulator/rx_divergence.py b/h8536/emulator/rx_divergence.py index 024e8a6..f591fc7 100644 --- a/h8536/emulator/rx_divergence.py +++ b/h8536/emulator/rx_divergence.py @@ -163,6 +163,8 @@ class RxDivergenceConfig: clock_hz: int = 10_000_000 uart_timing: bool = False uart_baud: int = 38_400 + uart_format: str = "8E1" + tx_wire_timing: bool = False p7_input: int = 0xFF p9_fast_path: bool = True p9_fast_input: int = 0xFF @@ -259,6 +261,9 @@ def run_rx_divergence( p9_fast_default_wrapper_success=config.p9_fast_optimistic_wrapper, p7_input=config.p7_input, eeprom_seed=config.eeprom_seed, + sci1_tx_timing=UartTiming.from_format(config.uart_format, baud=config.uart_baud) + if config.tx_wire_timing + else None, ) eeprom_load = config.eeprom_load if eeprom_load is not None and eeprom_load.is_file(): @@ -271,7 +276,8 @@ def run_rx_divergence( f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} " f"rx_serviceable={int(_rx_ready(emulator))} " f"sci1_priority={_sci1_priority(emulator)} interrupt_mask={_interrupt_mask(emulator)} " - f"clock_hz={emulator.clock_hz} p7_input={config.p7_input:#04x}" + f"clock_hz={emulator.clock_hz} p7_input={config.p7_input:#04x} " + f"uart_format={config.uart_format.upper()} tx_wire_timing={int(config.tx_wire_timing)}" ) heartbeat_summary = None @@ -303,6 +309,8 @@ def build_arg_parser() -> argparse.ArgumentParser: parser.add_argument("--wait-heartbeat-steps", type=int, default=RxDivergenceConfig.wait_heartbeat_steps) parser.add_argument("--uart-timing", action="store_true", help="inject bytes at UART character timing instead of waiting for RDRF clear") parser.add_argument("--uart-baud", type=parse_int, default=RxDivergenceConfig.uart_baud) + parser.add_argument("--uart-format", default=RxDivergenceConfig.uart_format, help="UART character format for timing; real RCP link is 8E1") + parser.add_argument("--tx-wire-timing", action="store_true", help="delay SCI1 TDRE/TXI by one modeled UART character after each TDR write") parser.add_argument("--post-frame-steps", type=int, default=RxDivergenceConfig.post_frame_steps) parser.add_argument("--per-byte-steps", type=int, default=RxDivergenceConfig.per_byte_steps) parser.add_argument("--clock-hz", type=parse_int, default=RxDivergenceConfig.clock_hz) @@ -341,6 +349,8 @@ def main(argv: list[str] | None = None) -> int: clock_hz=args.clock_hz, uart_timing=args.uart_timing, uart_baud=args.uart_baud, + uart_format=args.uart_format, + tx_wire_timing=args.tx_wire_timing, p7_input=args.p7_input, p9_fast_path=not args.no_p9_fast_path, p9_fast_input=args.p9_fast_input, @@ -363,7 +373,7 @@ def _trace_frame(emulator: H8536Emulator, frame: bytes, config: RxDivergenceConf steps_total = 0 if config.uart_timing: - timing = UartTiming(baud=config.uart_baud) + timing = UartTiming.from_format(config.uart_format, baud=config.uart_baud) steps_total, stopped_reason = _inject_frame_uart_timed( emulator, frame, diff --git a/h8536/emulator/rx_probe.py b/h8536/emulator/rx_probe.py index 641b300..6c31158 100644 --- a/h8536/emulator/rx_probe.py +++ b/h8536/emulator/rx_probe.py @@ -24,6 +24,7 @@ from .uart import UartTiming CHECKSUM_SEED = 0x5A FRAME_LENGTH = 6 +HEARTBEAT_FRAME = bytes.fromhex("0000000080DA") CONNECT_LCD_FRAMES = ( bytes.fromhex("04000040001E"), @@ -66,6 +67,12 @@ ACCESS_RANGES = ( (0xE400, 0xE401, "secondary_table_index_0000"), (0xE800, 0xE801, "current_table_index_0000"), (0xEC00, 0xEC01, "flag_table_index_0000"), + (0xE000, 0xE3FF, "primary_table_E000"), + (0xE400, 0xE7FF, "secondary_table_E400"), + (0xE800, 0xEBFF, "current_table_E800"), + (0xEC00, 0xEFFF, "flag_table_EC00"), + (0xF400, 0xF4FF, "eeprom_shadow_F400"), + (0xF7B0, 0xF82F, "persistent_record_ram"), (0xF200, 0xF201, "lcd_ports"), ) @@ -85,12 +92,30 @@ STATE_BYTES = { 0xFAA4: "rx_error_latch", 0xFAA5: "retry_or_gate_flags", 0xFAA6: "retry_counter", + 0xEC02: "EC00_flag_index_0002", + 0xEC04: "EC00_flag_index_0004", + 0xEC12: "EC00_flag_index_0012", + 0xEC13: "EC00_flag_index_0013", + 0xEC15: "EC00_flag_index_0015", + 0xEC82: "EC00_flag_index_0082", } STATE_WORDS = { 0xE000: "E000_index_0000_primary", + 0xE004: "E000_index_0002_primary", + 0xE008: "E000_index_0004_primary", + 0xE024: "E000_index_0012_primary", + 0xE026: "E000_index_0013_primary", + 0xE02A: "E000_index_0015_primary", + 0xE104: "E000_index_0082_primary", 0xE400: "E400_index_0000_secondary", 0xE800: "E800_index_0000_current", + 0xE804: "E800_index_0002_current", + 0xE808: "E800_index_0004_current", + 0xE824: "E800_index_0012_current", + 0xE826: "E800_index_0013_current", + 0xE82A: "E800_index_0015_current", + 0xE904: "E800_index_0082_current", 0xF860: "rx_frame_01", 0xF862: "rx_frame_23", 0xF864: "rx_frame_45", @@ -206,8 +231,13 @@ def run_rx_probe( boot_steps: int = 250_000, per_byte_steps: int = 5_000, post_frame_steps: int = 80_000, + post_frame_ms: int | None = None, + wait_heartbeats: int = 0, + wait_heartbeat_steps: int = 1_500_000, uart_timing: bool = False, uart_baud: int = 38_400, + uart_format: str = "8E1", + tx_wire_timing: bool = False, interval_steps: int = 512, frt1_ocia_steps: int | None = None, frt2_ocia_steps: int | None = None, @@ -232,6 +262,7 @@ def run_rx_probe( p9_fast_default_wrapper_success=p9_fast_optimistic_wrapper, p7_input=p7_input, eeprom_seed=eeprom_seed, + sci1_tx_timing=UartTiming.from_format(uart_format, baud=uart_baud) if tx_wire_timing else None, ) if eeprom_image is not None: emulator.memory.load_eeprom_image(eeprom_image) @@ -242,9 +273,24 @@ def run_rx_probe( f"boot={boot_reason} steps={boot_steps_used} pc={h16(emulator.cpu.pc)} " f"SCR={emulator.sci1.scr:02X} SSR={emulator.sci1.ssr:02X} " f"rx_serviceable={int(_rx_ready(emulator))} " - f"clock_hz={emulator.clock_hz} " + f"clock_hz={emulator.clock_hz} uart_format={uart_format.upper()} " + f"tx_wire_timing={int(tx_wire_timing)} " f"lcd_display={emulator.memory.lcd.display_text(lines=4)!r}" ) + if wait_heartbeats: + initial_heartbeats = sum(1 for frame in emulator.sci1.tx_frames if frame == HEARTBEAT_FRAME) + target_heartbeats = initial_heartbeats + wait_heartbeats + + def heartbeat_predicate(inner: H8536Emulator) -> bool: + return sum(1 for frame in inner.sci1.tx_frames if frame == HEARTBEAT_FRAME) >= target_heartbeats + + wait_context = RunContext() + wait_steps, wait_reason = _run_until(emulator, wait_heartbeat_steps, heartbeat_predicate, wait_context) + final_heartbeats = sum(1 for frame in emulator.sci1.tx_frames if frame == HEARTBEAT_FRAME) + boot_summary += ( + f" wait_heartbeats={wait_heartbeats} wait_reason={wait_reason} " + f"wait_steps={wait_steps} heartbeat_count={final_heartbeats}" + ) results = [ _run_frame( @@ -252,8 +298,10 @@ def run_rx_probe( frame, per_byte_steps=per_byte_steps, post_frame_steps=post_frame_steps, + post_frame_ms=post_frame_ms, uart_timing=uart_timing, uart_baud=uart_baud, + uart_format=uart_format, stop_after_tx_frame=stop_after_tx_frame, ) for frame in frames @@ -269,8 +317,13 @@ def build_arg_parser() -> argparse.ArgumentParser: parser.add_argument("--boot-steps", type=int, default=250_000, help="maximum steps to boot until SCI1 RXI is serviceable") parser.add_argument("--per-byte-steps", type=int, default=5_000, help="polite mode byte-consume limit, or UART mode step limit between byte arrivals") parser.add_argument("--post-frame-steps", type=int, default=80_000, help="maximum steps after a full injected frame") - parser.add_argument("--uart-timing", action="store_true", help="inject frame bytes at real 8N1 UART inter-byte timing instead of waiting for RDRF consumption") + parser.add_argument("--post-frame-ms", type=int, help="run this many emulated milliseconds after each injected frame") + parser.add_argument("--wait-heartbeats", type=int, default=0, help="wait for this many heartbeat frames before injecting the first host frame") + parser.add_argument("--wait-heartbeat-steps", type=int, default=1_500_000, help="maximum steps while waiting for pre-injection heartbeat frames") + parser.add_argument("--uart-timing", action="store_true", help="inject frame bytes at real UART inter-byte timing instead of waiting for RDRF consumption") parser.add_argument("--uart-baud", type=parse_int, default=38_400, help="baud rate for --uart-timing; 38400 gives about 260 us per 8N1 byte") + parser.add_argument("--uart-format", default="8E1", help="UART character format for timed RX/TX modeling; real RCP link is 8E1") + parser.add_argument("--tx-wire-timing", action="store_true", help="delay SCI1 TDRE/TXI by one modeled UART character after each TDR write") parser.add_argument("--keep-listening", action="store_true", help="use all post-frame steps instead of stopping at the first new TX frame") parser.add_argument("--interval-steps", type=int, default=512, help="rough step period for the scaffolded interval timer interrupt") parser.add_argument("--clock-hz", type=parse_int, default=10_000_000, help="CPU/phi clock in Hz for calibrated FRT timing") @@ -303,8 +356,13 @@ def main(argv: list[str] | None = None) -> int: boot_steps=args.boot_steps, per_byte_steps=args.per_byte_steps, post_frame_steps=args.post_frame_steps, + post_frame_ms=args.post_frame_ms, + wait_heartbeats=args.wait_heartbeats, + wait_heartbeat_steps=args.wait_heartbeat_steps, uart_timing=args.uart_timing, uart_baud=args.uart_baud, + uart_format=args.uart_format, + tx_wire_timing=args.tx_wire_timing, interval_steps=args.interval_steps, frt1_ocia_steps=args.frt1_ocia_steps, frt2_ocia_steps=args.frt2_ocia_steps, @@ -327,6 +385,13 @@ def main(argv: list[str] | None = None) -> int: for line in result.lines(index): print(line) print("total_tx_frames=" + " | ".join(format_frame(frame) for frame in emulator.sci1.tx_frames)) + eeprom_writes = emulator.memory.p9_bus.x24164_bus.write_log_lines(limit=80) + if eeprom_writes: + print("eeprom_writes:") + for line in eeprom_writes: + print(f" {line}") + else: + print("eeprom_writes=none") if args.eeprom_save: args.eeprom_save.parent.mkdir(parents=True, exist_ok=True) args.eeprom_save.write_bytes(emulator.memory.dump_eeprom_image()) @@ -352,8 +417,10 @@ def _run_frame( *, per_byte_steps: int, post_frame_steps: int, + post_frame_ms: int | None, uart_timing: bool, uart_baud: int, + uart_format: str, stop_after_tx_frame: bool, ) -> FrameResult: state_before = _state_snapshot(emulator) @@ -363,7 +430,7 @@ def _run_frame( context = RunContext() stopped_reason = "post_frame_steps" steps_total = 0 - timing = UartTiming(baud=uart_baud) + timing = UartTiming.from_format(uart_format, baud=uart_baud) if uart_timing: steps_total, stopped_reason = _inject_frame_uart_timed( @@ -391,9 +458,13 @@ def _run_frame( def post_predicate(inner: H8536Emulator) -> bool: return stop_after_tx_frame and len(inner.sci1.tx_frames) >= target_frame_count - steps, reason = _run_until(emulator, post_frame_steps, post_predicate, context) + if post_frame_ms is not None: + steps, reason = _run_cycles_for_ms(emulator, post_frame_ms, context) + stopped_reason = reason + else: + steps, reason = _run_until(emulator, post_frame_steps, post_predicate, context) + stopped_reason = "tx_frame" if reason == "predicate" and stop_after_tx_frame else reason steps_total += steps - stopped_reason = "tx_frame" if reason == "predicate" and stop_after_tx_frame else reason log_end = len(emulator.memory.access_log) state_after = _state_snapshot(emulator) @@ -475,6 +546,22 @@ def _run_until_cycle( return max(0, max_steps), "max_steps" +def _run_cycles_for_ms(emulator: H8536Emulator, delta_ms: int, context: RunContext) -> tuple[int, str]: + target_delta_cycles = int((max(0, delta_ms) * max(1, emulator.clock_hz)) / 1000) + target_cycles = emulator.cpu.cycles + target_delta_cycles + completed = 0 + while emulator.cpu.cycles < target_cycles: + pc = emulator.cpu.pc + context.record_pc(pc) + try: + emulator.step() + except UnsupportedInstruction as exc: + context.unsupported = str(exc) + return completed, "unsupported_instruction" + completed += 1 + return completed, f"post_frame_ms_{delta_ms}" + + def _rx_ready(emulator: H8536Emulator) -> bool: if not (emulator.sci1.scr & SCI_SCR_RIE and emulator.sci1.scr & SCI_SCR_RE): return False @@ -546,7 +633,8 @@ def _access_lines(accesses: list[MemoryAccess]) -> list[str]: lines = [] for access in interesting[:80]: label = _access_label(access.address) - lines.append(f"{access.kind:<5} {h16(access.address)} {access.value:02X} {label}") + pc = f" pc={h16(access.pc)}" if access.pc is not None else "" + lines.append(f"{access.kind:<5} {h16(access.address)} {access.value:02X} {label}{pc}") if len(interesting) > 80: lines.append(f"... {len(interesting) - 80} more interesting accesses") return lines diff --git a/h8536/emulator/sci.py b/h8536/emulator/sci.py index 1a6378e..dcfefba 100644 --- a/h8536/emulator/sci.py +++ b/h8536/emulator/sci.py @@ -17,6 +17,7 @@ from .constants import ( SCI_SSR_RDRF, SCI_SSR_TDRE, ) +from .uart import UartTiming @dataclass @@ -53,6 +54,8 @@ class SCI1: _frame_buffer: bytearray = field(default_factory=bytearray) tx_ready_delay: int = 0 tx_ready_ticks: int = 2 + clock_hz: int = 10_000_000 + tx_timing: UartTiming | None = None _tx_ready_pending: bool = False def read(self, address: int) -> int: @@ -96,7 +99,7 @@ class SCI1: if len(self._frame_buffer) == len(HEARTBEAT_FRAME): self.tx_frames.append(bytes(self._frame_buffer)) self._frame_buffer.clear() - self.tx_ready_delay = max(0, self.tx_ready_ticks) + self.tx_ready_delay = self._tx_ready_delay() self._tx_ready_pending = True self.tx_events.append(SciTxEvent(SCI1_TDR, value, self.scr, self.ssr, emitted)) @@ -116,9 +119,22 @@ class SCI1: def saw_heartbeat(self) -> bool: return HEARTBEAT_FRAME in self.tx_frames - def tick(self) -> None: + def configure_tx_timing(self, timing: UartTiming | None, *, clock_hz: int | None = None) -> None: + self.tx_timing = timing + if clock_hz is not None: + self.clock_hz = max(1, clock_hz) + + def tx_busy(self) -> bool: + return self._tx_ready_pending and self.tx_ready_delay > 0 + + def tick(self, cycles: int = 1) -> None: if self._tx_ready_pending and self.tx_ready_delay: - self.tx_ready_delay -= 1 + self.tx_ready_delay = max(0, self.tx_ready_delay - max(1, cycles)) if self._tx_ready_pending and self.tx_ready_delay == 0 and not (self.ssr & SCI_SSR_TDRE): self.ssr |= SCI_SSR_TDRE self._tx_ready_pending = False + + def _tx_ready_delay(self) -> int: + if self.tx_timing is None: + return max(0, self.tx_ready_ticks) + return self.tx_timing.cycles_per_character(self.clock_hz) diff --git a/h8536/emulator/uart.py b/h8536/emulator/uart.py index f87b2bd..6a70f47 100644 --- a/h8536/emulator/uart.py +++ b/h8536/emulator/uart.py @@ -7,10 +7,20 @@ from dataclasses import dataclass class UartTiming: baud: int = 38_400 data_bits: int = 8 - parity_bits: int = 0 + parity: str = "N" stop_bits: int = 1 start_bits: int = 1 + def __post_init__(self) -> None: + parity = self.parity.upper() + if parity not in {"N", "E", "O"}: + raise ValueError("parity must be N, E, or O") + object.__setattr__(self, "parity", parity) + + @property + def parity_bits(self) -> int: + return 0 if self.parity == "N" else 1 + @property def bits_per_character(self) -> int: return self.start_bits + self.data_bits + self.parity_bits + self.stop_bits @@ -26,10 +36,27 @@ class UartTiming: def summary(self, clock_hz: int) -> str: return ( - f"uart_{self.data_bits}{'N' if self.parity_bits == 0 else 'P'}{self.stop_bits} " + f"uart_{self.data_bits}{self.parity}{self.stop_bits} " f"baud={self.baud} byte_us={self.micros_per_character():.3f} " f"byte_cycles={self.cycles_per_character(clock_hz)}" ) + @classmethod + def from_format(cls, text: str, *, baud: int = 38_400) -> "UartTiming": + normalized = text.strip().upper() + if len(normalized) != 3 or normalized[0] not in "78" or normalized[1] not in "NEO" or normalized[2] not in "12": + raise ValueError(f"unsupported UART format {text!r}; expected 8E1, 8N1, 8O1, etc.") + return cls(baud=baud, data_bits=int(normalized[0]), parity=normalized[1], stop_bits=int(normalized[2])) + + @classmethod + def from_sci_smr(cls, smr: int, *, baud: int = 38_400) -> "UartTiming": + data_bits = 7 if smr & 0x40 else 8 + if smr & 0x20: + parity = "O" if smr & 0x10 else "E" + else: + parity = "N" + stop_bits = 2 if smr & 0x08 else 1 + return cls(baud=baud, data_bits=data_bits, parity=parity, stop_bits=stop_bits) + __all__ = ["UartTiming"] diff --git a/scripts/connect_ok_advance_sweep.py b/scripts/connect_ok_advance_sweep.py new file mode 100644 index 0000000..6e0dd0c --- /dev/null +++ b/scripts/connect_ok_advance_sweep.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""Bench runner for CONNECT: OK candidate advance/ACK sweeps.""" + +import sys +from pathlib import Path + + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from h8536.connect_ok_advance_sweep import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_connect_ok_advance_sweep.py b/tests/test_connect_ok_advance_sweep.py new file mode 100644 index 0000000..e6ad4e7 --- /dev/null +++ b/tests/test_connect_ok_advance_sweep.py @@ -0,0 +1,52 @@ +import io +import unittest + +from h8536.connect_ok_advance_sweep import build_cases, main, _matches_target + + +class ConnectOkAdvanceSweepTest(unittest.TestCase): + def test_core_suite_starts_with_ack_then_refresh(self): + cases = build_cases("core") + + self.assertEqual([case.name for case in cases], ["ack-0040", "refresh-ok"]) + self.assertEqual(cases[0].frame.hex().upper(), "05004000001F") + self.assertEqual(cases[1].frame.hex().upper(), "0400008000DE") + + def test_latch_suite_includes_special_clear_candidates(self): + names = [case.name for case in build_cases("latch")] + + self.assertIn("ack-0096", names) + self.assertIn("ack-00f8", names) + + def test_dry_run_defaults_to_reactive_active_report_window(self): + stdout = io.StringIO() + + exit_code = main(["--dry-run", "--suite", "core", "--limit", "1"], stdout=stdout) + + self.assertEqual(exit_code, 0) + output = stdout.getvalue() + self.assertIn("device=COM5 38400 8E1", output) + self.assertIn("target_mode=active", output) + self.assertIn("baseline[1]=04 00 00 80 00 DE checksum_ok=1", output) + self.assertIn("case[1]=ack-0040 frame=05 00 40 00 00 1F checksum_ok=1", output) + + def test_custom_candidate_accepts_five_bytes_and_computes_checksum(self): + stdout = io.StringIO() + + exit_code = main(["--dry-run", "--candidate", "probe=05 00 6D 00 00"], stdout=stdout) + + self.assertEqual(exit_code, 0) + self.assertIn("case[1]=probe frame=05 00 6D 00 00 32 checksum_ok=1", stdout.getvalue()) + + def test_active_target_ignores_heartbeat_but_accepts_report(self): + self.assertFalse(_matches_target(bytes.fromhex("0000000080DA"), "active")) + self.assertTrue(_matches_target(bytes.fromhex("02000200005A"), "active")) + self.assertTrue(_matches_target(bytes.fromhex("07804040A07D"), "active")) + + def test_connect_ok_target_requires_known_ok_response(self): + self.assertTrue(_matches_target(bytes.fromhex("02000200005A"), "connect-ok")) + self.assertFalse(_matches_target(bytes.fromhex("010012000049"), "connect-ok")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_emulator.py b/tests/test_emulator.py index 30fc3f6..56e1f7d 100644 --- a/tests/test_emulator.py +++ b/tests/test_emulator.py @@ -176,6 +176,21 @@ class EmulatorHarnessTest(unittest.TestCase): self.assertEqual(emulator.memory.read8(ON_CHIP_RAM_START), 0x99) + def test_memory_access_log_records_executing_pc(self): + rom = rom_with_reset(size=0x1010) + rom[0x1000:0x1005] = bytes([0x15, (ON_CHIP_RAM_START >> 8) & 0xFF, ON_CHIP_RAM_START & 0xFF, 0x06, 0x77]) + + emulator = H8536Emulator(bytes(rom)) + emulator.run(max_steps=1) + + writes = [ + access + for access in emulator.memory.access_log + if access.kind == "write" and access.address == ON_CHIP_RAM_START + ] + self.assertEqual(writes[-1].value, 0x77) + self.assertEqual(writes[-1].pc, 0x1000) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_emulator_sci_timing.py b/tests/test_emulator_sci_timing.py index 702d351..9cab5c1 100644 --- a/tests/test_emulator_sci_timing.py +++ b/tests/test_emulator_sci_timing.py @@ -83,6 +83,35 @@ class SciTimingTest(unittest.TestCase): self.assertAlmostEqual(timing.micros_per_character(), 260.416666, places=3) self.assertEqual(timing.cycles_per_character(10_000_000), 2604) + def test_uart_8e1_38400_byte_timing_matches_bench_link(self): + timing = UartTiming.from_format("8E1", baud=38_400) + + self.assertEqual(timing.bits_per_character, 11) + self.assertAlmostEqual(timing.micros_per_character(), 286.458333, places=3) + self.assertEqual(timing.cycles_per_character(10_000_000), 2865) + self.assertEqual(timing.summary(10_000_000).split()[0], "uart_8E1") + + def test_uart_timing_can_be_derived_from_sci_smr(self): + timing = UartTiming.from_sci_smr(0x24, baud=38_400) + + self.assertEqual((timing.data_bits, timing.parity, timing.stop_bits), (8, "E", 1)) + + def test_tdr_write_can_use_uart_character_time_for_tdre(self): + sci = SCI1() + sci.configure_tx_timing(UartTiming.from_format("8E1", baud=38_400), clock_hz=10_000_000) + sci.write(SCI1_SCR, sci.read(SCI1_SCR) | SCI_SCR_TE) + + sci.write(SCI1_TDR, 0x42) + sci.write(SCI1_SSR, sci.read(SCI1_SSR) & ~SCI_SSR_TDRE) + + self.assertTrue(sci.tx_busy()) + self.assertFalse(sci.read(SCI1_SSR) & SCI_SSR_TDRE) + sci.tick(2864) + self.assertFalse(sci.read(SCI1_SSR) & SCI_SSR_TDRE) + sci.tick(1) + self.assertTrue(sci.read(SCI1_SSR) & SCI_SSR_TDRE) + self.assertFalse(sci.tx_busy()) + if __name__ == "__main__": unittest.main()