# Source: h8-536-decoder/h8536/protocol_trace.py
# Snapshot: 2026-05-25 17:32:00 +10:00 (505 lines, 18 KiB, Python)

from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Mapping, TextIO
# Seed value the XOR checksum starts from (see checksum_for).
CHECKSUM_SEED = 0x5A

# Number of bytes in one protocol frame.
FRAME_LENGTH = 6

# Accepted values for the ``direction`` argument / ``--direction`` option.
VALID_DIRECTIONS = {"rx", "tx", "auto"}

# TX reports seen in captures, keyed by ``(index, value)``.  The names are
# candidates derived from observation, not confirmed protocol facts.
OBSERVED_TX_REPORT_CANDIDATES = {
    (0x0000, 0x0080): {
        "name_candidate": "heartbeat_alive_candidate",
    },
    (0x0007, 0x8000): {
        "name_candidate": "cam_power_button_candidate",
        "state_candidate": "active",
    },
    (0x0015, 0x8000): {
        "name_candidate": "call_button_candidate",
        "state_candidate": "active",
    },
    (0x0015, 0x0000): {
        "name_candidate": "call_button_candidate",
        "state_candidate": "inactive",
    },
}
@dataclass(frozen=True)
class ByteEvent:
    """A single captured byte together with an optional direction hint.

    Instances are immutable (``frozen=True``), so they compare and hash by
    value.
    """

    # The byte value carried by this event.
    value: int
    # "rx" or "tx" when the capture text labelled the byte, otherwise None.
    direction_hint: str | None = None
def checksum_for(frame_prefix: Iterable[int]) -> int:
    """Return the XOR checksum of ``frame_prefix``, seeded with CHECKSUM_SEED.

    Only the low 8 bits of each input value take part, and the result is
    always in the range 0..255.
    """
    checksum = CHECKSUM_SEED
    for octet in frame_prefix:
        # XOR of the low bytes equals the low byte of the XOR.
        checksum = (checksum ^ octet) & 0xFF
    return checksum & 0xFF
def decode_trace(
    data: bytes | Iterable[int | ByteEvent],
    *,
    direction: str = "auto",
    semantics_path: str | Path | None = None,
) -> dict[str, Any]:
    """Split a byte capture into fixed-size frames and decode each of them.

    Args:
        data: Raw bytes, or an iterable of ints and/or ``ByteEvent`` objects.
        direction: ``"rx"`` or ``"tx"`` forces that direction for every frame;
            ``"auto"`` derives it per frame from the events' direction hints.
        semantics_path: Optional decompiler JSON path, passed on to
            ``load_semantics``.

    Returns:
        A report dict containing the decoded ``frames``, a description of the
        checksum model, a summary of the loaded semantics, and the bytes left
        over after the last complete frame.

    Raises:
        ValueError: If ``direction`` is not one of ``VALID_DIRECTIONS``.
    """
    if direction not in VALID_DIRECTIONS:
        raise ValueError(f"direction must be one of {sorted(VALID_DIRECTIONS)}")

    events = _byte_events(data)
    semantics = load_semantics(semantics_path)

    # Only whole frames are decoded; the remainder is reported as trailing bytes.
    frame_count = len(events) // FRAME_LENGTH
    consumed = frame_count * FRAME_LENGTH

    # Most recent checksum-valid frame per direction, used for annotations.
    last_valid: dict[str, dict[str, Any] | None] = {"rx": None, "tx": None}
    decoded_frames: list[dict[str, Any]] = []
    for number in range(frame_count):
        start = number * FRAME_LENGTH
        window = events[start : start + FRAME_LENGTH]
        frame_direction = _frame_direction(window, direction)
        decoded = _decode_frame(
            [event.value for event in window],
            frame_index=number,
            byte_offset=start,
            direction=frame_direction,
            semantics=semantics,
            previous_valid=last_valid,
        )
        decoded_frames.append(decoded)
        # Frames with an unknown direction (None) are never remembered.
        if frame_direction in last_valid and decoded["checksum"]["valid"]:
            last_valid[frame_direction] = decoded

    leftover = [event.value for event in events[consumed:]]
    semantics_file = semantics["path"]
    checksum_model = {
        "algorithm": "xor",
        "seed": CHECKSUM_SEED,
        "seed_hex": _h8(CHECKSUM_SEED),
        "covered_offsets": [0, 1, 2, 3, 4],
        "checksum_offset": 5,
    }
    semantics_summary = {
        "loaded": semantics["loaded"],
        "path": str(semantics_file) if semantics_file else None,
        "command_effect_count": len(semantics["command_effects"]),
        "response_schema_count": len(semantics["response_schemas"]),
        "caveat": (
            "Semantic names are evidence-backed candidates imported from decompiler output; "
            "trace decoding does not make them protocol facts."
        ),
    }
    return {
        "kind": "h8536_protocol_trace",
        "frame_length": FRAME_LENGTH,
        "checksum_model": checksum_model,
        "direction_mode": direction,
        "semantics": semantics_summary,
        "frames": decoded_frames,
        "trailing_bytes": [_h8(byte) for byte in leftover],
        "trailing_byte_count": len(leftover),
    }
