thread local queue working properly

This commit is contained in:
Richard Holland
2024-12-09 12:41:01 +11:00
parent 2cf2ac6e12
commit 428ee457dc
5 changed files with 370 additions and 225 deletions

View File

@@ -323,6 +323,9 @@ public:
void
transactionBatch();
void
forceTransactionBatch();
/**
* Attempt to apply transactions and post-process based on the results.
*
@@ -1371,6 +1374,17 @@ NetworkOPsImp::doTransactionSync(
} while (transaction->getApplying());
}
// Synchronously drain the pending-transaction queue.
//
// Holds mMutex for the whole call, marks the dispatch state as scheduled,
// and keeps calling apply() (handing it the held lock) until mTransactions
// reports a size of zero.
void
NetworkOPsImp::forceTransactionBatch()
{
    std::unique_lock<std::mutex> batchLock(mMutex);

    mDispatchState = DispatchState::scheduled;

    // Re-check the size on every pass: the loop ends only once the queue
    // reports that nothing is left.
    while (mTransactions.size() != 0)
        apply(batchLock);
}
void
NetworkOPsImp::transactionBatch()
{

View File

@@ -139,6 +139,9 @@ public:
bool bLocal,
FailHard failType = FailHard::no) = 0;
/** Apply all currently queued transactions before returning.
 *
 * Takes no arguments and returns nothing. The override visible in this
 * change (NetworkOPsImp::forceTransactionBatch) takes the queue mutex and
 * calls apply() until its transaction queue is empty.
 */
virtual void
forceTransactionBatch() = 0;
//--------------------------------------------------------------------------
//
// Owner functions

View File

@@ -1,133 +1,318 @@
#ifndef RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED
#define RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <set>
namespace ripple {
// Set to 1 to trace queue activity on std::cout while debugging.
#define DEBUG_TLQ 0

namespace detail {

// Requests understood by dispatch().
enum class Operation { PUSH_BACK, MERGE_AND_CLEAR, SIZE, REGISTER, UNREGISTER };

// Per-thread storage: the items one thread has queued plus the mutex that
// guards them against a concurrent merge.
template <typename T>
struct ThreadData
{
    std::mutex mutex;
    std::vector<T> queue;
};

// Per-queue-instance state.
//  - mutex guards the thread_queues map itself (insert / erase / iterate).
//  - total_size counts the items currently held across all thread queues.
template <typename T>
struct InstanceData
{
    std::mutex mutex;
    std::atomic<size_t> total_size{0};
    std::unordered_map<std::thread::id, std::unique_ptr<ThreadData<T>>>
        thread_queues;
};

// Process-wide registry (one per element type T) mapping instance ids to
// their shared state.
template <typename T>
struct GlobalInstances
{
    std::mutex mutex;
    std::unordered_map<uintptr_t, std::shared_ptr<InstanceData<T>>> instances;

