diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 8ba9ca548d..217b350361 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -37,6 +37,7 @@ const char* Job::toString(JobType t) switch(t) { case jtINVALID: return "invalid"; + case jtPACK: return "makeFetchPack"; case jtPUBOLDLEDGER: return "publishAcqLedger"; case jtVALIDATION_ut: return "untrustedValidation"; case jtPROOFWORK: return "proofOfWork"; @@ -304,7 +305,7 @@ void JobQueue::threadEntry() // } } - if (mShuttingDown) + if (mJobSet.empty()) break; JobType type; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 62e058c009..5fc8f620c3 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -22,21 +22,22 @@ enum JobType { // must be in priority order, low to high jtINVALID = -1, - jtPUBOLDLEDGER = 1, // An old ledger has been accepted - jtVALIDATION_ut = 2, // A validation from an untrusted source - jtPROOFWORK = 3, // A proof of work demand from another server - jtPROPOSAL_ut = 4, // A proposal from an untrusted source - jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring - jtCLIENT = 6, // A websocket command from the client - jtTRANSACTION = 7, // A transaction received from the network - jtPUBLEDGER = 8, // Publish a fully-accepted ledger - jtWAL = 9, // Write-ahead logging - jtVALIDATION_t = 10, // A validation from a trusted source - jtWRITE = 11, // Write out hashed objects - jtTRANSACTION_l = 12, // A local transaction - jtPROPOSAL_t = 13, // A proposal from a trusted source - jtADMIN = 14, // An administrative operation - jtDEATH = 15, // job of death, used internally + jtPACK = 1, // Make a fetch pack for a peer + jtPUBOLDLEDGER = 2, // An old ledger has been accepted + jtVALIDATION_ut = 3, // A validation from an untrusted source + jtPROOFWORK = 4, // A proof of work demand from another server + jtPROPOSAL_ut = 5, // A proposal from an untrusted source + jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring + jtCLIENT = 7, // A websocket command from the client + jtTRANSACTION = 8, // A transaction received from the network + jtPUBLEDGER = 9, // Publish a fully-accepted ledger + jtWAL = 10, // Write-ahead logging + jtVALIDATION_t = 11, // A validation from a trusted source + jtWRITE = 12, // Write out hashed objects + jtTRANSACTION_l = 13, // A local transaction + jtPROPOSAL_t = 14, // A proposal from a trusted source + jtADMIN = 15, // An administrative operation + jtDEATH = 16, // job of death, used internally // special types not dispatched by the job pool jtPEER = 24, diff --git a/src/cpp/ripple/LedgerConsensus.cpp b/src/cpp/ripple/LedgerConsensus.cpp index d6916fff37..76afd7cb56 100644 --- a/src/cpp/ripple/LedgerConsensus.cpp +++ b/src/cpp/ripple/LedgerConsensus.cpp @@ -472,6 +472,12 @@ void LedgerConsensus::statusChange(ripple::NodeEvent event, Ledger& ledger) s.set_networktime(theApp->getOPs().getNetworkTimeNC()); uint256 hash = ledger.getParentHash(); s.set_ledgerhashprevious(hash.begin(), hash.size()); + + uint32 uMin, uMax; + theApp->getOPs().getValidatedRange(uMin, uMax); + s.set_firstseq(uMin); + s.set_lastseq(uMax); + hash = ledger.getHash(); s.set_ledgerhash(hash.begin(), hash.size()); PackedMessage::pointer packet = boost::make_shared(s, ripple::mtSTATUS_CHANGE); diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 41006ac865..93015c3564 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -6,6 +6,7 @@ #include "utils.h" #include "Application.h" #include "Transaction.h" +#include "HashPrefixes.h" #include "LedgerConsensus.h" #include "LedgerTiming.h" #include "Log.h" @@ -2003,4 +2004,69 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays // jvResult["nodes"] = Json::Value(Json::arrayValue); } +void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ptr request, + Ledger::pointer wantLedger, Ledger::pointer haveLedger) +{ + try + { + Peer::pointer peer = wPeer.lock(); + if (!peer) + return; + + ripple::TMGetObjectByHash reply; + reply.set_query(false); + if (request->has_seq()) + reply.set_seq(request->seq()); + reply.set_ledgerhash(reply.ledgerhash()); + reply.set_type(ripple::TMGetObjectByHash::otFETCH_PACK); + + do + { + uint32 lSeq = wantLedger->getLedgerSeq(); + + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(wantLedger->getHash().begin(), 256 / 8); + Serializer s(256); + s.add32(sHP_Ledger); + wantLedger->addRaw(s); + newObj.set_data(s.getDataPtr(), s.getLength()); + newObj.set_ledgerseq(lSeq); + + std::list pack = wantLedger->peekAccountStateMap()->getFetchPack( + haveLedger->peekAccountStateMap().get(), false, 1024 - reply.objects().size()); + BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) + { + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(node.first.begin(), 256 / 8); + newObj.set_data(&node.second[0], node.second.size()); + newObj.set_ledgerseq(lSeq); + } + + if (wantLedger->getAccountHash().isNonZero() && (pack.size() < 768)) + { + pack = wantLedger->peekTransactionMap()->getFetchPack(NULL, true, 256); + BOOST_FOREACH(SHAMap::fetchPackEntry_t& node, pack) + { + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(node.first.begin(), 256 / 8); + newObj.set_data(&node.second[0], node.second.size()); + newObj.set_ledgerseq(lSeq); + } + } + if (reply.objects().size() >= 768) + break; + haveLedger = wantLedger; + wantLedger = getLedgerByHash(haveLedger->getParentHash()); + } while (wantLedger); + + cLog(lsINFO) << "Built fetch pack with " << reply.objects().size() << " nodes"; + PackedMessage::pointer msg = boost::make_shared(reply, ripple::mtGET_OBJECTS); + peer->sendPacket(msg, false); + } + catch (...) + { + cLog(lsWARNING) << "Exception building fetch pach"; + } +} + // vim:ts=4 diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index b4aa58c21c..79859b61b1 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -260,6 +260,8 @@ public: bool hasTXSet(const boost::shared_ptr& peer, const uint256& set, ripple::TxSetStatus status); void mapComplete(const uint256& hash, SHAMap::ref map); bool stillNeedTXSet(const uint256& hash); + void makeFetchPack(Job&, boost::weak_ptr peer, boost::shared_ptr request, + Ledger::pointer wantLedger, Ledger::pointer haveLedger); // network state machine void checkState(const boost::system::error_code& result); diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 6289dc9193..6280675d04 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -36,6 +36,8 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, mPeerId(peerID), mPrivate(false), mLoad(""), + mMinLedger(0), + mMaxLedger(0), mSocketSsl(io_service, ctx), mActivityTimer(io_service), mIOStrand(io_service) @@ -652,8 +654,8 @@ void Peer::processReadBuffer() case ripple::mtGET_OBJECTS: { event->reName("Peer::getobjects"); - ripple::TMGetObjectByHash msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetObjectByHash(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -1184,10 +1186,18 @@ void Peer::recvPeers(ripple::TMPeers& packet) } } -void Peer::recvGetObjectByHash(ripple::TMGetObjectByHash& packet) +void Peer::recvGetObjectByHash(const boost::shared_ptr& ptr) { + ripple::TMGetObjectByHash& packet = *ptr; + if (packet.query()) { // this is a query + if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK) + { + doFetchPack(ptr); + return; + } + ripple::TMGetObjectByHash reply; reply.set_query(false); @@ -1846,6 +1856,48 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer } } +void Peer::doFetchPack(const boost::shared_ptr& packet) +{ + if (packet->ledgerhash().size() != 32) + { + cLog(lsWARNING) << "FetchPack hash size malformed"; + punishPeer(LT_InvalidRequest); + return; + } + uint256 hash; + memcpy(hash.begin(), packet->ledgerhash().data(), 32); + + Ledger::pointer haveLedger = theApp->getOPs().getLedgerByHash(hash); + if (!haveLedger) + { + cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash; + punishPeer(LT_RequestNoReply); + return; + } + if (!haveLedger->isClosed()) + { + cLog(lsWARNING) << "Peer requests fetch pack from open ledger: " << hash; + punishPeer(LT_InvalidRequest); + return; + } + + Ledger::pointer wantLedger = theApp->getOPs().getLedgerByHash(haveLedger->getParentHash()); + if (!wantLedger) + { + cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash; + punishPeer(LT_RequestNoReply); + return; + } + theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack", + BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1, + boost::weak_ptr(shared_from_this()), packet, wantLedger, haveLedger)); +} + +bool Peer::hasProto(int version) +{ + return mHello.has_protoversion() && (mHello.protoversion() >= version); +} + Json::Value Peer::getJson() { Json::Value ret(Json::objectValue); diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index 142f5e3389..d0c48b2cd9 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -47,6 +47,7 @@ private: uint64 mPeerId; bool mPrivate; // Keep peer IP private. LoadSource mLoad; + uint32 mMinLedger, mMaxLedger; uint256 mClosedLedgerHash; uint256 mPreviousLedgerHash; @@ -93,7 +94,7 @@ protected: void recvGetContacts(ripple::TMGetContacts& packet); void recvGetPeers(ripple::TMGetPeers& packet); void recvPeers(ripple::TMPeers& packet); - void recvGetObjectByHash(ripple::TMGetObjectByHash& packet); + void recvGetObjectByHash(const boost::shared_ptr& packet); void recvPing(ripple::TMPing& packet); void recvErrorMessage(ripple::TMErrorMsg& packet); void recvSearchTransaction(ripple::TMSearchTransaction& packet); @@ -111,6 +112,8 @@ protected: void addLedger(const uint256& ledger); void addTxSet(const uint256& TxSet); + void doFetchPack(const boost::shared_ptr& packet); + static void doProofOfWork(Job&, boost::weak_ptr, ProofOfWork::pointer); public: @@ -160,6 +163,8 @@ public: const RippleAddress& getNodePublic() const { return mNodePublic; } void cycleStatus() { mPreviousLedgerHash = mClosedLedgerHash; mClosedLedgerHash.zero(); } + bool hasProto(int version); + bool hasRange(uint32 uMin, uint32 uMax) { return (uMin >= mMinLedger) && (uMax <= mMaxLedger); } }; #endif diff --git a/src/cpp/ripple/SHAMap.h b/src/cpp/ripple/SHAMap.h index f76cc0e443..187f42694e 100644 --- a/src/cpp/ripple/SHAMap.h +++ b/src/cpp/ripple/SHAMap.h @@ -375,6 +375,7 @@ protected: SHAMapItem::pointer onlyBelow(SHAMapTreeNode*); void eraseChildren(SHAMapTreeNode::pointer); void dropBelow(SHAMapTreeNode*); + bool hasNode(const SHAMapNode& id, const uint256& hash); bool walkBranch(SHAMapTreeNode* node, SHAMapItem::ref otherMapItem, bool isFirstMap, SHAMapDiff& differences, int& maxCount); @@ -476,6 +477,9 @@ public: bool deepCompare(SHAMap& other); virtual void dump(bool withHashes = false); + typedef std::pair< uint256, std::vector > fetchPackEntry_t; + std::list getFetchPack(SHAMap* have, bool includeLeaves, int max); + static void sweep() { fullBelowCache.sweep(); } }; diff --git a/src/cpp/ripple/SHAMapSync.cpp b/src/cpp/ripple/SHAMapSync.cpp index 024540fd10..b6d3b2132c 100644 --- a/src/cpp/ripple/SHAMapSync.cpp +++ b/src/cpp/ripple/SHAMapSync.cpp @@ -461,6 +461,101 @@ bool SHAMap::deepCompare(SHAMap& other) return true; } +bool SHAMap::hasNode(const SHAMapNode& nodeID, const uint256& nodeHash) +{ + SHAMapTreeNode* node = root.get(); + while (node->isInner() && (node->getDepth() < nodeID.getDepth())) + { + int branch = node->selectBranch(nodeID.getNodeID()); + if (node->isEmptyBranch(branch)) + break; + node = getNodePointer(node->getChildNodeID(branch), node->getChildHash(branch)); + } + return node->getNodeHash() == nodeHash; +} + +std::list SHAMap::getFetchPack(SHAMap* have, bool includeLeaves, int max) +{ + std::list ret; + + boost::recursive_mutex::scoped_lock ul1(mLock); + + UPTR_T< boost::unique_lock > ul2; + if (have) + { + UPTR_T< boost::unique_lock > ul3( + new boost::unique_lock(have->mLock, boost::try_to_lock)); + if (!(*ul3)) + { + cLog(lsINFO) << "Unable to create pack due to lock"; + return ret; + } + ul2.swap(ul3); + } + + + if (root->isLeaf()) + { + if (includeLeaves && !root->getNodeHash().isZero() && + (!have || !have->hasNode(*root, root->getNodeHash()))) + { + Serializer s; + root->addRaw(s, snfPREFIX); + ret.push_back(fetchPackEntry_t(root->getNodeHash(), s.peekData())); + } + return ret; + } + + if (root->getNodeHash().isZero()) + return ret; + + if (have && (root->getNodeHash() == have->root->getNodeHash())) + return ret; + + std::stack stack; // contains unexplored non-matching inner node entries + stack.push(root.get()); + + while (!stack.empty()) + { + SHAMapTreeNode* node = stack.top(); + stack.pop(); + + // 1) Add this node to the pack + Serializer s; + node->addRaw(s, snfPREFIX); + ret.push_back(fetchPackEntry_t(node->getNodeHash(), s.peekData())); + --max; + + // 2) push non-matching child inner nodes + for (int i = 0; i < 16; ++i) + { + if (!node->isEmptyBranch(i)) + { + const uint256& childHash = node->getChildHash(i); + SHAMapNode childID = node->getChildNodeID(i); + + SHAMapTreeNode *next = getNodePointer(childID, childHash); + if (next->isInner()) + { + if (!have || !have->hasNode(*next, childHash)) + stack.push(next); + } + else if (includeLeaves && (!have || !have->hasNode(childID, childHash))) + { + Serializer s; + node->addRaw(s, snfPREFIX); + ret.push_back(fetchPackEntry_t(node->getNodeHash(), s.peekData())); + --max; + } + } + } + + if (max <= 0) + break; + } + return ret; +} + #ifdef DEBUG #define SMS_DEBUG #endif