Use peer IDs in the acquire code. Fix a bug where we issue way too many acquire queries.

This commit is contained in:
JoelKatz
2012-11-05 17:16:00 -08:00
parent 4d067676ea
commit 66f8fad377
4 changed files with 57 additions and 74 deletions

View File

@@ -26,42 +26,15 @@ PeerSet::PeerSet(const uint256& hash, int interval) : mHash(hash), mTimerInterva
void PeerSet::peerHas(Peer::ref ptr)
{
boost::recursive_mutex::scoped_lock sl(mLock);
std::vector< boost::weak_ptr<Peer> >::iterator it = mPeers.begin();
while (it != mPeers.end())
{
Peer::pointer pr = it->lock();
if (!pr) // we have a dead entry, remove it
it = mPeers.erase(it);
else
{
if (pr->samePeer(ptr))
return; // we already have this peer
++it;
}
}
mPeers.push_back(ptr);
if (!mPeers.insert(std::make_pair(ptr->getPeerId(), 0)).second)
return;
newPeer(ptr);
}
void PeerSet::badPeer(Peer::ref ptr)
{
boost::recursive_mutex::scoped_lock sl(mLock);
std::vector< boost::weak_ptr<Peer> >::iterator it = mPeers.begin();
while (it != mPeers.end())
{
Peer::pointer pr = it->lock();
if (!pr) // we have a dead entry, remove it
it = mPeers.erase(it);
else
{
if (ptr->samePeer(pr))
{ // We found a pointer to the bad peer
mPeers.erase(it);
return;
}
++it;
}
}
mPeers.erase(ptr->getPeerId());
}
void PeerSet::resetTimer()
@@ -76,10 +49,13 @@ void PeerSet::invokeOnTimer()
{
++mTimeouts;
cLog(lsWARNING) << "Timeout(" << mTimeouts << ") pc=" << mPeers.size() << " acquiring " << mHash;
onTimer(false);
}
else
{
mProgress = false;
onTimer();
onTimer(true);
}
}
void PeerSet::TimerEntry(boost::weak_ptr<PeerSet> wptr, const boost::system::error_code& result)
@@ -138,18 +114,19 @@ bool LedgerAcquire::tryLocal()
return mHaveTransactions && mHaveState;
}
void LedgerAcquire::onTimer()
void LedgerAcquire::onTimer(bool progress)
{
if (getTimeouts() > 6)
{
setFailed();
done();
}
else
else if (!progress)
{
if (!getPeerCount())
addPeers();
trigger(Peer::pointer(), true);
else
trigger(Peer::pointer(), true);
}
}
@@ -225,13 +202,13 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer)
if (sLog(lsTRACE))
{
if (peer)
cLog(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP();
cLog(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP();
else
cLog(lsTRACE) << "Trigger acquiring ledger " << mHash;
cLog(lsTRACE) << "Trigger acquiring ledger " << mHash;
if (mComplete || mFailed)
cLog(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed;
cLog(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed;
else
cLog(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState;
cLog(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState;
}
if (!mHaveBase)
@@ -281,7 +258,7 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer)
BOOST_FOREACH(SHAMapNode& it, nodeIDs)
*(tmGL.add_nodeids()) = it.getRawString();
cLog(lsTRACE) << "Sending TX node " << nodeIDs.size()
<< "request to " << (peer ? "selected peer" : "all peers");
<< " request to " << (peer ? "selected peer" : "all peers");
sendRequest(tmGL, peer);
}
}
@@ -325,7 +302,8 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer)
BOOST_FOREACH(SHAMapNode& it, nodeIDs)
*(tmGL.add_nodeids()) = it.getRawString();
cLog(lsTRACE) << "Sending AS node " << nodeIDs.size()
<< "request to " << (peer ? "selected peer" : "all peers");
<< " request to " << (peer ? "selected peer" : "all peers");
tLog(nodeIDs.size() == 1, lsTRACE) << "AS node: " << nodeIDs[0];
sendRequest(tmGL, peer);
}
}
@@ -355,42 +333,32 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL)
return;
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tmGL, ripple::mtGET_LEDGER);
std::vector< boost::weak_ptr<Peer> >::iterator it = mPeers.begin();
while (it != mPeers.end())
for (boost::unordered_map<uint64, int>::iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it)
{
if (it->expired())
it = mPeers.erase(it);
else
{
// FIXME: Track last peer sent to and time sent
Peer::pointer peer = it->lock();
if (peer)
peer->sendPacket(packet);
return;
}
Peer::pointer peer = theApp->getConnectionPool().getPeerById(it->first);
if (peer)
peer->sendPacket(packet);
}
}
int PeerSet::takePeerSetFrom(const PeerSet& s)
int PeerSet::takePeerSetFrom(const PeerSet& s)
{
int ret = 0;
mPeers.clear();
mPeers.reserve(s.mPeers.size());
BOOST_FOREACH(const boost::weak_ptr<Peer>& p, s.mPeers)
if (p.lock())
{
mPeers.push_back(p);
++ret;
}
for (boost::unordered_map<uint64, int>::const_iterator it = s.mPeers.begin(), end = s.mPeers.end();
it != end; ++it)
{
mPeers.insert(std::make_pair(it->first, 0));
++ret;
}
return ret;
}
int PeerSet::getPeerCount() const
{
int ret = 0;
BOOST_FOREACH(const boost::weak_ptr<Peer>& p, mPeers)
if (p.lock())
for (boost::unordered_map<uint64, int>::const_iterator it = mPeers.begin(), end = mPeers.end(); it != end; ++it)
if (theApp->getConnectionPool().hasPeer(it->first))
++ret;
return ret;
}
@@ -464,10 +432,15 @@ bool LedgerAcquire::takeTxNode(const std::list<SHAMapNode>& nodeIDs,
bool LedgerAcquire::takeAsNode(const std::list<SHAMapNode>& nodeIDs,
const std::list< std::vector<unsigned char> >& data)
{
#ifdef LA_DEBUG
cLog(lsTRACE) << "got ASdata acquiring ledger " << mHash;
#endif
if (!mHaveBase) return false;
cLog(lsTRACE) << "got ASdata (" << nodeIDs.size() <<") acquiring ledger " << mHash;
tLog(nodeIDs.size() == 1, lsTRACE) << "got AS node: " << nodeIDs.front();
if (!mHaveBase)
{
cLog(lsWARNING) << "Don't have ledger base";
return false;
}
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
std::list< std::vector<unsigned char> >::const_iterator nodeDatait = data.begin();
AccountStateSF tFilter(mLedger->getHash(), mLedger->getLedgerSeq());
@@ -477,10 +450,16 @@ bool LedgerAcquire::takeAsNode(const std::list<SHAMapNode>& nodeIDs,
{
if (!mLedger->peekAccountStateMap()->addRootNode(mLedger->getAccountHash(),
*nodeDatait, snfWIRE, &tFilter))
{
cLog(lsWARNING) << "Bad ledger base";
return false;
}
}
else if (!mLedger->peekAccountStateMap()->addKnownNode(*nodeIDit, *nodeDatait, &tFilter))
{
cLog(lsWARNING) << "Unable to add AS node";
return false;
}
++nodeIDit;
++nodeDatait;
}
@@ -582,7 +561,7 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
}
if (!ledger->takeBase(packet.nodes(0).nodedata()))
{
cLog(lsWARNING) << "Got unwanted base data";
cLog(lsWARNING) << "Got invalid base data";
return false;
}
if ((packet.nodes().size() > 1) && !ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata())))
@@ -611,7 +590,10 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
{
const ripple::TMLedgerNode& node = packet.nodes(i);
if (!node.has_nodeid() || !node.has_nodedata())
{
cLog(lsWARNING) << "Got bad node";
return false;
}
nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size()));
nodeData.push_back(std::vector<unsigned char>(node.nodedata().begin(), node.nodedata().end()));

