#include "ControlServices.h" #include "ControlServer.h" #include "OscServer.h" #include "RuntimeControlBridge.h" #include "RuntimeEventDispatcher.h" #include "RuntimeStore.h" #include namespace { constexpr auto kCompatibilityPollFallbackInterval = std::chrono::milliseconds(250); } ControlServices::ControlServices(RuntimeEventDispatcher& runtimeEventDispatcher) : mControlServer(std::make_unique()), mOscServer(std::make_unique()), mRuntimeEventDispatcher(runtimeEventDispatcher), mPollRunning(false) { } ControlServices::~ControlServices() { Stop(); } bool ControlServices::Start(OpenGLComposite& composite, RuntimeStore& runtimeStore, std::string& error) { Stop(); if (!StartControlServicesBoundary(composite, runtimeStore, *this, *mControlServer, *mOscServer, error)) { Stop(); return false; } return true; } void ControlServices::BeginPolling(RuntimeCoordinator& runtimeCoordinator) { StartPolling(runtimeCoordinator); } void ControlServices::Stop() { StopPolling(); if (mOscServer) mOscServer->Stop(); if (mControlServer) mControlServer->Stop(); } void ControlServices::BroadcastState() { if (mControlServer) mControlServer->BroadcastState(); } void ControlServices::RequestBroadcastState() { PublishRuntimeStateBroadcastRequested("control-service-request"); if (mControlServer) mControlServer->RequestBroadcastState(); } bool ControlServices::QueueOscUpdate(const std::string& layerKey, const std::string& parameterKey, const std::string& valueJson, std::string& error) { (void)error; PendingOscUpdate update; update.layerKey = layerKey; update.parameterKey = parameterKey; update.valueJson = valueJson; const std::string routeKey = layerKey + "\n" + parameterKey; { std::lock_guard lock(mPendingOscMutex); mPendingOscUpdates[routeKey] = std::move(update); } return true; } bool ControlServices::ApplyPendingOscUpdates(std::vector& appliedUpdates, std::string& error) { appliedUpdates.clear(); std::map pending; { std::lock_guard lock(mPendingOscMutex); if (mPendingOscUpdates.empty()) return true; pending.swap(mPendingOscUpdates); } for (const auto& entry : pending) { JsonValue targetValue; std::string parseError; if (!ParseJson(entry.second.valueJson, targetValue, parseError)) { OutputDebugStringA(("OSC queued value parse failed: " + parseError + "\n").c_str()); continue; } AppliedOscUpdate appliedUpdate; appliedUpdate.routeKey = entry.first; appliedUpdate.layerKey = entry.second.layerKey; appliedUpdate.parameterKey = entry.second.parameterKey; appliedUpdate.targetValue = targetValue; appliedUpdates.push_back(std::move(appliedUpdate)); PublishOscValueReceived(entry.second, entry.first); } (void)error; return true; } bool ControlServices::QueueOscCommit(const std::string& routeKey, const std::string& layerKey, const std::string& parameterKey, const JsonValue& value, uint64_t generation, std::string& error) { (void)error; PendingOscCommit commit; commit.routeKey = routeKey; commit.layerKey = layerKey; commit.parameterKey = parameterKey; commit.value = value; commit.generation = generation; { std::lock_guard lock(mPendingOscCommitMutex); mPendingOscCommits[routeKey] = std::move(commit); } WakePolling(); return true; } void ControlServices::ClearOscState() { { std::lock_guard lock(mPendingOscMutex); mPendingOscUpdates.clear(); } { std::lock_guard lock(mPendingOscCommitMutex); mPendingOscCommits.clear(); } { std::lock_guard lock(mCompletedOscCommitMutex); mCompletedOscCommits.clear(); } } void ControlServices::ConsumeCompletedOscCommits(std::vector& completedCommits) { completedCommits.clear(); std::lock_guard lock(mCompletedOscCommitMutex); if (mCompletedOscCommits.empty()) return; completedCommits.swap(mCompletedOscCommits); } void ControlServices::StartPolling(RuntimeCoordinator& runtimeCoordinator) { if (mPollRunning.exchange(true)) return; mPollThread = std::thread([this, &runtimeCoordinator]() { PollLoop(runtimeCoordinator); }); } void ControlServices::StopPolling() { if (!mPollRunning.exchange(false)) return; WakePolling(); if (mPollThread.joinable()) mPollThread.join(); } void ControlServices::PollLoop(RuntimeCoordinator& runtimeCoordinator) { while (mPollRunning) { std::map pendingCommits; { std::lock_guard lock(mPendingOscCommitMutex); pendingCommits.swap(mPendingOscCommits); } for (const auto& entry : pendingCommits) { PublishOscCommitRequested(entry.second); const RuntimeCoordinatorResult result = runtimeCoordinator.CommitOscParameterByControlKey( entry.second.layerKey, entry.second.parameterKey, entry.second.value); if (result.accepted) { CompletedOscCommit completedCommit; completedCommit.routeKey = entry.second.routeKey; completedCommit.generation = entry.second.generation; std::lock_guard lock(mCompletedOscCommitMutex); mCompletedOscCommits.push_back(std::move(completedCommit)); } else if (!result.errorMessage.empty()) { OutputDebugStringA(("OSC commit failed: " + result.errorMessage + "\n").c_str()); } } bool registryChanged = false; const RuntimeCoordinatorResult pollResult = runtimeCoordinator.PollRuntimeStoreChanges(registryChanged); if (pollResult.compileStatusChanged && !pollResult.compileStatusSucceeded && !pollResult.compileStatusMessage.empty()) OutputDebugStringA(("Runtime poll failed: " + pollResult.compileStatusMessage + "\n").c_str()); std::unique_lock wakeLock(mPollWakeMutex); mPollWakeCondition.wait_for(wakeLock, kCompatibilityPollFallbackInterval, [this]() { return !mPollRunning.load() || mPollWakeRequested; }); mPollWakeRequested = false; } } void ControlServices::WakePolling() { { std::lock_guard lock(mPollWakeMutex); mPollWakeRequested = true; } mPollWakeCondition.notify_one(); } void ControlServices::PublishRuntimeStateBroadcastRequested(const std::string& reason) { try { RuntimeStateBroadcastRequestedEvent event; event.reason = reason; if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices")) OutputDebugStringA("RuntimeStateBroadcastRequested event publish failed.\n"); } catch (...) { OutputDebugStringA("RuntimeStateBroadcastRequested event publish threw.\n"); } } void ControlServices::PublishOscValueReceived(const PendingOscUpdate& update, const std::string& routeKey) { try { OscValueReceivedEvent event; event.routeKey = routeKey; event.layerKey = update.layerKey; event.parameterKey = update.parameterKey; event.valueJson = update.valueJson; if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices")) OutputDebugStringA("OscValueReceived event publish failed.\n"); } catch (...) { OutputDebugStringA("OscValueReceived event publish threw.\n"); } } void ControlServices::PublishOscCommitRequested(const PendingOscCommit& commit) { try { OscCommitRequestedEvent event; event.routeKey = commit.routeKey; event.layerKey = commit.layerKey; event.parameterKey = commit.parameterKey; event.valueJson = SerializeJson(commit.value, false); event.generation = commit.generation; if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices")) OutputDebugStringA("OscCommitRequested event publish failed.\n"); } catch (...) { OutputDebugStringA("OscCommitRequested event publish threw.\n"); } }