// Bounded, mutex-guarded queue that merges ("coalesces") runtime events sharing
// the same key, plus the default key function and a metrics snapshot struct.
//
// NOTE(review): in this copy of the file the header names of the bare #include
// directives below and the template argument lists (on std::get_if,
// std::function, std::vector, std::deque, std::map, std::lock_guard and
// std::chrono::duration) are not visible. Confirm them against the canonical
// source before building; nothing here attempts to guess them.
#pragma once
#include "RuntimeEvent.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include

// Point-in-time snapshot produced by RuntimeEventCoalescingQueue::GetMetrics().
struct RuntimeEventCoalescingQueueMetrics
{
    // Number of entries (distinct keys) currently queued.
    std::size_t depth = 0;
    // Maximum number of distinct keys the queue accepts.
    std::size_t capacity = 0;
    // Running count of pushes rejected because the queue was at capacity.
    std::size_t droppedCount = 0;
    // Running count of pushes merged into an already queued entry.
    std::size_t coalescedCount = 0;
    // Age of the entry at the front of the queue; stays 0.0 when the queue is
    // empty. Presumably milliseconds, per the field name - the duration type
    // used for the conversion is not visible here (TODO confirm).
    double oldestEventAgeMilliseconds = 0.0;
};

// Builds the default coalescing key for an event: the event's type name,
// followed (for the payload kinds probed below) by ':' and a payload-specific
// discriminator. Two events with equal keys replace one another in the queue.
//
// NOTE(review): each std::get_if probe targets a specific payload alternative
// whose type name is not visible in this copy; the comments below identify
// them only by the members they read.
inline std::string RuntimeEventDefaultCoalescingKey(const RuntimeEvent& event)
{
    // Payload carrying a routeKey (first of two such alternatives).
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->routeKey;
    // Payload carrying a routeKey (second alternative).
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->routeKey;
    // Payload identified by its path.
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->path;
    // Payload keyed by input dimensions plus the preserve/reset flag, so events
    // that differ in any of the three are kept apart.
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + std::to_string(payload->inputWidth) + "x" + std::to_string(payload->inputHeight) + ":" + (payload->preserveFeedbackState ? "preserve" : "reset");
    // Payload keyed by output dimensions.
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + std::to_string(payload->outputWidth) + "x" + std::to_string(payload->outputHeight);
    // Payload keyed by subsystem and metric.
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->subsystem + ":" + payload->metric;
    // Payload keyed by queue name.
    if (const auto* payload = std::get_if(&event.payload))
        return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->queueName;
    // Any other payload: all events of the same type share one key.
    return std::string(RuntimeEventTypeName(event.type));
}

// Queue that holds at most one pending event per key. Pushing an event whose
// key is already queued overwrites the stored event in place; new keys are
// appended in arrival order until the capacity is reached. Every member that
// touches the containers or counters takes mMutex first.
class RuntimeEventCoalescingQueue
{
public:
    // Callable that maps an event to its coalescing key.
    using KeySelector = std::function;

    // capacity    - maximum number of distinct keys held at once.
    // keySelector - key function; defaults to RuntimeEventDefaultCoalescingKey.
    explicit RuntimeEventCoalescingQueue(std::size_t capacity = 256, KeySelector keySelector = RuntimeEventDefaultCoalescingKey) :
        mCapacity(capacity),
        mKeySelector(std::move(keySelector))
    {
    }

    // Queues an event, merging it into an existing entry with the same key.
    // Returns true when the event was stored or merged, false when the key is
    // empty or the queue already holds mCapacity distinct keys.
    bool Push(RuntimeEvent event)
    {
        // The key is computed before the lock is taken.
        const std::string key = mKeySelector(event);
        // An empty key is rejected without being counted as a drop.
        if (key.empty())
            return false;

        std::lock_guard lock(mMutex);
        auto found = mEntries.find(key);
        if (found != mEntries.end())
        {
            // Replace the queued event but keep the creation time of the first
            // event queued under this key, so the age reported by GetMetrics()
            // reflects how long the key has been waiting. mOrder is untouched,
            // so the entry keeps its position in the drain order.
            const auto firstCreatedAt = found->second.event.createdAt;
            found->second.event = std::move(event);
            found->second.event.createdAt = firstCreatedAt;
            ++found->second.coalescedCount;
            ++mCoalescedCount;
            return true;
        }

        // The limit applies to new keys only; merges above never count
        // against it.
        if (mEntries.size() >= mCapacity)
        {
            ++mDroppedCount;
            return false;
        }

        mOrder.push_back(key);
        Entry entry;
        entry.event = std::move(event);
        mEntries.emplace(key, std::move(entry));
        return true;
    }

    // Removes and returns queued events in the order their keys were first
    // pushed. maxEvents == 0 (or a value larger than the queue) drains
    // everything; otherwise at most maxEvents events are returned.
    std::vector Drain(std::size_t maxEvents = 0)
    {
        std::vector events;
        std::lock_guard lock(mMutex);
        const std::size_t count = maxEvents == 0 || maxEvents > mOrder.size() ? mOrder.size() : maxEvents;
        events.reserve(count);
        for (std::size_t index = 0; index < count; ++index)
        {
            // Take the key out of the deque before popping its slot.
            const std::string key = std::move(mOrder.front());
            mOrder.pop_front();
            auto found = mEntries.find(key);
            // A key without a matching entry is skipped.
            if (found == mEntries.end())
                continue;
            events.push_back(std::move(found->second.event));
            mEntries.erase(found);
        }
        return events;
    }

    // Returns a snapshot of depth, capacity and the running counters, plus the
    // age of the front entry measured against `now`. The default value of
    // `now` is evaluated at the call, i.e. before this function takes the lock.
    RuntimeEventCoalescingQueueMetrics GetMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const
    {
        std::lock_guard lock(mMutex);
        RuntimeEventCoalescingQueueMetrics metrics;
        metrics.depth = mEntries.size();
        metrics.capacity = mCapacity;
        metrics.droppedCount = mDroppedCount;
        metrics.coalescedCount = mCoalescedCount;
        if (!mOrder.empty())
        {
            // The front of mOrder is the key that was queued first.
            const auto found = mEntries.find(mOrder.front());
            if (found != mEntries.end())
            {
                const auto age = now - found->second.event.createdAt;
                // NOTE(review): the duration's template arguments are not
                // visible here; confirm the conversion yields milliseconds.
                metrics.oldestEventAgeMilliseconds = std::chrono::duration(age).count();
            }
        }
        return metrics;
    }

    // Number of entries (distinct keys) currently queued.
    std::size_t Depth() const
    {
        std::lock_guard lock(mMutex);
        return mEntries.size();
    }

private:
    // One queued event plus the number of times it has been overwritten by a
    // later push with the same key.
    struct Entry
    {
        RuntimeEvent event;
        std::size_t coalescedCount = 0;
    };

    // Locked by every member function before touching the fields below.
    mutable std::mutex mMutex;
    // Maximum number of distinct keys.
    std::size_t mCapacity = 0;
    // Key function; invoked by Push() before the lock is taken.
    KeySelector mKeySelector;
    // Keys in first-push order; determines drain order.
    std::deque mOrder;
    // Key -> queued entry.
    std::map mEntries;
    // Running totals reported by GetMetrics().
    std::size_t mDroppedCount = 0;
    std::size_t mCoalescedCount = 0;
};