Ledger master duration

This commit is contained in:
Valentin Balaschenko
2025-05-22 17:16:38 +01:00
parent 4cc37b9cd8
commit fbda5ccc15
3 changed files with 75 additions and 29 deletions

View File

@@ -37,6 +37,7 @@
#include <xrpld/core/TimeKeeper.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/Peer.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/MathUtilities.h>
@@ -56,6 +57,8 @@
#include <memory>
#include <vector>
using namespace std::chrono_literals;
namespace ripple {
// Don't catch up more than 100 ledgers (cannot exceed 256)
@@ -1361,27 +1364,36 @@ LedgerMaster::tryAdvance()
if (!mAdvanceThread && !mValidLedger.empty())
{
mAdvanceThread = true;
app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() {
std::unique_lock sl(m_mutex);
app_.getJobQueue().addJob(
jtADVANCE, "advanceLedger", [this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() {
std::unique_lock sl(m_mutex);
XRPL_ASSERT(
!mValidLedger.empty() && mAdvanceThread,
"ripple::LedgerMaster::tryAdvance : has valid ledger");
XRPL_ASSERT(
!mValidLedger.empty() && mAdvanceThread,
"ripple::LedgerMaster::tryAdvance : has valid "
"ledger");
JLOG(m_journal.trace()) << "advanceThread<";
JLOG(m_journal.trace()) << "advanceThread<";
try
{
doAdvance(sl);
}
catch (std::exception const& ex)
{
JLOG(m_journal.fatal()) << "doAdvance throws: " << ex.what();
}
try
{
doAdvance(sl);
}
catch (std::exception const& ex)
{
JLOG(m_journal.fatal())
<< "doAdvance throws: " << ex.what();
}
mAdvanceThread = false;
JLOG(m_journal.trace()) << "advanceThread>";
});
mAdvanceThread = false;
JLOG(m_journal.trace()) << "advanceThread>";
},
"advanceLedger",
1s,
journal);
});
}
}
@@ -1536,7 +1548,13 @@ LedgerMaster::newPFWork(
<< "newPFWork: Creating job. path find threads: "
<< mPathFindThread;
if (app_.getJobQueue().addJob(
jtUPDATE_PF, name, [this]() { updatePaths(); }))
jtUPDATE_PF, name, [this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { updatePaths(); },
"newPFWork: Creating job. path find threads:",
1s,
journal);
}))
{
++mPathFindThread;
}
@@ -1857,8 +1875,11 @@ LedgerMaster::fetchForHistory(
mFillInProgress = seq;
}
app_.getJobQueue().addJob(
jtADVANCE, "tryFill", [this, ledger]() {
tryFill(ledger);
jtADVANCE,
"tryFill",
[this, ledger, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() { tryFill(ledger); }, "tryFill", 1s, journal);
});
}
progress = true;
@@ -2027,10 +2048,17 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq)
{
if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire))
{
app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() {
app_.getInboundLedgers().gotFetchPack();
mGotFetchPackThread.clear(std::memory_order_release);
});
app_.getJobQueue().addJob(
jtLEDGER_DATA, "gotFetchPack", [this, journal = m_journal]() {
perf::measureDurationAndLog(
[&]() {
app_.getInboundLedgers().gotFetchPack();
mGotFetchPackThread.clear(std::memory_order_release);
},
"gotFetchPack",
1s,
journal);
});
}
}

View File

@@ -19,6 +19,9 @@
#include <xrpld/app/ledger/detail/TimeoutCounter.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h>
using namespace std::chrono_literals;
namespace ripple {
@@ -83,9 +86,15 @@ TimeoutCounter::queueJob(ScopedLockType& sl)
app_.getJobQueue().addJob(
queueJobParameter_.jobType,
queueJobParameter_.jobName,
[wptr = pmDowncast()]() {
if (auto sptr = wptr.lock(); sptr)
sptr->invokeOnTimer();
[wptr = pmDowncast(), journal = journal_]() {
perf::measureDurationAndLog(
[&]() {
if (auto sptr = wptr.lock(); sptr)
sptr->invokeOnTimer();
},
"TimeoutCounter::queueJob",
1s,
journal);
});
}

View File

@@ -23,6 +23,7 @@
#include <xrpld/app/ledger/detail/TransactionAcquire.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/perflog/PerfLog.h>
#include <memory>
@@ -79,8 +80,16 @@ TransactionAcquire::done()
// just updates the consensus and related structures when we acquire
// a transaction set. No need to update them if we're shutting down.
app_.getJobQueue().addJob(
jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
pap->getInboundTransactions().giveSet(hash, map, true);
jtTXN_DATA,
"completeAcquire",
[pap, hash, map, journal = journal_]() {
perf::measureDurationAndLog(
[&]() {
pap->getInboundTransactions().giveSet(hash, map, true);
},
"completeAcquire",
1s,
journal);
});
}
}