4 Commits

Author SHA1 Message Date
Aiden
1d08dec5fe step 6
Some checks failed
CI / React UI Build (push) Successful in 10s
CI / Native Windows Build And Tests (push) Successful in 2m44s
CI / Windows Release Package (push) Has been cancelled
2026-05-11 20:06:14 +10:00
Aiden
0d57920bc1 step 5 2026-05-11 20:02:26 +10:00
Aiden
1629dbc77a step 4 2026-05-11 19:58:14 +10:00
Aiden
205c90e52e Step 3 2026-05-11 19:53:31 +10:00
15 changed files with 796 additions and 42 deletions

View File

@@ -345,6 +345,23 @@ endif()
add_test(NAME RuntimeStateLayerModelTests COMMAND RuntimeStateLayerModelTests) add_test(NAME RuntimeStateLayerModelTests COMMAND RuntimeStateLayerModelTests)
add_executable(PersistenceWriterTests
"${APP_DIR}/runtime/persistence/PersistenceWriter.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/tests/PersistenceWriterTests.cpp"
)
target_include_directories(PersistenceWriterTests PRIVATE
"${APP_DIR}"
"${APP_DIR}/runtime"
"${APP_DIR}/runtime/persistence"
)
if(MSVC)
target_compile_options(PersistenceWriterTests PRIVATE /W3)
endif()
add_test(NAME PersistenceWriterTests COMMAND PersistenceWriterTests)
add_executable(RuntimeSubsystemTests add_executable(RuntimeSubsystemTests
"${APP_DIR}/runtime/coordination/RuntimeCoordinator.cpp" "${APP_DIR}/runtime/coordination/RuntimeCoordinator.cpp"
"${APP_DIR}/runtime/live/CommittedLiveState.cpp" "${APP_DIR}/runtime/live/CommittedLiveState.cpp"

View File

@@ -58,6 +58,12 @@ OpenGLComposite::~OpenGLComposite()
mShaderBuildQueue->Stop(); mShaderBuildQueue->Stop();
if (mVideoBackend) if (mVideoBackend)
mVideoBackend->ReleaseResources(); mVideoBackend->ReleaseResources();
if (mRuntimeStore)
{
std::string persistenceError;
if (!mRuntimeStore->FlushPersistenceForShutdown(std::chrono::seconds(2), persistenceError))
OutputDebugStringA((std::string("Persistence shutdown flush failed: ") + persistenceError + "\n").c_str());
}
} }
bool OpenGLComposite::InitDeckLink() bool OpenGLComposite::InitDeckLink()
@@ -277,6 +283,13 @@ bool OpenGLComposite::Stop()
if (mRenderEngine) if (mRenderEngine)
mRenderEngine->StopRenderThread(); mRenderEngine->StopRenderThread();
if (mRuntimeStore)
{
std::string persistenceError;
if (!mRuntimeStore->FlushPersistenceForShutdown(std::chrono::seconds(2), persistenceError))
OutputDebugStringA((std::string("Persistence shutdown flush failed: ") + persistenceError + "\n").c_str());
}
return true; return true;
} }

View File

@@ -48,6 +48,9 @@ RuntimeUpdateController::RuntimeUpdateController(
mRuntimeEventDispatcher.Subscribe( mRuntimeEventDispatcher.Subscribe(
RuntimeEventType::RuntimeReloadRequested, RuntimeEventType::RuntimeReloadRequested,
[this](const RuntimeEvent& event) { HandleRuntimeReloadRequested(event); }); [this](const RuntimeEvent& event) { HandleRuntimeReloadRequested(event); });
mRuntimeEventDispatcher.Subscribe(
RuntimeEventType::RuntimePersistenceRequested,
[this](const RuntimeEvent& event) { HandleRuntimePersistenceRequested(event); });
mRuntimeEventDispatcher.Subscribe( mRuntimeEventDispatcher.Subscribe(
RuntimeEventType::ShaderBuildRequested, RuntimeEventType::ShaderBuildRequested,
[this](const RuntimeEvent& event) { HandleShaderBuildRequested(event); }); [this](const RuntimeEvent& event) { HandleShaderBuildRequested(event); });
@@ -158,6 +161,16 @@ void RuntimeUpdateController::HandleRuntimeReloadRequested(const RuntimeEvent& e
mRuntimeStore.ClearReloadRequest(); mRuntimeStore.ClearReloadRequest();
} }
void RuntimeUpdateController::HandleRuntimePersistenceRequested(const RuntimeEvent& event)
{
const RuntimePersistenceRequestedEvent* payload = std::get_if<RuntimePersistenceRequestedEvent>(&event.payload);
if (!payload)
return;
std::string error;
mRuntimeStore.RequestPersistence(payload->request, error);
}
void RuntimeUpdateController::HandleShaderBuildRequested(const RuntimeEvent& event) void RuntimeUpdateController::HandleShaderBuildRequested(const RuntimeEvent& event)
{ {
const ShaderBuildEvent* payload = std::get_if<ShaderBuildEvent>(&event.payload); const ShaderBuildEvent* payload = std::get_if<ShaderBuildEvent>(&event.payload);

View File

@@ -36,6 +36,7 @@ public:
private: private:
void HandleRuntimeStateBroadcastRequested(const RuntimeEvent& event); void HandleRuntimeStateBroadcastRequested(const RuntimeEvent& event);
void HandleRuntimeReloadRequested(const RuntimeEvent& event); void HandleRuntimeReloadRequested(const RuntimeEvent& event);
void HandleRuntimePersistenceRequested(const RuntimeEvent& event);
void HandleShaderBuildRequested(const RuntimeEvent& event); void HandleShaderBuildRequested(const RuntimeEvent& event);
void HandleShaderBuildPrepared(const RuntimeEvent& event); void HandleShaderBuildPrepared(const RuntimeEvent& event);
void HandleShaderBuildFailed(const RuntimeEvent& event); void HandleShaderBuildFailed(const RuntimeEvent& event);

View File

@@ -37,5 +37,8 @@ struct PersistenceSnapshot
std::filesystem::path targetPath; std::filesystem::path targetPath;
std::string contents; std::string contents;
std::string reason; std::string reason;
std::string debounceKey;
bool debounceAllowed = false;
bool flushRequested = false;
uint64_t generation = 0; uint64_t generation = 0;
}; };

View File

