From e392366c63a43c88452ab16e4fa9ac1b8941751b Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Fri, 15 Mar 2013 22:34:55 -0700 Subject: [PATCH] Get ledger acquire work off the main thread and out of the master lock. Prioritize ledger acquire work appripriately. This should help prevent the server from becoming overly sluggish while acquiring ledgers. Still todo: Finish all pending receive operations before sending out any data requests. --- src/cpp/ripple/JobQueue.cpp | 1 + src/cpp/ripple/JobQueue.h | 21 ++++++++++--------- src/cpp/ripple/LedgerAcquire.cpp | 36 +++++++++++++++++++++----------- src/cpp/ripple/LedgerAcquire.h | 2 +- src/cpp/ripple/Peer.cpp | 13 ++++++------ src/cpp/ripple/Peer.h | 2 +- 6 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 904af7e8e4..45114723bd 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -39,6 +39,7 @@ const char* Job::toString(JobType t) case jtVALIDATION_ut: return "untrustedValidation"; case jtPROOFWORK: return "proofOfWork"; case jtPROPOSAL_ut: return "untrustedProposal"; + case jtLEDGER_DATA: return "ledgerData"; case jtCLIENT: return "clientCommand"; case jtTRANSACTION: return "transaction"; case jtPUBLEDGER: return "publishNewLedger"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index bf439cc6e1..673499529b 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -25,16 +25,17 @@ enum JobType jtVALIDATION_ut = 2, // A validation from an untrusted source jtPROOFWORK = 3, // A proof of work demand from another server jtPROPOSAL_ut = 4, // A proposal from an untrusted source - jtCLIENT = 5, // A websocket command from the client - jtTRANSACTION = 6, // A transaction received from the network - jtPUBLEDGER = 7, // Publish a fully-accepted ledger - jtWAL = 8, // Write-ahead logging - jtVALIDATION_t = 9, // A validation from a trusted source - jtWRITE = 10, // Write out hashed objects - jtTRANSACTION_l = 11, // A local transaction - jtPROPOSAL_t = 12, // A proposal from a trusted source - jtADMIN = 13, // An administrative operation - jtDEATH = 14, // job of death, used internally + jtLEDGER_DATA = 5, // Received data for a ledger we're acquiring + jtCLIENT = 6, // A websocket command from the client + jtTRANSACTION = 7, // A transaction received from the network + jtPUBLEDGER = 8, // Publish a fully-accepted ledger + jtWAL = 9, // Write-ahead logging + jtVALIDATION_t = 10, // A validation from a trusted source + jtWRITE = 11, // Write out hashed objects + jtTRANSACTION_l = 12, // A local transaction + jtPROPOSAL_t = 13, // A proposal from a trusted source + jtADMIN = 14, // An administrative operation + jtDEATH = 15, // job of death, used internally // special types not dispatched by the job pool jtPEER = 24, diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index 6d04733078..0dc34887e9 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -477,6 +477,7 @@ void LedgerAcquire::trigger(Peer::ref peer) { cLog(lsDEBUG) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ") << mLedger->getLedgerSeq(); + sl.unlock(); done(); } } @@ -830,13 +831,19 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash) mLedgers.erase(hash); } -SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref peer) +void LedgerAcquireMaster::gotLedgerData(Job&, boost::shared_ptr packet_ptr, + boost::weak_ptr wPeer) { + ripple::TMLedgerData& packet = *packet_ptr; + Peer::pointer peer = wPeer.lock(); + if (!peer) + return; + uint256 hash; if (packet.ledgerhash().size() != 32) { - std::cerr << "Acquire error" << std::endl; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } memcpy(hash.begin(), packet.ledgerhash().data(), 32); cLog(lsTRACE) << "Got data (" << packet.nodes().size() << ") for acquiring ledger: " << hash; @@ -845,7 +852,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (!ledger) { cLog(lsINFO) << "Got data for ledger we're not acquiring"; - return SMAddNode(); + peer->punishPeer(LT_InvalidRequest); + return; } if (packet.type() == ripple::liBASE) @@ -853,12 +861,14 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (packet.nodes_size() < 1) { cLog(lsWARNING) << "Got empty base data"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } if (!ledger->takeBase(packet.nodes(0).nodedata())) { cLog(lsWARNING) << "Got invalid base data"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } SMAddNode san = SMAddNode::useful(); if ((packet.nodes().size() > 1) && !ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata()), san)) @@ -871,7 +881,7 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: } if (!san.isInvalid()) ledger->trigger(peer); - return san; + return; } if ((packet.type() == ripple::liTX_NODE) || (packet.type() == ripple::liAS_NODE)) @@ -882,7 +892,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (packet.nodes().size() <= 0) { cLog(lsINFO) << "Got response with no nodes"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } for (int i = 0; i < packet.nodes().size(); ++i) { @@ -890,7 +901,8 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: if (!node.has_nodeid() || !node.has_nodedata()) { cLog(lsWARNING) << "Got bad node"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); + return; } nodeIDs.push_back(SHAMapNode(node.nodeid().data(), node.nodeid().size())); @@ -902,12 +914,12 @@ SMAddNode LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer: else ledger->takeAsNode(nodeIDs, nodeData, ret); if (!ret.isInvalid()) - ledger->trigger(peer); - return ret; + ledger->trigger(peer); + return; } cLog(lsWARNING) << "Not sure what ledger data we got"; - return SMAddNode::invalid(); + peer->punishPeer(LT_InvalidRequest); } void LedgerAcquireMaster::sweep() diff --git a/src/cpp/ripple/LedgerAcquire.h b/src/cpp/ripple/LedgerAcquire.h index 28c8434a02..78ab42994d 100644 --- a/src/cpp/ripple/LedgerAcquire.h +++ b/src/cpp/ripple/LedgerAcquire.h @@ -147,7 +147,7 @@ public: LedgerAcquire::pointer find(const uint256& hash); bool hasLedger(const uint256& ledgerHash); void dropLedger(const uint256& ledgerHash); - SMAddNode gotLedgerData(ripple::TMLedgerData& packet, Peer::ref); + void gotLedgerData(Job&, boost::shared_ptr packet, boost::weak_ptr peer); int getFetchCount(int& timeoutCount); void logFailure(const uint256& h) { mRecentFailures.add(h); } diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index d13e1bce60..51cb14a498 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -590,8 +590,8 @@ void Peer::processReadBuffer() case ripple::mtLEDGER_DATA: { event->reName("Peer::ledgerdata"); - ripple::TMLedgerData msg; - if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) + boost::shared_ptr msg = boost::make_shared(); + if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) recvLedger(msg); else cLog(lsWARNING) << "parse error: " << type; @@ -1606,8 +1606,9 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) sendPacket(oPacket, true); } -void Peer::recvLedger(ripple::TMLedgerData& packet) +void Peer::recvLedger(const boost::shared_ptr& packet_ptr) { + ripple::TMLedgerData& packet = *packet_ptr; if (packet.nodes().size() <= 0) { cLog(lsWARNING) << "Ledger/TXset data with no nodes"; @@ -1664,9 +1665,9 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) return; } - SMAddNode san = theApp->getMasterLedgerAcquire().gotLedgerData(packet, shared_from_this()); - if (san.isInvalid()) - punishPeer(LT_UnwantedData); + theApp->getJobQueue().addJob(jtLEDGER_DATA, "gotLedgerData", + boost::bind(&LedgerAcquireMaster::gotLedgerData, &theApp->getMasterLedgerAcquire(), + _1, packet_ptr, boost::weak_ptr(shared_from_this()))); } bool Peer::hasLedger(const uint256& hash) const diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index a8fef6b6f8..1244a5170f 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -99,7 +99,7 @@ protected: void recvGetAccount(ripple::TMGetAccount& packet); void recvAccount(ripple::TMAccount& packet); void recvGetLedger(ripple::TMGetLedger& packet); - void recvLedger(ripple::TMLedgerData& packet); + void recvLedger(const boost::shared_ptr& packet); void recvStatus(ripple::TMStatusChange& packet); void recvPropose(const boost::shared_ptr& packet); void recvHaveTxSet(ripple::TMHaveTransactionSet& packet);