1
0

further digging and basic emulator

This commit is contained in:
Aiden
2026-05-25 17:42:58 +10:00
parent 07f48c76e0
commit b264037e82
11 changed files with 1628 additions and 3 deletions

54
ROM/rcp-txd-idle-only.txt Normal file
View File

@@ -0,0 +1,54 @@
11:54:40.567 frame 006 00 00 00 00 80 DA
11:54:41.368 frame 006 00 00 00 00 80 DA
11:54:41.970 frame 006 00 00 00 00 80 DA
11:54:42.772 frame 006 00 00 00 00 80 DA
11:54:43.373 frame 006 00 00 00 00 80 DA
11:54:44.176 frame 006 00 00 00 00 80 DA
11:54:44.778 frame 006 00 00 00 00 80 DA
11:54:45.580 frame 006 00 00 00 00 80 DA
11:54:46.183 frame 006 00 00 00 00 80 DA
11:54:46.984 frame 006 00 00 00 00 80 DA
11:54:47.586 frame 006 00 00 00 00 80 DA
11:54:48.387 frame 006 00 00 00 00 80 DA
11:54:48.988 frame 006 00 00 00 00 80 DA
11:54:49.790 frame 006 00 00 00 00 80 DA
11:54:50.393 frame 006 00 00 00 00 80 DA
11:54:51.196 frame 006 00 00 00 00 80 DA
11:54:51.797 frame 006 00 00 00 00 80 DA
11:54:52.599 frame 006 00 00 00 00 80 DA
11:54:53.201 frame 006 00 00 00 00 80 DA
11:54:54.003 frame 006 00 00 00 00 80 DA
11:54:54.604 frame 006 00 00 00 00 80 DA
11:54:55.406 frame 006 00 00 00 00 80 DA
11:54:56.009 frame 006 00 00 00 00 80 DA
11:54:56.812 frame 006 00 00 00 00 80 DA
11:54:57.413 frame 006 00 00 00 00 80 DA
11:54:58.215 frame 006 00 00 00 00 80 DA
11:54:58.816 frame 006 00 00 00 00 80 DA
11:54:59.619 frame 006 00 00 00 00 80 DA
11:55:00.220 frame 006 00 00 00 00 80 DA
11:55:01.023 frame 006 00 00 00 00 80 DA
11:55:01.625 frame 006 00 00 00 00 80 DA
11:55:02.426 frame 006 00 00 00 00 80 DA
11:55:03.027 frame 006 00 00 00 00 80 DA
11:55:03.829 frame 006 00 00 00 00 80 DA
11:55:04.430 frame 006 00 00 00 00 80 DA
11:55:05.231 frame 006 00 00 00 00 80 DA
11:55:05.832 frame 006 00 00 00 00 80 DA
11:55:06.634 frame 006 00 00 00 00 80 DA
11:55:07.235 frame 006 00 00 00 00 80 DA
11:55:07.837 frame 006 00 00 00 00 80 DA
11:55:08.638 frame 006 00 00 00 00 80 DA
11:55:09.239 frame 006 00 00 00 00 80 DA
11:55:10.040 frame 006 00 00 00 00 80 DA
11:55:10.642 frame 006 00 00 00 00 80 DA
11:55:11.443 frame 006 00 00 00 00 80 DA
11:55:12.045 frame 006 00 00 00 00 80 DA
11:55:12.848 frame 006 00 00 00 00 80 DA
11:55:13.450 frame 006 00 00 00 00 80 DA
11:55:14.253 frame 006 00 00 00 00 80 DA
11:55:14.854 frame 006 00 00 00 00 80 DA
11:55:15.657 frame 006 00 00 00 00 80 DA
11:55:16.259 frame 006 00 00 00 00 80 DA
11:55:17.061 frame 006 00 00 00 00 80 DA
11:55:17.663 frame 006 00 00 00 00 80 DA

View File

@@ -0,0 +1,256 @@
{
"calls": [
{
"address": 5622,
"address_hex": "H'15F6",
"assessment": "Direct static enqueue source for 0x0081, not 0x0007.",
"can_directly_enqueue_report_index": false,
"dataflow_block": 5609,
"function_label": "loc_15E0",
"function_start": 5600,
"function_start_hex": "H'15E0",
"instruction": "BSR loc_3E54",
"r2": {
"bit7": true,
"classification": "constant",
"evidence": {
"address": 5617,
"address_hex": "H'15F1",
"instruction": "MOV:E.B #H'80, R2",
"mnemonic": "MOV:E.B",
"operands": "#H'80, R2"
},
"reason": "immediate load",
"register": "R2",
"value": 128,
"value_hex": "0x80"
},
"r3": {
"classification": "constant",
"evidence": {
"address": 5619,
"address_hex": "H'15F3",
"instruction": "MOV:I.W #H'0081, R3",
"mnemonic": "MOV:I.W",
"operands": "#H'0081, R3"
},
"reason": "immediate load",
"register": "R3",
"value": 129,
"value_hex": "0x0081"
},
"table_hints": [],
"target": 15956,
"target_hex": "H'3E54",
"window_instruction_count": 4,
"window_start": 5609,
"window_start_hex": "H'15E9"
},
{
"address": 6673,
"address_hex": "H'1A11",
"assessment": "No static 0x0007 constant here; R3 is dynamic/table-derived.",
"can_directly_enqueue_report_index": false,
"dataflow_block": 6665,
"function_label": "loc_19DB",
"function_start": 6619,
"function_start_hex": "H'19DB",
"instruction": "BSR loc_3E54",
"r2": {
"bit7": true,
"classification": "constant",
"evidence": {
"address": 6669,
"address_hex": "H'1A0D",
"instruction": "MOV:E.B #H'80, R2",
"mnemonic": "MOV:E.B",
"operands": "#H'80, R2"
},
"reason": "immediate load",
"register": "R2",
"value": 128,
"value_hex": "0x80"
},
"r3": {
"classification": "dynamic/table-derived",
"evidence": {
"address": 6671,
"address_hex": "H'1A0F",
"instruction": "MOV:G.W R5, R3",
"mnemonic": "MOV:G.W",
"operands": "R5, R3"
},
"reason": "copied from unresolved R5",
"register": "R3",
"value": null,
"value_hex": null
},
"table_hints": [
{
"address": 6665,
"address_hex": "H'1A09",
"instruction": "MOV:G.W R1, @(-H'1800,R3)",
"operand": "@(-H'1800,R3)",
"table": "current_value_table_candidate"
}
],
"target": 15956,
"target_hex": "H'3E54",
"window_instruction_count": 3,
"window_start": 6665,
"window_start_hex": "H'1A09"
},
{
"address": 6777,
"address_hex": "H'1A79",
"assessment": "No static 0x0007 constant here; R3 is dynamic/table-derived.",
"can_directly_enqueue_report_index": false,
"dataflow_block": 6769,
"function_label": "loc_1A35",
"function_start": 6709,
"function_start_hex": "H'1A35",
"instruction": "BSR loc_3E54",
"r2": {
"bit7": true,
"classification": "constant",
"evidence": {
"address": 6773,
"address_hex": "H'1A75",
"instruction": "MOV:E.B #H'80, R2",
"mnemonic": "MOV:E.B",
"operands": "#H'80, R2"
},
"reason": "immediate load",
"register": "R2",
"value": 128,
"value_hex": "0x80"
},
"r3": {
"classification": "dynamic/table-derived",
"evidence": {
"address": 6775,
"address_hex": "H'1A77",
"instruction": "MOV:G.W R5, R3",
"mnemonic": "MOV:G.W",
"operands": "R5, R3"
},
"reason": "copied from unresolved R5",
"register": "R3",
"value": null,
"value_hex": null
},
"table_hints": [
{
"address": 6769,
"address_hex": "H'1A71",
"instruction": "MOV:G.W R0, @(-H'1800,R3)",
"operand": "@(-H'1800,R3)",
"table": "current_value_table_candidate"
}
],
"target": 15956,
"target_hex": "H'3E54",
"window_instruction_count": 3,
"window_start": 6769,
"window_start_hex": "H'1A71"
},
{
"address": 9896,
"address_hex": "H'26A8",
"assessment": "No static 0x0007 constant here; R3 is unknown.",
"can_directly_enqueue_report_index": false,
"dataflow_block": 9896,
"function_label": "loc_2650",
"function_start": 9808,
"function_start_hex": "H'2650",
"instruction": "BSR loc_3E54",
"r2": {
"bit7": null,
"classification": "unknown",
"evidence": null,
"reason": "decompiler dataflow: block_entry",
"register": "R2",
"value": null,
"value_hex": null
},
"r3": {
"classification": "unknown",
"evidence": null,
"reason": "decompiler dataflow: block_entry",
"register": "R3",
"value": null,
"value_hex": null
},
"table_hints": [],
"target": 15956,
"target_hex": "H'3E54",
"window_instruction_count": 0,
"window_start": 9896,
"window_start_hex": "H'26A8"
},
{
"address": 18726,
"address_hex": "H'4926",
"assessment": "Direct static enqueue source for 0x00f6, not 0x0007.",
"can_directly_enqueue_report_index": false,
"dataflow_block": 18709,
"function_label": "loc_48FA",
"function_start": 18682,
"function_start_hex": "H'48FA",
"instruction": "BSR loc_3E54",
"r2": {
"bit7": true,
"classification": "constant",
"evidence": {
"address": 18721,
"address_hex": "H'4921",
"instruction": "MOV:E.B #H'80, R2",
"mnemonic": "MOV:E.B",
"operands": "#H'80, R2"
},
"reason": "immediate load",
"register": "R2",
"value": 128,
"value_hex": "0x80"
},
"r3": {
"classification": "constant",
"evidence": {
"address": 18723,
"address_hex": "H'4923",
"instruction": "MOV:I.W #H'00F6, R3",
"mnemonic": "MOV:I.W",
"operands": "#H'00F6, R3"
},
"reason": "immediate load",
"register": "R3",
"value": 246,
"value_hex": "0x00F6"
},
"table_hints": [],
"target": 15956,
"target_hex": "H'3E54",
"window_instruction_count": 5,
"window_start": 18709,
"window_start_hex": "H'4915"
}
],
"caveats": [
"This is a bounded local static trace, not an emulator run.",
"R3 values classified as dynamic/table-derived may still become 0x0007 at runtime.",
"Indirect dispatch, table handlers, interrupt interleavings, or callers absent from the JSON may still enqueue 0x0007.",
"The generic queue-to-TX path only emits queued entries; this tracer looks for direct report-index sources at loc_3E54 callers."
],
"kind": "report_source_trace",
"queue_function": 15956,
"queue_function_hex": "H'3E54",
"report_index_of_interest": 7,
"report_index_of_interest_hex": "0x0007",
"summary": {
"conclusion": "No direct loc_3E54 caller in this JSON statically loads report index 0x0007. 0x0007 remains an observed runtime/capture value unless another indirect or table-dispatch path is proven.",
"direct_call_count": 5,
"direct_static_hit_count": 0,
"dynamic_or_unknown_candidate_count": 3,
"status": "not_statically_proven"
}
}