@@ -2,10 +2,130 @@
#include <windows.h> #include <windows.h>
#include <algorithm>
#include <filesystem> #include <filesystem>
#include <fstream> #include <fstream>
#include <utility>
bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const PersistenceWriter::PersistenceWriter(std::chrono::milliseconds debounceDelay, SnapshotSink sink) :
mDebounceDelay(debounceDelay),
mSink(std::move(sink))
{
}
PersistenceWriter::~PersistenceWriter()
{
StopAndFlush();
}
void PersistenceWriter::SetResultCallback(ResultCallback callback)
{
std::lock_guard<std::mutex> lock(mMutex);
mResultCallback = std::move(callback);
}
bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
{
if (!ValidateSnapshot(snapshot, error))
return false;
const bool succeeded = WriteSnapshotThroughSink(snapshot, error);
PublishWriteResult(snapshot, succeeded, error, false);
return succeeded;
}
bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
{
if (!ValidateSnapshot(snapshot, error))
return false;
std::lock_guard<std::mutex> lock(mMutex);
if (!mAcceptingRequests)
{
error = "Persistence writer is stopping.";
return false;
}
StartWorkerLocked();
const auto now = std::chrono::steady_clock::now();
if (snapshot.debounceAllowed)
{
const std::string debounceKey = snapshot.debounceKey.empty() ? snapshot.targetPath.string() : snapshot.debounceKey;
PendingSnapshot& pending = mDebouncedSnapshots[debounceKey];
if (!pending.snapshot.targetPath.empty())
++mCoalescedCount;
else
++mEnqueuedCount;
pending.snapshot = snapshot;
pending.readyAt = snapshot.flushRequested ? now : now + mDebounceDelay;
}
else
{
mImmediateSnapshots.push_back(snapshot);
++mEnqueuedCount;
}
mCondition.notify_one();
return true;
}
void PersistenceWriter::StopAndFlush()
{
std::string error;
StopAndFlush((std::chrono::milliseconds::max)(), error);
}
bool PersistenceWriter::StopAndFlush(std::chrono::milliseconds timeout, std::string& error)
{
{
std::lock_guard<std::mutex> lock(mMutex);
mAcceptingRequests = false;
mStopping = true;
const auto now = std::chrono::steady_clock::now();
for (auto& entry : mDebouncedSnapshots)
entry.second.readyAt = now;
}
mCondition.notify_all();
std::unique_lock<std::mutex> lock(mMutex);
if (mWorkerRunning)
{
if (timeout == (std::chrono::milliseconds::max)())
{
mCondition.wait(lock, [this]() { return !mWorkerRunning; });
}
else
{
const auto deadline = std::chrono::steady_clock::now() + timeout;
if (!mCondition.wait_until(lock, deadline, [this]() { return !mWorkerRunning; }))
{
error = "Timed out while flushing persistence writer.";
return false;
}
}
}
lock.unlock();
if (mWorker.joinable())
mWorker.join();
return true;
}
PersistenceWriterMetrics PersistenceWriter::GetMetrics() const
{
std::lock_guard<std::mutex> lock(mMutex);
PersistenceWriterMetrics metrics;
metrics.pendingCount = PendingCountLocked();
metrics.enqueuedCount = mEnqueuedCount;
metrics.coalescedCount = mCoalescedCount;
metrics.writtenCount = mWrittenCount;
metrics.failedCount = mFailedCount;
return metrics;
}
bool PersistenceWriter::ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const
{ {
if (snapshot.targetPath.empty()) if (snapshot.targetPath.empty())
{ {
@@ -13,6 +133,14 @@ bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::
return false; return false;
} }
return true;
}
bool PersistenceWriter::WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const
{
if (mSink)
return mSink(snapshot, error);
std::error_code fsError; std::error_code fsError;
std::filesystem::create_directories(snapshot.targetPath.parent_path(), fsError); std::filesystem::create_directories(snapshot.targetPath.parent_path(), fsError);
@@ -42,3 +170,107 @@ bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::
return true; return true;
} }
void PersistenceWriter::PublishWriteResult(const PersistenceSnapshot& snapshot, bool succeeded, const std::string& errorMessage, bool newerRequestPending)
{
ResultCallback callback;
{
std::lock_guard<std::mutex> lock(mMutex);
callback = mResultCallback;
}
if (!callback)
return;
PersistenceWriteResult result;
result.targetKind = snapshot.targetKind;
result.targetPath = snapshot.targetPath.string();
result.reason = snapshot.reason;
result.succeeded = succeeded;
result.errorMessage = errorMessage;
result.newerRequestPending = newerRequestPending;
callback(result);
}
void PersistenceWriter::StartWorkerLocked()
{
if (mWorkerRunning)
return;
mWorkerRunning = true;
mWorker = std::thread([this]() { WorkerMain(); });
}
void PersistenceWriter::WorkerMain()
{
for (;;)
{
PersistenceSnapshot snapshot;
{
std::unique_lock<std::mutex> lock(mMutex);
for (;;)
{
if (!mImmediateSnapshots.empty())
{
snapshot = std::move(mImmediateSnapshots.front());
mImmediateSnapshots.pop_front();
break;
}
if (!mDebouncedSnapshots.empty())
{
const auto now = std::chrono::steady_clock::now();
auto readyIt = mDebouncedSnapshots.end();
auto nextReadyAt = (std::chrono::steady_clock::time_point::max)();
for (auto it = mDebouncedSnapshots.begin(); it != mDebouncedSnapshots.end(); ++it)
{
if (it->second.readyAt <= now)
{
readyIt = it;
break;
}
if (it->second.readyAt < nextReadyAt)
nextReadyAt = it->second.readyAt;
}
if (readyIt != mDebouncedSnapshots.end())
{
snapshot = std::move(readyIt->second.snapshot);
mDebouncedSnapshots.erase(readyIt);
break;
}
mCondition.wait_until(lock, nextReadyAt);
continue;
}
if (mStopping)
{
mWorkerRunning = false;
mCondition.notify_all();
return;
}
mCondition.wait(lock);
}
}
std::string error;
const bool succeeded = WriteSnapshotThroughSink(snapshot, error);
bool newerRequestPending = false;
{
std::lock_guard<std::mutex> lock(mMutex);
if (succeeded)
++mWrittenCount;
else
++mFailedCount;
newerRequestPending = PendingCountLocked() > 0;
}
PublishWriteResult(snapshot, succeeded, error, newerRequestPending);
}
}
std::size_t PersistenceWriter::PendingCountLocked() const
{
return mImmediateSnapshots.size() + mDebouncedSnapshots.size();
}

