1
0

Data flow improvements in pseudo code generator

This commit is contained in:
Aiden
2026-05-25 14:40:55 +10:00
parent 80819448cf
commit 1d7f00e59c
16 changed files with 105891 additions and 5141 deletions

View File

@@ -34,10 +34,13 @@ To turn the structured decompile output into conservative C-like pseudocode:
- Flags/manual-annotates TEMP-register access ordering for FRT and A/D 16-bit peripheral registers.
- Scans unreached ROM ranges for ASCII strings and pointer-table candidates.
- Emits function summaries and a direct-call graph in JSON, with optional Graphviz DOT output.
- Tracks conservative per-basic-block register/control-register dataflow in JSON and comments known value changes.
- Discovers RAM/external/global symbols from memory references and pointer tables, including read/write counts and xrefs.
- Adds indirect `JSR/JMP @Rn` flow hints when a nearby indexed word load looks like a pointer table dispatch.
- Adds Appendix A cycle estimates to JSON and can append them to ASM comments.
- Summarizes straight-line block timing and backward-branch loop timing when requested.
- Handles the E-clock transfer instructions `MOVFPE` and `MOVTPE`.
- Generates a separate C-like pseudocode view from the JSON, preserving labels, calls, branches, register names, comments, and optional cycle notes.
- Generates a separate C-like pseudocode view from the JSON, preserving labels, calls, branches, register names, inferred symbols, metadata comments, optional cycle notes, and simple structured `if`/`do while` patterns.
The generated listing is written to:
@@ -76,6 +79,7 @@ python h8536_pseudocode.py --help
- `--no-asm`: omit original assembly text from pseudocode line comments.
- `--no-addresses`: omit instruction addresses from pseudocode line comments.
- `--cycles`: include cycle estimates from the JSON.
- `--no-structure`: preserve label/goto output instead of simple structured `if`/loop output.
- `--max-functions N`: emit only the first `N` functions for focused review.
## Code Layout
@@ -90,6 +94,9 @@ python h8536_pseudocode.py --help
- `h8536/data_analysis.py`: unreached string and pointer-table candidate scans.
- `h8536/memory.py`: manual-derived memory-region tagging.
- `h8536/cycles.py`: Appendix A cycle estimate tables.
- `h8536/dataflow.py`: conservative register/control-register value tracking.
- `h8536/symbols.py`: RAM/external/global symbol discovery from references and data tables.
- `h8536/indirect.py`: indirect call/jump and pointer-table dispatch hints.
- `h8536/timing.py`: block and loop cycle summaries.
- `h8536/sci.py`: SCI setup tracking and baud inference.
- `h8536/peripheral_access.py`: FRT/A-D TEMP-register access analysis.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -6,12 +6,15 @@ from pathlib import Path
from .analysis import build_call_graph, collect_labels, linear_sweep, trace
from .cycles import annotate_cycles
from .data_analysis import analyze_unreached_data
from .dataflow import analyze_dataflow
from .decoder import H8536Decoder
from .formatting import parse_int
from .indirect import analyze_indirect_flow
from .peripheral_access import analyze_peripheral_access
from .render import format_callgraph_dot, format_listing, write_json
from .rom import Rom
from .sci import analyze_sci
from .symbols import discover_symbols
from .timing import summarize_timing
from .vectors import read_dtc_vectors_max, read_dtc_vectors_min, read_vectors_max, read_vectors_min
@@ -72,9 +75,12 @@ def main() -> int:
annotate_cycles(instructions, args.mode)
data_candidates = analyze_unreached_data(rom, instructions, args.start, end)
call_graph = build_call_graph(instructions, vectors, labels)
dataflow = analyze_dataflow(instructions, labels, call_graph)
symbols = discover_symbols(instructions, data_candidates=data_candidates)
timing_summary = summarize_timing(instructions, labels, call_graph) if args.timing else None
sci_analysis = analyze_sci(instructions, clock_hz=args.clock_hz)
peripheral_access = analyze_peripheral_access(instructions)
indirect_flow = analyze_indirect_flow(rom, instructions, labels)
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(
@@ -92,6 +98,9 @@ def main() -> int:
show_cycles=args.cycles,
sci_analysis=sci_analysis,
peripheral_access=peripheral_access,
indirect_flow=indirect_flow,
dataflow=dataflow,
symbols=symbols,
),
encoding="utf-8",
)
@@ -108,6 +117,9 @@ def main() -> int:
timing_summary=timing_summary,
sci_analysis=sci_analysis,
peripheral_access=peripheral_access,
indirect_flow=indirect_flow,
dataflow=dataflow,
symbols=symbols,
)
if args.callgraph_dot:
args.callgraph_dot.parent.mkdir(parents=True, exist_ok=True)

637
h8536/dataflow.py Normal file
View File

@@ -0,0 +1,637 @@
from __future__ import annotations
import re
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from .formatting import parse_int
from .model import Instruction
# Names of the general-purpose registers tracked by the pass: "R0" through "R7".
REGISTER_NAMES = tuple(f"R{idx}" for idx in range(8))
# Names of the registers kept in the "control" group of every tracked state.
CONTROL_REGISTER_NAMES = ("CCR", "BR", "EP", "DP", "TP", "SR")
@dataclass(frozen=True)
class TrackedValue:
    """Immutable record of what is known about one tracked register.

    A value only counts as known when both ``value`` and ``width`` are set.
    ``source`` and ``reason`` are free-form strings carried along for
    reporting.
    """

    value: int | None = None
    width: int | None = None
    source: str = ""
    reason: str = ""

    @property
    def known(self) -> bool:
        """Return True when both the value and its bit width are present."""
        return not (self.value is None or self.width is None)
# Tracked state: group name -> register name -> TrackedValue. The rest of the
# module reads and writes the groups "registers" and "control".
State = dict[str, dict[str, TrackedValue]]
def analyze_dataflow(
    instructions: Mapping[int, Instruction],
    labels: Mapping[int, str] | None = None,
    functions: object | None = None,
) -> dict[str, object]:
    """Track simple register values through conservative linear basic blocks.

    States are never merged at branch targets. Every basic block begins with
    a fresh state from ``_initial_state()`` and instructions are interpreted
    in address order until a block start, an address gap, or an instruction
    for which ``_ends_basic_block`` returns True.

    Returns a dict with per-instruction records (``"instructions"``), the
    list of blocks (``"blocks"``), and the tracked register name tuples.
    """
    addresses_in_order = sorted(instructions)
    leaders = _find_block_starts(instructions, labels, functions)
    records: dict[int, dict[str, object]] = {}
    finished: list[dict[str, object]] = []
    live_state: State | None = None
    open_block: dict[str, object] | None = None
    last_position = len(addresses_in_order) - 1

    for position, address in enumerate(addresses_in_order):
        ins = instructions[address]
        following = addresses_in_order[position + 1] if position < last_position else None

        # A new block opens when nothing is open, at a known block start, or
        # when the previous instruction does not end exactly at this address.
        needs_fresh_block = live_state is None or open_block is None or address in leaders
        if position > 0 and not needs_fresh_block:
            predecessor = instructions[addresses_in_order[position - 1]]
            needs_fresh_block = not _is_contiguous(predecessor, address)

        if needs_fresh_block:
            if open_block is not None:
                finished.append(open_block)
            live_state = _initial_state()
            open_block = {"start": address, "instructions": []}
        assert live_state is not None
        assert open_block is not None

        before = _copy_state(live_state)
        after, notes = _transfer(ins, before)
        changes = _state_changes(before, after)
        block_start = int(open_block["start"])
        records[address] = {
            "address": address,
            "text": ins.text,
            "mnemonic": ins.mnemonic,
            "operands": ins.operands,
            "kind": ins.kind,
            "block": block_start,
            "before": _public_state(before),
            "after": _public_state(after),
            "changes": changes,
            "notes": notes,
        }

        members = open_block["instructions"]
        assert isinstance(members, list)
        members.append(address)
        open_block.update(end=address, end_exclusive=address + max(ins.size, 1))

        live_state = after
        if _ends_basic_block(ins, following):
            finished.append(open_block)
            open_block = None
            live_state = None

    if open_block is not None:
        finished.append(open_block)

    return {
        "instructions": records,
        "blocks": finished,
        "registers": REGISTER_NAMES,
        "control_registers": CONTROL_REGISTER_NAMES,
    }
# Alternate public name bound to the same function object as analyze_dataflow.
track_registers = analyze_dataflow
def state_for_instruction(analysis: Mapping[str, object] | None, address: int) -> dict[str, object]:
    """Return the dataflow record stored for ``address``, or ``{}``.

    ``analysis`` is the mapping produced by ``analyze_dataflow``. The record
    is looked up under the integer address first and then under
    ``str(address)``, because an analysis that has been written to JSON and
    loaded again has string keys.

    An empty dict is returned when ``analysis`` is missing or empty, when its
    ``"instructions"`` entry is not a mapping, or when no dict record exists
    for the address.
    """
    if not analysis:
        return {}
    instructions = analysis.get("instructions")
    if not isinstance(instructions, Mapping):
        return {}
    record = instructions.get(address)
    if record is None:
        # JSON object keys are always strings, so retry with the text form.
        record = instructions.get(str(address))
    return record if isinstance(record, dict) else {}
def _find_block_starts(
    instructions: Mapping[int, Instruction],
    labels: Mapping[int, str] | None,
    functions: object | None,
) -> set[int]:
    """Collect the addresses at which a new basic block must begin.

    Block starts are the lowest decoded address, every labelled address,
    every function entry, every branch/jump target, and the address right
    after a fall-through ``branch`` instruction. Only addresses that are
    present in ``instructions`` are returned.
    """
    known = set(instructions)
    leaders: set[int] = set()
    if known:
        leaders.add(min(known))
    if labels:
        leaders |= {address for address in labels if address in known}
    leaders |= {address for address in _function_entries(functions) if address in known}

    for address, ins in instructions.items():
        leaders.update(target for target in ins.targets if target in known)
        if ins.kind != "branch" or not ins.fallthrough:
            continue
        after_branch = address + max(ins.size, 1)
        if after_branch in known:
            leaders.add(after_branch)
    return leaders
def _function_entries(functions: object | None) -> set[int]:
    """Extract function entry addresses from a loosely typed container.

    Accepted shapes:
    * ``None``: no entries.
    * a mapping with a ``"nodes"`` key: the nodes value is processed
      recursively.
    * a mapping with a ``"start"`` key: that single start address.
    * any other mapping: integer keys, plus the ``"start"`` of mapping values.
    * a non-string iterable: integer items, plus the ``"start"`` of mapping
      items.

    Anything else produces an empty set.
    """
    if functions is None:
        return set()

    if isinstance(functions, Mapping):
        if "nodes" in functions:
            return _function_entries(functions.get("nodes"))
        if "start" in functions:
            start = functions.get("start")
            return set() if start is None else {int(start)}
        found: set[int] = set()
        for key, value in functions.items():
            if isinstance(key, int):
                found.add(key)
            if isinstance(value, Mapping) and "start" in value:
                found.add(int(value["start"]))
        return found

    if isinstance(functions, (str, bytes)) or not isinstance(functions, Iterable):
        return set()

    found = set()
    for item in functions:
        if isinstance(item, int):
            found.add(item)
        elif isinstance(item, Mapping) and "start" in item:
            found.add(int(item["start"]))
    return found
def _initial_state(reason: str = "block_entry") -> State:
    """Build a state in which every tracked register is unknown for ``reason``."""
    registers = {name: _unknown(reason) for name in REGISTER_NAMES}
    control = {name: _unknown(reason) for name in CONTROL_REGISTER_NAMES}
    return {"registers": registers, "control": control}
def _copy_state(state: State) -> State:
    """Return a copy of ``state`` whose two group dicts are new objects.

    The per-register values are shared with the original, not copied.
    """
    duplicate: State = {}
    for group in ("registers", "control"):
        duplicate[group] = dict(state[group])
    return duplicate
def _public_state(state: State) -> dict[str, dict[str, dict[str, object]]]:
    """Convert both groups of ``state`` to plain dicts via ``_public_value``."""
    return {
        group: {name: _public_value(tracked) for name, tracked in state[group].items()}
        for group in ("registers", "control")
    }
def _public_value(value: TrackedValue) -> dict[str, object]:
    """Describe one tracked value as a plain dict.

    Unknown values become ``{"known": False}`` plus ``"reason"`` when one is
    set. Known values carry the value, an upper-case hex rendering (two
    digits for widths up to 8 bits, four otherwise), the width, and
    ``"source"`` when one is set.
    """
    if value.known:
        assert value.value is not None
        assert value.width is not None
        hex_digits = 4 if value.width > 8 else 2
        described: dict[str, object] = {
            "known": True,
            "value": value.value,
            "hex": f"0x{value.value:0{hex_digits}X}",
            "width": value.width,
        }
        if value.source:
            described["source"] = value.source
        return described

    described = {"known": False}
    if value.reason:
        described["reason"] = value.reason
    return described
