From bb77c2090b5e20d5f37f978468e51ecf368902d5 Mon Sep 17 00:00:00 2001 From: Nicholas Dudfield Date: Fri, 6 Mar 2026 14:09:06 +0700 Subject: [PATCH] consensus: gate RNG substates by amendment state --- src/xrpld/app/consensus/RCLConsensus.cpp | 17 + src/xrpld/app/consensus/RCLConsensus.h | 10 + src/xrpld/consensus/Consensus.h | 1179 ++++++++++++---------- 3 files changed, 645 insertions(+), 561 deletions(-) diff --git a/src/xrpld/app/consensus/RCLConsensus.cpp b/src/xrpld/app/consensus/RCLConsensus.cpp index 70f360973..a8f27466a 100644 --- a/src/xrpld/app/consensus/RCLConsensus.cpp +++ b/src/xrpld/app/consensus/RCLConsensus.cpp @@ -1150,6 +1150,12 @@ RCLConsensus::Adaptor::preStartRound( RCLCxLedger const& prevLgr, hash_set const& nowTrusted) { + rngEnabledThisRound_ = + prevLgr.ledger_->rules().enabled(featureConsensusEntropy); + + JLOG(j_.debug()) << "RNGGATE: preStartRound prevSeq=" << prevLgr.seq() + << " rulesEnabled=" << rngEnabledThisRound_; + // We have a key, we do not want out of sync validations after a restart // and are not amendment blocked. validating_ = validatorKeys_.keys && @@ -1384,6 +1390,12 @@ RCLConsensus::Adaptor::hasAnyReveals() const return !pendingReveals_.empty(); } +bool +RCLConsensus::Adaptor::rngEnabled() const +{ + return rngEnabledThisRound_; +} + bool RCLConsensus::Adaptor::shouldSendExplicitFinalProposal() const { @@ -1676,6 +1688,11 @@ RCLConsensus::Adaptor::clearRngState() expectedProposers_.clear(); commitProofs_.clear(); proposalProofs_.clear(); + // Keep the round-level enable latch intact here. Consensus::startRound() + // calls preStartRound() first to snapshot whether RNG is enabled for the + // upcoming round, then immediately clears per-round working state. + // Resetting rngEnabledThisRound_ here would wipe that snapshot before + // phaseEstablish() can consult it. } void diff --git a/src/xrpld/app/consensus/RCLConsensus.h b/src/xrpld/app/consensus/RCLConsensus.h index a65662f77..9336e79b9 100644 --- a/src/xrpld/app/consensus/RCLConsensus.h +++ b/src/xrpld/app/consensus/RCLConsensus.h @@ -105,6 +105,7 @@ class RCLConsensus // Ephemeral entropy secret (in-memory only, crash = non-revealer) uint256 myEntropySecret_; bool entropyFailed_ = false; + bool rngEnabledThisRound_ = false; // Real SHAMaps for the current round (unbacked, ephemeral) std::shared_ptr commitSetMap_; @@ -267,6 +268,15 @@ class RCLConsensus bool hasAnyReveals() const; + /** Whether ConsensusEntropy is enabled for the current round. + + Latched from the previous ledger's rules at round start so the + generic consensus engine can skip RNG-specific waiting paths when + the amendment is inactive. + */ + bool + rngEnabled() const; + /** Whether to send an explicit final proposal (seq=4 style). */ bool shouldSendExplicitFinalProposal() const; diff --git a/src/xrpld/consensus/Consensus.h b/src/xrpld/consensus/Consensus.h index ebb1598b0..1b66005a6 100644 --- a/src/xrpld/consensus/Consensus.h +++ b/src/xrpld/consensus/Consensus.h @@ -1633,132 +1633,370 @@ Consensus::phaseEstablish( a.generateEntropySecret(); }) { - auto const buildSeq = previousLedger_.seq() + typename Ledger_t::Seq{1}; - auto const estStateName = [&]() -> char const* { - switch (estState_) - { - case EstablishState::ConvergingTx: - return "ConvergingTx"; - case EstablishState::ConvergingCommit: - return "ConvergingCommit"; - case EstablishState::ConvergingReveal: - return "ConvergingReveal"; - } - return "Unknown"; - }; - auto logRngDiag = [&](char const* reason) { - auto const ourPos = result_->position.position(); - auto const participants = currPeerPositions_.size() + 1; - JLOG(j_.info()) - << "STALLDIAG: " << reason << " state=" << estStateName() - << " phase=" << to_string(phase_) - << " mode=" << to_string(mode_.get()) - << " roundMs=" << result_->roundTime.read().count() - << " convergePct=" << convergePercent_ - << " participants=" << participants - << " peerPositions=" << currPeerPositions_.size() - << " prevProposers=" << prevProposers_ << " explicitFinalSent=" - << (explicitFinalProposalSent_ ? "yes" : "no") - << " closeTimeConsensus=" - << (haveCloseTimeConsensus_ ? "yes" : "no") - << " txSet=" << ourPos; + bool rngEnabled = true; + if constexpr (requires(Adaptor& a) { a.rngEnabled(); }) + rngEnabled = adaptor_.rngEnabled(); - if constexpr (requires(Position_t const& p) { p.commitSetHash; }) - { - JLOG(j_.info()) - << "STALLDIAG: sidecar" - << " commitSetHash=" - << (ourPos.commitSetHash ? to_string(*ourPos.commitSetHash) - : std::string{"none"}) - << " entropySetHash=" - << (ourPos.entropySetHash - ? to_string(*ourPos.entropySetHash) - : std::string{"none"}) - << " myCommitment=" << (ourPos.myCommitment ? "yes" : "no") - << " myReveal=" << (ourPos.myReveal ? "yes" : "no"); - } + JLOG(j_.debug()) << "RNGGATE: phaseEstablish prevSeq=" + << previousLedger_.seq() + << " rngEnabled=" << (rngEnabled ? "yes" : "no") + << " estState=" << static_cast(estState_) + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()) + << " roundMs=" << result_->roundTime.read().count(); - if constexpr (requires(Adaptor& a) { - a.pendingCommitCount(); - a.quorumThreshold(); - a.hasQuorumOfCommits(); - a.hasMinimumReveals(); - a.hasAnyReveals(); - }) - { - auto const commits = adaptor_.pendingCommitCount(); - auto const quorum = adaptor_.quorumThreshold(); - auto const commitQuorum = adaptor_.hasQuorumOfCommits(); - auto const minReveals = adaptor_.hasMinimumReveals(); - auto const anyReveals = adaptor_.hasAnyReveals(); - std::optional reveals; - std::optional expected; - if constexpr (requires(Adaptor& a) { a.pendingRevealCount(); }) - reveals = adaptor_.pendingRevealCount(); - if constexpr (requires(Adaptor& a) { - a.expectedProposerCount(); - }) - expected = adaptor_.expectedProposerCount(); - - JLOG(j_.info()) - << "STALLDIAG: rng-counters" - << " commits=" << commits << " quorum=" << quorum - << " commitQuorum=" << (commitQuorum ? "yes" : "no") - << " reveals=" - << (reveals ? std::to_string(*reveals) : std::string{"n/a"}) - << " minReveals=" << (minReveals ? "yes" : "no") - << " anyReveals=" << (anyReveals ? "yes" : "no") - << " expectedProposers=" - << (expected ? std::to_string(*expected) - : std::string{"n/a"}); - } - }; - auto publishEntropySet = [&]() { - auto entropySetHash = adaptor_.buildEntropySet(buildSeq); - auto newPos = result_->position.position(); - if (newPos.entropySetHash && - *newPos.entropySetHash == entropySetHash) - { - JLOG(j_.debug()) << "RNG: entropySet already published hash=" - << entropySetHash; - return; - } - - newPos.entropySetHash = entropySetHash; - - result_->position.changePosition( - newPos, asCloseTime(result_->position.closeTime()), now_); - - // Publish entropySetHash before accepting so lagging peers - // can fetch/merge reveal sets in ConvergingReveal. - // - // This can look redundant in healthy rounds because txSetHash may - // be unchanged versus the prior proposal (for example, seq=2 and - // seq=3 showing the same tx summary in monitors). We still publish - // to create an additional delivery window for entropySetHash and - // to trigger fetch/merge on peers that missed earlier packets. - if (mode_.get() == ConsensusMode::proposing) - adaptor_.propose(result_->position); - - JLOG(j_.debug()) << "RNG: built entropySet"; - }; - - JLOG(j_.debug()) << "RNG: phaseEstablish estState=" - << static_cast(estState_); - - if (estState_ == EstablishState::ConvergingTx) + if (rngEnabled) { - if (adaptor_.hasQuorumOfCommits()) // all expected proposers (80% - // fallback) - { - auto commitSetHash = adaptor_.buildCommitSet(buildSeq); + auto const buildSeq = + previousLedger_.seq() + typename Ledger_t::Seq{1}; + auto const estStateName = [&]() -> char const* { + switch (estState_) + { + case EstablishState::ConvergingTx: + return "ConvergingTx"; + case EstablishState::ConvergingCommit: + return "ConvergingCommit"; + case EstablishState::ConvergingReveal: + return "ConvergingReveal"; + } + return "Unknown"; + }; + auto logRngDiag = [&](char const* reason) { + auto const ourPos = result_->position.position(); + auto const participants = currPeerPositions_.size() + 1; + JLOG(j_.info()) + << "STALLDIAG: " << reason << " state=" << estStateName() + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()) + << " roundMs=" << result_->roundTime.read().count() + << " convergePct=" << convergePercent_ + << " participants=" << participants + << " peerPositions=" << currPeerPositions_.size() + << " prevProposers=" << prevProposers_ + << " explicitFinalSent=" + << (explicitFinalProposalSent_ ? "yes" : "no") + << " closeTimeConsensus=" + << (haveCloseTimeConsensus_ ? "yes" : "no") + << " txSet=" << ourPos; - // Keep the same entropy secret from onClose() — do NOT - // regenerate. The commitment in the commitSet was built - // from that original secret; regenerating would make the - // later reveal fail verification. + if constexpr (requires(Position_t const& p) { + p.commitSetHash; + }) + { + JLOG(j_.info()) + << "STALLDIAG: sidecar" + << " commitSetHash=" + << (ourPos.commitSetHash + ? to_string(*ourPos.commitSetHash) + : std::string{"none"}) + << " entropySetHash=" + << (ourPos.entropySetHash + ? to_string(*ourPos.entropySetHash) + : std::string{"none"}) + << " myCommitment=" + << (ourPos.myCommitment ? "yes" : "no") + << " myReveal=" << (ourPos.myReveal ? "yes" : "no"); + } + + if constexpr (requires(Adaptor& a) { + a.pendingCommitCount(); + a.quorumThreshold(); + a.hasQuorumOfCommits(); + a.hasMinimumReveals(); + a.hasAnyReveals(); + }) + { + auto const commits = adaptor_.pendingCommitCount(); + auto const quorum = adaptor_.quorumThreshold(); + auto const commitQuorum = adaptor_.hasQuorumOfCommits(); + auto const minReveals = adaptor_.hasMinimumReveals(); + auto const anyReveals = adaptor_.hasAnyReveals(); + std::optional reveals; + std::optional expected; + if constexpr (requires(Adaptor& a) { + a.pendingRevealCount(); + }) + reveals = adaptor_.pendingRevealCount(); + if constexpr (requires(Adaptor& a) { + a.expectedProposerCount(); + }) + expected = adaptor_.expectedProposerCount(); + + JLOG(j_.info()) + << "STALLDIAG: rng-counters" + << " commits=" << commits << " quorum=" << quorum + << " commitQuorum=" << (commitQuorum ? "yes" : "no") + << " reveals=" + << (reveals ? std::to_string(*reveals) + : std::string{"n/a"}) + << " minReveals=" << (minReveals ? "yes" : "no") + << " anyReveals=" << (anyReveals ? "yes" : "no") + << " expectedProposers=" + << (expected ? std::to_string(*expected) + : std::string{"n/a"}); + } + }; + auto publishEntropySet = [&]() { + auto entropySetHash = adaptor_.buildEntropySet(buildSeq); auto newPos = result_->position.position(); - newPos.commitSetHash = commitSetHash; + if (newPos.entropySetHash && + *newPos.entropySetHash == entropySetHash) + { + JLOG(j_.debug()) + << "RNG: entropySet already published hash=" + << entropySetHash; + return; + } + + newPos.entropySetHash = entropySetHash; + + result_->position.changePosition( + newPos, asCloseTime(result_->position.closeTime()), now_); + + // Publish entropySetHash before accepting so lagging peers + // can fetch/merge reveal sets in ConvergingReveal. + // + // This can look redundant in healthy rounds because txSetHash + // may be unchanged versus the prior proposal (for example, + // seq=2 and seq=3 showing the same tx summary in monitors). We + // still publish to create an additional delivery window for + // entropySetHash and to trigger fetch/merge on peers that + // missed earlier packets. + if (mode_.get() == ConsensusMode::proposing) + adaptor_.propose(result_->position); + + JLOG(j_.debug()) << "RNG: built entropySet"; + }; + + JLOG(j_.debug()) << "RNG: phaseEstablish estState=" + << static_cast(estState_); + + if (estState_ == EstablishState::ConvergingTx) + { + if (adaptor_.hasQuorumOfCommits()) // all expected proposers + // (80% fallback) + { + auto commitSetHash = adaptor_.buildCommitSet(buildSeq); + + // Keep the same entropy secret from onClose() — do NOT + // regenerate. The commitment in the commitSet was built + // from that original secret; regenerating would make the + // later reveal fail verification. + auto newPos = result_->position.position(); + newPos.commitSetHash = commitSetHash; + + result_->position.changePosition( + newPos, + asCloseTime(result_->position.closeTime()), + now_); + + if (mode_.get() == ConsensusMode::proposing) + adaptor_.propose(result_->position); + + estState_ = EstablishState::ConvergingCommit; + commitHashConflictStart_ = {}; + JLOG(j_.debug()) << "RNG: transitioned to ConvergingCommit" + << " commitSet=" << commitSetHash; + return; // Wait for next tick + } + + // Don't let the round close while waiting for commit quorum. + // Without this gate, execution falls through to the normal + // consensus close logic and nodes inject partial/zero entropy + // while others are still collecting — causing ledger + // mismatches. + // + // However, if we've already converged on the txSet (which we + // have — haveConsensus() passed above) and there aren't enough + // participants to ever reach quorum, skip immediately. With + // 3 nodes and quorum=3, losing one node means 2/3 commits + // forever — waiting 3s per round just delays recovery. + // + // NOTE: Late-joining nodes (e.g. restarting after a crash) + // cannot help here. They enter the round as proposing=false + // and onClose() skips commitment generation for non-proposers. + // It takes at least one full round of observing before + // consensus promotes them to proposing. + { + // participants = peers + ourselves + auto const participants = currPeerPositions_.size() + 1; + auto const threshold = adaptor_.quorumThreshold(); + bool const impossible = participants < threshold; + + if (impossible) + { + JLOG(j_.debug()) + << "RNG: skipping commit wait (participants=" + << participants << " < threshold=" << threshold + << ")"; + logRngDiag("rng-commit-wait-impossible-quorum"); + // Fall through to close with zero entropy + } + else + { + bool timeout = result_->roundTime.read() > + parms.rngPIPELINE_TIMEOUT; + if (!timeout) + { + logRngDiag("rng-commit-wait"); + return; // Wait for more commits + } + + // Timeout waiting for all expected proposers. + // If we still have quorum (80% of UNL), proceed + // with what we have — the SHAMap merge handles + // any fuzziness for this transition round. + auto const commits = adaptor_.pendingCommitCount(); + auto const quorum = adaptor_.quorumThreshold(); + if (commits >= quorum) + { + JLOG(j_.info()) + << "RNG: commit timeout but have quorum (" + << commits << "/" << quorum + << "), proceeding with partial set"; + // Jump to the same path as hasQuorumOfCommits + auto commitSetHash = + adaptor_.buildCommitSet(buildSeq); + auto newPos = result_->position.position(); + newPos.commitSetHash = commitSetHash; + result_->position.changePosition( + newPos, + asCloseTime(result_->position.closeTime()), + now_); + if (mode_.get() == ConsensusMode::proposing) + adaptor_.propose(result_->position); + estState_ = EstablishState::ConvergingCommit; + commitHashConflictStart_ = {}; + JLOG(j_.debug()) + << "RNG: transitioned to ConvergingCommit" + << " commitSet=" << commitSetHash + << " (timeout fallback)"; + return; + } + logRngDiag("rng-commit-timeout-below-quorum"); + // Truly below quorum: fall through to zero entropy + } + } + } + else if (estState_ == EstablishState::ConvergingCommit) + { + // If commit hashes diverge, we may not receive any additional + // tx-converged proposals in this state (peers can move to the + // next ledger quickly, causing prevLedger rejects). In that + // case, hashes observed during ConvergingTx would never be + // fetched because fetch is intentionally deferred there. + // + // Sweep currently tx-converged peer positions each tick so + // deferred commitSet hashes still get fetched/merged even + // without new accepted proposals in ConvergingCommit. + if constexpr ( + requires(Adaptor& a) { + a.fetchRngSetIfNeeded(std::optional{}); + } && requires(Position_t const& p) { p.commitSetHash; }) + { + auto const ourPos = result_->position.position(); + for (auto const& [nodeId, peerPos] : currPeerPositions_) + { + auto const& peerPosition = + peerPos.proposal().position(); + if (!(peerPosition == ourPos)) + continue; + adaptor_.fetchRngSetIfNeeded( + peerPosition.commitSetHash); + } + } + + // Fast path: if no commit-set conflicts are observed, do + // exactly what we did before (immediate reveal transition). + // + // Safety path: haveConsensus() only compares tx-set hash, not + // RNG sidecar fields. So commitSetHash disagreements can exist + // transiently even while tx consensus is true. We only add + // delay when we *actually* observe conflicting non-empty + // commitSetHash values among tx-converged positions. + if (hasConflictingCommitSetHashes()) + { + // Fetch/merge may have added missing commits since we last + // published our commitSetHash. Rebuild and re-publish so + // peers can converge on one deterministic hash instead of + // timing out. + auto pos = result_->position.position(); + auto const previousHash = pos.commitSetHash; + auto const refreshedHash = + adaptor_.buildCommitSet(buildSeq); + if (!previousHash || *previousHash != refreshedHash) + { + pos.commitSetHash = refreshedHash; + result_->position.changePosition( + pos, + asCloseTime(result_->position.closeTime()), + now_); + + if (mode_.get() == ConsensusMode::proposing) + adaptor_.propose(result_->position); + + JLOG(j_.debug()) + << "RNG: refreshed commitSetHash after merge to " + << refreshedHash; + } + + // Re-check after refreshing our own hash. + if (hasConflictingCommitSetHashes()) + { + auto const nowSteady = std::chrono::steady_clock::now(); + if (commitHashConflictStart_ == + std::chrono::steady_clock::time_point{}) + { + // First observed conflict: start a bounded grace + // window so benign ordering/fetch races can settle. + commitHashConflictStart_ = nowSteady; + JLOG(j_.warn()) + << "RNG: conflicting commitSetHash detected; " + "waiting briefly for convergence/fetch"; + logRngDiag("rng-commit-conflict-start"); + return; + } + + auto const conflictElapsed = + nowSteady - commitHashConflictStart_; + if (conflictElapsed <= parms.rngREVEAL_TIMEOUT) + { + // We are still inside the grace window, so keep + // waiting. This preserves the fast path when peers + // converge after a short delay. + JLOG(j_.debug()) + << "RNG: commitSetHash still conflicting after " + << std::chrono::duration_cast< + std::chrono::milliseconds>( + conflictElapsed) + .count() + << "ms; staying in ConvergingCommit"; + logRngDiag("rng-commit-conflict-wait"); + return; + } + + // If conflict persists past a bounded wait, force + // deterministic fallback for this round. + adaptor_.setEntropyFailed(); + estState_ = EstablishState::ConvergingReveal; + // Backdate revealPhaseStart_ so the ConvergingReveal + // timeout path fires immediately next tick. + revealPhaseStart_ = nowSteady - + parms.rngREVEAL_TIMEOUT - + std::chrono::milliseconds{1}; + commitHashConflictStart_ = {}; + JLOG(j_.warn()) + << "RNG: commitSetHash conflict persisted; forcing " + "zero-entropy fallback"; + logRngDiag("rng-commit-conflict-timeout-fallback"); + return; + } + } + + commitHashConflictStart_ = {}; + + auto newPos = result_->position.position(); + newPos.myReveal = adaptor_.getEntropySecret(); result_->position.changePosition( newPos, asCloseTime(result_->position.closeTime()), now_); @@ -1766,470 +2004,289 @@ Consensus::phaseEstablish( if (mode_.get() == ConsensusMode::proposing) adaptor_.propose(result_->position); - estState_ = EstablishState::ConvergingCommit; - commitHashConflictStart_ = {}; - JLOG(j_.debug()) << "RNG: transitioned to ConvergingCommit" - << " commitSet=" << commitSetHash; - return; // Wait for next tick - } + estState_ = EstablishState::ConvergingReveal; + revealPhaseStart_ = std::chrono::steady_clock::now(); + JLOG(j_.debug()) << "RNG: transitioned to ConvergingReveal" + << " reveal=" << adaptor_.getEntropySecret(); - // Don't let the round close while waiting for commit quorum. - // Without this gate, execution falls through to the normal - // consensus close logic and nodes inject partial/zero entropy - // while others are still collecting — causing ledger mismatches. - // - // However, if we've already converged on the txSet (which we - // have — haveConsensus() passed above) and there aren't enough - // participants to ever reach quorum, skip immediately. With - // 3 nodes and quorum=3, losing one node means 2/3 commits - // forever — waiting 3s per round just delays recovery. - // - // NOTE: Late-joining nodes (e.g. restarting after a crash) - // cannot help here. They enter the round as proposing=false - // and onClose() skips commitment generation for non-proposers. - // It takes at least one full round of observing before - // consensus promotes them to proposing. - { - // participants = peers + ourselves - auto const participants = currPeerPositions_.size() + 1; - auto const threshold = adaptor_.quorumThreshold(); - bool const impossible = participants < threshold; - - if (impossible) - { - JLOG(j_.debug()) - << "RNG: skipping commit wait (participants=" - << participants << " < threshold=" << threshold << ")"; - logRngDiag("rng-commit-wait-impossible-quorum"); - // Fall through to close with zero entropy - } - else - { - bool timeout = - result_->roundTime.read() > parms.rngPIPELINE_TIMEOUT; - if (!timeout) - { - logRngDiag("rng-commit-wait"); - return; // Wait for more commits - } - - // Timeout waiting for all expected proposers. - // If we still have quorum (80% of UNL), proceed - // with what we have — the SHAMap merge handles - // any fuzziness for this transition round. - auto const commits = adaptor_.pendingCommitCount(); - auto const quorum = adaptor_.quorumThreshold(); - if (commits >= quorum) - { - JLOG(j_.info()) - << "RNG: commit timeout but have quorum (" - << commits << "/" << quorum - << "), proceeding with partial set"; - // Jump to the same path as hasQuorumOfCommits - auto commitSetHash = adaptor_.buildCommitSet(buildSeq); - auto newPos = result_->position.position(); - newPos.commitSetHash = commitSetHash; - result_->position.changePosition( - newPos, - asCloseTime(result_->position.closeTime()), - now_); - if (mode_.get() == ConsensusMode::proposing) - adaptor_.propose(result_->position); - estState_ = EstablishState::ConvergingCommit; - commitHashConflictStart_ = {}; - JLOG(j_.debug()) - << "RNG: transitioned to ConvergingCommit" - << " commitSet=" << commitSetHash - << " (timeout fallback)"; - return; - } - logRngDiag("rng-commit-timeout-below-quorum"); - // Truly below quorum: fall through to zero entropy - } - } - } - else if (estState_ == EstablishState::ConvergingCommit) - { - // If commit hashes diverge, we may not receive any additional - // tx-converged proposals in this state (peers can move to the next - // ledger quickly, causing prevLedger rejects). In that case, hashes - // observed during ConvergingTx would never be fetched because fetch - // is intentionally deferred there. - // - // Sweep currently tx-converged peer positions each tick so deferred - // commitSet hashes still get fetched/merged even without new - // accepted proposals in ConvergingCommit. - if constexpr ( - requires(Adaptor& a) { - a.fetchRngSetIfNeeded(std::optional{}); - } && requires(Position_t const& p) { p.commitSetHash; }) - { - auto const ourPos = result_->position.position(); - for (auto const& [nodeId, peerPos] : currPeerPositions_) - { - auto const& peerPosition = peerPos.proposal().position(); - if (!(peerPosition == ourPos)) - continue; - adaptor_.fetchRngSetIfNeeded(peerPosition.commitSetHash); - } - } - - // Fast path: if no commit-set conflicts are observed, do exactly - // what we did before (immediate reveal transition). - // - // Safety path: haveConsensus() only compares tx-set hash, not RNG - // sidecar fields. So commitSetHash disagreements can exist - // transiently even while tx consensus is true. We only add delay - // when we *actually* observe conflicting non-empty commitSetHash - // values among tx-converged positions. - if (hasConflictingCommitSetHashes()) - { - // Fetch/merge may have added missing commits since we last - // published our commitSetHash. Rebuild and re-publish so peers - // can converge on one deterministic hash instead of timing out. - auto pos = result_->position.position(); - auto const previousHash = pos.commitSetHash; - auto const refreshedHash = adaptor_.buildCommitSet(buildSeq); - if (!previousHash || *previousHash != refreshedHash) - { - pos.commitSetHash = refreshedHash; - result_->position.changePosition( - pos, asCloseTime(result_->position.closeTime()), now_); - - if (mode_.get() == ConsensusMode::proposing) - adaptor_.propose(result_->position); - - JLOG(j_.debug()) - << "RNG: refreshed commitSetHash after merge to " - << refreshedHash; - } - - // Re-check after refreshing our own hash. - if (hasConflictingCommitSetHashes()) - { - auto const nowSteady = std::chrono::steady_clock::now(); - if (commitHashConflictStart_ == - std::chrono::steady_clock::time_point{}) - { - // First observed conflict: start a bounded grace window - // so benign ordering/fetch races can settle. - commitHashConflictStart_ = nowSteady; - JLOG(j_.warn()) - << "RNG: conflicting commitSetHash detected; " - "waiting briefly for convergence/fetch"; - logRngDiag("rng-commit-conflict-start"); - return; - } - - auto const conflictElapsed = - nowSteady - commitHashConflictStart_; - if (conflictElapsed <= parms.rngREVEAL_TIMEOUT) - { - // We are still inside the grace window, so keep - // waiting. This preserves the fast path when peers - // converge after a short delay. - JLOG(j_.debug()) - << "RNG: commitSetHash still conflicting after " - << std::chrono::duration_cast< - std::chrono::milliseconds>(conflictElapsed) - .count() - << "ms; staying in ConvergingCommit"; - logRngDiag("rng-commit-conflict-wait"); - return; - } - - // If conflict persists past a bounded wait, force - // deterministic fallback for this round. - adaptor_.setEntropyFailed(); - estState_ = EstablishState::ConvergingReveal; - // Backdate revealPhaseStart_ so the ConvergingReveal - // timeout path fires immediately next tick. - revealPhaseStart_ = nowSteady - parms.rngREVEAL_TIMEOUT - - std::chrono::milliseconds{1}; - commitHashConflictStart_ = {}; - JLOG(j_.warn()) - << "RNG: commitSetHash conflict persisted; forcing " - "zero-entropy fallback"; - logRngDiag("rng-commit-conflict-timeout-fallback"); - return; - } - } - - commitHashConflictStart_ = {}; - - auto newPos = result_->position.position(); - newPos.myReveal = adaptor_.getEntropySecret(); - - result_->position.changePosition( - newPos, asCloseTime(result_->position.closeTime()), now_); - - if (mode_.get() == ConsensusMode::proposing) - adaptor_.propose(result_->position); - - estState_ = EstablishState::ConvergingReveal; - revealPhaseStart_ = std::chrono::steady_clock::now(); - JLOG(j_.debug()) << "RNG: transitioned to ConvergingReveal" - << " reveal=" << adaptor_.getEntropySecret(); - - // Fast path: - // If all required reveals are already present at transition time, - // publish entropySet immediately and finish in this timer pass. - // This is state-based (reveal completeness), not tied to any - // particular proposal sequence number. - if (adaptor_.hasMinimumReveals()) - { - publishEntropySet(); - JLOG(j_.debug()) - << "RNG: fast-path published entropySet in same tick"; - } - else - { - logRngDiag("rng-reveal-wait-after-transition"); - return; // Wait for next tick - } - } - else if (estState_ == EstablishState::ConvergingReveal) - { - // Wait for ALL committers to reveal (not just 80%). - // Timeout measured from ConvergingReveal entry, not round start. - auto const elapsed = - std::chrono::steady_clock::now() - revealPhaseStart_; - bool timeout = elapsed > parms.rngREVEAL_TIMEOUT; - bool ready = false; - bool const revealConsensus = - haveConsensus(clog) && adaptor_.hasMinimumReveals(); - - if (revealConsensus || timeout) - { - JLOG(j_.info()) - << "STALLDIAG: rng-reveal-gate-open" - << " revealConsensus=" << (revealConsensus ? "yes" : "no") - << " timeout=" << (timeout ? "yes" : "no") << " elapsedMs=" - << std::chrono::duration_cast( - elapsed) - .count(); - if (timeout && !adaptor_.hasAnyReveals()) - { - adaptor_.setEntropyFailed(); - JLOG(j_.warn()) << "RNG: entropy failed (no reveals)"; - logRngDiag("rng-reveal-timeout-no-reveals"); - } - else + // Fast path: + // If all required reveals are already present at transition + // time, publish entropySet immediately and finish in this timer + // pass. This is state-based (reveal completeness), not tied to + // any particular proposal sequence number. + if (adaptor_.hasMinimumReveals()) { publishEntropySet(); - logRngDiag("rng-reveal-published-entropy-set"); + JLOG(j_.debug()) + << "RNG: fast-path published entropySet in same tick"; + } + else + { + logRngDiag("rng-reveal-wait-after-transition"); + return; // Wait for next tick } - ready = true; } - - if (!ready) + else if (estState_ == EstablishState::ConvergingReveal) { - JLOG(j_.info()) - << "STALLDIAG: rng-reveal-gate-blocked" - << " revealConsensus=" << (revealConsensus ? "yes" : "no") - << " timeout=" << (timeout ? "yes" : "no") << " elapsedMs=" - << std::chrono::duration_cast( - elapsed) - .count(); - logRngDiag("rng-reveal-wait"); - return; - } + // Wait for ALL committers to reveal (not just 80%). + // Timeout measured from ConvergingReveal entry, not round + // start. + auto const elapsed = + std::chrono::steady_clock::now() - revealPhaseStart_; + bool timeout = elapsed > parms.rngREVEAL_TIMEOUT; + bool ready = false; + bool const revealConsensus = + haveConsensus(clog) && adaptor_.hasMinimumReveals(); - // Optional explicit final proposal (seq=4 style): - // publish a synthetic tx-set hash that includes the - // consensus-entropy pseudo-tx just before accept. - // - // IMPORTANT DESIGN NOTE (read before editing this block): - // - // This path is intentionally OPTIONAL and default-off. It exists - // for diagnostics/perf experiments (for example, making monitor - // visibility of the final pseudo-tx set more direct), NOT as a - // required step for consensus correctness. - // - // Why so conservative? - // - The main consensus engine still keys agreement on tx-set hash. - // - Updating our tx-set hash here creates a "late identity - // change" in establish. - // - Under lossy/reordered networks, peers can be slightly out of - // phase: some nodes may have switched to the synthetic hash - // while others are still on the base hash. - // - That can fragment agreement during a critical window (two - // hashes in flight for one ledger), increase proposal chatter, - // and trigger sync churn. - // - // Therefore this logic must remain best-effort only: - // - Never required for liveness/safety. - // - No extra wait tick is introduced. - // - If gates are not met, we skip and continue to accept via the - // normal implicit path (accept-time pseudo-tx injection). - // - // TBD (2026-03-03): We did not find a robust timing model that - // folds this into a guaranteed-safe explicit final proposal across - // lossy/reordered links without increasing churn. Keep this path - // as opt-in for future evaluation. - if constexpr (requires( - Adaptor& a, - TxSet_t const& txns, - typename Ledger_t::Seq seq) { - a.shouldSendExplicitFinalProposal(); - a.hasQuorumOfCommits(); - a.buildExplicitFinalProposalTxSet(txns, seq); - }) - { - bool fullParticipantCoverage = false; - bool entropyAligned = false; - if constexpr (requires(Position_t const& p) { - p.entropySetHash; + if (revealConsensus || timeout) + { + JLOG(j_.info()) << "STALLDIAG: rng-reveal-gate-open" + << " revealConsensus=" + << (revealConsensus ? "yes" : "no") + << " timeout=" << (timeout ? "yes" : "no") + << " elapsedMs=" + << std::chrono::duration_cast< + std::chrono::milliseconds>(elapsed) + .count(); + if (timeout && !adaptor_.hasAnyReveals()) + { + adaptor_.setEntropyFailed(); + JLOG(j_.warn()) << "RNG: entropy failed (no reveals)"; + logRngDiag("rng-reveal-timeout-no-reveals"); + } + else + { + publishEntropySet(); + logRngDiag("rng-reveal-published-entropy-set"); + } + ready = true; + } + + if (!ready) + { + JLOG(j_.info()) << "STALLDIAG: rng-reveal-gate-blocked" + << " revealConsensus=" + << (revealConsensus ? "yes" : "no") + << " timeout=" << (timeout ? "yes" : "no") + << " elapsedMs=" + << std::chrono::duration_cast< + std::chrono::milliseconds>(elapsed) + .count(); + logRngDiag("rng-reveal-wait"); + return; + } + + // Optional explicit final proposal (seq=4 style): + // publish a synthetic tx-set hash that includes the + // consensus-entropy pseudo-tx just before accept. + // + // IMPORTANT DESIGN NOTE (read before editing this block): + // + // This path is intentionally OPTIONAL and default-off. It + // exists for diagnostics/perf experiments (for example, making + // monitor visibility of the final pseudo-tx set more direct), + // NOT as a required step for consensus correctness. + // + // Why so conservative? + // - The main consensus engine still keys agreement on tx-set + // hash. + // - Updating our tx-set hash here creates a "late identity + // change" in establish. + // - Under lossy/reordered networks, peers can be slightly out + // of + // phase: some nodes may have switched to the synthetic hash + // while others are still on the base hash. + // - That can fragment agreement during a critical window (two + // hashes in flight for one ledger), increase proposal + // chatter, and trigger sync churn. + // + // Therefore this logic must remain best-effort only: + // - Never required for liveness/safety. + // - No extra wait tick is introduced. + // - If gates are not met, we skip and continue to accept via + // the + // normal implicit path (accept-time pseudo-tx injection). + // + // TBD (2026-03-03): We did not find a robust timing model that + // folds this into a guaranteed-safe explicit final proposal + // across lossy/reordered links without increasing churn. Keep + // this path as opt-in for future evaluation. + if constexpr (requires( + Adaptor& a, + TxSet_t const& txns, + typename Ledger_t::Seq seq) { + a.shouldSendExplicitFinalProposal(); + a.hasQuorumOfCommits(); + a.buildExplicitFinalProposalTxSet(txns, seq); }) { - // Guard against "early switch" churn: - // require at least as many participants as the previous - // round before attempting the explicit-final mutation. - // - // This is a heuristic to reduce risk, not a proof of - // safety. We still keep the feature optional/default-off. - auto const participants = currPeerPositions_.size() + 1; - auto const expectedParticipants = prevProposers_ + 1; - fullParticipantCoverage = - participants >= expectedParticipants; - // Require a majority aligned on entropySetHash before - // mutating tx-set hash. If this threshold is loosened, the - // probability of hash fragmentation rises quickly. - auto const requiredEntropyAligned = - (expectedParticipants / 2) + 1; - auto const ourPos = result_->position.position(); - if (ourPos.entropySetHash) + bool fullParticipantCoverage = false; + bool entropyAligned = false; + if constexpr (requires(Position_t const& p) { + p.entropySetHash; + }) { - auto const expectedEntropy = *ourPos.entropySetHash; - std::size_t alignedPeers = 0; - bool conflict = false; - for (auto const& [_, peerPos] : currPeerPositions_) + // Guard against "early switch" churn: + // require at least as many participants as the previous + // round before attempting the explicit-final mutation. + // + // This is a heuristic to reduce risk, not a proof of + // safety. We still keep the feature + // optional/default-off. + auto const participants = currPeerPositions_.size() + 1; + auto const expectedParticipants = prevProposers_ + 1; + fullParticipantCoverage = + participants >= expectedParticipants; + // Require a majority aligned on entropySetHash before + // mutating tx-set hash. If this threshold is loosened, + // the probability of hash fragmentation rises quickly. + auto const requiredEntropyAligned = + (expectedParticipants / 2) + 1; + auto const ourPos = result_->position.position(); + if (ourPos.entropySetHash) { - auto const& peerPosition = - peerPos.proposal().position(); - if (!peerPosition.entropySetHash) - continue; - if (*peerPosition.entropySetHash == expectedEntropy) + auto const expectedEntropy = *ourPos.entropySetHash; + std::size_t alignedPeers = 0; + bool conflict = false; + for (auto const& [_, peerPos] : currPeerPositions_) { - ++alignedPeers; - continue; + auto const& peerPosition = + peerPos.proposal().position(); + if (!peerPosition.entropySetHash) + continue; + if (*peerPosition.entropySetHash == + expectedEntropy) + { + ++alignedPeers; + continue; + } + conflict = true; + break; } - conflict = true; - break; - } - auto const alignedParticipants = alignedPeers + 1; - entropyAligned = !conflict && - alignedParticipants >= requiredEntropyAligned; - if (!entropyAligned) + auto const alignedParticipants = alignedPeers + 1; + entropyAligned = !conflict && + alignedParticipants >= requiredEntropyAligned; + if (!entropyAligned) + { + JLOG(j_.debug()) + << "RNG: explicit-final entropy alignment " + "insufficient" + << " alignedParticipants=" + << alignedParticipants + << " required=" << requiredEntropyAligned + << " conflict=" + << (conflict ? "yes" : "no"); + } + } + else { JLOG(j_.debug()) - << "RNG: explicit-final entropy alignment " - "insufficient" - << " alignedParticipants=" - << alignedParticipants - << " required=" << requiredEntropyAligned - << " conflict=" << (conflict ? "yes" : "no"); + << "RNG: explicit-final waiting on local " + "entropySetHash"; + } + } + + if (mode_.get() == ConsensusMode::proposing && + !explicitFinalProposalSent_ && + adaptor_.hasQuorumOfCommits() && revealConsensus && + fullParticipantCoverage && entropyAligned && + adaptor_.shouldSendExplicitFinalProposal()) + { + // One-shot per round. This avoids repeated mutations/ + // broadcasts from timer ticks, which can amplify + // network chatter in the exact conditions + // (loss/reordering) where this path is already fragile. + auto const synthSet = + adaptor_.buildExplicitFinalProposalTxSet( + result_->txns, buildSeq); + explicitFinalProposalSent_ = true; + + if (synthSet) + { + auto const synthHash = synthSet->id(); + auto currentPos = result_->position.position(); + auto newPos = currentPos; + if constexpr (requires( + typename Adaptor::Position_t p) { + p.updateTxSet(synthHash); + }) + newPos.updateTxSet(synthHash); + else + newPos = synthHash; + + if (!(newPos == currentPos)) + { + // WARNING: + // This changes proposal tx-set identity late in + // establish. Keep this path tightly gated and + // optional. The canonical ledger path remains + // the implicit accept-time injection logic. + + // Maintain the invariant that our active + // position's tx-set hash is present in + // acquired_, otherwise gotTxSet can assert if + // this set arrives back from the network. + auto const [_, inserted] = + acquired_.emplace(synthHash, *synthSet); + JLOG(j_.debug()) + << "RNG: cached explicit-final txSet=" + << synthHash << " inserted=" << inserted; + if (inserted) + { + // Make the synthetic set discoverable by + // peers immediately; otherwise they can + // request this hash and hit transient + // "getTxSet: Failed to find TX set" on + // random peers. + adaptor_.share(*synthSet); + JLOG(j_.debug()) + << "RNG: shared explicit-final txSet=" + << synthHash; + } + result_->position.changePosition( + newPos, + asCloseTime(result_->position.closeTime()), + now_); + adaptor_.propose(result_->position); + JLOG(j_.debug()) + << "RNG: explicit final proposal txSet=" + << synthHash; + logRngDiag("rng-explicit-final-proposed"); + } } } else { - JLOG(j_.debug()) - << "RNG: explicit-final waiting on local " - "entropySetHash"; + char const* reason = "disabled"; + if (mode_.get() != ConsensusMode::proposing) + reason = "not-proposing"; + else if (explicitFinalProposalSent_) + reason = "already-sent"; + else if (!adaptor_.hasQuorumOfCommits()) + reason = "no-commit-quorum"; + else if (!revealConsensus) + reason = "reveal-timeout"; + else if (!fullParticipantCoverage) + reason = "participant-gap"; + else if (!entropyAligned) + reason = "entropy-not-aligned"; + JLOG(j_.info()) + << "STALLDIAG: rng-explicit-final-skipped" + << " reason=" << reason + << " mode=" << to_string(mode_.get()) << " sent=" + << (explicitFinalProposalSent_ ? "yes" : "no"); } } - - if (mode_.get() == ConsensusMode::proposing && - !explicitFinalProposalSent_ && - adaptor_.hasQuorumOfCommits() && revealConsensus && - fullParticipantCoverage && entropyAligned && - adaptor_.shouldSendExplicitFinalProposal()) - { - // One-shot per round. This avoids repeated mutations/ - // broadcasts from timer ticks, which can amplify network - // chatter in the exact conditions (loss/reordering) where - // this path is already fragile. - auto const synthSet = - adaptor_.buildExplicitFinalProposalTxSet( - result_->txns, buildSeq); - explicitFinalProposalSent_ = true; - - if (synthSet) - { - auto const synthHash = synthSet->id(); - auto currentPos = result_->position.position(); - auto newPos = currentPos; - if constexpr (requires(typename Adaptor::Position_t p) { - p.updateTxSet(synthHash); - }) - newPos.updateTxSet(synthHash); - else - newPos = synthHash; - - if (!(newPos == currentPos)) - { - // WARNING: - // This changes proposal tx-set identity late in - // establish. Keep this path tightly gated and - // optional. The canonical ledger path remains the - // implicit accept-time injection logic. - - // Maintain the invariant that our active - // position's tx-set hash is present in acquired_, - // otherwise gotTxSet can assert if this set arrives - // back from the network. - auto const [_, inserted] = - acquired_.emplace(synthHash, *synthSet); - JLOG(j_.debug()) - << "RNG: cached explicit-final txSet=" - << synthHash << " inserted=" << inserted; - if (inserted) - { - // Make the synthetic set discoverable by peers - // immediately; otherwise they can request this - // hash and hit transient "getTxSet: Failed to - // find TX set" on random peers. - adaptor_.share(*synthSet); - JLOG(j_.debug()) - << "RNG: shared explicit-final txSet=" - << synthHash; - } - result_->position.changePosition( - newPos, - asCloseTime(result_->position.closeTime()), - now_); - adaptor_.propose(result_->position); - JLOG(j_.debug()) - << "RNG: explicit final proposal txSet=" - << synthHash; - logRngDiag("rng-explicit-final-proposed"); - } - } - } - else - { - char const* reason = "disabled"; - if (mode_.get() != ConsensusMode::proposing) - reason = "not-proposing"; - else if (explicitFinalProposalSent_) - reason = "already-sent"; - else if (!adaptor_.hasQuorumOfCommits()) - reason = "no-commit-quorum"; - else if (!revealConsensus) - reason = "reveal-timeout"; - else if (!fullParticipantCoverage) - reason = "participant-gap"; - else if (!entropyAligned) - reason = "entropy-not-aligned"; - JLOG(j_.info()) - << "STALLDIAG: rng-explicit-final-skipped" - << " reason=" << reason - << " mode=" << to_string(mode_.get()) << " sent=" - << (explicitFinalProposalSent_ ? "yes" : "no"); - } } } + else + { + JLOG(j_.debug()) << "RNGGATE: skipping RNG substates" + << " prevSeq=" << previousLedger_.seq() + << " phase=" << to_string(phase_) + << " mode=" << to_string(mode_.get()); + } } //@@end rng-phase-establish-substates