Compare commits

...

6 Commits

Author SHA1 Message Date
Valentin Balaschenko
3592bd43c1 clang and levelization 2025-05-22 17:58:28 +01:00
Valentin Balaschenko
aa14097b02 measureDurationAndLog wip 2 2025-05-22 17:49:52 +01:00
Valentin Balaschenko
8a5f95c223 measureDurationAndLog wip 1 2025-05-22 17:29:56 +01:00
Valentin Balaschenko
fbda5ccc15 Ledger master duration 2025-05-22 17:16:38 +01:00
Valentin Balaschenko
4cc37b9cd8 networkops duration measure 2025-05-22 16:50:32 +01:00
Valentin Balaschenko
3495cb3753 PeerImp measure duration 2025-05-22 13:35:57 +01:00
18 changed files with 794 additions and 433 deletions

View File

@@ -29,7 +29,7 @@ Loop: xrpld.core xrpld.net
xrpld.net > xrpld.core xrpld.net > xrpld.core
Loop: xrpld.core xrpld.perflog Loop: xrpld.core xrpld.perflog
xrpld.perflog == xrpld.core xrpld.perflog ~= xrpld.core
Loop: xrpld.net xrpld.rpc Loop: xrpld.net xrpld.rpc
xrpld.rpc ~= xrpld.net xrpld.rpc ~= xrpld.net

View File

@@ -162,6 +162,7 @@ xrpld.ledger > xrpl.basics
xrpld.ledger > xrpl.json xrpld.ledger > xrpl.json
xrpld.ledger > xrpl.protocol xrpld.ledger > xrpl.protocol
xrpld.net > xrpl.basics xrpld.net > xrpl.basics
xrpld.net > xrpld.perflog
xrpld.net > xrpl.json xrpld.net > xrpl.json
xrpld.net > xrpl.protocol xrpld.net > xrpl.protocol
xrpld.net > xrpl.resource xrpld.net > xrpl.resource

View File

@@ -37,6 +37,7 @@
#include <xrpld/consensus/LedgerTiming.h> #include <xrpld/consensus/LedgerTiming.h>
#include <xrpld/overlay/Overlay.h> #include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/predicates.h> #include <xrpld/overlay/predicates.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/random.h> #include <xrpl/basics/random.h>
#include <xrpl/beast/core/LexicalCast.h> #include <xrpl/beast/core/LexicalCast.h>
@@ -49,6 +50,8 @@
#include <iomanip> #include <iomanip>
#include <mutex> #include <mutex>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
RCLConsensus::RCLConsensus( RCLConsensus::RCLConsensus(
@@ -140,11 +143,17 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtADVANCE, jtADVANCE,
"getConsensusLedger1", "getConsensusLedger1",
[id = hash, &app = app_, this]() { [id = hash, &app = app_, this, journal = j_]() {
JLOG(j_.debug()) perf::measureDurationAndLog(
<< "JOB advanceLedger getConsensusLedger1 started"; [&]() {
app.getInboundLedgers().acquireAsync( JLOG(j_.debug()) << "JOB advanceLedger "
id, 0, InboundLedger::Reason::CONSENSUS); "getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(
id, 0, InboundLedger::Reason::CONSENSUS);
},
"getConsensusLedger1",
1s,
journal);
}); });
} }
return std::nullopt; return std::nullopt;
@@ -442,21 +451,27 @@ RCLConsensus::Adaptor::onAccept(
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtACCEPT, jtACCEPT,
"acceptLedger", "acceptLedger",
[=, this, cj = std::move(consensusJson)]() mutable { [=, this, cj = std::move(consensusJson), journal = j_]() mutable {
// Note that no lock is held or acquired during this job. perf::measureDurationAndLog(
// This is because generic Consensus guarantees that once a ledger [&]() {
// is accepted, the consensus results and capture by reference state // Note that no lock is held or acquired during this job.
// will not change until startRound is called (which happens via // This is because generic Consensus guarantees that once a
// endConsensus). // ledger is accepted, the consensus results and capture by
RclConsensusLogger clog("onAccept", validating, j_); // reference state will not change until startRound is
this->doAccept( // called (which happens via endConsensus).
result, RclConsensusLogger clog("onAccept", validating, j_);
prevLedger, this->doAccept(
closeResolution, result,
rawCloseTimes, prevLedger,
mode, closeResolution,
std::move(cj)); rawCloseTimes,
this->app_.getOPs().endConsensus(clog.ss()); mode,
std::move(cj));
this->app_.getOPs().endConsensus(clog.ss());
},
"acceptLedger",
1s,
journal);
}); });
} }

View File

@@ -32,6 +32,8 @@
#include <memory> #include <memory>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
RCLValidatedLedger::RCLValidatedLedger(MakeGenesis) RCLValidatedLedger::RCLValidatedLedger(MakeGenesis)
@@ -142,11 +144,19 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
Application* pApp = &app_; Application* pApp = &app_;
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() { jtADVANCE,
JLOG(j_.debug()) "getConsensusLedger2",
<< "JOB advanceLedger getConsensusLedger2 started"; [pApp, hash, this, journal = j_]() {
pApp->getInboundLedgers().acquireAsync( perf::measureDurationAndLog(
hash, 0, InboundLedger::Reason::CONSENSUS); [&]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS);
},
"getConsensusLedger2",
1s,
journal);
}); });
return std::nullopt; return std::nullopt;
} }