def _unknown(reason: str = "") -> TrackedValue:
    """Create an unknown tracked value that records ``reason``."""
    return TrackedValue(None, None, "", reason)
def _known(value: int, width: int, source: str) -> TrackedValue:
    """Create a known tracked value, truncating ``value`` to ``width`` bits."""
    truncated = value & _mask(width)
    return TrackedValue(truncated, width, source)
def _transfer(ins: Instruction, state: State) -> tuple[State, list[str]]:
# Interpret one instruction and return (resulting state, notes).
# All updates are applied to ``after``, a copy made with _copy_state, so the
# group dicts of the ``state`` argument are left untouched.
after = _copy_state(state)
notes: list[str] = []
mnemonic = ins.mnemonic
# ``base`` is the mnemonic without its size suffix; ``width`` is the operand
# width in bits, or None when the mnemonic does not determine one.
base = _mnemonic_base(mnemonic)
width = _mnemonic_width(mnemonic)
ops = split_operands(ins.operands)
# Calls invalidate every tracked register and control register.
if ins.kind == "call":
_unknown_all(after, "call")
notes.append("call clobbers tracked register state")
return after, notes
# A jump without decoded targets is treated as indirect: nothing stays known.
if ins.kind == "jump" and not ins.targets:
_unknown_all(after, "indirect_jump")
notes.append("indirect jump ends known register state")
return after, notes
# BR writes flagged on the instruction are applied here; processing then
# continues with the mnemonic-specific handling below (no return in this step).
if ins.writes_br:
if ins.br_value is None:
_set_control_unknown(after, "BR", "control_load")
else:
_set_control_known(after, "BR", ins.br_value, 8, ins.text)
notes.append("tracked BR write")
if base == "NOP":
return after, notes
# Compare/test instructions: only CCR is marked unknown.
if base in {"CMP:E", "CMP:I", "CMP:G", "TST", "BTST"}:
_unknown_ccr(after, "flags")
return after, notes
# Two-operand moves in these forms are evaluated by _apply_mov.
if base in {"MOV:I", "MOV:E", "MOV:G"} and len(ops) == 2:
_apply_mov(after, ops[0], ops[1], width, ins, notes)
_unknown_ccr(after, "flags")
return after, notes
# These forms are handled as memory loads: a register destination becomes unknown.
if base in {"MOV:L", "MOV:F", "MOVFPE"} and len(ops) == 2:
if _is_register(ops[1]):
_set_register_unknown(after, ops[1], "memory_load")
notes.append(f"{ops[1]} unknown after memory load")
_apply_addressing_side_effects(after, ops, width)
_unknown_ccr(after, "flags")
return after, notes
# For these forms only addressing side effects and CCR are updated.
if base in {"MOV:S", "MOVTPE"}:
_apply_addressing_side_effects(after, ops, width)
_unknown_ccr(after, "flags")
return after, notes
# CLR on a register yields a known zero; the width defaults to 16 bits.
if base == "CLR" and len(ops) == 1:
if _is_register(ops[0]):
_set_register_known(after, ops[0], 0, width or 16, ins.text)
notes.append(f"{ops[0]} cleared")
else:
_apply_addressing_side_effects(after, ops, width)
_unknown_ccr(after, "flags")
return after, notes
if base in {"ADD", "ADD:G", "ADD:Q", "ADDS", "SUB", "SUBS"} and len(ops) == 2:
_apply_add_sub(after, base, ops[0], ops[1], width, ins, notes)
_unknown_ccr(after, "flags")
return after, notes
# Control-register load/store/logic: CCR is not separately invalidated here.
if base == "LDC" and len(ops) == 2:
_apply_ldc(after, ops[0], ops[1], width, ins, notes)
return after, notes
if base == "STC" and len(ops) == 2:
_apply_stc(after, ops[0], ops[1], width, ins, notes)
return after, notes
if base in {"ORC", "ANDC", "XORC"} and len(ops) == 2:
_apply_control_binary(after, base, ops[0], ops[1], width, ins, notes)
return after, notes
# Every other mnemonic goes through the conservative fallback.
_apply_unsupported(after, base, ops, width, ins, notes)
return after, notes
def split_operands(operands: str) -> list[str]:
    """Split an operand string on commas outside ``()`` or ``{}`` groups.

    Each piece is stripped of surrounding whitespace and empty pieces are
    dropped. An empty input yields an empty list.
    """
    if not operands:
        return []

    pieces: list[str] = []
    current: list[str] = []
    nesting = 0
    for char in operands:
        if char == "," and nesting == 0:
            # Top-level separator: close the piece collected so far.
            pieces.append("".join(current).strip())
            current = []
            continue
        if char in "({":
            nesting += 1
        elif char in ")}" and nesting:
            nesting -= 1
        current.append(char)
    pieces.append("".join(current).strip())
    return [piece for piece in pieces if piece]
def _apply_mov(
    state: State,
    source: str,
    dest: str,
    width: int | None,
    ins: Instruction,
    notes: list[str],
) -> None:
    """Apply a two-operand move to ``state`` in place.

    Addressing side effects are applied for both operands first. When
    ``dest`` is a register it becomes unknown for an ``@`` source or an
    operand whose value is not known, and otherwise takes the source value.
    The width defaults to 16 bits when the mnemonic gives none. A note is
    appended for every register update.
    """
    bits = width or 16
    _apply_addressing_side_effects(state, (source, dest), bits)
    if not _is_register(dest):
        return

    if source.startswith("@"):
        reason, note = "memory_load", f"{dest} unknown after memory load"
    else:
        loaded = _operand_value(state, source, bits)
        if loaded is not None:
            _set_register_known(state, dest, loaded, bits, ins.text)
            notes.append(f"{dest} = {_format_known(loaded, bits)}")
            return
        reason, note = "unknown_operand", f"{dest} unknown after MOV source"

    _set_register_unknown(state, dest, reason)
    notes.append(note)
def _apply_add_sub(
    state: State,
    base: str,
    source: str,
    dest: str,
    width: int | None,
    ins: Instruction,
    notes: list[str],
) -> None:
    """Apply an add or subtract with a register destination to ``state``.

    Mnemonic bases starting with ``SUB`` subtract; every other base adds.
    The destination becomes unknown when the source is an ``@`` operand or
    when either operand value is not known. The width defaults to 16 bits.
    """
    bits = width or 16
    _apply_addressing_side_effects(state, (source, dest), bits)
    if not _is_register(dest):
        return
    if source.startswith("@"):
        _set_register_unknown(state, dest, "memory_load")
        notes.append(f"{dest} unknown after arithmetic memory source")
        return

    accumulator = _operand_value(state, dest, bits)
    operand = _operand_value(state, source, bits)
    if accumulator is None or operand is None:
        _set_register_unknown(state, dest, "unknown_operand")
        notes.append(f"{dest} unknown after arithmetic")
        return

    total = accumulator - operand if base.startswith("SUB") else accumulator + operand
    _set_register_known(state, dest, total, bits, ins.text)
    notes.append(f"{dest} = {_format_known(total, bits)}")
def _apply_ldc(
    state: State,
    source: str,
    dest: str,
    width: int | None,
    ins: Instruction,
    notes: list[str],
) -> None:
    """Apply a load into a control register named by ``dest``.

    Nothing happens when ``dest`` is not a tracked control register. The
    control register becomes unknown for an ``@`` source or an operand whose
    value is not known, and otherwise takes the source value.
    """
    control = _control_name(dest)
    if control is None:
        return
    bits = _control_width(control, width)

    if source.startswith("@"):
        reason, note = "memory_load", f"{control} unknown after memory load"
    else:
        loaded = _operand_value(state, source, bits)
        if loaded is not None:
            _set_control_known(state, control, loaded, bits, ins.text)
            notes.append(f"{control} = {_format_known(loaded, bits)}")
            return
        reason, note = "unknown_operand", f"{control} unknown after LDC source"

    _set_control_unknown(state, control, reason)
    notes.append(note)
def _apply_stc(
    state: State,
    source: str,
    dest: str,
    width: int | None,
    ins: Instruction,
    notes: list[str],
) -> None:
    """Apply a store from the control register named by ``source``.

    Nothing happens when ``source`` is not a tracked control register. A
    register destination receives the control value, or becomes unknown when
    that value is not known. Any other destination only gets its addressing
    side effects applied.
    """
    control = _control_name(source)
    if control is None:
        return
    bits = _control_width(control, width)
    stored = _control_value(state, control, bits)

    if not _is_register(dest):
        _apply_addressing_side_effects(state, (dest,), bits)
        return
    if stored is None:
        _set_register_unknown(state, dest, "unknown_operand")
        notes.append(f"{dest} unknown after STC source")
        return
    _set_register_known(state, dest, stored, bits, ins.text)
    notes.append(f"{dest} = {_format_known(stored, bits)}")
def _apply_control_binary(
    state: State,
    base: str,
    source: str,
    dest: str,
    width: int | None,
    ins: Instruction,
    notes: list[str],
) -> None:
    """Apply a bitwise operation to the control register named by ``dest``.

    ``ORC`` ORs, ``ANDC`` ANDs, and any other base XORs. The control register
    becomes unknown when either its current value or the source operand is
    not known. Nothing happens when ``dest`` is not a tracked control
    register.
    """
    control = _control_name(dest)
    if control is None:
        return
    bits = _control_width(control, width)
    current = _control_value(state, control, bits)
    operand = _operand_value(state, source, bits)
    if current is None or operand is None:
        _set_control_unknown(state, control, "unknown_operand")
        notes.append(f"{control} unknown after {base}")
        return

    combine = {
        "ORC": lambda a, b: a | b,
        "ANDC": lambda a, b: a & b,
    }.get(base, lambda a, b: a ^ b)
    combined = combine(current, operand)
    _set_control_known(state, control, combined, bits, ins.text)
    notes.append(f"{control} = {_format_known(combined, bits)}")
def _apply_unsupported(
    state: State,
    base: str,
    ops: list[str],
    width: int | None,
    ins: Instruction,
    notes: list[str],
) -> None:
    """Conservatively handle a mnemonic that has no dedicated transfer rule.

    The return, sleep, branch, jump and call mnemonics listed below leave
    the state untouched. For everything else, the registers reported by
    ``_written_registers`` become unknown, addressing side effects are
    applied, and CCR becomes unknown unless ``_may_update_ccr`` says the
    mnemonic leaves it alone.
    """
    untouched = {
        "RTE", "RTS", "RTD", "PRTS", "PRTD", "SLEEP",
        "BRA", "BHI", "BLS", "BCC", "BCS", "BNE", "BEQ", "BVC", "BVS",
        "BPL", "BMI", "BGE", "BLT", "BGT", "BLE", "BRN",
        "SCB/F", "SCB/NE", "SCB/EQ",
        "JMP", "PJMP", "BSR", "JSR", "PJSR",
    }
    if base in untouched:
        return

    clobbered = _written_registers(base, ops)
    for register in clobbered:
        _set_register_unknown(state, register, f"unsupported:{ins.mnemonic}")
    _apply_addressing_side_effects(state, ops, width)
    if clobbered:
        notes.append(f"unsupported operation invalidated {', '.join(clobbered)}")
    if _may_update_ccr(base):
        _unknown_ccr(state, "flags")
def _operand_value(state: State, operand: str, width: int) -> int | None:
    """Resolve ``operand`` to a value of ``width`` bits, or None if unknown.

    Immediates are masked to ``width``. Registers and control registers are
    read from ``state``. Any other operand form yields None.
    """
    token = operand.strip()
    literal = _parse_immediate(token)
    if literal is not None:
        return literal & _mask(width)
    if _is_register(token):
        return _narrow(state["registers"][token], width)
    control = _control_name(token)
    return None if control is None else _control_value(state, control, width)
def _control_value(state: State, control: str, width: int) -> int | None:
    """Read control register ``control`` from ``state`` at ``width`` bits."""
    tracked = state["control"][control]
    return _narrow(tracked, width)
def _narrow(value: TrackedValue, width: int) -> int | None:
    """Return the low ``width`` bits of a known value, or None.

    None is returned for unknown values and whenever ``width`` exceeds the
    width the value was recorded with.
    """
    usable = (
        value.known
        and value.value is not None
        and value.width is not None
        and width <= value.width
    )
    return value.value & _mask(width) if usable else None
def _parse_immediate(operand: str) -> int | None:
    """Parse an immediate operand of the form ``#value`` or ``#-value``.

    Returns the signed integer, or None when ``operand`` does not start with
    ``#``, has no text after the ``#``, or cannot be parsed by ``parse_int``.
    A malformed negative immediate such as ``#-xyz`` also yields None; the
    negative branch used to let the ``ValueError`` escape to the caller.
    """
    if not operand.startswith("#"):
        return None
    text = operand[1:].strip()
    if not text:
        return None
    negative = text.startswith("-")
    digits = text[1:] if negative else text
    try:
        # Both signs are parsed inside the try so bad input is handled alike.
        magnitude = parse_int(digits)
    except ValueError:
        return None
    return -magnitude if negative else magnitude
