rippled
Loading...
Searching...
No Matches
Consensus.h
1#ifndef XRPL_CONSENSUS_CONSENSUS_H_INCLUDED
2#define XRPL_CONSENSUS_CONSENSUS_H_INCLUDED
3
4#include <xrpld/consensus/ConsensusParms.h>
5#include <xrpld/consensus/ConsensusProposal.h>
6#include <xrpld/consensus/ConsensusTypes.h>
7#include <xrpld/consensus/DisputedTx.h>
8#include <xrpld/consensus/LedgerTiming.h>
9
10#include <xrpl/basics/Log.h>
11#include <xrpl/basics/chrono.h>
12#include <xrpl/beast/utility/Journal.h>
13#include <xrpl/json/json_writer.h>
14
15#include <algorithm>
16#include <chrono>
17#include <deque>
18#include <optional>
19#include <sstream>
20
21namespace ripple {
22
42bool
44 bool anyTransactions,
45 std::size_t prevProposers,
46 std::size_t proposersClosed,
47 std::size_t proposersValidated,
48 std::chrono::milliseconds prevRoundTime,
49 std::chrono::milliseconds timeSincePrevClose,
51 std::chrono::milliseconds idleInterval,
52 ConsensusParms const& parms,
54 std::unique_ptr<std::stringstream> const& clog = {});
55
77 std::size_t prevProposers,
78 std::size_t currentProposers,
79 std::size_t currentAgree,
80 std::size_t currentFinished,
81 std::chrono::milliseconds previousAgreeTime,
82 std::chrono::milliseconds currentAgreeTime,
83 bool stalled,
84 ConsensusParms const& parms,
85 bool proposing,
87 std::unique_ptr<std::stringstream> const& clog = {});
88
277template <class Adaptor>
279{
280 using Ledger_t = typename Adaptor::Ledger_t;
281 using TxSet_t = typename Adaptor::TxSet_t;
282 using NodeID_t = typename Adaptor::NodeID_t;
283 using Tx_t = typename TxSet_t::Tx;
284 using PeerPosition_t = typename Adaptor::PeerPosition_t;
286 NodeID_t,
287 typename Ledger_t::ID,
288 typename TxSet_t::ID>;
289
291
292 // Helper class to ensure adaptor is notified whenever the ConsensusMode
293 // changes
295 {
297
298 public:
300 {
301 }
303 get() const
304 {
305 return mode_;
306 }
307
308 void
309 set(ConsensusMode mode, Adaptor& a)
310 {
311 a.onModeChange(mode_, mode);
312 mode_ = mode;
313 }
314 };
315
316public:
319
320 Consensus(Consensus&&) noexcept = default;
321
328 Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
329
345 void
347 NetClock::time_point const& now,
348 typename Ledger_t::ID const& prevLedgerID,
349 Ledger_t prevLedger,
350 hash_set<NodeID_t> const& nowUntrusted,
351 bool proposing,
352 std::unique_ptr<std::stringstream> const& clog = {});
353
360 bool
362 NetClock::time_point const& now,
363 PeerPosition_t const& newProposal);
364
370 void
372 NetClock::time_point const& now,
373 std::unique_ptr<std::stringstream> const& clog = {});
374
380 void
381 gotTxSet(NetClock::time_point const& now, TxSet_t const& txSet);
382
399 void
401 NetClock::time_point const& now,
403
411 typename Ledger_t::ID
413 {
414 return prevLedgerID_;
415 }
416
418 phase() const
419 {
420 return phase_;
421 }
422
431 getJson(bool full) const;
432
433private:
434 void
436 NetClock::time_point const& now,
437 typename Ledger_t::ID const& prevLedgerID,
438 Ledger_t const& prevLedger,
439 ConsensusMode mode,
441
442 // Change our view of the previous ledger
443 void
445 typename Ledger_t::ID const& lgrId,
447
453 void
455
459 void
461
464 bool
466 NetClock::time_point const& now,
467 PeerPosition_t const& newProposal);
468
475 void
477
486 void
488
511 bool
513
514 // Close the open ledger and establish initial position.
515 void
517
518 // Adjust our positions to try to agree with other validators.
519 void
521
522 bool
524
525 // Create disputes between our position and the provided one.
526 void
528 TxSet_t const& o,
529 std::unique_ptr<std::stringstream> const& clog = {});
530
531 // Update our disputes given that this node has adopted a new position.
532 // Will call createDisputes as needed.
533 void
534 updateDisputes(NodeID_t const& node, TxSet_t const& other);
535
536 // Revoke our outstanding proposal, if any, and cease proposing
537 // until this round ends.
538 void
540
541 // The rounded or effective close time estimate from a proposer
544
545private:
546 Adaptor& adaptor_;
547
550 bool firstRound_ = true;
552
554
555 // How long the consensus convergence has taken, expressed as
556 // a percentage of the time that we expected it to take.
558
559 // How long has this round been open
561
563
566
567 // Time it took for the last consensus round to converge
569
570 //-------------------------------------------------------------------------
571 // Network time measurements of consensus progress
572
573 // The current network adjusted time. This is the network time the
574 // ledger would close if it closed now
577
578 //-------------------------------------------------------------------------
579 // Non-peer (self) consensus data
580
581 // Last validated ledger ID provided to consensus
582 typename Ledger_t::ID prevLedgerID_;
583 // Last validated ledger seen by consensus
585
586 // Transaction Sets, indexed by hash of transaction tree
588
591
592 // The number of calls to phaseEstablish where none of our peers
593 // have changed any votes on disputed transactions.
595
596 // The total number of times we have called phaseEstablish
598
599 //-------------------------------------------------------------------------
600 // Peer related consensus data
601
602 // Peer proposed positions for the current round
604
605 // Recently received peer positions, available when transitioning between
606 // ledgers or rounds
608
609 // The number of proposers who participated in the last consensus round
611
612 // nodes that have bowed out of this consensus process
614
615 // Journal for debugging
617};
618
619template <class Adaptor>
621 clock_type const& clock,
622 Adaptor& adaptor,
623 beast::Journal journal)
624 : adaptor_(adaptor), clock_(clock), j_{journal}
625{
626 JLOG(j_.debug()) << "Creating consensus object";
627}
628
629template <class Adaptor>
630void
632 NetClock::time_point const& now,
633 typename Ledger_t::ID const& prevLedgerID,
634 Ledger_t prevLedger,
635 hash_set<NodeID_t> const& nowUntrusted,
636 bool proposing,
638{
639 if (firstRound_)
640 {
641 // take our initial view of closeTime_ from the seed ledger
642 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
643 prevCloseTime_ = prevLedger.closeTime();
644 firstRound_ = false;
645 }
646 else
647 {
648 prevCloseTime_ = rawCloseTimes_.self;
649 }
650
651 for (NodeID_t const& n : nowUntrusted)
652 recentPeerPositions_.erase(n);
653
654 ConsensusMode startMode =
656
657 // We were handed the wrong ledger
658 if (prevLedger.id() != prevLedgerID)
659 {
660 // try to acquire the correct one
661 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID))
662 {
663 prevLedger = *newLedger;
664 }
665 else // Unable to acquire the correct ledger
666 {
667 startMode = ConsensusMode::wrongLedger;
668 JLOG(j_.info())
669 << "Entering consensus with: " << previousLedger_.id();
670 JLOG(j_.info()) << "Correct LCL is: " << prevLedgerID;
671 }
672 }
673
674 startRoundInternal(now, prevLedgerID, prevLedger, startMode, clog);
675}
676template <class Adaptor>
677void
679 NetClock::time_point const& now,
680 typename Ledger_t::ID const& prevLedgerID,
681 Ledger_t const& prevLedger,
682 ConsensusMode mode,
684{
685 phase_ = ConsensusPhase::open;
686 JLOG(j_.debug()) << "transitioned to ConsensusPhase::open ";
687 CLOG(clog) << "startRoundInternal transitioned to ConsensusPhase::open, "
688 "previous ledgerID: "
689 << prevLedgerID << ", seq: " << prevLedger.seq() << ". ";
690 mode_.set(mode, adaptor_);
691 now_ = now;
692 prevLedgerID_ = prevLedgerID;
693 previousLedger_ = prevLedger;
694 result_.reset();
695 convergePercent_ = 0;
696 closeTimeAvalancheState_ = ConsensusParms::init;
697 haveCloseTimeConsensus_ = false;
698 openTime_.reset(clock_.now());
699 currPeerPositions_.clear();
700 acquired_.clear();
701 rawCloseTimes_.peers.clear();
702 rawCloseTimes_.self = {};
703 deadNodes_.clear();
704
705 closeResolution_ = getNextLedgerTimeResolution(
706 previousLedger_.closeTimeResolution(),
707 previousLedger_.closeAgree(),
708 previousLedger_.seq() + typename Ledger_t::Seq{1});
709
710 playbackProposals();
711 CLOG(clog) << "number of peer proposals,previous proposers: "
712 << currPeerPositions_.size() << ',' << prevProposers_ << ". ";
713 if (currPeerPositions_.size() > (prevProposers_ / 2))
714 {
715 // We may be falling behind, don't wait for the timer
716 // consider closing the ledger immediately
717 CLOG(clog) << "consider closing the ledger immediately. ";
718 timerEntry(now_, clog);
719 }
720}
721
722template <class Adaptor>
723bool
725 NetClock::time_point const& now,
726 PeerPosition_t const& newPeerPos)
727{
728 JLOG(j_.debug()) << "PROPOSAL " << newPeerPos.render();
729 auto const& peerID = newPeerPos.proposal().nodeID();
730
731 // Always need to store recent positions
732 {
733 auto& props = recentPeerPositions_[peerID];
734
735 if (props.size() >= 10)
736 props.pop_front();
737
738 props.push_back(newPeerPos);
739 }
740 return peerProposalInternal(now, newPeerPos);
741}
742
743template <class Adaptor>
744bool
746 NetClock::time_point const& now,
747 PeerPosition_t const& newPeerPos)
748{
749 // Nothing to do for now if we are currently working on a ledger
750 if (phase_ == ConsensusPhase::accepted)
751 return false;
752
753 now_ = now;
754
755 auto const& newPeerProp = newPeerPos.proposal();
756
757 if (newPeerProp.prevLedger() != prevLedgerID_)
758 {
759 JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
760 << " but we are on " << prevLedgerID_;
761 return false;
762 }
763
764 auto const& peerID = newPeerProp.nodeID();
765
766 if (deadNodes_.find(peerID) != deadNodes_.end())
767 {
768 JLOG(j_.info()) << "Position from dead node: " << peerID;
769 return false;
770 }
771
772 {
773 // update current position
774 auto peerPosIt = currPeerPositions_.find(peerID);
775
776 if (peerPosIt != currPeerPositions_.end())
777 {
778 if (newPeerProp.proposeSeq() <=
779 peerPosIt->second.proposal().proposeSeq())
780 {
781 return false;
782 }
783 }
784
785 if (newPeerProp.isBowOut())
786 {
787 JLOG(j_.info()) << "Peer " << peerID << " bows out";
788 if (result_)
789 {
790 for (auto& it : result_->disputes)
791 it.second.unVote(peerID);
792 }
793 if (peerPosIt != currPeerPositions_.end())
794 currPeerPositions_.erase(peerID);
795 deadNodes_.insert(peerID);
796
797 return true;
798 }
799
800 if (peerPosIt != currPeerPositions_.end())
801 peerPosIt->second = newPeerPos;
802 else
803 currPeerPositions_.emplace(peerID, newPeerPos);
804 }
805
806 if (newPeerProp.isInitial())
807 {
808 // Record the close time estimate
809 JLOG(j_.trace()) << "Peer reports close time as "
810 << newPeerProp.closeTime().time_since_epoch().count();
811 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
812 }
813
814 JLOG(j_.trace()) << "Processing peer proposal " << newPeerProp.proposeSeq()
815 << "/" << newPeerProp.position();
816
817 {
818 auto const ait = acquired_.find(newPeerProp.position());
819 if (ait == acquired_.end())
820 {
821 // acquireTxSet will return the set if it is available, or
822 // spawn a request for it and return nullopt/nullptr. It will call
823 // gotTxSet once it arrives
824 if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
825 gotTxSet(now_, *set);
826 else
827 JLOG(j_.debug()) << "Don't have tx set for peer";
828 }
829 else if (result_)
830 {
831 updateDisputes(newPeerProp.nodeID(), ait->second);
832 }
833 }
834
835 return true;
836}
837
838template <class Adaptor>
839void
841 NetClock::time_point const& now,
843{
844 CLOG(clog) << "Consensus<Adaptor>::timerEntry. ";
845 // Nothing to do if we are currently working on a ledger
846 if (phase_ == ConsensusPhase::accepted)
847 {
848 CLOG(clog) << "Nothing to do during accepted phase. ";
849 return;
850 }
851
852 now_ = now;
853 CLOG(clog) << "Set network adjusted time to " << to_string(now) << ". ";
854
855 // Check we are on the proper ledger (this may change phase_)
856 auto const phaseOrig = phase_;
857 CLOG(clog) << "Phase " << to_string(phaseOrig) << ". ";
858 checkLedger(clog);
859 if (phaseOrig != phase_)
860 {
861 CLOG(clog) << "Changed phase to << " << to_string(phase_) << ". ";
862 }
863
864 if (phase_ == ConsensusPhase::open)
865 phaseOpen(clog);
866 else if (phase_ == ConsensusPhase::establish)
867 phaseEstablish(clog);
868 CLOG(clog) << "timerEntry finishing in phase " << to_string(phase_) << ". ";
869}
870
871template <class Adaptor>
872void
874 NetClock::time_point const& now,
875 TxSet_t const& txSet)
876{
877 // Nothing to do if we've finished work on a ledger
878 if (phase_ == ConsensusPhase::accepted)
879 return;
880
881 now_ = now;
882
883 auto id = txSet.id();
884
885 // If we've already processed this transaction set since requesting
886 // it from the network, there is nothing to do now
887 if (!acquired_.emplace(id, txSet).second)
888 return;
889
890 if (!result_)
891 {
892 JLOG(j_.debug()) << "Not creating disputes: no position yet.";
893 }
894 else
895 {
896 // Our position is added to acquired_ as soon as we create it,
897 // so this txSet must differ
898 XRPL_ASSERT(
899 id != result_->position.position(),
900 "ripple::Consensus::gotTxSet : updated transaction set");
901 bool any = false;
902 for (auto const& [nodeId, peerPos] : currPeerPositions_)
903 {
904 if (peerPos.proposal().position() == id)
905 {
906 updateDisputes(nodeId, txSet);
907 any = true;
908 }
909 }
910
911 if (!any)
912 {
913 JLOG(j_.warn())
914 << "By the time we got " << id << " no peers were proposing it";
915 }
916 }
917}
918
919template <class Adaptor>
920void
922 NetClock::time_point const& now,
924{
925 using namespace std::chrono_literals;
926 JLOG(j_.info()) << "Simulating consensus";
927 now_ = now;
928 closeLedger({});
929 result_->roundTime.tick(consensusDelay.value_or(100ms));
930 result_->proposers = prevProposers_ = currPeerPositions_.size();
931 prevRoundTime_ = result_->roundTime.read();
933 adaptor_.onForceAccept(
934 *result_,
935 previousLedger_,
936 closeResolution_,
937 rawCloseTimes_,
938 mode_.get(),
939 getJson(true));
940 JLOG(j_.info()) << "Simulation complete";
941}
942
943template <class Adaptor>
946{
947 using std::to_string;
948 using Int = Json::Value::Int;
949
951
952 ret["proposing"] = (mode_.get() == ConsensusMode::proposing);
953 ret["proposers"] = static_cast<int>(currPeerPositions_.size());
954
955 if (mode_.get() != ConsensusMode::wrongLedger)
956 {
957 ret["synched"] = true;
958 ret["ledger_seq"] =
959 static_cast<std::uint32_t>(previousLedger_.seq()) + 1;
960 ret["close_granularity"] = static_cast<Int>(closeResolution_.count());
961 }
962 else
963 ret["synched"] = false;
964
965 ret["phase"] = to_string(phase_);
966
967 if (result_ && !result_->disputes.empty() && !full)
968 ret["disputes"] = static_cast<Int>(result_->disputes.size());
969
970 if (result_)
971 ret["our_position"] = result_->position.getJson();
972
973 if (full)
974 {
975 if (result_)
976 ret["current_ms"] =
977 static_cast<Int>(result_->roundTime.read().count());
978 ret["converge_percent"] = convergePercent_;
979 ret["close_resolution"] = static_cast<Int>(closeResolution_.count());
980 ret["have_time_consensus"] = haveCloseTimeConsensus_;
981 ret["previous_proposers"] = static_cast<Int>(prevProposers_);
982 ret["previous_mseconds"] = static_cast<Int>(prevRoundTime_.count());
983
984 if (!currPeerPositions_.empty())
985 {
987
988 for (auto const& [nodeId, peerPos] : currPeerPositions_)
989 {
990 ppj[to_string(nodeId)] = peerPos.getJson();
991 }
992 ret["peer_positions"] = std::move(ppj);
993 }
994
995 if (!acquired_.empty())
996 {
998 for (auto const& at : acquired_)
999 {
1000 acq.append(to_string(at.first));
1001 }
1002 ret["acquired"] = std::move(acq);
1003 }
1004
1005 if (result_ && !result_->disputes.empty())
1006 {
1008 for (auto const& [txId, dispute] : result_->disputes)
1009 {
1010 dsj[to_string(txId)] = dispute.getJson();
1011 }
1012 ret["disputes"] = std::move(dsj);
1013 }
1014
1015 if (!rawCloseTimes_.peers.empty())
1016 {
1018 for (auto const& ct : rawCloseTimes_.peers)
1019 {
1020 ctj[std::to_string(ct.first.time_since_epoch().count())] =
1021 ct.second;
1022 }
1023 ret["close_times"] = std::move(ctj);
1024 }
1025
1026 if (!deadNodes_.empty())
1027 {
1029 for (auto const& dn : deadNodes_)
1030 {
1031 dnj.append(to_string(dn));
1032 }
1033 ret["dead_nodes"] = std::move(dnj);
1034 }
1035 }
1036
1037 return ret;
1038}
1039
1040// Handle a change in the prior ledger during a consensus round
1041template <class Adaptor>
1042void
1044 typename Ledger_t::ID const& lgrId,
1046{
1047 CLOG(clog) << "handleWrongLedger. ";
1048 XRPL_ASSERT(
1049 lgrId != prevLedgerID_ || previousLedger_.id() != lgrId,
1050 "ripple::Consensus::handleWrongLedger : have wrong ledger");
1051
1052 // Stop proposing because we are out of sync
1053 leaveConsensus(clog);
1054
1055 // First time switching to this ledger
1056 if (prevLedgerID_ != lgrId)
1057 {
1058 prevLedgerID_ = lgrId;
1059
1060 // Clear out state
1061 if (result_)
1062 {
1063 result_->disputes.clear();
1064 result_->compares.clear();
1065 }
1066
1067 currPeerPositions_.clear();
1068 rawCloseTimes_.peers.clear();
1069 deadNodes_.clear();
1070
1071 // Get back in sync, this will also recreate disputes
1072 playbackProposals();
1073 }
1074
1075 if (previousLedger_.id() == prevLedgerID_)
1076 {
1077 CLOG(clog) << "previousLedger_.id() == prevLeverID_ " << prevLedgerID_
1078 << ". ";
1079 return;
1080 }
1081
1082 // we need to switch the ledger we're working from
1083 if (auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1084 {
1085 JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
1086 CLOG(clog) << "Have the consensus ledger " << prevLedgerID_ << ". ";
1087 startRoundInternal(
1088 now_, lgrId, *newLedger, ConsensusMode::switchedLedger, clog);
1089 }
1090 else
1091 {
1092 CLOG(clog) << "Still on wrong ledger. ";
1093 mode_.set(ConsensusMode::wrongLedger, adaptor_);
1094 }
1095}
1096
1097template <class Adaptor>
1098void
1100{
1101 CLOG(clog) << "checkLedger. ";
1102
1103 auto netLgr =
1104 adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
1105 CLOG(clog) << "network ledgerid " << netLgr << ", "
1106 << "previous ledger " << prevLedgerID_ << ". ";
1107
1108 if (netLgr != prevLedgerID_)
1109 {
1111 ss << "View of consensus changed during " << to_string(phase_)
1112 << " mode=" << to_string(mode_.get()) << ", " << prevLedgerID_
1113 << " to " << netLgr << ", "
1114 << Json::Compact{previousLedger_.getJson()} << ". ";
1115 JLOG(j_.warn()) << ss.str();
1116 CLOG(clog) << ss.str();
1117 CLOG(clog) << "State on consensus change "
1118 << Json::Compact{getJson(true)} << ". ";
1119 handleWrongLedger(netLgr, clog);
1120 }
1121 else if (previousLedger_.id() != prevLedgerID_)
1122 {
1123 CLOG(clog) << "previousLedger_.id() != prevLedgerID_: "
1124 << previousLedger_.id() << ',' << to_string(prevLedgerID_)
1125 << ". ";
1126 handleWrongLedger(netLgr, clog);
1127 }
1128}
1129
1130template <class Adaptor>
1131void
1133{
1134 for (auto const& it : recentPeerPositions_)
1135 {
1136 for (auto const& pos : it.second)
1137 {
1138 if (pos.proposal().prevLedger() == prevLedgerID_)
1139 {
1140 if (peerProposalInternal(now_, pos))
1141 adaptor_.share(pos);
1142 }
1143 }
1144 }
1145}
1146
1147template <class Adaptor>
1148void
1150{
1151 CLOG(clog) << "phaseOpen. ";
1152 using namespace std::chrono;
1153
1154 // it is shortly before ledger close time
1155 bool anyTransactions = adaptor_.hasOpenTransactions();
1156 auto proposersClosed = currPeerPositions_.size();
1157 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1158
1159 openTime_.tick(clock_.now());
1160
1161 // This computes how long since last ledger's close time
1162 milliseconds sinceClose;
1163 {
1164 auto const mode = mode_.get();
1165 bool const closeAgree = previousLedger_.closeAgree();
1166 auto const prevCloseTime = previousLedger_.closeTime();
1167 auto const prevParentCloseTimePlus1 =
1168 previousLedger_.parentCloseTime() + 1s;
1169 bool const previousCloseCorrect =
1170 (mode != ConsensusMode::wrongLedger) && closeAgree &&
1171 (prevCloseTime != prevParentCloseTimePlus1);
1172
1173 auto const lastCloseTime = previousCloseCorrect
1174 ? prevCloseTime // use consensus timing
1175 : prevCloseTime_; // use the time we saw internally
1176
1177 if (now_ >= lastCloseTime)
1178 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1179 else
1180 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
1181 CLOG(clog) << "calculating how long since last ledger's close time "
1182 "based on mode : "
1183 << to_string(mode) << ", previous closeAgree: " << closeAgree
1184 << ", previous close time: " << to_string(prevCloseTime)
1185 << ", previous parent close time + 1s: "
1186 << to_string(prevParentCloseTimePlus1)
1187 << ", previous close time seen internally: "
1188 << to_string(prevCloseTime_)
1189 << ", last close time: " << to_string(lastCloseTime)
1190 << ", since close: " << sinceClose.count() << ". ";
1191 }
1192
1193 auto const idleInterval = std::max<milliseconds>(
1194 adaptor_.parms().ledgerIDLE_INTERVAL,
1195 2 * previousLedger_.closeTimeResolution());
1196 CLOG(clog) << "idle interval set to " << idleInterval.count()
1197 << "ms based on "
1198 << "ledgerIDLE_INTERVAL: "
1199 << adaptor_.parms().ledgerIDLE_INTERVAL.count()
1200 << ", previous ledger close time resolution: "
1201 << previousLedger_.closeTimeResolution().count() << "ms. ";
1202
1203 // Decide if we should close the ledger
1205 anyTransactions,
1206 prevProposers_,
1207 proposersClosed,
1208 proposersValidated,
1209 prevRoundTime_,
1210 sinceClose,
1211 openTime_.read(),
1212 idleInterval,
1213 adaptor_.parms(),
1214 j_,
1215 clog))
1216 {
1217 CLOG(clog) << "closing ledger. ";
1218 closeLedger(clog);
1219 }
1220}
1221
1222template <class Adaptor>
1223bool
1226{
1227 CLOG(clog) << "shouldPause? ";
1228 auto const& parms = adaptor_.parms();
1229 std::uint32_t const ahead(
1230 previousLedger_.seq() -
1231 std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1232 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1233 std::size_t const totalValidators = trustedKeys.size();
1234 std::size_t laggards =
1235 adaptor_.laggards(previousLedger_.seq(), trustedKeys);
1236 std::size_t const offline = trustedKeys.size();
1237
1238 std::stringstream vars;
1239 vars << " consensuslog (working seq: " << previousLedger_.seq() << ", "
1240 << "validated seq: " << adaptor_.getValidLedgerIndex() << ", "
1241 << "am validator: " << adaptor_.validator() << ", "
1242 << "have validated: " << adaptor_.haveValidated() << ", "
1243 << "roundTime: " << result_->roundTime.read().count() << ", "
1244 << "max consensus time: " << parms.ledgerMAX_CONSENSUS.count() << ", "
1245 << "validators: " << totalValidators << ", "
1246 << "laggards: " << laggards << ", "
1247 << "offline: " << offline << ", "
1248 << "quorum: " << quorum << ")";
1249
1250 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1251 !adaptor_.haveValidated() ||
1252 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1253 {
1254 j_.debug() << "not pausing (early)" << vars.str();
1255 CLOG(clog) << "Not pausing (early). ";
1256 return false;
1257 }
1258
1259 bool willPause = false;
1260
1274 constexpr static std::size_t maxPausePhase = 4;
1275
1295 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
1296
1297 // validators that remain after the laggards() function are considered
1298 // offline, and should be considered as laggards for purposes of
1299 // evaluating whether the threshold for non-laggards has been reached.
1300 switch (phase)
1301 {
1302 case 0:
1303 // Laggards and offline shouldn't preclude consensus.
1304 if (laggards + offline > totalValidators - quorum)
1305 willPause = true;
1306 break;
1307 case maxPausePhase:
1308 // No tolerance.
1309 willPause = true;
1310 break;
1311 default:
1312 // Ensure that sufficient validators are known to be not lagging.
1313 // Their sufficiently most recent validation sequence was equal to
1314 // or greater than our own.
1315 //
1316 // The threshold is the amount required for quorum plus
1317 // the proportion of the remainder based on number of intermediate
1318 // phases between 0 and max.
1319 float const nonLaggards = totalValidators - (laggards + offline);
1320 float const quorumRatio =
1321 static_cast<float>(quorum) / totalValidators;
1322 float const allowedDissent = 1.0f - quorumRatio;
1323 float const phaseFactor = static_cast<float>(phase) / maxPausePhase;
1324
1325 if (nonLaggards / totalValidators <
1326 quorumRatio + (allowedDissent * phaseFactor))
1327 {
1328 willPause = true;
1329 }
1330 }
1331
1332 if (willPause)
1333 {
1334 j_.warn() << "pausing" << vars.str();
1335 CLOG(clog) << "pausing " << vars.str() << ". ";
1336 }
1337 else
1338 {
1339 j_.debug() << "not pausing" << vars.str();
1340 CLOG(clog) << "not pausing. ";
1341 }
1342 return willPause;
1343}
1344
1345template <class Adaptor>
1346void
1349{
1350 CLOG(clog) << "phaseEstablish. ";
1351 // can only establish consensus if we already took a stance
1352 XRPL_ASSERT(result_, "ripple::Consensus::phaseEstablish : result is set");
1353
1354 ++peerUnchangedCounter_;
1355 ++establishCounter_;
1356
1357 using namespace std::chrono;
1358 ConsensusParms const& parms = adaptor_.parms();
1359
1360 result_->roundTime.tick(clock_.now());
1361 result_->proposers = currPeerPositions_.size();
1362
1363 convergePercent_ = result_->roundTime.read() * 100 /
1364 std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
1365 CLOG(clog) << "convergePercent_ " << convergePercent_
1366 << " is based on round duration so far: "
1367 << result_->roundTime.read().count() << "ms, "
1368 << "previous round duration: " << prevRoundTime_.count()
1369 << "ms, "
1370 << "avMIN_CONSENSUS_TIME: " << parms.avMIN_CONSENSUS_TIME.count()
1371 << "ms. ";
1372
1373 // Give everyone a chance to take an initial position
1374 if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
1375 {
1376 CLOG(clog) << "ledgerMIN_CONSENSUS not reached: "
1377 << parms.ledgerMIN_CONSENSUS.count() << "ms. ";
1378 return;
1379 }
1380
1381 updateOurPositions(clog);
1382
1383 // Nothing to do if too many laggards or we don't have consensus.
1384 if (shouldPause(clog) || !haveConsensus(clog))
1385 return;
1386
1387 if (!haveCloseTimeConsensus_)
1388 {
1389 JLOG(j_.info()) << "We have TX consensus but not CT consensus";
1390 CLOG(clog) << "We have TX consensus but not CT consensus. ";
1391 return;
1392 }
1393
1394 JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size()
1395 << " participants)";
1396 CLOG(clog) << "Converge cutoff (" << currPeerPositions_.size()
1397 << " participants). Transitioned to ConsensusPhase::accepted. ";
1398 adaptor_.updateOperatingMode(currPeerPositions_.size());
1399 prevProposers_ = currPeerPositions_.size();
1400 prevRoundTime_ = result_->roundTime.read();
1401 phase_ = ConsensusPhase::accepted;
1402 JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
1403 adaptor_.onAccept(
1404 *result_,
1405 previousLedger_,
1406 closeResolution_,
1407 rawCloseTimes_,
1408 mode_.get(),
1409 getJson(true),
1410 adaptor_.validating());
1411}
1412
1413template <class Adaptor>
1414void
1416{
1417 // We should not be closing if we already have a position
1418 XRPL_ASSERT(!result_, "ripple::Consensus::closeLedger : result is not set");
1419
1421 JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
1422 rawCloseTimes_.self = now_;
1423 peerUnchangedCounter_ = 0;
1424 establishCounter_ = 0;
1425
1426 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1427 result_->roundTime.reset(clock_.now());
1428 // Share the newly created transaction set if we haven't already
1429 // received it from a peer
1430 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1431 adaptor_.share(result_->txns);
1432
1433 auto const mode = mode_.get();
1434 CLOG(clog)
1435 << "closeLedger transitioned to ConsensusPhase::establish, mode: "
1436 << to_string(mode)
1437 << ", number of peer positions: " << currPeerPositions_.size() << ". ";
1438 if (mode == ConsensusMode::proposing)
1439 adaptor_.propose(result_->position);
1440
1441 // Create disputes with any peer positions we have transactions for
1442 for (auto const& pit : currPeerPositions_)
1443 {
1444 auto const& pos = pit.second.proposal().position();
1445 auto const it = acquired_.find(pos);
1446 if (it != acquired_.end())
1447 createDisputes(it->second, clog);
1448 }
1449}
1450
1463inline int
1464participantsNeeded(int participants, int percent)
1465{
1466 int result = ((participants * percent) + (percent / 2)) / 100;
1467
1468 return (result == 0) ? 1 : result;
1469}
1470
1471template <class Adaptor>
1472void
1475{
1476 // We must have a position if we are updating it
1477 XRPL_ASSERT(
1478 result_, "ripple::Consensus::updateOurPositions : result is set");
1479 ConsensusParms const& parms = adaptor_.parms();
1480
1481 // Compute a cutoff time
1482 auto const peerCutoff = now_ - parms.proposeFRESHNESS;
1483 auto const ourCutoff = now_ - parms.proposeINTERVAL;
1484 CLOG(clog) << "updateOurPositions. peerCutoff " << to_string(peerCutoff)
1485 << ", ourCutoff " << to_string(ourCutoff) << ". ";
1486
1487 // Verify freshness of peer positions and compute close times
1489 {
1490 auto it = currPeerPositions_.begin();
1491 while (it != currPeerPositions_.end())
1492 {
1493 Proposal_t const& peerProp = it->second.proposal();
1494 if (peerProp.isStale(peerCutoff))
1495 {
1496 // peer's proposal is stale, so remove it
1497 NodeID_t const& peerID = peerProp.nodeID();
1498 JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
1499 for (auto& dt : result_->disputes)
1500 dt.second.unVote(peerID);
1501 it = currPeerPositions_.erase(it);
1502 }
1503 else
1504 {
1505 // proposal is still fresh
1506 ++closeTimeVotes[asCloseTime(peerProp.closeTime())];
1507 ++it;
1508 }
1509 }
1510 }
1511
1512 // This will stay unseated unless there are any changes
1513 std::optional<TxSet_t> ourNewSet;
1514
1515 // Update votes on disputed transactions
1516 {
1518 for (auto& [txId, dispute] : result_->disputes)
1519 {
1520 // Because the threshold for inclusion increases,
1521 // time can change our position on a dispute
1522 if (dispute.updateVote(
1523 convergePercent_,
1524 mode_.get() == ConsensusMode::proposing,
1525 parms))
1526 {
1527 if (!mutableSet)
1528 mutableSet.emplace(result_->txns);
1529
1530 if (dispute.getOurVote())
1531 {
1532 // now a yes
1533 mutableSet->insert(dispute.tx());
1534 }
1535 else
1536 {
1537 // now a no
1538 mutableSet->erase(txId);
1539 }
1540 }
1541 }
1542
1543 if (mutableSet)
1544 ourNewSet.emplace(std::move(*mutableSet));
1545 }
1546
1547 NetClock::time_point consensusCloseTime = {};
1548 haveCloseTimeConsensus_ = false;
1549
1550 if (currPeerPositions_.empty())
1551 {
1552 // no other times
1553 haveCloseTimeConsensus_ = true;
1554 consensusCloseTime = asCloseTime(result_->position.closeTime());
1555 }
1556 else
1557 {
1558 // We don't track rounds for close time, so just pass 0s
1559 auto const [neededWeight, newState] = getNeededWeight(
1560 parms, closeTimeAvalancheState_, convergePercent_, 0, 0);
1561 if (newState)
1562 closeTimeAvalancheState_ = *newState;
1563 CLOG(clog) << "neededWeight " << neededWeight << ". ";
1564
1565 int participants = currPeerPositions_.size();
1566 if (mode_.get() == ConsensusMode::proposing)
1567 {
1568 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
1569 ++participants;
1570 }
1571
1572 // Threshold for non-zero vote
1573 int threshVote = participantsNeeded(participants, neededWeight);
1574
1575 // Threshold to declare consensus
1576 int const threshConsensus =
1577 participantsNeeded(participants, parms.avCT_CONSENSUS_PCT);
1578
1580 ss << "Proposers:" << currPeerPositions_.size()
1581 << " nw:" << neededWeight << " thrV:" << threshVote
1582 << " thrC:" << threshConsensus;
1583 JLOG(j_.info()) << ss.str();
1584 CLOG(clog) << ss.str();
1585
1586 for (auto const& [t, v] : closeTimeVotes)
1587 {
1588 JLOG(j_.debug())
1589 << "CCTime: seq "
1590 << static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
1591 << t.time_since_epoch().count() << " has " << v << ", "
1592 << threshVote << " required";
1593
1594 if (v >= threshVote)
1595 {
1596 // A close time has enough votes for us to try to agree
1597 consensusCloseTime = t;
1598 threshVote = v;
1599
1600 if (threshVote >= threshConsensus)
1601 haveCloseTimeConsensus_ = true;
1602 }
1603 }
1604
1605 if (!haveCloseTimeConsensus_)
1606 {
1607 JLOG(j_.debug())
1608 << "No CT consensus:"
1609 << " Proposers:" << currPeerPositions_.size()
1610 << " Mode:" << to_string(mode_.get())
1611 << " Thresh:" << threshConsensus
1612 << " Pos:" << consensusCloseTime.time_since_epoch().count();
1613 CLOG(clog) << "No close time consensus. ";
1614 }
1615 }
1616
1617 if (!ourNewSet &&
1618 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1619 result_->position.isStale(ourCutoff)))
1620 {
1621 // close time changed or our position is stale
1622 ourNewSet.emplace(result_->txns);
1623 }
1624
1625 if (ourNewSet)
1626 {
1627 auto newID = ourNewSet->id();
1628
1629 result_->txns = std::move(*ourNewSet);
1630
1632 ss << "Position change: CTime "
1633 << consensusCloseTime.time_since_epoch().count() << ", tx " << newID;
1634 JLOG(j_.info()) << ss.str();
1635 CLOG(clog) << ss.str();
1636
1637 result_->position.changePosition(newID, consensusCloseTime, now_);
1638
1639 // Share our new transaction set and update disputes
1640 // if we haven't already received it
1641 if (acquired_.emplace(newID, result_->txns).second)
1642 {
1643 if (!result_->position.isBowOut())
1644 adaptor_.share(result_->txns);
1645
1646 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1647 {
1648 Proposal_t const& p = peerPos.proposal();
1649 if (p.position() == newID)
1650 updateDisputes(nodeId, result_->txns);
1651 }
1652 }
1653
1654 // Share our new position if we are still participating this round
1655 if (!result_->position.isBowOut() &&
1656 (mode_.get() == ConsensusMode::proposing))
1657 adaptor_.propose(result_->position);
1658 }
1659}
1660
1661template <class Adaptor>
1662bool
1665{
1666 // Must have a stance if we are checking for consensus
1667 XRPL_ASSERT(result_, "ripple::Consensus::haveConsensus : has result");
1668
1669 // CHECKME: should possibly count unacquired TX sets as disagreeing
1670 int agree = 0, disagree = 0;
1671
1672 auto ourPosition = result_->position.position();
1673
1674 // Count number of agreements/disagreements with our position
1675 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1676 {
1677 Proposal_t const& peerProp = peerPos.proposal();
1678 if (peerProp.position() == ourPosition)
1679 {
1680 ++agree;
1681 }
1682 else
1683 {
1684 JLOG(j_.debug()) << "Proposal disagreement: Peer " << nodeId
1685 << " has " << peerProp.position();
1686 ++disagree;
1687 }
1688 }
1689 auto currentFinished =
1690 adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1691
1692 JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree
1693 << ", disagree=" << disagree;
1694
1695 ConsensusParms const& parms = adaptor_.parms();
1696 // Stalling is BAD. It means that we have a consensus on the close time, so
1697 // peers are talking, but we have disputed transactions that peers are
1698 // unable or unwilling to come to agreement on one way or the other.
1699 bool const stalled = haveCloseTimeConsensus_ &&
1700 !result_->disputes.empty() &&
1701 std::ranges::all_of(result_->disputes,
1702 [this, &parms, &clog](auto const& dispute) {
1703 return dispute.second.stalled(
1704 parms,
1705 mode_.get() == ConsensusMode::proposing,
1706 peerUnchangedCounter_,
1707 j_,
1708 clog);
1709 });
1710 if (stalled)
1711 {
1713 ss << "Consensus detects as stalled with " << (agree + disagree) << "/"
1714 << prevProposers_ << " proposers, and " << result_->disputes.size()
1715 << " stalled disputed transactions.";
1716 JLOG(j_.error()) << ss.str();
1717 CLOG(clog) << ss.str();
1718 }
1719
1720 // Determine if we actually have consensus or not
1721 result_->state = checkConsensus(
1722 prevProposers_,
1723 agree + disagree,
1724 agree,
1725 currentFinished,
1726 prevRoundTime_,
1727 result_->roundTime.read(),
1728 stalled,
1729 parms,
1730 mode_.get() == ConsensusMode::proposing,
1731 j_,
1732 clog);
1733
1734 if (result_->state == ConsensusState::No)
1735 {
1736 CLOG(clog) << "No consensus. ";
1737 return false;
1738 }
1739
1740 // Consensus has taken far too long. Drop out of the round.
1741 if (result_->state == ConsensusState::Expired)
1742 {
1743 static auto const minimumCounter =
1744 parms.avalancheCutoffs.size() * parms.avMIN_ROUNDS;
1746 if (establishCounter_ < minimumCounter)
1747 {
1748 // If each round of phaseEstablish takes a very long time, we may
1749 // "expire" before we've given consensus enough time at each
1750 // avalanche level to actually come to a consensus. In that case,
1751 // keep trying. This should only happen if there are an extremely
1752 // large number of disputes such that each round takes an inordinate
1753 // amount of time.
1754
1755 ss << "Consensus time has expired in round " << establishCounter_
1756 << "; continue until round " << minimumCounter << ". "
1757 << Json::Compact{getJson(false)};
1758 JLOG(j_.error()) << ss.str();
1759 CLOG(clog) << ss.str() << ". ";
1760 return false;
1761 }
1762 ss << "Consensus expired. " << Json::Compact{getJson(true)};
1763 JLOG(j_.error()) << ss.str();
1764 CLOG(clog) << ss.str() << ". ";
1765 leaveConsensus(clog);
1766 }
1767 // There is consensus, but we need to track if the network moved on
1768 // without us.
1769 if (result_->state == ConsensusState::MovedOn)
1770 {
1771 JLOG(j_.error()) << "Unable to reach consensus";
1772 JLOG(j_.error()) << Json::Compact{getJson(true)};
1773 CLOG(clog) << "Unable to reach consensus "
1774 << Json::Compact{getJson(true)} << ". ";
1775 }
1776
1777 CLOG(clog) << "Consensus has been reached. ";
1778 return true;
1779}
1780
1781template <class Adaptor>
1782void
1785{
1786 if (mode_.get() == ConsensusMode::proposing)
1787 {
1788 if (result_ && !result_->position.isBowOut())
1789 {
1790 result_->position.bowOut(now_);
1791 adaptor_.propose(result_->position);
1792 }
1793
1794 mode_.set(ConsensusMode::observing, adaptor_);
1795 JLOG(j_.info()) << "Bowing out of consensus";
1796 CLOG(clog) << "Bowing out of consensus. ";
1797 }
1798}
1799
1800template <class Adaptor>
1801void
1803 TxSet_t const& o,
1805{
1806 // Cannot create disputes without our stance
1807 XRPL_ASSERT(result_, "ripple::Consensus::createDisputes : result is set");
1808
1809 // Only create disputes if this is a new set
1810 auto const emplaced = result_->compares.emplace(o.id()).second;
1811 CLOG(clog) << "createDisputes: new set? " << !emplaced << ". ";
1812 if (!emplaced)
1813 return;
1814
1815 // Nothing to dispute if we agree
1816 if (result_->txns.id() == o.id())
1817 {
1818 CLOG(clog) << "both sets are identical. ";
1819 return;
1820 }
1821
1822 CLOG(clog) << "comparing existing with new set: " << result_->txns.id()
1823 << ',' << o.id() << ". ";
1824 JLOG(j_.debug()) << "createDisputes " << result_->txns.id() << " to "
1825 << o.id();
1826
1827 auto differences = result_->txns.compare(o);
1828
1829 int dc = 0;
1830
1831 for (auto const& [txId, inThisSet] : differences)
1832 {
1833 ++dc;
1834 // create disputed transactions (from the ledger that has them)
1835 XRPL_ASSERT(
1836 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1837 (!inThisSet && !result_->txns.find(txId) && o.find(txId)),
1838 "ripple::Consensus::createDisputes : has disputed transactions");
1839
1840 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1841 auto txID = tx.id();
1842
1843 if (result_->disputes.find(txID) != result_->disputes.end())
1844 continue;
1845
1846 JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
1847
1848 typename Result::Dispute_t dtx{
1849 tx,
1850 result_->txns.exists(txID),
1851 std::max(prevProposers_, currPeerPositions_.size()),
1852 j_};
1853
1854 // Update all of the available peer's votes on the disputed transaction
1855 for (auto const& [nodeId, peerPos] : currPeerPositions_)
1856 {
1857 Proposal_t const& peerProp = peerPos.proposal();
1858 auto const cit = acquired_.find(peerProp.position());
1859 if (cit != acquired_.end() &&
1860 dtx.setVote(nodeId, cit->second.exists(txID)))
1861 peerUnchangedCounter_ = 0;
1862 }
1863 adaptor_.share(dtx.tx());
1864
1865 result_->disputes.emplace(txID, std::move(dtx));
1866 }
1867 JLOG(j_.debug()) << dc << " differences found";
1868 CLOG(clog) << "disputes: " << dc << ". ";
1869}
1870
1871template <class Adaptor>
1872void
1874{
1875 // Cannot updateDisputes without our stance
1876 XRPL_ASSERT(result_, "ripple::Consensus::updateDisputes : result is set");
1877
1878 // Ensure we have created disputes against this set if we haven't seen
1879 // it before
1880 if (result_->compares.find(other.id()) == result_->compares.end())
1881 createDisputes(other);
1882
1883 for (auto& it : result_->disputes)
1884 {
1885 auto& d = it.second;
1886 if (d.setVote(node, other.exists(d.tx().id())))
1887 peerUnchangedCounter_ = 0;
1888 }
1889}
1890
1891template <class Adaptor>
1894{
1895 return roundCloseTime(raw, closeResolution_);
1896}
1897
1898} // namespace ripple
1899
1900#endif
T all_of(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
Json::Int Int
Definition json_value.h:139
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
NodeID_t const & nodeID() const
Identifying which peer took this position.
NetClock::time_point const & closeTime() const
The current position on the consensus close time.
Position_t const & position() const
Get the proposed position.
bool isStale(NetClock::time_point cutoff) const
Get whether this position is stale relative to the provided cutoff.
Measures the duration of phases of consensus.
void set(ConsensusMode mode, Adaptor &a)
Definition Consensus.h:309
ConsensusMode get() const
Definition Consensus.h:303
Generic implementation of consensus algorithm.
Definition Consensus.h:279
hash_map< typename TxSet_t::ID, TxSet_t const > acquired_
Definition Consensus.h:587
void playbackProposals()
If we radically changed our consensus context for some reason, we need to replay recent proposals so ...
Definition Consensus.h:1132
void timerEntry(NetClock::time_point const &now, std::unique_ptr< std::stringstream > const &clog={})
Call periodically to drive consensus forward.
Definition Consensus.h:840
ConsensusTimer openTime_
Definition Consensus.h:560
void startRoundInternal(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t const &prevLedger, ConsensusMode mode, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:678
void phaseEstablish(std::unique_ptr< std::stringstream > const &clog)
Handle establish phase.
Definition Consensus.h:1347
typename Adaptor::PeerPosition_t PeerPosition_t
Definition Consensus.h:284
ConsensusPhase phase_
Definition Consensus.h:548
NetClock::time_point prevCloseTime_
Definition Consensus.h:576
clock_type const & clock_
Definition Consensus.h:553
void leaveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1783
void updateDisputes(NodeID_t const &node, TxSet_t const &other)
Definition Consensus.h:1873
Ledger_t previousLedger_
Definition Consensus.h:584
typename Adaptor::TxSet_t TxSet_t
Definition Consensus.h:281
ConsensusParms::AvalancheState closeTimeAvalancheState_
Definition Consensus.h:564
std::size_t establishCounter_
Definition Consensus.h:597
void updateOurPositions(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1473
bool haveConsensus(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1663
Ledger_t::ID prevLedgerID() const
Get the previous ledger ID.
Definition Consensus.h:412
void handleWrongLedger(typename Ledger_t::ID const &lgrId, std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1043
void checkLedger(std::unique_ptr< std::stringstream > const &clog)
Check if our previous ledger matches the network's.
Definition Consensus.h:1099
hash_map< NodeID_t, std::deque< PeerPosition_t > > recentPeerPositions_
Definition Consensus.h:607
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Simulate the consensus process without any network traffic.
Definition Consensus.h:921
Json::Value getJson(bool full) const
Get the Json state of the consensus process.
Definition Consensus.h:945
typename TxSet_t::Tx Tx_t
Definition Consensus.h:283
void startRound(NetClock::time_point const &now, typename Ledger_t::ID const &prevLedgerID, Ledger_t prevLedger, hash_set< NodeID_t > const &nowUntrusted, bool proposing, std::unique_ptr< std::stringstream > const &clog={})
Kick-off the next round of consensus.
Definition Consensus.h:631
Consensus(Consensus &&) noexcept=default
NetClock::time_point now_
Definition Consensus.h:575
std::size_t prevProposers_
Definition Consensus.h:610
NetClock::time_point asCloseTime(NetClock::time_point raw) const
Definition Consensus.h:1893
beast::Journal const j_
Definition Consensus.h:616
void createDisputes(TxSet_t const &o, std::unique_ptr< std::stringstream > const &clog={})
Definition Consensus.h:1802
void gotTxSet(NetClock::time_point const &now, TxSet_t const &txSet)
Process a transaction set acquired from the network.
Definition Consensus.h:873
Adaptor & adaptor_
Definition Consensus.h:546
typename Adaptor::Ledger_t Ledger_t
Definition Consensus.h:280
ConsensusPhase phase() const
Definition Consensus.h:418
std::size_t peerUnchangedCounter_
Definition Consensus.h:594
typename Adaptor::NodeID_t NodeID_t
Definition Consensus.h:282
void closeLedger(std::unique_ptr< std::stringstream > const &clog)
Definition Consensus.h:1415
NetClock::duration closeResolution_
Definition Consensus.h:562
bool peerProposal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
A peer has proposed a new position, adjust our tracking.
Definition Consensus.h:724
bool peerProposalInternal(NetClock::time_point const &now, PeerPosition_t const &newProposal)
Handle a replayed or a new peer proposal.
Definition Consensus.h:745
MonitoredMode mode_
Definition Consensus.h:549
hash_map< NodeID_t, PeerPosition_t > currPeerPositions_
Definition Consensus.h:603
bool shouldPause(std::unique_ptr< std::stringstream > const &clog) const
Evaluate whether pausing increases likelihood of validation.
Definition Consensus.h:1224
void phaseOpen(std::unique_ptr< std::stringstream > const &clog)
Handle pre-close phase.
Definition Consensus.h:1149
ConsensusCloseTimes rawCloseTimes_
Definition Consensus.h:590
std::chrono::milliseconds prevRoundTime_
Definition Consensus.h:568
std::optional< Result > result_
Definition Consensus.h:589
hash_set< NodeID_t > deadNodes_
Definition Consensus.h:613
Ledger_t::ID prevLedgerID_
Definition Consensus.h:582
bool haveCloseTimeConsensus_
Definition Consensus.h:551
A transaction discovered to be in dispute during consensus.
Definition DisputedTx.h:30
T emplace(T... args)
T is_same_v
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::pair< std::size_t, std::optional< ConsensusParms::AvalancheState > > getNeededWeight(ConsensusParms const &p, ConsensusParms::AvalancheState currentState, int percentTime, std::size_t currentRounds, std::size_t minimumRounds)
ConsensusMode
Represents how a node currently participates in Consensus.
@ wrongLedger
We have the wrong ledger and are attempting to acquire it.
@ proposing
We are normal participant in consensus and propose our position.
@ switchedLedger
We switched ledgers since we started this consensus round but are now running on what we believe is t...
@ observing
We are observing peer positions, but not proposing our position.
ConsensusState checkConsensus(std::size_t prevProposers, std::size_t currentProposers, std::size_t currentAgree, std::size_t currentFinished, std::chrono::milliseconds previousAgreeTime, std::chrono::milliseconds currentAgreeTime, bool stalled, ConsensusParms const &parms, bool proposing, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determine whether the network reached consensus and whether we joined.
std::chrono::time_point< Clock, Duration > roundCloseTime(std::chrono::time_point< Clock, Duration > closeTime, std::chrono::duration< Rep, Period > closeResolution)
Calculates the close time for a ledger, given a close time resolution.
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
auto constexpr ledgerDefaultTimeResolution
Initial resolution of ledger close time.
ConsensusPhase
Phases of consensus for a single ledger round.
@ accepted
We have accepted a new last closed ledger and are waiting on a call to startRound to begin the next c...
@ open
We haven't closed our ledger yet, but others might have.
@ establish
Establishing consensus by exchanging proposals with our peers.
ConsensusState
Whether we have or don't have a consensus.
@ Expired
Consensus time limit has hard-expired.
@ MovedOn
The network has consensus without us.
@ No
We do not have consensus.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
bool shouldCloseLedger(bool anyTransactions, std::size_t prevProposers, std::size_t proposersClosed, std::size_t proposersValidated, std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, std::chrono::milliseconds idleInterval, ConsensusParms const &parms, beast::Journal j, std::unique_ptr< std::stringstream > const &clog)
Determines whether the current ledger should close at this time.
Definition Consensus.cpp:8
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
int participantsNeeded(int participants, int percent)
How many of the participants must agree to reach a given threshold?
Definition Consensus.h:1464
std::chrono::duration< Rep, Period > getNextLedgerTimeResolution(std::chrono::duration< Rep, Period > previousResolution, bool previousAgree, Seq ledgerSeq)
Calculates the close time resolution for the specified ledger.
STL namespace.
T read(T... args)
T size(T... args)
T str(T... args)
Stores the set of initial close times.
Consensus algorithm parameters.
std::size_t const avCT_CONSENSUS_PCT
Percentage of nodes required to reach agreement on ledger close time.
std::chrono::seconds const proposeINTERVAL
How often we force generating a new proposal to keep ours fresh.
std::size_t const avMIN_ROUNDS
Number of rounds before certain actions can happen.
std::chrono::milliseconds const avMIN_CONSENSUS_TIME
The minimum amount of time to consider the previous round to have taken.
std::chrono::milliseconds const ledgerMIN_CONSENSUS
The number of seconds we wait minimum to ensure participation.
std::map< AvalancheState, AvalancheCutoff > const avalancheCutoffs
Map the consensus requirement avalanche state to the amount of time that must pass before moving to t...
std::chrono::seconds const proposeFRESHNESS
How long we consider a proposal fresh.
Encapsulates the result of consensus.
T time_since_epoch(T... args)
T to_string(T... args)
T value_or(T... args)