    static GlobalInstances&
    instance()
    {
        static GlobalInstances inst;
        return inst;
    }
};

// Look up the state registered under instance_id, creating it on first use.
// An entry that exists but holds a null pointer is repaired rather than
// returned, so callers can always dereference the result.
template <typename T>
std::shared_ptr<InstanceData<T>>
get_instance_data(uintptr_t instance_id)
{
    auto& global = GlobalInstances<T>::instance();
    std::lock_guard<std::mutex> lock(global.mutex);

    auto& slot = global.instances[instance_id];
    if (!slot)
        slot = std::make_shared<InstanceData<T>>();
    return slot;
}

// Find (or create) the calling thread's queue while holding the instance
// mutex. The returned reference stays valid after the lock is released
// because the ThreadData lives behind a unique_ptr, so a rehash of the map
// does not move it, and only the owning thread ever erases its own entry.
template <typename T>
ThreadData<T>&
thread_data_for(InstanceData<T>& instance, std::thread::id thread_id)
{
    std::lock_guard<std::mutex> lock(instance.mutex);

    auto& slot = instance.thread_queues[thread_id];
    if (!slot)
        slot = std::make_unique<ThreadData<T>>();
    return *slot;
}

// Single entry point for every queue operation.
//
// instance_id selects the queue instance, op the request, and item (used by
// PUSH_BACK only) points at the element to move into the calling thread's
// queue; a null item makes PUSH_BACK register the thread and nothing else.
//
// Returns {merged items, 0} for MERGE_AND_CLEAR, {empty, item count} for
// SIZE, and {empty, 0} for everything else.
template <typename T>
std::pair<std::vector<T>, size_t>
dispatch(uintptr_t instance_id, Operation op, T* item = nullptr)
{
    auto instance_data = get_instance_data<T>(instance_id);
    auto const thread_id = std::this_thread::get_id();
    auto& total = instance_data->total_size;

    switch (op)
    {
        case Operation::MERGE_AND_CLEAR: {
            auto const pending = total.load(std::memory_order_acquire);
            if (DEBUG_TLQ)
                std::cout << "start merge, counter=" << pending << "\n";
            if (pending == 0)
                return {std::vector<T>(), 0};

            std::vector<T> result;
            // The counter is only a capacity hint; the real element count
            // is whatever the queues hold once each one is locked.
            result.reserve(pending);

            // Lock the map so no thread can register/unregister mid-merge.
            std::lock_guard<std::mutex> instance_lock(instance_data->mutex);
            for (auto& queue_entry : instance_data->thread_queues)
            {
                // Every access to a queue happens under that queue's own
                // mutex, so a concurrent PUSH_BACK cannot race with us.
                std::lock_guard<std::mutex> queue_lock(
                    queue_entry.second->mutex);
                auto& queue = queue_entry.second->queue;

                // Items were moved in by PUSH_BACK; move them back out.
                for (auto& queued : queue)
                    result.push_back(std::move(queued));

                if (DEBUG_TLQ)
                    std::cout << "copied queue of size: " << queue.size()
                              << " for thread " << queue_entry.first << "\n";
                queue.clear();
            }

            // Subtract exactly what was taken instead of storing zero: a
            // push that lands on an already-drained queue has bumped the
            // counter, and resetting to zero would hide that item from the
            // next merge's early-exit check.
            total.fetch_sub(result.size(), std::memory_order_acq_rel);

            if (DEBUG_TLQ)
                std::cout << "merge size: " << result.size() << ", counter: "
                          << total.load(std::memory_order_acquire) << "\n";
            return {std::move(result), 0};
        }

        case Operation::REGISTER: {
            thread_data_for(*instance_data, thread_id);
            return {std::vector<T>(), 0};
        }

        case Operation::PUSH_BACK: {
            // Resolve the thread's queue under the instance mutex; touching
            // the map without it would race with other threads inserting.
            ThreadData<T>& thread_data =
                thread_data_for(*instance_data, thread_id);

            std::lock_guard<std::mutex> lock(thread_data.mutex);
            if (item)
            {
                thread_data.queue.push_back(std::move(*item));
                // Incremented while the queue mutex is held, so any item a
                // merge can see has already been counted.
                total.fetch_add(1, std::memory_order_release);
            }
            if (DEBUG_TLQ)
                std::cout << "pushed, counter="
                          << total.load(std::memory_order_acquire) << "\n";
            return {std::vector<T>(), 0};
        }

        case Operation::SIZE: {
            return {std::vector<T>(), total.load(std::memory_order_acquire)};
        }

        case Operation::UNREGISTER: {
            std::lock_guard<std::mutex> lock(instance_data->mutex);
            if (auto it = instance_data->thread_queues.find(thread_id);
                it != instance_data->thread_queues.end())
            {
                // Items still queued by this thread are discarded, so they
                // must stop being counted as well.
                size_t const dropped = it->second->queue.size();
                total.fetch_sub(dropped, std::memory_order_release);
                instance_data->thread_queues.erase(it);
            }
            return {std::vector<T>(), 0};
        }

        default:
            return {std::vector<T>(), 0};
    }
}

}  // namespace detail
template <typename T>
class ThreadLocalQueue
{
private:
static inline thread_local std::vector<T> local_queue;
static inline thread_local size_t local_count{0};
mutable std::recursive_mutex mutex;
std::set<std::vector<T>*> thread_queues;
std::atomic<size_t> total_count{0};
public:
// Create a queue with a process-unique id and register the constructing
// thread with it.
//
// The id comes from a monotonically increasing counter rather than from the
// object's address: move construction, move assignment and swap all carry
// an id over to a different object, so an address-derived id could be
// handed out a second time when a later queue is built at the same address
// while the first id is still in use. Zero is never issued because the move
// operations use it to mark a moved-from queue.
ThreadLocalQueue()
    : instance_id([] {
          static std::atomic<uintptr_t> next_id{1};
          return next_id.fetch_add(1, std::memory_order_relaxed);
      }())
{
    // Initial registration happens in the constructor thread
    detail::dispatch<T>(instance_id, detail::Operation::REGISTER);
}
// Drop this queue's entry from the global registry, under the registry
// mutex. Erasing an id that is not present is a no-op.
~ThreadLocalQueue()
{
    auto& registry = detail::GlobalInstances<T>::instance();
    std::lock_guard<std::mutex> guard(registry.mutex);
    registry.instances.erase(instance_id);
}
// Prevent copying to avoid counter complications
ThreadLocalQueue(const ThreadLocalQueue&) = delete;
ThreadLocalQueue&
operator=(const ThreadLocalQueue&) = delete;
// Move operations
// Take over the source queue's instance id; the source is left holding
// id 0.
ThreadLocalQueue(ThreadLocalQueue&& other) noexcept
    : instance_id(std::exchange(other.instance_id, 0))
{
}
// Move assignment: remove the registry entry for the id currently held,
// then adopt the source's id and leave the source holding id 0.
// Self-assignment changes nothing.
ThreadLocalQueue&
operator=(ThreadLocalQueue&& other) noexcept
{
    if (this == &other)
        return *this;

    auto& registry = detail::GlobalInstances<T>::instance();
    std::lock_guard<std::mutex> guard(registry.mutex);
    registry.instances.erase(instance_id);
    instance_id = std::exchange(other.instance_id, 0);
    return *this;
}
// Explicitly register current thread
void
register_thread() noexcept
{
std::lock_guard lock(mutex);
thread_queues.emplace(&local_queue);
std::cout << "Thread registered: " << reinterpret_cast<uint64_t>(&local_queue) << "\n";
local_count = 0;
detail::dispatch<T>(instance_id, detail::Operation::REGISTER);
}
void
unregister_thread() noexcept
{
std::lock_guard lock(mutex);
auto it =
std::find(thread_queues.begin(), thread_queues.end(), &local_queue);
if (it != thread_queues.end())
{
thread_queues.erase(it);
}
total_count.fetch_sub(local_count, std::memory_order_relaxed);
local_queue.clear();
local_count = 0;
detail::dispatch<T>(instance_id, detail::Operation::UNREGISTER);
}
void
push_back(T item) noexcept
{
static thread_local bool registered = false;
if (!registered)
{
register_thread();
registered = true;
}
std::lock_guard lock(mutex);
local_queue.push_back(std::move(item));
local_count++;
total_count.fetch_add(1, std::memory_order_relaxed);
detail::dispatch<T>(instance_id, detail::Operation::PUSH_BACK, &item);
}
std::vector<T>
merge_and_clear() noexcept
{
std::lock_guard lock(mutex);
size_t expected_size = total_count.load(std::memory_order_relaxed);
std::vector<T> merged;
merged.reserve(expected_size);
for (auto* queue : thread_queues)
{
merged.insert(
merged.end(),
std::make_move_iterator(queue->begin()),
std::make_move_iterator(queue->end()));
queue->clear();
queue->shrink_to_fit();
}
total_count.store(0, std::memory_order_relaxed);
for (auto* queue : thread_queues)
{
*const_cast<size_t*>(&local_count) = 0;
}
return merged;
}
size_t
local_size() const noexcept
{
std::lock_guard lock(mutex);
return local_count;
return detail::dispatch<T>(
instance_id, detail::Operation::MERGE_AND_CLEAR)
.first;
}
size_t
size() const noexcept
{
std::lock_guard lock(mutex);
return total_count.load(std::memory_order_relaxed);
return detail::dispatch<T>(instance_id, detail::Operation::SIZE).second;
}
bool
empty() const noexcept
{
std::lock_guard lock(mutex);
return total_count.load(std::memory_order_relaxed) == 0;
return size() == 0;
}
void
swap(std::vector<T>& other) noexcept
{
auto merged = merge_and_clear();
merged.swap(other);
}
void
swap(ThreadLocalQueue& other) noexcept
{
if (this == &other)
return;
// With recursive mutex, we can simply lock both mutexes sequentially
{
std::scoped_lock(mutex, other.mutex);
// Swap the thread queues
thread_queues.swap(other.thread_queues);
// Swap the total counts using atomic exchange
size_t this_count = total_count.load(std::memory_order_relaxed);
size_t other_count = other.total_count.load(std::memory_order_relaxed);
total_count.store(other_count, std::memory_order_relaxed);
other.total_count.store(this_count, std::memory_order_relaxed);
return;
}
auto& global = detail::GlobalInstances<T>::instance();
// Get the global lock first
std::lock_guard<std::mutex> global_lock(global.mutex);
// Then get individual instance locks in a consistent order
auto my_instance = global.instances[instance_id];
auto other_instance = global.instances[other.instance_id];
std::lock_guard<std::mutex> lock1(my_instance->mutex);
std::lock_guard<std::mutex> lock2(other_instance->mutex);
// Now safe to swap instance IDs and data
std::swap(instance_id, other.instance_id);
std::swap(
global.instances[instance_id], global.instances[other.instance_id]);
}
friend void
@@ -136,63 +321,6 @@ public:
lhs.swap(rhs);
}
void
swap(std::vector<T>& vec) noexcept
{
std::lock_guard lock(mutex);
std::vector<T> temp;
temp.swap(vec);
// Move all our queue contents into vec
size_t total_size = total_count.load(std::memory_order_relaxed);
vec.reserve(total_size);
// For each queue
int i = 0;
for (auto* queue : thread_queues)
{
// Move elements one at a time to handle non-copyable types
while (!queue->empty())
{
vec.push_back(std::move(queue->back()));
queue->pop_back();
}
queue->shrink_to_fit();
}
// Move temp's contents to the first available queue
if (!thread_queues.empty())
{
auto* first_queue = *thread_queues.begin();
// Move elements one at a time from temp to queue
while (!temp.empty())
{
first_queue->push_back(std::move(temp.back()));
temp.pop_back();
}
local_count = first_queue->size();
total_count.store(local_count, std::memory_order_relaxed);
}
else
{
total_count.store(0, std::memory_order_relaxed);
}
}
friend void
swap(ThreadLocalQueue& tlq, std::vector<T>& vec) noexcept
{
tlq.swap(vec);
}
friend void
swap(std::vector<T>& vec, ThreadLocalQueue& tlq) noexcept
{
tlq.swap(vec);
}
private:
uintptr_t instance_id;
};
} // namespace ripple
#endif // RIPPLE_BASICS_THREADLOCALQUEUE_H_INCLUDED

