mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-02 08:17:13 +00:00
fixed interround consensus linking and added some state attrs to spans.
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
This commit is contained in:
@@ -279,17 +279,22 @@ public:
|
||||
TraceCategory. All nodes using the same hash independently produce
|
||||
spans under the same trace_id, enabling cross-node correlation
|
||||
without context propagation.
|
||||
@param cat Trace subsystem category.
|
||||
@param name Full span name (e.g. "tx.receive").
|
||||
@param hashData Pointer to at least 16 bytes of hash data.
|
||||
@param hashSize Size of the hash buffer (must be >= 16).
|
||||
@param cat Trace subsystem category.
|
||||
@param name Full span name (e.g. "tx.receive").
|
||||
@param hashData Pointer to at least 16 bytes of hash data.
|
||||
@param hashSize Size of the hash buffer (must be >= 16).
|
||||
@param followsFrom Optional captured context to attach as a
|
||||
follows-from link. Use to stitch sequential
|
||||
top-level spans (e.g. consecutive consensus
|
||||
rounds). Ignored if nullptr or invalid.
|
||||
*/
|
||||
static SpanGuard
|
||||
hashSpan(
|
||||
TraceCategory const cat,
|
||||
std::string_view const name,
|
||||
std::uint8_t const* const hashData,
|
||||
std::size_t const hashSize);
|
||||
std::size_t const hashSize,
|
||||
SpanContext const* followsFrom = nullptr);
|
||||
|
||||
/** Create a hash-derived span with a remote parent.
|
||||
trace_id = hashData[0:16], parent span_id from protobuf context
|
||||
@@ -460,7 +465,12 @@ public:
|
||||
}
|
||||
|
||||
/** Argument-discarding variant of hashSpan().
 *
 * Takes the same five arguments as the documented hashSpan() declaration
 * (category, name, hash pointer, hash size, optional follows-from
 * context), ignores all of them and returns a default-constructed
 * SpanGuard.
 *
 * NOTE(review): presumably the variant compiled when telemetry is
 * disabled — confirm against the enclosing preprocessor guard.
 */
