# Decode H8/536 serial byte captures into fixed-length, XOR-checksummed frames.
#
# A capture is cut into consecutive FRAME_LENGTH-byte frames.  Byte 5 of each
# frame is compared against CHECKSUM_SEED XOR-ed with bytes 0..4.  Frames whose
# direction is "tx" are decoded as index/value reports; every other frame is
# decoded as a command frame.  All semantic names attached to a frame are
# candidates (imported from decompiler output or observed in captures).
from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Mapping, TextIO

CHECKSUM_SEED = 0x5A
FRAME_LENGTH = 6
VALID_DIRECTIONS = {"rx", "tx", "auto"}

# Keyed by (report index, report value) as computed by _tx_report().
OBSERVED_TX_REPORT_CANDIDATES = {
    (0x0000, 0x0080): {
        "name_candidate": "heartbeat_alive_candidate",
    },
    (0x0007, 0x8000): {
        "name_candidate": "cam_power_button_candidate",
        "state_candidate": "active",
    },
    (0x0015, 0x8000): {
        "name_candidate": "call_button_candidate",
        "state_candidate": "active",
    },
    (0x0015, 0x0000): {
        "name_candidate": "call_button_candidate",
        "state_candidate": "inactive",
    },
}


@dataclass(frozen=True)
class ByteEvent:
    """One captured byte plus an optional direction hint ("rx" or "tx")."""

    value: int
    direction_hint: str | None = None


def checksum_for(frame_prefix: Iterable[int]) -> int:
    """Return CHECKSUM_SEED XOR-ed with the low 8 bits of every given byte."""
    value = CHECKSUM_SEED
    for byte in frame_prefix:
        value ^= byte & 0xFF
    return value & 0xFF


def decode_trace(
    data: bytes | Iterable[int | ByteEvent],
    *,
    direction: str = "auto",
    semantics_path: str | Path | None = None,
) -> dict[str, Any]:
    """Decode a byte capture into a report dictionary.

    Args:
        data: Raw bytes, or an iterable of ints and/or ``ByteEvent`` objects.
        direction: "rx" or "tx" forces that direction for every frame; "auto"
            uses the per-byte direction hints (see ``_frame_direction``).
        semantics_path: Decompiler JSON to load; see ``load_semantics``.

    Returns:
        A dictionary with the checksum model, semantics summary, one entry per
        complete frame under "frames", and any leftover bytes that do not fill
        a frame under "trailing_bytes".

    Raises:
        ValueError: If ``direction`` is not in ``VALID_DIRECTIONS`` or a byte
            value lies outside 0..0xFF.
    """
    if direction not in VALID_DIRECTIONS:
        raise ValueError(f"direction must be one of {sorted(VALID_DIRECTIONS)}")

    events = _byte_events(data)
    semantics = load_semantics(semantics_path)
    frames: list[dict[str, Any]] = []
    # Last checksum-valid frame seen per direction, used for cmd 0x07 annotations.
    previous_valid: dict[str, dict[str, Any] | None] = {"rx": None, "tx": None}
    complete_len = (len(events) // FRAME_LENGTH) * FRAME_LENGTH

    for frame_index, offset in enumerate(range(0, complete_len, FRAME_LENGTH)):
        chunk = events[offset : offset + FRAME_LENGTH]
        resolved_direction = _frame_direction(chunk, direction)
        frame = _decode_frame(
            [event.value for event in chunk],
            frame_index=frame_index,
            byte_offset=offset,
            direction=resolved_direction,
            semantics=semantics,
            previous_valid=previous_valid,
        )
        frames.append(frame)
        # Updated only after decoding, so a frame never compares against itself.
        if frame["checksum"]["valid"] and resolved_direction in previous_valid:
            previous_valid[resolved_direction] = frame

    trailing = [event.value for event in events[complete_len:]]
    return {
        "kind": "h8536_protocol_trace",
        "frame_length": FRAME_LENGTH,
        "checksum_model": {
            "algorithm": "xor",
            "seed": CHECKSUM_SEED,
            "seed_hex": _h8(CHECKSUM_SEED),
            "covered_offsets": [0, 1, 2, 3, 4],
            "checksum_offset": 5,
        },
        "direction_mode": direction,
        "semantics": {
            "loaded": semantics["loaded"],
            "path": str(semantics["path"]) if semantics["path"] else None,
            "command_effect_count": len(semantics["command_effects"]),
            "response_schema_count": len(semantics["response_schemas"]),
            "caveat": (
                "Semantic names are evidence-backed candidates imported from decompiler output; "
                "trace decoding does not make them protocol facts."
            ),
        },
        "frames": frames,
        "trailing_bytes": [_h8(byte) for byte in trailing],
        "trailing_byte_count": len(trailing),
    }


def parse_byte_text(text: str, *, direction_hint: str | None = None) -> list[ByteEvent]:
    """Parse textual byte tokens into ``ByteEvent`` objects.

    Everything after ``#`` on a line is ignored.  A line may start with
    ``rx:`` or ``tx:`` (case-insensitive) to set the hint for that line;
    otherwise ``direction_hint`` applies.  Tokens are separated by whitespace,
    commas or semicolons.

    Raises:
        ValueError: If a token is not hexadecimal or exceeds one byte.
    """
    events: list[ByteEvent] = []
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].strip()
        if not line:
            continue
        line_direction = direction_hint
        lowered = line.lower()
        for prefix in ("rx:", "tx:"):
            if lowered.startswith(prefix):
                line_direction = prefix[:2]
                line = line[len(prefix) :].strip()
                break
        for token in _tokens(line):
            events.extend(_events_from_token(token, line_direction))
    return events


