import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from h8536.serial_pseudocode import (
    SerialPseudocodeOptions,
    generate_serial_pseudocode,
    write_serial_pseudocode,
)


def candidate_payload() -> dict:
    """Return a fresh fixture payload for the serial pseudocode generator.

    The payload has empty ``instructions`` and ``dtc_vectors`` lists, one
    SCI1 configuration, SCI manual references, a board profile with TXD/RXD
    traces, and one TX plus one RX frame candidate for SCI1.  A new dict is
    built on every call, so callers may mutate the result freely.
    """
    # Both frame candidates carry the same checksum description.
    checksum_formula = "checksum = 0x5A ^ byte0 ^ byte1 ^ byte2 ^ byte3 ^ byte4"

    sci1_configuration = {
        "mode": "async",
        "mode_summary": "async 8-bit even parity 1 stop",
        "smr": 0x24,
        "smr_hex": "H'24",
        "brr": 0x07,
        "brr_hex": "H'07",
        "scr": 0x3C,
        "scr_hex": "H'3C",
        "clock_source": "internal",
        "baud_bps": None,
    }

    board_profile = {
        "summary": "Board trace ties the H8/536 SCI1 pins to a MAX202 RS232 transceiver.",
        "manual_references": [
            "Manual/0900766b802125d0.md:2417 FP-80 H8/536 pin 66 is P95/TXD",
        ],
        "traces": [
            {
                "signal": "TXD",
                "h8_pin": 66,
                "h8_pin_name": "P95/TXD",
                "max202_pin": 11,
                "evidence": "MAX202 pin 11 traces to H8 pin 66",
            },
            {
                "signal": "RXD",
                "h8_pin": 67,
                "h8_pin_name": "P96/RXD",
                "max202_pin": 12,
                "evidence": "MAX202 pin 12 traces to H8 pin 67",
            },
        ],
    }

    tx_candidate = {
        "kind": "candidate_sci1_tx_frame",
        "channel": "SCI1",
        "frame_length": 6,
        "buffer_start": 0xF858,
        "buffer_start_hex": "H'F858",
        "buffer_end": 0xF85D,
        "buffer_end_hex": "H'F85D",
        "checksum_address": 0xF85D,
        "tx_index_address": 0xF9C2,
        "tdr_address": 0xFEDB,
        "checksum_seed": 0x5A,
        "checksum_formula": checksum_formula,
        "confidence": "high",
        "confidence_score": 0.95,
        "confidence_reason": "all required independent evidence groups were observed",
        "comment": "candidate/evidence-supported SCI1 6-byte TX frame hypothesis",
        "evidence_addresses_hex": {
            "tx_checksum_seed": ["H'BA4E"],
            "xor_checksum_chain": ["H'BA50", "H'BA54"],
        },
    }

    rx_error_handling = {
        "error_latch_address": 0xFAA4,
        "error_latch_address_hex": "H'FAA4",
        "error_latch_bit": 7,
        "fallthrough_to_rx_byte_path": True,
        "rdrf_clear_before_rdr_read": True,
        "summary": "SCI1 ERI marks a physical receive error and continues into RXI byte capture.",
        "manual_caveat": "The ROM clears RDRF before reading RDR; preserve that observed order.",
        "evidence_addresses_hex": ["H'BB57", "H'BB69", "H'BB6D"],
    }

    rx_candidate = {
        "kind": "candidate_sci1_rx_frame",
        "channel": "SCI1",
        "frame_length": 6,
        "capture_buffer_start": 0xF868,
        "capture_buffer_start_hex": "H'F868",
        "capture_buffer_end": 0xF86D,
        "capture_buffer_end_hex": "H'F86D",
        "validation_buffer_start": 0xF860,
        "validation_buffer_end": 0xF865,
        "checksum_address": 0xF865,
        "rx_index_address": 0xF9C3,
        "rdr_address": 0xFEDD,
        "interbyte_timeout_address": 0xF9C1,
        "complete_timer_address": 0xF9C5,
        "checksum_seed": 0x5A,
        "checksum_formula": checksum_formula,
        "confidence": "high",
        "confidence_score": 0.9,
        "confidence_reason": "RX count, copy, and checksum-validation evidence were observed",
        "caveat": "candidate frame means six consecutive bytes, not a proven delimited packet",
        "comment": "candidate/evidence-supported SCI1 6-byte RX frame hypothesis",
        "rx_error_handling": rx_error_handling,
        "evidence_addresses_hex": {
            "rx_rdr_read": ["H'BB6D"],
            "rx_rdrf_clear_before_rdr_read": ["H'BB69", "H'BB6D"],
            "rx_eri_falls_through_to_rxi": [
                "H'BB57",
                "H'BB5B",
                "H'BB5F",
                "H'BB63",
                "H'BB69",
                "H'BB6D",
            ],
            "rx_xor_checksum_validation": ["H'BBD6", "H'BBEC"],
        },
    }

    return {
        "instructions": [],
        "dtc_vectors": [],
        "sci": {
            "channels": {
                "SCI1": {
                    "configurations": [sci1_configuration],
                },
            },
        },
        "sci_protocol": {
            "manual_references": [
                "Manual/0900766b802125d0.md:15794 RDR receive data register",
                "Manual/0900766b802125d0.md:15823 TDR transmit data register",
            ],
        },
        "board_profile": board_profile,
        "serial_reconstruction": {
            "candidates": [tx_candidate, rx_candidate],
        },
    }