View File

@@ -0,0 +1,32 @@
H8/536 loc_3E54 Report Source Trace
Queue function: H'3E54
Report index of interest: 0x0007
Direct callers: 5
Direct static 0x0007 hits: 0
Dynamic/unknown candidates: 3
Conclusion: No direct loc_3E54 caller in this JSON statically loads report index 0x0007. 0x0007 remains an observed runtime/capture value unless another indirect or table-dispatch path is proven.
Call sites:
- H'15F6 in loc_15E0: R2.bit7=set, R3=0x0081 (constant); direct_0x0007=False
R2 evidence: H'15F1 MOV:E.B #H'80, R2
R3 evidence: H'15F3 MOV:I.W #H'0081, R3
- H'1A11 in loc_19DB: R2.bit7=set, R3=<dynamic> (dynamic/table-derived); direct_0x0007=False
R2 evidence: H'1A0D MOV:E.B #H'80, R2
R3 evidence: H'1A0F MOV:G.W R5, R3
table/context hints: H'1A09 current_value_table_candidate via @(-H'1800,R3)
- H'1A79 in loc_1A35: R2.bit7=set, R3=<dynamic> (dynamic/table-derived); direct_0x0007=False
R2 evidence: H'1A75 MOV:E.B #H'80, R2
R3 evidence: H'1A77 MOV:G.W R5, R3
table/context hints: H'1A71 current_value_table_candidate via @(-H'1800,R3)
- H'26A8 in loc_2650: R2.bit7=unknown, R3=<dynamic> (unknown); direct_0x0007=False
- H'4926 in loc_48FA: R2.bit7=set, R3=0x00F6 (constant); direct_0x0007=False
R2 evidence: H'4921 MOV:E.B #H'80, R2
R3 evidence: H'4923 MOV:I.W #H'00F6, R3
Caveats:
- This is a bounded local static trace, not an emulator run.
- R3 values classified as dynamic/table-derived may still become 0x0007 at runtime.
- Indirect dispatch, table handlers, interrupt interleavings, or callers absent from the JSON may still enqueue 0x0007.
- The generic queue-to-TX path only emits queued entries; this tracer looks for direct report-index sources at loc_3E54 callers.

480
h8536/emulator.py Normal file
View File

