Compare commits

...

2 Commits

Author SHA1 Message Date
JCW
22cc6f8781 Merge remote-tracking branch 'origin/develop' into a1q123456/minor-cleanup-for-network-ops
Signed-off-by: JCW <a1q123456@users.noreply.github.com>

# Conflicts:
#	src/xrpld/app/misc/NetworkOPs.cpp
2026-03-18 22:37:34 +00:00
JCW
beceec3a47 Clean up NetworkOPs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2026-03-18 13:23:49 +00:00

View File

@@ -17,7 +17,6 @@
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/misc/detail/AccountTxPaging.h>
#include <xrpld/app/misc/make_NetworkOPs.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/consensus/Consensus.h>
#include <xrpld/consensus/ConsensusParms.h>
#include <xrpld/core/ConfigSections.h>
@@ -3549,281 +3548,237 @@ NetworkOPsImp::unsubAccountInternal(
void
NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
{
enum DatabaseType { Sqlite, None };
static auto const databaseType = [&]() -> DatabaseType {
// Use a dynamic_cast to return DatabaseType::None
// on failure.
if (dynamic_cast<SQLiteDatabase*>(&registry_.getRelationalDatabase()))
{
return DatabaseType::Sqlite;
}
return DatabaseType::None;
}();
registry_.getJobQueue().addJob(jtCLIENT_ACCT_HIST, "HistTxStream", [this, subInfo]() {
auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
if (databaseType == DatabaseType::None)
{
// LCOV_EXCL_START
UNREACHABLE("xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
JLOG(m_journal.error()) << "AccountHistory job for account "
<< toBase58(subInfo.index_->accountId_) << " no database";
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->send(rpcError(rpcINTERNAL), true);
unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
}
return;
// LCOV_EXCL_STOP
}
JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
<< " started. lastLedgerSeq=" << lastLedgerSeq;
registry_.getJobQueue().addJob(
jtCLIENT_ACCT_HIST, "HistTxStream", [this, dbType = databaseType, subInfo]() {
auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
std::shared_ptr<TxMeta> const& meta) -> bool {
/*
* genesis account: first tx is the one with seq 1
* other account: first tx is the one created the account
*/
if (accountId == genesisAccountId)
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
return true;
}
JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
<< " started. lastLedgerSeq=" << lastLedgerSeq;
for (auto& node : meta->getNodes())
{
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
continue;
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
std::shared_ptr<TxMeta> const& meta) -> bool {
/*
* genesis account: first tx is the one with seq 1
* other account: first tx is the one created the account
*/
if (accountId == genesisAccountId)
if (node.isFieldPresent(sfNewFields))
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
return true;
}
for (auto& node : meta->getNodes())
{
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
continue;
if (node.isFieldPresent(sfNewFields))
if (auto inner = dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields));
inner)
{
if (auto inner =
dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields));
inner)
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) == accountId)
{
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) == accountId)
{
return true;
}
return true;
}
}
}
}
return false;
};
return false;
};
auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
sptr->send(jvObj, true);
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
jvObj.visit(
sptr->getApiVersion(), //
[&](Json::Value const& jv) { sptr->send(jv, true); });
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns = [&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDatabase::AccountTxMarker> marker)
-> std::optional<std::pair<
RelationalDatabase::AccountTxs,
std::optional<RelationalDatabase::AccountTxMarker>>> {
switch (dbType)
{
case Sqlite: {
auto db =
safe_downcast<SQLiteDatabase*>(&registry_.getRelationalDatabase());
RelationalDatabase::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db->newestAccountTxPage(options);
}
// LCOV_EXCL_START
default: {
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getMoreTxns : invalid database type");
return {};
}
// LCOV_EXCL_STOP
}
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
int feeChargeCount = 0;
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
sptr->send(jvObj, true);
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
jvObj.visit(
sptr->getApiVersion(), //
[&](Json::Value const& jv) { sptr->send(jv, true); });
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns = [&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDatabase::AccountTxMarker> marker)
-> std::optional<std::pair<
RelationalDatabase::AccountTxs,
std::optional<RelationalDatabase::AccountTxMarker>>> {
auto& db = registry_.getRelationalDatabase();
RelationalDatabase::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db.newestAccountTxPage(options);
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
{
int feeChargeCount = 0;
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
++feeChargeCount;
}
else
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " no InfoSub. Fee charged " << feeChargeCount << " times.";
return;
}
// try to search in 1024 ledgers till reaching genesis ledgers
auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< ", working on ledger range [" << startLedgerSeq << "," << lastLedgerSeq << "]";
auto haveRange = [&]() -> bool {
std::uint32_t validatedMin = UINT_MAX;
std::uint32_t validatedMax = 0;
auto haveSomeValidatedLedgers =
registry_.getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq &&
lastLedgerSeq <= validatedMax;
}();
if (!haveRange)
{
JLOG(m_journal.debug()) << "AccountHistory reschedule job for account "
<< toBase58(accountId) << ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq << "]";
setAccountHistoryJobTimer(subInfo);
return;
}
std::optional<RelationalDatabase::AccountTxMarker> marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
if (!dbResult)
{
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
++feeChargeCount;
}
else
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " no InfoSub. Fee charged " << feeChargeCount << " times.";
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getMoreTxns failed");
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " getMoreTxns failed.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
// try to search in 1024 ledgers till reaching genesis ledgers
auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
<< ", working on ledger range [" << startLedgerSeq << ","
<< lastLedgerSeq << "]";
auto haveRange = [&]() -> bool {
std::uint32_t validatedMin = UINT_MAX;
std::uint32_t validatedMax = 0;
auto haveSomeValidatedLedgers =
registry_.getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq &&
lastLedgerSeq <= validatedMax;
}();
if (!haveRange)
auto const& txns = dbResult->first;
marker = dbResult->second;
size_t num_txns = txns.size();
for (size_t i = 0; i < num_txns; ++i)
{
JLOG(m_journal.debug()) << "AccountHistory reschedule job for account "
<< toBase58(accountId) << ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq << "]";
setAccountHistoryJobTimer(subInfo);
return;
}
auto const& [tx, meta] = txns[i];
std::optional<RelationalDatabase::AccountTxMarker> marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
if (!dbResult)
if (!tx || !meta)
{
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " empty tx or meta.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger = registry_.getLedgerMaster().getLedgerBySeq(tx->getLedger());
if (!curTxLedger)
{
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getMoreTxns failed");
"getLedgerBySeq failed");
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " getMoreTxns failed.";
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
std::shared_ptr<STTx const> stTxn = tx->getSTransaction();
if (!stTxn)
{
// LCOV_EXCL_START
UNREACHABLE(
"NetworkOPsImp::addAccountHistoryJob : "
"getSTransaction failed");
JLOG(m_journal.debug())
<< "AccountHistory job for account " << toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
auto const& txns = dbResult->first;
marker = dbResult->second;
size_t num_txns = txns.size();
for (size_t i = 0; i < num_txns; ++i)
auto const mRef = std::ref(*meta);
auto const trR = meta->getResultTER();
MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
jvTx.set(jss::account_history_boundary, true);
if (isFirstTx(tx, meta))
{
auto const& [tx, meta] = txns[i];
if (!tx || !meta)
{
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " empty tx or meta.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger =
registry_.getLedgerMaster().getLedgerBySeq(tx->getLedger());
if (!curTxLedger)
{
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getLedgerBySeq failed");
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
std::shared_ptr<STTx const> stTxn = tx->getSTransaction();
if (!stTxn)
{
// LCOV_EXCL_START
UNREACHABLE(
"NetworkOPsImp::addAccountHistoryJob : "
"getSTransaction failed");
JLOG(m_journal.debug())
<< "AccountHistory job for account " << toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
auto const mRef = std::ref(*meta);
auto const trR = meta->getResultTER();
MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
jvTx.set(jss::account_history_boundary, true);
if (isFirstTx(tx, meta))
{
jvTx.set(jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " done, found last tx.";
return;
}
jvTx.set(jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);
}
if (marker)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
}
else
{
break;
}
}
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " done, reached genesis ledger.";
JLOG(m_journal.trace()) << "AccountHistory job for account "
<< toBase58(accountId) << " done, found last tx.";
return;
}
sendMultiApiJson(jvTx, false);
}
if (marker)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
}
else
{
break;
}
}
});
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " done, reached genesis ledger.";
return;
}
}
}
});
}
void