mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
1295 lines
45 KiB
C++
1295 lines
45 KiB
C++
#include <xrpld/app/consensus/RCLConsensus.h>
|
|
#include <xrpld/app/consensus/RCLValidations.h>
|
|
#include <xrpld/app/ledger/BuildLedger.h>
|
|
#include <xrpld/app/ledger/InboundLedgers.h>
|
|
#include <xrpld/app/ledger/InboundTransactions.h>
|
|
#include <xrpld/app/ledger/LedgerMaster.h>
|
|
#include <xrpld/app/ledger/LocalTxs.h>
|
|
#include <xrpld/app/ledger/OpenLedger.h>
|
|
#include <xrpld/app/misc/NegativeUNLVote.h>
|
|
#include <xrpld/app/misc/TxQ.h>
|
|
#include <xrpld/app/misc/ValidatorKeys.h>
|
|
#include <xrpld/app/misc/ValidatorList.h>
|
|
#include <xrpld/overlay/Overlay.h>
|
|
#include <xrpld/overlay/predicates.h>
|
|
#include <xrpld/telemetry/TracingInstrumentation.h>
|
|
|
|
#include <xrpl/basics/random.h>
|
|
#ifdef XRPL_ENABLE_TELEMETRY
|
|
#include <xrpl/crypto/csprng.h>
|
|
#include <xrpl/telemetry/SpanGuard.h>
|
|
|
|
#include <opentelemetry/trace/context.h>
|
|
#include <opentelemetry/trace/default_span.h>
|
|
#include <opentelemetry/trace/span_context.h>
|
|
#include <opentelemetry/trace/trace_flags.h>
|
|
#include <opentelemetry/trace/trace_id.h>
|
|
#endif
|
|
#include <xrpl/beast/core/LexicalCast.h>
|
|
#include <xrpl/beast/utility/instrumentation.h>
|
|
#include <xrpl/core/HashRouter.h>
|
|
#include <xrpl/ledger/AmendmentTable.h>
|
|
#include <xrpl/ledger/Ledger.h>
|
|
#include <xrpl/ledger/LedgerTiming.h>
|
|
#include <xrpl/protocol/BuildInfo.h>
|
|
#include <xrpl/protocol/Feature.h>
|
|
#include <xrpl/protocol/digest.h>
|
|
#include <xrpl/server/LoadFeeTrack.h>
|
|
#include <xrpl/server/NetworkOPs.h>
|
|
|
|
#include <algorithm>
|
|
#include <iomanip>
|
|
#include <mutex>
|
|
|
|
namespace xrpl {
|
|
|
|
#ifdef XRPL_ENABLE_TELEMETRY
|
|
namespace {
|
|
|
|
/** Create an OTel context with a deterministic trace ID.
|
|
*
|
|
* Derives the trace_id from the first 16 bytes of a uint256 ledger hash
|
|
* so that all validators participating in the same consensus round
|
|
* produce spans sharing the same trace_id. This enables cross-node
|
|
* trace correlation in the backend without requiring explicit context
|
|
* propagation over the peer protocol.
|
|
*
|
|
* The span_id is randomly generated (8 bytes from the CSPRNG) so each
|
|
* validator's root span is unique within the shared trace.
|
|
*
|
|
* @param ledgerId The previousLedger.id() hash for the consensus round.
|
|
* @return An OTel Context containing a synthetic parent span with the
|
|
* deterministic trace_id and a random span_id.
|
|
*/
|
|
opentelemetry::context::Context
|
|
createDeterministicContext(uint256 const& ledgerId)
|
|
{
|
|
namespace trace = opentelemetry::trace;
|
|
|
|
// Use first 16 bytes of the 256-bit ledger hash as trace ID.
|
|
// uint256::data() returns a const uint8_t* to 32 bytes in
|
|
// big-endian order; the first 16 are the most-significant half.
|
|
trace::TraceId traceId(opentelemetry::nostd::span<uint8_t const, 16>(ledgerId.data(), 16));
|
|
|
|
// Generate a random 8-byte span ID using the crypto PRNG.
|
|
uint8_t spanIdBytes[8];
|
|
crypto_prng()(spanIdBytes, sizeof(spanIdBytes));
|
|
trace::SpanId spanId(opentelemetry::nostd::span<uint8_t const, 8>(spanIdBytes, 8));
|
|
|
|
// Build a synthetic SpanContext that is sampled (flag 0x01)
|
|
// and not remote (originated locally).
|
|
trace::SpanContext syntheticCtx(
|
|
traceId,
|
|
spanId,
|
|
trace::TraceFlags(1),
|
|
/* remote = */ false);
|
|
|
|
// Wrap in a DefaultSpan and set on an empty Context via the
|
|
// standard kSpanKey used by the OTel SDK for context propagation.
|
|
return opentelemetry::context::Context{}.SetValue(
|
|
trace::kSpanKey,
|
|
opentelemetry::nostd::shared_ptr<trace::Span>(new trace::DefaultSpan(syntheticCtx)));
|
|
}
|
|
|
|
} // namespace
|
|
#endif // XRPL_ENABLE_TELEMETRY
|
|
|
|
RCLConsensus::RCLConsensus(
|
|
Application& app,
|
|
std::unique_ptr<FeeVote>&& feeVote,
|
|
LedgerMaster& ledgerMaster,
|
|
LocalTxs& localTxs,
|
|
InboundTransactions& inboundTransactions,
|
|
Consensus<Adaptor>::clock_type const& clock,
|
|
ValidatorKeys const& validatorKeys,
|
|
beast::Journal journal)
|
|
: adaptor_(
|
|
app,
|
|
std::move(feeVote),
|
|
ledgerMaster,
|
|
localTxs,
|
|
inboundTransactions,
|
|
validatorKeys,
|
|
journal)
|
|
, consensus_(clock, adaptor_, journal)
|
|
, j_(journal)
|
|
{
|
|
}
|
|
|
|
RCLConsensus::Adaptor::Adaptor(
|
|
Application& app,
|
|
std::unique_ptr<FeeVote>&& feeVote,
|
|
LedgerMaster& ledgerMaster,
|
|
LocalTxs& localTxs,
|
|
InboundTransactions& inboundTransactions,
|
|
ValidatorKeys const& validatorKeys,
|
|
beast::Journal journal)
|
|
: app_(app)
|
|
, feeVote_(std::move(feeVote))
|
|
, ledgerMaster_(ledgerMaster)
|
|
, localTxs_(localTxs)
|
|
, inboundTransactions_{inboundTransactions}
|
|
, j_(journal)
|
|
, validatorKeys_(validatorKeys)
|
|
, valCookie_(1 + rand_int(crypto_prng(), std::numeric_limits<std::uint64_t>::max() - 1))
|
|
, nUnlVote_(validatorKeys_.nodeID, j_)
|
|
{
|
|
XRPL_ASSERT(valCookie_, "xrpl::RCLConsensus::Adaptor::Adaptor : nonzero cookie");
|
|
|
|
JLOG(j_.info()) << "Consensus engine started (cookie: " + std::to_string(valCookie_) + ")";
|
|
|
|
if (validatorKeys_.nodeID != beast::zero && validatorKeys_.keys)
|
|
{
|
|
JLOG(j_.info()) << "Validator identity: "
|
|
<< toBase58(TokenType::NodePublic, validatorKeys_.keys->masterPublicKey);
|
|
|
|
if (validatorKeys_.keys->masterPublicKey != validatorKeys_.keys->publicKey)
|
|
{
|
|
JLOG(j_.debug()) << "Validator ephemeral signing key: "
|
|
<< toBase58(TokenType::NodePublic, validatorKeys_.keys->publicKey)
|
|
<< " (seq: " << std::to_string(validatorKeys_.sequence) << ")";
|
|
}
|
|
}
|
|
}
|
|
|
|
std::optional<RCLCxLedger>
|
|
RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
|
|
{
|
|
// we need to switch the ledger we're working from
|
|
auto built = ledgerMaster_.getLedgerByHash(hash);
|
|
if (!built)
|
|
{
|
|
if (acquiringLedger_ != hash)
|
|
{
|
|
// need to start acquiring the correct consensus LCL
|
|
JLOG(j_.warn()) << "Need consensus ledger " << hash;
|
|
|
|
// Tell the ledger acquire system that we need the consensus ledger
|
|
acquiringLedger_ = hash;
|
|
|
|
app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
|
|
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
|
|
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
|
|
});
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
|
|
XRPL_ASSERT(
|
|
!built->open() && built->isImmutable(),
|
|
"xrpl::RCLConsensus::Adaptor::acquireLedger : valid ledger state");
|
|
XRPL_ASSERT(
|
|
built->header().hash == hash,
|
|
"xrpl::RCLConsensus::Adaptor::acquireLedger : ledger hash match");
|
|
|
|
// Notify inbound transactions of the new ledger sequence number
|
|
inboundTransactions_.newRound(built->header().seq);
|
|
|
|
return RCLCxLedger(built);
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
|
|
{
|
|
protocol::TMProposeSet prop;
|
|
|
|
auto const& proposal = peerPos.proposal();
|
|
|
|
prop.set_proposeseq(proposal.proposeSeq());
|
|
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
|
|
|
|
prop.set_currenttxhash(proposal.position().begin(), proposal.position().size());
|
|
prop.set_previousledger(proposal.prevLedger().begin(), proposal.position().size());
|
|
|
|
auto const pk = peerPos.publicKey().slice();
|
|
prop.set_nodepubkey(pk.data(), pk.size());
|
|
|
|
auto const sig = peerPos.signature();
|
|
prop.set_signature(sig.data(), sig.size());
|
|
|
|
app_.getOverlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::share(RCLCxTx const& tx)
|
|
{
|
|
// If we didn't relay this transaction recently, relay it to all peers
|
|
if (app_.getHashRouter().shouldRelay(tx.id()))
|
|
{
|
|
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
|
|
auto const slice = tx.tx_->slice();
|
|
protocol::TMTransaction msg;
|
|
msg.set_rawtransaction(slice.data(), slice.size());
|
|
msg.set_status(protocol::tsNEW);
|
|
msg.set_receivetimestamp(app_.getTimeKeeper().now().time_since_epoch().count());
|
|
static std::set<Peer::id_t> skip{};
|
|
app_.getOverlay().relay(tx.id(), msg, skip);
|
|
}
|
|
else
|
|
{
|
|
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
|
|
}
|
|
}
|
|
void
|
|
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
|
|
{
|
|
XRPL_TRACE_CONSENSUS(app_.getTelemetry(), "consensus.proposal.send");
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.round", static_cast<int64_t>(proposal.proposeSeq()));
|
|
|
|
JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
|
|
<< xrpl::to_string(proposal.prevLedger()) << " -> "
|
|
<< xrpl::to_string(proposal.position());
|
|
|
|
protocol::TMProposeSet prop;
|
|
|
|
prop.set_currenttxhash(proposal.position().begin(), proposal.position().size());
|
|
prop.set_previousledger(proposal.prevLedger().begin(), proposal.prevLedger().size());
|
|
prop.set_proposeseq(proposal.proposeSeq());
|
|
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
|
|
|
|
if (!validatorKeys_.keys)
|
|
{
|
|
JLOG(j_.warn()) << "RCLConsensus::Adaptor::propose: ValidatorKeys "
|
|
"not set: \n";
|
|
return;
|
|
}
|
|
|
|
auto const& keys = *validatorKeys_.keys;
|
|
|
|
prop.set_nodepubkey(keys.publicKey.data(), keys.publicKey.size());
|
|
|
|
auto sig = signDigest(keys.publicKey, keys.secretKey, proposal.signingHash());
|
|
|
|
prop.set_signature(sig.data(), sig.size());
|
|
|
|
auto const suppression = proposalUniqueId(
|
|
proposal.position(),
|
|
proposal.prevLedger(),
|
|
proposal.proposeSeq(),
|
|
proposal.closeTime(),
|
|
keys.publicKey,
|
|
sig);
|
|
|
|
app_.getHashRouter().addSuppression(suppression);
|
|
|
|
app_.getOverlay().broadcast(prop);
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::share(RCLTxSet const& txns)
|
|
{
|
|
inboundTransactions_.giveSet(txns.id(), txns.map_, false);
|
|
}
|
|
|
|
std::optional<RCLTxSet>
|
|
RCLConsensus::Adaptor::acquireTxSet(RCLTxSet::ID const& setId)
|
|
{
|
|
if (auto txns = inboundTransactions_.getSet(setId, true))
|
|
{
|
|
return RCLTxSet{std::move(txns)};
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
|
|
bool
|
|
RCLConsensus::Adaptor::hasOpenTransactions() const
|
|
{
|
|
return !app_.getOpenLedger().empty();
|
|
}
|
|
|
|
std::size_t
|
|
RCLConsensus::Adaptor::proposersValidated(LedgerHash const& h) const
|
|
{
|
|
return app_.getValidations().numTrustedForLedger(h);
|
|
}
|
|
|
|
std::size_t
|
|
RCLConsensus::Adaptor::proposersFinished(RCLCxLedger const& ledger, LedgerHash const& h) const
|
|
{
|
|
RCLValidations& vals = app_.getValidations();
|
|
return vals.getNodesAfter(RCLValidatedLedger(ledger.ledger_, vals.adaptor().journal()), h);
|
|
}
|
|
|
|
uint256
|
|
RCLConsensus::Adaptor::getPrevLedger(
|
|
uint256 ledgerID,
|
|
RCLCxLedger const& ledger,
|
|
ConsensusMode mode)
|
|
{
|
|
RCLValidations& vals = app_.getValidations();
|
|
uint256 netLgr = vals.getPreferred(
|
|
RCLValidatedLedger{ledger.ledger_, vals.adaptor().journal()},
|
|
ledgerMaster_.getValidLedgerIndex());
|
|
|
|
if (netLgr != ledgerID)
|
|
{
|
|
if (mode != ConsensusMode::wrongLedger)
|
|
app_.getOPs().consensusViewChange();
|
|
|
|
JLOG(j_.debug()) << Json::Compact(app_.getValidations().getJsonTrie());
|
|
}
|
|
|
|
return netLgr;
|
|
}
|
|
|
|
auto
|
|
RCLConsensus::Adaptor::onClose(
|
|
RCLCxLedger const& ledger,
|
|
NetClock::time_point const& closeTime,
|
|
ConsensusMode mode) -> Result
|
|
{
|
|
XRPL_TRACE_CONSENSUS(app_.getTelemetry(), "consensus.ledger_close");
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.ledger.seq", static_cast<int64_t>(ledger.ledger_->header().seq + 1));
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.mode", toDisplayString(mode).c_str());
|
|
|
|
bool const wrongLCL = mode == ConsensusMode::wrongLedger;
|
|
bool const proposing = mode == ConsensusMode::proposing;
|
|
|
|
notify(protocol::neCLOSING_LEDGER, ledger, !wrongLCL);
|
|
|
|
auto const& prevLedger = ledger.ledger_;
|
|
|
|
ledgerMaster_.applyHeldTransactions();
|
|
// Tell the ledger master not to acquire the ledger we're probably building
|
|
ledgerMaster_.setBuildingLedger(prevLedger->header().seq + 1);
|
|
|
|
auto initialLedger = app_.getOpenLedger().current();
|
|
|
|
auto initialSet = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, app_.getNodeFamily());
|
|
initialSet->setUnbacked();
|
|
|
|
// Build SHAMap containing all transactions in our open ledger
|
|
for (auto const& tx : initialLedger->txs)
|
|
{
|
|
JLOG(j_.trace()) << "Adding open ledger TX " << tx.first->getTransactionID();
|
|
Serializer s(2048);
|
|
tx.first->add(s);
|
|
initialSet->addItem(
|
|
SHAMapNodeType::tnTRANSACTION_NM,
|
|
make_shamapitem(tx.first->getTransactionID(), s.slice()));
|
|
}
|
|
|
|
// Add pseudo-transactions to the set
|
|
if (app_.config().standalone() || (proposing && !wrongLCL))
|
|
{
|
|
if (prevLedger->isFlagLedger())
|
|
{
|
|
// previous ledger was flag ledger, add fee and amendment
|
|
// pseudo-transactions
|
|
auto validations =
|
|
app_.getValidators().negativeUNLFilter(app_.getValidations().getTrustedForLedger(
|
|
prevLedger->header().parentHash, prevLedger->seq() - 1));
|
|
if (validations.size() >= app_.getValidators().quorum())
|
|
{
|
|
feeVote_->doVoting(prevLedger, validations, initialSet);
|
|
app_.getAmendmentTable().doVoting(prevLedger, validations, initialSet, j_);
|
|
}
|
|
}
|
|
else if (prevLedger->isVotingLedger())
|
|
{
|
|
// previous ledger was a voting ledger,
|
|
// so the current consensus session is for a flag ledger,
|
|
// add negative UNL pseudo-transactions
|
|
nUnlVote_.doVoting(
|
|
prevLedger,
|
|
app_.getValidators().getTrustedMasterKeys(),
|
|
app_.getValidations(),
|
|
initialSet);
|
|
}
|
|
}
|
|
|
|
// Now we need an immutable snapshot
|
|
initialSet = initialSet->snapShot(false);
|
|
|
|
if (!wrongLCL)
|
|
{
|
|
LedgerIndex const seq = prevLedger->header().seq + 1;
|
|
RCLCensorshipDetector<TxID, LedgerIndex>::TxIDSeqVec proposed;
|
|
|
|
initialSet->visitLeaves(
|
|
[&proposed, seq](boost::intrusive_ptr<SHAMapItem const> const& item) {
|
|
proposed.emplace_back(item->key(), seq);
|
|
});
|
|
|
|
censorshipDetector_.propose(std::move(proposed));
|
|
}
|
|
|
|
// Needed because of the move below.
|
|
auto const setHash = initialSet->getHash().as_uint256();
|
|
|
|
return Result{
|
|
std::move(initialSet),
|
|
RCLCxPeerPos::Proposal{
|
|
initialLedger->header().parentHash,
|
|
RCLCxPeerPos::Proposal::seqJoin,
|
|
setHash,
|
|
closeTime,
|
|
app_.getTimeKeeper().closeTime(),
|
|
validatorKeys_.nodeID}};
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::onForceAccept(
|
|
Result const& result,
|
|
RCLCxLedger const& prevLedger,
|
|
NetClock::duration const& closeResolution,
|
|
ConsensusCloseTimes const& rawCloseTimes,
|
|
ConsensusMode const& mode,
|
|
Json::Value&& consensusJson)
|
|
{
|
|
doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(consensusJson));
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::onAccept(
|
|
Result const& result,
|
|
RCLCxLedger const& prevLedger,
|
|
NetClock::duration const& closeResolution,
|
|
ConsensusCloseTimes const& rawCloseTimes,
|
|
ConsensusMode const& mode,
|
|
Json::Value&& consensusJson,
|
|
bool const validating)
|
|
{
|
|
XRPL_TRACE_CONSENSUS(app_.getTelemetry(), "consensus.accept");
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.proposers", static_cast<int64_t>(result.proposers));
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.round_time_ms", static_cast<int64_t>(result.roundTime.read().count()));
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.validation_quorum", static_cast<int64_t>(app_.getValidators().quorum()));
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.proposers_validated", static_cast<int64_t>(result.proposers));
|
|
|
|
app_.getJobQueue().addJob(
|
|
jtACCEPT,
|
|
"AcceptLedger",
|
|
// NOLINTNEXTLINE(cppcoreguidelines-misleading-capture-default-by-value)
|
|
[=, this, cj = std::move(consensusJson)]() mutable {
|
|
// Note that no lock is held or acquired during this job.
|
|
// This is because generic Consensus guarantees that once a ledger
|
|
// is accepted, the consensus results and capture by reference state
|
|
// will not change until startRound is called (which happens via
|
|
// endConsensus).
|
|
RclConsensusLogger clog("onAccept", validating, j_);
|
|
this->doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode, std::move(cj));
|
|
this->app_.getOPs().endConsensus(clog.ss());
|
|
});
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::doAccept(
|
|
Result const& result,
|
|
RCLCxLedger const& prevLedger,
|
|
NetClock::duration closeResolution,
|
|
ConsensusCloseTimes const& rawCloseTimes,
|
|
ConsensusMode const& mode,
|
|
Json::Value&& consensusJson)
|
|
{
|
|
prevProposers_ = result.proposers;
|
|
prevRoundTime_ = result.roundTime.read();
|
|
|
|
bool closeTimeCorrect = false;
|
|
|
|
bool const proposing = mode == ConsensusMode::proposing;
|
|
bool const haveCorrectLCL = mode != ConsensusMode::wrongLedger;
|
|
bool const consensusFail = result.state == ConsensusState::MovedOn;
|
|
|
|
auto consensusCloseTime = result.position.closeTime();
|
|
|
|
if (consensusCloseTime == NetClock::time_point{})
|
|
{
|
|
// We agreed to disagree on the close time
|
|
using namespace std::chrono_literals;
|
|
consensusCloseTime = prevLedger.closeTime() + 1s;
|
|
closeTimeCorrect = false;
|
|
}
|
|
else
|
|
{
|
|
// We agreed on a close time
|
|
consensusCloseTime =
|
|
effCloseTime(consensusCloseTime, closeResolution, prevLedger.closeTime());
|
|
closeTimeCorrect = true;
|
|
}
|
|
|
|
/// @note This method runs on a JobQueue worker thread (jtACCEPT), not the
|
|
/// consensus thread where roundSpan_ is active. OTel's thread-local
|
|
/// context propagation does NOT cross thread boundaries, so the
|
|
/// consensus.accept.apply span below is standalone — it is NOT a child
|
|
/// of consensus.round. Cross-thread context propagation for this path
|
|
/// is a future enhancement (Phase 4b).
|
|
|
|
// Trace the ledger application phase with close time details.
|
|
// This span runs on the jtACCEPT job queue thread (posted by onAccept),
|
|
// separate from the consensus.accept span which fires synchronously in
|
|
// onAccept. It captures the agreed-upon close time, whether validators
|
|
// converged on it (per avCT_CONSENSUS_PCT), the consensus outcome,
|
|
// parent close time, this node's own close time proposal, the number
|
|
// of distinct vote bins, and the resolution adaptation direction.
|
|
XRPL_TRACE_CONSENSUS(app_.getTelemetry(), "consensus.accept.apply");
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.ledger.seq", static_cast<int64_t>(prevLedger.seq() + 1));
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.close_time",
|
|
static_cast<int64_t>(consensusCloseTime.time_since_epoch().count()));
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.close_time_correct", closeTimeCorrect);
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.close_resolution_ms",
|
|
static_cast<int64_t>(
|
|
std::chrono::duration_cast<std::chrono::milliseconds>(closeResolution).count()));
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.state", std::string(consensusFail ? "moved_on" : "finished"));
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.proposing", proposing);
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.round_time_ms", static_cast<int64_t>(result.roundTime.read().count()));
|
|
// Parent ledger's close time — enables computing close-time deltas across
|
|
// consecutive rounds without correlating separate spans.
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.parent_close_time",
|
|
static_cast<int64_t>(prevLedger.closeTime().time_since_epoch().count()));
|
|
// This node's own proposed close time before consensus voting.
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.close_time_self",
|
|
static_cast<int64_t>(rawCloseTimes.self.time_since_epoch().count()));
|
|
// Number of distinct close-time vote bins from peer proposals.
|
|
XRPL_TRACE_SET_ATTR(
|
|
"xrpl.consensus.close_time_vote_bins", static_cast<int64_t>(rawCloseTimes.peers.size()));
|
|
// Whether close-time resolution increased (coarser), decreased (finer),
|
|
// or stayed the same relative to the previous ledger.
|
|
{
|
|
auto const prevRes = prevLedger.closeTimeResolution();
|
|
std::string dir = (closeResolution > prevRes) ? "increased"
|
|
: (closeResolution < prevRes) ? "decreased"
|
|
: "unchanged";
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.resolution_direction", std::move(dir));
|
|
}
|
|
|
|
JLOG(j_.debug()) << "Report: Prop=" << (proposing ? "yes" : "no")
|
|
<< " val=" << (validating_ ? "yes" : "no")
|
|
<< " corLCL=" << (haveCorrectLCL ? "yes" : "no")
|
|
<< " fail=" << (consensusFail ? "yes" : "no");
|
|
JLOG(j_.debug()) << "Report: Prev = " << prevLedger.id() << ":" << prevLedger.seq();
|
|
|
|
//--------------------------------------------------------------------------
|
|
std::set<TxID> failed;
|
|
|
|
// We want to put transactions in an unpredictable but deterministic order:
|
|
// we use the hash of the set.
|
|
//
|
|
// FIXME: Use a std::vector and a custom sorter instead of CanonicalTXSet?
|
|
CanonicalTXSet retriableTxs{result.txns.map_->getHash().as_uint256()};
|
|
|
|
JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key();
|
|
|
|
for (auto const& item : *result.txns.map_)
|
|
{
|
|
try
|
|
{
|
|
retriableTxs.insert(std::make_shared<STTx const>(SerialIter{item.slice()}));
|
|
JLOG(j_.debug()) << " Tx: " << item.key();
|
|
}
|
|
catch (std::exception const& ex)
|
|
{
|
|
failed.insert(item.key());
|
|
JLOG(j_.warn()) << " Tx: " << item.key() << " throws: " << ex.what();
|
|
}
|
|
}
|
|
|
|
auto built = buildLCL(
|
|
prevLedger,
|
|
retriableTxs,
|
|
consensusCloseTime,
|
|
closeTimeCorrect,
|
|
closeResolution,
|
|
result.roundTime.read(),
|
|
failed);
|
|
|
|
auto const newLCLHash = built.id();
|
|
JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash;
|
|
|
|
// Tell directly connected peers that we have a new LCL
|
|
notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL);
|
|
|
|
// As long as we're in sync with the network, attempt to detect attempts
|
|
// at censorship of transaction by tracking which ones don't make it in
|
|
// after a period of time.
|
|
if (haveCorrectLCL && result.state == ConsensusState::Yes)
|
|
{
|
|
std::vector<TxID> accepted;
|
|
|
|
result.txns.map_->visitLeaves(
|
|
[&accepted](boost::intrusive_ptr<SHAMapItem const> const& item) {
|
|
accepted.push_back(item->key());
|
|
});
|
|
|
|
// Track all the transactions which failed or were marked as retriable
|
|
for (auto const& r : retriableTxs)
|
|
failed.insert(r.first.getTXID());
|
|
|
|
censorshipDetector_.check(
|
|
std::move(accepted),
|
|
[curr = built.seq(), j = app_.getJournal("CensorshipDetector"), &failed](
|
|
uint256 const& id, LedgerIndex seq) {
|
|
if (failed.contains(id))
|
|
return true;
|
|
|
|
auto const wait = curr - seq;
|
|
|
|
if (wait && (wait % censorshipWarnInternal == 0))
|
|
{
|
|
std::ostringstream ss;
|
|
ss << "Potential Censorship: Eligible tx " << id
|
|
<< ", which we are tracking since ledger " << seq
|
|
<< " has not been included as of ledger " << curr << ".";
|
|
|
|
JLOG(j.warn()) << ss.str();
|
|
}
|
|
|
|
return false;
|
|
});
|
|
}
|
|
|
|
if (validating_)
|
|
validating_ = ledgerMaster_.isCompatible(*built.ledger_, j_.warn(), "Not validating");
|
|
|
|
if (validating_ && !consensusFail && app_.getValidations().canValidateSeq(built.seq()))
|
|
{
|
|
validate(built, result.txns, proposing);
|
|
JLOG(j_.info()) << "CNF Val " << newLCLHash;
|
|
}
|
|
else
|
|
JLOG(j_.info()) << "CNF buildLCL " << newLCLHash;
|
|
|
|
// See if we can accept a ledger as fully-validated
|
|
ledgerMaster_.consensusBuilt(built.ledger_, result.txns.id(), std::move(consensusJson));
|
|
|
|
//-------------------------------------------------------------------------
|
|
{
|
|
// Apply disputed transactions that didn't get in
|
|
//
|
|
// The first crack of transactions to get into the new
|
|
// open ledger goes to transactions proposed by a validator
|
|
// we trust but not included in the consensus set.
|
|
//
|
|
// These are done first because they are the most likely
|
|
// to receive agreement during consensus. They are also
|
|
// ordered logically "sooner" than transactions not mentioned
|
|
// in the previous consensus round.
|
|
//
|
|
bool anyDisputes = false;
|
|
for (auto const& [_, dispute] : result.disputes)
|
|
{
|
|
(void)_;
|
|
if (!dispute.getOurVote())
|
|
{
|
|
// we voted NO
|
|
try
|
|
{
|
|
JLOG(j_.debug()) << "Test applying disputed transaction that did"
|
|
<< " not get in " << dispute.tx().id();
|
|
|
|
SerialIter sit(dispute.tx().tx_->slice());
|
|
auto txn = std::make_shared<STTx const>(sit);
|
|
|
|
// Disputed pseudo-transactions that were not accepted
|
|
// can't be successfully applied in the next ledger
|
|
if (isPseudoTx(*txn))
|
|
continue;
|
|
|
|
retriableTxs.insert(txn);
|
|
|
|
anyDisputes = true;
|
|
}
|
|
catch (std::exception const& ex)
|
|
{
|
|
JLOG(j_.debug()) << "Failed to apply transaction we voted "
|
|
"NO on. Exception: "
|
|
<< ex.what();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build new open ledger
|
|
std::unique_lock lock{app_.getMasterMutex(), std::defer_lock};
|
|
std::unique_lock sl{ledgerMaster_.peekMutex(), std::defer_lock};
|
|
std::lock(lock, sl);
|
|
|
|
auto const lastVal = ledgerMaster_.getValidatedLedger();
|
|
std::optional<Rules> rules;
|
|
if (lastVal)
|
|
{
|
|
rules = makeRulesGivenLedger(*lastVal, app_.config().features);
|
|
}
|
|
else
|
|
{
|
|
rules.emplace(app_.config().features);
|
|
}
|
|
app_.getOpenLedger().accept(
|
|
app_,
|
|
*rules,
|
|
built.ledger_,
|
|
localTxs_.getTxSet(),
|
|
anyDisputes,
|
|
retriableTxs,
|
|
tapNONE,
|
|
"consensus",
|
|
[&](OpenView& view, beast::Journal j) {
|
|
// Stuff the ledger with transactions from the queue.
|
|
return app_.getTxQ().accept(app_, view);
|
|
});
|
|
|
|
// Signal a potential fee change to subscribers after the open ledger
|
|
// is created
|
|
app_.getOPs().reportFeeChange();
|
|
}
|
|
|
|
//-------------------------------------------------------------------------
|
|
{
|
|
ledgerMaster_.switchLCL(built.ledger_);
|
|
|
|
// Do these need to exist?
|
|
XRPL_ASSERT(
|
|
ledgerMaster_.getClosedLedger()->header().hash == built.id(),
|
|
"xrpl::RCLConsensus::Adaptor::doAccept : ledger hash match");
|
|
XRPL_ASSERT(
|
|
app_.getOpenLedger().current()->header().parentHash == built.id(),
|
|
"xrpl::RCLConsensus::Adaptor::doAccept : parent hash match");
|
|
}
|
|
|
|
//-------------------------------------------------------------------------
|
|
// we entered the round with the network,
|
|
// see how close our close time is to other node's
|
|
// close time reports, and update our clock.
|
|
if ((mode == ConsensusMode::proposing || mode == ConsensusMode::observing) && !consensusFail)
|
|
{
|
|
auto closeTime = rawCloseTimes.self;
|
|
|
|
JLOG(j_.info()) << "We closed at " << closeTime.time_since_epoch().count();
|
|
using usec64_t = std::chrono::duration<std::uint64_t>;
|
|
usec64_t closeTotal = std::chrono::duration_cast<usec64_t>(closeTime.time_since_epoch());
|
|
int closeCount = 1;
|
|
|
|
for (auto const& [t, v] : rawCloseTimes.peers)
|
|
{
|
|
JLOG(j_.info()) << std::to_string(v) << " time votes for "
|
|
<< std::to_string(t.time_since_epoch().count());
|
|
closeCount += v;
|
|
closeTotal += std::chrono::duration_cast<usec64_t>(t.time_since_epoch()) * v;
|
|
}
|
|
|
|
closeTotal += usec64_t(closeCount / 2); // for round to nearest
|
|
closeTotal /= closeCount;
|
|
|
|
// Use signed times since we are subtracting
|
|
using duration = std::chrono::duration<std::int32_t>;
|
|
using time_point = std::chrono::time_point<NetClock, duration>;
|
|
auto offset = time_point{closeTotal} - std::chrono::time_point_cast<duration>(closeTime);
|
|
JLOG(j_.info()) << "Our close offset is estimated at " << offset.count() << " ("
|
|
<< closeCount << ")";
|
|
|
|
app_.getTimeKeeper().adjustCloseTime(offset);
|
|
}
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::notify(
|
|
protocol::NodeEvent ne,
|
|
RCLCxLedger const& ledger,
|
|
bool haveCorrectLCL)
|
|
{
|
|
protocol::TMStatusChange s;
|
|
|
|
if (!haveCorrectLCL)
|
|
{
|
|
s.set_newevent(protocol::neLOST_SYNC);
|
|
}
|
|
else
|
|
{
|
|
s.set_newevent(ne);
|
|
}
|
|
|
|
s.set_ledgerseq(ledger.seq());
|
|
s.set_networktime(app_.getTimeKeeper().now().time_since_epoch().count());
|
|
s.set_ledgerhashprevious(
|
|
ledger.parentID().begin(), std::decay_t<decltype(ledger.parentID())>::bytes);
|
|
s.set_ledgerhash(ledger.id().begin(), std::decay_t<decltype(ledger.id())>::bytes);
|
|
|
|
std::uint32_t uMin = 0, uMax = 0;
|
|
if (!ledgerMaster_.getFullValidatedRange(uMin, uMax))
|
|
{
|
|
uMin = 0;
|
|
uMax = 0;
|
|
}
|
|
else
|
|
{
|
|
// Don't advertise ledgers we're not willing to serve
|
|
uMin = std::max(uMin, ledgerMaster_.getEarliestFetch());
|
|
}
|
|
s.set_firstseq(uMin);
|
|
s.set_lastseq(uMax);
|
|
app_.getOverlay().foreach(send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
|
|
JLOG(j_.trace()) << "send status change to peer";
|
|
}
|
|
|
|
RCLCxLedger
|
|
RCLConsensus::Adaptor::buildLCL(
|
|
RCLCxLedger const& previousLedger,
|
|
CanonicalTXSet& retriableTxs,
|
|
NetClock::time_point closeTime,
|
|
bool closeTimeCorrect,
|
|
NetClock::duration closeResolution,
|
|
std::chrono::milliseconds roundTime,
|
|
std::set<TxID>& failedTxs)
|
|
{
|
|
std::shared_ptr<Ledger> built = [&]() {
|
|
if (auto const replayData = ledgerMaster_.releaseReplay())
|
|
{
|
|
XRPL_ASSERT(
|
|
replayData->parent()->header().hash == previousLedger.id(),
|
|
"xrpl::RCLConsensus::Adaptor::buildLCL : parent hash match");
|
|
return buildLedger(*replayData, tapNONE, app_, j_);
|
|
}
|
|
return buildLedger(
|
|
previousLedger.ledger_,
|
|
closeTime,
|
|
closeTimeCorrect,
|
|
closeResolution,
|
|
app_,
|
|
retriableTxs,
|
|
failedTxs,
|
|
j_);
|
|
}();
|
|
|
|
// Update fee computations based on accepted txs
|
|
using namespace std::chrono_literals;
|
|
app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
|
|
|
|
// And stash the ledger in the ledger master
|
|
if (ledgerMaster_.storeLedger(built))
|
|
{
|
|
JLOG(j_.debug()) << "Consensus built ledger we already had";
|
|
}
|
|
else if (app_.getInboundLedgers().find(built->header().hash))
|
|
{
|
|
JLOG(j_.debug()) << "Consensus built ledger we were acquiring";
|
|
}
|
|
else
|
|
JLOG(j_.debug()) << "Consensus built new ledger";
|
|
return RCLCxLedger{std::move(built)};
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns, bool proposing)
|
|
{
|
|
/// @note This method is called from doAccept(), which runs on a JobQueue
|
|
/// worker thread (jtACCEPT). The consensus.validation.send span is
|
|
/// therefore standalone — NOT a child of consensus.round. A span link
|
|
/// to the round span is added below to establish the follows-from
|
|
/// relationship without requiring parent-child context propagation.
|
|
#ifdef XRPL_ENABLE_TELEMETRY
|
|
std::optional<telemetry::SpanGuard> _xrpl_guard_ = createValidationSpan();
|
|
#endif
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.ledger.seq", static_cast<int64_t>(ledger.seq()));
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.proposing", proposing);
|
|
XRPL_TRACE_SET_ATTR("xrpl.validation.ledger_hash", to_string(ledger.id()).c_str());
|
|
XRPL_TRACE_SET_ATTR("xrpl.validation.full", proposing);
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
auto validationTime = app_.getTimeKeeper().closeTime();
|
|
if (validationTime <= lastValidationTime_)
|
|
validationTime = lastValidationTime_ + 1s;
|
|
lastValidationTime_ = validationTime;
|
|
|
|
if (!validatorKeys_.keys)
|
|
{
|
|
JLOG(j_.warn()) << "RCLConsensus::Adaptor::validate: ValidatorKeys "
|
|
"not set\n";
|
|
return;
|
|
}
|
|
|
|
auto const& keys = *validatorKeys_.keys;
|
|
|
|
auto v = std::make_shared<STValidation>(
|
|
lastValidationTime_,
|
|
keys.publicKey,
|
|
keys.secretKey,
|
|
validatorKeys_.nodeID,
|
|
[&](STValidation& v) {
|
|
v.setFieldH256(sfLedgerHash, ledger.id());
|
|
v.setFieldH256(sfConsensusHash, txns.id());
|
|
|
|
v.setFieldU32(sfLedgerSequence, ledger.seq());
|
|
|
|
if (proposing)
|
|
v.setFlag(vfFullValidation);
|
|
|
|
// Attest to the hash of what we consider to be the last fully
|
|
// validated ledger. This may be the hash of the ledger we are
|
|
// validating here, and that's fine.
|
|
if (auto const vl = ledgerMaster_.getValidatedLedger())
|
|
v.setFieldH256(sfValidatedHash, vl->header().hash);
|
|
|
|
v.setFieldU64(sfCookie, valCookie_);
|
|
|
|
// Report our server version every flag ledger:
|
|
if (ledger.ledger_->isVotingLedger())
|
|
v.setFieldU64(sfServerVersion, BuildInfo::getEncodedVersion());
|
|
|
|
// Report our load
|
|
{
|
|
auto const& ft = app_.getFeeTrack();
|
|
auto const fee = std::max(ft.getLocalFee(), ft.getClusterFee());
|
|
if (fee > ft.getLoadBase())
|
|
v.setFieldU32(sfLoadFee, fee);
|
|
}
|
|
|
|
// If the next ledger is a flag ledger, suggest fee changes and
|
|
// new features:
|
|
if (ledger.ledger_->isVotingLedger())
|
|
{
|
|
// Fees:
|
|
feeVote_->doValidation(ledger.ledger_->fees(), ledger.ledger_->rules(), v);
|
|
|
|
// Amendments
|
|
// FIXME: pass `v` and have the function insert the array
|
|
// directly?
|
|
auto const amendments =
|
|
app_.getAmendmentTable().doValidation(getEnabledAmendments(*ledger.ledger_));
|
|
|
|
if (!amendments.empty())
|
|
v.setFieldV256(sfAmendments, STVector256(sfAmendments, amendments));
|
|
}
|
|
});
|
|
|
|
auto const serialized = v->getSerialized();
|
|
|
|
// suppress it if we receive it
|
|
app_.getHashRouter().addSuppression(sha512Half(makeSlice(serialized)));
|
|
|
|
handleNewValidation(app_, v, "local");
|
|
|
|
// Broadcast to all our peers:
|
|
protocol::TMValidation val;
|
|
val.set_validation(serialized.data(), serialized.size());
|
|
app_.getOverlay().broadcast(val);
|
|
|
|
// Publish to all our subscribers:
|
|
app_.getOPs().pubValidation(v);
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
|
|
{
|
|
// Trace mode transitions as short-lived spans for visibility in the
|
|
// trace backend. Each transition (e.g. observing -> proposing) appears
|
|
// as a child of the current consensus.round span.
|
|
XRPL_TRACE_CONSENSUS(app_.getTelemetry(), "consensus.mode_change");
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.mode.old", toDisplayString(before).c_str());
|
|
XRPL_TRACE_SET_ATTR("xrpl.consensus.mode.new", toDisplayString(after).c_str());
|
|
|
|
JLOG(j_.info()) << "Consensus mode change before=" << to_string(before)
|
|
<< ", after=" << to_string(after);
|
|
|
|
// If we were proposing but aren't any longer, we need to reset the
|
|
// censorship tracking to avoid bogus warnings.
|
|
if ((before == ConsensusMode::proposing || before == ConsensusMode::observing) &&
|
|
before != after)
|
|
censorshipDetector_.reset();
|
|
|
|
mode_ = after;
|
|
}
|
|
|
|
Json::Value
|
|
RCLConsensus::getJson(bool full) const
|
|
{
|
|
Json::Value ret;
|
|
{
|
|
std::lock_guard _{mutex_};
|
|
ret = consensus_.getJson(full);
|
|
}
|
|
ret["validating"] = adaptor_.validating();
|
|
return ret;
|
|
}
|
|
|
|
void
|
|
RCLConsensus::timerEntry(
|
|
NetClock::time_point const& now,
|
|
std::unique_ptr<std::stringstream> const& clog)
|
|
{
|
|
try
|
|
{
|
|
std::lock_guard _{mutex_};
|
|
consensus_.timerEntry(now, clog);
|
|
}
|
|
catch (SHAMapMissingNode const& mn)
|
|
{
|
|
// This should never happen
|
|
std::stringstream ss;
|
|
ss << "During consensus timerEntry: " << mn.what();
|
|
JLOG(j_.error()) << ss.str();
|
|
CLOG(clog) << ss.str();
|
|
Rethrow();
|
|
}
|
|
}
|
|
|
|
void
|
|
RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
|
|
{
|
|
try
|
|
{
|
|
std::lock_guard _{mutex_};
|
|
consensus_.gotTxSet(now, txSet);
|
|
}
|
|
catch (SHAMapMissingNode const& mn)
|
|
{
|
|
// This should never happen
|
|
JLOG(j_.error()) << "During consensus gotTxSet: " << mn.what();
|
|
Rethrow();
|
|
}
|
|
}
|
|
|
|
//! @see Consensus::simulate
|
|
|
|
void
|
|
RCLConsensus::simulate(
|
|
NetClock::time_point const& now,
|
|
std::optional<std::chrono::milliseconds> consensusDelay)
|
|
{
|
|
std::lock_guard _{mutex_};
|
|
consensus_.simulate(now, consensusDelay);
|
|
}
|
|
|
|
bool
|
|
RCLConsensus::peerProposal(NetClock::time_point const& now, RCLCxPeerPos const& newProposal)
|
|
{
|
|
std::lock_guard _{mutex_};
|
|
return consensus_.peerProposal(now, newProposal);
|
|
}
|
|
|
|
bool
|
|
RCLConsensus::Adaptor::preStartRound(RCLCxLedger const& prevLgr, hash_set<NodeID> const& nowTrusted)
|
|
{
|
|
// We have a key, we do not want out of sync validations after a restart
|
|
// and are not amendment blocked.
|
|
validating_ = validatorKeys_.keys && prevLgr.seq() >= app_.getMaxDisallowedLedger() &&
|
|
!app_.getOPs().isBlocked();
|
|
|
|
// If we are not running in standalone mode and there's a configured UNL,
|
|
// check to make sure that it's not expired.
|
|
if (validating_ && !app_.config().standalone() && (app_.getValidators().count() != 0u))
|
|
{
|
|
auto const when = app_.getValidators().expires();
|
|
|
|
if (!when || *when < app_.getTimeKeeper().now())
|
|
{
|
|
JLOG(j_.error()) << "Voluntarily bowing out of consensus process "
|
|
"because of an expired validator list.";
|
|
validating_ = false;
|
|
}
|
|
}
|
|
|
|
bool const synced = app_.getOPs().getOperatingMode() == OperatingMode::FULL;
|
|
|
|
if (validating_)
|
|
{
|
|
JLOG(j_.info()) << "Entering consensus process, validating, synced="
|
|
<< (synced ? "yes" : "no");
|
|
}
|
|
else
|
|
{
|
|
// Otherwise we just want to monitor the validation process.
|
|
JLOG(j_.info()) << "Entering consensus process, watching, synced="
|
|
<< (synced ? "yes" : "no");
|
|
}
|
|
|
|
// Notify inbound ledgers that we are starting a new round
|
|
inboundTransactions_.newRound(prevLgr.seq());
|
|
|
|
// Notify NegativeUNLVote that new validators are added
|
|
if (!nowTrusted.empty())
|
|
nUnlVote_.newValidators(prevLgr.seq() + 1, nowTrusted);
|
|
|
|
#ifdef XRPL_ENABLE_TELEMETRY
|
|
startRoundTracing(prevLgr);
|
|
#endif
|
|
|
|
// propose only if we're in sync with the network (and validating)
|
|
return validating_ && synced;
|
|
}
|
|
|
|
bool
|
|
RCLConsensus::Adaptor::haveValidated() const
|
|
{
|
|
return ledgerMaster_.haveValidated();
|
|
}
|
|
|
|
LedgerIndex
|
|
RCLConsensus::Adaptor::getValidLedgerIndex() const
|
|
{
|
|
return ledgerMaster_.getValidLedgerIndex();
|
|
}
|
|
|
|
std::pair<std::size_t, hash_set<RCLConsensus::Adaptor::NodeKey_t>>
|
|
RCLConsensus::Adaptor::getQuorumKeys() const
|
|
{
|
|
return app_.getValidators().getQuorumKeys();
|
|
}
|
|
|
|
std::size_t
|
|
RCLConsensus::Adaptor::laggards(
|
|
Ledger_t::Seq const seq,
|
|
hash_set<RCLConsensus::Adaptor::NodeKey_t>& trustedKeys) const
|
|
{
|
|
return app_.getValidations().laggards(seq, trustedKeys);
|
|
}
|
|
|
|
bool
|
|
RCLConsensus::Adaptor::validator() const
|
|
{
|
|
return validatorKeys_.keys.has_value();
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
|
|
{
|
|
if ((positions == 0u) && app_.getOPs().isFull())
|
|
app_.getOPs().setMode(OperatingMode::CONNECTED);
|
|
}
|
|
|
|
#ifdef XRPL_ENABLE_TELEMETRY
|
|
telemetry::Telemetry&
|
|
RCLConsensus::Adaptor::getTelemetry()
|
|
{
|
|
return app_.getTelemetry();
|
|
}
|
|
|
|
void
|
|
RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
|
|
{
|
|
// Save the previous round's context for span links, then end the
|
|
// previous round span before creating a new one.
|
|
if (roundSpan_)
|
|
{
|
|
prevRoundContext_ = roundSpan_->context();
|
|
roundSpan_.reset();
|
|
}
|
|
|
|
auto& tel = app_.getTelemetry();
|
|
if (!tel.shouldTraceConsensus())
|
|
return;
|
|
|
|
auto const& strategy = tel.getConsensusTraceStrategy();
|
|
|
|
// Build span links to previous round (follows-from) if available.
|
|
// This creates a causal chain between consecutive consensus rounds
|
|
// in the trace backend.
|
|
using LinkAttr = std::pair<std::string, opentelemetry::common::AttributeValue>;
|
|
using SpanLink = std::pair<opentelemetry::trace::SpanContext, std::vector<LinkAttr>>;
|
|
std::vector<SpanLink> links;
|
|
|
|
auto prevSpan = opentelemetry::trace::GetSpan(prevRoundContext_);
|
|
if (prevSpan && prevSpan->GetContext().IsValid())
|
|
{
|
|
links.emplace_back(
|
|
prevSpan->GetContext(),
|
|
std::vector<LinkAttr>{{"xrpl.link.type", std::string("follows_from")}});
|
|
}
|
|
|
|
if (strategy == "deterministic")
|
|
{
|
|
// Derive trace_id from ledger hash so all validators in this
|
|
// round produce spans under the same trace.
|
|
auto parentCtx = createDeterministicContext(prevLgr.id());
|
|
roundSpan_.emplace(tel.startSpan("consensus.round", parentCtx, links));
|
|
}
|
|
else
|
|
{
|
|
// "attribute" strategy: random trace_id, correlation via
|
|
// the xrpl.consensus.ledger_id attribute.
|
|
if (links.empty())
|
|
roundSpan_.emplace(tel.startSpan("consensus.round"));
|
|
else
|
|
{
|
|
// Use an empty context as parent (new root trace).
|
|
roundSpan_.emplace(
|
|
tel.startSpan("consensus.round", opentelemetry::context::Context{}, links));
|
|
}
|
|
}
|
|
|
|
// Set standard attributes on the round span.
|
|
roundSpan_->setAttribute("xrpl.consensus.ledger_id", to_string(prevLgr.id()).c_str());
|
|
roundSpan_->setAttribute("xrpl.consensus.ledger.seq", static_cast<int64_t>(prevLgr.seq() + 1));
|
|
roundSpan_->setAttribute("xrpl.consensus.mode", toDisplayString(mode_.load()).c_str());
|
|
roundSpan_->setAttribute("xrpl.consensus.trace_strategy", strategy.c_str());
|
|
roundSpan_->setAttribute("xrpl.consensus.round_id", static_cast<int64_t>(prevLgr.seq() + 1));
|
|
|
|
// Snapshot the SpanContext for cross-thread use by createValidationSpan().
|
|
roundSpanContext_ = roundSpan_->span().GetContext();
|
|
}
|
|
|
|
std::optional<telemetry::SpanGuard>
|
|
RCLConsensus::Adaptor::createValidationSpan()
|
|
{
|
|
if (!app_.getTelemetry().shouldTraceConsensus())
|
|
return std::nullopt;
|
|
|
|
// Build span link to the round span (follows-from relationship).
|
|
// The validation is triggered by the round but executes on a
|
|
// different thread and may outlive the round span.
|
|
std::vector<std::pair<
|
|
opentelemetry::trace::SpanContext,
|
|
std::vector<std::pair<std::string, opentelemetry::common::AttributeValue>>>>
|
|
links;
|
|
|
|
// Use the snapshotted SpanContext (set on consensus thread in
|
|
// startRoundTracing) rather than accessing roundSpan_ directly,
|
|
// since this method runs on the jtACCEPT worker thread.
|
|
if (roundSpanContext_ && roundSpanContext_->IsValid())
|
|
{
|
|
links.push_back({*roundSpanContext_, {}});
|
|
}
|
|
|
|
return telemetry::SpanGuard(app_.getTelemetry().startSpan(
|
|
"consensus.validation.send", opentelemetry::context::RuntimeContext::GetCurrent(), links));
|
|
}
|
|
#endif
|
|
|
|
void
|
|
RCLConsensus::startRound(
|
|
NetClock::time_point const& now,
|
|
RCLCxLedger::ID const& prevLgrId,
|
|
RCLCxLedger const& prevLgr,
|
|
hash_set<NodeID> const& nowUntrusted,
|
|
hash_set<NodeID> const& nowTrusted,
|
|
std::unique_ptr<std::stringstream> const& clog)
|
|
{
|
|
std::lock_guard _{mutex_};
|
|
consensus_.startRound(
|
|
now, prevLgrId, prevLgr, nowUntrusted, adaptor_.preStartRound(prevLgr, nowTrusted), clog);
|
|
}
|
|
|
|
RclConsensusLogger::RclConsensusLogger(char const* label, bool const validating, beast::Journal j)
|
|
: j_(j)
|
|
{
|
|
if (!validating && !j.info())
|
|
return;
|
|
start_ = std::chrono::steady_clock::now();
|
|
ss_ = std::make_unique<std::stringstream>();
|
|
header_ = "ConsensusLogger ";
|
|
header_ += label;
|
|
header_ += ": ";
|
|
}
|
|
|
|
RclConsensusLogger::~RclConsensusLogger()
|
|
{
|
|
if (!ss_)
|
|
return;
|
|
auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
std::chrono::steady_clock::now() - start_);
|
|
std::stringstream outSs;
|
|
outSs << header_ << "duration " << (duration.count() / 1000) << '.' << std::setw(3)
|
|
<< std::setfill('0') << (duration.count() % 1000) << "s. " << ss_->str();
|
|
j_.sink().writeAlways(beast::severities::kInfo, outSs.str());
|
|
}
|
|
|
|
} // namespace xrpl
|