From bf60efcef29d2c9da0a6a01f40cc16cb92af0dce Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sat, 15 Jun 2013 01:28:18 -0700 Subject: [PATCH] We have to dispatch RPCServer operations onto a strand, otherwise an error while both a read and write are pending can crash. Conflicts: src/cpp/ripple/RPCServer.cpp src/cpp/ripple/RPCServer.h --- src/cpp/ripple/RPCServer.cpp | 128 ++++++++++++++++++----------------- src/cpp/ripple/RPCServer.h | 1 + 2 files changed, 67 insertions(+), 62 deletions(-) diff --git a/src/cpp/ripple/RPCServer.cpp b/src/cpp/ripple/RPCServer.cpp index 8336df025f..1486d26d94 100644 --- a/src/cpp/ripple/RPCServer.cpp +++ b/src/cpp/ripple/RPCServer.cpp @@ -19,16 +19,16 @@ SETUP_LOG (RPCServer) RPCServer::RPCServer(boost::asio::io_service& io_service, boost::asio::ssl::context& context, NetworkOPs* nopNetwork) - : mNetOps(nopNetwork), mSocket(io_service, context) + : mNetOps(nopNetwork), mSocket(io_service, context), mStrand(io_service) { mRole = RPCHandler::GUEST; } void RPCServer::connected() { - //std::cerr << "RPC request" << std::endl; - boost::asio::async_read_until(mSocket, mLineBuffer, "\r\n", - boost::bind(&RPCServer::handle_read_line, shared_from_this(), boost::asio::placeholders::error)); + //std::cerr << "RPC request" << std::endl; + boost::asio::async_read_until (mSocket, mLineBuffer, "\r\n", + mStrand.wrap (boost::bind (&RPCServer::handle_read_line, shared_from_this (), boost::asio::placeholders::error))); } void RPCServer::handle_read_req(const boost::system::error_code& e) @@ -48,8 +48,8 @@ void RPCServer::handle_read_req(const boost::system::error_code& e) else mReplyStr = handleRequest(req); - boost::asio::async_write(mSocket, boost::asio::buffer(mReplyStr), - boost::bind(&RPCServer::handle_write, shared_from_this(), boost::asio::placeholders::error)); + boost::asio::async_write (mSocket, boost::asio::buffer (mReplyStr), + mStrand.wrap (boost::bind (&RPCServer::handle_write, shared_from_this (), boost::asio::placeholders::error))); } static void dummy_handler() @@ -59,50 +59,53 @@ static void dummy_handler() void RPCServer::handle_read_line(const boost::system::error_code& e) { - if (e) - return; + if (e) + return; - HTTPRequestAction action = mHTTPRequest.consume(mLineBuffer); + HTTPRequestAction action = mHTTPRequest.consume (mLineBuffer); - if (action == haDO_REQUEST) - { // request with no body - WriteLog (lsWARNING, RPCServer) << "RPC HTTP request with no body"; - mSocket.async_shutdown(boost::bind(&dummy_handler)); - return; - } - else if (action == haREAD_LINE) - { - boost::asio::async_read_until(mSocket, mLineBuffer, "\r\n", - boost::bind(&RPCServer::handle_read_line, shared_from_this(), - boost::asio::placeholders::error)); - } - else if (action == haREAD_RAW) - { - int rLen = mHTTPRequest.getDataSize(); - if ((rLen < 0) || (rLen > RPC_MAXIMUM_QUERY)) - { - WriteLog (lsWARNING, RPCServer) << "Illegal RPC request length " << rLen; - mSocket.async_shutdown(boost::bind(&dummy_handler)); - return; - } + if (action == haDO_REQUEST) + { + // request with no body + WriteLog (lsWARNING, RPCServer) << "RPC HTTP request with no body"; + mSocket.async_shutdown (boost::bind (&dummy_handler)); + return; + } + else if (action == haREAD_LINE) + { + boost::asio::async_read_until (mSocket, mLineBuffer, "\r\n", + mStrand.wrap (boost::bind (&RPCServer::handle_read_line, shared_from_this (), + boost::asio::placeholders::error))); + } + else if (action == haREAD_RAW) + { + int rLen = mHTTPRequest.getDataSize (); - int alreadyHave = mLineBuffer.size(); + if ((rLen < 0) || (rLen > RPC_MAXIMUM_QUERY)) + { + WriteLog (lsWARNING, RPCServer) << "Illegal RPC request length " << rLen; + mSocket.async_shutdown (boost::bind (&dummy_handler)); + return; + } - if (alreadyHave < rLen) - { - mQueryVec.resize(rLen - alreadyHave); - boost::asio::async_read(mSocket, boost::asio::buffer(mQueryVec), - boost::bind(&RPCServer::handle_read_req, shared_from_this(), boost::asio::placeholders::error)); - WriteLog (lsTRACE, RPCServer) << "Waiting for completed request: " << rLen; - } - else - { // we have the whole thing - mQueryVec.resize(0); - handle_read_req(e); - } - } - else - mSocket.async_shutdown(boost::bind(&dummy_handler)); + int alreadyHave = mLineBuffer.size (); + + if (alreadyHave < rLen) + { + mQueryVec.resize (rLen - alreadyHave); + boost::asio::async_read (mSocket, boost::asio::buffer (mQueryVec), + mStrand.wrap (boost::bind (&RPCServer::handle_read_req, shared_from_this (), boost::asio::placeholders::error))); + WriteLog (lsTRACE, RPCServer) << "Waiting for completed request: " << rLen; + } + else + { + // we have the whole thing + mQueryVec.resize (0); + handle_read_req (e); + } + } + else + mSocket.async_shutdown (boost::bind (&dummy_handler)); } std::string RPCServer::handleRequest(const std::string& requestStr) @@ -181,24 +184,25 @@ bool RPCServer::parseAcceptRate(const std::string& sAcceptRate) void RPCServer::handle_write(const boost::system::error_code& e) { - //std::cerr << "async_write complete " << e << std::endl; + //std::cerr << "async_write complete " << e << std::endl; - if (!e) - { - HTTPRequestAction action = mHTTPRequest.requestDone(false); - if (action == haCLOSE_CONN) - mSocket.async_shutdown(boost::bind(&dummy_handler)); - else - { - boost::asio::async_read_until(mSocket, mLineBuffer, "\r\n", - boost::bind(&RPCServer::handle_read_line, shared_from_this(), boost::asio::placeholders::error)); - } - } + if (!e) + { + HTTPRequestAction action = mHTTPRequest.requestDone (false); - if (e != boost::asio::error::operation_aborted) - { - //connection_manager_.stop(shared_from_this()); - } + if (action == haCLOSE_CONN) + mSocket.async_shutdown (boost::bind (&dummy_handler)); + else + { + boost::asio::async_read_until (mSocket, mLineBuffer, "\r\n", + mStrand.wrap (boost::bind (&RPCServer::handle_read_line, shared_from_this (), boost::asio::placeholders::error))); + } + } + + if (e != boost::asio::error::operation_aborted) + { + //connection_manager_.stop(shared_from_this()); + } } // vim:ts=4 diff --git a/src/cpp/ripple/RPCServer.h b/src/cpp/ripple/RPCServer.h index 7855144185..ac337e89eb 100644 --- a/src/cpp/ripple/RPCServer.h +++ b/src/cpp/ripple/RPCServer.h @@ -16,6 +16,7 @@ private: NetworkOPs* mNetOps; AutoSocket mSocket; + boost::asio::io_service::strand mStrand; boost::asio::streambuf mLineBuffer; Blob mQueryVec;