# 199 lines | 8.4 KiB | Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import TextIO
|
|
|
|
from .bench_connect_lcd import (
|
|
BenchLogger,
|
|
FrameDetector,
|
|
_import_serial,
|
|
_read_for,
|
|
_relay_command,
|
|
_relay_settle,
|
|
_wait_for_ready,
|
|
format_frame,
|
|
frame_checksum,
|
|
frame_checksum_ok,
|
|
)
|
|
|
|
|
|
# Command byte placed first in every frame produced by build_read_frame
# (the CLI description calls these "command 1" frames).
READ_COMMAND = 0x01
|
|
# Total frame size in bytes; decode_table_read_response rejects any other
# length, and build_read_frame emits 5 body bytes plus 1 checksum byte.
FRAME_LENGTH = 6
|
|
|
|
|
|
@dataclass(frozen=True)
class SelectorEncoding:
    """Immutable record pairing a logical selector with its two frame bytes.

    ``encode_selector`` builds these records, and ``build_read_frame`` copies
    ``frame_hi`` and ``frame_lo`` into bytes 1 and 2 of the outgoing frame.
    """

    # Logical selector value this record was built for.
    selector: int
    # First selector byte written into the frame (after the command byte).
    frame_hi: int
    # Second selector byte written into the frame.
    frame_lo: int
|
|
|
|
|
|
def encode_selector(selector: int) -> SelectorEncoding:
    """Split a logical selector into the (hi, lo) byte pair used in frames.

    The selector is first masked to 9 bits (``0x000``-``0x1FF``), then mapped
    by range:

    * ``0x000``-``0x07F`` -> hi ``0x00``, lo = selector
    * ``0x080``-``0x17F`` -> hi ``0x01``, lo = selector - ``0x080``
    * ``0x180``-``0x1FF`` -> hi ``0x02``, lo = selector - ``0x180``

    Returns:
        A ``SelectorEncoding`` holding the masked selector and both bytes.
    """
    masked = selector & 0x01FF
    # Pick the range the masked value falls in; each range has its own base
    # that is subtracted to obtain the low byte.
    if masked > 0x017F:
        hi, base = 0x02, 0x0180
    elif masked > 0x007F:
        hi, base = 0x01, 0x0080
    else:
        hi, base = 0x00, 0x0000
    return SelectorEncoding(masked, hi, masked - base)
|
|
|
|
|
|
def build_read_frame(selector: int) -> bytes:
    """Build the read frame for ``selector``.

    The frame is five body bytes -- ``READ_COMMAND``, the two selector bytes
    from ``encode_selector``, and two zero bytes -- followed by one byte
    computed by ``frame_checksum`` over that body.
    """
    parts = encode_selector(selector)
    payload = bytes((READ_COMMAND, parts.frame_hi, parts.frame_lo, 0x00, 0x00))
    checksum = frame_checksum(payload)
    return payload + bytes((checksum,))
|
|
|
|
|
|
def decode_table_read_response(frame: bytes) -> tuple[int, int] | None:
    """Decode a table-read response frame.

    Returns:
        ``(frame[2], value)`` where ``value`` is the big-endian 16-bit number
        formed from ``frame[3]`` and ``frame[4]``; or ``None`` when the frame
        is not ``FRAME_LENGTH`` bytes long, fails ``frame_checksum_ok``, or
        does not start with ``0x04``. ``frame[1]`` is not inspected.
    """
    # Length is checked first so the checksum helper only sees full frames.
    if len(frame) != FRAME_LENGTH:
        return None
    if not frame_checksum_ok(frame):
        return None
    if frame[0] != 0x04:
        return None
    echo = frame[2]
    value = (frame[3] << 8) | frame[4]
    return echo, value
|
|
|
|
|
|
def default_log_path() -> Path:
    """Return ``captures/serial-table-dump-<YYYYmmdd-HHMMSS>.txt`` for the current local time."""
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return Path("captures", f"serial-table-dump-{stamp}.txt")
|
|
|
|
|
|
def _int_literal(text: str) -> int:
    """Parse ``text`` as an integer with base auto-detection (``int(text, 0)``).

    Accepts decimal as well as ``0x``/``0o``/``0b`` prefixed literals.

    Raises:
        argparse.ArgumentTypeError: if ``text`` is not a valid integer literal,
            so argparse prints a readable message instead of
            "invalid <lambda> value".
    """
    try:
        return int(text, 0)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid integer literal {text!r} (use decimal, 0x hex, 0o octal or 0b binary)"
        ) from None