View File

@@ -23,11 +23,14 @@
#include <xrpld/app/misc/Transaction.h> #include <xrpld/app/misc/Transaction.h>
#include <xrpld/core/JobQueue.h> #include <xrpld/core/JobQueue.h>
#include <xrpld/nodestore/Database.h> #include <xrpld/nodestore/Database.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/protocol/HashPrefix.h> #include <xrpl/protocol/HashPrefix.h>
#include <xrpl/protocol/digest.h> #include <xrpl/protocol/digest.h>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
ConsensusTransSetSF::ConsensusTransSetSF(Application& app, NodeCache& nodeCache) ConsensusTransSetSF::ConsensusTransSetSF(Application& app, NodeCache& nodeCache)
@@ -65,9 +68,14 @@ ConsensusTransSetSF::gotNode(
"ripple::ConsensusTransSetSF::gotNode : transaction hash " "ripple::ConsensusTransSetSF::gotNode : transaction hash "
"match"); "match");
auto const pap = &app_; auto const pap = &app_;
app_.getJobQueue().addJob(jtTRANSACTION, "TXS->TXN", [pap, stx]() { app_.getJobQueue().addJob(
pap->getOPs().submitTransaction(stx); jtTRANSACTION, "TXS->TXN", [pap, stx, journal = j_]() {
}); perf::measureDurationAndLog(
[&]() { pap->getOPs().submitTransaction(stx); },
"TXS->TXN",
1s,
journal);
});
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {

View File

@@ -30,6 +30,7 @@
#include <xrpld/core/SociDB.h> #include <xrpld/core/SociDB.h>
#include <xrpld/nodestore/Database.h> #include <xrpld/nodestore/Database.h>
#include <xrpld/nodestore/detail/DatabaseNodeImp.h> #include <xrpld/nodestore/detail/DatabaseNodeImp.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/basics/contract.h> #include <xrpl/basics/contract.h>
@@ -46,6 +47,8 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
create_genesis_t const create_genesis{}; create_genesis_t const create_genesis{};
@@ -1028,7 +1031,12 @@ pendSaveValidated(
isCurrent ? jtPUBLEDGER : jtPUBOLDLEDGER, isCurrent ? jtPUBLEDGER : jtPUBOLDLEDGER,
std::to_string(ledger->seq()), std::to_string(ledger->seq()),
[&app, ledger, isCurrent]() { [&app, ledger, isCurrent]() {
saveValidatedLedger(app, ledger, isCurrent); beast::Journal journal = app.journal("Ledger");
perf::measureDurationAndLog(
[&]() { saveValidatedLedger(app, ledger, isCurrent); },
"OrderBookDB::update:",
1s,
journal);
})) }))
{ {
return true; return true;

View File

@@ -24,10 +24,13 @@
#include <xrpld/app/misc/NetworkOPs.h> #include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/core/Config.h> #include <xrpld/core/Config.h>
#include <xrpld/core/JobQueue.h> #include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/protocol/Indexes.h> #include <xrpl/protocol/Indexes.h>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
OrderBookDB::OrderBookDB(Application& app) OrderBookDB::OrderBookDB(Application& app)
@@ -69,7 +72,13 @@ OrderBookDB::setup(std::shared_ptr<ReadView const> const& ledger)
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtUPDATE_PF, jtUPDATE_PF,
"OrderBookDB::update: " + std::to_string(ledger->seq()), "OrderBookDB::update: " + std::to_string(ledger->seq()),
[this, ledger]() { update(ledger); }); [this, ledger, journal = j_]() {
perf::measureDurationAndLog(
[&]() { update(ledger); },
"OrderBookDB::update:",
1s,
journal);
});
} }
} }

View File

@@ -25,6 +25,7 @@
#include <xrpld/app/main/Application.h> #include <xrpld/app/main/Application.h>
#include <xrpld/core/JobQueue.h> #include <xrpld/core/JobQueue.h>
#include <xrpld/overlay/Overlay.h> #include <xrpld/overlay/Overlay.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpld/shamap/SHAMapNodeID.h> #include <xrpld/shamap/SHAMapNodeID.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
@@ -37,6 +38,8 @@
#include <algorithm> #include <algorithm>
#include <random> #include <random>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
using namespace std::chrono_literals; using namespace std::chrono_literals;
@@ -473,15 +476,24 @@ InboundLedger::done()
// We hold the PeerSet lock, so must dispatch // We hold the PeerSet lock, so must dispatch
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() { jtLEDGER_DATA,
if (self->complete_ && !self->failed_) "AcquisitionDone",
{ [self = shared_from_this(), journal = journal_]() {
self->app_.getLedgerMaster().checkAccept(self->getLedger()); perf::measureDurationAndLog(
self->app_.getLedgerMaster().tryAdvance(); [&]() {
} if (self->complete_ && !self->failed_)
else {
self->app_.getInboundLedgers().logFailure( self->app_.getLedgerMaster().checkAccept(
self->hash_, self->mSeq); self->getLedger());
self->app_.getLedgerMaster().tryAdvance();
}
else
self->app_.getInboundLedgers().logFailure(
self->hash_, self->mSeq);
},
"AcquisitionDone",
1s,
journal);
}); });
} }

View File