@@ -0,0 +1,480 @@
from __future__ import annotations
import argparse
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
from .decoder import H8536Decoder
from .formatting import h8, h16
from .memory import MEMORY_REGIONS, MemoryRegion, region_for
from .rom import DecodeError, Rom
from .vectors import read_vectors_min
SCI1_SMR = 0xFED8
SCI1_BRR = 0xFED9
SCI1_SCR = 0xFEDA
SCI1_TDR = 0xFEDB
SCI1_SSR = 0xFEDC
SCI1_RDR = 0xFEDD
SCI_SCR_TIE = 0x80
SCI_SCR_RIE = 0x40
SCI_SCR_TE = 0x20
SCI_SCR_RE = 0x10
SCI_SSR_TDRE = 0x80
SCI_SSR_RDRF = 0x40
SCI_SSR_ORER = 0x20
SCI_SSR_FER = 0x10
SCI_SSR_PER = 0x08
ON_CHIP_RAM_START = 0xF680
ON_CHIP_RAM_END = 0xFE7F
REGISTER_FIELD_START = 0xFE80
REGISTER_FIELD_END = 0xFFFF
RAMCR = 0xFF11
HEARTBEAT_FRAME = bytes([0x00, 0x00, 0x00, 0x00, 0x80, 0xDA])
class EmulatorError(Exception):
pass
class UnsupportedInstruction(EmulatorError):
def __init__(self, pc: int, raw: bytes, text: str) -> None:
raw_text = " ".join(f"{byte:02X}" for byte in raw)
super().__init__(f"unsupported instruction at {h16(pc)}: {raw_text} {text}".rstrip())
self.pc = pc
self.raw = raw
self.text = text
@dataclass
class SciTxEvent:
address: int
value: int
scr: int
ssr: int
emitted: bool
@dataclass
class SCI1:
"""Small SCI1 model for the H8/536 serial path.
Manual anchors:
- RDR/TDR/SCR/SSR live at H'FEDD/H'FEDB/H'FEDA/H'FEDC for SCI1.
- SCR bit 7 TIE, bit 6 RIE, bit 5 TE, bit 4 RE.
- SSR bit 7 TDRE, bit 6 RDRF, bits 5..3 ORER/FER/PER.
- Software normally writes TDR after TDRE=1, then clears SSR.TDRE.
"""
smr: int = 0x00
brr: int = 0xFF
scr: int = 0x0C
tdr: int = 0xFF
ssr: int = 0x87
rdr: int = 0x00
tx_bytes: list[int] = field(default_factory=list)
tx_events: list[SciTxEvent] = field(default_factory=list)
tx_frames: list[bytes] = field(default_factory=list)
_frame_buffer: bytearray = field(default_factory=bytearray)
def read(self, address: int) -> int:
if address == SCI1_SMR:
return self.smr
if address == SCI1_BRR:
return self.brr
if address == SCI1_SCR:
return self.scr
if address == SCI1_TDR:
return self.tdr
if address == SCI1_SSR:
return self.ssr
if address == SCI1_RDR:
return self.rdr
raise KeyError(address)
def write(self, address: int, value: int) -> None:
value &= 0xFF
if address == SCI1_SMR:
self.smr = value
elif address == SCI1_BRR:
self.brr = value
elif address == SCI1_SCR:
self.scr = value
elif address == SCI1_TDR:
self.tdr = value
self._write_tdr(value)
elif address == SCI1_SSR:
# The real SSR is R/(W)*: writable zeroes clear latched flags after
# the required read sequence. This scaffold applies zero bits
# directly so ROM BCLR/BSET style accesses can be modeled.
self.ssr = value
elif address == SCI1_RDR:
self.rdr = value
else:
raise KeyError(address)
def _write_tdr(self, value: int) -> None:
emitted = bool(self.scr & SCI_SCR_TE)
if emitted:
self.tx_bytes.append(value)
self._frame_buffer.append(value)
if len(self._frame_buffer) == len(HEARTBEAT_FRAME):
self.tx_frames.append(bytes(self._frame_buffer))
self._frame_buffer.clear()
self.ssr |= SCI_SSR_TDRE
self.tx_events.append(SciTxEvent(SCI1_TDR, value, self.scr, self.ssr, emitted))
def inject_rx(self, value: int) -> None:
self.rdr = value & 0xFF
self.ssr |= SCI_SSR_RDRF
def saw_heartbeat(self) -> bool:
return HEARTBEAT_FRAME in self.tx_frames
@dataclass
class MemoryAccess:
address: int
size: int
value: int
kind: str
region: str
class MemoryMap:
def __init__(self, rom_bytes: bytes, sci1: SCI1 | None = None) -> None:
self.rom = Rom(rom_bytes, base=0)
self.sci1 = sci1 if sci1 is not None else SCI1()
self.ram = bytearray(ON_CHIP_RAM_END - ON_CHIP_RAM_START + 1)
self.registers = bytearray(REGISTER_FIELD_END - REGISTER_FIELD_START + 1)
self.external: dict[int, int] = {}
self.access_log: list[MemoryAccess] = []
self._set_register(SCI1_SMR, self.sci1.smr)
self._set_register(SCI1_BRR, self.sci1.brr)
self._set_register(SCI1_SCR, self.sci1.scr)
self._set_register(SCI1_TDR, self.sci1.tdr)
self._set_register(SCI1_SSR, self.sci1.ssr)
self._set_register(SCI1_RDR, self.sci1.rdr)
def region(self, address: int) -> MemoryRegion:
return region_for(address & 0xFFFF)
def read8(self, address: int) -> int:
address &= 0xFFFF
if address in (SCI1_SMR, SCI1_BRR, SCI1_SCR, SCI1_TDR, SCI1_SSR, SCI1_RDR):
value = self.sci1.read(address)
self._set_register(address, value)
elif ON_CHIP_RAM_START <= address <= ON_CHIP_RAM_END:
value = self.ram[address - ON_CHIP_RAM_START]
elif REGISTER_FIELD_START <= address <= REGISTER_FIELD_END:
value = self.registers[address - REGISTER_FIELD_START]
elif self.rom.contains(address):
value = self.rom.u8(address)
else:
value = self.external.get(address, 0xFF)
self._log("read", address, 1, value)
return value
def read16(self, address: int) -> int:
high = self.read8(address)
low = self.read8((address + 1) & 0xFFFF)
return (high << 8) | low
def write8(self, address: int, value: int) -> None:
address &= 0xFFFF
value &= 0xFF
if address in (SCI1_SMR, SCI1_BRR, SCI1_SCR, SCI1_TDR, SCI1_SSR, SCI1_RDR):
self.sci1.write(address, value)
self._set_register(address, self.sci1.read(address))
elif ON_CHIP_RAM_START <= address <= ON_CHIP_RAM_END:
self.ram[address - ON_CHIP_RAM_START] = value
elif REGISTER_FIELD_START <= address <= REGISTER_FIELD_END:
self._set_register(address, value)
elif self.rom.contains(address):
# ROM writes are logged but ignored. This keeps the scaffold useful
# for accidental writes without mutating the loaded image.
pass
else:
self.external[address] = value
self._log("write", address, 1, value)
def write16(self, address: int, value: int) -> None:
self.write8(address, (value >> 8) & 0xFF)
self.write8((address + 1) & 0xFFFF, value & 0xFF)
def _set_register(self, address: int, value: int) -> None:
self.registers[address - REGISTER_FIELD_START] = value & 0xFF
def _log(self, kind: str, address: int, size: int, value: int) -> None:
self.access_log.append(MemoryAccess(address, size, value, kind, self.region(address).name))
@dataclass
class CPUState:
pc: int = 0
sr: int = 0
br: int = 0
regs: list[int] = field(default_factory=lambda: [0] * 8)
cycles: int = 0
steps: int = 0
z: bool = False
@dataclass
class RunReport:
steps: int
cycles: int
pc: int
stopped_reason: str
tx_bytes: bytes
tx_frames: list[bytes]
heartbeat_seen: bool
unsupported: str | None = None
trace: list[str] = field(default_factory=list)
def summary_lines(self) -> list[str]:
lines = [
f"steps={self.steps}",
f"cycles={self.cycles}",
f"pc={h16(self.pc)}",
f"stopped={self.stopped_reason}",
"tx_bytes=" + self.tx_bytes.hex(" ").upper(),
"tx_frames=" + ", ".join(frame.hex(" ").upper() for frame in self.tx_frames),
f"heartbeat_seen={self.heartbeat_seen}",
]
if self.unsupported:
lines.append(f"unsupported={self.unsupported}")
lines.append("next_todo=implement the stopped opcode, then add interrupt scheduling for SCI1 TXI and interval/watchdog timer overflow")
return lines
class H8536Emulator:
def __init__(self, rom_bytes: bytes) -> None:
if not rom_bytes:
raise ValueError("ROM image is empty")
self.sci1 = SCI1()
self.memory = MemoryMap(rom_bytes, self.sci1)
self.cpu = CPUState()
self.vectors = read_vectors_min(self.memory.rom)
self.reset()
def reset(self) -> None:
self.cpu = CPUState(pc=self.reset_vector())
def reset_vector(self) -> int:
if self.memory.rom.contains(0, 2):
return self.memory.rom.u16(0)
raise DecodeError("ROM does not contain a reset vector at H'0000")
def step(self) -> str:
pc = self.cpu.pc
decoder = H8536Decoder(self.memory.rom, br=self.cpu.br)
ins = decoder.decode(pc)
if not ins.valid:
raise UnsupportedInstruction(pc, ins.raw, ins.text)
next_pc = (pc + ins.size) & 0xFFFF
raw = ins.raw
text = ins.text
if raw[0] == 0x00:
pass
elif 0x58 <= raw[0] <= 0x5F and len(raw) == 3:
self.cpu.regs[raw[0] & 0x07] = int.from_bytes(raw[1:3], "big")
elif raw[:2] == bytes([0x0C, 0x07]) and len(raw) == 4:
self.cpu.sr = int.from_bytes(raw[2:4], "big")
elif raw[0] in (0x15, 0x1D) and len(raw) >= 4:
next_pc = self._execute_general_abs(raw, pc, next_pc)
elif raw[0] in range(0x20, 0x30) and len(raw) == 2:
next_pc = self._branch8(raw, pc, next_pc)
elif raw[0] in range(0x30, 0x40) and len(raw) == 3:
next_pc = self._branch16(raw, pc, next_pc)
else:
raise UnsupportedInstruction(pc, raw, text)
self.cpu.pc = next_pc
self.cpu.steps += 1
self.cpu.cycles += self._rough_cycles(raw)
return f"{h16(pc)}: {' '.join(f'{byte:02X}' for byte in raw):<17} {text}"
def run(self, max_steps: int, trace: bool = False, stop_on_heartbeat: bool = False) -> RunReport:
trace_lines: list[str] = []
stopped_reason = "max_steps"
unsupported: str | None = None
for _ in range(max_steps):
try:
line = self.step()
except UnsupportedInstruction as exc:
stopped_reason = "unsupported_instruction"
unsupported = str(exc)
break
if trace:
trace_lines.append(line)
if stop_on_heartbeat and self.sci1.saw_heartbeat():
stopped_reason = "heartbeat"
break
return RunReport(
steps=self.cpu.steps,
cycles=self.cpu.cycles,
pc=self.cpu.pc,
stopped_reason=stopped_reason,
tx_bytes=bytes(self.sci1.tx_bytes),
tx_frames=list(self.sci1.tx_frames),
heartbeat_seen=self.sci1.saw_heartbeat(),
unsupported=unsupported,
trace=trace_lines,
)
def _execute_general_abs(self, raw: bytes, pc: int, next_pc: int) -> int:
size = "W" if raw[0] == 0x1D else "B"
address = int.from_bytes(raw[1:3], "big")
op = raw[3]
if op == 0x06:
value = raw[4]
self.memory.write8(address, value)
elif op == 0x07:
value = int.from_bytes(raw[4:6], "big")
self.memory.write16(address, value)
elif 0x90 <= op <= 0x97:
reg = self.cpu.regs[op & 0x07]
if size == "W":
self.memory.write16(address, reg)
else:
self.memory.write8(address, reg & 0xFF)
elif 0x80 <= op <= 0x87:
reg = op & 0x07
self.cpu.regs[reg] = self.memory.read16(address) if size == "W" else self.memory.read8(address)
elif 0xC0 <= op <= 0xCF:
bit = op & 0x0F
self.memory.write8(address, self.memory.read8(address) | (1 << bit))
elif 0xD0 <= op <= 0xDF:
bit = op & 0x0F
self.memory.write8(address, self.memory.read8(address) & ~(1 << bit))
elif 0xF0 <= op <= 0xFF:
bit = op & 0x0F
self.cpu.z = not bool(self.memory.read8(address) & (1 << bit))
elif op in (0x04, 0x05):
if op == 0x04:
value = raw[4]
actual = self.memory.read8(address)
else:
value = int.from_bytes(raw[4:6], "big")
actual = self.memory.read16(address)
self.cpu.z = actual == value
elif op == 0x08:
value = self.memory.read16(address) if size == "W" else self.memory.read8(address)
value = (value + 1) & (0xFFFF if size == "W" else 0xFF)
if size == "W":
self.memory.write16(address, value)
else:
self.memory.write8(address, value)
self.cpu.z = value == 0
else:
raise UnsupportedInstruction(pc, raw, H8536Decoder(self.memory.rom, br=self.cpu.br).decode(pc).text)
return next_pc
def _branch8(self, raw: bytes, pc: int, next_pc: int) -> int:
cond = raw[0] & 0x0F
disp = _s8(raw[1])
target = (pc + 2 + disp) & 0xFFFF
if cond == 0x0:
return target
if cond == 0x7 and self.cpu.z:
return target
if cond == 0x6 and not self.cpu.z:
return target
return next_pc
def _branch16(self, raw: bytes, pc: int, next_pc: int) -> int:
cond = raw[0] & 0x0F
disp = _s16(int.from_bytes(raw[1:3], "big"))
target = (pc + 3 + disp) & 0xFFFF
if cond == 0x0:
return target
if cond == 0x7 and self.cpu.z:
return target
if cond == 0x6 and not self.cpu.z:
return target
return next_pc
def _rough_cycles(self, raw: bytes) -> int:
if raw[0] in (0x15, 0x1D):
return 9
if raw[0] in range(0x20, 0x40):
return 8 if (raw[0] & 0x0F) == 0 else 4
return 3
def _s8(value: int) -> int:
return value - 0x100 if value & 0x80 else value
def _s16(value: int) -> int:
return value - 0x10000 if value & 0x8000 else value
def discover_rom_path(root: Path) -> Path | None:
candidates = [
root / "ROM" / "M27C512@DIP28_1.BIN",
root / "rom.bin",
]
candidates.extend(sorted((root / "ROM").glob("*.BIN")) if (root / "ROM").exists() else [])
candidates.extend(sorted((root / "ROM").glob("*.bin")) if (root / "ROM").exists() else [])
for candidate in candidates:
if candidate.is_file():
return candidate
return None
def load_rom(path: Path | None = None, root: Path | None = None) -> tuple[bytes, Path]:
root = root if root is not None else Path.cwd()
rom_path = path if path is not None else discover_rom_path(root)
if rom_path is None:
raise FileNotFoundError(
"could not discover ROM bytes; pass --rom PATH, expected ROM/M27C512@DIP28_1.BIN or another ROM/*.BIN"
)
return rom_path.read_bytes(), rom_path
def describe_regions(regions: Iterable[MemoryRegion] = MEMORY_REGIONS) -> str:
return "\n".join(f"{h16(region.start)}-{h16(region.end)} {region.name} {region.kind}" for region in regions)
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Minimal H8/536 emulation harness scaffold")
parser.add_argument("--rom", type=Path, help="ROM image path; defaults to ROM/M27C512@DIP28_1.BIN when present")
parser.add_argument("--max-steps", type=int, default=64, help="maximum CPU steps to execute")
parser.add_argument("--trace", action="store_true", help="print decoded/executed instruction trace")
parser.add_argument("--stop-on-heartbeat", action="store_true", help="stop only when 00 00 00 00 80 DA is emitted through SCI1 TDR")
parser.add_argument("--memory-map", action="store_true", help="print the scaffolded memory map before running")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_arg_parser().parse_args(argv)
try:
rom_bytes, rom_path = load_rom(args.rom)
except FileNotFoundError as exc:
print(str(exc))
return 2
emulator = H8536Emulator(rom_bytes)
print(f"rom={rom_path}")
print(f"reset_vector={h16(emulator.reset_vector())}")
if args.memory_map:
print(describe_regions())
report = emulator.run(args.max_steps, trace=args.trace, stop_on_heartbeat=args.stop_on_heartbeat)
if args.trace:
for line in report.trace:
print(line)
for line in report.summary_lines():
print(line)
if not report.heartbeat_seen:
print("heartbeat_status=not reached; no heartbeat is reported unless bytes are emitted via SCI1_TDR")
return 0

