20 #include <ripple/app/consensus/RCLConsensus.h>
21 #include <ripple/app/consensus/RCLValidations.h>
22 #include <ripple/app/ledger/BuildLedger.h>
23 #include <ripple/app/ledger/InboundLedgers.h>
24 #include <ripple/app/ledger/InboundTransactions.h>
25 #include <ripple/app/ledger/Ledger.h>
26 #include <ripple/app/ledger/LedgerMaster.h>
27 #include <ripple/app/ledger/LocalTxs.h>
28 #include <ripple/app/ledger/OpenLedger.h>
29 #include <ripple/app/misc/AmendmentTable.h>
30 #include <ripple/app/misc/HashRouter.h>
31 #include <ripple/app/misc/LoadFeeTrack.h>
32 #include <ripple/app/misc/NegativeUNLVote.h>
33 #include <ripple/app/misc/NetworkOPs.h>
34 #include <ripple/app/misc/TxQ.h>
35 #include <ripple/app/misc/ValidatorKeys.h>
36 #include <ripple/app/misc/ValidatorList.h>
37 #include <ripple/basics/random.h>
38 #include <ripple/beast/core/LexicalCast.h>
39 #include <ripple/consensus/LedgerTiming.h>
40 #include <ripple/nodestore/DatabaseShard.h>
41 #include <ripple/overlay/Overlay.h>
42 #include <ripple/overlay/predicates.h>
43 #include <ripple/protocol/BuildInfo.h>
44 #include <ripple/protocol/Feature.h>
45 #include <ripple/protocol/digest.h>
69 , consensus_(clock, adaptor_, journal)
83 , feeVote_(
std::move(feeVote))
86 , inboundTransactions_{inboundTransactions}
88 , validatorKeys_(validatorKeys)
94 , nUnlVote_(validatorKeys_.nodeID,
j_)
96 assert(valCookie_ != 0);
98 JLOG(
j_.
info()) <<
"Consensus engine started (cookie: " +
101 if (validatorKeys_.nodeID != beast::zero)
105 JLOG(
j_.
info()) <<
"Validator identity: "
108 validatorKeys_.masterPublicKey);
110 if (validatorKeys_.masterPublicKey != validatorKeys_.publicKey)
113 <<
"Validator ephemeral signing key: "
124 auto built = ledgerMaster_.getLedgerByHash(hash);
127 if (acquiringLedger_ != hash)
130 JLOG(
j_.
warn()) <<
"Need consensus ledger " << hash;
133 acquiringLedger_ = hash;
135 app_.getJobQueue().addJob(
136 jtADVANCE,
"getConsensusLedger", [
id = hash, &app = app_]() {
137 app.getInboundLedgers().
acquire(
144 assert(!built->open() && built->isImmutable());
145 assert(built->info().hash == hash);
148 inboundTransactions_.newRound(built->info().seq);
156 protocol::TMProposeSet prop;
160 prop.set_proposeseq(
proposal.proposeSeq());
161 prop.set_closetime(
proposal.closeTime().time_since_epoch().count());
163 prop.set_currenttxhash(
165 prop.set_previousledger(
169 prop.set_nodepubkey(pk.data(), pk.size());
172 prop.set_signature(sig.data(), sig.size());
174 if (
proposal.ledgerSeq().has_value())
175 prop.set_ledgerseq(*
proposal.ledgerSeq());
184 if (app_.getHashRouter().shouldRelay(tx.
id()))
186 JLOG(
j_.
trace()) <<
"Relaying disputed tx " << tx.
id();
187 auto const slice = tx.
tx_->slice();
188 protocol::TMTransaction msg;
189 msg.set_rawtransaction(slice.data(), slice.size());
190 msg.set_status(protocol::tsNEW);
191 msg.set_receivetimestamp(
192 app_.timeKeeper().now().time_since_epoch().count());
194 app_.overlay().relay(tx.
id(), msg, skip);
198 JLOG(
j_.
trace()) <<
"Not relaying disputed tx " << tx.
id();
204 JLOG(
j_.
debug()) << (
proposal.isBowOut() ?
"We bow out: " :
"We propose: ")
208 protocol::TMProposeSet prop;
210 prop.set_currenttxhash(
212 prop.set_previousledger(
214 prop.set_proposeseq(
proposal.proposeSeq());
215 prop.set_closetime(
proposal.closeTime().time_since_epoch().count());
217 validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size());
218 prop.set_ledgerseq(*
proposal.ledgerSeq());
221 validatorKeys_.publicKey,
222 validatorKeys_.secretKey,
225 prop.set_signature(sig.data(), sig.size());
232 validatorKeys_.publicKey,
235 app_.getHashRouter().addSuppression(suppression);
237 app_.overlay().broadcast(prop);
243 inboundTransactions_.giveSet(txns.
id(), txns.
map_,
false);
249 if (
auto txns = inboundTransactions_.getSet(setId,
true))
259 return !app_.openLedger().empty();
265 return app_.getValidations().numTrustedForLedger(h);
287 ledgerMaster_.getValidLedgerIndex());
289 if (netLgr != ledgerID)
292 app_.getOPs().consensusViewChange();
310 notify(protocol::neCLOSING_LEDGER, ledger, !wrongLCL);
312 auto const& prevLedger = ledger.ledger_;
314 ledgerMaster_.applyHeldTransactions();
316 ledgerMaster_.setBuildingLedger(prevLedger->info().seq + 1);
318 auto initialLedger = app_.openLedger().current();
322 initialSet->setUnbacked();
325 for (
auto const& tx : initialLedger->txs)
327 JLOG(
j_.
trace()) <<
"Adding open ledger TX "
328 << tx.first->getTransactionID();
337 if (app_.config().standalone() || (
proposing && !wrongLCL))
339 if (prevLedger->isFlagLedger())
343 auto validations = app_.validators().negativeUNLFilter(
344 app_.getValidations().getTrustedForLedger(
345 prevLedger->info().parentHash, prevLedger->seq() - 1));
346 if (validations.size() >= app_.validators().quorum())
348 feeVote_->doVoting(prevLedger, validations, initialSet);
349 app_.getAmendmentTable().doVoting(
350 prevLedger, validations, initialSet);
354 prevLedger->isVotingLedger() &&
362 app_.validators().getTrustedMasterKeys(),
363 app_.getValidations(),
369 initialSet = initialSet->snapShot(
false);
373 LedgerIndex const seq = prevLedger->info().seq + 1;
376 initialSet->visitLeaves(
378 seq](boost::intrusive_ptr<SHAMapItem const>
const& item) {
382 censorshipDetector_.propose(std::move(proposed));
386 auto const setHash = initialSet->getHash().as_uint256();
388 std::move(initialSet),
390 initialLedger->info().parentHash,
394 app_.timeKeeper().closeTime(),
395 validatorKeys_.nodeID,
396 initialLedger->info().seq,
409 auto txsBuilt = buildAndValidate(
410 result, prevLedger, closeResolution,
mode, std::move(consensusJson));
411 prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes,
mode);
422 app_.getJobQueue().addJob(
427 cj = std::move(consensusJson),
428 txsBuilt = std::move(tb)]()
mutable {
434 prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes,
mode);
435 this->app_.getOPs().endConsensus();
452 bool closeTimeCorrect;
463 using namespace std::chrono_literals;
464 consensusCloseTime = prevLedger.
closeTime() + 1s;
465 closeTimeCorrect =
false;
471 consensusCloseTime, closeResolution, prevLedger.
closeTime());
472 closeTimeCorrect =
true;
476 <<
" val=" << (validating_ ?
"yes" :
"no")
477 <<
" corLCL=" << (haveCorrectLCL ?
"yes" :
"no")
478 <<
" fail=" << (consensusFail ?
"yes" :
"no");
479 JLOG(
j_.
debug()) <<
"Report: Prev = " << prevLedger.
id() <<
":"
491 JLOG(
j_.
debug()) <<
"Building canonical tx set: " << retriableTxs.key();
493 for (
auto const& item : *result.
txns.map_)
498 std::make_shared<STTx const>(
SerialIter{item.slice()}));
499 JLOG(
j_.
trace()) <<
" Tx: " << item.key();
503 failed.
insert(item.key());
505 <<
" Tx: " << item.key() <<
" throws: " << ex.
what();
509 auto built = buildLCL(
518 auto const newLCLHash = built.id();
519 JLOG(
j_.
debug()) <<
"Built ledger #" << built.seq() <<
": " << newLCLHash;
522 notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL);
531 result.
txns.map_->visitLeaves(
532 [&
accepted](boost::intrusive_ptr<SHAMapItem const>
const& item) {
537 for (
auto const& r : retriableTxs)
538 failed.
insert(r.first.getTXID());
540 censorshipDetector_.check(
543 j = app_.journal(
"CensorshipDetector"),
545 if (failed.count(id))
548 auto const wait = curr - seq;
550 if (wait && (wait % censorshipWarnInternal == 0))
552 std::ostringstream ss;
553 ss <<
"Potential Censorship: Eligible tx " << id
554 <<
", which we are tracking since ledger " << seq
555 <<
" has not been included as of ledger " << curr <<
".";
557 JLOG(j.warn()) << ss.str();
565 validating_ = ledgerMaster_.isCompatible(
566 *built.ledger_,
j_.
warn(),
"Not validating");
568 if (validating_ && !consensusFail &&
569 app_.getValidations().canValidateSeq(built.seq()))
571 validate(built, result.txns, proposing);
572 JLOG(
j_.
info()) <<
"CNF Val " << newLCLHash;
575 JLOG(
j_.
info()) <<
"CNF buildLCL " << newLCLHash;
578 ledgerMaster_.consensusBuilt(
579 built.ledger_, result.txns.id(), std::move(consensusJson));
581 return {retriableTxs, built};
591 auto& retriableTxs = txsBuilt.first;
592 auto const& built = txsBuilt.second;
607 bool anyDisputes =
false;
608 for (
auto const& [_, dispute] : result.
disputes)
611 if (!dispute.getOurVote())
617 <<
"Test applying disputed transaction that did"
618 <<
" not get in " << dispute.tx().id();
621 auto txn = std::make_shared<STTx const>(sit);
628 retriableTxs.insert(txn);
634 JLOG(
j_.
trace()) <<
"Failed to apply transaction we voted "
646 auto const lastVal = ledgerMaster_.getValidatedLedger();
651 rules.
emplace(app_.config().features);
652 app_.openLedger().accept(
656 localTxs_.getTxSet(),
663 return app_.getTxQ().accept(app_, view);
668 app_.getOPs().reportFeeChange();
673 ledgerMaster_.switchLCL(built.ledger_);
676 assert(ledgerMaster_.getClosedLedger()->info().hash == built.id());
677 assert(app_.openLedger().current()->info().parentHash == built.id());
689 auto closeTime = rawCloseTimes.
self;
691 JLOG(
j_.
info()) <<
"We closed at "
692 << closeTime.time_since_epoch().count();
694 usec64_t closeTotal =
695 std::chrono::duration_cast<usec64_t>(closeTime.time_since_epoch());
698 for (
auto const& [t, v] : rawCloseTimes.
peers)
704 std::chrono::duration_cast<usec64_t>(t.time_since_epoch()) * v;
707 closeTotal += usec64_t(closeCount / 2);
708 closeTotal /= closeCount;
713 auto offset = time_point{closeTotal} -
714 std::chrono::time_point_cast<duration>(closeTime);
715 JLOG(
j_.
info()) <<
"Our close offset is estimated at " << offset.count()
716 <<
" (" << closeCount <<
")";
718 app_.timeKeeper().adjustCloseTime(offset);
724 protocol::NodeEvent ne,
728 protocol::TMStatusChange s;
731 s.set_newevent(protocol::neLOST_SYNC);
735 s.set_ledgerseq(ledger.
seq());
736 s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
737 s.set_ledgerhashprevious(
744 if (!ledgerMaster_.getFullValidatedRange(uMin, uMax))
752 uMin =
std::max(uMin, ledgerMaster_.getEarliestFetch());
754 s.set_firstseq(uMin);
756 app_.overlay().foreach(
757 send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
758 JLOG(
j_.
trace()) <<
"send status change to peer";
766 bool closeTimeCorrect,
772 if (
auto const replayData = ledgerMaster_.releaseReplay())
774 assert(replayData->parent()->info().hash == previousLedger.
id());
789 using namespace std::chrono_literals;
790 app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s);
793 if (ledgerMaster_.storeLedger(built))
794 JLOG(
j_.
debug()) <<
"Consensus built ledger we already had";
795 else if (app_.getInboundLedgers().find(built->info().hash))
796 JLOG(
j_.
debug()) <<
"Consensus built ledger we were acquiring";
798 JLOG(
j_.
debug()) <<
"Consensus built new ledger";
808 using namespace std::chrono_literals;
810 auto validationTime = app_.timeKeeper().closeTime();
811 if (validationTime <= lastValidationTime_)
812 validationTime = lastValidationTime_ + 1s;
813 lastValidationTime_ = validationTime;
815 auto v = std::make_shared<STValidation>(
817 validatorKeys_.publicKey,
818 validatorKeys_.secretKey,
819 validatorKeys_.nodeID,
821 v.setFieldH256(sfLedgerHash, ledger.id());
822 v.setFieldH256(sfConsensusHash, txns.id());
824 v.setFieldU32(sfLedgerSequence, ledger.seq());
827 v.setFlag(vfFullValidation);
829 if (ledger.ledger_->rules().enabled(featureHardenedValidations))
834 if (auto const vl = ledgerMaster_.getValidatedLedger())
835 v.setFieldH256(sfValidatedHash, vl->info().hash);
837 v.setFieldU64(sfCookie, valCookie_);
840 if (ledger.ledger_->isVotingLedger())
842 sfServerVersion, BuildInfo::getEncodedVersion());
847 auto const& ft = app_.getFeeTrack();
848 auto const fee = std::max(ft.getLocalFee(), ft.getClusterFee());
849 if (fee > ft.getLoadBase())
850 v.setFieldU32(sfLoadFee, fee);
855 if (ledger.
ledger_->isVotingLedger())
858 feeVote_->doValidation(
859 ledger.ledger_->fees(), ledger.ledger_->rules(), v);
864 auto const amendments = app_.getAmendmentTable().doValidation(
865 getEnabledAmendments(*ledger.ledger_));
867 if (!amendments.empty())
869 sfAmendments, STVector256(sfAmendments, amendments));
873 auto const serialized = v->getSerialized();
881 protocol::TMValidation val;
882 val.set_validation(serialized.data(), serialized.size());
883 app_.overlay().broadcast(val);
886 app_.getOPs().pubValidation(v);
892 JLOG(
j_.
info()) <<
"Consensus mode change before=" <<
to_string(before)
900 censorshipDetector_.reset();
911 static bool const standalone = ledgerMaster_.standalone();
912 auto const& validLedger = ledgerMaster_.getValidatedLedger();
914 return (app_.getOPs().isFull() && !standalone &&
915 (validLedger && (newLedger.
id() != validLedger->info().hash) &&
916 (newLedger.
seq() >= validLedger->info().seq))) &&
946 JLOG(
j_.
error()) <<
"During consensus timerEntry: " << mn.
what();
962 JLOG(
j_.
error()) <<
"During consensus gotTxSet: " << mn.
what();
984 return consensus_.peerProposal(now, newProposal);
994 validating_ = validatorKeys_.publicKey.size() != 0 &&
995 prevLgr.
seq() >= app_.getMaxDisallowedLedger() &&
996 !app_.getOPs().isBlocked();
1000 if (validating_ && !app_.config().standalone() && app_.validators().count())
1002 auto const when = app_.validators().expires();
1004 if (!when || *when < app_.timeKeeper().now())
1006 JLOG(
j_.
error()) <<
"Voluntarily bowing out of consensus process "
1007 "because of an expired validator list.";
1008 validating_ =
false;
1016 JLOG(
j_.
info()) <<
"Entering consensus process, validating, synced="
1017 << (synced ?
"yes" :
"no");
1022 JLOG(
j_.
info()) <<
"Entering consensus process, watching, synced="
1023 << (synced ?
"yes" :
"no");
1027 inboundTransactions_.newRound(prevLgr.
seq());
1031 !nowTrusted.
empty())
1032 nUnlVote_.newValidators(prevLgr.
seq() + 1, nowTrusted);
1035 return validating_ && synced;
1041 return ledgerMaster_.haveValidated();
1047 return ledgerMaster_.getValidLedgerIndex();
1053 return app_.validators().getQuorumKeys();
1059 return app_.validators().quorum();
1067 return app_.getValidations().laggards(seq, trustedKeys);
1073 return !validatorKeys_.publicKey.empty();
1079 if (!positions && app_.getOPs().isFull())