From b3705d96ccfc29ee27acb8ce9317f4d12c3cc3ba Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Mon, 11 May 2026 15:15:42 +1000 Subject: [PATCH] event dispatcher --- CMakeLists.txt | 23 + .../control/ControlServices.cpp | 3 +- .../control/ControlServices.h | 4 +- .../control/RuntimeServices.cpp | 4 +- .../control/RuntimeServices.h | 3 +- .../gl/OpenGLComposite.cpp | 7 +- .../gl/OpenGLComposite.h | 2 + .../gl/RuntimeUpdateController.cpp | 2 + .../gl/RuntimeUpdateController.h | 3 + .../coordination/RuntimeCoordinator.cpp | 5 +- .../runtime/coordination/RuntimeCoordinator.h | 4 +- .../runtime/events/RuntimeEvent.h | 87 ++++ .../events/RuntimeEventCoalescingQueue.h | 147 ++++++ .../runtime/events/RuntimeEventDispatcher.h | 117 +++++ .../runtime/events/RuntimeEventPayloads.h | 436 ++++++++++++++++++ .../runtime/events/RuntimeEventQueue.h | 99 ++++ .../runtime/events/RuntimeEventType.h | 151 ++++++ docs/PHASE_2_INTERNAL_EVENT_MODEL_DESIGN.md | 75 +++ tests/RuntimeEventTestHarness.h | 121 +++++ tests/RuntimeEventTypeTests.cpp | 334 ++++++++++++++ 20 files changed, 1617 insertions(+), 10 deletions(-) create mode 100644 apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEvent.h create mode 100644 apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventCoalescingQueue.h create mode 100644 apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventDispatcher.h create mode 100644 apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventPayloads.h create mode 100644 apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventQueue.h create mode 100644 apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventType.h create mode 100644 tests/RuntimeEventTestHarness.h create mode 100644 tests/RuntimeEventTypeTests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5babb31..2581b7b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,6 +104,12 @@ set(APP_SOURCES "${APP_DIR}/resource.h" "${APP_DIR}/runtime/coordination/RuntimeCoordinator.cpp" "${APP_DIR}/runtime/coordination/RuntimeCoordinator.h" + "${APP_DIR}/runtime/events/RuntimeEventCoalescingQueue.h" + "${APP_DIR}/runtime/events/RuntimeEventDispatcher.h" + "${APP_DIR}/runtime/events/RuntimeEvent.h" + "${APP_DIR}/runtime/events/RuntimeEventPayloads.h" + "${APP_DIR}/runtime/events/RuntimeEventQueue.h" + "${APP_DIR}/runtime/events/RuntimeEventType.h" "${APP_DIR}/runtime/presentation/RuntimeStateJson.cpp" "${APP_DIR}/runtime/presentation/RuntimeStateJson.h" "${APP_DIR}/runtime/presentation/RuntimeStatePresenter.cpp" @@ -158,6 +164,7 @@ target_include_directories(LoopThroughWithOpenGLCompositing PRIVATE "${APP_DIR}/platform" "${APP_DIR}/runtime" "${APP_DIR}/runtime/coordination" + "${APP_DIR}/runtime/events" "${APP_DIR}/runtime/presentation" "${APP_DIR}/runtime/snapshot" "${APP_DIR}/runtime/store" @@ -242,6 +249,22 @@ endif() add_test(NAME RuntimeParameterUtilsTests COMMAND RuntimeParameterUtilsTests) +add_executable(RuntimeEventTypeTests + "${CMAKE_CURRENT_SOURCE_DIR}/tests/RuntimeEventTypeTests.cpp" +) + +target_include_directories(RuntimeEventTypeTests PRIVATE + "${APP_DIR}" + "${APP_DIR}/runtime" + "${APP_DIR}/runtime/events" +) + +if(MSVC) + target_compile_options(RuntimeEventTypeTests PRIVATE /W3) +endif() + +add_test(NAME RuntimeEventTypeTests COMMAND RuntimeEventTypeTests) + add_executable(RuntimeSubsystemTests "${APP_DIR}/runtime/store/LayerStackStore.cpp" "${APP_DIR}/runtime/store/ShaderPackageCatalog.cpp" diff --git a/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.cpp b/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.cpp index f724e9f..d7784e8 100644 --- a/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.cpp @@ -6,9 +6,10 @@ #include "RuntimeStore.h" #include -ControlServices::ControlServices() : +ControlServices::ControlServices(RuntimeEventDispatcher& runtimeEventDispatcher) : mControlServer(std::make_unique()), mOscServer(std::make_unique()), + mRuntimeEventDispatcher(runtimeEventDispatcher), mPollRunning(false) { } diff --git a/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.h b/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.h index d5b3188..eef344a 100644 --- a/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.h +++ b/apps/LoopThroughWithOpenGLCompositing/control/ControlServices.h @@ -15,6 +15,7 @@ class ControlServer; class OpenGLComposite; class OscServer; +class RuntimeEventDispatcher; class RuntimeStore; struct RuntimeCoordinatorServiceResult @@ -40,7 +41,7 @@ public: uint64_t generation = 0; }; - ControlServices(); + explicit ControlServices(RuntimeEventDispatcher& runtimeEventDispatcher); ~ControlServices(); bool Start(OpenGLComposite& composite, RuntimeStore& runtimeStore, std::string& error); @@ -79,6 +80,7 @@ private: std::unique_ptr mControlServer; std::unique_ptr mOscServer; + RuntimeEventDispatcher& mRuntimeEventDispatcher; std::thread mPollThread; std::atomic mPollRunning; std::mutex mRuntimeCoordinatorResultMutex; diff --git a/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.cpp b/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.cpp index 854f902..c85eb3c 100644 --- a/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.cpp @@ -2,8 +2,8 @@ #include "RuntimeStore.h" -RuntimeServices::RuntimeServices() : - mControlServices(std::make_unique()) +RuntimeServices::RuntimeServices(RuntimeEventDispatcher& runtimeEventDispatcher) : + mControlServices(std::make_unique(runtimeEventDispatcher)) { } diff --git a/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.h b/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.h index a00a865..6154acc 100644 --- a/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.h +++ b/apps/LoopThroughWithOpenGLCompositing/control/RuntimeServices.h @@ -6,6 +6,7 @@ #include class OpenGLComposite; class RuntimeCoordinator; +class RuntimeEventDispatcher; class RuntimeStore; class RuntimeServices @@ -14,7 +15,7 @@ public: using AppliedOscUpdate = ControlServices::AppliedOscUpdate; using CompletedOscCommit = ControlServices::CompletedOscCommit; - RuntimeServices(); + explicit RuntimeServices(RuntimeEventDispatcher& runtimeEventDispatcher); ~RuntimeServices(); bool Start(OpenGLComposite& composite, RuntimeStore& runtimeStore, std::string& error); diff --git a/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.cpp b/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.cpp index 38dd8bd..fcfb45f 100644 --- a/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.cpp @@ -5,6 +5,7 @@ #include "PngScreenshotWriter.h" #include "RenderEngine.h" #include "RuntimeCoordinator.h" +#include "RuntimeEventDispatcher.h" #include "RuntimeParameterUtils.h" #include "RuntimeServices.h" #include "RuntimeSnapshotProvider.h" @@ -36,8 +37,9 @@ OpenGLComposite::OpenGLComposite(HWND hWnd, HDC hDC, HGLRC hRC) : { InitializeCriticalSection(&pMutex); mRuntimeStore = std::make_unique(); + mRuntimeEventDispatcher = std::make_unique(); mRuntimeSnapshotProvider = std::make_unique(mRuntimeStore->GetRenderSnapshotBuilder()); - mRuntimeCoordinator = std::make_unique(*mRuntimeStore); + mRuntimeCoordinator = std::make_unique(*mRuntimeStore, *mRuntimeEventDispatcher); mRenderEngine = std::make_unique( *mRuntimeSnapshotProvider, mRuntimeStore->GetHealthTelemetry(), @@ -49,10 +51,11 @@ OpenGLComposite::OpenGLComposite(HWND hWnd, HDC hDC, HGLRC hRC) : [this]() { paintGL(false); }); mVideoBackend = std::make_unique(*mRenderEngine, mRuntimeStore->GetHealthTelemetry()); mShaderBuildQueue = std::make_unique(*mRuntimeSnapshotProvider); - mRuntimeServices = std::make_unique(); + mRuntimeServices = std::make_unique(*mRuntimeEventDispatcher); mRuntimeUpdateController = std::make_unique( *mRuntimeStore, *mRuntimeCoordinator, + *mRuntimeEventDispatcher, *mRuntimeServices, *mRenderEngine, *mShaderBuildQueue, diff --git a/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.h b/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.h index 6611bb2..7bd86b8 100644 --- a/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.h +++ b/apps/LoopThroughWithOpenGLCompositing/gl/OpenGLComposite.h @@ -22,6 +22,7 @@ class RenderEngine; class RuntimeCoordinator; +class RuntimeEventDispatcher; class RuntimeSnapshotProvider; class RuntimeServices; class RuntimeStore; @@ -76,6 +77,7 @@ private: std::unique_ptr mRuntimeStore; std::unique_ptr mRuntimeCoordinator; std::unique_ptr mRuntimeSnapshotProvider; + std::unique_ptr mRuntimeEventDispatcher; std::unique_ptr mRenderEngine; std::unique_ptr mShaderBuildQueue; std::unique_ptr mRuntimeServices; diff --git a/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.cpp b/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.cpp index c178d2d..83d31d9 100644 --- a/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.cpp @@ -11,12 +11,14 @@ RuntimeUpdateController::RuntimeUpdateController( RuntimeStore& runtimeStore, RuntimeCoordinator& runtimeCoordinator, + RuntimeEventDispatcher& runtimeEventDispatcher, RuntimeServices& runtimeServices, RenderEngine& renderEngine, ShaderBuildQueue& shaderBuildQueue, VideoBackend& videoBackend) : mRuntimeStore(runtimeStore), mRuntimeCoordinator(runtimeCoordinator), + mRuntimeEventDispatcher(runtimeEventDispatcher), mRuntimeServices(runtimeServices), mRenderEngine(renderEngine), mShaderBuildQueue(shaderBuildQueue), diff --git a/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.h b/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.h index e0f1dcf..f3d25e3 100644 --- a/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.h +++ b/apps/LoopThroughWithOpenGLCompositing/gl/RuntimeUpdateController.h @@ -5,6 +5,7 @@ #include class RenderEngine; +class RuntimeEventDispatcher; class RuntimeServices; class RuntimeStore; class ShaderBuildQueue; @@ -16,6 +17,7 @@ public: RuntimeUpdateController( RuntimeStore& runtimeStore, RuntimeCoordinator& runtimeCoordinator, + RuntimeEventDispatcher& runtimeEventDispatcher, RuntimeServices& runtimeServices, RenderEngine& renderEngine, ShaderBuildQueue& shaderBuildQueue, @@ -29,6 +31,7 @@ public: private: RuntimeStore& mRuntimeStore; RuntimeCoordinator& mRuntimeCoordinator; + RuntimeEventDispatcher& mRuntimeEventDispatcher; RuntimeServices& mRuntimeServices; RenderEngine& mRenderEngine; ShaderBuildQueue& mShaderBuildQueue; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.cpp index c09687a..4b6770c 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.cpp @@ -3,8 +3,9 @@ #include "RuntimeParameterUtils.h" #include "RuntimeStore.h" -RuntimeCoordinator::RuntimeCoordinator(RuntimeStore& runtimeStore) : - mRuntimeStore(runtimeStore) +RuntimeCoordinator::RuntimeCoordinator(RuntimeStore& runtimeStore, RuntimeEventDispatcher& runtimeEventDispatcher) : + mRuntimeStore(runtimeStore), + mRuntimeEventDispatcher(runtimeEventDispatcher) { } diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.h b/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.h index fb01096..26033c1 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/coordination/RuntimeCoordinator.h @@ -9,6 +9,7 @@ #include class RuntimeStore; +class RuntimeEventDispatcher; enum class RuntimeCoordinatorCommittedStateMode { @@ -42,7 +43,7 @@ struct RuntimeCoordinatorResult class RuntimeCoordinator { public: - explicit RuntimeCoordinator(RuntimeStore& runtimeStore); + RuntimeCoordinator(RuntimeStore& runtimeStore, RuntimeEventDispatcher& runtimeEventDispatcher); RuntimeCoordinatorResult AddLayer(const std::string& shaderId); RuntimeCoordinatorResult RemoveLayer(const std::string& layerId); @@ -93,6 +94,7 @@ private: RuntimeCoordinatorResult BuildAcceptedNoReloadResult() const; RuntimeStore& mRuntimeStore; + RuntimeEventDispatcher& mRuntimeEventDispatcher; mutable std::mutex mMutex; bool mPreserveFeedbackOnNextShaderBuild = false; std::atomic mUseCommittedLayerStates{ false }; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEvent.h b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEvent.h new file mode 100644 index 0000000..53430f4 --- /dev/null +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEvent.h @@ -0,0 +1,87 @@ +#pragma once + +#include "RuntimeEventPayloads.h" + +#include +#include +#include +#include +#include +#include + +using RuntimeEventPayload = std::variant< + std::monostate, + OscValueReceivedEvent, + OscValueCoalescedEvent, + OscCommitRequestedEvent, + HttpControlMutationRequestedEvent, + WebSocketClientConnectedEvent, + RuntimeStateBroadcastRequestedEvent, + FileChangeDetectedEvent, + ManualReloadRequestedEvent, + RuntimeMutationEvent, + RuntimeStateChangedEvent, + RuntimePersistenceRequestedEvent, + RuntimeReloadRequestedEvent, + ShaderPackagesChangedEvent, + RenderSnapshotPublishRequestedEvent, + RuntimeStatePresentationChangedEvent, + ShaderBuildEvent, + CompileStatusChangedEvent, + RenderSnapshotPublishedEvent, + RenderResetEvent, + OscOverlayEvent, + FrameRenderedEvent, + PreviewFrameAvailableEvent, + InputSignalChangedEvent, + InputFrameArrivedEvent, + OutputFrameScheduledEvent, + OutputFrameCompletedEvent, + BackendStateChangedEvent, + SubsystemWarningEvent, + SubsystemRecoveredEvent, + TimingSampleRecordedEvent, + QueueDepthChangedEvent>; + +inline RuntimeEventType RuntimeEventPayloadType(const RuntimeEventPayload& payload) +{ + return std::visit([](const auto& value) -> RuntimeEventType { + using PayloadType = std::decay_t; + if constexpr (std::is_same_v) + return RuntimeEventType::Unknown; + else + return RuntimeEventPayloadType(value); + }, payload); +} + +struct RuntimeEvent +{ + RuntimeEventType type = RuntimeEventType::Unknown; + uint64_t sequence = 0; + std::chrono::steady_clock::time_point createdAt = std::chrono::steady_clock::now(); + std::string source; + RuntimeEventPayload payload; + + bool HasPayload() const + { + return !std::holds_alternative(payload); + } + + bool PayloadMatchesType() const + { + return RuntimeEventPayloadType(payload) == type; + } +}; + +template +RuntimeEvent MakeRuntimeEvent(Payload payload, std::string source = {}, uint64_t sequence = 0, + std::chrono::steady_clock::time_point createdAt = std::chrono::steady_clock::now()) +{ + RuntimeEvent event; + event.type = RuntimeEventPayloadType(payload); + event.sequence = sequence; + event.createdAt = createdAt; + event.source = std::move(source); + event.payload = std::move(payload); + return event; +} diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventCoalescingQueue.h b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventCoalescingQueue.h new file mode 100644 index 0000000..918cd39 --- /dev/null +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventCoalescingQueue.h @@ -0,0 +1,147 @@ +#pragma once + +#include "RuntimeEvent.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct RuntimeEventCoalescingQueueMetrics +{ + std::size_t depth = 0; + std::size_t capacity = 0; + std::size_t droppedCount = 0; + std::size_t coalescedCount = 0; + double oldestEventAgeMilliseconds = 0.0; +}; + +inline std::string RuntimeEventDefaultCoalescingKey(const RuntimeEvent& event) +{ + if (const auto* payload = std::get_if(&event.payload)) + return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->routeKey; + if (const auto* payload = std::get_if(&event.payload)) + return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->routeKey; + if (const auto* payload = std::get_if(&event.payload)) + return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->path; + if (const auto* payload = std::get_if(&event.payload)) + return std::string(RuntimeEventTypeName(event.type)) + ":" + payload->queueName; + + return std::string(RuntimeEventTypeName(event.type)); +} + +class RuntimeEventCoalescingQueue +{ +public: + using KeySelector = std::function; + + explicit RuntimeEventCoalescingQueue(std::size_t capacity = 256, KeySelector keySelector = RuntimeEventDefaultCoalescingKey) : + mCapacity(capacity), + mKeySelector(std::move(keySelector)) + { + } + + bool Push(RuntimeEvent event) + { + const std::string key = mKeySelector(event); + if (key.empty()) + return false; + + std::lock_guard lock(mMutex); + auto found = mEntries.find(key); + if (found != mEntries.end()) + { + const auto firstCreatedAt = found->second.event.createdAt; + found->second.event = std::move(event); + found->second.event.createdAt = firstCreatedAt; + ++found->second.coalescedCount; + ++mCoalescedCount; + return true; + } + + if (mEntries.size() >= mCapacity) + { + ++mDroppedCount; + return false; + } + + mOrder.push_back(key); + Entry entry; + entry.event = std::move(event); + mEntries.emplace(key, std::move(entry)); + return true; + } + + std::vector Drain(std::size_t maxEvents = 0) + { + std::vector events; + + std::lock_guard lock(mMutex); + const std::size_t count = maxEvents == 0 || maxEvents > mOrder.size() ? mOrder.size() : maxEvents; + events.reserve(count); + + for (std::size_t index = 0; index < count; ++index) + { + const std::string key = std::move(mOrder.front()); + mOrder.pop_front(); + + auto found = mEntries.find(key); + if (found == mEntries.end()) + continue; + + events.push_back(std::move(found->second.event)); + mEntries.erase(found); + } + + return events; + } + + RuntimeEventCoalescingQueueMetrics GetMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const + { + std::lock_guard lock(mMutex); + + RuntimeEventCoalescingQueueMetrics metrics; + metrics.depth = mEntries.size(); + metrics.capacity = mCapacity; + metrics.droppedCount = mDroppedCount; + metrics.coalescedCount = mCoalescedCount; + + if (!mOrder.empty()) + { + const auto found = mEntries.find(mOrder.front()); + if (found != mEntries.end()) + { + const auto age = now - found->second.event.createdAt; + metrics.oldestEventAgeMilliseconds = std::chrono::duration(age).count(); + } + } + + return metrics; + } + + std::size_t Depth() const + { + std::lock_guard lock(mMutex); + return mEntries.size(); + } + +private: + struct Entry + { + RuntimeEvent event; + std::size_t coalescedCount = 0; + }; + + mutable std::mutex mMutex; + std::size_t mCapacity = 0; + KeySelector mKeySelector; + std::deque mOrder; + std::map mEntries; + std::size_t mDroppedCount = 0; + std::size_t mCoalescedCount = 0; +}; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventDispatcher.h b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventDispatcher.h new file mode 100644 index 0000000..f479d63 --- /dev/null +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventDispatcher.h @@ -0,0 +1,117 @@ +#pragma once + +#include "RuntimeEventQueue.h" + +#include +#include +#include +#include +#include + +struct RuntimeEventDispatchResult +{ + std::size_t dispatchedEvents = 0; + std::size_t handlerInvocations = 0; + std::size_t handlerFailures = 0; +}; + +class RuntimeEventDispatcher +{ +public: + using Handler = std::function; + + explicit RuntimeEventDispatcher(std::size_t queueCapacity = 1024) : + mQueue(queueCapacity) + { + } + + bool Publish(RuntimeEvent event) + { + if (!event.PayloadMatchesType()) + return false; + + if (event.sequence == 0) + event.sequence = mNextSequence.fetch_add(1); + + return mQueue.Push(std::move(event)); + } + + template + bool PublishPayload(Payload payload, std::string source = {}) + { + return Publish(MakeRuntimeEvent(std::move(payload), std::move(source))); + } + + void Subscribe(RuntimeEventType type, Handler handler) + { + std::lock_guard lock(mHandlerMutex); + mHandlers[type].push_back(std::move(handler)); + } + + void SubscribeAll(Handler handler) + { + std::lock_guard lock(mHandlerMutex); + mAllHandlers.push_back(std::move(handler)); + } + + RuntimeEventDispatchResult DispatchPending(std::size_t maxEvents = 0) + { + RuntimeEventDispatchResult result; + std::vector events = mQueue.Drain(maxEvents); + result.dispatchedEvents = events.size(); + + for (const RuntimeEvent& event : events) + { + std::vector handlers = HandlersFor(event.type); + result.handlerInvocations += handlers.size(); + + for (const Handler& handler : handlers) + { + try + { + handler(event); + } + catch (...) + { + ++result.handlerFailures; + } + } + } + + return result; + } + + bool TryPop(RuntimeEvent& event) + { + return mQueue.TryPop(event); + } + + RuntimeEventQueueMetrics GetQueueMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const + { + return mQueue.GetMetrics(now); + } + + std::size_t QueueDepth() const + { + return mQueue.Depth(); + } + +private: + std::vector HandlersFor(RuntimeEventType type) const + { + std::lock_guard lock(mHandlerMutex); + std::vector handlers = mAllHandlers; + + const auto found = mHandlers.find(type); + if (found != mHandlers.end()) + handlers.insert(handlers.end(), found->second.begin(), found->second.end()); + + return handlers; + } + + RuntimeEventQueue mQueue; + std::atomic mNextSequence{ 1 }; + mutable std::mutex mHandlerMutex; + std::map> mHandlers; + std::vector mAllHandlers; +}; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventPayloads.h b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventPayloads.h new file mode 100644 index 0000000..15f0e5d --- /dev/null +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventPayloads.h @@ -0,0 +1,436 @@ +#pragma once + +#include "RuntimeEventType.h" + +#include +#include +#include + +enum class RuntimeEventSeverity +{ + Debug, + Info, + Warning, + Error +}; + +enum class RuntimeEventRenderResetScope +{ + None, + TemporalHistoryOnly, + TemporalHistoryAndFeedback +}; + +enum class RuntimeEventShaderBuildPhase +{ + Requested, + Prepared, + Applied, + Failed +}; + +struct OscValueReceivedEvent +{ + std::string routeKey; + std::string layerKey; + std::string parameterKey; + std::string valueJson; + uint64_t generation = 0; +}; + +struct OscValueCoalescedEvent +{ + std::string routeKey; + std::size_t coalescedCount = 0; + uint64_t latestGeneration = 0; +}; + +struct OscCommitRequestedEvent +{ + std::string routeKey; + std::string layerKey; + std::string parameterKey; + std::string valueJson; + uint64_t generation = 0; +}; + +struct HttpControlMutationRequestedEvent +{ + std::string method; + std::string path; + std::string bodyJson; +}; + +struct WebSocketClientConnectedEvent +{ + std::string clientId; + std::size_t connectedClientCount = 0; +}; + +struct RuntimeStateBroadcastRequestedEvent +{ + std::string reason; + bool coalescable = true; +}; + +struct FileChangeDetectedEvent +{ + std::string path; + bool shaderPackageCandidate = false; + bool runtimeConfigCandidate = false; + bool presetCandidate = false; +}; + +struct ManualReloadRequestedEvent +{ + bool preserveFeedbackState = false; + std::string reason; +}; + +struct RuntimeMutationEvent +{ + std::string action; + bool accepted = false; + bool runtimeStateChanged = false; + bool runtimeStateBroadcastRequired = false; + bool shaderBuildRequested = false; + bool persistenceRequested = false; + bool clearTransientOscState = false; + RuntimeEventRenderResetScope renderResetScope = RuntimeEventRenderResetScope::None; + std::string errorMessage; +}; + +struct RuntimeStateChangedEvent +{ + std::string reason; + bool renderVisible = false; + bool persistenceRequested = false; +}; + +struct RuntimePersistenceRequestedEvent +{ + std::string reason; + bool debounceAllowed = true; +}; + +struct RuntimeReloadRequestedEvent +{ + bool preserveFeedbackState = false; + std::string reason; +}; + +struct ShaderPackagesChangedEvent +{ + bool registryChanged = false; + std::size_t packageCount = 0; + std::string reason; +}; + +struct RenderSnapshotPublishRequestedEvent +{ + unsigned inputWidth = 0; + unsigned inputHeight = 0; + unsigned outputWidth = 0; + unsigned outputHeight = 0; + std::string reason; +}; + +struct RuntimeStatePresentationChangedEvent +{ + std::string reason; +}; + +struct ShaderBuildEvent +{ + RuntimeEventShaderBuildPhase phase = RuntimeEventShaderBuildPhase::Requested; + unsigned inputWidth = 0; + unsigned inputHeight = 0; + bool preserveFeedbackState = false; + bool succeeded = false; + std::string message; +}; + +struct CompileStatusChangedEvent +{ + bool succeeded = false; + std::string message; +}; + +struct RenderSnapshotPublishedEvent +{ + uint64_t snapshotVersion = 0; + uint64_t structureVersion = 0; + uint64_t parameterVersion = 0; + uint64_t packageVersion = 0; + unsigned outputWidth = 0; + unsigned outputHeight = 0; + std::size_t layerCount = 0; +}; + +struct RenderResetEvent +{ + RuntimeEventRenderResetScope scope = RuntimeEventRenderResetScope::None; + bool applied = false; + std::string reason; +}; + +struct OscOverlayEvent +{ + std::string routeKey; + std::string layerKey; + std::string parameterKey; + uint64_t generation = 0; + bool settled = false; +}; + +struct FrameRenderedEvent +{ + uint64_t frameIndex = 0; + double renderMilliseconds = 0.0; +}; + +struct PreviewFrameAvailableEvent +{ + uint64_t frameIndex = 0; + unsigned width = 0; + unsigned height = 0; +}; + +struct InputSignalChangedEvent +{ + bool hasSignal = false; + unsigned width = 0; + unsigned height = 0; + std::string modeName; +}; + +struct InputFrameArrivedEvent +{ + uint64_t frameIndex = 0; + unsigned width = 0; + unsigned height = 0; + long rowBytes = 0; + std::string pixelFormat; + bool hasNoInputSource = false; +}; + +struct OutputFrameScheduledEvent +{ + uint64_t frameIndex = 0; + int64_t streamTime = 0; + int64_t duration = 0; + int64_t timeScale = 0; +}; + +struct OutputFrameCompletedEvent +{ + uint64_t frameIndex = 0; + std::string result; +}; + +struct BackendStateChangedEvent +{ + std::string backendName; + std::string state; + std::string message; +}; + +struct SubsystemWarningEvent +{ + std::string subsystem; + std::string warningKey; + RuntimeEventSeverity severity = RuntimeEventSeverity::Warning; + std::string message; + bool cleared = false; +}; + +struct SubsystemRecoveredEvent +{ + std::string subsystem; + std::string recoveryKey; + std::string message; +}; + +struct TimingSampleRecordedEvent +{ + std::string subsystem; + std::string metric; + double value = 0.0; + std::string unit; +}; + +struct QueueDepthChangedEvent +{ + std::string queueName; + std::size_t depth = 0; + std::size_t capacity = 0; + std::size_t droppedCount = 0; + std::size_t coalescedCount = 0; +}; + +constexpr RuntimeEventType RuntimeEventPayloadType(const OscValueReceivedEvent&) +{ + return RuntimeEventType::OscValueReceived; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const OscValueCoalescedEvent&) +{ + return RuntimeEventType::OscValueCoalesced; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const OscCommitRequestedEvent&) +{ + return RuntimeEventType::OscCommitRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const HttpControlMutationRequestedEvent&) +{ + return RuntimeEventType::HttpControlMutationRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const WebSocketClientConnectedEvent&) +{ + return RuntimeEventType::WebSocketClientConnected; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RuntimeStateBroadcastRequestedEvent&) +{ + return RuntimeEventType::RuntimeStateBroadcastRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const FileChangeDetectedEvent&) +{ + return RuntimeEventType::FileChangeDetected; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const ManualReloadRequestedEvent&) +{ + return RuntimeEventType::ManualReloadRequested; +} + +inline RuntimeEventType RuntimeEventPayloadType(const RuntimeMutationEvent& event) +{ + return event.accepted ? RuntimeEventType::RuntimeMutationAccepted : RuntimeEventType::RuntimeMutationRejected; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RuntimeStateChangedEvent&) +{ + return RuntimeEventType::RuntimeStateChanged; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RuntimePersistenceRequestedEvent&) +{ + return RuntimeEventType::RuntimePersistenceRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RuntimeReloadRequestedEvent&) +{ + return RuntimeEventType::RuntimeReloadRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const ShaderPackagesChangedEvent&) +{ + return RuntimeEventType::ShaderPackagesChanged; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RenderSnapshotPublishRequestedEvent&) +{ + return RuntimeEventType::RenderSnapshotPublishRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RuntimeStatePresentationChangedEvent&) +{ + return RuntimeEventType::RuntimeStatePresentationChanged; +} + +inline RuntimeEventType RuntimeEventPayloadType(const ShaderBuildEvent& event) +{ + switch (event.phase) + { + case RuntimeEventShaderBuildPhase::Requested: + return RuntimeEventType::ShaderBuildRequested; + case RuntimeEventShaderBuildPhase::Prepared: + return RuntimeEventType::ShaderBuildPrepared; + case RuntimeEventShaderBuildPhase::Applied: + return RuntimeEventType::ShaderBuildApplied; + case RuntimeEventShaderBuildPhase::Failed: + return RuntimeEventType::ShaderBuildFailed; + } + + return RuntimeEventType::ShaderBuildRequested; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const CompileStatusChangedEvent&) +{ + return RuntimeEventType::CompileStatusChanged; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const RenderSnapshotPublishedEvent&) +{ + return RuntimeEventType::RenderSnapshotPublished; +} + +inline RuntimeEventType RuntimeEventPayloadType(const RenderResetEvent& event) +{ + return event.applied ? RuntimeEventType::RenderResetApplied : RuntimeEventType::RenderResetRequested; +} + +inline RuntimeEventType RuntimeEventPayloadType(const OscOverlayEvent& event) +{ + return event.settled ? RuntimeEventType::OscOverlaySettled : RuntimeEventType::OscOverlayApplied; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const FrameRenderedEvent&) +{ + return RuntimeEventType::FrameRendered; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const PreviewFrameAvailableEvent&) +{ + return RuntimeEventType::PreviewFrameAvailable; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const InputSignalChangedEvent&) +{ + return RuntimeEventType::InputSignalChanged; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const InputFrameArrivedEvent&) +{ + return RuntimeEventType::InputFrameArrived; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const OutputFrameScheduledEvent&) +{ + return RuntimeEventType::OutputFrameScheduled; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const OutputFrameCompletedEvent&) +{ + return RuntimeEventType::OutputFrameCompleted; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const BackendStateChangedEvent&) +{ + return RuntimeEventType::BackendStateChanged; +} + +inline RuntimeEventType RuntimeEventPayloadType(const SubsystemWarningEvent& event) +{ + return event.cleared ? RuntimeEventType::SubsystemWarningCleared : RuntimeEventType::SubsystemWarningRaised; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const SubsystemRecoveredEvent&) +{ + return RuntimeEventType::SubsystemRecovered; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const TimingSampleRecordedEvent&) +{ + return RuntimeEventType::TimingSampleRecorded; +} + +constexpr RuntimeEventType RuntimeEventPayloadType(const QueueDepthChangedEvent&) +{ + return RuntimeEventType::QueueDepthChanged; +} diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventQueue.h b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventQueue.h new file mode 100644 index 0000000..509bc0c --- /dev/null +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventQueue.h @@ -0,0 +1,99 @@ +#pragma once + +#include "RuntimeEvent.h" + +#include +#include +#include +#include +#include + +struct RuntimeEventQueueMetrics +{ + std::size_t depth = 0; + std::size_t capacity = 0; + std::size_t droppedCount = 0; + double oldestEventAgeMilliseconds = 0.0; +}; + +class RuntimeEventQueue +{ +public: + explicit RuntimeEventQueue(std::size_t capacity = 1024) : + mCapacity(capacity) + { + } + + bool Push(RuntimeEvent event) + { + std::lock_guard lock(mMutex); + if (mEvents.size() >= mCapacity) + { + ++mDroppedCount; + return false; + } + + mEvents.push_back(std::move(event)); + return true; + } + + bool TryPop(RuntimeEvent& event) + { + std::lock_guard lock(mMutex); + if (mEvents.empty()) + return false; + + event = std::move(mEvents.front()); + mEvents.pop_front(); + return true; + } + + std::vector Drain(std::size_t maxEvents = 0) + { + std::vector events; + + std::lock_guard lock(mMutex); + const std::size_t count = maxEvents == 0 || maxEvents > mEvents.size() ? mEvents.size() : maxEvents; + events.reserve(count); + for (std::size_t index = 0; index < count; ++index) + { + events.push_back(std::move(mEvents.front())); + mEvents.pop_front(); + } + + return events; + } + + RuntimeEventQueueMetrics GetMetrics(std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const + { + std::lock_guard lock(mMutex); + + RuntimeEventQueueMetrics metrics; + metrics.depth = mEvents.size(); + metrics.capacity = mCapacity; + metrics.droppedCount = mDroppedCount; + if (!mEvents.empty()) + { + const auto age = now - mEvents.front().createdAt; + metrics.oldestEventAgeMilliseconds = std::chrono::duration(age).count(); + } + return metrics; + } + + std::size_t Depth() const + { + std::lock_guard lock(mMutex); + return mEvents.size(); + } + + std::size_t Capacity() const + { + return mCapacity; + } + +private: + mutable std::mutex mMutex; + std::deque mEvents; + std::size_t mCapacity = 0; + std::size_t mDroppedCount = 0; +}; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventType.h b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventType.h new file mode 100644 index 0000000..cb8739d --- /dev/null +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/events/RuntimeEventType.h @@ -0,0 +1,151 @@ +#pragma once + +#include + +enum class RuntimeEventType +{ + Unknown = 0, + + // Control ingress. + OscValueReceived, + OscValueCoalesced, + OscCommitRequested, + HttpControlMutationRequested, + WebSocketClientConnected, + RuntimeStateBroadcastRequested, + FileChangeDetected, + ManualReloadRequested, + + // Runtime policy and state. + RuntimeMutationAccepted, + RuntimeMutationRejected, + RuntimeStateChanged, + RuntimePersistenceRequested, + RuntimeReloadRequested, + ShaderPackagesChanged, + RenderSnapshotPublishRequested, + RuntimeStatePresentationChanged, + + // Shader build lifecycle. + ShaderBuildRequested, + ShaderBuildPrepared, + ShaderBuildApplied, + ShaderBuildFailed, + CompileStatusChanged, + + // Render lifecycle. + RenderSnapshotPublished, + RenderResetRequested, + RenderResetApplied, + OscOverlayApplied, + OscOverlaySettled, + FrameRendered, + PreviewFrameAvailable, + + // Video backend lifecycle. + InputSignalChanged, + InputFrameArrived, + OutputFrameScheduled, + OutputFrameCompleted, + OutputLateFrameDetected, + OutputDroppedFrameDetected, + BackendStateChanged, + + // Health and telemetry. + SubsystemWarningRaised, + SubsystemWarningCleared, + SubsystemRecovered, + TimingSampleRecorded, + QueueDepthChanged +}; + +constexpr std::string_view RuntimeEventTypeName(RuntimeEventType type) +{ + switch (type) + { + case RuntimeEventType::Unknown: + return "Unknown"; + case RuntimeEventType::OscValueReceived: + return "OscValueReceived"; + case RuntimeEventType::OscValueCoalesced: + return "OscValueCoalesced"; + case RuntimeEventType::OscCommitRequested: + return "OscCommitRequested"; + case RuntimeEventType::HttpControlMutationRequested: + return "HttpControlMutationRequested"; + case RuntimeEventType::WebSocketClientConnected: + return "WebSocketClientConnected"; + case RuntimeEventType::RuntimeStateBroadcastRequested: + return "RuntimeStateBroadcastRequested"; + case RuntimeEventType::FileChangeDetected: + return "FileChangeDetected"; + case RuntimeEventType::ManualReloadRequested: + return "ManualReloadRequested"; + case RuntimeEventType::RuntimeMutationAccepted: + return "RuntimeMutationAccepted"; + case RuntimeEventType::RuntimeMutationRejected: + return "RuntimeMutationRejected"; + case RuntimeEventType::RuntimeStateChanged: + return "RuntimeStateChanged"; + case RuntimeEventType::RuntimePersistenceRequested: + return "RuntimePersistenceRequested"; + case RuntimeEventType::RuntimeReloadRequested: + return "RuntimeReloadRequested"; + case RuntimeEventType::ShaderPackagesChanged: + return "ShaderPackagesChanged"; + case RuntimeEventType::RenderSnapshotPublishRequested: + return "RenderSnapshotPublishRequested"; + case RuntimeEventType::RuntimeStatePresentationChanged: + return "RuntimeStatePresentationChanged"; + case RuntimeEventType::ShaderBuildRequested: + return "ShaderBuildRequested"; + case RuntimeEventType::ShaderBuildPrepared: + return "ShaderBuildPrepared"; + case RuntimeEventType::ShaderBuildApplied: + return "ShaderBuildApplied"; + case RuntimeEventType::ShaderBuildFailed: + return "ShaderBuildFailed"; + case RuntimeEventType::CompileStatusChanged: + return "CompileStatusChanged"; + case RuntimeEventType::RenderSnapshotPublished: + return "RenderSnapshotPublished"; + case RuntimeEventType::RenderResetRequested: + return "RenderResetRequested"; + case RuntimeEventType::RenderResetApplied: + return "RenderResetApplied"; + case RuntimeEventType::OscOverlayApplied: + return "OscOverlayApplied"; + case RuntimeEventType::OscOverlaySettled: + return "OscOverlaySettled"; + case RuntimeEventType::FrameRendered: + return "FrameRendered"; + case RuntimeEventType::PreviewFrameAvailable: + return "PreviewFrameAvailable"; + case RuntimeEventType::InputSignalChanged: + return "InputSignalChanged"; + case RuntimeEventType::InputFrameArrived: + return "InputFrameArrived"; + case RuntimeEventType::OutputFrameScheduled: + return "OutputFrameScheduled"; + case RuntimeEventType::OutputFrameCompleted: + return "OutputFrameCompleted"; + case RuntimeEventType::OutputLateFrameDetected: + return "OutputLateFrameDetected"; + case RuntimeEventType::OutputDroppedFrameDetected: + return "OutputDroppedFrameDetected"; + case RuntimeEventType::BackendStateChanged: + return "BackendStateChanged"; + case RuntimeEventType::SubsystemWarningRaised: + return "SubsystemWarningRaised"; + case RuntimeEventType::SubsystemWarningCleared: + return "SubsystemWarningCleared"; + case RuntimeEventType::SubsystemRecovered: + return "SubsystemRecovered"; + case RuntimeEventType::TimingSampleRecorded: + return "TimingSampleRecorded"; + case RuntimeEventType::QueueDepthChanged: + return "QueueDepthChanged"; + } + + return "Unknown"; +} diff --git a/docs/PHASE_2_INTERNAL_EVENT_MODEL_DESIGN.md b/docs/PHASE_2_INTERNAL_EVENT_MODEL_DESIGN.md index 1a05dff..716f2ce 100644 --- a/docs/PHASE_2_INTERNAL_EVENT_MODEL_DESIGN.md +++ b/docs/PHASE_2_INTERNAL_EVENT_MODEL_DESIGN.md @@ -307,6 +307,24 @@ Suggested components: Initial implementation can be single-process and mostly single-dispatch-thread. The important part is that event publication and event handling become explicit. +### Dispatcher Ownership Decision + +The first concrete implementation uses one app-owned `RuntimeEventDispatcher`. + +Ownership: + +- `OpenGLComposite` owns the dispatcher as part of the current composition root. + +References: + +- `RuntimeServices` receives the dispatcher and passes it to `ControlServices`. +- `RuntimeCoordinator` receives the dispatcher so coordinator outcomes can become explicit events. +- `RuntimeUpdateController` receives the dispatcher so it can become the first effect/apply handler. + +This is intentionally a composition-root dependency, not a new subsystem dependency. Subsystems should not construct their own dispatchers, and future tests should use `RuntimeEventTestHarness` rather than creating ad hoc event plumbing. + +The dispatcher should move out of `OpenGLComposite` only if a later application-shell/composition-root object replaces `OpenGLComposite` as the owner of subsystem wiring. + ## Queue Policy Not every event deserves the same queue semantics. @@ -342,6 +360,63 @@ Some calls may remain synchronous during Phase 2: The rule is that synchronous calls should still publish events for accepted/rejected/completed work, so the rest of the app does not need to infer side effects from the call path. +## Event Bridge Policy + +This section is the implementation rulebook for converting existing direct calls and result queues into events. Future Phase 2 lanes should use this table unless they deliberately update the policy here first. + +### Bridge Categories + +| Bridge category | Use when | Queue shape | Handler expectation | +| --- | --- | --- | --- | +| `fifo-fact` | every occurrence matters and must be observed in order | bounded FIFO | handler consumes each event exactly once | +| `coalesced-latest` | only the latest value per key matters | bounded coalescing queue | handler consumes the latest event and telemetry records collapsed count | +| `sync-command-with-event` | caller needs an immediate success/error result | direct owner call plus follow-up event publication | handler must not be required for the caller's response | +| `observation-only` | event is telemetry/diagnostic and must not drive core behavior | FIFO or coalesced depending on rate | handler failure must never block app behavior | +| `compatibility-poll` | source cannot yet publish an event directly | temporary poll adapter publishes typed events | poll interval should shrink or become wakeup-driven over Phase 2 | + +### Current Bridge Decisions + +| Current flow | First Phase 2 bridge | Event(s) | Queue policy | +| --- | --- | --- | --- | +| OSC latest-value updates | `ControlServices` ingress bridge | `OscValueReceived`, optional `OscValueCoalesced` | `coalesced-latest` by route key | +| OSC commit after settle | `ControlServices -> RuntimeCoordinator` bridge | `OscCommitRequested`, then `RuntimeMutationAccepted` or `RuntimeMutationRejected` | commit request `coalesced-latest` by route key; mutation result `fifo-fact` | +| HTTP/UI mutation needing response | direct call into `RuntimeCoordinator` | `RuntimeMutationAccepted` or `RuntimeMutationRejected` after the synchronous response path | `sync-command-with-event` | +| runtime-state broadcast request | presentation/broadcast bridge | `RuntimeStatePresentationChanged`, `RuntimeStateBroadcastRequested` | `coalesced-latest` by event type or reason family | +| manual reload button | control ingress bridge | `ManualReloadRequested`, then `RuntimeReloadRequested` | `fifo-fact` for manual request; reload execution may coalesce | +| file watcher changes | file-watch bridge | `FileChangeDetected`, then `RuntimeReloadRequested` | `coalesced-latest` by path, then coalesced reload request | +| runtime store poll fallback | compatibility poll adapter | `ShaderPackagesChanged`, `RuntimeReloadRequested`, or warning event | `compatibility-poll` until file events fully replace polling | +| shader build request | runtime/render bridge | `ShaderBuildRequested` | `coalesced-latest` by input dimensions and preserve-feedback flag | +| shader build ready/failure/apply | shader build lifecycle bridge | `ShaderBuildPrepared`, `ShaderBuildFailed`, `ShaderBuildApplied`, `CompileStatusChanged` | `fifo-fact` | +| render snapshot publication | snapshot bridge | `RenderSnapshotPublishRequested`, `RenderSnapshotPublished` | request may coalesce by output dimensions; published event is `fifo-fact` | +| render reset request/application | render bridge | `RenderResetRequested`, `RenderResetApplied` | `fifo-fact` | +| input signal changes | backend observation bridge | `InputSignalChanged` | `coalesced-latest` by signal lane | +| output late/dropped/completed frames | backend timing bridge | `OutputFrameCompleted`, `OutputLateFrameDetected`, `OutputDroppedFrameDetected` | late/dropped `fifo-fact`; high-rate completed frames may become `observation-only` coalesced metrics | +| warnings and recovery | telemetry bridge | `SubsystemWarningRaised`, `SubsystemWarningCleared`, `SubsystemRecovered` | `fifo-fact` for lifecycle transitions | +| queue depth/timing samples | telemetry metrics bridge | `QueueDepthChanged`, `TimingSampleRecorded` | `coalesced-latest` by metric key | + +### Bridge Rules + +- A bridge may translate an old direct call into an owner command, but it must publish the accepted/rejected/completed event that describes the outcome. +- A bridge must not mutate state owned by another subsystem just because it handles that subsystem's event. +- A coalesced event must have a stable key in code and a documented policy here. +- A FIFO event should be cheap enough that retaining every occurrence is useful. If not, turn it into a coalesced metric before putting it on a hot path. +- A synchronous bridge must treat event publication as a side effect of the owner decision, not as the mechanism that produces the direct caller's response. +- A compatibility poll adapter should be named as temporary in code so it does not become the new long-term coordination model. +- Handler failure should be reported through telemetry and dispatch metrics. It should not throw back across subsystem boundaries. + +### First Integration Recommendation + +The safest first behavior-changing bridge is `RuntimeStateBroadcastRequested`. + +It is low risk because: + +- it is already a side effect of many coordinator outcomes +- duplicate requests are naturally coalescable +- the handler can call the existing `ControlServices::BroadcastState()` path +- success can be verified through existing UI behavior and event tests + +After that, the next bridge should be `ShaderBuildRequested`, because it already behaves like a queued side effect and has clear follow-up events. + ## Target Flow Examples ### OSC Parameter Update diff --git a/tests/RuntimeEventTestHarness.h b/tests/RuntimeEventTestHarness.h new file mode 100644 index 0000000..2e68ffd --- /dev/null +++ b/tests/RuntimeEventTestHarness.h @@ -0,0 +1,121 @@ +#pragma once + +#include "RuntimeEventCoalescingQueue.h" +#include "RuntimeEventDispatcher.h" + +#include +#include +#include + +class RuntimeEventTestHarness +{ +public: + explicit RuntimeEventTestHarness(std::size_t dispatchQueueCapacity = 64, std::size_t coalescingQueueCapacity = 64) : + mDispatcher(dispatchQueueCapacity), + mCoalescingQueue(coalescingQueueCapacity) + { + mDispatcher.SubscribeAll([this](const RuntimeEvent& event) { + mSeenEvents.push_back(event); + }); + } + + RuntimeEventDispatcher& Dispatcher() + { + return mDispatcher; + } + + const RuntimeEventDispatcher& Dispatcher() const + { + return mDispatcher; + } + + RuntimeEventCoalescingQueue& CoalescingQueue() + { + return mCoalescingQueue; + } + + template + bool Publish(Payload payload, std::string source = {}) + { + return mDispatcher.PublishPayload(std::move(payload), std::move(source)); + } + + bool Publish(RuntimeEvent event) + { + return mDispatcher.Publish(std::move(event)); + } + + template + RuntimeEventDispatchResult PublishAndDispatch(Payload payload, std::string source = {}) + { + Publish(std::move(payload), std::move(source)); + return DispatchPending(); + } + + RuntimeEventDispatchResult DispatchPending(std::size_t maxEvents = 0) + { + return mDispatcher.DispatchPending(maxEvents); + } + + template + bool PublishCoalesced(Payload payload, std::string source = {}, uint64_t sequence = 0) + { + return mCoalescingQueue.Push(MakeRuntimeEvent(std::move(payload), std::move(source), sequence)); + } + + std::size_t FlushCoalescedToDispatcher(std::size_t maxEvents = 0) + { + std::vector events = mCoalescingQueue.Drain(maxEvents); + const std::size_t eventCount = events.size(); + for (RuntimeEvent& event : events) + mDispatcher.Publish(std::move(event)); + return eventCount; + } + + RuntimeEventDispatchResult FlushCoalescedAndDispatch(std::size_t maxEvents = 0) + { + FlushCoalescedToDispatcher(maxEvents); + return DispatchPending(); + } + + const std::vector& SeenEvents() const + { + return mSeenEvents; + } + + std::size_t SeenCount() const + { + return mSeenEvents.size(); + } + + std::size_t SeenCount(RuntimeEventType type) const + { + std::size_t count = 0; + for (const RuntimeEvent& event : mSeenEvents) + { + if (event.type == type) + ++count; + } + return count; + } + + const RuntimeEvent* LastSeen(RuntimeEventType type) const + { + for (auto it = mSeenEvents.rbegin(); it != mSeenEvents.rend(); ++it) + { + if (it->type == type) + return &(*it); + } + return nullptr; + } + + void ClearSeen() + { + mSeenEvents.clear(); + } + +private: + RuntimeEventDispatcher mDispatcher; + RuntimeEventCoalescingQueue mCoalescingQueue; + std::vector mSeenEvents; +}; diff --git a/tests/RuntimeEventTypeTests.cpp b/tests/RuntimeEventTypeTests.cpp new file mode 100644 index 0000000..16daf61 --- /dev/null +++ b/tests/RuntimeEventTypeTests.cpp @@ -0,0 +1,334 @@ +#include "RuntimeEvent.h" +#include "RuntimeEventCoalescingQueue.h" +#include "RuntimeEventDispatcher.h" +#include "RuntimeEventQueue.h" +#include "RuntimeEventType.h" +#include "RuntimeEventPayloads.h" +#include "RuntimeEventTestHarness.h" + +#include +#include +#include +#include +#include + +namespace +{ +int gFailures = 0; + +void Expect(bool condition, const char* message) +{ + if (condition) + return; + + std::cerr << "FAIL: " << message << "\n"; + ++gFailures; +} + +void TestRuntimeEventTypeNames() +{ + Expect(RuntimeEventTypeName(RuntimeEventType::Unknown) == "Unknown", "unknown event type has a stable name"); + Expect(RuntimeEventTypeName(RuntimeEventType::OscCommitRequested) == "OscCommitRequested", "control event name is stable"); + Expect(RuntimeEventTypeName(RuntimeEventType::RuntimeMutationAccepted) == "RuntimeMutationAccepted", "runtime event name is stable"); + Expect(RuntimeEventTypeName(RuntimeEventType::ShaderBuildPrepared) == "ShaderBuildPrepared", "shader build event name is stable"); + Expect(RuntimeEventTypeName(RuntimeEventType::RenderSnapshotPublished) == "RenderSnapshotPublished", "render event name is stable"); + Expect(RuntimeEventTypeName(RuntimeEventType::BackendStateChanged) == "BackendStateChanged", "backend event name is stable"); + Expect(RuntimeEventTypeName(RuntimeEventType::QueueDepthChanged) == "QueueDepthChanged", "telemetry event name is stable"); +} + +void TestRuntimeEventPayloadTypes() +{ + OscCommitRequestedEvent oscCommit; + oscCommit.routeKey = "layer-1\namount"; + oscCommit.layerKey = "layer-1"; + oscCommit.parameterKey = "amount"; + oscCommit.generation = 42; + Expect(RuntimeEventPayloadType(oscCommit) == RuntimeEventType::OscCommitRequested, "OSC commit payload maps to OSC commit event type"); + Expect(oscCommit.generation == 42, "OSC commit payload keeps generation"); + + RuntimeMutationEvent acceptedMutation; + acceptedMutation.action = "SetLayerShader"; + acceptedMutation.accepted = true; + acceptedMutation.shaderBuildRequested = true; + Expect(RuntimeEventPayloadType(acceptedMutation) == RuntimeEventType::RuntimeMutationAccepted, "accepted mutation payload maps to accepted event type"); + Expect(acceptedMutation.shaderBuildRequested, "mutation payload carries shader build follow-up"); + + RuntimeMutationEvent rejectedMutation; + rejectedMutation.accepted = false; + rejectedMutation.errorMessage = "Unknown layer."; + Expect(RuntimeEventPayloadType(rejectedMutation) == RuntimeEventType::RuntimeMutationRejected, "rejected mutation payload maps to rejected event type"); + Expect(rejectedMutation.errorMessage == "Unknown layer.", "mutation payload carries rejection error"); + + ShaderBuildEvent preparedBuild; + preparedBuild.phase = RuntimeEventShaderBuildPhase::Prepared; + preparedBuild.inputWidth = 1920; + preparedBuild.inputHeight = 1080; + Expect(RuntimeEventPayloadType(preparedBuild) == RuntimeEventType::ShaderBuildPrepared, "shader build payload maps by phase"); + Expect(preparedBuild.inputWidth == 1920 && preparedBuild.inputHeight == 1080, "shader build payload carries input dimensions"); + + RenderResetEvent appliedReset; + appliedReset.scope = RuntimeEventRenderResetScope::TemporalHistoryAndFeedback; + appliedReset.applied = true; + Expect(RuntimeEventPayloadType(appliedReset) == RuntimeEventType::RenderResetApplied, "render reset payload maps applied state"); + Expect(appliedReset.scope == RuntimeEventRenderResetScope::TemporalHistoryAndFeedback, "render reset payload carries reset scope"); + + SubsystemWarningEvent warning; + warning.subsystem = "VideoBackend"; + warning.warningKey = "late-frame"; + Expect(RuntimeEventPayloadType(warning) == RuntimeEventType::SubsystemWarningRaised, "warning payload maps to raised event by default"); + warning.cleared = true; + Expect(RuntimeEventPayloadType(warning) == RuntimeEventType::SubsystemWarningCleared, "warning payload maps to cleared event when marked cleared"); +} + +void TestRuntimeEventEnvelope() +{ + const auto createdAt = std::chrono::steady_clock::now(); + + OscCommitRequestedEvent oscCommit; + oscCommit.routeKey = "layer-1\namount"; + oscCommit.layerKey = "layer-1"; + oscCommit.parameterKey = "amount"; + oscCommit.generation = 7; + + RuntimeEvent event = MakeRuntimeEvent(oscCommit, "ControlServices", 12, createdAt); + Expect(event.type == RuntimeEventType::OscCommitRequested, "runtime event infers type from payload"); + Expect(event.sequence == 12, "runtime event stores sequence"); + Expect(event.source == "ControlServices", "runtime event stores source"); + Expect(event.createdAt == createdAt, "runtime event stores creation time"); + Expect(event.HasPayload(), "runtime event reports payload presence"); + Expect(event.PayloadMatchesType(), "runtime event payload matches inferred type"); + + const auto* payload = std::get_if(&event.payload); + Expect(payload && payload->generation == 7, "runtime event stores typed payload in variant"); + + event.type = RuntimeEventType::RuntimeMutationAccepted; + Expect(!event.PayloadMatchesType(), "runtime event detects mismatched payload and type"); + + RuntimeEvent empty; + Expect(!empty.HasPayload(), "default runtime event has no payload"); + Expect(empty.PayloadMatchesType(), "default runtime event unknown type matches empty payload"); + + RuntimeMutationEvent acceptedMutation; + acceptedMutation.accepted = true; + acceptedMutation.action = "SetLayerBypass"; + RuntimeEvent acceptedEvent = MakeRuntimeEvent(acceptedMutation, "RuntimeCoordinator", 13, createdAt); + Expect(acceptedEvent.type == RuntimeEventType::RuntimeMutationAccepted, "runtime event handles payloads with dynamic accepted mapping"); + + RuntimeMutationEvent rejectedMutation; + rejectedMutation.accepted = false; + rejectedMutation.action = "SetLayerBypass"; + RuntimeEvent rejectedEvent = MakeRuntimeEvent(rejectedMutation, "RuntimeCoordinator", 14, createdAt); + Expect(rejectedEvent.type == RuntimeEventType::RuntimeMutationRejected, "runtime event handles payloads with dynamic rejected mapping"); +} + +void TestRuntimeEventQueue() +{ + RuntimeEventQueue queue(2); + const auto createdAt = std::chrono::steady_clock::now() - std::chrono::milliseconds(5); + + RuntimeStateBroadcastRequestedEvent firstPayload; + firstPayload.reason = "first"; + RuntimeStateBroadcastRequestedEvent secondPayload; + secondPayload.reason = "second"; + RuntimeStateBroadcastRequestedEvent thirdPayload; + thirdPayload.reason = "third"; + + Expect(queue.Push(MakeRuntimeEvent(firstPayload, "test", 1, createdAt)), "queue accepts first event"); + Expect(queue.Push(MakeRuntimeEvent(secondPayload, "test", 2, createdAt)), "queue accepts second event"); + Expect(!queue.Push(MakeRuntimeEvent(thirdPayload, "test", 3, createdAt)), "queue rejects event when capacity is full"); + + RuntimeEventQueueMetrics fullMetrics = queue.GetMetrics(std::chrono::steady_clock::now()); + Expect(fullMetrics.depth == 2, "queue metrics report depth"); + Expect(fullMetrics.capacity == 2, "queue metrics report capacity"); + Expect(fullMetrics.droppedCount == 1, "queue metrics report dropped count"); + Expect(fullMetrics.oldestEventAgeMilliseconds >= 0.0, "queue metrics report oldest event age"); + + RuntimeEvent event; + Expect(queue.TryPop(event), "queue pops first event"); + Expect(event.sequence == 1, "queue preserves FIFO order"); + + std::vector drained = queue.Drain(); + Expect(drained.size() == 1, "queue drains remaining event"); + Expect(drained[0].sequence == 2, "drained event preserves FIFO order"); + Expect(queue.Depth() == 0, "queue is empty after drain"); +} + +void TestRuntimeEventDispatcher() +{ + RuntimeEventDispatcher dispatcher(4); + int allHandlerCount = 0; + int broadcastHandlerCount = 0; + int failureHandlerCount = 0; + + dispatcher.SubscribeAll([&](const RuntimeEvent& event) { + Expect(event.sequence != 0, "dispatcher assigns sequence before all-handler dispatch"); + ++allHandlerCount; + }); + dispatcher.Subscribe(RuntimeEventType::RuntimeStateBroadcastRequested, [&](const RuntimeEvent& event) { + Expect(event.type == RuntimeEventType::RuntimeStateBroadcastRequested, "dispatcher invokes type-specific handler for matching event"); + ++broadcastHandlerCount; + }); + dispatcher.Subscribe(RuntimeEventType::RuntimeStateBroadcastRequested, [&](const RuntimeEvent&) { + ++failureHandlerCount; + throw std::runtime_error("test handler failure"); + }); + + RuntimeStateBroadcastRequestedEvent broadcast; + broadcast.reason = "test"; + Expect(dispatcher.PublishPayload(broadcast, "test"), "dispatcher publishes payload"); + Expect(dispatcher.QueueDepth() == 1, "dispatcher reports queued depth"); + + RuntimeEventDispatchResult result = dispatcher.DispatchPending(); + Expect(result.dispatchedEvents == 1, "dispatcher reports dispatched event count"); + Expect(result.handlerInvocations == 3, "dispatcher reports handler invocation count"); + Expect(result.handlerFailures == 1, "dispatcher catches and reports handler failures"); + Expect(allHandlerCount == 1, "dispatcher invoked all-handler"); + Expect(broadcastHandlerCount == 1, "dispatcher invoked type-specific handler"); + Expect(failureHandlerCount == 1, "dispatcher invoked failing handler"); + Expect(dispatcher.QueueDepth() == 0, "dispatcher queue is empty after dispatch"); + + RuntimeEvent mismatched = MakeRuntimeEvent(broadcast, "test"); + mismatched.type = RuntimeEventType::ShaderBuildRequested; + Expect(!dispatcher.Publish(mismatched), "dispatcher rejects mismatched event type and payload"); + + RuntimeEventDispatcher tinyDispatcher(1); + Expect(tinyDispatcher.PublishPayload(broadcast, "test"), "tiny dispatcher accepts first event"); + Expect(!tinyDispatcher.PublishPayload(broadcast, "test"), "tiny dispatcher rejects event when queue is full"); + RuntimeEventQueueMetrics metrics = tinyDispatcher.GetQueueMetrics(); + Expect(metrics.droppedCount == 1, "dispatcher exposes queue drop metrics"); +} + +void TestRuntimeEventCoalescingQueue() +{ + RuntimeEventCoalescingQueue queue(2); + const auto createdAt = std::chrono::steady_clock::now() - std::chrono::milliseconds(5); + + OscValueReceivedEvent first; + first.routeKey = "layer-1\namount"; + first.layerKey = "layer-1"; + first.parameterKey = "amount"; + first.valueJson = "0.1"; + first.generation = 1; + + OscValueReceivedEvent replacement = first; + replacement.valueJson = "0.9"; + replacement.generation = 2; + + OscValueReceivedEvent otherRoute; + otherRoute.routeKey = "layer-2\namount"; + otherRoute.layerKey = "layer-2"; + otherRoute.parameterKey = "amount"; + otherRoute.valueJson = "0.3"; + otherRoute.generation = 3; + + OscValueReceivedEvent overflow; + overflow.routeKey = "layer-3\namount"; + overflow.layerKey = "layer-3"; + overflow.parameterKey = "amount"; + overflow.valueJson = "0.4"; + overflow.generation = 4; + + Expect(queue.Push(MakeRuntimeEvent(first, "ControlServices", 1, createdAt)), "coalescing queue accepts first keyed event"); + Expect(queue.Push(MakeRuntimeEvent(replacement, "ControlServices", 2, std::chrono::steady_clock::now())), "coalescing queue replaces matching keyed event"); + Expect(queue.Push(MakeRuntimeEvent(otherRoute, "ControlServices", 3, createdAt)), "coalescing queue accepts second keyed event"); + Expect(!queue.Push(MakeRuntimeEvent(overflow, "ControlServices", 4, createdAt)), "coalescing queue rejects new key when full"); + + RuntimeEventCoalescingQueueMetrics metrics = queue.GetMetrics(std::chrono::steady_clock::now()); + Expect(metrics.depth == 2, "coalescing queue metrics report unique-key depth"); + Expect(metrics.capacity == 2, "coalescing queue metrics report capacity"); + Expect(metrics.coalescedCount == 1, "coalescing queue metrics report coalesced count"); + Expect(metrics.droppedCount == 1, "coalescing queue metrics report dropped count"); + Expect(metrics.oldestEventAgeMilliseconds >= 0.0, "coalescing queue metrics report oldest event age"); + + std::vector drained = queue.Drain(); + Expect(drained.size() == 2, "coalescing queue drains unique events"); + Expect(drained[0].sequence == 2, "coalescing queue keeps latest replacement event"); + Expect(drained[1].sequence == 3, "coalescing queue preserves first-seen key order"); + + const auto* latestPayload = std::get_if(&drained[0].payload); + Expect(latestPayload && latestPayload->valueJson == "0.9", "coalescing queue keeps latest payload value"); + Expect(queue.Depth() == 0, "coalescing queue is empty after drain"); +} + +void TestRuntimeEventCoalescingCustomKey() +{ + RuntimeEventCoalescingQueue queue(4, [](const RuntimeEvent& event) { + return std::string(RuntimeEventTypeName(event.type)); + }); + + RuntimeStateBroadcastRequestedEvent first; + first.reason = "parameter"; + RuntimeStateBroadcastRequestedEvent second; + second.reason = "reload"; + + Expect(queue.Push(MakeRuntimeEvent(first, "RuntimeCoordinator", 10)), "custom-key coalescing queue accepts first event"); + Expect(queue.Push(MakeRuntimeEvent(second, "RuntimeCoordinator", 11)), "custom-key coalescing queue coalesces second event by type"); + + std::vector drained = queue.Drain(); + Expect(drained.size() == 1, "custom-key coalescing queue drains one coalesced event"); + Expect(drained[0].sequence == 11, "custom-key coalescing queue keeps latest event"); + + const auto* payload = std::get_if(&drained[0].payload); + Expect(payload && payload->reason == "reload", "custom-key coalescing queue keeps latest typed payload"); +} + +void TestRuntimeEventTestHarness() +{ + RuntimeEventTestHarness harness; + + RuntimeStateBroadcastRequestedEvent broadcast; + broadcast.reason = "parameter"; + RuntimeEventDispatchResult firstDispatch = harness.PublishAndDispatch(broadcast, "RuntimeCoordinator"); + Expect(firstDispatch.dispatchedEvents == 1, "test harness publishes and dispatches payloads"); + Expect(harness.SeenCount() == 1, "test harness records dispatched events"); + Expect(harness.SeenCount(RuntimeEventType::RuntimeStateBroadcastRequested) == 1, "test harness counts seen events by type"); + + const RuntimeEvent* seenBroadcast = harness.LastSeen(RuntimeEventType::RuntimeStateBroadcastRequested); + Expect(seenBroadcast && seenBroadcast->source == "RuntimeCoordinator", "test harness returns last seen event by type"); + + harness.ClearSeen(); + Expect(harness.SeenCount() == 0, "test harness clears seen events"); + + OscValueReceivedEvent first; + first.routeKey = "layer-1\namount"; + first.layerKey = "layer-1"; + first.parameterKey = "amount"; + first.valueJson = "0.1"; + first.generation = 1; + + OscValueReceivedEvent replacement = first; + replacement.valueJson = "0.8"; + replacement.generation = 2; + + Expect(harness.PublishCoalesced(first, "ControlServices", 20), "test harness accepts first coalesced payload"); + Expect(harness.PublishCoalesced(replacement, "ControlServices", 21), "test harness accepts replacement coalesced payload"); + RuntimeEventDispatchResult coalescedDispatch = harness.FlushCoalescedAndDispatch(); + Expect(coalescedDispatch.dispatchedEvents == 1, "test harness dispatches one coalesced event"); + Expect(harness.SeenCount(RuntimeEventType::OscValueReceived) == 1, "test harness records coalesced event"); + + const RuntimeEvent* seenOsc = harness.LastSeen(RuntimeEventType::OscValueReceived); + const auto* seenPayload = seenOsc ? std::get_if(&seenOsc->payload) : nullptr; + Expect(seenPayload && seenPayload->valueJson == "0.8", "test harness keeps latest coalesced payload"); +} +} + +int main() +{ + TestRuntimeEventTypeNames(); + TestRuntimeEventPayloadTypes(); + TestRuntimeEventEnvelope(); + TestRuntimeEventQueue(); + TestRuntimeEventDispatcher(); + TestRuntimeEventCoalescingQueue(); + TestRuntimeEventCoalescingCustomKey(); + TestRuntimeEventTestHarness(); + + if (gFailures != 0) + { + std::cerr << gFailures << " RuntimeEventType test failure(s).\n"; + return 1; + } + + std::cout << "RuntimeEventType tests passed.\n"; + return 0; +}