1
0

More decompiling work

This commit is contained in:
Aiden
2026-05-25 17:32:00 +10:00
parent 56829b6e0b
commit 07f48c76e0
22 changed files with 9837 additions and 5 deletions

534
h8536/protocol_capture.py Normal file
View File

@@ -0,0 +1,534 @@
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Mapping, TextIO
# Optional sibling import: when this module is copied away from the decompiler
# tree the relative import fails and the literal defaults below are used.
try:
    from . import protocol_trace as _protocol_trace
except ImportError:  # pragma: no cover - exercised only outside package imports.
    _protocol_trace = None

# Protocol constants, taken from protocol_trace whenever it provides them.
FRAME_LENGTH = getattr(_protocol_trace, "FRAME_LENGTH", 6)
CHECKSUM_SEED = getattr(_protocol_trace, "CHECKSUM_SEED", 0x5A)
# Matches one capture log line: a clock timestamp (1-2 digit hours, minutes,
# seconds, optional 1-6 digit fraction), an RX/TX direction, a declared byte
# count followed by "byte"/"bytes", and the remaining text holding the bytes.
# Matching is case-insensitive.
CAPTURE_LINE_RE = re.compile(
r"^\s*(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d{1,6})?)\s+"
r"(?P<direction>RX|TX)\s+"
r"(?P<count>\d+)\s+bytes?\s+"
r"(?P<byte_text>.*?)\s*$",
re.IGNORECASE,
)
# Matches a standalone two-digit hexadecimal token inside the byte text.
HEX_BYTE_RE = re.compile(r"\b[0-9A-Fa-f]{2}\b")
# Fallback label table used when protocol_trace is unavailable or does not
# define OBSERVED_TX_REPORT_CANDIDATES. Keys are (index, value) pairs as looked
# up by _tx_report_candidate; values carry candidate names and optional states.
_FALLBACK_OBSERVED_TX_REPORT_CANDIDATES: dict[tuple[int, int], dict[str, str]] = {
(0x0000, 0x0080): {
"name_candidate": "heartbeat_alive_candidate",
},
(0x0015, 0x8000): {
"name_candidate": "call_button_candidate",
"state_candidate": "active",
},
(0x0015, 0x0000): {
"name_candidate": "call_button_candidate",
"state_candidate": "inactive",
},
(0x0007, 0x8000): {
"name_candidate": "cam_power_button_candidate",
"state_candidate": "active",
},
}
# Prefer the table exported by protocol_trace; otherwise use the fallback above.
OBSERVED_TX_REPORT_CANDIDATES = getattr(
_protocol_trace,
"OBSERVED_TX_REPORT_CANDIDATES",
_FALLBACK_OBSERVED_TX_REPORT_CANDIDATES,
)
@dataclass(frozen=True)
class CaptureChunk:
    """One parsed capture log line, immutable once created."""

    # Zero-based position of this chunk among the parsed chunks.
    chunk_index: int
    # Timestamp text exactly as captured, plus its millisecond conversion.
    timestamp: str
    timestamp_ms: int
    # Direction as logged by the analyzer ("rx"/"tx") and the mirrored
    # device-side direction.
    analyzer_direction: str
    device_direction: str
    # Byte count stated on the log line; may differ from len(bytes).
    declared_count: int
    # Byte values extracted from the line.
    bytes: tuple[int, ...]
    # The original, unstripped log line.
    raw_line: str
def checksum_for(frame_prefix: Iterable[int]) -> int:
    """Return the checksum byte for the bytes preceding the checksum.

    Delegates to ``protocol_trace.checksum_for`` when that module was imported
    and exposes it; otherwise XORs ``CHECKSUM_SEED`` with every byte locally.
    """
    if _protocol_trace is None or not hasattr(_protocol_trace, "checksum_for"):
        checksum = CHECKSUM_SEED
        for item in frame_prefix:
            checksum = checksum ^ (item & 0xFF)
        return checksum & 0xFF
    return int(_protocol_trace.checksum_for(frame_prefix))
def parse_capture_text(text: str) -> list[CaptureChunk]:
    """Parse analyzer log text into a list of ``CaptureChunk`` records.

    Lines that are blank or do not match ``CAPTURE_LINE_RE`` are skipped.
    Chunks are numbered in the order they are accepted.

    A chunk whose declared byte count differs from the number of hex bytes
    found is still returned: capture evidence is preserved, and the mismatch
    stays visible through ``declared_count`` versus ``len(bytes)``.
    """
    chunks: list[CaptureChunk] = []
    for raw_line in text.splitlines():
        # A blank line cannot match (the pattern requires a timestamp), so a
        # single check covers both blank and unrecognised lines.
        match = CAPTURE_LINE_RE.match(raw_line.strip())
        if match is None:
            continue
        timestamp = match.group("time")
        analyzer_direction = match.group("direction").lower()
        byte_values = tuple(
            int(token, 16) for token in HEX_BYTE_RE.findall(match.group("byte_text"))
        )
        chunks.append(
            CaptureChunk(
                chunk_index=len(chunks),
                timestamp=timestamp,
                timestamp_ms=_timestamp_ms(timestamp),
                analyzer_direction=analyzer_direction,
                device_direction=_device_direction(analyzer_direction),
                declared_count=int(match.group("count")),
                bytes=byte_values,
                raw_line=raw_line,
            )
        )
    return chunks
def analyze_capture_text(text: str) -> dict[str, Any]:
    """Parse capture log text and return the analysis of its chunks."""
    parsed_chunks = parse_capture_text(text)
    return analyze_capture_chunks(parsed_chunks)
def analyze_capture_chunks(chunks: Iterable[CaptureChunk]) -> dict[str, Any]:
    """Build the capture analysis document from parsed chunks.

    The result holds the serialised chunks, the recombined checksum-valid
    frames, groups of repeated frames, gate/session hints, and a description
    of the checksum model used to validate frames.
    """
    materialized = list(chunks)
    frames = _recombine_frames(materialized)
    repeated = _repeated_groups(frames)
    hints = _gate_session_hints(frames)

    checksum_model = {
        "algorithm": "xor",
        "seed": CHECKSUM_SEED,
        "seed_hex": _h8(CHECKSUM_SEED),
        "covered_offsets": [0, 1, 2, 3, 4],
        "checksum_offset": 5,
    }
    direction_note = (
        "Capture RX is analyzer-perspective receive; these bytes are device-perspective TX."
    )
    serialized_chunks = [_chunk_dict(item) for item in materialized]

    return {
        "kind": "h8536_protocol_capture",
        "frame_length": FRAME_LENGTH,
        "checksum_model": checksum_model,
        "chunks": serialized_chunks,
        "chunk_count": len(materialized),
        "frames": frames,
        "frame_count": len(frames),
        "repeated_groups": repeated,
        "repeated_group_count": len(repeated),
        "gate_session_hints": hints,
        "direction_note": direction_note,
    }
def format_text_report(analysis: Mapping[str, Any]) -> str:
    """Render a capture analysis mapping as a newline-joined text report.

    Sections appear in a fixed order: header and counts, one line per frame,
    one line per repeated group, then the gate/session hints (observed names,
    heartbeat cadence, state transitions, heartbeat gaps, caveat). Missing
    sections are skipped.
    """

    def cadence_summary(stats: Mapping[str, Any]) -> str:
        # "n/a" when no average is available (fewer than two samples).
        if stats.get("average") is None:
            return "n/a"
        return (
            f"avg={stats['average']:.1f}ms "
            f"min={stats['min']}ms max={stats['max']}ms"
        )

    report_lines = [
        "H8/536 capture log",
        (
            f"chunks={analysis.get('chunk_count', 0)} "
            f"frames={analysis.get('frame_count', 0)} "
            f"repeated_groups={analysis.get('repeated_group_count', 0)}"
        ),
    ]

    for frame in analysis.get("frames", []):
        report = frame.get("report_candidate") or {}
        observed = report.get("observed_candidate") or {}
        name = observed.get("name_candidate")
        state = observed.get("state_candidate")
        label = f" {name}" if name else ""
        if state:
            label += f" state={state}"
        # Flag frames assembled from bytes of more than one capture chunk.
        split_marker = " split" if frame.get("source_chunk_count", 0) > 1 else ""
        hex_bytes = " ".join(frame["bytes_hex"])
        report_lines.append(
            f"[{frame['frame_index']:04d}] {frame['timestamp']} "
            f"{frame['analyzer_direction'].upper()}=>device:{frame['device_direction']} "
            f"bytes={hex_bytes} checksum=ok{split_marker} "
            f"index={report.get('index_hex')} value={report.get('value_hex')}{label}"
        )

    for group in analysis.get("repeated_groups", []):
        group_cadence = cadence_summary(group.get("cadence_ms") or {})
        report_lines.append(
            f"group {group['bytes']} count={group['count']} "
            f"span={group['span_ms']}ms cadence={group_cadence}"
        )

    hints = analysis.get("gate_session_hints") or {}

    observed_names = hints.get("observed_autonomous_report_names") or []
    if observed_names:
        report_lines.append(
            "observed autonomous report candidates: " + ", ".join(observed_names)
        )

    heartbeat = hints.get("heartbeat_cadence_ms") or {}
    if heartbeat.get("count"):
        report_lines.append(
            f"heartbeat cadence count={heartbeat['count']} "
            f"cadence={cadence_summary(heartbeat)}"
        )

    for change in hints.get("active_inactive_transitions", []):
        report_lines.append(
            f"transition index={change['index_hex']} "
            f"{change['from_state']}->{change['to_state']} "
            f"{change['from_timestamp']}..{change['to_timestamp']}"
        )

    for gap in hints.get("heartbeat_interruptions", []):
        culprits = ", ".join(
            entry["name_candidate"] for entry in gap.get("interrupted_by", [])
        )
        report_lines.append(
            f"heartbeat gap {gap['from_timestamp']}..{gap['to_timestamp']} "
            f"gap={gap['gap_ms']}ms interrupted_by={culprits}"
        )

    caveat = hints.get("caveat")
    if caveat:
        report_lines.append(f"caveat: {hints['caveat']}")

    return "\n".join(report_lines)
def main(argv: list[str] | None = None, *, stdin: TextIO | None = None, stdout: TextIO | None = None) -> int:
    """Command-line entry point: analyze a capture log and print the result.

    Reads the log from the given path, or from ``stdin`` when the path is
    omitted or ``-``. Writes JSON when ``--json`` is passed, otherwise the
    text report. Returns 0.
    """
    parser = argparse.ArgumentParser(
        description="Analyze timestamped H8/536 serial capture logs and recombine 6-byte frames."
    )
    parser.add_argument("input", nargs="?", help="Capture log path. Use '-' or omit to read stdin.")
    parser.add_argument("--json", action="store_true", help="Emit JSON instead of text.")
    options = parser.parse_args(argv)

    source = stdin or sys.stdin
    sink = stdout or sys.stdout

    read_from_file = bool(options.input) and options.input != "-"
    if read_from_file:
        text = Path(options.input).read_text(encoding="utf-8")
    else:
        text = source.read()

    analysis = analyze_capture_text(text)
    if options.json:
        json.dump(analysis, sink, indent=2, sort_keys=True)
    else:
        sink.write(format_text_report(analysis))
    sink.write("\n")
    return 0
def _recombine_frames(chunks: list[CaptureChunk]) -> list[dict[str, Any]]:
    """Reassemble checksum-valid frames from chunk bytes, per direction.

    Bytes are buffered separately for each analyzer direction, so a frame may
    span several chunks of the same direction. The buffer is drained after
    every appended byte.
    """
    recovered: list[dict[str, Any]] = []
    pending_by_direction: dict[str, list[dict[str, Any]]] = {}
    for chunk in chunks:
        pending = pending_by_direction.setdefault(chunk.analyzer_direction, [])
        for position, value in enumerate(chunk.bytes):
            # Each entry remembers its chunk of origin so a frame can report
            # which chunks contributed to it.
            pending.append({"byte": value, "chunk": chunk, "offset": position})
            _drain_valid_frames(pending, recovered)
    return recovered
def _drain_valid_frames(stream: list[dict[str, Any]], frames: list[dict[str, Any]]) -> None:
    """Move every checksum-valid frame found in ``stream`` into ``frames``.

    ``stream`` is scanned from the front for the first ``FRAME_LENGTH``-byte
    window whose last byte equals the checksum of the preceding bytes. Bytes
    before that window are discarded (realignment), and the window is
    appended to ``frames`` and removed. This repeats until no complete valid
    window remains.

    When no window validates, every byte that can no longer start a frame is
    dropped and only the last ``FRAME_LENGTH - 1`` bytes are kept. Those
    leading bytes have had every window they belong to rejected, and later
    bytes cannot change those windows, so the frames produced are unchanged.
    This keeps the buffer bounded and avoids rescanning rejected windows on
    each call.

    Both arguments are mutated in place.
    """
    while len(stream) >= FRAME_LENGTH:
        last_start = len(stream) - FRAME_LENGTH
        for start in range(last_start + 1):
            window = stream[start : start + FRAME_LENGTH]
            values = [int(item["byte"]) for item in window]
            if checksum_for(values[:5]) == values[5]:
                break
        else:
            # No valid window: keep only bytes that may begin a future frame.
            del stream[: last_start + 1]
            return
        frames.append(_frame_dict(len(frames), window))
        # Drop any misaligned prefix together with the consumed frame.
        del stream[: start + FRAME_LENGTH]
def _frame_dict(frame_index: int, items: list[dict[str, Any]]) -> dict[str, Any]:
    """Build the dictionary describing one checksum-valid frame.

    ``items`` are the buffered byte entries of the frame. Timestamp and
    direction come from the chunk that supplied the first byte. The checksum
    section reports the sixth byte as both expected and actual value, since
    only validated frames reach this function.
    """
    byte_values = [int(entry["byte"]) for entry in items]
    origin = items[0]["chunk"]
    contributing = sorted({entry["chunk"].chunk_index for entry in items})
    checksum_byte = byte_values[5]
    checksum_hex = _h8(checksum_byte)

    frame: dict[str, Any] = {
        "frame_index": frame_index,
        "timestamp": origin.timestamp,
        "timestamp_ms": origin.timestamp_ms,
        "analyzer_direction": origin.analyzer_direction,
        "device_direction": origin.device_direction,
        "bytes": byte_values,
        "bytes_hex": [_h8(item) for item in byte_values],
    }
    frame["checksum"] = {
        "valid": True,
        "expected": checksum_byte,
        "expected_hex": checksum_hex,
        "actual": checksum_byte,
        "actual_hex": checksum_hex,
    }
    frame["source_chunk_indexes"] = contributing
    frame["source_chunk_count"] = len(contributing)
    frame["report_candidate"] = _tx_report_candidate(byte_values)
    return frame
def _tx_report_candidate(frame: list[int]) -> dict[str, Any]:
    """Interpret a frame as an index/value report and attach any known label.

    Bytes 0-2 form a big-endian index and bytes 3-4 a big-endian value. The
    pair is looked up in ``OBSERVED_TX_REPORT_CANDIDATES``; confidence is
    ``"unknown"`` when there is no entry.
    """
    report_index = (frame[0] << 16) | (frame[1] << 8) | frame[2]
    report_value = (frame[3] << 8) | frame[4]
    known = OBSERVED_TX_REPORT_CANDIDATES.get((report_index, report_value))

    # Use six hex digits only when the index does not fit in sixteen bits.
    if report_index > 0xFFFF:
        index_hex = f"0x{report_index:06X}"
    else:
        index_hex = _h16(report_index)

    if known:
        confidence = "observed_candidate"
        observed = dict(known)
    else:
        confidence = "unknown"
        observed = None

    return {
        "encoding": "observed_tx_index_value_report_candidate",
        "confidence": confidence,
        "index": report_index,
        "index_hex": index_hex,
        "value": report_value,
        "value_hex": _h16(report_value),
        "observed_candidate": observed,
        "caveat": "Observed TX report names are capture labels, not proven protocol facts.",
    }
