1
0
This commit is contained in:
Aiden
2026-05-27 11:50:10 +10:00
parent 0d099235c5
commit c0304c575c
55 changed files with 26035 additions and 16 deletions

View File

@@ -141,6 +141,9 @@ def format_frame(data: bytes) -> str:
def label_frame(frame: bytes) -> str:
labels = {
bytes.fromhex("0000000080DA"): "heartbeat",
bytes.fromhex("00000080805A"): "active_selector0_keepalive_report",
bytes.fromhex("00006C000036"): "copy_completion_exit_selector_006c_candidate",
bytes.fromhex("00006D000037"): "copy_in_progress_selector_006d_candidate",
bytes.fromhex("02000200005A"): "connect_ok_path_response_candidate",
bytes.fromhex("010002000059"): "connect_c0_path_response_candidate",
bytes.fromhex("07804040A07D"): "visible_40A0_family_40",
@@ -151,6 +154,21 @@ def label_frame(frame: bytes) -> str:
bytes.fromhex("0000158000CF"): "known_call_button_active_report",
bytes.fromhex("00001500004F"): "known_call_button_inactive_report",
bytes.fromhex("0000078000DD"): "known_cam_power_button_report",
bytes.fromhex("00010F8000D4"): "known_shutter_onoff_bit7_report_candidate",
bytes.fromhex("00010F200074"): "known_shutter_onoff_bit6_report_candidate",
bytes.fromhex("00010F000054"): "known_shutter_onoff_clear_report_candidate",
bytes.fromhex("01010F8000D5"): "queued_shutter_onoff_bit7_report_candidate",
bytes.fromhex("02010F8000D6"): "queued_shutter_onoff_bit7_report_candidate",
bytes.fromhex("01010F200075"): "queued_shutter_onoff_bit6_report_candidate",
bytes.fromhex("02010F200076"): "queued_shutter_onoff_bit6_report_candidate",
bytes.fromhex("01010F000055"): "queued_shutter_onoff_clear_report_candidate",
bytes.fromhex("02010F000056"): "queued_shutter_onoff_clear_report_candidate",
bytes.fromhex("0100178000CC"): "queued_bars_button_selector_0017_active_candidate",
bytes.fromhex("0200178000CF"): "queued_bars_button_selector_0017_active_candidate",
bytes.fromhex("0100188000C3"): "queued_bars_button_selector_0018_active_candidate",
bytes.fromhex("0200188000C0"): "queued_bars_button_selector_0018_active_candidate",
bytes.fromhex("01011A080048"): "queued_iris_auto_button_selector_009a_active_candidate",
bytes.fromhex("02011A08004B"): "queued_iris_auto_button_selector_009a_active_candidate",
bytes.fromhex("01000400005F"): "gated_active_0004_response_candidate",
bytes.fromhex("02000400005C"): "gated_active_0004_transition_candidate",
}

View File

