465 lines
15 KiB
Python
465 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
from collections.abc import Iterable, Mapping
|
|
from dataclasses import dataclass
|
|
|
|
from .formatting import h16, parse_int
|
|
from .model import Instruction
|
|
|
|
|
|
# Hardware-manual citations backing the SCI semantics modelled in this module.
# Each entry is a "<manual file>:<line> <summary>" string.  The same list object
# is returned under "manual_references" by the analysis and attached to every
# event under the "manual" key.
MANUAL_REFERENCES = [
    "Manual/0900766b802125d0.md:15748 SCI register map for RDR/TDR/SCR/SSR",
    "Manual/0900766b802125d0.md:15794 RDR stores received data and is CPU-readable",
    "Manual/0900766b802125d0.md:15823 TDR holds the next byte to transmit",
    "Manual/0900766b802125d0.md:15976 SCR.TIE enables/disables TXI on TDRE",
    "Manual/0900766b802125d0.md:15993 SCR.RIE enables RXI and ERI",
    "Manual/0900766b802125d0.md:16008 SCR.TE enables the transmitter",
    "Manual/0900766b802125d0.md:16028 SCR.RE enables the receiver",
    "Manual/0900766b802125d0.md:16090 SSR flags are cleared by writing zero",
    "Manual/0900766b802125d0.md:16100 SSR.TDRE means TDR can accept the next byte",
    "Manual/0900766b802125d0.md:16116 SSR.RDRF means received data reached RDR",
    "Manual/0900766b802125d0.md:16127 SSR.ORER reports receive overrun",
    "Manual/0900766b802125d0.md:16140 SSR.FER reports framing errors",
    "Manual/0900766b802125d0.md:16147 SSR.PER reports parity errors",
]
|
|
|
|
|
|
@dataclass(frozen=True)
class SciRegister:
    """Immutable description of one memory-mapped SCI register."""

    # Channel label the register belongs to, e.g. "SCI1" or "SCI2".
    channel: str
    # Register name within the channel: "SCR", "TDR", "SSR" or "RDR".
    name: str
    # Address of the register; used as the lookup key in SCI_REGISTER_BY_ADDRESS.
    address: int
