Step 3
This commit is contained in:
@@ -345,6 +345,23 @@ endif()
|
||||
|
||||
add_test(NAME RuntimeStateLayerModelTests COMMAND RuntimeStateLayerModelTests)
|
||||
|
||||
add_executable(PersistenceWriterTests
|
||||
"${APP_DIR}/runtime/persistence/PersistenceWriter.cpp"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/tests/PersistenceWriterTests.cpp"
|
||||
)
|
||||
|
||||
target_include_directories(PersistenceWriterTests PRIVATE
|
||||
"${APP_DIR}"
|
||||
"${APP_DIR}/runtime"
|
||||
"${APP_DIR}/runtime/persistence"
|
||||
)
|
||||
|
||||
if(MSVC)
|
||||
target_compile_options(PersistenceWriterTests PRIVATE /W3)
|
||||
endif()
|
||||
|
||||
add_test(NAME PersistenceWriterTests COMMAND PersistenceWriterTests)
|
||||
|
||||
add_executable(RuntimeSubsystemTests
|
||||
"${APP_DIR}/runtime/coordination/RuntimeCoordinator.cpp"
|
||||
"${APP_DIR}/runtime/live/CommittedLiveState.cpp"
|
||||
|
||||
@@ -37,5 +37,8 @@ struct PersistenceSnapshot
|
||||
std::filesystem::path targetPath;
|
||||
std::string contents;
|
||||
std::string reason;
|
||||
std::string debounceKey;
|
||||
bool debounceAllowed = false;
|
||||
bool flushRequested = false;
|
||||
uint64_t generation = 0;
|
||||
};
|
||||
|
||||
@@ -2,10 +2,96 @@
|
||||
|
||||
#include <windows.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
#include <utility>
|
||||
|
||||
PersistenceWriter::PersistenceWriter(std::chrono::milliseconds debounceDelay, SnapshotSink sink) :
|
||||
mDebounceDelay(debounceDelay),
|
||||
mSink(std::move(sink))
|
||||
{
|
||||
}
|
||||
|
||||
PersistenceWriter::~PersistenceWriter()
|
||||
{
|
||||
StopAndFlush();
|
||||
}
|
||||
|
||||
bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const
|
||||
{
|
||||
if (!ValidateSnapshot(snapshot, error))
|
||||
return false;
|
||||
|
||||
return WriteSnapshotThroughSink(snapshot, error);
|
||||
}
|
||||
|
||||
bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
|
||||
{
|
||||
if (!ValidateSnapshot(snapshot, error))
|
||||
return false;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
if (!mAcceptingRequests)
|
||||
{
|
||||
error = "Persistence writer is stopping.";
|
||||
return false;
|
||||
}
|
||||
|
||||
StartWorkerLocked();
|
||||
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
if (snapshot.debounceAllowed)
|
||||
{
|
||||
const std::string debounceKey = snapshot.debounceKey.empty() ? snapshot.targetPath.string() : snapshot.debounceKey;
|
||||
PendingSnapshot& pending = mDebouncedSnapshots[debounceKey];
|
||||
if (!pending.snapshot.targetPath.empty())
|
||||
++mCoalescedCount;
|
||||
else
|
||||
++mEnqueuedCount;
|
||||
|
||||
pending.snapshot = snapshot;
|
||||
pending.readyAt = snapshot.flushRequested ? now : now + mDebounceDelay;
|
||||
}
|
||||
else
|
||||
{
|
||||
mImmediateSnapshots.push_back(snapshot);
|
||||
++mEnqueuedCount;
|
||||
}
|
||||
|
||||
mCondition.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
void PersistenceWriter::StopAndFlush()
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
mAcceptingRequests = false;
|
||||
mStopping = true;
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
for (auto& entry : mDebouncedSnapshots)
|
||||
entry.second.readyAt = now;
|
||||
}
|
||||
mCondition.notify_all();
|
||||
|
||||
if (mWorker.joinable())
|
||||
mWorker.join();
|
||||
}
|
||||
|
||||
PersistenceWriterMetrics PersistenceWriter::GetMetrics() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
PersistenceWriterMetrics metrics;
|
||||
metrics.pendingCount = PendingCountLocked();
|
||||
metrics.enqueuedCount = mEnqueuedCount;
|
||||
metrics.coalescedCount = mCoalescedCount;
|
||||
metrics.writtenCount = mWrittenCount;
|
||||
metrics.failedCount = mFailedCount;
|
||||
return metrics;
|
||||
}
|
||||
|
||||
bool PersistenceWriter::ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const
|
||||
{
|
||||
if (snapshot.targetPath.empty())
|
||||
{
|
||||
@@ -13,6 +99,14 @@ bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PersistenceWriter::WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const
|
||||
{
|
||||
if (mSink)
|
||||
return mSink(snapshot, error);
|
||||
|
||||
std::error_code fsError;
|
||||
std::filesystem::create_directories(snapshot.targetPath.parent_path(), fsError);
|
||||
|
||||
@@ -42,3 +136,82 @@ bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void PersistenceWriter::StartWorkerLocked()
|
||||
{
|
||||
if (mWorkerRunning)
|
||||
return;
|
||||
|
||||
mWorkerRunning = true;
|
||||
mWorker = std::thread([this]() { WorkerMain(); });
|
||||
}
|
||||
|
||||
void PersistenceWriter::WorkerMain()
|
||||
{
|
||||
for (;;)
|
||||
{
|
||||
PersistenceSnapshot snapshot;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
for (;;)
|
||||
{
|
||||
if (!mImmediateSnapshots.empty())
|
||||
{
|
||||
snapshot = std::move(mImmediateSnapshots.front());
|
||||
mImmediateSnapshots.pop_front();
|
||||
break;
|
||||
}
|
||||
|
||||
if (!mDebouncedSnapshots.empty())
|
||||
{
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
auto readyIt = mDebouncedSnapshots.end();
|
||||
auto nextReadyAt = (std::chrono::steady_clock::time_point::max)();
|
||||
for (auto it = mDebouncedSnapshots.begin(); it != mDebouncedSnapshots.end(); ++it)
|
||||
{
|
||||
if (it->second.readyAt <= now)
|
||||
{
|
||||
readyIt = it;
|
||||
break;
|
||||
}
|
||||
if (it->second.readyAt < nextReadyAt)
|
||||
nextReadyAt = it->second.readyAt;
|
||||
}
|
||||
|
||||
if (readyIt != mDebouncedSnapshots.end())
|
||||
{
|
||||
snapshot = std::move(readyIt->second.snapshot);
|
||||
mDebouncedSnapshots.erase(readyIt);
|
||||
break;
|
||||
}
|
||||
|
||||
mCondition.wait_until(lock, nextReadyAt);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mStopping)
|
||||
{
|
||||
mWorkerRunning = false;
|
||||
return;
|
||||
}
|
||||
|
||||
mCondition.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
std::string error;
|
||||
const bool succeeded = WriteSnapshotThroughSink(snapshot, error);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
if (succeeded)
|
||||
++mWrittenCount;
|
||||
else
|
||||
++mFailedCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::size_t PersistenceWriter::PendingCountLocked() const
|
||||
{
|
||||
return mImmediateSnapshots.size() + mDebouncedSnapshots.size();
|
||||
}
|
||||
|
||||
@@ -2,10 +2,65 @@
|
||||
|
||||
#include "PersistenceRequest.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
|
||||
struct PersistenceWriterMetrics
|
||||
{
|
||||
std::size_t pendingCount = 0;
|
||||
uint64_t enqueuedCount = 0;
|
||||
uint64_t coalescedCount = 0;
|
||||
uint64_t writtenCount = 0;
|
||||
uint64_t failedCount = 0;
|
||||
};
|
||||
|
||||
class PersistenceWriter
|
||||
{
|
||||
public:
|
||||
using SnapshotSink = std::function<bool(const PersistenceSnapshot&, std::string&)>;
|
||||
|
||||
explicit PersistenceWriter(
|
||||
std::chrono::milliseconds debounceDelay = std::chrono::milliseconds(50),
|
||||
SnapshotSink sink = SnapshotSink());
|
||||
~PersistenceWriter();
|
||||
|
||||
bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const;
|
||||
bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error);
|
||||
void StopAndFlush();
|
||||
PersistenceWriterMetrics GetMetrics() const;
|
||||
|
||||
private:
|
||||
struct PendingSnapshot
|
||||
{
|
||||
PersistenceSnapshot snapshot;
|
||||
std::chrono::steady_clock::time_point readyAt;
|
||||
};
|
||||
|
||||
bool ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const;
|
||||
bool WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const;
|
||||
void StartWorkerLocked();
|
||||
void WorkerMain();
|
||||
std::size_t PendingCountLocked() const;
|
||||
|
||||
std::chrono::milliseconds mDebounceDelay;
|
||||
SnapshotSink mSink;
|
||||
mutable std::mutex mMutex;
|
||||
std::condition_variable mCondition;
|
||||
std::thread mWorker;
|
||||
bool mWorkerRunning = false;
|
||||
bool mStopping = false;
|
||||
bool mAcceptingRequests = true;
|
||||
std::unordered_map<std::string, PendingSnapshot> mDebouncedSnapshots;
|
||||
std::deque<PersistenceSnapshot> mImmediateSnapshots;
|
||||
uint64_t mEnqueuedCount = 0;
|
||||
uint64_t mCoalescedCount = 0;
|
||||
uint64_t mWrittenCount = 0;
|
||||
uint64_t mFailedCount = 0;
|
||||
};
|
||||
|
||||
@@ -111,6 +111,9 @@ PersistenceSnapshot RuntimeStore::BuildRuntimeStatePersistenceSnapshotLocked(con
|
||||
snapshot.targetPath = mConfigStore.GetRuntimeStatePath();
|
||||
snapshot.contents = SerializeJson(mCommittedLiveState.BuildPersistentStateValue(mShaderCatalog), true);
|
||||
snapshot.reason = request.reason;
|
||||
snapshot.debounceKey = request.debounceKey;
|
||||
snapshot.debounceAllowed = request.debounceAllowed;
|
||||
snapshot.flushRequested = request.flushRequested;
|
||||
snapshot.generation = request.sequence;
|
||||
return snapshot;
|
||||
}
|
||||
@@ -478,7 +481,7 @@ bool RuntimeStore::LoadPersistentState(std::string& error)
|
||||
|
||||
bool RuntimeStore::SavePersistentState(std::string& error) const
|
||||
{
|
||||
return mPersistenceWriter.WriteSnapshot(BuildRuntimeStatePersistenceSnapshotLocked(PersistenceRequest::RuntimeStateRequest("SavePersistentState")), error);
|
||||
return mPersistenceWriter.EnqueueSnapshot(BuildRuntimeStatePersistenceSnapshotLocked(PersistenceRequest::RuntimeStateRequest("SavePersistentState")), error);
|
||||
}
|
||||
|
||||
PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std::string& presetName) const
|
||||
@@ -490,6 +493,9 @@ PersistenceSnapshot RuntimeStore::BuildStackPresetPersistenceSnapshot(const std:
|
||||
snapshot.targetPath = mConfigStore.GetPresetRoot() / (safeStem + ".json");
|
||||
snapshot.contents = SerializeJson(mCommittedLiveState.BuildStackPresetValue(mShaderCatalog, presetName), true);
|
||||
snapshot.reason = "SaveStackPreset";
|
||||
snapshot.debounceKey = "stack-preset:" + safeStem;
|
||||
snapshot.debounceAllowed = false;
|
||||
snapshot.flushRequested = true;
|
||||
snapshot.generation = 0;
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ private:
|
||||
void MarkParameterStateDirtyLocked();
|
||||
|
||||
RenderSnapshotBuilder mRenderSnapshotBuilder;
|
||||
PersistenceWriter mPersistenceWriter;
|
||||
mutable PersistenceWriter mPersistenceWriter;
|
||||
RuntimeConfigStore mConfigStore;
|
||||
ShaderPackageCatalog mShaderCatalog;
|
||||
CommittedLiveState mCommittedLiveState;
|
||||
|
||||
@@ -7,7 +7,7 @@ Phases 1-5 separate durable state, coordination policy, render-facing snapshots,
|
||||
## Status
|
||||
|
||||
- Phase 6 design package: proposed.
|
||||
- Phase 6 implementation: Step 2 complete.
|
||||
- Phase 6 implementation: Step 3 complete.
|
||||
- Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence requests; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. The remaining issue is that actual disk writes are still synchronous store work rather than queued, debounced, atomic background writes.
|
||||
|
||||
Current persistence footholds:
|
||||
@@ -218,9 +218,16 @@ Introduce a worker thread or queued task owner.
|
||||
|
||||
Initial target:
|
||||
|
||||
- repeated runtime-state requests coalesce
|
||||
- worker writes only latest pending snapshot
|
||||
- tests cover coalescing without filesystem where possible
|
||||
- [x] repeated runtime-state requests coalesce
|
||||
- [x] worker writes only latest pending snapshot
|
||||
- [x] tests cover coalescing without filesystem where possible
|
||||
|
||||
Current implementation:
|
||||
|
||||
- `PersistenceWriter::EnqueueSnapshot(...)` starts a worker lazily and debounces snapshots by `debounceKey`.
|
||||
- Runtime-state saves enqueue debounced snapshots, so routine mutation paths no longer write the runtime-state file directly.
|
||||
- Synchronous `PersistenceWriter::WriteSnapshot(...)` remains for stack preset saves and transitional direct writes.
|
||||
- `PersistenceWriterTests` use an injected in-memory sink to verify coalescing and non-coalesced immediate requests without touching the filesystem.
|
||||
|
||||
### Step 4. Add Atomic Write And Failure Reporting
|
||||
|
||||
|
||||
112
tests/PersistenceWriterTests.cpp
Normal file
112
tests/PersistenceWriterTests.cpp
Normal file
@@ -0,0 +1,112 @@
|
||||
#include "PersistenceWriter.h"
|
||||
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace
|
||||
{
|
||||
int gFailures = 0;
|
||||
|
||||
void Expect(bool condition, const char* message)
|
||||
{
|
||||
if (condition)
|
||||
return;
|
||||
|
||||
std::cerr << "FAIL: " << message << "\n";
|
||||
++gFailures;
|
||||
}
|
||||
|
||||
PersistenceSnapshot MakeRuntimeSnapshot(const std::string& contents)
|
||||
{
|
||||
PersistenceSnapshot snapshot;
|
||||
snapshot.targetKind = PersistenceTargetKind::RuntimeState;
|
||||
snapshot.targetPath = std::filesystem::temp_directory_path() / "video-shader-persistence-writer-test.json";
|
||||
snapshot.contents = contents;
|
||||
snapshot.reason = "test";
|
||||
snapshot.debounceKey = "runtime-state";
|
||||
snapshot.debounceAllowed = true;
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
void TestDebouncedRequestsCoalesceToNewestSnapshot()
|
||||
{
|
||||
std::mutex mutex;
|
||||
std::vector<PersistenceSnapshot> writtenSnapshots;
|
||||
PersistenceWriter writer(
|
||||
std::chrono::milliseconds(1000),
|
||||
[&](const PersistenceSnapshot& snapshot, std::string&) {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
writtenSnapshots.push_back(snapshot);
|
||||
return true;
|
||||
});
|
||||
|
||||
std::string error;
|
||||
Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("first"), error), "first debounced snapshot enqueues");
|
||||
Expect(writer.EnqueueSnapshot(MakeRuntimeSnapshot("second"), error), "second debounced snapshot enqueues");
|
||||
|
||||
PersistenceWriterMetrics metrics = writer.GetMetrics();
|
||||
Expect(metrics.pendingCount == 1, "debounced snapshots share one pending slot");
|
||||
Expect(metrics.enqueuedCount == 1, "first debounced snapshot counts as enqueue");
|
||||
Expect(metrics.coalescedCount == 1, "second debounced snapshot counts as coalesced");
|
||||
|
||||
writer.StopAndFlush();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
Expect(writtenSnapshots.size() == 1, "flush writes one coalesced snapshot");
|
||||
Expect(!writtenSnapshots.empty() && writtenSnapshots[0].contents == "second", "coalesced writer keeps newest snapshot");
|
||||
}
|
||||
|
||||
metrics = writer.GetMetrics();
|
||||
Expect(metrics.pendingCount == 0, "flush drains pending debounced snapshot");
|
||||
Expect(metrics.writtenCount == 1, "flush records one successful write");
|
||||
}
|
||||
|
||||
void TestImmediateRequestsAreNotCoalesced()
|
||||
{
|
||||
std::mutex mutex;
|
||||
std::vector<PersistenceSnapshot> writtenSnapshots;
|
||||
PersistenceWriter writer(
|
||||
std::chrono::milliseconds(1000),
|
||||
[&](const PersistenceSnapshot& snapshot, std::string&) {
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
writtenSnapshots.push_back(snapshot);
|
||||
return true;
|
||||
});
|
||||
|
||||
PersistenceSnapshot first = MakeRuntimeSnapshot("first");
|
||||
first.debounceAllowed = false;
|
||||
PersistenceSnapshot second = MakeRuntimeSnapshot("second");
|
||||
second.debounceAllowed = false;
|
||||
|
||||
std::string error;
|
||||
Expect(writer.EnqueueSnapshot(first, error), "first immediate snapshot enqueues");
|
||||
Expect(writer.EnqueueSnapshot(second, error), "second immediate snapshot enqueues");
|
||||
writer.StopAndFlush();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
Expect(writtenSnapshots.size() == 2, "immediate snapshots are written independently");
|
||||
Expect(writtenSnapshots.size() == 2 && writtenSnapshots[0].contents == "first" && writtenSnapshots[1].contents == "second",
|
||||
"immediate snapshots preserve order");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
TestDebouncedRequestsCoalesceToNewestSnapshot();
|
||||
TestImmediateRequestsAreNotCoalesced();
|
||||
|
||||
if (gFailures != 0)
|
||||
{
|
||||
std::cerr << gFailures << " persistence writer test(s) failed.\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::cout << "Persistence writer tests passed.\n";
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user