Compare commits

...

11 Commits

Author SHA1 Message Date
Bart
0e26aadfe6 Make pending writes size_t to avoid casting everywhere 2026-03-21 10:24:24 -04:00
Bart
d029bcf2d0 Threadpool join in destructor, exception handling 2026-03-21 10:21:24 -04:00
Bart
403cab41e9 Fix thread parallelization calculation 2026-03-20 18:03:17 -04:00
Bart
59e6fbfe12 Merge branch 'develop' into bthomee/iops 2026-03-20 16:04:00 -04:00
Bart
d63f80f73c Merge branch 'develop' into bthomee/iops 2026-03-19 17:28:32 -04:00
Bart
dc5eb0ea50 Make class variables private 2026-03-19 08:36:24 -04:00
Bart
609024f15c Merge branch 'develop' into bthomee/iops 2026-03-19 06:32:59 -04:00
Bart
1bf5b0aa10 Add braces 2026-03-18 14:52:06 -04:00
Bart
f783a15bc8 Review feedback 2026-03-18 14:20:06 -04:00
Bart
5a94948a04 Merge branch 'develop' into bthomee/iops 2026-03-18 14:03:28 -04:00
Bart
f586382622 perf: Improve IOPS when reading from and writing to NuDB and RocksDB 2026-03-17 17:56:59 -04:00
5 changed files with 415 additions and 59 deletions

View File

@@ -138,6 +138,9 @@ public:
/** Returns the number of file descriptors the backend expects to need. */
virtual int
fdRequired() const = 0;
/** The number of hardware threads to use for compression of a batch. */
static unsigned int const numHardwareThreads;
};
} // namespace NodeStore

View File

@@ -0,0 +1,18 @@
#include <xrpl/nodestore/Backend.h>
#include <algorithm>
#include <thread>
namespace xrpl {
namespace NodeStore {
// Initialize the static constant for hardware thread count. The `hardware_concurrency` function can
// return 0 on some platforms, in which case we default to 1. We limit the total number of threads
// to 8 to avoid contention.
unsigned int const Backend::numHardwareThreads = []() {
auto const hw = std::thread::hardware_concurrency();
return std::min(std::max(hw, 1u), 8u);
}();
} // namespace NodeStore
} // namespace xrpl

View File

