From 1629dbc77a8cbea0f08d037e71e0abf42a9255d4 Mon Sep 17 00:00:00 2001 From: Aiden <68633820+awils27@users.noreply.github.com> Date: Mon, 11 May 2026 19:58:14 +1000 Subject: [PATCH] step 4 --- .../runtime/persistence/PersistenceWriter.cpp | 36 ++++++++++++++- .../runtime/persistence/PersistenceWriter.h | 16 ++++++- .../runtime/store/RuntimeStore.cpp | 24 ++++++++++ .../runtime/telemetry/HealthTelemetry.cpp | 45 +++++++++++++++++++ .../runtime/telemetry/HealthTelemetry.h | 21 +++++++++ docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md | 15 +++++-- tests/HealthTelemetryTests.cpp | 24 ++++++++++ tests/PersistenceWriterTests.cpp | 30 +++++++++++++ 8 files changed, 204 insertions(+), 7 deletions(-) diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp index b1ca9cb..e4b47ae 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.cpp @@ -18,12 +18,20 @@ PersistenceWriter::~PersistenceWriter() StopAndFlush(); } -bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const +void PersistenceWriter::SetResultCallback(ResultCallback callback) +{ + std::lock_guard lock(mMutex); + mResultCallback = std::move(callback); +} + +bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) { if (!ValidateSnapshot(snapshot, error)) return false; - return WriteSnapshotThroughSink(snapshot, error); + const bool succeeded = WriteSnapshotThroughSink(snapshot, error); + PublishWriteResult(snapshot, succeeded, error, false); + return succeeded; } bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error) @@ -137,6 +145,27 @@ bool PersistenceWriter::WriteSnapshotThroughSink(const PersistenceSnapshot& snap return true; } +void PersistenceWriter::PublishWriteResult(const PersistenceSnapshot& snapshot, bool succeeded, const std::string& errorMessage, bool newerRequestPending) +{ + ResultCallback callback; + { + std::lock_guard lock(mMutex); + callback = mResultCallback; + } + + if (!callback) + return; + + PersistenceWriteResult result; + result.targetKind = snapshot.targetKind; + result.targetPath = snapshot.targetPath.string(); + result.reason = snapshot.reason; + result.succeeded = succeeded; + result.errorMessage = errorMessage; + result.newerRequestPending = newerRequestPending; + callback(result); +} + void PersistenceWriter::StartWorkerLocked() { if (mWorkerRunning) @@ -201,13 +230,16 @@ void PersistenceWriter::WorkerMain() std::string error; const bool succeeded = WriteSnapshotThroughSink(snapshot, error); + bool newerRequestPending = false; { std::lock_guard lock(mMutex); if (succeeded) ++mWrittenCount; else ++mFailedCount; + newerRequestPending = PendingCountLocked() > 0; } + PublishWriteResult(snapshot, succeeded, error, newerRequestPending); } } diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h index 8af6ae9..a24f56b 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/persistence/PersistenceWriter.h @@ -21,17 +21,29 @@ struct PersistenceWriterMetrics uint64_t failedCount = 0; }; +struct PersistenceWriteResult +{ + PersistenceTargetKind targetKind = PersistenceTargetKind::RuntimeState; + std::string targetPath; + std::string reason; + bool succeeded = false; + std::string errorMessage; + bool newerRequestPending = false; +}; + class PersistenceWriter { public: using SnapshotSink = std::function; + using ResultCallback = std::function; explicit PersistenceWriter( std::chrono::milliseconds debounceDelay = std::chrono::milliseconds(50), SnapshotSink sink = SnapshotSink()); ~PersistenceWriter(); - bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const; + void SetResultCallback(ResultCallback callback); + bool WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error); bool EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error); void StopAndFlush(); PersistenceWriterMetrics GetMetrics() const; @@ -45,12 +57,14 @@ private: bool ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const; bool WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const; + void PublishWriteResult(const PersistenceSnapshot& snapshot, bool succeeded, const std::string& errorMessage, bool newerRequestPending); void StartWorkerLocked(); void WorkerMain(); std::size_t PendingCountLocked() const; std::chrono::milliseconds mDebounceDelay; SnapshotSink mSink; + ResultCallback mResultCallback; mutable std::mutex mMutex; std::condition_variable mCondition; std::thread mWorker; diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp index baf7378..bc4a730 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/store/RuntimeStore.cpp @@ -24,6 +24,21 @@ double GenerateStartupRandom() return distribution(randomDevice); } +std::string PersistenceTargetKindName(PersistenceTargetKind targetKind) +{ + switch (targetKind) + { + case PersistenceTargetKind::RuntimeState: + return "runtime-state"; + case PersistenceTargetKind::StackPreset: + return "stack-preset"; + case PersistenceTargetKind::RuntimeConfig: + return "runtime-config"; + default: + return "unknown"; + } +} + } RuntimeStore::RuntimeStore() : @@ -37,6 +52,15 @@ RuntimeStore::RuntimeStore() : mStartTime(std::chrono::steady_clock::now()), mLastScanTime((std::chrono::steady_clock::time_point::min)()) { + mPersistenceWriter.SetResultCallback([this](const PersistenceWriteResult& result) { + mHealthTelemetry.RecordPersistenceWriteResult( + result.succeeded, + PersistenceTargetKindName(result.targetKind), + result.targetPath, + result.reason, + result.errorMessage, + result.newerRequestPending); + }); } HealthTelemetry& RuntimeStore::GetHealthTelemetry() diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.cpp b/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.cpp index 92e81e2..9cfb600 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.cpp +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.cpp @@ -169,6 +169,44 @@ bool HealthTelemetry::TryRecordRuntimeEventDispatchStats(std::size_t dispatchedE return true; } +void HealthTelemetry::RecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath, + const std::string& reason, const std::string& errorMessage, bool newerRequestPending) +{ + std::lock_guard lock(mMutex); + if (succeeded) + ++mPersistence.writeSuccessCount; + else + ++mPersistence.writeFailureCount; + mPersistence.lastWriteSucceeded = succeeded; + mPersistence.unsavedChanges = !succeeded || newerRequestPending; + mPersistence.newerRequestPending = newerRequestPending; + mPersistence.lastTargetKind = targetKind; + mPersistence.lastTargetPath = targetPath; + mPersistence.lastReason = reason; + mPersistence.lastErrorMessage = errorMessage; +} + +bool HealthTelemetry::TryRecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath, + const std::string& reason, const std::string& errorMessage, bool newerRequestPending) +{ + std::unique_lock lock(mMutex, std::try_to_lock); + if (!lock.owns_lock()) + return false; + + if (succeeded) + ++mPersistence.writeSuccessCount; + else + ++mPersistence.writeFailureCount; + mPersistence.lastWriteSucceeded = succeeded; + mPersistence.unsavedChanges = !succeeded || newerRequestPending; + mPersistence.newerRequestPending = newerRequestPending; + mPersistence.lastTargetKind = targetKind; + mPersistence.lastTargetPath = targetPath; + mPersistence.lastReason = reason; + mPersistence.lastErrorMessage = errorMessage; + return true; +} + HealthTelemetry::SignalStatusSnapshot HealthTelemetry::GetSignalStatusSnapshot() const { std::lock_guard lock(mMutex); @@ -193,6 +231,12 @@ HealthTelemetry::RuntimeEventMetricsSnapshot HealthTelemetry::GetRuntimeEventMet return mRuntimeEvents; } +HealthTelemetry::PersistenceSnapshot HealthTelemetry::GetPersistenceSnapshot() const +{ + std::lock_guard lock(mMutex); + return mPersistence; +} + HealthTelemetry::Snapshot HealthTelemetry::GetSnapshot() const { std::lock_guard lock(mMutex); @@ -202,5 +246,6 @@ HealthTelemetry::Snapshot HealthTelemetry::GetSnapshot() const snapshot.videoIO = mVideoIOStatus; snapshot.performance = mPerformance; snapshot.runtimeEvents = mRuntimeEvents; + snapshot.persistence = mPersistence; return snapshot; } diff --git a/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.h b/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.h index 19c1098..7b4734f 100644 --- a/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.h +++ b/apps/LoopThroughWithOpenGLCompositing/runtime/telemetry/HealthTelemetry.h @@ -69,12 +69,26 @@ public: RuntimeEventDispatchSnapshot dispatch; }; + struct PersistenceSnapshot + { + uint64_t writeSuccessCount = 0; + uint64_t writeFailureCount = 0; + bool lastWriteSucceeded = true; + bool unsavedChanges = false; + bool newerRequestPending = false; + std::string lastTargetKind; + std::string lastTargetPath; + std::string lastReason; + std::string lastErrorMessage; + }; + struct Snapshot { SignalStatusSnapshot signal; VideoIOStatusSnapshot videoIO; PerformanceSnapshot performance; RuntimeEventMetricsSnapshot runtimeEvents; + PersistenceSnapshot persistence; }; HealthTelemetry() = default; @@ -107,10 +121,16 @@ public: bool TryRecordRuntimeEventDispatchStats(std::size_t dispatchedEvents, std::size_t handlerInvocations, std::size_t handlerFailures, double dispatchDurationMilliseconds); + void RecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath, + const std::string& reason, const std::string& errorMessage, bool newerRequestPending); + bool TryRecordPersistenceWriteResult(bool succeeded, const std::string& targetKind, const std::string& targetPath, + const std::string& reason, const std::string& errorMessage, bool newerRequestPending); + SignalStatusSnapshot GetSignalStatusSnapshot() const; VideoIOStatusSnapshot GetVideoIOStatusSnapshot() const; PerformanceSnapshot GetPerformanceSnapshot() const; RuntimeEventMetricsSnapshot GetRuntimeEventMetricsSnapshot() const; + PersistenceSnapshot GetPersistenceSnapshot() const; Snapshot GetSnapshot() const; private: @@ -119,4 +139,5 @@ private: VideoIOStatusSnapshot mVideoIOStatus; PerformanceSnapshot mPerformance; RuntimeEventMetricsSnapshot mRuntimeEvents; + PersistenceSnapshot mPersistence; }; diff --git a/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md b/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md index c131139..f3fa1c8 100644 --- a/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md +++ b/docs/PHASE_6_BACKGROUND_PERSISTENCE_DESIGN.md @@ -7,7 +7,7 @@ Phases 1-5 separate durable state, coordination policy, render-facing snapshots, ## Status - Phase 6 design package: proposed. -- Phase 6 implementation: Step 3 complete. +- Phase 6 implementation: Step 4 complete. - Current alignment: `RuntimeStore` owns durable serialization, config, package metadata, preset IO, and persistence requests; `CommittedLiveState` owns the current committed/session layer state; and `RuntimeCoordinator` publishes typed persistence requests for persisted mutations. The remaining issue is that actual disk writes are still synchronous store work rather than queued, debounced, atomic background writes. Current persistence footholds: @@ -235,9 +235,16 @@ Make disk writes safer and observable. Initial target: -- temp-file then replace -- failure returned/published with structured reason -- `HealthTelemetry` receives persistence warning state +- [x] temp-file then replace +- [x] failure returned/published with structured reason +- [x] `HealthTelemetry` receives persistence warning state + +Current implementation: + +- `PersistenceWriter::WriteSnapshot(...)` and worker writes use temp-file then `MoveFileExA(..., MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH)`. +- `PersistenceWriteResult` reports target kind, target path, reason, success/failure, error message, and whether newer work was pending. +- `RuntimeStore` wires persistence write results into `HealthTelemetry`. +- `HealthTelemetry` records persistence success/failure counts, last target/reason/error, pending-newer-request state, and unsaved-change state. ### Step 5. Wire Coordinator/Event Requests To Writer diff --git a/tests/HealthTelemetryTests.cpp b/tests/HealthTelemetryTests.cpp index f1e9216..9a269f6 100644 --- a/tests/HealthTelemetryTests.cpp +++ b/tests/HealthTelemetryTests.cpp @@ -53,6 +53,29 @@ void TestRuntimeEventTryRecord() Expect(metrics.queue.oldestEventAgeMilliseconds == 0.0, "queue age is clamped to non-negative values"); Expect(metrics.dispatch.lastDispatchDurationMilliseconds == 0.0, "dispatch duration is clamped to non-negative values"); } + +void TestPersistenceWriteHealth() +{ + HealthTelemetry telemetry; + telemetry.RecordPersistenceWriteResult(false, "runtime-state", "runtime/runtime_state.json", "UpdateLayerParameter", + "disk full", true); + + HealthTelemetry::PersistenceSnapshot persistence = telemetry.GetPersistenceSnapshot(); + Expect(persistence.writeFailureCount == 1, "persistence health counts write failures"); + Expect(!persistence.lastWriteSucceeded, "persistence health records failed write state"); + Expect(persistence.unsavedChanges, "persistence health reports unsaved changes after failure"); + Expect(persistence.newerRequestPending, "persistence health records pending newer request"); + Expect(persistence.lastTargetKind == "runtime-state", "persistence health records target kind"); + Expect(persistence.lastReason == "UpdateLayerParameter", "persistence health records reason"); + Expect(persistence.lastErrorMessage == "disk full", "persistence health records error message"); + + Expect(telemetry.TryRecordPersistenceWriteResult(true, "runtime-state", "runtime/runtime_state.json", "flush", "", false), + "try persistence health succeeds when uncontended"); + persistence = telemetry.GetPersistenceSnapshot(); + Expect(persistence.writeSuccessCount == 1, "persistence health counts write successes"); + Expect(persistence.lastWriteSucceeded, "persistence health records successful write state"); + Expect(!persistence.unsavedChanges, "persistence health clears unsaved changes after latest successful write with no pending request"); +} } int main() @@ -60,6 +83,7 @@ int main() TestRuntimeEventQueueMetrics(); TestRuntimeEventDispatchStats(); TestRuntimeEventTryRecord(); + TestPersistenceWriteHealth(); if (gFailures != 0) { diff --git a/tests/PersistenceWriterTests.cpp b/tests/PersistenceWriterTests.cpp index d376a09..d9d4a62 100644 --- a/tests/PersistenceWriterTests.cpp +++ b/tests/PersistenceWriterTests.cpp @@ -94,12 +94,42 @@ void TestImmediateRequestsAreNotCoalesced() "immediate snapshots preserve order"); } } + +void TestWriteFailureReportsStructuredResult() +{ + std::vector results; + PersistenceWriter writer( + std::chrono::milliseconds(1), + [](const PersistenceSnapshot&, std::string& error) { + error = "simulated failure"; + return false; + }); + writer.SetResultCallback([&results](const PersistenceWriteResult& result) { + results.push_back(result); + }); + + PersistenceSnapshot snapshot = MakeRuntimeSnapshot("payload"); + snapshot.debounceAllowed = false; + snapshot.reason = "failure-test"; + + std::string error; + Expect(writer.EnqueueSnapshot(snapshot, error), "failing snapshot still enqueues"); + writer.StopAndFlush(); + + Expect(results.size() == 1, "writer reports one failure result"); + Expect(!results.empty() && !results[0].succeeded, "writer result records failure"); + Expect(!results.empty() && results[0].reason == "failure-test", "writer result preserves reason"); + Expect(!results.empty() && results[0].errorMessage == "simulated failure", "writer result preserves error message"); + Expect(!results.empty() && !results[0].newerRequestPending, "writer result reports no newer pending request"); + Expect(writer.GetMetrics().failedCount == 1, "writer metrics count failed writes"); +} } int main() { TestDebouncedRequestsCoalesceToNewestSnapshot(); TestImmediateRequestsAreNotCoalesced(); + TestWriteFailureReportsStructuredResult(); if (gFailures != 0) {