diff --git a/src/ripple/app/ledger/Consensus.h b/src/ripple/app/ledger/Consensus.h index edf16be624..722ca0b27b 100644 --- a/src/ripple/app/ledger/Consensus.h +++ b/src/ripple/app/ledger/Consensus.h @@ -62,14 +62,20 @@ public: std::chrono::milliseconds getLastCloseDuration () const = 0; - /** Called when a new round of consensus is about to begin */ + /** Called to create a LedgerConsensus instance */ virtual std::shared_ptr - startRound ( + makeLedgerConsensus ( Application& app, InboundTransactions& inboundTransactions, - LocalTxs& localtx, LedgerMaster& ledgerMaster, + LocalTxs& localTxs) = 0; + + /** Called when a new round of consensus is about to begin */ + virtual + void + startRound ( + LedgerConsensus& consensus, LedgerHash const &prevLCLHash, Ledger::ref previousLedger, NetClock::time_point closeTime) = 0; diff --git a/src/ripple/app/ledger/LedgerConsensus.h b/src/ripple/app/ledger/LedgerConsensus.h index 096f17e906..08e667fba6 100644 --- a/src/ripple/app/ledger/LedgerConsensus.h +++ b/src/ripple/app/ledger/LedgerConsensus.h @@ -54,6 +54,13 @@ public: virtual bool peerPosition (LedgerProposal::ref) = 0; + virtual void startRound ( + LedgerHash const& prevLCLHash, + Ledger::ref prevLedger, + NetClock::time_point closeTime, + int previousProposers, + std::chrono::milliseconds previousConvergeTime) = 0; + /** Simulate the consensus process without any network traffic. The end result, is that consensus begins and completes as if everyone diff --git a/src/ripple/app/ledger/LedgerHistory.cpp b/src/ripple/app/ledger/LedgerHistory.cpp index 6c2881bf74..01dc9138e2 100644 --- a/src/ripple/app/ledger/LedgerHistory.cpp +++ b/src/ripple/app/ledger/LedgerHistory.cpp @@ -402,13 +402,22 @@ void LedgerHistory::builtLedger (Ledger::ref ledger, Json::Value consensus) auto entry = std::make_shared(); m_consensus_validated.canonicalize(index, entry, false); - if (entry->validated && (entry->validated.get() != hash)) + if (entry->validated && ! entry->built) { - JLOG (j_.error) << "MISMATCH: seq=" << index - << " validated:" << entry->validated.get() - << " then:" << hash; - handleMismatch (hash, entry->validated.get(), consensus); + if (entry->validated.get() != hash) + { + JLOG (j_.error) << "MISMATCH: seq=" << index + << " validated:" << entry->validated.get() + << " then:" << hash; + handleMismatch (hash, entry->validated.get(), consensus); + } + else + { + // We validated a ledger and then built it locally + JLOG (j_.debug) << "MATCH: seq=" << index << " late"; + } } + entry->built.emplace (hash); entry->consensus.emplace (std::move (consensus)); } @@ -424,12 +433,20 @@ void LedgerHistory::validatedLedger (Ledger::ref ledger) auto entry = std::make_shared(); m_consensus_validated.canonicalize(index, entry, false); - if (entry->built && (entry->built.get() != hash)) + if (entry->built && ! entry->validated) { - JLOG (j_.error) << "MISMATCH: seq=" << index - << " built:" << entry->built.get() - << " then:" << hash; - handleMismatch (entry->built.get(), hash, entry->consensus.get()); + if (entry->built.get() != hash) + { + JLOG (j_.error) << "MISMATCH: seq=" << index + << " built:" << entry->built.get() + << " then:" << hash; + handleMismatch (entry->built.get(), hash, entry->consensus.get()); + } + else + { + // We built a ledger locally and then validated it + JLOG (j_.debug) << "MATCH: seq=" << index; + } } entry->validated.emplace (hash); diff --git a/src/ripple/app/ledger/impl/ConsensusImp.cpp b/src/ripple/app/ledger/impl/ConsensusImp.cpp index b231796699..48dc11d275 100644 --- a/src/ripple/app/ledger/impl/ConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/ConsensusImp.cpp @@ -64,21 +64,31 @@ ConsensusImp::getLastCloseDuration () const } std::shared_ptr -ConsensusImp::startRound ( +ConsensusImp::makeLedgerConsensus ( Application& app, InboundTransactions& inboundTransactions, - LocalTxs& localtx, LedgerMaster& ledgerMaster, + LocalTxs& localTxs) +{ + return make_LedgerConsensus (app, *this, + inboundTransactions, localTxs, ledgerMaster, *feeVote_); +} + +void +ConsensusImp::startRound ( + LedgerConsensus& consensus, LedgerHash const &prevLCLHash, Ledger::ref previousLedger, NetClock::time_point closeTime) { - return make_LedgerConsensus (app, *this, lastCloseProposers_, - lastCloseConvergeTook_, inboundTransactions, localtx, ledgerMaster, - prevLCLHash, previousLedger, closeTime, *feeVote_); + consensus.startRound ( + prevLCLHash, + previousLedger, + closeTime, + lastCloseProposers_, + lastCloseConvergeTook_); } - void ConsensusImp::setProposing (bool p, bool v) { @@ -101,12 +111,10 @@ ConsensusImp::setLastValidation (STValidation::ref v) void ConsensusImp::newLCL ( int proposers, - std::chrono::milliseconds convergeTime, - uint256 const& ledgerHash) + std::chrono::milliseconds convergeTime) { lastCloseProposers_ = proposers; lastCloseConvergeTook_ = convergeTime; - lastCloseHash_ = ledgerHash; } NetClock::time_point @@ -136,6 +144,8 @@ ConsensusImp::storeProposal ( LedgerProposal::ref proposal, RippleAddress const& peerPublic) { + std::lock_guard _(lock_); + auto& props = storedProposals_[peerPublic.getNodeID ()]; if (props.size () >= 10) @@ -148,6 +158,8 @@ ConsensusImp::storeProposal ( void ConsensusImp::takePosition (int seq, std::shared_ptr const& position) { + std::lock_guard _(lock_); + recentPositions_[position->getHash ().as_uint256()] = std::make_pair (seq, position); if (recentPositions_.size () > 4) @@ -165,10 +177,15 @@ ConsensusImp::takePosition (int seq, std::shared_ptr const& position) } } -Consensus::Proposals& -ConsensusImp::peekStoredProposals () +void +ConsensusImp::visitStoredProposals ( + std::function const& f) { - return storedProposals_; + std::lock_guard _(lock_); + + for (auto const& it : storedProposals_) + for (auto const& prop : it.second) + f(prop); } //============================================================================== diff --git a/src/ripple/app/ledger/impl/ConsensusImp.h b/src/ripple/app/ledger/impl/ConsensusImp.h index ddb886b63b..e68325b03f 100644 --- a/src/ripple/app/ledger/impl/ConsensusImp.h +++ b/src/ripple/app/ledger/impl/ConsensusImp.h @@ -53,12 +53,16 @@ public: getLastCloseDuration () const override; std::shared_ptr - startRound ( + makeLedgerConsensus ( Application& app, InboundTransactions& inboundTransactions, - LocalTxs& localtx, LedgerMaster& ledgerMaster, - LedgerHash const &prevLCLHash, + LocalTxs& localTxs) override; + + void + startRound ( + LedgerConsensus& ledgerConsensus, + LedgerHash const& prevLCLHash, Ledger::ref previousLedger, NetClock::time_point closeTime) override; @@ -82,8 +86,7 @@ public: void newLCL ( int proposers, - std::chrono::milliseconds convergeTime, - uint256 const& ledgerHash); + std::chrono::milliseconds convergeTime); NetClock::time_point validationTimestamp (NetClock::time_point vt); @@ -93,8 +96,8 @@ public: void takePosition (int seq, std::shared_ptr const& position); - Consensus::Proposals& - peekStoredProposals (); + void + visitStoredProposals (std::function const&); private: beast::Journal journal_; @@ -112,9 +115,6 @@ private: // How long the last ledger close took, in milliseconds std::chrono::milliseconds lastCloseConvergeTook_; - // The hash of the last closed ledger - uint256 lastCloseHash_; - // The timestamp of the last validation we used, in network time. This is // only used for our own validations. NetClock::time_point lastValidationTimestamp_; @@ -126,6 +126,9 @@ private: std::map>> recentPositions_; Consensus::Proposals storedProposals_; + + // lock to protect recentPositions_ and storedProposals_ + std::mutex lock_; }; } diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp index 03a77f37ef..06362a94d7 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp @@ -215,14 +215,9 @@ checkConsensus ( LedgerConsensusImp::LedgerConsensusImp ( Application& app, ConsensusImp& consensus, - int previousProposers, - std::chrono::milliseconds previousConvergeTime, InboundTransactions& inboundTransactions, LocalTxs& localtx, LedgerMaster& ledgerMaster, - LedgerHash const & prevLCLHash, - Ledger::ref previousLedger, - NetClock::time_point closeTime, FeeVote& feeVote) : app_ (app) , consensus_ (consensus) @@ -231,91 +226,30 @@ LedgerConsensusImp::LedgerConsensusImp ( , ledgerMaster_ (ledgerMaster) , m_feeVote (feeVote) , state_ (State::open) - , mCloseTime {closeTime} - , mPrevLedgerHash (prevLCLHash) - , mPreviousLedger (previousLedger) + , mCloseTime {} , mValPublic (app_.config().VALIDATION_PUB) , mValPrivate (app_.config().VALIDATION_PRIV) + , mProposing (false) + , mValidating (false) + , mHaveCorrectLCL (true) , mConsensusFail (false) , mCurrentMSeconds (0) , mClosePercent (0) + , mCloseResolution (30) , mHaveCloseTimeConsensus (false) , mConsensusStartTime (std::chrono::steady_clock::now ()) - , mPreviousProposers (previousProposers) - , mPreviousMSeconds (previousConvergeTime) + , mPreviousProposers (0) + , mPreviousMSeconds (0) , j_ (app.journal ("LedgerConsensus")) { JLOG (j_.debug) << "Creating consensus object"; - JLOG (j_.trace) - << "LCL:" << previousLedger->getHash () - << ", ct=" << closeTime.time_since_epoch().count(); - - assert (mPreviousMSeconds != 0ms); - - inboundTransactions_.newRound (mPreviousLedger->info().seq); - - // Adapt close time resolution to recent network conditions - mCloseResolution = getNextLedgerTimeResolution ( - mPreviousLedger->info().closeTimeResolution, - getCloseAgree (mPreviousLedger->info()), - mPreviousLedger->info().seq + 1); - - if (mValPublic.isSet () && mValPrivate.isSet () - && !app_.getOPs ().isNeedNetworkLedger ()) - { - // If the validation keys were set, and if we need a ledger, - // then we want to validate, and possibly propose a ledger. - JLOG (j_.info) - << "Entering consensus process, validating"; - mValidating = true; - // Propose if we are in sync with the network - mProposing = - app_.getOPs ().getOperatingMode () == NetworkOPs::omFULL; - } - else - { - // Otherwise we just want to monitor the validation process. - JLOG (j_.info) - << "Entering consensus process, watching"; - mProposing = mValidating = false; - } - - mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash); - - if (!mHaveCorrectLCL) - { - // If we were not handed the correct LCL, then set our state - // to not proposing. - consensus_.setProposing (false, false); - handleLCL (mPrevLedgerHash); - - if (!mHaveCorrectLCL) - { - // mProposing = mValidating = false; - JLOG (j_.info) - << "Entering consensus with: " - << previousLedger->getHash (); - JLOG (j_.info) - << "Correct LCL is: " << prevLCLHash; - } - } - else - // update the network status table as to whether we're - // proposing/validating - consensus_.setProposing (mProposing, mValidating); - - playbackProposals (); - if (mPeerPositions.size() > (mPreviousProposers / 2)) - { - // We may be falling behind, don't wait for the timer - // consider closing the ledger immediately - timerEntry (); - } } Json::Value LedgerConsensusImp::getJson (bool full) { Json::Value ret (Json::objectValue); + std::lock_guard _(lock_); + ret["proposing"] = mProposing; ret["validating"] = mValidating; ret["proposers"] = static_cast (mPeerPositions.size ()); @@ -339,8 +273,8 @@ Json::Value LedgerConsensusImp::getJson (bool full) ret[jss::state] = "consensus"; break; - case State::finished: - ret[jss::state] = "finished"; + case State::processing: + ret[jss::state] = "processing"; break; case State::accepted: @@ -427,6 +361,7 @@ Json::Value LedgerConsensusImp::getJson (bool full) uint256 LedgerConsensusImp::getLCL () { + std::lock_guard _(lock_); return mPrevLedgerHash; } @@ -528,6 +463,8 @@ void LedgerConsensusImp::mapComplete ( std::shared_ptr const& map, bool acquired) { + std::lock_guard _(lock_); + try { mapCompleteInternal (hash, map, acquired); @@ -584,8 +521,8 @@ void LedgerConsensusImp::checkLCL () status = "establish"; break; - case State::finished: - status = "finished"; + case State::processing: + status = "processing"; break; case State::accepted: @@ -644,6 +581,7 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash) mProposing = false; mPeerPositions.clear (); mDisputes.clear (); + mCompares.clear (); mCloseTimes.clear (); mDeadNodes.clear (); // To get back in sync: @@ -682,25 +620,25 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash) assert (!newLCL->info().open && newLCL->isImmutable ()); assert (newLCL->getHash () == lclHash); - mPreviousLedger = newLCL; - mPrevLedgerHash = lclHash; - JLOG (j_.info) << "Have the consensus ledger " << mPrevLedgerHash; - mHaveCorrectLCL = true; - - mCloseResolution = getNextLedgerTimeResolution ( - mPreviousLedger->info().closeTimeResolution, - getCloseAgree(mPreviousLedger->info()), - mPreviousLedger->info().seq + 1); + startRound ( + lclHash, + newLCL, + mCloseTime, + mPreviousProposers, + mPreviousMSeconds); + mProposing = false; } void LedgerConsensusImp::timerEntry () { + std::lock_guard _(lock_); + try { - if ((state_ != State::finished) && (state_ != State::accepted)) - checkLCL (); + if ((state_ != State::processing) && (state_ != State::accepted)) + checkLCL (); using namespace std::chrono; mCurrentMSeconds = duration_cast @@ -711,24 +649,24 @@ void LedgerConsensusImp::timerEntry () { case State::open: statePreClose (); - return; + + if (state_ != State::establish) return; + + // Fall through case State::establish: stateEstablish (); + return; - if (state_ != State::finished) return; - - // Fall through - - case State::finished: - stateFinished (); - - if (state_ != State::accepted) return; - - // Fall through + case State::processing: + // We are processing the finished ledger + // logic of calculating next ledger advances us out of this state + // nothing to do + return; case State::accepted: - stateAccepted (); + // NetworkOPs needs to setup the next round + // nothing to do return; } @@ -806,23 +744,10 @@ void LedgerConsensusImp::stateEstablish () JLOG (j_.info) << "Converge cutoff (" << mPeerPositions.size () << " participants)"; - state_ = State::finished; + state_ = State::processing; beginAccept (false); } -void LedgerConsensusImp::stateFinished () -{ - // we are processing the finished ledger - // logic of calculating next ledger advances us out of this state - // nothing to do -} - -void LedgerConsensusImp::stateAccepted () -{ - // we have accepted a new ledger - endConsensus (); -} - bool LedgerConsensusImp::haveConsensus () { // CHECKME: should possibly count unacquired TX sets as disagreeing @@ -888,6 +813,8 @@ bool LedgerConsensusImp::haveConsensus () std::shared_ptr LedgerConsensusImp::getTransactionTree ( uint256 const& hash) { + std::lock_guard _(lock_); + auto it = mAcquired.find (hash); if (it != mAcquired.end() && it->second) return it->second; @@ -902,8 +829,17 @@ std::shared_ptr LedgerConsensusImp::getTransactionTree ( bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) { + std::lock_guard _(lock_); auto const peerID = newPosition->getPeerID (); + if (newPosition->getPrevLedger() != mPrevLedgerHash) + { + JLOG (j_.debug) << "Got proposal for " + << newPosition->getPrevLedger () + << " but we are on " << mPrevLedgerHash; + return false; + } + if (mDeadNodes.find (peerID) != mDeadNodes.end ()) { JLOG (j_.info) @@ -968,28 +904,20 @@ bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) void LedgerConsensusImp::simulate () { + std::lock_guard _(lock_); + JLOG (j_.info) << "Simulating consensus"; closeLedger (); mCurrentMSeconds = 100ms; beginAccept (true); - endConsensus (); JLOG (j_.info) << "Simulation complete"; } void LedgerConsensusImp::accept (std::shared_ptr set) { - Json::Value consensusStatus; - - { - auto lock = beast::make_lock(app_.getMasterMutex()); - - // put our set where others can get it later - if (set->getHash ().isNonZero ()) - consensus_.takePosition (mPreviousLedger->info().seq, set); - - assert (set->getHash ().as_uint256() == mOurPosition->getCurrentHash ()); - consensusStatus = getJson (true); - } + // put our set where others can get it later + if (set->getHash ().isNonZero ()) + consensus_.takePosition (mPreviousLedger->info().seq, set); auto closeTime = mOurPosition->getCloseTime(); bool closeTimeCorrect; @@ -1146,7 +1074,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) << "CNF newLCL " << newLCLHash; // See if we can accept a ledger as fully-validated - ledgerMaster_.consensusBuilt (newLCL, std::move (consensusStatus)); + ledgerMaster_.consensusBuilt (newLCL, getJson (true)); { // Apply disputed transactions that didn't get in @@ -1214,9 +1142,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) }); } - mNewLedgerHash = newLCL->getHash (); ledgerMaster_.switchLCL (newLCL); - state_ = State::accepted; assert (ledgerMaster_.getClosedLedger()->getHash() == newLCL->getHash()); assert (app_.openLedger().current()->info().parentHash == newLCL->getHash()); @@ -1254,6 +1180,17 @@ void LedgerConsensusImp::accept (std::shared_ptr set) << offset.count() << " (" << closeCount << ")"; app_.timeKeeper().adjustCloseTime(offset); } + + // we have accepted a new ledger + bool correct; + { + std::lock_guard _(lock_); + + state_ = State::accepted; + correct = mHaveCorrectLCL; + } + + endConsensus (correct); } void LedgerConsensusImp::createDisputes ( @@ -1707,19 +1644,15 @@ void LedgerConsensusImp::updateOurPositions () void LedgerConsensusImp::playbackProposals () { - for (auto const& it: consensus_.peekStoredProposals ()) - { - for (auto const& proposal : it.second) + consensus_.visitStoredProposals ( + [this](LedgerProposal::ref proposal) { if (proposal->isPrevLedger (mPrevLedgerHash) && peerPosition (proposal)) { - JLOG (j_.warning) - << "We should do delayed relay of this proposal," - << " but we cannot"; + // FIXME: Should do delayed relay } - } - } + }); } void LedgerConsensusImp::closeLedger () @@ -1786,7 +1719,7 @@ void LedgerConsensusImp::beginAccept (bool synchronous) return; } - consensus_.newLCL(mPeerPositions.size(), mCurrentMSeconds, mNewLedgerHash); + consensus_.newLCL (mPeerPositions.size (), mCurrentMSeconds); if (synchronous) accept (consensusSet); @@ -1798,9 +1731,104 @@ void LedgerConsensusImp::beginAccept (bool synchronous) } } -void LedgerConsensusImp::endConsensus () +void LedgerConsensusImp::endConsensus (bool correctLCL) { - app_.getOPs ().endConsensus (mHaveCorrectLCL); + app_.getOPs ().endConsensus (correctLCL); +} + +void LedgerConsensusImp::startRound ( + LedgerHash const& prevLCLHash, + Ledger::ref prevLedger, + NetClock::time_point closeTime, + int previousProposers, + std::chrono::milliseconds previousConvergeTime) +{ + std::lock_guard _(lock_); + + if (state_ == State::processing) + { + // We can't start a new round while we're processing + return; + } + + state_ = State::open; + mCloseTime = closeTime; + mPrevLedgerHash = prevLCLHash; + mPreviousLedger = prevLedger; + mOurPosition.reset(); + mConsensusFail = false; + mCurrentMSeconds = 0ms; + mClosePercent = 0; + mHaveCloseTimeConsensus = false; + mConsensusStartTime = std::chrono::steady_clock::now(); + mPreviousProposers = previousProposers; + mPreviousMSeconds = previousConvergeTime; + inboundTransactions_.newRound (mPreviousLedger->info().seq); + + mPeerPositions.clear(); + mAcquired.clear(); + mDisputes.clear(); + mCompares.clear(); + mCloseTimes.clear(); + mDeadNodes.clear(); + + mCloseResolution = getNextLedgerTimeResolution ( + mPreviousLedger->info().closeTimeResolution, + getCloseAgree (mPreviousLedger->info()), + mPreviousLedger->info().seq + 1); + + if (mValPublic.isSet () && mValPrivate.isSet () + && !app_.getOPs ().isNeedNetworkLedger ()) + { + // If the validation keys were set, and if we need a ledger, + // then we want to validate, and possibly propose a ledger. + JLOG (j_.info) + << "Entering consensus process, validating"; + mValidating = true; + // Propose if we are in sync with the network + mProposing = + app_.getOPs ().getOperatingMode () == NetworkOPs::omFULL; + } + else + { + // Otherwise we just want to monitor the validation process. + JLOG (j_.info) + << "Entering consensus process, watching"; + mProposing = mValidating = false; + } + + mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash); + + if (!mHaveCorrectLCL) + { + // If we were not handed the correct LCL, then set our state + // to not proposing. + consensus_.setProposing (false, false); + handleLCL (mPrevLedgerHash); + + if (!mHaveCorrectLCL) + { + // mProposing = mValidating = false; + JLOG (j_.info) + << "Entering consensus with: " + << mPreviousLedger->getHash (); + JLOG (j_.info) + << "Correct LCL is: " << prevLCLHash; + } + } + else + // update the network status table as to whether we're + // proposing/validating + consensus_.setProposing (mProposing, mValidating); + + playbackProposals (); + if (mPeerPositions.size() > (mPreviousProposers / 2)) + { + // We may be falling behind, don't wait for the timer + // consider closing the ledger immediately + timerEntry (); + } + } void LedgerConsensusImp::addLoad(STValidation::ref val) @@ -1815,16 +1843,16 @@ void LedgerConsensusImp::addLoad(STValidation::ref val) //------------------------------------------------------------------------------ std::shared_ptr -make_LedgerConsensus (Application& app, ConsensusImp& consensus, int previousProposers, - std::chrono::milliseconds previousConvergeTime, +make_LedgerConsensus ( + Application& app, + ConsensusImp& consensus, InboundTransactions& inboundTransactions, - LocalTxs& localtx, LedgerMaster& ledgerMaster, - LedgerHash const &prevLCLHash, - Ledger::ref previousLedger, NetClock::time_point closeTime, FeeVote& feeVote) + LocalTxs& localtx, + LedgerMaster& ledgerMaster, + FeeVote& feeVote) { - return std::make_shared (app, consensus, previousProposers, - previousConvergeTime, inboundTransactions, localtx, ledgerMaster, - prevLCLHash, previousLedger, closeTime, feeVote); + return std::make_shared (app, consensus, + inboundTransactions, localtx, ledgerMaster, feeVote); } //------------------------------------------------------------------------------ diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.h b/src/ripple/app/ledger/impl/LedgerConsensusImp.h index ff739e01f7..f4888f42ff 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.h +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.h @@ -60,10 +60,12 @@ private: // Establishing consensus establish, - // We have closed on a transaction set - finished, + // We have closed on a transaction set and are + // processing the new ledger + processing, // We have accepted / validated a new last closed ledger + // and need to start a new round accepted, }; @@ -81,30 +83,33 @@ public: ~LedgerConsensusImp () = default; /** - @param previousProposers the number of participants in the last round - @param previousConvergeTime how long the last round took (ms) - @param inboundTransactions @param localtx transactions issued by local clients - @param inboundTransactions the set of + @param inboundTransactions set of inbound transaction sets @param localtx A set of local transactions to apply - @param prevLCLHash The hash of the Last Closed Ledger (LCL). - @param previousLedger Best guess of what the LCL was. - @param closeTime Closing time point of the LCL. @param feeVote Our desired fee levels and voting logic. */ LedgerConsensusImp ( Application& app, ConsensusImp& consensus, - int previousProposers, - std::chrono::milliseconds previousConvergeTime, InboundTransactions& inboundTransactions, LocalTxs& localtx, LedgerMaster& ledgerMaster, - LedgerHash const & prevLCLHash, - Ledger::ref previousLedger, - NetClock::time_point closeTime, FeeVote& feeVote); + /** + @param prevLCLHash The hash of the Last Closed Ledger (LCL). + @param previousLedger Best guess of what the LCL was. + @param closeTime Closing time point of the LCL. + @param previousProposers the number of participants in the last round + @param previousConvergeTime how long the last round took (ms) + */ + void startRound ( + LedgerHash const& prevLCLHash, + Ledger::ref prevLedger, + NetClock::time_point closeTime, + int previousProposers, + std::chrono::milliseconds previousConvergeTime) override; + /** Get the Json state of the consensus process. Called by the consensus_info RPC. @@ -129,45 +134,11 @@ public: std::shared_ptr const& map, bool acquired) override; - /** - Check if our last closed ledger matches the network's. - This tells us if we are still in sync with the network. - This also helps us if we enter the consensus round with - the wrong ledger, to leave it with the correct ledger so - that we can participate in the next round. - */ - void checkLCL (); - - /** - Change our view of the last closed ledger - - @param lclHash Hash of the last closed ledger. - */ - void handleLCL (uint256 const& lclHash); - /** On timer call the correct handler for each state. */ void timerEntry () override; - /** - Handle pre-close state. - */ - void statePreClose (); - - /** We are establishing a consensus - Update our position only on the timer, and in this state. - If we have consensus, move to the finish state - */ - void stateEstablish (); - - void stateFinished (); - - void stateAccepted (); - - /** Check if we've reached consensus */ - bool haveConsensus (); - std::shared_ptr getTransactionTree (uint256 const& hash); /** @@ -182,6 +153,36 @@ public: void simulate () override; private: + /** + Handle pre-close state. + */ + void statePreClose (); + + /** We are establishing a consensus + Update our position only on the timer, and in this state. + If we have consensus, move to the finish state + */ + void stateEstablish (); + + /** Check if we've reached consensus */ + bool haveConsensus (); + + /** + Check if our last closed ledger matches the network's. + This tells us if we are still in sync with the network. + This also helps us if we enter the consensus round with + the wrong ledger, to leave it with the correct ledger so + that we can participate in the next round. + */ + void checkLCL (); + + /** + Change our view of the last closed ledger + + @param lclHash Hash of the last closed ledger. + */ + void handleLCL (uint256 const& lclHash); + /** We have a complete transaction set, typically acquired from the network @@ -288,7 +289,8 @@ private: /** We have a new LCL and must accept it */ void beginAccept (bool synchronous); - void endConsensus (); + void endConsensus (bool correctLCL); + /** Add our load fee to our validation */ void addLoad(STValidation::ref val); @@ -303,10 +305,16 @@ private: LocalTxs& m_localTX; LedgerMaster& ledgerMaster_; FeeVote& m_feeVote; + std::recursive_mutex lock_; State state_; - NetClock::time_point mCloseTime; // The wall time this ledger closed - uint256 mPrevLedgerHash, mNewLedgerHash, mAcquiringLedger; + + // The wall time this ledger closed + NetClock::time_point mCloseTime; + + uint256 mPrevLedgerHash; + uint256 mAcquiringLedger; + Ledger::pointer mPreviousLedger; LedgerProposal::pointer mOurPosition; RippleAddress mValPublic, mValPrivate; @@ -349,12 +357,13 @@ private: //------------------------------------------------------------------------------ std::shared_ptr -make_LedgerConsensus (Application& app, ConsensusImp& consensus, int previousProposers, - std::chrono::milliseconds previousConvergeTime, +make_LedgerConsensus ( + Application& app, + ConsensusImp& consensus, InboundTransactions& inboundTransactions, - LocalTxs& localtx, LedgerMaster& ledgerMaster, - LedgerHash const &prevLCLHash, Ledger::ref previousLedger, - NetClock::time_point closeTime, FeeVote& feeVote); + LocalTxs& localtx, + LedgerMaster& ledgerMaster, + FeeVote& feeVote); } // ripple diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index e6403a0abb..085879ffdb 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1030,6 +1030,12 @@ void ApplicationImp::setup() *config_); add (*m_overlay); // add to PropertyStream + // start first consensus round + if (! m_networkOPs->beginConsensus(m_ledgerMaster->getClosedLedger()->info().hash)) + { + LogicError ("Unable to start consensus"); + } + m_overlay->setupValidatorKeyManifests (*config_, getWalletDB ()); { diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 10d17701af..382c6471cb 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -193,6 +193,8 @@ public: , m_heartbeatTimer (this) , m_clusterTimer (this) , mConsensus (make_Consensus (app_.config(), app_.logs())) + , mLedgerConsensus (mConsensus->makeLedgerConsensus ( + app, app.getInboundTransactions(), ledgerMaster, *m_localTX)) , m_ledgerMaster (ledgerMaster) , mLastLoadBase (256) , mLastLoadFactor (256) @@ -303,10 +305,10 @@ private: Ledger::pointer newLedger, bool duringConsensus); bool checkLastClosedLedger ( const Overlay::PeerSequence&, uint256& networkClosed); - bool beginConsensus (uint256 const& networkClosed); void tryStartConsensus (); public: + bool beginConsensus (uint256 const& networkClosed) override; void endConsensus (bool correctLCL) override; void setStandAlone () override { @@ -474,7 +476,6 @@ private: Json::Value transJson ( const STTx& stTxn, TER terResult, bool bValidated, std::shared_ptr const& lpCurrent); - bool haveConsensusObject (); void pubValidatedTransaction ( Ledger::ref alAccepted, const AcceptedLedgerTx& alTransaction); @@ -659,13 +660,10 @@ void NetworkOPsImp::processHeartbeatTimer () else if (mMode == omCONNECTED) setMode (omCONNECTED); - if (!mLedgerConsensus) - tryStartConsensus (); - - if (mLedgerConsensus) - mLedgerConsensus->timerEntry (); } + mLedgerConsensus->timerEntry (); + setHeartbeatTimer (); } @@ -1191,7 +1189,7 @@ void NetworkOPsImp::tryStartConsensus () } } - if ((!mLedgerConsensus) && (mMode != omDISCONNECTED)) + if (mMode != omDISCONNECTED) beginConsensus (networkClosed); } @@ -1428,15 +1426,10 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed) assert (closingInfo.parentHash == m_ledgerMaster.getClosedLedger ()->getHash ()); - // Create a consensus object to get consensus on this ledger - assert (!mLedgerConsensus); prevLedger->setImmutable (app_.config()); - mLedgerConsensus = mConsensus->startRound ( - app_, - app_.getInboundTransactions(), - *m_localTX, - m_ledgerMaster, + mConsensus->startRound ( + *mLedgerConsensus, networkClosed, prevLedger, closingInfo.closeTime); @@ -1445,40 +1438,8 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed) return true; } -bool NetworkOPsImp::haveConsensusObject () -{ - if (mLedgerConsensus != nullptr) - return true; - - if ((mMode == omFULL) || (mMode == omTRACKING)) - { - tryStartConsensus (); - } - else - { - // we need to get into the consensus process - uint256 networkClosed; - Overlay::PeerSequence peerList = app_.overlay ().getActivePeers (); - bool ledgerChange = checkLastClosedLedger (peerList, networkClosed); - - if (!ledgerChange) - { - m_journal.info << "Beginning consensus due to peer action"; - if ( ((mMode == omTRACKING) || (mMode == omSYNCING)) && - (mConsensus->getLastCloseProposers() >= m_ledgerMaster.getMinValidations()) ) - setMode (omFULL); - beginConsensus (networkClosed); - } - } - - return mLedgerConsensus != nullptr; -} - uint256 NetworkOPsImp::getConsensusLCL () { - if (!haveConsensusObject ()) - return uint256 (); - return mLedgerConsensus->getLCL (); } @@ -1487,33 +1448,9 @@ void NetworkOPsImp::processTrustedProposal ( std::shared_ptr set, const RippleAddress& nodePublic) { { - auto lock = beast::make_lock(app_.getMasterMutex()); + mConsensus->storeProposal (proposal, nodePublic); - bool relay = true; - - if (mConsensus) - mConsensus->storeProposal (proposal, nodePublic); - else - m_journal.warning << "Unable to store proposal"; - - if (!haveConsensusObject ()) - { - m_journal.info << "Received proposal outside consensus window"; - - if (mMode == omFULL) - relay = false; - } - else if (mLedgerConsensus->getLCL () == proposal->getPrevLedger ()) - { - relay = mLedgerConsensus->peerPosition (proposal); - m_journal.trace - << "Proposal processing finished, relay=" << relay; - } - else - m_journal.debug << "Got proposal for " << proposal->getPrevLedger () - << " but we are on " << mLedgerConsensus->getLCL(); - - if (relay) + if (mLedgerConsensus->peerPosition (proposal)) app_.overlay().relay(*set, proposal->getSuppressionID()); else m_journal.info << "Not relaying trusted proposal"; @@ -1524,10 +1461,7 @@ void NetworkOPsImp::mapComplete (uint256 const& hash, std::shared_ptr const& map) { - std::lock_guard lock(app_.getMasterMutex()); - - if (haveConsensusObject ()) - mLedgerConsensus->mapComplete (hash, map, true); + mLedgerConsensus->mapComplete (hash, map, true); } void NetworkOPsImp::endConsensus (bool correctLCL) @@ -1546,7 +1480,7 @@ void NetworkOPsImp::endConsensus (bool correctLCL) } } - mLedgerConsensus = std::shared_ptr (); + tryStartConsensus(); } void NetworkOPsImp::consensusViewChange () @@ -1952,12 +1886,7 @@ bool NetworkOPsImp::recvValidation ( Json::Value NetworkOPsImp::getConsensusInfo () { - if (mLedgerConsensus) - return mLedgerConsensus->getJson (true); - - Json::Value info = Json::objectValue; - info[jss::consensus] = "none"; - return info; + return mLedgerConsensus->getJson (true); } Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) @@ -2027,8 +1956,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) info[jss::last_close] = lastClose; - // if (mLedgerConsensus) - // info[jss::consensus] = mLedgerConsensus->getJson(); + // info[jss::consensus] = mLedgerConsensus->getJson(); if (admin) info[jss::load] = m_job_queue.getJson (); diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index 66ca279639..06e3a3835d 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -162,6 +162,7 @@ public: std::shared_ptr const& map) = 0; // network state machine + virtual bool beginConsensus (uint256 const& netLCL) = 0; virtual void endConsensus (bool correctLCL) = 0; virtual void setStandAlone () = 0; virtual void setStateTimer () = 0; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 6b2dd63b1d..a6cfb1ba8d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1917,13 +1917,7 @@ PeerImp::checkPropose (Job& job, } else { - uint256 consensusLCL; - { - std::lock_guard lock (app_.getMasterMutex()); - consensusLCL = app_.getOPs ().getConsensusLCL (); - } - - if (consensusLCL == proposal->getPrevLedger()) + if (app_.getOPs().getConsensusLCL() == proposal->getPrevLedger()) { // relay untrusted proposal p_journal_.trace <<