diff --git a/src/test/consensus/Consensus_test.cpp b/src/test/consensus/Consensus_test.cpp index 63d525370..4096e0f02 100644 --- a/src/test/consensus/Consensus_test.cpp +++ b/src/test/consensus/Consensus_test.cpp @@ -1095,6 +1095,42 @@ public: } } + void + testRngCommitRevealConvergesWithTransactions() + { + using namespace csf; + using namespace std::chrono; + + testcase("RNG commit/reveal converges with non-empty tx set"); + + ConsensusParms const parms{}; + Sim sim; + PeerGroup peers = sim.createGroup(5); + + peers.trustAndConnect( + peers, round(0.2 * parms.ledgerGRANULARITY)); + + for (Peer* peer : peers) + { + peer->enableRngConsensus_ = true; + peer->submit(Tx(static_cast(peer->id))); + } + + sim.run(1); + + if (BEAST_EXPECT(sim.synchronized())) + { + for (Peer const* peer : peers) + { + auto const& lcl = peer->lastClosedLedger; + BEAST_EXPECT(!peer->lastEntropyWasFallback_); + BEAST_EXPECT(peer->lastEntropyCount_ > 0); + BEAST_EXPECT(peer->lastEntropyDigest_ != uint256{}); + BEAST_EXPECT(lcl.txs().size() > 0); + } + } + } + void testRngImpossibleQuorumFallback() { @@ -1489,6 +1525,7 @@ public: testPreferredByBranch(); testPauseForLaggards(); testRngCommitRevealConverges(); + testRngCommitRevealConvergesWithTransactions(); testRngImpossibleQuorumFallback(); testRngTimeoutWithPartialQuorum(); testRngCommitSetConflictForcesFallback(); diff --git a/src/test/rpc/RuntimeConfig_test.cpp b/src/test/rpc/RuntimeConfig_test.cpp index efe9d0821..307e2604d 100644 --- a/src/test/rpc/RuntimeConfig_test.cpp +++ b/src/test/rpc/RuntimeConfig_test.cpp @@ -413,6 +413,54 @@ class RuntimeConfig_test : public beast::unit_test::suite BEAST_EXPECT(cfg->rngClaimDropPctX100 == 0); // clamped to 0% } + void + testExplicitFinalProposalToggle() + { + testcase("explicit_final_proposal round-trips and merges"); + using namespace test::jtx; + Env env{*this}; + + // Global default for this node: skip explicit final proposal. + { + Json::Value params; + params["set"] = Json::objectValue; + params["set"]["*"] = Json::objectValue; + params["set"]["*"]["explicit_final_proposal"] = false; + auto result = runtimeConfig(env, params); + + auto const& global = result["configs"]["*"]; + BEAST_EXPECT(global["explicit_final_proposal"].asBool() == false); + } + + auto& rc = env.app().getRuntimeConfig(); + BEAST_EXPECT(rc.active()); + + // Global view is false. + auto globalCfg = rc.getConfig("*"); + BEAST_EXPECT(globalCfg.has_value()); + BEAST_EXPECT(globalCfg->explicitFinalProposal.has_value()); + BEAST_EXPECT(*globalCfg->explicitFinalProposal == false); + + // Per-peer override can re-enable. + { + Json::Value params; + params["set"] = Json::objectValue; + params["set"]["10.0.0.2:51235"] = Json::objectValue; + params["set"]["10.0.0.2:51235"]["explicit_final_proposal"] = true; + runtimeConfig(env, params); + } + + auto peerCfg = rc.getConfig("10.0.0.2:51235"); + BEAST_EXPECT(peerCfg.has_value()); + BEAST_EXPECT(peerCfg->explicitFinalProposal.has_value()); + BEAST_EXPECT(*peerCfg->explicitFinalProposal == true); + + auto otherCfg = rc.getConfig("10.0.0.3:51235"); + BEAST_EXPECT(otherCfg.has_value()); + BEAST_EXPECT(otherCfg->explicitFinalProposal.has_value()); + BEAST_EXPECT(*otherCfg->explicitFinalProposal == false); + } + void testPerPeerClearInheritedFilter() { @@ -472,6 +520,7 @@ public: testDropPctClamping(); testRngClaimDropPct(); testRngClaimDropPctClamping(); + testExplicitFinalProposalToggle(); testPerPeerClearInheritedFilter(); } }; diff --git a/src/xrpld/app/consensus/RCLConsensus.cpp b/src/xrpld/app/consensus/RCLConsensus.cpp index ac498abde..a49f0c50f 100644 --- a/src/xrpld/app/consensus/RCLConsensus.cpp +++ b/src/xrpld/app/consensus/RCLConsensus.cpp @@ -1317,6 +1317,18 @@ RCLConsensus::Adaptor::pendingCommitCount() const return pendingCommits_.size(); } +std::size_t +RCLConsensus::Adaptor::pendingRevealCount() const +{ + return pendingReveals_.size(); +} + +std::size_t +RCLConsensus::Adaptor::expectedProposerCount() const +{ + return expectedProposers_.size(); +} + bool RCLConsensus::Adaptor::hasQuorumOfCommits() const { @@ -1372,6 +1384,129 @@ RCLConsensus::Adaptor::hasAnyReveals() const return !pendingReveals_.empty(); } +bool +RCLConsensus::Adaptor::shouldSendExplicitFinalProposal() const +{ + // Explicit-final-proposal policy is node-local and experimental. + // + // Default behavior is implicit finalization (no extra seq=4 proposal): + // entropy pseudo-tx is injected in onAccept/buildLCL. + // + // We only enable explicit-final when operators intentionally opt in via + // runtime config/env for measurement/diagnostics. + // + // TBD (2026-03-03): Keep collecting tx-bearing network data before + // revisiting whether explicit-final can be safely promoted beyond + // experimental use. + auto const cfg = app_.getRuntimeConfig().getConfig("*"); + if (cfg && cfg->explicitFinalProposal.has_value()) + return *cfg->explicitFinalProposal; + return false; +} + +std::optional +RCLConsensus::Adaptor::buildExplicitFinalProposalTxSet( + RCLTxSet const& txns, + LedgerIndex seq) +{ + JLOG(j_.debug()) << "RNGFINAL: build synthetic txset" + << " baseTxSet=" << txns.id() << " seq=" << seq + << " commits=" << pendingCommits_.size() + << " reveals=" << pendingReveals_.size() + << " failed=" << entropyFailed_; + + uint256 finalEntropy; + bool hasEntropy = false; + + // Keep this entropy-selection logic aligned with injectEntropyPseudoTx(). + // If these paths drift, different nodes can derive different synthetic + // hashes for the same round, which is especially harmful because this + // path mutates proposal tx-set identity late in establish. + if (app_.config().standalone()) + { + finalEntropy = sha512Half(std::string("standalone-entropy"), seq); + hasEntropy = true; + } + else if (entropyFailed_ || pendingReveals_.empty()) + { + finalEntropy.zero(); + hasEntropy = true; + } + else + { + std::vector> sorted; + sorted.reserve(pendingReveals_.size()); + + for (auto const& [nodeId, reveal] : pendingReveals_) + { + auto it = nodeIdToKey_.find(nodeId); + if (it != nodeIdToKey_.end()) + sorted.emplace_back(it->second, reveal); + } + + if (!sorted.empty()) + { + std::sort( + sorted.begin(), sorted.end(), [](auto const& a, auto const& b) { + return a.first.slice() < b.first.slice(); + }); + + Serializer s; + for (auto const& [key, reveal] : sorted) + { + s.addVL(key.slice()); + s.addBitString(reveal); + } + finalEntropy = sha512Half(s.slice()); + hasEntropy = true; + } + } + + if (!hasEntropy) + { + JLOG(j_.debug()) << "RNGFINAL: no entropy available for synthetic txset" + << " baseTxSet=" << txns.id() << " seq=" << seq; + return std::nullopt; + } + + auto const entropyCount = static_cast( + app_.config().standalone() ? 20 + : (entropyFailed_ || pendingReveals_.empty() + ? 0 + : pendingReveals_.size())); + + STTx tx(ttCONSENSUS_ENTROPY, [&](auto& obj) { + obj.setFieldU32(sfLedgerSequence, seq); + obj.setAccountID(sfAccount, AccountID{}); + obj.setFieldU32(sfSequence, 0); + obj.setFieldAmount(sfFee, STAmount{}); + obj.setFieldH256(sfDigest, finalEntropy); + obj.setFieldU16(sfEntropyCount, entropyCount); + }); + + auto const txID = tx.getTransactionID(); + if (txns.exists(txID)) + { + JLOG(j_.debug()) << "RNGFINAL: pseudo-tx already in base set" + << " txid=" << txID << " txSet=" << txns.id(); + return txns; + } + + RCLTxSet::MutableTxSet mutableTxSet{txns}; + Serializer ser(512); + tx.add(ser); + mutableTxSet.insert(RCLCxTx{make_shamapitem(txID, ser.slice())}); + auto syntheticSet = RCLTxSet{mutableTxSet}; + auto const hash = syntheticSet.id(); + inboundTransactions_.giveSet(hash, syntheticSet.map_, false); + + JLOG(j_.debug()) << "RNGFINAL: built synthetic txset" + << " hash=" << hash << " baseTxSet=" << txns.id() + << " txid=" << txID << " entropyCount=" << entropyCount; + + return syntheticSet; +} + uint256 RCLConsensus::Adaptor::buildCommitSet(LedgerIndex seq) { @@ -1470,9 +1605,14 @@ RCLConsensus::Adaptor::buildEntropySet(LedgerIndex seq) obj.setFieldAmount(sfFee, STAmount{}); obj.setFieldH256(sfDigest, reveal); obj.setFieldVL(sfSigningPubKey, kit->second.slice()); - auto proofIt = proposalProofs_.find(nid); - if (proofIt != proposalProofs_.end()) - obj.setFieldVL(sfBlob, serializeProof(proofIt->second)); + // Intentionally omit sfBlob for reveal-set entries. + // + // Reveal proofs are timing-dependent (seq/closeTime/signature can + // differ while the reveal digest is identical), which makes the + // entropy-set hash non-deterministic across nodes under packet + // loss/reordering. We only need deterministic reveal material + // (validator identity + digest) for fetch/merge and entropy + // calculation. }); Serializer s(2048); @@ -1726,25 +1866,32 @@ RCLConsensus::Adaptor::handleAcquiredRngSet(std::shared_ptr const& map) return; } - if (!stx->isFieldPresent(sfBlob)) + std::optional parsedProof; + if (stx->isFieldPresent(sfBlob)) { - JLOG(j_.warn()) - << "RNG: rejecting proofless entry from " << nodeId - << " in acquired set (" << sourceTag << ")"; - return; + auto const proofBlob = stx->getFieldVL(sfBlob); + if (!verifyProof(proofBlob, pubKey, digest, isCommitSet)) + { + JLOG(j_.warn()) << "RNG: invalid proof from " << nodeId + << " in acquired set (" << sourceTag << ")"; + return; + } + parsedProof = deserializeProof(proofBlob); + if (!parsedProof) + { + JLOG(j_.warn()) + << "RNG: rejecting malformed proof from " << nodeId + << " in acquired set (" << sourceTag << ")"; + return; + } } - auto const proofBlob = stx->getFieldVL(sfBlob); - if (!verifyProof(proofBlob, pubKey, digest, isCommitSet)) - { - JLOG(j_.warn()) << "RNG: invalid proof from " << nodeId - << " in acquired set (" << sourceTag << ")"; - return; - } - auto parsedProof = deserializeProof(proofBlob); - if (!parsedProof) + else if (isCommitSet) { + // Commit entries must carry a verifiable proposal proof. + // Without this, an attacker could inject arbitrary digests + // for trusted node IDs via fetched sets. JLOG(j_.warn()) - << "RNG: rejecting malformed proof from " << nodeId + << "RNG: rejecting proofless commit entry from " << nodeId << " in acquired set (" << sourceTag << ")"; return; } @@ -1805,15 +1952,15 @@ RCLConsensus::Adaptor::handleAcquiredRngSet(std::shared_ptr const& map) // byte-identical SHAMap leaves for these entries. if (isCommitSet) { - if (parsedProof->proposeSeq == 0) + if (parsedProof && parsedProof->proposeSeq == 0) commitProofs_.insert_or_assign(nodeId, *parsedProof); - else + else if (parsedProof) JLOG(j_.debug()) << "RNG: commit proof from " << nodeId << " has non-zero proposeSeq=" << parsedProof->proposeSeq << "; not caching for commitSet rebuild"; } - else + else if (parsedProof) { proposalProofs_.insert_or_assign(nodeId, *parsedProof); } @@ -2012,7 +2159,21 @@ RCLConsensus::Adaptor::injectEntropyPseudoTx( obj.setFieldU16(sfEntropyCount, entropyCount); }); - retriableTxs.insert(std::make_shared(std::move(tx))); + auto const txID = tx.getTransactionID(); + auto alreadyPresent = std::any_of( + retriableTxs.begin(), retriableTxs.end(), [&](auto const& entry) { + return entry.first.getTXID() == txID; + }); + if (alreadyPresent) + { + JLOG(j_.debug()) + << "RNG: entropy pseudo-tx already present, skip duplicate " + << txID; + } + else + { + retriableTxs.insert(std::make_shared(std::move(tx))); + } } //@@end rng-inject-pseudotx diff --git a/src/xrpld/app/consensus/RCLConsensus.h b/src/xrpld/app/consensus/RCLConsensus.h index f876e82c8..a65662f77 100644 --- a/src/xrpld/app/consensus/RCLConsensus.h +++ b/src/xrpld/app/consensus/RCLConsensus.h @@ -247,6 +247,14 @@ class RCLConsensus std::size_t pendingCommitCount() const; + /** Number of pending reveals (for diagnostics) */ + std::size_t + pendingRevealCount() const; + + /** Number of expected proposers this round (for diagnostics) */ + std::size_t + expectedProposerCount() const; + /** Check if we have quorum of commits */ bool hasQuorumOfCommits() const; @@ -259,6 +267,18 @@ class RCLConsensus bool hasAnyReveals() const; + /** Whether to send an explicit final proposal (seq=4 style). */ + bool + shouldSendExplicitFinalProposal() const; + + /** Build synthetic tx-set that includes consensus-entropy pseudo-tx. + + Used only for optional explicit-final-proposal experiments. This + does not mutate RNG state. + */ + std::optional + buildExplicitFinalProposalTxSet(RCLTxSet const& txns, LedgerIndex seq); + /** Build real SHAMap from collected commits, register for fetch. @param seq The ledger sequence being built @return The SHAMap root hash (commitSetHash) diff --git a/src/xrpld/app/misc/RuntimeConfig.h b/src/xrpld/app/misc/RuntimeConfig.h index 051f1baf6..9aae1d8ec 100644 --- a/src/xrpld/app/misc/RuntimeConfig.h +++ b/src/xrpld/app/misc/RuntimeConfig.h @@ -40,6 +40,13 @@ struct ConfigVals std::optional sendDelayJitterMs; std::optional sendDropPctX100; // 0-10000 (pct * 100, avoids float) std::optional rngClaimDropPctX100; // 0-10000 (pct * 100) + // Controls explicit final proposal broadcast in the RNG reveal phase. + // true = attempt explicit-final proposal (experimental) + // false = keep implicit mode (recommended default for production) + // + // NOTE: This knob is intentionally explicit opt-in. The consensus system + // is fully functional without it via accept-time pseudo-tx injection. + std::optional explicitFinalProposal; // If set (non-nullopt), only apply to these TrafficCount::category values. // nullopt = not specified (inherit from global on merge). // Empty set = explicitly "all categories" (overrides global filter). @@ -59,7 +66,8 @@ struct ConfigVals return (sendDelayMs && *sendDelayMs > 0) || (sendDelayJitterMs && *sendDelayJitterMs > 0) || (sendDropPctX100 && *sendDropPctX100 > 0) || - (rngClaimDropPctX100 && *rngClaimDropPctX100 > 0); + (rngClaimDropPctX100 && *rngClaimDropPctX100 > 0) || + explicitFinalProposal.has_value(); } /** Merge other on top of this — other's set fields override. */ @@ -75,6 +83,8 @@ struct ConfigVals result.sendDropPctX100 = other.sendDropPctX100; if (other.rngClaimDropPctX100) result.rngClaimDropPctX100 = other.rngClaimDropPctX100; + if (other.explicitFinalProposal.has_value()) + result.explicitFinalProposal = other.explicitFinalProposal; if (other.messageCategories) result.messageCategories = other.messageCategories; return result; diff --git a/src/xrpld/app/misc/detail/RuntimeConfig.cpp b/src/xrpld/app/misc/detail/RuntimeConfig.cpp index 63f9e58d7..a1e7c56a5 100644 --- a/src/xrpld/app/misc/detail/RuntimeConfig.cpp +++ b/src/xrpld/app/misc/detail/RuntimeConfig.cpp @@ -23,12 +23,34 @@ #include #include +#include #include #include +#include namespace ripple { namespace { +std::optional +parseBoolEnv(char const* env) +{ + if (!env) + return std::nullopt; + + std::string value{env}; + std::transform( + value.begin(), value.end(), value.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + + if (value == "1" || value == "true" || value == "yes" || value == "on") + return true; + if (value == "0" || value == "false" || value == "no" || value == "off") + return false; + + return std::nullopt; +} + ConfigVals parseConfigVals(Json::Value const& v) { @@ -43,6 +65,8 @@ parseConfigVals(Json::Value const& v) if (v.isMember("rng_claim_drop_pct")) cfg.rngClaimDropPctX100 = static_cast(v["rng_claim_drop_pct"].asDouble() * 100); + if (v.isMember("explicit_final_proposal")) + cfg.explicitFinalProposal = v["explicit_final_proposal"].asBool(); return cfg; } } // namespace @@ -75,6 +99,11 @@ RuntimeConfig::RuntimeConfig() global.sendDropPctX100 = static_cast(std::atof(env) * 100); if (auto const* env = std::getenv("XAHAU_RNG_CLAIM_DROP_PCT")) global.rngClaimDropPctX100 = static_cast(std::atof(env) * 100); + // Explicit-final proposal is intentionally opt-in and defaults to + // implicit behavior when unset. + if (auto parsed = + parseBoolEnv(std::getenv("XAHAUD_EXPLICIT_FINAL_PROPOSAL"))) + global.explicitFinalProposal = *parsed; if (global.active()) { diff --git a/src/xrpld/consensus/Consensus.h b/src/xrpld/consensus/Consensus.h index 27d47c12a..1e6651d82 100644 --- a/src/xrpld/consensus/Consensus.h +++ b/src/xrpld/consensus/Consensus.h @@ -616,6 +616,7 @@ private: EstablishState estState_{EstablishState::ConvergingTx}; std::chrono::steady_clock::time_point revealPhaseStart_{}; std::chrono::steady_clock::time_point commitHashConflictStart_{}; + bool explicitFinalProposalSent_{false}; MonitoredMode mode_{ConsensusMode::observing}; bool firstRound_ = true; bool haveCloseTimeConsensus_ = false; @@ -789,6 +790,7 @@ Consensus::startRoundInternal( estState_ = EstablishState::ConvergingTx; revealPhaseStart_ = {}; commitHashConflictStart_ = {}; + explicitFinalProposalSent_ = false; closeResolution_ = getNextLedgerTimeResolution( previousLedger_.closeTimeResolution(), @@ -1561,18 +1563,43 @@ Consensus::phaseEstablish( { CLOG(clog) << "ledgerMIN_CONSENSUS not reached: " << parms.ledgerMIN_CONSENSUS.count() << "ms. "; + JLOG(j_.debug()) << "STALLDIAG: establish wait ledgerMIN_CONSENSUS" + << " roundMs=" << result_->roundTime.read().count() + << " minMs=" << parms.ledgerMIN_CONSENSUS.count() + << " peerPositions=" << currPeerPositions_.size() + << " prevProposers=" << prevProposers_ + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()); return; } updateOurPositions(clog); + bool const paused = shouldPause(clog); + bool const txConsensus = paused ? false : haveConsensus(clog); // Nothing to do if too many laggards or we don't have consensus. - if (shouldPause(clog) || !haveConsensus(clog)) + if (paused || !txConsensus) + { + JLOG(j_.info()) << "STALLDIAG: establish gate blocked" + << " reason=" << (paused ? "pause" : "no-tx-consensus") + << " roundMs=" << result_->roundTime.read().count() + << " peerPositions=" << currPeerPositions_.size() + << " prevProposers=" << prevProposers_ + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()); return; + } if (!haveCloseTimeConsensus_) { JLOG(j_.info()) << "We have TX consensus but not CT consensus"; + JLOG(j_.info()) << "STALLDIAG: establish gate blocked" + << " reason=no-close-time-consensus" + << " roundMs=" << result_->roundTime.read().count() + << " peerPositions=" << currPeerPositions_.size() + << " prevProposers=" << prevProposers_ + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()); CLOG(clog) << "We have TX consensus but not CT consensus. "; return; } @@ -1607,9 +1634,96 @@ Consensus::phaseEstablish( }) { auto const buildSeq = previousLedger_.seq() + typename Ledger_t::Seq{1}; + auto const estStateName = [&]() -> char const* { + switch (estState_) + { + case EstablishState::ConvergingTx: + return "ConvergingTx"; + case EstablishState::ConvergingCommit: + return "ConvergingCommit"; + case EstablishState::ConvergingReveal: + return "ConvergingReveal"; + } + return "Unknown"; + }; + auto logRngDiag = [&](char const* reason) { + auto const ourPos = result_->position.position(); + auto const participants = currPeerPositions_.size() + 1; + JLOG(j_.info()) + << "STALLDIAG: " << reason << " state=" << estStateName() + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()) + << " roundMs=" << result_->roundTime.read().count() + << " convergePct=" << convergePercent_ + << " participants=" << participants + << " peerPositions=" << currPeerPositions_.size() + << " prevProposers=" << prevProposers_ << " explicitFinalSent=" + << (explicitFinalProposalSent_ ? "yes" : "no") + << " closeTimeConsensus=" + << (haveCloseTimeConsensus_ ? "yes" : "no") + << " txSet=" << ourPos; + + if constexpr (requires(Position_t const& p) { p.commitSetHash; }) + { + JLOG(j_.info()) + << "STALLDIAG: sidecar" + << " commitSetHash=" + << (ourPos.commitSetHash ? to_string(*ourPos.commitSetHash) + : std::string{"none"}) + << " entropySetHash=" + << (ourPos.entropySetHash + ? to_string(*ourPos.entropySetHash) + : std::string{"none"}) + << " myCommitment=" << (ourPos.myCommitment ? "yes" : "no") + << " myReveal=" << (ourPos.myReveal ? "yes" : "no"); + } + + if constexpr (requires(Adaptor& a) { + a.pendingCommitCount(); + a.quorumThreshold(); + a.hasQuorumOfCommits(); + a.hasMinimumReveals(); + a.hasAnyReveals(); + }) + { + auto const commits = adaptor_.pendingCommitCount(); + auto const quorum = adaptor_.quorumThreshold(); + auto const commitQuorum = adaptor_.hasQuorumOfCommits(); + auto const minReveals = adaptor_.hasMinimumReveals(); + auto const anyReveals = adaptor_.hasAnyReveals(); + std::optional reveals; + std::optional expected; + if constexpr (requires(Adaptor& a) { a.pendingRevealCount(); }) + reveals = adaptor_.pendingRevealCount(); + if constexpr (requires(Adaptor& a) { + a.expectedProposerCount(); + }) + expected = adaptor_.expectedProposerCount(); + + JLOG(j_.info()) + << "STALLDIAG: rng-counters" + << " commits=" << commits << " quorum=" << quorum + << " commitQuorum=" << (commitQuorum ? "yes" : "no") + << " reveals=" + << (reveals ? std::to_string(*reveals) : std::string{"n/a"}) + << " minReveals=" << (minReveals ? "yes" : "no") + << " anyReveals=" << (anyReveals ? "yes" : "no") + << " expectedProposers=" + << (expected ? std::to_string(*expected) + : std::string{"n/a"}); + } + }; auto publishEntropySet = [&]() { auto entropySetHash = adaptor_.buildEntropySet(buildSeq); auto newPos = result_->position.position(); + if (newPos.entropySetHash && + *newPos.entropySetHash == entropySetHash) + { + JLOG(j_.debug()) << "RNG: entropySet already published hash=" + << entropySetHash; + return; + } + newPos.entropySetHash = entropySetHash; result_->position.changePosition( @@ -1617,13 +1731,6 @@ Consensus::phaseEstablish( // Publish entropySetHash before accepting so lagging peers // can fetch/merge reveal sets in ConvergingReveal. - // - // Experiment note: - // We tested single-broadcaster seq=3 fanout reduction and - // observed frequent syncing/mismatch loops under stressed - // packet drops (e.g. 25% dropped RNG claim traffic). Keep - // full proposer broadcast here for robustness; the extra - // proposal chatter is the explicit cost we pay. if (mode_.get() == ConsensusMode::proposing) adaptor_.propose(result_->position); @@ -1687,6 +1794,7 @@ Consensus::phaseEstablish( JLOG(j_.debug()) << "RNG: skipping commit wait (participants=" << participants << " < threshold=" << threshold << ")"; + logRngDiag("rng-commit-wait-impossible-quorum"); // Fall through to close with zero entropy } else @@ -1694,7 +1802,10 @@ Consensus::phaseEstablish( bool timeout = result_->roundTime.read() > parms.rngPIPELINE_TIMEOUT; if (!timeout) + { + logRngDiag("rng-commit-wait"); return; // Wait for more commits + } // Timeout waiting for all expected proposers. // If we still have quorum (80% of UNL), proceed @@ -1726,6 +1837,7 @@ Consensus::phaseEstablish( << " (timeout fallback)"; return; } + logRngDiag("rng-commit-timeout-below-quorum"); // Truly below quorum: fall through to zero entropy } } @@ -1799,6 +1911,7 @@ Consensus::phaseEstablish( JLOG(j_.warn()) << "RNG: conflicting commitSetHash detected; " "waiting briefly for convergence/fetch"; + logRngDiag("rng-commit-conflict-start"); return; } @@ -1815,6 +1928,7 @@ Consensus::phaseEstablish( std::chrono::milliseconds>(conflictElapsed) .count() << "ms; staying in ConvergingCommit"; + logRngDiag("rng-commit-conflict-wait"); return; } @@ -1830,6 +1944,7 @@ Consensus::phaseEstablish( JLOG(j_.warn()) << "RNG: commitSetHash conflict persisted; forcing " "zero-entropy fallback"; + logRngDiag("rng-commit-conflict-timeout-fallback"); return; } } @@ -1863,6 +1978,7 @@ Consensus::phaseEstablish( } else { + logRngDiag("rng-reveal-wait-after-transition"); return; // Wait for next tick } } @@ -1874,28 +1990,271 @@ Consensus::phaseEstablish( std::chrono::steady_clock::now() - revealPhaseStart_; bool timeout = elapsed > parms.rngREVEAL_TIMEOUT; bool ready = false; + bool const revealConsensus = + haveConsensus(clog) && adaptor_.hasMinimumReveals(); - if ((haveConsensus(clog) && adaptor_.hasMinimumReveals()) || - timeout) + if (revealConsensus || timeout) { + JLOG(j_.info()) + << "STALLDIAG: rng-reveal-gate-open" + << " revealConsensus=" << (revealConsensus ? "yes" : "no") + << " timeout=" << (timeout ? "yes" : "no") << " elapsedMs=" + << std::chrono::duration_cast( + elapsed) + .count(); if (timeout && !adaptor_.hasAnyReveals()) { adaptor_.setEntropyFailed(); JLOG(j_.warn()) << "RNG: entropy failed (no reveals)"; + logRngDiag("rng-reveal-timeout-no-reveals"); } else { publishEntropySet(); + logRngDiag("rng-reveal-published-entropy-set"); } ready = true; } if (!ready) + { + JLOG(j_.info()) + << "STALLDIAG: rng-reveal-gate-blocked" + << " revealConsensus=" << (revealConsensus ? "yes" : "no") + << " timeout=" << (timeout ? "yes" : "no") << " elapsedMs=" + << std::chrono::duration_cast( + elapsed) + .count(); + logRngDiag("rng-reveal-wait"); return; + } + + // Optional explicit final proposal (seq=4 style): + // publish a synthetic tx-set hash that includes the + // consensus-entropy pseudo-tx just before accept. + // + // IMPORTANT DESIGN NOTE (read before editing this block): + // + // This path is intentionally OPTIONAL and default-off. It exists + // for diagnostics/perf experiments (for example, making monitor + // visibility of the final pseudo-tx set more direct), NOT as a + // required step for consensus correctness. + // + // Why so conservative? + // - The main consensus engine still keys agreement on tx-set hash. + // - Updating our tx-set hash here creates a "late identity + // change" in establish. + // - Under lossy/reordered networks, peers can be slightly out of + // phase: some nodes may have switched to the synthetic hash + // while others are still on the base hash. + // - That can fragment agreement during a critical window (two + // hashes in flight for one ledger), increase proposal chatter, + // and trigger sync churn. + // + // Therefore this logic must remain best-effort only: + // - Never required for liveness/safety. + // - No extra wait tick is introduced. + // - If gates are not met, we skip and continue to accept via the + // normal implicit path (accept-time pseudo-tx injection). + // + // TBD (2026-03-03): We did not find a robust timing model that + // folds this into a guaranteed-safe explicit final proposal across + // lossy/reordered links without increasing churn. Keep this path + // as opt-in for future evaluation. + if constexpr (requires( + Adaptor& a, + TxSet_t const& txns, + typename Ledger_t::Seq seq) { + a.shouldSendExplicitFinalProposal(); + a.hasQuorumOfCommits(); + a.buildExplicitFinalProposalTxSet(txns, seq); + }) + { + bool fullParticipantCoverage = false; + bool entropyAligned = false; + if constexpr (requires(Position_t const& p) { + p.entropySetHash; + }) + { + // Guard against "early switch" churn: + // require at least as many participants as the previous + // round before attempting the explicit-final mutation. + // + // This is a heuristic to reduce risk, not a proof of + // safety. We still keep the feature optional/default-off. + auto const participants = currPeerPositions_.size() + 1; + auto const expectedParticipants = prevProposers_ + 1; + fullParticipantCoverage = + participants >= expectedParticipants; + // Require a majority aligned on entropySetHash before + // mutating tx-set hash. If this threshold is loosened, the + // probability of hash fragmentation rises quickly. + auto const requiredEntropyAligned = + (expectedParticipants / 2) + 1; + auto const ourPos = result_->position.position(); + if (ourPos.entropySetHash) + { + auto const expectedEntropy = *ourPos.entropySetHash; + std::size_t alignedPeers = 0; + bool conflict = false; + for (auto const& [_, peerPos] : currPeerPositions_) + { + auto const& peerPosition = + peerPos.proposal().position(); + if (!peerPosition.entropySetHash) + continue; + if (*peerPosition.entropySetHash == expectedEntropy) + { + ++alignedPeers; + continue; + } + conflict = true; + break; + } + + auto const alignedParticipants = alignedPeers + 1; + entropyAligned = !conflict && + alignedParticipants >= requiredEntropyAligned; + if (!entropyAligned) + { + JLOG(j_.debug()) + << "RNG: explicit-final entropy alignment " + "insufficient" + << " alignedParticipants=" + << alignedParticipants + << " required=" << requiredEntropyAligned + << " conflict=" << (conflict ? "yes" : "no"); + } + } + else + { + JLOG(j_.debug()) + << "RNG: explicit-final waiting on local " + "entropySetHash"; + } + } + + if (mode_.get() == ConsensusMode::proposing && + !explicitFinalProposalSent_ && + adaptor_.hasQuorumOfCommits() && revealConsensus && + fullParticipantCoverage && entropyAligned && + adaptor_.shouldSendExplicitFinalProposal()) + { + // One-shot per round. This avoids repeated mutations/ + // broadcasts from timer ticks, which can amplify network + // chatter in the exact conditions (loss/reordering) where + // this path is already fragile. + auto const synthSet = + adaptor_.buildExplicitFinalProposalTxSet( + result_->txns, buildSeq); + explicitFinalProposalSent_ = true; + + if (synthSet) + { + auto const synthHash = synthSet->id(); + auto currentPos = result_->position.position(); + auto newPos = currentPos; + if constexpr (requires(typename Adaptor::Position_t p) { + p.updateTxSet(synthHash); + }) + newPos.updateTxSet(synthHash); + else + newPos = synthHash; + + if (!(newPos == currentPos)) + { + // WARNING: + // This changes proposal tx-set identity late in + // establish. Keep this path tightly gated and + // optional. The canonical ledger path remains the + // implicit accept-time injection logic. + + // Maintain the invariant that our active + // position's tx-set hash is present in acquired_, + // otherwise gotTxSet can assert if this set arrives + // back from the network. + auto const [_, inserted] = + acquired_.emplace(synthHash, *synthSet); + JLOG(j_.debug()) + << "RNG: cached explicit-final txSet=" + << synthHash << " inserted=" << inserted; + if (inserted) + { + // Make the synthetic set discoverable by peers + // immediately; otherwise they can request this + // hash and hit transient "getTxSet: Failed to + // find TX set" on random peers. + adaptor_.share(*synthSet); + JLOG(j_.debug()) + << "RNG: shared explicit-final txSet=" + << synthHash; + } + result_->position.changePosition( + newPos, + asCloseTime(result_->position.closeTime()), + now_); + adaptor_.propose(result_->position); + JLOG(j_.debug()) + << "RNG: explicit final proposal txSet=" + << synthHash; + logRngDiag("rng-explicit-final-proposed"); + } + } + } + else + { + char const* reason = "disabled"; + if (mode_.get() != ConsensusMode::proposing) + reason = "not-proposing"; + else if (explicitFinalProposalSent_) + reason = "already-sent"; + else if (!adaptor_.hasQuorumOfCommits()) + reason = "no-commit-quorum"; + else if (!revealConsensus) + reason = "reveal-timeout"; + else if (!fullParticipantCoverage) + reason = "participant-gap"; + else if (!entropyAligned) + reason = "entropy-not-aligned"; + JLOG(j_.info()) + << "STALLDIAG: rng-explicit-final-skipped" + << " reason=" << reason + << " mode=" << to_string(mode_.get()) << " sent=" + << (explicitFinalProposalSent_ ? "yes" : "no"); + } + } } } //@@end rng-phase-establish-substates + JLOG(j_.info()) << "STALLDIAG: establish-ready-to-accept" + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()) + << " roundMs=" << result_->roundTime.read().count() + << " convergePct=" << convergePercent_ + << " peerPositions=" << currPeerPositions_.size() + << " prevProposers=" << prevProposers_ + << " explicitFinalSent=" + << (explicitFinalProposalSent_ ? "yes" : "no"); + if constexpr (requires(Position_t const& p) { + p.commitSetHash; + p.entropySetHash; + p.myCommitment; + p.myReveal; + }) + { + auto const pos = result_->position.position(); + JLOG(j_.info()) << "STALLDIAG: establish-ready-sidecar" + << " txSet=" << pos << " commitSetHash=" + << (pos.commitSetHash ? to_string(*pos.commitSetHash) + : std::string{"none"}) + << " entropySetHash=" + << (pos.entropySetHash ? to_string(*pos.entropySetHash) + : std::string{"none"}) + << " myCommitment=" << (pos.myCommitment ? "yes" : "no") + << " myReveal=" << (pos.myReveal ? "yes" : "no"); + } + JLOG(j_.info()) << "STARTDIAG: converge cutoff" << " peerPositions=" << currPeerPositions_.size() << " roundTime=" << result_->roundTime.read().count() @@ -1929,6 +2288,7 @@ Consensus::closeLedger(std::unique_ptr const& clog) phase_ = ConsensusPhase::establish; estState_ = EstablishState::ConvergingTx; + explicitFinalProposalSent_ = false; JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish"; rawCloseTimes_.self = now_; @@ -2281,6 +2641,33 @@ Consensus::haveConsensus( << " roundTime=" << result_->roundTime.read().count() << "ms" << " mode=" << to_string(mode_.get()); + JLOG(j_.info()) << "STALLDIAG: haveConsensus-self" + << " position=" << ourPosition << " closeTime=" + << result_->position.closeTime().time_since_epoch().count() + << " haveCloseTimeConsensus=" + << (haveCloseTimeConsensus_ ? "yes" : "no") + << " phase=" << to_string(phase_); + if constexpr (requires(Position_t const& p) { + p.commitSetHash; + p.entropySetHash; + p.myCommitment; + p.myReveal; + }) + { + JLOG(j_.info()) << "STALLDIAG: haveConsensus-sidecar" + << " commitSetHash=" + << (ourPosition.commitSetHash + ? to_string(*ourPosition.commitSetHash) + : std::string{"none"}) + << " entropySetHash=" + << (ourPosition.entropySetHash + ? to_string(*ourPosition.entropySetHash) + : std::string{"none"}) + << " myCommitment=" + << (ourPosition.myCommitment ? "yes" : "no") + << " myReveal=" + << (ourPosition.myReveal ? "yes" : "no"); + } // Determine if we actually have consensus or not result_->state = checkConsensus( @@ -2297,6 +2684,11 @@ Consensus::haveConsensus( if (result_->state == ConsensusState::No) { + JLOG(j_.info()) << "STALLDIAG: haveConsensus-result" + << " state=No" + << " agree=" << agree << " disagree=" << disagree + << " total=" << (agree + disagree) + << " finished=" << currentFinished; CLOG(clog) << "No consensus. "; return false; } @@ -2305,6 +2697,7 @@ Consensus::haveConsensus( // without us. if (result_->state == ConsensusState::MovedOn) { + JLOG(j_.warn()) << "STALLDIAG: haveConsensus-result state=MovedOn"; JLOG(j_.error()) << "Unable to reach consensus"; JLOG(j_.error()) << Json::Compact{getJson(true)}; CLOG(clog) << "Unable to reach consensus " diff --git a/src/xrpld/rpc/handlers/RuntimeConfig.cpp b/src/xrpld/rpc/handlers/RuntimeConfig.cpp index 50eee2009..a96793882 100644 --- a/src/xrpld/rpc/handlers/RuntimeConfig.cpp +++ b/src/xrpld/rpc/handlers/RuntimeConfig.cpp @@ -99,6 +99,9 @@ doRuntimeConfig(RPC::JsonContext& context) pct = 100.0; cfg.rngClaimDropPctX100 = static_cast(pct * 100); } + if (v.isMember("explicit_final_proposal")) + cfg.explicitFinalProposal = + v["explicit_final_proposal"].asBool(); if (v.isMember("message_types")) { auto const& mts = v["message_types"]; @@ -154,6 +157,8 @@ doRuntimeConfig(RPC::JsonContext& context) entry["send_drop_pct"] = *cfg.sendDropPctX100 / 100.0; if (cfg.rngClaimDropPctX100) entry["rng_claim_drop_pct"] = *cfg.rngClaimDropPctX100 / 100.0; + if (cfg.explicitFinalProposal.has_value()) + entry["explicit_final_proposal"] = *cfg.explicitFinalProposal; if (cfg.messageCategories) { Json::Value types{Json::arrayValue};