from __future__ import annotations

import json
from pathlib import Path

from .cycles import cycle_comment
from .dataflow import state_for_instruction
from .dtc import DtcEndpointInfo, DtcRegisterInfo
from .formatting import h16, label_for
from .indirect import indirect_comment_for_instruction, indirect_metadata_for_instruction
from .lcd_driver import lcd_comment_for_instruction, lcd_metadata_for_instruction
from .lcd_text import lcd_text_comment_for_instruction
from .memory import MEMORY_REGIONS, region_for
from .model import Instruction
from .peripheral_access import (
    peripheral_comment_for_instruction,
    peripheral_json_payload,
    peripheral_metadata_for_instruction,
)
from .rom import Rom
from .sci import sci_comment_for_instruction, sci_json_payload, sci_metadata_for_instruction
from .symbols import symbol_for_address
from .tables import IO_REGISTERS
from .timing import format_timing_summary
from .vectors import DtcVectorEntry


def _dtc_endpoint_text(endpoint: DtcEndpointInfo) -> str:
    """Return the display text for a DTC endpoint.

    The formatted address is appended in parentheses unless the endpoint's
    text already is exactly that formatted address.
    """
    address = endpoint["address"]
    text = endpoint["text"]
    formatted = h16(address)
    if text == formatted:
        return text
    return f"{text} ({formatted})"


def _dtc_register_lines(vector_addr: int, entry: DtcVectorEntry, info: DtcRegisterInfo) -> list[str]:
    """Return the listing comment lines describing one DTC register block.

    When ``info`` is not flagged ``valid`` a single "unavailable" line is
    returned carrying ``info["error"]`` (or a generic message). Otherwise the
    result is a transfer summary line, a raw register line, and optional
    notes for reserved DTMR bits and a raw-zero transfer count.
    """
    target = entry["register_info_address"]
    if not info.get("valid"):
        error = info.get("error", "register information unavailable")
        return [f"; {h16(vector_addr)} {entry['source']:<24} {h16(target)} unavailable: {error}"]

    mode = info["mode"]
    source = info["source"]
    destination = info["destination"]
    count = info["count"]
    lines = [
        (
            f"; {h16(vector_addr)} {entry['source']:<24} {h16(target)} "
            f"{mode['size']} x{count['transfers']} ({count['bytes']} bytes): "
            f"{_dtc_endpoint_text(source)} -> {_dtc_endpoint_text(destination)} "
            f"[src+={mode['source_increment_step']}, dst+={mode['destination_increment_step']}]"
        ),
        (
            f"; DTMR={h16(info['dtmr'])} DTSR={h16(info['dtsr'])} "
            f"DTDR={h16(info['dtdr'])} DTCR={h16(info['dtcr'])}"
        ),
    ]
    if mode["reserved_set"]:
        lines.append(f"; warning: DTMR reserved bits set ({h16(mode['reserved'])})")
    if count["zero_means_65536"]:
        lines.append("; DTCR raw zero means an initial transfer count of 65536")
    return lines


def _reference_comment(ins: Instruction, symbols: dict[str, object] | None = None) -> str:
    """Return a ``refs ...`` comment naming every address ``ins`` references.

    Each address is named by its symbol if one resolves, else by its
    ``IO_REGISTERS`` entry, else by its formatted address, and is paired with
    the name of the memory region containing it. Returns ``""`` when the
    instruction has no references.
    """
    parts: list[str] = []
    for address in ins.references:
        region = region_for(address)
        name = symbol_for_address(symbols, address) or IO_REGISTERS.get(address, h16(address))
        parts.append(f"{name} in {region.name}")
    return "refs " + ", ".join(parts) if parts else ""


