Rewrite HTTP/S server to use coroutines:

* Fix bug with more than one complete request in a read buffer
* Use stackful coroutines for simplified control flow
* Door refactored to detect handshakes
* Remove dependency on MultiSocket
* Remove dependency on handshake detect logic framework
This commit is contained in:
Vinnie Falco
2014-09-25 14:57:27 -07:00
parent 5ce6068df5
commit 6dfc805eaa
11 changed files with 773 additions and 775 deletions

View File

@@ -2575,9 +2575,6 @@
</ClCompile>
<ClInclude Include="..\..\src\ripple\http\impl\Door.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\http\impl\Peer.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\http\impl\Peer.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\http\impl\Port.cpp">

View File

@@ -3666,9 +3666,6 @@
<ClInclude Include="..\..\src\ripple\http\impl\Door.h">
<Filter>ripple\http\impl</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\http\impl\Peer.cpp">
<Filter>ripple\http\impl</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\http\impl\Peer.h">
<Filter>ripple\http\impl</Filter>
</ClInclude>

View File

@@ -290,6 +290,8 @@ def config_env(toolchain, variant, env):
])
boost_libs = [
'boost_coroutine',
'boost_context',
'boost_date_time',
'boost_filesystem',
'boost_program_options',

View File

@@ -86,7 +86,7 @@ public:
The initial value is always zero.
Changes to the value are persisted between calls.
*/
void* tag;
void* tag = nullptr;
/** Returns the Journal to use for logging. */
virtual

View File

@@ -20,14 +20,139 @@
#include <ripple/http/impl/Door.h>
#include <ripple/http/impl/Peer.h>
#include <beast/asio/placeholders.h>
#include <boost/logic/tribool.hpp>
#include <functional>
#include <beast/streams/debug_ostream.h>
namespace ripple {
namespace HTTP {
Door::Door (ServerImpl& impl, Port const& port)
: server_ (impl)
, acceptor_ (server_.get_io_service(), to_asio (port))
/** Detect SSL client handshakes.
Analyzes the bytes in the provided buffer to detect the SSL client
handshake. If the buffer contains insufficient data, more data will be
read from the stream until there is enough to determine a result.
No bytes are discarded from buf. Any additional bytes read are retained.
buf must provide an interface compatible with boost::asio::streambuf
http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html
See
http://www.ietf.org/rfc/rfc2246.txt
Section 7.4. Handshake protocol
@param socket The stream to read from
@param buf A buffer to hold the received data
@param yield A yield context
@return The error code if an error occurs, otherwise `true` if
the data read indicates the SSL client handshake.
*/
template <class Socket, class StreamBuf, class Yield>
std::pair <boost::system::error_code, bool>
detect_ssl (Socket& socket, StreamBuf& buf, Yield yield)
{
std::pair <boost::system::error_code, bool> result;
result.second = false;
for(;;)
{
std::size_t const max = 4; // the most bytes we could need
std::array <unsigned char, max> data;
auto const bytes = boost::asio::buffer_copy (
boost::asio::buffer(data), buf.data());
if (bytes > 0)
{
if (data[0] != 0x16) // message type 0x16 = "SSL Handshake"
break;
}
if (bytes >= max)
{
result.second = true;
break;
}
std::size_t const bytes_transferred = boost::asio::async_read (socket,
buf.prepare(max - bytes), boost::asio::transfer_at_least(1),
yield[result.first]);
if (result.first)
break;
buf.commit (bytes_transferred);
}
return result;
}
//------------------------------------------------------------------------------
Door::connection::connection (Door& door, socket_type&& socket,
endpoint_type endpoint)
: door_ (door)
, socket_ (std::move(socket))
, endpoint_ (endpoint)
, strand_ (door.io_service_)
, timer_ (door.io_service_)
{
}
// Work-around because we can't call shared_from_this in ctor
void
Door::connection::run()
{
boost::asio::spawn (strand_, std::bind (&connection::do_detect,
shared_from_this(), std::placeholders::_1));
boost::asio::spawn (strand_, std::bind (&connection::do_timer,
shared_from_this(), std::placeholders::_1));
}
void
Door::connection::do_timer (yield_context yield)
{
error_code ec; // ignored
while (socket_.is_open())
{
timer_.async_wait (yield[ec]);
if (timer_.expires_from_now() <= std::chrono::seconds(0))
socket_.close();
}
}
void
Door::connection::do_detect (boost::asio::yield_context yield)
{
bool ssl;
error_code ec;
boost::asio::streambuf buf;
timer_.expires_from_now (std::chrono::seconds(15));
std::tie(ec, ssl) = detect_ssl (socket_, buf, yield);
if (! ec)
{
if (ssl)
{
auto const peer = std::make_shared <SSLPeer> (door_.server_,
door_.port_, door_.server_.journal(), endpoint_,
buf.data(), std::move(socket_));
peer->accept();
return;
}
auto const peer = std::make_shared <PlainPeer> (door_.server_,
door_.port_, door_.server_.journal(), endpoint_,
buf.data(), std::move(socket_));
peer->accept();
return;
}
socket_.close();
timer_.cancel();
}
//------------------------------------------------------------------------------
Door::Door (boost::asio::io_service& io_service,
ServerImpl& impl, Port const& port)
: io_service_ (io_service)
, timer_ (io_service)
, acceptor_ (io_service, to_asio (port))
, port_ (port)
, server_ (impl)
{
server_.add (*this);
@@ -44,8 +169,6 @@ Door::Door (ServerImpl& impl, Port const& port)
{
server_.journal().info << "Bound to endpoint " <<
to_string (acceptor_.local_endpoint());
async_accept();
}
else
{
@@ -60,10 +183,11 @@ Door::~Door ()
server_.remove (*this);
}
Port const&
Door::port () const
void
Door::listen()
{
return port_;
boost::asio::spawn (io_service_, std::bind (&Door::do_accept,
shared_from_this(), std::placeholders::_1));
}
void
@@ -72,37 +196,46 @@ Door::cancel ()
acceptor_.cancel();
}
void
Door::failed (error_code ec)
{
}
//------------------------------------------------------------------------------
void
Door::async_accept ()
Door::do_accept (boost::asio::yield_context yield)
{
auto const peer (std::make_shared <Peer> (server_, port_, server_.journal()));
acceptor_.async_accept (peer->get_socket(), endpoint_, std::bind (
&Door::handle_accept, Ptr(this),
beast::asio::placeholders::error, peer));
}
void
Door::handle_accept (error_code ec,
std::shared_ptr <Peer> const& peer)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
for(;;)
{
server_.journal().error <<
"accept: " << ec.message();
return;
error_code ec;
endpoint_type endpoint;
socket_type socket (io_service_);
acceptor_.async_accept (socket, endpoint, yield[ec]);
if (ec)
{
if (ec != boost::asio::error::operation_aborted)
server_.journal().error <<
"accept: " << ec.message();
break;
}
if (port_.security == Port::Security::no_ssl)
{
auto const peer = std::make_shared <PlainPeer> (server_,
port_, server_.journal(), endpoint,
boost::asio::null_buffers(), std::move(socket));
peer->accept();
}
else if (port_.security == Port::Security::require_ssl)
{
auto const peer = std::make_shared <SSLPeer> (server_,
port_, server_.journal(), endpoint,
boost::asio::null_buffers(), std::move(socket));
peer->accept();
}
else
{
auto const c = std::make_shared <connection> (
*this, std::move(socket), endpoint);
c->run();
}
}
auto const endpoint = endpoint_;
async_accept();
peer->accept (endpoint);
}
}

View File

@@ -22,46 +22,79 @@
#include <ripple/http/impl/ServerImpl.h>
#include <ripple/http/impl/Types.h>
#include <beast/asio/placeholders.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <chrono>
#include <memory>
namespace ripple {
namespace HTTP {
/** A listening socket. */
class Door
: public beast::SharedObject
, public beast::List <Door>::Node
, public beast::LeakChecked <Door>
: public beast::List <Door>::Node
, public std::enable_shared_from_this <Door>
{
private:
// VFALCO TODO Use shared_ptr
typedef beast::SharedPtr <Door> Ptr;
using clock_type = std::chrono::steady_clock;
using timer_type = boost::asio::basic_waitable_timer<clock_type>;
using error_code = boost::system::error_code;
using yield_context = boost::asio::yield_context;
using protocol_type = boost::asio::ip::tcp;
using acceptor_type = protocol_type::acceptor;
using endpoint_type = protocol_type::endpoint;
using socket_type = protocol_type::socket;
ServerImpl& server_;
acceptor acceptor_;
endpoint_t endpoint_;
boost::asio::io_service& io_service_;
boost::asio::basic_waitable_timer <clock_type> timer_;
acceptor_type acceptor_;
Port port_;
ServerImpl& server_;
public:
Door (ServerImpl& impl, Port const& port);
Door (boost::asio::io_service& io_service,
ServerImpl& impl, Port const& port);
~Door ();
Port const&
port () const;
port() const
{
return port_;
}
void
cancel ();
void listen();
void cancel();
void
failed (error_code ec);
private:
class connection
: public std::enable_shared_from_this <connection>
{
private:
Door& door_;
socket_type socket_;
endpoint_type endpoint_;
boost::asio::io_service::strand strand_;
timer_type timer_;
void
async_accept ();
public:
connection (Door& door, socket_type&& socket,
endpoint_type endpoint);
void
handle_accept (error_code ec,
std::shared_ptr <Peer> const& peer);
void
run();
private:
void
do_timer (yield_context yield);
void
do_detect (yield_context yield);
};
void do_accept (yield_context yield);
};
}

View File

@@ -1,637 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/http/impl/Peer.h>
#include <beast/asio/ssl.h>
#include <cassert>
namespace ripple {
namespace HTTP {
/*
Reference:
http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
*/
std::atomic <std::size_t> Peer::s_count_;
std::size_t
Peer::count()
{
return s_count_.load();
}
Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
: journal_ (journal)
, server_ (server)
, strand_ (server_.get_io_service())
, timer_ (server_.get_io_service())
, parser_ (message_, true)
{
++s_count_;
static std::atomic <int> sid;
nid_ = ++sid;
id_ = std::string("#") + std::to_string(nid_) + " ";
tag = nullptr;
int flags = MultiSocket::Flag::server_role;
switch (port.security)
{
case Port::Security::no_ssl:
break;
case Port::Security::allow_ssl:
flags |= MultiSocket::Flag::ssl;
break;
case Port::Security::require_ssl:
flags |= MultiSocket::Flag::ssl_required;
break;
}
socket_.reset (MultiSocket::New (
server_.get_io_service(), port.context->get(), flags));
server_.add (*this);
if (journal_.trace) journal_.trace << id_ <<
"created";
}
Peer::~Peer ()
{
if (callClose_)
{
Stat stat;
stat.id = nid_;
stat.when = std::move (when_str_);
stat.elapsed = std::chrono::duration_cast <
std::chrono::seconds> (clock_type::now() - when_);
stat.requests = request_count_;
stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_);
server_.report (std::move (stat));
server_.handler().onClose (session(), ec_);
}
server_.remove (*this);
if (journal_.trace) journal_.trace << id_ <<
"destroyed: " << request_count_ <<
((request_count_ == 1) ? " request" : " requests");
--s_count_;
}
//------------------------------------------------------------------------------
// Called when the acceptor accepts our socket.
void
Peer::accept (boost::asio::ip::tcp::endpoint endpoint)
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::accept, shared_from_this(), endpoint)));
if (journal_.trace) journal_.trace << id_ <<
"accept: " << endpoint.address();
callClose_ = true;
when_ = clock_type::now();
when_str_ = beast::Time::getCurrentTime().formatted (
"%Y-%b-%d %H:%M:%S").toStdString();
server_.handler().onAccept (session());
// Handler might have closed
if (state_ == State::closed)
{
// VFALCO TODO Is this the correct way to close the socket?
// See what state the socket is in and verify.
//closed();
return;
}
if (socket_->needs_handshake ())
{
start_timer();
socket_->async_handshake (beast::asio::abstract_socket::server,
strand_.wrap (std::bind (&Peer::on_handshake, shared_from_this(),
beast::asio::placeholders::error)));
}
else
{
async_read();
}
}
// Called by a completion handler when error is not eof or aborted.
void
Peer::fail (error_code ec)
{
assert (ec);
assert (strand_.running_in_this_thread());
if (journal_.debug) journal_.debug << id_ <<
"fail: " << ec.message();
ec_ = ec;
socket_->cancel(ec);
}
// Start the timer.
// If the timer expires, the session is considered
// timed out and will be forcefully closed.
void
Peer::start_timer()
{
timer_.expires_from_now (
boost::posix_time::seconds (
timeoutSeconds));
timer_.async_wait (strand_.wrap (std::bind (
&Peer::on_timer, shared_from_this(),
beast::asio::placeholders::error)));
}
void
Peer::async_write ()
{
void const* data;
std::size_t bytes;
{
std::lock_guard <std::mutex> lock (mutex_);
buffer& b = write_queue_.front();
data = b.data.get() + b.used;
bytes = b.bytes - b.used;
}
start_timer();
boost::asio::async_write (*socket_, boost::asio::buffer (data, bytes),
boost::asio::transfer_at_least (1), strand_.wrap (std::bind (
&Peer::on_write, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}
void
Peer::async_read()
{
start_timer();
boost::asio::async_read (*socket_, read_buf_.prepare (bufferSize),
boost::asio::transfer_at_least (1), strand_.wrap (std::bind (
&Peer::on_read, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}
//------------------------------------------------------------------------------
// Called when session times out
void
Peer::on_timer (error_code ec)
{
if (state_ == State::closed)
{
if (journal_.trace) journal_.trace << id_ <<
"timer: closed";
return;
}
if (ec == boost::asio::error::operation_aborted)
{
// Disable this otherwise we log needlessly
// on every new read and write.
/*
if (journal_.trace) journal_.trace << id_ <<
"timer: aborted";
*/
return;
}
if (! ec)
ec = boost::system::errc::make_error_code (
boost::system::errc::timed_out);
if (journal_.debug) journal_.debug << id_ <<
"timer: " << ec.message();
fail (ec);
}
// Called when the handshake completes
void
Peer::on_handshake (error_code ec)
{
{
error_code ec;
timer_.cancel(ec);
}
bool const ssl = socket_->ssl_handle() != nullptr;
if (state_ == State::closed)
{
if (journal_.trace) journal_.trace << id_ <<
"handshake: closed";
return;
}
if (ec == boost::asio::error::operation_aborted)
{
if (journal_.trace) journal_.trace << id_ <<
"handshake: aborted";
return;
}
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"handshake: " << ec.message();
return fail (ec);
}
if (journal_.trace) journal_.trace << id_ <<
"handshake" << (ssl ? ": ssl" : "");
async_read();
}
// Called repeatedly with the http request data
void
Peer::on_read (error_code ec, std::size_t bytes_transferred)
{
// This needs to happen before the call to onRequest
// otherwise we could time out if the Handler takes too long.
{
error_code ec;
timer_.cancel(ec);
}
if (state_ == State::closed)
{
if (journal_.trace) journal_.trace << id_ <<
"read: closed";
return;
}
if (ec == boost::asio::error::operation_aborted)
{
if (journal_.trace) journal_.trace << id_ <<
"read: aborted";
return;
}
if (beast::asio::is_short_read (ec))
{
if (journal_.trace) journal_.trace << id_ <<
"read: " << ec.message();
return fail (ec);
}
if (ec && ec != boost::asio::error::eof)
{
if (journal_.debug) journal_.debug << id_ <<
"read: " << ec.message();
return fail (ec);
}
bool const eof = ec == boost::asio::error::eof;
if (! eof)
{
if (journal_.trace) journal_.trace << id_ <<
"read: " << bytes_transferred << " bytes";
}
else
{
// End of stream reached:
// http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/overview/core/streams.html
if (bytes_transferred != 0)
if (journal_.error) journal_.error << id_ <<
"read: eof (" << bytes_transferred << " bytes)";
if (journal_.debug) journal_.debug << id_ <<
"read: eof";
ec = error_code{};
}
bytes_in_ += bytes_transferred;
read_buf_.commit (bytes_transferred);
std::pair <bool, std::size_t> result;
if (! eof)
{
result = parser_.write (read_buf_.data());
if (result.first)
read_buf_.consume (result.second);
else
ec = parser_.error();
}
else
{
result.first = parser_.write_eof();
if (! result.first)
ec = parser_.error();
}
// VFALCO TODO Currently parsing errors are treated the
// same as the connection dropping. Instead, we
// should request that the handler compose a proper HTTP error
// response. This requires refactoring HTTPReply() into
// something sensible.
//
if (! ec && parser_.complete())
{
// Perform half-close when Connection: close and not SSL
if (! message_.keep_alive() &&
! socket_->needs_handshake())
socket_->shutdown (socket::shutdown_receive, ec);
if (! ec)
{
++request_count_;
server_.handler().onRequest (session());
return;
}
}
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"read: " << ec.message();
return fail (ec);
}
if (! eof)
async_read();
}
// Called when async_write completes.
void
Peer::on_write (error_code ec, std::size_t bytes_transferred)
{
{
error_code ec;
timer_.cancel (ec);
}
if (state_ == State::closed)
{
if (journal_.trace) journal_.trace << id_ <<
"write: closed";
return;
}
if (ec == boost::asio::error::operation_aborted)
{
if (journal_.trace) journal_.trace << id_ <<
"write: aborted";
return;
}
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"write: " << ec.message();
return fail (ec);
}
if (bytes_transferred == 0)
if (journal_.error) journal_.error << id_ <<
"write: 0 bytes";
if (journal_.trace) journal_.trace << id_ <<
"write: " << bytes_transferred << " bytes";
bytes_out_ += bytes_transferred;
bool empty;
{
std::lock_guard <std::mutex> lock (mutex_);
buffer& b = write_queue_.front();
b.used += bytes_transferred;
if (b.used == b.bytes)
{
write_queue_.pop_front();
empty = write_queue_.empty();
}
else
{
assert (b.used < b.bytes);
empty = false;
}
}
if (! empty)
return async_write();
if (! complete_)
return;
// Handler is done writing, did we graceful close?
if (state_ == State::flush)
{
if (socket_->needs_handshake())
{
// ssl::stream
start_timer();
socket_->async_shutdown (strand_.wrap (std::bind (
&Peer::on_shutdown, shared_from_this(),
beast::asio::placeholders::error)));
return;
}
{
error_code ec;
socket_->shutdown (MultiSocket::shutdown_send, ec);
}
return;
}
// keep-alive
complete_ = false;
async_read();
}
// Called when async_shutdown completes
void
Peer::on_shutdown (error_code ec)
{
{
error_code ec;
timer_.cancel (ec);
}
if (ec == boost::asio::error::operation_aborted)
{
// We canceled i/o on the socket, or we called async_shutdown
// and then closed the socket before getting the completion.
if (journal_.trace) journal_.trace << id_ <<
"shutdown: aborted";
return;
}
if (ec == boost::asio::error::eof)
{
// Happens when ssl::stream::async_shutdown completes without error
if (journal_.trace) journal_.trace << id_ <<
"shutdown: eof";
return;
}
if ((ec.category() == boost::asio::error::get_ssl_category())
&& (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ))
{
// Remote peer failed to send SSL close_notify message.
if (journal_.trace) journal_.trace << id_ <<
"shutdown: missing close_notify";
return;
}
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"shutdown: " << ec.message();
return fail (ec);
}
if (journal_.trace) journal_.trace << id_ <<
"shutdown";
}
//------------------------------------------------------------------------------
// Send a copy of the data.
void
Peer::write (void const* buffer, std::size_t bytes)
{
if (bytes == 0)
return;
bool empty;
{
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
write_queue_.emplace_back (buffer, bytes);
}
if (empty)
server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::async_write, shared_from_this())));
}
// Make the Session asynchronous
void
Peer::detach ()
{
if (! detach_ref_)
{
assert (! work_);
// Maintain an additional reference while detached
detach_ref_ = shared_from_this();
// Prevent the io_service from running out of work.
// The work object will be destroyed with the Peer
// after the Session is closed and handlers complete.
//
work_ = boost::in_place (std::ref (
server_.get_io_service()));
}
}
// Called to indicate the response has been written (but not sent)
void
Peer::complete()
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::complete, shared_from_this())));
// Reattach
detach_ref_.reset();
work_ = boost::none;
message_ = beast::http::message{};
parser_ = beast::http::parser{message_, true};
complete_ = true;
bool empty;
{
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
}
if (empty)
{
// keep-alive
complete_ = false;
async_read();
}
}
// Called from the Handler to close the session.
void
Peer::close (bool graceful)
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::close, shared_from_this(), graceful)));
// Reattach
detach_ref_.reset();
work_ = boost::none;
assert (state_ == State::open);
state_ = graceful ? State::flush : State::closed;
complete_ = true;
if (graceful)
{
bool empty;
{
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
}
if (! empty)
return;
if (socket_->needs_handshake())
{
start_timer();
socket_->async_shutdown (strand_.wrap (std::bind (
&Peer::on_shutdown, shared_from_this(),
beast::asio::placeholders::error)));
return;
}
}
error_code ec;
timer_.cancel (ec);
socket_->close (ec);
}
}
}

