From aeccecb57845e272fabc903f2b6a200c8f7d6b70 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 22 Apr 2013 11:09:07 -0700 Subject: [PATCH] ech pack stuff. --- src/cpp/ripple/LedgerAcquire.cpp | 1 + src/cpp/ripple/LedgerMaster.cpp | 4 ++-- src/cpp/ripple/NetworkOPs.cpp | 30 ++++++++++++++++++++++++++++-- src/cpp/ripple/NetworkOPs.h | 3 +++ src/cpp/ripple/Peer.cpp | 32 +++++++++++++++++++++++++++----- src/cpp/ripple/SHAMapSync.cpp | 2 -- src/cpp/ripple/ripple.proto | 1 + 7 files changed, 62 insertions(+), 11 deletions(-) diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index d93a84b78d..99bbf7ed53 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -118,6 +118,7 @@ bool LedgerAcquire::tryLocal() std::vector data; if (!theApp->getOPs().getFetchPack(mHash, data)) return false; + cLog(lsINFO) << "Ledger base found in fetch pack"; mLedger = boost::make_shared(data, true); theApp->getHashedObjectStore().store(hotLEDGER, mLedger->getLedgerSeq(), data, mHash); } diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index b20ba7c5de..0e9f08da3d 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -251,7 +251,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l theApp->getIOService().post(boost::bind(&LedgerMaster::missingAcquireComplete, this, mMissingLedger)); } - if (theApp->getOPs().getFetchSize() < 256) + if (theApp->getOPs().shouldFetchPack()) { // refill our fetch pack Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1); if (nextLedger) @@ -282,7 +282,7 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l target->sendPacket(packet, false); } else - cLog(lsINFO) << "No peer for fetch pack"; + cLog(lsTRACE) << "No peer for fetch pack"; } } diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index f560a14a95..1491d47fee 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -6,6 +6,7 @@ #include "utils.h" #include "Application.h" #include "Transaction.h" +#include "HashPrefixes.h" #include "LedgerConsensus.h" #include "LedgerTiming.h" #include "Log.h" @@ -34,7 +35,8 @@ void InfoSub::onSendEmpty() NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedgerMaster) : mMode(omDISCONNECTED), mNeedNetworkLedger(false), mProposing(false), mValidating(false), mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0), - mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), mFetchPack("FetchPack", 2048, 12), + mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), + mFetchPack("FetchPack", 2048, 3), mLastFetchPack(0), mLastLoadBase(256), mLastLoadFactor(256) { } @@ -2018,6 +2020,16 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ do { + uint32 lSeq = wantLedger->getLedgerSeq(); + + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(wantLedger->getHash().begin(), 256 / 8); + Serializer s(256); + s.add32(sHP_Ledger); + wantLedger->addRaw(s); + newObj.set_data(s.getDataPtr(), s.getLength()); + newObj.set_ledgerseq(lSeq); + std::list pack = wantLedger->peekAccountStateMap()->getFetchPack( haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size()); BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) @@ -2025,6 +2037,7 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ ripple::TMIndexedObject& newObj = *reply.add_objects(); newObj.set_hash(node.first.begin(), 256 / 8); newObj.set_data(&node.second[0], node.second.size()); + newObj.set_ledgerseq(lSeq); } if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768)) @@ -2035,6 +2048,7 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ ripple::TMIndexedObject& newObj = *reply.add_objects(); newObj.set_hash(node.first.begin(), 256 / 8); newObj.set_data(&node.second[0], node.second.size()); + newObj.set_ledgerseq(lSeq); } } if (reply.objects().size() >= 768) @@ -2074,7 +2088,19 @@ bool NetworkOPs::getFetchPack(const uint256& hash, std::vector& d cLog(lsWARNING) << "Bad entry in fetch pack"; return false; } - cLog(lsTRACE) << hash << " found in fetch pack"; + cLog(lsINFO) << hash << " found in fetch pack"; + return true; +} + +bool NetworkOPs::shouldFetchPack() +{ + uint32 now = getNetworkTimeNC(); + if (mLastFetchPack == now) + return false; + mFetchPack.sweep(); + if (mFetchPack.getCacheSize() > 384) + return false; + mLastFetchPack = now; return true; } diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 1d80bdcaf0..9a0234c9c4 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -129,6 +129,7 @@ protected: subMapType mSubRTTransactions; // all proposed and accepted transactions TaggedCache< uint256, std::vector > mFetchPack; + uint32 mLastFetchPack; uint32 mLastLoadBase; uint32 mLastLoadFactor; @@ -261,6 +262,8 @@ public: bool stillNeedTXSet(const uint256& hash); void makeFetchPack(Job&, boost::weak_ptr peer, boost::shared_ptr request, Ledger::pointer wantLedger, Ledger::pointer haveLedger); + bool shouldFetchPack(); + void gotFetchPack() { mLastFetchPack = 0; } void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector >& data); bool getFetchPack(const uint256& hash, std::vector& data); int getFetchSize(); diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 9fdf5afaad..5475f4e8d6 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -1235,18 +1235,40 @@ void Peer::recvGetObjectByHash(const boost::shared_ptrgetOPs().gotFetchPack(); for (int i = 0; i < packet.objects_size(); ++i) { const ripple::TMIndexedObject& obj = packet.objects(i); if (obj.has_hash() && (obj.hash().size() == (256/8))) { - uint256 hash; - memcpy(hash.begin(), obj.hash().data(), 256 / 8); + uint32 pLSeq = 0; + bool pLDo = true; - boost::shared_ptr< std::vector > data = boost::make_shared< std::vector > - (obj.data().begin(), obj.data().end()); + if (obj.has_ledgerseq()) + { + if (obj.ledgerseq() != pLSeq) + { + pLSeq = obj.ledgerseq(); + pLDo = !theApp->getOPs().haveLedger(pLSeq); + if (!pLDo) + { + cLog(lsDEBUG) << "Got pack for " << pLSeq << " too late"; + } + else cLog(lsDEBUG) << "Got pack for " << pLSeq; + } + } - theApp->getOPs().addFetchPack(hash, data); + if (pLDo) + { + uint256 hash; + memcpy(hash.begin(), obj.hash().data(), 256 / 8); + + boost::shared_ptr< std::vector > data = boost::make_shared< std::vector > + (obj.data().begin(), obj.data().end()); + + theApp->getOPs().addFetchPack(hash, data); + } } } } diff --git a/src/cpp/ripple/SHAMapSync.cpp b/src/cpp/ripple/SHAMapSync.cpp index a1b6307e02..b7ec400cf3 100644 --- a/src/cpp/ripple/SHAMapSync.cpp +++ b/src/cpp/ripple/SHAMapSync.cpp @@ -581,8 +581,6 @@ std::list SHAMap::getFetchPack(SHAMap* have, bool incl if (max <= 0) break; } - - cLog(lsINFO) << "Made pack with " << ret.size() << " entries"; return ret; } diff --git a/src/cpp/ripple/ripple.proto b/src/cpp/ripple/ripple.proto index 3046b9087e..8e5def2536 100644 --- a/src/cpp/ripple/ripple.proto +++ b/src/cpp/ripple/ripple.proto @@ -224,6 +224,7 @@ message TMIndexedObject optional bytes nodeID = 2; optional bytes index = 3; optional bytes data = 4; + optional uint32 ledgerSeq = 5; } message TMGetObjectByHash