use a thread local queue to avoid txn submission bottleneck

This commit is contained in:
Richard Holland
2024-12-07 14:35:40 +11:00
parent 8c91d861c0
commit cb29902a37
2 changed files with 199 additions and 2 deletions

View File

@@ -69,6 +69,7 @@
#include <ripple/rpc/impl/RPCHelpers.h>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp>
#include <ripple/basics/ThreadLocalQueue.h>
#include <exception>
#include <mutex>
@@ -774,7 +775,7 @@ private:
std::condition_variable mCond;
std::mutex mMutex;
DispatchState mDispatchState = DispatchState::none;
std::vector<TransactionStatus> mTransactions;
ThreadLocalQueue<TransactionStatus> mTransactions;
StateAccounting accounting_{};
@@ -1404,7 +1405,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
std::unique_lock ledgerLock{
m_ledgerMaster.peekMutex(), std::defer_lock};
std::lock(masterLock, ledgerLock);
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
for (TransactionStatus& e : transactions)
{

View File

@@ -0,0 +1,197 @@
#ifndef RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED
#define RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED
#include <atomic>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include <set>
namespace ripple {
template <typename T>
class ThreadLocalQueue
{
private:
static inline thread_local std::vector<T> local_queue;
static inline thread_local size_t local_count{0};
mutable std::recursive_mutex mutex;
std::set<std::vector<T>*> thread_queues;
std::atomic<size_t> total_count{0};
public:
void
register_thread() noexcept
{
std::lock_guard lock(mutex);
thread_queues.emplace(&local_queue);
local_count = 0;
}
void
unregister_thread() noexcept
{
std::lock_guard lock(mutex);
auto it =
std::find(thread_queues.begin(), thread_queues.end(), &local_queue);
if (it != thread_queues.end())
{
thread_queues.erase(it);
}
total_count.fetch_sub(local_count, std::memory_order_relaxed);
local_queue.clear();
local_count = 0;
}
void
push_back(T item) noexcept
{
static thread_local bool registered = false;
if (!registered)
{
register_thread();
registered = true;
}
std::lock_guard lock(mutex);
local_queue.push_back(std::move(item));
local_count++;
total_count.fetch_add(1, std::memory_order_relaxed);
}
std::vector<T>
merge_and_clear() noexcept
{
std::lock_guard lock(mutex);
size_t expected_size = total_count.load(std::memory_order_relaxed);
std::vector<T> merged;
merged.reserve(expected_size);
for (auto* queue : thread_queues)
{
merged.insert(
merged.end(),
std::make_move_iterator(queue->begin()),
std::make_move_iterator(queue->end()));
queue->clear();
queue->shrink_to_fit();
}
total_count.store(0, std::memory_order_relaxed);
for (auto* queue : thread_queues)
{
*const_cast<size_t*>(&local_count) = 0;
}
return merged;
}
size_t
local_size() const noexcept
{
std::lock_guard lock(mutex);
return local_count;
}
size_t
size() const noexcept
{
std::lock_guard lock(mutex);
return total_count.load(std::memory_order_relaxed);
}
bool
empty() const noexcept
{
std::lock_guard lock(mutex);
return total_count.load(std::memory_order_relaxed) == 0;
}
void
swap(ThreadLocalQueue& other) noexcept
{
if (this == &other)
return;
// With recursive mutex, we can simply lock both mutexes sequentially
{
std::scoped_lock(mutex, other.mutex);
// Swap the thread queues
thread_queues.swap(other.thread_queues);
// Swap the total counts using atomic exchange
size_t this_count = total_count.load(std::memory_order_relaxed);
size_t other_count = other.total_count.load(std::memory_order_relaxed);
total_count.store(other_count, std::memory_order_relaxed);
other.total_count.store(this_count, std::memory_order_relaxed);
}
}
friend void
swap(ThreadLocalQueue& lhs, ThreadLocalQueue& rhs) noexcept
{
lhs.swap(rhs);
}
void
swap(std::vector<T>& vec) noexcept
{
std::lock_guard lock(mutex);
std::vector<T> temp;
temp.swap(vec);
// Move all our queue contents into vec
size_t total_size = total_count.load(std::memory_order_relaxed);
vec.reserve(total_size);
// For each queue
int i = 0;
for (auto* queue : thread_queues)
{
// Move elements one at a time to handle non-copyable types
while (!queue->empty())
{
vec.push_back(std::move(queue->back()));
queue->pop_back();
}
queue->shrink_to_fit();
}
// Move temp's contents to the first available queue
if (!thread_queues.empty())
{
auto* first_queue = *thread_queues.begin();
// Move elements one at a time from temp to queue
while (!temp.empty())
{
first_queue->push_back(std::move(temp.back()));
temp.pop_back();
}
local_count = first_queue->size();
total_count.store(local_count, std::memory_order_relaxed);
}
else
{
total_count.store(0, std::memory_order_relaxed);
}
}
friend void
swap(ThreadLocalQueue& tlq, std::vector<T>& vec) noexcept
{
tlq.swap(vec);
}
friend void
swap(std::vector<T>& vec, ThreadLocalQueue& tlq) noexcept
{
tlq.swap(vec);
}
};
} // namespace ripple
#endif // THREAD_LOCAL_QUEUE_HPP