|
|
|
|
|
|
# Every SCI register this module recognises, grouped by channel.  Within each
# channel the four registers occupy consecutive addresses in the order
# SCR, TDR, SSR, RDR.
SCI_REGISTERS: tuple[SciRegister, ...] = (
    SciRegister("SCI1", "SCR", 0xFEDA),
    SciRegister("SCI1", "TDR", 0xFEDB),
    SciRegister("SCI1", "SSR", 0xFEDC),
    SciRegister("SCI1", "RDR", 0xFEDD),
    SciRegister("SCI2", "SCR", 0xFEF2),
    SciRegister("SCI2", "TDR", 0xFEF3),
    SciRegister("SCI2", "SSR", 0xFEF4),
    SciRegister("SCI2", "RDR", 0xFEF5),
)
|
|
|
|
# Address -> register lookup used to classify the addresses an instruction references.
SCI_REGISTER_BY_ADDRESS = {register.address: register for register in SCI_REGISTERS}
# Channel labels; each one gets its own (possibly empty) event list in the analysis output.
SCI_CHANNELS = ("SCI1", "SCI2")
|
|
|
|
# SCR bit number -> (bit name, noun, interrupt source).
# The noun is turned into the action suffix by _scr_bit_event (for example
# "TX interrupt" -> "enable_tx_interrupt"); the third element is emitted
# verbatim as the event's "interrupt_source" field.
SCR_CONTROL_BITS = {
    7: ("TIE", "TX interrupt", "TXI"),
    6: ("RIE", "RX/ERI interrupts", "RXI and ERI"),
    5: ("TE", "transmitter", "TXD output"),
    4: ("RE", "receiver", "RXD input"),
}
|
|
|
|
# SSR bit number -> (flag name, human-readable description).
# Keys match SSR_CLEAR_ACTIONS so both tables can be indexed with the same bit.
SSR_FLAGS = {
    7: ("TDRE", "transmit data register empty"),
    6: ("RDRF", "receive-data-full"),
    5: ("ORER", "overrun error"),
    4: ("FER", "framing error"),
    3: ("PER", "parity error"),
}
|
|
|
|
# SSR bit number -> (event action name, comment template).
# The template contains a "{channel}" placeholder that _ssr_events fills in
# with str.format(channel=...), so the braces must be kept literally.
SSR_CLEAR_ACTIONS = {
    7: (
        "clear_tdre",
        "clear {channel} TDRE after TDR write; TXI can fire again when hardware reasserts TDRE",
    ),
    6: (
        "clear_rdrf",
        "clear {channel} RDRF with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
    ),
    5: (
        "clear_orer",
        "clear {channel} ORER with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
    ),
    4: (
        "clear_fer",
        "clear {channel} FER with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
    ),
    3: (
        "clear_per",
        "clear {channel} PER with SSR R/(W)* semantics: write 0 clears latched hardware flag, write 1 preserves hardware-owned state",
    ),
}
|
|
|
|
|
|
def analyze_sci_protocol(
    instructions: Mapping[int, Instruction] | Iterable[Instruction],
) -> dict[str, object]:
    """Collect SCI protocol events for a set of decoded instructions.

    Instructions are visited in ascending address order.  For each one the
    per-instruction register events are recorded first, followed by any
    TDRE polling-loop events attached to that address.

    Args:
        instructions: Either an address-keyed mapping of instructions or any
            iterable of instructions.

    Returns:
        A dict with the keys ``manual_references``, ``channels`` (one
        ``{"events": [...]}`` entry per SCI channel), ``events`` (all events in
        recording order), ``annotations`` (address -> "; "-joined unique
        comments, ordered by address) and ``instructions`` (address -> list of
        events for that instruction).
    """
    ordered = _instruction_sequence(instructions)
    all_events: list[dict[str, object]] = []
    per_channel: dict[str, dict[str, list[dict[str, object]]]] = {
        name: {"events": []} for name in SCI_CHANNELS
    }
    per_instruction: dict[int, list[dict[str, object]]] = {}
    comment_parts: dict[int, list[str]] = {}

    def remember(event: dict[str, object]) -> None:
        # One shallow copy is shared by all three indexes.
        entry = dict(event)
        owner = int(entry["address"])
        all_events.append(entry)
        per_channel[str(entry["channel"])]["events"].append(entry)
        per_instruction.setdefault(owner, []).append(entry)

        note = entry.get("comment")
        if not note:
            return
        text = str(note)
        known = comment_parts.setdefault(owner, [])
        if text not in known:
            # Keep first-seen order while dropping duplicate comments.
            known.append(text)

    polling_events = _tdre_wait_events_by_address(ordered)
    for ins in ordered:
        for event in _instruction_events(ins) + polling_events.get(ins.address, []):
            remember(event)

    joined_comments: dict[int, str] = {}
    for address in sorted(comment_parts):
        joined_comments[address] = "; ".join(comment_parts[address])

    return {
        "manual_references": MANUAL_REFERENCES,
        "channels": per_channel,
        "events": all_events,
        "annotations": joined_comments,
        "instructions": per_instruction,
    }
|
|
|
|
|
|
def sci_protocol_comment_for_instruction(analysis: Mapping[str, object] | None, address: int) -> str:
    """Return the SCI annotation text for ``address``, or ``""`` if there is none.

    An empty string is returned when ``analysis`` is missing or empty, when its
    ``"annotations"`` entry is not a mapping, or when the stored comment for
    the address is absent or falsy.
    """
    table = analysis.get("annotations") if analysis else None
    if isinstance(table, Mapping):
        text = table.get(address)
        if text:
            return str(text)
    return ""
|
|
|
|
|
|
def sci_protocol_metadata_for_instruction(
    analysis: Mapping[str, object] | None,
    address: int,
) -> list[dict[str, object]]:
    """Return a fresh list of the SCI events recorded for ``address``.

    The result is empty when ``analysis`` is missing or empty, when its
    ``"instructions"`` entry is not a mapping, or when the value stored for the
    address is not a ``list``.
    """
    table = analysis.get("instructions") if analysis else None
    if not isinstance(table, Mapping):
        return []
    recorded = table.get(address)
    if not isinstance(recorded, list):
        return []
    # Shallow copy so callers cannot mutate the stored list.
    return recorded[:]
|
|
|
|
|
|
def sci_protocol_json_payload(analysis: Mapping[str, object] | None) -> dict[str, object]:
    """Reduce an analysis result to its ``manual_references``/``channels``/``events`` keys.

    With a missing or empty ``analysis`` an empty skeleton is returned that
    still lists every SCI channel.  Otherwise each key is taken from
    ``analysis``, falling back to ``MANUAL_REFERENCES``, ``{}`` and ``[]``
    respectively when absent.
    """
    if analysis:
        fallbacks = (
            ("manual_references", MANUAL_REFERENCES),
            ("channels", {}),
            ("events", []),
        )
        return {key: analysis.get(key, fallback) for key, fallback in fallbacks}

    empty_channels = {name: {"events": []} for name in SCI_CHANNELS}
    return {
        "manual_references": MANUAL_REFERENCES,
        "channels": empty_channels,
        "events": [],
    }
