From 33f153fc9a1b7d09737a20250c12c28ad3e02776 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 11 Jul 2016 01:22:16 -0700 Subject: [PATCH] Consensus refactor preliminary changes (RIPD-1011): * Standardize names of LedgerConsensusImp members * Rework visitStoredProposals * Clean up mapComplete * Move status helpers out of LedgerConsensusImp * Move applyTransaction out of LedgerConsensusUmp * Clean up applyTransactions --- src/ripple/app/ledger/LedgerConsensus.h | 39 +- src/ripple/app/ledger/LedgerProposal.h | 4 - src/ripple/app/ledger/LedgerTiming.h | 75 ++ src/ripple/app/ledger/impl/ConsensusImp.cpp | 21 +- src/ripple/app/ledger/impl/ConsensusImp.h | 4 +- .../app/ledger/impl/LedgerConsensusImp.cpp | 981 +++++++----------- .../app/ledger/impl/LedgerConsensusImp.h | 72 +- src/ripple/app/ledger/impl/LedgerTiming.cpp | 133 +++ src/ripple/app/misc/NetworkOPs.cpp | 2 +- src/ripple/app/tx/apply.h | 19 + src/ripple/app/tx/impl/apply.cpp | 48 + 11 files changed, 729 insertions(+), 669 deletions(-) diff --git a/src/ripple/app/ledger/LedgerConsensus.h b/src/ripple/app/ledger/LedgerConsensus.h index 0c0da71bf2..5ae903b838 100644 --- a/src/ripple/app/ledger/LedgerConsensus.h +++ b/src/ripple/app/ledger/LedgerConsensus.h @@ -47,8 +47,8 @@ public: virtual uint256 getLCL () = 0; - virtual void mapComplete (uint256 const& hash, - std::shared_ptr const& map, bool acquired) = 0; + virtual void gotMap (uint256 const& hash, + std::shared_ptr const& map) = 0; virtual void timerEntry () = 0; @@ -74,41 +74,6 @@ public: boost::optional consensusDelay) = 0; }; -//------------------------------------------------------------------------------ -/** Apply a set of transactions to a ledger - - @param set The set of transactions to apply - @param applyLedger The ledger to which the transactions should - be applied. - @param checkLedger A reference ledger for determining error - messages (typically new last closed - ledger). - @param retriableTransactions collect failed transactions in this set - @param openLgr true if applyLedger is open, else false. -*/ -void applyTransactions ( - Application& app, - SHAMap const* set, - OpenView& view, - ReadView const& checkLedger, - CanonicalTXSet& retriableTransactions, - ApplyFlags flags); - -/** Apply a single transaction to a ledger - @param view The open view to apply to - @param txn The transaction to apply - @param retryAssured True if another pass is assured - @param flags Flags for transactor - @return resultSuccess, resultFail or resultRetry -*/ -int applyTransaction ( - Application& app, - OpenView& view, - std::shared_ptr const& txn, - bool retryAssured, - ApplyFlags flags, - beast::Journal j); - } // ripple #endif diff --git a/src/ripple/app/ledger/LedgerProposal.h b/src/ripple/app/ledger/LedgerProposal.h index f5cb774304..44665d7705 100644 --- a/src/ripple/app/ledger/LedgerProposal.h +++ b/src/ripple/app/ledger/LedgerProposal.h @@ -107,10 +107,6 @@ public: return signature_; } - bool isPrevLedger (uint256 const& pl) const - { - return mPreviousLedger == pl; - } bool isInitial () const { return mProposeSeq == seqJoin; diff --git a/src/ripple/app/ledger/LedgerTiming.h b/src/ripple/app/ledger/LedgerTiming.h index ef427144ec..88cfb1dd70 100644 --- a/src/ripple/app/ledger/LedgerTiming.h +++ b/src/ripple/app/ledger/LedgerTiming.h @@ -23,6 +23,7 @@ #include #include #include +#include namespace ripple { @@ -50,6 +51,80 @@ roundCloseTime ( NetClock::time_point closeTime, NetClock::duration closeResolution); +/** Determines whether the current ledger should close at this time. + + This function should be called when a ledger is open and there is no close + in progress, or when a transaction is received and no close is in progress. + + @param anyTransactions indicates whether any transactions have been received + @param previousProposers proposers in the last closing + @param proposersClosed proposers who have currently closed this ledger + @param proposersValidated proposers who have validated the last closed + ledger + @param previousTime time for the previous ledger to reach consensus + @param currentTime time since the previous ledger's + (possibly rounded) close time + @param openTime time waiting to close this ledger + @param idleInterval the network's desired idle interval + @param j journal for logging +*/ +bool +shouldCloseLedger ( + bool anyTransactions, + int previousProposers, + int proposersClosed, + int proposersValidated, + std::chrono::milliseconds previousTime, + std::chrono::milliseconds currentTime, // Time since last ledger's close time + std::chrono::milliseconds openTime, // Time waiting to close this ledger + std::chrono::seconds idleInterval, + beast::Journal j); + + +/** Determine if a consensus has been reached + + This function determines if a consensus has been reached + + @param agreeing count of agreements with our position + @param total count of participants other than us + @param count_self whether we count ourselves + @return True if a consensus has been reached +*/ +bool +checkConsensusReached (int agreeing, int total, bool count_self); + +/** Whether we have or don't have a consensus */ +enum class ConsensusState +{ + No, // We do not have consensus + MovedOn, // The network has consensus without us + Yes // We have consensus along with the network +}; + +/** Determine whether the network reached consensus and whether we joined. + + @param previousProposers proposers in the last closing (not including us) + @param currentProposers proposers in this closing so far (not including us) + @param currentAgree proposers who agree with us + @param currentFinished proposers who have validated a ledger after this one + @param previousAgreeTime how long, in milliseconds, it took to agree on the + last ledger + @param currentAgreeTime how long, in milliseconds, we've been trying to + agree + @param proposing whether we should count ourselves + @param j journal for logging +*/ +ConsensusState +checkConsensus ( + int previousProposers, + int currentProposers, + int currentAgree, + int currentFinished, + std::chrono::milliseconds previousAgreeTime, + std::chrono::milliseconds currentAgreeTime, + bool proposing, + beast::Journal j); + //------------------------------------------------------------------------------ // These are protocol parameters used to control the behavior of the system and diff --git a/src/ripple/app/ledger/impl/ConsensusImp.cpp b/src/ripple/app/ledger/impl/ConsensusImp.cpp index 0006edbe44..b466084dfd 100644 --- a/src/ripple/app/ledger/impl/ConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/ConsensusImp.cpp @@ -177,15 +177,22 @@ ConsensusImp::takePosition (int seq, std::shared_ptr const& position) } } -void -ConsensusImp::visitStoredProposals ( - std::function const& f) +std::vector > +ConsensusImp::getStoredProposals (uint256 const& prevLedger) { - std::lock_guard _(lock_); - for (auto const& it : storedProposals_) - for (auto const& prop : it.second) - f(prop); + std::vector > ret; + + { + std::lock_guard _(lock_); + + for (auto const& it : storedProposals_) + for (auto const& prop : it.second) + if (prop->getPrevLedger() == prevLedger) + ret.push_back (prop); + } + + return ret; } //============================================================================== diff --git a/src/ripple/app/ledger/impl/ConsensusImp.h b/src/ripple/app/ledger/impl/ConsensusImp.h index 0b15bfac6a..8f98f647ba 100644 --- a/src/ripple/app/ledger/impl/ConsensusImp.h +++ b/src/ripple/app/ledger/impl/ConsensusImp.h @@ -96,8 +96,8 @@ public: void takePosition (int seq, std::shared_ptr const& position); - void - visitStoredProposals (std::function const&); + std::vector > + getStoredProposals (uint256 const& previousLedger); private: beast::Journal journal_; diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp index b1c53ca33a..d757b579f3 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.cpp @@ -53,176 +53,6 @@ namespace ripple { -/** Determines whether the current ledger should close at this time. - - This function should be called when a ledger is open and there is no close - in progress, or when a transaction is received and no close is in progress. - - @param anyTransactions indicates whether any transactions have been received - @param previousProposers proposers in the last closing - @param proposersClosed proposers who have currently closed this ledger - @param proposersValidated proposers who have validated the last closed - ledger - @param previousMSeconds time, in milliseconds, for the previous ledger to - reach consensus (in milliseconds) - @param currentMSeconds time, in milliseconds, since the previous ledger's - (possibly rounded) close time - @param openMSeconds time, in milliseconds, waiting to close this ledger - @param idleInterval the network's desired idle interval -*/ -static -bool -shouldCloseLedger ( - bool anyTransactions, - int previousProposers, - int proposersClosed, - int proposersValidated, - std::chrono::milliseconds previousMSeconds, - std::chrono::milliseconds currentMSeconds, // Time since last ledger's close time - std::chrono::milliseconds openMSeconds, // Time waiting to close this ledger - std::chrono::seconds idleInterval, - beast::Journal j) -{ - using namespace std::chrono_literals; - if ((previousMSeconds < -1s) || (previousMSeconds > 10min) || - (currentMSeconds > 10min)) - { - // These are unexpected cases, we just close the ledger - JLOG (j.warn()) << - "shouldCloseLedger Trans=" << (anyTransactions ? "yes" : "no") << - " Prop: " << previousProposers << "/" << proposersClosed << - " Secs: " << currentMSeconds.count() << " (last: " << - previousMSeconds.count() << ")"; - return true; - } - - if ((proposersClosed + proposersValidated) > (previousProposers / 2)) - { - // If more than half of the network has closed, we close - JLOG (j.trace()) << "Others have closed"; - return true; - } - - if (!anyTransactions) - { - // Only close at the end of the idle interval - return currentMSeconds >= idleInterval; // normal idle - } - - // Preserve minimum ledger open time - if (openMSeconds < LEDGER_MIN_CLOSE) - { - JLOG (j.debug()) << - "Must wait minimum time before closing"; - return false; - } - - // Don't let this ledger close more than twice as fast as the previous - // ledger reached consensus so that slower validators can slow down - // the network - if (openMSeconds < (previousMSeconds / 2)) - { - JLOG (j.debug()) << - "Ledger has not been open long enough"; - return false; - } - - // Close the ledger - return true; -} - -bool -checkConsensusReached (int agreeing, int total, bool count_self) -{ - // If we are alone, we have a consensus - if (total == 0) - return true; - - if (count_self) - { - ++agreeing; - ++total; - } - - int currentPercentage = (agreeing * 100) / total; - - return currentPercentage > minimumConsensusPercentage; -} - -/** What state the consensus process is on. */ -enum class ConsensusState -{ - No, // We do not have consensus - MovedOn, // The network has consensus without us - Yes // We have consensus along with the network -}; - -/** Determine whether the network reached consensus and whether we joined. - - @param previousProposers proposers in the last closing (not including us) - @param currentProposers proposers in this closing so far (not including us) - @param currentAgree proposers who agree with us - @param currentFinished proposers who have validated a ledger after this one - @param previousAgreeTime how long, in milliseconds, it took to agree on the - last ledger - @param currentAgreeTime how long, in milliseconds, we've been trying to - agree -*/ -static -ConsensusState -checkConsensus ( - int previousProposers, - int currentProposers, - int currentAgree, - int currentFinished, - std::chrono::milliseconds previousAgreeTime, - std::chrono::milliseconds currentAgreeTime, - bool proposing, - beast::Journal j) -{ - JLOG (j.trace()) << - "checkConsensus: prop=" << currentProposers << - "/" << previousProposers << - " agree=" << currentAgree << " validated=" << currentFinished << - " time=" << currentAgreeTime.count() << "/" << previousAgreeTime.count(); - - if (currentAgreeTime <= LEDGER_MIN_CONSENSUS) - return ConsensusState::No; - - if (currentProposers < (previousProposers * 3 / 4)) - { - // Less than 3/4 of the last ledger's proposers are present; don't - // rush: we may need more time. - if (currentAgreeTime < (previousAgreeTime + LEDGER_MIN_CONSENSUS)) - { - JLOG (j.trace()) << - "too fast, not enough proposers"; - return ConsensusState::No; - } - } - - // Have we, together with the nodes on our UNL list, reached the threshold - // to declare consensus? - if (checkConsensusReached (currentAgree, currentProposers, proposing)) - { - JLOG (j.debug()) << "normal consensus"; - return ConsensusState::Yes; - } - - // Have sufficient nodes on our UNL list moved on and reached the threshold - // to declare consensus? - if (checkConsensusReached (currentFinished, currentProposers, false)) - { - JLOG (j.warn()) << - "We see no consensus, but 80% of nodes have moved on"; - return ConsensusState::MovedOn; - } - - // no consensus yet - JLOG (j.trace()) << "no consensus"; - return ConsensusState::No; -} - LedgerConsensusImp::LedgerConsensusImp ( Application& app, ConsensusImp& consensus, @@ -233,21 +63,21 @@ LedgerConsensusImp::LedgerConsensusImp ( : app_ (app) , consensus_ (consensus) , inboundTransactions_ (inboundTransactions) - , m_localTX (localtx) + , localTX_ (localtx) , ledgerMaster_ (ledgerMaster) - , m_feeVote (feeVote) + , feeVote_ (feeVote) , state_ (State::open) - , mCloseTime {} - , mValPublic (app_.config().VALIDATION_PUB) - , mValSecret (app_.config().VALIDATION_PRIV) - , mConsensusFail (false) - , mCurrentMSeconds (0) - , mClosePercent (0) - , mCloseResolution (30) - , mHaveCloseTimeConsensus (false) - , mConsensusStartTime (std::chrono::steady_clock::now ()) - , mPreviousProposers (0) - , mPreviousMSeconds (0) + , closeTime_ {} + , valPublic_ (app_.config().VALIDATION_PUB) + , valSecret_ (app_.config().VALIDATION_PRIV) + , consensusFail_ (false) + , roundTime_ (0) + , closePercent_ (0) + , closeResolution_ (30) + , haveCloseTimeConsensus_ (false) + , consensusStartTime_ (std::chrono::steady_clock::now ()) + , previousProposers_ (0) + , previousRoundTime_ (0) , j_ (app.journal ("LedgerConsensus")) { JLOG (j_.debug()) << "Creating consensus object"; @@ -258,15 +88,15 @@ Json::Value LedgerConsensusImp::getJson (bool full) Json::Value ret (Json::objectValue); std::lock_guard _(lock_); - ret["proposing"] = mProposing; - ret["validating"] = mValidating; - ret["proposers"] = static_cast (mPeerPositions.size ()); + ret["proposing"] = proposing_; + ret["validating"] = validating_; + ret["proposers"] = static_cast (peerPositions_.size ()); - if (mHaveCorrectLCL) + if (haveCorrectLCL_) { ret["synched"] = true; - ret["ledger_seq"] = mPreviousLedger->info().seq + 1; - ret["close_granularity"] = mCloseResolution.count(); + ret["ledger_seq"] = previousLedger_->info().seq + 1; + ret["close_granularity"] = closeResolution_.count(); } else ret["synched"] = false; @@ -290,77 +120,78 @@ Json::Value LedgerConsensusImp::getJson (bool full) break; } - int v = mDisputes.size (); + int v = disputes_.size (); if ((v != 0) && !full) ret["disputes"] = v; - if (mOurPosition) - ret["our_position"] = mOurPosition->getJson (); + if (ourPosition_) + ret["our_position"] = ourPosition_->getJson (); if (full) { using Int = Json::Value::Int; - ret["current_ms"] = static_cast(mCurrentMSeconds.count()); - ret["close_percent"] = mClosePercent; - ret["close_resolution"] = mCloseResolution.count(); - ret["have_time_consensus"] = mHaveCloseTimeConsensus; - ret["previous_proposers"] = mPreviousProposers; - ret["previous_mseconds"] = static_cast(mPreviousMSeconds.count()); + ret["current_ms"] = static_cast(roundTime_.count()); + ret["close_percent"] = closePercent_; + ret["close_resolution"] = closeResolution_.count(); + ret["have_timeconsensus_"] = haveCloseTimeConsensus_; + ret["previous_proposers"] = previousProposers_; + ret["previous_mseconds"] = + static_cast(previousRoundTime_.count()); - if (!mPeerPositions.empty ()) + if (! peerPositions_.empty ()) { Json::Value ppj (Json::objectValue); - for (auto& pp : mPeerPositions) + for (auto& pp : peerPositions_) { ppj[to_string (pp.first)] = pp.second->getJson (); } - ret["peer_positions"] = ppj; + ret["peer_positions"] = std::move(ppj); } - if (!mAcquired.empty ()) + if (! acquired_.empty ()) { // acquired Json::Value acq (Json::objectValue); - for (auto& at : mAcquired) + for (auto& at : acquired_) { if (at.second) acq[to_string (at.first)] = "acquired"; else acq[to_string (at.first)] = "failed"; } - ret["acquired"] = acq; + ret["acquired"] = std::move(acq); } - if (!mDisputes.empty ()) + if (! disputes_.empty ()) { Json::Value dsj (Json::objectValue); - for (auto& dt : mDisputes) + for (auto& dt : disputes_) { dsj[to_string (dt.first)] = dt.second->getJson (); } - ret["disputes"] = dsj; + ret["disputes"] = std::move(dsj); } - if (!mCloseTimes.empty ()) + if (! closeTimes_.empty ()) { Json::Value ctj (Json::objectValue); - for (auto& ct : mCloseTimes) + for (auto& ct : closeTimes_) { ctj[std::to_string(ct.first.time_since_epoch().count())] = ct.second; } - ret["close_times"] = ctj; + ret["close_times"] = std::move(ctj); } - if (!mDeadNodes.empty ()) + if (! deadNodes_.empty ()) { Json::Value dnj (Json::arrayValue); - for (auto const& dn : mDeadNodes) + for (auto const& dn : deadNodes_) { dnj.append (to_string (dn)); } - ret["dead_nodes"] = dnj; + ret["dead_nodes"] = std::move(dnj); } } @@ -370,14 +201,47 @@ Json::Value LedgerConsensusImp::getJson (bool full) uint256 LedgerConsensusImp::getLCL () { std::lock_guard _(lock_); - return mPrevLedgerHash; + + return prevLedgerHash_; } +// Called when: +// 1) We take our initial position +// 2) We take a new position +// 3) We acquire a position a validator took +// +// We store it, notify peers that we have it, +// and update our tracking if any validators currently +// propose it void LedgerConsensusImp::mapCompleteInternal ( uint256 const& hash, std::shared_ptr const& map, bool acquired) { + { + auto it = acquired_.find (hash); + if (it != acquired_.end ()) + { + // We have already acquired (or proven invalid) this transaction set + if (map && ! it->second) + { + JLOG (j_.warn()) << "Map " << hash << " proven invalid then acquired"; + assert (false); + } + else if (it->second && ! map) + { + JLOG (j_.warn()) << "Map " << hash << " acquired then proven invalid"; + assert (false); + return; + } + else + { + // nothing to do + return; + } + } + } + if (acquired) { JLOG (j_.trace()) << "We have acquired txs " << hash; @@ -388,27 +252,15 @@ void LedgerConsensusImp::mapCompleteInternal ( JLOG (j_.warn()) << "Tried to acquire invalid transaction map: " << hash; - mAcquired[hash] = map; + acquired_[hash] = map; return; } assert (hash == map->getHash ().as_uint256()); - auto it = mAcquired.find (hash); - - // If we have already acquired this transaction set - if (it != mAcquired.end ()) - { - if (it->second) - return; // we already have this map - - // We previously failed to acquire this map, now we have it - mAcquired.erase (hash); - } - // We now have a map that we did not have before - if (!acquired) + if (! acquired) { // Put the map where others can get it inboundTransactions_.giveSet (hash, map, false); @@ -417,27 +269,27 @@ void LedgerConsensusImp::mapCompleteInternal ( // Inform directly-connected peers that we have this transaction set sendHaveTxSet (hash, true); - if (mOurPosition && (!mOurPosition->isBowOut ()) - && (hash != mOurPosition->getCurrentHash ())) + if (ourPosition_ && (! ourPosition_->isBowOut ()) + && (hash != ourPosition_->getCurrentHash ())) { // this will create disputed transactions - auto it2 = mAcquired.find (mOurPosition->getCurrentHash ()); + auto it = acquired_.find (ourPosition_->getCurrentHash ()); - if (it2 == mAcquired.end()) + if (it == acquired_.end()) LogicError ("We cannot find our own position!"); - assert ((it2->first == mOurPosition->getCurrentHash ()) - && it2->second); - mCompares.insert(hash); + assert ((it->first == ourPosition_->getCurrentHash ()) + && it->second); + compares_.insert(hash); // Our position is not the same as the acquired position - createDisputes (it2->second, map); + createDisputes (it->second, map); } - else if (!mOurPosition) + else if (! ourPosition_) { JLOG (j_.debug()) << "Not creating disputes: no position yet."; } - else if (mOurPosition->isBowOut ()) + else if (ourPosition_->isBowOut ()) { JLOG (j_.warn()) << "Not creating disputes: not participating."; @@ -448,12 +300,12 @@ void LedgerConsensusImp::mapCompleteInternal ( << "Not creating disputes: identical position."; } - mAcquired[hash] = map; + acquired_[hash] = map; // Adjust tracking for each peer that takes this position std::vector peers; auto const mapHash = map->getHash ().as_uint256(); - for (auto& it : mPeerPositions) + for (auto& it : peerPositions_) { if (it.second->getCurrentHash () == mapHash) peers.push_back (it.second->getPeerID ()); @@ -471,16 +323,15 @@ void LedgerConsensusImp::mapCompleteInternal ( } } -void LedgerConsensusImp::mapComplete ( +void LedgerConsensusImp::gotMap ( uint256 const& hash, - std::shared_ptr const& map, - bool acquired) + std::shared_ptr const& map) { std::lock_guard _(lock_); try { - mapCompleteInternal (hash, map, acquired); + mapCompleteInternal (hash, map, true); } catch (SHAMapMissingNode const& mn) { @@ -493,14 +344,14 @@ void LedgerConsensusImp::mapComplete ( void LedgerConsensusImp::checkLCL () { - uint256 netLgr = mPrevLedgerHash; + uint256 netLgr = prevLedgerHash_; int netLgrCount = 0; - uint256 favoredLedger = mPrevLedgerHash; // Don't jump forward + uint256 favoredLedger = prevLedgerHash_; // Don't jump forward uint256 priorLedger; - if (mHaveCorrectLCL) - priorLedger = mPreviousLedger->info().parentHash; // don't jump back + if (haveCorrectLCL_) + priorLedger = previousLedger_->info().parentHash; // don't jump back // Get validators that are on our ledger, or "close" to being on // our ledger. @@ -512,14 +363,14 @@ void LedgerConsensusImp::checkLCL () for (auto& it : vals) { if ((it.second.first > netLgrCount) || - ((it.second.first == netLgrCount) && (it.first == mPrevLedgerHash))) + ((it.second.first == netLgrCount) && (it.first == prevLedgerHash_))) { netLgr = it.first; netLgrCount = it.second.first; } } - if (netLgr != mPrevLedgerHash) + if (netLgr != prevLedgerHash_) { // LCL change const char* status; @@ -550,11 +401,11 @@ void LedgerConsensusImp::checkLCL () << "View of consensus changed during " << status << " (" << netLgrCount << ") status=" << status << ", " - << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL"); - JLOG (j_.warn()) << mPrevLedgerHash + << (haveCorrectLCL_ ? "CorrectLCL" : "IncorrectLCL"); + JLOG (j_.warn()) << prevLedgerHash_ << " to " << netLgr; JLOG (j_.warn()) - << ripple::getJson (*mPreviousLedger); + << ripple::getJson (*previousLedger_); if (auto stream = j_.debug()) { @@ -564,61 +415,62 @@ void LedgerConsensusImp::checkLCL () stream << getJson (true); } - if (mHaveCorrectLCL) + if (haveCorrectLCL_) app_.getOPs ().consensusViewChange (); handleLCL (netLgr); } - else if (mPreviousLedger->info().hash != mPrevLedgerHash) + else if (previousLedger_->info().hash != prevLedgerHash_) handleLCL (netLgr); } +// Handle a change in the LCL during a consensus round void LedgerConsensusImp::handleLCL (uint256 const& lclHash) { - assert (lclHash != mPrevLedgerHash || - mPreviousLedger->info().hash != lclHash); + assert (lclHash != prevLedgerHash_ || + previousLedger_->info().hash != lclHash); - if (mPrevLedgerHash != lclHash) + if (prevLedgerHash_ != lclHash) { // first time switching to this ledger - mPrevLedgerHash = lclHash; + prevLedgerHash_ = lclHash; - if (mHaveCorrectLCL && mProposing && mOurPosition) + if (haveCorrectLCL_ && proposing_ && ourPosition_) { JLOG (j_.info()) << "Bowing out of consensus"; - mOurPosition->bowOut (); + ourPosition_->bowOut (); propose (); } // Stop proposing because we are out of sync - mProposing = false; - mPeerPositions.clear (); - mDisputes.clear (); - mCompares.clear (); - mCloseTimes.clear (); - mDeadNodes.clear (); + proposing_ = false; + peerPositions_.clear (); + disputes_.clear (); + compares_.clear (); + closeTimes_.clear (); + deadNodes_.clear (); // To get back in sync: playbackProposals (); } - if (mPreviousLedger->info().hash == mPrevLedgerHash) + if (previousLedger_->info().hash == prevLedgerHash_) return; // we need to switch the ledger we're working from - auto buildLCL = ledgerMaster_.getLedgerByHash (mPrevLedgerHash); + auto buildLCL = ledgerMaster_.getLedgerByHash (prevLedgerHash_); if (! buildLCL) { - if (mAcquiringLedger != lclHash) + if (acquiringLedger_ != lclHash) { // need to start acquiring the correct consensus LCL JLOG (j_.warn()) << - "Need consensus ledger " << mPrevLedgerHash; + "Need consensus ledger " << prevLedgerHash_; // Tell the ledger acquire system that we need the consensus ledger - mAcquiringLedger = mPrevLedgerHash; + acquiringLedger_ = prevLedgerHash_; auto app = &app_; - auto hash = mAcquiringLedger; + auto hash = acquiringLedger_; app_.getJobQueue().addJob ( jtADVANCE, "getConsensusLedger", [app, hash] (Job&) { @@ -626,7 +478,7 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash) hash, 0, InboundLedger::fcCONSENSUS); }); - mHaveCorrectLCL = false; + haveCorrectLCL_ = false; } return; } @@ -634,14 +486,14 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash) assert (!buildLCL->open() && buildLCL->isImmutable ()); assert (buildLCL->info().hash == lclHash); JLOG (j_.info()) << - "Have the consensus ledger " << mPrevLedgerHash; + "Have the consensus ledger " << prevLedgerHash_; startRound ( lclHash, buildLCL, - mCloseTime, - mPreviousProposers, - mPreviousMSeconds); - mProposing = false; + closeTime_, + previousProposers_, + previousRoundTime_); + proposing_ = false; } void LedgerConsensusImp::timerEntry () @@ -654,12 +506,12 @@ void LedgerConsensusImp::timerEntry () checkLCL (); using namespace std::chrono; - mCurrentMSeconds = duration_cast - (steady_clock::now() - mConsensusStartTime); + roundTime_ = duration_cast + (steady_clock::now() - consensusStartTime_); - mClosePercent = mCurrentMSeconds * 100 / + closePercent_ = roundTime_ * 100 / std::max ( - mPreviousMSeconds, AV_MIN_CONSENSUS_TIME); + previousRoundTime_, AV_MIN_CONSENSUS_TIME); switch (state_) { @@ -701,22 +553,22 @@ void LedgerConsensusImp::statePreClose () { // it is shortly before ledger close time bool anyTransactions = ! app_.openLedger().empty(); - int proposersClosed = mPeerPositions.size (); + int proposersClosed = peerPositions_.size (); int proposersValidated = app_.getValidations ().getTrustedValidationCount - (mPrevLedgerHash); + (prevLedgerHash_); // This computes how long since last ledger's close time using namespace std::chrono; milliseconds sinceClose; { - bool previousCloseCorrect = mHaveCorrectLCL - && getCloseAgree (mPreviousLedger->info()) - && (mPreviousLedger->info().closeTime != - (mPreviousLedger->info().parentCloseTime + 1s)); + bool previousCloseCorrect = haveCorrectLCL_ + && getCloseAgree (previousLedger_->info()) + && (previousLedger_->info().closeTime != + (previousLedger_->info().parentCloseTime + 1s)); auto closeTime = previousCloseCorrect - ? mPreviousLedger->info().closeTime // use consensus timing + ? previousLedger_->info().closeTime // use consensus timing : consensus_.getLastCloseTime(); // use the time we saw auto now = app_.timeKeeper().closeTime(); @@ -727,12 +579,12 @@ void LedgerConsensusImp::statePreClose () } auto const idleInterval = std::max(LEDGER_IDLE_INTERVAL, - 2 * mPreviousLedger->info().closeTimeResolution); + 2 * previousLedger_->info().closeTimeResolution); // Decide if we should close the ledger if (shouldCloseLedger (anyTransactions - , mPreviousProposers, proposersClosed, proposersValidated - , mPreviousMSeconds, sinceClose, mCurrentMSeconds + , previousProposers_, proposersClosed, proposersValidated + , previousRoundTime_, sinceClose, roundTime_ , idleInterval, app_.journal ("LedgerTiming"))) { closeLedger (); @@ -742,7 +594,7 @@ void LedgerConsensusImp::statePreClose () void LedgerConsensusImp::stateEstablish () { // Give everyone a chance to take an initial position - if (mCurrentMSeconds < LEDGER_MIN_CONSENSUS) + if (roundTime_ < LEDGER_MIN_CONSENSUS) return; updateOurPositions (); @@ -751,7 +603,7 @@ void LedgerConsensusImp::stateEstablish () if (!haveConsensus ()) return; - if (!mHaveCloseTimeConsensus) + if (!haveCloseTimeConsensus_) { JLOG (j_.info()) << "We have TX consensus but not CT consensus"; @@ -759,7 +611,7 @@ void LedgerConsensusImp::stateEstablish () } JLOG (j_.info()) << - "Converge cutoff (" << mPeerPositions.size () << " participants)"; + "Converge cutoff (" << peerPositions_.size () << " participants)"; state_ = State::processing; beginAccept (false); } @@ -768,10 +620,10 @@ bool LedgerConsensusImp::haveConsensus () { // CHECKME: should possibly count unacquired TX sets as disagreeing int agree = 0, disagree = 0; - uint256 ourPosition = mOurPosition->getCurrentHash (); + uint256 ourPosition = ourPosition_->getCurrentHash (); // Count number of agreements/disagreements with our position - for (auto& it : mPeerPositions) + for (auto& it : peerPositions_) { if (!it.second->isBowOut ()) { @@ -784,17 +636,17 @@ bool LedgerConsensusImp::haveConsensus () JLOG (j_.debug()) << to_string (it.first) << " has " << to_string (it.second->getCurrentHash ()); ++disagree; - if (mCompares.count(it.second->getCurrentHash()) == 0) + if (compares_.count(it.second->getCurrentHash()) == 0) { // Make sure we have generated disputes uint256 hash = it.second->getCurrentHash(); JLOG (j_.debug()) << "We have not compared to " << hash; - auto it1 = mAcquired.find (hash); - auto it2 = mAcquired.find(mOurPosition->getCurrentHash ()); - if ((it1 != mAcquired.end()) && (it2 != mAcquired.end()) + auto it1 = acquired_.find (hash); + auto it2 = acquired_.find(ourPosition_->getCurrentHash ()); + if ((it1 != acquired_.end()) && (it2 != acquired_.end()) && (it1->second) && (it2->second)) { - mCompares.insert(hash); + compares_.insert(hash); createDisputes(it2->second, it1->second); } } @@ -802,15 +654,15 @@ bool LedgerConsensusImp::haveConsensus () } } int currentValidations = app_.getValidations () - .getNodesAfter (mPrevLedgerHash); + .getNodesAfter (prevLedgerHash_); JLOG (j_.debug()) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree; // Determine if we actually have consensus or not - auto ret = checkConsensus (mPreviousProposers, agree + disagree, agree, - currentValidations, mPreviousMSeconds, mCurrentMSeconds, mProposing, + auto ret = checkConsensus (previousProposers_, agree + disagree, agree, + currentValidations, previousRoundTime_, roundTime_, proposing_, app_.journal ("LedgerTiming")); if (ret == ConsensusState::No) @@ -818,10 +670,7 @@ bool LedgerConsensusImp::haveConsensus () // There is consensus, but we need to track if the network moved on // without us. - if (ret == ConsensusState::MovedOn) - mConsensusFail = true; - else - mConsensusFail = false; + consensusFail_ = (ret == ConsensusState::MovedOn); return true; } @@ -831,14 +680,14 @@ std::shared_ptr LedgerConsensusImp::getTransactionTree ( { std::lock_guard _(lock_); - auto it = mAcquired.find (hash); - if (it != mAcquired.end() && it->second) + auto it = acquired_.find (hash); + if (it != acquired_.end() && it->second) return it->second; auto set = inboundTransactions_.getSet (hash, true); if (set) - mAcquired[hash] = set; + acquired_[hash] = set; return set; } @@ -846,24 +695,25 @@ std::shared_ptr LedgerConsensusImp::getTransactionTree ( bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) { std::lock_guard _(lock_); + auto const peerID = newPosition->getPeerID (); - if (newPosition->getPrevLedger() != mPrevLedgerHash) + if (newPosition->getPrevLedger() != prevLedgerHash_) { JLOG (j_.debug()) << "Got proposal for " << newPosition->getPrevLedger () - << " but we are on " << mPrevLedgerHash; + << " but we are on " << prevLedgerHash_; return false; } - if (mDeadNodes.find (peerID) != mDeadNodes.end ()) + if (deadNodes_.find (peerID) != deadNodes_.end ()) { JLOG (j_.info()) << "Position from dead node: " << to_string (peerID); return false; } - LedgerProposal::pointer& currentPosition = mPeerPositions[peerID]; + LedgerProposal::pointer& currentPosition = peerPositions_[peerID]; if (currentPosition) { @@ -880,10 +730,10 @@ bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) { JLOG (j_.info()) << "Peer bows out: " << to_string (peerID); - for (auto& it : mDisputes) + for (auto& it : disputes_) it.second->unVote (peerID); - mPeerPositions.erase (peerID); - mDeadNodes.insert (peerID); + peerPositions_.erase (peerID); + deadNodes_.insert (peerID); return true; } @@ -893,7 +743,7 @@ bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) JLOG (j_.trace()) << "Peer reports close time as " << newPosition->getCloseTime().time_since_epoch().count(); - ++mCloseTimes[newPosition->getCloseTime()]; + ++closeTimes_[newPosition->getCloseTime()]; } JLOG (j_.trace()) << "Processing peer proposal " @@ -906,7 +756,7 @@ bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) if (set) { - for (auto& it : mDisputes) + for (auto& it : disputes_) it.second->setVote (peerID, set->hasItem (it.first)); } else @@ -925,7 +775,7 @@ void LedgerConsensusImp::simulate ( JLOG (j_.info()) << "Simulating consensus"; closeLedger (); - mCurrentMSeconds = consensusDelay.value_or(100ms); + roundTime_ = consensusDelay.value_or(100ms); beginAccept (true); JLOG (j_.info()) << "Simulation complete"; } @@ -934,9 +784,9 @@ void LedgerConsensusImp::accept (std::shared_ptr set) { // put our set where others can get it later if (set->getHash ().isNonZero ()) - consensus_.takePosition (mPreviousLedger->info().seq, set); + consensus_.takePosition (previousLedger_->info().seq, set); - auto closeTime = mOurPosition->getCloseTime(); + auto closeTime = ourPosition_->getCloseTime(); bool closeTimeCorrect; auto replay = ledgerMaster_.releaseReplay(); @@ -949,7 +799,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) else if (closeTime == NetClock::time_point{}) { // We agreed to disagree on the close time - closeTime = mPreviousLedger->info().closeTime + 1s; + closeTime = previousLedger_->info().closeTime + 1s; closeTimeCorrect = false; } else @@ -960,13 +810,13 @@ void LedgerConsensusImp::accept (std::shared_ptr set) } JLOG (j_.debug()) - << "Report: Prop=" << (mProposing ? "yes" : "no") - << " val=" << (mValidating ? "yes" : "no") - << " corLCL=" << (mHaveCorrectLCL ? "yes" : "no") - << " fail=" << (mConsensusFail ? "yes" : "no"); + << "Report: Prop=" << (proposing_ ? "yes" : "no") + << " val=" << (validating_ ? "yes" : "no") + << " corLCL=" << (haveCorrectLCL_ ? "yes" : "no") + << " fail=" << (consensusFail_ ? "yes" : "no"); JLOG (j_.debug()) - << "Report: Prev = " << mPrevLedgerHash - << ":" << mPreviousLedger->info().seq; + << "Report: Prev = " << prevLedgerHash_ + << ":" << previousLedger_->info().seq; JLOG (j_.debug()) << "Report: TxSt = " << set->getHash () << ", close " << closeTime.time_since_epoch().count() @@ -979,7 +829,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) { // Build the new last closed ledger auto buildLCL = std::make_shared( - *mPreviousLedger, + *previousLedger_, app_.timeKeeper().closeTime()); auto const v2_enabled = buildLCL->rules().enabled(featureSHAMapV2, app_.config().features); @@ -1003,17 +853,20 @@ void LedgerConsensusImp::accept (std::shared_ptr set) { // Special case, we are replaying a ledger close for (auto& tx : replay->txns_) - applyTransaction (app_, accum, tx.second, false, tapNO_CHECK_SIGN, j_); + applyTransaction (app_, accum, *tx.second, false, tapNO_CHECK_SIGN, j_); } else { // Normal case, we are not replaying a ledger close - applyTransactions (app_, set.get(), accum, - *buildLCL, retriableTxs, tapNONE); + retriableTxs = applyTransactions (app_, *set, accum, + [&buildLCL](uint256 const& txID) + { + return ! buildLCL->txExists(txID); + }); } // Update fee computations. app_.getTxQ().processValidatedLedger(app_, accum, - mCurrentMSeconds > 5s); + roundTime_ > 5s); accum.apply(*buildLCL); } @@ -1039,7 +892,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) buildLCL->unshare(); // Accept ledger - buildLCL->setAccepted(closeTime, mCloseResolution, + buildLCL->setAccepted(closeTime, closeResolution_, closeTimeCorrect, app_.config()); // And stash the ledger in the ledger master @@ -1062,20 +915,20 @@ void LedgerConsensusImp::accept (std::shared_ptr set) // Tell directly connected peers that we have a new LCL statusChange (protocol::neACCEPTED_LEDGER, *sharedLCL); - if (mValidating && + if (validating_ && ! ledgerMaster_.isCompatible (*sharedLCL, app_.journal("LedgerConsensus").warn(), "Not validating")) { - mValidating = false; + validating_ = false; } - if (mValidating && !mConsensusFail) + if (validating_ && ! consensusFail_) { // Build validation auto v = std::make_shared (newLCLHash, consensus_.validationTimestamp(app_.timeKeeper().now()), - mValPublic, mProposing); + valPublic_, proposing_); v->setFieldU32 (sfLedgerSequence, sharedLCL->info().seq); addLoad(v); // Our network load @@ -1083,11 +936,11 @@ void LedgerConsensusImp::accept (std::shared_ptr set) // next ledger is flag ledger { // Suggest fee changes and new features - m_feeVote.doValidation (sharedLCL, *v); + feeVote_.doValidation (sharedLCL, *v); app_.getAmendmentTable ().doValidation (sharedLCL, *v); } - auto const signingHash = v->sign (mValSecret); + auto const signingHash = v->sign (valSecret_); v->setTrusted (); // suppress it if we receive it - FIXME: wrong suppression app_.getHashRouter ().addSuppression (signingHash); @@ -1121,7 +974,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) // in the previous consensus round. // bool anyDisputes = false; - for (auto& it : mDisputes) + for (auto& it : disputes_) { if (!it.second->getOurVote ()) { @@ -1154,11 +1007,10 @@ void LedgerConsensusImp::accept (std::shared_ptr set) ledgerMaster_.peekMutex (), std::defer_lock); std::lock(lock, sl); - auto const localTx = m_localTX.getTxSet(); + auto const localTx = localTX_.getTxSet(); auto const oldOL = ledgerMaster_.getCurrentLedger(); - auto const lastVal = - app_.getLedgerMaster().getValidatedLedger(); + auto const lastVal = ledgerMaster_.getValidatedLedger(); boost::optional rules; if (lastVal) rules.emplace(*lastVal); @@ -1179,17 +1031,17 @@ void LedgerConsensusImp::accept (std::shared_ptr set) assert (ledgerMaster_.getClosedLedger()->info().hash == sharedLCL->info().hash); assert (app_.openLedger().current()->info().parentHash == sharedLCL->info().hash); - if (mValidating) + if (validating_) { // see how close our close time is to other node's // close time reports, and update our clock. JLOG (j_.info()) - << "We closed at " << mCloseTime.time_since_epoch().count(); + << "We closed at " << closeTime_.time_since_epoch().count(); using usec64_t = std::chrono::duration; - usec64_t closeTotal = mCloseTime.time_since_epoch(); + usec64_t closeTotal = closeTime_.time_since_epoch(); int closeCount = 1; - for (auto const& p : mCloseTimes) + for (auto const& p : closeTimes_) { // FIXME: Use median, not average JLOG (j_.info()) @@ -1206,7 +1058,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) using duration = std::chrono::duration; using time_point = std::chrono::time_point; auto offset = time_point{closeTotal} - - std::chrono::time_point_cast(mCloseTime); + std::chrono::time_point_cast(closeTime_); JLOG (j_.info()) << "Our close offset is estimated at " << offset.count() << " (" << closeCount << ")"; @@ -1219,7 +1071,7 @@ void LedgerConsensusImp::accept (std::shared_ptr set) std::lock_guard _(lock_); state_ = State::accepted; - correct = mHaveCorrectLCL; + correct = haveCorrectLCL_; } endConsensus (correct); @@ -1267,7 +1119,7 @@ void LedgerConsensusImp::addDisputedTransaction ( uint256 const& txID, Blob const& tx) { - if (mDisputes.find (txID) != mDisputes.end ()) + if (disputes_.find (txID) != disputes_.end ()) return; JLOG (j_.debug()) << "Transaction " @@ -1276,25 +1128,25 @@ void LedgerConsensusImp::addDisputedTransaction ( bool ourVote = false; // Update our vote on the disputed transaction - if (mOurPosition) + if (ourPosition_) { - auto mit (mAcquired.find (mOurPosition->getCurrentHash ())); + auto mit (acquired_.find (ourPosition_->getCurrentHash ())); - if (mit != mAcquired.end ()) + if (mit != acquired_.end ()) ourVote = mit->second->hasItem (txID); else assert (false); // We don't have our own position? } auto txn = std::make_shared (txID, tx, ourVote, j_); - mDisputes[txID] = txn; + disputes_[txID] = txn; // Update all of the peer's votes on the disputed transaction - for (auto& pit : mPeerPositions) + for (auto& pit : peerPositions_) { - auto cit (mAcquired.find (pit.second->getCurrentHash ())); + auto cit (acquired_.find (pit.second->getCurrentHash ())); - if ((cit != mAcquired.end ()) && cit->second) + if ((cit != acquired_.end ()) && cit->second) { txn->setVote (pit.first, cit->second->hasItem (txID)); } @@ -1317,7 +1169,7 @@ void LedgerConsensusImp::addDisputedTransaction ( void LedgerConsensusImp::adjustCount (std::shared_ptr const& map, const std::vector& peers) { - for (auto& it : mDisputes) + for (auto& it : disputes_) { bool setHas = map->hasItem (it.second->getTransactionID ()); for (auto const& pit : peers) @@ -1327,41 +1179,41 @@ void LedgerConsensusImp::adjustCount (std::shared_ptr const& map, void LedgerConsensusImp::leaveConsensus () { - if (mProposing) + if (proposing_) { - if (mOurPosition && ! mOurPosition->isBowOut ()) + if (ourPosition_ && ! ourPosition_->isBowOut ()) { - mOurPosition->bowOut(); + ourPosition_->bowOut(); propose(); } - mProposing = false; + proposing_ = false; } } void LedgerConsensusImp::propose () { JLOG (j_.trace()) << "We propose: " << - (mOurPosition->isBowOut () + (ourPosition_->isBowOut () ? std::string ("bowOut") - : to_string (mOurPosition->getCurrentHash ())); + : to_string (ourPosition_->getCurrentHash ())); protocol::TMProposeSet prop; - prop.set_currenttxhash (mOurPosition->getCurrentHash ().begin () + prop.set_currenttxhash (ourPosition_->getCurrentHash ().begin () , 256 / 8); - prop.set_previousledger (mOurPosition->getPrevLedger ().begin () + prop.set_previousledger (ourPosition_->getPrevLedger ().begin () , 256 / 8); - prop.set_proposeseq (mOurPosition->getProposeSeq ()); - prop.set_closetime(mOurPosition->getCloseTime().time_since_epoch().count()); + prop.set_proposeseq (ourPosition_->getProposeSeq ()); + prop.set_closetime(ourPosition_->getCloseTime().time_since_epoch().count()); - prop.set_nodepubkey (mValPublic.data(), mValPublic.size()); + prop.set_nodepubkey (valPublic_.data(), valPublic_.size()); - mOurPosition->setSignature ( + ourPosition_->setSignature ( signDigest ( - mValPublic, - mValSecret, - mOurPosition->getSigningHash())); + valPublic_, + valSecret_, + ourPosition_->getSigningHash())); - auto sig = mOurPosition->getSignature(); + auto sig = ourPosition_->getSignature(); prop.set_signature (sig.data(), sig.size()); app_.overlay().send(prop); @@ -1382,7 +1234,7 @@ void LedgerConsensusImp::statusChange ( { protocol::TMStatusChange s; - if (!mHaveCorrectLCL) + if (!haveCorrectLCL_) s.set_newevent (protocol::neLOST_SYNC); else s.set_newevent (event); @@ -1395,7 +1247,7 @@ void LedgerConsensusImp::statusChange ( std::decay_t::bytes); std::uint32_t uMin, uMax; - if (!ledgerMaster_.getFullValidatedRange (uMin, uMax)) + if (! ledgerMaster_.getFullValidatedRange (uMin, uMax)) { uMin = 0; uMax = 0; @@ -1431,13 +1283,13 @@ void LedgerConsensusImp::takeInitialPosition ( SHAMapItem (tx.first->getTransactionID(), std::move (s)), true, false); } - if ((app_.config().standalone() || (mProposing && mHaveCorrectLCL)) - && ((mPreviousLedger->info().seq % 256) == 0)) + if ((app_.config().standalone() || (proposing_ && haveCorrectLCL_)) + && ((previousLedger_->info().seq % 256) == 0)) { // previous ledger was flag ledger, add pseudo-transactions auto const validations = app_.getValidations().getValidations ( - mPreviousLedger->info().parentHash); + previousLedger_->info().parentHash); auto const count = std::count_if ( validations.begin(), validations.end(), @@ -1448,12 +1300,12 @@ void LedgerConsensusImp::takeInitialPosition ( if (count >= ledgerMaster_.getMinValidations()) { - m_feeVote.doVoting ( - mPreviousLedger, + feeVote_.doVoting ( + previousLedger_, validations, initialSet); app_.getAmendmentTable ().doVoting ( - mPreviousLedger, + previousLedger_, validations, initialSet); } @@ -1463,16 +1315,16 @@ void LedgerConsensusImp::takeInitialPosition ( initialSet = initialSet->snapShot (false); // Tell the ledger master not to acquire the ledger we're probably building - ledgerMaster_.setBuildingLedger (mPreviousLedger->info().seq + 1); + ledgerMaster_.setBuildingLedger (previousLedger_->info().seq + 1); auto txSet = initialSet->getHash ().as_uint256(); JLOG (j_.info()) << "initial position " << txSet; mapCompleteInternal (txSet, initialSet, false); - mOurPosition = std::make_shared ( - initialLedger->info().parentHash, txSet, mCloseTime); + ourPosition_ = std::make_shared ( + initialLedger->info().parentHash, txSet, closeTime_); - for (auto& it : mDisputes) + for (auto& it : disputes_) { it.second->setOurVote (initialLedger->txExists (it.first)); } @@ -1480,23 +1332,23 @@ void LedgerConsensusImp::takeInitialPosition ( // if any peers have taken a contrary position, process disputes hash_set found; - for (auto& it : mPeerPositions) + for (auto& it : peerPositions_) { - uint256 set = it.second->getCurrentHash (); + uint256 const& set = it.second->getCurrentHash (); if (found.insert (set).second) { - auto iit (mAcquired.find (set)); + auto iit (acquired_.find (set)); - if (iit != mAcquired.end ()) + if (iit != acquired_.end ()) { - mCompares.insert(iit->second->getHash().as_uint256()); + compares_.insert(iit->second->getHash().as_uint256()); createDisputes (initialSet, iit->second); } } } - if (mProposing) + if (proposing_) propose (); } @@ -1528,8 +1380,8 @@ LedgerConsensusImp::effectiveCloseTime(NetClock::time_point closeTime) return closeTime; return std::max( - roundCloseTime (closeTime, mCloseResolution), - (mPreviousLedger->info().closeTime + 1s)); + roundCloseTime (closeTime, closeResolution_), + (previousLedger_->info().closeTime + 1s)); } void LedgerConsensusImp::updateOurPositions () @@ -1547,38 +1399,39 @@ void LedgerConsensusImp::updateOurPositions () // Verify freshness of peer positions and compute close times std::map closeTimes; - auto it = mPeerPositions.begin (); - - while (it != mPeerPositions.end ()) { - if (it->second->isStale (peerCutoff)) + auto it = peerPositions_.begin (); + while (it != peerPositions_.end ()) { - // peer's proposal is stale, so remove it - auto const& peerID = it->second->getPeerID (); - JLOG (j_.warn()) - << "Removing stale proposal from " << peerID; - for (auto& dt : mDisputes) - dt.second->unVote (peerID); - it = mPeerPositions.erase (it); - } - else - { - // proposal is still fresh - ++closeTimes[effectiveCloseTime(it->second->getCloseTime())]; - ++it; + if (it->second->isStale (peerCutoff)) + { + // peer's proposal is stale, so remove it + auto const& peerID = it->second->getPeerID (); + JLOG (j_.warn()) + << "Removing stale proposal from " << peerID; + for (auto& dt : disputes_) + dt.second->unVote (peerID); + it = peerPositions_.erase (it); + } + else + { + // proposal is still fresh + ++closeTimes[effectiveCloseTime(it->second->getCloseTime())]; + ++it; + } } } // Update votes on disputed transactions - for (auto& it : mDisputes) + for (auto& it : disputes_) { // Because the threshold for inclusion increases, // time can change our position on a dispute - if (it.second->updateVote (mClosePercent, mProposing)) + if (it.second->updateVote (closePercent_, proposing_)) { if (!changes) { - ourPosition = mAcquired[mOurPosition->getCurrentHash ()] + ourPosition = acquired_[ourPosition_->getCurrentHash ()] ->snapShot (true); assert (ourPosition); changes = true; @@ -1600,30 +1453,30 @@ void LedgerConsensusImp::updateOurPositions () int neededWeight; - if (mClosePercent < AV_MID_CONSENSUS_TIME) + if (closePercent_ < AV_MID_CONSENSUS_TIME) neededWeight = AV_INIT_CONSENSUS_PCT; - else if (mClosePercent < AV_LATE_CONSENSUS_TIME) + else if (closePercent_ < AV_LATE_CONSENSUS_TIME) neededWeight = AV_MID_CONSENSUS_PCT; - else if (mClosePercent < AV_STUCK_CONSENSUS_TIME) + else if (closePercent_ < AV_STUCK_CONSENSUS_TIME) neededWeight = AV_LATE_CONSENSUS_PCT; else neededWeight = AV_STUCK_CONSENSUS_PCT; NetClock::time_point closeTime = {}; - mHaveCloseTimeConsensus = false; + haveCloseTimeConsensus_ = false; - if (mPeerPositions.empty ()) + if (peerPositions_.empty ()) { // no other times - mHaveCloseTimeConsensus = true; - closeTime = effectiveCloseTime(mOurPosition->getCloseTime()); + haveCloseTimeConsensus_ = true; + closeTime = effectiveCloseTime(ourPosition_->getCloseTime()); } else { - int participants = mPeerPositions.size (); - if (mProposing) + int participants = peerPositions_.size (); + if (proposing_) { - ++closeTimes[effectiveCloseTime(mOurPosition->getCloseTime())]; + ++closeTimes[effectiveCloseTime(ourPosition_->getCloseTime())]; ++participants; } @@ -1636,13 +1489,13 @@ void LedgerConsensusImp::updateOurPositions () participants, AV_CT_CONSENSUS_PCT); JLOG (j_.info()) << "Proposers:" - << mPeerPositions.size () << " nw:" << neededWeight + << peerPositions_.size () << " nw:" << neededWeight << " thrV:" << threshVote << " thrC:" << threshConsensus; for (auto const& it : closeTimes) { JLOG (j_.debug()) << "CCTime: seq " - << mPreviousLedger->info().seq + 1 << ": " + << previousLedger_->info().seq + 1 << ": " << it.first.time_since_epoch().count() << " has " << it.second << ", " << threshVote << " required"; @@ -1654,15 +1507,15 @@ void LedgerConsensusImp::updateOurPositions () threshVote = it.second; if (threshVote >= threshConsensus) - mHaveCloseTimeConsensus = true; + haveCloseTimeConsensus_ = true; } } - if (!mHaveCloseTimeConsensus) + if (!haveCloseTimeConsensus_) { JLOG (j_.debug()) << "No CT consensus:" - << " Proposers:" << mPeerPositions.size () - << " Proposing:" << (mProposing ? "yes" : "no") + << " Proposers:" << peerPositions_.size () + << " Proposing:" << (proposing_ ? "yes" : "no") << " Thresh:" << threshConsensus << " Pos:" << closeTime.time_since_epoch().count(); } @@ -1673,11 +1526,11 @@ void LedgerConsensusImp::updateOurPositions () // to the full network, this can be relaxed to force a change // only if the rounded close time has changed. if (!changes && - ((closeTime != mOurPosition->getCloseTime()) - || mOurPosition->isStale (ourCutoff))) + ((closeTime != ourPosition_->getCloseTime()) + || ourPosition_->isStale (ourCutoff))) { // close time changed or our position is stale - ourPosition = mAcquired[mOurPosition->getCurrentHash ()] + ourPosition = acquired_[ourPosition_->getCurrentHash ()] ->snapShot (true); assert (ourPosition); changes = true; // We pretend our position changed to force @@ -1693,9 +1546,9 @@ void LedgerConsensusImp::updateOurPositions () << closeTime.time_since_epoch().count() << ", tx " << newHash; - if (mOurPosition->changePosition(newHash, closeTime)) + if (ourPosition_->changePosition(newHash, closeTime)) { - if (mProposing) + if (proposing_) propose (); mapCompleteInternal (newHash, ourPosition, false); @@ -1705,46 +1558,46 @@ void LedgerConsensusImp::updateOurPositions () void LedgerConsensusImp::playbackProposals () { - consensus_.visitStoredProposals ( - [this](LedgerProposal::ref proposal) + auto proposals = consensus_.getStoredProposals (prevLedgerHash_); + + for (auto& proposal : proposals) + { + if (peerPosition (proposal)) { - if (proposal->isPrevLedger (mPrevLedgerHash) && - peerPosition (proposal)) - { - // Now that we know this proposal - // is useful, relay it - protocol::TMProposeSet prop; + // Now that we know this proposal + // is useful, relay it + protocol::TMProposeSet prop; - prop.set_proposeseq ( - proposal->getProposeSeq ()); - prop.set_closetime ( - proposal->getCloseTime ().time_since_epoch().count()); + prop.set_proposeseq ( + proposal->getProposeSeq ()); + prop.set_closetime ( + proposal->getCloseTime ().time_since_epoch().count()); - prop.set_currenttxhash ( - proposal->getCurrentHash().begin(), 256 / 8); - prop.set_previousledger ( - proposal->getPrevLedger().begin(), 256 / 8); + prop.set_currenttxhash ( + proposal->getCurrentHash().begin(), 256 / 8); + prop.set_previousledger ( + proposal->getPrevLedger().begin(), 256 / 8); - auto const pk = proposal->getPublicKey().slice(); - prop.set_nodepubkey (pk.data(), pk.size()); + auto const pk = proposal->getPublicKey().slice(); + prop.set_nodepubkey (pk.data(), pk.size()); - auto const sig = proposal->getSignature(); - prop.set_signature (sig.data(), sig.size()); + auto const sig = proposal->getSignature(); + prop.set_signature (sig.data(), sig.size()); - app_.overlay().relay ( - prop, proposal->getSuppressionID ()); - } - }); + app_.overlay().relay ( + prop, proposal->getSuppressionID ()); + } + } } void LedgerConsensusImp::closeLedger () { checkOurValidation (); state_ = State::establish; - mConsensusStartTime = std::chrono::steady_clock::now (); - mCloseTime = app_.timeKeeper().closeTime(); - consensus_.setLastCloseTime(mCloseTime); - statusChange (protocol::neCLOSING_LEDGER, *mPreviousLedger); + consensusStartTime_ = std::chrono::steady_clock::now (); + closeTime_ = app_.timeKeeper().closeTime(); + consensus_.setLastCloseTime(closeTime_); + statusChange (protocol::neCLOSING_LEDGER, *previousLedger_); ledgerMaster_.applyHeldTransactions (); takeInitialPosition (app_.openLedger().current()); } @@ -1753,7 +1606,7 @@ void LedgerConsensusImp::checkOurValidation () { // This only covers some cases - Fix for the case where we can't ever // acquire the consensus ledger - if (!mHaveCorrectLCL || !mValPublic.size () + if (! haveCorrectLCL_ || ! valPublic_.size () || app_.getOPs ().isNeedNetworkLedger ()) { return; @@ -1764,20 +1617,20 @@ void LedgerConsensusImp::checkOurValidation () if (lastValidation) { if (lastValidation->getFieldU32 (sfLedgerSequence) - == mPreviousLedger->info().seq) + == previousLedger_->info().seq) { return; } - if (lastValidation->getLedgerHash () == mPrevLedgerHash) + if (lastValidation->getLedgerHash () == prevLedgerHash_) return; } - auto v = std::make_shared (mPreviousLedger->info().hash, + auto v = std::make_shared (previousLedger_->info().hash, consensus_.validationTimestamp(app_.timeKeeper().now()), - mValPublic, false); + valPublic_, false); addLoad(v); v->setTrusted (); - auto const signingHash = v->sign (mValSecret); + auto const signingHash = v->sign (valSecret_); // FIXME: wrong supression app_.getHashRouter ().addSuppression (signingHash); app_.getValidations ().addValidation (v, "localMissing"); @@ -1790,7 +1643,7 @@ void LedgerConsensusImp::checkOurValidation () void LedgerConsensusImp::beginAccept (bool synchronous) { - auto consensusSet = mAcquired[mOurPosition->getCurrentHash ()]; + auto consensusSet = acquired_[ourPosition_->getCurrentHash ()]; if (!consensusSet) { @@ -1799,7 +1652,7 @@ void LedgerConsensusImp::beginAccept (bool synchronous) abort (); } - consensus_.newLCL (mPeerPositions.size (), mCurrentMSeconds); + consensus_.newLCL (peerPositions_.size (), roundTime_); if (synchronous) accept (consensusSet); @@ -1832,40 +1685,40 @@ void LedgerConsensusImp::startRound ( } state_ = State::open; - mCloseTime = closeTime; - mPrevLedgerHash = prevLCLHash; - mPreviousLedger = prevLedger; - mOurPosition.reset(); - mConsensusFail = false; - mCurrentMSeconds = 0ms; - mClosePercent = 0; - mHaveCloseTimeConsensus = false; - mConsensusStartTime = std::chrono::steady_clock::now(); - mPreviousProposers = previousProposers; - mPreviousMSeconds = previousConvergeTime; - inboundTransactions_.newRound (mPreviousLedger->info().seq); + closeTime_ = closeTime; + prevLedgerHash_ = prevLCLHash; + previousLedger_ = prevLedger; + ourPosition_.reset(); + consensusFail_ = false; + roundTime_ = 0ms; + closePercent_ = 0; + haveCloseTimeConsensus_ = false; + consensusStartTime_ = std::chrono::steady_clock::now(); + previousProposers_ = previousProposers; + previousRoundTime_ = previousConvergeTime; + inboundTransactions_.newRound (previousLedger_->info().seq); - mPeerPositions.clear(); - mAcquired.clear(); - mDisputes.clear(); - mCompares.clear(); - mCloseTimes.clear(); - mDeadNodes.clear(); + peerPositions_.clear(); + acquired_.clear(); + disputes_.clear(); + compares_.clear(); + closeTimes_.clear(); + deadNodes_.clear(); - mCloseResolution = getNextLedgerTimeResolution ( - mPreviousLedger->info().closeTimeResolution, - getCloseAgree (mPreviousLedger->info()), - mPreviousLedger->info().seq + 1); + closeResolution_ = getNextLedgerTimeResolution ( + previousLedger_->info().closeTimeResolution, + getCloseAgree (previousLedger_->info()), + previousLedger_->info().seq + 1); - if (mValPublic.size () && !app_.getOPs ().isNeedNetworkLedger ()) + if (valPublic_.size () && ! app_.getOPs ().isNeedNetworkLedger ()) { // If the validation keys were set, and if we need a ledger, // then we want to validate, and possibly propose a ledger. JLOG (j_.info()) << "Entering consensus process, validating"; - mValidating = true; + validating_ = true; // Propose if we are in sync with the network - mProposing = + proposing_ = app_.getOPs ().getOperatingMode () == NetworkOPs::omFULL; } else @@ -1873,23 +1726,23 @@ void LedgerConsensusImp::startRound ( // Otherwise we just want to monitor the validation process. JLOG (j_.info()) << "Entering consensus process, watching"; - mProposing = mValidating = false; + proposing_ = validating_ = false; } - mHaveCorrectLCL = (mPreviousLedger->info().hash == mPrevLedgerHash); + haveCorrectLCL_ = (previousLedger_->info().hash == prevLedgerHash_); - if (!mHaveCorrectLCL) + if (! haveCorrectLCL_) { // If we were not handed the correct LCL, then set our state // to not proposing. consensus_.setProposing (false, false); - handleLCL (mPrevLedgerHash); + handleLCL (prevLedgerHash_); - if (!mHaveCorrectLCL) + if (! haveCorrectLCL_) { JLOG (j_.info()) << "Entering consensus with: " - << mPreviousLedger->info().hash; + << previousLedger_->info().hash; JLOG (j_.info()) << "Correct LCL is: " << prevLCLHash; } @@ -1898,11 +1751,11 @@ void LedgerConsensusImp::startRound ( { // update the network status table as to whether we're // proposing/validating - consensus_.setProposing (mProposing, mValidating); + consensus_.setProposing (proposing_, validating_); } playbackProposals (); - if (mPeerPositions.size() > (mPreviousProposers / 2)) + if (peerPositions_.size() > (previousProposers_ / 2)) { // We may be falling behind, don't wait for the timer // consider closing the ledger immediately @@ -1937,89 +1790,34 @@ make_LedgerConsensus ( //------------------------------------------------------------------------------ -int -applyTransaction (Application& app, OpenView& view, - std::shared_ptr const& txn, - bool retryAssured, ApplyFlags flags, - beast::Journal j) -{ - // Returns false if the transaction has need not be retried. - if (retryAssured) - flags = flags | tapRETRY; - - JLOG (j.debug()) << "TXN " - << txn->getTransactionID () - //<< (engine.view().open() ? " open" : " closed") - // because of the optional in engine - << (retryAssured ? "/retry" : "/final"); - JLOG (j.trace()) << txn->getJson (0); - - try - { - auto const result = apply(app, - view, *txn, flags, j); - if (result.second) - { - JLOG (j.debug()) - << "Transaction applied: " << transHuman (result.first); - return LedgerConsensusImp::resultSuccess; - } - - if (isTefFailure (result.first) || isTemMalformed (result.first) || - isTelLocal (result.first)) - { - // failure - JLOG (j.debug()) - << "Transaction failure: " << transHuman (result.first); - return LedgerConsensusImp::resultFail; - } - - JLOG (j.debug()) - << "Transaction retry: " << transHuman (result.first); - return LedgerConsensusImp::resultRetry; - } - catch (std::exception const&) - { - JLOG (j.warn()) << "Throws"; - return LedgerConsensusImp::resultFail; - } -} - -void applyTransactions ( +CanonicalTXSet +applyTransactions ( Application& app, - SHAMap const* set, + SHAMap const& set, OpenView& view, - ReadView const& checkLedger, - CanonicalTXSet& retriableTxs, - ApplyFlags flags) + std::function txFilter) { - auto j = app.journal ("LedgerConsensus"); - if (set) + + CanonicalTXSet retriableTxs (set.getHash().as_uint256()); + + for (auto const& item : set) { - for (auto const& item : *set) + if (! txFilter (item.key())) + continue; + + // The transaction wan't filtered + // Add it to the set to be tried in canonical order + JLOG (j.debug()) << + "Processing candidate transaction: " << item.key(); + try { - if (checkLedger.txExists (item.key())) - continue; - - // The transaction isn't in the check ledger, try to apply it - JLOG (j.debug()) << - "Processing candidate transaction: " << item.key(); - std::shared_ptr txn; - try - { - txn = std::make_shared(SerialIter{item.slice()}); - } - catch (std::exception const&) - { - JLOG (j.warn()) << " Throws"; - } - - if (txn) - { - // All transactions execute in canonical order - retriableTxs.insert (txn); - } + retriableTxs.insert ( + std::make_shared(SerialIter{item.slice()})); + } + catch (std::exception const&) + { + JLOG (j.warn()) << "Txn " << item.key() << " throws"; } } @@ -2039,18 +1837,18 @@ void applyTransactions ( try { switch (applyTransaction (app, view, - it->second, certainRetry, flags, j)) + *it->second, certainRetry, tapNO_CHECK_SIGN, j)) { - case LedgerConsensusImp::resultSuccess: + case ApplyResult::Success: it = retriableTxs.erase (it); ++changes; break; - case LedgerConsensusImp::resultFail: + case ApplyResult::Fail: it = retriableTxs.erase (it); break; - case LedgerConsensusImp::resultRetry: + case ApplyResult::Retry: ++it; } } @@ -2067,7 +1865,7 @@ void applyTransactions ( // A non-retry pass made no changes if (!changes && !certainRetry) - return; + return retriableTxs; // Stop retriable passes if (!changes || (pass >= LEDGER_RETRY_PASSES)) @@ -2077,6 +1875,7 @@ void applyTransactions ( // If there are any transactions left, we must have // tried them in at least one final pass assert (retriableTxs.empty() || !certainRetry); + return retriableTxs; } } // ripple diff --git a/src/ripple/app/ledger/impl/LedgerConsensusImp.h b/src/ripple/app/ledger/impl/LedgerConsensusImp.h index 95e7b1f7c3..aa5f6a79f3 100644 --- a/src/ripple/app/ledger/impl/LedgerConsensusImp.h +++ b/src/ripple/app/ledger/impl/LedgerConsensusImp.h @@ -129,10 +129,9 @@ public: @param map the transaction set. @param acquired true if we have acquired the transaction set. */ - void mapComplete ( + void gotMap ( uint256 const& hash, - std::shared_ptr const& map, - bool acquired) override; + std::shared_ptr const& map) override; /** On timer call the correct handler for each state. @@ -303,56 +302,57 @@ private: Application& app_; ConsensusImp& consensus_; InboundTransactions& inboundTransactions_; - LocalTxs& m_localTX; + LocalTxs& localTX_; LedgerMaster& ledgerMaster_; - FeeVote& m_feeVote; + FeeVote& feeVote_; std::recursive_mutex lock_; State state_; // The wall time this ledger closed - NetClock::time_point mCloseTime; + NetClock::time_point closeTime_; - uint256 mPrevLedgerHash; - uint256 mAcquiringLedger; + uint256 prevLedgerHash_; + uint256 acquiringLedger_; - std::shared_ptr mPreviousLedger; - LedgerProposal::pointer mOurPosition; - PublicKey mValPublic; - SecretKey mValSecret; - bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail; + std::shared_ptr previousLedger_; + LedgerProposal::pointer ourPosition_; + PublicKey valPublic_; + SecretKey valSecret_; + bool proposing_, validating_, haveCorrectLCL_, consensusFail_; - std::chrono::milliseconds mCurrentMSeconds; + // How much time has elapsed since the round started + std::chrono::milliseconds roundTime_; // How long the close has taken, expressed as a percentage of the time that // we expected it to take. - int mClosePercent; + int closePercent_; - NetClock::duration mCloseResolution; + NetClock::duration closeResolution_; - bool mHaveCloseTimeConsensus; + bool haveCloseTimeConsensus_; - std::chrono::steady_clock::time_point mConsensusStartTime; - int mPreviousProposers; + std::chrono::steady_clock::time_point consensusStartTime_; + int previousProposers_; - // The time it took for the last consensus process to converge - std::chrono::milliseconds mPreviousMSeconds; + // Time it took for the last consensus round to converge + std::chrono::milliseconds previousRoundTime_; // Convergence tracking, trusted peers indexed by hash of public key - hash_map mPeerPositions; + hash_map peerPositions_; // Transaction Sets, indexed by hash of transaction tree - hash_map> mAcquired; + hash_map> acquired_; // Disputed transactions - hash_map> mDisputes; - hash_set mCompares; + hash_map> disputes_; + hash_set compares_; // Close time estimates, keep ordered for predictable traverse - std::map mCloseTimes; + std::map closeTimes_; // nodes that have bowed out of this consensus process - hash_set mDeadNodes; + hash_set deadNodes_; beast::Journal j_; }; @@ -367,6 +367,24 @@ make_LedgerConsensus ( LedgerMaster& ledgerMaster, FeeVote& feeVote); +//------------------------------------------------------------------------------ +/** Apply a set of transactions to a ledger + + Typically the txFilter is used to reject transactions + that already got in the prior ledger + + @param set set of transactions to apply + @param view ledger to apply to + @param txFilter callback, return false to reject txn + @return retriable transactions +*/ +CanonicalTXSet +applyTransactions ( + Application& app, + SHAMap const& set, + OpenView& view, + std::function txFilter); + } // ripple #endif diff --git a/src/ripple/app/ledger/impl/LedgerTiming.cpp b/src/ripple/app/ledger/impl/LedgerTiming.cpp index 9e80c846a7..eedf5f7970 100644 --- a/src/ripple/app/ledger/impl/LedgerTiming.cpp +++ b/src/ripple/app/ledger/impl/LedgerTiming.cpp @@ -73,4 +73,137 @@ roundCloseTime ( closeTime += (closeResolution / 2); return closeTime - (closeTime.time_since_epoch() % closeResolution); } + +bool +shouldCloseLedger ( + bool anyTransactions, + int previousProposers, + int proposersClosed, + int proposersValidated, + std::chrono::milliseconds previousTime, + std::chrono::milliseconds currentTime, // Time since last ledger's close time + std::chrono::milliseconds openTime, // Time waiting to close this ledger + std::chrono::seconds idleInterval, + beast::Journal j) +{ + using namespace std::chrono_literals; + if ((previousTime < -1s) || (previousTime > 10min) || + (currentTime > 10min)) + { + // These are unexpected cases, we just close the ledger + JLOG (j.warn()) << + "shouldCloseLedger Trans=" << (anyTransactions ? "yes" : "no") << + " Prop: " << previousProposers << "/" << proposersClosed << + " Secs: " << currentTime.count() << " (last: " << + previousTime.count() << ")"; + return true; + } + + if ((proposersClosed + proposersValidated) > (previousProposers / 2)) + { + // If more than half of the network has closed, we close + JLOG (j.trace()) << "Others have closed"; + return true; + } + + if (!anyTransactions) + { + // Only close at the end of the idle interval + return currentTime >= idleInterval; // normal idle + } + + // Preserve minimum ledger open time + if (openTime < LEDGER_MIN_CLOSE) + { + JLOG (j.debug()) << + "Must wait minimum time before closing"; + return false; + } + + // Don't let this ledger close more than twice as fast as the previous + // ledger reached consensus so that slower validators can slow down + // the network + if (openTime < (previousTime / 2)) + { + JLOG (j.debug()) << + "Ledger has not been open long enough"; + return false; + } + + // Close the ledger + return true; +} + +bool +checkConsensusReached (int agreeing, int total, bool count_self) +{ + // If we are alone, we have a consensus + if (total == 0) + return true; + + if (count_self) + { + ++agreeing; + ++total; + } + + int currentPercentage = (agreeing * 100) / total; + + return currentPercentage > minimumConsensusPercentage; +} + +ConsensusState +checkConsensus ( + int previousProposers, + int currentProposers, + int currentAgree, + int currentFinished, + std::chrono::milliseconds previousAgreeTime, + std::chrono::milliseconds currentAgreeTime, + bool proposing, + beast::Journal j) +{ + JLOG (j.trace()) << + "checkConsensus: prop=" << currentProposers << + "/" << previousProposers << + " agree=" << currentAgree << " validated=" << currentFinished << + " time=" << currentAgreeTime.count() << "/" << previousAgreeTime.count(); + + if (currentAgreeTime <= LEDGER_MIN_CONSENSUS) + return ConsensusState::No; + + if (currentProposers < (previousProposers * 3 / 4)) + { + // Less than 3/4 of the last ledger's proposers are present; don't + // rush: we may need more time. + if (currentAgreeTime < (previousAgreeTime + LEDGER_MIN_CONSENSUS)) + { + JLOG (j.trace()) << + "too fast, not enough proposers"; + return ConsensusState::No; + } + } + + // Have we, together with the nodes on our UNL list, reached the threshold + // to declare consensus? + if (checkConsensusReached (currentAgree, currentProposers, proposing)) + { + JLOG (j.debug()) << "normal consensus"; + return ConsensusState::Yes; + } + + // Have sufficient nodes on our UNL list moved on and reached the threshold + // to declare consensus? + if (checkConsensusReached (currentFinished, currentProposers, false)) + { + JLOG (j.warn()) << + "We see no consensus, but 80% of nodes have moved on"; + return ConsensusState::MovedOn; + } + + // no consensus yet + JLOG (j.trace()) << "no consensus"; + return ConsensusState::No; +} + } // ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 27d47045d0..eecae28966 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1514,7 +1514,7 @@ void NetworkOPsImp::mapComplete (uint256 const& hash, std::shared_ptr const& map) { - mLedgerConsensus->mapComplete (hash, map, true); + mLedgerConsensus->gotMap (hash, map); } void NetworkOPsImp::endConsensus (bool correctLCL) diff --git a/src/ripple/app/tx/apply.h b/src/ripple/app/tx/apply.h index af4a416c9f..1c2c83d0f9 100644 --- a/src/ripple/app/tx/apply.h +++ b/src/ripple/app/tx/apply.h @@ -95,6 +95,25 @@ apply (Application& app, OpenView& view, STTx const& tx, ApplyFlags flags, beast::Journal journal); + +/** Class for return value from applyTransaction */ +enum class ApplyResult +{ + Success, // Applied to this ledger + Fail, // Should not be retried in this ledger + Retry // Should be retried in this ledger +}; + +/** Transaction application helper + + Provides more detailed logging and decodes the + correct behavior based on the TER type +*/ +ApplyResult +applyTransaction(Application& app, OpenView& view, + STTx const& tx, bool retryAssured, ApplyFlags flags, + beast::Journal journal); + } // ripple #endif diff --git a/src/ripple/app/tx/impl/apply.cpp b/src/ripple/app/tx/impl/apply.cpp index 93bb99eccd..21d474619f 100644 --- a/src/ripple/app/tx/impl/apply.cpp +++ b/src/ripple/app/tx/impl/apply.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include #include @@ -114,4 +115,51 @@ apply (Application& app, OpenView& view, return doApply(pcresult, app, view); } +ApplyResult +applyTransaction (Application& app, OpenView& view, + STTx const& txn, + bool retryAssured, ApplyFlags flags, + beast::Journal j) +{ + // Returns false if the transaction has need not be retried. + if (retryAssured) + flags = flags | tapRETRY; + + JLOG (j.debug()) << "TXN " + << txn.getTransactionID () + //<< (engine.view().open() ? " open" : " closed") + // because of the optional in engine + << (retryAssured ? "/retry" : "/final"); + + try + { + auto const result = apply(app, + view, txn, flags, j); + if (result.second) + { + JLOG (j.debug()) + << "Transaction applied: " << transHuman (result.first); + return ApplyResult::Success; + } + + if (isTefFailure (result.first) || isTemMalformed (result.first) || + isTelLocal (result.first)) + { + // failure + JLOG (j.debug()) + << "Transaction failure: " << transHuman (result.first); + return ApplyResult::Fail; + } + + JLOG (j.debug()) + << "Transaction retry: " << transHuman (result.first); + return ApplyResult::Retry; + } + catch (std::exception const&) + { + JLOG (j.warn()) << "Throws"; + return ApplyResult::Fail; + } +} + } // ripple