[[nodiscard]] static SpanGuard
hashSpan(
    TraceCategory,
    std::string_view,
    std::uint8_t const*,
    std::size_t,
    SpanContext const* = nullptr)
{
    return {};
}
|
||||
|
||||
@@ -270,7 +270,8 @@ SpanGuard::hashSpan(
|
||||
TraceCategory const cat,
|
||||
std::string_view const name,
|
||||
std::uint8_t const* const hashData,
|
||||
std::size_t const hashSize)
|
||||
std::size_t const hashSize,
|
||||
SpanContext const* const followsFrom)
|
||||
{
|
||||
if (hashSize < 16)
|
||||
return {};
|
||||
@@ -293,6 +294,25 @@ SpanGuard::hashSpan(
|
||||
opentelemetry::nostd::shared_ptr<otel_trace::Span>(
|
||||
new otel_trace::DefaultSpan(syntheticCtx)));
|
||||
|
||||
if (followsFrom != nullptr && followsFrom->isValid())
|
||||
{
|
||||
auto linkSpan = otel_trace::GetSpan(followsFrom->impl_->ctx);
|
||||
if (linkSpan && linkSpan->GetContext().IsValid())
|
||||
{
|
||||
auto tracer = tel->getTracer("xrpld");
|
||||
otel_trace::StartSpanOptions opts;
|
||||
opts.parent = parentCtx;
|
||||
opts.kind = categoryToSpanKind(cat);
|
||||
return SpanGuard(
|
||||
std::make_unique<Impl>(tracer->StartSpan(
|
||||
std::string(name),
|
||||
{},
|
||||
{{linkSpan->GetContext(),
|
||||
{{std::string(attr::linkType), std::string(attr_val::followsFrom)}}}},
|
||||
opts)));
|
||||
}
|
||||
}
|
||||
|
||||
return SpanGuard(std::make_unique<Impl>(tel->startSpan(std::string(name), parentCtx)));
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include <boost/container/flat_set.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <string_view>
|
||||
|
||||
namespace xrpl::test::csf {
|
||||
|
||||
@@ -616,6 +617,20 @@ struct Peer
|
||||
{
|
||||
}
|
||||
|
||||
// Telemetry hooks — no-ops in the simulator. The generic engine calls
|
||||
// these at every phase transition / outcome resolution so the
|
||||
// production adaptor (RCLConsensus::Adaptor) can record events on the
|
||||
// round span; the simulator runs without telemetry.
|
||||
/// Phase-transition telemetry hook; intentionally empty.
/// Both arguments (event name, phase label) are ignored.
void
onPhaseEvent(std::string_view /*eventName*/, std::string_view /*phaseLabel*/)
{
}
|
||||
|
||||
/// Outcome telemetry hook; intentionally empty.
/// The event name argument is ignored.
void
onOutcomeEvent(std::string_view /*eventName*/)
{
}
|
||||
|
||||
// Share a message by broadcasting to all connected peers
|
||||
template <class M>
|
||||
void
|
||||
|
||||
@@ -349,6 +349,13 @@ RCLConsensus::Adaptor::onClose(
|
||||
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, cs::op::ledgerClose);
|
||||
span.setAttribute(cs::attr::ledgerSeq, static_cast<int64_t>(ledger.ledger_->header().seq) + 1);
|
||||
span.setAttribute(cs::attr::mode, toDisplayString(mode).c_str());
|
||||
span.setAttribute(
|
||||
cs::attr::txCountOpen, static_cast<int64_t>(app_.getOpenLedger().current()->txCount()));
|
||||
span.setAttribute(
|
||||
cs::attr::closeTimeResolutionMs,
|
||||
static_cast<int64_t>(
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(ledger.closeTimeResolution())
|
||||
.count()));
|
||||
|
||||
bool const wrongLCL = mode == ConsensusMode::wrongLedger;
|
||||
bool const proposing = mode == ConsensusMode::proposing;
|
||||
@@ -502,6 +509,15 @@ RCLConsensus::Adaptor::makeAcceptSpan(Result const& result)
|
||||
span->setAttribute(
|
||||
cs::attr::roundTimeMs, static_cast<int64_t>(result.roundTime.read().count()));
|
||||
span->setAttribute(cs::attr::quorum, static_cast<int64_t>(app_.getValidators().quorum()));
|
||||
|
||||
// Capture the accept span's context so createValidationSpan() — which
|
||||
// runs on the jtACCEPT worker thread — can link the validation.send
|
||||
// span to the accept span (matching the design diagram and the
|
||||
// "validation follows acceptance" causal model).
|
||||
if (*span)
|
||||
{
|
||||
acceptSpanContext_ = span->captureContext();
|
||||
}
|
||||
return span;
|
||||
}
|
||||
|
||||
@@ -567,6 +583,8 @@ RCLConsensus::Adaptor::doAccept(
|
||||
static_cast<int64_t>(rawCloseTimes.self.time_since_epoch().count()));
|
||||
doAcceptSpan.setAttribute(
|
||||
cs::attr::closeTimeVoteBins, static_cast<int64_t>(rawCloseTimes.peers.size()));
|
||||
doAcceptSpan.setAttribute(
|
||||
cs::attr::disputesResolvedCount, static_cast<int64_t>(result.disputes.size()));
|
||||
{
|
||||
auto const prevRes = prevLedger.closeTimeResolution();
|
||||
auto const dir = [&]() -> std::string {
|
||||
@@ -908,9 +926,12 @@ RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns,
|
||||
auto valSpan = createValidationSpan();
|
||||
if (valSpan)
|
||||
{
|
||||
valSpan->setAttribute(
|
||||
telemetry::consensus::span::attr::ledgerSeq, static_cast<int64_t>(ledger.seq()));
|
||||
valSpan->setAttribute(telemetry::consensus::span::attr::proposing, proposing);
|
||||
namespace cs = telemetry::consensus::span;
|
||||
valSpan->setAttribute(cs::attr::ledgerSeq, static_cast<int64_t>(ledger.seq()));
|
||||
valSpan->setAttribute(cs::attr::proposing, proposing);
|
||||
// proposing implies a full validation (vfFullValidation is set on
|
||||
// the STValidation only when proposing — see below).
|
||||
valSpan->setAttribute(cs::attr::fullValidation, proposing);
|
||||
}
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -920,6 +941,13 @@ RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns,
|
||||
validationTime = lastValidationTime_ + 1s;
|
||||
lastValidationTime_ = validationTime;
|
||||
|
||||
if (valSpan)
|
||||
{
|
||||
valSpan->setAttribute(
|
||||
telemetry::consensus::span::attr::validationSignTime,
|
||||
static_cast<int64_t>(validationTime.time_since_epoch().count()));
|
||||
}
|
||||
|
||||
if (!validatorKeys_.keys)
|
||||
{
|
||||
JLOG(j_.warn()) << "RCLConsensus::Adaptor::validate: ValidatorKeys "
|
||||
@@ -1191,11 +1219,23 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
|
||||
{
|
||||
namespace cs = telemetry::consensus::span;
|
||||
|
||||
// Capture the prior round's context BEFORE the new span overwrites
|
||||
// roundSpanContext_; used to add a follows-from link from the new
|
||||
// round's span so consecutive rounds remain navigable.
|
||||
prevRoundSpanContext_ = roundSpanContext_;
|
||||
|
||||
// Reset the prior accept context so a stale value can't be used to
|
||||
// link a validation that fires before this round's accept span exists.
|
||||
acceptSpanContext_ = telemetry::SpanContext{};
|
||||
|
||||
if (roundSpan_)
|
||||
roundSpan_.reset();
|
||||
|
||||
auto const& strategy = app_.getTelemetry().getConsensusTraceStrategy();
|
||||
|
||||
telemetry::SpanContext const* const link =
|
||||
prevRoundSpanContext_.isValid() ? &prevRoundSpanContext_ : nullptr;
|
||||
|
||||
if (strategy == "deterministic")
|
||||
{
|
||||
roundSpan_.emplace(
|
||||
@@ -1203,7 +1243,8 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
|
||||
telemetry::TraceCategory::Consensus,
|
||||
cs::round,
|
||||
prevLgr.id().data(),
|
||||
prevLgr.id().bytes));
|
||||
prevLgr.id().bytes,
|
||||
link));
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1220,6 +1261,13 @@ RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
|
||||
roundSpan_->setAttribute(cs::attr::mode, toDisplayString(mode_.load()).c_str());
|
||||
roundSpan_->setAttribute(cs::attr::traceStrategy, strategy.c_str());
|
||||
roundSpan_->setAttribute(cs::attr::roundId, static_cast<int64_t>(prevLgr.seq()) + 1);
|
||||
roundSpan_->setAttribute(cs::attr::previousLedgerSeq, static_cast<int64_t>(prevLgr.seq()));
|
||||
roundSpan_->setAttribute(cs::attr::previousProposers, static_cast<int64_t>(prevProposers_));
|
||||
roundSpan_->setAttribute(
|
||||
cs::attr::previousRoundTimeMs, static_cast<int64_t>(prevRoundTime_.load().count()));
|
||||
roundSpan_->setAttribute(cs::attr::consensusPhase, "open");
|
||||
|
||||
roundSpan_->addEvent(cs::event::phaseOpen);
|
||||
|
||||
roundSpanContext_ = roundSpan_->captureContext();
|
||||
}
|
||||
@@ -1229,12 +1277,51 @@ RCLConsensus::Adaptor::createValidationSpan()
|
||||
{
|
||||
namespace cs = telemetry::consensus::span;
|
||||
|
||||
// Prefer linking to the accept span (matches the design diagram and
|
||||
// the "validation follows acceptance" causal model). Fall back to the
|
||||
// round span only if the accept context isn't yet captured (e.g.
|
||||
// tracing started after onAccept, or makeAcceptSpan returned a null
|
||||
// guard).
|
||||
if (acceptSpanContext_.isValid())
|
||||
{
|
||||
return telemetry::SpanGuard::linkedSpan(cs::validationSend, acceptSpanContext_);
|
||||
}
|
||||
|
||||
if (!roundSpanContext_.isValid())
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return telemetry::SpanGuard::linkedSpan(cs::validationSend, roundSpanContext_);
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::Adaptor::onPhaseEvent(std::string_view eventName, std::string_view phaseLabel)
|
||||
{
|
||||
namespace cs = telemetry::consensus::span;
|
||||
|
||||
if (!roundSpan_ || !*roundSpan_)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
roundSpan_->addEvent(eventName);
|
||||
if (!phaseLabel.empty())
|
||||
{
|
||||
roundSpan_->setAttribute(cs::attr::consensusPhase, phaseLabel);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::Adaptor::onOutcomeEvent(std::string_view eventName)
|
||||
{
|
||||
if (!roundSpan_ || !*roundSpan_)
|
||||
{
|
||||
return;
|
||||
}
|
||||
roundSpan_->addEvent(eventName);
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::startRound(
|
||||
NetClock::time_point const& now,
|
||||
|
||||
@@ -88,6 +88,34 @@ class RCLConsensus
|
||||
*/
|
||||
telemetry::SpanContext roundSpanContext_;
|
||||
|
||||
/** SpanContext of the prior round, captured before roundSpanContext_
|
||||
* is overwritten by the next round's startRoundTracing().
|
||||
*
|
||||
* Used as a follows-from link target on the new round's span so
|
||||
* consecutive rounds (each with its own deterministic trace-id)
|
||||
* remain navigable in trace UIs as a connected sequence.
|
||||
*/
|
||||
telemetry::SpanContext prevRoundSpanContext_;
|
||||
|
||||
/** SpanContext snapshot of the current round's accept span.
|
||||
*
|
||||
* Captured in makeAcceptSpan() and consumed by createValidationSpan()
|
||||
* on the jtACCEPT worker thread so the validation.send span can be
|
||||
* follows-from linked to consensus.accept (matching the design doc
|
||||
* and span hierarchy diagram). Reset on each startRoundTracing()
|
||||
* to prevent a stale prior-round context from being linked.
|
||||
*
|
||||
* Thread safety: same model as roundSpanContext_. The write in
|
||||
* makeAcceptSpan happens on the main consensus thread under
|
||||
* mutex_, and the read in createValidationSpan happens on the
|
||||
* jtACCEPT worker thread that was posted by makeAcceptSpan.
|
||||
* The job queue dispatch establishes a happens-before relation,
|
||||
* so no atomic synchronization is needed. The next reset (in
|
||||
* startRoundTracing) only runs after endConsensus → startRound,
|
||||
* which by Consensus design only fires after doAccept completes.
|
||||
*/
|
||||
telemetry::SpanContext acceptSpanContext_;
|
||||
|
||||
public:
|
||||
using Ledger_t = RCLCxLedger;
|
||||
using NodeID_t = NodeID;
|
||||
@@ -197,6 +225,32 @@ class RCLConsensus
|
||||
std::optional<telemetry::SpanGuard>
|
||||
createValidationSpan();
|
||||
|
||||
/** Record a phase-transition event on the active round span.
|
||||
*
|
||||
* Called from the engine at each phase boundary
|
||||
* (open/establish/accepted/recovery) so the round span carries a
|
||||
* complete timeline of state changes. Also updates the
|
||||
* `consensus_phase` attribute to the current phase name.
|
||||
*
|
||||
* @param eventName Event name (e.g. "phase.establish").
|
||||
* @param phaseLabel String value for the consensus_phase attr
|
||||
* ("open"/"establish"/"accepted"). Empty to skip
|
||||
* the attribute update (e.g. for "recovery").
|
||||
*/
|
||||
void
|
||||
onPhaseEvent(std::string_view eventName, std::string_view phaseLabel);
|
||||
|
||||
/** Record a checkConsensus outcome event on the round span.
|
||||
*
|
||||
* Called from the engine at the establish→accepted transition so
|
||||
* the path that drove acceptance (Yes / MovedOn / Expired) is
|
||||
* queryable from the round-level trace.
|
||||
*
|
||||
* @param eventName Event name (e.g. "outcome.yes").
|
||||
*/
|
||||
void
|
||||
onOutcomeEvent(std::string_view eventName);
|
||||
|
||||
private:
|
||||
//---------------------------------------------------------------------
|
||||
// The following members implement the generic Consensus requirements
|
||||
|
||||
@@ -427,13 +427,28 @@ public:
|
||||
getJson(bool full) const;
|
||||
|
||||
private:
|
||||
/** Why startRoundInternal is being entered.
|
||||
*
|
||||
* Distinguishes the normal Initial entry (from public startRound)
|
||||
* from a Recovered re-entry (from handleWrongLedger after the
|
||||
* correct prior ledger was acquired mid-round). The Recovered path
|
||||
* resets phase to Open within the SAME round, which is a state
|
||||
* transition that would otherwise be invisible in traces — the
|
||||
* recovery flag drives a `phase.recovery` event on the round span.
|
||||
*/
|
||||
enum class StartRoundReason : std::uint8_t {
    /// Normal entry; also the default value of startRoundInternal's
    /// `reason` parameter.
    Initial,
    /// Re-entry requested by handleWrongLedger.
    Recovered,
};
|
||||
|
||||
void
|
||||
startRoundInternal(
|
||||
NetClock::time_point const& now,
|
||||
typename Ledger_t::ID const& prevLedgerID,
|
||||
Ledger_t const& prevLedger,
|
||||
ConsensusMode mode,
|
||||
std::unique_ptr<std::stringstream> const& clog);
|
||||
std::unique_ptr<std::stringstream> const& clog,
|
||||
StartRoundReason reason = StartRoundReason::Initial);
|
||||
|
||||
// Change our view of the previous ledger
|
||||
void
|
||||
@@ -693,8 +708,21 @@ Consensus<Adaptor>::startRoundInternal(
|
||||
typename Ledger_t::ID const& prevLedgerID,
|
||||
Ledger_t const& prevLedger,
|
||||
ConsensusMode mode,
|
||||
std::unique_ptr<std::stringstream> const& clog)
|
||||
std::unique_ptr<std::stringstream> const& clog,
|
||||
StartRoundReason const reason)
|
||||
{
|
||||
// Recovery path: handleWrongLedger acquired the correct prior ledger
// and re-entered startRoundInternal mid-round. The roundSpan_ owned by
// the adaptor is still the SAME span as before — startRoundTracing is
// not called on the recovery path, so we record the recovery as an
// event on the surviving round span. Pass an empty phaseLabel to leave
// consensus_phase unchanged here; the reset to open is marked separately
// below, by the new openSpan_ and by the phase.open event fired with the
// "open" label.
|
||||
if (reason == StartRoundReason::Recovered)
|
||||
{
|
||||
adaptor_.onPhaseEvent(telemetry::consensus::span::event::phaseRecovery, "");
|
||||
}
|
||||
|
||||
phase_ = ConsensusPhase::open;
|
||||
JLOG(j_.debug()) << "transitioned to ConsensusPhase::open ";
|
||||
CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::open, "
|
||||
@@ -709,6 +737,15 @@ Consensus<Adaptor>::startRoundInternal(
|
||||
telemetry::TraceCategory::Consensus,
|
||||
telemetry::seg::consensus,
|
||||
telemetry::consensus::span::op::phaseOpen));
|
||||
// On the Recovered path, fire phase.open here because startRoundTracing
// (which fires it for the Initial path) is not called on re-entry. On
// the Initial path the guard below skips this call: the new round span
// does not exist yet, and startRoundTracing fires phase.open itself once
// that span is in place.
|
||||
if (reason == StartRoundReason::Recovered)
|
||||
{
|
||||
adaptor_.onPhaseEvent(telemetry::consensus::span::event::phaseOpen, "open");
|
||||
}
|
||||
mode_.set(mode, adaptor_);
|
||||
now_ = now;
|
||||
prevLedgerID_ = prevLedgerID;
|
||||
@@ -957,6 +994,7 @@ Consensus<Adaptor>::simulate(
|
||||
result_->proposers = prevProposers_ = currPeerPositions_.size();
|
||||
prevRoundTime_ = result_->roundTime.read();
|
||||
phase_ = ConsensusPhase::accepted;
|
||||
adaptor_.onPhaseEvent(telemetry::consensus::span::event::phaseAccepted, "accepted");
|
||||
adaptor_.onForceAccept(
|
||||
*result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_.get(), getJson(true));
|
||||
// NOLINTEND(bugprone-unchecked-optional-access)
|
||||
@@ -1105,7 +1143,13 @@ Consensus<Adaptor>::handleWrongLedger(
|
||||
{
|
||||
JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
|
||||
CLOG(clog) << "Have the consensus ledger " << prevLedgerID_ << ". ";
|
||||
startRoundInternal(now_, lgrId, *newLedger, ConsensusMode::switchedLedger, clog);
|
||||
startRoundInternal(
|
||||
now_,
|
||||
lgrId,
|
||||
*newLedger,
|
||||
ConsensusMode::switchedLedger,
|
||||
clog,
|
||||
StartRoundReason::Recovered);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1414,7 +1458,23 @@ Consensus<Adaptor>::phaseEstablish(std::unique_ptr<std::stringstream> const& clo
|
||||
prevProposers_ = currPeerPositions_.size();
|
||||
prevRoundTime_ = result_->roundTime.read();
|
||||
endEstablishTracing();
|
||||
{
|
||||
namespace cs = telemetry::consensus::span;
|
||||
if (result_->state == ConsensusState::Yes)
|
||||
{
|
||||
adaptor_.onOutcomeEvent(cs::event::outcomeYes);
|
||||
}
|
||||
else if (result_->state == ConsensusState::MovedOn)
|
||||
{
|
||||
adaptor_.onOutcomeEvent(cs::event::outcomeMovedOn);
|
||||
}
|
||||
else if (result_->state == ConsensusState::Expired)
|
||||
{
|
||||
adaptor_.onOutcomeEvent(cs::event::outcomeExpired);
|
||||
}
|
||||
}
|
||||
phase_ = ConsensusPhase::accepted;
|
||||
adaptor_.onPhaseEvent(telemetry::consensus::span::event::phaseAccepted, "accepted");
|
||||
JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
|
||||
adaptor_.onAccept(
|
||||
*result_,
|
||||
@@ -1434,8 +1494,21 @@ Consensus<Adaptor>::closeLedger(std::unique_ptr<std::stringstream> const& clog)
|
||||
// We should not be closing if we already have a position
|
||||
XRPL_ASSERT(!result_, "xrpl::Consensus::closeLedger : result is not set");
|
||||
|
||||
// Annotate the open-phase span with end-of-phase metadata before
|
||||
// ending it, so a Tempo/Jaeger query for consensus.phase.open shows
|
||||
// how long the phase ran and how many peer positions arrived during it.
|
||||
openTime_.tick(clock_.now());
|
||||
if (openSpan_ && *openSpan_)
|
||||
{
|
||||
namespace cs = telemetry::consensus::span;
|
||||
openSpan_->setAttribute(
|
||||
cs::attr::openDurationMs, static_cast<int64_t>(openTime_.read().count()));
|
||||
openSpan_->setAttribute(
|
||||
cs::attr::peerPositionsAtClose, static_cast<int64_t>(currPeerPositions_.size()));
|
||||
}
|
||||
openSpan_.reset();
|
||||
phase_ = ConsensusPhase::establish;
|
||||
adaptor_.onPhaseEvent(telemetry::consensus::span::event::phaseEstablish, "establish");
|
||||
JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
|
||||
rawCloseTimes_.self = now_;
|
||||
peerUnchangedCounter_ = 0;
|
||||
@@ -1807,6 +1880,11 @@ Consensus<Adaptor>::haveConsensus(std::unique_ptr<std::stringstream> const& clog
|
||||
span.setAttribute(
|
||||
consensus::span::attr::thresholdPercent,
|
||||
static_cast<int64_t>(adaptor_.parms().avCT_CONSENSUS_PCT));
|
||||
span.setAttribute(
|
||||
consensus::span::attr::proposersFinished, static_cast<int64_t>(currentFinished));
|
||||
span.setAttribute(consensus::span::attr::consensusStalled, stalled);
|
||||
span.setAttribute(
|
||||
consensus::span::attr::establishCounter, static_cast<int64_t>(establishCounter_));
|
||||
|
||||
char const* stateStr = "no";
|
||||
if (result_->state == ConsensusState::Yes)
|
||||
@@ -1963,13 +2041,16 @@ Consensus<Adaptor>::updateEstablishTracing()
|
||||
{
|
||||
if (!establishSpan_)
|
||||
return;
|
||||
namespace cs = telemetry::consensus::span;
|
||||
establishSpan_->setAttribute(cs::attr::convergePercent, static_cast<int64_t>(convergePercent_));
|
||||
establishSpan_->setAttribute(cs::attr::establishCount, static_cast<int64_t>(establishCounter_));
|
||||
establishSpan_->setAttribute(
|
||||
telemetry::consensus::span::attr::convergePercent, static_cast<int64_t>(convergePercent_));
|
||||
establishSpan_->setAttribute(
|
||||
telemetry::consensus::span::attr::establishCount, static_cast<int64_t>(establishCounter_));
|
||||
establishSpan_->setAttribute(
|
||||
telemetry::consensus::span::attr::proposers,
|
||||
static_cast<int64_t>(currPeerPositions_.size()));
|
||||
cs::attr::proposers, static_cast<int64_t>(currPeerPositions_.size()));
|
||||
if (result_)
|
||||
{
|
||||
establishSpan_->setAttribute(
|
||||
cs::attr::disputesCount, static_cast<int64_t>(result_->disputes.size()));
|
||||
}
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
|
||||
@@ -137,11 +137,37 @@ inline constexpr auto ledgerId = makeStr("consensus_ledger_id");
|
||||
inline constexpr auto mode = makeStr("consensus_mode");
|
||||
inline constexpr auto round = makeStr("consensus_round");
|
||||
inline constexpr auto roundId = makeStr("consensus_round_id");
|
||||
/// Current phase name attached to consensus.round; updated on each
|
||||
/// phase transition event (open/establish/accepted).
|
||||
inline constexpr auto consensusPhase = makeStr("consensus_phase");
|
||||
/// Boolean flag set on consensus.check when checkConsensus reports stalled.
|
||||
inline constexpr auto consensusStalled = makeStr("consensus_stalled");
|
||||
|
||||
/// Domain-owned bare attrs.
|
||||
inline constexpr auto proposers = makeStr("proposers");
|
||||
inline constexpr auto roundTimeMs = makeStr("round_time_ms");
|
||||
inline constexpr auto proposing = makeStr("proposing");
|
||||
/// Round continuity / context attrs (set on consensus.round at round start).
|
||||
inline constexpr auto previousProposers = makeStr("previous_proposers");
|
||||
inline constexpr auto previousRoundTimeMs = makeStr("previous_round_time_ms");
|
||||
inline constexpr auto previousLedgerSeq = makeStr("previous_ledger_seq");
|
||||
inline constexpr auto closeTimeResolutionMs = makeStr("close_time_resolution_ms");
|
||||
/// Open-phase end metadata (set on consensus.phase.open before reset).
|
||||
inline constexpr auto openDurationMs = makeStr("open_duration_ms");
|
||||
inline constexpr auto peerPositionsAtClose = makeStr("peer_positions_at_close");
|
||||
/// Ledger-close inputs.
|
||||
inline constexpr auto txCountOpen = makeStr("tx_count_open");
|
||||
/// Establish/check additional state.
|
||||
inline constexpr auto proposersFinished = makeStr("proposers_finished");
|
||||
inline constexpr auto establishCounter = makeStr("establish_counter");
|
||||
/// Accept/apply enrichment.
|
||||
inline constexpr auto disputesResolvedCount = makeStr("disputes_resolved_count");
|
||||
/// Validation send/receive enrichment.
|
||||
inline constexpr auto fullValidation = makeStr("full_validation");
|
||||
inline constexpr auto validationSignTime = makeStr("validation_sign_time");
|
||||
/// Receive-side hash prefixes for cross-peer correlation.
|
||||
inline constexpr auto prevLedgerPrefix = makeStr("prev_ledger_prefix");
|
||||
inline constexpr auto positionHashPrefix = makeStr("position_hash_prefix");
|
||||
/// "consensus_state" — domain-qualified (collides with other domains' state).
|
||||
inline constexpr auto consensusState = makeStr("consensus_state");
|
||||
inline constexpr auto parentCloseTime = makeStr("parent_close_time");
|
||||
@@ -180,6 +206,20 @@ namespace event {
|
||||
inline constexpr auto disputeResolve = join(makeStr("dispute"), makeStr("resolve"));
|
||||
/// "tx.included"
|
||||
inline constexpr auto txIncluded = join(makeStr("tx"), makeStr("included"));
|
||||
|
||||
/// Phase transition events — fired on consensus.round at each transition
|
||||
/// so the round-level span carries a complete timeline of phase changes,
|
||||
/// including the handleWrongLedger recovery edge that re-enters Open.
|
||||
inline constexpr auto phaseOpen = join(makeStr("phase"), makeStr("open"));
|
||||
inline constexpr auto phaseEstablish = join(makeStr("phase"), makeStr("establish"));
|
||||
inline constexpr auto phaseAccepted = join(makeStr("phase"), makeStr("accepted"));
|
||||
inline constexpr auto phaseRecovery = join(makeStr("phase"), makeStr("recovery"));
|
||||
|
||||
/// Outcome events — fired on consensus.round at the establish→accepted
|
||||
/// transition so the path that drove acceptance is queryable.
|
||||
inline constexpr auto outcomeYes = join(makeStr("outcome"), makeStr("yes"));
|
||||
inline constexpr auto outcomeMovedOn = join(makeStr("outcome"), makeStr("moved_on"));
|
||||
inline constexpr auto outcomeExpired = join(makeStr("outcome"), makeStr("expired"));
|
||||
} // namespace event
|
||||
|
||||
// ===== Attribute values ======================================================
|
||||
|
||||
@@ -1982,6 +1982,15 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
span->setAttribute(telemetry::consensus::span::attr::trusted, isTrusted);
|
||||
span->setAttribute(
|
||||
telemetry::consensus::span::attr::round, static_cast<int64_t>(set.proposeseq()));
|
||||
// First 16 hex chars (8 bytes) of each hash — enough to disambiguate
|
||||
// peer positions and prior ledgers without exporting full 32-byte
|
||||
// hashes on every receive event.
|
||||
span->setAttribute(
|
||||
telemetry::consensus::span::attr::prevLedgerPrefix,
|
||||
to_string(prevLedger).substr(0, 16).c_str());
|
||||
span->setAttribute(
|
||||
telemetry::consensus::span::attr::positionHashPrefix,
|
||||
to_string(proposeHash).substr(0, 16).c_str());
|
||||
|
||||
std::weak_ptr<PeerImp> const weak = shared_from_this();
|
||||
app_.getJobQueue().addJob(
|
||||
@@ -2572,6 +2581,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
telemetry::consensus::span::attr::ledgerSeq,
|
||||
static_cast<int64_t>(val->getFieldU32(sfLedgerSequence)));
|
||||
}
|
||||
span->setAttribute(telemetry::consensus::span::attr::fullValidation, val->isFull());
|
||||
span->setAttribute(
|
||||
telemetry::consensus::span::attr::validationSignTime,
|
||||
static_cast<int64_t>(val->getSignTime().time_since_epoch().count()));
|
||||
|
||||
if (!isTrusted && (tracking_.load() == Tracking::diverged))
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user