@@ -35,6 +35,8 @@
#include <mutex> #include <mutex>
#include <vector> #include <vector>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
class InboundLedgersImp : public InboundLedgers class InboundLedgersImp : public InboundLedgers
@@ -212,8 +214,14 @@ public:
// dispatch // dispatch
if (ledger->gotData(std::weak_ptr<Peer>(peer), packet)) if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtLEDGER_DATA, "processLedgerData", [ledger]() { jtLEDGER_DATA,
ledger->runData(); "processLedgerData",
[ledger, journal = j_]() {
perf::measureDurationAndLog(
[&]() { ledger->runData(); },
"processLedgerData",
1s,
journal);
}); });
return true; return true;
@@ -227,8 +235,12 @@ public:
if (packet->type() == protocol::liAS_NODE) if (packet->type() == protocol::liAS_NODE)
{ {
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtLEDGER_DATA, "gotStaleData", [this, packet]() { jtLEDGER_DATA, "gotStaleData", [this, packet, journal = j_]() {
gotStaleData(packet); perf::measureDurationAndLog(
[&]() { gotStaleData(packet); },
"gotStaleData",
1s,
journal);
}); });
} }

View File

@@ -25,6 +25,9 @@
#include <xrpld/app/main/Application.h> #include <xrpld/app/main/Application.h>
#include <xrpld/core/JobQueue.h> #include <xrpld/core/JobQueue.h>
#include <xrpld/overlay/PeerSet.h> #include <xrpld/overlay/PeerSet.h>
#include <xrpld/perflog/PerfLog.h>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
@@ -244,22 +247,31 @@ LedgerDeltaAcquire::onLedgerBuilt(
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtREPLAY_TASK, jtREPLAY_TASK,
"onLedgerBuilt", "onLedgerBuilt",
[=, ledger = this->fullLedger_, &app = this->app_]() { [=,
for (auto reason : reasons) ledger = this->fullLedger_,
{ &app = this->app_,
switch (reason) journal = journal_]() {
{ perf::measureDurationAndLog(
case InboundLedger::Reason::GENERIC: [&]() {
app.getLedgerMaster().storeLedger(ledger); for (auto reason : reasons)
break; {
default: switch (reason)
// TODO for other use cases {
break; case InboundLedger::Reason::GENERIC:
} app.getLedgerMaster().storeLedger(ledger);
} break;
default:
// TODO for other use cases
break;
}
}
if (firstTime) if (firstTime)
app.getLedgerMaster().tryAdvance(); app.getLedgerMaster().tryAdvance();
},
"onLedgerBuilt",
1s,
journal);
}); });
} }

View File

@@ -37,6 +37,7 @@
#include <xrpld/core/TimeKeeper.h> #include <xrpld/core/TimeKeeper.h>
#include <xrpld/overlay/Overlay.h> #include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/Peer.h> #include <xrpld/overlay/Peer.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/basics/MathUtilities.h> #include <xrpl/basics/MathUtilities.h>
@@ -56,6 +57,8 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
// Don't catch up more than 100 ledgers (cannot exceed 256) // Don't catch up more than 100 ledgers (cannot exceed 256)
@@ -1361,27 +1364,36 @@ LedgerMaster::tryAdvance()
if (!mAdvanceThread && !mValidLedger.empty()) if (!mAdvanceThread && !mValidLedger.empty())
{ {
mAdvanceThread = true; mAdvanceThread = true;
app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() { app_.getJobQueue().addJob(
std::unique_lock sl(m_mutex); jtADVANCE, "advanceLedger", [this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() {
std::unique_lock sl(m_mutex);
XRPL_ASSERT( XRPL_ASSERT(
!mValidLedger.empty() && mAdvanceThread, !mValidLedger.empty() && mAdvanceThread,
"ripple::LedgerMaster::tryAdvance : has valid ledger"); "ripple::LedgerMaster::tryAdvance : has valid "
"ledger");
JLOG(m_journal.trace()) << "advanceThread<"; JLOG(m_journal.trace()) << "advanceThread<";
try try
{ {
doAdvance(sl); doAdvance(sl);
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {
JLOG(m_journal.fatal()) << "doAdvance throws: " << ex.what(); JLOG(m_journal.fatal())
} << "doAdvance throws: " << ex.what();
}
mAdvanceThread = false; mAdvanceThread = false;
JLOG(m_journal.trace()) << "advanceThread>"; JLOG(m_journal.trace()) << "advanceThread>";
}); },
"advanceLedger",
1s,
journal);
});
} }
} }
@@ -1536,7 +1548,13 @@ LedgerMaster::newPFWork(
<< "newPFWork: Creating job. path find threads: " << "newPFWork: Creating job. path find threads: "
<< mPathFindThread; << mPathFindThread;
if (app_.getJobQueue().addJob( if (app_.getJobQueue().addJob(
jtUPDATE_PF, name, [this]() { updatePaths(); })) jtUPDATE_PF, name, [this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { updatePaths(); },
"newPFWork: Creating job. path find threads:",
1s,
journal);
}))
{ {
++mPathFindThread; ++mPathFindThread;
} }
@@ -1857,8 +1875,11 @@ LedgerMaster::fetchForHistory(
mFillInProgress = seq; mFillInProgress = seq;
} }
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtADVANCE, "tryFill", [this, ledger]() { jtADVANCE,
tryFill(ledger); "tryFill",
[this, ledger, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { tryFill(ledger); }, "tryFill", 1s, journal);
}); });
} }
progress = true; progress = true;
@@ -2027,10 +2048,17 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq)
{ {
if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire)) if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire))
{ {
app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() { app_.getJobQueue().addJob(
app_.getInboundLedgers().gotFetchPack(); jtLEDGER_DATA, "gotFetchPack", [this, journal = m_journal]() {
mGotFetchPackThread.clear(std::memory_order_release); perf::measureDurationAndLog(
}); [&]() {
app_.getInboundLedgers().gotFetchPack();
mGotFetchPackThread.clear(std::memory_order_release);
},
"gotFetchPack",
1s,
journal);
});
} }
} }

