Refactor HTTP::Server to support Universal Port:

These changes are necessary to support the Universal port feature. Synopsis:

* Persist HTTP peer io_service::work lifetime:
This simplification eliminates any potential for bugs caused by incorrect
lifetime management of the io_service::work object.

* Restructure Door to prevent data races, and handle clean exit:
The Server, Door, Door::detector, and Peer objects work together to
correctly implement graceful stop and destructors that block until
all child objects have been destroyed.

Cleanups:
* De-pimpl HTTP::Server
* Rename ServerImpl data members
* Tidy up HTTP::Port interface
This commit is contained in:
Vinnie Falco
2014-10-15 00:41:10 -07:00
parent 2fd139b307
commit dbdf68b248
15 changed files with 699 additions and 764 deletions

View File

@@ -37,7 +37,8 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent, JobQueue& jobQueue,
, m_journal (deprecatedLogs().journal("Server"))
, m_jobQueue (jobQueue)
, m_networkOPs (networkOPs)
, m_server (*this, deprecatedLogs().journal("Server"))
, m_server (HTTP::make_Server(
*this, deprecatedLogs().journal("Server")))
, setup_ (setup)
{
if (setup_.secure)
@@ -49,7 +50,7 @@ ServerHandlerImp::ServerHandlerImp (Stoppable& parent, JobQueue& jobQueue,
ServerHandlerImp::~ServerHandlerImp()
{
m_server.stop();
m_server = nullptr;
}
void
@@ -77,9 +78,9 @@ ServerHandlerImp::setup (beast::Journal journal)
port.port = ep.port();
port.context = m_context.get ();
HTTP::Ports ports;
ports.push_back (port);
m_server.setPorts (ports);
std::vector<HTTP::Port> list;
list.push_back (port);
m_server->ports(list);
}
}
else
@@ -93,7 +94,7 @@ ServerHandlerImp::setup (beast::Journal journal)
void
ServerHandlerImp::onStop()
{
m_server.stopAsync();
m_server->close();
}
//--------------------------------------------------------------------------
@@ -256,7 +257,7 @@ ServerHandlerImp::processRequest (std::string const& request,
void
ServerHandlerImp::onWrite (beast::PropertyStream::Map& map)
{
m_server.onWrite (map);
m_server->onWrite (map);
}
//------------------------------------------------------------------------------

View File

@@ -38,7 +38,7 @@ private:
beast::Journal m_journal;
JobQueue& m_jobQueue;
NetworkOPs& m_networkOPs;
HTTP::Server m_server;
std::unique_ptr<HTTP::Server> m_server;
std::unique_ptr <RippleSSLContext> m_context;
RPC::Setup setup_;

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_HTTP_SERVER_H_INCLUDED
#define RIPPLE_HTTP_SERVER_H_INCLUDED
#include <ripple/basics/BasicConfig.h>
#include <ripple/common/RippleSSLContext.h>
#include <beast/net/IPEndpoint.h>
#include <beast/utility/Journal.h>
@@ -44,24 +45,20 @@ struct Port
require_ssl
};
Security security;
std::uint16_t port;
Security security = Security::no_ssl;
std::uint16_t port = 0;
beast::IP::Endpoint addr;
SSLContext* context;
SSLContext* context = nullptr;
Port ();
Port (Port const& other);
Port& operator= (Port const& other);
Port() = default;
Port (std::uint16_t port_, beast::IP::Endpoint const& addr_,
Security security_, SSLContext* context_);
Security security_, SSLContext* context_);
static
void
parse (Port& result, Section const& section, std::ostream& log);
};
bool operator== (Port const& lhs, Port const& rhs);
bool operator< (Port const& lhs, Port const& rhs);
/** A set of listening ports settings. */
typedef std::vector <Port> Ports;
//------------------------------------------------------------------------------
class Server;
@@ -92,65 +89,57 @@ struct Handler
//------------------------------------------------------------------------------
class ServerImpl;
/** Multi-threaded, asynchronous HTTP server. */
class Server
{
public:
/** Create the server using the specified handler. */
Server (Handler& handler, beast::Journal journal);
/** Destroy the server.
This blocks until the server stops.
The server is closed if it is not already closed. This call
blocks until the server has stopped.
*/
virtual
~Server ();
~Server() = default;
/** Returns the Journal associated with the server. */
virtual
beast::Journal
journal () const;
journal() = 0;
/** Returns the listening ports settings.
Thread safety:
Safe to call from any thread.
Cannot be called concurrently with setPorts.
*/
Ports const&
getPorts () const;
/** Set the listening ports settings.
These take effect immediately. Any current ports that are not in the
new set will be closed. Established connections will not be disturbed.
Thread safety:
Cannot be called concurrently.
/** Set the listening port settings.
This may only be called once.
*/
virtual
void
setPorts (Ports const& ports);
ports (std::vector<Port> const& v) = 0;
/** Notify the server to stop, without blocking.
virtual
void
onWrite (beast::PropertyStream::Map& map) = 0;
/** Close the server.
The close is performed asynchronously. The handler will be notified
when the server has stopped. The server is considered stopped when
there are no pending I/O completion handlers and all connections
have closed.
Thread safety:
Safe to call concurrently from any thread.
*/
virtual
void
stopAsync ();
close() = 0;
/** Notify the server to stop, and block until the stop is complete.
The handler's onStopped method will be called when the stop completes.
Thread safety:
Cannot be called concurrently.
Cannot be called from the thread of execution of any Handler functions.
*/
void
stop ();
void
onWrite (beast::PropertyStream::Map& map);
private:
std::unique_ptr <ServerImpl> m_impl;
/** Parse configuration settings into a list of ports. */
static
std::vector<Port>
parse (BasicConfig const& config, std::ostream& log);
};
/** Create the HTTP server using the specified handler. */
std::unique_ptr<Server>
make_Server (Handler& handler, beast::Journal journal);
//------------------------------------------------------------------------------
} // HTTP
} // ripple

