diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 8ba9ca548..a9822824c 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -37,6 +37,7 @@ const char* Job::toString(JobType t) switch(t) { case jtINVALID: return "invalid"; + case jtPACK: return "makeFetchPack"; case jtPUBOLDLEDGER: return "publishAcqLedger"; case jtVALIDATION_ut: return "untrustedValidation"; case jtPROOFWORK: return "proofOfWork"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 62e058c00..5fc8f620c 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -22,21 +22,22 @@ enum JobType { // must be in priority order, low to high jtINVALID = -1, - jtPUBOLDLEDGER = 1, // An old ledger has been accepted - jtVALIDATION_ut = 2, // A validation from an untrusted source - jtPROOFWORK = 3, // A proof of work demand from another server - jtPROPOSAL_ut = 4, // A proposal from an untrusted source - jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring - jtCLIENT = 6, // A websocket command from the client - jtTRANSACTION = 7, // A transaction received from the network - jtPUBLEDGER = 8, // Publish a fully-accepted ledger - jtWAL = 9, // Write-ahead logging - jtVALIDATION_t = 10, // A validation from a trusted source - jtWRITE = 11, // Write out hashed objects - jtTRANSACTION_l = 12, // A local transaction - jtPROPOSAL_t = 13, // A proposal from a trusted source - jtADMIN = 14, // An administrative operation - jtDEATH = 15, // job of death, used internally + jtPACK = 1, // Make a fetch pack for a peer + jtPUBOLDLEDGER = 2, // An old ledger has been accepted + jtVALIDATION_ut = 3, // A validation from an untrusted source + jtPROOFWORK = 4, // A proof of work demand from another server + jtPROPOSAL_ut = 5, // A proposal from an untrusted source + jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring + jtCLIENT = 7, // A websocket command from the client + jtTRANSACTION = 8, // A transaction received from the network + jtPUBLEDGER = 9, // Publish a fully-accepted ledger + jtWAL = 10, // Write-ahead logging + jtVALIDATION_t = 11, // A validation from a trusted source + jtWRITE = 12, // Write out hashed objects + jtTRANSACTION_l = 13, // A local transaction + jtPROPOSAL_t = 14, // A proposal from a trusted source + jtADMIN = 15, // An administrative operation + jtDEATH = 16, // job of death, used internally // special types not dispatched by the job pool jtPEER = 24, diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 600e39f6b..104771731 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -2004,4 +2004,45 @@ void NetworkOPs::getBookPage(Ledger::pointer lpLedger, const uint160& uTakerPays // jvResult["nodes"] = Json::Value(Json::arrayValue); } +void NetworkOPs::makeFetchPack(Job&, boost::weak_ptr wPeer, boost::shared_ptr request, + Ledger::pointer prevLedger, Ledger::pointer reqLedger) +{ + Peer::pointer peer = wPeer.lock(); + if (!peer) + return; + + ripple::TMGetObjectByHash reply; + reply.set_query(false); + if (request->has_seq()) + reply.set_seq(request->seq()); + reply.set_ledgerhash(reply.ledgerhash()); + + std::list< std::pair > > pack1 = getSyncInfo(prevLedger->peekAccountStateMap(), + reqLedger->peekAccountStateMap(), 1024); + + typedef std::pair< uint256, std::vector > uvpair_t; + BOOST_FOREACH(uvpair_t& node, pack1) + { + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(node.first.begin(), 256 / 8); + newObj.set_data(&node.second[0], node.second.size()); + } + + if (reqLedger->getAccountHash().isNonZero()) + { + SHAMapIterator it(*reqLedger->peekTransactionMap(), true, true); + for (SHAMapTreeNode* node = it.getNext(); node != NULL; node = it.getNext()) + { + Serializer s; + node->addRaw(s, snfPREFIX); + ripple::TMIndexedObject& newObj = *reply.add_objects(); + newObj.set_hash(node->getNodeHash().begin(), 256 / 8); + newObj.set_data(&s.peekData()[0], s.peekData().size()); + } + } + + PackedMessage::pointer msg = boost::make_shared(reply, ripple::mtGET_OBJECTS); + peer->sendPacket(msg, false); +} + // vim:ts=4 diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 1471b5ae5..9bd01fa07 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -260,6 +260,8 @@ public: bool hasTXSet(const boost::shared_ptr& peer, const uint256& set, ripple::TxSetStatus status); void mapComplete(const uint256& hash, SHAMap::ref map); bool stillNeedTXSet(const uint256& hash); + void makeFetchPack(Job&, boost::weak_ptr peer, boost::shared_ptr request, + Ledger::pointer prevLedger, Ledger::pointer reqLedger); // network state machine void checkState(const boost::system::error_code& result); diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index a1e397cdd..cc7e4adc6 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -652,8 +652,8 @@ void Peer::processReadBuffer() case ripple::mtGET_OBJECTS: { event->reName("Peer::getobjects"); - ripple::TMGetObjectByHash msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvGetObjectByHash(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -1186,8 +1186,15 @@ void Peer::recvPeers(ripple::TMPeers& packet) } } -void Peer::recvGetObjectByHash(ripple::TMGetObjectByHash& packet) +void Peer::recvGetObjectByHash(const boost::shared_ptr& ptr) { + ripple::TMGetObjectByHash& packet = *ptr; + if (packet.type() == ripple::TMGetObjectByHash::otFETCH_PACK) + { + doFetchPack(ptr); + return; + } + if (packet.query()) { // this is a query ripple::TMGetObjectByHash reply; @@ -1848,6 +1855,50 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer } } +void Peer::doFetchPack(const boost::shared_ptr& packet) +{ + if (packet->query()) + { + if (packet->ledgerhash().size() != 32) + { + cLog(lsWARNING) << "FetchPack hash size malformed"; + punishPeer(LT_InvalidRequest); + return; + } + uint256 hash; + memcpy(hash.begin(), packet->ledgerhash().data(), 32); + + Ledger::pointer reqLedger = theApp->getOPs().getLedgerByHash(hash); + if (!reqLedger) + { + cLog(lsINFO) << "Peer requests fetch pack for ledger we don't have: " << hash; + punishPeer(LT_RequestNoReply); + return; + } + if (!reqLedger->isClosed()) + { + cLog(lsWARNING) << "Peer requests fetch pack for open ledger: " << hash; + punishPeer(LT_InvalidRequest); + return; + } + + Ledger::pointer prevLedger = theApp->getOPs().getLedgerByHash(reqLedger->getParentHash()); + if (!prevLedger) + { + cLog(lsINFO) << "Peer requests fetch pack for ledger whose predecessor we don't have: " << hash; + punishPeer(LT_RequestNoReply); + return; + } + theApp->getJobQueue().addJob(jtPACK, "MakeFetchPack", + BIND_TYPE(&NetworkOPs::makeFetchPack, &theApp->getOPs(), P_1, + boost::weak_ptr(shared_from_this()), packet, prevLedger, reqLedger)); + } + else + { // received fetch pack + // WRITEME + } +} + Json::Value Peer::getJson() { Json::Value ret(Json::objectValue); diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index 142f5e338..bace95d35 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -93,7 +93,7 @@ protected: void recvGetContacts(ripple::TMGetContacts& packet); void recvGetPeers(ripple::TMGetPeers& packet); void recvPeers(ripple::TMPeers& packet); - void recvGetObjectByHash(ripple::TMGetObjectByHash& packet); + void recvGetObjectByHash(const boost::shared_ptr& packet); void recvPing(ripple::TMPing& packet); void recvErrorMessage(ripple::TMErrorMsg& packet); void recvSearchTransaction(ripple::TMSearchTransaction& packet); @@ -111,6 +111,8 @@ protected: void addLedger(const uint256& ledger); void addTxSet(const uint256& TxSet); + void doFetchPack(const boost::shared_ptr& packet); + static void doProofOfWork(Job&, boost::weak_ptr, ProofOfWork::pointer); public: diff --git a/src/cpp/ripple/SHAMap.cpp b/src/cpp/ripple/SHAMap.cpp index c0191f6c4..7a2b91ed9 100644 --- a/src/cpp/ripple/SHAMap.cpp +++ b/src/cpp/ripple/SHAMap.cpp @@ -25,7 +25,7 @@ DECLARE_INSTANCE(SHAMap); DECLARE_INSTANCE(SHAMapItem); DECLARE_INSTANCE(SHAMapTreeNode); -void SHAMapNode::setHash() const +void SHAMapNode::setMHash() const { std::size_t h = theApp->getNonceST() + (mDepth * 0x9e3779b9); const unsigned int *ptr = reinterpret_cast(mNodeID.begin()); @@ -36,7 +36,7 @@ void SHAMapNode::setHash() const std::size_t hash_value(const SHAMapNode& mn) { - return mn.getHash(); + return mn.getMHash(); } std::size_t hash_value(const uint256& u) diff --git a/src/cpp/ripple/SHAMap.h b/src/cpp/ripple/SHAMap.h index 68db98473..2c10efc25 100644 --- a/src/cpp/ripple/SHAMap.h +++ b/src/cpp/ripple/SHAMap.h @@ -35,7 +35,7 @@ private: int mDepth; mutable size_t mHash; - void setHash() const; + void setMHash() const; protected: SHAMapNode(int depth, const uint256& id, bool) : mNodeID(id), mDepth(depth), mHash(0) { ; } @@ -51,7 +51,7 @@ public: const uint256& getNodeID() const { return mNodeID; } bool isValid() const { return (mDepth >= 0) && (mDepth < 64); } bool isRoot() const { return mDepth == 0; } - size_t getHash() const { if (mHash == 0) setHash(); return mHash; } + size_t getMHash() const { if (mHash == 0) setMHash(); return mHash; } virtual bool isPopulated() const { return false; } @@ -329,10 +329,33 @@ public: static SMAddNode invalid() { return SMAddNode(true, false); } }; +class SHAMapIterator +{ +friend class SHAMap; + + typedef std::pair stack_t; + + SHAMap& mMap; + std::stack mStack; + bool mInner, mLeaf, mLock; + +public: + SHAMapIterator(SHAMap& map, bool returnInner, bool returnLeaf); + ~SHAMapIterator(); + + bool lock(); + bool unlock(); + void reset(); + + SHAMapTreeNode* getNext(); +}; + extern bool SMANCombine(SMAddNode& existing, const SMAddNode& additional); class SHAMap : public IS_INSTANCE(SHAMap) { +friend class SHAMapIterator; + public: typedef boost::shared_ptr pointer; typedef const boost::shared_ptr& ref; @@ -341,7 +364,7 @@ public: typedef std::map SHAMapDiff; typedef boost::unordered_map SHADirtyMap; -private: +protected: uint32 mSeq; mutable boost::recursive_mutex mLock; boost::unordered_map mTNByID; @@ -356,8 +379,6 @@ private: static KeyCache fullBelowCache; -protected: - void dirtyUp(std::stack& stack, const uint256& target, uint256 prevHash); std::stack getStack(const uint256& id, bool include_nonmatching_leaf, bool partialOk); SHAMapTreeNode::pointer walkTo(const uint256& id, bool modify); @@ -479,5 +500,8 @@ public: static void sweep() { fullBelowCache.sweep(); } }; +extern std::list< std::pair > > + getSyncInfo(SHAMap::pointer have, SHAMap::pointer want, int max); + #endif // vim:ts=4 diff --git a/src/cpp/ripple/SHAMapNodes.cpp b/src/cpp/ripple/SHAMapNodes.cpp index f171b7a09..c0342d535 100644 --- a/src/cpp/ripple/SHAMapNodes.cpp +++ b/src/cpp/ripple/SHAMapNodes.cpp @@ -571,6 +571,95 @@ std::ostream& operator<<(std::ostream& out, const SHAMapMissingNode& mn) return out; } +SHAMapIterator::SHAMapIterator(SHAMap& map, bool returnInner, bool returnLeaf) : + mMap(map), mInner(returnInner), mLeaf(returnLeaf), mLock(false) +{ + mStack.push(stack_t(mMap.root.get(), 0)); +} +SHAMapIterator::~SHAMapIterator() +{ + if (mLock) + { + while (!mStack.empty()) + mStack.pop(); + mMap.mLock.unlock(); + } +} + +bool SHAMapIterator::lock() +{ + if (mLock) + return false; + mMap.mLock.lock(); + mLock = true; + return true; +} + +bool SHAMapIterator::unlock() +{ + if (!mLock) + return false; + mMap.mLock.unlock(); + mLock = false; + return true; +} + +void SHAMapIterator::reset() +{ + while (!mStack.empty()) + mStack.pop(); + mStack.push(stack_t(mMap.root.get(), 0)); +} + +SHAMapTreeNode* SHAMapIterator::getNext() +{ + if (mStack.empty()) + return NULL; + + stack_t& top = mStack.top(); + + if (top.first->isLeaf()) + { // special case, map has only one leaf + SHAMapTreeNode* ret = mLeaf ? top.first : NULL; + mStack.pop(); + return ret; + } + + while (1) + { + while (top.second < 16) + { // continue where we left off + if (top.first->isEmptyBranch(top.second)) + ++top.second; + else + { + SHAMapTreeNode* next = mMap.getNodePointer( + top.first->getChildNodeID(top.second), top.first->getChildHash(top.second)); + ++top.second; + if (next->isLeaf()) + { + if (mLeaf) + return next; + } + else + { + mStack.push(stack_t(next, 0)); + top = mStack.top(); + } + } + } + if (top.second == 16) + { // we ran off the end of an inner node + SHAMapTreeNode* ret = top.first; + mStack.pop(); + if (mInner) + return ret; + if (mStack.empty()) // ran off the end of the root + return NULL; + top = mStack.top(); + } + } +} // vim:ts=4 diff --git a/src/cpp/ripple/SHAMapSync.cpp b/src/cpp/ripple/SHAMapSync.cpp index 2cd5e1c14..a5c34c46f 100644 --- a/src/cpp/ripple/SHAMapSync.cpp +++ b/src/cpp/ripple/SHAMapSync.cpp @@ -97,7 +97,7 @@ void SHAMap::getMissingNodes(std::vector& nodeIDs, std::vectorsetFullBelow(); if (mType == smtSTATE) { - fullBelowCache.add(node->getHash()); + fullBelowCache.add(node->getNodeHash()); dropBelow(node); } } @@ -162,7 +162,7 @@ std::vector SHAMap::getNeededHashes(int max) node->setFullBelow(); if (mType == smtSTATE) { - fullBelowCache.add(node->getHash()); + fullBelowCache.add(node->getNodeHash()); dropBelow(node); } } @@ -172,6 +172,38 @@ std::vector SHAMap::getNeededHashes(int max) return ret; } +std::list< std::pair > > + getSyncInfo(SHAMap::pointer have, SHAMap::pointer want, int max) +{ + std::list< std::pair< uint256, std::vector > > ret; + SHAMapIterator haveI(*have, true, false); + SHAMapIterator wantI(*want, true, false); + SHAMapTreeNode *haveN = haveI.getNext(); + SHAMapTreeNode *wantN = wantI.getNext(); + while (wantN != NULL) + { + if (haveN && (haveN->getNodeHash() == wantN->getNodeHash())) + { // they match, advance both + haveN = haveI.getNext(); + wantN = wantI.getNext(); + } + else if (haveN && (haveN->getNodeHash() < wantN->getNodeHash())) + { // need to advance have pointer + haveN = haveI.getNext(); + } + else + { // unmatched inner node + Serializer s; + wantN->addRaw(s, snfPREFIX); + ret.push_back(std::make_pair(wantN->getNodeHash(), s.peekData())); + if (--max <= 0) + break; + wantN = wantI.getNext(); + } + } + return ret; +} + bool SHAMap::getNodeFat(const SHAMapNode& wanted, std::vector& nodeIDs, std::list >& rawNodes, bool fatRoot, bool fatLeaves) { // Gets a node and some of its children diff --git a/src/cpp/ripple/ripple.proto b/src/cpp/ripple/ripple.proto index 0cb6c45ba..d35640ba1 100644 --- a/src/cpp/ripple/ripple.proto +++ b/src/cpp/ripple/ripple.proto @@ -233,6 +233,7 @@ message TMGetObjectByHash otTRANSACTION_NODE = 3; otSTATE_NODE = 4; otCAS_OBJECT = 5; + otFETCH_PACK = 6; } required ObjectType type = 1;