View File

@@ -19,14 +19,14 @@ using namespace jtx;
class LedgerStress_test : public beast::unit_test::suite
{
private:
static constexpr std::size_t TXN_PER_LEDGER = 100;
static constexpr std::size_t MAX_TXN_PER_ACCOUNT = 1;
static constexpr std::size_t TXN_PER_LEDGER = 20000;
static constexpr std::size_t MAX_TXN_PER_ACCOUNT = 5;
static constexpr std::chrono::seconds MAX_CLOSE_TIME{6};
static constexpr std::size_t REQUIRED_ACCOUNTS =
(TXN_PER_LEDGER + MAX_TXN_PER_ACCOUNT - 1) / MAX_TXN_PER_ACCOUNT;
// Get number of hardware threads and use half
const std::size_t NUM_THREADS = 1;//std::max(std::thread::hardware_concurrency() / 2, 1u);
const std::size_t NUM_THREADS = std::max(std::thread::hardware_concurrency() / 2, 1u);
struct LedgerMetrics {
std::chrono::milliseconds submitTime{0};
@@ -86,39 +86,49 @@ private:
env.close();
return accounts;
}
// Work assignment for one submitter thread.
//
// Every member is zero-initialised so a default-constructed ThreadWork
// describes "no accounts, no transactions" instead of holding
// indeterminate values.
struct ThreadWork {
    // Index of the first account this thread submits from (inclusive).
    std::size_t startAccountIdx{0};
    // One past the last account index this thread may use (exclusive).
    std::size_t endAccountIdx{0};
    // Upper bound on the number of transactions this thread submits.
    std::size_t numTxnsToSubmit{0};
};
void
submitBatchThread(
jtx::Env& env,
std::vector<jtx::Account> const& senders,
std::vector<jtx::Account> const& allAccounts,
std::size_t startIdx,
std::size_t endIdx,
std::size_t txPerAccount)
std::vector<jtx::Account> const& accounts,
ThreadWork const& work)
{
static thread_local int tmp = 0;
std::cout << "submitBatchThread " << reinterpret_cast<uint64_t>(&tmp) << "\n";
for (std::size_t i = startIdx; i < endIdx; ++i)
auto const escalatedFee = getEscalatedFee(env);
std::size_t txnsSubmitted = 0;
for (std::size_t i = work.startAccountIdx;
i < work.endAccountIdx && txnsSubmitted < work.numTxnsToSubmit; ++i)
{
auto const& sender = senders[i];
auto const& sender = accounts[i];
// Create list of possible recipients (everyone except sender)
std::vector<Account> recipients;
recipients.reserve(allAccounts.size() - 1);
for (auto const& acct : allAccounts)
recipients.reserve(accounts.size() - 1);
for (auto const& acct : accounts)
{
if (acct.id() != sender.id())
recipients.push_back(acct);
}
auto const escalatedFee = getEscalatedFee(env);
// Calculate how many txns to submit from this account
std::size_t txnsRemaining = work.numTxnsToSubmit - txnsSubmitted;
std::size_t txnsForAccount = std::min(MAX_TXN_PER_ACCOUNT, txnsRemaining);
for (std::size_t tx = 0; tx < txPerAccount; ++tx)
// Submit transactions
for (std::size_t tx = 0; tx < txnsForAccount; ++tx)
{
auto const& recipient = recipients[tx % recipients.size()];
env.inject(pay(sender, recipient, XRP(1)),
fee(escalatedFee),
seq(autofill));
++txnsSubmitted;
}
}
}
@@ -130,19 +140,14 @@ private:
" ledgers using " + std::to_string(NUM_THREADS) + " threads");
Env env{*this, envconfig(many_workers)};
Application* appPtr = env.getApp();
std::cout << "Application ptr: " << reinterpret_cast<uint64_t>(appPtr) << "\n";
auto const journal = env.app().journal("LedgerStressTest");
env.app().config().MAX_TRANSACTIONS = 100000;
// Get actual hardware thread count and ensure consistent types
auto const journal = env.app().journal("LedgerStressTest");
// Get actual hardware thread count
std::size_t hardwareThreads = static_cast<std::size_t>(std::thread::hardware_concurrency());
if (hardwareThreads == 0)
hardwareThreads = 4; // Fallback if hardware_concurrency() fails
hardwareThreads = 4; // Fallback
const std::size_t THREAD_COUNT = std::min(NUM_THREADS, hardwareThreads);
@@ -154,10 +159,6 @@ private:
std::vector<LedgerMetrics> metrics;
metrics.reserve(numLedgers);
// Create thread pool
std::vector<std::thread> threadPool;
threadPool.reserve(THREAD_COUNT);
for (std::size_t ledger = 0; ledger < numLedgers; ++ledger)
{
threadSafeLog("Starting ledger " + std::to_string(ledger));
@@ -166,57 +167,55 @@ private:
auto submitStart = std::chrono::steady_clock::now();
ledgerMetrics.baseFee = env.current()->fees().base;
// Calculate work distribution
std::size_t txnsPerThread = (TXN_PER_LEDGER + THREAD_COUNT - 1) / THREAD_COUNT;
std::size_t accountsPerThread = (accounts.size() + THREAD_COUNT - 1) / THREAD_COUNT;
// Atomic counter for synchronization
std::atomic<std::size_t> completedTxns{0};
std::mutex submitMutex;
// Launch worker threads
// Pre-calculate exact work distribution for threads
std::vector<ThreadWork> threadAssignments;
threadAssignments.reserve(THREAD_COUNT);
std::size_t totalAccountsAssigned = 0;
std::size_t totalTxnsAssigned = 0;
std::size_t accountsPerThread = accounts.size() / THREAD_COUNT;
for (std::size_t t = 0; t < THREAD_COUNT; ++t)
{
std::size_t startIdx = t * accountsPerThread;
std::size_t endIdx = std::min(startIdx + accountsPerThread, accounts.size());
ThreadWork work;
work.startAccountIdx = totalAccountsAssigned;
threadPool.emplace_back(
[&, startIdx, endIdx]() {
try {
std::size_t localTxnCount = 0;
std::size_t targetTxns = txnsPerThread;
// Ensure we don't exceed total desired transactions
{
std::lock_guard<std::mutex> lock(submitMutex);
std::size_t remaining = TXN_PER_LEDGER - completedTxns;
targetTxns = std::min(txnsPerThread, remaining);
}
// Last thread gets remaining accounts
if (t == THREAD_COUNT - 1) {
work.endAccountIdx = accounts.size();
work.numTxnsToSubmit = TXN_PER_LEDGER - totalTxnsAssigned;
}
else {
work.endAccountIdx = work.startAccountIdx + accountsPerThread;
work.numTxnsToSubmit = TXN_PER_LEDGER / THREAD_COUNT;
}
totalAccountsAssigned = work.endAccountIdx;
totalTxnsAssigned += work.numTxnsToSubmit;
threadAssignments.push_back(work);
}
BEAST_EXPECT(totalTxnsAssigned == TXN_PER_LEDGER);
submitBatchThread(
env,
accounts,
accounts,
startIdx,
endIdx,
targetTxns);
completedTxns += targetTxns;
}
catch (const std::exception& e) {
threadSafeLog("Thread exception: " + std::string(e.what()));
}
// Launch threads with pre-calculated work assignments
std::vector<std::thread> threads;
threads.reserve(THREAD_COUNT);
for (std::size_t t = 0; t < THREAD_COUNT; ++t)
{
threads.emplace_back(
[&env, &accounts, work = threadAssignments[t], this]() {
submitBatchThread(env, accounts, work);
}
);
}
// Wait for all threads to complete
for (auto& thread : threadPool)
// Wait for all threads
for (auto& thread : threads)
{
if (thread.joinable())
thread.join();
}
threadPool.clear();
ledgerMetrics.submitTime = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - submitStart);
@@ -243,12 +242,10 @@ private:
}
}
public:
void run() override
{
runStressTest(1);
runStressTest(5);
}
};

