diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 31809985cb..8b1d75b096 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -360,6 +360,7 @@ void Application::sweep() mSLECache.sweep(); AcceptedLedger::sweep(); SHAMap::sweep(); + mNetOps.sweepFetchPack(); mSweepTimer.expires_from_now(boost::posix_time::seconds(theConfig.getSize(siSweepInterval))); mSweepTimer.async_wait(boost::bind(&Application::sweep, this)); } diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index a54fc5f6d7..3eedaf7e1a 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -363,7 +363,6 @@ void LedgerAcquire::trigger(Peer::ref peer) BOOST_FOREACH(neededHash_t& p, need) { cLog(lsWARNING) << "Want: " << p.second; - theApp->getOPs().addWantedHash(p.second); if (!typeSet) { tmBH.set_type(p.first); diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 20a3d93c06..f7169777fb 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -265,7 +265,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l { typedef std::pair u_pair; std::vector vec = origLedger->getLedgerHashes(); - BOOST_REVERSE_FOREACH(const u_pair& it, vec) + BOOST_FOREACH(const u_pair& it, vec) { if ((fetchCount < fetchMax) && (it.first < ledgerSeq) && !mCompleteLedgers.hasValue(it.first) && !theApp->getMasterLedgerAcquire().find(it.second)) diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index b3d3a08d69..bb67e40c4a 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -34,7 +34,7 @@ void InfoSub::onSendEmpty() NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) : mMode(omDISCONNECTED), mNeedNetworkLedger(false), mProposing(false), mValidating(false), mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0), - mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), + mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), mFetchPack("FetchPack", 2048, 12), mLastLoadBase(256), mLastLoadFactor(256) { } @@ -156,18 +156,6 @@ bool NetworkOPs::isValidated(uint32 seq) return haveLedger(seq) && (seq <= mLedgerMaster->getValidatedLedger()->getLedgerSeq()); } -bool NetworkOPs::addWantedHash(const uint256& h) -{ - boost::recursive_mutex::scoped_lock sl(mWantedHashLock); - return mWantedHashes.insert(h).second; -} - -bool NetworkOPs::isWantedHash(const uint256& h, bool remove) -{ - boost::recursive_mutex::scoped_lock sl(mWantedHashLock); - return (remove ? mWantedHashes.erase(h) : mWantedHashes.count(h)) != 0; -} - void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, stCallback callback) { // this is an asynchronous interface Serializer s; @@ -2007,7 +1995,7 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays } void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ptr request, - Ledger::pointer prevLedger, Ledger::pointer reqLedger) + Ledger::pointer wantLedger, Ledger::pointer haveLedger) { try { @@ -2021,25 +2009,32 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ reply.set_seq(request->seq()); reply.set_ledgerhash(reply.ledgerhash()); - std::list pack = - reqLedger->peekAccountStateMap()->getFetchPack(prevLedger->peekAccountStateMap().get(), false, 1024); - BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) + do { - ripple::TMIndexedObject& newObj = *reply.add_objects(); - newObj.set_hash(node.first.begin(), 256 / 8); - newObj.set_data(&node.second[0], node.second.size()); - } - - if (reqLedger->getAccountHash().isNonZero() && (pack.size() < 768)) - { - pack = reqLedger->peekTransactionMap()->getFetchPack(NULL, true, 256); + std::list pack = wantLedger->peekAccountStateMap()->getFetchPack( + haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size()); BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) { ripple::TMIndexedObject& newObj = *reply.add_objects(); newObj.set_hash(node.first.begin(), 256 / 8); newObj.set_data(&node.second[0], node.second.size()); } - } + + if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768)) + { + pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256); + BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) + { + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(node.first.begin(), 256 / 8); + newObj.set_data(&node.second[0], node.second.size()); + } + } + if (reply.objects().size() >= 768) + break; + haveLedger = wantLedger; + wantLedger = getLedgerByHash(haveLedger->getParentHash()); + } while (wantLedger); cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes"; PackedMessage::pointer msg = boost::make_shared(reply, ripple::mtGET_OBJECTS); @@ -2051,4 +2046,19 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ } } +void NetworkOPs::sweepFetchPack() +{ + mFetchPack.sweep(); +} + +void NetworkOPs::addFetchPack(const uint256& hash, boost::shared_ptr< std::vector >& data) +{ + mFetchPack.canonicalize(hash, data, false); +} + +bool NetworkOPs::getFetchPack(const uint256& hash, std::vector& data) +{ + return mFetchPack.retrieve(hash, data); +} + // vim:ts=4 diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 9bd01fa079..1fcd44cf9b 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -128,8 +128,7 @@ protected: subMapType mSubTransactions; // all accepted transactions subMapType mSubRTTransactions; // all proposed and accepted transactions - boost::recursive_mutex mWantedHashLock; - boost::unordered_set mWantedHashes; + TaggedCache< uint256, std::vector > mFetchPack; uint32 mLastLoadBase; uint32 mLastLoadFactor; @@ -261,7 +260,10 @@ public: void mapComplete(const uint256& hash, SHAMap::ref map); bool stillNeedTXSet(const uint256& hash); void makeFetchPack(Job&, boost::weak_ptr peer, boost::shared_ptr request, - Ledger::pointer prevLedger, Ledger::pointer reqLedger); + Ledger::pointer wantLedger, Ledger::pointer haveLedger); + void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector >& data); + bool getFetchPack(const uint256& hash, std::vector& data); + void sweepFetchPack(); // network state machine void checkState(const boost::system::error_code& result); @@ -294,9 +296,6 @@ public: uint256 getConsensusLCL(); void reportFeeChange(); - bool addWantedHash(const uint256& h); - bool isWantedHash(const uint256& h, bool remove); - //Helper function to generate SQL query to get transactions std::string transactionsSQL(std::string selection, const RippleAddress& account, int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit, diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index cc7e4adc6d..dd9c0048f6 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -1189,14 +1189,14 @@ void Peer::recvPeers(ripple::TMPeers& packet) void Peer::recvGetObjectByHash(const boost::shared_ptr& ptr) { ripple::TMGetObjectByHash& packet = *ptr; - if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK) - { - doFetchPack(ptr); - return; - } if (packet.query()) { // this is a query + if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK) + { + doFetchPack(ptr); + return; + } ripple::TMGetObjectByHash reply; reply.set_query(false); @@ -1233,16 +1233,6 @@ void Peer::recvGetObjectByHash(const boost::shared_ptrgetOPs().isWantedHash(hash, true)) - { - std::vector data(obj.data().begin(), obj.data().end()); - if (Serializer::getSHA512Half(data) != hash) - { - cLog(lsWARNING) << "Bad hash in data from peer"; - theApp->getOPs().addWantedHash(hash); - punishPeer(LT_BadData); - } - else - { - cLog(lsDEBUG) << "Got wanted hash " << hash; - theApp->getHashedObjectStore().store(type, seq, data, hash); - } - } - else - cLog(lsWARNING) << "Received unwanted hash " << getIP() << " " << hash; + + boost::shared_ptr< std::vector > data = boost::make_shared< std::vector > + (obj.data().begin(), obj.data().end()); + + theApp->getOPs().addFetchPack(hash, data); } } } @@ -1868,22 +1846,22 @@ void Peer::doFetchPack(const boost::shared_ptr& packe uint256 hash; memcpy(hash.begin(), packet->ledgerhash().data(), 32); - Ledger::pointer reqLedger = theApp->getOPs().getLedgerByHash(hash); - if (!reqLedger) + Ledger::pointer haveLedger = theApp->getOPs().getLedgerByHash(hash); + if (!haveLedger) { cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash; punishPeer(LT_RequestNoReply); return; } - if (!reqLedger->isClosed()) + if (!haveLedger->isClosed()) { - cLog(lsWARNING) << "Peer requests fetch pack for open ledger: " << hash; + cLog(lsWARNING) << "Peer requests fetch pack from open ledger: " << hash; punishPeer(LT_InvalidRequest); return; } - Ledger::pointer prevLedger = theApp->getOPs().getLedgerByHash(reqLedger->getParentHash()); - if (!prevLedger) + Ledger::pointer wantLedger = theApp->getOPs().getLedgerByHash(haveLedger->getParentHash()); + if (!wantLedger) { cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash; punishPeer(LT_RequestNoReply); @@ -1891,7 +1869,7 @@ void Peer::doFetchPack(const boost::shared_ptr& packe } theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack", BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1, - boost::weak_ptr(shared_from_this()), packet, prevLedger, reqLedger)); + boost::weak_ptr(shared_from_this()), packet, wantLedger, haveLedger)); } else { // received fetch pack diff --git a/src/cpp/ripple/SHAMap.h b/src/cpp/ripple/SHAMap.h index 147c360216..f228f73bb4 100644 --- a/src/cpp/ripple/SHAMap.h +++ b/src/cpp/ripple/SHAMap.h @@ -477,7 +477,7 @@ public: virtual void dump(bool withHashes = false); typedef std::pair< uint256, std::vector > fetchPackEntry_t; - std::list getFetchPack(SHAMap* prior, bool includeLeaves, int max); + std::list getFetchPack(SHAMap* have, bool includeLeaves, int max); static void sweep() { fullBelowCache.sweep(); } }; diff --git a/src/cpp/ripple/SHAMapSync.cpp b/src/cpp/ripple/SHAMapSync.cpp index 4ff3f3bf6d..7156d9feff 100644 --- a/src/cpp/ripple/SHAMapSync.cpp +++ b/src/cpp/ripple/SHAMapSync.cpp @@ -482,14 +482,14 @@ bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash) return node->getNodeHash() == nodeHash; } -std::list SHAMap::getFetchPack(SHAMap* prior, bool includeLeaves, int max) +std::list SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max) { std::list ret; if (root->isLeaf()) { if (includeLeaves && !root->getNodeHash().isZero() && - (!prior || !prior->hasNode(*root, root->getNodeHash()))) + (!have || !have->hasNode(*root, root->getNodeHash()))) { Serializer s; root->addRaw(s, snfPREFIX); @@ -501,7 +501,7 @@ std::list SHAMap::getFetchPack(SHAMap* prior, bool inc if (root->getNodeHash().isZero()) return ret; - if (prior && (root->getNodeHash() == prior->root->getNodeHash())) + if (have && (root->getNodeHash() == have->root->getNodeHash())) return ret; std::stack stack; // contains unexplored non-matching inner node entries @@ -529,10 +529,10 @@ std::list SHAMap::getFetchPack(SHAMap* prior, bool inc SHAMapTreeNode *next = getNodePointer(childID, childHash); if (next->isInner()) { - if (!prior || !prior->hasNode(*next, childHash)) + if (!have || !have->hasNode(*next, childHash)) stack.push(next); } - else if (includeLeaves && (!prior || !prior->hasNode(childID, childHash))) + else if (includeLeaves && (!have || !have->hasNode(childID, childHash))) { Serializer s; node->addRaw(s, snfPREFIX); diff --git a/src/cpp/ripple/SHAMapSync.h b/src/cpp/ripple/SHAMapSync.h index 35b6e9b794..1c010d1040 100644 --- a/src/cpp/ripple/SHAMapSync.h +++ b/src/cpp/ripple/SHAMapSync.h @@ -35,8 +35,8 @@ public: theApp->getHashedObjectStore().store(hotACCOUNT_NODE, mLedgerSeq, nodeData, nodeHash); } virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector& nodeData) - { // fetchNodeExternal already tried - return false; + { + return theApp->getOPs().getFetchPack(nodeHash, nodeData); } }; @@ -58,8 +58,8 @@ public: mLedgerSeq, nodeData, nodeHash); } virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector& nodeData) - { // fetchNodeExternal already tried - return false; + { + return theApp->getOPs().getFetchPack(nodeHash, nodeData); } };