View File

@@ -18,8 +18,8 @@ CHECKSUM_SEED = getattr(_protocol_trace, "CHECKSUM_SEED", 0x5A)
FRAME_LENGTH = getattr(_protocol_trace, "FRAME_LENGTH", 6)
CAPTURE_LINE_RE = re.compile(
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+"
r"(?P<direction>RX|TX)\s+"
r"(?P<count>\d+)\s+bytes?\s+"
r"(?P<direction>RX|TX|FRAME)\s+"
r"(?P<count>\d+)(?:\s+bytes?)?\s+"
r"(?P<byte_text>.*?)\s*$",
re.IGNORECASE,
)
@@ -80,7 +80,8 @@ def parse_capture_text(text: str) -> list[CaptureChunk]:
if not match:
continue
byte_values = tuple(int(token, 16) for token in HEX_BYTE_RE.findall(match.group("byte_text")))
analyzer_direction = match.group("direction").lower()
raw_direction = match.group("direction").lower()
analyzer_direction = "rx" if raw_direction == "frame" else raw_direction
chunks.append(
CaptureChunk(
chunk_index=len(chunks),

View File

@@ -0,0 +1,548 @@
from __future__ import annotations
import argparse
import json
import re
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any
from .formatting import h16, label_for
JsonObject = dict[str, Any]
DEFAULT_INPUT = Path("build/rom_decompiled.json")
DEFAULT_TEXT_OUTPUT = Path("build/rom_report_sources.txt")
DEFAULT_JSON_OUTPUT = Path("build/rom_report_sources.json")
QUEUE_FUNCTION = 0x3E54
REPORT_INDEX_OF_INTEREST = 0x0007
_LOGICAL_TABLE_OFFSETS = {
0x2000: "primary_value_table_candidate",
0x1C00: "secondary_value_table_candidate",
0x1800: "current_value_table_candidate",
0x1400: "flag_table_candidate",
}
def load_report_source_input(path: Path) -> JsonObject:
with path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict) or "instructions" not in payload:
raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
return payload
def analyze_report_sources(
payload: Mapping[str, Any],
*,
target: int = QUEUE_FUNCTION,
report_index: int = REPORT_INDEX_OF_INTEREST,
window: int = 16,
) -> JsonObject:
instructions = _instruction_sequence(payload.get("instructions"))
functions = _function_ranges(payload)
calls = [
_analyze_call(instructions, index, functions, target, report_index, window)
for index, ins in enumerate(instructions)
if _is_direct_call_to(ins, target)
]
direct_hits = [
call
for call in calls
if call["r2"].get("bit7") is True
and call["r3"].get("classification") == "constant"
and call["r3"].get("value") == report_index
]
dynamic_candidates = [
call
for call in calls
if call["r2"].get("bit7") is not False
and call["r3"].get("classification") in {"dynamic/table-derived", "unknown"}
]
if direct_hits:
conclusion = (
f"At least one direct loc_3E54 caller statically loads report index {report_index:#06x} "
"with R2.bit7 set before the queue call."
)
status = "direct_static_hit"
else:
conclusion = (
f"No direct loc_3E54 caller in this JSON statically loads report index {report_index:#06x}. "
f"{report_index:#06x} remains an observed runtime/capture value unless another indirect "
"or table-dispatch path is proven."
)
status = "not_statically_proven"
return {
"kind": "report_source_trace",
"queue_function": target,
"queue_function_hex": h16(target),
"report_index_of_interest": report_index,
"report_index_of_interest_hex": f"0x{report_index:04X}",
"summary": {
"direct_call_count": len(calls),
"direct_static_hit_count": len(direct_hits),
"dynamic_or_unknown_candidate_count": len(dynamic_candidates),
"status": status,
"conclusion": conclusion,
},
"calls": calls,
"caveats": [
"This is a bounded local static trace, not an emulator run.",
"R3 values classified as dynamic/table-derived may still become 0x0007 at runtime.",
"Indirect dispatch, table handlers, interrupt interleavings, or callers absent from the JSON may still enqueue 0x0007.",
"The generic queue-to-TX path only emits queued entries; this tracer looks for direct report-index sources at loc_3E54 callers.",
],
}
def format_text_report(analysis: Mapping[str, Any]) -> str:
summary = analysis.get("summary", {})
lines = [
"H8/536 loc_3E54 Report Source Trace",
"",
f"Queue function: {analysis.get('queue_function_hex', h16(QUEUE_FUNCTION))}",
f"Report index of interest: {analysis.get('report_index_of_interest_hex', '0x0007')}",
f"Direct callers: {summary.get('direct_call_count', 0)}",
f"Direct static 0x0007 hits: {summary.get('direct_static_hit_count', 0)}",
f"Dynamic/unknown candidates: {summary.get('dynamic_or_unknown_candidate_count', 0)}",
"",
f"Conclusion: {summary.get('conclusion', '')}",
"",
"Call sites:",
]
for call in analysis.get("calls", []):
if not isinstance(call, Mapping):
continue
r2 = call.get("r2", {}) if isinstance(call.get("r2"), Mapping) else {}
r3 = call.get("r3", {}) if isinstance(call.get("r3"), Mapping) else {}
r3_value = r3.get("value_hex") or "<dynamic>"
lines.append(
f"- {call.get('address_hex')} in {call.get('function_label')}: "
f"R2.bit7={_format_bit(r2.get('bit7'))}, "
f"R3={r3_value} ({r3.get('classification', 'unknown')}); "
f"direct_0x0007={call.get('can_directly_enqueue_report_index')}"
)
for source_name, source in (("R2", r2), ("R3", r3)):
evidence = source.get("evidence") if isinstance(source, Mapping) else None
if not isinstance(evidence, Mapping):
continue
text = evidence.get("instruction")
if text:
lines.append(f" {source_name} evidence: {evidence.get('address_hex')} {text}")
table_hints = call.get("table_hints")
if isinstance(table_hints, list) and table_hints:
hints = ", ".join(
f"{item.get('address_hex')} {item.get('table')} via {item.get('operand')}"
for item in table_hints[:4]
if isinstance(item, Mapping)
)
lines.append(f" table/context hints: {hints}")
lines.extend(["", "Caveats:"])
for caveat in analysis.get("caveats", []):
lines.append(f"- {caveat}")
return "\n".join(lines).rstrip() + "\n"
def write_report_sources(input_path: Path, output_path: Path, *, as_json: bool = False) -> JsonObject:
analysis = analyze_report_sources(load_report_source_input(input_path))
output_path.parent.mkdir(parents=True, exist_ok=True)
if as_json:
output_path.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
else:
output_path.write_text(format_text_report(analysis), encoding="utf-8")
return analysis
def main(argv: list[str] | None = None, stdout: Any | None = None) -> int:
parser = argparse.ArgumentParser(
description="Trace direct loc_3E54 report queue callers and their R2/R3 sources.",
)
parser.add_argument(
"input",
nargs="?",
type=Path,
default=DEFAULT_INPUT,
help="structured JSON emitted by h8536_decompiler.py",
)
parser.add_argument("--json", action="store_true", help="emit structured JSON instead of readable text")
parser.add_argument("--out", type=Path, default=None, help="write report to this path")
parser.add_argument("--window", type=int, default=16, help="bounded backward instruction window per call")
args = parser.parse_args(argv)
stream = stdout
if stream is None:
import sys
stream = sys.stdout
analysis = analyze_report_sources(load_report_source_input(args.input), window=args.window)
rendered = json.dumps(analysis, indent=2, sort_keys=True) + "\n" if args.json else format_text_report(analysis)
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(rendered, encoding="utf-8")
print(f"wrote {args.out}", file=stream)
else:
print(rendered, end="", file=stream)
return 0
def _analyze_call(
instructions: list[JsonObject],
call_index: int,
functions: list[JsonObject],
target: int,
report_index: int,
window: int,
) -> JsonObject:
call = instructions[call_index]
address = int(call["address"])
function = _function_for_address(functions, address)
local_window = _local_window(instructions, call_index, function, window)
r2 = _resolve_register(local_window, "R2", call, width=8)
r3 = _resolve_register(local_window, "R3", call, width=16)
table_hints = _table_hints(local_window)
can_enqueue = (
r2.get("bit7") is True
and r3.get("classification") == "constant"
and r3.get("value") == report_index
)
return {
"address": address,
"address_hex": h16(address),
"instruction": _instruction_text(call),
"target": target,
"target_hex": h16(target),
"function_start": function.get("start") if function else None,
"function_start_hex": h16(int(function["start"])) if function else None,
"function_label": function.get("label") if function else label_for(address),
"dataflow_block": _dataflow_block(call),
"window_instruction_count": len(local_window),
"window_start": int(local_window[0]["address"]) if local_window else address,
"window_start_hex": h16(int(local_window[0]["address"])) if local_window else h16(address),
"r2": r2,
"r3": r3,
"table_hints": table_hints,
"can_directly_enqueue_report_index": can_enqueue,
"assessment": _call_assessment(r2, r3, report_index),
}
def _resolve_register(window: list[Mapping[str, Any]], register: str, call: Mapping[str, Any], *, width: int) -> JsonObject:
evidence = _trace_register(window, register, seen=set(), width=width)
if evidence is None:
evidence = _dataflow_before(call, register)
if evidence is None:
return {
"register": register,
"classification": "unknown",
"value": None,
"value_hex": None,
"bit7": None if register != "R2" else "unknown",
"evidence": None,
}
if evidence.get("classification") == "constant" and isinstance(evidence.get("value"), int):
value = int(evidence["value"]) & ((1 << width) - 1)
evidence["value"] = value
evidence["value_hex"] = f"0x{value:04X}" if width > 8 else f"0x{value:02X}"
if register == "R2":
evidence["bit7"] = bool(value & 0x80)
elif register == "R2":
evidence["bit7"] = None
evidence["register"] = register
return evidence
def _trace_register(
window: list[Mapping[str, Any]],
register: str,
*,
seen: set[str],
width: int,
) -> JsonObject | None:
register = register.upper()
if register in seen:
return None
seen.add(register)
for ins in reversed(window):
source, destination = _source_destination_operands(str(ins.get("operands", "")))
mnemonic = _mnemonic_root(str(ins.get("mnemonic", "")))
if destination.upper() != register:
if _mutates_register(ins, register):
return _source_record("dynamic/table-derived", ins, reason=f"{mnemonic} mutates {register}")
continue
immediate = _parse_immediate(source)
if mnemonic.startswith("MOV") and immediate is not None:
return _source_record("constant", ins, value=immediate & ((1 << width) - 1), reason="immediate load")
source_register = _register_operand(source)
if mnemonic.startswith("MOV") and source_register:
nested = _trace_register(window[: window.index(ins)], source_register, seen=seen, width=width)
if nested is not None:
nested = dict(nested)
nested["via"] = _evidence_record(ins)
return nested
return _source_record("dynamic/table-derived", ins, reason=f"copied from unresolved {source_register}")
if "@" in source or "@" in destination:
classification = "dynamic/table-derived" if _table_operand(source) or _table_operand(destination) else "dynamic/table-derived"
return _source_record(classification, ins, reason="memory/indexed source")
return _source_record("unknown", ins, reason=f"unsupported writer {mnemonic}")
return None
def _dataflow_before(call: Mapping[str, Any], register: str) -> JsonObject | None:
dataflow = call.get("dataflow")
if not isinstance(dataflow, Mapping):
return None
changes = dataflow.get("changes")
if not isinstance(changes, list):
return None
for change in changes:
if not isinstance(change, Mapping) or change.get("kind") != "register" or str(change.get("name", "")).upper() != register:
continue
before = change.get("before")
if isinstance(before, Mapping) and before.get("known") is True and isinstance(before.get("value"), int):
return {
"classification": "constant",
"value": int(before["value"]),
"value_hex": before.get("hex"),
"reason": "decompiler dataflow before call",
"evidence": {
"address": call.get("address"),
"address_hex": h16(int(call["address"])) if isinstance(call.get("address"), int) else None,
"instruction": before.get("source"),
},
}
if isinstance(before, Mapping) and before.get("known") is False:
return {
"classification": "unknown",
"value": None,
"value_hex": None,
"reason": f"decompiler dataflow: {before.get('reason', 'unknown')}",
"evidence": None,
}
return None
def _call_assessment(r2: Mapping[str, Any], r3: Mapping[str, Any], report_index: int) -> str:
if r2.get("bit7") is False:
return "R2.bit7 appears clear, so loc_3E54 would not enqueue on this local evidence."
if r3.get("classification") == "constant":
if r3.get("value") == report_index:
return f"Direct static enqueue source for {report_index:#06x}."
return f"Direct static enqueue source for {int(r3.get('value', 0)):#06x}, not {report_index:#06x}."
return f"No static {report_index:#06x} constant here; R3 is {r3.get('classification', 'unknown')}."
def _local_window(
instructions: list[JsonObject],
call_index: int,
function: Mapping[str, Any] | None,
window: int,
) -> list[JsonObject]:
call = instructions[call_index]
call_block = _dataflow_block(call)
selected: list[JsonObject] = []
for prior in reversed(instructions[:call_index]):
if len(selected) >= window:
break
address = int(prior["address"])
if function and not (int(function["start"]) <= address <= int(function["end"])):
break
prior_block = _dataflow_block(prior)
if call_block is not None and prior_block is not None and prior_block != call_block:
continue
selected.append(prior)
return list(reversed(selected))
def _table_hints(window: Iterable[Mapping[str, Any]]) -> list[JsonObject]:
hints: list[JsonObject] = []
for ins in window:
operands = str(ins.get("operands", ""))
for operand, table in _table_operands(operands):
hints.append(
{
"address": int(ins["address"]),
"address_hex": h16(int(ins["address"])),
"instruction": _instruction_text(ins),
"operand": operand,
"table": table,
}
)
return hints
def _table_operand(operand: str) -> bool:
return bool(_table_operands(operand))
def _table_operands(operands: str) -> list[tuple[str, str]]:
matches: list[tuple[str, str]] = []
for match in re.finditer(r"@\(-H'([0-9A-Fa-f]+),\s*(R[0-7])\)", operands):
offset = int(match.group(1), 16) & 0xFFFF
table = _LOGICAL_TABLE_OFFSETS.get(offset)
if table:
matches.append((match.group(0), table))
return matches
def _source_record(
classification: str,
ins: Mapping[str, Any],
*,
value: int | None = None,
reason: str,
) -> JsonObject:
return {
"classification": classification,
"value": value,
"value_hex": f"0x{value:04X}" if value is not None else None,
"reason": reason,
"evidence": _evidence_record(ins),
}
def _evidence_record(ins: Mapping[str, Any]) -> JsonObject:
address = ins.get("address")
return {
"address": address,
"address_hex": h16(int(address)) if isinstance(address, int) else None,
"instruction": _instruction_text(ins),
"mnemonic": ins.get("mnemonic"),
"operands": ins.get("operands"),
}
def _is_direct_call_to(ins: Mapping[str, Any], target: int) -> bool:
mnemonic = _mnemonic_root(str(ins.get("mnemonic", "")))
return mnemonic in {"BSR", "JSR"} and target in _targets(ins)
def _instruction_sequence(value: object) -> list[JsonObject]:
if isinstance(value, Mapping):
values: Iterable[Any] = value.values()
elif isinstance(value, list):
values = value
else:
values = []
return sorted(
[item for item in values if isinstance(item, dict) and isinstance(item.get("address"), int)],
key=lambda item: int(item["address"]),
)
def _function_ranges(payload: Mapping[str, Any]) -> list[JsonObject]:
call_graph = payload.get("call_graph")
nodes = call_graph.get("nodes") if isinstance(call_graph, Mapping) else None
if not isinstance(nodes, list):
return []
ranges: list[JsonObject] = []
for node in nodes:
if not isinstance(node, Mapping):
continue
start = node.get("start")
end = node.get("end")
if isinstance(start, int) and isinstance(end, int):
ranges.append({"start": start, "end": end, "label": str(node.get("label") or label_for(start))})
return sorted(ranges, key=lambda item: int(item["start"]))
def _function_for_address(functions: list[JsonObject], address: int) -> JsonObject | None:
for function in functions:
if int(function["start"]) <= address <= int(function["end"]):
return function
return None
def _dataflow_block(ins: Mapping[str, Any]) -> int | None:
dataflow = ins.get("dataflow")
if isinstance(dataflow, Mapping) and isinstance(dataflow.get("block"), int):
return int(dataflow["block"])
return None
def _targets(ins: Mapping[str, Any]) -> list[int]:
targets = ins.get("targets", [])
return [int(target) for target in targets if isinstance(target, int)] if isinstance(targets, list) else []
def _instruction_text(ins: Mapping[str, Any]) -> str:
text = ins.get("text")
if isinstance(text, str) and text:
return text
operands = str(ins.get("operands", ""))
return f"{ins.get('mnemonic', '')} {operands}".strip()
def _source_destination_operands(operands: str) -> tuple[str, str]:
depth = 0
split_at: int | None = None
for index, char in enumerate(operands):
if char in "({":
depth += 1
elif char in ")}" and depth:
depth -= 1
elif char == "," and depth == 0:
split_at = index
if split_at is None:
operand = operands.strip()
return "", operand
return operands[:split_at].strip(), operands[split_at + 1 :].strip()
def _parse_immediate(operand: str) -> int | None:
text = operand.strip()
if text.startswith("#"):
text = text[1:].strip()
try:
if text.upper().startswith("H'"):
return int(text[2:], 16) & 0xFFFF
if text.upper().startswith("0X"):
return int(text, 16) & 0xFFFF
if text.startswith("$"):
return int(text[1:], 16) & 0xFFFF
return int(text, 10) & 0xFFFF
except ValueError:
return None
def _register_operand(operand: str) -> str | None:
text = operand.strip().upper()
return text if re.fullmatch(r"R[0-7]", text) else None
def _mutates_register(ins: Mapping[str, Any], register: str) -> bool:
mnemonic = _mnemonic_root(str(ins.get("mnemonic", "")))
source, destination = _source_destination_operands(str(ins.get("operands", "")))
if destination.upper() != register.upper():
return False
return not mnemonic.startswith("MOV")
def _mnemonic_root(mnemonic: str) -> str:
return mnemonic.rsplit(".", 1)[0].upper()
def _format_bit(value: Any) -> str:
if value is True:
return "set"
if value is False:
return "clear"
return "unknown"
__all__ = [
"analyze_report_sources",
"format_text_report",
"load_report_source_input",
"main",
"write_report_sources",
]
if __name__ == "__main__":
raise SystemExit(main())

8
h8536_emulator.py Normal file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python3
"""Compatibility wrapper for the H8/536 emulation harness."""
from h8536.emulator import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python3
"""Compatibility wrapper for the H8/536 report-source tracer CLI."""
from h8536.report_source_trace import main
if __name__ == "__main__":
raise SystemExit(main())

76
tests/test_emulator.py Normal file
View File

@@ -0,0 +1,76 @@
import unittest
from pathlib import Path
from h8536.emulator import (
HEARTBEAT_FRAME,
ON_CHIP_RAM_START,
REGISTER_FIELD_START,
SCI1_SCR,
SCI1_SSR,
SCI1_TDR,
SCI_SCR_TE,
SCI_SSR_TDRE,
H8536Emulator,
MemoryMap,
SCI1,
discover_rom_path,
load_rom,
)
class EmulatorHarnessTest(unittest.TestCase):
def test_memory_map_routes_rom_ram_register_and_external(self):
memory = MemoryMap(bytes([0x12, 0x34, 0x56, 0x78]))
self.assertEqual(memory.read8(0x0001), 0x34)
memory.write8(ON_CHIP_RAM_START, 0xA5)
self.assertEqual(memory.read8(ON_CHIP_RAM_START), 0xA5)
memory.write8(REGISTER_FIELD_START, 0x5A)
self.assertEqual(memory.read8(REGISTER_FIELD_START), 0x5A)
memory.write8(0xF000, 0x11)
self.assertEqual(memory.read8(0xF000), 0x11)
def test_sci_transmit_capture_requires_enabled_transmitter(self):
sci = SCI1()
sci.write(SCI1_TDR, 0x33)
self.assertEqual(sci.tx_bytes, [])
self.assertFalse(sci.tx_events[-1].emitted)
sci.write(SCI1_SCR, sci.scr | SCI_SCR_TE)
for byte in HEARTBEAT_FRAME:
sci.write(SCI1_TDR, byte)
self.assertEqual(bytes(sci.tx_bytes), HEARTBEAT_FRAME)
self.assertEqual(sci.tx_frames, [HEARTBEAT_FRAME])
self.assertTrue(sci.saw_heartbeat())
self.assertTrue(sci.read(SCI1_SSR) & SCI_SSR_TDRE)
def test_vector_decoding_uses_minimum_mode_reset_word(self):
rom = bytearray(0x1004)
rom[0:2] = b"\x10\x00"
rom[0x1000] = 0x00
emulator = H8536Emulator(bytes(rom))
self.assertEqual(emulator.reset_vector(), 0x1000)
self.assertEqual(emulator.cpu.pc, 0x1000)
self.assertEqual(emulator.vectors[0x0000][1], 0x1000)
def test_harness_instantiates_on_repo_artifacts(self):
root = Path(__file__).resolve().parents[1]
rom_path = discover_rom_path(root)
self.assertIsNotNone(rom_path)
rom_bytes, loaded_path = load_rom(root=root)
emulator = H8536Emulator(rom_bytes)
report = emulator.run(max_steps=4)
self.assertEqual(loaded_path, rom_path)
self.assertEqual(emulator.reset_vector(), 0x1000)
self.assertGreaterEqual(len(rom_bytes), 0x1002)
self.assertEqual(report.steps, 4)
self.assertFalse(report.heartbeat_seen)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,6 +1,7 @@
import io
import json
import unittest
from pathlib import Path
from h8536.protocol_capture import analyze_capture_text, format_text_report, main, parse_capture_text
@@ -15,6 +16,15 @@ class ProtocolCaptureTest(unittest.TestCase):
self.assertEqual(chunks[0].device_direction, "tx")
self.assertEqual(chunks[0].bytes, (0x00, 0x00, 0x15, 0x80, 0x00, 0xCF))
def test_parses_idle_frame_lines_without_direction_token(self):
chunks = parse_capture_text("11:54:40.567 frame 006 00 00 00 00 80 DA\n")
self.assertEqual(len(chunks), 1)
self.assertEqual(chunks[0].timestamp_ms, 42880567)
self.assertEqual(chunks[0].analyzer_direction, "rx")
self.assertEqual(chunks[0].device_direction, "tx")
self.assertEqual(chunks[0].bytes, (0x00, 0x00, 0x00, 0x00, 0x80, 0xDA))
def test_recombines_user_split_rx_chunks_into_valid_call_frame(self):
analysis = analyze_capture_text(
"16:06:15.502 RX 003 bytes 00 00 15\n"
@@ -140,6 +150,23 @@ class ProtocolCaptureTest(unittest.TestCase):
payload = json.loads(output.getvalue())
self.assertEqual(payload["frames"][0]["report_candidate"]["index"], 0x15)
def test_idle_reference_capture_when_present(self):
path = Path("ROM/rcp-txd-idle-only.txt")
if not path.exists():
self.skipTest("idle reference capture is not present")
analysis = analyze_capture_text(path.read_text(encoding="utf-8"))
self.assertGreaterEqual(analysis["frame_count"], 10)
self.assertEqual(
analysis["gate_session_hints"]["observed_autonomous_report_names"],
["heartbeat_alive_candidate"],
)
heartbeat = analysis["gate_session_hints"]["heartbeat_cadence_ms"]
self.assertEqual(heartbeat["count"], analysis["frame_count"])
self.assertGreater(heartbeat["average"], 600)
self.assertLess(heartbeat["average"], 800)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,135 @@
import io
import json
import tempfile
import unittest
from pathlib import Path
from h8536.report_source_trace import analyze_report_sources, format_text_report, main, write_report_sources
def ins(
address: int,
mnemonic: str,
operands: str = "",
*,
text: str | None = None,
targets: list[int] | None = None,
block: int | None = None,
) -> dict[str, object]:
row: dict[str, object] = {
"address": address,
"mnemonic": mnemonic,
"operands": operands,
"text": text or f"{mnemonic} {operands}".strip(),
"kind": "call" if mnemonic in {"BSR", "JSR"} else "normal",
"targets": targets or [],
"references": [],
}
if block is not None:
row["dataflow"] = {"block": block}
return row
def payload() -> dict[str, object]:
return {
"call_graph": {
"nodes": [
{"start": 0x1000, "end": 0x10FF, "label": "loc_1000"},
{"start": 0x2000, "end": 0x20FF, "label": "loc_2000"},
{"start": 0x3000, "end": 0x30FF, "label": "loc_3000"},
],
},
"instructions": [
ins(0x1000, "MOV:E.B", "#H'80, R2", block=0x1000),
ins(0x1002, "MOV:I.W", "#H'0007, R3", block=0x1000),
ins(0x1005, "BSR", "loc_3E54", targets=[0x3E54], block=0x1000),
ins(0x2000, "MOV:I.W", "#H'0012, R5", block=0x2000),
ins(0x2003, "CMP:G.W", "@(-H'2000,R5), R1", block=0x2000),
ins(0x2007, "MOV:E.B", "#H'80, R2", block=0x2000),
ins(0x2009, "MOV:G.W", "R5, R3", block=0x2000),
ins(0x200B, "BSR", "loc_3E54", targets=[0x3E54], block=0x2000),
ins(0x3000, "MOV:E.B", "#H'00, R2", block=0x3000),
ins(0x3002, "MOV:I.W", "#H'0007, R3", block=0x3000),
ins(0x3005, "BSR", "loc_3E54", targets=[0x3E54], block=0x3000),
],
}
class ReportSourceTraceTest(unittest.TestCase):
def test_finds_direct_static_report_index_0007(self):
analysis = analyze_report_sources(payload())
self.assertEqual(analysis["summary"]["direct_call_count"], 3)
self.assertEqual(analysis["summary"]["direct_static_hit_count"], 1)
hit = analysis["calls"][0]
self.assertTrue(hit["can_directly_enqueue_report_index"])
self.assertEqual(hit["r2"]["bit7"], True)
self.assertEqual(hit["r3"]["classification"], "constant")
self.assertEqual(hit["r3"]["value"], 0x0007)
self.assertIn("Direct static enqueue source", hit["assessment"])
def test_classifies_table_context_and_clear_gate(self):
analysis = analyze_report_sources(payload())
dynamic = analysis["calls"][1]
gated_off = analysis["calls"][2]
self.assertEqual(dynamic["r3"]["classification"], "constant")
self.assertEqual(dynamic["r3"]["value"], 0x0012)
self.assertEqual(dynamic["table_hints"][0]["table"], "primary_value_table_candidate")
self.assertFalse(gated_off["can_directly_enqueue_report_index"])
self.assertEqual(gated_off["r2"]["bit7"], False)
self.assertIn("would not enqueue", gated_off["assessment"])
def test_text_report_mentions_conclusion_and_caveats(self):
text = format_text_report(analyze_report_sources(payload()))
self.assertIn("loc_3E54 Report Source Trace", text)
self.assertIn("Direct static 0x0007 hits: 1", text)
self.assertIn("Indirect dispatch", text)
self.assertIn("R3 evidence", text)
def test_cli_json_output_and_out_file(self):
with tempfile.TemporaryDirectory() as tmp:
input_path = Path(tmp) / "rom.json"
output_path = Path(tmp) / "sources.json"
input_path.write_text(json.dumps(payload()), encoding="utf-8")
stdout = io.StringIO()
rc = main(["--json", "--out", str(output_path), str(input_path)], stdout=stdout)
self.assertEqual(rc, 0)
self.assertIn("wrote", stdout.getvalue())
written = json.loads(output_path.read_text(encoding="utf-8"))
self.assertEqual(written["kind"], "report_source_trace")
self.assertEqual(written["summary"]["direct_static_hit_count"], 1)
def test_write_text_output(self):
with tempfile.TemporaryDirectory() as tmp:
input_path = Path(tmp) / "rom.json"
output_path = Path(tmp) / "sources.txt"
input_path.write_text(json.dumps(payload()), encoding="utf-8")
analysis = write_report_sources(input_path, output_path)
self.assertEqual(analysis["kind"], "report_source_trace")
self.assertIn("Report Source Trace", output_path.read_text(encoding="utf-8"))
def test_real_rom_smoke_when_present(self):
path = Path("build/rom_decompiled.json")
if not path.exists():
self.skipTest("build/rom_decompiled.json is not present")
payload_real = json.loads(path.read_text(encoding="utf-8"))
analysis = analyze_report_sources(payload_real)
self.assertEqual(analysis["kind"], "report_source_trace")
self.assertGreaterEqual(analysis["summary"]["direct_call_count"], 1)
self.assertIn("0x0007", analysis["summary"]["conclusion"])
for call in analysis["calls"]:
self.assertIn("address_hex", call)
self.assertIn("r2", call)
self.assertIn("r3", call)
if __name__ == "__main__":
unittest.main()