from __future__ import annotations from collections.abc import Iterable, Mapping from .formatting import h16, parse_int from .model import Instruction SCI1_TDR_ADDRESS = 0xFEDB SCI1_RDR_ADDRESS = 0xFEDD TX_BUFFER_START = 0xF858 TX_CHECKSUM_ADDRESS = 0xF85D TX_BUFFER_END = TX_CHECKSUM_ADDRESS TX_INDEX_ADDRESS = 0xF9C2 TX_FRAME_LENGTH = 6 CHECKSUM_SEED = 0x5A RX_FRAME_START = 0xF860 RX_CHECKSUM_ADDRESS = 0xF865 RX_FRAME_END = RX_CHECKSUM_ADDRESS RX_CAPTURE_START = 0xF868 RX_CAPTURE_END = 0xF86D RX_INDEX_ADDRESS = 0xF9C3 RX_INTERBYTE_TIMEOUT_ADDRESS = 0xF9C1 RX_COMPLETE_TIMER_ADDRESS = 0xF9C5 RX_FRAME_LENGTH = 6 _BUFFER_DATA_END = TX_CHECKSUM_ADDRESS - 1 _MIN_BUFFER_REFERENCES = 3 _TX_REQUIRED_EVIDENCE = ( "tx_buffer_region", "tx_checksum_seed", "checksum_byte", "xor_checksum_chain", "initial_send_from_buffer_start", "tx_index_initialized_to_one", "tx_isr_indexed_send", "tx_index_increment", "tx_index_compare_frame_length", ) _RX_REQUIRED_EVIDENCE = ( "rx_rdr_read", "rx_indexed_store", "rx_index_increment_store", "rx_isr_compare_frame_length", "rx_complete_timer", "rx_processor_requires_six_bytes", "rx_copy_capture_to_frame_buffer", "rx_checksum_seed", "rx_xor_checksum_validation", ) def analyze_serial_reconstruction( instructions: Mapping[int, Instruction] | Iterable[Instruction], ) -> dict[str, object]: """Reconstruct conservative serial-frame candidates from independent evidence.""" ordered = _instruction_sequence(instructions) evidence = _collect_tx_evidence(ordered) + _collect_rx_evidence(ordered) candidates = [ candidate for candidate in ( _tx_candidate_from_evidence(evidence), _rx_candidate_from_evidence(evidence), ) if candidate is not None ] annotations: dict[int, list[str]] = {} instruction_metadata: dict[int, list[dict[str, object]]] = {} for candidate in candidates: for item in candidate["evidence"]: if not isinstance(item, Mapping): continue comment = _comment_for_evidence(candidate, item) for address in item.get("addresses", []): if not isinstance(address, int): continue annotations.setdefault(address, []) if comment not in annotations[address]: annotations[address].append(comment) instruction_metadata.setdefault(address, []).append( _instruction_metadata(candidate, item, address, comment), ) return { "kind": "serial_reconstruction", "candidates": candidates, "evidence": evidence, "required_evidence": { "tx": list(_TX_REQUIRED_EVIDENCE), "rx": list(_RX_REQUIRED_EVIDENCE), }, "annotations": { address: "; ".join(parts) for address, parts in sorted(annotations.items()) }, "instructions": instruction_metadata, } def serial_reconstruction_comment_for_instruction( analysis: Mapping[str, object] | None, address: int, ) -> str: if not analysis: return "" annotations = analysis.get("annotations") if not isinstance(annotations, Mapping): return "" comment = annotations.get(address) if comment is None: comment = annotations.get(str(address)) return str(comment) if comment else "" def serial_reconstruction_metadata_for_instruction( analysis: Mapping[str, object] | None, address: int, ) -> list[dict[str, object]]: if not analysis: return [] instructions = analysis.get("instructions") if not isinstance(instructions, Mapping): return [] metadata = instructions.get(address) if metadata is None: metadata = instructions.get(str(address)) return list(metadata) if isinstance(metadata, list) else [] def serial_reconstruction_json_payload(analysis: Mapping[str, object] | None) -> dict[str, object]: if not analysis: return { "kind": "serial_reconstruction", "candidates": [], "evidence": [], "required_evidence": { "tx": list(_TX_REQUIRED_EVIDENCE), "rx": list(_RX_REQUIRED_EVIDENCE), }, } return { "kind": analysis.get("kind", "serial_reconstruction"), "candidates": analysis.get("candidates", []), "evidence": analysis.get("evidence", []), "required_evidence": analysis.get( "required_evidence", {"tx": list(_TX_REQUIRED_EVIDENCE), "rx": list(_RX_REQUIRED_EVIDENCE)}, ), } def _collect_tx_evidence(ordered: list[Instruction]) -> list[dict[str, object]]: evidence: list[dict[str, object]] = [] buffer_references = _buffer_region_references(ordered) if len({ref for ins in buffer_references for ref in _buffer_refs(ins)}) >= _MIN_BUFFER_REFERENCES: evidence.append( _evidence( "tx_buffer_region", buffer_references, summary=( f"TX buffer-region references cluster around {h16(TX_BUFFER_START)}" f"-{h16(TX_BUFFER_END)}" ), distinct_buffer_addresses=sorted( {ref for ins in buffer_references for ref in _buffer_refs(ins)} ), ), ) checksum_seed = _checksum_seed_before_xor(ordered, TX_BUFFER_START, _BUFFER_DATA_END) if checksum_seed: evidence.append( _evidence( "tx_checksum_seed", checksum_seed, summary=f"candidate TX checksum starts from seed {h16(CHECKSUM_SEED)}", ), ) checksum_writes = [ins for ins in ordered if _is_write_to_address(ins, TX_CHECKSUM_ADDRESS)] if checksum_writes: evidence.append( _evidence( "checksum_byte", checksum_writes, summary=f"candidate checksum byte write targets {h16(TX_CHECKSUM_ADDRESS)}", ), ) xor_chain = _xor_checksum_chain(ordered, checksum_writes) if xor_chain: evidence.append( _evidence( "xor_checksum_chain", xor_chain, summary=f"XOR chain appears to feed the {h16(TX_CHECKSUM_ADDRESS)} checksum byte", ), ) initial_send = _initial_send_from_buffer_start(ordered) if initial_send: evidence.append( _evidence( "initial_send_from_buffer_start", initial_send, summary=f"initial SCI1 TDR send is supported by a read from {h16(TX_BUFFER_START)}", ), ) index_init = [ins for ins in ordered if _is_index_initialized_to_one(ins)] if index_init: evidence.append( _evidence( "tx_index_initialized_to_one", index_init, summary=f"write evidence supports TX index {h16(TX_INDEX_ADDRESS)} being initialized to 1", ), ) isr_sequence = _indexed_tx_isr_sequence(ordered) if isr_sequence: evidence.append( _evidence( "tx_isr_indexed_send", isr_sequence["send"], summary=( f"candidate TX ISR sends SCI1 TDR from indexed {h16(TX_BUFFER_START)} buffer" ), ), ) evidence.append( _evidence( "tx_index_increment", isr_sequence["increment"], summary=f"candidate TX ISR increments TX index {h16(TX_INDEX_ADDRESS)}", ), ) evidence.append( _evidence( "tx_index_compare_frame_length", isr_sequence["compare"], summary=f"candidate TX ISR compares TX index to frame length {TX_FRAME_LENGTH}", ), ) return evidence def _collect_rx_evidence(ordered: list[Instruction]) -> list[dict[str, object]]: evidence: list[dict[str, object]] = [] rdr_reads = [ins for ins in ordered if _is_read_from_address(ins, SCI1_RDR_ADDRESS)] if rdr_reads: evidence.append( _evidence( "rx_rdr_read", rdr_reads, summary="SCI1 RX ISR reads a byte from SCI1_RDR", ), ) indexed_stores = [ins for ins in ordered if _is_indexed_capture_store(ins)] if indexed_stores: evidence.append( _evidence( "rx_indexed_store", indexed_stores, summary=f"received bytes are stored into candidate capture buffer {h16(RX_CAPTURE_START)}-{h16(RX_CAPTURE_END)}", ), ) index_store = _rx_index_increment_store(ordered) if index_store: evidence.append( _evidence( "rx_index_increment_store", index_store, summary=f"RX byte count/index is incremented and stored at {h16(RX_INDEX_ADDRESS)}", ), ) isr_length_checks = [ ins for ins in ordered if _mnemonic_root(ins.mnemonic) in {"CMP", "CMP:E", "CMP:G", "CMP:I"} and _immediate_source_value(ins.operands) == RX_FRAME_LENGTH and _destination_operand(ins.operands).upper() in {"R1", "R1L"} ] if isr_length_checks: evidence.append( _evidence( "rx_isr_compare_frame_length", isr_length_checks, summary=f"RX ISR compares incremented count to candidate frame length {RX_FRAME_LENGTH}", ), ) complete_timers = [ ins for ins in ordered if _is_write_to_address(ins, RX_COMPLETE_TIMER_ADDRESS) and _immediate_source_value(ins.operands) == 0x14 ] if complete_timers: evidence.append( _evidence( "rx_complete_timer", complete_timers, summary=f"RX ISR sets {h16(RX_COMPLETE_TIMER_ADDRESS)} after count reaches {RX_FRAME_LENGTH}", ), ) processor_length_checks = [ ins for ins in ordered if _mnemonic_root(ins.mnemonic) in {"CMP", "CMP:E", "CMP:G", "CMP:I"} and RX_INDEX_ADDRESS in ins.references and _immediate_source_value(ins.operands) == RX_FRAME_LENGTH ] if processor_length_checks: evidence.append( _evidence( "rx_processor_requires_six_bytes", processor_length_checks, summary=f"RX processing path requires {h16(RX_INDEX_ADDRESS)} to equal {RX_FRAME_LENGTH}", ), ) copies = _rx_copy_capture_to_frame_buffer(ordered) if copies: evidence.append( _evidence( "rx_copy_capture_to_frame_buffer", copies, summary=( f"RX processing copies candidate capture buffer {h16(RX_CAPTURE_START)}-{h16(RX_CAPTURE_END)} " f"to validation buffer {h16(RX_FRAME_START)}-{h16(RX_FRAME_END)}" ), ), ) checksum_seed = _checksum_seed_before_xor(ordered, RX_FRAME_START, RX_CHECKSUM_ADDRESS - 1) if checksum_seed: evidence.append( _evidence( "rx_checksum_seed", checksum_seed, summary=f"candidate RX checksum validation starts from seed {h16(CHECKSUM_SEED)}", ), ) checksum_validation = _rx_xor_checksum_validation(ordered) if checksum_validation: evidence.append( _evidence( "rx_xor_checksum_validation", checksum_validation, summary=( f"RX path XORs {h16(RX_FRAME_START)}-{h16(RX_CHECKSUM_ADDRESS - 1)} " f"and compares the result with {h16(RX_CHECKSUM_ADDRESS)}" ), ), ) return evidence def _tx_candidate_from_evidence(evidence: list[dict[str, object]]) -> dict[str, object] | None: evidence_by_key = {str(item["kind"]): item for item in evidence} missing = [key for key in _TX_REQUIRED_EVIDENCE if key not in evidence_by_key] if missing: return None evidence_addresses = { key: list(evidence_by_key[key]["addresses"]) for key in _TX_REQUIRED_EVIDENCE } candidate: dict[str, object] = { "id": "sci1_tx_frame_f858_len6_candidate", "kind": "candidate_sci1_tx_frame", "channel": "SCI1", "frame_length": TX_FRAME_LENGTH, "buffer_start": TX_BUFFER_START, "buffer_start_hex": h16(TX_BUFFER_START), "buffer_end": TX_BUFFER_END, "buffer_end_hex": h16(TX_BUFFER_END), "checksum_address": TX_CHECKSUM_ADDRESS, "checksum_address_hex": h16(TX_CHECKSUM_ADDRESS), "tx_index_address": TX_INDEX_ADDRESS, "tx_index_address_hex": h16(TX_INDEX_ADDRESS), "tdr_address": SCI1_TDR_ADDRESS, "tdr_address_hex": h16(SCI1_TDR_ADDRESS), "checksum_seed": CHECKSUM_SEED, "checksum_seed_hex": h16(CHECKSUM_SEED), "checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4", "confidence": "high", "confidence_score": 0.95, "confidence_reason": "all required independent evidence groups were observed", "required_evidence_count": len(_TX_REQUIRED_EVIDENCE), "observed_evidence_count": len(_TX_REQUIRED_EVIDENCE), "missing_evidence": [], "evidence_addresses": evidence_addresses, "evidence_addresses_hex": { key: [h16(address) for address in addresses] for key, addresses in evidence_addresses.items() }, "evidence": [evidence_by_key[key] for key in _TX_REQUIRED_EVIDENCE], "short_comment": ( f"candidate/evidence-supported SCI1 {TX_FRAME_LENGTH}-byte TX frame; " f"{h16(TX_BUFFER_START)}-{h16(TX_BUFFER_END)}, checksum {h16(TX_CHECKSUM_ADDRESS)} " f"seeded by {h16(CHECKSUM_SEED)}" ), "comment": ( f"candidate/evidence-supported SCI1 {TX_FRAME_LENGTH}-byte TX frame hypothesis " f"using buffer {h16(TX_BUFFER_START)}-{h16(TX_BUFFER_END)} with checksum byte " f"{h16(TX_CHECKSUM_ADDRESS)} seeded by {h16(CHECKSUM_SEED)}" ), } return candidate def _rx_candidate_from_evidence(evidence: list[dict[str, object]]) -> dict[str, object] | None: evidence_by_key = {str(item["kind"]): item for item in evidence} missing = [key for key in _RX_REQUIRED_EVIDENCE if key not in evidence_by_key] if missing: return None evidence_addresses = { key: list(evidence_by_key[key]["addresses"]) for key in _RX_REQUIRED_EVIDENCE } return { "id": "sci1_rx_frame_f868_len6_candidate", "kind": "candidate_sci1_rx_frame", "channel": "SCI1", "frame_length": RX_FRAME_LENGTH, "capture_buffer_start": RX_CAPTURE_START, "capture_buffer_start_hex": h16(RX_CAPTURE_START), "capture_buffer_end": RX_CAPTURE_END, "capture_buffer_end_hex": h16(RX_CAPTURE_END), "validation_buffer_start": RX_FRAME_START, "validation_buffer_start_hex": h16(RX_FRAME_START), "validation_buffer_end": RX_FRAME_END, "validation_buffer_end_hex": h16(RX_FRAME_END), "checksum_address": RX_CHECKSUM_ADDRESS, "checksum_address_hex": h16(RX_CHECKSUM_ADDRESS), "rx_index_address": RX_INDEX_ADDRESS, "rx_index_address_hex": h16(RX_INDEX_ADDRESS), "rdr_address": SCI1_RDR_ADDRESS, "rdr_address_hex": h16(SCI1_RDR_ADDRESS), "interbyte_timeout_address": RX_INTERBYTE_TIMEOUT_ADDRESS, "interbyte_timeout_address_hex": h16(RX_INTERBYTE_TIMEOUT_ADDRESS), "complete_timer_address": RX_COMPLETE_TIMER_ADDRESS, "complete_timer_address_hex": h16(RX_COMPLETE_TIMER_ADDRESS), "checksum_seed": CHECKSUM_SEED, "checksum_seed_hex": h16(CHECKSUM_SEED), "checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4", "confidence": "high", "confidence_score": 0.9, "confidence_reason": ( "RX count, copy, and checksum-validation evidence were observed; no explicit header/sync byte was identified" ), "caveat": "candidate frame means six consecutive bytes within the observed RX timing/state machine, not a proven delimited packet", "required_evidence_count": len(_RX_REQUIRED_EVIDENCE), "observed_evidence_count": len(_RX_REQUIRED_EVIDENCE), "missing_evidence": [], "evidence_addresses": evidence_addresses, "evidence_addresses_hex": { key: [h16(address) for address in addresses] for key, addresses in evidence_addresses.items() }, "evidence": [evidence_by_key[key] for key in _RX_REQUIRED_EVIDENCE], "short_comment": ( f"candidate/evidence-supported SCI1 {RX_FRAME_LENGTH}-byte RX frame; " f"capture {h16(RX_CAPTURE_START)}-{h16(RX_CAPTURE_END)}, validate " f"{h16(RX_FRAME_START)}-{h16(RX_FRAME_END)}, checksum {h16(RX_CHECKSUM_ADDRESS)} " f"seeded by {h16(CHECKSUM_SEED)}" ), "comment": ( f"candidate/evidence-supported SCI1 {RX_FRAME_LENGTH}-byte RX frame hypothesis " f"using capture buffer {h16(RX_CAPTURE_START)}-{h16(RX_CAPTURE_END)}; " f"checksum byte {h16(RX_CHECKSUM_ADDRESS)} is validated against XOR seeded by {h16(CHECKSUM_SEED)}" ), } def _comment_for_evidence(candidate: Mapping[str, object], item: Mapping[str, object]) -> str: base = str(candidate.get("short_comment") or candidate["comment"]) return ( f"{base}; evidence: {item['summary']}; " f"confidence {candidate['confidence']}" ) def _instruction_metadata( candidate: Mapping[str, object], item: Mapping[str, object], address: int, comment: str, ) -> dict[str, object]: return { "address": address, "action": "serial_reconstruction_evidence", "candidate_id": candidate["id"], "candidate_kind": candidate["kind"], "evidence": item["kind"], "evidence_summary": item["summary"], "evidence_addresses": list(item["addresses"]), "evidence_addresses_hex": list(item["addresses_hex"]), "confidence": candidate["confidence"], "confidence_score": candidate["confidence_score"], "comment": comment, } def _buffer_region_references(ordered: list[Instruction]) -> list[Instruction]: return [ins for ins in ordered if _buffer_refs(ins)] def _buffer_refs(ins: Instruction) -> list[int]: return sorted({ref for ref in ins.references if TX_BUFFER_START <= ref <= TX_BUFFER_END}) def _xor_checksum_chain( ordered: list[Instruction], checksum_writes: list[Instruction], ) -> list[Instruction]: for checksum_write in checksum_writes: index = ordered.index(checksum_write) window = ordered[max(0, index - 16) : index] xors = [ ins for ins in window if _mnemonic_root(ins.mnemonic) == "XOR" and any(TX_BUFFER_START <= ref <= _BUFFER_DATA_END for ref in ins.references) ] if len(xors) >= 2: return xors + [checksum_write] return [] def _checksum_seed_before_xor(ordered: list[Instruction], start: int, end: int) -> list[Instruction]: for index, ins in enumerate(ordered): if ( _mnemonic_root(ins.mnemonic) == "XOR" and any(start <= ref <= end for ref in ins.references) ): for candidate in reversed(ordered[max(0, index - 6) : index]): if _immediate_source_value(candidate.operands) == CHECKSUM_SEED: return [candidate] return [] def _initial_send_from_buffer_start(ordered: list[Instruction]) -> list[Instruction]: for index, ins in enumerate(ordered): if not _is_sci1_tdr_write(ins): continue source, _destination = _source_destination_operands(ins.operands) if _operand_mentions_address(source, TX_BUFFER_START) and not _is_indexed_operand(source): return [ins] for candidate in reversed(ordered[max(0, index - 3) : index]): if _is_nonindexed_read_from_buffer_start(candidate): return [candidate, ins] return [] def _indexed_tx_isr_sequence(ordered: list[Instruction]) -> dict[str, list[Instruction]] | None: for index, ins in enumerate(ordered): if not _is_sci1_tdr_write(ins): continue prior = ordered[max(0, index - 6) : index] indexed_reads = [candidate for candidate in prior if _is_indexed_buffer_read(candidate)] if not indexed_reads and _is_indexed_buffer_read(ins): indexed_reads = [ins] if not indexed_reads: continue index_reads = [candidate for candidate in prior if _is_read_from_address(candidate, TX_INDEX_ADDRESS)] if not index_reads: continue after = ordered[index + 1 : index + 9] increments = [candidate for candidate in after if _is_index_increment(candidate)] compares = [candidate for candidate in after if _is_compare_index_to_frame_length(candidate)] if not increments or not compares: continue if increments[0].address > compares[0].address: continue send = _dedupe_instructions([index_reads[-1], indexed_reads[-1], ins]) return { "send": send, "increment": [increments[0]], "compare": [compares[0]], } return None def _is_index_initialized_to_one(ins: Instruction) -> bool: return ( _mnemonic_root(ins.mnemonic) in {"MOV:G", "MOV:S"} and _is_write_to_address(ins, TX_INDEX_ADDRESS) and _immediate_source_value(ins.operands) == 1 ) def _is_index_increment(ins: Instruction) -> bool: if not _is_write_to_address(ins, TX_INDEX_ADDRESS): return False root = _mnemonic_root(ins.mnemonic) if root in {"ADD:Q", "ADD:G", "ADDS"}: return _immediate_source_value(ins.operands) == 1 return root in {"INC", "INC:G"} def _is_compare_index_to_frame_length(ins: Instruction) -> bool: return ( _mnemonic_root(ins.mnemonic) in {"CMP", "CMP:E", "CMP:G", "CMP:I"} and TX_INDEX_ADDRESS in ins.references and _immediate_source_value(ins.operands) == TX_FRAME_LENGTH ) def _is_sci1_tdr_write(ins: Instruction) -> bool: return _is_write_to_address(ins, SCI1_TDR_ADDRESS) def _is_nonindexed_read_from_buffer_start(ins: Instruction) -> bool: return ( _is_read_from_address(ins, TX_BUFFER_START) and not _is_indexed_operand(_source_destination_operands(ins.operands)[0]) ) def _is_indexed_buffer_read(ins: Instruction) -> bool: source, _destination = _source_destination_operands(ins.operands) return ( (_is_read_from_address(ins, TX_BUFFER_START) or _operand_mentions_address(source, TX_BUFFER_START)) and _is_indexed_operand(source) ) def _is_indexed_capture_store(ins: Instruction) -> bool: _source, destination = _source_destination_operands(ins.operands) return ( _access_direction(ins, RX_CAPTURE_START) == "write" and _operand_mentions_address(destination, RX_CAPTURE_START) and _is_indexed_operand(destination) ) def _rx_index_increment_store(ordered: list[Instruction]) -> list[Instruction]: for index, ins in enumerate(ordered): if _is_write_to_address(ins, RX_INDEX_ADDRESS): window = ordered[max(0, index - 4) : index + 1] increments = [ candidate for candidate in window if _mnemonic_root(candidate.mnemonic) in {"ADD:Q", "ADD:G", "ADDS", "INC", "INC:G"} and _destination_operand(candidate.operands).upper() in {"R1", "R1L"} and (_immediate_source_value(candidate.operands) in {None, 1}) ] if increments: return [increments[-1], ins] return [] def _rx_copy_capture_to_frame_buffer(ordered: list[Instruction]) -> list[Instruction]: copies: list[Instruction] = [] capture_reads = [ ins for ins in ordered if any(RX_CAPTURE_START <= ref <= RX_CAPTURE_END for ref in ins.references) ] frame_writes = [ ins for ins in ordered if any(RX_FRAME_START <= ref <= RX_FRAME_END for ref in ins.references) ] if len(capture_reads) >= 3 and len(frame_writes) >= 3: copies.extend(capture_reads[:3]) copies.extend(frame_writes[:3]) return _dedupe_instructions(copies) def _rx_xor_checksum_validation(ordered: list[Instruction]) -> list[Instruction]: for index, ins in enumerate(ordered): if ( _mnemonic_root(ins.mnemonic) in {"CMP", "CMP:E", "CMP:G", "CMP:I"} and RX_CHECKSUM_ADDRESS in ins.references ): window = ordered[max(0, index - 16) : index] xors = [ candidate for candidate in window if _mnemonic_root(candidate.mnemonic) == "XOR" and any(RX_FRAME_START <= ref <= RX_CHECKSUM_ADDRESS - 1 for ref in candidate.references) ] seed = [ candidate for candidate in window if _immediate_source_value(candidate.operands) == CHECKSUM_SEED ] if len(xors) >= 5 and seed: return [seed[-1], *xors, ins] return [] def _is_read_from_address(ins: Instruction, address: int) -> bool: source, destination = _source_destination_operands(ins.operands) if _operand_mentions_address(source, address): return True if address not in ins.references: return False if source.startswith("@") and not _operand_mentions_any_reference(destination, ins.references): return True return _access_direction(ins, address) == "read" def _is_write_to_address(ins: Instruction, address: int) -> bool: _source, destination = _source_destination_operands(ins.operands) if _operand_mentions_address(destination, address): return _access_direction(ins, address) == "write" if address not in ins.references: return False return _access_direction(ins, address) == "write" def _access_direction(ins: Instruction, address: int) -> str | None: root = _mnemonic_root(ins.mnemonic) if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST"}: return "read" if root in {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}: return "write" if root in {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}: return "write" if root in {"MOV:G", "MOV:S", "MOVTPE"}: source, destination = _source_destination_operands(ins.operands) if _operand_mentions_address(destination, address): return "write" if _operand_mentions_address(source, address): return "read" if address in ins.references: if destination.startswith("@") and not _operand_mentions_any_reference(source, ins.references): return "write" if source.startswith("@") and not _operand_mentions_any_reference(destination, ins.references): return "read" if root in {"MOV:L", "MOV:F"}: return "read" if root == "STC": return "write" if root == "LDC": return "read" return None def _evidence( kind: str, instructions: list[Instruction], *, summary: str, **extra: object, ) -> dict[str, object]: addresses = [ins.address for ins in _dedupe_instructions(instructions)] item: dict[str, object] = { "kind": kind, "summary": summary, "addresses": addresses, "addresses_hex": [h16(address) for address in addresses], "instructions": [ins.text for ins in _dedupe_instructions(instructions)], } item.update(extra) if "distinct_buffer_addresses" in item and isinstance(item["distinct_buffer_addresses"], list): item["distinct_buffer_addresses_hex"] = [ h16(address) for address in item["distinct_buffer_addresses"] if isinstance(address, int) ] return item def _dedupe_instructions(instructions: list[Instruction]) -> list[Instruction]: output: list[Instruction] = [] seen: set[int] = set() for ins in instructions: if ins.address in seen: continue seen.add(ins.address) output.append(ins) return output def _instruction_sequence( instructions: Mapping[int, Instruction] | Iterable[Instruction], ) -> list[Instruction]: values = instructions.values() if isinstance(instructions, Mapping) else instructions return sorted(values, key=lambda ins: ins.address) def _source_destination_operands(operands: str) -> tuple[str, str]: depth = 0 split_at: int | None = None for index, char in enumerate(operands): if char in "({": depth += 1 elif char in ")}" and depth: depth -= 1 elif char == "," and depth == 0: split_at = index if split_at is None: operand = operands.strip() return "", operand return operands[:split_at].strip(), operands[split_at + 1 :].strip() def _destination_operand(operands: str) -> str: return _source_destination_operands(operands)[1] def _immediate_source_value(operands: str) -> int | None: source, _destination = _source_destination_operands(operands) if not source.startswith("#"): return None try: return parse_int(source[1:]) & 0xFFFF except ValueError: return None def _operand_mentions_any_reference(operand: str, references: list[int]) -> bool: return any(_operand_mentions_address(operand, address) for address in references) def _operand_mentions_address(operand: str, address: int) -> bool: operand_upper = operand.upper().replace(" ", "") names = { SCI1_TDR_ADDRESS: ("SCI1_TDR",), TX_BUFFER_START: ("TX_BUFFER",), TX_CHECKSUM_ADDRESS: ("TX_CHECKSUM",), TX_INDEX_ADDRESS: ("TX_INDEX",), } if any(name in operand_upper for name in names.get(address, ())): return True negative = (0x10000 - address) & 0xFFFF return ( f"H'{address:04X}" in operand_upper or f"0X{address:04X}" in operand_upper or f"${address:04X}" in operand_upper or f"-H'{negative:04X}" in operand_upper or f"-0X{negative:04X}" in operand_upper or f"-${negative:04X}" in operand_upper ) def _is_indexed_operand(operand: str) -> bool: operand_upper = operand.upper().replace(" ", "") return operand_upper.startswith("@(") and ",R" in operand_upper def _mnemonic_root(mnemonic: str) -> str: return mnemonic.rsplit(".", 1)[0].upper()