def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the read-only serial table sweep.

    ``--start``, ``--count`` and ``--selector`` accept any literal understood
    by ``int(text, 0)``; ``--selector`` may be repeated and collects into a
    list (``None`` when never given). ``--log`` is converted to a ``Path``.

    Returns:
        A configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Read-only serial table sweep. This uses command 1 frames only; it does not write EEPROM."
        )
    )

    # Serial endpoints.
    parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
    parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
    parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
    parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")

    # Power cycling through the relay.
    parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the sweep")
    parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
    parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
    parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
    parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")

    # Readiness detection before the sweep starts.
    parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before reading")
    parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before reading")
    parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")

    # Which selectors to read.
    parser.add_argument("--start", type=_int_literal, default=0x000, help="first logical selector")
    parser.add_argument("--count", type=_int_literal, default=0x80, help="number of selectors to read")
    parser.add_argument("--selector", action="append", type=_int_literal, help="specific selector to read; repeatable")

    # Timing of the sweep itself.
    parser.add_argument("--gap", type=float, default=0.080, help="seconds to listen after each read frame")
    parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before the sweep")
    parser.add_argument("--post-read", type=float, default=1.0, help="seconds to listen after the sweep")

    # Output control.
    parser.add_argument("--log", type=Path, help="capture log path")
    parser.add_argument("--dry-run", action="store_true", help="print planned read frames without opening serial ports")
    return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None, *, stdout: TextIO | None = None) -> int:
    """Run the read-only table sweep, or print the plan with ``--dry-run``.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.
        stdout: Stream for dry-run output and the bench logger. ``None`` (the
            default) means the *current* ``sys.stdout``, looked up at call
            time so that later redirection of ``sys.stdout`` is honoured.

    Returns:
        ``0`` after a dry run or a completed sweep; ``2`` when
        ``--require-ready`` is set and the ready check did not succeed.
    """
    # Resolve the stream late: a `stdout=sys.stdout` default would be frozen
    # at definition time and ignore any later replacement of sys.stdout.
    if stdout is None:
        stdout = sys.stdout

    args = build_arg_parser().parse_args(argv)
    selectors = _selectors(args)
    log_path = args.log or default_log_path()

    if args.dry_run:
        # Describe what would be sent without touching any serial port.
        print(f"device={args.port} {args.baud} 8N1", file=stdout)
        print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
        print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout)
        for selector in selectors:
            encoded = encode_selector(selector)
            frame = build_read_frame(selector)
            print(
                f"selector=0x{selector:03X} encoded={encoded.frame_hi:02X} {encoded.frame_lo:02X} "
                f"frame={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}",
                file=stdout,
            )
        print(f"log={log_path}", file=stdout)
        return 0

    serial = _import_serial()
    logger = BenchLogger(log_path, stdout=stdout)
    detector = FrameDetector()
    response_rows: list[tuple[int, bytes, tuple[int, int] | None]] = []
    try:
        logger.emit("Read-only serial table sweep")
        logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}")
        logger.emit(f"log={log_path}")
        logger.emit(f"selectors={len(selectors)} command=01 write_frames=0")

        with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device:
            relay = None
            try:
                if not args.no_power_cycle:
                    relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
                    _relay_settle(relay, args.relay_settle, logger)
                    _relay_command(relay, args.power_off_command, logger)
                    time.sleep(args.off_seconds)
                    # Discard bytes received before power-off and start frame
                    # detection from scratch before power is re-applied.
                    device.reset_input_buffer()
                    detector = FrameDetector()
                    _relay_command(relay, args.power_on_command, logger)
                else:
                    device.reset_input_buffer()

                ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
                if args.require_ready and not ready:
                    logger.event("ABORT ready heartbeat threshold was not observed")
                    return 2
                if args.pre_drain > 0:
                    logger.event(f"DRAIN before read sweep {args.pre_drain:.3f}s")
                    _read_for(device, detector, logger, args.pre_drain)

                for selector in selectors:
                    frame = build_read_frame(selector)
                    # Remember how many frames were seen so far so that only
                    # frames arriving after this request are examined below.
                    before = len(detector.frames)
                    logger.event(f"READ selector=0x{selector:03X} frame={format_frame(frame)}")
                    device.write(frame)
                    device.flush()
                    logger.chunk("TX", frame)
                    _read_for(device, detector, logger, args.gap)
                    for response in detector.frames[before:]:
                        decoded = decode_table_read_response(response)
                        if decoded is not None:
                            page_or_echo, value = decoded
                            logger.event(
                                f"TABLE selector=0x{selector:03X} echo={page_or_echo:02X} value={value:04X}"
                            )
                        # Undecodable frames are kept too; _summary filters them out.
                        response_rows.append((selector, response, decoded))

                _read_for(device, detector, logger, args.post_read)
            finally:
                # The relay port is opened by hand, so close it by hand on every exit path.
                if relay is not None:
                    relay.close()
        _summary(response_rows, detector, logger)
        return 0
    finally:
        logger.close()
|
|
|
|
|
|
def _selectors(args: argparse.Namespace) -> list[int]:
    """Return the ordered selectors to read, each masked to 9 bits.

    A non-empty ``args.selector`` list wins and is used as given (masked).
    Otherwise ``args.count`` consecutive selectors starting at ``args.start``
    are produced, wrapping from ``0x1FF`` back to ``0x000``; a negative count
    yields an empty list.
    """
    explicit = args.selector
    if explicit:
        return [value & 0x01FF for value in explicit]

    first = args.start & 0x01FF
    total = max(0, args.count)
    return [(first + step) & 0x01FF for step in range(total)]
|
|
|
|
|
|
def _summary(
    response_rows: list[tuple[int, bytes, tuple[int, int] | None]],
    detector: FrameDetector,
    logger: BenchLogger,
) -> None:
    """Emit the end-of-run summary through ``logger.emit``.

    Writes an empty ``emit()`` call, a ``Summary`` heading, a counts line
    (total frames held by ``detector`` and number of decoded rows), then one
    line per row in ``response_rows`` whose decoded tuple is not ``None``.
    """
    decoded_values = []
    for selector, _raw, decoded in response_rows:
        if decoded is None:
            continue
        # Only the value half of the decoded pair is reported.
        decoded_values.append((selector, decoded[1]))

    logger.emit()
    logger.emit("Summary")
    logger.emit(f"rx_frames={len(detector.frames)} table_response_rows={len(decoded_values)}")
    for selector, value in decoded_values:
        logger.emit(f"table selector=0x{selector:03X} value=0x{value:04X}")
|
|
|
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    "build_arg_parser",
    "build_read_frame",
    "decode_table_read_response",
    "encode_selector",
    "main",
]
|