1
0

More ccu based mining

This commit is contained in:
Aiden
2026-05-26 11:07:36 +10:00
parent 421c9f4567
commit 1ad03d5692
6 changed files with 2880 additions and 1 deletions

597
h8536/ccu_seed_hints.py Normal file
View File

@@ -0,0 +1,597 @@
from __future__ import annotations
import argparse
import json
from collections import Counter, defaultdict
from collections.abc import Mapping
from pathlib import Path
from typing import Any
from .formatting import h16
from .lcd_text import analyze_lcd_text
from .rom import Rom
from .serial_semantics import OBSERVED_TX_REPORT_OVERLAY
from .table_xrefs import analyze_table_xrefs
JsonObject = dict[str, Any]
DEFAULT_INPUT = Path("build/rom_decompiled.json")
DEFAULT_ROM = Path("ROM/M27C512@DIP28_1.BIN")
CHECKSUM_SEED = 0x5A
DISPLAY_TERMS = (
"CONNECT",
"COMM LINK",
"COMPLETED",
"CAM",
"BARS",
"BLACK",
"IRIS",
"GAIN",
"SHUTTER",
"CALL",
"POWER",
"AUTO",
"DIAG",
"DXC",
)
SPECIAL_SELECTORS: tuple[JsonObject, ...] = (
{
"selector": 0x000,
"name": "connection_or_heartbeat_root_candidate",
"reason": "idle report selector and CONNECT OK emulator condition both center on selector zero",
"seed_values": [0x0080, 0x8080],
},
{
"selector": 0x003,
"name": "default_enabled_bit_candidate",
"reason": "ROM default table writes E000/E800 selector 0x003 to 0x8000",
"seed_values": [0x8000],
},
{
"selector": 0x040,
"name": "default_all_ones_or_status_block_candidate",
"reason": "ROM default table writes E000/E800 selector 0x040 to 0xFFFF and bench tests repeatedly touched the 0x40 family",
"seed_values": [0xFFFF, 0x4030],
},
{
"selector": 0x0F6,
"name": "active_status_bridge_candidate",
"reason": "loc_48FA tests E1EC bit13 and can enqueue report selector 0x00F6",
"seed_values": [0x2000],
},
{
"selector": 0x006C,
"name": "command5_be70_candidate",
"reason": "continuation command 5 calls BE70 for selector 0x006C",
"seed_values": [],
},
{
"selector": 0x006D,
"name": "command5_be70_candidate",
"reason": "continuation command 5 calls BE70 for selector 0x006D",
"seed_values": [],
},
{
"selector": 0x006E,
"name": "command5_be70_candidate",
"reason": "continuation command 5 calls BE70 for selector 0x006E",
"seed_values": [],
},
{
"selector": 0x006B,
"name": "connection_latch_clear_candidate",
"reason": "when F731.7 is set, command 5 on this selector clears F731.7/F790.7",
"seed_values": [],
},
{
"selector": 0x0096,
"name": "connection_latch_clear_candidate",
"reason": "when F731.7 is set, command 5 on this selector clears F731.7/F790.7",
"seed_values": [],
},
{
"selector": 0x0097,
"name": "connection_latch_clear_candidate",
"reason": "when F731.7 is set, command 5 on this selector clears F731.7/F790.7",
"seed_values": [],
},
{
"selector": 0x00C6,
"name": "connection_latch_clear_candidate",
"reason": "when F731.7 is set, command 5 on this selector clears F731.7/F790.7",
"seed_values": [],
},
{
"selector": 0x00F8,
"name": "connection_latch_clear_candidate",
"reason": "when F731.7 is set, command 5 on this selector clears F731.7/F790.7",
"seed_values": [],
},
)
def load_seed_hints_input(path: Path) -> JsonObject:
with path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
if not isinstance(payload, dict) or "instructions" not in payload:
raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
return payload
def analyze_ccu_seed_hints(payload: Mapping[str, Any], *, rom_path: Path | None = DEFAULT_ROM) -> JsonObject:
table_analysis = analyze_table_xrefs(payload)
selector_hints = _selector_hints_from_tables(table_analysis)
_merge_special_selectors(selector_hints)
_merge_observed_reports(selector_hints)
dispatch = _dispatch_table_summary(payload, rom_path)
for entry in dispatch.get("interesting_entries", []):
selector = int(entry["selector"])
hint = selector_hints.setdefault(selector, _new_selector_hint(selector))
hint["score"] += 2
hint["reasons"].append(f"selector dispatches to {entry['target_label_or_hex']}")
hint["dispatch_target"] = entry
display = _display_hint_summary(payload, rom_path)
seed_plan = _seed_plan(selector_hints)
candidates = sorted(
selector_hints.values(),
key=lambda item: (-int(item["score"]), int(item["selector"])),
)
return {
"kind": "ccu_seed_hints",
"summary": {
"candidate_count": len(candidates),
"core_model": (
"The RCP likely waits for the CCU to seed mirrored state tables, then uses those "
"selector values to update LCD text, panel lamps, and report state changes."
),
"confidence": "medium",
},
"table_model": _table_model_summary(table_analysis),
"selector_candidates": candidates[:80],
"display_text_hints": display,
"dispatch_table": dispatch,
"seed_plan": seed_plan,
"bench_implications": [
"Do not wait for non-heartbeat reports as the only activation source; the CCU may be expected to push initial table state first.",
"Use command 0 writes for initial seeding, then command 1 readbacks for verification. Treat command 4/5/6 as continuation-only until a live report proves otherwise.",
"Selector zero remains the highest-value activation candidate because the emulator reaches CONNECT OK when E000[0]=0x8080 and the selector-zero processing queue runs.",
"E1EC/selector 0x00F6 is a strong follow-up candidate because loc_48FA tests bit13 there and can enqueue report 0x00F6.",
"LCD text terms such as CAM/BARS/BLACK/COMM LINK appear in ROM records, but they are not direct serial payload strings; they point to selector-driven display builders.",
],
"caveats": [
"Selector names are candidates, not confirmed protocol labels.",
"Static table xrefs prove that firmware reads/writes a selector; they do not prove the external CCU must seed it on boot.",
"Generated frames are syntactically valid six-byte host frames; bench safety still depends on timing and current RCP state.",
],
}
def format_text_report(analysis: Mapping[str, Any]) -> str:
summary = analysis["summary"]
lines = [
"H8/536 CCU Seed Hint Report",
"",
f"Summary: {summary['core_model']}",
f"Confidence: {summary['confidence']}",
"",
"Table Model:",
]
for table in analysis.get("table_model", []):
lines.append(
f"- {table['name']}: {table['logical_range_hex']}; accesses={table['access_count']} "
f"static selectors={', '.join(table.get('static_selectors_hex', [])[:12]) or 'none'}"
)
lines.extend(["", "Highest-Value Selector Candidates:"])
for hint in analysis.get("selector_candidates", [])[:24]:
lines.append(
f"- {hint['selector_hex']} {hint['name']}: score={hint['score']} "
f"tables={', '.join(hint.get('tables', [])) or 'none'}"
)
for reason in hint.get("reasons", [])[:4]:
lines.append(f" - {reason}")
frames = hint.get("seed_frames", [])
if frames:
frame_text = "; ".join(f"{frame['value_hex']} -> {frame['cmd0_frame']}" for frame in frames[:3])
lines.append(f" seed frames: {frame_text}")
read_frame = hint.get("cmd1_read_frame")
if read_frame:
lines.append(f" readback frame: {read_frame}")
display = analysis.get("display_text_hints", {})
lines.extend(["", "Display Text Hints:"])
for hit in display.get("term_hits", [])[:16]:
samples = ", ".join(
f"{sample['address_hex']} {sample['text']!r}"
for sample in hit.get("samples", [])[:3]
)
lines.append(f"- {hit['term']}: {hit['hit_count']} hit(s){f' - {samples}' if samples else ''}")
dispatch = analysis.get("dispatch_table", {})
lines.extend(["", "Selector Dispatch Hints:"])
lines.append(
f"- table {dispatch.get('table_base_hex', 'unknown')}: "
f"{dispatch.get('interesting_count', 0)} non-default/interesting entries"
)
for entry in dispatch.get("interesting_entries", [])[:16]:
lines.append(
f" - selector {entry['selector_hex']} -> {entry['target_label_or_hex']} "
f"(dispatch index {entry['dispatch_index_hex']})"
)
seed_plan = analysis.get("seed_plan", {})
lines.extend(["", "Candidate Fake-CCU Seed Plan:"])
for step in seed_plan.get("steps", []):
lines.append(f"- {step['name']}: {step['frame']}")
lines.append(f" {step['why']}")
lines.extend(["", "Bench Implications:"])
for item in analysis.get("bench_implications", []):
lines.append(f"- {item}")
lines.extend(["", "Caveats:"])
for item in analysis.get("caveats", []):
lines.append(f"- {item}")
return "\n".join(lines).rstrip() + "\n"
def write_ccu_seed_hints(
input_path: Path,
output_path: Path,
*,
rom_path: Path | None = DEFAULT_ROM,
as_json: bool = False,
) -> JsonObject:
analysis = analyze_ccu_seed_hints(load_seed_hints_input(input_path), rom_path=rom_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if as_json:
output_path.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
else:
output_path.write_text(format_text_report(analysis), encoding="utf-8")
return analysis
def main(argv: list[str] | None = None, stdout: Any | None = None) -> int:
parser = argparse.ArgumentParser(description="Mine ROM hints for CCU-to-RCP state seeding candidates.")
parser.add_argument("input", nargs="?", type=Path, default=DEFAULT_INPUT)
parser.add_argument("--rom", type=Path, default=DEFAULT_ROM, help="ROM binary used for LCD text and dispatch-table mining")
parser.add_argument("--json", action="store_true", help="emit structured JSON instead of readable text")
parser.add_argument("--out", type=Path, default=None, help="write report to this path")
args = parser.parse_args(argv)
stream = stdout
if stream is None:
import sys
stream = sys.stdout
rom_path = args.rom if args.rom and args.rom.exists() else None
analysis = analyze_ccu_seed_hints(load_seed_hints_input(args.input), rom_path=rom_path)
rendered = json.dumps(analysis, indent=2, sort_keys=True) + "\n" if args.json else format_text_report(analysis)
if args.out:
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(rendered, encoding="utf-8")
print(f"wrote {args.out}", file=stream)
else:
print(rendered, end="", file=stream)
return 0
def encode_host_frame(command: int, selector: int, value: int = 0) -> list[int]:
byte1, byte2 = selector_bytes(selector)
frame = [command & 0x07, byte1, byte2, (value >> 8) & 0xFF, value & 0xFF]
frame.append(checksum(frame))
return frame
def frame_hex(frame: list[int]) -> str:
return " ".join(f"{byte & 0xFF:02X}" for byte in frame)
def selector_bytes(selector: int) -> tuple[int, int]:
selector &= 0x01FF
if selector <= 0x007F:
return 0x00, selector
if selector <= 0x017F:
return 0x01, selector - 0x0080
return 0x02, selector - 0x0180
def checksum(frame_without_checksum: list[int]) -> int:
value = CHECKSUM_SEED
for byte in frame_without_checksum[:5]:
value ^= byte & 0xFF
return value & 0xFF
def _selector_hints_from_tables(table_analysis: Mapping[str, Any]) -> dict[int, JsonObject]:
hints: dict[int, JsonObject] = {}
for table in table_analysis.get("tables", []):
if not isinstance(table, Mapping):
continue
table_name = str(table.get("name", "unknown_table"))
element = str(table.get("element_candidate", ""))
for access in table.get("accesses", []):
if not isinstance(access, Mapping) or not isinstance(access.get("offset"), int):
continue
selector = _selector_from_offset(int(access["offset"]), element)
if selector is None:
continue
hint = hints.setdefault(selector, _new_selector_hint(selector))
hint["score"] += _access_score(access, table_name)
if table_name not in hint["tables"]:
hint["tables"].append(table_name)
hint["accesses"].append(
{
"address_hex": access.get("instruction_address_hex"),
"function": access.get("function_label"),
"table": table_name,
"access": access.get("access"),
"instruction": access.get("instruction"),
}
)
reason = _access_reason(access, table_name)
if reason not in hint["reasons"]:
hint["reasons"].append(reason)
return hints
def _merge_special_selectors(hints: dict[int, JsonObject]) -> None:
for item in SPECIAL_SELECTORS:
selector = int(item["selector"])
hint = hints.setdefault(selector, _new_selector_hint(selector))
hint["score"] += 5
hint["name"] = str(item["name"])
hint["reasons"].append(str(item["reason"]))
for value in item.get("seed_values", []):
_add_seed_value(hint, int(value))
def _merge_observed_reports(hints: dict[int, JsonObject]) -> None:
for report in OBSERVED_TX_REPORT_OVERLAY:
selector = int(report["logical_index"])
hint = hints.setdefault(selector, _new_selector_hint(selector))
hint["score"] += 3
hint["name"] = str(report["name_candidate"])
frames = ", ".join(str(frame) for frame in report.get("observed_frames_hex", []))
hint["reasons"].append(f"observed RCP autonomous report frame(s): {frames}")
def _seed_plan(hints: Mapping[int, JsonObject]) -> JsonObject:
planned = [
(0x000, 0x8080, "selector zero active/connect candidate from emulator state search"),
(0x003, 0x8000, "ROM default state also sets selector 0x003 high bit"),
(0x040, 0xFFFF, "ROM default all-ones/status candidate touched by bench 0x40 family"),
(0x0F6, 0x2000, "sets E1EC bit13 candidate used by loc_48FA report bridge"),
]
steps: list[JsonObject] = []
for selector, value, why in planned:
frame = frame_hex(encode_host_frame(0x00, selector, value))
readback = frame_hex(encode_host_frame(0x01, selector, 0))
hint = hints.get(selector)
if hint is not None:
_add_seed_value(hint, value)
steps.append(
{
"selector": selector,
"selector_hex": f"0x{selector:03X}",
"value": value,
"value_hex": f"0x{value:04X}",
"name": f"cmd0 seed selector 0x{selector:03X} = 0x{value:04X}",
"frame": frame,
"readback_frame": readback,
"why": why,
}
)
return {
"model": "candidate initial CCU state push using command 0 writes, verified with command 1 reads",
"steps": steps,
}
def _table_model_summary(table_analysis: Mapping[str, Any]) -> list[JsonObject]:
rows: list[JsonObject] = []
for table in table_analysis.get("tables", []):
if not isinstance(table, Mapping):
continue
element = str(table.get("element_candidate", ""))
selectors = []
for offset in table.get("static_offsets", []):
if isinstance(offset, int):
selector = _selector_from_offset(offset, element)
if selector is not None:
selectors.append(selector)
rows.append(
{
"name": table.get("name"),
"logical_range_hex": f"{table.get('logical_base_address_hex')}-{table.get('logical_range_end_hex')}",
"access_count": table.get("access_count", 0),
"static_selectors": sorted(set(selectors)),
"static_selectors_hex": [f"0x{selector:03X}" for selector in sorted(set(selectors))],
}
)
return rows
def _display_hint_summary(payload: Mapping[str, Any], rom_path: Path | None) -> JsonObject:
del payload
if rom_path is None or not rom_path.exists():
return {"term_hits": [], "note": "ROM binary was not available for LCD text mining."}
rom = Rom(rom_path.read_bytes())
text = analyze_lcd_text(rom, None, search_terms=DISPLAY_TERMS, max_candidates=360)
term_hits = []
for search in text.get("searches", []):
if not isinstance(search, Mapping):
continue
samples = []
for address in search.get("literal_hits", [])[:4]:
if isinstance(address, int):
samples.append({"address_hex": h16(address), "text": f"literal {search.get('term')}"})
for hit in search.get("candidate_hits", [])[:6]:
if not isinstance(hit, Mapping):
continue
samples.append(
{
"address_hex": h16(int(hit.get("address", 0))),
"text": str(hit.get("trimmed") or hit.get("text") or ""),
}
)
hit_count = len(search.get("literal_hits", []) or []) + len(search.get("candidate_hits", []) or [])
term_hits.append({"term": search.get("term"), "hit_count": hit_count, "samples": samples[:8]})
return {
"term_hits": term_hits,
"regions": text.get("regions", [])[:8],
"note": "Text hits are ROM display resources, not literal serial payloads.",
}
def _dispatch_table_summary(payload: Mapping[str, Any], rom_path: Path | None) -> JsonObject:
table_base = 0x28A6
entries = _indirect_entries(payload, table_base)
if not entries and rom_path is not None and rom_path.exists():
entries = _raw_dispatch_entries(rom_path, table_base, 128)
if not entries:
return {"table_base": table_base, "table_base_hex": h16(table_base), "interesting_count": 0, "interesting_entries": []}
target_counts = Counter(int(entry["target"]) for entry in entries if isinstance(entry.get("target"), int))
default_target, _ = target_counts.most_common(1)[0]
interesting: list[JsonObject] = []
for entry in entries:
index = int(entry["index"])
selector = _selector_from_dispatch_index(index)
if selector is None:
continue
target = int(entry["target"])
label = entry.get("target_label") or h16(target)
decoded = bool(entry.get("decoded_code", True))
if target == default_target and decoded:
continue
interesting.append(
{
"selector": selector,
"selector_hex": f"0x{selector:03X}",
"dispatch_index": index,
"dispatch_index_hex": f"0x{index:03X}",
"entry_address_hex": h16(int(entry.get("entry_address", table_base + index * 2))),
"target": target,
"target_hex": h16(target),
"target_label_or_hex": str(label),
"decoded_code": decoded,
}
)
return {
"table_base": table_base,
"table_base_hex": h16(table_base),
"entry_count": len(entries),
"default_target_hex": h16(default_target),
"interesting_count": len(interesting),
"interesting_entries": interesting[:80],
}
def _indirect_entries(payload: Mapping[str, Any], table_base: int) -> list[JsonObject]:
indirect = payload.get("indirect_flow")
if not isinstance(indirect, Mapping):
return []
for site in indirect.get("sites", []):
if not isinstance(site, Mapping):
continue
table = site.get("table")
if isinstance(table, Mapping) and int(table.get("base", -1)) == table_base:
entries = table.get("entries", [])
if isinstance(entries, list):
return [dict(entry) for entry in entries if isinstance(entry, Mapping)]
return []
def _raw_dispatch_entries(rom_path: Path, table_base: int, count: int) -> list[JsonObject]:
rom = Rom(rom_path.read_bytes())
entries = []
for index in range(count):
address = table_base + index * 2
if not rom.contains(address, 2):
break
target = rom.u16(address)
entries.append({"index": index, "entry_address": address, "target": target, "target_label": None, "decoded_code": True})
return entries
def _selector_from_dispatch_index(index: int) -> int | None:
if 0 <= index <= 0x007F:
return index
if 0x0100 <= index <= 0x01FF:
return index - 0x0080
if 0x0200 <= index <= 0x027F:
return index - 0x0080
return None
def _selector_from_offset(offset: int, element: str) -> int | None:
if element == "word_value":
if offset % 2:
return None
return (offset // 2) & 0x01FF
return offset & 0x01FF
def _new_selector_hint(selector: int) -> JsonObject:
return {
"selector": selector,
"selector_hex": f"0x{selector:03X}",
"name": "state_selector_candidate",
"score": 0,
"tables": [],
"reasons": [],
"accesses": [],
"seed_frames": [],
"cmd1_read_frame": frame_hex(encode_host_frame(0x01, selector, 0)),
}
def _add_seed_value(hint: JsonObject, value: int) -> None:
frames = hint.setdefault("seed_frames", [])
value_hex = f"0x{value & 0xFFFF:04X}"
if any(frame.get("value_hex") == value_hex for frame in frames):
return
frames.append(
{
"value": value & 0xFFFF,
"value_hex": value_hex,
"cmd0_frame": frame_hex(encode_host_frame(0x00, int(hint["selector"]), value)),
}
)
def _access_score(access: Mapping[str, Any], table_name: str) -> int:
score = 1
if access.get("access") == "read":
score += 1
if access.get("access") == "write":
score += 1
if "primary" in table_name or "current" in table_name:
score += 1
return score
def _access_reason(access: Mapping[str, Any], table_name: str) -> str:
function = access.get("function_label") or "<no function>"
instruction = access.get("instruction") or ""
return f"{table_name} {access.get('access')} in {function}: {instruction}"
__all__ = [
"analyze_ccu_seed_hints",
"checksum",
"encode_host_frame",
"format_text_report",
"frame_hex",
"load_seed_hints_input",
"main",
"selector_bytes",
"write_ccu_seed_hints",
]