# File-viewer metadata captured with this listing (not part of the program):
# 505 lines, 18 KiB, Python.
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Iterable, Mapping, TextIO
|
|
|
|
|
|
# Initial accumulator value for the XOR checksum computed by checksum_for().
CHECKSUM_SEED = 0x5A

# Number of bytes consumed per decoded frame (five covered bytes + checksum).
FRAME_LENGTH = 6

# Accepted values for the ``direction`` argument / --direction option.
VALID_DIRECTIONS = {"rx", "tx", "auto"}

# Capture-observed TX reports keyed by (index, value) as computed in
# _tx_report(). The names are candidates only, not confirmed protocol facts.
OBSERVED_TX_REPORT_CANDIDATES = {
    (0x0000, 0x0080): {
        "name_candidate": "heartbeat_alive_candidate",
    },
    (0x0007, 0x8000): {
        "name_candidate": "cam_power_button_candidate",
        "state_candidate": "active",
    },
    (0x0015, 0x8000): {
        "name_candidate": "call_button_candidate",
        "state_candidate": "active",
    },
    (0x0015, 0x0000): {
        "name_candidate": "call_button_candidate",
        "state_candidate": "inactive",
    },
}
|
|
|
|
|
|
@dataclass(frozen=True)
class ByteEvent:
    """A single captured byte together with an optional direction hint."""

    # Numeric byte value; the int path of _byte_events() enforces 0..255.
    value: int
    # "rx" or "tx" when the capture text labelled the byte, otherwise None.
    direction_hint: str | None = None
|
|
|
|
|
|
def checksum_for(frame_prefix: Iterable[int]) -> int:
    """Return the XOR checksum of *frame_prefix*, seeded with CHECKSUM_SEED.

    Only the low eight bits of each input value take part, and the result is
    always in the range 0..255.
    """
    accumulator = CHECKSUM_SEED
    for octet in frame_prefix:
        # Masking after the XOR keeps the running value inside one byte.
        accumulator = (accumulator ^ octet) & 0xFF
    return accumulator & 0xFF
|
|
|
|
|
|
def decode_trace(
    data: bytes | Iterable[int | ByteEvent],
    *,
    direction: str = "auto",
    semantics_path: str | Path | None = None,
) -> dict[str, Any]:
    """Decode a byte capture into FRAME_LENGTH-sized frames.

    Args:
        data: Raw bytes, or an iterable of ints and/or ByteEvent objects.
        direction: One of VALID_DIRECTIONS. "rx"/"tx" force that direction on
            every frame; "auto" derives it from per-byte hints.
        semantics_path: Optional decompiler JSON path passed to
            load_semantics().

    Returns:
        A JSON-serialisable report with the checksum model, semantics summary,
        the decoded frames and any bytes left over after the last full frame.

    Raises:
        ValueError: If *direction* is not in VALID_DIRECTIONS (byte conversion
            in _byte_events() may raise it as well).
    """
    if direction not in VALID_DIRECTIONS:
        raise ValueError(f"direction must be one of {sorted(VALID_DIRECTIONS)}")

    byte_stream = _byte_events(data)
    semantics = load_semantics(semantics_path)

    whole_frames = len(byte_stream) // FRAME_LENGTH
    framed_length = whole_frames * FRAME_LENGTH

    # Most recent checksum-valid frame per direction, used for annotations.
    last_valid: dict[str, dict[str, Any] | None] = {"rx": None, "tx": None}
    decoded_frames: list[dict[str, Any]] = []

    for number in range(whole_frames):
        start = number * FRAME_LENGTH
        window = byte_stream[start : start + FRAME_LENGTH]
        frame_direction = _frame_direction(window, direction)
        decoded = _decode_frame(
            [event.value for event in window],
            frame_index=number,
            byte_offset=start,
            direction=frame_direction,
            semantics=semantics,
            previous_valid=last_valid,
        )
        decoded_frames.append(decoded)
        # Frames with an unresolved direction (None) are never remembered.
        if decoded["checksum"]["valid"] and frame_direction in last_valid:
            last_valid[frame_direction] = decoded

    leftover = [event.value for event in byte_stream[framed_length:]]

    checksum_model = {
        "algorithm": "xor",
        "seed": CHECKSUM_SEED,
        "seed_hex": _h8(CHECKSUM_SEED),
        "covered_offsets": [0, 1, 2, 3, 4],
        "checksum_offset": 5,
    }
    semantics_summary = {
        "loaded": semantics["loaded"],
        "path": str(semantics["path"]) if semantics["path"] else None,
        "command_effect_count": len(semantics["command_effects"]),
        "response_schema_count": len(semantics["response_schemas"]),
        "caveat": (
            "Semantic names are evidence-backed candidates imported from decompiler output; "
            "trace decoding does not make them protocol facts."
        ),
    }
    return {
        "kind": "h8536_protocol_trace",
        "frame_length": FRAME_LENGTH,
        "checksum_model": checksum_model,
        "direction_mode": direction,
        "semantics": semantics_summary,
        "frames": decoded_frames,
        "trailing_bytes": [_h8(byte) for byte in leftover],
        "trailing_byte_count": len(leftover),
    }
