#include "PersistenceWriter.h"

// NOTE(review): the angle-bracket header names were missing from the reviewed
// copy of this file; the list below is reconstructed from what the code uses.
// Confirm against version control.
#include <windows.h>

#include <chrono>
#include <exception>
#include <filesystem>
#include <fstream>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <utility>

/// Creates a writer. No worker thread is started until the first successful
/// EnqueueSnapshot() call.
///
/// @param debounceDelay Delay added to "now" to compute when a debounced
///                      snapshot becomes ready to be written.
/// @param sink          Optional replacement for the built-in file writer; when
///                      it is empty, snapshots are written to disk directly.
PersistenceWriter::PersistenceWriter(std::chrono::milliseconds debounceDelay, SnapshotSink sink)
    : mDebounceDelay(debounceDelay), mSink(std::move(sink))
{
}

/// Stops accepting requests, writes everything still pending and joins the
/// worker thread. Waits without a deadline.
PersistenceWriter::~PersistenceWriter()
{
    std::string error;
    StopAndFlush((std::chrono::milliseconds::max)(), error);
}

/// Installs the callback that receives the outcome of every write.
/// The callback is copied under the mutex and invoked outside of it.
void PersistenceWriter::SetResultCallback(ResultCallback callback)
{
    std::lock_guard lock(mMutex);
    mResultCallback = std::move(callback);
}

/// Writes a snapshot synchronously on the calling thread, bypassing the queue.
///
/// @param snapshot Snapshot to persist; its target path must not be empty.
/// @param error    Receives a description when the call fails.
/// @return true when the snapshot was written. A validation failure returns
///         false without publishing a result to the callback.
bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
{
    if (!ValidateSnapshot(snapshot, error))
        return false;

    const bool succeeded = WriteSnapshotThroughSink(snapshot, error);
    PublishWriteResult(snapshot, succeeded, error, false);
    return succeeded;
}

/// Queues a snapshot for the worker thread, starting the worker on first use.
///
/// Snapshots with debounceAllowed share one slot per debounce key (the target
/// path when no key is given): a newer snapshot replaces the pending one and
/// resets its ready time. All other snapshots go to a FIFO queue that the
/// worker drains before looking at debounced entries.
///
/// @param snapshot Snapshot to persist; its target path must not be empty.
/// @param error    Receives a description when the call fails.
/// @return false when validation fails or the writer is stopping.
bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
{
    if (!ValidateSnapshot(snapshot, error))
        return false;

    std::lock_guard lock(mMutex);
    if (!mAcceptingRequests)
    {
        error = "Persistence writer is stopping.";
        return false;
    }

    StartWorkerLocked();

    const auto now = std::chrono::steady_clock::now();
    if (snapshot.debounceAllowed)
    {
        const std::string debounceKey =
            snapshot.debounceKey.empty() ? snapshot.targetPath.string() : snapshot.debounceKey;

        PendingSnapshot& pending = mDebouncedSnapshots[debounceKey];

        // A freshly default-constructed slot has an empty target path, whereas
        // validated snapshots never do; that distinguishes "new" from "replaced".
        if (!pending.snapshot.targetPath.empty())
            ++mCoalescedCount;
        else
            ++mEnqueuedCount;

        pending.snapshot = snapshot;
        pending.readyAt = snapshot.flushRequested ? now : now + mDebounceDelay;
    }
    else
    {
        mImmediateSnapshots.push_back(snapshot);
        ++mEnqueuedCount;
    }

    mCondition.notify_one();
    return true;
}

