gated run
This commit is contained in:
136
h8536/serial_scenario_unexpected.py
Normal file
136
h8536/serial_scenario_unexpected.py
Normal file
@@ -0,0 +1,136 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Iterable, TextIO
|
||||
|
||||
|
||||
DETECT_RE = re.compile(
|
||||
r"^(?P<time>\d\d:\d\d:\d\d\.\d{3})\s+DETECT\s+"
|
||||
r"(?:(?P<label>\S+)\s+)?(?P<frame>(?:[0-9A-Fa-f]{2}\s*){6})\s*$"
|
||||
)
|
||||
|
||||
DEFAULT_IGNORED_LABELS = {
|
||||
"heartbeat",
|
||||
"connect_ok_path_response_candidate",
|
||||
"connect_c0_path_response_candidate",
|
||||
"table_readback_candidate",
|
||||
"gated_active_0004_response_candidate",
|
||||
"gated_active_0004_transition_candidate",
|
||||
}
|
||||
|
||||
KNOWN_FRAME_LABELS = {
|
||||
"00 00 15 80 00 CF": "known_call_button_active_report",
|
||||
"00 00 15 00 00 4F": "known_call_button_inactive_report",
|
||||
"00 00 07 80 00 DD": "known_cam_power_button_report",
|
||||
"01 00 04 00 00 5F": "gated_active_0004_response_candidate",
|
||||
"02 00 04 00 00 5C": "gated_active_0004_transition_candidate",
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DetectedFrame:
|
||||
timestamp: str
|
||||
label: str
|
||||
frame: str
|
||||
line_number: int
|
||||
|
||||
|
||||
def build_arg_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Show unexpected DETECT frames from a serial_scenario capture log."
|
||||
)
|
||||
parser.add_argument("log", type=Path, help="serial_scenario capture log")
|
||||
parser.add_argument(
|
||||
"--include-refresh",
|
||||
action="store_true",
|
||||
help="include heartbeat, table-readback, and CONNECT OK refresh frames",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--show-all",
|
||||
action="store_true",
|
||||
help="print every matching DETECT frame after the summary",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
|
||||
args = build_arg_parser().parse_args(argv)
|
||||
text = args.log.read_text(encoding="utf-8")
|
||||
frames = parse_detected_frames(text.splitlines())
|
||||
interesting = frames if args.include_refresh else filter_expected_refresh(frames)
|
||||
print(format_report(frames, interesting, include_refresh=args.include_refresh, show_all=args.show_all), file=stdout)
|
||||
return 0
|
||||
|
||||
|
||||
def parse_detected_frames(lines: Iterable[str]) -> list[DetectedFrame]:
|
||||
frames: list[DetectedFrame] = []
|
||||
for line_number, line in enumerate(lines, start=1):
|
||||
match = DETECT_RE.match(line.strip())
|
||||
if not match:
|
||||
continue
|
||||
frame = " ".join(match.group("frame").upper().split())
|
||||
label = KNOWN_FRAME_LABELS.get(frame, match.group("label") or "checksum_ok_unlabeled")
|
||||
frames.append(
|
||||
DetectedFrame(
|
||||
timestamp=match.group("time"),
|
||||
label=label,
|
||||
frame=frame,
|
||||
line_number=line_number,
|
||||
)
|
||||
)
|
||||
return frames
|
||||
|
||||
|
||||
def filter_expected_refresh(frames: Iterable[DetectedFrame]) -> list[DetectedFrame]:
|
||||
return [frame for frame in frames if frame.label not in DEFAULT_IGNORED_LABELS]
|
||||
|
||||
|
||||
def format_report(
|
||||
frames: list[DetectedFrame],
|
||||
interesting: list[DetectedFrame],
|
||||
*,
|
||||
include_refresh: bool,
|
||||
show_all: bool,
|
||||
) -> str:
|
||||
lines = [
|
||||
"Serial scenario unexpected-frame summary",
|
||||
f"detected_frames={len(frames)} mode={'all' if include_refresh else 'unexpected-only'}",
|
||||
]
|
||||
label_counts = Counter(frame.label for frame in frames)
|
||||
if label_counts:
|
||||
lines.append("labels:")
|
||||
for label, count in sorted(label_counts.items()):
|
||||
lines.append(f" {label}: {count}")
|
||||
ignored = len(frames) - len(interesting)
|
||||
if not include_refresh:
|
||||
lines.append(f"ignored_expected_refresh={ignored}")
|
||||
lines.append(f"interesting_frames={len(interesting)}")
|
||||
|
||||
frame_counts = Counter((frame.frame, frame.label) for frame in interesting)
|
||||
if frame_counts:
|
||||
lines.append("interesting frame counts:")
|
||||
for (frame_text, label), count in sorted(frame_counts.items(), key=lambda item: (-item[1], item[0][0])):
|
||||
first = next(frame for frame in interesting if frame.frame == frame_text and frame.label == label)
|
||||
lines.append(f" {count:4d}x {frame_text} label={label} first={first.timestamp} line={first.line_number}")
|
||||
else:
|
||||
lines.append("no unexpected checksum-valid frames found")
|
||||
|
||||
if show_all and interesting:
|
||||
lines.append("timeline:")
|
||||
for frame in interesting:
|
||||
lines.append(f" {frame.timestamp} line={frame.line_number} {frame.frame} label={frame.label}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"DetectedFrame",
|
||||
"filter_expected_refresh",
|
||||
"format_report",
|
||||
"main",
|
||||
"parse_detected_frames",
|
||||
]
|
||||
Reference in New Issue
Block a user