|
|
|
|
|
|
def parse_byte_text(text: str, *, direction_hint: str | None = None) -> list[ByteEvent]:
    """Parse textual byte captures into ByteEvent objects.

    Each line may carry a ``#`` comment (discarded) and may start with an
    ``rx:`` / ``tx:`` label (case-insensitive) that overrides *direction_hint*
    for the bytes on that line. Remaining text is split by _tokens() and each
    token is converted by _events_from_token().
    """
    parsed: list[ByteEvent] = []
    for raw_line in text.splitlines():
        content = raw_line.partition("#")[0].strip()
        if not content:
            continue

        hint = direction_hint
        label = content[:3].lower()
        if label in ("rx:", "tx:"):
            hint = label[:2]
            content = content[3:].strip()

        for token in _tokens(content):
            parsed.extend(_events_from_token(token, hint))
    return parsed
|
|
|
|
|
|
def load_semantics(path: str | Path | None = None) -> dict[str, Any]:
    """Load command/response semantics from decompiler JSON, best effort.

    Args:
        path: JSON file to read. When falsy, ``build/rom_decompiled.json``
            (relative to the working directory) is tried.

    Returns:
        A dict with ``loaded``, ``path``, ``command_effects`` (keyed by integer
        command value) and ``response_schemas``. Any problem with the file --
        missing, unreadable, not UTF-8, invalid JSON, or lacking a usable
        ``serial_protocol`` / ``serial_semantics`` object -- yields the
        "not loaded" result from _empty_semantics() instead of an exception.
    """
    candidate = Path(path) if path else Path("build") / "rom_decompiled.json"
    if not candidate.exists():
        return _empty_semantics(candidate)

    try:
        with candidate.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError alike.
        return _empty_semantics(candidate)

    # A top-level JSON array/scalar has no .get(); treat it as "no semantics".
    if not isinstance(payload, Mapping):
        return _empty_semantics(candidate)

    serial = payload.get("serial_protocol")
    if not isinstance(serial, Mapping):
        serial = payload.get("serial_semantics")
    if not isinstance(serial, Mapping):
        return _empty_semantics(candidate)

    protocol = _first_protocol(serial)
    # Prefer entries on the selected protocol; fall back to the serial object.
    command_effects = _mapping_by_command(
        _list_value(protocol.get("command_effects")) or _list_value(serial.get("command_effects"))
    )
    response_schemas = _list_value(protocol.get("response_schema")) or _list_value(
        serial.get("response_schema")
    )
    return {
        "loaded": True,
        "path": candidate,
        "command_effects": command_effects,
        "response_schemas": response_schemas,
    }
