We have to dispatch RPCServer operations onto a strand, otherwise

an error while both a read and write are pending can crash.

Conflicts:
	src/cpp/ripple/RPCServer.cpp
	src/cpp/ripple/RPCServer.h
This commit is contained in:
JoelKatz
2013-06-15 01:28:18 -07:00
parent 80520cbce7
commit bf60efcef2
2 changed files with 67 additions and 62 deletions

View File

@@ -19,16 +19,16 @@
SETUP_LOG (RPCServer)
RPCServer::RPCServer(boost::asio::io_service& io_service, boost::asio::ssl::context& context, NetworkOPs* nopNetwork)
: mNetOps(nopNetwork), mSocket(io_service, context)
: mNetOps(nopNetwork), mSocket(io_service, context), mStrand(io_service)
{
mRole = RPCHandler::GUEST;
}
void RPCServer::connected()
{
//std::cerr << "RPC request" << std::endl;
boost::asio::async_read_until(mSocket, mLineBuffer, "\r\n",
boost::bind(&RPCServer::handle_read_line, shared_from_this(), boost::asio::placeholders::error));
//std::cerr << "RPC request" << std::endl;
boost::asio::async_read_until (mSocket, mLineBuffer, "\r\n",
mStrand.wrap (boost::bind (&RPCServer::handle_read_line, shared_from_this (), boost::asio::placeholders::error)));
}
void RPCServer::handle_read_req(const boost::system::error_code& e)
@@ -48,8 +48,8 @@ void RPCServer::handle_read_req(const boost::system::error_code& e)
else
mReplyStr = handleRequest(req);
boost::asio::async_write(mSocket, boost::asio::buffer(mReplyStr),
boost::bind(&RPCServer::handle_write, shared_from_this(), boost::asio::placeholders::error));
boost::asio::async_write (mSocket, boost::asio::buffer (mReplyStr),
mStrand.wrap (boost::bind (&RPCServer::handle_write, shared_from_this (), boost::asio::placeholders::error)));
}
static void dummy_handler()
@@ -59,50 +59,53 @@ static void dummy_handler()
void RPCServer::handle_read_line(const boost::system::error_code& e)
{
if (e)
return;
if (e)
return;
HTTPRequestAction action = mHTTPRequest.consume(mLineBuffer);
HTTPRequestAction action = mHTTPRequest.consume (mLineBuffer);
if (action == haDO_REQUEST)
{ // request with no body
WriteLog (lsWARNING, RPCServer) << "RPC HTTP request with no body";
mSocket.async_shutdown(boost::bind(&dummy_handler));
return;
}
else if (action == haREAD_LINE)
{
boost::asio::async_read_until(mSocket, mLineBuffer, "\r\n",
boost::bind(&RPCServer::handle_read_line, shared_from_this(),
boost::asio::placeholders::error));
}
else if (action == haREAD_RAW)
{
int rLen = mHTTPRequest.getDataSize();
if ((rLen < 0) || (rLen > RPC_MAXIMUM_QUERY))
{
WriteLog (lsWARNING, RPCServer) << "Illegal RPC request length " << rLen;
mSocket.async_shutdown(boost::bind(&dummy_handler));
return;
}
if (action == haDO_REQUEST)
{
// request with no body
WriteLog (lsWARNING, RPCServer) << "RPC HTTP request with no body";
mSocket.async_shutdown (boost::bind (&dummy_handler));
return;
}
else if (action == haREAD_LINE)
{
boost::asio::async_read_until (mSocket, mLineBuffer, "\r\n",
mStrand.wrap (boost::bind (&RPCServer::handle_read_line, shared_from_this (),
boost::asio::placeholders::error)));
}
else if (action == haREAD_RAW)
{
int rLen = mHTTPRequest.getDataSize ();
int alreadyHave = mLineBuffer.size();
if ((rLen < 0) || (rLen > RPC_MAXIMUM_QUERY))
{
WriteLog (lsWARNING, RPCServer) << "Illegal RPC request length " << rLen;
mSocket.async_shutdown (boost::bind (&dummy_handler));
return;
}
if (alreadyHave < rLen)
{
mQueryVec.resize(rLen - alreadyHave);
boost::asio::async_read(mSocket, boost::asio::buffer(mQueryVec),
boost::bind(&RPCServer::handle_read_req, shared_from_this(), boost::asio::placeholders::error));
WriteLog (lsTRACE, RPCServer) << "Waiting for completed request: " << rLen;
}
else
{ // we have the whole thing
mQueryVec.resize(0);
handle_read_req(e);
}
}
else
mSocket.async_shutdown(boost::bind(&dummy_handler));
int alreadyHave = mLineBuffer.size ();
if (alreadyHave < rLen)
{
mQueryVec.resize (rLen - alreadyHave);
boost::asio::async_read (mSocket, boost::asio::buffer (mQueryVec),
mStrand.wrap (boost::bind (&RPCServer::handle_read_req, shared_from_this (), boost::asio::placeholders::error)));
WriteLog (lsTRACE, RPCServer) << "Waiting for completed request: " << rLen;
}
else
{
// we have the whole thing
mQueryVec.resize (0);
handle_read_req (e);
}
}
else
mSocket.async_shutdown (boost::bind (&dummy_handler));
}
std::string RPCServer::handleRequest(const std::string& requestStr)
@@ -181,24 +184,25 @@ bool RPCServer::parseAcceptRate(const std::string& sAcceptRate)
void RPCServer::handle_write(const boost::system::error_code& e)
{
//std::cerr << "async_write complete " << e << std::endl;
//std::cerr << "async_write complete " << e << std::endl;
if (!e)
{
HTTPRequestAction action = mHTTPRequest.requestDone(false);
if (action == haCLOSE_CONN)
mSocket.async_shutdown(boost::bind(&dummy_handler));
else
{
boost::asio::async_read_until(mSocket, mLineBuffer, "\r\n",
boost::bind(&RPCServer::handle_read_line, shared_from_this(), boost::asio::placeholders::error));
}
}
if (!e)
{
HTTPRequestAction action = mHTTPRequest.requestDone (false);
if (e != boost::asio::error::operation_aborted)
{
//connection_manager_.stop(shared_from_this());
}
if (action == haCLOSE_CONN)
mSocket.async_shutdown (boost::bind (&dummy_handler));
else
{
boost::asio::async_read_until (mSocket, mLineBuffer, "\r\n",
mStrand.wrap (boost::bind (&RPCServer::handle_read_line, shared_from_this (), boost::asio::placeholders::error)));
}
}
if (e != boost::asio::error::operation_aborted)
{
//connection_manager_.stop(shared_from_this());
}
}
// vim:ts=4

View File

@@ -16,6 +16,7 @@ private:
NetworkOPs* mNetOps;
AutoSocket mSocket;
boost::asio::io_service::strand mStrand;
boost::asio::streambuf mLineBuffer;
Blob mQueryVec;