|
|
|
|
|
|
def _instruction_events(ins: Instruction) -> list[dict[str, object]]:
    """Return the events for every SCI register that ``ins`` references.

    Each referenced register is dispatched by name to its register-specific
    builder; registers with any other name contribute nothing.
    """
    builders = {
        "TDR": _tdr_events,
        "RDR": _rdr_events,
        "SSR": _ssr_events,
        "SCR": _scr_events,
    }
    collected: list[dict[str, object]] = []
    for register in _referenced_sci_registers(ins):
        build = builders.get(register.name)
        if build is not None:
            collected += build(ins, register)
    return collected
|
|
|
|
|
|
def _tdr_events(ins: Instruction, register: SciRegister) -> list[dict[str, object]]:
    """Return a single ``write_tdr`` event when ``ins`` writes the TDR, else ``[]``."""
    if _access_direction(ins, register.address) == "write":
        note = f"write RS232/SCI byte to {register.channel} TDR for transmission"
        return [_event(ins, register, action="write_tdr", comment=note)]
    return []
|
|
|
|
|
|
def _rdr_events(ins: Instruction, register: SciRegister) -> list[dict[str, object]]:
    """Return a single ``read_rdr`` event when ``ins`` reads the RDR, else ``[]``."""
    if _access_direction(ins, register.address) == "read":
        note = f"read {register.channel} received byte from RDR"
        return [_event(ins, register, action="read_rdr", comment=note)]
    return []
|
|
|
|
|
|
def _ssr_events(ins: Instruction, register: SciRegister) -> list[dict[str, object]]:
    """Return flag-clear events for an instruction that writes the SSR.

    Two write shapes are recognised:

    * ``BCLR #bit`` on a known flag bit -> one event for that flag (no
      ``value`` field).
    * A write of a known immediate value (see ``_written_immediate_value``)
      -> one event per known flag whose bit is 0 in that value, each carrying
      the written ``value``.

    Anything else, including non-write accesses, yields ``[]``.
    """
    if _access_direction(ins, register.address) != "write":
        return []

    def cleared(flag_bit: int, written: int | None) -> dict[str, object]:
        # _event drops extras that are None, so the BCLR form simply has no "value".
        action, template = SSR_CLEAR_ACTIONS[flag_bit]
        flag, description = SSR_FLAGS[flag_bit]
        return _event(
            ins,
            register,
            action=action,
            bit=flag_bit,
            flag=flag,
            description=description,
            value=written,
            comment=template.format(channel=register.channel),
        )

    target_bit = _immediate_bit(ins.operands)
    if _mnemonic_root(ins.mnemonic) == "BCLR" and target_bit in SSR_CLEAR_ACTIONS:
        return [cleared(target_bit, None)]

    written = _written_immediate_value(ins)
    if written is None:
        return []

    # A zero bit in the written value clears the corresponding flag.
    return [
        cleared(flag_bit, written)
        for flag_bit in SSR_CLEAR_ACTIONS
        if not written & (1 << flag_bit)
    ]
|
|
|
|
|
|
def _scr_events(ins: Instruction, register: SciRegister) -> list[dict[str, object]]:
    """Return enable/disable events for an instruction that writes the SCR.

    ``BSET``/``BCLR`` on a known control bit yields one event for that bit.
    A write of a known immediate value yields one event per known control bit,
    enabled or disabled according to that bit in the value.  Other accesses
    yield ``[]``.
    """
    if _access_direction(ins, register.address) != "write":
        return []

    target_bit = _immediate_bit(ins.operands)
    operation = _mnemonic_root(ins.mnemonic)
    if operation in {"BCLR", "BSET"} and target_bit in SCR_CONTROL_BITS:
        turns_on = operation == "BSET"
        return [_scr_bit_event(ins, register, target_bit, enabled=turns_on)]

    written = _written_immediate_value(ins)
    if written is None:
        return []

    produced: list[dict[str, object]] = []
    for control_bit in SCR_CONTROL_BITS:
        is_set = bool(written & (1 << control_bit))
        produced.append(
            _scr_bit_event(ins, register, control_bit, enabled=is_set, value=written)
        )
    return produced
