Compare commits

..

2 Commits

Author SHA1 Message Date
JCW
22cc6f8781 Merge remote-tracking branch 'origin/develop' into a1q123456/minor-cleanup-for-network-ops
Signed-off-by: JCW <a1q123456@users.noreply.github.com>

# Conflicts:
#	src/xrpld/app/misc/NetworkOPs.cpp
2026-03-18 22:37:34 +00:00
JCW
beceec3a47 Clean up NetworkOPs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2026-03-18 13:23:49 +00:00
12 changed files with 207 additions and 265 deletions

View File

@@ -11,7 +11,7 @@ runs:
steps:
# When a tag is pushed, the version is used as-is.
- name: Generate version for tag event
if: ${{ startsWith(github.ref, 'refs/tags/') }}
if: ${{ github.event_name == 'tag' }}
shell: bash
env:
VERSION: ${{ github.ref_name }}
@@ -22,7 +22,7 @@ runs:
# We use a plus sign instead of a hyphen because Conan recipe versions do
# not support two hyphens.
- name: Generate version for non-tag event
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
if: ${{ github.event_name != 'tag' }}
shell: bash
run: |
echo 'Extracting version from BuildInfo.cpp.'

View File

@@ -1,13 +0,0 @@
name: Check PR commits
on:
pull_request:
# The action needs to have write permissions to post comments on the PR.
permissions:
contents: read
pull-requests: write
jobs:
check_commits:
uses: XRPLF/actions/.github/workflows/check-pr-commits.yml@481048b78b94ac3343d1292b4ef125a813879f2b

View File

@@ -11,4 +11,4 @@ on:
jobs:
check_title:
if: ${{ github.event.pull_request.draft != true }}
uses: XRPLF/actions/.github/workflows/check-pr-title.yml@e2c7f400d1e85ae65dad552fd425169fbacca4a3
uses: XRPLF/actions/.github/workflows/check-pr-title.yml@f9c2b57a7ac30d70555b5de6e627005f62e933f3

View File

@@ -5,7 +5,7 @@ name: Tag
on:
push:
tags:
- "[0-9]+.[0-9]+.[0-9]*"
- "v*"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}

View File

@@ -14,7 +14,7 @@ on:
jobs:
# Call the workflow in the XRPLF/actions repo that runs the pre-commit hooks.
run-hooks:
uses: XRPLF/actions/.github/workflows/pre-commit.yml@e7896f15cc60d0da1a272c77ee5c4026b424f9c7
uses: XRPLF/actions/.github/workflows/pre-commit.yml@44856eb0d6ecb7d376370244324ab3dc8b863bad
with:
runs_on: ubuntu-latest
container: '{ "image": "ghcr.io/xrplf/ci/tools-rippled-pre-commit:sha-41ec7c1" }'

View File

@@ -48,7 +48,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
uses: XRPLF/actions/prepare-runner@2cbf481018d930656e9276fcc20dc0e3a0be5b6d
with:
enable_ccache: false

View File

@@ -107,7 +107,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
uses: XRPLF/actions/prepare-runner@2cbf481018d930656e9276fcc20dc0e3a0be5b6d
with:
enable_ccache: ${{ inputs.ccache_enabled }}

View File

@@ -35,7 +35,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
uses: XRPLF/actions/prepare-runner@2cbf481018d930656e9276fcc20dc0e3a0be5b6d
with:
enable_ccache: false

View File

@@ -89,10 +89,10 @@ jobs:
conan export . --version=rc
conan upload --confirm --check --remote="${REMOTE_NAME}" xrpl/rc
# When this workflow is triggered by a push event, it will always be when tagging a final
# When this workflow is triggered by a tag event, it will always be when tagging a final
# release, see on-tag.yml.
- name: Upload Conan recipe (release)
if: ${{ startsWith(github.ref, 'refs/tags/') }}
if: ${{ github.event_name == 'tag' }}
env:
REMOTE_NAME: ${{ inputs.remote_name }}
run: |

View File

@@ -70,7 +70,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
uses: XRPLF/actions/prepare-runner@2cbf481018d930656e9276fcc20dc0e3a0be5b6d
with:
enable_ccache: false

View File

@@ -89,7 +89,7 @@ TEST(parseStatmRSSkB, standard_format)
// Test empty string
{
std::string statm;
std::string statm = "";
long result = parseStatmRSSkB(statm);
EXPECT_EQ(result, -1);
}

View File

