"""Record Project CARS UDP packets to disk.

Every datagram received on ``UDP_PORT`` is written verbatim to
``SESSION/packets/`` and described by one JSON line appended to
``SESSION/manifest.jsonl``. Session-level details (mode, port, start/stop
time, packet count) are kept in ``SESSION/metadata.json``.
"""

import argparse
import json
import signal
import socket
import time
from pathlib import Path
from typing import Optional, Sequence

UDP_PORT = 5606
OUT_ROOT = Path("captures")

# Receive buffer large enough for any UDP datagram. A buffer smaller than the
# datagram truncates it on POSIX and makes recvfrom() fail on Windows, either
# of which would corrupt or abort a recording.
MAX_DATAGRAM_SIZE = 65535

# Number of leading bytes packet_label() reads as the V2 packet header.
V2_HEADER_SIZE = 12

# Datagram sizes accepted per V2 packet type by the auto-detection heuristic.
V2_EXPECTED_SIZES = {
    0: {556, 559},
    1: {308},
    2: {1136},
    3: {1059, 1063},
    4: {24},
    7: {1040},
    8: {1164, 1452},
}

# V2 packet types echoed to the console; every V1 packet is echoed as well.
ECHOED_V2_TYPES = (1, 2, 3, 7, 8)


def build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for the capture script."""
    parser = argparse.ArgumentParser(description="Capture Project CARS UDP packets.")
    parser.add_argument(
        "--mode",
        choices=("auto", "v1", "v2"),
        default="auto",
        help="Packet labelling mode. V2 uses the 12-byte packet header; V1 is a fixed telemetry block.",
    )
    parser.add_argument(
        "--out",
        type=Path,
        default=None,
        help="Recording directory. Defaults to captures/session_YYYYMMDD_HHMMSS.",
    )
    parser.add_argument(
        "--flat",
        action="store_true",
        help="Also write packet files directly into captures/ for quick manual inspection.",
    )
    return parser


def looks_like_v2(data: bytes) -> bool:
    """Return True when *data* passes the V2 packet heuristic.

    The datagram must be long enough to hold the 12-byte header, its type
    byte (offset 10) must be a known type whose expected sizes include the
    datagram length, and both the version byte (offset 11) and the partial
    count byte (offset 9) must be at most 8.
    """
    if len(data) < V2_HEADER_SIZE:
        return False
    partial_count = data[9]
    packet_type = data[10]
    packet_version = data[11]
    return (
        len(data) in V2_EXPECTED_SIZES.get(packet_type, ())
        and packet_version <= 8
        and partial_count <= 8
    )


def packet_label(data: bytes, mode: str = "auto", index: int = 0) -> dict:
    """Build the manifest fields and file-name stem for one datagram.

    Args:
        data: Raw datagram payload.
        mode: ``"v2"`` labels every datagram from its V2 header, ``"v1"``
            never does, and ``"auto"`` uses the header only when
            ``looks_like_v2`` accepts the datagram.
        index: Zero-based position of the datagram in the recording; used as
            the zero-padded prefix of the stem.

    Returns:
        A dict with ``"format"`` (``"v1"`` or ``"v2"``), ``"stem"`` and the
        format-specific header fields.
    """
    # Even in forced V2 mode a datagram shorter than the header cannot be
    # decoded; indexing it would raise IndexError and end the capture, so such
    # datagrams fall through to the length-safe V1 labelling below.
    use_v2_header = (mode == "v2" and len(data) >= V2_HEADER_SIZE) or (
        mode == "auto" and looks_like_v2(data)
    )
    if use_v2_header:
        packet_number = int.from_bytes(data[0:4], "little")
        category_number = int.from_bytes(data[4:8], "little")
        partial_index = data[8]
        partial_count = data[9]
        packet_type = data[10]
        version = data[11]
        stem = (
            f"{index:06d}_type{packet_type}_pkt{packet_number}_cat{category_number}_"
            f"part{partial_index}-of-{partial_count}_v{version}"
        )
        return {
            "format": "v2",
            "stem": stem,
            "packetType": packet_type,
            "packetNumber": packet_number,
            "categoryPacketNumber": category_number,
            "partialPacketIndex": partial_index,
            "partialPacketNumber": partial_count,
            "packetVersion": version,
        }

    # V1 labelling: bytes 2-3 are read as a little-endian sequence value when
    # present; shorter datagrams are labelled with sequence 0.
    sequence = int.from_bytes(data[2:4], "little") if len(data) >= 4 else 0
    return {
        "format": "v1",
        "stem": f"{index:06d}_v1_seq{sequence}_size{len(data)}",
        "sequence": sequence,
    }


def write_final_metadata(duration: float, metadata_path: Path, packet_count: int) -> None:
    """Add the stop time, duration and packet count to the session metadata.

    Args:
        duration: Seconds elapsed since the capture started.
        metadata_path: Existing ``metadata.json`` file to update in place.
        packet_count: Number of datagrams recorded.
    """
    metadata = json.loads(metadata_path.read_text(encoding="utf8"))
    metadata.update({
        "stoppedAt": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "durationSeconds": round(duration, 3),
        "packetCount": packet_count,
    })
    metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf8")


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the capture until SIGINT, SIGTERM or (where available) SIGBREAK.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.
    """
    args = build_parser().parse_args(argv)

    OUT_ROOT.mkdir(exist_ok=True)
    session_dir = args.out or OUT_ROOT / f"session_{time.strftime('%Y%m%d_%H%M%S')}"
    packet_dir = session_dir / "packets"
    manifest_path = session_dir / "manifest.jsonl"
    metadata_path = session_dir / "metadata.json"

    stop_requested = False

    def request_stop(_signum, _frame) -> None:
        """Signal handler: ask the receive loop to finish after the current pass."""
        nonlocal stop_requested
        stop_requested = True

    # The context manager closes the socket even if setup below fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # The timeout lets the loop re-check stop_requested twice a second.
        sock.settimeout(0.5)
        # Bind before creating the session directory so that a busy port does
        # not leave an empty recording behind.
        sock.bind(("0.0.0.0", UDP_PORT))

        packet_dir.mkdir(parents=True, exist_ok=True)
        started_monotonic = time.monotonic()
        started_wall = time.time()
        packet_index = 0

        metadata_path.write_text(json.dumps({
            "format": "projectcars-udp-recording",
            "formatVersion": 1,
            "mode": args.mode,
            "udpPort": UDP_PORT,
            "startedAt": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
            "packetDirectory": "packets",
            "manifest": "manifest.jsonl",
        }, indent=2), encoding="utf8")

        print(f"Listening on UDP {UDP_PORT} in {args.mode} mode...")
        print(f"Recording to {session_dir}")

        signal.signal(signal.SIGINT, request_stop)
        signal.signal(signal.SIGTERM, request_stop)
        if hasattr(signal, "SIGBREAK"):
            signal.signal(signal.SIGBREAK, request_stop)

        try:
            with manifest_path.open("a", encoding="utf8") as manifest:
                while not stop_requested:
                    try:
                        data, addr = sock.recvfrom(MAX_DATAGRAM_SIZE)
                    except socket.timeout:
                        continue

                    now_monotonic = time.monotonic()
                    now_wall = time.time()
                    label = packet_label(data, mode=args.mode, index=packet_index)
                    filename = f"{label['stem']}_{int(now_wall * 1000)}.bin"

                    (packet_dir / filename).write_bytes(data)
                    if args.flat:
                        (OUT_ROOT / filename).write_bytes(data)

                    record = {
                        "index": packet_index,
                        "timeOffset": round(now_monotonic - started_monotonic, 6),
                        "wallTimeOffset": round(now_wall - started_wall, 6),
                        "source": f"{addr[0]}:{addr[1]}",
                        "size": len(data),
                        "file": f"packets/{filename}",
                        **{key: value for key, value in label.items() if key != "stem"},
                    }
                    manifest.write(json.dumps(record, separators=(",", ":")) + "\n")
                    # Flush per packet so the manifest stays usable if the
                    # process is killed mid-capture.
                    manifest.flush()

                    if label["format"] == "v1" or label.get("packetType") in ECHOED_V2_TYPES:
                        print(f"{filename} size={len(data)} from={addr[0]}:{addr[1]}")
                    packet_index += 1
        except KeyboardInterrupt:
            # Treated as a normal stop; the summary below is still written.
            pass
        finally:
            duration = time.monotonic() - started_monotonic
            write_final_metadata(duration, metadata_path, packet_index)
            print(f"\nStopped. Captured {packet_index} packets over {duration:.1f}s.", flush=True)
            print(f"Replay with: python scripts\\replay_udp.py {session_dir}", flush=True)


if __name__ == "__main__":
    main()