1
0
Files
h8-536-decoder/h8536/state_map_runner.py
2026-05-26 13:16:50 +10:00

740 lines
32 KiB
Python

from __future__ import annotations
import argparse
import json
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, TextIO
from .bench_connect_lcd import (
BenchLogger,
FrameDetector,
add_serial_format_args,
_import_serial,
open_device_serial,
_relay_command,
_relay_settle,
_wait_for_ready,
format_frame,
frame_checksum,
frame_checksum_ok,
label_frame,
parse_frame,
serial_format_label,
)
READBACK_E000_FRAME = bytes.fromhex("01000000005B")
COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
CONNECT_FORCE_PRESETS: dict[str, tuple[bytes, int, str]] = {
"clear": (bytes.fromhex("04000000005E"), 0x0080, "selector-zero no-bit clear/inactive primer"),
"dxc": (bytes.fromhex("04000040001E"), 0x4080, "selector-zero CONNECT:DXC-637 candidate"),
"ok": (bytes.fromhex("0400008000DE"), 0x8080, "selector-zero CONNECT: OK candidate"),
"both": (bytes.fromhex("040000C0009E"), 0xC080, "selector-zero bit14+bit15 priority test"),
}
@dataclass(frozen=True)
class StateMapEvent:
direction: str
frame: bytes
timestamp_ms: int | None = None
label: str = ""
source: str = ""
@property
def frame_text(self) -> str:
return format_frame(self.frame)
@dataclass
class StateMapRunContext:
args: argparse.Namespace
logger: BenchLogger
detector: FrameDetector
device: Any
relay: Any | None = None
events: list[StateMapEvent] = field(default_factory=list)
def default_log_path() -> Path:
return Path("captures") / f"state-map-runner-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Run or analyze the PT2 state-map proof sequence: visible 07 drain candidate, "
"selector-zero force, E000[0] readback, and command-7 hidden-response probe."
)
)
parser.add_argument("--analyze-log", type=Path, help="analyze an existing bench log instead of opening serial ports")
parser.add_argument("--json-out", type=Path, help="write machine-readable state-map analysis")
parser.add_argument("--preset", choices=sorted(CONNECT_FORCE_PRESETS), default="ok", help="selector-zero force preset")
parser.add_argument("--force-frame", type=parse_frame, help="override preset with a custom selector-zero command-4 frame")
parser.add_argument("--expected-word", type=_int_arg, help="expected E000[0] readback word; default follows --preset")
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
add_serial_format_args(parser)
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before priming")
parser.add_argument("--prime-frame", action="append", type=parse_frame, help="optional trigger/tickle frame; repeatable")
parser.add_argument("--prime-repeat", type=int, default=0, help="times to send each prime frame while hunting a trigger")
parser.add_argument("--prime-gap", type=float, default=0.120, help="seconds to listen after each prime frame")
parser.add_argument("--trigger-timeout", type=float, default=3.0, help="seconds to wait for a visible 07 drain candidate")
parser.add_argument("--trigger-prefix", default="07", help="hex prefix for the trigger frame; default any device 07...")
parser.add_argument(
"--trigger-poll-interval",
type=float,
default=0.002,
help="seconds between non-blocking serial polls while hunting a trigger",
)
parser.add_argument(
"--read-poll-interval",
type=float,
default=0.002,
help="seconds between non-blocking serial polls during timed listen/guard windows",
)
parser.add_argument("--force-guard", type=float, default=0.005, help="seconds after the detected trigger before force TX")
parser.add_argument("--post-force-listen", type=float, default=0.050, help="seconds to listen before readback")
parser.add_argument("--readback-frame", type=parse_frame, default=READBACK_E000_FRAME, help="E000[0] readback frame")
parser.add_argument("--readback-window", type=float, default=0.300, help="seconds to listen after readback")
parser.add_argument("--no-command7-probe", action="store_true", help="skip command-7 previous-frame probe")
parser.add_argument("--command7-window", type=float, default=0.300, help="seconds to listen after command-7 probe")
parser.add_argument("--final-read", type=float, default=2.0, help="seconds to listen after the proof sequence")
parser.add_argument("--prompt-screen", action="store_true", help="prompt for observed LCD text after the sequence")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--dry-run", action="store_true", help="print the planned state-map sequence without opening ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
force_frame, expected_word, preset_note = resolve_force(args)
if args.analyze_log:
text = args.analyze_log.read_text(encoding="utf-8")
events = parse_bench_log(text)
analysis = analyze_events(events, expected_word=expected_word)
print(format_analysis_report(analysis), file=stdout)
if args.json_out:
_write_json(args.json_out, analysis)
return 0
log_path = args.log or default_log_path()
if args.dry_run:
_print_dry_run(args, log_path, force_frame, expected_word, preset_note, stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector(sync_mode=args.sync)
try:
logger.emit("PT2 state-map proof runner")
logger.emit(
f"device={args.port} {args.baud} {serial_format_label(args)} "
f"relay={args.relay_port} {args.relay_baud} sync={args.sync}"
)
logger.emit(f"log={log_path}")
_emit_plan(logger, args, force_frame, expected_word, preset_note)
with open_device_serial(serial, args) as device:
ctx = StateMapRunContext(args=args, logger=logger, detector=detector, device=device)
try:
_prepare_device(ctx)
trigger = _hunt_trigger(ctx, _parse_prefix(args.trigger_prefix))
if trigger is None:
logger.event("STATE no visible-drain token candidate; force/readback skipped")
_read_for_collect(ctx, args.final_read)
return _finish(ctx, logger, expected_word, args.json_out)
logger.event(f"STATE visible-drain token candidate {trigger.frame_text}")
if args.force_guard > 0:
logger.event(f"STATE force guard {args.force_guard:.3f}s")
_read_for_collect(ctx, args.force_guard)
_send_and_record(ctx, force_frame, "selector_zero_force")
if args.post_force_listen > 0:
_read_for_collect(ctx, args.post_force_listen)
_send_and_record(ctx, args.readback_frame, "e000_readback")
_read_for_collect(ctx, args.readback_window)
if not args.no_command7_probe:
_send_and_record(ctx, COMMAND7_REPEAT_FRAME, "command7_previous_frame_probe")
_read_for_collect(ctx, args.command7_window)
_read_for_collect(ctx, args.final_read)
if args.prompt_screen:
_prompt_screen("LCD after state-map proof sequence", logger)
finally:
if ctx.relay is not None:
ctx.relay.close()
return _finish(ctx, logger, expected_word, args.json_out)
finally:
logger.close()
def resolve_force(args: argparse.Namespace) -> tuple[bytes, int, str]:
preset_frame, preset_word, preset_note = CONNECT_FORCE_PRESETS[args.preset]
frame = args.force_frame or preset_frame
expected_word = args.expected_word if args.expected_word is not None else _expected_word_for_force(frame, preset_word)
return frame, expected_word, preset_note
def analyze_events(events: Iterable[StateMapEvent], *, expected_word: int | None = None) -> dict[str, Any]:
event_list = list(events)
trigger_indexes = [index for index, event in enumerate(event_list) if is_visible_drain_candidate(event)]
first_trigger = trigger_indexes[0] if trigger_indexes else None
force_indexes = [index for index, event in enumerate(event_list) if is_selector_zero_force(event)]
first_force_after_trigger = _first_after(force_indexes, first_trigger)
readback_tx_indexes = [index for index, event in enumerate(event_list) if is_tx_frame(event, READBACK_E000_FRAME)]
command7_tx_indexes = [index for index, event in enumerate(event_list) if is_tx_frame(event, COMMAND7_REPEAT_FRAME)]
readback_rx_indexes = [index for index, event in enumerate(event_list) if is_selector_zero_readback(event)]
direct_readbacks = [_readback_info(event_list[index], expected_word) for index in readback_rx_indexes]
first_direct_after_force = _first_after(readback_rx_indexes, first_force_after_trigger)
command7_after_force = _first_after(command7_tx_indexes, first_force_after_trigger)
command7_replay = _first_after(readback_rx_indexes, command7_after_force) if command7_after_force is not None else None
warnings = _state_warnings(event_list, first_trigger, first_force_after_trigger)
outcome = _outcome(
first_trigger=first_trigger,
first_force=first_force_after_trigger,
first_direct=first_direct_after_force,
command7_replay=command7_replay,
events=event_list,
expected_word=expected_word,
)
facts = {
"kind": "pt2_state_map_analysis",
"event_count": len(event_list),
"expected_word": _word_payload(expected_word),
"outcome": outcome,
"warnings": warnings,
"trigger_candidates": [_event_payload(event_list[index]) for index in trigger_indexes],
"selector_zero_forces": [_event_payload(event_list[index]) for index in force_indexes],
"readback_tx": [_event_payload(event_list[index]) for index in readback_tx_indexes],
"command7_tx": [_event_payload(event_list[index]) for index in command7_tx_indexes],
"direct_readbacks": direct_readbacks,
"first_trigger_index": first_trigger,
"first_force_after_trigger_index": first_force_after_trigger,
"first_direct_readback_after_force_index": first_direct_after_force,
"command7_replay_readback_index": command7_replay,
"post_force_rx_labels": _post_force_rx_labels(event_list, first_force_after_trigger),
"state_map_notes": [
"A device 07... frame is treated as a visible F870 drain/token candidate, not proof by itself.",
"The proof target is a retained selector-zero readback: RX 04 00 QQ HH LL after the force/readback turn.",
"Command 0/1 or overlapping RX before the force can spend or clear the FAA2/FAA3 opportunity.",
],
}
return facts
def format_analysis_report(analysis: dict[str, Any]) -> str:
lines = [
"PT2 state-map analysis",
f"outcome={analysis['outcome']['name']} confidence={analysis['outcome']['confidence']}",
f"reason={analysis['outcome']['reason']}",
f"events={analysis['event_count']} triggers={len(analysis['trigger_candidates'])} "
f"forces={len(analysis['selector_zero_forces'])} readbacks={len(analysis['direct_readbacks'])}",
]
expected = analysis.get("expected_word") or {}
if expected:
lines.append(f"expected_e0000={expected['hex']}")
if analysis["trigger_candidates"]:
first = analysis["trigger_candidates"][0]
lines.append(f"first_trigger={first['frame']} label={first['label'] or '(unlabeled)'}")
if analysis["selector_zero_forces"]:
first = analysis["selector_zero_forces"][0]
lines.append(f"first_force={first['frame']}")
for readback in analysis["direct_readbacks"]:
match = " expected" if readback.get("matches_expected") else ""
lines.append(
f"readback frame={readback['frame']} qq=0x{readback['qq']:02X} "
f"value={readback['value_hex']}{match}"
)
for warning in analysis["warnings"]:
lines.append(f"warning={warning}")
labels = analysis.get("post_force_rx_labels", {})
if labels:
joined = ", ".join(f"{name}={count}" for name, count in sorted(labels.items()))
lines.append(f"post_force_rx={joined}")
return "\n".join(lines)
def parse_bench_log(text: str) -> list[StateMapEvent]:
lines = text.splitlines()
events: list[StateMapEvent] = []
rx_detect_seen = False
for line in lines:
detect = _DETECT_RE.match(line.strip())
if detect:
rx_detect_seen = True
frame = _parse_hex_bytes(detect.group("hex"))
if len(frame) == 6:
events.append(
StateMapEvent(
direction="rx",
frame=frame,
timestamp_ms=_timestamp_to_ms(detect.group("ts")),
label=detect.group("label"),
source="detect",
)
)
continue
chunk = _CHUNK_RE.match(line.strip())
if not chunk or chunk.group("direction") != "TX":
continue
frame = _parse_hex_bytes(chunk.group("hex"))
if len(frame) == 6:
events.append(
StateMapEvent(
direction="tx",
frame=frame,
timestamp_ms=_timestamp_to_ms(chunk.group("ts")),
label="",
source="tx_chunk",
)
)
if rx_detect_seen:
return events
detector = FrameDetector()
for line in lines:
chunk = _CHUNK_RE.match(line.strip())
if not chunk or chunk.group("direction") != "RX":
continue
data = _parse_hex_bytes(chunk.group("hex"))
for frame, label in detector.feed(data):
events.append(
StateMapEvent(
direction="rx",
frame=frame,
timestamp_ms=_timestamp_to_ms(chunk.group("ts")),
label=label,
source="rx_chunk_resync",
)
)
return events
def is_visible_drain_candidate(event: StateMapEvent) -> bool:
return event.direction == "rx" and len(event.frame) == 6 and frame_checksum_ok(event.frame) and event.frame[0] == 0x07
def is_selector_zero_force(event: StateMapEvent) -> bool:
return (
event.direction == "tx"
and len(event.frame) == 6
and frame_checksum_ok(event.frame)
and event.frame[0] == 0x04
and event.frame[1] == 0x00
and event.frame[2] == 0x00
)
def is_selector_zero_readback(event: StateMapEvent) -> bool:
return (
event.direction == "rx"
and len(event.frame) == 6
and frame_checksum_ok(event.frame)
and event.frame[0] == 0x04
and event.frame[1] == 0x00
)
def is_tx_frame(event: StateMapEvent, frame: bytes) -> bool:
return event.direction == "tx" and event.frame == frame
def _prepare_device(ctx: StateMapRunContext) -> None:
args = ctx.args
if not args.no_power_cycle:
serial = _import_serial()
ctx.relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(ctx.relay, args.relay_settle, ctx.logger)
_relay_command(ctx.relay, args.power_off_command, ctx.logger)
time.sleep(args.off_seconds)
ctx.device.reset_input_buffer()
ctx.detector = FrameDetector(sync_mode=args.sync)
_relay_command(ctx.relay, args.power_on_command, ctx.logger)
else:
ctx.device.reset_input_buffer()
ready = _wait_for_ready(ctx.device, ctx.detector, ctx.logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
raise SystemExit(2)
if args.pre_drain > 0:
ctx.logger.event(f"STATE pre-drain {args.pre_drain:.3f}s")
_read_for_collect(ctx, args.pre_drain)
def _hunt_trigger(ctx: StateMapRunContext, prefix: bytes) -> StateMapEvent | None:
args = ctx.args
primes = list(args.prime_frame or [])
for repeat_index in range(max(0, args.prime_repeat)):
for prime_index, prime in enumerate(primes, start=1):
ctx.logger.event(f"STATE prime {repeat_index + 1}/{args.prime_repeat}.{prime_index}")
_send_and_record(ctx, prime, "prime")
trigger = _read_until_trigger(ctx, args.prime_gap, prefix)
if trigger is not None:
return trigger
return _read_until_trigger(ctx, args.trigger_timeout, prefix)
def _read_until_trigger(ctx: StateMapRunContext, seconds: float, prefix: bytes) -> StateMapEvent | None:
ctx.logger.event(f"STATE wait visible-drain prefix={format_frame(prefix)} timeout={seconds:.3f}s")
deadline = time.monotonic() + max(0.0, seconds)
poll_interval = max(0.001, float(getattr(ctx.args, "trigger_poll_interval", 0.002)))
while time.monotonic() < deadline:
events = _read_available_for_collect(ctx)
for event in events:
if is_visible_drain_candidate(event) and event.frame.startswith(prefix):
return event
remaining = deadline - time.monotonic()
if remaining > 0:
time.sleep(min(poll_interval, remaining))
return None
def _read_for_collect(ctx: StateMapRunContext, seconds: float) -> list[StateMapEvent]:
observed: list[StateMapEvent] = []
deadline = time.monotonic() + max(0.0, seconds)
poll_interval = max(0.001, float(getattr(ctx.args, "read_poll_interval", 0.002)))
while time.monotonic() < deadline:
events = _read_available_for_collect(ctx)
if events:
observed.extend(events)
continue
remaining = deadline - time.monotonic()
if remaining > 0:
time.sleep(min(poll_interval, remaining))
return observed
def _read_available_for_collect(ctx: StateMapRunContext) -> list[StateMapEvent]:
waiting = getattr(ctx.device, "in_waiting", 0)
if waiting <= 0:
return []
return _record_rx_data(ctx, ctx.device.read(waiting))
def _record_rx_data(ctx: StateMapRunContext, data: bytes) -> list[StateMapEvent]:
observed: list[StateMapEvent] = []
dropped_before = ctx.detector.dropped_bytes
ctx.logger.chunk("RX", data)
for frame, label in ctx.detector.feed(data):
event = StateMapEvent(
direction="rx",
frame=frame,
timestamp_ms=_now_ms(),
label=label,
source="live",
)
ctx.events.append(event)
observed.append(event)
state_label = _state_frame_label(event)
ctx.logger.event(f"DETECT {label} {format_frame(frame)}")
if state_label:
ctx.logger.event(f"STATE_FRAME {state_label} {format_frame(frame)}")
dropped_now = ctx.detector.dropped_bytes - dropped_before
if dropped_now:
ctx.logger.event(
f"RESYNC dropped_bytes={dropped_now} total_dropped={ctx.detector.dropped_bytes} "
f"buffered={len(ctx.detector.buffer)}"
)
return observed
def _send_and_record(ctx: StateMapRunContext, frame: bytes, label: str) -> None:
ctx.device.write(frame)
ctx.device.flush()
ctx.logger.chunk("TX", frame)
ctx.logger.event(f"SENT {label} checksum_ok={int(frame_checksum_ok(frame))}")
ctx.events.append(
StateMapEvent(direction="tx", frame=frame, timestamp_ms=_now_ms(), label=label, source="live")
)
def _finish(ctx: StateMapRunContext, logger: BenchLogger, expected_word: int | None, json_path: Path | None) -> int:
analysis = analyze_events(ctx.events, expected_word=expected_word)
logger.emit()
logger.emit(format_analysis_report(analysis))
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(ctx.detector.frames)} trailing_unframed_bytes={len(ctx.detector.buffer)}")
logger.emit(f"resync_events={ctx.detector.resync_events} dropped_bytes={ctx.detector.dropped_bytes}")
for label, count in sorted(ctx.detector.labels.items()):
logger.emit(f"{label}={count}")
if json_path:
_write_json(json_path, analysis)
return 0
def _outcome(
*,
first_trigger: int | None,
first_force: int | None,
first_direct: int | None,
command7_replay: int | None,
events: list[StateMapEvent],
expected_word: int | None,
) -> dict[str, str]:
if first_trigger is None:
return {
"name": "no_visible_drain_token",
"confidence": "high",
"reason": "No device 07... frame was observed, so the alternate RX opportunity was not demonstrated.",
}
if first_force is None:
return {
"name": "token_observed_but_not_forced",
"confidence": "high",
"reason": "A device 07... frame was observed, but no selector-zero command-4 force followed it.",
}
readback_index = first_direct if first_direct is not None else command7_replay
if readback_index is not None:
value = _readback_value(events[readback_index].frame)
if expected_word is not None and value == expected_word:
return {
"name": "selector_zero_retained",
"confidence": "high",
"reason": f"E000[0] readback matched expected 0x{expected_word:04X}.",
}
return {
"name": "selector_zero_readback_unexpected",
"confidence": "medium",
"reason": f"A selector-zero readback appeared, but value 0x{value:04X} did not match the expected word.",
}
if _only_heartbeat_after_force(events, first_force):
return {
"name": "force_not_proven_heartbeat_only",
"confidence": "medium",
"reason": "After the force/readback turn, only heartbeat frames were observed.",
}
return {
"name": "force_not_proven",
"confidence": "medium",
"reason": "No direct or command-7-recovered selector-zero readback was observed after the force.",
}
def _state_warnings(events: list[StateMapEvent], trigger_index: int | None, force_index: int | None) -> list[str]:
warnings: list[str] = []
if trigger_index is None or force_index is None:
return warnings
trigger = events[trigger_index]
force = events[force_index]
if trigger.timestamp_ms is not None and force.timestamp_ms is not None:
guard_ms = force.timestamp_ms - trigger.timestamp_ms
if guard_ms < 2:
warnings.append(f"force_guard_short_{guard_ms}ms_may_overlap_TXI")
between = events[trigger_index + 1 : force_index]
for event in between:
if event.direction != "tx" or not event.frame:
continue
command = event.frame[0]
if command == 0x00:
warnings.append("command0_between_trigger_and_force_can_destroy_token")
elif command == 0x01:
warnings.append("command1_readback_between_trigger_and_force_can_spend_token")
elif command in {0x04, 0x05, 0x06}:
warnings.append(f"command{command}_between_trigger_and_force_can_spend_alternate_tail")
return sorted(set(warnings))
def _post_force_rx_labels(events: list[StateMapEvent], force_index: int | None) -> dict[str, int]:
if force_index is None:
return {}
counts: dict[str, int] = {}
for event in events[force_index + 1 :]:
if event.direction != "rx":
continue
label = _state_frame_label(event) or event.label or label_frame(event.frame) or "rx_unlabeled"
counts[label] = counts.get(label, 0) + 1
return counts
def _state_frame_label(event: StateMapEvent) -> str:
if event.direction == "rx" and is_visible_drain_candidate(event):
return "visible_drain_token_candidate"
if event.direction == "rx" and is_selector_zero_readback(event):
return "selector_zero_readback_proof_candidate"
if event.direction == "tx" and is_selector_zero_force(event):
return "selector_zero_force"
if event.direction == "tx" and event.frame == READBACK_E000_FRAME:
return "e000_readback_probe"
if event.direction == "tx" and event.frame == COMMAND7_REPEAT_FRAME:
return "command7_previous_frame_probe"
return ""
def _readback_info(event: StateMapEvent, expected_word: int | None = None) -> dict[str, Any]:
value = _readback_value(event.frame)
return {
"frame": event.frame_text,
"timestamp_ms": event.timestamp_ms,
"qq": event.frame[2],
"value": value,
"value_hex": f"0x{value:04X}",
"matches_expected": expected_word is not None and value == expected_word,
}
def _readback_value(frame: bytes) -> int:
return ((frame[3] << 8) | frame[4]) & 0xFFFF
def _first_after(indexes: list[int], anchor: int | None) -> int | None:
if anchor is None:
return indexes[0] if indexes else None
for index in indexes:
if index > anchor:
return index
return None
def _only_heartbeat_after_force(events: list[StateMapEvent], force_index: int) -> bool:
rx_after = [event for event in events[force_index + 1 :] if event.direction == "rx"]
return bool(rx_after) and all(event.frame == HEARTBEAT_FRAME for event in rx_after)
def _event_payload(event: StateMapEvent) -> dict[str, Any]:
return {
"direction": event.direction,
"frame": event.frame_text,
"timestamp_ms": event.timestamp_ms,
"label": event.label,
"state_label": _state_frame_label(event),
"source": event.source,
}
def _word_payload(word: int | None) -> dict[str, Any] | None:
if word is None:
return None
return {"value": word & 0xFFFF, "hex": f"0x{word & 0xFFFF:04X}"}
def _emit_plan(logger: BenchLogger, args: argparse.Namespace, force_frame: bytes, expected_word: int, preset_note: str) -> None:
logger.emit(f"preset={args.preset} note={preset_note}")
logger.emit(f"force={format_frame(force_frame)} checksum_ok={int(frame_checksum_ok(force_frame))}")
logger.emit(f"expected_e0000=0x{expected_word:04X}")
logger.emit(f"readback={format_frame(args.readback_frame)} checksum_ok={int(frame_checksum_ok(args.readback_frame))}")
logger.emit(f"command7_probe={int(not args.no_command7_probe)} frame={format_frame(COMMAND7_REPEAT_FRAME)}")
logger.emit("guardrails=no command-0/command-1 is sent between trigger and force by this runner")
def _print_dry_run(
args: argparse.Namespace,
log_path: Path,
force_frame: bytes,
expected_word: int,
preset_note: str,
stdout: TextIO,
) -> None:
print("PT2 state-map proof runner", file=stdout)
print(f"device={args.port} {args.baud} {serial_format_label(args)}", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout)
print(f"preset={args.preset} note={preset_note}", file=stdout)
print(f"force={format_frame(force_frame)} checksum_ok={int(frame_checksum_ok(force_frame))}", file=stdout)
print(f"expected_e0000=0x{expected_word:04X}", file=stdout)
print(f"readback={format_frame(args.readback_frame)} checksum_ok={int(frame_checksum_ok(args.readback_frame))}", file=stdout)
for prime in args.prime_frame or []:
print(f"prime={format_frame(prime)} checksum_ok={int(frame_checksum_ok(prime))}", file=stdout)
print(f"prime_repeat={args.prime_repeat} prime_gap={args.prime_gap:.3f}", file=stdout)
print(f"trigger_prefix={format_frame(_parse_prefix(args.trigger_prefix))} timeout={args.trigger_timeout:.3f}", file=stdout)
print(f"force_guard={args.force_guard:.3f} post_force_listen={args.post_force_listen:.3f}", file=stdout)
print(f"command7_probe={int(not args.no_command7_probe)} frame={format_frame(COMMAND7_REPEAT_FRAME)}", file=stdout)
print(f"log={log_path}", file=stdout)
def _expected_word_for_force(frame: bytes, default: int) -> int:
if len(frame) == 6 and frame_checksum_ok(frame) and frame[0] == 0x04 and frame[1] == 0 and frame[2] == 0:
return ((frame[3] << 8) | 0x0080) & 0xFFFF
return default
def _parse_prefix(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
if not normalized:
return b""
parts = normalized.split()
if len(parts) == 1:
compact = parts[0]
if compact.lower().startswith("0x"):
compact = compact[2:]
if compact.upper().startswith("H'"):
compact = compact[2:]
if len(compact) % 2:
compact = "0" + compact
return bytes(int(compact[index : index + 2], 16) for index in range(0, len(compact), 2))
return bytes(int(part, 16) for part in parts)
def _parse_hex_bytes(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
if not normalized:
return b""
return bytes(int(part, 16) for part in normalized.split())
def _timestamp_to_ms(text: str) -> int:
hour, minute, rest = text.split(":")
second, milli = rest.split(".")
return ((int(hour) * 60 + int(minute)) * 60 + int(second)) * 1000 + int(milli)
def _now_ms() -> int:
now = datetime.now()
return ((now.hour * 60 + now.minute) * 60 + now.second) * 1000 + now.microsecond // 1000
def _int_arg(text: str) -> int:
return int(text, 0)
def _prompt_screen(label: str, logger: BenchLogger) -> None:
note = input(f"{label}: type observed LCD text, or press Enter to skip: ").strip()
logger.event(f"SCREEN {label}: {note or '(no note)'}")
def _write_json(path: Path, analysis: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
_CHUNK_RE = re.compile(
r"^(?P<ts>\d\d:\d\d:\d\d\.\d{3})\s+(?P<direction>TX|RX)\s+\d+\s+bytes\s+(?P<hex>[0-9A-Fa-f ]+)$"
)
_DETECT_RE = re.compile(
r"^(?P<ts>\d\d:\d\d:\d\d\.\d{3})\s+DETECT\s+(?P<label>\S+)\s+(?P<hex>[0-9A-Fa-f ]+)$"
)
__all__ = [
"COMMAND7_REPEAT_FRAME",
"CONNECT_FORCE_PRESETS",
"READBACK_E000_FRAME",
"StateMapEvent",
"analyze_events",
"build_arg_parser",
"format_analysis_report",
"main",
"parse_bench_log",
"resolve_force",
]