def parse_byte_text(text: str, *, direction_hint: str | None = None) -> list[ByteEvent]:
    """Parse a textual byte capture into a flat list of ``ByteEvent`` objects.

    Each line is handled independently: everything from ``#`` onwards is
    dropped, blank lines are skipped, and a leading ``rx:`` / ``tx:`` prefix
    (case-insensitive) sets the direction hint for the rest of that line.
    Lines without a prefix use ``direction_hint``.

    Raises:
        ValueError: Propagated from token conversion when a token is not a
            valid byte.
    """
    parsed: list[ByteEvent] = []
    for raw_line in text.splitlines():
        content = raw_line.split("#", 1)[0].strip()
        if not content:
            continue

        hint = direction_hint
        lowered = content.lower()
        matched = next((p for p in ("rx:", "tx:") if lowered.startswith(p)), None)
        if matched is not None:
            hint = matched[:2]
            content = content[len(matched) :].strip()

        for token in _tokens(content):
            parsed.extend(_events_from_token(token, hint))
    return parsed
def load_semantics(path: str | Path | None = None) -> dict[str, Any]:
    """Load optional command/response semantics from decompiler JSON output.

    The loader never raises for a bad input file: a missing, unreadable,
    undecodable or unexpectedly shaped file yields the "not loaded" result
    from ``_empty_semantics`` instead.

    Args:
        path: JSON file to read.  A falsy value selects
            ``build/rom_decompiled.json`` relative to the working directory.

    Returns:
        A dict with the keys ``loaded``, ``path``, ``command_effects`` (keyed
        by integer command value) and ``response_schemas`` (a list).
    """
    candidate = Path(path) if path else Path("build") / "rom_decompiled.json"
    try:
        with candidate.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file (no separate exists()
        # check, which would be racy); ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return _empty_semantics(candidate)

    # A JSON document whose root is a list or scalar has no ``.get``.
    if not isinstance(payload, Mapping):
        return _empty_semantics(candidate)

    serial = payload.get("serial_protocol")
    if not isinstance(serial, Mapping):
        serial = payload.get("serial_semantics")
    if not isinstance(serial, Mapping):
        return _empty_semantics(candidate)

    # Prefer the first nested protocol entry; fall back to the section itself.
    protocol = _first_protocol(serial)
    command_effects = _mapping_by_command(
        _list_value(protocol.get("command_effects")) or _list_value(serial.get("command_effects"))
    )
    response_schemas = _list_value(protocol.get("response_schema")) or _list_value(
        serial.get("response_schema")
    )
    return {
        "loaded": True,
        "path": candidate,
        "command_effects": command_effects,
        "response_schemas": response_schemas,
    }
def format_text_report(decoded: Mapping[str, Any]) -> str:
    """Render a ``decode_trace`` result as a line-oriented text report.

    The output starts with a title and a summary line, followed by one line
    per frame.  Frames whose direction is ``"tx"`` are shown as index/value
    reports; all other frames are shown as commands.  Each stateful annotation
    of a frame adds one further line directly below it.

    Returns:
        The report as a single string without a trailing newline.
    """
    frames = decoded.get("frames", [])
    semantics_state = "loaded" if decoded.get("semantics", {}).get("loaded") else "not-loaded"
    report_lines = [
        "H8/536 protocol trace",
        (
            f"frames={len(frames)} "
            f"trailing={decoded.get('trailing_byte_count', 0)} "
            f"semantics={semantics_state}"
        ),
    ]

    for frame in frames:
        checksum = frame["checksum"]
        if checksum["valid"]:
            checksum_text = "ok"
        else:
            checksum_text = f"bad expected {checksum['expected_hex']}"

        # A missing/None direction is displayed as "unknown".
        direction = frame.get("direction") or "unknown"
        head = (
            f"[{frame['frame_index']:04d}] {direction:<7} off={frame['byte_offset']:04d} "
            f"bytes={' '.join(frame['bytes_hex'])} checksum={checksum_text} "
        )

        if direction == "tx":
            report = frame["report"]
            observed = report.get("observed_candidate")
            tail = ""
            if observed:
                candidate_name = observed.get("name_candidate")
                candidate_state = observed.get("state_candidate")
                if candidate_name:
                    tail = f" observed_candidate={candidate_name}"
                else:
                    tail = " observed_candidate"
                if candidate_state:
                    tail += f" state={candidate_state}"
            body = f"report_index={report['index_hex']} value={report['value_hex']}{tail}"
        else:
            command = frame["command"]
            command_name = command.get("name_candidate")
            label = f" {command_name}" if command_name else ""
            body = (
                f"cmd={command['value_hex']}{label} "
                f"index={frame['index']['combined']} value={frame['payload_value']['word_be_hex']}"
            )
        report_lines.append(head + body)

        report_lines.extend(
            f" candidate: {note['kind']} - {note['summary']}"
            for note in frame.get("stateful_annotations", [])
        )
    return "\n".join(report_lines)
def main(
    argv: list[str] | None = None,
    *,
    stdin: TextIO | None = None,
    stdout: TextIO | None = None,
) -> int:
    """Command-line entry point: decode a byte capture and print the result.

    Bytes are gathered from the ``--input`` file (``-`` means stdin) and then
    from positional byte tokens.  If neither produced any bytes and no input
    file was named, stdin is read instead.

    Args:
        argv: Argument list; ``None`` lets argparse use ``sys.argv``.
        stdin: Stream to read from; defaults to ``sys.stdin``.
        stdout: Stream to write to; defaults to ``sys.stdout``.

    Returns:
        Always 0.
    """
    parser = argparse.ArgumentParser(
        description="Decode H8/536 serial byte captures into 6-byte protocol frames."
    )
    parser.add_argument(
        "bytes",
        nargs="*",
        help="Byte tokens, e.g. 00 01 02 03 04 5E or rx:00010203045E",
    )
    parser.add_argument(
        "-i",
        "--input",
        help="Input file. Use '-' or omit byte args to read stdin.",
    )
    parser.add_argument("--direction", choices=sorted(VALID_DIRECTIONS), default="auto")
    parser.add_argument("--json", action="store_true", help="Emit JSON instead of text.")
    parser.add_argument(
        "--semantics",
        default=None,
        help="Decompiler JSON path. Defaults to build/rom_decompiled.json when present.",
    )
    options = parser.parse_args(argv)

    source = stdin or sys.stdin
    sink = stdout or sys.stdout

    collected: list[ByteEvent] = []
    if options.input == "-":
        collected.extend(parse_byte_text(source.read()))
    elif options.input:
        collected.extend(parse_byte_text(Path(options.input).read_text(encoding="utf-8")))
    if options.bytes:
        collected.extend(parse_byte_text(" ".join(options.bytes)))
    # Fall back to stdin only when nothing was supplied any other way.
    if not collected and not options.input:
        collected.extend(parse_byte_text(source.read()))

    decoded = decode_trace(collected, direction=options.direction, semantics_path=options.semantics)
    if options.json:
        json.dump(decoded, sink, indent=2, sort_keys=True)
    else:
        sink.write(format_text_report(decoded))
    sink.write("\n")
    return 0
def _decode_frame(
    frame: list[int],
    *,
    frame_index: int,
    byte_offset: int,
    direction: str | None,
    semantics: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> dict[str, Any]:
    """Decode one frame (bytes 0-4 plus checksum byte 5) into a report dict.

    Every frame gets checksum, command, index and payload sections.  When
    ``direction`` is ``"tx"`` the frame is additionally decoded as a report,
    and the command name/effect and response-schema candidates are left empty.

    Args:
        frame: The frame's byte values.
        frame_index: Position of the frame within the trace.
        byte_offset: Offset of the frame's first byte within the capture.
        direction: ``"rx"``, ``"tx"`` or ``None`` when unknown.
        semantics: Result of ``load_semantics``.
        previous_valid: Latest checksum-valid frame per direction.
    """
    expected_checksum = checksum_for(frame[:5])
    received_checksum = frame[5]
    is_tx = direction == "tx"

    # The command is carried in the low three bits of byte 0.
    command_value = frame[0] & 0x07
    effect = semantics["command_effects"].get(command_value, {})
    if is_tx:
        name_candidate = None
        effect_candidate = None
        command_caveat = "TX frames are decoded as report frames; byte0 is not treated as a command."
    else:
        name_candidate = effect.get("name_candidate")
        effect_candidate = effect or None
        command_caveat = None

    index_low3 = frame[1] & 0x07
    index_combined = (index_low3 << 8) | frame[2]
    word_be = (frame[3] << 8) | frame[4]
    word_le = (frame[4] << 8) | frame[3]

    checksum_section = {
        "algorithm": "xor",
        "seed": CHECKSUM_SEED,
        "expected": expected_checksum,
        "expected_hex": _h8(expected_checksum),
        "actual": received_checksum,
        "actual_hex": _h8(received_checksum),
        "valid": expected_checksum == received_checksum,
    }
    command_section = {
        "applicable": not is_tx,
        "source_byte": frame[0],
        "source_byte_hex": _h8(frame[0]),
        "mask": 0x07,
        "value": command_value,
        "value_hex": _h8(command_value),
        "name_candidate": name_candidate,
        "effect_candidate": effect_candidate,
        "caveat": command_caveat,
    }
    index_section = {
        "byte1": frame[1],
        "byte1_hex": _h8(frame[1]),
        "byte1_low3": index_low3,
        "byte1_low3_hex": _h8(index_low3),
        "byte2": frame[2],
        "byte2_hex": _h8(frame[2]),
        "combined": index_combined,
        "combined_hex": _h16(index_combined),
    }
    payload_section = {
        "byte3": frame[3],
        "byte3_hex": _h8(frame[3]),
        "byte4": frame[4],
        "byte4_hex": _h8(frame[4]),
        "word_be": word_be,
        "word_be_hex": _h16(word_be),
        "word_le": word_le,
        "word_le_hex": _h16(word_le),
    }

    if is_tx:
        report = _tx_report(frame)
        schema_candidates = []
    else:
        report = None
        schema_candidates = _response_schema_candidates(semantics, command_value)

    decoded = {
        "frame_index": frame_index,
        "byte_offset": byte_offset,
        "direction": direction,
        "bytes": frame,
        "bytes_hex": [_h8(byte) for byte in frame],
        "checksum": checksum_section,
        "command": command_section,
        "index": index_section,
        "payload_value": payload_section,
        "report": report,
        "response_schema_candidates": schema_candidates,
        "stateful_annotations": [],
    }
    # Annotations are computed last because they inspect the decoded frame.
    decoded["stateful_annotations"] = _stateful_annotations(decoded, previous_valid)
    return decoded
def _tx_report(frame: list[int]) -> dict[str, Any]:
    """Decode a TX frame as an index/value report.

    Bytes 0-2 form a big-endian index and bytes 3-4 a big-endian value.  The
    ``(index, value)`` pair is looked up in ``OBSERVED_TX_REPORT_CANDIDATES``;
    a hit is returned as a copy so callers cannot mutate the table.
    """
    high, middle, low = frame[0], frame[1], frame[2]
    index = (high << 16) | (middle << 8) | low
    value = (frame[3] << 8) | frame[4]
    known = OBSERVED_TX_REPORT_CANDIDATES.get((index, value))

    # Indexes that fit in 16 bits are shown with four hex digits, others with six.
    index_hex = _h16(index) if index <= 0xFFFF else f"0x{index:06X}"

    return {
        "encoding": "observed_tx_index_value_report_candidate",
        "confidence": "observed_candidate",
        "index_source_offsets": [0, 1, 2],
        "index": index,
        "index_hex": index_hex,
        "index_bytes_hex": [_h8(byte) for byte in (high, middle, low)],
        "value_source_offsets": [3, 4],
        "value": value,
        "value_hex": _h16(value),
        "observed_candidate": dict(known) if known else None,
        "caveat": "TX report names are capture-observed candidates, not ROM-derived protocol facts.",
    }
def _stateful_annotations(
    frame: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> list[dict[str, Any]]:
    """Return context-dependent annotations for a decoded frame.

    Only non-TX frames whose command value is 0x07 are annotated.  Such a
    frame gets a single retransmit/error candidate annotation that records the
    previous checksum-valid frame in the same direction and, for RX frames,
    in the TX direction, noting when the bytes match exactly.

    Args:
        frame: A decoded frame dict as built by ``_decode_frame``.
        previous_valid: Latest checksum-valid frame per direction.

    Returns:
        A list holding zero or one annotation dicts.
    """
    direction = frame.get("direction")
    if direction == "tx" or frame["command"]["value"] != 0x07:
        return []

    same = previous_valid.get(direction)
    # TX frames returned above, so an opposite direction only exists for RX;
    # frames with an unknown direction have none.
    opposite = previous_valid.get("tx") if direction == "rx" else None

    evidence = ["command_low3 == 0x07"]
    frame_bytes = frame.get("bytes")
    if same and same.get("bytes") == frame_bytes:
        evidence.append("matches previous valid frame in same direction")
    if opposite and opposite.get("bytes") == frame_bytes:
        evidence.append("matches previous valid frame in opposite direction")

    return [
        {
            "kind": "retransmit_or_error_candidate",
            "confidence": "candidate",
            "summary": "cmd 0x07 is associated with retry/error handling in decompiler semantics.",
            "evidence": evidence,
            "previous_valid_same_direction": _previous_summary(same),
            "previous_valid_opposite_direction": _previous_summary(opposite),
        }
    ]
def _previous_summary(frame: Mapping[str, Any] | None) -> dict[str, Any] | None:
    """Condense a decoded frame into the short form used inside annotations.

    Returns ``None`` when ``frame`` is ``None`` or empty.
    """
    if frame:
        return {
            "frame_index": frame["frame_index"],
            "direction": frame.get("direction"),
            "bytes_hex": frame["bytes_hex"],
            "command": frame["command"]["value_hex"],
            "checksum_valid": frame["checksum"]["valid"],
        }
    return None
def _byte_events(data: bytes | Iterable[int | ByteEvent]) -> list[ByteEvent]:
    """Normalise the supported input forms into a list of ``ByteEvent``.

    Args:
        data: Raw ``bytes``, or an iterable whose items are ``ByteEvent``
            objects or values convertible with ``int()``.

    Returns:
        One event per input byte, in input order.  ``ByteEvent`` items are
        passed through unchanged; other items carry no direction hint.

    Raises:
        ValueError: If any byte value, including the value of a supplied
            ``ByteEvent``, lies outside 0..255.
    """
    if isinstance(data, bytes):
        # bytes items are already guaranteed to be in range.
        return [ByteEvent(byte) for byte in data]

    events: list[ByteEvent] = []
    for item in data:
        event = item if isinstance(item, ByteEvent) else ByteEvent(int(item))
        # Validate caller-built events too: an out-of-range value would
        # otherwise corrupt the shifted words and the checksum comparison.
        if not 0 <= event.value <= 0xFF:
            raise ValueError(f"byte out of range: {event.value}")
        events.append(event)
    return events
def _frame_direction(chunk: list[ByteEvent], mode: str) -> str | None:
    """Work out the direction of one frame.

    An explicit ``"rx"`` / ``"tx"`` mode always wins.  Otherwise the frame's
    direction is the single hint shared by its hinted events; it is ``None``
    when no event carries a hint or when the hints disagree.
    """
    if mode in {"rx", "tx"}:
        return mode

    seen: set[str] = set()
    for event in chunk:
        if event.direction_hint in {"rx", "tx"}:
            seen.add(event.direction_hint)
    return seen.pop() if len(seen) == 1 else None
def _tokens(text: str) -> list[str]:
    """Split ``text`` into tokens separated by whitespace, commas or semicolons."""
    # Turn both punctuation separators into spaces in one pass, then let
    # split() collapse runs of whitespace (it never yields empty tokens).
    separators_to_space = str.maketrans(",;", "  ")
    return text.translate(separators_to_space).split()
def _events_from_token(token: str, direction_hint: str | None) -> list[ByteEvent]:
    """Convert one token into byte events.

    Accepted forms (all hexadecimal):
      * an ``rx:`` / ``tx:`` prefix (case-insensitive), which overrides
        ``direction_hint`` for this token;
      * a single byte such as ``5E``, ``0x5E`` or ``H'5E``;
      * an unprefixed run with an even number (more than two) of hex digits,
        e.g. ``00010203045E``, which is split into consecutive bytes.

    Args:
        token: The text to convert.
        direction_hint: Hint attached to the produced events unless the token
            carries its own prefix.

    Raises:
        ValueError: If the token is empty, is not valid hexadecimal, or does
            not fit in one byte.
    """
    lowered = token.lower()
    for prefix in ("rx:", "tx:"):
        if lowered.startswith(prefix):
            return _events_from_token(token[len(prefix) :], prefix[:2])

    value_text = token.strip()
    # Accept the H'NN notation by rewriting it to the 0xNN form.
    if value_text.upper().startswith("H'"):
        value_text = "0x" + value_text[2:]

    is_hex_run = (
        not value_text.lower().startswith("0x")
        and len(value_text) > 2
        and len(value_text) % 2 == 0
        and all(char in "0123456789abcdefABCDEF" for char in value_text)
    )
    if is_hex_run:
        return [
            ByteEvent(int(value_text[index : index + 2], 16), direction_hint)
            for index in range(0, len(value_text), 2)
        ]

    # int(..., 16) accepts the text with or without a 0x prefix, so a single
    # conversion covers both spellings.
    try:
        value = int(value_text, 16)
    except ValueError:
        raise ValueError(f"invalid byte token: {token!r}") from None
    if not 0 <= value <= 0xFF:
        raise ValueError(f"byte out of range: {token}")
    return [ByteEvent(value, direction_hint)]
def _empty_semantics(path: Path) -> dict[str, Any]:
    """Build the "nothing loaded" semantics result for ``path``.

    Fresh empty containers are created on every call.
    """
    return dict(loaded=False, path=path, command_effects={}, response_schemas=[])
def _first_protocol(serial: Mapping[str, Any]) -> Mapping[str, Any]:
    """Pick the first mapping listed under ``protocol_semantics``.

    Falls back to ``serial`` itself when that key is not a list or the list
    holds no mapping.
    """
    protocols = serial.get("protocol_semantics")
    if not isinstance(protocols, list):
        return serial
    return next((entry for entry in protocols if isinstance(entry, Mapping)), serial)
def _list_value(value: Any) -> list[Any]:
    """Return ``value`` itself when it is a list, otherwise a new empty list."""
    if isinstance(value, list):
        return value
    return []
def _mapping_by_command(items: list[Any]) -> dict[int, Mapping[str, Any]]:
    """Index command-effect entries by their integer command value.

    The command value is read from ``command_value`` or, when that key is
    absent, from ``command``.  Entries that are not mappings or whose command
    value is not an integer are skipped.  A later entry replaces an earlier
    one with the same command value.
    """
    output: dict[int, Mapping[str, Any]] = {}
    for item in items:
        if not isinstance(item, Mapping):
            continue
        value = item.get("command_value", item.get("command"))
        # bool is a subclass of int, so JSON true/false would otherwise be
        # registered as commands 1 and 0.
        if isinstance(value, int) and not isinstance(value, bool):
            output[value] = item
    return output
def _response_schema_candidates(semantics: Mapping[str, Any], command: int) -> list[Mapping[str, Any]]:
    """List the response schemas whose constants include ``command``.

    Schemas that are not mappings are ignored.  Each match is reported as a
    small dict with the schema's ``response_id`` and ``call_address_hex``.
    """
    return [
        {
            "response_id": schema.get("response_id"),
            "call_address_hex": schema.get("call_address_hex"),
            "matched_command_byte_candidate": _h8(command),
            "caveat": "Matched schema constants are candidates from decompiler output.",
        }
        for schema in semantics.get("response_schemas", [])
        if isinstance(schema, Mapping) and command in _schema_constants(schema)
    ]
def _schema_constants(value: Any) -> set[int]:
    """Collect the integer constants found anywhere inside a schema.

    Mappings and lists are walked recursively.  Integers stored under the
    keys ``value``, ``constant`` or ``constant_value`` are collected after
    masking to their low three bits.  Anything else contributes nothing.
    """
    constants: set[int] = set()
    if isinstance(value, Mapping):
        for key, item in value.items():
            if (
                key in {"value", "constant", "constant_value"}
                and isinstance(item, int)
                # bool is a subclass of int; a JSON true/false is a flag,
                # not a constant, and must not match command 1 or 0.
                and not isinstance(item, bool)
            ):
                constants.add(item & 0x07)
            constants.update(_schema_constants(item))
    elif isinstance(value, list):
        for item in value:
            constants.update(_schema_constants(item))
    return constants
def _h8(value: int) -> str:
    """Format the low 8 bits of ``value`` as ``0xNN`` with upper-case digits."""
    return "0x" + format(value & 0xFF, "02X")
def _h16(value: int) -> str:
    """Format the low 16 bits of ``value`` as ``0xNNNN`` with upper-case digits."""
    return "0x" + format(value & 0xFFFF, "04X")
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers in this module are intentionally left out.
__all__ = [
    "ByteEvent",
    "checksum_for",
    "decode_trace",
    "format_text_report",
    "load_semantics",
    "main",
    "parse_byte_text",
]