mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
fix: extend tx span lifetimes across async job boundaries
- tx.receive span in PeerImp: convert to shared_ptr, capture in checkTransaction lambda so it measures actual processing, not just message parsing - tx.process span in NetworkOPs: convert to shared_ptr, store in TransactionStatus so it lives until the batch job processes the entry; sync path unchanged (span destructs on function return) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -172,9 +172,16 @@ class NetworkOPsImp final : public NetworkOPs
|
||||
FailHard const failType;
|
||||
bool applied = false;
|
||||
TER result;
|
||||
/// Keeps the tx.process span alive until the batch processes this entry.
|
||||
std::shared_ptr<telemetry::SpanGuard> span;
|
||||
|
||||
TransactionStatus(std::shared_ptr<Transaction> t, bool a, bool l, FailHard f)
|
||||
: transaction(std::move(t)), admin(a), local(l), failType(f)
|
||||
TransactionStatus(
|
||||
std::shared_ptr<Transaction> t,
|
||||
bool a,
|
||||
bool l,
|
||||
FailHard f,
|
||||
std::shared_ptr<telemetry::SpanGuard> s = nullptr)
|
||||
: transaction(std::move(t)), admin(a), local(l), failType(f), span(std::move(s))
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
local || failType == FailHard::no,
|
||||
@@ -397,7 +404,8 @@ public:
|
||||
doTransactionAsync(
|
||||
std::shared_ptr<Transaction> transaction,
|
||||
bool bUnlimited,
|
||||
FailHard failtype);
|
||||
FailHard failtype,
|
||||
std::shared_ptr<telemetry::SpanGuard> span = nullptr);
|
||||
|
||||
private:
|
||||
bool
|
||||
@@ -1315,9 +1323,9 @@ NetworkOPsImp::processTransaction(
|
||||
FailHard failType)
|
||||
{
|
||||
using namespace telemetry;
|
||||
auto span = txProcessSpan(transaction->getID());
|
||||
span.setAttribute(tx_span::attr::hash, to_string(transaction->getID()).c_str());
|
||||
span.setAttribute(tx_span::attr::local, bLocal);
|
||||
auto span = std::make_shared<SpanGuard>(txProcessSpan(transaction->getID()));
|
||||
span->setAttribute(tx_span::attr::hash, to_string(transaction->getID()).c_str());
|
||||
span->setAttribute(tx_span::attr::local, bLocal);
|
||||
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
|
||||
|
||||
@@ -1327,13 +1335,13 @@ NetworkOPsImp::processTransaction(
|
||||
|
||||
if (bLocal)
|
||||
{
|
||||
span.setAttribute(tx_span::attr::path, tx_span::val::sync);
|
||||
span->setAttribute(tx_span::attr::path, tx_span::val::sync);
|
||||
doTransactionSync(transaction, bUnlimited, failType);
|
||||
}
|
||||
else
|
||||
{
|
||||
span.setAttribute(tx_span::attr::path, tx_span::val::async);
|
||||
doTransactionAsync(transaction, bUnlimited, failType);
|
||||
span->setAttribute(tx_span::attr::path, tx_span::val::async);
|
||||
doTransactionAsync(transaction, bUnlimited, failType, std::move(span));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1341,14 +1349,15 @@ void
|
||||
NetworkOPsImp::doTransactionAsync(
|
||||
std::shared_ptr<Transaction> transaction,
|
||||
bool bUnlimited,
|
||||
FailHard failType)
|
||||
FailHard failType,
|
||||
std::shared_ptr<telemetry::SpanGuard> span)
|
||||
{
|
||||
std::lock_guard const lock(mMutex);
|
||||
|
||||
if (transaction->getApplying())
|
||||
return;
|
||||
|
||||
mTransactions.emplace_back(transaction, bUnlimited, false, failType);
|
||||
mTransactions.emplace_back(transaction, bUnlimited, false, failType, std::move(span));
|
||||
transaction->setApplying();
|
||||
|
||||
if (mDispatchState == DispatchState::none)
|
||||
|
||||
@@ -1442,11 +1442,11 @@ PeerImp::handleTransaction(
|
||||
uint256 const txID = stx->getTransactionID();
|
||||
|
||||
using namespace telemetry;
|
||||
auto span = txReceiveSpan(txID, *m);
|
||||
span.setAttribute(tx_span::attr::hash, to_string(txID).c_str());
|
||||
span.setAttribute(tx_span::attr::peerId, static_cast<int64_t>(id_));
|
||||
auto span = std::make_shared<SpanGuard>(txReceiveSpan(txID, *m));
|
||||
span->setAttribute(tx_span::attr::hash, to_string(txID).c_str());
|
||||
span->setAttribute(tx_span::attr::peerId, static_cast<int64_t>(id_));
|
||||
if (auto const version = getVersion(); !version.empty())
|
||||
span.setAttribute(tx_span::attr::peerVersion, version.c_str());
|
||||
span->setAttribute(tx_span::attr::peerVersion, version.c_str());
|
||||
|
||||
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
|
||||
// LCOV_EXCL_START
|
||||
@@ -1480,11 +1480,11 @@ PeerImp::handleTransaction(
|
||||
|
||||
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
|
||||
{
|
||||
span.setAttribute(tx_span::attr::suppressed, true);
|
||||
span->setAttribute(tx_span::attr::suppressed, true);
|
||||
// we have seen this transaction recently
|
||||
if (any(flags & HashRouterFlags::BAD))
|
||||
{
|
||||
span.setAttribute(tx_span::attr::status, tx_span::val::knownBad);
|
||||
span->setAttribute(tx_span::attr::status, tx_span::val::knownBad);
|
||||
fee_.update(Resource::feeUselessData, "known bad");
|
||||
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
|
||||
}
|
||||
@@ -1542,7 +1542,8 @@ PeerImp::handleTransaction(
|
||||
flags,
|
||||
checkSignature,
|
||||
batch,
|
||||
stx]() {
|
||||
stx,
|
||||
sp = std::move(span)]() {
|
||||
if (auto peer = weak.lock())
|
||||
peer->checkTransaction(flags, checkSignature, stx, batch);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user