View File

@@ -126,13 +126,17 @@ Env::close(
{
// Round up to next distinguishable value
using namespace std::chrono_literals;
auto& netOPs = app().getOPs();
netOPs.forceTransactionBatch();
bool res = true;
closeTime += closed()->info().closeTimeResolution - 1s;
timeKeeper().set(closeTime);
// Go through the rpc interface unless we need to simulate
// a specific consensus delay.
if (consensusDelay)
app().getOPs().acceptLedger(consensusDelay);
netOPs.acceptLedger(consensusDelay);
else
{
auto resp = rpc("ledger_accept");
@@ -295,18 +299,17 @@ Env::inject_jtx(JTx const& jt)
{
std::string reason;
// make a copy
STTx* newData = new STTx(*jt.stx);
auto stx = std::shared_ptr<STTx const>(newData);
auto id = stx->getTransactionID();
//STTx* newData = new STTx(*jt.stx);
//auto stx = std::shared_ptr<STTx const>(newData);
auto id = jt.stx->getTransactionID();
static std::map<uint256, std::shared_ptr<STTx const>> storage;
storage.emplace(id, stx);
auto tx = std::make_shared<Transaction>(stx, reason, app);
auto tx = std::make_shared<Transaction>(jt.stx, reason, app);
static int counter = 0;
std::cout << "inject_jtx [" << counter++ << "] id=" << id << "\n";
netOPs.processTransaction(tx, false, false);
counter++;
if (counter % 2500 == 0)
std::cout << "inject_jtx [" << counter++ << "] id=" << id << "\n";
netOPs.processTransaction(tx, true, false);
}
return postconditions(jt, ter_, true);