171 lines
4.6 KiB
C++
171 lines
4.6 KiB
C++
#pragma once
|
|
|
|
#include "RuntimeEventCoalescingQueue.h"
|
|
#include "RuntimeEventQueue.h"
|
|
|
|
#include <algorithm>
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include <map>
|
|
#include <mutex>
|
|
#include <vector>
|
|
|
|
struct RuntimeEventDispatchResult
|
|
{
|
|
std::size_t dispatchedEvents = 0;
|
|
std::size_t handlerInvocations = 0;
|
|
std::size_t handlerFailures = 0;
|
|
double dispatchDurationMilliseconds = 0.0;
|
|
};
|
|
|
|
class RuntimeEventDispatcher
|
|
{
|
|
public:
|
|
using Handler = std::function<void(const RuntimeEvent&)>;
|
|
|
|
explicit RuntimeEventDispatcher(std::size_t queueCapacity = 1024) :
|
|
mQueue(queueCapacity),
|
|
mCoalescingQueue(queueCapacity)
|
|
{
|
|
}
|
|
|
|
bool Publish(RuntimeEvent event)
|
|
{
|
|
if (!event.PayloadMatchesType())
|
|
return false;
|
|
|
|
if (event.sequence == 0)
|
|
event.sequence = mNextSequence.fetch_add(1);
|
|
|
|
if (ShouldCoalesce(event))
|
|
return mCoalescingQueue.Push(std::move(event));
|
|
|
|
return mQueue.Push(std::move(event));
|
|
}
|
|
|
|
template <typename Payload>
|
|
bool PublishPayload(Payload payload, std::string source = {})
|
|
{
|
|
return Publish(MakeRuntimeEvent(std::move(payload), std::move(source)));
|
|
}
|
|
|
|
void Subscribe(RuntimeEventType type, Handler handler)
|
|
{
|
|
std::lock_guard<std::mutex> lock(mHandlerMutex);
|
|
mHandlers[type].push_back(std::move(handler));
|
|
}
|
|
|
|
void SubscribeAll(Handler handler)
|
|
{
|
|
std::lock_guard<std::mutex> lock(mHandlerMutex);
|
|
mAllHandlers.push_back(std::move(handler));
|
|
}
|
|
|
|
RuntimeEventDispatchResult DispatchPending(std::size_t maxEvents = 0)
|
|
{
|
|
const auto startedAt = std::chrono::steady_clock::now();
|
|
RuntimeEventDispatchResult result;
|
|
FlushCoalescedToFifo(maxEvents);
|
|
std::vector<RuntimeEvent> events = mQueue.Drain(maxEvents);
|
|
result.dispatchedEvents = events.size();
|
|
|
|
for (const RuntimeEvent& event : events)
|
|
{
|
|
std::vector<Handler> handlers = HandlersFor(event.type);
|
|
result.handlerInvocations += handlers.size();
|
|
|
|
for (const Handler& handler : handlers)
|
|
{
|
|
try
|
|
{
|
|
handler(event);
|
|
}
|
|
catch (...)
|
|
{
|
|
++result.handlerFailures;
|
|
}
|
|
}
|
|
}
|
|
|
|
result.dispatchDurationMilliseconds =
|
|
std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - startedAt).count();
|
|
return result;
|
|
}
|
|
|
|
bool TryPop(RuntimeEvent& event)
|
|
{
|
|
return mQueue.TryPop(event);
|
|
}
|
|
|
|
RuntimeEventQueueMetrics GetQueueMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const
|
|
{
|
|
RuntimeEventQueueMetrics metrics = mQueue.GetMetrics(now);
|
|
const RuntimeEventCoalescingQueueMetrics coalescingMetrics = mCoalescingQueue.GetMetrics(now);
|
|
if (metrics.depth == 0)
|
|
metrics.oldestEventAgeMilliseconds = coalescingMetrics.oldestEventAgeMilliseconds;
|
|
else if (coalescingMetrics.depth > 0)
|
|
metrics.oldestEventAgeMilliseconds = (std::max)(metrics.oldestEventAgeMilliseconds, coalescingMetrics.oldestEventAgeMilliseconds);
|
|
metrics.depth += coalescingMetrics.depth;
|
|
metrics.capacity += coalescingMetrics.capacity;
|
|
metrics.droppedCount += coalescingMetrics.droppedCount;
|
|
metrics.coalescedCount = coalescingMetrics.coalescedCount;
|
|
return metrics;
|
|
}
|
|
|
|
std::size_t QueueDepth() const
|
|
{
|
|
return mQueue.Depth() + mCoalescingQueue.Depth();
|
|
}
|
|
|
|
private:
|
|
static bool ShouldCoalesce(const RuntimeEvent& event)
|
|
{
|
|
switch (event.type)
|
|
{
|
|
case RuntimeEventType::OscValueReceived:
|
|
case RuntimeEventType::OscCommitRequested:
|
|
case RuntimeEventType::RuntimeStateBroadcastRequested:
|
|
case RuntimeEventType::FileChangeDetected:
|
|
case RuntimeEventType::RuntimeReloadRequested:
|
|
case RuntimeEventType::ShaderBuildRequested:
|
|
case RuntimeEventType::RenderSnapshotPublishRequested:
|
|
case RuntimeEventType::TimingSampleRecorded:
|
|
case RuntimeEventType::QueueDepthChanged:
|
|
return true;
|
|
default:
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void FlushCoalescedToFifo(std::size_t maxEvents)
|
|
{
|
|
const std::size_t fifoDepth = mQueue.Depth();
|
|
if (maxEvents != 0 && fifoDepth >= maxEvents)
|
|
return;
|
|
|
|
const std::size_t flushLimit = maxEvents == 0 ? 0 : maxEvents - fifoDepth;
|
|
std::vector<RuntimeEvent> events = mCoalescingQueue.Drain(flushLimit);
|
|
for (RuntimeEvent& event : events)
|
|
mQueue.Push(std::move(event));
|
|
}
|
|
|
|
std::vector<Handler> HandlersFor(RuntimeEventType type) const
|
|
{
|
|
std::lock_guard<std::mutex> lock(mHandlerMutex);
|
|
std::vector<Handler> handlers = mAllHandlers;
|
|
|
|
const auto found = mHandlers.find(type);
|
|
if (found != mHandlers.end())
|
|
handlers.insert(handlers.end(), found->second.begin(), found->second.end());
|
|
|
|
return handlers;
|
|
}
|
|
|
|
RuntimeEventQueue mQueue;
|
|
RuntimeEventCoalescingQueue mCoalescingQueue;
|
|
std::atomic<uint64_t> mNextSequence{ 1 };
|
|
mutable std::mutex mHandlerMutex;
|
|
std::map<RuntimeEventType, std::vector<Handler>> mHandlers;
|
|
std::vector<Handler> mAllHandlers;
|
|
};
|