def load_semantics(path: str | Path | None = None) -> dict[str, Any]:
    """Load command/response semantics from a decompiler JSON file.

    Args:
        path: File to read; defaults to ``build/rom_decompiled.json``.

    Returns:
        A dictionary with "loaded", "path", "command_effects" (keyed by integer
        command value) and "response_schemas".  When the file is missing,
        unreadable, not valid JSON, not a JSON object, or lacks a
        "serial_protocol"/"serial_semantics" object, "loaded" is False and the
        collections are empty.
    """
    candidate = Path(path) if path else Path("build") / "rom_decompiled.json"
    if not candidate.exists():
        return _empty_semantics(candidate)
    try:
        with candidate.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError alike.
        return _empty_semantics(candidate)
    if not isinstance(payload, Mapping):
        # A top-level list/number/string has no .get(); treat it as "no semantics".
        return _empty_semantics(candidate)

    serial = payload.get("serial_protocol")
    if not isinstance(serial, Mapping):
        serial = payload.get("serial_semantics")
    if not isinstance(serial, Mapping):
        return _empty_semantics(candidate)

    protocol = _first_protocol(serial)
    command_effects = _mapping_by_command(
        _list_value(protocol.get("command_effects"))
        or _list_value(serial.get("command_effects"))
    )
    response_schemas = _list_value(protocol.get("response_schema")) or _list_value(
        serial.get("response_schema")
    )
    return {
        "loaded": True,
        "path": candidate,
        "command_effects": command_effects,
        "response_schemas": response_schemas,
    }


def format_text_report(decoded: Mapping[str, Any]) -> str:
    """Render the dictionary produced by ``decode_trace`` as plain text.

    The result has a two-line header, one line per frame and one extra line
    per stateful annotation; lines are joined with newlines and there is no
    trailing newline.
    """
    frames = decoded.get("frames", [])
    semantics_state = (
        "loaded" if decoded.get("semantics", {}).get("loaded") else "not-loaded"
    )
    lines = [
        "H8/536 protocol trace",
        (
            f"frames={len(frames)} "
            f"trailing={decoded.get('trailing_byte_count', 0)} "
            f"semantics={semantics_state}"
        ),
    ]
    for frame in frames:
        checksum = frame["checksum"]
        status = "ok" if checksum["valid"] else f"bad expected {checksum['expected_hex']}"
        direction = frame.get("direction") or "unknown"
        prefix = (
            f"[{frame['frame_index']:04d}] {direction:<7} off={frame['byte_offset']:04d} "
            f"bytes={' '.join(frame['bytes_hex'])} checksum={status} "
        )
        if direction == "tx":
            report = frame["report"]
            candidate = report.get("observed_candidate")
            suffix = ""
            if candidate:
                name = candidate.get("name_candidate")
                state = candidate.get("state_candidate")
                suffix = f" observed_candidate={name}" if name else " observed_candidate"
                if state:
                    suffix += f" state={state}"
            lines.append(
                f"{prefix}report_index={report['index_hex']} "
                f"value={report['value_hex']}{suffix}"
            )
        else:
            command = frame["command"]
            name = command.get("name_candidate")
            suffix = f" {name}" if name else ""
            lines.append(
                f"{prefix}cmd={command['value_hex']}{suffix} "
                f"index={frame['index']['combined']} "
                f"value={frame['payload_value']['word_be_hex']}"
            )
        for annotation in frame.get("stateful_annotations", []):
            lines.append(f" candidate: {annotation['kind']} - {annotation['summary']}")
    return "\n".join(lines)