/// Rejects further requests, makes every debounced snapshot ready immediately
/// and waits for the worker to drain the queues and exit.
///
/// @param timeout Maximum time to wait for the worker. Values too large to be
///                represented as a steady_clock deadline (including
///                milliseconds::max()) wait without a deadline; negative values
///                are treated as zero.
/// @param error   Receives a description when the wait times out.
/// @return true when the worker has exited (or was never started) and has been
///         joined; false on timeout, in which case the worker keeps running.
bool PersistenceWriter::StopAndFlush(std::chrono::milliseconds timeout, std::string& error)
{
    using Clock = std::chrono::steady_clock;

    {
        std::lock_guard lock(mMutex);
        mAcceptingRequests = false;
        mStopping = true;

        const auto now = Clock::now();
        for (auto& entry : mDebouncedSnapshots)
            entry.second.readyAt = now;
    }
    mCondition.notify_all();

    std::unique_lock lock(mMutex);
    if (mWorkerRunning)
    {
        const auto workerExited = [this]() { return !mWorkerRunning; };

        // Work out how much time can be added to "now" before the deadline
        // arithmetic overflows. The comparison is done in milliseconds so the
        // caller's timeout is never widened to the clock's finer resolution
        // before it has been range-checked.
        const auto start = Clock::now();
        const auto sinceEpoch = start.time_since_epoch();
        const auto consumed = sinceEpoch > Clock::duration::zero() ? sinceEpoch : Clock::duration::zero();
        const auto headroom =
            std::chrono::duration_cast<std::chrono::milliseconds>((Clock::duration::max)() - consumed);

        if (timeout >= headroom)
        {
            mCondition.wait(lock, workerExited);
        }
        else
        {
            const auto boundedTimeout =
                timeout < std::chrono::milliseconds::zero() ? std::chrono::milliseconds::zero() : timeout;
            if (!mCondition.wait_until(lock, start + boundedTimeout, workerExited))
            {
                error = "Timed out while flushing persistence writer.";
                return false;
            }
        }
    }
    lock.unlock();

    if (mWorker.joinable())
        mWorker.join();
    return true;
}

/// Returns a snapshot of the counters, taken under the mutex.
PersistenceWriterMetrics PersistenceWriter::GetMetrics() const
{
    std::lock_guard lock(mMutex);

    PersistenceWriterMetrics metrics;
    metrics.pendingCount = PendingCountLocked();
    metrics.enqueuedCount = mEnqueuedCount;
    metrics.coalescedCount = mCoalescedCount;
    metrics.writtenCount = mWrittenCount;
    metrics.failedCount = mFailedCount;
    return metrics;
}

/// Checks that a snapshot can be written at all.
///
/// @return false (with @p error set) when the target path is empty.
bool PersistenceWriter::ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const
{
    if (snapshot.targetPath.empty())
    {
        error = "Persistence snapshot target path is empty.";
        return false;
    }
    return true;
}

