fix(telemetry): resolve merge conflicts, bashate, and rename for phase 5

Resolve merge conflicts taking phase 4 consensus span improvements,
fix bashate indentation in integration test script, and apply rename
script to Phase5 integration test docs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-04-29 11:28:54 +01:00
24 changed files with 695 additions and 540 deletions

View File

@@ -10,14 +10,17 @@
its own factory that can return the real TelemetryImpl.
*/
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/telemetry/Telemetry.h>
#ifdef XRPL_ENABLE_TELEMETRY
#include <opentelemetry/trace/noop.h>
#endif
namespace xrpl {
namespace telemetry {
#include <memory>
#include <utility>
namespace xrpl::telemetry {
namespace {
@@ -32,7 +35,7 @@ class NullTelemetry : public Telemetry
Setup const setup_;
public:
explicit NullTelemetry(Setup const& setup) : setup_(setup)
explicit NullTelemetry(Setup setup) : setup_(std::move(setup))
{
}
@@ -48,37 +51,37 @@ public:
Telemetry::setInstance(nullptr);
}
bool
[[nodiscard]] bool
isEnabled() const override
{
return false;
}
bool
[[nodiscard]] bool
shouldTraceTransactions() const override
{
return false;
}
bool
[[nodiscard]] bool
shouldTraceConsensus() const override
{
return false;
}
bool
[[nodiscard]] bool
shouldTraceRpc() const override
{
return false;
}
bool
[[nodiscard]] bool
shouldTracePeer() const override
{
return false;
}
bool
[[nodiscard]] bool
shouldTraceLedger() const override
{
return false;
@@ -131,5 +134,4 @@ make_Telemetry(Telemetry::Setup const& setup, beast::Journal)
}
#endif
} // namespace telemetry
} // namespace xrpl
} // namespace xrpl::telemetry

View File

