20#ifndef RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED
21#define RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED
23#include <xrpld/consensus/ConsensusParms.h>
24#include <xrpld/consensus/ConsensusProposal.h>
25#include <xrpld/consensus/ConsensusTypes.h>
26#include <xrpld/consensus/DisputedTx.h>
27#include <xrpld/consensus/LedgerTiming.h>
29#include <xrpl/basics/Log.h>
30#include <xrpl/basics/chrono.h>
31#include <xrpl/beast/utility/Journal.h>
32#include <xrpl/json/json_writer.h>
70 ConsensusParms
const& parms,
97 ConsensusParms
const& parms,
290template <
class Adaptor>
296 using Tx_t =
typename TxSet_t::Tx;
300 typename Ledger_t::ID,
301 typename TxSet_t::ID>;
324 a.onModeChange(
mode_, mode);
365 std::unique_ptr<
std::stringstream> const& clog = {});
424 typename Ledger_t::ID
458 typename Ledger_t::ID
const& lgrId,
622template <
class Adaptor>
627 : adaptor_(adaptor), clock_(clock), j_{journal}
629 JLOG(
j_.
debug()) <<
"Creating consensus object";
632template <
class Adaptor>
636 typename Ledger_t::ID
const& prevLedgerID,
645 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
646 prevCloseTime_ = prevLedger.closeTime();
651 prevCloseTime_ = rawCloseTimes_.self;
654 for (
NodeID_t const& n : nowUntrusted)
655 recentPeerPositions_.erase(n);
661 if (prevLedger.id() != prevLedgerID)
664 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID))
666 prevLedger = *newLedger;
672 <<
"Entering consensus with: " << previousLedger_.id();
673 JLOG(j_.
info()) <<
"Correct LCL is: " << prevLedgerID;
677 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
679template <
class Adaptor>
683 typename Ledger_t::ID
const& prevLedgerID,
689 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::open ";
690 CLOG(clog) <<
"startRoundInternal transitioned to ConsensusPhase::open, "
691 "previous ledgerID: "
692 << prevLedgerID <<
", seq: " << prevLedger.seq() <<
". ";
693 mode_.set(mode, adaptor_);
695 prevLedgerID_ = prevLedgerID;
696 previousLedger_ = prevLedger;
698 convergePercent_ = 0;
699 haveCloseTimeConsensus_ =
false;
700 openTime_.reset(clock_.now());
701 currPeerPositions_.clear();
703 rawCloseTimes_.peers.clear();
704 rawCloseTimes_.self = {};
708 previousLedger_.closeTimeResolution(),
709 previousLedger_.closeAgree(),
710 previousLedger_.seq() +
typename Ledger_t::Seq{1});
713 CLOG(clog) <<
"number of peer proposals,previous proposers: "
714 << currPeerPositions_.size() <<
',' << prevProposers_ <<
". ";
715 if (currPeerPositions_.size() > (prevProposers_ / 2))
719 CLOG(clog) <<
"consider closing the ledger immediately. ";
720 timerEntry(now_, clog);
724template <
class Adaptor>
730 JLOG(j_.
debug()) <<
"PROPOSAL " << newPeerPos.render();
731 auto const& peerID = newPeerPos.proposal().nodeID();
735 auto& props = recentPeerPositions_[peerID];
737 if (props.size() >= 10)
740 props.push_back(newPeerPos);
742 return peerProposalInternal(now, newPeerPos);
745template <
class Adaptor>
757 auto const& newPeerProp = newPeerPos.proposal();
759 if (newPeerProp.prevLedger() != prevLedgerID_)
761 JLOG(j_.
debug()) <<
"Got proposal for " << newPeerProp.prevLedger()
762 <<
" but we are on " << prevLedgerID_;
766 auto const& peerID = newPeerProp.nodeID();
768 if (deadNodes_.find(peerID) != deadNodes_.end())
770 JLOG(j_.
info()) <<
"Position from dead node: " << peerID;
776 auto peerPosIt = currPeerPositions_.find(peerID);
778 if (peerPosIt != currPeerPositions_.end())
780 if (newPeerProp.proposeSeq() <=
781 peerPosIt->second.proposal().proposeSeq())
787 if (newPeerProp.isBowOut())
789 JLOG(j_.
info()) <<
"Peer " << peerID <<
" bows out";
792 for (
auto& it : result_->disputes)
793 it.second.unVote(peerID);
795 if (peerPosIt != currPeerPositions_.end())
796 currPeerPositions_.erase(peerID);
797 deadNodes_.insert(peerID);
802 if (peerPosIt != currPeerPositions_.end())
803 peerPosIt->second = newPeerPos;
805 currPeerPositions_.emplace(peerID, newPeerPos);
808 if (newPeerProp.isInitial())
811 JLOG(j_.
trace()) <<
"Peer reports close time as "
812 << newPeerProp.closeTime().time_since_epoch().count();
813 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
816 JLOG(j_.
trace()) <<
"Processing peer proposal " << newPeerProp.proposeSeq()
817 <<
"/" << newPeerProp.position();
820 auto const ait = acquired_.find(newPeerProp.position());
821 if (ait == acquired_.end())
826 if (
auto set = adaptor_.acquireTxSet(newPeerProp.position()))
827 gotTxSet(now_, *
set);
829 JLOG(j_.
debug()) <<
"Don't have tx set for peer";
833 updateDisputes(newPeerProp.nodeID(), ait->second);
840template <
class Adaptor>
846 CLOG(clog) <<
"Consensus<Adaptor>::timerEntry. ";
850 CLOG(clog) <<
"Nothing to do during accepted phase. ";
855 CLOG(clog) <<
"Set network adjusted time to " <<
to_string(now) <<
". ";
858 const auto phaseOrig = phase_;
859 CLOG(clog) <<
"Phase " <<
to_string(phaseOrig) <<
". ";
861 if (phaseOrig != phase_)
863 CLOG(clog) <<
"Changed phase to << " <<
to_string(phase_) <<
". ";
869 phaseEstablish(clog);
870 CLOG(clog) <<
"timerEntry finishing in phase " <<
to_string(phase_) <<
". ";
873template <
class Adaptor>
885 auto id = txSet.id();
889 if (!acquired_.emplace(
id, txSet).second)
894 JLOG(j_.
debug()) <<
"Not creating disputes: no position yet.";
901 id != result_->position.position(),
902 "ripple::Consensus::gotTxSet : updated transaction set");
904 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
906 if (peerPos.proposal().position() ==
id)
908 updateDisputes(nodeId, txSet);
916 <<
"By the time we got " <<
id <<
" no peers were proposing it";
921template <
class Adaptor>
927 using namespace std::chrono_literals;
928 JLOG(j_.
info()) <<
"Simulating consensus";
931 result_->roundTime.tick(consensusDelay.
value_or(100ms));
932 result_->proposers = prevProposers_ = currPeerPositions_.size();
933 prevRoundTime_ = result_->roundTime.read();
935 adaptor_.onForceAccept(
942 JLOG(j_.
info()) <<
"Simulation complete";
945template <
class Adaptor>
955 ret[
"proposers"] =
static_cast<int>(currPeerPositions_.size());
959 ret[
"synched"] =
true;
962 ret[
"close_granularity"] =
static_cast<Int
>(closeResolution_.count());
965 ret[
"synched"] =
false;
969 if (result_ && !result_->disputes.empty() && !full)
970 ret[
"disputes"] =
static_cast<Int
>(result_->disputes.size());
973 ret[
"our_position"] = result_->position.getJson();
979 static_cast<Int
>(result_->roundTime.read().count());
980 ret[
"converge_percent"] = convergePercent_;
981 ret[
"close_resolution"] =
static_cast<Int
>(closeResolution_.count());
982 ret[
"have_time_consensus"] = haveCloseTimeConsensus_;
983 ret[
"previous_proposers"] =
static_cast<Int
>(prevProposers_);
984 ret[
"previous_mseconds"] =
static_cast<Int
>(prevRoundTime_.count());
986 if (!currPeerPositions_.empty())
990 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
992 ppj[
to_string(nodeId)] = peerPos.getJson();
994 ret[
"peer_positions"] = std::move(ppj);
997 if (!acquired_.empty())
1000 for (
auto const& at : acquired_)
1004 ret[
"acquired"] = std::move(acq);
1007 if (result_ && !result_->disputes.empty())
1010 for (
auto const& [txId, dispute] : result_->disputes)
1012 dsj[
to_string(txId)] = dispute.getJson();
1014 ret[
"disputes"] = std::move(dsj);
1017 if (!rawCloseTimes_.peers.empty())
1020 for (
auto const& ct : rawCloseTimes_.peers)
1025 ret[
"close_times"] = std::move(ctj);
1028 if (!deadNodes_.empty())
1031 for (
auto const& dn : deadNodes_)
1035 ret[
"dead_nodes"] = std::move(dnj);
1043template <
class Adaptor>
1046 typename Ledger_t::ID
const& lgrId,
1049 CLOG(clog) <<
"handleWrongLedger. ";
1051 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1052 "ripple::Consensus::handleWrongLedger : have wrong ledger");
1055 leaveConsensus(clog);
1058 if (prevLedgerID_ != lgrId)
1060 prevLedgerID_ = lgrId;
1065 result_->disputes.clear();
1066 result_->compares.clear();
1069 currPeerPositions_.clear();
1070 rawCloseTimes_.peers.clear();
1074 playbackProposals();
1077 if (previousLedger_.id() == prevLedgerID_)
1079 CLOG(clog) <<
"previousLedger_.id() == prevLeverID_ " << prevLedgerID_
1085 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1087 JLOG(j_.
info()) <<
"Have the consensus ledger " << prevLedgerID_;
1088 CLOG(clog) <<
"Have the consensus ledger " << prevLedgerID_ <<
". ";
1094 CLOG(clog) <<
"Still on wrong ledger. ";
1099template <
class Adaptor>
1103 CLOG(clog) <<
"checkLedger. ";
1106 adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1107 CLOG(clog) <<
"network ledgerid " << netLgr <<
", " <<
"previous ledger "
1108 << prevLedgerID_ <<
". ";
1110 if (netLgr != prevLedgerID_)
1113 ss <<
"View of consensus changed during " <<
to_string(phase_)
1114 <<
" mode=" <<
to_string(mode_.get()) <<
", " << prevLedgerID_
1115 <<
" to " << netLgr <<
", "
1118 CLOG(clog) << ss.
str();
1119 CLOG(clog) <<
"State on consensus change "
1121 handleWrongLedger(netLgr, clog);
1123 else if (previousLedger_.id() != prevLedgerID_)
1125 CLOG(clog) <<
"previousLedger_.id() != prevLedgerID_: "
1126 << previousLedger_.id() <<
',' <<
to_string(prevLedgerID_)
1128 handleWrongLedger(netLgr, clog);
1132template <
class Adaptor>
1136 for (
auto const& it : recentPeerPositions_)
1138 for (
auto const& pos : it.second)
1140 if (pos.proposal().prevLedger() == prevLedgerID_)
1142 if (peerProposalInternal(now_, pos))
1143 adaptor_.share(pos);
1149template <
class Adaptor>
1153 CLOG(clog) <<
"phaseOpen. ";
1157 bool anyTransactions = adaptor_.hasOpenTransactions();
1158 auto proposersClosed = currPeerPositions_.size();
1159 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1161 openTime_.tick(clock_.now());
1166 auto const mode = mode_.get();
1167 bool const closeAgree = previousLedger_.closeAgree();
1168 auto const prevCloseTime = previousLedger_.closeTime();
1169 auto const prevParentCloseTimePlus1 =
1170 previousLedger_.parentCloseTime() + 1s;
1171 bool const previousCloseCorrect =
1173 (prevCloseTime != prevParentCloseTimePlus1);
1175 auto const lastCloseTime = previousCloseCorrect
1179 if (now_ >= lastCloseTime)
1180 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1182 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1183 CLOG(
clog) <<
"calculating how long since last ledger's close time "
1185 <<
to_string(mode) <<
", previous closeAgree: " << closeAgree
1186 <<
", previous close time: " <<
to_string(prevCloseTime)
1187 <<
", previous parent close time + 1s: "
1189 <<
", previous close time seen internally: "
1191 <<
", last close time: " <<
to_string(lastCloseTime)
1192 <<
", since close: " << sinceClose.
count() <<
". ";
1195 auto const idleInterval = std::max<milliseconds>(
1196 adaptor_.parms().ledgerIDLE_INTERVAL,
1197 2 * previousLedger_.closeTimeResolution());
1198 CLOG(
clog) <<
"idle interval set to " << idleInterval.
count()
1199 <<
"ms based on " <<
"ledgerIDLE_INTERVAL: "
1200 << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1201 <<
", previous ledger close time resolution: "
1202 << previousLedger_.closeTimeResolution().count() <<
"ms. ";
1218 CLOG(
clog) <<
"closing ledger. ";
1223template <
class Adaptor>
1228 CLOG(
clog) <<
"shouldPause? ";
1229 auto const& parms = adaptor_.parms();
1231 previousLedger_.seq() -
1232 std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1233 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1234 std::size_t const totalValidators = trustedKeys.size();
1236 adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1240 vars <<
" consensuslog (working seq: " << previousLedger_.seq() <<
", "
1241 <<
"validated seq: " << adaptor_.getValidLedgerIndex() <<
", "
1242 <<
"am validator: " << adaptor_.validator() <<
", "
1243 <<
"have validated: " << adaptor_.haveValidated() <<
", "
1244 <<
"roundTime: " << result_->roundTime.
read().count() <<
", "
1245 <<
"max consensus time: " << parms.ledgerMAX_CONSENSUS.count() <<
", "
1246 <<
"validators: " << totalValidators <<
", "
1247 <<
"laggards: " << laggards <<
", " <<
"offline: " << offline <<
", "
1248 <<
"quorum: " << quorum <<
")";
1250 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1251 !adaptor_.haveValidated() ||
1252 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1254 j_.
debug() <<
"not pausing (early)" << vars.
str();
1255 CLOG(
clog) <<
"Not pausing (early). ";
1259 bool willPause =
false;
1295 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1304 if (laggards + offline > totalValidators - quorum)
1319 float const nonLaggards = totalValidators - (laggards + offline);
1320 float const quorumRatio =
1321 static_cast<float>(quorum) / totalValidators;
1322 float const allowedDissent = 1.0f - quorumRatio;
1323 float const phaseFactor =
static_cast<float>(phase) / maxPausePhase;
1325 if (nonLaggards / totalValidators <
1326 quorumRatio + (allowedDissent * phaseFactor))
1334 j_.
warn() <<
"pausing" << vars.
str();
1335 CLOG(
clog) <<
"pausing " << vars.
str() <<
". ";
1339 j_.
debug() <<
"not pausing" << vars.
str();
1340 CLOG(
clog) <<
"not pausing. ";
1345template <
class Adaptor>
1350 CLOG(
clog) <<
"phaseEstablish. ";
1352 XRPL_ASSERT(result_,
"ripple::Consensus::phaseEstablish : result is set");
1357 result_->roundTime.tick(clock_.now());
1358 result_->proposers = currPeerPositions_.size();
1360 convergePercent_ = result_->roundTime.read() * 100 /
1362 CLOG(
clog) <<
"convergePercent_ " << convergePercent_
1363 <<
" is based on round duration so far: "
1364 << result_->roundTime.read().count() <<
"ms, "
1365 <<
"previous round duration: " << prevRoundTime_.count()
1373 CLOG(
clog) <<
"ledgerMIN_CONSENSUS not reached: "
1378 updateOurPositions(
clog);
1381 if (shouldPause(
clog) || !haveConsensus(
clog))
1384 if (!haveCloseTimeConsensus_)
1386 JLOG(j_.
info()) <<
"We have TX consensus but not CT consensus";
1387 CLOG(
clog) <<
"We have TX consensus but not CT consensus. ";
1391 JLOG(j_.
info()) <<
"Converge cutoff (" << currPeerPositions_.size()
1392 <<
" participants)";
1393 CLOG(
clog) <<
"Converge cutoff (" << currPeerPositions_.size()
1394 <<
" participants). Transitioned to ConsensusPhase::accepted. ";
1395 adaptor_.updateOperatingMode(currPeerPositions_.size());
1396 prevProposers_ = currPeerPositions_.size();
1397 prevRoundTime_ = result_->roundTime.read();
1399 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::accepted";
1407 adaptor_.validating());
1410template <
class Adaptor>
1415 XRPL_ASSERT(!result_,
"ripple::Consensus::closeLedger : result is not set");
1418 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::establish";
1419 rawCloseTimes_.self = now_;
1421 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1422 result_->roundTime.reset(clock_.now());
1425 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1426 adaptor_.share(result_->txns);
1428 const auto mode = mode_.get();
1430 <<
"closeLedger transitioned to ConsensusPhase::establish, mode: "
1432 <<
", number of peer positions: " << currPeerPositions_.
size() <<
". ";
1434 adaptor_.propose(result_->position);
1437 for (
auto const& pit : currPeerPositions_)
1439 auto const& pos = pit.second.proposal().position();
1440 auto const it = acquired_.find(pos);
1441 if (it != acquired_.end())
1442 createDisputes(it->second,
clog);
1461 int result = ((participants * percent) + (percent / 2)) / 100;
1463 return (result == 0) ? 1 : result;
1466template <
class Adaptor>
1473 result_,
"ripple::Consensus::updateOurPositions : result is set");
1479 CLOG(
clog) <<
"updateOurPositions. peerCutoff " <<
to_string(peerCutoff)
1480 <<
", ourCutoff " <<
to_string(ourCutoff) <<
". ";
1485 auto it = currPeerPositions_.
begin();
1486 while (it != currPeerPositions_.end())
1488 Proposal_t const& peerProp = it->second.proposal();
1489 if (peerProp.
isStale(peerCutoff))
1493 JLOG(j_.
warn()) <<
"Removing stale proposal from " << peerID;
1494 for (
auto& dt : result_->disputes)
1495 dt.second.unVote(peerID);
1496 it = currPeerPositions_.erase(it);
1501 ++closeTimeVotes[asCloseTime(peerProp.
closeTime())];
1513 for (
auto& [txId, dispute] : result_->disputes)
1517 if (dispute.updateVote(
1523 mutableSet.
emplace(result_->txns);
1525 if (dispute.getOurVote())
1528 mutableSet->insert(dispute.tx());
1533 mutableSet->erase(txId);
1539 ourNewSet.
emplace(std::move(*mutableSet));
1543 haveCloseTimeConsensus_ =
false;
1545 if (currPeerPositions_.empty())
1548 haveCloseTimeConsensus_ =
true;
1549 consensusCloseTime = asCloseTime(result_->position.closeTime());
1563 CLOG(
clog) <<
"neededWeight " << neededWeight <<
". ";
1565 int participants = currPeerPositions_.size();
1568 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1576 int const threshConsensus =
1580 ss <<
"Proposers:" << currPeerPositions_.size()
1581 <<
" nw:" << neededWeight <<
" thrV:" << threshVote
1582 <<
" thrC:" << threshConsensus;
1586 for (
auto const& [t, v] : closeTimeVotes)
1590 <<
static_cast<std::uint32_t>(previousLedger_.seq()) + 1 <<
": "
1591 << t.time_since_epoch().count() <<
" has " << v <<
", "
1592 << threshVote <<
" required";
1594 if (v >= threshVote)
1597 consensusCloseTime = t;
1600 if (threshVote >= threshConsensus)
1601 haveCloseTimeConsensus_ =
true;
1605 if (!haveCloseTimeConsensus_)
1608 <<
"No CT consensus:" <<
" Proposers:"
1609 << currPeerPositions_.size()
1611 <<
" Thresh:" << threshConsensus
1613 CLOG(
clog) <<
"No close time consensus. ";
1618 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1619 result_->position.isStale(ourCutoff)))
1622 ourNewSet.
emplace(result_->txns);
1627 auto newID = ourNewSet->id();
1629 result_->txns = std::move(*ourNewSet);
1632 ss <<
"Position change: CTime "
1637 result_->position.changePosition(newID, consensusCloseTime, now_);
1641 if (acquired_.emplace(newID, result_->txns).second)
1643 if (!result_->position.isBowOut())
1644 adaptor_.share(result_->txns);
1646 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1650 updateDisputes(nodeId, result_->txns);
1655 if (!result_->position.isBowOut() &&
1657 adaptor_.propose(result_->position);
1661template <
class Adaptor>
1667 XRPL_ASSERT(result_,
"ripple::Consensus::haveConsensus : has result");
1670 int agree = 0, disagree = 0;
1672 auto ourPosition = result_->position.position();
1675 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1677 Proposal_t const& peerProp = peerPos.proposal();
1678 if (peerProp.
position() == ourPosition)
1684 JLOG(j_.
debug()) << nodeId <<
" has " << peerProp.
position();
1688 auto currentFinished =
1689 adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1691 JLOG(j_.
debug()) <<
"Checking for TX consensus: agree=" << agree
1692 <<
", disagree=" << disagree;
1701 result_->roundTime.read(),
1709 CLOG(
clog) <<
"No consensus. ";
1717 JLOG(j_.
error()) <<
"Unable to reach consensus";
1719 CLOG(
clog) <<
"Unable to reach consensus "
1723 CLOG(
clog) <<
"Consensus has been reached. ";
1727template <
class Adaptor>
1734 if (result_ && !result_->position.isBowOut())
1736 result_->position.bowOut(now_);
1737 adaptor_.propose(result_->position);
1741 JLOG(j_.
info()) <<
"Bowing out of consensus";
1742 CLOG(
clog) <<
"Bowing out of consensus. ";
1746template <
class Adaptor>
1753 XRPL_ASSERT(result_,
"ripple::Consensus::createDisputes : result is set");
1756 auto const emplaced = result_->compares.emplace(o.id()).second;
1757 CLOG(
clog) <<
"createDisputes: new set? " << !emplaced <<
". ";
1762 if (result_->txns.id() == o.id())
1764 CLOG(
clog) <<
"both sets are identical. ";
1768 CLOG(
clog) <<
"comparing existing with new set: " << result_->txns.id()
1769 <<
',' << o.id() <<
". ";
1770 JLOG(j_.
debug()) <<
"createDisputes " << result_->txns.id() <<
" to "
1773 auto differences = result_->txns.compare(o);
1777 for (
auto const& [txId, inThisSet] : differences)
1782 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1783 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1784 "ripple::Consensus::createDisputes : has disputed transactions");
1786 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1787 auto txID = tx.id();
1789 if (result_->disputes.find(txID) != result_->disputes.end())
1792 JLOG(j_.
debug()) <<
"Transaction " << txID <<
" is disputed";
1796 result_->txns.exists(txID),
1797 std::max(prevProposers_, currPeerPositions_.size()),
1801 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1803 Proposal_t const& peerProp = peerPos.proposal();
1804 auto const cit = acquired_.find(peerProp.
position());
1805 if (cit != acquired_.end())
1806 dtx.setVote(nodeId, cit->second.exists(txID));
1808 adaptor_.share(dtx.tx());
1810 result_->disputes.emplace(txID, std::move(dtx));
1812 JLOG(j_.
debug()) << dc <<
" differences found";
1813 CLOG(
clog) <<
"disputes: " << dc <<
". ";
1816template <
class Adaptor>
1821 XRPL_ASSERT(result_,
"ripple::Consensus::updateDisputes : result is set");
1825 if (result_->compares.find(other.id()) == result_->compares.end())
1826 createDisputes(other);
1828 for (
auto& it : result_->disputes)
1830 auto& d = it.second;
1831 d.setVote(node, other.exists(d.tx().id()));
1835template <
class Adaptor>
Decorator for streaming out compact json.
Value & append(const Value &value)
Append value to array at the end.
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
NodeID_t const & nodeID() const
Identifying which peer took this position.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
Position_t const & position() const
Get the proposed position.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Measures the duration of phases of consensus.
void set(ConsensusMode mode, Adaptor &a)
MonitoredMode(ConsensusMode m)
ConsensusMode get() const
Generic implementation of consensus algorithm.
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
typename Adaptor::PeerPosition_t PeerPosition_t
NetClock::time_point prevCloseTime_
clock_type const & clock_
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
typename Adaptor::TxSet_t TxSet_t
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
typename TxSet_t::Tx Tx_t
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
Consensus(Consensus &&) noexcept=default
NetClock::time_point now_
std::size_t prevProposers_
NetClock::time_point asCloseTime(NetClock::time_point raw) const
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
typename Adaptor::Ledger_t Ledger_t
ConsensusPhase phase() const
hash_map< typename TxSet_t::ID, const TxSet_t > acquired_
typename Adaptor::NodeID_t NodeID_t
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
NetClock::duration closeResolution_
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
ConsensusCloseTimes rawCloseTimes_
std::chrono::milliseconds prevRoundTime_
std::optional< Result > result_
hash_set< NodeID_t > deadNodes_
Ledger_t::ID prevLedgerID_
bool haveCloseTimeConsensus_
A transaction discovered to be in dispute during consensus.
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
ConsensusState
Whether we have or don't have a consensus.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::string to_string(base_uint< Bits, Tag > const &a)
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t avINIT_CONSENSUS_PCT
Percentage of nodes on our UNL that must vote yes.
std::chrono::milliseconds ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
std::chrono::milliseconds avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
std::size_t avLATE_CONSENSUS_PCT
Percentage of nodes that most vote yes after advancing.
std::size_t avSTUCK_CONSENSUS_PCT
Percentage of nodes that must vote yes after we are stuck.
std::chrono::seconds proposeFRESHNESS
How long we consider a proposal fresh.
std::size_t avLATE_CONSENSUS_TIME
Percentage of previous round duration before we advance.
std::chrono::seconds proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::size_t avMID_CONSENSUS_PCT
Percentage of nodes that most vote yes after advancing.
std::size_t avSTUCK_CONSENSUS_TIME
Percentage of previous round duration before we are stuck.
std::size_t avMID_CONSENSUS_TIME
Percentage of previous round duration before we advance.
Encapsulates the result of consensus.
T time_since_epoch(T... args)