View File

@@ -2,10 +2,80 @@
#include "PersistenceRequest.h" #include "PersistenceRequest.h"
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <string> #include <string>
#include <thread>
#include <unordered_map>
struct PersistenceWriterMetrics
{
std::size_t pendingCount = 0;
uint64_t enqueuedCount = 0;
uint64_t coalescedCount = 0;
uint64_t writtenCount = 0;
uint64_t failedCount = 0;
};
struct PersistenceWriteResult
{
PersistenceTargetKind targetKind = PersistenceTargetKind::RuntimeState;
std::string targetPath;
std::string reason;
bool succeeded = false;
std::string errorMessage;
bool newerRequestPending = false;
};
class PersistenceWriter class PersistenceWriter
{ {
public: public:
bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const; using SnapshotSink = std::function<bool(const PersistenceSnapshot&, std::string&)>;
using ResultCallback = std::function<void(const PersistenceWriteResult&)>;
explicit PersistenceWriter(
std::chrono::milliseconds debounceDelay = std::chrono::milliseconds(50),
SnapshotSink sink = SnapshotSink());
~PersistenceWriter();
void SetResultCallback(ResultCallback callback);
bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
bool StopAndFlush(std::chrono::milliseconds timeout, std::string& error);
void StopAndFlush();
PersistenceWriterMetrics GetMetrics() const;
private:
struct PendingSnapshot
{
PersistenceSnapshot snapshot;
std::chrono::steady_clock::time_point readyAt;
};
bool ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const;
bool WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const;
void PublishWriteResult(const PersistenceSnapshot& snapshot, bool succeeded, const std::string& errorMessage, bool newerRequestPending);
void StartWorkerLocked();
void WorkerMain();
std::size_t PendingCountLocked() const;
std::chrono::milliseconds mDebounceDelay;
SnapshotSink mSink;
ResultCallback mResultCallback;
mutable std::mutex mMutex;
std::condition_variable mCondition;
std::thread mWorker;
bool mWorkerRunning = false;
bool mStopping = false;
bool mAcceptingRequests = true;
std::unordered_map<std::string, PendingSnapshot> mDebouncedSnapshots;
std::deque<PersistenceSnapshot> mImmediateSnapshots;
uint64_t mEnqueuedCount = 0;
uint64_t mCoalescedCount = 0;
uint64_t mWrittenCount = 0;
uint64_t mFailedCount = 0;
}; };

View File

