From 0f7dbc7bc0a83e2756446e50a5e60d194aa310ad Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 8 Feb 2016 10:19:25 -0500 Subject: [PATCH] Fix Server race conditions: Class io_list manages children that perform asynchronous I/O operations. The treatment of close and destruction is refactored to fix race conditions during exit. --- Builds/VisualStudio2015/RippleD.vcxproj | 2 + .../VisualStudio2015/RippleD.vcxproj.filters | 3 + src/ripple/app/main/Application.cpp | 1 - src/ripple/server/Server.h | 4 - src/ripple/server/ServerHandler.h | 5 +- src/ripple/server/impl/BaseHTTPPeer.h | 37 ++- src/ripple/server/impl/Door.cpp | 174 +++++------ src/ripple/server/impl/Door.h | 67 ++--- src/ripple/server/impl/JSONRPCUtil.cpp | 2 - src/ripple/server/impl/PlainHTTPPeer.h | 19 +- src/ripple/server/impl/SSLHTTPPeer.h | 21 +- src/ripple/server/impl/ServerHandlerImp.cpp | 9 - src/ripple/server/impl/ServerHandlerImp.h | 7 - src/ripple/server/impl/ServerImpl.cpp | 157 ++-------- src/ripple/server/impl/ServerImpl.h | 52 +--- src/ripple/server/impl/io_list.h | 272 ++++++++++++++++++ src/ripple/server/tests/Server_test.cpp | 2 +- 17 files changed, 434 insertions(+), 400 deletions(-) create mode 100644 src/ripple/server/impl/io_list.h diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index b74d84bf4..7f0e19eab 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -3313,6 +3313,8 @@ + + True True diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index c94fdbb65..e7047369e 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -3801,6 +3801,9 @@ ripple\server\impl + + ripple\server\impl + ripple\server\impl diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 494a203d9..e2ae11700 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -520,7 +520,6 @@ public: m_nodeStoreScheduler.setJobQueue (*m_jobQueue); add (m_ledgerMaster->getPropertySource ()); - add (*serverHandler_); } //-------------------------------------------------------------------------- diff --git a/src/ripple/server/Server.h b/src/ripple/server/Server.h index e9eecb142..ba638c0be 100644 --- a/src/ripple/server/Server.h +++ b/src/ripple/server/Server.h @@ -49,10 +49,6 @@ public: void ports (std::vector const& v) = 0; - virtual - void - onWrite (beast::PropertyStream::Map& map) = 0; - /** Close the server. The close is performed asynchronously. The handler will be notified when the server has stopped. The server is considered stopped when diff --git a/src/ripple/server/ServerHandler.h b/src/ripple/server/ServerHandler.h index 5359b9e5e..98cf43ed7 100644 --- a/src/ripple/server/ServerHandler.h +++ b/src/ripple/server/ServerHandler.h @@ -25,16 +25,13 @@ #include #include #include -#include #include #include #include namespace ripple { -class ServerHandler - : public beast::Stoppable - , public beast::PropertyStream::Source +class ServerHandler : public beast::Stoppable { protected: ServerHandler (Stoppable& parent); diff --git a/src/ripple/server/impl/BaseHTTPPeer.h b/src/ripple/server/impl/BaseHTTPPeer.h index 1c04c0f53..02d780b72 100644 --- a/src/ripple/server/impl/BaseHTTPPeer.h +++ b/src/ripple/server/impl/BaseHTTPPeer.h @@ -20,8 +20,9 @@ #ifndef RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED #define RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED -#include #include +#include +#include #include #include #include @@ -46,7 +47,7 @@ namespace ripple { /** Represents an active connection. */ template class BaseHTTPPeer - : public Door::Child + : public io_list::work , public Session { protected: @@ -81,6 +82,8 @@ protected: std::size_t used; }; + Port const& port_; + Handler& handler_; boost::asio::io_service::work work_; boost::asio::io_service::strand strand_; waitable_timer timer_; @@ -108,9 +111,9 @@ protected: public: template - BaseHTTPPeer (Door& door, boost::asio::io_service& io_service, - beast::Journal journal, endpoint_type remote_address, - ConstBufferSequence const& buffers); + BaseHTTPPeer (Port const& port, Handler& handler, + boost::asio::io_service& io_service, beast::Journal journal, + endpoint_type remote_address, ConstBufferSequence const& buffers); virtual ~BaseHTTPPeer(); @@ -165,13 +168,13 @@ protected: beast::Journal journal() override { - return door_.server().journal(); + return journal_; } Port const& port() override { - return door_.port(); + return port_; } beast::IP::Endpoint @@ -213,10 +216,12 @@ protected: template template -BaseHTTPPeer::BaseHTTPPeer (Door& door, boost::asio::io_service& io_service, - beast::Journal journal, endpoint_type remote_address, +BaseHTTPPeer::BaseHTTPPeer (Port const& port, Handler& handler, + boost::asio::io_service& io_service, beast::Journal journal, + endpoint_type remote_address, ConstBufferSequence const& buffers) - : Child(door) + : port_(port) + , handler_(handler) , work_ (io_service) , strand_ (io_service) , timer_ (io_service) @@ -236,17 +241,7 @@ BaseHTTPPeer::BaseHTTPPeer (Door& door, boost::asio::io_service& io_servic template BaseHTTPPeer::~BaseHTTPPeer() { - Stat stat; - stat.id = nid_; - stat.elapsed = std::chrono::duration_cast < - std::chrono::seconds> (clock_type::now() - when_); - stat.requests = request_count_; - stat.bytes_in = bytes_in_; - stat.bytes_out = bytes_out_; - stat.ec = std::move (ec_); - door_.server().report (std::move (stat)); - if(door_.server().handler()) - door_.server().handler()->onClose(session(), ec_); + handler_.onClose(session(), ec_); if (journal_.trace) journal_.trace << id_ << "destroyed: " << request_count_ << ((request_count_ == 1) ? " request" : " requests"); diff --git a/src/ripple/server/impl/Door.cpp b/src/ripple/server/impl/Door.cpp index f866d2000..e53b1a6d5 100644 --- a/src/ripple/server/impl/Door.cpp +++ b/src/ripple/server/impl/Door.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -35,7 +36,7 @@ namespace ripple { read from the stream until there is enough to determine a result. No bytes are discarded from buf. Any additional bytes read are retained. buf must provide an interface compatible with boost::asio::streambuf - http://boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html + http://boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html See http://www.ietf.org/rfc/rfc2246.txt Section 7.4. Handshake protocol @@ -81,41 +82,33 @@ detect_ssl (Socket& socket, StreamBuf& buf, Yield yield) //------------------------------------------------------------------------------ -Door::Child::Child(Door& door) - : door_(door) -{ -} - -Door::Child::~Child() -{ - door_.remove(*this); -} - -//------------------------------------------------------------------------------ - -Door::detector::detector (Door& door, socket_type&& socket, - endpoint_type remote_address) - : Child(door) +Door::Detector::Detector(Port const& port, + Handler& handler, socket_type&& socket, + endpoint_type remote_address, beast::Journal j) + : port_(port) + , handler_(handler) , socket_(std::move(socket)) , timer_(socket_.get_io_service()) , remote_address_(remote_address) + , strand_(socket_.get_io_service()) + , j_(j) { } void -Door::detector::run() +Door::Detector::run() { // do_detect must be called before do_timer or else // the timer can be canceled before it gets set. - boost::asio::spawn (door_.strand_, std::bind (&detector::do_detect, + boost::asio::spawn (strand_, std::bind (&Detector::do_detect, shared_from_this(), std::placeholders::_1)); - boost::asio::spawn (door_.strand_, std::bind (&detector::do_timer, + boost::asio::spawn (strand_, std::bind (&Detector::do_timer, shared_from_this(), std::placeholders::_1)); } void -Door::detector::close() +Door::Detector::close() { error_code ec; socket_.close(ec); @@ -123,7 +116,7 @@ Door::detector::close() } void -Door::detector::do_timer (yield_context yield) +Door::Detector::do_timer (yield_context yield) { error_code ec; // ignored while (socket_.is_open()) @@ -135,7 +128,7 @@ Door::detector::do_timer (yield_context yield) } void -Door::detector::do_detect (boost::asio::yield_context yield) +Door::Detector::do_detect (boost::asio::yield_context yield) { bool ssl; error_code ec; @@ -145,29 +138,45 @@ Door::detector::do_detect (boost::asio::yield_context yield) error_code unused; timer_.cancel(unused); if (! ec) - return door_.create(ssl, buf.data(), - std::move(socket_), remote_address_); + { + if (ssl) + { + if(auto sp = ios().emplace(port_, handler_, + j_, remote_address_, buf.data(), + std::move(socket_))) + sp->run(); + return; + } + if(auto sp = ios().emplace(port_, handler_, + j_, remote_address_, buf.data(), + std::move(socket_))) + sp->run(); + return; + } if (ec != boost::asio::error::operation_aborted) - if (door_.server_.journal().trace) door_.server_.journal().trace << + { + JLOG(j_.trace) << "Error detecting ssl: " << ec.message() << " from " << remote_address_; + } } //------------------------------------------------------------------------------ -Door::Door (boost::asio::io_service& io_service, - ServerImpl& server, Port const& port) - : port_(std::make_shared(port)) - , server_(server) +Door::Door (Handler& handler, boost::asio::io_service& io_service, + Port const& port, beast::Journal j) + : j_(j) + , port_(port) + , handler_(handler) , acceptor_(io_service) , strand_(io_service) , ssl_ ( - port_->protocol.count("https") > 0 || - //port_->protocol.count("wss") > 0 || - port_->protocol.count("peer") > 0) + port_.protocol.count("https") > 0 || + //port_.protocol.count("wss") > 0 || + port_.protocol.count("peer") > 0) , plain_ ( - //port_->protocol.count("ws") > 0 || - port_->protocol.count("http") > 0) + //port_.protocol.count("ws") > 0 || + port_.protocol.count("http") > 0) { error_code ec; endpoint_type const local_address = @@ -176,7 +185,7 @@ Door::Door (boost::asio::io_service& io_service, acceptor_.open(local_address.protocol(), ec); if (ec) { - if (server_.journal().error) server_.journal().error << + JLOG(j_.error) << "Open port '" << port.name << "' failed:" << ec.message(); Throw (); } @@ -185,7 +194,7 @@ Door::Door (boost::asio::io_service& io_service, boost::asio::ip::tcp::acceptor::reuse_address(true), ec); if (ec) { - if (server_.journal().error) server_.journal().error << + JLOG(j_.error) << "Option for port '" << port.name << "' failed:" << ec.message(); Throw (); } @@ -193,7 +202,7 @@ Door::Door (boost::asio::io_service& io_service, acceptor_.bind(local_address, ec); if (ec) { - if (server_.journal().error) server_.journal().error << + JLOG(j_.error) << "Bind port '" << port.name << "' failed:" << ec.message(); Throw (); } @@ -201,30 +210,20 @@ Door::Door (boost::asio::io_service& io_service, acceptor_.listen(boost::asio::socket_base::max_connections, ec); if (ec) { - if (server_.journal().error) server_.journal().error << + JLOG(j_.error) << "Listen on port '" << port.name << "' failed:" << ec.message(); Throw (); } - if (server_.journal().info) server_.journal().info << + JLOG(j_.info) << "Opened " << port; } -Door::~Door() -{ - { - // Block until all detector, Peer objects destroyed - std::unique_lock lock(mutex_); - while (! list_.empty()) - cond_.wait(lock); - } -} - void Door::run() { - boost::asio::spawn (strand_, std::bind (&Door::do_accept, - this, std::placeholders::_1)); + boost::asio::spawn (strand_, std::bind(&Door::do_accept, + shared_from_this(), std::placeholders::_1)); } void @@ -232,70 +231,30 @@ Door::close() { if (! strand_.running_in_this_thread()) return strand_.post(std::bind( - &Door::close, this)); + &Door::close, shared_from_this())); error_code ec; acceptor_.close(ec); - // Close all detector, Peer objects - std::vector> v; - { - std::lock_guard lock(mutex_); - for(auto& p : list_) - { - auto const peer = p.second.lock(); - if (peer != nullptr) - { - peer->close(); - // Must destroy shared_ptr outside the - // lock otherwise deadlock from the - // managed object's destructor. - v.emplace_back(std::move(peer)); - } - } - } -} - -void -Door::remove (Child& c) -{ - std::lock_guard lock(mutex_); - auto const n = list_.erase(&c); - if(n != 1) - Throw("missing child"); - if (list_.empty()) - cond_.notify_all(); } //------------------------------------------------------------------------------ -void -Door::add (std::shared_ptr const& child) -{ - std::lock_guard lock(mutex_); - list_.emplace(child.get(), child); -} - template void Door::create (bool ssl, ConstBufferSequence const& buffers, socket_type&& socket, endpoint_type remote_address) { - if (server_.closed()) - return; - if (ssl) { - auto const peer = std::make_shared (*this, - server_.journal(), remote_address, buffers, - std::move(socket)); - add(peer); - return peer->run(); + if(auto sp = ios().emplace(port_, handler_, + j_, remote_address, buffers, + std::move(socket))) + sp->run(); + return; } - - auto const peer = std::make_shared (*this, - server_.journal(), remote_address, buffers, - std::move(socket)); - add(peer); - peer->run(); + if(auto sp = ios().emplace(port_, handler_, + j_, remote_address, buffers, + std::move(socket))) + sp->run(); } void @@ -308,19 +267,19 @@ Door::do_accept (boost::asio::yield_context yield) socket_type socket (acceptor_.get_io_service()); acceptor_.async_accept (socket, remote_address, yield[ec]); if (ec && ec != boost::asio::error::operation_aborted) - if (server_.journal().error) server_.journal().error << + if (j_.error) j_.error << "accept: " << ec.message(); - if (ec == boost::asio::error::operation_aborted || server_.closed()) + if (ec == boost::asio::error::operation_aborted) break; if (ec) continue; if (ssl_ && plain_) { - auto const c = std::make_shared ( - *this, std::move(socket), remote_address); - add(c); - c->run(); + if(auto sp = ios().emplace(port_, + handler_, std::move(socket), remote_address, + j_)) + sp->run(); } else if (ssl_ || plain_) { @@ -328,7 +287,6 @@ Door::do_accept (boost::asio::yield_context yield) std::move(socket), remote_address); } } - server_.remove(); } } // ripple diff --git a/src/ripple/server/impl/Door.h b/src/ripple/server/impl/Door.h index df26ba828..67e1d3840 100644 --- a/src/ripple/server/impl/Door.h +++ b/src/ripple/server/impl/Door.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_SERVER_DOOR_H_INCLUDED #define RIPPLE_SERVER_DOOR_H_INCLUDED +#include #include #include #include @@ -36,20 +37,9 @@ namespace ripple { /** A listening socket. */ class Door - : public ServerImpl::Child + : public io_list::work + , public std::enable_shared_from_this { -public: - class Child - { - protected: - Door& door_; - - public: - Child (Door& door); - virtual ~Child(); - virtual void close() = 0; - }; - private: using clock_type = std::chrono::steady_clock; using timer_type = boost::asio::basic_waitable_timer; @@ -61,18 +51,23 @@ private: using socket_type = protocol_type::socket; // Detects SSL on a socket - class detector - : public Child - , public std::enable_shared_from_this + class Detector + : public io_list::work + , public std::enable_shared_from_this { private: + Port const& port_; + Handler& handler_; socket_type socket_; timer_type timer_; endpoint_type remote_address_; + boost::asio::io_service::strand strand_; + beast::Journal j_; public: - detector (Door& door, socket_type&& socket, - endpoint_type remote_address); + Detector (Port const& port, Handler& handler, + socket_type&& socket, endpoint_type remote_address, + beast::Journal j); void run(); void close() override; @@ -81,39 +76,17 @@ private: void do_detect (yield_context yield); }; - std::shared_ptr port_; - ServerImpl& server_; + beast::Journal j_; + Port const& port_; + Handler& handler_; acceptor_type acceptor_; boost::asio::io_service::strand strand_; - std::mutex mutex_; - std::condition_variable cond_; - boost::container::flat_map< - Child*, std::weak_ptr> list_; bool ssl_; bool plain_; public: - Door (boost::asio::io_service& io_service, - ServerImpl& server, Port const& port); - - /** Destroy the door. - Blocks until there are no pending I/O completion - handlers, and all connections have been destroyed. - close() must be called before the destructor. - */ - ~Door(); - - ServerImpl& - server() - { - return server_; - } - - Port const& - port() const - { - return *port_; - } + Door(Handler& handler, boost::asio::io_service& io_service, + Port const& port, beast::Journal j); // Work-around because we can't call shared_from_this in ctor void run(); @@ -126,11 +99,7 @@ public: */ void close(); - void remove (Child& c); - private: - void add (std::shared_ptr const& child); - template void create (bool ssl, ConstBufferSequence const& buffers, socket_type&& socket, endpoint_type remote_address); diff --git a/src/ripple/server/impl/JSONRPCUtil.cpp b/src/ripple/server/impl/JSONRPCUtil.cpp index 838272ae6..7dabcfb3f 100644 --- a/src/ripple/server/impl/JSONRPCUtil.cpp +++ b/src/ripple/server/impl/JSONRPCUtil.cpp @@ -28,8 +28,6 @@ namespace ripple { -unsigned int const gMaxHTTPHeaderSize = 0x02000000; - std::string getHTTPHeaderTimestamp () { // CHECKME This is probably called often enough that optimizing it makes diff --git a/src/ripple/server/impl/PlainHTTPPeer.h b/src/ripple/server/impl/PlainHTTPPeer.h index a2284c260..b955e128e 100644 --- a/src/ripple/server/impl/PlainHTTPPeer.h +++ b/src/ripple/server/impl/PlainHTTPPeer.h @@ -37,8 +37,9 @@ private: public: template - PlainHTTPPeer (Door& door, beast::Journal journal, endpoint_type endpoint, - ConstBufferSequence const& buffers, socket_type&& socket); + PlainHTTPPeer (Port const& port, Handler& handler, + beast::Journal journal, endpoint_type remote_address, + ConstBufferSequence const& buffers, socket_type&& socket); void run(); @@ -54,10 +55,10 @@ private: //------------------------------------------------------------------------------ template -PlainHTTPPeer::PlainHTTPPeer (Door& door, beast::Journal journal, - endpoint_type remote_address, ConstBufferSequence const& buffers, - socket_type&& socket) - : BaseHTTPPeer (door, socket.get_io_service(), journal, remote_address, buffers) +PlainHTTPPeer::PlainHTTPPeer (Port const& port, Handler& handler, + beast::Journal journal, endpoint_type remote_address, + ConstBufferSequence const& buffers, socket_type&& socket) + : BaseHTTPPeer(port, handler, socket.get_io_service(), journal, remote_address, buffers) , stream_(std::move(socket)) { } @@ -65,7 +66,7 @@ PlainHTTPPeer::PlainHTTPPeer (Door& door, beast::Journal journal, void PlainHTTPPeer::run () { - door_.server().handler()->onAccept (session()); + handler_.onAccept (session()); if (! stream_.is_open()) return; @@ -77,7 +78,7 @@ void PlainHTTPPeer::do_request() { ++request_count_; - auto const what = door_.server().handler()->onHandoff (session(), + auto const what = handler_.onHandoff (session(), std::move(stream_), std::move(message_), remote_address_); if (what.moved) return; @@ -98,7 +99,7 @@ PlainHTTPPeer::do_request() if (ec) return fail (ec, "request"); // legacy - door_.server().handler()->onRequest (session()); + handler_.onRequest (session()); } void diff --git a/src/ripple/server/impl/SSLHTTPPeer.h b/src/ripple/server/impl/SSLHTTPPeer.h index 86c56c040..07eeb6e9d 100644 --- a/src/ripple/server/impl/SSLHTTPPeer.h +++ b/src/ripple/server/impl/SSLHTTPPeer.h @@ -40,8 +40,9 @@ private: public: template - SSLHTTPPeer (Door& door, beast::Journal journal, endpoint_type remote_address, - ConstBufferSequence const& buffers, socket_type&& socket); + SSLHTTPPeer (Port const& port, Handler& handler, + beast::Journal journal, endpoint_type remote_address, + ConstBufferSequence const& buffers, socket_type&& socket); void run(); @@ -63,12 +64,12 @@ private: //------------------------------------------------------------------------------ template -SSLHTTPPeer::SSLHTTPPeer (Door& door, beast::Journal journal, - endpoint_type remote_address, ConstBufferSequence const& buffers, - socket_type&& socket) - : BaseHTTPPeer (door, socket.get_io_service(), journal, remote_address, buffers) +SSLHTTPPeer::SSLHTTPPeer (Port const& port, Handler& handler, + beast::Journal journal, endpoint_type remote_address, + ConstBufferSequence const& buffers, socket_type&& socket) + : BaseHTTPPeer (port, handler, socket.get_io_service(), journal, remote_address, buffers) , ssl_bundle_(std::make_unique( - port().context, std::move(socket))) + port.context, std::move(socket))) , stream_(ssl_bundle_->stream) { } @@ -77,7 +78,7 @@ SSLHTTPPeer::SSLHTTPPeer (Door& door, beast::Journal journal, void SSLHTTPPeer::run() { - door_.server().handler()->onAccept (session()); + handler_.onAccept (session()); if (! stream_.lowest_layer().is_open()) return; @@ -113,14 +114,14 @@ void SSLHTTPPeer::do_request() { ++request_count_; - auto const what = door_.server().handler()->onHandoff (session(), + auto const what = handler_.onHandoff (session(), std::move(ssl_bundle_), std::move(message_), remote_address_); if (what.moved) return; if (what.response) return write(what.response, what.keep_alive); // legacy - door_.server().handler()->onRequest (session()); + handler_.onRequest (session()); } void diff --git a/src/ripple/server/impl/ServerHandlerImp.cpp b/src/ripple/server/impl/ServerHandlerImp.cpp index 1f6d7fa8a..d4514cc81 100644 --- a/src/ripple/server/impl/ServerHandlerImp.cpp +++ b/src/ripple/server/impl/ServerHandlerImp.cpp @@ -48,7 +48,6 @@ namespace ripple { ServerHandler::ServerHandler (Stoppable& parent) : Stoppable ("ServerHandler", parent) - , Source ("server") { } @@ -449,14 +448,6 @@ ServerHandlerImp::authorized (Port const& port, //------------------------------------------------------------------------------ -void -ServerHandlerImp::onWrite (beast::PropertyStream::Map& map) -{ - m_server->onWrite (map); -} - -//------------------------------------------------------------------------------ - void ServerHandler::appendStandardFields (beast::http::message& message) { diff --git a/src/ripple/server/impl/ServerHandlerImp.h b/src/ripple/server/impl/ServerHandlerImp.h index fc43c54c8..8d43c2086 100644 --- a/src/ripple/server/impl/ServerHandlerImp.h +++ b/src/ripple/server/impl/ServerHandlerImp.h @@ -118,13 +118,6 @@ private: std::shared_ptr jobCoro, std::string forwardedFor, std::string user); - // - // PropertyStream - // - - void - onWrite (beast::PropertyStream::Map& map) override; - private: bool isWebsocketUpgrade (beast::http::message const& request); diff --git a/src/ripple/server/impl/ServerImpl.cpp b/src/ripple/server/impl/ServerImpl.cpp index 266cb4275..9a88c1e87 100644 --- a/src/ripple/server/impl/ServerImpl.cpp +++ b/src/ripple/server/impl/ServerImpl.cpp @@ -36,26 +36,20 @@ namespace ripple { ServerImpl::ServerImpl (Handler& handler, boost::asio::io_service& io_service, beast::Journal journal) - : handler_ (&handler) - , journal_ (journal) - , io_service_ (io_service) - , strand_ (io_service_) - , work_ (boost::in_place (std::ref(io_service))) - , hist_{} + : handler_(handler) + , j_(journal) + , io_service_(io_service) + , strand_(io_service_) + , work_(io_service_) { } ServerImpl::~ServerImpl() { - // Prevent call to handler - handler_ = nullptr; - close(); - { - // Block until all Doors are done accepting - std::unique_lock lock(mutex_); - while (accepting_ > 0) - cond_.wait(lock); - } + // Handler::onStopped will not be called + work_ = boost::none; + ios_.close(); + ios_.join(); } void @@ -63,53 +57,18 @@ ServerImpl::ports (std::vector const& ports) { if (closed()) Throw ("ports() on closed Server"); + ports_.reserve(ports.size()); for(auto const& port : ports) { if (! port.websockets()) { - ++accepting_; - list_.emplace_back(std::make_unique( - io_service_, *this, port)); - } - } - for(auto const& door : list_) - door->run(); -} - -void -ServerImpl::onWrite (beast::PropertyStream::Map& map) -{ - std::lock_guard lock (mutex_); - map ["active"] = list_.size(); - { - std::string s; - for (int i = 0; i <= high_; ++i) - { - if (i) - s += ", "; - s += std::to_string (hist_[i]); - } - map ["hist"] = s; - } - { - beast::PropertyStream::Set set ("history", map); - for (auto const& stat : stats_) - { - beast::PropertyStream::Map item (set); - - item ["id"] = stat.id; - + ports_.push_back(port); + if(auto sp = ios_.emplace(handler_, + io_service_, ports_.back(), j_)) { - std::stringstream ss; - ss << stat.elapsed; - item ["elapsed"] = ss.str(); + list_.push_back(sp); + sp->run(); } - - item ["requests"] = stat.requests; - item ["bytes_in"] = stat.bytes_in; - item ["bytes_out"] = stat.bytes_out; - if (stat.ec) - item ["error"] = stat.ec.message(); } } } @@ -117,91 +76,18 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map) void ServerImpl::close() { - Handler* h = nullptr; + ios_.close( + [&] { - std::lock_guard lock(mutex_); - if (work_) - { - work_ = boost::none; - // Close all Door objects - if (accepting_ == 0) - { - std::swap (h, handler_); - } - else - { - for(auto& door : list_) - door->close(); - } - } - } - if (h) - h->onStopped(*this); -} - -//-------------------------------------------------------------------------- - -void -ServerImpl::remove() -{ - Handler* h = nullptr; - { - std::lock_guard lock(mutex_); - if(--accepting_ == 0) - { - cond_.notify_all(); - std::swap (h, handler_); - } - } - if (h) - h->onStopped(*this); + work_ = boost::none; + handler_.onStopped(*this); + }); } bool ServerImpl::closed() { - std::lock_guard lock(mutex_); - return ! work_; -} - -void -ServerImpl::report (Stat&& stat) -{ - int const bucket = ceil_log2 (stat.requests); - std::lock_guard lock (mutex_); - ++hist_[bucket]; - high_ = std::max (high_, bucket); - if (stats_.size() >= historySize) - stats_.pop_back(); - stats_.emplace_front (std::move(stat)); -} - -//-------------------------------------------------------------------------- - -int -ServerImpl::ceil_log2 (unsigned long long x) -{ - static const unsigned long long t[6] = { - 0xFFFFFFFF00000000ull, - 0x00000000FFFF0000ull, - 0x000000000000FF00ull, - 0x00000000000000F0ull, - 0x000000000000000Cull, - 0x0000000000000002ull - }; - - int y = (((x & (x - 1)) == 0) ? 0 : 1); - int j = 32; - int i; - - for(i = 0; i < 6; i++) { - int k = (((x & t[i]) == 0) ? 0 : j); - y += k; - x >>= k; - j >>= 1; - } - - return y; + return ios_.closed(); } //-------------------------------------------------------------------------- @@ -214,4 +100,3 @@ make_Server (Handler& handler, } } // ripple - diff --git a/src/ripple/server/impl/ServerImpl.h b/src/ripple/server/impl/ServerImpl.h index c17ab4cbb..3799fee95 100644 --- a/src/ripple/server/impl/ServerImpl.h +++ b/src/ripple/server/impl/ServerImpl.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -40,25 +41,8 @@ namespace ripple { class BasicPeer; class Door; -struct Stat -{ - std::size_t id; - std::chrono::seconds elapsed; - int requests; - std::size_t bytes_in; - std::size_t bytes_out; - boost::system::error_code ec; -}; - class ServerImpl : public Server { -public: - struct Child - { - virtual ~Child() = default; - virtual void close() = 0; - }; - private: using clock_type = std::chrono::system_clock; @@ -69,20 +53,19 @@ private: using Doors = std::vector >; - Handler* handler_; - beast::Journal journal_; + Handler& handler_; + beast::Journal j_; boost::asio::io_service& io_service_; boost::asio::io_service::strand strand_; boost::optional work_; - std::mutex mutable mutex_; - std::condition_variable cond_; - std::vector> list_; - std::size_t accepting_ = 0; - std::deque stats_; + std::mutex m_; + std::vector ports_; + std::vector> list_; int high_ = 0; - std::array hist_; + + io_list ios_; public: ServerImpl (Handler& handler, @@ -93,40 +76,31 @@ public: beast::Journal journal() override { - return journal_; + return j_; } void ports (std::vector const& ports) override; - void - onWrite (beast::PropertyStream::Map& map) override; - void close() override; -public: - Handler* - handler() + io_list& + ios() { - return handler_; + return ios_; } +public: boost::asio::io_service& get_io_service() { return io_service_; } - void - remove(); - bool closed(); - void - report (Stat&& stat); - private: static int diff --git a/src/ripple/server/impl/io_list.h b/src/ripple/server/impl/io_list.h new file mode 100644 index 000000000..faff29ed9 --- /dev/null +++ b/src/ripple/server/impl/io_list.h @@ -0,0 +1,272 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_SERVER_IO_LIST_H_INCLUDED +#define RIPPLE_SERVER_IO_LIST_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +/** Manages a set of objects performing asynchronous I/O. */ +class io_list final +{ +public: + class work + { + template + void destroy(); + + friend class io_list; + io_list* ios_ = nullptr; + + public: + virtual ~work() + { + destroy(); + } + + /** Return the io_list associated with the work. + + Requirements: + The call to io_list::emplace to + create the work has already returned. + */ + io_list& + ios() + { + return *ios_; + } + + virtual void close() = 0; + }; + +private: + template + void destroy(); + + std::mutex m_; + std::size_t n_ = 0; + bool closed_ = false; + std::condition_variable cv_; + boost::container::flat_map> map_; + std::function f_; + +public: + io_list() = default; + + /** Destroy the list. + + Effects: + Closes the io_list if it was not previously + closed. No finisher is invoked in this case. + + Blocks until all work is destroyed. + */ + ~io_list() + { + destroy(); + } + + /** Return `true` if the list is closed. + + Thread Safety: + Undefined result if called concurrently + with close(). + */ + bool + closed() const + { + return closed_; + } + + /** Create associated work if not closed. + + Requirements: + `std::is_base_of_v == true` + + Thread Safety: + May be called concurrently. + + Effects: + Atomically creates, inserts, and returns new + work T, or returns nullptr if the io_list is + closed, + + If the call succeeds and returns a new object, + it is guaranteed that a subsequent call to close + will invoke work::close on the object. + + */ + template + std::shared_ptr + emplace(Args&&... args); + + /** Cancel active I/O. + + Thread Safety: + May not be called concurrently. + + Effects: + Associated work is closed. + + Finisher if provided, will be called when + all associated work is destroyed. The finisher + may be called from a foreign thread, or within + the call to this function. + + Only the first call to close will set the + finisher. + + No effect after the first call. + */ + template + void + close(Finisher&& f); + + void + close() + { + close([]{}); + } + + /** Block until the io_list stops. + + Effects: + The caller is blocked until the io_list is + closed and all associated work is destroyed. + + Thread safety: + May be called concurrently. + + Preconditions: + No call to io_service::run on any io_service + used by work objects associated with this io_list + exists in the caller's call stack. + */ + template + void + join(); +}; + +//------------------------------------------------------------------------------ + +template +void +io_list::work::destroy() +{ + if(! ios_) + return; + std::function f; + { + std::lock_guard< + std::mutex> lock(ios_->m_); + ios_->map_.erase(this); + if(--ios_->n_ == 0 && + ios_->closed_) + { + std::swap(f, ios_->f_); + ios_->cv_.notify_all(); + } + } + if(f) + f(); +} + +template +void +io_list::destroy() +{ + close(); + join(); +} + +template +std::shared_ptr +io_list::emplace(Args&&... args) +{ + static_assert(std::is_base_of::value, + "T must derive from io_list::work"); + if(closed_) + return nullptr; + auto sp = std::make_shared( + std::forward(args)...); + decltype(sp) dead; + + std::lock_guard lock(m_); + if(! closed_) + { + ++n_; + sp->work::ios_ = this; + map_.emplace(sp.get(), sp); + } + else + { + std::swap(sp, dead); + } + return sp; +} + +template +void +io_list::close(Finisher&& f) +{ + std::unique_lock lock(m_); + if(closed_) + return; + closed_ = true; + auto map = std::move(map_); + if(! map.empty()) + { + f_ = std::forward(f); + lock.unlock(); + for(auto const& p : map) + if(auto sp = p.second.lock()) + sp->close(); + } + else + { + lock.unlock(); + f(); + } +} + +template +void +io_list::join() +{ + std::unique_lock lock(m_); + cv_.wait(lock, + [&] + { + return closed_ && n_ == 0; + }); +} + +} // ripple + +#endif diff --git a/src/ripple/server/tests/Server_test.cpp b/src/ripple/server/tests/Server_test.cpp index 6469db070..10375337e 100644 --- a/src/ripple/server/tests/Server_test.cpp +++ b/src/ripple/server/tests/Server_test.cpp @@ -361,7 +361,7 @@ public: }; NullHandler h; - for(int i = 0; i < 10000; ++i) + for(int i = 0; i < 1000; ++i) { TestThread thread; auto s = make_Server(h,