From 403f15dc48575a4ab66b87e8e72bdfc564fd2235 Mon Sep 17 00:00:00 2001 From: Howard Hinnant Date: Wed, 23 Jul 2014 16:50:55 -0400 Subject: [PATCH] Documentation for Ledger Consensus implementation (RIPD-405) --- .../module/app/consensus/LedgerConsensus.cpp | 332 ++++++++++++++---- 1 file changed, 262 insertions(+), 70 deletions(-) diff --git a/src/ripple/module/app/consensus/LedgerConsensus.cpp b/src/ripple/module/app/consensus/LedgerConsensus.cpp index 0029b7411..baabb6cc8 100644 --- a/src/ripple/module/app/consensus/LedgerConsensus.cpp +++ b/src/ripple/module/app/consensus/LedgerConsensus.cpp @@ -22,16 +22,46 @@ namespace ripple { +/** + Provides the implementation for LedgerConsensus. + + Achieves consensus on the next ledger. + This object is created when the consensus process starts, and + is destroyed when the process is complete. + + Nearly everything herein is invoked with the master lock held. + + Two things need consensus: + 1. The set of transactions. + 2. The close time for the ledger. +*/ class LedgerConsensusImp : public LedgerConsensus , public std::enable_shared_from_this , public CountedObject { public: + /** + * The result of applying a transaction to a ledger. + */ enum {resultSuccess, resultFail, resultRetry}; static char const* getCountedObjectName () { return "LedgerConsensus"; } + LedgerConsensusImp(LedgerConsensusImp const&) = delete; + LedgerConsensusImp& operator=(LedgerConsensusImp const&) = delete; + + /** + Construct the object which will achieve consensus on the next ledger. + + @param clock The clock which will be used to measure time. + @param localtx A set of local transactions to apply. + @param prevLCLHash The hash of the Last Closed Ledger (LCL). + @param previousLedger Best guess of what the Last Closed Ledger (LCL) + was. + @param closeTime Closing time point of the LCL. + @param feeVote Our recommendation for the voting fee. 
+ */ LedgerConsensusImp (clock_type& clock, LocalTxs& localtx, LedgerHash const & prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime, FeeVote& feeVote) @@ -58,6 +88,7 @@ public: mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime (); assert (mPreviousMSeconds); + // Adapt close time resolution to recent network conditions mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (), @@ -66,14 +97,18 @@ public: if (mValPublic.isSet () && mValPrivate.isSet () && !getApp().getOPs ().isNeedNetworkLedger ()) { + // If the validation keys were set, and if we do not need a network ledger, + // then we want to validate, and possibly propose a ledger. WriteLog (lsINFO, LedgerConsensus) << "Entering consensus process, validating"; mValidating = true; + // Propose if we are in sync with the network mProposing = getApp().getOPs ().getOperatingMode () == NetworkOPs::omFULL; } else { + // Otherwise we just want to monitor the validation process. WriteLog (lsINFO, LedgerConsensus) << "Entering consensus process, watching"; mProposing = mValidating = false; @@ -83,6 +118,8 @@ public: if (!mHaveCorrectLCL) { + // If we were not handed the correct LCL, then set our state + // to not proposing. getApp().getOPs ().setProposing (false, false); handleLCL (mPrevLedgerHash); @@ -96,14 +133,27 @@ public: << "Correct LCL is: " << prevLCLHash; } } - else + else // update the network status table as to whether we're proposing/validating getApp().getOPs ().setProposing (mProposing, mValidating); } + + /** + This function is called, but its return value is always ignored. + + @return 1. + */ int startup () { return 1; } + /** + Get the Json state of the consensus process. + Called by the consensus_info RPC. + + @param full True if verbose response desired. + @return The Json state. 
+ */ Json::Value getJson (bool full) { Json::Value ret (Json::objectValue); @@ -236,8 +286,16 @@ public: return mPrevLedgerHash; } - /** Get a transaction tree, - fetching it from the network is required and requested + /** + Get a transaction tree, fetching it from the network if required and + requested. When the transaction acquire engine successfully acquires + a transaction set, it will call back. + + @param hash hash of the requested transaction tree. + @param doAcquire true if we should get this from the network if we don't + already have it. + @return Pointer to the transaction tree if we got it, else + nullptr. */ SHAMap::pointer getTransactionTree (uint256 const& hash, bool doAcquire) { @@ -287,14 +345,19 @@ public: return SHAMap::pointer (); } - /** We have a complete transaction set, typically acquired from the network + /** + We have a complete transaction set, typically acquired from the network + + @param hash hash of the transaction set. + @param map the transaction set. + @param acquired true if we have acquired the transaction set. 
*/ void mapComplete (uint256 const& hash, SHAMap::ref map, bool acquired) { CondLog (acquired, lsINFO, LedgerConsensus) << "We have acquired TXS " << hash; - if (!map) + if (!map) // if the transaction failed { // this is an invalid/corrupt map mAcquired[hash] = map; @@ -308,6 +371,7 @@ public: auto it = mAcquired.find (hash); + // If we have already acquired this transaction set if (mAcquired.find (hash) != mAcquired.end ()) { if (it->second) @@ -320,10 +384,12 @@ public: mAcquired.erase (hash); } + // We now have a map that we did not have before + if (mOurPosition && (!mOurPosition->isBowOut ()) && (hash != mOurPosition->getCurrentHash ())) { - // this could create disputed transactions + // this will create disputed transactions auto it2 = mAcquired.find (mOurPosition->getCurrentHash ()); if (it2 != mAcquired.end ()) @@ -331,6 +397,7 @@ public: assert ((it2->first == mOurPosition->getCurrentHash ()) && it2->second); mCompares.insert(hash); + // Our position is not the same as the acquired position createDisputes (it2->second, map); } else @@ -362,13 +429,18 @@ public: << hash << " no peers were proposing it"; } + // Send our transaction set to directly connected peers sendHaveTxSet (hash, true); } - /** Determine if we still need to acquire a transaction set from network. - If a transaction set is popular, we probably have it. If it's unpopular, - we probably don't need it (and the peer that initially made us - retrieve it has probably already changed its position) + /** + Determine if we still need to acquire a transaction set from network. + If a transaction set is popular, we probably have it. If it's unpopular, + we probably don't need it (and the peer that initially made us + retrieve it has probably already changed its position). + + @param hash hash of the transaction set. + @return true if we need to acquire it, else false. 
*/ bool stillNeedTXSet (uint256 const& hash) { @@ -383,7 +455,12 @@ public: return false; } - /** Check if our last closed ledger matches the network's + /** + Check if our last closed ledger matches the network's. + This tells us if we are still in sync with the network. + This also helps us if we enter the consensus round with + the wrong ledger, to leave it with the correct ledger so + that we can participate in the next round. */ void checkLCL () { @@ -396,6 +473,8 @@ public: if (mHaveCorrectLCL) priorLedger = mPreviousLedger->getParentHash (); // don't jump back + // Get validators that are on our ledger, or "close" to being on + // our ledger. ripple::unordered_map vals = getApp().getValidations ().getCurrentValidations (favoredLedger, priorLedger); @@ -465,7 +544,10 @@ public: handleLCL (netLgr); } - /** Change our view of the last closed ledger + /** + Change our view of the last closed ledger + + @param lclHash Hash of the last closed ledger. */ void handleLCL (uint256 const& lclHash) { @@ -483,12 +565,14 @@ public: propose (); } + // Stop proposing because we are out of sync mProposing = false; // mValidating = false; mPeerPositions.clear (); mDisputes.clear (); mCloseTimes.clear (); mDeadNodes.clear (); + // To get back in sync: playbackProposals (); } @@ -504,6 +588,7 @@ public: // need to start acquiring the correct consensus LCL WriteLog (lsWARNING, LedgerConsensus) << "Need consensus ledger " << mPrevLedgerHash; + // Tell the ledger acquire system that we need the consensus ledger mAcquiringLedger = mPrevLedgerHash; getApp().getJobQueue().addJob (jtADVANCE, "getConsensusLedger", std::bind ( @@ -530,7 +615,9 @@ public: - + /** + On timer call the correct handler for each state. + */ void timerEntry () { if ((mState != lcsFINISHED) && (mState != lcsACCEPTED)) @@ -569,7 +656,9 @@ public: assert (false); } - // state handlers + /** + Handle pre-close state. 
+ */ void statePreClose () { // it is shortly before ledger close time @@ -597,6 +686,7 @@ public: } else { + // Use the time we saw the last ledger close sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () - getApp().getOPs ().getLastCloseTime ()); idleInterval = LEDGER_IDLE_INTERVAL; @@ -605,6 +695,7 @@ public: idleInterval = std::max (idleInterval, LEDGER_IDLE_INTERVAL); idleInterval = std::max (idleInterval, 2 * mPreviousLedger->getCloseResolution ()); + // Decide if we should close the ledger if (ContinuousLedgerTiming::shouldClose (anyTransactions , mPreviousProposers, proposersClosed, proposersValidated , mPreviousMSeconds, sinceClose, mCurrentMSeconds @@ -613,7 +704,10 @@ public: closeLedger (); } } + /** We are establishing a consensus + Update our position only on the timer, and in this state. + If we have consensus, move to the finish state */ void stateEstablish () { @@ -659,6 +753,7 @@ public: int agree = 0, disagree = 0; uint256 ourPosition = mOurPosition->getCurrentHash (); + // Count number of agreements/disagreements with our position for (auto& it : mPeerPositions) { if (!it.second->isBowOut ()) @@ -696,12 +791,18 @@ public: << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree; + // Determine if we actually have consensus or not return ContinuousLedgerTiming::haveConsensus (mPreviousProposers, agree + disagree, agree, currentValidations , mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail); } - /** A server has taken a new position, adjust our tracking + /** + A server has taken a new position, adjust our tracking + Called when a peer takes a new position. + + @param newPosition the new position + @return true if we should do delayed relay of this position. 
*/ bool peerPosition (LedgerProposal::ref newPosition) { @@ -770,7 +871,13 @@ public: return true; } - /** A peer has informed us that it can give us a transaction set + /** + A peer has informed us that it can give us a transaction set + + @param peer The peer we can get it from. + @param hashSet The transaction set we can get. + @param status Says whether or not the peer has the transaction locally. + @return true if we have or acquire the transaction set. */ bool peerHasSet (Peer::ptr const& peer, uint256 const& hashSet , protocol::TxSetStatus status) @@ -793,7 +900,14 @@ public: return true; } - /** A peer has sent us some nodes from a transaction set + /** + A peer has sent us some nodes from a transaction set + + @param peer The peer which has sent the nodes + @param setHash The transaction set + @param nodeIDs The nodes in the transaction set + @param nodeData The data + @return The status results of adding the nodes. */ SHAMapAddNode peerGaveNodes (Peer::ptr const& peer , uint256 const& setHash, const std::list& nodeIDs @@ -830,6 +944,8 @@ public: } private: /** We have a new last closed ledger, process it. Final accept logic + + @param set Our consensus set */ void accept (SHAMap::pointer set) { @@ -869,15 +985,17 @@ private: << "Report: TxSt = " << set->getHash () << ", close " << closeTime << (closeTimeCorrect ? 
"" : "X"); + // Put failed transactions into a deterministic order CanonicalTXSet failedTransactions (set->getHash ()); + // Build the new last closed ledger Ledger::pointer newLCL = std::make_shared (false , *mPreviousLedger); // Set up to write SHAMap changes to our database, // perform updates, extract changes - newLCL->peekTransactionMap ()->armDirty (); + newLCL->peekTransactionMap ()->armDirty (); // start tracking changes newLCL->peekAccountStateMap ()->armDirty (); WriteLog (lsDEBUG, LedgerConsensus) << "Applying consensus set transactions to the" @@ -886,7 +1004,7 @@ private: newLCL->updateSkipList (); newLCL->setClosed (); std::shared_ptr acctNodes - = newLCL->peekAccountStateMap ()->disarmDirty (); + = newLCL->peekAccountStateMap ()->disarmDirty (); // stop tracking changes std::shared_ptr txnNodes = newLCL->peekTransactionMap ()->disarmDirty (); @@ -907,8 +1025,10 @@ private: << "Flushed " << fc << " dirty transaction nodes"; } + // Accept ledger newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect); + // And stash the ledger in the ledger master if (getApp().getLedgerMaster().storeLedger (newLCL)) WriteLog (lsDEBUG, LedgerConsensus) << "Consensus built ledger we already had"; @@ -923,22 +1043,24 @@ private: << "Report: NewL = " << newLCL->getHash () << ":" << newLCL->getLedgerSeq (); uint256 newLCLHash = newLCL->getHash (); - + // Tell directly connected peers that we have a new LCL statusChange (protocol::neACCEPTED_LEDGER, *newLCL); if (mValidating && !mConsensusFail) { + // Build validation uint256 signingHash; SerializedValidation::pointer v = std::make_shared (newLCLHash, getApp().getOPs ().getValidationTimeNC () , mValPublic, mProposing); v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ()); - addLoad(v); + addLoad(v); // Our network load if (((newLCL->getLedgerSeq () + 1) % 256) == 0) // next ledger is flag ledger { + // Suggest fee changes and new features m_feeVote.doValidation (newLCL, *v); getApp().getAmendmentTable 
().doValidation (newLCL, *v); } @@ -952,6 +1074,7 @@ private: Blob validation = v->getSigned (); protocol::TMValidation val; val.set_validation (&validation[0], validation.size ()); + // Send signed validation to all of our directly connected peers getApp ().overlay ().foreach (send_always ( std::make_shared ( val, protocol::mtVALIDATION))); @@ -965,6 +1088,7 @@ private: // See if we can accept a ledger as fully-validated getApp().getLedgerMaster().consensusBuilt (newLCL); + // Build new open ledger Ledger::pointer newOL = std::make_shared (true, *newLCL); LedgerMaster::ScopedLockType sl @@ -1003,22 +1127,25 @@ private: newOL, newLCL, failedTransactions, true); } - { - Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger(); - if (oldOL->peekTransactionMap()->getHash().isNonZero ()) - { + { + // Apply transactions from the old open ledger + Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger(); + if (oldOL->peekTransactionMap()->getHash().isNonZero ()) + { WriteLog (lsDEBUG, LedgerConsensus) << "Applying transactions from current open ledger"; applyTransactions (oldOL->peekTransactionMap (), newOL, newLCL, failedTransactions, true); } - } + } { + // Apply local transactions TransactionEngine engine (newOL); m_localTX.apply (engine); } + // We have a new Last Closed Ledger and new Open Ledger getApp().getLedgerMaster ().pushLedger (newLCL, newOL); mNewLedgerHash = newLCL->getHash (); mState = lcsACCEPTED; @@ -1027,7 +1154,7 @@ private: if (mValidating) { // see how close our close time is to other node's - // close time reports + // close time reports, and update our clock. WriteLog (lsINFO, LedgerConsensus) << "We closed at " << beast::lexicalCastThrow (mCloseTime); @@ -1059,7 +1186,10 @@ private: } } - /** Begin acquiring a transaction set + /** + Begin acquiring a transaction set + + @param acquire The transaction set to acquire. 
*/ void startAcquiring (TransactionAcquire::pointer acquire) { @@ -1110,11 +1240,12 @@ private: acquire->setTimer (); } - // Where is this function? - SHAMap::pointer find (uint256 const & hash); - - /** Compare two proposed transaction sets and create disputed + /** + Compare two proposed transaction sets and create disputed transctions structures for any mismatches + + @param m1 One transaction set + @param m2 The other transaction set */ void createDisputes (SHAMap::ref m1, SHAMap::ref m2) { @@ -1127,20 +1258,21 @@ private: m1->compare (m2, differences, 16384); int dc = 0; + // for each difference between the transactions for (auto& pos : differences) { ++dc; // create disputed transactions (from the ledger that has them) if (pos.second.first) { - // transaction is in first map + // transaction is only in first map assert (!pos.second.second); addDisputedTransaction (pos.first , pos.second.first->peekData ()); } else if (pos.second.second) { - // transaction is in second map + // transaction is only in second map assert (!pos.second.first); addDisputedTransaction (pos.first , pos.second.second->peekData ()); @@ -1151,8 +1283,12 @@ private: WriteLog (lsDEBUG, LedgerConsensus) << dc << " differences found"; } - /** Add a disputed transaction (one that at least one node wants - in the consensus set and at least one node does not) to our tracking + /** + Add a disputed transaction (one that at least one node wants + in the consensus set and at least one node does not) to our tracking + + @param txID The ID of the disputed transaction + @param tx The data of the disputed transaction */ void addDisputedTransaction (uint256 const& txID, Blob const& tx) { @@ -1164,6 +1300,7 @@ private: bool ourVote = false; + // Update our vote on the disputed transaction if (mOurPosition) { auto mit (mAcquired.find (mOurPosition->getCurrentHash ())); @@ -1178,6 +1315,7 @@ private: (txID, tx, ourVote); mDisputes[txID] = txn; + // Update all of the peer's votes on the disputed 
transaction for (auto& pit : mPeerPositions) { auto cit (mAcquired.find (pit.second->getCurrentHash ())); @@ -1201,8 +1339,12 @@ private: } } - /** Adjust the counts on all disputed transactions based + /** + Adjust the votes on all disputed transactions based on the set of peers taking this position + + @param map A disputed position + @param peers peers which are taking the position map */ void adjustCount (SHAMap::ref map, const std::vector& peers) { @@ -1241,7 +1383,11 @@ private: } /** Let peers know that we a particular transactions set so they - can fetch it from us. + can fetch it from us. + + @param hash The ID of the transaction set. + @param direct true if we have this transaction set locally, else a + directly connected peer has it. */ void sendHaveTxSet (uint256 const& hash, bool direct) { @@ -1254,6 +1400,14 @@ private: } /** Apply a set of transactions to a ledger + + @param set The set of transactions to apply + @param applyLedger The ledger to which the transactions should be + applied. + @param checkLedger A reference ledger for determining error + messages (typically new last closed ledger). + @param failedTransactions collect failed transactions in this set + @param openLgr true if applyLedger is open, else false. 
*/ void applyTransactions (SHAMap::ref set, Ledger::ref applyLedger, Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, @@ -1262,40 +1416,44 @@ private: TransactionEngine engine (applyLedger); if (set) - for (SHAMapItem::pointer item = set->peekFirstItem (); !!item; - item = set->peekNextItem (item->getTag ())) - if (!checkLedger->hasTransaction (item->getTag ())) + { + for (SHAMapItem::pointer item = set->peekFirstItem (); !!item; + item = set->peekNextItem (item->getTag ())) { - WriteLog (lsINFO, LedgerConsensus) - << "Processing candidate transaction: " << item->getTag (); -#ifndef TRUST_NETWORK - - try + // If the checkLedger doesn't have the transaction + if (!checkLedger->hasTransaction (item->getTag ())) { -#endif - SerializerIterator sit (item->peekSerializer ()); - SerializedTransaction::pointer txn - = std::make_shared(sit); - - if (applyTransaction (engine, txn, - applyLedger, openLgr, true) == resultRetry) + // Then try to apply the transaction to applyLedger + WriteLog (lsINFO, LedgerConsensus) << + "Processing candidate transaction: " << item->getTag (); +#ifndef TRUST_NETWORK + try { - failedTransactions.push_back (txn); - } - -#ifndef TRUST_NETWORK - } - catch (...) - { - WriteLog (lsWARNING, LedgerConsensus) << " Throws"; - } - #endif + SerializerIterator sit (item->peekSerializer ()); + SerializedTransaction::pointer txn + = std::make_shared(sit); + if (applyTransaction (engine, txn, + applyLedger, openLgr, true) == resultRetry) + { + // On failure, stash the failed transaction for + // later retry. + failedTransactions.push_back (txn); + } +#ifndef TRUST_NETWORK + } + catch (...) 
+ { + WriteLog (lsWARNING, LedgerConsensus) << " Throws"; + } +#endif + } } + } int changes; bool certainRetry = true; - + // Attempt to apply all of the failed transactions for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass) { WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " Txns: " @@ -1351,6 +1509,13 @@ private: } /** Apply a transaction to a ledger + + @param engine The transaction engine used for the application. + @param txn The transaction to be applied to ledger. + @param ledger The ledger to apply txn to. + @param openLedger true if ledger is open + @param retryAssured true if the transaction should be retried on failure. + @return One of resultSuccess, resultFail or resultRetry. */ int applyTransaction (TransactionEngine& engine , SerializedTransaction::ref txn, Ledger::ref ledger @@ -1413,18 +1578,28 @@ private: catch (...) { WriteLog (lsWARNING, LedgerConsensus) << "Throws"; - return false; + return resultFail; } #endif } + /** + Round the close time to the close time resolution. + + @param closeTime The time to be rounded. + @return The rounded close time. + */ std::uint32_t roundCloseTime (std::uint32_t closeTime) { return Ledger::roundCloseTime (closeTime, mCloseResolution); } - /** Send a node status change message to our peers + /** Send a node status change message to our directly connected peers + + @param event The event which caused the status change. This is + typically neACCEPTED_LEDGER or neCLOSING_LEDGER. + @param ledger The ledger associated with the event. */ void statusChange (protocol::NodeEvent event, Ledger& ledger) { @@ -1465,6 +1640,8 @@ private: /** Take an initial position on what we think the consensus should be based on the transactions that made it into our open ledger + + @param initialLedger The initial position (ledger) to be formed. 
 */ void takeInitialPosition (Ledger& initialLedger) { @@ -1532,16 +1709,27 @@ private: propose (); } - // For a given number of participants and required percent - // for consensus, how many participants must agree? + /** + For a given number of participants and required percent + for consensus, how many participants must agree? + + @param size number of validators + @param percent desired percent for consensus + @return number of participants which must agree + */ static int computePercent (int size, int percent) { int result = ((size * percent) + (percent / 2)) / 100; return (result == 0) ? 1 : result; } + /** + Called while trying to avalanche towards consensus. + Adjusts our positions to try to agree with other validators. + */ void updateOurPositions () { + // Compute a cutoff time boost::posix_time::ptime peerCutoff = boost::posix_time::second_clock::universal_time (); boost::posix_time::ptime ourCutoff @@ -1560,7 +1748,7 @@ private: { if (it->second->isStale (peerCutoff)) { - // proposal is stale + // peer's proposal is stale, so remove it auto const& peerID = it->second->getPeerID (); WriteLog (lsWARNING, LedgerConsensus) << "Removing stale proposal from " << peerID; @@ -1576,6 +1764,7 @@ private: } } + // Update votes on disputed transactions for (auto& it : mDisputes) { // Because the threshold for inclusion increases, @@ -1604,7 +1793,6 @@ private: } } - int neededWeight; if (mClosePercent < AV_MID_CONSENSUS_TIME) @@ -1779,6 +1967,10 @@ private: takeInitialPosition (*getApp().getLedgerMaster ().getCurrentLedger ()); } + /** + If we missed a consensus round, we may be missing a validation. + This will send an older owed validation if we previously missed it. + */ void checkOurValidation () { // This only covers some cases - Fix for the case where we can't ever acquire the consensus ledger