diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 31809985cb..8b1d75b096 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -360,6 +360,7 @@ void Application::sweep() mSLECache.sweep(); AcceptedLedger::sweep(); SHAMap::sweep(); + mNetOps.sweepFetchPack(); mSweepTimer.expires_from_now(boost::posix_time::seconds(theConfig.getSize(siSweepInterval))); mSweepTimer.async_wait(boost::bind(&Application::sweep, this)); } diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 217b350361..9a9dd8a9f5 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -225,7 +225,6 @@ void JobQueue::shutdown() mJobCond.notify_all(); while (mThreadCount != 0) mJobCond.wait(sl); - cLog(lsDEBUG) << "Job queue has shut down"; } void JobQueue::setThreadCount(int c) diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp index 715a84a331..97a766cfec 100644 --- a/src/cpp/ripple/Ledger.cpp +++ b/src/cpp/ripple/Ledger.cpp @@ -58,7 +58,7 @@ Ledger::Ledger(const uint256 &parentHash, const uint256 &transHash, const uint25 try { if (mTransHash.isNonZero()) - mTransactionMap->fetchRoot(mTransHash); + mTransactionMap->fetchRoot(mTransHash, NULL); } catch (...) { @@ -68,7 +68,7 @@ Ledger::Ledger(const uint256 &parentHash, const uint256 &transHash, const uint25 try { if (mAccountHash.isNonZero()) - mAccountStateMap->fetchRoot(mAccountHash); + mAccountStateMap->fetchRoot(mAccountHash, NULL); } catch (...) { @@ -1625,7 +1625,7 @@ uint64 Ledger::scaleFeeLoad(uint64 fee, bool bAdmin) return theApp->getFeeTrack().scaleFeeLoad(fee, mBaseFee, mReferenceFeeUnits, bAdmin); } -std::vector Ledger::getNeededTransactionHashes(int max) +std::vector Ledger::getNeededTransactionHashes(int max, SHAMapSyncFilter* filter) { std::vector ret; if (mTransHash.isNonZero()) @@ -1633,12 +1633,12 @@ std::vector Ledger::getNeededTransactionHashes(int max) if (mTransactionMap->getHash().isZero()) ret.push_back(mTransHash); else - ret = mTransactionMap->getNeededHashes(max); + ret = mTransactionMap->getNeededHashes(max, filter); } return ret; } -std::vector Ledger::getNeededAccountStateHashes(int max) +std::vector Ledger::getNeededAccountStateHashes(int max, SHAMapSyncFilter* filter) { std::vector ret; if (mAccountHash.isNonZero()) @@ -1646,7 +1646,7 @@ std::vector Ledger::getNeededAccountStateHashes(int max) if (mAccountStateMap->getHash().isZero()) ret.push_back(mAccountHash); else - ret = mAccountStateMap->getNeededHashes(max); + ret = mAccountStateMap->getNeededHashes(max, filter); } return ret; } diff --git a/src/cpp/ripple/Ledger.h b/src/cpp/ripple/Ledger.h index 3fb0a898f3..6b33999688 100644 --- a/src/cpp/ripple/Ledger.h +++ b/src/cpp/ripple/Ledger.h @@ -222,8 +222,8 @@ public: static uint256 getLedgerFeatureIndex(); static uint256 getLedgerFeeIndex(); - std::vector getNeededTransactionHashes(int max); - std::vector getNeededAccountStateHashes(int max); + std::vector getNeededTransactionHashes(int max, SHAMapSyncFilter* filter); + std::vector getNeededAccountStateHashes(int max, SHAMapSyncFilter* filter); // index calculation functions static uint256 getAccountRootIndex(const uint160& uAccountID); diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index a54fc5f6d7..51da537be3 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -114,9 +114,19 @@ bool LedgerAcquire::tryLocal() // Nothing we can do without the ledger base HashedObject::pointer node = theApp->getHashedObjectStore().retrieve(mHash); if (!node) - return false; + { + std::vector data; + if (!theApp->getOPs().getFetchPack(mHash, data)) + return false; + cLog(lsTRACE) << "Ledger base found in fetch pack"; + mLedger = boost::make_shared(data, true); + theApp->getHashedObjectStore().store(hotLEDGER, mLedger->getLedgerSeq(), data, mHash); + } + else + { + mLedger = boost::make_shared(strCopy(node->getData()), true); + } - mLedger = boost::make_shared(strCopy(node->getData()), true); if (mLedger->getHash() != mHash) { // We know for a fact the ledger can never be acquired cLog(lsWARNING) << mHash << " cannot be a ledger"; @@ -134,9 +144,10 @@ bool LedgerAcquire::tryLocal() { try { - mLedger->peekTransactionMap()->fetchRoot(mLedger->getTransHash()); + TransactionStateSF filter(mLedger->getLedgerSeq()); + mLedger->peekTransactionMap()->fetchRoot(mLedger->getTransHash(), &filter); cLog(lsDEBUG) << "Got root txn map locally"; - std::vector h = mLedger->getNeededTransactionHashes(1); + std::vector h = mLedger->getNeededTransactionHashes(1, &filter); if (h.empty()) { cLog(lsDEBUG) << "Had full txn map locally"; @@ -157,9 +168,10 @@ bool LedgerAcquire::tryLocal() { try { - mLedger->peekAccountStateMap()->fetchRoot(mLedger->getAccountHash()); + AccountStateSF filter(mLedger->getLedgerSeq()); + mLedger->peekAccountStateMap()->fetchRoot(mLedger->getAccountHash(), &filter); cLog(lsDEBUG) << "Got root AS map locally"; - std::vector h = mLedger->getNeededAccountStateHashes(1); + std::vector h = mLedger->getNeededAccountStateHashes(1, &filter); if (h.empty()) { cLog(lsDEBUG) << "Had full AS map locally"; @@ -357,13 +369,10 @@ void LedgerAcquire::trigger(Peer::ref peer) ripple::TMGetObjectByHash tmBH; tmBH.set_query(true); tmBH.set_ledgerhash(mHash.begin(), mHash.size()); - if (mHaveBase) - tmBH.set_seq(mLedger->getLedgerSeq()); bool typeSet = false; BOOST_FOREACH(neededHash_t& p, need) { cLog(lsWARNING) << "Want: " << p.second; - theApp->getOPs().addWantedHash(p.second); if (!typeSet) { tmBH.set_type(p.first); @@ -783,7 +792,13 @@ LedgerAcquire::pointer LedgerAcquireMaster::findCreate(const uint256& hash) ptr->setTimer(); // Cannot call in constructor } else + { + Ledger::pointer ledger = ptr->getLedger(); + ledger->setClosed(); + ledger->setImmutable(); + theApp->getLedgerMaster().storeLedger(ledger); cLog(lsDEBUG) << "Acquiring ledger we already have: " << hash; + } return ptr; } @@ -810,13 +825,15 @@ std::vector LedgerAcquire::getNeededHashes() } if (!mHaveState) { - std::vector v = mLedger->getNeededAccountStateHashes(4); + AccountStateSF filter(mLedger->getLedgerSeq()); + std::vector v = mLedger->getNeededAccountStateHashes(4, &filter); BOOST_FOREACH(const uint256& h, v) ret.push_back(std::make_pair(ripple::TMGetObjectByHash::otSTATE_NODE, h)); } if (!mHaveTransactions) { - std::vector v = mLedger->getNeededAccountStateHashes(4); + TransactionStateSF filter(mLedger->getLedgerSeq()); + std::vector v = mLedger->getNeededAccountStateHashes(4, &filter); BOOST_FOREACH(const uint256& h, v) ret.push_back(std::make_pair(ripple::TMGetObjectByHash::otTRANSACTION_NODE, h)); } @@ -840,7 +857,7 @@ Json::Value LedgerAcquire::getJson(int) if (mHaveBase && !mHaveState) { Json::Value hv(Json::arrayValue); - std::vector v = mLedger->peekAccountStateMap()->getNeededHashes(16); + std::vector v = mLedger->peekAccountStateMap()->getNeededHashes(16, NULL); BOOST_FOREACH(const uint256& h, v) hv.append(h.GetHex()); ret["needed_state_hashes"] = hv; @@ -848,7 +865,7 @@ Json::Value LedgerAcquire::getJson(int) if (mHaveBase && !mHaveTransactions) { Json::Value hv(Json::arrayValue); - std::vector v = mLedger->peekTransactionMap()->getNeededHashes(16); + std::vector v = mLedger->peekTransactionMap()->getNeededHashes(16, NULL); BOOST_FOREACH(const uint256& h, v) hv.append(h.GetHex()); ret["needed_transaction_hashes"] = hv; diff --git a/src/cpp/ripple/LedgerConsensus.cpp b/src/cpp/ripple/LedgerConsensus.cpp index 4878f15ebd..af56d28249 100644 --- a/src/cpp/ripple/LedgerConsensus.cpp +++ b/src/cpp/ripple/LedgerConsensus.cpp @@ -474,6 +474,12 @@ void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger) s.set_ledgerhashprevious(hash.begin(), hash.size()); hash = ledger.getHash(); s.set_ledgerhash(hash.begin(), hash.size()); + + uint32 uMin, uMax; + theApp->getOPs().getValidatedRange(uMin, uMax); + s.set_firstseq(uMin); + s.set_lastseq(uMax); + PackedMessage::pointer packet = boost::make_shared(s, ripple::mtSTATUS_CHANGE); theApp->getConnectionPool().relayMessage(NULL, packet); cLog(lsTRACE) << "send status change to peer"; diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 20a3d93c06..6368a580e1 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -265,18 +265,59 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l { typedef std::pair u_pair; std::vector vec = origLedger->getLedgerHashes(); - BOOST_REVERSE_FOREACH(const u_pair& it, vec) + BOOST_FOREACH(const u_pair& it, vec) { if ((fetchCount < fetchMax) && (it.first < ledgerSeq) && !mCompleteLedgers.hasValue(it.first) && !theApp->getMasterLedgerAcquire().find(it.second)) { - ++fetchCount; - theApp->getMasterLedgerAcquire().findCreate(it.second); + LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(it.second); + if (acq && acq->isComplete()) + { + acq->getLedger()->setAccepted(); + setFullLedger(acq->getLedger()); + mLedgerHistory.addAcceptedLedger(acq->getLedger(), false); + } + else ++fetchCount; } } } } + if (theApp->getOPs().shouldFetchPack()) + { // refill our fetch pack + Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1); + if (nextLedger) + { + ripple::TMGetObjectByHash tmBH; + tmBH.set_type(ripple::TMGetObjectByHash::otFETCH_PACK); + tmBH.set_query(true); + tmBH.set_seq(ledgerSeq); + tmBH.set_ledgerhash(ledgerHash.begin(), 32); + std::vector peerList = theApp->getConnectionPool().getPeerVector(); + + Peer::pointer target; + int count = 0; + + BOOST_FOREACH(const Peer::pointer& peer, peerList) + { + if (peer->hasRange(ledgerSeq, ledgerSeq + 1)) + { + if (count++ == 0) + target = peer; + else if ((rand() % count) == 0) + target = peer; + } + } + if (target) + { + PackedMessage::pointer packet = boost::make_shared(tmBH, ripple::mtGET_OBJECTS); + target->sendPacket(packet, false); + } + else + cLog(lsTRACE) << "No peer for fetch pack"; + } + } + return true; } diff --git a/src/cpp/ripple/LoadManager.h b/src/cpp/ripple/LoadManager.h index 7acf6dc9a6..3926202416 100644 --- a/src/cpp/ripple/LoadManager.h +++ b/src/cpp/ripple/LoadManager.h @@ -182,6 +182,7 @@ public: void setRemoteFee(uint32); bool raiseLocalFee(); bool lowerLocalFee(); + bool isLoaded() { return (raiseCount != 0) || (mLocalTxnLoadFee != lftNormalFee); } }; diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index b3d3a08d69..901dcea113 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -6,6 +6,7 @@ #include "utils.h" #include "Application.h" #include "Transaction.h" +#include "HashPrefixes.h" #include "LedgerConsensus.h" #include "LedgerTiming.h" #include "Log.h" @@ -35,6 +36,7 @@ NetworkOPs::NetworkOPs(boost::asio::io_service& io_service, LedgerMaster* pLedge mMode(omDISCONNECTED), mNeedNetworkLedger(false), mProposing(false), mValidating(false), mNetTimer(io_service), mLedgerMaster(pLedgerMaster), mCloseTimeOffset(0), mLastCloseProposers(0), mLastCloseConvergeTime(1000 * LEDGER_IDLE_INTERVAL), mLastValidationTime(0), + mFetchPack("FetchPack", 2048, 30), mLastFetchPack(0), mLastLoadBase(256), mLastLoadFactor(256) { } @@ -156,18 +158,6 @@ bool NetworkOPs::isValidated(uint32 seq) return haveLedger(seq) && (seq <= mLedgerMaster->getValidatedLedger()->getLedgerSeq()); } -bool NetworkOPs::addWantedHash(const uint256& h) -{ - boost::recursive_mutex::scoped_lock sl(mWantedHashLock); - return mWantedHashes.insert(h).second; -} - -bool NetworkOPs::isWantedHash(const uint256& h, bool remove) -{ - boost::recursive_mutex::scoped_lock sl(mWantedHashLock); - return (remove ? mWantedHashes.erase(h) : mWantedHashes.count(h)) != 0; -} - void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, stCallback callback) { // this is an asynchronous interface Serializer s; @@ -1276,6 +1266,12 @@ Json::Value NetworkOPs::getServerInfo(bool human, bool admin) info["complete_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers(); + + + size_t fp = mFetchPack.getCacheSize(); + if (fp != 0) + info["fetch_pack"] = Json::UInt(fp); + info["peers"] = theApp->getConnectionPool().getPeerCount(); Json::Value lastClose = Json::objectValue; @@ -2007,7 +2003,7 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays } void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ptr request, - Ledger::pointer prevLedger, Ledger::pointer reqLedger) + Ledger::pointer wantLedger, Ledger::pointer haveLedger) { try { @@ -2020,26 +2016,46 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ if (request->has_seq()) reply.set_seq(request->seq()); reply.set_ledgerhash(reply.ledgerhash()); + reply.set_type(ripple::TMGetObjectByHash::otFETCH_PACK); - std::list pack = - reqLedger->peekAccountStateMap()->getFetchPack(prevLedger->peekAccountStateMap().get(), false, 1024); - BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) + do { + uint32 lSeq = wantLedger->getLedgerSeq(); + ripple::TMIndexedObject& newObj = *reply.add_objects(); - newObj.set_hash(node.first.begin(), 256 / 8); - newObj.set_data(&node.second[0], node.second.size()); - } + newObj.set_hash(wantLedger->getHash().begin(), 256 / 8); + Serializer s(256); + s.add32(sHP_Ledger); + wantLedger->addRaw(s); + newObj.set_data(s.getDataPtr(), s.getLength()); + newObj.set_ledgerseq(lSeq); - if (reqLedger->getAccountHash().isNonZero() && (pack.size() < 768)) - { - pack = reqLedger->peekTransactionMap()->getFetchPack(NULL, true, 256); + std::list pack = wantLedger->peekAccountStateMap()->getFetchPack( + haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size()); BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) { ripple::TMIndexedObject& newObj = *reply.add_objects(); newObj.set_hash(node.first.begin(), 256 / 8); newObj.set_data(&node.second[0], node.second.size()); + newObj.set_ledgerseq(lSeq); } - } + + if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768)) + { + pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256); + BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) + { + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(node.first.begin(), 256 / 8); + newObj.set_data(&node.second[0], node.second.size()); + newObj.set_ledgerseq(lSeq); + } + } + if (reply.objects().size() >= 512) + break; + haveLedger = wantLedger; + wantLedger = getLedgerByHash(haveLedger->getParentHash()); + } while (wantLedger); cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes"; PackedMessage::pointer msg = boost::make_shared(reply, ripple::mtGET_OBJECTS); @@ -2051,4 +2067,45 @@ void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ } } +void NetworkOPs::sweepFetchPack() +{ + mFetchPack.sweep(); +} + +void NetworkOPs::addFetchPack(const uint256& hash, boost::shared_ptr< std::vector >& data) +{ + mFetchPack.canonicalize(hash, data, false); +} + +bool NetworkOPs::getFetchPack(const uint256& hash, std::vector& data) +{ + bool ret = mFetchPack.retrieve(hash, data); + if (!ret) + return false; + mFetchPack.del(hash, false); + if (hash != Serializer::getSHA512Half(data)) + { + cLog(lsWARNING) << "Bad entry in fetch pack"; + return false; + } + return true; +} + +bool NetworkOPs::shouldFetchPack() +{ + uint32 now = getNetworkTimeNC(); + if (mLastFetchPack == now) + return false; + mFetchPack.sweep(); + if (mFetchPack.getCacheSize() > 384) + return false; + mLastFetchPack = now; + return true; +} + +int NetworkOPs::getFetchSize() +{ + return mFetchPack.getCacheSize(); +} + // vim:ts=4 diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 9bd01fa079..9a0234c9c4 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -128,8 +128,8 @@ protected: subMapType mSubTransactions; // all accepted transactions subMapType mSubRTTransactions; // all proposed and accepted transactions - boost::recursive_mutex mWantedHashLock; - boost::unordered_set mWantedHashes; + TaggedCache< uint256, std::vector > mFetchPack; + uint32 mLastFetchPack; uint32 mLastLoadBase; uint32 mLastLoadFactor; @@ -261,7 +261,13 @@ public: void mapComplete(const uint256& hash, SHAMap::ref map); bool stillNeedTXSet(const uint256& hash); void makeFetchPack(Job&, boost::weak_ptr peer, boost::shared_ptr request, - Ledger::pointer prevLedger, Ledger::pointer reqLedger); + Ledger::pointer wantLedger, Ledger::pointer haveLedger); + bool shouldFetchPack(); + void gotFetchPack() { mLastFetchPack = 0; } + void addFetchPack(const uint256& hash, boost::shared_ptr< std::vector >& data); + bool getFetchPack(const uint256& hash, std::vector& data); + int getFetchSize(); + void sweepFetchPack(); // network state machine void checkState(const boost::system::error_code& result); @@ -294,9 +300,6 @@ public: uint256 getConsensusLCL(); void reportFeeChange(); - bool addWantedHash(const uint256& h); - bool isWantedHash(const uint256& h, bool remove); - //Helper function to generate SQL query to get transactions std::string transactionsSQL(std::string selection, const RippleAddress& account, int32 minLedger, int32 maxLedger, bool descending, uint32 offset, int limit, diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index cc7e4adc6d..6e252348d3 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -36,6 +36,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, mPeerId(peerID), mPrivate(false), mLoad(""), + mMinLedger(0), + mMaxLedger(0), mSocketSsl(io_service, ctx), mActivityTimer(io_service), mIOStrand(io_service) @@ -727,7 +729,7 @@ void Peer::recvHello(ripple::TMHello& packet) cLog(lsINFO) << "Recv(Hello): " << getIP() << " :Clock far off -" << ourTime - packet.nettime(); } } - else if (packet.protoversionmin() < MAKE_VERSION_INT(MIN_PROTO_MAJOR, MIN_PROTO_MINOR)) + else if (packet.protoversionmin() > MAKE_VERSION_INT(PROTO_VERSION_MAJOR, PROTO_VERSION_MINOR)) { cLog(lsINFO) << "Recv(Hello): Server requires protocol version " << GET_VERSION_MAJOR(packet.protoversion()) << "." << GET_VERSION_MINOR(packet.protoversion()) @@ -1189,14 +1191,14 @@ void Peer::recvPeers(ripple::TMPeers& packet) void Peer::recvGetObjectByHash(const boost::shared_ptr& ptr) { ripple::TMGetObjectByHash& packet = *ptr; - if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK) - { - doFetchPack(ptr); - return; - } if (packet.query()) { // this is a query + if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK) + { + doFetchPack(ptr); + return; + } ripple::TMGetObjectByHash reply; reply.set_query(false); @@ -1233,42 +1235,43 @@ void Peer::recvGetObjectByHash(const boost::shared_ptrgetOPs().gotFetchPack(); + uint32 pLSeq = 0; + bool pLDo = true; for (int i = 0; i < packet.objects_size(); ++i) { const ripple::TMIndexedObject& obj = packet.objects(i); if (obj.has_hash() && (obj.hash().size() == (256/8))) { - uint256 hash; - memcpy(hash.begin(), obj.hash().data(), 256 / 8); - if (theApp->getOPs().isWantedHash(hash, true)) + + if (obj.has_ledgerseq()) { - std::vector data(obj.data().begin(), obj.data().end()); - if (Serializer::getSHA512Half(data) != hash) + if (obj.ledgerseq() != pLSeq) { - cLog(lsWARNING) << "Bad hash in data from peer"; - theApp->getOPs().addWantedHash(hash); - punishPeer(LT_BadData); - } - else - { - cLog(lsDEBUG) << "Got wanted hash " << hash; - theApp->getHashedObjectStore().store(type, seq, data, hash); + tLog(pLDo && (pLSeq != 0), lsDEBUG) << "Recevied full fetch pack for " << pLSeq; + pLSeq = obj.ledgerseq(); + pLDo = !theApp->getOPs().haveLedger(pLSeq); + if (!pLDo) + { + cLog(lsDEBUG) << "Got pack for " << pLSeq << " too late"; + } } } - else - cLog(lsWARNING) << "Received unwanted hash " << getIP() << " " << hash; + + if (pLDo) + { + uint256 hash; + memcpy(hash.begin(), obj.hash().data(), 256 / 8); + + boost::shared_ptr< std::vector > data = boost::make_shared< std::vector > + (obj.data().begin(), obj.data().end()); + + theApp->getOPs().addFetchPack(hash, data); + } } } + tLog(pLDo && (pLSeq != 0), lsDEBUG) << "Received partial fetch pack for " << pLSeq; } } @@ -1403,6 +1406,11 @@ void Peer::recvStatus(ripple::TMStatusChange& packet) addLedger(mPreviousLedgerHash); } else mPreviousLedgerHash.zero(); + + if (packet.has_firstseq()) + mMinLedger = packet.firstseq(); + if (packet.has_lastseq()) + mMaxLedger = packet.lastseq(); } void Peer::recvGetLedger(ripple::TMGetLedger& packet) @@ -1857,46 +1865,49 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer void Peer::doFetchPack(const boost::shared_ptr& packet) { - if (packet->query()) + if (theApp->getFeeTrack().isLoaded()) { - if (packet->ledgerhash().size() != 32) - { - cLog(lsWARNING) << "FetchPack hash size malformed"; - punishPeer(LT_InvalidRequest); - return; - } - uint256 hash; - memcpy(hash.begin(), packet->ledgerhash().data(), 32); - - Ledger::pointer reqLedger = theApp->getOPs().getLedgerByHash(hash); - if (!reqLedger) - { - cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash; - punishPeer(LT_RequestNoReply); - return; - } - if (!reqLedger->isClosed()) - { - cLog(lsWARNING) << "Peer requests fetch pack for open ledger: " << hash; - punishPeer(LT_InvalidRequest); - return; - } - - Ledger::pointer prevLedger = theApp->getOPs().getLedgerByHash(reqLedger->getParentHash()); - if (!prevLedger) - { - cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash; - punishPeer(LT_RequestNoReply); - return; - } - theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack", - BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1, - boost::weak_ptr(shared_from_this()), packet, prevLedger, reqLedger)); + cLog(lsINFO) << "Too busy to make fetch pack"; + return; } - else - { // received fetch pack - // WRITEME + if (packet->ledgerhash().size() != 32) + { + cLog(lsWARNING) << "FetchPack hash size malformed"; + punishPeer(LT_InvalidRequest); + return; } + uint256 hash; + memcpy(hash.begin(), packet->ledgerhash().data(), 32); + + Ledger::pointer haveLedger = theApp->getOPs().getLedgerByHash(hash); + if (!haveLedger) + { + cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash; + punishPeer(LT_RequestNoReply); + return; + } + if (!haveLedger->isClosed()) + { + cLog(lsWARNING) << "Peer requests fetch pack from open ledger: " << hash; + punishPeer(LT_InvalidRequest); + return; + } + + Ledger::pointer wantLedger = theApp->getOPs().getLedgerByHash(haveLedger->getParentHash()); + if (!wantLedger) + { + cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash; + punishPeer(LT_RequestNoReply); + return; + } + theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack", + BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1, + boost::weak_ptr(shared_from_this()), packet, wantLedger, haveLedger)); +} + +bool Peer::hasProto(int version) +{ + return mHello.has_protoversion() && (mHello.protoversion() >= version); } Json::Value Peer::getJson() diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index bace95d354..d0c48b2cd9 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -47,6 +47,7 @@ private: uint64 mPeerId; bool mPrivate; // Keep peer IP private. LoadSource mLoad; + uint32 mMinLedger, mMaxLedger; uint256 mClosedLedgerHash; uint256 mPreviousLedgerHash; @@ -162,6 +163,8 @@ public: const RippleAddress& getNodePublic() const { return mNodePublic; } void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } + bool hasProto(int version); + bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } }; #endif diff --git a/src/cpp/ripple/SHAMap.cpp b/src/cpp/ripple/SHAMap.cpp index 7a2b91ed99..2dd8a9f850 100644 --- a/src/cpp/ripple/SHAMap.cpp +++ b/src/cpp/ripple/SHAMap.cpp @@ -731,7 +731,7 @@ SHAMapTreeNode::pointer SHAMap::fetchNodeExternal(const SHAMapNode& id, const ui } } -void SHAMap::fetchRoot(const uint256& hash) +void SHAMap::fetchRoot(const uint256& hash, SHAMapSyncFilter* filter) { if (sLog(lsTRACE)) { @@ -742,7 +742,20 @@ void SHAMap::fetchRoot(const uint256& hash) else cLog(lsTRACE) << "Fetch root SHAMap node " << hash; } - root = fetchNodeExternal(SHAMapNode(), hash); + try + { + root = fetchNodeExternal(SHAMapNode(), hash); + } + catch (SHAMapMissingNode& mn) + { + std::vector nodeData; + if (!filter || !filter->haveNode(SHAMapNode(), hash, nodeData)) + throw; + root = boost::make_shared(SHAMapNode(), nodeData, + mSeq - 1, snfPREFIX, hash, true); + mTNByID[*root] = root; + filter->gotNode(true, SHAMapNode(), hash, nodeData, root->getType()); + } assert(root->getNodeHash() == hash); } diff --git a/src/cpp/ripple/SHAMap.h b/src/cpp/ripple/SHAMap.h index 147c360216..adf0a76cb2 100644 --- a/src/cpp/ripple/SHAMap.h +++ b/src/cpp/ripple/SHAMap.h @@ -21,6 +21,7 @@ DEFINE_INSTANCE(SHAMapItem); DEFINE_INSTANCE(SHAMapTreeNode); class SHAMap; +class SHAMapSyncFilter; // A tree-like map of SHA256 hashes // The trees are designed for rapid synchronization and compression of differences @@ -253,7 +254,7 @@ public: SHAMapSyncFilter() { ; } virtual ~SHAMapSyncFilter() { ; } - virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash, + virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash, const std::vector& nodeData, SHAMapTreeNode::TNType type) { ; } @@ -397,7 +398,7 @@ public: ScopedLock Lock() const { return ScopedLock(mLock); } bool hasNode(const SHAMapNode& id); - void fetchRoot(const uint256& hash); + void fetchRoot(const uint256& hash, SHAMapSyncFilter* filter); // normal hash access functions bool hasItem(const uint256& id); @@ -431,7 +432,7 @@ public: bool getNodeFat(const SHAMapNode& node, std::vector& nodeIDs, std::list >& rawNode, bool fatRoot, bool fatLeaves); bool getRootNode(Serializer& s, SHANodeFormat format); - std::vector getNeededHashes(int max); + std::vector getNeededHashes(int max, SHAMapSyncFilter* filter); SMAddNode addRootNode(const uint256& hash, const std::vector& rootNode, SHANodeFormat format, SHAMapSyncFilter* filter); SMAddNode addRootNode(const std::vector& rootNode, SHANodeFormat format, @@ -477,7 +478,7 @@ public: virtual void dump(bool withHashes = false); typedef std::pair< uint256, std::vector > fetchPackEntry_t; - std::list getFetchPack(SHAMap* prior, bool includeLeaves, int max); + std::list getFetchPack(SHAMap* have, bool includeLeaves, int max); static void sweep() { fullBelowCache.sweep(); } }; diff --git a/src/cpp/ripple/SHAMapSync.cpp b/src/cpp/ripple/SHAMapSync.cpp index 085c9aec72..b7ec400cf3 100644 --- a/src/cpp/ripple/SHAMapSync.cpp +++ b/src/cpp/ripple/SHAMapSync.cpp @@ -69,10 +69,12 @@ void SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector= 1); SHAMapTreeNode::pointer ptr = - boost::make_shared(childID, nodeData, mSeq - 1, snfPREFIX, childHash, true); + boost::make_shared(childID, nodeData, mSeq - 1, + snfPREFIX, childHash, true); cLog(lsTRACE) << "Got sync node from cache: " << *ptr; mTNByID[*ptr] = ptr; d = ptr.get(); + filter->gotNode(true, childID, childHash, nodeData, ptr->getType()); } } } @@ -106,7 +108,7 @@ void SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector SHAMap::getNeededHashes(int max) +std::vector SHAMap::getNeededHashes(int max, SHAMapSyncFilter* filter) { std::vector ret; boost::recursive_mutex::scoped_lock sl(mLock); @@ -135,20 +137,38 @@ std::vector SHAMap::getNeededHashes(int max) if (!node->isEmptyBranch(branch)) { const uint256& childHash = node->getChildHash(branch); + SHAMapNode childID = node->getChildNodeID(branch); if (!fullBelowCache.isPresent(childHash)) { + SHAMapTreeNode* d = NULL; try { - SHAMapTreeNode* d = getNodePointer(node->getChildNodeID(branch), childHash); + d = getNodePointer(node->getChildNodeID(branch), childHash); assert(d); + } + catch (SHAMapMissingNode&) + { // node is not in the map + std::vector nodeData; + if (filter && filter->haveNode(childID, childHash, nodeData)) + { + SHAMapTreeNode::pointer ptr = + boost::make_shared(childID, nodeData, mSeq -1, + snfPREFIX, childHash, true); + mTNByID[*ptr] = ptr; + d = ptr.get(); + filter->gotNode(true, childID, childHash, nodeData, ptr->getType()); + } + } + if (d) + { if (d->isInner() && !d->isFullBelow()) { have_all = false; stack.push(d); } } - catch (SHAMapMissingNode&) - { // node is not in the map + else + { have_all = false; ret.push_back(childHash); if (--max <= 0) @@ -263,7 +283,7 @@ SMAddNode SHAMap::addRootNode(const std::vector& rootNode, SHANod { Serializer s; root->addRaw(s, snfPREFIX); - filter->gotNode(*root, root->getNodeHash(), s.peekData(), root->getType()); + filter->gotNode(false, *root, root->getNodeHash(), s.peekData(), root->getType()); } return SMAddNode::useful(); @@ -299,7 +319,7 @@ SMAddNode SHAMap::addRootNode(const uint256& hash, const std::vectoraddRaw(s, snfPREFIX); - filter->gotNode(*root, root->getNodeHash(), s.peekData(), root->getType()); + filter->gotNode(false, *root, root->getNodeHash(), s.peekData(), root->getType()); } return SMAddNode::useful(); @@ -371,7 +391,7 @@ SMAddNode SHAMap::addKnownNode(const SHAMapNode& node, const std::vectoraddRaw(s, snfPREFIX); - filter->gotNode(node, hash, s.peekData(), newNode->getType()); + filter->gotNode(false, node, hash, s.peekData(), newNode->getType()); } mTNByID[*newNode] = newNode; @@ -472,7 +492,7 @@ bool SHAMap::deepCompare(SHAMap& other) bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash) { SHAMapTreeNode* node = root.get(); - while (node->isInner() && (node->getDepth() <= nodeID.getDepth())) + while (node->isInner() && (node->getDepth() < nodeID.getDepth())) { int branch = node->selectBranch(nodeID.getNodeID()); if (node->isEmptyBranch(branch)) @@ -482,14 +502,30 @@ bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash) return node->getNodeHash() == nodeHash; } -std::list SHAMap::getFetchPack(SHAMap* prior, bool includeLeaves, int max) +std::list SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max) { std::list ret; + boost::recursive_mutex::scoped_lock ul1(mLock); + + UPTR_T< boost::unique_lock > ul2; + if (have) + { + UPTR_T< boost::unique_lock > ul3( + new boost::unique_lock(have->mLock, boost::try_to_lock)); + if (!(*ul3)) + { + cLog(lsINFO) << "Unable to create pack due to lock"; + return ret; + } + ul2.swap(ul3); + } + + if (root->isLeaf()) { if (includeLeaves && !root->getNodeHash().isZero() && - (!prior || !prior->hasNode(*root, root->getNodeHash()))) + (!have || !have->hasNode(*root, root->getNodeHash()))) { Serializer s; root->addRaw(s, snfPREFIX); @@ -501,7 +537,7 @@ std::list SHAMap::getFetchPack(SHAMap* prior, bool inc if (root->getNodeHash().isZero()) return ret; - if (prior && (root->getNodeHash() == prior->root->getNodeHash())) + if (have && (root->getNodeHash() == have->root->getNodeHash())) return ret; std::stack stack; // contains unexplored non-matching inner node entries @@ -529,10 +565,10 @@ std::list SHAMap::getFetchPack(SHAMap* prior, bool inc SHAMapTreeNode *next = getNodePointer(childID, childHash); if (next->isInner()) { - if (!prior || !prior->hasNode(*next, childHash)) + if (!have || !have->hasNode(*next, childHash)) stack.push(next); } - else if (includeLeaves && (!prior || !prior->hasNode(childID, childHash))) + else if (includeLeaves && (!have || !have->hasNode(childID, childHash))) { Serializer s; node->addRaw(s, snfPREFIX); @@ -545,8 +581,6 @@ std::list SHAMap::getFetchPack(SHAMap* prior, bool inc if (max <= 0) break; } - - cLog(lsINFO) << "Fetch pack has " << ret.size() << " entries"; return ret; } diff --git a/src/cpp/ripple/SHAMapSync.h b/src/cpp/ripple/SHAMapSync.h index 35b6e9b794..ea7caee9d7 100644 --- a/src/cpp/ripple/SHAMapSync.h +++ b/src/cpp/ripple/SHAMapSync.h @@ -13,7 +13,7 @@ class ConsensusTransSetSF : public SHAMapSyncFilter public: ConsensusTransSetSF() { ; } - virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash, + virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash, const std::vector& nodeData, SHAMapTreeNode::TNType); virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector& nodeData); @@ -29,14 +29,14 @@ public: AccountStateSF(uint32 ledgerSeq) : mLedgerSeq(ledgerSeq) { ; } - virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash, + virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash, const std::vector& nodeData, SHAMapTreeNode::TNType) { theApp->getHashedObjectStore().store(hotACCOUNT_NODE, mLedgerSeq, nodeData, nodeHash); } virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector& nodeData) - { // fetchNodeExternal already tried - return false; + { + return theApp->getOPs().getFetchPack(nodeHash, nodeData); } }; @@ -50,7 +50,7 @@ public: TransactionStateSF(uint32 ledgerSeq) : mLedgerSeq(ledgerSeq) { ; } - virtual void gotNode(const SHAMapNode& id, const uint256& nodeHash, + virtual void gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash, const std::vector& nodeData, SHAMapTreeNode::TNType type) { theApp->getHashedObjectStore().store( @@ -58,8 +58,8 @@ public: mLedgerSeq, nodeData, nodeHash); } virtual bool haveNode(const SHAMapNode& id, const uint256& nodeHash, std::vector& nodeData) - { // fetchNodeExternal already tried - return false; + { + return theApp->getOPs().getFetchPack(nodeHash, nodeData); } }; diff --git a/src/cpp/ripple/TaggedCache.h b/src/cpp/ripple/TaggedCache.h index 8b12d80da6..c7fb419028 100644 --- a/src/cpp/ripple/TaggedCache.h +++ b/src/cpp/ripple/TaggedCache.h @@ -232,7 +232,7 @@ template bool TaggedCache::del(c if (!valid || entry.isExpired()) mCache.erase(cit); - return true; + return ret; } template diff --git a/src/cpp/ripple/TransactionAcquire.cpp b/src/cpp/ripple/TransactionAcquire.cpp index f2e1ee548e..fb432afaf4 100644 --- a/src/cpp/ripple/TransactionAcquire.cpp +++ b/src/cpp/ripple/TransactionAcquire.cpp @@ -195,9 +195,11 @@ SMAddNode TransactionAcquire::takeNodes(const std::list& nodeIDs, } } -void ConsensusTransSetSF::gotNode(const SHAMapNode& id, const uint256& nodeHash, +void ConsensusTransSetSF::gotNode(bool fromFilter, const SHAMapNode& id, const uint256& nodeHash, const std::vector& nodeData, SHAMapTreeNode::TNType type) { + if (fromFilter) + return; theApp->getTempNodeCache().store(nodeHash, nodeData); if ((type == SHAMapTreeNode::tnTRANSACTION_NM) && (nodeData.size() > 16)) { // this is a transaction, and we didn't have it diff --git a/src/cpp/ripple/Version.h b/src/cpp/ripple/Version.h index 897a19d97e..930c0b9dbf 100644 --- a/src/cpp/ripple/Version.h +++ b/src/cpp/ripple/Version.h @@ -6,7 +6,7 @@ #define SERVER_VERSION_MAJOR 0 #define SERVER_VERSION_MINOR 8 -#define SERVER_VERSION_SUB "-b" +#define SERVER_VERSION_SUB "-c" #define SERVER_NAME "Ripple" #define SV_STRINGIZE(x) SV_STRINGIZE2(x) diff --git a/src/cpp/ripple/ripple.proto b/src/cpp/ripple/ripple.proto index d35640ba15..8e5def2536 100644 --- a/src/cpp/ripple/ripple.proto +++ b/src/cpp/ripple/ripple.proto @@ -119,6 +119,8 @@ message TMStatusChange { optional bytes ledgerHash = 4; optional bytes ledgerHashPrevious = 5; optional uint64 networkTime = 6; + optional uint32 firstSeq = 7; + optional uint32 lastSeq = 8; } @@ -222,6 +224,7 @@ message TMIndexedObject optional bytes nodeID = 2; optional bytes index = 3; optional bytes data = 4; + optional uint32 ledgerSeq = 5; } message TMGetObjectByHash