Files
racing-data/scripts/replay_udp.py
2026-05-19 15:38:16 +10:00

74 lines
2.3 KiB
Python

import argparse
import json
import socket
import time
from pathlib import Path
# Command-line interface. The recording directory is the only required
# argument; everything else has a default.
parser = argparse.ArgumentParser(description="Replay a Project CARS UDP recording.")
parser.add_argument("recording", type=Path, help="Recording directory containing manifest.jsonl.")
parser.add_argument("--host", default="127.0.0.1", help="Destination host. Defaults to 127.0.0.1.")
parser.add_argument("--port", type=int, default=5606, help="Destination UDP port. Defaults to 5606.")
parser.add_argument("--speed", type=float, default=1.0, help="Playback speed multiplier.")
parser.add_argument("--loop", action="store_true", help="Loop playback until interrupted.")
parser.add_argument(
    "--types",
    default=None,
    help="Comma-separated V2 packet types to replay, for example 1,2,3,4,7,8. V1 packets are skipped when this is set.",
)
args = parser.parse_args()

# The manifest lives inside the recording directory, one JSON object per line.
recording_dir = args.recording
manifest_path = recording_dir / "manifest.jsonl"
if not manifest_path.exists():
    raise SystemExit(f"Missing manifest: {manifest_path}")

# The speed is used as a divisor for time offsets, so it must be positive.
if args.speed <= 0:
    raise SystemExit("--speed must be greater than 0")

# Optional packet-type filter. None means "replay everything".
type_filter = None
if args.types:
    try:
        type_filter = {int(part.strip()) for part in args.types.split(",") if part.strip()}
    except ValueError:
        # Report bad user input as a normal CLI error instead of a traceback.
        raise SystemExit(
            f"--types must be a comma-separated list of integers, got: {args.types!r}"
        ) from None

# Load every non-blank manifest line, keeping file order.
records = []
with manifest_path.open("r", encoding="utf8") as manifest:
    for line_number, line in enumerate(manifest, start=1):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            # Point at the offending line (e.g. a truncated final entry).
            raise SystemExit(
                f"Invalid JSON in {manifest_path} at line {line_number}: {exc}"
            ) from exc

# When a filter is set, only v2 records with a listed packet type survive.
if type_filter is not None:
    records = [
        record for record in records
        if record.get("format") == "v2" and record.get("packetType") in type_filter
    ]
if not records:
    raise SystemExit("No packets to replay after applying filters.")

# Single IPv4 UDP socket used for all sends.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def play_once() -> None:
    """Send every entry of ``records`` once, paced by its ``timeOffset``.

    Offsets are taken relative to the first record and divided by
    ``args.speed``; the result is compared against ``time.monotonic()``
    deltas, i.e. treated as seconds. Each packet's payload is read from
    ``recording_dir / record["file"]`` and sent to ``(args.host, args.port)``
    over the module-level ``sock``.
    """
    started_at = time.monotonic()
    base_offset = records[0]["timeOffset"]
    destination = (args.host, args.port)

    for entry in records:
        # When this packet is due, measured from the start of playback.
        due_at = (entry["timeOffset"] - base_offset) / args.speed
        elapsed = time.monotonic() - started_at
        remaining = due_at - elapsed
        # Only wait if we are ahead of schedule; late packets go out at once.
        if remaining > 0:
            time.sleep(remaining)

        payload_path = recording_dir / entry["file"]
        payload = payload_path.read_bytes()
        sock.sendto(payload, destination)
# Announce what is about to be replayed, then run playback.
print(
    f"Replaying {len(records)} packets to {args.host}:{args.port} "
    f"at {args.speed:g}x from {recording_dir}"
)
try:
    # Play once, or repeat until interrupted when --loop is given.
    while True:
        play_once()
        if not args.loop:
            break
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop a looping replay.
    print("\nStopped replay.")
finally:
    # Release the UDP socket on every exit path (done, Ctrl-C, or error).
    sock.close()