1
0

RX-tx understanding

This commit is contained in:
Aiden
2026-05-26 10:48:39 +10:00
parent d1d924c408
commit 421c9f4567
13 changed files with 3968 additions and 0 deletions

View File

@@ -0,0 +1,393 @@
from __future__ import annotations
import argparse
import itertools
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, TextIO
from ..formatting import h16, parse_int
from .cli import load_rom
from .errors import UnsupportedInstruction
from .runner import H8536Emulator
from .rx_probe import RunContext, _run_until, _rx_ready
CONNECT_WORDS = (0x0000, 0x0080, 0x4080, 0x8080, 0xC080)
F730_LATCH_VALUES = (0x00, 0x01, 0x41, 0x81, 0xC1)
PRESET_DESCRIPTIONS = {
"connect-branch": "Start at loc_2CB9 with E000[0]/F730 matrix patches.",
"connect-queue": "Queue selector zero in F970, start at loc_2806, then enter loc_2CB9 through the ROM dispatch.",
"custom": "Use only --byte/--word/--matrix-* patches and --pc.",
}
@dataclass(frozen=True)
class StatePatch:
size: int
address: int
value: int
source: str = "user"
def label(self) -> str:
kind = "byte" if self.size == 1 else "word"
width = 2 if self.size == 1 else 4
return f"{kind}:{h16(self.address)}=0x{self.value:0{width}X}"
@dataclass(frozen=True)
class SearchCase:
patches: tuple[StatePatch, ...]
pc: int
def name(self) -> str:
return ", ".join(patch.label() for patch in self.patches)
@dataclass(frozen=True)
class SearchResult:
case_index: int
patches: tuple[StatePatch, ...]
pc: int
steps: int
stopped_reason: str
final_pc: int
display: str
line0: str
outcome: str
f730: int
e000: int
f9b4: int
f9b9: int
unsupported: str | None = None
def payload(self) -> dict[str, object]:
return {
"case_index": self.case_index,
"patches": [patch.label() for patch in self.patches],
"pc": h16(self.pc),
"steps": self.steps,
"stopped_reason": self.stopped_reason,
"final_pc": h16(self.final_pc),
"display": self.display,
"line0": self.line0,
"outcome": self.outcome,
"f730": f"0x{self.f730:02X}",
"e000": f"0x{self.e000:04X}",
"f9b4": f"0x{self.f9b4:02X}",
"f9b9": f"0x{self.f9b9:02X}",
"unsupported": self.unsupported,
}
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Patch internal emulator state and search for CONNECT LCD outcomes. "
"This is bounded ROM execution, not an unbounded serial fuzzer."
)
)
parser.add_argument("--preset", choices=sorted(PRESET_DESCRIPTIONS), default="connect-queue")
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN when present")
parser.add_argument("--pc", type=parse_int, help="entry PC for custom/direct tests")
parser.add_argument("--stop-pc", type=parse_int, default=0xFFFF, help="sentinel PC used to stop after RTS")
parser.add_argument("--stack", type=parse_int, default=0xFF00, help="temporary stack pointer for direct function entry")
parser.add_argument("--boot-steps", type=int, default=250_000, help="steps to boot before patching state")
parser.add_argument("--max-steps", type=int, default=120_000, help="steps to run each patched case")
parser.add_argument("--clock-hz", type=parse_int, default=10_000_000)
parser.add_argument("--interval-steps", type=int, default=512)
parser.add_argument("--frt1-ocia-steps", type=int, default=None)
parser.add_argument("--frt2-ocia-steps", type=int, default=None)
parser.add_argument("--no-p9-fast-path", action="store_true")
parser.add_argument("--p9-fast-input", type=parse_int, default=0xFF)
parser.add_argument("--p9-fast-optimistic-wrapper", action="store_true")
parser.add_argument("--p7-input", type=parse_int, default=0xFF)
parser.add_argument("--eeprom-seed", choices=("blank", "factory"), default="blank")
parser.add_argument("--byte", action="append", default=[], help="fixed byte patch, e.g. F730=0")
parser.add_argument("--word", action="append", default=[], help="fixed word patch, e.g. E000=0x8080")
parser.add_argument("--matrix-byte", action="append", default=[], help="byte matrix patch, e.g. F730=0,1,0x41")
parser.add_argument("--matrix-word", action="append", default=[], help="word matrix patch, e.g. E000=0x4080,0x8080")
parser.add_argument("--target", choices=("ok", "dxc", "not-act", "any-connect", "changed"), default="ok")
parser.add_argument("--first-hit", action="store_true", help="stop after the first target hit")
parser.add_argument("--show-all", action="store_true", help="print every case, not only hits/non-baseline outcomes")
parser.add_argument("--limit", type=int, help="maximum number of cases to run")
parser.add_argument("--json-out", type=Path, help="write machine-readable results")
parser.add_argument("--dry-run", action="store_true", help="print planned cases without running the emulator")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
cases = build_cases(args)
if args.limit is not None:
cases = cases[: max(0, args.limit)]
if args.dry_run:
print(f"preset={args.preset} cases={len(cases)}", file=stdout)
print(PRESET_DESCRIPTIONS[args.preset], file=stdout)
for index, case in enumerate(cases[:32]):
print(f"case[{index}] pc={h16(case.pc)} {case.name()}", file=stdout)
if len(cases) > 32:
print(f"... {len(cases) - 32} more cases", file=stdout)
return 0
results = run_search(args, cases)
print(format_results(results, target=args.target, show_all=args.show_all), file=stdout)
if args.json_out:
write_json(args.json_out, args, cases, results)
return 0
def build_cases(args: argparse.Namespace) -> list[SearchCase]:
fixed = []
fixed.extend(parse_patch_list(args.byte, size=1, source="user"))
fixed.extend(parse_patch_list(args.word, size=2, source="user"))
matrix_groups: list[list[StatePatch]] = []
if args.preset == "connect-branch":
matrix_groups.extend(
[
[StatePatch(2, 0xE000, value, "preset") for value in CONNECT_WORDS],
[StatePatch(1, 0xF730, value, "preset") for value in F730_LATCH_VALUES],
]
)
default_pc = 0x2CB9
elif args.preset == "connect-queue":
fixed.extend(
[
StatePatch(1, 0xF9B9, 0x00, "preset"),
StatePatch(1, 0xF9B4, 0x01, "preset"),
StatePatch(2, 0xF970, 0x0000, "preset"),
]
)
matrix_groups.extend(
[
[StatePatch(2, 0xE000, value, "preset") for value in CONNECT_WORDS],
[StatePatch(1, 0xF730, value, "preset") for value in F730_LATCH_VALUES],
]
)
default_pc = 0x2806
else:
default_pc = 0x2CB9
matrix_groups.extend(parse_matrix_list(args.matrix_byte, size=1, source="user"))
matrix_groups.extend(parse_matrix_list(args.matrix_word, size=2, source="user"))
pc = args.pc if args.pc is not None else default_pc
if not matrix_groups:
return [SearchCase(tuple(fixed), pc)]
return [
SearchCase(tuple(fixed + list(combo)), pc)
for combo in itertools.product(*matrix_groups)
]
def run_search(args: argparse.Namespace, cases: Iterable[SearchCase]) -> list[SearchResult]:
rom_bytes, _rom_path = load_rom(args.rom)
results: list[SearchResult] = []
for index, case in enumerate(cases):
emulator = H8536Emulator(
rom_bytes,
interval_steps=args.interval_steps,
frt1_ocia_steps=args.frt1_ocia_steps,
frt2_ocia_steps=args.frt2_ocia_steps,
clock_hz=args.clock_hz,
p9_fast_path_enabled=not args.no_p9_fast_path,
p9_fast_default_input_byte=args.p9_fast_input,
p9_fast_default_wrapper_success=args.p9_fast_optimistic_wrapper,
p7_input=args.p7_input,
eeprom_seed=args.eeprom_seed,
)
_run_until(emulator, args.boot_steps, _rx_ready, RunContext())
result = run_case(emulator, case, max_steps=args.max_steps, stop_pc=args.stop_pc, stack=args.stack)
results.append(SearchResult(index, case.patches, case.pc, *result))
if args.first_hit and target_matches(results[-1].outcome, args.target):
break
return results
def run_case(
emulator: H8536Emulator,
case: SearchCase,
*,
max_steps: int,
stop_pc: int,
stack: int,
) -> tuple[int, str, int, str, str, str, int, int, int, int, str | None]:
for patch in case.patches:
apply_patch(emulator, patch)
emulator.cpu.sr |= 0x0700
emulator.cpu.regs[7] = stack & 0xFFFF
emulator.memory.write16(stack, stop_pc)
emulator.cpu.pc = case.pc & 0xFFFF
stopped_reason = "max_steps"
unsupported: str | None = None
steps = 0
for index in range(max(0, max_steps)):
if emulator.cpu.pc == (stop_pc & 0xFFFF):
stopped_reason = "stop_pc"
break
try:
emulator.step()
except UnsupportedInstruction as exc:
stopped_reason = "unsupported_instruction"
unsupported = str(exc)
break
steps = index + 1
display = emulator.memory.lcd.display_text(lines=4, width=16)
line0 = emulator.memory.lcd.line_text(0)
outcome = classify_display(display)
return (
steps,
stopped_reason,
emulator.cpu.pc,
display,
line0,
outcome,
emulator.memory.read8(0xF730),
emulator.memory.read16(0xE000),
emulator.memory.read8(0xF9B4),
emulator.memory.read8(0xF9B9),
unsupported,
)
def apply_patch(emulator: H8536Emulator, patch: StatePatch) -> None:
if patch.size == 1:
emulator.memory.write8(patch.address, patch.value)
elif patch.size == 2:
emulator.memory.write16(patch.address, patch.value)
else:
raise ValueError(f"unsupported patch size {patch.size}")
def classify_display(display: str) -> str:
if "CONNECT: OK" in display:
return "ok"
if "CONNECT:DXC-637" in display:
return "dxc"
if "CONNECT:NOT ACT" in display or "CONNECT NOT ACT" in display:
return "not-act"
if "CONNECT" in display:
return "other-connect"
return "other"
def target_matches(outcome: str, target: str) -> bool:
if target == "any-connect":
return outcome in {"ok", "dxc", "not-act", "other-connect"}
if target == "changed":
return outcome != "not-act"
return outcome == target
def format_results(results: list[SearchResult], *, target: str, show_all: bool = False) -> str:
lines = [
"Emulator CONNECT state search",
f"cases={len(results)} target={target}",
]
hits = [result for result in results if target_matches(result.outcome, target)]
lines.append(f"hits={len(hits)}")
selected = results if show_all else [result for result in results if result.outcome != "not-act"]
if not selected:
selected = hits[:]
for result in selected:
hit = "hit" if target_matches(result.outcome, target) else "miss"
lines.append(
f"[{result.case_index:03d}] {hit} outcome={result.outcome} stopped={result.stopped_reason} "
f"steps={result.steps} pc={h16(result.final_pc)} E000=0x{result.e000:04X} "
f"F730=0x{result.f730:02X} line0={result.line0!r}"
)
lines.append(" " + ", ".join(patch.label() for patch in result.patches))
return "\n".join(lines)
def parse_patch_list(values: Iterable[str], *, size: int, source: str) -> list[StatePatch]:
return [parse_single_patch(value, size=size, source=source) for value in values]
def parse_matrix_list(values: Iterable[str], *, size: int, source: str) -> list[list[StatePatch]]:
return [parse_matrix_patch(value, size=size, source=source) for value in values]
def parse_single_patch(text: str, *, size: int, source: str = "user") -> StatePatch:
address, value_text = split_assignment(text)
value = parse_int(value_text)
return StatePatch(size, parse_address(address), value & mask_for_size(size), source)
def parse_matrix_patch(text: str, *, size: int, source: str = "user") -> list[StatePatch]:
address, values_text = split_assignment(text)
address_value = parse_address(address)
patches = []
for value_text in values_text.split(","):
if not value_text.strip():
continue
patches.append(StatePatch(size, address_value, parse_int(value_text) & mask_for_size(size), source))
if not patches:
raise argparse.ArgumentTypeError(f"matrix patch has no values: {text!r}")
return patches
def split_assignment(text: str) -> tuple[str, str]:
if "=" not in text:
raise argparse.ArgumentTypeError(f"patch must be ADDRESS=VALUE, got {text!r}")
left, right = text.split("=", 1)
if not left.strip() or not right.strip():
raise argparse.ArgumentTypeError(f"patch must be ADDRESS=VALUE, got {text!r}")
return left.strip(), right.strip()
def parse_address(text: str) -> int:
token = text.strip()
if token.upper().startswith("H'"):
token = "0x" + token[2:]
elif not token.lower().startswith("0x") and any(char in token.upper() for char in "ABCDEF"):
token = "0x" + token
return parse_int(token) & 0xFFFF
def mask_for_size(size: int) -> int:
if size == 1:
return 0xFF
if size == 2:
return 0xFFFF
raise ValueError(f"unsupported patch size {size}")
def write_json(path: Path, args: argparse.Namespace, cases: list[SearchCase], results: list[SearchResult]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"kind": "h8536_emulator_state_search",
"preset": args.preset,
"description": PRESET_DESCRIPTIONS[args.preset],
"target": args.target,
"case_count": len(cases),
"result_count": len(results),
"hits": [result.payload() for result in results if target_matches(result.outcome, args.target)],
"results": [result.payload() for result in results],
}
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
__all__ = [
"CONNECT_WORDS",
"F730_LATCH_VALUES",
"SearchCase",
"SearchResult",
"StatePatch",
"apply_patch",
"build_arg_parser",
"build_cases",
"classify_display",
"main",
"parse_address",
"parse_matrix_patch",
"parse_single_patch",
"run_case",
"run_search",
"target_matches",
]