|
|
|
|
|
|
def _scr_bit_event(
    ins: Instruction,
    register: SciRegister,
    bit: int,
    enabled: bool,
    value: int | None = None,
) -> dict[str, object]:
    """Build the event describing one SCR control bit being enabled or disabled.

    Args:
        ins: Instruction performing the write.
        register: The SCR register being written.
        bit: Control bit number; must be a key of ``SCR_CONTROL_BITS``
            (a ``KeyError`` is raised otherwise).
        enabled: Whether the write sets (True) or clears (False) the bit.
        value: Full immediate value written, when known.

    Returns:
        The event dict, with an action of the form ``enable_<noun>`` or
        ``disable_<noun>``.
    """
    bit_name, noun, interrupt_source = SCR_CONTROL_BITS[bit]
    verb = "enable" if enabled else "disable"

    # Comment tail per bit; any bit other than 7/6/5 is described as the receiver.
    subjects = {
        7: "TX interrupt (TIE); gates TXI when hardware sets TDRE",
        6: "receive and receive-error interrupts (RIE)",
        5: "transmitter (TE)",
    }
    subject = subjects.get(bit, "receiver (RE)")

    # "RX/ERI interrupts" -> "rx_eri_interrupts"
    action_suffix = noun.translate(str.maketrans("/ ", "__")).lower()

    return _event(
        ins,
        register,
        action=f"{verb}_{action_suffix}",
        bit=bit,
        bit_name=bit_name,
        enabled=enabled,
        interrupt_source=interrupt_source,
        value=value,
        comment=f"{verb} {register.channel} {subject}",
    )
|
|
|
|
|
|
def _tdre_wait_events_by_address(ordered: list[Instruction]) -> dict[int, list[dict[str, object]]]:
    """Find TDRE polling loops and return their events keyed by instruction address.

    A loop is recognised as a ``BTST #7`` on exactly one SCI register, which
    must be an SSR, followed within the next four instructions by a ``BEQ``
    that targets the ``BTST``.  Each loop yields a ``wait_for_tdre`` event on
    the test and a ``tdre_wait_branch`` event on the branch.
    """
    found: dict[int, list[dict[str, object]]] = {}
    for position, ins in enumerate(ordered):
        register = _single_sci_register(ins)
        if not (register is not None and register.name == "SSR"):
            continue
        tests_tdre = (
            _mnemonic_root(ins.mnemonic) == "BTST" and _immediate_bit(ins.operands) == 7
        )
        if not tests_tdre:
            continue
        loop_branch = _next_beq_back_to(ordered, position, ins.address)
        if loop_branch is None:
            continue

        test_event = _event(
            ins,
            register,
            action="wait_for_tdre",
            bit=7,
            flag="TDRE",
            branch_address=loop_branch.address,
            branch_target=ins.address,
            comment=f"wait for {register.channel} transmit data register empty (TDRE=1)",
        )
        branch_event = _event(
            loop_branch,
            register,
            action="tdre_wait_branch",
            bit=7,
            flag="TDRE",
            test_address=ins.address,
            branch_target=ins.address,
            comment=f"repeat {register.channel} transmit-empty wait while TDRE=0",
        )
        for owner, item in ((ins.address, test_event), (loop_branch.address, branch_event)):
            found.setdefault(owner, []).append(item)
    return found
|
|
|
|
|
|
def _next_beq_back_to(ordered: list[Instruction], index: int, target: int) -> Instruction | None:
    """Return the first ``BEQ`` among the four instructions after ``index`` that targets ``target``.

    Returns ``None`` when no such branch exists in that window.
    """
    window = ordered[index + 1 : index + 5]
    return next(
        (
            later
            for later in window
            if _mnemonic_root(later.mnemonic) == "BEQ" and target in later.targets
        ),
        None,
    )
|
|
|
|
|
|
def _event(
    ins: Instruction,
    register: SciRegister,
    *,
    action: str,
    comment: str,
    **extra: object,
) -> dict[str, object]:
    """Build one event dict for ``ins`` acting on ``register``.

    The base fields identify the instruction, the action and the register.
    Keyword extras are merged in afterwards, in the order given, and an extra
    whose value is ``None`` is omitted.  The ``"manual"`` field refers to the
    module-level ``MANUAL_REFERENCES`` list itself, not a copy.
    """
    record: dict[str, object] = dict(
        address=ins.address,
        instruction=ins.text,
        action=action,
        channel=register.channel,
        register=register.name,
        register_address=register.address,
        register_address_hex=h16(register.address),
        comment=comment,
        manual=MANUAL_REFERENCES,
    )
    for key, value in extra.items():
        if value is not None:
            record[key] = value
    return record