View File

@@ -18,15 +18,13 @@
//==============================================================================
#include <ripple/http/impl/Door.h>
#include <ripple/http/impl/Peer.h>
#include <ripple/http/impl/PlainPeer.h>
#include <ripple/http/impl/SSLPeer.h>
#include <boost/asio/buffer.hpp>
#include <beast/asio/placeholders.h>
#include <beast/asio/ssl_bundle.h>
#include <boost/logic/tribool.hpp>
#include <functional>
#include <beast/streams/debug_ostream.h>
namespace ripple {
namespace HTTP {
@@ -83,123 +81,198 @@ detect_ssl (Socket& socket, StreamBuf& buf, Yield yield)
//------------------------------------------------------------------------------
Door::connection::connection (Door& door, socket_type&& socket,
Door::detector::detector (Door& door, socket_type&& socket,
endpoint_type endpoint)
: door_ (door)
, socket_ (std::move(socket))
, endpoint_ (endpoint)
, strand_ (door.io_service_)
, timer_ (door.io_service_)
, timer_ (socket_.get_io_service())
, remote_endpoint_ (endpoint)
{
door_.add(*this);
}
// Work-around because we can't call shared_from_this in ctor
void
Door::connection::run()
Door::detector::~detector()
{
boost::asio::spawn (strand_, std::bind (&connection::do_detect,
door_.remove(*this);
}
void
Door::detector::run()
{
boost::asio::spawn (door_.strand_, std::bind (&detector::do_detect,
shared_from_this(), std::placeholders::_1));
boost::asio::spawn (strand_, std::bind (&connection::do_timer,
boost::asio::spawn (door_.strand_, std::bind (&detector::do_timer,
shared_from_this(), std::placeholders::_1));
}
void
Door::connection::do_timer (yield_context yield)
Door::detector::close()
{
error_code ec;
socket_.close(ec);
timer_.cancel(ec);
}
void
Door::detector::do_timer (yield_context yield)
{
error_code ec; // ignored
while (socket_.is_open())
{
timer_.async_wait (yield[ec]);
if (timer_.expires_from_now() <= std::chrono::seconds(0))
socket_.close();
socket_.close(ec);
}
}
void
Door::connection::do_detect (boost::asio::yield_context yield)
Door::detector::do_detect (boost::asio::yield_context yield)
{
bool ssl;
error_code ec;
boost::asio::streambuf buf;
timer_.expires_from_now (std::chrono::seconds(15));
std::tie(ec, ssl) = detect_ssl (socket_, buf, yield);
beast::asio::streambuf buf(16);
timer_.expires_from_now(std::chrono::seconds(15));
std::tie(ec, ssl) = detect_ssl(socket_, buf, yield);
error_code unused;
timer_.cancel(unused);
if (! ec)
{
if (ssl)
{
auto const peer = std::make_shared <SSLPeer> (door_.server_,
door_.port_, door_.server_.journal(), endpoint_,
buf.data(), std::move(socket_));
peer->accept();
return;
}
auto const peer = std::make_shared <PlainPeer> (door_.server_,
door_.port_, door_.server_.journal(), endpoint_,
buf.data(), std::move(socket_));
peer->accept();
return;
}
socket_.close();
timer_.cancel();
return door_.create(ssl, std::move(buf),
std::move(socket_), remote_endpoint_);
if (ec != boost::asio::error::operation_aborted)
if (door_.server_.journal().trace) door_.server_.journal().trace <<
"Error detecting ssl: " << ec.message() <<
" from " << remote_endpoint_;
}
//------------------------------------------------------------------------------
Door::Door (boost::asio::io_service& io_service,
ServerImpl& impl, Port const& port)
: io_service_ (io_service)
, timer_ (io_service)
, acceptor_ (io_service, to_asio (port))
, port_ (port)
, server_ (impl)
ServerImpl& server, Port const& port)
: port_(port)
, server_(server)
, acceptor_(io_service, to_asio(port), true)
, strand_(io_service)
{
server_.add (*this);
error_code ec;
acceptor_.set_option (acceptor::reuse_address (true), ec);
if (ec)
{
server_.journal().error <<
if (server_.journal().error) server_.journal().error <<
"Error setting acceptor socket option: " << ec.message();
}
if (! ec)
{
server_.journal().info << "Bound to endpoint " <<
to_string (acceptor_.local_endpoint());
if (server_.journal().info) server_.journal().info <<
"Bound to endpoint " << to_string (acceptor_.local_endpoint());
}
else
{
server_.journal().error << "Error binding to endpoint " <<
to_string (acceptor_.local_endpoint()) <<
", '" << ec.message() << "'";
if (server_.journal().error) server_.journal().error <<
"Error binding to endpoint " <<
to_string (acceptor_.local_endpoint()) <<
", '" << ec.message() << "'";
}
}
Door::~Door ()
Door::~Door()
{
{
// Block until all detector, Peer objects destroyed
std::unique_lock<std::mutex> lock(mutex_);
while (! list_.empty())
cond_.wait(lock);
}
server_.remove (*this);
}
void
Door::listen()
Door::run()
{
boost::asio::spawn (io_service_, std::bind (&Door::do_accept,
boost::asio::spawn (strand_, std::bind (&Door::do_accept,
shared_from_this(), std::placeholders::_1));
}
void
Door::cancel ()
Door::close()
{
acceptor_.cancel();
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
&Door::close, shared_from_this()));
error_code ec;
acceptor_.close(ec);
// Close all detector, Peer objects
for(auto& _ : list_)
_.close();
}
//------------------------------------------------------------------------------
void
Door::add (Child& c)
{
std::lock_guard<std::mutex> lock(mutex_);
list_.push_back(c);
}
void
Door::remove (Child& c)
{
std::lock_guard<std::mutex> lock(mutex_);
list_.erase(list_.iterator_to(c));
if (list_.empty())
cond_.notify_all();
}
void
Door::create (bool ssl, beast::asio::streambuf&& buf,
socket_type&& socket, endpoint_type remote_address)
{
if (server_.closed())
return;
error_code ec;
switch (port_.security)
{
case Port::Security::no_ssl:
if (ssl)
ec = boost::system::errc::make_error_code (
boost::system::errc::invalid_argument);
case Port::Security::require_ssl:
if (! ssl)
ec = boost::system::errc::make_error_code (
boost::system::errc::invalid_argument);
case Port::Security::allow_ssl:
if (! ec)
{
if (ssl)
{
auto const peer = std::make_shared <SSLPeer> (*this,
server_.journal(), remote_address, buf.data(),
std::move(socket));
peer->accept();
return;
}
auto const peer = std::make_shared <PlainPeer> (*this,
server_.journal(), remote_address, buf.data(),
std::move(socket));
peer->accept();
return;
}
break;
}
if (ec)
if (server_.journal().trace) server_.journal().trace <<
"Error detecting ssl: " << ec.message() <<
" from " << remote_address;
}
void
Door::do_accept (boost::asio::yield_context yield)
{
@@ -207,33 +280,35 @@ Door::do_accept (boost::asio::yield_context yield)
{
error_code ec;
endpoint_type endpoint;
socket_type socket (io_service_);
socket_type socket (acceptor_.get_io_service());
acceptor_.async_accept (socket, endpoint, yield[ec]);
if (server_.closed())
break;
if (ec)
{
if (ec != boost::asio::error::operation_aborted)
server_.journal().error <<
if (server_.journal().error) server_.journal().error <<
"accept: " << ec.message();
break;
}
if (port_.security == Port::Security::no_ssl)
{
auto const peer = std::make_shared <PlainPeer> (server_,
port_, server_.journal(), endpoint,
boost::asio::null_buffers(), std::move(socket));
auto const peer = std::make_shared <PlainPeer> (*this,
server_.journal(), endpoint, boost::asio::null_buffers(),
std::move(socket));
peer->accept();
}
else if (port_.security == Port::Security::require_ssl)
{
auto const peer = std::make_shared <SSLPeer> (server_,
port_, server_.journal(), endpoint,
boost::asio::null_buffers(), std::move(socket));
auto const peer = std::make_shared <SSLPeer> (*this,
server_.journal(), endpoint, boost::asio::null_buffers(),
std::move(socket));
peer->accept();
}
else
{
auto const c = std::make_shared <connection> (
auto const c = std::make_shared <detector> (
*this, std::move(socket), endpoint);
c->run();
}

View File

@@ -22,19 +22,23 @@
#include <ripple/http/impl/ServerImpl.h>
#include <ripple/http/impl/Types.h>
#include <beast/asio/streambuf.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/intrusive/list.hpp>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
namespace ripple {
namespace HTTP {
/** A listening socket. */
class Door
: public beast::List <Door>::Node
: public ServerImpl::Child
, public std::enable_shared_from_this <Door>
{
private:
@@ -47,17 +51,58 @@ private:
using endpoint_type = protocol_type::endpoint;
using socket_type = protocol_type::socket;
boost::asio::io_service& io_service_;
boost::asio::basic_waitable_timer <clock_type> timer_;
acceptor_type acceptor_;
// Detects SSL on a socket
class detector
: public std::enable_shared_from_this <detector>
, public ServerImpl::Child
{
private:
Door& door_;
socket_type socket_;
timer_type timer_;
endpoint_type remote_endpoint_;
public:
detector (Door& door, socket_type&& socket,
endpoint_type endpoint);
~detector();
void run();
void close();
private:
void do_timer (yield_context yield);
void do_detect (yield_context yield);
};
using list_type = boost::intrusive::make_list <Child,
boost::intrusive::constant_time_size <false>>::type;
Port port_;
ServerImpl& server_;
acceptor_type acceptor_;
boost::asio::io_service::strand strand_;
std::mutex mutex_;
std::condition_variable cond_;
list_type list_;
public:
Door (boost::asio::io_service& io_service,
ServerImpl& impl, Port const& port);
ServerImpl& server, Port const& port);
~Door ();
/** Destroy the door.
Blocks until there are no pending I/O completion
handlers, and all connections have been destroyed.
close() must be called before the destructor.
*/
~Door();
ServerImpl&
server()
{
return server_;
}
Port const&
port() const
@@ -65,34 +110,24 @@ public:
return port_;
}
void listen();
void cancel();
// Work-around because we can't call shared_from_this in ctor
void run();
void add (Child& c);
void remove (Child& c);
/** Close the Door listening socket and connections.
The listening socket is closed, and all open connections
belonging to the Door are closed.
Thread Safety:
May be called concurrently
*/
void close();
private:
class connection
: public std::enable_shared_from_this <connection>
{
private:
Door& door_;
socket_type socket_;
endpoint_type endpoint_;
boost::asio::io_service::strand strand_;
timer_type timer_;
public:
connection (Door& door, socket_type&& socket,
endpoint_type endpoint);
void
run();
private:
void
do_timer (yield_context yield);
void
do_detect (yield_context yield);
};
void create (bool ssl, beast::asio::streambuf&& buf,
socket_type&& socket, endpoint_type remote_address);
void do_accept (yield_context yield);
};

View File

@@ -28,7 +28,7 @@
#include <beast/asio/ssl.h> // for is_short_read?
#include <beast/http/message.h>
#include <beast/http/parser.h>
#include <beast/module/core/core.h>
#include <beast/module/core/time/Time.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/streambuf.hpp>
@@ -45,21 +45,10 @@
namespace ripple {
namespace HTTP {
//------------------------------------------------------------------------------
class BasicPeer
: public beast::List <BasicPeer>::Node
{
public:
virtual ~BasicPeer() = default;
};
//------------------------------------------------------------------------------
/** Represents an active connection. */
template <class Impl>
class Peer
: public BasicPeer
: public ServerImpl::Child
, public Session
{
protected:
@@ -92,11 +81,12 @@ protected:
std::size_t used;
};
beast::Journal journal_;
ServerImpl& server_;
Door& door_;
boost::asio::io_service::work work_;
boost::asio::io_service::strand strand_;
waitable_timer timer_;
endpoint_type endpoint_;
beast::Journal journal_;
std::string id_;
std::size_t nid_;
@@ -109,7 +99,6 @@ protected:
bool graceful_ = false;
bool complete_ = false;
std::shared_ptr <Peer> detach_ref_;
boost::optional <boost::asio::io_service::work> work_;
boost::system::error_code ec_;
clock_type::time_point when_;
@@ -122,8 +111,9 @@ protected:
public:
template <class ConstBufferSequence>
Peer (ServerImpl& impl, Port const& port, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers);
Peer (Door& door, boost::asio::io_service& io_service,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers);
virtual
~Peer();
@@ -134,6 +124,8 @@ public:
return *this;
}
void close() override;
protected:
Impl&
impl()
@@ -172,7 +164,7 @@ protected:
beast::Journal
journal() override
{
return server_.journal();
return door_.server().journal();
}
beast::IP::Endpoint
@@ -208,198 +200,26 @@ protected:
//------------------------------------------------------------------------------
class PlainPeer
: public Peer <PlainPeer>
, public std::enable_shared_from_this <PlainPeer>
{
private:
friend class Peer <PlainPeer>;
using socket_type = boost::asio::ip::tcp::socket;
socket_type socket_;
public:
template <class ConstBufferSequence>
PlainPeer (ServerImpl& impl, Port const& port, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers,
socket_type&& socket);
void
accept();
private:
void
do_request();
void
do_close();
};
template <class ConstBufferSequence>
PlainPeer::PlainPeer (ServerImpl& server, Port const& port,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers,
boost::asio::ip::tcp::socket&& socket)
: Peer (server, port, journal, endpoint, buffers)
, socket_(std::move(socket))
{
}
void
PlainPeer::accept ()
{
server_.handler().onAccept (session());
if (! socket_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&PlainPeer::do_read,
shared_from_this(), std::placeholders::_1));
}
void
PlainPeer::do_request()
{
// Perform half-close when Connection: close and not SSL
error_code ec;
if (! message_.keep_alive())
socket_.shutdown (socket_type::shutdown_receive, ec);
if (! ec)
{
++request_count_;
server_.handler().onRequest (session());
return;
}
if (ec)
fail (ec, "request");
}
void
PlainPeer::do_close()
{
error_code ec;
socket_.shutdown (socket_type::shutdown_send, ec);
}
//------------------------------------------------------------------------------
class SSLPeer
: public Peer <SSLPeer>
, public std::enable_shared_from_this <SSLPeer>
{
private:
friend class Peer <SSLPeer>;
using next_layer_type = boost::asio::ip::tcp::socket;
using socket_type = boost::asio::ssl::stream <next_layer_type&>;
next_layer_type next_layer_;
socket_type socket_;
public:
template <class ConstBufferSequence>
SSLPeer (ServerImpl& impl, Port const& port, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers,
next_layer_type&& socket);
void
accept();
private:
void
do_handshake (boost::asio::yield_context yield);
void
do_request();
void
do_close();
void
on_shutdown (error_code ec);
};
template <class ConstBufferSequence>
SSLPeer::SSLPeer (ServerImpl& server, Port const& port,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers,
boost::asio::ip::tcp::socket&& socket)
: Peer (server, port, journal, endpoint, buffers)
, next_layer_ (std::move(socket))
, socket_ (next_layer_, port.context->get())
{
}
// Called when the acceptor accepts our socket.
void
SSLPeer::accept ()
{
server_.handler().onAccept (session());
if (! next_layer_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_handshake,
shared_from_this(), std::placeholders::_1));
}
void
SSLPeer::do_handshake (boost::asio::yield_context yield)
{
error_code ec;
std::size_t const bytes_transferred = socket_.async_handshake (
socket_type::server, read_buf_.data(), yield[ec]);
if (ec)
return fail (ec, "handshake");
read_buf_.consume (bytes_transferred);
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read,
shared_from_this(), std::placeholders::_1));
}
void
SSLPeer::do_request()
{
++request_count_;
server_.handler().onRequest (session());
}
void
SSLPeer::do_close()
{
error_code ec;
socket_.async_shutdown (strand_.wrap (std::bind (
&SSLPeer::on_shutdown, shared_from_this(),
std::placeholders::_1)));
}
void
SSLPeer::on_shutdown (error_code ec)
{
socket_.next_layer().close(ec);
}
//------------------------------------------------------------------------------
template <class Impl>
template <class ConstBufferSequence>
Peer<Impl>::Peer (ServerImpl& server, Port const& port,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers)
: journal_ (journal)
, server_ (server)
, strand_ (server_.get_io_service())
, timer_ (server_.get_io_service())
Peer<Impl>::Peer (Door& door, boost::asio::io_service& io_service,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers)
: door_(door)
, work_ (io_service)
, strand_ (io_service)
, timer_ (io_service)
, endpoint_ (endpoint)
, journal_ (journal)
{
read_buf_.commit (boost::asio::buffer_copy (read_buf_.prepare (
door_.add(*this);
read_buf_.commit(boost::asio::buffer_copy(read_buf_.prepare (
boost::asio::buffer_size (buffers)), buffers));
static std::atomic <int> sid;
nid_ = ++sid;
id_ = std::string("#") + std::to_string(nid_) + " ";
server_.add (*this);
if (journal_.trace) journal_.trace << id_ <<
"accept: " << endpoint.address();
when_ = clock_type::now();
when_str_ = beast::Time::getCurrentTime().formatted (
"%Y-%b-%d %H:%M:%S").toStdString();
@@ -417,15 +237,25 @@ Peer<Impl>::~Peer()
stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_);
server_.report (std::move (stat));
server_.handler().onClose (session(), ec_);
server_.remove (*this);
door_.server().report (std::move (stat));
door_.server().handler().onClose (session(), ec_);
if (journal_.trace) journal_.trace << id_ <<
"destroyed: " << request_count_ <<
((request_count_ == 1) ? " request" : " requests");
door_.remove (*this);
}
template <class Impl>
void
Peer<Impl>::close()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
(void(Peer::*)(void))&Peer::close,
impl().shared_from_this()));
error_code ec;
timer_.cancel(ec);
impl().socket_.lowest_layer().close(ec);
}
//------------------------------------------------------------------------------
@@ -625,20 +455,9 @@ template <class Impl>
void
Peer<Impl>::detach ()
{
// Maintain an additional reference while detached
if (! detach_ref_)
{
assert (! work_);
// Maintain an additional reference while detached
detach_ref_ = impl().shared_from_this();
// Prevent the io_service from running out of work.
// The work object will be destroyed with the Peer
// after the Session is closed and handlers complete.
//
work_ = boost::in_place (std::ref (
server_.get_io_service()));
}
}
// Called to indicate the response has been written (but not sent)
@@ -647,12 +466,11 @@ void
Peer<Impl>::complete()
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer<Impl>::complete, impl().shared_from_this())));
return strand_.post(std::bind (&Peer<Impl>::complete,
impl().shared_from_this()));
// Reattach
detach_ref_.reset();
work_ = boost::none;
message_ = beast::http::message{};
complete_ = true;
@@ -671,13 +489,12 @@ void
Peer<Impl>::close (bool graceful)
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer<Impl>::close, impl().shared_from_this(),
graceful)));
return strand_.post(std::bind(
(void(Peer::*)(bool))&Peer<Impl>::close,
impl().shared_from_this(), graceful));
// Reattach
detach_ref_.reset();
work_ = boost::none;
complete_ = true;

