diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 423a7506b..175f1e5ee 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -323,6 +323,9 @@ public: void transactionBatch(); + void + forceTransactionBatch(); + /** * Attempt to apply transactions and post-process based on the results. * @@ -1371,6 +1374,17 @@ NetworkOPsImp::doTransactionSync( } while (transaction->getApplying()); } +void +NetworkOPsImp::forceTransactionBatch() +{ + std::unique_lock lock(mMutex); + mDispatchState = DispatchState::scheduled; + while (mTransactions.size()) + { + apply(lock); + } +} + void NetworkOPsImp::transactionBatch() { diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index abfdd1898..fe062c420 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -139,6 +139,9 @@ public: bool bLocal, FailHard failType = FailHard::no) = 0; + virtual void + forceTransactionBatch() = 0; + //-------------------------------------------------------------------------- // // Owner functions diff --git a/src/ripple/basics/ThreadLocalQueue.h b/src/ripple/basics/ThreadLocalQueue.h index 436632e9e..7610459e8 100644 --- a/src/ripple/basics/ThreadLocalQueue.h +++ b/src/ripple/basics/ThreadLocalQueue.h @@ -1,133 +1,318 @@ -#ifndef RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED -#define RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED - #include +#include #include #include #include +#include +#include #include -#include -namespace ripple { +#define DEBUG_TLQ 0 + +namespace detail { + +enum class Operation { PUSH_BACK, MERGE_AND_CLEAR, SIZE, REGISTER, UNREGISTER }; + +// Thread-specific data including mutex and vector +template +struct ThreadData +{ + std::mutex mutex; + std::vector queue; +}; + +// Structure to hold per-instance data +template +struct InstanceData +{ + std::mutex mutex; + std::atomic total_size{0}; + std::unordered_map>> + thread_queues; +}; + +// Global map of instance IDs to their data +template +struct GlobalInstances +{ + std::mutex mutex; + std::unordered_map>> instances; + + static GlobalInstances& + instance() + { + static GlobalInstances inst; + return inst; + } +}; + +// Helper to get or create instance data +template +std::shared_ptr> +get_instance_data(uintptr_t instance_id) +{ + auto& global = GlobalInstances::instance(); + std::lock_guard lock(global.mutex); + auto it = global.instances.find(instance_id); + if (it == global.instances.end()) + { + auto data = std::make_shared>(); + global.instances[instance_id] = data; + return data; + } + return it->second; +} + +// Generic dispatch function that handles all operations +template +std::pair, size_t> +dispatch(uintptr_t instance_id, Operation op, T* item = nullptr) +{ + auto instance_data = get_instance_data(instance_id); + auto thread_id = std::this_thread::get_id(); + + switch (op) + { + case Operation::MERGE_AND_CLEAR: { + auto current_size = + instance_data->total_size.load(std::memory_order_acquire); + if (DEBUG_TLQ) + std::cout << "start merge, counter=" << current_size << "\n"; + + if (current_size == 0) + { + return {std::vector(), 0}; + } + + std::vector result; + size_t total_size = 0; + + // Lock the entire instance data to prevent concurrent modifications + std::lock_guard instance_lock(instance_data->mutex); + + // First calculate total size + for (const auto& queue_entry : instance_data->thread_queues) + { + auto queue_size = queue_entry.second->queue.size(); + total_size += queue_size; + if (DEBUG_TLQ) + std::cout << "queue size for thread " << queue_entry.first + << ": " << queue_size << "\n"; + } + + if (DEBUG_TLQ) + std::cout << "merge total size: " << total_size << "\n"; + result.reserve(total_size); + + // Move all elements from all thread queues + for (auto& queue_entry : instance_data->thread_queues) + { + auto& queue = queue_entry.second->queue; + std::lock_guard queue_lock( + queue_entry.second->mutex); + + // For non-movable types, we need to copy elements individually + for (auto& item : queue) + { + result.push_back( + item); // Will use copy constructor instead of move + } + if (DEBUG_TLQ) + std::cout << "copied queue of size: " << queue.size() + << " for thread " << queue_entry.first << "\n"; + queue.clear(); + } + + instance_data->total_size.store(0, std::memory_order_release); + if (DEBUG_TLQ) + std::cout << "merge size: " << result.size() << ", counter: " + << instance_data->total_size.load( + std::memory_order_acquire) + << "\n"; + return {std::move(result), 0}; + } + + case Operation::REGISTER: { + std::lock_guard lock(instance_data->mutex); + if (instance_data->thread_queues.find(thread_id) == + instance_data->thread_queues.end()) + { + instance_data->thread_queues[thread_id] = + std::make_unique>(); + } + return {std::vector(), 0}; + } + + case Operation::PUSH_BACK: { + // Ensure thread is registered + { + std::lock_guard lock(instance_data->mutex); + if (instance_data->thread_queues.find(thread_id) == + instance_data->thread_queues.end()) + { + instance_data->thread_queues[thread_id] = + std::make_unique>(); + } + } + + auto& thread_data = instance_data->thread_queues[thread_id]; + std::lock_guard lock(thread_data->mutex); + + if (item) + { + thread_data->queue.push_back(std::move(*item)); + instance_data->total_size.fetch_add( + 1, std::memory_order_release); + } + + if (DEBUG_TLQ) + std::cout << "pushed, counter=" + << instance_data->total_size.load( + std::memory_order_acquire) + << "\n"; + return {std::vector(), 0}; + } + + case Operation::SIZE: { + return { + std::vector(), + instance_data->total_size.load(std::memory_order_acquire)}; + } + + case Operation::UNREGISTER: { + std::lock_guard lock(instance_data->mutex); + if (auto it = instance_data->thread_queues.find(thread_id); + it != instance_data->thread_queues.end()) + { + size_t queue_size = it->second->queue.size(); + instance_data->total_size.fetch_sub( + queue_size, std::memory_order_release); + instance_data->thread_queues.erase(it); + } + return {std::vector(), 0}; + } + + default: + return {std::vector(), 0}; + } +} + +} // namespace detail + template class ThreadLocalQueue { -private: - static inline thread_local std::vector local_queue; - static inline thread_local size_t local_count{0}; - - mutable std::recursive_mutex mutex; - std::set*> thread_queues; - std::atomic total_count{0}; - public: + ThreadLocalQueue() : instance_id(reinterpret_cast(this)) + { + // Initial registration happens in the constructor thread + detail::dispatch(instance_id, detail::Operation::REGISTER); + } + + ~ThreadLocalQueue() + { + // Clean up instance data when the queue is destroyed + auto& global = detail::GlobalInstances::instance(); + std::lock_guard lock(global.mutex); + global.instances.erase(instance_id); + } + + // Prevent copying to avoid counter complications + ThreadLocalQueue(const ThreadLocalQueue&) = delete; + ThreadLocalQueue& + operator=(const ThreadLocalQueue&) = delete; + + // Move operations + ThreadLocalQueue(ThreadLocalQueue&& other) noexcept + : instance_id(other.instance_id) + { + other.instance_id = 0; + } + + ThreadLocalQueue& + operator=(ThreadLocalQueue&& other) noexcept + { + if (this != &other) + { + auto& global = detail::GlobalInstances::instance(); + std::lock_guard lock(global.mutex); + global.instances.erase(instance_id); + instance_id = other.instance_id; + other.instance_id = 0; + } + return *this; + } + + // Explicitly register current thread void register_thread() noexcept { - std::lock_guard lock(mutex); - thread_queues.emplace(&local_queue); - std::cout << "Thread registered: " << reinterpret_cast(&local_queue) << "\n"; - local_count = 0; + detail::dispatch(instance_id, detail::Operation::REGISTER); } void unregister_thread() noexcept { - std::lock_guard lock(mutex); - auto it = - std::find(thread_queues.begin(), thread_queues.end(), &local_queue); - if (it != thread_queues.end()) - { - thread_queues.erase(it); - } - total_count.fetch_sub(local_count, std::memory_order_relaxed); - local_queue.clear(); - local_count = 0; + detail::dispatch(instance_id, detail::Operation::UNREGISTER); } void push_back(T item) noexcept { - static thread_local bool registered = false; - if (!registered) - { - register_thread(); - registered = true; - } - std::lock_guard lock(mutex); - local_queue.push_back(std::move(item)); - local_count++; - total_count.fetch_add(1, std::memory_order_relaxed); + detail::dispatch(instance_id, detail::Operation::PUSH_BACK, &item); } std::vector merge_and_clear() noexcept { - std::lock_guard lock(mutex); - - size_t expected_size = total_count.load(std::memory_order_relaxed); - std::vector merged; - merged.reserve(expected_size); - - for (auto* queue : thread_queues) - { - merged.insert( - merged.end(), - std::make_move_iterator(queue->begin()), - std::make_move_iterator(queue->end())); - queue->clear(); - queue->shrink_to_fit(); - } - - total_count.store(0, std::memory_order_relaxed); - for (auto* queue : thread_queues) - { - *const_cast(&local_count) = 0; - } - - return merged; - } - - size_t - local_size() const noexcept - { - std::lock_guard lock(mutex); - return local_count; + return detail::dispatch( + instance_id, detail::Operation::MERGE_AND_CLEAR) + .first; } size_t size() const noexcept { - std::lock_guard lock(mutex); - return total_count.load(std::memory_order_relaxed); + return detail::dispatch(instance_id, detail::Operation::SIZE).second; } bool empty() const noexcept { - std::lock_guard lock(mutex); - return total_count.load(std::memory_order_relaxed) == 0; + return size() == 0; + } + + void + swap(std::vector& other) noexcept + { + auto merged = merge_and_clear(); + merged.swap(other); } void swap(ThreadLocalQueue& other) noexcept { if (this == &other) - return; - - // With recursive mutex, we can simply lock both mutexes sequentially { - std::scoped_lock(mutex, other.mutex); - - // Swap the thread queues - thread_queues.swap(other.thread_queues); - - // Swap the total counts using atomic exchange - size_t this_count = total_count.load(std::memory_order_relaxed); - size_t other_count = other.total_count.load(std::memory_order_relaxed); - total_count.store(other_count, std::memory_order_relaxed); - other.total_count.store(this_count, std::memory_order_relaxed); + return; } + + auto& global = detail::GlobalInstances::instance(); + + // Get the global lock first + std::lock_guard global_lock(global.mutex); + + // Then get individual instance locks in a consistent order + auto my_instance = global.instances[instance_id]; + auto other_instance = global.instances[other.instance_id]; + + std::lock_guard lock1(my_instance->mutex); + std::lock_guard lock2(other_instance->mutex); + + // Now safe to swap instance IDs and data + std::swap(instance_id, other.instance_id); + std::swap( + global.instances[instance_id], global.instances[other.instance_id]); } friend void @@ -136,63 +321,6 @@ public: lhs.swap(rhs); } - void - swap(std::vector& vec) noexcept - { - std::lock_guard lock(mutex); - - std::vector temp; - temp.swap(vec); - - // Move all our queue contents into vec - size_t total_size = total_count.load(std::memory_order_relaxed); - - vec.reserve(total_size); - - // For each queue - int i = 0; - for (auto* queue : thread_queues) - { - // Move elements one at a time to handle non-copyable types - while (!queue->empty()) - { - vec.push_back(std::move(queue->back())); - queue->pop_back(); - } - queue->shrink_to_fit(); - } - - // Move temp's contents to the first available queue - if (!thread_queues.empty()) - { - auto* first_queue = *thread_queues.begin(); - // Move elements one at a time from temp to queue - while (!temp.empty()) - { - first_queue->push_back(std::move(temp.back())); - temp.pop_back(); - } - local_count = first_queue->size(); - total_count.store(local_count, std::memory_order_relaxed); - } - else - { - total_count.store(0, std::memory_order_relaxed); - } - } - - friend void - swap(ThreadLocalQueue& tlq, std::vector& vec) noexcept - { - tlq.swap(vec); - } - - friend void - swap(std::vector& vec, ThreadLocalQueue& tlq) noexcept - { - tlq.swap(vec); - } +private: + uintptr_t instance_id; }; - -} // namespace ripple -#endif // THREAD_LOCAL_QUEUE_HPP diff --git a/src/test/app/LedgerStress_test.cpp b/src/test/app/LedgerStress_test.cpp index 61f870bdc..1fb71a544 100644 --- a/src/test/app/LedgerStress_test.cpp +++ b/src/test/app/LedgerStress_test.cpp @@ -19,14 +19,14 @@ using namespace jtx; class LedgerStress_test : public beast::unit_test::suite { private: - static constexpr std::size_t TXN_PER_LEDGER = 100; - static constexpr std::size_t MAX_TXN_PER_ACCOUNT = 1; + static constexpr std::size_t TXN_PER_LEDGER = 20000; + static constexpr std::size_t MAX_TXN_PER_ACCOUNT = 5; static constexpr std::chrono::seconds MAX_CLOSE_TIME{6}; static constexpr std::size_t REQUIRED_ACCOUNTS = (TXN_PER_LEDGER + MAX_TXN_PER_ACCOUNT - 1) / MAX_TXN_PER_ACCOUNT; // Get number of hardware threads and use half - const std::size_t NUM_THREADS = 1;//std::max(std::thread::hardware_concurrency() / 2, 1u); + const std::size_t NUM_THREADS = std::max(std::thread::hardware_concurrency() / 2, 1u); struct LedgerMetrics { std::chrono::milliseconds submitTime{0}; @@ -86,39 +86,49 @@ private: env.close(); return accounts; } + + // Structure to hold work assignment for each thread + struct ThreadWork { + std::size_t startAccountIdx; + std::size_t endAccountIdx; + std::size_t numTxnsToSubmit; + }; void submitBatchThread( jtx::Env& env, - std::vector const& senders, - std::vector const& allAccounts, - std::size_t startIdx, - std::size_t endIdx, - std::size_t txPerAccount) + std::vector const& accounts, + ThreadWork const& work) { - static thread_local int tmp = 0; - std::cout << "submitBatchThread " << reinterpret_cast(&tmp) << "\n"; - - for (std::size_t i = startIdx; i < endIdx; ++i) + auto const escalatedFee = getEscalatedFee(env); + std::size_t txnsSubmitted = 0; + + for (std::size_t i = work.startAccountIdx; + i < work.endAccountIdx && txnsSubmitted < work.numTxnsToSubmit; ++i) { - auto const& sender = senders[i]; + auto const& sender = accounts[i]; + // Create list of possible recipients (everyone except sender) std::vector recipients; - recipients.reserve(allAccounts.size() - 1); - for (auto const& acct : allAccounts) + recipients.reserve(accounts.size() - 1); + for (auto const& acct : accounts) { if (acct.id() != sender.id()) recipients.push_back(acct); } - auto const escalatedFee = getEscalatedFee(env); + // Calculate how many txns to submit from this account + std::size_t txnsRemaining = work.numTxnsToSubmit - txnsSubmitted; + std::size_t txnsForAccount = std::min(MAX_TXN_PER_ACCOUNT, txnsRemaining); - for (std::size_t tx = 0; tx < txPerAccount; ++tx) + // Submit transactions + for (std::size_t tx = 0; tx < txnsForAccount; ++tx) { auto const& recipient = recipients[tx % recipients.size()]; env.inject(pay(sender, recipient, XRP(1)), fee(escalatedFee), seq(autofill)); + ++txnsSubmitted; } } } @@ -130,19 +140,14 @@ private: " ledgers using " + std::to_string(NUM_THREADS) + " threads"); Env env{*this, envconfig(many_workers)}; - - Application* appPtr = env.getApp(); - - std::cout << "Application ptr: " << reinterpret_cast(appPtr) << "\n"; - - auto const journal = env.app().journal("LedgerStressTest"); - env.app().config().MAX_TRANSACTIONS = 100000; - // Get actual hardware thread count and ensure consistent types + auto const journal = env.app().journal("LedgerStressTest"); + + // Get actual hardware thread count std::size_t hardwareThreads = static_cast(std::thread::hardware_concurrency()); if (hardwareThreads == 0) - hardwareThreads = 4; // Fallback if hardware_concurrency() fails + hardwareThreads = 4; // Fallback const std::size_t THREAD_COUNT = std::min(NUM_THREADS, hardwareThreads); @@ -154,10 +159,6 @@ private: std::vector metrics; metrics.reserve(numLedgers); - // Create thread pool - std::vector threadPool; - threadPool.reserve(THREAD_COUNT); - for (std::size_t ledger = 0; ledger < numLedgers; ++ledger) { threadSafeLog("Starting ledger " + std::to_string(ledger)); @@ -166,57 +167,55 @@ private: auto submitStart = std::chrono::steady_clock::now(); ledgerMetrics.baseFee = env.current()->fees().base; - // Calculate work distribution - std::size_t txnsPerThread = (TXN_PER_LEDGER + THREAD_COUNT - 1) / THREAD_COUNT; - std::size_t accountsPerThread = (accounts.size() + THREAD_COUNT - 1) / THREAD_COUNT; - - // Atomic counter for synchronization - std::atomic completedTxns{0}; - std::mutex submitMutex; - - // Launch worker threads + // Pre-calculate exact work distribution for threads + std::vector threadAssignments; + threadAssignments.reserve(THREAD_COUNT); + + std::size_t totalAccountsAssigned = 0; + std::size_t totalTxnsAssigned = 0; + std::size_t accountsPerThread = accounts.size() / THREAD_COUNT; + for (std::size_t t = 0; t < THREAD_COUNT; ++t) { - std::size_t startIdx = t * accountsPerThread; - std::size_t endIdx = std::min(startIdx + accountsPerThread, accounts.size()); + ThreadWork work; + work.startAccountIdx = totalAccountsAssigned; - threadPool.emplace_back( - [&, startIdx, endIdx]() { - try { - std::size_t localTxnCount = 0; - std::size_t targetTxns = txnsPerThread; - - // Ensure we don't exceed total desired transactions - { - std::lock_guard lock(submitMutex); - std::size_t remaining = TXN_PER_LEDGER - completedTxns; - targetTxns = std::min(txnsPerThread, remaining); - } + // Last thread gets remaining accounts + if (t == THREAD_COUNT - 1) { + work.endAccountIdx = accounts.size(); + work.numTxnsToSubmit = TXN_PER_LEDGER - totalTxnsAssigned; + } + else { + work.endAccountIdx = work.startAccountIdx + accountsPerThread; + work.numTxnsToSubmit = TXN_PER_LEDGER / THREAD_COUNT; + } + + totalAccountsAssigned = work.endAccountIdx; + totalTxnsAssigned += work.numTxnsToSubmit; + threadAssignments.push_back(work); + } + + BEAST_EXPECT(totalTxnsAssigned == TXN_PER_LEDGER); - submitBatchThread( - env, - accounts, - accounts, - startIdx, - endIdx, - targetTxns); - - completedTxns += targetTxns; - } - catch (const std::exception& e) { - threadSafeLog("Thread exception: " + std::string(e.what())); - } + // Launch threads with pre-calculated work assignments + std::vector threads; + threads.reserve(THREAD_COUNT); + + for (std::size_t t = 0; t < THREAD_COUNT; ++t) + { + threads.emplace_back( + [&env, &accounts, work = threadAssignments[t], this]() { + submitBatchThread(env, accounts, work); } ); } - // Wait for all threads to complete - for (auto& thread : threadPool) + // Wait for all threads + for (auto& thread : threads) { if (thread.joinable()) thread.join(); } - threadPool.clear(); ledgerMetrics.submitTime = std::chrono::duration_cast( std::chrono::steady_clock::now() - submitStart); @@ -243,12 +242,10 @@ private: } } - - public: void run() override { - runStressTest(1); + runStressTest(5); } }; diff --git a/src/test/jtx/impl/Env.cpp b/src/test/jtx/impl/Env.cpp index 0c68bca7e..4808fc2e9 100644 --- a/src/test/jtx/impl/Env.cpp +++ b/src/test/jtx/impl/Env.cpp @@ -126,13 +126,17 @@ Env::close( { // Round up to next distinguishable value using namespace std::chrono_literals; + + auto& netOPs = app().getOPs(); + netOPs.forceTransactionBatch(); + bool res = true; closeTime += closed()->info().closeTimeResolution - 1s; timeKeeper().set(closeTime); // Go through the rpc interface unless we need to simulate // a specific consensus delay. if (consensusDelay) - app().getOPs().acceptLedger(consensusDelay); + netOPs.acceptLedger(consensusDelay); else { auto resp = rpc("ledger_accept"); @@ -295,18 +299,17 @@ Env::inject_jtx(JTx const& jt) { std::string reason; // make a copy - STTx* newData = new STTx(*jt.stx); - auto stx = std::shared_ptr(newData); - auto id = stx->getTransactionID(); + //STTx* newData = new STTx(*jt.stx); + //auto stx = std::shared_ptr(newData); + auto id = jt.stx->getTransactionID(); - static std::map> storage; - storage.emplace(id, stx); - - auto tx = std::make_shared(stx, reason, app); + auto tx = std::make_shared(jt.stx, reason, app); static int counter = 0; - std::cout << "inject_jtx [" << counter++ << "] id=" << id << "\n"; - netOPs.processTransaction(tx, false, false); + counter++; + if (counter % 2500 == 0) + std::cout << "inject_jtx [" << counter++ << "] id=" << id << "\n"; + netOPs.processTransaction(tx, true, false); } return postconditions(jt, ter_, true);