"""Bridge Project CARS 2 / AMS2 shared memory to an HTTP JSON ingest endpoint.

The script maps the game's named shared-memory block, decodes a fixed set of
fields at hard-coded byte offsets, repackages them as a list of packet-shaped
dictionaries and either prints the resulting frame or POSTs it as JSON.
"""

import argparse
import json
import math
import mmap
import struct
import time
import urllib.error
import urllib.request
import zlib
from datetime import datetime, timezone

SHARED_MEMORY_NAME = "$pcars2$"
SHARED_MEMORY_SIZE = 20704
DEFAULT_URL = "http://127.0.0.1:3000/api/ingest/shared-memory"

GAME_STATE_NAMES = {
    0: "Exited",
    1: "Front end",
    2: "Playing",
    3: "Paused",
    4: "In menu",
    5: "Restarting",
    6: "Replay",
    7: "Front end replay",
}

SESSION_STATE_NAMES = {
    0: "Invalid",
    1: "Practice",
    2: "Test",
    3: "Qualifying",
    4: "Formation lap",
    5: "Race",
    6: "Time attack",
}

# Byte offsets of the fields read from the shared-memory block.
OFFSETS = {
    "version": 0,
    "buildVersionNumber": 4,
    "gameState": 8,
    "sessionState": 12,
    "raceState": 16,
    "viewedParticipantIndex": 20,
    "numParticipants": 24,
    "participantInfo": 28,
    "lapsInEvent": 6572,
    "trackLocation": 6576,
    "trackVariation": 6640,
    "trackLength": 6704,
    "lapInvalidated": 6712,
    "splitTimeAhead": 6728,
    "splitTimeBehind": 6732,
    "splitTime": 6736,
    "eventTimeRemaining": 6740,
    "personalFastestLapTime": 6744,
    "worldFastestLapTime": 6748,
    "currentSector1Time": 6752,
    "currentSector2Time": 6756,
    "currentSector3Time": 6760,
    "fastestSector1Time": 6764,
    "fastestSector2Time": 6768,
    "fastestSector3Time": 6772,
    "personalFastestSector1Time": 6776,
    "personalFastestSector2Time": 6780,
    "personalFastestSector3Time": 6784,
    "worldFastestSector1Time": 6788,
    "worldFastestSector2Time": 6792,
    "worldFastestSector3Time": 6796,
    "ambientTemperature": 7292,
    "trackTemperature": 7296,
    "rainDensity": 7300,
    "windSpeed": 7304,
    "windDirectionX": 7308,
    "windDirectionY": 7312,
    "currentSector1Times": 7408,
    "currentSector2Times": 7664,
    "currentSector3Times": 7920,
    "fastestSector1Times": 8176,
    "fastestSector2Times": 8432,
    "fastestSector3Times": 8688,
    "fastestLapTimes": 8944,
    "lastLapTimes": 9200,
    "lapsInvalidated": 9456,
    "raceStates": 9520,
    "pitModes": 9776,
    "speeds": 10800,
    "carNames": 11056,
    "carClassNames": 15152,
    "pitSchedules": 19548,
    "highestFlagColours": 19804,
    "highestFlagReasons": 20060,
    "nationalities": 20316,
    "snowDensity": 20572,
    "sessionDuration": 20576,
    "sessionAdditionalLaps": 20580,
    "yellowFlagState": 20688,
    "launchStage": 20696,
}

PARTICIPANT_STRIDE = 100
MAX_PARTICIPANTS = 64
UDP_PARTICIPANTS = 32
STRING_LENGTH = 64


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def u32(data: bytes, offset: int) -> int:
    """Read a little-endian unsigned 32-bit integer at *offset*."""
    return struct.unpack_from("<I", data, offset)[0]


def i32(data: bytes, offset: int) -> int:
    """Read a little-endian signed 32-bit integer at *offset*."""
    return struct.unpack_from("<i", data, offset)[0]


