Fix Server race conditions:

Class io_list manages children that perform asynchronous
I/O operations. The handling of close and destruction is
refactored to fix race conditions during exit.
Vinnie Falco
2016-02-08 10:19:25 -05:00
parent 137dd351b8
commit 0f7dbc7bc0
17 changed files with 434 additions and 400 deletions
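
For orientation, here is a minimal sketch of the ownership pattern this commit introduces: children are created through io_list::emplace, asked to cancel their I/O with io_list::close, and waited on with io_list::join. The io_list header itself is added by this commit and appears in full further down; the echo_child class, its socket member, and the example namespace are hypothetical and exist only for illustration.

// Hypothetical child type; only io_list and io_list::work come from this commit.
#include <ripple/server/impl/io_list.h>

#include <boost/asio.hpp>
#include <iostream>
#include <memory>

namespace example {

class echo_child : public ripple::io_list::work
{
    boost::asio::ip::tcp::socket socket_;

public:
    explicit echo_child(boost::asio::io_service& io_service)
        : socket_(io_service)
    {
    }

    // Called by io_list::close(): cancel pending I/O so the completion
    // handlers release their shared_ptr and the object can be destroyed.
    void close() override
    {
        boost::system::error_code ec;
        socket_.close(ec);
    }
};

} // example

int main()
{
    boost::asio::io_service io_service;
    ripple::io_list children;

    // emplace() atomically creates and registers the child, or returns
    // nullptr if close() has already been called on the list.
    if (auto sp = children.emplace<example::echo_child>(io_service))
    {
        // start asynchronous operations on *sp here
    }

    children.close(); // asks any children still alive to cancel their I/O
    children.join();  // blocks until the list is closed and every child is gone
    std::cout << "all children destroyed\n";
}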

View File

@@ -3313,6 +3313,8 @@
</ClCompile> </ClCompile>
<ClInclude Include="..\..\src\ripple\server\impl\Door.h"> <ClInclude Include="..\..\src\ripple\server\impl\Door.h">
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\server\impl\io_list.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\server\impl\JSONRPCUtil.cpp"> <ClCompile Include="..\..\src\ripple\server\impl\JSONRPCUtil.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug|x64'">True</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='release|x64'">True</ExcludedFromBuild>

View File

@@ -3801,6 +3801,9 @@
<ClInclude Include="..\..\src\ripple\server\impl\Door.h"> <ClInclude Include="..\..\src\ripple\server\impl\Door.h">
<Filter>ripple\server\impl</Filter> <Filter>ripple\server\impl</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\server\impl\io_list.h">
<Filter>ripple\server\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\server\impl\JSONRPCUtil.cpp"> <ClCompile Include="..\..\src\ripple\server\impl\JSONRPCUtil.cpp">
<Filter>ripple\server\impl</Filter> <Filter>ripple\server\impl</Filter>
</ClCompile> </ClCompile>

View File

@@ -520,7 +520,6 @@ public:
m_nodeStoreScheduler.setJobQueue (*m_jobQueue); m_nodeStoreScheduler.setJobQueue (*m_jobQueue);
add (m_ledgerMaster->getPropertySource ()); add (m_ledgerMaster->getPropertySource ());
add (*serverHandler_);
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------

View File

@@ -49,10 +49,6 @@ public:
void void
ports (std::vector<Port> const& v) = 0; ports (std::vector<Port> const& v) = 0;
virtual
void
onWrite (beast::PropertyStream::Map& map) = 0;
/** Close the server. /** Close the server.
The close is performed asynchronously. The handler will be notified The close is performed asynchronously. The handler will be notified
when the server has stopped. The server is considered stopped when when the server has stopped. The server is considered stopped when

View File

@@ -25,16 +25,13 @@
#include <ripple/server/Port.h> #include <ripple/server/Port.h>
#include <ripple/overlay/Overlay.h> #include <ripple/overlay/Overlay.h>
#include <beast/utility/Journal.h> #include <beast/utility/Journal.h>
#include <beast/utility/PropertyStream.h>
#include <boost/asio/ip/address.hpp> #include <boost/asio/ip/address.hpp>
#include <memory> #include <memory>
#include <vector> #include <vector>
namespace ripple { namespace ripple {
class ServerHandler class ServerHandler : public beast::Stoppable
: public beast::Stoppable
, public beast::PropertyStream::Source
{ {
protected: protected:
ServerHandler (Stoppable& parent); ServerHandler (Stoppable& parent);

View File

@@ -20,8 +20,9 @@
#ifndef RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED #ifndef RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED #define RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED
#include <ripple/server/impl/Door.h>
#include <ripple/server/Session.h> #include <ripple/server/Session.h>
#include <ripple/server/impl/Door.h>
#include <ripple/server/impl/io_list.h>
#include <ripple/server/impl/ServerImpl.h> #include <ripple/server/impl/ServerImpl.h>
#include <beast/asio/IPAddressConversion.h> #include <beast/asio/IPAddressConversion.h>
#include <beast/asio/placeholders.h> #include <beast/asio/placeholders.h>
@@ -46,7 +47,7 @@ namespace ripple {
/** Represents an active connection. */ /** Represents an active connection. */
template <class Impl> template <class Impl>
class BaseHTTPPeer class BaseHTTPPeer
: public Door::Child : public io_list::work
, public Session , public Session
{ {
protected: protected:
@@ -81,6 +82,8 @@ protected:
std::size_t used; std::size_t used;
}; };
Port const& port_;
Handler& handler_;
boost::asio::io_service::work work_; boost::asio::io_service::work work_;
boost::asio::io_service::strand strand_; boost::asio::io_service::strand strand_;
waitable_timer timer_; waitable_timer timer_;
@@ -108,9 +111,9 @@ protected:
public: public:
template <class ConstBufferSequence> template <class ConstBufferSequence>
BaseHTTPPeer (Door& door, boost::asio::io_service& io_service, BaseHTTPPeer (Port const& port, Handler& handler,
beast::Journal journal, endpoint_type remote_address, boost::asio::io_service& io_service, beast::Journal journal,
ConstBufferSequence const& buffers); endpoint_type remote_address, ConstBufferSequence const& buffers);
virtual virtual
~BaseHTTPPeer(); ~BaseHTTPPeer();
@@ -165,13 +168,13 @@ protected:
beast::Journal beast::Journal
journal() override journal() override
{ {
return door_.server().journal(); return journal_;
} }
Port const& Port const&
port() override port() override
{ {
return door_.port(); return port_;
} }
beast::IP::Endpoint beast::IP::Endpoint
@@ -213,10 +216,12 @@ protected:
template <class Impl> template <class Impl>
template <class ConstBufferSequence> template <class ConstBufferSequence>
BaseHTTPPeer<Impl>::BaseHTTPPeer (Door& door, boost::asio::io_service& io_service, BaseHTTPPeer<Impl>::BaseHTTPPeer (Port const& port, Handler& handler,
beast::Journal journal, endpoint_type remote_address, boost::asio::io_service& io_service, beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers) ConstBufferSequence const& buffers)
: Child(door) : port_(port)
, handler_(handler)
, work_ (io_service) , work_ (io_service)
, strand_ (io_service) , strand_ (io_service)
, timer_ (io_service) , timer_ (io_service)
@@ -236,17 +241,7 @@ BaseHTTPPeer<Impl>::BaseHTTPPeer (Door& door, boost::asio::io_service& io_servic
template <class Impl> template <class Impl>
BaseHTTPPeer<Impl>::~BaseHTTPPeer() BaseHTTPPeer<Impl>::~BaseHTTPPeer()
{ {
Stat stat; handler_.onClose(session(), ec_);
stat.id = nid_;
stat.elapsed = std::chrono::duration_cast <
std::chrono::seconds> (clock_type::now() - when_);
stat.requests = request_count_;
stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_);
door_.server().report (std::move (stat));
if(door_.server().handler())
door_.server().handler()->onClose(session(), ec_);
if (journal_.trace) journal_.trace << id_ << if (journal_.trace) journal_.trace << id_ <<
"destroyed: " << request_count_ << "destroyed: " << request_count_ <<
((request_count_ == 1) ? " request" : " requests"); ((request_count_ == 1) ? " request" : " requests");

View File

@@ -19,6 +19,7 @@
#include <BeastConfig.h> #include <BeastConfig.h>
#include <ripple/basics/contract.h> #include <ripple/basics/contract.h>
#include <ripple/basics/Log.h>
#include <ripple/server/impl/Door.h> #include <ripple/server/impl/Door.h>
#include <ripple/server/impl/PlainHTTPPeer.h> #include <ripple/server/impl/PlainHTTPPeer.h>
#include <ripple/server/impl/SSLHTTPPeer.h> #include <ripple/server/impl/SSLHTTPPeer.h>
@@ -35,7 +36,7 @@ namespace ripple {
read from the stream until there is enough to determine a result. read from the stream until there is enough to determine a result.
No bytes are discarded from buf. Any additional bytes read are retained. No bytes are discarded from buf. Any additional bytes read are retained.
buf must provide an interface compatible with boost::asio::streambuf buf must provide an interface compatible with boost::asio::streambuf
http://boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html http://boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html
See See
http://www.ietf.org/rfc/rfc2246.txt http://www.ietf.org/rfc/rfc2246.txt
Section 7.4. Handshake protocol Section 7.4. Handshake protocol
@@ -81,41 +82,33 @@ detect_ssl (Socket& socket, StreamBuf& buf, Yield yield)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Door::Child::Child(Door& door) Door::Detector::Detector(Port const& port,
: door_(door) Handler& handler, socket_type&& socket,
{ endpoint_type remote_address, beast::Journal j)
} : port_(port)
, handler_(handler)
Door::Child::~Child()
{
door_.remove(*this);
}
//------------------------------------------------------------------------------
Door::detector::detector (Door& door, socket_type&& socket,
endpoint_type remote_address)
: Child(door)
, socket_(std::move(socket)) , socket_(std::move(socket))
, timer_(socket_.get_io_service()) , timer_(socket_.get_io_service())
, remote_address_(remote_address) , remote_address_(remote_address)
, strand_(socket_.get_io_service())
, j_(j)
{ {
} }
void void
Door::detector::run() Door::Detector::run()
{ {
// do_detect must be called before do_timer or else // do_detect must be called before do_timer or else
// the timer can be canceled before it gets set. // the timer can be canceled before it gets set.
boost::asio::spawn (door_.strand_, std::bind (&detector::do_detect, boost::asio::spawn (strand_, std::bind (&Detector::do_detect,
shared_from_this(), std::placeholders::_1)); shared_from_this(), std::placeholders::_1));
boost::asio::spawn (door_.strand_, std::bind (&detector::do_timer, boost::asio::spawn (strand_, std::bind (&Detector::do_timer,
shared_from_this(), std::placeholders::_1)); shared_from_this(), std::placeholders::_1));
} }
void void
Door::detector::close() Door::Detector::close()
{ {
error_code ec; error_code ec;
socket_.close(ec); socket_.close(ec);
@@ -123,7 +116,7 @@ Door::detector::close()
} }
void void
Door::detector::do_timer (yield_context yield) Door::Detector::do_timer (yield_context yield)
{ {
error_code ec; // ignored error_code ec; // ignored
while (socket_.is_open()) while (socket_.is_open())
@@ -135,7 +128,7 @@ Door::detector::do_timer (yield_context yield)
} }
void void
Door::detector::do_detect (boost::asio::yield_context yield) Door::Detector::do_detect (boost::asio::yield_context yield)
{ {
bool ssl; bool ssl;
error_code ec; error_code ec;
@@ -145,29 +138,45 @@ Door::detector::do_detect (boost::asio::yield_context yield)
error_code unused; error_code unused;
timer_.cancel(unused); timer_.cancel(unused);
if (! ec) if (! ec)
return door_.create(ssl, buf.data(), {
std::move(socket_), remote_address_); if (ssl)
{
if(auto sp = ios().emplace<SSLHTTPPeer>(port_, handler_,
j_, remote_address_, buf.data(),
std::move(socket_)))
sp->run();
return;
}
if(auto sp = ios().emplace<PlainHTTPPeer>(port_, handler_,
j_, remote_address_, buf.data(),
std::move(socket_)))
sp->run();
return;
}
if (ec != boost::asio::error::operation_aborted) if (ec != boost::asio::error::operation_aborted)
if (door_.server_.journal().trace) door_.server_.journal().trace << {
JLOG(j_.trace) <<
"Error detecting ssl: " << ec.message() << "Error detecting ssl: " << ec.message() <<
" from " << remote_address_; " from " << remote_address_;
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Door::Door (boost::asio::io_service& io_service, Door::Door (Handler& handler, boost::asio::io_service& io_service,
ServerImpl& server, Port const& port) Port const& port, beast::Journal j)
: port_(std::make_shared<Port>(port)) : j_(j)
, server_(server) , port_(port)
, handler_(handler)
, acceptor_(io_service) , acceptor_(io_service)
, strand_(io_service) , strand_(io_service)
, ssl_ ( , ssl_ (
port_->protocol.count("https") > 0 || port_.protocol.count("https") > 0 ||
//port_->protocol.count("wss") > 0 || //port_.protocol.count("wss") > 0 ||
port_->protocol.count("peer") > 0) port_.protocol.count("peer") > 0)
, plain_ ( , plain_ (
//port_->protocol.count("ws") > 0 || //port_.protocol.count("ws") > 0 ||
port_->protocol.count("http") > 0) port_.protocol.count("http") > 0)
{ {
error_code ec; error_code ec;
endpoint_type const local_address = endpoint_type const local_address =
@@ -176,7 +185,7 @@ Door::Door (boost::asio::io_service& io_service,
acceptor_.open(local_address.protocol(), ec); acceptor_.open(local_address.protocol(), ec);
if (ec) if (ec)
{ {
if (server_.journal().error) server_.journal().error << JLOG(j_.error) <<
"Open port '" << port.name << "' failed:" << ec.message(); "Open port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> (); Throw<std::exception> ();
} }
@@ -185,7 +194,7 @@ Door::Door (boost::asio::io_service& io_service,
boost::asio::ip::tcp::acceptor::reuse_address(true), ec); boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec) if (ec)
{ {
if (server_.journal().error) server_.journal().error << JLOG(j_.error) <<
"Option for port '" << port.name << "' failed:" << ec.message(); "Option for port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> (); Throw<std::exception> ();
} }
@@ -193,7 +202,7 @@ Door::Door (boost::asio::io_service& io_service,
acceptor_.bind(local_address, ec); acceptor_.bind(local_address, ec);
if (ec) if (ec)
{ {
if (server_.journal().error) server_.journal().error << JLOG(j_.error) <<
"Bind port '" << port.name << "' failed:" << ec.message(); "Bind port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> (); Throw<std::exception> ();
} }
@@ -201,30 +210,20 @@ Door::Door (boost::asio::io_service& io_service,
acceptor_.listen(boost::asio::socket_base::max_connections, ec); acceptor_.listen(boost::asio::socket_base::max_connections, ec);
if (ec) if (ec)
{ {
if (server_.journal().error) server_.journal().error << JLOG(j_.error) <<
"Listen on port '" << port.name << "' failed:" << ec.message(); "Listen on port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> (); Throw<std::exception> ();
} }
if (server_.journal().info) server_.journal().info << JLOG(j_.info) <<
"Opened " << port; "Opened " << port;
} }
Door::~Door()
{
{
// Block until all detector, Peer objects destroyed
std::unique_lock<std::mutex> lock(mutex_);
while (! list_.empty())
cond_.wait(lock);
}
}
void void
Door::run() Door::run()
{ {
boost::asio::spawn (strand_, std::bind (&Door::do_accept, boost::asio::spawn (strand_, std::bind(&Door::do_accept,
this, std::placeholders::_1)); shared_from_this(), std::placeholders::_1));
} }
void void
@@ -232,70 +231,30 @@ Door::close()
{ {
if (! strand_.running_in_this_thread()) if (! strand_.running_in_this_thread())
return strand_.post(std::bind( return strand_.post(std::bind(
&Door::close, this)); &Door::close, shared_from_this()));
error_code ec; error_code ec;
acceptor_.close(ec); acceptor_.close(ec);
// Close all detector, Peer objects
std::vector<std::shared_ptr<Child>> v;
{
std::lock_guard<std::mutex> lock(mutex_);
for(auto& p : list_)
{
auto const peer = p.second.lock();
if (peer != nullptr)
{
peer->close();
// Must destroy shared_ptr outside the
// lock otherwise deadlock from the
// managed object's destructor.
v.emplace_back(std::move(peer));
}
}
}
}
void
Door::remove (Child& c)
{
std::lock_guard<std::mutex> lock(mutex_);
auto const n = list_.erase(&c);
if(n != 1)
Throw<std::runtime_error>("missing child");
if (list_.empty())
cond_.notify_all();
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void
Door::add (std::shared_ptr<Child> const& child)
{
std::lock_guard<std::mutex> lock(mutex_);
list_.emplace(child.get(), child);
}
template <class ConstBufferSequence> template <class ConstBufferSequence>
void void
Door::create (bool ssl, ConstBufferSequence const& buffers, Door::create (bool ssl, ConstBufferSequence const& buffers,
socket_type&& socket, endpoint_type remote_address) socket_type&& socket, endpoint_type remote_address)
{ {
if (server_.closed())
return;
if (ssl) if (ssl)
{ {
auto const peer = std::make_shared <SSLHTTPPeer> (*this, if(auto sp = ios().emplace<SSLHTTPPeer>(port_, handler_,
server_.journal(), remote_address, buffers, j_, remote_address, buffers,
std::move(socket)); std::move(socket)))
add(peer); sp->run();
return peer->run(); return;
} }
if(auto sp = ios().emplace<PlainHTTPPeer>(port_, handler_,
auto const peer = std::make_shared <PlainHTTPPeer> (*this, j_, remote_address, buffers,
server_.journal(), remote_address, buffers, std::move(socket)))
std::move(socket)); sp->run();
add(peer);
peer->run();
} }
void void
@@ -308,19 +267,19 @@ Door::do_accept (boost::asio::yield_context yield)
socket_type socket (acceptor_.get_io_service()); socket_type socket (acceptor_.get_io_service());
acceptor_.async_accept (socket, remote_address, yield[ec]); acceptor_.async_accept (socket, remote_address, yield[ec]);
if (ec && ec != boost::asio::error::operation_aborted) if (ec && ec != boost::asio::error::operation_aborted)
if (server_.journal().error) server_.journal().error << if (j_.error) j_.error <<
"accept: " << ec.message(); "accept: " << ec.message();
if (ec == boost::asio::error::operation_aborted || server_.closed()) if (ec == boost::asio::error::operation_aborted)
break; break;
if (ec) if (ec)
continue; continue;
if (ssl_ && plain_) if (ssl_ && plain_)
{ {
auto const c = std::make_shared <detector> ( if(auto sp = ios().emplace<Detector>(port_,
*this, std::move(socket), remote_address); handler_, std::move(socket), remote_address,
add(c); j_))
c->run(); sp->run();
} }
else if (ssl_ || plain_) else if (ssl_ || plain_)
{ {
@@ -328,7 +287,6 @@ Door::do_accept (boost::asio::yield_context yield)
std::move(socket), remote_address); std::move(socket), remote_address);
} }
} }
server_.remove();
} }
} // ripple } // ripple
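
A hedged aside on the detect_ssl comment near the top of this file: the sketch below is not the detect_ssl defined in Door.cpp, only an illustration of the idea it references. Per RFC 2246 section 7.4, a TLS connection opens with a handshake record whose content-type byte is 22 (0x16), whereas a plain HTTP request starts with an ASCII method, so peeking at the first buffered byte is enough to route the socket to SSLHTTPPeer or PlainHTTPPeer.

// Illustration only; not code from this commit.
#include <cstddef>
#include <cstdint>

// Returns true once a decision can be made from the buffered bytes and sets
// is_ssl accordingly; returns false if more input is needed.
inline bool
maybe_detect_ssl(std::uint8_t const* data, std::size_t size, bool& is_ssl)
{
    if (size < 1)
        return false;            // not enough data yet
    is_ssl = (data[0] == 0x16);  // 22 == TLS handshake content type
    return true;
}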

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_SERVER_DOOR_H_INCLUDED #ifndef RIPPLE_SERVER_DOOR_H_INCLUDED
#define RIPPLE_SERVER_DOOR_H_INCLUDED #define RIPPLE_SERVER_DOOR_H_INCLUDED
#include <ripple/server/impl/io_list.h>
#include <ripple/server/impl/ServerImpl.h> #include <ripple/server/impl/ServerImpl.h>
#include <beast/asio/streambuf.h> #include <beast/asio/streambuf.h>
#include <boost/asio/basic_waitable_timer.hpp> #include <boost/asio/basic_waitable_timer.hpp>
@@ -36,20 +37,9 @@ namespace ripple {
/** A listening socket. */ /** A listening socket. */
class Door class Door
: public ServerImpl::Child : public io_list::work
, public std::enable_shared_from_this<Door>
{ {
public:
class Child
{
protected:
Door& door_;
public:
Child (Door& door);
virtual ~Child();
virtual void close() = 0;
};
private: private:
using clock_type = std::chrono::steady_clock; using clock_type = std::chrono::steady_clock;
using timer_type = boost::asio::basic_waitable_timer<clock_type>; using timer_type = boost::asio::basic_waitable_timer<clock_type>;
@@ -61,18 +51,23 @@ private:
using socket_type = protocol_type::socket; using socket_type = protocol_type::socket;
// Detects SSL on a socket // Detects SSL on a socket
class detector class Detector
: public Child : public io_list::work
, public std::enable_shared_from_this <detector> , public std::enable_shared_from_this<Detector>
{ {
private: private:
Port const& port_;
Handler& handler_;
socket_type socket_; socket_type socket_;
timer_type timer_; timer_type timer_;
endpoint_type remote_address_; endpoint_type remote_address_;
boost::asio::io_service::strand strand_;
beast::Journal j_;
public: public:
detector (Door& door, socket_type&& socket, Detector (Port const& port, Handler& handler,
endpoint_type remote_address); socket_type&& socket, endpoint_type remote_address,
beast::Journal j);
void run(); void run();
void close() override; void close() override;
@@ -81,39 +76,17 @@ private:
void do_detect (yield_context yield); void do_detect (yield_context yield);
}; };
std::shared_ptr<Port> port_; beast::Journal j_;
ServerImpl& server_; Port const& port_;
Handler& handler_;
acceptor_type acceptor_; acceptor_type acceptor_;
boost::asio::io_service::strand strand_; boost::asio::io_service::strand strand_;
std::mutex mutex_;
std::condition_variable cond_;
boost::container::flat_map<
Child*, std::weak_ptr<Child>> list_;
bool ssl_; bool ssl_;
bool plain_; bool plain_;
public: public:
Door (boost::asio::io_service& io_service, Door(Handler& handler, boost::asio::io_service& io_service,
ServerImpl& server, Port const& port); Port const& port, beast::Journal j);
/** Destroy the door.
Blocks until there are no pending I/O completion
handlers, and all connections have been destroyed.
close() must be called before the destructor.
*/
~Door();
ServerImpl&
server()
{
return server_;
}
Port const&
port() const
{
return *port_;
}
// Work-around because we can't call shared_from_this in ctor // Work-around because we can't call shared_from_this in ctor
void run(); void run();
@@ -126,11 +99,7 @@ public:
*/ */
void close(); void close();
void remove (Child& c);
private: private:
void add (std::shared_ptr<Child> const& child);
template <class ConstBufferSequence> template <class ConstBufferSequence>
void create (bool ssl, ConstBufferSequence const& buffers, void create (bool ssl, ConstBufferSequence const& buffers,
socket_type&& socket, endpoint_type remote_address); socket_type&& socket, endpoint_type remote_address);

View File

@@ -28,8 +28,6 @@
namespace ripple { namespace ripple {
unsigned int const gMaxHTTPHeaderSize = 0x02000000;
std::string getHTTPHeaderTimestamp () std::string getHTTPHeaderTimestamp ()
{ {
// CHECKME This is probably called often enough that optimizing it makes // CHECKME This is probably called often enough that optimizing it makes

View File

@@ -37,8 +37,9 @@ private:
public: public:
template <class ConstBufferSequence> template <class ConstBufferSequence>
PlainHTTPPeer (Door& door, beast::Journal journal, endpoint_type endpoint, PlainHTTPPeer (Port const& port, Handler& handler,
ConstBufferSequence const& buffers, socket_type&& socket); beast::Journal journal, endpoint_type remote_address,
ConstBufferSequence const& buffers, socket_type&& socket);
void void
run(); run();
@@ -54,10 +55,10 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template <class ConstBufferSequence> template <class ConstBufferSequence>
PlainHTTPPeer::PlainHTTPPeer (Door& door, beast::Journal journal, PlainHTTPPeer::PlainHTTPPeer (Port const& port, Handler& handler,
endpoint_type remote_address, ConstBufferSequence const& buffers, beast::Journal journal, endpoint_type remote_address,
socket_type&& socket) ConstBufferSequence const& buffers, socket_type&& socket)
: BaseHTTPPeer (door, socket.get_io_service(), journal, remote_address, buffers) : BaseHTTPPeer(port, handler, socket.get_io_service(), journal, remote_address, buffers)
, stream_(std::move(socket)) , stream_(std::move(socket))
{ {
} }
@@ -65,7 +66,7 @@ PlainHTTPPeer::PlainHTTPPeer (Door& door, beast::Journal journal,
void void
PlainHTTPPeer::run () PlainHTTPPeer::run ()
{ {
door_.server().handler()->onAccept (session()); handler_.onAccept (session());
if (! stream_.is_open()) if (! stream_.is_open())
return; return;
@@ -77,7 +78,7 @@ void
PlainHTTPPeer::do_request() PlainHTTPPeer::do_request()
{ {
++request_count_; ++request_count_;
auto const what = door_.server().handler()->onHandoff (session(), auto const what = handler_.onHandoff (session(),
std::move(stream_), std::move(message_), remote_address_); std::move(stream_), std::move(message_), remote_address_);
if (what.moved) if (what.moved)
return; return;
@@ -98,7 +99,7 @@ PlainHTTPPeer::do_request()
if (ec) if (ec)
return fail (ec, "request"); return fail (ec, "request");
// legacy // legacy
door_.server().handler()->onRequest (session()); handler_.onRequest (session());
} }
void void

View File

@@ -40,8 +40,9 @@ private:
public: public:
template <class ConstBufferSequence> template <class ConstBufferSequence>
SSLHTTPPeer (Door& door, beast::Journal journal, endpoint_type remote_address, SSLHTTPPeer (Port const& port, Handler& handler,
ConstBufferSequence const& buffers, socket_type&& socket); beast::Journal journal, endpoint_type remote_address,
ConstBufferSequence const& buffers, socket_type&& socket);
void void
run(); run();
@@ -63,12 +64,12 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template <class ConstBufferSequence> template <class ConstBufferSequence>
SSLHTTPPeer::SSLHTTPPeer (Door& door, beast::Journal journal, SSLHTTPPeer::SSLHTTPPeer (Port const& port, Handler& handler,
endpoint_type remote_address, ConstBufferSequence const& buffers, beast::Journal journal, endpoint_type remote_address,
socket_type&& socket) ConstBufferSequence const& buffers, socket_type&& socket)
: BaseHTTPPeer (door, socket.get_io_service(), journal, remote_address, buffers) : BaseHTTPPeer (port, handler, socket.get_io_service(), journal, remote_address, buffers)
, ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>( , ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>(
port().context, std::move(socket))) port.context, std::move(socket)))
, stream_(ssl_bundle_->stream) , stream_(ssl_bundle_->stream)
{ {
} }
@@ -77,7 +78,7 @@ SSLHTTPPeer::SSLHTTPPeer (Door& door, beast::Journal journal,
void void
SSLHTTPPeer::run() SSLHTTPPeer::run()
{ {
door_.server().handler()->onAccept (session()); handler_.onAccept (session());
if (! stream_.lowest_layer().is_open()) if (! stream_.lowest_layer().is_open())
return; return;
@@ -113,14 +114,14 @@ void
SSLHTTPPeer::do_request() SSLHTTPPeer::do_request()
{ {
++request_count_; ++request_count_;
auto const what = door_.server().handler()->onHandoff (session(), auto const what = handler_.onHandoff (session(),
std::move(ssl_bundle_), std::move(message_), remote_address_); std::move(ssl_bundle_), std::move(message_), remote_address_);
if (what.moved) if (what.moved)
return; return;
if (what.response) if (what.response)
return write(what.response, what.keep_alive); return write(what.response, what.keep_alive);
// legacy // legacy
door_.server().handler()->onRequest (session()); handler_.onRequest (session());
} }
void void

View File

@@ -48,7 +48,6 @@ namespace ripple {
ServerHandler::ServerHandler (Stoppable& parent) ServerHandler::ServerHandler (Stoppable& parent)
: Stoppable ("ServerHandler", parent) : Stoppable ("ServerHandler", parent)
, Source ("server")
{ {
} }
@@ -449,14 +448,6 @@ ServerHandlerImp::authorized (Port const& port,
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void
ServerHandlerImp::onWrite (beast::PropertyStream::Map& map)
{
m_server->onWrite (map);
}
//------------------------------------------------------------------------------
void void
ServerHandler::appendStandardFields (beast::http::message& message) ServerHandler::appendStandardFields (beast::http::message& message)
{ {

View File

@@ -118,13 +118,6 @@ private:
std::shared_ptr<JobCoro> jobCoro, std::shared_ptr<JobCoro> jobCoro,
std::string forwardedFor, std::string user); std::string forwardedFor, std::string user);
//
// PropertyStream
//
void
onWrite (beast::PropertyStream::Map& map) override;
private: private:
bool bool
isWebsocketUpgrade (beast::http::message const& request); isWebsocketUpgrade (beast::http::message const& request);

View File

@@ -36,26 +36,20 @@ namespace ripple {
ServerImpl::ServerImpl (Handler& handler, ServerImpl::ServerImpl (Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal) boost::asio::io_service& io_service, beast::Journal journal)
: handler_ (&handler) : handler_(handler)
, journal_ (journal) , j_(journal)
, io_service_ (io_service) , io_service_(io_service)
, strand_ (io_service_) , strand_(io_service_)
, work_ (boost::in_place (std::ref(io_service))) , work_(io_service_)
, hist_{}
{ {
} }
ServerImpl::~ServerImpl() ServerImpl::~ServerImpl()
{ {
// Prevent call to handler // Handler::onStopped will not be called
handler_ = nullptr; work_ = boost::none;
close(); ios_.close();
{ ios_.join();
// Block until all Doors are done accepting
std::unique_lock<std::mutex> lock(mutex_);
while (accepting_ > 0)
cond_.wait(lock);
}
} }
void void
@@ -63,53 +57,18 @@ ServerImpl::ports (std::vector<Port> const& ports)
{ {
if (closed()) if (closed())
Throw<std::logic_error> ("ports() on closed Server"); Throw<std::logic_error> ("ports() on closed Server");
ports_.reserve(ports.size());
for(auto const& port : ports) for(auto const& port : ports)
{ {
if (! port.websockets()) if (! port.websockets())
{ {
++accepting_; ports_.push_back(port);
list_.emplace_back(std::make_unique<Door>( if(auto sp = ios_.emplace<Door>(handler_,
io_service_, *this, port)); io_service_, ports_.back(), j_))
}
}
for(auto const& door : list_)
door->run();
}
void
ServerImpl::onWrite (beast::PropertyStream::Map& map)
{
std::lock_guard <std::mutex> lock (mutex_);
map ["active"] = list_.size();
{
std::string s;
for (int i = 0; i <= high_; ++i)
{
if (i)
s += ", ";
s += std::to_string (hist_[i]);
}
map ["hist"] = s;
}
{
beast::PropertyStream::Set set ("history", map);
for (auto const& stat : stats_)
{
beast::PropertyStream::Map item (set);
item ["id"] = stat.id;
{ {
std::stringstream ss; list_.push_back(sp);
ss << stat.elapsed; sp->run();
item ["elapsed"] = ss.str();
} }
item ["requests"] = stat.requests;
item ["bytes_in"] = stat.bytes_in;
item ["bytes_out"] = stat.bytes_out;
if (stat.ec)
item ["error"] = stat.ec.message();
} }
} }
} }
@@ -117,91 +76,18 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map)
void void
ServerImpl::close() ServerImpl::close()
{ {
Handler* h = nullptr; ios_.close(
[&]
{ {
std::lock_guard<std::mutex> lock(mutex_); work_ = boost::none;
if (work_) handler_.onStopped(*this);
{ });
work_ = boost::none;
// Close all Door objects
if (accepting_ == 0)
{
std::swap (h, handler_);
}
else
{
for(auto& door : list_)
door->close();
}
}
}
if (h)
h->onStopped(*this);
}
//--------------------------------------------------------------------------
void
ServerImpl::remove()
{
Handler* h = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
if(--accepting_ == 0)
{
cond_.notify_all();
std::swap (h, handler_);
}
}
if (h)
h->onStopped(*this);
} }
bool bool
ServerImpl::closed() ServerImpl::closed()
{ {
std::lock_guard<std::mutex> lock(mutex_); return ios_.closed();
return ! work_;
}
void
ServerImpl::report (Stat&& stat)
{
int const bucket = ceil_log2 (stat.requests);
std::lock_guard <std::mutex> lock (mutex_);
++hist_[bucket];
high_ = std::max (high_, bucket);
if (stats_.size() >= historySize)
stats_.pop_back();
stats_.emplace_front (std::move(stat));
}
//--------------------------------------------------------------------------
int
ServerImpl::ceil_log2 (unsigned long long x)
{
static const unsigned long long t[6] = {
0xFFFFFFFF00000000ull,
0x00000000FFFF0000ull,
0x000000000000FF00ull,
0x00000000000000F0ull,
0x000000000000000Cull,
0x0000000000000002ull
};
int y = (((x & (x - 1)) == 0) ? 0 : 1);
int j = 32;
int i;
for(i = 0; i < 6; i++) {
int k = (((x & t[i]) == 0) ? 0 : j);
y += k;
x >>= k;
j >>= 1;
}
return y;
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -214,4 +100,3 @@ make_Server (Handler& handler,
} }
} // ripple } // ripple

View File

@@ -23,6 +23,7 @@
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/server/Handler.h> #include <ripple/server/Handler.h>
#include <ripple/server/Server.h> #include <ripple/server/Server.h>
#include <ripple/server/impl/io_list.h>
#include <beast/intrusive/List.h> #include <beast/intrusive/List.h>
#include <beast/threads/Thread.h> #include <beast/threads/Thread.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
@@ -40,25 +41,8 @@ namespace ripple {
class BasicPeer; class BasicPeer;
class Door; class Door;
struct Stat
{
std::size_t id;
std::chrono::seconds elapsed;
int requests;
std::size_t bytes_in;
std::size_t bytes_out;
boost::system::error_code ec;
};
class ServerImpl : public Server class ServerImpl : public Server
{ {
public:
struct Child
{
virtual ~Child() = default;
virtual void close() = 0;
};
private: private:
using clock_type = std::chrono::system_clock; using clock_type = std::chrono::system_clock;
@@ -69,20 +53,19 @@ private:
using Doors = std::vector <std::shared_ptr<Door>>; using Doors = std::vector <std::shared_ptr<Door>>;
Handler* handler_; Handler& handler_;
beast::Journal journal_; beast::Journal j_;
boost::asio::io_service& io_service_; boost::asio::io_service& io_service_;
boost::asio::io_service::strand strand_; boost::asio::io_service::strand strand_;
boost::optional <boost::asio::io_service::work> work_; boost::optional <boost::asio::io_service::work> work_;
std::mutex mutable mutex_; std::mutex m_;
std::condition_variable cond_; std::vector<Port> ports_;
std::vector<std::unique_ptr<Door>> list_; std::vector<std::weak_ptr<Door>> list_;
std::size_t accepting_ = 0;
std::deque <Stat> stats_;
int high_ = 0; int high_ = 0;
std::array <std::size_t, 64> hist_; std::array <std::size_t, 64> hist_;
io_list ios_;
public: public:
ServerImpl (Handler& handler, ServerImpl (Handler& handler,
@@ -93,40 +76,31 @@ public:
beast::Journal beast::Journal
journal() override journal() override
{ {
return journal_; return j_;
} }
void void
ports (std::vector<Port> const& ports) override; ports (std::vector<Port> const& ports) override;
void
onWrite (beast::PropertyStream::Map& map) override;
void void
close() override; close() override;
public: io_list&
Handler* ios()
handler()
{ {
return handler_; return ios_;
} }
public:
boost::asio::io_service& boost::asio::io_service&
get_io_service() get_io_service()
{ {
return io_service_; return io_service_;
} }
void
remove();
bool bool
closed(); closed();
void
report (Stat&& stat);
private: private:
static static
int int

View File

@@ -0,0 +1,272 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_IO_LIST_H_INCLUDED
#define RIPPLE_SERVER_IO_LIST_H_INCLUDED
#include <boost/container/flat_map.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <utility>
namespace ripple {
/** Manages a set of objects performing asynchronous I/O. */
class io_list final
{
public:
class work
{
template<class = void>
void destroy();
friend class io_list;
io_list* ios_ = nullptr;
public:
virtual ~work()
{
destroy();
}
/** Return the io_list associated with the work.
Requirements:
The call to io_list::emplace to
create the work has already returned.
*/
io_list&
ios()
{
return *ios_;
}
virtual void close() = 0;
};
private:
template<class = void>
void destroy();
std::mutex m_;
std::size_t n_ = 0;
bool closed_ = false;
std::condition_variable cv_;
boost::container::flat_map<work*,
std::weak_ptr<work>> map_;
std::function<void(void)> f_;
public:
io_list() = default;
/** Destroy the list.
Effects:
Closes the io_list if it was not previously
closed. No finisher is invoked in this case.
Blocks until all work is destroyed.
*/
~io_list()
{
destroy();
}
/** Return `true` if the list is closed.
Thread Safety:
Undefined result if called concurrently
with close().
*/
bool
closed() const
{
return closed_;
}
/** Create associated work if not closed.
Requirements:
`std::is_base_of_v<work, T> == true`
Thread Safety:
May be called concurrently.
Effects:
Atomically creates, inserts, and returns new
work T, or returns nullptr if the io_list is
closed,
If the call succeeds and returns a new object,
it is guaranteed that a subsequent call to close
will invoke work::close on the object.
*/
template <class T, class... Args>
std::shared_ptr<T>
emplace(Args&&... args);
/** Cancel active I/O.
Thread Safety:
May not be called concurrently.
Effects:
Associated work is closed.
Finisher if provided, will be called when
all associated work is destroyed. The finisher
may be called from a foreign thread, or within
the call to this function.
Only the first call to close will set the
finisher.
No effect after the first call.
*/
template<class Finisher>
void
close(Finisher&& f);
void
close()
{
close([]{});
}
/** Block until the io_list stops.
Effects:
The caller is blocked until the io_list is
closed and all associated work is destroyed.
Thread safety:
May be called concurrently.
Preconditions:
No call to io_service::run on any io_service
used by work objects associated with this io_list
exists in the caller's call stack.
*/
template<class = void>
void
join();
};
//------------------------------------------------------------------------------
template<class>
void
io_list::work::destroy()
{
if(! ios_)
return;
std::function<void(void)> f;
{
std::lock_guard<
std::mutex> lock(ios_->m_);
ios_->map_.erase(this);
if(--ios_->n_ == 0 &&
ios_->closed_)
{
std::swap(f, ios_->f_);
ios_->cv_.notify_all();
}
}
if(f)
f();
}
template<class>
void
io_list::destroy()
{
close();
join();
}
template <class T, class... Args>
std::shared_ptr<T>
io_list::emplace(Args&&... args)
{
static_assert(std::is_base_of<work, T>::value,
"T must derive from io_list::work");
if(closed_)
return nullptr;
auto sp = std::make_shared<T>(
std::forward<Args>(args)...);
decltype(sp) dead;
std::lock_guard<std::mutex> lock(m_);
if(! closed_)
{
++n_;
sp->work::ios_ = this;
map_.emplace(sp.get(), sp);
}
else
{
std::swap(sp, dead);
}
return sp;
}
template<class Finisher>
void
io_list::close(Finisher&& f)
{
std::unique_lock<std::mutex> lock(m_);
if(closed_)
return;
closed_ = true;
auto map = std::move(map_);
if(! map.empty())
{
f_ = std::forward<Finisher>(f);
lock.unlock();
for(auto const& p : map)
if(auto sp = p.second.lock())
sp->close();
}
else
{
lock.unlock();
f();
}
}
template<class>
void
io_list::join()
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock,
[&]
{
return closed_ && n_ == 0;
});
}
} // ripple
#endif
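
To make the close(Finisher) contract above concrete, here is a small self-contained sketch; everything except io_list and io_list::work is hypothetical. It mirrors the ServerImpl::close change earlier in this diff, where the finisher releases the io_service::work guard and calls Handler::onStopped once the last child is gone.

// Hypothetical demo of close(Finisher): the finisher runs only after the
// last work object has been destroyed, possibly inline in close() itself.
#include <ripple/server/impl/io_list.h>

#include <iostream>
#include <memory>

struct noisy_work : ripple::io_list::work
{
    void close() override
    {
        std::cout << "close requested\n";
    }
};

int main()
{
    ripple::io_list list;
    auto sp = list.emplace<noisy_work>();

    // The work object is still alive, so the finisher is deferred.
    list.close([]{ std::cout << "all work destroyed\n"; });

    sp.reset();  // destroying the last shared_ptr fires the finisher
    list.join(); // returns immediately: the list is closed and drained
}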

View File

@@ -361,7 +361,7 @@ public:
}; };
NullHandler h; NullHandler h;
for(int i = 0; i < 10000; ++i) for(int i = 0; i < 1000; ++i)
{ {
TestThread thread; TestThread thread;
auto s = make_Server(h, auto s = make_Server(h,