# Files: h8-536-decoder/tests/test_serial_semantics.py
# Last modified: 2026-05-25 16:32:58 +10:00
# 318 lines, 13 KiB, Python

import unittest
from typing import Any
from h8536.serial_semantics import analyze_serial_semantics
def reference(address: int) -> dict:
    """Return the one-key reference record ``{"address": address}``."""
    record = {"address": address}
    return record
def instruction(
address: int,
mnemonic: str,
operands: str = "",
references: list[int] | None = None,
targets: list[int] | None = None,
) -> dict:
return {
"address": address,
"mnemonic": mnemonic,
"operands": operands,
"references": [reference(item) for item in (references or [])],
"targets": targets or [],
}
def base_payload(instructions: list[dict]) -> dict:
    """Wrap *instructions* in a payload carrying two SCI1 frame candidates.

    The payload's ``"serial_reconstruction"`` section always holds one
    ``candidate_sci1_rx_frame`` entry (validation buffer 0xF860-0xF865) and
    one ``candidate_sci1_tx_frame`` entry (buffer 0xF850-0xF855), both with
    frame length 6 and checksum seed 0x5A. *instructions* is stored under
    ``"instructions"`` without copying.
    """
    rx_frame = {
        "kind": "candidate_sci1_rx_frame",
        "channel": "SCI1",
        "frame_length": 6,
        "validation_buffer_start": 0xF860,
        "validation_buffer_end": 0xF865,
        "checksum_address": 0xF865,
        "checksum_seed": 0x5A,
        "confidence": "high",
    }
    tx_frame = {
        "kind": "candidate_sci1_tx_frame",
        "channel": "SCI1",
        "frame_length": 6,
        "buffer_start": 0xF850,
        "buffer_end": 0xF855,
        "checksum_address": 0xF855,
        "checksum_seed": 0x5A,
        "confidence": "high",
    }
    reconstruction = {"candidates": [rx_frame, tx_frame]}
    return {
        "serial_reconstruction": reconstruction,
        "instructions": instructions,
    }
def only_semantics(testcase: unittest.TestCase, payload: dict) -> dict:
    """Analyze *payload* and return its single protocol-semantics entry.

    Asserts through *testcase* that the analysis result is tagged
    ``"serial_semantics"`` and that its ``"protocol_semantics"`` list has
    exactly one element, then returns that element.
    """
    result = analyze_serial_semantics(payload)
    testcase.assertEqual(result["kind"], "serial_semantics")
    entries = result["protocol_semantics"]
    testcase.assertEqual(len(entries), 1)
    return entries[0]
def semantic_items(value: Any) -> list[Any]:
    """Return the entries of *value* as a list.

    A list is handed back unchanged (the same object), a dict contributes
    its values in iteration order, and any other input yields ``[]``.
    """
    if isinstance(value, dict):
        return list(value.values())
    return value if isinstance(value, list) else []
def command_item(items: list[Any], command: int) -> Any:
    """Return the first dict in *items* tagged with *command*, else ``None``.

    The tag is read from ``"command_value"``, falling back to ``"command"``
    when that key is absent. An entry matches when its tag equals *command*,
    or when the tag is a string whose lowercase form equals
    ``f"0x{command:02x}"``. Entries that are not dicts are skipped.
    """
    for entry in items:
        if not isinstance(entry, dict):
            continue
        tag = entry.get("command_value", entry.get("command"))
        # The hex rendering is only built once a string tag has failed the
        # direct comparison.
        if tag == command or (
            isinstance(tag, str) and tag.lower() == f"0x{command:02x}"
        ):
            return entry
    return None
def semantic_text(value: Any) -> str:
    """Flatten *value* into one lowercase, space-separated string.

    Each dict entry contributes ``"<key> <flattened value>"``, each list
    element contributes its own flattened text, and any other value is
    rendered with ``str``. The result is lowercased at every level.
    """
    if isinstance(value, dict):
        fragments = [
            f"{name} {semantic_text(child)}" for name, child in value.items()
        ]
    elif isinstance(value, list):
        fragments = [semantic_text(child) for child in value]
    else:
        return str(value).lower()
    return " ".join(fragments).lower()
