diff --git a/include/xrpl/nodestore/Backend.h b/include/xrpl/nodestore/Backend.h
index 6030312b67..24f3d80e7d 100644
--- a/include/xrpl/nodestore/Backend.h
+++ b/include/xrpl/nodestore/Backend.h
@@ -141,6 +141,19 @@ public:
 
     /** The number of hardware threads to use for compression of a batch. */
     static unsigned int const numHardwareThreads;
+
+    /** Calculate parallelization parameters for a batch of items.
+
+        Determines the number of threads and items per thread needed for parallel batch processing.
+
+        @param batchSize Number of items to process
+        @param maxThreadCount Maximum number of threads to use. A value of 0 is treated as 1.
+        @return A pair of (numThreads, numItems) where numThreads is the exact number of threads to
+                use, and numItems is the number of items per thread. The last thread may process
+                fewer items. An empty batch yields (0, 0).
+     */
+    static std::pair<unsigned int, unsigned int>
+    calculateBatchParallelism(unsigned int batchSize, unsigned int maxThreadCount);
 };
 
 }  // namespace NodeStore
diff --git a/src/libxrpl/nodestore/backend/Backend.cpp b/src/libxrpl/nodestore/backend/Backend.cpp
index 65d4e7c1e2..443ae15f61 100644
--- a/src/libxrpl/nodestore/backend/Backend.cpp
+++ b/src/libxrpl/nodestore/backend/Backend.cpp
@@ -14,5 +14,55 @@ unsigned int const Backend::numHardwareThreads = []() {
     return std::min(std::max(hw, 1u), 8u);
 }();
 
+std::pair<unsigned int, unsigned int>
+Backend::calculateBatchParallelism(unsigned int batchSize, unsigned int maxThreadCount)
+{
+    // An empty batch needs no threads. Return early: the thread estimate would otherwise be zero
+    // and the ceiling divisions below would divide by zero.
+    if (batchSize == 0u)
+    {
+        return {0u, 0u};
+    }
+
+    // A thread budget of zero would likewise cause a division by zero. Treat it as a budget of one
+    // and process the whole batch on a single thread.
+    if (maxThreadCount == 0u)
+    {
+        return {1u, batchSize};
+    }
+
+    // Estimate the number of threads using ceiling division: aim for at least 4 items per thread,
+    // but don't exceed the number of available threads. The ceiling divisions in this function are
+    // written as (n - 1) / d + 1, which is valid because n >= 1 here and cannot overflow.
+    auto const initialThreads = std::min((batchSize - 1u) / 4u + 1u, maxThreadCount);
+
+    // Calculate number of items per thread.
+    auto const numItems = (batchSize - 1u) / initialThreads + 1u;
+
+    // Calculate the actual number of threads needed. After rounding up numItems, we may need fewer
+    // threads than initially estimated.
+    auto const actualThreads = (batchSize - 1u) / numItems + 1u;
+
+    // Sanity checks.
+    XRPL_ASSERT(
+        numItems <= batchSize,
+        "xrpl::NodeStore::Backend::calculateBatchParallelism : numItems <= batchSize");
+    XRPL_ASSERT(
+        actualThreads <= batchSize,
+        "xrpl::NodeStore::Backend::calculateBatchParallelism : actualThreads <= batchSize");
+    XRPL_ASSERT(
+        actualThreads <= maxThreadCount,
+        "xrpl::NodeStore::Backend::calculateBatchParallelism : actualThreads <= maxThreadCount");
+    if (numItems > batchSize || actualThreads > batchSize || actualThreads > maxThreadCount)
+    {
+        // LCOV_EXCL_START
+        UNREACHABLE("xrpl::NodeStore::Backend::calculateBatchParallelism : sanity check failed");
+        return {1, batchSize};
+        // LCOV_EXCL_STOP
+    }
+
+    return {actualThreads, numItems};
+}
+
 }  // namespace NodeStore
 }  // namespace xrpl