def f32(data: bytes, offset: int) -> float:
    """Read a little-endian 32-bit float at *offset*.

    Non-finite values (NaN, +/-Inf) are replaced with ``0.0`` because callers
    pass the result to ``round()`` and ``json.dumps``, neither of which can
    handle them safely.

    NOTE(review): the tail of this function was lost in the mangled source;
    the finite-value guard is a reconstruction - confirm against the
    original.
    """
    value = struct.unpack_from("<f", data, offset)[0]
    return value if math.isfinite(value) else 0.0


def boolean(data: bytes, offset: int) -> bool:
    """Return ``True`` when the byte at *offset* is non-zero."""
    return data[offset] != 0


def c_string(data: bytes, offset: int, length: int = STRING_LENGTH) -> str:
    """Decode a NUL-terminated string stored in a fixed-width field.

    Args:
        data: Raw buffer.
        offset: Byte offset of the first character.
        length: Width of the field in bytes.

    Returns:
        The text before the first NUL, decoded as UTF-8 with undecodable
        bytes dropped and surrounding whitespace stripped.
    """
    raw = data[offset:offset + length]
    return raw.split(b"\x00", 1)[0].decode("utf-8", errors="ignore").strip()


def float_array_value(data: bytes, offset: int, index: int) -> float:
    """Read element *index* of a packed float32 array starting at *offset*."""
    return f32(data, offset + index * 4)


def u32_array_value(data: bytes, offset: int, index: int) -> int:
    """Read element *index* of a packed uint32 array starting at *offset*."""
    return u32(data, offset + index * 4)


def bool_array_value(data: bytes, offset: int, index: int) -> bool:
    """Read element *index* of a one-byte-per-entry boolean array."""
    return boolean(data, offset + index)


def participant_base(offset: int) -> int:
    """Return the byte offset of the participant record for slot *offset*."""
    return OFFSETS["participantInfo"] + offset * PARTICIPANT_STRIDE


def participant_name(data: bytes, slot: int) -> str:
    """Return the participant name, stored one byte into the record."""
    return c_string(data, participant_base(slot) + 1)


def participant_world_position(data: bytes, slot: int) -> list[float]:
    """Return the three consecutive floats stored at record offset 68."""
    base = participant_base(slot) + 68
    return [f32(data, base), f32(data, base + 4), f32(data, base + 8)]


def participant_lap_distance(data: bytes, slot: int) -> int:
    """Return the lap distance rounded and clamped to ``0..65535``."""
    return max(0, min(65535, round(f32(data, participant_base(slot) + 80))))


def participant_u32(data: bytes, slot: int, relative_offset: int) -> int:
    """Read a uint32 at *relative_offset* inside a participant record."""
    return u32(data, participant_base(slot) + relative_offset)


def car_name(data: bytes, slot: int) -> str:
    """Return the car name string for *slot*."""
    return c_string(data, OFFSETS["carNames"] + slot * STRING_LENGTH)


def car_class_name(data: bytes, slot: int) -> str:
    """Return the car class name string for *slot*."""
    return c_string(data, OFFSETS["carClassNames"] + slot * STRING_LENGTH)


def active_slots(data: bytes) -> list[int]:
    """Return the participant slots worth reporting.

    A slot is kept when its active flag is set, its name is non-empty, or its
    race position is non-zero. Only the declared participant count (clamped
    to ``0..MAX_PARTICIPANTS``) is scanned and at most ``UDP_PARTICIPANTS``
    slots are returned.
    """
    declared = max(0, min(MAX_PARTICIPANTS, i32(data, OFFSETS["numParticipants"])))
    slots = []
    for slot in range(declared):
        is_active = boolean(data, participant_base(slot))
        name = participant_name(data, slot)
        race_position = participant_u32(data, slot, 84)
        if is_active or name or race_position:
            slots.append(slot)
    return slots[:UDP_PARTICIPANTS]