@@ -7,15 +7,22 @@
#include <xrpl/nodestore/detail/EncodedBlob.h>
#include <xrpl/nodestore/detail/codec.h>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/filesystem.hpp>
#include <nudb/nudb.hpp>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <latch>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
namespace xrpl {
namespace NodeStore {
@@ -23,21 +30,6 @@ namespace NodeStore {
class NuDBBackend : public Backend
{
public:
// "appnum" is an application-defined constant stored in the header of a
// NuDB database. We used it to identify shard databases before that code
// was removed. For now, its only use is a sanity check that the database
// was created by xrpld.
static constexpr std::uint64_t appnum = 1;
beast::Journal const j_;
size_t const keyBytes_;
std::size_t const burstSize_;
std::string const name_;
std::size_t const blockSize_;
nudb::store db_;
std::atomic<bool> deletePath_;
Scheduler& scheduler_;
NuDBBackend(
size_t keyBytes,
Section const& keyValues,
@@ -51,6 +43,7 @@ public:
, blockSize_(parseBlockSize(name_, keyValues, journal))
, deletePath_(false)
, scheduler_(scheduler)
, threadPool_(numHardwareThreads)
{
if (name_.empty())
Throw<std::runtime_error>("nodestore: Missing path in NuDB backend");
@@ -71,6 +64,7 @@ public:
, db_(context)
, deletePath_(false)
, scheduler_(scheduler)
, threadPool_(numHardwareThreads)
{
if (name_.empty())
Throw<std::runtime_error>("nodestore: Missing path in NuDB backend");
@@ -80,6 +74,10 @@ public:
{
try
{
// Wait for all thread pool tasks to complete before closing the database. This prevents
// worker threads from accessing the database after close.
threadPool_.join();
// close can throw and we don't want the destructor to throw.
close();
}
@@ -127,16 +125,24 @@ public:
nudb::create<nudb::xxhasher>(
dp, kp, lp, appType, uid, salt, keyBytes_, blockSize_, 0.50, ec);
if (ec == nudb::errc::file_exists)
{
ec = {};
}
if (ec)
{
Throw<nudb::system_error>(ec);
}
}
db_.open(dp, kp, lp, ec);
if (ec)
{
Throw<nudb::system_error>(ec);
}
if (db_.appnum() != appnum)
{
Throw<std::runtime_error>("nodestore: unknown appnum");
}
db_.set_burst(burstSize_);
}
@@ -184,6 +190,7 @@ public:
Status status = ok;
pno->reset();
nudb::error_code ec;
db_.fetch(
hash.data(),
[&hash, pno, &status](void const* data, std::size_t size) {
@@ -199,30 +206,105 @@ public:
status = ok;
},
ec);
if (ec == nudb::error::key_not_found)
{
return notFound;
}
if (ec)
{
Throw<nudb::system_error>(ec);
}
return status;
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
if (hashes.empty())
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h, &nObj);
if (status != ok)
return {{}, ok};
}
std::vector<std::shared_ptr<NodeObject>> results(hashes.size());
// Calculate optimal parallelization parameters for the batch.
auto const [numThreads, numItems] = calculateBatchParallelization(hashes.size());
// If we need only one thread, just do it sequentially.
if (numThreads == 1u)
{
for (size_t i = 0; i < hashes.size(); ++i)
{
results.push_back({});
std::shared_ptr<NodeObject> nObj;
if (fetch(hashes[i], &nObj) == ok)
{
results[i] = nObj;
}
}
else
return {results, ok};
}
// Use a latch to synchronize task completion.
std::latch taskCompletion(numThreads);
// Track exceptions from worker threads.
std::exception_ptr eptr;
std::mutex emutex;
// Submit fetch tasks to the thread pool.
for (auto t = 0u; t < numThreads; ++t)
{
auto const startIdx = t * numItems;
XRPL_ASSERT(
startIdx < hashes.size(),
"xrpl::NuDBFactory::fetchBatch : startIdx < hashes.size()");
if (startIdx >= hashes.size())
{
results.push_back(nObj);
// This should never happen, but is kept as a safety check.
taskCompletion.count_down();
continue;
}
auto const endIdx = std::min<std::size_t>(startIdx + numItems, hashes.size());
auto task =
[this, &hashes, &results, &taskCompletion, &eptr, &emutex, startIdx, endIdx]() {
try
{
// Fetch the items assigned to this task.
for (size_t i = startIdx; i < endIdx; ++i)
{
std::shared_ptr<NodeObject> nObj;
if (fetch(hashes[i], &nObj) == ok)
{
results[i] = nObj;
}
}
}
catch (...)
{
// Store the first exception that occurs. Ensures count_down() is always
// called to prevent deadlock.
std::lock_guard<std::mutex> lock(emutex);
if (!eptr)
{
eptr = std::current_exception();
}
}
// Signal task completion.
taskCompletion.count_down();
};
boost::asio::post(threadPool_, std::move(task));
}
// Wait for all fetch tasks to complete.
taskCompletion.wait();
// Rethrow the first exception if one occurred.
if (eptr)
{
std::rethrow_exception(eptr);
}
return {results, ok};
@@ -232,12 +314,16 @@ public:
do_insert(std::shared_ptr<NodeObject> const& no)
{
EncodedBlob e(no);
nudb::error_code ec;
nudb::detail::buffer bf;
auto const result = nodeobject_compress(e.getData(), e.getSize(), bf);
nudb::error_code ec;
db_.insert(e.getKey(), result.first, result.second, ec);
if (ec && ec != nudb::error::key_exists)
{
Throw<nudb::system_error>(ec);
}
}
void
@@ -246,7 +332,19 @@ public:
BatchWriteReport report{};
report.writeCount = 1;
auto const start = std::chrono::steady_clock::now();
do_insert(no);
++pendingWrites_;
try
{
do_insert(no);
}
catch (...)
{
--pendingWrites_;
throw;
}
--pendingWrites_;
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
scheduler_.onBatchWrite(report);
@@ -255,11 +353,128 @@ public:
void
storeBatch(Batch const& batch) override
{
if (batch.empty())
{
return;
}
BatchWriteReport report{};
report.writeCount = batch.size();
auto const start = std::chrono::steady_clock::now();
for (auto const& e : batch)
do_insert(e);
pendingWrites_ += batch.size();
// Calculate optimal parallelization parameters for the batch.
auto const [numThreads, numItems] = calculateBatchParallelization(batch.size());
// If we need only one thread, just do it sequentially.
if (numThreads == 1u)
{
for (auto const& e : batch)
{
try
{
do_insert(e);
}
catch (...)
{
pendingWrites_ -= batch.size();
throw;
}
}
pendingWrites_ -= batch.size();
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
scheduler_.onBatchWrite(report);
return;
}
// Helper struct that stores actual item data, not pointers, to avoid dangling references
// after EncodedBlob and buffer go out of scope in the thread.
struct CompressedData
{
std::vector<std::uint8_t> key;
std::vector<std::uint8_t> data;
std::exception_ptr eptr;
};
std::vector<CompressedData> compressed(batch.size());
// Use a latch to synchronize task completion.
std::latch taskCompletion(numThreads);
// Submit compression tasks to the thread pool.
for (auto t = 0u; t < numThreads; ++t)
{
auto const startIdx = t * numItems;
XRPL_ASSERT(
startIdx < batch.size(), "xrpl::NuDBFactory::storeBatch : startIdx < batch.size()");
if (startIdx >= batch.size())
{
// This should never happen, but is kept as a safety check.
taskCompletion.count_down();
continue;
}
auto const endIdx = std::min<std::size_t>(startIdx + numItems, batch.size());
auto task =
[&batch, &compressed, &taskCompletion, startIdx, endIdx, keyBytes = keyBytes_]() {
// Compress the items assigned to this task.
for (size_t i = startIdx; i < endIdx; ++i)
{
auto& item = compressed[i];
try
{
EncodedBlob e(batch[i]);
// Copy the key data to avoid dangling pointer.
auto const* keyPtr = static_cast<std::uint8_t const*>(e.getKey());
item.key.assign(keyPtr, keyPtr + keyBytes);
// Compress and copy the data to avoid dangling pointer.
nudb::detail::buffer bf;
auto const comp = nodeobject_compress(e.getData(), e.getSize(), bf);
auto const* dataPtr = static_cast<std::uint8_t const*>(comp.first);
item.data.assign(dataPtr, dataPtr + comp.second);
}
catch (...)
{
// Store the exception so it can be rethrown in the sequential phase
// below.
item.eptr = std::current_exception();
}
}
// Signal task completion.
taskCompletion.count_down();
};
boost::asio::post(threadPool_, std::move(task));
}
// Wait for all compression tasks to complete.
taskCompletion.wait();
// Insert the compressed data sequentially, since NuDB is designed as an append-only data
// store that limits concurrent writes.
for (auto const& item : compressed)
{
if (item.eptr)
{
pendingWrites_ -= batch.size();
std::rethrow_exception(item.eptr);
}
nudb::error_code ec;
db_.insert(item.key.data(), item.data.data(), item.data.size(), ec);
if (ec && ec != nudb::error::key_exists)
{
pendingWrites_ -= batch.size();
Throw<nudb::system_error>(ec);
}
}
pendingWrites_ -= batch.size();
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
scheduler_.onBatchWrite(report);
@@ -276,7 +491,7 @@ public:
auto const dp = db_.dat_path();
auto const kp = db_.key_path();
auto const lp = db_.log_path();
// auto const appnum = db_.appnum();
nudb::error_code ec;
db_.close(ec);
if (ec)
@@ -310,7 +525,7 @@ public:
int
getWriteLoad() override
{
return 0;
return pendingWrites_.load();
}
void
@@ -345,6 +560,33 @@ public:
}
private:
/** Calculate optimal parallelization parameters for batch operations.
Determines the number of items per thread and actual number of tasks needed for parallel
batch processing, ensuring no thread has an invalid start index.
@param batchSize Number of items to process
@return A pair of (actualTasks, numItems) where actualTasks is the exact number of threads
to create and numItems is the number of items per thread.
*/
std::pair<unsigned int, unsigned int>
calculateBatchParallelization(std::size_t batchSize) const
{
// Estimate the number of threads using ceiling division: aim for at least 4 items per
// thread, but don't exceed the number of available hardware threads.
auto const numThreads =
std::min(static_cast<unsigned int>((batchSize + 3) / 4), numHardwareThreads);
// Calculate items per thread.
auto const numItems = (batchSize + numThreads - 1) / numThreads;
// Calculate actual tasks needed. After rounding up numItems, we may need fewer threads than
// initially estimated.
auto const actualTasks = (batchSize + numItems - 1) / numItems;
return {actualTasks, numItems};
}
static std::size_t
parseBlockSize(std::string const& name, Section const& keyValues, beast::Journal journal)
{
@@ -385,6 +627,23 @@ private:
Throw<std::runtime_error>(s.str());
}
}
// "appnum" is an application-defined constant stored in the header of a
// NuDB database. We used it to identify shard databases before that code
// was removed. For now, its only use is a sanity check that the database
// was created by xrpld.
static constexpr std::uint64_t appnum = 1;
beast::Journal const j_;
size_t const keyBytes_;
std::size_t const burstSize_;
std::string const name_;
std::size_t const blockSize_;
nudb::store db_;
std::atomic<bool> deletePath_;
Scheduler& scheduler_;
std::atomic<size_t> pendingWrites_{0};
boost::asio::thread_pool threadPool_;
};
//------------------------------------------------------------------------------

View File

@@ -13,6 +13,7 @@
#include <atomic>
#include <memory>
#include <thread>
namespace xrpl {
namespace NodeStore {
@@ -185,6 +186,41 @@ public:
}
}
// Enable pipelined writes for better write concurrency.
m_options.enable_pipelined_write = true;
// Set background job parallelism for better compaction/flush performance to the number of
// hardware threads, unless the value is explicitly provided in the config. The default is
// 2 (see include/rocksdb/options.h in the Conan dependency directory), so don't use fewer
// than that if no value is explicitly provided.
if (keyValues.exists("max_background_jobs"))
{
m_options.max_background_jobs = get<unsigned int>(keyValues, "max_background_jobs");
}
else if (auto v = numHardwareThreads; v > 2)
{
m_options.max_background_jobs = v;
}
// Set subcompactions for parallel compaction within a job to the number of hardware
// threads, unless the value is explicitly provided in the config. The default is 1 (see
// include/rocksdb/options.h in the Conan dependency directory), so don't use fewer
// than that if no value is explicitly provided.
if (keyValues.exists("max_subcompactions"))
{
m_options.max_subcompactions = get<unsigned int>(keyValues, "max_subcompactions");
}
else if (auto v = numHardwareThreads / 2; v > 1)
{
m_options.max_subcompactions = v;
}
// Enable direct I/O by default unless explicitly disabled in the config. This bypasses the
// OS page cache for better predictable performance on SSDs.
m_options.use_direct_reads = get<bool>(keyValues, "use_direct_io", true);
m_options.use_direct_io_for_flush_and_compaction =
get<bool>(keyValues, "use_direct_io", true);
std::string s1, s2;
rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
@@ -259,23 +295,19 @@ public:
rocksdb::ReadOptions const options;
rocksdb::Slice const slice(std::bit_cast<char const*>(hash.data()), m_keyBytes);
std::string string;
rocksdb::Status getStatus = m_db->Get(options, slice, &string);
if (getStatus.ok())
{
DecodedBlob decoded(hash.data(), string.data(), string.size());
if (decoded.wasOk())
{
*pObject = decoded.createObject();
}
else
{
// Decoding failed, probably corrupted!
//
// Decoding failed, probably corrupted.
status = dataCorrupt;
}
}
@@ -292,7 +324,6 @@ public:
else
{
status = Status(customCode + unsafe_cast<int>(getStatus.code()));
JLOG(m_journal.error()) << getStatus.ToString();
}
}
@@ -303,19 +334,44 @@ public:
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::fetchBatch : non-null database");
if (hashes.empty())
{
return {{}, ok};
}
// Use MultiGet for parallel reads to allow RocksDB to fetch multiple keys concurrently,
// significantly improving throughput compared to sequential fetch() calls.
std::vector<rocksdb::Slice> keys;
keys.reserve(hashes.size());
for (auto const& h : hashes)
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h, &nObj);
if (status != ok)
keys.emplace_back(std::bit_cast<char const*>(h.data()), m_keyBytes);
}
rocksdb::ReadOptions options;
options.async_io = true; // Enable for better concurrency on supported platforms.
std::vector<std::string> values(hashes.size());
auto statuses = m_db->MultiGet(options, keys, &values);
std::vector<std::shared_ptr<NodeObject>> results(hashes.size());
for (size_t i = 0; i < hashes.size(); ++i)
{
if (statuses[i].ok())
{
results.push_back({});
DecodedBlob decoded(hashes[i].data(), values[i].data(), values[i].size());
if (decoded.wasOk())
{
results[i] = decoded.createObject();
}
}
else
else if (!statuses[i].IsNotFound())
{
results.push_back(nObj);
// Log other errors but continue processing.
JLOG(m_journal.warn()) << "fetchBatch: MultiGet error for key "
<< keys[i].ToString() << ": " << statuses[i].ToString();
}
}
@@ -331,25 +387,45 @@ public:
void
storeBatch(Batch const& batch) override
{
XRPL_ASSERT(
m_db,
"xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
"database");
rocksdb::WriteBatch wb;
XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null database");
if (batch.empty())
{
return;
}
rocksdb::WriteBatch wb;
for (auto const& e : batch)
{
EncodedBlob encoded(e);
wb.Put(
rocksdb::Slice(std::bit_cast<char const*>(encoded.getKey()), m_keyBytes),
rocksdb::Slice(std::bit_cast<char const*>(encoded.getData()), encoded.getSize()));
}
rocksdb::WriteOptions const options;
// Configure WriteOptions for high throughput.
// Note: no_slowdown is intentionally NOT set here. When set to true, RocksDB returns an
// error instead of stalling when write buffers are full, which could cause write
// failures during high load. We prefer to accept brief stalls over dropped writes.
rocksdb::WriteOptions options;
// Setting `sync = false` improves write throughput significantly by allowing the OS to
// batch fsync operations, rather than forcing immediate disk synchronization on every
// write. The Write-Ahead Log (WAL) is still written and flushed, so database consistency is
// maintained across clean restarts and crashes.
//
// Note: On hard shutdown up to a few seconds of recent writes (since the last OS-initiated
// flush) may be lost from this node. However, since ledger data is replicated across
// the network, lost writes can be re-synced from peers during startup.
options.sync = false;
// Keep WAL enabled for crash recovery consistency.
options.disableWAL = false;
// Ensure RocksDB will not aggressive throttle the writes.
options.low_pri = false;
auto ret = m_db->Write(options, &wb);
if (!ret.ok())
Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
}

View File

@@ -520,6 +520,13 @@ public:
srcParams.set("type", srcBackendType);
srcParams.set("path", node_db.path());
beast::temp_dir dest_db;
Section destParams;
destParams.set("type", destBackendType);
destParams.set("path", dest_db.path());
testcase("import into '" + destBackendType + "' from '" + srcBackendType + "'");
// Create a batch
auto batch = createPredictableBatch(numObjectsToTest, seedValue);
@@ -538,16 +545,9 @@ public:
Manager::instance().make_Database(megabytes(4), scheduler, 2, srcParams, journal_);
// Set up the destination database
beast::temp_dir dest_db;
Section destParams;
destParams.set("type", destBackendType);
destParams.set("path", dest_db.path());
std::unique_ptr<Database> dest =
Manager::instance().make_Database(megabytes(4), scheduler, 2, destParams, journal_);
testcase("import into '" + destBackendType + "' from '" + srcBackendType + "'");
// Do the import
dest->importDatabase(*src);