def planned_semantics_payload() -> dict:
    """Return the shared payload used by the ``test_planned_*`` cases.

    The instruction listing is assembled from six address-ordered groups: a
    compare-and-branch sequence at 0xBF00 and the code at 0xC000, 0xC100,
    0xC600, 0xC700 and 0xC800. The combined listing is wrapped with
    ``base_payload``.
    """
    # Loads @H'F860, masks it with #H'07, then compares against 0/1/6/7 and
    # branches to loc_C000 / loc_C100 / loc_C600 / loc_C700.
    dispatch = [
        instruction(0xBF00, "MOV:G.B", "@H'F860, R0", [0xF860]),
        instruction(0xBF04, "AND.B", "#H'07, R0"),
        instruction(0xBF08, "CMP:E.B", "#H'00, R0"),
        instruction(0xBF0C, "BEQ", "loc_C000", targets=[0xC000]),
        instruction(0xBF10, "CMP:E.B", "#H'01, R0"),
        instruction(0xBF14, "BEQ", "loc_C100", targets=[0xC100]),
        instruction(0xBF18, "CMP:E.B", "#H'06, R0"),
        instruction(0xBF1C, "BEQ", "loc_C600", targets=[0xC600]),
        instruction(0xBF20, "CMP:E.B", "#H'07, R0"),
        instruction(0xBF24, "BEQ", "loc_C700", targets=[0xC700]),
    ]

    # loc_C000: stores the word read from @H'F863 to @H'F900 and @H'F920,
    # writes #H'01 to @H'FAA2, fills @H'F850..@H'F852 and calls loc_BA26.
    at_c000 = [
        instruction(0xC000, "MOV:G.B", "@H'F861, R1", [0xF861]),
        instruction(0xC004, "MOV:G.B", "@H'F862, R2", [0xF862]),
        instruction(0xC008, "BSR", "loc_622B", targets=[0x622B]),
        instruction(0xC00C, "MOV:G.W", "@H'F863, R3", [0xF863]),
        instruction(0xC010, "MOV:G.W", "R3, @H'F900", [0xF900]),
        instruction(0xC014, "MOV:G.W", "R3, @H'F920", [0xF920]),
        instruction(0xC018, "MOV:G.B", "#H'01, @H'FAA2", [0xFAA2]),
        instruction(0xC01C, "MOV:G.B", "#H'04, @H'F850", [0xF850]),
        instruction(0xC020, "MOV:G.B", "@H'F861, R4", [0xF861]),
        instruction(0xC024, "MOV:G.B", "R4, @H'F851", [0xF851]),
        instruction(0xC028, "MOV:G.B", "@H'F862, R5", [0xF862]),
        instruction(0xC02C, "MOV:G.B", "R5, @H'F852", [0xF852]),
        instruction(0xC030, "BSR", "loc_BA26", targets=[0xBA26]),
    ]

    # loc_C100: reads a word from @H'F900, fills @H'F850..@H'F853, reads
    # @H'F9B5 and calls loc_BA26.
    at_c100 = [
        instruction(0xC100, "MOV:G.B", "@H'F861, R1", [0xF861]),
        instruction(0xC104, "MOV:G.B", "@H'F862, R2", [0xF862]),
        instruction(0xC108, "BSR", "loc_622B", targets=[0x622B]),
        instruction(0xC10C, "MOV:G.W", "@H'F900, R3", [0xF900]),
        instruction(0xC110, "MOV:G.B", "#H'04, @H'F850", [0xF850]),
        instruction(0xC114, "MOV:G.B", "@H'F861, R4", [0xF861]),
        instruction(0xC118, "MOV:G.B", "R4, @H'F851", [0xF851]),
        instruction(0xC11C, "MOV:G.B", "@H'F862, R5", [0xF862]),
        instruction(0xC120, "MOV:G.B", "R5, @H'F852", [0xF852]),
        instruction(0xC124, "MOV:G.W", "R3, @H'F853", [0xF853]),
        instruction(0xC128, "MOV:G.B", "@H'F9B5, R6", [0xF9B5]),
        instruction(0xC12C, "BSR", "loc_BA26", targets=[0xBA26]),
    ]

    # loc_C600: stores the word read from @H'F863 to @H'F940 and writes
    # #H'01 to @H'F980 and @H'F9C0; no loc_BA26 call in this group.
    at_c600 = [
        instruction(0xC600, "MOV:G.B", "@H'F861, R1", [0xF861]),
        instruction(0xC604, "MOV:G.B", "@H'F862, R2", [0xF862]),
        instruction(0xC608, "BSR", "loc_622B", targets=[0x622B]),
        instruction(0xC60C, "MOV:G.W", "@H'F863, R3", [0xF863]),
        instruction(0xC610, "MOV:G.W", "R3, @H'F940", [0xF940]),
        instruction(0xC614, "MOV:G.B", "#H'01, @H'F980", [0xF980]),
        instruction(0xC618, "MOV:G.B", "#H'01, @H'F9C0", [0xF9C0]),
    ]

    # loc_C700: writes #H'07 to @H'F850, copies @H'F861 to @H'F851 and calls
    # loc_BA26.
    at_c700 = [
        instruction(0xC700, "MOV:G.B", "#H'07, @H'F850", [0xF850]),
        instruction(0xC704, "MOV:G.B", "@H'F861, R1", [0xF861]),
        instruction(0xC708, "MOV:G.B", "R1, @H'F851", [0xF851]),
        instruction(0xC70C, "BSR", "loc_BA26", targets=[0xBA26]),
    ]

    # 0xC800: compares @H'F865 with R7 and branches to loc_C700 on mismatch.
    at_c800 = [
        instruction(0xC800, "CMP:E.B", "@H'F865, R7", [0xF865]),
        instruction(0xC804, "BNE", "loc_C700", targets=[0xC700]),
    ]

    listing = [
        *dispatch,
        *at_c000,
        *at_c100,
        *at_c600,
        *at_c700,
        *at_c800,
    ]
    return base_payload(listing)
