diff --git a/src/cpp/ripple/ConnectionPool.cpp b/src/cpp/ripple/ConnectionPool.cpp index 3d184d4cda..89f0d55489 100644 --- a/src/cpp/ripple/ConnectionPool.cpp +++ b/src/cpp/ripple/ConnectionPool.cpp @@ -241,7 +241,7 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected()) { ++sentTo; - peer->sendPacket(msg); + peer->sendPacket(msg, false); } } @@ -254,7 +254,7 @@ void ConnectionPool::relayMessageBut(const std::set& fromPeers, const Pa BOOST_FOREACH(Peer::ref peer, peerVector) { if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0)) - peer->sendPacket(msg); + peer->sendPacket(msg, false); } } @@ -265,7 +265,7 @@ void ConnectionPool::relayMessageTo(const std::set& fromPeers, const Pac BOOST_FOREACH(Peer::ref peer, peerVector) { if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) != 0)) - peer->sendPacket(msg); + peer->sendPacket(msg, false); } } diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp index 73fc51eb90..bbd127b659 100644 --- a/src/cpp/ripple/Ledger.cpp +++ b/src/cpp/ripple/Ledger.cpp @@ -514,6 +514,8 @@ void Ledger::saveAcceptedLedger(Job&, bool fromConsensus) if (theApp->getJobQueue().getJobCountTotal(jtPUBOLDLEDGER) < 2) theApp->getLedgerMaster().resumeAcquiring(); + else + cLog(lsDEBUG) << "no resume, too many pending ledger saves"; } #ifndef NO_SQLITE3_PREPARE diff --git a/src/cpp/ripple/LedgerAcquire.cpp b/src/cpp/ripple/LedgerAcquire.cpp index 1cd2ef1c48..1a299fbe88 100644 --- a/src/cpp/ripple/LedgerAcquire.cpp +++ b/src/cpp/ripple/LedgerAcquire.cpp @@ -183,7 +183,7 @@ void LedgerAcquire::onTimer(bool progress) { if (getTimeouts() > LEDGER_TIMEOUT_COUNT) { - cLog(lsWARNING) << "Six timeouts for ledger " << mHash; + cLog(lsWARNING) << "Too many timeouts for ledger " << mHash; setFailed(); done(); return; @@ -351,7 +351,7 @@ void LedgerAcquire::trigger(Peer::ref peer) if (iPeer) { mByHash = false; - iPeer->sendPacket(packet); + iPeer->sendPacket(packet, false); } } } @@ -485,7 +485,7 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL, Peer::ref peer) if (!peer) sendRequest(tmGL); else - peer->sendPacket(boost::make_shared(tmGL, ripple::mtGET_LEDGER)); + peer->sendPacket(boost::make_shared(tmGL, ripple::mtGET_LEDGER), false); } void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL) @@ -499,7 +499,7 @@ void PeerSet::sendRequest(const ripple::TMGetLedger& tmGL) { Peer::pointer peer = theApp->getConnectionPool().getPeerById(it->first); if (peer) - peer->sendPacket(packet); + peer->sendPacket(packet, false); } } diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index a96da59c02..d13e1bce60 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -37,20 +37,19 @@ Peer::Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, mPrivate(false), mLoad(""), mSocketSsl(io_service, ctx), - mActivityTimer(io_service) + mActivityTimer(io_service), + mIOStrand(io_service) { cLog(lsDEBUG) << "CREATING PEER: " << ADDRESS(this); } void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_transferred) -{ +{ // Call on IO strand #ifdef DEBUG // if (!error) // std::cerr << "Peer::handleWrite bytes: "<< bytes_transferred << std::endl; #endif - boost::recursive_mutex::scoped_lock sl(ioMutex); - mSendingPacket.reset(); if (mDetaching) @@ -62,7 +61,7 @@ void Peer::handleWrite(const boost::system::error_code& error, size_t bytes_tran { cLog(lsINFO) << "Peer: Write: Error: " << ADDRESS(this) << ": bytes=" << bytes_transferred << ": " << error.category().name() << ": " << error.message() << ": " << error; - detach("hw"); + detach("hw", true); } else if (!mSendQ.empty()) { @@ -86,10 +85,13 @@ void Peer::setIpPort(const std::string& strIP, int iPort) << (mNodePublic.isValid() ? mNodePublic.humanNodePublic() : "-") << " " << getIP() << " " << getPort(); } -void Peer::detach(const char *rsn) +void Peer::detach(const char *rsn, bool onIOStrand) { - boost::recursive_mutex::scoped_lock sl(ioMutex); - + if (!onIOStrand) + { + mIOStrand.post(boost::bind(&Peer::detach, shared_from_this(), rsn, true)); + return; + } if (!mDetaching) { mDetaching = true; // Race is ok. @@ -103,7 +105,8 @@ void Peer::detach(const char *rsn) mSendQ.clear(); (void) mActivityTimer.cancel(); - mSocketSsl.async_shutdown(boost::bind(&Peer::handleShutdown, shared_from_this(), boost::asio::placeholders::error)); + mSocketSsl.async_shutdown(mIOStrand.wrap(boost::bind(&Peer::handleShutdown, shared_from_this(), + boost::asio::placeholders::error))); if (mNodePublic.isValid()) { @@ -130,13 +133,13 @@ void Peer::detach(const char *rsn) } void Peer::handlePingTimer(const boost::system::error_code& ecResult) -{ +{ // called on IO strand if (ecResult || mDetaching) return; if (mActive == 1) { // ping out - detach("pto"); + detach("pto", true); return; } @@ -145,14 +148,14 @@ void Peer::handlePingTimer(const boost::system::error_code& ecResult) mActive = 1; ripple::TMPing packet; packet.set_type(ripple::TMPing::ptPING); - sendPacket(boost::make_shared(packet, ripple::mtPING)); + sendPacket(boost::make_shared(packet, ripple::mtPING), true); } else // active->idle mActive = 0; mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS)); - mActivityTimer.async_wait(boost::bind(&Peer::handlePingTimer, shared_from_this(), - boost::asio::placeholders::error)); + mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(), + boost::asio::placeholders::error))); } @@ -173,7 +176,7 @@ void Peer::handleVerifyTimer(const boost::system::error_code& ecResult) { //cLog(lsINFO) << "Peer: Verify: Peer failed to verify in time."; - detach("hvt"); + detach("hvt", true); } } @@ -198,19 +201,19 @@ void Peer::connect(const std::string& strIp, int iPort) if (err || itrEndpoint == boost::asio::ip::tcp::resolver::iterator()) { cLog(lsWARNING) << "Peer: Connect: Bad IP: " << strIp; - detach("c"); + detach("c", false); return; } else { mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_VERIFY_SECONDS), err); - mActivityTimer.async_wait(boost::bind(&Peer::handleVerifyTimer, shared_from_this(), - boost::asio::placeholders::error)); + mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handleVerifyTimer, shared_from_this(), + boost::asio::placeholders::error))); if (err) { cLog(lsWARNING) << "Peer: Connect: Failed to set timer."; - detach("c2"); + detach("c2", false); return; } } @@ -219,15 +222,14 @@ void Peer::connect(const std::string& strIp, int iPort) { cLog(lsINFO) << "Peer: Connect: Outbound: " << ADDRESS(this) << ": " << mIpPort.first << " " << mIpPort.second; - boost::recursive_mutex::scoped_lock sl(ioMutex); boost::asio::async_connect( getSocket(), itrEndpoint, - boost::bind( + mIOStrand.wrap(boost::bind( &Peer::handleConnect, shared_from_this(), boost::asio::placeholders::error, - boost::asio::placeholders::iterator)); + boost::asio::placeholders::iterator))); } } @@ -240,7 +242,7 @@ void Peer::handleStart(const boost::system::error_code& error) if (error) { cLog(lsINFO) << "Peer: Handshake: Error: " << error.category().name() << ": " << error.message() << ": " << error; - detach("hs"); + detach("hs", true); } else { @@ -255,18 +257,16 @@ void Peer::handleConnect(const boost::system::error_code& error, boost::asio::ip if (error) { cLog(lsINFO) << "Peer: Connect: Error: " << error.category().name() << ": " << error.message() << ": " << error; - detach("hc"); + detach("hc", true); } else { cLog(lsINFO) << "Connect peer: success."; - boost::recursive_mutex::scoped_lock sl(ioMutex); - mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); mSocketSsl.async_handshake(boost::asio::ssl::stream::client, - boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)); + mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error))); } } @@ -284,8 +284,6 @@ void Peer::connected(const boost::system::error_code& error) if (iPort == SYSTEM_PEER_PORT) //TODO: Why are you doing this? iPort = -1; - boost::recursive_mutex::scoped_lock sl(ioMutex); - if (!error) { // Not redundant ip and port, handshake, and start. @@ -296,35 +294,38 @@ void Peer::connected(const boost::system::error_code& error) mSocketSsl.set_verify_mode(boost::asio::ssl::verify_none); mSocketSsl.async_handshake(boost::asio::ssl::stream::server, - boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error)); + mIOStrand.wrap(boost::bind(&Peer::handleStart, shared_from_this(), boost::asio::placeholders::error))); } else if (!mDetaching) { cLog(lsINFO) << "Peer: Inbound: Error: " << ADDRESS(this) << ": " << strIp << " " << iPort << " : " << error.category().name() << ": " << error.message() << ": " << error; - detach("ctd"); + detach("ctd", false); } } void Peer::sendPacketForce(const PackedMessage::pointer& packet) -{ // must hold I/O mutex +{ // must be on IO strand if (!mDetaching) { mSendingPacket = packet; boost::asio::async_write(mSocketSsl, boost::asio::buffer(packet->getBuffer()), - boost::bind(&Peer::handleWrite, shared_from_this(), + mIOStrand.wrap(boost::bind(&Peer::handleWrite, shared_from_this(), boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + boost::asio::placeholders::bytes_transferred))); } } -void Peer::sendPacket(const PackedMessage::pointer& packet) +void Peer::sendPacket(const PackedMessage::pointer& packet, bool onStrand) { - boost::recursive_mutex::scoped_lock sl(ioMutex); - if (packet) { + if (!onStrand) + { + mIOStrand.post(boost::bind(&Peer::sendPacket, shared_from_this(), packet, true)); + return; + } if (mSendingPacket) { mSendQ.push_back(packet); @@ -338,15 +339,13 @@ void Peer::sendPacket(const PackedMessage::pointer& packet) void Peer::startReadHeader() { - boost::recursive_mutex::scoped_lock sl(ioMutex); - if (!mDetaching) { mReadbuf.clear(); mReadbuf.resize(HEADER_SIZE); - boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), - boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error)); + boost::asio::async_read(mSocketSsl, boost::asio::buffer(mReadbuf), mIOStrand.wrap( + boost::bind(&Peer::handleReadHeader, shared_from_this(), boost::asio::placeholders::error))); } } @@ -356,21 +355,17 @@ void Peer::startReadBody(unsigned msg_len) // bytes. Expand it to fit in the body as well, and start async // read into the body. - boost::recursive_mutex::scoped_lock sl(ioMutex); - if (!mDetaching) { mReadbuf.resize(HEADER_SIZE + msg_len); boost::asio::async_read(mSocketSsl, boost::asio::buffer(&mReadbuf[HEADER_SIZE], msg_len), - boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error)); + mIOStrand.wrap(boost::bind(&Peer::handleReadBody, shared_from_this(), boost::asio::placeholders::error))); } } void Peer::handleReadHeader(const boost::system::error_code& error) { - boost::recursive_mutex::scoped_lock sl(ioMutex); - if (mDetaching) { // Drop data or error if detaching. @@ -382,7 +377,7 @@ void Peer::handleReadHeader(const boost::system::error_code& error) // WRITEME: Compare to maximum message length, abort if too large if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0)) { - detach("hrh"); + detach("hrh", true); return; } startReadBody(msg_len); @@ -390,26 +385,22 @@ void Peer::handleReadHeader(const boost::system::error_code& error) else { cLog(lsINFO) << "Peer: Header: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; - detach("hrh2"); + detach("hrh2", true); } } void Peer::handleReadBody(const boost::system::error_code& error) { + if (mDetaching) { - boost::recursive_mutex::scoped_lock sl(ioMutex); - - if (mDetaching) - { - return; - } - else if (error) - { - cLog(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; - boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock()); - detach("hrb"); - return; - } + return; + } + else if (error) + { + cLog(lsINFO) << "Peer: Body: Error: " << ADDRESS(this) << ": " << error.category().name() << ": " << error.message() << ": " << error; + boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock()); + detach("hrb", true); + return; } processReadBuffer(); @@ -433,7 +424,7 @@ void Peer::processReadBuffer() if (mHelloed == (type == ripple::mtHELLO)) { cLog(lsWARNING) << "Wrong message type: " << type; - detach("prb1"); + detach("prb1", true); } else { @@ -684,8 +675,8 @@ void Peer::recvHello(ripple::TMHello& packet) (void) mActivityTimer.cancel(); mActivityTimer.expires_from_now(boost::posix_time::seconds(NODE_IDLE_SECONDS)); - mActivityTimer.async_wait(boost::bind(&Peer::handlePingTimer, shared_from_this(), - boost::asio::placeholders::error)); + mActivityTimer.async_wait(mIOStrand.wrap(boost::bind(&Peer::handlePingTimer, shared_from_this(), + boost::asio::placeholders::error))); uint32 ourTime = theApp->getOPs().getNetworkTimeNC(); uint32 minTime = ourTime - 20; @@ -807,7 +798,7 @@ void Peer::recvHello(ripple::TMHello& packet) if (bDetach) { mNodePublic.clear(); - detach("recvh"); + detach("recvh", true); } else { @@ -939,8 +930,9 @@ static void checkPropose(Job& job, boost::shared_ptr packe if (isTrusted) { - theApp->getIOService().post(boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(), - proposal, packet, nodePublic, prevLedger, sigGood)); + theApp->getJobQueue().addJob(jtPROPOSAL_t, "trustedProposal", + boost::bind(&NetworkOPs::processTrustedProposal, &theApp->getOPs(), + proposal, packet, nodePublic, prevLedger, sigGood)); } else if (sigGood && (prevLedger == consensusLCL)) { // relay untrusted proposal @@ -1139,7 +1131,7 @@ void Peer::recvGetPeers(ripple::TMGetPeers& packet) } PackedMessage::pointer message = boost::make_shared(peers, ripple::mtPEERS); - sendPacket(message); + sendPacket(message, true); } } @@ -1200,7 +1192,7 @@ void Peer::recvGetObjectByHash(ripple::TMGetObjectByHash& packet) } cLog(lsTRACE) << "GetObjByHash had " << reply.objects_size() << " of " << packet.objects_size() << " for " << getIP(); - sendPacket(boost::make_shared(reply, ripple::mtGET_OBJECTS)); + sendPacket(boost::make_shared(reply, ripple::mtGET_OBJECTS), true); } else { // this is a reply @@ -1248,7 +1240,7 @@ void Peer::recvPing(ripple::TMPing& packet) if (packet.type() == ripple::TMPing::ptPING) { packet.set_type(ripple::TMPing::ptPONG); - sendPacket(boost::make_shared(packet, ripple::mtPING)); + sendPacket(boost::make_shared(packet, ripple::mtPING), true); } else if (packet.type() == ripple::TMPing::ptPONG) { @@ -1419,7 +1411,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) } Peer::ref selectedPeer = usablePeers[rand() % usablePeers.size()]; packet.set_requestcookie(getPeerId()); - selectedPeer->sendPacket(boost::make_shared(packet, ripple::mtGET_LEDGER)); + selectedPeer->sendPacket(boost::make_shared(packet, ripple::mtGET_LEDGER), false); return; } cLog(lsERROR) << "We do not have the map our peer wants"; @@ -1466,7 +1458,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) } Peer::ref selectedPeer = usablePeers[rand() % usablePeers.size()]; packet.set_requestcookie(getPeerId()); - selectedPeer->sendPacket(boost::make_shared(packet, ripple::mtGET_LEDGER)); + selectedPeer->sendPacket(boost::make_shared(packet, ripple::mtGET_LEDGER), false); cLog(lsDEBUG) << "Ledger request routed"; return; } @@ -1536,7 +1528,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) } PackedMessage::pointer oPacket = boost::make_shared(reply, ripple::mtLEDGER_DATA); - sendPacket(oPacket); + sendPacket(oPacket, true); return; } @@ -1611,7 +1603,7 @@ void Peer::recvGetLedger(ripple::TMGetLedger& packet) } } PackedMessage::pointer oPacket = boost::make_shared(reply, ripple::mtLEDGER_DATA); - sendPacket(oPacket); + sendPacket(oPacket, true); } void Peer::recvLedger(ripple::TMLedgerData& packet) @@ -1629,7 +1621,7 @@ void Peer::recvLedger(ripple::TMLedgerData& packet) if (target) { packet.clear_requestcookie(); - target->sendPacket(boost::make_shared(packet, ripple::mtLEDGER_DATA)); + target->sendPacket(boost::make_shared(packet, ripple::mtLEDGER_DATA), true); } else { @@ -1717,8 +1709,6 @@ void Peer::addTxSet(const uint256& hash) // (both sides get the same information, neither side controls it) void Peer::getSessionCookie(std::string& strDst) { - boost::recursive_mutex::scoped_lock sl(ioMutex); - SSL* ssl = mSocketSsl.native_handle(); if (!ssl) throw std::runtime_error("No underlying connection"); @@ -1776,7 +1766,7 @@ void Peer::sendHello() } PackedMessage::pointer packet = boost::make_shared(h, ripple::mtHELLO); - sendPacket(packet); + sendPacket(packet, true); } void Peer::sendGetPeers() @@ -1788,7 +1778,7 @@ void Peer::sendGetPeers() PackedMessage::pointer packet = boost::make_shared(getPeers, ripple::mtGET_PEERS); - sendPacket(packet); + sendPacket(packet, true); } void Peer::punishPeer(LoadType l) @@ -1817,7 +1807,7 @@ void Peer::doProofOfWork(Job&, boost::weak_ptr peer, ProofOfWork::pointer ripple::TMProofWork reply; reply.set_token(pow->getToken()); reply.set_response(solution.begin(), solution.size()); - pptr->sendPacket(boost::make_shared(reply, ripple::mtPROOFOFWORK)); + pptr->sendPacket(boost::make_shared(reply, ripple::mtPROOFOFWORK), false); } else { diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index 06a07e5d9e..a8fef6b6f8 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -62,12 +62,12 @@ private: protected: - boost::recursive_mutex ioMutex; - std::vector mReadbuf; - std::list mSendQ; - PackedMessage::pointer mSendingPacket; - ripple::TMStatusChange mLastStatus; - ripple::TMHello mHello; + boost::asio::io_service::strand mIOStrand; + std::vector mReadbuf; + std::list mSendQ; + PackedMessage::pointer mSendingPacket; + ripple::TMStatusChange mLastStatus; + ripple::TMHello mHello; Peer(boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerId, bool inbound); @@ -133,11 +133,11 @@ public: void connect(const std::string& strIp, int iPort); void connected(const boost::system::error_code& error); - void detach(const char *); + void detach(const char *, bool onIOStrand); bool samePeer(Peer::ref p) { return samePeer(*p); } bool samePeer(const Peer& p) { return this == &p; } - void sendPacket(const PackedMessage::pointer& packet); + void sendPacket(const PackedMessage::pointer& packet, bool onStrand); void sendLedgerProposal(Ledger::ref ledger); void sendFullLedger(Ledger::ref ledger); void sendGetFullLedger(uint256& hash);