code review changes

Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
This commit is contained in:
Pratik Mankawde
2026-04-28 17:03:49 +01:00
parent 2773de7b54
commit ac68091bec
5 changed files with 155 additions and 65 deletions

View File

@@ -27,6 +27,7 @@
#include <xrpl/telemetry/SpanNames.h>
#include <xrpl/telemetry/Telemetry.h>
#include <opentelemetry/common/attribute_value.h>
#include <opentelemetry/context/runtime_context.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <opentelemetry/trace/context.h>
@@ -396,12 +397,11 @@ SpanGuard::addEvent(std::string_view name, std::initializer_list<EventAttribute>
{
if (!impl_)
return;
// Own the strings to ensure lifetime safety through the AddEvent call.
std::vector<std::pair<std::string, std::string>> owned;
owned.reserve(attrs.size());
std::vector<std::pair<std::string_view, opentelemetry::common::AttributeValue>> otelAttrs;
otelAttrs.reserve(attrs.size());
for (auto const& [k, v] : attrs)
owned.emplace_back(std::string(k), std::string(v));
impl_->span->AddEvent(std::string(name), owned);
otelAttrs.emplace_back(k, opentelemetry::common::AttributeValue{v});
impl_->span->AddEvent(std::string(name), otelAttrs);
}
void

View File

@@ -2,26 +2,78 @@
/** Compile-time span name constants for consensus tracing.
*
* Used by RCLConsensus (app) and Consensus.h (template) for
* consensus lifecycle spans. Built on StaticStr/join() from SpanNames.h.
* Used by RCLConsensus (app), Consensus.h (template), and PeerImp
* (overlay) for consensus lifecycle spans.
* Built on StaticStr/join() from SpanNames.h.
*
* Span hierarchy:
* ## Span Hierarchy
*
* consensus.round (deterministic trace_id from ledger hash)
* Root span created in Adaptor::startRoundTracing(). In "deterministic"
* strategy the trace-id is derived from the previous ledger hash so all
* nodes tracing the same round share a trace.
*
* consensus.round [main thread, root]
* | Created: Adaptor::startRoundTracing()
* | Attrs: ledger_id, ledger.seq, mode, trace_strategy, round_id
* |
* +-- consensus.phase.open
* +-- consensus.proposal.send
* +-- consensus.ledger_close
* +-- consensus.establish
* +-- consensus.update_positions
* +-- consensus.check
* +-- consensus.accept
* +-- consensus.accept.apply (jtACCEPT thread)
* +-- consensus.validation.send (jtACCEPT thread, linked)
* +-- consensus.mode_change
* +-- consensus.phase.open [main thread, child]
* | Created: Consensus::startRoundInternal()
* | Ended: Consensus::closeLedger()
* |
* +-- consensus.proposal.send [main thread]
* | Created: Adaptor::propose()
* | Attrs: round (proposeSeq)
* |
* +-- consensus.ledger_close [main thread]
* | Created: Adaptor::onClose()
* | Attrs: ledger.seq, mode
* |
* +-- consensus.establish [main thread, child]
* | Created: Consensus::startEstablishTracing()
* | Ended: Consensus::phaseEstablish() on accept
* | Attrs: converge_percent, tx_count, disputes_count
* |
* +-- consensus.update_positions [main thread]
* | Created: Consensus::updateOurPositions()
* | Attrs: converge_percent, proposers, disputes_count
* | Events: per-dispute vote details (tx_id, our_vote, yays, nays)
* |
* +-- consensus.check [main thread]
* | Created: Consensus::haveConsensus()
* | Attrs: agree/disagree counts, threshold_percent, result
* |
* +-- consensus.accept [main thread, child of round]
* | Created: Adaptor::makeAcceptSpan(), shared_ptr kept alive
* | until doAccept() completes on jtACCEPT thread
* | Attrs: proposers, round_time_ms, quorum
* | |
* | +-- consensus.accept.apply [jtACCEPT thread, child of accept]
* | Created: Adaptor::doAccept()
* | Attrs: ledger.seq, close_time, close_time_correct,
* | close_resolution_ms, state, proposing, round_time_ms,
* | parent_close_time, close_time_self, close_time_vote_bins,
* | resolution_direction, tx_count
* | Events: tx.included (per tx)
* |
* +~~~ consensus.validation.send [jtACCEPT thread, linked]
* | Created: Adaptor::createValidationSpan() (follows-from link)
* | Attrs: ledger.seq, proposing
* |
* +-- consensus.mode_change [main thread]
* Created: Adaptor::onModeChange()
* Attrs: mode.old, mode.new
*
* consensus.proposal.receive (standalone, PeerImp)
* consensus.validation.receive (standalone, PeerImp)
* Standalone spans (no parent, created per-message in overlay):
*
* consensus.proposal.receive [PeerImp I/O thread]
* Created: PeerImp::onMessage(TMProposeSet)
*
* consensus.validation.receive [PeerImp I/O thread]
* Created: PeerImp::onMessage(TMValidation)
*
* Legend:
* +-- child-of relationship (same trace)
* +~~~ follows-from link (separate sub-tree, causal link)
*/
#include <xrpl/telemetry/SpanNames.h>
@@ -32,20 +84,27 @@ namespace cons_span {
// ===== Span name segments ====================================================
/// Leading name segments shared by several operation names. The `op`
/// constants pass these to join() together with a suffix segment
/// (for example part::proposal with makeStr("send")).
namespace part {
/// "proposal"
inline constexpr auto proposal = makeStr("proposal");
/// "validation"
inline constexpr auto validation = makeStr("validation");
/// "accept"
inline constexpr auto accept = makeStr("accept");
/// "phase"
inline constexpr auto phase = makeStr("phase");
} // namespace part
namespace op {
inline constexpr auto round = makeStr("round");
inline constexpr auto proposalSend = makeStr("proposal.send");
inline constexpr auto proposalSend = join(part::proposal, makeStr("send"));
inline constexpr auto ledgerClose = makeStr("ledger_close");
inline constexpr auto establish = makeStr("establish");
inline constexpr auto updatePositions = makeStr("update_positions");
inline constexpr auto check = makeStr("check");
inline constexpr auto accept = makeStr("accept");
inline constexpr auto acceptApply = makeStr("accept.apply");
inline constexpr auto validationSend = makeStr("validation.send");
inline constexpr auto acceptApply = join(part::accept, makeStr("apply"));
inline constexpr auto validationSend = join(part::validation, makeStr("send"));
inline constexpr auto modeChange = makeStr("mode_change");
inline constexpr auto proposalReceive = makeStr("proposal.receive");
inline constexpr auto validationReceive = makeStr("validation.receive");
inline constexpr auto phaseOpen = makeStr("phase.open");
inline constexpr auto proposalReceive = join(part::proposal, makeStr("receive"));
inline constexpr auto validationReceive = join(part::validation, makeStr("receive"));
inline constexpr auto phaseOpen = join(part::phase, makeStr("open"));
} // namespace op
// ===== Full span names (prefix.op) ===========================================
@@ -72,7 +131,7 @@ inline constexpr auto xrplConsensus = join(seg::xrpl, seg::consensus);
/// "xrpl.consensus.ledger_id"
inline constexpr auto ledgerId = join(xrplConsensus, makeStr("ledger_id"));
/// "xrpl.consensus.ledger.seq"
inline constexpr auto ledgerSeq = join(xrplConsensus, makeStr("ledger.seq"));
inline constexpr auto ledgerSeq = join(join(xrplConsensus, makeStr("ledger")), makeStr("seq"));
/// "xrpl.consensus.mode"
inline constexpr auto mode = join(xrplConsensus, makeStr("mode"));
/// "xrpl.consensus.round"
@@ -141,9 +200,9 @@ inline constexpr auto roundId = join(xrplConsensus, makeStr("round_id"));
// Mode change attributes
/// "xrpl.consensus.mode.old"
inline constexpr auto modeOld = join(xrplConsensus, makeStr("mode.old"));
inline constexpr auto modeOld = join(join(xrplConsensus, makeStr("mode")), makeStr("old"));
/// "xrpl.consensus.mode.new"
inline constexpr auto modeNew = join(xrplConsensus, makeStr("mode.new"));
inline constexpr auto modeNew = join(join(xrplConsensus, makeStr("mode")), makeStr("new"));
// Dispute event attributes
/// "xrpl.tx.id"

View File

@@ -232,7 +232,9 @@ void
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "proposal.send");
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::proposalSend);
span.setAttribute(
telemetry::cons_span::attr::round, static_cast<int64_t>(proposal.proposeSeq()));
@@ -349,7 +351,9 @@ RCLConsensus::Adaptor::onClose(
ConsensusMode mode) -> Result
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "ledger_close");
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::ledgerClose);
span.setAttribute(
telemetry::cons_span::attr::ledgerSeq,
static_cast<int64_t>(ledger.ledger_->header().seq + 1));
@@ -450,7 +454,15 @@ RCLConsensus::Adaptor::onForceAccept(
ConsensusMode const& mode,
Json::Value&& consensusJson)
{
doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(consensusJson));
auto acceptSpan = makeAcceptSpan(result);
doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson),
std::move(acceptSpan));
}
void
@@ -463,34 +475,45 @@ RCLConsensus::Adaptor::onAccept(
Json::Value&& consensusJson,
bool const validating)
{
{
auto span =
telemetry::SpanGuard::childSpan(telemetry::cons_span::accept, roundSpanContext_);
span.setAttribute(
telemetry::cons_span::attr::proposers, static_cast<int64_t>(result.proposers));
span.setAttribute(
telemetry::cons_span::attr::roundTimeMs,
static_cast<int64_t>(result.roundTime.read().count()));
span.setAttribute(
telemetry::cons_span::attr::quorum, static_cast<int64_t>(result.proposers));
}
auto acceptSpan = makeAcceptSpan(result);
app_.getJobQueue().addJob(
jtACCEPT,
"AcceptLedger",
// NOLINTNEXTLINE(cppcoreguidelines-misleading-capture-default-by-value)
[=, this, cj = std::move(consensusJson)]() mutable {
[=, this, cj = std::move(consensusJson), sp = std::move(acceptSpan)]() mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and any state captured by
// reference will not change until startRound is called (which
// happens via endConsensus).
RclConsensusLogger clog("onAccept", validating, j_);
this->doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(cj));
this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(cj),
std::move(sp));
this->app_.getOPs().endConsensus(clog.ss());
});
}
/** Build the accept span for the round described by @p result.
 *
 * The span is created from roundSpanContext_ via SpanGuard::childSpan()
 * and tagged with the proposer count, the round duration and the quorum
 * attribute before being handed back to the caller.
 *
 * @param result Consensus result whose `proposers` and `roundTime` are
 *               recorded on the span.
 * @return Shared handle to the newly created span guard.
 */