def _repeated_groups(frames: list[Mapping[str, Any]]) -> list[dict[str, Any]]:
    """Summarise frames whose byte content occurs at least twice.

    Each group reports its members, time span and the gaps between
    consecutive occurrences. Groups are ordered by descending count, then by
    their byte string.
    """
    members_by_bytes: dict[tuple[int, ...], list[Mapping[str, Any]]] = {}
    for frame in frames:
        members_by_bytes.setdefault(tuple(frame["bytes"]), []).append(frame)

    def summarize(values: tuple[int, ...], members: list[Mapping[str, Any]]) -> dict[str, Any]:
        stamps = [int(member["timestamp_ms"]) for member in members]
        gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
        if gaps:
            cadence = {
                "samples": gaps,
                "average": sum(gaps) / len(gaps),
                "min": min(gaps),
                "max": max(gaps),
            }
        else:
            cadence = {"samples": gaps, "average": None, "min": None, "max": None}
        return {
            "bytes": " ".join(_h8(value) for value in values),
            "count": len(members),
            "frame_indexes": [member["frame_index"] for member in members],
            "first_timestamp": members[0]["timestamp"],
            "last_timestamp": members[-1]["timestamp"],
            "span_ms": stamps[-1] - stamps[0],
            "cadence_ms": cadence,
        }

    summaries = [
        summarize(values, members)
        for values, members in members_by_bytes.items()
        if len(members) >= 2
    ]
    summaries.sort(key=lambda group: (-int(group["count"]), str(group["bytes"])))
    return summaries
def _gate_session_hints(frames: list[Mapping[str, Any]]) -> dict[str, Any]:
    """Derive capture-side hints from frames that carry a known report label.

    Collects per-name statistics, heartbeat cadence, active/inactive
    transitions and heartbeat interruptions. Frames without a name candidate
    are ignored.
    """
    labelled = [
        item for item in map(_observed_report_frame, frames) if item is not None
    ]

    members_by_name: dict[str, list[dict[str, Any]]] = {}
    for item in labelled:
        members_by_name.setdefault(str(item["name_candidate"]), []).append(item)

    observed_reports = [
        {
            "name_candidate": name,
            "count": len(members),
            "first_timestamp": members[0]["timestamp"],
            "last_timestamp": members[-1]["timestamp"],
            "frame_indexes": [member["frame_index"] for member in members],
            "indexes_hex": sorted({str(member["index_hex"]) for member in members}),
            "values_hex": sorted({str(member["value_hex"]) for member in members}),
            "states": sorted(
                {
                    str(member["state_candidate"])
                    for member in members
                    if member.get("state_candidate")
                }
            ),
        }
        for name, members in sorted(members_by_name.items())
    ]

    heartbeats = [
        item for item in labelled if item.get("name_candidate") == "heartbeat_alive_candidate"
    ]
    beat_times = [int(item["timestamp_ms"]) for item in heartbeats]
    beat_gaps = [later - earlier for earlier, later in zip(beat_times, beat_times[1:])]
    if beat_gaps:
        average_gap = sum(beat_gaps) / len(beat_gaps)
        shortest_gap = min(beat_gaps)
        longest_gap = max(beat_gaps)
    else:
        average_gap = shortest_gap = longest_gap = None

    return {
        "observed_autonomous_report_names": sorted(members_by_name),
        "observed_reports": observed_reports,
        "active_inactive_transitions": _active_inactive_transitions(labelled),
        "heartbeat_cadence_ms": {
            "count": len(heartbeats),
            "samples": beat_gaps,
            "average": average_gap,
            "min": shortest_gap,
            "max": longest_gap,
        },
        "heartbeat_interruptions": _heartbeat_interruptions(labelled),
        "caveat": (
            "Missing autonomous reports for other controls may reflect host/session gating "
            "or capture timing, not proof that local control state did not change."
        ),
        "evidence_scope": "capture_side_observation_only",
    }
def _observed_report_frame(frame: Mapping[str, Any]) -> dict[str, Any] | None:
report = frame.get("report_candidate") or {}
candidate = report.get("observed_candidate") or {}
name = candidate.get("name_candidate")
if not name:
return None
return {
"frame_index": frame.get("frame_index"),
"timestamp": frame.get("timestamp"),
"timestamp_ms": frame.get("timestamp_ms"),
"analyzer_direction": frame.get("analyzer_direction"),
"device_direction": frame.get("device_direction"),
"name_candidate": name,
"state_candidate": candidate.get("state_candidate"),
"index": report.get("index"),
"index_hex": report.get("index_hex"),
"value": report.get("value"),
"value_hex": report.get("value_hex"),
}
def _active_inactive_transitions(observed: list[Mapping[str, Any]]) -> list[dict[str, Any]]:
by_index: dict[int, list[Mapping[str, Any]]] = {}
for item in observed:
state = item.get("state_candidate")
index = item.get("index")
if state not in {"active", "inactive"} or not isinstance(index, int):
continue
by_index.setdefault(index, []).append(item)
transitions: list[dict[str, Any]] = []
for index, members in sorted(by_index.items()):
previous: Mapping[str, Any] | None = None
for member in sorted(members, key=lambda item: int(item.get("frame_index") or 0)):
if previous is not None and previous.get("state_candidate") != member.get("state_candidate"):
transitions.append(
{
"index": index,
"index_hex": member.get("index_hex"),
"name_candidate": member.get("name_candidate"),
"from_state": previous.get("state_candidate"),
"to_state": member.get("state_candidate"),
"from_timestamp": previous.get("timestamp"),
"to_timestamp": member.get("timestamp"),
"from_frame_index": previous.get("frame_index"),
"to_frame_index": member.get("frame_index"),
}
)
previous = member
return transitions
def _heartbeat_interruptions(observed: list[Mapping[str, Any]]) -> list[dict[str, Any]]:
interruptions: list[dict[str, Any]] = []
heartbeat_positions = [
index
for index, item in enumerate(observed)
if item.get("name_candidate") == "heartbeat_alive_candidate"
]
for left, right in zip(heartbeat_positions, heartbeat_positions[1:]):
between = [
item
for item in observed[left + 1 : right]
if item.get("name_candidate") != "heartbeat_alive_candidate"
]
if not between:
continue
start = observed[left]
end = observed[right]
interruptions.append(
{
"from_frame_index": start.get("frame_index"),
"to_frame_index": end.get("frame_index"),
"from_timestamp": start.get("timestamp"),
"to_timestamp": end.get("timestamp"),
"gap_ms": int(end.get("timestamp_ms") or 0) - int(start.get("timestamp_ms") or 0),
"interrupted_by": [
{
"frame_index": item.get("frame_index"),
"timestamp": item.get("timestamp"),
"name_candidate": item.get("name_candidate"),
"state_candidate": item.get("state_candidate"),
"index_hex": item.get("index_hex"),
"value_hex": item.get("value_hex"),
}
for item in between
],
}
)
return interruptions
def _chunk_dict(chunk: CaptureChunk) -> dict[str, Any]:
    """Serialise a capture chunk into a JSON-friendly dict.

    Includes the parsed byte count and whether it matches the count declared
    on the log line.
    """
    byte_values = list(chunk.bytes)
    actual_count = len(byte_values)
    return {
        "chunk_index": chunk.chunk_index,
        "timestamp": chunk.timestamp,
        "timestamp_ms": chunk.timestamp_ms,
        "analyzer_direction": chunk.analyzer_direction,
        "device_direction": chunk.device_direction,
        "declared_count": chunk.declared_count,
        "byte_count": actual_count,
        "count_matches": chunk.declared_count == actual_count,
        "bytes": byte_values,
        "bytes_hex": [_h8(value) for value in byte_values],
    }
def _device_direction(analyzer_direction: str) -> str:
if analyzer_direction == "rx":
return "tx"
if analyzer_direction == "tx":
return "rx"
return "unknown"
def _timestamp_ms(value: str) -> int:
head, _, fraction = value.partition(".")
hours, minutes, seconds = [int(part) for part in head.split(":")]
millis = int((fraction + "000")[:3]) if fraction else 0
return ((hours * 60 + minutes) * 60 + seconds) * 1000 + millis
def _h8(value: int) -> str:
return f"0x{value & 0xFF:02X}"
def _h16(value: int) -> str:
return f"0x{value & 0xFFFF:04X}"
# Public API of the capture analyzer; underscore-prefixed helpers stay private.
__all__ = [
"CaptureChunk",
"analyze_capture_chunks",
"analyze_capture_text",
"checksum_for",
"format_text_report",
"main",
"parse_capture_text",
]

504
h8536/protocol_trace.py Normal file
View File

@@ -0,0 +1,504 @@
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Mapping, TextIO
# Initial value XORed with the frame bytes by checksum_for.
CHECKSUM_SEED = 0x5A
# Number of bytes in one protocol frame (five covered bytes plus checksum).
FRAME_LENGTH = 6
# Accepted values for the ``direction`` argument of decode_trace.
VALID_DIRECTIONS = {"rx", "tx", "auto"}
# Labels for TX reports, keyed by (index, value) as looked up by _tx_report.
OBSERVED_TX_REPORT_CANDIDATES = {
(0x0000, 0x0080): {
"name_candidate": "heartbeat_alive_candidate",
},
(0x0007, 0x8000): {
"name_candidate": "cam_power_button_candidate",
"state_candidate": "active",
},
(0x0015, 0x8000): {
"name_candidate": "call_button_candidate",
"state_candidate": "active",
},
(0x0015, 0x0000): {
"name_candidate": "call_button_candidate",
"state_candidate": "inactive",
},
}
@dataclass(frozen=True)
class ByteEvent:
    """A single traced byte with an optional direction hint."""

    # The byte value.
    value: int
    # "rx" or "tx" when the input tagged the byte's direction, else None.
    direction_hint: str | None = None
def checksum_for(frame_prefix: Iterable[int]) -> int:
    """Return ``CHECKSUM_SEED`` XORed with the low byte of every input value."""
    checksum = CHECKSUM_SEED
    for item in frame_prefix:
        checksum = checksum ^ (item & 0xFF)
    return checksum & 0xFF
def decode_trace(
    data: bytes | Iterable[int | ByteEvent],
    *,
    direction: str = "auto",
    semantics_path: str | Path | None = None,
) -> dict[str, Any]:
    """Decode a byte trace into consecutive ``FRAME_LENGTH``-byte frames.

    Frames are cut at fixed offsets from the start of the input; bytes left
    over after the last complete frame are reported as trailing bytes. The
    most recent checksum-valid frame per direction is tracked and handed to
    the frame decoder for stateful annotations.

    Raises:
        ValueError: if ``direction`` is not one of ``VALID_DIRECTIONS``.
    """
    if direction not in VALID_DIRECTIONS:
        raise ValueError(f"direction must be one of {sorted(VALID_DIRECTIONS)}")

    events = _byte_events(data)
    semantics = load_semantics(semantics_path)

    last_valid: dict[str, dict[str, Any] | None] = {"rx": None, "tx": None}
    decoded_frames: list[dict[str, Any]] = []
    frame_count = len(events) // FRAME_LENGTH
    for frame_index in range(frame_count):
        offset = frame_index * FRAME_LENGTH
        frame_events = events[offset : offset + FRAME_LENGTH]
        frame_direction = _frame_direction(frame_events, direction)
        frame = _decode_frame(
            [event.value for event in frame_events],
            frame_index=frame_index,
            byte_offset=offset,
            direction=frame_direction,
            semantics=semantics,
            previous_valid=last_valid,
        )
        decoded_frames.append(frame)
        # Only valid frames with a known direction update the history.
        if frame["checksum"]["valid"] and frame_direction in last_valid:
            last_valid[frame_direction] = frame

    leftover = [event.value for event in events[frame_count * FRAME_LENGTH :]]

    checksum_model = {
        "algorithm": "xor",
        "seed": CHECKSUM_SEED,
        "seed_hex": _h8(CHECKSUM_SEED),
        "covered_offsets": [0, 1, 2, 3, 4],
        "checksum_offset": 5,
    }
    semantics_summary = {
        "loaded": semantics["loaded"],
        "path": str(semantics["path"]) if semantics["path"] else None,
        "command_effect_count": len(semantics["command_effects"]),
        "response_schema_count": len(semantics["response_schemas"]),
        "caveat": (
            "Semantic names are evidence-backed candidates imported from decompiler output; "
            "trace decoding does not make them protocol facts."
        ),
    }
    return {
        "kind": "h8536_protocol_trace",
        "frame_length": FRAME_LENGTH,
        "checksum_model": checksum_model,
        "direction_mode": direction,
        "semantics": semantics_summary,
        "frames": decoded_frames,
        "trailing_bytes": [_h8(value) for value in leftover],
        "trailing_byte_count": len(leftover),
    }
def parse_byte_text(text: str, *, direction_hint: str | None = None) -> list[ByteEvent]:
    """Parse free-form byte text into ``ByteEvent`` objects.

    Everything after ``#`` on a line is ignored. A line starting with ``rx:``
    or ``tx:`` (any case) sets the direction hint for that line; otherwise
    ``direction_hint`` applies.
    """
    parsed: list[ByteEvent] = []
    for raw_line in text.splitlines():
        content = raw_line.split("#", 1)[0].strip()
        if not content:
            continue
        hint = direction_hint
        head = content[:3].lower()
        if head in ("rx:", "tx:"):
            hint = head[:2]
            content = content[3:].strip()
        for token in _tokens(content):
            parsed += _events_from_token(token, hint)
    return parsed
def load_semantics(path: str | Path | None = None) -> dict[str, Any]:
    """Load command/response semantics from decompiler JSON, best effort.

    ``path`` defaults to ``build/rom_decompiled.json``. The serial section is
    read from the ``serial_protocol`` key, falling back to
    ``serial_semantics``.

    Never raises for a missing, unreadable or malformed file. An "empty"
    semantics dict with ``loaded`` False is returned when the file does not
    exist, cannot be read or decoded, is not valid JSON, has a top level that
    is not a JSON object, or lacks a serial section.
    """
    candidate = Path(path) if path else Path("build") / "rom_decompiled.json"
    if not candidate.exists():
        return _empty_semantics(candidate)
    try:
        with candidate.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # non-UTF-8 file is handled like any other unreadable input.
        return _empty_semantics(candidate)
    if not isinstance(payload, Mapping):
        # Valid JSON such as a list or scalar has no ``.get``.
        return _empty_semantics(candidate)

    serial = payload.get("serial_protocol")
    if not isinstance(serial, Mapping):
        serial = payload.get("serial_semantics")
    if not isinstance(serial, Mapping):
        return _empty_semantics(candidate)

    # Prefer entries on the first protocol mapping, then the serial section.
    protocol = _first_protocol(serial)
    command_effects = _mapping_by_command(
        _list_value(protocol.get("command_effects")) or _list_value(serial.get("command_effects"))
    )
    response_schemas = _list_value(protocol.get("response_schema")) or _list_value(
        serial.get("response_schema")
    )
    return {
        "loaded": True,
        "path": candidate,
        "command_effects": command_effects,
        "response_schemas": response_schemas,
    }