754
h8536/rx_branch_trace.py Normal file
View File

@@ -0,0 +1,754 @@
from __future__ import annotations
import argparse
import json
from collections.abc import Mapping
from pathlib import Path
from typing import Any
from .formatting import h16
JsonObject = dict[str, Any]
DEFAULT_INPUT = Path("build/rom_decompiled.json")
RX_CAPTURE_START = 0xF868
RX_CAPTURE_END = 0xF86D
RX_VALIDATION_START = 0xF860
RX_VALIDATION_END = 0xF865
TX_STAGING_START = 0xF850
TX_STAGING_END = 0xF854
CHECKSUM_SEED = 0x5A
def load_rx_branch_input(path: Path) -> JsonObject:
with path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict) or "instructions" not in payload:
raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
return payload
def analyze_rx_branch_trace(payload: Mapping[str, Any]) -> JsonObject:
instructions = _instruction_sequence(payload.get("instructions"))
by_address = {int(ins["address"]): ins for ins in instructions if "address" in ins}
selector_decode = _selector_decode(by_address)
stages = [
_rx_interrupt_capture(by_address),
_frame_validation(by_address),
_dispatcher(by_address),
_error_retry_path(by_address),
_pending_selector_ring(by_address),
_resend_and_timeout_paths(by_address),
]
commands = _command_branches(by_address)
table_surfaces = _table_surfaces(by_address)
downstream_traces = _downstream_traces(by_address)
feedback_loops = _feedback_loops(by_address)
confidence = _confidence(stages, commands)
return {
"kind": "rx_branch_trace",
"summary": {
"title": "SCI1 RX frame branch map",
"confidence": confidence,
"core_finding": (
"The ROM captures six SCI1 bytes, validates a 0x5A-seeded XOR checksum, "
"decodes RX[0] & 0x07, then splits into initial commands while FAA2 == 0 "
"and continuation commands while FAA2 != 0."
),
},
"frame_model": {
"channel": "SCI1",
"capture_buffer": _range_payload(RX_CAPTURE_START, RX_CAPTURE_END),
"validation_buffer": _range_payload(RX_VALIDATION_START, RX_VALIDATION_END),
"tx_staging_buffer": _range_payload(TX_STAGING_START, TX_STAGING_END),
"frame_length": 6,
"checksum_seed": CHECKSUM_SEED,
"checksum_seed_hex": f"0x{CHECKSUM_SEED:02X}",
"command_expression": "RX[0] & 0x07",
"index_expression_candidate": "loc_622B(RX[1], RX[2])",
"value_expression_candidate": "RX[3:4]",
},
"selector_decode": selector_decode,
"stages": stages,
"commands": commands,
"table_surfaces": table_surfaces,
"downstream_traces": downstream_traces,
"feedback_loops": feedback_loops,
"state_bits": _state_bits(),
"bench_implications": _bench_implications(),
"caveats": [
"This is an address-driven static branch map, not proof of every runtime predicate value.",
"Semantic names are candidates; the branch destinations and RAM side effects are ROM evidence.",
"loc_BE70/F970 and loc_3E54/F870 are separate queues; this report names them separately because mixing them up changes the command-4/5 interpretation.",
],
}
def format_text_report(analysis: Mapping[str, Any]) -> str:
summary = analysis["summary"]
frame = analysis["frame_model"]
lines = [
"H8/536 SCI1 RX Branch Trace",
"",
f"Summary: {summary['core_finding']}",
f"Confidence: {summary['confidence']}",
"",
"Frame Model:",
f"- capture buffer: {frame['capture_buffer']['range_hex']}",
f"- validation buffer: {frame['validation_buffer']['range_hex']}",
f"- command: {frame['command_expression']}",
f"- logical index candidate: {frame['index_expression_candidate']}",
f"- value candidate: {frame['value_expression_candidate']}",
f"- checksum: 0x5A XOR RX[0..4] == RX[5]",
"",
"Selector Decode:",
]
selector_decode = analysis.get("selector_decode", {})
if selector_decode:
status = "present" if selector_decode.get("present") else "missing"
lines.append(f"- {selector_decode['title']}: {status}")
lines.append(f" {selector_decode['summary']}")
for rule in selector_decode.get("rules", []):
evidence = _join_hex(rule.get("evidence_addresses_hex", []))
suffix = f" [{evidence}]" if evidence else ""
lines.append(f" - {rule['condition']}: {rule['outcome']}{suffix}")
for implication in selector_decode.get("implications", []):
lines.append(f" - implication: {implication}")
lines.extend([
"",
"Stages:",
])
for stage in analysis.get("stages", []):
status = "present" if stage.get("present") else "missing"
lines.append(f"- {stage['title']}: {status}")
lines.append(f" {stage['summary']}")
for branch in stage.get("branches", []):
condition = branch.get("condition")
outcome = branch.get("outcome")
evidence = _join_hex(branch.get("evidence_addresses_hex", []))
suffix = f" [{evidence}]" if evidence else ""
lines.append(f" - {condition}: {outcome}{suffix}")
lines.extend(["", "Command Branches:"])
for command in analysis.get("commands", []):
response = command.get("response_candidate") or "no immediate serial response"
lines.append(
f"- cmd {command['command_hex']} {command['name']}: {command['availability']}; "
f"handler {command['handler_address_hex']}; {command['summary']}"
)
lines.append(f" response: {response}")
for effect in command.get("side_effects", []):
lines.append(f" - {effect}")
lines.extend(["", "Table Surfaces:"])
for surface in analysis.get("table_surfaces", []):
status = "present" if surface.get("present") else "missing"
evidence = _join_hex(surface.get("evidence_addresses_hex", []))
suffix = f" [{evidence}]" if evidence else ""
lines.append(f"- {surface['name']} {surface['range_hex']}: {status}; {surface['summary']}{suffix}")
for detail in surface.get("details", []):
lines.append(f" - {detail}")
lines.extend(["", "Downstream Flow Traces:"])
for trace in analysis.get("downstream_traces", []):
status = "present" if trace.get("present") else "missing"
lines.append(f"- {trace['title']}: {status}")
lines.append(f" {trace['summary']}")
for step in trace.get("steps", []):
evidence = _join_hex(step.get("evidence_addresses_hex", []))
suffix = f" [{evidence}]" if evidence else ""
lines.append(f" - {step['name']}: {step['effect']}{suffix}")
lines.extend(["", "RX-to-TX Feedback Loops:"])
for loop in analysis.get("feedback_loops", []):
status = "present" if loop.get("present") else "missing"
lines.append(f"- {loop['name']}: {status}")
lines.append(f" trigger: {loop['trigger']}")
lines.append(f" path: {loop['path']}")
lines.append(f" TX outcome: {loop['tx_outcome']}")
if loop.get("timing_gate"):
lines.append(f" timing/gate: {loop['timing_gate']}")
if loop.get("bench_read"):
lines.append(f" bench read: {loop['bench_read']}")
evidence = _join_hex(loop.get("evidence_addresses_hex", []))
if evidence:
lines.append(f" evidence: {evidence}")
lines.extend(["", "State Bits:"])
for bit in analysis.get("state_bits", []):
lines.append(f"- {bit['address_hex']}.{bit['bit']}: {bit['name']} - {bit['meaning']}")
lines.extend(["", "Bench Implications:"])
for implication in analysis.get("bench_implications", []):
lines.append(f"- {implication}")
lines.extend(["", "Caveats:"])
for caveat in analysis.get("caveats", []):
lines.append(f"- {caveat}")
return "\n".join(lines).rstrip() + "\n"
def write_rx_branch_trace(input_path: Path, output_path: Path, *, as_json: bool = False) -> JsonObject:
analysis = analyze_rx_branch_trace(load_rx_branch_input(input_path))
output_path.parent.mkdir(parents=True, exist_ok=True)
if as_json:
output_path.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
else:
output_path.write_text(format_text_report(analysis), encoding="utf-8")
return analysis
def main(argv: list[str] | None = None, stdout: Any | None = None) -> int:
parser = argparse.ArgumentParser(description="Trace the H8/536 SCI1 RX command branch tree.")
parser.add_argument("input", nargs="?", type=Path, default=DEFAULT_INPUT)
parser.add_argument("--json", action="store_true", help="emit structured JSON instead of readable text")
parser.add_argument("--out", type=Path, default=None, help="write report to this path")
args = parser.parse_args(argv)
stream = stdout
if stream is None:
import sys
stream = sys.stdout
analysis = analyze_rx_branch_trace(load_rx_branch_input(args.input))
rendered = json.dumps(analysis, indent=2, sort_keys=True) + "\n" if args.json else format_text_report(analysis)
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(rendered, encoding="utf-8")
print(f"wrote {args.out}", file=stream)
else:
print(rendered, end="", file=stream)
return 0
def _selector_decode(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "loc_622B logical selector decode",
"present": _has_all(by_address, (0x622B, 0x622D, 0x6231, 0x6244, 0x624D, 0x6256, 0x625F, 0x6264)),
"summary": (
"The RX handler builds a raw word from RX[1:2], uses RX[1] bit0-bit2 as a page, "
"keeps RX[2] as the low selector byte, and maps the result into a 0x000-0x1FF logical selector."
),
"input_expression_candidate": "raw = (RX[1] << 8) | RX[2]; page = RX[1] & 0x07; low = RX[2]",
"rules": [
_branch("page 0, or pages 4-7, and low <= 0x7F", "selector = 0x000 + low", [0x6234, 0x6236, 0x6244, 0x6248, 0x6264]),
_branch("page 1 and low <= 0xFF", "selector = 0x080 + low", [0x6238, 0x623A, 0x624D, 0x6251, 0x6264]),
_branch("page 2 and low <= 0x7F", "selector = 0x180 + low", [0x623C, 0x623E, 0x6256, 0x625A, 0x6264]),
_branch("page 3, page/range failure, or page 0/4-7 with low > 0x7F", "selector forced to 0x01FF", [0x6240, 0x6242, 0x625F, 0x6261]),
],
"implications": [
"RX[1].7 is not part of the selector here because the dispatcher rejects byte1 bit7 before command handling.",
"Commands shaped as 01 80 xx ... are rejected before loc_622B-derived command handling even if the low byte looks useful.",
"Page 4-7 encodings appear to alias the page-0 decode path unless the low byte is out of range.",
],
}
def _rx_interrupt_capture(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "SCI1 RXI/ERI byte capture",
"present": _has_all(by_address, (0xBB57, 0xBB67, 0xBB6D, 0xBB90, 0xBB96, 0xBB9E)),
"summary": (
"ERI latches FAA4.7 and clears physical error flags; RXI clears RDRF, reads SCI1_RDR, "
"stores bytes into F868-F86D, reloads F9C1, and sets F9C5 when six bytes are captured."
),
"branches": [
_branch("ERI taken", "set FAA4.7, clear ORER/FER/PER, then fall into RXI byte capture", [0xBB57, 0xBB5B, 0xBB5F, 0xBB63]),
_branch("F9C1 == 0 before byte", "clear F9C3 so the byte starts a fresh frame", [0xBB71, 0xBB75, 0xBB77]),
_branch("F9C1 != 0 and F9C3 <= 5", "append byte at F868 + F9C3 and increment F9C3", [0xBB7D, 0xBB82, 0xBB8A, 0xBB90, 0xBB96]),
_branch("F9C1 != 0 and F9C3 > 5", "clear FAA4 and skip storing this byte", [0xBB7D, 0xBB82, 0xBB84, 0xBB88]),
_branch("incremented F9C3 == 6", "load F9C5 with 0x14 as the RX/session timeout window", [0xBB9A, 0xBB9C, 0xBB9E]),
_branch("any RXI exit", "reload F9C1 with 0x05 as the inter-byte timeout", [0xBBA3]),
],
}
def _frame_validation(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "six-byte validation and checksum",
"present": _has_all(by_address, (0xBBAB, 0xBBB3, 0xBBCB, 0xBBCF, 0xBBD6, 0xBBEC, 0xBBF0, 0xBC01)),
"summary": (
"The main-loop processor only runs when F9C3 is six, copies F868-F86D to F860-F865, "
"clears F9C3, rejects physical-error frames, checks the 0x5A XOR checksum, and decodes the index."
),
"branches": [
_branch("F9C3 != 6", "return without processing", [0xBBAB, 0xBBB0]),
_branch("FAA4.7 set after capture", "enter retry/error path at BE29", [0xBBCF, 0xBBD3]),
_branch("checksum mismatch", "enter retry/error path at BE29", [0xBBD6, 0xBBD8, 0xBBDC, 0xBBE0, 0xBBE4, 0xBBE8, 0xBBEC, 0xBBF0]),
_branch("checksum valid", "clear FAA6, decode selector from RX[1:2] through loc_622B, and dispatch on RX[0] & 0x07", [0xBBF3, 0xBBF7, 0xBBFD, 0xBC01, 0xBC08, 0xBC0C]),
],
}
def _dispatcher(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "FAA2 split dispatcher",
"present": _has_all(by_address, (0xBC0F, 0xBC15, 0xBC3A, 0xBC5C, 0xBC63)),
"summary": (
"FAA2 == 0 enters the initial dispatcher for commands 0, 1, 2, and 7. "
"FAA2 != 0 enters the continuation dispatcher; commands 4, 5, and 6 only live there."
),
"branches": [
_branch("FAA2 == 0", "set FAA2.7 and test initial commands 0, 1, 2, 7", [0xBC0F, 0xBC13, 0xBC15, 0xBC20, 0xBC24, 0xBC29, 0xBC2E]),
_branch("FAA2 != 0 and command bit2 set", "continuation commands 4, 5, 6, 7 are possible", [0xBC0F, 0xBC13, 0xBC3A, 0xBC45, 0xBC4A, 0xBC4F, 0xBC54]),
_branch("FAA2 != 0 and command bit2 clear", "BCLR FAA2.3; if that bit was set, clear FAA3 and re-enter initial dispatcher", [0xBC3A, 0xBC3C, 0xBC5C, 0xBC60, 0xBC63, 0xBC67]),
_branch("byte1 bit7 set on initial path", "branch to BD0B and return without normal command handling", [0xBC19, 0xBC1D, 0xBD0B]),
_branch("byte1 bit7 set on continuation path", "branch to BE27 and return without normal command handling", [0xBC3E, 0xBC42, 0xBE27]),
],
}
def _error_retry_path(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "checksum/error retry path",
"present": _has_all(by_address, (0xBE29, 0xBE2D, 0xBE33, 0xBE37, 0xBE4D, 0xBE6A)),
"summary": (
"Physical RX errors or checksum failures can either be ignored, clear the session after two retries, "
"or stage a command-7 retry/error echo of RX[1:4]."
),
"branches": [
_branch("FAA5.7 == 0", "return after clearing FAA4.7; retry echo is disabled", [0xBE29, 0xBE2D, 0xBE31]),
_branch("FAA5.7 == 1 and FAA6 < 2", "stage F850=0x07 and F851-F854=RX[1:4], then call BA26", [0xBE33, 0xBE37, 0xBE3C, 0xBE4D, 0xBE52, 0xBE5A, 0xBE62, 0xBE6A]),
_branch("FAA5.7 == 1 and FAA6 >= 2", "load F9C0=0x1F and clear FAA3/FAA2", [0xBE37, 0xBE3C, 0xBE3E, 0xBE43, 0xBE47]),
],
}
def _pending_selector_ring(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "pending selector ring at loc_BE70",
"present": _has_all(by_address, (0xBE70, 0xBE78, 0xBE84, 0xBE91, 0xBE95)),
"summary": (
"Several write/ACK paths call BE70 with R5 as the logical selector. BE70 deduplicates that selector "
"against a small F970-like ring using cursors F9B9/F9B4, then appends it if absent."
),
"branches": [
_branch("selector already present", "exit without appending", [0xBE84, 0xBE88, 0xBE9D]),
_branch("ring scan reaches F9B4 cursor", "store R5 into the pending ring and advance F9B4", [0xBE80, 0xBE82, 0xBE91, 0xBE95, 0xBE99]),
],
}
def _resend_and_timeout_paths(by_address: Mapping[int, JsonObject]) -> JsonObject:
return {
"title": "session timeout and resend side paths",
"present": _has_all(by_address, (0x3FEF, 0x3FF5, 0x3FFD, 0x4003, 0xBE9E, 0xBEB5, 0xBED5, 0xBEE4)),
"summary": (
"After any complete RX frame, F9C5 keeps FAA5.7/session-gate state alive for a short window. "
"When that window expires, 3FEF can clear queue cursors and call 400C, while BE9E handles resend countdowns."
),
"branches": [
_branch("F9C5 == 0 at loc_3FEF", "clear F9B5/F9B0 and clear FAA5.7; if FAA5.7 was set, call 400C reset/NOT-ACT state clear", [0x3FEF, 0x3FF3, 0x3FF5, 0x3FF9, 0x3FFD, 0x4001, 0x4003]),
_branch("F9C5 != 0 at loc_3FEF", "set FAA5.7, allowing retry/resend/session-gated paths", [0x3FEF, 0x3FF3, 0x4007]),
_branch("BE9E sees no pending FAA5 & FAA3 & 0x80", "clear FAA2 and return", [0xBE9E, 0xBEA5, 0xBEA9, 0xBEAD, 0xBEAF]),
_branch("BE9E pending and F9C6==0 and F9C8!=0", "decrement F9C8, reload F9C6, and possibly resend staged TX through BA26", [0xBEB5, 0xBEBB, 0xBEC1, 0xBEC5, 0xBECB, 0xBED1, 0xBED5]),
_branch("BE9E pending but F9C8==0", "clear F9C5, which lets 3FEF collapse the session gate later", [0xBEBB, 0xBEBF, 0xBEE4]),
],
}
def _table_surfaces(by_address: Mapping[int, JsonObject]) -> list[JsonObject]:
return [
{
"name": "primary_value_table",
"range_hex": "H'E000-H'E3FF",
"present": _has_all(by_address, (0xBC75, 0xBC95, 0xBCEC, 0xBD1A, 0xBD35)),
"summary": "logical selector word table; command 0 and continuation command 4 write it, command 1 reads it",
"details": [
"indexed as E000 + 2*selector after loc_622B",
"selector zero writes force the low byte to 0x80 on commands 0 and 4",
"this is the table that must contain E000[0]=0x8080 for the emulator-correlated CONNECT OK branch",
],
"evidence_addresses_hex": _hexes([0xBC75, 0xBC95, 0xBCEC, 0xBD1A, 0xBD35], by_address),
},
{
"name": "current_report_value_table",
"range_hex": "H'E800-H'EBFF",
"present": _has_all(by_address, (0xBC79, 0xBC99, 0xBD1E, 0xBB35)),
"summary": "current/report value table used when queued serial reports are converted into TX frames",
"details": [
"command 0 writes both primary E000 and current E800 for zero and nonzero selectors",
"command 4 writes E800 only on the selector-zero special path; the nonzero command-4 path does not show a matching E800 write here",
"loc_BAF2 reads E800 + 2*queued_selector when building autonomous report frames",
],
"evidence_addresses_hex": _hexes([0xBC79, 0xBC99, 0xBD1E, 0xBB35, 0xBB39, 0xBB3F], by_address),
},
{
"name": "secondary_value_table",
"range_hex": "H'E400-H'E7FF",
"present": _has_all(by_address, (0xBDE5, 0xBDE9)),
"summary": "secondary logical selector word table written by continuation command 6",
"details": [
"command 6 writes RX[3:4] to E400 + 2*selector",
"the matching EC00 flag bit is bit6 rather than bit7",
],
"evidence_addresses_hex": _hexes([0xBDE5, 0xBDE9], by_address),
},
{
"name": "dirty_flag_table",
"range_hex": "H'EC00-H'EDFF",
"present": _has_all(by_address, (0xBC82, 0xBC9D, 0xBD22, 0xBD39, 0xBDE9)),
"summary": "per-selector flag bytes; command 0/4 set bit7 and command 6 sets bit6",
"details": [
"the same logical selector indexes this byte table directly, not as a word offset",
"loc_48FA and other consumers test these table bits before raising follow-on reports",
],
"evidence_addresses_hex": _hexes([0xBC82, 0xBC9D, 0xBD22, 0xBD39, 0xBDE9, 0x4911], by_address),
},
{
"name": "mapped_shadow_or_eeprom_surface",
"range_hex": "H'F400-H'F4FF",
"present": _has_all(by_address, (0xBCA1, 0xBCA9, 0xBD3D, 0xBD45, 0xBD49, 0xBD5F)),
"summary": "optional mapped mirror/persistence surface selected through ROM tables around C564/C565",
"details": [
"nonzero command-0 and command-4 writes consult mapping bytes/words before mirroring into F400",
"command 4 can call BFE0 to persist the mapped word when F76E.7 is set",
"selector zero bypasses this mapped mirror path",
],
"evidence_addresses_hex": _hexes([0xBCA1, 0xBCA9, 0xBD3D, 0xBD45, 0xBD49, 0xBD5F], by_address),
},
]
def _downstream_traces(by_address: Mapping[int, JsonObject]) -> list[JsonObject]:
return [
{
"title": "immediate response staging through loc_BA26",
"present": _has_all(by_address, (0xBCB0, 0xBCCD, 0xBCD7, 0xBCFA, 0xBA36, 0xBA64, 0xBA72, 0xBA7F)),
"summary": (
"Command 0, command 1, command 7, and the retry/error path stage F850-F854, call BA26, "
"copy that staging area to F858-F85C, compute F85D, send the first byte, then let TXI send bytes 1-5."
),
"steps": [
_step("stage command-0 echo", "F850=0x04 and F851-F854 mirror the accepted host fields before BA26", [0xBCB0, 0xBCB5, 0xBCC1, 0xBCC9, 0xBCCD]),
_step("stage command-1 readback", "F850=0x04, F853/F854 receive the E000 table word, and BA26 sends it", [0xBCD7, 0xBCEC, 0xBCF0, 0xBCF6, 0xBCFA]),
_step("finalize TX frame", "BA26 copies F850-F854 to F858-F85C and computes F85D as 0x5A XOR bytes 0-4", [0xBA36, 0xBA3A, 0xBA42, 0xBA4A, 0xBA4E, 0xBA64]),
_step("start SCI1 transmission", "BA26 waits for TDRE, writes F858 to SCI1_TDR, sets F9C2=1, clears TDRE, and enables TIE", [0xBA68, 0xBA72, 0xBA76, 0xBA7B, 0xBA7F]),
_step("finish SCI1 transmission", "TXI indexes F858+F9C2 until six bytes are sent, then disables TIE", [0xBAAB, 0xBAB1, 0xBAB5, 0xBABF, 0xBAC3, 0xBACA]),
],
},
{
"title": "selector-processing queue BE70/F970 into loc_2806",
"present": _has_all(by_address, (0xBE70, 0xBE91, 0xBE95, 0x2806, 0x2819, 0x2822, 0x289F)),
"summary": (
"BE70 appends unique logical selectors into the F970 ring. The main loop later consumes that ring at "
"loc_2806 and dispatches selector-specific behavior through the 28A6 jump table."
),
"steps": [
_step("append unique selector", "BE70 scans from F9B9 to F9B4, skips duplicates, writes R5 to F970+2*cursor, and advances F9B4", [0xBE70, 0xBE78, 0xBE84, 0xBE91, 0xBE95, 0xBE99]),
_step("consume selector", "loc_2806 reads F970+2*F9B9, advances F9B9, masks the selector to 0x01FF, and keeps it in R5", [0x2806, 0x280C, 0x2819, 0x281D, 0x2822, 0x2826, 0x282A]),
_step("active-selector side path", "selectors matching F736/F738/F73A/F73C/F73E/F740/F742/F754 call loc_48FA before the jump table", [0x2837, 0x285E, 0x2878, 0x2892, 0x2CAB, 0x2CAD]),
_step("selector jump table", "the consumer jumps through table 28A6; selector zero is emulator-correlated with the CONNECT handler window", [0x289F, 0x28A3]),
],
},
{
"title": "serial-report queue loc_3E54/F870 into loc_BAF2",
"present": _has_all(by_address, (0x3E54, 0x3E76, 0x3E7A, 0x3E9A, 0x3EBF, 0x3EC3, 0xBAF2, 0xBB00)),
"summary": (
"loc_3E54 is a shared enqueue helper. R2.bit7 queues serial-visible report indexes in F870, "
"while R2.bit6 queues selector-processing work in F970. loc_BAF2 is the path that turns F870 entries into outbound 6-byte frames."
),
"steps": [
_step("enqueue serial report", "when R2.7 is set, loc_3E54 deduplicates R3 in F870 and advances F9B0", [0x3E54, 0x3E58, 0x3E6C, 0x3E76, 0x3E7A, 0x3E7E]),
_step("queue backpressure drain", "if the serial queue is nearly full, loc_3E54 calls loc_3FD3 until there is space", [0x3E82, 0x3E8B, 0x3E91, 0x3E93]),
_step("enqueue selector processing", "when R2.6 is set, loc_3E54 deduplicates R3 in F970 and advances F9B4", [0x3E9A, 0x3E9E, 0x3EB2, 0x3EBF, 0x3EC3, 0x3EC7]),
_step("dequeue serial report", "loc_BAF2 compares F9B5/F9B0, reads F870+2*F9B5, builds a TX report from E800, and calls BA26", [0xBAF2, 0xBAF8, 0xBB00, 0xBB08, 0xBB1C, 0xBB35, 0xBB43]),
_step("open continuation latch", "after a report send, loc_BAF2 sets FAA2.3, FAA3.7, F9C6, and F9C8; command 4/5/6 can then consume or ACK that report", [0xBB00, 0xBB46, 0xBB4C, 0xBB51]),
],
},
{
"title": "TXI/RXI race and continuation collapse",
"present": _has_all(by_address, (0xBA84, 0xBA8A, 0xBA90, 0xBA96, 0xBA9A, 0xBA9E, 0xBAA2)),
"summary": (
"The TX interrupt handler has an interlock: if a queued report is awaiting continuation and RX bytes are already arriving, "
"it clears the report/continuation state and stops TXI before the normal frame-finish path."
),
"steps": [
_step("detect overlap", "TXI tests FAA2.3, FAA5.7, and nonzero F9C3 before continuing the TX frame", [0xBA84, 0xBA8A, 0xBA90]),
_step("collapse continuation", "on overlap it clears FAA2.3 and FAA3, disables TIE, and loads F9C0=0x1F", [0xBA96, 0xBA9A, 0xBA9E, 0xBAA2]),
_step("normal completion path", "without the overlap, TXI sends bytes until F9C2 reaches six and then starts the post-TX delay path", [0xBAA9, 0xBAB5, 0xBAC3, 0xBACA, 0xBADA, 0xBAED]),
],
},
{
"title": "session expiry into reset/not-active state",
"present": _has_all(by_address, (0x3FEF, 0x3FFD, 0x4003, 0x400C, 0x4028, 0x4042)),
"summary": (
"When F9C5 reaches zero, loc_3FEF clears queue cursors and FAA5.7; if FAA5.7 had been set, "
"loc_400C clears connection/session RAM and refreshes the inactive display state."
),
"steps": [
_step("expire RX session", "F9C5==0 clears F9B5/F9B0 and clears FAA5.7", [0x3FEF, 0x3FF5, 0x3FF9, 0x3FFD]),
_step("call reset side path", "if clearing FAA5.7 changed the bit, loc_3FEF calls loc_400C", [0x4001, 0x4003]),
_step("clear connection state", "loc_400C clears F730/F756-F759/F732/F75C/FB03/F791/F795/F76E and calls follow-on display/session refresh routines", [0x400C, 0x4010, 0x4020, 0x4028, 0x4034, 0x403C, 0x4040, 0x4042]),
],
},
]
def _feedback_loops(by_address: Mapping[int, JsonObject]) -> list[JsonObject]:
return [
{
"name": "command-0 write echo and selector-processing loop",
"present": _has_all(by_address, (0xBC69, 0xBC86, 0xBCB0, 0xBCCD, 0xBE70, 0x2806, 0x28A3)),
"trigger": "valid command 0 while FAA2 == 0",
"path": "RX validation -> BC69 table write -> BE70 appends selector to F970 -> BCB0/BA26 sends immediate 0x04 echo -> later loc_2806 consumes F970",
"tx_outcome": "immediate command-4-style echo frame plus possible later selector-driven reports/display work",
"timing_gate": "BA26 sets F9C0=0x64 and F9C4=0x07, temporarily delaying queued TX and heartbeat enqueue",
"bench_read": "a command-0 probe is stateful: it both writes table state and spends time in the post-TX delay, so it can disturb the continuation window being tested",
"evidence_addresses_hex": _hexes([0xBC69, 0xBC75, 0xBC86, 0xBCB0, 0xBCCD, 0xBA2C, 0xBA31, 0xBE70, 0x2819, 0x28A3], by_address),
},
{
"name": "command-1 readback and previous-frame loop",
"present": _has_all(by_address, (0xBCD7, 0xBCEC, 0xBCFA, 0xBA36, 0xBE05, 0xBE22)),
"trigger": "valid command 1 while FAA2 == 0, followed by optional command 7",
"path": "RX validation -> BCD7 reads E000[selector] -> BA26 finalizes TX in F858-F85D -> BE05 can copy F858-F85C back to F850-F854",
"tx_outcome": "direct 0x04 readback, then command 7 can retransmit the exact last finalized TX frame",
"timing_gate": "command 1 clears FAA2.7 and never enters continuation handling; it is a readback path, not an ACK path",
"bench_read": "command 7 after command 1 proves what was last finalized, but does not prove a hidden continuation token by itself",
"evidence_addresses_hex": _hexes([0xBCD7, 0xBCEC, 0xBCFA, 0xBA36, 0xBA64, 0xBE05, 0xBE09, 0xBE22], by_address),
},
{
"name": "retry/error 07 echo loop",
"present": _has_all(by_address, (0xBE29, 0xBE2D, 0xBE4D, 0xBE6A, 0xBA26)),
"trigger": "physical RX error or checksum mismatch while FAA5.7 is set and retry count FAA6 is below two",
"path": "RX validation error -> BE29 retry gate -> BE4D stages F850=0x07 and copies RX[1:4] into F851-F854 -> BA26 sends",
"tx_outcome": "a 0x07 frame that can echo the host payload bytes, independent of E000/E800 table contents",
"timing_gate": "after two retries, the ROM loads F9C0=0x1F and clears FAA3/FAA2 instead of sending another echo",
"bench_read": "visible 07 frames in a noisy/tight timing run can be retry echoes, not necessarily device status or a continuation token",
"evidence_addresses_hex": _hexes([0xBE29, 0xBE2D, 0xBE33, 0xBE37, 0xBE3E, 0xBE43, 0xBE47, 0xBE4D, 0xBE6A], by_address),
},
{
"name": "autonomous report to host continuation loop",
"present": _has_all(by_address, (0x3E54, 0x3E76, 0x3FD3, 0xBAF2, 0xBB00, 0xBB43, 0xBD67)),
"trigger": "firmware enqueues a serial-visible report via loc_3E54 with R2.7 set",
"path": "loc_3E54 appends report selector to F870 -> loc_3FD3 allows BAF2 when FAA2/F9C0 gates are clear -> BAF2 sends report -> command 4/5/6 continuation can advance F9B5",
"tx_outcome": "autonomous 6-byte report frame built from E800[selector], with FAA2.3/FAA3.7 left set to await host continuation or ACK",
"timing_gate": "F9C0 must count down before BAF2 can send, and F9C6/F9C8/BE9E control repeated sends while FAA3.7 remains live",
"bench_read": "the actual ACK/write target is not just the selector; it is the report that is live under FAA2.3 before TXI/RXI or BE9E clears it",
"evidence_addresses_hex": _hexes([0x3E54, 0x3E76, 0x3E7A, 0x3FD3, 0x3FE5, 0x3FEB, 0xBAF2, 0xBB00, 0xBB35, 0xBB43, 0xBB46, 0xBB51, 0xBD67, 0xBDC2, 0xBDED], by_address),
},
{
"name": "selector-processing to report loop",
"present": _has_all(by_address, (0xBE70, 0x2806, 0x2CAD, 0x48FA, 0x4926, 0x3E54)),
"trigger": "command 0/4/selected command 5 calls BE70, or loc_3E54 is called with R2.6 set",
"path": "BE70/F970 selector queue -> loc_2806 selector dispatch -> active-selector side path loc_48FA -> loc_3E54 can enqueue report 0x00F6 when E1EC.13 is set",
"tx_outcome": "possible later serial report produced through the F870/BAF2 loop rather than an immediate response to the original RX frame",
"timing_gate": "loc_48FA is gated by FB03.7, F732 values, E1EC.13, and F76E.6 before reaching its indirect table",
"bench_read": "selector-zero CONNECT work and visible serial reports can be separated in time; lack of immediate TX does not mean the selector queue did nothing",
"evidence_addresses_hex": _hexes([0xBE70, 0xBE91, 0x2806, 0x2819, 0x2CAD, 0x48FA, 0x490F, 0x4921, 0x4923, 0x4926], by_address),
},
{
"name": "TXI/RXI overlap cancellation loop",
"present": _has_all(by_address, (0xBA84, 0xBA8A, 0xBA90, 0xBA96, 0xBA9A, 0xBAA2)),
"trigger": "host RX bytes begin while a report TX is active, FAA2.3 is set, and FAA5.7 is set",
"path": "TXI observes FAA2.3 + FAA5.7 + F9C3 != 0 -> clears FAA2.3/FAA3 -> disables TIE -> loads F9C0=0x1F",
"tx_outcome": "the pending report/continuation state can be canceled before the host command reaches the continuation dispatcher",
"timing_gate": "this depends on byte timing relative to TXI and F9C3, so polite emulator injection can miss it",
"bench_read": "this is a concrete ROM reason why bench timing might see latches/retries while a too-polite emulator reaches cleaner continuation paths",
"evidence_addresses_hex": _hexes([0xBA84, 0xBA8A, 0xBA90, 0xBA96, 0xBA9A, 0xBA9E, 0xBAA2], by_address),
},
{
"name": "session-expiry to heartbeat/not-active loop",
"present": _has_all(by_address, (0xBB9E, 0xBF31, 0xBF37, 0x3FEF, 0x400C, 0x4046, 0x4067)),
"trigger": "any complete six-byte RX frame loads F9C5, then FRT2 decrements it to zero",
"path": "RX complete -> F9C5=0x14 -> FRT2 decrements F9C5 -> loc_3FEF clears session/queues and calls 400C -> loc_4046 can later enqueue heartbeat selector 0",
"tx_outcome": "eventual return to idle heartbeat/report behavior and inactive-session display state",
"timing_gate": "F9C4 gates heartbeat enqueue; BA26 reloads it to 0x07 after each send, matching the roughly 700 ms heartbeat cadence",
"bench_read": "the common CONNECT NOT ACT after arbitrary six-byte traffic is consistent with this expiry/reset loop",
"evidence_addresses_hex": _hexes([0xBB9E, 0xBF31, 0xBF37, 0x3FEF, 0x3FFD, 0x4003, 0x400C, 0x4046, 0x4067, 0xBA31], by_address),
},
]
def _command_branches(by_address: Mapping[int, JsonObject]) -> list[JsonObject]:
return [
{
"command": 0x00,
"command_hex": "0x00",
"name": "set_value_acked_candidate",
"handler_address_hex": h16(0xBC69),
"availability": "initial path only: checksum valid, FAA2 == 0, RX[1].7 == 0",
"summary": "writes RX[3:4] into primary/current tables, flags the selector, calls BE70, and sends an echo-style 0x04 response",
"response_candidate": "F850=0x04; F851-F854 mostly echo RX[1:4]; BA26 sends it",
"side_effects": [
"selector zero is special: the low byte is forced to 0x80 after the high byte is taken from RX[3]",
"nonzero selectors can mirror into an auxiliary table via a mapping table",
"clears FAA2.7 before exit",
],
"evidence_addresses_hex": _hexes([0xBC69, 0xBC75, 0xBC79, 0xBC82, 0xBC86, 0xBCB0, 0xBCCD, 0xBCD0], by_address),
},
{
"command": 0x01,
"command_hex": "0x01",
"name": "read_value_candidate",
"handler_address_hex": h16(0xBCD7),
"availability": "initial path only: checksum valid, FAA2 == 0, RX[1].7 == 0",
"summary": "reads primary table E000 + 2*selector and stages a 0x04 response",
"response_candidate": "F850=0x04; F851 is overwritten with RX[2]; F853/F854 receive table high/low; F852 is not freshly written here",
"side_effects": [
"does not enter continuation command handling",
"clears FAA2.7 before exit",
],
"evidence_addresses_hex": _hexes([0xBCD7, 0xBCE0, 0xBCE8, 0xBCEC, 0xBCF0, 0xBCF6, 0xBCFA, 0xBCFD], by_address),
},
{
"command": 0x02,
"command_hex": "0x02",
"name": "initial_clear_or_noop_candidate",
"handler_address_hex": h16(0xBD04),
"availability": "initial path only: checksum valid, FAA2 == 0, RX[1].7 == 0",
"summary": "clears FAA2.7 and returns without staging a response",
"response_candidate": None,
"side_effects": ["likely a quiet/session-clear style command on the initial path"],
"evidence_addresses_hex": _hexes([0xBD04, 0xBD08], by_address),
},
{
"command": 0x04,
"command_hex": "0x04",
"name": "continuation_set_value_candidate",
"handler_address_hex": h16(0xBD0E),
"availability": "continuation path only: checksum valid, FAA2 != 0, command bit2 set, RX[1].7 == 0",
"summary": "writes a value into the primary table without an immediate serial response; selector zero also updates the current/report table",
"response_candidate": None,
"side_effects": [
"selector zero is special: RX[3] becomes the high byte and low byte is forced to 0x80",
"nonzero selectors write E000 and flag EC00.7; the matching E800 current/report write is not present in this handler",
"nonzero selectors can mirror/persist through F400/BFE0 when mapping and F76E.7 allow it",
"if FAA2.3 was set from a queued report, advances F9B5 to consume that report",
"clears FAA3 and FAA2 before exit",
],
"evidence_addresses_hex": _hexes([0xBD0E, 0xBD1A, 0xBD1E, 0xBD22, 0xBD26, 0xBD35, 0xBD64, 0xBD67, 0xBD6D, 0xBD75, 0xBD79], by_address),
},
{
"command": 0x05,
"command_hex": "0x05",
"name": "continuation_ack_or_clear_pending_candidate",
"handler_address_hex": h16(0xBD80),
"availability": "continuation path only: checksum valid, FAA2 != 0, command bit2 set, RX[1].7 == 0",
"summary": "ACK/session-clear path; usually no response, but selected logical indexes feed BE70 or clear connection latches",
"response_candidate": None,
"side_effects": [
"selectors 0x006C, 0x006D, and 0x006E call BE70",
"with F731.7 set, selectors 0x006B, 0x0096, 0x0097, 0x00C6, and 0x00F8 clear F731.7/F790.7",
"if FAA2.3 was set from a queued report, advances F9B5",
"clears FAA3 and FAA2 before exit",
],
"evidence_addresses_hex": _hexes([0xBD80, 0xBD85, 0xBD94, 0xBD9A, 0xBDB5, 0xBDBF, 0xBDC2, 0xBDC8, 0xBDD0, 0xBDD4], by_address),
},
{
"command": 0x06,
"command_hex": "0x06",
"name": "continuation_set_secondary_candidate",
"handler_address_hex": h16(0xBDDB),
"availability": "continuation path only: checksum valid, FAA2 != 0, command bit2 set, RX[1].7 == 0",
"summary": "writes RX[3:4] into the secondary table and sets flag-table bit 6",
"response_candidate": None,
"side_effects": [
"if FAA2.3 was set from a queued report, advances F9B5",
"clears FAA3 and FAA2 before exit",
],
"evidence_addresses_hex": _hexes([0xBDDB, 0xBDE5, 0xBDE9, 0xBDED, 0xBDF3, 0xBDFB, 0xBDFF], by_address),
},
{
"command": 0x07,
"command_hex": "0x07",
"name": "retransmit_previous_tx_candidate",
"handler_address_hex": h16(0xBE05),
"availability": "initial or continuation path",
"summary": "copies the previous finalized TX frame bytes back into staging and sends them again",
"response_candidate": "previous TX frame retransmitted through BA26",
"side_effects": ["loads F9C0 with 0x1F before sending"],
"evidence_addresses_hex": _hexes([0xBE05, 0xBE09, 0xBE0D, 0xBE11, 0xBE15, 0xBE19, 0xBE1D, 0xBE22], by_address),
},
]
def _state_bits() -> list[JsonObject]:
return [
{"address": 0xFAA2, "address_hex": h16(0xFAA2), "bit": 7, "name": "rx_command_in_progress_candidate", "meaning": "set on initial-path parse; cleared by command 0/1/2 exits or by continuation cleanup"},
{"address": 0xFAA2, "address_hex": h16(0xFAA2), "bit": 3, "name": "queued_report_ack_needed_candidate", "meaning": "set by the autonomous queue send path at BB00; continuation command 4/5/6 can advance F9B5 only when this bit was set"},
{"address": 0xFAA3, "address_hex": h16(0xFAA3), "bit": 7, "name": "pending_resend_mask_candidate", "meaning": "set after queued report send; BE9E masks it with FAA5.7 before resend/clear decisions"},
{"address": 0xFAA4, "address_hex": h16(0xFAA4), "bit": 7, "name": "rx_physical_error_latch_candidate", "meaning": "set by SCI1 ERI and tested before checksum dispatch"},
{"address": 0xFAA5, "address_hex": h16(0xFAA5), "bit": 7, "name": "rx_session_gate_candidate", "meaning": "set while F9C5 is alive after a complete RX frame; gates retry/resend and heartbeat/report enqueue behavior"},
]
def _bench_implications() -> list[str]:
return [
"A standalone command 4 frame from idle should not hit BD0E; command 4 is continuation-only and initial dispatch does not accept it.",
"Command 5 is not a generic always-live ACK. It only performs ACK/session-clear work on the continuation path while FAA2 != 0.",
"A valid six-byte RX frame loads F9C5 with 0x14, which temporarily sets FAA5.7; when that window expires, loc_3FEF can clear queue/session state and call loc_400C.",
"The observed 07 retry/error family can be produced by BE4D from RX[1:4] after error/checksum retry conditions; it is not automatically proof of a table value or a command-4 continuation token.",
"To prove CONNECT OK through serial, the bench has to create or preserve FAA2 != 0 at the moment command 4 arrives, and ideally FAA2.3 if it expects queued-report advancement.",
]
def _branch(condition: str, outcome: str, evidence: list[int]) -> JsonObject:
return {
"condition": condition,
"outcome": outcome,
"evidence_addresses_hex": [h16(address) for address in evidence],
}
def _step(name: str, effect: str, evidence: list[int]) -> JsonObject:
return {
"name": name,
"effect": effect,
"evidence_addresses_hex": [h16(address) for address in evidence],
}
def _range_payload(start: int, end: int) -> JsonObject:
return {
"start": start,
"end": end,
"start_hex": h16(start),
"end_hex": h16(end),
"range_hex": f"{h16(start)}-{h16(end)}",
}
def _instruction_sequence(value: Any) -> list[JsonObject]:
if isinstance(value, list):
return [dict(item) for item in value if isinstance(item, Mapping) and "address" in item]
return []
def _has_all(by_address: Mapping[int, JsonObject], addresses: tuple[int, ...]) -> bool:
return all(address in by_address for address in addresses)
def _hexes(addresses: list[int], by_address: Mapping[int, JsonObject]) -> list[str]:
return [h16(address) for address in addresses if address in by_address]
def _join_hex(values: Any) -> str:
if not isinstance(values, list):
return ""
return ", ".join(str(value) for value in values)
def _confidence(stages: list[JsonObject], commands: list[JsonObject]) -> str:
present = sum(1 for stage in stages if stage.get("present"))
command_evidence = sum(1 for command in commands if command.get("evidence_addresses_hex"))
if present >= 5 and command_evidence >= 6:
return "high"
if present >= 3 and command_evidence >= 4:
return "medium"
return "low"
__all__ = [
"analyze_rx_branch_trace",
"format_text_report",
"load_rx_branch_input",
"main",
"write_rx_branch_trace",
]