def _symbol_lines(symbols: dict[str, object] | None) -> list[str]:
    """Return the "; Symbols" header section of the listing.

    At most the first 80 entries of ``symbols["symbols"]`` are listed; a
    trailing note reports how many were omitted. Non-dict entries are
    skipped. Returns ``[]`` when there is nothing to show.
    """
    if not symbols:
        return []
    entries = symbols.get("symbols", [])
    if not isinstance(entries, list) or not entries:
        return []

    lines = ["; Symbols"]
    for item in entries[:80]:
        if not isinstance(item, dict):
            continue
        address = int(item["address"])
        width = item.get("width") or "unknown"
        line = (
            f"; {item['name']:<16} {h16(address)} {item['region']:<18} {item['kind']:<8} "
            f"r={item['read_count']} w={item['write_count']} width={width}"
        )
        if item.get("xref_count"):
            line += f" xrefs={item['xref_count']}"
        lines.append(line)
    if len(entries) > 80:
        lines.append(f"; ... {len(entries) - 80} more symbols omitted from listing header")
    lines.append("")
    return lines


def _known_change_text(change: dict[str, object]) -> str:
    """Format one dataflow change as ``NAME=H'XX`` / ``NAME=H'XXXX``.

    Returns ``""`` unless ``change["after"]`` is a dict flagged ``known``.
    Two hex digits are used for widths up to 8, four otherwise; the width
    defaults to 16 when absent.
    """
    after = change.get("after")
    if not isinstance(after, dict) or not after.get("known"):
        return ""
    value = int(after["value"])
    width = int(after.get("width", 16))
    digits = 2 if width <= 8 else 4
    return f"{change['name']}=H'{value:0{digits}X}"


def _dataflow_comment(analysis: dict[str, object] | None, address: int) -> str:
    """Return a ``dataflow ...`` comment for the instruction at ``address``.

    Lists up to four known-value changes recorded for the instruction and
    appends `` ...`` when more exist. Returns ``""`` when there is no record
    or no change with a known value.
    """
    record = state_for_instruction(analysis, address)
    if not record:
        return ""
    changes = record.get("changes")
    if not isinstance(changes, list):
        return ""
    parts = [_known_change_text(change) for change in changes if isinstance(change, dict)]
    parts = [part for part in parts if part]
    if not parts:
        return ""
    suffix = " ..." if len(parts) > 4 else ""
    return "dataflow " + ", ".join(parts[:4]) + suffix


def _lcd_text_lines(lcd_text: dict[str, object] | None) -> list[str]:
    """Return the "; LCD/Text Scan" header section of the listing.

    Emits one line per search (plus up to four near matches), up to 12 text
    regions, and up to 48 text candidates. Candidates with ``low`` confidence
    and no cross references are not listed. Returns ``[]`` when the scan
    holds no strings, regions or searches.
    """
    if not lcd_text:
        return []
    strings = lcd_text.get("strings", [])
    regions = lcd_text.get("regions", [])
    searches = lcd_text.get("searches", [])
    if not strings and not regions and not searches:
        return []

    lines = ["; LCD/Text Scan"]
    for search in (searches if isinstance(searches, list) else []):
        if not isinstance(search, dict):
            continue
        hits = len(search.get("literal_hits", [])) + len(search.get("candidate_hits", []))
        status = "found" if hits else "not literal"
        lines.append(f"; search {search.get('term')!r}: {status}, hits={hits}")
        near = search.get("near_matches", [])
        if isinstance(near, list) and near:
            sample = ", ".join(f"{h16(int(item['address']))} {item['trimmed']!r}" for item in near[:4])
            lines.append(f"; near: {sample}")

    if isinstance(regions, list) and regions:
        lines.append("; LCD text regions")
        for region in regions[:12]:
            if not isinstance(region, dict):
                continue
            samples = ", ".join(repr(sample) for sample in region.get("samples", [])[:4])
            lines.append(
                f"; region {h16(int(region['start']))}-{h16(int(region['end']))} "
                f"count={region['count']:<3} {samples}",
            )
        if len(regions) > 12:
            lines.append(f"; ... {len(regions) - 12} more LCD text regions")

    if isinstance(strings, list) and strings:
        lines.append("; LCD text candidates")
        shown = 0
        for item in strings:
            if not isinstance(item, dict):
                continue
            if item.get("confidence") == "low" and not item.get("xref_count"):
                continue
            xrefs = f" xrefs={item['xref_count']}" if item.get("xref_count") else ""
            lines.append(
                f"; text {h16(int(item['address'])):<8} len={item['length']:<3} "
                f"{item['confidence']:<6} {str(item['trimmed'])!r}{xrefs}",
            )
            shown += 1
            if shown >= 48:
                break
        # The remainder counts every entry not printed, including skipped ones.
        if len(strings) > shown:
            lines.append(f"; ... {len(strings) - shown} more LCD text candidates")

    lines.append("")
    return lines


