"""Unit tests for the emulator probe: argument parsing, report rendering, and tracing."""

import unittest
from collections import Counter

from h8536.emulator.probe import (
    DEFAULT_WATCH_PCS,
    ProbeReport,
    RAMLifecycleTrace,
    ReportGateTrace,
    ReportQueueTrace,
    TXFrameSnapshot,
    TXFrameWriteTrace,
    parse_watch_pc,
    parse_tx_frame,
    run_probe,
)

# Register tuple with all eight entries cleared, shared by the hand-built traces.
_ZERO_REGS = (0,) * 8


def rom_with_reset(*, reset: int = 0x1000, size: int = 0x1100) -> bytearray:
    """Return a ``size``-byte image filled with 0xFF whose first two bytes hold ``reset``.

    The reset value is stored big-endian in bytes 0-1; callers patch
    instruction bytes into the returned bytearray afterwards.
    """
    image = bytearray(b"\xFF" * size)
    image[0:2] = reset.to_bytes(2, "big")
    return image


class EmulatorProbeTest(unittest.TestCase):
    """Exercises the probe helpers, ``ProbeReport.lines()`` and ``run_probe`` traces."""

    def _probe(self, rom, **options):
        # Every run_probe test uses the same interval / TX-stop / P9 log settings;
        # only the remaining keyword arguments vary per test.
        return run_probe(
            bytes(rom),
            interval_steps=512,
            stop_on_tx=False,
            p9_log_limit=8,
            **options,
        )

    def _assert_some_line_has(self, lines, *fragments):
        # Passes when at least one line contains every given fragment.
        self.assertTrue(
            any(all(fragment in line for fragment in fragments) for line in lines)
        )

    def test_parse_watch_pc_accepts_h8_hex_forms(self):
        # Bare hex, 0x-prefixed hex and H'-prefixed hex must all parse to the same PC.
        for text in ("C08B", "0xC08B", "H'C08B"):
            self.assertEqual(parse_watch_pc(text), 0xC08B)

    def test_parse_tx_frame_accepts_spaced_and_compact_hex(self):
        # Space-separated, compact and comma-separated spellings of the same frame.
        expected = bytes.fromhex("00 00 00 00 80 DA")
        for text in ("00 00 00 00 80 DA", "0000000080DA", "00,00,00,00,80,DA"):
            self.assertEqual(parse_tx_frame(text), expected)

    def test_default_watch_pcs_include_bit_bang_transfer_path(self):
        for pc in (0xC08B, 0xC0DB, 0xC121, 0xBFE0, 0xBFFE, 0xC059):
            self.assertIn(pc, DEFAULT_WATCH_PCS)

    def test_watch_snapshot_includes_bsr_return_address_on_stack(self):
        image = rom_with_reset()
        image[0x1000:0x1003] = b"\x5F\xFE\x80"  # MOV:I.W #H'FE80, R7
        image[0x1003:0x1005] = b"\x0E\x03"  # BSR H'1008, return H'1005
        image[0x1005:0x1008] = b"\x59\x12\x34"  # MOV:I.W #H'1234, R1
        image[0x1008] = 0x19  # RTS

        report = self._probe(
            image,
            max_steps=4,
            watch_pcs=(0x1008,),
            watch_snapshot_limit=4,
            watch_pc_limit=2,
            watch_min_interval=0,
        )

        self.assertEqual(len(report.watch_snapshots), 1)
        captured = report.watch_snapshots[0]
        self.assertEqual(captured.pc, 0x1008)
        self.assertEqual(captured.sp, 0xFE7E)
        # The return address of the BSR is expected on the stack, and the
        # caller list is expected to pair it with the BSR's own address.
        self.assertIn((0xFE7E, 0x1005), captured.stack_words)
        self.assertIn((0x1005, 0x1003), captured.callers)
        self.assertIn("H'1005<-H'1003", captured.line())
        self.assertIn("recent_watch_snapshots:", report.lines())

    def test_report_lines_include_compact_tx_frame_snapshots(self):
        staging_and_frame = bytes.fromhex("11 22 33 44 55 66 77 88 00 00 00 00 80 DA")
        snapshot = TXFrameSnapshot(
            step=98,
            pc=0xBA72,
            label="first_tdr",
            bytes_f850_f85d=staging_and_frame,
            computed_checksum=0xDA,
            stored_checksum=0xDA,
            checksum_ok=True,
        )
        report = ProbeReport(
            steps=99,
            pc=0xBA72,
            stopped_reason="tx",
            hot_pcs=Counter({0xBA72: 1}),
            tx_frame_snapshots=[snapshot],
        )

        rendered = report.lines()

        self.assertIn("recent_tx_frame_snapshots:", rendered)
        self.assertIn(
            " step=98 pc=H'BA72 first_tdr "
            "F850-F85D=11 22 33 44 55 66 77 88 00 00 00 00 80 DA "
            "TX=00 00 00 00 80 DA computed=DA stored=DA checksum_ok=1",
            rendered,
        )

    def test_report_lines_include_target_frame_diff_and_write_sources(self):
        # The same write is listed both as a recent frame write and as a target
        # divergence; two separate instances are built from one set of fields.
        divergent_write = dict(
            step=2,
            pc=0x1003,
            address=0xF859,
            old_value=0x00,
            new_value=0x01,
            frame_after=bytes.fromhex("00 01 00 00 00 00"),
            instruction="H'1003: 1D F8 58 90 MOV:G.W R0, @H'F858",
            regs=(0x017F, 0, 0, 0, 0, 0, 0, 0),
            target_value=0x00,
        )
        queue_trace = ReportQueueTrace(
            step=3,
            pc=0x1007,
            kind="queue_write",
            head=0,
            tail=0,
            queue_index=0,
            address=0xF870,
            old_word=0x0000,
            new_word=0x00FF,
            instruction="H'1003: 1D F8 70 93 MOV:G.W R3, @H'F870",
            regs=(0, 0, 0, 0x00FF, 0, 0, 0, 0),
        )
        gate_trace = ReportGateTrace(
            step=4,
            pc=0x4050,
            label="faa5_clear_enqueue_branch",
            f9c4=0,
            faa5=0,
            f9c3=0,
            head=0,
            tail=0,
            regs=_ZERO_REGS,
            z=True,
            c=False,
            n=False,
            decision="enqueue_candidate_faa5_clear",
            f9c4_last_write_step=3,
            f9c4_last_write_pc=0x40E0,
            f9c4_last_write_value=0x00,
            f9c4_last_write_age=1,
            f9c4_last_nonzero_step=2,
            f9c4_last_nonzero_pc=0x40D8,
            f9c4_last_nonzero_value=0x14,
            f9c4_last_nonzero_age=2,
        )
        lifecycle_trace = RAMLifecycleTrace(
            step=3,
            pc=0x40E0,
            address=0xF9C4,
            name="F9C4_report_gate_timer",
            old_value=0x14,
            new_value=0x00,
            instruction="H'40E0: 15 F9 C4 13 CLR.B @H'F9C4",
            regs=_ZERO_REGS,
        )
        report = ProbeReport(
            steps=10,
            pc=0x1004,
            stopped_reason="max_steps",
            hot_pcs=Counter({0x1000: 1}),
            final_tx_frame=bytes.fromhex("00 01 7F 00 00 24"),
            target_frame=bytes.fromhex("00 00 00 00 80 DA"),
            tx_frame_write_traces=[TXFrameWriteTrace(**divergent_write)],
            tx_target_divergences=[TXFrameWriteTrace(**divergent_write)],
            report_queue_traces=[queue_trace],
            report_gate_traces=[gate_trace],
            ram_lifecycle_traces=[lifecycle_trace],
        )

        rendered = report.lines()

        self.assertIn(
            "target_frame=target=00 00 00 00 80 DA current=00 01 7F 00 00 24 "
            "diffs=1:01!=00 2:7F!=00 4:00!=80 5:24!=DA pre_checksum_diffs=1,2,4",
            rendered,
        )
        for header in (
            "recent_tx_frame_writes:",
            "first_target_divergences:",
            "recent_report_queue:",
            "recent_report_gates:",
            "recent_ram_lifecycle:",
        ):
            self.assertIn(header, rendered)
        self._assert_some_line_has(rendered, "TX[1]", "target=00 DIFF")
        self._assert_some_line_has(rendered, "queue_write", "word=H'0000->H'00FF")
        self._assert_some_line_has(rendered, "decision=enqueue_candidate_faa5_clear")
        self._assert_some_line_has(rendered, "F9C4_last=step=3@H'40E0 value=00 age=1")
        self._assert_some_line_has(
            rendered, "F9C4_last_nonzero=step=2@H'40D8 value=14 age=2"
        )
        self._assert_some_line_has(rendered, "F9C4_report_gate_timer H'F9C4 14->00")

    def test_run_probe_captures_tx_frame_watch_pc_and_bad_checksum(self):
        image = rom_with_reset(reset=0xBA26, size=0xBB00)
        image[0xBA26] = 0xFF

        report = self._probe(
            image,
            max_steps=1,
            watch_pcs=(),
            tx_frame_snapshot_limit=4,
        )

        self.assertEqual(len(report.tx_frame_snapshots), 1)
        captured = report.tx_frame_snapshots[0]
        self.assertEqual(captured.pc, 0xBA26)
        self.assertEqual(captured.label, "builder_entry")
        self.assertEqual(captured.frame_bytes, b"\x00\x00\x00\x00\x00\x00")
        # Computed and stored checksums are expected to disagree here.
        self.assertEqual(captured.computed_checksum, 0x5A)
        self.assertEqual(captured.stored_checksum, 0x00)
        self.assertFalse(captured.checksum_ok)

    def test_run_probe_traces_tx_frame_writes_against_target(self):
        image = rom_with_reset()
        image[0x1000:0x1003] = b"\x58\x01\x7F"  # MOV:I.W #H'017F, R0
        image[0x1003:0x1007] = b"\x1D\xF8\x58\x90"  # MOV:G.W R0, @H'F858

        report = self._probe(
            image,
            max_steps=2,
            watch_pcs=(),
            trace_frame_sources=True,
            target_frame=bytes.fromhex("00 00 00 00 80 DA"),
            frame_write_trace_limit=4,
        )

        self.assertEqual(report.final_tx_frame[:3], b"\x01\x7F\x00")
        self.assertEqual(len(report.tx_frame_write_traces), 2)
        first_write = report.tx_frame_write_traces[0]
        self.assertEqual(first_write.address, 0xF858)
        self.assertEqual(first_write.target_value, 0x00)
        self.assertIn("MOV:G.W R0, @H'F858", first_write.instruction)
        self.assertEqual(report.tx_target_divergences, [])
        self._assert_some_line_has(report.lines(), "target_frame=")

    def test_run_probe_traces_report_queue_writes_and_cursors(self):
        image = rom_with_reset()
        image[0x1000:0x1003] = b"\x5B\x00\xFF"  # MOV:I.W #H'00FF, R3
        image[0x1003:0x1007] = b"\x1D\xF8\x70\x93"  # MOV:G.W R3, @H'F870
        image[0x1007:0x100B] = b"\x15\xF9\xB0\x08"  # ADD:Q.B #1, @H'F9B0

        report = self._probe(
            image,
            max_steps=3,
            watch_pcs=(),
            trace_report_queue=True,
            report_queue_trace_limit=8,
            watch_report_ids=(0x00FF,),
        )

        queue_writes = [
            entry for entry in report.report_queue_traces if entry.kind == "queue_write"
        ]
        cursor_writes = [
            entry
            for entry in report.report_queue_traces
            if entry.kind == "cursor_head_write"
        ]

        self.assertEqual(len(queue_writes), 1)
        slot_write = queue_writes[0]
        self.assertEqual(slot_write.queue_index, 0)
        self.assertEqual(slot_write.old_word, 0x0000)
        self.assertEqual(slot_write.new_word, 0x00FF)
        # The single queue write is expected in each of the derived summaries.
        self.assertEqual(report.report_queue_first_writes, queue_writes)
        self.assertEqual(report.report_queue_first_nonzero_writes, queue_writes)
        self.assertEqual(report.report_queue_watch_hits, queue_writes)

        self.assertEqual(len(cursor_writes), 1)
        head_write = cursor_writes[0]
        self.assertEqual(head_write.old_value, 0x00)
        self.assertEqual(head_write.new_value, 0x01)
        self.assertIn("recent_report_queue:", report.lines())

    def test_run_probe_traces_ram_lifecycle_writes_and_last_nonzero_value(self):
        image = rom_with_reset()
        image[0x1000:0x1005] = b"\x15\xF9\xC4\x06\x14"  # MOV:G.B #H'14, @H'F9C4
        image[0x1005:0x1009] = b"\x15\xF9\xC4\x13"  # CLR.B @H'F9C4

        report = self._probe(
            image,
            max_steps=2,
            watch_pcs=(),
            trace_ram_lifecycle=True,
            ram_lifecycle_trace_limit=4,
        )

        self.assertEqual(len(report.ram_lifecycle_traces), 2)
        set_trace, clear_trace = report.ram_lifecycle_traces
        self.assertEqual(set_trace.address, 0xF9C4)
        self.assertEqual(set_trace.old_value, 0x00)
        self.assertEqual(set_trace.new_value, 0x14)
        self.assertEqual(clear_trace.old_value, 0x14)
        self.assertEqual(clear_trace.new_value, 0x00)
        self.assertEqual(report.ram_lifecycle_last_writes, [clear_trace])
        self.assertEqual(report.ram_lifecycle_last_nonzero_writes, [set_trace])

        rendered = report.lines()
        for header in (
            "recent_ram_lifecycle:",
            "ram_lifecycle_last_writes:",
            "ram_lifecycle_last_nonzero_writes:",
        ):
            self.assertIn(header, rendered)

    def test_run_probe_traces_loc_4046_report_gates(self):
        image = rom_with_reset(reset=0x4046, size=0x4080)
        image[0x4046:0x404A] = b"\x15\xF9\xC4\x16"  # TST.B @H'F9C4
        image[0x404A:0x404C] = b"\x26\x0C"  # BNE H'4058
        image[0x404C:0x4050] = b"\x15\xFA\xA5\xF7"  # BTST.B #7, @H'FAA5
        image[0x4050:0x4052] = b"\x27\x07"  # BEQ H'4059
        image[0x4052:0x4056] = b"\x15\xF9\xC3\x16"  # TST.B @H'F9C3
        image[0x4056:0x4058] = b"\x27\x01"  # BEQ H'4059
        image[0x4058] = 0x19  # RTS
        image[0x4059:0x405D] = b"\x15\xF9\xB0\x82"  # MOV:G.B @H'F9B0, R2
        image[0x405D:0x405F] = b"\xA2\x12"  # EXTU.B R2
        image[0x405F:0x4063] = b"\x15\xF9\xB5\x72"  # CMP:G.B @H'F9B5, R2
        image[0x4063:0x4065] = b"\x26\x0F"  # BNE H'4074
        image[0x4065:0x4067] = b"\xA2\x1A"  # SHLL.B R2
        image[0x4067:0x406C] = b"\xFA\xF8\x70\x06\x00"  # MOV:G.W #H'00, @(-H'0790,R2)
        image[0x406C:0x4070] = b"\x15\xF9\xB0\x08"  # ADD:Q.B #1, @H'F9B0
        image[0x4070:0x4074] = b"\x15\xF9\xB0\xD7"  # BCLR.B #7, @H'F9B0

        report = self._probe(
            image,
            max_steps=10,
            watch_pcs=(),
            trace_report_gates=True,
            report_gate_trace_limit=16,
        )

        # Map each traced PC to the decision recorded for it.
        decisions = {entry.pc: entry.decision for entry in report.report_gate_traces}
        expected_decisions = {
            0x4046: "f9c4_zero_continue",
            0x4050: "enqueue_candidate_faa5_clear",
            0x4063: "enqueue_zero_report",
            0x4067: "write_report_0000_to_queue_slot",
        }
        for pc, decision in expected_decisions.items():
            self.assertEqual(decisions[pc], decision)
        self.assertIn("recent_report_gates:", report.lines())


if __name__ == "__main__":
    unittest.main()