Files
rippled/src/libxrpl/nodestore/BatchWriter.cpp
Bart 1d42c4f6de refactor: Remove unnecessary copyright notices already covered by LICENSE.md (#5929)
Per XLS-0095, we are taking steps to rename ripple(d) to xrpl(d).

This change specifically removes all copyright notices referencing Ripple, XRPLF, and certain affiliated contributors upon mutual agreement, so the notice in the LICENSE.md file applies throughout. Copyright notices referencing external contributions remain as-is. Duplicate verbiage is also removed.
2025-11-04 08:33:42 +00:00

106 lines
2.2 KiB
C++

#include <xrpl/nodestore/detail/BatchWriter.h>
namespace ripple {
namespace NodeStore {
BatchWriter::BatchWriter(Callback& callback, Scheduler& scheduler)
: m_callback(callback)
, m_scheduler(scheduler)
, mWriteLoad(0)
, mWritePending(false)
{
mWriteSet.reserve(batchWritePreallocationSize);
}
// Destructor: blocks in waitForWriting() until no write is pending
// before the object is torn down.
BatchWriter::~BatchWriter()
{
waitForWriting();
}
// Queue an object for the next batch write.
//
// Blocks the caller while the pending set already holds
// batchWriteLimitSize (or more) objects. Once there is room the object
// is appended and, if no write is currently pending, this writer is
// handed to the scheduler.
void
BatchWriter::store(std::shared_ptr<NodeObject> const& object)
{
    std::unique_lock lock{mWriteMutex};

    // Back-pressure: a full pending set stalls producers until the
    // writer signals the condition (it does so once it finds the set
    // drained).
    while (mWriteSet.size() >= batchWriteLimitSize)
    {
        mWriteCondition.wait(lock);
    }

    mWriteSet.emplace_back(object);

    // A write is already scheduled or running; it will pick this
    // object up on its next pass.
    if (mWritePending)
        return;

    mWritePending = true;
    m_scheduler.scheduleTask(*this);
}
// Report the current write load.
//
// Returns the larger of the size of the batch most recently taken by
// writeBatch() and the number of objects currently waiting in the
// pending set. Both values are read under the write mutex.
int
BatchWriter::getWriteLoad()
{
    std::lock_guard lock{mWriteMutex};

    auto const queued = static_cast<int>(mWriteSet.size());
    return std::max(mWriteLoad, queued);
}
// Task entry point: simply runs writeBatch().
// NOTE(review): presumably invoked by the Scheduler after store() calls
// scheduleTask(*this) -- confirm against the Task/Scheduler interface.
void
BatchWriter::performScheduledTask()
{
writeBatch();
}
void
BatchWriter::writeBatch()
{
for (;;)
{
std::vector<std::shared_ptr<NodeObject>> set;
set.reserve(batchWritePreallocationSize);
{
std::lock_guard sl(mWriteMutex);
mWriteSet.swap(set);
XRPL_ASSERT(
mWriteSet.empty(),
"ripple::NodeStore::BatchWriter::writeBatch : writes not set");
mWriteLoad = set.size();
if (set.empty())
{
mWritePending = false;
mWriteCondition.notify_all();
// VFALCO NOTE Fix this function to not return from the middle
return;
}
}
BatchWriteReport report;
report.writeCount = set.size();
auto const before = std::chrono::steady_clock::now();
m_callback.writeBatch(set);
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - before);
m_scheduler.onBatchWrite(report);
}
}
// Block the caller until no write is pending.
//
// Waits on the write condition, re-checking the pending flag after
// every wake-up; writeBatch() clears the flag and notifies once it finds
// the pending set empty.
void
BatchWriter::waitForWriting()
{
    std::unique_lock lock{mWriteMutex};

    while (mWritePending)
    {
        mWriteCondition.wait(lock);
    }
}
} // namespace NodeStore
} // namespace ripple