//------------------------------------------------------------------------------
// Reconstructed from a mangled patch: the original diff lost every
// #include target and template-argument list (HTML-stripping artifact).
// Template arguments below are restored from usage.
//
// NOTE(review): the companion NetworkOPs.cpp hunk replaces
// `std::vector<TransactionStatus> mTransactions` with
// `ThreadLocalQueue<TransactionStatus>` and appears to delete the
// `app_.openLedger().modify(...)` call while keeping its lambda body —
// that hunk cannot compile as shown; confirm against the real patch.
//------------------------------------------------------------------------------
#ifndef RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED
#define RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED

#include <atomic>
#include <cstddef>
#include <iterator>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

namespace ripple {

/** A queue that buffers pushed items in per-thread vectors and drains them
    all at once.

    Threads register lazily on first push_back(); merge_and_clear() (or
    swap(std::vector&)) moves every registered thread's buffer into a single
    vector.

    Thread-safety: every member takes `mutex`, so all operations are
    serialized.  A thread MUST call unregister_thread() before it exits,
    otherwise `thread_queues` keeps a dangling pointer to its destroyed
    thread_local buffer.

    NOTE(review): `local_queue` is `static thread_local`, so one buffer per
    thread is shared by EVERY ThreadLocalQueue<T> instance.  This class is
    only safe while a single instance per T exists in the process — confirm
    at the call sites (NetworkOPsImp::mTransactions appears to be the only
    one).
*/
template <typename T>
class ThreadLocalQueue
{
private:
    // The calling thread's buffer.  Owned by thread-local storage, merely
    // pointed to by `thread_queues`.
    static inline thread_local std::vector<T> local_queue;

    // Guards thread_queues, total_count, and every registered buffer.
    // Plain mutex: no member locks while already holding the lock, so the
    // original recursive_mutex was unnecessary.
    mutable std::mutex mutex;

    // Buffers of all registered (live) threads.
    std::set<std::vector<T>*> thread_queues;

    // Total number of buffered elements across all registered threads.
    std::atomic<std::size_t> total_count{0};

public:
    /** Register the calling thread's buffer.  Idempotent (set insert). */
    void
    register_thread()
    {
        std::lock_guard lock(mutex);
        thread_queues.insert(&local_queue);
    }

    /** Remove the calling thread's buffer and discard its contents.
        Must run before the owning thread exits (see class comment). */
    void
    unregister_thread()
    {
        std::lock_guard lock(mutex);
        // set::erase(key) instead of the original linear std::find scan.
        if (thread_queues.erase(&local_queue) != 0)
            total_count.fetch_sub(
                local_queue.size(), std::memory_order_relaxed);
        local_queue.clear();
    }

    /** Append an item to the calling thread's buffer, registering the
        thread on first use.

        Not noexcept: vector growth can throw std::bad_alloc (the original
        `noexcept` would have turned that into std::terminate). */
    void
    push_back(T item)
    {
        // One-time lazy registration per thread.
        static thread_local bool registered = false;
        if (!registered)
        {
            register_thread();
            registered = true;
        }
        std::lock_guard lock(mutex);
        local_queue.push_back(std::move(item));
        total_count.fetch_add(1, std::memory_order_relaxed);
    }

    /** Drain every registered buffer, in registration (pointer) order,
        into one vector and reset the total count. */
    std::vector<T>
    merge_and_clear()
    {
        std::lock_guard lock(mutex);

        std::vector<T> merged;
        merged.reserve(total_count.load(std::memory_order_relaxed));

        for (auto* queue : thread_queues)
        {
            merged.insert(
                merged.end(),
                std::make_move_iterator(queue->begin()),
                std::make_move_iterator(queue->end()));
            queue->clear();
            queue->shrink_to_fit();
        }

        // The original also looped doing `local_count = 0`, which only ever
        // zeroed the CALLING thread's counter.  The per-thread counter is
        // gone entirely: local_size() reads local_queue.size() directly.
        total_count.store(0, std::memory_order_relaxed);
        return merged;
    }

    /** Number of items buffered by the calling thread. */
    std::size_t
    local_size() const
    {
        std::lock_guard lock(mutex);
        return local_queue.size();
    }

    /** Total items across all registered threads. */
    std::size_t
    size() const
    {
        return total_count.load(std::memory_order_relaxed);
    }

    /** True when no registered thread has buffered items. */
    bool
    empty() const
    {
        return total_count.load(std::memory_order_relaxed) == 0;
    }

    /** Exchange contents (registered buffers and counts) with another
        ThreadLocalQueue. */
    void
    swap(ThreadLocalQueue& other)
    {
        if (this == &other)
            return;

        // Named lock object.  The original wrote
        // `std::scoped_lock(mutex, other.mutex);` — an unnamed temporary
        // destroyed at the semicolon, so the swaps below ran UNLOCKED.
        std::scoped_lock lock(mutex, other.mutex);

        thread_queues.swap(other.thread_queues);

        std::size_t const mine = total_count.load(std::memory_order_relaxed);
        total_count.store(
            other.total_count.load(std::memory_order_relaxed),
            std::memory_order_relaxed);
        other.total_count.store(mine, std::memory_order_relaxed);
    }

    friend void
    swap(ThreadLocalQueue& lhs, ThreadLocalQueue& rhs)
    {
        lhs.swap(rhs);
    }

    /** Exchange contents with a plain vector: `vec` receives everything
        buffered here (in the same order merge_and_clear() would produce —
        the original back()/pop_back() loop REVERSED it), and `vec`'s prior
        contents are handed to the first registered thread's buffer. */
    void
    swap(std::vector<T>& vec)
    {
        std::lock_guard lock(mutex);

        // Park the caller's elements while we drain into vec.
        std::vector<T> incoming;
        incoming.swap(vec);

        vec.reserve(total_count.load(std::memory_order_relaxed));
        for (auto* queue : thread_queues)
        {
            vec.insert(
                vec.end(),
                std::make_move_iterator(queue->begin()),
                std::make_move_iterator(queue->end()));
            queue->clear();
            queue->shrink_to_fit();
        }

        // Hand the parked elements to the first registered buffer (also in
        // order, not reversed as before).
        if (!thread_queues.empty())
        {
            auto* first = *thread_queues.begin();
            first->insert(
                first->end(),
                std::make_move_iterator(incoming.begin()),
                std::make_move_iterator(incoming.end()));
            total_count.store(first->size(), std::memory_order_relaxed);
        }
        else
        {
            total_count.store(0, std::memory_order_relaxed);
        }
    }

    friend void
    swap(ThreadLocalQueue& tlq, std::vector<T>& vec)
    {
        tlq.swap(vec);
    }

    friend void
    swap(std::vector<T>& vec, ThreadLocalQueue& tlq)
    {
        tlq.swap(vec);
    }
};

}  // namespace ripple

#endif  // RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED