From 4cc37b9cd844ff61da977af9f974ba82a4a6b9af Mon Sep 17 00:00:00 2001 From: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com> Date: Thu, 22 May 2025 16:50:32 +0100 Subject: [PATCH] networkops duration measure --- src/xrpld/app/misc/NetworkOPs.cpp | 612 +++++++++++++++++------------- src/xrpld/core/detail/SociDB.cpp | 17 +- src/xrpld/net/detail/RPCSub.cpp | 16 +- src/xrpld/perflog/PerfLog.h | 35 +- 4 files changed, 406 insertions(+), 274 deletions(-) diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index d87dea3c52..d39d0fa8c5 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -81,6 +81,8 @@ #include #include +using namespace std::chrono_literals; + namespace ripple { class NetworkOPsImp final : public NetworkOPs @@ -994,9 +996,16 @@ NetworkOPsImp::setHeartbeatTimer() heartbeatTimer_, mConsensus.parms().ledgerGRANULARITY, [this]() { - m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { - processHeartbeatTimer(); - }); + m_job_queue.addJob( + jtNETOP_TIMER, + "NetOPs.heartbeat", + [this, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { processHeartbeatTimer(); }, + "NetOPs.heartbeat", + 1s, + journal); + }); }, [this]() { setHeartbeatTimer(); }); } @@ -1010,9 +1019,16 @@ NetworkOPsImp::setClusterTimer() clusterTimer_, 10s, [this]() { - m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() { - processClusterTimer(); - }); + m_job_queue.addJob( + jtNETOP_CLUSTER, + "NetOPs.cluster", + [this, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { processClusterTimer(); }, + "NetOPs.cluster", + 1s, + journal); + }); }, [this]() { setClusterTimer(); }); } @@ -1229,10 +1245,17 @@ NetworkOPsImp::submitTransaction(std::shared_ptr const& iTrans) auto tx = std::make_shared(trans, reason, app_); - m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { - auto t = tx; - processTransaction(t, false, false, FailHard::no); - }); + m_job_queue.addJob( + jtTRANSACTION, "submitTxn", [this, tx, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { + auto t = tx; + processTransaction(t, false, false, FailHard::no); + }, + "submitTxn", + 1s, + journal); + }); } bool @@ -1315,7 +1338,13 @@ NetworkOPsImp::doTransactionAsync( if (mDispatchState == DispatchState::none) { if (m_job_queue.addJob( - jtBATCH, "transactionBatch", [this]() { transactionBatch(); })) + jtBATCH, "transactionBatch", [this, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { transactionBatch(); }, + "transactionBatch", + 1s, + journal); + })) { mDispatchState = DispatchState::scheduled; } @@ -1362,9 +1391,16 @@ NetworkOPsImp::doTransactionSyncBatch( if (mTransactions.size()) { // More transactions need to be applied, but by another job. - if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() { - transactionBatch(); - })) + if (m_job_queue.addJob( + jtBATCH, + "transactionBatch", + [this, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { transactionBatch(); }, + "transactionBatch", + 1s, + journal); + })) { mDispatchState = DispatchState::scheduled; } @@ -2655,8 +2691,9 @@ NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters) info[jss::server_state] = strOperatingMode(admin); - info[jss::time] = to_string(std::chrono::floor( - std::chrono::system_clock::now())); + info[jss::time] = to_string( + std::chrono::floor( + std::chrono::system_clock::now())); if (needNetworkLedger_) info[jss::network_ledger] = "waiting"; @@ -3175,8 +3212,17 @@ NetworkOPsImp::reportFeeChange() if (f != mLastFeeSummary) { m_job_queue.addJob( - jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() { - pubServer(); + jtCLIENT_FEE_CHANGE, + "reportFeeChange->pubServer", + [this, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { + pubServer(); + return true; + }, + "reportFeeChange->pubServer", + 1s, + journal); }); } } @@ -3187,7 +3233,16 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase) m_job_queue.addJob( jtCLIENT_CONSENSUS, "reportConsensusStateChange->pubConsensus", - [this, phase]() { pubConsensus(phase); }); + [this, phase, journal = m_journal]() { + perf::measureDurationAndLog( + [&]() { + pubConsensus(phase); + return true; + }, + "reportConsensusStateChange->pubConsensus", + 1s, + journal); + }); } inline void @@ -3693,262 +3748,301 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo) jtCLIENT_ACCT_HIST, "AccountHistoryTxStream", [this, dbType = databaseType, subInfo]() { - auto const& accountId = subInfo.index_->accountId_; - auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_; - auto& txHistoryIndex = subInfo.index_->historyTxIndex_; + perf::measureDurationAndLog( + [&]() { + auto const& accountId = subInfo.index_->accountId_; + auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_; + auto& txHistoryIndex = subInfo.index_->historyTxIndex_; - JLOG(m_journal.trace()) - << "AccountHistory job for account " << toBase58(accountId) - << " started. lastLedgerSeq=" << lastLedgerSeq; - - auto isFirstTx = [&](std::shared_ptr const& tx, - std::shared_ptr const& meta) -> bool { - /* - * genesis account: first tx is the one with seq 1 - * other account: first tx is the one created the account - */ - if (accountId == genesisAccountId) - { - auto stx = tx->getSTransaction(); - if (stx->getAccountID(sfAccount) == accountId && - stx->getSeqValue() == 1) - return true; - } - - for (auto& node : meta->getNodes()) - { - if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT) - continue; - - if (node.isFieldPresent(sfNewFields)) - { - if (auto inner = dynamic_cast( - node.peekAtPField(sfNewFields)); - inner) - { - if (inner->isFieldPresent(sfAccount) && - inner->getAccountID(sfAccount) == accountId) - { - return true; - } - } - } - } - - return false; - }; - - auto send = [&](Json::Value const& jvObj, - bool unsubscribe) -> bool { - if (auto sptr = subInfo.sinkWptr_.lock()) - { - sptr->send(jvObj, true); - if (unsubscribe) - unsubAccountHistory(sptr, accountId, false); - return true; - } - - return false; - }; - - auto sendMultiApiJson = [&](MultiApiJson const& jvObj, - bool unsubscribe) -> bool { - if (auto sptr = subInfo.sinkWptr_.lock()) - { - jvObj.visit( - sptr->getApiVersion(), // - [&](Json::Value const& jv) { sptr->send(jv, true); }); - - if (unsubscribe) - unsubAccountHistory(sptr, accountId, false); - return true; - } - - return false; - }; - - auto getMoreTxns = - [&](std::uint32_t minLedger, - std::uint32_t maxLedger, - std::optional marker) - -> std::optional>> { - switch (dbType) - { - case Sqlite: { - auto db = static_cast( - &app_.getRelationalDatabase()); - RelationalDatabase::AccountTxPageOptions options{ - accountId, minLedger, maxLedger, marker, 0, true}; - return db->newestAccountTxPage(options); - } - default: { - UNREACHABLE( - "ripple::NetworkOPsImp::addAccountHistoryJob::" - "getMoreTxns : invalid database type"); - return {}; - } - } - }; - - /* - * search backward until the genesis ledger or asked to stop - */ - while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_) - { - int feeChargeCount = 0; - if (auto sptr = subInfo.sinkWptr_.lock(); sptr) - { - sptr->getConsumer().charge(Resource::feeMediumBurdenRPC); - ++feeChargeCount; - } - else - { JLOG(m_journal.trace()) << "AccountHistory job for account " - << toBase58(accountId) << " no InfoSub. Fee charged " - << feeChargeCount << " times."; - return; - } + << toBase58(accountId) + << " started. lastLedgerSeq=" << lastLedgerSeq; - // try to search in 1024 ledgers till reaching genesis ledgers - auto startLedgerSeq = - (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2); - JLOG(m_journal.trace()) - << "AccountHistory job for account " << toBase58(accountId) - << ", working on ledger range [" << startLedgerSeq << "," - << lastLedgerSeq << "]"; + auto isFirstTx = + [&](std::shared_ptr const& tx, + std::shared_ptr const& meta) -> bool { + /* + * genesis account: first tx is the one with seq 1 + * other account: first tx is the one created the + * account + */ + if (accountId == genesisAccountId) + { + auto stx = tx->getSTransaction(); + if (stx->getAccountID(sfAccount) == accountId && + stx->getSeqValue() == 1) + return true; + } - auto haveRange = [&]() -> bool { - std::uint32_t validatedMin = UINT_MAX; - std::uint32_t validatedMax = 0; - auto haveSomeValidatedLedgers = - app_.getLedgerMaster().getValidatedRange( - validatedMin, validatedMax); + for (auto& node : meta->getNodes()) + { + if (node.getFieldU16(sfLedgerEntryType) != + ltACCOUNT_ROOT) + continue; - return haveSomeValidatedLedgers && - validatedMin <= startLedgerSeq && - lastLedgerSeq <= validatedMax; - }(); + if (node.isFieldPresent(sfNewFields)) + { + if (auto inner = dynamic_cast( + node.peekAtPField(sfNewFields)); + inner) + { + if (inner->isFieldPresent(sfAccount) && + inner->getAccountID(sfAccount) == + accountId) + { + return true; + } + } + } + } - if (!haveRange) - { - JLOG(m_journal.debug()) - << "AccountHistory reschedule job for account " - << toBase58(accountId) << ", incomplete ledger range [" - << startLedgerSeq << "," << lastLedgerSeq << "]"; - setAccountHistoryJobTimer(subInfo); - return; - } + return false; + }; - std::optional marker{}; - while (!subInfo.index_->stopHistorical_) - { - auto dbResult = - getMoreTxns(startLedgerSeq, lastLedgerSeq, marker); - if (!dbResult) + auto send = [&](Json::Value const& jvObj, + bool unsubscribe) -> bool { + if (auto sptr = subInfo.sinkWptr_.lock()) + { + sptr->send(jvObj, true); + if (unsubscribe) + unsubAccountHistory(sptr, accountId, false); + return true; + } + + return false; + }; + + auto sendMultiApiJson = [&](MultiApiJson const& jvObj, + bool unsubscribe) -> bool { + if (auto sptr = subInfo.sinkWptr_.lock()) + { + jvObj.visit( + sptr->getApiVersion(), // + [&](Json::Value const& jv) { + sptr->send(jv, true); + }); + + if (unsubscribe) + unsubAccountHistory(sptr, accountId, false); + return true; + } + + return false; + }; + + auto getMoreTxns = + [&](std::uint32_t minLedger, + std::uint32_t maxLedger, + std::optional + marker) + -> std::optional>> { + switch (dbType) + { + case Sqlite: { + auto db = static_cast( + &app_.getRelationalDatabase()); + RelationalDatabase::AccountTxPageOptions + options{ + accountId, + minLedger, + maxLedger, + marker, + 0, + true}; + return db->newestAccountTxPage(options); + } + default: { + UNREACHABLE( + "ripple::NetworkOPsImp::" + "addAccountHistoryJob::" + "getMoreTxns : invalid database type"); + return {}; + } + } + }; + + /* + * search backward until the genesis ledger or asked to stop + */ + while (lastLedgerSeq >= 2 && + !subInfo.index_->stopHistorical_) { - JLOG(m_journal.debug()) - << "AccountHistory job for account " - << toBase58(accountId) << " getMoreTxns failed."; - send(rpcError(rpcINTERNAL), true); - return; - } - - auto const& txns = dbResult->first; - marker = dbResult->second; - size_t num_txns = txns.size(); - for (size_t i = 0; i < num_txns; ++i) - { - auto const& [tx, meta] = txns[i]; - - if (!tx || !meta) + int feeChargeCount = 0; + if (auto sptr = subInfo.sinkWptr_.lock(); sptr) { - JLOG(m_journal.debug()) - << "AccountHistory job for account " - << toBase58(accountId) << " empty tx or meta."; - send(rpcError(rpcINTERNAL), true); - return; - } - auto curTxLedger = - app_.getLedgerMaster().getLedgerBySeq( - tx->getLedger()); - if (!curTxLedger) - { - JLOG(m_journal.debug()) - << "AccountHistory job for account " - << toBase58(accountId) << " no ledger."; - send(rpcError(rpcINTERNAL), true); - return; - } - std::shared_ptr stTxn = - tx->getSTransaction(); - if (!stTxn) - { - JLOG(m_journal.debug()) - << "AccountHistory job for account " - << toBase58(accountId) - << " getSTransaction failed."; - send(rpcError(rpcINTERNAL), true); - return; - } - - auto const mRef = std::ref(*meta); - auto const trR = meta->getResultTER(); - MultiApiJson jvTx = - transJson(stTxn, trR, true, curTxLedger, mRef); - - jvTx.set( - jss::account_history_tx_index, txHistoryIndex--); - if (i + 1 == num_txns || - txns[i + 1].first->getLedger() != tx->getLedger()) - jvTx.set(jss::account_history_boundary, true); - - if (isFirstTx(tx, meta)) - { - jvTx.set(jss::account_history_tx_first, true); - sendMultiApiJson(jvTx, false); - - JLOG(m_journal.trace()) - << "AccountHistory job for account " - << toBase58(accountId) - << " done, found last tx."; - return; + sptr->getConsumer().charge( + Resource::feeMediumBurdenRPC); + ++feeChargeCount; } else { - sendMultiApiJson(jvTx, false); + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " no InfoSub. Fee charged " << feeChargeCount + << " times."; + return; + } + + // try to search in 1024 ledgers till reaching genesis + // ledgers + auto startLedgerSeq = + (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 + : 2); + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << ", working on ledger range [" << startLedgerSeq + << "," << lastLedgerSeq << "]"; + + auto haveRange = [&]() -> bool { + std::uint32_t validatedMin = UINT_MAX; + std::uint32_t validatedMax = 0; + auto haveSomeValidatedLedgers = + app_.getLedgerMaster().getValidatedRange( + validatedMin, validatedMax); + + return haveSomeValidatedLedgers && + validatedMin <= startLedgerSeq && + lastLedgerSeq <= validatedMax; + }(); + + if (!haveRange) + { + JLOG(m_journal.debug()) + << "AccountHistory reschedule job for account " + << toBase58(accountId) + << ", incomplete ledger range [" + << startLedgerSeq << "," << lastLedgerSeq + << "]"; + setAccountHistoryJobTimer(subInfo); + return; + } + + std::optional + marker{}; + while (!subInfo.index_->stopHistorical_) + { + auto dbResult = getMoreTxns( + startLedgerSeq, lastLedgerSeq, marker); + if (!dbResult) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) + << " getMoreTxns failed."; + send(rpcError(rpcINTERNAL), true); + return; + } + + auto const& txns = dbResult->first; + marker = dbResult->second; + size_t num_txns = txns.size(); + for (size_t i = 0; i < num_txns; ++i) + { + auto const& [tx, meta] = txns[i]; + + if (!tx || !meta) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) + << " empty tx or meta."; + send(rpcError(rpcINTERNAL), true); + return; + } + auto curTxLedger = + app_.getLedgerMaster().getLedgerBySeq( + tx->getLedger()); + if (!curTxLedger) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) << " no ledger."; + send(rpcError(rpcINTERNAL), true); + return; + } + std::shared_ptr stTxn = + tx->getSTransaction(); + if (!stTxn) + { + JLOG(m_journal.debug()) + << "AccountHistory job for account " + << toBase58(accountId) + << " getSTransaction failed."; + send(rpcError(rpcINTERNAL), true); + return; + } + + auto const mRef = std::ref(*meta); + auto const trR = meta->getResultTER(); + MultiApiJson jvTx = transJson( + stTxn, trR, true, curTxLedger, mRef); + + jvTx.set( + jss::account_history_tx_index, + txHistoryIndex--); + if (i + 1 == num_txns || + txns[i + 1].first->getLedger() != + tx->getLedger()) + jvTx.set( + jss::account_history_boundary, true); + + if (isFirstTx(tx, meta)) + { + jvTx.set( + jss::account_history_tx_first, true); + sendMultiApiJson(jvTx, false); + + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " done, found last tx."; + return; + } + else + { + sendMultiApiJson(jvTx, false); + } + } + + if (marker) + { + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " paging, marker=" << marker->ledgerSeq + << ":" << marker->txnSeq; + } + else + { + break; + } + } + + if (!subInfo.index_->stopHistorical_) + { + lastLedgerSeq = startLedgerSeq - 1; + if (lastLedgerSeq <= 1) + { + JLOG(m_journal.trace()) + << "AccountHistory job for account " + << toBase58(accountId) + << " done, reached genesis ledger."; + return; + } } } - if (marker) - { - JLOG(m_journal.trace()) - << "AccountHistory job for account " - << toBase58(accountId) - << " paging, marker=" << marker->ledgerSeq << ":" - << marker->txnSeq; - } - else - { - break; - } - } - - if (!subInfo.index_->stopHistorical_) - { - lastLedgerSeq = startLedgerSeq - 1; - if (lastLedgerSeq <= 1) - { - JLOG(m_journal.trace()) - << "AccountHistory job for account " - << toBase58(accountId) - << " done, reached genesis ledger."; - return; - } - } - } + return; + }, + "AccountHistoryTxStream", + 1s, + this->m_journal); }); } diff --git a/src/xrpld/core/detail/SociDB.cpp b/src/xrpld/core/detail/SociDB.cpp index 5b298dac43..4b3c3a7b84 100644 --- a/src/xrpld/core/detail/SociDB.cpp +++ b/src/xrpld/core/detail/SociDB.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -35,6 +36,8 @@ #include +using namespace std::chrono_literals; + namespace ripple { static auto checkpointPageCount = 1000; @@ -257,9 +260,17 @@ public: // There is a separate check in `checkpoint` for a valid // connection in the rare case when the DatabaseCon is destroyed // after locking this weak_ptr - [wp = std::weak_ptr{shared_from_this()}]() { - if (auto self = wp.lock()) - self->checkpoint(); + [wp = std::weak_ptr{shared_from_this()}, + journal = j_]() { + perf::measureDurationAndLog( + [&]() { + if (auto self = wp.lock()) + self->checkpoint(); + return true; + }, + "WAL", + 1s, + journal); })) { std::lock_guard lock(mutex_); diff --git a/src/xrpld/net/detail/RPCSub.cpp b/src/xrpld/net/detail/RPCSub.cpp index 3f0c923e13..a0e1b9974f 100644 --- a/src/xrpld/net/detail/RPCSub.cpp +++ b/src/xrpld/net/detail/RPCSub.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -27,6 +28,8 @@ #include +using namespace std::chrono_literals; + namespace ripple { // Subscription object for JSON-RPC @@ -91,8 +94,17 @@ public: JLOG(j_.info()) << "RPCCall::fromNetwork start"; mSending = m_jobQueue.addJob( - jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() { - sendThread(); + jtCLIENT_SUBSCRIBE, + "RPCSub::sendThread", + [this, journal = j_]() { + perf::measureDurationAndLog( + [&]() { + sendThread(); + return true; + }, + "RPCSub::sendThread", + 1s, + journal); }); } } diff --git a/src/xrpld/perflog/PerfLog.h b/src/xrpld/perflog/PerfLog.h index 5212752ec7..98ff2dbc9e 100644 --- a/src/xrpld/perflog/PerfLog.h +++ b/src/xrpld/perflog/PerfLog.h @@ -190,20 +190,35 @@ measureDurationAndLog( std::chrono::duration maxDelay, beast::Journal const& journal) { + using Result = std::invoke_result_t; + auto start_time = std::chrono::high_resolution_clock::now(); - auto result = func(); - - auto end_time = std::chrono::high_resolution_clock::now(); - auto duration = std::chrono::duration_cast( - end_time - start_time); - if (duration > maxDelay) + if constexpr (std::is_void_v) { - JLOG(journal.warn()) - << actionDescription << " took " << duration.count() << " ms"; + std::forward(func)(); + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + if (duration > maxDelay) + { + JLOG(journal.warn()) + << actionDescription << " took " << duration.count() << " ms"; + } + } + else + { + Result result = std::forward(func)(); + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + if (duration > maxDelay) + { + JLOG(journal.warn()) + << actionDescription << " took " << duration.count() << " ms"; + } + return result; } - - return result; } } // namespace perf