diff --git a/apps/LoopThroughWithOpenGLCompositing/gl/composite/OpenGLComposite.cpp b/apps/LoopThroughWithOpenGLCompositing/gl/composite/OpenGLComposite.cpp index c328467..6efbf99 100644 --- a/apps/LoopThroughWithOpenGLCompositing/gl/composite/OpenGLComposite.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/gl/composite/OpenGLComposite.cpp @@ -58,6 +58,12 @@ OpenGLComposite::~OpenGLComposite() mShaderBuildQueue->Stop(); if (mVideoBackend) mVideoBackend->ReleaseResources(); + if (mRuntimeStore) + { + std::string persistenceError; + if (!mRuntimeStore->FlushPersistenceForShutdown(std::chrono::seconds(2), persistenceError)) + OutputDebugStringA((std::string("Persistence shutdown flush failed: ") + persistenceError + "\n").c_str()); + } } bool OpenGLComposite::InitDeckLink() @@ -277,6 +283,13 @@ bool OpenGLComposite::Stop() if (mRenderEngine) mRenderEngine->StopRenderThread(); + if (mRuntimeStore) + { + std::string persistenceError; + if (!mRuntimeStore->FlushPersistenceForShutdown(std::chrono::seconds(2), persistenceError)) + OutputDebugStringA((std::string("Persistence shutdown flush failed: ") + persistenceError + "\n").c_str()); + } + return true; } diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp index e4b47ae..67cfd06 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp @@ -72,6 +72,12 @@ bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std } void PersistenceWriter::StopAndFlush() +{ + std::string error; + StopAndFlush((std::chrono::milliseconds::max)(), error); +} + +bool PersistenceWriter::StopAndFlush(std::chrono::milliseconds timeout, std::string& error) { { std::lock_guard lock(mMutex); @@ -83,8 +89,28 @@ void PersistenceWriter::StopAndFlush() } mCondition.notify_all(); + std::unique_lock lock(mMutex); + if (mWorkerRunning) + { + if (timeout == (std::chrono::milliseconds::max)()) + { + mCondition.wait(lock, [this]() { return !mWorkerRunning; }); + } + else + { + const auto deadline = std::chrono::steady_clock::now() + timeout; + if (!mCondition.wait_until(lock, deadline, [this]() { return !mWorkerRunning; })) + { + error = "Timed out while flushing persistence writer."; + return false; + } + } + } + lock.unlock(); + if (mWorker.joinable()) mWorker.join(); + return true; } PersistenceWriterMetrics PersistenceWriter::GetMetrics() const @@ -221,6 +247,7 @@ void PersistenceWriter::WorkerMain() if (mStopping) { mWorkerRunning = false; + mCondition.notify_all(); return; } diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h index a24f56b..8d65203 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h @@ -45,6 +45,7 @@ public: void SetResultCallback(ResultCallback callback); bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error); bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error); + bool StopAndFlush(std::chrono::milliseconds timeout, std::string& error); void StopAndFlush(); PersistenceWriterMetrics GetMetrics() const; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp index 39abd35..96a4026 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp @@ -156,6 +156,21 @@ bool RuntimeStore::RequestPersistence(const PersistenceRequest& request, std::st return false; } +bool RuntimeStore::FlushPersistenceForShutdown(std::chrono::milliseconds timeout, std::string& error) +{ + if (mPersistenceWriter.StopAndFlush(timeout, error)) + return true; + + mHealthTelemetry.RecordPersistenceWriteResult( + false, + PersistenceTargetKindName(PersistenceTargetKind::RuntimeState), + std::string(), + "shutdown-flush", + error, + true); + return false; +} + PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const { PersistenceSnapshot snapshot; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h index ffc0351..087b10f 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.h @@ -33,6 +33,7 @@ public: std::string BuildPersistentStateJson() const; PersistenceSnapshot BuildRuntimeStatePersistenceSnapshot(const PersistenceRequest& request) const; bool RequestPersistence(const PersistenceRequest& request, std::string& error); + bool FlushPersistenceForShutdown(std::chrono::milliseconds timeout, std::string& error); bool PollStoredFileChanges(bool& registryChanged, bool& reloadRequested, std::string& error); bool CreateStoredLayer(const std::string& shaderId, std::string& error); diff --git a/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md b/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md index f4c926d..51c8def 100644 --- a/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md +++ b/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md @@ -6,8 +6,8 @@ Phases 1-5 separate durable state, coordination policy, render-facing snapshots, ## Status -- Phase 6 design package: proposed. -- Phase 6 implementation: Step 5 complete. +- Phase 6 design package: complete. +- Phase 6 implementation: Step 6 complete. - Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence request execution; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. Runtime-state persistence is now requested through the coordinator/event path and executed by the background writer. Current persistence footholds: @@ -270,9 +270,16 @@ Make app shutdown persistence behavior deterministic. Initial target: -- stop accepting new requests -- flush latest pending snapshot with bounded wait -- report failure if flush fails +- [x] stop accepting new requests +- [x] flush latest pending snapshot with bounded wait +- [x] report failure if flush fails + +Current implementation: + +- `PersistenceWriter::StopAndFlush(timeout, error)` stops accepting new snapshots, forces debounced snapshots ready, drains pending work, and reports timeout/failure to the caller. +- `RuntimeStore::FlushPersistenceForShutdown(...)` provides the runtime-level shutdown API and records flush failures in `HealthTelemetry`. +- `OpenGLComposite::Stop()` and the destructor explicitly flush persistence after control services/backend/render-thread shutdown. +- `PersistenceWriterTests` cover shutdown draining, request rejection after shutdown, and timeout/retry behavior without rendering or DeckLink. ## Testing Strategy @@ -320,7 +327,7 @@ Phase 6 can be considered complete once the project can say: - [x] writes use temp-file/replace or equivalent atomic policy - [x] persistence failures are reported through structured health/events - [x] transient/live-only mutations do not request persistence -- [ ] shutdown flush behavior is explicit and tested +- [x] shutdown flush behavior is explicit and tested - [x] `RuntimeStore` remains durable-state/serialization owner, not worker policy owner - [x] persistence behavior has focused non-render tests diff --git a/tests/PersistenceWriterTests.cpp b/tests/PersistenceWriterTests.cpp index d9d4a62..0148c00 100644 --- a/tests/PersistenceWriterTests.cpp +++ b/tests/PersistenceWriterTests.cpp @@ -1,5 +1,7 @@ #include "PersistenceWriter.h" +#include +#include #include #include #include @@ -123,6 +125,70 @@ void TestWriteFailureReportsStructuredResult() Expect(!results.empty() && !results[0].newerRequestPending, "writer result reports no newer pending request"); Expect(writer.GetMetrics().failedCount == 1, "writer metrics count failed writes"); } + +void TestShutdownFlushDrainsPendingSnapshotAndRejectsNewRequests() +{ + std::mutex mutex; + std::vector writtenSnapshots; + PersistenceWriter writer( + std::chrono::milliseconds(1000), + [&](const PersistenceSnapshot& snapshot, std::string&) { + std::lock_guard lock(mutex); + writtenSnapshots.push_back(snapshot); + return true; + }); + + std::string error; + Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("pending"), error), "pending snapshot enqueues before shutdown"); + Expect(writer.StopAndFlush(std::chrono::seconds(1), error), "bounded shutdown flush completes"); + + { + std::lock_guard lock(mutex); + Expect(writtenSnapshots.size() == 1, "shutdown flush writes pending debounced snapshot"); + Expect(!writtenSnapshots.empty() && writtenSnapshots[0].contents == "pending", "shutdown flush preserves pending snapshot contents"); + } + + Expect(!writer.EnqueueSnapshot(MakeRuntimeSnapshot("late"), error), "writer rejects requests after shutdown flush"); +} + +void TestShutdownFlushTimeoutCanBeRetried() +{ + std::mutex mutex; + std::condition_variable condition; + bool sinkStarted = false; + bool releaseSink = false; + PersistenceWriter writer( + std::chrono::milliseconds(1), + [&](const PersistenceSnapshot&, std::string&) { + std::unique_lock lock(mutex); + sinkStarted = true; + condition.notify_all(); + condition.wait(lock, [&]() { return releaseSink; }); + return true; + }); + + PersistenceSnapshot snapshot = MakeRuntimeSnapshot("slow"); + snapshot.debounceAllowed = false; + + std::string error; + Expect(writer.EnqueueSnapshot(snapshot, error), "slow snapshot enqueues"); + { + std::unique_lock lock(mutex); + Expect(condition.wait_for(lock, std::chrono::seconds(1), [&]() { return sinkStarted; }), + "slow sink starts before timeout test"); + } + + Expect(!writer.StopAndFlush(std::chrono::milliseconds(10), error), "bounded shutdown flush reports timeout"); + Expect(error.find("Timed out") != std::string::npos, "shutdown timeout returns a useful error"); + + { + std::lock_guard lock(mutex); + releaseSink = true; + } + condition.notify_all(); + error.clear(); + Expect(writer.StopAndFlush(std::chrono::seconds(1), error), "shutdown flush can complete after earlier timeout"); +} } int main() @@ -130,6 +196,8 @@ int main() TestDebouncedRequestsCoalesceToNewestSnapshot(); TestImmediateRequestsAreNotCoalesced(); TestWriteFailureReportsStructuredResult(); + TestShutdownFlushDrainsPendingSnapshotAndRejectsNewRequests(); + TestShutdownFlushTimeoutCanBeRetried(); if (gFailures != 0) {