@@ -24,11 +24,25 @@ double GenerateStartupRandom()
return distribution(randomDevice); return distribution(randomDevice);
} }
std::string PersistenceTargetKindName(PersistenceTargetKind targetKind)
{
switch (targetKind)
{
case PersistenceTargetKind::RuntimeState:
return "runtime-state";
case PersistenceTargetKind::StackPreset:
return "stack-preset";
case PersistenceTargetKind::RuntimeConfig:
return "runtime-config";
default:
return "unknown";
}
}
} }
RuntimeStore::RuntimeStore() : RuntimeStore::RuntimeStore() :
mRenderSnapshotBuilder(*this), mRenderSnapshotBuilder(*this),
mHealthTelemetry(),
mReloadRequested(false), mReloadRequested(false),
mCompileSucceeded(false), mCompileSucceeded(false),
mStartupRandom(GenerateStartupRandom()), mStartupRandom(GenerateStartupRandom()),
@@ -37,6 +51,15 @@ RuntimeStore::RuntimeStore() :
mStartTime(std::chrono::steady_clock::now()), mStartTime(std::chrono::steady_clock::now()),
mLastScanTime((std::chrono::steady_clock::time_point::min)()) mLastScanTime((std::chrono::steady_clock::time_point::min)())
{ {
mPersistenceWriter.SetResultCallback([this](const PersistenceWriteResult& result) {
mHealthTelemetry.RecordPersistenceWriteResult(
result.succeeded,
PersistenceTargetKindName(result.targetKind),
result.targetPath,
result.reason,
result.errorMessage,
result.newerRequestPending);
});
} }
HealthTelemetry& RuntimeStore::GetHealthTelemetry() HealthTelemetry& RuntimeStore::GetHealthTelemetry()
@@ -104,6 +127,50 @@ PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshot(const Per
return BuildRuntimeStatePersistenceSnapshotLocked(request); return BuildRuntimeStatePersistenceSnapshotLocked(request);
} }
bool RuntimeStore::RequestPersistence(const PersistenceRequest& request, std::string& error)
{
if (request.targetKind != PersistenceTargetKind::RuntimeState)
{
error = "Unsupported persistence request target: " + PersistenceTargetKindName(request.targetKind);
mHealthTelemetry.RecordPersistenceWriteResult(
false,
PersistenceTargetKindName(request.targetKind),
std::string(),
request.reason,
error,
false);
return false;
}
const PersistenceSnapshot snapshot = BuildRuntimeStatePersistenceSnapshot(request);
if (mPersistenceWriter.EnqueueSnapshot(snapshot, error))
return true;
mHealthTelemetry.RecordPersistenceWriteResult(
false,
PersistenceTargetKindName(request.targetKind),
snapshot.targetPath.string(),
request.reason,
error,
false);
return false;
}
bool RuntimeStore::FlushPersistenceForShutdown(std::chrono::milliseconds timeout, std::string& error)
{
if (mPersistenceWriter.StopAndFlush(timeout, error))
return true;
mHealthTelemetry.RecordPersistenceWriteResult(
false,
PersistenceTargetKindName(PersistenceTargetKind::RuntimeState),
std::string(),
"shutdown-flush",
error,
true);
return false;
}
PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const
{ {
PersistenceSnapshot snapshot; PersistenceSnapshot snapshot;
@@ -111,6 +178,9 @@ PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(con
snapshot.targetPath = mConfigStore.GetRuntimeStatePath(); snapshot.targetPath = mConfigStore.GetRuntimeStatePath();
snapshot.contents = SerializeJson(mCommittedLiveState.BuildPersistentStateValue(mShaderCatalog), true); snapshot.contents = SerializeJson(mCommittedLiveState.BuildPersistentStateValue(mShaderCatalog), true);
snapshot.reason = request.reason; snapshot.reason = request.reason;
snapshot.debounceKey = request.debounceKey;
snapshot.debounceAllowed = request.debounceAllowed;
snapshot.flushRequested = request.flushRequested;
snapshot.generation = request.sequence; snapshot.generation = request.sequence;
return snapshot; return snapshot;
} }
@@ -184,7 +254,7 @@ bool RuntimeStore::CreateStoredLayer(const std::string& shaderId, std::string& e
mReloadRequested = true; mReloadRequested = true;
MarkRenderStateDirtyLocked(); MarkRenderStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::DeleteStoredLayer(const std::string& layerId, std::string& error) bool RuntimeStore::DeleteStoredLayer(const std::string& layerId, std::string& error)
@@ -195,7 +265,7 @@ bool RuntimeStore::DeleteStoredLayer(const std::string& layerId, std::string& er
mReloadRequested = true; mReloadRequested = true;
MarkRenderStateDirtyLocked(); MarkRenderStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::MoveStoredLayer(const std::string& layerId, int direction, std::string& error) bool RuntimeStore::MoveStoredLayer(const std::string& layerId, int direction, std::string& error)
@@ -212,7 +282,7 @@ bool RuntimeStore::MoveStoredLayer(const std::string& layerId, int direction, st
mReloadRequested = true; mReloadRequested = true;
MarkRenderStateDirtyLocked(); MarkRenderStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::MoveStoredLayerToIndex(const std::string& layerId, std::size_t targetIndex, std::string& error) bool RuntimeStore::MoveStoredLayerToIndex(const std::string& layerId, std::size_t targetIndex, std::string& error)
@@ -229,7 +299,7 @@ bool RuntimeStore::MoveStoredLayerToIndex(const std::string& layerId, std::size_
mReloadRequested = true; mReloadRequested = true;
MarkRenderStateDirtyLocked(); MarkRenderStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::SetStoredLayerBypassState(const std::string& layerId, bool bypassed, std::string& error) bool RuntimeStore::SetStoredLayerBypassState(const std::string& layerId, bool bypassed, std::string& error)
@@ -240,7 +310,7 @@ bool RuntimeStore::SetStoredLayerBypassState(const std::string& layerId, bool by
mReloadRequested = true; mReloadRequested = true;
MarkParameterStateDirtyLocked(); MarkParameterStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::SetStoredLayerShaderSelection(const std::string& layerId, const std::string& shaderId, std::string& error) bool RuntimeStore::SetStoredLayerShaderSelection(const std::string& layerId, const std::string& shaderId, std::string& error)
@@ -251,18 +321,19 @@ bool RuntimeStore::SetStoredLayerShaderSelection(const std::string& layerId, con
mReloadRequested = true; mReloadRequested = true;
MarkRenderStateDirtyLocked(); MarkRenderStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::SetStoredParameterValue(const std::string& layerId, const std::string& parameterId, const ShaderParameterValue& value, bool persistState, std::string& error) bool RuntimeStore::SetStoredParameterValue(const std::string& layerId, const std::string& parameterId, const ShaderParameterValue& value, bool persistState, std::string& error)
{ {
(void)persistState;
std::lock_guard<std::mutex> lock(mMutex); std::lock_guard<std::mutex> lock(mMutex);
if (!mCommittedLiveState.SetParameterValue(layerId, parameterId, value, error)) if (!mCommittedLiveState.SetParameterValue(layerId, parameterId, value, error))
return false; return false;
MarkParameterStateDirtyLocked(); MarkParameterStateDirtyLocked();
return !persistState || SavePersistentState(error); return true;
} }
bool RuntimeStore::ResetStoredLayerParameterValues(const std::string& layerId, std::string& error) bool RuntimeStore::ResetStoredLayerParameterValues(const std::string& layerId, std::string& error)
@@ -273,7 +344,7 @@ bool RuntimeStore::ResetStoredLayerParameterValues(const std::string& layerId, s
return false; return false;
MarkParameterStateDirtyLocked(); MarkParameterStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::SaveStackPresetSnapshot(const std::string& presetName, std::string& error) const bool RuntimeStore::SaveStackPresetSnapshot(const std::string& presetName, std::string& error) const
@@ -313,7 +384,7 @@ bool RuntimeStore::LoadStackPresetSnapshot(const std::string& presetName, std::s
mReloadRequested = true; mReloadRequested = true;
MarkRenderStateDirtyLocked(); MarkRenderStateDirtyLocked();
return SavePersistentState(error); return true;
} }
bool RuntimeStore::HasStoredLayer(const std::string& layerId) const bool RuntimeStore::HasStoredLayer(const std::string& layerId) const
@@ -476,11 +547,6 @@ bool RuntimeStore::LoadPersistentState(std::string& error)
return mCommittedLiveState.LoadPersistentStateValue(root); return mCommittedLiveState.LoadPersistentStateValue(root);
} }
bool RuntimeStore::SavePersistentState(std::string& error) const
{
return mPersistenceWriter.WriteSnapshot(BuildRuntimeStatePersistenceSnapshotLocked(PersistenceRequest::RuntimeStateRequest("SavePersistentState")), error);
}
PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std::string& presetName) const PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std::string& presetName) const
{ {
const std::string safeStem = LayerStackStore::MakeSafePresetFileStem(presetName); const std::string safeStem = LayerStackStore::MakeSafePresetFileStem(presetName);
@@ -490,6 +556,9 @@ PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std:
snapshot.targetPath = mConfigStore.GetPresetRoot() / (safeStem + ".json"); snapshot.targetPath = mConfigStore.GetPresetRoot() / (safeStem + ".json");
snapshot.contents = SerializeJson(mCommittedLiveState.BuildStackPresetValue(mShaderCatalog, presetName), true); snapshot.contents = SerializeJson(mCommittedLiveState.BuildStackPresetValue(mShaderCatalog, presetName), true);
snapshot.reason = "SaveStackPreset"; snapshot.reason = "SaveStackPreset";
snapshot.debounceKey = "stack-preset:" + safeStem;
snapshot.debounceAllowed = false;
snapshot.flushRequested = true;
snapshot.generation = 0; snapshot.generation = 0;
return snapshot; return snapshot;
} }

View File

@@ -32,6 +32,8 @@ public:
bool InitializeStore(std::string& error); bool InitializeStore(std::string& error);
std::string BuildPersistentStateJson() const; std::string BuildPersistentStateJson() const;
PersistenceSnapshot BuildRuntimeStatePersistenceSnapshot(const PersistenceRequest& request) const; PersistenceSnapshot BuildRuntimeStatePersistenceSnapshot(const PersistenceRequest& request) const;
bool RequestPersistence(const PersistenceRequest& request, std::string& error);
bool FlushPersistenceForShutdown(std::chrono::milliseconds timeout, std::string& error);
bool PollStoredFileChanges(bool& registryChanged, bool& reloadRequested, std::string& error); bool PollStoredFileChanges(bool& registryChanged, bool& reloadRequested, std::string& error);
bool CreateStoredLayer(const std::string& shaderId, std::string& error); bool CreateStoredLayer(const std::string& shaderId, std::string& error);
@@ -83,7 +85,6 @@ public:
private: private:
bool LoadPersistentState(std::string& error); bool LoadPersistentState(std::string& error);
bool SavePersistentState(std::string& error) const;
PersistenceSnapshot BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const; PersistenceSnapshot BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const;
PersistenceSnapshot BuildStackPresetPersistenceSnapshot(const std::string& presetName) const; PersistenceSnapshot BuildStackPresetPersistenceSnapshot(const std::string& presetName) const;
bool ScanShaderPackages(std::string& error); bool ScanShaderPackages(std::string& error);
@@ -93,11 +94,11 @@ private:
void MarkParameterStateDirtyLocked(); void MarkParameterStateDirtyLocked();
RenderSnapshotBuilder mRenderSnapshotBuilder; RenderSnapshotBuilder mRenderSnapshotBuilder;
PersistenceWriter mPersistenceWriter;
RuntimeConfigStore mConfigStore; RuntimeConfigStore mConfigStore;
ShaderPackageCatalog mShaderCatalog; ShaderPackageCatalog mShaderCatalog;
CommittedLiveState mCommittedLiveState; CommittedLiveState mCommittedLiveState;
HealthTelemetry mHealthTelemetry; HealthTelemetry mHealthTelemetry;
mutable PersistenceWriter mPersistenceWriter;
mutable std::mutex mMutex; mutable std::mutex mMutex;
bool mReloadRequested; bool mReloadRequested;
bool mCompileSucceeded; bool mCompileSucceeded;

View File

@@ -169,6 +169,44 @@ bool HealthTelemetry::TryRecordRuntimeEventDispatchStats(std::size_t dispatchedE
return true; return true;
} }
void HealthTelemetry::RecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath,
const std::string& reason, const std::string& errorMessage, bool newerRequestPending)
{
std::lock_guard<std::mutex> lock(mMutex);
if (succeeded)
++mPersistence.writeSuccessCount;
else
++mPersistence.writeFailureCount;
mPersistence.lastWriteSucceeded = succeeded;
mPersistence.unsavedChanges = !succeeded || newerRequestPending;
mPersistence.newerRequestPending = newerRequestPending;
mPersistence.lastTargetKind = targetKind;
mPersistence.lastTargetPath = targetPath;
mPersistence.lastReason = reason;
mPersistence.lastErrorMessage = errorMessage;
}
bool HealthTelemetry::TryRecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath,
const std::string& reason, const std::string& errorMessage, bool newerRequestPending)
{
std::unique_lock<std::mutex> lock(mMutex, std::try_to_lock);
if (!lock.owns_lock())
return false;
if (succeeded)
++mPersistence.writeSuccessCount;
else
++mPersistence.writeFailureCount;
mPersistence.lastWriteSucceeded = succeeded;
mPersistence.unsavedChanges = !succeeded || newerRequestPending;
mPersistence.newerRequestPending = newerRequestPending;
mPersistence.lastTargetKind = targetKind;
mPersistence.lastTargetPath = targetPath;
mPersistence.lastReason = reason;
mPersistence.lastErrorMessage = errorMessage;
return true;
}
HealthTelemetry::SignalStatusSnapshot HealthTelemetry::GetSignalStatusSnapshot() const HealthTelemetry::SignalStatusSnapshot HealthTelemetry::GetSignalStatusSnapshot() const
{ {
std::lock_guard<std::mutex> lock(mMutex); std::lock_guard<std::mutex> lock(mMutex);
@@ -193,6 +231,12 @@ HealthTelemetry::RuntimeEventMetricsSnapshot HealthTelemetry::GetRuntimeEventMet
return mRuntimeEvents; return mRuntimeEvents;
} }
HealthTelemetry::PersistenceSnapshot HealthTelemetry::GetPersistenceSnapshot() const
{
std::lock_guard<std::mutex> lock(mMutex);
return mPersistence;
}
HealthTelemetry::Snapshot HealthTelemetry::GetSnapshot() const HealthTelemetry::Snapshot HealthTelemetry::GetSnapshot() const
{ {
std::lock_guard<std::mutex> lock(mMutex); std::lock_guard<std::mutex> lock(mMutex);
@@ -202,5 +246,6 @@ HealthTelemetry::Snapshot HealthTelemetry::GetSnapshot() const
snapshot.videoIO = mVideoIOStatus; snapshot.videoIO = mVideoIOStatus;
snapshot.performance = mPerformance; snapshot.performance = mPerformance;
snapshot.runtimeEvents = mRuntimeEvents; snapshot.runtimeEvents = mRuntimeEvents;
snapshot.persistence = mPersistence;
return snapshot; return snapshot;
} }