def _set_register_known(state: State, register: str, value: int, width: int, source: str) -> None:
    """Record a known value for ``register`` in ``state``."""
    tracked = _known(value, width, source)
    state["registers"][register] = tracked
def _set_register_unknown(state: State, register: str, reason: str) -> None:
    """Mark ``register`` as unknown in ``state`` and record ``reason``."""
    tracked = _unknown(reason)
    state["registers"][register] = tracked
def _set_control_known(state: State, control: str, value: int, width: int, source: str) -> None:
    """Record a known value for control register ``control`` in ``state``."""
    tracked = _known(value, width, source)
    state["control"][control] = tracked
def _set_control_unknown(state: State, control: str, reason: str) -> None:
    """Mark control register ``control`` as unknown and record ``reason``."""
    tracked = _unknown(reason)
    state["control"][control] = tracked
def _unknown_all(state: State, reason: str) -> None:
    """Mark every tracked register and control register as unknown."""
    for name in REGISTER_NAMES:
        state["registers"][name] = _unknown(reason)
    for name in CONTROL_REGISTER_NAMES:
        state["control"][name] = _unknown(reason)
def _unknown_ccr(state: State, reason: str) -> None:
    """Mark the CCR control register as unknown and record ``reason``."""
    state["control"]["CCR"] = _unknown(reason)
def _apply_addressing_side_effects(state: State, operands: Iterable[str], width: int | None) -> None:
    """Invalidate registers used in ``@-Rn`` or ``@Rn+`` operands.

    ``width`` is accepted but not used.
    """
    _ = width
    for operand in operands:
        hit = re.fullmatch(r"@-(R[0-7])", operand)
        if hit is None:
            hit = re.fullmatch(r"@(R[0-7])\+", operand)
        if hit is None:
            continue
        _set_register_unknown(state, hit.group(1), "addressing_side_effect")
def _written_registers(base: str, ops: list[str]) -> list[str]:
    """Guess which registers an otherwise unsupported instruction writes.

    * ``LDM`` with two operands: every register named in the second operand.
    * The single-operand read-modify-write mnemonics listed below: the first
      operand, when it is a register.
    * Any other instruction with two or more operands, apart from the
      compare/test/``STM`` mnemonics: the last operand, when it is a register.
    """
    if base == "LDM" and len(ops) == 2:
        register_list = ops[1]
        return [reg for reg in REGISTER_NAMES if re.search(rf"\b{reg}\b", register_list)]

    in_place = {
        "SWAP", "EXTS", "EXTU", "NEG", "NOT",
        "SHAL", "SHAR", "SHLL", "SHLR",
        "ROTL", "ROTR", "ROTXL", "ROTXR", "TAS",
    }
    if base in in_place and ops:
        target = ops[0]
    elif len(ops) >= 2 and base not in {"CMP", "CMP:E", "CMP:I", "CMP:G", "BTST", "TST", "STM"}:
        target = ops[-1]
    else:
        return []
    return [target] if _is_register(target) else []
def _may_update_ccr(base: str) -> bool:
    """Return False only for the mnemonics listed as leaving CCR untouched."""
    flag_neutral = ("NOP", "MOV:S", "MOVTPE", "STC", "LDC", "STM", "LDM", "LINK", "UNLK")
    return not (base in flag_neutral)
def _state_changes(before: State, after: State) -> list[dict[str, object]]:
    """List every tracked value that differs between ``before`` and ``after``.

    Registers are reported first, then control registers, each in the
    iteration order of ``before``.
    """
    changes: list[dict[str, object]] = []
    for group, kind in (("registers", "register"), ("control", "control")):
        earlier, later = before[group], after[group]
        for name, old in earlier.items():
            new = later[name]
            if old == new:
                continue
            changes.append(
                {
                    "kind": kind,
                    "name": name,
                    "before": _public_value(old),
                    "after": _public_value(new),
                }
            )
    return changes
def _ends_basic_block(ins: Instruction, next_address: int | None) -> bool:
    """Return True when ``ins`` is the last instruction of its basic block.

    That is the case when there is no next instruction, when ``ins`` is a
    branch, jump, return, rte or sleep, when it does not fall through, or
    when the next address does not directly follow it.
    """
    if next_address is None:
        return True
    terminates = ins.kind in {"branch", "jump", "return", "rte", "sleep"} or not ins.fallthrough
    return terminates or not _is_contiguous(previous_instruction=ins, address=next_address)
def _is_contiguous(previous_instruction: Instruction, address: int) -> bool:
    """Return True when ``address`` directly follows ``previous_instruction``.

    An instruction size below 1 is treated as 1.
    """
    span = max(previous_instruction.size, 1)
    return address - previous_instruction.address == span
def _mnemonic_base(mnemonic: str) -> str:
    """Return ``mnemonic`` without the part after its last ``.``, if any."""
    head, dot, _suffix = mnemonic.rpartition(".")
    return head if dot else mnemonic
def _mnemonic_width(mnemonic: str) -> int | None:
    """Infer the operand width in bits from a mnemonic.

    A ``.B`` suffix means 8 and ``.W`` means 16. Without one of those, a
    mnemonic ending in ``:I`` is 16 bits and one ending in ``:E`` is 8 bits.
    None is returned when no rule applies.
    """
    _head, dot, tail = mnemonic.rpartition(".")
    suffix = tail if dot else ""
    by_suffix = {"B": 8, "W": 16}
    if suffix in by_suffix:
        return by_suffix[suffix]
    for ending, bits in ((":I", 16), (":E", 8)):
        if mnemonic.endswith(ending):
            return bits
    return None
def _control_width(control: str, mnemonic_width: int | None) -> int:
    """Return the width used for a control register access.

    ``SR`` is always 16 bits. Other registers use ``mnemonic_width`` and
    fall back to 8 bits when it is missing or zero.
    """
    return 16 if control == "SR" else (mnemonic_width or 8)
def _mask(width: int) -> int:
    """Return an integer with the low ``width`` bits set."""
    limit = 1 << width
    return limit - 1
def _format_known(value: int, width: int) -> str:
    """Format ``value`` masked to ``width`` bits as upper-case hex text.

    Widths up to 8 bits use two digits; wider values use four.
    """
    pad = 4 if width > 8 else 2
    masked = value & _mask(width)
    return f"0x{masked:0{pad}X}"
def _is_register(operand: str) -> bool:
    """Return True when ``operand`` is exactly one of ``REGISTER_NAMES``."""
    return any(operand == name for name in REGISTER_NAMES)
def _control_name(operand: str) -> str | None:
    """Return the stripped operand if it names a tracked control register."""
    candidate = operand.strip()
    if candidate in CONTROL_REGISTER_NAMES:
        return candidate
    return None

148
h8536/indirect.py Normal file
View File

@@ -0,0 +1,148 @@
from __future__ import annotations
import re
from collections.abc import Mapping
from .formatting import h16, parse_int
from .memory import region_for
from .model import Instruction
from .rom import Rom
# Matches operand text of the form "@(<base>, Rn), Rm" and captures the base
# expression, the index register and the destination register.
INDEXED_WORD_LOAD_RE = re.compile(r"^@\((?P<base>[^,]+),\s*(?P<index>R[0-7])\),\s*(?P<dest>R[0-7])$")
# Matches a bare register-indirect operand "@Rn" and captures the register.
INDIRECT_FLOW_RE = re.compile(r"^@(?P<reg>R[0-7])$")
def analyze_indirect_flow(
    rom: Rom,
    instructions: Mapping[int, Instruction],
    labels: Mapping[int, str] | None = None,
    *,
    max_entries: int = 128,
) -> dict[str, object]:
    """Describe every call/jump that goes through ``@Rn`` with no known target.

    For each such site, the instruction just before it in address order is
    checked for an indexed word load into the same register. When one is
    found, the referenced ROM table is read (at most ``max_entries`` entries)
    and attached to the site.

    Returns ``{"sites": [...]}`` with one dict per site.
    """
    labels = labels or {}
    ordered = [instructions[address] for address in sorted(instructions)]
    known_code = {ins.address for ins in ordered}

    sites: list[dict[str, object]] = []
    # Pair each instruction with its predecessor; the first one has none.
    for previous, ins in zip([None, *ordered], ordered):
        if ins.targets or ins.kind not in {"call", "jump"}:
            continue
        target_reg = _indirect_target_register(ins.operands)
        if target_reg is None:
            continue

        table = _table_from_previous_load(rom, previous, target_reg, known_code, labels, max_entries)
        site: dict[str, object] = {
            "address": ins.address,
            "instruction": ins.text,
            "kind": ins.kind,
            "target_register": target_reg,
            "confidence": "table_load" if table else "unknown",
        }
        if table:
            site["table"] = table
            site["summary"] = _site_summary(ins, target_reg, table)
        else:
            site["summary"] = f"{ins.text} uses {target_reg}; target not resolved"
        sites.append(site)

    return {"sites": sites}
def indirect_comment_for_instruction(analysis: Mapping[str, object] | None, address: int) -> str:
    """Return the summary of the first site recorded at ``address``.

    An empty string is returned when ``analysis`` is missing or empty, or
    when no mapping in its ``"sites"`` list has that address.
    """
    if not analysis:
        return ""
    hit = next(
        (
            site
            for site in analysis.get("sites", [])
            if isinstance(site, Mapping) and int(site.get("address", -1)) == address
        ),
        None,
    )
    return "" if hit is None else str(hit.get("summary", ""))
def indirect_metadata_for_instruction(
    analysis: Mapping[str, object] | None,
    address: int,
) -> dict[str, object] | None:
    """Return the first site dict recorded at ``address``, or None.

    Only entries of ``analysis["sites"]`` that are ``dict`` instances are
    considered. The stored object itself is returned, not a copy.
    """
    if not analysis:
        return None
    return next(
        (
            site
            for site in analysis.get("sites", [])
            if isinstance(site, dict) and int(site.get("address", -1)) == address
        ),
        None,
    )
def _indirect_target_register(operands: str) -> str | None:
    """Return ``Rn`` when the stripped operand text is exactly ``@Rn``."""
    found = INDIRECT_FLOW_RE.match(operands.strip())
    if not found:
        return None
    return found.group("reg")
def _table_from_previous_load(
    rom: Rom,
    previous: Instruction | None,
    target_reg: str,
    known_code: set[int],
    labels: Mapping[int, str],
    max_entries: int,
) -> dict[str, object] | None:
    """Read the pointer table referenced by an indexed word load, if any.

    ``previous`` must be a ``MOV:G.W`` whose operands match
    ``INDEXED_WORD_LOAD_RE`` and whose destination is ``target_reg``. The
    base must parse as an integer and lie inside the ROM. Two-byte entries
    are then read until the ROM ends, a ``0x0000``/``0xFFFF`` word is seen,
    ``max_entries`` is reached, or an implausible target turns up. A target
    is plausible when it is decoded code or lies in a ``program`` region; an
    implausible first entry is still recorded before stopping.

    Returns a table description, or None when nothing usable was found.
    """
    if previous is None or not previous.mnemonic.startswith("MOV:G.W"):
        return None
    parsed = INDEXED_WORD_LOAD_RE.match(previous.operands.strip())
    if parsed is None or parsed.group("dest") != target_reg:
        return None
    try:
        table_base = parse_int(parsed.group("base"))
    except ValueError:
        return None
    if not rom.contains(table_base, 2):
        return None

    rows: list[dict[str, object]] = []
    for slot in range(max_entries):
        slot_address = table_base + slot * 2
        if not rom.contains(slot_address, 2):
            break
        pointer = rom.u16(slot_address)
        if pointer in (0x0000, 0xFFFF):
            break
        region = region_for(pointer)
        decoded_here = pointer in known_code
        plausible = decoded_here or region.kind == "program"
        if rows and not plausible:
            break
        rows.append(
            {
                "index": slot,
                "entry_address": slot_address,
                "target": pointer,
                "target_label": labels.get(pointer),
                "target_region": region.name,
                "decoded_code": decoded_here,
            },
        )
        if not plausible:
            break

    if not rows:
        return None
    decoded_total = sum(1 for row in rows if row["decoded_code"])
    return {
        "base": table_base,
        "index_register": parsed.group("index"),
        "target_register": target_reg,
        "load_address": previous.address,
        "load_instruction": previous.text,
        "entry_size": 2,
        "entry_count": len(rows),
        "decoded_target_count": decoded_total,
        "entries": rows,
    }
def _site_summary(ins: Instruction, target_reg: str, table: Mapping[str, object]) -> str:
    """Build the one-line summary for a site resolved through a table."""
    decoded = int(table["decoded_target_count"])
    total = int(table["entry_count"])
    table_base = h16(int(table["base"]))
    index_register = table["index_register"]
    origin = f"{ins.text} uses {target_reg} loaded from pointer table {table_base} "
    detail = f"via {index_register} ({decoded}/{total} decoded targets)"
    return origin + detail

