mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
networkops duration measure
This commit is contained in:
@@ -81,6 +81,8 @@
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class NetworkOPsImp final : public NetworkOPs
|
||||
@@ -994,9 +996,16 @@ NetworkOPsImp::setHeartbeatTimer()
|
||||
heartbeatTimer_,
|
||||
mConsensus.parms().ledgerGRANULARITY,
|
||||
[this]() {
|
||||
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
|
||||
processHeartbeatTimer();
|
||||
});
|
||||
m_job_queue.addJob(
|
||||
jtNETOP_TIMER,
|
||||
"NetOPs.heartbeat",
|
||||
[this, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() { processHeartbeatTimer(); },
|
||||
"NetOPs.heartbeat",
|
||||
1s,
|
||||
journal);
|
||||
});
|
||||
},
|
||||
[this]() { setHeartbeatTimer(); });
|
||||
}
|
||||
@@ -1010,9 +1019,16 @@ NetworkOPsImp::setClusterTimer()
|
||||
clusterTimer_,
|
||||
10s,
|
||||
[this]() {
|
||||
m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
|
||||
processClusterTimer();
|
||||
});
|
||||
m_job_queue.addJob(
|
||||
jtNETOP_CLUSTER,
|
||||
"NetOPs.cluster",
|
||||
[this, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() { processClusterTimer(); },
|
||||
"NetOPs.cluster",
|
||||
1s,
|
||||
journal);
|
||||
});
|
||||
},
|
||||
[this]() { setClusterTimer(); });
|
||||
}
|
||||
@@ -1229,10 +1245,17 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
|
||||
|
||||
auto tx = std::make_shared<Transaction>(trans, reason, app_);
|
||||
|
||||
m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
|
||||
auto t = tx;
|
||||
processTransaction(t, false, false, FailHard::no);
|
||||
});
|
||||
m_job_queue.addJob(
|
||||
jtTRANSACTION, "submitTxn", [this, tx, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() {
|
||||
auto t = tx;
|
||||
processTransaction(t, false, false, FailHard::no);
|
||||
},
|
||||
"submitTxn",
|
||||
1s,
|
||||
journal);
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -1315,7 +1338,13 @@ NetworkOPsImp::doTransactionAsync(
|
||||
if (mDispatchState == DispatchState::none)
|
||||
{
|
||||
if (m_job_queue.addJob(
|
||||
jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
|
||||
jtBATCH, "transactionBatch", [this, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() { transactionBatch(); },
|
||||
"transactionBatch",
|
||||
1s,
|
||||
journal);
|
||||
}))
|
||||
{
|
||||
mDispatchState = DispatchState::scheduled;
|
||||
}
|
||||
@@ -1362,9 +1391,16 @@ NetworkOPsImp::doTransactionSyncBatch(
|
||||
if (mTransactions.size())
|
||||
{
|
||||
// More transactions need to be applied, but by another job.
|
||||
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
|
||||
transactionBatch();
|
||||
}))
|
||||
if (m_job_queue.addJob(
|
||||
jtBATCH,
|
||||
"transactionBatch",
|
||||
[this, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() { transactionBatch(); },
|
||||
"transactionBatch",
|
||||
1s,
|
||||
journal);
|
||||
}))
|
||||
{
|
||||
mDispatchState = DispatchState::scheduled;
|
||||
}
|
||||
@@ -2655,8 +2691,9 @@ NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
|
||||
|
||||
info[jss::server_state] = strOperatingMode(admin);
|
||||
|
||||
info[jss::time] = to_string(std::chrono::floor<std::chrono::microseconds>(
|
||||
std::chrono::system_clock::now()));
|
||||
info[jss::time] = to_string(
|
||||
std::chrono::floor<std::chrono::microseconds>(
|
||||
std::chrono::system_clock::now()));
|
||||
|
||||
if (needNetworkLedger_)
|
||||
info[jss::network_ledger] = "waiting";
|
||||
@@ -3175,8 +3212,17 @@ NetworkOPsImp::reportFeeChange()
|
||||
if (f != mLastFeeSummary)
|
||||
{
|
||||
m_job_queue.addJob(
|
||||
jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
|
||||
pubServer();
|
||||
jtCLIENT_FEE_CHANGE,
|
||||
"reportFeeChange->pubServer",
|
||||
[this, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() {
|
||||
pubServer();
|
||||
return true;
|
||||
},
|
||||
"reportFeeChange->pubServer",
|
||||
1s,
|
||||
journal);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -3187,7 +3233,16 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
|
||||
m_job_queue.addJob(
|
||||
jtCLIENT_CONSENSUS,
|
||||
"reportConsensusStateChange->pubConsensus",
|
||||
[this, phase]() { pubConsensus(phase); });
|
||||
[this, phase, journal = m_journal]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() {
|
||||
pubConsensus(phase);
|
||||
return true;
|
||||
},
|
||||
"reportConsensusStateChange->pubConsensus",
|
||||
1s,
|
||||
journal);
|
||||
});
|
||||
}
|
||||
|
||||
inline void
|
||||
@@ -3693,262 +3748,301 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
|
||||
jtCLIENT_ACCT_HIST,
|
||||
"AccountHistoryTxStream",
|
||||
[this, dbType = databaseType, subInfo]() {
|
||||
auto const& accountId = subInfo.index_->accountId_;
|
||||
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
|
||||
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
|
||||
perf::measureDurationAndLog(
|
||||
[&]() {
|
||||
auto const& accountId = subInfo.index_->accountId_;
|
||||
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
|
||||
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
|
||||
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account " << toBase58(accountId)
|
||||
<< " started. lastLedgerSeq=" << lastLedgerSeq;
|
||||
|
||||
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
|
||||
std::shared_ptr<TxMeta> const& meta) -> bool {
|
||||
/*
|
||||
* genesis account: first tx is the one with seq 1
|
||||
* other account: first tx is the one created the account
|
||||
*/
|
||||
if (accountId == genesisAccountId)
|
||||
{
|
||||
auto stx = tx->getSTransaction();
|
||||
if (stx->getAccountID(sfAccount) == accountId &&
|
||||
stx->getSeqValue() == 1)
|
||||
return true;
|
||||
}
|
||||
|
||||
for (auto& node : meta->getNodes())
|
||||
{
|
||||
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
|
||||
continue;
|
||||
|
||||
if (node.isFieldPresent(sfNewFields))
|
||||
{
|
||||
if (auto inner = dynamic_cast<STObject const*>(
|
||||
node.peekAtPField(sfNewFields));
|
||||
inner)
|
||||
{
|
||||
if (inner->isFieldPresent(sfAccount) &&
|
||||
inner->getAccountID(sfAccount) == accountId)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
auto send = [&](Json::Value const& jvObj,
|
||||
bool unsubscribe) -> bool {
|
||||
if (auto sptr = subInfo.sinkWptr_.lock())
|
||||
{
|
||||
sptr->send(jvObj, true);
|
||||
if (unsubscribe)
|
||||
unsubAccountHistory(sptr, accountId, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
|
||||
bool unsubscribe) -> bool {
|
||||
if (auto sptr = subInfo.sinkWptr_.lock())
|
||||
{
|
||||
jvObj.visit(
|
||||
sptr->getApiVersion(), //
|
||||
[&](Json::Value const& jv) { sptr->send(jv, true); });
|
||||
|
||||
if (unsubscribe)
|
||||
unsubAccountHistory(sptr, accountId, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
auto getMoreTxns =
|
||||
[&](std::uint32_t minLedger,
|
||||
std::uint32_t maxLedger,
|
||||
std::optional<RelationalDatabase::AccountTxMarker> marker)
|
||||
-> std::optional<std::pair<
|
||||
RelationalDatabase::AccountTxs,
|
||||
std::optional<RelationalDatabase::AccountTxMarker>>> {
|
||||
switch (dbType)
|
||||
{
|
||||
case Sqlite: {
|
||||
auto db = static_cast<SQLiteDatabase*>(
|
||||
&app_.getRelationalDatabase());
|
||||
RelationalDatabase::AccountTxPageOptions options{
|
||||
accountId, minLedger, maxLedger, marker, 0, true};
|
||||
return db->newestAccountTxPage(options);
|
||||
}
|
||||
default: {
|
||||
UNREACHABLE(
|
||||
"ripple::NetworkOPsImp::addAccountHistoryJob::"
|
||||
"getMoreTxns : invalid database type");
|
||||
return {};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* search backward until the genesis ledger or asked to stop
|
||||
*/
|
||||
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
|
||||
{
|
||||
int feeChargeCount = 0;
|
||||
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
|
||||
{
|
||||
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
|
||||
++feeChargeCount;
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId) << " no InfoSub. Fee charged "
|
||||
<< feeChargeCount << " times.";
|
||||
return;
|
||||
}
|
||||
<< toBase58(accountId)
|
||||
<< " started. lastLedgerSeq=" << lastLedgerSeq;
|
||||
|
||||
// try to search in 1024 ledgers till reaching genesis ledgers
|
||||
auto startLedgerSeq =
|
||||
(lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account " << toBase58(accountId)
|
||||
<< ", working on ledger range [" << startLedgerSeq << ","
|
||||
<< lastLedgerSeq << "]";
|
||||
auto isFirstTx =
|
||||
[&](std::shared_ptr<Transaction> const& tx,
|
||||
std::shared_ptr<TxMeta> const& meta) -> bool {
|
||||
/*
|
||||
* genesis account: first tx is the one with seq 1
|
||||
* other account: first tx is the one created the
|
||||
* account
|
||||
*/
|
||||
if (accountId == genesisAccountId)
|
||||
{
|
||||
auto stx = tx->getSTransaction();
|
||||
if (stx->getAccountID(sfAccount) == accountId &&
|
||||
stx->getSeqValue() == 1)
|
||||
return true;
|
||||
}
|
||||
|
||||
auto haveRange = [&]() -> bool {
|
||||
std::uint32_t validatedMin = UINT_MAX;
|
||||
std::uint32_t validatedMax = 0;
|
||||
auto haveSomeValidatedLedgers =
|
||||
app_.getLedgerMaster().getValidatedRange(
|
||||
validatedMin, validatedMax);
|
||||
for (auto& node : meta->getNodes())
|
||||
{
|
||||
if (node.getFieldU16(sfLedgerEntryType) !=
|
||||
ltACCOUNT_ROOT)
|
||||
continue;
|
||||
|
||||
return haveSomeValidatedLedgers &&
|
||||
validatedMin <= startLedgerSeq &&
|
||||
lastLedgerSeq <= validatedMax;
|
||||
}();
|
||||
if (node.isFieldPresent(sfNewFields))
|
||||
{
|
||||
if (auto inner = dynamic_cast<STObject const*>(
|
||||
node.peekAtPField(sfNewFields));
|
||||
inner)
|
||||
{
|
||||
if (inner->isFieldPresent(sfAccount) &&
|
||||
inner->getAccountID(sfAccount) ==
|
||||
accountId)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!haveRange)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory reschedule job for account "
|
||||
<< toBase58(accountId) << ", incomplete ledger range ["
|
||||
<< startLedgerSeq << "," << lastLedgerSeq << "]";
|
||||
setAccountHistoryJobTimer(subInfo);
|
||||
return;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
std::optional<RelationalDatabase::AccountTxMarker> marker{};
|
||||
while (!subInfo.index_->stopHistorical_)
|
||||
{
|
||||
auto dbResult =
|
||||
getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
|
||||
if (!dbResult)
|
||||
auto send = [&](Json::Value const& jvObj,
|
||||
bool unsubscribe) -> bool {
|
||||
if (auto sptr = subInfo.sinkWptr_.lock())
|
||||
{
|
||||
sptr->send(jvObj, true);
|
||||
if (unsubscribe)
|
||||
unsubAccountHistory(sptr, accountId, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
|
||||
bool unsubscribe) -> bool {
|
||||
if (auto sptr = subInfo.sinkWptr_.lock())
|
||||
{
|
||||
jvObj.visit(
|
||||
sptr->getApiVersion(), //
|
||||
[&](Json::Value const& jv) {
|
||||
sptr->send(jv, true);
|
||||
});
|
||||
|
||||
if (unsubscribe)
|
||||
unsubAccountHistory(sptr, accountId, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
auto getMoreTxns =
|
||||
[&](std::uint32_t minLedger,
|
||||
std::uint32_t maxLedger,
|
||||
std::optional<RelationalDatabase::AccountTxMarker>
|
||||
marker)
|
||||
-> std::optional<std::pair<
|
||||
RelationalDatabase::AccountTxs,
|
||||
std::optional<
|
||||
RelationalDatabase::AccountTxMarker>>> {
|
||||
switch (dbType)
|
||||
{
|
||||
case Sqlite: {
|
||||
auto db = static_cast<SQLiteDatabase*>(
|
||||
&app_.getRelationalDatabase());
|
||||
RelationalDatabase::AccountTxPageOptions
|
||||
options{
|
||||
accountId,
|
||||
minLedger,
|
||||
maxLedger,
|
||||
marker,
|
||||
0,
|
||||
true};
|
||||
return db->newestAccountTxPage(options);
|
||||
}
|
||||
default: {
|
||||
UNREACHABLE(
|
||||
"ripple::NetworkOPsImp::"
|
||||
"addAccountHistoryJob::"
|
||||
"getMoreTxns : invalid database type");
|
||||
return {};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* search backward until the genesis ledger or asked to stop
|
||||
*/
|
||||
while (lastLedgerSeq >= 2 &&
|
||||
!subInfo.index_->stopHistorical_)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId) << " getMoreTxns failed.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
|
||||
auto const& txns = dbResult->first;
|
||||
marker = dbResult->second;
|
||||
size_t num_txns = txns.size();
|
||||
for (size_t i = 0; i < num_txns; ++i)
|
||||
{
|
||||
auto const& [tx, meta] = txns[i];
|
||||
|
||||
if (!tx || !meta)
|
||||
int feeChargeCount = 0;
|
||||
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId) << " empty tx or meta.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
auto curTxLedger =
|
||||
app_.getLedgerMaster().getLedgerBySeq(
|
||||
tx->getLedger());
|
||||
if (!curTxLedger)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId) << " no ledger.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
std::shared_ptr<STTx const> stTxn =
|
||||
tx->getSTransaction();
|
||||
if (!stTxn)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " getSTransaction failed.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
|
||||
auto const mRef = std::ref(*meta);
|
||||
auto const trR = meta->getResultTER();
|
||||
MultiApiJson jvTx =
|
||||
transJson(stTxn, trR, true, curTxLedger, mRef);
|
||||
|
||||
jvTx.set(
|
||||
jss::account_history_tx_index, txHistoryIndex--);
|
||||
if (i + 1 == num_txns ||
|
||||
txns[i + 1].first->getLedger() != tx->getLedger())
|
||||
jvTx.set(jss::account_history_boundary, true);
|
||||
|
||||
if (isFirstTx(tx, meta))
|
||||
{
|
||||
jvTx.set(jss::account_history_tx_first, true);
|
||||
sendMultiApiJson(jvTx, false);
|
||||
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " done, found last tx.";
|
||||
return;
|
||||
sptr->getConsumer().charge(
|
||||
Resource::feeMediumBurdenRPC);
|
||||
++feeChargeCount;
|
||||
}
|
||||
else
|
||||
{
|
||||
sendMultiApiJson(jvTx, false);
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " no InfoSub. Fee charged " << feeChargeCount
|
||||
<< " times.";
|
||||
return;
|
||||
}
|
||||
|
||||
// try to search in 1024 ledgers till reaching genesis
|
||||
// ledgers
|
||||
auto startLedgerSeq =
|
||||
(lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024
|
||||
: 2);
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< ", working on ledger range [" << startLedgerSeq
|
||||
<< "," << lastLedgerSeq << "]";
|
||||
|
||||
auto haveRange = [&]() -> bool {
|
||||
std::uint32_t validatedMin = UINT_MAX;
|
||||
std::uint32_t validatedMax = 0;
|
||||
auto haveSomeValidatedLedgers =
|
||||
app_.getLedgerMaster().getValidatedRange(
|
||||
validatedMin, validatedMax);
|
||||
|
||||
return haveSomeValidatedLedgers &&
|
||||
validatedMin <= startLedgerSeq &&
|
||||
lastLedgerSeq <= validatedMax;
|
||||
}();
|
||||
|
||||
if (!haveRange)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory reschedule job for account "
|
||||
<< toBase58(accountId)
|
||||
<< ", incomplete ledger range ["
|
||||
<< startLedgerSeq << "," << lastLedgerSeq
|
||||
<< "]";
|
||||
setAccountHistoryJobTimer(subInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
std::optional<RelationalDatabase::AccountTxMarker>
|
||||
marker{};
|
||||
while (!subInfo.index_->stopHistorical_)
|
||||
{
|
||||
auto dbResult = getMoreTxns(
|
||||
startLedgerSeq, lastLedgerSeq, marker);
|
||||
if (!dbResult)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " getMoreTxns failed.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
|
||||
auto const& txns = dbResult->first;
|
||||
marker = dbResult->second;
|
||||
size_t num_txns = txns.size();
|
||||
for (size_t i = 0; i < num_txns; ++i)
|
||||
{
|
||||
auto const& [tx, meta] = txns[i];
|
||||
|
||||
if (!tx || !meta)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " empty tx or meta.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
auto curTxLedger =
|
||||
app_.getLedgerMaster().getLedgerBySeq(
|
||||
tx->getLedger());
|
||||
if (!curTxLedger)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId) << " no ledger.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
std::shared_ptr<STTx const> stTxn =
|
||||
tx->getSTransaction();
|
||||
if (!stTxn)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " getSTransaction failed.";
|
||||
send(rpcError(rpcINTERNAL), true);
|
||||
return;
|
||||
}
|
||||
|
||||
auto const mRef = std::ref(*meta);
|
||||
auto const trR = meta->getResultTER();
|
||||
MultiApiJson jvTx = transJson(
|
||||
stTxn, trR, true, curTxLedger, mRef);
|
||||
|
||||
jvTx.set(
|
||||
jss::account_history_tx_index,
|
||||
txHistoryIndex--);
|
||||
if (i + 1 == num_txns ||
|
||||
txns[i + 1].first->getLedger() !=
|
||||
tx->getLedger())
|
||||
jvTx.set(
|
||||
jss::account_history_boundary, true);
|
||||
|
||||
if (isFirstTx(tx, meta))
|
||||
{
|
||||
jvTx.set(
|
||||
jss::account_history_tx_first, true);
|
||||
sendMultiApiJson(jvTx, false);
|
||||
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " done, found last tx.";
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
sendMultiApiJson(jvTx, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (marker)
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " paging, marker=" << marker->ledgerSeq
|
||||
<< ":" << marker->txnSeq;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!subInfo.index_->stopHistorical_)
|
||||
{
|
||||
lastLedgerSeq = startLedgerSeq - 1;
|
||||
if (lastLedgerSeq <= 1)
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " done, reached genesis ledger.";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (marker)
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " paging, marker=" << marker->ledgerSeq << ":"
|
||||
<< marker->txnSeq;
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!subInfo.index_->stopHistorical_)
|
||||
{
|
||||
lastLedgerSeq = startLedgerSeq - 1;
|
||||
if (lastLedgerSeq <= 1)
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "AccountHistory job for account "
|
||||
<< toBase58(accountId)
|
||||
<< " done, reached genesis ledger.";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
},
|
||||
"AccountHistoryTxStream",
|
||||
1s,
|
||||
this->m_journal);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include <xrpld/core/Config.h>
|
||||
#include <xrpld/core/DatabaseCon.h>
|
||||
#include <xrpld/core/SociDB.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
|
||||
#include <xrpl/basics/ByteUtilities.h>
|
||||
#include <xrpl/basics/contract.h>
|
||||
@@ -35,6 +36,8 @@
|
||||
|
||||
#include <memory>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace ripple {
|
||||
|
||||
static auto checkpointPageCount = 1000;
|
||||
@@ -257,9 +260,17 @@ public:
|
||||
// There is a separate check in `checkpoint` for a valid
|
||||
// connection in the rare case when the DatabaseCon is destroyed
|
||||
// after locking this weak_ptr
|
||||
[wp = std::weak_ptr<Checkpointer>{shared_from_this()}]() {
|
||||
if (auto self = wp.lock())
|
||||
self->checkpoint();
|
||||
[wp = std::weak_ptr<Checkpointer>{shared_from_this()},
|
||||
journal = j_]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() {
|
||||
if (auto self = wp.lock())
|
||||
self->checkpoint();
|
||||
return true;
|
||||
},
|
||||
"WAL",
|
||||
1s,
|
||||
journal);
|
||||
}))
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#include <xrpld/net/RPCCall.h>
|
||||
#include <xrpld/net/RPCSub.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/StringUtilities.h>
|
||||
@@ -27,6 +28,8 @@
|
||||
|
||||
#include <deque>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Subscription object for JSON-RPC
|
||||
@@ -91,8 +94,17 @@ public:
|
||||
JLOG(j_.info()) << "RPCCall::fromNetwork start";
|
||||
|
||||
mSending = m_jobQueue.addJob(
|
||||
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
|
||||
sendThread();
|
||||
jtCLIENT_SUBSCRIBE,
|
||||
"RPCSub::sendThread",
|
||||
[this, journal = j_]() {
|
||||
perf::measureDurationAndLog(
|
||||
[&]() {
|
||||
sendThread();
|
||||
return true;
|
||||
},
|
||||
"RPCSub::sendThread",
|
||||
1s,
|
||||
journal);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,20 +190,35 @@ measureDurationAndLog(
|
||||
std::chrono::duration<Rep, Period> maxDelay,
|
||||
beast::Journal const& journal)
|
||||
{
|
||||
using Result = std::invoke_result_t<Func>;
|
||||
|
||||
auto start_time = std::chrono::high_resolution_clock::now();
|
||||
|
||||
auto result = func();
|
||||
|
||||
auto end_time = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
end_time - start_time);
|
||||
if (duration > maxDelay)
|
||||
if constexpr (std::is_void_v<Result>)
|
||||
{
|
||||
JLOG(journal.warn())
|
||||
<< actionDescription << " took " << duration.count() << " ms";
|
||||
std::forward<Func>(func)();
|
||||
auto end_time = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
end_time - start_time);
|
||||
if (duration > maxDelay)
|
||||
{
|
||||
JLOG(journal.warn())
|
||||
<< actionDescription << " took " << duration.count() << " ms";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Result result = std::forward<Func>(func)();
|
||||
auto end_time = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
end_time - start_time);
|
||||
if (duration > maxDelay)
|
||||
{
|
||||
JLOG(journal.warn())
|
||||
<< actionDescription << " took " << duration.count() << " ms";
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
} // namespace perf
|
||||
|
||||
Reference in New Issue
Block a user