From 2cf2ac6e12a5fb99bf2e6a85bdab776e4395cbaa Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Sat, 7 Dec 2024 21:44:22 +1100 Subject: [PATCH] add jtx inject --- src/ripple/app/main/Application.cpp | 40 +++++---- src/ripple/app/misc/NetworkOPs.h | 2 +- src/ripple/basics/ThreadLocalQueue.h | 1 + src/test/app/LedgerStress_test.cpp | 130 ++++++++++++++++----------- src/test/jtx/Env.h | 16 ++++ src/test/jtx/envconfig.h | 3 + src/test/jtx/impl/Env.cpp | 28 ++++++ src/test/jtx/impl/envconfig.cpp | 8 ++ 8 files changed, 157 insertions(+), 71 deletions(-) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 9134df035..835ba0d04 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -301,27 +301,33 @@ public: , m_jobQueue(std::make_unique( [](std::unique_ptr const& config) { - if (config->standalone() && !config->reporting() && - !config->FORCE_MULTI_THREAD) - return 1; + auto f = [&]() { + if (config->standalone() && !config->reporting() && + !config->FORCE_MULTI_THREAD) + return 1; - if (config->WORKERS) - return config->WORKERS; + if (config->WORKERS) + return config->WORKERS; - auto count = - static_cast(std::thread::hardware_concurrency()); + auto count = + static_cast(std::thread::hardware_concurrency()); - // Be more aggressive about the number of threads to use - // for the job queue if the server is configured as "large" - // or "huge" if there are enough cores. - if (config->NODE_SIZE >= 4 && count >= 16) - count = 6 + std::min(count, 8); - else if (config->NODE_SIZE >= 3 && count >= 8) - count = 4 + std::min(count, 6); - else - count = 2 + std::min(count, 4); + // Be more aggressive about the number of threads to use + // for the job queue if the server is configured as "large" + // or "huge" if there are enough cores. + if (config->NODE_SIZE >= 4 && count >= 16) + count = 6 + std::min(count, 8); + else if (config->NODE_SIZE >= 3 && count >= 8) + count = 4 + std::min(count, 6); + else + count = 2 + std::min(count, 4); - return count; + return count; + }; + + int threads = f(); + std::cout << "JobQueue thread count: " << threads << "\n"; + return threads; }(config_), m_collectorManager->group("jobq"), logs_->journal("JobQueue"), diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index d53127ed3..abfdd1898 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -137,7 +137,7 @@ public: std::shared_ptr& transaction, bool bUnlimited, bool bLocal, - FailHard failType) = 0; + FailHard failType = FailHard::no) = 0; //-------------------------------------------------------------------------- // diff --git a/src/ripple/basics/ThreadLocalQueue.h b/src/ripple/basics/ThreadLocalQueue.h index 30b7d7c3c..436632e9e 100644 --- a/src/ripple/basics/ThreadLocalQueue.h +++ b/src/ripple/basics/ThreadLocalQueue.h @@ -26,6 +26,7 @@ public: { std::lock_guard lock(mutex); thread_queues.emplace(&local_queue); + std::cout << "Thread registered: " << reinterpret_cast(&local_queue) << "\n"; local_count = 0; } diff --git a/src/test/app/LedgerStress_test.cpp b/src/test/app/LedgerStress_test.cpp index b32834e11..61f870bdc 100644 --- a/src/test/app/LedgerStress_test.cpp +++ b/src/test/app/LedgerStress_test.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace ripple { namespace test { @@ -18,14 +19,14 @@ using namespace jtx; class LedgerStress_test : public beast::unit_test::suite { private: - static constexpr std::size_t TXN_PER_LEDGER = 20000; + static constexpr std::size_t TXN_PER_LEDGER = 100; static constexpr std::size_t MAX_TXN_PER_ACCOUNT = 1; static constexpr std::chrono::seconds MAX_CLOSE_TIME{6}; static constexpr std::size_t REQUIRED_ACCOUNTS = (TXN_PER_LEDGER + MAX_TXN_PER_ACCOUNT - 1) / MAX_TXN_PER_ACCOUNT; // Get number of hardware threads and use half - const std::size_t NUM_THREADS = std::max(std::thread::hardware_concurrency() / 2, 1u); + const std::size_t NUM_THREADS = 1;//std::max(std::thread::hardware_concurrency() / 2, 1u); struct LedgerMetrics { std::chrono::milliseconds submitTime{0}; @@ -78,6 +79,9 @@ private: auto account = jtx::Account(name); accounts.push_back(account); env.fund(false, XRP(100000), account); + + if (i % 2500 == 0 && i != 0) + std::cout << "Accounts: " << i << "\n"; } env.close(); return accounts; @@ -92,6 +96,9 @@ private: std::size_t endIdx, std::size_t txPerAccount) { + static thread_local int tmp = 0; + std::cout << "submitBatchThread " << reinterpret_cast(&tmp) << "\n"; + for (std::size_t i = startIdx; i < endIdx; ++i) { auto const& sender = senders[i]; @@ -109,31 +116,48 @@ private: for (std::size_t tx = 0; tx < txPerAccount; ++tx) { auto const& recipient = recipients[tx % recipients.size()]; - env(pay(sender, recipient, XRP(1)), + env.inject(pay(sender, recipient, XRP(1)), fee(escalatedFee), seq(autofill)); } } } - void - runStressTest(std::size_t numLedgers) + void runStressTest(std::size_t numLedgers) { - testcase("Multithreaded stress test: " + std::to_string(TXN_PER_LEDGER) + - " txns/ledger for " + std::to_string(numLedgers) + + testcase("Multithreaded stress test: " + std::to_string(TXN_PER_LEDGER) + + " txns/ledger for " + std::to_string(numLedgers) + " ledgers using " + std::to_string(NUM_THREADS) + " threads"); - Env env(*this); + Env env{*this, envconfig(many_workers)}; + + Application* appPtr = env.getApp(); + + std::cout << "Application ptr: " << reinterpret_cast(appPtr) << "\n"; + auto const journal = env.app().journal("LedgerStressTest"); env.app().config().MAX_TRANSACTIONS = 100000; - threadSafeLog("Creating " + std::to_string(REQUIRED_ACCOUNTS) + " accounts"); - auto accounts = createAccounts(env, REQUIRED_ACCOUNTS); + // Get actual hardware thread count and ensure consistent types + std::size_t hardwareThreads = static_cast(std::thread::hardware_concurrency()); + if (hardwareThreads == 0) + hardwareThreads = 4; // Fallback if hardware_concurrency() fails + const std::size_t THREAD_COUNT = std::min(NUM_THREADS, hardwareThreads); + + threadSafeLog("Using " + std::to_string(THREAD_COUNT) + " hardware threads"); + threadSafeLog("Creating " + std::to_string(REQUIRED_ACCOUNTS) + " accounts"); + + auto accounts = createAccounts(env, REQUIRED_ACCOUNTS); + std::vector metrics; metrics.reserve(numLedgers); + // Create thread pool + std::vector threadPool; + threadPool.reserve(THREAD_COUNT); + for (std::size_t ledger = 0; ledger < numLedgers; ++ledger) { threadSafeLog("Starting ledger " + std::to_string(ledger)); @@ -142,36 +166,57 @@ private: auto submitStart = std::chrono::steady_clock::now(); ledgerMetrics.baseFee = env.current()->fees().base; - // Create threads for parallel submission - std::vector threads; - threads.reserve(NUM_THREADS); + // Calculate work distribution + std::size_t txnsPerThread = (TXN_PER_LEDGER + THREAD_COUNT - 1) / THREAD_COUNT; + std::size_t accountsPerThread = (accounts.size() + THREAD_COUNT - 1) / THREAD_COUNT; - std::size_t accountsPerThread = - (accounts.size() + NUM_THREADS - 1) / NUM_THREADS; + // Atomic counter for synchronization + std::atomic completedTxns{0}; + std::mutex submitMutex; - for (std::size_t t = 0; t < NUM_THREADS; ++t) + // Launch worker threads + for (std::size_t t = 0; t < THREAD_COUNT; ++t) { std::size_t startIdx = t * accountsPerThread; std::size_t endIdx = std::min(startIdx + accountsPerThread, accounts.size()); - threads.emplace_back( - [this, &env, &accounts, startIdx, endIdx]() { - submitBatchThread( - env, - accounts, - accounts, - startIdx, - endIdx, - MAX_TXN_PER_ACCOUNT); + threadPool.emplace_back( + [&, startIdx, endIdx]() { + try { + std::size_t localTxnCount = 0; + std::size_t targetTxns = txnsPerThread; + + // Ensure we don't exceed total desired transactions + { + std::lock_guard lock(submitMutex); + std::size_t remaining = TXN_PER_LEDGER - completedTxns; + targetTxns = std::min(txnsPerThread, remaining); + } + + submitBatchThread( + env, + accounts, + accounts, + startIdx, + endIdx, + targetTxns); + + completedTxns += targetTxns; + } + catch (const std::exception& e) { + threadSafeLog("Thread exception: " + std::string(e.what())); + } } ); } // Wait for all threads to complete - for (auto& thread : threads) + for (auto& thread : threadPool) { - thread.join(); + if (thread.joinable()) + thread.join(); } + threadPool.clear(); ledgerMetrics.submitTime = std::chrono::duration_cast( std::chrono::steady_clock::now() - submitStart); @@ -192,35 +237,14 @@ private: auto const totalTime = ledgerMetrics.submitTime + ledgerMetrics.closeTime; BEAST_EXPECT(totalTime <= std::chrono::duration_cast(MAX_CLOSE_TIME)); - threadSafeLog("Completed ledger " + std::to_string(ledger) + - " in " + std::to_string(totalTime.count()) + "ms"); + threadSafeLog("Completed ledger " + std::to_string(ledger) + + " in " + std::to_string(totalTime.count()) + "ms" + + " using " + std::to_string(THREAD_COUNT) + " threads"); } - - // Print summary - auto avgSubmitTime = std::chrono::milliseconds(0); - auto avgCloseTime = std::chrono::milliseconds(0); - XRPAmount totalFees{0}; - std::size_t totalTxns = 0; - - for (auto const& m : metrics) - { - avgSubmitTime += m.submitTime; - avgCloseTime += m.closeTime; - totalFees += m.baseFee; - totalTxns += m.txCount; - } - - avgSubmitTime /= metrics.size(); - avgCloseTime /= metrics.size(); - - threadSafeLog( - "Test Summary - " - "Average submit time: " + std::to_string(avgSubmitTime.count()) + "ms, " - "Average close time: " + std::to_string(avgCloseTime.count()) + "ms, " - "Average base fee: " + std::to_string(totalFees.drops()/metrics.size()) + " drops, " - "Total transactions: " + std::to_string(totalTxns)); } + + public: void run() override { diff --git a/src/test/jtx/Env.h b/src/test/jtx/Env.h index 93b253640..9fb7b4152 100644 --- a/src/test/jtx/Env.h +++ b/src/test/jtx/Env.h @@ -148,6 +148,12 @@ public: operator=(Env const&) = delete; Env(Env const&) = delete; + Application* + getApp() + { + return bundle_.app; + } + /** * @brief Create Env using suite, Config pointer, and explicit features. * @@ -508,6 +514,9 @@ public: virtual void submit(JTx const& jt); + virtual void + inject_jtx(JTx const& jt); + /** Use the submit RPC command with a provided JTx object. This calls postconditions. */ @@ -529,6 +538,13 @@ public: submit(jt(std::forward(jv), fN...)); } + template + void + inject(JsonValue&& jv, FN const&... fN) + { + inject_jtx(jt(std::forward(jv), fN...)); + } + template void operator()(JsonValue&& jv, FN const&... fN) diff --git a/src/test/jtx/envconfig.h b/src/test/jtx/envconfig.h index 5507cd2fc..f27d39d8c 100644 --- a/src/test/jtx/envconfig.h +++ b/src/test/jtx/envconfig.h @@ -86,6 +86,9 @@ std::unique_ptr no_admin(std::unique_ptr); std::unique_ptr no_admin_networkid(std::unique_ptr cfg); +std::unique_ptr +many_workers(std::unique_ptr cfg); + std::unique_ptr secure_gateway(std::unique_ptr); std::unique_ptr admin_localnet(std::unique_ptr); diff --git a/src/test/jtx/impl/Env.cpp b/src/test/jtx/impl/Env.cpp index a26d1a25f..0c68bca7e 100644 --- a/src/test/jtx/impl/Env.cpp +++ b/src/test/jtx/impl/Env.cpp @@ -49,6 +49,8 @@ #include #include #include +#include +#include namespace ripple { namespace test { @@ -284,6 +286,32 @@ Env::parseResult(Json::Value const& jr) return std::make_pair(ter, isTesSuccess(ter) || isTecClaim(ter)); } +void +Env::inject_jtx(JTx const& jt) +{ + Application& app = *(getApp()); + auto& netOPs = app.getOPs(); + if (jt.stx) + { + std::string reason; + // make a copy + STTx* newData = new STTx(*jt.stx); + auto stx = std::shared_ptr(newData); + auto id = stx->getTransactionID(); + + static std::map> storage; + storage.emplace(id, stx); + + auto tx = std::make_shared(stx, reason, app); + + static int counter = 0; + std::cout << "inject_jtx [" << counter++ << "] id=" << id << "\n"; + netOPs.processTransaction(tx, false, false); + } + return postconditions(jt, ter_, true); + +} + void Env::submit(JTx const& jt) { diff --git a/src/test/jtx/impl/envconfig.cpp b/src/test/jtx/impl/envconfig.cpp index ce2c2ae60..3ee46a0d6 100644 --- a/src/test/jtx/impl/envconfig.cpp +++ b/src/test/jtx/impl/envconfig.cpp @@ -76,6 +76,14 @@ setupConfigForUnitTests(Config& cfg) namespace jtx { +std::unique_ptr +many_workers(std::unique_ptr cfg) +{ + cfg->WORKERS = 128; + return cfg; +} + + std::unique_ptr no_admin(std::unique_ptr cfg) {