Fix handling of HTTP/S keep-alives (RIPD-556):

* Proper shutdown for ssl and non-ssl connections
* Report session id in history
* Report histogram of requests per session
* Change print name to 'http'
* Split logging into "HTTP" and "HTTP-RPC" partitions
* More logging and refinement of logging severities
* Log the request count when a session is destroyed

Conflicts:
	src/ripple/http/impl/Peer.cpp
	src/ripple/http/impl/Peer.h
	src/ripple/http/impl/ServerImpl.cpp
	src/ripple/module/app/main/Application.cpp
	src/ripple/module/app/main/RPCHTTPServer.cpp
This commit is contained in:
Vinnie Falco
2014-09-08 14:04:16 -07:00
parent 319ac14e7d
commit ee8bd8ddae
12 changed files with 529 additions and 236 deletions

View File

@@ -159,6 +159,8 @@
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\beast\beast\asio\socket_wrapper.h"> <ClInclude Include="..\..\src\beast\beast\asio\socket_wrapper.h">
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\beast\beast\asio\ssl.h">
</ClInclude>
<ClCompile Include="..\..\src\beast\beast\asio\tests\bind_handler.test.cpp"> <ClCompile Include="..\..\src\beast\beast\asio\tests\bind_handler.test.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild> <ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile> </ClCompile>

View File

@@ -711,6 +711,9 @@
<ClInclude Include="..\..\src\beast\beast\asio\socket_wrapper.h"> <ClInclude Include="..\..\src\beast\beast\asio\socket_wrapper.h">
<Filter>beast\asio</Filter> <Filter>beast\asio</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\beast\beast\asio\ssl.h">
<Filter>beast\asio</Filter>
</ClInclude>
<ClCompile Include="..\..\src\beast\beast\asio\tests\bind_handler.test.cpp"> <ClCompile Include="..\..\src\beast\beast\asio\tests\bind_handler.test.cpp">
<Filter>beast\asio\tests</Filter> <Filter>beast\asio\tests</Filter>
</ClCompile> </ClCompile>

View File