732
h8536/state_map_runner.py Normal file
View File

@@ -0,0 +1,732 @@
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, TextIO
from .bench_connect_lcd import (
BenchLogger,
FrameDetector,
_import_serial,
_relay_command,
_relay_settle,
_wait_for_ready,
format_frame,
frame_checksum,
frame_checksum_ok,
label_frame,
parse_frame,
)
READBACK_E000_FRAME = bytes.fromhex("01000000005B")
COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
CONNECT_FORCE_PRESETS: dict[str, tuple[bytes, int, str]] = {
"clear": (bytes.fromhex("04000000005E"), 0x0080, "selector-zero no-bit clear/inactive primer"),
"dxc": (bytes.fromhex("04000040001E"), 0x4080, "selector-zero CONNECT:DXC-637 candidate"),
"ok": (bytes.fromhex("0400008000DE"), 0x8080, "selector-zero CONNECT: OK candidate"),
"both": (bytes.fromhex("040000C0009E"), 0xC080, "selector-zero bit14+bit15 priority test"),
}
@dataclass(frozen=True)
class StateMapEvent:
direction: str
frame: bytes
timestamp_ms: int | None = None
label: str = ""
source: str = ""
@property
def frame_text(self) -> str:
return format_frame(self.frame)
@dataclass
class StateMapRunContext:
args: argparse.Namespace
logger: BenchLogger
detector: FrameDetector
device: Any
relay: Any | None = None
events: list[StateMapEvent] = field(default_factory=list)
def default_log_path() -> Path:
return Path("captures") / f"state-map-runner-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Run or analyze the PT2 state-map proof sequence: visible 07 drain candidate, "
"selector-zero force, E000[0] readback, and command-7 hidden-response probe."
)
)
parser.add_argument("--analyze-log", type=Path, help="analyze an existing bench log instead of opening serial ports")
parser.add_argument("--json-out", type=Path, help="write machine-readable state-map analysis")
parser.add_argument("--preset", choices=sorted(CONNECT_FORCE_PRESETS), default="ok", help="selector-zero force preset")
parser.add_argument("--force-frame", type=parse_frame, help="override preset with a custom selector-zero command-4 frame")
parser.add_argument("--expected-word", type=_int_arg, help="expected E000[0] readback word; default follows --preset")
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the test")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before sending")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before priming")
parser.add_argument("--prime-frame", action="append", type=parse_frame, help="optional trigger/tickle frame; repeatable")
parser.add_argument("--prime-repeat", type=int, default=0, help="times to send each prime frame while hunting a trigger")
parser.add_argument("--prime-gap", type=float, default=0.120, help="seconds to listen after each prime frame")
parser.add_argument("--trigger-timeout", type=float, default=3.0, help="seconds to wait for a visible 07 drain candidate")
parser.add_argument("--trigger-prefix", default="07", help="hex prefix for the trigger frame; default any device 07...")
parser.add_argument(
"--trigger-poll-interval",
type=float,
default=0.002,
help="seconds between non-blocking serial polls while hunting a trigger",
)
parser.add_argument(
"--read-poll-interval",
type=float,
default=0.002,
help="seconds between non-blocking serial polls during timed listen/guard windows",
)
parser.add_argument("--force-guard", type=float, default=0.005, help="seconds after the detected trigger before force TX")
parser.add_argument("--post-force-listen", type=float, default=0.050, help="seconds to listen before readback")
parser.add_argument("--readback-frame", type=parse_frame, default=READBACK_E000_FRAME, help="E000[0] readback frame")
parser.add_argument("--readback-window", type=float, default=0.300, help="seconds to listen after readback")
parser.add_argument("--no-command7-probe", action="store_true", help="skip command-7 previous-frame probe")
parser.add_argument("--command7-window", type=float, default=0.300, help="seconds to listen after command-7 probe")
parser.add_argument("--final-read", type=float, default=2.0, help="seconds to listen after the proof sequence")
parser.add_argument("--prompt-screen", action="store_true", help="prompt for observed LCD text after the sequence")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--dry-run", action="store_true", help="print the planned state-map sequence without opening ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
force_frame, expected_word, preset_note = resolve_force(args)
if args.analyze_log:
text = args.analyze_log.read_text(encoding="utf-8")
events = parse_bench_log(text)
analysis = analyze_events(events, expected_word=expected_word)
print(format_analysis_report(analysis), file=stdout)
if args.json_out:
_write_json(args.json_out, analysis)
return 0
log_path = args.log or default_log_path()
if args.dry_run:
_print_dry_run(args, log_path, force_frame, expected_word, preset_note, stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector(sync_mode=args.sync)
try:
logger.emit("PT2 state-map proof runner")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud} sync={args.sync}")
logger.emit(f"log={log_path}")
_emit_plan(logger, args, force_frame, expected_word, preset_note)
with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device:
ctx = StateMapRunContext(args=args, logger=logger, detector=detector, device=device)
try:
_prepare_device(ctx)
trigger = _hunt_trigger(ctx, _parse_prefix(args.trigger_prefix))
if trigger is None:
logger.event("STATE no visible-drain token candidate; force/readback skipped")
_read_for_collect(ctx, args.final_read)
return _finish(ctx, logger, expected_word, args.json_out)
logger.event(f"STATE visible-drain token candidate {trigger.frame_text}")
if args.force_guard > 0:
logger.event(f"STATE force guard {args.force_guard:.3f}s")
_read_for_collect(ctx, args.force_guard)
_send_and_record(ctx, force_frame, "selector_zero_force")
if args.post_force_listen > 0:
_read_for_collect(ctx, args.post_force_listen)
_send_and_record(ctx, args.readback_frame, "e000_readback")
_read_for_collect(ctx, args.readback_window)
if not args.no_command7_probe:
_send_and_record(ctx, COMMAND7_REPEAT_FRAME, "command7_previous_frame_probe")
_read_for_collect(ctx, args.command7_window)
_read_for_collect(ctx, args.final_read)
if args.prompt_screen:
_prompt_screen("LCD after state-map proof sequence", logger)
finally:
if ctx.relay is not None:
ctx.relay.close()
return _finish(ctx, logger, expected_word, args.json_out)
finally:
logger.close()
def resolve_force(args: argparse.Namespace) -> tuple[bytes, int, str]:
preset_frame, preset_word, preset_note = CONNECT_FORCE_PRESETS[args.preset]
frame = args.force_frame or preset_frame
expected_word = args.expected_word if args.expected_word is not None else _expected_word_for_force(frame, preset_word)
return frame, expected_word, preset_note
def analyze_events(events: Iterable[StateMapEvent], *, expected_word: int | None = None) -> dict[str, Any]:
event_list = list(events)
trigger_indexes = [index for index, event in enumerate(event_list) if is_visible_drain_candidate(event)]
first_trigger = trigger_indexes[0] if trigger_indexes else None
force_indexes = [index for index, event in enumerate(event_list) if is_selector_zero_force(event)]
first_force_after_trigger = _first_after(force_indexes, first_trigger)
readback_tx_indexes = [index for index, event in enumerate(event_list) if is_tx_frame(event, READBACK_E000_FRAME)]
command7_tx_indexes = [index for index, event in enumerate(event_list) if is_tx_frame(event, COMMAND7_REPEAT_FRAME)]
readback_rx_indexes = [index for index, event in enumerate(event_list) if is_selector_zero_readback(event)]
direct_readbacks = [_readback_info(event_list[index], expected_word) for index in readback_rx_indexes]
first_direct_after_force = _first_after(readback_rx_indexes, first_force_after_trigger)
command7_after_force = _first_after(command7_tx_indexes, first_force_after_trigger)
command7_replay = _first_after(readback_rx_indexes, command7_after_force) if command7_after_force is not None else None
warnings = _state_warnings(event_list, first_trigger, first_force_after_trigger)
outcome = _outcome(
first_trigger=first_trigger,
first_force=first_force_after_trigger,
first_direct=first_direct_after_force,
command7_replay=command7_replay,
events=event_list,
expected_word=expected_word,
)
facts = {
"kind": "pt2_state_map_analysis",
"event_count": len(event_list),
"expected_word": _word_payload(expected_word),
"outcome": outcome,
"warnings": warnings,
"trigger_candidates": [_event_payload(event_list[index]) for index in trigger_indexes],
"selector_zero_forces": [_event_payload(event_list[index]) for index in force_indexes],
"readback_tx": [_event_payload(event_list[index]) for index in readback_tx_indexes],
"command7_tx": [_event_payload(event_list[index]) for index in command7_tx_indexes],
"direct_readbacks": direct_readbacks,
"first_trigger_index": first_trigger,
"first_force_after_trigger_index": first_force_after_trigger,
"first_direct_readback_after_force_index": first_direct_after_force,
"command7_replay_readback_index": command7_replay,
"post_force_rx_labels": _post_force_rx_labels(event_list, first_force_after_trigger),
"state_map_notes": [
"A device 07... frame is treated as a visible F870 drain/token candidate, not proof by itself.",
"The proof target is a retained selector-zero readback: RX 04 00 QQ HH LL after the force/readback turn.",
"Command 0/1 or overlapping RX before the force can spend or clear the FAA2/FAA3 opportunity.",
],
}
return facts
def format_analysis_report(analysis: dict[str, Any]) -> str:
lines = [
"PT2 state-map analysis",
f"outcome={analysis['outcome']['name']} confidence={analysis['outcome']['confidence']}",
f"reason={analysis['outcome']['reason']}",
f"events={analysis['event_count']} triggers={len(analysis['trigger_candidates'])} "
f"forces={len(analysis['selector_zero_forces'])} readbacks={len(analysis['direct_readbacks'])}",
]
expected = analysis.get("expected_word") or {}
if expected:
lines.append(f"expected_e0000={expected['hex']}")
if analysis["trigger_candidates"]:
first = analysis["trigger_candidates"][0]
lines.append(f"first_trigger={first['frame']} label={first['label'] or '(unlabeled)'}")
if analysis["selector_zero_forces"]:
first = analysis["selector_zero_forces"][0]
lines.append(f"first_force={first['frame']}")
for readback in analysis["direct_readbacks"]:
match = " expected" if readback.get("matches_expected") else ""
lines.append(
f"readback frame={readback['frame']} qq=0x{readback['qq']:02X} "
f"value={readback['value_hex']}{match}"
)
for warning in analysis["warnings"]:
lines.append(f"warning={warning}")
labels = analysis.get("post_force_rx_labels", {})
if labels:
joined = ", ".join(f"{name}={count}" for name, count in sorted(labels.items()))
lines.append(f"post_force_rx={joined}")
return "\n".join(lines)
def parse_bench_log(text: str) -> list[StateMapEvent]:
lines = text.splitlines()
events: list[StateMapEvent] = []
rx_detect_seen = False
for line in lines:
detect = _DETECT_RE.match(line.strip())
if detect:
rx_detect_seen = True
frame = _parse_hex_bytes(detect.group("hex"))
if len(frame) == 6:
events.append(
StateMapEvent(
direction="rx",
frame=frame,
timestamp_ms=_timestamp_to_ms(detect.group("ts")),
label=detect.group("label"),
source="detect",
)
)
continue
chunk = _CHUNK_RE.match(line.strip())
if not chunk or chunk.group("direction") != "TX":
continue
frame = _parse_hex_bytes(chunk.group("hex"))
if len(frame) == 6:
events.append(
StateMapEvent(
direction="tx",
frame=frame,
timestamp_ms=_timestamp_to_ms(chunk.group("ts")),
label="",
source="tx_chunk",
)
)
if rx_detect_seen:
return events
detector = FrameDetector()
for line in lines:
chunk = _CHUNK_RE.match(line.strip())
if not chunk or chunk.group("direction") != "RX":
continue
data = _parse_hex_bytes(chunk.group("hex"))
for frame, label in detector.feed(data):
events.append(
StateMapEvent(
direction="rx",
frame=frame,
timestamp_ms=_timestamp_to_ms(chunk.group("ts")),
label=label,
source="rx_chunk_resync",
)
)
return events
def is_visible_drain_candidate(event: StateMapEvent) -> bool:
return event.direction == "rx" and len(event.frame) == 6 and frame_checksum_ok(event.frame) and event.frame[0] == 0x07
def is_selector_zero_force(event: StateMapEvent) -> bool:
return (
event.direction == "tx"
and len(event.frame) == 6
and frame_checksum_ok(event.frame)
and event.frame[0] == 0x04
and event.frame[1] == 0x00
and event.frame[2] == 0x00
)
def is_selector_zero_readback(event: StateMapEvent) -> bool:
return (
event.direction == "rx"
and len(event.frame) == 6
and frame_checksum_ok(event.frame)
and event.frame[0] == 0x04
and event.frame[1] == 0x00
)
def is_tx_frame(event: StateMapEvent, frame: bytes) -> bool:
return event.direction == "tx" and event.frame == frame
def _prepare_device(ctx: StateMapRunContext) -> None:
args = ctx.args
if not args.no_power_cycle:
serial = _import_serial()
ctx.relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(ctx.relay, args.relay_settle, ctx.logger)
_relay_command(ctx.relay, args.power_off_command, ctx.logger)
time.sleep(args.off_seconds)
ctx.device.reset_input_buffer()
ctx.detector = FrameDetector(sync_mode=args.sync)
_relay_command(ctx.relay, args.power_on_command, ctx.logger)
else:
ctx.device.reset_input_buffer()
ready = _wait_for_ready(ctx.device, ctx.detector, ctx.logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
raise SystemExit(2)
if args.pre_drain > 0:
ctx.logger.event(f"STATE pre-drain {args.pre_drain:.3f}s")
_read_for_collect(ctx, args.pre_drain)
def _hunt_trigger(ctx: StateMapRunContext, prefix: bytes) -> StateMapEvent | None:
args = ctx.args
primes = list(args.prime_frame or [])
for repeat_index in range(max(0, args.prime_repeat)):
for prime_index, prime in enumerate(primes, start=1):
ctx.logger.event(f"STATE prime {repeat_index + 1}/{args.prime_repeat}.{prime_index}")
_send_and_record(ctx, prime, "prime")
trigger = _read_until_trigger(ctx, args.prime_gap, prefix)
if trigger is not None:
return trigger
return _read_until_trigger(ctx, args.trigger_timeout, prefix)
def _read_until_trigger(ctx: StateMapRunContext, seconds: float, prefix: bytes) -> StateMapEvent | None:
ctx.logger.event(f"STATE wait visible-drain prefix={format_frame(prefix)} timeout={seconds:.3f}s")
deadline = time.monotonic() + max(0.0, seconds)
poll_interval = max(0.001, float(getattr(ctx.args, "trigger_poll_interval", 0.002)))
while time.monotonic() < deadline:
events = _read_available_for_collect(ctx)
for event in events:
if is_visible_drain_candidate(event) and event.frame.startswith(prefix):
return event
remaining = deadline - time.monotonic()
if remaining > 0:
time.sleep(min(poll_interval, remaining))
return None
def _read_for_collect(ctx: StateMapRunContext, seconds: float) -> list[StateMapEvent]:
observed: list[StateMapEvent] = []
deadline = time.monotonic() + max(0.0, seconds)
poll_interval = max(0.001, float(getattr(ctx.args, "read_poll_interval", 0.002)))
while time.monotonic() < deadline:
events = _read_available_for_collect(ctx)
if events:
observed.extend(events)
continue
remaining = deadline - time.monotonic()
if remaining > 0:
time.sleep(min(poll_interval, remaining))
return observed
def _read_available_for_collect(ctx: StateMapRunContext) -> list[StateMapEvent]:
waiting = getattr(ctx.device, "in_waiting", 0)
if waiting <= 0:
return []
return _record_rx_data(ctx, ctx.device.read(waiting))
def _record_rx_data(ctx: StateMapRunContext, data: bytes) -> list[StateMapEvent]:
observed: list[StateMapEvent] = []
dropped_before = ctx.detector.dropped_bytes
ctx.logger.chunk("RX", data)
for frame, label in ctx.detector.feed(data):
event = StateMapEvent(
direction="rx",
frame=frame,
timestamp_ms=_now_ms(),
label=label,
source="live",
)
ctx.events.append(event)
observed.append(event)
state_label = _state_frame_label(event)
ctx.logger.event(f"DETECT {label} {format_frame(frame)}")
if state_label:
ctx.logger.event(f"STATE_FRAME {state_label} {format_frame(frame)}")
dropped_now = ctx.detector.dropped_bytes - dropped_before
if dropped_now:
ctx.logger.event(
f"RESYNC dropped_bytes={dropped_now} total_dropped={ctx.detector.dropped_bytes} "
f"buffered={len(ctx.detector.buffer)}"
)
return observed
def _send_and_record(ctx: StateMapRunContext, frame: bytes, label: str) -> None:
ctx.device.write(frame)
ctx.device.flush()
ctx.logger.chunk("TX", frame)
ctx.logger.event(f"SENT {label} checksum_ok={int(frame_checksum_ok(frame))}")
ctx.events.append(
StateMapEvent(direction="tx", frame=frame, timestamp_ms=_now_ms(), label=label, source="live")
)
def _finish(ctx: StateMapRunContext, logger: BenchLogger, expected_word: int | None, json_path: Path | None) -> int:
analysis = analyze_events(ctx.events, expected_word=expected_word)
logger.emit()
logger.emit(format_analysis_report(analysis))
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(ctx.detector.frames)} trailing_unframed_bytes={len(ctx.detector.buffer)}")
logger.emit(f"resync_events={ctx.detector.resync_events} dropped_bytes={ctx.detector.dropped_bytes}")
for label, count in sorted(ctx.detector.labels.items()):
logger.emit(f"{label}={count}")
if json_path:
_write_json(json_path, analysis)
return 0
def _outcome(
*,
first_trigger: int | None,
first_force: int | None,
first_direct: int | None,
command7_replay: int | None,
events: list[StateMapEvent],
expected_word: int | None,
) -> dict[str, str]:
if first_trigger is None:
return {
"name": "no_visible_drain_token",
"confidence": "high",
"reason": "No device 07... frame was observed, so the alternate RX opportunity was not demonstrated.",
}
if first_force is None:
return {
"name": "token_observed_but_not_forced",
"confidence": "high",
"reason": "A device 07... frame was observed, but no selector-zero command-4 force followed it.",
}
readback_index = first_direct if first_direct is not None else command7_replay
if readback_index is not None:
value = _readback_value(events[readback_index].frame)
if expected_word is not None and value == expected_word:
return {
"name": "selector_zero_retained",
"confidence": "high",
"reason": f"E000[0] readback matched expected 0x{expected_word:04X}.",
}
return {
"name": "selector_zero_readback_unexpected",
"confidence": "medium",
"reason": f"A selector-zero readback appeared, but value 0x{value:04X} did not match the expected word.",
}
if _only_heartbeat_after_force(events, first_force):
return {
"name": "force_not_proven_heartbeat_only",
"confidence": "medium",
"reason": "After the force/readback turn, only heartbeat frames were observed.",
}
return {
"name": "force_not_proven",
"confidence": "medium",
"reason": "No direct or command-7-recovered selector-zero readback was observed after the force.",
}
def _state_warnings(events: list[StateMapEvent], trigger_index: int | None, force_index: int | None) -> list[str]:
warnings: list[str] = []
if trigger_index is None or force_index is None:
return warnings
trigger = events[trigger_index]
force = events[force_index]
if trigger.timestamp_ms is not None and force.timestamp_ms is not None:
guard_ms = force.timestamp_ms - trigger.timestamp_ms
if guard_ms < 2:
warnings.append(f"force_guard_short_{guard_ms}ms_may_overlap_TXI")
between = events[trigger_index + 1 : force_index]
for event in between:
if event.direction != "tx" or not event.frame:
continue
command = event.frame[0]
if command == 0x00:
warnings.append("command0_between_trigger_and_force_can_destroy_token")
elif command == 0x01:
warnings.append("command1_readback_between_trigger_and_force_can_spend_token")
elif command in {0x04, 0x05, 0x06}:
warnings.append(f"command{command}_between_trigger_and_force_can_spend_alternate_tail")
return sorted(set(warnings))
def _post_force_rx_labels(events: list[StateMapEvent], force_index: int | None) -> dict[str, int]:
if force_index is None:
return {}
counts: dict[str, int] = {}
for event in events[force_index + 1 :]:
if event.direction != "rx":
continue
label = _state_frame_label(event) or event.label or label_frame(event.frame) or "rx_unlabeled"
counts[label] = counts.get(label, 0) + 1
return counts
def _state_frame_label(event: StateMapEvent) -> str:
if event.direction == "rx" and is_visible_drain_candidate(event):
return "visible_drain_token_candidate"
if event.direction == "rx" and is_selector_zero_readback(event):
return "selector_zero_readback_proof_candidate"
if event.direction == "tx" and is_selector_zero_force(event):
return "selector_zero_force"
if event.direction == "tx" and event.frame == READBACK_E000_FRAME:
return "e000_readback_probe"
if event.direction == "tx" and event.frame == COMMAND7_REPEAT_FRAME:
return "command7_previous_frame_probe"
return ""
def _readback_info(event: StateMapEvent, expected_word: int | None = None) -> dict[str, Any]:
value = _readback_value(event.frame)
return {
"frame": event.frame_text,
"timestamp_ms": event.timestamp_ms,
"qq": event.frame[2],
"value": value,
"value_hex": f"0x{value:04X}",
"matches_expected": expected_word is not None and value == expected_word,
}
def _readback_value(frame: bytes) -> int:
return ((frame[3] << 8) | frame[4]) & 0xFFFF
def _first_after(indexes: list[int], anchor: int | None) -> int | None:
if anchor is None:
return indexes[0] if indexes else None
for index in indexes:
if index > anchor:
return index
return None
def _only_heartbeat_after_force(events: list[StateMapEvent], force_index: int) -> bool:
rx_after = [event for event in events[force_index + 1 :] if event.direction == "rx"]
return bool(rx_after) and all(event.frame == HEARTBEAT_FRAME for event in rx_after)
def _event_payload(event: StateMapEvent) -> dict[str, Any]:
return {
"direction": event.direction,
"frame": event.frame_text,
"timestamp_ms": event.timestamp_ms,
"label": event.label,
"state_label": _state_frame_label(event),
"source": event.source,
}
def _word_payload(word: int | None) -> dict[str, Any] | None:
if word is None:
return None
return {"value": word & 0xFFFF, "hex": f"0x{word & 0xFFFF:04X}"}
def _emit_plan(logger: BenchLogger, args: argparse.Namespace, force_frame: bytes, expected_word: int, preset_note: str) -> None:
logger.emit(f"preset={args.preset} note={preset_note}")
logger.emit(f"force={format_frame(force_frame)} checksum_ok={int(frame_checksum_ok(force_frame))}")
logger.emit(f"expected_e0000=0x{expected_word:04X}")
logger.emit(f"readback={format_frame(args.readback_frame)} checksum_ok={int(frame_checksum_ok(args.readback_frame))}")
logger.emit(f"command7_probe={int(not args.no_command7_probe)} frame={format_frame(COMMAND7_REPEAT_FRAME)}")
logger.emit("guardrails=no command-0/command-1 is sent between trigger and force by this runner")
def _print_dry_run(
args: argparse.Namespace,
log_path: Path,
force_frame: bytes,
expected_word: int,
preset_note: str,
stdout: TextIO,
) -> None:
print("PT2 state-map proof runner", file=stdout)
print(f"device={args.port} {args.baud} 8N1", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout)
print(f"preset={args.preset} note={preset_note}", file=stdout)
print(f"force={format_frame(force_frame)} checksum_ok={int(frame_checksum_ok(force_frame))}", file=stdout)
print(f"expected_e0000=0x{expected_word:04X}", file=stdout)
print(f"readback={format_frame(args.readback_frame)} checksum_ok={int(frame_checksum_ok(args.readback_frame))}", file=stdout)
for prime in args.prime_frame or []:
print(f"prime={format_frame(prime)} checksum_ok={int(frame_checksum_ok(prime))}", file=stdout)
print(f"prime_repeat={args.prime_repeat} prime_gap={args.prime_gap:.3f}", file=stdout)
print(f"trigger_prefix={format_frame(_parse_prefix(args.trigger_prefix))} timeout={args.trigger_timeout:.3f}", file=stdout)
print(f"force_guard={args.force_guard:.3f} post_force_listen={args.post_force_listen:.3f}", file=stdout)
print(f"command7_probe={int(not args.no_command7_probe)} frame={format_frame(COMMAND7_REPEAT_FRAME)}", file=stdout)
print(f"log={log_path}", file=stdout)
def _expected_word_for_force(frame: bytes, default: int) -> int:
if len(frame) == 6 and frame_checksum_ok(frame) and frame[0] == 0x04 and frame[1] == 0 and frame[2] == 0:
return ((frame[3] << 8) | 0x0080) & 0xFFFF
return default
def _parse_prefix(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
if not normalized:
return b""
parts = normalized.split()
if len(parts) == 1:
compact = parts[0]
if compact.lower().startswith("0x"):
compact = compact[2:]
if compact.upper().startswith("H'"):
compact = compact[2:]
if len(compact) % 2:
compact = "0" + compact
return bytes(int(compact[index : index + 2], 16) for index in range(0, len(compact), 2))
return bytes(int(part, 16) for part in parts)
def _parse_hex_bytes(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
if not normalized:
return b""
return bytes(int(part, 16) for part in normalized.split())
def _timestamp_to_ms(text: str) -> int:
hour, minute, rest = text.split(":")
second, milli = rest.split(".")
return ((int(hour) * 60 + int(minute)) * 60 + int(second)) * 1000 + int(milli)
def _now_ms() -> int:
now = datetime.now()
return ((now.hour * 60 + now.minute) * 60 + now.second) * 1000 + now.microsecond // 1000
def _int_arg(text: str) -> int:
return int(text, 0)
def _prompt_screen(label: str, logger: BenchLogger) -> None:
note = input(f"{label}: type observed LCD text, or press Enter to skip: ").strip()
logger.event(f"SCREEN {label}: {note or '(no note)'}")
def _write_json(path: Path, analysis: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
_CHUNK_RE = re.compile(
r"^(?P<ts>\d\d:\d\d:\d\d\.\d{3})\s+(?P<direction>TX|RX)\s+\d+\s+bytes\s+(?P<hex>[0-9A-Fa-f ]+)$"
)
_DETECT_RE = re.compile(
r"^(?P<ts>\d\d:\d\d:\d\d\.\d{3})\s+DETECT\s+(?P<label>\S+)\s+(?P<hex>[0-9A-Fa-f ]+)$"
)
__all__ = [
"COMMAND7_REPEAT_FRAME",
"CONNECT_FORCE_PRESETS",
"READBACK_E000_FRAME",
"StateMapEvent",
"analyze_events",
"build_arg_parser",
"format_analysis_report",
"main",
"parse_bench_log",
"resolve_force",
]