event dispatcher
This commit is contained in:
@@ -0,0 +1,147 @@
|
||||
#pragma once
|
||||
|
||||
#include "RuntimeEvent.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
struct RuntimeEventCoalescingQueueMetrics
|
||||
{
|
||||
std::size_t depth = 0;
|
||||
std::size_t capacity = 0;
|
||||
std::size_t droppedCount = 0;
|
||||
std::size_t coalescedCount = 0;
|
||||
double oldestEventAgeMilliseconds = 0.0;
|
||||
};
|
||||
|
||||
inline std::string RuntimeEventDefaultCoalescingKey(const RuntimeEvent& event)
|
||||
{
|
||||
if (const auto* payload = std::get_if<OscValueReceivedEvent>(&event.payload))
|
||||
return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->routeKey;
|
||||
if (const auto* payload = std::get_if<OscCommitRequestedEvent>(&event.payload))
|
||||
return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->routeKey;
|
||||
if (const auto* payload = std::get_if<FileChangeDetectedEvent>(&event.payload))
|
||||
return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->path;
|
||||
if (const auto* payload = std::get_if<QueueDepthChangedEvent>(&event.payload))
|
||||
return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->queueName;
|
||||
|
||||
return std::string(RuntimeEventTypeName(event.type));
|
||||
}
|
||||
|
||||
class RuntimeEventCoalescingQueue
|
||||
{
|
||||
public:
|
||||
using KeySelector = std::function<std::string(const RuntimeEvent&)>;
|
||||
|
||||
explicit RuntimeEventCoalescingQueue(std::size_t capacity = 256, KeySelector keySelector = RuntimeEventDefaultCoalescingKey) :
|
||||
mCapacity(capacity),
|
||||
mKeySelector(std::move(keySelector))
|
||||
{
|
||||
}
|
||||
|
||||
bool Push(RuntimeEvent event)
|
||||
{
|
||||
const std::string key = mKeySelector(event);
|
||||
if (key.empty())
|
||||
return false;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
auto found = mEntries.find(key);
|
||||
if (found != mEntries.end())
|
||||
{
|
||||
const auto firstCreatedAt = found->second.event.createdAt;
|
||||
found->second.event = std::move(event);
|
||||
found->second.event.createdAt = firstCreatedAt;
|
||||
++found->second.coalescedCount;
|
||||
++mCoalescedCount;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (mEntries.size() >= mCapacity)
|
||||
{
|
||||
++mDroppedCount;
|
||||
return false;
|
||||
}
|
||||
|
||||
mOrder.push_back(key);
|
||||
Entry entry;
|
||||
entry.event = std::move(event);
|
||||
mEntries.emplace(key, std::move(entry));
|
||||
return true;
|
||||
}
|
||||
|
||||
std::vector<RuntimeEvent> Drain(std::size_t maxEvents = 0)
|
||||
{
|
||||
std::vector<RuntimeEvent> events;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
const std::size_t count = maxEvents == 0 || maxEvents > mOrder.size() ? mOrder.size() : maxEvents;
|
||||
events.reserve(count);
|
||||
|
||||
for (std::size_t index = 0; index < count; ++index)
|
||||
{
|
||||
const std::string key = std::move(mOrder.front());
|
||||
mOrder.pop_front();
|
||||
|
||||
auto found = mEntries.find(key);
|
||||
if (found == mEntries.end())
|
||||
continue;
|
||||
|
||||
events.push_back(std::move(found->second.event));
|
||||
mEntries.erase(found);
|
||||
}
|
||||
|
||||
return events;
|
||||
}
|
||||
|
||||
RuntimeEventCoalescingQueueMetrics GetMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
|
||||
RuntimeEventCoalescingQueueMetrics metrics;
|
||||
metrics.depth = mEntries.size();
|
||||
metrics.capacity = mCapacity;
|
||||
metrics.droppedCount = mDroppedCount;
|
||||
metrics.coalescedCount = mCoalescedCount;
|
||||
|
||||
if (!mOrder.empty())
|
||||
{
|
||||
const auto found = mEntries.find(mOrder.front());
|
||||
if (found != mEntries.end())
|
||||
{
|
||||
const auto age = now - found->second.event.createdAt;
|
||||
metrics.oldestEventAgeMilliseconds = std::chrono::duration<double, std::milli>(age).count();
|
||||
}
|
||||
}
|
||||
|
||||
return metrics;
|
||||
}
|
||||
|
||||
std::size_t Depth() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
return mEntries.size();
|
||||
}
|
||||
|
||||
private:
|
||||
struct Entry
|
||||
{
|
||||
RuntimeEvent event;
|
||||
std::size_t coalescedCount = 0;
|
||||
};
|
||||
|
||||
mutable std::mutex mMutex;
|
||||
std::size_t mCapacity = 0;
|
||||
KeySelector mKeySelector;
|
||||
std::deque<std::string> mOrder;
|
||||
std::map<std::string, Entry> mEntries;
|
||||
std::size_t mDroppedCount = 0;
|
||||
std::size_t mCoalescedCount = 0;
|
||||
};
|
||||
Reference in New Issue
Block a user