diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index 6c9d3b6ef..c80c2f27b 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -22,18 +22,6 @@ - - true - true - true - true - - - true - true - true - true - true true @@ -52,13 +40,7 @@ true true - - true - true - true - true - - + true true true @@ -1562,7 +1544,6 @@ - diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index cf499ed92..f7ddb8a6d 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -1008,18 +1008,6 @@ [1] Ripple\http\impl - - [1] Ripple\http\impl - - - [1] Ripple\http\impl - - - [1] Ripple\http\impl - - - [1] Ripple\http\impl - [1] Ripple\rpc\impl @@ -1029,6 +1017,9 @@ [1] Ripple\rpc\impl + + [1] Ripple\http\impl + @@ -2013,9 +2004,6 @@ [1] Ripple\http\impl - - [1] Ripple\http\impl - [1] Ripple\http\impl diff --git a/src/ripple/http/api/Handler.h b/src/ripple/http/api/Handler.h index bba376d8d..bc0db6086 100644 --- a/src/ripple/http/api/Handler.h +++ b/src/ripple/http/api/Handler.h @@ -34,8 +34,9 @@ struct Handler /** Called when the session ends. Guaranteed to be called once. + @param errorCode Non zero for a failed connection. */ - virtual void onClose (Session& session) = 0; + virtual void onClose (Session& session, int errorCode) = 0; /** Called when the server has finished its stop. */ virtual void onStopped (Server& server) = 0; diff --git a/src/ripple/http/api/Session.h b/src/ripple/http/api/Session.h index 707c6ff36..624285248 100644 --- a/src/ripple/http/api/Session.h +++ b/src/ripple/http/api/Session.h @@ -22,33 +22,29 @@ namespace HTTP { class Session : public Uncopyable { public: - Session (); - - /** Input: The Journal the server is using. */ - Journal journal; - - /** Input: The remote address of the connection. */ - IPEndpoint remoteAddress; - - /** Input: `true` if all the headers have been received. */ - bool headersComplete; - - /** Input: The currently known set of HTTP headers. */ - HTTPHeaders headers; - - /** Input: The full HTTPRequest when it is known. */ - SharedPtr request; - - /** Input: The Content-Body as a linear buffer if we have the HTTPRequest. */ - std::string content; - /** A user-definable pointer. The initial value is always zero. Changes to the value are persisted between calls. */ void* tag; + /** Returns the Journal to use for logging. */ + virtual Journal journal() = 0; + /** Returns the remote address of the connection. */ + virtual IPEndpoint remoteAddress() = 0; + + /** Returns `true` if the full HTTP headers have been received. */ + virtual bool headersComplete() = 0; + + /** Returns the currently known set of headers. */ + virtual HTTPHeaders headers() = 0; + + /** Returns the complete HTTP request when it is known. */ + virtual SharedPtr const& request() = 0; + + /** Returns the entire Content-Body, if the request is complete. */ + virtual std::string content() = 0; /** Send a copy of data asynchronously. */ /** @{ */ @@ -76,7 +72,10 @@ public: /** Output support using ostream. */ /** @{ */ - ScopedStream operator<< (std::ostream& manip (std::ostream&)); + ScopedStream operator<< (std::ostream& manip (std::ostream&)) + { + return ScopedStream (*this, manip); + } template ScopedStream operator<< (T const& t) diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp deleted file mode 100644 index 59b066599..000000000 --- a/src/ripple/http/impl/Door.cpp +++ /dev/null @@ -1,91 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -namespace ripple { -namespace HTTP { - -Door::Door (ServerImpl& impl, Port const& port) - : m_impl (impl) - , m_acceptor (m_impl.get_io_service(), to_asio (port)) - , m_port (port) -{ - m_impl.add (*this); - - error_code ec; - - m_acceptor.set_option (acceptor::reuse_address (true), ec); - if (ec) - { - m_impl.journal().error << - "Error setting acceptor socket option: " << ec.message(); - } - - if (! ec) - { - m_impl.journal().info << "Bound to endpoint " << - to_string (m_acceptor.local_endpoint()); - - async_accept(); - } - else - { - m_impl.journal().error << "Error binding to endpoint " << - to_string (m_acceptor.local_endpoint()) << - ", '" << ec.message() << "'"; - } -} - -Door::~Door () -{ - m_impl.remove (*this); -} - -Port const& Door::port () const -{ - return m_port; -} - -void Door::cancel () -{ - m_acceptor.cancel(); -} - -void Door::failed (error_code ec) -{ -} - -void Door::asyncHandlersComplete () -{ -} - -void Door::async_accept () -{ - Peer* peer (new Peer (m_impl, m_port)); - m_acceptor.async_accept (peer->get_socket(), boost::bind ( - &Door::handle_accept, Ptr(this), - boost::asio::placeholders::error, - Peer::Ptr (peer), CompletionCounter (this))); -} - -void Door::handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec) - { - m_impl.journal().error << "Accept failed: " << ec.message(); - return; - } - - async_accept(); - - peer->handle_accept(); -} - -} -} - diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h index cdce116a5..aed7f7f23 100644 --- a/src/ripple/http/impl/Door.h +++ b/src/ripple/http/impl/Door.h @@ -24,14 +24,84 @@ public: acceptor m_acceptor; Port m_port; - Door (ServerImpl& impl, Port const& port); - ~Door (); - Port const& port () const; - void cancel (); - void failed (error_code ec); - void asyncHandlersComplete (); - void async_accept (); - void handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter); + Door (ServerImpl& impl, Port const& port) + : m_impl (impl) + , m_acceptor (m_impl.get_io_service(), to_asio (port)) + , m_port (port) + { + m_impl.add (*this); + + error_code ec; + + m_acceptor.set_option (acceptor::reuse_address (true), ec); + if (ec) + { + m_impl.journal().error << + "Error setting acceptor socket option: " << ec.message(); + } + + if (! ec) + { + m_impl.journal().info << "Bound to endpoint " << + to_string (m_acceptor.local_endpoint()); + + async_accept(); + } + else + { + m_impl.journal().error << "Error binding to endpoint " << + to_string (m_acceptor.local_endpoint()) << + ", '" << ec.message() << "'"; + } + } + + ~Door () + { + m_impl.remove (*this); + } + + Port const& port () const + { + return m_port; + } + + void cancel () + { + m_acceptor.cancel(); + } + + void failed (error_code ec) + { + } + + void asyncHandlersComplete () + { + } + + void async_accept () + { + Peer* peer (new Peer (m_impl, m_port)); + m_acceptor.async_accept (peer->get_socket(), boost::bind ( + &Door::handle_accept, Ptr(this), + boost::asio::placeholders::error, + Peer::Ptr (peer), CompletionCounter (this))); + } + + void handle_accept (error_code ec, Peer::Ptr peer, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_impl.journal().error << "Accept failed: " << ec.message(); + return; + } + + async_accept(); + + peer->handle_accept(); + } }; } diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp deleted file mode 100644 index 21123a482..000000000 --- a/src/ripple/http/impl/Peer.cpp +++ /dev/null @@ -1,374 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -namespace ripple { -namespace HTTP { - -Peer::Peer (ServerImpl& impl, Port const& port) - : m_impl (impl) - , m_strand (m_impl.get_io_service()) - , m_data_timer (m_impl.get_io_service()) - , m_request_timer (m_impl.get_io_service()) - , m_buffer (bufferSize) - , m_parser (HTTPParser::typeRequest) - , m_session (*this) - , m_writesPending (0) - , m_closed (false) - , m_callClose (false) -{ - int flags; - switch (port.security) - { - default: - bassertfalse; - case Port::no_ssl: flags = MultiSocket::none; break; - case Port::allow_ssl: flags = MultiSocket::server_ssl; break; - case Port::require_ssl: flags = MultiSocket::server_ssl_required; break; - } - - m_socket = MultiSocket::New (m_impl.get_io_service(), port.context->get(), flags); - - m_impl.add (*this); -} - -Peer::~Peer () -{ - if (m_callClose) - m_impl.handler().onClose (m_session); - - m_impl.remove (*this); -} - -// Returns the asio socket for the peer. -// -socket& Peer::get_socket() -{ - return m_socket->this_layer(); -} - -// Return the Session associated with this peer's session. -// -SessionImpl& Peer::session () -{ - return m_session; -} - -// Indicates that the Handler closed the Session -// -void Peer::close () -{ - // Make sure this happens on an i/o service thread. - m_impl.get_io_service().dispatch (m_strand.wrap ( - boost::bind (&Peer::handle_close, Ptr (this), - CompletionCounter (this)))); -} - -// Cancels all pending i/o and timers and sends tcp shutdown. -// -void Peer::cancel () -{ - error_code ec; - m_data_timer.cancel (ec); - m_request_timer.cancel (ec); - m_socket->cancel (ec); - m_socket->shutdown (socket::shutdown_both); -} - -// Called when I/O completes with an error that is not eof or aborted. -// -void Peer::failed (error_code ec) -{ - cancel (); -} - -// Called when there are no more completion handlers pending. -// -void Peer::asyncHandlersComplete () -{ -} - -// Send a copy of the data. -// -void Peer::write (void const* buffer, std::size_t bytes) -{ - SharedBuffer buf (static_cast (buffer), bytes); - // Make sure this happens on an i/o service thread. - m_impl.get_io_service().dispatch (m_strand.wrap ( - boost::bind (&Peer::handle_write, Ptr (this), - buf, CompletionCounter (this)))); -} - -// Called from an io_service thread to write the shared buffer. -// -void Peer::handle_write (SharedBuffer const& buf, CompletionCounter) -{ - async_write (buf); -} - -// Send a shared buffer -// -void Peer::async_write (SharedBuffer const& buf) -{ - bassert (buf.get().size() > 0); - - ++m_writesPending; - - // Send the copy. We pass the SharedArg in the last parameter - // so that a reference is maintained as the handler gets copied. - // When the final completion function returns, the reference - // count will drop to zero and the buffer will be freed. - // - boost::asio::async_write (*m_socket, - boost::asio::const_buffers_1 (&buf->front(), buf->size()), - m_strand.wrap (boost::bind (&Peer::handle_write, - Ptr (this), boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred, - buf, CompletionCounter (this)))); -} - -// Calls the async_read_some initiating function. -// -void Peer::async_read_some () -{ - // re-arm the data timer - // (this cancels the previous wait, if any) - // - m_data_timer.expires_from_now ( - boost::posix_time::seconds ( - dataTimeoutSeconds)); - - m_data_timer.async_wait (m_strand.wrap (boost::bind ( - &Peer::handle_data_timer, Ptr(this), - boost::asio::placeholders::error, - CompletionCounter (this)))); - - // issue the read - // - boost::asio::mutable_buffers_1 buf ( - m_buffer.getData (), m_buffer.getSize ()); - - m_socket->async_read_some (buf, m_strand.wrap ( - boost::bind (&Peer::handle_read, Ptr (this), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred, - CompletionCounter (this)))); -} - -// Called when the acceptor gives us the connection. -// -void Peer::handle_accept () -{ - m_callClose = true; - - // save remote addr - m_session.remoteAddress = from_asio ( - get_socket().remote_endpoint()).withPort (0); - m_impl.handler().onAccept (m_session); - - if (m_closed) - { - cancel(); - return; - } - - m_request_timer.expires_from_now ( - boost::posix_time::seconds ( - requestTimeoutSeconds)); - - m_request_timer.async_wait (m_strand.wrap (boost::bind ( - &Peer::handle_request_timer, Ptr(this), - boost::asio::placeholders::error, - CompletionCounter (this)))); - - if (m_socket->needs_handshake ()) - { - m_socket->async_handshake (Socket::server, m_strand.wrap ( - boost::bind (&Peer::handle_handshake, Ptr(this), - boost::asio::placeholders::error, - CompletionCounter (this)))); - } - else - { - async_read_some(); - } -} - -// Called when the handshake completes -// -void Peer::handle_handshake (error_code ec, CompletionCounter) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - // fail - return; - } - - async_read_some(); -} - -// Called when the data timer expires -// -void Peer::handle_data_timer (error_code ec, CompletionCounter) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - // fail - return; - } - - // They took too long to send any bytes - cancel(); -} - -// Called when the request timer expires -// -void Peer::handle_request_timer (error_code ec, CompletionCounter) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - // fail - return; - } - - // They took too long to complete the request - cancel(); -} - -// Called when the Session is closed by the Handler. -// -void Peer::handle_close (CompletionCounter) -{ - m_closed = true; - m_session.handle_close(); -} - -// Called when async_write completes. -// -void Peer::handle_write (error_code ec, std::size_t bytes_transferred, - SharedBuffer buf, CompletionCounter) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - failed (ec); - return; - } - - bassert (m_writesPending > 0); - if (--m_writesPending == 0 && m_closed) - { - m_socket->shutdown (socket::shutdown_send); - } -} - -// Called when async_read_some completes. -// -void Peer::handle_read (error_code ec, std::size_t bytes_transferred, CompletionCounter) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0 && ec != boost::asio::error::eof) - { - failed (ec); - return; - } - - std::size_t const bytes_parsed (m_parser.process ( - m_buffer.getData(), bytes_transferred)); - - if (m_parser.error() || - bytes_parsed != bytes_transferred) - { - // set ec manually and call failed() - return; - } - - if (ec == boost::asio::error::eof) - { - m_parser.process_eof(); - ec = error_code(); - } - - if (m_parser.error()) - { - // set ec manually and call failed() - return; - } - - if (! m_parser.finished()) - { - // Feed some headers to the callback - if (m_parser.fields().size() > 0) - { - handle_headers (); - if (m_closed) - return; - } - } - - if (m_parser.finished ()) - { - m_data_timer.cancel(); - - // VFALCO NOTE: Should we cancel this one? - m_request_timer.cancel(); - - if (! m_socket->needs_handshake()) - m_socket->shutdown (socket::shutdown_receive); - - handle_request (); - return; - } - - async_read_some(); -} - -// Called when we have some new headers. -// -void Peer::handle_headers () -{ - m_session.headersComplete = m_parser.headersComplete(); - m_session.headers = HTTPHeaders (m_parser.fields()); - m_impl.handler().onHeaders (m_session); -} - -// Called when we have a complete http request. -// -void Peer::handle_request () -{ - // This is to guarantee onHeaders is called at least once. - handle_headers(); - - if (m_closed) - return; - - m_session.request = m_parser.request(); - - // Turn the Content-Body into a linear buffer. - ContentBodyBuffer const& body (m_session.request->body ()); - m_session.content.resize (body.size ()); - boost::asio::buffer_copy ( - boost::asio::buffer (&m_session.content.front(), - m_session.content.size()), body.data()); - - // Process the HTTPRequest - m_impl.handler().onRequest (m_session); -} - -} -} diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 7731a66ea..727d5d3ed 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -10,10 +10,14 @@ namespace ripple { namespace HTTP { +// Holds the copy of buffers being sent +typedef SharedArg SharedBuffer; + /** Represents an active connection. */ class Peer : public SharedObject , public AsyncObject + , public Session , public List ::Node , public LeakChecked { @@ -43,23 +47,435 @@ public: ScopedPointer m_socket; MemoryBlock m_buffer; HTTPParser m_parser; - SessionImpl m_session; int m_writesPending; bool m_closed; bool m_callClose; + Atomic m_detached; + SharedPtr m_detach_ref; + boost::optional m_work; + int m_errorCode; - Peer (ServerImpl& impl, Port const& port); - ~Peer (); - socket& get_socket(); - SessionImpl& session (); - void close (); - void cancel (); - void failed (error_code ec); - void asyncHandlersComplete (); - void write (void const* buffer, std::size_t bytes); - void handle_write (SharedBuffer const& buf, CompletionCounter); - void async_write (SharedBuffer const& buf); + //-------------------------------------------------------------------------- + Peer (ServerImpl& impl, Port const& port) + : m_impl (impl) + , m_strand (m_impl.get_io_service()) + , m_data_timer (m_impl.get_io_service()) + , m_request_timer (m_impl.get_io_service()) + , m_buffer (bufferSize) + , m_parser (HTTPParser::typeRequest) + , m_writesPending (0) + , m_closed (false) + , m_callClose (false) + , m_errorCode (0) + { + tag = nullptr; + + int flags; + switch (port.security) + { + default: + bassertfalse; + case Port::no_ssl: flags = MultiSocket::none; break; + case Port::allow_ssl: flags = MultiSocket::server_ssl; break; + case Port::require_ssl: flags = MultiSocket::server_ssl_required; break; + } + + m_socket = MultiSocket::New (m_impl.get_io_service(), port.context->get(), flags); + + m_impl.add (*this); + } + + ~Peer () + { + if (m_callClose) + m_impl.handler().onClose (session(), m_errorCode); + + m_impl.remove (*this); + } + + //-------------------------------------------------------------------------- + // + // Session + // + + Journal journal() + { + return m_impl.journal(); + } + + IPEndpoint remoteAddress() + { + return from_asio (get_socket().remote_endpoint()); + } + + bool headersComplete() + { + return m_parser.headersComplete(); + } + + HTTPHeaders headers() + { + return HTTPHeaders (m_parser.fields()); + } + + SharedPtr const& request() + { + return m_parser.request(); + } + + // Returns the Content-Body as a single buffer. + // VFALCO NOTE This is inefficient... + std::string content() + { + std::string s; + ContentBodyBuffer const& body ( + m_parser.request()->body ()); + s.resize (body.size ()); + boost::asio::buffer_copy ( + boost::asio::buffer (&s.front(), + s.size()), body.data()); + return s; + } + + // Send a copy of the data. + void write (void const* buffer, std::size_t bytes) + { + // Make sure this happens on an io_service thread. + m_impl.get_io_service().dispatch (m_strand.wrap ( + boost::bind (&Peer::handle_write, Ptr (this), + SharedBuffer (static_cast (buffer), bytes), + CompletionCounter (this)))); + } + + // Make the Session asynchronous + void detach () + { + if (m_detached.compareAndSetBool (1, 0)) + { + bassert (! m_work); + bassert (m_detach_ref.empty()); + + // Maintain an additional reference while detached + m_detach_ref = this; + + // Prevent the io_service from running out of work. + // The work object will be destroyed with the Peer + // after the Session is closed and handlers complete. + // + m_work = boost::in_place (boost::ref ( + m_impl.get_io_service())); + } + } + + // Called by the Handler to close the session. + void close () + { + // Make sure this happens on an io_service thread. + m_impl.get_io_service().dispatch (m_strand.wrap ( + boost::bind (&Peer::handle_close, Ptr (this), + CompletionCounter (this)))); + } + + //-------------------------------------------------------------------------- + // + // Completion Handlers + // + + // Called when the last pending completion handler returns. + void asyncHandlersComplete () + { + } + + // Called when the acceptor accepts our socket. + void handle_accept () + { + m_callClose = true; + + m_impl.handler().onAccept (session()); + + if (m_closed) + { + cancel(); + return; + } + + m_request_timer.expires_from_now ( + boost::posix_time::seconds ( + requestTimeoutSeconds)); + + m_request_timer.async_wait (m_strand.wrap (boost::bind ( + &Peer::handle_request_timer, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + + if (m_socket->needs_handshake ()) + { + m_socket->async_handshake (Socket::server, m_strand.wrap ( + boost::bind (&Peer::handle_handshake, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + } + else + { + async_read_some(); + } + } + + // Called from an io_service thread to write the shared buffer. + void handle_write (SharedBuffer const& buf, CompletionCounter) + { + async_write (buf); + } + + // Called when the handshake completes + // + void handle_handshake (error_code ec, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + async_read_some(); + } + + // Called when the data timer expires + // + void handle_data_timer (error_code ec, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (m_closed) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + failed (boost::system::errc::make_error_code ( + boost::system::errc::timed_out)); + } + + // Called when the request timer expires + // + void handle_request_timer (error_code ec, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (m_closed) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + failed (boost::system::errc::make_error_code ( + boost::system::errc::timed_out)); + } + + // Called when async_write completes. + void handle_write (error_code ec, std::size_t bytes_transferred, + SharedBuffer buf, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + bassert (m_writesPending > 0); + if (--m_writesPending == 0 && m_closed) + m_socket->shutdown (socket::shutdown_send); + } + + // Called when async_read_some completes. + void handle_read (error_code ec, std::size_t bytes_transferred, CompletionCounter) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0 && ec != boost::asio::error::eof) + { + failed (ec); + return; + } + + std::size_t const bytes_parsed (m_parser.process ( + m_buffer.getData(), bytes_transferred)); + + if (m_parser.error() || + bytes_parsed != bytes_transferred) + { + failed (boost::system::errc::make_error_code ( + boost::system::errc::bad_message)); + return; + } + + if (ec == boost::asio::error::eof) + { + m_parser.process_eof(); + ec = error_code(); + } + + if (m_parser.error()) + { + failed (boost::system::errc::make_error_code ( + boost::system::errc::bad_message)); + return; + } + + if (! m_parser.finished()) + { + // Feed some headers to the callback + if (m_parser.fields().size() > 0) + { + handle_headers(); + if (m_closed) + return; + } + } + + if (m_parser.finished ()) + { + m_data_timer.cancel(); + // VFALCO NOTE: Should we cancel this one? + m_request_timer.cancel(); + + if (! m_socket->needs_handshake()) + m_socket->shutdown (socket::shutdown_receive); + + handle_request (); + return; + } + + async_read_some(); + } + + // Called when we have some new headers. + void handle_headers () + { + m_impl.handler().onHeaders (session()); + } + + // Called when we have a complete http request. + void handle_request () + { + // This is to guarantee onHeaders is called at least once. + handle_headers(); + + if (m_closed) + return; + + // Process the HTTPRequest + m_impl.handler().onRequest (session()); + } + + // Called to close the session. + void handle_close (CompletionCounter) + { + m_closed = true; + + // Release our additional reference + m_detach_ref = nullptr; + } + + //-------------------------------------------------------------------------- + // + // Peer + // + + // Returns the asio socket for the peer. + socket& get_socket() + { + return m_socket->this_layer(); + } + + // Return the Session associated with this peer's session. + Session& session () + { + return *this; + } + + // Cancel all pending i/o and timers and send tcp shutdown. + void cancel () + { + error_code ec; + m_data_timer.cancel (ec); + m_request_timer.cancel (ec); + m_socket->cancel (ec); + m_socket->shutdown (socket::shutdown_both); + } + + // Called by a completion handler when error is not eof or aborted. + void failed (error_code const& ec) + { + m_errorCode = ec.value(); + bassert (m_errorCode != 0); + cancel (); + } + + // Call the async_read_some initiating function. + void async_read_some () + { + // re-arm the data timer + // (this cancels the previous wait, if any) + // + m_data_timer.expires_from_now ( + boost::posix_time::seconds ( + dataTimeoutSeconds)); + + m_data_timer.async_wait (m_strand.wrap (boost::bind ( + &Peer::handle_data_timer, Ptr(this), + boost::asio::placeholders::error, + CompletionCounter (this)))); + + // issue the read + // + boost::asio::mutable_buffers_1 buf ( + m_buffer.getData (), m_buffer.getSize ()); + + m_socket->async_read_some (buf, m_strand.wrap ( + boost::bind (&Peer::handle_read, Ptr (this), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + CompletionCounter (this)))); + } + + // Send a shared buffer + void async_write (SharedBuffer const& buf) + { + bassert (buf.get().size() > 0); + + ++m_writesPending; + + // Send the copy. We pass the SharedBuffer in the last parameter + // so that a reference is maintained as the handler gets copied. + // When the final completion function returns, the reference + // count will drop to zero and the buffer will be freed. + // + boost::asio::async_write (*m_socket, + boost::asio::const_buffers_1 (&buf->front(), buf->size()), + m_strand.wrap (boost::bind (&Peer::handle_write, + Ptr (this), boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred, + buf, CompletionCounter (this)))); + } + + // Send a copy of a BufferSequence template void async_write (BufferSequence const& buffers) { @@ -77,23 +493,6 @@ public: boost::asio::buffer_size (buffer))); } } - - void async_read_some (); - - void handle_accept (); - void handle_handshake (error_code ec, CompletionCounter); - void handle_data_timer (error_code ec, CompletionCounter); - void handle_request_timer (error_code ec, CompletionCounter); - void handle_close (CompletionCounter); - - void handle_write (error_code ec, std::size_t bytes_transferred, - SharedBuffer buf, CompletionCounter); - - void handle_read (error_code ec, std::size_t bytes_transferred, - CompletionCounter); - - void handle_headers (); - void handle_request (); }; } diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp new file mode 100644 index 000000000..69b8791a7 --- /dev/null +++ b/src/ripple/http/impl/ServerImpl.cpp @@ -0,0 +1,208 @@ +//------------------------------------------------------------------------------ +/* + Copyright (c) 2011-2013, OpenCoin, Inc. +*/ +//============================================================================== + +namespace ripple { +namespace HTTP { + +ServerImpl::ServerImpl (Server& server, Handler& handler, Journal journal) + : Thread ("HTTP::Server") + , m_server (server) + , m_handler (handler) + , m_journal (journal) + , m_strand (m_io_service) + , m_work (boost::in_place (boost::ref (m_io_service))) + , m_stopped (true) +{ + startThread (); +} + +ServerImpl::~ServerImpl () +{ + stopThread (); +} + +Journal const& ServerImpl::journal() const +{ + return m_journal; +} + +Ports const& ServerImpl::getPorts () const +{ + SharedState::UnlockedAccess state (m_state); + return state->ports; +} + +void ServerImpl::setPorts (Ports const& ports) +{ + SharedState::Access state (m_state); + state->ports = ports; + update(); +} + +bool ServerImpl::stopping () const +{ + return ! m_work; +} + +void ServerImpl::stop (bool wait) +{ + if (! stopping()) + { + m_work = boost::none; + update(); + } + + if (wait) + m_stopped.wait(); +} + +//-------------------------------------------------------------------------- +// +// Server +// + +Handler& ServerImpl::handler() +{ + return m_handler; +} + +boost::asio::io_service& ServerImpl::get_io_service() +{ + return m_io_service; +} + +// Inserts the peer into our list of peers. We only remove it +// from the list inside the destructor of the Peer object. This +// way, the Peer can never outlive the server. +// +void ServerImpl::add (Peer& peer) +{ + SharedState::Access state (m_state); + state->peers.push_back (peer); +} + +void ServerImpl::add (Door& door) +{ + SharedState::Access state (m_state); + state->doors.push_back (door); +} + +// Removes the peer from our list of peers. This is only called from +// the destructor of Peer. Essentially, the item in the list functions +// as a weak_ptr. +// +void ServerImpl::remove (Peer& peer) +{ + SharedState::Access state (m_state); + state->peers.erase (state->peers.iterator_to (peer)); +} + +void ServerImpl::remove (Door& door) +{ + SharedState::Access state (m_state); + state->doors.push_back (door); +} + +//-------------------------------------------------------------------------- +// +// Thread +// + +// Updates our Door list based on settings. +// +void ServerImpl::handle_update () +{ + if (! stopping()) + { + // Make a local copy to shorten the lock + // + Ports ports; + { + SharedState::ConstAccess state (m_state); + ports = state->ports; + } + + std::sort (ports.begin(), ports.end()); + + // Walk the Door list and the Port list simultaneously and + // build a replacement Door vector which we will then swap in. + // + Doors doors; + Doors::iterator door (m_doors.begin()); + for (Ports::const_iterator port (ports.begin()); + port != ports.end(); ++port) + { + int comp; + + while (door != m_doors.end() && + ((comp = compare (*port, (*door)->port())) > 0)) + { + (*door)->cancel(); + ++door; + } + + if (door != m_doors.end()) + { + if (comp < 0) + { + doors.push_back (new Door (*this, *port)); + } + else + { + // old Port and new Port are the same + doors.push_back (*door); + ++door; + } + } + else + { + doors.push_back (new Door (*this, *port)); + } + } + + // Any remaining Door objects are not in the new set, so cancel them. + // + for (;door != m_doors.end();) + (*door)->cancel(); + + m_doors.swap (doors); + } + else + { + // Cancel pending I/O on all doors. + // + for (Doors::iterator iter (m_doors.begin()); + iter != m_doors.end(); ++iter) + { + (*iter)->cancel(); + } + + // Remove our references to the old doors. + // + m_doors.resize (0); + } +} + +// Causes handle_update to run on the io_service +// +void ServerImpl::update () +{ + m_io_service.post (m_strand.wrap (boost::bind ( + &ServerImpl::handle_update, this))); +} + +// The main i/o processing loop. +// +void ServerImpl::run () +{ + m_io_service.run (); + + m_stopped.signal(); + m_handler.onStopped (m_server); +} + +} +} diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h index c76b4df98..daa9b3517 100644 --- a/src/ripple/http/impl/ServerImpl.h +++ b/src/ripple/http/impl/ServerImpl.h @@ -10,6 +10,9 @@ namespace ripple { namespace HTTP { +class Door; +class Peer; + class ServerImpl : public Thread { public: @@ -26,7 +29,7 @@ public: }; typedef SharedData SharedState; - typedef std::vector Doors; + typedef std::vector > Doors; Server& m_server; Handler& m_handler; @@ -38,204 +41,24 @@ public: SharedState m_state; Doors m_doors; - //-------------------------------------------------------------------------- + ServerImpl (Server& server, Handler& handler, Journal journal); + ~ServerImpl (); + Journal const& journal() const; + Ports const& getPorts () const; + void setPorts (Ports const& ports); + bool stopping () const; + void stop (bool wait); - ServerImpl (Server& server, Handler& handler, Journal journal) - : Thread ("HTTP::Server") - , m_server (server) - , m_handler (handler) - , m_journal (journal) - , m_strand (m_io_service) - , m_work (boost::in_place (boost::ref (m_io_service))) - , m_stopped (true) - { - startThread (); - } + Handler& handler(); + boost::asio::io_service& get_io_service(); + void add (Peer& peer); + void add (Door& door); + void remove (Peer& peer); + void remove (Door& door); - ~ServerImpl () - { - stopThread (); - } - - Journal const& journal() const - { - return m_journal; - } - - Ports const& getPorts () const - { - SharedState::UnlockedAccess state (m_state); - return state->ports; - } - - void setPorts (Ports const& ports) - { - SharedState::Access state (m_state); - state->ports = ports; - update(); - } - - bool stopping () const - { - return ! m_work; - } - - void stop (bool wait) - { - if (! stopping()) - { - m_work = boost::none; - update(); - } - - if (wait) - m_stopped.wait(); - } - - //-------------------------------------------------------------------------- - // - // Server - // - - Handler& handler() - { - return m_handler; - } - - boost::asio::io_service& get_io_service() - { - return m_io_service; - } - - // Inserts the peer into our list of peers. We only remove it - // from the list inside the destructor of the Peer object. This - // way, the Peer can never outlive the server. - // - void add (Peer& peer) - { - SharedState::Access state (m_state); - state->peers.push_back (peer); - } - - void add (Door& door) - { - SharedState::Access state (m_state); - state->doors.push_back (door); - } - - // Removes the peer from our list of peers. This is only called from - // the destructor of Peer. Essentially, the item in the list functions - // as a weak_ptr. - // - void remove (Peer& peer) - { - SharedState::Access state (m_state); - state->peers.erase (state->peers.iterator_to (peer)); - } - - void remove (Door& door) - { - SharedState::Access state (m_state); - state->doors.push_back (door); - } - - //-------------------------------------------------------------------------- - // - // Thread - // - - // Updates our Door list based on settings. - // - void handle_update () - { - if (! stopping()) - { - // Make a local copy to shorten the lock - // - Ports ports; - { - SharedState::ConstAccess state (m_state); - ports = state->ports; - } - - std::sort (ports.begin(), ports.end()); - - // Walk the Door list and the Port list simultaneously and - // build a replacement Door vector which we will then swap in. - // - Doors doors; - Doors::iterator door (m_doors.begin()); - for (Ports::const_iterator port (ports.begin()); - port != ports.end(); ++port) - { - int comp; - - while (door != m_doors.end() && - ((comp = compare (*port, (*door)->port())) > 0)) - { - (*door)->cancel(); - ++door; - } - - if (door != m_doors.end()) - { - if (comp < 0) - { - doors.push_back (new Door (*this, *port)); - } - else - { - // old Port and new Port are the same - doors.push_back (*door); - ++door; - } - } - else - { - doors.push_back (new Door (*this, *port)); - } - } - - // Any remaining Door objects are not in the new set, so cancel them. - // - for (;door != m_doors.end();) - (*door)->cancel(); - - m_doors.swap (doors); - } - else - { - // Cancel pending I/O on all doors. - // - for (Doors::iterator iter (m_doors.begin()); - iter != m_doors.end(); ++iter) - { - (*iter)->cancel(); - } - - // Remove our references to the old doors. - // - m_doors.resize (0); - } - } - - // Causes handle_update to run on the io_service - // - void update () - { - m_io_service.post (m_strand.wrap (boost::bind ( - &ServerImpl::handle_update, this))); - } - - // The main i/o processing loop. - // - void run () - { - m_io_service.run (); - - m_stopped.signal(); - m_handler.onStopped (m_server); - } + void handle_update (); + void update (); + void run (); }; } diff --git a/src/ripple/http/impl/Session.cpp b/src/ripple/http/impl/Session.cpp deleted file mode 100644 index 08eb153a7..000000000 --- a/src/ripple/http/impl/Session.cpp +++ /dev/null @@ -1,24 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -namespace ripple { -namespace HTTP { - -Session::Session () - : headersComplete (false) - , tag (nullptr) -{ - content.reserve (1000); -} - -ScopedStream Session::operator<< ( - std::ostream& manip (std::ostream&)) -{ - return ScopedStream (*this, manip); -} - -} -} diff --git a/src/ripple/http/impl/SessionImpl.cpp b/src/ripple/http/impl/SessionImpl.cpp deleted file mode 100644 index fab4fcef6..000000000 --- a/src/ripple/http/impl/SessionImpl.cpp +++ /dev/null @@ -1,48 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -namespace ripple { -namespace HTTP { - -SessionImpl::SessionImpl (Peer& peer) - : m_peer (peer) -{ -} - -SessionImpl::~SessionImpl () -{ -} - -void SessionImpl::write (void const* buffer, std::size_t bytes) -{ - m_peer.write (buffer, bytes); -} - -// Called from an io_service thread -void SessionImpl::handle_close() -{ - m_peer_ref = nullptr; -} - -void SessionImpl::close() -{ - m_peer.close(); -} - -void SessionImpl::detach() -{ - if (m_detached.compareAndSetBool (1, 0)) - { - bassert (! m_work); - bassert (m_peer_ref.empty()); - m_peer_ref = &m_peer; - m_work = boost::in_place (boost::ref ( - m_peer.m_impl.get_io_service())); - } -} - -} -} diff --git a/src/ripple/http/impl/SessionImpl.h b/src/ripple/http/impl/SessionImpl.h deleted file mode 100644 index f3a9a8eac..000000000 --- a/src/ripple/http/impl/SessionImpl.h +++ /dev/null @@ -1,37 +0,0 @@ -//------------------------------------------------------------------------------ -/* - Copyright (c) 2011-2013, OpenCoin, Inc. -*/ -//============================================================================== - -#ifndef RIPPLE_HTTP_SESSIONIMPL_H_INCLUDED -#define RIPPLE_HTTP_SESSIONIMPL_H_INCLUDED - -namespace ripple { -namespace HTTP { - -// Holds the copy of buffers being sent -typedef SharedArg SharedBuffer; - -class Peer; - -class SessionImpl : public Session -{ -public: - Peer& m_peer; - Atomic m_detached; - SharedPtr m_peer_ref; - boost::optional m_work; - - explicit SessionImpl (Peer& peer); - ~SessionImpl (); - void write (void const* buffer, std::size_t bytes); - void close(); - void handle_close(); - void detach(); -}; - -} -} - -#endif diff --git a/src/ripple/http/ripple_http.cpp b/src/ripple/http/ripple_http.cpp index b79023eb9..1a8a997ff 100644 --- a/src/ripple/http/ripple_http.cpp +++ b/src/ripple/http/ripple_http.cpp @@ -16,14 +16,10 @@ #include "impl/Port.cpp" #include "impl/ScopedStream.cpp" -#include "impl/Session.cpp" -# include "impl/Types.h" -# include "impl/SessionImpl.h" -# include "impl/Peer.h" -# include "impl/Door.h" -# include "impl/ServerImpl.h" -#include "impl/Door.cpp" -#include "impl/Peer.cpp" +# include "impl/Types.h" +# include "impl/ServerImpl.h" +# include "impl/Peer.h" +# include "impl/Door.h" +#include "impl/ServerImpl.cpp" #include "impl/Server.cpp" -#include "impl/SessionImpl.cpp" diff --git a/src/ripple_app/main/RPCHTTPServer.cpp b/src/ripple_app/main/RPCHTTPServer.cpp index b43e4dd12..2486a6cc0 100644 --- a/src/ripple_app/main/RPCHTTPServer.cpp +++ b/src/ripple_app/main/RPCHTTPServer.cpp @@ -96,7 +96,7 @@ public: { // Reject non-loopback connections if RPC_ALLOW_REMOTE is not set if (! getConfig().RPC_ALLOW_REMOTE && - ! session.remoteAddress.isLoopback()) + ! session.remoteAddress().isLoopback()) { session.close(); } @@ -119,7 +119,7 @@ public: #endif } - void onClose (HTTP::Session& session) + void onClose (HTTP::Session& session, int errorCode) { } @@ -133,7 +133,7 @@ public: void processSession (Job& job, HTTP::Session& session) { session.write (m_deprecatedHandler.processRequest ( - session.content, session.remoteAddress.to_string())); + session.content(), session.remoteAddress().withPort(0).to_string())); session.close(); }