class SerialSemanticsTest(unittest.TestCase):
    """Checks ``analyze_serial_semantics`` against hand-built listings."""

    def test_detects_low_three_bit_command_dispatch(self):
        # Load from @H'F860, AND with #H'07, then three CMP/BEQ pairs.
        listing = [
            instruction(0xBA80, "MOV:G.B", "@H'F860, R0", [0xF860]),
            instruction(0xBA84, "AND.B", "#H'07, R0"),
            instruction(0xBA88, "CMP:E.B", "#H'00, R0"),
            instruction(0xBA8C, "BEQ", "loc_BAA0", targets=[0xBAA0]),
            instruction(0xBA90, "CMP:E.B", "#H'02, R0"),
            instruction(0xBA94, "BEQ", "loc_BAC0", targets=[0xBAC0]),
            instruction(0xBA98, "CMP:E.B", "#H'07, R0"),
            instruction(0xBA9C, "BEQ", "loc_BAE0", targets=[0xBAE0]),
        ]
        entry = only_semantics(self, base_payload(listing))
        dispatch = entry["command_dispatch"]

        for key, expected in (
            ("source_address", 0xF860),
            ("source_field", "byte0"),
            ("mask", 0x07),
            ("field", "command_low3"),
        ):
            self.assertEqual(dispatch[key], expected)

        # Case order is not pinned; only the (value, target) pairs are.
        observed_cases = {
            (branch["value"], branch["target"]) for branch in dispatch["cases"]
        }
        self.assertEqual(
            observed_cases,
            {(0x00, 0xBAA0), (0x02, 0xBAC0), (0x07, 0xBAE0)},
        )

        for evidence_address in (0xBA80, 0xBA84):
            self.assertIn(evidence_address, dispatch["evidence_addresses"])

    def test_labels_likely_rx_fields_from_validation_buffer_offsets(self):
        # Masked byte load at offset 0, then word loads at offsets 1 and 3.
        listing = [
            instruction(0xBB00, "MOV:G.B", "@H'F860, R0", [0xF860]),
            instruction(0xBB04, "AND.B", "#H'07, R0"),
            instruction(0xBB08, "MOV:G.W", "@H'F861, R1", [0xF861]),
            instruction(0xBB0C, "MOV:G.W", "@H'F863, R2", [0xF863]),
        ]
        entry = only_semantics(self, base_payload(listing))
        by_offset = {item["offset"]: item for item in entry["rx_fields"]}

        first = by_offset[0]
        self.assertEqual(first["name"], "command_low3")
        self.assertEqual(first["address"], 0xF860)
        self.assertEqual(first["mask"], 0x07)

        for offset, label in (
            (1, "likely_id_or_index"),
            (2, "likely_id_or_index"),
            (3, "likely_value"),
            (4, "likely_value"),
        ):
            self.assertEqual(by_offset[offset]["name"], label)

        for offset in (1, 3):
            self.assertIn("candidate", by_offset[offset]["confidence"])

    def test_detects_response_builder_before_serial_send_call(self):
        # Five stores into @H'F850..@H'F854 followed by BSR loc_BA26.
        listing = [
            instruction(0xBC00, "MOV:G.B", "@H'F860, R0", [0xF860]),
            instruction(0xBC04, "MOV:G.B", "R0, @H'F850", [0xF850]),
            instruction(0xBC08, "MOV:G.B", "@H'F861, R1", [0xF861]),
            instruction(0xBC0C, "MOV:G.B", "R1, @H'F851", [0xF851]),
            instruction(0xBC10, "MOV:G.B", "@H'F862, R2", [0xF862]),
            instruction(0xBC14, "MOV:G.B", "R2, @H'F852", [0xF852]),
            instruction(0xBC18, "MOV:G.B", "#H'00, @H'F853", [0xF853]),
            instruction(0xBC1C, "MOV:G.B", "#H'01, @H'F854", [0xF854]),
            instruction(0xBC20, "BSR", "loc_BA26", targets=[0xBA26]),
        ]
        entry = only_semantics(self, base_payload(listing))
        builder = entry["response_builders"][0]

        for key, expected in (
            ("buffer_start", 0xF850),
            ("buffer_end", 0xF854),
            ("send_call_target", 0xBA26),
            ("call_address", 0xBC20),
        ):
            self.assertEqual(builder[key], expected)

        written = [store["address"] for store in builder["writes"]]
        self.assertEqual(written, [0xF850, 0xF851, 0xF852, 0xF853, 0xF854])

    def test_planned_command_effects_include_core_command_behaviors(self):
        entry = only_semantics(self, planned_semantics_payload())
        self.assertIn("command_effects", entry)
        effects = semantic_items(entry["command_effects"])

        # Each command must be present and its flattened text must mention
        # every listed token.
        expectations = (
            (0x00, ("write", "primary_value_table", "current_value_table")),
            (0x01, ("read", "response")),
            (0x06, ("write", "secondary_value_table")),
        )
        for command, tokens in expectations:
            effect = command_item(effects, command)
            self.assertIsNotNone(effect)
            effect_text = semantic_text(effect)
            for token in tokens:
                self.assertIn(token, effect_text)

    def test_planned_response_schema_tracks_immediates_and_rx_copies(self):
        entry = only_semantics(self, planned_semantics_payload())
        self.assertIn("response_schema", entry)
        flattened = semantic_text(entry["response_schema"])
        for token in (
            "tx",
            "byte0",
            "0x04",
            "byte1",
            "rx[1]",
            "byte2",
            "rx[2]",
        ):
            self.assertIn(token, flattened)

    def test_planned_table_map_candidates_name_known_tables(self):
        entry = only_semantics(self, planned_semantics_payload())
        self.assertIn("table_map_candidates", entry)
        flattened = semantic_text(entry["table_map_candidates"])
        for table_name in (
            "primary_value_table",
            "current_value_table",
            "secondary_value_table",
            "flag_table",
        ):
            self.assertIn(table_name, flattened)

    def test_planned_state_variable_candidates_include_known_addresses(self):
        entry = only_semantics(self, planned_semantics_payload())
        self.assertIn("state_variable_candidates", entry)
        flattened = semantic_text(entry["state_variable_candidates"])
        for address_text in ("faa2", "f9b5", "f9c0"):
            self.assertIn(address_text, flattened)

    def test_planned_retry_error_model_identifies_retransmit_and_checksum_error(self):
        entry = only_semantics(self, planned_semantics_payload())
        self.assertIn("retry_error_model", entry)
        flattened = semantic_text(entry["retry_error_model"])
        for token in ("retransmit", "0x07", "checksum_error_response"):
            self.assertIn(token, flattened)

    def test_missing_serial_reconstruction_candidates_emit_no_protocol_semantics(self):
        # Same dispatch-shaped prefix, but with an empty candidate list.
        listing = [
            instruction(0xBA80, "MOV:G.B", "@H'F860, R0", [0xF860]),
            instruction(0xBA84, "AND.B", "#H'07, R0"),
            instruction(0xBA88, "CMP:E.B", "#H'00, R0"),
            instruction(0xBA8C, "BEQ", "loc_BAA0", targets=[0xBAA0]),
        ]
        payload = {
            "serial_reconstruction": {"candidates": []},
            "instructions": listing,
        }
        result = analyze_serial_semantics(payload)
        self.assertEqual(result["kind"], "serial_semantics")
        self.assertEqual(result["protocol_semantics"], [])
# Run the tests in this module through unittest's command-line entry point
# when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()