@@ -147,6 +147,15 @@ public:
} }
/** @} */ /** @} */
/** Detach the session.
This holds the session open so that the response can be sent
asynchronously. Calls to io_service::run made by the server
will not return until all detached sessions are closed.
*/
virtual
void
detach() = 0;
/** Indicate that the response is complete. /** Indicate that the response is complete.
The handler should call this when it has completed writing The handler should call this when it has completed writing
the response. If Keep-Alive is indicated on the connection, the response. If Keep-Alive is indicated on the connection,
@@ -157,15 +166,6 @@ public:
void void
complete() = 0; complete() = 0;
/** Detach the session.
This holds the session open so that the response can be sent
asynchronously. Calls to io_service::run made by the server
will not return until all detached sessions are closed.
*/
virtual
void
detach() = 0;
/** Close the session. /** Close the session.
This will be performed asynchronously. The session will be This will be performed asynchronously. The session will be
closed gracefully after all pending writes have completed. closed gracefully after all pending writes have completed.

View File

@@ -81,7 +81,7 @@ void
Door::async_accept () Door::async_accept ()
{ {
auto const peer (std::make_shared <Peer> (server_, port_, server_.journal())); auto const peer (std::make_shared <Peer> (server_, port_, server_.journal()));
acceptor_.async_accept (peer->get_socket(), std::bind ( acceptor_.async_accept (peer->get_socket(), endpoint_, std::bind (
&Door::handle_accept, Ptr(this), &Door::handle_accept, Ptr(this),
beast::asio::placeholders::error, peer)); beast::asio::placeholders::error, peer));
} }
@@ -95,13 +95,14 @@ Door::handle_accept (error_code ec,
if (ec) if (ec)
{ {
server_.journal().error << "Accept failed: " << ec.message(); server_.journal().error <<
"accept: " << ec.message();
return; return;
} }
auto const endpoint = endpoint_;
async_accept(); async_accept();
peer->accept (endpoint);
peer->accept();
} }
} }

View File

@@ -39,6 +39,7 @@ private:
ServerImpl& server_; ServerImpl& server_;
acceptor acceptor_; acceptor acceptor_;
endpoint_t endpoint_;
Port port_; Port port_;
public: public:

View File

@@ -18,27 +18,35 @@
//============================================================================== //==============================================================================
#include <ripple/http/impl/Peer.h> #include <ripple/http/impl/Peer.h>
#include <beast/asio/ssl.h>
#include <cassert> #include <cassert>
namespace ripple { namespace ripple {
namespace HTTP { namespace HTTP {
/*
Reference:
http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
*/
std::atomic <std::size_t> Peer::s_count_;
std::size_t
Peer::count()
{
return s_count_.load();
}
Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal) Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
: journal_ (journal) : journal_ (journal)
, server_ (server) , server_ (server)
, strand_ (server_.get_io_service()) , strand_ (server_.get_io_service())
, timer_ (server_.get_io_service()) , timer_ (server_.get_io_service())
, parser_ (message_, true) , parser_ (message_, true)
, pending_writes_ (0)
, closed_ (false)
, callClose_ (false)
, detached_ (0)
, request_count_ (0)
, bytes_in_ (0)
, bytes_out_ (0)
{ {
static std::atomic <int> sid; static std::atomic <int> sid;
id_ = std::string("#") + std::to_string(sid++); nid_ = ++sid;
id_ = std::string("#") + std::to_string(nid_) + " ";
tag = nullptr; tag = nullptr;
@@ -63,16 +71,16 @@ Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
server_.add (*this); server_.add (*this);
if (journal_.trace) journal_.trace << if (journal_.trace) journal_.trace << id_ <<
id_ << " created"; "created";
} }
Peer::~Peer () Peer::~Peer ()
{ {
if (callClose_) if (callClose_)
{ {
Stat stat; Stat stat;
stat.id = nid_;
stat.when = std::move (when_str_); stat.when = std::move (when_str_);
stat.elapsed = std::chrono::duration_cast < stat.elapsed = std::chrono::duration_cast <
std::chrono::seconds> (clock_type::now() - when_); std::chrono::seconds> (clock_type::now() - when_);
@@ -80,32 +88,32 @@ Peer::~Peer ()
stat.bytes_in = bytes_in_; stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_; stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_); stat.ec = std::move (ec_);
server_.report (std::move (stat)); server_.report (std::move (stat));
if (journal_.trace) journal_.trace <<
id_ << " onClose (" << ec_ << ")";
server_.handler().onClose (session(), ec_); server_.handler().onClose (session(), ec_);
} }
server_.remove (*this); server_.remove (*this);
if (journal_.trace) journal_.trace << if (journal_.trace) journal_.trace << id_ <<
id_ << " destroyed"; "destroyed: " << request_count_ <<
((request_count_ == 1) ? " request" : " requests");
--s_count_;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Called when the acceptor accepts our socket. // Called when the acceptor accepts our socket.
void void
Peer::accept () Peer::accept (boost::asio::ip::tcp::endpoint endpoint)
{ {
if (! strand_.running_in_this_thread()) if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap ( return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::accept, shared_from_this()))); std::bind (&Peer::accept, shared_from_this(), endpoint)));
if (journal_.trace) journal_.trace << if (journal_.trace) journal_.trace << id_ <<
id_ << " accept"; "accept: " << endpoint.address();
callClose_ = true; callClose_ = true;
@@ -113,57 +121,42 @@ Peer::accept ()
when_str_ = beast::Time::getCurrentTime().formatted ( when_str_ = beast::Time::getCurrentTime().formatted (
"%Y-%b-%d %H:%M:%S").toStdString(); "%Y-%b-%d %H:%M:%S").toStdString();
if (journal_.trace) journal_.trace <<
id_ << " onAccept";
server_.handler().onAccept (session()); server_.handler().onAccept (session());
// Handler might have closed // Handler might have closed
if (closed_) if (state_ == State::closed)
{ {
// VFALCO TODO Is this the correct way to close the socket? // VFALCO TODO Is this the correct way to close the socket?
// See what state the socket is in and verify. // See what state the socket is in and verify.
cancel(); //closed();
return; return;
} }
if (socket_->needs_handshake ()) if (socket_->needs_handshake ())
{ {
start_timer(); start_timer();
socket_->async_handshake (beast::asio::abstract_socket::server, socket_->async_handshake (beast::asio::abstract_socket::server,
strand_.wrap (std::bind (&Peer::on_ssl_handshake, shared_from_this(), strand_.wrap (std::bind (&Peer::on_handshake, shared_from_this(),
beast::asio::placeholders::error))); beast::asio::placeholders::error)));
} }
else else
{ {
on_read_request (error_code{}, 0); async_read();
} }
} }
// Cancel all pending i/o and timers and send tcp shutdown.
void
Peer::cancel ()
{
if (journal_.trace) journal_.trace <<
id_ << " cancel";
error_code ec;
timer_.cancel (ec);
socket_->cancel (ec);
socket_->shutdown (socket::shutdown_both, ec);
}
// Called by a completion handler when error is not eof or aborted. // Called by a completion handler when error is not eof or aborted.
void void
Peer::failed (error_code const& ec) Peer::fail (error_code ec)
{ {
if (journal_.trace) journal_.trace << assert (ec);
id_ << " failed, " << ec.message(); assert (strand_.running_in_this_thread());
if (journal_.debug) journal_.debug << id_ <<
"fail: " << ec.message();
ec_ = ec; ec_ = ec;
assert (ec_); socket_->cancel(ec);
cancel ();
} }
// Start the timer. // Start the timer.
@@ -172,9 +165,6 @@ Peer::failed (error_code const& ec)
void void
Peer::start_timer() Peer::start_timer()
{ {
if (journal_.trace) journal_.trace <<
id_ << " start_timer";
timer_.expires_from_now ( timer_.expires_from_now (
boost::posix_time::seconds ( boost::posix_time::seconds (
timeoutSeconds)); timeoutSeconds));
@@ -184,29 +174,35 @@ Peer::start_timer()
beast::asio::placeholders::error))); beast::asio::placeholders::error)));
} }
// Send a shared buffer
void void
Peer::async_write (SharedBuffer const& buf) Peer::async_write ()
{ {
assert (buf.get().size() > 0); void const* data;
std::size_t bytes;
++pending_writes_; {
std::lock_guard <std::mutex> lock (mutex_);
if (journal_.trace) journal_.trace << buffer& b = write_queue_.front();
id_ << " async_write, pending_writes = " << pending_writes_; data = b.data.get() + b.used;
bytes = b.bytes - b.used;
}
start_timer(); start_timer();
boost::asio::async_write (*socket_, boost::asio::buffer (data, bytes),
boost::asio::transfer_at_least (1), strand_.wrap (std::bind (
&Peer::on_write, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}
// Send the copy. We pass the SharedBuffer in the last parameter void
// so that a reference is maintained as the handler gets copied. Peer::async_read()
// When the final completion function returns, the reference {
// count will drop to zero and the buffer will be freed. start_timer();
// boost::asio::async_read (*socket_, read_buf_.prepare (bufferSize),
boost::asio::async_write (*socket_, boost::asio::transfer_at_least (1), strand_.wrap (std::bind (
boost::asio::const_buffers_1 (&(*buf)[0], buf->size()), &Peer::on_read, shared_from_this(),
strand_.wrap (std::bind (&Peer::on_write_response, beast::asio::placeholders::error,
shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred)));
beast::asio::placeholders::bytes_transferred, buf)));
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -215,154 +211,310 @@ Peer::async_write (SharedBuffer const& buf)
void void
Peer::on_timer (error_code ec) Peer::on_timer (error_code ec)
{ {
if (ec == boost::asio::error::operation_aborted) if (state_ == State::closed)
{
if (journal_.trace) journal_.trace << id_ <<
"timer: closed";
return; return;
}
if (ec == boost::asio::error::operation_aborted)
{
// Disable this otherwise we log needlessly
// on every new read and write.
/*
if (journal_.trace) journal_.trace << id_ <<
"timer: aborted";
*/
return;
}
if (! ec) if (! ec)
ec = boost::system::errc::make_error_code ( ec = boost::system::errc::make_error_code (
boost::system::errc::timed_out); boost::system::errc::timed_out);
failed (ec); if (journal_.debug) journal_.debug << id_ <<
"timer: " << ec.message();
fail (ec);
} }
// Called when the handshake completes // Called when the handshake completes
void void
Peer::on_ssl_handshake (error_code ec) Peer::on_handshake (error_code ec)
{ {
if (journal_.trace) journal_.trace <<
id_ << " on_ssl_handshake, " << ec.message();
if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0)
{ {
failed (ec); error_code ec;
timer_.cancel(ec);
}
bool const ssl = socket_->ssl_handle() != nullptr;
if (state_ == State::closed)
{
if (journal_.trace) journal_.trace << id_ <<
"handshake: closed";
return; return;
} }
on_read_request (error_code{}, 0); if (ec == boost::asio::error::operation_aborted)
{
if (journal_.trace) journal_.trace << id_ <<
"handshake: aborted";
return;
}
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"handshake: " << ec.message();
return fail (ec);
}
if (journal_.trace) journal_.trace << id_ <<
"handshake" << (ssl ? ": ssl" : "");
async_read();
} }
// Called repeatedly with the http request data // Called repeatedly with the http request data
void void
Peer::on_read_request (error_code ec, std::size_t bytes_transferred) Peer::on_read (error_code ec, std::size_t bytes_transferred)
{ {
if (journal_.trace) // This needs to happen before the call to onRequest
// otherwise we could time out if the Handler takes too long.
{ {
if (! ec_) error_code ec;
journal_.trace << timer_.cancel(ec);
id_ << " on_read_request, " << }
bytes_transferred << " bytes";
else if (state_ == State::closed)
journal_.trace << {
id_ << " on_read_request failed, " << if (journal_.trace) journal_.trace << id_ <<
ec.message(); "read: closed";
return;
} }
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
{
if (journal_.trace) journal_.trace << id_ <<
"read: aborted";
return; return;
}
if (beast::asio::is_short_read (ec))
{
if (journal_.trace) journal_.trace << id_ <<
"read: " << ec.message();
return fail (ec);
}
if (ec && ec != boost::asio::error::eof)
{
if (journal_.debug) journal_.debug << id_ <<
"read: " << ec.message();
return fail (ec);
}
bool const eof = ec == boost::asio::error::eof; bool const eof = ec == boost::asio::error::eof;
if (eof) if (! eof)
ec = error_code{};
if (! ec)
{ {
bytes_in_ += bytes_transferred; if (journal_.trace) journal_.trace << id_ <<
"read: " << bytes_transferred << " bytes";
}
else
{
// End of stream reached:
// http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/overview/core/streams.html
if (bytes_transferred != 0)
if (journal_.error) journal_.error << id_ <<
"read: eof (" << bytes_transferred << " bytes)";
if (journal_.debug) journal_.debug << id_ <<
"read: eof";
ec = error_code{};
}
read_buf_.commit (bytes_transferred); bytes_in_ += bytes_transferred;
read_buf_.commit (bytes_transferred);
std::pair <bool, std::size_t> result; std::pair <bool, std::size_t> result;
if (! eof)
if (! eof) {
{ result = parser_.write (read_buf_.data());
result = parser_.write (read_buf_.data()); if (result.first)
if (result.first) read_buf_.consume (result.second);
read_buf_.consume (result.second);
else
ec = parser_.error();
}
else else
{ ec = parser_.error();
result.first = parser_.write_eof(); }
if (! result.first) else
ec = parser_.error(); {
} result.first = parser_.write_eof();
if (! result.first)
ec = parser_.error();
}
// VFALCO TODO Currently parsing errors are treated the
// same as the connection dropping. Instead, we
// should request that the handler compose a proper HTTP error
// response. This requires refactoring HTTPReply() into
// something sensible.
//
if (! ec && parser_.complete())
{
// Perform half-close when Connection: close and not SSL
if (! message_.keep_alive() &&
! socket_->needs_handshake())
socket_->shutdown (socket::shutdown_receive, ec);
if (! ec) if (! ec)
{ {
if (parser_.complete()) ++request_count_;
{ server_.handler().onRequest (session());
// ??? return;
//if (! socket_->needs_handshake())
// socket_->shutdown (socket::shutdown_receive, ec);
if (journal_.trace) journal_.trace <<
id_ << " onRequest";
++request_count_;
server_.handler().onRequest (session());
return;
}
} }
} }
if (ec) if (ec)
{ {
failed (ec); if (journal_.debug) journal_.debug << id_ <<
return; "read: " << ec.message();
return fail (ec);
} }
start_timer(); if (! eof)
async_read();
socket_->async_read_some (read_buf_.prepare (bufferSize), strand_.wrap (
std::bind (&Peer::on_read_request, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
} }
// Called when async_write completes. // Called when async_write completes.
void void
Peer::on_write_response (error_code ec, std::size_t bytes_transferred, Peer::on_write (error_code ec, std::size_t bytes_transferred)
SharedBuffer const& buf)
{ {
timer_.cancel (ec);
if (journal_.trace)
{ {
if (! ec_) error_code ec;
journal_.trace << timer_.cancel (ec);
id_ << " on_write_response, " << }
bytes_transferred << " bytes";
else if (state_ == State::closed)
journal_.trace << {
id_ << " on_write_response failed, " << if (journal_.trace) journal_.trace << id_ <<
ec.message(); "write: closed";
return;
} }
if (ec == boost::asio::error::operation_aborted) if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0)
{ {
failed (ec); if (journal_.trace) journal_.trace << id_ <<
"write: aborted";
return; return;
} }
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"write: " << ec.message();
return fail (ec);
}
if (bytes_transferred == 0)
if (journal_.error) journal_.error << id_ <<
"write: 0 bytes";
if (journal_.trace) journal_.trace << id_ <<
"write: " << bytes_transferred << " bytes";
bytes_out_ += bytes_transferred; bytes_out_ += bytes_transferred;
assert (pending_writes_ > 0); bool empty;
if (--pending_writes_ > 0)
return;
if (closed_)
{ {
socket_->shutdown (socket::shutdown_send, ec); std::lock_guard <std::mutex> lock (mutex_);
buffer& b = write_queue_.front();
b.used += bytes_transferred;
if (b.used == b.bytes)
{
write_queue_.pop_front();
empty = write_queue_.empty();
}
else
{
assert (b.used < b.bytes);
empty = false;
}
}
if (! empty)
return async_write();
if (! complete_)
return; return;
// Handler is done writing, did we graceful close?
if (state_ == State::flush)
{
if (socket_->needs_handshake())
{
// ssl::stream
start_timer();
socket_->async_shutdown (strand_.wrap (std::bind (
&Peer::on_shutdown, shared_from_this(),
beast::asio::placeholders::error)));
return;
}
{
error_code ec;
socket_->shutdown (MultiSocket::shutdown_send, ec);
}
return;
} }
// keep-alive
complete_ = false;
async_read();
}
// Called when async_shutdown completes
void
Peer::on_shutdown (error_code ec)
{
{
error_code ec;
timer_.cancel (ec);
}
if (ec == boost::asio::error::operation_aborted)
{
// We canceled i/o on the socket, or we called async_shutdown
// and then closed the socket before getting the completion.
if (journal_.trace) journal_.trace << id_ <<
"shutdown: aborted";
return;
}
if (ec == boost::asio::error::eof)
{
// Happens when ssl::stream::async_shutdown completes without error
if (journal_.trace) journal_.trace << id_ <<
"shutdown: eof";
return;
}
if ((ec.category() == boost::asio::error::get_ssl_category())
&& (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ))
{
// Remote peer failed to send SSL close_notify message.
if (journal_.trace) journal_.trace << id_ <<
"shutdown: missing close_notify";
return;
}
if (ec)
{
if (journal_.debug) journal_.debug << id_ <<
"shutdown: " << ec.message();
return fail (ec);
}
if (journal_.trace) journal_.trace << id_ <<
"shutdown";
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -371,33 +523,28 @@ Peer::on_write_response (error_code ec, std::size_t bytes_transferred,
void void
Peer::write (void const* buffer, std::size_t bytes) Peer::write (void const* buffer, std::size_t bytes)
{ {
server_.get_io_service().dispatch (strand_.wrap ( if (bytes == 0)
std::bind (&Peer::async_write, shared_from_this(), return;
SharedBuffer (static_cast <char const*> (buffer), bytes))));
}
// Called to indicate the response has been written (but not sent) bool empty;
void {
Peer::complete() std::lock_guard <std::mutex> lock (mutex_);
{ empty = write_queue_.empty();
if (! strand_.running_in_this_thread()) write_queue_.emplace_back (buffer, bytes);
return server_.get_io_service().dispatch (strand_.wrap ( }
std::bind (&Peer::complete, shared_from_this())));
message_ = beast::http::message{}; if (empty)
parser_ = beast::http::parser{message_, true}; server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::async_write, shared_from_this())));
on_read_request (error_code{}, 0);
} }
// Make the Session asynchronous // Make the Session asynchronous
void void
Peer::detach () Peer::detach ()
{ {
if (detached_.exchange (1) == 0) if (! detach_ref_)
{ {
assert (! work_); assert (! work_);
assert (detach_ref_ == nullptr);
// Maintain an additional reference while detached // Maintain an additional reference while detached
detach_ref_ = shared_from_this(); detach_ref_ = shared_from_this();
@@ -411,27 +558,77 @@ Peer::detach ()
} }
} }
// Called to indicate the response has been written (but not sent)
void
Peer::complete()
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::complete, shared_from_this())));
// Reattach
detach_ref_.reset();
work_ = boost::none;
message_ = beast::http::message{};
parser_ = beast::http::parser{message_, true};
complete_ = true;
bool empty;
{
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
}
if (empty)
{
// keep-alive
complete_ = false;
async_read();
}
}
// Called from the Handler to close the session. // Called from the Handler to close the session.
void void
Peer::close (bool graceful) Peer::close (bool graceful)
{ {
closed_ = true;
if (! strand_.running_in_this_thread()) if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap ( return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::close, shared_from_this(), graceful))); std::bind (&Peer::close, shared_from_this(), graceful)));
if (! graceful || pending_writes_ == 0) // Reattach
{ detach_ref_.reset();
// We should cancel pending i/o work_ = boost::none;
if (pending_writes_ == 0) assert (state_ == State::open);
state_ = graceful ? State::flush : State::closed;
complete_ = true;
if (graceful)
{
bool empty;
{ {
std::lock_guard <std::mutex> lock (mutex_);
empty = write_queue_.empty();
}
if (! empty)
return;
if (socket_->needs_handshake())
{
start_timer();
socket_->async_shutdown (strand_.wrap (std::bind (
&Peer::on_shutdown, shared_from_this(),
beast::asio::placeholders::error)));
return;
} }
} }
// Release our additional reference error_code ec;
detach_ref_.reset(); timer_.cancel (ec);
socket_->close (ec);
} }
} }