View File

@@ -69,12 +69,26 @@ public:
RuntimeEventDispatchSnapshot dispatch; RuntimeEventDispatchSnapshot dispatch;
}; };
struct PersistenceSnapshot
{
uint64_t writeSuccessCount = 0;
uint64_t writeFailureCount = 0;
bool lastWriteSucceeded = true;
bool unsavedChanges = false;
bool newerRequestPending = false;
std::string lastTargetKind;
std::string lastTargetPath;
std::string lastReason;
std::string lastErrorMessage;
};
struct Snapshot struct Snapshot
{ {
SignalStatusSnapshot signal; SignalStatusSnapshot signal;
VideoIOStatusSnapshot videoIO; VideoIOStatusSnapshot videoIO;
PerformanceSnapshot performance; PerformanceSnapshot performance;
RuntimeEventMetricsSnapshot runtimeEvents; RuntimeEventMetricsSnapshot runtimeEvents;
PersistenceSnapshot persistence;
}; };
HealthTelemetry() = default; HealthTelemetry() = default;
@@ -107,10 +121,16 @@ public:
bool TryRecordRuntimeEventDispatchStats(std::size_t dispatchedEvents, std::size_t handlerInvocations, bool TryRecordRuntimeEventDispatchStats(std::size_t dispatchedEvents, std::size_t handlerInvocations,
std::size_t handlerFailures, double dispatchDurationMilliseconds); std::size_t handlerFailures, double dispatchDurationMilliseconds);
void RecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath,
const std::string& reason, const std::string& errorMessage, bool newerRequestPending);
bool TryRecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath,
const std::string& reason, const std::string& errorMessage, bool newerRequestPending);
SignalStatusSnapshot GetSignalStatusSnapshot() const; SignalStatusSnapshot GetSignalStatusSnapshot() const;
VideoIOStatusSnapshot GetVideoIOStatusSnapshot() const; VideoIOStatusSnapshot GetVideoIOStatusSnapshot() const;
PerformanceSnapshot GetPerformanceSnapshot() const; PerformanceSnapshot GetPerformanceSnapshot() const;
RuntimeEventMetricsSnapshot GetRuntimeEventMetricsSnapshot() const; RuntimeEventMetricsSnapshot GetRuntimeEventMetricsSnapshot() const;
PersistenceSnapshot GetPersistenceSnapshot() const;
Snapshot GetSnapshot() const; Snapshot GetSnapshot() const;
private: private:
@@ -119,4 +139,5 @@ private:
VideoIOStatusSnapshot mVideoIOStatus; VideoIOStatusSnapshot mVideoIOStatus;
PerformanceSnapshot mPerformance; PerformanceSnapshot mPerformance;
RuntimeEventMetricsSnapshot mRuntimeEvents; RuntimeEventMetricsSnapshot mRuntimeEvents;
PersistenceSnapshot mPersistence;
}; };

