Time fetches better.

This commit is contained in:
JoelKatz
2013-03-29 08:22:52 -07:00
parent 758ee2557d
commit 85afb49058
3 changed files with 58 additions and 25 deletions

View File

@@ -98,7 +98,7 @@ bool PeerSet::isActive()
LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE_TIMEOUT),
mHaveBase(false), mHaveState(false), mHaveTransactions(false), mAborted(false), mSignaled(false), mAccept(false),
mByHash(true)
mByHash(true), mWaitCount(0)
{
#ifdef LA_DEBUG
cLog(lsTRACE) << "Acquiring ledger " << mHash;
@@ -214,6 +214,18 @@ void LedgerAcquire::onTimer(bool progress)
}
}
void LedgerAcquire::awaitData()
{
boost::recursive_mutex::scoped_lock sl(mLock);
++mWaitCount;
}
void LedgerAcquire::noAwaitData()
{
boost::recursive_mutex::scoped_lock sl(mLock);
if (mWaitCount > 0 ) --mWaitCount;
}
void LedgerAcquire::addPeers()
{
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
@@ -300,6 +312,13 @@ void LedgerAcquire::trigger(Peer::ref peer)
return;
}
if ((mWaitCount > 0) && peer)
{
mRecentPeers.push_back(peer->getPeerId());
cLog(lsTRACE) << "Deferring peer";
return;
}
if (sLog(lsTRACE))
{
if (peer)
@@ -484,6 +503,8 @@ void LedgerAcquire::trigger(Peer::ref peer)
}
}
mRecentPeers.clear();
if (mComplete || mFailed)
{
cLog(lsDEBUG) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ")
@@ -846,21 +867,23 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash)
mLedgers.erase(hash);
}
void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptr<ripple::TMLedgerData> packet_ptr,
boost::weak_ptr<Peer> wPeer)
bool LedgerAcquireMaster::awaitLedgerData(const uint256& ledgerHash)
{
LedgerAcquire::pointer ledger = find(ledgerHash);
if (!ledger)
return false;
ledger->awaitData();
return true;
}
void LedgerAcquireMaster::gotLedgerData(Job&, uint256 hash,
boost::shared_ptr<ripple::TMLedgerData> packet_ptr, boost::weak_ptr<Peer> wPeer)
{
ripple::TMLedgerData& packet = *packet_ptr;
Peer::pointer peer = wPeer.lock();
if (!peer)
return;
uint256 hash;
if (packet.ledgerhash().size() != 32)
{
peer->punishPeer(LT_InvalidRequest);
return;
}
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
cLog(lsTRACE) << "Got data (" << packet.nodes().size() << ") for acquiring ledger: " << hash;
LedgerAcquire::pointer ledger = find(hash);
@@ -870,6 +893,7 @@ void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptr<ripple::TMLedger
peer->punishPeer(LT_InvalidRequest);
return;
}
ledger->noAwaitData();
if (packet.type() == ripple::liBASE)
{

View File

@@ -87,11 +87,14 @@ public:
protected:
Ledger::pointer mLedger;
bool mHaveBase, mHaveState, mHaveTransactions, mAborted, mSignaled, mAccept, mByHash;
int mWaitCount;
std::set<SHAMapNode> mRecentTXNodes;
std::set<SHAMapNode> mRecentASNodes;
std::vector< FUNCTION_TYPE<void (LedgerAcquire::pointer)> > mOnComplete;
std::vector<uint64> mRecentPeers;
std::vector< FUNCTION_TYPE<void (LedgerAcquire::pointer)> > mOnComplete;
void done();
void onTimer(bool progress);
@@ -124,6 +127,8 @@ public:
void trigger(Peer::ref);
bool tryLocal();
void addPeers();
void awaitData();
void noAwaitData();
typedef std::pair<ripple::TMGetObjectByHash::ObjectType, uint256> neededHash_t;
std::vector<neededHash_t> getNeededHashes();
@@ -148,7 +153,9 @@ public:
LedgerAcquire::pointer find(const uint256& hash);
bool hasLedger(const uint256& ledgerHash);
void dropLedger(const uint256& ledgerHash);
void gotLedgerData(Job&, boost::shared_ptr<ripple::TMLedgerData> packet, boost::weak_ptr<Peer> peer);
bool awaitLedgerData(const uint256& ledgerHash);
void gotLedgerData(Job&, uint256 hash, boost::shared_ptr<ripple::TMLedgerData> packet, boost::weak_ptr<Peer> peer);
int getFetchCount(int& timeoutCount);
void logFailure(const uint256& h) { mRecentFailures.add(h); }

View File

@@ -1659,18 +1659,17 @@ void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr)
return;
}
uint256 hash;
if(packet.ledgerhash().size() != 32)
{
cLog(lsWARNING) << "TX candidate reply with invalid hash size";
punishPeer(LT_InvalidRequest);
return;
}
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
if (packet.type() == ripple::liTS_CANDIDATE)
{ // got data for a candidate transaction set
uint256 hash;
if(packet.ledgerhash().size() != 32)
{
cLog(lsWARNING) << "TX candidate reply with invalid hash size";
punishPeer(LT_InvalidRequest);
return;
}
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
std::list<SHAMapNode> nodeIDs;
std::list< std::vector<unsigned char> > nodeData;
@@ -1692,9 +1691,12 @@ void Peer::recvLedger(const boost::shared_ptr<ripple::TMLedgerData>& packet_ptr)
return;
}
theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData",
BIND_TYPE(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(),
P_1, packet_ptr, boost::weak_ptr<Peer>(shared_from_this())));
if (theApp->getMasterLedgerAcquire().awaitLedgerData(hash))
theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData",
BIND_TYPE(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(),
P_1, hash, packet_ptr, boost::weak_ptr<Peer>(shared_from_this())));
else
punishPeer(LT_UnwantedData);
}
bool Peer::hasLedger(const uint256& hash) const