diff --git a/src/libxrpl/nodestore/backend/NuDBFactory.cpp b/src/libxrpl/nodestore/backend/NuDBFactory.cpp
index 83e6ca3b0a..602f5de005 100644
--- a/src/libxrpl/nodestore/backend/NuDBFactory.cpp
+++ b/src/libxrpl/nodestore/backend/NuDBFactory.cpp
@@ -74,11 +74,32 @@ public:
     {
         try
         {
-            // Wait for all thread pool tasks to complete before closing the database. This prevents
-            // worker threads from accessing the database after close.
+            // Set shutdown flag to prevent new batch operations from starting. This must happen
+            // before stop() is called to ensure fetchBatch/storeBatch check the flag before posting
+            // any new tasks.
+            shutdown_.store(true, std::memory_order_release);
+
+            // Wait for all active operations to complete: spin while either counter is non-zero.
+            while (pendingReads_.load(std::memory_order_acquire) > 0 ||
+                   pendingWrites_.load(std::memory_order_acquire) > 0)
+            {
+                std::this_thread::yield();
+            }
+
+            // Signal the thread pool to stop accepting new work. This ensures no new tasks will be
+            // posted after this point.
+            threadPool_.stop();
+
+            // Wait for all currently executing thread pool tasks to complete. This prevents worker
+            // threads from accessing the database after close().
threadPool_.join(); - // close can throw and we don't want the destructor to throw. + // Verify all writes have completed. + XRPL_ASSERT( + pendingWrites_.load() == 0, "xrpl::NuDBBackend::~NuDBBackend : pendingWrites == 0"); + + // Close the database. At this point, all threads have stopped and no pending reads and + // writes remain, so it's safe to close the database. close(); } catch (nudb::system_error const&) // NOLINT(bugprone-empty-catch) @@ -107,9 +128,7 @@ public: if (db_.is_open()) { // LCOV_EXCL_START - UNREACHABLE( - "xrpl::NodeStore::NuDBBackend::open : database is already " - "open"); + UNREACHABLE("xrpl::NodeStore::NuDBBackend::open : database is already open"); JLOG(j_.error()) << "database is already open"; return; // LCOV_EXCL_STOP @@ -187,6 +206,18 @@ public: Status fetch(uint256 const& hash, std::shared_ptr* pno) override { + // Increment pending reads counter on entry, decrement on exit. This ensures the destructor + // waits for this operation to complete. + ++pendingReads_; + auto guard = [this](void*) { --pendingReads_; }; + std::unique_ptr opGuard(reinterpret_cast(1), guard); + + // Check if we're shutting down. If so, return immediately instead of doing any work. + if (shutdown_.load(std::memory_order_acquire)) + { + return backendError; + } + Status status = ok; pno->reset(); nudb::error_code ec; @@ -226,10 +257,23 @@ public: return {{}, ok}; } + // Increment pending reads counter on entry, decrement on exit. This ensures the destructor + // waits for this operation to complete. + pendingReads_ += hashes.size(); + auto guard = [this, &hashes](void*) { pendingReads_ -= hashes.size(); }; + std::unique_ptr opGuard(reinterpret_cast(1), guard); + + // Check if we're shutting down. If so, return immediately instead of doing any work. + if (shutdown_.load(std::memory_order_acquire)) + { + return {{}, backendError}; + } + std::vector> results(hashes.size()); - // Calculate optimal parallelization parameters for the batch. 
- auto const [numThreads, numItems] = calculateBatchParallelization(hashes.size()); + // Calculate parallelization parameters for the batch. + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(hashes.size(), numHardwareThreads); // If we need only one thread, just do it sequentially. if (numThreads == 1u) @@ -329,21 +373,23 @@ public: void store(std::shared_ptr const& no) override { + // Increment pending writes counter on entry, decrement on exit. This ensures the destructor + // waits for this operation to complete. + ++pendingWrites_; + auto guard = [this](void*) { --pendingWrites_; }; + std::unique_ptr opGuard(reinterpret_cast(1), guard); + + // Check if we're shutting down. If so, return immediately instead of doing any work. + if (shutdown_.load(std::memory_order_acquire)) + { + return; + } + BatchWriteReport report{}; report.writeCount = 1; auto const start = std::chrono::steady_clock::now(); - ++pendingWrites_; - try - { - do_insert(no); - } - catch (...) - { - --pendingWrites_; - throw; - } - --pendingWrites_; + do_insert(no); report.elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); @@ -358,31 +404,33 @@ public: return; } + // Increment pending writes counter on entry, decrement on exit. This ensures the destructor + // waits for this operation to complete. + pendingWrites_ += batch.size(); + auto guard = [this, &batch](void*) { pendingWrites_ -= batch.size(); }; + std::unique_ptr opGuard(reinterpret_cast(1), guard); + + // Check if we're shutting down. If so, return immediately instead of doing any work. + if (shutdown_.load(std::memory_order_acquire)) + { + return; + } + BatchWriteReport report{}; report.writeCount = batch.size(); auto const start = std::chrono::steady_clock::now(); - pendingWrites_ += batch.size(); - - // Calculate optimal parallelization parameters for the batch. 
- auto const [numThreads, numItems] = calculateBatchParallelization(batch.size()); + // Calculate parallelization parameters for the batch. + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batch.size(), numHardwareThreads); // If we need only one thread, just do it sequentially. if (numThreads == 1u) { for (auto const& e : batch) { - try - { - do_insert(e); - } - catch (...) - { - pendingWrites_ -= batch.size(); - throw; - } + do_insert(e); } - pendingWrites_ -= batch.size(); report.elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); @@ -460,7 +508,6 @@ public: { if (item.eptr) { - pendingWrites_ -= batch.size(); std::rethrow_exception(item.eptr); } @@ -468,13 +515,10 @@ public: db_.insert(item.key.data(), item.data.data(), item.data.size(), ec); if (ec && ec != nudb::error::key_exists) { - pendingWrites_ -= batch.size(); Throw(ec); } } - pendingWrites_ -= batch.size(); - report.elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); scheduler_.onBatchWrite(report); @@ -560,33 +604,6 @@ public: } private: - /** Calculate optimal parallelization parameters for batch operations. - - Determines the number of items per thread and actual number of tasks needed for parallel - batch processing, ensuring no thread has an invalid start index. - - @param batchSize Number of items to process - @return A pair of (actualTasks, numItems) where actualTasks is the exact number of threads - to create and numItems is the number of items per thread. - */ - std::pair - calculateBatchParallelization(std::size_t batchSize) const - { - // Estimate the number of threads using ceiling division: aim for at least 4 items per - // thread, but don't exceed the number of available hardware threads. - auto const numThreads = - std::min(static_cast((batchSize + 3) / 4), numHardwareThreads); - - // Calculate items per thread. 
- auto const numItems = (batchSize + numThreads - 1) / numThreads; - - // Calculate actual tasks needed. After rounding up numItems, we may need fewer threads than - // initially estimated. - auto const actualTasks = (batchSize + numItems - 1) / numItems; - - return {actualTasks, numItems}; - } - static std::size_t parseBlockSize(std::string const& name, Section const& keyValues, beast::Journal journal) { @@ -642,8 +659,13 @@ private: nudb::store db_; std::atomic deletePath_; Scheduler& scheduler_; - std::atomic pendingWrites_{0}; - boost::asio::thread_pool threadPool_; + std::atomic pendingReads_{ + 0}; // Declare before threadPool_ to ensure it's destroyed after. + std::atomic pendingWrites_{ + 0}; // Declare before threadPool_ to ensure it's destroyed after. + std::atomic shutdown_{ + false}; // Declare before threadPool_ to ensure it's destroyed after. + boost::asio::thread_pool threadPool_; // Declare after db_ to ensure it's destroyed before. }; //------------------------------------------------------------------------------ diff --git a/src/tests/libxrpl/CMakeLists.txt b/src/tests/libxrpl/CMakeLists.txt index a82ed1472f..eb1a760967 100644 --- a/src/tests/libxrpl/CMakeLists.txt +++ b/src/tests/libxrpl/CMakeLists.txt @@ -53,3 +53,7 @@ if(NOT WIN32) target_link_libraries(xrpl.test.net PRIVATE xrpl.imports.test) add_dependencies(xrpl.tests xrpl.test.net) endif() + +xrpl_add_test(nodestore) +target_link_libraries(xrpl.test.nodestore PRIVATE xrpl.imports.test) +add_dependencies(xrpl.tests xrpl.test.nodestore) diff --git a/src/tests/libxrpl/nodestore/BatchParallelism.cpp b/src/tests/libxrpl/nodestore/BatchParallelism.cpp new file mode 100644 index 0000000000..f346323aa8 --- /dev/null +++ b/src/tests/libxrpl/nodestore/BatchParallelism.cpp @@ -0,0 +1,334 @@ +#include + +#include + +#include +#include + +using namespace xrpl; +using namespace xrpl::NodeStore; + +// Helper function to convert the pair result into ranges for testing. 
+std::vector> +calculateRanges(unsigned int batchSize, unsigned int maxThreadCount) +{ + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + + std::vector> ranges; + ranges.reserve(numThreads); + + for (unsigned int t = 0; t < numThreads; ++t) + { + auto const startIdx = t * numItems; + auto const endIdx = std::min(startIdx + numItems, batchSize); + ranges.emplace_back(startIdx, endIdx); + } + + return ranges; +} + +TEST(BatchParallelism, EmptyBatch) +{ + // Empty batch should return 0 threads. + { + auto const batchSize = 0u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 0u); + EXPECT_EQ(numItems, 0u); + + // Verify ranges calculation. + auto const ranges = calculateRanges(batchSize, maxThreadCount); + EXPECT_EQ(ranges.size(), numThreads); + } +} + +TEST(BatchParallelism, SmallBatches) +{ + // Batch size 1 should use 1 thread. + { + auto const batchSize = 1u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 1u); + EXPECT_EQ(numItems, 1u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + EXPECT_EQ(ranges[0].first, 0u); + EXPECT_EQ(ranges[0].second, 1u); + } + + // Batch size 2 should use 1 thread. + { + auto const batchSize = 2u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 1u); + EXPECT_EQ(numItems, 2u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + EXPECT_EQ(ranges[0].first, 0u); + EXPECT_EQ(ranges[0].second, 2u); + } + + // Batch size 3 should use 1 thread. 
+ { + auto const batchSize = 3u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 1u); + EXPECT_EQ(numItems, 3u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + EXPECT_EQ(ranges[0].first, 0u); + EXPECT_EQ(ranges[0].second, 3u); + } + + // Batch size 4 should use 1 thread (exactly 4 items). + { + auto const batchSize = 4u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 1u); + EXPECT_EQ(numItems, 4u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + EXPECT_EQ(ranges[0].first, 0u); + EXPECT_EQ(ranges[0].second, 4u); + } +} + +TEST(BatchParallelism, MediumBatches) +{ + // Batch size 5 should use 2 threads. + { + auto const batchSize = 5u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 2u); // ceil(5/4) = 2 + EXPECT_EQ(numItems, 3u); // ceil(5/2) = 3 + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + EXPECT_EQ(ranges[0].first, 0u); + EXPECT_EQ(ranges[0].second, 3u); + EXPECT_EQ(ranges[1].first, 3u); + EXPECT_EQ(ranges[1].second, 5u); + } + + // Batch size 8 should use 2 threads. 
+ { + auto const batchSize = 8u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 2u); + EXPECT_EQ(numItems, 4u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + } + + // Batch size 15 should use 4 threads (ceil(15/4) = 4). + { + auto const batchSize = 15u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 4u); + EXPECT_EQ(numItems, 4u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads - 1; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + EXPECT_EQ(ranges[numThreads - 1].first, (numThreads - 1) * numItems); + EXPECT_EQ(ranges[numThreads - 1].second, batchSize); // Last range gets remaining items. + } + + // Batch size 22 should use 6 threads. + { + auto const batchSize = 22u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 6u); // ceil(22/4) = 6 + EXPECT_EQ(numItems, 4u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads - 1; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + EXPECT_EQ(ranges[numThreads - 1].first, (numThreads - 1) * numItems); + EXPECT_EQ(ranges[numThreads - 1].second, batchSize); + } + + // Batch size 32 should use 8 threads. 
+ { + auto const batchSize = 32u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 8u); + EXPECT_EQ(numItems, 4u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + } +} + +TEST(BatchParallelism, LargeBatches) +{ + // Batch size 100 should use 8 threads (max limit). + { + auto const batchSize = 100u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 8u); + EXPECT_EQ(numItems, 13u); // ceil(100/8) = 13 + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads - 1; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + EXPECT_EQ(ranges[numThreads - 1].first, (numThreads - 1) * numItems); + EXPECT_EQ(ranges[numThreads - 1].second, batchSize); + } + + // Batch size 1000 with 8 hw threads. + { + auto const batchSize = 1000u; + auto const maxThreadCount = 8u; + + auto const [numThreads, numItems] = + Backend::calculateBatchParallelism(batchSize, maxThreadCount); + EXPECT_EQ(numThreads, 8u); + EXPECT_EQ(numItems, 125u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + } +} + +TEST(BatchParallelism, HardwareThreadLimits) +{ + // With only 1 thread available. 
+    {
+        auto const batchSize = 100u;
+        auto const maxThreadCount = 1u;
+
+        auto const [numThreads, numItems] =
+            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
+        EXPECT_EQ(numThreads, 1u);  // min(ceil(100/4), 1) = 1, capped by maxThreadCount.
+        EXPECT_EQ(numItems, 100u);
+
+        auto const ranges = calculateRanges(batchSize, maxThreadCount);
+        ASSERT_EQ(ranges.size(), numThreads);
+        EXPECT_EQ(ranges[0].first, 0u);
+        EXPECT_EQ(ranges[0].second, 100u);
+    }
+
+    // With 2 threads available.
+    {
+        auto const batchSize = 50u;
+        auto const maxThreadCount = 2u;
+
+        auto const [numThreads, numItems] =
+            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
+        EXPECT_EQ(numThreads, 2u);
+        EXPECT_EQ(numItems, 25u);
+
+        auto const ranges = calculateRanges(batchSize, maxThreadCount);
+        ASSERT_EQ(ranges.size(), numThreads);
+        for (size_t i = 0; i < numThreads; ++i)
+        {
+            EXPECT_EQ(ranges[i].first, i * numItems);
+            EXPECT_EQ(ranges[i].second, (i + 1) * numItems);
+        }
+    }
+
+    // With 12 threads available, of which only 10 end up being used.
+    {
+        auto const batchSize = 50u;
+        auto const maxThreadCount = 12u;
+
+        auto const [numThreads, numItems] =
+            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
+        EXPECT_EQ(numThreads, 10u);  // min(ceil(50/4), 12) = 12 estimated; ceil(50/5) = 10 used.
+        EXPECT_EQ(numItems, 5u);  // ceil(50/12) = 5.
+
+        auto const ranges = calculateRanges(batchSize, maxThreadCount);
+        ASSERT_EQ(ranges.size(), numThreads);
+        for (size_t i = 0; i < numThreads; ++i)
+        {
+            EXPECT_EQ(ranges[i].first, i * numItems);
+            EXPECT_EQ(ranges[i].second, (i + 1) * numItems);
+        }
+    }
+
+    // With far more threads available than the batch can use.
+    {
+        auto const batchSize = 20u;
+        auto const maxThreadCount = 100u;
+
+        auto const [numThreads, numItems] =
+            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
+        EXPECT_EQ(numThreads, 5u);  // ceil(20/4) = 5, limited by batch size.
+ EXPECT_EQ(numItems, 4u); + + auto const ranges = calculateRanges(batchSize, maxThreadCount); + ASSERT_EQ(ranges.size(), numThreads); + for (size_t i = 0; i < numThreads; ++i) + { + EXPECT_EQ(ranges[i].first, i * numItems); + EXPECT_EQ(ranges[i].second, (i + 1) * numItems); + } + } +} diff --git a/src/tests/libxrpl/nodestore/main.cpp b/src/tests/libxrpl/nodestore/main.cpp new file mode 100644 index 0000000000..5142bbe08a --- /dev/null +++ b/src/tests/libxrpl/nodestore/main.cpp @@ -0,0 +1,8 @@ +#include + +int +main(int argc, char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}