View File

@@ -6,9 +6,9 @@ Phases 1-5 separate durable state, coordination policy, render-facing snapshots,
## Status ## Status
- Phase 6 design package: proposed. - Phase 6 design package: complete.
- Phase 6 implementation: Step 2 complete. - Phase 6 implementation: Step 6 complete.
- Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence requests; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. The remaining issue is that actual disk writes are still synchronous store work rather than queued, debounced, atomic background writes. - Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence request execution; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. Runtime-state persistence is now requested through the coordinator/event path and executed by the background writer.
Current persistence footholds: Current persistence footholds:
@@ -218,9 +218,16 @@ Introduce a worker thread or queued task owner.
Initial target: Initial target:
- repeated runtime-state requests coalesce - [x] repeated runtime-state requests coalesce
- worker writes only latest pending snapshot - [x] worker writes only latest pending snapshot
- tests cover coalescing without filesystem where possible - [x] tests cover coalescing without filesystem where possible
Current implementation:
- `PersistenceWriter::EnqueueSnapshot(...)` starts a worker lazily and debounces snapshots by `debounceKey`.
- Runtime-state saves enqueue debounced snapshots, so routine mutation paths no longer write the runtime-state file directly.
- Synchronous `PersistenceWriter::WriteSnapshot(...)` remains for stack preset saves and transitional direct writes.
- `PersistenceWriterTests` use an injected in-memory sink to verify coalescing and non-coalesced immediate requests without touching the filesystem.
### Step 4. Add Atomic Write And Failure Reporting ### Step 4. Add Atomic Write And Failure Reporting
@@ -228,9 +235,16 @@ Make disk writes safer and observable.
Initial target: Initial target:
- temp-file then replace - [x] temp-file then replace
- failure returned/published with structured reason - [x] failure returned/published with structured reason
- `HealthTelemetry` receives persistence warning state - [x] `HealthTelemetry` receives persistence warning state
Current implementation:
- `PersistenceWriter::WriteSnapshot(...)` and worker writes use temp-file then `MoveFileExA(..., MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH)`.
- `PersistenceWriteResult` reports target kind, target path, reason, success/failure, error message, and whether newer work was pending.
- `RuntimeStore` wires persistence write results into `HealthTelemetry`.
- `HealthTelemetry` records persistence success/failure counts, last target/reason/error, pending-newer-request state, and unsaved-change state.
### Step 5. Wire Coordinator/Event Requests To Writer ### Step 5. Wire Coordinator/Event Requests To Writer
@@ -238,9 +252,17 @@ Route `RuntimePersistenceRequested` or coordinator persistence outcomes into the
Initial target: Initial target:
- accepted durable mutations request persistence - [x] accepted durable mutations request persistence
- transient-only mutations do not - [x] transient-only mutations do not
- runtime reload/preset policies remain explicit - [x] runtime reload/preset policies remain explicit
Current implementation:
- Store mutation methods update committed durable/session state and mark render state dirty, but no longer enqueue runtime-state writes directly.
- `RuntimeCoordinator` remains the owner of the persistence decision and publishes `RuntimePersistenceRequested` only for accepted durable mutations.
- `RuntimeUpdateController` handles `RuntimePersistenceRequested` and calls `RuntimeStore::RequestPersistence(...)`.
- `RuntimeStore::RequestPersistence(...)` validates the request target, builds the runtime-state snapshot, enqueues it on `PersistenceWriter`, and records enqueue failures in `HealthTelemetry`.
- Stack preset save remains a synchronous preset-file write; preset load updates state and relies on the coordinator persistence request for runtime-state persistence.
### Step 6. Define Shutdown Flush ### Step 6. Define Shutdown Flush
@@ -248,9 +270,16 @@ Make app shutdown persistence behavior deterministic.
Initial target: Initial target:
- stop accepting new requests - [x] stop accepting new requests
- flush latest pending snapshot with bounded wait - [x] flush latest pending snapshot with bounded wait
- report failure if flush fails - [x] report failure if flush fails
Current implementation:
- `PersistenceWriter::StopAndFlush(timeout, error)` stops accepting new snapshots, forces debounced snapshots ready, drains pending work, and reports timeout/failure to the caller.
- `RuntimeStore::FlushPersistenceForShutdown(...)` provides the runtime-level shutdown API and records flush failures in `HealthTelemetry`.
- `OpenGLComposite::Stop()` and the destructor explicitly flush persistence after control services/backend/render-thread shutdown.
- `PersistenceWriterTests` cover shutdown draining, request rejection after shutdown, and timeout/retry behavior without rendering or DeckLink.
## Testing Strategy ## Testing Strategy
@@ -293,14 +322,14 @@ Operator-triggered preset save often feels like it should complete before report
Phase 6 can be considered complete once the project can say: Phase 6 can be considered complete once the project can say:
- [ ] durable mutations enqueue persistence instead of directly writing from mutation paths - [x] durable mutations enqueue persistence instead of directly writing from mutation paths
- [ ] runtime-state writes are debounced/coalesced - [x] runtime-state writes are debounced/coalesced
- [ ] writes use temp-file/replace or equivalent atomic policy - [x] writes use temp-file/replace or equivalent atomic policy
- [ ] persistence failures are reported through structured health/events - [x] persistence failures are reported through structured health/events
- [ ] transient/live-only mutations do not request persistence - [x] transient/live-only mutations do not request persistence
- [ ] shutdown flush behavior is explicit and tested - [x] shutdown flush behavior is explicit and tested
- [ ] `RuntimeStore` remains durable-state/serialization owner, not worker policy owner - [x] `RuntimeStore` remains durable-state/serialization owner, not worker policy owner
- [ ] persistence behavior has focused non-render tests - [x] persistence behavior has focused non-render tests
## Open Questions ## Open Questions