View File

@@ -19,6 +19,9 @@
#include <xrpld/app/ledger/detail/TimeoutCounter.h> #include <xrpld/app/ledger/detail/TimeoutCounter.h>
#include <xrpld/core/JobQueue.h> #include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
@@ -83,9 +86,15 @@ TimeoutCounter::queueJob(ScopedLockType& sl)
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
queueJobParameter_.jobType, queueJobParameter_.jobType,
queueJobParameter_.jobName, queueJobParameter_.jobName,
[wptr = pmDowncast()]() { [wptr = pmDowncast(), journal = journal_]() {
if (auto sptr = wptr.lock(); sptr) perf::measureDurationAndLog(
sptr->invokeOnTimer(); [&]() {
if (auto sptr = wptr.lock(); sptr)
sptr->invokeOnTimer();
},
"TimeoutCounter::queueJob",
1s,
journal);
}); });
} }

View File

@@ -23,6 +23,7 @@
#include <xrpld/app/ledger/detail/TransactionAcquire.h> #include <xrpld/app/ledger/detail/TransactionAcquire.h>
#include <xrpld/app/main/Application.h> #include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h> #include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/perflog/PerfLog.h>
#include <memory> #include <memory>
@@ -79,8 +80,16 @@ TransactionAcquire::done()
// just updates the consensus and related structures when we acquire // just updates the consensus and related structures when we acquire
// a transaction set. No need to update them if we're shutting down. // a transaction set. No need to update them if we're shutting down.
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtTXN_DATA, "completeAcquire", [pap, hash, map]() { jtTXN_DATA,
pap->getInboundTransactions().giveSet(hash, map, true); "completeAcquire",
[pap, hash, map, journal = journal_]() {
perf::measureDurationAndLog(
[&]() {
pap->getInboundTransactions().giveSet(hash, map, true);
},
"completeAcquire",
1s,
journal);
}); });
} }
} }

View File