@@ -17,7 +17,6 @@
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/misc/detail/AccountTxPaging.h>
#include <xrpld/app/misc/make_NetworkOPs.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/consensus/Consensus.h>
#include <xrpld/consensus/ConsensusParms.h>
#include <xrpld/core/ConfigSections.h>
@@ -3549,281 +3548,237 @@ NetworkOPsImp::unsubAccountInternal(
void
NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
{
enum DatabaseType { Sqlite, None };
static auto const databaseType = [&]() -> DatabaseType {
// Use a dynamic_cast to return DatabaseType::None
// on failure.
if (dynamic_cast<SQLiteDatabase*>(&registry_.getRelationalDatabase()))
{
return DatabaseType::Sqlite;
}
return DatabaseType::None;
}();
registry_.getJobQueue().addJob(jtCLIENT_ACCT_HIST, "HistTxStream", [this, subInfo]() {
auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
if (databaseType == DatabaseType::None)
{
// LCOV_EXCL_START
UNREACHABLE("xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
JLOG(m_journal.error()) << "AccountHistory job for account "
<< toBase58(subInfo.index_->accountId_) << " no database";
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->send(rpcError(rpcINTERNAL), true);
unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
}
return;
// LCOV_EXCL_STOP
}
JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
<< " started. lastLedgerSeq=" << lastLedgerSeq;
registry_.getJobQueue().addJob(
jtCLIENT_ACCT_HIST, "HistTxStream", [this, dbType = databaseType, subInfo]() {
auto const& accountId = subInfo.index_->accountId_;
auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
std::shared_ptr<TxMeta> const& meta) -> bool {
/*
* genesis account: first tx is the one with seq 1
* other account: first tx is the one created the account
*/
if (accountId == genesisAccountId)
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
return true;
}
JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
<< " started. lastLedgerSeq=" << lastLedgerSeq;
for (auto& node : meta->getNodes())
{
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
continue;
auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
std::shared_ptr<TxMeta> const& meta) -> bool {
/*
* genesis account: first tx is the one with seq 1
* other account: first tx is the one created the account
*/
if (accountId == genesisAccountId)
if (node.isFieldPresent(sfNewFields))
{
auto stx = tx->getSTransaction();
if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
return true;
}
for (auto& node : meta->getNodes())
{
if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
continue;
if (node.isFieldPresent(sfNewFields))
if (auto inner = dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields));
inner)
{
if (auto inner =
dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields));
inner)
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) == accountId)
{
if (inner->isFieldPresent(sfAccount) &&
inner->getAccountID(sfAccount) == accountId)
{
return true;
}
return true;
}
}
}
}
return false;
};
return false;
};
auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
sptr->send(jvObj, true);
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
jvObj.visit(
sptr->getApiVersion(), //
[&](Json::Value const& jv) { sptr->send(jv, true); });
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns = [&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDatabase::AccountTxMarker> marker)
-> std::optional<std::pair<
RelationalDatabase::AccountTxs,
std::optional<RelationalDatabase::AccountTxMarker>>> {
switch (dbType)
{
case Sqlite: {
auto db =
safe_downcast<SQLiteDatabase*>(&registry_.getRelationalDatabase());
RelationalDatabase::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db->newestAccountTxPage(options);
}
// LCOV_EXCL_START
default: {
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getMoreTxns : invalid database type");
return {};
}
// LCOV_EXCL_STOP
}
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
int feeChargeCount = 0;
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
sptr->send(jvObj, true);
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock())
{
jvObj.visit(
sptr->getApiVersion(), //
[&](Json::Value const& jv) { sptr->send(jv, true); });
if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
return true;
}
return false;
};
auto getMoreTxns = [&](std::uint32_t minLedger,
std::uint32_t maxLedger,
std::optional<RelationalDatabase::AccountTxMarker> marker)
-> std::optional<std::pair<
RelationalDatabase::AccountTxs,
std::optional<RelationalDatabase::AccountTxMarker>>> {
auto& db = registry_.getRelationalDatabase();
RelationalDatabase::AccountTxPageOptions options{
accountId, minLedger, maxLedger, marker, 0, true};
return db.newestAccountTxPage(options);
};
/*
* search backward until the genesis ledger or asked to stop
*/
while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
{
int feeChargeCount = 0;
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
++feeChargeCount;
}
else
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " no InfoSub. Fee charged " << feeChargeCount << " times.";
return;
}
// try to search in 1024 ledgers till reaching genesis ledgers
auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< ", working on ledger range [" << startLedgerSeq << "," << lastLedgerSeq << "]";
auto haveRange = [&]() -> bool {
std::uint32_t validatedMin = UINT_MAX;
std::uint32_t validatedMax = 0;
auto haveSomeValidatedLedgers =
registry_.getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq &&
lastLedgerSeq <= validatedMax;
}();
if (!haveRange)
{
JLOG(m_journal.debug()) << "AccountHistory reschedule job for account "
<< toBase58(accountId) << ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq << "]";
setAccountHistoryJobTimer(subInfo);
return;
}
std::optional<RelationalDatabase::AccountTxMarker> marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
if (!dbResult)
{
sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
++feeChargeCount;
}
else
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " no InfoSub. Fee charged " << feeChargeCount << " times.";
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getMoreTxns failed");
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " getMoreTxns failed.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
// try to search in 1024 ledgers till reaching genesis ledgers
auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
<< ", working on ledger range [" << startLedgerSeq << ","
<< lastLedgerSeq << "]";
auto haveRange = [&]() -> bool {
std::uint32_t validatedMin = UINT_MAX;
std::uint32_t validatedMax = 0;
auto haveSomeValidatedLedgers =
registry_.getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq &&
lastLedgerSeq <= validatedMax;
}();
if (!haveRange)
auto const& txns = dbResult->first;
marker = dbResult->second;
size_t num_txns = txns.size();
for (size_t i = 0; i < num_txns; ++i)
{
JLOG(m_journal.debug()) << "AccountHistory reschedule job for account "
<< toBase58(accountId) << ", incomplete ledger range ["
<< startLedgerSeq << "," << lastLedgerSeq << "]";
setAccountHistoryJobTimer(subInfo);
return;
}
auto const& [tx, meta] = txns[i];
std::optional<RelationalDatabase::AccountTxMarker> marker{};
while (!subInfo.index_->stopHistorical_)
{
auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
if (!dbResult)
if (!tx || !meta)
{
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " empty tx or meta.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger = registry_.getLedgerMaster().getLedgerBySeq(tx->getLedger());
if (!curTxLedger)
{
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getMoreTxns failed");
"getLedgerBySeq failed");
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " getMoreTxns failed.";
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
std::shared_ptr<STTx const> stTxn = tx->getSTransaction();
if (!stTxn)
{
// LCOV_EXCL_START
UNREACHABLE(
"NetworkOPsImp::addAccountHistoryJob : "
"getSTransaction failed");
JLOG(m_journal.debug())
<< "AccountHistory job for account " << toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
auto const& txns = dbResult->first;
marker = dbResult->second;
size_t num_txns = txns.size();
for (size_t i = 0; i < num_txns; ++i)
auto const mRef = std::ref(*meta);
auto const trR = meta->getResultTER();
MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
jvTx.set(jss::account_history_boundary, true);
if (isFirstTx(tx, meta))
{
auto const& [tx, meta] = txns[i];
if (!tx || !meta)
{
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " empty tx or meta.";
send(rpcError(rpcINTERNAL), true);
return;
}
auto curTxLedger =
registry_.getLedgerMaster().getLedgerBySeq(tx->getLedger());
if (!curTxLedger)
{
// LCOV_EXCL_START
UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : "
"getLedgerBySeq failed");
JLOG(m_journal.debug()) << "AccountHistory job for account "
<< toBase58(accountId) << " no ledger.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
std::shared_ptr<STTx const> stTxn = tx->getSTransaction();
if (!stTxn)
{
// LCOV_EXCL_START
UNREACHABLE(
"NetworkOPsImp::addAccountHistoryJob : "
"getSTransaction failed");
JLOG(m_journal.debug())
<< "AccountHistory job for account " << toBase58(accountId)
<< " getSTransaction failed.";
send(rpcError(rpcINTERNAL), true);
return;
// LCOV_EXCL_STOP
}
auto const mRef = std::ref(*meta);
auto const trR = meta->getResultTER();
MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
jvTx.set(jss::account_history_boundary, true);
if (isFirstTx(tx, meta))
{
jvTx.set(jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " done, found last tx.";
return;
}
jvTx.set(jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);
}
if (marker)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
}
else
{
break;
}
}
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " done, reached genesis ledger.";
JLOG(m_journal.trace()) << "AccountHistory job for account "
<< toBase58(accountId) << " done, found last tx.";
return;
}
sendMultiApiJson(jvTx, false);
}
if (marker)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
}
else
{
break;
}
}
});
if (!subInfo.index_->stopHistorical_)
{
lastLedgerSeq = startLedgerSeq - 1;
if (lastLedgerSeq <= 1)
{
JLOG(m_journal.trace())
<< "AccountHistory job for account " << toBase58(accountId)
<< " done, reached genesis ledger.";
return;
}
}
}
});
}
void