Files
2026-05-11 19:09:01 +10:00

344 lines
8.9 KiB
C++

#include "ControlServices.h"
#include "ControlServer.h"
#include "OscServer.h"
#include "RuntimeControlBridge.h"
#include "RuntimeEventDispatcher.h"
#include "RuntimeStore.h"
#include <algorithm>
#include <windows.h>
namespace
{
// Timeout used by the poll thread's condition-variable wait: when nobody
// wakes the thread explicitly, it still runs another poll pass after this
// much time has elapsed.
constexpr std::chrono::milliseconds kCompatibilityPollFallbackInterval{250};
}
ControlServices::ControlServices(RuntimeEventDispatcher& runtimeEventDispatcher) :
mControlServer(std::make_unique<ControlServer>()),
mOscServer(std::make_unique<OscServer>()),
mRuntimeEventDispatcher(runtimeEventDispatcher),
mPollRunning(false)
{
}
// Shuts everything down on destruction: Stop() joins the poll thread (if any)
// and stops both servers.
ControlServices::~ControlServices()
{
    this->Stop();
}
// (Re)starts the control services.
//
// Any previously running state is stopped first, then startup is delegated to
// StartControlServicesBoundary. If that reports failure, everything is
// stopped again and false is returned; `error` is passed through to the
// boundary helper and is not otherwise touched here.
bool ControlServices::Start(OpenGLComposite& composite, RuntimeStore& runtimeStore, std::string& error)
{
    Stop();

    if (StartControlServicesBoundary(composite, runtimeStore, *this, *mControlServer, *mOscServer, error))
        return true;

    // Startup failed part-way: tear down whatever did come up.
    Stop();
    return false;
}
// Public entry point for launching the background poll thread; simply
// forwards to StartPolling with the given coordinator.
void ControlServices::BeginPolling(RuntimeCoordinator& runtimeCoordinator)
{
    this->StartPolling(runtimeCoordinator);
}
// Stops the poll thread first, then the OSC server, then the control server.
// Each server is only touched if its pointer is non-null.
void ControlServices::Stop()
{
    StopPolling();

    if (mOscServer != nullptr)
    {
        mOscServer->Stop();
    }

    if (mControlServer != nullptr)
    {
        mControlServer->Stop();
    }
}
// Forwards a state broadcast to the control server, if one exists.
void ControlServices::BroadcastState()
{
    if (mControlServer == nullptr)
        return;

    mControlServer->BroadcastState();
}
// Publishes a RuntimeStateBroadcastRequested event (reason
// "control-service-request") and then forwards the request to the control
// server, if one exists. The event is published even when there is no server.
void ControlServices::RequestBroadcastState()
{
    PublishRuntimeStateBroadcastRequested("control-service-request");

    if (mControlServer == nullptr)
        return;

    mControlServer->RequestBroadcastState();
}
bool ControlServices::QueueOscUpdate(const std::string& layerKey, const std::string& parameterKey, const std::string& valueJson, std::string& error)
{
(void)error;
PendingOscUpdate update;
update.layerKey = layerKey;
update.parameterKey = parameterKey;
update.valueJson = valueJson;
const std::string routeKey = layerKey + "\n" + parameterKey;
{
std::lock_guard<std::mutex> lock(mPendingOscMutex);
mPendingOscUpdates[routeKey] = std::move(update);
}
return true;
}
bool ControlServices::ApplyPendingOscUpdates(std::vector<AppliedOscUpdate>& appliedUpdates, std::string& error)
{
appliedUpdates.clear();
std::map<std::string, PendingOscUpdate> pending;
{
std::lock_guard<std::mutex> lock(mPendingOscMutex);
if (mPendingOscUpdates.empty())
return true;
pending.swap(mPendingOscUpdates);
}
for (const auto& entry : pending)
{
JsonValue targetValue;
std::string parseError;
if (!ParseJson(entry.second.valueJson, targetValue, parseError))
{
OutputDebugStringA(("OSC queued value parse failed: " + parseError + "\n").c_str());
continue;
}
AppliedOscUpdate appliedUpdate;
appliedUpdate.routeKey = entry.first;
appliedUpdate.layerKey = entry.second.layerKey;
appliedUpdate.parameterKey = entry.second.parameterKey;
appliedUpdate.targetValue = targetValue;
appliedUpdates.push_back(std::move(appliedUpdate));
PublishOscValueReceived(entry.second, entry.first);
}
(void)error;
return true;
}
bool ControlServices::QueueOscCommit(const std::string& routeKey, const std::string& layerKey, const std::string& parameterKey, const JsonValue& value, uint64_t generation, std::string& error)
{
(void)error;
PendingOscCommit commit;
commit.routeKey = routeKey;
commit.layerKey = layerKey;
commit.parameterKey = parameterKey;
commit.value = value;
commit.generation = generation;
{
std::lock_guard<std::mutex> lock(mPendingOscCommitMutex);
mPendingOscCommits[routeKey] = std::move(commit);
}
WakePolling();
return true;
}
// Discards all OSC bookkeeping: queued updates, queued commits and completed
// commits. Each container is cleared under its own mutex, one after another;
// the three locks are never held at the same time.
void ControlServices::ClearOscState()
{
    {
        std::lock_guard<std::mutex> updatesGuard(mPendingOscMutex);
        mPendingOscUpdates.clear();
    }

    {
        std::lock_guard<std::mutex> commitsGuard(mPendingOscCommitMutex);
        mPendingOscCommits.clear();
    }

    {
        std::lock_guard<std::mutex> completedGuard(mCompletedOscCommitMutex);
        mCompletedOscCommits.clear();
    }
}
// Discards OSC bookkeeping that belongs to a single layer.
//
// Queued updates and queued commits are matched on their stored layerKey.
// Completed commits only carry a route key, so they are matched by prefix:
// any route key beginning with "<layerKey>\n" is removed. Each container is
// processed under its own mutex; the locks are never held together.
void ControlServices::ClearOscStateForLayerKey(const std::string& layerKey)
{
    {
        std::lock_guard<std::mutex> lock(mPendingOscMutex);
        for (auto it = mPendingOscUpdates.begin(); it != mPendingOscUpdates.end();)
        {
            if (it->second.layerKey == layerKey)
                it = mPendingOscUpdates.erase(it);
            else
                ++it;
        }
    }
    {
        std::lock_guard<std::mutex> lock(mPendingOscCommitMutex);
        for (auto it = mPendingOscCommits.begin(); it != mPendingOscCommits.end();)
        {
            if (it->second.layerKey == layerKey)
                it = mPendingOscCommits.erase(it);
            else
                ++it;
        }
    }
    {
        // Build the prefix once, outside the lock, instead of once per element.
        const std::string routePrefix = layerKey + "\n";

        std::lock_guard<std::mutex> lock(mCompletedOscCommitMutex);
        // Single-pass erase/remove rather than erasing element by element,
        // which would shift the tail of the vector on every removal.
        mCompletedOscCommits.erase(
            std::remove_if(
                mCompletedOscCommits.begin(),
                mCompletedOscCommits.end(),
                [&routePrefix](const auto& completed) {
                    // rfind(prefix, 0) == 0 is a "starts with" test.
                    return completed.routeKey.rfind(routePrefix, 0) == 0;
                }),
            mCompletedOscCommits.end());
    }
}
// Hands all completed commits recorded so far to the caller.
//
// `completedCommits` is cleared first; if anything has been recorded, the
// internal list is swapped into it, which leaves the internal list empty.
void ControlServices::ConsumeCompletedOscCommits(std::vector<CompletedOscCommit>& completedCommits)
{
    completedCommits.clear();

    std::lock_guard<std::mutex> guard(mCompletedOscCommitMutex);
    if (!mCompletedOscCommits.empty())
    {
        completedCommits.swap(mCompletedOscCommits);
    }
}
// Launches the background poll thread unless it is already running.
//
// The running flag is claimed atomically, so only the first of several
// overlapping calls starts a thread. The thread keeps a reference to
// `runtimeCoordinator`, so the coordinator must stay alive until StopPolling()
// has joined the thread.
//
// If the thread cannot be created, the running flag is cleared again and the
// exception is rethrown, so a later call can retry.
void ControlServices::StartPolling(RuntimeCoordinator& runtimeCoordinator)
{
    if (mPollRunning.exchange(true))
        return;

    try
    {
        mPollThread = std::thread([this, &runtimeCoordinator]() { PollLoop(runtimeCoordinator); });
    }
    catch (...)
    {
        // Without this reset the object would claim to be polling forever
        // and every later StartPolling() would silently do nothing.
        mPollRunning = false;
        throw;
    }
}
// Stops the background poll thread if it is running.
//
// Clears the running flag, wakes the thread so it does not sit out the rest
// of its wait, and joins it. Does nothing when polling was not running.
void ControlServices::StopPolling()
{
    const bool wasRunning = mPollRunning.exchange(false);
    if (wasRunning)
    {
        WakePolling();
        if (mPollThread.joinable())
        {
            mPollThread.join();
        }
    }
}
// Body of the background poll thread; runs until mPollRunning becomes false.
//
// Each iteration:
//   1. takes all queued OSC commits and submits them to the coordinator,
//      publishing an OscCommitRequested event before each one and, for
//      accepted commits, recording a completed commit and publishing an
//      OscOverlaySettled event;
//   2. calls PollRuntimeStoreChanges on the coordinator and logs a failed
//      compile status if one is reported;
//   3. waits until woken by WakePolling(), until stop is requested, or until
//      kCompatibilityPollFallbackInterval has elapsed.
void ControlServices::PollLoop(RuntimeCoordinator& runtimeCoordinator)
{
    while (mPollRunning)
    {
        // Swap the queue out so the mutex is not held while committing.
        std::map<std::string, PendingOscCommit> pendingCommits;
        {
            std::lock_guard<std::mutex> lock(mPendingOscCommitMutex);
            pendingCommits.swap(mPendingOscCommits);
        }

        for (const auto& entry : pendingCommits)
        {
            const PendingOscCommit& commit = entry.second;

            PublishOscCommitRequested(commit);
            const RuntimeCoordinatorResult result = runtimeCoordinator.CommitOscParameterByControlKey(
                commit.layerKey,
                commit.parameterKey,
                commit.value);

            if (result.accepted)
            {
                CompletedOscCommit completedCommit;
                completedCommit.routeKey = commit.routeKey;
                completedCommit.generation = commit.generation;
                {
                    // Hold the mutex only for the push_back. The event below
                    // is dispatched after the lock is released, so whatever
                    // runs during dispatch cannot block on, or re-enter,
                    // this mutex.
                    std::lock_guard<std::mutex> lock(mCompletedOscCommitMutex);
                    mCompletedOscCommits.push_back(std::move(completedCommit));
                }
                PublishOscOverlaySettled(commit);
            }
            else if (!result.errorMessage.empty())
            {
                OutputDebugStringA(("OSC commit failed: " + result.errorMessage + "\n").c_str());
            }
        }

        bool registryChanged = false;
        const RuntimeCoordinatorResult pollResult = runtimeCoordinator.PollRuntimeStoreChanges(registryChanged);
        if (pollResult.compileStatusChanged && !pollResult.compileStatusSucceeded && !pollResult.compileStatusMessage.empty())
            OutputDebugStringA(("Runtime poll failed: " + pollResult.compileStatusMessage + "\n").c_str());

        // Sleep until woken, stopped, or the fallback interval expires. The
        // wake flag is set and cleared under mPollWakeMutex, so a wake-up
        // issued while the work above was running is seen by the predicate
        // and the wait returns immediately.
        std::unique_lock<std::mutex> wakeLock(mPollWakeMutex);
        mPollWakeCondition.wait_for(wakeLock, kCompatibilityPollFallbackInterval, [this]() {
            return !mPollRunning.load() || mPollWakeRequested;
        });
        mPollWakeRequested = false;
    }
}
// Asks the poll thread to run another pass now instead of waiting out its
// timeout. The request flag is set under mPollWakeMutex; the notification is
// sent after the mutex has been released.
void ControlServices::WakePolling()
{
    {
        std::lock_guard<std::mutex> guard(mPollWakeMutex);
        mPollWakeRequested = true;
    }

    mPollWakeCondition.notify_one();
}
void ControlServices::PublishRuntimeStateBroadcastRequested(const std::string& reason)
{
try
{
RuntimeStateBroadcastRequestedEvent event;
event.reason = reason;
if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices"))
OutputDebugStringA("RuntimeStateBroadcastRequested event publish failed.\n");
}
catch (...)
{
OutputDebugStringA("RuntimeStateBroadcastRequested event publish threw.\n");
}
}
void ControlServices::PublishOscValueReceived(const PendingOscUpdate& update, const std::string& routeKey)
{
try
{
OscValueReceivedEvent event;
event.routeKey = routeKey;
event.layerKey = update.layerKey;
event.parameterKey = update.parameterKey;
event.valueJson = update.valueJson;
if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices"))
OutputDebugStringA("OscValueReceived event publish failed.\n");
}
catch (...)
{
OutputDebugStringA("OscValueReceived event publish threw.\n");
}
}
void ControlServices::PublishOscCommitRequested(const PendingOscCommit& commit)
{
try
{
OscCommitRequestedEvent event;
event.routeKey = commit.routeKey;
event.layerKey = commit.layerKey;
event.parameterKey = commit.parameterKey;
event.valueJson = SerializeJson(commit.value, false);
event.generation = commit.generation;
if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices"))
OutputDebugStringA("OscCommitRequested event publish failed.\n");
}
catch (...)
{
OutputDebugStringA("OscCommitRequested event publish threw.\n");
}
}
void ControlServices::PublishOscOverlaySettled(const PendingOscCommit& commit)
{
try
{
OscOverlayEvent event;
event.routeKey = commit.routeKey;
event.layerKey = commit.layerKey;
event.parameterKey = commit.parameterKey;
event.generation = commit.generation;
event.settled = true;
if (!mRuntimeEventDispatcher.PublishPayload(event, "ControlServices"))
OutputDebugStringA("OscOverlaySettled event publish failed.\n");
}
catch (...)
{
OutputDebugStringA("OscOverlaySettled event publish threw.\n");
}
}