diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 8ce1bba1e5..b95b41a4d6 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -2575,9 +2575,6 @@ - - True - diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 9b1c36d4cd..10d0bed843 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -3666,9 +3666,6 @@ ripple\http\impl - - ripple\http\impl - ripple\http\impl diff --git a/SConstruct b/SConstruct index 60692bcfad..115fde358e 100644 --- a/SConstruct +++ b/SConstruct @@ -290,6 +290,8 @@ def config_env(toolchain, variant, env): ]) boost_libs = [ + 'boost_coroutine', + 'boost_context', 'boost_date_time', 'boost_filesystem', 'boost_program_options', diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h index 10e7f6fc2f..31dc2f589d 100644 --- a/src/ripple/http/Session.h +++ b/src/ripple/http/Session.h @@ -86,7 +86,7 @@ public: The initial value is always zero. Changes to the value are persisted between calls. */ - void* tag; + void* tag = nullptr; /** Returns the Journal to use for logging. */ virtual diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp index 86ecd716bf..f93b63d623 100644 --- a/src/ripple/http/impl/Door.cpp +++ b/src/ripple/http/impl/Door.cpp @@ -20,14 +20,139 @@ #include #include #include +#include +#include + +#include namespace ripple { namespace HTTP { -Door::Door (ServerImpl& impl, Port const& port) - : server_ (impl) - , acceptor_ (server_.get_io_service(), to_asio (port)) +/** Detect SSL client handshakes. + Analyzes the bytes in the provided buffer to detect the SSL client + handshake. If the buffer contains insufficient data, more data will be + read from the stream until there is enough to determine a result. + No bytes are discarded from buf. Any additional bytes read are retained. + buf must provide an interface compatible with boost::asio::streambuf + http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html + See + http://www.ietf.org/rfc/rfc2246.txt + Section 7.4. Handshake protocol + @param socket The stream to read from + @param buf A buffer to hold the received data + @param yield A yield context + @return The error code if an error occurs, otherwise `true` if + the data read indicates the SSL client handshake. +*/ +template +std::pair +detect_ssl (Socket& socket, StreamBuf& buf, Yield yield) +{ + std::pair result; + result.second = false; + for(;;) + { + std::size_t const max = 4; // the most bytes we could need + std::array data; + auto const bytes = boost::asio::buffer_copy ( + boost::asio::buffer(data), buf.data()); + + if (bytes > 0) + { + if (data[0] != 0x16) // message type 0x16 = "SSL Handshake" + break; + } + + if (bytes >= max) + { + result.second = true; + break; + } + + std::size_t const bytes_transferred = boost::asio::async_read (socket, + buf.prepare(max - bytes), boost::asio::transfer_at_least(1), + yield[result.first]); + if (result.first) + break; + buf.commit (bytes_transferred); + } + return result; +} + +//------------------------------------------------------------------------------ + +Door::connection::connection (Door& door, socket_type&& socket, + endpoint_type endpoint) + : door_ (door) + , socket_ (std::move(socket)) + , endpoint_ (endpoint) + , strand_ (door.io_service_) + , timer_ (door.io_service_) +{ +} + +// Work-around because we can't call shared_from_this in ctor +void +Door::connection::run() +{ + boost::asio::spawn (strand_, std::bind (&connection::do_detect, + shared_from_this(), std::placeholders::_1)); + + boost::asio::spawn (strand_, std::bind (&connection::do_timer, + shared_from_this(), std::placeholders::_1)); +} + +void +Door::connection::do_timer (yield_context yield) +{ + error_code ec; // ignored + while (socket_.is_open()) + { + timer_.async_wait (yield[ec]); + if (timer_.expires_from_now() <= std::chrono::seconds(0)) + socket_.close(); + } +} + +void +Door::connection::do_detect (boost::asio::yield_context yield) +{ + bool ssl; + error_code ec; + boost::asio::streambuf buf; + timer_.expires_from_now (std::chrono::seconds(15)); + std::tie(ec, ssl) = detect_ssl (socket_, buf, yield); + if (! ec) + { + if (ssl) + { + auto const peer = std::make_shared (door_.server_, + door_.port_, door_.server_.journal(), endpoint_, + buf.data(), std::move(socket_)); + peer->accept(); + return; + } + + auto const peer = std::make_shared (door_.server_, + door_.port_, door_.server_.journal(), endpoint_, + buf.data(), std::move(socket_)); + peer->accept(); + return; + } + + socket_.close(); + timer_.cancel(); +} + +//------------------------------------------------------------------------------ + +Door::Door (boost::asio::io_service& io_service, + ServerImpl& impl, Port const& port) + : io_service_ (io_service) + , timer_ (io_service) + , acceptor_ (io_service, to_asio (port)) , port_ (port) + , server_ (impl) { server_.add (*this); @@ -44,8 +169,6 @@ Door::Door (ServerImpl& impl, Port const& port) { server_.journal().info << "Bound to endpoint " << to_string (acceptor_.local_endpoint()); - - async_accept(); } else { @@ -60,10 +183,11 @@ Door::~Door () server_.remove (*this); } -Port const& -Door::port () const +void +Door::listen() { - return port_; + boost::asio::spawn (io_service_, std::bind (&Door::do_accept, + shared_from_this(), std::placeholders::_1)); } void @@ -72,37 +196,46 @@ Door::cancel () acceptor_.cancel(); } -void -Door::failed (error_code ec) -{ -} +//------------------------------------------------------------------------------ void -Door::async_accept () +Door::do_accept (boost::asio::yield_context yield) { - auto const peer (std::make_shared (server_, port_, server_.journal())); - acceptor_.async_accept (peer->get_socket(), endpoint_, std::bind ( - &Door::handle_accept, Ptr(this), - beast::asio::placeholders::error, peer)); -} - -void -Door::handle_accept (error_code ec, - std::shared_ptr const& peer) -{ - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec) + for(;;) { - server_.journal().error << - "accept: " << ec.message(); - return; + error_code ec; + endpoint_type endpoint; + socket_type socket (io_service_); + acceptor_.async_accept (socket, endpoint, yield[ec]); + if (ec) + { + if (ec != boost::asio::error::operation_aborted) + server_.journal().error << + "accept: " << ec.message(); + break; + } + + if (port_.security == Port::Security::no_ssl) + { + auto const peer = std::make_shared (server_, + port_, server_.journal(), endpoint, + boost::asio::null_buffers(), std::move(socket)); + peer->accept(); + } + else if (port_.security == Port::Security::require_ssl) + { + auto const peer = std::make_shared (server_, + port_, server_.journal(), endpoint, + boost::asio::null_buffers(), std::move(socket)); + peer->accept(); + } + else + { + auto const c = std::make_shared ( + *this, std::move(socket), endpoint); + c->run(); + } } - - auto const endpoint = endpoint_; - async_accept(); - peer->accept (endpoint); } } diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h index 0950ea8847..96d4ef23fc 100644 --- a/src/ripple/http/impl/Door.h +++ b/src/ripple/http/impl/Door.h @@ -22,46 +22,79 @@ #include #include -#include +#include +#include +#include +#include +#include +#include namespace ripple { namespace HTTP { /** A listening socket. */ class Door - : public beast::SharedObject - , public beast::List ::Node - , public beast::LeakChecked + : public beast::List ::Node + , public std::enable_shared_from_this { private: - // VFALCO TODO Use shared_ptr - typedef beast::SharedPtr Ptr; + using clock_type = std::chrono::steady_clock; + using timer_type = boost::asio::basic_waitable_timer; + using error_code = boost::system::error_code; + using yield_context = boost::asio::yield_context; + using protocol_type = boost::asio::ip::tcp; + using acceptor_type = protocol_type::acceptor; + using endpoint_type = protocol_type::endpoint; + using socket_type = protocol_type::socket; - ServerImpl& server_; - acceptor acceptor_; - endpoint_t endpoint_; + boost::asio::io_service& io_service_; + boost::asio::basic_waitable_timer timer_; + acceptor_type acceptor_; Port port_; + ServerImpl& server_; public: - Door (ServerImpl& impl, Port const& port); + Door (boost::asio::io_service& io_service, + ServerImpl& impl, Port const& port); ~Door (); Port const& - port () const; + port() const + { + return port_; + } - void - cancel (); + void listen(); + void cancel(); - void - failed (error_code ec); +private: + class connection + : public std::enable_shared_from_this + { + private: + Door& door_; + socket_type socket_; + endpoint_type endpoint_; + boost::asio::io_service::strand strand_; + timer_type timer_; - void - async_accept (); + public: + connection (Door& door, socket_type&& socket, + endpoint_type endpoint); - void - handle_accept (error_code ec, - std::shared_ptr const& peer); + void + run(); + + private: + void + do_timer (yield_context yield); + + void + do_detect (yield_context yield); + }; + + void do_accept (yield_context yield); }; } diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp deleted file mode 100644 index b6ee04222d..0000000000 --- a/src/ripple/http/impl/Peer.cpp +++ /dev/null @@ -1,637 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include -#include - -namespace ripple { -namespace HTTP { - -/* -Reference: -http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error -*/ - -std::atomic Peer::s_count_; - -std::size_t -Peer::count() -{ - return s_count_.load(); -} - -Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal) - : journal_ (journal) - , server_ (server) - , strand_ (server_.get_io_service()) - , timer_ (server_.get_io_service()) - , parser_ (message_, true) -{ - ++s_count_; - - static std::atomic sid; - nid_ = ++sid; - id_ = std::string("#") + std::to_string(nid_) + " "; - - tag = nullptr; - - int flags = MultiSocket::Flag::server_role; - - switch (port.security) - { - case Port::Security::no_ssl: - break; - - case Port::Security::allow_ssl: - flags |= MultiSocket::Flag::ssl; - break; - - case Port::Security::require_ssl: - flags |= MultiSocket::Flag::ssl_required; - break; - } - - socket_.reset (MultiSocket::New ( - server_.get_io_service(), port.context->get(), flags)); - - server_.add (*this); - - if (journal_.trace) journal_.trace << id_ << - "created"; -} - -Peer::~Peer () -{ - if (callClose_) - { - Stat stat; - stat.id = nid_; - stat.when = std::move (when_str_); - stat.elapsed = std::chrono::duration_cast < - std::chrono::seconds> (clock_type::now() - when_); - stat.requests = request_count_; - stat.bytes_in = bytes_in_; - stat.bytes_out = bytes_out_; - stat.ec = std::move (ec_); - server_.report (std::move (stat)); - - server_.handler().onClose (session(), ec_); - } - - server_.remove (*this); - - if (journal_.trace) journal_.trace << id_ << - "destroyed: " << request_count_ << - ((request_count_ == 1) ? " request" : " requests"); - - --s_count_; -} - -//------------------------------------------------------------------------------ - -// Called when the acceptor accepts our socket. -void -Peer::accept (boost::asio::ip::tcp::endpoint endpoint) -{ - if (! strand_.running_in_this_thread()) - return server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::accept, shared_from_this(), endpoint))); - - if (journal_.trace) journal_.trace << id_ << - "accept: " << endpoint.address(); - - callClose_ = true; - - when_ = clock_type::now(); - when_str_ = beast::Time::getCurrentTime().formatted ( - "%Y-%b-%d %H:%M:%S").toStdString(); - - server_.handler().onAccept (session()); - - // Handler might have closed - if (state_ == State::closed) - { - // VFALCO TODO Is this the correct way to close the socket? - // See what state the socket is in and verify. - //closed(); - return; - } - - if (socket_->needs_handshake ()) - { - start_timer(); - socket_->async_handshake (beast::asio::abstract_socket::server, - strand_.wrap (std::bind (&Peer::on_handshake, shared_from_this(), - beast::asio::placeholders::error))); - } - else - { - async_read(); - } -} - -// Called by a completion handler when error is not eof or aborted. -void -Peer::fail (error_code ec) -{ - assert (ec); - assert (strand_.running_in_this_thread()); - - if (journal_.debug) journal_.debug << id_ << - "fail: " << ec.message(); - - ec_ = ec; - socket_->cancel(ec); -} - -// Start the timer. -// If the timer expires, the session is considered -// timed out and will be forcefully closed. -void -Peer::start_timer() -{ - timer_.expires_from_now ( - boost::posix_time::seconds ( - timeoutSeconds)); - - timer_.async_wait (strand_.wrap (std::bind ( - &Peer::on_timer, shared_from_this(), - beast::asio::placeholders::error))); -} - -void -Peer::async_write () -{ - void const* data; - std::size_t bytes; - { - std::lock_guard lock (mutex_); - buffer& b = write_queue_.front(); - data = b.data.get() + b.used; - bytes = b.bytes - b.used; - } - - start_timer(); - boost::asio::async_write (*socket_, boost::asio::buffer (data, bytes), - boost::asio::transfer_at_least (1), strand_.wrap (std::bind ( - &Peer::on_write, shared_from_this(), - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} - -void -Peer::async_read() -{ - start_timer(); - boost::asio::async_read (*socket_, read_buf_.prepare (bufferSize), - boost::asio::transfer_at_least (1), strand_.wrap (std::bind ( - &Peer::on_read, shared_from_this(), - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} - -//------------------------------------------------------------------------------ - -// Called when session times out -void -Peer::on_timer (error_code ec) -{ - if (state_ == State::closed) - { - if (journal_.trace) journal_.trace << id_ << - "timer: closed"; - return; - } - - if (ec == boost::asio::error::operation_aborted) - { - // Disable this otherwise we log needlessly - // on every new read and write. - /* - if (journal_.trace) journal_.trace << id_ << - "timer: aborted"; - */ - return; - } - - if (! ec) - ec = boost::system::errc::make_error_code ( - boost::system::errc::timed_out); - - if (journal_.debug) journal_.debug << id_ << - "timer: " << ec.message(); - fail (ec); -} - -// Called when the handshake completes -void -Peer::on_handshake (error_code ec) -{ - { - error_code ec; - timer_.cancel(ec); - } - - bool const ssl = socket_->ssl_handle() != nullptr; - - if (state_ == State::closed) - { - if (journal_.trace) journal_.trace << id_ << - "handshake: closed"; - return; - } - - if (ec == boost::asio::error::operation_aborted) - { - if (journal_.trace) journal_.trace << id_ << - "handshake: aborted"; - return; - } - - if (ec) - { - if (journal_.debug) journal_.debug << id_ << - "handshake: " << ec.message(); - return fail (ec); - } - - if (journal_.trace) journal_.trace << id_ << - "handshake" << (ssl ? ": ssl" : ""); - - async_read(); -} - -// Called repeatedly with the http request data -void -Peer::on_read (error_code ec, std::size_t bytes_transferred) -{ - // This needs to happen before the call to onRequest - // otherwise we could time out if the Handler takes too long. - { - error_code ec; - timer_.cancel(ec); - } - - if (state_ == State::closed) - { - if (journal_.trace) journal_.trace << id_ << - "read: closed"; - return; - } - - if (ec == boost::asio::error::operation_aborted) - { - if (journal_.trace) journal_.trace << id_ << - "read: aborted"; - return; - } - - if (beast::asio::is_short_read (ec)) - { - if (journal_.trace) journal_.trace << id_ << - "read: " << ec.message(); - return fail (ec); - } - - if (ec && ec != boost::asio::error::eof) - { - if (journal_.debug) journal_.debug << id_ << - "read: " << ec.message(); - return fail (ec); - } - - bool const eof = ec == boost::asio::error::eof; - if (! eof) - { - if (journal_.trace) journal_.trace << id_ << - "read: " << bytes_transferred << " bytes"; - } - else - { - // End of stream reached: - // http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/overview/core/streams.html - if (bytes_transferred != 0) - if (journal_.error) journal_.error << id_ << - "read: eof (" << bytes_transferred << " bytes)"; - if (journal_.debug) journal_.debug << id_ << - "read: eof"; - ec = error_code{}; - } - - bytes_in_ += bytes_transferred; - read_buf_.commit (bytes_transferred); - - std::pair result; - if (! eof) - { - result = parser_.write (read_buf_.data()); - if (result.first) - read_buf_.consume (result.second); - else - ec = parser_.error(); - } - else - { - result.first = parser_.write_eof(); - if (! result.first) - ec = parser_.error(); - } - - // VFALCO TODO Currently parsing errors are treated the - // same as the connection dropping. Instead, we - // should request that the handler compose a proper HTTP error - // response. This requires refactoring HTTPReply() into - // something sensible. - // - if (! ec && parser_.complete()) - { - // Perform half-close when Connection: close and not SSL - if (! message_.keep_alive() && - ! socket_->needs_handshake()) - socket_->shutdown (socket::shutdown_receive, ec); - - if (! ec) - { - ++request_count_; - server_.handler().onRequest (session()); - return; - } - } - - if (ec) - { - if (journal_.debug) journal_.debug << id_ << - "read: " << ec.message(); - return fail (ec); - } - - if (! eof) - async_read(); -} - -// Called when async_write completes. -void -Peer::on_write (error_code ec, std::size_t bytes_transferred) -{ - { - error_code ec; - timer_.cancel (ec); - } - - if (state_ == State::closed) - { - if (journal_.trace) journal_.trace << id_ << - "write: closed"; - return; - } - - if (ec == boost::asio::error::operation_aborted) - { - if (journal_.trace) journal_.trace << id_ << - "write: aborted"; - return; - } - - if (ec) - { - if (journal_.debug) journal_.debug << id_ << - "write: " << ec.message(); - return fail (ec); - } - - if (bytes_transferred == 0) - if (journal_.error) journal_.error << id_ << - "write: 0 bytes"; - - if (journal_.trace) journal_.trace << id_ << - "write: " << bytes_transferred << " bytes"; - - bytes_out_ += bytes_transferred; - - bool empty; - { - std::lock_guard lock (mutex_); - buffer& b = write_queue_.front(); - b.used += bytes_transferred; - if (b.used == b.bytes) - { - write_queue_.pop_front(); - empty = write_queue_.empty(); - } - else - { - assert (b.used < b.bytes); - empty = false; - } - } - - if (! empty) - return async_write(); - - if (! complete_) - return; - - // Handler is done writing, did we graceful close? - if (state_ == State::flush) - { - if (socket_->needs_handshake()) - { - // ssl::stream - start_timer(); - socket_->async_shutdown (strand_.wrap (std::bind ( - &Peer::on_shutdown, shared_from_this(), - beast::asio::placeholders::error))); - return; - } - - { - error_code ec; - socket_->shutdown (MultiSocket::shutdown_send, ec); - } - return; - } - - // keep-alive - complete_ = false; - async_read(); -} - -// Called when async_shutdown completes -void -Peer::on_shutdown (error_code ec) -{ - { - error_code ec; - timer_.cancel (ec); - } - - if (ec == boost::asio::error::operation_aborted) - { - // We canceled i/o on the socket, or we called async_shutdown - // and then closed the socket before getting the completion. - if (journal_.trace) journal_.trace << id_ << - "shutdown: aborted"; - return; - } - - if (ec == boost::asio::error::eof) - { - // Happens when ssl::stream::async_shutdown completes without error - if (journal_.trace) journal_.trace << id_ << - "shutdown: eof"; - return; - } - - if ((ec.category() == boost::asio::error::get_ssl_category()) - && (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ)) - { - // Remote peer failed to send SSL close_notify message. - if (journal_.trace) journal_.trace << id_ << - "shutdown: missing close_notify"; - return; - } - - if (ec) - { - if (journal_.debug) journal_.debug << id_ << - "shutdown: " << ec.message(); - return fail (ec); - } - - if (journal_.trace) journal_.trace << id_ << - "shutdown"; -} - -//------------------------------------------------------------------------------ - -// Send a copy of the data. -void -Peer::write (void const* buffer, std::size_t bytes) -{ - if (bytes == 0) - return; - - bool empty; - { - std::lock_guard lock (mutex_); - empty = write_queue_.empty(); - write_queue_.emplace_back (buffer, bytes); - } - - if (empty) - server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::async_write, shared_from_this()))); -} - -// Make the Session asynchronous -void -Peer::detach () -{ - if (! detach_ref_) - { - assert (! work_); - - // Maintain an additional reference while detached - detach_ref_ = shared_from_this(); - - // Prevent the io_service from running out of work. - // The work object will be destroyed with the Peer - // after the Session is closed and handlers complete. - // - work_ = boost::in_place (std::ref ( - server_.get_io_service())); - } -} - -// Called to indicate the response has been written (but not sent) -void -Peer::complete() -{ - if (! strand_.running_in_this_thread()) - return server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::complete, shared_from_this()))); - - // Reattach - detach_ref_.reset(); - work_ = boost::none; - - message_ = beast::http::message{}; - parser_ = beast::http::parser{message_, true}; - complete_ = true; - - bool empty; - { - std::lock_guard lock (mutex_); - empty = write_queue_.empty(); - } - - if (empty) - { - // keep-alive - complete_ = false; - async_read(); - } -} - -// Called from the Handler to close the session. -void -Peer::close (bool graceful) -{ - if (! strand_.running_in_this_thread()) - return server_.get_io_service().dispatch (strand_.wrap ( - std::bind (&Peer::close, shared_from_this(), graceful))); - - // Reattach - detach_ref_.reset(); - work_ = boost::none; - - assert (state_ == State::open); - state_ = graceful ? State::flush : State::closed; - - complete_ = true; - - if (graceful) - { - bool empty; - { - std::lock_guard lock (mutex_); - empty = write_queue_.empty(); - } - - if (! empty) - return; - - if (socket_->needs_handshake()) - { - start_timer(); - socket_->async_shutdown (strand_.wrap (std::bind ( - &Peer::on_shutdown, shared_from_this(), - beast::asio::placeholders::error))); - return; - } - } - - error_code ec; - timer_.cancel (ec); - socket_->close (ec); -} - -} -} diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 25da875e32..5a7fa028ad 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -26,56 +26,58 @@ #include #include #include +#include // for is_short_read? #include #include #include -#include #include +#include #include -#include +#include #include +#include #include #include #include #include #include +#include // namespace ripple { namespace HTTP { //------------------------------------------------------------------------------ -/** Represents an active connection. */ -class Peer - : public std::enable_shared_from_this - , public Session - , public beast::List ::Node - , public beast::LeakChecked +class BasicPeer + : public beast::List ::Node { -private: - typedef std::chrono::system_clock clock_type; - typedef MultiSocket socket_type; +public: + virtual ~BasicPeer() = default; +}; + +//------------------------------------------------------------------------------ + +/** Represents an active connection. */ +template +class Peer + : public BasicPeer + , public Session +{ +protected: + using clock_type = std::chrono::system_clock; + using endpoint_type = boost::asio::ip::tcp::endpoint; + using waitable_timer = boost::asio::basic_waitable_timer ; enum { // Size of our receive buffer bufferSize = 4 * 1024, - // Largest HTTP request allowed - maxRequestBytes = 32 * 1024, - // Max seconds without completing a message timeoutSeconds = 30 }; - enum class State - { - open, - flush, - closed - }; - struct buffer { buffer (void const* ptr, std::size_t len) @@ -93,23 +95,21 @@ private: beast::Journal journal_; ServerImpl& server_; + boost::asio::io_service::strand strand_; + waitable_timer timer_; + endpoint_type endpoint_; + std::string id_; std::size_t nid_; - boost::asio::io_service::strand strand_; - boost::asio::deadline_timer timer_; - std::unique_ptr socket_; boost::asio::streambuf read_buf_; beast::http::message message_; - beast::http::parser parser_; std::list write_queue_; std::mutex mutex_; - State state_ = State::open; + bool graceful_ = false; bool complete_ = false; - bool callClose_ = false; std::shared_ptr detach_ref_; boost::optional work_; - boost::system::error_code ec_; clock_type::time_point when_; @@ -120,71 +120,54 @@ private: //-------------------------------------------------------------------------- - static std::atomic s_count_; - public: - static - std::size_t - count(); + template + Peer (ServerImpl& impl, Port const& port, beast::Journal journal, + endpoint_type endpoint, ConstBufferSequence const& buffers); - Peer (ServerImpl& impl, Port const& port, beast::Journal journal); - ~Peer (); - - socket& - get_socket() - { - return socket_->this_layer(); - } + virtual + ~Peer(); Session& - session () + session() { return *this; } - void - accept (boost::asio::ip::tcp::endpoint endpoint); +protected: + Impl& + impl() + { + return *static_cast(this); + } -private: void - fail (error_code ec); + fail (error_code ec, char const* what); void start_timer(); void - async_write(); - - void - async_read(); - - //-------------------------------------------------------------------------- - // - // Completion Handlers - // + cancel_timer(); void on_timer (error_code ec); void - on_handshake (error_code ec); + do_read (boost::asio::yield_context yield); void - on_read (error_code ec, std::size_t bytes_transferred); + do_write (boost::asio::yield_context yield); + virtual void - on_write (error_code ec, std::size_t bytes_transferred); + do_request() = 0; + virtual void - on_shutdown (error_code ec); + do_close() = 0; - void - on_close (error_code ec); - - //-------------------------------------------------------------------------- - // // Session - // beast::Journal journal() override @@ -195,7 +178,7 @@ private: beast::IP::Endpoint remoteAddress() override { - return from_asio (get_socket().remote_endpoint()); + return from_asio (endpoint_); } beast::http::message& @@ -217,6 +200,493 @@ private: close (bool graceful) override; }; +//------------------------------------------------------------------------------ + +class PlainPeer + : public Peer + , public std::enable_shared_from_this +{ +private: + friend class Peer ; + using socket_type = boost::asio::ip::tcp::socket; + socket_type socket_; + +public: + template + PlainPeer (ServerImpl& impl, Port const& port, beast::Journal journal, + endpoint_type endpoint, ConstBufferSequence const& buffers, + socket_type&& socket); + + void + accept(); + +private: + void + do_request(); + + void + do_close(); +}; + +template +PlainPeer::PlainPeer (ServerImpl& server, Port const& port, + beast::Journal journal, endpoint_type endpoint, + ConstBufferSequence const& buffers, + boost::asio::ip::tcp::socket&& socket) + : Peer (server, port, journal, endpoint, buffers) + , socket_(std::move(socket)) +{ +} + +void +PlainPeer::accept () +{ + server_.handler().onAccept (session()); + if (! socket_.is_open()) + return; + + boost::asio::spawn (strand_, std::bind (&PlainPeer::do_read, + shared_from_this(), std::placeholders::_1)); +} + +void +PlainPeer::do_request() +{ + // Perform half-close when Connection: close and not SSL + error_code ec; + if (! message_.keep_alive()) + socket_.shutdown (socket_type::shutdown_receive, ec); + + if (! ec) + { + ++request_count_; + server_.handler().onRequest (session()); + return; + } + + if (ec) + fail (ec, "request"); +} + +void +PlainPeer::do_close() +{ + error_code ec; + socket_.shutdown (socket_type::shutdown_send, ec); +} + +//------------------------------------------------------------------------------ + +class SSLPeer + : public Peer + , public std::enable_shared_from_this +{ +private: + friend class Peer ; + using next_layer_type = boost::asio::ip::tcp::socket; + using socket_type = boost::asio::ssl::stream ; + next_layer_type next_layer_; + socket_type socket_; + +public: + template + SSLPeer (ServerImpl& impl, Port const& port, beast::Journal journal, + endpoint_type endpoint, ConstBufferSequence const& buffers, + next_layer_type&& socket); + + void + accept(); + +private: + void + do_handshake (boost::asio::yield_context yield); + + void + do_request(); + + void + do_close(); + + void + on_shutdown (error_code ec); +}; + +template +SSLPeer::SSLPeer (ServerImpl& server, Port const& port, + beast::Journal journal, endpoint_type endpoint, + ConstBufferSequence const& buffers, + boost::asio::ip::tcp::socket&& socket) + : Peer (server, port, journal, endpoint, buffers) + , next_layer_ (std::move(socket)) + , socket_ (next_layer_, port.context->get()) +{ +} + +// Called when the acceptor accepts our socket. +void +SSLPeer::accept () +{ + server_.handler().onAccept (session()); + if (! next_layer_.is_open()) + return; + + boost::asio::spawn (strand_, std::bind (&SSLPeer::do_handshake, + shared_from_this(), std::placeholders::_1)); +} + +void +SSLPeer::do_handshake (boost::asio::yield_context yield) +{ + error_code ec; + std::size_t const bytes_transferred = socket_.async_handshake ( + socket_type::server, read_buf_.data(), yield[ec]); + if (ec) + return fail (ec, "handshake"); + boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read, + shared_from_this(), std::placeholders::_1)); +} + +void +SSLPeer::do_request() +{ + ++request_count_; + server_.handler().onRequest (session()); +} + +void +SSLPeer::do_close() +{ + error_code ec; + socket_.async_shutdown (strand_.wrap (std::bind ( + &SSLPeer::on_shutdown, shared_from_this(), + std::placeholders::_1))); +} + +void +SSLPeer::on_shutdown (error_code ec) +{ + socket_.next_layer().close(ec); +} + +//------------------------------------------------------------------------------ + +template +template +Peer::Peer (ServerImpl& server, Port const& port, + beast::Journal journal, endpoint_type endpoint, + ConstBufferSequence const& buffers) + : journal_ (journal) + , server_ (server) + , strand_ (server_.get_io_service()) + , timer_ (server_.get_io_service()) + , endpoint_ (endpoint) +{ + read_buf_.commit (boost::asio::buffer_copy (read_buf_.prepare ( + boost::asio::buffer_size (buffers)), buffers)); + + static std::atomic sid; + nid_ = ++sid; + id_ = std::string("#") + std::to_string(nid_) + " "; + + server_.add (*this); + + if (journal_.trace) journal_.trace << id_ << + "accept: " << endpoint.address(); + + when_ = clock_type::now(); + when_str_ = beast::Time::getCurrentTime().formatted ( + "%Y-%b-%d %H:%M:%S").toStdString(); +} + +template +Peer::~Peer() +{ + Stat stat; + stat.id = nid_; + stat.when = std::move (when_str_); + stat.elapsed = std::chrono::duration_cast < + std::chrono::seconds> (clock_type::now() - when_); + stat.requests = request_count_; + stat.bytes_in = bytes_in_; + stat.bytes_out = bytes_out_; + stat.ec = std::move (ec_); + server_.report (std::move (stat)); + + server_.handler().onClose (session(), ec_); + + server_.remove (*this); + + if (journal_.trace) journal_.trace << id_ << + "destroyed: " << request_count_ << + ((request_count_ == 1) ? " request" : " requests"); +} + +//------------------------------------------------------------------------------ + +template +void +Peer::fail (error_code ec, char const* what) +{ + if (! ec_ && ec != boost::asio::error::operation_aborted) + { + ec_ = ec; + if (journal_.trace) journal_.trace << id_ << + std::string(what) << ": " << ec.message(); + impl().socket_.lowest_layer().close (ec); + } +} + +template +void +Peer::start_timer() +{ + error_code ec; + timer_.expires_from_now (std::chrono::seconds(timeoutSeconds), ec); + if (ec) + return fail (ec, "start_timer"); + timer_.async_wait (strand_.wrap (std::bind ( + &Peer::on_timer, impl().shared_from_this(), + beast::asio::placeholders::error))); +} + +// Convenience for discarding the error code +template +void +Peer::cancel_timer() +{ + error_code ec; + timer_.cancel(ec); +} + +// Called when session times out +template +void +Peer::on_timer (error_code ec) +{ + if (ec == boost::asio::error::operation_aborted) + return; + if (! ec) + ec = boost::system::errc::make_error_code ( + boost::system::errc::timed_out); + fail (ec, "timer"); +} + +//------------------------------------------------------------------------------ + +template +void +Peer::do_read (boost::asio::yield_context yield) +{ + complete_ = false; + + error_code ec; + bool eof = false; + beast::http::parser parser (message_, true); + for(;;) + { + if (read_buf_.size() == 0) + { + start_timer(); + auto const bytes_transferred = boost::asio::async_read ( + impl().socket_, read_buf_.prepare (bufferSize), + boost::asio::transfer_at_least(1), yield[ec]); + cancel_timer(); + + eof = ec == boost::asio::error::eof; + if (eof) + { + ec = error_code{}; + } + else if (! ec) + { + bytes_in_ += bytes_transferred; + read_buf_.commit (bytes_transferred); + } + } + + if (! ec) + { + if (! eof) + { + // VFALCO TODO Currently parsing errors are treated the + // same as the connection dropping. Instead, we + // should request that the handler compose a proper HTTP error + // response. This requires refactoring HTTPReply() into + // something sensible. + auto const result = parser.write (read_buf_.data()); + if (result.first) + read_buf_.consume (result.second); + else + ec = parser.error(); + } + else + { + if (! parser.write_eof()) + ec = parser.error(); + } + } + + if (! ec) + { + if (parser.complete()) + return do_request(); + else if (eof) + ec = boost::asio::error::eof; // incomplete request + } + + if (ec) + return fail (ec, "read"); + } +} + +// Send everything in the write queue. +// The write queue must not be empty upon entry. +template +void +Peer::do_write (boost::asio::yield_context yield) +{ + error_code ec; + std::size_t bytes = 0; + for(;;) + { + bytes_out_ += bytes; + + bool empty; + void const* data; + { + std::lock_guard lock (mutex_); + buffer& b = write_queue_.front(); + b.used += bytes; + if (b.used < b.bytes) + { + empty = false; + } + else + { + write_queue_.pop_front(); + empty = write_queue_.empty(); + } + data = b.data.get() + b.used; + bytes = b.bytes - b.used; + } + if (empty) + break; + + start_timer(); + boost::asio::async_write (impl().socket_, boost::asio::buffer ( + data, bytes), boost::asio::transfer_at_least(1), yield[ec]); + cancel_timer(); + + if (ec) + return fail (ec, "write"); + } + + if (! complete_) + return; + + if (graceful_) + return do_close(); + + boost::asio::spawn (strand_, std::bind (&Peer::do_read, + impl().shared_from_this(), std::placeholders::_1)); +} + +//------------------------------------------------------------------------------ + +// Send a copy of the data. +template +void +Peer::write (void const* buffer, std::size_t bytes) +{ + if (bytes == 0) + return; + + bool empty; + { + std::lock_guard lock (mutex_); + empty = write_queue_.empty(); + write_queue_.emplace_back (buffer, bytes); + } + + if (empty) + boost::asio::spawn (strand_, std::bind (&Peer::do_write, + impl().shared_from_this(), std::placeholders::_1)); +} + +// Make the Session asynchronous +template +void +Peer::detach () +{ + if (! detach_ref_) + { + assert (! work_); + + // Maintain an additional reference while detached + detach_ref_ = impl().shared_from_this(); + + // Prevent the io_service from running out of work. + // The work object will be destroyed with the Peer + // after the Session is closed and handlers complete. + // + work_ = boost::in_place (std::ref ( + server_.get_io_service())); + } +} + +// Called to indicate the response has been written (but not sent) +template +void +Peer::complete() +{ + if (! strand_.running_in_this_thread()) + return server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::complete, impl().shared_from_this()))); + + // Reattach + detach_ref_.reset(); + work_ = boost::none; + + message_ = beast::http::message{}; + complete_ = true; + + if (! write_queue_.empty()) + return; + + // keep-alive + boost::asio::spawn (strand_, std::bind (&Peer::do_read, + impl().shared_from_this(), std::placeholders::_1)); +} + +// Called from the Handler to close the session. +template +void +Peer::close (bool graceful) +{ + if (! strand_.running_in_this_thread()) + return server_.get_io_service().dispatch (strand_.wrap ( + std::bind (&Peer::close, impl().shared_from_this(), + graceful))); + + // Reattach + detach_ref_.reset(); + work_ = boost::none; + + complete_ = true; + + if (graceful) + { + graceful_ = true; + + if (! write_queue_.empty()) + return; + } + + error_code ec; + timer_.cancel (ec); + impl().socket_.lowest_layer().close (ec); +} + } } diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp index 741a7fd3e4..f4541a130e 100644 --- a/src/ripple/http/impl/ServerImpl.cpp +++ b/src/ripple/http/impl/ServerImpl.cpp @@ -105,7 +105,7 @@ ServerImpl::get_io_service() // way, the Peer can never outlive the server. // void -ServerImpl::add (Peer& peer) +ServerImpl::add (BasicPeer& peer) { std::lock_guard lock (mutex_); state_.peers.push_back (peer); @@ -123,7 +123,7 @@ ServerImpl::add (Door& door) // as a weak_ptr. // void -ServerImpl::remove (Peer& peer) +ServerImpl::remove (BasicPeer& peer) { std::lock_guard lock (mutex_); state_.peers.erase (state_.peers.iterator_to (peer)); @@ -183,7 +183,7 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map) // VFALCO TODO Write the list of doors - map ["active"] = Peer::count(); + map ["active"] = state_.peers.size(); { std::string s; @@ -282,7 +282,9 @@ ServerImpl::on_update () { if (comp < 0) { - doors.push_back (new Door (*this, *port)); + doors.push_back (std::make_shared ( + io_service_, *this, *port)); + doors.back()->listen(); } else { @@ -293,7 +295,9 @@ ServerImpl::on_update () } else { - doors.push_back (new Door (*this, *port)); + doors.push_back (std::make_shared ( + io_service_, *this, *port)); + doors.back()->listen(); } } diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h index 19cb19035b..a22972fcbd 100644 --- a/src/ripple/http/impl/ServerImpl.h +++ b/src/ripple/http/impl/ServerImpl.h @@ -38,8 +38,8 @@ namespace ripple { namespace HTTP { +class BasicPeer; class Door; -class Peer; struct Stat { @@ -68,13 +68,13 @@ private: Ports ports; // All allocated Peer objects - beast::List peers; + beast::List peers; // All allocated Door objects beast::List doors; }; - typedef std::vector > Doors; + typedef std::vector > Doors; Server& m_server; Handler& m_handler; @@ -121,13 +121,13 @@ public: get_io_service(); void - add (Peer& peer); + add (BasicPeer& peer); void add (Door& door); void - remove (Peer& peer); + remove (BasicPeer& peer); void remove (Door& door); diff --git a/src/ripple/unity/http.cpp b/src/ripple/unity/http.cpp index d73e3233f8..33752ff242 100644 --- a/src/ripple/unity/http.cpp +++ b/src/ripple/unity/http.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include #include