View File

@@ -26,56 +26,58 @@
#include <ripple/http/impl/ServerImpl.h>
#include <ripple/common/MultiSocket.h>
#include <beast/asio/placeholders.h>
#include <beast/asio/ssl.h> // for is_short_read?
#include <beast/http/message.h>
#include <beast/http/parser.h>
#include <beast/module/core/core.h>
#include <beast/module/asio/http/HTTPRequestParser.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/logic/tribool.hpp>
#include <boost/asio/spawn.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <beast/cxx14/type_traits.h> // <type_traits>
namespace ripple {
namespace HTTP {
//------------------------------------------------------------------------------
/** Represents an active connection. */
class Peer
: public std::enable_shared_from_this <Peer>
, public Session
, public beast::List <Peer>::Node
, public beast::LeakChecked <Peer>
class BasicPeer
: public beast::List <BasicPeer>::Node
{
private:
typedef std::chrono::system_clock clock_type;
typedef MultiSocket socket_type;
public:
virtual ~BasicPeer() = default;
};
//------------------------------------------------------------------------------
/** Represents an active connection. */
template <class Impl>
class Peer
: public BasicPeer
, public Session
{
protected:
using clock_type = std::chrono::system_clock;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer <clock_type>;
enum
{
// Size of our receive buffer
bufferSize = 4 * 1024,
// Largest HTTP request allowed
maxRequestBytes = 32 * 1024,
// Max seconds without completing a message
timeoutSeconds = 30
};
enum class State
{
open,
flush,
closed
};
struct buffer
{
buffer (void const* ptr, std::size_t len)
@@ -93,23 +95,21 @@ private:
beast::Journal journal_;
ServerImpl& server_;
boost::asio::io_service::strand strand_;
waitable_timer timer_;
endpoint_type endpoint_;
std::string id_;
std::size_t nid_;
boost::asio::io_service::strand strand_;
boost::asio::deadline_timer timer_;
std::unique_ptr <socket_type> socket_;
boost::asio::streambuf read_buf_;
beast::http::message message_;
beast::http::parser parser_;
std::list <buffer> write_queue_;
std::mutex mutex_;
State state_ = State::open;
bool graceful_ = false;
bool complete_ = false;
bool callClose_ = false;
std::shared_ptr <Peer> detach_ref_;
boost::optional <boost::asio::io_service::work> work_;
boost::system::error_code ec_;
clock_type::time_point when_;
@@ -120,71 +120,54 @@ private:
//--------------------------------------------------------------------------
static std::atomic <std::size_t> s_count_;
public:
static
std::size_t
count();
template <class ConstBufferSequence>
Peer (ServerImpl& impl, Port const& port, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers);
Peer (ServerImpl& impl, Port const& port, beast::Journal journal);
~Peer ();
socket&
get_socket()
{
return socket_->this_layer<socket>();
}
virtual
~Peer();
Session&
session ()
session()
{
return *this;
}
void
accept (boost::asio::ip::tcp::endpoint endpoint);
protected:
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
private:
void
fail (error_code ec);
fail (error_code ec, char const* what);
void
start_timer();
void
async_write();
void
async_read();
//--------------------------------------------------------------------------
//
// Completion Handlers
//
cancel_timer();
void
on_timer (error_code ec);
void
on_handshake (error_code ec);
do_read (boost::asio::yield_context yield);
void
on_read (error_code ec, std::size_t bytes_transferred);
do_write (boost::asio::yield_context yield);
virtual
void
on_write (error_code ec, std::size_t bytes_transferred);
do_request() = 0;
virtual
void
on_shutdown (error_code ec);
do_close() = 0;
void
on_close (error_code ec);
//--------------------------------------------------------------------------
//
// Session
//
beast::Journal
journal() override
@@ -195,7 +178,7 @@ private:
beast::IP::Endpoint
remoteAddress() override
{
return from_asio (get_socket().remote_endpoint());
return from_asio (endpoint_);
}
beast::http::message&
@@ -217,6 +200,493 @@ private:
close (bool graceful) override;
};
//------------------------------------------------------------------------------
class PlainPeer
: public Peer <PlainPeer>
, public std::enable_shared_from_this <PlainPeer>
{
private:
friend class Peer <PlainPeer>;
using socket_type = boost::asio::ip::tcp::socket;
socket_type socket_;
public:
template <class ConstBufferSequence>
PlainPeer (ServerImpl& impl, Port const& port, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers,
socket_type&& socket);
void
accept();
private:
void
do_request();
void
do_close();
};
template <class ConstBufferSequence>
PlainPeer::PlainPeer (ServerImpl& server, Port const& port,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers,
boost::asio::ip::tcp::socket&& socket)
: Peer (server, port, journal, endpoint, buffers)
, socket_(std::move(socket))
{
}
void
PlainPeer::accept ()
{
server_.handler().onAccept (session());
if (! socket_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&PlainPeer::do_read,
shared_from_this(), std::placeholders::_1));
}
void
PlainPeer::do_request()
{
// Perform half-close when Connection: close and not SSL
error_code ec;
if (! message_.keep_alive())
socket_.shutdown (socket_type::shutdown_receive, ec);
if (! ec)
{
++request_count_;
server_.handler().onRequest (session());
return;
}
if (ec)
fail (ec, "request");
}
void
PlainPeer::do_close()
{
error_code ec;
socket_.shutdown (socket_type::shutdown_send, ec);
}
//------------------------------------------------------------------------------
class SSLPeer
: public Peer <SSLPeer>
, public std::enable_shared_from_this <SSLPeer>
{
private:
friend class Peer <SSLPeer>;
using next_layer_type = boost::asio::ip::tcp::socket;
using socket_type = boost::asio::ssl::stream <next_layer_type&>;
next_layer_type next_layer_;
socket_type socket_;
public:
template <class ConstBufferSequence>
SSLPeer (ServerImpl& impl, Port const& port, beast::Journal journal,
endpoint_type endpoint, ConstBufferSequence const& buffers,
next_layer_type&& socket);
void
accept();
private:
void
do_handshake (boost::asio::yield_context yield);
void
do_request();
void
do_close();
void
on_shutdown (error_code ec);
};
template <class ConstBufferSequence>
SSLPeer::SSLPeer (ServerImpl& server, Port const& port,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers,
boost::asio::ip::tcp::socket&& socket)
: Peer (server, port, journal, endpoint, buffers)
, next_layer_ (std::move(socket))
, socket_ (next_layer_, port.context->get())
{
}
// Called when the acceptor accepts our socket.
void
SSLPeer::accept ()
{
server_.handler().onAccept (session());
if (! next_layer_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_handshake,
shared_from_this(), std::placeholders::_1));
}
void
SSLPeer::do_handshake (boost::asio::yield_context yield)
{
error_code ec;
std::size_t const bytes_transferred = socket_.async_handshake (
socket_type::server, read_buf_.data(), yield[ec]);
if (ec)
return fail (ec, "handshake");
boost::asio::spawn (strand_, std::bind (&SSLPeer::do_read,
shared_from_this(), std::placeholders::_1));
}
void
SSLPeer::do_request()
{
++request_count_;
server_.handler().onRequest (session());
}
void
SSLPeer::do_close()
{
error_code ec;
socket_.async_shutdown (strand_.wrap (std::bind (
&SSLPeer::on_shutdown, shared_from_this(),
std::placeholders::_1)));
}
void
SSLPeer::on_shutdown (error_code ec)
{
socket_.next_layer().close(ec);
}
//------------------------------------------------------------------------------
template <class Impl>
template <class ConstBufferSequence>
Peer<Impl>::Peer (ServerImpl& server, Port const& port,
beast::Journal journal, endpoint_type endpoint,
ConstBufferSequence const& buffers)
: journal_ (journal)
, server_ (server)
, strand_ (server_.get_io_service())
, timer_ (server_.get_io_service())
, endpoint_ (endpoint)
{
read_buf_.commit (boost::asio::buffer_copy (read_buf_.prepare (
boost::asio::buffer_size (buffers)), buffers));
static std::atomic <int> sid;
nid_ = ++sid;
id_ = std::string("#") + std::to_string(nid_) + " ";
server_.add (*this);
if (journal_.trace) journal_.trace << id_ <<
"accept: " << endpoint.address();
when_ = clock_type::now();
when_str_ = beast::Time::getCurrentTime().formatted (
"%Y-%b-%d %H:%M:%S").toStdString();
}
template <class Impl>
Peer<Impl>::~Peer()
{
Stat stat;
stat.id = nid_;
stat.when = std::move (when_str_);
stat.elapsed = std::chrono::duration_cast <
std::chrono::seconds> (clock_type::now() - when_);
stat.requests = request_count_;
stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_);
server_.report (std::move (stat));
server_.handler().onClose (session(), ec_);
server_.remove (*this);
if (journal_.trace) journal_.trace << id_ <<
"destroyed: " << request_count_ <<
((request_count_ == 1) ? " request" : " requests");
}
//------------------------------------------------------------------------------
template <class Impl>
void
Peer<Impl>::fail (error_code ec, char const* what)
{
if (! ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
if (journal_.trace) journal_.trace << id_ <<
std::string(what) << ": " << ec.message();
impl().socket_.lowest_layer().close (ec);
}
}
template <class Impl>
void
Peer<Impl>::start_timer()
{
error_code ec;
timer_.expires_from_now (std::chrono::seconds(timeoutSeconds), ec);
if (ec)
return fail (ec, "start_timer");
timer_.async_wait (strand_.wrap (std::bind (
&Peer<Impl>::on_timer, impl().shared_from_this(),
beast::asio::placeholders::error)));
}
// Convenience for discarding the error code
template <class Impl>
void
Peer<Impl>::cancel_timer()
{
error_code ec;
timer_.cancel(ec);
}
// Called when session times out
template <class Impl>
void
Peer<Impl>::on_timer (error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (! ec)
ec = boost::system::errc::make_error_code (
boost::system::errc::timed_out);
fail (ec, "timer");
}
//------------------------------------------------------------------------------
template <class Impl>
void
Peer<Impl>::do_read (boost::asio::yield_context yield)
{
complete_ = false;
error_code ec;
bool eof = false;
beast::http::parser parser (message_, true);
for(;;)
{
if (read_buf_.size() == 0)
{
start_timer();
auto const bytes_transferred = boost::asio::async_read (
impl().socket_, read_buf_.prepare (bufferSize),
boost::asio::transfer_at_least(1), yield[ec]);
cancel_timer();
eof = ec == boost::asio::error::eof;
if (eof)
{
ec = error_code{};
}
else if (! ec)
{
bytes_in_ += bytes_transferred;
read_buf_.commit (bytes_transferred);
}
}
if (! ec)
{
if (! eof)
{
// VFALCO TODO Currently parsing errors are treated the
// same as the connection dropping. Instead, we
// should request that the handler compose a proper HTTP error
// response. This requires refactoring HTTPReply() into
// something sensible.
auto const result = parser.write (read_buf_.data());
if (result.first)
read_buf_.consume (result.second);
else
ec = parser.error();
}
else
{
if (! parser.write_eof())
ec = parser.error();
}
}
if (! ec)
{
if (parser.complete())
return do_request();
else if (eof)
ec = boost::asio::error::eof; // incomplete request
}
if (ec)
return fail (ec, "read");
}
}
// Send everything in the write queue.
// The write queue must not be empty upon entry.
template <class Impl>
void
Peer<Impl>::do_write (boost::asio::yield_context yield)
{
error_code ec;
std::size_t bytes = 0;
for(;;)
{
bytes_out_ += bytes;
bool empty;
void const* data;
{
std::lock_guard <std::mutex> lock (mutex_);
buffer& b = write_queue_.front();
b.used += bytes;
if (b.used < b.bytes)
{
empty = false;
}
else
{
write_queue_.pop_front();
empty = write_queue_.empty();
}
data = b.data.get() + b.used;
bytes = b.bytes - b.used;
}
if (empty)
break;
start_timer();
boost::asio::async_write (impl().socket_, boost::asio::buffer (
data, bytes), boost::asio::transfer_at_least(1), yield[ec]);
cancel_timer();
if (ec)
return fail (ec, "write");
}
if (! complete_)
return;
if (graceful_)
return do_close();
boost::asio::spawn (strand_, std::bind (&Peer<Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
}
//------------------------------------------------------------------------------
// Send a copy of the data.
template <class Impl>
void
Peer<Impl>::write (void const* buffer, std::size_t bytes)
{
if (bytes == 0)
return;
bool empty;
{
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
write_queue_.emplace_back (buffer, bytes);
}
if (empty)
boost::asio::spawn (strand_, std::bind (&Peer<Impl>::do_write,
impl().shared_from_this(), std::placeholders::_1));
}
// Make the Session asynchronous
template <class Impl>
void
Peer<Impl>::detach ()
{
if (! detach_ref_)
{
assert (! work_);
// Maintain an additional reference while detached
detach_ref_ = impl().shared_from_this();
// Prevent the io_service from running out of work.
// The work object will be destroyed with the Peer
// after the Session is closed and handlers complete.
//
work_ = boost::in_place (std::ref (
server_.get_io_service()));
}
}
// Called to indicate the response has been written (but not sent)
template <class Impl>
void
Peer<Impl>::complete()
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer<Impl>::complete, impl().shared_from_this())));
// Reattach
detach_ref_.reset();
work_ = boost::none;
message_ = beast::http::message{};
complete_ = true;
if (! write_queue_.empty())
return;
// keep-alive
boost::asio::spawn (strand_, std::bind (&Peer<Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
}
// Called from the Handler to close the session.
template <class Impl>
void
Peer<Impl>::close (bool graceful)
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer<Impl>::close, impl().shared_from_this(),
graceful)));
// Reattach
detach_ref_.reset();
work_ = boost::none;
complete_ = true;
if (graceful)
{
graceful_ = true;
if (! write_queue_.empty())
return;
}
error_code ec;
timer_.cancel (ec);
impl().socket_.lowest_layer().close (ec);
}
}
}

