diff --git a/src/ripple/app/main/ServerHandlerImp.cpp b/src/ripple/app/main/ServerHandlerImp.cpp index 6f87e4593d..9bff3ea297 100644 --- a/src/ripple/app/main/ServerHandlerImp.cpp +++ b/src/ripple/app/main/ServerHandlerImp.cpp @@ -122,11 +122,9 @@ ServerHandlerImp::onRequest (HTTP::Session& session) return; } - session.detach(); - m_jobQueue.addJob (jtCLIENT, "RPC-Client", std::bind ( &ServerHandlerImp::processSession, this, std::placeholders::_1, - std::ref (session))); + session.detach())); } void @@ -145,18 +143,19 @@ ServerHandlerImp::onStopped (HTTP::Server&) // Dispatched on the job queue void -ServerHandlerImp::processSession (Job& job, HTTP::Session& session) +ServerHandlerImp::processSession (Job& job, + std::shared_ptr const& session) { - session.write (processRequest (to_string(session.body()), - session.remoteAddress().at_port(0))); + session->write (processRequest (to_string(session->body()), + session->remoteAddress().at_port(0))); - if (session.request().keep_alive()) + if (session->request().keep_alive()) { - session.complete(); + session->complete(); } else { - session.close (true); + session->close (true); } } diff --git a/src/ripple/app/main/ServerHandlerImp.h b/src/ripple/app/main/ServerHandlerImp.h index 9d698eb7b8..2e14d7f1b4 100644 --- a/src/ripple/app/main/ServerHandlerImp.h +++ b/src/ripple/app/main/ServerHandlerImp.h @@ -80,7 +80,8 @@ private: //-------------------------------------------------------------------------- void - processSession (Job& job, HTTP::Session& session); + processSession (Job& job, + std::shared_ptr const& session); std::string createResponse (int statusCode, std::string const& description); diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h index 7889b7d3c0..424821449e 100644 --- a/src/ripple/http/Session.h +++ b/src/ripple/http/Session.h @@ -25,6 +25,7 @@ #include #include #include +#include #include namespace ripple { @@ -104,7 +105,7 @@ public: will not return until all detached sessions are closed. */ virtual - void + std::shared_ptr detach() = 0; /** Indicate that the response is complete. diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp index 573dd07d16..f34f993d39 100644 --- a/src/ripple/http/impl/Door.cpp +++ b/src/ripple/http/impl/Door.cpp @@ -69,36 +69,43 @@ detect_ssl (Socket& socket, StreamBuf& buf, Yield yield) break; } - std::size_t const bytes_transferred = boost::asio::async_read (socket, + buf.commit(boost::asio::async_read (socket, buf.prepare(max - bytes), boost::asio::transfer_at_least(1), - yield[result.first]); + yield[result.first])); if (result.first) break; - buf.commit (bytes_transferred); } return result; } //------------------------------------------------------------------------------ +Door::Child::Child(Door& door) + : door_(door) +{ +} + +Door::Child::~Child() +{ + door_.remove(*this); +} + +//------------------------------------------------------------------------------ + Door::detector::detector (Door& door, socket_type&& socket, endpoint_type endpoint) - : door_ (door) + : Child(door) , socket_ (std::move(socket)) , timer_ (socket_.get_io_service()) , remote_endpoint_ (endpoint) { - door_.add(*this); -} - -Door::detector::~detector() -{ - door_.remove(*this); } void Door::detector::run() { + // do_detect must be called before do_timer or else + // the timer can be canceled before it gets set. boost::asio::spawn (door_.strand_, std::bind (&detector::do_detect, shared_from_this(), std::placeholders::_1)); @@ -151,31 +158,55 @@ Door::Door (boost::asio::io_service& io_service, ServerImpl& server, Port const& port) : port_(port) , server_(server) - , acceptor_(io_service, to_asio(port), true) + , acceptor_(io_service) , strand_(io_service) { server_.add (*this); error_code ec; + endpoint_type const local_address = to_asio(port); + acceptor_.open(local_address.protocol(), ec); if (ec) { if (server_.journal().error) server_.journal().error << - "Error setting acceptor socket option: " << ec.message(); + "Error opening listener: " << ec.message(); + throw std::exception(); + return; } - if (! ec) - { - if (server_.journal().info) server_.journal().info << - "Bound to endpoint " << to_string (acceptor_.local_endpoint()); - } - else + acceptor_.set_option( + boost::asio::ip::tcp::acceptor::reuse_address(true), ec); + if (ec) { if (server_.journal().error) server_.journal().error << - "Error binding to endpoint " << - to_string (acceptor_.local_endpoint()) << - ", '" << ec.message() << "'"; + "Error setting listener options: " << ec.message(); + throw std::exception(); + return; } + + acceptor_.bind(local_address, ec); + if (ec) + { + if (server_.journal().error) server_.journal().error << + "Error binding to endpoint " << local_address << + ", '" << ec.message() << "'"; + throw std::exception(); + return; + } + + acceptor_.listen(boost::asio::socket_base::max_connections, ec); + if (ec) + { + if (server_.journal().error) server_.journal().error << + "Error on listen: " << local_address << + ", '" << ec.message() << "'"; + throw std::exception(); + return; + } + + if (server_.journal().info) server_.journal().info << + "Bound to endpoint " << to_string (acceptor_.local_endpoint()); } Door::~Door() @@ -205,28 +236,33 @@ Door::close() error_code ec; acceptor_.close(ec); // Close all detector, Peer objects - for(auto& _ : list_) - _.close(); -} - -//------------------------------------------------------------------------------ - -void -Door::add (Child& c) -{ std::lock_guard lock(mutex_); - list_.push_back(c); + for(auto& _ : list_) + { + auto const peer = _.second.lock(); + if (peer != nullptr) + peer->close(); + } } void Door::remove (Child& c) { std::lock_guard lock(mutex_); - list_.erase(list_.iterator_to(c)); + list_.erase(&c); if (list_.empty()) cond_.notify_all(); } +//------------------------------------------------------------------------------ + +void +Door::add (std::shared_ptr const& child) +{ + std::lock_guard lock(mutex_); + list_.emplace(child.get(), child); +} + void Door::create (bool ssl, beast::asio::streambuf&& buf, socket_type&& socket, endpoint_type remote_address) @@ -254,14 +290,16 @@ Door::create (bool ssl, beast::asio::streambuf&& buf, auto const peer = std::make_shared (*this, server_.journal(), remote_address, buf.data(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); return; } auto const peer = std::make_shared (*this, server_.journal(), remote_address, buf.data(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); return; } break; @@ -282,34 +320,34 @@ Door::do_accept (boost::asio::yield_context yield) endpoint_type endpoint; socket_type socket (acceptor_.get_io_service()); acceptor_.async_accept (socket, endpoint, yield[ec]); - if (server_.closed()) + if (ec && ec != boost::asio::error::operation_aborted) + if (server_.journal().error) server_.journal().error << + "accept: " << ec.message(); + if (ec == boost::asio::error::operation_aborted || server_.closed()) break; if (ec) - { - if (ec != boost::asio::error::operation_aborted) - if (server_.journal().error) server_.journal().error << - "accept: " << ec.message(); - break; - } - + continue; if (port_.security == Port::Security::no_ssl) { auto const peer = std::make_shared (*this, server_.journal(), endpoint, boost::asio::null_buffers(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); } else if (port_.security == Port::Security::require_ssl) { auto const peer = std::make_shared (*this, server_.journal(), endpoint, boost::asio::null_buffers(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); } else { auto const c = std::make_shared ( *this, std::move(socket), endpoint); + add(c); c->run(); } } diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h index e25d5b20fe..77f80bb6f7 100644 --- a/src/ripple/http/impl/Door.h +++ b/src/ripple/http/impl/Door.h @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include #include @@ -41,6 +41,18 @@ class Door : public ServerImpl::Child , public std::enable_shared_from_this { +public: + class Child + { + protected: + Door& door_; + + public: + Child (Door& door); + virtual ~Child(); + virtual void close() = 0; + }; + private: using clock_type = std::chrono::steady_clock; using timer_type = boost::asio::basic_waitable_timer; @@ -53,11 +65,10 @@ private: // Detects SSL on a socket class detector - : public std::enable_shared_from_this - , public ServerImpl::Child + : public Child + , public std::enable_shared_from_this { private: - Door& door_; socket_type socket_; timer_type timer_; endpoint_type remote_endpoint_; @@ -65,27 +76,22 @@ private: public: detector (Door& door, socket_type&& socket, endpoint_type endpoint); - - ~detector(); - void run(); - void close(); + void close() override; private: void do_timer (yield_context yield); void do_detect (yield_context yield); }; - using list_type = boost::intrusive::make_list >::type; - Port port_; ServerImpl& server_; acceptor_type acceptor_; boost::asio::io_service::strand strand_; std::mutex mutex_; std::condition_variable cond_; - list_type list_; + boost::container::flat_map< + Child*, std::weak_ptr> list_; public: Door (boost::asio::io_service& io_service, @@ -113,10 +119,6 @@ public: // Work-around because we can't call shared_from_this in ctor void run(); - void add (Child& c); - - void remove (Child& c); - /** Close the Door listening socket and connections. The listening socket is closed, and all open connections belonging to the Door are closed. @@ -125,7 +127,11 @@ public: */ void close(); + void remove (Child& c); + private: + void add (std::shared_ptr const& child); + void create (bool ssl, beast::asio::streambuf&& buf, socket_type&& socket, endpoint_type remote_address); diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 0ee6f4803e..e6e5e1fbdb 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_HTTP_PEER_H_INCLUDED #define RIPPLE_HTTP_PEER_H_INCLUDED +#include #include #include #include @@ -48,7 +49,7 @@ namespace HTTP { /** Represents an active connection. */ template class Peer - : public ServerImpl::Child + : public Door::Child , public Session { protected: @@ -81,7 +82,6 @@ protected: std::size_t used; }; - Door& door_; boost::asio::io_service::work work_; boost::asio::io_service::strand strand_; waitable_timer timer_; @@ -98,7 +98,6 @@ protected: std::mutex mutex_; bool graceful_ = false; bool complete_ = false; - std::shared_ptr detach_ref_; boost::system::error_code ec_; clock_type::time_point when_; @@ -188,7 +187,7 @@ protected: void write (void const* buffer, std::size_t bytes) override; - void + std::shared_ptr detach() override; void @@ -205,14 +204,13 @@ template Peer::Peer (Door& door, boost::asio::io_service& io_service, beast::Journal journal, endpoint_type endpoint, ConstBufferSequence const& buffers) - : door_(door) + : Child(door) , work_ (io_service) , strand_ (io_service) , timer_ (io_service) , endpoint_ (endpoint) , journal_ (journal) { - door_.add(*this); read_buf_.commit(boost::asio::buffer_copy(read_buf_.prepare ( boost::asio::buffer_size (buffers)), buffers)); static std::atomic sid; @@ -242,7 +240,6 @@ Peer::~Peer() if (journal_.trace) journal_.trace << id_ << "destroyed: " << request_count_ << ((request_count_ == 1) ? " request" : " requests"); - door_.remove (*this); } template @@ -254,7 +251,6 @@ Peer::close() (void(Peer::*)(void))&Peer::close, impl().shared_from_this())); error_code ec; - timer_.cancel(ec); impl().socket_.lowest_layer().close(ec); } @@ -452,12 +448,10 @@ Peer::write (void const* buffer, std::size_t bytes) // Make the Session asynchronous template -void -Peer::detach () +std::shared_ptr +Peer::detach() { - // Maintain an additional reference while detached - if (! detach_ref_) - detach_ref_ = impl().shared_from_this(); + return impl().shared_from_this(); } // Called to indicate the response has been written (but not sent) @@ -469,9 +463,6 @@ Peer::complete() return strand_.post(std::bind (&Peer::complete, impl().shared_from_this())); - // Reattach - detach_ref_.reset(); - message_ = beast::http::message{}; complete_ = true; @@ -493,21 +484,16 @@ Peer::close (bool graceful) (void(Peer::*)(bool))&Peer::close, impl().shared_from_this(), graceful)); - // Reattach - detach_ref_.reset(); - complete_ = true; - if (graceful) { graceful_ = true; - if (! write_queue_.empty()) return; + return do_close(); } error_code ec; - timer_.cancel (ec); impl().socket_.lowest_layer().close (ec); } diff --git a/src/ripple/http/impl/PlainPeer.h b/src/ripple/http/impl/PlainPeer.h index a551e010f3..3b96607b2b 100644 --- a/src/ripple/http/impl/PlainPeer.h +++ b/src/ripple/http/impl/PlainPeer.h @@ -40,7 +40,7 @@ public: ConstBufferSequence const& buffers, socket_type&& socket); void - accept(); + run(); private: void @@ -62,7 +62,7 @@ PlainPeer::PlainPeer (Door& door, beast::Journal journal, } void -PlainPeer::accept () +PlainPeer::run () { door_.server().handler().onAccept (session()); if (! socket_.is_open()) diff --git a/src/ripple/http/impl/SSLPeer.h b/src/ripple/http/impl/SSLPeer.h index 895a811b82..1aab1d2c48 100644 --- a/src/ripple/http/impl/SSLPeer.h +++ b/src/ripple/http/impl/SSLPeer.h @@ -42,7 +42,7 @@ public: ConstBufferSequence const& buffers, next_layer_type&& socket); void - accept(); + run(); private: void @@ -72,10 +72,10 @@ SSLPeer::SSLPeer (Door& door, beast::Journal journal, // Called when the acceptor accepts our socket. void -SSLPeer::accept () +SSLPeer::run () { door_.server().handler().onAccept (session()); - if (! next_layer_.is_open()) + if (! socket_.lowest_layer().is_open()) return; boost::asio::spawn (strand_, std::bind (&SSLPeer::do_handshake, @@ -86,11 +86,12 @@ void SSLPeer::do_handshake (boost::asio::yield_context yield) { error_code ec; - std::size_t const bytes_transferred = socket_.async_handshake ( - socket_type::server, read_buf_.data(), yield[ec]); + start_timer(); + read_buf_.consume(socket_.async_handshake( + socket_type::server, read_buf_.data(), yield[ec])); + cancel_timer(); if (ec) return fail (ec, "handshake"); - read_buf_.consume (bytes_transferred); boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read, shared_from_this(), std::placeholders::_1)); } @@ -106,15 +107,17 @@ void SSLPeer::do_close() { error_code ec; + start_timer(); socket_.async_shutdown (strand_.wrap (std::bind ( &SSLPeer::on_shutdown, shared_from_this(), std::placeholders::_1))); + cancel_timer(); } void SSLPeer::on_shutdown (error_code ec) { - socket_.next_layer().close(ec); + socket_.lowest_layer().close(ec); } } diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp index 9b0331a92d..c6b082d489 100644 --- a/src/ripple/http/impl/ServerImpl.cpp +++ b/src/ripple/http/impl/ServerImpl.cpp @@ -107,14 +107,22 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map) void ServerImpl::close() { - std::lock_guard lock(mutex_); - if (work_) + bool stopped = false; { - work_ = boost::none; - // Close all Door objects - for(auto& _ :list_) - _.close(); + std::lock_guard lock(mutex_); + if (work_) + { + work_ = boost::none; + // Close all Door objects + if (list_.empty()) + stopped = true; + else + for(auto& _ :list_) + _.close(); + } } + if (stopped) + handler_.onStopped(*this); } //-------------------------------------------------------------------------- @@ -129,13 +137,18 @@ ServerImpl::add (Child& child) void ServerImpl::remove (Child& child) { - std::lock_guard lock(mutex_); - list_.erase(list_.iterator_to(child)); - if (list_.empty()) + bool stopped = false; { - handler_.onStopped(*this); - cond_.notify_all(); + std::lock_guard lock(mutex_); + list_.erase(list_.iterator_to(child)); + if (list_.empty()) + { + cond_.notify_all(); + stopped = true; + } } + if (stopped) + handler_.onStopped(*this); } bool