consensus: document explicit-final tradeoffs and tighten rng diagnostics

Keep explicit final proposal as an opt-in experimental path with implicit mode as default.

Add inline rationale/TBD notes, extend stall diagnostics, and cover runtime-config + CSF txn-path behavior with tests.
This commit is contained in:
Nicholas Dudfield
2026-03-03 17:08:38 +07:00
parent d32f34d3bf
commit fd1567d1ba
8 changed files with 737 additions and 33 deletions

View File

@@ -1095,6 +1095,42 @@ public:
}
}
void
testRngCommitRevealConvergesWithTransactions()
{
using namespace csf;
using namespace std::chrono;
testcase("RNG commit/reveal converges with non-empty tx set");
ConsensusParms const parms{};
Sim sim;
PeerGroup peers = sim.createGroup(5);
peers.trustAndConnect(
peers, round<milliseconds>(0.2 * parms.ledgerGRANULARITY));
for (Peer* peer : peers)
{
peer->enableRngConsensus_ = true;
peer->submit(Tx(static_cast<std::uint32_t>(peer->id)));
}
sim.run(1);
if (BEAST_EXPECT(sim.synchronized()))
{
for (Peer const* peer : peers)
{
auto const& lcl = peer->lastClosedLedger;
BEAST_EXPECT(!peer->lastEntropyWasFallback_);
BEAST_EXPECT(peer->lastEntropyCount_ > 0);
BEAST_EXPECT(peer->lastEntropyDigest_ != uint256{});
BEAST_EXPECT(lcl.txs().size() > 0);
}
}
}
void
testRngImpossibleQuorumFallback()
{
@@ -1489,6 +1525,7 @@ public:
testPreferredByBranch();
testPauseForLaggards();
testRngCommitRevealConverges();
testRngCommitRevealConvergesWithTransactions();
testRngImpossibleQuorumFallback();
testRngTimeoutWithPartialQuorum();
testRngCommitSetConflictForcesFallback();

View File

@@ -413,6 +413,54 @@ class RuntimeConfig_test : public beast::unit_test::suite
BEAST_EXPECT(cfg->rngClaimDropPctX100 == 0); // clamped to 0%
}
void
testExplicitFinalProposalToggle()
{
testcase("explicit_final_proposal round-trips and merges");
using namespace test::jtx;
Env env{*this};
// Global default for this node: skip explicit final proposal.
{
Json::Value params;
params["set"] = Json::objectValue;
params["set"]["*"] = Json::objectValue;
params["set"]["*"]["explicit_final_proposal"] = false;
auto result = runtimeConfig(env, params);
auto const& global = result["configs"]["*"];
BEAST_EXPECT(global["explicit_final_proposal"].asBool() == false);
}
auto& rc = env.app().getRuntimeConfig();
BEAST_EXPECT(rc.active());
// Global view is false.
auto globalCfg = rc.getConfig("*");
BEAST_EXPECT(globalCfg.has_value());
BEAST_EXPECT(globalCfg->explicitFinalProposal.has_value());
BEAST_EXPECT(*globalCfg->explicitFinalProposal == false);
// Per-peer override can re-enable.
{
Json::Value params;
params["set"] = Json::objectValue;
params["set"]["10.0.0.2:51235"] = Json::objectValue;
params["set"]["10.0.0.2:51235"]["explicit_final_proposal"] = true;
runtimeConfig(env, params);
}
auto peerCfg = rc.getConfig("10.0.0.2:51235");
BEAST_EXPECT(peerCfg.has_value());
BEAST_EXPECT(peerCfg->explicitFinalProposal.has_value());
BEAST_EXPECT(*peerCfg->explicitFinalProposal == true);
auto otherCfg = rc.getConfig("10.0.0.3:51235");
BEAST_EXPECT(otherCfg.has_value());
BEAST_EXPECT(otherCfg->explicitFinalProposal.has_value());
BEAST_EXPECT(*otherCfg->explicitFinalProposal == false);
}
void
testPerPeerClearInheritedFilter()
{
@@ -472,6 +520,7 @@ public:
testDropPctClamping();
testRngClaimDropPct();
testRngClaimDropPctClamping();
testExplicitFinalProposalToggle();
testPerPeerClearInheritedFilter();
}
};

View File