def main(
    argv: list[str] | None = None,
    *,
    stdin: TextIO | None = None,
    stdout: TextIO | None = None,
) -> int:
    """Command-line entry point; returns the process exit status (always 0).

    Bytes come from ``--input`` (a file, or ``-`` for stdin) and/or positional
    tokens.  When neither yields any byte and no ``--input`` was given, stdin
    is read.  ``stdin``/``stdout`` default to the ``sys`` streams.
    """
    parser = argparse.ArgumentParser(
        description="Decode H8/536 serial byte captures into 6-byte protocol frames."
    )
    parser.add_argument(
        "bytes",
        nargs="*",
        help="Byte tokens, e.g. 00 01 02 03 04 5E or rx:00010203045E",
    )
    parser.add_argument(
        "-i", "--input", help="Input file. Use '-' or omit byte args to read stdin."
    )
    parser.add_argument("--direction", choices=sorted(VALID_DIRECTIONS), default="auto")
    parser.add_argument("--json", action="store_true", help="Emit JSON instead of text.")
    parser.add_argument(
        "--semantics",
        default=None,
        help="Decompiler JSON path. Defaults to build/rom_decompiled.json when present.",
    )
    args = parser.parse_args(argv)

    stdin = stdin or sys.stdin
    stdout = stdout or sys.stdout

    events: list[ByteEvent] = []
    if args.input:
        if args.input == "-":
            events.extend(parse_byte_text(stdin.read()))
        else:
            events.extend(parse_byte_text(Path(args.input).read_text(encoding="utf-8")))
    if args.bytes:
        events.extend(parse_byte_text(" ".join(args.bytes)))
    if not events and not args.input:
        events.extend(parse_byte_text(stdin.read()))

    decoded = decode_trace(events, direction=args.direction, semantics_path=args.semantics)
    if args.json:
        json.dump(decoded, stdout, indent=2, sort_keys=True)
        stdout.write("\n")
    else:
        stdout.write(format_text_report(decoded))
        stdout.write("\n")
    return 0


