#include #include #include #include #include #include #include #include #include #include namespace xrpl::NodeStore { BatchWriter::BatchWriter(Callback& callback, Scheduler& scheduler) : m_callback(callback), m_scheduler(scheduler) { mWriteSet.reserve(batchWritePreallocationSize); } BatchWriter::~BatchWriter() { waitForWriting(); } void BatchWriter::store(std::shared_ptr const& object) { std::unique_lock sl(mWriteMutex); // If the batch has reached its limit, we wait // until the batch writer is finished while (mWriteSet.size() >= batchWriteLimitSize) mWriteCondition.wait(sl); mWriteSet.push_back(object); if (!mWritePending) { mWritePending = true; m_scheduler.scheduleTask(*this); } } int BatchWriter::getWriteLoad() { std::scoped_lock const sl(mWriteMutex); return std::max(mWriteLoad, static_cast(mWriteSet.size())); } void BatchWriter::performScheduledTask() { writeBatch(); } void BatchWriter::writeBatch() { for (;;) { std::vector> set; set.reserve(batchWritePreallocationSize); { std::scoped_lock const sl(mWriteMutex); mWriteSet.swap(set); XRPL_ASSERT( mWriteSet.empty(), "xrpl::NodeStore::BatchWriter::writeBatch : writes not set"); mWriteLoad = set.size(); if (set.empty()) { mWritePending = false; mWriteCondition.notify_all(); // VFALCO NOTE Fix this function to not return from the middle return; } } BatchWriteReport report{}; report.writeCount = set.size(); auto const before = std::chrono::steady_clock::now(); m_callback.writeBatch(set); report.elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - before); m_scheduler.onBatchWrite(report); } } void BatchWriter::waitForWriting() { std::unique_lock sl(mWriteMutex); while (mWritePending) mWriteCondition.wait(sl); } } // namespace xrpl::NodeStore