From 04bcd93ba3de768addb5160840f60b1f30c21dc9 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 26 Aug 2014 09:25:28 -0700 Subject: [PATCH] HTTP(S)-RPC server improvements (RIPD-489, RIPD-533): * Correct handling of Keep-Alive in socket handlers * Report session history in print command --- Builds/VisualStudio2013/RippleD.vcxproj | 24 +- .../VisualStudio2013/RippleD.vcxproj.filters | 29 +- src/ripple/common/MultiSocket.h | 2 + src/ripple/http/Server.h | 18 +- src/ripple/http/Session.h | 57 +- src/ripple/http/impl/Door.cpp | 18 +- src/ripple/http/impl/Door.h | 2 +- src/ripple/http/impl/Peer.cpp | 663 +++++++++--------- src/ripple/http/impl/Peer.h | 194 +++-- src/ripple/http/impl/Port.cpp | 2 +- src/ripple/http/impl/Server.cpp | 6 + src/ripple/http/impl/ServerImpl.cpp | 169 +++-- src/ripple/http/impl/ServerImpl.h | 53 +- src/ripple/http/tests/Server.test.cpp | 254 +++++++ src/ripple/module/app/main/Application.cpp | 3 +- src/ripple/module/app/main/RPCHTTPServer.cpp | 78 ++- src/ripple/module/app/main/RPCHTTPServer.h | 24 +- src/ripple/overlay/impl/PeerImp.cpp | 37 +- src/ripple/overlay/impl/PeerImp.h | 13 +- src/ripple/overlay/impl/peer_info.h | 2 +- src/ripple/unity/http.cpp | 1 + 21 files changed, 1055 insertions(+), 594 deletions(-) create mode 100644 src/ripple/http/tests/Server.test.cpp diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 38e2a61142..fc1b5b5ff6 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -351,23 +351,25 @@ - + + + - + True - + True - + True @@ -386,15 +388,14 @@ True - - True - True True + + @@ -405,9 +406,6 @@ - - True - True @@ -417,6 +415,9 @@ True + + True + True @@ -1940,6 +1941,9 @@ + + True + True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 34f7c5f85e..8daa6415f0 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -337,6 +337,9 @@ {43D68742-4714-D103-EE00-EB10BD045FB6} + + {AA0D98CC-99E6-61CE-86D7-35156DC4EE55} + {BEDCC703-A2C8-FF25-7E1E-3471BD39ED98} @@ -957,28 +960,31 @@ beast - + beast\http beast\http + + beast\http + beast\http beast\http\detail - + beast\http beast\http - + beast\http\impl - + beast\http\impl @@ -999,15 +1005,15 @@ beast\http\impl - - beast\http\impl - beast\http\impl beast\http\impl + + beast\http + beast\http @@ -1023,9 +1029,6 @@ beast\http - - beast\http\tests - beast\http\tests @@ -1035,6 +1038,9 @@ beast\http\tests + + beast\http\tests + beast\http\tests @@ -2946,6 +2952,9 @@ ripple\http + + ripple\http\tests + ripple\json\impl diff --git a/src/ripple/common/MultiSocket.h b/src/ripple/common/MultiSocket.h index e116164d5c..02843880a3 100644 --- a/src/ripple/common/MultiSocket.h +++ b/src/ripple/common/MultiSocket.h @@ -110,12 +110,14 @@ public: virtual SSL* ssl_handle () = 0; // Caller owns the socket + // VFALCO TODO return std::unique_ptr static MultiSocket* New ( boost::asio::ip::tcp::socket& socket, boost::asio::ssl::context& ssl_context, int flags = 0); // Caller owns the io_service + // VFALCO TODO return std::unique_ptr static MultiSocket* New ( boost::asio::io_service& io_service, boost::asio::ssl::context& ssl_context, diff --git a/src/ripple/http/Server.h b/src/ripple/http/Server.h index 10de42c116..dfd759f769 100644 --- a/src/ripple/http/Server.h +++ b/src/ripple/http/Server.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -35,7 +37,7 @@ namespace HTTP { /** Configuration information for a server listening port. */ struct Port { - enum Security + enum class Security { no_ssl, allow_ssl, @@ -74,19 +76,15 @@ struct Handler /** Called when the connection is accepted and we know remoteAddress. */ virtual void onAccept (Session& session) = 0; - /** Called repeatedly as new HTTP headers are received. - Guaranteed to be called at least once. - */ - virtual void onHeaders (Session& session) = 0; - - /** Called when we have the full Content-Body. */ + /** Called when we have a complete HTTP request. */ virtual void onRequest (Session& session) = 0; /** Called when the session ends. Guaranteed to be called once. @param errorCode Non zero for a failed connection. */ - virtual void onClose (Session& session, int errorCode) = 0; + virtual void onClose (Session& session, + boost::system::error_code const& ec) = 0; /** Called when the server has finished its stop. */ virtual void onStopped (Server& server) = 0; @@ -108,6 +106,7 @@ public: */ virtual ~Server (); + /** Returns the Journal associated with the server. */ beast::Journal journal () const; @@ -145,6 +144,9 @@ public: void stop (); + void + onWrite (beast::PropertyStream::Map& map); + private: std::unique_ptr m_impl; }; diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h index 54542c5e61..85a280af0e 100644 --- a/src/ripple/http/Session.h +++ b/src/ripple/http/Session.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_HTTP_SESSION_H_INCLUDED #define RIPPLE_HTTP_SESSION_H_INCLUDED +#include #include #include #include @@ -88,26 +89,24 @@ public: void* tag; /** Returns the Journal to use for logging. */ - virtual beast::Journal journal() = 0; + virtual + beast::Journal + journal() = 0; /** Returns the remote address of the connection. */ - virtual beast::IP::Endpoint remoteAddress() = 0; - - /** Returns `true` if the full HTTP headers have been received. */ - virtual bool headersComplete() = 0; + virtual + beast::IP::Endpoint + remoteAddress() = 0; /** Returns the currently known set of headers. */ - virtual beast::HTTPHeaders headers() = 0; - - /** Returns the complete HTTP request when it is known. */ - virtual beast::SharedPtr const& request() = 0; - - /** Returns the entire Content-Body, if the request is complete. */ - virtual std::string content() = 0; + virtual + beast::http::message& + message() = 0; /** Send a copy of data asynchronously. */ /** @{ */ - void write (std::string const& s) + void + write (std::string const& s) { if (! s.empty()) write (&s[0], @@ -115,7 +114,8 @@ public: } template - void write (BufferSequence const& buffers) + void + write (BufferSequence const& buffers) { for (typename BufferSequence::const_iterator iter (buffers.begin()); iter != buffers.end(); ++iter) @@ -126,35 +126,54 @@ public: } } - virtual void write (void const* buffer, std::size_t bytes) = 0; + virtual + void + write (void const* buffer, std::size_t bytes) = 0; /** @} */ /** Output support using ostream. */ /** @{ */ - ScopedStream operator<< (std::ostream& manip (std::ostream&)) + ScopedStream + operator<< (std::ostream& manip (std::ostream&)) { return ScopedStream (*this, manip); } template - ScopedStream operator<< (T const& t) + ScopedStream + operator<< (T const& t) { return ScopedStream (*this, t); } /** @} */ + /** Indicate that the response is complete. + The handler should call this when it has completed writing + the response. If Keep-Alive is indicated on the connection, + this will trigger a read for the next request; else, the + connection will be closed when all remaining data has been sent. + */ + virtual + void + complete() = 0; + /** Detach the session. This holds the session open so that the response can be sent asynchronously. Calls to io_service::run made by the server will not return until all detached sessions are closed. */ - virtual void detach() = 0; + virtual + void + detach() = 0; /** Close the session. This will be performed asynchronously. The session will be closed gracefully after all pending writes have completed. + @param graceful `true` to wait until all data has finished sending. */ - virtual void close() = 0; + virtual + void + close (bool graceful) = 0; }; } // namespace HTTP diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp index aa00157e0f..9b0c0fea88 100644 --- a/src/ripple/http/impl/Door.cpp +++ b/src/ripple/http/impl/Door.cpp @@ -25,31 +25,31 @@ namespace ripple { namespace HTTP { Door::Door (ServerImpl& impl, Port const& port) - : impl_ (impl) - , acceptor_ (impl_.get_io_service(), to_asio (port)) + : server_ (impl) + , acceptor_ (server_.get_io_service(), to_asio (port)) , port_ (port) { - impl_.add (*this); + server_.add (*this); error_code ec; acceptor_.set_option (acceptor::reuse_address (true), ec); if (ec) { - impl_.journal().error << + server_.journal().error << "Error setting acceptor socket option: " << ec.message(); } if (! ec) { - impl_.journal().info << "Bound to endpoint " << + server_.journal().info << "Bound to endpoint " << to_string (acceptor_.local_endpoint()); async_accept(); } else { - impl_.journal().error << "Error binding to endpoint " << + server_.journal().error << "Error binding to endpoint " << to_string (acceptor_.local_endpoint()) << ", '" << ec.message() << "'"; } @@ -57,7 +57,7 @@ Door::Door (ServerImpl& impl, Port const& port) Door::~Door () { - impl_.remove (*this); + server_.remove (*this); } Port const& @@ -80,7 +80,7 @@ Door::failed (error_code ec) void Door::async_accept () { - auto const peer (std::make_shared (impl_, port_)); + auto const peer (std::make_shared (server_, port_, server_.journal())); acceptor_.async_accept (peer->get_socket(), std::bind ( &Door::handle_accept, Ptr(this), beast::asio::placeholders::error, peer)); @@ -95,7 +95,7 @@ Door::handle_accept (error_code ec, if (ec) { - impl_.journal().error << "Accept failed: " << ec.message(); + server_.journal().error << "Accept failed: " << ec.message(); return; } diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h index 35a8f645eb..356c157ade 100644 --- a/src/ripple/http/impl/Door.h +++ b/src/ripple/http/impl/Door.h @@ -37,7 +37,7 @@ private: // VFALCO TODO Use shared_ptr typedef beast::SharedPtr Ptr; - ServerImpl& impl_; + ServerImpl& server_; acceptor acceptor_; Port port_; diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp index 5f11172bf4..ec4197c071 100644 --- a/src/ripple/http/impl/Peer.cpp +++ b/src/ripple/http/impl/Peer.cpp @@ -18,91 +18,386 @@ //============================================================================== #include +#include namespace ripple { namespace HTTP { -Peer::Peer (ServerImpl& impl, Port const& port) - : impl_ (impl) - , strand_ (impl_.get_io_service()) - , data_timer_ (impl_.get_io_service()) - , request_timer_ (impl_.get_io_service()) - , buffer_ (bufferSize) - , writesPending_ (0) +Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal) + : journal_ (journal) + , server_ (server) + , strand_ (server_.get_io_service()) + , timer_ (server_.get_io_service()) + , parser_ (message_, true) + , pending_writes_ (0) , closed_ (false) , callClose_ (false) - , errorCode_ (0) , detached_ (0) + , request_count_ (0) + , bytes_in_ (0) + , bytes_out_ (0) { + static std::atomic sid; + id_ = std::string("#") + std::to_string(sid++); + tag = nullptr; int flags = MultiSocket::Flag::server_role; + switch (port.security) { - default: - bassertfalse; - - case Port::no_ssl: + case Port::Security::no_ssl: break; - case Port::allow_ssl: + case Port::Security::allow_ssl: flags |= MultiSocket::Flag::ssl; break; - case Port::require_ssl: + case Port::Security::require_ssl: flags |= MultiSocket::Flag::ssl_required; break; } socket_.reset (MultiSocket::New ( - impl_.get_io_service(), port.context->get(), flags)); + server_.get_io_service(), port.context->get(), flags)); - impl_.add (*this); + server_.add (*this); + + if (journal_.trace) journal_.trace << + id_ << " created"; } + Peer::~Peer () { if (callClose_) - impl_.handler().onClose (session(), errorCode_); + { + Stat stat; + stat.when = std::move (when_str_); + stat.elapsed = std::chrono::duration_cast < + std::chrono::seconds> (clock_type::now() - when_); + stat.requests = request_count_; + stat.bytes_in = bytes_in_; + stat.bytes_out = bytes_out_; + stat.ec = std::move (ec_); - impl_.remove (*this); + server_.report (std::move (stat)); + if (journal_.trace) journal_.trace << + id_ << " onClose (" << ec_ << ")"; + + server_.handler().onClose (session(), ec_); + } + + server_.remove (*this); + + if (journal_.trace) journal_.trace << + id_ << " destroyed"; } -//-------------------------------------------------------------------------- +//------------------------------------------------------------------------------ -// Returns the Content-Body as a single buffer. -// VFALCO NOTE This is inefficient... -std::string -Peer::content() +// Called when the acceptor accepts our socket. +void +Peer::accept () { - std::string s; - beast::DynamicBuffer const& body ( - parser_.request()->body ()); - s.resize (body.size ()); - boost::asio::buffer_copy ( - boost::asio::buffer (&s[0], - s.size()), body.data ()); - return s; + if (! strand_.running_in_this_thread()) + return server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::accept, shared_from_this()))); + + if (journal_.trace) journal_.trace << + id_ << " accept"; + + callClose_ = true; + + when_ = clock_type::now(); + when_str_ = beast::Time::getCurrentTime().formatted ( + "%Y-%b-%d %H:%M:%S").toStdString(); + + if (journal_.trace) journal_.trace << + id_ << " onAccept"; + + server_.handler().onAccept (session()); + + // Handler might have closed + if (closed_) + { + // VFALCO TODO Is this the correct way to close the socket? + // See what state the socket is in and verify. + cancel(); + return; + } + + if (socket_->needs_handshake ()) + { + start_timer(); + + socket_->async_handshake (beast::asio::abstract_socket::server, + strand_.wrap (std::bind (&Peer::on_ssl_handshake, shared_from_this(), + beast::asio::placeholders::error))); + } + else + { + on_read_request (error_code{}, 0); + } } +// Cancel all pending i/o and timers and send tcp shutdown. +void +Peer::cancel () +{ + if (journal_.trace) journal_.trace << + id_ << " cancel"; + + error_code ec; + timer_.cancel (ec); + socket_->cancel (ec); + socket_->shutdown (socket::shutdown_both, ec); +} + +// Called by a completion handler when error is not eof or aborted. +void +Peer::failed (error_code const& ec) +{ + if (journal_.trace) journal_.trace << + id_ << " failed, " << ec.message(); + + ec_ = ec; + assert (ec_); + cancel (); +} + +// Start the timer. +// If the timer expires, the session is considered +// timed out and will be forcefully closed. +void +Peer::start_timer() +{ + if (journal_.trace) journal_.trace << + id_ << " start_timer"; + + timer_.expires_from_now ( + boost::posix_time::seconds ( + timeoutSeconds)); + + timer_.async_wait (strand_.wrap (std::bind ( + &Peer::on_timer, shared_from_this(), + beast::asio::placeholders::error))); +} + +// Send a shared buffer +void +Peer::async_write (SharedBuffer const& buf) +{ + assert (buf.get().size() > 0); + + ++pending_writes_; + + if (journal_.trace) journal_.trace << + id_ << " async_write, pending_writes = " << pending_writes_; + + start_timer(); + + // Send the copy. We pass the SharedBuffer in the last parameter + // so that a reference is maintained as the handler gets copied. + // When the final completion function returns, the reference + // count will drop to zero and the buffer will be freed. + // + boost::asio::async_write (*socket_, + boost::asio::const_buffers_1 (&(*buf)[0], buf->size()), + strand_.wrap (std::bind (&Peer::on_write_response, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred, buf))); +} + +//------------------------------------------------------------------------------ + +// Called when session times out +void +Peer::on_timer (error_code ec) +{ + if (ec == boost::asio::error::operation_aborted) + return; + + if (! ec) + ec = boost::system::errc::make_error_code ( + boost::system::errc::timed_out); + + failed (ec); +} + +// Called when the handshake completes +void +Peer::on_ssl_handshake (error_code ec) +{ + if (journal_.trace) journal_.trace << + id_ << " on_ssl_handshake, " << ec.message(); + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + on_read_request (error_code{}, 0); +} + +// Called repeatedly with the http request data +void +Peer::on_read_request (error_code ec, std::size_t bytes_transferred) +{ + if (journal_.trace) + { + if (! ec_) + journal_.trace << + id_ << " on_read_request, " << + bytes_transferred << " bytes"; + else + journal_.trace << + id_ << " on_read_request failed, " << + ec.message(); + } + + if (ec == boost::asio::error::operation_aborted) + return; + + bool const eof = ec == boost::asio::error::eof; + if (eof) + ec = error_code{}; + + if (! ec) + { + bytes_in_ += bytes_transferred; + + read_buf_.commit (bytes_transferred); + + std::pair result; + + if (! eof) + { + result = parser_.write (read_buf_.data()); + if (result.first) + read_buf_.consume (result.second); + else + ec = parser_.error(); + } + else + { + result.first = parser_.write_eof(); + if (! result.first) + ec = parser_.error(); + } + + if (! ec) + { + if (parser_.complete()) + { + // ??? + //if (! socket_->needs_handshake()) + // socket_->shutdown (socket::shutdown_receive, ec); + + if (journal_.trace) journal_.trace << + id_ << " onRequest"; + + ++request_count_; + + server_.handler().onRequest (session()); + return; + } + } + } + + if (ec) + { + failed (ec); + return; + } + + start_timer(); + + socket_->async_read_some (read_buf_.prepare (bufferSize), strand_.wrap ( + std::bind (&Peer::on_read_request, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +// Called when async_write completes. +void +Peer::on_write_response (error_code ec, std::size_t bytes_transferred, + SharedBuffer const& buf) +{ + timer_.cancel (ec); + + if (journal_.trace) + { + if (! ec_) + journal_.trace << + id_ << " on_write_response, " << + bytes_transferred << " bytes"; + else + journal_.trace << + id_ << " on_write_response failed, " << + ec.message(); + } + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec != 0) + { + failed (ec); + return; + } + + bytes_out_ += bytes_transferred; + + assert (pending_writes_ > 0); + if (--pending_writes_ > 0) + return; + + if (closed_) + { + socket_->shutdown (socket::shutdown_send, ec); + return; + + } +} + +//------------------------------------------------------------------------------ + // Send a copy of the data. void Peer::write (void const* buffer, std::size_t bytes) { - // Make sure this happens on an io_service thread. - impl_.get_io_service().dispatch (strand_.wrap ( + server_.get_io_service().dispatch (strand_.wrap ( std::bind (&Peer::async_write, shared_from_this(), SharedBuffer (static_cast (buffer), bytes)))); } +// Called to indicate the response has been written (but not sent) +void +Peer::complete() +{ + if (! strand_.running_in_this_thread()) + return server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::complete, shared_from_this()))); + + message_ = beast::http::message{}; + parser_ = beast::http::parser{message_, true}; + + on_read_request (error_code{}, 0); +} + // Make the Session asynchronous void Peer::detach () { if (detached_.exchange (1) == 0) { - bassert (! work_); - bassert (detach_ref_ == nullptr); + assert (! work_); + assert (detach_ref_ == nullptr); // Maintain an additional reference while detached detach_ref_ = shared_from_this(); @@ -112,298 +407,32 @@ Peer::detach () // after the Session is closed and handlers complete. // work_ = boost::in_place (std::ref ( - impl_.get_io_service())); + server_.get_io_service())); } } -// Called by the Handler to close the session. +// Called from the Handler to close the session. void -Peer::close () -{ - // Make sure this happens on an io_service thread. - impl_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::handle_close, shared_from_this()))); -} - -//-------------------------------------------------------------------------- - -// Called when the handshake completes -// -void -Peer::handle_handshake (error_code ec) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - failed (ec); - return; - } - - async_read_some(); -} - -// Called when the data timer expires -// -void -Peer::handle_data_timer (error_code ec) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (closed_) - return; - - if (ec != 0) - { - failed (ec); - return; - } - - failed (boost::system::errc::make_error_code ( - boost::system::errc::timed_out)); -} - -// Called when the request timer expires -// -void -Peer::handle_request_timer (error_code ec) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (closed_) - return; - - if (ec != 0) - { - failed (ec); - return; - } - - failed (boost::system::errc::make_error_code ( - boost::system::errc::timed_out)); -} - -// Called when async_write completes. -void -Peer::handle_write (error_code ec, std::size_t bytes_transferred, - SharedBuffer const& buf) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) - { - failed (ec); - return; - } - - bassert (writesPending_ > 0); - if (--writesPending_ == 0 && closed_) - socket_->shutdown (socket::shutdown_send, ec); -} - -// Called when async_read_some completes. -void -Peer::handle_read (error_code ec, std::size_t bytes_transferred) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0 && ec != boost::asio::error::eof) - { - failed (ec); - return; - } - - std::size_t const bytes_parsed (parser_.process ( - buffer_.getData(), bytes_transferred)); - - if (parser_.error() || - bytes_parsed != bytes_transferred) - { - failed (boost::system::errc::make_error_code ( - boost::system::errc::bad_message)); - return; - } - - if (ec == boost::asio::error::eof) - { - parser_.process_eof(); - ec = error_code(); - } - - if (parser_.error()) - { - failed (boost::system::errc::make_error_code ( - boost::system::errc::bad_message)); - return; - } - - if (! parser_.finished()) - { - // Feed some headers to the callback - if (parser_.fields().size() > 0) - { - handle_headers(); - if (closed_) - return; - } - } - - if (parser_.finished ()) - { - data_timer_.cancel(); - // VFALCO NOTE: Should we cancel this one? - request_timer_.cancel(); - - if (! socket_->needs_handshake()) - socket_->shutdown (socket::shutdown_receive, ec); - - handle_request (); - return; - } - - async_read_some(); -} - -// Called when we have some new headers. -void -Peer::handle_headers () -{ - impl_.handler().onHeaders (session()); -} - -// Called when we have a complete http request. -void -Peer::handle_request () -{ - // This is to guarantee onHeaders is called at least once. - handle_headers(); - - if (closed_) - return; - - // Process the HTTPRequest - impl_.handler().onRequest (session()); -} - -// Called to close the session. -void -Peer::handle_close () +Peer::close (bool graceful) { closed_ = true; + if (! strand_.running_in_this_thread()) + return server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::close, shared_from_this(), graceful))); + + if (! graceful || pending_writes_ == 0) + { + // We should cancel pending i/o + + if (pending_writes_ == 0) + { + } + } + // Release our additional reference detach_ref_.reset(); } -//-------------------------------------------------------------------------- -// -// Peer -// - -// Called when the acceptor accepts our socket. -void -Peer::accept () -{ - callClose_ = true; - - impl_.handler().onAccept (session()); - - if (closed_) - { - cancel(); - return; - } - - request_timer_.expires_from_now ( - boost::posix_time::seconds ( - requestTimeoutSeconds)); - - request_timer_.async_wait (strand_.wrap (std::bind ( - &Peer::handle_request_timer, shared_from_this(), - beast::asio::placeholders::error))); - - if (socket_->needs_handshake ()) - { - socket_->async_handshake (beast::asio::abstract_socket::server, - strand_.wrap (std::bind (&Peer::handle_handshake, shared_from_this(), - beast::asio::placeholders::error))); - } - else - { - async_read_some(); - } -} - -// Cancel all pending i/o and timers and send tcp shutdown. -void -Peer::cancel () -{ - error_code ec; - data_timer_.cancel (ec); - request_timer_.cancel (ec); - socket_->cancel (ec); - socket_->shutdown (socket::shutdown_both, ec); -} - -// Called by a completion handler when error is not eof or aborted. -void -Peer::failed (error_code const& ec) -{ - errorCode_ = ec.value(); - bassert (errorCode_ != 0); - cancel (); -} - -// Call the async_read_some initiating function. -void -Peer::async_read_some () -{ - // re-arm the data timer - // (this cancels the previous wait, if any) - // - data_timer_.expires_from_now ( - boost::posix_time::seconds ( - dataTimeoutSeconds)); - - data_timer_.async_wait (strand_.wrap (std::bind ( - &Peer::handle_data_timer, shared_from_this(), - beast::asio::placeholders::error))); - - // issue the read - // - boost::asio::mutable_buffers_1 buf ( - buffer_.getData (), buffer_.getSize ()); - - socket_->async_read_some (buf, strand_.wrap ( - std::bind (&Peer::handle_read, shared_from_this(), - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} - -// Send a shared buffer -void -Peer::async_write (SharedBuffer const& buf) -{ - bassert (buf.get().size() > 0); - - ++writesPending_; - - // Send the copy. We pass the SharedBuffer in the last parameter - // so that a reference is maintained as the handler gets copied. - // When the final completion function returns, the reference - // count will drop to zero and the buffer will be freed. - // - boost::asio::async_write (*socket_, - boost::asio::const_buffers_1 (&(*buf)[0], buf->size()), - strand_.wrap (std::bind (&Peer::handle_write, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred, buf))); -} - } } diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 427aab4c8c..53559db9bf 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -26,9 +26,14 @@ #include #include #include +#include +#include #include #include #include +#include +#include +#include #include #include @@ -48,129 +53,54 @@ class Peer , public beast::LeakChecked { private: + typedef std::chrono::system_clock clock_type; + enum { // Size of our receive buffer - bufferSize = 8192, + bufferSize = 2048, // Largest HTTP request allowed maxRequestBytes = 32 * 1024, - // Max seconds without receiving a byte - dataTimeoutSeconds = 10, - - // Max seconds without completing the request - requestTimeoutSeconds = 30 + // Max seconds without completing a message + timeoutSeconds = 30 + //timeoutSeconds = 3 }; - ServerImpl& impl_; + beast::Journal journal_; + ServerImpl& server_; + std::string id_; boost::asio::io_service::strand strand_; - boost::asio::deadline_timer data_timer_; - boost::asio::deadline_timer request_timer_; + boost::asio::deadline_timer timer_; std::unique_ptr socket_; - // VFALCO TODO Use c++11 - beast::MemoryBlock buffer_; - - beast::HTTPRequestParser parser_; - int writesPending_; + boost::asio::streambuf read_buf_; + beast::http::message message_; + beast::http::parser parser_; + int pending_writes_; bool closed_; + bool finished_; bool callClose_; std::shared_ptr detach_ref_; boost::optional work_; - int errorCode_; + + boost::system::error_code ec_; std::atomic detached_; + clock_type::time_point when_; + std::string when_str_; + int request_count_; + std::size_t bytes_in_; + std::size_t bytes_out_; + //-------------------------------------------------------------------------- public: - Peer (ServerImpl& impl, Port const& port); + Peer (ServerImpl& impl, Port const& port, beast::Journal journal); ~Peer (); -private: - //-------------------------------------------------------------------------- - // - // Session - // - - beast::Journal - journal() - { - return impl_.journal(); - } - - beast::IP::Endpoint - remoteAddress() - { - return from_asio (get_socket().remote_endpoint()); - } - - bool - headersComplete() - { - return parser_.headersComplete(); - } - - beast::HTTPHeaders - headers() - { - return beast::HTTPHeaders (parser_.fields()); - } - - beast::SharedPtr const& - request() - { - return parser_.request(); - } - - std::string - content(); - - void - write (void const* buffer, std::size_t bytes); - - void - detach (); - - void - close (); - - //-------------------------------------------------------------------------- - // - // Completion Handlers - // - - void - handle_handshake (error_code ec); - - void - handle_data_timer (error_code ec); - - void - handle_request_timer (error_code ec); - - void - handle_write (error_code ec, std::size_t bytes_transferred, - SharedBuffer const& buf); - - void - handle_read (error_code ec, std::size_t bytes_transferred); - - void - handle_headers (); - - void - handle_request (); - - void - handle_close (); - -public: - //-------------------------------------------------------------------------- - // - // Peer - // socket& get_socket() { @@ -184,18 +114,76 @@ public: } void - accept (); + accept(); +private: void - cancel (); + cancel(); void failed (error_code const& ec); void - async_read_some (); + start_timer(); - void async_write (SharedBuffer const& buf); + void + async_write (SharedBuffer const& buf); + + //-------------------------------------------------------------------------- + // + // Completion Handlers + // + + void + on_timer (error_code ec); + + void + on_ssl_handshake (error_code ec); + + void + on_read_request (error_code ec, std::size_t bytes_transferred); + + void + on_write_response (error_code ec, std::size_t bytes_transferred, + SharedBuffer const& buf); + + void + on_close (); + + //-------------------------------------------------------------------------- + // + // Session + // + + beast::Journal + journal() override + { + return server_.journal(); + } + + beast::IP::Endpoint + remoteAddress() override + { + return from_asio (get_socket().remote_endpoint()); + } + + beast::http::message& + message() override + { + return message_; + } + + void + write (void const* buffer, std::size_t bytes) override; + + void + complete() override; + + void + detach() override; + + void + close (bool graceful) override; }; } diff --git a/src/ripple/http/impl/Port.cpp b/src/ripple/http/impl/Port.cpp index 629250ca1a..ddf28e036b 100644 --- a/src/ripple/http/impl/Port.cpp +++ b/src/ripple/http/impl/Port.cpp @@ -21,7 +21,7 @@ namespace ripple { namespace HTTP { Port::Port () - : security (no_ssl) + : security (Security::no_ssl) , port (0) , context (nullptr) { diff --git a/src/ripple/http/impl/Server.cpp b/src/ripple/http/impl/Server.cpp index cbceaf7350..70b6efa1f3 100644 --- a/src/ripple/http/impl/Server.cpp +++ b/src/ripple/http/impl/Server.cpp @@ -64,5 +64,11 @@ Server::stop () m_impl->stop(true); } +void +Server::onWrite (beast::PropertyStream::Map& map) +{ + m_impl->onWrite (map); +} + } } diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp index f77ff26d1f..632a6d2e50 100644 --- a/src/ripple/http/impl/ServerImpl.cpp +++ b/src/ripple/http/impl/ServerImpl.cpp @@ -18,53 +18,66 @@ //============================================================================== #include +#include +#include +#include +#include +#include +#include +#include +#include namespace ripple { namespace HTTP { -ServerImpl::ServerImpl (Server& server, Handler& handler, beast::Journal journal) - : Thread ("HTTP::Server") - , m_server (server) +ServerImpl::ServerImpl (Server& server, + Handler& handler, beast::Journal journal) + : m_server (server) , m_handler (handler) , journal_ (journal) - , m_strand (m_io_service) - , m_work (boost::in_place (std::ref (m_io_service))) + , m_strand (io_service_) + , m_work (boost::in_place (std::ref (io_service_))) , m_stopped (true) { - startThread (); + thread_ = std::thread (std::bind ( + &ServerImpl::run, this)); } ServerImpl::~ServerImpl () { - stopThread (); + thread_.join(); } -Ports const& ServerImpl::getPorts () const +Ports const& +ServerImpl::getPorts () const { - SharedState::ConstUnlockedAccess state (m_state); - return state->ports; + std::lock_guard lock (mutex_); + return state_.ports; } -void ServerImpl::setPorts (Ports const& ports) +void +ServerImpl::setPorts (Ports const& ports) { - SharedState::Access state (m_state); - state->ports = ports; + std::lock_guard lock (mutex_); + state_.ports = ports; update(); } -bool ServerImpl::stopping () const +bool +ServerImpl::stopping () const { return ! m_work; } -void ServerImpl::stop (bool wait) +void +ServerImpl::stop (bool wait) { if (! stopping()) { m_work = boost::none; update(); } - + if (wait) m_stopped.wait(); } @@ -74,55 +87,99 @@ void ServerImpl::stop (bool wait) // Server // -Handler& ServerImpl::handler() +Handler& +ServerImpl::handler() { return m_handler; } -boost::asio::io_service& ServerImpl::get_io_service() +boost::asio::io_service& +ServerImpl::get_io_service() { - return m_io_service; + return io_service_; } // Inserts the peer into our list of peers. We only remove it // from the list inside the destructor of the Peer object. This // way, the Peer can never outlive the server. // -void ServerImpl::add (Peer& peer) +void +ServerImpl::add (Peer& peer) { - SharedState::Access state (m_state); - state->peers.push_back (peer); + std::lock_guard lock (mutex_); + state_.peers.push_back (peer); } -void ServerImpl::add (Door& door) +void +ServerImpl::add (Door& door) { - SharedState::Access state (m_state); - state->doors.push_back (door); + std::lock_guard lock (mutex_); + state_.doors.push_back (door); } // Removes the peer from our list of peers. This is only called from // the destructor of Peer. Essentially, the item in the list functions // as a weak_ptr. // -void ServerImpl::remove (Peer& peer) +void +ServerImpl::remove (Peer& peer) { - SharedState::Access state (m_state); - state->peers.erase (state->peers.iterator_to (peer)); + std::lock_guard lock (mutex_); + state_.peers.erase (state_.peers.iterator_to (peer)); } -void ServerImpl::remove (Door& door) +void +ServerImpl::remove (Door& door) { - SharedState::Access state (m_state); - state->doors.push_back (door); + std::lock_guard lock (mutex_); + state_.doors.push_back (door); } //-------------------------------------------------------------------------- -// -// Thread -// + +void +ServerImpl::report (Stat&& stat) +{ + std::lock_guard lock (mutex_); + if (stats_.size() >= historySize) + stats_.pop_back(); + stats_.emplace_front (std::move(stat)); +} + +void +ServerImpl::onWrite (beast::PropertyStream::Map& map) +{ + std::lock_guard lock (mutex_); + + // VFALCO TODO Write the list of doors + + { + beast::PropertyStream::Set set ("sessions", map); + for (auto const& stat : stats_) + { + beast::PropertyStream::Map item (set); + + item ["when"] = stat.when; + + { + std::stringstream ss; + ss << stat.elapsed; + item ["elapsed"] = ss.str(); + } + + item ["requests"] = stat.requests; + item ["bytes_in"] = stat.bytes_in; + item ["bytes_out"] = stat.bytes_out; + if (stat.ec) + item ["error"] = stat.ec.message(); + } + } +} + //-------------------------------------------------------------------------- -int ServerImpl::compare (Port const& lhs, Port const& rhs) +int +ServerImpl::compare (Port const& lhs, Port const& rhs) { if (lhs < rhs) return -1; @@ -131,18 +188,31 @@ int ServerImpl::compare (Port const& lhs, Port const& rhs) return 0; } -// Updates our Door list based on settings. -// -void ServerImpl::handle_update () +void +ServerImpl::update() { + io_service_.post (m_strand.wrap (std::bind ( + &ServerImpl::on_update, this))); +} + +// Updates our Door list based on settings. +void +ServerImpl::on_update () +{ + /* + if (! m_strand.running_in_this_thread()) + io_service_.dispatch (m_strand.wrap (std::bind ( + &ServerImpl::update, this))); + */ + if (! stopping()) { // Make a local copy to shorten the lock // Ports ports; { - SharedState::ConstAccess state (m_state); - ports = state->ports; + std::lock_guard lock (mutex_); + ports = state_.ports; } std::sort (ports.begin(), ports.end()); @@ -206,19 +276,16 @@ void ServerImpl::handle_update () } } -// Causes handle_update to run on the io_service -// -void ServerImpl::update () +// Thread entry point to perform io_service work +void +ServerImpl::run() { - m_io_service.post (m_strand.wrap (std::bind ( - &ServerImpl::handle_update, this))); -} + static std::atomic id; -// The main i/o processing loop. -// -void ServerImpl::run () -{ - m_io_service.run (); + beast::Thread::setCurrentThreadName ( + std::string("HTTP::Server #") + std::to_string (++id)); + + io_service_.run(); m_stopped.signal(); m_handler.onStopped (m_server); diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h index 0443b32215..ecd48da9a1 100644 --- a/src/ripple/http/impl/ServerImpl.h +++ b/src/ripple/http/impl/ServerImpl.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_HTTP_SERVERIMPL_H_INCLUDED #define RIPPLE_HTTP_SERVERIMPL_H_INCLUDED +#include #include #include #include @@ -27,6 +28,11 @@ #include #include #include +#include +#include +#include +#include +#include #include namespace ripple { @@ -35,9 +41,26 @@ namespace HTTP { class Door; class Peer; -class ServerImpl : public beast::Thread +struct Stat +{ + std::string when; + std::chrono::seconds elapsed; + int requests; + std::size_t bytes_in; + std::size_t bytes_out; + boost::system::error_code ec; +}; + +class ServerImpl { private: + typedef std::chrono::system_clock clock_type; + + enum + { + historySize = 100 + }; + struct State { // Attributes for our listening ports @@ -50,18 +73,21 @@ private: beast::List doors; }; - typedef beast::SharedData SharedState; - typedef std::vector > Doors; + typedef std::vector > Doors; Server& m_server; Handler& m_handler; + std::thread thread_; + std::mutex mutable mutex_; + std::condition_variable cond_; beast::Journal journal_; - boost::asio::io_service m_io_service; + boost::asio::io_service io_service_; boost::asio::io_service::strand m_strand; boost::optional m_work; beast::WaitableEvent m_stopped; - SharedState m_state; + State state_; Doors m_doors; + std::deque stats_; public: ServerImpl (Server& server, Handler& handler, beast::Journal journal); @@ -104,15 +130,24 @@ public: remove (Door& door); void - handle_update (); + report (Stat&& stat); void - update (); + onWrite (beast::PropertyStream::Map& map); + +private: + static + int + compare (Port const& lhs, Port const& rhs); void - run (); + update(); + + void + on_update(); - static int compare (Port const& lhs, Port const& rhs); + void + run(); }; diff --git a/src/ripple/http/tests/Server.test.cpp b/src/ripple/http/tests/Server.test.cpp new file mode 100644 index 0000000000..687259259b --- /dev/null +++ b/src/ripple/http/tests/Server.test.cpp @@ -0,0 +1,254 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { +namespace HTTP { + +class Server_test : public beast::unit_test::suite +{ +public: + enum + { + testPort = 1001 + }; + + class TestSink : public beast::Journal::Sink + { + beast::unit_test::suite& suite_; + + public: + TestSink (beast::unit_test::suite& suite) + : suite_ (suite) + { + } + + void + write (beast::Journal::Severity level, + std::string const& text) override + { + suite_.log << text; + } + }; + + struct TestHandler : Handler + { + void + onAccept (Session& session) override + { + } + + void + onRequest (Session& session) override + { + session << "Hello, world!\n"; + if (session.message().keep_alive()) + session.complete(); + else + session.close (true); + } + + void + onClose (Session& session, + boost::system::error_code const&) override + { + } + + void + onStopped (Server& server) + { + } + }; + + // Connect to an address + template + bool + connect (Socket& s, std::string const& addr, int port) + { + try + { + typename Socket::endpoint_type ep ( + boost::asio::ip::address::from_string (addr), port); + s.connect (ep); + pass(); + return true; + } + catch (std::exception const& e) + { + fail (e.what()); + } + + return false; + } + + // Write a string to the stream + template + bool + write (SyncWriteStream& s, std::string const& text) + { + try + { + boost::asio::write (s, boost::asio::buffer (text)); + pass(); + return true; + } + catch (std::exception const& e) + { + fail (e.what()); + } + return false; + } + + // Expect that reading the stream produces a matching string + template + bool + expect_read (SyncReadStream& s, std::string const& match) + { + boost::asio::streambuf b (1000); // limit on read + try + { + auto const n = boost::asio::read_until (s, b, '\n'); + if (expect (n == match.size())) + { + std::string got; + got.resize (n); + boost::asio::buffer_copy (boost::asio::buffer ( + &got[0], n), b.data()); + return expect (got == match); + } + } + catch (std::length_error const& e) + { + fail(e.what()); + } + return false; + } + + void + test_request() + { + testcase("request"); + + boost::asio::io_service ios; + typedef boost::asio::ip::tcp::socket socket; + socket s (ios); + + if (! connect (s, "127.0.0.1", testPort)) + return; + + if (! write (s, + "GET / HTTP/1.1\r\n" + "Connection: close\r\n" + "\r\n")) + return; + + if (! expect_read (s, "Hello, world!\n")) + return ; + + try + { + s.shutdown (socket::shutdown_both); + pass(); + } + catch (std::exception const& e) + { + fail (e.what()); + } + + std::this_thread::sleep_for (std::chrono::seconds (1)); + } + + void + test_keepalive() + { + testcase("keepalive"); + + boost::asio::io_service ios; + typedef boost::asio::ip::tcp::socket socket; + socket s (ios); + + if (! connect (s, "127.0.0.1", testPort)) + return; + + if (! write (s, + "GET / HTTP/1.1\r\n" + "Connection: Keep-Alive\r\n" + "\r\n")) + return; + + if (! expect_read (s, "Hello, world!\n")) + return ; + + if (! write (s, + "GET / HTTP/1.1\r\n" + "Connection: close\r\n" + "\r\n")) + return; + + if (! expect_read (s, "Hello, world!\n")) + return ; + + try + { + s.shutdown (socket::shutdown_both); + pass(); + } + catch (std::exception const& e) + { + fail (e.what()); + } + } + + void + run() + { + TestSink sink {*this}; + sink.severity (beast::Journal::Severity::kAll); + beast::Journal journal {sink}; + TestHandler handler; + Server s (handler, journal); + Ports ports; + std::unique_ptr c ( + RippleSSLContext::createBare ()); + ports.emplace_back (testPort, beast::IP::Endpoint ( + beast::IP::AddressV4 (127, 0, 0, 1), 0), + Port::Security::no_ssl, c.get()); + s.setPorts (ports); + + test_request(); + //test_keepalive(); + + s.stop(); + + pass(); + } +}; + +BEAST_DEFINE_TESTSUITE_MANUAL(Server,http,ripple); + +} +} diff --git a/src/ripple/module/app/main/Application.cpp b/src/ripple/module/app/main/Application.cpp index 8df6edbc2f..257a37a866 100644 --- a/src/ripple/module/app/main/Application.cpp +++ b/src/ripple/module/app/main/Application.cpp @@ -302,7 +302,7 @@ public: // VFALCO NOTE LocalCredentials starts the deprecated UNL service , m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue)) - , m_rpcHTTPServer (RPCHTTPServer::New (*m_networkOPs, + , m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs, m_logs.journal("HTTPServer"), *m_jobQueue, *m_networkOPs, *m_resourceManager)) , m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service @@ -363,6 +363,7 @@ public: m_nodeStoreScheduler.setJobQueue (*m_jobQueue); add (m_ledgerMaster->getPropertySource ()); + add (*m_rpcHTTPServer); // VFALCO TODO remove these once the call is thread safe. HashMaps::getInstance ().initializeNonce (); diff --git a/src/ripple/module/app/main/RPCHTTPServer.cpp b/src/ripple/module/app/main/RPCHTTPServer.cpp index 24759a2035..c70b0b8492 100644 --- a/src/ripple/module/app/main/RPCHTTPServer.cpp +++ b/src/ripple/module/app/main/RPCHTTPServer.cpp @@ -40,10 +40,8 @@ public: std::unique_ptr m_context; RPCHTTPServerImp (Stoppable& parent, - beast::Journal journal, - JobQueue& jobQueue, - NetworkOPs& networkOPs, - Resource::Manager& resourceManager) + beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs, + Resource::Manager& resourceManager) : RPCHTTPServer (parent) , m_resourceManager (resourceManager) , m_journal (journal) @@ -65,12 +63,13 @@ public: } } - ~RPCHTTPServerImp () + ~RPCHTTPServerImp() { m_server.stop(); } - void setup (beast::Journal journal) + void + setup (beast::Journal journal) override { if (! getConfig ().getRpcIP().empty () && getConfig ().getRpcPort() != 0) @@ -81,7 +80,7 @@ public: //if (! is_unspecified (ep)) { HTTP::Port port; - port.security = HTTP::Port::allow_ssl; + port.security = HTTP::Port::Security::allow_ssl; port.addr = ep.at_port(0); if (getConfig ().getRpcPort() != 0) port.port = getConfig ().getRpcPort(); @@ -105,12 +104,14 @@ public: // Stoppable // - void onStop () + void + onStop() override { m_server.stopAsync(); } - void onChildrenStopped () + void + onChildrenStopped() override { } @@ -119,28 +120,26 @@ public: // HTTP::Handler // - void onAccept (HTTP::Session& session) + void + onAccept (HTTP::Session& session) override { // Reject non-loopback connections if RPC_ALLOW_REMOTE is not set if (! getConfig().RPC_ALLOW_REMOTE && ! beast::IP::is_loopback (session.remoteAddress())) { - session.close(); + session.close (false); } } - void onHeaders (HTTP::Session& session) - { - } - - void onRequest (HTTP::Session& session) + void + onRequest (HTTP::Session& session) override { // Check user/password authorization - auto const headers (session.request()->headers().build_map()); + auto const headers (build_map (session.message().headers)); if (! HTTPAuthorized (headers)) { session.write (HTTPReply (403, "Forbidden")); - session.close(); + session.close (true); return; } @@ -158,17 +157,21 @@ public: #endif } - void onClose (HTTP::Session& session, int errorCode) + void + onClose (HTTP::Session& session, + boost::system::error_code const&) override { } - void onStopped (HTTP::Server&) + void + onStopped (HTTP::Server&) override { stopped(); } //-------------------------------------------------------------------------- + // Dispatched on the job queue void processSession (Job& job, HTTP::Session& session) { #if 0 @@ -176,11 +179,19 @@ public: session.write (m_deprecatedHandler.processRequest ( session.content(), session.remoteAddress().at_port(0))); #else - session.write (processRequest (session.content(), + auto const s (to_string(session.message().body)); + session.write (processRequest (to_string(session.message().body), session.remoteAddress().at_port(0))); #endif - session.close(); + if (session.message().keep_alive()) + { + session.complete(); + } + else + { + session.close (true); + } } std::string createResponse ( @@ -280,24 +291,35 @@ public: return createResponse (200, response); } + + // + // PropertyStream + // + + void + onWrite (beast::PropertyStream::Map& map) override + { + m_server.onWrite (map); + } }; //------------------------------------------------------------------------------ RPCHTTPServer::RPCHTTPServer (Stoppable& parent) : Stoppable ("RPCHTTPServer", parent) + , Source ("rpc") { } //------------------------------------------------------------------------------ -RPCHTTPServer* RPCHTTPServer::New (Stoppable& parent, - beast::Journal journal, - JobQueue& jobQueue, - NetworkOPs& networkOPs, - Resource::Manager& resourceManager) +std::unique_ptr +make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal, + JobQueue& jobQueue, NetworkOPs& networkOPs, + Resource::Manager& resourceManager) { - return new RPCHTTPServerImp (parent, journal, jobQueue, networkOPs, resourceManager); + return std::make_unique ( + parent, journal, jobQueue, networkOPs, resourceManager); } } diff --git a/src/ripple/module/app/main/RPCHTTPServer.h b/src/ripple/module/app/main/RPCHTTPServer.h index 979a8a7076..0a22736763 100644 --- a/src/ripple/module/app/main/RPCHTTPServer.h +++ b/src/ripple/module/app/main/RPCHTTPServer.h @@ -20,24 +20,34 @@ #ifndef RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED #define RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED +#include +#include +#include // + namespace ripple { -class RPCHTTPServer : public beast::Stoppable +class RPCHTTPServer + : public beast::Stoppable + , public beast::PropertyStream::Source { protected: RPCHTTPServer (Stoppable& parent); public: - static RPCHTTPServer* New (Stoppable& parent, - beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs, - Resource::Manager& resourceManager); - - virtual ~RPCHTTPServer () { } + virtual + ~RPCHTTPServer() = default; /** Opens listening ports based on the Config settings. */ - virtual void setup (beast::Journal journal) = 0; + virtual + void + setup (beast::Journal journal) = 0; }; +std::unique_ptr +make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal, + JobQueue& jobQueue, NetworkOPs& networkOPs, + Resource::Manager& resourceManager); + } #endif diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0de0fbbe12..193a82e237 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -40,8 +40,6 @@ peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, //------------------------------------------------------------------------------ -//------------------------------------------------------------------------------ - /* Completion handlers for client role. Logic steps: 1. Establish outgoing connection @@ -114,11 +112,11 @@ PeerImp::on_connect (error_code ec) beast::asio::placeholders::error))); } -beast::http::basic_message +beast::http::message PeerImp::make_request() { assert (! m_inbound); - beast::http::basic_message m; + beast::http::message m; m.method (beast::http::method_t::http_get); m.url ("/"); m.version (1, 1); @@ -152,8 +150,8 @@ PeerImp::on_connect_ssl (error_code ec) } #if RIPPLE_STRUCTURED_OVERLAY_CLIENT - beast::http::basic_message req (make_request()); - beast::http::xwrite (write_buffer_, req); + beast::http::message req (make_request()); + beast::http::write (write_buffer_, req); on_write_http_request (error_code(), 0); #else @@ -204,8 +202,11 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) if (! ec) { read_buffer_.commit (bytes_transferred); + bool success; std::size_t bytes_consumed; - std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data()); + std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data()); + if (! success) + ec = http_parser_->error(); if (! ec) { @@ -232,6 +233,12 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) } } + if (ec == boost::asio::error::eof) + { + // remote closed their end + // VFALCO TODO Clean up the shutdown of the socket + } + if (ec) { m_journal.info << @@ -344,8 +351,12 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) if (! ec) { read_buffer_.commit (bytes_transferred); + bool success; std::size_t bytes_consumed; - std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data()); + std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data()); + if (! success) + ec = http_parser_->error(); + if (! ec) { read_buffer_.consume (bytes_consumed); @@ -363,7 +374,7 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) "Upgrade: Ripple/1.2\r\n" "Connection: Upgrade\r\n" "\r\n"; - beast::http::xwrite (write_buffer_, ss.str()); + beast::http::write (write_buffer_, ss.str()); on_write_http_response(error_code(), 0); } else @@ -377,7 +388,7 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) "400 Bad Request
" "The server requires an Upgrade request." ""; - beast::http::xwrite (write_buffer_, ss.str()); + beast::http::write (write_buffer_, ss.str()); on_write_http_response(error_code(), 0); } return; @@ -399,10 +410,10 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) beast::asio::placeholders::bytes_transferred))); } -beast::http::basic_message -PeerImp::make_response (beast::http::basic_message const& req) +beast::http::message +PeerImp::make_response (beast::http::message const& req) { - beast::http::basic_message resp; + beast::http::message resp; // Unimplemented return resp; } diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 966cb67fb7..1598a94561 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -39,7 +39,8 @@ #include #include -#include +#include +#include #include @@ -167,8 +168,8 @@ public: boost::asio::streambuf read_buffer_; - boost::optional http_message_; - boost::optional http_parser_; + boost::optional http_message_; + boost::optional http_parser_; message_stream message_stream_; boost::asio::streambuf write_buffer_; @@ -280,7 +281,7 @@ private: void on_connect (error_code ec); - beast::http::basic_message + beast::http::message make_request(); void @@ -308,8 +309,8 @@ private: void on_read_http_request (error_code ec, std::size_t bytes_transferred); - beast::http::basic_message - make_response (beast::http::basic_message const& req); + beast::http::message + make_response (beast::http::message const& req); void on_write_http_response (error_code ec, std::size_t bytes_transferred); diff --git a/src/ripple/overlay/impl/peer_info.h b/src/ripple/overlay/impl/peer_info.h index 3d3730b022..803c343df3 100644 --- a/src/ripple/overlay/impl/peer_info.h +++ b/src/ripple/overlay/impl/peer_info.h @@ -20,7 +20,7 @@ #ifndef RIPPLE_OVERLAY_PEER_INFO_H_INCLUDED #define RIPPLE_OVERLAY_PEER_INFO_H_INCLUDED -#include +#include namespace ripple { diff --git a/src/ripple/unity/http.cpp b/src/ripple/unity/http.cpp index 1ad92de1fe..d73e3233f8 100644 --- a/src/ripple/unity/http.cpp +++ b/src/ripple/unity/http.cpp @@ -25,3 +25,4 @@ #include #include #include +#include