def _lcd_driver_lines(lcd_driver: dict[str, object] | None) -> list[str]:
    """Return the "; LCD Driver Candidates" header section of the listing.

    Lists every entry of ``lcd_driver["addresses"]``, then up to 16 routines
    and up to 16 busy loops, each followed by a note when entries were left
    out. Returns ``[]`` when there are no accesses, polling loops or routines.
    """
    if not lcd_driver:
        return []
    accesses = lcd_driver.get("accesses", [])
    loops = lcd_driver.get("polling_loops", [])
    routines = lcd_driver.get("routines", [])
    if not accesses and not loops and not routines:
        return []

    lines = ["; LCD Driver Candidates"]
    # `or []` keeps an explicit None under "addresses" from raising on iteration.
    for address_info in lcd_driver.get("addresses") or []:
        if not isinstance(address_info, dict):
            continue
        lines.append(
            f"; {h16(int(address_info['address']))} {address_info['name']:<18} {address_info['role']}",
        )

    if isinstance(routines, list) and routines:
        lines.append("; LCD routines")
        for routine in routines[:16]:
            if not isinstance(routine, dict):
                continue
            roles = ", ".join(str(role) for role in routine.get("roles", []))
            lines.append(
                f"; routine {h16(int(routine['start']))}-{h16(int(routine['end']))} "
                f"{routine['role_hint']:<24} {roles}",
            )
        if len(routines) > 16:
            lines.append(f"; ... {len(routines) - 16} more LCD routines")

    if isinstance(loops, list) and loops:
        lines.append("; LCD busy loops")
        for loop in loops[:16]:
            if not isinstance(loop, dict):
                continue
            lines.append(
                f"; loop {h16(int(loop['read_address']))}->{h16(int(loop['branch_address']))} "
                f"{loop['summary']}",
            )
        # Report the cut-off like the other capped sections do.
        if len(loops) > 16:
            lines.append(f"; ... {len(loops) - 16} more LCD busy loops")

    lines.append("")
    return lines