@@ -81,6 +81,8 @@
#include <tuple> #include <tuple>
#include <unordered_map> #include <unordered_map>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
class NetworkOPsImp final : public NetworkOPs class NetworkOPsImp final : public NetworkOPs
@@ -994,9 +996,16 @@ NetworkOPsImp::setHeartbeatTimer()
heartbeatTimer_, heartbeatTimer_,
mConsensus.parms().ledgerGRANULARITY, mConsensus.parms().ledgerGRANULARITY,
[this]() { [this]() {
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { m_job_queue.addJob(
processHeartbeatTimer(); jtNETOP_TIMER,
}); "NetOPs.heartbeat",
[this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { processHeartbeatTimer(); },
"NetOPs.heartbeat",
1s,
journal);
});
}, },
[this]() { setHeartbeatTimer(); }); [this]() { setHeartbeatTimer(); });
} }
@@ -1010,9 +1019,16 @@ NetworkOPsImp::setClusterTimer()
clusterTimer_, clusterTimer_,
10s, 10s,
[this]() { [this]() {
m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() { m_job_queue.addJob(
processClusterTimer(); jtNETOP_CLUSTER,
}); "NetOPs.cluster",
[this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { processClusterTimer(); },
"NetOPs.cluster",
1s,
journal);
});
}, },
[this]() { setClusterTimer(); }); [this]() { setClusterTimer(); });
} }
@@ -1229,10 +1245,17 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
auto tx = std::make_shared<Transaction>(trans, reason, app_); auto tx = std::make_shared<Transaction>(trans, reason, app_);
m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { m_job_queue.addJob(
auto t = tx; jtTRANSACTION, "submitTxn", [this, tx, journal = m_journal]() {
processTransaction(t, false, false, FailHard::no); perf::measureDurationAndLog(
}); [&]() {
auto t = tx;
processTransaction(t, false, false, FailHard::no);
},
"submitTxn",
1s,
journal);
});
} }
bool bool
@@ -1315,7 +1338,13 @@ NetworkOPsImp::doTransactionAsync(
if (mDispatchState == DispatchState::none) if (mDispatchState == DispatchState::none)
{ {
if (m_job_queue.addJob( if (m_job_queue.addJob(
jtBATCH, "transactionBatch", [this]() { transactionBatch(); })) jtBATCH, "transactionBatch", [this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { transactionBatch(); },
"transactionBatch",
1s,
journal);
}))
{ {
mDispatchState = DispatchState::scheduled; mDispatchState = DispatchState::scheduled;
} }
@@ -1362,9 +1391,16 @@ NetworkOPsImp::doTransactionSyncBatch(
if (mTransactions.size()) if (mTransactions.size())
{ {
// More transactions need to be applied, but by another job. // More transactions need to be applied, but by another job.
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() { if (m_job_queue.addJob(
transactionBatch(); jtBATCH,
})) "transactionBatch",
[this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { transactionBatch(); },
"transactionBatch",
1s,
journal);
}))
{ {
mDispatchState = DispatchState::scheduled; mDispatchState = DispatchState::scheduled;
} }
@@ -3175,8 +3211,17 @@ NetworkOPsImp::reportFeeChange()
if (f != mLastFeeSummary) if (f != mLastFeeSummary)
{ {
m_job_queue.addJob( m_job_queue.addJob(
jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() { jtCLIENT_FEE_CHANGE,
pubServer(); "reportFeeChange->pubServer",
[this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() {
pubServer();
return true;
},
"reportFeeChange->pubServer",
1s,
journal);
}); });
} }
} }
@@ -3187,7 +3232,16 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
m_job_queue.addJob( m_job_queue.addJob(
jtCLIENT_CONSENSUS, jtCLIENT_CONSENSUS,
"reportConsensusStateChange->pubConsensus", "reportConsensusStateChange->pubConsensus",
[this, phase]() { pubConsensus(phase); }); [this, phase, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() {
pubConsensus(phase);
return true;
},
"reportConsensusStateChange->pubConsensus",
1s,
journal);
});
} }
inline void inline void
@@ -3693,262 +3747,301 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
jtCLIENT_ACCT_HIST, jtCLIENT_ACCT_HIST,
"AccountHistoryTxStream", "AccountHistoryTxStream",
[this, dbType = databaseType, subInfo]() { [this, dbType = databaseType, subInfo]() {
auto const& accountId = subInfo.index_->accountId_; perf::measureDurationAndLog(
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_; [&]() {
auto& txHistoryIndex = subInfo.index_->historyTxIndex_; auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " started. lastLedgerSeq=" << lastLedgerSeq;
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
std::shared_ptr<TxMeta> const& meta) -> bool {
/*
* genesis account: first tx is the one with seq 1
* other account: first tx is the one created the account
*/
if (accountId == genesisAccountId)
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId &&
stx->getSeqValue() == 1)
return true;
}
for (auto& node : meta->getNodes())
{
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
continue;
if (node.isFieldPresent(sfNewFields))
{
if (auto inner = dynamic_cast<STObject const*>(
node.peekAtPField(sfNewFields));
inner)
{
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) == accountId)
{
return true;
}
}
}
}
return false;
};
auto send = [&](Json::Value const& jvObj,
bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
sptr->send(jvObj, true);
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
jvObj.visit(
sptr->getApiVersion(), //
[&](Json::Value const& jv) { sptr->send(jv, true); });
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns =
[&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDatabase::AccountTxMarker> marker)
-> std::optional<std::pair<
RelationalDatabase::AccountTxs,
std::optional<RelationalDatabase::AccountTxMarker>>> {
switch (dbType)
{
case Sqlite: {
auto db = static_cast<SQLiteDatabase*>(
&app_.getRelationalDatabase());
RelationalDatabase::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db->newestAccountTxPage(options);
}
default: {
UNREACHABLE(
"ripple::NetworkOPsImp::addAccountHistoryJob::"
"getMoreTxns : invalid database type");
return {};
}
}
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
{
int feeChargeCount = 0;
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
++feeChargeCount;
}
else
{
JLOG(m_journal.trace()) JLOG(m_journal.trace())
<< "AccountHistory job for account " << "AccountHistory job for account "
<< toBase58(accountId) << " no InfoSub. Fee charged " << toBase58(accountId)
<< feeChargeCount << " times."; << " started. lastLedgerSeq=" << lastLedgerSeq;
return;
}
// try to search in 1024 ledgers till reaching genesis ledgers auto isFirstTx =
auto startLedgerSeq = [&](std::shared_ptr<Transaction> const& tx,
(lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2); std::shared_ptr<TxMeta> const& meta) -> bool {
JLOG(m_journal.trace()) /*
<< "AccountHistory job for account " << toBase58(accountId) * genesis account: first tx is the one with seq 1
<< ", working on ledger range [" << startLedgerSeq << "," * other account: first tx is the one created the
<< lastLedgerSeq << "]"; * account
*/
if (accountId == genesisAccountId)
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId &&
stx->getSeqValue() == 1)
return true;
}
auto haveRange = [&]() -> bool { for (auto& node : meta->getNodes())
std::uint32_t validatedMin = UINT_MAX; {
std::uint32_t validatedMax = 0; if (node.getFieldU16(sfLedgerEntryType) !=
auto haveSomeValidatedLedgers = ltACCOUNT_ROOT)
app_.getLedgerMaster().getValidatedRange( continue;
validatedMin, validatedMax);
return haveSomeValidatedLedgers && if (node.isFieldPresent(sfNewFields))
validatedMin <= startLedgerSeq && {
lastLedgerSeq <= validatedMax; if (auto inner = dynamic_cast<STObject const*>(
}(); node.peekAtPField(sfNewFields));
inner)
{
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) ==
accountId)
{
return true;
}
}
}
}
if (!haveRange) return false;
{ };
JLOG(m_journal.debug())
<< "AccountHistory reschedule job for account "
<< toBase58(accountId) << ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq << "]";
setAccountHistoryJobTimer(subInfo);
return;
}
std::optional<RelationalDatabase::AccountTxMarker> marker{}; auto send = [&](Json::Value const& jvObj,
while (!subInfo.index_->stopHistorical_) bool unsubscribe) -> bool {
{ if (auto sptr = subInfo.sinkWptr_.lock())
auto dbResult = {
getMoreTxns(startLedgerSeq, lastLedgerSeq, marker); sptr->send(jvObj, true);
if (!dbResult) if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
jvObj.visit(
sptr->getApiVersion(), //
[&](Json::Value const& jv) {
sptr->send(jv, true);
});
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns =
[&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDatabase::AccountTxMarker>
marker)
-> std::optional<std::pair<
RelationalDatabase::AccountTxs,
std::optional<
RelationalDatabase::AccountTxMarker>>> {
switch (dbType)
{
case Sqlite: {
auto db = static_cast<SQLiteDatabase*>(
&app_.getRelationalDatabase());
RelationalDatabase::AccountTxPageOptions
options{
accountId,
minLedger,
maxLedger,
marker,
0,
true};
return db->newestAccountTxPage(options);
}
default: {
UNREACHABLE(
"ripple::NetworkOPsImp::"
"addAccountHistoryJob::"
"getMoreTxns : invalid database type");
return {};
}
}
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 &&
!subInfo.index_->stopHistorical_)
{ {
JLOG(m_journal.debug()) int feeChargeCount = 0;
<< "AccountHistory job for account " if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
<< toBase58(accountId) << " getMoreTxns failed.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto const& txns = dbResult->first;
marker = dbResult->second;
size_t num_txns = txns.size();
for (size_t i = 0; i < num_txns; ++i)
{
auto const& [tx, meta] = txns[i];
if (!tx || !meta)
{ {
JLOG(m_journal.debug()) sptr->getConsumer().charge(
<< "AccountHistory job for account " Resource::feeMediumBurdenRPC);
<< toBase58(accountId) << " empty tx or meta."; ++feeChargeCount;
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger =
app_.getLedgerMaster().getLedgerBySeq(
tx->getLedger());
if (!curTxLedger)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
}
std::shared_ptr<STTx const> stTxn =
tx->getSTransaction();
if (!stTxn)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto const mRef = std::ref(*meta);
auto const trR = meta->getResultTER();
MultiApiJson jvTx =
transJson(stTxn, trR, true, curTxLedger, mRef);
jvTx.set(
jss::account_history_tx_index, txHistoryIndex--);
if (i + 1 == num_txns ||
txns[i + 1].first->getLedger() != tx->getLedger())
jvTx.set(jss::account_history_boundary, true);
if (isFirstTx(tx, meta))
{
jvTx.set(jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " done, found last tx.";
return;
} }
else else
{ {
sendMultiApiJson(jvTx, false); JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " no InfoSub. Fee charged " << feeChargeCount
<< " times.";
return;
}
// try to search in 1024 ledgers till reaching genesis
// ledgers
auto startLedgerSeq =
(lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024
: 2);
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< ", working on ledger range [" << startLedgerSeq
<< "," << lastLedgerSeq << "]";
auto haveRange = [&]() -> bool {
std::uint32_t validatedMin = UINT_MAX;
std::uint32_t validatedMax = 0;
auto haveSomeValidatedLedgers =
app_.getLedgerMaster().getValidatedRange(
validatedMin, validatedMax);
return haveSomeValidatedLedgers &&
validatedMin <= startLedgerSeq &&
lastLedgerSeq <= validatedMax;
}();
if (!haveRange)
{
JLOG(m_journal.debug())
<< "AccountHistory reschedule job for account "
<< toBase58(accountId)
<< ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq
<< "]";
setAccountHistoryJobTimer(subInfo);
return;
}
std::optional<RelationalDatabase::AccountTxMarker>
marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult = getMoreTxns(
startLedgerSeq, lastLedgerSeq, marker);
if (!dbResult)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " getMoreTxns failed.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto const& txns = dbResult->first;
marker = dbResult->second;
size_t num_txns = txns.size();
for (size_t i = 0; i < num_txns; ++i)
{
auto const& [tx, meta] = txns[i];
if (!tx || !meta)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " empty tx or meta.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger =
app_.getLedgerMaster().getLedgerBySeq(
tx->getLedger());
if (!curTxLedger)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
}
std::shared_ptr<STTx const> stTxn =
tx->getSTransaction();
if (!stTxn)
{
JLOG(m_journal.debug())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto const mRef = std::ref(*meta);
auto const trR = meta->getResultTER();
MultiApiJson jvTx = transJson(
stTxn, trR, true, curTxLedger, mRef);
jvTx.set(
jss::account_history_tx_index,
txHistoryIndex--);
if (i + 1 == num_txns ||
txns[i + 1].first->getLedger() !=
tx->getLedger())
jvTx.set(
jss::account_history_boundary, true);
if (isFirstTx(tx, meta))
{
jvTx.set(
jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " done, found last tx.";
return;
}
else
{
sendMultiApiJson(jvTx, false);
}
}
if (marker)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " paging, marker=" << marker->ledgerSeq
<< ":" << marker->txnSeq;
}
else
{
break;
}
}
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " done, reached genesis ledger.";
return;
}
} }
} }
if (marker) return;
{ },
JLOG(m_journal.trace()) "AccountHistoryTxStream",
<< "AccountHistory job for account " 1s,
<< toBase58(accountId) this->m_journal);
<< " paging, marker=" << marker->ledgerSeq << ":"
<< marker->txnSeq;
}
else
{
break;
}
}
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account "
<< toBase58(accountId)
<< " done, reached genesis ledger.";
return;
}
}
}
}); });
} }

