From b2dbe8ef839be85cd9a9791c2d83d376f6926d3a Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 14 Dec 2013 20:16:54 -0800 Subject: [PATCH] Ledger acquire fixes/cleanups/logging * Inbound ledger and SHAMapAddNode cleanup * Log acquire stats * Fix progress logic * Remove ledgers we no longer need to acquire * Stash stale state data in our fetch pack, it can still be useful * Stash in fetch pack if acquire terminated while job was pending * Account for duplicate/invalid nodes in a few cases previously missed * Dispatch each InboundLedger once (not per data) * Trigger only the "best" peer * Don't call tryAdvance on failed acquires --- src/ripple_app/consensus/LedgerConsensus.cpp | 28 +- src/ripple_app/ledger/InboundLedger.cpp | 308 +++++++++++++++---- src/ripple_app/ledger/InboundLedger.h | 30 +- src/ripple_app/ledger/InboundLedgers.cpp | 218 +++++++------ src/ripple_app/ledger/InboundLedgers.h | 25 +- src/ripple_app/ledger/LedgerMaster.cpp | 2 +- src/ripple_app/misc/NetworkOPs.cpp | 7 +- src/ripple_app/misc/Validations.cpp | 2 +- src/ripple_app/peers/Peer.cpp | 9 +- src/ripple_app/peers/PeerSet.cpp | 10 +- src/ripple_app/peers/PeerSet.h | 10 +- src/ripple_app/shamap/SHAMapAddNode.h | 103 ++++--- src/ripple_app/shamap/SHAMapSync.cpp | 33 +- src/ripple_app/tx/TransactionAcquire.cpp | 24 +- 14 files changed, 515 insertions(+), 294 deletions(-) diff --git a/src/ripple_app/consensus/LedgerConsensus.cpp b/src/ripple_app/consensus/LedgerConsensus.cpp index 8519be294..436c558b9 100644 --- a/src/ripple_app/consensus/LedgerConsensus.cpp +++ b/src/ripple_app/consensus/LedgerConsensus.cpp @@ -499,8 +499,7 @@ public: return; // we need to switch the ledger we're working from - Ledger::pointer newLCL = - getApp().getLedgerMaster ().getLedgerByHash (lclHash); + Ledger::pointer newLCL = getApp().getLedgerMaster ().getLedgerByHash (lclHash); if (newLCL) { @@ -510,37 +509,32 @@ public: mPreviousLedger = newLCL; mPrevLedgerHash = lclHash; } - else if (!mAcquiringLedger || 
(mAcquiringLedger->getHash () - != mPrevLedgerHash)) + else if (!mAcquiringLedger || (mAcquiringLedger->getHash () != mPrevLedgerHash)) { // need to start acquiring the correct consensus LCL - WriteLog (lsWARNING, LedgerConsensus) - << "Need consensus ledger " << mPrevLedgerHash; + WriteLog (lsWARNING, LedgerConsensus) << "Need consensus ledger " << mPrevLedgerHash; if (mAcquiringLedger) - { - getApp().getInboundLedgers ().dropLedger - (mAcquiringLedger->getHash ()); - } + getApp().getInboundLedgers ().dropLedger (mAcquiringLedger->getHash ()); - mAcquiringLedger = getApp().getInboundLedgers ().findCreate - (mPrevLedgerHash, 0, true); + mAcquiringLedger = getApp().getInboundLedgers ().findCreateConsensusLedger (mPrevLedgerHash); mHaveCorrectLCL = false; return; } else return; - WriteLog (lsINFO, LedgerConsensus) - << "Have the consensus ledger " << mPrevLedgerHash; + WriteLog (lsINFO, LedgerConsensus) << "Have the consensus ledger " << mPrevLedgerHash; mHaveCorrectLCL = true; mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( - mPreviousLedger->getCloseResolution () - ,mPreviousLedger->getCloseAgree () - ,mPreviousLedger->getLedgerSeq () + 1); + mPreviousLedger->getCloseResolution (), mPreviousLedger->getCloseAgree (), + mPreviousLedger->getLedgerSeq () + 1); } + + + void timerEntry () { if ((mState != lcsFINISHED) && (mState != lcsACCEPTED)) diff --git a/src/ripple_app/ledger/InboundLedger.cpp b/src/ripple_app/ledger/InboundLedger.cpp index 98e29fb57..0254d4259 100644 --- a/src/ripple_app/ledger/InboundLedger.cpp +++ b/src/ripple_app/ledger/InboundLedger.cpp @@ -33,12 +33,10 @@ InboundLedger::InboundLedger (uint256 const& hash, uint32 seq) , mAborted (false) , mSignaled (false) , mByHash (true) - , mWaitCount (0) , mSeq (seq) + , mReceiveDispatched (false) { -#ifdef LA_DEBUG WriteLog (lsTRACE, InboundLedger) << "Acquiring ledger " << mHash; -#endif } bool InboundLedger::checkLocal () @@ -55,6 +53,12 @@ bool InboundLedger::checkLocal () 
InboundLedger::~InboundLedger () { + BOOST_FOREACH (PeerDataPairType& entry, mReceivedData) + { + if (entry.second->type () == protocol::liAS_NODE) + getApp().getInboundLedgers().gotStaleData(entry.second); + } + } void InboundLedger::init(ScopedLockType& collectionLock, bool couldBeNew) @@ -127,7 +131,6 @@ bool InboundLedger::tryLocal () if (mLedger->peekTransactionMap ()->fetchRoot (mLedger->getTransHash (), &filter)) { - WriteLog (lsTRACE, InboundLedger) << "Got root txn map locally"; std::vector h = mLedger->getNeededTransactionHashes (1, &filter); if (h.empty ()) @@ -152,7 +155,6 @@ bool InboundLedger::tryLocal () if (mLedger->peekAccountStateMap ()->fetchRoot (mLedger->getAccountHash (), &filter)) { - WriteLog (lsTRACE, InboundLedger) << "Got root AS map locally"; std::vector h = mLedger->getNeededAccountStateHashes (1, &filter); if (h.empty ()) @@ -175,17 +177,25 @@ bool InboundLedger::tryLocal () return mComplete; } +/** Called with a lock by the PeerSet when the timer expires +*/ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) { mRecentTXNodes.clear (); mRecentASNodes.clear (); + if (isDone()) + { + WriteLog (lsINFO, InboundLedger) << "Already done " << mHash; + return; + } + if (getTimeouts () > LEDGER_TIMEOUT_COUNT) { if (mSeq != 0) - WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mSeq; + WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mSeq; else - WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mHash; + WriteLog (lsWARNING, InboundLedger) << getTimeouts() << " timeouts for ledger " << mHash; setFailed (); done (); return; @@ -193,17 +203,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) if (!wasProgress) { - if (isDone()) - { - WriteLog (lsINFO, InboundLedger) << "Already done " << mHash; - return; - } checkLocal(); - if (isDone()) - { - WriteLog (lsINFO, InboundLedger) << "Completed fetch " << mHash; - return; - } 
mAggressive = true; mByHash = true; @@ -216,23 +216,6 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) } } -void InboundLedger::awaitData () -{ - ++mWaitCount; -} - -void InboundLedger::noAwaitData () -{ // subtract one if mWaitCount is greater than zero - do - { - int j = mWaitCount.get(); - if (j <= 0) - return; - if (mWaitCount.compareAndSetBool(j - 1, j)) - return; - } while (1); -} - void InboundLedger::addPeers () { std::vector peerList = getApp().getPeers ().getPeerVector (); @@ -264,8 +247,8 @@ void InboundLedger::addPeers () { if (peerHas (peerList[ (i + firstPeer) % vSize])) ++found; - } - if (mSeq != 0) + } + if (mSeq != 0) WriteLog (lsDEBUG, InboundLedger) << "Chose " << found << " peer(s) for ledger " << mSeq; else WriteLog (lsDEBUG, InboundLedger) << "Chose " << found << " peer(s) for ledger " << getHash ().GetHex(); @@ -287,8 +270,10 @@ static void LADispatch ( std::vector< FUNCTION_TYPE > trig) { if (la->isComplete() && !la->isFailed()) + { getApp().getLedgerMaster().checkAccept(la->getLedger()); - getApp().getLedgerMaster().tryAdvance(); + getApp().getLedgerMaster().tryAdvance(); + } for (unsigned int i = 0; i < trig.size (); ++i) trig[i] (la); } @@ -325,14 +310,14 @@ void InboundLedger::done () BIND_TYPE (LADispatch, P_1, shared_from_this (), triggers)); } -bool InboundLedger::addOnComplete (FUNCTION_TYPE trigger) +bool InboundLedger::addOnComplete (FUNCTION_TYPE triggerFunc) { ScopedLockType sl (mLock, __FILE__, __LINE__); if (isDone ()) return false; - mOnComplete.push_back (trigger); + mOnComplete.push_back (triggerFunc); return true; } @@ -347,12 +332,6 @@ void InboundLedger::trigger (Peer::ref peer) return; } - if ((mWaitCount.get() > 0) && peer) - { - WriteLog (lsTRACE, InboundLedger) << "Skipping peer"; - return; - } - if (ShouldLog (lsTRACE, InboundLedger)) { if (peer) @@ -550,7 +529,7 @@ void InboundLedger::trigger (Peer::ref peer) BOOST_FOREACH (SHAMapNode const& it, nodeIDs) { * (tmGL.add_nodeids ()) = it.getRawString 
(); - } + } WriteLog (lsTRACE, InboundLedger) << "Sending AS node " << nodeIDs.size () << " request to " << (peer ? "selected peer" : "all peers"); CondLog (nodeIDs.size () == 1, lsTRACE, InboundLedger) << "AS node: " << nodeIDs[0]; @@ -638,13 +617,15 @@ void InboundLedger::filterNodes (std::vector& nodeIDs, std::vectorgetTransHash ()) + if (mLedger->getTransHash ().isZero ()) mHaveTransactions = true; - if (!mLedger->getAccountHash ()) + if (mLedger->getAccountHash ().isZero ()) mHaveState = true; mLedger->setAcquiring (); return true; } +/** Process TX data received from a peer + Call with a lock +*/ bool InboundLedger::takeTxNode (const std::list& nodeIDs, const std::list< Blob >& data, SHAMapAddNode& san) { - ScopedLockType sl (mLock, __FILE__, __LINE__); if (!mHaveBase) + { + WriteLog (lsWARNING, InboundLedger) << "TX node without base"; + san.incInvalid(); return false; + } if (mHaveTransactions || mFailed) + { + san.incDuplicate(); return true; + } std::list::const_iterator nodeIDit = nodeIDs.begin (); std::list< Blob >::const_iterator nodeDatait = data.begin (); @@ -700,13 +690,15 @@ bool InboundLedger::takeTxNode (const std::list& nodeIDs, { if (nodeIDit->isRoot ()) { - if (!san.combine (mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), *nodeDatait, - snfWIRE, &tFilter))) + san += mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), *nodeDatait, + snfWIRE, &tFilter); + if (!san.isGood()) return false; } else { - if (!san.combine (mLedger->peekTransactionMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter))) + san += mLedger->peekTransactionMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter); + if (!san.isGood()) return false; } @@ -729,6 +721,9 @@ bool InboundLedger::takeTxNode (const std::list& nodeIDs, return true; } +/** Process AS data received from a peer + Call with a lock +*/ bool InboundLedger::takeAsNode (const std::list& nodeIDs, const std::list< Blob >& data, SHAMapAddNode& san) { @@ -740,11 +735,15 
@@ bool InboundLedger::takeAsNode (const std::list& nodeIDs, if (!mHaveBase) { WriteLog (lsWARNING, InboundLedger) << "Don't have ledger base"; + san.incInvalid(); return false; } if (mHaveState || mFailed) + { + san.incDuplicate(); return true; + } std::list::const_iterator nodeIDit = nodeIDs.begin (); std::list< Blob >::const_iterator nodeDatait = data.begin (); @@ -754,17 +753,22 @@ bool InboundLedger::takeAsNode (const std::list& nodeIDs, { if (nodeIDit->isRoot ()) { - if (!san.combine (mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (), - *nodeDatait, snfWIRE, &tFilter))) + san += mLedger->peekAccountStateMap () + ->addRootNode (mLedger->getAccountHash (), *nodeDatait, snfWIRE, &tFilter); + if (!san.isGood ()) { WriteLog (lsWARNING, InboundLedger) << "Bad ledger base"; return false; } } - else if (!san.combine (mLedger->peekAccountStateMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter))) + else { - WriteLog (lsWARNING, InboundLedger) << "Unable to add AS node"; - return false; + san += mLedger->peekAccountStateMap ()->addKnownNode (*nodeIDit, *nodeDatait, &tFilter); + if (!san.isGood ()) + { + WriteLog (lsWARNING, InboundLedger) << "Unable to add AS node"; + return false; + } } ++nodeIDit; @@ -786,34 +790,48 @@ bool InboundLedger::takeAsNode (const std::list& nodeIDs, return true; } +/** Process AS root node received from a peer + Call with a lock +*/ bool InboundLedger::takeAsRootNode (Blob const& data, SHAMapAddNode& san) { - ScopedLockType sl (mLock, __FILE__, __LINE__); - if (mFailed || mHaveState) + { + san.incDuplicate(); return true; + } if (!mHaveBase) + { + san.incInvalid(); return false; + } AccountStateSF tFilter (mLedger->getLedgerSeq ()); - return san.combine ( - mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (), data, snfWIRE, &tFilter)); + san += mLedger->peekAccountStateMap ()->addRootNode (mLedger->getAccountHash (), data, snfWIRE, &tFilter); + return san.isGood(); } +/** Process TX root node
received from a peer + Call with a lock +*/ bool InboundLedger::takeTxRootNode (Blob const& data, SHAMapAddNode& san) { - ScopedLockType sl (mLock, __FILE__, __LINE__); - if (mFailed || mHaveState) + { + san.incDuplicate(); return true; + } if (!mHaveBase) + { + san.incInvalid(); return false; + } TransactionStateSF tFilter (mLedger->getLedgerSeq ()); - return san.combine ( - mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), data, snfWIRE, &tFilter)); + san += mLedger->peekTransactionMap ()->addRootNode (mLedger->getTransHash (), data, snfWIRE, &tFilter); + return san.isGood(); } std::vector InboundLedger::getNeededHashes () @@ -849,6 +867,164 @@ std::vector InboundLedger::getNeededHashes () return ret; } +/** Stash a TMLedgerData received from a peer for later processing + Returns 'true' if we need to dispatch +*/ +bool InboundLedger::gotData (boost::weak_ptr peer, boost::shared_ptr data) +{ + ScopedLockType sl (mReceivedDataLock, __FILE__, __LINE__); + + mReceivedData.push_back (PeerDataPairType (peer, data)); + + if (mReceiveDispatched) + return false; + + mReceiveDispatched = true; + return true; +} + +/** Process one TMLedgerData + Returns the number of useful nodes +*/ +int InboundLedger::processData (boost::shared_ptr peer, protocol::TMLedgerData& packet) +{ + ScopedLockType sl (mLock, __FILE__, __LINE__); + + if (packet.type () == protocol::liBASE) + { + if (packet.nodes_size () < 1) + { + WriteLog (lsWARNING, InboundLedger) << "Got empty base data"; + peer->charge (Resource::feeInvalidRequest); + return -1; + } + + if (!mHaveBase && !takeBase (packet.nodes (0).nodedata ())) + { + WriteLog (lsWARNING, InboundLedger) << "Got invalid base data"; + peer->charge (Resource::feeInvalidRequest); + return -1; + } + + SHAMapAddNode san; + + if (!mHaveState && (packet.nodes ().size () > 1) && + !takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san)) + { + WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid"; + } + + if 
(!mHaveTransactions && (packet.nodes ().size () > 2) && + !takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san)) + { + WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid"; + } + + if (!san.isInvalid ()) + progress (); + else + WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data"; + + return san.getGood (); + } + + if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE)) + { + std::list nodeIDs; + std::list< Blob > nodeData; + + if (packet.nodes ().size () == 0) + { + WriteLog (lsINFO, InboundLedger) << "Got response with no nodes"; + peer->charge (Resource::feeInvalidRequest); + return -1; + } + + for (int i = 0; i < packet.nodes ().size (); ++i) + { + const protocol::TMLedgerNode& node = packet.nodes (i); + + if (!node.has_nodeid () || !node.has_nodedata ()) + { + WriteLog (lsWARNING, InboundLedger) << "Got bad node"; + peer->charge (Resource::feeInvalidRequest); + return -1; + } + + nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ())); + nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); + } + + SHAMapAddNode ret; + + if (packet.type () == protocol::liTX_NODE) + { + takeTxNode (nodeIDs, nodeData, ret); + WriteLog (lsDEBUG, InboundLedger) << "Ledger TX node stats: " << ret.get(); + } + else + { + takeAsNode (nodeIDs, nodeData, ret); + WriteLog (lsDEBUG, InboundLedger) << "Ledger AS node stats: " << ret.get(); + } + + if (!ret.isInvalid ()) + progress (); + else + WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data"; + + return ret.getGood (); + } + + return -1; +} + +/** Process pending TMLedgerData + Query the 'best' peer +*/ +void InboundLedger::runData () +{ + boost::shared_ptr chosenPeer; + int chosenPeerCount = -1; + + std::vector data; + do + { + + data.clear(); + { + ScopedLockType sl (mReceivedDataLock, __FILE__, __LINE__); + + if (mReceivedData.empty ()) + { + mReceiveDispatched = false; + break; + } + 
data.swap(mReceivedData); + } + + BOOST_FOREACH (PeerDataPairType& entry, data) + { + Peer::pointer peer = entry.first.lock(); + if (peer) + { + int count = processData (peer, *(entry.second)); + if (count > chosenPeerCount) + { + chosenPeer = peer; + chosenPeerCount = count; + } + } + } + + } while (1); + + if (chosenPeer) + trigger (chosenPeer); + +} + + Json::Value InboundLedger::getJson (int) { Json::Value ret (Json::objectValue); diff --git a/src/ripple_app/ledger/InboundLedger.h b/src/ripple_app/ledger/InboundLedger.h index f7a0ced45..7605b7bd8 100644 --- a/src/ripple_app/ledger/InboundLedger.h +++ b/src/ripple_app/ledger/InboundLedger.h @@ -31,6 +31,7 @@ public: static char const* getCountedObjectName () { return "InboundLedger"; } typedef boost::shared_ptr pointer; + typedef std::pair < boost::weak_ptr, boost::shared_ptr > PeerDataPairType; public: InboundLedger (uint256 const& hash, uint32 seq); @@ -69,21 +70,14 @@ public: // VFALCO TODO Make this the Listener / Observer pattern bool addOnComplete (FUNCTION_TYPE); - bool takeBase (const std::string& data); - bool takeTxNode (const std::list& IDs, const std::list& data, - SHAMapAddNode&); - bool takeTxRootNode (Blob const& data, SHAMapAddNode&); - bool takeAsNode (const std::list& IDs, const std::list& data, - SHAMapAddNode&); - bool takeAsRootNode (Blob const& data, SHAMapAddNode&); void trigger (Peer::ref); bool tryLocal (); void addPeers (); - void awaitData (); - void noAwaitData (); bool checkLocal (); void init(ScopedLockType& collectionLock, bool couldBeNew); + bool gotData (boost::weak_ptr, boost::shared_ptr); + typedef std::pair neededHash_t; std::vector getNeededHashes (); @@ -92,6 +86,7 @@ public: std::set& recentNodes, int max, bool aggressive); Json::Value getJson (int); + void runData (); private: void done (); @@ -105,6 +100,16 @@ private: boost::weak_ptr pmDowncast (); + int processData (boost::shared_ptr peer, protocol::TMLedgerData& data); + + bool takeBase (const std::string& data); + 
bool takeTxNode (const std::list& IDs, const std::list& data, + SHAMapAddNode&); + bool takeTxRootNode (Blob const& data, SHAMapAddNode&); + bool takeAsNode (const std::list& IDs, const std::list& data, + SHAMapAddNode&); + bool takeAsRootNode (Blob const& data, SHAMapAddNode&); + private: Ledger::pointer mLedger; bool mHaveBase; @@ -113,12 +118,17 @@ private: bool mAborted; bool mSignaled; bool mByHash; - beast::Atomic mWaitCount; uint32 mSeq; std::set mRecentTXNodes; std::set mRecentASNodes; + + // Data we have received from peers + PeerSet::LockType mReceivedDataLock; + std::vector mReceivedData; + bool mReceiveDispatched; + std::vector > mOnComplete; }; diff --git a/src/ripple_app/ledger/InboundLedgers.cpp b/src/ripple_app/ledger/InboundLedgers.cpp index d676e6492..8c20362d7 100644 --- a/src/ripple_app/ledger/InboundLedgers.cpp +++ b/src/ripple_app/ledger/InboundLedgers.cpp @@ -38,8 +38,7 @@ public: // VFALCO TODO Should this be called findOrAdd ? // - InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq, - bool couldBeNew) + InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq, bool couldBeNew) { assert (hash.isNonZero ()); InboundLedger::pointer ret; @@ -68,6 +67,51 @@ public: return ret; } + InboundLedger::pointer findCreateConsensusLedger (uint256 const& hash) + { + // We do not want to destroy the ledger while holding the collection lock + InboundLedger::pointer oldLedger; + + { + // If there was an old consensus inbound ledger, remove it + ScopedLockType sl (mLock, __FILE__, __LINE__); + if (mConsensusLedger.isNonZero() && (mValidationLedger != mConsensusLedger) && (hash != mConsensusLedger)) + { + boost::unordered_map::iterator it = mLedgers.find (mConsensusLedger); + if (it != mLedgers.end ()) + { + oldLedger = it->second; + mLedgers.erase (it); + } + } + mConsensusLedger = hash; + } + + return findCreate (hash, 0, true); + } + + InboundLedger::pointer findCreateValidationLedger (uint256 const& hash) + { + 
InboundLedger::pointer oldLedger; + + { + // If there was an old validation inbound ledger, remove it + ScopedLockType sl (mLock, __FILE__, __LINE__); + if (mValidationLedger.isNonZero() && (mValidationLedger != mConsensusLedger) && (hash != mValidationLedger)) + { + boost::unordered_map::iterator it = mLedgers.find (mValidationLedger); + if (it != mLedgers.end ()) + { + oldLedger = it->second; + mLedgers.erase (it); + } + } + mValidationLedger = hash; + } + + return findCreate (hash, 0, true); + } + InboundLedger::pointer find (uint256 const& hash) { assert (hash.isNonZero ()); @@ -104,16 +148,6 @@ public: } - bool awaitLedgerData (LedgerHash const& ledgerHash) - { - InboundLedger::pointer ledger = find (ledgerHash); - - if (!ledger) - return false; - - ledger->awaitData (); - return true; - } /* This gets called when "We got some data from an inbound ledger" @@ -127,14 +161,13 @@ public: // VFALCO TODO Why is hash passed by value? // VFALCO TODO Remove the dependency on the Peer object. - // - void gotLedgerData (Job& job, - LedgerHash hash, - boost::shared_ptr packet_ptr, - boost::weak_ptr wPeer) + /** We received a TMLedgerData from a peer. 
+ */ + bool gotLedgerData (LedgerHash const& hash, + boost::shared_ptr peer, + boost::shared_ptr packet_ptr) { protocol::TMLedgerData& packet = *packet_ptr; - Peer::pointer peer = wPeer.lock (); WriteLog (lsTRACE, InboundLedger) << "Got data (" << packet.nodes ().size () << ") for acquiring ledger: " << hash; @@ -142,107 +175,24 @@ public: if (!ledger) { - WriteLog (lsTRACE, InboundLedger) << "Got data for ledger we're not acquiring"; + WriteLog (lsTRACE, InboundLedger) << "Got data for ledger we're no longer acquiring"; - if (peer) + // If it's state node data, stash it because it still might be useful + if (packet.type () == protocol::liAS_NODE) { - peer->charge (Resource::feeInvalidRequest); + getApp().getJobQueue().addJob(jtLEDGER_DATA, "gotStaleData", + BIND_TYPE(&InboundLedgers::gotStaleData, this, packet_ptr)); } - return; + return false; } - ledger->noAwaitData (); + // Stash the data for later processing and see if we need to dispatch + if (ledger->gotData(boost::weak_ptr(peer), packet_ptr)) + getApp().getJobQueue().addJob (jtLEDGER_DATA, "processLedgerData", + BIND_TYPE (&InboundLedgers::doLedgerData, this, P_1, hash)); - if (!peer) - return; - - if (packet.type () == protocol::liBASE) - { - if (packet.nodes_size () < 1) - { - WriteLog (lsWARNING, InboundLedger) << "Got empty base data"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - if (!ledger->takeBase (packet.nodes (0).nodedata ())) - { - WriteLog (lsWARNING, InboundLedger) << "Got invalid base data"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - SHAMapAddNode san = SHAMapAddNode::useful (); - - if ((packet.nodes ().size () > 1) && !ledger->takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san)) - { - WriteLog (lsWARNING, InboundLedger) << "Included ASbase invalid"; - } - - if ((packet.nodes ().size () > 2) && !ledger->takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san)) - { - WriteLog (lsWARNING, InboundLedger) << "Included TXbase invalid"; - } - - if 
(!san.isInvalid ()) - { - ledger->progress (); - ledger->trigger (peer); - } - else - WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid base data"; - - return; - } - - if ((packet.type () == protocol::liTX_NODE) || (packet.type () == protocol::liAS_NODE)) - { - std::list nodeIDs; - std::list< Blob > nodeData; - - if (packet.nodes ().size () <= 0) - { - WriteLog (lsINFO, InboundLedger) << "Got response with no nodes"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - for (int i = 0; i < packet.nodes ().size (); ++i) - { - const protocol::TMLedgerNode& node = packet.nodes (i); - - if (!node.has_nodeid () || !node.has_nodedata ()) - { - WriteLog (lsWARNING, InboundLedger) << "Got bad node"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - nodeIDs.push_back (SHAMapNode (node.nodeid ().data (), node.nodeid ().size ())); - nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); - } - - SHAMapAddNode ret; - - if (packet.type () == protocol::liTX_NODE) - ledger->takeTxNode (nodeIDs, nodeData, ret); - else - ledger->takeAsNode (nodeIDs, nodeData, ret); - - if (!ret.isInvalid ()) - { - ledger->progress (); - ledger->trigger (peer); - } - else - WriteLog (lsDEBUG, InboundLedger) << "Peer sends invalid node data"; - - return; - } - - WriteLog (lsWARNING, InboundLedger) << "Not sure what ledger data we got"; - peer->charge (Resource::feeInvalidRequest); + return true; } int getFetchCount (int& timeoutCount) @@ -283,6 +233,47 @@ public: return mRecentFailures.isPresent (h, false); } + void doLedgerData (Job&, LedgerHash hash) + { + InboundLedger::pointer ledger = find (hash); + + if (ledger) + ledger->runData (); + } + + /** We got some data for a ledger we are no longer acquiring + Since we paid the price to receive it, we might as well stash it in case we need it. 
+ Nodes are received in wire format and must be stashed/hashed in prefix format + */ + void gotStaleData (boost::shared_ptr packet_ptr) + { + const uint256 uZero; + try + { + for (int i = 0; i < packet_ptr->nodes ().size (); ++i) + { + const protocol::TMLedgerNode& node = packet_ptr->nodes (i); + + if (!node.has_nodeid () || !node.has_nodedata ()) + return; + + Serializer s; + SHAMapTreeNode newNode( + SHAMapNode (node.nodeid().data(), node.nodeid().size()), + Blob (node.nodedata().begin(), node.nodedata().end()), + 0, snfWIRE, uZero, false); + newNode.addRaw(s, snfPREFIX); + + boost::shared_ptr blob = boost::make_shared (s.begin(), s.end()); + + getApp().getOPs().addFetchPack (newNode.getNodeHash(), blob); + } + } + catch (...) + { + } + } + void clearFailures () { ScopedLockType sl (mLock, __FILE__, __LINE__); @@ -399,6 +390,9 @@ private: MapType mLedgers; KeyCache mRecentFailures; + + uint256 mConsensusLedger; + uint256 mValidationLedger; }; //------------------------------------------------------------------------------ diff --git a/src/ripple_app/ledger/InboundLedgers.h b/src/ripple_app/ledger/InboundLedgers.h index d48e1aa68..82475405d 100644 --- a/src/ripple_app/ledger/InboundLedgers.h +++ b/src/ripple_app/ledger/InboundLedgers.h @@ -34,24 +34,30 @@ public: // VFALCO TODO Should this be called findOrAdd ? 
// virtual InboundLedger::pointer findCreate (uint256 const& hash, - uint32 seq, - bool bCouldBeNew) = 0; + uint32 seq, bool bCouldBeNew) = 0; - virtual InboundLedger::pointer find (uint256 const& hash) = 0; + virtual InboundLedger::pointer find (LedgerHash const& hash) = 0; virtual bool hasLedger (LedgerHash const& ledgerHash) = 0; - virtual void dropLedger (LedgerHash const& ledgerHash) = 0; + virtual InboundLedger::pointer findCreateConsensusLedger ( + LedgerHash const& hash) = 0; + virtual InboundLedger::pointer findCreateValidationLedger ( + LedgerHash const& hash) = 0; - virtual bool awaitLedgerData (LedgerHash const& ledgerHash) = 0; + virtual void dropLedger (LedgerHash const& ledgerHash) = 0; // VFALCO TODO Why is hash passed by value? // VFALCO TODO Remove the dependency on the Peer object. // - virtual void gotLedgerData (Job& job, - LedgerHash hash, - boost::shared_ptr packet, - boost::weak_ptr peer) = 0; + virtual bool gotLedgerData (LedgerHash const& ledgerHash, + boost::shared_ptr, + boost::shared_ptr ) = 0; + + virtual void doLedgerData (Job&, LedgerHash hash) = 0; + + virtual void gotStaleData ( + boost::shared_ptr packet) = 0; virtual int getFetchCount (int& timeoutCount) = 0; @@ -66,6 +72,7 @@ public: virtual void gotFetchPack (Job&) = 0; virtual void sweep () = 0; + virtual void onStop() = 0; }; #endif diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index 2275775a9..4b1adfc4e 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -628,7 +628,7 @@ public: if (!ledger) { - InboundLedger::pointer l = getApp().getInboundLedgers().findCreate(hash, 0, false); + InboundLedger::pointer l = getApp().getInboundLedgers().findCreateValidationLedger(hash); if (l && l->isComplete() && !l->isFailed()) ledger = l->getLedger(); else diff --git a/src/ripple_app/misc/NetworkOPs.cpp b/src/ripple_app/misc/NetworkOPs.cpp index 578c13b29..6d09b5952 100644 --- 
a/src/ripple_app/misc/NetworkOPs.cpp +++ b/src/ripple_app/misc/NetworkOPs.cpp @@ -50,7 +50,7 @@ public: , mLastCloseConvergeTime (1000 * LEDGER_IDLE_INTERVAL) , mLastCloseTime (0) , mLastValidationTime (0) - , mFetchPack ("FetchPack", 2048, 20) + , mFetchPack ("FetchPack", 65536, 45) , mFetchSeq (0) , mLastLoadBase (256) , mLastLoadFactor (256) @@ -3066,6 +3066,11 @@ int NetworkOPsImp::getFetchSize () void NetworkOPsImp::gotFetchPack (bool progress, uint32 seq) { + + // FIXME: Calling this function more than once will result in + // InboundLedgers::gotFetchPack being called more than once + // which is expensive. A flag should track whether we've already dispatched + getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotFetchPack", BIND_TYPE (&InboundLedgers::gotFetchPack, &getApp().getInboundLedgers (), P_1)); } diff --git a/src/ripple_app/misc/Validations.cpp b/src/ripple_app/misc/Validations.cpp index 76684d25a..d711e2e74 100644 --- a/src/ripple_app/misc/Validations.cpp +++ b/src/ripple_app/misc/Validations.cpp @@ -122,7 +122,7 @@ private: WriteLog (lsDEBUG, Validations) << "Val for " << hash << " from " << signer.humanNodePublic () << " added " << (val->isTrusted () ? "trusted/" : "UNtrusted/") << (isCurrent ? 
"current" : "stale"); - if (val->isTrusted ()) + if (val->isTrusted () && isCurrent) getApp().getLedgerMaster ().checkAccept (hash); // FIXME: This never forwards untrusted validations diff --git a/src/ripple_app/peers/Peer.cpp b/src/ripple_app/peers/Peer.cpp index be0551a6a..b085bb41b 100644 --- a/src/ripple_app/peers/Peer.cpp +++ b/src/ripple_app/peers/Peer.cpp @@ -2428,14 +2428,9 @@ void PeerImp::recvLedger (const boost::shared_ptr& packe return; } - if (getApp().getInboundLedgers ().awaitLedgerData (hash)) - { - getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotLedgerData", - BIND_TYPE (&InboundLedgers::gotLedgerData, &getApp().getInboundLedgers (), - P_1, hash, packet_ptr, boost::weak_ptr (shared_from_this ()))); - } - else + if (!getApp().getInboundLedgers ().gotLedgerData (hash, shared_from_this(), packet_ptr)) { + WriteLog (lsINFO, Peer) << "Got data for unwanted ledger"; charge (Resource::feeUnwantedData); } } diff --git a/src/ripple_app/peers/PeerSet.cpp b/src/ripple_app/peers/PeerSet.cpp index b9cdb2a04..a1e3a79fa 100644 --- a/src/ripple_app/peers/PeerSet.cpp +++ b/src/ripple_app/peers/PeerSet.cpp @@ -28,9 +28,10 @@ PeerSet::PeerSet (uint256 const& hash, int interval, bool txnData) , mFailed (false) , mAggressive (false) , mTxnData (txnData) + , mProgress (false) , mTimer (getApp().getIOService ()) { - mLastAction = mLastProgress = UptimeTimer::getInstance ().getElapsedSeconds (); + mLastAction = UptimeTimer::getInstance ().getElapsedSeconds (); assert ((mTimerInterval > 10) && (mTimerInterval < 30000)); } @@ -71,7 +72,10 @@ void PeerSet::invokeOnTimer () onTimer (false, sl); } else + { + clearProgress (); onTimer (true, sl); + } if (!isDone ()) setTimer (); @@ -88,7 +92,7 @@ void PeerSet::TimerEntry (boost::weak_ptr wptr, const boost::system::er { if (ptr->mTxnData) { - getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntry", + getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntryTxn", BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr)); } else @@ -101,7 
+105,7 @@ void PeerSet::TimerEntry (boost::weak_ptr wptr, const boost::system::er ptr->setTimer (); } else - getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntry", + getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntryLgr", BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr)); } } diff --git a/src/ripple_app/peers/PeerSet.h b/src/ripple_app/peers/PeerSet.h index a1b0a36ac..251913da5 100644 --- a/src/ripple_app/peers/PeerSet.h +++ b/src/ripple_app/peers/PeerSet.h @@ -47,12 +47,16 @@ public: bool isActive (); void progress () { - mLastProgress = UptimeTimer::getInstance().getElapsedSeconds(); + mProgress = true; mAggressive = false; } + void clearProgress () + { + mProgress = false; + } bool isProgress () { - return (mLastProgress + (mTimerInterval / 1000)) > UptimeTimer::getInstance().getElapsedSeconds(); + return mProgress; } void touch () { @@ -114,7 +118,7 @@ protected: bool mAggressive; bool mTxnData; int mLastAction; - int mLastProgress; + bool mProgress; // VFALCO TODO move the responsibility for the timer to a higher level diff --git a/src/ripple_app/shamap/SHAMapAddNode.h b/src/ripple_app/shamap/SHAMapAddNode.h index 3761ef408..6e0c7098d 100644 --- a/src/ripple_app/shamap/SHAMapAddNode.h +++ b/src/ripple_app/shamap/SHAMapAddNode.h @@ -25,77 +25,110 @@ class SHAMapAddNode { public: SHAMapAddNode () - : mInvalid (false) - , mUseful (false) + : mGood (0) + , mBad (0) + , mDuplicate (0) { } - void setInvalid () + SHAMapAddNode (int good, int bad, int duplicate) + : mGood(good) + , mBad(bad) + , mDuplicate(duplicate) { - mInvalid = true; } - void setUseful () + + void incInvalid () { - mUseful = true; + ++mBad; } + void incUseful () + { + ++mGood; + } + void incDuplicate () + { + ++mDuplicate; + } + void reset () { - mInvalid = false; - mUseful = false; + mGood = mBad = mDuplicate = 0; + } + + int getGood () + { + return mGood; } bool isInvalid () const { - return mInvalid; + return mBad > 0; } bool isUseful () const { - return mUseful; + return mGood > 0; 
} - bool combine (SHAMapAddNode const& n) + SHAMapAddNode& operator+= (SHAMapAddNode const& n) { - // VFALCO NOTE What is the meaning of these lines? + mGood += n.mGood; + mBad += n.mBad; + mDuplicate += n.mDuplicate; - if (n.mInvalid) - { - mInvalid = true; - return false; - } - - if (n.mUseful) - mUseful = true; - - return true; + return *this; } - operator bool () const + bool isGood () const { - return !mInvalid; + return (mGood + mDuplicate) > mBad; } - static SHAMapAddNode okay () + static SHAMapAddNode duplicate () { - return SHAMapAddNode (false, false); + return SHAMapAddNode (0, 0, 1); } static SHAMapAddNode useful () { - return SHAMapAddNode (false, true); + return SHAMapAddNode (1, 0, 0); } static SHAMapAddNode invalid () { - return SHAMapAddNode (true, false); + return SHAMapAddNode (0, 1, 0); + } + + std::string get () + { + std::string ret; + if (mGood > 0) + { + ret.append("good:"); + ret.append(lexicalCastThrow(mGood)); + } + if (mBad > 0) + { + if (!ret.empty()) + ret.append(" "); + ret.append("bad:"); + ret.append(lexicalCastThrow(mBad)); + } + if (mDuplicate > 0) + { + if (!ret.empty()) + ret.append(" "); + ret.append("dupe:"); + ret.append(lexicalCastThrow(mDuplicate)); + } + if (ret.empty ()) + ret = "no nodes processed"; + return ret; } private: - SHAMapAddNode (bool i, bool u) - : mInvalid (i) - , mUseful (u) - { - } - bool mInvalid; - bool mUseful; + int mGood; + int mBad; + int mDuplicate; }; #endif diff --git a/src/ripple_app/shamap/SHAMapSync.cpp b/src/ripple_app/shamap/SHAMapSync.cpp index c524bab9c..db64ff6cb 100644 --- a/src/ripple_app/shamap/SHAMapSync.cpp +++ b/src/ripple_app/shamap/SHAMapSync.cpp @@ -101,6 +101,7 @@ void SHAMap::getMissingNodes (std::vector& nodeIDs, std::vectorisValid ()); + assert (root->getNodeHash().isNonZero ()); if (root->isFullBelow ()) { @@ -260,7 +261,7 @@ SHAMapAddNode SHAMap::addRootNode (Blob const& rootNode, SHANodeFormat format, if (root->getNodeHash ().isNonZero ()) { WriteLog (lsTRACE, SHAMap) << 
"got root node, already have one"; - return SHAMapAddNode::okay (); + return SHAMapAddNode::duplicate (); } assert (mSeq >= 1); @@ -277,12 +278,10 @@ SHAMapAddNode SHAMap::addRootNode (Blob const& rootNode, SHANodeFormat format, root = node; mTNByID[*root] = root; - if (root->getNodeHash ().isZero ()) - { - root->setFullBelow (); + if (root->isLeaf()) clearSynching (); - } - else if (filter) + + if (filter) { Serializer s; root->addRaw (s, snfPREFIX); @@ -302,7 +301,7 @@ SHAMapAddNode SHAMap::addRootNode (uint256 const& hash, Blob const& rootNode, SH { WriteLog (lsTRACE, SHAMap) << "got root node, already have one"; assert (root->getNodeHash () == hash); - return SHAMapAddNode::okay (); + return SHAMapAddNode::duplicate (); } assert (mSeq >= 1); @@ -315,12 +314,10 @@ SHAMapAddNode SHAMap::addRootNode (uint256 const& hash, Blob const& rootNode, SH root = node; mTNByID[*root] = root; - if (root->getNodeHash ().isZero ()) - { - root->setFullBelow (); + if (root->isLeaf()) clearSynching (); - } - else if (filter) + + if (filter) { Serializer s; root->addRaw (s, snfPREFIX); @@ -338,13 +335,13 @@ SHAMapAddNode SHAMap::addKnownNode (const SHAMapNode& node, Blob const& rawNode, if (!isSynching ()) { WriteLog (lsTRACE, SHAMap) << "AddKnownNode while not synching"; - return SHAMapAddNode::okay (); + return SHAMapAddNode::duplicate (); } ScopedLockType sl (mLock, __FILE__, __LINE__); if (checkCacheNode (node)) // Do we already have this node? - return SHAMapAddNode::okay (); + return SHAMapAddNode::duplicate (); SHAMapTreeNode::pointer parent = checkCacheNode(node.getParentNodeID()); SHAMapTreeNode* iNode = parent ? 
parent.get() : root.get (); @@ -361,7 +358,7 @@ SHAMapAddNode SHAMap::addKnownNode (const SHAMapNode& node, Blob const& rawNode, } if (fullBelowCache.isPresent (iNode->getChildHash (branch))) - return SHAMapAddNode::okay (); + return SHAMapAddNode::duplicate (); SHAMapTreeNode *nextNode = getNodePointerNT (iNode->getChildNodeID (branch), iNode->getChildHash (branch), filter); if (!nextNode) @@ -400,7 +397,7 @@ SHAMapAddNode SHAMap::addKnownNode (const SHAMapNode& node, Blob const& rawNode, } WriteLog (lsTRACE, SHAMap) << "got node, already had it (late)"; - return SHAMapAddNode::okay (); + return SHAMapAddNode::duplicate (); } bool SHAMap::deepCompare (SHAMap& other) @@ -737,7 +734,7 @@ public: unexpected (gotNodes.size () < 1, "NodeSize"); - unexpected (!destination.addRootNode (*gotNodes.begin (), snfWIRE, NULL), "AddRootNode"); + unexpected (!destination.addRootNode (*gotNodes.begin (), snfWIRE, NULL).isGood(), "AddRootNode"); nodeIDs.clear (); gotNodes.clear (); @@ -791,7 +788,7 @@ public: bytes += rawNodeIterator->size (); #endif - if (!destination.addKnownNode (*nodeIDIterator, *rawNodeIterator, NULL)) + if (!destination.addKnownNode (*nodeIDIterator, *rawNodeIterator, NULL).isGood ()) { WriteLog (lsTRACE, SHAMap) << "AddKnownNode fails"; fail ("AddKnownNode"); diff --git a/src/ripple_app/tx/TransactionAcquire.cpp b/src/ripple_app/tx/TransactionAcquire.cpp index b5075df51..caf9e0c35 100644 --- a/src/ripple_app/tx/TransactionAcquire.cpp +++ b/src/ripple_app/tx/TransactionAcquire.cpp @@ -122,9 +122,14 @@ boost::weak_ptr TransactionAcquire::pmDowncast () void TransactionAcquire::trigger (Peer::ref peer) { - if (mComplete || mFailed) + if (mComplete) { - WriteLog (lsINFO, TransactionAcquire) << "complete or failed"; + WriteLog (lsINFO, TransactionAcquire) << "trigger after complete"; + return; + } + if (mFailed) + { + WriteLog (lsINFO, TransactionAcquire) << "trigger after fail"; return; } @@ -167,7 +172,9 @@ void TransactionAcquire::trigger (Peer::ref peer) 
tmGL.set_querytype (protocol::qtINDIRECT); BOOST_FOREACH (SHAMapNode & it, nodeIDs) - * (tmGL.add_nodeids ()) = it.getRawString (); + { + * (tmGL.add_nodeids ()) = it.getRawString (); + } sendRequest (tmGL, peer); } } @@ -201,20 +208,15 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list& nodeID if (nodeIDit->isRoot ()) { if (mHaveRoot) - { - WriteLog (lsWARNING, TransactionAcquire) << "Got root TXS node, already have it"; - return SHAMapAddNode (); - } - - if (!mMap->addRootNode (getHash (), *nodeDatait, snfWIRE, NULL)) + WriteLog (lsDEBUG, TransactionAcquire) << "Got root TXS node, already have it"; + else if (!mMap->addRootNode (getHash (), *nodeDatait, snfWIRE, NULL).isGood()) { WriteLog (lsWARNING, TransactionAcquire) << "TX acquire got bad root node"; - return SHAMapAddNode::invalid (); } else mHaveRoot = true; } - else if (!mMap->addKnownNode (*nodeIDit, *nodeDatait, &sf)) + else if (!mMap->addKnownNode (*nodeIDit, *nodeDatait, &sf).isGood()) { WriteLog (lsWARNING, TransactionAcquire) << "TX acquire got bad non-root node"; return SHAMapAddNode::invalid ();