1
0

bench test updates

This commit is contained in:
Aiden
2026-05-26 00:48:28 +10:00
parent 7c211f8112
commit 443789d6ae
11 changed files with 832 additions and 2 deletions

View File

@@ -23,12 +23,22 @@ COMMAND7_REPEAT_FRAME = bytes.fromhex("07000000005D")
@dataclass
class FrameDetector:
sync_mode: str = "checksum"
buffer: bytearray = field(default_factory=bytearray)
frames: list[bytes] = field(default_factory=list)
labels: Counter[str] = field(default_factory=Counter)
dropped_bytes: int = 0
resync_events: int = 0
def feed(self, data: bytes) -> list[tuple[bytes, str]]:
self.buffer.extend(data)
if self.sync_mode == "fixed":
return self._feed_fixed()
if self.sync_mode != "checksum":
raise ValueError(f"unknown frame sync mode {self.sync_mode!r}")
return self._feed_checksum_resync()
def _feed_fixed(self) -> list[tuple[bytes, str]]:
detected = []
while len(self.buffer) >= FRAME_LENGTH:
frame = bytes(self.buffer[:FRAME_LENGTH])
@@ -40,6 +50,35 @@ class FrameDetector:
detected.append((frame, label))
return detected
def _feed_checksum_resync(self) -> list[tuple[bytes, str]]:
detected = []
while len(self.buffer) >= FRAME_LENGTH:
offset = _next_sync_offset(self.buffer)
if offset is None:
self._drop_unsynced_prefix(len(self.buffer) - (FRAME_LENGTH - 1))
break
if offset:
self._drop_unsynced_prefix(offset)
frame = bytes(self.buffer[:FRAME_LENGTH])
if not frame_checksum_ok(frame):
self._drop_unsynced_prefix(1)
continue
del self.buffer[:FRAME_LENGTH]
label = label_frame(frame)
self.frames.append(frame)
if label:
self.labels[label] += 1
detected.append((frame, label))
return detected
def _drop_unsynced_prefix(self, count: int) -> None:
count = max(0, min(count, len(self.buffer)))
if not count:
return
del self.buffer[:count]
self.dropped_bytes += count
self.resync_events += 1
class BenchLogger:
def __init__(self, path: Path, stdout: TextIO = sys.stdout) -> None:
@@ -113,10 +152,46 @@ def label_frame(frame: bytes) -> str:
if label:
return label
if frame_checksum_ok(frame):
if frame[0] == 0x04:
return "table_readback_candidate"
if frame[0] == 0x07:
return "visible_report_candidate"
return "checksum_ok_unlabeled"
return "checksum_bad_or_unaligned"
def _next_sync_offset(buffer: bytearray) -> int | None:
scored_offsets: list[tuple[int, int]] = []
for offset in range(0, len(buffer) - FRAME_LENGTH + 1):
frame = bytes(buffer[offset : offset + FRAME_LENGTH])
if not frame_checksum_ok(frame):
continue
if offset == 0 and not _looks_like_shifted_heartbeat(frame):
return 0
label = label_frame(frame)
scored_offsets.append((_sync_score(frame, label), offset))
if not scored_offsets:
return None
return min(scored_offsets)[1]
def _sync_score(frame: bytes, label: str) -> int:
if label and label not in {"checksum_ok_unlabeled", "checksum_bad_or_unaligned"}:
return 0
if frame[0] in {0x00, 0x02, 0x04, 0x07}:
return 100
return 200
def _looks_like_shifted_heartbeat(frame: bytes) -> bool:
return frame in {
bytes.fromhex("00000080DA00"),
bytes.fromhex("000080DA0000"),
bytes.fromhex("0080DA000000"),
bytes.fromhex("80DA00000000"),
}
def default_log_path() -> Path:
return Path("captures") / f"bench-connect-lcd-sequence-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
@@ -141,6 +216,7 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--post-sequence-read", type=float, default=3.0, help="seconds to listen after the sequence")
parser.add_argument("--repeat", type=int, default=1, help="times to send the frame sequence in the same power session")
parser.add_argument("--frame", action="append", type=parse_frame, help="override preset with a custom frame; repeatable")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--two-frame", action="store_true", help="send only the first two CONNECT candidate frames")
parser.add_argument("--command7-after", action="store_true", help="send command-7 repeat probe after the sequence")
parser.add_argument("--pre-sequence-drain", type=float, default=0.250, help="seconds to drain/log RX immediately before sending")
@@ -169,7 +245,7 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector()
detector = FrameDetector(sync_mode=args.sync)
try:
logger.emit("CONNECT LCD bench sequence")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}")
@@ -186,7 +262,7 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
_relay_command(relay, args.power_off_command, logger)
time.sleep(args.off_seconds)
device.reset_input_buffer()
detector = FrameDetector()
detector = FrameDetector(sync_mode=args.sync)
_relay_command(relay, args.power_on_command, logger)
else:
device.reset_input_buffer()
@@ -256,9 +332,16 @@ def _read_for(device, detector: FrameDetector, logger: BenchLogger, seconds: flo
waiting = getattr(device, "in_waiting", 0)
data = device.read(waiting or 1)
if data:
dropped_before = detector.dropped_bytes
logger.chunk("RX", data)
for frame, label in detector.feed(data):
logger.event(f"DETECT {label} {format_frame(frame)}")
dropped_now = detector.dropped_bytes - dropped_before
if dropped_now:
logger.event(
f"RESYNC dropped_bytes={dropped_now} total_dropped={detector.dropped_bytes} "
f"buffered={len(detector.buffer)}"
)
def _wait_for_ready(
@@ -310,6 +393,7 @@ def _summary(detector: FrameDetector, logger: BenchLogger) -> None:
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} trailing_unframed_bytes={len(detector.buffer)}")
logger.emit(f"resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
for label, count in sorted(detector.labels.items()):
logger.emit(f"{label}={count}")