def packet(packet_type: int, packet_number: int, data: dict, name: str) -> dict:
    """Wrap *data* in the packet envelope used by the frames this script emits."""
    return {
        "receivedAt": utc_now(),
        "source": "shared-memory",
        "base": {
            "packetNumber": packet_number,
            "categoryPacketNumber": packet_number,
            "partialPacketIndex": 0,
            "partialPacketNumber": 1,
            "packetType": packet_type,
            "packetVersion": 0,
        },
        "name": name,
        "size": 0,
        "data": data,
    }


def build_frame(data: bytes, sequence: int) -> dict:
    """Decode one shared-memory snapshot into a frame of packets.

    Args:
        data: Snapshot of the shared-memory block; every offset in
            ``OFFSETS`` must be readable.
        sequence: Frame counter, reused as the packet number of every packet.

    Returns:
        A dict with ``source``, ``sequence``, ``capturedAt`` and ``packets``.
        The packets are, in order: Race Definition, Participants, Timings,
        Game State, Time Stats and Participant Vehicle Names.
    """
    slots = active_slots(data)
    # The value at the "version" offset is what gets reported as the
    # participants-changed timestamp.
    timestamp = u32(data, OFFSETS["version"])
    participants = []
    timings = []
    stats = []
    vehicles = []
    classes = {}

    for slot in slots:
        name = participant_name(data, slot)
        race_position = participant_u32(data, slot, 84)
        laps_completed = participant_u32(data, slot, 88)
        current_lap = participant_u32(data, slot, 92)
        sector = i32(data, participant_base(slot) + 96)
        race_state = u32_array_value(data, OFFSETS["raceStates"], slot)
        lap_invalidated = bool_array_value(data, OFFSETS["lapsInvalidated"], slot)
        pit_mode = u32_array_value(data, OFFSETS["pitModes"], slot)
        pit_schedule = u32_array_value(data, OFFSETS["pitSchedules"], slot)
        flag_colour = u32_array_value(data, OFFSETS["highestFlagColours"], slot)
        flag_reason = u32_array_value(data, OFFSETS["highestFlagReasons"], slot)
        nationality = u32_array_value(data, OFFSETS["nationalities"], slot)
        vehicle_name = car_name(data, slot)
        vehicle_class = car_class_name(data, slot)
        current_sector_times = [
            float_array_value(data, OFFSETS["currentSector1Times"], slot),
            float_array_value(data, OFFSETS["currentSector2Times"], slot),
            float_array_value(data, OFFSETS["currentSector3Times"], slot),
        ]

        participants.append({
            "slot": slot,
            "localSlot": slot % 16,
            "partialPacketIndex": slot // 16,
            "name": name,
            "nationality": nationality,
            "index": slot + 1,
        })

        timings.append({
            "slot": slot,
            "worldPosition": participant_world_position(data, slot),
            "orientation": [0, 0, 0],
            "currentLapDistance": participant_lap_distance(data, slot),
            "racePosition": race_position,
            "active": boolean(data, participant_base(slot)),
            "sector": sector,
            "sectorRaw": sector,
            "sectorExtraPrecision": 0,
            "highestFlag": flag_colour,
            "highestFlagColour": flag_colour,
            "highestFlagReason": flag_reason,
            # Pit mode in the low bits, low three schedule bits above it.
            "pitModeSchedule": pit_mode | ((pit_schedule & 0x07) << 3),
            "pitMode": pit_mode,
            "pitSchedule": pit_schedule,
            "carIndex": slot + 1,
            # Bit 3 carries the lap-invalidated flag.
            "raceState": race_state | (0x08 if lap_invalidated else 0),
            # Fall back to completed laps + 1 when the current lap reads 0.
            "currentLap": current_lap or laps_completed + 1,
            "currentTime": max(current_sector_times),
            "currentSectorTime": (
                current_sector_times[sector - 1] if 1 <= sector <= 3 else 0
            ),
            "mpParticipantIndex": slot + 1,
            "speed": float_array_value(data, OFFSETS["speeds"], slot),
        })

        stats.append({
            "slot": slot,
            "fastestLapTime": float_array_value(data, OFFSETS["fastestLapTimes"], slot),
            "lastLapTime": float_array_value(data, OFFSETS["lastLapTimes"], slot),
            "lastSectorTime": max(current_sector_times),
            "fastestSectors": [
                float_array_value(data, OFFSETS["fastestSector1Times"], slot),
                float_array_value(data, OFFSETS["fastestSector2Times"], slot),
                float_array_value(data, OFFSETS["fastestSector3Times"], slot),
            ],
            "participantOnlineRep": 0,
            "mpParticipantIndex": slot + 1,
        })

        if vehicle_name:
            # The class index is derived from the class name via CRC32.
            class_index = (
                zlib.crc32(vehicle_class.encode("utf-8")) if vehicle_class else 0
            )
            vehicles.append({
                "index": slot + 1,
                "class": class_index,
                "name": vehicle_name,
            })
            if vehicle_class:
                classes[class_index] = {
                    "classIndex": class_index,
                    "name": vehicle_class,
                }

    game_state = u32(data, OFFSETS["gameState"])
    session_state = u32(data, OFFSETS["sessionState"])

    packets = [
        packet(1, sequence, {
            "worldFastestLapTime": f32(data, OFFSETS["worldFastestLapTime"]),
            "personalFastestLapTime": f32(data, OFFSETS["personalFastestLapTime"]),
            "personalFastestSectors": [
                f32(data, OFFSETS["personalFastestSector1Time"]),
                f32(data, OFFSETS["personalFastestSector2Time"]),
                f32(data, OFFSETS["personalFastestSector3Time"]),
            ],
            "worldFastestSectors": [
                f32(data, OFFSETS["worldFastestSector1Time"]),
                f32(data, OFFSETS["worldFastestSector2Time"]),
                f32(data, OFFSETS["worldFastestSector3Time"]),
            ],
            "trackLength": f32(data, OFFSETS["trackLength"]),
            "trackLocation": c_string(data, OFFSETS["trackLocation"]),
            "trackVariation": c_string(data, OFFSETS["trackVariation"]),
            "translatedTrackLocation": "",
            "translatedTrackVariation": "",
            "lapsTimeInEvent": u32(data, OFFSETS["lapsInEvent"]),
            "enforcedPitStopLap": 0,
        }, "Race Definition"),
        packet(2, sequence, {
            "participantsChangedTimestamp": timestamp,
            "participants": participants,
        }, "Participants"),
        packet(3, sequence, {
            "numParticipants": len(slots),
            "participantsChangedTimestamp": timestamp,
            "eventTimeRemaining": f32(data, OFFSETS["eventTimeRemaining"]),
            "splitTimeAhead": f32(data, OFFSETS["splitTimeAhead"]),
            "splitTimeBehind": f32(data, OFFSETS["splitTimeBehind"]),
            "splitTime": f32(data, OFFSETS["splitTime"]),
            "localParticipantIndex": i32(data, OFFSETS["viewedParticipantIndex"]) + 1,
            "participants": timings,
        }, "Timings"),
        packet(4, sequence, {
            "buildVersionNumber": u32(data, OFFSETS["buildVersionNumber"]),
            "gameStateRaw": game_state | (session_state << 4),
            "gameState": game_state,
            "gameStateName": GAME_STATE_NAMES.get(game_state),
            "sessionState": session_state,
            "sessionStateName": SESSION_STATE_NAMES.get(session_state),
            "ambientTemperature": round(f32(data, OFFSETS["ambientTemperature"])),
            "trackTemperature": round(f32(data, OFFSETS["trackTemperature"])),
            "rainDensity": round(f32(data, OFFSETS["rainDensity"]) * 255),
            "snowDensity": round(f32(data, OFFSETS["snowDensity"]) * 255),
            "windSpeed": round(f32(data, OFFSETS["windSpeed"])),
            "windDirectionX": round(f32(data, OFFSETS["windDirectionX"])),
            "windDirectionY": round(f32(data, OFFSETS["windDirectionY"])),
            "yellowFlagState": i32(data, OFFSETS["yellowFlagState"]),
            "launchStage": i32(data, OFFSETS["launchStage"]),
        }, "Game State"),
        packet(7, sequence, {
            "participantsChangedTimestamp": timestamp,
            "participants": stats,
        }, "Time Stats"),
        packet(8, sequence, {
            "vehicles": vehicles,
            "classes": list(classes.values()),
        }, "Participant Vehicle Names"),
    ]

    return {
        "source": "shared-memory",
        "sequence": sequence,
        "capturedAt": utc_now(),
        "packets": packets,
    }