View File

@@ -29,6 +29,27 @@ BRANCH_CONDITIONS = {
"BLE": "Z || (N != V)",
}
# Flag expression for each conditional branch mnemonic in negated form.
# NOTE(review): only the BLE entry of BRANCH_CONDITIONS is visible in this
# hunk ("Z || (N != V)"), and the BLE entry here is its logical negation;
# the other entries are presumably negations in the same way. Confirm against
# the full BRANCH_CONDITIONS table.
NEGATED_BRANCH_CONDITIONS = {
"BRN": "1",
"BHI": "C || Z",
"BLS": "!C && !Z",
"BCC": "C",
"BCS": "!C",
"BNE": "Z",
"BEQ": "!Z",
"BVC": "V",
"BVS": "!V",
"BPL": "N",
"BMI": "!N",
"BGE": "N != V",
"BLT": "N == V",
"BGT": "Z || (N != V)",
"BLE": "!Z && (N == V)",
}
# Size limits for structured output. Presumably the largest body, in
# instructions, that is folded into an `if` or a `do while`; their use is not
# visible in this hunk, so verify against the candidate finders.
_MAX_STRUCTURED_IF_BODY = 8
_MAX_STRUCTURED_LOOP_BODY = 24
@dataclass(frozen=True)
class PseudocodeOptions:
@@ -37,6 +58,22 @@ class PseudocodeOptions:
include_cycles: bool = False
emit_declarations: bool = True
max_functions: int | None = None
structured: bool = True
@dataclass(frozen=True)
class _IfCandidate:
    """A conditional branch that can be rendered as a structured ``if``."""

    # Index, within the address list being rendered, of the branch target.
    target_index: int
    # Address of the branch target; its label is suppressed when rendered.
    target_address: int
    # Condition text placed inside ``if (...)``.
    condition: str
    # JSON record of the branch instruction, used for the line comment.
    instruction: JsonObject
@dataclass(frozen=True)
class _LoopCandidate:
    """A backward branch that can be rendered as a ``do { } while`` loop."""

    # Index, within the address list being rendered, where the loop body ends.
    end_index: int
    # Condition text placed inside ``while (...)``.
    condition: str
    # JSON record of the branch instruction, used for the line comment.
    instruction: JsonObject