/// Persists one snapshot, either through the injected sink or, when no sink
/// was supplied, by writing "<target>.tmp" and renaming it over the target.
///
/// @param snapshot Snapshot to persist.
/// @param error    Receives a description when the write fails.
/// @return true on success.
bool PersistenceWriter::WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const
{
    if (mSink)
        return mSink(snapshot, error);

    // Best effort: if the directory cannot be created, opening the temporary
    // file below fails and reports the error.
    std::error_code fsError;
    std::filesystem::create_directories(snapshot.targetPath.parent_path(), fsError);

    // Append the suffix to the native path instead of going through a narrow
    // string, so characters outside the ANSI code page survive.
    std::filesystem::path temporaryPath = snapshot.targetPath;
    temporaryPath += ".tmp";

    std::ofstream output(temporaryPath, std::ios::binary | std::ios::trunc);
    if (!output)
    {
        error = "Could not write file: " + temporaryPath.string();
        return false;
    }

    output << snapshot.contents;
    output.close();
    if (!output.good())
    {
        // Do not leave a truncated temporary file behind.
        std::filesystem::remove(temporaryPath, fsError);
        error = "Could not finish writing file: " + temporaryPath.string();
        return false;
    }

    // Use the wide-character API with the native path representation; the ANSI
    // variant cannot address paths outside the active code page.
    if (!MoveFileExW(temporaryPath.c_str(), snapshot.targetPath.c_str(),
                     MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
    {
        // Capture the error code before any other call can overwrite it.
        const DWORD lastError = GetLastError();
        std::filesystem::remove(temporaryPath, fsError);
        error = "Could not replace file: " + snapshot.targetPath.string() + " (Win32 error " +
                std::to_string(lastError) + ")";
        return false;
    }
    return true;
}

/// Reports the outcome of a write to the registered callback, if any.
///
/// @param snapshot            Snapshot that was written (or failed to be).
/// @param succeeded           Whether the write succeeded.
/// @param errorMessage        Failure description forwarded to the callback.
/// @param newerRequestPending Forwarded to the callback unchanged.
void PersistenceWriter::PublishWriteResult(const PersistenceSnapshot& snapshot, bool succeeded,
                                           const std::string& errorMessage, bool newerRequestPending)
{
    // Copy the callback under the lock and invoke it outside, so the callback
    // runs without the mutex held.
    ResultCallback callback;
    {
        std::lock_guard lock(mMutex);
        callback = mResultCallback;
    }
    if (!callback)
        return;

    PersistenceWriteResult result;
    result.targetKind = snapshot.targetKind;
    result.targetPath = snapshot.targetPath.string();
    result.reason = snapshot.reason;
    result.succeeded = succeeded;
    result.errorMessage = errorMessage;
    result.newerRequestPending = newerRequestPending;
    callback(result);
}

/// Starts the worker thread unless it is already running.
/// The caller must hold mMutex.
void PersistenceWriter::StartWorkerLocked()
{
    if (mWorkerRunning)
        return;

    mWorkerRunning = true;
    mWorker = std::thread([this]() { WorkerMain(); });
}

/// Worker thread body: repeatedly picks the next snapshot (immediate queue
/// first, then any debounced entry whose ready time has passed), writes it
/// without holding the mutex and publishes the result. Exits once stopping has
/// been requested and both queues are empty.
void PersistenceWriter::WorkerMain()
{
    for (;;)
    {
        PersistenceSnapshot snapshot;
        {
            std::unique_lock lock(mMutex);
            for (;;)
            {
                if (!mImmediateSnapshots.empty())
                {
                    snapshot = std::move(mImmediateSnapshots.front());
                    mImmediateSnapshots.pop_front();
                    break;
                }

                if (!mDebouncedSnapshots.empty())
                {
                    const auto now = std::chrono::steady_clock::now();
                    auto readyIt = mDebouncedSnapshots.end();
                    auto nextReadyAt = (std::chrono::steady_clock::time_point::max)();

                    // Take the first entry that is ready; otherwise remember the
                    // earliest ready time so the wait below wakes up for it.
                    for (auto it = mDebouncedSnapshots.begin(); it != mDebouncedSnapshots.end(); ++it)
                    {
                        if (it->second.readyAt <= now)
                        {
                            readyIt = it;
                            break;
                        }
                        if (it->second.readyAt < nextReadyAt)
                            nextReadyAt = it->second.readyAt;
                    }

                    if (readyIt != mDebouncedSnapshots.end())
                    {
                        snapshot = std::move(readyIt->second.snapshot);
                        mDebouncedSnapshots.erase(readyIt);
                        break;
                    }

                    // Nothing is ready yet: sleep until the earliest entry is,
                    // or until a new request or a stop request notifies us.
                    mCondition.wait_until(lock, nextReadyAt);
                    continue;
                }

                if (mStopping)
                {
                    mWorkerRunning = false;
                    mCondition.notify_all();
                    return;
                }

                mCondition.wait(lock);
            }
        }

        // An exception leaving a thread's entry function terminates the
        // process, so a throwing sink (or a throwing path conversion) is
        // reported as a failed write instead.
        std::string error;
        bool succeeded = false;
        try
        {
            succeeded = WriteSnapshotThroughSink(snapshot, error);
        }
        catch (const std::exception& ex)
        {
            error = std::string("Persistence write threw an exception: ") + ex.what();
        }
        catch (...)
        {
            error = "Persistence write threw an unknown exception.";
        }

        bool newerRequestPending = false;
        {
            std::lock_guard lock(mMutex);
            if (succeeded)
                ++mWrittenCount;
            else
                ++mFailedCount;
            newerRequestPending = PendingCountLocked() > 0;
        }

        PublishWriteResult(snapshot, succeeded, error, newerRequestPending);
    }
}

/// Number of snapshots waiting in both queues. The caller must hold mMutex.
std::size_t PersistenceWriter::PendingCountLocked() const
{
    return mImmediateSnapshots.size() + mDebouncedSnapshots.size();
}