def format_listing(
    rom_path: Path,
    rom: Rom,
    instructions: dict[int, Instruction],
    vectors: dict[int, tuple[str, int]],
    labels: dict[int, str],
    mode: str,
    traced: bool,
    dtc_vectors: dict[int, DtcVectorEntry] | None = None,
    data_candidates: dict[str, list[dict[str, object]]] | None = None,
    timing_summary: dict[str, list[dict[str, object]]] | None = None,
    show_cycles: bool = False,
    sci_analysis: dict[str, object] | None = None,
    peripheral_access: dict[str, object] | None = None,
    indirect_flow: dict[str, object] | None = None,
    dataflow: dict[str, object] | None = None,
    symbols: dict[str, object] | None = None,
    lcd_text: dict[str, object] | None = None,
    lcd_driver: dict[str, object] | None = None,
) -> str:
    """Render the complete text listing and return it as one string.

    The listing consists of a comment header (input summary, manual notes,
    memory map, vectors, and the optional DTC, data-candidate, symbol, LCD and
    timing sections) followed by one line per instruction in ascending
    address order. Each instruction line carries the raw bytes, the
    disassembly text and any comments the analyses contribute; addresses
    present in ``labels`` are preceded by a blank line and a label line.
    """
    lines: list[str] = []

    # --- Header and manual notes -------------------------------------------
    lines.extend(
        [
            "; H8/536 ROM disassembly",
            f"; input: {rom_path}",
            f"; bytes: {len(rom.data)}",
            f"; vector mode: {mode}",
            f"; analysis: {'recursive trace from vectors' if traced else 'linear sweep'}",
            ";",
            "; Notes from the manual:",
            "; - H8/536 uses the H8/500 CPU instruction set.",
            "; - In minimum mode the reset vector at H'0000-H'0001 is a 16-bit PC.",
            "; - The register field is H'FE80-H'FFFF; names below come from appendix B.",
            "; - @aa:8 short absolute operands use BR as the upper address byte.",
            "; - SCI baud inference uses section 14.2.8 BRR formulas when SMR/BRR are known.",
            "; - LCD inference treats E-clock H'F200/H'F201 accesses as status/control and data candidates.",
        ]
    )
    if sci_analysis and sci_analysis.get("clock_hz") is None:
        lines.append("; - Pass --clock-hz to convert SCI BRR settings into numeric baud rates.")
    if show_cycles:
        lines.append("; - Cycle counts use Appendix A tables A-7/A-8 for on-chip access with no external wait states.")
    lines.append("")

    # --- Memory map ---------------------------------------------------------
    lines.append("; Memory Map")
    for region in MEMORY_REGIONS:
        lines.append(f"; {h16(region.start)}-{h16(region.end)} {region.name:<18} {region.kind}")
    lines.append("")

    # --- Vectors ------------------------------------------------------------
    lines.append("; Vectors")
    for vector_addr, (name, target) in sorted(vectors.items()):
        target_name = labels.get(target, label_for(target))
        lines.append(f"; {h16(vector_addr)} {name:<24} -> {target_name} ({h16(target)})")
    lines.append("")

    # --- DTC vectors and register information --------------------------------
    if dtc_vectors:
        lines.append("; DTC Vectors")
        for vector_addr, entry in sorted(dtc_vectors.items()):
            target = entry["register_info_address"]
            lines.append(f"; {h16(vector_addr)} {entry['source']:<24} -> {h16(target)}")
        lines.append("")
        lines.append("; DTC Register Information")
        for vector_addr, entry in sorted(dtc_vectors.items()):
            lines.extend(_dtc_register_lines(vector_addr, entry, entry["register_info"]))
        lines.append("")

    # --- Unreached data candidates -------------------------------------------
    if data_candidates:
        strings = data_candidates.get("strings", [])
        pointer_tables = data_candidates.get("pointer_tables", [])
        if strings or pointer_tables:
            lines.append("; Unreached Data Candidates")
            for item in strings[:40]:
                lines.append(
                    f"; string {h16(int(item['address'])):<8} len={item['length']:<3} {item['text']!r}",
                )
            for item in pointer_tables[:40]:
                targets = ", ".join(h16(int(target)) for target in item["targets"][:8])
                suffix = " ..." if int(item["count"]) > 8 else ""
                lines.append(
                    f"; ptrtbl {h16(int(item['address'])):<8} count={item['count']:<3} -> {targets}{suffix}",
                )
            lines.append("")

    # --- Analysis sections rendered by helpers --------------------------------
    lines.extend(_symbol_lines(symbols))
    lines.extend(_lcd_text_lines(lcd_text))
    lines.extend(_lcd_driver_lines(lcd_driver))
    if timing_summary:
        lines.extend(format_timing_summary(timing_summary))

    # --- Instruction lines ----------------------------------------------------
    for address in sorted(instructions):
        ins = instructions[address]
        if address in labels:
            lines.append("")
            lines.append(f"{labels[address]}:")
        raw = " ".join(f"{byte:02X}" for byte in ins.raw)
        padded_raw = raw.ljust(14)
        comment_parts = [
            part
            for part in (
                ins.comment,
                sci_comment_for_instruction(sci_analysis, address),
                peripheral_comment_for_instruction(peripheral_access, address),
                indirect_comment_for_instruction(indirect_flow, address),
                lcd_text_comment_for_instruction(lcd_text, address),
                lcd_comment_for_instruction(lcd_driver, address),
                _dataflow_comment(dataflow, address),
                # The generic reference comment is only used when the
                # instruction has no comment of its own.
                _reference_comment(ins, symbols) if not ins.comment else "",
                cycle_comment(ins.cycles) if show_cycles else "",
            )
            if part
        ]
        comment = f" ; {'; '.join(comment_parts)}" if comment_parts else ""
        lines.append(f"{address:04X}: {padded_raw} {ins.text}{comment}")

    lines.append("")
    return "\n".join(lines)


