step 6
This commit is contained in:
@@ -58,6 +58,12 @@ OpenGLComposite::~OpenGLComposite()
|
|||||||
mShaderBuildQueue->Stop();
|
mShaderBuildQueue->Stop();
|
||||||
if (mVideoBackend)
|
if (mVideoBackend)
|
||||||
mVideoBackend->ReleaseResources();
|
mVideoBackend->ReleaseResources();
|
||||||
|
if (mRuntimeStore)
|
||||||
|
{
|
||||||
|
std::string persistenceError;
|
||||||
|
if (!mRuntimeStore->FlushPersistenceForShutdown(std::chrono::seconds(2), persistenceError))
|
||||||
|
OutputDebugStringA((std::string("Persistence shutdown flush failed: ") + persistenceError + "\n").c_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool OpenGLComposite::InitDeckLink()
|
bool OpenGLComposite::InitDeckLink()
|
||||||
@@ -277,6 +283,13 @@ bool OpenGLComposite::Stop()
|
|||||||
if (mRenderEngine)
|
if (mRenderEngine)
|
||||||
mRenderEngine->StopRenderThread();
|
mRenderEngine->StopRenderThread();
|
||||||
|
|
||||||
|
if (mRuntimeStore)
|
||||||
|
{
|
||||||
|
std::string persistenceError;
|
||||||
|
if (!mRuntimeStore->FlushPersistenceForShutdown(std::chrono::seconds(2), persistenceError))
|
||||||
|
OutputDebugStringA((std::string("Persistence shutdown flush failed: ") + persistenceError + "\n").c_str());
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -72,6 +72,12 @@ bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std
|
|||||||
}
|
}
|
||||||
|
|
||||||
void PersistenceWriter::StopAndFlush()
|
void PersistenceWriter::StopAndFlush()
|
||||||
|
{
|
||||||
|
std::string error;
|
||||||
|
StopAndFlush((std::chrono::milliseconds::max)(), error);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PersistenceWriter::StopAndFlush(std::chrono::milliseconds timeout, std::string& error)
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(mMutex);
|
std::lock_guard<std::mutex> lock(mMutex);
|
||||||
@@ -83,8 +89,28 @@ void PersistenceWriter::StopAndFlush()
|
|||||||
}
|
}
|
||||||
mCondition.notify_all();
|
mCondition.notify_all();
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lock(mMutex);
|
||||||
|
if (mWorkerRunning)
|
||||||
|
{
|
||||||
|
if (timeout == (std::chrono::milliseconds::max)())
|
||||||
|
{
|
||||||
|
mCondition.wait(lock, [this]() { return !mWorkerRunning; });
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
const auto deadline = std::chrono::steady_clock::now() + timeout;
|
||||||
|
if (!mCondition.wait_until(lock, deadline, [this]() { return !mWorkerRunning; }))
|
||||||
|
{
|
||||||
|
error = "Timed out while flushing persistence writer.";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
if (mWorker.joinable())
|
if (mWorker.joinable())
|
||||||
mWorker.join();
|
mWorker.join();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
PersistenceWriterMetrics PersistenceWriter::GetMetrics() const
|
PersistenceWriterMetrics PersistenceWriter::GetMetrics() const
|
||||||
@@ -221,6 +247,7 @@ void PersistenceWriter::WorkerMain()
|
|||||||
if (mStopping)
|
if (mStopping)
|
||||||
{
|
{
|
||||||
mWorkerRunning = false;
|
mWorkerRunning = false;
|
||||||
|
mCondition.notify_all();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ public:
|
|||||||
void SetResultCallback(ResultCallback callback);
|
void SetResultCallback(ResultCallback callback);
|
||||||
bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
|
bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
|
||||||
bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
|
bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
|
||||||
|
bool StopAndFlush(std::chrono::milliseconds timeout, std::string& error);
|
||||||
void StopAndFlush();
|
void StopAndFlush();
|
||||||
PersistenceWriterMetrics GetMetrics() const;
|
PersistenceWriterMetrics GetMetrics() const;
|
||||||
|
|
||||||
|
|||||||
@@ -156,6 +156,21 @@ bool RuntimeStore::RequestPersistence(const PersistenceRequest& request, std::st
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RuntimeStore::FlushPersistenceForShutdown(std::chrono::milliseconds timeout, std::string& error)
|
||||||
|
{
|
||||||
|
if (mPersistenceWriter.StopAndFlush(timeout, error))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
mHealthTelemetry.RecordPersistenceWriteResult(
|
||||||
|
false,
|
||||||
|
PersistenceTargetKindName(PersistenceTargetKind::RuntimeState),
|
||||||
|
std::string(),
|
||||||
|
"shutdown-flush",
|
||||||
|
error,
|
||||||
|
true);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const
|
PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(const PersistenceRequest& request) const
|
||||||
{
|
{
|
||||||
PersistenceSnapshot snapshot;
|
PersistenceSnapshot snapshot;
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ public:
|
|||||||
std::string BuildPersistentStateJson() const;
|
std::string BuildPersistentStateJson() const;
|
||||||
PersistenceSnapshot BuildRuntimeStatePersistenceSnapshot(const PersistenceRequest& request) const;
|
PersistenceSnapshot BuildRuntimeStatePersistenceSnapshot(const PersistenceRequest& request) const;
|
||||||
bool RequestPersistence(const PersistenceRequest& request, std::string& error);
|
bool RequestPersistence(const PersistenceRequest& request, std::string& error);
|
||||||
|
bool FlushPersistenceForShutdown(std::chrono::milliseconds timeout, std::string& error);
|
||||||
bool PollStoredFileChanges(bool& registryChanged, bool& reloadRequested, std::string& error);
|
bool PollStoredFileChanges(bool& registryChanged, bool& reloadRequested, std::string& error);
|
||||||
|
|
||||||
bool CreateStoredLayer(const std::string& shaderId, std::string& error);
|
bool CreateStoredLayer(const std::string& shaderId, std::string& error);
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ Phases 1-5 separate durable state, coordination policy, render-facing snapshots,
|
|||||||
|
|
||||||
## Status
|
## Status
|
||||||
|
|
||||||
- Phase 6 design package: proposed.
|
- Phase 6 design package: complete.
|
||||||
- Phase 6 implementation: Step 5 complete.
|
- Phase 6 implementation: Step 6 complete.
|
||||||
- Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence request execution; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. Runtime-state persistence is now requested through the coordinator/event path and executed by the background writer.
|
- Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence request execution; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. Runtime-state persistence is now requested through the coordinator/event path and executed by the background writer.
|
||||||
|
|
||||||
Current persistence footholds:
|
Current persistence footholds:
|
||||||
@@ -270,9 +270,16 @@ Make app shutdown persistence behavior deterministic.
|
|||||||
|
|
||||||
Initial target:
|
Initial target:
|
||||||
|
|
||||||
- stop accepting new requests
|
- [x] stop accepting new requests
|
||||||
- flush latest pending snapshot with bounded wait
|
- [x] flush latest pending snapshot with bounded wait
|
||||||
- report failure if flush fails
|
- [x] report failure if flush fails
|
||||||
|
|
||||||
|
Current implementation:
|
||||||
|
|
||||||
|
- `PersistenceWriter::StopAndFlush(timeout, error)` stops accepting new snapshots, forces debounced snapshots ready, drains pending work, and reports timeout/failure to the caller.
|
||||||
|
- `RuntimeStore::FlushPersistenceForShutdown(...)` provides the runtime-level shutdown API and records flush failures in `HealthTelemetry`.
|
||||||
|
- `OpenGLComposite::Stop()` and the destructor explicitly flush persistence after control services/backend/render-thread shutdown.
|
||||||
|
- `PersistenceWriterTests` cover shutdown draining, request rejection after shutdown, and timeout/retry behavior without rendering or DeckLink.
|
||||||
|
|
||||||
## Testing Strategy
|
## Testing Strategy
|
||||||
|
|
||||||
@@ -320,7 +327,7 @@ Phase 6 can be considered complete once the project can say:
|
|||||||
- [x] writes use temp-file/replace or equivalent atomic policy
|
- [x] writes use temp-file/replace or equivalent atomic policy
|
||||||
- [x] persistence failures are reported through structured health/events
|
- [x] persistence failures are reported through structured health/events
|
||||||
- [x] transient/live-only mutations do not request persistence
|
- [x] transient/live-only mutations do not request persistence
|
||||||
- [ ] shutdown flush behavior is explicit and tested
|
- [x] shutdown flush behavior is explicit and tested
|
||||||
- [x] `RuntimeStore` remains durable-state/serialization owner, not worker policy owner
|
- [x] `RuntimeStore` remains durable-state/serialization owner, not worker policy owner
|
||||||
- [x] persistence behavior has focused non-render tests
|
- [x] persistence behavior has focused non-render tests
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
#include "PersistenceWriter.h"
|
#include "PersistenceWriter.h"
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <chrono>
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
@@ -123,6 +125,70 @@ void TestWriteFailureReportsStructuredResult()
|
|||||||
Expect(!results.empty() && !results[0].newerRequestPending, "writer result reports no newer pending request");
|
Expect(!results.empty() && !results[0].newerRequestPending, "writer result reports no newer pending request");
|
||||||
Expect(writer.GetMetrics().failedCount == 1, "writer metrics count failed writes");
|
Expect(writer.GetMetrics().failedCount == 1, "writer metrics count failed writes");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TestShutdownFlushDrainsPendingSnapshotAndRejectsNewRequests()
|
||||||
|
{
|
||||||
|
std::mutex mutex;
|
||||||
|
std::vector<PersistenceSnapshot> writtenSnapshots;
|
||||||
|
PersistenceWriter writer(
|
||||||
|
std::chrono::milliseconds(1000),
|
||||||
|
[&](const PersistenceSnapshot& snapshot, std::string&) {
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
writtenSnapshots.push_back(snapshot);
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
std::string error;
|
||||||
|
Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("pending"), error), "pending snapshot enqueues before shutdown");
|
||||||
|
Expect(writer.StopAndFlush(std::chrono::seconds(1), error), "bounded shutdown flush completes");
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
Expect(writtenSnapshots.size() == 1, "shutdown flush writes pending debounced snapshot");
|
||||||
|
Expect(!writtenSnapshots.empty() && writtenSnapshots[0].contents == "pending", "shutdown flush preserves pending snapshot contents");
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(!writer.EnqueueSnapshot(MakeRuntimeSnapshot("late"), error), "writer rejects requests after shutdown flush");
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestShutdownFlushTimeoutCanBeRetried()
|
||||||
|
{
|
||||||
|
std::mutex mutex;
|
||||||
|
std::condition_variable condition;
|
||||||
|
bool sinkStarted = false;
|
||||||
|
bool releaseSink = false;
|
||||||
|
PersistenceWriter writer(
|
||||||
|
std::chrono::milliseconds(1),
|
||||||
|
[&](const PersistenceSnapshot&, std::string&) {
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
sinkStarted = true;
|
||||||
|
condition.notify_all();
|
||||||
|
condition.wait(lock, [&]() { return releaseSink; });
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
PersistenceSnapshot snapshot = MakeRuntimeSnapshot("slow");
|
||||||
|
snapshot.debounceAllowed = false;
|
||||||
|
|
||||||
|
std::string error;
|
||||||
|
Expect(writer.EnqueueSnapshot(snapshot, error), "slow snapshot enqueues");
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
Expect(condition.wait_for(lock, std::chrono::seconds(1), [&]() { return sinkStarted; }),
|
||||||
|
"slow sink starts before timeout test");
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(!writer.StopAndFlush(std::chrono::milliseconds(10), error), "bounded shutdown flush reports timeout");
|
||||||
|
Expect(error.find("Timed out") != std::string::npos, "shutdown timeout returns a useful error");
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
releaseSink = true;
|
||||||
|
}
|
||||||
|
condition.notify_all();
|
||||||
|
error.clear();
|
||||||
|
Expect(writer.StopAndFlush(std::chrono::seconds(1), error), "shutdown flush can complete after earlier timeout");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int main()
|
int main()
|
||||||
@@ -130,6 +196,8 @@ int main()
|
|||||||
TestDebouncedRequestsCoalesceToNewestSnapshot();
|
TestDebouncedRequestsCoalesceToNewestSnapshot();
|
||||||
TestImmediateRequestsAreNotCoalesced();
|
TestImmediateRequestsAreNotCoalesced();
|
||||||
TestWriteFailureReportsStructuredResult();
|
TestWriteFailureReportsStructuredResult();
|
||||||
|
TestShutdownFlushDrainsPendingSnapshotAndRejectsNewRequests();
|
||||||
|
TestShutdownFlushTimeoutCanBeRetried();
|
||||||
|
|
||||||
if (gFailures != 0)
|
if (gFailures != 0)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user