@@ -27,6 +27,7 @@
#include <xrpl/telemetry/SpanNames.h>
#include <xrpl/telemetry/Telemetry.h>
#include <opentelemetry/common/attribute_value.h>
#include <opentelemetry/context/runtime_context.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <opentelemetry/trace/context.h>
@@ -376,12 +377,11 @@ SpanGuard::addEvent(std::string_view name, std::initializer_list<EventAttribute>
{
if (!impl_)
return;
// Own the strings to ensure lifetime safety through the AddEvent call.
std::vector<std::pair<std::string, std::string>> owned;
owned.reserve(attrs.size());
std::vector<std::pair<std::string_view, opentelemetry::common::AttributeValue>> otelAttrs;
otelAttrs.reserve(attrs.size());
for (auto const& [k, v] : attrs)
owned.emplace_back(std::string(k), std::string(v));
impl_->span->AddEvent(std::string(name), owned);
otelAttrs.emplace_back(k, opentelemetry::common::AttributeValue{v});
impl_->span->AddEvent(std::string(name), otelAttrs);
}
void

View File

@@ -7,12 +7,14 @@
See cfg/xrpld-example.cfg for the full list of available options.
*/
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/telemetry/Telemetry.h>
#include <algorithm>
#include <cstdint>
#include <string>
namespace xrpl {
namespace telemetry {
namespace xrpl::telemetry {
namespace {
@@ -81,5 +83,4 @@ setup_Telemetry(
return setup;
}
} // namespace telemetry
} // namespace xrpl
} // namespace xrpl::telemetry

View File

@@ -8,7 +8,7 @@ using namespace xrpl::telemetry;
TEST(SpanGuardFactory, null_guard_methods_are_safe)
{
auto span = SpanGuard::span("nonexistent.span");
auto span = SpanGuard::span(TraceCategory::Rpc, "rpc", "nonexistent");
EXPECT_FALSE(span);
span.setAttribute("key", "value");
@@ -30,28 +30,28 @@ TEST(SpanGuardFactory, category_span_returns_null_when_disabled)
TEST(SpanGuardFactory, child_span_null_when_no_parent)
{
auto span = SpanGuard::span("parent.test");
auto span = SpanGuard::span(TraceCategory::Rpc, "rpc", "parent");
auto child = span.childSpan("child.test");
EXPECT_FALSE(child);
}
TEST(SpanGuardFactory, linked_span_null_when_no_context)
{
auto span = SpanGuard::span("source.test");
auto span = SpanGuard::span(TraceCategory::Rpc, "rpc", "source");
auto linked = span.linkedSpan("linked.test");
EXPECT_FALSE(linked);
}
TEST(SpanGuardFactory, capture_context_returns_invalid_on_null)
{
auto span = SpanGuard::span("ctx.test");
auto span = SpanGuard::span(TraceCategory::Rpc, "rpc", "ctx");
auto ctx = span.captureContext();
EXPECT_FALSE(ctx.isValid());
}
TEST(SpanGuardFactory, move_construction_transfers_ownership)
{
auto span = SpanGuard::span("move.test");
auto span = SpanGuard::span(TraceCategory::Rpc, "rpc", "move");
auto moved = std::move(span);
EXPECT_FALSE(span);
moved.setAttribute("key", "value");

View File

@@ -1,4 +1,3 @@
#include <xrpld/app/consensus/ConsensusSpanNames.h>
#include <xrpld/app/consensus/RCLConsensus.h>
#include <xrpld/app/consensus/RCLCensorshipDetector.h>
@@ -19,6 +18,7 @@
#include <xrpld/app/misc/ValidatorKeys.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/consensus/Consensus.h>
#include <xrpld/consensus/ConsensusSpanNames.h>
#include <xrpld/consensus/ConsensusTypes.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/predicates.h>
@@ -227,7 +227,9 @@ void
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "proposal.send");
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::proposalSend);
span.setAttribute(
telemetry::cons_span::attr::round, static_cast<int64_t>(proposal.proposeSeq()));
@@ -334,7 +336,9 @@ RCLConsensus::Adaptor::onClose(
ConsensusMode mode) -> Result
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "ledger_close");
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::ledgerClose);
span.setAttribute(
telemetry::cons_span::attr::ledgerSeq,
static_cast<int64_t>(ledger.ledger_->header().seq + 1));
@@ -435,7 +439,15 @@ RCLConsensus::Adaptor::onForceAccept(
ConsensusMode const& mode,
Json::Value&& consensusJson)
{
doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(consensusJson));
auto acceptSpan = makeAcceptSpan(result);
doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson),
std::move(acceptSpan));
}
void
@@ -448,34 +460,46 @@ RCLConsensus::Adaptor::onAccept(
Json::Value&& consensusJson,
bool const validating)
{
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "accept");
span.setAttribute(
telemetry::cons_span::attr::proposers, static_cast<int64_t>(result.proposers));
span.setAttribute(
telemetry::cons_span::attr::roundTimeMs,
static_cast<int64_t>(result.roundTime.read().count()));
span.setAttribute(
telemetry::cons_span::attr::quorum, static_cast<int64_t>(result.proposers));
}
auto acceptSpan = makeAcceptSpan(result);
app_.getJobQueue().addJob(
jtACCEPT,
"AcceptLedger",
// NOLINTNEXTLINE(cppcoreguidelines-misleading-capture-default-by-value)
[=, this, cj = std::move(consensusJson)]() mutable {
[=, this, cj = std::move(consensusJson), sp = std::move(acceptSpan)]() mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and capture by reference state
// will not change until startRound is called (which happens via
// endConsensus).
RclConsensusLogger clog("onAccept", validating, j_);
this->doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(cj));
this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(cj),
std::move(sp));
this->app_.getOPs().endConsensus(clog.ss());
});
}
std::shared_ptr<telemetry::SpanGuard>
RCLConsensus::Adaptor::makeAcceptSpan(Result const& result)
{
auto span = std::make_shared<telemetry::SpanGuard>(
telemetry::SpanGuard::childSpan(telemetry::cons_span::accept, roundSpanContext_));
span->setAttribute(
telemetry::cons_span::attr::proposers, static_cast<int64_t>(result.proposers));
span->setAttribute(
telemetry::cons_span::attr::roundTimeMs,
static_cast<int64_t>(result.roundTime.read().count()));
span->setAttribute(
telemetry::cons_span::attr::quorum, static_cast<int64_t>(app_.getValidators().quorum()));
return span;
}
void
RCLConsensus::Adaptor::doAccept(
Result const& result,
@@ -483,7 +507,8 @@ RCLConsensus::Adaptor::doAccept(
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson)
Json::Value&& consensusJson,
std::shared_ptr<telemetry::SpanGuard> acceptSpan)
{
prevProposers_ = result.proposers;
prevRoundTime_ = result.roundTime.read();
@@ -511,8 +536,9 @@ RCLConsensus::Adaptor::doAccept(
closeTimeCorrect = true;
}
auto doAcceptSpan = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "accept.apply");
auto doAcceptSpan = acceptSpan
? acceptSpan->childSpan(telemetry::cons_span::acceptApply)
: telemetry::SpanGuard::childSpan(telemetry::cons_span::acceptApply, roundSpanContext_);
doAcceptSpan.setAttribute(
telemetry::cons_span::attr::ledgerSeq, static_cast<int64_t>(prevLedger.seq() + 1));
doAcceptSpan.setAttribute(
@@ -563,12 +589,18 @@ RCLConsensus::Adaptor::doAccept(
JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key();
int64_t txCount = 0;
for (auto const& item : *result.txns.map_)
{
try
{
retriableTxs.insert(std::make_shared<STTx const>(SerialIter{item.slice()}));
JLOG(j_.debug()) << " Tx: " << item.key();
++txCount;
auto const txHash = to_string(item.key());
doAcceptSpan.addEvent(
telemetry::cons_span::event::txIncluded,
{{telemetry::cons_span::attr::txId, txHash}});
}
catch (std::exception const& ex)
{
@@ -576,6 +608,7 @@ RCLConsensus::Adaptor::doAccept(
JLOG(j_.warn()) << " Tx: " << item.key() << " throws: " << ex.what();
}
}
doAcceptSpan.setAttribute(telemetry::cons_span::attr::txCount, txCount);
auto built = buildLCL(
prevLedger,
@@ -959,9 +992,11 @@ void
RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "mode_change");
span.setAttribute(telemetry::cons_span::attr::modeOld, toDisplayString(before).c_str());
span.setAttribute(telemetry::cons_span::attr::modeNew, toDisplayString(after).c_str());
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::modeChange);
span.setAttribute(telemetry::cons_span::attr::modeOld, to_string(before).c_str());
span.setAttribute(telemetry::cons_span::attr::modeNew, to_string(after).c_str());
JLOG(j_.info()) << "Consensus mode change before=" << to_string(before)
<< ", after=" << to_string(after);
@@ -1136,10 +1171,7 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
using namespace telemetry;
if (roundSpan_)
{
prevRoundContext_ = roundSpan_->captureContext();
roundSpan_.reset();
}
auto const& strategy = app_.getTelemetry().getConsensusTraceStrategy();
@@ -1154,7 +1186,8 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
}
else
{
roundSpan_.emplace(SpanGuard::span(TraceCategory::Consensus, seg::consensus, "round"));
roundSpan_.emplace(
SpanGuard::span(TraceCategory::Consensus, seg::consensus, cons_span::op::round));
}
if (!*roundSpan_)

View File

@@ -79,13 +79,6 @@ class RCLConsensus
*/
std::optional<telemetry::SpanGuard> roundSpan_;
/** Context captured from the previous consensus round.
*
* Used to create span links (follows-from) between consecutive
* rounds, establishing a causal chain in the trace backend.
*/
telemetry::SpanContext prevRoundContext_;
/** SpanContext snapshot of the current round span.
*
* Captured in startRoundTracing() as a lightweight value-type copy
@@ -374,8 +367,17 @@ class RCLConsensus
void
notify(protocol::NodeEvent ne, RCLCxLedger const& ledger, bool haveCorrectLCL);
/** Create a consensus.accept span as a child of the round span.
Returned via shared_ptr so it can be captured into the
jtACCEPT lambda and live until doAccept completes.
*/
std::shared_ptr<telemetry::SpanGuard>
makeAcceptSpan(Result const& result);
/** Accept a new ledger based on the given transactions.
@param acceptSpan Parent span created by makeAcceptSpan();
accept.apply is created as its child.
@ref onAccept
*/
void
@@ -385,7 +387,8 @@ class RCLConsensus
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson);
Json::Value&& consensusJson,
std::shared_ptr<telemetry::SpanGuard> acceptSpan);
/** Build the new last closed ledger.

View File

@@ -82,6 +82,7 @@
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/tokens.h>
#include <xrpl/rdb/DatabaseCon.h>
#include <xrpl/resource/Charge.h>
#include <xrpl/resource/Consumer.h>

View File

@@ -172,9 +172,16 @@ class NetworkOPsImp final : public NetworkOPs
FailHard const failType;
bool applied = false;
TER result;
/// Keeps the tx.process span alive until the batch processes this entry.
std::shared_ptr<telemetry::SpanGuard> span;
TransactionStatus(std::shared_ptr<Transaction> t, bool a, bool l, FailHard f)
: transaction(std::move(t)), admin(a), local(l), failType(f)
TransactionStatus(
std::shared_ptr<Transaction> t,
bool a,
bool l,
FailHard f,
std::shared_ptr<telemetry::SpanGuard> s = nullptr)
: transaction(std::move(t)), admin(a), local(l), failType(f), span(std::move(s))
{
XRPL_ASSERT(
local || failType == FailHard::no,
@@ -397,7 +404,8 @@ public:
doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failtype);
FailHard failtype,
std::shared_ptr<telemetry::SpanGuard> span = nullptr);
private:
bool
@@ -1315,9 +1323,9 @@ NetworkOPsImp::processTransaction(
FailHard failType)
{
using namespace telemetry;
auto span = txProcessSpan(transaction->getID());
span.setAttribute(tx_span::attr::hash, to_string(transaction->getID()).c_str());
span.setAttribute(tx_span::attr::local, bLocal);
auto span = std::make_shared<SpanGuard>(txProcessSpan(transaction->getID()));
span->setAttribute(tx_span::attr::hash, to_string(transaction->getID()).c_str());
span->setAttribute(tx_span::attr::local, bLocal);
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
@@ -1327,13 +1335,13 @@ NetworkOPsImp::processTransaction(
if (bLocal)
{
span.setAttribute(tx_span::attr::path, tx_span::val::sync);
span->setAttribute(tx_span::attr::path, tx_span::val::sync);
doTransactionSync(transaction, bUnlimited, failType);
}
else
{
span.setAttribute(tx_span::attr::path, tx_span::val::async);
doTransactionAsync(transaction, bUnlimited, failType);
span->setAttribute(tx_span::attr::path, tx_span::val::async);
doTransactionAsync(transaction, bUnlimited, failType, std::move(span));
}
}
@@ -1341,14 +1349,15 @@ void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
FailHard failType,
std::shared_ptr<telemetry::SpanGuard> span)
{
std::lock_guard const lock(mMutex);
if (transaction->getApplying())
return;
mTransactions.emplace_back(transaction, bUnlimited, false, failType);
mTransactions.emplace_back(transaction, bUnlimited, false, failType, std::move(span));
transaction->setApplying();
if (mDispatchState == DispatchState::none)

View File

@@ -1,8 +1,8 @@
#include <xrpld/app/misc/TxQ.h>
#include <xrpld/app/misc/detail/TxQSpanNames.h>
#include <xrpld/app/ledger/OpenLedger.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/detail/TxQSpanNames.h>
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/basics/Log.h>

View File

@@ -1,8 +1,8 @@
#pragma once
#include <xrpld/app/consensus/ConsensusSpanNames.h>
#include <xrpld/consensus/ConsensusParms.h>
#include <xrpld/consensus/ConsensusProposal.h>
#include <xrpld/consensus/ConsensusSpanNames.h>
#include <xrpld/consensus/ConsensusTypes.h>
#include <xrpld/consensus/DisputedTx.h>
@@ -609,6 +609,11 @@ private:
*/
std::optional<xrpl::telemetry::SpanGuard> establishSpan_;
/** Span for the open phase of consensus.
* Created in startRoundInternal(); cleared (ended) in closeLedger().
*/
std::optional<xrpl::telemetry::SpanGuard> openSpan_;
/** Create the establish-phase span if not yet active.
* Called on each phaseEstablish() invocation; no-op while span is live.
*/
@@ -695,6 +700,11 @@ Consensus<Adaptor>::startRoundInternal(
CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::open, "
"previous ledgerID: "
<< prevLedgerID << ", seq: " << prevLedger.seq() << ". ";
openSpan_.emplace(
telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::phaseOpen));
mode_.set(mode, adaptor_);
now_ = now;
prevLedgerID_ = prevLedgerID;
@@ -1420,6 +1430,7 @@ Consensus<Adaptor>::closeLedger(std::unique_ptr<std::stringstream> const& clog)
// We should not be closing if we already have a position
XRPL_ASSERT(!result_, "xrpl::Consensus::closeLedger : result is not set");
openSpan_.reset();
phase_ = ConsensusPhase::establish;
JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
rawCloseTimes_.self = now_;
@@ -1477,9 +1488,12 @@ Consensus<Adaptor>::updateOurPositions(std::unique_ptr<std::stringstream> const&
XRPL_ASSERT(result_, "xrpl::Consensus::updateOurPositions : result is set");
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, "update_positions");
auto span =
SpanGuard::span(TraceCategory::Consensus, seg::consensus, cons_span::op::updatePositions);
span.setAttribute(cons_span::attr::convergePercent, static_cast<int64_t>(convergePercent_));
span.setAttribute(cons_span::attr::proposers, static_cast<int64_t>(currPeerPositions_.size()));
span.setAttribute(
cons_span::attr::disputesCount, static_cast<int64_t>(result_->disputes.size()));
ConsensusParms const& parms = adaptor_.parms();
// Compute a cutoff time
@@ -1540,10 +1554,14 @@ Consensus<Adaptor>::updateOurPositions(std::unique_ptr<std::stringstream> const&
mutableSet->erase(txId);
}
auto const yaysStr = std::to_string(dispute.getYays());
auto const naysStr = std::to_string(dispute.getNays());
span.addEvent(
"dispute.resolve",
cons_span::event::disputeResolve,
{{cons_span::attr::txId, to_string(txId)},
{cons_span::attr::disputeOurVote, dispute.getOurVote() ? "yes" : "no"}});
{cons_span::attr::disputeOurVote, dispute.getOurVote() ? "yes" : "no"},
{cons_span::attr::disputeYays, yaysStr},
{cons_span::attr::disputeNays, naysStr}});
}
}
@@ -1568,6 +1586,7 @@ Consensus<Adaptor>::updateOurPositions(std::unique_ptr<std::stringstream> const&
if (newState)
closeTimeAvalancheState_ = *newState;
CLOG(clog) << "neededWeight " << neededWeight << ". ";
span.setAttribute(cons_span::attr::avalancheThreshold, static_cast<int64_t>(neededWeight));
int participants = currPeerPositions_.size();
if (mode_.get() == ConsensusMode::proposing)
@@ -1672,7 +1691,7 @@ Consensus<Adaptor>::haveConsensus(std::unique_ptr<std::stringstream> const& clog
XRPL_ASSERT(result_, "xrpl::Consensus::haveConsensus : has result");
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, "check");
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, cons_span::op::check);
// CHECKME: should possibly count unacquired TX sets as disagreeing
int agree = 0, disagree = 0;
@@ -1785,6 +1804,8 @@ Consensus<Adaptor>::haveConsensus(std::unique_ptr<std::stringstream> const& clog
stateStr = "yes";
else if (result_->state == ConsensusState::MovedOn)
stateStr = "moved_on";
else if (result_->state == ConsensusState::Expired)
stateStr = "expired";
span.setAttribute(cons_span::attr::result, stateStr);
CLOG(clog) << "Consensus has been reached. ";
@@ -1916,7 +1937,9 @@ Consensus<Adaptor>::startEstablishTracing()
return;
establishSpan_.emplace(
telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "establish"));
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::establish));
}
template <class Adaptor>