def write_json(
    path: Path,
    instructions: dict[int, Instruction],
    vectors: dict[int, tuple[str, int]],
    labels: dict[int, str],
    dtc_vectors: dict[int, DtcVectorEntry] | None = None,
    data_candidates: dict[str, list[dict[str, object]]] | None = None,
    call_graph: dict[str, object] | None = None,
    timing_summary: dict[str, list[dict[str, object]]] | None = None,
    sci_analysis: dict[str, object] | None = None,
    peripheral_access: dict[str, object] | None = None,
    indirect_flow: dict[str, object] | None = None,
    dataflow: dict[str, object] | None = None,
    symbols: dict[str, object] | None = None,
    lcd_text: dict[str, object] | None = None,
    lcd_driver: dict[str, object] | None = None,
) -> None:
    """Serialise the analysis results to ``path`` as indented UTF-8 JSON.

    Optional analyses that were not supplied are written as empty
    placeholder structures so every top-level key is always present.
    Vectors and instructions are emitted in ascending address order.
    """
    payload = {
        "vectors": [
            {"address": addr, "name": name, "target": target, "target_label": labels.get(target)}
            for addr, (name, target) in sorted(vectors.items())
        ],
        "dtc_vectors": list((dtc_vectors or {}).values()),
        "memory_regions": [
            {
                "name": region.name,
                "start": region.start,
                "end": region.end,
                "kind": region.kind,
                "manual": region.manual,
            }
            for region in MEMORY_REGIONS
        ],
        "data_candidates": data_candidates or {"strings": [], "pointer_tables": []},
        "call_graph": call_graph or {"nodes": [], "edges": []},
        "timing_summary": timing_summary or {"blocks": [], "loops": []},
        "sci": sci_json_payload(sci_analysis),
        "peripheral_access": peripheral_json_payload(peripheral_access),
        "indirect_flow": indirect_flow or {"sites": []},
        "dataflow": _dataflow_json_payload(dataflow),
        "symbols": symbols or {"symbols": [], "by_address": {}},
        "lcd_text": lcd_text or {"strings": [], "regions": [], "searches": []},
        "lcd_driver": lcd_driver or {"accesses": [], "polling_loops": [], "routines": []},
        "instructions": [
            _instruction_payload(
                ins,
                sci_analysis,
                peripheral_access,
                indirect_flow,
                dataflow,
                symbols,
                lcd_text,
                lcd_driver,
            )
            for ins in (instructions[addr] for addr in sorted(instructions))
        ],
    }
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")


def _dataflow_json_payload(dataflow: dict[str, object] | None) -> dict[str, object]:
    """Return the top-level ``dataflow`` JSON section.

    Only the ``blocks``, ``registers`` and ``control_registers`` keys are
    copied; each defaults to an empty list.
    """
    if not dataflow:
        return {"blocks": [], "registers": [], "control_registers": []}
    return {
        "blocks": dataflow.get("blocks", []),
        "registers": dataflow.get("registers", []),
        "control_registers": dataflow.get("control_registers", []),
    }


def _compact_known_values(state: object) -> dict[str, dict[str, object]]:
    """Reduce a state snapshot to its entries flagged ``known``.

    Looks at the ``registers`` and ``control`` groups of ``state`` and keeps
    only dict values whose ``known`` field is truthy. Groups left empty are
    omitted; a non-dict ``state`` yields ``{}``.
    """
    if not isinstance(state, dict):
        return {}
    compact: dict[str, dict[str, object]] = {}
    for group_name in ("registers", "control"):
        group = state.get(group_name)
        if not isinstance(group, dict):
            continue
        values = {
            name: value
            for name, value in group.items()
            if isinstance(value, dict) and value.get("known")
        }
        if values:
            compact[group_name] = values
    return compact


def _dataflow_instruction_payload(dataflow: dict[str, object] | None, address: int) -> dict[str, object]:
    """Return the per-instruction ``dataflow`` JSON entry for ``address``.

    Returns ``{}`` when no record exists. ``known_after`` is included only
    when the record's ``after`` state has at least one known value.
    """
    record = state_for_instruction(dataflow, address)
    if not record:
        return {}
    payload: dict[str, object] = {
        "block": record.get("block"),
        "changes": record.get("changes", []),
        "notes": record.get("notes", []),
    }
    known_after = _compact_known_values(record.get("after"))
    if known_after:
        payload["known_after"] = known_after
    return payload