std::shared_ptr<telemetry::SpanGuard>
RCLConsensus::Adaptor::makeAcceptSpan(Result const& result)
{
    namespace span_attr = telemetry::cons_span::attr;

    auto acceptGuard = std::make_shared<telemetry::SpanGuard>(
        telemetry::SpanGuard::childSpan(telemetry::cons_span::accept, roundSpanContext_));
    telemetry::SpanGuard& guard = *acceptGuard;

    guard.setAttribute(span_attr::proposers, static_cast<int64_t>(result.proposers));
    guard.setAttribute(
        span_attr::roundTimeMs, static_cast<int64_t>(result.roundTime.read().count()));

    // NOTE(review): quorum is filled from result.proposers, i.e. the same
    // value as the proposers attribute above. This looks unintended —
    // confirm which value the quorum attribute is supposed to carry.
    guard.setAttribute(span_attr::quorum, static_cast<int64_t>(result.proposers));

    return acceptGuard;
}
void
RCLConsensus::Adaptor::doAccept(
Result const& result,
@@ -498,7 +521,8 @@ RCLConsensus::Adaptor::doAccept(
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson)
Json::Value&& consensusJson,
std::shared_ptr<telemetry::SpanGuard> acceptSpan)
{
prevProposers_ = result.proposers;
prevRoundTime_ = result.roundTime.read();
@@ -526,8 +550,9 @@ RCLConsensus::Adaptor::doAccept(
closeTimeCorrect = true;
}
auto doAcceptSpan =
telemetry::SpanGuard::childSpan(telemetry::cons_span::acceptApply, roundSpanContext_);
auto doAcceptSpan = acceptSpan
? acceptSpan->childSpan(telemetry::cons_span::acceptApply)
: telemetry::SpanGuard::childSpan(telemetry::cons_span::acceptApply, roundSpanContext_);
doAcceptSpan.setAttribute(
telemetry::cons_span::attr::ledgerSeq, static_cast<int64_t>(prevLedger.seq() + 1));
doAcceptSpan.setAttribute(
@@ -987,7 +1012,9 @@ void
RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "mode_change");
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::modeChange);
span.setAttribute(telemetry::cons_span::attr::modeOld, to_string(before).c_str());
span.setAttribute(telemetry::cons_span::attr::modeNew, to_string(after).c_str());
@@ -1164,10 +1191,7 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
using namespace telemetry;
if (roundSpan_)
{
prevRoundContext_ = roundSpan_->captureContext();
roundSpan_.reset();
}
auto const& strategy = app_.getTelemetry().getConsensusTraceStrategy();
@@ -1182,7 +1206,8 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
}
else
{
roundSpan_.emplace(SpanGuard::span(TraceCategory::Consensus, seg::consensus, "round"));
roundSpan_.emplace(
SpanGuard::span(TraceCategory::Consensus, seg::consensus, cons_span::op::round));
}
if (!*roundSpan_)

