1
0
Files
2026-05-25 23:28:14 +10:00

259 lines
9.3 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable
from .x24164 import X24164Bus
# Bit masks within the P9 data (DR) and data-direction (DDR) registers.
# P9Bus forwards bit 7 to the X24164 model as the SDA level and bit 1 as the
# SCL level; a set bit 7 in DDR means the CPU is driving that line.
P9_ACK_BIT = 1 << 7  # 0x80
P9_STROBE_BIT = 1 << 1  # 0x02
@dataclass(frozen=True)
class P9StrobeEvent:
    """Immutable record of one transition of the P9 strobe bit.

    P9Bus.write_dr appends one of these whenever a DR write changes the
    strobe bit (P9_STROBE_BIT).
    """

    # "rising" when the strobe bit went 0 -> 1, "falling" for 1 -> 0.
    edge: str
    # Data-direction register value at the time of the write.
    ddr: int
    # Data register latch value after the write.
    dr: int
    # Bit 7 of the new latch value, as 0 or 1.
    data_bit: int
    # True when DDR bit 7 is set, i.e. bit 7 is configured as an output.
    bit7_output: bool
@dataclass(frozen=True)
class P9TraceEvent:
    """Immutable trace record describing one event on the P9 bus model.

    ``kind``, ``ddr`` and ``dr`` are always present; the remaining fields are
    optional details and are omitted from the rendered line when ``None``.
    """

    kind: str
    ddr: int
    dr: int
    value: int | None = None
    bit: int | None = None
    source: str | None = None
    success: bool | None = None
    queue_depth: int | None = None
    message: str | None = None

    def line(self) -> str:
        """Render the event as a single space-separated text line.

        The line always starts with ``kind ddr=XX dr=XX`` (two-digit upper-case
        hex).  Optional fields follow in the fixed order value, bit, success,
        source, queued, message; ``message`` is appended verbatim.
        """
        pieces = [self.kind, f"ddr={self.ddr:02X}", f"dr={self.dr:02X}"]
        # (field value, format template) pairs in output order.
        optional_fields = (
            (self.value, "value={:02X}"),
            (self.bit, "bit={}"),
            (None if self.success is None else int(self.success), "success={}"),
            (self.source, "source={}"),
            (self.queue_depth, "queued={}"),
            (self.message, "{}"),
        )
        pieces.extend(
            template.format(item)
            for item, template in optional_fields
            if item is not None
        )
        return " ".join(pieces)