@@ -1317,6 +1317,18 @@ RCLConsensus::Adaptor::pendingCommitCount() const
return pendingCommits_.size();
}
std::size_t
RCLConsensus::Adaptor::pendingRevealCount() const
{
return pendingReveals_.size();
}
std::size_t
RCLConsensus::Adaptor::expectedProposerCount() const
{
return expectedProposers_.size();
}
bool
RCLConsensus::Adaptor::hasQuorumOfCommits() const
{
@@ -1372,6 +1384,129 @@ RCLConsensus::Adaptor::hasAnyReveals() const
return !pendingReveals_.empty();
}
bool
RCLConsensus::Adaptor::shouldSendExplicitFinalProposal() const
{
// Explicit-final-proposal policy is node-local and experimental.
//
// Default behavior is implicit finalization (no extra seq=4 proposal):
// entropy pseudo-tx is injected in onAccept/buildLCL.
//
// We only enable explicit-final when operators intentionally opt in via
// runtime config/env for measurement/diagnostics.
//
// TBD (2026-03-03): Keep collecting tx-bearing network data before
// revisiting whether explicit-final can be safely promoted beyond
// experimental use.
auto const cfg = app_.getRuntimeConfig().getConfig("*");
if (cfg && cfg->explicitFinalProposal.has_value())
return *cfg->explicitFinalProposal;
return false;
}
std::optional<RCLTxSet>
RCLConsensus::Adaptor::buildExplicitFinalProposalTxSet(
RCLTxSet const& txns,
LedgerIndex seq)
{
JLOG(j_.debug()) << "RNGFINAL: build synthetic txset"
<< " baseTxSet=" << txns.id() << " seq=" << seq
<< " commits=" << pendingCommits_.size()
<< " reveals=" << pendingReveals_.size()
<< " failed=" << entropyFailed_;
uint256 finalEntropy;
bool hasEntropy = false;
// Keep this entropy-selection logic aligned with injectEntropyPseudoTx().
// If these paths drift, different nodes can derive different synthetic
// hashes for the same round, which is especially harmful because this
// path mutates proposal tx-set identity late in establish.
if (app_.config().standalone())
{
finalEntropy = sha512Half(std::string("standalone-entropy"), seq);
hasEntropy = true;
}
else if (entropyFailed_ || pendingReveals_.empty())
{
finalEntropy.zero();
hasEntropy = true;
}
else
{
std::vector<std::pair<PublicKey, uint256>> sorted;
sorted.reserve(pendingReveals_.size());
for (auto const& [nodeId, reveal] : pendingReveals_)
{
auto it = nodeIdToKey_.find(nodeId);
if (it != nodeIdToKey_.end())
sorted.emplace_back(it->second, reveal);
}
if (!sorted.empty())
{
std::sort(
sorted.begin(), sorted.end(), [](auto const& a, auto const& b) {
return a.first.slice() < b.first.slice();
});
Serializer s;
for (auto const& [key, reveal] : sorted)
{
s.addVL(key.slice());
s.addBitString(reveal);
}
finalEntropy = sha512Half(s.slice());
hasEntropy = true;
}
}
if (!hasEntropy)
{
JLOG(j_.debug()) << "RNGFINAL: no entropy available for synthetic txset"
<< " baseTxSet=" << txns.id() << " seq=" << seq;
return std::nullopt;
}
auto const entropyCount = static_cast<std::uint16_t>(
app_.config().standalone() ? 20
: (entropyFailed_ || pendingReveals_.empty()
? 0
: pendingReveals_.size()));
STTx tx(ttCONSENSUS_ENTROPY, [&](auto& obj) {
obj.setFieldU32(sfLedgerSequence, seq);
obj.setAccountID(sfAccount, AccountID{});
obj.setFieldU32(sfSequence, 0);
obj.setFieldAmount(sfFee, STAmount{});
obj.setFieldH256(sfDigest, finalEntropy);
obj.setFieldU16(sfEntropyCount, entropyCount);
});
auto const txID = tx.getTransactionID();
if (txns.exists(txID))
{
JLOG(j_.debug()) << "RNGFINAL: pseudo-tx already in base set"
<< " txid=" << txID << " txSet=" << txns.id();
return txns;
}
RCLTxSet::MutableTxSet mutableTxSet{txns};
Serializer ser(512);
tx.add(ser);
mutableTxSet.insert(RCLCxTx{make_shamapitem(txID, ser.slice())});
auto syntheticSet = RCLTxSet{mutableTxSet};
auto const hash = syntheticSet.id();
inboundTransactions_.giveSet(hash, syntheticSet.map_, false);
JLOG(j_.debug()) << "RNGFINAL: built synthetic txset"
<< " hash=" << hash << " baseTxSet=" << txns.id()
<< " txid=" << txID << " entropyCount=" << entropyCount;
return syntheticSet;
}
uint256
RCLConsensus::Adaptor::buildCommitSet(LedgerIndex seq)
{
@@ -1470,9 +1605,14 @@ RCLConsensus::Adaptor::buildEntropySet(LedgerIndex seq)
obj.setFieldAmount(sfFee, STAmount{});
obj.setFieldH256(sfDigest, reveal);
obj.setFieldVL(sfSigningPubKey, kit->second.slice());
auto proofIt = proposalProofs_.find(nid);
if (proofIt != proposalProofs_.end())
obj.setFieldVL(sfBlob, serializeProof(proofIt->second));
// Intentionally omit sfBlob for reveal-set entries.
//
// Reveal proofs are timing-dependent (seq/closeTime/signature can
// differ while the reveal digest is identical), which makes the
// entropy-set hash non-deterministic across nodes under packet
// loss/reordering. We only need deterministic reveal material
// (validator identity + digest) for fetch/merge and entropy
// calculation.
});
Serializer s(2048);
@@ -1726,25 +1866,32 @@ RCLConsensus::Adaptor::handleAcquiredRngSet(std::shared_ptr<SHAMap> const& map)
return;
}
if (!stx->isFieldPresent(sfBlob))
std::optional<ProposalProof> parsedProof;
if (stx->isFieldPresent(sfBlob))
{
JLOG(j_.warn())
<< "RNG: rejecting proofless entry from " << nodeId
<< " in acquired set (" << sourceTag << ")";
return;
auto const proofBlob = stx->getFieldVL(sfBlob);
if (!verifyProof(proofBlob, pubKey, digest, isCommitSet))
{
JLOG(j_.warn()) << "RNG: invalid proof from " << nodeId
<< " in acquired set (" << sourceTag << ")";
return;
}
parsedProof = deserializeProof(proofBlob);
if (!parsedProof)
{
JLOG(j_.warn())
<< "RNG: rejecting malformed proof from " << nodeId
<< " in acquired set (" << sourceTag << ")";
return;
}
}
auto const proofBlob = stx->getFieldVL(sfBlob);
if (!verifyProof(proofBlob, pubKey, digest, isCommitSet))
{
JLOG(j_.warn()) << "RNG: invalid proof from " << nodeId
<< " in acquired set (" << sourceTag << ")";
return;
}
auto parsedProof = deserializeProof(proofBlob);
if (!parsedProof)
else if (isCommitSet)
{
// Commit entries must carry a verifiable proposal proof.
// Without this, an attacker could inject arbitrary digests
// for trusted node IDs via fetched sets.
JLOG(j_.warn())
<< "RNG: rejecting malformed proof from " << nodeId
<< "RNG: rejecting proofless commit entry from " << nodeId
<< " in acquired set (" << sourceTag << ")";
return;
}
@@ -1805,15 +1952,15 @@ RCLConsensus::Adaptor::handleAcquiredRngSet(std::shared_ptr<SHAMap> const& map)
// byte-identical SHAMap leaves for these entries.
if (isCommitSet)
{
if (parsedProof->proposeSeq == 0)
if (parsedProof && parsedProof->proposeSeq == 0)
commitProofs_.insert_or_assign(nodeId, *parsedProof);
else
else if (parsedProof)
JLOG(j_.debug()) << "RNG: commit proof from " << nodeId
<< " has non-zero proposeSeq="
<< parsedProof->proposeSeq
<< "; not caching for commitSet rebuild";
}
else
else if (parsedProof)
{
proposalProofs_.insert_or_assign(nodeId, *parsedProof);
}
@@ -2012,7 +2159,21 @@ RCLConsensus::Adaptor::injectEntropyPseudoTx(
obj.setFieldU16(sfEntropyCount, entropyCount);
});
retriableTxs.insert(std::make_shared<STTx>(std::move(tx)));
auto const txID = tx.getTransactionID();
auto alreadyPresent = std::any_of(
retriableTxs.begin(), retriableTxs.end(), [&](auto const& entry) {
return entry.first.getTXID() == txID;
});
if (alreadyPresent)
{
JLOG(j_.debug())
<< "RNG: entropy pseudo-tx already present, skip duplicate "
<< txID;
}
else
{
retriableTxs.insert(std::make_shared<STTx>(std::move(tx)));
}
}
//@@end rng-inject-pseudotx