View File

@@ -53,6 +53,29 @@ void TestRuntimeEventTryRecord()
Expect(metrics.queue.oldestEventAgeMilliseconds == 0.0, "queue age is clamped to non-negative values"); Expect(metrics.queue.oldestEventAgeMilliseconds == 0.0, "queue age is clamped to non-negative values");
Expect(metrics.dispatch.lastDispatchDurationMilliseconds == 0.0, "dispatch duration is clamped to non-negative values"); Expect(metrics.dispatch.lastDispatchDurationMilliseconds == 0.0, "dispatch duration is clamped to non-negative values");
} }
void TestPersistenceWriteHealth()
{
HealthTelemetry telemetry;
telemetry.RecordPersistenceWriteResult(false, "runtime-state", "runtime/runtime_state.json", "UpdateLayerParameter",
"disk full", true);
HealthTelemetry::PersistenceSnapshot persistence = telemetry.GetPersistenceSnapshot();
Expect(persistence.writeFailureCount == 1, "persistence health counts write failures");
Expect(!persistence.lastWriteSucceeded, "persistence health records failed write state");
Expect(persistence.unsavedChanges, "persistence health reports unsaved changes after failure");
Expect(persistence.newerRequestPending, "persistence health records pending newer request");
Expect(persistence.lastTargetKind == "runtime-state", "persistence health records target kind");
Expect(persistence.lastReason == "UpdateLayerParameter", "persistence health records reason");
Expect(persistence.lastErrorMessage == "disk full", "persistence health records error message");
Expect(telemetry.TryRecordPersistenceWriteResult(true, "runtime-state", "runtime/runtime_state.json", "flush", "", false),
"try persistence health succeeds when uncontended");
persistence = telemetry.GetPersistenceSnapshot();
Expect(persistence.writeSuccessCount == 1, "persistence health counts write successes");
Expect(persistence.lastWriteSucceeded, "persistence health records successful write state");
Expect(!persistence.unsavedChanges, "persistence health clears unsaved changes after latest successful write with no pending request");
}
} }
int main() int main()
@@ -60,6 +83,7 @@ int main()
TestRuntimeEventQueueMetrics(); TestRuntimeEventQueueMetrics();
TestRuntimeEventDispatchStats(); TestRuntimeEventDispatchStats();
TestRuntimeEventTryRecord(); TestRuntimeEventTryRecord();
TestPersistenceWriteHealth();
if (gFailures != 0) if (gFailures != 0)
{ {

View File

@@ -0,0 +1,210 @@
#include "PersistenceWriter.h"
#include <condition_variable>
#include <chrono>
#include <filesystem>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
namespace
{
int gFailures = 0;
void Expect(bool condition, const char* message)
{
if (condition)
return;
std::cerr << "FAIL: " << message << "\n";
++gFailures;
}
PersistenceSnapshot MakeRuntimeSnapshot(const std::string& contents)
{
PersistenceSnapshot snapshot;
snapshot.targetKind = PersistenceTargetKind::RuntimeState;
snapshot.targetPath = std::filesystem::temp_directory_path() / "video-shader-persistence-writer-test.json";
snapshot.contents = contents;
snapshot.reason = "test";
snapshot.debounceKey = "runtime-state";
snapshot.debounceAllowed = true;
return snapshot;
}
void TestDebouncedRequestsCoalesceToNewestSnapshot()
{
std::mutex mutex;
std::vector<PersistenceSnapshot> writtenSnapshots;
PersistenceWriter writer(
std::chrono::milliseconds(1000),
[&](const PersistenceSnapshot& snapshot, std::string&) {
std::lock_guard<std::mutex> lock(mutex);
writtenSnapshots.push_back(snapshot);
return true;
});
std::string error;
Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("first"), error), "first debounced snapshot enqueues");
Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("second"), error), "second debounced snapshot enqueues");
PersistenceWriterMetrics metrics = writer.GetMetrics();
Expect(metrics.pendingCount == 1, "debounced snapshots share one pending slot");
Expect(metrics.enqueuedCount == 1, "first debounced snapshot counts as enqueue");
Expect(metrics.coalescedCount == 1, "second debounced snapshot counts as coalesced");
writer.StopAndFlush();
{
std::lock_guard<std::mutex> lock(mutex);
Expect(writtenSnapshots.size() == 1, "flush writes one coalesced snapshot");
Expect(!writtenSnapshots.empty() && writtenSnapshots[0].contents == "second", "coalesced writer keeps newest snapshot");
}
metrics = writer.GetMetrics();
Expect(metrics.pendingCount == 0, "flush drains pending debounced snapshot");
Expect(metrics.writtenCount == 1, "flush records one successful write");
}
void TestImmediateRequestsAreNotCoalesced()
{
std::mutex mutex;
std::vector<PersistenceSnapshot> writtenSnapshots;
PersistenceWriter writer(
std::chrono::milliseconds(1000),
[&](const PersistenceSnapshot& snapshot, std::string&) {
std::lock_guard<std::mutex> lock(mutex);
writtenSnapshots.push_back(snapshot);
return true;
});
PersistenceSnapshot first = MakeRuntimeSnapshot("first");
first.debounceAllowed = false;
PersistenceSnapshot second = MakeRuntimeSnapshot("second");
second.debounceAllowed = false;
std::string error;
Expect(writer.EnqueueSnapshot(first, error), "first immediate snapshot enqueues");
Expect(writer.EnqueueSnapshot(second, error), "second immediate snapshot enqueues");
writer.StopAndFlush();
{
std::lock_guard<std::mutex> lock(mutex);
Expect(writtenSnapshots.size() == 2, "immediate snapshots are written independently");
Expect(writtenSnapshots.size() == 2 && writtenSnapshots[0].contents == "first" && writtenSnapshots[1].contents == "second",
"immediate snapshots preserve order");
}
}
void TestWriteFailureReportsStructuredResult()
{
std::vector<PersistenceWriteResult> results;
PersistenceWriter writer(
std::chrono::milliseconds(1),
[](const PersistenceSnapshot&, std::string& error) {
error = "simulated failure";
return false;
});
writer.SetResultCallback([&results](const PersistenceWriteResult& result) {
results.push_back(result);
});
PersistenceSnapshot snapshot = MakeRuntimeSnapshot("payload");
snapshot.debounceAllowed = false;
snapshot.reason = "failure-test";
std::string error;
Expect(writer.EnqueueSnapshot(snapshot, error), "failing snapshot still enqueues");
writer.StopAndFlush();
Expect(results.size() == 1, "writer reports one failure result");
Expect(!results.empty() && !results[0].succeeded, "writer result records failure");
Expect(!results.empty() && results[0].reason == "failure-test", "writer result preserves reason");
Expect(!results.empty() && results[0].errorMessage == "simulated failure", "writer result preserves error message");
Expect(!results.empty() && !results[0].newerRequestPending, "writer result reports no newer pending request");
Expect(writer.GetMetrics().failedCount == 1, "writer metrics count failed writes");
}
void TestShutdownFlushDrainsPendingSnapshotAndRejectsNewRequests()
{
std::mutex mutex;
std::vector<PersistenceSnapshot> writtenSnapshots;
PersistenceWriter writer(
std::chrono::milliseconds(1000),
[&](const PersistenceSnapshot& snapshot, std::string&) {
std::lock_guard<std::mutex> lock(mutex);
writtenSnapshots.push_back(snapshot);
return true;
});
std::string error;
Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("pending"), error), "pending snapshot enqueues before shutdown");
Expect(writer.StopAndFlush(std::chrono::seconds(1), error), "bounded shutdown flush completes");
{
std::lock_guard<std::mutex> lock(mutex);
Expect(writtenSnapshots.size() == 1, "shutdown flush writes pending debounced snapshot");
Expect(!writtenSnapshots.empty() && writtenSnapshots[0].contents == "pending", "shutdown flush preserves pending snapshot contents");
}
Expect(!writer.EnqueueSnapshot(MakeRuntimeSnapshot("late"), error), "writer rejects requests after shutdown flush");
}
void TestShutdownFlushTimeoutCanBeRetried()
{
std::mutex mutex;
std::condition_variable condition;
bool sinkStarted = false;
bool releaseSink = false;
PersistenceWriter writer(
std::chrono::milliseconds(1),
[&](const PersistenceSnapshot&, std::string&) {
std::unique_lock<std::mutex> lock(mutex);
sinkStarted = true;
condition.notify_all();
condition.wait(lock, [&]() { return releaseSink; });
return true;
});
PersistenceSnapshot snapshot = MakeRuntimeSnapshot("slow");
snapshot.debounceAllowed = false;
std::string error;
Expect(writer.EnqueueSnapshot(snapshot, error), "slow snapshot enqueues");
{
std::unique_lock<std::mutex> lock(mutex);
Expect(condition.wait_for(lock, std::chrono::seconds(1), [&]() { return sinkStarted; }),
"slow sink starts before timeout test");
}
Expect(!writer.StopAndFlush(std::chrono::milliseconds(10), error), "bounded shutdown flush reports timeout");
Expect(error.find("Timed out") != std::string::npos, "shutdown timeout returns a useful error");
{
std::lock_guard<std::mutex> lock(mutex);
releaseSink = true;
}
condition.notify_all();
error.clear();
Expect(writer.StopAndFlush(std::chrono::seconds(1), error), "shutdown flush can complete after earlier timeout");
}
}
int main()
{
TestDebouncedRequestsCoalesceToNewestSnapshot();
TestImmediateRequestsAreNotCoalesced();
TestWriteFailureReportsStructuredResult();
TestShutdownFlushDrainsPendingSnapshotAndRejectsNewRequests();
TestShutdownFlushTimeoutCanBeRetried();
if (gFailures != 0)
{
std::cerr << gFailures << " persistence writer test(s) failed.\n";
return 1;
}
std::cout << "Persistence writer tests passed.\n";
return 0;
}

