1
0
Files
h8-536-decoder/h8536/serial_table_dump.py
2026-05-26 00:48:28 +10:00

200 lines
8.5 KiB
Python

from __future__ import annotations
import argparse
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import TextIO
from .bench_connect_lcd import (
BenchLogger,
FrameDetector,
_import_serial,
_read_for,
_relay_command,
_relay_settle,
_wait_for_ready,
format_frame,
frame_checksum,
frame_checksum_ok,
)
READ_COMMAND = 0x01
FRAME_LENGTH = 6
@dataclass(frozen=True)
class SelectorEncoding:
selector: int
frame_hi: int
frame_lo: int
def encode_selector(selector: int) -> SelectorEncoding:
selector &= 0x01FF
if selector <= 0x007F:
return SelectorEncoding(selector, 0x00, selector)
if selector <= 0x017F:
return SelectorEncoding(selector, 0x01, selector - 0x0080)
return SelectorEncoding(selector, 0x02, selector - 0x0180)
def build_read_frame(selector: int) -> bytes:
encoded = encode_selector(selector)
body = bytes([READ_COMMAND, encoded.frame_hi, encoded.frame_lo, 0x00, 0x00])
return body + bytes([frame_checksum(body)])
def decode_table_read_response(frame: bytes) -> tuple[int, int] | None:
if len(frame) != FRAME_LENGTH or not frame_checksum_ok(frame):
return None
if frame[0] != 0x04:
return None
return frame[2], (frame[3] << 8) | frame[4]
def default_log_path() -> Path:
return Path("captures") / f"serial-table-dump-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt"
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Read-only serial table sweep. This uses command 1 frames only; it does not write EEPROM."
)
)
parser.add_argument("--port", default="COM5", help="RS232 serial port connected to the RCP")
parser.add_argument("--baud", type=int, default=38400, help="RCP serial baud rate")
parser.add_argument("--relay-port", default="COM6", help="Pico relay serial port")
parser.add_argument("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
parser.add_argument("--no-power-cycle", action="store_true", help="do not send relay off/on before the sweep")
parser.add_argument("--power-off-command", default="off", help="relay command used to remove DUT power")
parser.add_argument("--power-on-command", default="on", help="relay command used to apply DUT power")
parser.add_argument("--off-seconds", type=float, default=1.5, help="seconds to hold the DUT powered off")
parser.add_argument("--relay-settle", type=float, default=2.0, help="seconds to wait after opening the relay port")
parser.add_argument("--ready-timeout", type=float, default=10.0, help="seconds to wait for heartbeat before reading")
parser.add_argument("--ready-heartbeats", type=int, default=2, help="heartbeat frames to observe before reading")
parser.add_argument("--require-ready", action="store_true", help="abort if ready heartbeat count is not observed")
parser.add_argument("--start", type=lambda text: int(text, 0), default=0x000, help="first logical selector")
parser.add_argument("--count", type=lambda text: int(text, 0), default=0x80, help="number of selectors to read")
parser.add_argument("--selector", action="append", type=lambda text: int(text, 0), help="specific selector to read; repeatable")
parser.add_argument("--gap", type=float, default=0.080, help="seconds to listen after each read frame")
parser.add_argument("--pre-drain", type=float, default=0.250, help="seconds to drain/log RX before the sweep")
parser.add_argument("--post-read", type=float, default=1.0, help="seconds to listen after the sweep")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--dry-run", action="store_true", help="print planned read frames without opening serial ports")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
selectors = _selectors(args)
log_path = args.log or default_log_path()
if args.dry_run:
print(f"device={args.port} {args.baud} 8N1", file=stdout)
print(f"relay={args.relay_port} {args.relay_baud}", file=stdout)
print(f"power_cycle={int(not args.no_power_cycle)}", file=stdout)
for selector in selectors:
encoded = encode_selector(selector)
frame = build_read_frame(selector)
print(
f"selector=0x{selector:03X} encoded={encoded.frame_hi:02X} {encoded.frame_lo:02X} "
f"frame={format_frame(frame)} checksum_ok={int(frame_checksum_ok(frame))}",
file=stdout,
)
print(f"log={log_path}", file=stdout)
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
detector = FrameDetector()
response_rows: list[tuple[int, bytes, tuple[int, int] | None]] = []
try:
logger.emit("Read-only serial table sweep")
logger.emit(f"device={args.port} {args.baud} 8N1 relay={args.relay_port} {args.relay_baud}")
logger.emit(f"log={log_path}")
logger.emit(f"selectors={len(selectors)} command=01 write_frames=0")
with serial.Serial(args.port, args.baud, bytesize=8, parity="N", stopbits=1, timeout=0.05) as device:
relay = None
try:
if not args.no_power_cycle:
relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
_relay_settle(relay, args.relay_settle, logger)
_relay_command(relay, args.power_off_command, logger)
time.sleep(args.off_seconds)
device.reset_input_buffer()
detector = FrameDetector()
_relay_command(relay, args.power_on_command, logger)
else:
device.reset_input_buffer()
ready = _wait_for_ready(device, detector, logger, args.ready_timeout, args.ready_heartbeats)
if args.require_ready and not ready:
logger.event("ABORT ready heartbeat threshold was not observed")
return 2
if args.pre_drain > 0:
logger.event(f"DRAIN before read sweep {args.pre_drain:.3f}s")
_read_for(device, detector, logger, args.pre_drain)
for selector in selectors:
frame = build_read_frame(selector)
before = len(detector.frames)
logger.event(f"READ selector=0x{selector:03X} frame={format_frame(frame)}")
device.write(frame)
device.flush()
logger.chunk("TX", frame)
_read_for(device, detector, logger, args.gap)
for response in detector.frames[before:]:
decoded = decode_table_read_response(response)
if decoded is not None:
page_or_echo, value = decoded
logger.event(
f"TABLE selector=0x{selector:03X} echo={page_or_echo:02X} value={value:04X}"
)
response_rows.append((selector, response, decoded))
_read_for(device, detector, logger, args.post_read)
finally:
if relay is not None:
relay.close()
_summary(response_rows, detector, logger)
return 0
finally:
logger.close()
def _selectors(args: argparse.Namespace) -> list[int]:
if args.selector:
return [selector & 0x01FF for selector in args.selector]
start = args.start & 0x01FF
count = max(0, args.count)
return [((start + offset) & 0x01FF) for offset in range(count)]
def _summary(
response_rows: list[tuple[int, bytes, tuple[int, int] | None]],
detector: FrameDetector,
logger: BenchLogger,
) -> None:
table_rows = [(selector, decoded[1]) for selector, _frame, decoded in response_rows if decoded is not None]
logger.emit()
logger.emit("Summary")
logger.emit(f"rx_frames={len(detector.frames)} table_response_rows={len(table_rows)}")
logger.emit(f"trailing_unframed_bytes={len(detector.buffer)} resync_events={detector.resync_events} dropped_bytes={detector.dropped_bytes}")
for selector, value in table_rows:
logger.emit(f"table selector=0x{selector:03X} value=0x{value:04X}")
__all__ = [
"build_arg_parser",
"build_read_frame",
"decode_table_read_response",
"encode_selector",
"main",
]