View File

@@ -247,6 +247,14 @@ class RCLConsensus
std::size_t
pendingCommitCount() const;
/** Number of pending reveals (for diagnostics) */
std::size_t
pendingRevealCount() const;
/** Number of expected proposers this round (for diagnostics) */
std::size_t
expectedProposerCount() const;
/** Check if we have quorum of commits */
bool
hasQuorumOfCommits() const;
@@ -259,6 +267,18 @@ class RCLConsensus
bool
hasAnyReveals() const;
/** Whether to send an explicit final proposal (seq=4 style). */
bool
shouldSendExplicitFinalProposal() const;
/** Build synthetic tx-set that includes consensus-entropy pseudo-tx.
Used only for optional explicit-final-proposal experiments. This
does not mutate RNG state.
*/
std::optional<RCLTxSet>
buildExplicitFinalProposalTxSet(RCLTxSet const& txns, LedgerIndex seq);
/** Build real SHAMap from collected commits, register for fetch.
@param seq The ledger sequence being built
@return The SHAMap root hash (commitSetHash)

View File

@@ -40,6 +40,13 @@ struct ConfigVals
std::optional<int> sendDelayJitterMs;
std::optional<int> sendDropPctX100; // 0-10000 (pct * 100, avoids float)
std::optional<int> rngClaimDropPctX100; // 0-10000 (pct * 100)
// Controls explicit final proposal broadcast in the RNG reveal phase.
// true = attempt explicit-final proposal (experimental)
// false = keep implicit mode (recommended default for production)
//
// NOTE: This knob is intentionally explicit opt-in. The consensus system
// is fully functional without it via accept-time pseudo-tx injection.
std::optional<bool> explicitFinalProposal;
// If set (non-nullopt), only apply to these TrafficCount::category values.
// nullopt = not specified (inherit from global on merge).
// Empty set = explicitly "all categories" (overrides global filter).
@@ -59,7 +66,8 @@ struct ConfigVals
return (sendDelayMs && *sendDelayMs > 0) ||
(sendDelayJitterMs && *sendDelayJitterMs > 0) ||
(sendDropPctX100 && *sendDropPctX100 > 0) ||
(rngClaimDropPctX100 && *rngClaimDropPctX100 > 0);
(rngClaimDropPctX100 && *rngClaimDropPctX100 > 0) ||
explicitFinalProposal.has_value();
}
/** Merge other on top of this — other's set fields override. */
@@ -75,6 +83,8 @@ struct ConfigVals
result.sendDropPctX100 = other.sendDropPctX100;
if (other.rngClaimDropPctX100)
result.rngClaimDropPctX100 = other.rngClaimDropPctX100;
if (other.explicitFinalProposal.has_value())
result.explicitFinalProposal = other.explicitFinalProposal;
if (other.messageCategories)
result.messageCategories = other.messageCategories;
return result;

