diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index e345dc18ca..cf80364607 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -98,7 +98,7 @@ bool PeerSet::isActive() LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE_TIMEOUT), mHaveBase(false), mHaveState(false), mHaveTransactions(false), mAborted(false), mSignaled(false), mAccept(false), - mByHash(true) + mByHash(true), mWaitCount(0) { #ifdef LA_DEBUG cLog(lsTRACE) << "Acquiring ledger " << mHash; @@ -214,6 +214,18 @@ void LedgerAcquire::onTimer(bool progress) } } +void LedgerAcquire::awaitData() +{ + boost::recursive_mutex::scoped_lock sl(mLock); + ++mWaitCount; +} + +void LedgerAcquire::noAwaitData() +{ + boost::recursive_mutex::scoped_lock sl(mLock); + if (mWaitCount > 0 ) --mWaitCount; +} + void LedgerAcquire::addPeers() { std::vector peerList = theApp->getConnectionPool().getPeerVector(); @@ -300,6 +312,13 @@ void LedgerAcquire::trigger(Peer::ref peer) return; } + if ((mWaitCount > 0) && peer) + { + mRecentPeers.push_back(peer->getPeerId()); + cLog(lsTRACE) << "Deferring peer"; + return; + } + if (sLog(lsTRACE)) { if (peer) @@ -484,6 +503,8 @@ void LedgerAcquire::trigger(Peer::ref peer) } } + mRecentPeers.clear(); + if (mComplete || mFailed) { cLog(lsDEBUG) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ") @@ -846,21 +867,23 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash) mLedgers.erase(hash); } -void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptr packet_ptr, - boost::weak_ptr wPeer) +bool LedgerAcquireMaster::awaitLedgerData(const uint256& ledgerHash) +{ + LedgerAcquire::pointer ledger = find(ledgerHash); + if (!ledger) + return false; + ledger->awaitData(); + return true; +} + +void LedgerAcquireMaster::gotLedgerData(Job&, uint256 hash, + boost::shared_ptr packet_ptr, boost::weak_ptr wPeer) { ripple::TMLedgerData& packet = *packet_ptr; Peer::pointer peer = wPeer.lock(); if (!peer) return; - uint256 hash; - if (packet.ledgerhash().size() != 32) - { - peer->punishPeer(LT_InvalidRequest); - return; - } - memcpy(hash.begin(), packet.ledgerhash().data(), 32); cLog(lsTRACE) << "Got data (" << packet.nodes().size() << ") for acquiring ledger: " << hash; LedgerAcquire::pointer ledger = find(hash); @@ -870,6 +893,7 @@ void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptrpunishPeer(LT_InvalidRequest); return; } + ledger->noAwaitData(); if (packet.type() == ripple::liBASE) { diff --git a/src/cpp/ripple/LedgerAcquire.h b/src/cpp/ripple/LedgerAcquire.h index c255ef6b7e..4193ce96e1 100644 --- a/src/cpp/ripple/LedgerAcquire.h +++ b/src/cpp/ripple/LedgerAcquire.h @@ -87,11 +87,14 @@ public: protected: Ledger::pointer mLedger; bool mHaveBase, mHaveState, mHaveTransactions, mAborted, mSignaled, mAccept, mByHash; + int mWaitCount; std::set mRecentTXNodes; std::set mRecentASNodes; - std::vector< FUNCTION_TYPE > mOnComplete; + std::vector mRecentPeers; + + std::vector< FUNCTION_TYPE > mOnComplete; void done(); void onTimer(bool progress); @@ -124,6 +127,8 @@ public: void trigger(Peer::ref); bool tryLocal(); void addPeers(); + void awaitData(); + void noAwaitData(); typedef std::pair neededHash_t; std::vector getNeededHashes(); @@ -148,7 +153,9 @@ public: LedgerAcquire::pointer find(const uint256& hash); bool hasLedger(const uint256& ledgerHash); void dropLedger(const uint256& ledgerHash); - void gotLedgerData(Job&, boost::shared_ptr packet, boost::weak_ptr peer); + + bool awaitLedgerData(const uint256& ledgerHash); + void gotLedgerData(Job&, uint256 hash, boost::shared_ptr packet, boost::weak_ptr peer); int getFetchCount(int& timeoutCount); void logFailure(const uint256& h) { mRecentFailures.add(h); } diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 4a87a4dc4d..55bb4f4e09 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -1659,18 +1659,17 @@ void Peer::recvLedger(const boost::shared_ptr& packet_ptr) return; } + uint256 hash; + if(packet.ledgerhash().size() != 32) + { + cLog(lsWARNING) << "TX candidate reply with invalid hash size"; + punishPeer(LT_InvalidRequest); + return; + } + memcpy(hash.begin(), packet.ledgerhash().data(), 32); + if (packet.type() == ripple::liTS_CANDIDATE) { // got data for a candidate transaction set - uint256 hash; - if(packet.ledgerhash().size() != 32) - { - cLog(lsWARNING) << "TX candidate reply with invalid hash size"; - punishPeer(LT_InvalidRequest); - return; - } - memcpy(hash.begin(), packet.ledgerhash().data(), 32); - - std::list nodeIDs; std::list< std::vector > nodeData; @@ -1692,9 +1691,12 @@ void Peer::recvLedger(const boost::shared_ptr& packet_ptr) return; } - theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData", - BIND_TYPE(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(), - P_1, packet_ptr, boost::weak_ptr(shared_from_this()))); + if (theApp->getMasterLedgerAcquire().awaitLedgerData(hash)) + theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData", + BIND_TYPE(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(), + P_1, hash, packet_ptr, boost::weak_ptr(shared_from_this()))); + else + punishPeer(LT_UnwantedData); } bool Peer::hasLedger(const uint256& hash) const