|
|
|
|
|
|
def format_text_report(decoded: Mapping[str, Any]) -> str:
    """Render a decode_trace()-style result as a plain-text report.

    The output starts with a title and a summary line, followed by one line
    per frame (TX frames show report fields, all others show command fields)
    and an indented line for each of the frame's stateful annotations.
    """
    frames = decoded.get("frames", [])
    trailing = decoded.get("trailing_byte_count", 0)
    semantics_state = "loaded" if decoded.get("semantics", {}).get("loaded") else "not-loaded"
    out = [
        "H8/536 protocol trace",
        f"frames={len(frames)} trailing={trailing} semantics={semantics_state}",
    ]

    for frame in frames:
        checksum = frame["checksum"]
        if checksum["valid"]:
            status = "ok"
        else:
            status = f"bad expected {checksum['expected_hex']}"
        direction = frame.get("direction") or "unknown"
        joined_bytes = " ".join(frame["bytes_hex"])
        head = (
            f"[{frame['frame_index']:04d}] {direction:<7} off={frame['byte_offset']:04d} "
            f"bytes={joined_bytes} checksum={status} "
        )

        if direction == "tx":
            report = frame["report"]
            observed = report.get("observed_candidate")
            extra = ""
            if observed:
                observed_name = observed.get("name_candidate")
                observed_state = observed.get("state_candidate")
                if observed_name:
                    extra = f" observed_candidate={observed_name}"
                else:
                    extra = " observed_candidate"
                if observed_state:
                    extra += f" state={observed_state}"
            out.append(f"{head}report_index={report['index_hex']} value={report['value_hex']}{extra}")
        else:
            command = frame["command"]
            command_name = command.get("name_candidate")
            label = f" {command_name}" if command_name else ""
            index_value = frame["index"]["combined"]
            payload_hex = frame["payload_value"]["word_be_hex"]
            out.append(f"{head}cmd={command['value_hex']}{label} index={index_value} value={payload_hex}")

        out.extend(
            f"  candidate: {note['kind']} - {note['summary']}"
            for note in frame.get("stateful_annotations", [])
        )

    return "\n".join(out)
|
|
|
|
|
|
def main(argv: list[str] | None = None, *, stdin: TextIO | None = None, stdout: TextIO | None = None) -> int:
    """Command-line entry point: parse byte input, decode it, print a report.

    Bytes come from ``--input`` (a file, or ``-`` for stdin), from positional
    tokens, or from stdin when neither supplied anything. Output is the text
    report, or JSON with ``--json``. *stdin* / *stdout* default to the
    ``sys`` streams and exist so callers can substitute their own.

    Returns:
        Always 0; parsing or decoding errors propagate as exceptions.
    """
    parser = argparse.ArgumentParser(
        description="Decode H8/536 serial byte captures into 6-byte protocol frames."
    )
    parser.add_argument("bytes", nargs="*", help="Byte tokens, e.g. 00 01 02 03 04 5E or rx:00010203045E")
    parser.add_argument("-i", "--input", help="Input file. Use '-' or omit byte args to read stdin.")
    parser.add_argument("--direction", choices=sorted(VALID_DIRECTIONS), default="auto")
    parser.add_argument("--json", action="store_true", help="Emit JSON instead of text.")
    parser.add_argument(
        "--semantics",
        default=None,
        help="Decompiler JSON path. Defaults to build/rom_decompiled.json when present.",
    )
    args = parser.parse_args(argv)

    stdin = stdin or sys.stdin
    stdout = stdout or sys.stdout

    events: list[ByteEvent] = []
    if args.input == "-":
        events.extend(parse_byte_text(stdin.read()))
    elif args.input:
        events.extend(parse_byte_text(Path(args.input).read_text(encoding="utf-8")))
    if args.bytes:
        events.extend(parse_byte_text(" ".join(args.bytes)))
    # Nothing given on the command line at all: fall back to stdin.
    if not events and not args.input:
        events.extend(parse_byte_text(stdin.read()))

    decoded = decode_trace(events, direction=args.direction, semantics_path=args.semantics)

    if args.json:
        json.dump(decoded, stdout, indent=2, sort_keys=True)
    else:
        stdout.write(format_text_report(decoded))
    stdout.write("\n")
    return 0
