From d16aa7f928dc04e7d8ce34fef68785af37240701 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 22 Sep 2013 01:07:58 -0700 Subject: [PATCH] Use JobQueue to process RPC-JSON asynchronously --- src/ripple/http/api/Session.h | 7 -- src/ripple/http/impl/Door.cpp | 11 -- src/ripple/http/impl/Peer.cpp | 54 ++++++---- src/ripple/http/impl/Peer.h | 5 +- src/ripple/http/impl/Session.cpp | 1 - src/ripple/http/impl/SessionImpl.cpp | 21 ++-- src/ripple/http/impl/SessionImpl.h | 5 +- src/ripple_app/main/Application.cpp | 8 +- src/ripple_app/main/RPCHTTPServer.cpp | 129 +++++++++++++++++++++++- src/ripple_app/main/RPCHTTPServer.h | 4 +- src/ripple_app/rpc/RPCHandler.cpp | 2 + src/ripple_core/functional/Job.cpp | 1 + src/ripple_core/functional/Job.h | 49 ++++----- src/ripple_core/functional/JobQueue.cpp | 3 +- 14 files changed, 215 insertions(+), 85 deletions(-) diff --git a/src/ripple/http/api/Session.h b/src/ripple/http/api/Session.h index a2721f2ce4..5feb304b95 100644 --- a/src/ripple/http/api/Session.h +++ b/src/ripple/http/api/Session.h @@ -42,13 +42,6 @@ public: /** Input: The Content-Body as a linear buffer if we have the HTTPRequest. */ std::string content; - /** Output: The buffer to send back as a reply. - Upon each entry into the callback, reply.size() will be zero. - If reply.size() is zero when the callback returns, no data is - sent. - */ - std::string reply; - /** A user-definable pointer. The initial value is always zero. Changes to the value are persisted between calls. diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp index af438e066c..59b066599e 100644 --- a/src/ripple/http/impl/Door.cpp +++ b/src/ripple/http/impl/Door.cpp @@ -83,17 +83,6 @@ void Door::handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter) async_accept(); - // Save remote address in session - peer->session().remoteAddress = from_asio ( - peer->get_socket().remote_endpoint()).withPort (0); - m_impl.handler().onAccept (peer->session()); - - if (peer->session().closed()) - { - peer->cancel(); - return; - } - peer->handle_accept(); } diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp index aba83de9c8..21123a4827 100644 --- a/src/ripple/http/impl/Peer.cpp +++ b/src/ripple/http/impl/Peer.cpp @@ -16,6 +16,7 @@ Peer::Peer (ServerImpl& impl, Port const& port) , m_parser (HTTPParser::typeRequest) , m_session (*this) , m_writesPending (0) + , m_closed (false) , m_callClose (false) { int flags; @@ -55,6 +56,16 @@ SessionImpl& Peer::session () return m_session; } +// Indicates that the Handler closed the Session +// +void Peer::close () +{ + // Make sure this happens on an i/o service thread. + m_impl.get_io_service().dispatch (m_strand.wrap ( + boost::bind (&Peer::handle_close, Ptr (this), + CompletionCounter (this)))); +} + // Cancels all pending i/o and timers and sends tcp shutdown. // void Peer::cancel () @@ -146,26 +157,23 @@ void Peer::async_read_some () CompletionCounter (this)))); } -// Sends a copy of the reply in the session if it is not empty. -// Returns `true` if m_session.closed is `true` -// On return, reply.empty() will return `true`. -// -void Peer::maybe_send_reply () -{ - if (! m_session.reply.empty()) - { - async_write (boost::asio::const_buffers_1 ( - &m_session.reply.front(), m_session.reply.size())); - m_session.reply.clear(); - } -} - // Called when the acceptor gives us the connection. // void Peer::handle_accept () { m_callClose = true; + // save remote addr + m_session.remoteAddress = from_asio ( + get_socket().remote_endpoint()).withPort (0); + m_impl.handler().onAccept (m_session); + + if (m_closed) + { + cancel(); + return; + } + m_request_timer.expires_from_now ( boost::posix_time::seconds ( requestTimeoutSeconds)); @@ -238,6 +246,14 @@ void Peer::handle_request_timer (error_code ec, CompletionCounter) cancel(); } +// Called when the Session is closed by the Handler. +// +void Peer::handle_close (CompletionCounter) +{ + m_closed = true; + m_session.handle_close(); +} + // Called when async_write completes. // void Peer::handle_write (error_code ec, std::size_t bytes_transferred, @@ -253,7 +269,7 @@ void Peer::handle_write (error_code ec, std::size_t bytes_transferred, } bassert (m_writesPending > 0); - if (--m_writesPending == 0 && m_session.closed()) + if (--m_writesPending == 0 && m_closed) { m_socket->shutdown (socket::shutdown_send); } @@ -300,7 +316,7 @@ void Peer::handle_read (error_code ec, std::size_t bytes_transferred, Completion if (m_parser.fields().size() > 0) { handle_headers (); - if (m_session.closed()) + if (m_closed) return; } } @@ -329,8 +345,6 @@ void Peer::handle_headers () m_session.headersComplete = m_parser.headersComplete(); m_session.headers = HTTPHeaders (m_parser.fields()); m_impl.handler().onHeaders (m_session); - - maybe_send_reply (); } // Called when we have a complete http request. @@ -340,7 +354,7 @@ void Peer::handle_request () // This is to guarantee onHeaders is called at least once. handle_headers(); - if (m_session.closed()) + if (m_closed) return; m_session.request = m_parser.request(); @@ -354,8 +368,6 @@ void Peer::handle_request () // Process the HTTPRequest m_impl.handler().onRequest (m_session); - - maybe_send_reply (); } } diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 16e4de88a5..7b4d5405da 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -47,12 +47,14 @@ public: HTTPParser m_parser; SessionImpl m_session; int m_writesPending; + bool m_closed; bool m_callClose; Peer (ServerImpl& impl, Port const& port); ~Peer (); socket& get_socket(); SessionImpl& session (); + void close (); void cancel (); void failed (error_code ec); void asyncHandlersComplete (); @@ -79,11 +81,12 @@ public: } void async_read_some (); - void maybe_send_reply (); + void handle_accept (); void handle_handshake (error_code ec, CompletionCounter); void handle_data_timer (error_code ec, CompletionCounter); void handle_request_timer (error_code ec, CompletionCounter); + void handle_close (CompletionCounter); void handle_write (error_code ec, std::size_t bytes_transferred, SharedBuffer buf, CompletionCounter); diff --git a/src/ripple/http/impl/Session.cpp b/src/ripple/http/impl/Session.cpp index 25ae5ab750..08eb153a78 100644 --- a/src/ripple/http/impl/Session.cpp +++ b/src/ripple/http/impl/Session.cpp @@ -12,7 +12,6 @@ Session::Session () , tag (nullptr) { content.reserve (1000); - reply.reserve (1000); } ScopedStream Session::operator<< ( diff --git a/src/ripple/http/impl/SessionImpl.cpp b/src/ripple/http/impl/SessionImpl.cpp index c300042c55..fab4fcef6b 100644 --- a/src/ripple/http/impl/SessionImpl.cpp +++ b/src/ripple/http/impl/SessionImpl.cpp @@ -9,7 +9,6 @@ namespace HTTP { SessionImpl::SessionImpl (Peer& peer) : m_peer (peer) - , m_closed (false) { } @@ -17,26 +16,32 @@ SessionImpl::~SessionImpl () { } -bool SessionImpl::closed() const -{ - return m_closed; -} - void SessionImpl::write (void const* buffer, std::size_t bytes) { m_peer.write (buffer, bytes); } +// Called from an io_service thread +void SessionImpl::handle_close() +{ + m_peer_ref = nullptr; +} + void SessionImpl::close() { - m_closed = true; + m_peer.close(); } void SessionImpl::detach() { - if (! m_work) + if (m_detached.compareAndSetBool (1, 0)) + { + bassert (! m_work); + bassert (m_peer_ref.empty()); + m_peer_ref = &m_peer; m_work = boost::in_place (boost::ref ( m_peer.m_impl.get_io_service())); + } } } diff --git a/src/ripple/http/impl/SessionImpl.h b/src/ripple/http/impl/SessionImpl.h index dd13a57cf7..055c9048c1 100644 --- a/src/ripple/http/impl/SessionImpl.h +++ b/src/ripple/http/impl/SessionImpl.h @@ -21,14 +21,15 @@ class SessionImpl : public Session { public: Peer& m_peer; - bool m_closed; + Atomic m_detached; + SharedPtr m_peer_ref; boost::optional m_work; explicit SessionImpl (Peer& peer); ~SessionImpl (); - bool closed() const; void write (void const* buffer, std::size_t bytes); void close(); + void handle_close(); void detach(); }; diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 0fb91d6c42..9785910e26 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -84,10 +84,12 @@ public: // VFALCO NOTE LocalCredentials starts the deprecated UNL service , m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue)) - , m_rpcHTTPServer (RPCHTTPServer::New (*m_jobQueue, - LogJournal::get (), *m_networkOPs)) + , m_rpcHTTPServer (RPCHTTPServer::New (*m_networkOPs, + LogJournal::get (), *m_jobQueue, *m_networkOPs)) +#if ! RIPPLE_USE_RPC_SERVICE_MANAGER , m_rpcServerHandler (*m_networkOPs) // passive object, not a Service +#endif , m_nodeStoreScheduler (*m_jobQueue, *m_jobQueue) @@ -808,7 +810,9 @@ private: ScopedPointer m_networkOPs; ScopedPointer m_deprecatedUNL; ScopedPointer m_rpcHTTPServer; +#if ! RIPPLE_USE_RPC_SERVICE_MANAGER RPCServerHandler m_rpcServerHandler; +#endif NodeStoreScheduler m_nodeStoreScheduler; ScopedPointer m_nodeStore; ScopedPointer m_sntpClient; diff --git a/src/ripple_app/main/RPCHTTPServer.cpp b/src/ripple_app/main/RPCHTTPServer.cpp index 31f49ab015..b43e4dd123 100644 --- a/src/ripple_app/main/RPCHTTPServer.cpp +++ b/src/ripple_app/main/RPCHTTPServer.cpp @@ -10,6 +10,8 @@ class RPCHTTPServerImp , public HTTP::Handler { public: + Journal m_journal; + JobQueue& m_jobQueue; NetworkOPs& m_networkOPs; RPCServerHandler m_deprecatedHandler; HTTP::Server m_server; @@ -17,8 +19,11 @@ public: RPCHTTPServerImp (Stoppable& parent, Journal journal, + JobQueue& jobQueue, NetworkOPs& networkOPs) : RPCHTTPServer (parent) + , m_journal (journal) + , m_jobQueue (jobQueue) , m_networkOPs (networkOPs) , m_deprecatedHandler (networkOPs) , m_server (*this, journal) @@ -103,10 +108,15 @@ public: void onRequest (HTTP::Session& session) { - session.write (m_deprecatedHandler.processRequest ( - session.content, session.remoteAddress.to_string())); - - session.close(); +#if 0 + Job job; + processSession (job, session); +#else + session.detach(); + m_jobQueue.addJob (jtRPC, "RPC", bind ( + &RPCHTTPServerImp::processSession, this, _1, + ref (session))); +#endif } void onClose (HTTP::Session& session) @@ -117,6 +127,114 @@ public: { stopped(); } + + //-------------------------------------------------------------------------- + + void processSession (Job& job, HTTP::Session& session) + { + session.write (m_deprecatedHandler.processRequest ( + session.content, session.remoteAddress.to_string())); + + session.close(); + } + + std::string createResponse ( + int statusCode, + std::string const& description) + { + return HTTPReply (statusCode, description); + } + + bool isAuthorized ( + std::map const& headers) + { + return HTTPAuthorized (headers); + } + + // Stolen directly from RPCServerHandler + std::string processRequest (std::string const& request, std::string const& remoteAddress) + { + Json::Value jvRequest; + { + Json::Reader reader; + + if (! reader.parse (request, jvRequest) || + jvRequest.isNull () || + ! jvRequest.isObject ()) + { + return createResponse (400, "Unable to parse request"); + } + } + + Config::Role const role (getConfig ().getAdminRole (jvRequest, remoteAddress)); + + // Parse id now so errors from here on will have the id + // + // VFALCO NOTE Except that "id" isn't included in the following errors... + // + Json::Value const id = jvRequest ["id"]; + + Json::Value const method = jvRequest ["method"]; + + if (method.isNull ()) + { + return createResponse (400, "Null method"); + } + else if (! method.isString ()) + { + return createResponse (400, "method is not string"); + } + + std::string strMethod = method.asString (); + + // Parse params + Json::Value params = jvRequest ["params"]; + + if (params.isNull ()) + { + params = Json::Value (Json::arrayValue); + } + else if (!params.isArray ()) + { + return HTTPReply (400, "params unparseable"); + } + + // VFALCO TODO Shouldn't we handle this earlier? + // + if (role == Config::FORBID) + { + // VFALCO TODO Needs implementing + // FIXME Needs implementing + // XXX This needs rate limiting to prevent brute forcing password. + return HTTPReply (403, "Forbidden"); + } + + // This code does all the work on the io_service thread and + // has no rate-limiting based on source IP or anything. + // This is a temporary safety + if ((role != Config::ADMIN) && (getApp().getFeeTrack().isLoadedLocal())) + { + return HTTPReply (503, "Unable to service at this time"); + } + + std::string response; + + m_journal.debug << "Query: " << strMethod << params; + + RPCHandler rpcHandler (&m_networkOPs); + + LoadType loadType = LT_RPCReference; + + Json::Value const result (rpcHandler.doRpcCommand ( + strMethod, params, role, &loadType)); + // VFALCO NOTE We discard loadType since there is no endpoint to punish + + m_journal.debug << "Reply: " << result; + + response = JSONRPCReply (result, Json::Value (), id); + + return createResponse (200, response); + } }; //------------------------------------------------------------------------------ @@ -130,8 +248,9 @@ RPCHTTPServer::RPCHTTPServer (Stoppable& parent) RPCHTTPServer* RPCHTTPServer::New (Stoppable& parent, Journal journal, + JobQueue& jobQueue, NetworkOPs& networkOPs) { - return new RPCHTTPServerImp (parent, journal, networkOPs); + return new RPCHTTPServerImp (parent, journal, jobQueue, networkOPs); } diff --git a/src/ripple_app/main/RPCHTTPServer.h b/src/ripple_app/main/RPCHTTPServer.h index 9898e1e41d..4052b8f531 100644 --- a/src/ripple_app/main/RPCHTTPServer.h +++ b/src/ripple_app/main/RPCHTTPServer.h @@ -14,12 +14,12 @@ protected: public: static RPCHTTPServer* New (Stoppable& parent, - Journal journal, NetworkOPs& networkOPs); + Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs); virtual ~RPCHTTPServer () { } /** Opens listening ports based on the Config settings. */ - virtual void setup(Journal journal) = 0; + virtual void setup (Journal journal) = 0; }; #endif diff --git a/src/ripple_app/rpc/RPCHandler.cpp b/src/ripple_app/rpc/RPCHandler.cpp index c60c1c2ce7..3911ebdf93 100644 --- a/src/ripple_app/rpc/RPCHandler.cpp +++ b/src/ripple_app/rpc/RPCHandler.cpp @@ -3736,6 +3736,8 @@ Json::Value RPCHandler::doCommand (const Json::Value& params, int iRole, LoadTyp { if (iRole != Config::ADMIN) { + // VFALCO NOTE Should we also add up the jtRPC jobs? + // int jc = getApp().getJobQueue ().getJobCountGE (jtCLIENT); if (jc > 500) diff --git a/src/ripple_core/functional/Job.cpp b/src/ripple_core/functional/Job.cpp index 83713a295a..62d86f165f 100644 --- a/src/ripple_core/functional/Job.cpp +++ b/src/ripple_core/functional/Job.cpp @@ -101,6 +101,7 @@ const char* Job::toString (JobType t) case jtLEDGER_DATA: return "ledgerData"; case jtUPDATE_PF: return "updatePaths"; case jtCLIENT: return "clientCommand"; + case jtRPC: return "RPC"; case jtTRANSACTION: return "transaction"; case jtUNL: return "unl"; case jtADVANCE: return "advanceLedger"; diff --git a/src/ripple_core/functional/Job.h b/src/ripple_core/functional/Job.h index 293fac2b60..6d7717383b 100644 --- a/src/ripple_core/functional/Job.h +++ b/src/ripple_core/functional/Job.h @@ -22,34 +22,35 @@ enum JobType jtLEDGER_DATA = 7, // Received data for a ledger we're acquiring jtUPDATE_PF = 8, // Update pathfinding requests jtCLIENT = 9, // A websocket command from the client - jtTRANSACTION = 10, // A transaction received from the network - jtUNL = 11, // A Score or Fetch of the UNL (DEPRECATED) - jtADVANCE = 12, // Advance validated/acquired ledgers - jtPUBLEDGER = 13, // Publish a fully-accepted ledger - jtTXN_DATA = 14, // Fetch a proposed set - jtWAL = 15, // Write-ahead logging - jtVALIDATION_t = 16, // A validation from a trusted source - jtWRITE = 17, // Write out hashed objects - jtPROPOSAL_t = 18, // A proposal from a trusted source - jtSWEEP = 19, // Sweep for stale structures - jtNETOP_CLUSTER = 20, // NetworkOPs cluster peer report - jtNETOP_TIMER = 21, // NetworkOPs net timer processing - jtADMIN = 22, // An administrative operation + jtRPC = 10, // A websocket command from the client + jtTRANSACTION = 11, // A transaction received from the network + jtUNL = 12, // A Score or Fetch of the UNL (DEPRECATED) + jtADVANCE = 13, // Advance validated/acquired ledgers + jtPUBLEDGER = 14, // Publish a fully-accepted ledger + jtTXN_DATA = 15, // Fetch a proposed set + jtWAL = 16, // Write-ahead logging + jtVALIDATION_t = 17, // A validation from a trusted source + jtWRITE = 18, // Write out hashed objects + jtPROPOSAL_t = 19, // A proposal from a trusted source + jtSWEEP = 20, // Sweep for stale structures + jtNETOP_CLUSTER = 21, // NetworkOPs cluster peer report + jtNETOP_TIMER = 22, // NetworkOPs net timer processing + jtADMIN = 23, // An administrative operation // special types not dispatched by the job pool - jtPEER = 24, - jtDISK = 25, - jtACCEPTLEDGER = 26, - jtTXN_PROC = 27, - jtOB_SETUP = 28, - jtPATH_FIND = 29, - jtHO_READ = 30, - jtHO_WRITE = 31, - jtGENERIC = 32, // Used just to measure time -}; // CAUTION: If you add new types, add them to ripple_Job.cpp too + jtPEER = 30, + jtDISK = 31, + jtACCEPTLEDGER = 32, + jtTXN_PROC = 33, + jtOB_SETUP = 34, + jtPATH_FIND = 35, + jtHO_READ = 36, + jtHO_WRITE = 37, + jtGENERIC = 38, // Used just to measure time +}; // CAUTION: If you add new types, add them to Job.cpp too // VFALCO TODO move this into the enum so it calculates itself? -#define NUM_JOB_TYPES 48 // why 48 and not 32? +#define NUM_JOB_TYPES 48 // why 48 and not 38? class Job { diff --git a/src/ripple_core/functional/JobQueue.cpp b/src/ripple_core/functional/JobQueue.cpp index bb458c30b3..c51c9ecb39 100644 --- a/src/ripple_core/functional/JobQueue.cpp +++ b/src/ripple_core/functional/JobQueue.cpp @@ -590,7 +590,7 @@ private: default: // Someone added a JobType but forgot to set a limit. - // Did they also forget to add it to ripple_Job.cpp? + // Did they also forget to add it to Job.cpp? bassertfalse; break; @@ -600,6 +600,7 @@ private: case jtPROPOSAL_ut: case jtUPDATE_PF: case jtCLIENT: + case jtRPC: case jtTRANSACTION: case jtPUBLEDGER: case jtADVANCE: