Use JobQueue to process RPC-JSON asynchronously

This commit is contained in:
Vinnie Falco
2013-09-22 01:07:58 -07:00
parent 7dd41ffb5b
commit d16aa7f928
14 changed files with 215 additions and 85 deletions

View File

@@ -42,13 +42,6 @@ public:
/** Input: The Content-Body as a linear buffer if we have the HTTPRequest. */
std::string content;
/** Output: The buffer to send back as a reply.
Upon each entry into the callback, reply.size() will be zero.
If reply.size() is zero when the callback returns, no data is
sent.
*/
std::string reply;
/** A user-definable pointer.
The initial value is always zero.
Changes to the value are persisted between calls.

View File

@@ -83,17 +83,6 @@ void Door::handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter)
async_accept();
// Save remote address in session
peer->session().remoteAddress = from_asio (
peer->get_socket().remote_endpoint()).withPort (0);
m_impl.handler().onAccept (peer->session());
if (peer->session().closed())
{
peer->cancel();
return;
}
peer->handle_accept();
}

View File

@@ -16,6 +16,7 @@ Peer::Peer (ServerImpl& impl, Port const& port)
, m_parser (HTTPParser::typeRequest)
, m_session (*this)
, m_writesPending (0)
, m_closed (false)
, m_callClose (false)
{
int flags;
@@ -55,6 +56,16 @@ SessionImpl& Peer::session ()
return m_session;
}
// Indicates that the Handler closed the Session
//
void Peer::close ()
{
// Make sure this happens on an i/o service thread.
m_impl.get_io_service().dispatch (m_strand.wrap (
boost::bind (&Peer::handle_close, Ptr (this),
CompletionCounter (this))));
}
// Cancels all pending i/o and timers and sends tcp shutdown.
//
void Peer::cancel ()
@@ -146,26 +157,23 @@ void Peer::async_read_some ()
CompletionCounter (this))));
}
// Sends a copy of the reply in the session if it is not empty.
// Returns `true` if m_session.closed is `true`
// On return, reply.empty() will return `true`.
//
void Peer::maybe_send_reply ()
{
if (! m_session.reply.empty())
{
async_write (boost::asio::const_buffers_1 (
&m_session.reply.front(), m_session.reply.size()));
m_session.reply.clear();
}
}
// Called when the acceptor gives us the connection.
//
void Peer::handle_accept ()
{
m_callClose = true;
// save remote addr
m_session.remoteAddress = from_asio (
get_socket().remote_endpoint()).withPort (0);
m_impl.handler().onAccept (m_session);
if (m_closed)
{
cancel();
return;
}
m_request_timer.expires_from_now (
boost::posix_time::seconds (
requestTimeoutSeconds));
@@ -238,6 +246,14 @@ void Peer::handle_request_timer (error_code ec, CompletionCounter)
cancel();
}
// Called when the Session is closed by the Handler.
//
void Peer::handle_close (CompletionCounter)
{
m_closed = true;
m_session.handle_close();
}
// Called when async_write completes.
//
void Peer::handle_write (error_code ec, std::size_t bytes_transferred,
@@ -253,7 +269,7 @@ void Peer::handle_write (error_code ec, std::size_t bytes_transferred,
}
bassert (m_writesPending > 0);
if (--m_writesPending == 0 && m_session.closed())
if (--m_writesPending == 0 && m_closed)
{
m_socket->shutdown (socket::shutdown_send);
}
@@ -300,7 +316,7 @@ void Peer::handle_read (error_code ec, std::size_t bytes_transferred, Completion
if (m_parser.fields().size() > 0)
{
handle_headers ();
if (m_session.closed())
if (m_closed)
return;
}
}
@@ -329,8 +345,6 @@ void Peer::handle_headers ()
m_session.headersComplete = m_parser.headersComplete();
m_session.headers = HTTPHeaders (m_parser.fields());
m_impl.handler().onHeaders (m_session);
maybe_send_reply ();
}
// Called when we have a complete http request.
@@ -340,7 +354,7 @@ void Peer::handle_request ()
// This is to guarantee onHeaders is called at least once.
handle_headers();
if (m_session.closed())
if (m_closed)
return;
m_session.request = m_parser.request();
@@ -354,8 +368,6 @@ void Peer::handle_request ()
// Process the HTTPRequest
m_impl.handler().onRequest (m_session);
maybe_send_reply ();
}
}

View File

@@ -47,12 +47,14 @@ public:
HTTPParser m_parser;
SessionImpl m_session;
int m_writesPending;
bool m_closed;
bool m_callClose;
Peer (ServerImpl& impl, Port const& port);
~Peer ();
socket& get_socket();
SessionImpl& session ();
void close ();
void cancel ();
void failed (error_code ec);
void asyncHandlersComplete ();
@@ -79,11 +81,12 @@ public:
}
void async_read_some ();
void maybe_send_reply ();
void handle_accept ();
void handle_handshake (error_code ec, CompletionCounter);
void handle_data_timer (error_code ec, CompletionCounter);
void handle_request_timer (error_code ec, CompletionCounter);
void handle_close (CompletionCounter);
void handle_write (error_code ec, std::size_t bytes_transferred,
SharedBuffer buf, CompletionCounter);

View File

@@ -12,7 +12,6 @@ Session::Session ()
, tag (nullptr)
{
content.reserve (1000);
reply.reserve (1000);
}
ScopedStream Session::operator<< (

View File

@@ -9,7 +9,6 @@ namespace HTTP {
SessionImpl::SessionImpl (Peer& peer)
: m_peer (peer)
, m_closed (false)
{
}
@@ -17,26 +16,32 @@ SessionImpl::~SessionImpl ()
{
}
bool SessionImpl::closed() const
{
return m_closed;
}
void SessionImpl::write (void const* buffer, std::size_t bytes)
{
m_peer.write (buffer, bytes);
}
// Called from an io_service thread
void SessionImpl::handle_close()
{
m_peer_ref = nullptr;
}
void SessionImpl::close()
{
m_closed = true;
m_peer.close();
}
void SessionImpl::detach()
{
if (! m_work)
if (m_detached.compareAndSetBool (1, 0))
{
bassert (! m_work);
bassert (m_peer_ref.empty());
m_peer_ref = &m_peer;
m_work = boost::in_place (boost::ref (
m_peer.m_impl.get_io_service()));
}
}
}

View File

@@ -21,14 +21,15 @@ class SessionImpl : public Session
{
public:
Peer& m_peer;
bool m_closed;
Atomic <int> m_detached;
SharedPtr <Peer> m_peer_ref;
boost::optional <boost::asio::io_service::work> m_work;
explicit SessionImpl (Peer& peer);
~SessionImpl ();
bool closed() const;
void write (void const* buffer, std::size_t bytes);
void close();
void handle_close();
void detach();
};

View File

@@ -84,10 +84,12 @@ public:
// VFALCO NOTE LocalCredentials starts the deprecated UNL service
, m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue))
, m_rpcHTTPServer (RPCHTTPServer::New (*m_jobQueue,
LogJournal::get <HTTPServerLog> (), *m_networkOPs))
, m_rpcHTTPServer (RPCHTTPServer::New (*m_networkOPs,
LogJournal::get <HTTPServerLog> (), *m_jobQueue, *m_networkOPs))
#if ! RIPPLE_USE_RPC_SERVICE_MANAGER
, m_rpcServerHandler (*m_networkOPs) // passive object, not a Service
#endif
, m_nodeStoreScheduler (*m_jobQueue, *m_jobQueue)
@@ -808,7 +810,9 @@ private:
ScopedPointer <NetworkOPs> m_networkOPs;
ScopedPointer <UniqueNodeList> m_deprecatedUNL;
ScopedPointer <RPCHTTPServer> m_rpcHTTPServer;
#if ! RIPPLE_USE_RPC_SERVICE_MANAGER
RPCServerHandler m_rpcServerHandler;
#endif
NodeStoreScheduler m_nodeStoreScheduler;
ScopedPointer <NodeStore::Database> m_nodeStore;
ScopedPointer <SNTPClient> m_sntpClient;

View File

@@ -10,6 +10,8 @@ class RPCHTTPServerImp
, public HTTP::Handler
{
public:
Journal m_journal;
JobQueue& m_jobQueue;
NetworkOPs& m_networkOPs;
RPCServerHandler m_deprecatedHandler;
HTTP::Server m_server;
@@ -17,8 +19,11 @@ public:
RPCHTTPServerImp (Stoppable& parent,
Journal journal,
JobQueue& jobQueue,
NetworkOPs& networkOPs)
: RPCHTTPServer (parent)
, m_journal (journal)
, m_jobQueue (jobQueue)
, m_networkOPs (networkOPs)
, m_deprecatedHandler (networkOPs)
, m_server (*this, journal)
@@ -103,10 +108,15 @@ public:
void onRequest (HTTP::Session& session)
{
session.write (m_deprecatedHandler.processRequest (
session.content, session.remoteAddress.to_string()));
session.close();
#if 0
Job job;
processSession (job, session);
#else
session.detach();
m_jobQueue.addJob (jtRPC, "RPC", bind (
&RPCHTTPServerImp::processSession, this, _1,
ref (session)));
#endif
}
void onClose (HTTP::Session& session)
@@ -117,6 +127,114 @@ public:
{
stopped();
}
//--------------------------------------------------------------------------
void processSession (Job& job, HTTP::Session& session)
{
session.write (m_deprecatedHandler.processRequest (
session.content, session.remoteAddress.to_string()));
session.close();
}
std::string createResponse (
int statusCode,
std::string const& description)
{
return HTTPReply (statusCode, description);
}
bool isAuthorized (
std::map <std::string, std::string> const& headers)
{
return HTTPAuthorized (headers);
}
// Stolen directly from RPCServerHandler
std::string processRequest (std::string const& request, std::string const& remoteAddress)
{
Json::Value jvRequest;
{
Json::Reader reader;
if (! reader.parse (request, jvRequest) ||
jvRequest.isNull () ||
! jvRequest.isObject ())
{
return createResponse (400, "Unable to parse request");
}
}
Config::Role const role (getConfig ().getAdminRole (jvRequest, remoteAddress));
// Parse id now so errors from here on will have the id
//
// VFALCO NOTE Except that "id" isn't included in the following errors...
//
Json::Value const id = jvRequest ["id"];
Json::Value const method = jvRequest ["method"];
if (method.isNull ())
{
return createResponse (400, "Null method");
}
else if (! method.isString ())
{
return createResponse (400, "method is not string");
}
std::string strMethod = method.asString ();
// Parse params
Json::Value params = jvRequest ["params"];
if (params.isNull ())
{
params = Json::Value (Json::arrayValue);
}
else if (!params.isArray ())
{
return HTTPReply (400, "params unparseable");
}
// VFALCO TODO Shouldn't we handle this earlier?
//
if (role == Config::FORBID)
{
// VFALCO TODO Needs implementing
// FIXME Needs implementing
// XXX This needs rate limiting to prevent brute forcing password.
return HTTPReply (403, "Forbidden");
}
// This code does all the work on the io_service thread and
// has no rate-limiting based on source IP or anything.
// This is a temporary safety
if ((role != Config::ADMIN) && (getApp().getFeeTrack().isLoadedLocal()))
{
return HTTPReply (503, "Unable to service at this time");
}
std::string response;
m_journal.debug << "Query: " << strMethod << params;
RPCHandler rpcHandler (&m_networkOPs);
LoadType loadType = LT_RPCReference;
Json::Value const result (rpcHandler.doRpcCommand (
strMethod, params, role, &loadType));
// VFALCO NOTE We discard loadType since there is no endpoint to punish
m_journal.debug << "Reply: " << result;
response = JSONRPCReply (result, Json::Value (), id);
return createResponse (200, response);
}
};
//------------------------------------------------------------------------------
@@ -130,8 +248,9 @@ RPCHTTPServer::RPCHTTPServer (Stoppable& parent)
RPCHTTPServer* RPCHTTPServer::New (Stoppable& parent,
Journal journal,
JobQueue& jobQueue,
NetworkOPs& networkOPs)
{
return new RPCHTTPServerImp (parent, journal, networkOPs);
return new RPCHTTPServerImp (parent, journal, jobQueue, networkOPs);
}

View File

@@ -14,12 +14,12 @@ protected:
public:
static RPCHTTPServer* New (Stoppable& parent,
Journal journal, NetworkOPs& networkOPs);
Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs);
virtual ~RPCHTTPServer () { }
/** Opens listening ports based on the Config settings. */
virtual void setup(Journal journal) = 0;
virtual void setup (Journal journal) = 0;
};
#endif