View File

@@ -0,0 +1,104 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_HTTP_PLAINPEER_H_INCLUDED
#define RIPPLE_HTTP_PLAINPEER_H_INCLUDED
#include <ripple/http/impl/Peer.h>
namespace ripple {
namespace HTTP {
class PlainPeer
: public Peer <PlainPeer>
, public std::enable_shared_from_this <PlainPeer>
{
private:
friend class Peer <PlainPeer>;
using socket_type = boost::asio::ip::tcp::socket;
socket_type socket_;
public:
template <class ConstBufferSequence>
PlainPeer (Door& door, beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers, socket_type&& socket);
void
accept();
private:
void
do_request();
void
do_close();
};
//------------------------------------------------------------------------------
template <class ConstBufferSequence>
PlainPeer::PlainPeer (Door& door, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers,
boost::asio::ip::tcp::socket&& socket)
: Peer (door, socket.get_io_service(), journal, endpoint, buffers)
, socket_(std::move(socket))
{
}
void
PlainPeer::accept ()
{
door_.server().handler().onAccept (session());
if (! socket_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&PlainPeer::do_read,
shared_from_this(), std::placeholders::_1));
}
void
PlainPeer::do_request()
{
// Perform half-close when Connection: close and not SSL
error_code ec;
if (! message_.keep_alive())
socket_.shutdown (socket_type::shutdown_receive, ec);
if (! ec)
{
++request_count_;
door_.server().handler().onRequest (session());
return;
}
if (ec)
fail (ec, "request");
}
void
PlainPeer::do_close()
{
error_code ec;
socket_.shutdown (socket_type::shutdown_send, ec);
}
}
}
#endif