View File

@@ -23,12 +23,34 @@
#include <xrpl/json/json_value.h>
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <mutex>
#include <string>
namespace ripple {
namespace {
std::optional<bool>
parseBoolEnv(char const* env)
{
if (!env)
return std::nullopt;
std::string value{env};
std::transform(
value.begin(), value.end(), value.begin(), [](unsigned char c) {
return static_cast<char>(std::tolower(c));
});
if (value == "1" || value == "true" || value == "yes" || value == "on")
return true;
if (value == "0" || value == "false" || value == "no" || value == "off")
return false;
return std::nullopt;
}
ConfigVals
parseConfigVals(Json::Value const& v)
{
@@ -43,6 +65,8 @@ parseConfigVals(Json::Value const& v)
if (v.isMember("rng_claim_drop_pct"))
cfg.rngClaimDropPctX100 =
static_cast<int>(v["rng_claim_drop_pct"].asDouble() * 100);
if (v.isMember("explicit_final_proposal"))
cfg.explicitFinalProposal = v["explicit_final_proposal"].asBool();
return cfg;
}
} // namespace
@@ -75,6 +99,11 @@ RuntimeConfig::RuntimeConfig()
global.sendDropPctX100 = static_cast<int>(std::atof(env) * 100);
if (auto const* env = std::getenv("XAHAU_RNG_CLAIM_DROP_PCT"))
global.rngClaimDropPctX100 = static_cast<int>(std::atof(env) * 100);
// Explicit-final proposal is intentionally opt-in and defaults to
// implicit behavior when unset.
if (auto parsed =
parseBoolEnv(std::getenv("XAHAUD_EXPLICIT_FINAL_PROPOSAL")))
global.explicitFinalProposal = *parsed;
if (global.active())
{

View File

@@ -616,6 +616,7 @@ private:
EstablishState estState_{EstablishState::ConvergingTx};
std::chrono::steady_clock::time_point revealPhaseStart_{};
std::chrono::steady_clock::time_point commitHashConflictStart_{};
bool explicitFinalProposalSent_{false};
MonitoredMode mode_{ConsensusMode::observing};
bool firstRound_ = true;
bool haveCloseTimeConsensus_ = false;
@@ -789,6 +790,7 @@ Consensus<Adaptor>::startRoundInternal(
estState_ = EstablishState::ConvergingTx;
revealPhaseStart_ = {};
commitHashConflictStart_ = {};
explicitFinalProposalSent_ = false;
closeResolution_ = getNextLedgerTimeResolution(
previousLedger_.closeTimeResolution(),
@@ -1561,18 +1563,43 @@ Consensus<Adaptor>::phaseEstablish(
{
CLOG(clog) << "ledgerMIN_CONSENSUS not reached: "
<< parms.ledgerMIN_CONSENSUS.count() << "ms. ";
JLOG(j_.debug()) << "STALLDIAG: establish wait ledgerMIN_CONSENSUS"
<< " roundMs=" << result_->roundTime.read().count()
<< " minMs=" << parms.ledgerMIN_CONSENSUS.count()
<< " peerPositions=" << currPeerPositions_.size()
<< " prevProposers=" << prevProposers_
<< " phase=" << to_string(phase_)
<< " mode=" << to_string(mode_.get());
return;
}
updateOurPositions(clog);
bool const paused = shouldPause(clog);
bool const txConsensus = paused ? false : haveConsensus(clog);
// Nothing to do if too many laggards or we don't have consensus.
if (shouldPause(clog) || !haveConsensus(clog))
if (paused || !txConsensus)
{
JLOG(j_.info()) << "STALLDIAG: establish gate blocked"
<< " reason=" << (paused ? "pause" : "no-tx-consensus")
<< " roundMs=" << result_->roundTime.read().count()
<< " peerPositions=" << currPeerPositions_.size()
<< " prevProposers=" << prevProposers_
<< " phase=" << to_string(phase_)
<< " mode=" << to_string(mode_.get());
return;
}
if (!haveCloseTimeConsensus_)
{
JLOG(j_.info()) << "We have TX consensus but not CT consensus";
JLOG(j_.info()) << "STALLDIAG: establish gate blocked"
<< " reason=no-close-time-consensus"
<< " roundMs=" << result_->roundTime.read().count()
<< " peerPositions=" << currPeerPositions_.size()
<< " prevProposers=" << prevProposers_
<< " phase=" << to_string(phase_)
<< " mode=" << to_string(mode_.get());
CLOG(clog) << "We have TX consensus but not CT consensus. ";
return;
}
@@ -1607,9 +1634,96 @@ Consensus<Adaptor>::phaseEstablish(
})
{
auto const buildSeq = previousLedger_.seq() + typename Ledger_t::Seq{1};
auto const estStateName = [&]() -> char const* {
switch (estState_)
{
case EstablishState::ConvergingTx:
return "ConvergingTx";
case EstablishState::ConvergingCommit:
return "ConvergingCommit";
case EstablishState::ConvergingReveal:
return "ConvergingReveal";
}
return "Unknown";
};
auto logRngDiag = [&](char const* reason) {
auto const ourPos = result_->position.position();
auto const participants = currPeerPositions_.size() + 1;
JLOG(j_.info())
<< "STALLDIAG: " << reason << " state=" << estStateName()
<< " phase=" << to_string(phase_)
<< " mode=" << to_string(mode_.get())
<< " roundMs=" << result_->roundTime.read().count()
<< " convergePct=" << convergePercent_
<< " participants=" << participants
<< " peerPositions=" << currPeerPositions_.size()
<< " prevProposers=" << prevProposers_ << " explicitFinalSent="
<< (explicitFinalProposalSent_ ? "yes" : "no")
<< " closeTimeConsensus="
<< (haveCloseTimeConsensus_ ? "yes" : "no")
<< " txSet=" << ourPos;
if constexpr (requires(Position_t const& p) { p.commitSetHash; })
{
JLOG(j_.info())
<< "STALLDIAG: sidecar"
<< " commitSetHash="
<< (ourPos.commitSetHash ? to_string(*ourPos.commitSetHash)
: std::string{"none"})
<< " entropySetHash="
<< (ourPos.entropySetHash
? to_string(*ourPos.entropySetHash)
: std::string{"none"})
<< " myCommitment=" << (ourPos.myCommitment ? "yes" : "no")
<< " myReveal=" << (ourPos.myReveal ? "yes" : "no");
}
if constexpr (requires(Adaptor& a) {
a.pendingCommitCount();
a.quorumThreshold();
a.hasQuorumOfCommits();
a.hasMinimumReveals();
a.hasAnyReveals();
})
{
auto const commits = adaptor_.pendingCommitCount();
auto const quorum = adaptor_.quorumThreshold();
auto const commitQuorum = adaptor_.hasQuorumOfCommits();
auto const minReveals = adaptor_.hasMinimumReveals();
auto const anyReveals = adaptor_.hasAnyReveals();
std::optional<std::size_t> reveals;
std::optional<std::size_t> expected;
if constexpr (requires(Adaptor& a) { a.pendingRevealCount(); })
reveals = adaptor_.pendingRevealCount();
if constexpr (requires(Adaptor& a) {
a.expectedProposerCount();
})
expected = adaptor_.expectedProposerCount();
JLOG(j_.info())
<< "STALLDIAG: rng-counters"
<< " commits=" << commits << " quorum=" << quorum
<< " commitQuorum=" << (commitQuorum ? "yes" : "no")
<< " reveals="
<< (reveals ? std::to_string(*reveals) : std::string{"n/a"})
<< " minReveals=" << (minReveals ? "yes" : "no")
<< " anyReveals=" << (anyReveals ? "yes" : "no")
<< " expectedProposers="
<< (expected ? std::to_string(*expected)
: std::string{"n/a"});
}
};
auto publishEntropySet = [&]() {
auto entropySetHash = adaptor_.buildEntropySet(buildSeq);
auto newPos = result_->position.position();
if (newPos.entropySetHash &&
*newPos.entropySetHash == entropySetHash)
{
JLOG(j_.debug()) << "RNG: entropySet already published hash="
<< entropySetHash;
return;
}
newPos.entropySetHash = entropySetHash;
result_->position.changePosition(
@@ -1617,13 +1731,6 @@ Consensus<Adaptor>::phaseEstablish(
// Publish entropySetHash before accepting so lagging peers
// can fetch/merge reveal sets in ConvergingReveal.
//
// Experiment note:
// We tested single-broadcaster seq=3 fanout reduction and
// observed frequent syncing/mismatch loops under stressed
// packet drops (e.g. 25% dropped RNG claim traffic). Keep
// full proposer broadcast here for robustness; the extra
// proposal chatter is the explicit cost we pay.
if (mode_.get() == ConsensusMode::proposing)
adaptor_.propose(result_->position);
@@ -1687,6 +1794,7 @@ Consensus<Adaptor>::phaseEstablish(
JLOG(j_.debug())
<< "RNG: skipping commit wait (participants="
<< participants << " < threshold=" << threshold << ")";
logRngDiag("rng-commit-wait-impossible-quorum");
// Fall through to close with zero entropy
}
else
@@ -1694,7 +1802,10 @@ Consensus<Adaptor>::phaseEstablish(
bool timeout =
result_->roundTime.read() > parms.rngPIPELINE_TIMEOUT;
if (!timeout)
{
logRngDiag("rng-commit-wait");
return; // Wait for more commits
}
// Timeout waiting for all expected proposers.
// If we still have quorum (80% of UNL), proceed
@@ -1726,6 +1837,7 @@ Consensus<Adaptor>::phaseEstablish(
<< " (timeout fallback)";
return;
}
logRngDiag("rng-commit-timeout-below-quorum");
// Truly below quorum: fall through to zero entropy
}
}
@@ -1799,6 +1911,7 @@ Consensus<Adaptor>::phaseEstablish(
JLOG(j_.warn())
<< "RNG: conflicting commitSetHash detected; "
"waiting briefly for convergence/fetch";
logRngDiag("rng-commit-conflict-start");
return;
}
@@ -1815,6 +1928,7 @@ Consensus<Adaptor>::phaseEstablish(
std::chrono::milliseconds>(conflictElapsed)
.count()
<< "ms; staying in ConvergingCommit";
logRngDiag("rng-commit-conflict-wait");
return;
}
@@ -1830,6 +1944,7 @@ Consensus<Adaptor>::phaseEstablish(
JLOG(j_.warn())
<< "RNG: commitSetHash conflict persisted; forcing "
"zero-entropy fallback";
logRngDiag("rng-commit-conflict-timeout-fallback");
return;
}
}
@@ -1863,6 +1978,7 @@ Consensus<Adaptor>::phaseEstablish(
}
else
{
logRngDiag("rng-reveal-wait-after-transition");
return; // Wait for next tick
}
}
@@ -1874,28 +1990,271 @@ Consensus<Adaptor>::phaseEstablish(
std::chrono::steady_clock::now() - revealPhaseStart_;
bool timeout = elapsed > parms.rngREVEAL_TIMEOUT;
bool ready = false;
bool const revealConsensus =
haveConsensus(clog) && adaptor_.hasMinimumReveals();
if ((haveConsensus(clog) && adaptor_.hasMinimumReveals()) ||
timeout)
if (revealConsensus || timeout)
{
JLOG(j_.info())
<< "STALLDIAG: rng-reveal-gate-open"
<< " revealConsensus=" << (revealConsensus ? "yes" : "no")
<< " timeout=" << (timeout ? "yes" : "no") << " elapsedMs="
<< std::chrono::duration_cast<std::chrono::milliseconds>(
elapsed)
.count();
if (timeout && !adaptor_.hasAnyReveals())
{
adaptor_.setEntropyFailed();
JLOG(j_.warn()) << "RNG: entropy failed (no reveals)";
logRngDiag("rng-reveal-timeout-no-reveals");
}
else
{
publishEntropySet();
logRngDiag("rng-reveal-published-entropy-set");
}
ready = true;
}
if (!ready)
{
JLOG(j_.info())
<< "STALLDIAG: rng-reveal-gate-blocked"
<< " revealConsensus=" << (revealConsensus ? "yes" : "no")
<< " timeout=" << (timeout ? "yes" : "no") << " elapsedMs="
<< std::chrono::duration_cast<std::chrono::milliseconds>(
elapsed)
.count();
logRngDiag("rng-reveal-wait");
return;
}
// Optional explicit final proposal (seq=4 style):
// publish a synthetic tx-set hash that includes the
// consensus-entropy pseudo-tx just before accept.
//
// IMPORTANT DESIGN NOTE (read before editing this block):
//
// This path is intentionally OPTIONAL and default-off. It exists
// for diagnostics/perf experiments (for example, making monitor
// visibility of the final pseudo-tx set more direct), NOT as a
// required step for consensus correctness.
//
// Why so conservative?
// - The main consensus engine still keys agreement on tx-set hash.
// - Updating our tx-set hash here creates a "late identity
// change" in establish.
// - Under lossy/reordered networks, peers can be slightly out of
// phase: some nodes may have switched to the synthetic hash
// while others are still on the base hash.
// - That can fragment agreement during a critical window (two
// hashes in flight for one ledger), increase proposal chatter,
// and trigger sync churn.
//
// Therefore this logic must remain best-effort only:
// - Never required for liveness/safety.
// - No extra wait tick is introduced.
// - If gates are not met, we skip and continue to accept via the
// normal implicit path (accept-time pseudo-tx injection).
//
// TBD (2026-03-03): We did not find a robust timing model that
// folds this into a guaranteed-safe explicit final proposal across
// lossy/reordered links without increasing churn. Keep this path
// as opt-in for future evaluation.
if constexpr (requires(
Adaptor& a,
TxSet_t const& txns,
typename Ledger_t::Seq seq) {
a.shouldSendExplicitFinalProposal();
a.hasQuorumOfCommits();
a.buildExplicitFinalProposalTxSet(txns, seq);
})
{
bool fullParticipantCoverage = false;
bool entropyAligned = false;
if constexpr (requires(Position_t const& p) {
p.entropySetHash;
})
{
// Guard against "early switch" churn:
// require at least as many participants as the previous
// round before attempting the explicit-final mutation.
//
// This is a heuristic to reduce risk, not a proof of
// safety. We still keep the feature optional/default-off.
auto const participants = currPeerPositions_.size() + 1;
auto const expectedParticipants = prevProposers_ + 1;
fullParticipantCoverage =
participants >= expectedParticipants;
// Require a majority aligned on entropySetHash before
// mutating tx-set hash. If this threshold is loosened, the
// probability of hash fragmentation rises quickly.
auto const requiredEntropyAligned =
(expectedParticipants / 2) + 1;
auto const ourPos = result_->position.position();
if (ourPos.entropySetHash)
{
auto const expectedEntropy = *ourPos.entropySetHash;
std::size_t alignedPeers = 0;
bool conflict = false;
for (auto const& [_, peerPos] : currPeerPositions_)
{
auto const& peerPosition =
peerPos.proposal().position();
if (!peerPosition.entropySetHash)
continue;
if (*peerPosition.entropySetHash == expectedEntropy)
{
++alignedPeers;
continue;
}
conflict = true;
break;
}
auto const alignedParticipants = alignedPeers + 1;
entropyAligned = !conflict &&
alignedParticipants >= requiredEntropyAligned;
if (!entropyAligned)
{
JLOG(j_.debug())
<< "RNG: explicit-final entropy alignment "
"insufficient"
<< " alignedParticipants="
<< alignedParticipants
<< " required=" << requiredEntropyAligned
<< " conflict=" << (conflict ? "yes" : "no");
}
}
else
{
JLOG(j_.debug())
<< "RNG: explicit-final waiting on local "
"entropySetHash";
}
}
if (mode_.get() == ConsensusMode::proposing &&
!explicitFinalProposalSent_ &&
adaptor_.hasQuorumOfCommits() && revealConsensus &&
fullParticipantCoverage && entropyAligned &&
adaptor_.shouldSendExplicitFinalProposal())
{
// One-shot per round. This avoids repeated mutations/
// broadcasts from timer ticks, which can amplify network
// chatter in the exact conditions (loss/reordering) where
// this path is already fragile.
auto const synthSet =
adaptor_.buildExplicitFinalProposalTxSet(
result_->txns, buildSeq);
explicitFinalProposalSent_ = true;
if (synthSet)
{
auto const synthHash = synthSet->id();
auto currentPos = result_->position.position();
auto newPos = currentPos;
if constexpr (requires(typename Adaptor::Position_t p) {
p.updateTxSet(synthHash);
})
newPos.updateTxSet(synthHash);
else
newPos = synthHash;
if (!(newPos == currentPos))
{
// WARNING:
// This changes proposal tx-set identity late in
// establish. Keep this path tightly gated and
// optional. The canonical ledger path remains the
// implicit accept-time injection logic.
// Maintain the invariant that our active
// position's tx-set hash is present in acquired_,
// otherwise gotTxSet can assert if this set arrives
// back from the network.
auto const [_, inserted] =
acquired_.emplace(synthHash, *synthSet);
JLOG(j_.debug())
<< "RNG: cached explicit-final txSet="
<< synthHash << " inserted=" << inserted;
if (inserted)
{
// Make the synthetic set discoverable by peers
// immediately; otherwise they can request this
// hash and hit transient "getTxSet: Failed to
// find TX set" on random peers.
adaptor_.share(*synthSet);
JLOG(j_.debug())
<< "RNG: shared explicit-final txSet="
<< synthHash;
}
result_->position.changePosition(
newPos,
asCloseTime(result_->position.closeTime()),
now_);
adaptor_.propose(result_->position);
JLOG(j_.debug())
<< "RNG: explicit final proposal txSet="
<< synthHash;
logRngDiag("rng-explicit-final-proposed");
}
}
}
else
{
char const* reason = "disabled";
if (mode_.get() != ConsensusMode::proposing)
reason = "not-proposing";
else if (explicitFinalProposalSent_)
reason = "already-sent";
else if (!adaptor_.hasQuorumOfCommits())
reason = "no-commit-quorum";
else if (!revealConsensus)
reason = "reveal-timeout";
else if (!fullParticipantCoverage)
reason = "participant-gap";
else if (!entropyAligned)
reason = "entropy-not-aligned";
JLOG(j_.info())
<< "STALLDIAG: rng-explicit-final-skipped"
<< " reason=" << reason
<< " mode=" << to_string(mode_.get()) << " sent="
<< (explicitFinalProposalSent_ ? "yes" : "no");
}
}
}
}
//@@end rng-phase-establish-substates
JLOG(j_.info()) << "STALLDIAG: establish-ready-to-accept"
<< " phase=" << to_string(phase_)
<< " mode=" << to_string(mode_.get())
<< " roundMs=" << result_->roundTime.read().count()
<< " convergePct=" << convergePercent_
<< " peerPositions=" << currPeerPositions_.size()
<< " prevProposers=" << prevProposers_
<< " explicitFinalSent="
<< (explicitFinalProposalSent_ ? "yes" : "no");
if constexpr (requires(Position_t const& p) {
p.commitSetHash;
p.entropySetHash;
p.myCommitment;
p.myReveal;
})
{
auto const pos = result_->position.position();
JLOG(j_.info()) << "STALLDIAG: establish-ready-sidecar"
<< " txSet=" << pos << " commitSetHash="
<< (pos.commitSetHash ? to_string(*pos.commitSetHash)
: std::string{"none"})
<< " entropySetHash="
<< (pos.entropySetHash ? to_string(*pos.entropySetHash)
: std::string{"none"})
<< " myCommitment=" << (pos.myCommitment ? "yes" : "no")
<< " myReveal=" << (pos.myReveal ? "yes" : "no");
}
JLOG(j_.info()) << "STARTDIAG: converge cutoff"
<< " peerPositions=" << currPeerPositions_.size()
<< " roundTime=" << result_->roundTime.read().count()
@@ -1929,6 +2288,7 @@ Consensus<Adaptor>::closeLedger(std::unique_ptr<std::stringstream> const& clog)
phase_ = ConsensusPhase::establish;
estState_ = EstablishState::ConvergingTx;
explicitFinalProposalSent_ = false;
JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
rawCloseTimes_.self = now_;
@@ -2281,6 +2641,33 @@ Consensus<Adaptor>::haveConsensus(
<< " roundTime=" << result_->roundTime.read().count()
<< "ms"
<< " mode=" << to_string(mode_.get());
JLOG(j_.info()) << "STALLDIAG: haveConsensus-self"
<< " position=" << ourPosition << " closeTime="
<< result_->position.closeTime().time_since_epoch().count()
<< " haveCloseTimeConsensus="
<< (haveCloseTimeConsensus_ ? "yes" : "no")
<< " phase=" << to_string(phase_);
if constexpr (requires(Position_t const& p) {
p.commitSetHash;
p.entropySetHash;
p.myCommitment;
p.myReveal;
})
{
JLOG(j_.info()) << "STALLDIAG: haveConsensus-sidecar"
<< " commitSetHash="
<< (ourPosition.commitSetHash
? to_string(*ourPosition.commitSetHash)
: std::string{"none"})
<< " entropySetHash="
<< (ourPosition.entropySetHash
? to_string(*ourPosition.entropySetHash)
: std::string{"none"})
<< " myCommitment="
<< (ourPosition.myCommitment ? "yes" : "no")
<< " myReveal="
<< (ourPosition.myReveal ? "yes" : "no");
}
// Determine if we actually have consensus or not
result_->state = checkConsensus(
@@ -2297,6 +2684,11 @@ Consensus<Adaptor>::haveConsensus(
if (result_->state == ConsensusState::No)
{
JLOG(j_.info()) << "STALLDIAG: haveConsensus-result"
<< " state=No"
<< " agree=" << agree << " disagree=" << disagree
<< " total=" << (agree + disagree)
<< " finished=" << currentFinished;
CLOG(clog) << "No consensus. ";
return false;
}
@@ -2305,6 +2697,7 @@ Consensus<Adaptor>::haveConsensus(
// without us.
if (result_->state == ConsensusState::MovedOn)
{
JLOG(j_.warn()) << "STALLDIAG: haveConsensus-result state=MovedOn";
JLOG(j_.error()) << "Unable to reach consensus";
JLOG(j_.error()) << Json::Compact{getJson(true)};
CLOG(clog) << "Unable to reach consensus "

View File

@@ -99,6 +99,9 @@ doRuntimeConfig(RPC::JsonContext& context)
pct = 100.0;
cfg.rngClaimDropPctX100 = static_cast<int>(pct * 100);
}
if (v.isMember("explicit_final_proposal"))
cfg.explicitFinalProposal =
v["explicit_final_proposal"].asBool();
if (v.isMember("message_types"))
{
auto const& mts = v["message_types"];
@@ -154,6 +157,8 @@ doRuntimeConfig(RPC::JsonContext& context)
entry["send_drop_pct"] = *cfg.sendDropPctX100 / 100.0;
if (cfg.rngClaimDropPctX100)
entry["rng_claim_drop_pct"] = *cfg.rngClaimDropPctX100 / 100.0;
if (cfg.explicitFinalProposal.has_value())
entry["explicit_final_proposal"] = *cfg.explicitFinalProposal;
if (cfg.messageCategories)
{
Json::Value types{Json::arrayValue};