From 549ad3204fb4216214d05ee7eb9c8bb85c483400 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Sun, 2 Nov 2014 17:51:20 -0800 Subject: [PATCH] Fix race conditions closing HTTP I/O objects: This fixes a case where stop can sometimes skip calling close on some I/O objects or crash in a rare circumstance where a connection is in the process of being torn down at the exact time the server is stopped. When the acceptor receives errors, it logs the error and continues listening instead of stopping. --- src/ripple/app/main/ServerHandlerImp.cpp | 17 ++- src/ripple/app/main/ServerHandlerImp.h | 3 +- src/ripple/http/Session.h | 3 +- src/ripple/http/impl/Door.cpp | 126 +++++++++++++++-------- src/ripple/http/impl/Door.h | 38 ++++--- src/ripple/http/impl/Peer.h | 30 ++---- src/ripple/http/impl/PlainPeer.h | 4 +- src/ripple/http/impl/SSLPeer.h | 17 +-- src/ripple/http/impl/ServerImpl.cpp | 35 +++++-- 9 files changed, 160 insertions(+), 113 deletions(-) diff --git a/src/ripple/app/main/ServerHandlerImp.cpp b/src/ripple/app/main/ServerHandlerImp.cpp index 6f87e4593d..9bff3ea297 100644 --- a/src/ripple/app/main/ServerHandlerImp.cpp +++ b/src/ripple/app/main/ServerHandlerImp.cpp @@ -122,11 +122,9 @@ ServerHandlerImp::onRequest (HTTP::Session& session) return; } - session.detach(); - m_jobQueue.addJob (jtCLIENT, "RPC-Client", std::bind ( &ServerHandlerImp::processSession, this, std::placeholders::_1, - std::ref (session))); + session.detach())); } void @@ -145,18 +143,19 @@ ServerHandlerImp::onStopped (HTTP::Server&) // Dispatched on the job queue void -ServerHandlerImp::processSession (Job& job, HTTP::Session& session) +ServerHandlerImp::processSession (Job& job, + std::shared_ptr const& session) { - session.write (processRequest (to_string(session.body()), - session.remoteAddress().at_port(0))); + session->write (processRequest (to_string(session->body()), + session->remoteAddress().at_port(0))); - if (session.request().keep_alive()) + if (session->request().keep_alive()) { - session.complete(); + session->complete(); } else { - session.close (true); + session->close (true); } } diff --git a/src/ripple/app/main/ServerHandlerImp.h b/src/ripple/app/main/ServerHandlerImp.h index 9d698eb7b8..2e14d7f1b4 100644 --- a/src/ripple/app/main/ServerHandlerImp.h +++ b/src/ripple/app/main/ServerHandlerImp.h @@ -80,7 +80,8 @@ private: //-------------------------------------------------------------------------- void - processSession (Job& job, HTTP::Session& session); + processSession (Job& job, + std::shared_ptr const& session); std::string createResponse (int statusCode, std::string const& description); diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h index 7889b7d3c0..424821449e 100644 --- a/src/ripple/http/Session.h +++ b/src/ripple/http/Session.h @@ -25,6 +25,7 @@ #include #include #include +#include #include namespace ripple { @@ -104,7 +105,7 @@ public: will not return until all detached sessions are closed. */ virtual - void + std::shared_ptr detach() = 0; /** Indicate that the response is complete. diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp index 573dd07d16..f34f993d39 100644 --- a/src/ripple/http/impl/Door.cpp +++ b/src/ripple/http/impl/Door.cpp @@ -69,36 +69,43 @@ detect_ssl (Socket& socket, StreamBuf& buf, Yield yield) break; } - std::size_t const bytes_transferred = boost::asio::async_read (socket, + buf.commit(boost::asio::async_read (socket, buf.prepare(max - bytes), boost::asio::transfer_at_least(1), - yield[result.first]); + yield[result.first])); if (result.first) break; - buf.commit (bytes_transferred); } return result; } //------------------------------------------------------------------------------ +Door::Child::Child(Door& door) + : door_(door) +{ +} + +Door::Child::~Child() +{ + door_.remove(*this); +} + +//------------------------------------------------------------------------------ + Door::detector::detector (Door& door, socket_type&& socket, endpoint_type endpoint) - : door_ (door) + : Child(door) , socket_ (std::move(socket)) , timer_ (socket_.get_io_service()) , remote_endpoint_ (endpoint) { - door_.add(*this); -} - -Door::detector::~detector() -{ - door_.remove(*this); } void Door::detector::run() { + // do_detect must be called before do_timer or else + // the timer can be canceled before it gets set. boost::asio::spawn (door_.strand_, std::bind (&detector::do_detect, shared_from_this(), std::placeholders::_1)); @@ -151,31 +158,55 @@ Door::Door (boost::asio::io_service& io_service, ServerImpl& server, Port const& port) : port_(port) , server_(server) - , acceptor_(io_service, to_asio(port), true) + , acceptor_(io_service) , strand_(io_service) { server_.add (*this); error_code ec; + endpoint_type const local_address = to_asio(port); + acceptor_.open(local_address.protocol(), ec); if (ec) { if (server_.journal().error) server_.journal().error << - "Error setting acceptor socket option: " << ec.message(); + "Error opening listener: " << ec.message(); + throw std::exception(); + return; } - if (! ec) - { - if (server_.journal().info) server_.journal().info << - "Bound to endpoint " << to_string (acceptor_.local_endpoint()); - } - else + acceptor_.set_option( + boost::asio::ip::tcp::acceptor::reuse_address(true), ec); + if (ec) { if (server_.journal().error) server_.journal().error << - "Error binding to endpoint " << - to_string (acceptor_.local_endpoint()) << - ", '" << ec.message() << "'"; + "Error setting listener options: " << ec.message(); + throw std::exception(); + return; } + + acceptor_.bind(local_address, ec); + if (ec) + { + if (server_.journal().error) server_.journal().error << + "Error binding to endpoint " << local_address << + ", '" << ec.message() << "'"; + throw std::exception(); + return; + } + + acceptor_.listen(boost::asio::socket_base::max_connections, ec); + if (ec) + { + if (server_.journal().error) server_.journal().error << + "Error on listen: " << local_address << + ", '" << ec.message() << "'"; + throw std::exception(); + return; + } + + if (server_.journal().info) server_.journal().info << + "Bound to endpoint " << to_string (acceptor_.local_endpoint()); } Door::~Door() @@ -205,28 +236,33 @@ Door::close() error_code ec; acceptor_.close(ec); // Close all detector, Peer objects - for(auto& _ : list_) - _.close(); -} - -//------------------------------------------------------------------------------ - -void -Door::add (Child& c) -{ std::lock_guard lock(mutex_); - list_.push_back(c); + for(auto& _ : list_) + { + auto const peer = _.second.lock(); + if (peer != nullptr) + peer->close(); + } } void Door::remove (Child& c) { std::lock_guard lock(mutex_); - list_.erase(list_.iterator_to(c)); + list_.erase(&c); if (list_.empty()) cond_.notify_all(); } +//------------------------------------------------------------------------------ + +void +Door::add (std::shared_ptr const& child) +{ + std::lock_guard lock(mutex_); + list_.emplace(child.get(), child); +} + void Door::create (bool ssl, beast::asio::streambuf&& buf, socket_type&& socket, endpoint_type remote_address) @@ -254,14 +290,16 @@ Door::create (bool ssl, beast::asio::streambuf&& buf, auto const peer = std::make_shared (*this, server_.journal(), remote_address, buf.data(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); return; } auto const peer = std::make_shared (*this, server_.journal(), remote_address, buf.data(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); return; } break; @@ -282,34 +320,34 @@ Door::do_accept (boost::asio::yield_context yield) endpoint_type endpoint; socket_type socket (acceptor_.get_io_service()); acceptor_.async_accept (socket, endpoint, yield[ec]); - if (server_.closed()) + if (ec && ec != boost::asio::error::operation_aborted) + if (server_.journal().error) server_.journal().error << + "accept: " << ec.message(); + if (ec == boost::asio::error::operation_aborted || server_.closed()) break; if (ec) - { - if (ec != boost::asio::error::operation_aborted) - if (server_.journal().error) server_.journal().error << - "accept: " << ec.message(); - break; - } - + continue; if (port_.security == Port::Security::no_ssl) { auto const peer = std::make_shared (*this, server_.journal(), endpoint, boost::asio::null_buffers(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); } else if (port_.security == Port::Security::require_ssl) { auto const peer = std::make_shared (*this, server_.journal(), endpoint, boost::asio::null_buffers(), std::move(socket)); - peer->accept(); + add(peer); + peer->run(); } else { auto const c = std::make_shared ( *this, std::move(socket), endpoint); + add(c); c->run(); } } diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h index e25d5b20fe..77f80bb6f7 100644 --- a/src/ripple/http/impl/Door.h +++ b/src/ripple/http/impl/Door.h @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include #include @@ -41,6 +41,18 @@ class Door : public ServerImpl::Child , public std::enable_shared_from_this { +public: + class Child + { + protected: + Door& door_; + + public: + Child (Door& door); + virtual ~Child(); + virtual void close() = 0; + }; + private: using clock_type = std::chrono::steady_clock; using timer_type = boost::asio::basic_waitable_timer; @@ -53,11 +65,10 @@ private: // Detects SSL on a socket class detector - : public std::enable_shared_from_this - , public ServerImpl::Child + : public Child + , public std::enable_shared_from_this { private: - Door& door_; socket_type socket_; timer_type timer_; endpoint_type remote_endpoint_; @@ -65,27 +76,22 @@ private: public: detector (Door& door, socket_type&& socket, endpoint_type endpoint); - - ~detector(); - void run(); - void close(); + void close() override; private: void do_timer (yield_context yield); void do_detect (yield_context yield); }; - using list_type = boost::intrusive::make_list >::type; - Port port_; ServerImpl& server_; acceptor_type acceptor_; boost::asio::io_service::strand strand_; std::mutex mutex_; std::condition_variable cond_; - list_type list_; + boost::container::flat_map< + Child*, std::weak_ptr> list_; public: Door (boost::asio::io_service& io_service, @@ -113,10 +119,6 @@ public: // Work-around because we can't call shared_from_this in ctor void run(); - void add (Child& c); - - void remove (Child& c); - /** Close the Door listening socket and connections. The listening socket is closed, and all open connections belonging to the Door are closed. @@ -125,7 +127,11 @@ public: */ void close(); + void remove (Child& c); + private: + void add (std::shared_ptr const& child); + void create (bool ssl, beast::asio::streambuf&& buf, socket_type&& socket, endpoint_type remote_address); diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h index 0ee6f4803e..e6e5e1fbdb 100644 --- a/src/ripple/http/impl/Peer.h +++ b/src/ripple/http/impl/Peer.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_HTTP_PEER_H_INCLUDED #define RIPPLE_HTTP_PEER_H_INCLUDED +#include #include #include #include @@ -48,7 +49,7 @@ namespace HTTP { /** Represents an active connection. */ template class Peer - : public ServerImpl::Child + : public Door::Child , public Session { protected: @@ -81,7 +82,6 @@ protected: std::size_t used; }; - Door& door_; boost::asio::io_service::work work_; boost::asio::io_service::strand strand_; waitable_timer timer_; @@ -98,7 +98,6 @@ protected: std::mutex mutex_; bool graceful_ = false; bool complete_ = false; - std::shared_ptr detach_ref_; boost::system::error_code ec_; clock_type::time_point when_; @@ -188,7 +187,7 @@ protected: void write (void const* buffer, std::size_t bytes) override; - void + std::shared_ptr detach() override; void @@ -205,14 +204,13 @@ template Peer::Peer (Door& door, boost::asio::io_service& io_service, beast::Journal journal, endpoint_type endpoint, ConstBufferSequence const& buffers) - : door_(door) + : Child(door) , work_ (io_service) , strand_ (io_service) , timer_ (io_service) , endpoint_ (endpoint) , journal_ (journal) { - door_.add(*this); read_buf_.commit(boost::asio::buffer_copy(read_buf_.prepare ( boost::asio::buffer_size (buffers)), buffers)); static std::atomic sid; @@ -242,7 +240,6 @@ Peer::~Peer() if (journal_.trace) journal_.trace << id_ << "destroyed: " << request_count_ << ((request_count_ == 1) ? " request" : " requests"); - door_.remove (*this); } template @@ -254,7 +251,6 @@ Peer::close() (void(Peer::*)(void))&Peer::close, impl().shared_from_this())); error_code ec; - timer_.cancel(ec); impl().socket_.lowest_layer().close(ec); } @@ -452,12 +448,10 @@ Peer::write (void const* buffer, std::size_t bytes) // Make the Session asynchronous template -void -Peer::detach () +std::shared_ptr +Peer::detach() { - // Maintain an additional reference while detached - if (! detach_ref_) - detach_ref_ = impl().shared_from_this(); + return impl().shared_from_this(); } // Called to indicate the response has been written (but not sent) @@ -469,9 +463,6 @@ Peer::complete() return strand_.post(std::bind (&Peer::complete, impl().shared_from_this())); - // Reattach - detach_ref_.reset(); - message_ = beast::http::message{}; complete_ = true; @@ -493,21 +484,16 @@ Peer::close (bool graceful) (void(Peer::*)(bool))&Peer::close, impl().shared_from_this(), graceful)); - // Reattach - detach_ref_.reset(); - complete_ = true; - if (graceful) { graceful_ = true; - if (! write_queue_.empty()) return; + return do_close(); } error_code ec; - timer_.cancel (ec); impl().socket_.lowest_layer().close (ec); } diff --git a/src/ripple/http/impl/PlainPeer.h b/src/ripple/http/impl/PlainPeer.h index a551e010f3..3b96607b2b 100644 --- a/src/ripple/http/impl/PlainPeer.h +++ b/src/ripple/http/impl/PlainPeer.h @@ -40,7 +40,7 @@ public: ConstBufferSequence const& buffers, socket_type&& socket); void - accept(); + run(); private: void @@ -62,7 +62,7 @@ PlainPeer::PlainPeer (Door& door, beast::Journal journal, } void -PlainPeer::accept () +PlainPeer::run () { door_.server().handler().onAccept (session()); if (! socket_.is_open()) diff --git a/src/ripple/http/impl/SSLPeer.h b/src/ripple/http/impl/SSLPeer.h index 895a811b82..1aab1d2c48 100644 --- a/src/ripple/http/impl/SSLPeer.h +++ b/src/ripple/http/impl/SSLPeer.h @@ -42,7 +42,7 @@ public: ConstBufferSequence const& buffers, next_layer_type&& socket); void - accept(); + run(); private: void @@ -72,10 +72,10 @@ SSLPeer::SSLPeer (Door& door, beast::Journal journal, // Called when the acceptor accepts our socket. void -SSLPeer::accept () +SSLPeer::run () { door_.server().handler().onAccept (session()); - if (! next_layer_.is_open()) + if (! socket_.lowest_layer().is_open()) return; boost::asio::spawn (strand_, std::bind (&SSLPeer::do_handshake, @@ -86,11 +86,12 @@ void SSLPeer::do_handshake (boost::asio::yield_context yield) { error_code ec; - std::size_t const bytes_transferred = socket_.async_handshake ( - socket_type::server, read_buf_.data(), yield[ec]); + start_timer(); + read_buf_.consume(socket_.async_handshake( + socket_type::server, read_buf_.data(), yield[ec])); + cancel_timer(); if (ec) return fail (ec, "handshake"); - read_buf_.consume (bytes_transferred); boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read, shared_from_this(), std::placeholders::_1)); } @@ -106,15 +107,17 @@ void SSLPeer::do_close() { error_code ec; + start_timer(); socket_.async_shutdown (strand_.wrap (std::bind ( &SSLPeer::on_shutdown, shared_from_this(), std::placeholders::_1))); + cancel_timer(); } void SSLPeer::on_shutdown (error_code ec) { - socket_.next_layer().close(ec); + socket_.lowest_layer().close(ec); } } diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp index 9b0331a92d..c6b082d489 100644 --- a/src/ripple/http/impl/ServerImpl.cpp +++ b/src/ripple/http/impl/ServerImpl.cpp @@ -107,14 +107,22 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map) void ServerImpl::close() { - std::lock_guard lock(mutex_); - if (work_) + bool stopped = false; { - work_ = boost::none; - // Close all Door objects - for(auto& _ :list_) - _.close(); + std::lock_guard lock(mutex_); + if (work_) + { + work_ = boost::none; + // Close all Door objects + if (list_.empty()) + stopped = true; + else + for(auto& _ :list_) + _.close(); + } } + if (stopped) + handler_.onStopped(*this); } //-------------------------------------------------------------------------- @@ -129,13 +137,18 @@ ServerImpl::add (Child& child) void ServerImpl::remove (Child& child) { - std::lock_guard lock(mutex_); - list_.erase(list_.iterator_to(child)); - if (list_.empty()) + bool stopped = false; { - handler_.onStopped(*this); - cond_.notify_all(); + std::lock_guard lock(mutex_); + list_.erase(list_.iterator_to(child)); + if (list_.empty()) + { + cond_.notify_all(); + stopped = true; + } } + if (stopped) + handler_.onStopped(*this); } bool