diff --git a/src/ripple_app/consensus/LedgerConsensus.cpp b/src/ripple_app/consensus/LedgerConsensus.cpp index 5bf3cb76ab..8519be294e 100644 --- a/src/ripple_app/consensus/LedgerConsensus.cpp +++ b/src/ripple_app/consensus/LedgerConsensus.cpp @@ -17,1571 +17,1909 @@ */ //============================================================================== -// #define TRUST_NETWORK - -#define LC_DEBUG - SETUP_LOG (LedgerConsensus) -LedgerConsensus::LedgerConsensus (uint256 const& prevLCLHash, Ledger::ref previousLedger, uint32 closeTime) - : mState (lcsPRE_CLOSE), mCloseTime (closeTime), mPrevLedgerHash (prevLCLHash), mPreviousLedger (previousLedger), - mValPublic (getConfig ().VALIDATION_PUB), mValPrivate (getConfig ().VALIDATION_PRIV), mConsensusFail (false), - mCurrentMSeconds (0), mClosePercent (0), mHaveCloseTimeConsensus (false), - mConsensusStartTime (boost::posix_time::microsec_clock::universal_time ()) +// #define TRUST_NETWORK + +class LedgerConsensusImp + : public LedgerConsensus + , public boost::enable_shared_from_this + , public CountedObject { - WriteLog (lsDEBUG, LedgerConsensus) << "Creating consensus object"; - WriteLog (lsTRACE, LedgerConsensus) << "LCL:" << previousLedger->getHash () << ", ct=" << closeTime; - mPreviousProposers = getApp().getOPs ().getPreviousProposers (); - mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime (); - assert (mPreviousMSeconds); +public: + enum {resultSuccess, resultFail, resultRetry}; - mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( - mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (), previousLedger->getLedgerSeq () + 1); + static char const* getCountedObjectName () { return "LedgerConsensus"; } - if (mValPublic.isSet () && mValPrivate.isSet () && !getApp().getOPs ().isNeedNetworkLedger ()) + LedgerConsensusImp (LedgerHash const & prevLCLHash, + Ledger::ref previousLedger, uint32 closeTime) + : mState (lcsPRE_CLOSE) + , mCloseTime (closeTime) + , mPrevLedgerHash (prevLCLHash) + , mPreviousLedger (previousLedger) + , mValPublic (getConfig ().VALIDATION_PUB) + , mValPrivate (getConfig ().VALIDATION_PRIV) + , mConsensusFail (false) + , mCurrentMSeconds (0) + , mClosePercent (0) + , mHaveCloseTimeConsensus (false) + , mConsensusStartTime + (boost::posix_time::microsec_clock::universal_time ()) { - WriteLog (lsINFO, LedgerConsensus) << "Entering consensus process, validating"; - mValidating = true; - mProposing = getApp().getOPs ().getOperatingMode () == NetworkOPs::omFULL; - } - else - { - WriteLog (lsINFO, LedgerConsensus) << "Entering consensus process, watching"; - mProposing = mValidating = false; - } + WriteLog (lsDEBUG, LedgerConsensus) << "Creating consensus object"; + WriteLog (lsTRACE, LedgerConsensus) + << "LCL:" << previousLedger->getHash () << ", ct=" << closeTime; + mPreviousProposers = getApp().getOPs ().getPreviousProposers (); + mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime (); + assert (mPreviousMSeconds); - mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash); + mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( + mPreviousLedger->getCloseResolution (), + mPreviousLedger->getCloseAgree (), + previousLedger->getLedgerSeq () + 1); - if (!mHaveCorrectLCL) - { - getApp().getOPs ().setProposing (false, false); - handleLCL (mPrevLedgerHash); + if (mValPublic.isSet () && mValPrivate.isSet () + && !getApp().getOPs ().isNeedNetworkLedger ()) + { + WriteLog (lsINFO, LedgerConsensus) + << "Entering consensus process, validating"; + mValidating = true; + mProposing = + getApp().getOPs ().getOperatingMode () == NetworkOPs::omFULL; + } + else + { + WriteLog (lsINFO, LedgerConsensus) + << "Entering consensus process, watching"; + mProposing = mValidating = false; + } + + mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash); if (!mHaveCorrectLCL) { - // mProposing = mValidating = false; - WriteLog (lsINFO, LedgerConsensus) << "Entering consensus with: " << previousLedger->getHash (); - WriteLog (lsINFO, LedgerConsensus) << "Correct LCL is: " << prevLCLHash; + getApp().getOPs ().setProposing (false, false); + handleLCL (mPrevLedgerHash); + + if (!mHaveCorrectLCL) + { + // mProposing = mValidating = false; + WriteLog (lsINFO, LedgerConsensus) + << "Entering consensus with: " + << previousLedger->getHash (); + WriteLog (lsINFO, LedgerConsensus) + << "Correct LCL is: " << prevLCLHash; + } } + else + getApp().getOPs ().setProposing (mProposing, mValidating); } - else - getApp().getOPs ().setProposing (mProposing, mValidating); -} - -void LedgerConsensus::checkOurValidation () -{ - // This only covers some cases - Fix for the case where we can't ever acquire the consensus ledger - if (!mHaveCorrectLCL || !mValPublic.isSet () || !mValPrivate.isSet () || getApp().getOPs ().isNeedNetworkLedger ()) - return; - - SerializedValidation::pointer lastVal = getApp().getOPs ().getLastValidation (); - - if (lastVal) + int startup () { - if (lastVal->getFieldU32 (sfLedgerSequence) == mPreviousLedger->getLedgerSeq ()) - return; - - if (lastVal->getLedgerHash () == mPrevLedgerHash) - return; + return 1; } - uint256 signingHash; - SerializedValidation::pointer v = boost::make_shared - (mPreviousLedger->getHash (), getApp().getOPs ().getValidationTimeNC (), mValPublic, false); - addLoad(v); - v->setTrusted (); - v->sign (signingHash, mValPrivate); - getApp().getHashRouter ().addSuppression (signingHash); // FIXME: wrong supression - getApp().getValidations ().addValidation (v, "localMissing"); - Blob validation = v->getSigned (); - protocol::TMValidation val; - val.set_validation (&validation[0], validation.size ()); -#if 0 - getApp().getPeers ().relayMessage (NULL, - boost::make_shared (val, protocol::mtVALIDATION)); -#endif - getApp().getOPs ().setLastValidation (v); - WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation"; -} - -/** Check if our last closed ledger matches the network's -*/ -void LedgerConsensus::checkLCL () -{ - uint256 netLgr = mPrevLedgerHash; - int netLgrCount = 0; - - uint256 favoredLedger = mPrevLedgerHash; // Don't jump forward - uint256 priorLedger; - - if (mHaveCorrectLCL) - priorLedger = mPreviousLedger->getParentHash (); // don't jump back - - boost::unordered_map vals = - getApp().getValidations ().getCurrentValidations (favoredLedger, priorLedger); - - typedef std::map::value_type u256_cvc_pair; - BOOST_FOREACH (u256_cvc_pair & it, vals) - - if ((it.second.first > netLgrCount) || - ((it.second.first == netLgrCount) && (it.first == mPrevLedgerHash))) + Json::Value getJson (bool full) { - netLgr = it.first; - netLgrCount = it.second.first; - } + Json::Value ret (Json::objectValue); + ret["proposing"] = mProposing; + ret["validating"] = mValidating; + ret["proposers"] = static_cast (mPeerPositions.size ()); - if (netLgr != mPrevLedgerHash) - { - // LCL change - const char* status; + if (mHaveCorrectLCL) + { + ret["synched"] = true; + ret["ledger_seq"] = mPreviousLedger->getLedgerSeq () + 1; + ret["close_granularity"] = mCloseResolution; + } + else + ret["synched"] = false; switch (mState) { case lcsPRE_CLOSE: - status = "PreClose"; + ret["state"] = "open"; break; case lcsESTABLISH: - status = "Establish"; + ret["state"] = "consensus"; break; case lcsFINISHED: - status = "Finished"; + ret["state"] = "finished"; break; case lcsACCEPTED: - status = "Accepted"; + ret["state"] = "accepted"; break; - - default: - status = "unknown"; } - WriteLog (lsWARNING, LedgerConsensus) << "View of consensus changed during " << status << " (" << netLgrCount << ") status=" - << status << ", " << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL"); - WriteLog (lsWARNING, LedgerConsensus) << mPrevLedgerHash << " to " << netLgr; - WriteLog (lsWARNING, LedgerConsensus) << mPreviousLedger->getJson (0); + int v = mDisputes.size (); - if (ShouldLog (lsDEBUG, LedgerConsensus)) + if ((v != 0) && !full) + ret["disputes"] = v; + + if (mOurPosition) + ret["our_position"] = mOurPosition->getJson (); + + if (full) { - BOOST_FOREACH (u256_cvc_pair & it, vals) - WriteLog (lsDEBUG, LedgerConsensus) << "V: " << it.first << ", " << it.second.first; - } - if (mHaveCorrectLCL) - getApp().getOPs ().consensusViewChange (); + ret["current_ms"] = mCurrentMSeconds; + ret["close_percent"] = mClosePercent; + ret["close_resolution"] = mCloseResolution; + ret["have_time_consensus"] = mHaveCloseTimeConsensus; + ret["previous_proposers"] = mPreviousProposers; + ret["previous_mseconds"] = mPreviousMSeconds; - handleLCL (netLgr); - } - else if (mPreviousLedger->getHash () != mPrevLedgerHash) - handleLCL (netLgr); -} - -/** Change our view of the last closed ledger -*/ -void LedgerConsensus::handleLCL (uint256 const& lclHash) -{ - assert ((lclHash != mPrevLedgerHash) || (mPreviousLedger->getHash () != lclHash)); - - if (mPrevLedgerHash != lclHash) - { - // first time switching to this ledger - mPrevLedgerHash = lclHash; - - if (mHaveCorrectLCL && mProposing && mOurPosition) - { - WriteLog (lsINFO, LedgerConsensus) << "Bowing out of consensus"; - mOurPosition->bowOut (); - propose (); - } - - mProposing = false; - // mValidating = false; - mPeerPositions.clear (); - mDisputes.clear (); - mCloseTimes.clear (); - mDeadNodes.clear (); - playbackProposals (); - } - - if (mPreviousLedger->getHash () == mPrevLedgerHash) - return; - - // we need to switch the ledger we're working from - Ledger::pointer newLCL = getApp().getLedgerMaster ().getLedgerByHash (lclHash); - - if (newLCL) - { - assert (newLCL->isClosed ()); - assert (newLCL->isImmutable ()); - assert (newLCL->getHash () == lclHash); - mPreviousLedger = newLCL; - mPrevLedgerHash = lclHash; - } - else if (!mAcquiringLedger || (mAcquiringLedger->getHash () != mPrevLedgerHash)) - { - // need to start acquiring the correct consensus LCL - WriteLog (lsWARNING, LedgerConsensus) << "Need consensus ledger " << mPrevLedgerHash; - - if (mAcquiringLedger) - getApp().getInboundLedgers ().dropLedger (mAcquiringLedger->getHash ()); - - mAcquiringLedger = getApp().getInboundLedgers ().findCreate (mPrevLedgerHash, 0, true); - mHaveCorrectLCL = false; - return; - } - else - return; - - WriteLog (lsINFO, LedgerConsensus) << "Have the consensus ledger " << mPrevLedgerHash; - mHaveCorrectLCL = true; - - mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( - mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (), - mPreviousLedger->getLedgerSeq () + 1); -} - -/** Take an initial position on what we think the consensus should be - based on the transactions that made it into our open ledger -*/ -void LedgerConsensus::takeInitialPosition (Ledger& initialLedger) -{ - SHAMap::pointer initialSet; - - if ((getConfig ().RUN_STANDALONE || (mProposing && mHaveCorrectLCL)) - && ((mPreviousLedger->getLedgerSeq () % 256) == 0)) - { - // previous ledger was flag ledger - SHAMap::pointer preSet = initialLedger.peekTransactionMap ()->snapShot (true); - getApp().getFeeVote ().doVoting (mPreviousLedger, preSet); - getApp().getFeatureTable ().doVoting (mPreviousLedger, preSet); - initialSet = preSet->snapShot (false); - } - else - initialSet = initialLedger.peekTransactionMap ()->snapShot (false); - - uint256 txSet = initialSet->getHash (); - WriteLog (lsINFO, LedgerConsensus) << "initial position " << txSet; - mapComplete (txSet, initialSet, false); - - if (mValidating) - mOurPosition = boost::make_shared - (mValPublic, mValPrivate, initialLedger.getParentHash (), txSet, mCloseTime); - else - mOurPosition = boost::make_shared (initialLedger.getParentHash (), txSet, mCloseTime); - - BOOST_FOREACH (u256_lct_pair & it, mDisputes) - { - it.second->setOurVote (initialLedger.hasTransaction (it.first)); - } - - // if any peers have taken a contrary position, process disputes - boost::unordered_set found; - BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) - { - uint256 set = it.second->getCurrentHash (); - - if (found.insert (set).second) - { - boost::unordered_map::iterator iit = mAcquired.find (set); - - if (iit != mAcquired.end ()) + if (!mPeerPositions.empty ()) { - mCompares.insert(iit->second->getHash()); - createDisputes (initialSet, iit->second); - } + typedef boost::unordered_map::value_type pp_t; + Json::Value ppj (Json::objectValue); + BOOST_FOREACH (pp_t & pp, mPeerPositions) + { + ppj[pp.first.GetHex ()] = pp.second->getJson (); + } + ret["peer_positions"] = ppj; + } + + if (!mAcquired.empty ()) + { + // acquired + typedef boost::unordered_map::value_type ac_t; + Json::Value acq (Json::objectValue); + BOOST_FOREACH (ac_t & at, mAcquired) + { + if (at.second) + acq[at.first.GetHex ()] = "acquired"; + else + acq[at.first.GetHex ()] = "failed"; + } + ret["acquired"] = acq; + } + + if (!mAcquiring.empty ()) + { + typedef boost::unordered_map::value_type ac_t; + Json::Value acq (Json::arrayValue); + BOOST_FOREACH (ac_t & at, mAcquiring) + { + acq.append (at.first.GetHex ()); + } + ret["acquiring"] = acq; + } + + if (!mDisputes.empty ()) + { + typedef boost::unordered_map::value_type d_t; + Json::Value dsj (Json::objectValue); + BOOST_FOREACH (d_t & dt, mDisputes) + { + dsj[dt.first.GetHex ()] = dt.second->getJson (); + } + ret["disputes"] = dsj; + } + + if (!mCloseTimes.empty ()) + { + typedef std::map::value_type ct_t; + Json::Value ctj (Json::objectValue); + BOOST_FOREACH (ct_t & ct, mCloseTimes) + { + ctj[lexicalCastThrow (ct.first)] = ct.second; + } + ret["close_times"] = ctj; + } + + if (!mDeadNodes.empty ()) + { + Json::Value dnj (Json::arrayValue); + BOOST_FOREACH (const uint160 & dn, mDeadNodes) + { + dnj.append (dn.GetHex ()); + } + ret["dead_nodes"] = dnj; + } } + + return ret; } - if (mProposing) - propose (); -} + Ledger::ref peekPreviousLedger () + { + return mPreviousLedger; + } -/** Determine if we still need to acquire a transaction set from the network. + uint256 getLCL () + { + return mPrevLedgerHash; + } + + /** Get a transaction tree, + fetching it from the network is required and requested + */ + SHAMap::pointer getTransactionTree (uint256 const& hash, bool doAcquire) + { + boost::unordered_map::iterator it + = mAcquired.find (hash); + + if (it != mAcquired.end ()) + return it->second; + + if (mState == lcsPRE_CLOSE) + { + SHAMap::pointer currentMap + = getApp().getLedgerMaster ().getCurrentLedger () + ->peekTransactionMap (); + + if (currentMap->getHash () == hash) + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Map " << hash << " is our current"; + currentMap = currentMap->snapShot (false); + mapComplete (hash, currentMap, false); + return currentMap; + } + } + + if (doAcquire) + { + TransactionAcquire::pointer& acquiring = mAcquiring[hash]; + + if (!acquiring) + { + if (hash.isZero ()) + { + SHAMap::pointer empty + = boost::make_shared (smtTRANSACTION); + mapComplete (hash, empty, false); + return empty; + } + + acquiring = boost::make_shared (hash); + startAcquiring (acquiring); + } + } + + return SHAMap::pointer (); + } + + /** We have a complete transaction set, + typically one acuired from the network + */ + void mapComplete (uint256 const& hash, SHAMap::ref map, bool acquired) + { + CondLog (acquired, lsINFO, LedgerConsensus) + << "We have acquired TXS " << hash; + + if (!map) + { + // this is an invalid/corrupt map + mAcquired[hash] = map; + mAcquiring.erase (hash); + WriteLog (lsWARNING, LedgerConsensus) + << "A trusted node directed us to acquire an invalid TXN map"; + return; + } + + assert (hash == map->getHash ()); + + boost::unordered_map::iterator it + = mAcquired.find (hash); + + if (mAcquired.find (hash) != mAcquired.end ()) + { + if (it->second) + { + mAcquiring.erase (hash); + return; // we already have this map + } + + // We previously failed to acquire this map, now we have it + mAcquired.erase (hash); + } + + if (mOurPosition && (!mOurPosition->isBowOut ()) + && (hash != mOurPosition->getCurrentHash ())) + { + // this could create disputed transactions + boost::unordered_map::iterator it2 + = mAcquired.find (mOurPosition->getCurrentHash ()); + + if (it2 != mAcquired.end ()) + { + assert ((it2->first == mOurPosition->getCurrentHash ()) + && it2->second); + mCompares.insert(hash); + createDisputes (it2->second, map); + } + else + assert (false); // We don't have our own position?! + } + else + WriteLog (lsDEBUG, LedgerConsensus) + << "Not ready to create disputes"; + + mAcquired[hash] = map; + mAcquiring.erase (hash); + + // Adjust tracking for each peer that takes this position + std::vector peers; + BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) + { + if (it.second->getCurrentHash () == map->getHash ()) + peers.push_back (it.second->getPeerID ()); + } + + if (!peers.empty ()) + { + adjustCount (map, peers); + } + else + { + CondLog (acquired, lsWARNING, LedgerConsensus) + << "By the time we got the map " + << hash << " no peers were proposing it"; + } + + sendHaveTxSet (hash, true); + } + + /** Determine if we still need to acquire a transaction set from network. If a transaction set is popular, we probably have it. If it's unpopular, we probably don't need it (and the peer that initially made us retrieve it has probably already changed its position) -*/ -bool LedgerConsensus::stillNeedTXSet (uint256 const& hash) -{ - if (mAcquired.find (hash) != mAcquired.end ()) + */ + bool stillNeedTXSet (uint256 const& hash) + { + if (mAcquired.find (hash) != mAcquired.end ()) + return false; + + BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) + { + if (it.second->getCurrentHash () == hash) + return true; + } return false; - - BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) - { - if (it.second->getCurrentHash () == hash) - return true; - } - return false; -} - -/** Compare two proposed transaction sets and create disputed - transctions structures for any mismatches -*/ -void LedgerConsensus::createDisputes (SHAMap::ref m1, SHAMap::ref m2) -{ - if (m1->getHash() == m2->getHash()) - return; - - WriteLog (lsDEBUG, LedgerConsensus) << "createDisputes " << m1->getHash() << " to " << m2->getHash(); - SHAMap::Delta differences; - m1->compare (m2, differences, 16384); - - int dc = 0; - typedef std::map::value_type u256_diff_pair; - BOOST_FOREACH (u256_diff_pair & pos, differences) - { - ++dc; - // create disputed transactions (from the ledger that has them) - if (pos.second.first) - { - // transaction is in first map - assert (!pos.second.second); - addDisputedTransaction (pos.first, pos.second.first->peekData ()); - } - else if (pos.second.second) - { - // transaction is in second map - assert (!pos.second.first); - addDisputedTransaction (pos.first, pos.second.second->peekData ()); - } - else // No other disagreement over a transaction should be possible - assert (false); - } - WriteLog (lsDEBUG, LedgerConsensus) << dc << " differences found"; -} - -/** We have a complete transaction set, typically one acuired from the network -*/ -void LedgerConsensus::mapComplete (uint256 const& hash, SHAMap::ref map, bool acquired) -{ - CondLog (acquired, lsINFO, LedgerConsensus) << "We have acquired TXS " << hash; - - if (!map) - { - // this is an invalid/corrupt map - mAcquired[hash] = map; - mAcquiring.erase (hash); - WriteLog (lsWARNING, LedgerConsensus) << "A trusted node directed us to acquire an invalid TXN map"; - return; } - assert (hash == map->getHash ()); - - boost::unordered_map::iterator it = mAcquired.find (hash); - - if (mAcquired.find (hash) != mAcquired.end ()) + /** Check if our last closed ledger matches the network's + */ + void checkLCL () { - if (it->second) + uint256 netLgr = mPrevLedgerHash; + int netLgrCount = 0; + + uint256 favoredLedger = mPrevLedgerHash; // Don't jump forward + uint256 priorLedger; + + if (mHaveCorrectLCL) + priorLedger = mPreviousLedger->getParentHash (); // don't jump back + + boost::unordered_map vals = + getApp().getValidations ().getCurrentValidations + (favoredLedger, priorLedger); + + typedef std::map::value_type u256_cvc_pair; + BOOST_FOREACH (u256_cvc_pair & it, vals) + + if ((it.second.first > netLgrCount) || + ((it.second.first == netLgrCount) && (it.first == mPrevLedgerHash))) { - mAcquiring.erase (hash); - return; // we already have this map + netLgr = it.first; + netLgrCount = it.second.first; } - // We previously failed to acquire this map, now we have it - mAcquired.erase (hash); - } - - if (mOurPosition && (!mOurPosition->isBowOut ()) && (hash != mOurPosition->getCurrentHash ())) - { - // this could create disputed transactions - boost::unordered_map::iterator it2 = mAcquired.find (mOurPosition->getCurrentHash ()); - - if (it2 != mAcquired.end ()) + if (netLgr != mPrevLedgerHash) { - assert ((it2->first == mOurPosition->getCurrentHash ()) && it2->second); - mCompares.insert(hash); - createDisputes (it2->second, map); - } - else - assert (false); // We don't have our own position?! - } - else - WriteLog (lsDEBUG, LedgerConsensus) << "Not ready to create disputes"; + // LCL change + const char* status; - mAcquired[hash] = map; - mAcquiring.erase (hash); - - // Adjust tracking for each peer that takes this position - std::vector peers; - BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) - { - if (it.second->getCurrentHash () == map->getHash ()) - peers.push_back (it.second->getPeerID ()); - } - - if (!peers.empty ()) - adjustCount (map, peers); - else - { - CondLog (acquired, lsWARNING, LedgerConsensus) << "By the time we got the map " << hash << " no peers were proposing it"; - } - - sendHaveTxSet (hash, true); -} - -/** Let peers know that we a particular transactions set so they - can fetch it from us. -*/ -void LedgerConsensus::sendHaveTxSet (uint256 const& hash, bool direct) -{ - protocol::TMHaveTransactionSet msg; - msg.set_hash (hash.begin (), 256 / 8); - msg.set_status (direct ? protocol::tsHAVE : protocol::tsCAN_GET); - PackedMessage::pointer packet = boost::make_shared (msg, protocol::mtHAVE_SET); - getApp().getPeers ().relayMessage (NULL, packet); -} - -/** Adjust the counts on all disputed transactions based on the set of peers taking this position -*/ -void LedgerConsensus::adjustCount (SHAMap::ref map, const std::vector& peers) -{ - BOOST_FOREACH (u256_lct_pair & it, mDisputes) - { - bool setHas = map->hasItem (it.second->getTransactionID ()); - BOOST_FOREACH (const uint160 & pit, peers) - it.second->setVote (pit, setHas); - } -} - -/** Send a node status change message to our peers -*/ -void LedgerConsensus::statusChange (protocol::NodeEvent event, Ledger& ledger) -{ - protocol::TMStatusChange s; - - if (!mHaveCorrectLCL) - s.set_newevent (protocol::neLOST_SYNC); - else - s.set_newevent (event); - - s.set_ledgerseq (ledger.getLedgerSeq ()); - s.set_networktime (getApp().getOPs ().getNetworkTimeNC ()); - uint256 hash = ledger.getParentHash (); - s.set_ledgerhashprevious (hash.begin (), hash.size ()); - hash = ledger.getHash (); - s.set_ledgerhash (hash.begin (), hash.size ()); - - uint32 uMin, uMax; - if (!getApp().getOPs ().getFullValidatedRange (uMin, uMax)) - { - uMin = 0; - uMax = 0; - } - s.set_firstseq (uMin); - s.set_lastseq (uMax); - - PackedMessage::pointer packet = boost::make_shared (s, protocol::mtSTATUS_CHANGE); - getApp().getPeers ().relayMessage (NULL, packet); - WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer"; -} - -int LedgerConsensus::startup () -{ - return 1; -} - -void LedgerConsensus::statePreClose () -{ - // it is shortly before ledger close time - bool anyTransactions = getApp().getLedgerMaster ().getCurrentLedger ()->peekTransactionMap ()->getHash ().isNonZero (); - int proposersClosed = mPeerPositions.size (); - int proposersValidated = getApp().getValidations ().getTrustedValidationCount (mPrevLedgerHash); - - // This ledger is open. This computes how long since the last ledger closed - int sinceClose; - int idleInterval = 0; - - if (mHaveCorrectLCL && mPreviousLedger->getCloseAgree ()) - { - // we can use consensus timing - sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () - mPreviousLedger->getCloseTimeNC ()); - idleInterval = 2 * mPreviousLedger->getCloseResolution (); - - if (idleInterval < LEDGER_IDLE_INTERVAL) - idleInterval = LEDGER_IDLE_INTERVAL; - } - else - { - sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () - getApp().getOPs ().getLastCloseTime ()); - idleInterval = LEDGER_IDLE_INTERVAL; - } - - if (ContinuousLedgerTiming::shouldClose (anyTransactions, mPreviousProposers, proposersClosed, proposersValidated, - mPreviousMSeconds, sinceClose, mCurrentMSeconds, idleInterval)) - { - closeLedger (); - } -} - -/** We have just decided to close the ledger. Start the consensus timer, - stash the close time, inform peers, and take a position -*/ -void LedgerConsensus::closeLedger () -{ - checkOurValidation (); - mState = lcsESTABLISH; - mConsensusStartTime = boost::posix_time::microsec_clock::universal_time (); - mCloseTime = getApp().getOPs ().getCloseTimeNC (); - getApp().getOPs ().setLastCloseTime (mCloseTime); - statusChange (protocol::neCLOSING_LEDGER, *mPreviousLedger); - takeInitialPosition (*getApp().getLedgerMaster ().closeLedger (true)); -} - -/** We are establishing a consensus -*/ -void LedgerConsensus::stateEstablish () -{ - - // Give everyone a chance to take an initial position - if (mCurrentMSeconds < LEDGER_MIN_CONSENSUS) - return; - - updateOurPositions (); - - if (!mHaveCloseTimeConsensus) - { - CondLog (haveConsensus (false), lsINFO, LedgerConsensus) << "We have TX consensus but not CT consensus"; - } - else if (haveConsensus (true)) - { - WriteLog (lsINFO, LedgerConsensus) << "Converge cutoff (" << mPeerPositions.size () << " participants)"; - mState = lcsFINISHED; - beginAccept (false); - } -} - -void LedgerConsensus::stateFinished () -{ - // we are processing the finished ledger - // logic of calculating next ledger advances us out of this state - // nothing to do -} - -void LedgerConsensus::stateAccepted () -{ - // we have accepted a new ledger - endConsensus (); -} - -void LedgerConsensus::timerEntry () -{ - if ((mState != lcsFINISHED) && (mState != lcsACCEPTED)) - checkLCL (); - - mCurrentMSeconds = - (boost::posix_time::microsec_clock::universal_time () - mConsensusStartTime).total_milliseconds (); - mClosePercent = mCurrentMSeconds * 100 / mPreviousMSeconds; - - switch (mState) - { - case lcsPRE_CLOSE: - statePreClose (); - return; - - case lcsESTABLISH: - stateEstablish (); - - if (mState != lcsFINISHED) return; - - fallthru (); - - case lcsFINISHED: - stateFinished (); - - if (mState != lcsACCEPTED) return; - - fallthru (); - - case lcsACCEPTED: - stateAccepted (); - return; - } - - assert (false); -} - -void LedgerConsensus::updateOurPositions () -{ - boost::posix_time::ptime peerCutoff = boost::posix_time::second_clock::universal_time (); - boost::posix_time::ptime ourCutoff = peerCutoff - boost::posix_time::seconds (PROPOSE_INTERVAL); - peerCutoff -= boost::posix_time::seconds (PROPOSE_FRESHNESS); - - bool changes = false; - SHAMap::pointer ourPosition; - // std::vector addedTx, removedTx; - - // Verify freshness of peer positions and compute close times - std::map closeTimes; - boost::unordered_map::iterator it = mPeerPositions.begin (); - - while (it != mPeerPositions.end ()) - { - if (it->second->isStale (peerCutoff)) - { - // proposal is stale - uint160 peerID = it->second->getPeerID (); - WriteLog (lsWARNING, LedgerConsensus) << "Removing stale proposal from " << peerID; - BOOST_FOREACH (u256_lct_pair & it, mDisputes) - it.second->unVote (peerID); - it = mPeerPositions.erase (it); - } - else - { - // proposal is still fresh - ++closeTimes[roundCloseTime (it->second->getCloseTime ())]; - ++it; - } - } - - BOOST_FOREACH (u256_lct_pair & it, mDisputes) - { - // Because the threshold for inclusion increases, time can change our position on a dispute - if (it.second->updateVote (mClosePercent, mProposing)) - { - if (!changes) + switch (mState) { - ourPosition = mAcquired[mOurPosition->getCurrentHash ()]->snapShot (true); - assert (ourPosition); - changes = true; + case lcsPRE_CLOSE: + status = "PreClose"; + break; + + case lcsESTABLISH: + status = "Establish"; + break; + + case lcsFINISHED: + status = "Finished"; + break; + + case lcsACCEPTED: + status = "Accepted"; + break; + + default: + status = "unknown"; } - if (it.second->getOurVote ()) // now a yes + WriteLog (lsWARNING, LedgerConsensus) + << "View of consensus changed during " << status + << " (" << netLgrCount << ") status=" + << status << ", " + << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL"); + WriteLog (lsWARNING, LedgerConsensus) << mPrevLedgerHash + << " to " << netLgr; + WriteLog (lsWARNING, LedgerConsensus) + << mPreviousLedger->getJson (0); + + if (ShouldLog (lsDEBUG, LedgerConsensus)) { - ourPosition->addItem (SHAMapItem (it.first, it.second->peekTransaction ()), true, false); - // addedTx.push_back(it.first); + BOOST_FOREACH (u256_cvc_pair & it, vals) + WriteLog (lsDEBUG, LedgerConsensus) + << "V: " << it.first << ", " << it.second.first; } - else // now a no + + if (mHaveCorrectLCL) + getApp().getOPs ().consensusViewChange (); + + handleLCL (netLgr); + } + else if (mPreviousLedger->getHash () != mPrevLedgerHash) + handleLCL (netLgr); + } + + /** Change our view of the last closed ledger + */ + void handleLCL (uint256 const& lclHash) + { + assert ((lclHash != mPrevLedgerHash) || (mPreviousLedger->getHash () != lclHash)); + + if (mPrevLedgerHash != lclHash) + { + // first time switching to this ledger + mPrevLedgerHash = lclHash; + + if (mHaveCorrectLCL && mProposing && mOurPosition) { - ourPosition->delItem (it.first); - // removedTx.push_back(it.first); - } - } - } - - - int neededWeight; - - if (mClosePercent < AV_MID_CONSENSUS_TIME) - neededWeight = AV_INIT_CONSENSUS_PCT; - else if (mClosePercent < AV_LATE_CONSENSUS_TIME) - neededWeight = AV_MID_CONSENSUS_PCT; - else if (mClosePercent < AV_STUCK_CONSENSUS_TIME) - neededWeight = AV_LATE_CONSENSUS_PCT; - else - neededWeight = AV_STUCK_CONSENSUS_PCT; - - uint32 closeTime = 0; - mHaveCloseTimeConsensus = false; - - if (mPeerPositions.empty ()) - { - // no other times - mHaveCloseTimeConsensus = true; - closeTime = roundCloseTime (mOurPosition->getCloseTime ()); - } - else - { - int threshVote = mPeerPositions.size (); // Threshold for non-zero vote - int threshConsensus = mPeerPositions.size (); // Threshold to declare consensus - - if (mProposing) - { - ++closeTimes[roundCloseTime (mOurPosition->getCloseTime ())]; - ++threshVote; - ++threshConsensus; - } - - threshVote = ((threshVote * neededWeight) + (neededWeight / 2)) / 100; - threshConsensus = ((threshConsensus * AV_CT_CONSENSUS_PCT) + (AV_CT_CONSENSUS_PCT / 2)) / 100; - - if (threshVote == 0) - threshVote = 1; - - if (threshConsensus == 0) - threshConsensus = 1; - - WriteLog (lsINFO, LedgerConsensus) << "Proposers:" << mPeerPositions.size () << " nw:" << neededWeight - << " thrV:" << threshVote << " thrC:" << threshConsensus; - - for (std::map::iterator it = closeTimes.begin (), end = closeTimes.end (); it != end; ++it) - { - WriteLog (lsDEBUG, LedgerConsensus) << "CCTime: seq" << mPreviousLedger->getLedgerSeq () + 1 << ": " << - it->first << " has " << it->second << ", " << threshVote << " required"; - - if (it->second >= threshVote) - { - WriteLog (lsDEBUG, LedgerConsensus) << "Close time consensus reached: " << it->first; - closeTime = it->first; - threshVote = it->second; - - if (threshVote >= threshConsensus) - mHaveCloseTimeConsensus = true; - } - } - - CondLog (!mHaveCloseTimeConsensus, lsDEBUG, LedgerConsensus) << "No CT consensus: Proposers:" << mPeerPositions.size () - << " Proposing:" << (mProposing ? "yes" : "no") << " Thresh:" << threshConsensus << " Pos:" << closeTime; - } - - if (!changes && - ((closeTime != roundCloseTime (mOurPosition->getCloseTime ())) || - mOurPosition->isStale (ourCutoff))) - { - // close time changed or our position is stale - ourPosition = mAcquired[mOurPosition->getCurrentHash ()]->snapShot (true); - assert (ourPosition); - changes = true; // We pretend our position changed to force a new proposal - } - - if (changes) - { - uint256 newHash = ourPosition->getHash (); - WriteLog (lsINFO, LedgerConsensus) << "Position change: CTime " << closeTime << ", tx " << newHash; - - if (mOurPosition->changePosition (newHash, closeTime)) - { - if (mProposing) + WriteLog (lsINFO, LedgerConsensus) << "Bowing out of consensus"; + mOurPosition->bowOut (); propose (); + } - mapComplete (newHash, ourPosition, false); + mProposing = false; + // mValidating = false; + mPeerPositions.clear (); + mDisputes.clear (); + mCloseTimes.clear (); + mDeadNodes.clear (); + playbackProposals (); + } + + if (mPreviousLedger->getHash () == mPrevLedgerHash) + return; + + // we need to switch the ledger we're working from + Ledger::pointer newLCL = + getApp().getLedgerMaster ().getLedgerByHash (lclHash); + + if (newLCL) + { + assert (newLCL->isClosed ()); + assert (newLCL->isImmutable ()); + assert (newLCL->getHash () == lclHash); + mPreviousLedger = newLCL; + mPrevLedgerHash = lclHash; + } + else if (!mAcquiringLedger || (mAcquiringLedger->getHash () + != mPrevLedgerHash)) + { + // need to start acquiring the correct consensus LCL + WriteLog (lsWARNING, LedgerConsensus) + << "Need consensus ledger " << mPrevLedgerHash; + + if (mAcquiringLedger) + { + getApp().getInboundLedgers ().dropLedger + (mAcquiringLedger->getHash ()); + } + + mAcquiringLedger = getApp().getInboundLedgers ().findCreate + (mPrevLedgerHash, 0, true); + mHaveCorrectLCL = false; + return; + } + else + return; + + WriteLog (lsINFO, LedgerConsensus) + << "Have the consensus ledger " << mPrevLedgerHash; + mHaveCorrectLCL = true; + + mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( + mPreviousLedger->getCloseResolution () + ,mPreviousLedger->getCloseAgree () + ,mPreviousLedger->getLedgerSeq () + 1); + } + + void timerEntry () + { + if ((mState != lcsFINISHED) && (mState != lcsACCEPTED)) + checkLCL (); + + mCurrentMSeconds = + (boost::posix_time::microsec_clock::universal_time () + - mConsensusStartTime).total_milliseconds (); + mClosePercent = mCurrentMSeconds * 100 / mPreviousMSeconds; + + switch (mState) + { + case lcsPRE_CLOSE: + statePreClose (); + return; + + case lcsESTABLISH: + stateEstablish (); + + if (mState != lcsFINISHED) return; + + fallthru (); + + case lcsFINISHED: + stateFinished (); + + if (mState != lcsACCEPTED) return; + + fallthru (); + + case lcsACCEPTED: + stateAccepted (); + return; + } + + assert (false); + } + + // state handlers + void statePreClose () + { + // it is shortly before ledger close time + bool anyTransactions + = getApp().getLedgerMaster ().getCurrentLedger () + ->peekTransactionMap ()->getHash ().isNonZero (); + int proposersClosed = mPeerPositions.size (); + int proposersValidated + = getApp().getValidations ().getTrustedValidationCount + (mPrevLedgerHash); + + // This ledger is open. This computes how long since last ledger closed + int sinceClose; + int idleInterval = 0; + + if (mHaveCorrectLCL && mPreviousLedger->getCloseAgree ()) + { + // we can use consensus timing + sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () + - mPreviousLedger->getCloseTimeNC ()); + idleInterval = 2 * mPreviousLedger->getCloseResolution (); + + if (idleInterval < LEDGER_IDLE_INTERVAL) + idleInterval = LEDGER_IDLE_INTERVAL; + } + else + { + sinceClose = 1000 * (getApp().getOPs ().getCloseTimeNC () + - getApp().getOPs ().getLastCloseTime ()); + idleInterval = LEDGER_IDLE_INTERVAL; + } + + if (ContinuousLedgerTiming::shouldClose (anyTransactions + , mPreviousProposers, proposersClosed, proposersValidated + , mPreviousMSeconds, sinceClose, mCurrentMSeconds + , idleInterval)) + { + closeLedger (); } } -} - -/** Check if we've reached consensus -*/ -bool LedgerConsensus::haveConsensus (bool forReal) -{ - // CHECKME: should possibly count unacquired TX sets as disagreeing - int agree = 0, disagree = 0; - uint256 ourPosition = mOurPosition->getCurrentHash (); - - BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) + /** We are establishing a consensus + */ + void stateEstablish () { - if (!it.second->isBowOut ()) + + // Give everyone a chance to take an initial position + if (mCurrentMSeconds < LEDGER_MIN_CONSENSUS) + return; + + updateOurPositions (); + + if (!mHaveCloseTimeConsensus) { - if (it.second->getCurrentHash () == ourPosition) - ++agree; - else + CondLog (haveConsensus (false), lsINFO, LedgerConsensus) + << "We have TX consensus but not CT consensus"; + } + else if (haveConsensus (true)) + { + WriteLog (lsINFO, LedgerConsensus) + << "Converge cutoff (" << mPeerPositions.size () + << " participants)"; + mState = lcsFINISHED; + beginAccept (false); + } + } + + void stateFinished () + { + // we are processing the finished ledger + // logic of calculating next ledger advances us out of this state + // nothing to do + } + void stateAccepted () + { + // we have accepted a new ledger + endConsensus (); + } + + /** Check if we've reached consensus + */ + bool haveConsensus (bool forReal) + { + // CHECKME: should possibly count unacquired TX sets as disagreeing + int agree = 0, disagree = 0; + uint256 ourPosition = mOurPosition->getCurrentHash (); + + BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) + { + if (!it.second->isBowOut ()) { - WriteLog (lsDEBUG, LedgerConsensus) << it.first.GetHex () << " has " << it.second->getCurrentHash ().GetHex (); - ++disagree; - if (mCompares.count(it.second->getCurrentHash()) == 0) - { // Make sure we have generated disputes - uint256 hash = it.second->getCurrentHash(); - WriteLog (lsDEBUG, LedgerConsensus) << "We have not compared to " << hash; - boost::unordered_map::iterator it1 = mAcquired.find (hash); - boost::unordered_map::iterator it2 = mAcquired.find (mOurPosition->getCurrentHash ()); - if ((it1 != mAcquired.end()) && (it2 != mAcquired.end()) && (it1->second) && (it2->second)) - { - mCompares.insert(hash); - createDisputes(it2->second, it1->second); + if (it.second->getCurrentHash () == ourPosition) + { + ++agree; + } + else + { + WriteLog (lsDEBUG, LedgerConsensus) << it.first.GetHex () + << " has " << it.second->getCurrentHash ().GetHex (); + ++disagree; + if (mCompares.count(it.second->getCurrentHash()) == 0) + { // Make sure we have generated disputes + uint256 hash = it.second->getCurrentHash(); + WriteLog (lsDEBUG, LedgerConsensus) + << "We have not compared to " << hash; + boost::unordered_map::iterator + it1 = mAcquired.find (hash); + boost::unordered_map::iterator + it2 = mAcquired.find + (mOurPosition->getCurrentHash ()); + if ((it1 != mAcquired.end()) && (it2 != mAcquired.end()) + && (it1->second) && (it2->second)) + { + mCompares.insert(hash); + createDisputes(it2->second, it1->second); + } } } } } + int currentValidations = getApp().getValidations () + .getNodesAfter (mPrevLedgerHash); + + WriteLog (lsDEBUG, LedgerConsensus) + << "Checking for TX consensus: agree=" << agree + << ", disagree=" << disagree; + + return ContinuousLedgerTiming::haveConsensus (mPreviousProposers, + agree + disagree, agree, currentValidations + , mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail); } - int currentValidations = getApp().getValidations ().getNodesAfter (mPrevLedgerHash); - WriteLog (lsDEBUG, LedgerConsensus) << "Checking for TX consensus: agree=" << agree << ", disagree=" << disagree; - - return ContinuousLedgerTiming::haveConsensus (mPreviousProposers, agree + disagree, agree, currentValidations, - mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail); -} - -/** Get a transaction tree, fetching it from the network is required and requested -*/ -SHAMap::pointer LedgerConsensus::getTransactionTree (uint256 const& hash, bool doAcquire) -{ - boost::unordered_map::iterator it = mAcquired.find (hash); - - if (it != mAcquired.end ()) - return it->second; - - if (mState == lcsPRE_CLOSE) + /** A server has taken a new position, adjust our tracking + */ + bool peerPosition (LedgerProposal::ref newPosition) { - SHAMap::pointer currentMap = getApp().getLedgerMaster ().getCurrentLedger ()->peekTransactionMap (); + uint160 peerID = newPosition->getPeerID (); - if (currentMap->getHash () == hash) + if (mDeadNodes.find (peerID) != mDeadNodes.end ()) { - WriteLog (lsDEBUG, LedgerConsensus) << "Map " << hash << " is our current"; - currentMap = currentMap->snapShot (false); - mapComplete (hash, currentMap, false); - return currentMap; - } - } - - if (doAcquire) - { - TransactionAcquire::pointer& acquiring = mAcquiring[hash]; - - if (!acquiring) - { - if (hash.isZero ()) - { - SHAMap::pointer empty = boost::make_shared (smtTRANSACTION); - mapComplete (hash, empty, false); - return empty; - } - - acquiring = boost::make_shared (hash); - startAcquiring (acquiring); - } - } - - return SHAMap::pointer (); -} - -/** Begin acquiring a transaction set -*/ -void LedgerConsensus::startAcquiring (TransactionAcquire::pointer acquire) -{ - boost::unordered_map< uint256, std::vector< boost::weak_ptr > >::iterator it = - mPeerData.find (acquire->getHash ()); - - if (it != mPeerData.end ()) - { - // Add any peers we already know have his transaction set - std::vector< boost::weak_ptr >& peerList = it->second; - std::vector< boost::weak_ptr >::iterator pit = peerList.begin (); - - while (pit != peerList.end ()) - { - Peer::pointer pr = pit->lock (); - - if (!pr) - pit = peerList.erase (pit); - else - { - acquire->peerHas (pr); - ++pit; - } - } - } - - std::vector peerList = getApp().getPeers ().getPeerVector (); - BOOST_FOREACH (Peer::ref peer, peerList) - { - if (peer->hasTxSet (acquire->getHash ())) - acquire->peerHas (peer); - } - - acquire->setTimer (); -} - -/** Make and send a proposal -*/ -void LedgerConsensus::propose () -{ - WriteLog (lsTRACE, LedgerConsensus) << "We propose: " << - (mOurPosition->isBowOut () ? std::string ("bowOut") : mOurPosition->getCurrentHash ().GetHex ()); - protocol::TMProposeSet prop; - - prop.set_currenttxhash (mOurPosition->getCurrentHash ().begin (), 256 / 8); - prop.set_previousledger (mOurPosition->getPrevLedger ().begin (), 256 / 8); - prop.set_proposeseq (mOurPosition->getProposeSeq ()); - prop.set_closetime (mOurPosition->getCloseTime ()); - - Blob pubKey = mOurPosition->getPubKey (); - Blob sig = mOurPosition->sign (); - prop.set_nodepubkey (&pubKey[0], pubKey.size ()); - prop.set_signature (&sig[0], sig.size ()); - getApp().getPeers ().relayMessage (NULL, - boost::make_shared (prop, protocol::mtPROPOSE_LEDGER)); -} - -/** Add a disputed transaction (one that at least one node wants in the consensus set and - at least one node does not) to our tracking -*/ -void LedgerConsensus::addDisputedTransaction (uint256 const& txID, Blob const& tx) -{ - if (mDisputes.find (txID) != mDisputes.end ()) - return; - - WriteLog (lsDEBUG, LedgerConsensus) << "Transaction " << txID << " is disputed"; - - bool ourVote = false; - - if (mOurPosition) - { - boost::unordered_map::iterator mit = mAcquired.find (mOurPosition->getCurrentHash ()); - - if (mit != mAcquired.end ()) - ourVote = mit->second->hasItem (txID); - else - assert (false); // We don't have our own position? - } - - DisputedTx::pointer txn = boost::make_shared (txID, tx, ourVote); - mDisputes[txID] = txn; - - BOOST_FOREACH (u160_prop_pair & pit, mPeerPositions) - { - boost::unordered_map::const_iterator cit = - mAcquired.find (pit.second->getCurrentHash ()); - - if ((cit != mAcquired.end ()) && cit->second) - txn->setVote (pit.first, cit->second->hasItem (txID)); - } - - // If we didn't relay this transaction recently, relay it - if (getApp().getHashRouter ().setFlag (txID, SF_RELAYED)) - { - protocol::TMTransaction msg; - msg.set_rawtransaction (& (tx.front ()), tx.size ()); - msg.set_status (protocol::tsNEW); - msg.set_receivetimestamp (getApp().getOPs ().getNetworkTimeNC ()); - PackedMessage::pointer packet = boost::make_shared (msg, protocol::mtTRANSACTION); - getApp().getPeers ().relayMessage (NULL, packet); - } -} - -/** A server has taken a new position, adjust our tracking -*/ -bool LedgerConsensus::peerPosition (LedgerProposal::ref newPosition) -{ - uint160 peerID = newPosition->getPeerID (); - - if (mDeadNodes.find (peerID) != mDeadNodes.end ()) - { - WriteLog (lsINFO, LedgerConsensus) << "Position from dead node: " << peerID.GetHex (); - return false; - } - - LedgerProposal::pointer& currentPosition = mPeerPositions[peerID]; - - if (currentPosition) - { - assert (peerID == currentPosition->getPeerID ()); - - if (newPosition->getProposeSeq () <= currentPosition->getProposeSeq ()) + WriteLog (lsINFO, LedgerConsensus) + << "Position from dead node: " << peerID.GetHex (); return false; - } + } + + LedgerProposal::pointer& currentPosition = mPeerPositions[peerID]; + + if (currentPosition) + { + assert (peerID == currentPosition->getPeerID ()); + + if (newPosition->getProposeSeq () + <= currentPosition->getProposeSeq ()) + { + return false; + } + } + + if (newPosition->getProposeSeq () == 0) + { + // new initial close time estimate + WriteLog (lsTRACE, LedgerConsensus) + << "Peer reports close time as " + << newPosition->getCloseTime (); + ++mCloseTimes[newPosition->getCloseTime ()]; + } + else if (newPosition->getProposeSeq () == LedgerProposal::seqLeave) + { + // peer bows out + WriteLog (lsINFO, LedgerConsensus) + << "Peer bows out: " << peerID.GetHex (); + BOOST_FOREACH (u256_lct_pair & it, mDisputes) + it.second->unVote (peerID); + mPeerPositions.erase (peerID); + mDeadNodes.insert (peerID); + return true; + } + + + WriteLog (lsTRACE, LedgerConsensus) << "Processing peer proposal " + << newPosition->getProposeSeq () << "/" + << newPosition->getCurrentHash (); + currentPosition = newPosition; + + SHAMap::pointer set + = getTransactionTree (newPosition->getCurrentHash (), true); + + if (set) + { + BOOST_FOREACH (u256_lct_pair & it, mDisputes) + it.second->setVote (peerID, set->hasItem (it.first)); + } + else + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Don't have tx set for peer"; + // BOOST_FOREACH(u256_lct_pair& it, mDisputes) + // it.second->unVote(peerID); + } - if (newPosition->getProposeSeq () == 0) - { - // new initial close time estimate - WriteLog (lsTRACE, LedgerConsensus) << "Peer reports close time as " << newPosition->getCloseTime (); - ++mCloseTimes[newPosition->getCloseTime ()]; - } - else if (newPosition->getProposeSeq () == LedgerProposal::seqLeave) - { - // peer bows out - WriteLog (lsINFO, LedgerConsensus) << "Peer bows out: " << peerID.GetHex (); - BOOST_FOREACH (u256_lct_pair & it, mDisputes) - it.second->unVote (peerID); - mPeerPositions.erase (peerID); - mDeadNodes.insert (peerID); return true; } - - WriteLog (lsTRACE, LedgerConsensus) << "Processing peer proposal " - << newPosition->getProposeSeq () << "/" << newPosition->getCurrentHash (); - currentPosition = newPosition; - - SHAMap::pointer set = getTransactionTree (newPosition->getCurrentHash (), true); - - if (set) + /** A peer has informed us that it can give us a transaction set + */ + bool peerHasSet (Peer::ref peer, uint256 const& hashSet + , protocol::TxSetStatus status) { - BOOST_FOREACH (u256_lct_pair & it, mDisputes) - it.second->setVote (peerID, set->hasItem (it.first)); - } - else - { - WriteLog (lsDEBUG, LedgerConsensus) << "Don't have tx set for peer"; - // BOOST_FOREACH(u256_lct_pair& it, mDisputes) - // it.second->unVote(peerID); - } + if (status != protocol::tsHAVE) // Indirect requests for future support + return true; - return true; -} + std::vector< boost::weak_ptr >& set = mPeerData[hashSet]; + BOOST_FOREACH (boost::weak_ptr& iit, set) + + if (iit.lock () == peer) + return false; + + set.push_back (peer); + boost::unordered_map::iterator acq + = mAcquiring.find (hashSet); + + if (acq != mAcquiring.end ()) + { // make sure it doesn't go away + TransactionAcquire::pointer ta = acq->second; + ta->peerHas (peer); + } -/** A peer has informed us that it can give us a transaction set -*/ -bool LedgerConsensus::peerHasSet (Peer::ref peer, uint256 const& hashSet, protocol::TxSetStatus status) -{ - if (status != protocol::tsHAVE) // Indirect requests are for future support return true; - - std::vector< boost::weak_ptr >& set = mPeerData[hashSet]; - BOOST_FOREACH (boost::weak_ptr& iit, set) - - if (iit.lock () == peer) - return false; - - set.push_back (peer); - boost::unordered_map::iterator acq = mAcquiring.find (hashSet); - - if (acq != mAcquiring.end ()) - { - TransactionAcquire::pointer ta = acq->second; // make sure it doesn't go away - ta->peerHas (peer); } - return true; -} - -/** A peer has sent us some nodes from a transaction set -*/ -SHAMapAddNode LedgerConsensus::peerGaveNodes (Peer::ref peer, uint256 const& setHash, - const std::list& nodeIDs, const std::list< Blob >& nodeData) -{ - boost::unordered_map::iterator acq = mAcquiring.find (setHash); - - if (acq == mAcquiring.end ()) + /** A peer has sent us some nodes from a transaction set + */ + SHAMapAddNode peerGaveNodes (Peer::ref peer + , uint256 const& setHash, const std::list& nodeIDs + , const std::list< Blob >& nodeData) { - WriteLog (lsDEBUG, LedgerConsensus) << "Got TX data for set no longer acquiring: " << setHash; - return SHAMapAddNode (); - } + boost::unordered_map::iterator acq + = mAcquiring.find (setHash); - TransactionAcquire::pointer set = acq->second; // We must keep the set around during the function - return set->takeNodes (nodeIDs, nodeData, peer); -} - -/** We have a new LCL and must accept it -*/ -void LedgerConsensus::beginAccept (bool synchronous) -{ - SHAMap::pointer consensusSet = mAcquired[mOurPosition->getCurrentHash ()]; - - if (!consensusSet) - { - WriteLog (lsFATAL, LedgerConsensus) << "We don't have a consensus set"; - abort (); - return; - } - - getApp().getOPs ().newLCL (mPeerPositions.size (), mCurrentMSeconds, mNewLedgerHash); - - if (synchronous) - accept (consensusSet, LoadEvent::pointer ()); - else - { // FIXME: Post to JobQueue, not I/O service - getApp().getIOService ().post (BIND_TYPE (&LedgerConsensus::accept, shared_from_this (), consensusSet, - getApp().getJobQueue ().getLoadEvent (jtACCEPTLEDGER, "LedgerConsensus::beginAccept"))); - } -} - -/** If we radically changed our consensus context for some reason, we need to - replay recent proposals so that they're not lost. -*/ -void LedgerConsensus::playbackProposals () -{ - boost::unordered_map < uint160, - std::list > & storedProposals = getApp().getOPs ().peekStoredProposals (); - - for (boost::unordered_map< uint160, std::list >::iterator - it = storedProposals.begin (), end = storedProposals.end (); it != end; ++it) - { - bool relay = false; - BOOST_FOREACH (LedgerProposal::ref proposal, it->second) + if (acq == mAcquiring.end ()) { - if (proposal->hasSignature ()) - { - // we have the signature but don't know the ledger so couldn't verify - proposal->setPrevLedger (mPrevLedgerHash); + WriteLog (lsDEBUG, LedgerConsensus) + << "Got TX data for set no longer acquiring: " << setHash; + return SHAMapAddNode (); + } + // We must keep the set around during the function + TransactionAcquire::pointer set = acq->second; + return set->takeNodes (nodeIDs, nodeData, peer); + } - if (proposal->checkSign ()) + bool isOurPubKey (const RippleAddress & k) + { + return k == mValPublic; + } + + /** Simulate a consensus round without any network traffic + */ + void simulate () + { + WriteLog (lsINFO, LedgerConsensus) << "Simulating consensus"; + closeLedger (); + mCurrentMSeconds = 100; + beginAccept (true); + endConsensus (); + WriteLog (lsINFO, LedgerConsensus) << "Simulation complete"; + } +private: + /** We have a new last closed ledger, process it. Final accept logic + */ + void accept (SHAMap::ref set, LoadEvent::pointer) + { + if (set->getHash ().isNonZero ()) + // put our set where others can get it later + getApp().getOPs ().takePosition (mPreviousLedger + ->getLedgerSeq (), set); + + { + Application::ScopedLockType lock + (getApp ().getMasterLock (), __FILE__, __LINE__); + + assert (set->getHash () == mOurPosition->getCurrentHash ()); + // these are now obsolete + getApp().getOPs ().peekStoredProposals ().clear (); + + uint32 closeTime = roundCloseTime (mOurPosition->getCloseTime ()); + bool closeTimeCorrect = true; + + if (closeTime == 0) + { + // we agreed to disagree + closeTimeCorrect = false; + closeTime = mPreviousLedger->getCloseTimeNC () + 1; + } + + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: Prop=" << (mProposing ? "yes" : "no") + << " val=" << (mValidating ? "yes" : "no") + << " corLCL=" << (mHaveCorrectLCL ? "yes" : "no") + << " fail=" << (mConsensusFail ? "yes" : "no"); + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: Prev = " << mPrevLedgerHash + << ":" << mPreviousLedger->getLedgerSeq (); + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: TxSt = " << set->getHash () + << ", close " << closeTime << (closeTimeCorrect ? "" : "X"); + + CanonicalTXSet failedTransactions (set->getHash ()); + + Ledger::pointer newLCL + = boost::make_shared (false + , boost::ref (*mPreviousLedger)); + + // Set up to write SHAMap changes to our database, + // perform updates, extract changes + newLCL->peekTransactionMap ()->armDirty (); + newLCL->peekAccountStateMap ()->armDirty (); + WriteLog (lsDEBUG, LedgerConsensus) + << "Applying consensus set transactions to the" + << " last closed ledger"; + applyTransactions (set, newLCL, newLCL, failedTransactions, false); + newLCL->updateSkipList (); + newLCL->setClosed (); + boost::shared_ptr acctNodes + = newLCL->peekAccountStateMap ()->disarmDirty (); + boost::shared_ptr txnNodes + = newLCL->peekTransactionMap ()->disarmDirty (); + + // write out dirty nodes (temporarily done here) + int fc; + + while ((fc = SHAMap::flushDirty (*acctNodes, 256 + , hotACCOUNT_NODE, newLCL->getLedgerSeq ())) > 0) + { + WriteLog (lsTRACE, LedgerConsensus) + << "Flushed " << fc << " dirty state nodes"; + } + + while ((fc = SHAMap::flushDirty (*txnNodes, 256 + , hotTRANSACTION_NODE, newLCL->getLedgerSeq ())) > 0) + { + WriteLog (lsTRACE, LedgerConsensus) + << "Flushed " << fc << " dirty transaction nodes"; + } + + newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect); + newLCL->updateHash (); + newLCL->setImmutable (); + getApp().getLedgerMaster().storeLedger(newLCL); + + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: NewL = " << newLCL->getHash () + << ":" << newLCL->getLedgerSeq (); + uint256 newLCLHash = newLCL->getHash (); + + if (ShouldLog (lsTRACE, LedgerConsensus)) + { + WriteLog (lsTRACE, LedgerConsensus) << "newLCL"; + Json::Value p; + newLCL->addJson (p + , LEDGER_JSON_DUMP_TXRP | LEDGER_JSON_DUMP_STATE); + WriteLog (lsTRACE, LedgerConsensus) << p; + } + + statusChange (protocol::neACCEPTED_LEDGER, *newLCL); + + if (mValidating && !mConsensusFail) + { + uint256 signingHash; + SerializedValidation::pointer v = + boost::make_shared + (newLCLHash, getApp().getOPs ().getValidationTimeNC () + , mValPublic, mProposing); + v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ()); + addLoad(v); + + if (((newLCL->getLedgerSeq () + 1) % 256) == 0) + // next ledger is flag ledger { - WriteLog (lsINFO, LedgerConsensus) << "Applying stored proposal"; - relay = peerPosition (proposal); + getApp().getFeeVote ().doValidation (newLCL, *v); + getApp().getFeatureTable ().doValidation (newLCL, *v); + } + + v->sign (signingHash, mValPrivate); + v->setTrusted (); + // suppress it if we receive it - FIXME: wrong suppression + getApp().getHashRouter ().addSuppression (signingHash); + getApp().getValidations ().addValidation (v, "local"); + getApp().getOPs ().setLastValidation (v); + Blob validation = v->getSigned (); + protocol::TMValidation val; + val.set_validation (&validation[0], validation.size ()); + int j = getApp().getPeers ().relayMessage (NULL, + boost::make_shared + (val, protocol::mtVALIDATION)); + WriteLog (lsINFO, LedgerConsensus) + << "CNF Val " << newLCLHash << " to " << j << " peers"; + } + else + WriteLog (lsINFO, LedgerConsensus) + << "CNF newLCL " << newLCLHash; + + Ledger::pointer newOL = boost::make_shared + (true, boost::ref (*newLCL)); + LedgerMaster::ScopedLockType sl + (getApp().getLedgerMaster ().peekMutex (), __FILE__, __LINE__); + + // Apply disputed transactions that didn't get in + TransactionEngine engine (newOL); + BOOST_FOREACH (u256_lct_pair & it, mDisputes) + { + if (!it.second->getOurVote ()) + { + // we voted NO + try + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Test applying disputed transaction that did" + << " not get in"; + SerializerIterator sit (it.second->peekTransaction ()); + SerializedTransaction::pointer txn + = boost::make_shared + (boost::ref (sit)); + + if (applyTransaction (engine, txn, newOL, true, false)) + { + failedTransactions.push_back (txn); + } + } + catch (...) + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Failed to apply transaction we voted NO on"; + } } } - else if (proposal->isPrevLedger (mPrevLedgerHash)) - relay = peerPosition (proposal); - if (relay) + WriteLog (lsDEBUG, LedgerConsensus) + << "Applying transactions from current open ledger"; + applyTransactions (getApp().getLedgerMaster ().getCurrentLedger + ()->peekTransactionMap (), newOL, newLCL, + failedTransactions, true); + getApp().getLedgerMaster ().pushLedger (newLCL, newOL); + mNewLedgerHash = newLCL->getHash (); + mState = lcsACCEPTED; + sl.unlock (); + + if (mValidating) { - WriteLog (lsWARNING, LedgerConsensus) << "We should do delayed relay of this proposal, but we cannot"; - } + // see how close our close time is to other node's + // close time reports + WriteLog (lsINFO, LedgerConsensus) + << "We closed at " + << lexicalCastThrow (mCloseTime); + uint64 closeTotal = mCloseTime; + int closeCount = 1; -#if 0 // FIXME: We can't do delayed relay because we don't have the signature - std::set peers - - if (relay && getApp().getHashRouter ().swapSet (proposal.getSuppress (), set, SF_RELAYED)) - { - WriteLog (lsDEBUG, LedgerConsensus) << "Stored proposal delayed relay"; - protocol::TMProposeSet set; - set.set_proposeseq - set.set_currenttxhash (, 256 / 8); - previousledger - closetime - nodepubkey - signature - PackedMessage::pointer message = boost::make_shared (set, protocol::mtPROPOSE_LEDGER); - getApp().getPeers ().relayMessageBut (peers, message); - } - -#endif - } - } -} - -// VFALCO TODO clean these macros up and put them somewhere. Try to eliminate them if possible. -#define LCAT_SUCCESS 0 -#define LCAT_FAIL 1 -#define LCAT_RETRY 2 - -/** Apply a transaction to a ledger -*/ -int LedgerConsensus::applyTransaction (TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref ledger, - bool openLedger, bool retryAssured) -{ - // Returns false if the transaction has need not be retried. - TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE; - - if (retryAssured) - parms = static_cast (parms | tapRETRY); - - if (getApp().getHashRouter ().setFlag (txn->getTransactionID (), SF_SIGGOOD)) - parms = static_cast (parms | tapNO_CHECK_SIGN); - - WriteLog (lsDEBUG, LedgerConsensus) << "TXN " << txn->getTransactionID () - << (openLedger ? " open" : " closed") - << (retryAssured ? "/retry" : "/final"); - WriteLog (lsTRACE, LedgerConsensus) << txn->getJson (0); - - // VFALCO TODO figure out what this "trust network" is all about and why it needs exceptions. -#ifndef TRUST_NETWORK - - try - { -#endif - - bool didApply; - TER result = engine.applyTransaction (*txn, parms, didApply); - - if (didApply) - { - WriteLog (lsDEBUG, LedgerConsensus) << "Transaction success: " << transHuman (result); - return LCAT_SUCCESS; - } - - if (isTefFailure (result) || isTemMalformed (result) || isTelLocal (result)) - { - // failure - WriteLog (lsDEBUG, LedgerConsensus) << "Transaction failure: " << transHuman (result); - return LCAT_FAIL; - } - - WriteLog (lsDEBUG, LedgerConsensus) << "Transaction retry: " << transHuman (result); - assert (!ledger->hasTransaction (txn->getTransactionID ())); - return LCAT_RETRY; - -#ifndef TRUST_NETWORK - } - catch (...) - { - WriteLog (lsWARNING, LedgerConsensus) << "Throws"; - return false; - } - -#endif -} - -/** Apply a set of transactions to a ledger -*/ -void LedgerConsensus::applyTransactions (SHAMap::ref set, Ledger::ref applyLedger, - Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr) -{ - TransactionEngine engine (applyLedger); - - for (SHAMapItem::pointer item = set->peekFirstItem (); !!item; item = set->peekNextItem (item->getTag ())) - if (!checkLedger->hasTransaction (item->getTag ())) - { - WriteLog (lsINFO, LedgerConsensus) << "Processing candidate transaction: " << item->getTag (); -#ifndef TRUST_NETWORK - - try - { -#endif - SerializerIterator sit (item->peekSerializer ()); - SerializedTransaction::pointer txn = boost::make_shared (boost::ref (sit)); - - if (applyTransaction (engine, txn, applyLedger, openLgr, true) == LCAT_RETRY) - failedTransactions.push_back (txn); - -#ifndef TRUST_NETWORK - } - catch (...) - { - WriteLog (lsWARNING, LedgerConsensus) << " Throws"; - } - -#endif - } - - int changes; - bool certainRetry = true; - - for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass) - { - WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " Txns: " << failedTransactions.size () - << (certainRetry ? " retriable" : " final"); - changes = 0; - - CanonicalTXSet::iterator it = failedTransactions.begin (); - - while (it != failedTransactions.end ()) - { - try - { - switch (applyTransaction (engine, it->second, applyLedger, openLgr, certainRetry)) + for (std::map::iterator it = mCloseTimes.begin () + , end = mCloseTimes.end (); it != end; ++it) { - case LCAT_SUCCESS: - it = failedTransactions.erase (it); - ++changes; - break; + // FIXME: Use median, not average + WriteLog (lsINFO, LedgerConsensus) + << lexicalCastThrow (it->second) + << " time votes for " + << lexicalCastThrow (it->first); + closeCount += it->second; + closeTotal += static_cast + (it->first) * static_cast (it->second); + } - case LCAT_FAIL: - it = failedTransactions.erase (it); - break; + closeTotal += (closeCount / 2); + closeTotal /= closeCount; + int offset = static_cast (closeTotal) + - static_cast (mCloseTime); + WriteLog (lsINFO, LedgerConsensus) + << "Our close offset is estimated at " + << offset << " (" << closeCount << ")"; + getApp().getOPs ().closeTimeOffset (offset); + } + } + } - case LCAT_RETRY: - ++it; + /** Begin acquiring a transaction set + */ + void startAcquiring (TransactionAcquire::pointer acquire) + { + boost::unordered_map< uint256, + std::vector< boost::weak_ptr > >::iterator it = + mPeerData.find (acquire->getHash ()); + + if (it != mPeerData.end ()) + { + // Add any peers we already know have his transaction set + std::vector< boost::weak_ptr >& peerList = it->second; + std::vector< boost::weak_ptr >::iterator pit + = peerList.begin (); + + while (pit != peerList.end ()) + { + Peer::pointer pr = pit->lock (); + + if (!pr) + { + pit = peerList.erase (pit); + } + else + { + acquire->peerHas (pr); + ++pit; } } - catch (...) - { - WriteLog (lsWARNING, LedgerConsensus) << "Transaction throws"; - it = failedTransactions.erase (it); - } } - WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " finished " << changes << " changes"; + std::vector peerList + = getApp().getPeers ().getPeerVector (); + BOOST_FOREACH (Peer::ref peer, peerList) + { + if (peer->hasTxSet (acquire->getHash ())) + acquire->peerHas (peer); + } - // A non-retry pass made no changes - if (!changes && !certainRetry) + acquire->setTimer (); + } + + + + + // Where is this function? + SHAMap::pointer find (uint256 const & hash); + + + + + + /** Compare two proposed transaction sets and create disputed + transctions structures for any mismatches + */ + void createDisputes (SHAMap::ref m1, SHAMap::ref m2) + { + if (m1->getHash() == m2->getHash()) return; - // Stop retriable passes - if ((!changes) || (pass >= LEDGER_RETRY_PASSES)) - certainRetry = false; - } -} + WriteLog (lsDEBUG, LedgerConsensus) << "createDisputes " + << m1->getHash() << " to " << m2->getHash(); + SHAMap::Delta differences; + m1->compare (m2, differences, 16384); -uint32 LedgerConsensus::roundCloseTime (uint32 closeTime) -{ - return Ledger::roundCloseTime (closeTime, mCloseResolution); -} - -/** We have a new last closed ledger, process it -*/ -void LedgerConsensus::accept (SHAMap::ref set, LoadEvent::pointer) -{ - if (set->getHash ().isNonZero ()) // put our set where others can get it later - getApp().getOPs ().takePosition (mPreviousLedger->getLedgerSeq (), set); - - { - Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__); - - assert (set->getHash () == mOurPosition->getCurrentHash ()); - - getApp().getOPs ().peekStoredProposals ().clear (); // these are now obsolete - - uint32 closeTime = roundCloseTime (mOurPosition->getCloseTime ()); - bool closeTimeCorrect = true; - - if (closeTime == 0) + int dc = 0; + typedef std::map::value_type u256_diff_pair; + BOOST_FOREACH (u256_diff_pair & pos, differences) { - // we agreed to disagree - closeTimeCorrect = false; - closeTime = mPreviousLedger->getCloseTimeNC () + 1; - } - - WriteLog (lsDEBUG, LedgerConsensus) << "Report: Prop=" << (mProposing ? "yes" : "no") << " val=" << (mValidating ? "yes" : "no") << - " corLCL=" << (mHaveCorrectLCL ? "yes" : "no") << " fail=" << (mConsensusFail ? "yes" : "no"); - WriteLog (lsDEBUG, LedgerConsensus) << "Report: Prev = " << mPrevLedgerHash << ":" << mPreviousLedger->getLedgerSeq (); - WriteLog (lsDEBUG, LedgerConsensus) << "Report: TxSt = " << set->getHash () << ", close " << closeTime << (closeTimeCorrect ? "" : "X"); - - CanonicalTXSet failedTransactions (set->getHash ()); - - Ledger::pointer newLCL = boost::make_shared (false, boost::ref (*mPreviousLedger)); - - // Set up to write SHAMap changes to our database, perform updates, extract changes - newLCL->peekTransactionMap ()->armDirty (); - newLCL->peekAccountStateMap ()->armDirty (); - WriteLog (lsDEBUG, LedgerConsensus) << "Applying consensus set transactions to the last closed ledger"; - applyTransactions (set, newLCL, newLCL, failedTransactions, false); - newLCL->updateSkipList (); - newLCL->setClosed (); - boost::shared_ptr acctNodes = newLCL->peekAccountStateMap ()->disarmDirty (); - boost::shared_ptr txnNodes = newLCL->peekTransactionMap ()->disarmDirty (); - - // write out dirty nodes (temporarily done here) - int fc; - - while ((fc = SHAMap::flushDirty (*acctNodes, 256, hotACCOUNT_NODE, newLCL->getLedgerSeq ())) > 0) - { - WriteLog (lsTRACE, LedgerConsensus) << "Flushed " << fc << " dirty state nodes"; - } - - while ((fc = SHAMap::flushDirty (*txnNodes, 256, hotTRANSACTION_NODE, newLCL->getLedgerSeq ())) > 0) - { - WriteLog (lsTRACE, LedgerConsensus) << "Flushed " << fc << " dirty transaction nodes"; - } - - newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect); - newLCL->updateHash (); - newLCL->setImmutable (); - getApp().getLedgerMaster().storeLedger(newLCL); - - WriteLog (lsDEBUG, LedgerConsensus) << "Report: NewL = " << newLCL->getHash () << ":" << newLCL->getLedgerSeq (); - uint256 newLCLHash = newLCL->getHash (); - - if (ShouldLog (lsTRACE, LedgerConsensus)) - { - WriteLog (lsTRACE, LedgerConsensus) << "newLCL"; - Json::Value p; - newLCL->addJson (p, LEDGER_JSON_DUMP_TXRP | LEDGER_JSON_DUMP_STATE); - WriteLog (lsTRACE, LedgerConsensus) << p; - } - - statusChange (protocol::neACCEPTED_LEDGER, *newLCL); - - if (mValidating && !mConsensusFail) - { - uint256 signingHash; - SerializedValidation::pointer v = boost::make_shared - (newLCLHash, getApp().getOPs ().getValidationTimeNC (), mValPublic, mProposing); - v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ()); - addLoad(v); - - if (((newLCL->getLedgerSeq () + 1) % 256) == 0) // next ledger is flag ledger + ++dc; + // create disputed transactions (from the ledger that has them) + if (pos.second.first) { - getApp().getFeeVote ().doValidation (newLCL, *v); - getApp().getFeatureTable ().doValidation (newLCL, *v); + // transaction is in first map + assert (!pos.second.second); + addDisputedTransaction (pos.first + , pos.second.first->peekData ()); } - - v->sign (signingHash, mValPrivate); - v->setTrusted (); - getApp().getHashRouter ().addSuppression (signingHash); // suppress it if we receive it - FIXME: wrong suppression - getApp().getValidations ().addValidation (v, "local"); - getApp().getOPs ().setLastValidation (v); - Blob validation = v->getSigned (); - protocol::TMValidation val; - val.set_validation (&validation[0], validation.size ()); - int j = getApp().getPeers ().relayMessage (NULL, - boost::make_shared (val, protocol::mtVALIDATION)); - WriteLog (lsINFO, LedgerConsensus) << "CNF Val " << newLCLHash << " to " << j << " peers"; + else if (pos.second.second) + { + // transaction is in second map + assert (!pos.second.first); + addDisputedTransaction (pos.first + , pos.second.second->peekData ()); + } + else // No other disagreement over a transaction should be possible + assert (false); } - else - WriteLog (lsINFO, LedgerConsensus) << "CNF newLCL " << newLCLHash; + WriteLog (lsDEBUG, LedgerConsensus) << dc << " differences found"; + } - Ledger::pointer newOL = boost::make_shared (true, boost::ref (*newLCL)); - LedgerMaster::ScopedLockType sl (getApp().getLedgerMaster ().peekMutex (), __FILE__, __LINE__); + /** Add a disputed transaction (one that at least one node wants + in the consensus set and at least one node does not) to our tracking + */ + void addDisputedTransaction (uint256 const& txID, Blob const& tx) + { + if (mDisputes.find (txID) != mDisputes.end ()) + return; - // Apply disputed transactions that didn't get in - TransactionEngine engine (newOL); + WriteLog (lsDEBUG, LedgerConsensus) << "Transaction " + << txID << " is disputed"; + + bool ourVote = false; + + if (mOurPosition) + { + boost::unordered_map::iterator mit + = mAcquired.find (mOurPosition->getCurrentHash ()); + + if (mit != mAcquired.end ()) + ourVote = mit->second->hasItem (txID); + else + assert (false); // We don't have our own position? + } + + DisputedTx::pointer txn = boost::make_shared + (txID, tx, ourVote); + mDisputes[txID] = txn; + + BOOST_FOREACH (u160_prop_pair & pit, mPeerPositions) + { + boost::unordered_map::const_iterator cit + = mAcquired.find (pit.second->getCurrentHash ()); + + if ((cit != mAcquired.end ()) && cit->second) + { + txn->setVote (pit.first, cit->second->hasItem (txID)); + } + } + + // If we didn't relay this transaction recently, relay it + if (getApp().getHashRouter ().setFlag (txID, SF_RELAYED)) + { + protocol::TMTransaction msg; + msg.set_rawtransaction (& (tx.front ()), tx.size ()); + msg.set_status (protocol::tsNEW); + msg.set_receivetimestamp (getApp().getOPs ().getNetworkTimeNC ()); + PackedMessage::pointer packet = boost::make_shared + (msg, protocol::mtTRANSACTION); + getApp().getPeers ().relayMessage (NULL, packet); + } + } + + /** Adjust the counts on all disputed transactions based + on the set of peers taking this position + */ + void adjustCount (SHAMap::ref map, const std::vector& peers) + { BOOST_FOREACH (u256_lct_pair & it, mDisputes) { - if (!it.second->getOurVote ()) + bool setHas = map->hasItem (it.second->getTransactionID ()); + BOOST_FOREACH (const uint160 & pit, peers) + it.second->setVote (pit, setHas); + } + } + + /** Make and send a proposal + */ + void propose () + { + WriteLog (lsTRACE, LedgerConsensus) << "We propose: " << + (mOurPosition->isBowOut () ? std::string ("bowOut") + : mOurPosition->getCurrentHash ().GetHex ()); + protocol::TMProposeSet prop; + + prop.set_currenttxhash (mOurPosition->getCurrentHash ().begin () + , 256 / 8); + prop.set_previousledger (mOurPosition->getPrevLedger ().begin () + , 256 / 8); + prop.set_proposeseq (mOurPosition->getProposeSeq ()); + prop.set_closetime (mOurPosition->getCloseTime ()); + + Blob pubKey = mOurPosition->getPubKey (); + Blob sig = mOurPosition->sign (); + prop.set_nodepubkey (&pubKey[0], pubKey.size ()); + prop.set_signature (&sig[0], sig.size ()); + getApp().getPeers ().relayMessage (NULL, + boost::make_shared + (prop, protocol::mtPROPOSE_LEDGER)); + } + + /** Let peers know that we a particular transactions set so they + can fetch it from us. + */ + void sendHaveTxSet (uint256 const& hash, bool direct) + { + protocol::TMHaveTransactionSet msg; + msg.set_hash (hash.begin (), 256 / 8); + msg.set_status (direct ? protocol::tsHAVE : protocol::tsCAN_GET); + PackedMessage::pointer packet + = boost::make_shared (msg, protocol::mtHAVE_SET); + getApp().getPeers ().relayMessage (NULL, packet); + } + + /** Apply a set of transactions to a ledger + */ + void applyTransactions (SHAMap::ref set, Ledger::ref applyLedger, + Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, + bool openLgr) + { + TransactionEngine engine (applyLedger); + + for (SHAMapItem::pointer item = set->peekFirstItem (); !!item; + item = set->peekNextItem (item->getTag ())) + if (!checkLedger->hasTransaction (item->getTag ())) { - // we voted NO + WriteLog (lsINFO, LedgerConsensus) + << "Processing candidate transaction: " << item->getTag (); +#ifndef TRUST_NETWORK + try { - WriteLog (lsDEBUG, LedgerConsensus) << "Test applying disputed transaction that did not get in"; - SerializerIterator sit (it.second->peekTransaction ()); - SerializedTransaction::pointer txn = boost::make_shared (boost::ref (sit)); +#endif + SerializerIterator sit (item->peekSerializer ()); + SerializedTransaction::pointer txn + = boost::make_shared + (boost::ref (sit)); - if (applyTransaction (engine, txn, newOL, true, false)) + if (applyTransaction (engine, txn, + applyLedger, openLgr, true) == resultRetry) + { failedTransactions.push_back (txn); + } + +#ifndef TRUST_NETWORK } catch (...) { - WriteLog (lsDEBUG, LedgerConsensus) << "Failed to apply transaction we voted NO on"; + WriteLog (lsWARNING, LedgerConsensus) << " Throws"; + } + +#endif + } + + int changes; + bool certainRetry = true; + + for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass) + { + WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " Txns: " + << failedTransactions.size () + << (certainRetry ? " retriable" : " final"); + changes = 0; + + CanonicalTXSet::iterator it = failedTransactions.begin (); + + while (it != failedTransactions.end ()) + { + try + { + switch (applyTransaction (engine, it->second, + applyLedger, openLgr, certainRetry)) + { + case resultSuccess: + it = failedTransactions.erase (it); + ++changes; + break; + + case resultFail: + it = failedTransactions.erase (it); + break; + + case resultRetry: + ++it; + } + } + catch (...) + { + WriteLog (lsWARNING, LedgerConsensus) + << "Transaction throws"; + it = failedTransactions.erase (it); + } + } + + WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " + << pass << " finished " << changes << " changes"; + + // A non-retry pass made no changes + if (!changes && !certainRetry) + return; + + // Stop retriable passes + if ((!changes) || (pass >= LEDGER_RETRY_PASSES)) + certainRetry = false; + } + } + + /** Apply a transaction to a ledger + */ + int applyTransaction (TransactionEngine& engine + , SerializedTransaction::ref txn, Ledger::ref ledger + , bool openLedger, bool retryAssured) + { + // Returns false if the transaction has need not be retried. + TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE; + + if (retryAssured) + { + parms = static_cast (parms | tapRETRY); + } + + if (getApp().getHashRouter ().setFlag (txn->getTransactionID () + , SF_SIGGOOD)) + { + parms = static_cast + (parms | tapNO_CHECK_SIGN); + } + WriteLog (lsDEBUG, LedgerConsensus) << "TXN " + << txn->getTransactionID () + << (openLedger ? " open" : " closed") + << (retryAssured ? "/retry" : "/final"); + WriteLog (lsTRACE, LedgerConsensus) << txn->getJson (0); + + // VFALCO TODO figure out what this "trust network" + // is all about and why it needs exceptions. +#ifndef TRUST_NETWORK + + try + { +#endif + + bool didApply; + TER result = engine.applyTransaction (*txn, parms, didApply); + + if (didApply) + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Transaction success: " << transHuman (result); + return resultSuccess; + } + + if (isTefFailure (result) || isTemMalformed + (result) || isTelLocal (result)) + { + // failure + WriteLog (lsDEBUG, LedgerConsensus) + << "Transaction failure: " << transHuman (result); + return resultFail; + } + + WriteLog (lsDEBUG, LedgerConsensus) + << "Transaction retry: " << transHuman (result); + assert (!ledger->hasTransaction (txn->getTransactionID ())); + return resultRetry; + +#ifndef TRUST_NETWORK + } + catch (...) + { + WriteLog (lsWARNING, LedgerConsensus) << "Throws"; + return false; + } + +#endif + } + + uint32 roundCloseTime (uint32 closeTime) + { + return Ledger::roundCloseTime (closeTime, mCloseResolution); + } + + /** Send a node status change message to our peers + */ + void statusChange (protocol::NodeEvent event, Ledger& ledger) + { + protocol::TMStatusChange s; + + if (!mHaveCorrectLCL) + s.set_newevent (protocol::neLOST_SYNC); + else + s.set_newevent (event); + + s.set_ledgerseq (ledger.getLedgerSeq ()); + s.set_networktime (getApp().getOPs ().getNetworkTimeNC ()); + uint256 hash = ledger.getParentHash (); + s.set_ledgerhashprevious (hash.begin (), hash.size ()); + hash = ledger.getHash (); + s.set_ledgerhash (hash.begin (), hash.size ()); + + uint32 uMin, uMax; + if (!getApp().getOPs ().getFullValidatedRange (uMin, uMax)) + { + uMin = 0; + uMax = 0; + } + s.set_firstseq (uMin); + s.set_lastseq (uMax); + + PackedMessage::pointer packet + = boost::make_shared (s, protocol::mtSTATUS_CHANGE); + getApp().getPeers ().relayMessage (NULL, packet); + WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer"; + } + + /** Take an initial position on what we think the consensus should be + based on the transactions that made it into our open ledger + */ + void takeInitialPosition (Ledger& initialLedger) + { + SHAMap::pointer initialSet; + + if ((getConfig ().RUN_STANDALONE || (mProposing && mHaveCorrectLCL)) + && ((mPreviousLedger->getLedgerSeq () % 256) == 0)) + { + // previous ledger was flag ledger + SHAMap::pointer preSet + = initialLedger.peekTransactionMap ()->snapShot (true); + getApp().getFeeVote ().doVoting (mPreviousLedger, preSet); + getApp().getFeatureTable ().doVoting (mPreviousLedger, preSet); + initialSet = preSet->snapShot (false); + } + else + initialSet = initialLedger.peekTransactionMap ()->snapShot (false); + + uint256 txSet = initialSet->getHash (); + WriteLog (lsINFO, LedgerConsensus) << "initial position " << txSet; + mapComplete (txSet, initialSet, false); + + if (mValidating) + { + mOurPosition = boost::make_shared + (mValPublic, mValPrivate + , initialLedger.getParentHash () + , txSet, mCloseTime); + } + else + { + mOurPosition + = boost::make_shared + (initialLedger.getParentHash (), txSet, mCloseTime); + } + + BOOST_FOREACH (u256_lct_pair & it, mDisputes) + { + it.second->setOurVote (initialLedger.hasTransaction (it.first)); + } + + // if any peers have taken a contrary position, process disputes + boost::unordered_set found; + BOOST_FOREACH (u160_prop_pair & it, mPeerPositions) + { + uint256 set = it.second->getCurrentHash (); + + if (found.insert (set).second) + { + boost::unordered_map::iterator iit + = mAcquired.find (set); + + if (iit != mAcquired.end ()) + { + mCompares.insert(iit->second->getHash()); + createDisputes (initialSet, iit->second); } } } - WriteLog (lsDEBUG, LedgerConsensus) << "Applying transactions from current open ledger"; - applyTransactions (getApp().getLedgerMaster ().getCurrentLedger ()->peekTransactionMap (), newOL, newLCL, - failedTransactions, true); - getApp().getLedgerMaster ().pushLedger (newLCL, newOL); - mNewLedgerHash = newLCL->getHash (); - mState = lcsACCEPTED; - sl.unlock (); - - if (mValidating) - { - // see how close our close time is to other node's close time reports - WriteLog (lsINFO, LedgerConsensus) << "We closed at " << lexicalCastThrow (mCloseTime); - uint64 closeTotal = mCloseTime; - int closeCount = 1; - - for (std::map::iterator it = mCloseTimes.begin (), end = mCloseTimes.end (); it != end; ++it) - { - // FIXME: Use median, not average - WriteLog (lsINFO, LedgerConsensus) << lexicalCastThrow (it->second) << " time votes for " - << lexicalCastThrow (it->first); - closeCount += it->second; - closeTotal += static_cast (it->first) * static_cast (it->second); - } - - closeTotal += (closeCount / 2); - closeTotal /= closeCount; - int offset = static_cast (closeTotal) - static_cast (mCloseTime); - WriteLog (lsINFO, LedgerConsensus) << "Our close offset is estimated at " << offset << " (" << closeCount << ")"; - getApp().getOPs ().closeTimeOffset (offset); - } + if (mProposing) + propose (); } -} -void LedgerConsensus::endConsensus () -{ - getApp().getOPs ().endConsensus (mHaveCorrectLCL); -} - -/** Add our fee to our validation -*/ -void LedgerConsensus::addLoad(SerializedValidation::ref val) -{ - uint32 fee = std::max( - getApp().getFeeTrack().getLocalFee(), - getApp().getFeeTrack().getClusterFee()); - uint32 ref = getApp().getFeeTrack().getLoadBase(); - if (fee > ref) - val->setFieldU32(sfLoadFee, fee); -} - -/** Simulate a consensus round without any network traffic -*/ -void LedgerConsensus::simulate () -{ - WriteLog (lsINFO, LedgerConsensus) << "Simulating consensus"; - closeLedger (); - mCurrentMSeconds = 100; - beginAccept (true); - endConsensus (); - WriteLog (lsINFO, LedgerConsensus) << "Simulation complete"; -} - -Json::Value LedgerConsensus::getJson (bool full) -{ - Json::Value ret (Json::objectValue); - ret["proposing"] = mProposing; - ret["validating"] = mValidating; - ret["proposers"] = static_cast (mPeerPositions.size ()); - - if (mHaveCorrectLCL) + void updateOurPositions () { - ret["synched"] = true; - ret["ledger_seq"] = mPreviousLedger->getLedgerSeq () + 1; - ret["close_granularity"] = mCloseResolution; - } - else - ret["synched"] = false; + boost::posix_time::ptime peerCutoff + = boost::posix_time::second_clock::universal_time (); + boost::posix_time::ptime ourCutoff + = peerCutoff - boost::posix_time::seconds (PROPOSE_INTERVAL); + peerCutoff -= boost::posix_time::seconds (PROPOSE_FRESHNESS); - switch (mState) + bool changes = false; + SHAMap::pointer ourPosition; + // std::vector addedTx, removedTx; + + // Verify freshness of peer positions and compute close times + std::map closeTimes; + boost::unordered_map::iterator it + = mPeerPositions.begin (); + + while (it != mPeerPositions.end ()) + { + if (it->second->isStale (peerCutoff)) + { + // proposal is stale + uint160 peerID = it->second->getPeerID (); + WriteLog (lsWARNING, LedgerConsensus) + << "Removing stale proposal from " << peerID; + BOOST_FOREACH (u256_lct_pair & it, mDisputes) + it.second->unVote (peerID); + it = mPeerPositions.erase (it); + } + else + { + // proposal is still fresh + ++closeTimes[roundCloseTime (it->second->getCloseTime ())]; + ++it; + } + } + + BOOST_FOREACH (u256_lct_pair & it, mDisputes) + { + // Because the threshold for inclusion increases, + // time can change our position on a dispute + if (it.second->updateVote (mClosePercent, mProposing)) + { + if (!changes) + { + ourPosition = mAcquired[mOurPosition->getCurrentHash ()] + ->snapShot (true); + assert (ourPosition); + changes = true; + } + + if (it.second->getOurVote ()) // now a yes + { + ourPosition->addItem (SHAMapItem (it.first + , it.second->peekTransaction ()), true, false); + // addedTx.push_back(it.first); + } + else // now a no + { + ourPosition->delItem (it.first); + // removedTx.push_back(it.first); + } + } + } + + + int neededWeight; + + if (mClosePercent < AV_MID_CONSENSUS_TIME) + neededWeight = AV_INIT_CONSENSUS_PCT; + else if (mClosePercent < AV_LATE_CONSENSUS_TIME) + neededWeight = AV_MID_CONSENSUS_PCT; + else if (mClosePercent < AV_STUCK_CONSENSUS_TIME) + neededWeight = AV_LATE_CONSENSUS_PCT; + else + neededWeight = AV_STUCK_CONSENSUS_PCT; + + uint32 closeTime = 0; + mHaveCloseTimeConsensus = false; + + if (mPeerPositions.empty ()) + { + // no other times + mHaveCloseTimeConsensus = true; + closeTime = roundCloseTime (mOurPosition->getCloseTime ()); + } + else + { + // Threshold for non-zero vote + int threshVote = mPeerPositions.size (); + // Threshold to declare consensus + int threshConsensus = mPeerPositions.size (); + + if (mProposing) + { + ++closeTimes[roundCloseTime (mOurPosition->getCloseTime ())]; + ++threshVote; + ++threshConsensus; + } + + threshVote = ((threshVote * neededWeight) + (neededWeight / 2)) + / 100; + threshConsensus = ((threshConsensus * AV_CT_CONSENSUS_PCT) + + (AV_CT_CONSENSUS_PCT / 2)) / 100; + + if (threshVote == 0) + threshVote = 1; + + if (threshConsensus == 0) + threshConsensus = 1; + + WriteLog (lsINFO, LedgerConsensus) << "Proposers:" + << mPeerPositions.size () << " nw:" << neededWeight + << " thrV:" << threshVote << " thrC:" << threshConsensus; + + for (std::map::iterator it = closeTimes.begin () + , end = closeTimes.end (); it != end; ++it) + { + WriteLog (lsDEBUG, LedgerConsensus) << "CCTime: seq" + << mPreviousLedger->getLedgerSeq () + 1 << ": " + << it->first << " has " << it->second << ", " + << threshVote << " required"; + + if (it->second >= threshVote) + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Close time consensus reached: " << it->first; + closeTime = it->first; + threshVote = it->second; + + if (threshVote >= threshConsensus) + mHaveCloseTimeConsensus = true; + } + } + + CondLog (!mHaveCloseTimeConsensus, lsDEBUG, LedgerConsensus) + << "No CT consensus: Proposers:" << mPeerPositions.size () + << " Proposing:" << (mProposing ? "yes" : "no") << " Thresh:" + << threshConsensus << " Pos:" << closeTime; + } + + if (!changes && + ((closeTime != roundCloseTime (mOurPosition->getCloseTime ())) + || mOurPosition->isStale (ourCutoff))) + { + // close time changed or our position is stale + ourPosition = mAcquired[mOurPosition->getCurrentHash ()] + ->snapShot (true); + assert (ourPosition); + changes = true; // We pretend our position changed to force + } // a new proposal + + if (changes) + { + uint256 newHash = ourPosition->getHash (); + WriteLog (lsINFO, LedgerConsensus) + << "Position change: CTime " << closeTime + << ", tx " << newHash; + + if (mOurPosition->changePosition (newHash, closeTime)) + { + if (mProposing) + propose (); + + mapComplete (newHash, ourPosition, false); + } + } + } + + /** If we radically changed our consensus context for some reason, + we need to replay recent proposals so that they're not lost. + */ + void playbackProposals () { - case lcsPRE_CLOSE: - ret["state"] = "open"; - break; + boost::unordered_map < uint160, + std::list > & storedProposals + = getApp().getOPs ().peekStoredProposals (); - case lcsESTABLISH: - ret["state"] = "consensus"; - break; + for (boost::unordered_map< uint160 + , std::list >::iterator it + = storedProposals.begin () + , end = storedProposals.end (); it != end; ++it) + { + bool relay = false; + BOOST_FOREACH (LedgerProposal::ref proposal, it->second) + { + if (proposal->hasSignature ()) + { + // we have the signature but don't know the + // ledger so couldn't verify + proposal->setPrevLedger (mPrevLedgerHash); - case lcsFINISHED: - ret["state"] = "finished"; - break; + if (proposal->checkSign ()) + { + WriteLog (lsINFO, LedgerConsensus) + << "Applying stored proposal"; + relay = peerPosition (proposal); + } + } + else if (proposal->isPrevLedger (mPrevLedgerHash)) + relay = peerPosition (proposal); - case lcsACCEPTED: - ret["state"] = "accepted"; - break; + if (relay) + { + WriteLog (lsWARNING, LedgerConsensus) + << "We should do delayed relay of this proposal," + << " but we cannot"; + } + + #if 0 + // FIXME: We can't do delayed relay because we don't have the signature + std::set peers + + if (relay && getApp().getHashRouter ().swapSet (proposal.getSuppress (), set, SF_RELAYED)) + { + WriteLog (lsDEBUG, LedgerConsensus) << "Stored proposal delayed relay"; + protocol::TMProposeSet set; + set.set_proposeseq + set.set_currenttxhash (, 256 / 8); + previousledger + closetime + nodepubkey + signature + PackedMessage::pointer message = boost::make_shared (set, protocol::mtPROPOSE_LEDGER); + getApp().getPeers ().relayMessageBut (peers, message); + } + + #endif + } + } } - int v = mDisputes.size (); - - if ((v != 0) && !full) - ret["disputes"] = v; - - if (mOurPosition) - ret["our_position"] = mOurPosition->getJson (); - - if (full) + /** We have just decided to close the ledger. Start the consensus timer, + stash the close time, inform peers, and take a position + */ + void closeLedger () { + checkOurValidation (); + mState = lcsESTABLISH; + mConsensusStartTime + = boost::posix_time::microsec_clock::universal_time (); + mCloseTime = getApp().getOPs ().getCloseTimeNC (); + getApp().getOPs ().setLastCloseTime (mCloseTime); + statusChange (protocol::neCLOSING_LEDGER, *mPreviousLedger); + takeInitialPosition (*getApp().getLedgerMaster ().closeLedger (true)); + } - ret["current_ms"] = mCurrentMSeconds; - ret["close_percent"] = mClosePercent; - ret["close_resolution"] = mCloseResolution; - ret["have_time_consensus"] = mHaveCloseTimeConsensus; - ret["previous_proposers"] = mPreviousProposers; - ret["previous_mseconds"] = mPreviousMSeconds; - - if (!mPeerPositions.empty ()) + void checkOurValidation () + { + // This only covers some cases - Fix for the case where we can't ever acquire the consensus ledger + if (!mHaveCorrectLCL || !mValPublic.isSet () + || !mValPrivate.isSet () + || getApp().getOPs ().isNeedNetworkLedger ()) { - typedef boost::unordered_map::value_type pp_t; - Json::Value ppj (Json::objectValue); - BOOST_FOREACH (pp_t & pp, mPeerPositions) - { - ppj[pp.first.GetHex ()] = pp.second->getJson (); - } - ret["peer_positions"] = ppj; + return; } - if (!mAcquired.empty ()) + SerializedValidation::pointer lastVal + = getApp().getOPs ().getLastValidation (); + + if (lastVal) { - // acquired - typedef boost::unordered_map::value_type ac_t; - Json::Value acq (Json::objectValue); - BOOST_FOREACH (ac_t & at, mAcquired) + if (lastVal->getFieldU32 (sfLedgerSequence) + == mPreviousLedger->getLedgerSeq ()) { - if (at.second) - acq[at.first.GetHex ()] = "acquired"; - else - acq[at.first.GetHex ()] = "failed"; + return; } - ret["acquired"] = acq; + if (lastVal->getLedgerHash () == mPrevLedgerHash) + return; } - if (!mAcquiring.empty ()) + uint256 signingHash; + SerializedValidation::pointer v + = boost::make_shared + (mPreviousLedger->getHash () + , getApp().getOPs ().getValidationTimeNC (), mValPublic, false); + addLoad(v); + v->setTrusted (); + v->sign (signingHash, mValPrivate); + // FIXME: wrong supression + getApp().getHashRouter ().addSuppression (signingHash); + getApp().getValidations ().addValidation (v, "localMissing"); + Blob validation = v->getSigned (); + protocol::TMValidation val; + val.set_validation (&validation[0], validation.size ()); + #if 0 + getApp().getPeers ().relayMessage (NULL, + boost::make_shared (val, protocol::mtVALIDATION)); + #endif + getApp().getOPs ().setLastValidation (v); + WriteLog (lsWARNING, LedgerConsensus) << "Sending partial validation"; + } + + /** We have a new LCL and must accept it + */ + void beginAccept (bool synchronous) + { + SHAMap::pointer consensusSet + = mAcquired[mOurPosition->getCurrentHash ()]; + + if (!consensusSet) { - typedef boost::unordered_map::value_type ac_t; - Json::Value acq (Json::arrayValue); - BOOST_FOREACH (ac_t & at, mAcquiring) - { - acq.append (at.first.GetHex ()); - } - ret["acquiring"] = acq; + WriteLog (lsFATAL, LedgerConsensus) + << "We don't have a consensus set"; + abort (); + return; } - if (!mDisputes.empty ()) - { - typedef boost::unordered_map::value_type d_t; - Json::Value dsj (Json::objectValue); - BOOST_FOREACH (d_t & dt, mDisputes) - { - dsj[dt.first.GetHex ()] = dt.second->getJson (); - } - ret["disputes"] = dsj; - } + getApp().getOPs ().newLCL + (mPeerPositions.size (), mCurrentMSeconds, mNewLedgerHash); - if (!mCloseTimes.empty ()) - { - typedef std::map::value_type ct_t; - Json::Value ctj (Json::objectValue); - BOOST_FOREACH (ct_t & ct, mCloseTimes) - { - ctj[lexicalCastThrow (ct.first)] = ct.second; - } - ret["close_times"] = ctj; - } - - if (!mDeadNodes.empty ()) - { - Json::Value dnj (Json::arrayValue); - BOOST_FOREACH (const uint160 & dn, mDeadNodes) - { - dnj.append (dn.GetHex ()); - } - ret["dead_nodes"] = dnj; + if (synchronous) + accept (consensusSet, LoadEvent::pointer ()); + else + { // FIXME: Post to JobQueue, not I/O service + getApp().getIOService ().post + (BIND_TYPE (&LedgerConsensusImp::accept + , shared_from_this (), consensusSet + , getApp().getJobQueue ().getLoadEvent + (jtACCEPTLEDGER, "LedgerConsensusImp::beginAccept"))); } } - return ret; + void endConsensus () + { + getApp().getOPs ().endConsensus (mHaveCorrectLCL); + } + + /** Add our fee to our validation + */ + void addLoad(SerializedValidation::ref val) + { + uint32 fee = std::max( + getApp().getFeeTrack().getLocalFee(), + getApp().getFeeTrack().getClusterFee()); + uint32 ref = getApp().getFeeTrack().getLoadBase(); + if (fee > ref) + val->setFieldU32(sfLoadFee, fee); + } +private: + // VFALCO TODO Rename these to look pretty + enum LCState + { + lcsPRE_CLOSE, // We haven't closed our ledger yet, + // but others might have + lcsESTABLISH, // Establishing consensus + lcsFINISHED, // We have closed on a transaction set + lcsACCEPTED, // We have accepted/validated + // a new last closed ledger + }; + + LCState mState; + uint32 mCloseTime; // The wall time this ledger closed + uint256 mPrevLedgerHash, mNewLedgerHash; + Ledger::pointer mPreviousLedger; + InboundLedger::pointer mAcquiringLedger; + LedgerProposal::pointer mOurPosition; + RippleAddress mValPublic, mValPrivate; + bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail; + + int mCurrentMSeconds, mClosePercent, mCloseResolution; + bool mHaveCloseTimeConsensus; + + boost::posix_time::ptime mConsensusStartTime; + int mPreviousProposers; + int mPreviousMSeconds; + + // Convergence tracking, trusted peers indexed by hash of public key + boost::unordered_map mPeerPositions; + + // Transaction Sets, indexed by hash of transaction tree + boost::unordered_map mAcquired; + boost::unordered_map mAcquiring; + + // Peer sets + boost::unordered_map > > mPeerData; + + // Disputed transactions + boost::unordered_map mDisputes; + boost::unordered_set mCompares; + + // Close time estimates + std::map mCloseTimes; + + // nodes that have bowed out of this consensus process + boost::unordered_set mDeadNodes; +}; + +//------------------------------------------------------------------------------ + +boost::shared_ptr LedgerConsensus::New(LedgerHash const & prevLCLHash, + Ledger::ref previousLedger, uint32 closeTime) +{ + return boost::make_shared( + prevLCLHash, previousLedger,closeTime); } + // vim:ts=4 diff --git a/src/ripple_app/consensus/LedgerConsensus.h b/src/ripple_app/consensus/LedgerConsensus.h index 5ceae4cad3..dbdbcbbe06 100644 --- a/src/ripple_app/consensus/LedgerConsensus.h +++ b/src/ripple_app/consensus/LedgerConsensus.h @@ -26,148 +26,56 @@ is destroyed when the process is complete. */ class LedgerConsensus - : public boost::enable_shared_from_this - , public CountedObject { -public: - static char const* getCountedObjectName () { return "LedgerConsensus"; } +public: + static shared_ptr New( + LedgerHash const & prevLCLHash, Ledger::ref previousLedger, + uint32 closeTime); - LedgerConsensus (LedgerHash const & prevLCLHash, Ledger::ref previousLedger, uint32 closeTime); + virtual int startup () = 0; - int startup (); + virtual Json::Value getJson (bool full) = 0; - Json::Value getJson (bool full); + virtual Ledger::ref peekPreviousLedger () = 0; - Ledger::ref peekPreviousLedger () - { - return mPreviousLedger; - } + virtual uint256 getLCL () = 0; - uint256 getLCL () - { - return mPrevLedgerHash; - } + virtual SHAMap::pointer getTransactionTree (uint256 const & hash, + bool doAcquire) = 0; - SHAMap::pointer getTransactionTree (uint256 const & hash, bool doAcquire); + virtual void mapComplete (uint256 const & hash, SHAMap::ref map, + bool acquired) = 0; - TransactionAcquire::pointer getAcquiring (uint256 const & hash); + virtual bool stillNeedTXSet (uint256 const & hash) = 0; - void mapComplete (uint256 const & hash, SHAMap::ref map, bool acquired); + virtual void checkLCL () = 0; - bool stillNeedTXSet (uint256 const & hash); + virtual void handleLCL (uint256 const & lclHash) = 0; - void checkLCL (); - - void handleLCL (uint256 const & lclHash); - - void timerEntry (); + virtual void timerEntry () = 0; // state handlers - void statePreClose (); - void stateEstablish (); - void stateCutoff (); - void stateFinished (); - void stateAccepted (); + virtual void statePreClose () = 0; + virtual void stateEstablish () = 0; + virtual void stateFinished () = 0; + virtual void stateAccepted () = 0; - bool haveConsensus (bool forReal); + virtual bool haveConsensus (bool forReal) = 0; - bool peerPosition (LedgerProposal::ref); + virtual bool peerPosition (LedgerProposal::ref) = 0; - bool peerHasSet (Peer::ref peer, uint256 const & set, protocol::TxSetStatus status); + virtual bool peerHasSet (Peer::ref peer, uint256 const & set, + protocol::TxSetStatus status) = 0; - SHAMapAddNode peerGaveNodes (Peer::ref peer, uint256 const & setHash, - const std::list& nodeIDs, const std::list< Blob >& nodeData); + virtual SHAMapAddNode peerGaveNodes (Peer::ref peer, + uint256 const & setHash, + const std::list& nodeIDs, + const std::list< Blob >& nodeData) = 0; - bool isOurPubKey (const RippleAddress & k) - { - return k == mValPublic; - } + virtual bool isOurPubKey (const RippleAddress & k) = 0; // test/debug - void simulate (); - -private: - // final accept logic - void accept (SHAMap::ref txSet, LoadEvent::pointer); - - void weHave (uint256 const & id, Peer::ref avoidPeer); - void startAcquiring (TransactionAcquire::pointer); - SHAMap::pointer find (uint256 const & hash); - - void createDisputes (SHAMap::ref, SHAMap::ref); - void addDisputedTransaction (uint256 const& , Blob const & transaction); - void adjustCount (SHAMap::ref map, const std::vector& peers); - void propose (); - - void addPosition (LedgerProposal&, bool ours); - void removePosition (LedgerProposal&, bool ours); - void sendHaveTxSet (uint256 const & set, bool direct); - void applyTransactions (SHAMap::ref transactionSet, Ledger::ref targetLedger, - Ledger::ref checkLedger, CanonicalTXSet & failedTransactions, bool openLgr); - int applyTransaction (TransactionEngine & engine, SerializedTransaction::ref txn, Ledger::ref targetLedger, - bool openLgr, bool retryAssured); - - uint32 roundCloseTime (uint32 closeTime); - - // manipulating our own position - void statusChange (protocol::NodeEvent, Ledger & ledger); - void takeInitialPosition (Ledger & initialLedger); - void updateOurPositions (); - void playbackProposals (); - int getThreshold (); - void closeLedger (); - void checkOurValidation (); - - void beginAccept (bool synchronous); - void endConsensus (); - - void addLoad (SerializedValidation::ref val); - -private: - // VFALCO TODO Rename these to look pretty - enum LCState - { - lcsPRE_CLOSE, // We haven't closed our ledger yet, but others might have - lcsESTABLISH, // Establishing consensus - lcsFINISHED, // We have closed on a transaction set - lcsACCEPTED, // We have accepted/validated a new last closed ledger - }; - - LCState mState; - uint32 mCloseTime; // The wall time this ledger closed - uint256 mPrevLedgerHash, mNewLedgerHash; - Ledger::pointer mPreviousLedger; - InboundLedger::pointer mAcquiringLedger; - LedgerProposal::pointer mOurPosition; - RippleAddress mValPublic, mValPrivate; - bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail; - - int mCurrentMSeconds, mClosePercent, mCloseResolution; - bool mHaveCloseTimeConsensus; - - boost::posix_time::ptime mConsensusStartTime; - int mPreviousProposers; - int mPreviousMSeconds; - - // Convergence tracking, trusted peers indexed by hash of public key - boost::unordered_map mPeerPositions; - - // Transaction Sets, indexed by hash of transaction tree - boost::unordered_map mAcquired; - boost::unordered_map mAcquiring; - - // Peer sets - boost::unordered_map > > mPeerData; - - // Disputed transactions - boost::unordered_map mDisputes; - boost::unordered_set mCompares; - - // Close time estimates - std::map mCloseTimes; - - // nodes that have bowed out of this consensus process - boost::unordered_set mDeadNodes; + virtual void simulate () = 0; }; diff --git a/src/ripple_app/ledger/InboundLedgers.cpp b/src/ripple_app/ledger/InboundLedgers.cpp index 2f6c65496a..d676e64923 100644 --- a/src/ripple_app/ledger/InboundLedgers.cpp +++ b/src/ripple_app/ledger/InboundLedgers.cpp @@ -17,347 +17,418 @@ */ //============================================================================== -typedef std::pair u256_acq_pair; -InboundLedgers::InboundLedgers (Stoppable& parent) - : Stoppable ("InboundLedgers", parent) - , mLock (this, "InboundLedger", __FILE__, __LINE__) - , mRecentFailures ("LedgerAcquireRecentFailures", 0, kReacquireIntervalSeconds) +class InboundLedgersImp + : public InboundLedgers + , public Stoppable + , public LeakChecked { -} - -InboundLedger::pointer InboundLedgers::findCreate (uint256 const& hash, uint32 seq, bool couldBeNew) -{ - assert (hash.isNonZero ()); - InboundLedger::pointer ret; +public: + typedef std::pair u256_acq_pair; + // How long before we try again to acquire the same ledger + static const int kReacquireIntervalSeconds = 300; + explicit InboundLedgersImp (Stoppable& parent) + : Stoppable ("InboundLedgers", parent) + , mLock (this, "InboundLedger", __FILE__, __LINE__) + , mRecentFailures ("LedgerAcquireRecentFailures", 0, + kReacquireIntervalSeconds) { - ScopedLockType sl (mLock, __FILE__, __LINE__); + } + + // VFALCO TODO Should this be called findOrAdd ? + // + InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq, + bool couldBeNew) + { + assert (hash.isNonZero ()); + InboundLedger::pointer ret; - if (! isStopping ()) { + ScopedLockType sl (mLock, __FILE__, __LINE__); + + if (! isStopping ()) + { + boost::unordered_map::iterator it = mLedgers.find (hash); + if (it != mLedgers.end ()) + { + ret = it->second; + // FIXME: Should set the sequence if it's not set + } + else + { + ret = boost::make_shared (hash, seq); + assert (ret); + mLedgers.insert (std::make_pair (hash, ret)); + ret->init (sl, couldBeNew); + } + } + } + + return ret; + } + + InboundLedger::pointer find (uint256 const& hash) + { + assert (hash.isNonZero ()); + + InboundLedger::pointer ret; + + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + boost::unordered_map::iterator it = mLedgers.find (hash); if (it != mLedgers.end ()) { ret = it->second; - // FIXME: Should set the sequence if it's not set - } - else - { - ret = boost::make_shared (hash, seq); - assert (ret); - mLedgers.insert (std::make_pair (hash, ret)); - ret->init (sl, couldBeNew); } } + + return ret; } - return ret; -} - -InboundLedger::pointer InboundLedgers::find (uint256 const& hash) -{ - assert (hash.isNonZero ()); - - InboundLedger::pointer ret; - + bool hasLedger (LedgerHash const& hash) { + assert (hash.isNonZero ()); + ScopedLockType sl (mLock, __FILE__, __LINE__); - - boost::unordered_map::iterator it = mLedgers.find (hash); - if (it != mLedgers.end ()) - { - ret = it->second; - } + return mLedgers.find (hash) != mLedgers.end (); } - return ret; -} - -bool InboundLedgers::hasLedger (LedgerHash const& hash) -{ - assert (hash.isNonZero ()); - - ScopedLockType sl (mLock, __FILE__, __LINE__); - return mLedgers.find (hash) != mLedgers.end (); -} - -void InboundLedgers::dropLedger (LedgerHash const& hash) -{ - assert (hash.isNonZero ()); - - ScopedLockType sl (mLock, __FILE__, __LINE__); - mLedgers.erase (hash); - -} - -bool InboundLedgers::awaitLedgerData (LedgerHash const& ledgerHash) -{ - InboundLedger::pointer ledger = find (ledgerHash); - - if (!ledger) - return false; - - ledger->awaitData (); - return true; -} - -/* -This gets called when - "We got some data from an inbound ledger" - -inboundLedgerTrigger: - "What do we do with this partial data?" - Figures out what to do with the responses to our requests for information. - -*/ -// means "We got some data from an inbound ledger" -void InboundLedgers::gotLedgerData (Job&, LedgerHash hash, - boost::shared_ptr packet_ptr, boost::weak_ptr wPeer) -{ - protocol::TMLedgerData& packet = *packet_ptr; - Peer::pointer peer = wPeer.lock (); - - WriteLog (lsTRACE, InboundLedger) << "Got data (" << packet.nodes ().size () << ") for acquiring ledger: " << hash; - - InboundLedger::pointer ledger = find (hash); - - if (!ledger) + void dropLedger (LedgerHash const& hash) { - WriteLog (lsTRACE, InboundLedger) << "Got data for ledger we're not acquiring"; + assert (hash.isNonZero ()); - if (peer) - { - peer->charge (Resource::feeInvalidRequest); - } + ScopedLockType sl (mLock, __FILE__, __LINE__); + mLedgers.erase (hash); - return; } - ledger->noAwaitData (); - - if (!peer) - return; - - if (packet.type () == protocol::liBASE) + bool awaitLedgerData (LedgerHash const& ledgerHash) { - if (packet.nodes_size () < 1) - { - WriteLog (lsWARNING, InboundLedger) << "Got empty base data"; - peer->charge (Resource::feeInvalidRequest); - return; - } + InboundLedger::pointer ledger = find (ledgerHash); - if (!ledger->takeBase (packet.nodes (0).nodedata ())) - { - WriteLog (lsWARNING, InboundLedger) << "Got invalid base data"; - peer->charge (Resource::feeInvalidRequest); - return; - } + if (!ledger) + return false; - SHAMapAddNode san = SHAMapAddNode::useful (); - - if ((packet.nodes ().size () > 1) && !ledger->takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san)) - { - WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid"; - } - - if ((packet.nodes ().size () > 2) && !ledger->takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san)) - { - WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid"; - } - - if (!san.isInvalid ()) - { - ledger->progress (); - ledger->trigger (peer); - } - else - WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data"; - - return; + ledger->awaitData (); + return true; } + /* + This gets called when + "We got some data from an inbound ledger" - if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE)) + inboundLedgerTrigger: + "What do we do with this partial data?" + Figures out what to do with the responses to our requests for information. + + */ + // means "We got some data from an inbound ledger" + + // VFALCO TODO Why is hash passed by value? + // VFALCO TODO Remove the dependency on the Peer object. + // + void gotLedgerData (Job& job, + LedgerHash hash, + boost::shared_ptr packet_ptr, + boost::weak_ptr wPeer) { - std::list nodeIDs; - std::list< Blob > nodeData; + protocol::TMLedgerData& packet = *packet_ptr; + Peer::pointer peer = wPeer.lock (); - if (packet.nodes ().size () <= 0) + WriteLog (lsTRACE, InboundLedger) << "Got data (" << packet.nodes ().size () << ") for acquiring ledger: " << hash; + + InboundLedger::pointer ledger = find (hash); + + if (!ledger) { - WriteLog (lsINFO, InboundLedger) << "Got response with no nodes"; - peer->charge (Resource::feeInvalidRequest); - return; - } + WriteLog (lsTRACE, InboundLedger) << "Got data for ledger we're not acquiring"; - for (int i = 0; i < packet.nodes ().size (); ++i) - { - const protocol::TMLedgerNode& node = packet.nodes (i); - - if (!node.has_nodeid () || !node.has_nodedata ()) + if (peer) { - WriteLog (lsWARNING, InboundLedger) << "Got bad node"; + peer->charge (Resource::feeInvalidRequest); + } + + return; + } + + ledger->noAwaitData (); + + if (!peer) + return; + + if (packet.type () == protocol::liBASE) + { + if (packet.nodes_size () < 1) + { + WriteLog (lsWARNING, InboundLedger) << "Got empty base data"; peer->charge (Resource::feeInvalidRequest); return; } - nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ())); - nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); - } - - SHAMapAddNode ret; - - if (packet.type () == protocol::liTX_NODE) - ledger->takeTxNode (nodeIDs, nodeData, ret); - else - ledger->takeAsNode (nodeIDs, nodeData, ret); - - if (!ret.isInvalid ()) - { - ledger->progress (); - ledger->trigger (peer); - } - else - WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data"; - - return; - } - - WriteLog (lsWARNING, InboundLedger) << "Not sure what ledger data we got"; - peer->charge (Resource::feeInvalidRequest); -} - -void InboundLedgers::sweep () -{ - mRecentFailures.sweep (); - - int const now = UptimeTimer::getInstance ().getElapsedSeconds (); - - // Make a list of things to sweep, while holding the lock - std::vector stuffToSweep; - std::size_t total; - { - ScopedLockType sl (mLock, __FILE__, __LINE__); - MapType::iterator it (mLedgers.begin ()); - total = mLedgers.size (); - stuffToSweep.reserve (total); - - while (it != mLedgers.end ()) - { - if (it->second->getLastAction () > now) + if (!ledger->takeBase (packet.nodes (0).nodedata ())) { - it->second->touch (); - ++it; + WriteLog (lsWARNING, InboundLedger) << "Got invalid base data"; + peer->charge (Resource::feeInvalidRequest); + return; } - else if ((it->second->getLastAction () + 60) < now) + + SHAMapAddNode san = SHAMapAddNode::useful (); + + if ((packet.nodes ().size () > 1) && !ledger->takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san)) { - stuffToSweep.push_back (it->second); - // shouldn't cause the actual final delete - // since we are holding a reference in the vector. - it = mLedgers.erase (it); + WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid"; + } + + if ((packet.nodes ().size () > 2) && !ledger->takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san)) + { + WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid"; + } + + if (!san.isInvalid ()) + { + ledger->progress (); + ledger->trigger (peer); } else + WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data"; + + return; + } + + if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE)) + { + std::list nodeIDs; + std::list< Blob > nodeData; + + if (packet.nodes ().size () <= 0) { - ++it; + WriteLog (lsINFO, InboundLedger) << "Got response with no nodes"; + peer->charge (Resource::feeInvalidRequest); + return; + } + + for (int i = 0; i < packet.nodes ().size (); ++i) + { + const protocol::TMLedgerNode& node = packet.nodes (i); + + if (!node.has_nodeid () || !node.has_nodedata ()) + { + WriteLog (lsWARNING, InboundLedger) << "Got bad node"; + peer->charge (Resource::feeInvalidRequest); + return; + } + + nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ())); + nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); + } + + SHAMapAddNode ret; + + if (packet.type () == protocol::liTX_NODE) + ledger->takeTxNode (nodeIDs, nodeData, ret); + else + ledger->takeAsNode (nodeIDs, nodeData, ret); + + if (!ret.isInvalid ()) + { + ledger->progress (); + ledger->trigger (peer); + } + else + WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data"; + + return; + } + + WriteLog (lsWARNING, InboundLedger) << "Not sure what ledger data we got"; + peer->charge (Resource::feeInvalidRequest); + } + + int getFetchCount (int& timeoutCount) + { + timeoutCount = 0; + int ret = 0; + + std::vector inboundLedgers; + + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + + inboundLedgers.reserve(mLedgers.size()); + BOOST_FOREACH (const u256_acq_pair & it, mLedgers) + { + inboundLedgers.push_back(it); } } + + BOOST_FOREACH (const u256_acq_pair & it, inboundLedgers) + { + if (it.second->isActive ()) + { + ++ret; + timeoutCount += it.second->getTimeouts (); + } + } + return ret; } - WriteLog (lsDEBUG, InboundLedger) << - "Sweeped " << stuffToSweep.size () << - " out of " << total << " inbound ledgers."; -} + void logFailure (uint256 const& h) + { + mRecentFailures.add (h); + } -int InboundLedgers::getFetchCount (int& timeoutCount) -{ - timeoutCount = 0; - int ret = 0; - - std::vector inboundLedgers; + bool isFailure (uint256 const& h) + { + return mRecentFailures.isPresent (h, false); + } + void clearFailures () { ScopedLockType sl (mLock, __FILE__, __LINE__); - inboundLedgers.reserve(mLedgers.size()); - BOOST_FOREACH (const u256_acq_pair & it, mLedgers) + mRecentFailures.clear(); + mLedgers.clear(); + } + + Json::Value getInfo() + { + Json::Value ret(Json::objectValue); + + std::vector acquires; { - inboundLedgers.push_back(it); - } - } + ScopedLockType sl (mLock, __FILE__, __LINE__); - BOOST_FOREACH (const u256_acq_pair & it, inboundLedgers) - { - if (it.second->isActive ()) + acquires.reserve (mLedgers.size ()); + BOOST_FOREACH (const u256_acq_pair & it, mLedgers) + { + assert (it.second); + acquires.push_back (it); + } + } + + BOOST_FOREACH (const u256_acq_pair& it, acquires) { - ++ret; - timeoutCount += it.second->getTimeouts (); + uint32 seq = it.second->getSeq(); + if (seq > 1) + ret[lexicalCastThrow (seq)] = it.second->getJson(0); + else + ret[it.first.GetHex()] = it.second->getJson(0); } - } - return ret; -} - -void InboundLedgers::gotFetchPack (Job&) -{ - std::vector acquires; - { - ScopedLockType sl (mLock, __FILE__, __LINE__); - - acquires.reserve (mLedgers.size ()); - BOOST_FOREACH (const u256_acq_pair & it, mLedgers) - { - assert (it.second); - acquires.push_back (it.second); - } - } - - BOOST_FOREACH (const InboundLedger::pointer & acquire, acquires) - { - acquire->checkLocal (); - } -} - -void InboundLedgers::clearFailures () -{ - ScopedLockType sl (mLock, __FILE__, __LINE__); - - mRecentFailures.clear(); - mLedgers.clear(); -} - -Json::Value InboundLedgers::getInfo() -{ - Json::Value ret(Json::objectValue); - - std::vector acquires; - { - ScopedLockType sl (mLock, __FILE__, __LINE__); - - acquires.reserve (mLedgers.size ()); - BOOST_FOREACH (const u256_acq_pair & it, mLedgers) - { - assert (it.second); - acquires.push_back (it); - } - } - - BOOST_FOREACH (const u256_acq_pair& it, acquires) - { - uint32 seq = it.second->getSeq(); - if (seq > 1) - ret[lexicalCastThrow (seq)] = it.second->getJson(0); - else - ret[it.first.GetHex()] = it.second->getJson(0); - } return ret; -} + } -void InboundLedgers::onStop () + void gotFetchPack (Job&) + { + std::vector acquires; + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + + acquires.reserve (mLedgers.size ()); + BOOST_FOREACH (const u256_acq_pair & it, mLedgers) + { + assert (it.second); + acquires.push_back (it.second); + } + } + + BOOST_FOREACH (const InboundLedger::pointer & acquire, acquires) + { + acquire->checkLocal (); + } + } + + void sweep () + { + mRecentFailures.sweep (); + + int const now = UptimeTimer::getInstance ().getElapsedSeconds (); + + // Make a list of things to sweep, while holding the lock + std::vector stuffToSweep; + std::size_t total; + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + MapType::iterator it (mLedgers.begin ()); + total = mLedgers.size (); + stuffToSweep.reserve (total); + + while (it != mLedgers.end ()) + { + if (it->second->getLastAction () > now) + { + it->second->touch (); + ++it; + } + else if ((it->second->getLastAction () + 60) < now) + { + stuffToSweep.push_back (it->second); + // shouldn't cause the actual final delete + // since we are holding a reference in the vector. + it = mLedgers.erase (it); + } + else + { + ++it; + } + } + } + + WriteLog (lsDEBUG, InboundLedger) << + "Sweeped " << stuffToSweep.size () << + " out of " << total << " inbound ledgers."; + } + + void onStop () + { + ScopedLockType lock (mLock, __FILE__, __LINE__); + + mLedgers.clear(); + mRecentFailures.clear(); + + stopped(); + } + +private: + typedef boost::unordered_map MapType; + + typedef RippleRecursiveMutex LockType; + typedef LockType::ScopedLockType ScopedLockType; + LockType mLock; + + MapType mLedgers; + KeyCache mRecentFailures; +}; + +//------------------------------------------------------------------------------ + +InboundLedgers::~InboundLedgers() { - ScopedLockType lock (mLock, __FILE__, __LINE__); - - mLedgers.clear(); - mRecentFailures.clear(); - - stopped(); } + +InboundLedgers* InboundLedgers::New (Stoppable& parent) +{ + return new InboundLedgersImp (parent); +} + + + + + + + + + + + + + + + + + + + + + diff --git a/src/ripple_app/ledger/InboundLedgers.h b/src/ripple_app/ledger/InboundLedgers.h index 943f30ca9a..d48e1aa68a 100644 --- a/src/ripple_app/ledger/InboundLedgers.h +++ b/src/ripple_app/ledger/InboundLedgers.h @@ -24,68 +24,48 @@ @see InboundLedger */ -// VFALCO TODO Rename to InboundLedgers -// VFALCO TODO Create abstract interface class InboundLedgers - : public Stoppable - , public LeakChecked { public: - // How long before we try again to acquire the same ledger - static const int kReacquireIntervalSeconds = 300; + virtual ~InboundLedgers() = 0; - explicit InboundLedgers (Stoppable& parent); + static InboundLedgers* New (Stoppable& parent); // VFALCO TODO Should this be called findOrAdd ? // - InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq, bool bCouldBeNew); + virtual InboundLedger::pointer findCreate (uint256 const& hash, + uint32 seq, + bool bCouldBeNew) = 0; - InboundLedger::pointer find (uint256 const& hash); + virtual InboundLedger::pointer find (uint256 const& hash) = 0; - bool hasLedger (LedgerHash const& ledgerHash); + virtual bool hasLedger (LedgerHash const& ledgerHash) = 0; - void dropLedger (LedgerHash const& ledgerHash); + virtual void dropLedger (LedgerHash const& ledgerHash) = 0; - bool awaitLedgerData (LedgerHash const& ledgerHash); + virtual bool awaitLedgerData (LedgerHash const& ledgerHash) = 0; // VFALCO TODO Why is hash passed by value? // VFALCO TODO Remove the dependency on the Peer object. // - void gotLedgerData (Job&, + virtual void gotLedgerData (Job& job, LedgerHash hash, boost::shared_ptr packet, - boost::weak_ptr peer); + boost::weak_ptr peer) = 0; - int getFetchCount (int& timeoutCount); + virtual int getFetchCount (int& timeoutCount) = 0; - void logFailure (uint256 const& h) - { - mRecentFailures.add (h); - } + virtual void logFailure (uint256 const& h) = 0; - bool isFailure (uint256 const& h) - { - return mRecentFailures.isPresent (h, false); - } + virtual bool isFailure (uint256 const& h) = 0; - void clearFailures(); + virtual void clearFailures() = 0; - Json::Value getInfo(); + virtual Json::Value getInfo() = 0; - void gotFetchPack (Job&); - void sweep (); + virtual void gotFetchPack (Job&) = 0; + virtual void sweep () = 0; - void onStop (); - -private: - typedef boost::unordered_map MapType; - - typedef RippleRecursiveMutex LockType; - typedef LockType::ScopedLockType ScopedLockType; - LockType mLock; - - MapType mLedgers; - KeyCache mRecentFailures; }; #endif diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 135613df79..c56e9172b7 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -131,7 +131,7 @@ public: , m_sntpClient (SNTPClient::New (*this)) - , m_inboundLedgers (*m_jobQueue) + , m_inboundLedgers (InboundLedgers::New(*m_jobQueue)) , m_txQueue (TxQueue::New ()) @@ -215,7 +215,7 @@ public: InboundLedgers& getInboundLedgers () { - return m_inboundLedgers; + return *m_inboundLedgers; } TransactionMaster& getMasterTransaction () @@ -914,7 +914,7 @@ private: NodeStoreScheduler m_nodeStoreScheduler; ScopedPointer m_nodeStore; ScopedPointer m_sntpClient; - InboundLedgers m_inboundLedgers; + beast::unique_ptr m_inboundLedgers; ScopedPointer m_txQueue; ScopedPointer m_validators; ScopedPointer mFeatures; diff --git a/src/ripple_app/misc/NetworkOPs.cpp b/src/ripple_app/misc/NetworkOPs.cpp index d73b8c5ace..578c13b294 100644 --- a/src/ripple_app/misc/NetworkOPs.cpp +++ b/src/ripple_app/misc/NetworkOPs.cpp @@ -1403,8 +1403,10 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer // Create a consensus object to get consensus on this ledger assert (!mConsensus); prevLedger->setImmutable (); - mConsensus = boost::make_shared ( - networkClosed, prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ()); + + mConsensus = LedgerConsensus::New( + networkClosed, prevLedger, + m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ()); m_journal.debug << "Initiating consensus engine"; return mConsensus->startup ();