1
0
Files
h8-536-decoder/h8536/serial_gate.py
2026-05-25 20:42:45 +10:00

553 lines
20 KiB
Python

from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
from .formatting import h16, label_for
JsonObject = dict[str, Any]
KEY_STATE_ADDRESSES: tuple[int, ...] = (
0xF9B0,
0xF9B4,
0xF9B5,
0xF9B9,
0xF9C0,
0xF9C1,
0xF9C3,
0xF9C4,
0xF9C5,
0xF9C6,
0xF9C8,
0xFAA2,
0xFAA3,
0xFAA5,
)
DEFAULT_INPUT = Path("build/rom_decompiled.json")
CAPTURE_OVERLAY_CAVEAT = (
"Observed report indexes 0x0007 and 0x0015 are capture overlays/runtime queue "
"entries; this analyzer does not treat them as statically proven ROM constants."
)
def load_serial_gate_input(path: Path) -> JsonObject:
with path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict) or "instructions" not in payload:
raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
return payload
def analyze_serial_gate(payload: dict[str, Any]) -> JsonObject:
instructions = _instruction_sequence(payload.get("instructions"))
labels = _collect_labels(payload, instructions)
by_address = {int(ins["address"]): ins for ins in instructions if "address" in ins}
evidence = {
"scheduler_gate_loc_3FD3": _scheduler_gate(by_address),
"queue_send_gate_loc_BAF2": _queue_send_gate(by_address),
"resend_gate_path": _resend_gate_path(by_address),
"rx_session_maintenance": _rx_session_maintenance(by_address),
"idle_heartbeat_gate_loc_4046": _idle_heartbeat_gate(payload, by_address),
"timer_tick_evidence": _timer_tick_evidence(payload, by_address),
}
access_summary = _state_access_summary(instructions, labels)
return {
"kind": "serial_gate",
"summary": {
"state_machine_candidate": "autonomous serial TX/report queue gate",
"confidence": _confidence(evidence),
"basis": "address-driven static evidence from decompiler JSON",
},
"state_addresses": [
{"address": address, "address_hex": h16(address), "symbol": f"ram_{address:04X}"}
for address in KEY_STATE_ADDRESSES
],
"evidence": evidence,
"state_accesses": access_summary,
"caveats": [
CAPTURE_OVERLAY_CAVEAT,
"Queue entries near F870 are reached through RAM-indexed addressing; static JSON proves the access pattern, not the runtime queue contents.",
"Branch predicates are summarized from local instruction order and targets; this is not an emulator trace.",
],
}
def format_text_report(analysis: dict[str, Any]) -> str:
lines = [
"H8/536 Serial Gate/Queue State-Machine Reconstruction",
"",
f"Summary: {analysis['summary']['state_machine_candidate']}",
f"Confidence: {analysis['summary']['confidence']}",
"",
"Evidence:",
]
for key, section in analysis.get("evidence", {}).items():
title = str(section.get("title", key)).rstrip(".")
status = "present" if section.get("present") else "missing"
lines.append(f"- {title}: {status}")
summary = section.get("summary")
if summary:
lines.append(f" {summary}")
for item in section.get("items", []):
lines.append(f" - {item['address_hex']}: {item['text']}")
roles = section.get("candidate_timer_roles", [])
if roles:
lines.append(" Candidate timer roles:")
for role in roles:
lines.append(f" - {role['address_hex']}: {role['role']}")
timer = section.get("timer")
if isinstance(timer, dict):
source = timer.get("source")
handler = timer.get("handler_address_hex")
ocra = timer.get("ocra_value_hex")
period = timer.get("observed_period_ms_candidate")
timer_bits = [str(part) for part in (source, handler, f"OCRA={ocra}" if ocra else "", f"observed period ~= {period}ms" if period else "") if part]
if timer_bits:
lines.append(f" Timer: {', '.join(timer_bits)}")
lines.extend(["", "State address readers/writers:"])
for entry in analysis.get("state_accesses", []):
lines.append(
f"- {entry['address_hex']}: reads={entry['read_count']} "
f"writes={entry['write_count']} read/write={entry['read_write_count']}"
)
samples = entry.get("sample_accesses", [])
if samples:
sample_text = "; ".join(f"{sample['address_hex']} {sample['access']} {sample['text']}" for sample in samples)
lines.append(f" {sample_text}")
lines.extend(["", "Caveats:"])
for caveat in analysis.get("caveats", []):
lines.append(f"- {caveat}")
return "\n".join(lines).rstrip() + "\n"
def write_serial_gate_report(input_path: Path, output_path: Path, *, as_json: bool = False) -> JsonObject:
analysis = analyze_serial_gate(load_serial_gate_input(input_path))
output_path.parent.mkdir(parents=True, exist_ok=True)
if as_json:
output_path.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
else:
output_path.write_text(format_text_report(analysis), encoding="utf-8")
return analysis
def main(argv: list[str] | None = None, stdout: Any | None = None) -> int:
parser = argparse.ArgumentParser(
description="Summarize H8/536 autonomous serial TX/report gates and queue state.",
)
parser.add_argument(
"input",
nargs="?",
type=Path,
default=DEFAULT_INPUT,
help="structured JSON emitted by h8536_decompiler.py",
)
parser.add_argument("--json", action="store_true", help="emit structured JSON instead of readable text")
parser.add_argument("--out", type=Path, default=None, help="write report to this path")
args = parser.parse_args(argv)
stream = stdout
if stream is None:
import sys
stream = sys.stdout
analysis = analyze_serial_gate(load_serial_gate_input(args.input))
if args.json:
rendered = json.dumps(analysis, indent=2, sort_keys=True) + "\n"
else:
rendered = format_text_report(analysis)
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(rendered, encoding="utf-8")
print(f"wrote {args.out}", file=stream)
else:
print(rendered, end="", file=stream)
return 0
def _scheduler_gate(by_address: dict[int, JsonObject]) -> JsonObject:
addresses = [0x3FD3, 0x3FD7, 0x3FD9, 0x3FDD, 0x3FDF, 0x3FE3, 0x3FE5, 0x3FE9, 0x3FEB]
items = _items(by_address, addresses)
return {
"title": "loc_3FD3 gate into loc_BAF2",
"present": _has_all(by_address, (0x3FD3, 0x3FD9, 0x3FDF, 0x3FE5, 0x3FEB)),
"summary": (
"Requires FAA2 == 0, allows the FAA5.bit7 path only when F9C3 == 0, "
"then requires F9C0 == 0 before BSR loc_BAF2."
),
"items": items,
"required_addresses_hex": [h16(address) for address in addresses],
}
def _queue_send_gate(by_address: dict[int, JsonObject]) -> JsonObject:
addresses = [
0xBAF2,
0xBAF8,
0xBAFC,
0xBAFE,
0xBB00,
0xBB08,
0xBB1C,
0xBB20,
0xBB2B,
0xBB39,
0xBB3F,
0xBB43,
0xBB46,
0xBB4C,
0xBB51,
]
return {
"title": "loc_BAF2 queue send gate",
"present": _has_all(by_address, (0xBAF2, 0xBAF8, 0xBB08, 0xBB1C, 0xBB39, 0xBB43)),
"summary": (
"F9B5 is compared against F9B0; inequality enters the send path, reads a queued "
"word via the F9B5-derived index around F870, stages F850-F854, and calls BA26 at BB43."
),
"items": _items(by_address, addresses),
"queue_table_candidate": {
"base_address_hex": h16(0xF870),
"index_address_hex": h16(0xF9B5),
"evidence_address_hex": h16(0xBB08),
"addressing_text": _text(by_address, 0xBB08),
},
"staging_addresses_hex": [h16(address) for address in range(0xF850, 0xF855)],
"send_subroutine_hex": h16(0xBA26),
"send_call_address_hex": h16(0xBB43),
}
def _resend_gate_path(by_address: dict[int, JsonObject]) -> JsonObject:
addresses = [0xBE9E, 0xBEA5, 0xBEA9, 0xBEAF, 0xBEB5, 0xBEBB, 0xBEC5, 0xBECB, 0xBED1, 0xBED5]
return {
"title": "resend gate/path",
"present": _has_all(by_address, (0xBE9E, 0xBEA5, 0xBEB5, 0xBEBB, 0xBECB, 0xBED5)),
"summary": (
"BE9E masks FAA5 with FAA3, waits for F9C6/F9C8 timeout gates, then if FAA3.bit7 "
"remains set clears F9C3 and calls BA26 from BED5."
),
"items": _items(by_address, addresses),
"resend_call_address_hex": h16(0xBED5),
"send_subroutine_hex": h16(0xBA26),
}
def _rx_session_maintenance(by_address: dict[int, JsonObject]) -> JsonObject:
addresses = [
0x3FEF,
0x3FF5,
0x3FF9,
0x3FFD,
0x4007,
0xBBCB,
0xBC0F,
0xBC15,
0xBC33,
0xBC5C,
0xBC63,
0xBCD0,
0xBCFD,
0xBD04,
0xBD6D,
0xBD71,
0xBD75,
0xBD79,
0xBDC8,
0xBDCC,
0xBDD0,
0xBDD4,
0xBDF3,
0xBDF7,
0xBDFB,
0xBDFF,
]
return {
"title": "RX/session maintenance",
"present": _has_all(by_address, (0x3FEF, 0x3FF5, 0xBBCB, 0xBC15, 0xBD6D, 0xBD79)),
"summary": (
"F9C5 timeout maintenance clears F9B5/F9B0 and FAA5.bit7; RX command processing "
"uses FAA2 as an in-session latch and paths advance F9B5/F9B0 or clear FAA3/FAA2."
),
"items": _items(by_address, addresses),
}
def _idle_heartbeat_gate(payload: dict[str, Any], by_address: dict[int, JsonObject]) -> JsonObject:
vector = _vector_entry(payload, 0x006A, "frt2_ocia")
handler = _int_field(vector, "target") if vector else None
if handler is None:
handler = 0xBF23
addresses = [
0x4046,
0x404A,
0x404C,
0x4050,
0x4052,
0x4056,
0x4058,
0x4059,
0x405F,
0x4063,
0x4067,
0x406C,
0x4070,
0x40E0,
0xBA31,
handler,
0xBF27,
0xBF2D,
]
present = _has_all(by_address, (0x4046, 0x4050, 0x4067, 0x40E0, 0xBA31, handler, 0xBF2D))
return {
"title": "loc_4046 idle heartbeat/report gate",
"present": present,
"summary": (
"F9C4 gates the idle/default report enqueue. Reset/init loads H'14, each BA26 send "
"reloads H'07, and the FRT2 OCIA handler decrements it; when it reaches zero loc_4046 "
"can enqueue H'00FF if the queue is empty and the FAA5/F9C3 RX gate permits it. With "
"FRT2 OCRA H'7A12 and CKS=phi/32, a phi near 10 MHz gives about 0.7s for H'07, matching "
"the observed heartbeat cadence."
),
"items": _items(by_address, addresses),
"gate_address_hex": h16(0x4046),
"queue_write_address_hex": h16(0x4067),
"initial_reload_address_hex": h16(0x40E0),
"post_tx_reload_address_hex": h16(0xBA31),
"tick_handler_address_hex": h16(handler),
"decrement_address_hex": h16(0xBF2D),
"initial_reload_value_hex": "H'14",
"post_tx_reload_value_hex": "H'07",
"timer": {
"source": "FRT2 OCIA",
"vector_address_hex": h16(0x006A),
"handler_address_hex": h16(handler),
"vector_target_label": str(vector.get("target_label", "")) if vector else "",
"tcr_address_hex": h16(0xFEA0),
"tcsr_address_hex": h16(0xFEA1),
"ocra_address_hex": h16(0xFEA4),
"ocra_value_hex": "H'7A12",
"clock_select": "CKS1=1 CKS0=0 => phi/32",
"observed_period_ms_candidate": 700,
"manual_reference": "Manual/0900766b802125d0.md:12038 FRT CKS1/CKS0 clock select",
},
"candidate_timer_roles": [
{
"address": 0xF9C4,
"address_hex": h16(0xF9C4),
"role": "candidate idle heartbeat/report gate countdown",
"evidence_address_hex": h16(0xBF2D),
}
],
"required_addresses_hex": [h16(address) for address in (0x4046, 0x4050, 0x4067, 0x40E0, 0xBA31, handler, 0xBF2D)],
}
def _timer_tick_evidence(payload: dict[str, Any], by_address: dict[int, JsonObject]) -> JsonObject:
vector = _vector_entry(payload, 0x0062, "frt1_ocia")
handler = _int_field(vector, "target") if vector else None
if handler is None:
handler = 0xBEEA
addresses = [handler, 0xBEEE, 0xBEF4, 0xBEF8, 0xBEFE, 0xBF02, 0xBF08]
has_vector = vector is not None and _int_field(vector, "target") == handler
has_handler_clear = _instruction_mentions(by_address.get(handler), ("FRT1_TCSR", "#5"))
decrement_addresses = (0xBEF4, 0xBEFE, 0xBF08)
has_decrements = all(
address in by_address and _access_kind(by_address[address], state_address) == "read_write"
for address, state_address in zip(decrement_addresses, (0xF9C0, 0xF9C1, 0xF9C6))
)
return {
"title": "FRT1 OCIA periodic tick countdowns",
"present": bool((has_vector or has_handler_clear) and has_decrements),
"summary": (
"Static evidence links vector H'0062 to the FRT1 OCIA handler at H'BEEA; the handler "
"clears FRT1_TCSR.OCFA and conditionally decrements H'F9C0, H'F9C1, and H'F9C6."
),
"vector_address_hex": h16(0x0062),
"handler_address_hex": h16(handler),
"vector_target_label": str(vector.get("target_label", "")) if vector else "",
"items": _items(by_address, addresses),
"candidate_timer_roles": [
{
"address": 0xF9C0,
"address_hex": h16(0xF9C0),
"role": "candidate post-TX/report delay countdown",
"evidence_address_hex": h16(0xBEF4),
},
{
"address": 0xF9C1,
"address_hex": h16(0xF9C1),
"role": "candidate secondary delay countdown",
"evidence_address_hex": h16(0xBEFE),
},
{
"address": 0xF9C6,
"address_hex": h16(0xF9C6),
"role": "candidate periodic report/heartbeat countdown",
"evidence_address_hex": h16(0xBF08),
},
],
}
def _state_access_summary(instructions: list[JsonObject], labels: dict[int, str]) -> list[JsonObject]:
result: list[JsonObject] = []
for state_address in KEY_STATE_ADDRESSES:
accesses = []
for ins in instructions:
if state_address not in _reference_addresses(ins):
continue
access = _access_kind(ins, state_address)
accesses.append(
{
"address": int(ins["address"]),
"address_hex": h16(int(ins["address"])),
"function": _function_label_for_address(int(ins["address"]), labels),
"access": access,
"text": str(ins.get("text", "")),
}
)
result.append(
{
"address": state_address,
"address_hex": h16(state_address),
"read_count": sum(1 for access in accesses if access["access"] == "read"),
"write_count": sum(1 for access in accesses if access["access"] == "write"),
"read_write_count": sum(1 for access in accesses if access["access"] == "read_write"),
"accesses": accesses,
"sample_accesses": accesses[:6],
}
)
return result
def _instruction_sequence(raw: Any) -> list[JsonObject]:
if not isinstance(raw, list):
return []
return sorted(
[item for item in raw if isinstance(item, dict) and isinstance(item.get("address"), int)],
key=lambda item: int(item["address"]),
)
def _collect_labels(payload: dict[str, Any], instructions: list[JsonObject]) -> dict[int, str]:
labels: dict[int, str] = {}
nodes = payload.get("call_graph", {}).get("nodes", []) if isinstance(payload.get("call_graph"), dict) else []
if isinstance(nodes, list):
for node in nodes:
if isinstance(node, dict) and isinstance(node.get("start"), int) and node.get("label"):
labels[int(node["start"])] = str(node["label"])
return labels
def _items(by_address: dict[int, JsonObject], addresses: list[int]) -> list[JsonObject]:
return [
{
"address": address,
"address_hex": h16(address),
"text": _text(by_address, address),
"present": address in by_address,
"targets_hex": [h16(target) for target in by_address.get(address, {}).get("targets", []) if isinstance(target, int)],
}
for address in addresses
]
def _has_all(by_address: dict[int, JsonObject], addresses: tuple[int, ...]) -> bool:
return all(address in by_address for address in addresses)
def _vector_entry(payload: dict[str, Any], address: int, name: str) -> JsonObject | None:
vectors = payload.get("vectors", [])
if not isinstance(vectors, list):
return None
for vector in vectors:
if not isinstance(vector, dict):
continue
if _int_field(vector, "address") == address or str(vector.get("name", "")).lower() == name:
return vector
return None
def _int_field(payload: JsonObject | None, key: str, default: int | None = None) -> int | None:
if not isinstance(payload, dict):
return default
value = payload.get(key)
return value if isinstance(value, int) else default
def _instruction_mentions(ins: JsonObject | None, fragments: tuple[str, ...]) -> bool:
if not isinstance(ins, dict):
return False
text = f"{ins.get('text', '')} {ins.get('operands', '')} {ins.get('comment', '')}".upper()
return all(fragment.upper() in text for fragment in fragments)
def _text(by_address: dict[int, JsonObject], address: int) -> str:
return str(by_address.get(address, {}).get("text", "<missing>"))
def _reference_addresses(ins: JsonObject) -> set[int]:
addresses: set[int] = set()
refs = ins.get("references", [])
if isinstance(refs, list):
for ref in refs:
if isinstance(ref, dict) and isinstance(ref.get("address"), int):
addresses.add(int(ref["address"]))
text = str(ins.get("text", ""))
for match in re.finditer(r"@H'([0-9A-Fa-f]{4})", text):
addresses.add(int(match.group(1), 16))
return addresses
def _access_kind(ins: JsonObject, address: int) -> str:
mnemonic = str(ins.get("mnemonic", "")).upper()
operands = str(ins.get("operands", ""))
target = f"@H'{address:04X}"
upper_operands = operands.upper()
if mnemonic.startswith(("TST", "CMP", "BTST")):
return "read"
if mnemonic.startswith("CLR"):
return "write"
if mnemonic.startswith(("BSET", "BCLR", "ADD", "SUB", "INC", "DEC")):
return "read_write"
if mnemonic.startswith("MOV") and "," in upper_operands:
_src, dest = [part.strip() for part in upper_operands.rsplit(",", 1)]
return "write" if target in dest else "read"
if mnemonic.startswith(("AND", "OR", "XOR")) and "," in upper_operands:
_src, dest = [part.strip() for part in upper_operands.rsplit(",", 1)]
return "read_write" if target in dest else "read"
return "read"
def _function_label_for_address(address: int, labels: dict[int, str]) -> str:
starts = [start for start in labels if start <= address]
if not starts:
return label_for(address)
return labels[max(starts)]
def _confidence(evidence: dict[str, JsonObject]) -> str:
present_count = sum(1 for section in evidence.values() if section.get("present"))
if present_count == len(evidence):
return "high"
if present_count >= 2:
return "medium"
return "low"
if __name__ == "__main__":
raise SystemExit(main())