Files
rippled/src/xrpld/app/consensus/RCLConsensus.cpp
Pratik Mankawde a44d91ec27 leftover clang-tidy fixes
Signed-off-by: Pratik Mankawde <3397372+pratikmankawde@users.noreply.github.com>
2026-05-29 17:52:45 +01:00

1364 lines
46 KiB
C++

#include <xrpld/app/consensus/RCLConsensus.h>
#include <xrpld/app/consensus/RCLCensorshipDetector.h>
#include <xrpld/app/consensus/RCLCxLedger.h>
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/consensus/RCLCxTx.h>
#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/BuildLedger.h>
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/InboundTransactions.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/LocalTxs.h>
#include <xrpld/app/ledger/OpenLedger.h>
#include <xrpld/app/misc/FeeVote.h>
#include <xrpld/app/misc/NegativeUNLVote.h>
#include <xrpld/app/misc/TxQ.h>
#include <xrpld/app/misc/ValidatorKeys.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/consensus/Consensus.h>
#include <xrpld/consensus/ConsensusSpanNames.h>
#include <xrpld/consensus/ConsensusTypes.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/predicates.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/UnorderedContainers.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/Zero.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/core/Job.h>
#include <xrpl/crypto/csprng.h>
#include <xrpl/json/json_value.h>
#include <xrpl/json/json_writer.h>
#include <xrpl/ledger/AmendmentTable.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/Ledger.h>
#include <xrpl/ledger/LedgerTiming.h>
#include <xrpl/ledger/OpenView.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/protocol/BuildInfo.h>
#include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/protocol/RippleLedgerHash.h>
#include <xrpl/protocol/Rules.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/SecretKey.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/UintTypes.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/protocol/tokens.h>
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/shamap/SHAMapItem.h>
#include <xrpl/shamap/SHAMapMissingNode.h>
#include <xrpl/shamap/SHAMapTreeNode.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <xrpl/telemetry/SpanNames.h>
#include <xrpl/telemetry/Telemetry.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <xrpl.pb.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <iomanip>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
namespace xrpl {
RCLConsensus::RCLConsensus(
Application& app,
std::unique_ptr<FeeVote>&& feeVote,
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type const& clock,
ValidatorKeys const& validatorKeys,
beast::Journal journal)
: adaptor_(
app,
std::move(feeVote),
ledgerMaster,
localTxs,
inboundTransactions,
validatorKeys,
journal)
, consensus_(clock, adaptor_, journal)
, j_(journal)
{
}
RCLConsensus::Adaptor::Adaptor(
Application& app,
std::unique_ptr<FeeVote>&& feeVote,
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
ValidatorKeys const& validatorKeys,
beast::Journal journal)
: app_(app)
, feeVote_(std::move(feeVote))
, ledgerMaster_(ledgerMaster)
, localTxs_(localTxs)
, inboundTransactions_{inboundTransactions}
, j_(journal)
, validatorKeys_(validatorKeys)
, valCookie_(1 + randInt(cryptoPrng(), std::numeric_limits<std::uint64_t>::max() - 1))
, nUnlVote_(validatorKeys_.nodeID, j_)
{
XRPL_ASSERT(valCookie_, "xrpl::RCLConsensus::Adaptor::Adaptor : nonzero cookie");
JLOG(j_.info()) << "Consensus engine started (cookie: " + std::to_string(valCookie_) + ")";
if (validatorKeys_.nodeID != beast::kZero && validatorKeys_.keys)
{
JLOG(j_.info()) << "Validator identity: "
<< toBase58(TokenType::NodePublic, validatorKeys_.keys->masterPublicKey);
if (validatorKeys_.keys->masterPublicKey != validatorKeys_.keys->publicKey)
{
JLOG(j_.debug()) << "Validator ephemeral signing key: "
<< toBase58(TokenType::NodePublic, validatorKeys_.keys->publicKey)
<< " (seq: " << std::to_string(validatorKeys_.sequence) << ")";
}
}
}
std::optional<RCLCxLedger>
RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
{
// we need to switch the ledger we're working from
auto built = ledgerMaster_.getLedgerByHash(hash);
if (!built)
{
if (acquiringLedger_ != hash)
{
// need to start acquiring the correct consensus LCL
JLOG(j_.warn()) << "Need consensus ledger " << hash;
// Tell the ledger acquire system that we need the consensus ledger
acquiringLedger_ = hash;
app_.getJobQueue().addJob(JtAdvance, "GetConsL1", [id = hash, &app = app_, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
});
}
return std::nullopt;
}
XRPL_ASSERT(
!built->open() && built->isImmutable(),
"xrpl::RCLConsensus::Adaptor::acquireLedger : valid ledger state");
XRPL_ASSERT(
built->header().hash == hash,
"xrpl::RCLConsensus::Adaptor::acquireLedger : ledger hash match");
// Notify inbound transactions of the new ledger sequence number
inboundTransactions_.newRound(built->header().seq);
return RCLCxLedger(built);
}
void
RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
{
protocol::TMProposeSet prop;
auto const& proposal = peerPos.proposal();
prop.set_proposeseq(proposal.proposeSeq());
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
prop.set_currenttxhash(proposal.position().begin(), proposal.position().size());
prop.set_previousledger(proposal.prevLedger().begin(), proposal.prevLedger().size());
auto const pk = peerPos.publicKey().slice();
prop.set_nodepubkey(pk.data(), pk.size());
auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size());
app_.getOverlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
}
void
RCLConsensus::Adaptor::share(RCLCxTx const& tx)
{
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
{
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx->slice();
protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size());
msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp(app_.getTimeKeeper().now().time_since_epoch().count());
static std::set<Peer::id_t> const kSkip{};
app_.getOverlay().relay(tx.id(), msg, kSkip);
}
else
{
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
}
}
void
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus,
telemetry::seg::consensus,
telemetry::consensus::span::op::proposalSend);
span.setAttribute(
telemetry::consensus::span::attr::round, static_cast<int64_t>(proposal.proposeSeq()));
JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
<< xrpl::to_string(proposal.prevLedger()) << " -> "
<< xrpl::to_string(proposal.position());
protocol::TMProposeSet prop;
prop.set_currenttxhash(proposal.position().begin(), proposal.position().size());
prop.set_previousledger(proposal.prevLedger().begin(), proposal.prevLedger().size());
prop.set_proposeseq(proposal.proposeSeq());
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
if (!validatorKeys_.keys)
{
JLOG(j_.warn()) << "RCLConsensus::Adaptor::propose: ValidatorKeys "
"not set: \n";
return;
}
auto const& keys = *validatorKeys_.keys;
prop.set_nodepubkey(keys.publicKey.data(), keys.publicKey.size());
auto sig = signDigest(keys.publicKey, keys.secretKey, proposal.signingHash());
prop.set_signature(sig.data(), sig.size());
auto const suppression = proposalUniqueId(
proposal.position(),
proposal.prevLedger(),
proposal.proposeSeq(),
proposal.closeTime(),
keys.publicKey,
sig);
app_.getHashRouter().addSuppression(suppression);
// Inject the current thread's active span context (e.g. the
// consensus round span from Phase 4) so receiving peers can link
// their proposal.receive span as a child of this trace.
telemetry::SpanGuard::injectCurrentContextToProtobuf(*prop.mutable_trace_context());
app_.getOverlay().broadcast(prop);
}
void
RCLConsensus::Adaptor::share(RCLTxSet const& txns)
{
inboundTransactions_.giveSet(txns.id(), txns.map, false);
}
std::optional<RCLTxSet>
RCLConsensus::Adaptor::acquireTxSet(RCLTxSet::ID const& setId)
{
if (auto txns = inboundTransactions_.getSet(setId, true))
{
return RCLTxSet{std::move(txns)};
}
return std::nullopt;
}
bool
RCLConsensus::Adaptor::hasOpenTransactions() const
{
return !app_.getOpenLedger().empty();
}
std::size_t
RCLConsensus::Adaptor::proposersValidated(LedgerHash const& h) const
{
return app_.getValidations().numTrustedForLedger(h);
}
std::size_t
RCLConsensus::Adaptor::proposersFinished(RCLCxLedger const& ledger, LedgerHash const& h) const
{
RCLValidations& vals = app_.getValidations();
return vals.getNodesAfter(RCLValidatedLedger(ledger.ledger, vals.adaptor().journal()), h);
}
uint256
RCLConsensus::Adaptor::getPrevLedger(
uint256 ledgerID,
RCLCxLedger const& ledger,
ConsensusMode mode)
{
RCLValidations& vals = app_.getValidations();
uint256 netLgr = vals.getPreferred(
RCLValidatedLedger{ledger.ledger, vals.adaptor().journal()},
ledgerMaster_.getValidLedgerIndex());
if (netLgr != ledgerID)
{
if (mode != ConsensusMode::WrongLedger)
app_.getOPs().consensusViewChange();
JLOG(j_.debug()) << json::Compact(app_.getValidations().getJsonTrie());
}
return netLgr;
}
auto
RCLConsensus::Adaptor::onClose(
RCLCxLedger const& ledger,
NetClock::time_point const& closeTime,
ConsensusMode mode) -> Result
{
namespace cs = telemetry::consensus::span;
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, cs::op::ledgerClose);
span.setAttribute(cs::attr::ledgerSeq, static_cast<int64_t>(ledger.ledger->header().seq) + 1);
span.setAttribute(cs::attr::mode, toDisplayString(mode).c_str());
span.setAttribute(
cs::attr::txCountOpen, static_cast<int64_t>(app_.getOpenLedger().current()->txCount()));
span.setAttribute(
cs::attr::closeTimeResolutionMs,
static_cast<int64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(ledger.closeTimeResolution())
.count()));
bool const wrongLCL = mode == ConsensusMode::WrongLedger;
bool const proposing = mode == ConsensusMode::Proposing;
notify(protocol::neCLOSING_LEDGER, ledger, !wrongLCL);
auto const& prevLedger = ledger.ledger;
ledgerMaster_.applyHeldTransactions();
// Tell the ledger master not to acquire the ledger we're probably building
ledgerMaster_.setBuildingLedger(prevLedger->header().seq + 1);
auto initialLedger = app_.getOpenLedger().current();
auto initialSet = std::make_shared<SHAMap>(SHAMapType::TRANSACTION, app_.getNodeFamily());
initialSet->setUnbacked();
// Build SHAMap containing all transactions in our open ledger
for (auto const& tx : initialLedger->txs)
{
JLOG(j_.trace()) << "Adding open ledger TX " << tx.first->getTransactionID();
Serializer s(2048);
tx.first->add(s);
initialSet->addItem(
SHAMapNodeType::TnTransactionNm,
makeShamapitem(tx.first->getTransactionID(), s.slice()));
}
// Add pseudo-transactions to the set
if (app_.config().standalone() || (proposing && !wrongLCL))
{
if (prevLedger->isFlagLedger())
{
// previous ledger was flag ledger, add fee and amendment
// pseudo-transactions
auto validations =
app_.getValidators().negativeUNLFilter(app_.getValidations().getTrustedForLedger(
prevLedger->header().parentHash, prevLedger->seq() - 1));
if (validations.size() >= app_.getValidators().quorum())
{
feeVote_->doVoting(prevLedger, validations, initialSet);
app_.getAmendmentTable().doVoting(prevLedger, validations, initialSet, j_);
}
}
else if (prevLedger->isVotingLedger())
{
// previous ledger was a voting ledger,
// so the current consensus session is for a flag ledger,
// add negative UNL pseudo-transactions
nUnlVote_.doVoting(
prevLedger,
app_.getValidators().getTrustedMasterKeys(),
app_.getValidations(),
initialSet);
}
}
// Now we need an immutable snapshot
initialSet = initialSet->snapShot(false);
if (!wrongLCL)
{
LedgerIndex const seq = prevLedger->header().seq + 1;
RCLCensorshipDetector<TxID, LedgerIndex>::TxIDSeqVec proposed;
initialSet->visitLeaves(
[&proposed, seq](boost::intrusive_ptr<SHAMapItem const> const& item) {
proposed.emplace_back(item->key(), seq);
});
censorshipDetector_.propose(std::move(proposed));
}
// Needed because of the move below.
auto const setHash = initialSet->getHash().asUInt256();
return Result{
std::move(initialSet),
RCLCxPeerPos::Proposal{
initialLedger->header().parentHash,
RCLCxPeerPos::Proposal::kSeqJoin,
setHash,
closeTime,
app_.getTimeKeeper().closeTime(),
validatorKeys_.nodeID}};
}
void
RCLConsensus::Adaptor::onForceAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
json::Value&& consensusJson)
{
auto acceptSpan = makeAcceptSpan(result);
doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson),
std::move(acceptSpan));
}
void
RCLConsensus::Adaptor::onAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
json::Value&& consensusJson,
bool const validating)
{
auto acceptSpan = makeAcceptSpan(result);
app_.getJobQueue().addJob(
JtAccept,
"AcceptLedger",
// NOLINTNEXTLINE(cppcoreguidelines-misleading-capture-default-by-value)
[=, this, cj = std::move(consensusJson), sp = std::move(acceptSpan)]() mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and capture by reference state
// will not change until startRound is called (which happens via
// endConsensus).
RclConsensusLogger clog("onAccept", validating, j_);
this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(cj),
std::move(sp));
this->app_.getOPs().endConsensus(clog.ss());
});
}
std::shared_ptr<telemetry::SpanGuard>
RCLConsensus::Adaptor::makeAcceptSpan(Result const& result)
{
namespace cs = telemetry::consensus::span;
auto span = std::make_shared<telemetry::SpanGuard>(
telemetry::SpanGuard::childSpan(cs::accept, roundSpanContext_));
span->setAttribute(cs::attr::proposers, static_cast<int64_t>(result.proposers));
span->setAttribute(
cs::attr::roundTimeMs, static_cast<int64_t>(result.roundTime.read().count()));
span->setAttribute(cs::attr::quorum, static_cast<int64_t>(app_.getValidators().quorum()));
// Capture the accept span's context so createValidationSpan() — which
// runs on the jtACCEPT worker thread — can link the validation.send
// span to the accept span (matching the design diagram and the
// "validation follows acceptance" causal model).
if (*span)
{
acceptSpanContext_ = span->captureContext();
}
return span;
}
void
RCLConsensus::Adaptor::doAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
json::Value&& consensusJson,
std::shared_ptr<telemetry::SpanGuard> acceptSpan)
{
namespace cs = telemetry::consensus::span;
prevProposers_ = result.proposers;
prevRoundTime_ = result.roundTime.read();
bool closeTimeCorrect = false;
bool const proposing = mode == ConsensusMode::Proposing;
bool const haveCorrectLCL = mode != ConsensusMode::WrongLedger;
bool const consensusFail = result.state == ConsensusState::MovedOn;
auto consensusCloseTime = result.position.closeTime();
if (consensusCloseTime == NetClock::time_point{})
{
// We agreed to disagree on the close time
using namespace std::chrono_literals;
consensusCloseTime = prevLedger.closeTime() + 1s;
closeTimeCorrect = false;
}
else
{
// We agreed on a close time
consensusCloseTime =
effCloseTime(consensusCloseTime, closeResolution, prevLedger.closeTime());
closeTimeCorrect = true;
}
auto doAcceptSpan = acceptSpan
? acceptSpan->childSpan(cs::acceptApply)
: telemetry::SpanGuard::childSpan(cs::acceptApply, roundSpanContext_);
doAcceptSpan.setAttribute(cs::attr::ledgerSeq, static_cast<int64_t>(prevLedger.seq()) + 1);
doAcceptSpan.setAttribute(
cs::attr::closeTime, static_cast<int64_t>(consensusCloseTime.time_since_epoch().count()));
doAcceptSpan.setAttribute(cs::attr::closeTimeCorrect, closeTimeCorrect);
doAcceptSpan.setAttribute(
cs::attr::closeResolutionMs,
static_cast<int64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(closeResolution).count()));
doAcceptSpan.setAttribute(
cs::attr::consensusState, std::string(consensusFail ? "moved_on" : "finished"));
doAcceptSpan.setAttribute(cs::attr::proposing, proposing);
doAcceptSpan.setAttribute(
cs::attr::roundTimeMs, static_cast<int64_t>(result.roundTime.read().count()));
doAcceptSpan.setAttribute(
cs::attr::parentCloseTime,
static_cast<int64_t>(prevLedger.closeTime().time_since_epoch().count()));
doAcceptSpan.setAttribute(
cs::attr::closeTimeSelf,
static_cast<int64_t>(rawCloseTimes.self.time_since_epoch().count()));
doAcceptSpan.setAttribute(
cs::attr::closeTimeVoteBins, static_cast<int64_t>(rawCloseTimes.peers.size()));
doAcceptSpan.setAttribute(
cs::attr::disputesResolvedCount, static_cast<int64_t>(result.disputes.size()));
{
auto const prevRes = prevLedger.closeTimeResolution();
auto const dir = [&]() -> std::string {
if (closeResolution > prevRes)
{
return "increased";
}
if (closeResolution < prevRes)
{
return "decreased";
}
return "unchanged";
}();
doAcceptSpan.setAttribute(cs::attr::resolutionDirection, std::move(dir));
}
JLOG(j_.debug()) << "Report: Prop=" << (proposing ? "yes" : "no")
<< " val=" << (validating_ ? "yes" : "no")
<< " corLCL=" << (haveCorrectLCL ? "yes" : "no")
<< " fail=" << (consensusFail ? "yes" : "no");
JLOG(j_.debug()) << "Report: Prev = " << prevLedger.id() << ":" << prevLedger.seq();
//--------------------------------------------------------------------------
std::set<TxID> failed;
// We want to put transactions in an unpredictable but deterministic order:
// we use the hash of the set.
//
// FIXME: Use a std::vector and a custom sorter instead of CanonicalTXSet?
CanonicalTXSet retriableTxs{result.txns.map->getHash().asUInt256()};
JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key();
int64_t txCount = 0;
for (auto const& item : *result.txns.map)
{
try
{
retriableTxs.insert(std::make_shared<STTx const>(SerialIter{item.slice()}));
JLOG(j_.debug()) << " Tx: " << item.key();
++txCount;
auto const txHash = to_string(item.key());
doAcceptSpan.addEvent(cs::event::txIncluded, {{cs::attr::txId, txHash}});
}
catch (std::exception const& ex)
{
failed.insert(item.key());
JLOG(j_.warn()) << " Tx: " << item.key() << " throws: " << ex.what();
}
}
doAcceptSpan.setAttribute(cs::attr::txCount, txCount);
auto built = buildLCL(
prevLedger,
retriableTxs,
consensusCloseTime,
closeTimeCorrect,
closeResolution,
result.roundTime.read(),
failed);
auto const newLCLHash = built.id();
JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash;
// Tell directly connected peers that we have a new LCL
notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL);
// As long as we're in sync with the network, attempt to detect attempts
// at censorship of transaction by tracking which ones don't make it in
// after a period of time.
if (haveCorrectLCL && result.state == ConsensusState::Yes)
{
std::vector<TxID> accepted;
result.txns.map->visitLeaves(
[&accepted](boost::intrusive_ptr<SHAMapItem const> const& item) {
accepted.push_back(item->key());
});
// Track all the transactions which failed or were marked as retriable
for (auto const& r : retriableTxs)
failed.insert(r.first.getTXID());
censorshipDetector_.check(
std::move(accepted),
[curr = built.seq(), j = app_.getJournal("CensorshipDetector"), &failed](
uint256 const& id, LedgerIndex seq) {
if (failed.contains(id))
return true;
auto const wait = curr - seq;
if (wait && (wait % kCensorshipWarnInternal == 0))
{
std::ostringstream ss;
ss << "Potential Censorship: Eligible tx " << id
<< ", which we are tracking since ledger " << seq
<< " has not been included as of ledger " << curr << ".";
JLOG(j.warn()) << ss.str();
}
return false;
});
}
if (validating_)
validating_ = ledgerMaster_.isCompatible(*built.ledger, j_.warn(), "Not validating");
if (validating_ && !consensusFail && app_.getValidations().canValidateSeq(built.seq()))
{
validate(built, result.txns, proposing);
JLOG(j_.info()) << "CNF Val " << newLCLHash;
}
else
JLOG(j_.info()) << "CNF buildLCL " << newLCLHash;
// See if we can accept a ledger as fully-validated
ledgerMaster_.consensusBuilt(built.ledger, result.txns.id(), std::move(consensusJson));
//-------------------------------------------------------------------------
{
// Apply disputed transactions that didn't get in
//
// The first crack of transactions to get into the new
// open ledger goes to transactions proposed by a validator
// we trust but not included in the consensus set.
//
// These are done first because they are the most likely
// to receive agreement during consensus. They are also
// ordered logically "sooner" than transactions not mentioned
// in the previous consensus round.
//
bool anyDisputes = false;
for (auto const& [_, dispute] : result.disputes)
{
(void)_;
if (!dispute.getOurVote())
{
// we voted NO
try
{
JLOG(j_.debug()) << "Test applying disputed transaction that did"
<< " not get in " << dispute.tx().id();
SerialIter sit(dispute.tx().tx->slice());
auto txn = std::make_shared<STTx const>(sit);
// Disputed pseudo-transactions that were not accepted
// can't be successfully applied in the next ledger
if (isPseudoTx(*txn))
continue;
retriableTxs.insert(txn);
anyDisputes = true;
}
catch (std::exception const& ex)
{
JLOG(j_.debug()) << "Failed to apply transaction we voted "
"NO on. Exception: "
<< ex.what();
}
}
}
// Build new open ledger
std::unique_lock lock{app_.getMasterMutex(), std::defer_lock};
std::unique_lock sl{ledgerMaster_.peekMutex(), std::defer_lock};
std::lock(lock, sl);
auto const lastVal = ledgerMaster_.getValidatedLedger();
std::optional<Rules> rules;
if (lastVal)
{
rules = makeRulesGivenLedger(*lastVal, app_.config().features);
}
else
{
rules.emplace(app_.config().features);
}
app_.getOpenLedger().accept(
app_,
*rules,
built.ledger,
localTxs_.getTxSet(),
anyDisputes,
retriableTxs,
TapNone,
"consensus",
[&](OpenView& view, beast::Journal j) {
// Stuff the ledger with transactions from the queue.
return app_.getTxQ().accept(app_, view);
});
// Signal a potential fee change to subscribers after the open ledger
// is created
app_.getOPs().reportFeeChange();
}
//-------------------------------------------------------------------------
{
ledgerMaster_.switchLCL(built.ledger);
// Do these need to exist?
XRPL_ASSERT(
ledgerMaster_.getClosedLedger()->header().hash == built.id(),
"xrpl::RCLConsensus::Adaptor::doAccept : ledger hash match");
XRPL_ASSERT(
app_.getOpenLedger().current()->header().parentHash == built.id(),
"xrpl::RCLConsensus::Adaptor::doAccept : parent hash match");
}
//-------------------------------------------------------------------------
// we entered the round with the network,
// see how close our close time is to other node's
// close time reports, and update our clock.
if ((mode == ConsensusMode::Proposing || mode == ConsensusMode::Observing) && !consensusFail)
{
auto closeTime = rawCloseTimes.self;
JLOG(j_.info()) << "We closed at " << closeTime.time_since_epoch().count();
using usec64_t = std::chrono::duration<std::uint64_t>;
usec64_t closeTotal = std::chrono::duration_cast<usec64_t>(closeTime.time_since_epoch());
int closeCount = 1;
for (auto const& [t, v] : rawCloseTimes.peers)
{
JLOG(j_.info()) << std::to_string(v) << " time votes for "
<< std::to_string(t.time_since_epoch().count());
closeCount += v;
closeTotal += std::chrono::duration_cast<usec64_t>(t.time_since_epoch()) * v;
}
closeTotal += usec64_t(closeCount / 2); // for round to nearest
closeTotal /= closeCount;
// Use signed times since we are subtracting
using duration = std::chrono::duration<std::int32_t>;
using time_point = std::chrono::time_point<NetClock, duration>;
auto offset = time_point{closeTotal} - std::chrono::time_point_cast<duration>(closeTime);
JLOG(j_.info()) << "Our close offset is estimated at " << offset.count() << " ("
<< closeCount << ")";
app_.getTimeKeeper().adjustCloseTime(offset);
}
}
void
RCLConsensus::Adaptor::notify(
protocol::NodeEvent ne,
RCLCxLedger const& ledger,
bool haveCorrectLCL)
{
protocol::TMStatusChange s;
if (!haveCorrectLCL)
{
s.set_newevent(protocol::neLOST_SYNC);
}
else
{
s.set_newevent(ne);
}
s.set_ledgerseq(ledger.seq());
s.set_networktime(app_.getTimeKeeper().now().time_since_epoch().count());
s.set_ledgerhashprevious(
ledger.parentID().begin(), std::decay_t<decltype(ledger.parentID())>::kBytes);
s.set_ledgerhash(ledger.id().begin(), std::decay_t<decltype(ledger.id())>::kBytes);
std::uint32_t uMin = 0, uMax = 0;
if (!ledgerMaster_.getFullValidatedRange(uMin, uMax))
{
uMin = 0;
uMax = 0;
}
else
{
// Don't advertise ledgers we're not willing to serve
uMin = std::max(uMin, ledgerMaster_.getEarliestFetch());
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
app_.getOverlay().foreach(SendAlways(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
JLOG(j_.trace()) << "send status change to peer";
}
RCLCxLedger
RCLConsensus::Adaptor::buildLCL(
RCLCxLedger const& previousLedger,
CanonicalTXSet& retriableTxs,
NetClock::time_point closeTime,
bool closeTimeCorrect,
NetClock::duration closeResolution,
std::chrono::milliseconds roundTime,
std::set<TxID>& failedTxs)
{
std::shared_ptr<Ledger> built = [&]() {
if (auto const replayData = ledgerMaster_.releaseReplay())
{
XRPL_ASSERT(
replayData->parent()->header().hash == previousLedger.id(),
"xrpl::RCLConsensus::Adaptor::buildLCL : parent hash match");
return buildLedger(*replayData, TapNone, app_, j_);
}
return buildLedger(
previousLedger.ledger,
closeTime,
closeTimeCorrect,
closeResolution,
app_,
retriableTxs,
failedTxs,
j_);
}();
// Update fee computations based on accepted txs
using namespace std::chrono_literals;
app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
// And stash the ledger in the ledger master
if (ledgerMaster_.storeLedger(built))
{
JLOG(j_.debug()) << "Consensus built ledger we already had";
}
else if (app_.getInboundLedgers().find(built->header().hash))
{
JLOG(j_.debug()) << "Consensus built ledger we were acquiring";
}
else
JLOG(j_.debug()) << "Consensus built new ledger";
return RCLCxLedger{std::move(built)};
}
void
RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, RCLTxSet const& txns, bool proposing)
{
auto valSpan = createValidationSpan();
if (valSpan)
{
namespace cs = telemetry::consensus::span;
valSpan->setAttribute(cs::attr::ledgerSeq, static_cast<int64_t>(ledger.seq()));
valSpan->setAttribute(cs::attr::proposing, proposing);
// proposing implies a full validation (vfFullValidation is set on
// the STValidation only when proposing — see below).
valSpan->setAttribute(cs::attr::fullValidation, proposing);
}
using namespace std::chrono_literals;
auto validationTime = app_.getTimeKeeper().closeTime();
if (validationTime <= lastValidationTime_)
validationTime = lastValidationTime_ + 1s;
lastValidationTime_ = validationTime;
if (valSpan)
{
valSpan->setAttribute(
telemetry::consensus::span::attr::validationSignTime,
static_cast<int64_t>(validationTime.time_since_epoch().count()));
}
if (!validatorKeys_.keys)
{
JLOG(j_.warn()) << "RCLConsensus::Adaptor::validate: ValidatorKeys "
"not set\n";
return;
}
auto const& keys = *validatorKeys_.keys;
auto v = std::make_shared<STValidation>(
lastValidationTime_,
keys.publicKey,
keys.secretKey,
validatorKeys_.nodeID,
[&](STValidation& v) {
v.setFieldH256(sfLedgerHash, ledger.id());
v.setFieldH256(sfConsensusHash, txns.id());
v.setFieldU32(sfLedgerSequence, ledger.seq());
if (proposing)
v.setFlag(kVfFullValidation);
// Attest to the hash of what we consider to be the last fully
// validated ledger. This may be the hash of the ledger we are
// validating here, and that's fine.
if (auto const vl = ledgerMaster_.getValidatedLedger())
v.setFieldH256(sfValidatedHash, vl->header().hash);
v.setFieldU64(sfCookie, valCookie_);
// Report our server version every flag ledger:
if (ledger.ledger->isVotingLedger())
v.setFieldU64(sfServerVersion, BuildInfo::getEncodedVersion());
// Report our load
{
auto const& ft = app_.getFeeTrack();
auto const fee = std::max(ft.getLocalFee(), ft.getClusterFee());
if (fee > ft.getLoadBase())
v.setFieldU32(sfLoadFee, fee);
}
// If the next ledger is a flag ledger, suggest fee changes and
// new features:
if (ledger.ledger->isVotingLedger())
{
// Fees:
feeVote_->doValidation(ledger.ledger->fees(), ledger.ledger->rules(), v);
// Amendments
// FIXME: pass `v` and have the function insert the array
// directly?
auto const amendments =
app_.getAmendmentTable().doValidation(getEnabledAmendments(*ledger.ledger));
if (!amendments.empty())
v.setFieldV256(sfAmendments, STVector256(sfAmendments, amendments));
}
});
auto const serialized = v->getSerialized();
// suppress it if we receive it
app_.getHashRouter().addSuppression(sha512Half(makeSlice(serialized)));
handleNewValidation(app_, v, "local");
// Broadcast to all our peers:
protocol::TMValidation val;
val.set_validation(serialized.data(), serialized.size());
// Inject the current thread's active span context so receiving
// peers can link their validation.receive span as a child.
//
// TODO(observability/secure-OTel): the trace_context appended below is
// outside the cryptographic signature on `serialized` and is therefore
// unauthenticated. Receivers cannot prove it was not tampered with by
// a relay. A signed trace context (either folded into the validation
// payload or carried by an authenticated trace_state token) is tracked
// as a follow-up — see PR #6425 discussion r3317273388 and
// OpenTelemetryPlan/secure-OTel.md. Until then, downstream consumers
// must treat the validation trace_context as advisory only.
telemetry::SpanGuard::injectCurrentContextToProtobuf(*val.mutable_trace_context());
app_.getOverlay().broadcast(val);
// Publish to all our subscribers:
app_.getOPs().pubValidation(v);
}
void
RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
{
namespace cs = telemetry::consensus::span;
auto span = telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, cs::op::modeChange);
span.setAttribute(cs::attr::modeOld, toDisplayString(before).c_str());
span.setAttribute(cs::attr::modeNew, toDisplayString(after).c_str());
JLOG(j_.info()) << "Consensus mode change before=" << to_string(before)
<< ", after=" << to_string(after);
// If we were proposing but aren't any longer, we need to reset the
// censorship tracking to avoid bogus warnings.
if ((before == ConsensusMode::Proposing || before == ConsensusMode::Observing) &&
before != after)
censorshipDetector_.reset();
mode_ = after;
}
json::Value
RCLConsensus::getJson(bool full) const
{
json::Value ret;
{
std::scoped_lock const _{mutex_};
ret = consensus_.getJson(full);
}
ret["validating"] = adaptor_.validating();
return ret;
}
void
RCLConsensus::timerEntry(
NetClock::time_point const& now,
std::unique_ptr<std::stringstream> const& clog)
{
try
{
std::scoped_lock const _{mutex_};
consensus_.timerEntry(now, clog);
}
catch (SHAMapMissingNode const& mn)
{
// This should never happen
std::stringstream ss;
ss << "During consensus timerEntry: " << mn.what();
JLOG(j_.error()) << ss.str();
CLOG(clog) << ss.str();
rethrow();
}
}
void
RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
{
try
{
std::scoped_lock const _{mutex_};
consensus_.gotTxSet(now, txSet);
}
catch (SHAMapMissingNode const& mn)
{
// This should never happen
JLOG(j_.error()) << "During consensus gotTxSet: " << mn.what();
rethrow();
}
}
//! @see Consensus::simulate
void
RCLConsensus::simulate(
NetClock::time_point const& now,
std::optional<std::chrono::milliseconds> consensusDelay)
{
std::scoped_lock const _{mutex_};
consensus_.simulate(now, consensusDelay);
}
bool
RCLConsensus::peerProposal(NetClock::time_point const& now, RCLCxPeerPos const& newProposal)
{
std::scoped_lock const _{mutex_};
return consensus_.peerProposal(now, newProposal);
}
bool
RCLConsensus::Adaptor::preStartRound(RCLCxLedger const& prevLgr, hash_set<NodeID> const& nowTrusted)
{
// We have a key, we do not want out of sync validations after a restart
// and are not amendment blocked.
validating_ = validatorKeys_.keys && prevLgr.seq() >= app_.getMaxDisallowedLedger() &&
!app_.getOPs().isBlocked();
// If we are not running in standalone mode and there's a configured UNL,
// check to make sure that it's not expired.
if (validating_ && !app_.config().standalone() && (app_.getValidators().count() != 0u))
{
auto const when = app_.getValidators().expires();
if (!when || *when < app_.getTimeKeeper().now())
{
JLOG(j_.error()) << "Voluntarily bowing out of consensus process "
"because of an expired validator list.";
validating_ = false;
}
}
bool const synced = app_.getOPs().getOperatingMode() == OperatingMode::FULL;
if (validating_)
{
JLOG(j_.info()) << "Entering consensus process, validating, synced="
<< (synced ? "yes" : "no");
}
else
{
// Otherwise we just want to monitor the validation process.
JLOG(j_.info()) << "Entering consensus process, watching, synced="
<< (synced ? "yes" : "no");
}
// Notify inbound ledgers that we are starting a new round
inboundTransactions_.newRound(prevLgr.seq());
// Notify NegativeUNLVote that new validators are added
if (!nowTrusted.empty())
nUnlVote_.newValidators(prevLgr.seq() + 1, nowTrusted);
startRoundTracing(prevLgr);
// propose only if we're in sync with the network (and validating)
return validating_ && synced;
}
bool
RCLConsensus::Adaptor::haveValidated() const
{
return ledgerMaster_.haveValidated();
}
LedgerIndex
RCLConsensus::Adaptor::getValidLedgerIndex() const
{
return ledgerMaster_.getValidLedgerIndex();
}
std::pair<std::size_t, hash_set<RCLConsensus::Adaptor::NodeKey_t>>
RCLConsensus::Adaptor::getQuorumKeys() const
{
return app_.getValidators().getQuorumKeys();
}
std::size_t
RCLConsensus::Adaptor::laggards(
Ledger_t::Seq const seq,
hash_set<RCLConsensus::Adaptor::NodeKey_t>& trustedKeys) const
{
return app_.getValidations().laggards(seq, trustedKeys);
}
bool
RCLConsensus::Adaptor::validator() const
{
return validatorKeys_.keys.has_value();
}
void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if ((positions == 0u) && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
}
void
RCLConsensus::Adaptor::startRoundTracing(RCLCxLedger const& prevLgr)
{
namespace cs = telemetry::consensus::span;
// Capture the prior round's context BEFORE the new span overwrites
// roundSpanContext_; used to add a follows-from link from the new
// round's span so consecutive rounds remain navigable.
prevRoundSpanContext_ = roundSpanContext_;
// Reset the prior accept context so a stale value can't be used to
// link a validation that fires before this round's accept span exists.
acceptSpanContext_ = telemetry::SpanContext{};
if (roundSpan_)
roundSpan_.reset();
auto const& strategy = app_.getTelemetry().getConsensusTraceStrategy();
telemetry::SpanContext const* const link =
prevRoundSpanContext_.isValid() ? &prevRoundSpanContext_ : nullptr;
if (strategy == "deterministic")
{
roundSpan_.emplace(
telemetry::SpanGuard::hashSpan(
telemetry::TraceCategory::Consensus,
cs::round,
prevLgr.id().data(),
prevLgr.id().kBytes,
link));
}
else
{
roundSpan_.emplace(
telemetry::SpanGuard::span(
telemetry::TraceCategory::Consensus, telemetry::seg::consensus, cs::op::round));
}
if (!*roundSpan_)
return;
roundSpan_->setAttribute(cs::attr::ledgerId, to_string(prevLgr.id()).c_str());
roundSpan_->setAttribute(cs::attr::ledgerSeq, static_cast<int64_t>(prevLgr.seq()) + 1);
roundSpan_->setAttribute(cs::attr::mode, toDisplayString(mode_.load()).c_str());
roundSpan_->setAttribute(cs::attr::traceStrategy, strategy.c_str());
roundSpan_->setAttribute(cs::attr::roundId, static_cast<int64_t>(prevLgr.seq()) + 1);
roundSpan_->setAttribute(cs::attr::previousLedgerSeq, static_cast<int64_t>(prevLgr.seq()));
roundSpan_->setAttribute(cs::attr::previousProposers, static_cast<int64_t>(prevProposers_));
roundSpan_->setAttribute(
cs::attr::previousRoundTimeMs, static_cast<int64_t>(prevRoundTime_.load().count()));
roundSpan_->setAttribute(cs::attr::consensusPhase, "open");
roundSpan_->addEvent(cs::event::phaseOpen);
roundSpanContext_ = roundSpan_->captureContext();
}
std::optional<telemetry::SpanGuard>
RCLConsensus::Adaptor::createValidationSpan()
{
namespace cs = telemetry::consensus::span;
// Prefer linking to the accept span (matches the design diagram and
// the "validation follows acceptance" causal model). Fall back to the
// round span only if the accept context isn't yet captured (e.g.
// tracing started after onAccept, or makeAcceptSpan returned a null
// guard).
if (acceptSpanContext_.isValid())
{
return telemetry::SpanGuard::linkedSpan(cs::validationSend, acceptSpanContext_);
}
if (!roundSpanContext_.isValid())
{
return std::nullopt;
}
return telemetry::SpanGuard::linkedSpan(cs::validationSend, roundSpanContext_);
}
void
RCLConsensus::Adaptor::onPhaseEvent(std::string_view eventName, std::string_view phaseLabel)
{
namespace cs = telemetry::consensus::span;
if (!roundSpan_ || !*roundSpan_)
{
return;
}
roundSpan_->addEvent(eventName);
if (!phaseLabel.empty())
{
roundSpan_->setAttribute(cs::attr::consensusPhase, phaseLabel);
}
}
void
RCLConsensus::Adaptor::onOutcomeEvent(std::string_view eventName)
{
if (!roundSpan_ || !*roundSpan_)
{
return;
}
roundSpan_->addEvent(eventName);
}
void
RCLConsensus::startRound(
NetClock::time_point const& now,
RCLCxLedger::ID const& prevLgrId,
RCLCxLedger const& prevLgr,
hash_set<NodeID> const& nowUntrusted,
hash_set<NodeID> const& nowTrusted,
std::unique_ptr<std::stringstream> const& clog)
{
std::scoped_lock const _{mutex_};
consensus_.startRound(
now, prevLgrId, prevLgr, nowUntrusted, adaptor_.preStartRound(prevLgr, nowTrusted), clog);
}
RclConsensusLogger::RclConsensusLogger(char const* label, bool const validating, beast::Journal j)
: j_(j)
{
if (!validating && !j.info())
return;
start_ = std::chrono::steady_clock::now();
ss_ = std::make_unique<std::stringstream>();
header_ = "ConsensusLogger ";
header_ += label;
header_ += ": ";
}
RclConsensusLogger::~RclConsensusLogger()
{
if (!ss_)
return;
auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_);
std::stringstream outSs;
outSs << header_ << "duration " << (duration.count() / 1000) << '.' << std::setw(3)
<< std::setfill('0') << (duration.count() % 1000) << "s. " << ss_->str();
j_.sink().writeAlways(beast::Severity::Info, outSs.str());
}
} // namespace xrpl