View File

@@ -20,30 +20,6 @@
namespace ripple {
namespace HTTP {
Port::Port ()
: security (Security::no_ssl)
, port (0)
, context (nullptr)
{
}
Port::Port (Port const& other)
: security (other.security)
, port (other.port)
, addr (other.addr)
, context (other.context)
{
}
Port& Port::operator= (Port const& other)
{
port = other.port;
addr = other.addr;
security = other.security;
context = other.context;
return *this;
}
Port::Port (std::uint16_t port_, beast::IP::Endpoint const& addr_,
Security security_, SSLContext* context_)
: security (security_)
@@ -53,37 +29,5 @@ Port::Port (std::uint16_t port_, beast::IP::Endpoint const& addr_,
{
}
bool operator== (Port const& lhs, Port const& rhs)
{
if (lhs.addr != rhs.addr)
return false;
if (lhs.port != rhs.port)
return false;
if (lhs.security != rhs.security)
return false;
// 'context' does not participate in the comparison
return true;
}
bool operator< (Port const& lhs, Port const& rhs)
{
if (lhs.addr > rhs.addr)
return false;
else if (lhs.addr < rhs.addr)
return true;
if (lhs.port > rhs.port)
return false;
else if (lhs.port < rhs.port)
return true;
if (lhs.security > rhs.security)
return false;
else if (lhs.security < rhs.security)
return true;
return true;
}
}
}

View File

@@ -0,0 +1,123 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_HTTP_SSLPEER_H_INCLUDED
#define RIPPLE_HTTP_SSLPEER_H_INCLUDED
#include <ripple/http/impl/Peer.h>
namespace ripple {
namespace HTTP {
class SSLPeer
: public Peer <SSLPeer>
, public std::enable_shared_from_this <SSLPeer>
{
private:
friend class Peer <SSLPeer>;
using next_layer_type = boost::asio::ip::tcp::socket;
using socket_type = boost::asio::ssl::stream <next_layer_type&>;
next_layer_type next_layer_;
socket_type socket_;
public:
template <class ConstBufferSequence>
SSLPeer (Door& door, beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers, next_layer_type&& socket);
void
accept();
private:
void
do_handshake (boost::asio::yield_context yield);
void
do_request();
void
do_close();
void
on_shutdown (error_code ec);
};
//------------------------------------------------------------------------------
template <class ConstBufferSequence>
SSLPeer::SSLPeer (Door& door, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers,
boost::asio::ip::tcp::socket&& socket)
: Peer (door, socket.get_io_service(), journal, endpoint, buffers)
, next_layer_ (std::move(socket))
, socket_ (next_layer_, door_.port().context->get())
{
}
// Called when the acceptor accepts our socket.
void
SSLPeer::accept ()
{
door_.server().handler().onAccept (session());
if (! next_layer_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_handshake,
shared_from_this(), std::placeholders::_1));
}
void
SSLPeer::do_handshake (boost::asio::yield_context yield)
{
error_code ec;
std::size_t const bytes_transferred = socket_.async_handshake (
socket_type::server, read_buf_.data(), yield[ec]);
if (ec)
return fail (ec, "handshake");
read_buf_.consume (bytes_transferred);
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read,
shared_from_this(), std::placeholders::_1));
}
void
SSLPeer::do_request()
{
++request_count_;
door_.server().handler().onRequest (session());
}
void
SSLPeer::do_close()
{
error_code ec;
socket_.async_shutdown (strand_.wrap (std::bind (
&SSLPeer::on_shutdown, shared_from_this(),
std::placeholders::_1)));
}
void
SSLPeer::on_shutdown (error_code ec)
{
socket_.next_layer().close(ec);
}
}
}
#endif