def generate_pseudocode(
@@ -55,7 +92,7 @@ def generate_pseudocode(
lines: list[str] = []
lines.extend(_file_header(source_name, payload))
if opts.emit_declarations:
lines.extend(_declarations(instructions, functions, label_names))
lines.extend(_declarations(payload, instructions, functions, label_names))
by_address = {int(ins["address"]): ins for ins in instructions}
all_addresses = sorted(by_address)
@@ -111,6 +148,7 @@ def main(argv: list[str] | None = None) -> int:
parser.add_argument("--no-addresses", action="store_true", help="omit instruction addresses from line comments")
parser.add_argument("--cycles", action="store_true", help="include cycle estimates when present in JSON")
parser.add_argument("--no-declarations", action="store_true", help="omit register/function declarations")
parser.add_argument("--no-structure", action="store_true", help="preserve label/goto output without if/loop structuring")
parser.add_argument("--max-functions", type=int, default=None, help="emit only the first N functions")
args = parser.parse_args(argv)
@@ -120,6 +158,7 @@ def main(argv: list[str] | None = None) -> int:
include_cycles=args.cycles,
emit_declarations=not args.no_declarations,
max_functions=args.max_functions,
structured=not args.no_structure,
)
write_pseudocode(args.input, args.out, options)
print(f"wrote {args.out}")
@@ -159,7 +198,12 @@ def _file_header(source_name: str, payload: JsonObject) -> list[str]:
]
def _declarations(instructions: list[JsonObject], functions: list[JsonObject], labels: dict[int, str]) -> list[str]:
def _declarations(
payload: JsonObject,
instructions: list[JsonObject],
functions: list[JsonObject],
labels: dict[int, str],
) -> list[str]:
lines: list[str] = []
registers = _referenced_io_registers(instructions)
if registers:
@@ -169,6 +213,18 @@ def _declarations(instructions: list[JsonObject], functions: list[JsonObject], l
lines.append(f"extern volatile {c_type} {c_identifier(name)}; /* 0x{address:04X} */")
lines.append("")
memory_symbols = _referenced_memory_symbols(payload)
if memory_symbols:
lines.append("/* RAM/external symbols inferred from instruction references and data tables. */")
for symbol in memory_symbols:
c_type = "u16" if symbol.get("width") == "word" else "u8"
width = symbol.get("width") or "unknown"
lines.append(
f"extern volatile {c_type} {c_identifier(str(symbol['name']))}; "
f"/* 0x{int(symbol['address']):04X} {symbol['kind']} {width} */"
)
lines.append("")
if functions:
lines.append("/* Function entry points discovered from vectors and call targets. */")
for function in functions:
@@ -193,6 +249,20 @@ def _referenced_io_registers(instructions: list[JsonObject]) -> dict[str, tuple[
return registers
def _referenced_memory_symbols(payload: JsonObject) -> list[JsonObject]:
    """Return the non-register symbols from ``payload``, sorted by address.

    Symbols are read from ``payload["symbols"]["symbols"]``. An empty list is
    returned when either level is missing or has an unexpected type (for
    example ``"symbols": null``), so payloads that lack usable symbol data do
    not crash the generator. Entries that are not dicts, are of kind
    ``"register"``, or lack a name or an address are skipped.
    """
    section = payload.get("symbols")
    # Guard the outer level too: a present but non-dict value has no .get().
    symbols = section.get("symbols", []) if isinstance(section, dict) else []
    if not isinstance(symbols, list):
        return []
    memory_symbols = [
        symbol
        for symbol in symbols
        if isinstance(symbol, dict)
        and symbol.get("kind") != "register"
        and symbol.get("name")
        and symbol.get("address") is not None
    ]
    return sorted(memory_symbols, key=lambda symbol: int(symbol["address"]))
def _collect_label_names(payload: JsonObject) -> dict[int, str]:
labels: dict[int, str] = {}
for vector in payload.get("vectors", []):
@@ -258,13 +328,7 @@ def _render_function(
if sources:
lines.append(f" /* vector sources: {', '.join(str(source) for source in sources)} */")
for address in addresses:
if address in local_targets and address != start:
lines.append(f"{labels.get(address, _label_for(address))}:")
ins = by_address[address]
statement = _translate_instruction(ins, labels)
comment = _line_comment(ins, opts)
lines.append(f" {statement}{comment}")
lines.extend(_render_instruction_block(addresses, by_address, labels, opts, local_targets, function_entry=start))
lines.append("}")
lines.append("")
@@ -281,16 +345,271 @@ def _render_orphan_block(
local_targets = _local_target_addresses(addresses, by_address) | {
address for address in addresses if address in labels
}
for address in addresses:
if address in local_targets:
lines.append(f"{labels.get(address, _label_for(address))}:")
ins = by_address[address]
lines.append(f" {_translate_instruction(ins, labels)}{_line_comment(ins, opts)}")
lines.extend(_render_instruction_block(addresses, by_address, labels, opts, local_targets, function_entry=None))
lines.append("}")
lines.append("")
return lines
def _render_instruction_block(
    addresses: list[int],
    by_address: dict[int, JsonObject],
    labels: dict[int, str],
    opts: PseudocodeOptions,
    local_targets: set[int],
    *,
    function_entry: int | None,
) -> list[str]:
    """Render the body lines for one function or orphan block.

    When ``opts.structured`` is set the structured renderer is used, seeded
    with the incoming-branch map for ``addresses``; otherwise the plain
    label/statement renderer is used.  Both start at indent level 1 with no
    labels suppressed.
    """
    if opts.structured:
        return _render_structured_block(
            addresses,
            by_address,
            labels,
            opts,
            local_targets,
            _incoming_local_targets(addresses, by_address),
            function_entry=function_entry,
            suppressed_labels=set(),
            indent=1,
        )
    return _render_linear_block(
        addresses,
        by_address,
        labels,
        opts,
        local_targets,
        function_entry=function_entry,
        suppressed_labels=set(),
        indent=1,
    )
def _render_structured_block(
    addresses: list[int],
    by_address: dict[int, JsonObject],
    labels: dict[int, str],
    opts: PseudocodeOptions,
    local_targets: set[int],
    incoming: dict[int, set[int]],
    *,
    function_entry: int | None,
    suppressed_labels: set[int],
    indent: int,
) -> list[str]:
    """Render ``addresses`` using ``do``/``while`` and ``if`` where recognised.

    At each position a loop candidate is tried first, then an if candidate;
    anything else is emitted as a single linear instruction.  Bodies of the
    recognised constructs are rendered recursively one indent level deeper.
    ``suppressed_labels`` is shared with the recursive calls and is mutated:
    the loop head and the if join target are added so their labels are not
    printed.
    """
    pad = _indent(indent)
    position_of = {address: position for position, address in enumerate(addresses)}

    def render_nested(span: list[int]) -> list[str]:
        # Same context, one level deeper, sharing the suppressed-label set.
        return _render_structured_block(
            span,
            by_address,
            labels,
            opts,
            local_targets,
            incoming,
            function_entry=function_entry,
            suppressed_labels=suppressed_labels,
            indent=indent + 1,
        )

    rendered: list[str] = []
    cursor = 0
    while cursor < len(addresses):
        loop = _loop_candidate_at(cursor, addresses, position_of, by_address, local_targets, incoming)
        if loop:
            suppressed_labels.add(addresses[cursor])
            rendered.append(f"{pad}do {{")
            rendered.extend(render_nested(addresses[cursor : loop.end_index]))
            rendered.append(f"{pad}}} while ({loop.condition});{_line_comment(loop.instruction, opts)}")
            # Resume after the closing backward branch.
            cursor = loop.end_index + 1
            continue

        guarded = _if_candidate_at(cursor, addresses, position_of, by_address, local_targets, incoming)
        if guarded:
            suppressed_labels.add(guarded.target_address)
            rendered.append(f"{pad}if ({guarded.condition}) {{{_line_comment(guarded.instruction, opts)}")
            rendered.extend(render_nested(addresses[cursor + 1 : guarded.target_index]))
            rendered.append(f"{pad}}}")
            # The join target itself is rendered by the next iteration.
            cursor = guarded.target_index
            continue

        rendered.extend(
            _render_linear_block(
                [addresses[cursor]],
                by_address,
                labels,
                opts,
                local_targets,
                function_entry=function_entry,
                suppressed_labels=suppressed_labels,
                indent=indent,
            )
        )
        cursor += 1
    return rendered
def _render_linear_block(
    addresses: list[int],
    by_address: dict[int, JsonObject],
    labels: dict[int, str],
    opts: PseudocodeOptions,
    local_targets: set[int],
    *,
    function_entry: int | None,
    suppressed_labels: set[int],
    indent: int,
) -> list[str]:
    """Render each address as an optional label line plus one statement line.

    Labels are printed one indent level shallower than statements (never
    below level 0) and only when ``_should_emit_label`` allows it.
    """
    label_pad = _indent(max(indent - 1, 0))
    statement_pad = _indent(indent)
    output: list[str] = []
    for address in addresses:
        if _should_emit_label(address, local_targets, function_entry, suppressed_labels):
            output.append(f"{label_pad}{labels.get(address, _label_for(address))}:")
        ins = by_address[address]
        output.append(f"{statement_pad}{_translate_instruction(ins, labels)}{_line_comment(ins, opts)}")
    return output
def _if_candidate_at(
    index: int,
    addresses: list[int],
    address_to_index: dict[int, int],
    by_address: dict[int, JsonObject],
    local_targets: set[int],
    incoming: dict[int, set[int]],
) -> _IfCandidate | None:
    """Recognise a forward conditional branch that skips a small plain body.

    A candidate is returned only when the instruction at ``index`` is a
    conditional branch with a single forward target inside ``addresses``,
    the skipped body is non-empty, no longer than
    ``_MAX_STRUCTURED_IF_BODY``, contains no control transfers, is not a
    branch target itself, and the join target is reached by this branch
    alone.  The reported condition is the negated branch condition, i.e. the
    condition under which the body executes.
    """
    branch_address = addresses[index]
    branch = by_address[branch_address]
    base = _conditional_branch_base(branch)
    if not base:
        return None

    target = _single_target_address(branch)
    if target is None or target <= branch_address:
        return None
    target_index = address_to_index.get(target)
    if target_index is None:
        return None

    body = addresses[index + 1 : target_index]
    if not 0 < len(body) <= _MAX_STRUCTURED_IF_BODY:
        return None
    if not _is_straight_line_span(body, by_address):
        return None
    for body_address in body:
        # Nothing may jump into the middle of the guarded body.
        if body_address in local_targets or incoming.get(body_address):
            return None
    if incoming.get(target, set()) != {branch_address}:
        return None

    return _IfCandidate(
        target_index=target_index,
        target_address=target,
        condition=NEGATED_BRANCH_CONDITIONS[base],
        instruction=branch,
    )
def _loop_candidate_at(
    index: int,
    addresses: list[int],
    address_to_index: dict[int, int],
    by_address: dict[int, JsonObject],
    local_targets: set[int],
    incoming: dict[int, set[int]],
) -> _LoopCandidate | None:
    """Recognise a ``do { ... } while (cond)`` shape starting at ``index``.

    Scans at most ``_MAX_STRUCTURED_LOOP_BODY`` following instructions for a
    conditional branch back to ``addresses[index]``.  The first such branch
    decides the outcome: it is accepted only when the body before it has no
    control transfers, nothing after the head (closing branch included) is a
    local target or has incoming branches, and the head is reached only from
    the closing branch.  Otherwise ``None`` is returned without looking at
    later branches.
    """
    loop_head = addresses[index]
    scan_limit = min(len(addresses), index + _MAX_STRUCTURED_LOOP_BODY + 1)
    for end_index in range(index + 1, scan_limit):
        tail_address = addresses[end_index]
        tail = by_address[tail_address]
        base = _conditional_branch_base(tail)
        if not base:
            continue
        if _single_target_address(tail) != loop_head or loop_head not in address_to_index:
            continue

        body = addresses[index:end_index]
        # Everything after the head, up to and including the closing branch.
        interior = addresses[index + 1 : end_index + 1]
        rejected = (
            not body
            or not _is_straight_line_span(body, by_address)
            or any(address in local_targets for address in interior)
            or incoming.get(loop_head, set()) != {tail_address}
            or any(incoming.get(address) for address in interior)
        )
        if rejected:
            return None
        return _LoopCandidate(
            end_index=end_index,
            condition=BRANCH_CONDITIONS[base],
            instruction=tail,
        )
    return None
def _incoming_local_targets(addresses: list[int], by_address: dict[int, JsonObject]) -> dict[int, set[int]]:
address_set = set(addresses)
incoming: dict[int, set[int]] = {address: set() for address in addresses}
for source in addresses:
for target in by_address[source].get("targets", []):
target_address = int(target)
if target_address in address_set:
incoming[target_address].add(source)
return incoming
def _conditional_branch_base(ins: JsonObject) -> str | None:
if str(ins.get("kind", "normal")) != "branch":
return None
base = _mnemonic_base(str(ins.get("mnemonic", "")))
if base == "BRN" or base not in BRANCH_CONDITIONS:
return None
return base
def _single_target_address(ins: JsonObject) -> int | None:
targets = ins.get("targets", [])
if len(targets) != 1:
return None
return int(targets[0])
def _is_straight_line_span(addresses: list[int], by_address: dict[int, JsonObject]) -> bool:
for address in addresses:
kind = str(by_address[address].get("kind", "normal"))
if kind in {"branch", "jump", "return", "rte"}:
return False
return True
def _should_emit_label(
address: int,
local_targets: set[int],
function_entry: int | None,
suppressed_labels: set[int],
) -> bool:
return address in local_targets and address != function_entry and address not in suppressed_labels
def _indent(level: int) -> str:
return " " * level
def _local_target_addresses(addresses: list[int], by_address: dict[int, JsonObject]) -> set[int]:
address_set = set(addresses)
targets: set[int] = set()
@@ -433,6 +752,9 @@ def _branch_or_jump_statement(ins: JsonObject, labels: dict[int, str], ops: list
if base in {"BRA", "JMP", "PJMP"}:
if target:
return f"goto {target};"
table_expr = _indirect_table_call_args(ins)
if table_expr:
return f"goto_indirect_table({table_expr});"
expr = _format_operand(ops[0], "") if ops else "unknown_target"
return f"goto_indirect({expr});"
if base.startswith("SCB/"):
@@ -447,10 +769,26 @@ def _call_statement(ins: JsonObject, labels: dict[int, str], ops: list[str]) ->
target = _target_label(ins, labels)
if target:
return f"{target}();"
table_expr = _indirect_table_call_args(ins)
if table_expr:
return f"call_indirect_table({table_expr});"
expr = _format_operand(ops[0], "") if ops else "unknown_target"
return f"call_indirect({expr});"
def _indirect_table_call_args(ins: JsonObject) -> str:
indirect = ins.get("indirect_flow")
if not isinstance(indirect, dict):
return ""
table = indirect.get("table")
if not isinstance(table, dict) or table.get("base") is None:
return ""
base = int(table["base"])
index_register = c_identifier(str(table.get("index_register") or "index"))
target_register = c_identifier(str(table.get("target_register") or indirect.get("target_register") or "target"))
return f"0x{base:04X}, {index_register}, {target_register}"
def _target_label(ins: JsonObject, labels: dict[int, str]) -> str:
targets = ins.get("targets", [])
if targets:
@@ -562,6 +900,30 @@ def _metadata_comments(ins: JsonObject) -> list[str]:
if isinstance(inference, dict) and inference.get("comment"):
comments.append(str(inference["comment"]))
indirect = ins.get("indirect_flow")
if isinstance(indirect, dict) and indirect.get("summary"):
comments.append(str(indirect["summary"]))
dataflow = ins.get("dataflow")
if isinstance(dataflow, dict):
changes = dataflow.get("changes")
if isinstance(changes, list):
known_changes = [_dataflow_change_comment(change) for change in changes if isinstance(change, dict)]
known_changes = [change for change in known_changes if change]
if known_changes:
suffix = " ..." if len(known_changes) > 4 else ""
comments.append("dataflow " + ", ".join(known_changes[:4]) + suffix)
refs = []
for ref in ins.get("references", []):
if not isinstance(ref, dict):
continue
symbol = ref.get("symbol") or ref.get("name")
if symbol:
refs.append(str(symbol))
if refs:
comments.append("refs " + ", ".join(refs))
for access in ins.get("peripheral_access", []):
if not isinstance(access, dict):
continue
@@ -574,6 +936,16 @@ def _metadata_comments(ins: JsonObject) -> list[str]:
return comments
def _dataflow_change_comment(change: JsonObject) -> str:
after = change.get("after")
if not isinstance(after, dict) or not after.get("known"):
return ""
width = int(after.get("width", 16))
value = int(after["value"])
digits = 2 if width <= 8 else 4
return f"{change['name']}=0x{value:0{digits}X}"
def _instruction_text(ins: JsonObject) -> str:
mnemonic = str(ins.get("mnemonic", ""))
operands = str(ins.get("operands", ""))

View File

@@ -4,8 +4,10 @@ import json
from pathlib import Path
from .cycles import cycle_comment
from .dataflow import state_for_instruction
from .dtc import DtcEndpointInfo, DtcRegisterInfo
from .formatting import h16, label_for
from .indirect import indirect_comment_for_instruction, indirect_metadata_for_instruction
from .memory import MEMORY_REGIONS, region_for
from .model import Instruction
from .peripheral_access import (
@@ -15,6 +17,7 @@ from .peripheral_access import (
)
from .rom import Rom
from .sci import sci_comment_for_instruction, sci_json_payload, sci_metadata_for_instruction
from .symbols import symbol_for_address
from .tables import IO_REGISTERS
from .timing import format_timing_summary
from .vectors import DtcVectorEntry
@@ -55,15 +58,66 @@ def _dtc_register_lines(vector_addr: int, entry: DtcVectorEntry, info: DtcRegist
return lines
def _reference_comment(ins: Instruction) -> str:
def _reference_comment(ins: Instruction, symbols: dict[str, object] | None = None) -> str:
parts: list[str] = []
for address in ins.references:
region = region_for(address)
name = IO_REGISTERS.get(address, h16(address))
name = symbol_for_address(symbols, address) or IO_REGISTERS.get(address, h16(address))
parts.append(f"{name} in {region.name}")
return "refs " + ", ".join(parts) if parts else ""
def _symbol_lines(symbols: dict[str, object] | None) -> list[str]:
if not symbols:
return []
entries = symbols.get("symbols", [])
if not isinstance(entries, list) or not entries:
return []
lines = ["; Symbols"]
for item in entries[:80]:
if not isinstance(item, dict):
continue
address = int(item["address"])
width = item.get("width") or "unknown"
line = (
f"; {item['name']:<16} {h16(address)} {item['region']:<18} {item['kind']:<8} "
f"r={item['read_count']} w={item['write_count']} width={width}"
)
if item.get("xref_count"):
line += f" xrefs={item['xref_count']}"
lines.append(line)
if len(entries) > 80:
lines.append(f"; ... {len(entries) - 80} more symbols omitted from listing header")
lines.append("")
return lines
def _known_change_text(change: dict[str, object]) -> str:
after = change.get("after")
if not isinstance(after, dict) or not after.get("known"):
return ""
value = int(after["value"])
width = int(after.get("width", 16))
digits = 2 if width <= 8 else 4
return f"{change['name']}=H'{value:0{digits}X}"
def _dataflow_comment(analysis: dict[str, object] | None, address: int) -> str:
    """Return the ``dataflow ...`` listing comment for one instruction.

    Looks up the instruction's dataflow record and lists up to four known
    value changes, appending `` ...`` when more were found.  Returns ``""``
    when there is no record, no change list, or no known change.
    """
    record = state_for_instruction(analysis, address)
    changes = record.get("changes") if record else None
    if not isinstance(changes, list):
        return ""

    known: list[str] = []
    for change in changes:
        if not isinstance(change, dict):
            continue
        text = _known_change_text(change)
        if text:
            known.append(text)
    if not known:
        return ""

    shown = ", ".join(known[:4])
    if len(known) > 4:
        return "dataflow " + shown + " ..."
    return "dataflow " + shown
def format_listing(
rom_path: Path,
rom: Rom,
@@ -78,6 +132,9 @@ def format_listing(
show_cycles: bool = False,
sci_analysis: dict[str, object] | None = None,
peripheral_access: dict[str, object] | None = None,
indirect_flow: dict[str, object] | None = None,
dataflow: dict[str, object] | None = None,
symbols: dict[str, object] | None = None,
) -> str:
lines: list[str] = []
lines.append("; H8/536 ROM disassembly")
@@ -134,6 +191,8 @@ def format_listing(
)
lines.append("")
lines.extend(_symbol_lines(symbols))
if timing_summary:
lines.extend(format_timing_summary(timing_summary))
@@ -150,7 +209,9 @@ def format_listing(
ins.comment,
sci_comment_for_instruction(sci_analysis, address),
peripheral_comment_for_instruction(peripheral_access, address),
_reference_comment(ins) if not ins.comment else "",
indirect_comment_for_instruction(indirect_flow, address),
_dataflow_comment(dataflow, address),
_reference_comment(ins, symbols) if not ins.comment else "",
cycle_comment(ins.cycles) if show_cycles else "",
)
if part
@@ -172,6 +233,9 @@ def write_json(
timing_summary: dict[str, list[dict[str, object]]] | None = None,
sci_analysis: dict[str, object] | None = None,
peripheral_access: dict[str, object] | None = None,
indirect_flow: dict[str, object] | None = None,
dataflow: dict[str, object] | None = None,
symbols: dict[str, object] | None = None,
) -> None:
payload = {
"vectors": [
@@ -194,18 +258,67 @@ def write_json(
"timing_summary": timing_summary or {"blocks": [], "loops": []},
"sci": sci_json_payload(sci_analysis),
"peripheral_access": peripheral_json_payload(peripheral_access),
"indirect_flow": indirect_flow or {"sites": []},
"dataflow": _dataflow_json_payload(dataflow),
"symbols": symbols or {"symbols": [], "by_address": {}},
"instructions": [
_instruction_payload(ins, sci_analysis, peripheral_access)
_instruction_payload(ins, sci_analysis, peripheral_access, indirect_flow, dataflow, symbols)
for ins in (instructions[addr] for addr in sorted(instructions))
],
}
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def _dataflow_json_payload(dataflow: dict[str, object] | None) -> dict[str, object]:
if not dataflow:
return {"blocks": [], "registers": [], "control_registers": []}
return {
"blocks": dataflow.get("blocks", []),
"registers": dataflow.get("registers", []),
"control_registers": dataflow.get("control_registers", []),
}
def _compact_known_values(state: object) -> dict[str, dict[str, object]]:
if not isinstance(state, dict):
return {}
compact: dict[str, dict[str, object]] = {}
for group_name in ("registers", "control"):
group = state.get(group_name)
if not isinstance(group, dict):
continue
values = {
name: value
for name, value in group.items()
if isinstance(value, dict) and value.get("known")
}
if values:
compact[group_name] = values
return compact
def _dataflow_instruction_payload(dataflow: dict[str, object] | None, address: int) -> dict[str, object]:
    """Return the per-instruction dataflow metadata for the JSON output.

    The result is empty when no record exists for ``address``.  Otherwise it
    carries the record's ``block``, ``changes`` and ``notes``, plus a
    ``known_after`` entry when the post-state has any known values.
    """
    record = state_for_instruction(dataflow, address)
    if not record:
        return {}
    metadata: dict[str, object] = {}
    metadata["block"] = record.get("block")
    metadata["changes"] = record.get("changes", [])
    metadata["notes"] = record.get("notes", [])
    known_after = _compact_known_values(record.get("after"))
    if known_after:
        metadata["known_after"] = known_after
    return metadata
def _instruction_payload(
ins: Instruction,
sci_analysis: dict[str, object] | None = None,
peripheral_access: dict[str, object] | None = None,
indirect_flow: dict[str, object] | None = None,
dataflow: dict[str, object] | None = None,
symbols: dict[str, object] | None = None,
) -> dict[str, object]:
payload: dict[str, object] = {
"address": ins.address,
@@ -221,6 +334,7 @@ def _instruction_payload(
{
"address": address,
"name": IO_REGISTERS.get(address),
"symbol": symbol_for_address(symbols, address),
"region": region_for(address).name,
"kind": region_for(address).kind,
}
@@ -235,6 +349,12 @@ def _instruction_payload(
peripheral_metadata = peripheral_metadata_for_instruction(peripheral_access, ins.address)
if peripheral_metadata:
payload["peripheral_access"] = peripheral_metadata
indirect_metadata = indirect_metadata_for_instruction(indirect_flow, ins.address)
if indirect_metadata:
payload["indirect_flow"] = indirect_metadata
dataflow_metadata = _dataflow_instruction_payload(dataflow, ins.address)
if dataflow_metadata:
payload["dataflow"] = dataflow_metadata
return payload

380
h8536/symbols.py Normal file
View File

@@ -0,0 +1,380 @@
from __future__ import annotations
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass, field
from .memory import MEMORY_REGIONS, MemoryRegion, region_for
from .model import Instruction
from .tables import IO_REGISTERS
# Mnemonic roots (mnemonic with the ".B"/".W" size suffix removed) grouped by
# how _direction_for_reference classifies a memory reference they make.

# Any reference is reported as a "read".
READ_ONLY_ROOTS = {"BTST", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST"}
# Any reference is reported as a "write" (STC only for its destination operand).
WRITE_ONLY_ROOTS = {"CLR", "MOVTPE", "STC"}
# Any reference is reported as "read_write".
READ_MODIFY_WRITE_ROOTS = {"ADD:Q", "BCLR", "BNOT", "BSET", "NEG", "NOT", "TAS"}
# The last operand is reported as "read_write"; any other operand as "read".
DESTINATION_UPDATE_ROOTS = {
"ADD:G",
"ADDS",
"ADDX",
"AND",
"OR",
"ROTL",
"ROTR",
"ROTXL",
"ROTXR",
"SHAL",
"SHAR",
"SHLL",
"SHLR",
"SUB",
"SUBS",
"SUBX",
"XOR",
}
# Moves: the last operand is a "write", any other operand a "read".
MOV_ROOTS = {"MOV:E", "MOV:F", "MOV:G", "MOV:I", "MOV:L", "MOV:S"}
@dataclass
class _Symbol:
address: int
name: str
region: str
kind: str
access_count: int = 0
read_count: int = 0
write_count: int = 0
unknown_count: int = 0
widths: set[str] = field(default_factory=set)
first_access: int | None = None
last_access: int | None = None
accesses: list[dict[str, object]] = field(default_factory=list)
xrefs: list[dict[str, object]] = field(default_factory=list)
def record_access(self, access: Mapping[str, object]) -> None:
instruction_address = int(access["instruction_address"])
direction = str(access["direction"])
width = access.get("width")
self.access_count += 1
if direction == "read":
self.read_count += 1
elif direction == "write":
self.write_count += 1
elif direction == "read_write":
self.read_count += 1
self.write_count += 1
else:
self.unknown_count += 1
if isinstance(width, str):
self.widths.add(width)
if self.first_access is None or instruction_address < self.first_access:
self.first_access = instruction_address
if self.last_access is None or instruction_address > self.last_access:
self.last_access = instruction_address
self.accesses.append(dict(access))
def record_xref(self, xref: Mapping[str, object]) -> None:
self.xrefs.append(dict(xref))
def discover_symbols(
    instructions: Mapping[int, Instruction] | Iterable[Instruction],
    regions: Sequence[MemoryRegion | Mapping[str, object]] | None = None,
    *,
    include_registers: bool = False,
    data_candidates: Mapping[str, object] | None = None,
) -> dict[str, object]:
    """Collect memory symbols from the references of decoded instructions.

    Instructions are visited in address order; every reference that falls in
    a non-skipped region is folded into one symbol per address.  Pointer-table
    targets from ``data_candidates`` are then added as cross-references.
    ``regions`` defaults to ``MEMORY_REGIONS``; register-region addresses are
    skipped unless ``include_registers`` is set.

    Returns a payload with ``"symbols"`` (public records sorted by address)
    and ``"by_address"`` (address to symbol name).
    """
    active_regions = tuple(regions or MEMORY_REGIONS)
    table: dict[int, _Symbol] = {}

    for ins in _instruction_sequence(instructions):
        for access in instruction_accesses(ins):
            address = int(access["address"])
            region = _region_for(address, active_regions)
            if not _skip_region(region, include_registers):
                table.setdefault(address, _new_symbol(address, region)).record_access(access)

    _record_data_candidate_xrefs(table, data_candidates, active_regions, include_registers)

    ordered = sorted(table)
    return {
        "symbols": [_public_symbol(table[address]) for address in ordered],
        "by_address": {address: table[address].name for address in ordered},
    }
def instruction_accesses(ins: Instruction) -> list[dict[str, object]]:
    """Describe every memory reference made by one instruction.

    Each record carries the referenced address, the instruction's address,
    text and mnemonic, and the inferred direction.  ``width`` is added when
    the mnemonic has a size suffix; ``operand`` and ``operand_index`` are
    added when the reference could be tied to a specific operand.
    """
    if not ins.references:
        return []
    refs = list(ins.references)
    operands = _split_operands(ins.operands)
    width = _width_hint(ins.mnemonic)

    def describe(address: int) -> dict[str, object]:
        slot = _operand_index_for_reference(address, refs, operands)
        record: dict[str, object] = {
            "address": address,
            "instruction_address": ins.address,
            "instruction": ins.text,
            "mnemonic": ins.mnemonic,
            "direction": _direction_for_reference(ins.mnemonic, operands, slot, len(refs)),
        }
        if width is not None:
            record["width"] = width
        if slot is not None:
            record["operand"] = operands[slot]
            record["operand_index"] = slot
        return record

    return [describe(address) for address in refs]
def symbol_for_address(analysis: Mapping[str, object] | None, address: int) -> str | None:
    """Return the symbol name recorded for ``address``, or ``None``.

    ``analysis`` is a symbol payload with a ``"by_address"`` mapping.  The
    mapping is keyed by integer address when it comes straight from symbol
    discovery, but JSON object keys are always strings, so a payload that
    was written to JSON and loaded back is keyed by the decimal string of
    the address.  Both forms are accepted.
    """
    if not analysis:
        return None
    by_address = analysis.get("by_address")
    if not isinstance(by_address, Mapping):
        return None
    symbol = by_address.get(address)
    if not symbol:
        # Fall back to the key form produced by a JSON round trip.
        symbol = by_address.get(str(address))
    return str(symbol) if symbol else None
def _instruction_sequence(
instructions: Mapping[int, Instruction] | Iterable[Instruction],
) -> list[Instruction]:
values = instructions.values() if isinstance(instructions, Mapping) else instructions
return sorted(values, key=lambda item: item.address)
def _new_symbol(address: int, region: MemoryRegion) -> _Symbol:
    """Create an empty symbol for ``address``, named and typed after ``region``."""
    name = _symbol_name(address, region)
    kind = _symbol_kind(region)
    return _Symbol(address=address, name=name, region=region.name, kind=kind)
def _public_symbol(symbol: _Symbol) -> dict[str, object]:
widths = sorted(symbol.widths, key=lambda item: ("byte", "word").index(item) if item in {"byte", "word"} else 99)
payload: dict[str, object] = {
"address": symbol.address,
"name": symbol.name,
"region": symbol.region,
"kind": symbol.kind,
"access_count": symbol.access_count,
"read_count": symbol.read_count,
"write_count": symbol.write_count,
"unknown_count": symbol.unknown_count,
"width_hints": widths,
"width": widths[0] if len(widths) == 1 else "mixed" if widths else None,
"first_access": symbol.first_access,
"last_access": symbol.last_access,
"accesses": sorted(symbol.accesses, key=lambda item: int(item["instruction_address"])),
}
if symbol.xrefs:
payload["xref_count"] = len(symbol.xrefs)
payload["xrefs"] = sorted(symbol.xrefs, key=lambda item: (str(item["source"]), int(item["address"])))
return payload
def _symbol_name(address: int, region: MemoryRegion) -> str:
    """Derive a symbol name from the address and the kind of its region.

    Register-region addresses use their ``IO_REGISTERS`` name, or ``io_<hex>``
    when unlisted; RAM gives ``ram_<hex>``; everything else ``mem_<hex>``.
    """
    kind = region.kind
    if kind == "registers":
        return IO_REGISTERS.get(address, f"io_{_hex_address(address)}")
    prefix = "ram" if kind == "ram" else "mem"
    return f"{prefix}_{_hex_address(address)}"
def _symbol_kind(region: MemoryRegion) -> str:
if region.kind == "registers":
return "register"
if region.kind == "ram":
return "ram"
return "memory"
def _hex_address(address: int) -> str:
width = 4 if address <= 0xFFFF else 6
return f"{address:0{width}X}"
def _skip_region(region: MemoryRegion, include_registers: bool) -> bool:
return region.kind == "registers" and not include_registers
def _region_for(address: int, regions: Sequence[MemoryRegion | Mapping[str, object]]) -> MemoryRegion:
    """Return the first of ``regions`` containing ``address``.

    Falls back to the module-level ``region_for`` lookup when none matches.
    """
    # map() is lazy, so entries are coerced one at a time until a match.
    for candidate in map(_coerce_region, regions):
        if candidate.contains(address):
            return candidate
    return region_for(address)
def _coerce_region(item: MemoryRegion | Mapping[str, object]) -> MemoryRegion:
    """Return ``item`` as a ``MemoryRegion``, building one from a mapping if needed.

    A mapping must provide ``name``, ``start``, ``end`` and ``kind``;
    ``manual`` is optional and defaults to an empty string.
    """
    if isinstance(item, MemoryRegion):
        return item
    name = str(item["name"])
    start = int(item["start"])
    end = int(item["end"])
    kind = str(item["kind"])
    manual = str(item.get("manual", ""))
    return MemoryRegion(name, start, end, kind, manual)
def _record_data_candidate_xrefs(
symbols: dict[int, _Symbol],
data_candidates: Mapping[str, object] | None,
regions: Sequence[MemoryRegion | Mapping[str, object]],
include_registers: bool,
) -> None:
if not data_candidates:
return
pointer_tables = data_candidates.get("pointer_tables")
if not isinstance(pointer_tables, Iterable):
return
for table in pointer_tables:
if not isinstance(table, Mapping):
continue
source_address = table.get("address")
targets = table.get("targets")
if not isinstance(source_address, int) or not isinstance(targets, Iterable):
continue
for target in targets:
if not isinstance(target, int):
continue
region = _region_for(target, regions)
if _skip_region(region, include_registers):
continue
symbol = symbols.setdefault(target, _new_symbol(target, region))
symbol.record_xref(
{
"source": "pointer_table",
"address": source_address,
"target": target,
},
)
def _direction_for_reference(
    mnemonic: str,
    operands: Sequence[str],
    operand_index: int | None,
    reference_count: int,
) -> str:
    """Classify a reference as ``read``, ``write``, ``read_write`` or ``unknown``.

    The decision is driven by the mnemonic root (size suffix removed) and,
    for roots whose operands play different roles, by whether the reference
    is the last (destination) operand.  ``operand_index`` is ``None`` when the
    reference could not be tied to an operand.
    """
    root = _mnemonic_root(mnemonic)
    destination_index = len(operands) - 1 if operands else None
    unresolved = operand_index is None
    is_destination = operand_index == destination_index

    if root in READ_ONLY_ROOTS:
        return "read"
    if root in READ_MODIFY_WRITE_ROOTS:
        return "read_write"
    if root in WRITE_ONLY_ROOTS:
        # STC writes only its destination; a resolved non-destination
        # operand cannot be classified.
        if root == "STC" and not unresolved and not is_destination:
            return "unknown"
        return "write"
    if root == "LDC":
        return "read" if unresolved or operand_index == 0 else "unknown"
    if root in MOV_ROOTS:
        return _source_or_destination_direction(operands, operand_index, reference_count)
    if root in DESTINATION_UPDATE_ROOTS:
        if unresolved:
            return "unknown"
        return "read_write" if is_destination else "read"
    return "unknown"
def _source_or_destination_direction(
operands: Sequence[str],
operand_index: int | None,
reference_count: int,
) -> str:
if not operands:
return "unknown"
destination_index = len(operands) - 1
if operand_index is not None:
return "write" if operand_index == destination_index else "read"
memory_indexes = [index for index, operand in enumerate(operands) if _is_memory_operand(operand)]
if reference_count == 1 and len(memory_indexes) == 1:
return "write" if memory_indexes[0] == destination_index else "read"
return "unknown"
def _operand_index_for_reference(
    address: int,
    refs: Sequence[int],
    operands: Sequence[str],
) -> int | None:
    """Find which operand a referenced address belongs to, if determinable.

    Tried in order: the only operand whose text mentions the address; the
    only memory operand when there is a single reference; the memory operand
    at the same position as the address in ``refs`` when the two counts
    match.  Returns ``None`` when none of these applies.
    """
    mentioning = [slot for slot, operand in enumerate(operands) if _operand_mentions_address(operand, address)]
    if len(mentioning) == 1:
        return mentioning[0]

    memory_slots = [slot for slot, operand in enumerate(operands) if _is_memory_operand(operand)]
    if len(refs) == 1 and len(memory_slots) == 1:
        return memory_slots[0]
    if len(refs) != len(memory_slots):
        return None
    try:
        return memory_slots[refs.index(address)]
    except ValueError:
        return None
def _operand_mentions_address(operand: str, address: int) -> bool:
normalized = operand.upper()
if f"H'{address & 0xFFFF:04X}" in normalized:
return True
if address in IO_REGISTERS and IO_REGISTERS[address].upper() in normalized:
return True
return False
def _is_memory_operand(operand: str) -> bool:
return operand.strip().startswith("@")
def _split_operands(operands: str) -> list[str]:
parts: list[str] = []
current: list[str] = []
depth = 0
for char in operands:
if char in "({":
depth += 1
elif char in ")}" and depth:
depth -= 1
if char == "," and depth == 0:
parts.append("".join(current).strip())
current = []
continue
current.append(char)
if current or operands:
parts.append("".join(current).strip())
return [part for part in parts if part]
def _mnemonic_root(mnemonic: str) -> str:
return mnemonic.rsplit(".", 1)[0]
def _width_hint(mnemonic: str) -> str | None:
if mnemonic.endswith(".B"):
return "byte"
if mnemonic.endswith(".W"):
return "word"
return None

108
tests/test_dataflow.py Normal file
View File

@@ -0,0 +1,108 @@
import unittest
from h8536.dataflow import analyze_dataflow, state_for_instruction
from h8536.model import Instruction
def reg_after(analysis, address, register):
    """Return the post-instruction state record of ``register`` at ``address``."""
    record = analysis["instructions"][address]
    return record["after"]["registers"][register]
def reg_before(analysis, address, register):
    """Return the pre-instruction state record of ``register`` at ``address``."""
    record = analysis["instructions"][address]
    return record["before"]["registers"][register]
def control_after(analysis, address, register):
    """Return the post-instruction state record of control register ``register``."""
    record = analysis["instructions"][address]
    return record["after"]["control"][register]
class DataflowTest(unittest.TestCase):
def test_tracks_immediate_load_copy_and_simple_arithmetic(self):
instructions = {
0x0100: Instruction(0x0100, b"\x58\x02\x00", "MOV:I.W", "#H'0200, R0"),
0x0103: Instruction(0x0103, b"\xA0\x81", "MOV:G.W", "R0, R1"),
0x0105: Instruction(0x0105, b"\xA1\x08", "ADD:Q.W", "#1, R1"),
0x0107: Instruction(0x0107, b"\x0C\x00\x02\x31", "SUB.W", "#H'0002, R1"),
}
analysis = analyze_dataflow(instructions)
self.assertEqual(reg_after(analysis, 0x0100, "R0")["value"], 0x0200)
self.assertEqual(reg_after(analysis, 0x0100, "R0")["width"], 16)
self.assertEqual(reg_before(analysis, 0x0103, "R0")["value"], 0x0200)
self.assertEqual(reg_after(analysis, 0x0103, "R1")["value"], 0x0200)
self.assertEqual(reg_after(analysis, 0x0105, "R1")["value"], 0x0201)
self.assertEqual(reg_after(analysis, 0x0107, "R1")["value"], 0x01FF)
def test_tracks_byte_immediates_without_promising_word_width(self):
instructions = {
0x0200: Instruction(0x0200, b"\x52\x7F", "MOV:E.B", "#H'7F, R2"),
0x0202: Instruction(0x0202, b"\xA2\x83", "MOV:G.B", "R2, R3"),
0x0204: Instruction(0x0204, b"\x58\x20\x00", "MOV:I.W", "#H'2000, R0"),
0x0207: Instruction(0x0207, b"\xD0\x84", "MOV:G.W", "@R0, R4"),
}
analysis = analyze_dataflow(instructions)
self.assertEqual(reg_after(analysis, 0x0200, "R2")["value"], 0x7F)
self.assertEqual(reg_after(analysis, 0x0200, "R2")["width"], 8)
self.assertEqual(reg_after(analysis, 0x0202, "R3")["value"], 0x7F)
self.assertEqual(reg_after(analysis, 0x0202, "R3")["width"], 8)
self.assertEqual(reg_after(analysis, 0x0207, "R0")["value"], 0x2000)
self.assertFalse(reg_after(analysis, 0x0207, "R4")["known"])
self.assertEqual(reg_after(analysis, 0x0207, "R4")["reason"], "memory_load")
def test_calls_and_ambiguous_branches_do_not_leak_known_state(self):
instructions = {
0x0300: Instruction(0x0300, b"\x58\x12\x34", "MOV:I.W", "#H'1234, R0"),
0x0303: Instruction(0x0303, b"\x26\x03", "BNE", "loc_0308", kind="branch", targets=[0x0308]),
0x0305: Instruction(0x0305, b"\xA0\x08", "ADD:Q.W", "#1, R0"),
0x0308: Instruction(0x0308, b"\xA0\x08", "ADD:Q.W", "#1, R0"),
0x030A: Instruction(0x030A, b"\x18\x04\x00", "JSR", "@loc_0400", kind="call", targets=[0x0400]),
0x030D: Instruction(0x030D, b"\xA0\x08", "ADD:Q.W", "#1, R0"),
}
analysis = analyze_dataflow(instructions)
self.assertFalse(reg_before(analysis, 0x0305, "R0")["known"])
self.assertEqual(reg_before(analysis, 0x0305, "R0")["reason"], "block_entry")
self.assertFalse(reg_before(analysis, 0x0308, "R0")["known"])
self.assertEqual(reg_before(analysis, 0x0308, "R0")["reason"], "block_entry")
self.assertFalse(reg_after(analysis, 0x030A, "R0")["known"])
self.assertEqual(reg_after(analysis, 0x030A, "R0")["reason"], "call")
self.assertFalse(reg_before(analysis, 0x030D, "R0")["known"])
def test_tracks_control_register_loads_and_stc_copies(self):
instructions = {
0x0400: Instruction(
0x0400,
b"\x04\xFE\x89",
"LDC.B",
"#H'FE, BR",
writes_br=True,
br_value=0xFE,
),
0x0403: Instruction(0x0403, b"\xA0\x99", "STC.B", "BR, R1"),
0x0405: Instruction(0x0405, b"\x04\x01\x48", "ORC.B", "#H'01, CCR"),
}
analysis = analyze_dataflow(instructions)
self.assertEqual(control_after(analysis, 0x0400, "BR")["value"], 0xFE)
self.assertEqual(control_after(analysis, 0x0400, "BR")["width"], 8)
self.assertEqual(reg_after(analysis, 0x0403, "R1")["value"], 0xFE)
self.assertFalse(control_after(analysis, 0x0405, "CCR")["known"])
def test_state_lookup_helper_returns_instruction_record(self):
instructions = {
0x0500: Instruction(0x0500, b"\x58\x00\x01", "MOV:I.W", "#H'0001, R0"),
}
analysis = analyze_dataflow(instructions)
self.assertEqual(state_for_instruction(analysis, 0x0500)["after"]["registers"]["R0"]["value"], 1)
self.assertEqual(state_for_instruction(analysis, 0x9999), {})
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,72 @@
import unittest
from h8536.indirect import analyze_indirect_flow, indirect_comment_for_instruction
from h8536.model import Instruction
from h8536.render import format_listing, write_json
from h8536.rom import Rom
import json
import tempfile
from pathlib import Path
class IndirectFlowTest(unittest.TestCase):
    """Tests for indirect ``JSR``/``JMP @Rn`` analysis and its rendering."""

    @staticmethod
    def _blank_rom():
        """Return a fresh H'200-byte ROM image filled with H'FF."""
        return Rom(bytes([0xFF] * 0x200))

    @staticmethod
    def _register_call():
        """Return an instruction map holding only ``JSR @R0`` at H'0100."""
        return {0x0100: Instruction(0x0100, b"", "JSR", "@R0", kind="call")}

    def test_detects_indexed_pointer_table_before_indirect_jump(self):
        """An indexed word load feeding ``JMP @R1`` is reported as a table."""
        image = bytearray(b"\xFF" * 0x240)
        # Words H'0120 and H'0300 followed by H'FFFF; the assertions below
        # expect only the first two to be counted as table entries.
        image[0x0200:0x0206] = bytes([0x01, 0x20, 0x03, 0x00, 0xFF, 0xFF])

        table_load = Instruction(0x0100, b"", "MOV:G.W", "@(H'0200,R4), R1")
        dispatch = Instruction(
            0x0104, b"", "JMP", "@R1", kind="jump", fallthrough=False
        )
        first_target = Instruction(
            0x0120, b"\x19", "RTS", kind="return", fallthrough=False
        )
        second_target = Instruction(
            0x0300, b"\x19", "RTS", kind="return", fallthrough=False
        )
        instructions = {
            0x0100: table_load,
            0x0104: dispatch,
            0x0120: first_target,
            0x0300: second_target,
        }

        analysis = analyze_indirect_flow(
            Rom(bytes(image)), instructions, {0x0120: "loc_0120"}
        )

        site = analysis["sites"][0]
        self.assertEqual(site["address"], 0x0104)
        self.assertEqual(site["target_register"], "R1")
        expected_table_fields = (
            ("base", 0x0200),
            ("entry_count", 2),
            ("decoded_target_count", 2),
        )
        for field, expected in expected_table_fields:
            self.assertEqual(site["table"][field], expected)

        comment = indirect_comment_for_instruction(analysis, 0x0104)
        self.assertIn("pointer table H'0200", comment)

    def test_records_unknown_indirect_call_without_prior_table_load(self):
        """A lone ``JSR @R0`` is recorded with ``unknown`` confidence."""
        analysis = analyze_indirect_flow(
            self._blank_rom(), self._register_call()
        )

        site = analysis["sites"][0]
        self.assertEqual(site["confidence"], "unknown")
        self.assertIn("target not resolved", site["summary"])

    def test_listing_and_json_include_indirect_flow_metadata(self):
        """Indirect-flow results appear in both the listing and the JSON."""
        instructions = self._register_call()
        analysis = analyze_indirect_flow(self._blank_rom(), instructions)

        listing = format_listing(
            Path("rom.bin"),
            self._blank_rom(),
            instructions,
            {},
            {},
            "min",
            traced=True,
            indirect_flow=analysis,
        )
        self.assertIn("target not resolved", listing)

        with tempfile.TemporaryDirectory() as tmp:
            out_path = Path(tmp).joinpath("out.json")
            write_json(out_path, instructions, {}, {}, indirect_flow=analysis)
            # Read back while the temporary directory still exists.
            payload = json.loads(out_path.read_text(encoding="utf-8"))

        first_site = payload["indirect_flow"]["sites"][0]
        self.assertEqual(first_site["address"], 0x0100)
        first_instruction = payload["instructions"][0]
        self.assertEqual(
            first_instruction["indirect_flow"]["confidence"], "unknown"
        )
# Run this module's tests via unittest when it is executed as a script.
if __name__ == "__main__":
unittest.main()

View File

@@ -109,7 +109,7 @@ class PseudocodeTest(unittest.TestCase):
],
}
text = generate_pseudocode(payload, options=PseudocodeOptions())
text = generate_pseudocode(payload, options=PseudocodeOptions(structured=False))
self.assertIn("void vec_reset_0100(void)", text)
self.assertIn("P1DDR = (uint8_t)(0xFF);", text)

View File

@@ -0,0 +1,132 @@
import unittest
from h8536.pseudocode import PseudocodeOptions, generate_pseudocode
def _instruction(
    address,
    mnemonic,
    operands="",
    *,
    kind="normal",
    targets=None,
    text=None,
):
    """Build one instruction record dict for a pseudocode test payload.

    ``text`` falls back to ``"<mnemonic> <operands>"`` (stripped) when it
    is not supplied, and ``targets`` is always stored as a new list.
    ``references`` and ``comment`` are always emitted empty.
    """
    if not text:
        text = f"{mnemonic} {operands}".strip()
    target_list = list(targets) if targets else []
    return dict(
        address=address,
        text=text,
        mnemonic=mnemonic,
        operands=operands,
        kind=kind,
        targets=target_list,
        references=[],
        comment="",
    )
def _payload(instructions):
    """Wrap instruction records in a minimal single-function payload.

    The payload has no vectors and one call-graph node spanning the
    lowest to the highest instruction address, labelled ``loc_<start>``
    with the start address as four upper-case hex digits.
    """
    addresses = [record["address"] for record in instructions]
    first = min(addresses)
    last = max(addresses)
    node = {
        "start": first,
        "end": last,
        "label": f"loc_{first:04X}",
        "sources": [],
        "instruction_count": len(instructions),
        "calls": [],
    }
    call_graph = {"nodes": [node], "edges": []}
    return {
        "vectors": [],
        "call_graph": call_graph,
        "instructions": instructions,
    }
def _options(**overrides):
    """Return ``PseudocodeOptions`` with quiet test defaults.

    Assembly text, addresses and declarations are switched off unless a
    keyword in ``overrides`` says otherwise; any other keyword is passed
    straight through to ``PseudocodeOptions``.
    """
    defaults = {
        "include_asm": False,
        "include_addresses": False,
        "emit_declarations": False,
    }
    return PseudocodeOptions(**{**defaults, **overrides})
class PseudocodeStructuringTest(unittest.TestCase):
    """Tests for structured ``if`` / ``do while`` pseudocode output."""

    def _render(self, instructions, **option_overrides):
        """Generate pseudocode for ``instructions`` using the test options."""
        return generate_pseudocode(
            _payload(instructions),
            options=_options(**option_overrides),
        )

    def test_backward_conditional_branch_becomes_do_while(self):
        """A ``BNE`` to an earlier address is emitted as ``do {} while``."""
        text = self._render(
            [
                _instruction(0x0100, "MOV.B", "#H'00, R0"),
                _instruction(0x0102, "ADD.B", "#H'01, R0"),
                _instruction(0x0104, "CMP.B", "#H'03, R0"),
                _instruction(
                    0x0106, "BNE", "loc_0102", kind="branch", targets=[0x0102]
                ),
                _instruction(0x0108, "RTS", kind="return"),
            ]
        )

        for fragment in ("do {", "} while (!Z);"):
            self.assertIn(fragment, text)
        # Neither the goto nor the label for the loop head may remain.
        for fragment in ("goto loc_0102;", "loc_0102:"):
            self.assertNotIn(fragment, text)

    def test_forward_conditional_branch_over_small_span_becomes_if(self):
        """A ``BEQ`` over two instructions is emitted as ``if (!Z) {``."""
        text = self._render(
            [
                _instruction(0x0100, "CMP.B", "#H'00, R0"),
                _instruction(
                    0x0102, "BEQ", "loc_0108", kind="branch", targets=[0x0108]
                ),
                _instruction(0x0104, "MOV.B", "#H'01, R1"),
                _instruction(0x0106, "ADD.B", "#H'02, R1"),
                _instruction(0x0108, "RTS", kind="return"),
            ]
        )

        expected_fragments = (
            "if (!Z) {",
            "R1 = (uint8_t)(0x01);",
            "R1 += (uint8_t)(0x02);",
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, text)
        for fragment in ("goto loc_0108;", "loc_0108:"):
            self.assertNotIn(fragment, text)

    def test_structuring_can_be_disabled(self):
        """With ``structured=False`` the branch stays a goto with a label."""
        text = self._render(
            [
                _instruction(0x0100, "CMP.B", "#H'00, R0"),
                _instruction(
                    0x0102, "BEQ", "loc_0108", kind="branch", targets=[0x0108]
                ),
                _instruction(0x0104, "MOV.B", "#H'01, R1"),
                _instruction(0x0108, "RTS", kind="return"),
            ],
            structured=False,
        )

        self.assertIn("if (Z) goto loc_0108;", text)
        self.assertIn("loc_0108:", text)
        self.assertNotIn("if (!Z) {", text)

    def test_ambiguous_forward_branch_keeps_goto_fallback(self):
        """A skipped span containing a ``BRA`` to the same target keeps gotos."""
        text = self._render(
            [
                _instruction(
                    0x0100, "BEQ", "loc_0108", kind="branch", targets=[0x0108]
                ),
                _instruction(0x0102, "MOV.B", "#H'01, R1"),
                _instruction(
                    0x0104, "BRA", "loc_0108", kind="jump", targets=[0x0108]
                ),
                _instruction(0x0108, "RTS", kind="return"),
            ]
        )

        kept_fragments = (
            "if (Z) goto loc_0108;",
            "goto loc_0108;",
            "loc_0108:",
        )
        for fragment in kept_fragments:
            self.assertIn(fragment, text)
        self.assertNotIn("if (!Z) {", text)
# Run this module's tests via unittest when it is executed as a script.
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,59 @@
import json
import tempfile
import unittest
from pathlib import Path
from h8536.dataflow import analyze_dataflow
from h8536.model import Instruction
from h8536.render import format_listing, write_json
from h8536.rom import Rom
from h8536.symbols import discover_symbols
class RenderAnalysisIntegrationTest(unittest.TestCase):
    """End-to-end check that analysis results reach the rendered outputs."""

    def test_listing_and_json_include_symbols_and_compact_dataflow(self):
        """Symbols and dataflow appear in the listing and the JSON payload.

        The JSON dataflow entry is expected to carry ``changes`` and
        ``known_after`` but no ``before`` key.
        """
        load_immediate = Instruction(
            0x0100, b"\x58\x12\x34", "MOV:I.W", "#H'1234, R0"
        )
        store_word = Instruction(
            0x0103,
            b"\x1D\xF6\x80\x90",
            "MOV:G.W",
            "R0, @H'F680",
            references=[0xF680],
        )
        instructions = {0x0100: load_immediate, 0x0103: store_word}

        dataflow = analyze_dataflow(instructions)
        symbols = discover_symbols(instructions)
        rom = Rom(bytes([0xFF] * 0x200))

        listing = format_listing(
            Path("rom.bin"),
            rom,
            instructions,
            {},
            {},
            "min",
            traced=True,
            dataflow=dataflow,
            symbols=symbols,
        )
        for fragment in ("; Symbols", "ram_F680", "dataflow R0=H'1234"):
            self.assertIn(fragment, listing)

        with tempfile.TemporaryDirectory() as tmp:
            out_path = Path(tmp).joinpath("out.json")
            write_json(
                out_path,
                instructions,
                {},
                {},
                dataflow=dataflow,
                symbols=symbols,
            )
            # Read back while the temporary directory still exists.
            payload = json.loads(out_path.read_text(encoding="utf-8"))

        first_symbol = payload["symbols"]["symbols"][0]
        self.assertEqual(first_symbol["name"], "ram_F680")

        store_reference = payload["instructions"][1]["references"][0]
        self.assertEqual(store_reference["symbol"], "ram_F680")

        dataflow_payload = payload["instructions"][0]["dataflow"]
        self.assertEqual(dataflow_payload["changes"][0]["name"], "R0")
        known_r0 = dataflow_payload["known_after"]["registers"]["R0"]
        self.assertEqual(known_r0["value"], 0x1234)
        self.assertNotIn("before", dataflow_payload)
# Run this module's tests via unittest when it is executed as a script.
if __name__ == "__main__":
unittest.main()

123
tests/test_symbols.py Normal file
View File

@@ -0,0 +1,123 @@
import unittest
from h8536.model import Instruction
from h8536.symbols import discover_symbols, instruction_accesses, symbol_for_address
def ins(address, mnemonic, operands="", references=None):
    """Build an ``Instruction`` with a one-byte placeholder encoding.

    ``references`` is copied into a new list; a missing or empty value
    becomes an empty list.
    """
    reference_list = list(references) if references else []
    return Instruction(
        address,
        b"\x00",
        mnemonic,
        operands,
        references=reference_list,
    )
class SymbolDiscoveryTest(unittest.TestCase):
    """Tests for symbol discovery from instruction memory references."""

    def test_discovers_ram_symbol_counts_direction_and_widths(self):
        """Three accesses to H'F680 collapse into one ``ram_F680`` symbol."""
        instructions = {
            0x1000: ins(0x1000, "MOV:G.B", "#H'12, @H'F680", [0xF680]),
            0x1004: ins(0x1004, "CMP:G.B", "#H'01, @H'F680", [0xF680]),
            0x1008: ins(0x1008, "ADD:Q.W", "#1, @H'F680", [0xF680]),
        }

        analysis = discover_symbols(instructions)

        symbols = analysis["symbols"]
        self.assertEqual(len(symbols), 1)
        symbol = symbols[0]
        expected_fields = {
            "address": 0xF680,
            "name": "ram_F680",
            "region": "on_chip_ram",
            "kind": "ram",
            "access_count": 3,
            "read_count": 2,
            "write_count": 2,
            "unknown_count": 0,
            "width_hints": ["byte", "word"],
            "width": "mixed",
            "first_access": 0x1000,
            "last_access": 0x1008,
        }
        for field, expected in expected_fields.items():
            self.assertEqual(symbol[field], expected)
        self.assertEqual(symbol_for_address(analysis, 0xF680), "ram_F680")

    def test_names_program_or_external_memory_and_excludes_registers_by_default(self):
        """H'1234 becomes ``mem_1234``; the RAMCR access yields no symbol."""
        instructions = [
            ins(0x2000, "MOV:G.W", "@H'1234, R1", [0x1234]),
            ins(0x2004, "MOV:G.B", "#H'80, @RAMCR", [0xFF11]),
        ]

        analysis = discover_symbols(instructions)

        names = [entry["name"] for entry in analysis["symbols"]]
        self.assertEqual(names, ["mem_1234"])
        symbol = analysis["symbols"][0]
        self.assertEqual(symbol["region"], "program_or_external")
        self.assertEqual(symbol["kind"], "memory")
        self.assertEqual(symbol["read_count"], 1)
        self.assertIsNone(symbol_for_address(analysis, 0xFF11))

    def test_can_include_io_register_symbols_when_requested(self):
        """``include_registers=True`` reports RAMCR as a register symbol."""
        instructions = [
            ins(0x2004, "MOV:G.B", "#H'80, @RAMCR", [0xFF11]),
        ]

        analysis = discover_symbols(instructions, include_registers=True)

        self.assertEqual(len(analysis["symbols"]), 1)
        symbol = analysis["symbols"][0]
        expected_fields = {
            "address": 0xFF11,
            "name": "RAMCR",
            "region": "register_field",
            "kind": "register",
            "write_count": 1,
        }
        for field, expected in expected_fields.items():
            self.assertEqual(symbol[field], expected)

    def test_bit_and_clear_operations_use_conservative_directions(self):
        """BSET/BCLR count as read_write, TST as read, CLR as write."""
        instructions = [
            ins(0x3000, "BSET.B", "#4, @H'F690", [0xF690]),
            ins(0x3002, "BCLR.B", "#4, @H'F690", [0xF690]),
            ins(0x3004, "TST.B", "@H'F690", [0xF690]),
            ins(0x3006, "CLR.B", "@H'F690", [0xF690]),
        ]

        symbol = discover_symbols(instructions)["symbols"][0]

        self.assertEqual(symbol["read_count"], 3)
        self.assertEqual(symbol["write_count"], 3)
        directions = [entry["direction"] for entry in symbol["accesses"]]
        self.assertEqual(
            directions,
            ["read_write", "read_write", "read", "write"],
        )

    def test_optional_pointer_table_candidates_add_xrefs_without_io_pollution(self):
        """Pointer-table targets add xrefs but no RAMCR symbol."""
        instructions = [
            ins(0x4000, "MOV:G.B", "@H'F680, R0", [0xF680]),
        ]
        pointer_table = {
            "address": 0x0200,
            "targets": [0xF680, 0x1234, 0xFF11],
        }
        data_candidates = {"pointer_tables": [pointer_table]}

        analysis = discover_symbols(
            instructions, data_candidates=data_candidates
        )

        by_name = {entry["name"]: entry for entry in analysis["symbols"]}
        self.assertEqual(by_name["ram_F680"]["xref_count"], 1)
        self.assertEqual(by_name["mem_1234"]["access_count"], 0)
        self.assertEqual(by_name["mem_1234"]["xref_count"], 1)
        self.assertNotIn("RAMCR", by_name)

    def test_instruction_accesses_handles_comma_inside_displacement_operand(self):
        """A comma inside ``@(H'0010,R1)`` does not split the operand list."""
        instruction = ins(
            0x5000, "MOV:G.B", "@(H'0010,R1), @H'F682", [0xF682]
        )

        access = instruction_accesses(instruction)

        first = access[0]
        self.assertEqual(first["direction"], "write")
        self.assertEqual(first["operand"], "@H'F682")
# Run this module's tests via unittest when it is executed as a script.
if __name__ == "__main__":
unittest.main()