1
0

testing around OK state

This commit is contained in:
Aiden
2026-05-26 14:01:10 +10:00
parent 4e0ef92e25
commit 74a2e2fd2c
5 changed files with 1049 additions and 0 deletions

387
h8536/connect_ok_matrix.py Normal file
View File

@@ -0,0 +1,387 @@
from __future__ import annotations
import argparse
import json
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from itertools import permutations
from pathlib import Path
from typing import Any, TextIO
from .bench_connect_lcd import (
BenchLogger,
COMMAND7_REPEAT_FRAME,
FrameDetector,
add_serial_format_args,
_import_serial,
open_device_serial,
_read_for,
_relay_command,
_relay_settle,
_send_frame,
_wait_for_ready,
format_frame,
frame_checksum_ok,
serial_format_label,
)
FRAME_40_DXC = bytes.fromhex("04000040001E")
FRAME_80_OK = bytes.fromhex("0400008000DE")
FRAME_C0_PRIORITY = bytes.fromhex("040000C0009E")
NAMED_FRAMES = {
"40": FRAME_40_DXC,
"80": FRAME_80_OK,
"c0": FRAME_C0_PRIORITY,
}
@dataclass(frozen=True)
class MatrixCase:
name: str
frames: tuple[bytes, ...]
gap: float = 0.150
repeat: int = 1
post_read: float = 3.0
note: str = ""
def default_log_path(suite: str) -> Path:
safe_suite = "".join(char if char.isalnum() or char in "-_" else "-" for char in suite)
return Path("captures") / f"connect-ok-matrix-{safe_suite}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Run a reproducibility matrix for the CONNECT: OK bench behavior."
)
parser.add_argument("--suite", choices=("baseline", "minimal", "single", "pair", "order", "gap", "repeat", "hold", "all"), default="minimal")
parser.add_argument("--case", action="append", help="run only matching case names; repeatable")
parser.add_argument("--limit", type=int, help="run only the first N selected cases")
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
add_serial_format_args(parser)
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not power-cycle between cases")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold DUT power off between cases")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat after power-on")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before sending")
parser.add_argument("--require-ready", action="store_true", help="abort a case if readiness heartbeats are not observed")
parser.add_argument("--pre-case-drain", type=float, default=0.250, help="seconds to drain/log RX before sending each case")
parser.add_argument("--post-case-read", type=float, default=3.0, help="default seconds to listen after each case")
parser.add_argument("--default-gap", type=float, default=0.150, help="default seconds to listen between frames")
parser.add_argument("--gaps", default="0.010,0.050,0.150,0.700,1.500", help="comma-separated gaps for --suite gap")
parser.add_argument("--repeat-counts", default="1,2,4", help="comma-separated repeat counts for --suite repeat")
parser.add_argument("--hold-seconds", default="3,8,20", help="comma-separated post-read times for --suite hold")
parser.add_argument("--command7-after", action="store_true", help="send command-7 previous-frame probe after each case")
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--prompt-observation", action="store_true", help="prompt for observed LCD/lamp state after each case")
parser.add_argument("--pause-between-cases", action="store_true", help="wait for Enter before starting the next case")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--result-json", type=Path, help="write machine-readable case summary")
parser.add_argument("--dry-run", action="store_true", help="print selected cases without opening serial ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
cases = select_cases(args)
log_path = args.log or default_log_path(args.suite)
if args.dry_run:
_print_dry_run(args, cases, log_path, stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
results: list[dict[str, Any]] = []
try:
logger.emit("CONNECT: OK bench matrix")
logger.emit(
f"suite={args.suite} cases={len(cases)} device={args.port} {args.baud} {serial_format_label(args)} "
f"relay={args.relay_port} {args.relay_baud} sync={args.sync}"
)
logger.emit(f"log={log_path}")
with open_device_serial(serial, args) as device:
relay = None
try:
if not args.no_power_cycle:
relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(relay, args.relay_settle, logger)
for index, case in enumerate(cases, start=1):
if args.pause_between_cases and index > 1:
input(f"Press Enter to start case {index}/{len(cases)}: {case.name}")
result = _run_case(args, device, relay, logger, case, index, len(cases))
results.append(result)
if args.require_ready and not result["ready"]:
logger.event("ABORT readiness was required")
break
finally:
if relay is not None:
relay.close()
_emit_matrix_summary(logger, results)
if args.result_json:
_write_result_json(args.result_json, log_path, args, results)
return 0
finally:
logger.close()
def select_cases(args: argparse.Namespace) -> list[MatrixCase]:
cases = build_cases(
args.suite,
default_gap=args.default_gap,
default_post_read=args.post_case_read,
gaps=_parse_float_csv(args.gaps),
repeat_counts=_parse_int_csv(args.repeat_counts),
hold_seconds=_parse_float_csv(args.hold_seconds),
)
if args.case:
filters = [item.lower() for item in args.case]
cases = [case for case in cases if any(fragment in case.name.lower() for fragment in filters)]
if args.limit is not None:
cases = cases[: max(0, args.limit)]
if not cases:
raise SystemExit("no matrix cases selected")
return cases
def build_cases(
suite: str,
*,
default_gap: float = 0.150,
default_post_read: float = 3.0,
gaps: list[float] | None = None,
repeat_counts: list[int] | None = None,
hold_seconds: list[float] | None = None,
) -> list[MatrixCase]:
gaps = gaps or [0.010, 0.050, 0.150, 0.700, 1.500]
repeat_counts = repeat_counts or [1, 2, 4]
hold_seconds = hold_seconds or [3.0, 8.0, 20.0]
def case(name: str, keys: tuple[str, ...], *, gap: float = default_gap, repeat: int = 1, post_read: float = default_post_read, note: str = "") -> MatrixCase:
return MatrixCase(name=name, frames=tuple(NAMED_FRAMES[key] for key in keys), gap=gap, repeat=repeat, post_read=post_read, note=note)
baseline = [
case("baseline-40-80-c0", ("40", "80", "c0"), note="known emulator-derived order"),
]
single = [
case("single-40", ("40",), note="tests whether the DXC/low path alone wakes the panel"),
case("single-80", ("80",), note="tests whether the OK/high path alone wakes the panel"),
case("single-c0", ("c0",), note="tests whether the priority-combined path alone wakes the panel"),
]
pair = [
case("pair-40-80", ("40", "80"), note="tests primer-then-OK without the C0 branch"),
case("pair-80-c0", ("80", "c0"), note="tests OK followed by priority branch"),
case("pair-40-c0", ("40", "c0"), note="tests DXC/low followed by priority branch"),
case("pair-80-40", ("80", "40"), note="tests reverse OK/DXC ordering"),
case("pair-c0-80", ("c0", "80"), note="tests C0 as primer for OK"),
case("pair-c0-40", ("c0", "40"), note="tests C0 as primer for DXC"),
]
order = [
case("order-" + "-".join(keys), keys, note="three-frame order/permutation test")
for keys in permutations(("40", "80", "c0"), 3)
]
gap_cases = [
case(f"gap-{_gap_name(gap)}-40-80-c0", ("40", "80", "c0"), gap=gap, note="same order with varied inter-frame delay")
for gap in gaps
]
repeat_cases = [
case(f"repeat-{count}x-40-80-c0", ("40", "80", "c0"), repeat=count, note="tests whether repeated cadence is required")
for count in repeat_counts
]
hold_cases = [
case(f"hold-{_gap_name(seconds)}s-40-80-c0", ("40", "80", "c0"), post_read=seconds, note="tests whether CONNECT OK persists without continued traffic")
for seconds in hold_seconds
]
suites = {
"baseline": baseline,
"minimal": baseline + [single[1], single[0], single[2]] + pair[:3],
"single": single,
"pair": pair,
"order": order,
"gap": gap_cases,
"repeat": repeat_cases,
"hold": hold_cases,
"all": baseline + single + pair + order + gap_cases + repeat_cases + hold_cases,
}
return _dedupe_cases(suites[suite])
def _run_case(
args: argparse.Namespace,
device: Any,
relay: Any | None,
logger: BenchLogger,
case: MatrixCase,
index: int,
total: int,
) -> dict[str, Any]:
detector = FrameDetector(sync_mode=args.sync)
logger.emit()
logger.emit(f"CASE {index}/{total} {case.name}")
logger.emit(f"note={case.note or '(none)'}")
logger.emit(f"gap={case.gap:.3f}s repeat={case.repeat} post_read={case.post_read:.3f}s")
for frame_index, frame in enumerate(case.frames, start=1):
logger.emit(f"case_frame[{frame_index}]={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}")
if args.no_power_cycle:
device.reset_input_buffer()
logger.event("POWER_CYCLE skipped")
else:
if relay is None:
raise SystemExit("relay was not opened")
_relay_command(relay, args.power_off_command, logger)
time.sleep(max(0.0, args.off_seconds))
device.reset_input_buffer()
_relay_command(relay, args.power_on_command, logger)
ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
observation = _prompt_observation(args, logger, case)
return _case_result(case, detector, ready=ready, observation=observation)
if args.pre_case_drain > 0:
logger.event(f"DRAIN before case {args.pre_case_drain:.3f}s")
_read_for(device, detector, logger, args.pre_case_drain)
for repeat_index in range(max(1, case.repeat)):
if case.repeat > 1:
logger.event(f"BEGIN case_repeat {repeat_index + 1}/{case.repeat}")
for frame_index, frame in enumerate(case.frames, start=1):
_send_frame(device, frame, logger, f"{case.name}.r{repeat_index + 1}.f{frame_index}")
_read_for(device, detector, logger, case.gap)
if args.command7_after:
_send_frame(device, COMMAND7_REPEAT_FRAME, logger, f"{case.name}.command7_after")
_read_for(device, detector, logger, case.gap)
if case.post_read > 0:
logger.event(f"POST_READ {case.post_read:.3f}s")
_read_for(device, detector, logger, case.post_read)
observation = _prompt_observation(args, logger, case)
result = _case_result(case, detector, ready=ready, observation=observation)
logger.event(
f"CASE_RESULT {case.name} ready={int(ready)} rx_frames={result['rx_frames']} "
f"labels={json.dumps(result['labels'], sort_keys=True)}"
)
return result
def _prompt_observation(args: argparse.Namespace, logger: BenchLogger, case: MatrixCase) -> str:
if not args.prompt_observation:
return ""
prompt = f"{case.name}: LCD/lamps/readouts observation, or Enter to skip: "
observation = input(prompt).strip()
logger.event(f"OBSERVATION {case.name}: {observation or '(no note)'}")
return observation
def _case_result(case: MatrixCase, detector: FrameDetector, *, ready: bool, observation: str) -> dict[str, Any]:
return {
"name": case.name,
"note": case.note,
"frames": [format_frame(frame) for frame in case.frames],
"gap": case.gap,
"repeat": case.repeat,
"post_read": case.post_read,
"ready": ready,
"rx_frames": len(detector.frames),
"labels": dict(detector.labels),
"resync_events": detector.resync_events,
"dropped_bytes": detector.dropped_bytes,
"trailing_unframed_bytes": len(detector.buffer),
"observation": observation,
}
def _emit_matrix_summary(logger: BenchLogger, results: list[dict[str, Any]]) -> None:
logger.emit()
logger.emit("Matrix Summary")
logger.emit(f"cases={len(results)}")
for result in results:
labels = ", ".join(f"{key}={value}" for key, value in sorted(result["labels"].items())) or "no_rx_frames"
note = result["observation"] or "(no observation)"
logger.emit(
f"{result['name']}: ready={int(result['ready'])} rx_frames={result['rx_frames']} "
f"{labels} observation={note}"
)
def _write_result_json(path: Path, log_path: Path, args: argparse.Namespace, results: list[dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"suite": args.suite,
"log": str(log_path),
"serial_format": f"{args.baud} {serial_format_label(args)}",
"cases": results,
}
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _print_dry_run(args: argparse.Namespace, cases: list[MatrixCase], log_path: Path, stdout: TextIO) -> None:
print(f"suite={args.suite}", file=stdout)
print(f"cases={len(cases)}", file=stdout)
print(f"device={args.port} {args.baud} {serial_format_label(args)}", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle_between_cases={int(not args.no_power_cycle)}", file=stdout)
print(f"log={log_path}", file=stdout)
for index, case in enumerate(cases, start=1):
print(
f"case[{index}]={case.name} gap={case.gap:.3f}s repeat={case.repeat} "
f"post_read={case.post_read:.3f}s",
file=stdout,
)
for frame in case.frames:
print(f" frame={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}", file=stdout)
if case.note:
print(f" note={case.note}", file=stdout)
if args.command7_after:
print(f"command7_after={format_frame(COMMAND7_REPEAT_FRAME)}", file=stdout)
def _dedupe_cases(cases: list[MatrixCase]) -> list[MatrixCase]:
seen: set[str] = set()
deduped: list[MatrixCase] = []
for case in cases:
if case.name in seen:
continue
seen.add(case.name)
deduped.append(case)
return deduped
def _parse_float_csv(text: str) -> list[float]:
return [float(part.strip()) for part in text.split(",") if part.strip()]
def _parse_int_csv(text: str) -> list[int]:
return [int(part.strip(), 0) for part in text.split(",") if part.strip()]
def _gap_name(value: float) -> str:
if value >= 1:
return f"{value:.1f}".rstrip("0").rstrip(".").replace(".", "p")
milliseconds = int(round(value * 1000))
return f"{milliseconds}ms"
__all__ = [
"FRAME_40_DXC",
"FRAME_80_OK",
"FRAME_C0_PRIORITY",
"MatrixCase",
"build_arg_parser",
"build_cases",
"main",
"select_cases",
]