1
0
Files
2026-05-27 21:37:50 +10:00

209 lines
8.9 KiB
Python

from __future__ import annotations
import argparse
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import TextIO
from h8536.bench_connect_lcd import (
BenchLogger,
_import_serial,
_read_relay_lines,
_relay_command,
_relay_settle,
add_serial_format_args,
open_device_serial,
serial_format_label,
)
from .controller import CcuConfig, CcuEmulator
from .frames import ACTIVE_SEED_COMMAND0, CONNECT_CADENCE_SEQUENCE, NEUTRAL_ACK_FRAME, format_frame, parse_frame
from .iris_mblack_link import IrisMblackLinkModule
from .modules import CcuModule
from .policy import AckPolicy
from .refresh import PeriodicRefresh
from .serial_link import SerialLink
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the fake CCU runner.

    Options are registered in a fixed order (device link, seeding, ACK policy,
    periodic refresh, optional modules, relay power-cycle, dry run). The
    serial-format options are contributed by ``add_serial_format_args``.

    Returns:
        A fully configured ``argparse.ArgumentParser``.
    """
    cli = argparse.ArgumentParser(
        description="Run a small fake CCU for the Sony RCP PT2-style serial link.",
    )
    add = cli.add_argument

    # --- Device link -------------------------------------------------------
    add("--port", default="COM5", help="RS232 serial port connected to the RCP")
    add("--baud", type=int, default=38400, help="RCP serial baud rate")
    add_serial_format_args(cli)
    add("--duration", type=float, default=30.0, help="seconds to run the CCU loop")
    add(
        "--sync",
        choices=("checksum", "fixed"),
        default="checksum",
        help="RX frame sync strategy",
    )
    add("--log", type=Path, help="capture log path")

    # --- Seeding -----------------------------------------------------------
    add(
        "--seed",
        choices=("command0", "connect-sequence", "none"),
        default="command0",
        help="built-in wake-up seed before reactive ACK loop",
    )
    add(
        "--seed-frame",
        action="append",
        type=parse_frame,
        help="custom seed frame; repeatable",
    )
    add(
        "--seed-gap",
        type=float,
        default=0.050,
        help="seconds to listen after each seed frame",
    )
    add(
        "--ready-heartbeats",
        type=int,
        default=1,
        help="heartbeats to observe before seeding",
    )
    add(
        "--ready-timeout",
        type=float,
        default=10.0,
        help="seconds to wait for ready heartbeat",
    )

    # --- ACK policy --------------------------------------------------------
    add(
        "--ack-frame",
        type=parse_frame,
        default=NEUTRAL_ACK_FRAME,
        help="ACK frame to send after RCP reports",
    )
    add(
        "--ack-delay",
        type=float,
        default=0.0,
        help="seconds to wait after detecting an RCP frame before ACK",
    )
    add(
        "--no-ack-heartbeats",
        action="store_true",
        help="do not ACK heartbeat frames",
    )
    add(
        "--no-ack-reports",
        action="store_true",
        help="do not ACK report-looking frames",
    )
    add(
        "--no-ack-unlabeled",
        action="store_true",
        help="do not ACK checksum-valid unlabeled frames outside known report command bytes",
    )

    # --- Periodic refresh and service loop ---------------------------------
    add(
        "--refresh-frame",
        action="append",
        type=parse_frame,
        help="optional periodic refresh frame",
    )
    add(
        "--refresh-active",
        action="store_true",
        help="periodically refresh selector zero with command0 0x8080",
    )
    add(
        "--refresh-interval",
        type=float,
        default=0.0,
        help="seconds between optional refresh frames",
    )
    add(
        "--loop-poll",
        type=float,
        default=0.001,
        help="sleep between service loop iterations",
    )

    # --- Optional modules --------------------------------------------------
    add(
        "--iris-mblack-link",
        action="store_true",
        help="enable the selector 0x0013 IRIS/M.BLACK LINK ACK-and-mirror module",
    )
    add(
        "--iris-mblack-link-mirror-delay",
        type=float,
        default=0.050,
        help="seconds between the selector 0x0013 ACK and command-0 mirror",
    )

    # --- Relay-driven power cycle ------------------------------------------
    add(
        "--power-cycle",
        action="store_true",
        help="power-cycle DUT through relay before starting",
    )
    add("--relay-port", default="COM6", help="Pico relay serial port")
    add("--relay-baud", type=int, default=115200, help="Pico relay serial baud rate")
    add(
        "--power-off-command",
        default="off",
        help="relay command used to remove DUT power",
    )
    add(
        "--power-on-command",
        default="on",
        help="relay command used to apply DUT power",
    )
    add(
        "--off-seconds",
        type=float,
        default=1.5,
        help="seconds to hold DUT powered off",
    )
    add(
        "--relay-settle",
        type=float,
        default=2.0,
        help="seconds to wait after opening relay port",
    )

    # --- Misc --------------------------------------------------------------
    add(
        "--dry-run",
        action="store_true",
        help="print configuration without opening serial ports",
    )
    return cli
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
    """Run the fake CCU from the command line.

    Parses *argv*, resolves the seed frames, refresh frames, modules and log
    path, and then either prints that configuration (``--dry-run``) or opens
    the device serial port, optionally power-cycles the DUT through the relay
    port, and runs ``CcuEmulator`` for ``--duration`` seconds.

    Args:
        argv: Argument list handed to the parser; ``None`` lets argparse fall
            back to ``sys.argv[1:]``.
        stdout: Stream used for dry-run output and passed to ``BenchLogger``.

    Returns:
        ``0`` after a completed dry run or emulator run. Exceptions raised by
        the serial layer or the emulator propagate to the caller after cleanup.
    """
    args = build_arg_parser().parse_args(argv)
    seed_frames = _seed_frames(args)
    refresh_frames = _refresh_frames(args)
    modules = _modules(args)
    log_path = args.log or _default_log_path()

    if args.dry_run:
        # Dry run never imports the serial backend or opens any port.
        _print_dry_run(args, seed_frames, refresh_frames, modules, log_path, stdout)
        return 0

    serial = _import_serial()
    logger = BenchLogger(log_path, stdout=stdout)
    relay = None
    try:
        # Session banner: record the effective configuration in the capture log.
        logger.emit("PT2 fake CCU")
        logger.emit(f"device={args.port} {args.baud} {serial_format_label(args)} sync={args.sync}")
        logger.emit(f"log={log_path}")
        logger.emit(f"ack_frame={format_frame(args.ack_frame)}")
        logger.emit("seed_frames=" + (" | ".join(format_frame(frame) for frame in seed_frames) or "none"))
        if refresh_frames:
            logger.emit(
                f"refresh_interval={args.refresh_interval:.3f}s frames="
                + " | ".join(format_frame(frame) for frame in refresh_frames)
            )
        if modules:
            logger.emit("modules=" + " | ".join(module.name for module in modules))

        with open_device_serial(serial, args) as device:
            if args.power_cycle:
                relay = serial.Serial(args.relay_port, args.relay_baud, timeout=0.25)
                _relay_settle(relay, args.relay_settle, logger)
                _relay_command(relay, args.power_off_command, logger)
                time.sleep(max(0.0, args.off_seconds))
                # Clear the device input buffer while the DUT is still off, so
                # the emulator only sees bytes that arrive after power-on.
                device.reset_input_buffer()
                _relay_command(relay, args.power_on_command, logger)
            else:
                device.reset_input_buffer()

            link = SerialLink(device, logger, sync_mode=args.sync)
            config = CcuConfig(
                seed_frames=tuple(seed_frames),
                seed_gap=args.seed_gap,
                ack_delay=args.ack_delay,
                ready_heartbeats=args.ready_heartbeats,
                ready_timeout=args.ready_timeout,
                loop_poll=args.loop_poll,
            )
            policy = AckPolicy(
                ack_frame=args.ack_frame,
                ack_reports=not args.no_ack_reports,
                ack_heartbeats=not args.no_ack_heartbeats,
                ack_unlabeled_checksum_frames=not args.no_ack_unlabeled,
            )
            refresh = PeriodicRefresh(frames=refresh_frames, interval=args.refresh_interval)
            CcuEmulator(link, logger, config=config, ack_policy=policy, refresh=refresh, modules=modules).run(
                args.duration
            )
        return 0
    finally:
        # Nested try/finally: a failure while closing the relay port must not
        # prevent the capture log from being closed.
        try:
            if relay is not None:
                relay.close()
        finally:
            logger.close()
def _seed_frames(args: argparse.Namespace) -> list[bytes]:
    """Resolve the wake-up seed frames selected on the command line.

    Explicit ``--seed-frame`` values take precedence over the built-in
    ``--seed`` choice. The returned list is always a fresh copy.
    """
    custom = args.seed_frame
    if custom:
        return list(custom)

    choice = args.seed
    if choice == "connect-sequence":
        return list(CONNECT_CADENCE_SEQUENCE)
    # Any remaining choice other than "none" falls back to the command0 seed.
    return [] if choice == "none" else [ACTIVE_SEED_COMMAND0]
def _refresh_frames(args: argparse.Namespace) -> list[bytes]:
    """Collect the periodic refresh frames requested on the command line.

    Starts from a copy of any ``--refresh-frame`` values and, when
    ``--refresh-active`` is set, appends ``ACTIVE_SEED_COMMAND0`` at the end.
    """
    configured = [*(args.refresh_frame or ())]
    if not args.refresh_active:
        return configured
    return [*configured, ACTIVE_SEED_COMMAND0]
def _modules(args: argparse.Namespace) -> tuple[CcuModule, ...]:
    """Instantiate the optional CCU modules enabled on the command line.

    Returns an empty tuple unless ``--iris-mblack-link`` is set, in which case
    the tuple holds one ``IrisMblackLinkModule`` whose mirror delay is the
    configured value clamped to be non-negative.
    """
    if not args.iris_mblack_link:
        return ()
    delay = max(0.0, args.iris_mblack_link_mirror_delay)
    return (IrisMblackLinkModule(mirror_delay=delay),)
def _print_dry_run(
    args: argparse.Namespace,
    seed_frames: list[bytes],
    refresh_frames: list[bytes],
    modules: tuple[CcuModule, ...],
    log_path: Path,
    stdout: TextIO,
) -> None:
    """Print the resolved run configuration to *stdout* without touching any port.

    Emits one ``key=value`` style line each for the device link, duration and
    log path, relay settings, ACK frame, seed frames, refresh frames and
    modules. Empty frame or module collections are rendered as ``none``.
    """

    def emit(line: str) -> None:
        print(line, file=stdout)

    def joined(parts) -> str:
        # An empty join yields "", which is shown as the literal "none".
        return " | ".join(parts) or "none"

    emit(f"device={args.port} {args.baud} {serial_format_label(args)} sync={args.sync}")
    emit(f"duration={args.duration:.3f}s log={log_path}")
    emit(f"power_cycle={int(args.power_cycle)} relay={args.relay_port} {args.relay_baud}")
    emit(f"ack_frame={format_frame(args.ack_frame)}")
    emit("seed_frames=" + joined(format_frame(frame) for frame in seed_frames))
    emit(
        f"refresh_interval={args.refresh_interval:.3f}s frames="
        + joined(format_frame(frame) for frame in refresh_frames)
    )
    emit("modules=" + joined(module.name for module in modules))
def _default_log_path() -> Path:
    """Return the default capture log path, stamped with the current local time.

    The result has the form ``captures/ccu-emulator-YYYYMMDD-HHMMSS.txt``.
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return Path("captures", f"ccu-emulator-{stamp}.txt")
if __name__ == "__main__":
    # Script entry point: use main()'s return value as the process exit status.
    sys.exit(main())