|
|
|
|
|
|
def _instruction_sequence(
|
|
instructions: Mapping[int, Instruction] | Iterable[Instruction],
|
|
) -> list[Instruction]:
|
|
values = instructions.values() if isinstance(instructions, Mapping) else instructions
|
|
return sorted(values, key=lambda ins: ins.address)
|
|
|
|
|
|
def _referenced_sci_registers(ins: Instruction) -> list[SciRegister]:
    """Return the distinct SCI registers among ``ins.references``, in first-seen order."""
    found: dict[int, SciRegister] = {}
    for address in ins.references:
        register = SCI_REGISTER_BY_ADDRESS.get(address)
        if register is not None and register.address not in found:
            # Dict insertion order preserves the order of first reference.
            found[register.address] = register
    return list(found.values())
|
|
|
|
|
|
def _single_sci_register(ins: Instruction) -> SciRegister | None:
    """Return the SCI register ``ins`` references if there is exactly one, else ``None``."""
    matches = _referenced_sci_registers(ins)
    if len(matches) != 1:
        return None
    (only,) = matches
    return only
|
|
|
|
|
|
def _access_direction(ins: Instruction, address: int) -> str | None:
    """Classify how ``ins`` accesses ``address``: ``"read"``, ``"write"`` or ``None``.

    Most mnemonics have a fixed direction.  For ``MOV:G``, ``MOV:S`` and
    ``MOVTPE`` the direction depends on which operand mentions the address:
    the destination operand is checked first (write), then the source (read).
    ``None`` is returned for unknown mnemonics and when neither operand
    mentions the address.
    """
    reads = ("BTST", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST", "MOV:L", "MOV:F", "LDC")
    writes = (
        "BCLR", "BNOT", "BSET", "CLR", "NEG", "NOT",
        # Read-modify-write arithmetic/logic forms are classified as writes.
        "ADD:Q", "ADD:G", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR",
        "STC",
    )
    fixed = {**dict.fromkeys(reads, "read"), **dict.fromkeys(writes, "write")}

    root = _mnemonic_root(ins.mnemonic)
    if root in fixed:
        return fixed[root]
    if root not in ("MOV:G", "MOV:S", "MOVTPE"):
        return None

    source, destination = _source_destination_operands(ins.operands)
    for operand, direction in ((destination, "write"), (source, "read")):
        if _operand_mentions_address_or_register(operand, address):
            return direction
    return None
|
|
|
|
|
|
def _source_destination_operands(operands: str) -> tuple[str, str]:
|
|
if "," not in operands:
|
|
operand = operands.strip()
|
|
return "", operand
|
|
source, destination = operands.rsplit(",", 1)
|
|
return source.strip(), destination.strip()
|
|
|
|
|
|
def _operand_mentions_address_or_register(operand: str, address: int) -> bool:
    """Report whether ``operand`` textually refers to ``address``.

    The comparison is case-insensitive.  A match is either the symbolic name
    ``<CHANNEL>_<REGISTER>`` of the SCI register at that address, or the
    address written as four hex digits after an ``H'`` or ``0x`` prefix.
    """
    haystack = operand.upper()
    known = SCI_REGISTER_BY_ADDRESS.get(address)
    if known and f"{known.channel}_{known.name}" in haystack:
        return True
    digits = f"{address:04X}"
    return any(prefix + digits in haystack for prefix in ("H'", "0X"))
|
|
|
|
|
|
def _immediate_bit(operands: str) -> int | None:
    """Return the bit number from a leading ``#n`` source operand, or ``None``.

    ``None`` is returned when the source operand is not an immediate, when it
    cannot be parsed (``parse_int`` raises ``ValueError``), or when the number
    is outside 0..7.
    """
    first_operand = _source_destination_operands(operands)[0]
    if first_operand[:1] != "#":
        return None
    try:
        number = parse_int(first_operand[1:])
    except ValueError:
        return None
    if number < 0 or number > 7:
        return None
    return number
|
|
|
|
|
|
def _written_immediate_value(ins: Instruction) -> int | None:
    """Return the byte value ``ins`` is known to write, or ``None`` if unknown.

    ``CLR`` always writes 0.  ``MOV:G``/``MOV:S`` with an immediate (``#...``)
    source write that immediate masked to 8 bits.  Every other case, including
    an unparsable immediate, yields ``None``.
    """
    operation = _mnemonic_root(ins.mnemonic)
    if operation == "CLR":
        return 0
    if operation in {"MOV:G", "MOV:S"}:
        first_operand = _source_destination_operands(ins.operands)[0]
        if first_operand.startswith("#"):
            try:
                return parse_int(first_operand[1:]) & 0xFF
            except ValueError:
                pass
    return None
|
|
|
|
|
|
def _mnemonic_root(mnemonic: str) -> str:
|
|
return mnemonic.rsplit(".", 1)[0].upper()
|