View File

@@ -33,13 +33,19 @@
#include <beast/module/asio/http/HTTPRequestParser.h> #include <beast/module/asio/http/HTTPRequestParser.h>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/streambuf.hpp> #include <boost/asio/streambuf.hpp>
#include <boost/logic/tribool.hpp>
#include <atomic>
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <list>
#include <memory> #include <memory>
#include <mutex>
namespace ripple { namespace ripple {
namespace HTTP { namespace HTTP {
//------------------------------------------------------------------------------
// Holds the copy of buffers being sent // Holds the copy of buffers being sent
// VFALCO TODO Replace with std::shared_ptr<std::string> // VFALCO TODO Replace with std::shared_ptr<std::string>
// //
@@ -54,50 +60,79 @@ class Peer
{ {
private: private:
typedef std::chrono::system_clock clock_type; typedef std::chrono::system_clock clock_type;
typedef MultiSocket socket_type;
enum enum
{ {
// Size of our receive buffer // Size of our receive buffer
bufferSize = 2048, bufferSize = 4 * 1024,
// Largest HTTP request allowed // Largest HTTP request allowed
maxRequestBytes = 32 * 1024, maxRequestBytes = 32 * 1024,
// Max seconds without completing a message // Max seconds without completing a message
timeoutSeconds = 30 timeoutSeconds = 30
//timeoutSeconds = 3
}; };
enum class State
{
open,
flush,
closed
};
struct buffer
{
buffer (void const* ptr, std::size_t len)
: data (new char[len])
, bytes (len)
, used (0)
{
memcpy (data.get(), ptr, len);
}
std::unique_ptr <char[]> data;
std::size_t bytes;
std::size_t used;
};
beast::Journal journal_; beast::Journal journal_;
ServerImpl& server_; ServerImpl& server_;
std::string id_; std::string id_;
std::size_t nid_;
boost::asio::io_service::strand strand_; boost::asio::io_service::strand strand_;
boost::asio::deadline_timer timer_; boost::asio::deadline_timer timer_;
std::unique_ptr <MultiSocket> socket_; std::unique_ptr <socket_type> socket_;
boost::asio::streambuf read_buf_; boost::asio::streambuf read_buf_;
beast::http::message message_; beast::http::message message_;
beast::http::parser parser_; beast::http::parser parser_;
int pending_writes_; std::list <buffer> write_queue_;
bool closed_; std::mutex mutex_;
bool finished_; State state_ = State::open;
bool callClose_; bool complete_ = false;
bool callClose_ = false;
std::shared_ptr <Peer> detach_ref_; std::shared_ptr <Peer> detach_ref_;
boost::optional <boost::asio::io_service::work> work_; boost::optional <boost::asio::io_service::work> work_;
boost::system::error_code ec_; boost::system::error_code ec_;
std::atomic <int> detached_;
clock_type::time_point when_; clock_type::time_point when_;
std::string when_str_; std::string when_str_;
int request_count_; int request_count_ = 0;
std::size_t bytes_in_; std::size_t bytes_in_ = 0;
std::size_t bytes_out_; std::size_t bytes_out_ = 0;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
static std::atomic <std::size_t> s_count_;
public: public:
static
std::size_t
count();
Peer (ServerImpl& impl, Port const& port, beast::Journal journal); Peer (ServerImpl& impl, Port const& port, beast::Journal journal);
~Peer (); ~Peer ();
@@ -114,20 +149,20 @@ public:
} }
void void
accept(); accept (boost::asio::ip::tcp::endpoint endpoint);
private: private:
void void
cancel(); fail (error_code ec);
void
failed (error_code const& ec);
void void
start_timer(); start_timer();
void void
async_write (SharedBuffer const& buf); async_write();
void
async_read();
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
@@ -138,17 +173,19 @@ private:
on_timer (error_code ec); on_timer (error_code ec);
void void
on_ssl_handshake (error_code ec); on_handshake (error_code ec);
void void
on_read_request (error_code ec, std::size_t bytes_transferred); on_read (error_code ec, std::size_t bytes_transferred);
void void
on_write_response (error_code ec, std::size_t bytes_transferred, on_write (error_code ec, std::size_t bytes_transferred);
SharedBuffer const& buf);
void void
on_close (); on_shutdown (error_code ec);
void
on_close (error_code ec);
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //
@@ -177,10 +214,10 @@ private:
write (void const* buffer, std::size_t bytes) override; write (void const* buffer, std::size_t bytes) override;
void void
complete() override; detach() override;
void void
detach() override; complete() override;
void void
close (bool graceful) override; close (bool graceful) override;

View File

@@ -38,6 +38,7 @@ ServerImpl::ServerImpl (Server& server,
, m_strand (io_service_) , m_strand (io_service_)
, m_work (boost::in_place (std::ref (io_service_))) , m_work (boost::in_place (std::ref (io_service_)))
, m_stopped (true) , m_stopped (true)
, hist_{}
{ {
thread_ = std::thread (std::bind ( thread_ = std::thread (std::bind (
&ServerImpl::run, this)); &ServerImpl::run, this));
@@ -132,15 +133,44 @@ void
ServerImpl::remove (Door& door) ServerImpl::remove (Door& door)
{ {
std::lock_guard <std::mutex> lock (mutex_); std::lock_guard <std::mutex> lock (mutex_);
state_.doors.push_back (door); state_.doors.erase (state_.doors.iterator_to (door));
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
int
ServerImpl::ceil_log2 (unsigned long long x)
{
static const unsigned long long t[6] = {
0xFFFFFFFF00000000ull,
0x00000000FFFF0000ull,
0x000000000000FF00ull,
0x00000000000000F0ull,
0x000000000000000Cull,
0x0000000000000002ull
};
int y = (((x & (x - 1)) == 0) ? 0 : 1);
int j = 32;
int i;
for(i = 0; i < 6; i++) {
int k = (((x & t[i]) == 0) ? 0 : j);
y += k;
x >>= k;
j >>= 1;
}
return y;
}
void void
ServerImpl::report (Stat&& stat) ServerImpl::report (Stat&& stat)
{ {
int const bucket = ceil_log2 (stat.requests);
std::lock_guard <std::mutex> lock (mutex_); std::lock_guard <std::mutex> lock (mutex_);
++hist_[bucket];
high_ = std::max (high_, bucket);
if (stats_.size() >= historySize) if (stats_.size() >= historySize)
stats_.pop_back(); stats_.pop_back();
stats_.emplace_front (std::move(stat)); stats_.emplace_front (std::move(stat));
@@ -153,12 +183,26 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map)
// VFALCO TODO Write the list of doors // VFALCO TODO Write the list of doors
map ["active"] = Peer::count();
{ {
beast::PropertyStream::Set set ("sessions", map); std::string s;
for (int i = 0; i <= high_; ++i)
{
if (i)
s += ", ";
s += std::to_string (hist_[i]);
}
map ["hist"] = s;
}
{
beast::PropertyStream::Set set ("history", map);
for (auto const& stat : stats_) for (auto const& stat : stats_)
{ {
beast::PropertyStream::Map item (set); beast::PropertyStream::Map item (set);
item ["id"] = stat.id;
item ["when"] = stat.when; item ["when"] = stat.when;
{ {

View File

@@ -28,6 +28,7 @@
#include <beast/module/asio/basics/SharedArg.h> #include <beast/module/asio/basics/SharedArg.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <array>
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <deque> #include <deque>
@@ -43,6 +44,7 @@ class Peer;
struct Stat struct Stat
{ {
std::size_t id;
std::string when; std::string when;
std::chrono::seconds elapsed; std::chrono::seconds elapsed;
int requests; int requests;
@@ -88,6 +90,8 @@ private:
State state_; State state_;
Doors m_doors; Doors m_doors;
std::deque <Stat> stats_; std::deque <Stat> stats_;
std::array <std::size_t, 64> hist_;
int high_ = 0;
public: public:
ServerImpl (Server& server, Handler& handler, beast::Journal journal); ServerImpl (Server& server, Handler& handler, beast::Journal journal);
@@ -136,6 +140,10 @@ public:
onWrite (beast::PropertyStream::Map& map); onWrite (beast::PropertyStream::Map& map);
private: private:
static
int
ceil_log2 (unsigned long long x);
static static
int int
compare (Port const& lhs, Port const& rhs); compare (Port const& lhs, Port const& rhs);

View File

@@ -303,7 +303,7 @@ public:
, m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue)) , m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue))
, m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs, , m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs,
m_logs.journal("HTTPServer"), *m_jobQueue, *m_networkOPs, *m_resourceManager)) *m_jobQueue, *m_networkOPs, *m_resourceManager))
, m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service , m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service

View File

@@ -39,16 +39,15 @@ public:
HTTP::Server m_server; HTTP::Server m_server;
std::unique_ptr <RippleSSLContext> m_context; std::unique_ptr <RippleSSLContext> m_context;
RPCHTTPServerImp (Stoppable& parent, RPCHTTPServerImp (Stoppable& parent, JobQueue& jobQueue,
beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs, NetworkOPs& networkOPs, Resource::Manager& resourceManager)
Resource::Manager& resourceManager)
: RPCHTTPServer (parent) : RPCHTTPServer (parent)
, m_resourceManager (resourceManager) , m_resourceManager (resourceManager)
, m_journal (journal) , m_journal (deprecatedLogs().journal("HTTP-RPC"))
, m_jobQueue (jobQueue) , m_jobQueue (jobQueue)
, m_networkOPs (networkOPs) , m_networkOPs (networkOPs)
, m_deprecatedHandler (networkOPs, resourceManager) , m_deprecatedHandler (networkOPs, resourceManager)
, m_server (*this, journal) , m_server (*this, deprecatedLogs().journal("HTTP"))
{ {
if (getConfig ().RPC_SECURE == 0) if (getConfig ().RPC_SECURE == 0)
{ {
@@ -307,19 +306,18 @@ public:
RPCHTTPServer::RPCHTTPServer (Stoppable& parent) RPCHTTPServer::RPCHTTPServer (Stoppable& parent)
: Stoppable ("RPCHTTPServer", parent) : Stoppable ("RPCHTTPServer", parent)
, Source ("rpc") , Source ("http")
{ {
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::unique_ptr <RPCHTTPServer> std::unique_ptr <RPCHTTPServer>
make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal, make_RPCHTTPServer (beast::Stoppable& parent, JobQueue& jobQueue,
JobQueue& jobQueue, NetworkOPs& networkOPs, NetworkOPs& networkOPs, Resource::Manager& resourceManager)
Resource::Manager& resourceManager)
{ {
return std::make_unique <RPCHTTPServerImp> ( return std::make_unique <RPCHTTPServerImp> (
parent, journal, jobQueue, networkOPs, resourceManager); parent, jobQueue, networkOPs, resourceManager);
} }
} }

View File

@@ -1,4 +1,4 @@
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/* /*
This file is part of rippled: https://github.com/ripple/rippled This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc. Copyright (c) 2012, 2013 Ripple Labs Inc.
@@ -37,17 +37,19 @@ public:
virtual virtual
~RPCHTTPServer() = default; ~RPCHTTPServer() = default;
/** Opens listening ports based on the Config settings. */ /** Opens listening ports based on the Config settings
This is implemented outside the constructor to support
two-stage initialization in the Application object.
*/
virtual virtual
void void
setup (beast::Journal journal) = 0; setup (beast::Journal journal) = 0;
}; };
std::unique_ptr <RPCHTTPServer> std::unique_ptr <RPCHTTPServer>
make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal, make_RPCHTTPServer (beast::Stoppable& parent, JobQueue& jobQueue,
JobQueue& jobQueue, NetworkOPs& networkOPs, NetworkOPs& networkOPs, Resource::Manager& resourceManager);
Resource::Manager& resourceManager);
} } // ripple
#endif #endif