def format_text_report(decoded: Mapping[str, Any]) -> str:
    """Render a decoded trace as a newline-joined text report.

    TX frames are shown as index/value reports with any observed candidate
    label; all other frames are shown as commands. Stateful annotations
    follow their frame on indented lines.
    """
    frames = decoded.get("frames", [])
    semantics_state = "loaded" if decoded.get("semantics", {}).get("loaded") else "not-loaded"
    report_lines = [
        "H8/536 protocol trace",
        (
            f"frames={len(frames)} "
            f"trailing={decoded.get('trailing_byte_count', 0)} "
            f"semantics={semantics_state}"
        ),
    ]

    for frame in frames:
        checksum = frame["checksum"]
        if checksum["valid"]:
            status = "ok"
        else:
            status = f"bad expected {checksum['expected_hex']}"
        direction = frame.get("direction") or "unknown"
        hex_bytes = " ".join(frame["bytes_hex"])
        prefix = (
            f"[{frame['frame_index']:04d}] {direction:<7} off={frame['byte_offset']:04d} "
            f"bytes={hex_bytes} checksum={status} "
        )

        if direction == "tx":
            report = frame["report"]
            observed = report.get("observed_candidate")
            suffix = ""
            if observed:
                name = observed.get("name_candidate")
                state = observed.get("state_candidate")
                suffix = f" observed_candidate={name}" if name else " observed_candidate"
                if state:
                    suffix += f" state={state}"
            body = f"report_index={report['index_hex']} value={report['value_hex']}{suffix}"
        else:
            command = frame["command"]
            name = command.get("name_candidate")
            name_part = f" {name}" if name else ""
            body = (
                f"cmd={command['value_hex']}{name_part} "
                f"index={frame['index']['combined']} value={frame['payload_value']['word_be_hex']}"
            )
        report_lines.append(prefix + body)

        report_lines.extend(
            f"  candidate: {note['kind']} - {note['summary']}"
            for note in frame.get("stateful_annotations", [])
        )

    return "\n".join(report_lines)
def main(argv: list[str] | None = None, *, stdin: TextIO | None = None, stdout: TextIO | None = None) -> int:
    """Command-line entry point: decode byte tokens into protocol frames.

    Bytes come from ``--input`` (a file, or ``-`` for stdin), then from the
    positional byte tokens. When neither yields any byte and no input was
    named, stdin is read instead. Returns 0.
    """
    parser = argparse.ArgumentParser(
        description="Decode H8/536 serial byte captures into 6-byte protocol frames."
    )
    parser.add_argument("bytes", nargs="*", help="Byte tokens, e.g. 00 01 02 03 04 5E or rx:00010203045E")
    parser.add_argument("-i", "--input", help="Input file. Use '-' or omit byte args to read stdin.")
    parser.add_argument("--direction", choices=sorted(VALID_DIRECTIONS), default="auto")
    parser.add_argument("--json", action="store_true", help="Emit JSON instead of text.")
    parser.add_argument(
        "--semantics",
        default=None,
        help="Decompiler JSON path. Defaults to build/rom_decompiled.json when present.",
    )
    options = parser.parse_args(argv)

    source = stdin or sys.stdin
    sink = stdout or sys.stdout

    events: list[ByteEvent] = []
    if options.input == "-":
        events += parse_byte_text(source.read())
    elif options.input:
        events += parse_byte_text(Path(options.input).read_text(encoding="utf-8"))
    if options.bytes:
        events += parse_byte_text(" ".join(options.bytes))
    # Fall back to stdin only when nothing was supplied any other way.
    if not (events or options.input):
        events += parse_byte_text(source.read())

    decoded = decode_trace(events, direction=options.direction, semantics_path=options.semantics)
    if options.json:
        json.dump(decoded, sink, indent=2, sort_keys=True)
    else:
        sink.write(format_text_report(decoded))
    sink.write("\n")
    return 0