View File

@@ -25,6 +25,7 @@
#include <xrpld/core/Config.h> #include <xrpld/core/Config.h>
#include <xrpld/core/DatabaseCon.h> #include <xrpld/core/DatabaseCon.h>
#include <xrpld/core/SociDB.h> #include <xrpld/core/SociDB.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/ByteUtilities.h> #include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h> #include <xrpl/basics/contract.h>
@@ -35,6 +36,8 @@
#include <memory> #include <memory>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
static auto checkpointPageCount = 1000; static auto checkpointPageCount = 1000;
@@ -257,9 +260,17 @@ public:
// There is a separate check in `checkpoint` for a valid // There is a separate check in `checkpoint` for a valid
// connection in the rare case when the DatabaseCon is destroyed // connection in the rare case when the DatabaseCon is destroyed
// after locking this weak_ptr // after locking this weak_ptr
[wp = std::weak_ptr<Checkpointer>{shared_from_this()}]() { [wp = std::weak_ptr<Checkpointer>{shared_from_this()},
if (auto self = wp.lock()) journal = j_]() {
self->checkpoint(); perf::measureDurationAndLog(
[&]() {
if (auto self = wp.lock())
self->checkpoint();
return true;
},
"WAL",
1s,
journal);
})) }))
{ {
std::lock_guard lock(mutex_); std::lock_guard lock(mutex_);

View File

@@ -19,6 +19,7 @@
#include <xrpld/net/RPCCall.h> #include <xrpld/net/RPCCall.h>
#include <xrpld/net/RPCSub.h> #include <xrpld/net/RPCSub.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/basics/StringUtilities.h> #include <xrpl/basics/StringUtilities.h>
@@ -27,6 +28,8 @@
#include <deque> #include <deque>
using namespace std::chrono_literals;
namespace ripple { namespace ripple {
// Subscription object for JSON-RPC // Subscription object for JSON-RPC
@@ -91,8 +94,17 @@ public:
JLOG(j_.info()) << "RPCCall::fromNetwork start"; JLOG(j_.info()) << "RPCCall::fromNetwork start";
mSending = m_jobQueue.addJob( mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() { jtCLIENT_SUBSCRIBE,
sendThread(); "RPCSub::sendThread",
[this, journal = j_]() {
perf::measureDurationAndLog(
[&]() {
sendThread();
return true;
},
"RPCSub::sendThread",
1s,
journal);
}); });
} }
} }

