emualtor working
This commit is contained in:
211
h8536/consistency.py
Normal file
211
h8536/consistency.py
Normal file
@@ -0,0 +1,211 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any, Mapping
|
||||
|
||||
|
||||
JsonObject = dict[str, Any]
|
||||
|
||||
|
||||
def analyze_decompiler_consistency(payload: Mapping[str, Any]) -> JsonObject:
|
||||
"""Flag decompiler/pseudocode semantic cases that are easy to misread."""
|
||||
width_checks = [
|
||||
_byte_immediate_word_write_check(ins)
|
||||
for ins in _instruction_sequence(payload.get("instructions"))
|
||||
if is_byte_immediate_to_word_destination(ins)
|
||||
]
|
||||
width_checks = [check for check in width_checks if check]
|
||||
return {
|
||||
"kind": "decompiler_pseudocode_consistency",
|
||||
"summary": _summary(width_checks),
|
||||
"checks": width_checks,
|
||||
}
|
||||
|
||||
|
||||
def format_consistency_report(analysis: Mapping[str, Any]) -> str:
|
||||
lines = [
|
||||
"Decompiler/Pseudocode Consistency",
|
||||
str(analysis.get("summary") or "No checks emitted."),
|
||||
"",
|
||||
]
|
||||
checks = analysis.get("checks")
|
||||
if not isinstance(checks, list) or not checks:
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
for check in checks:
|
||||
if not isinstance(check, Mapping):
|
||||
continue
|
||||
lines.append(
|
||||
f"- {check.get('address_hex', '?')}: {check.get('instruction', '')} "
|
||||
f"[{check.get('status', 'info')}]",
|
||||
)
|
||||
summary = check.get("summary")
|
||||
if summary:
|
||||
lines.append(f" {summary}")
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
|
||||
def is_byte_immediate_to_word_destination(instruction: Mapping[str, Any]) -> bool:
|
||||
mnemonic = str(instruction.get("mnemonic") or "")
|
||||
if _mnemonic_base(mnemonic) not in {"MOV:G", "MOV"} or _mnemonic_size(mnemonic) != "W":
|
||||
return False
|
||||
operands = _split_operands(str(instruction.get("operands") or ""))
|
||||
if len(operands) != 2:
|
||||
return False
|
||||
source = operands[0].strip()
|
||||
if not source.startswith("#"):
|
||||
return False
|
||||
literal = _immediate_literal_text(source[1:])
|
||||
return literal is not None and len(literal) <= 2
|
||||
|
||||
|
||||
def _byte_immediate_word_write_check(instruction: Mapping[str, Any]) -> JsonObject:
|
||||
address = int(instruction.get("address") or 0)
|
||||
immediate = _immediate_value(_split_operands(str(instruction.get("operands") or ""))[0])
|
||||
value_text = f"0x{immediate:04X}" if immediate is not None else "zero-extended byte"
|
||||
return {
|
||||
"kind": "byte_immediate_to_word_destination",
|
||||
"status": "requires_zero_extend8_to16_pseudocode",
|
||||
"address": address,
|
||||
"address_hex": _h16(address),
|
||||
"instruction": str(instruction.get("text") or _instruction_text(instruction)),
|
||||
"expected_pseudocode_hint": "zero_extend8_to16",
|
||||
"zero_extended_value_hex": value_text,
|
||||
"summary": (
|
||||
"Word-sized MOV with an 8-bit immediate writes a zero-extended word. "
|
||||
"Pseudocode should not model this as a one-byte write or preserve the old low byte."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def _summary(width_checks: list[JsonObject]) -> str:
|
||||
if not width_checks:
|
||||
return "No byte-immediate-to-word destination cases found."
|
||||
return (
|
||||
f"{len(width_checks)} byte-immediate-to-word destination case(s) require "
|
||||
"explicit zero-extension in pseudocode."
|
||||
)
|
||||
|
||||
|
||||
def _instruction_sequence(value: object) -> list[Mapping[str, Any]]:
|
||||
if not isinstance(value, list):
|
||||
return []
|
||||
instructions = [item for item in value if isinstance(item, Mapping)]
|
||||
return sorted(instructions, key=lambda item: int(item.get("address") or 0))
|
||||
|
||||
|
||||
def _split_operands(operands: str) -> list[str]:
|
||||
if not operands:
|
||||
return []
|
||||
parts: list[str] = []
|
||||
start = 0
|
||||
depth = 0
|
||||
for idx, char in enumerate(operands):
|
||||
if char in "({":
|
||||
depth += 1
|
||||
elif char in ")}" and depth:
|
||||
depth -= 1
|
||||
elif char == "," and depth == 0:
|
||||
parts.append(operands[start:idx].strip())
|
||||
start = idx + 1
|
||||
parts.append(operands[start:].strip())
|
||||
return [part for part in parts if part]
|
||||
|
||||
|
||||
def _immediate_literal_text(text: str) -> str | None:
|
||||
stripped = text.strip()
|
||||
h_match = re.fullmatch(r"H'([0-9A-Fa-f]+)", stripped)
|
||||
if h_match:
|
||||
return h_match.group(1)
|
||||
x_match = re.fullmatch(r"0x([0-9A-Fa-f]+)", stripped)
|
||||
if x_match:
|
||||
return x_match.group(1)
|
||||
decimal_match = re.fullmatch(r"\d+", stripped)
|
||||
if decimal_match:
|
||||
value = int(stripped, 10)
|
||||
if 0 <= value <= 0xFF:
|
||||
return f"{value:02X}"
|
||||
return None
|
||||
|
||||
|
||||
def _immediate_value(operand: str) -> int | None:
|
||||
stripped = operand.strip()
|
||||
if stripped.startswith("#"):
|
||||
stripped = stripped[1:].strip()
|
||||
literal = _immediate_literal_text(stripped)
|
||||
if literal is None:
|
||||
return None
|
||||
return int(literal, 16)
|
||||
|
||||
|
||||
def _instruction_text(instruction: Mapping[str, Any]) -> str:
|
||||
mnemonic = str(instruction.get("mnemonic") or "")
|
||||
operands = str(instruction.get("operands") or "")
|
||||
return f"{mnemonic} {operands}".strip()
|
||||
|
||||
|
||||
def _mnemonic_base(mnemonic: str) -> str:
|
||||
return mnemonic.rsplit(".", 1)[0] if "." in mnemonic else mnemonic
|
||||
|
||||
|
||||
def _mnemonic_size(mnemonic: str) -> str:
|
||||
suffix = mnemonic.rsplit(".", 1)[-1] if "." in mnemonic else ""
|
||||
return suffix if suffix in {"B", "W"} else ""
|
||||
|
||||
|
||||
def _h16(value: int) -> str:
|
||||
return f"H'{value & 0xFFFF:04X}"
|
||||
|
||||
|
||||
def load_consistency_input(path: Path) -> JsonObject:
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
payload = json.load(handle)
|
||||
if not isinstance(payload, dict) or "instructions" not in payload:
|
||||
raise ValueError(f"{path} does not look like h8536_decompiler JSON output")
|
||||
return payload
|
||||
|
||||
|
||||
def write_consistency_report(input_path: Path, output_path: Path, *, json_output: bool = False) -> None:
|
||||
analysis = analyze_decompiler_consistency(load_consistency_input(input_path))
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
if json_output:
|
||||
output_path.write_text(json.dumps(analysis, indent=2), encoding="utf-8")
|
||||
else:
|
||||
output_path.write_text(format_consistency_report(analysis), encoding="utf-8")
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Report decompiler/pseudocode semantic consistency checks.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"input",
|
||||
nargs="?",
|
||||
type=Path,
|
||||
default=Path("build/rom_decompiled.json"),
|
||||
help="structured JSON emitted by h8536_decompiler.py",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--out",
|
||||
type=Path,
|
||||
default=Path("build/rom_consistency.txt"),
|
||||
help="consistency report output path",
|
||||
)
|
||||
parser.add_argument("--json", action="store_true", help="write JSON instead of text")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
write_consistency_report(args.input, args.out, json_output=args.json)
|
||||
print(f"wrote {args.out}")
|
||||
return 0
|
||||
|
||||
|
||||
__all__ = [
|
||||
"analyze_decompiler_consistency",
|
||||
"format_consistency_report",
|
||||
"is_byte_immediate_to_word_destination",
|
||||
"load_consistency_input",
|
||||
"main",
|
||||
"write_consistency_report",
|
||||
]
|
||||
@@ -829,7 +829,7 @@ def _report_gate_decision(pc: int, *, f9c4: int, faa5: int, f9c3: int, head: int
|
||||
if pc == 0x4063:
|
||||
return "return_queue_not_empty" if not z else "enqueue_zero_report"
|
||||
if pc == 0x4067:
|
||||
return "write_report_00ff_to_queue_slot"
|
||||
return "write_report_0000_to_queue_slot"
|
||||
if pc == 0x406C:
|
||||
return "advance_report_queue_head"
|
||||
if pc == 0x4070:
|
||||
@@ -1197,7 +1197,7 @@ def build_arg_parser() -> argparse.ArgumentParser:
|
||||
action="append",
|
||||
type=parse_watch_pc,
|
||||
default=[],
|
||||
help="highlight queue traces involving a logical report id, e.g. 00FF or 0x00FF",
|
||||
help="highlight queue traces involving a logical report id, e.g. 0015 or 0x0015",
|
||||
)
|
||||
parser.add_argument("--report-queue-watch-hit-limit", type=int, default=32)
|
||||
parser.add_argument(
|
||||
|
||||
@@ -207,8 +207,9 @@ class H8536Emulator:
|
||||
|
||||
if op in (0x06, 0x07):
|
||||
value = raw[-1] if op == 0x06 else int.from_bytes(raw[-2:], "big")
|
||||
self._write_ea(ea, value, 1 if op == 0x06 else 2)
|
||||
self._set_logic_flags(value, 1 if op == 0x06 else 2)
|
||||
write_size = size if op == 0x06 else 2
|
||||
self._write_ea(ea, value, write_size)
|
||||
self._set_logic_flags(value, write_size)
|
||||
return next_pc
|
||||
|
||||
base = op & 0xF8
|
||||
|
||||
@@ -7,6 +7,8 @@ from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .consistency import is_byte_immediate_to_word_destination
|
||||
|
||||
|
||||
JsonObject = dict[str, Any]
|
||||
|
||||
@@ -195,6 +197,8 @@ def _file_header(source_name: str, payload: JsonObject) -> list[str]:
|
||||
"u8 CCR, BR, EP, DP, TP;",
|
||||
"int C, Z, N, V;",
|
||||
"",
|
||||
"static inline u16 zero_extend8_to16(u8 value) { return (u16)value; }",
|
||||
"",
|
||||
]
|
||||
|
||||
|
||||
@@ -648,6 +652,8 @@ def _translate_instruction(ins: JsonObject, labels: dict[int, str]) -> str:
|
||||
if base in {"MOV", "MOV:G", "MOV:I", "MOV:E", "MOV:L", "MOV:S", "MOV:F"} and len(ops) == 2:
|
||||
source = _format_operand(ops[0], size)
|
||||
dest = _format_operand(ops[1], size, lvalue=True)
|
||||
if is_byte_immediate_to_word_destination(ins):
|
||||
return f"{dest} = zero_extend8_to16({source});"
|
||||
return f"{dest} = {_cast(source, size)};"
|
||||
|
||||
if base in {"MOVFPE"} and len(ops) == 2:
|
||||
@@ -908,6 +914,9 @@ def _metadata_comments(ins: JsonObject) -> list[str]:
|
||||
if isinstance(item, dict) and item.get("comment"):
|
||||
comments.append(str(item["comment"]))
|
||||
|
||||
if is_byte_immediate_to_word_destination(ins):
|
||||
comments.append("byte immediate zero-extended into word destination")
|
||||
|
||||
board_profile = ins.get("board_profile")
|
||||
if isinstance(board_profile, dict) and board_profile.get("comment"):
|
||||
comments.append(str(board_profile["comment"]))
|
||||
|
||||
@@ -4,6 +4,7 @@ import json
|
||||
from pathlib import Path
|
||||
|
||||
from .board_profile import board_comment_for_instruction, board_json_payload, board_metadata_for_instruction
|
||||
from .consistency import analyze_decompiler_consistency
|
||||
from .cycles import cycle_comment
|
||||
from .dataflow import state_for_instruction
|
||||
from .dtc import DtcEndpointInfo, DtcRegisterInfo
|
||||
@@ -491,6 +492,7 @@ def write_json(
|
||||
for ins in (instructions[addr] for addr in sorted(instructions))
|
||||
],
|
||||
}
|
||||
payload["decompiler_consistency"] = analyze_decompiler_consistency(payload)
|
||||
payload["serial_semantics"] = analyze_serial_semantics(payload)
|
||||
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
||||
|
||||
|
||||
@@ -316,7 +316,7 @@ def _idle_heartbeat_gate(payload: dict[str, Any], by_address: dict[int, JsonObje
|
||||
"summary": (
|
||||
"F9C4 gates the idle/default report enqueue. Reset/init loads H'14, each BA26 send "
|
||||
"reloads H'07, and the FRT2 OCIA handler decrements it; when it reaches zero loc_4046 "
|
||||
"can enqueue H'00FF if the queue is empty and the FAA5/F9C3 RX gate permits it. With "
|
||||
"can enqueue H'0000 if the queue is empty and the FAA5/F9C3 RX gate permits it. With "
|
||||
"FRT2 OCRA H'7A12 and CKS=phi/32, a phi near 10 MHz gives about 0.7s for H'07, matching "
|
||||
"the observed heartbeat cadence."
|
||||
),
|
||||
|
||||
@@ -635,6 +635,20 @@ def _gate_queue_comment_lines(
|
||||
condition = _comment_text(str(predicate.get("condition_candidate") or "condition unknown"))
|
||||
summary = _comment_text(str(predicate.get("summary") or "candidate gate"))
|
||||
lines.append(f"{prefix}- {name}: {condition}; {summary}")
|
||||
enqueued = predicate.get("enqueued_report_candidate_hex")
|
||||
if enqueued:
|
||||
lines.append(f"{prefix} enqueues report {enqueued}")
|
||||
write_semantics = str(predicate.get("write_semantics_candidate") or "").strip()
|
||||
if write_semantics:
|
||||
lines.append(f"{prefix} write semantics: {_comment_text(write_semantics)}")
|
||||
runtime = predicate.get("runtime_trace_confirmation")
|
||||
if isinstance(runtime, dict):
|
||||
frame = runtime.get("emitted_frame_hex")
|
||||
path = " -> ".join(str(item) for item in runtime.get("dequeue_path", []) if item)
|
||||
detail = f"runtime-confirmed frame {frame}" if frame else "runtime-confirmed path"
|
||||
if path:
|
||||
detail += f" via {path}"
|
||||
lines.append(f"{prefix} {detail}")
|
||||
for effect in _object_list(value.get("session_effects")):
|
||||
name = effect.get("name") or "session_effect_candidate"
|
||||
summary = _comment_text(str(effect.get("summary") or "candidate session effect"))
|
||||
@@ -677,6 +691,16 @@ def _gate_queue_predicate_function_lines(value: object) -> list[str]:
|
||||
" return idle_timer_clear && rx_gate_open && queue_empty;",
|
||||
"}",
|
||||
"",
|
||||
"static void sci1_candidate_enqueue_idle_heartbeat_report(void)",
|
||||
"{",
|
||||
" if (!sci1_candidate_idle_heartbeat_enqueue_gate_open()) {",
|
||||
" return;",
|
||||
" }",
|
||||
"",
|
||||
" /* loc_4067 writes MOV:G.W #H'00, so the queue report id is 0x0000. */",
|
||||
" candidate_enqueue_report(0x0000u);",
|
||||
"}",
|
||||
"",
|
||||
"static bool sci1_candidate_periodic_resend_gate_open(void)",
|
||||
"{",
|
||||
" bool pending = (MEM8[0xFAA5u] & MEM8[0xFAA3u] & 0x80u) != 0u;",
|
||||
@@ -711,6 +735,21 @@ def _tx_report_comment_lines(
|
||||
observed.append(f"{name}: {frames}")
|
||||
if observed:
|
||||
lines.append(f"{prefix}- observed overlay candidates: {_comment_text('; '.join(observed))}")
|
||||
for runtime in _object_list(value.get("runtime_confirmed_paths")):
|
||||
name = runtime.get("name") or "runtime_confirmation"
|
||||
frame = runtime.get("emitted_frame_hex") or "frame?"
|
||||
report = runtime.get("report_id_hex") or "report?"
|
||||
summary = f"{name}: report {report} emits {frame}"
|
||||
semantics = runtime.get("queue_write_semantics")
|
||||
if semantics:
|
||||
summary += f"; {semantics}"
|
||||
lines.append(f"{prefix}- runtime confirmation: {_comment_text(summary)}")
|
||||
checks = _object_list(value.get("consistency_checks"))
|
||||
for check in checks:
|
||||
name = check.get("name") or "consistency_check"
|
||||
status = check.get("status") or "info"
|
||||
summary = _comment_text(str(check.get("summary") or ""))
|
||||
lines.append(f"{prefix}- consistency {name}: {status}; {summary}")
|
||||
caveat = str(value.get("observed_autonomous_output_caveat") or value.get("caveat") or "").strip()
|
||||
if caveat:
|
||||
lines.append(f"{prefix}- caveat: {_comment_text(caveat)}")
|
||||
|
||||
@@ -1683,10 +1683,24 @@ def _gate_queue_model(ordered: list[JsonObject], commands: list[JsonObject]) ->
|
||||
),
|
||||
"summary": (
|
||||
"Idle/default report gate; when the FRT2 countdown clears and the queue is "
|
||||
"empty, loc_4046 can enqueue H'00FF for the later loc_BAF2 -> loc_BA26 send path."
|
||||
"empty, loc_4046 can enqueue H'0000 for the later loc_BAF2 -> loc_BA26 send path."
|
||||
),
|
||||
"state_addresses_hex": [_h16(0xF9C4), _h16(0xFAA5), _h16(0xF9C3), _h16(0xF9B0), _h16(0xF9B5)],
|
||||
"enqueued_report_candidate_hex": _h16(0x00FF),
|
||||
"enqueued_report_candidate_hex": _h16(0x0000),
|
||||
"write_semantics_candidate": (
|
||||
"loc_4067 is MOV:G.W #H'00, @(-H'0790,R2): the byte immediate is "
|
||||
"zero-extended by the word destination, so the queue slot becomes H'0000."
|
||||
),
|
||||
"runtime_trace_confirmation": {
|
||||
"source": "h8536_emulator_probe target-frame run",
|
||||
"report_id_hex": _h16(0x0000),
|
||||
"queue_write_address_hex": _h16(IDLE_REPORT_QUEUE_WRITE),
|
||||
"queue_write_semantics": "H'FFFF -> H'0000, not H'00FF",
|
||||
"dequeue_path": ["loc_4046", "loc_BAF2", "loc_BB08", "loc_BB1C", "loc_BB20", "loc_BB2B", "loc_BA26"],
|
||||
"emitted_frame_hex": "00 00 00 00 80 DA",
|
||||
"checksum_seed_hex": _h16(CHECKSUM_SEED, width=2),
|
||||
"checksum_hex": "H'DA",
|
||||
},
|
||||
"evidence_addresses": _addresses_in_ranges(
|
||||
ordered,
|
||||
[(IDLE_REPORT_GATE_ENTRY, IDLE_REPORT_GATE_END)],
|
||||
@@ -1965,6 +1979,27 @@ def _tx_report_model(ordered: list[JsonObject], responses: list[JsonObject]) ->
|
||||
"value_source_candidate": "current_value_table_candidate",
|
||||
"checksum_formula": "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4",
|
||||
"observed_capture_overlay_candidates": OBSERVED_TX_REPORT_OVERLAY,
|
||||
"runtime_confirmed_paths": [
|
||||
{
|
||||
"name": "idle_heartbeat_report_runtime_confirmation",
|
||||
"report_id_hex": _h16(0x0000),
|
||||
"queue_write_address_hex": _h16(IDLE_REPORT_QUEUE_WRITE),
|
||||
"queue_write_semantics": "MOV:G.W #H'00 writes H'0000 to the queue slot",
|
||||
"staging_path": ["loc_4046", "loc_BAF2", "loc_BB08", "loc_BB1C", "loc_BB20", "loc_BB2B", "loc_BA26"],
|
||||
"emitted_frame_hex": "00 00 00 00 80 DA",
|
||||
"checksum_hex": "H'DA",
|
||||
}
|
||||
],
|
||||
"consistency_checks": [
|
||||
{
|
||||
"name": "idle_heartbeat_report_id_width",
|
||||
"status": "pass",
|
||||
"summary": (
|
||||
"Decompiler mnemonic MOV:G.W and emulator execution now agree that the "
|
||||
"H'00 immediate at loc_4067 is zero-extended to report H'0000."
|
||||
),
|
||||
}
|
||||
],
|
||||
"observed_autonomous_output_caveat": (
|
||||
"Real captures supplied so far show only heartbeat/idle, call, and camera-power "
|
||||
"autonomous TX frames. Other panel controls may require a host/device request or "
|
||||
|
||||
Reference in New Issue
Block a user