View File

@@ -2,22 +2,78 @@
/** Compile-time span name constants for consensus tracing.
*
* Used by RCLConsensus (app) and Consensus.h (template) for
* consensus lifecycle spans. Built on StaticStr/join() from SpanNames.h.
* Used by RCLConsensus (app), Consensus.h (template), and PeerImp
* (overlay) for consensus lifecycle spans.
* Built on StaticStr/join() from SpanNames.h.
*
* Span hierarchy:
* ## Span Hierarchy
*
* consensus.round (deterministic trace_id from ledger hash)
* Root span created in Adaptor::startRoundTracing(). In "deterministic"
* strategy the trace-id is derived from the previous ledger hash so all
* nodes tracing the same round share a trace.
*
* consensus.round [main thread, root]
* | Created: Adaptor::startRoundTracing()
* | Attrs: ledger_id, ledger.seq, mode, trace_strategy, round_id
* |
* +-- consensus.proposal.send
* +-- consensus.ledger_close
* +-- consensus.establish
* +-- consensus.update_positions
* +-- consensus.check
* +-- consensus.accept
* +-- consensus.accept.apply (jtACCEPT thread)
* +-- consensus.validation.send (jtACCEPT thread, linked)
* +-- consensus.mode_change
* +-- consensus.phase.open [main thread, child]
* | Created: Consensus::startRoundInternal()
* | Ended: Consensus::closeLedger()
* |
* +-- consensus.proposal.send [main thread]
* | Created: Adaptor::propose()
* | Attrs: round (proposeSeq)
* |
* +-- consensus.ledger_close [main thread]
* | Created: Adaptor::onClose()
* | Attrs: ledger.seq, mode
* |
* +-- consensus.establish [main thread, child]
* | Created: Consensus::startEstablishTracing()
* | Ended: Consensus::phaseEstablish() on accept
* | Attrs: converge_percent, establish_count, proposers
* |
* +-- consensus.update_positions [main thread]
* | Created: Consensus::updateOurPositions()
* | Attrs: converge_percent, proposers, disputes_count
* | Events: per-dispute vote details (tx_id, our_vote, yays, nays)
* |
* +-- consensus.check [main thread]
* | Created: Consensus::haveConsensus()
* | Attrs: agree/disagree counts, threshold_percent, result
* |
* +-- consensus.accept [main thread, child of round]
* | Created: Adaptor::makeAcceptSpan(), shared_ptr kept alive
* | until doAccept() completes on jtACCEPT thread
* | Attrs: proposers, round_time_ms, quorum
* | |
* | +-- consensus.accept.apply [jtACCEPT thread, child of accept]
* | Created: Adaptor::doAccept()
* | Attrs: ledger.seq, close_time, close_time_correct,
* | close_resolution_ms, state, proposing, round_time_ms,
* | parent_close_time, close_time_self, close_time_vote_bins,
* | resolution_direction, tx_count
* | Events: tx.included (per tx)
* |
* +~~~ consensus.validation.send [jtACCEPT thread, linked]
* | Created: Adaptor::createValidationSpan() (follows-from link)
* | Attrs: ledger.seq, proposing
* |
* +-- consensus.mode_change [main thread]
* Created: Adaptor::onModeChange()
* Attrs: mode.old, mode.new
*
* Standalone spans (no parent, created per-message in overlay):
*
* consensus.proposal.receive [PeerImp I/O thread]
* Created: PeerImp::onMessage(TMProposeSet)
*
* consensus.validation.receive [PeerImp I/O thread]
* Created: PeerImp::onMessage(TMValidation)
*
* Legend:
* +-- child-of relationship (same trace)
* +~~~ follows-from link (separate sub-tree, causal link)
*/
#include <xrpl/telemetry/SpanNames.h>
@@ -28,17 +84,27 @@ namespace cons_span {
// ===== Span name segments ====================================================
namespace part {
inline constexpr auto proposal = makeStr("proposal");
inline constexpr auto validation = makeStr("validation");
inline constexpr auto accept = makeStr("accept");
inline constexpr auto phase = makeStr("phase");
} // namespace part
namespace op {
inline constexpr auto round = makeStr("round");
inline constexpr auto proposalSend = makeStr("proposal.send");
inline constexpr auto proposalSend = join(part::proposal, makeStr("send"));
inline constexpr auto ledgerClose = makeStr("ledger_close");
inline constexpr auto establish = makeStr("establish");
inline constexpr auto updatePositions = makeStr("update_positions");
inline constexpr auto check = makeStr("check");
inline constexpr auto accept = makeStr("accept");
inline constexpr auto acceptApply = makeStr("accept.apply");
inline constexpr auto validationSend = makeStr("validation.send");
inline constexpr auto acceptApply = join(part::accept, makeStr("apply"));
inline constexpr auto validationSend = join(part::validation, makeStr("send"));
inline constexpr auto modeChange = makeStr("mode_change");
inline constexpr auto proposalReceive = join(part::proposal, makeStr("receive"));
inline constexpr auto validationReceive = join(part::validation, makeStr("receive"));
inline constexpr auto phaseOpen = join(part::phase, makeStr("open"));
} // namespace op
// ===== Full span names (prefix.op) ===========================================
@@ -53,6 +119,9 @@ inline constexpr auto accept = join(seg::consensus, op::accept);
inline constexpr auto acceptApply = join(seg::consensus, op::acceptApply);
inline constexpr auto validationSend = join(seg::consensus, op::validationSend);
inline constexpr auto modeChange = join(seg::consensus, op::modeChange);
inline constexpr auto proposalReceive = join(seg::consensus, op::proposalReceive);
inline constexpr auto validationReceive = join(seg::consensus, op::validationReceive);
inline constexpr auto phaseOpen = join(seg::consensus, op::phaseOpen);
// ===== Attribute keys ========================================================
@@ -62,7 +131,7 @@ inline constexpr auto xrplConsensus = join(seg::xrpl, seg::consensus);
/// "xrpl.consensus.ledger_id"
inline constexpr auto ledgerId = join(xrplConsensus, makeStr("ledger_id"));
/// "xrpl.consensus.ledger.seq"
inline constexpr auto ledgerSeq = join(xrplConsensus, makeStr("ledger.seq"));
inline constexpr auto ledgerSeq = join(join(xrplConsensus, makeStr("ledger")), makeStr("seq"));
/// "xrpl.consensus.mode"
inline constexpr auto mode = join(xrplConsensus, makeStr("mode"));
/// "xrpl.consensus.round"
@@ -97,9 +166,6 @@ inline constexpr auto resolutionDirection = join(xrplConsensus, makeStr("resolut
inline constexpr auto convergePercent = join(xrplConsensus, makeStr("converge_percent"));
/// "xrpl.consensus.establish_count"
inline constexpr auto establishCount = join(xrplConsensus, makeStr("establish_count"));
/// "xrpl.consensus.proposers_agreed"
inline constexpr auto proposersAgreed = join(xrplConsensus, makeStr("proposers_agreed"));
// Avalanche threshold attributes
/// "xrpl.consensus.avalanche_threshold"
inline constexpr auto avalancheThreshold = join(xrplConsensus, makeStr("avalanche_threshold"));
@@ -120,8 +186,6 @@ inline constexpr auto thresholdPercent = join(xrplConsensus, makeStr("threshold_
inline constexpr auto result = join(xrplConsensus, makeStr("result"));
/// "xrpl.consensus.quorum"
inline constexpr auto quorum = join(xrplConsensus, makeStr("quorum"));
/// "xrpl.consensus.validation_count"
inline constexpr auto validationCount = join(xrplConsensus, makeStr("validation_count"));
// Trace strategy attribute
/// "xrpl.consensus.trace_strategy"
@@ -131,9 +195,9 @@ inline constexpr auto roundId = join(xrplConsensus, makeStr("round_id"));
// Mode change attributes
/// "xrpl.consensus.mode.old"
inline constexpr auto modeOld = join(xrplConsensus, makeStr("mode.old"));
inline constexpr auto modeOld = join(join(xrplConsensus, makeStr("mode")), makeStr("old"));
/// "xrpl.consensus.mode.new"
inline constexpr auto modeNew = join(xrplConsensus, makeStr("mode.new"));
inline constexpr auto modeNew = join(join(xrplConsensus, makeStr("mode")), makeStr("new"));
// Dispute event attributes
/// "xrpl.tx.id"
@@ -145,8 +209,24 @@ inline constexpr auto disputeOurVote =
inline constexpr auto disputeYays = join(join(seg::xrpl, makeStr("dispute")), makeStr("yays"));
/// "xrpl.dispute.nays"
inline constexpr auto disputeNays = join(join(seg::xrpl, makeStr("dispute")), makeStr("nays"));
/// "xrpl.consensus.tx_count"
inline constexpr auto txCount = join(xrplConsensus, makeStr("tx_count"));
/// "xrpl.consensus.disputes_count"
inline constexpr auto disputesCount = join(xrplConsensus, makeStr("disputes_count"));
/// "xrpl.consensus.trusted"
inline constexpr auto trusted = join(xrplConsensus, makeStr("trusted"));
} // namespace attr
// ===== Event names ===========================================================
namespace event {
/// "dispute.resolve"
inline constexpr auto disputeResolve = join(makeStr("dispute"), makeStr("resolve"));
/// "tx.included"
inline constexpr auto txIncluded = join(makeStr("tx"), makeStr("included"));
} // namespace event
// ===== Attribute values ======================================================
namespace val {

View File

@@ -9,6 +9,7 @@
#include <xrpld/app/misc/Transaction.h>
#include <xrpld/app/misc/TxSpanNames.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/consensus/ConsensusSpanNames.h>
#include <xrpld/consensus/Validations.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/ClusterNode.h>
@@ -1442,11 +1443,11 @@ PeerImp::handleTransaction(
uint256 const txID = stx->getTransactionID();
using namespace telemetry;
auto span = txReceiveSpan(txID, *m);
span.setAttribute(tx_span::attr::hash, to_string(txID).c_str());
span.setAttribute(tx_span::attr::peerId, static_cast<int64_t>(id_));
auto span = std::make_shared<SpanGuard>(txReceiveSpan(txID, *m));
span->setAttribute(tx_span::attr::hash, to_string(txID).c_str());
span->setAttribute(tx_span::attr::peerId, static_cast<int64_t>(id_));
if (auto const version = getVersion(); !version.empty())
span.setAttribute(tx_span::attr::peerVersion, version.c_str());
span->setAttribute(tx_span::attr::peerVersion, version.c_str());
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
@@ -1480,11 +1481,11 @@ PeerImp::handleTransaction(
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
{
span.setAttribute(tx_span::attr::suppressed, true);
span->setAttribute(tx_span::attr::suppressed, true);
// we have seen this transaction recently
if (any(flags & HashRouterFlags::BAD))
{
span.setAttribute(tx_span::attr::status, tx_span::val::knownBad);
span->setAttribute(tx_span::attr::status, tx_span::val::knownBad);
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}
@@ -1542,7 +1543,8 @@ PeerImp::handleTransaction(
flags,
checkSignature,
batch,
stx]() {
stx,
sp = std::move(span)]() {
if (auto peer = weak.lock())
peer->checkTransaction(flags, checkSignature, stx, batch);
});
@@ -1957,9 +1959,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
app_.getTimeKeeper().closeTime(),
calcNodeID(app_.getValidatorManifests().getMasterKey(publicKey))});
auto span = std::make_shared<telemetry::SpanGuard>(telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::proposalReceive));
span->setAttribute(telemetry::cons_span::attr::trusted, isTrusted);
span->setAttribute(telemetry::cons_span::attr::round, static_cast<int64_t>(set.proposeseq()));
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "checkPropose", [weak, isTrusted, m, proposal]() {
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"checkPropose",
[weak, isTrusted, m, proposal, sp = std::move(span)]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
});
@@ -2534,6 +2545,18 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
auto span = std::make_shared<telemetry::SpanGuard>(telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::validationReceive));
span->setAttribute(telemetry::cons_span::attr::trusted, isTrusted);
if (val->isFieldPresent(sfLedgerSequence))
{
span->setAttribute(
telemetry::cons_span::attr::ledgerSeq,
static_cast<int64_t>(val->getFieldU32(sfLedgerSequence)));
}
if (!isTrusted && (tracking_.load() == Tracking::diverged))
{
JLOG(p_journal_.debug()) << "Dropping untrusted validation from diverged peer";
@@ -2544,7 +2567,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, name, [weak, val, m, key]() {
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
name,
[weak, val, m, key, sp = std::move(span)]() {
if (auto peer = weak.lock())
peer->checkValidation(val, key, m);
});

View File

@@ -37,6 +37,7 @@ inline constexpr auto command = join(seg::rpc, makeStr("command"));
namespace op {
inline constexpr auto wsMessage = makeStr("ws_message");
inline constexpr auto wsUpgrade = makeStr("ws_upgrade");
inline constexpr auto httpRequest = makeStr("http_request");
inline constexpr auto process = makeStr("process");
} // namespace op
@@ -65,6 +66,7 @@ using telemetry::attr_val::error;
using telemetry::attr_val::success;
inline constexpr auto admin = makeStr("admin");
inline constexpr auto user = makeStr("user");
inline constexpr auto unknownCommand = makeStr("unknown_command");
} // namespace val
} // namespace rpc_span