mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-26 22:15:52 +00:00
Smarter peer selection for acquires.
This commit is contained in:
@@ -96,9 +96,9 @@ bool PeerSet::isActive()
|
||||
return !isDone();
|
||||
}
|
||||
|
||||
LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE_TIMEOUT),
|
||||
LedgerAcquire::LedgerAcquire(const uint256& hash, uint32 seq) : PeerSet(hash, LEDGER_ACQUIRE_TIMEOUT),
|
||||
mHaveBase(false), mHaveState(false), mHaveTransactions(false), mAborted(false), mSignaled(false), mAccept(false),
|
||||
mByHash(true), mWaitCount(0)
|
||||
mByHash(true), mWaitCount(0), mSeq(seq)
|
||||
{
|
||||
#ifdef LA_DEBUG
|
||||
cLog(lsTRACE) << "Acquiring ledger " << mHash;
|
||||
@@ -243,14 +243,15 @@ void LedgerAcquire::addPeers()
|
||||
// We traverse the peer list in random order so as not to favor any particular peer
|
||||
int firstPeer = rand() & vSize;
|
||||
|
||||
bool found = false;
|
||||
int found = 0;
|
||||
for (int i = 0; i < vSize; ++i)
|
||||
{
|
||||
Peer::ref peer = peerList[(i + firstPeer) % vSize];
|
||||
if (peer->hasLedger(getHash()))
|
||||
if (peer->hasLedger(getHash(), mSeq))
|
||||
{
|
||||
found = true;
|
||||
peerHas(peer);
|
||||
if (++found == 3)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -775,7 +776,7 @@ bool LedgerAcquire::takeTxRootNode(const std::vector<unsigned char>& data, SMAdd
|
||||
mLedger->peekTransactionMap()->addRootNode(mLedger->getTransHash(), data, snfWIRE, &tFilter));
|
||||
}
|
||||
|
||||
LedgerAcquire::pointer LedgerAcquireMaster::findCreate(const uint256& hash)
|
||||
LedgerAcquire::pointer LedgerAcquireMaster::findCreate(const uint256& hash, uint32 seq)
|
||||
{
|
||||
assert(hash.isNonZero());
|
||||
boost::mutex::scoped_lock sl(mLock);
|
||||
@@ -785,7 +786,7 @@ LedgerAcquire::pointer LedgerAcquireMaster::findCreate(const uint256& hash)
|
||||
ptr->touch();
|
||||
return ptr;
|
||||
}
|
||||
ptr = boost::make_shared<LedgerAcquire>(hash);
|
||||
ptr = boost::make_shared<LedgerAcquire>(hash, seq);
|
||||
if (!ptr->isDone())
|
||||
{
|
||||
ptr->addPeers();
|
||||
@@ -941,6 +942,8 @@ void LedgerAcquireMaster::gotLedgerData(Job&, uint256 hash,
|
||||
}
|
||||
if (!san.isInvalid())
|
||||
ledger->trigger(peer);
|
||||
else
|
||||
cLog(lsDEBUG) << "Peer sends invalid base data";
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -975,6 +978,8 @@ void LedgerAcquireMaster::gotLedgerData(Job&, uint256 hash,
|
||||
ledger->takeAsNode(nodeIDs, nodeData, ret);
|
||||
if (!ret.isInvalid())
|
||||
ledger->trigger(peer);
|
||||
else
|
||||
cLog(lsDEBUG) << "Peer sends invalid node data";
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -85,9 +85,10 @@ public:
|
||||
typedef boost::shared_ptr<LedgerAcquire> pointer;
|
||||
|
||||
protected:
|
||||
Ledger::pointer mLedger;
|
||||
bool mHaveBase, mHaveState, mHaveTransactions, mAborted, mSignaled, mAccept, mByHash;
|
||||
int mWaitCount;
|
||||
Ledger::pointer mLedger;
|
||||
bool mHaveBase, mHaveState, mHaveTransactions, mAborted, mSignaled, mAccept, mByHash;
|
||||
int mWaitCount;
|
||||
uint32 mSeq;
|
||||
|
||||
std::set<SHAMapNode> mRecentTXNodes;
|
||||
std::set<SHAMapNode> mRecentASNodes;
|
||||
@@ -104,7 +105,7 @@ protected:
|
||||
boost::weak_ptr<PeerSet> pmDowncast();
|
||||
|
||||
public:
|
||||
LedgerAcquire(const uint256& hash);
|
||||
LedgerAcquire(const uint256& hash, uint32 seq);
|
||||
virtual ~LedgerAcquire() { ; }
|
||||
|
||||
bool isBase() const { return mHaveBase; }
|
||||
@@ -149,7 +150,7 @@ protected:
|
||||
public:
|
||||
LedgerAcquireMaster() : mRecentFailures("LedgerAcquireRecentFailures", 0, LEDGER_REACQUIRE_INTERVAL) { ; }
|
||||
|
||||
LedgerAcquire::pointer findCreate(const uint256& hash);
|
||||
LedgerAcquire::pointer findCreate(const uint256& hash, uint32 seq);
|
||||
LedgerAcquire::pointer find(const uint256& hash);
|
||||
bool hasLedger(const uint256& ledgerHash);
|
||||
void dropLedger(const uint256& ledgerHash);
|
||||
|
||||
@@ -298,7 +298,7 @@ void LedgerConsensus::handleLCL(const uint256& lclHash)
|
||||
{ // need to start acquiring the correct consensus LCL
|
||||
cLog(lsWARNING) << "Need consensus ledger " << mPrevLedgerHash;
|
||||
|
||||
mAcquiringLedger = theApp->getMasterLedgerAcquire().findCreate(mPrevLedgerHash);
|
||||
mAcquiringLedger = theApp->getMasterLedgerAcquire().findCreate(mPrevLedgerHash, 0);
|
||||
mHaveCorrectLCL = false;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -230,7 +230,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
|
||||
return false;
|
||||
}
|
||||
|
||||
mMissingLedger = theApp->getMasterLedgerAcquire().findCreate(ledgerHash);
|
||||
mMissingLedger = theApp->getMasterLedgerAcquire().findCreate(ledgerHash, ledgerSeq);
|
||||
if (mMissingLedger->isComplete())
|
||||
{
|
||||
Ledger::pointer lgr = mMissingLedger->getLedger();
|
||||
@@ -270,7 +270,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
|
||||
if ((fetchCount < fetchMax) && (it.first < ledgerSeq) &&
|
||||
!mCompleteLedgers.hasValue(it.first) && !theApp->getMasterLedgerAcquire().find(it.second))
|
||||
{
|
||||
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(it.second);
|
||||
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(it.second, it.first);
|
||||
if (acq && acq->isComplete())
|
||||
{
|
||||
acq->getLedger()->setAccepted();
|
||||
@@ -602,7 +602,7 @@ void LedgerMaster::tryPublish()
|
||||
}
|
||||
else
|
||||
{
|
||||
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(hash);
|
||||
LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(hash, 0);
|
||||
if (!acq->isDone())
|
||||
{
|
||||
acq->setAccept();
|
||||
|
||||
@@ -772,7 +772,7 @@ bool NetworkOPs::checkLastClosedLedger(const std::vector<Peer::pointer>& peerLis
|
||||
{
|
||||
cLog(lsINFO) << "Acquiring consensus ledger " << closedLedger;
|
||||
if (!mAcquiringLedger || (mAcquiringLedger->getHash() != closedLedger))
|
||||
mAcquiringLedger = theApp->getMasterLedgerAcquire().findCreate(closedLedger);
|
||||
mAcquiringLedger = theApp->getMasterLedgerAcquire().findCreate(closedLedger, 0);
|
||||
if (!mAcquiringLedger || mAcquiringLedger->isFailed())
|
||||
{
|
||||
theApp->getMasterLedgerAcquire().dropLedger(closedLedger);
|
||||
|
||||
@@ -1488,11 +1488,14 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet)
|
||||
tLog(!ledger, lsTRACE) << "Don't have ledger " << ledgerhash;
|
||||
if (!ledger && (packet.has_querytype() && !packet.has_requestcookie()))
|
||||
{
|
||||
uint32 seq = 0;
|
||||
if (packet.has_ledgerseq())
|
||||
seq = packet.ledgerseq();
|
||||
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
|
||||
std::vector<Peer::pointer> usablePeers;
|
||||
BOOST_FOREACH(Peer::ref peer, peerList)
|
||||
{
|
||||
if (peer->hasLedger(ledgerhash) && (peer.get() != this))
|
||||
if (peer->hasLedger(ledgerhash, seq) && (peer.get() != this))
|
||||
usablePeers.push_back(peer);
|
||||
}
|
||||
if (usablePeers.empty())
|
||||
@@ -1716,8 +1719,10 @@ void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr)
|
||||
punishPeer(LT_UnwantedData);
|
||||
}
|
||||
|
||||
bool Peer::hasLedger(const uint256& hash) const
|
||||
bool Peer::hasLedger(const uint256& hash, uint32 seq) const
|
||||
{
|
||||
if ((seq != 0) && (seq >= mMinLedger) && (seq <= mMaxLedger))
|
||||
return true;
|
||||
BOOST_FOREACH(const uint256& ledger, mRecentLedgers)
|
||||
if (ledger == hash)
|
||||
return true;
|
||||
|
||||
@@ -157,7 +157,7 @@ public:
|
||||
bool isOutbound() const { return !mInbound; }
|
||||
|
||||
const uint256& getClosedLedgerHash() const { return mClosedLedgerHash; }
|
||||
bool hasLedger(const uint256& hash) const;
|
||||
bool hasLedger(const uint256& hash, uint32 seq) const;
|
||||
bool hasTxSet(const uint256& hash) const;
|
||||
uint64 getPeerId() const { return mPeerId; }
|
||||
|
||||
|
||||
@@ -71,7 +71,6 @@ void SHAMap::getMissingNodes(std::vector<SHAMapNode>& nodeIDs, std::vector<uint2
|
||||
SHAMapTreeNode::pointer ptr =
|
||||
boost::make_shared<SHAMapTreeNode>(childID, nodeData, mSeq - 1,
|
||||
snfPREFIX, childHash, true);
|
||||
cLog(lsTRACE) << "Got sync node from cache: " << *ptr;
|
||||
mTNByID[*ptr] = ptr;
|
||||
d = ptr.get();
|
||||
filter->gotNode(true, childID, childHash, nodeData, ptr->getType());
|
||||
|
||||
Reference in New Issue
Block a user