View File

@@ -20,54 +20,54 @@
#include <ripple/http/Server.h>
#include <ripple/http/impl/ServerImpl.h>
#include <beast/cxx14/memory.h> // <memory>
#include <stdexcept>
namespace ripple {
namespace HTTP {
Server::Server (Handler& handler, beast::Journal journal)
: m_impl (std::make_unique <ServerImpl> (*this, handler, journal))
std::unique_ptr<Server>
make_Server (Handler& handler, beast::Journal journal)
{
return std::make_unique<ServerImpl>(handler, journal);
}
Server::~Server ()
{
stop();
}
beast::Journal
Server::journal () const
{
return m_impl->journal();
}
Ports const&
Server::getPorts () const
{
return m_impl->getPorts();
}
//------------------------------------------------------------------------------
void
Server::setPorts (Ports const& ports)
Port::parse (Port& port,
Section const& section, std::ostream& log)
{
m_impl->setPorts (ports);
}
void
Server::stopAsync ()
std::vector<Port>
Server::parse (BasicConfig const& config, std::ostream& log)
{
m_impl->stop(false);
}
std::vector <Port> result;
void
Server::stop ()
{
m_impl->stop(true);
}
if (! config.exists("doors"))
{
log <<
"Missing section: [doors]\n";
return result;
}
void
Server::onWrite (beast::PropertyStream::Map& map)
{
m_impl->onWrite (map);
Port common;
Port::parse (common, config["doors"], log);
auto const& names = config.section("doors").values();
for (auto const& name : names)
{
if (! config.exists(name))
{
log <<
"Missing section: [" << name << "]\n";
//throw std::
}
result.push_back(common);
Port::parse (result.back(), config[name], log);
}
return result;
}
}

View File

@@ -18,8 +18,10 @@
//==============================================================================
#include <ripple/http/impl/ServerImpl.h>
#include <ripple/http/impl/Peer.h>
#include <beast/chrono/chrono_io.h>
#include <boost/chrono/chrono_io.hpp>
#include <cassert>
#include <ctime>
#include <iomanip>
#include <iostream>
@@ -30,110 +32,129 @@
namespace ripple {
namespace HTTP {
ServerImpl::ServerImpl (Server& server,
Handler& handler, beast::Journal journal)
: m_server (server)
, m_handler (handler)
ServerImpl::ServerImpl (Handler& handler, beast::Journal journal)
: handler_ (handler)
, journal_ (journal)
, m_strand (io_service_)
, m_work (boost::in_place (std::ref (io_service_)))
, m_stopped (true)
, strand_ (io_service_)
, work_ (boost::in_place (std::ref (io_service_)))
, hist_{}
{
thread_ = std::thread (std::bind (
&ServerImpl::run, this));
}
ServerImpl::~ServerImpl ()
ServerImpl::~ServerImpl()
{
close();
{
// Block until all Door objects destroyed
std::unique_lock<std::mutex> lock(mutex_);
while (! list_.empty())
cond_.wait(lock);
}
thread_.join();
}
Ports const&
ServerImpl::getPorts () const
void
ServerImpl::ports (std::vector<Port> const& ports)
{
std::lock_guard <std::mutex> lock (mutex_);
return state_.ports;
if (closed())
throw std::logic_error("ports() on closed HTTP::Server");
for(auto const& _ : ports)
std::make_shared<Door>(
io_service_, *this, _)->run();
}
void
ServerImpl::setPorts (Ports const& ports)
ServerImpl::onWrite (beast::PropertyStream::Map& map)
{
std::lock_guard <std::mutex> lock (mutex_);
state_.ports = ports;
update();
}
bool
ServerImpl::stopping () const
{
return ! m_work;
}
void
ServerImpl::stop (bool wait)
{
if (! stopping())
map ["active"] = list_.size();
{
m_work = boost::none;
update();
std::string s;
for (int i = 0; i <= high_; ++i)
{
if (i)
s += ", ";
s += std::to_string (hist_[i]);
}
map ["hist"] = s;
}
{
beast::PropertyStream::Set set ("history", map);
for (auto const& stat : stats_)
{
beast::PropertyStream::Map item (set);
if (wait)
m_stopped.wait();
item ["id"] = stat.id;
item ["when"] = stat.when;
{
std::stringstream ss;
ss << stat.elapsed;
item ["elapsed"] = ss.str();
}
item ["requests"] = stat.requests;
item ["bytes_in"] = stat.bytes_in;
item ["bytes_out"] = stat.bytes_out;
if (stat.ec)
item ["error"] = stat.ec.message();
}
}
}
void
ServerImpl::close()
{
std::lock_guard<std::mutex> lock(mutex_);
if (work_)
{
work_ = boost::none;
// Close all Door objects
for(auto& _ :list_)
_.close();
}
}
//--------------------------------------------------------------------------
//
// Server
//
Handler&
ServerImpl::handler()
{
return m_handler;
}
boost::asio::io_service&
ServerImpl::get_io_service()
{
return io_service_;
}
// Inserts the peer into our list of peers. We only remove it
// from the list inside the destructor of the Peer object. This
// way, the Peer can never outlive the server.
//
void
ServerImpl::add (BasicPeer& peer)
ServerImpl::add (Child& child)
{
std::lock_guard <std::mutex> lock (mutex_);
state_.peers.push_back (peer);
std::lock_guard<std::mutex> lock(mutex_);
list_.push_back(child);
}
void
ServerImpl::add (Door& door)
ServerImpl::remove (Child& child)
{
std::lock_guard <std::mutex> lock (mutex_);
state_.doors.push_back (door);
std::lock_guard<std::mutex> lock(mutex_);
list_.erase(list_.iterator_to(child));
if (list_.empty())
{
handler_.onStopped(*this);
cond_.notify_all();
}
}
// Removes the peer from our list of peers. This is only called from
// the destructor of Peer. Essentially, the item in the list functions
// as a weak_ptr.
//
void
ServerImpl::remove (BasicPeer& peer)
bool
ServerImpl::closed()
{
std::lock_guard <std::mutex> lock (mutex_);
state_.peers.erase (state_.peers.iterator_to (peer));
std::lock_guard<std::mutex> lock(mutex_);
return ! work_;
}
void
ServerImpl::remove (Door& door)
ServerImpl::report (Stat&& stat)
{
int const bucket = ceil_log2 (stat.requests);
std::lock_guard <std::mutex> lock (mutex_);
state_.doors.erase (state_.doors.iterator_to (door));
++hist_[bucket];
high_ = std::max (high_, bucket);
if (stats_.size() >= historySize)
stats_.pop_back();
stats_.emplace_front (std::move(stat));
}
//--------------------------------------------------------------------------
@@ -164,179 +185,13 @@ ServerImpl::ceil_log2 (unsigned long long x)
return y;
}
void
ServerImpl::report (Stat&& stat)
{
int const bucket = ceil_log2 (stat.requests);
std::lock_guard <std::mutex> lock (mutex_);
++hist_[bucket];
high_ = std::max (high_, bucket);
if (stats_.size() >= historySize)
stats_.pop_back();
stats_.emplace_front (std::move(stat));
}
void
ServerImpl::onWrite (beast::PropertyStream::Map& map)
{
std::lock_guard <std::mutex> lock (mutex_);
// VFALCO TODO Write the list of doors
map ["active"] = state_.peers.size();
{
std::string s;
for (int i = 0; i <= high_; ++i)
{
if (i)
s += ", ";
s += std::to_string (hist_[i]);
}
map ["hist"] = s;
}
{
beast::PropertyStream::Set set ("history", map);
for (auto const& stat : stats_)
{
beast::PropertyStream::Map item (set);
item ["id"] = stat.id;
item ["when"] = stat.when;
{
std::stringstream ss;
ss << stat.elapsed;
item ["elapsed"] = ss.str();
}
item ["requests"] = stat.requests;
item ["bytes_in"] = stat.bytes_in;
item ["bytes_out"] = stat.bytes_out;
if (stat.ec)
item ["error"] = stat.ec.message();
}
}
}
//--------------------------------------------------------------------------
int
ServerImpl::compare (Port const& lhs, Port const& rhs)
{
if (lhs < rhs)
return -1;
else if (rhs < lhs)
return 1;
return 0;
}
void
ServerImpl::update()
{
io_service_.post (m_strand.wrap (std::bind (
&ServerImpl::on_update, this)));
}
// Updates our Door list based on settings.
void
ServerImpl::on_update ()
{
/*
if (! m_strand.running_in_this_thread())
io_service_.dispatch (m_strand.wrap (std::bind (
&ServerImpl::update, this)));
*/
if (! stopping())
{
// Make a local copy to shorten the lock
//
Ports ports;
{
std::lock_guard <std::mutex> lock (mutex_);
ports = state_.ports;
}
std::sort (ports.begin(), ports.end());
// Walk the Door list and the Port list simultaneously and
// build a replacement Door vector which we will then swap in.
//
Doors doors;
Doors::iterator door (m_doors.begin());
for (Ports::const_iterator port (ports.begin());
port != ports.end(); ++port)
{
int comp = 0;
while (door != m_doors.end() &&
((comp = compare (*port, (*door)->port())) > 0))
{
(*door)->cancel();
++door;
}
if (door != m_doors.end())
{
if (comp < 0)
{
doors.push_back (std::make_shared <Door> (
io_service_, *this, *port));
doors.back()->listen();
}
else
{
// old Port and new Port are the same
doors.push_back (*door);
++door;
}
}
else
{
doors.push_back (std::make_shared <Door> (
io_service_, *this, *port));
doors.back()->listen();
}
}
// Any remaining Door objects are not in the new set, so cancel them.
//
for (;door != m_doors.end();)
(*door)->cancel();
m_doors.swap (doors);
}
else
{
// Cancel pending I/O on all doors.
//
for (Doors::iterator iter (m_doors.begin());
iter != m_doors.end(); ++iter)
{
(*iter)->cancel();
}
// Remove our references to the old doors.
//
m_doors.resize (0);
}
}
// Thread entry point to perform io_service work
void
ServerImpl::run()
{
static std::atomic <int> id;
beast::Thread::setCurrentThreadName (
std::string("HTTP::Server #") + std::to_string (++id));
io_service_.run();
m_stopped.signal();
m_handler.onStopped (m_server);
}
}

View File

@@ -26,6 +26,7 @@
#include <beast/threads/SharedData.h>
#include <beast/threads/Thread.h>
#include <boost/asio.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/optional.hpp>
#include <array>
#include <chrono>
@@ -52,9 +53,20 @@ struct Stat
boost::system::error_code ec;
};
class ServerImpl
class ServerImpl : public Server
{
public:
class Child : public boost::intrusive::list_base_hook <
boost::intrusive::link_mode <boost::intrusive::normal_link>>
{
public:
virtual void close() = 0;
};
private:
using list_type = boost::intrusive::make_list <Child,
boost::intrusive::constant_time_size <false>>::type;
typedef std::chrono::system_clock clock_type;
enum
@@ -62,97 +74,71 @@ private:
historySize = 100
};
struct State
{
// Attributes for our listening ports
Ports ports;
// All allocated Peer objects
beast::List <BasicPeer> peers;
// All allocated Door objects
beast::List <Door> doors;
};
typedef std::vector <std::shared_ptr<Door>> Doors;
Server& m_server;
Handler& m_handler;
Handler& handler_;
std::thread thread_;
std::mutex mutable mutex_;
std::condition_variable cond_;
list_type list_;
beast::Journal journal_;
boost::asio::io_service io_service_;
boost::asio::io_service::strand m_strand;
boost::optional <boost::asio::io_service::work> m_work;
beast::WaitableEvent m_stopped;
State state_;
Doors m_doors;
boost::asio::io_service::strand strand_;
boost::optional <boost::asio::io_service::work> work_;
std::deque <Stat> stats_;
std::array <std::size_t, 64> hist_;
int high_ = 0;
public:
ServerImpl (Server& server, Handler& handler, beast::Journal journal);
ServerImpl (Handler& handler, beast::Journal journal);
~ServerImpl ();
beast::Journal
journal() const
journal() override
{
return journal_;
}
Ports const&
getPorts () const;
void
ports (std::vector<Port> const& ports) override;
void
setPorts (Ports const& ports);
bool
stopping () const;
onWrite (beast::PropertyStream::Map& map) override;
void
stop (bool wait);
close() override;
public:
Handler&
handler();
handler()
{
return handler_;
}
boost::asio::io_service&
get_io_service();
get_io_service()
{
return io_service_;
}
void
add (BasicPeer& peer);
add (Child& child);
void
add (Door& door);
remove (Child& child);
void
remove (BasicPeer& peer);
void
remove (Door& door);
bool
closed();
void
report (Stat&& stat);
void
onWrite (beast::PropertyStream::Map& map);
private:
static
int
ceil_log2 (unsigned long long x);
static
int
compare (Port const& lhs, Port const& rhs);
void
update();
void
on_update();
void
run();
};

View File

@@ -155,8 +155,6 @@ public:
void
test_request()
{
testcase("request");
boost::asio::io_service ios;
typedef boost::asio::ip::tcp::socket socket;
socket s (ios);
@@ -189,8 +187,6 @@ public:
void
test_keepalive()
{
testcase("keepalive");
boost::asio::io_service ios;
typedef boost::asio::ip::tcp::socket socket;
socket s (ios);
@@ -234,19 +230,19 @@ public:
sink.severity (beast::Journal::Severity::kAll);
beast::Journal journal {sink};
TestHandler handler;
Server s (handler, journal);
Ports ports;
auto s = make_Server (handler, journal);
std::vector<Port> list;
std::unique_ptr <RippleSSLContext> c (
RippleSSLContext::createBare ());
ports.emplace_back (testPort, beast::IP::Endpoint (
list.emplace_back (testPort, beast::IP::Endpoint (
beast::IP::AddressV4 (127, 0, 0, 1), 0),
Port::Security::no_ssl, c.get());
s.setPorts (ports);
s->ports (list);
test_request();
//test_keepalive();
s.stop();
//s->close();
s = nullptr;
pass();
}