From 205c90e52ef7e8c13b83734ff7c86087a0cf657a Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Mon, 11 May 2026 19:53:31 +1000 Subject: [PATCH] Step 3 --- CMakeLists.txt | 17 ++ .../runtime/persistence/PersistenceRequest.h | 3 + .../runtime/persistence/PersistenceWriter.cpp | 173 ++++++++++++++++++ .../runtime/persistence/PersistenceWriter.h | 55 ++++++ .../runtime/store/RuntimeStore.cpp | 8 +- .../runtime/store/RuntimeStore.h | 2 +- docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md | 15 +- tests/PersistenceWriterTests.cpp | 112 ++++++++++++ 8 files changed, 379 insertions(+), 6 deletions(-) create mode 100644 tests/PersistenceWriterTests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index aa688da..9be76ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -345,6 +345,23 @@ endif() add_test(NAME RuntimeStateLayerModelTests COMMAND RuntimeStateLayerModelTests) +add_executable(PersistenceWriterTests + "${APP_DIR}/runtime/persistence/PersistenceWriter.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/tests/PersistenceWriterTests.cpp" +) + +target_include_directories(PersistenceWriterTests PRIVATE + "${APP_DIR}" + "${APP_DIR}/runtime" + "${APP_DIR}/runtime/persistence" +) + +if(MSVC) + target_compile_options(PersistenceWriterTests PRIVATE /W3) +endif() + +add_test(NAME PersistenceWriterTests COMMAND PersistenceWriterTests) + add_executable(RuntimeSubsystemTests "${APP_DIR}/runtime/coordination/RuntimeCoordinator.cpp" "${APP_DIR}/runtime/live/CommittedLiveState.cpp" diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceRequest.h b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceRequest.h index 91838e4..455803a 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceRequest.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceRequest.h @@ -37,5 +37,8 @@ struct PersistenceSnapshot std::filesystem::path targetPath; std::string contents; std::string reason; + std::string debounceKey; + bool debounceAllowed = false; + bool flushRequested = false; uint64_t generation = 0; }; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp index 961d5f0..b1ca9cb 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp @@ -2,10 +2,96 @@ #include +#include #include #include +#include + +PersistenceWriter::PersistenceWriter(std::chrono::milliseconds debounceDelay, SnapshotSink sink) : + mDebounceDelay(debounceDelay), + mSink(std::move(sink)) +{ +} + +PersistenceWriter::~PersistenceWriter() +{ + StopAndFlush(); +} bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const +{ + if (!ValidateSnapshot(snapshot, error)) + return false; + + return WriteSnapshotThroughSink(snapshot, error); +} + +bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error) +{ + if (!ValidateSnapshot(snapshot, error)) + return false; + + std::lock_guard lock(mMutex); + if (!mAcceptingRequests) + { + error = "Persistence writer is stopping."; + return false; + } + + StartWorkerLocked(); + + const auto now = std::chrono::steady_clock::now(); + if (snapshot.debounceAllowed) + { + const std::string debounceKey = snapshot.debounceKey.empty() ? snapshot.targetPath.string() : snapshot.debounceKey; + PendingSnapshot& pending = mDebouncedSnapshots[debounceKey]; + if (!pending.snapshot.targetPath.empty()) + ++mCoalescedCount; + else + ++mEnqueuedCount; + + pending.snapshot = snapshot; + pending.readyAt = snapshot.flushRequested ? now : now + mDebounceDelay; + } + else + { + mImmediateSnapshots.push_back(snapshot); + ++mEnqueuedCount; + } + + mCondition.notify_one(); + return true; +} + +void PersistenceWriter::StopAndFlush() +{ + { + std::lock_guard lock(mMutex); + mAcceptingRequests = false; + mStopping = true; + const auto now = std::chrono::steady_clock::now(); + for (auto& entry : mDebouncedSnapshots) + entry.second.readyAt = now; + } + mCondition.notify_all(); + + if (mWorker.joinable()) + mWorker.join(); +} + +PersistenceWriterMetrics PersistenceWriter::GetMetrics() const +{ + std::lock_guard lock(mMutex); + PersistenceWriterMetrics metrics; + metrics.pendingCount = PendingCountLocked(); + metrics.enqueuedCount = mEnqueuedCount; + metrics.coalescedCount = mCoalescedCount; + metrics.writtenCount = mWrittenCount; + metrics.failedCount = mFailedCount; + return metrics; +} + +bool PersistenceWriter::ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const { if (snapshot.targetPath.empty()) { @@ -13,6 +99,14 @@ bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std:: return false; } + return true; +} + +bool PersistenceWriter::WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const +{ + if (mSink) + return mSink(snapshot, error); + std::error_code fsError; std::filesystem::create_directories(snapshot.targetPath.parent_path(), fsError); @@ -42,3 +136,82 @@ bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std:: return true; } + +void PersistenceWriter::StartWorkerLocked() +{ + if (mWorkerRunning) + return; + + mWorkerRunning = true; + mWorker = std::thread([this]() { WorkerMain(); }); +} + +void PersistenceWriter::WorkerMain() +{ + for (;;) + { + PersistenceSnapshot snapshot; + { + std::unique_lock lock(mMutex); + for (;;) + { + if (!mImmediateSnapshots.empty()) + { + snapshot = std::move(mImmediateSnapshots.front()); + mImmediateSnapshots.pop_front(); + break; + } + + if (!mDebouncedSnapshots.empty()) + { + const auto now = std::chrono::steady_clock::now(); + auto readyIt = mDebouncedSnapshots.end(); + auto nextReadyAt = (std::chrono::steady_clock::time_point::max)(); + for (auto it = mDebouncedSnapshots.begin(); it != mDebouncedSnapshots.end(); ++it) + { + if (it->second.readyAt <= now) + { + readyIt = it; + break; + } + if (it->second.readyAt < nextReadyAt) + nextReadyAt = it->second.readyAt; + } + + if (readyIt != mDebouncedSnapshots.end()) + { + snapshot = std::move(readyIt->second.snapshot); + mDebouncedSnapshots.erase(readyIt); + break; + } + + mCondition.wait_until(lock, nextReadyAt); + continue; + } + + if (mStopping) + { + mWorkerRunning = false; + return; + } + + mCondition.wait(lock); + } + } + + std::string error; + const bool succeeded = WriteSnapshotThroughSink(snapshot, error); + { + std::lock_guard lock(mMutex); + if (succeeded) + ++mWrittenCount; + else + ++mFailedCount; + } + } +} + +std::size_t PersistenceWriter::PendingCountLocked() const +{ + return mImmediateSnapshots.size() + mDebouncedSnapshots.size(); +} diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h index 1e92713..8af6ae9 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h @@ -2,10 +2,65 @@ #include "PersistenceRequest.h" +#include +#include +#include +#include +#include +#include #include +#include +#include + +struct PersistenceWriterMetrics +{ + std::size_t pendingCount = 0; + uint64_t enqueuedCount = 0; + uint64_t coalescedCount = 0; + uint64_t writtenCount = 0; + uint64_t failedCount = 0; +}; class PersistenceWriter { public: + using SnapshotSink = std::function; + + explicit PersistenceWriter( + std::chrono::milliseconds debounceDelay = std::chrono::milliseconds(50), + SnapshotSink sink = SnapshotSink()); + ~PersistenceWriter(); + bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const; + bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error); + void StopAndFlush(); + PersistenceWriterMetrics GetMetrics() const; + +private: + struct PendingSnapshot + { + PersistenceSnapshot snapshot; + std::chrono::steady_clock::time_point readyAt; + }; + + bool ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const; + bool WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const; + void StartWorkerLocked(); + void WorkerMain(); + std::size_t PendingCountLocked() const; + + std::chrono::milliseconds mDebounceDelay; + SnapshotSink mSink; + mutable std::mutex mMutex; + std::condition_variable mCondition; + std::thread mWorker; + bool mWorkerRunning = false; + bool mStopping = false; + bool mAcceptingRequests = true; + std::unordered_map mDebouncedSnapshots; + std::deque mImmediateSnapshots; + uint64_t mEnqueuedCount = 0; + uint64_t mCoalescedCount = 0; + uint64_t mWrittenCount = 0; + uint64_t mFailedCount = 0; }; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp index ffb62a3..baf7378 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp @@ -111,6 +111,9 @@ PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(con snapshot.targetPath = mConfigStore.GetRuntimeStatePath(); snapshot.contents = SerializeJson(mCommittedLiveState.BuildPersistentStateValue(mShaderCatalog), true); snapshot.reason = request.reason; + snapshot.debounceKey = request.debounceKey; + snapshot.debounceAllowed = request.debounceAllowed; + snapshot.flushRequested = request.flushRequested; snapshot.generation = request.sequence; return snapshot; } @@ -478,7 +481,7 @@ bool RuntimeStore::LoadPersistentState(std::string& error) bool RuntimeStore::SavePersistentState(std::string& error) const { - return mPersistenceWriter.WriteSnapshot(BuildRuntimeStatePersistenceSnapshotLocked(PersistenceRequest::RuntimeStateRequest("SavePersistentState")), error); + return mPersistenceWriter.EnqueueSnapshot(BuildRuntimeStatePersistenceSnapshotLocked(PersistenceRequest::RuntimeStateRequest("SavePersistentState")), error); } PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std::string& presetName) const @@ -490,6 +493,9 @@ PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std: snapshot.targetPath = mConfigStore.GetPresetRoot() / (safeStem + ".json"); snapshot.contents = SerializeJson(mCommittedLiveState.BuildStackPresetValue(mShaderCatalog, presetName), true); snapshot.reason = "SaveStackPreset"; + snapshot.debounceKey = "stack-preset:" + safeStem; + snapshot.debounceAllowed = false; + snapshot.flushRequested = true; snapshot.generation = 0; return snapshot; } diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h index e95a0fb..477366e 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h @@ -93,7 +93,7 @@ private: void MarkParameterStateDirtyLocked(); RenderSnapshotBuilder mRenderSnapshotBuilder; - PersistenceWriter mPersistenceWriter; + mutable PersistenceWriter mPersistenceWriter; RuntimeConfigStore mConfigStore; ShaderPackageCatalog mShaderCatalog; CommittedLiveState mCommittedLiveState; diff --git a/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md b/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md index a021da4..c131139 100644 --- a/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md +++ b/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md @@ -7,7 +7,7 @@ Phases 1-5 separate durable state, coordination policy, render-facing snapshots, ## Status - Phase 6 design package: proposed. -- Phase 6 implementation: Step 2 complete. +- Phase 6 implementation: Step 3 complete. - Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence requests; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. The remaining issue is that actual disk writes are still synchronous store work rather than queued, debounced, atomic background writes. Current persistence footholds: @@ -218,9 +218,16 @@ Introduce a worker thread or queued task owner. Initial target: -- repeated runtime-state requests coalesce -- worker writes only latest pending snapshot -- tests cover coalescing without filesystem where possible +- [x] repeated runtime-state requests coalesce +- [x] worker writes only latest pending snapshot +- [x] tests cover coalescing without filesystem where possible + +Current implementation: + +- `PersistenceWriter::EnqueueSnapshot(...)` starts a worker lazily and debounces snapshots by `debounceKey`. +- Runtime-state saves enqueue debounced snapshots, so routine mutation paths no longer write the runtime-state file directly. +- Synchronous `PersistenceWriter::WriteSnapshot(...)` remains for stack preset saves and transitional direct writes. +- `PersistenceWriterTests` use an injected in-memory sink to verify coalescing and non-coalesced immediate requests without touching the filesystem. ### Step 4. Add Atomic Write And Failure Reporting diff --git a/tests/PersistenceWriterTests.cpp b/tests/PersistenceWriterTests.cpp new file mode 100644 index 0000000..d376a09 --- /dev/null +++ b/tests/PersistenceWriterTests.cpp @@ -0,0 +1,112 @@ +#include "PersistenceWriter.h" + +#include +#include +#include +#include +#include + +namespace +{ +int gFailures = 0; + +void Expect(bool condition, const char* message) +{ + if (condition) + return; + + std::cerr << "FAIL: " << message << "\n"; + ++gFailures; +} + +PersistenceSnapshot MakeRuntimeSnapshot(const std::string& contents) +{ + PersistenceSnapshot snapshot; + snapshot.targetKind = PersistenceTargetKind::RuntimeState; + snapshot.targetPath = std::filesystem::temp_directory_path() / "video-shader-persistence-writer-test.json"; + snapshot.contents = contents; + snapshot.reason = "test"; + snapshot.debounceKey = "runtime-state"; + snapshot.debounceAllowed = true; + return snapshot; +} + +void TestDebouncedRequestsCoalesceToNewestSnapshot() +{ + std::mutex mutex; + std::vector writtenSnapshots; + PersistenceWriter writer( + std::chrono::milliseconds(1000), + [&](const PersistenceSnapshot& snapshot, std::string&) { + std::lock_guard lock(mutex); + writtenSnapshots.push_back(snapshot); + return true; + }); + + std::string error; + Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("first"), error), "first debounced snapshot enqueues"); + Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("second"), error), "second debounced snapshot enqueues"); + + PersistenceWriterMetrics metrics = writer.GetMetrics(); + Expect(metrics.pendingCount == 1, "debounced snapshots share one pending slot"); + Expect(metrics.enqueuedCount == 1, "first debounced snapshot counts as enqueue"); + Expect(metrics.coalescedCount == 1, "second debounced snapshot counts as coalesced"); + + writer.StopAndFlush(); + + { + std::lock_guard lock(mutex); + Expect(writtenSnapshots.size() == 1, "flush writes one coalesced snapshot"); + Expect(!writtenSnapshots.empty() && writtenSnapshots[0].contents == "second", "coalesced writer keeps newest snapshot"); + } + + metrics = writer.GetMetrics(); + Expect(metrics.pendingCount == 0, "flush drains pending debounced snapshot"); + Expect(metrics.writtenCount == 1, "flush records one successful write"); +} + +void TestImmediateRequestsAreNotCoalesced() +{ + std::mutex mutex; + std::vector writtenSnapshots; + PersistenceWriter writer( + std::chrono::milliseconds(1000), + [&](const PersistenceSnapshot& snapshot, std::string&) { + std::lock_guard lock(mutex); + writtenSnapshots.push_back(snapshot); + return true; + }); + + PersistenceSnapshot first = MakeRuntimeSnapshot("first"); + first.debounceAllowed = false; + PersistenceSnapshot second = MakeRuntimeSnapshot("second"); + second.debounceAllowed = false; + + std::string error; + Expect(writer.EnqueueSnapshot(first, error), "first immediate snapshot enqueues"); + Expect(writer.EnqueueSnapshot(second, error), "second immediate snapshot enqueues"); + writer.StopAndFlush(); + + { + std::lock_guard lock(mutex); + Expect(writtenSnapshots.size() == 2, "immediate snapshots are written independently"); + Expect(writtenSnapshots.size() == 2 && writtenSnapshots[0].contents == "first" && writtenSnapshots[1].contents == "second", + "immediate snapshots preserve order"); + } +} +} + +int main() +{ + TestDebouncedRequestsCoalesceToNewestSnapshot(); + TestImmediateRequestsAreNotCoalesced(); + + if (gFailures != 0) + { + std::cerr << gFailures << " persistence writer test(s) failed.\n"; + return 1; + } + + std::cout << "Persistence writer tests passed.\n"; + return 0; +}