|
|
|
|
|
|
def _decode_frame(
    frame: list[int],
    *,
    frame_index: int,
    byte_offset: int,
    direction: str | None,
    semantics: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> dict[str, Any]:
    """Decode one six-byte frame into its report dictionary.

    Bytes 0..4 feed the checksum compared against byte 5. Frames whose
    direction is "tx" are treated as reports (command fields are marked not
    applicable); every other frame gets command/effect and response-schema
    candidates looked up in *semantics*. Stateful annotations are attached
    last, using *previous_valid* for context.
    """
    expected = checksum_for(frame[:5])
    actual = frame[5]

    tx_report = direction == "tx"
    command_value = frame[0] & 0x07
    effect = semantics["command_effects"].get(command_value, {})

    index_low3 = frame[1] & 0x07
    combined_index = (index_low3 << 8) | frame[2]
    word_be = (frame[3] << 8) | frame[4]
    word_le = (frame[4] << 8) | frame[3]

    if tx_report:
        name_candidate = None
        effect_candidate = None
        command_caveat = "TX frames are decoded as report frames; byte0 is not treated as a command."
        report = _tx_report(frame)
        schema_candidates = []
    else:
        name_candidate = effect.get("name_candidate")
        # An empty effect mapping is reported as None rather than {}.
        effect_candidate = effect or None
        command_caveat = None
        report = None
        schema_candidates = _response_schema_candidates(semantics, command_value)

    decoded = {
        "frame_index": frame_index,
        "byte_offset": byte_offset,
        "direction": direction,
        "bytes": frame,
        "bytes_hex": [_h8(byte) for byte in frame],
        "checksum": {
            "algorithm": "xor",
            "seed": CHECKSUM_SEED,
            "expected": expected,
            "expected_hex": _h8(expected),
            "actual": actual,
            "actual_hex": _h8(actual),
            "valid": expected == actual,
        },
        "command": {
            "applicable": not tx_report,
            "source_byte": frame[0],
            "source_byte_hex": _h8(frame[0]),
            "mask": 0x07,
            "value": command_value,
            "value_hex": _h8(command_value),
            "name_candidate": name_candidate,
            "effect_candidate": effect_candidate,
            "caveat": command_caveat,
        },
        "index": {
            "byte1": frame[1],
            "byte1_hex": _h8(frame[1]),
            "byte1_low3": index_low3,
            "byte1_low3_hex": _h8(index_low3),
            "byte2": frame[2],
            "byte2_hex": _h8(frame[2]),
            "combined": combined_index,
            "combined_hex": _h16(combined_index),
        },
        "payload_value": {
            "byte3": frame[3],
            "byte3_hex": _h8(frame[3]),
            "byte4": frame[4],
            "byte4_hex": _h8(frame[4]),
            "word_be": word_be,
            "word_be_hex": _h16(word_be),
            "word_le": word_le,
            "word_le_hex": _h16(word_le),
        },
        "report": report,
        "response_schema_candidates": schema_candidates,
        "stateful_annotations": [],
    }
    # Annotations need the finished frame dict, so they are filled in last.
    decoded["stateful_annotations"] = _stateful_annotations(decoded, previous_valid)
    return decoded
|
|
|
|
|
|
def _tx_report(frame: list[int]) -> dict[str, Any]:
    """Describe a TX frame as an (index, value) report candidate.

    The index is built from bytes 0..2 (big-endian) and the value from bytes
    3..4 (big-endian); the pair is looked up in OBSERVED_TX_REPORT_CANDIDATES.
    """
    index_bytes = frame[0], frame[1], frame[2]
    index = (index_bytes[0] << 16) | (index_bytes[1] << 8) | index_bytes[2]
    value = (frame[3] << 8) | frame[4]
    known = OBSERVED_TX_REPORT_CANDIDATES.get((index, value))

    # Indices that fit in 16 bits use the 4-digit form, larger ones 6 digits.
    if index > 0xFFFF:
        index_hex = f"0x{index:06X}"
    else:
        index_hex = _h16(index)

    return {
        "encoding": "observed_tx_index_value_report_candidate",
        "confidence": "observed_candidate",
        "index_source_offsets": [0, 1, 2],
        "index": index,
        "index_hex": index_hex,
        "index_bytes_hex": [_h8(byte) for byte in index_bytes],
        "value_source_offsets": [3, 4],
        "value": value,
        "value_hex": _h16(value),
        # Copy so callers cannot mutate the shared lookup table entry.
        "observed_candidate": dict(known) if known else None,
        "caveat": "TX report names are capture-observed candidates, not ROM-derived protocol facts.",
    }
|
|
|
|
|
|
def _stateful_annotations(
    frame: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> list[dict[str, Any]]:
    """Return context-dependent annotations for a decoded frame.

    Only non-TX frames whose command value is 0x07 are annotated. The single
    annotation records the previous checksum-valid frame in the same
    direction and, for "rx" frames, the previous valid "tx" frame, plus
    evidence notes when the current bytes repeat either of them.

    Args:
        frame: Decoded frame dict (needs ``direction``, ``command`` and
            ``bytes``).
        previous_valid: Last checksum-valid frame per direction, or None.

    Returns:
        An empty list, or a list holding one annotation dict.
    """
    direction = frame.get("direction")
    if direction == "tx" or frame["command"]["value"] != 0x07:
        return []

    same = previous_valid.get(direction)
    # TX frames returned above, so "rx" is the only direction that can have
    # an opposite here; an unresolved direction (None) has none.
    opposite = previous_valid.get("tx") if direction == "rx" else None

    evidence = ["command_low3 == 0x07"]
    current_bytes = frame.get("bytes")
    if same and same.get("bytes") == current_bytes:
        evidence.append("matches previous valid frame in same direction")
    if opposite and opposite.get("bytes") == current_bytes:
        evidence.append("matches previous valid frame in opposite direction")

    return [
        {
            "kind": "retransmit_or_error_candidate",
            "confidence": "candidate",
            "summary": "cmd 0x07 is associated with retry/error handling in decompiler semantics.",
            "evidence": evidence,
            "previous_valid_same_direction": _previous_summary(same),
            "previous_valid_opposite_direction": _previous_summary(opposite),
        }
    ]
|
|
|
|
|
|
def _previous_summary(frame: Mapping[str, Any] | None) -> dict[str, Any] | None:
|
|
if not frame:
|
|
return None
|
|
return {
|
|
"frame_index": frame["frame_index"],
|
|
"direction": frame.get("direction"),
|
|
"bytes_hex": frame["bytes_hex"],
|
|
"command": frame["command"]["value_hex"],
|
|
"checksum_valid": frame["checksum"]["valid"],
|
|
}
|
|
|
|
|
|
def _byte_events(data: bytes | Iterable[int | ByteEvent]) -> list[ByteEvent]:
    """Normalise *data* into a list of ByteEvent objects.

    ``bytes`` input is wrapped directly. Any other iterable may mix ByteEvent
    instances (kept as-is, including their direction hint) and values
    convertible with ``int()``.

    Raises:
        ValueError: If any value -- plain or carried by a ByteEvent -- lies
            outside 0..255.
    """
    if isinstance(data, bytes):
        return [ByteEvent(byte) for byte in data]

    events: list[ByteEvent] = []
    for item in data:
        if isinstance(item, ByteEvent):
            event = item
        else:
            event = ByteEvent(int(item))
        # Validate both paths: downstream bit arithmetic assumes single bytes,
        # and an oversized ByteEvent would otherwise corrupt decoded fields.
        if not 0 <= event.value <= 0xFF:
            raise ValueError(f"byte out of range: {event.value}")
        events.append(event)
    return events
|
|
|
|
|
|
def _frame_direction(chunk: list[ByteEvent], mode: str) -> str | None:
|
|
if mode in {"rx", "tx"}:
|
|
return mode
|
|
hints = {event.direction_hint for event in chunk if event.direction_hint in {"rx", "tx"}}
|
|
if len(hints) == 1:
|
|
return next(iter(hints))
|
|
return None
|
|
|
|
|
|
def _tokens(text: str) -> list[str]:
|
|
return [token for token in text.replace(",", " ").replace(";", " ").split() if token]
|
|
|
|
|
|
def _events_from_token(token: str, direction_hint: str | None) -> list[ByteEvent]:
    """Convert one text token into ByteEvent objects.

    Accepted forms (all hexadecimal):
      * an optional ``rx:`` / ``tx:`` prefix, which overrides *direction_hint*;
      * ``0x..`` or ``H'..`` for a single byte;
      * one or two bare hex digits for a single byte;
      * a longer bare run with an even number of hex digits, split into
        consecutive bytes.

    Raises:
        ValueError: If the token is empty, is not hexadecimal, or encodes a
            single value outside 0..255.
    """
    lowered = token.lower()
    for prefix in ("rx:", "tx:"):
        if lowered.startswith(prefix):
            return _events_from_token(token[len(prefix) :], prefix[:2])

    value_text = token.strip()
    if not value_text:
        # Typically a direction prefix such as "rx:" with nothing after it.
        raise ValueError("empty byte token (direction prefix without bytes?)")

    if value_text.upper().startswith("H'"):
        value_text = "0x" + value_text[2:]

    is_packed_run = (
        not value_text.lower().startswith("0x")
        and len(value_text) > 2
        and len(value_text) % 2 == 0
        and all(char in "0123456789abcdefABCDEF" for char in value_text)
    )
    if is_packed_run:
        return [
            ByteEvent(int(value_text[index : index + 2], 16), direction_hint)
            for index in range(0, len(value_text), 2)
        ]

    # int(..., 16) accepts the text with or without a "0x" prefix.
    try:
        value = int(value_text, 16)
    except ValueError:
        raise ValueError(f"invalid byte token: {token!r}") from None
    if not 0 <= value <= 0xFF:
        raise ValueError(f"byte out of range: {token}")
    return [ByteEvent(value, direction_hint)]
|
|
|
|
|
|
def _empty_semantics(path: Path) -> dict[str, Any]:
|
|
return {"loaded": False, "path": path, "command_effects": {}, "response_schemas": []}
|
|
|
|
|
|
def _first_protocol(serial: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
protocols = serial.get("protocol_semantics")
|
|
if isinstance(protocols, list):
|
|
for protocol in protocols:
|
|
if isinstance(protocol, Mapping):
|
|
return protocol
|
|
return serial
|
|
|
|
|
|
def _list_value(value: Any) -> list[Any]:
|
|
return value if isinstance(value, list) else []
|
|
|
|
|
|
def _mapping_by_command(items: list[Any]) -> dict[int, Mapping[str, Any]]:
|
|
output: dict[int, Mapping[str, Any]] = {}
|
|
for item in items:
|
|
if not isinstance(item, Mapping):
|
|
continue
|
|
value = item.get("command_value", item.get("command"))
|
|
if isinstance(value, int):
|
|
output[value] = item
|
|
return output
|
|
|
|
|
|
def _response_schema_candidates(semantics: Mapping[str, Any], command: int) -> list[Mapping[str, Any]]:
    """List response schemas whose collected constants include *command*.

    Each match is summarised by its ``response_id`` and ``call_address_hex``;
    non-mapping schema entries are ignored.
    """
    return [
        {
            "response_id": schema.get("response_id"),
            "call_address_hex": schema.get("call_address_hex"),
            "matched_command_byte_candidate": _h8(command),
            "caveat": "Matched schema constants are candidates from decompiler output.",
        }
        for schema in semantics.get("response_schemas", [])
        if isinstance(schema, Mapping) and command in _schema_constants(schema)
    ]
|
|
|
|
|
|
def _schema_constants(value: Any) -> set[int]:
|
|
constants: set[int] = set()
|
|
if isinstance(value, Mapping):
|
|
for key, item in value.items():
|
|
if key in {"value", "constant", "constant_value"} and isinstance(item, int):
|
|
constants.add(item & 0x07)
|
|
constants.update(_schema_constants(item))
|
|
elif isinstance(value, list):
|
|
for item in value:
|
|
constants.update(_schema_constants(item))
|
|
return constants
|
|
|
|
|
|
def _h8(value: int) -> str:
|
|
return f"0x{value & 0xFF:02X}"
|
|
|
|
|
|
def _h16(value: int) -> str:
|
|
return f"0x{value & 0xFFFF:04X}"
|
|
|
|
|
|
# Public API of this module; underscore-prefixed helpers are intentionally
# left out.
__all__ = [
    "ByteEvent",
    "checksum_for",
    "decode_trace",
    "format_text_report",
    "load_semantics",
    "main",
    "parse_byte_text",
]
|