def _instruction_payload(
    ins: Instruction,
    sci_analysis: dict[str, object] | None = None,
    peripheral_access: dict[str, object] | None = None,
    indirect_flow: dict[str, object] | None = None,
    dataflow: dict[str, object] | None = None,
    symbols: dict[str, object] | None = None,
    lcd_text: dict[str, object] | None = None,
    lcd_driver: dict[str, object] | None = None,
) -> dict[str, object]:
    """Return the JSON record for a single instruction.

    The record always holds the instruction's own fields and its resolved
    references. Analysis-specific keys (``sci``, ``peripheral_access``,
    ``indirect_flow``, ``dataflow``, ``lcd_text``, ``lcd_driver``) are added
    only when the corresponding lookup returns something truthy.
    """
    references: list[dict[str, object]] = []
    for address in ins.references:
        # Resolve the region once; it supplies both "region" and "kind".
        region = region_for(address)
        references.append(
            {
                "address": address,
                "name": IO_REGISTERS.get(address),
                "symbol": symbol_for_address(symbols, address),
                "region": region.name,
                "kind": region.kind,
            }
        )

    payload: dict[str, object] = {
        "address": ins.address,
        "address_region": region_for(ins.address).name,
        "bytes": ins.raw.hex().upper(),
        "text": ins.text,
        "mnemonic": ins.mnemonic,
        "operands": ins.operands,
        "kind": ins.kind,
        "targets": ins.targets,
        "cycles": ins.cycles,
        "references": references,
        "comment": ins.comment,
        "valid": ins.valid,
    }

    sci_metadata = sci_metadata_for_instruction(sci_analysis, ins.address)
    if sci_metadata:
        payload["sci"] = sci_metadata
    peripheral_metadata = peripheral_metadata_for_instruction(peripheral_access, ins.address)
    if peripheral_metadata:
        payload["peripheral_access"] = peripheral_metadata
    indirect_metadata = indirect_metadata_for_instruction(indirect_flow, ins.address)
    if indirect_metadata:
        payload["indirect_flow"] = indirect_metadata
    dataflow_metadata = _dataflow_instruction_payload(dataflow, ins.address)
    if dataflow_metadata:
        payload["dataflow"] = dataflow_metadata
    lcd_text_comment = lcd_text_comment_for_instruction(lcd_text, ins.address)
    if lcd_text_comment:
        payload["lcd_text"] = {"comment": lcd_text_comment}
    lcd_driver_metadata = lcd_metadata_for_instruction(lcd_driver, ins.address)
    if lcd_driver_metadata:
        payload["lcd_driver"] = lcd_driver_metadata
    return payload


def _dot_quote(text: object) -> str:
    """Escape ``text`` for use inside a double-quoted DOT string."""
    # Only the double quote needs escaping inside a DOT quoted string.
    return str(text).replace('"', '\\"')


def format_callgraph_dot(call_graph: dict[str, object]) -> str:
    """Render ``call_graph`` as a Graphviz DOT digraph and return the text.

    Every node is labelled with its label and formatted start address; every
    edge is labelled with the formatted call-site address. Labels are escaped
    so that an embedded double quote cannot terminate the DOT string early.
    """
    lines = ["digraph callgraph {"]
    lines.append(' graph [rankdir="LR"];')
    for node in call_graph.get("nodes", []):
        label = _dot_quote(node["label"])
        start = h16(int(node["start"]))
        lines.append(f' "{label}" [label="{label}\\n{start}"];')
    for edge in call_graph.get("edges", []):
        source = _dot_quote(edge["from_label"])
        destination = _dot_quote(edge["to_label"])
        call_site = h16(int(edge["call_site"]))
        lines.append(f' "{source}" -> "{destination}" [label="{call_site}"];')
    lines.append("}")
    lines.append("")
    return "\n".join(lines)