#pragma once #include "RuntimeEventCoalescingQueue.h" #include "RuntimeEventQueue.h" #include #include #include #include #include #include struct RuntimeEventDispatchResult { std::size_t dispatchedEvents = 0; std::size_t handlerInvocations = 0; std::size_t handlerFailures = 0; double dispatchDurationMilliseconds = 0.0; }; class RuntimeEventDispatcher { public: using Handler = std::function; explicit RuntimeEventDispatcher(std::size_t queueCapacity = 1024) : mQueue(queueCapacity), mCoalescingQueue(queueCapacity) { } bool Publish(RuntimeEvent event) { if (!event.PayloadMatchesType()) return false; if (event.sequence == 0) event.sequence = mNextSequence.fetch_add(1); if (ShouldCoalesce(event)) return mCoalescingQueue.Push(std::move(event)); return mQueue.Push(std::move(event)); } template bool PublishPayload(Payload payload, std::string source = {}) { return Publish(MakeRuntimeEvent(std::move(payload), std::move(source))); } void Subscribe(RuntimeEventType type, Handler handler) { std::lock_guard lock(mHandlerMutex); mHandlers[type].push_back(std::move(handler)); } void SubscribeAll(Handler handler) { std::lock_guard lock(mHandlerMutex); mAllHandlers.push_back(std::move(handler)); } RuntimeEventDispatchResult DispatchPending(std::size_t maxEvents = 0) { const auto startedAt = std::chrono::steady_clock::now(); RuntimeEventDispatchResult result; FlushCoalescedToFifo(maxEvents); std::vector events = mQueue.Drain(maxEvents); result.dispatchedEvents = events.size(); for (const RuntimeEvent& event : events) { std::vector handlers = HandlersFor(event.type); result.handlerInvocations += handlers.size(); for (const Handler& handler : handlers) { try { handler(event); } catch (...) { ++result.handlerFailures; } } } result.dispatchDurationMilliseconds = std::chrono::duration(std::chrono::steady_clock::now() - startedAt).count(); return result; } bool TryPop(RuntimeEvent& event) { return mQueue.TryPop(event); } RuntimeEventQueueMetrics GetQueueMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const { RuntimeEventQueueMetrics metrics = mQueue.GetMetrics(now); const RuntimeEventCoalescingQueueMetrics coalescingMetrics = mCoalescingQueue.GetMetrics(now); if (metrics.depth == 0) metrics.oldestEventAgeMilliseconds = coalescingMetrics.oldestEventAgeMilliseconds; else if (coalescingMetrics.depth > 0) metrics.oldestEventAgeMilliseconds = (std::max)(metrics.oldestEventAgeMilliseconds, coalescingMetrics.oldestEventAgeMilliseconds); metrics.depth += coalescingMetrics.depth; metrics.capacity += coalescingMetrics.capacity; metrics.droppedCount += coalescingMetrics.droppedCount; metrics.coalescedCount = coalescingMetrics.coalescedCount; return metrics; } std::size_t QueueDepth() const { return mQueue.Depth() + mCoalescingQueue.Depth(); } private: static bool ShouldCoalesce(const RuntimeEvent& event) { switch (event.type) { case RuntimeEventType::OscValueReceived: case RuntimeEventType::OscCommitRequested: case RuntimeEventType::RuntimeStateBroadcastRequested: case RuntimeEventType::FileChangeDetected: case RuntimeEventType::RuntimeReloadRequested: case RuntimeEventType::ShaderBuildRequested: case RuntimeEventType::RenderSnapshotPublishRequested: case RuntimeEventType::TimingSampleRecorded: case RuntimeEventType::QueueDepthChanged: return true; default: return false; } } void FlushCoalescedToFifo(std::size_t maxEvents) { const std::size_t fifoDepth = mQueue.Depth(); if (maxEvents != 0 && fifoDepth >= maxEvents) return; const std::size_t flushLimit = maxEvents == 0 ? 0 : maxEvents - fifoDepth; std::vector events = mCoalescingQueue.Drain(flushLimit); for (RuntimeEvent& event : events) mQueue.Push(std::move(event)); } std::vector HandlersFor(RuntimeEventType type) const { std::lock_guard lock(mHandlerMutex); std::vector handlers = mAllHandlers; const auto found = mHandlers.find(type); if (found != mHandlers.end()) handlers.insert(handlers.end(), found->second.begin(), found->second.end()); return handlers; } RuntimeEventQueue mQueue; RuntimeEventCoalescingQueue mCoalescingQueue; std::atomic mNextSequence{ 1 }; mutable std::mutex mHandlerMutex; std::map> mHandlers; std::vector mAllHandlers; };