class P9Bus:
    """Small model for the ROM's P9 bit-banged serial handshake.

    Board tracing ties P91/P97 to X24164 SCL/SDA. The legacy bit queue is
    retained for tests and exploratory scripts, while the X24164 model now
    drives SDA during recognized EEPROM transactions.

    Register writes, reads and the ``fast_*`` helpers are all logged to
    ``trace_events``; events produced by the attached X24164 model are copied
    into the same list (kind ``"x24164"``) after each interaction with it.
    """

    def __init__(
        self,
        ddr: int = 0x00,
        dr: int = 0x00,
        input_bits: Iterable[int] = (),
        wrapper_results: Iterable[bool] = (),
        x24164_bus: X24164Bus | None = None,
    ) -> None:
        """Create the bus model.

        Args:
            ddr: Initial data-direction register value (masked to 8 bits).
            dr: Initial data register latch value (masked to 8 bits).
            input_bits: Bits returned on bit 7 by successive ``read_dr`` calls
                while bit 7 is an input; each is normalised to 0 or 1.
            wrapper_results: Pre-queued results for ``consume_wrapper_result``;
                their source is reported as ``"initial"``.
            x24164_bus: EEPROM bus model to attach; a fresh ``X24164Bus`` is
                created when omitted.
        """
        self.ddr = ddr & 0xFF
        self.dr_latch = dr & 0xFF
        self.input_bits: list[int] = [1 if bit else 0 for bit in input_bits]
        self.default_input_bit = 0
        self.wrapper_results: list[bool] = [bool(result) for result in wrapper_results]
        # Parallel to wrapper_results: one source label per queued result.
        self.wrapper_sources: list[str] = ["initial"] * len(self.wrapper_results)
        self.default_wrapper_success = False
        self.strobe_edges: list[P9StrobeEvent] = []
        self.trace_events: list[P9TraceEvent] = []
        self.transmitted_bits: list[int] = []
        self.byte_candidates: list[int] = []
        self.x24164_bus = x24164_bus if x24164_bus is not None else X24164Bus()
        # Number of X24164 trace events already mirrored into trace_events.
        self._x24164_trace_index = 0

    def write_ddr(self, value: int) -> int:
        """Store ``value`` (masked to 8 bits) in DDR, trace it, and return it."""
        self.ddr = value & 0xFF
        self._trace("write_ddr", value=self.ddr)
        return self.ddr

    def write_dr(self, value: int) -> int:
        """Store ``value`` in the DR latch and model its effect on the bus.

        A change of the strobe bit is recorded as a ``P9StrobeEvent``; on a
        rising edge with bit 7 configured as an output, bit 7 is additionally
        recorded as a transmitted bit.  Every write is then shown to the
        X24164 model as an SCL/SDA transition.

        Returns:
            The new latch value (masked to 8 bits).
        """
        previous = self.dr_latch
        self.dr_latch = value & 0xFF
        self._trace("write_dr", value=self.dr_latch)

        # DDR is not modified by a DR write, so one snapshot serves for both
        # the "previous" and "current" line state.
        bit7_output = bool(self.ddr & P9_ACK_BIT)
        previous_scl = bool(previous & P9_STROBE_BIT)
        current_scl = bool(self.dr_latch & P9_STROBE_BIT)

        if previous_scl != current_scl:
            edge = "rising" if current_scl else "falling"
            data_bit = 1 if self.dr_latch & P9_ACK_BIT else 0
            self.strobe_edges.append(
                P9StrobeEvent(edge, self.ddr, self.dr_latch, data_bit, bit7_output)
            )
            self._trace(f"strobe_{edge}", bit=data_bit)
            if current_scl and bit7_output:
                self._record_transmitted_bit(data_bit)

        # When bit 7 is an input the master is not driving SDA, which is
        # reported to the EEPROM model as a high level.
        self.x24164_bus.observe(
            previous_scl=previous_scl,
            previous_master_sda=bool(previous & P9_ACK_BIT) if bit7_output else True,
            current_scl=current_scl,
            current_master_sda=bool(self.dr_latch & P9_ACK_BIT) if bit7_output else True,
            master_sda_output=bit7_output,
        )
        self._append_x24164_trace()
        return self.dr_latch

    def read_ddr(self) -> int:
        """Return the current DDR value."""
        return self.ddr

    def read_dr(self) -> int:
        """Return the DR value as seen by the CPU and trace the read.

        While bit 7 is an input, bit 7 of the result is taken from, in order
        of priority: the queued input bits, the X24164 model's SDA level (when
        it reports one), or ``default_input_bit``.  While bit 7 is an output
        the latch value is returned unchanged.
        """
        value = self.dr_latch
        input_bit = None
        source = None
        if not (self.ddr & P9_ACK_BIT):
            # Query the EEPROM model first, exactly once per read, even when a
            # queued bit ends up taking priority.
            x24164_bit = self.x24164_bus.sda_bit()
            if self.input_bits:
                input_bit = self.input_bits.pop(0)
                source = "queued_bit"
            elif x24164_bit is not None:
                input_bit = x24164_bit
                source = "x24164"
            else:
                input_bit = self.default_input_bit
                source = "default_bit"
            if input_bit:
                value |= P9_ACK_BIT
            else:
                value &= ~P9_ACK_BIT
        value &= 0xFF
        self.trace_events.append(
            P9TraceEvent("read_dr", self.ddr, value, value=value, bit=input_bit, source=source)
        )
        return value

    def queue_input_bits(self, bits: Iterable[int]) -> None:
        """Append ``bits`` (each normalised to 0 or 1) to the input bit queue."""
        self.input_bits.extend(1 if bit else 0 for bit in bits)

    def set_default_input_bit(self, bit: int) -> None:
        """Set the bit used by ``read_dr`` when no other source supplies one."""
        self.default_input_bit = 1 if bit else 0

    def queue_wrapper_results(self, results: Iterable[bool], source: str = "queued_wrapper") -> None:
        """Queue wrapper results, labelling each one with ``source``."""
        for result in results:
            self.wrapper_results.append(bool(result))
            self.wrapper_sources.append(source)

    def consume_wrapper_result(self) -> tuple[bool, str, int]:
        """Pop the next queued wrapper result, or fall back to the default.

        Returns:
            ``(success, source, queue_depth)`` where ``queue_depth`` is the
            number of results still queued after this call.  With an empty
            queue, ``success`` is ``default_wrapper_success`` and ``source``
            is ``"default_success"`` or ``"default_timeout"`` accordingly.
        """
        if self.wrapper_results:
            success = self.wrapper_results.pop(0)
            source = self.wrapper_sources.pop(0) if self.wrapper_sources else "queued_wrapper"
        else:
            success = self.default_wrapper_success
            source = "default_success" if success else "default_timeout"
        queue_depth = len(self.wrapper_results)
        self._trace(
            "wrapper_result",
            value=1 if success else 0,
            success=success,
            source=source,
            queue_depth=queue_depth,
        )
        return success, source, queue_depth

    def trace_lines(self, limit: int | None = None) -> list[str]:
        """Return the rendered trace, optionally only the last ``limit`` lines.

        Args:
            limit: ``None`` for the full trace; otherwise the maximum number
                of most recent events to render.  Zero or a negative number
                yields an empty list.
        """
        if limit is None:
            events = self.trace_events
        elif limit <= 0:
            # Guard explicitly: trace_events[-0:] would be the whole list.
            events = []
        else:
            events = self.trace_events[-limit:]
        return [event.line() for event in events]

    def fast_start(self) -> None:
        """Forward ``fast_start`` to the X24164 model and mirror its trace."""
        self.x24164_bus.fast_start()
        self._append_x24164_trace()

    def fast_stop(self) -> None:
        """Forward ``fast_stop`` to the X24164 model and mirror its trace."""
        self.x24164_bus.fast_stop()
        self._append_x24164_trace()

    def fast_master_ack(self, ack: bool = True) -> None:
        """Forward ``fast_master_ack(ack)`` to the X24164 model and mirror its trace."""
        self.x24164_bus.fast_master_ack(ack)
        self._append_x24164_trace()

    def fast_write_byte(self, value: int) -> bool:
        """Write one byte through the X24164 model's fast path.

        Returns:
            The success flag reported by the model.
        """
        success = self.x24164_bus.fast_write_byte(value)
        self._trace("fast_write_byte", value=value & 0xFF, success=success, source="x24164")
        self._append_x24164_trace()
        return success

    def fast_read_byte(self) -> int | None:
        """Read one byte through the X24164 model's fast path.

        Returns:
            The model's result; a ``fast_read_byte`` trace event is logged
            only when that result is not ``None``.
        """
        value = self.x24164_bus.fast_read_byte()
        if value is not None:
            self._trace("fast_read_byte", value=value, source="x24164")
        self._append_x24164_trace()
        return value

    def fast_read_word(self, address: int) -> tuple[bool, int]:
        """Read a word via the model's ``read_linear_word`` and trace it.

        Returns:
            ``(success, value)`` exactly as reported by the model.
        """
        success, value = self.x24164_bus.read_linear_word(address)
        self._trace(
            "fast_read_word",
            value=(value >> 8) & 0xFF,  # the trace's value field shows bits 15..8 only
            success=success,
            source="x24164",
            message=f"addr={address & 0x0FFF:03X} word={value & 0xFFFF:04X}",
        )
        self._append_x24164_trace()
        return success, value

    def fast_write_word(self, address: int, value: int) -> bool:
        """Write a word via the model's ``write_linear_word`` and trace it.

        Returns:
            The success flag reported by the model.
        """
        success = self.x24164_bus.write_linear_word(address, value)
        self._trace(
            "fast_write_word",
            value=(value >> 8) & 0xFF,  # the trace's value field shows bits 15..8 only
            success=success,
            source="x24164",
            message=f"addr={address & 0x0FFF:03X} word={value & 0xFFFF:04X}",
        )
        self._append_x24164_trace()
        return success

    def clear_x24164_trace(self) -> None:
        """Discard the X24164 model's own trace and reset the mirror cursor."""
        self.x24164_bus.trace_events.clear()
        self._x24164_trace_index = 0

    def _trace(self, kind: str, **fields) -> None:
        """Append a trace event stamped with the current DDR and DR latch."""
        self.trace_events.append(P9TraceEvent(kind, self.ddr, self.dr_latch, **fields))

    def _record_transmitted_bit(self, bit: int) -> None:
        """Record a transmitted bit; after every 8th bit also record a byte.

        The byte is assembled from the last eight bits, first bit as the most
        significant, and appended to ``byte_candidates``.
        """
        self.transmitted_bits.append(bit)
        self._trace("tx_bit", bit=bit)
        if len(self.transmitted_bits) % 8 == 0:
            byte = 0
            for data_bit in self.transmitted_bits[-8:]:
                byte = (byte << 1) | data_bit
            self.byte_candidates.append(byte)
            self._trace("tx_byte", value=byte)

    def _append_x24164_trace(self) -> None:
        """Mirror X24164 trace events added since the last call into the trace."""
        new_events = self.x24164_bus.trace_events[self._x24164_trace_index :]
        self._x24164_trace_index = len(self.x24164_bus.trace_events)
        for event in new_events:
            self._trace("x24164", message=event.line())