View File

@@ -3736,6 +3736,8 @@ Json::Value RPCHandler::doCommand (const Json::Value& params, int iRole, LoadTyp
{
if (iRole != Config::ADMIN)
{
// VFALCO NOTE Should we also add up the jtRPC jobs?
//
int jc = getApp().getJobQueue ().getJobCountGE (jtCLIENT);
if (jc > 500)

View File

@@ -101,6 +101,7 @@ const char* Job::toString (JobType t)
case jtLEDGER_DATA: return "ledgerData";
case jtUPDATE_PF: return "updatePaths";
case jtCLIENT: return "clientCommand";
case jtRPC: return "RPC";
case jtTRANSACTION: return "transaction";
case jtUNL: return "unl";
case jtADVANCE: return "advanceLedger";

View File

@@ -22,34 +22,35 @@ enum JobType
jtLEDGER_DATA = 7, // Received data for a ledger we're acquiring
jtUPDATE_PF = 8, // Update pathfinding requests
jtCLIENT = 9, // A websocket command from the client
jtTRANSACTION = 10, // A transaction received from the network
jtUNL = 11, // A Score or Fetch of the UNL (DEPRECATED)
jtADVANCE = 12, // Advance validated/acquired ledgers
jtPUBLEDGER = 13, // Publish a fully-accepted ledger
jtTXN_DATA = 14, // Fetch a proposed set
jtWAL = 15, // Write-ahead logging
jtVALIDATION_t = 16, // A validation from a trusted source
jtWRITE = 17, // Write out hashed objects
jtPROPOSAL_t = 18, // A proposal from a trusted source
jtSWEEP = 19, // Sweep for stale structures
jtNETOP_CLUSTER = 20, // NetworkOPs cluster peer report
jtNETOP_TIMER = 21, // NetworkOPs net timer processing
jtADMIN = 22, // An administrative operation
jtRPC = 10, // A websocket command from the client
jtTRANSACTION = 11, // A transaction received from the network
jtUNL = 12, // A Score or Fetch of the UNL (DEPRECATED)
jtADVANCE = 13, // Advance validated/acquired ledgers
jtPUBLEDGER = 14, // Publish a fully-accepted ledger
jtTXN_DATA = 15, // Fetch a proposed set
jtWAL = 16, // Write-ahead logging
jtVALIDATION_t = 17, // A validation from a trusted source
jtWRITE = 18, // Write out hashed objects
jtPROPOSAL_t = 19, // A proposal from a trusted source
jtSWEEP = 20, // Sweep for stale structures
jtNETOP_CLUSTER = 21, // NetworkOPs cluster peer report
jtNETOP_TIMER = 22, // NetworkOPs net timer processing
jtADMIN = 23, // An administrative operation
// special types not dispatched by the job pool
jtPEER = 24,
jtDISK = 25,
jtACCEPTLEDGER = 26,
jtTXN_PROC = 27,
jtOB_SETUP = 28,
jtPATH_FIND = 29,
jtHO_READ = 30,
jtHO_WRITE = 31,
jtGENERIC = 32, // Used just to measure time
}; // CAUTION: If you add new types, add them to ripple_Job.cpp too
jtPEER = 30,
jtDISK = 31,
jtACCEPTLEDGER = 32,
jtTXN_PROC = 33,
jtOB_SETUP = 34,
jtPATH_FIND = 35,
jtHO_READ = 36,
jtHO_WRITE = 37,
jtGENERIC = 38, // Used just to measure time
}; // CAUTION: If you add new types, add them to Job.cpp too
// VFALCO TODO move this into the enum so it calculates itself?
#define NUM_JOB_TYPES 48 // why 48 and not 32?
#define NUM_JOB_TYPES 48 // why 48 and not 38?
class Job
{

View File

@@ -590,7 +590,7 @@ private:
default:
// Someone added a JobType but forgot to set a limit.
// Did they also forget to add it to ripple_Job.cpp?
// Did they also forget to add it to Job.cpp?
bassertfalse;
break;
@@ -600,6 +600,7 @@ private:
case jtPROPOSAL_ut:
case jtUPDATE_PF:
case jtCLIENT:
case jtRPC:
case jtTRANSACTION:
case jtPUBLEDGER:
case jtADVANCE: