Compare commits

...

11 Commits

Author  SHA1  Message  Date
Richard Holland  c87f0817d1  clean up  2024-12-11 12:50:51 +11:00
Richard Holland  cd1b18f47f  log formatting, clang  2024-12-10 09:54:39 +11:00
Richard Holland  88b7cb81af  revert to original mempool implementation, keep ledger stress test and txn injector  2024-12-10 09:44:08 +11:00
Richard Holland  d801fdbe5d  remove more debugging output  2024-12-09 19:48:36 +11:00
Richard Holland  0eb46d0d49  remove debug msg from jobqueue, adjust ledger stress parameters  2024-12-09 19:44:35 +11:00
Richard Holland  13178193d6  revised ledger stress tester  2024-12-09 12:50:54 +11:00
Richard Holland  428ee457dc  thread local queue working properly  2024-12-09 12:41:01 +11:00
Richard Holland  2cf2ac6e12  add jtx inject  2024-12-07 21:44:22 +11:00
Richard Holland  cb29902a37  use a thread local queue to avoid txn submission bottleneck  2024-12-07 14:35:40 +11:00
Richard Holland  8c91d861c0  ledger stress test  2024-11-27 08:35:13 +11:00
Richard Holland  77bd6b9d20  async submitTransaction  2024-11-25 21:08:48 +11:00
10 changed files with 424 additions and 11 deletions

View File

@@ -729,6 +729,7 @@ if (tests)
     src/test/app/LedgerLoad_test.cpp
     src/test/app/LedgerMaster_test.cpp
     src/test/app/LedgerReplay_test.cpp
+    src/test/app/LedgerStress_test.cpp
     src/test/app/LoadFeeTrack_test.cpp
     src/test/app/Manifest_test.cpp
     src/test/app/MultiSign_test.cpp

View File

@@ -322,6 +322,9 @@ public:
     void
     transactionBatch();
 
+    void
+    forceTransactionBatch();
+
     /**
      * Attempt to apply transactions and post-process based on the results.
      *
@@ -1136,6 +1139,7 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
 void
 NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
 {
+    // Launch async task and return immediately
     if (isNeedNetworkLedger())
     {
         // Nothing we can do if we've never been in sync
@@ -1147,9 +1151,9 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
     // Enforce Network bar for emitted txn
     if (view->rules().enabled(featureHooks) && hook::isEmittedTxn(*iTrans))
     {
-        // RH NOTE: Warning removed here due to ConsesusSet using this function
-        // which continually triggers this bar. Doesn't seem dangerous, just
-        // annoying.
+        // RH NOTE: Warning removed here due to ConsesusSet using this
+        // function which continually triggers this bar. Doesn't seem
+        // dangerous, just annoying.
         // JLOG(m_journal.warn())
         //     << "Submitted transaction invalid: EmitDetails present.";
@@ -1164,9 +1168,9 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
     if ((flags & SF_BAD) != 0)
     {
-        // RH NOTE: Warning removed here due to ConsesusSet using this function
-        // which continually triggers this bar. Doesn't seem dangerous, just
-        // annoying.
+        // RH NOTE: Warning removed here due to ConsesusSet using this
+        // function which continually triggers this bar. Doesn't seem
+        // dangerous, just annoying.
         // JLOG(m_journal.warn()) << "Submitted transaction cached bad";
         return;
@@ -1364,6 +1368,17 @@ NetworkOPsImp::doTransactionSync(
     } while (transaction->getApplying());
 }
 
+void
+NetworkOPsImp::forceTransactionBatch()
+{
+    std::unique_lock<std::mutex> lock(mMutex);
+    mDispatchState = DispatchState::scheduled;
+    while (mTransactions.size())
+    {
+        apply(lock);
+    }
+}
+
 void
 NetworkOPsImp::transactionBatch()
 {
@@ -1398,7 +1413,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
         std::unique_lock ledgerLock{
             m_ledgerMaster.peekMutex(), std::defer_lock};
         std::lock(masterLock, ledgerLock);
-
         app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
             for (TransactionStatus& e : transactions)
             {
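
The new forceTransactionBatch() above drains mTransactions synchronously under mMutex, looping apply() until the batch is empty, rather than waiting for the job queue to schedule the work. A minimal sketch of the intended call pattern follows (drainAndAccept is a hypothetical helper; it mirrors the jtx Env::close() change further down in this compare and assumes access to a running Application):

    #include <ripple/app/main/Application.h>
    #include <ripple/app/misc/NetworkOPs.h>

    // Hypothetical helper: flush every queued transaction, then accept a
    // ledger that is guaranteed to reflect them.
    void
    drainAndAccept(ripple::Application& app)
    {
        auto& netOPs = app.getOPs();

        // Blocks until the pending transaction batch is empty, applying it
        // inline instead of waiting on the job queue.
        netOPs.forceTransactionBatch();

        // Every submitted transaction has now been applied (or rejected)
        // against the open ledger, so accepting includes all of them.
        netOPs.acceptLedger();
    }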

View File

@@ -137,7 +137,10 @@ public:
         std::shared_ptr<Transaction>& transaction,
         bool bUnlimited,
         bool bLocal,
-        FailHard failType) = 0;
+        FailHard failType = FailHard::no) = 0;
+
+    virtual void
+    forceTransactionBatch() = 0;
 
     //--------------------------------------------------------------------------
     //

View File

@@ -24,6 +24,7 @@
 #include <ripple/app/misc/TxQ.h>
 #include <ripple/app/tx/apply.h>
 #include <ripple/basics/mulDiv.h>
+#include <ripple/protocol/AccountID.h>
 #include <ripple/protocol/Feature.h>
 #include <ripple/protocol/jss.h>
 #include <ripple/protocol/st.h>
@@ -1897,7 +1898,15 @@ TxQ::tryDirectApply(
     // transaction straight into the ledger.
     FeeLevel64 const feeLevelPaid = getFeeLevelPaid(view, *tx);
 
-    if (feeLevelPaid >= requiredFeeLevel)
+    static auto const genesisAccountId = calcAccountID(
+        generateKeyPair(KeyType::secp256k1, generateSeed("masterpassphrase"))
+            .first);
+
+    // RH NOTE: exempting the genesis account from fee escalation is useful for
+    // stress testing it also shouldn't require an amendment because it will be
+    // fought out in consensus.
+    if (feeLevelPaid >= requiredFeeLevel ||
+        (*tx)[sfAccount] == genesisAccountId)
     {
         // Attempt to apply the transaction directly.
         auto const transactionID = tx->getTransactionID();
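
The RH NOTE above states the intent: transactions from the genesis account skip fee escalation so a stress test can flood the open ledger at the reference fee. A hedged sketch of how that could be exercised from a jtx unit test follows (exerciseGenesisExemption is a hypothetical helper; it assumes jtx's Account::master / env.master is the secp256k1 account derived from "masterpassphrase", i.e. the same genesisAccountId computed above):

    #include <test/jtx.h>

    // Hypothetical helper: call from inside a unit test suite.
    void
    exerciseGenesisExemption(ripple::test::jtx::Env& env)
    {
        using namespace ripple::test::jtx;

        auto const alice = Account("alice");
        env.fund(XRP(10000), alice);
        env.close();

        // Even when the open ledger is congested and requiredFeeLevel has
        // escalated, a master-account payment at the base fee should be
        // applied directly instead of being queued.
        env(pay(env.master, alice, XRP(1)), fee(env.current()->fees().base));
        env.close();
    }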

View File

@@ -38,6 +38,7 @@
 #include <memory>
 #include <optional>
 #include <unordered_set>
+#include <iterator>
 
 namespace ripple {
@@ -321,6 +322,12 @@ public:
     // The range of transactions
     txs_type txs;
 
+    std::size_t
+    txCount() const
+    {
+        return std::distance(txs.begin(), txs.end());
+    }
+
 };
 
 //------------------------------------------------------------------------------

View File

@@ -0,0 +1,320 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/basics/chrono.h>
#include <ripple/protocol/AccountID.h>
#include <ripple/protocol/Feature.h>
#include <ripple/protocol/jss.h>
#include <algorithm>
#include <chrono>
#include <map>
#include <mutex>
#include <test/jtx.h>
#include <test/jtx/Env.h>
#include <thread>
#include <vector>
namespace ripple {
namespace test {
using namespace jtx;
class LedgerStress_test : public beast::unit_test::suite
{
private:
static constexpr std::size_t TXN_PER_LEDGER = 50000;
static constexpr std::size_t MAX_TXN_PER_ACCOUNT = 5; // Increased from 1
static constexpr std::chrono::seconds MAX_CLOSE_TIME{15};
static constexpr std::size_t REQUIRED_ACCOUNTS =
(TXN_PER_LEDGER + MAX_TXN_PER_ACCOUNT - 1) / MAX_TXN_PER_ACCOUNT;
// Get number of hardware threads and use half
const std::size_t NUM_THREADS =
std::max(std::thread::hardware_concurrency() / 2, 1u);
struct LedgerMetrics
{
std::chrono::milliseconds submitTime{0};
std::chrono::milliseconds closeTime{0};
std::size_t txCount{0};
std::size_t successfulTxCount{
0}; // Added to track successful transactions
std::size_t failedTxCount{0}; // Added to track failed transactions
XRPAmount baseFee{0};
void
log(beast::Journal const& journal) const
{
std::cout << "Metrics - Submit time: " << submitTime.count()
<< "ms, "
<< "Close time: " << closeTime.count() << "ms, "
<< "Transaction count: " << txCount << ", "
<< "Successful: " << successfulTxCount << ", "
<< "Failed: " << failedTxCount << ", "
<< "Base fee: " << baseFee;
}
};
// Thread-safe console output
std::mutex consoleMutex;
std::atomic<std::size_t> totalSuccessfulTxns{0};
template <typename T>
void
threadSafeLog(T const& message)
{
std::lock_guard<std::mutex> lock(consoleMutex);
std::cout << message << std::endl;
}
XRPAmount
getEscalatedFee(jtx::Env& env) const
{
auto const metrics = env.app().getTxQ().getMetrics(*env.current());
auto const baseFee = env.current()->fees().base;
auto const feeLevel =
mulDiv(metrics.medFeeLevel, baseFee, metrics.referenceFeeLevel)
.second;
auto const escalatedFee = XRPAmount{feeLevel};
return XRPAmount{escalatedFee.drops() + (escalatedFee.drops())};
}
std::vector<jtx::Account>
createAccounts(jtx::Env& env, std::size_t count)
{
std::vector<jtx::Account> accounts;
accounts.reserve(count);
for (std::size_t i = 0; i < count; ++i)
{
std::string name = "account" + std::to_string(i);
auto account = jtx::Account(name);
accounts.push_back(account);
env.fund(false, XRP(100000), account);
if (i % 2500 == 0 && i != 0)
threadSafeLog("Accounts created: " + std::to_string(i));
}
env.close();
return accounts;
}
// Structure to hold work assignment for each thread
struct ThreadWork
{
std::size_t startAccountIdx;
std::size_t endAccountIdx;
std::size_t numTxnsToSubmit;
std::size_t successfulTxns{0};
std::size_t failedTxns{0};
};
void
submitBatchThread(
jtx::Env& env,
std::vector<jtx::Account> const& accounts,
ThreadWork& work) // Changed to non-const reference to update metrics
{
auto const escalatedFee = getEscalatedFee(env);
std::size_t txnsSubmitted = 0;
// Track sequence numbers for all accounts in this thread's range
std::map<AccountID, std::uint32_t> seqNumbers;
for (std::size_t i = work.startAccountIdx; i < work.endAccountIdx; ++i)
{
seqNumbers[accounts[i].id()] = env.seq(accounts[i]);
}
// Pre-calculate recipient indices for better distribution
std::vector<std::size_t> recipientIndices;
recipientIndices.reserve(accounts.size() - 1);
for (std::size_t i = 0; i < accounts.size(); ++i)
{
if (i < work.startAccountIdx || i >= work.endAccountIdx)
{
recipientIndices.push_back(i);
}
}
std::size_t recipientIdx = 0;
for (std::size_t i = work.startAccountIdx;
i < work.endAccountIdx && txnsSubmitted < work.numTxnsToSubmit;
++i)
{
auto const& sender = accounts[i];
// Calculate how many txns to submit from this account
std::size_t txnsRemaining = work.numTxnsToSubmit - txnsSubmitted;
std::size_t txnsForAccount =
std::min(MAX_TXN_PER_ACCOUNT, txnsRemaining);
// Submit transactions
for (std::size_t tx = 0; tx < txnsForAccount; ++tx)
{
// Select next recipient using round-robin
auto const& recipient =
accounts[recipientIndices[recipientIdx]];
recipientIdx = (recipientIdx + 1) % recipientIndices.size();
try
{
env.inject(
pay(sender, recipient, XRP(1)),
fee(escalatedFee),
seq(seqNumbers[sender.id()]));
++work.successfulTxns;
seqNumbers[sender.id()]++;
}
catch (std::exception const& e)
{
++work.failedTxns;
threadSafeLog(
"Exception submitting transaction: " +
std::string(e.what()));
}
++txnsSubmitted;
}
}
}
void
runStressTest(std::size_t numLedgers)
{
testcase(
"Multithreaded stress test: " + std::to_string(TXN_PER_LEDGER) +
" txns/ledger for " + std::to_string(numLedgers) +
" ledgers using " + std::to_string(NUM_THREADS) + " threads");
Env env{*this, envconfig(many_workers)};
env.app().config().MAX_TRANSACTIONS = TXN_PER_LEDGER;
auto const journal = env.app().journal("LedgerStressTest");
// Get actual hardware thread count
std::size_t hardwareThreads =
static_cast<std::size_t>(std::thread::hardware_concurrency());
if (hardwareThreads == 0)
hardwareThreads = 4; // Fallback
const std::size_t THREAD_COUNT = std::min(NUM_THREADS, hardwareThreads);
threadSafeLog(
"Using " + std::to_string(THREAD_COUNT) + " hardware threads");
threadSafeLog(
"Creating " + std::to_string(REQUIRED_ACCOUNTS) + " accounts");
auto accounts = createAccounts(env, REQUIRED_ACCOUNTS);
std::vector<LedgerMetrics> metrics;
metrics.reserve(numLedgers);
for (std::size_t ledger = 0; ledger < numLedgers; ++ledger)
{
threadSafeLog("Starting ledger " + std::to_string(ledger));
LedgerMetrics ledgerMetrics;
auto submitStart = std::chrono::steady_clock::now();
ledgerMetrics.baseFee = env.current()->fees().base;
// Calculate even distribution of work
std::vector<ThreadWork> threadAssignments;
threadAssignments.reserve(THREAD_COUNT);
std::size_t baseWorkload = TXN_PER_LEDGER / THREAD_COUNT;
std::size_t remainder = TXN_PER_LEDGER % THREAD_COUNT;
std::size_t accountsPerThread = accounts.size() / THREAD_COUNT;
std::size_t totalAccountsAssigned = 0;
for (std::size_t t = 0; t < THREAD_COUNT; ++t)
{
ThreadWork work;
work.startAccountIdx = totalAccountsAssigned;
work.endAccountIdx = (t == THREAD_COUNT - 1)
? accounts.size()
: work.startAccountIdx + accountsPerThread;
work.numTxnsToSubmit = baseWorkload + (t < remainder ? 1 : 0);
totalAccountsAssigned = work.endAccountIdx;
threadAssignments.push_back(work);
}
// Launch threads with work assignments
std::vector<std::thread> threads;
threads.reserve(THREAD_COUNT);
for (std::size_t t = 0; t < THREAD_COUNT; ++t)
{
threads.emplace_back(
[&env, &accounts, &work = threadAssignments[t], this]() {
submitBatchThread(env, accounts, work);
});
}
// Wait for all threads
for (auto& thread : threads)
{
if (thread.joinable())
thread.join();
}
// Aggregate metrics from all threads
ledgerMetrics.successfulTxCount = 0;
ledgerMetrics.failedTxCount = 0;
for (auto const& work : threadAssignments)
{
ledgerMetrics.successfulTxCount += work.successfulTxns;
ledgerMetrics.failedTxCount += work.failedTxns;
}
ledgerMetrics.submitTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - submitStart);
auto closeStart = std::chrono::steady_clock::now();
env.close();
ledgerMetrics.closeTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - closeStart);
auto const closed = env.closed();
ledgerMetrics.txCount = closed->txCount();
ledgerMetrics.log(journal);
metrics.push_back(ledgerMetrics);
auto const totalTime =
ledgerMetrics.submitTime + ledgerMetrics.closeTime;
// Updated expectations
BEAST_EXPECT(
ledgerMetrics.txCount >=
ledgerMetrics.successfulTxCount * 0.8); // Allow 20% variance
BEAST_EXPECT(
ledgerMetrics.closeTime <=
std::chrono::duration_cast<std::chrono::milliseconds>(
MAX_CLOSE_TIME));
threadSafeLog(
"\nCompleted ledger " + std::to_string(ledger) + " in " +
std::to_string(totalTime.count()) + "ms" + " with " +
std::to_string(ledgerMetrics.successfulTxCount) +
" successful transactions using " +
std::to_string(THREAD_COUNT) + " threads");
}
}
public:
void
run() override
{
runStressTest(5);
}
};
BEAST_DEFINE_TESTSUITE(LedgerStress, app, ripple);
} // namespace test
} // namespace ripple

View File

@@ -148,6 +148,12 @@ public:
     operator=(Env const&) = delete;
     Env(Env const&) = delete;
 
+    Application*
+    getApp()
+    {
+        return bundle_.app;
+    }
+
     /**
      * @brief Create Env using suite, Config pointer, and explicit features.
      *
@@ -508,6 +514,9 @@ public:
     virtual void
     submit(JTx const& jt);
 
+    virtual void
+    inject_jtx(JTx const& jt);
+
     /** Use the submit RPC command with a provided JTx object.
         This calls postconditions.
     */
@@ -529,6 +538,13 @@ public:
         submit(jt(std::forward<JsonValue>(jv), fN...));
     }
 
+    template <class JsonValue, class... FN>
+    void
+    inject(JsonValue&& jv, FN const&... fN)
+    {
+        inject_jtx(jt(std::forward<JsonValue>(jv), fN...));
+    }
+
     template <class JsonValue, class... FN>
     void
     operator()(JsonValue&& jv, FN const&... fN)
@@ -591,7 +607,6 @@ public:
     void
     disableFeature(uint256 const feature);
 
-private:
     void
     fund(bool setDefaultRipple, STAmount const& amount, Account const& account);

View File

@@ -86,6 +86,9 @@ std::unique_ptr<Config> no_admin(std::unique_ptr<Config>);
 std::unique_ptr<Config>
 no_admin_networkid(std::unique_ptr<Config> cfg);
 
+std::unique_ptr<Config>
+many_workers(std::unique_ptr<Config> cfg);
+
 std::unique_ptr<Config> secure_gateway(std::unique_ptr<Config>);
 
 std::unique_ptr<Config> admin_localnet(std::unique_ptr<Config>);

View File

@@ -49,6 +49,8 @@
 #include <test/jtx/sig.h>
 #include <test/jtx/trust.h>
 #include <test/jtx/utility.h>
+#include <ripple/app/misc/NetworkOPs.h>
+#include <ripple/app/misc/Transaction.h>
 
 namespace ripple {
 namespace test {
@@ -124,13 +126,17 @@ Env::close(
 {
     // Round up to next distinguishable value
     using namespace std::chrono_literals;
+
+    auto& netOPs = app().getOPs();
+    netOPs.forceTransactionBatch();
+
     bool res = true;
     closeTime += closed()->info().closeTimeResolution - 1s;
     timeKeeper().set(closeTime);
 
     // Go through the rpc interface unless we need to simulate
     // a specific consensus delay.
     if (consensusDelay)
-        app().getOPs().acceptLedger(consensusDelay);
+        netOPs.acceptLedger(consensusDelay);
     else
     {
         auto resp = rpc("ledger_accept");
@@ -284,6 +290,33 @@ Env::parseResult(Json::Value const& jr)
     return std::make_pair(ter, isTesSuccess(ter) || isTecClaim(ter));
 }
 
+void
+Env::inject_jtx(JTx const& jt)
+{
+    Application& app = *(getApp());
+    auto& netOPs = app.getOPs();
+
+    if (jt.stx)
+    {
+        std::string reason;
+
+        // make a copy
+        //STTx* newData = new STTx(*jt.stx);
+        //auto stx = std::shared_ptr<STTx const>(newData);
+        auto id = jt.stx->getTransactionID();
+        auto tx = std::make_shared<Transaction>(jt.stx, reason, app);
+        /*
+        static int counter = 0;
+        counter++;
+        if (counter % 2500 == 0)
+            std::cout << "inject_jtx [" << counter++ << "] id=" << id << "\n";
+        */
+        netOPs.processTransaction(tx, true, false);
+    }
+
+    return postconditions(jt, ter_, true);
+}
+
 void
 Env::submit(JTx const& jt)
 {
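
Env::inject_jtx() above skips the submit RPC entirely and hands the signed STTx to NetworkOPs::processTransaction, so nothing is applied until the batch is flushed, which Env::close() now forces via forceTransactionBatch(). A rough usage sketch follows (injectExample is a hypothetical helper; the explicit fee and seq mirror how LedgerStress_test drives the same path):

    #include <test/jtx.h>

    // Hypothetical helper contrasting the inject path with the normal path.
    void
    injectExample(ripple::test::jtx::Env& env)
    {
        using namespace ripple::test::jtx;

        auto const alice = Account("alice");
        auto const bob = Account("bob");
        env.fund(XRP(10000), alice, bob);
        env.close();

        // env(...) goes through the submit RPC and applies immediately;
        // env.inject(...) signs locally and queues via processTransaction.
        auto const seqNo = env.seq(alice);
        env.inject(
            pay(alice, bob, XRP(1)),
            fee(env.current()->fees().base),
            seq(seqNo));

        // Results only become visible once the batch is drained, which
        // happens inside close() before the ledger is accepted.
        env.close();
    }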

View File

@@ -76,6 +76,14 @@ setupConfigForUnitTests(Config& cfg)
 
 namespace jtx {
 
+std::unique_ptr<Config>
+many_workers(std::unique_ptr<Config> cfg)
+{
+    cfg->WORKERS = 128;
+    return cfg;
+}
+
 std::unique_ptr<Config>
 no_admin(std::unique_ptr<Config> cfg)
 {
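
many_workers() simply raises cfg->WORKERS to 128 so the JobQueue does not become the bottleneck while the stress test's submission threads run in parallel. For reference, a sketch of how the funclet composes (stressConfig is a hypothetical helper; envconfig(many_workers), as used in LedgerStress_test, expands to the same call chain):

    #include <test/jtx/envconfig.h>
    #include <ripple/core/Config.h>

    // Hypothetical helper: the Config the stress test runs with, built by
    // applying many_workers to the standard unit-test config.
    std::unique_ptr<ripple::Config>
    stressConfig()
    {
        using namespace ripple::test::jtx;
        return many_workers(envconfig());  // WORKERS = 128
    }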