View File

@@ -1077,8 +1077,17 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
fee_.update(Resource::feeModerateBurdenPeer, "oversize"); fee_.update(Resource::feeModerateBurdenPeer, "oversize");
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() { jtMANIFEST,
overlay_.onManifests(m, that); "receiveManifests",
[this, that = shared_from_this(), m, journal = p_journal_]() {
perf::measureDurationAndLog(
[&]() {
overlay_.onManifests(m, that);
return true;
},
"receiveManifests",
1s,
journal);
}); });
} }
@@ -1349,10 +1358,18 @@ PeerImp::handleTransaction(
flags, flags,
checkSignature, checkSignature,
batch, batch,
stx]() { stx,
if (auto peer = weak.lock()) journal = p_journal_]() {
peer->checkTransaction( perf::measureDurationAndLog(
flags, checkSignature, stx, batch); [&]() {
if (auto peer = weak.lock())
peer->checkTransaction(
flags, checkSignature, stx, batch);
return true;
},
"recvTransaction->checkTransaction",
1s,
journal);
}); });
} }
} }
@@ -1447,10 +1464,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
// Queue a job to process the request // Queue a job to process the request
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() { app_.getJobQueue().addJob(
if (auto peer = weak.lock()) jtLEDGER_REQ, "recvGetLedger", [weak, m, journal = p_journal_]() {
peer->processLedgerRequest(m); perf::measureDurationAndLog(
}); [&]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m);
return true;
},
"recvGetLedger",
1s,
journal);
});
} }
void void
@@ -1468,27 +1493,38 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
Resource::feeModerateBurdenPeer, "received a proof path request"); Resource::feeModerateBurdenPeer, "received a proof path request");
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() { jtREPLAY_REQ,
if (auto peer = weak.lock()) "recvProofPathRequest",
{ [weak, m, journal = p_journal_]() {
auto reply = perf::measureDurationAndLog(
peer->ledgerReplayMsgHandler_.processProofPathRequest(m); [&]() {
if (reply.has_error()) if (auto peer = weak.lock())
{ {
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST) auto reply = peer->ledgerReplayMsgHandler_
peer->charge( .processProofPathRequest(m);
Resource::feeMalformedRequest, if (reply.has_error())
"proof_path_request"); {
else if (reply.error() ==
peer->charge( protocol::TMReplyError::reBAD_REQUEST)
Resource::feeRequestNoReply, "proof_path_request"); peer->charge(
} Resource::feeMalformedRequest,
else "proof_path_request");
{ else
peer->send(std::make_shared<Message>( peer->charge(
reply, protocol::mtPROOF_PATH_RESPONSE)); Resource::feeRequestNoReply,
} "proof_path_request");
} }
else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtPROOF_PATH_RESPONSE));
}
}
return true;
},
"recvProofPathRequest",
1s,
journal);
}); });
} }
@@ -1522,28 +1558,38 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
fee_.fee = Resource::feeModerateBurdenPeer; fee_.fee = Resource::feeModerateBurdenPeer;
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() { jtREPLAY_REQ,
if (auto peer = weak.lock()) "recvReplayDeltaRequest",
{ [weak, m, journal = p_journal_]() {
auto reply = perf::measureDurationAndLog(
peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m); [&]() {
if (reply.has_error()) if (auto peer = weak.lock())
{ {
if (reply.error() == protocol::TMReplyError::reBAD_REQUEST) auto reply = peer->ledgerReplayMsgHandler_
peer->charge( .processReplayDeltaRequest(m);
Resource::feeMalformedRequest, if (reply.has_error())
"replay_delta_request"); {
else if (reply.error() ==
peer->charge( protocol::TMReplyError::reBAD_REQUEST)
Resource::feeRequestNoReply, peer->charge(
"replay_delta_request"); Resource::feeMalformedRequest,
} "replay_delta_request");
else else
{ peer->charge(
peer->send(std::make_shared<Message>( Resource::feeRequestNoReply,
reply, protocol::mtREPLAY_DELTA_RESPONSE)); "replay_delta_request");
} }
} else
{
peer->send(std::make_shared<Message>(
reply, protocol::mtREPLAY_DELTA_RESPONSE));
}
}
return true;
},
"recvReplayDeltaRequest",
1s,
journal);
}); });
} }
@@ -1640,12 +1686,21 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{ {
std::weak_ptr<PeerImp> weak{shared_from_this()}; std::weak_ptr<PeerImp> weak{shared_from_this()};
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() { jtTXN_DATA,
if (auto peer = weak.lock()) "recvPeerData",
{ [weak, ledgerHash, m, journal = p_journal_]() {
peer->app_.getInboundTransactions().gotData( perf::measureDurationAndLog(
ledgerHash, peer, m); [&]() {
} if (auto peer = weak.lock())
{
peer->app_.getInboundTransactions().gotData(
ledgerHash, peer, m);
}
return true;
},
"recvPeerData",
1s,
journal);
}); });
return; return;
} }
@@ -1770,9 +1825,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose", "recvPropose->checkPropose",
[weak, isTrusted, m, proposal]() { [weak, isTrusted, m, proposal, journal = p_journal_]() {
if (auto peer = weak.lock()) perf::measureDurationAndLog(
peer->checkPropose(isTrusted, m, proposal); [&]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
return true;
},
"recvPropose->checkPropose",
1s,
journal);
}); });
} }
@@ -2408,9 +2470,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
name, name,
[weak, val, m, key]() { [weak, val, m, key, name, journal = p_journal_]() {
if (auto peer = weak.lock()) perf::measureDurationAndLog(
peer->checkValidation(val, key, m); [&]() {
if (auto peer = weak.lock())
peer->checkValidation(val, key, m);
return true;
},
name,
1s,
journal);
}); });
} }
else else
@@ -2463,9 +2532,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtREQUESTED_TXN, "doTransactions", [weak, m]() { jtREQUESTED_TXN,
if (auto peer = weak.lock()) "doTransactions",
peer->doTransactions(m); [weak, m, journal = p_journal_]() {
perf::measureDurationAndLog(
[&]() {
if (auto peer = weak.lock())
peer->doTransactions(m);
return true;
},
"doTransactions",
1s,
journal);
}); });
return; return;
} }
@@ -2597,9 +2675,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtMISSING_TXN, "handleHaveTransactions", [weak, m]() { jtMISSING_TXN,
if (auto peer = weak.lock()) "handleHaveTransactions",
peer->handleHaveTransactions(m); [weak, m, journal = p_journal_]() {
perf::measureDurationAndLog(
[&]() {
if (auto peer = weak.lock())
peer->handleHaveTransactions(m);
return true;
},
"handleHaveTransactions",
1s,
journal);
}); });
} }
@@ -2768,8 +2855,18 @@ PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
auto elapsed = UptimeClock::now(); auto elapsed = UptimeClock::now();
auto const pap = &app_; auto const pap = &app_;
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() { jtPACK,
pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed); "MakeFetchPack",
[pap, weak, packet, hash, elapsed, journal = p_journal_]() {
perf::measureDurationAndLog(
[&]() {
pap->getLedgerMaster().makeFetchPack(
weak, packet, hash, elapsed);
return true;
},
"MakeFetchPack",
1s,
journal);
}); });
} }

View File

@@ -190,20 +190,35 @@ measureDurationAndLog(
std::chrono::duration<Rep, Period> maxDelay, std::chrono::duration<Rep, Period> maxDelay,
beast::Journal const& journal) beast::Journal const& journal)
{ {
using Result = std::invoke_result_t<Func>;
auto start_time = std::chrono::high_resolution_clock::now(); auto start_time = std::chrono::high_resolution_clock::now();
auto result = func(); if constexpr (std::is_void_v<Result>)
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time);
if (duration > maxDelay)
{ {
JLOG(journal.warn()) std::forward<Func>(func)();
<< actionDescription << " took " << duration.count() << " ms"; auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time);
if (duration > maxDelay)
{
JLOG(journal.warn())
<< actionDescription << " took " << duration.count() << " ms";
}
}
else
{
Result result = std::forward<Func>(func)();
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time);
if (duration > maxDelay)
{
JLOG(journal.warn())
<< actionDescription << " took " << duration.count() << " ms";
}
return result;
} }
return result;
} }
} // namespace perf } // namespace perf