From 21f0e455ee0d1aeade9facfe73de5ce9907e761c Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Wed, 27 May 2026 12:17:12 +1000 Subject: [PATCH] webcam copy --- README.md | 14 +++ docs/pt2-copy-state-machine.md | 18 +++- docs/pt2-protocol.md | 19 ++++ h8536/camera_snapshots.py | 168 +++++++++++++++++++++++++++++++++ h8536/serial_scenario.py | 161 ++++++++++++++++++++++++++++++- tests/test_serial_scenario.py | 39 ++++++++ 6 files changed, 410 insertions(+), 9 deletions(-) create mode 100644 h8536/camera_snapshots.py diff --git a/README.md b/README.md index fe180ae..0af8844 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,19 @@ To start the current emulator harness: ``` The real-device bench helper uses `pyserial`; install repo dependencies with `.\.venv\Scripts\python.exe -m pip install -r requirements.txt` if needed. +Optional webcam snapshots for `scripts\serial_scenario.py` use OpenCV; install it only on the bench machine that needs panel photos: + +```powershell +.\.venv\Scripts\python.exe -m pip install opencv-python +``` + +Example snapshot run: + +```powershell +.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\copy-step-006d-006c-1000ms.json --parity E --quiet-console --log captures\copy-webcam.txt --result-json captures\copy-webcam-result.json --snapshot-dir captures\copy-webcam-shots --camera-index 4 --snapshot-delays 0.5 +``` + +Current bench calibration: webcam index `4` gives the useful panel view, and a `0.5` second post-command delay is enough for LCD changes to settle without producing too many images. The current PT2/protocol reconstruction is documented in [docs/pt2-protocol.md](docs/pt2-protocol.md), with focused mini-notes for [COPY state](docs/pt2-copy-state-machine.md), [menu state](docs/pt2-menu-state-machine.md), the [session rhythm ROM trace](docs/pt2-session-rhythm-trace.md), [continuation commands](docs/pt2-continuation-command-trace.md), and [report aftermath handling](docs/pt2-report-aftermath-trace.md). @@ -135,6 +148,7 @@ Minimal smoke-test shape: - Includes a bench ACK probe that reproduces the `01 00 00...` -> `01 00 01...` visible retry burst, waits for `07 80 40 20 90 2D`, then sends a candidate command-5 ACK and reports whether the target keeps repeating. - Includes a checksum-resynchronizing bench receiver that scans RX byte streams for valid six-byte frames, avoids common shifted-heartbeat false locks, and can fall back to the old fixed six-byte slicer with `--sync fixed`. - Includes a JSON scenario bench runner for repeatable multi-step serial tests, including low-latency ACK-aware command-1 probes that can send the current command-5 ACK candidate immediately after the retry frame appears, with explicit max-ACK/max-target guardrails. +- The scenario runner can optionally schedule webcam snapshots on each command send, writing image paths into logs/result JSON without delaying timing-sensitive serial steps. - Includes a modular fake-CCU runner in `ccu_emulator/` that seeds active state, listens for complete RCP report frames, and immediately sends the neutral command-5 ACK `05 00 40 00 00 1F`; optional periodic refresh frames let lamp/value streaming be tested separately from report ACKs. - Includes a PT2 state-map-aware bench runner/analyzer for the current CONNECT gate proof: it hunts a fresh device `07...` visible-drain token candidate, sends exactly one selector-zero command-4 force, probes `E000[0]` with command 1, optionally uses command 7 to recover a hidden finalized response, and labels likely token-destroying turns. - Includes a bounded emulator CONNECT state-search tool that patches small ROM-derived RAM/table surfaces, runs either the direct CONNECT branch or the selector-zero queue dispatch path, and classifies LCD outcomes as OK, DXC, NOT ACT, or other. diff --git a/docs/pt2-copy-state-machine.md b/docs/pt2-copy-state-machine.md index de17531..f4b693d 100644 --- a/docs/pt2-copy-state-machine.md +++ b/docs/pt2-copy-state-machine.md @@ -124,15 +124,25 @@ Family-00 comparison run on 2026-05-27: -> 02 00 04 responses after 006C ``` -This suggests the family-00 forms update/read back selector values but should not -be treated as equivalent to the command-5 COPY side-effect selectors unless an -LCD observation proves otherwise. +Webcam-confirmed LCD results from the same 2026-05-27 run: + +```text +00 00 6D 00 00 37 -> COPY IN PROGRESS +00 00 6C 00 00 36 -> COPY COMPLETED +05 00 6D 00 00 32 -> COPY IN PROGRESS +05 00 6C 00 00 33 -> COPY COMPLETED +``` + +The 250 ms and 1000 ms gap variants both produced the same LCD sequence. This +means the family-00 selector/write form can reach the COPY LCD side effects, even +though its serial response rhythm is the table-readback rhythm rather than the +command-5 `01 00 02` / `02 00 04` rhythm. Current interpretation: - `0x006D` is a copy-start/progress-window refresh selector. - `0x006C` is a completion/exit selector that only behaves cleanly while the copy window is live. -- The observed `00 00 6C 00 00 36` frame is copy-related, but should not be read as "COPY COMPLETED" by itself. +- The observed `00 00 6C 00 00 36` frame can display `COPY COMPLETED` when it is sent inside a live copy window, but should not be read as a stateless "show completed" command. - The copy window is transient and timer-controlled. - The panel does not treat `0x006C` as a stateless "show completed" command. diff --git a/docs/pt2-protocol.md b/docs/pt2-protocol.md index 73cf9fb..6995b6c 100644 --- a/docs/pt2-protocol.md +++ b/docs/pt2-protocol.md @@ -646,6 +646,25 @@ The gated scenario first seeds candidate secondary-table feature bits with comma Then it streams `E000[0x008F]=0x1800` and `E000[0x0093]=0x90FF`. This is the better next watch if quiet/broad E000-only refreshes do not produce local-control TX. +## Bench Webcam Capture + +For tests where LCD timing/state matters, `serial_scenario.py` can take webcam +snapshots tied to exact TX commands. Current bench calibration: + +```powershell +--camera-index 4 --snapshot-delays 0.5 +``` + +Camera index `4` is the working panel-facing webcam on the current PC. A single +`0.5` second post-TX delay captures readable LCD changes without creating the +large image sets produced by `0,0.25,1.0`. + +Example: + +```powershell +.\.venv\Scripts\python.exe scripts\serial_scenario.py scenarios\copy-step-006d-006c-1000ms.json --parity E --quiet-console --log captures\copy-webcam.txt --result-json captures\copy-webcam-result.json --snapshot-dir captures\copy-webcam-shots --camera-index 4 --snapshot-delays 0.5 +``` + After a run, summarize unexpected device frames with: ```powershell diff --git a/h8536/camera_snapshots.py b/h8536/camera_snapshots.py new file mode 100644 index 0000000..5a0ebc3 --- /dev/null +++ b/h8536/camera_snapshots.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import re +import heapq +import threading +import time +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any + + +_SAFE_NAME_RE = re.compile(r"[^A-Za-z0-9_.-]+") + + +class CameraSnapshots: + """Small optional OpenCV webcam wrapper for bench-test snapshots.""" + + def __init__( + self, + *, + camera_index: int, + output_dir: Path, + warmup_seconds: float = 0.5, + width: int | None = None, + height: int | None = None, + ) -> None: + self.camera_index = camera_index + self.output_dir = output_dir + self.warmup_seconds = max(0.0, warmup_seconds) + self.width = width + self.height = height + self.cv2: Any | None = None + self.device: Any | None = None + self._condition = threading.Condition() + self._tasks: list[tuple[float, int, dict[str, Any]]] = [] + self._records: list[dict[str, Any]] = [] + self._thread: threading.Thread | None = None + self._seq = 0 + self._closing = False + + def open(self) -> None: + try: + import cv2 + except ImportError as exc: # pragma: no cover - depends on local bench environment. + raise SystemExit( + "OpenCV is required for webcam snapshots. Install it with: " + ".\\.venv\\Scripts\\python.exe -m pip install opencv-python" + ) from exc + + self.output_dir.mkdir(parents=True, exist_ok=True) + self.cv2 = cv2 + self.device = cv2.VideoCapture(self.camera_index) + if not self.device.isOpened(): + raise SystemExit(f"could not open webcam index {self.camera_index}") + if self.width: + self.device.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) + if self.height: + self.device.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) + self._warm_up() + self._thread = threading.Thread(target=self._run, name="camera-snapshots", daemon=True) + self._thread.start() + + def close(self) -> None: + with self._condition: + self._closing = True + self._condition.notify_all() + if self._thread is not None: + self._thread.join() + self._thread = None + if self.device is not None: + self.device.release() + self.device = None + + def schedule( + self, + *, + label: str, + frame_text: str, + phase: str, + delay_seconds: float = 0.0, + step_index: int | None = None, + ) -> dict[str, Any]: + if self.cv2 is None or self.device is None: + raise RuntimeError("camera snapshotter is not open") + + due_monotonic = time.monotonic() + max(0.0, delay_seconds) + due_timestamp = datetime.now() + timedelta(seconds=max(0.0, delay_seconds)) + timestamp = due_timestamp.strftime("%Y%m%d-%H%M%S-%f")[:-3] + step_part = f"step{step_index:03d}_" if step_index is not None else "" + frame_part = frame_text.replace(" ", "") + filename = _safe_name(f"{timestamp}_{step_part}{phase}_{label}_{frame_part}.jpg") + path = self.output_dir / filename + record: dict[str, Any] = { + "path": str(path), + "scheduled_timestamp": timestamp, + "camera_index": self.camera_index, + "step_index": step_index, + "phase": phase, + "delay_seconds": delay_seconds, + "label": label, + "frame": frame_text, + "status": "scheduled", + } + with self._condition: + self._seq += 1 + heapq.heappush(self._tasks, (due_monotonic, self._seq, record)) + self._records.append(record) + self._condition.notify_all() + return record + + def records(self) -> list[dict[str, Any]]: + with self._condition: + return [dict(record) for record in self._records] + + def _run(self) -> None: + while True: + with self._condition: + while not self._tasks and not self._closing: + self._condition.wait() + if not self._tasks and self._closing: + return + due_monotonic, _seq, record = self._tasks[0] + wait_seconds = due_monotonic - time.monotonic() + if wait_seconds > 0: + self._condition.wait(wait_seconds) + continue + heapq.heappop(self._tasks) + self._write_snapshot(record) + + def _write_snapshot(self, record: dict[str, Any]) -> None: + if self.cv2 is None or self.device is None: + record["status"] = "error" + record["error"] = "camera closed before capture" + return + + image = None + ok = False + for _attempt in range(3): + ok, image = self.device.read() + if ok: + break + time.sleep(0.020) + if not ok or image is None: + record["status"] = "error" + record["error"] = f"webcam index {self.camera_index} did not return an image" + return + + path = Path(str(record["path"])) + if not self.cv2.imwrite(str(path), image): + record["status"] = "error" + record["error"] = f"failed to write webcam snapshot {path}" + return + record["status"] = "written" + record["captured_timestamp"] = datetime.now().strftime("%Y%m%d-%H%M%S-%f")[:-3] + + def _warm_up(self) -> None: + if self.device is None: + return + deadline = time.monotonic() + self.warmup_seconds + while time.monotonic() < deadline: + self.device.read() + time.sleep(0.020) + self.device.read() + + +def _safe_name(text: str) -> str: + cleaned = _SAFE_NAME_RE.sub("_", text.strip()).strip("._") + return cleaned or "snapshot.jpg" diff --git a/h8536/serial_scenario.py b/h8536/serial_scenario.py index 632e0a9..b0ae874 100644 --- a/h8536/serial_scenario.py +++ b/h8536/serial_scenario.py @@ -26,6 +26,7 @@ from .bench_connect_lcd import ( parse_frame, serial_format_label, ) +from .camera_snapshots import CameraSnapshots from .serial_table_dump import build_read_frame, decode_table_read_response @@ -44,6 +45,9 @@ class ScenarioContext: table_rows: list[dict[str, Any]] = field(default_factory=list) target_counts: dict[str, int] = field(default_factory=dict) tx_records: list[dict[str, Any]] = field(default_factory=list) + snapshot_records: list[dict[str, Any]] = field(default_factory=list) + snapshotter: CameraSnapshots | None = None + current_step_index: int | None = None ack_sent: int = 0 abort_requested: bool = False @@ -54,6 +58,37 @@ def default_log_path(scenario: dict[str, Any]) -> Path: return Path("captures") / f"{safe_name}-{datetime.now().strftime('%Y%m%d-%H%M%S')}.txt" +def _snapshot_output_dir(args: argparse.Namespace, log_path: Path) -> Path | None: + if args.snapshot_dir is not None: + return args.snapshot_dir + if args.camera_index is not None: + return log_path.parent / f"{log_path.stem}-snapshots" + return None + + +def _snapshot_camera_index(args: argparse.Namespace) -> int: + return 0 if args.camera_index is None else args.camera_index + + +def _snapshot_delays(args: argparse.Namespace) -> list[float]: + text = str(args.snapshot_delays or "").strip() + if not text: + return [] + delays: list[float] = [] + for part in text.split(","): + item = part.strip() + if not item: + continue + try: + delay = float(item) + except ValueError as exc: + raise SystemExit(f"invalid snapshot delay {item!r}") from exc + if delay < 0: + raise SystemExit("snapshot delays must be zero or positive") + delays.append(delay) + return delays + + def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Run JSON-described serial bench scenarios against the real RCP." @@ -76,6 +111,34 @@ def build_arg_parser() -> argparse.ArgumentParser: action="store_true", help="keep full logs in --log but suppress RX/DETECT chatter on the console", ) + parser.add_argument( + "--camera-index", + type=int, + help="enable webcam snapshots with this OpenCV camera index; use 0 for the default camera", + ) + parser.add_argument( + "--snapshot-dir", + type=Path, + help="directory for webcam snapshots; enables camera index 0 if --camera-index is omitted", + ) + parser.add_argument( + "--snapshot-delays", + default="0", + help="comma-separated seconds after each send to capture, e.g. 0,0.25,1.0", + ) + parser.add_argument( + "--snapshot-before-send", + action="store_true", + help="also capture immediately before each send", + ) + parser.add_argument( + "--snapshot-acks", + action="store_true", + help="capture snapshots for generated ACK frames as well as scenario send steps", + ) + parser.add_argument("--snapshot-warmup", type=float, default=0.5, help="seconds to warm up the webcam") + parser.add_argument("--snapshot-width", type=int, help="requested webcam capture width") + parser.add_argument("--snapshot-height", type=int, help="requested webcam capture height") parser.add_argument("--dry-run", action="store_true", help="print the plan without opening serial ports") return parser @@ -93,6 +156,8 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: console = _FilteredStdout(stdout, _quiet_console_line) if args.quiet_console else stdout logger = BenchLogger(log_path, stdout=console) detector = FrameDetector(sync_mode=args.sync) + snapshotter: CameraSnapshots | None = None + ctx: ScenarioContext | None = None try: logger.emit("Serial bench scenario") logger.emit(f"name={scenario.get('name', args.scenario.stem)}") @@ -101,24 +166,54 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int: f"relay={args.relay_port} {args.relay_baud} sync={args.sync}" ) logger.emit(f"log={log_path}") + snapshot_dir = _snapshot_output_dir(args, log_path) + if snapshot_dir is not None: + snapshotter = CameraSnapshots( + camera_index=_snapshot_camera_index(args), + output_dir=snapshot_dir, + warmup_seconds=args.snapshot_warmup, + width=args.snapshot_width, + height=args.snapshot_height, + ) + snapshotter.open() + logger.emit( + f"snapshots={snapshot_dir} camera_index={_snapshot_camera_index(args)} " + f"delays={','.join(f'{delay:g}' for delay in _snapshot_delays(args))}" + ) with open_device_serial(serial, args) as device: - ctx = ScenarioContext(args=args, logger=logger, detector=detector, device=device) + ctx = ScenarioContext( + args=args, + logger=logger, + detector=detector, + device=device, + snapshotter=snapshotter, + ) try: for index, step in enumerate(_scenario_steps(scenario), start=1): if ctx.abort_requested: logger.event("SCENARIO_ABORT requested by prior step") break action, spec = _normalize_step(step) + ctx.current_step_index = index logger.event(f"STEP {index} {action}") _run_step(ctx, action, spec) finally: + ctx.current_step_index = None if ctx.relay is not None: ctx.relay.close() + if snapshotter is not None and ctx is not None: + snapshotter.close() + ctx.snapshot_records = snapshotter.records() + snapshotter = None + if ctx is None: + raise RuntimeError("scenario did not create a context") _emit_summary(ctx, logger) if args.result_json: _write_result_json(args.result_json, scenario, log_path, ctx) return 0 finally: + if snapshotter is not None: + snapshotter.close() logger.close() @@ -178,7 +273,8 @@ def _run_step(ctx: ScenarioContext, action: str, spec: dict[str, Any]) -> None: elif action == "send": frame = _parse_required_frame(spec.get("frame")) label = str(spec.get("label", "send")) - _send_and_record(ctx, frame, label) + capture = bool(spec.get("snapshot", True)) + _send_and_record(ctx, frame, label, capture=capture) if float(spec.get("listen", 0.0)) > 0: _listen(ctx, float(spec.get("listen", 0.0))) elif action == "wait_for": @@ -276,7 +372,7 @@ def _step_table_sweep(ctx: ScenarioContext, spec: dict[str, Any]) -> None: break frame = build_read_frame(selector) ctx.logger.event(f"READ selector=0x{selector:03X} frame={format_frame(frame)}") - _send_and_record(ctx, frame, f"read_0x{selector:03X}") + _send_and_record(ctx, frame, f"read_0x{selector:03X}", capture=bool(spec.get("snapshot", False))) _listen_with_ack(ctx, gap, selector, ack) @@ -398,7 +494,7 @@ def _listen_with_ack( acked_targets.add(frame) if ack["guard"] > 0: observed.extend(_listen(ctx, ack["guard"], selector=selector)) - _send_and_record(ctx, _ack_frame_for_target(frame, ack), "ack") + _send_and_record(ctx, _ack_frame_for_target(frame, ack), "ack", capture=ctx.args.snapshot_acks) ctx.ack_sent += 1 if _ack_limit_reached(ctx, ack, ack_start=ack_start, target_start=target_start): ctx.logger.event("ACK_LIMIT reached after ACK send") @@ -471,8 +567,12 @@ def _record_table_rows(ctx: ScenarioContext, frames: list[bytes], selector: int ctx.logger.event(f"TABLE selector=0x{selector:03X} echo={echo:02X} value={value:04X}") -def _send_and_record(ctx: ScenarioContext, frame: bytes, label: str) -> None: +def _send_and_record(ctx: ScenarioContext, frame: bytes, label: str, *, capture: bool = True) -> None: + if capture: + _capture_snapshots(ctx, frame, label, phase_prefix="pre", delays=[]) _send_frame(ctx.device, frame, ctx.logger, label) + if capture: + _capture_snapshots(ctx, frame, label, phase_prefix="tx", delays=_snapshot_delays(ctx.args)) ctx.tx_records.append( { "label": label, @@ -482,6 +582,45 @@ def _send_and_record(ctx: ScenarioContext, frame: bytes, label: str) -> None: ) +def _capture_snapshots( + ctx: ScenarioContext, + frame: bytes, + label: str, + *, + phase_prefix: str, + delays: list[float], +) -> None: + if ctx.snapshotter is None: + return + frame_text = format_frame(frame) + if phase_prefix == "pre": + if not ctx.args.snapshot_before_send: + return + phases = [("pre", 0.0)] + else: + phases = [(_delay_phase(delay), delay) for delay in delays] + + for phase, delay in phases: + try: + record = ctx.snapshotter.schedule( + label=label, + frame_text=frame_text, + phase=phase, + delay_seconds=delay, + step_index=ctx.current_step_index, + ) + except RuntimeError as exc: + ctx.logger.event(f"SNAPSHOT_ERROR {label} {phase} {exc}") + continue + ctx.snapshot_records.append(record) + ctx.logger.event(f"SNAPSHOT_SCHEDULE {phase} {label} {record['path']}") + + +def _delay_phase(delay: float) -> str: + millis = int(round(max(0.0, delay) * 1000.0)) + return f"tx+{millis:04d}ms" + + def _count_target(ctx: ScenarioContext, frame: bytes) -> None: text = format_frame(frame) ctx.target_counts[text] = ctx.target_counts.get(text, 0) + 1 @@ -564,6 +703,14 @@ def _print_dry_run(args: argparse.Namespace, scenario: dict[str, Any], log_path: print(f"relay={args.relay_port} {args.relay_baud}", file=stdout) print(f"sync={args.sync}", file=stdout) print(f"log={log_path}", file=stdout) + snapshot_dir = _snapshot_output_dir(args, log_path) + if snapshot_dir is not None: + print( + f"snapshots={snapshot_dir} camera_index={_snapshot_camera_index(args)} " + f"before_send={int(args.snapshot_before_send)} " + f"delays={','.join(f'{delay:g}' for delay in _snapshot_delays(args))}", + file=stdout, + ) for index, step in enumerate(_scenario_steps(scenario), start=1): action, spec = _normalize_step(step) print(f"step[{index}]={action}", file=stdout) @@ -604,6 +751,8 @@ def _quiet_console_line(line: str) -> bool: "STEP ", "PROMPT ", "NOTE ", + "SNAPSHOT_SCHEDULE ", + "SNAPSHOT_ERROR ", "Summary", "rx_frames=", "resync_events=", @@ -706,6 +855,7 @@ def _emit_summary(ctx: ScenarioContext, logger: BenchLogger) -> None: logger.emit(f"rx_frames={len(ctx.detector.frames)} trailing_unframed_bytes={len(ctx.detector.buffer)}") logger.emit(f"resync_events={ctx.detector.resync_events} dropped_bytes={ctx.detector.dropped_bytes}") logger.emit(f"tx_frames={len(ctx.tx_records)} ack_sent={ctx.ack_sent} table_response_rows={len(ctx.table_rows)}") + logger.emit(f"snapshots={len(ctx.snapshot_records)}") logger.emit(f"abort_requested={int(ctx.abort_requested)}") for target, count in sorted(ctx.target_counts.items()): logger.emit(f"ack_target {target}={count}") @@ -729,6 +879,7 @@ def _write_result_json(path: Path, scenario: dict[str, Any], log_path: Path, ctx "labels": dict(ctx.detector.labels), "tx_frames": ctx.tx_records, "ack_sent": ctx.ack_sent, + "snapshots": ctx.snapshot_records, "abort_requested": ctx.abort_requested, "ack_targets": ctx.target_counts, "table_rows": ctx.table_rows, diff --git a/tests/test_serial_scenario.py b/tests/test_serial_scenario.py index 0bd1902..bf1592a 100644 --- a/tests/test_serial_scenario.py +++ b/tests/test_serial_scenario.py @@ -45,6 +45,45 @@ class SerialScenarioTest(unittest.TestCase): self.assertEqual(DEFAULT_ACK_TARGET, bytes.fromhex("07804020902D")) self.assertEqual(DEFAULT_ACK_FRAME, bytes.fromhex("05004000001F")) + def test_dry_run_summarizes_webcam_snapshot_options(self): + scenario = { + "name": "unit-camera", + "steps": [ + { + "action": "send", + "label": "camera_test", + "frame": "00 00 00 80 80 5A", + } + ], + } + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "scenario.json" + snapshot_dir = Path(tmpdir) / "shots" + path.write_text(json.dumps(scenario), encoding="utf-8") + stdout = io.StringIO() + + exit_code = main( + [ + str(path), + "--dry-run", + "--snapshot-dir", + str(snapshot_dir), + "--camera-index", + "1", + "--snapshot-before-send", + "--snapshot-delays", + "0,0.25,1", + ], + stdout=stdout, + ) + + output = stdout.getvalue() + self.assertEqual(exit_code, 0) + self.assertIn(f"snapshots={snapshot_dir}", output) + self.assertIn("camera_index=1", output) + self.assertIn("before_send=1", output) + self.assertIn("delays=0,0.25,1", output) + if __name__ == "__main__": unittest.main()