def semantic_payload() -> dict:
    """Return ``candidate_payload()`` with a populated ``instructions`` list.

    The ten instruction records replace the empty list from the base
    fixture; every other key is left as ``candidate_payload()`` built it.
    """

    def instruction(address, mnemonic, operands, reads=(), targets=()):
        # One instruction record; ``reads`` become {"address": ...} references.
        return {
            "address": address,
            "mnemonic": mnemonic,
            "operands": operands,
            "references": [{"address": read} for read in reads],
            "targets": list(targets),
        }

    payload = candidate_payload()
    payload["instructions"] = [
        instruction(0xBC08, "MOV:G.B", "@H'F860, R0", reads=(0xF860,)),
        instruction(0xBC0C, "AND.B", "#H'07, R0"),
        instruction(0xBC20, "CMP:E.B", "#H'00, R0"),
        instruction(0xBC22, "BEQ", "loc_BC69", targets=(0xBC69,)),
        instruction(0xBC24, "CMP:E.B", "#H'01, R0"),
        instruction(0xBC26, "BEQ", "loc_BCD7", targets=(0xBCD7,)),
        instruction(0xBCB0, "MOV:G.B", "#H'04, @H'F850", reads=(0xF850,)),
        instruction(0xBCB5, "MOV:G.B", "@H'F861, R0", reads=(0xF861,)),
        instruction(0xBCB9, "MOV:G.B", "R0, @H'F851", reads=(0xF851,)),
        instruction(0xBCCD, "BSR", "loc_BA26", targets=(0xBA26,)),
    ]
    return payload


