1#ifndef XRPL_CONSENSUS_CONSENSUS_H_INCLUDED
2#define XRPL_CONSENSUS_CONSENSUS_H_INCLUDED
4#include <xrpld/consensus/ConsensusParms.h>
5#include <xrpld/consensus/ConsensusProposal.h>
6#include <xrpld/consensus/ConsensusTypes.h>
7#include <xrpld/consensus/DisputedTx.h>
8#include <xrpld/consensus/LedgerTiming.h>
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/chrono.h>
12#include <xrpl/beast/utility/Journal.h>
13#include <xrpl/json/json_writer.h>
52 ConsensusParms
const& parms,
84 ConsensusParms
const& parms,
277template <
class Adaptor>
283 using Tx_t =
typename TxSet_t::Tx;
287 typename Ledger_t::ID,
288 typename TxSet_t::ID>;
311 a.onModeChange(
mode_, mode);
352 std::unique_ptr<
std::stringstream> const& clog = {});
411 typename Ledger_t::ID
445 typename Ledger_t::ID
const& lgrId,
619template <
class Adaptor>
624 : adaptor_(adaptor), clock_(clock), j_{journal}
626 JLOG(
j_.
debug()) <<
"Creating consensus object";
629template <
class Adaptor>
633 typename Ledger_t::ID
const& prevLedgerID,
642 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
643 prevCloseTime_ = prevLedger.closeTime();
648 prevCloseTime_ = rawCloseTimes_.self;
651 for (
NodeID_t const& n : nowUntrusted)
652 recentPeerPositions_.erase(n);
658 if (prevLedger.id() != prevLedgerID)
661 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID))
663 prevLedger = *newLedger;
669 <<
"Entering consensus with: " << previousLedger_.id();
670 JLOG(j_.
info()) <<
"Correct LCL is: " << prevLedgerID;
674 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
676template <
class Adaptor>
680 typename Ledger_t::ID
const& prevLedgerID,
686 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::open ";
687 CLOG(clog) <<
"startRoundInternal transitioned to ConsensusPhase::open, "
688 "previous ledgerID: "
689 << prevLedgerID <<
", seq: " << prevLedger.seq() <<
". ";
690 mode_.set(mode, adaptor_);
692 prevLedgerID_ = prevLedgerID;
693 previousLedger_ = prevLedger;
695 convergePercent_ = 0;
697 haveCloseTimeConsensus_ =
false;
698 openTime_.reset(clock_.now());
699 currPeerPositions_.clear();
701 rawCloseTimes_.peers.clear();
702 rawCloseTimes_.self = {};
706 previousLedger_.closeTimeResolution(),
707 previousLedger_.closeAgree(),
708 previousLedger_.seq() +
typename Ledger_t::Seq{1});
711 CLOG(clog) <<
"number of peer proposals,previous proposers: "
712 << currPeerPositions_.size() <<
',' << prevProposers_ <<
". ";
713 if (currPeerPositions_.size() > (prevProposers_ / 2))
717 CLOG(clog) <<
"consider closing the ledger immediately. ";
718 timerEntry(now_, clog);
722template <
class Adaptor>
728 JLOG(j_.
debug()) <<
"PROPOSAL " << newPeerPos.render();
729 auto const& peerID = newPeerPos.proposal().nodeID();
733 auto& props = recentPeerPositions_[peerID];
735 if (props.size() >= 10)
738 props.push_back(newPeerPos);
740 return peerProposalInternal(now, newPeerPos);
743template <
class Adaptor>
755 auto const& newPeerProp = newPeerPos.proposal();
757 if (newPeerProp.prevLedger() != prevLedgerID_)
759 JLOG(j_.
debug()) <<
"Got proposal for " << newPeerProp.prevLedger()
760 <<
" but we are on " << prevLedgerID_;
764 auto const& peerID = newPeerProp.nodeID();
766 if (deadNodes_.find(peerID) != deadNodes_.end())
768 JLOG(j_.
info()) <<
"Position from dead node: " << peerID;
774 auto peerPosIt = currPeerPositions_.find(peerID);
776 if (peerPosIt != currPeerPositions_.end())
778 if (newPeerProp.proposeSeq() <=
779 peerPosIt->second.proposal().proposeSeq())
785 if (newPeerProp.isBowOut())
787 JLOG(j_.
info()) <<
"Peer " << peerID <<
" bows out";
790 for (
auto& it : result_->disputes)
791 it.second.unVote(peerID);
793 if (peerPosIt != currPeerPositions_.end())
794 currPeerPositions_.erase(peerID);
795 deadNodes_.insert(peerID);
800 if (peerPosIt != currPeerPositions_.end())
801 peerPosIt->second = newPeerPos;
803 currPeerPositions_.emplace(peerID, newPeerPos);
806 if (newPeerProp.isInitial())
809 JLOG(j_.
trace()) <<
"Peer reports close time as "
810 << newPeerProp.closeTime().time_since_epoch().count();
811 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
814 JLOG(j_.
trace()) <<
"Processing peer proposal " << newPeerProp.proposeSeq()
815 <<
"/" << newPeerProp.position();
818 auto const ait = acquired_.find(newPeerProp.position());
819 if (ait == acquired_.end())
824 if (
auto set = adaptor_.acquireTxSet(newPeerProp.position()))
825 gotTxSet(now_, *
set);
827 JLOG(j_.
debug()) <<
"Don't have tx set for peer";
831 updateDisputes(newPeerProp.nodeID(), ait->second);
838template <
class Adaptor>
844 CLOG(clog) <<
"Consensus<Adaptor>::timerEntry. ";
848 CLOG(clog) <<
"Nothing to do during accepted phase. ";
853 CLOG(clog) <<
"Set network adjusted time to " <<
to_string(now) <<
". ";
856 auto const phaseOrig = phase_;
857 CLOG(clog) <<
"Phase " <<
to_string(phaseOrig) <<
". ";
859 if (phaseOrig != phase_)
861 CLOG(clog) <<
"Changed phase to << " <<
to_string(phase_) <<
". ";
867 phaseEstablish(clog);
868 CLOG(clog) <<
"timerEntry finishing in phase " <<
to_string(phase_) <<
". ";
871template <
class Adaptor>
883 auto id = txSet.id();
887 if (!acquired_.emplace(
id, txSet).second)
892 JLOG(j_.
debug()) <<
"Not creating disputes: no position yet.";
899 id != result_->position.position(),
900 "ripple::Consensus::gotTxSet : updated transaction set");
902 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
904 if (peerPos.proposal().position() == id)
906 updateDisputes(nodeId, txSet);
914 <<
"By the time we got " <<
id <<
" no peers were proposing it";
919template <
class Adaptor>
925 using namespace std::chrono_literals;
926 JLOG(j_.
info()) <<
"Simulating consensus";
929 result_->roundTime.tick(consensusDelay.
value_or(100ms));
930 result_->proposers = prevProposers_ = currPeerPositions_.size();
931 prevRoundTime_ = result_->roundTime.read();
933 adaptor_.onForceAccept(
940 JLOG(j_.
info()) <<
"Simulation complete";
943template <
class Adaptor>
953 ret[
"proposers"] =
static_cast<int>(currPeerPositions_.size());
957 ret[
"synched"] =
true;
960 ret[
"close_granularity"] =
static_cast<Int
>(closeResolution_.count());
963 ret[
"synched"] =
false;
967 if (result_ && !result_->disputes.empty() && !full)
968 ret[
"disputes"] =
static_cast<Int
>(result_->disputes.size());
971 ret[
"our_position"] = result_->position.getJson();
977 static_cast<Int
>(result_->roundTime.read().count());
978 ret[
"converge_percent"] = convergePercent_;
979 ret[
"close_resolution"] =
static_cast<Int
>(closeResolution_.count());
980 ret[
"have_time_consensus"] = haveCloseTimeConsensus_;
981 ret[
"previous_proposers"] =
static_cast<Int
>(prevProposers_);
982 ret[
"previous_mseconds"] =
static_cast<Int
>(prevRoundTime_.count());
984 if (!currPeerPositions_.empty())
988 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
990 ppj[
to_string(nodeId)] = peerPos.getJson();
992 ret[
"peer_positions"] = std::move(ppj);
995 if (!acquired_.empty())
998 for (
auto const& at : acquired_)
1002 ret[
"acquired"] = std::move(acq);
1005 if (result_ && !result_->disputes.empty())
1008 for (
auto const& [txId, dispute] : result_->disputes)
1010 dsj[
to_string(txId)] = dispute.getJson();
1012 ret[
"disputes"] = std::move(dsj);
1015 if (!rawCloseTimes_.peers.empty())
1018 for (
auto const& ct : rawCloseTimes_.peers)
1023 ret[
"close_times"] = std::move(ctj);
1026 if (!deadNodes_.empty())
1029 for (
auto const& dn : deadNodes_)
1033 ret[
"dead_nodes"] = std::move(dnj);
1041template <
class Adaptor>
1044 typename Ledger_t::ID
const& lgrId,
1047 CLOG(clog) <<
"handleWrongLedger. ";
1049 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1050 "ripple::Consensus::handleWrongLedger : have wrong ledger");
1053 leaveConsensus(clog);
1056 if (prevLedgerID_ != lgrId)
1058 prevLedgerID_ = lgrId;
1063 result_->disputes.clear();
1064 result_->compares.clear();
1067 currPeerPositions_.clear();
1068 rawCloseTimes_.peers.clear();
1072 playbackProposals();
1075 if (previousLedger_.id() == prevLedgerID_)
1077 CLOG(clog) <<
"previousLedger_.id() == prevLeverID_ " << prevLedgerID_
1083 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1085 JLOG(j_.
info()) <<
"Have the consensus ledger " << prevLedgerID_;
1086 CLOG(clog) <<
"Have the consensus ledger " << prevLedgerID_ <<
". ";
1092 CLOG(clog) <<
"Still on wrong ledger. ";
1097template <
class Adaptor>
1101 CLOG(clog) <<
"checkLedger. ";
1104 adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1105 CLOG(clog) <<
"network ledgerid " << netLgr <<
", "
1106 <<
"previous ledger " << prevLedgerID_ <<
". ";
1108 if (netLgr != prevLedgerID_)
1111 ss <<
"View of consensus changed during " <<
to_string(phase_)
1112 <<
" mode=" <<
to_string(mode_.get()) <<
", " << prevLedgerID_
1113 <<
" to " << netLgr <<
", "
1116 CLOG(clog) << ss.
str();
1117 CLOG(clog) <<
"State on consensus change "
1119 handleWrongLedger(netLgr, clog);
1121 else if (previousLedger_.id() != prevLedgerID_)
1123 CLOG(clog) <<
"previousLedger_.id() != prevLedgerID_: "
1124 << previousLedger_.id() <<
',' <<
to_string(prevLedgerID_)
1126 handleWrongLedger(netLgr, clog);
1130template <
class Adaptor>
1134 for (
auto const& it : recentPeerPositions_)
1136 for (
auto const& pos : it.second)
1138 if (pos.proposal().prevLedger() == prevLedgerID_)
1140 if (peerProposalInternal(now_, pos))
1141 adaptor_.share(pos);
1147template <
class Adaptor>
1151 CLOG(clog) <<
"phaseOpen. ";
1155 bool anyTransactions = adaptor_.hasOpenTransactions();
1156 auto proposersClosed = currPeerPositions_.size();
1157 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1159 openTime_.tick(clock_.now());
1164 auto const mode = mode_.get();
1165 bool const closeAgree = previousLedger_.closeAgree();
1166 auto const prevCloseTime = previousLedger_.closeTime();
1167 auto const prevParentCloseTimePlus1 =
1168 previousLedger_.parentCloseTime() + 1s;
1169 bool const previousCloseCorrect =
1171 (prevCloseTime != prevParentCloseTimePlus1);
1173 auto const lastCloseTime = previousCloseCorrect
1177 if (now_ >= lastCloseTime)
1178 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1180 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1181 CLOG(
clog) <<
"calculating how long since last ledger's close time "
1183 <<
to_string(mode) <<
", previous closeAgree: " << closeAgree
1184 <<
", previous close time: " <<
to_string(prevCloseTime)
1185 <<
", previous parent close time + 1s: "
1187 <<
", previous close time seen internally: "
1189 <<
", last close time: " <<
to_string(lastCloseTime)
1190 <<
", since close: " << sinceClose.
count() <<
". ";
1194 adaptor_.parms().ledgerIDLE_INTERVAL,
1195 2 * previousLedger_.closeTimeResolution());
1196 CLOG(
clog) <<
"idle interval set to " << idleInterval.
count()
1198 <<
"ledgerIDLE_INTERVAL: "
1199 << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1200 <<
", previous ledger close time resolution: "
1201 << previousLedger_.closeTimeResolution().count() <<
"ms. ";
1217 CLOG(
clog) <<
"closing ledger. ";
1222template <
class Adaptor>
1227 CLOG(
clog) <<
"shouldPause? ";
1228 auto const& parms = adaptor_.parms();
1230 previousLedger_.seq() -
1231 std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1232 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1233 std::size_t const totalValidators = trustedKeys.size();
1235 adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1239 vars <<
" consensuslog (working seq: " << previousLedger_.seq() <<
", "
1240 <<
"validated seq: " << adaptor_.getValidLedgerIndex() <<
", "
1241 <<
"am validator: " << adaptor_.validator() <<
", "
1242 <<
"have validated: " << adaptor_.haveValidated() <<
", "
1243 <<
"roundTime: " << result_->roundTime.
read().count() <<
", "
1244 <<
"max consensus time: " << parms.ledgerMAX_CONSENSUS.count() <<
", "
1245 <<
"validators: " << totalValidators <<
", "
1246 <<
"laggards: " << laggards <<
", "
1247 <<
"offline: " << offline <<
", "
1248 <<
"quorum: " << quorum <<
")";
1250 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1251 !adaptor_.haveValidated() ||
1252 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1254 j_.
debug() <<
"not pausing (early)" << vars.
str();
1255 CLOG(
clog) <<
"Not pausing (early). ";
1259 bool willPause =
false;
1295 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1304 if (laggards + offline > totalValidators - quorum)
1319 float const nonLaggards = totalValidators - (laggards + offline);
1320 float const quorumRatio =
1321 static_cast<float>(quorum) / totalValidators;
1322 float const allowedDissent = 1.0f - quorumRatio;
1323 float const phaseFactor =
static_cast<float>(phase) / maxPausePhase;
1325 if (nonLaggards / totalValidators <
1326 quorumRatio + (allowedDissent * phaseFactor))
1334 j_.
warn() <<
"pausing" << vars.
str();
1335 CLOG(
clog) <<
"pausing " << vars.
str() <<
". ";
1339 j_.
debug() <<
"not pausing" << vars.
str();
1340 CLOG(
clog) <<
"not pausing. ";
1345template <
class Adaptor>
1350 CLOG(
clog) <<
"phaseEstablish. ";
1352 XRPL_ASSERT(result_,
"ripple::Consensus::phaseEstablish : result is set");
1354 ++peerUnchangedCounter_;
1355 ++establishCounter_;
1360 result_->roundTime.tick(clock_.now());
1361 result_->proposers = currPeerPositions_.size();
1363 convergePercent_ = result_->roundTime.read() * 100 /
1365 CLOG(
clog) <<
"convergePercent_ " << convergePercent_
1366 <<
" is based on round duration so far: "
1367 << result_->roundTime.read().count() <<
"ms, "
1368 <<
"previous round duration: " << prevRoundTime_.count()
1376 CLOG(
clog) <<
"ledgerMIN_CONSENSUS not reached: "
1381 updateOurPositions(
clog);
1384 if (shouldPause(
clog) || !haveConsensus(
clog))
1387 if (!haveCloseTimeConsensus_)
1389 JLOG(j_.
info()) <<
"We have TX consensus but not CT consensus";
1390 CLOG(
clog) <<
"We have TX consensus but not CT consensus. ";
1394 JLOG(j_.
info()) <<
"Converge cutoff (" << currPeerPositions_.size()
1395 <<
" participants)";
1396 CLOG(
clog) <<
"Converge cutoff (" << currPeerPositions_.size()
1397 <<
" participants). Transitioned to ConsensusPhase::accepted. ";
1398 adaptor_.updateOperatingMode(currPeerPositions_.size());
1399 prevProposers_ = currPeerPositions_.size();
1400 prevRoundTime_ = result_->roundTime.read();
1402 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::accepted";
1410 adaptor_.validating());
1413template <
class Adaptor>
1418 XRPL_ASSERT(!result_,
"ripple::Consensus::closeLedger : result is not set");
1421 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::establish";
1422 rawCloseTimes_.self = now_;
1423 peerUnchangedCounter_ = 0;
1424 establishCounter_ = 0;
1426 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1427 result_->roundTime.reset(clock_.now());
1430 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1431 adaptor_.share(result_->txns);
1433 auto const mode = mode_.get();
1435 <<
"closeLedger transitioned to ConsensusPhase::establish, mode: "
1437 <<
", number of peer positions: " << currPeerPositions_.
size() <<
". ";
1439 adaptor_.propose(result_->position);
1442 for (
auto const& pit : currPeerPositions_)
1444 auto const& pos = pit.second.proposal().position();
1445 auto const it = acquired_.find(pos);
1446 if (it != acquired_.end())
1447 createDisputes(it->second,
clog);
1466 int result = ((participants * percent) + (percent / 2)) / 100;
1468 return (result == 0) ? 1 : result;
1471template <
class Adaptor>
1478 result_,
"ripple::Consensus::updateOurPositions : result is set");
1484 CLOG(
clog) <<
"updateOurPositions. peerCutoff " <<
to_string(peerCutoff)
1485 <<
", ourCutoff " <<
to_string(ourCutoff) <<
". ";
1490 auto it = currPeerPositions_.
begin();
1491 while (it != currPeerPositions_.end())
1493 Proposal_t const& peerProp = it->second.proposal();
1494 if (peerProp.
isStale(peerCutoff))
1498 JLOG(j_.
warn()) <<
"Removing stale proposal from " << peerID;
1499 for (
auto& dt : result_->disputes)
1500 dt.second.unVote(peerID);
1501 it = currPeerPositions_.erase(it);
1506 ++closeTimeVotes[asCloseTime(peerProp.
closeTime())];
1518 for (
auto& [txId, dispute] : result_->disputes)
1522 if (dispute.updateVote(
1528 mutableSet.
emplace(result_->txns);
1530 if (dispute.getOurVote())
1533 mutableSet->insert(dispute.tx());
1538 mutableSet->erase(txId);
1544 ourNewSet.
emplace(std::move(*mutableSet));
1548 haveCloseTimeConsensus_ =
false;
1550 if (currPeerPositions_.empty())
1553 haveCloseTimeConsensus_ =
true;
1554 consensusCloseTime = asCloseTime(result_->position.closeTime());
1560 parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1562 closeTimeAvalancheState_ = *newState;
1563 CLOG(
clog) <<
"neededWeight " << neededWeight <<
". ";
1565 int participants = currPeerPositions_.size();
1568 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1576 int const threshConsensus =
1580 ss <<
"Proposers:" << currPeerPositions_.size()
1581 <<
" nw:" << neededWeight <<
" thrV:" << threshVote
1582 <<
" thrC:" << threshConsensus;
1586 for (
auto const& [t, v] : closeTimeVotes)
1590 <<
static_cast<std::uint32_t>(previousLedger_.seq()) + 1 <<
": "
1591 << t.time_since_epoch().count() <<
" has " << v <<
", "
1592 << threshVote <<
" required";
1594 if (v >= threshVote)
1597 consensusCloseTime = t;
1600 if (threshVote >= threshConsensus)
1601 haveCloseTimeConsensus_ =
true;
1605 if (!haveCloseTimeConsensus_)
1608 <<
"No CT consensus:"
1609 <<
" Proposers:" << currPeerPositions_.size()
1611 <<
" Thresh:" << threshConsensus
1613 CLOG(
clog) <<
"No close time consensus. ";
1618 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1619 result_->position.isStale(ourCutoff)))
1622 ourNewSet.
emplace(result_->txns);
1627 auto newID = ourNewSet->id();
1629 result_->txns = std::move(*ourNewSet);
1632 ss <<
"Position change: CTime "
1637 result_->position.changePosition(newID, consensusCloseTime, now_);
1641 if (acquired_.emplace(newID, result_->txns).second)
1643 if (!result_->position.isBowOut())
1644 adaptor_.share(result_->txns);
1646 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1650 updateDisputes(nodeId, result_->txns);
1655 if (!result_->position.isBowOut() &&
1657 adaptor_.propose(result_->position);
1661template <
class Adaptor>
1667 XRPL_ASSERT(result_,
"ripple::Consensus::haveConsensus : has result");
1670 int agree = 0, disagree = 0;
1672 auto ourPosition = result_->position.position();
1675 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1677 Proposal_t const& peerProp = peerPos.proposal();
1678 if (peerProp.
position() == ourPosition)
1684 JLOG(j_.
debug()) <<
"Proposal disagreement: Peer " << nodeId
1689 auto currentFinished =
1690 adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1692 JLOG(j_.
debug()) <<
"Checking for TX consensus: agree=" << agree
1693 <<
", disagree=" << disagree;
1699 bool const stalled = haveCloseTimeConsensus_ &&
1700 !result_->disputes.empty() &&
1702 [
this, &parms, &
clog](
auto const& dispute) {
1703 return dispute.second.stalled(
1705 mode_.get() == ConsensusMode::proposing,
1706 peerUnchangedCounter_,
1713 ss <<
"Consensus detects as stalled with " << (agree + disagree) <<
"/"
1714 << prevProposers_ <<
" proposers, and " << result_->disputes.size()
1715 <<
" stalled disputed transactions.";
1727 result_->roundTime.read(),
1736 CLOG(
clog) <<
"No consensus. ";
1743 static auto const minimumCounter =
1746 if (establishCounter_ < minimumCounter)
1755 ss <<
"Consensus time has expired in round " << establishCounter_
1756 <<
"; continue until round " << minimumCounter <<
". "
1759 CLOG(
clog) << ss.
str() <<
". ";
1764 CLOG(
clog) << ss.
str() <<
". ";
1765 leaveConsensus(
clog);
1771 JLOG(j_.
error()) <<
"Unable to reach consensus";
1773 CLOG(
clog) <<
"Unable to reach consensus "
1777 CLOG(
clog) <<
"Consensus has been reached. ";
1781template <
class Adaptor>
1788 if (result_ && !result_->position.isBowOut())
1790 result_->position.bowOut(now_);
1791 adaptor_.propose(result_->position);
1795 JLOG(j_.
info()) <<
"Bowing out of consensus";
1796 CLOG(
clog) <<
"Bowing out of consensus. ";
1800template <
class Adaptor>
1807 XRPL_ASSERT(result_,
"ripple::Consensus::createDisputes : result is set");
1810 auto const emplaced = result_->compares.emplace(o.id()).second;
1811 CLOG(
clog) <<
"createDisputes: new set? " << !emplaced <<
". ";
1816 if (result_->txns.id() == o.id())
1818 CLOG(
clog) <<
"both sets are identical. ";
1822 CLOG(
clog) <<
"comparing existing with new set: " << result_->txns.id()
1823 <<
',' << o.id() <<
". ";
1824 JLOG(j_.
debug()) <<
"createDisputes " << result_->txns.id() <<
" to "
1827 auto differences = result_->txns.compare(o);
1831 for (
auto const& [txId, inThisSet] : differences)
1836 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1837 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1838 "ripple::Consensus::createDisputes : has disputed transactions");
1840 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1841 auto txID = tx.id();
1843 if (result_->disputes.find(txID) != result_->disputes.end())
1846 JLOG(j_.
debug()) <<
"Transaction " << txID <<
" is disputed";
1850 result_->txns.exists(txID),
1851 std::max(prevProposers_, currPeerPositions_.size()),
1855 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1857 Proposal_t const& peerProp = peerPos.proposal();
1858 auto const cit = acquired_.find(peerProp.
position());
1859 if (cit != acquired_.end() &&
1860 dtx.setVote(nodeId, cit->second.exists(txID)))
1861 peerUnchangedCounter_ = 0;
1863 adaptor_.share(dtx.tx());
1865 result_->disputes.emplace(txID, std::move(dtx));
1867 JLOG(j_.
debug()) << dc <<
" differences found";
1868 CLOG(
clog) <<
"disputes: " << dc <<
". ";
1871template <
class Adaptor>
1876 XRPL_ASSERT(result_,
"ripple::Consensus::updateDisputes : result is set");
1880 if (result_->compares.find(other.id()) == result_->compares.end())
1881 createDisputes(other);
1883 for (
auto& it : result_->disputes)
1885 auto& d = it.second;
1886 if (d.setVote(node, other.exists(d.tx().id())))
1887 peerUnchangedCounter_ = 0;
1891template <
class Adaptor>
Decorator for streaming out compact json.
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
NodeID_t const & nodeID() const
Identifying which peer took this position.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
Position_t const & position() const
Get the proposed position.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Measures the duration of phases of consensus.
void set(ConsensusMode mode, Adaptor &a)
MonitoredMode(ConsensusMode m)
ConsensusMode get() const
Generic implementation of consensus algorithm.
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
typename Adaptor::PeerPosition_t PeerPosition_t
NetClock::time_point prevCloseTime_
clock_type const & clock_
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
typename Adaptor::TxSet_t TxSet_t
ConsensusParms::AvalancheState closeTimeAvalancheState_
std::size_t establishCounter_
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
typename TxSet_t::Tx Tx_t
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
Consensus(Consensus &&) noexcept=default
NetClock::time_point now_
std::size_t prevProposers_
NetClock::time_point asCloseTime(NetClock::time_point raw) const
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
typename Adaptor::Ledger_t Ledger_t
ConsensusPhase phase() const
std::size_t peerUnchangedCounter_
typename Adaptor::NodeID_t NodeID_t
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
NetClock::duration closeResolution_
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
ConsensusCloseTimes rawCloseTimes_
std::chrono::milliseconds prevRoundTime_
std::optional< Result > result_
hash_set< NodeID_t > deadNodes_
Ledger_t::ID prevLedgerID_
bool haveCloseTimeConsensus_
A transaction discovered to be in dispute during consensus.
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are a normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::string to_string(base_uint< Bits, Tag > const &a)
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The minimum number of seconds we wait to ensure participation.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
Encapsulates the result of consensus.
T time_since_epoch(T... args)