def _decode_frame(
    frame: list[int],
    *,
    frame_index: int,
    byte_offset: int,
    direction: str | None,
    semantics: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> dict[str, Any]:
    """Decode one six-byte frame into its structured description.

    The checksum is computed over bytes 0-4 and compared with byte 5. For a
    ``"tx"`` direction the frame is treated as a report and the command fields
    are marked not applicable; otherwise the low three bits of byte 0 are the
    command and matching response schema candidates are attached.
    """
    is_tx_report = direction == "tx"
    command_value = frame[0] & 0x07
    command_effect = semantics["command_effects"].get(command_value, {})
    expected = checksum_for(frame[:5])
    actual = frame[5]

    index_low3 = frame[1] & 0x07
    combined_index = (index_low3 << 8) | frame[2]
    word_be = (frame[3] << 8) | frame[4]
    word_le = (frame[4] << 8) | frame[3]

    if is_tx_report:
        command_name = None
        effect_candidate = None
        command_caveat = "TX frames are decoded as report frames; byte0 is not treated as a command."
        report = _tx_report(frame)
        schema_candidates: list[Any] = []
    else:
        command_name = command_effect.get("name_candidate")
        effect_candidate = command_effect or None
        command_caveat = None
        report = None
        schema_candidates = _response_schema_candidates(semantics, command_value)

    checksum_section = {
        "algorithm": "xor",
        "seed": CHECKSUM_SEED,
        "expected": expected,
        "expected_hex": _h8(expected),
        "actual": actual,
        "actual_hex": _h8(actual),
        "valid": expected == actual,
    }
    command_section = {
        "applicable": not is_tx_report,
        "source_byte": frame[0],
        "source_byte_hex": _h8(frame[0]),
        "mask": 0x07,
        "value": command_value,
        "value_hex": _h8(command_value),
        "name_candidate": command_name,
        "effect_candidate": effect_candidate,
        "caveat": command_caveat,
    }
    index_section = {
        "byte1": frame[1],
        "byte1_hex": _h8(frame[1]),
        "byte1_low3": index_low3,
        "byte1_low3_hex": _h8(index_low3),
        "byte2": frame[2],
        "byte2_hex": _h8(frame[2]),
        "combined": combined_index,
        "combined_hex": _h16(combined_index),
    }
    payload_section = {
        "byte3": frame[3],
        "byte3_hex": _h8(frame[3]),
        "byte4": frame[4],
        "byte4_hex": _h8(frame[4]),
        "word_be": word_be,
        "word_be_hex": _h16(word_be),
        "word_le": word_le,
        "word_le_hex": _h16(word_le),
    }

    decoded = {
        "frame_index": frame_index,
        "byte_offset": byte_offset,
        "direction": direction,
        "bytes": frame,
        "bytes_hex": [_h8(value) for value in frame],
        "checksum": checksum_section,
        "command": command_section,
        "index": index_section,
        "payload_value": payload_section,
        "report": report,
        "response_schema_candidates": schema_candidates,
        "stateful_annotations": [],
    }
    # Annotations need the decoded frame itself, so they are filled in last.
    decoded["stateful_annotations"] = _stateful_annotations(decoded, previous_valid)
    return decoded
def _tx_report(frame: list[int]) -> dict[str, Any]:
    """Describe a TX frame as an index/value report.

    Bytes 0-2 form a big-endian index and bytes 3-4 a big-endian value; the
    pair is looked up in ``OBSERVED_TX_REPORT_CANDIDATES`` for a label.
    """
    report_index = (frame[0] << 16) | (frame[1] << 8) | frame[2]
    report_value = (frame[3] << 8) | frame[4]
    known = OBSERVED_TX_REPORT_CANDIDATES.get((report_index, report_value))

    # Use six hex digits only when the index does not fit in sixteen bits.
    if report_index > 0xFFFF:
        index_hex = f"0x{report_index:06X}"
    else:
        index_hex = _h16(report_index)

    return {
        "encoding": "observed_tx_index_value_report_candidate",
        "confidence": "observed_candidate",
        "index_source_offsets": [0, 1, 2],
        "index": report_index,
        "index_hex": index_hex,
        "index_bytes_hex": [_h8(value) for value in (frame[0], frame[1], frame[2])],
        "value_source_offsets": [3, 4],
        "value": report_value,
        "value_hex": _h16(report_value),
        "observed_candidate": dict(known) if known else None,
        "caveat": "TX report names are capture-observed candidates, not ROM-derived protocol facts.",
    }
def _stateful_annotations(
    frame: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> list[dict[str, Any]]:
    """Return history-based annotations for a decoded frame.

    Only non-TX frames whose command value is ``0x07`` are annotated. The
    annotation is a retransmit/error candidate that summarises the previous
    valid frame in the same direction and, for RX frames, the previous valid
    TX frame, noting when either has the same bytes as this frame.
    """
    direction = frame.get("direction")
    if direction == "tx" or frame["command"]["value"] != 0x07:
        return []

    # TX frames returned above, so an opposite direction exists only for RX.
    same = previous_valid.get(direction)
    opposite = previous_valid.get("tx") if direction == "rx" else None

    evidence = ["command_low3 == 0x07"]
    if same and same.get("bytes") == frame.get("bytes"):
        evidence.append("matches previous valid frame in same direction")
    if opposite and opposite.get("bytes") == frame.get("bytes"):
        evidence.append("matches previous valid frame in opposite direction")

    return [
        {
            "kind": "retransmit_or_error_candidate",
            "confidence": "candidate",
            "summary": "cmd 0x07 is associated with retry/error handling in decompiler semantics.",
            "evidence": evidence,
            "previous_valid_same_direction": _previous_summary(same),
            "previous_valid_opposite_direction": _previous_summary(opposite),
        }
    ]
def _previous_summary(frame: Mapping[str, Any] | None) -> dict[str, Any] | None:
if not frame:
return None
return {
"frame_index": frame["frame_index"],
"direction": frame.get("direction"),
"bytes_hex": frame["bytes_hex"],
"command": frame["command"]["value_hex"],
"checksum_valid": frame["checksum"]["valid"],
}
def _byte_events(data: bytes | Iterable[int | ByteEvent]) -> list[ByteEvent]:
    """Normalise raw input into a list of ``ByteEvent`` objects.

    ``bytes`` input is wrapped directly. For other iterables, ``ByteEvent``
    items pass through unchanged and everything else is converted with
    ``int`` and range-checked.

    Raises:
        ValueError: if a converted item is outside 0..0xFF.
    """
    if isinstance(data, bytes):
        return [ByteEvent(byte) for byte in data]

    def as_event(item: int | ByteEvent) -> ByteEvent:
        if isinstance(item, ByteEvent):
            return item
        value = int(item)
        if value < 0 or value > 0xFF:
            raise ValueError(f"byte out of range: {value}")
        return ByteEvent(value)

    return [as_event(item) for item in data]
def _frame_direction(chunk: list[ByteEvent], mode: str) -> str | None:
if mode in {"rx", "tx"}:
return mode
hints = {event.direction_hint for event in chunk if event.direction_hint in {"rx", "tx"}}
if len(hints) == 1:
return next(iter(hints))
return None
def _tokens(text: str) -> list[str]:
return [token for token in text.replace(",", " ").replace(";", " ").split() if token]
def _events_from_token(token: str, direction_hint: str | None) -> list[ByteEvent]:
    """Convert one text token into byte events.

    Accepted forms:
      * ``rx:``/``tx:`` prefix (any case): overrides the direction hint for
        the rest of the token;
      * ``H'NN`` or ``0xNN``: a single hexadecimal byte;
      * an even-length run of more than two hex digits without ``0x``:
        split into consecutive bytes;
      * anything else: parsed as one hexadecimal byte.

    Raises:
        ValueError: if the token is not valid hexadecimal or a single-byte
            token is outside 0..0xFF.
    """
    lowered = token.lower()
    for prefix in ("rx:", "tx:"):
        if lowered.startswith(prefix):
            return _events_from_token(token[len(prefix) :], prefix[:2])

    value_text = token.strip()
    if value_text.upper().startswith("H'"):
        value_text = "0x" + value_text[2:]

    is_packed_hex = (
        not value_text.lower().startswith("0x")
        and len(value_text) > 2
        and len(value_text) % 2 == 0
        and all(char in "0123456789abcdefABCDEF" for char in value_text)
    )
    if is_packed_hex:
        return [
            ByteEvent(int(value_text[index : index + 2], 16), direction_hint)
            for index in range(0, len(value_text), 2)
        ]

    # int(..., 16) accepts the text with or without a "0x" prefix.
    value = int(value_text, 16)
    if not 0 <= value <= 0xFF:
        raise ValueError(f"byte out of range: {token}")
    return [ByteEvent(value, direction_hint)]
def _empty_semantics(path: Path) -> dict[str, Any]:
return {"loaded": False, "path": path, "command_effects": {}, "response_schemas": []}
def _first_protocol(serial: Mapping[str, Any]) -> Mapping[str, Any]:
protocols = serial.get("protocol_semantics")
if isinstance(protocols, list):
for protocol in protocols:
if isinstance(protocol, Mapping):
return protocol
return serial
def _list_value(value: Any) -> list[Any]:
return value if isinstance(value, list) else []
def _mapping_by_command(items: list[Any]) -> dict[int, Mapping[str, Any]]:
output: dict[int, Mapping[str, Any]] = {}
for item in items:
if not isinstance(item, Mapping):
continue
value = item.get("command_value", item.get("command"))
if isinstance(value, int):
output[value] = item
return output
def _response_schema_candidates(semantics: Mapping[str, Any], command: int) -> list[Mapping[str, Any]]:
    """List response schemas whose constants include ``command``.

    Each match is reported with the schema's ``response_id`` and
    ``call_address_hex``. Non-mapping schema entries are ignored.
    """
    command_hex = _h8(command)
    return [
        {
            "response_id": schema.get("response_id"),
            "call_address_hex": schema.get("call_address_hex"),
            "matched_command_byte_candidate": command_hex,
            "caveat": "Matched schema constants are candidates from decompiler output.",
        }
        for schema in semantics.get("response_schemas", [])
        if isinstance(schema, Mapping) and command in _schema_constants(schema)
    ]
def _schema_constants(value: Any) -> set[int]:
constants: set[int] = set()
if isinstance(value, Mapping):
for key, item in value.items():
if key in {"value", "constant", "constant_value"} and isinstance(item, int):
constants.add(item & 0x07)
constants.update(_schema_constants(item))
elif isinstance(value, list):
for item in value:
constants.update(_schema_constants(item))
return constants
def _h8(value: int) -> str:
return f"0x{value & 0xFF:02X}"
def _h16(value: int) -> str:
return f"0x{value & 0xFFFF:04X}"
# Public API of the trace decoder; underscore-prefixed helpers stay private.
__all__ = [
"ByteEvent",
"checksum_for",
"decode_trace",
"format_text_report",
"load_semantics",
"main",
"parse_byte_text",
]

391
h8536/serial_gate.py Normal file
View File

@@ -0,0 +1,391 @@
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
from .formatting import h16, label_for
# Alias for the JSON-style dictionaries this module reads and returns.
JsonObject = dict[str, Any]
# Addresses listed under "state_addresses" by analyze_serial_gate, each with a
# ram_XXXX symbol name.
KEY_STATE_ADDRESSES: tuple[int, ...] = (
0xF9B0,
0xF9B4,
0xF9B5,
0xF9B9,
0xF9C0,
0xF9C3,
0xF9C5,
0xF9C6,
0xF9C8,
0xFAA2,
0xFAA3,
0xFAA5,
)
# Decompiler JSON path; presumably the default for the command-line input,
# whose definition is outside this view.
DEFAULT_INPUT = Path("build/rom_decompiled.json")
# Caveat text that analyze_serial_gate includes in its "caveats" list.
CAPTURE_OVERLAY_CAVEAT = (
"Observed report indexes 0x0007 and 0x0015 are capture overlays/runtime queue "
"entries; this analyzer does not treat them as statically proven ROM constants."
)
def load_serial_gate_input(path: Path) -> JsonObject:
    """Read decompiler JSON from *path* and check its top-level shape.

    Raises:
        ValueError: if the document is not a JSON object containing an
            ``"instructions"`` key.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    looks_like_decompiler_output = isinstance(payload, dict) and "instructions" in payload
    if looks_like_decompiler_output:
        return payload
    raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
def analyze_serial_gate(payload: dict[str, Any]) -> JsonObject:
    """Build the serial gate/queue analysis from decompiler JSON *payload*.

    The result bundles four address-driven evidence sections, a per-address
    access summary for ``KEY_STATE_ADDRESSES``, a confidence label derived
    from how many evidence sections are present, and fixed caveat strings.
    """
    instructions = _instruction_sequence(payload.get("instructions"))
    labels = _collect_labels(payload, instructions)

    # Index instructions by address so each evidence builder can probe directly.
    by_address: dict[int, JsonObject] = {}
    for ins in instructions:
        if "address" in ins:
            by_address[int(ins["address"])] = ins

    evidence = {
        "scheduler_gate_loc_3FD3": _scheduler_gate(by_address),
        "queue_send_gate_loc_BAF2": _queue_send_gate(by_address),
        "resend_gate_path": _resend_gate_path(by_address),
        "rx_session_maintenance": _rx_session_maintenance(by_address),
    }
    access_summary = _state_access_summary(instructions, labels)

    summary = {
        "state_machine_candidate": "autonomous serial TX/report queue gate",
        "confidence": _confidence(evidence),
        "basis": "address-driven static evidence from decompiler JSON",
    }
    state_addresses = []
    for address in KEY_STATE_ADDRESSES:
        state_addresses.append(
            {"address": address, "address_hex": h16(address), "symbol": f"ram_{address:04X}"}
        )
    caveats = [
        CAPTURE_OVERLAY_CAVEAT,
        "Queue entries near F870 are reached through RAM-indexed addressing; static JSON proves the access pattern, not the runtime queue contents.",
        "Branch predicates are summarized from local instruction order and targets; this is not an emulator trace.",
    ]
    return {
        "kind": "serial_gate",
        "summary": summary,
        "state_addresses": state_addresses,
        "evidence": evidence,
        "state_accesses": access_summary,
        "caveats": caveats,
    }
def format_text_report(analysis: dict[str, Any]) -> str:
    """Render *analysis* as a plain-text report terminated by one newline.

    ``analysis["summary"]`` must provide ``state_machine_candidate`` and
    ``confidence``; the ``evidence``, ``state_accesses`` and ``caveats``
    sections are optional and render as empty when absent.
    """
    overview = analysis["summary"]
    report: list[str] = [
        "H8/536 Serial Gate/Queue State-Machine Reconstruction",
        "",
        f"Summary: {overview['state_machine_candidate']}",
        f"Confidence: {overview['confidence']}",
        "",
        "Evidence:",
    ]

    for key, section in analysis.get("evidence", {}).items():
        # Fall back to the section key as the title; drop trailing periods.
        title = str(section.get("title", key)).rstrip(".")
        status = "missing"
        if section.get("present"):
            status = "present"
        report.append(f"- {title}: {status}")
        summary = section.get("summary")
        if summary:
            report.append(f" {summary}")
        report.extend(f" - {item['address_hex']}: {item['text']}" for item in section.get("items", []))

    report += ["", "State address readers/writers:"]
    for entry in analysis.get("state_accesses", []):
        report.append(
            f"- {entry['address_hex']}: reads={entry['read_count']} "
            f"writes={entry['write_count']} read/write={entry['read_write_count']}"
        )
        samples = entry.get("sample_accesses", [])
        if not samples:
            continue
        sample_text = "; ".join(
            f"{sample['address_hex']} {sample['access']} {sample['text']}" for sample in samples
        )
        report.append(f" {sample_text}")

    report += ["", "Caveats:"]
    report.extend(f"- {caveat}" for caveat in analysis.get("caveats", []))

    # Strip trailing blank lines, then end with exactly one newline.
    return "\n".join(report).rstrip() + "\n"
def write_serial_gate_report(input_path: Path, output_path: Path, *, as_json: bool = False) -> JsonObject:
    """Analyze *input_path* and write the report to *output_path*.

    The parent directory of *output_path* is created if needed. With
    ``as_json`` the analysis is written as sorted, indented JSON; otherwise
    the text report is written. Returns the analysis dictionary.
    """
    analysis = analyze_serial_gate(load_serial_gate_input(input_path))
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rendered = (
        json.dumps(analysis, indent=2, sort_keys=True) + "\n"
        if as_json
        else format_text_report(analysis)
    )
    output_path.write_text(rendered, encoding="utf-8")
    return analysis
def main(argv: list[str] | None = None, stdout: Any | None = None) -> int:
    """Command-line entry point: analyze a decompiler JSON file and emit a report.

    Args:
        argv: argument list to parse; ``None`` lets argparse use ``sys.argv``.
        stdout: stream for printed output; ``None`` selects ``sys.stdout``.

    Returns:
        ``0`` on success.
    """
    parser = argparse.ArgumentParser(
        description="Summarize H8/536 autonomous serial TX/report gates and queue state.",
    )
    parser.add_argument(
        "input",
        nargs="?",
        type=Path,
        default=DEFAULT_INPUT,
        help="structured JSON emitted by h8536_decompiler.py",
    )
    parser.add_argument("--json", action="store_true", help="emit structured JSON instead of readable text")
    parser.add_argument("--out", type=Path, default=None, help="write report to this path")
    options = parser.parse_args(argv)

    if stdout is None:
        import sys

        stream = sys.stdout
    else:
        stream = stdout

    analysis = analyze_serial_gate(load_serial_gate_input(options.input))
    rendered = (
        json.dumps(analysis, indent=2, sort_keys=True) + "\n"
        if options.json
        else format_text_report(analysis)
    )

    if not options.out:
        # No output file requested: the report itself goes to the stream.
        print(rendered, end="", file=stream)
        return 0

    options.out.parent.mkdir(parents=True, exist_ok=True)
    options.out.write_text(rendered, encoding="utf-8")
    print(f"wrote {options.out}", file=stream)
    return 0
def _scheduler_gate(by_address: dict[int, JsonObject]) -> JsonObject:
    """Describe the loc_3FD3 gate that leads into loc_BAF2.

    ``present`` is true only when all five key addresses exist in
    *by_address*; ``items`` lists every probed address with its text.
    """
    probed = [0x3FD3, 0x3FD7, 0x3FD9, 0x3FDD, 0x3FDF, 0x3FE3, 0x3FE5, 0x3FE9, 0x3FEB]
    must_exist = (0x3FD3, 0x3FD9, 0x3FDF, 0x3FE5, 0x3FEB)
    # NOTE(review): "required_addresses_hex" lists every probed address, while
    # "present" only checks the shorter must_exist subset - confirm intent.
    required_hex = [h16(address) for address in probed]
    return {
        "title": "loc_3FD3 gate into loc_BAF2",
        "present": _has_all(by_address, must_exist),
        "summary": (
            "Requires FAA2 == 0, allows the FAA5.bit7 path only when F9C3 == 0, "
            "then requires F9C0 == 0 before BSR loc_BAF2."
        ),
        "items": _items(by_address, probed),
        "required_addresses_hex": required_hex,
    }
def _queue_send_gate(by_address: dict[int, JsonObject]) -> JsonObject:
    """Describe the loc_BAF2 queue send gate and its staging/send addresses.

    ``present`` is true only when all six key addresses exist in
    *by_address*. The queue-table entry quotes the instruction text found
    at 0xBB08 (or ``"<missing>"``).
    """
    probed = [
        0xBAF2, 0xBAF8, 0xBAFC, 0xBAFE, 0xBB00,
        0xBB08, 0xBB1C, 0xBB20, 0xBB2B, 0xBB39,
        0xBB3F, 0xBB43, 0xBB46, 0xBB4C, 0xBB51,
    ]
    must_exist = (0xBAF2, 0xBAF8, 0xBB08, 0xBB1C, 0xBB39, 0xBB43)
    queue_table = {
        "base_address_hex": h16(0xF870),
        "index_address_hex": h16(0xF9B5),
        "evidence_address_hex": h16(0xBB08),
        "addressing_text": _text(by_address, 0xBB08),
    }
    staging_hex = [h16(address) for address in range(0xF850, 0xF855)]
    return {
        "title": "loc_BAF2 queue send gate",
        "present": _has_all(by_address, must_exist),
        "summary": (
            "F9B5 is compared against F9B0; inequality enters the send path, reads a queued "
            "word via the F9B5-derived index around F870, stages F850-F854, and calls BA26 at BB43."
        ),
        "items": _items(by_address, probed),
        "queue_table_candidate": queue_table,
        "staging_addresses_hex": staging_hex,
        "send_subroutine_hex": h16(0xBA26),
        "send_call_address_hex": h16(0xBB43),
    }
def _resend_gate_path(by_address: dict[int, JsonObject]) -> JsonObject:
    """Describe the resend gate/path between 0xBE9E and 0xBED5.

    ``present`` is true only when all six key addresses exist in *by_address*.
    """
    probed = [0xBE9E, 0xBEA5, 0xBEA9, 0xBEAF, 0xBEB5, 0xBEBB, 0xBEC5, 0xBECB, 0xBED1, 0xBED5]
    must_exist = (0xBE9E, 0xBEA5, 0xBEB5, 0xBEBB, 0xBECB, 0xBED5)
    section: JsonObject = {"title": "resend gate/path"}
    section["present"] = _has_all(by_address, must_exist)
    section["summary"] = (
        "BE9E masks FAA5 with FAA3, waits for F9C6/F9C8 timeout gates, then if FAA3.bit7 "
        "remains set clears F9C3 and calls BA26 from BED5."
    )
    section["items"] = _items(by_address, probed)
    section["resend_call_address_hex"] = h16(0xBED5)
    section["send_subroutine_hex"] = h16(0xBA26)
    return section
def _rx_session_maintenance(by_address: dict[int, JsonObject]) -> JsonObject:
    """Describe the RX/session maintenance evidence section.

    ``present`` is true only when all six key addresses exist in *by_address*.
    """
    # Probed addresses, grouped by the code region they fall in.
    main_loop = [0x3FEF, 0x3FF5, 0x3FF9, 0x3FFD, 0x4007]
    rx_region = [
        0xBBCB, 0xBC0F, 0xBC15, 0xBC33, 0xBC5C, 0xBC63, 0xBCD0,
        0xBCFD, 0xBD04, 0xBD6D, 0xBD71, 0xBD75, 0xBD79,
        0xBDC8, 0xBDCC, 0xBDD0, 0xBDD4, 0xBDF3, 0xBDF7, 0xBDFB, 0xBDFF,
    ]
    probed = main_loop + rx_region
    must_exist = (0x3FEF, 0x3FF5, 0xBBCB, 0xBC15, 0xBD6D, 0xBD79)
    return {
        "title": "RX/session maintenance",
        "present": _has_all(by_address, must_exist),
        "summary": (
            "F9C5 timeout maintenance clears F9B5/F9B0 and FAA5.bit7; RX command processing "
            "uses FAA2 as an in-session latch and paths advance F9B5/F9B0 or clear FAA3/FAA2."
        ),
        "items": _items(by_address, probed),
    }
def _state_access_summary(instructions: list[JsonObject], labels: dict[int, str]) -> list[JsonObject]:
    """Summarize which instructions touch each address in ``KEY_STATE_ADDRESSES``.

    Args:
        instructions: decoded instructions, each carrying an ``"address"``.
        labels: function start address -> label, used to name the enclosing
            function of each access.

    Returns:
        One entry per state address (in ``KEY_STATE_ADDRESSES`` order) with
        read / write / read_write counts, the full access list, and the
        first six accesses as ``sample_accesses``.
    """
    # Extract each instruction's referenced addresses once, instead of
    # repeating the reference/regex scan for every state address.
    referenced = [(ins, _reference_addresses(ins)) for ins in instructions]

    result: list[JsonObject] = []
    for state_address in KEY_STATE_ADDRESSES:
        accesses = []
        for ins, refs in referenced:
            if state_address not in refs:
                continue
            location = int(ins["address"])
            accesses.append(
                {
                    "address": location,
                    "address_hex": h16(location),
                    "function": _function_label_for_address(location, labels),
                    "access": _access_kind(ins, state_address),
                    "text": str(ins.get("text", "")),
                }
            )
        kinds = [access["access"] for access in accesses]
        result.append(
            {
                "address": state_address,
                "address_hex": h16(state_address),
                "read_count": kinds.count("read"),
                "write_count": kinds.count("write"),
                "read_write_count": kinds.count("read_write"),
                "accesses": accesses,
                "sample_accesses": accesses[:6],
            }
        )
    return result
def _instruction_sequence(raw: Any) -> list[JsonObject]:
if not isinstance(raw, list):
return []
return sorted(
[item for item in raw if isinstance(item, dict) and isinstance(item.get("address"), int)],
key=lambda item: int(item["address"]),
)
def _collect_labels(payload: dict[str, Any], instructions: list[JsonObject]) -> dict[int, str]:
labels: dict[int, str] = {}
nodes = payload.get("call_graph", {}).get("nodes", []) if isinstance(payload.get("call_graph"), dict) else []
if isinstance(nodes, list):
for node in nodes:
if isinstance(node, dict) and isinstance(node.get("start"), int) and node.get("label"):
labels[int(node["start"])] = str(node["label"])
return labels
def _items(by_address: dict[int, JsonObject], addresses: list[int]) -> list[JsonObject]:
    """Build one evidence row per address in *addresses*.

    Each row records the address, its hex form, the instruction text (or
    ``"<missing>"``), whether the address exists in *by_address*, and the
    hex form of any integer branch targets on that instruction.
    """
    rows: list[JsonObject] = []
    for address in addresses:
        instruction = by_address.get(address, {})
        targets_hex = [
            h16(target)
            for target in instruction.get("targets", [])
            if isinstance(target, int)
        ]
        rows.append(
            {
                "address": address,
                "address_hex": h16(address),
                "text": _text(by_address, address),
                "present": address in by_address,
                "targets_hex": targets_hex,
            }
        )
    return rows
def _has_all(by_address: dict[int, JsonObject], addresses: tuple[int, ...]) -> bool:
return all(address in by_address for address in addresses)
def _text(by_address: dict[int, JsonObject], address: int) -> str:
return str(by_address.get(address, {}).get("text", "<missing>"))
def _reference_addresses(ins: JsonObject) -> set[int]:
addresses: set[int] = set()
refs = ins.get("references", [])
if isinstance(refs, list):
for ref in refs:
if isinstance(ref, dict) and isinstance(ref.get("address"), int):
addresses.add(int(ref["address"]))
text = str(ins.get("text", ""))
for match in re.finditer(r"@H'([0-9A-Fa-f]{4})", text):
addresses.add(int(match.group(1), 16))
return addresses
def _access_kind(ins: JsonObject, address: int) -> str:
mnemonic = str(ins.get("mnemonic", "")).upper()
operands = str(ins.get("operands", ""))
target = f"@H'{address:04X}"
upper_operands = operands.upper()
if mnemonic.startswith(("TST", "CMP", "BTST")):
return "read"
if mnemonic.startswith("CLR"):
return "write"
if mnemonic.startswith(("BSET", "BCLR", "ADD", "SUB", "INC", "DEC")):
return "read_write"
if mnemonic.startswith("MOV") and "," in upper_operands:
_src, dest = [part.strip() for part in upper_operands.rsplit(",", 1)]
return "write" if target in dest else "read"
if mnemonic.startswith(("AND", "OR", "XOR")) and "," in upper_operands:
_src, dest = [part.strip() for part in upper_operands.rsplit(",", 1)]
return "read_write" if target in dest else "read"
return "read"
def _function_label_for_address(address: int, labels: dict[int, str]) -> str:
starts = [start for start in labels if start <= address]
if not starts:
return label_for(address)
return labels[max(starts)]
def _confidence(evidence: dict[str, JsonObject]) -> str:
present_count = sum(1 for section in evidence.values() if section.get("present"))
if present_count == len(evidence):
return "high"
if present_count >= 2:
return "medium"
return "low"
# Run the CLI when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -404,6 +404,9 @@ def _semantics_lines(
lines.extend(_table_map_comment_lines(_table_map_list(protocol), opts, prefix=" * "))
lines.extend(_state_variable_comment_lines(protocol.get("state_variable_candidates"), opts, prefix=" * "))
lines.extend(_retry_error_comment_lines(protocol.get("retry_error_model"), opts, prefix=" * "))
lines.extend(_gate_queue_comment_lines(protocol.get("gate_queue_model"), opts, prefix=" * "))
lines.extend(_tx_report_comment_lines(protocol.get("tx_report_model"), opts, prefix=" * "))
lines.extend(_periodic_resend_comment_lines(protocol.get("periodic_resend_model"), opts, prefix=" * "))
lines.append(" */")
lines.append("")
@@ -436,6 +439,11 @@ def _semantics_lines(
" return 0x01FFu;",
"}",
"",
],
)
lines.extend(_gate_queue_predicate_function_lines(protocol.get("gate_queue_model")))
lines.extend(
[
"void sci1_process_candidate_protocol_command(void)",
"{",
" u8 command = sci1_rx_candidate_command();",
@@ -605,6 +613,134 @@ def _retry_error_comment_lines(
return lines
def _gate_queue_comment_lines(
    value: object,
    opts: SerialPseudocodeOptions,
    *,
    prefix: str,
) -> list[str]:
    """Render the gate/queue model as comment lines, each starting with *prefix*.

    Returns ``[]`` when *value* is not a dict. Otherwise emits a heading, one
    line per predicate, one line per session effect, an optional caveat, and
    (only when ``opts.include_evidence`` is set) an evidence line.
    """
    if not isinstance(value, dict):
        return []

    out = [f"{prefix}gate/queue state machine candidate:"]

    for predicate in _object_list(value.get("predicates")):
        label = predicate.get("name") or "predicate_candidate"
        condition = _comment_text(str(predicate.get("condition_candidate") or "condition unknown"))
        summary = _comment_text(str(predicate.get("summary") or "candidate gate"))
        out.append(f"{prefix}- {label}: {condition}; {summary}")

    for effect in _object_list(value.get("session_effects")):
        label = effect.get("name") or "session_effect_candidate"
        summary = _comment_text(str(effect.get("summary") or "candidate session effect"))
        commands = ", ".join(str(item) for item in effect.get("command_values_hex", []) if item)
        line = f"{prefix}- {label}: {summary}"
        if commands:
            line += f"; commands {commands}"
        out.append(line)

    caveat = str(value.get("caveat") or "").strip()
    if caveat:
        out.append(f"{prefix}- caveat: {_comment_text(caveat)}")

    evidence = _hex_join(value.get("evidence_addresses_hex"))
    if opts.include_evidence and evidence:
        out.append(f"{prefix}- evidence: {evidence}")
    return out
def _gate_queue_predicate_function_lines(value: object) -> list[str]:
if not isinstance(value, dict):
return []
return [
"static bool sci1_candidate_main_report_gate_open(void)",
"{",
" bool session_idle = MEM8[0xFAA2u] == 0u;",
" bool rx_gate_open = (MEM8[0xFAA5u] & 0x80u) == 0u || MEM8[0xF9C3u] == 0u;",
" bool tx_timer_clear = MEM8[0xF9C0u] == 0u;",
"",
" return session_idle && rx_gate_open && tx_timer_clear;",
"}",
"",
"static bool sci1_candidate_report_queue_nonempty(void)",
"{",
" return MEM8[0xF9B5u] != MEM8[0xF9B0u];",
"}",
"",
"static bool sci1_candidate_periodic_resend_gate_open(void)",
"{",
" bool pending = (MEM8[0xFAA5u] & MEM8[0xFAA3u] & 0x80u) != 0u;",
" bool period_elapsed = MEM8[0xF9C6u] == 0u && MEM8[0xF9C7u] == 0u;",
" bool resend_countdown_active = MEM8[0xF9C8u] != 0u;",
"",
" return pending && period_elapsed && resend_countdown_active;",
"}",
"",
]
def _tx_report_comment_lines(
    value: object,
    opts: SerialPseudocodeOptions,
    *,
    prefix: str,
) -> list[str]:
    """Render the TX/autonomous report model as comment lines.

    Returns ``[]`` when *value* is not a dict. At most the first three
    observed overlay candidates are listed, and only those that carry at
    least one frame string. The evidence line is emitted only when
    ``opts.include_evidence`` is set.
    """
    if not isinstance(value, dict):
        return []

    entry = value.get("entry_label") or value.get("entry_address_hex") or "TX report path"
    source = _comment_text(str(value.get("value_source_candidate") or "current value table"))
    out = [
        f"{prefix}TX/autonomous report model candidate:",
        (
            f"{prefix}- {entry} -> loc_BA26: bytes 0..2 encode candidate logical index/report id; "
            f"bytes 3..4 come from {source}; byte5 is 0x5A XOR checksum"
        ),
    ]

    overlay = _object_list(value.get("observed_capture_overlay_candidates"))
    if overlay:
        observed = []
        for item in overlay[:3]:
            frames = ", ".join(str(frame) for frame in item.get("observed_frames_hex", []) if frame)
            if not frames:
                continue
            label = item.get("name_candidate") or "observed_report_candidate"
            observed.append(f"{label}: {frames}")
        if observed:
            out.append(f"{prefix}- observed overlay candidates: {_comment_text('; '.join(observed))}")

    caveat = str(value.get("observed_autonomous_output_caveat") or value.get("caveat") or "").strip()
    if caveat:
        out.append(f"{prefix}- caveat: {_comment_text(caveat)}")

    evidence = _hex_join(value.get("evidence_addresses_hex"))
    if opts.include_evidence and evidence:
        out.append(f"{prefix}- evidence: {evidence}")
    return out
def _periodic_resend_comment_lines(
    value: object,
    opts: SerialPseudocodeOptions,
    *,
    prefix: str,
) -> list[str]:
    """Render the heartbeat/periodic resend model as comment lines.

    Returns ``[]`` when *value* is not a dict. Each sub-section (period
    timer, resend countdown, pending mask, resend path) adds a line only
    when it is itself a dict. The evidence line is emitted only when
    ``opts.include_evidence`` is set.
    """
    if not isinstance(value, dict):
        return []

    out = [f"{prefix}heartbeat/periodic resend candidate:"]

    # (model key, line label, key holding the hex value, fallback summary)
    valued_sections = (
        ("period_timer", "F9C6 reload", "reload_value_hex", "period timer"),
        ("resend_countdown", "F9C8 reload", "reload_value_hex", "resend countdown"),
        ("pending_mask", "FAA3 mask", "mask_hex", "pending mask"),
    )
    for key, label, hex_key, fallback in valued_sections:
        section = value.get(key)
        if not isinstance(section, dict):
            continue
        shown_value = section.get(hex_key, "?")
        summary = _comment_text(str(section.get("summary") or fallback))
        out.append(f"{prefix}- {label} {shown_value}: {summary}")

    resend = value.get("resend_path")
    if isinstance(resend, dict):
        summary = _comment_text(str(resend.get("summary") or "candidate resend path"))
        out.append(f"{prefix}- BED5 resend path: {summary}")

    evidence = _hex_join(value.get("evidence_addresses_hex"))
    if opts.include_evidence and evidence:
        out.append(f"{prefix}- evidence: {evidence}")
    return out
def _command_effect_switch_lines(command: JsonObject) -> list[str]:
effects = _object_list(command.get("effects"))[:3]
lines = []

View File

@@ -21,6 +21,14 @@ TX_CHECKSUM_ADDRESS = TX_FRAME_END
# Address and label reported as the "send_builder" of the TX report model.
SEND_BUILDER_ADDRESS = 0xBA26
SEND_BUILDER_LABEL = "loc_BA26"
# Responses whose "call_address" equals this value are treated as the
# autonomous TX report path; it is also the end of the queue evidence range.
AUTONOMOUS_TX_REPORT_CALL = 0xBB43
AUTONOMOUS_TX_REPORT_LABEL = "loc_BB43"
# Inclusive bounds of the address range scanned for main-loop gate evidence.
MAIN_REPORT_GATE_ENTRY = 0x3FD3
MAIN_REPORT_GATE_CALL = 0x3FEB
# Start of the session gate evidence range (the scan ends at 0x4007).
SESSION_GATE_ENTRY = 0x3FEF
# Start of the queue evidence range (ends at AUTONOMOUS_TX_REPORT_CALL).
QUEUE_REPORT_ENTRY = 0xBAF2
# Bounds of the resend gate evidence range; PERIODIC_RESEND_ENTRY is also the
# start of the scan that runs up to SERIAL_HANDLER_END.
RESEND_GATE_ENTRY = 0xBE9E
PERIODIC_RESEND_ENTRY = 0xBED5
INDEX_DECODER_ADDRESS = 0x622B
INDEX_DECODER_LABEL = "loc_622B"
# Seed of the XOR checksum ("checksum = 0x5A ^ byte0 ^ ... ^ byte4").
CHECKSUM_SEED = 0x5A
@@ -74,8 +82,29 @@ STATE_VARIABLES = {
0xF9B5: "event_queue_write_or_pending_cursor_candidate",
0xF9B9: "event_queue_base_or_current_slot_candidate",
0xF9C0: "serial_tx_busy_timer_candidate",
0xF9C6: "autonomous_report_period_timer_candidate",
0xF9C8: "autonomous_report_resend_countdown_candidate",
}
# Frames seen in captures, attached to the TX report model as
# "observed_capture_overlay_candidates". Each listed frame is six hex bytes
# whose last byte equals 0x5A XOR-ed with the first five, matching the
# checksum formula used by the model.
OBSERVED_TX_REPORT_OVERLAY = [
    {
        "logical_index": 0x0000,
        "name_candidate": "heartbeat_or_idle_report_candidate",
        "observed_frames_hex": ["00 00 00 00 80 DA"],
        "observed_period_ms_candidate": 700,
    },
    {
        "logical_index": 0x0015,
        "name_candidate": "call_button_report_candidate",
        "observed_frames_hex": ["00 00 15 80 00 CF", "00 00 15 00 00 4F"],
    },
    {
        "logical_index": 0x0007,
        "name_candidate": "camera_power_report_candidate",
        "observed_frames_hex": ["00 00 07 80 00 DD"],
    },
]
def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"""Infer conservative SCI1 frame/command semantics from decompiler JSON."""
@@ -100,6 +129,9 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"table_map_candidates": [],
"state_variable_candidates": [],
"retry_error_model": None,
"gate_queue_model": None,
"tx_report_model": None,
"periodic_resend_model": None,
"confidence": "low",
"confidence_score": 0.0,
"caveat": "No protocol semantics are emitted without both RX and TX serial reconstruction candidates.",
@@ -113,6 +145,9 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
logical_tables = _logical_table_map_candidates(ordered)
state_variables = _state_variable_candidates(ordered)
retry_error_model = _retry_error_model(ordered, responses)
gate_queue_model = _gate_queue_model(ordered, commands)
tx_report_model = _tx_report_model(ordered, responses)
periodic_resend_model = _periodic_resend_model(ordered, responses)
evidence = _top_level_evidence(ordered, dispatch, responses, rx_candidate, tx_candidate)
confidence_score = _confidence_score(frame_supported, dispatch, responses, commands)
@@ -164,6 +199,9 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"rx_fields": _rx_field_candidates(ordered, dispatch),
"response_builders": _response_builder_aliases(responses),
"retry_error_model": retry_error_model,
"gate_queue_model": gate_queue_model,
"tx_report_model": tx_report_model,
"periodic_resend_model": periodic_resend_model,
"evidence": evidence,
}
return {
@@ -181,6 +219,9 @@ def analyze_serial_semantics(payload: Mapping[str, Any]) -> JsonObject:
"table_map_candidates": protocol["table_map_candidates"],
"state_variable_candidates": protocol["state_variable_candidates"],
"retry_error_model": protocol["retry_error_model"],
"gate_queue_model": protocol["gate_queue_model"],
"tx_report_model": protocol["tx_report_model"],
"periodic_resend_model": protocol["periodic_resend_model"],
"confidence": protocol["confidence"],
"confidence_score": protocol["confidence_score"],
"caveat": protocol["caveat"],
@@ -1576,6 +1617,324 @@ def _retry_error_model(ordered: list[JsonObject], responses: list[JsonObject]) -
}
def _gate_queue_model(ordered: list[JsonObject], commands: list[JsonObject]) -> JsonObject | None:
    """Build the gate/queue state-machine candidate model.

    Args:
        ordered: instruction list handed to the address-range evidence helpers.
        commands: command candidates; entries whose ``"command_value"`` is
            0x05 or 0x06 feed the acknowledgement effect.

    Returns:
        ``None`` when no range evidence and no 0x05/0x06 command exists;
        otherwise a dict with three predicates, three session effects, a
        caveat, and the combined evidence addresses.
    """
    # Scan each address range once and reuse the result for both the combined
    # evidence list and the matching predicate/effect entry.
    main_gate_evidence = _addresses_in_ranges(
        ordered,
        [(MAIN_REPORT_GATE_ENTRY, MAIN_REPORT_GATE_CALL)],
        MAIN_REPORT_GATE_ENTRY,
        MAIN_REPORT_GATE_CALL,
    )
    session_gate_evidence = _addresses_in_ranges(
        ordered,
        [(SESSION_GATE_ENTRY, 0x4007)],
        SESSION_GATE_ENTRY,
        0x4007,
    )
    queue_evidence = _addresses_in_ranges(
        ordered,
        [(QUEUE_REPORT_ENTRY, AUTONOMOUS_TX_REPORT_CALL)],
        QUEUE_REPORT_ENTRY,
        AUTONOMOUS_TX_REPORT_CALL,
    )
    resend_gate_evidence = _addresses_in_ranges(
        ordered,
        [(RESEND_GATE_ENTRY, PERIODIC_RESEND_ENTRY)],
        RESEND_GATE_ENTRY,
        PERIODIC_RESEND_ENTRY,
    )
    evidence = _dedupe_ints(
        main_gate_evidence + session_gate_evidence + queue_evidence + resend_gate_evidence
    )

    # Commands 0x05/0x06 are the acknowledgement candidates; filter them once.
    ack_commands = [
        command for command in commands if command.get("command_value") in {0x05, 0x06}
    ]
    command_ack_values = [int(command["command_value"]) for command in ack_commands]

    if not evidence and not command_ack_values:
        return None

    ack_evidence = _dedupe_ints(
        addr
        for command in ack_commands
        for addr in command.get("evidence_addresses", [])
        if isinstance(addr, int)
    )

    predicates = [
        {
            "name": "main_loop_may_enter_report_builder",
            "entry_label": "loc_3FD3",
            "target_label": "loc_BAF2",
            "condition_candidate": (
                "FAA2 == 0 && F9C0 == 0 && ((FAA5.bit7 == 0) || (F9C3 == 0))"
            ),
            "summary": "Main-loop report gate; session must be idle, TX busy timer clear, and RX gate open.",
            "state_addresses_hex": [_h16(0xFAA2), _h16(0xFAA5), _h16(0xF9C3), _h16(0xF9C0)],
            "evidence_addresses": main_gate_evidence,
        },
        {
            "name": "queue_has_pending_report",
            "entry_label": "loc_BAF2",
            "condition_candidate": "F9B5 != F9B0",
            "summary": "Queue/pending cursor gate; non-empty state stages through BB43 before loc_BA26.",
            "state_addresses_hex": [_h16(0xF9B5), _h16(0xF9B0)],
            "staging_path": ["loc_BAF2", "loc_BB43", "loc_BA26"],
            "evidence_addresses": queue_evidence,
        },
        {
            "name": "periodic_resend_may_fire",
            "entry_label": "loc_BE9E",
            "target_label": "loc_BED5",
            "condition_candidate": (
                "(FAA5 & FAA3 & 0x80) != 0 && F9C6 == 0 && F9C8 != 0 after countdown"
            ),
            "summary": "Resend gate masks pending state with FAA5, checks F9C6/F9C8, then calls BA26 at BED5.",
            "state_addresses_hex": [_h16(0xFAA5), _h16(0xFAA3), _h16(0xF9C6), _h16(0xF9C8)],
            "evidence_addresses": resend_gate_evidence,
        },
    ]

    session_effects = [
        {
            "name": "rx_completion_sets_session_timer",
            "summary": "RX completion sets F9C5 (observed reload H'14) after the sixth byte is captured.",
            "state_addresses_hex": [_h16(0xF9C5)],
            "evidence_addresses": _state_immediate_evidence(ordered, 0xF9C5, 0x14),
        },
        {
            "name": "session_timeout_clears_gate_and_queue",
            "entry_label": "loc_3FEF",
            "summary": "When F9C5 is clear, loc_3FEF clears F9B5/F9B0 and clears FAA5.bit7; when nonzero, it sets FAA5.bit7.",
            "state_addresses_hex": [_h16(0xF9C5), _h16(0xF9B5), _h16(0xF9B0), _h16(0xFAA5)],
            "evidence_addresses": session_gate_evidence,
        },
        {
            "name": "host_ack_can_advance_queue",
            "summary": "Commands 0x05/0x06 are modeled as acknowledgement paths that can clear pending state or advance F9B5.",
            "command_values_hex": [_h16(value, width=2) for value in command_ack_values],
            "state_addresses_hex": [_h16(0xF9B5)],
            "evidence_addresses": ack_evidence,
        },
    ]

    return {
        "kind": "serial_gate_queue_state_machine_candidate",
        "summary": (
            "Conservative model for autonomous report gating, queue cursor comparison, "
            "periodic resend, and RX/session side effects."
        ),
        "predicates": predicates,
        "session_effects": session_effects,
        "caveat": (
            "Many panel controls may require host/session traffic before reporting. Observed "
            "autonomous call/camera-power indexes are runtime/capture overlays, not ROM constants."
        ),
        "confidence": "candidate-medium",
        "evidence_addresses": evidence,
        "evidence_addresses_hex": _hlist(evidence),
    }
def _tx_report_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None:
    """Build the BB43 -> BA26 autonomous TX report model candidate.

    Responses are selected by ``call_address == AUTONOMOUS_TX_REPORT_CALL``;
    if none match, responses that read the current-value table but not the
    RX frame are used instead.

    Args:
        ordered: accepted for signature parity with the sibling model
            builders; not consulted here.
        responses: response candidates from the serial reconstruction.

    Returns:
        ``None`` when no response qualifies, otherwise the model dict. The
        observed-capture overlay in the result is a deep copy, so callers
        may modify the model without altering ``OBSERVED_TX_REPORT_OVERLAY``.
    """
    import copy  # local import: only needed to detach the overlay constant

    report_responses = [
        response for response in responses
        if response.get("call_address") == AUTONOMOUS_TX_REPORT_CALL
    ]
    if not report_responses:
        # Fallback: table-fed responses that do not echo RX frame bytes.
        report_responses = [
            response for response in responses
            if _response_reads_current_value_table(response)
            and not _response_reads_rx_frame(response)
        ]
    if not report_responses:
        return None

    response_ids = [
        str(response["id"])
        for response in report_responses
        if isinstance(response.get("id"), str)
    ]
    evidence = _dedupe_ints(
        addr
        for response in report_responses
        for addr in response.get("evidence_addresses", [])
        if isinstance(addr, int)
    )

    # Bytes 0..2 share one description; only the offset differs.
    byte_roles: list[JsonObject] = [
        {
            "offset": offset,
            "field_candidate": f"encoded_logical_index_or_report_id_byte{offset}",
            "source_candidate": "computed from candidate logical index/report id",
        }
        for offset in range(3)
    ]
    byte_roles.extend(
        [
            {
                "offset": 3,
                "field_candidate": "current_value_hi",
                "source_candidate": "current_value_table_candidate high byte",
                "table_candidate": "current_value_table_candidate",
            },
            {
                "offset": 4,
                "field_candidate": "current_value_lo",
                "source_candidate": "current_value_table_candidate low byte",
                "table_candidate": "current_value_table_candidate",
            },
            {
                "offset": 5,
                "field_candidate": "checksum",
                "source_candidate": "0x5A XOR TX[0..4]",
            },
        ]
    )

    return {
        "kind": "bb43_to_ba26_tx_report_model_candidate",
        "direction": "device_to_host_autonomous_report_candidate",
        "entry_label": AUTONOMOUS_TX_REPORT_LABEL,
        "entry_address": AUTONOMOUS_TX_REPORT_CALL,
        "entry_address_hex": _h16(AUTONOMOUS_TX_REPORT_CALL),
        "send_builder": SEND_BUILDER_LABEL,
        "send_builder_address": SEND_BUILDER_ADDRESS,
        "send_builder_address_hex": _h16(SEND_BUILDER_ADDRESS),
        "response_candidates": _dedupe_strings(response_ids),
        "summary": (
            "TX report bytes 0..2 are computed encoded logical index/report id bytes, "
            "bytes 3..4 come from current_value_table_candidate, and byte5 is the "
            "0x5A XOR checksum."
        ),
        "byte_roles": byte_roles,
        "value_source_candidate": "current_value_table_candidate",
        "checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4",
        # Deep copy so the module-level constant is never exposed for mutation.
        "observed_capture_overlay_candidates": copy.deepcopy(OBSERVED_TX_REPORT_OVERLAY),
        "observed_autonomous_output_caveat": (
            "Real captures supplied so far show only heartbeat/idle, call, and camera-power "
            "autonomous TX frames. Other panel controls may require a host/device request or "
            "state transition before the firmware reports them."
        ),
        "confidence": "candidate-medium",
        "caveat": (
            "This is a TX/report model for the BB43 -> BA26 path, separate from RX command "
            "dispatch. Observed report names are a capture overlay candidate only, not hard-coded "
            "source truth."
        ),
        "evidence_addresses": evidence,
        "evidence_addresses_hex": _hlist(evidence),
    }
def _periodic_resend_model(ordered: list[JsonObject], responses: list[JsonObject]) -> JsonObject | None:
    """Build the autonomous periodic resend model candidate.

    Evidence is gathered from immediate loads of 0x01F4 into F9C6, 0x14
    into F9C8, 0x80 into FAA3 (plus bit-7 operations on FAA3), and from
    instructions between ``PERIODIC_RESEND_ENTRY`` and ``SERIAL_HANDLER_END``.

    Args:
        ordered: instruction list to scan.
        responses: unused; kept so the signature matches sibling builders.

    Returns:
        ``None`` when nothing at all was found, otherwise the model dict.
        ``confidence`` is ``"candidate-low"`` when only the bare resend
        window, without timer/mask/send evidence, was found.
    """
    del responses

    period_evidence = _state_immediate_evidence(ordered, 0xF9C6, 0x01F4)
    countdown_evidence = _state_immediate_evidence(ordered, 0xF9C8, 0x14)
    pending_evidence = _dedupe_ints(
        _state_immediate_evidence(ordered, 0xFAA3, 0x80) + _state_bit_evidence(ordered, 0xFAA3, 7)
    )

    # One pass over the resend window collects both every instruction in it
    # and the subset that calls the send builder or touches TX staging.
    resend_evidence: list[int] = []
    resend_send_evidence: list[int] = []
    for ins in ordered:
        address = int(ins.get("address", -1))
        if not PERIODIC_RESEND_ENTRY <= address <= SERIAL_HANDLER_END:
            continue
        resend_evidence.append(address)
        if _is_send_builder_call(ins) or _has_ref_in_range(ins, TX_STAGING_START, TX_FRAME_END):
            resend_send_evidence.append(address)

    evidence = _dedupe_ints(period_evidence + countdown_evidence + pending_evidence + resend_send_evidence)
    if not evidence and not resend_evidence:
        return None

    # Prefer send-related addresses; fall back to the whole window. The int
    # and hex lists are both derived from this one deduplicated list.
    resend_path_evidence = _dedupe_ints(resend_send_evidence or resend_evidence)

    return {
        "kind": "autonomous_periodic_resend_model_candidate",
        "period_timer": {
            "address": 0xF9C6,
            "address_hex": _h16(0xF9C6),
            "reload_value_candidate": 0x01F4,
            "reload_value_hex": _h16(0x01F4),
            "summary": "Candidate periodic report/heartbeat timer reload.",
            "evidence_addresses": period_evidence,
            "evidence_addresses_hex": _hlist(period_evidence),
        },
        "resend_countdown": {
            "address": 0xF9C8,
            "address_hex": _h16(0xF9C8),
            "reload_value_candidate": 0x14,
            "reload_value_hex": _h16(0x14, width=2),
            "summary": "Candidate periodic resend countdown/retry spacing value.",
            "evidence_addresses": countdown_evidence,
            "evidence_addresses_hex": _hlist(countdown_evidence),
        },
        "pending_mask": {
            "address": 0xFAA3,
            "address_hex": _h16(0xFAA3),
            "mask_candidate": 0x80,
            "mask_hex": _h16(0x80, width=2),
            "summary": "Candidate bit/mask that marks an autonomous report pending.",
            "evidence_addresses": pending_evidence,
            "evidence_addresses_hex": _hlist(pending_evidence),
        },
        "resend_path": {
            "entry_label": "loc_BED5",
            "entry_address": PERIODIC_RESEND_ENTRY,
            "entry_address_hex": _h16(PERIODIC_RESEND_ENTRY),
            "summary": "Candidate periodic resend path feeding the TX staging/send-builder flow.",
            "evidence_addresses": resend_path_evidence,
            "evidence_addresses_hex": _hlist(resend_path_evidence),
        },
        "evidence_addresses": evidence,
        "evidence_addresses_hex": _hlist(evidence),
        "confidence": "candidate-medium" if evidence else "candidate-low",
        "caveat": (
            "Timer and resend roles are inferred from constants/state references around F9C6, "
            "F9C8, FAA3, and loc_BED5; exact scheduling units remain candidate phrasing."
        ),
    }
def _response_reads_current_value_table(response: Mapping[str, Any]) -> bool:
schema = response.get("schema")
if not isinstance(schema, Mapping):
return False
return any(
isinstance(item, Mapping)
and isinstance(item.get("source"), Mapping)
and item["source"].get("kind") == "table"
and item["source"].get("name_candidate") == "current_value_table_candidate"
for item in schema.get("bytes", [])
)
def _response_reads_rx_frame(response: Mapping[str, Any]) -> bool:
schema = response.get("schema")
if not isinstance(schema, Mapping):
return False
return any(
isinstance(item, Mapping)
and isinstance(item.get("source"), Mapping)
and item["source"].get("kind") == "rx_frame_byte"
for item in schema.get("bytes", [])
)
def _state_immediate_evidence(ordered: list[JsonObject], state_address: int, value: int) -> list[int]:
    """Return deduplicated addresses of instructions that pair *state_address* with immediate *value*.

    An instruction qualifies when it references *state_address* and the
    source operand parses to an immediate equal to *value*.
    """

    def source_is_value(ins: JsonObject) -> bool:
        source, _ = _source_destination_operands(str(ins.get("operands", "")))
        return _parse_immediate(source) == value

    hits = [
        int(ins["address"])
        for ins in ordered
        if _has_ref_in_range(ins, state_address, state_address) and source_is_value(ins)
    ]
    return _dedupe_ints(hits)
def _state_bit_evidence(ordered: list[JsonObject], state_address: int, bit: int) -> list[int]:
    """Return deduplicated addresses of instructions that operate on *bit* of *state_address*.

    An instruction qualifies when it references *state_address* and
    ``_bit_number_from_instruction`` reports *bit* for it.
    """
    hits = []
    for ins in ordered:
        if not _has_ref_in_range(ins, state_address, state_address):
            continue
        if _bit_number_from_instruction(ins) == bit:
            hits.append(int(ins["address"]))
    return _dedupe_ints(hits)
def _send_builder_candidate(
ordered: list[JsonObject],
responses: list[JsonObject],
@@ -1687,6 +2046,28 @@ def _top_level_evidence(
"response_count": len(responses),
}
)
tx_report_responses = [
response for response in responses
if response.get("call_address") == AUTONOMOUS_TX_REPORT_CALL
]
if tx_report_responses:
addresses = _dedupe_ints(
addr
for response in tx_report_responses
for addr in response.get("evidence_addresses", [])
if isinstance(addr, int)
)
evidence.append(
{
"kind": "bb43_autonomous_tx_report_path",
"summary": (
"BB43 stages a candidate device-to-host report before loc_BA26; this is "
"separate from RX command dispatch."
),
"addresses": addresses,
"addresses_hex": _hlist(addresses),
}
)
rx_payload_reads = [
int(ins["address"])
for ins in ordered

830
h8536/table_xrefs.py Normal file
View File

@@ -0,0 +1,830 @@
from __future__ import annotations
import argparse
import json
import re
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any
from .formatting import h16, label_for
from .serial_semantics import DIRECT_TABLE_TO_LOGICAL_OFFSET, LOGICAL_TABLES
# Loose alias for the JSON-shaped dictionaries passed between the helpers below.
JsonObject = dict[str, Any]
# Candidate data tables that the cross-reference pass looks for.
#
# Each entry describes one table by:
#   * logical_base_address / logical_range_end: inclusive logical address window;
#   * negative_offset: the displacement matched in "@(-H'xxxx, Rn)" operands
#     (for every entry it equals 0x10000 - logical_base_address);
#   * direct_addresses / direct_range_end: a second inclusive address window
#     starting at the smallest direct address;
#   * element_candidate: a tentative description of the element type.
# All names carry a "_candidate" suffix: they are hypotheses, not confirmed facts.
TABLES: tuple[JsonObject, ...] = (
{
"name": "primary_value_table_candidate",
"logical_base_address": 0xE000,
"logical_range_end": 0xE3FF,
"negative_offset": 0x2000,
"element_candidate": "word_value",
"direct_addresses": [0xF900],
"direct_range_end": 0xF91F,
},
{
"name": "secondary_value_table_candidate",
"logical_base_address": 0xE400,
"logical_range_end": 0xE7FF,
"negative_offset": 0x1C00,
"element_candidate": "word_value",
"direct_addresses": [0xF940],
"direct_range_end": 0xF95F,
},
{
"name": "current_value_table_candidate",
"logical_base_address": 0xE800,
"logical_range_end": 0xEBFF,
"negative_offset": 0x1800,
"element_candidate": "word_value",
"direct_addresses": [0xF920],
"direct_range_end": 0xF93F,
},
{
"name": "flag_table_candidate",
"logical_base_address": 0xEC00,
"logical_range_end": 0xEFFF,
"negative_offset": 0x1400,
"element_candidate": "bit_flags",
"direct_addresses": [0xF980],
"direct_range_end": 0xF99F,
},
)
# Lookup from the negative displacement used in indexed operands to its TABLES entry.
_TABLE_BY_NEGATIVE_OFFSET = {int(item["negative_offset"]): item for item in TABLES}
# Lookup from each listed direct address to its TABLES entry.
# NOTE(review): no reader of this mapping is visible in this section; confirm it is still needed.
_TABLE_BY_DIRECT_ADDRESS = {
address: item
for item in TABLES
for address in item["direct_addresses"]
}
# Terms that _lcd_correlation_hints searches for, case-insensitively and as
# substrings, in the LCD text candidates of the decompiler payload.
# Shorter terms overlap longer ones (e.g. "CONNECT" also matches "CONNECT: OK"),
# so one string can be reported under several terms.
LCD_CORRELATION_TERMS = (
"CONNECT",
"CONNECT: OK",
"CONNECT: NOT ACT",
"NOT ACT",
"COMM LINK",
"COMPLETED",
)
def load_table_xref_input(path: Path) -> JsonObject:
    """Read decompiler JSON from *path* and return it as a dictionary.

    Raises:
        ValueError: if the top-level JSON value is not an object or has no
            ``"instructions"`` key.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(payload, dict) and "instructions" in payload:
        return payload
    raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
def analyze_table_xrefs(payload: Mapping[str, Any]) -> JsonObject:
    """Build the structured table cross-reference analysis for a decompiler payload.

    Every instruction is scanned for negative-indexed operands and for direct
    references that fall inside one of the ``TABLES`` windows. The result has
    one record per table (counts, static offsets, per-function summary and the
    individual accesses sorted by instruction address), an overall summary,
    the LCD correlation hints and a caveat string.
    """
    instructions = _instruction_sequence(payload.get("instructions"))
    functions = _function_ranges(payload)
    semantic_accesses = _semantic_access_locations(payload)

    grouped: dict[str, list[JsonObject]] = {str(entry["name"]): [] for entry in TABLES}
    for position, ins in enumerate(instructions):
        # Negative-indexed accesses are recorded ahead of direct-address accesses.
        found = [
            *_logical_operand_accesses(instructions, position, functions, semantic_accesses),
            *_direct_address_accesses(ins, functions, semantic_accesses),
        ]
        for access in found:
            grouped.setdefault(str(access["table"]), []).append(access)

    tables: list[JsonObject] = []
    for entry in TABLES:
        name = str(entry["name"])
        accesses = sorted(grouped.get(name, []), key=lambda item: int(item["instruction_address"]))
        kinds = [access["access"] for access in accesses]
        dynamic_accesses = [access for access in accesses if access.get("index") == "dynamic"]
        static_offsets = sorted(
            {int(access["offset"]) for access in accesses if isinstance(access.get("offset"), int)}
        )
        direct_addresses = entry["direct_addresses"]
        record: JsonObject = {
            "name": name,
            "logical_base_address": entry["logical_base_address"],
            "logical_base_address_hex": h16(int(entry["logical_base_address"])),
            "logical_range_end": entry["logical_range_end"],
            "logical_range_end_hex": h16(int(entry["logical_range_end"])),
            "negative_offset": entry["negative_offset"],
            "negative_offset_hex": h16(int(entry["negative_offset"])),
            "element_candidate": entry["element_candidate"],
            "direct_addresses": direct_addresses,
            "direct_addresses_hex": [h16(int(address)) for address in direct_addresses],
            "direct_range_end": entry["direct_range_end"],
            "direct_range_end_hex": h16(int(entry["direct_range_end"])),
            "access_count": len(accesses),
            "read_count": kinds.count("read"),
            "write_count": kinds.count("write"),
            "read_write_candidate_count": kinds.count("read_write_candidate"),
            "dynamic_index_count": len(dynamic_accesses),
            "static_offsets": static_offsets,
            "static_offsets_hex": [h16(offset) for offset in static_offsets],
            "functions": _summarize_functions(accesses),
            "accesses": accesses,
        }
        tables.append(record)

    summary = {
        "table_count": len(tables),
        "access_count": sum(int(record["access_count"]) for record in tables),
        "dynamic_index_count": sum(int(record["dynamic_index_count"]) for record in tables),
        "source_instruction_count": len(instructions),
    }
    caveat = (
        "Static offsets are emitted only when an index register value can be derived from "
        "nearby immediate loads in the current JSON. Other indexed accesses are dynamic."
    )
    return {
        "kind": "table_xrefs",
        "tables": tables,
        "summary": summary,
        "lcd_correlation": _lcd_correlation_hints(payload),
        "caveat": caveat,
    }
def generate_table_xref_report(payload: Mapping[str, Any], *, source_name: str = "") -> str:
    """Render the table cross-reference analysis of *payload* as a text report.

    The report has an underlined title (optionally naming *source_name*), the
    analysis caveat, the LCD correlation hints and one section per table. At
    most 80 accesses are listed per table; the remainder is summarised in a
    single "omitted" line. The returned text ends with exactly one newline.
    """
    analysis = analyze_table_xrefs(payload)
    suffix = f" for {source_name}" if source_name else ""
    title = f"Table/Index Cross-Reference Report{suffix}"
    lines: list[str] = [title, "=" * len(title), "", str(analysis["caveat"]), ""]
    lines += _format_lcd_correlation_lines(analysis.get("lcd_correlation"))
    # Keep exactly one blank separator before the table sections.
    if lines[-1] != "":
        lines.append("")

    for table in analysis["tables"]:
        name = str(table["name"])
        direct = ", ".join(str(item) for item in table["direct_addresses_hex"])
        lines.append(
            f"{name} {table['logical_base_address_hex']}-{table['logical_range_end_hex']} "
            f"(negative {table['negative_offset_hex']}; direct {direct}-{table['direct_range_end_hex']})"
        )
        lines.append(
            f" accesses={table['access_count']} reads={table['read_count']} "
            f"writes={table['write_count']} dynamic={table['dynamic_index_count']}"
        )

        offsets = table.get("static_offsets_hex") or []
        if offsets:
            lines.append(f" static offsets: {', '.join(str(item) for item in offsets[:16])}")

        function_summaries = table.get("functions") or []
        if function_summaries:
            joined = ", ".join(
                f"{item['label']}:{item['access_count']}" for item in function_summaries[:12]
            )
            lines.append(f" functions: {joined}")

        accesses = table.get("accesses")
        if not (isinstance(accesses, list) and accesses):
            lines.append(" no references found in current JSON")
        else:
            lines.extend(f" - {_format_access_line(access)}" for access in accesses[:80])
            if len(accesses) > 80:
                lines.append(f" - ... {len(accesses) - 80} more accesses omitted")
        lines.append("")

    return "\n".join(lines).rstrip() + "\n"
def _format_lcd_correlation_lines(value: Any) -> list[str]:
if not isinstance(value, Mapping):
return []
lines = ["LCD correlation hints"]
for hit in value.get("term_hits", []):
if not isinstance(hit, Mapping):
continue
term = hit.get("term")
count = int(hit.get("hit_count", 0))
if count:
samples = ", ".join(
f"{item['address_hex']} {item['trimmed']!r}"
for item in hit.get("hits", [])[:4]
if isinstance(item, Mapping)
)
lines.append(f" term {term!r}: {count} candidate hit(s): {samples}")
else:
lines.append(f" term {term!r}: no LCD/text candidate hits in current decompile")
builders = value.get("display_builder_targets", [])
if isinstance(builders, list) and builders:
parts = [
f"{item['target_hex']}:{item['xref_count']}"
for item in builders[:8]
if isinstance(item, Mapping)
]
lines.append(f" display builder xrefs: {', '.join(parts)}")
routines = value.get("lcd_driver_routines", [])
if isinstance(routines, list) and routines:
parts = [
f"{item['start_hex']} {item['role_hint']}"
for item in routines[:4]
if isinstance(item, Mapping)
]
lines.append(f" LCD driver routines: {', '.join(parts)}")
lines.append(
" caveat: LCD strings can be builder/script output; absence of a literal term does not disprove runtime composition."
)
return lines
def write_table_xrefs(input_path: Path, output_path: Path, *, as_json: bool = False) -> None:
    """Load decompiler JSON from *input_path* and write the cross-reference output.

    The parent directory of *output_path* is created if needed. With
    ``as_json`` the structured analysis (plus a ``"source"`` key holding the
    input path) is written as indented JSON; otherwise the text report is
    written. Output is UTF-8 in both cases.
    """
    payload = load_table_xref_input(input_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    if as_json:
        analysis = analyze_table_xrefs(payload)
        analysis["source"] = str(input_path)
        rendered = json.dumps(analysis, indent=2)
    else:
        rendered = generate_table_xref_report(payload, source_name=str(input_path))
    output_path.write_text(rendered, encoding="utf-8")
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse *argv*, write the report, return exit code 0.

    When *argv* is None, argparse reads the process arguments.
    """
    parser = argparse.ArgumentParser(
        description="Generate table/index cross-references for candidate serial protocol data tables.",
    )
    parser.add_argument(
        "input",
        nargs="?",
        type=Path,
        default=Path("build/rom_decompiled.json"),
        help="structured JSON emitted by h8536_decompiler.py",
    )
    parser.add_argument(
        "--out",
        type=Path,
        default=Path("build/rom_table_xrefs.txt"),
        help="table cross-reference report output path",
    )
    parser.add_argument("--json", action="store_true", help="write structured JSON instead of text")

    options = parser.parse_args(argv)
    destination = options.out
    write_table_xrefs(options.input, destination, as_json=options.json)
    print(f"wrote {destination}")
    return 0
def _logical_operand_accesses(
    instructions: list[JsonObject],
    index: int,
    functions: list[JsonObject],
    semantic_accesses: Mapping[int, list[JsonObject]],
) -> list[JsonObject]:
    """Describe negative-indexed table operands of ``instructions[index]``.

    One access record is produced per operand whose displacement maps to a
    table in ``_TABLE_BY_NEGATIVE_OFFSET``. When the index register value can
    be derived from nearby instructions, the record carries that offset and the
    resulting logical address (wrapped to 16 bits); otherwise both ``"index"``
    and ``"offset"`` are the string ``"dynamic"``.
    """
    ins = instructions[index]
    operand_text = str(ins.get("operands", ""))
    records: list[JsonObject] = []
    for operand in _negative_indexed_operands(operand_text):
        table = _TABLE_BY_NEGATIVE_OFFSET.get(int(operand["negative_offset"]))
        if table is None:
            continue
        register = str(operand["index_register"])
        known = _nearby_register_value(instructions, index, register)
        offset: int | str = "dynamic" if known is None else known
        base = int(table["logical_base_address"])

        record = _base_access(ins, functions, semantic_accesses)
        record["table"] = table["name"]
        record["table_base_address"] = table["logical_base_address"]
        record["table_base_address_hex"] = h16(base)
        record["kind"] = "logical_negative_indexed_access"
        record["operand"] = operand["operand"]
        record["negative_offset"] = operand["negative_offset"]
        record["negative_offset_hex"] = h16(int(operand["negative_offset"]))
        record["index_register"] = register
        record["index"] = offset
        record["offset"] = offset
        record["access"] = _operand_access_kind(ins, str(operand["operand"]))
        if isinstance(offset, int):
            # A statically known index pins down the logical address too.
            logical_address = (base + offset) & 0xFFFF
            record["logical_address"] = logical_address
            record["logical_address_hex"] = h16(logical_address)
        records.append(record)
    return records
def _direct_address_accesses(
    ins: Mapping[str, Any],
    functions: list[JsonObject],
    semantic_accesses: Mapping[int, list[JsonObject]],
) -> list[JsonObject]:
    """Describe references of *ins* that land inside a table address window.

    Each referenced address is first matched against the logical table ranges;
    only if none matches is it tried against the direct candidate ranges.
    Addresses outside both produce no record.
    """
    records: list[JsonObject] = []
    for address in _references(ins):
        table = _table_for_logical_address(address)
        if table is not None:
            builder = _direct_logical_address_access
        else:
            table = _table_for_direct_candidate_address(address)
            if table is None:
                continue
            builder = _direct_candidate_address_access
        records.append(builder(ins, table, address, functions, semantic_accesses))
    return records
def _direct_logical_address_access(
    ins: Mapping[str, Any],
    table: Mapping[str, Any],
    address: int,
    functions: list[JsonObject],
    semantic_accesses: Mapping[int, list[JsonObject]],
) -> JsonObject:
    """Build the access record for a reference inside a table's logical range.

    The offset is *address* minus the table's logical base. The access
    direction falls back to ``"read_write_candidate"`` when it cannot be
    determined from the instruction.
    """
    base = int(table["logical_base_address"])
    offset = address - base
    address_hex = h16(address)
    direction = _access_direction(ins, address) or "read_write_candidate"

    record = _base_access(ins, functions, semantic_accesses)
    record["table"] = table["name"]
    record["table_base_address"] = base
    record["table_base_address_hex"] = h16(base)
    record["kind"] = "direct_logical_address_access"
    record["direct_address"] = address
    record["direct_address_hex"] = address_hex
    record["logical_address"] = address
    record["logical_address_hex"] = address_hex
    record["index"] = offset
    record["offset"] = offset
    record["offset_hex"] = h16(offset)
    record["access"] = direction
    return record
def _direct_candidate_address_access(
    ins: Mapping[str, Any],
    table: Mapping[str, Any],
    address: int,
    functions: list[JsonObject],
    semantic_accesses: Mapping[int, list[JsonObject]],
) -> JsonObject:
    """Build the access record for a reference inside a table's direct address window.

    The offset is measured from the smallest entry of the table's
    ``"direct_addresses"``. When ``DIRECT_TABLE_TO_LOGICAL_OFFSET`` has an entry
    for that base, it is attached as ``"semantic_negative_offset"``.
    """
    direct_base = min(int(item) for item in table["direct_addresses"])
    offset = address - direct_base
    record = _base_access(ins, functions, semantic_accesses)
    logical_offset = DIRECT_TABLE_TO_LOGICAL_OFFSET.get(direct_base)

    record["table"] = table["name"]
    record["table_base_address"] = table["logical_base_address"]
    record["table_base_address_hex"] = h16(int(table["logical_base_address"]))
    record["kind"] = "direct_candidate_address_access"
    record["direct_address"] = address
    record["direct_address_hex"] = h16(address)
    record["direct_base_address"] = direct_base
    record["direct_base_address_hex"] = h16(direct_base)
    record["index"] = offset
    record["offset"] = offset
    record["offset_hex"] = h16(offset)
    record["access"] = _access_direction(ins, address) or "read_write_candidate"
    if logical_offset is None:
        return record
    record["semantic_negative_offset"] = logical_offset
    record["semantic_negative_offset_hex"] = h16(logical_offset)
    return record
def _lcd_correlation_hints(payload: Mapping[str, Any]) -> JsonObject:
    """Gather static LCD-related hints from the decompiler payload.

    Three things are collected:
      * for each entry of ``LCD_CORRELATION_TERMS``, the LCD strings whose
        ``text``/``trimmed`` fields contain it (case-insensitive), with at most
        24 hits stored per term;
      * call targets recorded as ``following_bsr`` on string xrefs, counted per
        target with up to 8 examples each, sorted by descending count;
      * the routines listed under ``lcd_driver`` that have an integer start.
    """
    lcd_text = payload.get("lcd_text")
    if isinstance(lcd_text, Mapping) and isinstance(lcd_text.get("strings"), list):
        strings = [entry for entry in lcd_text["strings"] if isinstance(entry, Mapping)]
    else:
        strings = []

    # Upper-cased search text per string, computed once and reused for every term.
    haystacks = [
        (f"{entry.get('text', '')} {entry.get('trimmed', '')}".upper(), entry)
        for entry in strings
    ]
    term_hits = []
    for term in LCD_CORRELATION_TERMS:
        needle = term.upper()
        hits = [_lcd_string_summary(entry) for haystack, entry in haystacks if needle in haystack]
        term_hits.append(
            {
                "term": term,
                "hit_count": len(hits),
                "hits": hits[:24],
                "status": "candidate_hits" if hits else "not_found",
            }
        )

    builder_targets: dict[int, JsonObject] = {}
    for entry in strings:
        text_address = entry.get("address")
        for xref in entry.get("xrefs", []):
            if not isinstance(xref, Mapping):
                continue
            following = xref.get("following_bsr")
            if not (isinstance(following, Mapping) and isinstance(following.get("target"), int)):
                continue
            target = int(following["target"])
            if target not in builder_targets:
                builder_targets[target] = {
                    "target": target,
                    "target_hex": h16(target),
                    "xref_count": 0,
                    "examples": [],
                }
            record = builder_targets[target]
            record["xref_count"] = int(record["xref_count"]) + 1
            examples = record["examples"]
            if isinstance(examples, list) and len(examples) < 8:
                xref_address = xref.get("address")
                examples.append(
                    {
                        "text_address": text_address,
                        "text_address_hex": h16(int(text_address)) if isinstance(text_address, int) else None,
                        "trimmed": entry.get("trimmed"),
                        "xref_address": xref_address,
                        "xref_address_hex": h16(int(xref_address)) if isinstance(xref_address, int) else None,
                    }
                )

    routines = []
    lcd_driver = payload.get("lcd_driver")
    if isinstance(lcd_driver, Mapping) and isinstance(lcd_driver.get("routines"), list):
        for routine in lcd_driver["routines"]:
            if not isinstance(routine, Mapping) or not isinstance(routine.get("start"), int):
                continue
            end = routine.get("end")
            routines.append(
                {
                    "start": routine["start"],
                    "start_hex": h16(int(routine["start"])),
                    "end": end,
                    "end_hex": h16(int(end)) if isinstance(end, int) else None,
                    "role_hint": routine.get("role_hint"),
                    "roles": routine.get("roles", []),
                }
            )

    ranked_builders = sorted(
        builder_targets.values(),
        key=lambda item: (-int(item["xref_count"]), int(item["target"])),
    )
    return {
        "terms": list(LCD_CORRELATION_TERMS),
        "term_hits": term_hits,
        "display_builder_targets": ranked_builders,
        "lcd_driver_routines": routines,
        "caveat": (
            "This is a static correlation helper. It reports text/script candidates and LCD driver "
            "routines in the same decompile; it does not prove a protocol field directly causes a string."
        ),
    }
def _lcd_string_summary(item: Mapping[str, Any]) -> JsonObject:
    """Return the subset of an LCD string record that is reported for term hits.

    ``address_hex`` is only filled when the address is an integer;
    ``xref_count`` defaults to 0 when absent.
    """
    address = item.get("address")
    address_hex = h16(int(address)) if isinstance(address, int) else None
    summary: JsonObject = {"address": address, "address_hex": address_hex}
    for key in ("text", "trimmed", "confidence"):
        summary[key] = item.get(key)
    summary["xref_count"] = item.get("xref_count", 0)
    return summary
def _base_access(
    ins: Mapping[str, Any],
    functions: list[JsonObject],
    semantic_accesses: Mapping[int, list[JsonObject]],
) -> JsonObject:
    """Build the fields shared by every access record for instruction *ins*.

    Includes the instruction address, mnemonic, operands, display text,
    references, targets, label and any semantic candidates registered for the
    address. Function fields are added only when the address falls inside a
    known function range.
    """
    address = int(ins["address"])
    references = _references(ins)
    targets = _targets(ins)
    record: JsonObject = {
        "instruction_address": address,
        "instruction_address_hex": h16(address),
        "mnemonic": str(ins.get("mnemonic", "")),
        "operands": str(ins.get("operands", "")),
        # Prefer the payload's own text; otherwise rebuild it from mnemonic and operands.
        "instruction": str(ins.get("text") or _instruction_text(ins)),
        "references": references,
        "references_hex": [h16(ref) for ref in references],
        "targets": targets,
        "targets_hex": [h16(target) for target in targets],
        "label": _label_for_instruction(ins),
        "semantic_candidates": semantic_accesses.get(address, []),
    }
    function = _function_for_address(functions, address)
    if function:
        start = function["start"]
        record["function_start"] = start
        record["function_start_hex"] = h16(int(start))
        record["function_label"] = function["label"]
    return record
def _semantic_access_locations(payload: Mapping[str, Any]) -> dict[int, list[JsonObject]]:
    """Index semantic table candidates by the instruction addresses that access them.

    Reads ``payload["serial_semantics"]``. Its ``protocol_semantics`` entries
    (when that is a list) are consulted first, followed by the semantics
    mapping itself. Each ``table_map_candidates`` item contributes its name,
    kind and confidence for every access that has an integer
    ``instruction_address``.
    """
    locations: dict[int, list[JsonObject]] = {}
    semantics = payload.get("serial_semantics")
    if not isinstance(semantics, Mapping):
        return locations

    protocols = semantics.get("protocol_semantics")
    sources: list[Any] = list(protocols) if isinstance(protocols, list) else []
    sources.append(semantics)

    for source in sources:
        if not isinstance(source, Mapping):
            continue
        for candidate in _table_candidate_items(source.get("table_map_candidates")):
            for access in _table_candidate_items(candidate.get("accesses")):
                address = access.get("instruction_address")
                if not isinstance(address, int):
                    continue
                bucket = locations.setdefault(address, [])
                bucket.append(
                    {
                        "name_candidate": candidate.get("name_candidate"),
                        "kind": candidate.get("kind"),
                        "confidence": candidate.get("confidence"),
                    }
                )
    return locations
def _table_candidate_items(value: Any) -> list[Mapping[str, Any]]:
if isinstance(value, Mapping):
return [item for item in value.values() if isinstance(item, Mapping)]
if isinstance(value, list):
return [item for item in value if isinstance(item, Mapping)]
return []
def _format_access_line(access: Mapping[str, Any]) -> str:
    """Format one access record as a single report line.

    Dynamic accesses name the index register and operand; static ones show the
    offset. A logical address, or failing that a direct address, is appended
    when present.
    """
    function = access.get("function_label") or "<no function>"
    operand = access.get("operand") or access.get("direct_address_hex")
    index = access.get("index")

    if index == "dynamic":
        index_text = f"index dynamic via {access.get('index_register')} operand {operand}"
    else:
        index_text = f"offset {h16(int(index or 0))}"

    logical_hex = access.get("logical_address_hex")
    direct_hex = access.get("direct_address_hex")
    if logical_hex:
        index_text += f" -> {access['logical_address_hex']}"
    elif direct_hex:
        index_text += f" at {access['direct_address_hex']}"

    head = f"{access['instruction_address_hex']} {access['access']} {index_text}; "
    tail = f"{function}; {access['instruction']}"
    return head + tail
def _summarize_functions(accesses: Iterable[Mapping[str, Any]]) -> list[JsonObject]:
summaries: dict[int, JsonObject] = {}
for access in accesses:
start = access.get("function_start")
if not isinstance(start, int):
start = -1
summary = summaries.setdefault(
start,
{
"start": start if start >= 0 else None,
"start_hex": h16(start) if start >= 0 else None,
"label": access.get("function_label") or "<no function>",
"access_count": 0,
"reads": 0,
"writes": 0,
},
)
summary["access_count"] = int(summary["access_count"]) + 1
if access.get("access") == "read":
summary["reads"] = int(summary["reads"]) + 1
elif access.get("access") == "write":
summary["writes"] = int(summary["writes"]) + 1
return sorted(summaries.values(), key=lambda item: (-int(item["access_count"]), str(item["label"])))
def _function_ranges(payload: Mapping[str, Any]) -> list[JsonObject]:
call_graph = payload.get("call_graph")
if not isinstance(call_graph, Mapping):
return []
nodes = call_graph.get("nodes")
if not isinstance(nodes, list):
return []
ranges: list[JsonObject] = []
for node in nodes:
if not isinstance(node, Mapping):
continue
start = node.get("start")
end = node.get("end")
if isinstance(start, int) and isinstance(end, int):
ranges.append({"start": start, "end": end, "label": str(node.get("label") or label_for(start))})
return sorted(ranges, key=lambda item: int(item["start"]))
def _function_for_address(functions: list[JsonObject], address: int) -> JsonObject | None:
for function in functions:
if int(function["start"]) <= address <= int(function["end"]):
return function
return None
def _nearby_register_value(instructions: list[JsonObject], index: int, register: str) -> int | None:
    """Try to recover a constant loaded into *register* shortly before ``instructions[index]``.

    Scans backwards over at most the nine preceding instructions and looks at
    the nearest one whose destination operand is *register*:

      * compare/test instructions (which only read their destination operand)
        are skipped, because they leave the register unchanged;
      * a MOV-family instruction with an immediate source yields that value;
      * any other instruction that targets the register makes the value
        unknown, so ``None`` is returned.

    Previously any instruction with an immediate source was treated as a load,
    so ``CMP:I #imm, Rn`` or ``ADD #imm, Rn`` produced a bogus static offset.
    When an instruction carries no mnemonic, the old permissive behaviour is
    kept so such payloads are not penalised.

    Control flow is not considered: branches or labels between the load and
    the use are ignored, so the result is a heuristic.
    """
    register = register.upper()
    # Same read-only set that _operand_access_kind classifies as "read".
    read_only_roots = {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "TST"}
    for prior_index in range(index - 1, max(-1, index - 10), -1):
        prior = instructions[prior_index]
        source, destination = _source_destination_operands(str(prior.get("operands", "")))
        if destination.upper() != register:
            continue
        root = _mnemonic_root(str(prior.get("mnemonic", "")))
        if root in read_only_roots:
            # The register is only inspected here; keep looking further back.
            continue
        if root and not root.startswith("MOV"):
            # Arithmetic/logic/bit operations modify the register in a way
            # that cannot be summarised by their immediate operand.
            return None
        # MOV-family (or unknown mnemonic): an immediate source is the loaded
        # value; any other source leaves the register value unknown.
        return _parse_immediate(source)
    return None
def _writes_register(ins: Mapping[str, Any], register: str) -> bool:
    """Return True when the destination operand of *ins*, upper-cased, equals *register*.

    *register* is compared as given, so callers are expected to pass it in
    upper case.
    """
    operand_text = str(ins.get("operands", ""))
    destination = _source_destination_operands(operand_text)[1]
    return destination.upper() == register
def _instruction_sequence(value: object) -> list[JsonObject]:
if isinstance(value, Mapping):
values: Iterable[Any] = value.values()
elif isinstance(value, list):
values = value
else:
values = []
return sorted(
[item for item in values if isinstance(item, dict) and isinstance(item.get("address"), int)],
key=lambda item: int(item["address"]),
)
def _label_for_instruction(ins: Mapping[str, Any]) -> str | None:
    """Pick a label to show for *ins*.

    Uses the instruction's own non-empty ``"label"`` or ``"target_label"``
    string (in that order). Otherwise, when the instruction has targets, the
    label generated from the instruction's own address is returned; with no
    targets the result is None.
    """
    address = int(ins["address"])
    explicit = next(
        (
            ins.get(key)
            for key in ("label", "target_label")
            if isinstance(ins.get(key), str) and ins.get(key)
        ),
        None,
    )
    if explicit is not None:
        return explicit
    return label_for(address) if _targets(ins) else None
def _instruction_text(ins: Mapping[str, Any]) -> str:
operands = str(ins.get("operands", ""))
return f"{ins.get('mnemonic', '')} {operands}".strip()
def _references(ins: Mapping[str, Any]) -> list[int]:
refs = ins.get("references", [])
if not isinstance(refs, list):
return []
output: list[int] = []
for ref in refs:
if isinstance(ref, Mapping) and isinstance(ref.get("address"), int):
output.append(int(ref["address"]))
elif isinstance(ref, int):
output.append(ref)
return output
def _targets(ins: Mapping[str, Any]) -> list[int]:
targets = ins.get("targets", [])
if not isinstance(targets, list):
return []
return [int(target) for target in targets if isinstance(target, int)]
def _negative_indexed_operands(operands: str) -> list[JsonObject]:
    """Find ``@(-H'xxxx, Rn)`` operands whose displacement is a known logical table.

    The hexadecimal displacement is masked to 16 bits and kept only when it is
    a member of ``LOGICAL_TABLES``. Each result carries the matched operand
    text, the displacement and the index register name.
    """
    pattern = r"@\(-H'([0-9A-Fa-f]+),\s*(R[0-7])\)"
    found: list[JsonObject] = []
    for hit in re.finditer(pattern, operands):
        digits, register = hit.groups()
        displacement = int(digits, 16) & 0xFFFF
        if displacement in LOGICAL_TABLES:
            found.append(
                {
                    "operand": hit.group(0),
                    "negative_offset": displacement,
                    "index_register": register.upper(),
                }
            )
    return found
def _table_for_logical_address(address: int) -> Mapping[str, Any] | None:
    """Return the first ``TABLES`` entry whose inclusive logical range contains *address*."""
    return next(
        (
            entry
            for entry in TABLES
            if int(entry["logical_base_address"]) <= address <= int(entry["logical_range_end"])
        ),
        None,
    )
def _table_for_direct_candidate_address(address: int) -> Mapping[str, Any] | None:
    """Return the first ``TABLES`` entry whose direct address window contains *address*.

    The window runs from the smallest ``direct_addresses`` entry to
    ``direct_range_end``, both inclusive.
    """
    for entry in TABLES:
        lowest = min(int(item) for item in entry["direct_addresses"])
        highest = int(entry["direct_range_end"])
        if lowest <= address <= highest:
            return entry
    return None
def _operand_access_kind(ins: Mapping[str, Any], operand: str) -> str:
    """Classify how *ins* uses *operand*: ``"read"``, ``"write"`` or ``"read_write_candidate"``.

    Checks run in this order: compare/test mnemonics read; single-operand
    modifying mnemonics write; then the operand's position (destination only
    means write, source only means read); then arithmetic/logic mnemonics
    write. Anything left is reported as a read/write candidate.
    """
    root = _mnemonic_root(str(ins.get("mnemonic", "")))
    source, destination = _source_destination_operands(str(ins.get("operands", "")))
    in_source = operand in source
    in_destination = operand in destination

    if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "TST"}:
        verdict = "read"
    elif root in {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}:
        verdict = "write"
    elif in_destination and not in_source:
        verdict = "write"
    elif in_source and not in_destination:
        verdict = "read"
    elif root in {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}:
        verdict = "write"
    else:
        verdict = "read_write_candidate"
    return verdict
def _access_direction(ins: Mapping[str, Any], address: int) -> str | None:
    """Decide whether *ins* reads or writes *address*, or return None when unclear.

    Compare/test mnemonics and MOVFPE read; bit, unary and arithmetic/logic
    mnemonics write. For MOV:G, MOV:S and MOVTPE the operand that mentions the
    address decides; failing that, a memory operand on one side is used when
    the other side mentions none of the instruction's references. MOV:L, MOV:F
    and LDC read; STC writes.
    """
    root = _mnemonic_root(str(ins.get("mnemonic", "")))

    if root in {"BTST", "CMP", "CMP:E", "CMP:G", "CMP:I", "MOVFPE", "TST"}:
        return "read"

    unary_writers = {"BCLR", "BNOT", "BSET", "CLR", "INC", "INC:G", "NEG", "NOT"}
    arithmetic_writers = {"ADD:Q", "ADD:G", "ADDS", "ADDX", "AND", "OR", "SUB", "SUBS", "SUBX", "XOR"}
    if root in unary_writers or root in arithmetic_writers:
        return "write"

    if root in {"MOV:G", "MOV:S", "MOVTPE"}:
        source, destination = _source_destination_operands(str(ins.get("operands", "")))
        if _operand_mentions_address(destination, address):
            return "write"
        if _operand_mentions_address(source, address):
            return "read"
        references = _references(ins)
        if address in references:
            # The address is not spelled out in either operand; infer the
            # direction from which side is the memory operand.
            if destination.startswith("@") and not _operand_mentions_any_reference(source, references):
                return "write"
            if source.startswith("@") and not _operand_mentions_any_reference(destination, references):
                return "read"
        return None

    fixed_direction = {"MOV:L": "read", "MOV:F": "read", "STC": "write", "LDC": "read"}
    return fixed_direction.get(root)
def _source_destination_operands(operands: str) -> tuple[str, str]:
depth = 0
split_at: int | None = None
for index, char in enumerate(operands):
if char in "({":
depth += 1
elif char in ")}" and depth:
depth -= 1
elif char == "," and depth == 0:
split_at = index
if split_at is None:
operand = operands.strip()
return "", operand
return operands[:split_at].strip(), operands[split_at + 1 :].strip()
def _parse_immediate(operand: str) -> int | None:
text = operand.strip()
if text.startswith("#"):
text = text[1:].strip()
try:
if text.upper().startswith("H'"):
return int(text[2:], 16) & 0xFFFF
if text.upper().startswith("0X"):
return int(text, 16) & 0xFFFF
if text.upper().startswith("$"):
return int(text[1:], 16) & 0xFFFF
return int(text, 10) & 0xFFFF
except ValueError:
return None
def _operand_mentions_any_reference(operand: str, references: list[int]) -> bool:
    """Return True when *operand* textually mentions at least one address in *references*."""
    for address in references:
        if _operand_mentions_address(operand, address):
            return True
    return False
def _operand_mentions_address(operand: str, address: int) -> bool:
operand_upper = operand.upper().replace(" ", "")
negative = (0x10000 - address) & 0xFFFF
return (
f"H'{address:04X}" in operand_upper
or f"0X{address:04X}" in operand_upper
or f"${address:04X}" in operand_upper
or f"-H'{negative:04X}" in operand_upper
or f"-0X{negative:04X}" in operand_upper
or f"-${negative:04X}" in operand_upper
)
def _mnemonic_root(mnemonic: str) -> str:
return mnemonic.rsplit(".", 1)[0].upper()
# Public API of this module; the underscore-prefixed helpers above are internal.
__all__ = [
"analyze_table_xrefs",
"generate_table_xref_report",
"load_table_xref_input",
"main",
"write_table_xrefs",
]