View File

@@ -242,6 +242,12 @@ void TestRuntimeCoordinatorPersistenceEvents()
Expect(snapshot.reason == "unit-test", "runtime-state persistence snapshot preserves request reason"); Expect(snapshot.reason == "unit-test", "runtime-state persistence snapshot preserves request reason");
Expect(snapshot.targetPath.filename().string() == "runtime_state.json", "runtime-state persistence snapshot targets the runtime state file"); Expect(snapshot.targetPath.filename().string() == "runtime_state.json", "runtime-state persistence snapshot targets the runtime state file");
Expect(snapshot.contents.find("\"layers\"") != std::string::npos, "runtime-state persistence snapshot contains serialized layer state"); Expect(snapshot.contents.find("\"layers\"") != std::string::npos, "runtime-state persistence snapshot contains serialized layer state");
Expect(store.RequestPersistence(PersistenceRequest::RuntimeStateRequest("unit-test-request"), error),
"runtime store accepts runtime-state persistence requests");
PersistenceRequest unsupportedRequest;
unsupportedRequest.targetKind = PersistenceTargetKind::StackPreset;
unsupportedRequest.reason = "unsupported-unit-test";
Expect(!store.RequestPersistence(unsupportedRequest, error), "runtime store rejects unsupported persistence request targets");
RuntimeEventDispatcher dispatcher(64); RuntimeEventDispatcher dispatcher(64);
std::vector<RuntimeEvent> seenEvents; std::vector<RuntimeEvent> seenEvents;