250 lines
6.0 KiB
C++
250 lines
6.0 KiB
C++
#include "PersistenceWriter.h"
|
|
|
|
#include <windows.h>
|
|
|
|
#include <algorithm>
|
|
#include <filesystem>
|
|
#include <fstream>
|
|
#include <utility>
|
|
|
|
PersistenceWriter::PersistenceWriter(std::chrono::milliseconds debounceDelay, SnapshotSink sink) :
|
|
mDebounceDelay(debounceDelay),
|
|
mSink(std::move(sink))
|
|
{
|
|
}
|
|
|
|
PersistenceWriter::~PersistenceWriter()
|
|
{
|
|
StopAndFlush();
|
|
}
|
|
|
|
void PersistenceWriter::SetResultCallback(ResultCallback callback)
|
|
{
|
|
std::lock_guard<std::mutex> lock(mMutex);
|
|
mResultCallback = std::move(callback);
|
|
}
|
|
|
|
bool PersistenceWriter::WriteSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
|
|
{
|
|
if (!ValidateSnapshot(snapshot, error))
|
|
return false;
|
|
|
|
const bool succeeded = WriteSnapshotThroughSink(snapshot, error);
|
|
PublishWriteResult(snapshot, succeeded, error, false);
|
|
return succeeded;
|
|
}
|
|
|
|
bool PersistenceWriter::EnqueueSnapshot(const PersistenceSnapshot& snapshot, std::string& error)
|
|
{
|
|
if (!ValidateSnapshot(snapshot, error))
|
|
return false;
|
|
|
|
std::lock_guard<std::mutex> lock(mMutex);
|
|
if (!mAcceptingRequests)
|
|
{
|
|
error = "Persistence writer is stopping.";
|
|
return false;
|
|
}
|
|
|
|
StartWorkerLocked();
|
|
|
|
const auto now = std::chrono::steady_clock::now();
|
|
if (snapshot.debounceAllowed)
|
|
{
|
|
const std::string debounceKey = snapshot.debounceKey.empty() ? snapshot.targetPath.string() : snapshot.debounceKey;
|
|
PendingSnapshot& pending = mDebouncedSnapshots[debounceKey];
|
|
if (!pending.snapshot.targetPath.empty())
|
|
++mCoalescedCount;
|
|
else
|
|
++mEnqueuedCount;
|
|
|
|
pending.snapshot = snapshot;
|
|
pending.readyAt = snapshot.flushRequested ? now : now + mDebounceDelay;
|
|
}
|
|
else
|
|
{
|
|
mImmediateSnapshots.push_back(snapshot);
|
|
++mEnqueuedCount;
|
|
}
|
|
|
|
mCondition.notify_one();
|
|
return true;
|
|
}
|
|
|
|
void PersistenceWriter::StopAndFlush()
|
|
{
|
|
{
|
|
std::lock_guard<std::mutex> lock(mMutex);
|
|
mAcceptingRequests = false;
|
|
mStopping = true;
|
|
const auto now = std::chrono::steady_clock::now();
|
|
for (auto& entry : mDebouncedSnapshots)
|
|
entry.second.readyAt = now;
|
|
}
|
|
mCondition.notify_all();
|
|
|
|
if (mWorker.joinable())
|
|
mWorker.join();
|
|
}
|
|
|
|
PersistenceWriterMetrics PersistenceWriter::GetMetrics() const
|
|
{
|
|
std::lock_guard<std::mutex> lock(mMutex);
|
|
PersistenceWriterMetrics metrics;
|
|
metrics.pendingCount = PendingCountLocked();
|
|
metrics.enqueuedCount = mEnqueuedCount;
|
|
metrics.coalescedCount = mCoalescedCount;
|
|
metrics.writtenCount = mWrittenCount;
|
|
metrics.failedCount = mFailedCount;
|
|
return metrics;
|
|
}
|
|
|
|
bool PersistenceWriter::ValidateSnapshot(const PersistenceSnapshot& snapshot, std::string& error) const
|
|
{
|
|
if (snapshot.targetPath.empty())
|
|
{
|
|
error = "Persistence snapshot target path is empty.";
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool PersistenceWriter::WriteSnapshotThroughSink(const PersistenceSnapshot& snapshot, std::string& error) const
|
|
{
|
|
if (mSink)
|
|
return mSink(snapshot, error);
|
|
|
|
std::error_code fsError;
|
|
std::filesystem::create_directories(snapshot.targetPath.parent_path(), fsError);
|
|
|
|
const std::filesystem::path temporaryPath = snapshot.targetPath.string() + ".tmp";
|
|
std::ofstream output(temporaryPath, std::ios::binary | std::ios::trunc);
|
|
if (!output)
|
|
{
|
|
error = "Could not write file: " + temporaryPath.string();
|
|
return false;
|
|
}
|
|
|
|
output << snapshot.contents;
|
|
output.close();
|
|
if (!output.good())
|
|
{
|
|
error = "Could not finish writing file: " + temporaryPath.string();
|
|
return false;
|
|
}
|
|
|
|
if (!MoveFileExA(temporaryPath.string().c_str(), snapshot.targetPath.string().c_str(), MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
|
|
{
|
|
const DWORD lastError = GetLastError();
|
|
std::filesystem::remove(temporaryPath, fsError);
|
|
error = "Could not replace file: " + snapshot.targetPath.string() + " (Win32 error " + std::to_string(lastError) + ")";
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void PersistenceWriter::PublishWriteResult(const PersistenceSnapshot& snapshot, bool succeeded, const std::string& errorMessage, bool newerRequestPending)
|
|
{
|
|
ResultCallback callback;
|
|
{
|
|
std::lock_guard<std::mutex> lock(mMutex);
|
|
callback = mResultCallback;
|
|
}
|
|
|
|
if (!callback)
|
|
return;
|
|
|
|
PersistenceWriteResult result;
|
|
result.targetKind = snapshot.targetKind;
|
|
result.targetPath = snapshot.targetPath.string();
|
|
result.reason = snapshot.reason;
|
|
result.succeeded = succeeded;
|
|
result.errorMessage = errorMessage;
|
|
result.newerRequestPending = newerRequestPending;
|
|
callback(result);
|
|
}
|
|
|
|
void PersistenceWriter::StartWorkerLocked()
|
|
{
|
|
if (mWorkerRunning)
|
|
return;
|
|
|
|
mWorkerRunning = true;
|
|
mWorker = std::thread([this]() { WorkerMain(); });
|
|
}
|
|
|
|
void PersistenceWriter::WorkerMain()
|
|
{
|
|
for (;;)
|
|
{
|
|
PersistenceSnapshot snapshot;
|
|
{
|
|
std::unique_lock<std::mutex> lock(mMutex);
|
|
for (;;)
|
|
{
|
|
if (!mImmediateSnapshots.empty())
|
|
{
|
|
snapshot = std::move(mImmediateSnapshots.front());
|
|
mImmediateSnapshots.pop_front();
|
|
break;
|
|
}
|
|
|
|
if (!mDebouncedSnapshots.empty())
|
|
{
|
|
const auto now = std::chrono::steady_clock::now();
|
|
auto readyIt = mDebouncedSnapshots.end();
|
|
auto nextReadyAt = (std::chrono::steady_clock::time_point::max)();
|
|
for (auto it = mDebouncedSnapshots.begin(); it != mDebouncedSnapshots.end(); ++it)
|
|
{
|
|
if (it->second.readyAt <= now)
|
|
{
|
|
readyIt = it;
|
|
break;
|
|
}
|
|
if (it->second.readyAt < nextReadyAt)
|
|
nextReadyAt = it->second.readyAt;
|
|
}
|
|
|
|
if (readyIt != mDebouncedSnapshots.end())
|
|
{
|
|
snapshot = std::move(readyIt->second.snapshot);
|
|
mDebouncedSnapshots.erase(readyIt);
|
|
break;
|
|
}
|
|
|
|
mCondition.wait_until(lock, nextReadyAt);
|
|
continue;
|
|
}
|
|
|
|
if (mStopping)
|
|
{
|
|
mWorkerRunning = false;
|
|
return;
|
|
}
|
|
|
|
mCondition.wait(lock);
|
|
}
|
|
}
|
|
|
|
std::string error;
|
|
const bool succeeded = WriteSnapshotThroughSink(snapshot, error);
|
|
bool newerRequestPending = false;
|
|
{
|
|
std::lock_guard<std::mutex> lock(mMutex);
|
|
if (succeeded)
|
|
++mWrittenCount;
|
|
else
|
|
++mFailedCount;
|
|
newerRequestPending = PendingCountLocked() > 0;
|
|
}
|
|
PublishWriteResult(snapshot, succeeded, error, newerRequestPending);
|
|
}
|
|
}
|
|
|
|
std::size_t PersistenceWriter::PendingCountLocked() const
|
|
{
|
|
return mImmediateSnapshots.size() + mDebouncedSnapshots.size();
|
|
}
|