From a2151bfa4792551f547330f416c32f8a9cfda688 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 22 Sep 2013 00:13:58 -0700 Subject: [PATCH] Split HTTP::Server to its own module --- Builds/QtCreator/rippled.pro | 3 +- Builds/VisualStudio2012/RippleD.vcxproj | 59 +- .../VisualStudio2012/RippleD.vcxproj.filters | 108 +- SConstruct | 3 +- src/ripple/frame/api/HTTPServer.cpp | 1016 ----------------- src/ripple/frame/api/HTTPServer.h | 263 ----- src/ripple/http/api/Handler.h | 47 + src/ripple/http/api/Port.h | 51 + src/ripple/http/api/ScopedStream.h | 57 + src/ripple/http/api/Server.h | 70 ++ src/ripple/http/api/Session.h | 112 ++ src/ripple/http/impl/Door.cpp | 102 ++ src/ripple/http/impl/Door.h | 42 + src/ripple/http/impl/Peer.cpp | 362 ++++++ src/ripple/http/impl/Peer.h | 101 ++ src/ripple/http/impl/Port.cpp | 77 ++ src/ripple/http/impl/ScopedStream.cpp | 44 + src/ripple/http/impl/Server.cpp | 46 + src/ripple/http/impl/ServerImpl.h | 246 ++++ src/ripple/http/impl/Session.cpp | 25 + src/ripple/http/impl/SessionImpl.cpp | 43 + src/ripple/http/impl/SessionImpl.h | 38 + src/ripple/http/impl/Types.h | 58 + src/ripple/http/ripple_http.cpp | 29 + .../ripple_frame.h => http/ripple_http.h} | 13 +- src/ripple/{frame => rpc}/api/RPCService.cpp | 0 src/ripple/{frame => rpc}/api/RPCService.h | 36 - .../ripple_frame.cpp => rpc/ripple_rpc.cpp} | 5 +- src/ripple/rpc/ripple_rpc.h | 16 + src/ripple/validators/ripple_validators.h | 3 +- src/ripple_app/main/RPCHTTPServer.cpp | 20 +- src/ripple_app/ripple_app.cpp | 2 + src/ripple_app/ripple_app.h | 1 - src/ripple_core/ripple_core.h | 2 +- 34 files changed, 1729 insertions(+), 1371 deletions(-) delete mode 100644 src/ripple/frame/api/HTTPServer.cpp delete mode 100644 src/ripple/frame/api/HTTPServer.h create mode 100644 src/ripple/http/api/Handler.h create mode 100644 src/ripple/http/api/Port.h create mode 100644 src/ripple/http/api/ScopedStream.h create mode 100644 src/ripple/http/api/Server.h create mode 100644 src/ripple/http/api/Session.h create mode 100644 src/ripple/http/impl/Door.cpp create mode 100644 src/ripple/http/impl/Door.h create mode 100644 src/ripple/http/impl/Peer.cpp create mode 100644 src/ripple/http/impl/Peer.h create mode 100644 src/ripple/http/impl/Port.cpp create mode 100644 src/ripple/http/impl/ScopedStream.cpp create mode 100644 src/ripple/http/impl/Server.cpp create mode 100644 src/ripple/http/impl/ServerImpl.h create mode 100644 src/ripple/http/impl/Session.cpp create mode 100644 src/ripple/http/impl/SessionImpl.cpp create mode 100644 src/ripple/http/impl/SessionImpl.h create mode 100644 src/ripple/http/impl/Types.h create mode 100644 src/ripple/http/ripple_http.cpp rename src/ripple/{frame/ripple_frame.h => http/ripple_http.h} (69%) rename src/ripple/{frame => rpc}/api/RPCService.cpp (100%) rename src/ripple/{frame => rpc}/api/RPCService.h (75%) rename src/ripple/{frame/ripple_frame.cpp => rpc/ripple_rpc.cpp} (77%) create mode 100644 src/ripple/rpc/ripple_rpc.h diff --git a/Builds/QtCreator/rippled.pro b/Builds/QtCreator/rippled.pro index a29bcfb49..f31d9f678 100644 --- a/Builds/QtCreator/rippled.pro +++ b/Builds/QtCreator/rippled.pro @@ -65,8 +65,9 @@ UI_HEADERS_DIR += ../../src/ripple_basics SOURCES += \ ../../src/ripple/beast/ripple_beast.cpp \ ../../src/ripple/beast/ripple_beastc.c \ - ../../src/ripple/frame/ripple_frame.cpp \ + ../../src/ripple/http/ripple_http.cpp \ ../../src/ripple/json/ripple_json.cpp \ + ../../src/ripple/rpc/ripple_rpc.cpp \ ../../src/ripple/sophia/ripple_sophia.c \ ../../src/ripple/testoverlay/ripple_testoverlay.cpp \ ../../src/ripple/validators/ripple_validators.cpp diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index fc31aea31..1c860a8a9 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -22,19 +22,49 @@ - + true true true true - + true true true true - + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + + + true + true + true + true + + true true @@ -54,6 +84,13 @@ true + + true + true + true + true + + true @@ -1505,9 +1542,17 @@ - - - + + + + + + + + + + + @@ -1517,6 +1562,8 @@ + + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index 8f87ff90d..7ed0294cc 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -181,12 +181,6 @@ {a56f8183-6f95-4455-a6f7-097eb930abaa} - - {e8545fbd-90eb-4586-9ee2-8a5956808b1a} - - - {a4dd852c-651b-4ea9-a051-252dc0eaea24} - {490d0cdb-7ce1-42f2-b234-6874dae550b7} @@ -202,6 +196,21 @@ {071582fa-cf16-4e41-8791-613cfe00eef8} + + {7abb5fcf-8793-45d0-95db-0cf448198765} + + + {005e1f40-38ac-4904-af7c-4a018c5662f4} + + + {98e572a2-c89a-4ab7-b1d5-7687786e48dd} + + + {c7cc6c12-0c92-423e-9394-f5df64b4ddd7} + + + {386ebc1c-0cbe-43a6-b48e-ac3c503da0ee} + @@ -921,9 +930,6 @@ [1] Ripple\json - - [1] Ripple\frame - [2] Old Ripple\ripple_core\nodestore @@ -978,18 +984,42 @@ [2] Old Ripple\ripple_app\node - - [1] Ripple\frame\api - [2] Old Ripple\ripple_app\main - - [1] Ripple\frame\api - [2] Old Ripple\ripple_app\main + + [1] Ripple\rpc + + + [1] Ripple\rpc\api + + + [1] Ripple\http + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + @@ -1869,9 +1899,6 @@ [1] Ripple\json\api - - [1] Ripple\frame - [2] Old Ripple\ripple_core\nodestore @@ -1944,18 +1971,51 @@ [2] Old Ripple\ripple_app\node - - [1] Ripple\frame\api - [2] Old Ripple\ripple_app\main - - [1] Ripple\frame\api - [2] Old Ripple\ripple_app\main + + [1] Ripple\rpc + + + [1] Ripple\rpc\api + + + [1] Ripple\http + + + [1] Ripple\http\api + + + [1] Ripple\http\api + + + [1] Ripple\http\api + + + [1] Ripple\http\api + + + [1] Ripple\http\api + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + + + [1] Ripple\http\impl + diff --git a/SConstruct b/SConstruct index d033097da..a586af0d2 100644 --- a/SConstruct +++ b/SConstruct @@ -146,8 +146,9 @@ COMPILED_FILES.extend (['src/ripple/beast/ripple_beastc.c']) # New-style Ripple unity sources # COMPILED_FILES.extend([ - 'src/ripple/frame/ripple_frame.cpp', + 'src/ripple/http/ripple_http.cpp', 'src/ripple/json/ripple_json.cpp', + 'src/ripple/rpc/ripple_rpc.cpp', 'src/ripple/sophia/ripple_sophia.c', 'src/ripple/testoverlay/ripple_testoverlay.cpp', 'src/ripple/validators/ripple_validators.cpp' diff --git a/src/ripple/frame/api/HTTPServer.cpp b/src/ripple/frame/api/HTTPServer.cpp deleted file mode 100644 index 7969e1e6e..000000000 --- a/src/ripple/frame/api/HTTPServer.cpp +++ /dev/null @@ -1,1016 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -#include "../ripple_net/ripple_net.h" - -namespace ripple { - -using namespace beast; - -//------------------------------------------------------------------------------ - -HTTPServer::Port::Port () - : port (0) - , security (no_ssl) - , context (nullptr) -{ -} - -HTTPServer::Port::Port (Port const& other) - : port (other.port) - , addr (other.addr) - , security (other.security) - , context (other.context) -{ -} - -HTTPServer::Port& HTTPServer::Port::operator= (Port const& other) -{ - port = other.port; - addr = other.addr; - security = other.security; - context = other.context; - return *this; -} - -HTTPServer::Port::Port ( - uint16 port_, - IPEndpoint const& addr_, - Security security_, - SSLContext* context_) - : port (port_) - , addr (addr_) - , security (security_) - , context (context_) -{ -} - -int compare (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) -{ - int comp; - - comp = compare (lhs.addr, rhs.addr); - if (comp != 0) - return comp; - - if (lhs.port < rhs.port) - return -1; - else if (lhs.port > rhs.port) - return 1; - - if (lhs.security < rhs.security) - return -1; - else if (lhs.security > rhs.security) - return 1; - - // 'context' does not participate in the comparison - - return 0; -} - -bool operator== (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) == 0; } -bool operator!= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) != 0; } -bool operator< (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) < 0; } -bool operator<= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) <= 0; } -bool operator> (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) > 0; } -bool operator>= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs) { return compare (lhs, rhs) >= 0; } - -//------------------------------------------------------------------------------ - -HTTPServer::ScopedStream::ScopedStream (Session& session) - : m_session (session) -{ -} - -HTTPServer::ScopedStream::ScopedStream (ScopedStream const& other) - : m_session (other.m_session) -{ -} - -HTTPServer::ScopedStream::ScopedStream (Session& session, - std::ostream& manip (std::ostream&)) - : m_session (session) -{ - m_ostream << manip; -} - -HTTPServer::ScopedStream::~ScopedStream () -{ - if (! m_ostream.str().empty()) - m_session.write (m_ostream.str()); -} - -std::ostream& HTTPServer::ScopedStream::operator<< (std::ostream& manip (std::ostream&)) const -{ - return m_ostream << manip; -} - -std::ostringstream& HTTPServer::ScopedStream::ostream () const -{ - return m_ostream; -} - -//------------------------------------------------------------------------------ - -HTTPServer::Session::Session () - : headersComplete (false) - , tag (nullptr) -{ - content.reserve (1000); - reply.reserve (1000); -} - -HTTPServer::ScopedStream HTTPServer::Session::operator<< ( - std::ostream& manip (std::ostream&)) -{ - return ScopedStream (*this, manip); -} - -//------------------------------------------------------------------------------ - -class HTTPServer::Impl : public Thread -{ -public: - typedef boost::system::error_code error_code; - typedef boost::asio::ip::tcp Protocol; - typedef boost::asio::ip::address address; - typedef Protocol::endpoint endpoint_t; - typedef Protocol::acceptor acceptor; - typedef Protocol::socket socket; - - static std::string to_string (address const& addr) - { - return addr.to_string(); - } - - static std::string to_string (endpoint_t const& endpoint) - { - std::stringstream ss; - ss << to_string (endpoint.address()); - if (endpoint.port() != 0) - ss << ":" << std::dec << endpoint.port(); - return std::string (ss.str()); - } - - static endpoint_t to_asio (Port const& port) - { - if (port.addr.isV4()) - { - IPEndpoint::V4 v4 (port.addr.v4()); - std::string const& s (v4.to_string()); - return endpoint_t (address().from_string (s), port.port); - } - - //IPEndpoint::V6 v6 (ep.v6()); - return endpoint_t (); - } - - static IPEndpoint from_asio (endpoint_t const& endpoint) - { - std::stringstream ss (to_string (endpoint)); - IPEndpoint ep; - ss >> ep; - return ep; - } - - //-------------------------------------------------------------------------- - - // Holds the copy of buffers being sent - typedef SharedArg SharedBuffer; - - class Peer; - - class SessionImp : public Session - { - public: - Peer& m_peer; - bool m_closed; - boost::optional m_work; - - explicit SessionImp (Peer& peer) - : m_peer (peer) - , m_closed (false) - { - } - - ~SessionImp () - { - } - - bool closed() const - { - return m_closed; - } - - void write (void const* buffer, std::size_t bytes) - { - m_peer.write (buffer, bytes); - } - - void close() - { - m_closed = true; - } - - void detach() - { - if (! m_work) - m_work = boost::in_place (boost::ref ( - m_peer.m_impl.get_io_service())); - } - }; - - //-------------------------------------------------------------------------- - - /** Represents an active connection. */ - class Peer - : public SharedObject - , public AsyncObject - , public List ::Node - , public LeakChecked - { - public: - enum - { - // Size of our receive buffer - bufferSize = 8192, - - // Largest HTTP request allowed - maxRequestBytes = 32 * 1024, - - // Max seconds without receiving a byte - dataTimeoutSeconds = 10, - - // Max seconds without completing the request - requestTimeoutSeconds = 30 - - }; - - typedef SharedPtr Ptr; - - Impl& m_impl; - boost::asio::io_service::strand m_strand; - boost::asio::deadline_timer m_data_timer; - boost::asio::deadline_timer m_request_timer; - ScopedPointer m_socket; - MemoryBlock m_buffer; - HTTPParser m_parser; - SessionImp m_session; - int m_writesPending; - bool m_callClose; - - Peer (Impl& impl, Port const& port) - : m_impl (impl) - , m_strand (m_impl.get_io_service()) - , m_data_timer (m_impl.get_io_service()) - , m_request_timer (m_impl.get_io_service()) - , m_buffer (bufferSize) - , m_parser (HTTPParser::typeRequest) - , m_session (*this) - , m_writesPending (0) - , m_callClose (false) - { - int flags; - switch (port.security) - { - default: - bassertfalse; - case Port::no_ssl: flags = MultiSocket::none; break; - case Port::allow_ssl: flags = MultiSocket::server_ssl; break; - case Port::require_ssl: flags = MultiSocket::server_ssl_required; break; - } - - m_socket = MultiSocket::New (m_impl.get_io_service(), port.context->get(), flags); - - m_impl.add (*this); - } - - ~Peer () - { - if (m_callClose) - m_impl.handler().onClose (m_session); - - m_impl.remove (*this); - } - - // Returns the asio socket for the peer. - // - socket& get_socket() - { - return m_socket->this_layer(); - } - - // Return the Session associated with this peer's session. - // - SessionImp& session () - { - return m_session; - } - - // Cancels all pending i/o and timers and sends tcp shutdown. - // - void cancel () - { - error_code ec; - m_data_timer.cancel (ec); - m_request_timer.cancel (ec); - m_socket->cancel (ec); - m_socket->shutdown (socket::shutdown_both); - } - - // Called when I/O completes with an error that is not eof or aborted. - // - void failed (error_code ec) - { - cancel (); - } - - // Called when there are no more completion handlers pending. - // - void asyncHandlersComplete () - { - } - - // Send a copy of the data. - // - void write (void const* buffer, std::size_t bytes) - { - SharedBuffer buf (static_cast (buffer), bytes); - // Make sure this happens on an i/o service thread. - m_impl.get_io_service().dispatch (m_strand.wrap ( - boost::bind (&Peer::handle_write, Ptr (this), - buf, CompletionCounter (this)))); - } - - // Called from an io_service thread to write the shared buffer. - // - void handle_write (SharedBuffer const& buf, CompletionCounter) - { - async_write (buf); - } - - // Send a shared buffer - // - void async_write (SharedBuffer const& buf) - { - bassert (buf.get().size() > 0); - - ++m_writesPending; - - // Send the copy. We pass the SharedArg in the last parameter - // so that a reference is maintained as the handler gets copied. - // When the final completion function returns, the reference - // count will drop to zero and the buffer will be freed. - // - boost::asio::async_write (*m_socket, - boost::asio::const_buffers_1 (&buf->front(), buf->size()), - m_strand.wrap (boost::bind (&Peer::handle_write, - Ptr (this), boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred, - buf, CompletionCounter (this)))); - } - - // Send a copy of the buffer sequence. - // - template - void async_write (BufferSequence const& buffers) - { - // Count the number of buffers - std::size_t const nbuf (std::distance ( - buffers.begin(), buffers.end())); - - // Iterate over each linear vector in the BufferSequence. - for (typename BufferSequence::const_iterator iter (buffers.begin()); - iter != buffers.end(); ++iter) - { - typename BufferSequence::value_type const& buffer (*iter); - - // Put a copy of this section of the buffer sequence into - // a reference counted, shared container. - // - SharedBuffer buf ( - boost::asio::buffer_cast (buffer), - boost::asio::buffer_size (buffer)); - - async_write (buf); - } - } - - // Calls the async_read_some initiating function. - // - void async_read_some () - { - // re-arm the data timer - // (this cancels the previous wait, if any) - // - m_data_timer.expires_from_now ( - boost::posix_time::seconds ( - dataTimeoutSeconds)); - - m_data_timer.async_wait (m_strand.wrap (boost::bind ( - &Peer::handle_data_timer, Ptr(this), - boost::asio::placeholders::error, - CompletionCounter (this)))); - - // issue the read - // - boost::asio::mutable_buffers_1 buf ( - m_buffer.getData (), m_buffer.getSize ()); - - m_socket->async_read_some (buf, m_strand.wrap ( - boost::bind (&Peer::handle_read, Ptr (this), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred, - CompletionCounter (this)))); - } - - // Sends a copy of the reply in the session if it is not empty. - // Returns `true` if m_session.closed is `true` - // On return, reply.empty() will return `true`. - // - void maybe_send_reply () - { - if (! m_session.reply.empty()) - { - async_write (boost::asio::const_buffers_1 ( - &m_session.reply.front(), m_session.reply.size())); - m_session.reply.clear(); - } - } - - // Called when the acceptor gives us the connection. - // - void handle_accept () - { - m_callClose = true; - - m_request_timer.expires_from_now ( - boost::posix_time::seconds ( - requestTimeoutSeconds)); - - m_request_timer.async_wait (m_strand.wrap (boost::bind ( - &Peer::handle_request_timer, Ptr(this), - boost::asio::placeholders::error, - CompletionCounter (this)))); - - if (m_socket->needs_handshake ()) - { - m_socket->async_handshake (Socket::server, m_strand.wrap ( - boost::bind (&Peer::handle_handshake, Ptr(this), - boost::asio::placeholders::error, - CompletionCounter (this)))); - } - else - { - async_read_some(); - } - } - - // Called when the handshake completes - // - void handle_handshake (error_code ec, CompletionCounter) - { - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - // fail - return; - } - - async_read_some(); - } - - // Called when the data timer expires - // - void handle_data_timer (error_code ec, CompletionCounter) - { - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - // fail - return; - } - - // They took too long to send any bytes - cancel(); - } - - // Called when the request timer expires - // - void handle_request_timer (error_code ec, CompletionCounter) - { - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - // fail - return; - } - - // They took too long to complete the request - cancel(); - } - - // Called when async_write completes. - // - void handle_write (error_code ec, std::size_t bytes_transferred, - SharedBuffer buf, CompletionCounter) - { - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - failed (ec); - return; - } - - bassert (m_writesPending > 0); - if (--m_writesPending == 0 && m_session.closed()) - { - m_socket->shutdown (socket::shutdown_send); - } - } - - // Called when async_read_some completes. - // - void handle_read (error_code ec, std::size_t bytes_transferred, CompletionCounter) - { - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0 && ec != boost::asio::error::eof) - { - failed (ec); - return; - } - - std::size_t const bytes_parsed (m_parser.process ( - m_buffer.getData(), bytes_transferred)); - - if (m_parser.error() || - bytes_parsed != bytes_transferred) - { - // set ec manually and call failed() - return; - } - - if (ec == boost::asio::error::eof) - { - m_parser.process_eof(); - ec = error_code(); - } - - if (m_parser.error()) - { - // set ec manually and call failed() - return; - } - - if (! m_parser.finished()) - { - // Feed some headers to the callback - if (m_parser.fields().size() > 0) - { - handle_headers (); - if (m_session.closed()) - return; - } - } - - if (m_parser.finished ()) - { - m_data_timer.cancel(); - - // VFALCO NOTE: Should we cancel this one? - m_request_timer.cancel(); - - if (! m_socket->needs_handshake()) - m_socket->shutdown (socket::shutdown_receive); - - handle_request (); - return; - } - - async_read_some(); - } - - // Called when we have some new headers. - // - void handle_headers () - { - m_session.headersComplete = m_parser.headersComplete(); - m_session.headers = HTTPHeaders (m_parser.fields()); - m_impl.handler().onHeaders (m_session); - - maybe_send_reply (); - } - - // Called when we have a complete http request. - // - void handle_request () - { - // This is to guarantee onHeaders is called at least once. - handle_headers(); - - if (m_session.closed()) - return; - - m_session.request = m_parser.request(); - - // Turn the Content-Body into a linear buffer. - ContentBodyBuffer const& body (m_session.request->body ()); - m_session.content.resize (body.size ()); - boost::asio::buffer_copy ( - boost::asio::buffer (&m_session.content.front(), - m_session.content.size()), body.data()); - - // Process the HTTPRequest - m_impl.handler().onRequest (m_session); - - maybe_send_reply (); - } - }; - - //-------------------------------------------------------------------------- - - /** A listening socket. */ - class Door - : public SharedObject - , public AsyncObject - , public List ::Node - , public LeakChecked - { - public: - typedef SharedPtr Ptr; - - Impl& m_impl; - acceptor m_acceptor; - Port m_port; - - Door (Impl& impl, Port const& port) - : m_impl (impl) - , m_acceptor (m_impl.get_io_service(), to_asio (port)) - , m_port (port) - { - m_impl.add (*this); - - error_code ec; - - m_acceptor.set_option (acceptor::reuse_address (true), ec); - if (ec) - { - m_impl.journal().error << - "Error setting acceptor socket option: " << ec.message(); - } - - if (! ec) - { - m_impl.journal().info << "Bound to endpoint " << - to_string (m_acceptor.local_endpoint()); - - async_accept(); - } - else - { - m_impl.journal().error << "Error binding to endpoint " << - to_string (m_acceptor.local_endpoint()) << - ", '" << ec.message() << "'"; - } - } - - ~Door () - { - m_impl.remove (*this); - } - - Port const& port () const - { - return m_port; - } - - void cancel () - { - m_acceptor.cancel(); - } - - void failed (error_code ec) - { - } - - void asyncHandlersComplete () - { - } - - void async_accept () - { - Peer* peer (new Peer (m_impl, m_port)); - m_acceptor.async_accept (peer->get_socket(), boost::bind ( - &Door::handle_accept, Ptr(this), - boost::asio::placeholders::error, - Peer::Ptr (peer), CompletionCounter (this))); - } - - void handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter) - { - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec) - { - m_impl.journal().error << "Accept failed: " << ec.message(); - return; - } - - async_accept(); - - // Save remote address in session - peer->session().remoteAddress = from_asio ( - peer->get_socket().remote_endpoint()).withPort (0); - m_impl.handler().onAccept (peer->session()); - - if (peer->session().closed()) - { - peer->cancel(); - return; - } - - peer->handle_accept(); - } - }; - - //-------------------------------------------------------------------------- - - struct State - { - // Attributes for our listening ports - Ports ports; - - // All allocated Peer objects - List peers; - - // All allocated Door objects - List doors; - }; - - typedef SharedData SharedState; - typedef std::vector Doors; - - HTTPServer& m_server; - Handler& m_handler; - Journal m_journal; - boost::asio::io_service m_io_service; - boost::asio::io_service::strand m_strand; - boost::optional m_work; - WaitableEvent m_stopped; - SharedState m_state; - Doors m_doors; - - //-------------------------------------------------------------------------- - - Impl (HTTPServer& server, Handler& handler, Journal journal) - : Thread ("RPC::HTTPServer") - , m_server (server) - , m_handler (handler) - , m_journal (journal) - , m_strand (m_io_service) - , m_work (boost::in_place (boost::ref (m_io_service))) - , m_stopped (true) - { - startThread (); - } - - ~Impl () - { - stopThread (); - } - - Journal const& journal() const - { - return m_journal; - } - - Ports const& getPorts () const - { - SharedState::UnlockedAccess state (m_state); - return state->ports; - } - - void setPorts (Ports const& ports) - { - SharedState::Access state (m_state); - state->ports = ports; - update(); - } - - bool stopping () const - { - return ! m_work; - } - - void stop (bool wait) - { - if (! stopping()) - { - m_work = boost::none; - update(); - } - - if (wait) - m_stopped.wait(); - } - - //-------------------------------------------------------------------------- - // - // Server - // - - Handler& handler() - { - return m_handler; - } - - boost::asio::io_service& get_io_service() - { - return m_io_service; - } - - // Inserts the peer into our list of peers. We only remove it - // from the list inside the destructor of the Peer object. This - // way, the Peer can never outlive the server. - // - void add (Peer& peer) - { - SharedState::Access state (m_state); - state->peers.push_back (peer); - } - - void add (Door& door) - { - SharedState::Access state (m_state); - state->doors.push_back (door); - } - - // Removes the peer from our list of peers. This is only called from - // the destructor of Peer. Essentially, the item in the list functions - // as a weak_ptr. - // - void remove (Peer& peer) - { - SharedState::Access state (m_state); - state->peers.erase (state->peers.iterator_to (peer)); - } - - void remove (Door& door) - { - SharedState::Access state (m_state); - state->doors.push_back (door); - } - - //-------------------------------------------------------------------------- - // - // Thread - // - - // Updates our Door list based on settings. - // - void handle_update () - { - if (! stopping()) - { - // Make a local copy to shorten the lock - // - Ports ports; - { - SharedState::ConstAccess state (m_state); - ports = state->ports; - } - - std::sort (ports.begin(), ports.end()); - - // Walk the Door list and the Port list simultaneously and - // build a replacement Door vector which we will then swap in. - // - Doors doors; - Doors::iterator door (m_doors.begin()); - for (Ports::const_iterator port (ports.begin()); - port != ports.end(); ++port) - { - int comp; - - while (door != m_doors.end() && - ((comp = compare (*port, (*door)->port())) > 0)) - { - (*door)->cancel(); - ++door; - } - - if (door != m_doors.end()) - { - if (comp < 0) - { - doors.push_back (new Door (*this, *port)); - } - else - { - // old Port and new Port are the same - doors.push_back (*door); - ++door; - } - } - else - { - doors.push_back (new Door (*this, *port)); - } - } - - // Any remaining Door objects are not in the new set, so cancel them. - // - for (;door != m_doors.end();) - (*door)->cancel(); - - m_doors.swap (doors); - } - else - { - // Cancel pending I/O on all doors. - // - for (Doors::iterator iter (m_doors.begin()); - iter != m_doors.end(); ++iter) - { - (*iter)->cancel(); - } - - // Remove our references to the old doors. - // - m_doors.resize (0); - } - } - - // Causes handle_update to run on the io_service - // - void update () - { - m_io_service.post (m_strand.wrap (boost::bind ( - &Impl::handle_update, this))); - } - - // The main i/o processing loop. - // - void run () - { - m_io_service.run (); - - m_stopped.signal(); - m_handler.onStopped (m_server); - } -}; - -//------------------------------------------------------------------------------ - -HTTPServer::HTTPServer (Handler& handler, Journal journal) - : m_impl (new Impl (*this, handler, journal)) -{ -} - -HTTPServer::~HTTPServer () -{ - stop(); -} - -Journal const& HTTPServer::journal () const -{ - return m_impl->journal(); -} - -HTTPServer::Ports const& HTTPServer::getPorts () const -{ - return m_impl->getPorts(); -} - -void HTTPServer::setPorts (Ports const& ports) -{ - m_impl->setPorts (ports); -} - -void HTTPServer::stopAsync () -{ - m_impl->stop(false); -} - -void HTTPServer::stop () -{ - m_impl->stop(true); -} - -//------------------------------------------------------------------------------ - -} diff --git a/src/ripple/frame/api/HTTPServer.h b/src/ripple/frame/api/HTTPServer.h deleted file mode 100644 index 77bf7ea8a..000000000 --- a/src/ripple/frame/api/HTTPServer.h +++ /dev/null @@ -1,263 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -#ifndef RIPPLE_FRAME_HTTPSERVER_H_INCLUDED -#define RIPPLE_FRAME_HTTPSERVER_H_INCLUDED - -#include - -namespace ripple { - -using namespace beast; - -/** Multi-threaded, asynchronous HTTP server. */ -class HTTPServer -{ -public: - /** Configuration information for a listening port. */ - struct Port - { - enum Security - { - no_ssl, - allow_ssl, - require_ssl - }; - - Port (); - Port (Port const& other); - Port& operator= (Port const& other); - Port (uint16 port_, IPEndpoint const& addr_, - Security security_, SSLContext* context_); - - uint16 port; - IPEndpoint addr; - Security security; - SSLContext* context; - }; - - //-------------------------------------------------------------------------- - - class Session; - - /** Scoped ostream-based RAII container for building the HTTP response. */ - class ScopedStream - { - public: - explicit ScopedStream (Session& session); - ScopedStream (ScopedStream const& other); - - template - ScopedStream (Session& session, T const& t) - : m_session (session) - { - m_ostream << t; - } - - ScopedStream (Session& session, std::ostream& manip (std::ostream&)); - - ~ScopedStream (); - - std::ostringstream& ostream () const; - - std::ostream& operator<< (std::ostream& manip (std::ostream&)) const; - - template - std::ostream& operator<< (T const& t) const - { - return m_ostream << t; - } - - private: - ScopedStream& operator= (ScopedStream const&); // disallowed - - Session& m_session; - std::ostringstream mutable m_ostream; - }; - - //-------------------------------------------------------------------------- - - /** Persistent state information for a connection session. - These values are preserved between calls for efficiency. - Some fields are input parameters, some are output parameters, - and all only become defined during specific callbacks. - */ - class Session : public Uncopyable - { - public: - Session (); - - /** Input: The Journal the HTTPServer is using. */ - Journal journal; - - /** Input: The remote address of the connection. */ - IPEndpoint remoteAddress; - - /** Input: `true` if all the headers have been received. */ - bool headersComplete; - - /** Input: The currently known set of HTTP headers. */ - HTTPHeaders headers; - - /** Input: The full HTTPRequest when it is known. */ - SharedPtr request; - - /** Input: The Content-Body as a linear buffer if we have the HTTPRequest. */ - std::string content; - - /** Output: The buffer to send back as a reply. - Upon each entry into the callback, reply.size() will be zero. - If reply.size() is zero when the callback returns, no data is - sent. - */ - std::string reply; - - /** A user-definable pointer. - The initial value is always zero. - Changes to the value are persisted between calls. - */ - void* tag; - - - - /** Send a copy of data asynchronously. */ - /** @{ */ - void write (std::string const& s) - { - if (! s.empty()) - write (&s.front(), - std::distance (s.begin(), s.end())); - } - - template - void write (BufferSequence const& buffers) - { - for (typename BufferSequence::const_iterator iter (buffers.begin()); - iter != buffers.end(); ++iter) - { - typename BufferSequence::value_type const& buffer (*iter); - write (boost::asio::buffer_cast (buffer), - boost::asio::buffer_size (buffer)); - } - } - - virtual void write (void const* buffer, std::size_t bytes) = 0; - /** @} */ - - /** Output support using ostream. */ - /** @{ */ - ScopedStream operator<< (std::ostream& manip (std::ostream&)); - - template - ScopedStream operator<< (T const& t) - { - return ScopedStream (*this, t); - } - /** @} */ - - /** Detach the session. - This holds the session open so that the response can be sent - asynchronously. Calls to io_service::run made by the HTTPServer - will not return until all detached sessions are closed. - */ - virtual void detach() = 0; - - /** Close the session. - This will be performed asynchronously. The session will be - closed gracefully after all pending writes have completed. - */ - virtual void close() = 0; - }; - - //-------------------------------------------------------------------------- - - /** Processes all sessions. - Thread safety: - Must be safe to call concurrently from any number of foreign threads. - */ - struct Handler - { - /** Called when the connection is accepted and we know remoteAddress. */ - virtual void onAccept (Session& session) = 0; - - /** Called repeatedly as new HTTP headers are received. - Guaranteed to be called at least once. - */ - virtual void onHeaders (Session& session) = 0; - - /** Called when we have the full Content-Body. */ - virtual void onRequest (Session& session) = 0; - - /** Called when the session ends. - Guaranteed to be called once. - */ - virtual void onClose (Session& session) = 0; - - /** Called when the HTTPServer has finished its stop. */ - virtual void onStopped (HTTPServer& server) = 0; - }; - - //-------------------------------------------------------------------------- - - /** A set of listening ports settings. */ - typedef std::vector Ports; - - /** Create the server using the specified handler. */ - HTTPServer (Handler& handler, Journal journal); - - /** Destroy the server. - This blocks until the server stops. - */ - virtual ~HTTPServer (); - - /** Returns the Journal associated with the server. */ - Journal const& journal () const; - - /** Returns the listening ports settings. - Thread safety: - Safe to call from any thread. - Cannot be called concurrently with setPorts. - */ - Ports const& getPorts () const; - - /** Set the listening ports settings. - These take effect immediately. Any current ports that are not in the - new set will be closed. Established connections will not be disturbed. - Thread safety: - Cannot be called concurrently. - */ - void setPorts (Ports const& ports); - - /** Notify the server to stop, without blocking. - Thread safety: - Safe to call concurrently from any thread. - */ - void stopAsync (); - - /** Notify the server to stop, and block until the stop is complete. - The handler's onStopped method will be called when the stop completes. - Thread safety: - Cannot be called concurrently. - Cannot be called from the thread of execution of any Handler functions. - */ - void stop (); - -private: - class Impl; - ScopedPointer m_impl; -}; - -int compare (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); -bool operator== (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); -bool operator!= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); -bool operator< (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); -bool operator<= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); -bool operator> (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); -bool operator>= (HTTPServer::Port const& lhs, HTTPServer::Port const& rhs); - -} - -#endif diff --git a/src/ripple/http/api/Handler.h b/src/ripple/http/api/Handler.h new file mode 100644 index 000000000..e976aa349 --- /dev/null +++ b/src/ripple/http/api/Handler.h @@ -0,0 +1,47 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_HANDLER_H_INCLUDED +#define RIPPLE_HTTP_HANDLER_H_INCLUDED + +namespace ripple { +namespace HTTP { + +using namespace beast; + +class Server; +class Session; + +/** Processes all sessions. + Thread safety: + Must be safe to call concurrently from any number of foreign threads. +*/ +struct Handler +{ + /** Called when the connection is accepted and we know remoteAddress. */ + virtual void onAccept (Session& session) = 0; + + /** Called repeatedly as new HTTP headers are received. + Guaranteed to be called at least once. + */ + virtual void onHeaders (Session& session) = 0; + + /** Called when we have the full Content-Body. */ + virtual void onRequest (Session& session) = 0; + + /** Called when the session ends. + Guaranteed to be called once. + */ + virtual void onClose (Session& session) = 0; + + /** Called when the server has finished its stop. */ + virtual void onStopped (Server& server) = 0; +}; + +} +} + +#endif diff --git a/src/ripple/http/api/Port.h b/src/ripple/http/api/Port.h new file mode 100644 index 000000000..a715d7e71 --- /dev/null +++ b/src/ripple/http/api/Port.h @@ -0,0 +1,51 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_PORT_H_INCLUDED +#define RIPPLE_HTTP_PORT_H_INCLUDED + +namespace ripple { +namespace HTTP { + +using namespace beast; + +/** Configuration information for a server listening port. */ +struct Port +{ + enum Security + { + no_ssl, + allow_ssl, + require_ssl + }; + + Port (); + Port (Port const& other); + Port& operator= (Port const& other); + Port (uint16 port_, IPEndpoint const& addr_, + Security security_, SSLContext* context_); + + uint16 port; + IPEndpoint addr; + Security security; + SSLContext* context; +}; + +int compare (Port const& lhs, Port const& rhs); +bool operator== (Port const& lhs, Port const& rhs); +bool operator!= (Port const& lhs, Port const& rhs); +bool operator< (Port const& lhs, Port const& rhs); +bool operator<= (Port const& lhs, Port const& rhs); +bool operator> (Port const& lhs, Port const& rhs); +bool operator>= (Port const& lhs, Port const& rhs); + +/** A set of listening ports settings. */ +typedef std::vector Ports; + +} +} + +#endif diff --git a/src/ripple/http/api/ScopedStream.h b/src/ripple/http/api/ScopedStream.h new file mode 100644 index 000000000..03b584405 --- /dev/null +++ b/src/ripple/http/api/ScopedStream.h @@ -0,0 +1,57 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_SCOPEDSTREAM_H_INCLUDED +#define RIPPLE_HTTP_SCOPEDSTREAM_H_INCLUDED + +#include + +namespace ripple { +namespace HTTP { + +using namespace beast; + +class Session; + +/** Scoped ostream-based RAII container for building the HTTP response. */ +class ScopedStream +{ +public: + explicit ScopedStream (Session& session); + ScopedStream (ScopedStream const& other); + + template + ScopedStream (Session& session, T const& t) + : m_session (session) + { + m_ostream << t; + } + + ScopedStream (Session& session, std::ostream& manip (std::ostream&)); + + ~ScopedStream (); + + std::ostringstream& ostream () const; + + std::ostream& operator<< (std::ostream& manip (std::ostream&)) const; + + template + std::ostream& operator<< (T const& t) const + { + return m_ostream << t; + } + +private: + ScopedStream& operator= (ScopedStream const&); // disallowed + + Session& m_session; + std::ostringstream mutable m_ostream; +}; + +} +} + +#endif diff --git a/src/ripple/http/api/Server.h b/src/ripple/http/api/Server.h new file mode 100644 index 000000000..a10fa632e --- /dev/null +++ b/src/ripple/http/api/Server.h @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_SERVER_H_INCLUDED +#define RIPPLE_HTTP_SERVER_H_INCLUDED + +#include + +namespace ripple { +namespace HTTP { + +using namespace beast; + +class ServerImpl; + +/** Multi-threaded, asynchronous HTTP server. */ +class Server +{ +public: + /** Create the server using the specified handler. */ + Server (Handler& handler, Journal journal); + + /** Destroy the server. + This blocks until the server stops. + */ + virtual ~Server (); + + /** Returns the Journal associated with the server. */ + Journal const& journal () const; + + /** Returns the listening ports settings. + Thread safety: + Safe to call from any thread. + Cannot be called concurrently with setPorts. + */ + Ports const& getPorts () const; + + /** Set the listening ports settings. + These take effect immediately. Any current ports that are not in the + new set will be closed. Established connections will not be disturbed. + Thread safety: + Cannot be called concurrently. + */ + void setPorts (Ports const& ports); + + /** Notify the server to stop, without blocking. + Thread safety: + Safe to call concurrently from any thread. + */ + void stopAsync (); + + /** Notify the server to stop, and block until the stop is complete. + The handler's onStopped method will be called when the stop completes. + Thread safety: + Cannot be called concurrently. + Cannot be called from the thread of execution of any Handler functions. + */ + void stop (); + +private: + ScopedPointer m_impl; +}; + +} +} + +#endif diff --git a/src/ripple/http/api/Session.h b/src/ripple/http/api/Session.h new file mode 100644 index 000000000..a2721f2ce --- /dev/null +++ b/src/ripple/http/api/Session.h @@ -0,0 +1,112 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_SESSION_H_INCLUDED +#define RIPPLE_HTTP_SESSION_H_INCLUDED + +#include + +namespace ripple { +namespace HTTP { + +using namespace beast; + +/** Persistent state information for a connection session. + These values are preserved between calls for efficiency. + Some fields are input parameters, some are output parameters, + and all only become defined during specific callbacks. +*/ +class Session : public Uncopyable +{ +public: + Session (); + + /** Input: The Journal the server is using. */ + Journal journal; + + /** Input: The remote address of the connection. */ + IPEndpoint remoteAddress; + + /** Input: `true` if all the headers have been received. */ + bool headersComplete; + + /** Input: The currently known set of HTTP headers. */ + HTTPHeaders headers; + + /** Input: The full HTTPRequest when it is known. */ + SharedPtr request; + + /** Input: The Content-Body as a linear buffer if we have the HTTPRequest. */ + std::string content; + + /** Output: The buffer to send back as a reply. + Upon each entry into the callback, reply.size() will be zero. + If reply.size() is zero when the callback returns, no data is + sent. + */ + std::string reply; + + /** A user-definable pointer. + The initial value is always zero. + Changes to the value are persisted between calls. + */ + void* tag; + + + + /** Send a copy of data asynchronously. */ + /** @{ */ + void write (std::string const& s) + { + if (! s.empty()) + write (&s.front(), + std::distance (s.begin(), s.end())); + } + + template + void write (BufferSequence const& buffers) + { + for (typename BufferSequence::const_iterator iter (buffers.begin()); + iter != buffers.end(); ++iter) + { + typename BufferSequence::value_type const& buffer (*iter); + write (boost::asio::buffer_cast (buffer), + boost::asio::buffer_size (buffer)); + } + } + + virtual void write (void const* buffer, std::size_t bytes) = 0; + /** @} */ + + /** Output support using ostream. */ + /** @{ */ + ScopedStream operator<< (std::ostream& manip (std::ostream&)); + + template + ScopedStream operator<< (T const& t) + { + return ScopedStream (*this, t); + } + /** @} */ + + /** Detach the session. + This holds the session open so that the response can be sent + asynchronously. Calls to io_service::run made by the server + will not return until all detached sessions are closed. + */ + virtual void detach() = 0; + + /** Close the session. + This will be performed asynchronously. The session will be + closed gracefully after all pending writes have completed. + */ + virtual void close() = 0; +}; + +} +} + +#endif diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp new file mode 100644 index 000000000..af438e066 --- /dev/null +++ b/src/ripple/http/impl/Door.cpp @@ -0,0 +1,102 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +Door::Door (ServerImpl& impl, Port const& port) + : m_impl (impl) + , m_acceptor (m_impl.get_io_service(), to_asio (port)) + , m_port (port) +{ + m_impl.add (*this); + + error_code ec; + + m_acceptor.set_option (acceptor::reuse_address (true), ec); + if (ec) + { + m_impl.journal().error << + "Error setting acceptor socket option: " << ec.message(); + } + + if (! ec) + { + m_impl.journal().info << "Bound to endpoint " << + to_string (m_acceptor.local_endpoint()); + + async_accept(); + } + else + { + m_impl.journal().error << "Error binding to endpoint " << + to_string (m_acceptor.local_endpoint()) << + ", '" << ec.message() << "'"; + } +} + +Door::~Door () +{ + m_impl.remove (*this); +} + +Port const& Door::port () const +{ + return m_port; +} + +void Door::cancel () +{ + m_acceptor.cancel(); +} + +void Door::failed (error_code ec) +{ +} + +void Door::asyncHandlersComplete () +{ +} + +void Door::async_accept () +{ + Peer* peer (new Peer (m_impl, m_port)); + m_acceptor.async_accept (peer->get_socket(), boost::bind ( + &Door::handle_accept, Ptr(this), + boost::asio::placeholders::error, + Peer::Ptr (peer), CompletionCounter (this))); +} + +void Door::handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_impl.journal().error << "Accept failed: " << ec.message(); + return; + } + + async_accept(); + + // Save remote address in session + peer->session().remoteAddress = from_asio ( + peer->get_socket().remote_endpoint()).withPort (0); + m_impl.handler().onAccept (peer->session()); + + if (peer->session().closed()) + { + peer->cancel(); + return; + } + + peer->handle_accept(); +} + +} +} + diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h new file mode 100644 index 000000000..b0bcfd1ad --- /dev/null +++ b/src/ripple/http/impl/Door.h @@ -0,0 +1,42 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_DOOR_H_INCLUDED +#define RIPPLE_HTTP_DOOR_H_INCLUDED + +namespace ripple { +namespace HTTP { + +using namespace beast; + +/** A listening socket. */ +class Door + : public SharedObject + , public AsyncObject + , public List ::Node + , public LeakChecked +{ +public: + typedef SharedPtr Ptr; + + ServerImpl& m_impl; + acceptor m_acceptor; + Port m_port; + + Door (ServerImpl& impl, Port const& port); + ~Door (); + Port const& port () const; + void cancel (); + void failed (error_code ec); + void asyncHandlersComplete (); + void async_accept (); + void handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter); +}; + +} +} + +#endif diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp new file mode 100644 index 000000000..aba83de9c --- /dev/null +++ b/src/ripple/http/impl/Peer.cpp @@ -0,0 +1,362 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +Peer::Peer (ServerImpl& impl, Port const& port) + : m_impl (impl) + , m_strand (m_impl.get_io_service()) + , m_data_timer (m_impl.get_io_service()) + , m_request_timer (m_impl.get_io_service()) + , m_buffer (bufferSize) + , m_parser (HTTPParser::typeRequest) + , m_session (*this) + , m_writesPending (0) + , m_callClose (false) +{ + int flags; + switch (port.security) + { + default: + bassertfalse; + case Port::no_ssl: flags = MultiSocket::none; break; + case Port::allow_ssl: flags = MultiSocket::server_ssl; break; + case Port::require_ssl: flags = MultiSocket::server_ssl_required; break; + } + + m_socket = MultiSocket::New (m_impl.get_io_service(), port.context->get(), flags); + + m_impl.add (*this); +} + +Peer::~Peer () +{ + if (m_callClose) + m_impl.handler().onClose (m_session); + + m_impl.remove (*this); +} + +// Returns the asio socket for the peer. +// +socket& Peer::get_socket() +{ + return m_socket->this_layer(); +} + +// Return the Session associated with this peer's session. +// +SessionImpl& Peer::session () +{ + return m_session; +} + +// Cancels all pending i/o and timers and sends tcp shutdown. +// +void Peer::cancel () +{ + error_code ec; + m_data_timer.cancel (ec); + m_request_timer.cancel (ec); + m_socket->cancel (ec); + m_socket->shutdown (socket::shutdown_both); +} + +// Called when I/O completes with an error that is not eof or aborted. +// +void Peer::failed (error_code ec) +{ + cancel (); +} + +// Called when there are no more completion handlers pending. +// +void Peer::asyncHandlersComplete () +{ +} + +// Send a copy of the data. +// +void Peer::write (void const* buffer, std::size_t bytes) +{ + SharedBuffer buf (static_cast (buffer), bytes); + // Make sure this happens on an i/o service thread. + m_impl.get_io_service().dispatch (m_strand.wrap ( + boost::bind (&Peer::handle_write, Ptr (this), + buf, CompletionCounter (this)))); +} + +// Called from an io_service thread to write the shared buffer. +// +void Peer::handle_write (SharedBuffer const& buf, CompletionCounter) +{ + async_write (buf); +} + +// Send a shared buffer +// +void Peer::async_write (SharedBuffer const& buf) +{ + bassert (buf.get().size() > 0); + + ++m_writesPending; + + // Send the copy. We pass the SharedArg in the last parameter + // so that a reference is maintained as the handler gets copied. + // When the final completion function returns, the reference + // count will drop to zero and the buffer will be freed. + // + boost::asio::async_write (*m_socket, + boost::asio::const_buffers_1 (&buf->front(), buf->size()), + m_strand.wrap (boost::bind (&Peer::handle_write, + Ptr (this), boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + buf, CompletionCounter (this)))); +} + +// Calls the async_read_some initiating function. +// +void Peer::async_read_some () +{ + // re-arm the data timer + // (this cancels the previous wait, if any) + // + m_data_timer.expires_from_now ( + boost::posix_time::seconds ( + dataTimeoutSeconds)); + + m_data_timer.async_wait (m_strand.wrap (boost::bind ( + &Peer::handle_data_timer, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + + // issue the read + // + boost::asio::mutable_buffers_1 buf ( + m_buffer.getData (), m_buffer.getSize ()); + + m_socket->async_read_some (buf, m_strand.wrap ( + boost::bind (&Peer::handle_read, Ptr (this), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + CompletionCounter (this)))); +} + +// Sends a copy of the reply in the session if it is not empty. +// Returns `true` if m_session.closed is `true` +// On return, reply.empty() will return `true`. +// +void Peer::maybe_send_reply () +{ + if (! m_session.reply.empty()) + { + async_write (boost::asio::const_buffers_1 ( + &m_session.reply.front(), m_session.reply.size())); + m_session.reply.clear(); + } +} + +// Called when the acceptor gives us the connection. +// +void Peer::handle_accept () +{ + m_callClose = true; + + m_request_timer.expires_from_now ( + boost::posix_time::seconds ( + requestTimeoutSeconds)); + + m_request_timer.async_wait (m_strand.wrap (boost::bind ( + &Peer::handle_request_timer, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + + if (m_socket->needs_handshake ()) + { + m_socket->async_handshake (Socket::server, m_strand.wrap ( + boost::bind (&Peer::handle_handshake, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + } + else + { + async_read_some(); + } +} + +// Called when the handshake completes +// +void Peer::handle_handshake (error_code ec, CompletionCounter) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + // fail + return; + } + + async_read_some(); +} + +// Called when the data timer expires +// +void Peer::handle_data_timer (error_code ec, CompletionCounter) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + // fail + return; + } + + // They took too long to send any bytes + cancel(); +} + +// Called when the request timer expires +// +void Peer::handle_request_timer (error_code ec, CompletionCounter) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + // fail + return; + } + + // They took too long to complete the request + cancel(); +} + +// Called when async_write completes. +// +void Peer::handle_write (error_code ec, std::size_t bytes_transferred, + SharedBuffer buf, CompletionCounter) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + bassert (m_writesPending > 0); + if (--m_writesPending == 0 && m_session.closed()) + { + m_socket->shutdown (socket::shutdown_send); + } +} + +// Called when async_read_some completes. +// +void Peer::handle_read (error_code ec, std::size_t bytes_transferred, CompletionCounter) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0 && ec != boost::asio::error::eof) + { + failed (ec); + return; + } + + std::size_t const bytes_parsed (m_parser.process ( + m_buffer.getData(), bytes_transferred)); + + if (m_parser.error() || + bytes_parsed != bytes_transferred) + { + // set ec manually and call failed() + return; + } + + if (ec == boost::asio::error::eof) + { + m_parser.process_eof(); + ec = error_code(); + } + + if (m_parser.error()) + { + // set ec manually and call failed() + return; + } + + if (! m_parser.finished()) + { + // Feed some headers to the callback + if (m_parser.fields().size() > 0) + { + handle_headers (); + if (m_session.closed()) + return; + } + } + + if (m_parser.finished ()) + { + m_data_timer.cancel(); + + // VFALCO NOTE: Should we cancel this one? + m_request_timer.cancel(); + + if (! m_socket->needs_handshake()) + m_socket->shutdown (socket::shutdown_receive); + + handle_request (); + return; + } + + async_read_some(); +} + +// Called when we have some new headers. +// +void Peer::handle_headers () +{ + m_session.headersComplete = m_parser.headersComplete(); + m_session.headers = HTTPHeaders (m_parser.fields()); + m_impl.handler().onHeaders (m_session); + + maybe_send_reply (); +} + +// Called when we have a complete http request. +// +void Peer::handle_request () +{ + // This is to guarantee onHeaders is called at least once. + handle_headers(); + + if (m_session.closed()) + return; + + m_session.request = m_parser.request(); + + // Turn the Content-Body into a linear buffer. + ContentBodyBuffer const& body (m_session.request->body ()); + m_session.content.resize (body.size ()); + boost::asio::buffer_copy ( + boost::asio::buffer (&m_session.content.front(), + m_session.content.size()), body.data()); + + // Process the HTTPRequest + m_impl.handler().onRequest (m_session); + + maybe_send_reply (); +} + +} +} diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h new file mode 100644 index 000000000..16e4de88a --- /dev/null +++ b/src/ripple/http/impl/Peer.h @@ -0,0 +1,101 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_PEER_H_INCLUDED +#define RIPPLE_HTTP_PEER_H_INCLUDED + +namespace ripple { +namespace HTTP { + +using namespace beast; + +/** Represents an active connection. */ +class Peer + : public SharedObject + , public AsyncObject + , public List ::Node + , public LeakChecked +{ +public: + enum + { + // Size of our receive buffer + bufferSize = 8192, + + // Largest HTTP request allowed + maxRequestBytes = 32 * 1024, + + // Max seconds without receiving a byte + dataTimeoutSeconds = 10, + + // Max seconds without completing the request + requestTimeoutSeconds = 30 + + }; + + typedef SharedPtr Ptr; + + ServerImpl& m_impl; + boost::asio::io_service::strand m_strand; + boost::asio::deadline_timer m_data_timer; + boost::asio::deadline_timer m_request_timer; + ScopedPointer m_socket; + MemoryBlock m_buffer; + HTTPParser m_parser; + SessionImpl m_session; + int m_writesPending; + bool m_callClose; + + Peer (ServerImpl& impl, Port const& port); + ~Peer (); + socket& get_socket(); + SessionImpl& session (); + void cancel (); + void failed (error_code ec); + void asyncHandlersComplete (); + void write (void const* buffer, std::size_t bytes); + void handle_write (SharedBuffer const& buf, CompletionCounter); + void async_write (SharedBuffer const& buf); + + template + void async_write (BufferSequence const& buffers) + { + // Iterate over each linear vector in the BufferSequence. + for (typename BufferSequence::const_iterator iter (buffers.begin()); + iter != buffers.end(); ++iter) + { + typename BufferSequence::value_type const& buffer (*iter); + + // Put a copy of this section of the buffer sequence into + // a reference counted, shared container. + // + async_write (SharedBuffer ( + boost::asio::buffer_cast (buffer), + boost::asio::buffer_size (buffer))); + } + } + + void async_read_some (); + void maybe_send_reply (); + void handle_accept (); + void handle_handshake (error_code ec, CompletionCounter); + void handle_data_timer (error_code ec, CompletionCounter); + void handle_request_timer (error_code ec, CompletionCounter); + + void handle_write (error_code ec, std::size_t bytes_transferred, + SharedBuffer buf, CompletionCounter); + + void handle_read (error_code ec, std::size_t bytes_transferred, + CompletionCounter); + + void handle_headers (); + void handle_request (); +}; + +} +} + +#endif diff --git a/src/ripple/http/impl/Port.cpp b/src/ripple/http/impl/Port.cpp new file mode 100644 index 000000000..093434204 --- /dev/null +++ b/src/ripple/http/impl/Port.cpp @@ -0,0 +1,77 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +Port::Port () + : port (0) + , security (no_ssl) + , context (nullptr) +{ +} + +Port::Port (Port const& other) + : port (other.port) + , addr (other.addr) + , security (other.security) + , context (other.context) +{ +} + +Port& Port::operator= (Port const& other) +{ + port = other.port; + addr = other.addr; + security = other.security; + context = other.context; + return *this; +} + +Port::Port ( + uint16 port_, + IPEndpoint const& addr_, + Security security_, + SSLContext* context_) + : port (port_) + , addr (addr_) + , security (security_) + , context (context_) +{ +} + +int compare (Port const& lhs, Port const& rhs) +{ + int comp; + + comp = compare (lhs.addr, rhs.addr); + if (comp != 0) + return comp; + + if (lhs.port < rhs.port) + return -1; + else if (lhs.port > rhs.port) + return 1; + + if (lhs.security < rhs.security) + return -1; + else if (lhs.security > rhs.security) + return 1; + + // 'context' does not participate in the comparison + + return 0; +} + +bool operator== (Port const& lhs, Port const& rhs) { return compare (lhs, rhs) == 0; } +bool operator!= (Port const& lhs, Port const& rhs) { return compare (lhs, rhs) != 0; } +bool operator< (Port const& lhs, Port const& rhs) { return compare (lhs, rhs) < 0; } +bool operator<= (Port const& lhs, Port const& rhs) { return compare (lhs, rhs) <= 0; } +bool operator> (Port const& lhs, Port const& rhs) { return compare (lhs, rhs) > 0; } +bool operator>= (Port const& lhs, Port const& rhs) { return compare (lhs, rhs) >= 0; } + +} +} diff --git a/src/ripple/http/impl/ScopedStream.cpp b/src/ripple/http/impl/ScopedStream.cpp new file mode 100644 index 000000000..4da221bae --- /dev/null +++ b/src/ripple/http/impl/ScopedStream.cpp @@ -0,0 +1,44 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +ScopedStream::ScopedStream (Session& session) + : m_session (session) +{ +} + +ScopedStream::ScopedStream (ScopedStream const& other) + : m_session (other.m_session) +{ +} + +ScopedStream::ScopedStream (Session& session, + std::ostream& manip (std::ostream&)) + : m_session (session) +{ + m_ostream << manip; +} + +ScopedStream::~ScopedStream () +{ + if (! m_ostream.str().empty()) + m_session.write (m_ostream.str()); +} + +std::ostream& ScopedStream::operator<< (std::ostream& manip (std::ostream&)) const +{ + return m_ostream << manip; +} + +std::ostringstream& ScopedStream::ostream () const +{ + return m_ostream; +} + +} +} diff --git a/src/ripple/http/impl/Server.cpp b/src/ripple/http/impl/Server.cpp new file mode 100644 index 000000000..fbe11929c --- /dev/null +++ b/src/ripple/http/impl/Server.cpp @@ -0,0 +1,46 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +Server::Server (Handler& handler, Journal journal) + : m_impl (new ServerImpl (*this, handler, journal)) +{ +} + +Server::~Server () +{ + stop(); +} + +Journal const& Server::journal () const +{ + return m_impl->journal(); +} + +Ports const& Server::getPorts () const +{ + return m_impl->getPorts(); +} + +void Server::setPorts (Ports const& ports) +{ + m_impl->setPorts (ports); +} + +void Server::stopAsync () +{ + m_impl->stop(false); +} + +void Server::stop () +{ + m_impl->stop(true); +} + +} +} diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h new file mode 100644 index 000000000..e6fa22b45 --- /dev/null +++ b/src/ripple/http/impl/ServerImpl.h @@ -0,0 +1,246 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_SERVERIMPL_H_INCLUDED +#define RIPPLE_HTTP_SERVERIMPL_H_INCLUDED + +namespace ripple { +namespace HTTP { + +using namespace beast; + +class ServerImpl : public Thread +{ +public: + struct State + { + // Attributes for our listening ports + Ports ports; + + // All allocated Peer objects + List peers; + + // All allocated Door objects + List doors; + }; + + typedef SharedData SharedState; + typedef std::vector Doors; + + Server& m_server; + Handler& m_handler; + Journal m_journal; + boost::asio::io_service m_io_service; + boost::asio::io_service::strand m_strand; + boost::optional m_work; + WaitableEvent m_stopped; + SharedState m_state; + Doors m_doors; + + //-------------------------------------------------------------------------- + + ServerImpl (Server& server, Handler& handler, Journal journal) + : Thread ("HTTP::Server") + , m_server (server) + , m_handler (handler) + , m_journal (journal) + , m_strand (m_io_service) + , m_work (boost::in_place (boost::ref (m_io_service))) + , m_stopped (true) + { + startThread (); + } + + ~ServerImpl () + { + stopThread (); + } + + Journal const& journal() const + { + return m_journal; + } + + Ports const& getPorts () const + { + SharedState::UnlockedAccess state (m_state); + return state->ports; + } + + void setPorts (Ports const& ports) + { + SharedState::Access state (m_state); + state->ports = ports; + update(); + } + + bool stopping () const + { + return ! m_work; + } + + void stop (bool wait) + { + if (! stopping()) + { + m_work = boost::none; + update(); + } + + if (wait) + m_stopped.wait(); + } + + //-------------------------------------------------------------------------- + // + // Server + // + + Handler& handler() + { + return m_handler; + } + + boost::asio::io_service& get_io_service() + { + return m_io_service; + } + + // Inserts the peer into our list of peers. We only remove it + // from the list inside the destructor of the Peer object. This + // way, the Peer can never outlive the server. + // + void add (Peer& peer) + { + SharedState::Access state (m_state); + state->peers.push_back (peer); + } + + void add (Door& door) + { + SharedState::Access state (m_state); + state->doors.push_back (door); + } + + // Removes the peer from our list of peers. This is only called from + // the destructor of Peer. Essentially, the item in the list functions + // as a weak_ptr. + // + void remove (Peer& peer) + { + SharedState::Access state (m_state); + state->peers.erase (state->peers.iterator_to (peer)); + } + + void remove (Door& door) + { + SharedState::Access state (m_state); + state->doors.push_back (door); + } + + //-------------------------------------------------------------------------- + // + // Thread + // + + // Updates our Door list based on settings. + // + void handle_update () + { + if (! stopping()) + { + // Make a local copy to shorten the lock + // + Ports ports; + { + SharedState::ConstAccess state (m_state); + ports = state->ports; + } + + std::sort (ports.begin(), ports.end()); + + // Walk the Door list and the Port list simultaneously and + // build a replacement Door vector which we will then swap in. + // + Doors doors; + Doors::iterator door (m_doors.begin()); + for (Ports::const_iterator port (ports.begin()); + port != ports.end(); ++port) + { + int comp; + + while (door != m_doors.end() && + ((comp = compare (*port, (*door)->port())) > 0)) + { + (*door)->cancel(); + ++door; + } + + if (door != m_doors.end()) + { + if (comp < 0) + { + doors.push_back (new Door (*this, *port)); + } + else + { + // old Port and new Port are the same + doors.push_back (*door); + ++door; + } + } + else + { + doors.push_back (new Door (*this, *port)); + } + } + + // Any remaining Door objects are not in the new set, so cancel them. + // + for (;door != m_doors.end();) + (*door)->cancel(); + + m_doors.swap (doors); + } + else + { + // Cancel pending I/O on all doors. + // + for (Doors::iterator iter (m_doors.begin()); + iter != m_doors.end(); ++iter) + { + (*iter)->cancel(); + } + + // Remove our references to the old doors. + // + m_doors.resize (0); + } + } + + // Causes handle_update to run on the io_service + // + void update () + { + m_io_service.post (m_strand.wrap (boost::bind ( + &ServerImpl::handle_update, this))); + } + + // The main i/o processing loop. + // + void run () + { + m_io_service.run (); + + m_stopped.signal(); + m_handler.onStopped (m_server); + } +}; + +} +} + +#endif diff --git a/src/ripple/http/impl/Session.cpp b/src/ripple/http/impl/Session.cpp new file mode 100644 index 000000000..25ae5ab75 --- /dev/null +++ b/src/ripple/http/impl/Session.cpp @@ -0,0 +1,25 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +Session::Session () + : headersComplete (false) + , tag (nullptr) +{ + content.reserve (1000); + reply.reserve (1000); +} + +ScopedStream Session::operator<< ( + std::ostream& manip (std::ostream&)) +{ + return ScopedStream (*this, manip); +} + +} +} diff --git a/src/ripple/http/impl/SessionImpl.cpp b/src/ripple/http/impl/SessionImpl.cpp new file mode 100644 index 000000000..c300042c5 --- /dev/null +++ b/src/ripple/http/impl/SessionImpl.cpp @@ -0,0 +1,43 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +SessionImpl::SessionImpl (Peer& peer) + : m_peer (peer) + , m_closed (false) +{ +} + +SessionImpl::~SessionImpl () +{ +} + +bool SessionImpl::closed() const +{ + return m_closed; +} + +void SessionImpl::write (void const* buffer, std::size_t bytes) +{ + m_peer.write (buffer, bytes); +} + +void SessionImpl::close() +{ + m_closed = true; +} + +void SessionImpl::detach() +{ + if (! m_work) + m_work = boost::in_place (boost::ref ( + m_peer.m_impl.get_io_service())); +} + +} +} diff --git a/src/ripple/http/impl/SessionImpl.h b/src/ripple/http/impl/SessionImpl.h new file mode 100644 index 000000000..dd13a57cf --- /dev/null +++ b/src/ripple/http/impl/SessionImpl.h @@ -0,0 +1,38 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_SESSIONIMPL_H_INCLUDED +#define RIPPLE_HTTP_SESSIONIMPL_H_INCLUDED + +namespace ripple { +namespace HTTP { + +using namespace beast; + +// Holds the copy of buffers being sent +typedef SharedArg SharedBuffer; + +class Peer; + +class SessionImpl : public Session +{ +public: + Peer& m_peer; + bool m_closed; + boost::optional m_work; + + explicit SessionImpl (Peer& peer); + ~SessionImpl (); + bool closed() const; + void write (void const* buffer, std::size_t bytes); + void close(); + void detach(); +}; + +} +} + +#endif diff --git a/src/ripple/http/impl/Types.h b/src/ripple/http/impl/Types.h new file mode 100644 index 000000000..52367a229 --- /dev/null +++ b/src/ripple/http/impl/Types.h @@ -0,0 +1,58 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_HTTP_TYPES_H_INCLUDED +#define RIPPLE_HTTP_TYPES_H_INCLUDED + +namespace ripple { +namespace HTTP { + +typedef boost::system::error_code error_code; +typedef boost::asio::ip::tcp Protocol; +typedef boost::asio::ip::address address; +typedef Protocol::endpoint endpoint_t; +typedef Protocol::acceptor acceptor; +typedef Protocol::socket socket; + +inline std::string to_string (address const& addr) +{ + return addr.to_string(); +} + +inline std::string to_string (endpoint_t const& endpoint) +{ + std::stringstream ss; + ss << to_string (endpoint.address()); + if (endpoint.port() != 0) + ss << ":" << std::dec << endpoint.port(); + return std::string (ss.str()); +} + +inline endpoint_t to_asio (Port const& port) +{ + if (port.addr.isV4()) + { + IPEndpoint::V4 v4 (port.addr.v4()); + std::string const& s (v4.to_string()); + return endpoint_t (address().from_string (s), port.port); + } + + //IPEndpoint::V6 v6 (ep.v6()); + return endpoint_t (); +} + +inline IPEndpoint from_asio (endpoint_t const& endpoint) +{ + std::stringstream ss (to_string (endpoint)); + IPEndpoint ep; + ss >> ep; + return ep; +} + +} +} + +#endif diff --git a/src/ripple/http/ripple_http.cpp b/src/ripple/http/ripple_http.cpp new file mode 100644 index 000000000..b79023eb9 --- /dev/null +++ b/src/ripple/http/ripple_http.cpp @@ -0,0 +1,29 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#include "BeastConfig.h" + +#include "ripple_http.h" + +#include "../ripple_net/ripple_net.h" + +#include "beast/modules/beast_core/system/BeforeBoost.h" // must come first +#include +#include + +#include "impl/Port.cpp" +#include "impl/ScopedStream.cpp" +#include "impl/Session.cpp" + +# include "impl/Types.h" +# include "impl/SessionImpl.h" +# include "impl/Peer.h" +# include "impl/Door.h" +# include "impl/ServerImpl.h" +#include "impl/Door.cpp" +#include "impl/Peer.cpp" +#include "impl/Server.cpp" +#include "impl/SessionImpl.cpp" diff --git a/src/ripple/frame/ripple_frame.h b/src/ripple/http/ripple_http.h similarity index 69% rename from src/ripple/frame/ripple_frame.h rename to src/ripple/http/ripple_http.h index a26a2909c..7b309945f 100644 --- a/src/ripple/frame/ripple_frame.h +++ b/src/ripple/http/ripple_http.h @@ -4,8 +4,8 @@ */ //============================================================================== -#ifndef RIPPLE_FRAME_H_INCLUDED -#define RIPPLE_FRAME_H_INCLUDED +#ifndef RIPPLE_HTTP_H_INCLUDED +#define RIPPLE_HTTP_H_INCLUDED #include "beast/modules/beast_core/beast_core.h" @@ -13,9 +13,10 @@ // just for HTTPMessage!! #include "beast/modules/beast_asio/beast_asio.h" -#include "../json/ripple_json.h" - -#include "api/HTTPServer.h" -#include "api/RPCService.h" +# include "api/Port.h" +# include "api/ScopedStream.h" +# include "api/Session.h" +# include "api/Handler.h" +#include "api/Server.h" #endif diff --git a/src/ripple/frame/api/RPCService.cpp b/src/ripple/rpc/api/RPCService.cpp similarity index 100% rename from src/ripple/frame/api/RPCService.cpp rename to src/ripple/rpc/api/RPCService.cpp diff --git a/src/ripple/frame/api/RPCService.h b/src/ripple/rpc/api/RPCService.h similarity index 75% rename from src/ripple/frame/api/RPCService.h rename to src/ripple/rpc/api/RPCService.h index 0c40c06b9..c227f71c7 100644 --- a/src/ripple/frame/api/RPCService.h +++ b/src/ripple/rpc/api/RPCService.h @@ -102,42 +102,6 @@ public: virtual std::pair call ( std::string const& method, Json::Value const& args) = 0; - /** Execute an RPC command asynchronously. - - If the method exists, the dispatcher is invoked to provide the - context for calling the handler with the argument list and this - function returns `true` immediately. The dispatcher calls the - CompletionHandler when the operation is complete. If the method - does not exist, `false` is returned. - - Copies of the Dispatcher and CompletionHandler are made as needed. - - CompletionHandler must be compatible with this signature: - void (Json::Value const&) - - Dispatcher is a functor compatible with this signature: - void (Handler const& handler, - Json::Value const& args, - CompletionHandler completionHandler); - - Thread safety: - Safe to call from any thread. - - @return `true` if a handler was found. - */ - template - bool call_async (std::string const& method, - Json::Value const& args, - CompletionHandler completionHandler, - Dispatcher dispatcher) - { - Handler const* handler (find (method)); - if (! handler) - return false; - dispatcher (*handler, args, completionHandler); - return true; - } - /** Returns the Handler for the specified method, or nullptr. Thread safety: Safe to call from any threads. diff --git a/src/ripple/frame/ripple_frame.cpp b/src/ripple/rpc/ripple_rpc.cpp similarity index 77% rename from src/ripple/frame/ripple_frame.cpp rename to src/ripple/rpc/ripple_rpc.cpp index 45cef89f1..4b838c37a 100644 --- a/src/ripple/frame/ripple_frame.cpp +++ b/src/ripple/rpc/ripple_rpc.cpp @@ -6,12 +6,9 @@ #include "BeastConfig.h" -#include "ripple_frame.h" +#include "ripple_rpc.h" #include "beast/modules/beast_core/system/BeforeBoost.h" // must come first -#include -#include #include -#include "api/HTTPServer.cpp" #include "api/RPCService.cpp" diff --git a/src/ripple/rpc/ripple_rpc.h b/src/ripple/rpc/ripple_rpc.h new file mode 100644 index 000000000..2e1033795 --- /dev/null +++ b/src/ripple/rpc/ripple_rpc.h @@ -0,0 +1,16 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +#ifndef RIPPLE_RPC_H_INCLUDED +#define RIPPLE_RPC_H_INCLUDED + +#include "beast/modules/beast_core/beast_core.h" + +#include "../json/ripple_json.h" + +#include "api/RPCService.h" + +#endif diff --git a/src/ripple/validators/ripple_validators.h b/src/ripple/validators/ripple_validators.h index b68e92256..70d0bccc3 100644 --- a/src/ripple/validators/ripple_validators.h +++ b/src/ripple/validators/ripple_validators.h @@ -17,8 +17,7 @@ // #include "beast/modules/beast_asio/beast_asio.h" -#include "../ripple/frame/ripple_frame.h" -//#include "../ripple_basics/ripple_basics.h" +#include "../ripple/rpc/ripple_rpc.h" #include "../ripple_data/ripple_data.h" namespace ripple diff --git a/src/ripple_app/main/RPCHTTPServer.cpp b/src/ripple_app/main/RPCHTTPServer.cpp index 4718f1607..31f49ab01 100644 --- a/src/ripple_app/main/RPCHTTPServer.cpp +++ b/src/ripple_app/main/RPCHTTPServer.cpp @@ -7,12 +7,12 @@ class RPCHTTPServerImp : public RPCHTTPServer , public LeakChecked - , public HTTPServer::Handler + , public HTTP::Handler { public: NetworkOPs& m_networkOPs; RPCServerHandler m_deprecatedHandler; - HTTPServer m_server; + HTTP::Server m_server; ScopedPointer m_context; RPCHTTPServerImp (Stoppable& parent, @@ -49,7 +49,7 @@ public: IPEndpoint ep (IPEndpoint::from_string (getConfig().getRpcIP())); if (! ep.empty()) { - HTTPServer::Port port; + HTTP::Port port; port.addr = ep.withPort(0); if (getConfig ().getRpcPort() != 0) port.port = getConfig ().getRpcPort(); @@ -57,7 +57,7 @@ public: port.port = ep.port(); port.context = m_context; - HTTPServer::Ports ports; + HTTP::Ports ports; ports.push_back (port); m_server.setPorts (ports); } @@ -84,10 +84,10 @@ public: //-------------------------------------------------------------------------- // - // HTTPServer::Handler + // HTTP::Handler // - void onAccept (HTTPServer::Session& session) + void onAccept (HTTP::Session& session) { // Reject non-loopback connections if RPC_ALLOW_REMOTE is not set if (! getConfig().RPC_ALLOW_REMOTE && @@ -97,11 +97,11 @@ public: } } - void onHeaders (HTTPServer::Session& session) + void onHeaders (HTTP::Session& session) { } - void onRequest (HTTPServer::Session& session) + void onRequest (HTTP::Session& session) { session.write (m_deprecatedHandler.processRequest ( session.content, session.remoteAddress.to_string())); @@ -109,11 +109,11 @@ public: session.close(); } - void onClose (HTTPServer::Session& session) + void onClose (HTTP::Session& session) { } - void onStopped (HTTPServer&) + void onStopped (HTTP::Server&) { stopped(); } diff --git a/src/ripple_app/ripple_app.cpp b/src/ripple_app/ripple_app.cpp index 6d52b06f3..c11c4c123 100644 --- a/src/ripple_app/ripple_app.cpp +++ b/src/ripple_app/ripple_app.cpp @@ -21,6 +21,8 @@ // This .cpp will end up including all of the public header // material in Ripple since it holds the Application object. +#include "../ripple/http/ripple_http.h" +#include "../ripple/rpc/ripple_rpc.h" #include "../ripple/validators/ripple_validators.h" namespace ripple diff --git a/src/ripple_app/ripple_app.h b/src/ripple_app/ripple_app.h index 2a209226c..67a2490c4 100644 --- a/src/ripple_app/ripple_app.h +++ b/src/ripple_app/ripple_app.h @@ -34,7 +34,6 @@ //------------------------------------------------------------------------------ -#include "../ripple/frame/ripple_frame.h" #include "../ripple_basics/ripple_basics.h" #include "../ripple_core/ripple_core.h" #include "../ripple_data/ripple_data.h" diff --git a/src/ripple_core/ripple_core.h b/src/ripple_core/ripple_core.h index e5da96c83..aecd72d92 100644 --- a/src/ripple_core/ripple_core.h +++ b/src/ripple_core/ripple_core.h @@ -10,7 +10,7 @@ // VFALCO TODO For UniformResourceLocator, remove asap #include "beast/modules/beast_asio/beast_asio.h" -#include "../ripple/frame/ripple_frame.h" +#include "../ripple/rpc/ripple_rpc.h" #include "../ripple_basics/ripple_basics.h" #include "../ripple_data/ripple_data.h"