1
0
Files
h8-536-decoder/h8536/emulator/state_search.py
2026-05-26 10:48:39 +10:00

394 lines
14 KiB
Python

from __future__ import annotations
import argparse
import itertools
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, TextIO
from ..formatting import h16, parse_int
from .cli import load_rom
from .errors import UnsupportedInstruction
from .runner import H8536Emulator
from .rx_probe import RunContext, _run_until, _rx_ready
CONNECT_WORDS = (0x0000, 0x0080, 0x4080, 0x8080, 0xC080)
F730_LATCH_VALUES = (0x00, 0x01, 0x41, 0x81, 0xC1)
PRESET_DESCRIPTIONS = {
"connect-branch": "Start at loc_2CB9 with E000[0]/F730 matrix patches.",
"connect-queue": "Queue selector zero in F970, start at loc_2806, then enter loc_2CB9 through the ROM dispatch.",
"custom": "Use only --byte/--word/--matrix-* patches and --pc.",
}
@dataclass(frozen=True)
class StatePatch:
size: int
address: int
value: int
source: str = "user"
def label(self) -> str:
kind = "byte" if self.size == 1 else "word"
width = 2 if self.size == 1 else 4
return f"{kind}:{h16(self.address)}=0x{self.value:0{width}X}"
@dataclass(frozen=True)
class SearchCase:
patches: tuple[StatePatch, ...]
pc: int
def name(self) -> str:
return ", ".join(patch.label() for patch in self.patches)
@dataclass(frozen=True)
class SearchResult:
case_index: int
patches: tuple[StatePatch, ...]
pc: int
steps: int
stopped_reason: str
final_pc: int
display: str
line0: str
outcome: str
f730: int
e000: int
f9b4: int
f9b9: int
unsupported: str | None = None
def payload(self) -> dict[str, object]:
return {
"case_index": self.case_index,
"patches": [patch.label() for patch in self.patches],
"pc": h16(self.pc),
"steps": self.steps,
"stopped_reason": self.stopped_reason,
"final_pc": h16(self.final_pc),
"display": self.display,
"line0": self.line0,
"outcome": self.outcome,
"f730": f"0x{self.f730:02X}",
"e000": f"0x{self.e000:04X}",
"f9b4": f"0x{self.f9b4:02X}",
"f9b9": f"0x{self.f9b9:02X}",
"unsupported": self.unsupported,
}
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Patch internal emulator state and search for CONNECT LCD outcomes. "
"This is bounded ROM execution, not an unbounded serial fuzzer."
)
)
parser.add_argument("--preset", choices=sorted(PRESET_DESCRIPTIONS), default="connect-queue")
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN when present")
parser.add_argument("--pc", type=parse_int, help="entry PC for custom/direct tests")
parser.add_argument("--stop-pc", type=parse_int, default=0xFFFF, help="sentinel PC used to stop after RTS")
parser.add_argument("--stack", type=parse_int, default=0xFF00, help="temporary stack pointer for direct function entry")
parser.add_argument("--boot-steps", type=int, default=250_000, help="steps to boot before patching state")
parser.add_argument("--max-steps", type=int, default=120_000, help="steps to run each patched case")
parser.add_argument("--clock-hz", type=parse_int, default=10_000_000)
parser.add_argument("--interval-steps", type=int, default=512)
parser.add_argument("--frt1-ocia-steps", type=int, default=None)
parser.add_argument("--frt2-ocia-steps", type=int, default=None)
parser.add_argument("--no-p9-fast-path", action="store_true")
parser.add_argument("--p9-fast-input", type=parse_int, default=0xFF)
parser.add_argument("--p9-fast-optimistic-wrapper", action="store_true")
parser.add_argument("--p7-input", type=parse_int, default=0xFF)
parser.add_argument("--eeprom-seed", choices=("blank", "factory"), default="blank")
parser.add_argument("--byte", action="append", default=[], help="fixed byte patch, e.g. F730=0")
parser.add_argument("--word", action="append", default=[], help="fixed word patch, e.g. E000=0x8080")
parser.add_argument("--matrix-byte", action="append", default=[], help="byte matrix patch, e.g. F730=0,1,0x41")
parser.add_argument("--matrix-word", action="append", default=[], help="word matrix patch, e.g. E000=0x4080,0x8080")
parser.add_argument("--target", choices=("ok", "dxc", "not-act", "any-connect", "changed"), default="ok")
parser.add_argument("--first-hit", action="store_true", help="stop after the first target hit")
parser.add_argument("--show-all", action="store_true", help="print every case, not only hits/non-baseline outcomes")
parser.add_argument("--limit", type=int, help="maximum number of cases to run")
parser.add_argument("--json-out", type=Path, help="write machine-readable results")
parser.add_argument("--dry-run", action="store_true", help="print planned cases without running the emulator")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
cases = build_cases(args)
if args.limit is not None:
cases = cases[: max(0, args.limit)]
if args.dry_run:
print(f"preset={args.preset} cases={len(cases)}", file=stdout)
print(PRESET_DESCRIPTIONS[args.preset], file=stdout)
for index, case in enumerate(cases[:32]):
print(f"case[{index}] pc={h16(case.pc)} {case.name()}", file=stdout)
if len(cases) > 32:
print(f"... {len(cases) - 32} more cases", file=stdout)
return 0
results = run_search(args, cases)
print(format_results(results, target=args.target, show_all=args.show_all), file=stdout)
if args.json_out:
write_json(args.json_out, args, cases, results)
return 0
def build_cases(args: argparse.Namespace) -> list[SearchCase]:
fixed = []
fixed.extend(parse_patch_list(args.byte, size=1, source="user"))
fixed.extend(parse_patch_list(args.word, size=2, source="user"))
matrix_groups: list[list[StatePatch]] = []
if args.preset == "connect-branch":
matrix_groups.extend(
[
[StatePatch(2, 0xE000, value, "preset") for value in CONNECT_WORDS],
[StatePatch(1, 0xF730, value, "preset") for value in F730_LATCH_VALUES],
]
)
default_pc = 0x2CB9
elif args.preset == "connect-queue":
fixed.extend(
[
StatePatch(1, 0xF9B9, 0x00, "preset"),
StatePatch(1, 0xF9B4, 0x01, "preset"),
StatePatch(2, 0xF970, 0x0000, "preset"),
]
)
matrix_groups.extend(
[
[StatePatch(2, 0xE000, value, "preset") for value in CONNECT_WORDS],
[StatePatch(1, 0xF730, value, "preset") for value in F730_LATCH_VALUES],
]
)
default_pc = 0x2806
else:
default_pc = 0x2CB9
matrix_groups.extend(parse_matrix_list(args.matrix_byte, size=1, source="user"))
matrix_groups.extend(parse_matrix_list(args.matrix_word, size=2, source="user"))
pc = args.pc if args.pc is not None else default_pc
if not matrix_groups:
return [SearchCase(tuple(fixed), pc)]
return [
SearchCase(tuple(fixed + list(combo)), pc)
for combo in itertools.product(*matrix_groups)
]
def run_search(args: argparse.Namespace, cases: Iterable[SearchCase]) -> list[SearchResult]:
rom_bytes, _rom_path = load_rom(args.rom)
results: list[SearchResult] = []
for index, case in enumerate(cases):
emulator = H8536Emulator(
rom_bytes,
interval_steps=args.interval_steps,
frt1_ocia_steps=args.frt1_ocia_steps,
frt2_ocia_steps=args.frt2_ocia_steps,
clock_hz=args.clock_hz,
p9_fast_path_enabled=not args.no_p9_fast_path,
p9_fast_default_input_byte=args.p9_fast_input,
p9_fast_default_wrapper_success=args.p9_fast_optimistic_wrapper,
p7_input=args.p7_input,
eeprom_seed=args.eeprom_seed,
)
_run_until(emulator, args.boot_steps, _rx_ready, RunContext())
result = run_case(emulator, case, max_steps=args.max_steps, stop_pc=args.stop_pc, stack=args.stack)
results.append(SearchResult(index, case.patches, case.pc, *result))
if args.first_hit and target_matches(results[-1].outcome, args.target):
break
return results
def run_case(
emulator: H8536Emulator,
case: SearchCase,
*,
max_steps: int,
stop_pc: int,
stack: int,
) -> tuple[int, str, int, str, str, str, int, int, int, int, str | None]:
for patch in case.patches:
apply_patch(emulator, patch)
emulator.cpu.sr |= 0x0700
emulator.cpu.regs[7] = stack & 0xFFFF
emulator.memory.write16(stack, stop_pc)
emulator.cpu.pc = case.pc & 0xFFFF
stopped_reason = "max_steps"
unsupported: str | None = None
steps = 0
for index in range(max(0, max_steps)):
if emulator.cpu.pc == (stop_pc & 0xFFFF):
stopped_reason = "stop_pc"
break
try:
emulator.step()
except UnsupportedInstruction as exc:
stopped_reason = "unsupported_instruction"
unsupported = str(exc)
break
steps = index + 1
display = emulator.memory.lcd.display_text(lines=4, width=16)
line0 = emulator.memory.lcd.line_text(0)
outcome = classify_display(display)
return (
steps,
stopped_reason,
emulator.cpu.pc,
display,
line0,
outcome,
emulator.memory.read8(0xF730),
emulator.memory.read16(0xE000),
emulator.memory.read8(0xF9B4),
emulator.memory.read8(0xF9B9),
unsupported,
)
def apply_patch(emulator: H8536Emulator, patch: StatePatch) -> None:
if patch.size == 1:
emulator.memory.write8(patch.address, patch.value)
elif patch.size == 2:
emulator.memory.write16(patch.address, patch.value)
else:
raise ValueError(f"unsupported patch size {patch.size}")
def classify_display(display: str) -> str:
if "CONNECT: OK" in display:
return "ok"
if "CONNECT:DXC-637" in display:
return "dxc"
if "CONNECT:NOT ACT" in display or "CONNECT NOT ACT" in display:
return "not-act"
if "CONNECT" in display:
return "other-connect"
return "other"
def target_matches(outcome: str, target: str) -> bool:
if target == "any-connect":
return outcome in {"ok", "dxc", "not-act", "other-connect"}
if target == "changed":
return outcome != "not-act"
return outcome == target
def format_results(results: list[SearchResult], *, target: str, show_all: bool = False) -> str:
lines = [
"Emulator CONNECT state search",
f"cases={len(results)} target={target}",
]
hits = [result for result in results if target_matches(result.outcome, target)]
lines.append(f"hits={len(hits)}")
selected = results if show_all else [result for result in results if result.outcome != "not-act"]
if not selected:
selected = hits[:]
for result in selected:
hit = "hit" if target_matches(result.outcome, target) else "miss"
lines.append(
f"[{result.case_index:03d}] {hit} outcome={result.outcome} stopped={result.stopped_reason} "
f"steps={result.steps} pc={h16(result.final_pc)} E000=0x{result.e000:04X} "
f"F730=0x{result.f730:02X} line0={result.line0!r}"
)
lines.append(" " + ", ".join(patch.label() for patch in result.patches))
return "\n".join(lines)
def parse_patch_list(values: Iterable[str], *, size: int, source: str) -> list[StatePatch]:
return [parse_single_patch(value, size=size, source=source) for value in values]
def parse_matrix_list(values: Iterable[str], *, size: int, source: str) -> list[list[StatePatch]]:
return [parse_matrix_patch(value, size=size, source=source) for value in values]
def parse_single_patch(text: str, *, size: int, source: str = "user") -> StatePatch:
address, value_text = split_assignment(text)
value = parse_int(value_text)
return StatePatch(size, parse_address(address), value & mask_for_size(size), source)
def parse_matrix_patch(text: str, *, size: int, source: str = "user") -> list[StatePatch]:
address, values_text = split_assignment(text)
address_value = parse_address(address)
patches = []
for value_text in values_text.split(","):
if not value_text.strip():
continue
patches.append(StatePatch(size, address_value, parse_int(value_text) & mask_for_size(size), source))
if not patches:
raise argparse.ArgumentTypeError(f"matrix patch has no values: {text!r}")
return patches
def split_assignment(text: str) -> tuple[str, str]:
if "=" not in text:
raise argparse.ArgumentTypeError(f"patch must be ADDRESS=VALUE, got {text!r}")
left, right = text.split("=", 1)
if not left.strip() or not right.strip():
raise argparse.ArgumentTypeError(f"patch must be ADDRESS=VALUE, got {text!r}")
return left.strip(), right.strip()
def parse_address(text: str) -> int:
token = text.strip()
if token.upper().startswith("H'"):
token = "0x" + token[2:]
elif not token.lower().startswith("0x") and any(char in token.upper() for char in "ABCDEF"):
token = "0x" + token
return parse_int(token) & 0xFFFF
def mask_for_size(size: int) -> int:
if size == 1:
return 0xFF
if size == 2:
return 0xFFFF
raise ValueError(f"unsupported patch size {size}")
def write_json(path: Path, args: argparse.Namespace, cases: list[SearchCase], results: list[SearchResult]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"kind": "h8536_emulator_state_search",
"preset": args.preset,
"description": PRESET_DESCRIPTIONS[args.preset],
"target": args.target,
"case_count": len(cases),
"result_count": len(results),
"hits": [result.payload() for result in results if target_matches(result.outcome, args.target)],
"results": [result.payload() for result in results],
}
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
__all__ = [
"CONNECT_WORDS",
"F730_LATCH_VALUES",
"SearchCase",
"SearchResult",
"StatePatch",
"apply_patch",
"build_arg_parser",
"build_cases",
"classify_display",
"main",
"parse_address",
"parse_matrix_patch",
"parse_single_patch",
"run_case",
"run_search",
"target_matches",
]