View File

@@ -79,13 +79,6 @@ class RCLConsensus
*/
std::optional<telemetry::SpanGuard> roundSpan_;
/** Context captured from the previous consensus round.
*
* Used to create span links (follows-from) between consecutive
* rounds, establishing a causal chain in the trace backend.
*/
telemetry::SpanContext prevRoundContext_;
/** SpanContext snapshot of the current round span.
*
* Captured in startRoundTracing() as a lightweight value-type copy
@@ -374,8 +367,17 @@ class RCLConsensus
void
notify(protocol::NodeEvent ne, RCLCxLedger const& ledger, bool haveCorrectLCL);
/** Create a consensus.accept span as a child of the round span.
Returned via shared_ptr so it can be captured into the
jtACCEPT lambda and live until doAccept completes.
*/
std::shared_ptr<telemetry::SpanGuard>
makeAcceptSpan(Result const& result);
/** Accept a new ledger based on the given transactions.
@param acceptSpan Parent span created by makeAcceptSpan();
accept.apply is created as its child. May be null, in which
case accept.apply is created from the round span context
instead.
@ref onAccept
*/
void
@@ -385,7 +387,8 @@ class RCLConsensus
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson);
Json::Value&& consensusJson,
std::shared_ptr<telemetry::SpanGuard> acceptSpan);
/** Build the new last closed ledger.

View File

@@ -1488,7 +1488,8 @@ Consensus<Adaptor>::updateOurPositions(std::unique_ptr<std::stringstream> const&
XRPL_ASSERT(result_, "xrpl::Consensus::updateOurPositions : result is set");
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, "update_positions");
auto span =
SpanGuard::span(TraceCategory::Consensus, seg::consensus, cons_span::op::updatePositions);
span.setAttribute(cons_span::attr::convergePercent, static_cast<int64_t>(convergePercent_));
span.setAttribute(cons_span::attr::proposers, static_cast<int64_t>(currPeerPositions_.size()));
span.setAttribute(
@@ -1690,7 +1691,7 @@ Consensus<Adaptor>::haveConsensus(std::unique_ptr<std::stringstream> const& clog
XRPL_ASSERT(result_, "xrpl::Consensus::haveConsensus : has result");
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
using namespace telemetry;
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, "check");
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, cons_span::op::check);
// CHECKME: should possibly count unacquired TX sets as disagreeing
int agree = 0, disagree = 0;
@@ -1934,7 +1935,9 @@ Consensus<Adaptor>::startEstablishTracing()
return;
establishSpan_.emplace(
telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "establish"));
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::cons_span::op::establish));
}
template <class Adaptor>