diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 904af7e8e4..45114723bd 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -39,6 +39,7 @@ const char* Job::toString(JobType t) case jtVALIDATION_ut: return "untrustedValidation"; case jtPROOFWORK: return "proofOfWork"; case jtPROPOSAL_ut: return "untrustedProposal"; + case jtLEDGER_DATA: return "ledgerData"; case jtCLIENT: return "clientCommand"; case jtTRANSACTION: return "transaction"; case jtPUBLEDGER: return "publishNewLedger"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index bf439cc6e1..673499529b 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -25,16 +25,17 @@ enum JobType jtVALIDATION_ut = 2, // A validation from an untrusted source jtPROOFWORK = 3, // A proof of work demand from another server jtPROPOSAL_ut = 4, // A proposal from an untrusted source - jtCLIENT = 5, // A websocket command from the client - jtTRANSACTION = 6, // A transaction received from the network - jtPUBLEDGER = 7, // Publish a fully-accepted ledger - jtWAL = 8, // Write-ahead logging - jtVALIDATION_t = 9, // A validation from a trusted source - jtWRITE = 10, // Write out hashed objects - jtTRANSACTION_l = 11, // A local transaction - jtPROPOSAL_t = 12, // A proposal from a trusted source - jtADMIN = 13, // An administrative operation - jtDEATH = 14, // job of death, used internally + jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring + jtCLIENT = 6, // A websocket command from the client + jtTRANSACTION = 7, // A transaction received from the network + jtPUBLEDGER = 8, // Publish a fully-accepted ledger + jtWAL = 9, // Write-ahead logging + jtVALIDATION_t = 10, // A validation from a trusted source + jtWRITE = 11, // Write out hashed objects + jtTRANSACTION_l = 12, // A local transaction + jtPROPOSAL_t = 13, // A proposal from a trusted source + jtADMIN = 14, // An administrative operation + jtDEATH = 15, // job of death, used internally // special types not dispatched by the job pool jtPEER = 24, diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index 6d04733078..0dc34887e9 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -477,6 +477,7 @@ void LedgerAcquire::trigger(Peer::ref peer) { cLog(lsDEBUG) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ") << mLedger->getLedgerSeq(); + sl.unlock(); done(); } } @@ -830,13 +831,19 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash) mLedgers.erase(hash); } -SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref peer) +void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptr packet_ptr, + boost::weak_ptr wPeer) { + ripple::TMLedgerData& packet = *packet_ptr; + Peer::pointer peer = wPeer.lock(); + if (!peer) + return; + uint256 hash; if (packet.ledgerhash().size() != 32) { - std::cerr << "Acquire error" << std::endl; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } memcpy(hash.begin(), packet.ledgerhash().data(), 32); cLog(lsTRACE) << "Got data (" << packet.nodes().size() << ") for acquiring ledger: " << hash; @@ -845,7 +852,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (!ledger) { cLog(lsINFO) << "Got data for ledger we're not acquiring"; - return SMAddNode(); + peer->punishPeer(LT_InvalidRequest); + return; } if (packet.type() == ripple::liBASE) @@ -853,12 +861,14 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (packet.nodes_size() < 1) { cLog(lsWARNING) << "Got empty base data"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } if (!ledger->takeBase(packet.nodes(0).nodedata())) { cLog(lsWARNING) << "Got invalid base data"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } SMAddNode san = SMAddNode::useful(); if ((packet.nodes().size() > 1) && !ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata()), san)) @@ -871,7 +881,7 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: } if (!san.isInvalid()) ledger->trigger(peer); - return san; + return; } if ((packet.type() == ripple::liTX_NODE) || (packet.type() == ripple::liAS_NODE)) @@ -882,7 +892,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (packet.nodes().size() <= 0) { cLog(lsINFO) << "Got response with no nodes"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } for (int i = 0; i < packet.nodes().size(); ++i) { @@ -890,7 +901,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (!node.has_nodeid() || !node.has_nodedata()) { cLog(lsWARNING) << "Got bad node"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size())); @@ -902,12 +914,12 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: else ledger->takeAsNode(nodeIDs, nodeData, ret); if (!ret.isInvalid()) - ledger->trigger(peer); - return ret; + ledger->trigger(peer); + return; } cLog(lsWARNING) << "Not sure what ledger data we got"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); } void LedgerAcquireMaster::sweep() diff --git a/src/cpp/ripple/LedgerAcquire.h b/src/cpp/ripple/LedgerAcquire.h index 28c8434a02..78ab42994d 100644 --- a/src/cpp/ripple/LedgerAcquire.h +++ b/src/cpp/ripple/LedgerAcquire.h @@ -147,7 +147,7 @@ public: LedgerAcquire::pointer find(const uint256& hash); bool hasLedger(const uint256& ledgerHash); void dropLedger(const uint256& ledgerHash); - SMAddNode gotLedgerData(ripple::TMLedgerData& packet, Peer::ref); + void gotLedgerData(Job&, boost::shared_ptr packet, boost::weak_ptr peer); int getFetchCount(int& timeoutCount); void logFailure(const uint256& h) { mRecentFailures.add(h); } diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index d13e1bce60..51cb14a498 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -590,8 +590,8 @@ void Peer::processReadBuffer() case ripple::mtLEDGER_DATA: { event->reName("Peer::ledgerdata"); - ripple::TMLedgerData msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvLedger(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -1606,8 +1606,9 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) sendPacket(oPacket, true); } -void Peer::recvLedger(ripple::TMLedgerData& packet) +void Peer::recvLedger(const boost::shared_ptr& packet_ptr) { + ripple::TMLedgerData& packet = *packet_ptr; if (packet.nodes().size() <= 0) { cLog(lsWARNING) << "Ledger/TXset data with no nodes"; @@ -1664,9 +1665,9 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) return; } - SMAddNode san = theApp->getMasterLedgerAcquire().gotLedgerData(packet, shared_from_this()); - if (san.isInvalid()) - punishPeer(LT_UnwantedData); + theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData", + boost::bind(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(), + _1, packet_ptr, boost::weak_ptr(shared_from_this()))); } bool Peer::hasLedger(const uint256& hash) const diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index a8fef6b6f8..1244a5170f 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -99,7 +99,7 @@ protected: void recvGetAccount(ripple::TMGetAccount& packet); void recvAccount(ripple::TMAccount& packet); void recvGetLedger(ripple::TMGetLedger& packet); - void recvLedger(ripple::TMLedgerData& packet); + void recvLedger(const boost::shared_ptr& packet); void recvStatus(ripple::TMStatusChange& packet); void recvPropose(const boost::shared_ptr& packet); void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);