otehr lamps
This commit is contained in:
336
h8536/emulator/report_queue_probe.py
Normal file
336
h8536/emulator/report_queue_probe.py
Normal file
@@ -0,0 +1,336 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ..formatting import h16, parse_int
|
||||
from .cli import load_rom
|
||||
from .eeprom_image import write_eeprom_snapshot
|
||||
from .runner import H8536Emulator
|
||||
from .rx_probe import RunContext, _run_until, _rx_ready, format_frame
|
||||
from .uart import UartTiming
|
||||
|
||||
|
||||
CHECKSUM_SEED = 0x5A
|
||||
REPORT_QUEUE_START = 0xF870
|
||||
REPORT_QUEUE_HEAD = 0xF9B0
|
||||
REPORT_QUEUE_TAIL = 0xF9B5
|
||||
TX_STAGING_START = 0xF850
|
||||
TX_FRAME_START = 0xF858
|
||||
TX_FRAME_LENGTH = 6
|
||||
CURRENT_TABLE_START = 0xE800
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ReportQueueProbeResult:
|
||||
rom_path: Path
|
||||
report_word: int
|
||||
payload_selector: int
|
||||
payload: int
|
||||
expected_frame: bytes
|
||||
boot_summary: str
|
||||
steps: int
|
||||
stopped_reason: str
|
||||
tx_frames: tuple[bytes, ...]
|
||||
staging_bytes: bytes
|
||||
finalized_bytes: bytes
|
||||
state: dict[str, int]
|
||||
eeprom_writes: tuple[str, ...]
|
||||
|
||||
@property
|
||||
def emitted_expected_frame(self) -> bool:
|
||||
return self.expected_frame in self.tx_frames
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"kind": "h8536_report_queue_probe",
|
||||
"rom_path": str(self.rom_path),
|
||||
"report_word": f"0x{self.report_word:04X}",
|
||||
"payload_selector": f"0x{self.payload_selector:04X}",
|
||||
"payload": f"0x{self.payload:04X}",
|
||||
"expected_frame": format_frame(self.expected_frame),
|
||||
"emitted_expected_frame": self.emitted_expected_frame,
|
||||
"boot_summary": self.boot_summary,
|
||||
"steps": self.steps,
|
||||
"stopped_reason": self.stopped_reason,
|
||||
"tx_frames": [format_frame(frame) for frame in self.tx_frames],
|
||||
"staging_bytes_f850_f855": format_frame(self.staging_bytes),
|
||||
"finalized_bytes_f858_f85d": format_frame(self.finalized_bytes),
|
||||
"state": {name: _format_state_value(name, value) for name, value in self.state.items()},
|
||||
"eeprom_writes": list(self.eeprom_writes),
|
||||
}
|
||||
|
||||
def lines(self) -> list[str]:
|
||||
lines = [
|
||||
f"rom={self.rom_path}",
|
||||
self.boot_summary,
|
||||
f"report_word={h16(self.report_word)} payload_selector={h16(self.payload_selector)} payload={h16(self.payload)}",
|
||||
f"expected_frame={format_frame(self.expected_frame)}",
|
||||
f"emitted_expected_frame={int(self.emitted_expected_frame)}",
|
||||
f"stopped={self.stopped_reason} steps={self.steps}",
|
||||
"tx_frames=" + (" | ".join(format_frame(frame) for frame in self.tx_frames) or "none"),
|
||||
f"staging_F850_F855={format_frame(self.staging_bytes)}",
|
||||
f"finalized_F858_F85D={format_frame(self.finalized_bytes)}",
|
||||
"state:",
|
||||
]
|
||||
for name, value in self.state.items():
|
||||
width = 4 if name in {"queue_word", "current_table_word"} else 2
|
||||
lines.append(f" {name}=0x{value:0{width}X}")
|
||||
if self.eeprom_writes:
|
||||
lines.append("eeprom_writes:")
|
||||
lines.extend(f" {line}" for line in self.eeprom_writes)
|
||||
else:
|
||||
lines.append("eeprom_writes=none")
|
||||
return lines
|
||||
|
||||
|
||||
def build_expected_report_frame(report_word: int, payload: int) -> bytes:
|
||||
first, second, third = encode_report_header(report_word)
|
||||
frame = bytes(
|
||||
[
|
||||
first,
|
||||
second,
|
||||
third,
|
||||
(payload >> 8) & 0xFF,
|
||||
payload & 0xFF,
|
||||
]
|
||||
)
|
||||
return frame + bytes([frame_checksum(frame)])
|
||||
|
||||
|
||||
def encode_report_header(report_word: int) -> tuple[int, int, int]:
|
||||
raw = report_word & 0xFFFF
|
||||
encoded_selector = _loc_6206_selector_encode(raw)
|
||||
|
||||
r1 = _swap_bytes(raw)
|
||||
r1 = (r1 & 0xFF00) | ((r1 & 0xFF) >> 1)
|
||||
r2 = r1 & 0xFF
|
||||
|
||||
first = r1 & 0x07
|
||||
third = encoded_selector & 0xFF
|
||||
swapped_encoded = _swap_bytes(encoded_selector)
|
||||
second = (swapped_encoded & 0xFF) | (r2 & 0x78)
|
||||
return first & 0xFF, second & 0xFF, third & 0xFF
|
||||
|
||||
|
||||
def report_payload_selector(report_word: int) -> int:
|
||||
return report_word & 0x01FF
|
||||
|
||||
|
||||
def frame_checksum(frame_without_checksum: bytes) -> int:
|
||||
checksum = CHECKSUM_SEED
|
||||
for value in frame_without_checksum[: TX_FRAME_LENGTH - 1]:
|
||||
checksum ^= value
|
||||
return checksum & 0xFF
|
||||
|
||||
|
||||
def run_report_queue_probe(
|
||||
*,
|
||||
report_word: int = 0x0204,
|
||||
payload: int = 0x0000,
|
||||
queue_index: int = 0,
|
||||
rom_path: Path | None = None,
|
||||
boot_steps: int = 250_000,
|
||||
max_steps: int = 200_000,
|
||||
eeprom_seed: str = "blank",
|
||||
eeprom_image: bytes | None = None,
|
||||
tx_wire_timing: bool = False,
|
||||
uart_baud: int = 38_400,
|
||||
uart_format: str = "8E1",
|
||||
p9_fast_path: bool = True,
|
||||
p9_fast_input: int = 0xFF,
|
||||
p7_input: int = 0xFF,
|
||||
) -> tuple[H8536Emulator, ReportQueueProbeResult]:
|
||||
rom_bytes, discovered_rom_path = load_rom(rom_path)
|
||||
emulator = H8536Emulator(
|
||||
rom_bytes,
|
||||
p9_fast_path_enabled=p9_fast_path,
|
||||
p9_fast_default_input_byte=p9_fast_input,
|
||||
p7_input=p7_input,
|
||||
eeprom_seed=eeprom_seed,
|
||||
sci1_tx_timing=UartTiming.from_format(uart_format, baud=uart_baud) if tx_wire_timing else None,
|
||||
)
|
||||
if eeprom_image is not None:
|
||||
emulator.memory.load_eeprom_image(eeprom_image)
|
||||
|
||||
boot_context = RunContext()
|
||||
boot_used, boot_reason = _run_until(emulator, boot_steps, _rx_ready, boot_context)
|
||||
boot_summary = (
|
||||
f"boot={boot_reason} steps={boot_used} pc={h16(emulator.cpu.pc)} "
|
||||
f"rx_serviceable={int(_rx_ready(emulator))} lcd_display={emulator.memory.lcd.display_text(lines=4)!r}"
|
||||
)
|
||||
|
||||
payload_selector = report_payload_selector(report_word)
|
||||
expected_frame = build_expected_report_frame(report_word, payload)
|
||||
_seed_report_queue(emulator, report_word=report_word, payload=payload, queue_index=queue_index)
|
||||
emulator.memory.clear_eeprom_write_log()
|
||||
|
||||
start_frame_count = len(emulator.sci1.tx_frames)
|
||||
|
||||
def emitted_expected(inner: H8536Emulator) -> bool:
|
||||
return expected_frame in inner.sci1.tx_frames[start_frame_count:]
|
||||
|
||||
run_context = RunContext()
|
||||
steps, reason = _run_until(emulator, max_steps, emitted_expected, run_context)
|
||||
stopped_reason = "expected_frame" if reason == "predicate" else reason
|
||||
|
||||
state = _state_snapshot(emulator, queue_index=queue_index, payload_selector=payload_selector)
|
||||
staging = bytes(emulator.memory.read8(address) for address in range(TX_STAGING_START, TX_STAGING_START + TX_FRAME_LENGTH))
|
||||
finalized = bytes(emulator.memory.read8(address) for address in range(TX_FRAME_START, TX_FRAME_START + TX_FRAME_LENGTH))
|
||||
result = ReportQueueProbeResult(
|
||||
rom_path=discovered_rom_path,
|
||||
report_word=report_word & 0xFFFF,
|
||||
payload_selector=payload_selector,
|
||||
payload=payload & 0xFFFF,
|
||||
expected_frame=expected_frame,
|
||||
boot_summary=boot_summary,
|
||||
steps=steps,
|
||||
stopped_reason=stopped_reason,
|
||||
tx_frames=tuple(emulator.sci1.tx_frames[start_frame_count:]),
|
||||
staging_bytes=staging,
|
||||
finalized_bytes=finalized,
|
||||
state=state,
|
||||
eeprom_writes=tuple(emulator.memory.p9_bus.x24164_bus.write_log_lines(limit=80)),
|
||||
)
|
||||
return emulator, result
|
||||
|
||||
|
||||
def build_arg_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Seed the ROM report queue and let the H8/536 ROM build the matching SCI1 TX frame."
|
||||
)
|
||||
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN")
|
||||
parser.add_argument("--report-word", type=parse_int, default=0x0204, help="word to place in F870 report queue")
|
||||
parser.add_argument("--payload", type=parse_int, default=0x0000, help="word to place in E800[current selector]")
|
||||
parser.add_argument("--queue-index", type=parse_int, default=0, help="F870 queue slot to seed")
|
||||
parser.add_argument("--boot-steps", type=int, default=250_000, help="maximum boot steps before queue seeding")
|
||||
parser.add_argument("--max-steps", type=int, default=200_000, help="maximum steps after queue seeding")
|
||||
parser.add_argument("--tx-wire-timing", action="store_true", help="model UART TX character time between TXI bytes")
|
||||
parser.add_argument("--uart-baud", type=parse_int, default=38_400, help="baud rate used with --tx-wire-timing")
|
||||
parser.add_argument("--uart-format", default="8E1", help="UART format used with --tx-wire-timing")
|
||||
parser.add_argument("--no-p9-fast-path", action="store_true", help="disable shortcut handling for known P9 routines")
|
||||
parser.add_argument("--p9-fast-input", type=parse_int, default=0xFF, help="default byte returned by P9 fast-path reads")
|
||||
parser.add_argument("--p7-input", type=parse_int, default=0xFF, help="external P7 pin state; DIP-off default is 0xFF")
|
||||
parser.add_argument("--eeprom-seed", choices=("blank", "factory"), default="blank", help="initial X24164/shadow state")
|
||||
parser.add_argument("--eeprom-load", type=Path, help="load a 0x1000-byte logical EEPROM image before booting")
|
||||
parser.add_argument("--eeprom-save", type=Path, help="save the final EEPROM image")
|
||||
parser.add_argument("--eeprom-report", type=Path, help="write a readable EEPROM snapshot")
|
||||
parser.add_argument("--eeprom-report-json", type=Path, help="write a structured EEPROM snapshot")
|
||||
parser.add_argument("--eeprom-report-include-image", action="store_true", help="include full EEPROM image in JSON")
|
||||
parser.add_argument("--json", action="store_true", help="print JSON instead of text")
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = build_arg_parser().parse_args(argv)
|
||||
emulator, result = run_report_queue_probe(
|
||||
report_word=args.report_word,
|
||||
payload=args.payload,
|
||||
queue_index=args.queue_index,
|
||||
rom_path=args.rom,
|
||||
boot_steps=args.boot_steps,
|
||||
max_steps=args.max_steps,
|
||||
eeprom_seed=args.eeprom_seed,
|
||||
eeprom_image=args.eeprom_load.read_bytes() if args.eeprom_load else None,
|
||||
tx_wire_timing=args.tx_wire_timing,
|
||||
uart_baud=args.uart_baud,
|
||||
uart_format=args.uart_format,
|
||||
p9_fast_path=not args.no_p9_fast_path,
|
||||
p9_fast_input=args.p9_fast_input,
|
||||
p7_input=args.p7_input,
|
||||
)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(result.as_dict(), indent=2, sort_keys=True))
|
||||
else:
|
||||
for line in result.lines():
|
||||
print(line)
|
||||
|
||||
if args.eeprom_save:
|
||||
args.eeprom_save.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.eeprom_save.write_bytes(emulator.memory.dump_eeprom_image())
|
||||
print(f"eeprom_saved={args.eeprom_save}")
|
||||
if args.eeprom_report:
|
||||
write_eeprom_snapshot(emulator.memory, args.eeprom_report, rom_bytes=emulator.memory.rom.data)
|
||||
print(f"eeprom_report={args.eeprom_report}")
|
||||
if args.eeprom_report_json:
|
||||
write_eeprom_snapshot(
|
||||
emulator.memory,
|
||||
args.eeprom_report_json,
|
||||
rom_bytes=emulator.memory.rom.data,
|
||||
as_json=True,
|
||||
include_image_hex=args.eeprom_report_include_image,
|
||||
)
|
||||
print(f"eeprom_report_json={args.eeprom_report_json}")
|
||||
return 0
|
||||
|
||||
|
||||
def _seed_report_queue(
|
||||
emulator: H8536Emulator,
|
||||
*,
|
||||
report_word: int,
|
||||
payload: int,
|
||||
queue_index: int,
|
||||
) -> None:
|
||||
queue_index &= 0x7F
|
||||
queue_address = REPORT_QUEUE_START + (queue_index * 2)
|
||||
payload_address = CURRENT_TABLE_START + (report_payload_selector(report_word) * 2)
|
||||
memory = emulator.memory
|
||||
memory.write16(queue_address, report_word)
|
||||
memory.write16(payload_address, payload)
|
||||
memory.write8(REPORT_QUEUE_TAIL, queue_index)
|
||||
memory.write8(REPORT_QUEUE_HEAD, (queue_index + 1) & 0x7F)
|
||||
memory.write8(0xF9C0, 0x00)
|
||||
memory.write8(0xF9C3, 0x00)
|
||||
memory.write8(0xFAA2, 0x00)
|
||||
memory.write8(0xFAA3, 0x00)
|
||||
|
||||
|
||||
def _state_snapshot(emulator: H8536Emulator, *, queue_index: int, payload_selector: int) -> dict[str, int]:
|
||||
memory = emulator.memory
|
||||
queue_address = REPORT_QUEUE_START + ((queue_index & 0x7F) * 2)
|
||||
payload_address = CURRENT_TABLE_START + ((payload_selector & 0x01FF) * 2)
|
||||
return {
|
||||
"queue_head_F9B0": memory.read8(REPORT_QUEUE_HEAD),
|
||||
"queue_tail_F9B5": memory.read8(REPORT_QUEUE_TAIL),
|
||||
"queue_word": memory.read16(queue_address),
|
||||
"current_table_word": memory.read16(payload_address),
|
||||
"tx_gate_F9C0": memory.read8(0xF9C0),
|
||||
"rx_index_F9C3": memory.read8(0xF9C3),
|
||||
"heartbeat_gate_F9C4": memory.read8(0xF9C4),
|
||||
"session_flags_FAA2": memory.read8(0xFAA2),
|
||||
"pending_mask_FAA3": memory.read8(0xFAA3),
|
||||
}
|
||||
|
||||
|
||||
def _loc_6206_selector_encode(value: int) -> int:
|
||||
value &= 0x01FF
|
||||
if value <= 0x007F:
|
||||
return value
|
||||
if value <= 0x017F:
|
||||
return ((value - 0x0080) + 0x0100) & 0xFFFF
|
||||
return ((value - 0x0180) + 0x0200) & 0xFFFF
|
||||
|
||||
|
||||
def _swap_bytes(value: int) -> int:
|
||||
value &= 0xFFFF
|
||||
return ((value & 0x00FF) << 8) | ((value >> 8) & 0x00FF)
|
||||
|
||||
|
||||
def _format_state_value(name: str, value: int) -> str:
|
||||
width = 4 if name in {"queue_word", "current_table_word"} else 2
|
||||
return f"0x{value:0{width}X}"
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ReportQueueProbeResult",
|
||||
"build_expected_report_frame",
|
||||
"encode_report_header",
|
||||
"frame_checksum",
|
||||
"main",
|
||||
"report_payload_selector",
|
||||
"run_report_queue_probe",
|
||||
]
|
||||
Reference in New Issue
Block a user