//------------------------------------------------------------------------------ /* This file is part of rippled: https://github.com/ripple/rippled Copyright (c) 2012, 2013 Ripple Labs Inc. Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ //============================================================================== #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace ripple { /** Provides the implementation for LedgerConsensus. Achieves consensus on the next ledger. This object is created when the consensus process starts, and is destroyed when the process is complete. Nearly everything herein is invoked with the master lock. Two things need consensus: 1. The set of transactions. 2. The close time for the ledger. */ class LedgerConsensusImp : public LedgerConsensus , public std::enable_shared_from_this , public CountedObject { private: enum class State { // We haven't closed our ledger yet, but others might have open, // Establishing consensus establish, // We have closed on a transaction set finished, // We have accepted / validated a new last closed ledger accepted, }; public: /** * The result of applying a transaction to a ledger. */ enum {resultSuccess, resultFail, resultRetry}; static char const* getCountedObjectName () { return "LedgerConsensus"; } LedgerConsensusImp(LedgerConsensusImp const&) = delete; LedgerConsensusImp& operator=(LedgerConsensusImp const&) = delete; /** @param previousProposers the number of participants in the last round @param previousConvergeTime how long the last round took (ms) @param inboundTransactions @param localtx transactions issued by local clients @param inboundTransactions the set of @param localtx A set of local transactions to apply @param prevLCLHash The hash of the Last Closed Ledger (LCL). @param previousLedger Best guess of what the LCL was. @param closeTime Closing time point of the LCL. @param feeVote Our desired fee levels and voting logic. */ LedgerConsensusImp ( int previousProposers, int previousConvergeTime, InboundTransactions& inboundTransactions, LocalTxs& localtx, LedgerHash const & prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime, FeeVote& feeVote) : inboundTransactions_ (inboundTransactions) , m_localTX (localtx) , m_feeVote (feeVote) , state_ (State::open) , mCloseTime (closeTime) , mPrevLedgerHash (prevLCLHash) , mPreviousLedger (previousLedger) , mValPublic (getConfig ().VALIDATION_PUB) , mValPrivate (getConfig ().VALIDATION_PRIV) , mConsensusFail (false) , mCurrentMSeconds (0) , mClosePercent (0) , mHaveCloseTimeConsensus (false) , mConsensusStartTime (std::chrono::steady_clock::now ()) , mPreviousProposers (previousProposers) , mPreviousMSeconds (previousConvergeTime) { WriteLog (lsDEBUG, LedgerConsensus) << "Creating consensus object"; WriteLog (lsTRACE, LedgerConsensus) << "LCL:" << previousLedger->getHash () << ", ct=" << closeTime; assert (mPreviousMSeconds); inboundTransactions_.newRound (mPreviousLedger->getLedgerSeq()); // Adapt close time resolution to recent network conditions mCloseResolution = getNextLedgerTimeResolution ( mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (), mPreviousLedger->getLedgerSeq () + 1); if (mValPublic.isSet () && mValPrivate.isSet () && !getApp().getOPs ().isNeedNetworkLedger ()) { // If the validation keys were set, and if we need a ledger, // then we want to validate, and possibly propose a ledger. WriteLog (lsINFO, LedgerConsensus) << "Entering consensus process, validating"; mValidating = true; // Propose if we are in sync with the network mProposing = getApp().getOPs ().getOperatingMode () == NetworkOPs::omFULL; } else { // Otherwise we just want to monitor the validation process. WriteLog (lsINFO, LedgerConsensus) << "Entering consensus process, watching"; mProposing = mValidating = false; } mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash); if (!mHaveCorrectLCL) { // If we were not handed the correct LCL, then set our state // to not proposing. getApp().getOPs ().setProposing (false, false); handleLCL (mPrevLedgerHash); if (!mHaveCorrectLCL) { // mProposing = mValidating = false; WriteLog (lsINFO, LedgerConsensus) << "Entering consensus with: " << previousLedger->getHash (); WriteLog (lsINFO, LedgerConsensus) << "Correct LCL is: " << prevLCLHash; } } else // update the network status table as to whether we're proposing/validating getApp().getOPs ().setProposing (mProposing, mValidating); } /** Get the Json state of the consensus process. Called by the consensus_info RPC. @param full True if verbose response desired. @return The Json state. */ Json::Value getJson (bool full) { Json::Value ret (Json::objectValue); ret["proposing"] = mProposing; ret["validating"] = mValidating; ret["proposers"] = static_cast (mPeerPositions.size ()); if (mHaveCorrectLCL) { ret["synched"] = true; ret["ledger_seq"] = mPreviousLedger->getLedgerSeq () + 1; ret["close_granularity"] = mCloseResolution; } else ret["synched"] = false; switch (state_) { case State::open: ret[jss::state] = "open"; break; case State::establish: ret[jss::state] = "consensus"; break; case State::finished: ret[jss::state] = "finished"; break; case State::accepted: ret[jss::state] = "accepted"; break; } int v = mDisputes.size (); if ((v != 0) && !full) ret["disputes"] = v; if (mOurPosition) ret["our_position"] = mOurPosition->getJson (); if (full) { ret["current_ms"] = mCurrentMSeconds; ret["close_percent"] = mClosePercent; ret["close_resolution"] = mCloseResolution; ret["have_time_consensus"] = mHaveCloseTimeConsensus; ret["previous_proposers"] = mPreviousProposers; ret["previous_mseconds"] = mPreviousMSeconds; if (!mPeerPositions.empty ()) { Json::Value ppj (Json::objectValue); for (auto& pp : mPeerPositions) { ppj[to_string (pp.first)] = pp.second->getJson (); } ret["peer_positions"] = ppj; } if (!mAcquired.empty ()) { // acquired Json::Value acq (Json::objectValue); for (auto& at : mAcquired) { if (at.second) acq[to_string (at.first)] = "acquired"; else acq[to_string (at.first)] = "failed"; } ret["acquired"] = acq; } if (!mDisputes.empty ()) { Json::Value dsj (Json::objectValue); for (auto& dt : mDisputes) { dsj[to_string (dt.first)] = dt.second->getJson (); } ret["disputes"] = dsj; } if (!mCloseTimes.empty ()) { Json::Value ctj (Json::objectValue); for (auto& ct : mCloseTimes) { ctj[beast::lexicalCastThrow (ct.first)] = ct.second; } ret["close_times"] = ctj; } if (!mDeadNodes.empty ()) { Json::Value dnj (Json::arrayValue); for (auto const& dn : mDeadNodes) { dnj.append (to_string (dn)); } ret["dead_nodes"] = dnj; } } return ret; } uint256 getLCL () { return mPrevLedgerHash; } /** We have a complete transaction set, typically acquired from the network @param hash hash of the transaction set. @param map the transaction set. @param acquired true if we have acquired the transaction set. */ void mapComplete (uint256 const& hash, std::shared_ptr const& map, bool acquired) { try { mapCompleteInternal (hash, map, acquired); } catch (SHAMapMissingNode const& mn) { leaveConsensus(); WriteLog (lsERROR, LedgerConsensus) << "Missing node processing complete map " << mn; throw; } } void mapCompleteInternal (uint256 const& hash, std::shared_ptr const& map, bool acquired) { CondLog (acquired, lsDEBUG, LedgerConsensus) << "We have acquired TXS " << hash; if (!map) // If the map was invalid { // this is an invalid/corrupt map mAcquired[hash] = map; WriteLog (lsWARNING, LedgerConsensus) << "A trusted node directed us to acquire an invalid TXN map"; return; } assert (hash == map->getHash ()); auto it = mAcquired.find (hash); // If we have already acquired this transaction set if (mAcquired.find (hash) != mAcquired.end ()) { if (it->second) { return; // we already have this map } // We previously failed to acquire this map, now we have it mAcquired.erase (hash); } // We now have a map that we did not have before if (!acquired) { // Put the map where others can get it inboundTransactions_.giveSet (hash, map, false); } // Inform directly-connected peers that we have this transaction set sendHaveTxSet (hash, true); if (mOurPosition && (!mOurPosition->isBowOut ()) && (hash != mOurPosition->getCurrentHash ())) { // this will create disputed transactions auto it2 = mAcquired.find (mOurPosition->getCurrentHash ()); if (it2 != mAcquired.end ()) { assert ((it2->first == mOurPosition->getCurrentHash ()) && it2->second); mCompares.insert(hash); // Our position is not the same as the acquired position createDisputes (it2->second, map); } else assert (false); // We don't have our own position?! } else WriteLog (lsDEBUG, LedgerConsensus) << "Not ready to create disputes"; mAcquired[hash] = map; // Adjust tracking for each peer that takes this position std::vector peers; for (auto& it : mPeerPositions) { if (it.second->getCurrentHash () == map->getHash ()) peers.push_back (it.second->getPeerID ()); } if (!peers.empty ()) { adjustCount (map, peers); } else { CondLog (acquired, lsWARNING, LedgerConsensus) << "By the time we got the map " << hash << " no peers were proposing it"; } } /** Check if our last closed ledger matches the network's. This tells us if we are still in sync with the network. This also helps us if we enter the consensus round with the wrong ledger, to leave it with the correct ledger so that we can participate in the next round. */ void checkLCL () { uint256 netLgr = mPrevLedgerHash; int netLgrCount = 0; uint256 favoredLedger = mPrevLedgerHash; // Don't jump forward uint256 priorLedger; if (mHaveCorrectLCL) priorLedger = mPreviousLedger->getParentHash (); // don't jump back // Get validators that are on our ledger, or "close" to being on // our ledger. hash_map vals = getApp().getValidations ().getCurrentValidations (favoredLedger, priorLedger); for (auto& it : vals) { if ((it.second.first > netLgrCount) || ((it.second.first == netLgrCount) && (it.first == mPrevLedgerHash))) { netLgr = it.first; netLgrCount = it.second.first; } } if (netLgr != mPrevLedgerHash) { // LCL change const char* status; switch (state_) { case State::open: status = "open"; break; case State::establish: status = "establish"; break; case State::finished: status = "finished"; break; case State::accepted: status = "accepted"; break; default: status = "unknown"; } WriteLog (lsWARNING, LedgerConsensus) << "View of consensus changed during " << status << " (" << netLgrCount << ") status=" << status << ", " << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL"); WriteLog (lsWARNING, LedgerConsensus) << mPrevLedgerHash << " to " << netLgr; WriteLog (lsWARNING, LedgerConsensus) << ripple::getJson (*mPreviousLedger); if (ShouldLog (lsDEBUG, LedgerConsensus)) { for (auto& it : vals) { WriteLog (lsDEBUG, LedgerConsensus) << "V: " << it.first << ", " << it.second.first; } } if (mHaveCorrectLCL) getApp().getOPs ().consensusViewChange (); handleLCL (netLgr); } else if (mPreviousLedger->getHash () != mPrevLedgerHash) handleLCL (netLgr); } /** Change our view of the last closed ledger @param lclHash Hash of the last closed ledger. */ void handleLCL (uint256 const& lclHash) { assert ((lclHash != mPrevLedgerHash) || (mPreviousLedger->getHash () != lclHash)); if (mPrevLedgerHash != lclHash) { // first time switching to this ledger mPrevLedgerHash = lclHash; if (mHaveCorrectLCL && mProposing && mOurPosition) { WriteLog (lsINFO, LedgerConsensus) << "Bowing out of consensus"; mOurPosition->bowOut (); propose (); } // Stop proposing because we are out of sync mProposing = false; // mValidating = false; mPeerPositions.clear (); mDisputes.clear (); mCloseTimes.clear (); mDeadNodes.clear (); // To get back in sync: playbackProposals (); } if (mPreviousLedger->getHash () == mPrevLedgerHash) return; // we need to switch the ledger we're working from auto newLCL = getApp().getLedgerMaster ().getLedgerByHash (mPrevLedgerHash); if (!newLCL) { if (mAcquiringLedger != lclHash) { // need to start acquiring the correct consensus LCL WriteLog (lsWARNING, LedgerConsensus) << "Need consensus ledger " << mPrevLedgerHash; // Tell the ledger acquire system that we need the consensus ledger mAcquiringLedger = mPrevLedgerHash; getApp().getJobQueue().addJob (jtADVANCE, "getConsensusLedger", std::bind ( &InboundLedgers::acquire, &getApp().getInboundLedgers(), mPrevLedgerHash, 0, InboundLedger::fcCONSENSUS)); mHaveCorrectLCL = false; } return; } assert (newLCL->isClosed () && newLCL->isImmutable ()); assert (newLCL->getHash () == lclHash); mPreviousLedger = newLCL; mPrevLedgerHash = lclHash; WriteLog (lsINFO, LedgerConsensus) << "Have the consensus ledger " << mPrevLedgerHash; mHaveCorrectLCL = true; mCloseResolution = getNextLedgerTimeResolution ( mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (), mPreviousLedger->getLedgerSeq () + 1); } /** On timer call the correct handler for each state. */ void timerEntry () { try { doTimer(); } catch (SHAMapMissingNode const& mn) { leaveConsensus (); WriteLog (lsERROR, LedgerConsensus) << "Missing node during consensus process " << mn; throw; } } void doTimer () { if ((state_ != State::finished) && (state_ != State::accepted)) checkLCL (); mCurrentMSeconds = std::chrono::duration_cast (std::chrono::steady_clock::now() - mConsensusStartTime).count (); mClosePercent = mCurrentMSeconds * 100 / mPreviousMSeconds; switch (state_) { case State::open: statePreClose (); return; case State::establish: stateEstablish (); if (state_ != State::finished) return; // Fall through case State::finished: stateFinished (); if (state_ != State::accepted) return; // Fall through case State::accepted: stateAccepted (); return; } assert (false); } /** Handle pre-close state. */ void statePreClose () { // it is shortly before ledger close time bool anyTransactions = getApp().getLedgerMaster ().getCurrentLedger () ->peekTransactionMap ()->getHash ().isNonZero (); int proposersClosed = mPeerPositions.size (); int proposersValidated = getApp().getValidations ().getTrustedValidationCount (mPrevLedgerHash); // This ledger is open. This computes how long since last ledger closed int sinceClose; int idleInterval = 0; if (mHaveCorrectLCL && mPreviousLedger->getCloseAgree ()) { // we can use consensus timing sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () - mPreviousLedger->getCloseTimeNC ()); idleInterval = 2 * mPreviousLedger->getCloseResolution (); if (idleInterval < LEDGER_IDLE_INTERVAL) idleInterval = LEDGER_IDLE_INTERVAL; } else { // Use the time we saw the last ledger close sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () - getApp().getOPs ().getLastCloseTime ()); idleInterval = LEDGER_IDLE_INTERVAL; } idleInterval = std::max (idleInterval, LEDGER_IDLE_INTERVAL); idleInterval = std::max (idleInterval, 2 * mPreviousLedger->getCloseResolution ()); // Decide if we should close the ledger if (shouldCloseLedger (anyTransactions , mPreviousProposers, proposersClosed, proposersValidated , mPreviousMSeconds, sinceClose, mCurrentMSeconds , idleInterval)) { closeLedger (); } } /** We are establishing a consensus Update our position only on the timer, and in this state. If we have consensus, move to the finish state */ void stateEstablish () { // Give everyone a chance to take an initial position if (mCurrentMSeconds < LEDGER_MIN_CONSENSUS) return; updateOurPositions (); // Nothing to do if we don't have consensus. if (!haveConsensus ()) return; if (!mHaveCloseTimeConsensus) { WriteLog (lsINFO, LedgerConsensus) << "We have TX consensus but not CT consensus"; return; } WriteLog (lsINFO, LedgerConsensus) << "Converge cutoff (" << mPeerPositions.size () << " participants)"; state_ = State::finished; beginAccept (false); } void stateFinished () { // we are processing the finished ledger // logic of calculating next ledger advances us out of this state // nothing to do } void stateAccepted () { // we have accepted a new ledger endConsensus (); } /** Check if we've reached consensus */ bool haveConsensus () { // CHECKME: should possibly count unacquired TX sets as disagreeing int agree = 0, disagree = 0; uint256 ourPosition = mOurPosition->getCurrentHash (); // Count number of agreements/disagreements with our position for (auto& it : mPeerPositions) { if (!it.second->isBowOut ()) { if (it.second->getCurrentHash () == ourPosition) { ++agree; } else { WriteLog (lsDEBUG, LedgerConsensus) << to_string (it.first) << " has " << to_string (it.second->getCurrentHash ()); ++disagree; if (mCompares.count(it.second->getCurrentHash()) == 0) { // Make sure we have generated disputes uint256 hash = it.second->getCurrentHash(); WriteLog (lsDEBUG, LedgerConsensus) << "We have not compared to " << hash; auto it1 = mAcquired.find (hash); auto it2 = mAcquired.find(mOurPosition->getCurrentHash ()); if ((it1 != mAcquired.end()) && (it2 != mAcquired.end()) && (it1->second) && (it2->second)) { mCompares.insert(hash); createDisputes(it2->second, it1->second); } } } } } int currentValidations = getApp().getValidations () .getNodesAfter (mPrevLedgerHash); WriteLog (lsDEBUG, LedgerConsensus) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree; // Determine if we actually have consensus or not auto ret = checkConsensus (mPreviousProposers, agree + disagree, agree, currentValidations, mPreviousMSeconds, mCurrentMSeconds); if (ret == ConsensusState::No) return false; // There is consensus, but we need to track if the network moved on // without us. if (ret == ConsensusState::MovedOn) mConsensusFail = true; else mConsensusFail = false; return true; } std::shared_ptr getTransactionTree (uint256 const& hash) { auto it = mAcquired.find (hash); if (it != mAcquired.end() && it->second) return it->second; auto set = inboundTransactions_.getSet (hash, true); if (set) mAcquired[hash] = set; return set; } /** A server has taken a new position, adjust our tracking Called when a peer takes a new postion. @param newPosition the new position @return true if we should do delayed relay of this position. */ bool peerPosition (LedgerProposal::ref newPosition) { auto const peerID = newPosition->getPeerID (); if (mDeadNodes.find (peerID) != mDeadNodes.end ()) { WriteLog (lsINFO, LedgerConsensus) << "Position from dead node: " << to_string (peerID); return false; } LedgerProposal::pointer& currentPosition = mPeerPositions[peerID]; if (currentPosition) { assert (peerID == currentPosition->getPeerID ()); if (newPosition->getProposeSeq () <= currentPosition->getProposeSeq ()) { return false; } } if (newPosition->isBowOut ()) { WriteLog (lsINFO, LedgerConsensus) << "Peer bows out: " << to_string (peerID); for (auto& it : mDisputes) it.second->unVote (peerID); mPeerPositions.erase (peerID); mDeadNodes.insert (peerID); return true; } if (newPosition->isInitial ()) { // Record the close time estimate WriteLog (lsTRACE, LedgerConsensus) << "Peer reports close time as " << newPosition->getCloseTime (); ++mCloseTimes[newPosition->getCloseTime ()]; } WriteLog (lsTRACE, LedgerConsensus) << "Processing peer proposal " << newPosition->getProposeSeq () << "/" << newPosition->getCurrentHash (); currentPosition = newPosition; std::shared_ptr set = getTransactionTree (newPosition->getCurrentHash ()); if (set) { for (auto& it : mDisputes) it.second->setVote (peerID, set->hasItem (it.first)); } else { WriteLog (lsDEBUG, LedgerConsensus) << "Don't have tx set for peer"; } return true; } void simulate () { WriteLog (lsINFO, LedgerConsensus) << "Simulating consensus"; closeLedger (); mCurrentMSeconds = 100; beginAccept (true); endConsensus (); WriteLog (lsINFO, LedgerConsensus) << "Simulation complete"; } private: /** We have a new last closed ledger, process it. Final accept logic @param set Our consensus set */ void accept (std::shared_ptr set) { { auto lock = beast::make_lock(getApp().getMasterMutex()); // put our set where others can get it later if (set->getHash ().isNonZero ()) getApp().getOPs ().takePosition ( mPreviousLedger->getLedgerSeq (), set); assert (set->getHash () == mOurPosition->getCurrentHash ()); // these are now obsolete getApp().getOPs ().peekStoredProposals ().clear (); } std::uint32_t closeTime = roundCloseTime ( mOurPosition->getCloseTime (), mCloseResolution); bool closeTimeCorrect = true; if (closeTime == 0) { // we agreed to disagree closeTimeCorrect = false; closeTime = mPreviousLedger->getCloseTimeNC () + 1; } WriteLog (lsDEBUG, LedgerConsensus) << "Report: Prop=" << (mProposing ? "yes" : "no") << " val=" << (mValidating ? "yes" : "no") << " corLCL=" << (mHaveCorrectLCL ? "yes" : "no") << " fail=" << (mConsensusFail ? "yes" : "no"); WriteLog (lsDEBUG, LedgerConsensus) << "Report: Prev = " << mPrevLedgerHash << ":" << mPreviousLedger->getLedgerSeq (); WriteLog (lsDEBUG, LedgerConsensus) << "Report: TxSt = " << set->getHash () << ", close " << closeTime << (closeTimeCorrect ? "" : "X"); // Put failed transactions into a deterministic order CanonicalTXSet retriableTransactions (set->getHash ()); // Build the new last closed ledger auto newLCL = std::make_shared (false, *mPreviousLedger); // Set up to write SHAMap changes to our database, // perform updates, extract changes WriteLog (lsDEBUG, LedgerConsensus) << "Applying consensus set transactions to the" << " last closed ledger"; applyTransactions (set, newLCL, newLCL, retriableTransactions, false); newLCL->updateSkipList (); newLCL->setClosed (); int asf = newLCL->peekAccountStateMap ()->flushDirty ( hotACCOUNT_NODE, newLCL->getLedgerSeq()); int tmf = newLCL->peekTransactionMap ()->flushDirty ( hotTRANSACTION_NODE, newLCL->getLedgerSeq()); WriteLog (lsDEBUG, LedgerConsensus) << "Flushed " << asf << " account and " << tmf << "transaction nodes"; // Accept ledger newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect); // And stash the ledger in the ledger master if (getApp().getLedgerMaster().storeLedger (newLCL)) WriteLog (lsDEBUG, LedgerConsensus) << "Consensus built ledger we already had"; else if (getApp().getInboundLedgers().find (newLCL->getHash())) WriteLog (lsDEBUG, LedgerConsensus) << "Consensus built ledger we were acquiring"; else WriteLog (lsDEBUG, LedgerConsensus) << "Consensus built new ledger"; WriteLog (lsDEBUG, LedgerConsensus) << "Report: NewL = " << newLCL->getHash () << ":" << newLCL->getLedgerSeq (); uint256 newLCLHash = newLCL->getHash (); // Tell directly connected peers that we have a new LCL statusChange (protocol::neACCEPTED_LEDGER, *newLCL); if (mValidating && !mConsensusFail) { // Build validation uint256 signingHash; STValidation::pointer v = std::make_shared (newLCLHash, getApp().getOPs ().getValidationTimeNC () , mValPublic, mProposing); v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ()); addLoad(v); // Our network load if (((newLCL->getLedgerSeq () + 1) % 256) == 0) // next ledger is flag ledger { // Suggest fee changes and new features m_feeVote.doValidation (newLCL, *v); getApp().getAmendmentTable ().doValidation (newLCL, *v); } signingHash = v->sign (mValPrivate); v->setTrusted (); // suppress it if we receive it - FIXME: wrong suppression getApp().getHashRouter ().addSuppression (signingHash); getApp().getValidations ().addValidation (v, "local"); getApp().getOPs ().setLastValidation (v); Blob validation = v->getSigned (); protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); // Send signed validation to all of our directly connected peers getApp().overlay().send(val); WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash; } else WriteLog (lsINFO, LedgerConsensus) << "CNF newLCL " << newLCLHash; // See if we can accept a ledger as fully-validated getApp().getLedgerMaster().consensusBuilt (newLCL); // Build new open ledger auto newOL = std::make_shared (true, *newLCL); // Apply disputed transactions that didn't get in TransactionEngine engine (newOL); bool anyDisputes = false; for (auto& it : mDisputes) { if (!it.second->getOurVote ()) { // we voted NO try { WriteLog (lsDEBUG, LedgerConsensus) << "Test applying disputed transaction that did" << " not get in"; SerialIter sit (it.second->peekTransaction ()); STTx::pointer txn = std::make_shared(sit); retriableTransactions.push_back (txn); anyDisputes = true; } catch (...) { WriteLog (lsDEBUG, LedgerConsensus) << "Failed to apply transaction we voted NO on"; } } } if (anyDisputes) { applyTransactions (std::shared_ptr(), newOL, newLCL, retriableTransactions, true); } { auto lock = beast::make_lock(getApp().getMasterMutex(), std::defer_lock); LedgerMaster::ScopedLockType sl (getApp().getLedgerMaster ().peekMutex (), std::defer_lock); std::lock(lock, sl); // Apply transactions from the old open ledger Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger(); if (oldOL->peekTransactionMap()->getHash().isNonZero ()) { WriteLog (lsDEBUG, LedgerConsensus) << "Applying transactions from current open ledger"; applyTransactions (oldOL->peekTransactionMap (), newOL, newLCL, retriableTransactions, true); } // Apply local transactions TransactionEngine engine (newOL); m_localTX.apply (engine); // We have a new Last Closed Ledger and new Open Ledger getApp().getLedgerMaster ().pushLedger (newLCL, newOL); } mNewLedgerHash = newLCL->getHash (); state_ = State::accepted; if (mValidating) { // see how close our close time is to other node's // close time reports, and update our clock. WriteLog (lsINFO, LedgerConsensus) << "We closed at " << mCloseTime; std::uint64_t closeTotal = mCloseTime; int closeCount = 1; for (auto it = mCloseTimes.begin () , end = mCloseTimes.end (); it != end; ++it) { // FIXME: Use median, not average WriteLog (lsINFO, LedgerConsensus) << beast::lexicalCastThrow (it->second) << " time votes for " << beast::lexicalCastThrow (it->first); closeCount += it->second; closeTotal += static_cast (it->first) * static_cast (it->second); } closeTotal += (closeCount / 2); closeTotal /= closeCount; int offset = static_cast (closeTotal) - static_cast (mCloseTime); WriteLog (lsINFO, LedgerConsensus) << "Our close offset is estimated at " << offset << " (" << closeCount << ")"; getApp().getOPs ().closeTimeOffset (offset); } } /** Compare two proposed transaction sets and create disputed transctions structures for any mismatches @param m1 One transaction set @param m2 The other transaction set */ void createDisputes (std::shared_ptr const& m1, std::shared_ptr const& m2) { if (m1->getHash() == m2->getHash()) return; WriteLog (lsDEBUG, LedgerConsensus) << "createDisputes " << m1->getHash() << " to " << m2->getHash(); SHAMap::Delta differences; m1->compare (m2, differences, 16384); int dc = 0; // for each difference between the transactions for (auto& pos : differences) { ++dc; // create disputed transactions (from the ledger that has them) if (pos.second.first) { // transaction is only in first map assert (!pos.second.second); addDisputedTransaction (pos.first , pos.second.first->peekData ()); } else if (pos.second.second) { // transaction is only in second map assert (!pos.second.first); addDisputedTransaction (pos.first , pos.second.second->peekData ()); } else // No other disagreement over a transaction should be possible assert (false); } WriteLog (lsDEBUG, LedgerConsensus) << dc << " differences found"; } /** Add a disputed transaction (one that at least one node wants in the consensus set and at least one node does not) to our tracking @param txID The ID of the disputed transaction @param tx The data of the disputed transaction */ void addDisputedTransaction (uint256 const& txID, Blob const& tx) { if (mDisputes.find (txID) != mDisputes.end ()) return; WriteLog (lsDEBUG, LedgerConsensus) << "Transaction " << txID << " is disputed"; bool ourVote = false; // Update our vote on the disputed transaction if (mOurPosition) { auto mit (mAcquired.find (mOurPosition->getCurrentHash ())); if (mit != mAcquired.end ()) ourVote = mit->second->hasItem (txID); else assert (false); // We don't have our own position? } DisputedTx::pointer txn = std::make_shared (txID, tx, ourVote); mDisputes[txID] = txn; // Update all of the peer's votes on the disputed transaction for (auto& pit : mPeerPositions) { auto cit (mAcquired.find (pit.second->getCurrentHash ())); if ((cit != mAcquired.end ()) && cit->second) { txn->setVote (pit.first, cit->second->hasItem (txID)); } } // If we didn't relay this transaction recently, relay it if (getApp().getHashRouter ().setFlag (txID, SF_RELAYED)) { protocol::TMTransaction msg; msg.set_rawtransaction (& (tx.front ()), tx.size ()); msg.set_status (protocol::tsNEW); msg.set_receivetimestamp (getApp().getOPs ().getNetworkTimeNC ()); getApp ().overlay ().foreach (send_always ( std::make_shared ( msg, protocol::mtTRANSACTION))); } } /** Adjust the votes on all disputed transactions based on the set of peers taking this position @param map A disputed position @param peers peers which are taking the position map */ void adjustCount (std::shared_ptr const& map, const std::vector& peers) { for (auto& it : mDisputes) { bool setHas = map->hasItem (it.second->getTransactionID ()); for (auto const& pit : peers) it.second->setVote (pit, setHas); } } /** Revoke our outstanding proposal, if any, and cease proposing at least until this round ends */ void leaveConsensus () { if (mProposing) { if (mOurPosition && ! mOurPosition->isBowOut ()) { mOurPosition->bowOut(); propose(); } mProposing = false; } } /** Make and send a proposal */ void propose () { WriteLog (lsTRACE, LedgerConsensus) << "We propose: " << (mOurPosition->isBowOut () ? std::string ("bowOut") : to_string (mOurPosition->getCurrentHash ())); protocol::TMProposeSet prop; prop.set_currenttxhash (mOurPosition->getCurrentHash ().begin () , 256 / 8); prop.set_previousledger (mOurPosition->getPrevLedger ().begin () , 256 / 8); prop.set_proposeseq (mOurPosition->getProposeSeq ()); prop.set_closetime (mOurPosition->getCloseTime ()); Blob const pubKey = mValPublic.getNodePublic (); prop.set_nodepubkey (&pubKey[0], pubKey.size ()); Blob const sig = mOurPosition->sign (mValPrivate); prop.set_signature (&sig[0], sig.size ()); getApp().overlay().send(prop); } /** Let peers know that we a particular transactions set so they can fetch it from us. @param hash The ID of the transaction. @param direct true if we have this transaction set locally, else a directly connected peer has it. */ void sendHaveTxSet (uint256 const& hash, bool direct) { protocol::TMHaveTransactionSet msg; msg.set_hash (hash.begin (), 256 / 8); msg.set_status (direct ? protocol::tsHAVE : protocol::tsCAN_GET); getApp ().overlay ().foreach (send_always ( std::make_shared ( msg, protocol::mtHAVE_SET))); } /** Send a node status change message to our directly connected peers @param event The event which caused the status change. This is typically neACCEPTED_LEDGER or neCLOSING_LEDGER. @param ledger The ledger associated with the event. */ void statusChange (protocol::NodeEvent event, Ledger& ledger) { protocol::TMStatusChange s; if (!mHaveCorrectLCL) s.set_newevent (protocol::neLOST_SYNC); else s.set_newevent (event); s.set_ledgerseq (ledger.getLedgerSeq ()); s.set_networktime (getApp().getOPs ().getNetworkTimeNC ()); s.set_ledgerhashprevious(ledger.getParentHash ().begin (), std::decay_t::bytes); s.set_ledgerhash (ledger.getHash ().begin (), std::decay_t::bytes); std::uint32_t uMin, uMax; if (!getApp().getOPs ().getFullValidatedRange (uMin, uMax)) { uMin = 0; uMax = 0; } else { // Don't advertise ledgers we're not willing to serve std::uint32_t early = getApp().getLedgerMaster().getEarliestFetch (); if (uMin < early) uMin = early; } s.set_firstseq (uMin); s.set_lastseq (uMax); getApp ().overlay ().foreach (send_always ( std::make_shared ( s, protocol::mtSTATUS_CHANGE))); WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer"; } /** Take an initial position on what we think the consensus should be based on the transactions that made it into our open ledger @param initialLedger The ledger that contains our initial position. */ void takeInitialPosition (Ledger& initialLedger) { std::shared_ptr initialSet; if ((getConfig ().RUN_STANDALONE || (mProposing && mHaveCorrectLCL)) && ((mPreviousLedger->getLedgerSeq () % 256) == 0)) { // previous ledger was flag ledger std::shared_ptr preSet = initialLedger.peekTransactionMap ()->snapShot (true); m_feeVote.doVoting (mPreviousLedger, preSet); getApp().getAmendmentTable ().doVoting (mPreviousLedger, preSet); initialSet = preSet->snapShot (false); } else initialSet = initialLedger.peekTransactionMap ()->snapShot (false); // Tell the ledger master not to acquire the ledger we're probably building getApp().getLedgerMaster().setBuildingLedger (mPreviousLedger->getLedgerSeq () + 1); uint256 txSet = initialSet->getHash (); WriteLog (lsINFO, LedgerConsensus) << "initial position " << txSet; mapCompleteInternal (txSet, initialSet, false); mOurPosition = std::make_shared (mValPublic, initialLedger.getParentHash (), txSet, mCloseTime); for (auto& it : mDisputes) { it.second->setOurVote (initialLedger.hasTransaction (it.first)); } // if any peers have taken a contrary position, process disputes hash_set found; for (auto& it : mPeerPositions) { uint256 set = it.second->getCurrentHash (); if (found.insert (set).second) { auto iit (mAcquired.find (set)); if (iit != mAcquired.end ()) { mCompares.insert(iit->second->getHash()); createDisputes (initialSet, iit->second); } } } if (mProposing) propose (); } /** How many of the participants must agree to reach a given threshold? Note that the number may not precisely yield the requested percentage. For example, with with size = 5 and percent = 70, we return 3, but 3 out of 5 works out to 60%. There are no security implications to this. @param participants the number of participants (i.e. validators) @param the percent that we want to reach @return the number of participants which must agree */ static int participantsNeeded (int participants, int percent) { int result = ((participants * percent) + (percent / 2)) / 100; return (result == 0) ? 1 : result; } /** Called while trying to avalanche towards consensus. Adjusts our positions to try to agree with other validators. */ void updateOurPositions () { // Compute a cutoff time auto peerCutoff = std::chrono::steady_clock::now (); auto ourCutoff = peerCutoff - std::chrono::seconds (PROPOSE_INTERVAL); peerCutoff -= std::chrono::seconds (PROPOSE_FRESHNESS); bool changes = false; std::shared_ptr ourPosition; // std::vector addedTx, removedTx; // Verify freshness of peer positions and compute close times std::map closeTimes; auto it = mPeerPositions.begin (); while (it != mPeerPositions.end ()) { if (it->second->isStale (peerCutoff)) { // peer's proposal is stale, so remove it auto const& peerID = it->second->getPeerID (); WriteLog (lsWARNING, LedgerConsensus) << "Removing stale proposal from " << peerID; for (auto& dt : mDisputes) dt.second->unVote (peerID); it = mPeerPositions.erase (it); } else { // proposal is still fresh ++closeTimes[roundCloseTime (it->second->getCloseTime (), mCloseResolution)]; ++it; } } // Update votes on disputed transactions for (auto& it : mDisputes) { // Because the threshold for inclusion increases, // time can change our position on a dispute if (it.second->updateVote (mClosePercent, mProposing)) { if (!changes) { ourPosition = mAcquired[mOurPosition->getCurrentHash ()] ->snapShot (true); assert (ourPosition); changes = true; } if (it.second->getOurVote ()) // now a yes { ourPosition->addItem (SHAMapItem (it.first , it.second->peekTransaction ()), true, false); // addedTx.push_back(it.first); } else // now a no { ourPosition->delItem (it.first); // removedTx.push_back(it.first); } } } int neededWeight; if (mClosePercent < AV_MID_CONSENSUS_TIME) neededWeight = AV_INIT_CONSENSUS_PCT; else if (mClosePercent < AV_LATE_CONSENSUS_TIME) neededWeight = AV_MID_CONSENSUS_PCT; else if (mClosePercent < AV_STUCK_CONSENSUS_TIME) neededWeight = AV_LATE_CONSENSUS_PCT; else neededWeight = AV_STUCK_CONSENSUS_PCT; std::uint32_t closeTime = 0; mHaveCloseTimeConsensus = false; if (mPeerPositions.empty ()) { // no other times mHaveCloseTimeConsensus = true; closeTime = roundCloseTime (mOurPosition->getCloseTime (), mCloseResolution); } else { int participants = mPeerPositions.size (); if (mProposing) { ++closeTimes[roundCloseTime (mOurPosition->getCloseTime (), mCloseResolution)]; ++participants; } // Threshold for non-zero vote int threshVote = participantsNeeded (participants, neededWeight); // Threshold to declare consensus int const threshConsensus = participantsNeeded ( participants, AV_CT_CONSENSUS_PCT); WriteLog (lsINFO, LedgerConsensus) << "Proposers:" << mPeerPositions.size () << " nw:" << neededWeight << " thrV:" << threshVote << " thrC:" << threshConsensus; for (auto it = closeTimes.begin () , end = closeTimes.end (); it != end; ++it) { WriteLog (lsDEBUG, LedgerConsensus) << "CCTime: seq" << mPreviousLedger->getLedgerSeq () + 1 << ": " << it->first << " has " << it->second << ", " << threshVote << " required"; if (it->second >= threshVote) { WriteLog (lsDEBUG, LedgerConsensus) << "Close time consensus reached: " << it->first; closeTime = it->first; threshVote = it->second; if (threshVote >= threshConsensus) mHaveCloseTimeConsensus = true; } } // If we agree to disagree on the close time, don't delay consensus if (!mHaveCloseTimeConsensus && (closeTimes[0] > threshConsensus)) { closeTime = 0; mHaveCloseTimeConsensus = true; } CondLog (!mHaveCloseTimeConsensus, lsDEBUG, LedgerConsensus) << "No CT consensus: Proposers:" << mPeerPositions.size () << " Proposing:" << (mProposing ? "yes" : "no") << " Thresh:" << threshConsensus << " Pos:" << closeTime; } if (!changes && ((closeTime != roundCloseTime (mOurPosition->getCloseTime (), mCloseResolution)) || mOurPosition->isStale (ourCutoff))) { // close time changed or our position is stale ourPosition = mAcquired[mOurPosition->getCurrentHash ()] ->snapShot (true); assert (ourPosition); changes = true; // We pretend our position changed to force } // a new proposal if (changes) { uint256 newHash = ourPosition->getHash (); WriteLog (lsINFO, LedgerConsensus) << "Position change: CTime " << closeTime << ", tx " << newHash; if (mOurPosition->changePosition (newHash, closeTime)) { if (mProposing) propose (); mapCompleteInternal (newHash, ourPosition, false); } } } /** If we radically changed our consensus context for some reason, we need to replay recent proposals so that they're not lost. */ void playbackProposals () { for (auto const& it: getApp().getOPs ().peekStoredProposals ()) { bool relay = false; for (auto const& proposal : it.second) { if (proposal->isPrevLedger (mPrevLedgerHash)) relay = peerPosition (proposal); if (relay) { WriteLog (lsWARNING, LedgerConsensus) << "We should do delayed relay of this proposal," << " but we cannot"; } } } } /** We have just decided to close the ledger. Start the consensus timer, stash the close time, inform peers, and take a position */ void closeLedger () { checkOurValidation (); state_ = State::establish; mConsensusStartTime = std::chrono::steady_clock::now (); mCloseTime = getApp().getOPs ().getCloseTimeNC (); getApp().getOPs ().setLastCloseTime (mCloseTime); statusChange (protocol::neCLOSING_LEDGER, *mPreviousLedger); getApp().getLedgerMaster().applyHeldTransactions (); takeInitialPosition (*getApp().getLedgerMaster ().getCurrentLedger ()); } /** If we missed a consensus round, we may be missing a validation. This will send an older owed validation if we previously missed it. */ void checkOurValidation () { // This only covers some cases - Fix for the case where we can't ever // acquire the consensus ledger if (!mHaveCorrectLCL || !mValPublic.isSet () || !mValPrivate.isSet () || getApp().getOPs ().isNeedNetworkLedger ()) { return; } STValidation::pointer lastVal = getApp().getOPs ().getLastValidation (); if (lastVal) { if (lastVal->getFieldU32 (sfLedgerSequence) == mPreviousLedger->getLedgerSeq ()) { return; } if (lastVal->getLedgerHash () == mPrevLedgerHash) return; } uint256 signingHash; STValidation::pointer v = std::make_shared (mPreviousLedger->getHash () , getApp().getOPs ().getValidationTimeNC (), mValPublic, false); addLoad(v); v->setTrusted (); signingHash = v->sign (mValPrivate); // FIXME: wrong supression getApp().getHashRouter ().addSuppression (signingHash); getApp().getValidations ().addValidation (v, "localMissing"); Blob validation = v->getSigned (); protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); getApp().getOPs ().setLastValidation (v); WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation"; } /** We have a new LCL and must accept it */ void beginAccept (bool synchronous) { std::shared_ptr consensusSet = mAcquired[mOurPosition->getCurrentHash ()]; if (!consensusSet) { WriteLog (lsFATAL, LedgerConsensus) << "We don't have a consensus set"; abort (); return; } getApp().getOPs ().newLCL (mPeerPositions.size (), mCurrentMSeconds, mNewLedgerHash); if (synchronous) accept (consensusSet); else { getApp().getJobQueue().addJob (jtACCEPT, "acceptLedger", std::bind (&LedgerConsensusImp::accept, shared_from_this (), consensusSet)); } } void endConsensus () { getApp().getOPs ().endConsensus (mHaveCorrectLCL); } /** Add our load fee to our validation */ void addLoad(STValidation::ref val) { std::uint32_t fee = std::max( getApp().getFeeTrack().getLocalFee(), getApp().getFeeTrack().getClusterFee()); std::uint32_t ref = getApp().getFeeTrack().getLoadBase(); if (fee > ref) val->setFieldU32(sfLoadFee, fee); } private: InboundTransactions& inboundTransactions_; LocalTxs& m_localTX; FeeVote& m_feeVote; State state_; std::uint32_t mCloseTime; // The wall time this ledger closed uint256 mPrevLedgerHash, mNewLedgerHash, mAcquiringLedger; Ledger::pointer mPreviousLedger; LedgerProposal::pointer mOurPosition; RippleAddress mValPublic, mValPrivate; bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail; int mCurrentMSeconds; // How long the close has taken, expressed as a percentage of the time that // we expected it to take. int mClosePercent; int mCloseResolution; bool mHaveCloseTimeConsensus; std::chrono::steady_clock::time_point mConsensusStartTime; int mPreviousProposers; // The time it took for the last consensus process to converge int mPreviousMSeconds; // Convergence tracking, trusted peers indexed by hash of public key hash_map mPeerPositions; // Transaction Sets, indexed by hash of transaction tree hash_map> mAcquired; // Disputed transactions hash_map mDisputes; hash_set mCompares; // Close time estimates std::map mCloseTimes; // nodes that have bowed out of this consensus process hash_set mDeadNodes; }; //------------------------------------------------------------------------------ LedgerConsensus::~LedgerConsensus () { } std::shared_ptr make_LedgerConsensus (int previousProposers, int previousConvergeTime, InboundTransactions& inboundTransactions, LocalTxs& localtx, LedgerHash const &prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime, FeeVote& feeVote) { return std::make_shared (previousProposers, previousConvergeTime, inboundTransactions, localtx, prevLCLHash, previousLedger, closeTime, feeVote); } /** Apply a transaction to a ledger @param engine The transaction engine containing the ledger. @param txn The transaction to be applied to ledger. @param openLedger true if ledger is open @param retryAssured true if the transaction should be retried on failure. @return One of resultSuccess, resultFail or resultRetry. */ static int applyTransaction (TransactionEngine& engine , STTx::ref txn, bool openLedger, bool retryAssured) { // Returns false if the transaction has need not be retried. TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE; if (retryAssured) { parms = static_cast (parms | tapRETRY); } if ((getApp().getHashRouter ().getFlags (txn->getTransactionID ()) & SF_SIGGOOD) == SF_SIGGOOD) { parms = static_cast (parms | tapNO_CHECK_SIGN); } WriteLog (lsDEBUG, LedgerConsensus) << "TXN " << txn->getTransactionID () << (openLedger ? " open" : " closed") << (retryAssured ? "/retry" : "/final"); WriteLog (lsTRACE, LedgerConsensus) << txn->getJson (0); try { auto result = engine.applyTransaction (*txn, parms); if (result.second) { WriteLog (lsDEBUG, LedgerConsensus) << "Transaction applied: " << transHuman (result.first); return LedgerConsensusImp::resultSuccess; } if (isTefFailure (result.first) || isTemMalformed (result.first) || isTelLocal (result.first)) { // failure WriteLog (lsDEBUG, LedgerConsensus) << "Transaction failure: " << transHuman (result.first); return LedgerConsensusImp::resultFail; } WriteLog (lsDEBUG, LedgerConsensus) << "Transaction retry: " << transHuman (result.first); return LedgerConsensusImp::resultRetry; } catch (...) { WriteLog (lsWARNING, LedgerConsensus) << "Throws"; return LedgerConsensusImp::resultFail; } } /** Apply a set of transactions to a ledger @param set The set of transactions to apply @param applyLedger The ledger to which the transactions should be applied. @param checkLedger A reference ledger for determining error messages (typically new last closed ledger). @param retriableTransactions collect failed transactions in this set @param openLgr true if applyLedger is open, else false. */ void applyTransactions (std::shared_ptr const& set, Ledger::ref applyLedger, Ledger::ref checkLedger, CanonicalTXSet& retriableTransactions, bool openLgr) { TransactionEngine engine (applyLedger); if (set) { for (std::shared_ptr item = set->peekFirstItem (); !!item; item = set->peekNextItem (item->getTag ())) { // If the checkLedger doesn't have the transaction if (!checkLedger->hasTransaction (item->getTag ())) { // Then try to apply the transaction to applyLedger WriteLog (lsDEBUG, LedgerConsensus) << "Processing candidate transaction: " << item->getTag (); try { SerialIter sit (item->peekSerializer ()); STTx::pointer txn = std::make_shared(sit); if (applyTransaction (engine, txn, openLgr, true) == LedgerConsensusImp::resultRetry) { // On failure, stash the failed transaction for // later retry. retriableTransactions.push_back (txn); } } catch (...) { WriteLog (lsWARNING, LedgerConsensus) << " Throws"; } } } } int changes; bool certainRetry = true; // Attempt to apply all of the retriable transactions for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass) { WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " Txns: " << retriableTransactions.size () << (certainRetry ? " retriable" : " final"); changes = 0; auto it = retriableTransactions.begin (); while (it != retriableTransactions.end ()) { try { switch (applyTransaction (engine, it->second, openLgr, certainRetry)) { case LedgerConsensusImp::resultSuccess: it = retriableTransactions.erase (it); ++changes; break; case LedgerConsensusImp::resultFail: it = retriableTransactions.erase (it); break; case LedgerConsensusImp::resultRetry: ++it; } } catch (...) { WriteLog (lsWARNING, LedgerConsensus) << "Transaction throws"; it = retriableTransactions.erase (it); } } WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " finished " << changes << " changes"; // A non-retry pass made no changes if (!changes && !certainRetry) return; // Stop retriable passes if ((!changes) || (pass >= LEDGER_RETRY_PASSES)) certainRetry = false; } // If there are any transactions left, we must have // tried them in at least one final pass assert (retriableTransactions.empty() || !certainRetry); } } // ripple