View File

@@ -9,6 +9,7 @@
#include <boost/function.hpp>
#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/weak_ptr.hpp>
#include "Ledger.h"
@@ -28,7 +29,7 @@ protected:
boost::recursive_mutex mLock;
boost::asio::deadline_timer mTimer;
std::vector< boost::weak_ptr<Peer> > mPeers;
boost::unordered_map<uint64, int> mPeers;
PeerSet(const uint256& hash, int interval);
virtual ~PeerSet() { ; }
@@ -53,7 +54,7 @@ public:
protected:
virtual void newPeer(Peer::ref) = 0;
virtual void onTimer(void) = 0;
virtual void onTimer(bool progress) = 0;
virtual boost::weak_ptr<PeerSet> pmDowncast() = 0;
void setComplete() { mComplete = true; }
@@ -76,7 +77,7 @@ protected:
std::vector< boost::function<void (LedgerAcquire::pointer)> > mOnComplete;
void done();
void onTimer();
void onTimer(bool progress);
void newPeer(Peer::ref peer) { trigger(peer, false); }

View File

@@ -46,7 +46,7 @@ void TransactionAcquire::done()
}
}
void TransactionAcquire::onTimer()
void TransactionAcquire::onTimer(bool progress)
{
if (!getPeerCount())
{ // out of peers
@@ -68,7 +68,7 @@ void TransactionAcquire::onTimer()
peerHas(peer);
}
}
else
else if (!progress)
trigger(Peer::pointer(), true);
}

View File

@@ -30,7 +30,7 @@ protected:
SHAMap::pointer mMap;
bool mHaveRoot;
void onTimer();
void onTimer(bool progress);
void newPeer(Peer::ref peer) { trigger(peer, false); }
void done();