45 lines
1.5 KiB
Python
45 lines
1.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
|
|
from .frames import HEARTBEAT_FRAME, NEUTRAL_ACK_FRAME, frame_checksum_ok
|
|
|
|
|
|
# First-byte values that AckPolicy.decide classifies as report commands.
REPORT_COMMAND_BYTES = frozenset((0x00, 0x01, 0x02))
|
|
|
|
|
|
@dataclass(frozen=True)
class AckDecision:
    """Immutable outcome of an ACK decision: whether, with what, and why."""

    # True when an ACK should be sent.
    should_ack: bool
    # Frame associated with the decision; defaults to NEUTRAL_ACK_FRAME.
    frame: bytes = NEUTRAL_ACK_FRAME
    # Short machine-readable tag explaining the decision; empty by default.
    reason: str = ""
|
|
|
|
|
|
@dataclass(frozen=True)
class AckPolicy:
    """Decides whether an RCP-origin frame should get a continuation ACK."""

    # Frame placed in every AckDecision produced by this policy.
    ack_frame: bytes = NEUTRAL_ACK_FRAME
    # When False, frames whose first byte is in REPORT_COMMAND_BYTES are refused.
    ack_reports: bool = True
    # When False, frames equal to HEARTBEAT_FRAME are refused.
    ack_heartbeats: bool = True
    # When True, otherwise-unmatched frames labelled "checksum_ok_unlabeled"
    # are acknowledged.
    ack_unlabeled_checksum_frames: bool = True

    def decide(self, frame: bytes, label: str = "") -> AckDecision:
        """Classify *frame* and return the matching ACK decision.

        Checks run in this order, and the first match wins:

        1. checksum failure           -> no ACK, ``"checksum_bad"``
        2. frame equals the heartbeat -> ACK only if ``ack_heartbeats``
        3. first byte is a report cmd -> ACK only if ``ack_reports``
        4. label is ``"checksum_ok_unlabeled"`` and
           ``ack_unlabeled_checksum_frames`` is set -> ACK
        5. anything else              -> no ACK, ``"non_report_cmd_XX"``

        Args:
            frame: Raw frame bytes to classify.
            label: Optional caller-supplied tag; only the value
                ``"checksum_ok_unlabeled"`` is inspected.

        Returns:
            An ``AckDecision`` carrying ``self.ack_frame`` and a reason tag.
        """

        def verdict(acknowledge: bool, why: str) -> AckDecision:
            # Every outcome carries this policy's configured ACK frame.
            return AckDecision(
                should_ack=acknowledge, frame=self.ack_frame, reason=why
            )

        if not frame_checksum_ok(frame):
            return verdict(False, "checksum_bad")

        if frame == HEARTBEAT_FRAME:
            return (
                verdict(True, "heartbeat_report")
                if self.ack_heartbeats
                else verdict(False, "heartbeat_ignored")
            )

        # NOTE(review): frame[0] needs a non-empty frame; presumably
        # frame_checksum_ok rejects empty input -- confirm.
        if frame[0] in REPORT_COMMAND_BYTES:
            return (
                verdict(True, f"report_cmd_{frame[0]:02X}")
                if self.ack_reports
                else verdict(False, "reports_disabled")
            )

        if label == "checksum_ok_unlabeled" and self.ack_unlabeled_checksum_frames:
            return verdict(True, "unlabeled_checksum_ok")

        return verdict(False, f"non_report_cmd_{frame[0]:02X}")
|