from __future__ import annotations

from dataclasses import dataclass

from .frames import HEARTBEAT_FRAME, NEUTRAL_ACK_FRAME, frame_checksum_ok

# First-byte values that `AckPolicy.decide` classifies as report commands.
REPORT_COMMAND_BYTES = frozenset({0x00, 0x01, 0x02})


@dataclass(frozen=True)
class AckDecision:
    """Result of evaluating one frame against an :class:`AckPolicy`."""

    # True when the evaluated frame should be acknowledged.
    should_ack: bool
    # ACK frame attached to the decision; `AckPolicy.decide` always fills in
    # the policy's `ack_frame`, whether or not `should_ack` is set.
    frame: bytes = NEUTRAL_ACK_FRAME
    # Short tag naming the rule that produced the decision.
    reason: str = ""


@dataclass(frozen=True)
class AckPolicy:
    """Decides whether an RCP-origin frame should get a continuation ACK."""

    # Frame placed into every AckDecision produced by this policy.
    ack_frame: bytes = NEUTRAL_ACK_FRAME
    # Acknowledge frames whose first byte is in REPORT_COMMAND_BYTES.
    ack_reports: bool = True
    # Acknowledge frames equal to HEARTBEAT_FRAME.
    ack_heartbeats: bool = True
    # Acknowledge non-report frames labelled "checksum_ok_unlabeled".
    ack_unlabeled_checksum_frames: bool = True

    def decide(self, frame: bytes, label: str = "") -> AckDecision:
        """Classify *frame* and return the matching ACK decision.

        Rules are applied in order; the first match wins:

        1. ``frame_checksum_ok(frame)`` is false -> no ACK, ``"checksum_bad"``.
        2. ``frame == HEARTBEAT_FRAME`` -> ACK with ``"heartbeat_report"`` when
           ``ack_heartbeats`` is set, otherwise no ACK, ``"heartbeat_ignored"``.
        3. Zero-length frame -> no ACK, ``"empty_frame"``.
        4. First byte in ``REPORT_COMMAND_BYTES`` -> ACK with
           ``"report_cmd_XX"`` when ``ack_reports`` is set, otherwise no ACK,
           ``"reports_disabled"``.
        5. ``label == "checksum_ok_unlabeled"`` and
           ``ack_unlabeled_checksum_frames`` is set -> ACK,
           ``"unlabeled_checksum_ok"``.
        6. Anything else -> no ACK, ``"non_report_cmd_XX"``.

        ``XX`` is the first byte of the frame as two uppercase hex digits.

        Args:
            frame: Raw frame bytes to evaluate.
            label: Optional classification label supplied by the caller; only
                the value ``"checksum_ok_unlabeled"`` affects the outcome.

        Returns:
            An ``AckDecision`` whose ``frame`` is always this policy's
            ``ack_frame``.
        """
        if not frame_checksum_ok(frame):
            return AckDecision(False, self.ack_frame, "checksum_bad")

        if frame == HEARTBEAT_FRAME:
            if self.ack_heartbeats:
                return AckDecision(True, self.ack_frame, "heartbeat_report")
            return AckDecision(False, self.ack_frame, "heartbeat_ignored")

        # Guard the first-byte lookup below: indexing a zero-length frame
        # would raise IndexError. Placed after the checksum and heartbeat
        # rules so their outcomes are unaffected.
        if not frame:
            return AckDecision(False, self.ack_frame, "empty_frame")

        command = frame[0]
        if command in REPORT_COMMAND_BYTES:
            if self.ack_reports:
                return AckDecision(
                    True, self.ack_frame, f"report_cmd_{command:02X}"
                )
            return AckDecision(False, self.ack_frame, "reports_disabled")

        if label == "checksum_ok_unlabeled" and self.ack_unlabeled_checksum_frames:
            return AckDecision(True, self.ack_frame, "unlabeled_checksum_ok")

        return AckDecision(False, self.ack_frame, f"non_report_cmd_{command:02X}")