From ee8bd8ddaed40de284dfc401232ad48bbb3d4fdf Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 8 Sep 2014 14:04:16 -0700 Subject: [PATCH] Fix handling of HTTP/S keep-alives (RIPD-556): * Proper shutdown for ssl and non-ssl connections * Report session id in history * Report histogram of requests per session * Change print name to 'http' * Split logging into "HTTP" and "HTTP-RPC" partitions * More logging and refinement of logging severities * Log the request count when a session is destroyed Conflicts: src/ripple/http/impl/Peer.cpp src/ripple/http/impl/Peer.h src/ripple/http/impl/ServerImpl.cpp src/ripple/module/app/main/Application.cpp src/ripple/module/app/main/RPCHTTPServer.cpp --- Builds/VisualStudio2013/RippleD.vcxproj | 2 + .../VisualStudio2013/RippleD.vcxproj.filters | 3 + src/ripple/http/Session.h | 18 +- src/ripple/http/impl/Door.cpp | 9 +- src/ripple/http/impl/Door.h | 1 + src/ripple/http/impl/Peer.cpp | 557 ++++++++++++------ src/ripple/http/impl/Peer.h | 85 ++- src/ripple/http/impl/ServerImpl.cpp | 48 +- src/ripple/http/impl/ServerImpl.h | 8 + src/ripple/module/app/main/Application.cpp | 2 +- src/ripple/module/app/main/RPCHTTPServer.cpp | 18 +- src/ripple/module/app/main/RPCHTTPServer.h | 14 +- 12 files changed, 529 insertions(+), 236 deletions(-) diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 98a966c37..1d8f7ca5f 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -159,6 +159,8 @@ + + True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 6c4b66ea7..2b043ba8b 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -711,6 +711,9 @@ beast\asio + + beast\asio + beast\asio\tests diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h index 85a280af0..10e7f6fc2 100644 --- a/src/ripple/http/Session.h +++ b/src/ripple/http/Session.h @@ -147,6 +147,15 @@ public: } /** @} */ + /** Detach the session. + This holds the session open so that the response can be sent + asynchronously. Calls to io_service::run made by the server + will not return until all detached sessions are closed. + */ + virtual + void + detach() = 0; + /** Indicate that the response is complete. The handler should call this when it has completed writing the response. If Keep-Alive is indicated on the connection, @@ -157,15 +166,6 @@ public: void complete() = 0; - /** Detach the session. - This holds the session open so that the response can be sent - asynchronously. Calls to io_service::run made by the server - will not return until all detached sessions are closed. - */ - virtual - void - detach() = 0; - /** Close the session. This will be performed asynchronously. The session will be closed gracefully after all pending writes have completed. diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp index 9b0c0fea8..86ecd716b 100644 --- a/src/ripple/http/impl/Door.cpp +++ b/src/ripple/http/impl/Door.cpp @@ -81,7 +81,7 @@ void Door::async_accept () { auto const peer (std::make_shared (server_, port_, server_.journal())); - acceptor_.async_accept (peer->get_socket(), std::bind ( + acceptor_.async_accept (peer->get_socket(), endpoint_, std::bind ( &Door::handle_accept, Ptr(this), beast::asio::placeholders::error, peer)); } @@ -95,13 +95,14 @@ Door::handle_accept (error_code ec, if (ec) { - server_.journal().error << "Accept failed: " << ec.message(); + server_.journal().error << + "accept: " << ec.message(); return; } + auto const endpoint = endpoint_; async_accept(); - - peer->accept(); + peer->accept (endpoint); } } diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h index 356c157ad..0950ea884 100644 --- a/src/ripple/http/impl/Door.h +++ b/src/ripple/http/impl/Door.h @@ -39,6 +39,7 @@ private: ServerImpl& server_; acceptor acceptor_; + endpoint_t endpoint_; Port port_; public: diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp index ec4197c07..fb461c211 100644 --- a/src/ripple/http/impl/Peer.cpp +++ b/src/ripple/http/impl/Peer.cpp @@ -18,27 +18,35 @@ //============================================================================== #include +#include #include namespace ripple { namespace HTTP { +/* +Reference: +http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error +*/ + +std::atomic Peer::s_count_; + +std::size_t +Peer::count() +{ + return s_count_.load(); +} + Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal) : journal_ (journal) , server_ (server) , strand_ (server_.get_io_service()) , timer_ (server_.get_io_service()) , parser_ (message_, true) - , pending_writes_ (0) - , closed_ (false) - , callClose_ (false) - , detached_ (0) - , request_count_ (0) - , bytes_in_ (0) - , bytes_out_ (0) { static std::atomic sid; - id_ = std::string("#") + std::to_string(sid++); + nid_ = ++sid; + id_ = std::string("#") + std::to_string(nid_) + " "; tag = nullptr; @@ -63,16 +71,16 @@ Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal) server_.add (*this); - if (journal_.trace) journal_.trace << - id_ << " created"; + if (journal_.trace) journal_.trace << id_ << + "created"; } - Peer::~Peer () { if (callClose_) { Stat stat; + stat.id = nid_; stat.when = std::move (when_str_); stat.elapsed = std::chrono::duration_cast < std::chrono::seconds> (clock_type::now() - when_); @@ -80,32 +88,32 @@ Peer::~Peer () stat.bytes_in = bytes_in_; stat.bytes_out = bytes_out_; stat.ec = std::move (ec_); - server_.report (std::move (stat)); - if (journal_.trace) journal_.trace << - id_ << " onClose (" << ec_ << ")"; server_.handler().onClose (session(), ec_); } server_.remove (*this); - if (journal_.trace) journal_.trace << - id_ << " destroyed"; + if (journal_.trace) journal_.trace << id_ << + "destroyed: " << request_count_ << + ((request_count_ == 1) ? " request" : " requests"); + + --s_count_; } //------------------------------------------------------------------------------ // Called when the acceptor accepts our socket. void -Peer::accept () +Peer::accept (boost::asio::ip::tcp::endpoint endpoint) { if (! strand_.running_in_this_thread()) return server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::accept, shared_from_this()))); + std::bind (&Peer::accept, shared_from_this(), endpoint))); - if (journal_.trace) journal_.trace << - id_ << " accept"; + if (journal_.trace) journal_.trace << id_ << + "accept: " << endpoint.address(); callClose_ = true; @@ -113,57 +121,42 @@ Peer::accept () when_str_ = beast::Time::getCurrentTime().formatted ( "%Y-%b-%d %H:%M:%S").toStdString(); - if (journal_.trace) journal_.trace << - id_ << " onAccept"; - server_.handler().onAccept (session()); // Handler might have closed - if (closed_) + if (state_ == State::closed) { // VFALCO TODO Is this the correct way to close the socket? // See what state the socket is in and verify. - cancel(); + //closed(); return; } if (socket_->needs_handshake ()) { start_timer(); - socket_->async_handshake (beast::asio::abstract_socket::server, - strand_.wrap (std::bind (&Peer::on_ssl_handshake, shared_from_this(), + strand_.wrap (std::bind (&Peer::on_handshake, shared_from_this(), beast::asio::placeholders::error))); } else { - on_read_request (error_code{}, 0); + async_read(); } } -// Cancel all pending i/o and timers and send tcp shutdown. -void -Peer::cancel () -{ - if (journal_.trace) journal_.trace << - id_ << " cancel"; - - error_code ec; - timer_.cancel (ec); - socket_->cancel (ec); - socket_->shutdown (socket::shutdown_both, ec); -} - // Called by a completion handler when error is not eof or aborted. void -Peer::failed (error_code const& ec) +Peer::fail (error_code ec) { - if (journal_.trace) journal_.trace << - id_ << " failed, " << ec.message(); + assert (ec); + assert (strand_.running_in_this_thread()); + + if (journal_.debug) journal_.debug << id_ << + "fail: " << ec.message(); ec_ = ec; - assert (ec_); - cancel (); + socket_->cancel(ec); } // Start the timer. @@ -172,9 +165,6 @@ Peer::failed (error_code const& ec) void Peer::start_timer() { - if (journal_.trace) journal_.trace << - id_ << " start_timer"; - timer_.expires_from_now ( boost::posix_time::seconds ( timeoutSeconds)); @@ -184,29 +174,35 @@ Peer::start_timer() beast::asio::placeholders::error))); } -// Send a shared buffer void -Peer::async_write (SharedBuffer const& buf) +Peer::async_write () { - assert (buf.get().size() > 0); - - ++pending_writes_; - - if (journal_.trace) journal_.trace << - id_ << " async_write, pending_writes = " << pending_writes_; + void const* data; + std::size_t bytes; + { + std::lock_guard lock (mutex_); + buffer& b = write_queue_.front(); + data = b.data.get() + b.used; + bytes = b.bytes - b.used; + } start_timer(); + boost::asio::async_write (*socket_, boost::asio::buffer (data, bytes), + boost::asio::transfer_at_least (1), strand_.wrap (std::bind ( + &Peer::on_write, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} - // Send the copy. We pass the SharedBuffer in the last parameter - // so that a reference is maintained as the handler gets copied. - // When the final completion function returns, the reference - // count will drop to zero and the buffer will be freed. - // - boost::asio::async_write (*socket_, - boost::asio::const_buffers_1 (&(*buf)[0], buf->size()), - strand_.wrap (std::bind (&Peer::on_write_response, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred, buf))); +void +Peer::async_read() +{ + start_timer(); + boost::asio::async_read (*socket_, read_buf_.prepare (bufferSize), + boost::asio::transfer_at_least (1), strand_.wrap (std::bind ( + &Peer::on_read, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); } //------------------------------------------------------------------------------ @@ -215,154 +211,310 @@ Peer::async_write (SharedBuffer const& buf) void Peer::on_timer (error_code ec) { - if (ec == boost::asio::error::operation_aborted) + if (state_ == State::closed) + { + if (journal_.trace) journal_.trace << id_ << + "timer: closed"; return; + } + + if (ec == boost::asio::error::operation_aborted) + { + // Disable this otherwise we log needlessly + // on every new read and write. + /* + if (journal_.trace) journal_.trace << id_ << + "timer: aborted"; + */ + return; + } if (! ec) ec = boost::system::errc::make_error_code ( boost::system::errc::timed_out); - failed (ec); + if (journal_.debug) journal_.debug << id_ << + "timer: " << ec.message(); + fail (ec); } // Called when the handshake completes void -Peer::on_ssl_handshake (error_code ec) +Peer::on_handshake (error_code ec) { - if (journal_.trace) journal_.trace << - id_ << " on_ssl_handshake, " << ec.message(); - - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) { - failed (ec); + error_code ec; + timer_.cancel(ec); + } + + bool const ssl = socket_->ssl_handle() != nullptr; + + if (state_ == State::closed) + { + if (journal_.trace) journal_.trace << id_ << + "handshake: closed"; return; } - on_read_request (error_code{}, 0); + if (ec == boost::asio::error::operation_aborted) + { + if (journal_.trace) journal_.trace << id_ << + "handshake: aborted"; + return; + } + + if (ec) + { + if (journal_.debug) journal_.debug << id_ << + "handshake: " << ec.message(); + return fail (ec); + } + + if (journal_.trace) journal_.trace << id_ << + "handshake" << (ssl ? ": ssl" : ""); + + async_read(); } // Called repeatedly with the http request data void -Peer::on_read_request (error_code ec, std::size_t bytes_transferred) +Peer::on_read (error_code ec, std::size_t bytes_transferred) { - if (journal_.trace) + // This needs to happen before the call to onRequest + // otherwise we could time out if the Handler takes too long. { - if (! ec_) - journal_.trace << - id_ << " on_read_request, " << - bytes_transferred << " bytes"; - else - journal_.trace << - id_ << " on_read_request failed, " << - ec.message(); + error_code ec; + timer_.cancel(ec); + } + + if (state_ == State::closed) + { + if (journal_.trace) journal_.trace << id_ << + "read: closed"; + return; } if (ec == boost::asio::error::operation_aborted) + { + if (journal_.trace) journal_.trace << id_ << + "read: aborted"; return; + } + + if (beast::asio::is_short_read (ec)) + { + if (journal_.trace) journal_.trace << id_ << + "read: " << ec.message(); + return fail (ec); + } + + if (ec && ec != boost::asio::error::eof) + { + if (journal_.debug) journal_.debug << id_ << + "read: " << ec.message(); + return fail (ec); + } bool const eof = ec == boost::asio::error::eof; - if (eof) - ec = error_code{}; - - if (! ec) + if (! eof) { - bytes_in_ += bytes_transferred; + if (journal_.trace) journal_.trace << id_ << + "read: " << bytes_transferred << " bytes"; + } + else + { + // End of stream reached: + // http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/overview/core/streams.html + if (bytes_transferred != 0) + if (journal_.error) journal_.error << id_ << + "read: eof (" << bytes_transferred << " bytes)"; + if (journal_.debug) journal_.debug << id_ << + "read: eof"; + ec = error_code{}; + } - read_buf_.commit (bytes_transferred); + bytes_in_ += bytes_transferred; + read_buf_.commit (bytes_transferred); - std::pair result; - - if (! eof) - { - result = parser_.write (read_buf_.data()); - if (result.first) - read_buf_.consume (result.second); - else - ec = parser_.error(); - } + std::pair result; + if (! eof) + { + result = parser_.write (read_buf_.data()); + if (result.first) + read_buf_.consume (result.second); else - { - result.first = parser_.write_eof(); - if (! result.first) - ec = parser_.error(); - } + ec = parser_.error(); + } + else + { + result.first = parser_.write_eof(); + if (! result.first) + ec = parser_.error(); + } + + // VFALCO TODO Currently parsing errors are treated the + // same as the connection dropping. Instead, we + // should request that the handler compose a proper HTTP error + // response. This requires refactoring HTTPReply() into + // something sensible. + // + if (! ec && parser_.complete()) + { + // Perform half-close when Connection: close and not SSL + if (! message_.keep_alive() && + ! socket_->needs_handshake()) + socket_->shutdown (socket::shutdown_receive, ec); if (! ec) { - if (parser_.complete()) - { - // ??? - //if (! socket_->needs_handshake()) - // socket_->shutdown (socket::shutdown_receive, ec); - - if (journal_.trace) journal_.trace << - id_ << " onRequest"; - - ++request_count_; - - server_.handler().onRequest (session()); - return; - } + ++request_count_; + server_.handler().onRequest (session()); + return; } } if (ec) { - failed (ec); - return; + if (journal_.debug) journal_.debug << id_ << + "read: " << ec.message(); + return fail (ec); } - start_timer(); - - socket_->async_read_some (read_buf_.prepare (bufferSize), strand_.wrap ( - std::bind (&Peer::on_read_request, shared_from_this(), - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + if (! eof) + async_read(); } // Called when async_write completes. void -Peer::on_write_response (error_code ec, std::size_t bytes_transferred, - SharedBuffer const& buf) +Peer::on_write (error_code ec, std::size_t bytes_transferred) { - timer_.cancel (ec); - - if (journal_.trace) { - if (! ec_) - journal_.trace << - id_ << " on_write_response, " << - bytes_transferred << " bytes"; - else - journal_.trace << - id_ << " on_write_response failed, " << - ec.message(); + error_code ec; + timer_.cancel (ec); + } + + if (state_ == State::closed) + { + if (journal_.trace) journal_.trace << id_ << + "write: closed"; + return; } if (ec == boost::asio::error::operation_aborted) - return; - - if (ec != 0) { - failed (ec); + if (journal_.trace) journal_.trace << id_ << + "write: aborted"; return; } + if (ec) + { + if (journal_.debug) journal_.debug << id_ << + "write: " << ec.message(); + return fail (ec); + } + + if (bytes_transferred == 0) + if (journal_.error) journal_.error << id_ << + "write: 0 bytes"; + + if (journal_.trace) journal_.trace << id_ << + "write: " << bytes_transferred << " bytes"; + bytes_out_ += bytes_transferred; - assert (pending_writes_ > 0); - if (--pending_writes_ > 0) - return; - - if (closed_) + bool empty; { - socket_->shutdown (socket::shutdown_send, ec); + std::lock_guard lock (mutex_); + buffer& b = write_queue_.front(); + b.used += bytes_transferred; + if (b.used == b.bytes) + { + write_queue_.pop_front(); + empty = write_queue_.empty(); + } + else + { + assert (b.used < b.bytes); + empty = false; + } + } + + if (! empty) + return async_write(); + + if (! complete_) return; + // Handler is done writing, did we graceful close? + if (state_ == State::flush) + { + if (socket_->needs_handshake()) + { + // ssl::stream + start_timer(); + socket_->async_shutdown (strand_.wrap (std::bind ( + &Peer::on_shutdown, shared_from_this(), + beast::asio::placeholders::error))); + return; + } + + { + error_code ec; + socket_->shutdown (MultiSocket::shutdown_send, ec); + } + return; } + + // keep-alive + complete_ = false; + async_read(); +} + +// Called when async_shutdown completes +void +Peer::on_shutdown (error_code ec) +{ + { + error_code ec; + timer_.cancel (ec); + } + + if (ec == boost::asio::error::operation_aborted) + { + // We canceled i/o on the socket, or we called async_shutdown + // and then closed the socket before getting the completion. + if (journal_.trace) journal_.trace << id_ << + "shutdown: aborted"; + return; + } + + if (ec == boost::asio::error::eof) + { + // Happens when ssl::stream::async_shutdown completes without error + if (journal_.trace) journal_.trace << id_ << + "shutdown: eof"; + return; + } + + if ((ec.category() == boost::asio::error::get_ssl_category()) + && (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ)) + { + // Remote peer failed to send SSL close_notify message. + if (journal_.trace) journal_.trace << id_ << + "shutdown: missing close_notify"; + return; + } + + if (ec) + { + if (journal_.debug) journal_.debug << id_ << + "shutdown: " << ec.message(); + return fail (ec); + } + + if (journal_.trace) journal_.trace << id_ << + "shutdown"; } //------------------------------------------------------------------------------ @@ -371,33 +523,28 @@ Peer::on_write_response (error_code ec, std::size_t bytes_transferred, void Peer::write (void const* buffer, std::size_t bytes) { - server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::async_write, shared_from_this(), - SharedBuffer (static_cast (buffer), bytes)))); -} + if (bytes == 0) + return; -// Called to indicate the response has been written (but not sent) -void -Peer::complete() -{ - if (! strand_.running_in_this_thread()) - return server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::complete, shared_from_this()))); + bool empty; + { + std::lock_guard lock (mutex_); + empty = write_queue_.empty(); + write_queue_.emplace_back (buffer, bytes); + } - message_ = beast::http::message{}; - parser_ = beast::http::parser{message_, true}; - - on_read_request (error_code{}, 0); + if (empty) + server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::async_write, shared_from_this()))); } // Make the Session asynchronous void Peer::detach () { - if (detached_.exchange (1) == 0) + if (! detach_ref_) { assert (! work_); - assert (detach_ref_ == nullptr); // Maintain an additional reference while detached detach_ref_ = shared_from_this(); @@ -411,27 +558,77 @@ Peer::detach () } } +// Called to indicate the response has been written (but not sent) +void +Peer::complete() +{ + if (! strand_.running_in_this_thread()) + return server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::complete, shared_from_this()))); + + // Reattach + detach_ref_.reset(); + work_ = boost::none; + + message_ = beast::http::message{}; + parser_ = beast::http::parser{message_, true}; + complete_ = true; + + bool empty; + { + std::lock_guard lock (mutex_); + empty = write_queue_.empty(); + } + + if (empty) + { + // keep-alive + complete_ = false; + async_read(); + } +} + // Called from the Handler to close the session. void Peer::close (bool graceful) { - closed_ = true; - if (! strand_.running_in_this_thread()) return server_.get_io_service().dispatch (strand_.wrap ( std::bind (&Peer::close, shared_from_this(), graceful))); - if (! graceful || pending_writes_ == 0) - { - // We should cancel pending i/o + // Reattach + detach_ref_.reset(); + work_ = boost::none; - if (pending_writes_ == 0) + assert (state_ == State::open); + state_ = graceful ? State::flush : State::closed; + + complete_ = true; + + if (graceful) + { + bool empty; { + std::lock_guard lock (mutex_); + empty = write_queue_.empty(); + } + + if (! empty) + return; + + if (socket_->needs_handshake()) + { + start_timer(); + socket_->async_shutdown (strand_.wrap (std::bind ( + &Peer::on_shutdown, shared_from_this(), + beast::asio::placeholders::error))); + return; } } - // Release our additional reference - detach_ref_.reset(); + error_code ec; + timer_.cancel (ec); + socket_->close (ec); } } diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 53559db9b..030428901 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -33,13 +33,19 @@ #include #include #include +#include +#include #include #include +#include #include +#include namespace ripple { namespace HTTP { +//------------------------------------------------------------------------------ + // Holds the copy of buffers being sent // VFALCO TODO Replace with std::shared_ptr // @@ -54,50 +60,79 @@ class Peer { private: typedef std::chrono::system_clock clock_type; + typedef MultiSocket socket_type; enum { // Size of our receive buffer - bufferSize = 2048, + bufferSize = 4 * 1024, // Largest HTTP request allowed maxRequestBytes = 32 * 1024, // Max seconds without completing a message timeoutSeconds = 30 - //timeoutSeconds = 3 }; + enum class State + { + open, + flush, + closed + }; + + struct buffer + { + buffer (void const* ptr, std::size_t len) + : data (new char[len]) + , bytes (len) + , used (0) + { + memcpy (data.get(), ptr, len); + } + + std::unique_ptr data; + std::size_t bytes; + std::size_t used; + }; + beast::Journal journal_; ServerImpl& server_; std::string id_; + std::size_t nid_; boost::asio::io_service::strand strand_; boost::asio::deadline_timer timer_; - std::unique_ptr socket_; + std::unique_ptr socket_; boost::asio::streambuf read_buf_; beast::http::message message_; beast::http::parser parser_; - int pending_writes_; - bool closed_; - bool finished_; - bool callClose_; + std::list write_queue_; + std::mutex mutex_; + State state_ = State::open; + bool complete_ = false; + bool callClose_ = false; std::shared_ptr detach_ref_; boost::optional work_; boost::system::error_code ec_; - std::atomic detached_; clock_type::time_point when_; std::string when_str_; - int request_count_; - std::size_t bytes_in_; - std::size_t bytes_out_; + int request_count_ = 0; + std::size_t bytes_in_ = 0; + std::size_t bytes_out_ = 0; //-------------------------------------------------------------------------- + static std::atomic s_count_; + public: + static + std::size_t + count(); + Peer (ServerImpl& impl, Port const& port, beast::Journal journal); ~Peer (); @@ -114,20 +149,20 @@ public: } void - accept(); + accept (boost::asio::ip::tcp::endpoint endpoint); private: void - cancel(); - - void - failed (error_code const& ec); + fail (error_code ec); void start_timer(); void - async_write (SharedBuffer const& buf); + async_write(); + + void + async_read(); //-------------------------------------------------------------------------- // @@ -138,17 +173,19 @@ private: on_timer (error_code ec); void - on_ssl_handshake (error_code ec); + on_handshake (error_code ec); void - on_read_request (error_code ec, std::size_t bytes_transferred); + on_read (error_code ec, std::size_t bytes_transferred); void - on_write_response (error_code ec, std::size_t bytes_transferred, - SharedBuffer const& buf); + on_write (error_code ec, std::size_t bytes_transferred); void - on_close (); + on_shutdown (error_code ec); + + void + on_close (error_code ec); //-------------------------------------------------------------------------- // @@ -177,10 +214,10 @@ private: write (void const* buffer, std::size_t bytes) override; void - complete() override; + detach() override; void - detach() override; + complete() override; void close (bool graceful) override; diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp index 632a6d2e5..741a7fd3e 100644 --- a/src/ripple/http/impl/ServerImpl.cpp +++ b/src/ripple/http/impl/ServerImpl.cpp @@ -38,6 +38,7 @@ ServerImpl::ServerImpl (Server& server, , m_strand (io_service_) , m_work (boost::in_place (std::ref (io_service_))) , m_stopped (true) + , hist_{} { thread_ = std::thread (std::bind ( &ServerImpl::run, this)); @@ -132,15 +133,44 @@ void ServerImpl::remove (Door& door) { std::lock_guard lock (mutex_); - state_.doors.push_back (door); + state_.doors.erase (state_.doors.iterator_to (door)); } //-------------------------------------------------------------------------- +int +ServerImpl::ceil_log2 (unsigned long long x) +{ + static const unsigned long long t[6] = { + 0xFFFFFFFF00000000ull, + 0x00000000FFFF0000ull, + 0x000000000000FF00ull, + 0x00000000000000F0ull, + 0x000000000000000Cull, + 0x0000000000000002ull + }; + + int y = (((x & (x - 1)) == 0) ? 0 : 1); + int j = 32; + int i; + + for(i = 0; i < 6; i++) { + int k = (((x & t[i]) == 0) ? 0 : j); + y += k; + x >>= k; + j >>= 1; + } + + return y; +} + void ServerImpl::report (Stat&& stat) { + int const bucket = ceil_log2 (stat.requests); std::lock_guard lock (mutex_); + ++hist_[bucket]; + high_ = std::max (high_, bucket); if (stats_.size() >= historySize) stats_.pop_back(); stats_.emplace_front (std::move(stat)); @@ -153,12 +183,26 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map) // VFALCO TODO Write the list of doors + map ["active"] = Peer::count(); + { - beast::PropertyStream::Set set ("sessions", map); + std::string s; + for (int i = 0; i <= high_; ++i) + { + if (i) + s += ", "; + s += std::to_string (hist_[i]); + } + map ["hist"] = s; + } + + { + beast::PropertyStream::Set set ("history", map); for (auto const& stat : stats_) { beast::PropertyStream::Map item (set); + item ["id"] = stat.id; item ["when"] = stat.when; { diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h index ecd48da9a..4aede59ad 100644 --- a/src/ripple/http/impl/ServerImpl.h +++ b/src/ripple/http/impl/ServerImpl.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ class Peer; struct Stat { + std::size_t id; std::string when; std::chrono::seconds elapsed; int requests; @@ -88,6 +90,8 @@ private: State state_; Doors m_doors; std::deque stats_; + std::array hist_; + int high_ = 0; public: ServerImpl (Server& server, Handler& handler, beast::Journal journal); @@ -136,6 +140,10 @@ public: onWrite (beast::PropertyStream::Map& map); private: + static + int + ceil_log2 (unsigned long long x); + static int compare (Port const& lhs, Port const& rhs); diff --git a/src/ripple/module/app/main/Application.cpp b/src/ripple/module/app/main/Application.cpp index 257a37a86..dcab531e7 100644 --- a/src/ripple/module/app/main/Application.cpp +++ b/src/ripple/module/app/main/Application.cpp @@ -303,7 +303,7 @@ public: , m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue)) , m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs, - m_logs.journal("HTTPServer"), *m_jobQueue, *m_networkOPs, *m_resourceManager)) + *m_jobQueue, *m_networkOPs, *m_resourceManager)) , m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service diff --git a/src/ripple/module/app/main/RPCHTTPServer.cpp b/src/ripple/module/app/main/RPCHTTPServer.cpp index c70b0b849..f6c0d7ec9 100644 --- a/src/ripple/module/app/main/RPCHTTPServer.cpp +++ b/src/ripple/module/app/main/RPCHTTPServer.cpp @@ -39,16 +39,15 @@ public: HTTP::Server m_server; std::unique_ptr m_context; - RPCHTTPServerImp (Stoppable& parent, - beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs, - Resource::Manager& resourceManager) + RPCHTTPServerImp (Stoppable& parent, JobQueue& jobQueue, + NetworkOPs& networkOPs, Resource::Manager& resourceManager) : RPCHTTPServer (parent) , m_resourceManager (resourceManager) - , m_journal (journal) + , m_journal (deprecatedLogs().journal("HTTP-RPC")) , m_jobQueue (jobQueue) , m_networkOPs (networkOPs) , m_deprecatedHandler (networkOPs, resourceManager) - , m_server (*this, journal) + , m_server (*this, deprecatedLogs().journal("HTTP")) { if (getConfig ().RPC_SECURE == 0) { @@ -307,19 +306,18 @@ public: RPCHTTPServer::RPCHTTPServer (Stoppable& parent) : Stoppable ("RPCHTTPServer", parent) - , Source ("rpc") + , Source ("http") { } //------------------------------------------------------------------------------ std::unique_ptr -make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal, - JobQueue& jobQueue, NetworkOPs& networkOPs, - Resource::Manager& resourceManager) +make_RPCHTTPServer (beast::Stoppable& parent, JobQueue& jobQueue, + NetworkOPs& networkOPs, Resource::Manager& resourceManager) { return std::make_unique ( - parent, journal, jobQueue, networkOPs, resourceManager); + parent, jobQueue, networkOPs, resourceManager); } } diff --git a/src/ripple/module/app/main/RPCHTTPServer.h b/src/ripple/module/app/main/RPCHTTPServer.h index 0a2273676..51e5295f4 100644 --- a/src/ripple/module/app/main/RPCHTTPServer.h +++ b/src/ripple/module/app/main/RPCHTTPServer.h @@ -1,4 +1,4 @@ -//------------------------------------------------------------------------------ + //------------------------------------------------------------------------------ /* This file is part of rippled: https://github.com/ripple/rippled Copyright (c) 2012, 2013 Ripple Labs Inc. @@ -37,17 +37,19 @@ public: virtual ~RPCHTTPServer() = default; - /** Opens listening ports based on the Config settings. */ + /** Opens listening ports based on the Config settings + This is implemented outside the constructor to support + two-stage initialization in the Application object. + */ virtual void setup (beast::Journal journal) = 0; }; std::unique_ptr -make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal, - JobQueue& jobQueue, NetworkOPs& networkOPs, - Resource::Manager& resourceManager); +make_RPCHTTPServer (beast::Stoppable& parent, JobQueue& jobQueue, + NetworkOPs& networkOPs, Resource::Manager& resourceManager); -} +} // ripple #endif