class SerialPseudocodeTest(unittest.TestCase):
    """Checks on the text produced by the serial pseudocode generator."""

    def _assert_contains_all(self, text, fragments):
        """Assert, in order, that each fragment occurs in ``text``.

        Stops at the first missing fragment, exactly like a sequence of
        individual ``assertIn`` calls.
        """
        for fragment in fragments:
            self.assertIn(fragment, text)

    @staticmethod
    def _refined_semantic_analysis():
        """Build the canned analysis result used to stub the semantics pass."""
        table_write_effect = {
            "kind": "table_write_candidate",
            "target_candidate": "primary_value_table_candidate",
            "source_candidate": "RX[3:4] value bytes",
            "table_base_hex": "H'E000",
        }
        table_write_effect_with_evidence = {
            "kind": "table_write_candidate",
            "target_candidate": "primary_value_table_candidate",
            "source_candidate": "RX[3:4] value bytes",
            "table_base_hex": "H'E000",
            "evidence_addresses_hex": ["H'C010"],
        }

        retry_error_model = {
            "checksum_failure_path": {
                "condition_candidate": "0x5A-seeded XOR over RX[0..4] differs from RX[5]",
                "error_target": "loc_BE29",
            },
            "retry_path": {
                "counter_address_hex": "H'FAA6",
                "threshold_candidate": 2,
                "summary": "Candidate retry path stages a command 0x07 response.",
            },
            "command_0x07_path": {
                "summary": "Candidate explicit command 0x07 path copies previous TX frame bytes.",
            },
            "evidence_addresses_hex": ["H'BBD6", "H'BE29"],
        }

        gate_queue_model = {
            "predicates": [
                {
                    "name": "main_loop_may_enter_report_builder",
                    "condition_candidate": "FAA2 == 0 && F9C0 == 0 && ((FAA5.bit7 == 0) || (F9C3 == 0))",
                    "summary": "loc_3FD3 gates loc_BAF2.",
                },
                {
                    "name": "queue_has_pending_report",
                    "condition_candidate": "F9B5 != F9B0",
                    "summary": "Queue non-empty path stages through BB43/BA26.",
                },
                {
                    "name": "periodic_resend_may_fire",
                    "condition_candidate": "(FAA5 & FAA3 & 0x80) != 0 && F9C6 == 0 && F9C8 != 0",
                    "summary": "BE9E/BED5 resend gate.",
                },
            ],
            "session_effects": [
                {
                    "name": "rx_completion_sets_session_timer",
                    "summary": "RX completion sets F9C5.",
                },
                {
                    "name": "session_timeout_clears_gate_and_queue",
                    "summary": "loc_3FEF clears F9B5/F9B0 and clears or sets FAA5.",
                },
                {
                    "name": "host_ack_can_advance_queue",
                    "summary": "Commands 0x05/0x06 can ack or advance F9B5.",
                    "command_values_hex": ["H'05", "H'06"],
                },
            ],
            "caveat": "Many panel controls may require host/session traffic before reporting; observed autonomous call/cam-power indexes are runtime/capture overlays, not ROM constants.",
            "evidence_addresses_hex": [
                "H'3FD3",
                "H'3FEB",
                "H'BAF2",
                "H'BB43",
                "H'BE9E",
                "H'BED5",
            ],
        }

        tx_report_model = {
            "entry_label": "loc_BB43",
            "value_source_candidate": "current_value_table_candidate",
            "observed_capture_overlay_candidates": [
                {
                    "name_candidate": "heartbeat_or_idle_report_candidate",
                    "observed_frames_hex": ["00 00 00 00 80 DA"],
                },
                {
                    "name_candidate": "call_button_report_candidate",
                    "observed_frames_hex": ["00 00 15 80 00 CF", "00 00 15 00 00 4F"],
                },
                {
                    "name_candidate": "camera_power_report_candidate",
                    "observed_frames_hex": ["00 00 07 80 00 DD"],
                },
            ],
            "observed_autonomous_output_caveat": "Observed autonomous output is limited to heartbeat/call/cam-power; other controls may require host/device requests first.",
            "evidence_addresses_hex": ["H'BB20", "H'BB43"],
        }

        periodic_resend_model = {
            "period_timer": {
                "reload_value_hex": "H'01F4",
                "summary": "Candidate periodic report/heartbeat timer reload.",
            },
            "resend_countdown": {
                "reload_value_hex": "H'14",
                "summary": "Candidate periodic resend countdown.",
            },
            "pending_mask": {
                "mask_hex": "H'80",
                "summary": "Candidate autonomous report pending mask.",
            },
            "resend_path": {
                "summary": "Candidate periodic resend path feeding TX staging.",
            },
            "evidence_addresses_hex": ["H'BE90", "H'BED5"],
        }

        semantics = {
            "confidence": "medium",
            "confidence_score": 0.7,
            "commands": [
                {
                    "command_value": 0,
                    "command_value_hex": "0x00",
                    "name_candidate": "set_value_acked",
                    "summary": "candidate acknowledged set",
                    "effects": [table_write_effect],
                },
            ],
            "command_effects": [
                {
                    "command_value": 0,
                    "command_value_hex": "0x00",
                    "name_candidate": "set_value_acked",
                    "summary": "Candidate acknowledged set writes value bytes.",
                    "effects": [table_write_effect_with_evidence],
                    "evidence_addresses_hex": ["H'C000"],
                },
            ],
            "response_schemas": [
                {
                    "response_id": "response_at_C030",
                    "bytes": [
                        {"byte": "byte0", "source_expression": "0x04"},
                        {"byte": "byte1", "source_expression": "RX[1]"},
                    ],
                    "evidence_addresses_hex": ["H'C01C", "H'C024"],
                },
            ],
            "logical_table_map_candidates": [
                {
                    "name_candidate": "primary_value_table_candidate",
                    "logical_base_address_hex": "H'E000",
                    "element_candidate": "word_value",
                    "observed_accesses": ["write"],
                    "evidence_addresses_hex": ["H'C010"],
                },
            ],
            "state_variable_candidates": [
                {
                    "name_candidate": "serial_session_flags_candidate",
                    "address": 0xFAA2,
                    "address_hex": "H'FAA2",
                    "read_count": 1,
                    "write_count": 2,
                    "bit_candidates": [7],
                    "evidence_addresses_hex": ["H'C018"],
                },
            ],
            "retry_error_model": retry_error_model,
            "gate_queue_model": gate_queue_model,
            "tx_report_model": tx_report_model,
            "periodic_resend_model": periodic_resend_model,
        }
        return {"protocol_semantics": [semantics]}

    def test_generates_focused_tx_and_rx_candidate_paths(self):
        """The base fixture yields the expected TX/RX pseudocode fragments."""
        rendered = generate_serial_pseudocode(candidate_payload(), source_name="rom.json")

        expected_fragments = (
            "focused SCI RX/TX pseudocode from rom.json",
            "SCI1 TX 6-byte frame at H'F858-H'F85D",
            "SCI1 RX 6-byte frame captured at H'F868-H'F86D",
            "8E1 SCI characters carry the 6 protocol bytes",
            "SMR=H'24, BRR=H'07, SCR=H'3C",
            "No SCI1 RXI/TXI DTC vector entries are present",
            "MAX202 pin 11 traces to H8 pin 66",
            "Manual/0900766b802125d0.md:15823 TDR transmit data register",
            "#define TX_FRAME(n) MEM8[(u16)(0xF858u + (n))]",
            "#define RX_CAPTURE(n) MEM8[(u16)(0xF868u + (n))]",
            "checksum ^= TX_FRAME(4);",
            "TX_FRAME(5) = sci1_tx_candidate_checksum();",
            "SCI1_TDR = TX_FRAME(0);",
            "void sci1_txi_candidate_isr(void)",
            # NOTE(review): the whitespace after "\n" is reproduced exactly as
            # it appears in the reviewed text; confirm it matches the
            # indentation the generator actually emits.
            "SCI1_SSR &= (u8)~SCI_SSR_RDRF;\n byte = SCI1_RDR;",
            "RX_CAPTURE(RX_INDEX) = byte;",
            "return sci1_process_rx_candidate_frame();",
            "RX_ERROR_LATCH |= RX_ERROR_LATCH_PHYSICAL_ERROR;",
            "sci1_rx_byte_received_candidate_isr();",
            "RX error handling: SCI1 ERI marks a physical receive error",
            "rx_xor_checksum_validation: H'BBD6, H'BBEC",
        )
        self._assert_contains_all(rendered, expected_fragments)

    def test_generates_candidate_protocol_semantics_switch(self):
        """The instruction-bearing fixture yields the semantics switch text."""
        rendered = generate_serial_pseudocode(semantic_payload())

        expected_fragments = (
            "Candidate Protocol Semantics",
            "byte0: op_flags",
            "dispatch: command_low3 = RX_FRAME(0) & 0x07",
            "case 0x00u:",
            "candidate_set_value_acked(logical_index, value);",
            "case 0x01u:",
            "candidate_read_value(logical_index, value);",
        )
        self._assert_contains_all(rendered, expected_fragments)

    def test_surfaces_refined_semantic_candidates(self):
        """A stubbed semantic analysis is surfaced in the generated text."""
        analysis = self._refined_semantic_analysis()

        # Replace the semantics pass so the output depends only on the stub.
        with patch("h8536.serial_pseudocode.analyze_serial_semantics", return_value=analysis):
            rendered = generate_serial_pseudocode(candidate_payload())

        expected_fragments = (
            "command effects:",
            "effect: table_write_candidate; target primary_value_table_candidate",
            "response schemas:",
            "response_at_C030: byte0=0x04; byte1=RX[1]",
            "table map candidates:",
            "primary_value_table_candidate at H'E000",
            "state variable candidates:",
            "serial_session_flags_candidate H'FAA2: reads 1, writes 2; bits 7",
            "retry/error model candidate:",
            "checksum path: 0x5A-seeded XOR over RX[0..4] differs from RX[5] -> loc_BE29",
            "gate/queue state machine candidate:",
            "main_loop_may_enter_report_builder: FAA2 == 0 && F9C0 == 0",
            "queue_has_pending_report: F9B5 != F9B0",
            "host_ack_can_advance_queue: Commands 0x05/0x06 can ack or advance F9B5",
            "static bool sci1_candidate_main_report_gate_open(void)",
            "static bool sci1_candidate_report_queue_nonempty(void)",
            "static bool sci1_candidate_periodic_resend_gate_open(void)",
            "TX/autonomous report model candidate:",
            "loc_BB43 -> loc_BA26: bytes 0..2 encode candidate logical index/report id; bytes 3..4 come from current_value_table_candidate",
            "heartbeat_or_idle_report_candidate: 00 00 00 00 80 DA",
            "heartbeat/periodic resend candidate:",
            "F9C6 reload H'01F4",
            "BED5 resend path",
            "candidate effect: table_write_candidate; target primary_value_table_candidate",
        )
        self._assert_contains_all(rendered, expected_fragments)

    def test_tx_only_option_omits_rx_functions(self):
        """With ``include_rx=False`` the RX ISR name is absent from the output."""
        tx_only = SerialPseudocodeOptions(include_rx=False)
        rendered = generate_serial_pseudocode(candidate_payload(), options=tx_only)

        self.assertIn("void sci1_tx_start_candidate_frame(void)", rendered)
        self.assertNotIn("sci1_rx_byte_received_candidate_isr", rendered)

    def test_write_serial_pseudocode_writes_output_file(self):
        """``write_serial_pseudocode`` reads a JSON file and writes the output."""
        with tempfile.TemporaryDirectory() as tmp:
            workdir = Path(tmp)
            input_path = workdir / "rom.json"
            output_path = workdir / "serial.c"

            serialized = json.dumps(candidate_payload())
            input_path.write_text(serialized, encoding="utf-8")

            write_serial_pseudocode(input_path, output_path, SerialPseudocodeOptions())

            # Read back before the temporary directory is removed.
            written = output_path.read_text(encoding="utf-8")
            self.assertIn("sci1_process_rx_candidate_frame", written)


if __name__ == "__main__":
    unittest.main()