View File

@@ -105,7 +105,7 @@ ServerImpl::get_io_service()
// way, the Peer can never outlive the server.
//
void
ServerImpl::add (Peer& peer)
ServerImpl::add (BasicPeer& peer)
{
std::lock_guard <std::mutex> lock (mutex_);
state_.peers.push_back (peer);
@@ -123,7 +123,7 @@ ServerImpl::add (Door& door)
// as a weak_ptr.
//
void
ServerImpl::remove (Peer& peer)
ServerImpl::remove (BasicPeer& peer)
{
std::lock_guard <std::mutex> lock (mutex_);
state_.peers.erase (state_.peers.iterator_to (peer));
@@ -183,7 +183,7 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map)
// VFALCO TODO Write the list of doors
map ["active"] = Peer::count();
map ["active"] = state_.peers.size();
{
std::string s;
@@ -282,7 +282,9 @@ ServerImpl::on_update ()
{
if (comp < 0)
{
doors.push_back (new Door (*this, *port));
doors.push_back (std::make_shared <Door> (
io_service_, *this, *port));
doors.back()->listen();
}
else
{
@@ -293,7 +295,9 @@ ServerImpl::on_update ()
}
else
{
doors.push_back (new Door (*this, *port));
doors.push_back (std::make_shared <Door> (
io_service_, *this, *port));
doors.back()->listen();
}
}

View File

@@ -38,8 +38,8 @@
namespace ripple {
namespace HTTP {
class BasicPeer;
class Door;
class Peer;
struct Stat
{
@@ -68,13 +68,13 @@ private:
Ports ports;
// All allocated Peer objects
beast::List <Peer> peers;
beast::List <BasicPeer> peers;
// All allocated Door objects
beast::List <Door> doors;
};
typedef std::vector <beast::SharedPtr <Door>> Doors;
typedef std::vector <std::shared_ptr<Door>> Doors;
Server& m_server;
Handler& m_handler;
@@ -121,13 +121,13 @@ public:
get_io_service();
void
add (Peer& peer);
add (BasicPeer& peer);
void
add (Door& door);
void
remove (Peer& peer);
remove (BasicPeer& peer);
void
remove (Door& door);

View File

@@ -20,7 +20,6 @@
#include <BeastConfig.h>
#include <ripple/http/impl/Door.cpp>
#include <ripple/http/impl/Peer.cpp>
#include <ripple/http/impl/Port.cpp>
#include <ripple/http/impl/ScopedStream.cpp>
#include <ripple/http/impl/ServerImpl.cpp>