def open_shared_memory() -> mmap.mmap:
    """Open the game's named shared-memory block read-only.

    Raises:
        OSError: If the mapping cannot be opened, including on platforms
            whose ``mmap.mmap`` does not accept ``tagname``.
    """
    # NOTE(review): on Windows a tagged anonymous mapping may be created
    # empty when no such mapping exists yet, so success here may not prove
    # the game is running - confirm.
    try:
        return mmap.mmap(
            -1,
            SHARED_MEMORY_SIZE,
            tagname=SHARED_MEMORY_NAME,
            access=mmap.ACCESS_READ,
        )
    except TypeError as error:
        # Surface an unsupported ``tagname`` keyword as the OSError that
        # callers already handle, instead of a raw TypeError traceback.
        raise OSError(
            "named shared memory (mmap tagname) is not supported on this platform"
        ) from error


def post_json(url: str, payload: dict, timeout: float) -> None:
    """POST *payload* as compact JSON to *url* and drain the response.

    Raises:
        urllib.error.URLError: On connection or HTTP errors.
        OSError: On socket-level failures such as a read timeout.
    """
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"content-type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        response.read()


def main() -> None:
    """Parse arguments, then read and forward frames until stopped."""
    parser = argparse.ArgumentParser(
        description="Read AMS2/Project CARS shared memory and feed JSON frames to the Node server."
    )
    parser.add_argument("--url", default=DEFAULT_URL, help=f"Node ingest URL. Default: {DEFAULT_URL}")
    parser.add_argument("--interval", type=float, default=0.2, help="Seconds between frames.")
    parser.add_argument("--timeout", type=float, default=2.0, help="HTTP timeout in seconds.")
    parser.add_argument("--once", action="store_true", help="Read and send one frame, then exit.")
    parser.add_argument("--print", action="store_true", dest="print_frame", help="Print frames instead of POSTing.")
    args = parser.parse_args()

    if args.interval <= 0:
        raise SystemExit("--interval must be greater than 0")

    try:
        shared_memory = open_shared_memory()
    except OSError as error:
        raise SystemExit(
            f"Could not open shared memory '{SHARED_MEMORY_NAME}'. "
            "Start AMS2/Project CARS and enable shared memory first. "
            f"Original error: {error}"
        ) from error

    print(f"Reading shared memory '{SHARED_MEMORY_NAME}' every {args.interval:g}s.")
    if args.print_frame:
        print("Printing frames to stdout.")
    else:
        print(f"Posting frames to {args.url}")

    sequence = 0
    try:
        while True:
            shared_memory.seek(0)
            data = shared_memory.read(SHARED_MEMORY_SIZE)
            frame = build_frame(data, sequence)

            if args.print_frame:
                print(json.dumps(frame, indent=2))
            else:
                try:
                    post_json(args.url, frame, args.timeout)
                    if sequence % 20 == 0:
                        print(f"sent frame {sequence} with {len(frame['packets'][2]['data']['participants'])} participants")
                except OSError as error:
                    # URLError is an OSError subclass; catching OSError also
                    # covers read timeouts and connection resets, so one
                    # dropped request does not stop the bridge.
                    print(f"POST failed: {error}")

            sequence += 1
            if args.once:
                break
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print("\nStopped shared-memory bridge.")
    finally:
        shared_memory.close()


if __name__ == "__main__":
    main()