Mirror of https://github.com/XRPLF/rippled.git (synced 2025-11-19 10:35:50 +00:00)

Compare commits: Bronek/len ... vlntb/job-

6 commits: 3592bd43c1, aa14097b02, 8a5f95c223, fbda5ccc15, 4cc37b9cd8, 3495cb3753

@@ -29,7 +29,7 @@ Loop: xrpld.core xrpld.net
 xrpld.net > xrpld.core

 Loop: xrpld.core xrpld.perflog
-xrpld.perflog == xrpld.core
+xrpld.perflog ~= xrpld.core

 Loop: xrpld.net xrpld.rpc
 xrpld.rpc ~= xrpld.net

@@ -162,6 +162,7 @@ xrpld.ledger > xrpl.basics
 xrpld.ledger > xrpl.json
 xrpld.ledger > xrpl.protocol
 xrpld.net > xrpl.basics
+xrpld.net > xrpld.perflog
 xrpld.net > xrpl.json
 xrpld.net > xrpl.protocol
 xrpld.net > xrpl.resource

@@ -37,6 +37,7 @@
 #include <xrpld/consensus/LedgerTiming.h>
 #include <xrpld/overlay/Overlay.h>
 #include <xrpld/overlay/predicates.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/random.h>
 #include <xrpl/beast/core/LexicalCast.h>

@@ -49,6 +50,8 @@
 #include <iomanip>
 #include <mutex>

+using namespace std::chrono_literals;
+
 namespace ripple {

 RCLConsensus::RCLConsensus(

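Note: the using namespace std::chrono_literals directive added here (and in each
translation unit below) is what permits the bare "1s" threshold arguments passed
to perf::measureDurationAndLog at every call site in this change set. The
std::chrono_literals suffix operators are standard C++14; a minimal standalone
check:

    #include <chrono>

    using namespace std::chrono_literals;

    // The s/ms suffixes come from std::chrono_literals.
    static_assert(1s == std::chrono::seconds{1});
    static_assert(1500ms == std::chrono::milliseconds{1500});
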
@@ -140,11 +143,17 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
         app_.getJobQueue().addJob(
             jtADVANCE,
             "getConsensusLedger1",
-            [id = hash, &app = app_, this]() {
-                JLOG(j_.debug())
-                    << "JOB advanceLedger getConsensusLedger1 started";
-                app.getInboundLedgers().acquireAsync(
-                    id, 0, InboundLedger::Reason::CONSENSUS);
+            [id = hash, &app = app_, this, journal = j_]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        JLOG(j_.debug()) << "JOB advanceLedger "
+                                            "getConsensusLedger1 started";
+                        app.getInboundLedgers().acquireAsync(
+                            id, 0, InboundLedger::Reason::CONSENSUS);
+                    },
+                    "getConsensusLedger1",
+                    1s,
+                    journal);
             });
     }
     return std::nullopt;

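The same idiom recurs at every call site below: the JobQueue lambda gains a
journal captured by value, and its original body moves into a nested lambda
passed to perf::measureDurationAndLog together with a label and a one-second
budget. The helper itself appears in the PerfLog.h hunk at the end of this
diff; the following is only a self-contained sketch of the idea, with a
stand-in helper that logs to std::cerr instead of a beast::Journal:

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <utility>

    using namespace std::chrono_literals;

    namespace perf {
    // Illustrative stand-in for the rippled helper: run func and warn if it
    // overruns maxDelay.
    template <typename Func, typename Rep, typename Period>
    void
    measureDurationAndLog(
        Func&& func,
        std::string const& actionDescription,
        std::chrono::duration<Rep, Period> maxDelay)
    {
        auto start = std::chrono::steady_clock::now();
        std::forward<Func>(func)();
        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start);
        if (elapsed > maxDelay)
            std::cerr << actionDescription << " took " << elapsed.count()
                      << " ms\n";
    }
    }  // namespace perf

    int
    main()
    {
        // A job body that overruns its 1s budget triggers the warning.
        perf::measureDurationAndLog(
            [] { std::this_thread::sleep_for(1500ms); }, "slowJob", 1s);
    }
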
@@ -442,21 +451,27 @@ RCLConsensus::Adaptor::onAccept(
     app_.getJobQueue().addJob(
         jtACCEPT,
         "acceptLedger",
-        [=, this, cj = std::move(consensusJson)]() mutable {
-            // Note that no lock is held or acquired during this job.
-            // This is because generic Consensus guarantees that once a ledger
-            // is accepted, the consensus results and capture by reference state
-            // will not change until startRound is called (which happens via
-            // endConsensus).
-            RclConsensusLogger clog("onAccept", validating, j_);
-            this->doAccept(
-                result,
-                prevLedger,
-                closeResolution,
-                rawCloseTimes,
-                mode,
-                std::move(cj));
-            this->app_.getOPs().endConsensus(clog.ss());
+        [=, this, cj = std::move(consensusJson), journal = j_]() mutable {
+            perf::measureDurationAndLog(
+                [&]() {
+                    // Note that no lock is held or acquired during this job.
+                    // This is because generic Consensus guarantees that once a
+                    // ledger is accepted, the consensus results and capture by
+                    // reference state will not change until startRound is
+                    // called (which happens via endConsensus).
+                    RclConsensusLogger clog("onAccept", validating, j_);
+                    this->doAccept(
+                        result,
+                        prevLedger,
+                        closeResolution,
+                        rawCloseTimes,
+                        mode,
+                        std::move(cj));
+                    this->app_.getOPs().endConsensus(clog.ss());
+                },
+                "acceptLedger",
+                1s,
+                journal);
         });
 }

@@ -32,6 +32,8 @@

 #include <memory>

+using namespace std::chrono_literals;
+
 namespace ripple {

 RCLValidatedLedger::RCLValidatedLedger(MakeGenesis)

@@ -142,11 +144,19 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
     Application* pApp = &app_;

     app_.getJobQueue().addJob(
-        jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
-            JLOG(j_.debug())
-                << "JOB advanceLedger getConsensusLedger2 started";
-            pApp->getInboundLedgers().acquireAsync(
-                hash, 0, InboundLedger::Reason::CONSENSUS);
+        jtADVANCE,
+        "getConsensusLedger2",
+        [pApp, hash, this, journal = j_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    JLOG(j_.debug())
+                        << "JOB advanceLedger getConsensusLedger2 started";
+                    pApp->getInboundLedgers().acquireAsync(
+                        hash, 0, InboundLedger::Reason::CONSENSUS);
+                },
+                "getConsensusLedger2",
+                1s,
+                journal);
         });
     return std::nullopt;
 }

@@ -23,11 +23,14 @@
 #include <xrpld/app/misc/Transaction.h>
 #include <xrpld/core/JobQueue.h>
 #include <xrpld/nodestore/Database.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/Log.h>
 #include <xrpl/protocol/HashPrefix.h>
 #include <xrpl/protocol/digest.h>

+using namespace std::chrono_literals;
+
 namespace ripple {

 ConsensusTransSetSF::ConsensusTransSetSF(Application& app, NodeCache& nodeCache)

@@ -65,9 +68,14 @@ ConsensusTransSetSF::gotNode(
             "ripple::ConsensusTransSetSF::gotNode : transaction hash "
             "match");
         auto const pap = &app_;
-        app_.getJobQueue().addJob(jtTRANSACTION, "TXS->TXN", [pap, stx]() {
-            pap->getOPs().submitTransaction(stx);
-        });
+        app_.getJobQueue().addJob(
+            jtTRANSACTION, "TXS->TXN", [pap, stx, journal = j_]() {
+                perf::measureDurationAndLog(
+                    [&]() { pap->getOPs().submitTransaction(stx); },
+                    "TXS->TXN",
+                    1s,
+                    journal);
+            });
     }
     catch (std::exception const& ex)
     {

@@ -30,6 +30,7 @@
 #include <xrpld/core/SociDB.h>
 #include <xrpld/nodestore/Database.h>
 #include <xrpld/nodestore/detail/DatabaseNodeImp.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/Log.h>
 #include <xrpl/basics/contract.h>

@@ -46,6 +47,8 @@
 #include <utility>
 #include <vector>

+using namespace std::chrono_literals;
+
 namespace ripple {

 create_genesis_t const create_genesis{};

@@ -1028,7 +1031,12 @@ pendSaveValidated(
             isCurrent ? jtPUBLEDGER : jtPUBOLDLEDGER,
             std::to_string(ledger->seq()),
             [&app, ledger, isCurrent]() {
-                saveValidatedLedger(app, ledger, isCurrent);
+                beast::Journal journal = app.journal("Ledger");
+                perf::measureDurationAndLog(
+                    [&]() { saveValidatedLedger(app, ledger, isCurrent); },
+                    "OrderBookDB::update:",
+                    1s,
+                    journal);
             }))
     {
         return true;

@@ -24,10 +24,13 @@
 #include <xrpld/app/misc/NetworkOPs.h>
 #include <xrpld/core/Config.h>
 #include <xrpld/core/JobQueue.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/Log.h>
 #include <xrpl/protocol/Indexes.h>

+using namespace std::chrono_literals;
+
 namespace ripple {

 OrderBookDB::OrderBookDB(Application& app)

@@ -69,7 +72,13 @@ OrderBookDB::setup(std::shared_ptr<ReadView const> const& ledger)
         app_.getJobQueue().addJob(
             jtUPDATE_PF,
             "OrderBookDB::update: " + std::to_string(ledger->seq()),
-            [this, ledger]() { update(ledger); });
+            [this, ledger, journal = j_]() {
+                perf::measureDurationAndLog(
+                    [&]() { update(ledger); },
+                    "OrderBookDB::update:",
+                    1s,
+                    journal);
+            });
     }
 }

@@ -25,6 +25,7 @@
 #include <xrpld/app/main/Application.h>
 #include <xrpld/core/JobQueue.h>
 #include <xrpld/overlay/Overlay.h>
+#include <xrpld/perflog/PerfLog.h>
 #include <xrpld/shamap/SHAMapNodeID.h>

 #include <xrpl/basics/Log.h>

@@ -37,6 +38,8 @@
 #include <algorithm>
 #include <random>

+using namespace std::chrono_literals;
+
 namespace ripple {

 using namespace std::chrono_literals;

@@ -473,15 +476,24 @@ InboundLedger::done()

     // We hold the PeerSet lock, so must dispatch
     app_.getJobQueue().addJob(
-        jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
-            if (self->complete_ && !self->failed_)
-            {
-                self->app_.getLedgerMaster().checkAccept(self->getLedger());
-                self->app_.getLedgerMaster().tryAdvance();
-            }
-            else
-                self->app_.getInboundLedgers().logFailure(
-                    self->hash_, self->mSeq);
+        jtLEDGER_DATA,
+        "AcquisitionDone",
+        [self = shared_from_this(), journal = journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (self->complete_ && !self->failed_)
+                    {
+                        self->app_.getLedgerMaster().checkAccept(
+                            self->getLedger());
+                        self->app_.getLedgerMaster().tryAdvance();
+                    }
+                    else
+                        self->app_.getInboundLedgers().logFailure(
+                            self->hash_, self->mSeq);
+                },
+                "AcquisitionDone",
+                1s,
+                journal);
         });
 }

@@ -35,6 +35,8 @@
 #include <mutex>
 #include <vector>

+using namespace std::chrono_literals;
+
 namespace ripple {

 class InboundLedgersImp : public InboundLedgers

@@ -212,8 +214,14 @@ public:
         // dispatch
         if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
             app_.getJobQueue().addJob(
-                jtLEDGER_DATA, "processLedgerData", [ledger]() {
-                    ledger->runData();
+                jtLEDGER_DATA,
+                "processLedgerData",
+                [ledger, journal = j_]() {
+                    perf::measureDurationAndLog(
+                        [&]() { ledger->runData(); },
+                        "processLedgerData",
+                        1s,
+                        journal);
                 });

         return true;

@@ -227,8 +235,12 @@ public:
         if (packet->type() == protocol::liAS_NODE)
         {
             app_.getJobQueue().addJob(
-                jtLEDGER_DATA, "gotStaleData", [this, packet]() {
-                    gotStaleData(packet);
+                jtLEDGER_DATA, "gotStaleData", [this, packet, journal = j_]() {
+                    perf::measureDurationAndLog(
+                        [&]() { gotStaleData(packet); },
+                        "gotStaleData",
+                        1s,
+                        journal);
                 });
         }

@@ -25,6 +25,9 @@
 #include <xrpld/app/main/Application.h>
 #include <xrpld/core/JobQueue.h>
 #include <xrpld/overlay/PeerSet.h>
+#include <xrpld/perflog/PerfLog.h>

+using namespace std::chrono_literals;
+
 namespace ripple {

@@ -244,22 +247,31 @@ LedgerDeltaAcquire::onLedgerBuilt(
     app_.getJobQueue().addJob(
         jtREPLAY_TASK,
         "onLedgerBuilt",
-        [=, ledger = this->fullLedger_, &app = this->app_]() {
-            for (auto reason : reasons)
-            {
-                switch (reason)
-                {
-                    case InboundLedger::Reason::GENERIC:
-                        app.getLedgerMaster().storeLedger(ledger);
-                        break;
-                    default:
-                        // TODO for other use cases
-                        break;
-                }
-            }
+        [=,
+         ledger = this->fullLedger_,
+         &app = this->app_,
+         journal = journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    for (auto reason : reasons)
+                    {
+                        switch (reason)
+                        {
+                            case InboundLedger::Reason::GENERIC:
+                                app.getLedgerMaster().storeLedger(ledger);
+                                break;
+                            default:
+                                // TODO for other use cases
+                                break;
+                        }
+                    }

-            if (firstTime)
-                app.getLedgerMaster().tryAdvance();
+                    if (firstTime)
+                        app.getLedgerMaster().tryAdvance();
+                },
+                "onLedgerBuilt",
+                1s,
+                journal);
         });
 }

@@ -37,6 +37,7 @@
 #include <xrpld/core/TimeKeeper.h>
 #include <xrpld/overlay/Overlay.h>
 #include <xrpld/overlay/Peer.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/Log.h>
 #include <xrpl/basics/MathUtilities.h>

@@ -56,6 +57,8 @@
 #include <memory>
 #include <vector>

+using namespace std::chrono_literals;
+
 namespace ripple {

 // Don't catch up more than 100 ledgers (cannot exceed 256)

@@ -1361,27 +1364,36 @@ LedgerMaster::tryAdvance()
     if (!mAdvanceThread && !mValidLedger.empty())
     {
         mAdvanceThread = true;
-        app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() {
-            std::unique_lock sl(m_mutex);
+        app_.getJobQueue().addJob(
+            jtADVANCE, "advanceLedger", [this, journal = m_journal]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        std::unique_lock sl(m_mutex);

-            XRPL_ASSERT(
-                !mValidLedger.empty() && mAdvanceThread,
-                "ripple::LedgerMaster::tryAdvance : has valid ledger");
+                        XRPL_ASSERT(
+                            !mValidLedger.empty() && mAdvanceThread,
+                            "ripple::LedgerMaster::tryAdvance : has valid "
+                            "ledger");

-            JLOG(m_journal.trace()) << "advanceThread<";
+                        JLOG(m_journal.trace()) << "advanceThread<";

-            try
-            {
-                doAdvance(sl);
-            }
-            catch (std::exception const& ex)
-            {
-                JLOG(m_journal.fatal()) << "doAdvance throws: " << ex.what();
-            }
+                        try
+                        {
+                            doAdvance(sl);
+                        }
+                        catch (std::exception const& ex)
+                        {
+                            JLOG(m_journal.fatal())
+                                << "doAdvance throws: " << ex.what();
+                        }

-            mAdvanceThread = false;
-            JLOG(m_journal.trace()) << "advanceThread>";
-        });
+                        mAdvanceThread = false;
+                        JLOG(m_journal.trace()) << "advanceThread>";
+                    },
+                    "advanceLedger",
+                    1s,
+                    journal);
+            });
     }
 }

@@ -1536,7 +1548,13 @@ LedgerMaster::newPFWork(
         << "newPFWork: Creating job. path find threads: "
         << mPathFindThread;
     if (app_.getJobQueue().addJob(
-            jtUPDATE_PF, name, [this]() { updatePaths(); }))
+            jtUPDATE_PF, name, [this, journal = m_journal]() {
+                perf::measureDurationAndLog(
+                    [&]() { updatePaths(); },
+                    "newPFWork: Creating job. path find threads:",
+                    1s,
+                    journal);
+            }))
     {
         ++mPathFindThread;
     }

@@ -1857,8 +1875,11 @@ LedgerMaster::fetchForHistory(
             mFillInProgress = seq;
         }
         app_.getJobQueue().addJob(
-            jtADVANCE, "tryFill", [this, ledger]() {
-                tryFill(ledger);
+            jtADVANCE,
+            "tryFill",
+            [this, ledger, journal = m_journal]() {
+                perf::measureDurationAndLog(
+                    [&]() { tryFill(ledger); }, "tryFill", 1s, journal);
             });
     }
     progress = true;

@@ -2027,10 +2048,17 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq)
 {
     if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire))
     {
-        app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() {
-            app_.getInboundLedgers().gotFetchPack();
-            mGotFetchPackThread.clear(std::memory_order_release);
-        });
+        app_.getJobQueue().addJob(
+            jtLEDGER_DATA, "gotFetchPack", [this, journal = m_journal]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        app_.getInboundLedgers().gotFetchPack();
+                        mGotFetchPackThread.clear(std::memory_order_release);
+                    },
+                    "gotFetchPack",
+                    1s,
+                    journal);
+            });
     }
 }

@@ -19,6 +19,9 @@

 #include <xrpld/app/ledger/detail/TimeoutCounter.h>
 #include <xrpld/core/JobQueue.h>
+#include <xrpld/perflog/PerfLog.h>

+using namespace std::chrono_literals;
+
 namespace ripple {

@@ -83,9 +86,15 @@ TimeoutCounter::queueJob(ScopedLockType& sl)
     app_.getJobQueue().addJob(
         queueJobParameter_.jobType,
         queueJobParameter_.jobName,
-        [wptr = pmDowncast()]() {
-            if (auto sptr = wptr.lock(); sptr)
-                sptr->invokeOnTimer();
+        [wptr = pmDowncast(), journal = journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto sptr = wptr.lock(); sptr)
+                        sptr->invokeOnTimer();
+                },
+                "TimeoutCounter::queueJob",
+                1s,
+                journal);
         });
 }

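A note on the recurring journal = journal_ / journal = j_ captures: a
beast::Journal is a small, copyable logging handle, so copying it into the
closure gives the measureDurationAndLog wrapper something valid to log through
even when the queued job is the only thing the JobQueue keeps alive. A toy
illustration of that property, using stand-in types rather than the rippled
ones:

    #include <functional>
    #include <iostream>
    #include <string>

    // Stand-in for beast::Journal: a cheap, copyable logging handle.
    struct Journal
    {
        std::string name;
        void
        warn(std::string const& msg) const
        {
            std::cout << name << ": " << msg << '\n';
        }
    };

    struct Worker
    {
        Journal j_{"worker"};

        std::function<void()>
        makeJob()
        {
            // Copy the handle into the closure; the job no longer needs *this.
            return [journal = j_]() { journal.warn("job finished late"); };
        }
    };

    int
    main()
    {
        std::function<void()> job;
        {
            Worker w;
            job = w.makeJob();
        }       // the Worker is gone...
        job();  // ...but the copied Journal is still valid.
    }
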
@@ -23,6 +23,7 @@
 #include <xrpld/app/ledger/detail/TransactionAcquire.h>
 #include <xrpld/app/main/Application.h>
 #include <xrpld/app/misc/NetworkOPs.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <memory>

@@ -79,8 +80,16 @@ TransactionAcquire::done()
         // just updates the consensus and related structures when we acquire
         // a transaction set. No need to update them if we're shutting down.
         app_.getJobQueue().addJob(
-            jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
-                pap->getInboundTransactions().giveSet(hash, map, true);
+            jtTXN_DATA,
+            "completeAcquire",
+            [pap, hash, map, journal = journal_]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        pap->getInboundTransactions().giveSet(hash, map, true);
+                    },
+                    "completeAcquire",
+                    1s,
+                    journal);
             });
     }
 }

@@ -81,6 +81,8 @@
 #include <tuple>
 #include <unordered_map>

+using namespace std::chrono_literals;
+
 namespace ripple {

 class NetworkOPsImp final : public NetworkOPs

@@ -994,9 +996,16 @@ NetworkOPsImp::setHeartbeatTimer()
         heartbeatTimer_,
         mConsensus.parms().ledgerGRANULARITY,
         [this]() {
-            m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
-                processHeartbeatTimer();
-            });
+            m_job_queue.addJob(
+                jtNETOP_TIMER,
+                "NetOPs.heartbeat",
+                [this, journal = m_journal]() {
+                    perf::measureDurationAndLog(
+                        [&]() { processHeartbeatTimer(); },
+                        "NetOPs.heartbeat",
+                        1s,
+                        journal);
+                });
         },
         [this]() { setHeartbeatTimer(); });
 }

@@ -1010,9 +1019,16 @@ NetworkOPsImp::setClusterTimer()
         clusterTimer_,
         10s,
         [this]() {
-            m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
-                processClusterTimer();
-            });
+            m_job_queue.addJob(
+                jtNETOP_CLUSTER,
+                "NetOPs.cluster",
+                [this, journal = m_journal]() {
+                    perf::measureDurationAndLog(
+                        [&]() { processClusterTimer(); },
+                        "NetOPs.cluster",
+                        1s,
+                        journal);
+                });
         },
         [this]() { setClusterTimer(); });
 }

@@ -1229,10 +1245,17 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)

     auto tx = std::make_shared<Transaction>(trans, reason, app_);

-    m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
-        auto t = tx;
-        processTransaction(t, false, false, FailHard::no);
-    });
+    m_job_queue.addJob(
+        jtTRANSACTION, "submitTxn", [this, tx, journal = m_journal]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    auto t = tx;
+                    processTransaction(t, false, false, FailHard::no);
+                },
+                "submitTxn",
+                1s,
+                journal);
+        });
 }

 bool

@@ -1315,7 +1338,13 @@ NetworkOPsImp::doTransactionAsync(
     if (mDispatchState == DispatchState::none)
     {
         if (m_job_queue.addJob(
-                jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
+                jtBATCH, "transactionBatch", [this, journal = m_journal]() {
+                    perf::measureDurationAndLog(
+                        [&]() { transactionBatch(); },
+                        "transactionBatch",
+                        1s,
+                        journal);
+                }))
         {
             mDispatchState = DispatchState::scheduled;
         }

@@ -1362,9 +1391,16 @@ NetworkOPsImp::doTransactionSyncBatch(
     if (mTransactions.size())
     {
         // More transactions need to be applied, but by another job.
-        if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
-                transactionBatch();
-            }))
+        if (m_job_queue.addJob(
+                jtBATCH,
+                "transactionBatch",
+                [this, journal = m_journal]() {
+                    perf::measureDurationAndLog(
+                        [&]() { transactionBatch(); },
+                        "transactionBatch",
+                        1s,
+                        journal);
+                }))
         {
             mDispatchState = DispatchState::scheduled;
         }

@@ -3175,8 +3211,17 @@ NetworkOPsImp::reportFeeChange()
     if (f != mLastFeeSummary)
     {
         m_job_queue.addJob(
-            jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
-                pubServer();
+            jtCLIENT_FEE_CHANGE,
+            "reportFeeChange->pubServer",
+            [this, journal = m_journal]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        pubServer();
+                        return true;
+                    },
+                    "reportFeeChange->pubServer",
+                    1s,
+                    journal);
             });
     }
 }

@@ -3187,7 +3232,16 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
     m_job_queue.addJob(
         jtCLIENT_CONSENSUS,
         "reportConsensusStateChange->pubConsensus",
-        [this, phase]() { pubConsensus(phase); });
+        [this, phase, journal = m_journal]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    pubConsensus(phase);
+                    return true;
+                },
+                "reportConsensusStateChange->pubConsensus",
+                1s,
+                journal);
+        });
 }

 inline void

@@ -3693,262 +3747,301 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
         jtCLIENT_ACCT_HIST,
         "AccountHistoryTxStream",
         [this, dbType = databaseType, subInfo]() {
+            perf::measureDurationAndLog(
+                [&]() {
             auto const& accountId = subInfo.index_->accountId_;
             auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
             auto& txHistoryIndex = subInfo.index_->historyTxIndex_;

             JLOG(m_journal.trace())
                 << "AccountHistory job for account " << toBase58(accountId)
                 << " started. lastLedgerSeq=" << lastLedgerSeq;

             auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
                                  std::shared_ptr<TxMeta> const& meta) -> bool {
                 /*
                  * genesis account: first tx is the one with seq 1
                  * other account: first tx is the one created the account
                  */
                 if (accountId == genesisAccountId)
                 {
                     auto stx = tx->getSTransaction();
                     if (stx->getAccountID(sfAccount) == accountId &&
                         stx->getSeqValue() == 1)
                         return true;
                 }

                 for (auto& node : meta->getNodes())
                 {
                     if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
                         continue;

                     if (node.isFieldPresent(sfNewFields))
                     {
                         if (auto inner = dynamic_cast<STObject const*>(
                                 node.peekAtPField(sfNewFields));
                             inner)
                         {
                             if (inner->isFieldPresent(sfAccount) &&
                                 inner->getAccountID(sfAccount) == accountId)
                             {
                                 return true;
                             }
                         }
                     }
                 }

                 return false;
             };

             auto send = [&](Json::Value const& jvObj,
                             bool unsubscribe) -> bool {
                 if (auto sptr = subInfo.sinkWptr_.lock())
                 {
                     sptr->send(jvObj, true);
                     if (unsubscribe)
                         unsubAccountHistory(sptr, accountId, false);
                     return true;
                 }

                 return false;
             };

             auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
                                         bool unsubscribe) -> bool {
                 if (auto sptr = subInfo.sinkWptr_.lock())
                 {
                     jvObj.visit(
                         sptr->getApiVersion(), //
                         [&](Json::Value const& jv) { sptr->send(jv, true); });

                     if (unsubscribe)
                         unsubAccountHistory(sptr, accountId, false);
                     return true;
                 }

                 return false;
             };

             auto getMoreTxns =
                 [&](std::uint32_t minLedger,
                     std::uint32_t maxLedger,
                     std::optional<RelationalDatabase::AccountTxMarker> marker)
                 -> std::optional<std::pair<
                     RelationalDatabase::AccountTxs,
                     std::optional<RelationalDatabase::AccountTxMarker>>> {
                 switch (dbType)
                 {
                     case Sqlite: {
                         auto db = static_cast<SQLiteDatabase*>(
                             &app_.getRelationalDatabase());
                         RelationalDatabase::AccountTxPageOptions options{
                             accountId, minLedger, maxLedger, marker, 0, true};
                         return db->newestAccountTxPage(options);
                     }
                     default: {
                         UNREACHABLE(
                             "ripple::NetworkOPsImp::addAccountHistoryJob::"
                             "getMoreTxns : invalid database type");
                         return {};
                     }
                 }
             };

             /*
              * search backward until the genesis ledger or asked to stop
              */
             while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
             {
                 int feeChargeCount = 0;
                 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
                 {
                     sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
                     ++feeChargeCount;
                 }
                 else
                 {
                     JLOG(m_journal.trace())
                         << "AccountHistory job for account "
                         << toBase58(accountId) << " no InfoSub. Fee charged "
                         << feeChargeCount << " times.";
                     return;
                 }

                 // try to search in 1024 ledgers till reaching genesis ledgers
                 auto startLedgerSeq =
                     (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
                 JLOG(m_journal.trace())
                     << "AccountHistory job for account " << toBase58(accountId)
                     << ", working on ledger range [" << startLedgerSeq << ","
                     << lastLedgerSeq << "]";

                 auto haveRange = [&]() -> bool {
                     std::uint32_t validatedMin = UINT_MAX;
                     std::uint32_t validatedMax = 0;
                     auto haveSomeValidatedLedgers =
                         app_.getLedgerMaster().getValidatedRange(
                             validatedMin, validatedMax);

                     return haveSomeValidatedLedgers &&
                         validatedMin <= startLedgerSeq &&
                         lastLedgerSeq <= validatedMax;
                 }();

                 if (!haveRange)
                 {
                     JLOG(m_journal.debug())
                         << "AccountHistory reschedule job for account "
                         << toBase58(accountId) << ", incomplete ledger range ["
                         << startLedgerSeq << "," << lastLedgerSeq << "]";
                     setAccountHistoryJobTimer(subInfo);
                     return;
                 }

                 std::optional<RelationalDatabase::AccountTxMarker> marker{};
                 while (!subInfo.index_->stopHistorical_)
                 {
                     auto dbResult =
                         getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
                     if (!dbResult)
                     {
                         JLOG(m_journal.debug())
                             << "AccountHistory job for account "
                             << toBase58(accountId) << " getMoreTxns failed.";
                         send(rpcError(rpcINTERNAL), true);
                         return;
                     }

                     auto const& txns = dbResult->first;
                     marker = dbResult->second;
                     size_t num_txns = txns.size();
                     for (size_t i = 0; i < num_txns; ++i)
                     {
                         auto const& [tx, meta] = txns[i];

                         if (!tx || !meta)
                         {
                             JLOG(m_journal.debug())
                                 << "AccountHistory job for account "
                                 << toBase58(accountId) << " empty tx or meta.";
                             send(rpcError(rpcINTERNAL), true);
                             return;
                         }
                         auto curTxLedger =
                             app_.getLedgerMaster().getLedgerBySeq(
                                 tx->getLedger());
                         if (!curTxLedger)
                         {
                             JLOG(m_journal.debug())
                                 << "AccountHistory job for account "
                                 << toBase58(accountId) << " no ledger.";
                             send(rpcError(rpcINTERNAL), true);
                             return;
                         }
                         std::shared_ptr<STTx const> stTxn =
                             tx->getSTransaction();
                         if (!stTxn)
                         {
                             JLOG(m_journal.debug())
                                 << "AccountHistory job for account "
                                 << toBase58(accountId)
                                 << " getSTransaction failed.";
                             send(rpcError(rpcINTERNAL), true);
                             return;
                         }

                         auto const mRef = std::ref(*meta);
                         auto const trR = meta->getResultTER();
                         MultiApiJson jvTx =
                             transJson(stTxn, trR, true, curTxLedger, mRef);

                         jvTx.set(
                             jss::account_history_tx_index, txHistoryIndex--);
                         if (i + 1 == num_txns ||
                             txns[i + 1].first->getLedger() != tx->getLedger())
                             jvTx.set(jss::account_history_boundary, true);

                         if (isFirstTx(tx, meta))
                         {
                             jvTx.set(jss::account_history_tx_first, true);
                             sendMultiApiJson(jvTx, false);

                             JLOG(m_journal.trace())
                                 << "AccountHistory job for account "
                                 << toBase58(accountId)
                                 << " done, found last tx.";
                             return;
                         }
                         else
                         {
                             sendMultiApiJson(jvTx, false);
                         }
                     }

                     if (marker)
                     {
                         JLOG(m_journal.trace())
                             << "AccountHistory job for account "
                             << toBase58(accountId)
                             << " paging, marker=" << marker->ledgerSeq << ":"
                             << marker->txnSeq;
                     }
                     else
                     {
                         break;
                     }
                 }

                 if (!subInfo.index_->stopHistorical_)
                 {
                     lastLedgerSeq = startLedgerSeq - 1;
                     if (lastLedgerSeq <= 1)
                     {
                         JLOG(m_journal.trace())
                             << "AccountHistory job for account "
                             << toBase58(accountId)
                             << " done, reached genesis ledger.";
                         return;
                     }
                 }
             }
+                    return;
+                },
+                "AccountHistoryTxStream",
+                1s,
+                this->m_journal);
         });
 }

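The control flow in the job above pages backward through history in windows of
at most 1024 ledgers, resuming inside a window via the database marker and
sliding the window toward the genesis ledger between iterations. A schematic of
the two nested loops, with hypothetical stand-in types in place of
RelationalDatabase::AccountTxMarker and getMoreTxns:

    #include <cstdint>
    #include <optional>

    struct Marker
    {
        std::uint32_t ledgerSeq, txnSeq;
    };

    // Stand-in for getMoreTxns: return a resume marker while more results
    // remain in [minLedger, maxLedger]. This stub reports one empty page.
    std::optional<Marker>
    fetchPage(std::uint32_t, std::uint32_t, std::optional<Marker>)
    {
        return std::nullopt;
    }

    void
    pageBackward(std::uint32_t lastLedgerSeq)
    {
        while (lastLedgerSeq >= 2)
        {
            // Work on at most 1024 ledgers per window.
            std::uint32_t const startLedgerSeq =
                lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2;

            std::optional<Marker> marker;
            do
            {
                marker = fetchPage(startLedgerSeq, lastLedgerSeq, marker);
            } while (marker);  // keep paging inside the current window

            lastLedgerSeq = startLedgerSeq - 1;  // then slide the window back
            if (lastLedgerSeq <= 1)
                return;  // reached the genesis ledger
        }
    }

    int
    main()
    {
        pageBackward(5000);
    }
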
@@ -25,6 +25,7 @@
 #include <xrpld/core/Config.h>
 #include <xrpld/core/DatabaseCon.h>
 #include <xrpld/core/SociDB.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/ByteUtilities.h>
 #include <xrpl/basics/contract.h>

@@ -35,6 +36,8 @@

 #include <memory>

+using namespace std::chrono_literals;
+
 namespace ripple {

 static auto checkpointPageCount = 1000;

@@ -257,9 +260,17 @@ public:
             // There is a separate check in `checkpoint` for a valid
             // connection in the rare case when the DatabaseCon is destroyed
             // after locking this weak_ptr
-            [wp = std::weak_ptr<Checkpointer>{shared_from_this()}]() {
-                if (auto self = wp.lock())
-                    self->checkpoint();
+            [wp = std::weak_ptr<Checkpointer>{shared_from_this()},
+             journal = j_]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        if (auto self = wp.lock())
+                            self->checkpoint();
+                        return true;
+                    },
+                    "WAL",
+                    1s,
+                    journal);
             }))
     {
         std::lock_guard lock(mutex_);

@@ -19,6 +19,7 @@

 #include <xrpld/net/RPCCall.h>
 #include <xrpld/net/RPCSub.h>
+#include <xrpld/perflog/PerfLog.h>

 #include <xrpl/basics/Log.h>
 #include <xrpl/basics/StringUtilities.h>

@@ -27,6 +28,8 @@

 #include <deque>

+using namespace std::chrono_literals;
+
 namespace ripple {

 // Subscription object for JSON-RPC

@@ -91,8 +94,17 @@ public:
         JLOG(j_.info()) << "RPCCall::fromNetwork start";

         mSending = m_jobQueue.addJob(
-            jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
-                sendThread();
+            jtCLIENT_SUBSCRIBE,
+            "RPCSub::sendThread",
+            [this, journal = j_]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        sendThread();
+                        return true;
+                    },
+                    "RPCSub::sendThread",
+                    1s,
+                    journal);
             });
     }
 }

@@ -1077,8 +1077,17 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
     fee_.update(Resource::feeModerateBurdenPeer, "oversize");

     app_.getJobQueue().addJob(
-        jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
-            overlay_.onManifests(m, that);
+        jtMANIFEST,
+        "receiveManifests",
+        [this, that = shared_from_this(), m, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    overlay_.onManifests(m, that);
+                    return true;
+                },
+                "receiveManifests",
+                1s,
+                journal);
         });
 }

@@ -1349,10 +1358,18 @@ PeerImp::handleTransaction(
             flags,
             checkSignature,
             batch,
-            stx]() {
-            if (auto peer = weak.lock())
-                peer->checkTransaction(
-                    flags, checkSignature, stx, batch);
+            stx,
+            journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                        peer->checkTransaction(
+                            flags, checkSignature, stx, batch);
+                    return true;
+                },
+                "recvTransaction->checkTransaction",
+                1s,
+                journal);
         });
     }
 }

@@ -1447,10 +1464,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)

     // Queue a job to process the request
     std::weak_ptr<PeerImp> weak = shared_from_this();
-    app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
-        if (auto peer = weak.lock())
-            peer->processLedgerRequest(m);
-    });
+    app_.getJobQueue().addJob(
+        jtLEDGER_REQ, "recvGetLedger", [weak, m, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                        peer->processLedgerRequest(m);
+                    return true;
+                },
+                "recvGetLedger",
+                1s,
+                journal);
+        });
 }

 void

@@ -1468,27 +1493,38 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
         Resource::feeModerateBurdenPeer, "received a proof path request");
     std::weak_ptr<PeerImp> weak = shared_from_this();
     app_.getJobQueue().addJob(
-        jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
-            if (auto peer = weak.lock())
-            {
-                auto reply =
-                    peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
-                if (reply.has_error())
-                {
-                    if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
-                        peer->charge(
-                            Resource::feeMalformedRequest,
-                            "proof_path_request");
-                    else
-                        peer->charge(
-                            Resource::feeRequestNoReply, "proof_path_request");
-                }
-                else
-                {
-                    peer->send(std::make_shared<Message>(
-                        reply, protocol::mtPROOF_PATH_RESPONSE));
-                }
-            }
+        jtREPLAY_REQ,
+        "recvProofPathRequest",
+        [weak, m, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                    {
+                        auto reply = peer->ledgerReplayMsgHandler_
+                                         .processProofPathRequest(m);
+                        if (reply.has_error())
+                        {
+                            if (reply.error() ==
+                                protocol::TMReplyError::reBAD_REQUEST)
+                                peer->charge(
+                                    Resource::feeMalformedRequest,
+                                    "proof_path_request");
+                            else
+                                peer->charge(
+                                    Resource::feeRequestNoReply,
+                                    "proof_path_request");
+                        }
+                        else
+                        {
+                            peer->send(std::make_shared<Message>(
+                                reply, protocol::mtPROOF_PATH_RESPONSE));
+                        }
+                    }
+                    return true;
+                },
+                "recvProofPathRequest",
+                1s,
+                journal);
         });
 }

@@ -1522,28 +1558,38 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
     fee_.fee = Resource::feeModerateBurdenPeer;
     std::weak_ptr<PeerImp> weak = shared_from_this();
     app_.getJobQueue().addJob(
-        jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
-            if (auto peer = weak.lock())
-            {
-                auto reply =
-                    peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
-                if (reply.has_error())
-                {
-                    if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
-                        peer->charge(
-                            Resource::feeMalformedRequest,
-                            "replay_delta_request");
-                    else
-                        peer->charge(
-                            Resource::feeRequestNoReply,
-                            "replay_delta_request");
-                }
-                else
-                {
-                    peer->send(std::make_shared<Message>(
-                        reply, protocol::mtREPLAY_DELTA_RESPONSE));
-                }
-            }
+        jtREPLAY_REQ,
+        "recvReplayDeltaRequest",
+        [weak, m, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                    {
+                        auto reply = peer->ledgerReplayMsgHandler_
+                                         .processReplayDeltaRequest(m);
+                        if (reply.has_error())
+                        {
+                            if (reply.error() ==
+                                protocol::TMReplyError::reBAD_REQUEST)
+                                peer->charge(
+                                    Resource::feeMalformedRequest,
+                                    "replay_delta_request");
+                            else
+                                peer->charge(
+                                    Resource::feeRequestNoReply,
+                                    "replay_delta_request");
+                        }
+                        else
+                        {
+                            peer->send(std::make_shared<Message>(
+                                reply, protocol::mtREPLAY_DELTA_RESPONSE));
+                        }
+                    }
+                    return true;
+                },
+                "recvReplayDeltaRequest",
+                1s,
+                journal);
         });
 }

@@ -1640,12 +1686,21 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
     {
         std::weak_ptr<PeerImp> weak{shared_from_this()};
         app_.getJobQueue().addJob(
-            jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
-                if (auto peer = weak.lock())
-                {
-                    peer->app_.getInboundTransactions().gotData(
-                        ledgerHash, peer, m);
-                }
+            jtTXN_DATA,
+            "recvPeerData",
+            [weak, ledgerHash, m, journal = p_journal_]() {
+                perf::measureDurationAndLog(
+                    [&]() {
+                        if (auto peer = weak.lock())
+                        {
+                            peer->app_.getInboundTransactions().gotData(
+                                ledgerHash, peer, m);
+                        }
+                        return true;
+                    },
+                    "recvPeerData",
+                    1s,
+                    journal);
             });
         return;
     }

@@ -1770,9 +1825,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
     app_.getJobQueue().addJob(
         isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
         "recvPropose->checkPropose",
-        [weak, isTrusted, m, proposal]() {
-            if (auto peer = weak.lock())
-                peer->checkPropose(isTrusted, m, proposal);
+        [weak, isTrusted, m, proposal, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                        peer->checkPropose(isTrusted, m, proposal);
+                    return true;
+                },
+                "recvPropose->checkPropose",
+                1s,
+                journal);
         });
 }

@@ -2408,9 +2470,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
     app_.getJobQueue().addJob(
         isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
         name,
-        [weak, val, m, key]() {
-            if (auto peer = weak.lock())
-                peer->checkValidation(val, key, m);
+        [weak, val, m, key, name, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                        peer->checkValidation(val, key, m);
+                    return true;
+                },
+                name,
+                1s,
+                journal);
         });
 }
 else

@@ -2463,9 +2532,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)

     std::weak_ptr<PeerImp> weak = shared_from_this();
     app_.getJobQueue().addJob(
-        jtREQUESTED_TXN, "doTransactions", [weak, m]() {
-            if (auto peer = weak.lock())
-                peer->doTransactions(m);
+        jtREQUESTED_TXN,
+        "doTransactions",
+        [weak, m, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                        peer->doTransactions(m);
+                    return true;
+                },
+                "doTransactions",
+                1s,
+                journal);
         });
     return;
 }

@@ -2597,9 +2675,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)

     std::weak_ptr<PeerImp> weak = shared_from_this();
     app_.getJobQueue().addJob(
-        jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
-            if (auto peer = weak.lock())
-                peer->handleHaveTransactions(m);
+        jtMISSING_TXN,
+        "handleHaveTransactions",
+        [weak, m, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    if (auto peer = weak.lock())
+                        peer->handleHaveTransactions(m);
+                    return true;
+                },
+                "handleHaveTransactions",
+                1s,
+                journal);
         });
 }

@@ -2768,8 +2855,18 @@ PeerImp::doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
     auto elapsed = UptimeClock::now();
     auto const pap = &app_;
     app_.getJobQueue().addJob(
-        jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
-            pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
+        jtPACK,
+        "MakeFetchPack",
+        [pap, weak, packet, hash, elapsed, journal = p_journal_]() {
+            perf::measureDurationAndLog(
+                [&]() {
+                    pap->getLedgerMaster().makeFetchPack(
+                        weak, packet, hash, elapsed);
+                    return true;
+                },
+                "MakeFetchPack",
+                1s,
+                journal);
         });
 }

@@ -190,20 +190,35 @@ measureDurationAndLog(
     std::chrono::duration<Rep, Period> maxDelay,
     beast::Journal const& journal)
 {
+    using Result = std::invoke_result_t<Func>;
+
     auto start_time = std::chrono::high_resolution_clock::now();

-    auto result = func();
-
-    auto end_time = std::chrono::high_resolution_clock::now();
-    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-        end_time - start_time);
-    if (duration > maxDelay)
+    if constexpr (std::is_void_v<Result>)
     {
-        JLOG(journal.warn())
-            << actionDescription << " took " << duration.count() << " ms";
+        std::forward<Func>(func)();
+        auto end_time = std::chrono::high_resolution_clock::now();
+        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+            end_time - start_time);
+        if (duration > maxDelay)
+        {
+            JLOG(journal.warn())
+                << actionDescription << " took " << duration.count() << " ms";
+        }
+    }
+    else
+    {
+        Result result = std::forward<Func>(func)();
+        auto end_time = std::chrono::high_resolution_clock::now();
+        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+            end_time - start_time);
+        if (duration > maxDelay)
+        {
+            JLOG(journal.warn())
+                << actionDescription << " took " << duration.count() << " ms";
+        }
+        return result;
     }
-
-    return result;
 }

 } // namespace perf

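The if constexpr split above lets one template serve both kinds of callables
this change set passes in: void job bodies and the lambdas that end with
return true. The dispatch-on-result-type idiom in isolation, as a minimal
compilable sketch:

    #include <iostream>
    #include <type_traits>
    #include <utility>

    // Dispatch at compile time on the callable's result type: the discarded
    // if constexpr branch is never instantiated, so the void case compiles
    // without a dummy return value.
    template <typename Func>
    decltype(auto)
    run(Func&& func)
    {
        using Result = std::invoke_result_t<Func>;
        if constexpr (std::is_void_v<Result>)
        {
            std::forward<Func>(func)();
        }
        else
        {
            Result result = std::forward<Func>(func)();
            return result;
        }
    }

    int
    main()
    {
        run([] { std::cout << "void body\n"; });      // void branch
        std::cout << run([] { return 42; }) << '\n';  // value branch
    }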