@@ -57,6 +57,7 @@ from .cpu import CPUState
from .errors import EmulatorError, UnsupportedInstruction
from .fast_paths import P9FastPath, P9FastPathConfig, P9FastPathEvent
from .memory import MemoryAccess, MemoryMap, describe_regions
from .panel import PanelAction, PanelInjection, PanelInput, parse_panel_action, resolve_panel_input
from .peripherals import LCD, P9TraceEvent, X24164Bus, X24164Device, X24164TraceEvent, factory_default_words_from_rom
from .runner import H8536Emulator, RunReport
from .sci import SCI1, SciTxEvent
@@ -97,6 +98,9 @@ __all__ = [
"P9FastPath",
"P9FastPathConfig",
"P9FastPathEvent",
"PanelAction",
"PanelInjection",
"PanelInput",
"P9TraceEvent",
"RAMCR",
"REGISTER_FIELD_END",
@@ -137,4 +141,6 @@ __all__ = [
"factory_default_words_from_rom",
"load_rom",
"main",
"parse_panel_action",
"resolve_panel_input",
]

203
h8536/emulator/panel.py Normal file
View File

@@ -0,0 +1,203 @@
from __future__ import annotations
from dataclasses import dataclass
from ..formatting import h16
@dataclass(frozen=True)
class PanelInput:
name: str
source: int
shadow: int
previous: int
bit: int
dirty: int
dirty_bit: int
selector: int | None = None
note: str = ""
@property
def spec(self) -> str:
return f"{h16(self.shadow)}.{self.bit}"
@property
def dirty_spec(self) -> str:
return f"{h16(self.dirty)}.{self.dirty_bit}"
@dataclass(frozen=True)
class PanelAction:
panel_input: PanelInput
pressed: bool
raw: str = ""
@property
def label(self) -> str:
state = "press" if self.pressed else "release"
return f"{self.panel_input.name}:{state}"
@dataclass(frozen=True)
class PanelInjection:
action: PanelAction
source_before: int
source_after: int
shadow_before: int
shadow_after: int
previous_before: int
previous_after: int
dirty_before: int
dirty_after: int
def summary(self) -> str:
panel_input = self.action.panel_input
selector = "" if panel_input.selector is None else f" selector=0x{panel_input.selector:04X}"
return (
f"{self.action.label} source={h16(panel_input.source)} "
f"shadow={panel_input.spec} previous={h16(panel_input.previous)} "
f"dirty={panel_input.dirty_spec}{selector}"
)
PANEL_LANES: tuple[tuple[int, int, int, int, int], ...] = (
(0xF102, 0xF6D7, 0xF6E7, 0xF6F2, 7),
(0xF103, 0xF6D6, 0xF6E6, 0xF6F2, 6),
(0xF104, 0xF6D5, 0xF6E5, 0xF6F2, 5),
(0xF105, 0xF6D4, 0xF6E4, 0xF6F2, 4),
(0xF106, 0xF6D3, 0xF6E3, 0xF6F2, 3),
(0xF107, 0xF6D2, 0xF6E2, 0xF6F2, 2),
(0xF108, 0xF6D1, 0xF6E1, 0xF6F2, 1),
(0xF109, 0xF6D0, 0xF6E0, 0xF6F2, 0),
(0xF005, 0xF6DC, 0xF6EC, 0xF6F3, 4),
(0xF006, 0xF6DB, 0xF6EB, 0xF6F3, 3),
)
KNOWN_PANEL_INPUTS: dict[str, PanelInput] = {
"cam-power": PanelInput(
name="cam-power",
source=0xF105,
shadow=0xF6D4,
previous=0xF6E4,
bit=3,
dirty=0xF6F2,
dirty_bit=4,
selector=0x0007,
note="CAM POWER button; queues selector 0x0007 when gates allow",
),
"call": PanelInput(
name="call",
source=0xF006,
shadow=0xF6DB,
previous=0xF6EB,
bit=5,
dirty=0xF6F3,
dirty_bit=3,
selector=0x0015,
note="CALL button; queues selector 0x0015 active/inactive reports",
),
}
PANEL_ALIASES: dict[str, str] = {
"cam": "cam-power",
"camera-power": "cam-power",
"camera_power": "cam-power",
"cam_power": "cam-power",
"campower": "cam-power",
"power": "cam-power",
}
def resolve_panel_input(text: str) -> PanelInput:
token = _normalize_token(text)
token = PANEL_ALIASES.get(token, token)
if token in KNOWN_PANEL_INPUTS:
return KNOWN_PANEL_INPUTS[token]
if "." not in token:
raise ValueError(f"unknown panel input {text!r}; use cam-power, call, or an address bit like F6D4.3")
address_text, bit_text = token.split(".", 1)
address = _parse_address(address_text)
try:
bit = int(bit_text, 0)
except ValueError as exc:
raise ValueError(f"invalid panel bit in {text!r}") from exc
if not 0 <= bit <= 7:
raise ValueError(f"panel bit out of range in {text!r}")
for source, shadow, previous, dirty, dirty_bit in PANEL_LANES:
if address in {source, shadow}:
return PanelInput(
name=f"{h16(shadow)}.{bit}",
source=source,
shadow=shadow,
previous=previous,
bit=bit,
dirty=dirty,
dirty_bit=dirty_bit,
note="raw panel matrix input inferred from ROM shadow/dirty lane",
)
raise ValueError(f"{h16(address)} is not a known A8 panel byte shadow/source")
def parse_panel_action(text: str, *, default_pressed: bool = True) -> PanelAction:
raw = text.strip()
spec = raw
pressed = default_pressed
for separator in ("=", ":"):
if separator not in raw:
continue
left, right = raw.rsplit(separator, 1)
state = _parse_state(right)
if state is None:
continue
spec = left
pressed = state
break
return PanelAction(resolve_panel_input(spec), pressed=pressed, raw=raw)
def _parse_state(text: str) -> bool | None:
token = _normalize_token(text)
if token in {"press", "pressed", "on", "down", "1", "true", "active"}:
return True
if token in {"release", "released", "off", "up", "0", "false", "inactive"}:
return False
return None
def _normalize_token(text: str) -> str:
return text.strip().lower().replace("_", "-")
def _parse_address(text: str) -> int:
token = text.strip().upper()
if token.startswith("H'"):
token = token[2:]
elif token.startswith("$"):
token = token[1:]
elif token.startswith("0X"):
token = token[2:]
try:
value = int(token, 16)
except ValueError as exc:
raise ValueError(f"invalid panel address {text!r}") from exc
if not 0 <= value <= 0xFFFF:
raise ValueError(f"panel address out of range {text!r}")
return value
def set_bit(value: int, bit: int, enabled: bool) -> int:
mask = 1 << bit
return (value | mask) & 0xFF if enabled else (value & ~mask) & 0xFF
__all__ = [
"KNOWN_PANEL_INPUTS",
"PANEL_ALIASES",
"PANEL_LANES",
"PanelAction",
"PanelInjection",
"PanelInput",
"parse_panel_action",
"resolve_panel_input",
"set_bit",
]

View File

@@ -39,6 +39,7 @@ from .cpu import CPUState, mask, s8, s16, sign_bit
from .errors import EmulatorError, UnsupportedInstruction
from .fast_paths import P9FastPath, P9FastPathConfig
from .memory import MemoryMap
from .panel import PanelAction, PanelInjection, PanelInput, resolve_panel_input, set_bit
from .sci import SCI1
from .timers import FrtOciaScheduler, FrtRegisters
from .uart import UartTiming
@@ -134,6 +135,38 @@ class H8536Emulator:
def inject_sci1_rx_byte(self, value: int) -> None:
self.memory.inject_sci1_rx_byte(value)
def inject_panel_input(self, panel_input: PanelInput | str, *, pressed: bool = True) -> PanelInjection:
resolved = resolve_panel_input(panel_input) if isinstance(panel_input, str) else panel_input
action = PanelAction(resolved, pressed=pressed)
shadow_before = self.memory.read8(resolved.shadow)
source_before = self.memory.external.get(resolved.source, shadow_before)
previous_before = self.memory.read8(resolved.previous)
dirty_before = self.memory.read8(resolved.dirty)
source_after = set_bit(source_before, resolved.bit, pressed)
shadow_after = set_bit(shadow_before, resolved.bit, pressed)
previous_after = set_bit(previous_before, resolved.bit, not pressed)
dirty_after = dirty_before | (1 << resolved.dirty_bit)
self.memory.write8(resolved.source, source_after)
self.memory.write8(resolved.shadow, shadow_after)
# The ROM's loc_1Bxx dispatchers act on shadow XOR previous-shadow.
# Force the previous sample opposite at this bit so the main loop sees one clean edge.
self.memory.write8(resolved.previous, previous_after)
self.memory.write8(resolved.dirty, dirty_after)
return PanelInjection(
action=action,
source_before=source_before,
source_after=source_after,
shadow_before=shadow_before,
shadow_after=shadow_after,
previous_before=previous_before,
previous_after=previous_after,
dirty_before=dirty_before,
dirty_after=dirty_after,
)
def step(self) -> str:
pc = self.cpu.pc
cycles_before = self.cpu.cycles

View File

@@ -18,6 +18,7 @@ from .constants import (
from .eeprom_image import write_eeprom_snapshot
from .errors import UnsupportedInstruction
from .memory import MemoryAccess
from .panel import PanelAction, parse_panel_action, resolve_panel_input
from .runner import H8536Emulator
from .uart import UartTiming
@@ -33,6 +34,13 @@ CONNECT_LCD_FRAMES = (
)
WATCH_PCS = {
0x15E0: "main_panel_scanner",
0x1BA0: "panel_f6d4_edge_dispatch",
0x1BF8: "panel_f6db_edge_dispatch",
0x1C0E: "panel_bit_dispatch",
0x1F40: "cam_power_handler",
0x20A1: "call_handler",
0x3E54: "report_queue_enqueue",
0xBB57: "sci1_eri_entry",
0xBB67: "sci1_rxi_entry",
0xBBD6: "rx_checksum_seed",
@@ -56,6 +64,10 @@ WATCH_RANGES = (
)
ACCESS_RANGES = (
(0xF000, 0xF10F, "panel_external_bytes"),
(0xF6D0, 0xF6DF, "panel_shadow_bytes"),
(0xF6E0, 0xF6EF, "panel_previous_shadow_bytes"),
(0xF6F0, 0xF6F3, "panel_dirty_bytes"),
(0xF850, 0xF85D, "tx_staging_or_frame"),
(0xF860, 0xF86D, "rx_validation_or_capture"),
(0xF870, 0xF96F, "report_queue"),
@@ -64,8 +76,12 @@ ACCESS_RANGES = (
(0xFAA2, 0xFAA6, "serial_latches"),
(0xFAF0, 0xFAFF, "lcd_line_buffer"),
(0xE000, 0xE001, "primary_table_index_0000"),
(0xE00E, 0xE00F, "primary_cam_power_index_0007"),
(0xE02A, 0xE02B, "primary_call_index_0015"),
(0xE400, 0xE401, "secondary_table_index_0000"),
(0xE800, 0xE801, "current_table_index_0000"),
(0xE80E, 0xE80F, "current_cam_power_index_0007"),
(0xE82A, 0xE82B, "current_call_index_0015"),
(0xEC00, 0xEC01, "flag_table_index_0000"),
(0xE000, 0xE3FF, "primary_table_E000"),
(0xE400, 0xE7FF, "secondary_table_E400"),
@@ -77,6 +93,12 @@ ACCESS_RANGES = (
)
STATE_BYTES = {
0xF6D4: "panel_shadow_F6D4_cam_lane",
0xF6DB: "panel_shadow_F6DB_call_lane",
0xF6E4: "panel_previous_F6E4_cam_lane",
0xF6EB: "panel_previous_F6EB_call_lane",
0xF6F2: "panel_dirty_F6F2",
0xF6F3: "panel_dirty_F6F3",
0xF9B0: "queue_head",
0xF9B5: "queue_tail",
0xF9C0: "tx_gate",
@@ -104,12 +126,14 @@ STATE_WORDS = {
0xE000: "E000_index_0000_primary",
0xE004: "E000_index_0002_primary",
0xE008: "E000_index_0004_primary",
0xE00E: "E000_index_0007_cam_power_primary",
0xE024: "E000_index_0012_primary",
0xE026: "E000_index_0013_primary",
0xE02A: "E000_index_0015_primary",
0xE104: "E000_index_0082_primary",
0xE400: "E400_index_0000_secondary",
0xE800: "E800_index_0000_current",
0xE80E: "E800_index_0007_cam_power_current",
0xE804: "E800_index_0002_current",
0xE808: "E800_index_0004_current",
0xE824: "E800_index_0012_current",
@@ -189,6 +213,50 @@ class FrameResult:
return lines
@dataclass(frozen=True)
class PanelActionResult:
action: PanelAction
injection_summary: str
steps: int
stopped_reason: str
new_tx_bytes: bytes
new_tx_frames: list[bytes]
state_before: dict[str, int | str]
state_after: dict[str, int | str]
accesses: list[MemoryAccess]
context: RunContext
def lines(self, index: int) -> list[str]:
lines = [
f"panel_action[{index}]={self.action.label} input={self.action.panel_input.spec}",
f" injection={self.injection_summary}",
f" stopped={self.stopped_reason} steps={self.steps}",
f" new_tx_bytes={format_frame(self.new_tx_bytes) if self.new_tx_bytes else 'none'}",
]
if self.new_tx_frames:
lines.append(" new_tx_frames=" + " | ".join(format_frame(frame) for frame in self.new_tx_frames))
else:
lines.append(" new_tx_frames=none")
lcd_display = self.state_after.get("lcd_display_ascii")
if isinstance(lcd_display, str):
lines.append(f" lcd_display={lcd_display!r}")
state_changes = _state_change_lines(self.state_before, self.state_after)
if state_changes:
lines.append(" state_changes:")
lines.extend(f" {line}" for line in state_changes)
pc_lines = _pc_hit_lines(self.context)
if pc_lines:
lines.append(" pc_hits:")
lines.extend(f" {line}" for line in pc_lines)
access_lines = _access_lines(self.accesses)
if access_lines:
lines.append(" interesting_accesses:")
lines.extend(f" {line}" for line in access_lines)
if self.context.unsupported:
lines.append(f" unsupported={self.context.unsupported}")
return lines
def parse_frame(text: str) -> bytes:
normalized = text.strip().replace(",", " ").replace(":", " ").replace("-", " ").replace("_", " ")
parts = normalized.split()
@@ -209,6 +277,27 @@ def parse_frame(text: str) -> bytes:
return bytes(values)
def parse_panel_action_arg(text: str) -> PanelAction:
try:
return parse_panel_action(text)
except ValueError as exc:
raise argparse.ArgumentTypeError(str(exc)) from exc
def parse_panel_press_arg(text: str) -> PanelAction:
try:
return PanelAction(resolve_panel_input(text), pressed=True, raw=text)
except ValueError as exc:
raise argparse.ArgumentTypeError(str(exc)) from exc
def parse_panel_release_arg(text: str) -> PanelAction:
try:
return PanelAction(resolve_panel_input(text), pressed=False, raw=text)
except ValueError as exc:
raise argparse.ArgumentTypeError(str(exc)) from exc
def frame_checksum(data: bytes) -> int:
checksum = CHECKSUM_SEED
for value in data[: FRAME_LENGTH - 1]:
@@ -318,6 +407,11 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--per-byte-steps", type=int, default=5_000, help="polite mode byte-consume limit, or UART mode step limit between byte arrivals")
parser.add_argument("--post-frame-steps", type=int, default=80_000, help="maximum steps after a full injected frame")
parser.add_argument("--post-frame-ms", type=int, help="run this many emulated milliseconds after each injected frame")
parser.add_argument("--panel", action="append", type=parse_panel_action_arg, default=[], help="synthetic panel action, e.g. cam-power, call=release, or F6D4.3=press; preserves order across repeated --panel uses")
parser.add_argument("--panel-press", action="append", type=parse_panel_press_arg, default=[], help="synthetic panel press alias/spec, e.g. cam-power, call, or F6D4.3")
parser.add_argument("--panel-release", action="append", type=parse_panel_release_arg, default=[], help="synthetic panel release alias/spec, e.g. call or F6DB.5")
parser.add_argument("--post-panel-steps", type=int, default=120_000, help="maximum steps after each synthetic panel action")
parser.add_argument("--post-panel-ms", type=int, help="run this many emulated milliseconds after each synthetic panel action")
parser.add_argument("--wait-heartbeats", type=int, default=0, help="wait for this many heartbeat frames before injecting the first host frame")
parser.add_argument("--wait-heartbeat-steps", type=int, default=1_500_000, help="maximum steps while waiting for pre-injection heartbeat frames")
parser.add_argument("--uart-timing", action="store_true", help="inject frame bytes at real UART inter-byte timing instead of waiting for RDRF consumption")
@@ -347,8 +441,9 @@ def main(argv: list[str] | None = None) -> int:
frames = list(args.frames)
if args.preset == "connect-lcd":
frames.extend(CONNECT_LCD_FRAMES)
if not frames:
raise SystemExit("pass at least one frame or use --preset connect-lcd")
panel_actions = [*args.panel, *args.panel_press, *args.panel_release]
if not frames and not panel_actions:
raise SystemExit("pass at least one frame, use --preset connect-lcd, or add --panel/--panel-press")
rom_path, emulator, boot_summary, results = run_rx_probe(
frames,
@@ -384,6 +479,19 @@ def main(argv: list[str] | None = None) -> int:
for index, result in enumerate(results):
for line in result.lines(index):
print(line)
panel_results = [
_run_panel_action(
emulator,
action,
post_panel_steps=args.post_panel_steps,
post_panel_ms=args.post_panel_ms,
stop_after_tx_frame=not args.keep_listening,
)
for action in panel_actions
]
for index, result in enumerate(panel_results):
for line in result.lines(index):
print(line)
print("total_tx_frames=" + " | ".join(format_frame(frame) for frame in emulator.sci1.tx_frames))
eeprom_writes = emulator.memory.p9_bus.x24164_bus.write_log_lines(limit=80)
if eeprom_writes:
@@ -484,6 +592,49 @@ def _run_frame(
)
def _run_panel_action(
emulator: H8536Emulator,
action: PanelAction,
*,
post_panel_steps: int,
post_panel_ms: int | None,
stop_after_tx_frame: bool,
) -> PanelActionResult:
state_before = _state_snapshot(emulator)
log_start = len(emulator.memory.access_log)
tx_byte_start = len(emulator.sci1.tx_bytes)
tx_frame_start = len(emulator.sci1.tx_frames)
injection = emulator.inject_panel_input(action.panel_input, pressed=action.pressed)
context = RunContext()
def post_predicate(inner: H8536Emulator) -> bool:
if not stop_after_tx_frame:
return False
return any(frame != HEARTBEAT_FRAME for frame in inner.sci1.tx_frames[tx_frame_start:])
if post_panel_ms is not None:
steps, reason = _run_cycles_for_ms(emulator, post_panel_ms, context)
stopped_reason = reason
else:
steps, reason = _run_until(emulator, post_panel_steps, post_predicate, context)
stopped_reason = "non_heartbeat_tx_frame" if reason == "predicate" and stop_after_tx_frame else reason
log_end = len(emulator.memory.access_log)
state_after = _state_snapshot(emulator)
return PanelActionResult(
action=action,
injection_summary=injection.summary(),
steps=steps,
stopped_reason=stopped_reason,
new_tx_bytes=bytes(emulator.sci1.tx_bytes[tx_byte_start:]),
new_tx_frames=list(emulator.sci1.tx_frames[tx_frame_start:]),
state_before=state_before,
state_after=state_after,
accesses=emulator.memory.access_log[log_start:log_end],
context=context,
)
def _inject_frame_uart_timed(
emulator: H8536Emulator,
frame: bytes,
@@ -672,11 +823,15 @@ def _parse_byte(text: str) -> int:
__all__ = [
"CONNECT_LCD_FRAMES",
"PanelActionResult",
"format_frame",
"frame_checksum",
"frame_checksum_ok",
"main",
"parse_frame",
"parse_panel_action_arg",
"parse_panel_press_arg",
"parse_panel_release_arg",
"run_rx_probe",
"UartTiming",
]

306
h8536/panel_button_trace.py Normal file
View File

@@ -0,0 +1,306 @@
from __future__ import annotations
import argparse
import json
import re
from collections import defaultdict
from pathlib import Path
from typing import Any
from .decoder import H8536Decoder
from .formatting import h16
from .rom import Rom
DEFAULT_ROM = Path("ROM/M27C512@DIP28_1.BIN")
DEFAULT_TEXT_OUTPUT = Path("build/panel_button_trace.md")
DEFAULT_JSON_OUTPUT = Path("build/panel_button_trace.json")
BUTTON_TABLE_BASE = 0x2706
NOOP_HANDLER = 0x1C25
QUEUE_FUNCTION = 0x3E54
INPUT_BYTES = [
{
"shadow": "F6D7",
"source": "F102",
"dirty": "F6F2.7",
"dispatcher": "1B2D",
"initial_r5": 0x7E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D6",
"source": "F103",
"dirty": "F6F2.6",
"dispatcher": "1B44",
"initial_r5": 0x6E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D5",
"source": "F104",
"dirty": "F6F2.5",
"dispatcher": "1B5B",
"initial_r5": 0x5E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D4",
"source": "F105",
"dirty": "F6F2.4",
"dispatcher": "1BA0",
"initial_r5": 0x4E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D3",
"source": "F106",
"dirty": "F6F2.3",
"dispatcher": "1BB6",
"initial_r5": 0x3E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D2",
"source": "F107",
"dirty": "F6F2.2",
"dispatcher": "1BCC",
"initial_r5": 0x2E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D1",
"source": "F108",
"dirty": "F6F2.1",
"dispatcher": "1B72",
"initial_r5": 0x1E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6D0",
"source": "F109",
"dirty": "F6F2.0",
"dispatcher": "1B89",
"initial_r5": 0x0E,
"bank": "IRQ4 A8 panel byte path",
},
{
"shadow": "F6DC",
"source": "F005",
"dirty": "F6F3.4",
"dispatcher": "1BE2",
"initial_r5": 0xCE,
"bank": "IRQ3 A8 panel byte path",
},
{
"shadow": "F6DB",
"source": "F006",
"dirty": "F6F3.3",
"dispatcher": "1BF8",
"initial_r5": 0xBE,
"bank": "IRQ3 A8 panel byte path",
},
]
KNOWN_REPORTS = {
0x0007: "CAM POWER",
0x0015: "CALL",
}
def analyze_panel_buttons(rom_bytes: bytes) -> dict[str, Any]:
rom = Rom(rom_bytes)
entries = _table_entries(rom)
handler_summaries = {
target: _summarize_handler(rom, target)
for target in sorted({entry["handler"] for entry in entries if entry["handler"] != NOOP_HANDLER})
}
for entry in entries:
entry["handler_summary"] = handler_summaries.get(entry["handler"], {})
selectors = entry["handler_summary"].get("report_selectors", [])
entry["known_report"] = ", ".join(
KNOWN_REPORTS.get(int(selector), f"0x{int(selector):04X}") for selector in selectors
)
return {
"kind": "panel_button_trace",
"button_table_base": BUTTON_TABLE_BASE,
"button_table_base_hex": h16(BUTTON_TABLE_BASE),
"noop_handler": NOOP_HANDLER,
"noop_handler_hex": h16(NOOP_HANDLER),
"entries": entries,
"known_paths": _known_paths(entries),
"handler_summaries": [
{"handler": handler, "handler_hex": h16(handler), **summary}
for handler, summary in sorted(handler_summaries.items())
],
}
def format_markdown(analysis: dict[str, Any]) -> str:
lines = [
"# PT2 Known Button ROM Trace",
"",
"This report follows the panel button edge path from the serial-visible reports back into the ROM input scanner.",
"The key table is the indirect handler table at `H'2706`, used by `loc_1C0E` after byte-level panel input changes are detected.",
"",
"## Known Anchors",
"",
]
for path in analysis["known_paths"]:
lines.extend(
[
f"### {path['name']}",
"",
f"- Emitted selector: `0x{path['selector']:04X}`",
f"- Handler: `H'{path['handler']:04X}`",
f"- Edge source: `{path['source']} -> {path['shadow']}` via `{path['dirty']}`",
f"- Trigger bit: `{path['shadow']}.{path['bit']}`",
f"- Table slot: `H'{path['table_address']:04X}` -> `H'{path['handler']:04X}`",
f"- Current-level tests: {', '.join(path['handler_summary'].get('level_tests', [])) or 'none found'}",
f"- State writes: {', '.join(path['handler_summary'].get('state_writes', [])) or 'none found'}",
"",
]
)
lines.extend(
[
"## Button Matrix Entries With Serial Reports",
"",
"| Source | Shadow bit | Dirty | Handler | Selector(s) | State writes |",
"| --- | --- | --- | --- | --- | --- |",
]
)
for entry in analysis["entries"]:
selectors = entry["handler_summary"].get("report_selectors", [])
if not selectors:
continue
selector_text = ", ".join(f"`0x{int(selector):04X}`" for selector in selectors)
writes = ", ".join(f"`{write}`" for write in entry["handler_summary"].get("state_writes", [])[:4])
lines.append(
f"| `{entry['source']}` | `{entry['shadow']}.{entry['bit']}` | `{entry['dirty']}` | "
f"`H'{entry['handler']:04X}` | {selector_text} | {writes} |"
)
lines.extend(
[
"",
"## Practical Read",
"",
"- CALL and CAM POWER do share the general panel edge path with many other buttons.",
"- The shared path is: panel byte snapshot -> shadow byte -> dirty bit -> `loc_1C0E` jump table -> handler -> `loc_3E54` report.",
"- Other buttons diverge in their handlers: many require `F731/F730/F791` session/menu gates, mutate page state, or emit different selectors.",
"- Some table entries are `H'1C25`, an immediate `RTS`, so those physical matrix positions are intentionally ignored in this firmware context.",
"",
]
)
return "\n".join(lines)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Trace panel button matrix handlers and known CALL/CAM report sources.")
parser.add_argument("rom", nargs="?", type=Path, default=DEFAULT_ROM)
parser.add_argument("--out", type=Path, default=DEFAULT_TEXT_OUTPUT)
parser.add_argument("--json-out", type=Path, default=DEFAULT_JSON_OUTPUT)
args = parser.parse_args(argv)
analysis = analyze_panel_buttons(args.rom.read_bytes())
args.out.parent.mkdir(parents=True, exist_ok=True)
args.out.write_text(format_markdown(analysis), encoding="utf-8")
args.json_out.parent.mkdir(parents=True, exist_ok=True)
args.json_out.write_text(json.dumps(analysis, indent=2, sort_keys=True) + "\n", encoding="utf-8")
print(f"wrote {args.out}")
print(f"wrote {args.json_out}")
return 0
def _table_entries(rom: Rom) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for lane in INPUT_BYTES:
initial_r5 = int(lane["initial_r5"])
for bit in range(7, -1, -1):
table_offset = initial_r5 - (2 * (7 - bit))
table_address = BUTTON_TABLE_BASE + table_offset
handler = rom.u16(table_address)
rows.append(
{
"source": lane["source"],
"shadow": lane["shadow"],
"dirty": lane["dirty"],
"dispatcher": lane["dispatcher"],
"bank": lane["bank"],
"bit": bit,
"table_offset": table_offset,
"table_address": table_address,
"table_address_hex": h16(table_address),
"handler": handler,
"handler_hex": h16(handler),
"is_noop": handler == NOOP_HANDLER,
}
)
return rows
def _known_paths(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
paths: list[dict[str, Any]] = []
for entry in entries:
selectors = entry["handler_summary"].get("report_selectors", [])
for selector in selectors:
name = KNOWN_REPORTS.get(int(selector))
if not name:
continue
paths.append({"name": name, "selector": int(selector), **entry})
return paths
def _summarize_handler(rom: Rom, address: int) -> dict[str, Any]:
instructions = _decode_linear_handler(rom, address)
report_selectors: list[int] = []
last_r3: int | None = None
level_tests: list[str] = []
state_writes: list[str] = []
for ins in instructions:
text = ins.text
if match := re.search(r"MOV:[^#]+#H'([0-9A-F]{4}), R3", text):
last_r3 = int(match.group(1), 16)
if QUEUE_FUNCTION in ins.targets and last_r3 is not None:
if last_r3 not in report_selectors:
report_selectors.append(last_r3)
if match := re.search(r"BTST\.B #([0-7]), @H'(F6[0-9A-F]{2})", text):
test = f"{match.group(2)}.{match.group(1)}"
if test not in level_tests:
level_tests.append(test)
if match := re.search(r"@H'(E[89][0-9A-F]{2})", text):
write = match.group(1)
if _looks_like_write(text) and write not in state_writes:
state_writes.append(write)
return {
"report_selectors": report_selectors,
"report_selectors_hex": [f"0x{selector:04X}" for selector in report_selectors],
"level_tests": level_tests,
"state_writes": state_writes,
"instruction_count_scanned": len(instructions),
}
def _decode_linear_handler(rom: Rom, address: int, *, max_bytes: int = 0x90) -> list[Any]:
decoder = H8536Decoder(rom, labels={QUEUE_FUNCTION: "loc_3E54"})
instructions: list[Any] = []
pc = address
end = min(len(rom.data), address + max_bytes)
while pc < end:
ins = decoder.decode(pc)
instructions.append(ins)
pc += max(1, ins.size)
if ins.mnemonic == "RTS":
break
return instructions
def _looks_like_write(text: str) -> bool:
return text.startswith("MOV") or text.startswith("BSET") or text.startswith("BCLR") or text.startswith("CLR")
__all__ = ["analyze_panel_buttons", "format_markdown", "main"]
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -7,7 +7,7 @@ import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, TextIO
from typing import Any, Callable, TextIO
from .bench_connect_lcd import (
BenchLogger,
@@ -21,6 +21,7 @@ from .bench_connect_lcd import (
_send_frame,
_wait_for_ready,
format_frame,
frame_checksum,
frame_checksum_ok,
parse_frame,
serial_format_label,
@@ -30,6 +31,7 @@ from .serial_table_dump import build_read_frame, decode_table_read_response
DEFAULT_ACK_TARGET = bytes.fromhex("07804020902D")
DEFAULT_ACK_FRAME = bytes.fromhex("05004000001F")
HEARTBEAT_FRAME = bytes.fromhex("0000000080DA")
@dataclass
@@ -69,6 +71,11 @@ def build_arg_parser() -> argparse.ArgumentParser:
parser.add_argument("--sync", choices=("checksum", "fixed"), default="checksum", help="RX frame sync strategy")
parser.add_argument("--log", type=Path, help="capture log path")
parser.add_argument("--result-json", type=Path, help="write machine-readable scenario summary")
parser.add_argument(
"--quiet-console",
action="store_true",
help="keep full logs in --log but suppress RX/DETECT chatter on the console",
)
parser.add_argument("--dry-run", action="store_true", help="print the plan without opening serial ports")
return parser
@@ -83,7 +90,8 @@ def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
return 0
serial = _import_serial()
logger = BenchLogger(log_path, stdout=stdout)
console = _FilteredStdout(stdout, _quiet_console_line) if args.quiet_console else stdout
logger = BenchLogger(log_path, stdout=console)
detector = FrameDetector(sync_mode=args.sync)
try:
logger.emit("Serial bench scenario")
@@ -165,6 +173,8 @@ def _run_step(ctx: ScenarioContext, action: str, spec: dict[str, Any]) -> None:
raise SystemExit(2)
elif action in {"drain", "listen", "wait"}:
_listen(ctx, float(spec.get("seconds", spec.get("value", 0.0))))
elif action == "listen_ack":
_step_listen_ack(ctx, spec)
elif action == "send":
frame = _parse_required_frame(spec.get("frame"))
label = str(spec.get("label", "send"))
@@ -173,6 +183,10 @@ def _run_step(ctx: ScenarioContext, action: str, spec: dict[str, Any]) -> None:
_listen(ctx, float(spec.get("listen", 0.0)))
elif action == "wait_for":
_step_wait_for(ctx, spec)
elif action == "prompt":
_step_prompt(ctx, spec)
elif action == "note":
_step_note(ctx, spec)
elif action == "table_sweep":
_step_table_sweep(ctx, spec)
elif action == "repeat":
@@ -214,6 +228,24 @@ def _step_wait_for(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
raise SystemExit(3)
def _step_prompt(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
message = str(spec.get("message", spec.get("value", "Press Enter to continue.")))
ctx.logger.event(f"PROMPT {message}")
input(message + " ")
def _step_note(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
message = str(spec.get("message", spec.get("value", "")))
if bool(spec.get("banner", False)):
ctx.logger.emit("")
ctx.logger.emit("NOTE " + "=" * 68)
ctx.logger.event(f"NOTE {message}")
ctx.logger.emit("NOTE " + "=" * 68)
ctx.logger.emit("")
else:
ctx.logger.event(f"NOTE {message}")
def _step_repeat(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
count = max(0, int(spec.get("count", 1)))
steps = spec.get("steps", [])
@@ -248,10 +280,52 @@ def _step_table_sweep(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
_listen_with_ack(ctx, gap, selector, ack)
def _step_listen_ack(ctx: ScenarioContext, spec: dict[str, Any]) -> None:
seconds = float(spec.get("seconds", spec.get("value", 1.0)))
ack = _ack_config(
{
"enabled": spec.get("enabled", True),
"frames": spec.get("frames", spec.get("frame")),
"ack_frame": spec.get("ack_frame"),
"ack_guard": spec.get("ack_guard", 0.020),
"poll_interval": spec.get("poll_interval", 0.005),
"post_ack_read": spec.get("post_ack_read", 0.250),
"once_per_selector": spec.get("once_per_frame", False),
"max_acks": spec.get("max_acks"),
"max_target_hits": spec.get("max_target_hits"),
"abort_on_limit": spec.get("abort_on_limit", False),
"ack_mode": spec.get("ack_mode", spec.get("mode", "fixed")),
"target_mode": spec.get("target_mode", spec.get("match", "explicit")),
"limit_scope": spec.get("limit_scope", spec.get("scope", "local")),
}
)
ack_text = (
f"ack_frame={format_frame(ack['frame'])}"
if ack["ack_mode"] == "fixed"
else f"ack_mode={ack['ack_mode']}"
)
ctx.logger.event(
f"LISTEN_ACK seconds={seconds:.3f} target_mode={ack['target_mode']} targets={len(ack['targets'])} "
f"{ack_text} limit_scope={ack['limit_scope']} max_acks={ack['max_acks']}"
)
_listen_with_ack(ctx, seconds, None, ack)
def _ack_config(raw: Any) -> dict[str, Any]:
spec = raw if isinstance(raw, dict) else {}
enabled = bool(spec.get("enabled", True))
targets = _parse_frame_list(spec.get("frames", spec.get("frame", DEFAULT_ACK_TARGET)))
ack_mode = str(spec.get("ack_mode", spec.get("mode", "fixed"))).strip().lower().replace("-", "_")
if ack_mode not in {"fixed", "cmd5_selector"}:
raise SystemExit(f"unknown ack_mode {ack_mode!r}")
target_mode = str(spec.get("target_mode", spec.get("match", "explicit"))).strip().lower().replace("-", "_")
if target_mode == "queued_report":
target_mode = "queued_reports"
if target_mode not in {"explicit", "queued_reports"}:
raise SystemExit(f"unknown target_mode {target_mode!r}")
limit_scope = str(spec.get("limit_scope", spec.get("scope", "global"))).strip().lower().replace("-", "_")
if limit_scope not in {"global", "local"}:
raise SystemExit(f"unknown limit_scope {limit_scope!r}")
targets = set() if target_mode == "queued_reports" else _parse_frame_list(spec.get("frames", spec.get("frame", DEFAULT_ACK_TARGET)))
if not enabled:
targets = set()
return {
@@ -265,9 +339,29 @@ def _ack_config(raw: Any) -> dict[str, Any]:
"max_acks": _optional_int(spec.get("max_acks")),
"max_target_hits": _optional_int(spec.get("max_target_hits")),
"abort_on_limit": bool(spec.get("abort_on_limit", True)),
"ack_mode": ack_mode,
"target_mode": target_mode,
"limit_scope": limit_scope,
}
def _ack_frame_for_target(target: bytes, ack: dict[str, Any]) -> bytes:
if ack["ack_mode"] == "fixed":
return ack["frame"]
if len(target) != 6:
raise SystemExit(f"cannot build selector ACK for malformed target {target!r}")
body = bytes([0x05, target[1] & 0x7F, target[2], 0x00, 0x00])
return body + bytes([frame_checksum(body)])
def _ack_matches(frame: bytes, ack: dict[str, Any]) -> bool:
if ack["target_mode"] == "explicit":
return frame in ack["targets"]
if frame == HEARTBEAT_FRAME or not frame_checksum_ok(frame):
return False
return frame[0] in {0x00, 0x01, 0x02} and not (frame[1] & 0x80)
def _listen_with_ack(
ctx: ScenarioContext,
seconds: float,
@@ -277,6 +371,8 @@ def _listen_with_ack(
deadline = time.monotonic() + max(0.0, seconds)
observed: list[bytes] = []
acked_targets: set[bytes] = set()
ack_start = ctx.ack_sent
target_start = sum(ctx.target_counts.values())
while time.monotonic() < deadline:
frames = _read_available(ctx, selector=selector)
observed.extend(frames)
@@ -288,10 +384,10 @@ def _listen_with_ack(
if not ack["enabled"]:
continue
for frame in frames:
if frame not in ack["targets"]:
if not _ack_matches(frame, ack):
continue
_count_target(ctx, frame)
if _ack_limit_reached(ctx, ack):
if _ack_limit_reached(ctx, ack, ack_start=ack_start, target_start=target_start):
ctx.logger.event("ACK_LIMIT reached before ACK send")
if ack["abort_on_limit"]:
ctx.abort_requested = True
@@ -302,9 +398,9 @@ def _listen_with_ack(
acked_targets.add(frame)
if ack["guard"] > 0:
observed.extend(_listen(ctx, ack["guard"], selector=selector))
_send_and_record(ctx, ack["frame"], "ack")
_send_and_record(ctx, _ack_frame_for_target(frame, ack), "ack")
ctx.ack_sent += 1
if _ack_limit_reached(ctx, ack):
if _ack_limit_reached(ctx, ack, ack_start=ack_start, target_start=target_start):
ctx.logger.event("ACK_LIMIT reached after ACK send")
if ack["abort_on_limit"]:
ctx.abort_requested = True
@@ -440,12 +536,24 @@ def _optional_int(raw: Any) -> int | None:
return _int_value(raw)
def _ack_limit_reached(ctx: ScenarioContext, ack: dict[str, Any]) -> bool:
def _ack_limit_reached(
ctx: ScenarioContext,
ack: dict[str, Any],
*,
ack_start: int = 0,
target_start: int = 0,
) -> bool:
if ack.get("limit_scope") == "local":
ack_count = ctx.ack_sent - ack_start
target_count = sum(ctx.target_counts.values()) - target_start
else:
ack_count = ctx.ack_sent
target_count = sum(ctx.target_counts.values())
max_acks = ack.get("max_acks")
if max_acks is not None and ctx.ack_sent >= max_acks:
if max_acks is not None and ack_count >= max_acks:
return True
max_target_hits = ack.get("max_target_hits")
if max_target_hits is not None and sum(ctx.target_counts.values()) >= max_target_hits:
if max_target_hits is not None and target_count >= max_target_hits:
return True
return False
@@ -462,6 +570,51 @@ def _print_dry_run(args: argparse.Namespace, scenario: dict[str, Any], log_path:
_print_step_dry_run(action, spec, stdout)
class _FilteredStdout:
def __init__(self, stdout: TextIO, predicate: Callable[[str], bool]) -> None:
self.stdout = stdout
self.predicate = predicate
self.buffer = ""
def write(self, text: str) -> int:
self.buffer += text
while "\n" in self.buffer:
line, self.buffer = self.buffer.split("\n", 1)
if self.predicate(line):
self.stdout.write(line + "\n")
self.stdout.flush()
return len(text)
def flush(self) -> None:
if self.buffer:
if self.predicate(self.buffer):
self.stdout.write(self.buffer)
self.buffer = ""
self.stdout.flush()
def _quiet_console_line(line: str) -> bool:
if not line.strip():
return True
keep_fragments = (
"Serial bench scenario",
"name=",
"device=",
"log=",
"STEP ",
"PROMPT ",
"NOTE ",
"Summary",
"rx_frames=",
"resync_events=",
"tx_frames=",
"abort_requested=",
"known_shutter",
"queued_shutter",
)
return any(fragment in line for fragment in keep_fragments)
def _print_step_dry_run(action: str, spec: dict[str, Any], stdout: TextIO, *, indent: str = " ") -> None:
if action == "send":
frame = _parse_required_frame(spec.get("frame"))
@@ -470,6 +623,42 @@ def _print_step_dry_run(action: str, spec: dict[str, Any], stdout: TextIO, *, in
print(f"{indent}listen={float(spec.get('listen', 0.0)):.3f}s", file=stdout)
elif action in {"drain", "listen", "wait"}:
print(f"{indent}seconds={float(spec.get('seconds', spec.get('value', 0.0))):.3f}", file=stdout)
elif action == "listen_ack":
ack = _ack_config(
{
"enabled": spec.get("enabled", True),
"frames": spec.get("frames", spec.get("frame")),
"ack_frame": spec.get("ack_frame"),
"ack_guard": spec.get("ack_guard", 0.020),
"poll_interval": spec.get("poll_interval", 0.005),
"post_ack_read": spec.get("post_ack_read", 0.250),
"once_per_selector": spec.get("once_per_frame", False),
"max_acks": spec.get("max_acks"),
"max_target_hits": spec.get("max_target_hits"),
"abort_on_limit": spec.get("abort_on_limit", False),
"ack_mode": spec.get("ack_mode", spec.get("mode", "fixed")),
"target_mode": spec.get("target_mode", spec.get("match", "explicit")),
"limit_scope": spec.get("limit_scope", spec.get("scope", "local")),
}
)
print(f"{indent}seconds={float(spec.get('seconds', spec.get('value', 1.0))):.3f}", file=stdout)
if not ack["enabled"]:
print(f"{indent}ack=disabled", file=stdout)
else:
print(f"{indent}target_mode={ack['target_mode']}", file=stdout)
for target in sorted(ack["targets"]):
print(f"{indent}ack_target={format_frame(target)}", file=stdout)
print(f"{indent}ack_mode={ack['ack_mode']}", file=stdout)
if ack["ack_mode"] == "fixed":
print(f"{indent}ack_frame={format_frame(ack['frame'])}", file=stdout)
print(
f"{indent}limit_scope={ack['limit_scope']} max_acks={ack['max_acks']} "
f"max_target_hits={ack['max_target_hits']}",
file=stdout,
)
elif action in {"prompt", "note"}:
message = str(spec.get("message", spec.get("value", "Press Enter to continue.")))
print(f"{indent}message={message}", file=stdout)
elif action == "wait_ready":
print(
f"{indent}heartbeats={int(spec.get('heartbeats', 2))} "
@@ -493,7 +682,11 @@ def _print_step_dry_run(action: str, spec: dict[str, Any], stdout: TextIO, *, in
for target in sorted(ack["targets"]):
print(f"{indent}ack_target={format_frame(target)}", file=stdout)
print(f"{indent}ack_frame={format_frame(ack['frame'])}", file=stdout)
print(f"{indent}max_acks={ack['max_acks']} max_target_hits={ack['max_target_hits']}", file=stdout)
print(
f"{indent}limit_scope={ack['limit_scope']} max_acks={ack['max_acks']} "
f"max_target_hits={ack['max_target_hits']}",
file=stdout,
)
elif action == "repeat":
count = max(0, int(spec.get("count", 1)))
steps = spec.get("steps", [])

View File

@@ -0,0 +1,121 @@
from __future__ import annotations
import argparse
import json
import sys
from collections import Counter
from pathlib import Path
from typing import Any, TextIO
from .bench_connect_lcd import format_frame, label_frame, parse_frame
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Compare two serial_scenario result JSON files and highlight extra button/report traffic."
)
parser.add_argument("baseline", type=Path, help="baseline result JSON, usually a no-button run")
parser.add_argument("candidate", type=Path, help="candidate result JSON, usually a one-button run")
parser.add_argument("--show-labels", action="store_true", help="also show label count deltas")
return parser
def main(argv: list[str] | None = None, *, stdout: TextIO = sys.stdout) -> int:
args = build_arg_parser().parse_args(argv)
baseline = _load_result(args.baseline)
candidate = _load_result(args.candidate)
print(format_comparison(baseline, candidate, show_labels=args.show_labels), file=stdout)
return 0
def _load_result(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
result = json.load(handle)
if not isinstance(result, dict):
raise SystemExit(f"{path} is not a result JSON object")
result["_path"] = str(path)
return result
def format_comparison(
baseline: dict[str, Any],
candidate: dict[str, Any],
*,
show_labels: bool = False,
) -> str:
base_targets = Counter(_string_int_mapping(baseline.get("ack_targets", {})))
candidate_targets = Counter(_string_int_mapping(candidate.get("ack_targets", {})))
target_delta = candidate_targets - base_targets
lines = [
"Serial scenario comparison",
f"baseline={baseline.get('_path', '')}",
f"candidate={candidate.get('_path', '')}",
f"baseline_log={baseline.get('log', '')}",
f"candidate_log={candidate.get('log', '')}",
f"baseline_rx_frames={baseline.get('rx_frames', 0)} candidate_rx_frames={candidate.get('rx_frames', 0)}",
f"baseline_ack_sent={baseline.get('ack_sent', 0)} candidate_ack_sent={candidate.get('ack_sent', 0)}",
]
if target_delta:
lines.append("extra ACK-target frames in candidate:")
for frame_text, count in sorted(target_delta.items(), key=_frame_sort_key):
lines.append(f" +{count:3d} {frame_text} {_describe_frame(frame_text)}")
else:
lines.append("no extra ACK-target frames found in candidate")
missing = base_targets - candidate_targets
if missing:
lines.append("baseline ACK-target frames missing/decreased in candidate:")
for frame_text, count in sorted(missing.items(), key=_frame_sort_key):
lines.append(f" -{count:3d} {frame_text} {_describe_frame(frame_text)}")
if show_labels:
base_labels = Counter(_string_int_mapping(baseline.get("labels", {})))
candidate_labels = Counter(_string_int_mapping(candidate.get("labels", {})))
label_delta = candidate_labels - base_labels
lines.append("label count increases:")
if label_delta:
for label, count in sorted(label_delta.items()):
lines.append(f" +{count:3d} {label}")
else:
lines.append(" none")
return "\n".join(lines)
def _string_int_mapping(raw: Any) -> dict[str, int]:
if not isinstance(raw, dict):
return {}
output: dict[str, int] = {}
for key, value in raw.items():
try:
output[str(key)] = int(value)
except (TypeError, ValueError):
continue
return output
def _describe_frame(frame_text: str) -> str:
frame = parse_frame(frame_text)
selector = ((frame[1] & 0x7F) << 7) | frame[2]
value = (frame[3] << 8) | frame[4]
label = label_frame(frame) or "checksum_ok_unlabeled"
return f"cmd=0x{frame[0]:02X} selector=0x{selector:04X} value=0x{value:04X} label={label}"
def _frame_sort_key(item: tuple[str, int]) -> tuple[int, int, int, str]:
frame_text, _count = item
try:
frame = parse_frame(frame_text)
except argparse.ArgumentTypeError:
return (0xFFFF, 0xFFFF, 0xFFFF, frame_text)
selector = ((frame[1] & 0x7F) << 7) | frame[2]
value = (frame[3] << 8) | frame[4]
return (selector, frame[0], value, frame_text)
__all__ = [
"build_arg_parser",
"format_comparison",
"main",
]

View File

@@ -19,14 +19,24 @@ DEFAULT_IGNORED_LABELS = {
"connect_ok_path_response_candidate",
"connect_c0_path_response_candidate",
"table_readback_candidate",
"active_selector0_keepalive_report",
"gated_active_0004_response_candidate",
"gated_active_0004_transition_candidate",
}
KNOWN_FRAME_LABELS = {
"00 00 00 80 80 5A": "active_selector0_keepalive_report",
"00 00 6C 00 00 36": "copy_completion_exit_selector_006c_candidate",
"00 00 6D 00 00 37": "copy_in_progress_selector_006d_candidate",
"00 00 15 80 00 CF": "known_call_button_active_report",
"00 00 15 00 00 4F": "known_call_button_inactive_report",
"00 00 07 80 00 DD": "known_cam_power_button_report",
"01 00 17 80 00 CC": "queued_bars_button_selector_0017_active_candidate",
"02 00 17 80 00 CF": "queued_bars_button_selector_0017_active_candidate",
"01 00 18 80 00 C3": "queued_bars_button_selector_0018_active_candidate",
"02 00 18 80 00 C0": "queued_bars_button_selector_0018_active_candidate",
"01 01 1A 08 00 48": "queued_iris_auto_button_selector_009a_active_candidate",
"02 01 1A 08 00 4B": "queued_iris_auto_button_selector_009a_active_candidate",
"01 00 04 00 00 5F": "gated_active_0004_response_candidate",
"02 00 04 00 00 5C": "gated_active_0004_transition_candidate",
}