diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index 50497b3cda..3a94def523 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -26,42 +26,15 @@ PeerSet::PeerSet(const uint256& hash, int interval) : mHash(hash), mTimerInterva void PeerSet::peerHas(Peer::ref ptr) { boost::recursive_mutex::scoped_lock sl(mLock); - std::vector< boost::weak_ptr >::iterator it = mPeers.begin(); - while (it != mPeers.end()) - { - Peer::pointer pr = it->lock(); - if (!pr) // we have a dead entry, remove it - it = mPeers.erase(it); - else - { - if (pr->samePeer(ptr)) - return; // we already have this peer - ++it; - } - } - mPeers.push_back(ptr); + if (!mPeers.insert(std::make_pair(ptr->getPeerId(), 0)).second) + return; newPeer(ptr); } void PeerSet::badPeer(Peer::ref ptr) { boost::recursive_mutex::scoped_lock sl(mLock); - std::vector< boost::weak_ptr >::iterator it = mPeers.begin(); - while (it != mPeers.end()) - { - Peer::pointer pr = it->lock(); - if (!pr) // we have a dead entry, remove it - it = mPeers.erase(it); - else - { - if (ptr->samePeer(pr)) - { // We found a pointer to the bad peer - mPeers.erase(it); - return; - } - ++it; - } - } + mPeers.erase(ptr->getPeerId()); } void PeerSet::resetTimer() @@ -76,10 +49,13 @@ void PeerSet::invokeOnTimer() { ++mTimeouts; cLog(lsWARNING) << "Timeout(" << mTimeouts << ") pc=" << mPeers.size() << " acquiring " << mHash; + onTimer(false); } else + { mProgress = false; - onTimer(); + onTimer(true); + } } void PeerSet::TimerEntry(boost::weak_ptr wptr, const boost::system::error_code& result) @@ -138,18 +114,19 @@ bool LedgerAcquire::tryLocal() return mHaveTransactions && mHaveState; } -void LedgerAcquire::onTimer() +void LedgerAcquire::onTimer(bool progress) { if (getTimeouts() > 6) { setFailed(); done(); } - else + else if (!progress) { if (!getPeerCount()) addPeers(); - trigger(Peer::pointer(), true); + else + trigger(Peer::pointer(), true); } } @@ -225,13 +202,13 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer) if (sLog(lsTRACE)) { if (peer) - cLog(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP(); + cLog(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP(); else - cLog(lsTRACE) << "Trigger acquiring ledger " << mHash; + cLog(lsTRACE) << "Trigger acquiring ledger " << mHash; if (mComplete || mFailed) - cLog(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed; + cLog(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed; else - cLog(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState; + cLog(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState; } if (!mHaveBase) @@ -281,7 +258,7 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer) BOOST_FOREACH(SHAMapNode& it, nodeIDs) *(tmGL.add_nodeids()) = it.getRawString(); cLog(lsTRACE) << "Sending TX node " << nodeIDs.size() - << "request to " << (peer ? "selected peer" : "all peers"); + << " request to " << (peer ? "selected peer" : "all peers"); sendRequest(tmGL, peer); } } @@ -325,7 +302,8 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer) BOOST_FOREACH(SHAMapNode& it, nodeIDs) *(tmGL.add_nodeids()) = it.getRawString(); cLog(lsTRACE) << "Sending AS node " << nodeIDs.size() - << "request to " << (peer ? "selected peer" : "all peers"); + << " request to " << (peer ? "selected peer" : "all peers"); + tLog(nodeIDs.size() == 1, lsTRACE) << "AS node: " << nodeIDs[0]; sendRequest(tmGL, peer); } } @@ -355,42 +333,32 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL) return; PackedMessage::pointer packet = boost::make_shared(tmGL, ripple::mtGET_LEDGER); - - std::vector< boost::weak_ptr >::iterator it = mPeers.begin(); - while (it != mPeers.end()) + for (boost::unordered_map::iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it) { - if (it->expired()) - it = mPeers.erase(it); - else - { - // FIXME: Track last peer sent to and time sent - Peer::pointer peer = it->lock(); - if (peer) - peer->sendPacket(packet); - return; - } + Peer::pointer peer = theApp->getConnectionPool().getPeerById(it->first); + if (peer) + peer->sendPacket(packet); } } -int PeerSet::takePeerSetFrom(const PeerSet& s) +int PeerSet::takePeerSetFrom(const PeerSet& s) { int ret = 0; mPeers.clear(); - mPeers.reserve(s.mPeers.size()); - BOOST_FOREACH(const boost::weak_ptr& p, s.mPeers) - if (p.lock()) - { - mPeers.push_back(p); - ++ret; - } + for (boost::unordered_map::const_iterator it = s.mPeers.begin(), end = s.mPeers.end(); + it != end; ++it) + { + mPeers.insert(std::make_pair(it->first, 0)); + ++ret; + } return ret; } int PeerSet::getPeerCount() const { int ret = 0; - BOOST_FOREACH(const boost::weak_ptr& p, mPeers) - if (p.lock()) + for (boost::unordered_map::const_iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it) + if (theApp->getConnectionPool().hasPeer(it->first)) ++ret; return ret; } @@ -464,10 +432,15 @@ bool LedgerAcquire::takeTxNode(const std::list& nodeIDs, bool LedgerAcquire::takeAsNode(const std::list& nodeIDs, const std::list< std::vector >& data) { -#ifdef LA_DEBUG - cLog(lsTRACE) << "got ASdata acquiring ledger " << mHash; -#endif - if (!mHaveBase) return false; + cLog(lsTRACE) << "got ASdata (" << nodeIDs.size() <<") acquiring ledger " << mHash; + tLog(nodeIDs.size() == 1, lsTRACE) << "got AS node: " << nodeIDs.front(); + + if (!mHaveBase) + { + cLog(lsWARNING) << "Don't have ledger base"; + return false; + } + std::list::const_iterator nodeIDit = nodeIDs.begin(); std::list< std::vector >::const_iterator nodeDatait = data.begin(); AccountStateSF tFilter(mLedger->getHash(), mLedger->getLedgerSeq()); @@ -477,10 +450,16 @@ bool LedgerAcquire::takeAsNode(const std::list& nodeIDs, { if (!mLedger->peekAccountStateMap()->addRootNode(mLedger->getAccountHash(), *nodeDatait, snfWIRE, &tFilter)) + { + cLog(lsWARNING) << "Bad ledger base"; return false; + } } else if (!mLedger->peekAccountStateMap()->addKnownNode(*nodeIDit, *nodeDatait, &tFilter)) + { + cLog(lsWARNING) << "Unable to add AS node"; return false; + } ++nodeIDit; ++nodeDatait; } @@ -582,7 +561,7 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref } if (!ledger->takeBase(packet.nodes(0).nodedata())) { - cLog(lsWARNING) << "Got unwanted base data"; + cLog(lsWARNING) << "Got invalid base data"; return false; } if ((packet.nodes().size() > 1) && !ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata()))) @@ -611,7 +590,10 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref { const ripple::TMLedgerNode& node = packet.nodes(i); if (!node.has_nodeid() || !node.has_nodedata()) + { + cLog(lsWARNING) << "Got bad node"; return false; + } nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size())); nodeData.push_back(std::vector(node.nodedata().begin(), node.nodedata().end())); diff --git a/src/LedgerAcquire.h b/src/LedgerAcquire.h index 2dcf018854..2ce741529a 100644 --- a/src/LedgerAcquire.h +++ b/src/LedgerAcquire.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "Ledger.h" @@ -28,7 +29,7 @@ protected: boost::recursive_mutex mLock; boost::asio::deadline_timer mTimer; - std::vector< boost::weak_ptr > mPeers; + boost::unordered_map mPeers; PeerSet(const uint256& hash, int interval); virtual ~PeerSet() { ; } @@ -53,7 +54,7 @@ public: protected: virtual void newPeer(Peer::ref) = 0; - virtual void onTimer(void) = 0; + virtual void onTimer(bool progress) = 0; virtual boost::weak_ptr pmDowncast() = 0; void setComplete() { mComplete = true; } @@ -76,7 +77,7 @@ protected: std::vector< boost::function > mOnComplete; void done(); - void onTimer(); + void onTimer(bool progress); void newPeer(Peer::ref peer) { trigger(peer, false); } diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 48939b5cde..bcdf318a8e 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -46,7 +46,7 @@ void TransactionAcquire::done() } } -void TransactionAcquire::onTimer() +void TransactionAcquire::onTimer(bool progress) { if (!getPeerCount()) { // out of peers @@ -68,7 +68,7 @@ void TransactionAcquire::onTimer() peerHas(peer); } } - else + else if (!progress) trigger(Peer::pointer(), true); } diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 8ac72634a1..064d5800c9 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -30,7 +30,7 @@ protected: SHAMap::pointer mMap; bool mHaveRoot; - void onTimer(); + void onTimer(bool progress); void newPeer(Peer::ref peer) { trigger(peer, false); } void done();