from __future__ import annotations from dataclasses import dataclass from .frames import build_frame, frame_checksum_ok from .modules import ModuleDecision, ModuleTx SELECTOR_IRIS_MBLACK_LINK = 0x0013 IRIS_MBLACK_LINK_CLEAR = 0x0000 IRIS_MBLACK_LINK_ACTIVE = 0x4000 def selector_from_frame(frame: bytes) -> int | None: if len(frame) != 6: return None if frame[1] == 0x00: return frame[2] if frame[1] == 0x01: return 0x0080 + frame[2] if frame[1] == 0x02: return 0x0180 + frame[2] return None def value_from_frame(frame: bytes) -> int | None: if len(frame) != 6: return None return ((frame[3] << 8) | frame[4]) & 0xFFFF @dataclass class IrisMblackLinkModule: """Closed-loop CCU side for the IRIS/M.BLACK LINK button/report path.""" mirror_delay: float = 0.050 report_commands: frozenset[int] = frozenset({0x00, 0x01, 0x02}) handled_values: frozenset[int] = frozenset({IRIS_MBLACK_LINK_CLEAR, IRIS_MBLACK_LINK_ACTIVE}) name: str = "iris_mblack_link" def on_rx(self, frame: bytes, label: str = "") -> ModuleDecision | None: if not frame_checksum_ok(frame) or frame[0] not in self.report_commands: return None selector = selector_from_frame(frame) if selector != SELECTOR_IRIS_MBLACK_LINK: return None value = value_from_frame(frame) if value not in self.handled_values: return None state = "active" if value == IRIS_MBLACK_LINK_ACTIVE else "clear" ack = build_frame(0x05, SELECTOR_IRIS_MBLACK_LINK, 0x0000) mirror = build_frame(0x00, SELECTOR_IRIS_MBLACK_LINK, value) return ModuleDecision( tx=( ModuleTx(ack, f"{self.name} ack selector=0x0013 value=0x{value:04X}"), ModuleTx( mirror, f"{self.name} mirror {state} selector=0x0013 value=0x{value:04X}", delay=self.mirror_delay, ), ), suppress_default_ack=True, reason=f"{self.name}_{state}_report", )