from __future__ import annotations

from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass, field

from .memory import MEMORY_REGIONS, MemoryRegion, region_for
from .model import Instruction
from .tables import IO_REGISTERS

# Mnemonic roots (mnemonic with any ".B"/".W" suffix removed), grouped by the
# access direction that the direction classifier assigns to their operands.
READ_ONLY_ROOTS = {"BTST", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST"}
WRITE_ONLY_ROOTS = {"CLR", "MOVTPE", "STC"}
READ_MODIFY_WRITE_ROOTS = {"ADD:Q", "BCLR", "BNOT", "BSET", "NEG", "NOT", "TAS"}
DESTINATION_UPDATE_ROOTS = {
    "ADD:G",
    "ADDS",
    "ADDX",
    "AND",
    "OR",
    "ROTL",
    "ROTR",
    "ROTXL",
    "ROTXR",
    "SHAL",
    "SHAR",
    "SHLL",
    "SHLR",
    "SUB",
    "SUBS",
    "SUBX",
    "XOR",
}
MOV_ROOTS = {"MOV:E", "MOV:F", "MOV:G", "MOV:I", "MOV:L", "MOV:S"}


@dataclass
class _Symbol:
    """Mutable accumulator for everything recorded about one address."""

    address: int
    name: str
    region: str
    kind: str
    access_count: int = 0
    read_count: int = 0
    write_count: int = 0
    unknown_count: int = 0
    widths: set[str] = field(default_factory=set)
    first_access: int | None = None
    last_access: int | None = None
    accesses: list[dict[str, object]] = field(default_factory=list)
    xrefs: list[dict[str, object]] = field(default_factory=list)

    def record_access(self, access: Mapping[str, object]) -> None:
        """Fold one access record into the counters, widths and address range.

        ``access`` must provide ``"instruction_address"`` and ``"direction"``;
        ``"width"`` is optional and only recorded when it is a string. A copy
        of the record is appended to ``accesses``.
        """
        instruction_address = int(access["instruction_address"])
        direction = str(access["direction"])
        width = access.get("width")

        self.access_count += 1
        if direction == "read":
            self.read_count += 1
        elif direction == "write":
            self.write_count += 1
        elif direction == "read_write":
            # One access, but it counts toward both the read and write tallies.
            self.read_count += 1
            self.write_count += 1
        else:
            self.unknown_count += 1

        if isinstance(width, str):
            self.widths.add(width)
        if self.first_access is None or instruction_address < self.first_access:
            self.first_access = instruction_address
        if self.last_access is None or instruction_address > self.last_access:
            self.last_access = instruction_address
        self.accesses.append(dict(access))

    def record_xref(self, xref: Mapping[str, object]) -> None:
        """Append a copy of a non-instruction cross-reference record."""
        self.xrefs.append(dict(xref))


def discover_symbols(
    instructions: Mapping[int, Instruction] | Iterable[Instruction],
    regions: Sequence[MemoryRegion | Mapping[str, object]] | None = None,
    *,
    include_registers: bool = False,
    data_candidates: Mapping[str, object] | None = None,
) -> dict[str, object]:
    """Discover conservative memory symbols from decoded instruction references.

    The analyzer is intentionally standalone: it consumes decoded instructions
    and region metadata, then returns a JSON-friendly payload that later
    renderers can use without changing decode semantics.

    Args:
        instructions: Decoded instructions, either as a mapping (its values
            are used) or as a plain iterable. They are processed in
            ascending ``address`` order.
        regions: Region descriptions to classify addresses with; falls back
            to ``MEMORY_REGIONS`` when omitted or empty.
        include_registers: When false, addresses in regions of kind
            ``"registers"`` are ignored.
        data_candidates: Optional payload whose ``"pointer_tables"`` entries
            contribute cross-references.

    Returns:
        A dict with ``"symbols"`` (public symbol payloads ordered by address)
        and ``"by_address"`` (address -> symbol name, same order).
    """
    active_regions = tuple(regions or MEMORY_REGIONS)
    symbols: dict[int, _Symbol] = {}

    for ins in _instruction_sequence(instructions):
        for access in instruction_accesses(ins):
            address = int(access["address"])
            region = _region_for(address, active_regions)
            if _skip_region(region, include_registers):
                continue
            # Look up first so a fresh _Symbol is only built for new addresses
            # (dict.setdefault would evaluate its default on every access).
            symbol = symbols.get(address)
            if symbol is None:
                symbol = symbols[address] = _new_symbol(address, region)
            symbol.record_access(access)

    _record_data_candidate_xrefs(symbols, data_candidates, active_regions, include_registers)

    ordered_addresses = sorted(symbols)
    return {
        "symbols": [_public_symbol(symbols[address]) for address in ordered_addresses],
        "by_address": {address: symbols[address].name for address in ordered_addresses},
    }


def instruction_accesses(ins: Instruction) -> list[dict[str, object]]:
    """Return per-reference access metadata for one instruction.

    One dict is produced per entry of ``ins.references``. Each carries
    ``address``, ``instruction_address``, ``instruction``, ``mnemonic`` and
    ``direction``; ``width`` is added when the mnemonic has a width suffix,
    and ``operand``/``operand_index`` when the reference could be tied to a
    specific operand. An instruction without references yields ``[]``.
    """
    if not ins.references:
        return []

    refs = list(ins.references)
    operands = _split_operands(ins.operands)
    width = _width_hint(ins.mnemonic)

    accesses: list[dict[str, object]] = []
    for address in refs:
        operand_index = _operand_index_for_reference(address, refs, operands)
        direction = _direction_for_reference(ins.mnemonic, operands, operand_index, len(refs))
        access: dict[str, object] = {
            "address": address,
            "instruction_address": ins.address,
            "instruction": ins.text,
            "mnemonic": ins.mnemonic,
            "direction": direction,
        }
        if width is not None:
            access["width"] = width
        if operand_index is not None:
            access["operand"] = operands[operand_index]
            access["operand_index"] = operand_index
        accesses.append(access)
    return accesses


def symbol_for_address(analysis: Mapping[str, object] | None, address: int) -> str | None:
    """Look up the symbol name recorded for *address* in a discovery payload.

    Returns ``None`` when *analysis* is empty, has no mapping under
    ``"by_address"``, or holds no (non-empty) name for the address.
    """
    if not analysis:
        return None
    by_address = analysis.get("by_address")
    if not isinstance(by_address, Mapping):
        return None
    symbol = by_address.get(address)
    if symbol is None:
        # A payload that went through JSON has string keys, because JSON
        # object keys are always strings; retry with the decimal string form.
        symbol = by_address.get(str(address))
    return str(symbol) if symbol else None


def _instruction_sequence(
    instructions: Mapping[int, Instruction] | Iterable[Instruction],
) -> list[Instruction]:
    """Return the instructions as a list sorted by ascending ``address``."""
    values = instructions.values() if isinstance(instructions, Mapping) else instructions
    return sorted(values, key=lambda item: item.address)


def _new_symbol(address: int, region: MemoryRegion) -> _Symbol:
    """Create an empty ``_Symbol`` named and classified from *region*."""
    return _Symbol(
        address=address,
        name=_symbol_name(address, region),
        region=region.name,
        kind=_symbol_kind(region),
    )


def _public_symbol(symbol: _Symbol) -> dict[str, object]:
    """Convert an accumulator into the public, JSON-friendly symbol payload.

    ``width`` is the single observed width, ``"mixed"`` when several were
    seen, or ``None`` when none was. ``xref_count``/``xrefs`` are only
    present when at least one cross-reference was recorded.
    """
    width_rank = {"byte": 0, "word": 1}
    # "byte" sorts before "word"; any other width sorts after both, with the
    # string itself as tie-break so the order does not depend on set iteration.
    widths = sorted(symbol.widths, key=lambda item: (width_rank.get(item, 99), item))

    if len(widths) == 1:
        width: str | None = widths[0]
    elif widths:
        width = "mixed"
    else:
        width = None

    payload: dict[str, object] = {
        "address": symbol.address,
        "name": symbol.name,
        "region": symbol.region,
        "kind": symbol.kind,
        "access_count": symbol.access_count,
        "read_count": symbol.read_count,
        "write_count": symbol.write_count,
        "unknown_count": symbol.unknown_count,
        "width_hints": widths,
        "width": width,
        "first_access": symbol.first_access,
        "last_access": symbol.last_access,
        "accesses": sorted(symbol.accesses, key=lambda item: int(item["instruction_address"])),
    }
    if symbol.xrefs:
        payload["xref_count"] = len(symbol.xrefs)
        payload["xrefs"] = sorted(
            symbol.xrefs,
            key=lambda item: (str(item["source"]), int(item["address"])),
        )
    return payload


def _symbol_name(address: int, region: MemoryRegion) -> str:
    """Build the symbol name for *address* from the region kind.

    Register regions use the ``IO_REGISTERS`` name when one exists, else
    ``io_<hex>``; RAM regions use ``ram_<hex>``; everything else ``mem_<hex>``.
    """
    if region.kind == "registers":
        return IO_REGISTERS.get(address, f"io_{_hex_address(address)}")
    if region.kind == "ram":
        return f"ram_{_hex_address(address)}"
    return f"mem_{_hex_address(address)}"


def _symbol_kind(region: MemoryRegion) -> str:
    """Map a region kind to a symbol kind: "register", "ram" or "memory"."""
    if region.kind == "registers":
        return "register"
    if region.kind == "ram":
        return "ram"
    return "memory"
def _hex_address(address: int) -> str:
    """Format *address* as upper-case hex: 4 digits up to 0xFFFF, else 6."""
    width = 4 if address <= 0xFFFF else 6
    return f"{address:0{width}X}"


def _skip_region(region: MemoryRegion, include_registers: bool) -> bool:
    """Return True for register regions unless registers were requested."""
    return region.kind == "registers" and not include_registers


def _region_for(address: int, regions: Sequence[MemoryRegion | Mapping[str, object]]) -> MemoryRegion:
    """Return the first of *regions* containing *address*.

    Falls back to the module-level ``region_for`` lookup when none matches.
    """
    for item in regions:
        region = _coerce_region(item)
        if region.contains(address):
            return region
    return region_for(address)


def _coerce_region(item: MemoryRegion | Mapping[str, object]) -> MemoryRegion:
    """Return *item* as a ``MemoryRegion``, building one from a mapping if needed.

    A mapping must provide ``name``, ``start``, ``end`` and ``kind``;
    ``manual`` is optional and defaults to an empty string.
    """
    if isinstance(item, MemoryRegion):
        return item
    return MemoryRegion(
        str(item["name"]),
        int(item["start"]),
        int(item["end"]),
        str(item["kind"]),
        str(item.get("manual", "")),
    )


def _is_plain_int(value: object) -> bool:
    """Return True for real integers; ``bool`` is an int subclass and is rejected."""
    return isinstance(value, int) and not isinstance(value, bool)


def _record_data_candidate_xrefs(
    symbols: dict[int, _Symbol],
    data_candidates: Mapping[str, object] | None,
    regions: Sequence[MemoryRegion | Mapping[str, object]],
    include_registers: bool,
) -> None:
    """Attach pointer-table cross-references to *symbols* (mutated in place).

    Reads ``data_candidates["pointer_tables"]``; each table is expected to be
    a mapping with an integer ``address`` and an iterable ``targets``.
    Malformed tables and non-integer targets are skipped silently. A symbol
    is created for a target that has none yet, subject to the same
    register-region filter as instruction accesses.
    """
    if not data_candidates:
        return
    pointer_tables = data_candidates.get("pointer_tables")
    if not isinstance(pointer_tables, Iterable):
        return

    for table in pointer_tables:
        if not isinstance(table, Mapping):
            continue
        source_address = table.get("address")
        targets = table.get("targets")
        if not _is_plain_int(source_address):
            continue
        # bytes/bytearray iterate as ints and would register bogus addresses;
        # a str never yields integer targets, so it is rejected up front too.
        if not isinstance(targets, Iterable) or isinstance(targets, (str, bytes, bytearray)):
            continue

        for target in targets:
            if not _is_plain_int(target):
                continue
            region = _region_for(target, regions)
            if _skip_region(region, include_registers):
                continue
            # Only build a new symbol when the target is not known yet.
            symbol = symbols.get(target)
            if symbol is None:
                symbol = symbols[target] = _new_symbol(target, region)
            symbol.record_xref(
                {
                    "source": "pointer_table",
                    "address": source_address,
                    "target": target,
                },
            )


def _direction_for_reference(
    mnemonic: str,
    operands: Sequence[str],
    operand_index: int | None,
    reference_count: int,
) -> str:
    """Classify a reference as "read", "write", "read_write" or "unknown".

    The decision is driven by the mnemonic root (see the ``*_ROOTS`` sets)
    and, where it matters, by whether the referenced operand is the last
    (destination) operand. ``operand_index`` is ``None`` when the reference
    could not be tied to a specific operand.
    """
    root = _mnemonic_root(mnemonic)
    destination_index = len(operands) - 1 if operands else None

    if root in READ_ONLY_ROOTS:
        return "read"
    if root in READ_MODIFY_WRITE_ROOTS:
        return "read_write"
    if root in WRITE_ONLY_ROOTS:
        # STC only counts as a write when the reference is its destination
        # operand (or the operand could not be identified).
        if root == "STC" and operand_index not in (None, destination_index):
            return "unknown"
        return "write"
    if root == "LDC":
        # LDC only counts as a read when the reference is its first operand
        # (or the operand could not be identified).
        return "read" if operand_index in (None, 0) else "unknown"
    if root in MOV_ROOTS:
        return _source_or_destination_direction(operands, operand_index, reference_count)
    if root in DESTINATION_UPDATE_ROOTS:
        if operand_index is None:
            return "unknown"
        return "read_write" if operand_index == destination_index else "read"
    return "unknown"


def _source_or_destination_direction(
    operands: Sequence[str],
    operand_index: int | None,
    reference_count: int,
) -> str:
    """Classify a move-style reference by operand position.

    The last operand is treated as the destination ("write"); any other
    position is a "read". When the operand is not identified, the answer is
    inferred only if there is exactly one reference and exactly one memory
    operand; otherwise the result is "unknown".
    """
    if not operands:
        return "unknown"
    destination_index = len(operands) - 1
    if operand_index is not None:
        return "write" if operand_index == destination_index else "read"

    memory_indexes = [index for index, operand in enumerate(operands) if _is_memory_operand(operand)]
    if reference_count == 1 and len(memory_indexes) == 1:
        return "write" if memory_indexes[0] == destination_index else "read"
    return "unknown"


def _operand_index_for_reference(
    address: int,
    refs: Sequence[int],
    operands: Sequence[str],
) -> int | None:
    """Return the index of the operand that *address* belongs to, if decidable.

    Resolution order: a unique operand whose text mentions the address; the
    sole memory operand when there is exactly one reference; positional
    pairing of references with memory operands when their counts match.
    Returns ``None`` when none of these applies.
    """
    matches = [index for index, operand in enumerate(operands) if _operand_mentions_address(operand, address)]
    if len(matches) == 1:
        return matches[0]

    memory_indexes = [index for index, operand in enumerate(operands) if _is_memory_operand(operand)]
    if len(refs) == 1 and len(memory_indexes) == 1:
        return memory_indexes[0]
    if len(refs) == len(memory_indexes):
        try:
            return memory_indexes[refs.index(address)]
        except ValueError:
            # address is not one of refs, so there is no position to pair with.
            return None
    return None


def _operand_mentions_address(operand: str, address: int) -> bool:
    """Return True when the operand text names *address*.

    Matches case-insensitively against either ``H'XXXX`` (the low 16 bits of
    the address as four hex digits) or the address's ``IO_REGISTERS`` name.
    """
    normalized = operand.upper()
    if f"H'{address & 0xFFFF:04X}" in normalized:
        return True
    if address in IO_REGISTERS and IO_REGISTERS[address].upper() in normalized:
        return True
    return False


def _is_memory_operand(operand: str) -> bool:
    """Return True when the operand, ignoring leading whitespace, starts with "@"."""
    return operand.strip().startswith("@")


def _split_operands(operands: str) -> list[str]:
    """Split an operand string on top-level commas.

    Commas nested inside ``()`` or ``{}`` do not split. Each part is stripped
    of surrounding whitespace and empty parts are dropped.
    """
    parts: list[str] = []
    current: list[str] = []
    depth = 0
    for char in operands:
        if char in "({":
            depth += 1
        elif char in ")}" and depth:
            # Ignore unmatched closers so depth never goes negative.
            depth -= 1
        if char == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
            continue
        current.append(char)
    # The trailing part is always appended; empty parts are filtered below.
    parts.append("".join(current).strip())
    return [part for part in parts if part]


def _mnemonic_root(mnemonic: str) -> str:
    """Return the mnemonic with everything from its last "." removed."""
    return mnemonic.rsplit(".", 1)[0]


def _width_hint(mnemonic: str) -> str | None:
    """Return "byte" for a ".B" suffix, "word" for ".W", otherwise ``None``."""
    if mnemonic.endswith(".B"):
        return "byte"
    if mnemonic.endswith(".W"):
        return "word"
    return None