def _decode_frame(
    frame: list[int],
    *,
    frame_index: int,
    byte_offset: int,
    direction: str | None,
    semantics: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> dict[str, Any]:
    """Decode one FRAME_LENGTH-byte frame into its report dictionary.

    The command is the low three bits of byte 0.  Frames with direction "tx"
    carry a "report" entry and no command name/effect/schema candidates.
    """
    expected = checksum_for(frame[:5])
    actual = frame[5]
    command_value = frame[0] & 0x07
    command_effect = semantics["command_effects"].get(command_value, {})
    is_tx_report = direction == "tx"
    byte1_low3 = frame[1] & 0x07
    combined_index = (byte1_low3 << 8) | frame[2]
    word_be = (frame[3] << 8) | frame[4]
    word_le = (frame[4] << 8) | frame[3]

    decoded = {
        "frame_index": frame_index,
        "byte_offset": byte_offset,
        "direction": direction,
        "bytes": frame,
        "bytes_hex": [_h8(byte) for byte in frame],
        "checksum": {
            "algorithm": "xor",
            "seed": CHECKSUM_SEED,
            "expected": expected,
            "expected_hex": _h8(expected),
            "actual": actual,
            "actual_hex": _h8(actual),
            "valid": expected == actual,
        },
        "command": {
            "applicable": not is_tx_report,
            "source_byte": frame[0],
            "source_byte_hex": _h8(frame[0]),
            "mask": 0x07,
            "value": command_value,
            "value_hex": _h8(command_value),
            "name_candidate": None if is_tx_report else command_effect.get("name_candidate"),
            # An empty effect mapping is reported as None rather than {}.
            "effect_candidate": None if is_tx_report else (command_effect or None),
            "caveat": (
                "TX frames are decoded as report frames; byte0 is not treated as a command."
                if is_tx_report
                else None
            ),
        },
        "index": {
            "byte1": frame[1],
            "byte1_hex": _h8(frame[1]),
            "byte1_low3": byte1_low3,
            "byte1_low3_hex": _h8(byte1_low3),
            "byte2": frame[2],
            "byte2_hex": _h8(frame[2]),
            "combined": combined_index,
            "combined_hex": _h16(combined_index),
        },
        "payload_value": {
            "byte3": frame[3],
            "byte3_hex": _h8(frame[3]),
            "byte4": frame[4],
            "byte4_hex": _h8(frame[4]),
            "word_be": word_be,
            "word_be_hex": _h16(word_be),
            "word_le": word_le,
            "word_le_hex": _h16(word_le),
        },
        "report": _tx_report(frame) if is_tx_report else None,
        "response_schema_candidates": (
            [] if is_tx_report else _response_schema_candidates(semantics, command_value)
        ),
        "stateful_annotations": [],
    }
    decoded["stateful_annotations"] = _stateful_annotations(decoded, previous_valid)
    return decoded


def _tx_report(frame: list[int]) -> dict[str, Any]:
    """Decode a TX frame as a report: bytes 0-2 form the index, bytes 3-4 the value."""
    index = (frame[0] << 16) | (frame[1] << 8) | frame[2]
    value = (frame[3] << 8) | frame[4]
    candidate = OBSERVED_TX_REPORT_CANDIDATES.get((index, value))
    return {
        "encoding": "observed_tx_index_value_report_candidate",
        "confidence": "observed_candidate",
        "index_source_offsets": [0, 1, 2],
        "index": index,
        # Six hex digits only when the index does not fit in 16 bits.
        "index_hex": f"0x{index:06X}" if index > 0xFFFF else _h16(index),
        "index_bytes_hex": [_h8(frame[0]), _h8(frame[1]), _h8(frame[2])],
        "value_source_offsets": [3, 4],
        "value": value,
        "value_hex": _h16(value),
        # Copied so callers cannot mutate the module-level table.
        "observed_candidate": dict(candidate) if candidate else None,
        "caveat": "TX report names are capture-observed candidates, not ROM-derived protocol facts.",
    }


def _stateful_annotations(
    frame: Mapping[str, Any],
    previous_valid: Mapping[str, dict[str, Any] | None],
) -> list[dict[str, Any]]:
    """Return annotations that depend on earlier frames.

    Only non-TX frames whose command value is 0x07 are annotated.  The single
    annotation records the previous checksum-valid frame in the same and in
    the opposite direction and notes when its bytes equal this frame's bytes.
    """
    direction = frame.get("direction")
    if direction == "tx":
        return []
    if frame["command"]["value"] != 0x07:
        return []

    same = previous_valid.get(direction)
    # TX frames returned above, so the only known opposite here is "tx" for "rx".
    opposite = previous_valid.get("tx") if direction == "rx" else None

    annotation = {
        "kind": "retransmit_or_error_candidate",
        "confidence": "candidate",
        "summary": "cmd 0x07 is associated with retry/error handling in decompiler semantics.",
        "evidence": ["command_low3 == 0x07"],
        "previous_valid_same_direction": _previous_summary(same),
        "previous_valid_opposite_direction": _previous_summary(opposite),
    }
    if same and same.get("bytes") == frame.get("bytes"):
        annotation["evidence"].append("matches previous valid frame in same direction")
    if opposite and opposite.get("bytes") == frame.get("bytes"):
        annotation["evidence"].append("matches previous valid frame in opposite direction")
    return [annotation]


def _previous_summary(frame: Mapping[str, Any] | None) -> dict[str, Any] | None:
    """Return a compact summary of a decoded frame, or None when there is none."""
    if not frame:
        return None
    return {
        "frame_index": frame["frame_index"],
        "direction": frame.get("direction"),
        "bytes_hex": frame["bytes_hex"],
        "command": frame["command"]["value_hex"],
        "checksum_valid": frame["checksum"]["valid"],
    }


def _byte_events(data: bytes | Iterable[int | ByteEvent]) -> list[ByteEvent]:
    """Normalise the accepted input forms into a list of ``ByteEvent``.

    Raises:
        ValueError: If any value, including one carried by a ``ByteEvent``,
            lies outside 0..0xFF.
    """
    if isinstance(data, bytes):
        return [ByteEvent(byte) for byte in data]
    events: list[ByteEvent] = []
    for item in data:
        event = item if isinstance(item, ByteEvent) else ByteEvent(int(item))
        if not 0 <= event.value <= 0xFF:
            raise ValueError(f"byte out of range: {event.value}")
        events.append(event)
    return events


def _frame_direction(chunk: list[ByteEvent], mode: str) -> str | None:
    """Resolve a frame's direction.

    A mode of "rx" or "tx" wins outright.  Otherwise the direction is the one
    hint shared by the chunk's hinted bytes, or None when there is no hint or
    the hints disagree.
    """
    if mode in {"rx", "tx"}:
        return mode
    hints = {
        event.direction_hint for event in chunk if event.direction_hint in {"rx", "tx"}
    }
    if len(hints) == 1:
        return next(iter(hints))
    return None


def _tokens(text: str) -> list[str]:
    """Split text on whitespace, commas and semicolons, dropping empty pieces."""
    return [token for token in text.replace(",", " ").replace(";", " ").split() if token]


def _events_from_token(token: str, direction_hint: str | None) -> list[ByteEvent]:
    """Convert one token into byte events.

    Accepted forms: an ``rx:``/``tx:`` prefix (overrides ``direction_hint``),
    a single hexadecimal byte with optional ``0x`` or ``H'`` prefix, or an
    unprefixed even-length run of more than two hex digits, which is split
    into consecutive bytes.

    Raises:
        ValueError: If the token is not hexadecimal or exceeds one byte.
    """
    lowered = token.lower()
    for prefix in ("rx:", "tx:"):
        if lowered.startswith(prefix):
            return _events_from_token(token[len(prefix) :], prefix[:2])

    value_text = token.strip()
    if value_text.upper().startswith("H'"):
        value_text = "0x" + value_text[2:]

    if (
        not value_text.lower().startswith("0x")
        and len(value_text) > 2
        and len(value_text) % 2 == 0
        and all(char in "0123456789abcdefABCDEF" for char in value_text)
    ):
        return [
            ByteEvent(int(value_text[index : index + 2], 16), direction_hint)
            for index in range(0, len(value_text), 2)
        ]

    # int(..., 16) accepts the text with or without a "0x" prefix.
    try:
        value = int(value_text, 16)
    except ValueError as err:
        raise ValueError(f"invalid byte token: {token!r}") from err
    if not 0 <= value <= 0xFF:
        raise ValueError(f"byte out of range: {token}")
    return [ByteEvent(value, direction_hint)]


def _empty_semantics(path: Path) -> dict[str, Any]:
    """Return the "nothing loaded" semantics record for ``path``."""
    return {"loaded": False, "path": path, "command_effects": {}, "response_schemas": []}


def _first_protocol(serial: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return the first mapping in ``serial["protocol_semantics"]``, else ``serial``."""
    protocols = serial.get("protocol_semantics")
    if isinstance(protocols, list):
        for protocol in protocols:
            if isinstance(protocol, Mapping):
                return protocol
    return serial


def _list_value(value: Any) -> list[Any]:
    """Return ``value`` when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _mapping_by_command(items: list[Any]) -> dict[int, Mapping[str, Any]]:
    """Index mapping items by their integer "command_value" (or "command").

    Items that are not mappings or have no integer key are skipped; a later
    item replaces an earlier one with the same key.
    """
    output: dict[int, Mapping[str, Any]] = {}
    for item in items:
        if not isinstance(item, Mapping):
            continue
        value = item.get("command_value", item.get("command"))
        if isinstance(value, int):
            output[value] = item
    return output


def _response_schema_candidates(
    semantics: Mapping[str, Any], command: int
) -> list[Mapping[str, Any]]:
    """Return summaries of response schemas whose constants include ``command``."""
    matches: list[Mapping[str, Any]] = []
    for schema in semantics.get("response_schemas", []):
        if not isinstance(schema, Mapping):
            continue
        if command in _schema_constants(schema):
            matches.append(
                {
                    "response_id": schema.get("response_id"),
                    "call_address_hex": schema.get("call_address_hex"),
                    "matched_command_byte_candidate": _h8(command),
                    "caveat": "Matched schema constants are candidates from decompiler output.",
                }
            )
    return matches


def _schema_constants(value: Any) -> set[int]:
    """Collect candidate constants from a nested structure of mappings and lists.

    Every integer stored under "value", "constant" or "constant_value" is
    included after masking to its low three bits.
    """
    constants: set[int] = set()
    if isinstance(value, Mapping):
        for key, item in value.items():
            if key in {"value", "constant", "constant_value"} and isinstance(item, int):
                constants.add(item & 0x07)
            constants.update(_schema_constants(item))
    elif isinstance(value, list):
        for item in value:
            constants.update(_schema_constants(item))
    return constants


def _h8(value: int) -> str:
    """Format the low 8 bits of ``value`` as ``0xNN``."""
    return f"0x{value & 0xFF:02X}"


def _h16(value: int) -> str:
    """Format the low 16 bits of ``value`` as ``0xNNNN``."""
    return f"0x{value & 0xFFFF:04X}"


__all__ = [
    "ByteEvent",
    "checksum_for",
    "decode_trace",
    "format_text_report",
    "load_semantics",
    "main",
    "parse_byte_text",
]