mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-03 08:46:46 +00:00
feat(telemetry): add Phase 4 consensus tracing with SpanGuard API
Instrument the consensus subsystem with OpenTelemetry spans covering
the full round lifecycle: round start, establish phase, proposal send,
ledger close, position updates, consensus check, accept, validation
send, and mode changes.
Key design choices adapted from the original Phase 4 implementation
to the new SpanGuard factory pattern introduced in Phase 3:
- Add SpanGuard::hashSpan() for category-gated hash-derived trace IDs
(consensus round spans share trace_id across validators via ledger hash)
- Add SpanGuard::addEvent() overload with key-value attribute pairs
(used for dispute.resolve events during position updates)
- Add ConsensusSpanNames.h with compile-time span name constants
following the colocated *SpanNames.h pattern from Phase 3
- Add consensusTraceStrategy config option ("deterministic"/"attribute")
for cross-node trace correlation strategy selection
- Use SpanGuard::linkedSpan() for follows-from relationships between
consecutive rounds and cross-thread validation spans
- Use SpanGuard::captureContext() for thread-safe context propagation
from consensus thread to jtACCEPT worker thread
Spans produced: consensus.round, consensus.proposal.send,
consensus.ledger_close, consensus.establish, consensus.update_positions,
consensus.check, consensus.accept, consensus.accept.apply,
consensus.validation.send, consensus.mode_change
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -87,6 +87,12 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string const&
|
||||
getConsensusTraceStrategy() const override
|
||||
{
|
||||
return setup_.consensusTraceStrategy;
|
||||
}
|
||||
|
||||
#ifdef XRPL_ENABLE_TELEMETRY
|
||||
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer>
|
||||
getTracer(std::string_view) override
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace xrpl {
|
||||
namespace telemetry {
|
||||
@@ -298,6 +299,40 @@ SpanGuard::hashSpan(
|
||||
return SpanGuard(std::make_unique<Impl>(tel->startSpan(std::string(name), parentCtx)));
|
||||
}
|
||||
|
||||
// ===== Hash-derived span (generic, category-gated) =========================
|
||||
|
||||
/** Start a category-gated span whose trace ID is taken from caller-supplied
 *  hash bytes.
 *
 * The first 16 bytes at @p hashData are used verbatim as the trace ID. A
 * synthetic, non-remote parent context is built from that trace ID, a
 * randomly generated 8-byte span ID and trace-flags value 1, and the new
 * span is started with that context as its parent.
 *
 * @param cat       Trace category; when it is disabled a null guard is
 *                  returned.
 * @param name      Name given to the started span.
 * @param hashData  Pointer to the hash bytes; may be null, in which case a
 *                  null guard is returned.
 * @param hashSize  Number of readable bytes at @p hashData; must be at
 *                  least 16.
 * @return A guard owning the started span, or a default-constructed (null)
 *         guard when the input is unusable or telemetry is unavailable,
 *         disabled, or has the category switched off.
 */
SpanGuard
SpanGuard::hashSpan(
    TraceCategory cat,
    std::string_view name,
    std::uint8_t const* hashData,
    std::size_t hashSize)
{
    constexpr std::size_t traceIdBytes = 16;
    constexpr std::size_t spanIdByteCount = 8;

    // Reject null or short input up front: the trace ID below reads exactly
    // traceIdBytes bytes from hashData.
    if (hashData == nullptr || hashSize < traceIdBytes)
        return {};

    auto* tel = Telemetry::getInstance();
    if (!tel || !tel->isEnabled() || !isCategoryEnabled(*tel, cat))
        return {};

    otel_trace::TraceId traceId(
        opentelemetry::nostd::span<std::uint8_t const, traceIdBytes>(hashData, traceIdBytes));

    // The synthetic parent only needs some span ID; fill it with random
    // bytes (the low 8 bits of each random_device draw).
    std::uint8_t spanIdBytes[spanIdByteCount];
    std::random_device rd;
    for (auto& b : spanIdBytes)
        b = static_cast<std::uint8_t>(rd());
    otel_trace::SpanId spanId(
        opentelemetry::nostd::span<std::uint8_t const, spanIdByteCount>(
            spanIdBytes, spanIdByteCount));

    otel_trace::SpanContext syntheticCtx(
        traceId, spanId, otel_trace::TraceFlags(1), /* remote = */ false);

    // Wrap the synthetic context in a DefaultSpan and install it under
    // kSpanKey so startSpan() sees it as the parent.
    auto parentCtx = opentelemetry::context::Context{}.SetValue(
        otel_trace::kSpanKey,
        opentelemetry::nostd::shared_ptr<otel_trace::Span>(
            new otel_trace::DefaultSpan(syntheticCtx)));

    return SpanGuard(std::make_unique<Impl>(tel->startSpan(std::string(name), parentCtx)));
}
|
||||
|
||||
// ===== Context capture =====================================================
|
||||
|
||||
SpanContext
|
||||
@@ -390,6 +425,19 @@ SpanGuard::addEvent(std::string_view name)
|
||||
impl_->span->AddEvent(std::string(name));
|
||||
}
|
||||
|
||||
void
|
||||
SpanGuard::addEvent(std::string_view name, std::initializer_list<EventAttribute> attrs)
|
||||
{
|
||||
if (!impl_)
|
||||
return;
|
||||
// Own the strings to ensure lifetime safety through the AddEvent call.
|
||||
std::vector<std::pair<std::string, std::string>> owned;
|
||||
owned.reserve(attrs.size());
|
||||
for (auto const& [k, v] : attrs)
|
||||
owned.emplace_back(std::string(k), std::string(v));
|
||||
impl_->span->AddEvent(std::string(name), owned);
|
||||
}
|
||||
|
||||
void
|
||||
SpanGuard::recordException(std::exception const& e)
|
||||
{
|
||||
|
||||
@@ -193,6 +193,12 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string const&
|
||||
getConsensusTraceStrategy() const override
|
||||
{
|
||||
return setup_.consensusTraceStrategy;
|
||||
}
|
||||
|
||||
opentelemetry::nostd::shared_ptr<trace_api::Tracer>
|
||||
getTracer(std::string_view) override
|
||||
{
|
||||
@@ -367,6 +373,12 @@ public:
|
||||
return setup_.traceLedger;
|
||||
}
|
||||
|
||||
std::string const&
|
||||
getConsensusTraceStrategy() const override
|
||||
{
|
||||
return setup_.consensusTraceStrategy;
|
||||
}
|
||||
|
||||
opentelemetry::nostd::shared_ptr<trace_api::Tracer>
|
||||
getTracer(std::string_view name) override
|
||||
{
|
||||
|
||||
@@ -77,6 +77,9 @@ setup_Telemetry(
|
||||
setup.tracePeer = section.value_or<int>("trace_peer", 0) != 0;
|
||||
setup.traceLedger = section.value_or<int>("trace_ledger", 1) != 0;
|
||||
|
||||
setup.consensusTraceStrategy =
|
||||
section.value_or<std::string>("consensus_trace_strategy", "deterministic");
|
||||
|
||||
return setup;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#include <xrpl/telemetry/SpanGuard.h>
|
||||
#include <xrpl/telemetry/SpanNames.h>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
@@ -80,3 +81,26 @@ TEST(SpanGuardFactory, discard_safe_on_null)
|
||||
span.discard();
|
||||
EXPECT_FALSE(span);
|
||||
}
|
||||
|
||||
TEST(SpanGuardFactory, consensus_close_time_attributes)
{
    // Smoke test only: the consensus attribute calls must compile and must
    // be harmless when invoked on a SpanGuard that may be null.
    using telemetry::SpanGuard;
    using telemetry::TraceCategory;

    // Attribute set for a round that finished normally.
    {
        auto applySpan =
            SpanGuard::span(TraceCategory::Consensus, telemetry::seg::consensus, "accept.apply");
        applySpan.setAttribute("xrpl.consensus.ledger.seq", int64_t{42});
        applySpan.setAttribute("xrpl.consensus.close_time", int64_t{780000000});
        applySpan.setAttribute("xrpl.consensus.close_time_correct", true);
        applySpan.setAttribute("xrpl.consensus.close_resolution_ms", int64_t{30000});
        applySpan.setAttribute("xrpl.consensus.state", std::string("finished"));
        applySpan.setAttribute("xrpl.consensus.proposing", true);
        applySpan.setAttribute("xrpl.consensus.round_time_ms", int64_t{3500});
    }

    // Attribute set for a round that moved on.
    {
        auto applySpan =
            SpanGuard::span(TraceCategory::Consensus, telemetry::seg::consensus, "accept.apply");
        applySpan.setAttribute("xrpl.consensus.close_time_correct", false);
        applySpan.setAttribute("xrpl.consensus.state", std::string("moved_on"));
    }
}
|
||||
|
||||
156
src/xrpld/app/consensus/ConsensusSpanNames.h
Normal file
156
src/xrpld/app/consensus/ConsensusSpanNames.h
Normal file
@@ -0,0 +1,156 @@
|
||||
#pragma once
|
||||
|
||||
/** Compile-time span name constants for consensus tracing.
|
||||
*
|
||||
* Used by RCLConsensus (app) and Consensus.h (template) for
|
||||
* consensus lifecycle spans. Built on StaticStr/join() from SpanNames.h.
|
||||
*
|
||||
* Span hierarchy:
|
||||
*
|
||||
* consensus.round (deterministic trace_id from ledger hash)
|
||||
* |
|
||||
* +-- consensus.proposal.send
|
||||
* +-- consensus.ledger_close
|
||||
* +-- consensus.establish
|
||||
* +-- consensus.update_positions
|
||||
* +-- consensus.check
|
||||
* +-- consensus.accept
|
||||
* +-- consensus.accept.apply (jtACCEPT thread)
|
||||
* +-- consensus.validation.send (jtACCEPT thread, linked)
|
||||
* +-- consensus.mode_change
|
||||
*/
|
||||
|
||||
#include <xrpl/telemetry/SpanNames.h>
|
||||
|
||||
namespace xrpl {
|
||||
namespace telemetry {
|
||||
namespace cons_span {
|
||||
|
||||
// ===== Span name segments ====================================================
|
||||
|
||||
namespace op {
|
||||
inline constexpr auto round = makeStr("round");
|
||||
inline constexpr auto proposalSend = makeStr("proposal.send");
|
||||
inline constexpr auto ledgerClose = makeStr("ledger_close");
|
||||
inline constexpr auto establish = makeStr("establish");
|
||||
inline constexpr auto updatePositions = makeStr("update_positions");
|
||||
inline constexpr auto check = makeStr("check");
|
||||
inline constexpr auto accept = makeStr("accept");
|
||||
inline constexpr auto acceptApply = makeStr("accept.apply");
|
||||
inline constexpr auto validationSend = makeStr("validation.send");
|
||||
inline constexpr auto modeChange = makeStr("mode_change");
|
||||
} // namespace op
|
||||
|
||||
// ===== Full span names (prefix.op) ===========================================
|
||||
|
||||
// Each full name is the shared consensus segment joined with one operation
// segment from namespace op.

/// "consensus.round"
inline constexpr auto round = join(seg::consensus, op::round);
/// "consensus.proposal.send"
inline constexpr auto proposalSend = join(seg::consensus, op::proposalSend);
/// "consensus.ledger_close"
inline constexpr auto ledgerClose = join(seg::consensus, op::ledgerClose);
/// "consensus.establish"
inline constexpr auto establish = join(seg::consensus, op::establish);
/// "consensus.update_positions"
inline constexpr auto updatePositions = join(seg::consensus, op::updatePositions);
/// "consensus.check"
inline constexpr auto check = join(seg::consensus, op::check);
/// "consensus.accept"
inline constexpr auto accept = join(seg::consensus, op::accept);
/// "consensus.accept.apply"
inline constexpr auto acceptApply = join(seg::consensus, op::acceptApply);
/// "consensus.validation.send"
inline constexpr auto validationSend = join(seg::consensus, op::validationSend);
/// "consensus.mode_change"
inline constexpr auto modeChange = join(seg::consensus, op::modeChange);
|
||||
|
||||
// ===== Attribute keys ========================================================
|
||||
|
||||
namespace attr {
|
||||
inline constexpr auto xrplConsensus = join(seg::xrpl, seg::consensus);
|
||||
|
||||
/// "xrpl.consensus.ledger_id"
|
||||
inline constexpr auto ledgerId = join(xrplConsensus, makeStr("ledger_id"));
|
||||
/// "xrpl.consensus.ledger.seq"
|
||||
inline constexpr auto ledgerSeq = join(xrplConsensus, makeStr("ledger.seq"));
|
||||
/// "xrpl.consensus.mode"
|
||||
inline constexpr auto mode = join(xrplConsensus, makeStr("mode"));
|
||||
/// "xrpl.consensus.round"
|
||||
inline constexpr auto round = join(xrplConsensus, makeStr("round"));
|
||||
/// "xrpl.consensus.proposers"
|
||||
inline constexpr auto proposers = join(xrplConsensus, makeStr("proposers"));
|
||||
/// "xrpl.consensus.round_time_ms"
|
||||
inline constexpr auto roundTimeMs = join(xrplConsensus, makeStr("round_time_ms"));
|
||||
/// "xrpl.consensus.proposing"
|
||||
inline constexpr auto proposing = join(xrplConsensus, makeStr("proposing"));
|
||||
/// "xrpl.consensus.state"
|
||||
inline constexpr auto state = join(xrplConsensus, makeStr("state"));
|
||||
|
||||
// Close time attributes
|
||||
/// "xrpl.consensus.close_time"
|
||||
inline constexpr auto closeTime = join(xrplConsensus, makeStr("close_time"));
|
||||
/// "xrpl.consensus.close_time_correct"
|
||||
inline constexpr auto closeTimeCorrect = join(xrplConsensus, makeStr("close_time_correct"));
|
||||
/// "xrpl.consensus.close_resolution_ms"
|
||||
inline constexpr auto closeResolutionMs = join(xrplConsensus, makeStr("close_resolution_ms"));
|
||||
/// "xrpl.consensus.parent_close_time"
|
||||
inline constexpr auto parentCloseTime = join(xrplConsensus, makeStr("parent_close_time"));
|
||||
/// "xrpl.consensus.close_time_self"
|
||||
inline constexpr auto closeTimeSelf = join(xrplConsensus, makeStr("close_time_self"));
|
||||
/// "xrpl.consensus.close_time_vote_bins"
|
||||
inline constexpr auto closeTimeVoteBins = join(xrplConsensus, makeStr("close_time_vote_bins"));
|
||||
/// "xrpl.consensus.resolution_direction"
|
||||
inline constexpr auto resolutionDirection = join(xrplConsensus, makeStr("resolution_direction"));
|
||||
|
||||
// Establish/convergence attributes
|
||||
/// "xrpl.consensus.converge_percent"
|
||||
inline constexpr auto convergePercent = join(xrplConsensus, makeStr("converge_percent"));
|
||||
/// "xrpl.consensus.establish_count"
|
||||
inline constexpr auto establishCount = join(xrplConsensus, makeStr("establish_count"));
|
||||
/// "xrpl.consensus.proposers_agreed"
|
||||
inline constexpr auto proposersAgreed = join(xrplConsensus, makeStr("proposers_agreed"));
|
||||
|
||||
// Consensus check attributes
|
||||
/// "xrpl.consensus.agree_count"
|
||||
inline constexpr auto agreeCount = join(xrplConsensus, makeStr("agree_count"));
|
||||
/// "xrpl.consensus.disagree_count"
|
||||
inline constexpr auto disagreeCount = join(xrplConsensus, makeStr("disagree_count"));
|
||||
/// "xrpl.consensus.threshold_percent"
|
||||
inline constexpr auto thresholdPercent = join(xrplConsensus, makeStr("threshold_percent"));
|
||||
/// "xrpl.consensus.result"
|
||||
inline constexpr auto result = join(xrplConsensus, makeStr("result"));
|
||||
/// "xrpl.consensus.quorum"
|
||||
inline constexpr auto quorum = join(xrplConsensus, makeStr("quorum"));
|
||||
/// "xrpl.consensus.validation_count"
|
||||
inline constexpr auto validationCount = join(xrplConsensus, makeStr("validation_count"));
|
||||
|
||||
// Trace strategy attribute
|
||||
/// "xrpl.consensus.trace_strategy"
|
||||
inline constexpr auto traceStrategy = join(xrplConsensus, makeStr("trace_strategy"));
|
||||
/// "xrpl.consensus.round_id"
|
||||
inline constexpr auto roundId = join(xrplConsensus, makeStr("round_id"));
|
||||
|
||||
// Mode change attributes
|
||||
/// "xrpl.consensus.mode.old"
|
||||
inline constexpr auto modeOld = join(xrplConsensus, makeStr("mode.old"));
|
||||
/// "xrpl.consensus.mode.new"
|
||||
inline constexpr auto modeNew = join(xrplConsensus, makeStr("mode.new"));
|
||||
|
||||
// Dispute event attributes
|
||||
/// "xrpl.tx.id"
|
||||
inline constexpr auto txId = join(join(seg::xrpl, seg::tx), makeStr("id"));
|
||||
/// "xrpl.dispute.our_vote"
|
||||
inline constexpr auto disputeOurVote =
|
||||
join(join(seg::xrpl, makeStr("dispute")), makeStr("our_vote"));
|
||||
/// "xrpl.dispute.yays"
|
||||
inline constexpr auto disputeYays = join(join(seg::xrpl, makeStr("dispute")), makeStr("yays"));
|
||||
/// "xrpl.dispute.nays"
|
||||
inline constexpr auto disputeNays = join(join(seg::xrpl, makeStr("dispute")), makeStr("nays"));
|
||||
} // namespace attr
|
||||
|
||||
// ===== Attribute values ======================================================
|
||||
|
||||
namespace val {
|
||||
inline constexpr auto finished = makeStr("finished");
|
||||
inline constexpr auto movedOn = makeStr("moved_on");
|
||||
inline constexpr auto yes = makeStr("yes");
|
||||
inline constexpr auto no = makeStr("no");
|
||||
inline constexpr auto expired = makeStr("expired");
|
||||
inline constexpr auto increased = makeStr("increased");
|
||||
inline constexpr auto decreased = makeStr("decreased");
|
||||
inline constexpr auto unchanged = makeStr("unchanged");
|
||||
} // namespace val
|
||||
|
||||
} // namespace cons_span
|
||||
} // namespace telemetry
|
||||
} // namespace xrpl
|
||||
@@ -1,3 +1,4 @@
|
||||
#include <xrpld/app/consensus/ConsensusSpanNames.h>
|
||||
#include <xrpld/app/consensus/RCLConsensus.h>
|
||||
|
||||
#include <xrpld/app/consensus/RCLCensorshipDetector.h>
|
||||
@@ -230,6 +231,11 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
|
||||
void
|
||||
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
|
||||
{
|
||||
auto span = telemetry::SpanGuard::span(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "proposal.send");
|
||||
span.setAttribute(
|
||||
telemetry::cons_span::attr::round, static_cast<int64_t>(proposal.proposeSeq()));
|
||||
|
||||
JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
|
||||
<< xrpl::to_string(proposal.prevLedger()) << " -> "
|
||||
<< xrpl::to_string(proposal.position());
|
||||
@@ -342,6 +348,13 @@ RCLConsensus::Adaptor::onClose(
|
||||
NetClock::time_point const& closeTime,
|
||||
ConsensusMode mode) -> Result
|
||||
{
|
||||
auto span = telemetry::SpanGuard::span(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "ledger_close");
|
||||
span.setAttribute(
|
||||
telemetry::cons_span::attr::ledgerSeq,
|
||||
static_cast<int64_t>(ledger.ledger_->header().seq + 1));
|
||||
span.setAttribute(telemetry::cons_span::attr::mode, to_string(mode).c_str());
|
||||
|
||||
bool const wrongLCL = mode == ConsensusMode::wrongLedger;
|
||||
bool const proposing = mode == ConsensusMode::proposing;
|
||||
|
||||
@@ -450,6 +463,18 @@ RCLConsensus::Adaptor::onAccept(
|
||||
Json::Value&& consensusJson,
|
||||
bool const validating)
|
||||
{
|
||||
{
|
||||
auto span = telemetry::SpanGuard::span(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "accept");
|
||||
span.setAttribute(
|
||||
telemetry::cons_span::attr::proposers, static_cast<int64_t>(result.proposers));
|
||||
span.setAttribute(
|
||||
telemetry::cons_span::attr::roundTimeMs,
|
||||
static_cast<int64_t>(result.roundTime.read().count()));
|
||||
span.setAttribute(
|
||||
telemetry::cons_span::attr::quorum, static_cast<int64_t>(result.proposers));
|
||||
}
|
||||
|
||||
app_.getJobQueue().addJob(
|
||||
jtACCEPT,
|
||||
"AcceptLedger",
|
||||
@@ -501,6 +526,41 @@ RCLConsensus::Adaptor::doAccept(
|
||||
closeTimeCorrect = true;
|
||||
}
|
||||
|
||||
auto doAcceptSpan = telemetry::SpanGuard::span(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "accept.apply");
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::ledgerSeq, static_cast<int64_t>(prevLedger.seq() + 1));
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::closeTime,
|
||||
static_cast<int64_t>(consensusCloseTime.time_since_epoch().count()));
|
||||
doAcceptSpan.setAttribute(telemetry::cons_span::attr::closeTimeCorrect, closeTimeCorrect);
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::closeResolutionMs,
|
||||
static_cast<int64_t>(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(closeResolution).count()));
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::state, std::string(consensusFail ? "moved_on" : "finished"));
|
||||
doAcceptSpan.setAttribute(telemetry::cons_span::attr::proposing, proposing);
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::roundTimeMs,
|
||||
static_cast<int64_t>(result.roundTime.read().count()));
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::parentCloseTime,
|
||||
static_cast<int64_t>(prevLedger.closeTime().time_since_epoch().count()));
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::closeTimeSelf,
|
||||
static_cast<int64_t>(rawCloseTimes.self.time_since_epoch().count()));
|
||||
doAcceptSpan.setAttribute(
|
||||
telemetry::cons_span::attr::closeTimeVoteBins,
|
||||
static_cast<int64_t>(rawCloseTimes.peers.size()));
|
||||
{
|
||||
auto const prevRes = prevLedger.closeTimeResolution();
|
||||
std::string dir = (closeResolution > prevRes) ? "increased"
|
||||
: (closeResolution < prevRes) ? "decreased"
|
||||
: "unchanged";
|
||||
doAcceptSpan.setAttribute(telemetry::cons_span::attr::resolutionDirection, std::move(dir));
|
||||
}
|
||||
|
||||
JLOG(j_.debug()) << "Report: Prop=" << (proposing ? "yes" : "no")
|
||||
<< " val=" << (validating_ ? "yes" : "no")
|
||||
<< " corLCL=" << (haveCorrectLCL ? "yes" : "no")
|
||||
@@ -818,6 +878,14 @@ RCLConsensus::Adaptor::buildLCL(
|
||||
void
|
||||
RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns, bool proposing)
|
||||
{
|
||||
auto valSpan = createValidationSpan();
|
||||
if (valSpan)
|
||||
{
|
||||
valSpan->setAttribute(
|
||||
telemetry::cons_span::attr::ledgerSeq, static_cast<int64_t>(ledger.seq()));
|
||||
valSpan->setAttribute(telemetry::cons_span::attr::proposing, proposing);
|
||||
}
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
auto validationTime = app_.getTimeKeeper().closeTime();
|
||||
@@ -913,6 +981,11 @@ RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns,
|
||||
void
|
||||
RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
|
||||
{
|
||||
auto span = telemetry::SpanGuard::span(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "mode_change");
|
||||
span.setAttribute(telemetry::cons_span::attr::modeOld, to_string(before).c_str());
|
||||
span.setAttribute(telemetry::cons_span::attr::modeNew, to_string(after).c_str());
|
||||
|
||||
JLOG(j_.info()) << "Consensus mode change before=" << to_string(before)
|
||||
<< ", after=" << to_string(after);
|
||||
|
||||
@@ -1035,6 +1108,8 @@ RCLConsensus::Adaptor::preStartRound(RCLCxLedger const& prevLgr, hash_set<NodeID
|
||||
if (!nowTrusted.empty())
|
||||
nUnlVote_.newValidators(prevLgr.seq() + 1, nowTrusted);
|
||||
|
||||
startRoundTracing(prevLgr);
|
||||
|
||||
// propose only if we're in sync with the network (and validating)
|
||||
return validating_ && synced;
|
||||
}
|
||||
@@ -1078,6 +1153,67 @@ RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
|
||||
app_.getOPs().setMode(OperatingMode::CONNECTED);
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
|
||||
{
|
||||
using namespace telemetry;
|
||||
|
||||
if (roundSpan_)
|
||||
{
|
||||
prevRoundContext_ = roundSpan_->captureContext();
|
||||
roundSpan_.reset();
|
||||
}
|
||||
|
||||
auto const& strategy = app_.getTelemetry().getConsensusTraceStrategy();
|
||||
|
||||
if (strategy == "deterministic")
|
||||
{
|
||||
roundSpan_.emplace(
|
||||
SpanGuard::hashSpan(
|
||||
TraceCategory::Consensus,
|
||||
cons_span::round,
|
||||
prevLgr.id().data(),
|
||||
prevLgr.id().bytes));
|
||||
}
|
||||
else
|
||||
{
|
||||
roundSpan_.emplace(SpanGuard::span(TraceCategory::Consensus, seg::consensus, "round"));
|
||||
}
|
||||
|
||||
if (!*roundSpan_)
|
||||
return;
|
||||
|
||||
if (prevRoundContext_.isValid())
|
||||
{
|
||||
// Create a linked span to establish follows-from relationship
|
||||
// between consecutive rounds, then transfer to roundSpan_.
|
||||
auto linked = SpanGuard::linkedSpan(cons_span::round, prevRoundContext_);
|
||||
if (linked)
|
||||
{
|
||||
roundSpan_.emplace(std::move(linked));
|
||||
}
|
||||
}
|
||||
|
||||
roundSpan_->setAttribute(cons_span::attr::ledgerId, to_string(prevLgr.id()).c_str());
|
||||
roundSpan_->setAttribute(cons_span::attr::ledgerSeq, static_cast<int64_t>(prevLgr.seq() + 1));
|
||||
roundSpan_->setAttribute(cons_span::attr::mode, to_string(mode_.load()).c_str());
|
||||
roundSpan_->setAttribute(cons_span::attr::traceStrategy, strategy.c_str());
|
||||
roundSpan_->setAttribute(cons_span::attr::roundId, static_cast<int64_t>(prevLgr.seq() + 1));
|
||||
|
||||
roundSpanContext_ = roundSpan_->captureContext();
|
||||
}
|
||||
|
||||
std::optional<telemetry::SpanGuard>
|
||||
RCLConsensus::Adaptor::createValidationSpan()
|
||||
{
|
||||
using namespace telemetry;
|
||||
|
||||
if (!roundSpanContext_.isValid())
|
||||
return std::nullopt;
|
||||
|
||||
return SpanGuard::linkedSpan(cons_span::validationSend, roundSpanContext_);
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::startRound(
|
||||
NetClock::time_point const& now,
|
||||
|
||||
@@ -12,10 +12,12 @@
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/protocol/RippleLedgerHash.h>
|
||||
#include <xrpl/shamap/SHAMap.h>
|
||||
#include <xrpl/telemetry/SpanGuard.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
@@ -68,6 +70,31 @@ class RCLConsensus
|
||||
RCLCensorshipDetector<TxID, LedgerIndex> censorshipDetector_;
|
||||
NegativeUNLVote nUnlVote_;
|
||||
|
||||
/** Span for the current consensus round.
|
||||
*
|
||||
* Created in preStartRound(), ended (via reset()) when the next
|
||||
* round begins. When consensusTraceStrategy is "deterministic",
|
||||
* the trace_id is derived from previousLedger.id() so that all
|
||||
* validators in the same round share the same trace_id.
|
||||
*/
|
||||
std::optional<telemetry::SpanGuard> roundSpan_;
|
||||
|
||||
/** Context captured from the previous consensus round.
|
||||
*
|
||||
* Used to create span links (follows-from) between consecutive
|
||||
* rounds, establishing a causal chain in the trace backend.
|
||||
*/
|
||||
telemetry::SpanContext prevRoundContext_;
|
||||
|
||||
/** SpanContext snapshot of the current round span.
|
||||
*
|
||||
* Captured in startRoundTracing() as a lightweight value-type copy
|
||||
* so that createValidationSpan() — which runs on the jtACCEPT
|
||||
* worker thread — can build span links without accessing roundSpan_
|
||||
* across threads.
|
||||
*/
|
||||
telemetry::SpanContext roundSpanContext_;
|
||||
|
||||
public:
|
||||
using Ledger_t = RCLCxLedger;
|
||||
using NodeID_t = NodeID;
|
||||
@@ -156,6 +183,27 @@ class RCLConsensus
|
||||
return parms_;
|
||||
}
|
||||
|
||||
/** Set up the consensus round span and link it to the previous round.
|
||||
*
|
||||
* Saves the previous round's context for span-link construction,
|
||||
* ends the old round span, and creates a new "consensus.round" span.
|
||||
* Depending on the configured trace strategy the trace_id is either
|
||||
* deterministic (derived from prevLgr hash) or random.
|
||||
*
|
||||
* @param prevLgr The ledger that will be the prior ledger for the
|
||||
* new round.
|
||||
*/
|
||||
void
|
||||
startRoundTracing(RCLCxLedger const& prevLgr);
|
||||
|
||||
/** Create the "consensus.validation.send" span linked to the round.
|
||||
*
|
||||
* @return An engaged optional SpanGuard if tracing is active,
|
||||
* std::nullopt otherwise.
|
||||
*/
|
||||
std::optional<telemetry::SpanGuard>
|
||||
createValidationSpan();
|
||||
|
||||
private:
|
||||
//---------------------------------------------------------------------
|
||||
// The following members implement the generic Consensus requirements
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpld/app/consensus/ConsensusSpanNames.h>
|
||||
#include <xrpld/consensus/ConsensusParms.h>
|
||||
#include <xrpld/consensus/ConsensusProposal.h>
|
||||
#include <xrpld/consensus/ConsensusTypes.h>
|
||||
@@ -10,6 +11,7 @@
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/json/json_writer.h>
|
||||
#include <xrpl/ledger/LedgerTiming.h>
|
||||
#include <xrpl/telemetry/SpanGuard.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
@@ -601,6 +603,21 @@ private:
|
||||
// nodes that have bowed out of this consensus process
|
||||
hash_set<NodeID_t> deadNodes_;
|
||||
|
||||
/** Span for the establish phase of consensus.
|
||||
* Created when the ledger closes and we enter phaseEstablish;
|
||||
* cleared (ended) when consensus is reached.
|
||||
*/
|
||||
std::optional<xrpl::telemetry::SpanGuard> establishSpan_;
|
||||
|
||||
void
|
||||
startEstablishTracing();
|
||||
|
||||
void
|
||||
updateEstablishTracing();
|
||||
|
||||
void
|
||||
endEstablishTracing();
|
||||
|
||||
// Journal for debugging
|
||||
beast::Journal const j_;
|
||||
};
|
||||
@@ -1327,6 +1344,8 @@ Consensus<Adaptor>::phaseEstablish(std::unique_ptr<std::stringstream> const& clo
|
||||
XRPL_ASSERT(result_, "xrpl::Consensus::phaseEstablish : result is set");
|
||||
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
|
||||
|
||||
startEstablishTracing();
|
||||
|
||||
++peerUnchangedCounter_;
|
||||
++establishCounter_;
|
||||
|
||||
@@ -1354,6 +1373,8 @@ Consensus<Adaptor>::phaseEstablish(std::unique_ptr<std::stringstream> const& clo
|
||||
|
||||
updateOurPositions(clog);
|
||||
|
||||
updateEstablishTracing();
|
||||
|
||||
// Nothing to do if too many laggards or we don't have consensus.
|
||||
if (shouldPause(clog) || !haveConsensus(clog))
|
||||
return;
|
||||
@@ -1371,6 +1392,7 @@ Consensus<Adaptor>::phaseEstablish(std::unique_ptr<std::stringstream> const& clo
|
||||
adaptor_.updateOperatingMode(currPeerPositions_.size());
|
||||
prevProposers_ = currPeerPositions_.size();
|
||||
prevRoundTime_ = result_->roundTime.read();
|
||||
endEstablishTracing();
|
||||
phase_ = ConsensusPhase::accepted;
|
||||
JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
|
||||
adaptor_.onAccept(
|
||||
@@ -1447,6 +1469,10 @@ Consensus<Adaptor>::updateOurPositions(std::unique_ptr<std::stringstream> const&
|
||||
// We must have a position if we are updating it
|
||||
XRPL_ASSERT(result_, "xrpl::Consensus::updateOurPositions : result is set");
|
||||
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
|
||||
using namespace telemetry;
|
||||
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, "update_positions");
|
||||
span.setAttribute(cons_span::attr::convergePercent, static_cast<int64_t>(convergePercent_));
|
||||
span.setAttribute(cons_span::attr::proposers, static_cast<int64_t>(currPeerPositions_.size()));
|
||||
ConsensusParms const& parms = adaptor_.parms();
|
||||
|
||||
// Compute a cutoff time
|
||||
@@ -1506,6 +1532,11 @@ Consensus<Adaptor>::updateOurPositions(std::unique_ptr<std::stringstream> const&
|
||||
// now a no
|
||||
mutableSet->erase(txId);
|
||||
}
|
||||
|
||||
span.addEvent(
|
||||
"dispute.resolve",
|
||||
{{cons_span::attr::txId, to_string(txId)},
|
||||
{cons_span::attr::disputeOurVote, dispute.getOurVote() ? "yes" : "no"}});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1629,6 +1660,8 @@ Consensus<Adaptor>::haveConsensus(std::unique_ptr<std::stringstream> const& clog
|
||||
// Must have a stance if we are checking for consensus
|
||||
XRPL_ASSERT(result_, "xrpl::Consensus::haveConsensus : has result");
|
||||
// NOLINTBEGIN(bugprone-unchecked-optional-access) assert above
|
||||
using namespace telemetry;
|
||||
auto span = SpanGuard::span(TraceCategory::Consensus, seg::consensus, "check");
|
||||
|
||||
// CHECKME: should possibly count unacquired TX sets as disagreeing
|
||||
int agree = 0, disagree = 0;
|
||||
@@ -1728,6 +1761,17 @@ Consensus<Adaptor>::haveConsensus(std::unique_ptr<std::stringstream> const& clog
|
||||
CLOG(clog) << "Unable to reach consensus " << Json::Compact{getJson(true)} << ". ";
|
||||
}
|
||||
|
||||
span.setAttribute(cons_span::attr::agreeCount, static_cast<int64_t>(agree));
|
||||
span.setAttribute(cons_span::attr::disagreeCount, static_cast<int64_t>(disagree));
|
||||
span.setAttribute(cons_span::attr::convergePercent, static_cast<int64_t>(convergePercent_));
|
||||
|
||||
char const* stateStr = "no";
|
||||
if (result_->state == ConsensusState::Yes)
|
||||
stateStr = "yes";
|
||||
else if (result_->state == ConsensusState::MovedOn)
|
||||
stateStr = "moved_on";
|
||||
span.setAttribute(cons_span::attr::result, stateStr);
|
||||
|
||||
CLOG(clog) << "Consensus has been reached. ";
|
||||
// NOLINTEND(bugprone-unchecked-optional-access)
|
||||
return true;
|
||||
@@ -1849,4 +1893,36 @@ Consensus<Adaptor>::asCloseTime(NetClock::time_point raw) const
|
||||
return roundCloseTime(raw, closeResolution_);
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
void
|
||||
Consensus<Adaptor>::startEstablishTracing()
|
||||
{
|
||||
if (establishSpan_)
|
||||
return;
|
||||
establishSpan_.emplace(
|
||||
telemetry::SpanGuard::span(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, "establish"));
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
void
|
||||
Consensus<Adaptor>::updateEstablishTracing()
|
||||
{
|
||||
if (!establishSpan_)
|
||||
return;
|
||||
establishSpan_->setAttribute(
|
||||
telemetry::cons_span::attr::convergePercent, static_cast<int64_t>(convergePercent_));
|
||||
establishSpan_->setAttribute(
|
||||
telemetry::cons_span::attr::establishCount, static_cast<int64_t>(establishCounter_));
|
||||
establishSpan_->setAttribute(
|
||||
telemetry::cons_span::attr::proposers, static_cast<int64_t>(currPeerPositions_.size()));
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
void
|
||||
Consensus<Adaptor>::endEstablishTracing()
|
||||
{
|
||||
establishSpan_.reset();
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
@@ -176,6 +176,20 @@ public:
|
||||
[[nodiscard]] Json::Value
|
||||
getJson() const;
|
||||
|
||||
//! Number of peers voting yes.
|
||||
int
|
||||
getYays() const
|
||||
{
|
||||
return yays_;
|
||||
}
|
||||
|
||||
//! Number of peers voting no.
|
||||
int
|
||||
getNays() const
|
||||
{
|
||||
return nays_;
|
||||
}
|
||||
|
||||
private:
|
||||
int yays_{0}; //< Number of yes votes
|
||||
int nays_{0}; //< Number of no votes
|
||||
|
||||
Reference in New Issue
Block a user