diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index 98a966c37..1d8f7ca5f 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -159,6 +159,8 @@
+
+
True
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index 6c4b66ea7..2b043ba8b 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -711,6 +711,9 @@
beast\asio
+
+ beast\asio
+
beast\asio\tests
diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h
index 85a280af0..10e7f6fc2 100644
--- a/src/ripple/http/Session.h
+++ b/src/ripple/http/Session.h
@@ -147,6 +147,15 @@ public:
}
/** @} */
+ /** Detach the session.
+ This holds the session open so that the response can be sent
+ asynchronously. Calls to io_service::run made by the server
+ will not return until all detached sessions are closed.
+ */
+ virtual
+ void
+ detach() = 0;
+
/** Indicate that the response is complete.
The handler should call this when it has completed writing
the response. If Keep-Alive is indicated on the connection,
@@ -157,15 +166,6 @@ public:
void
complete() = 0;
- /** Detach the session.
- This holds the session open so that the response can be sent
- asynchronously. Calls to io_service::run made by the server
- will not return until all detached sessions are closed.
- */
- virtual
- void
- detach() = 0;
-
/** Close the session.
This will be performed asynchronously. The session will be
closed gracefully after all pending writes have completed.
diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp
index 9b0c0fea8..86ecd716b 100644
--- a/src/ripple/http/impl/Door.cpp
+++ b/src/ripple/http/impl/Door.cpp
@@ -81,7 +81,7 @@ void
Door::async_accept ()
{
auto const peer (std::make_shared (server_, port_, server_.journal()));
- acceptor_.async_accept (peer->get_socket(), std::bind (
+ acceptor_.async_accept (peer->get_socket(), endpoint_, std::bind (
&Door::handle_accept, Ptr(this),
beast::asio::placeholders::error, peer));
}
@@ -95,13 +95,14 @@ Door::handle_accept (error_code ec,
if (ec)
{
- server_.journal().error << "Accept failed: " << ec.message();
+ server_.journal().error <<
+ "accept: " << ec.message();
return;
}
+ auto const endpoint = endpoint_;
async_accept();
-
- peer->accept();
+ peer->accept (endpoint);
}
}
diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h
index 356c157ad..0950ea884 100644
--- a/src/ripple/http/impl/Door.h
+++ b/src/ripple/http/impl/Door.h
@@ -39,6 +39,7 @@ private:
ServerImpl& server_;
acceptor acceptor_;
+ endpoint_t endpoint_;
Port port_;
public:
diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp
index ec4197c07..fb461c211 100644
--- a/src/ripple/http/impl/Peer.cpp
+++ b/src/ripple/http/impl/Peer.cpp
@@ -18,27 +18,35 @@
//==============================================================================
#include
+#include
#include
namespace ripple {
namespace HTTP {
+/*
+Reference:
+http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
+*/
+
+std::atomic Peer::s_count_;
+
+std::size_t
+Peer::count()
+{
+ return s_count_.load();
+}
+
Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
: journal_ (journal)
, server_ (server)
, strand_ (server_.get_io_service())
, timer_ (server_.get_io_service())
, parser_ (message_, true)
- , pending_writes_ (0)
- , closed_ (false)
- , callClose_ (false)
- , detached_ (0)
- , request_count_ (0)
- , bytes_in_ (0)
- , bytes_out_ (0)
{
static std::atomic sid;
- id_ = std::string("#") + std::to_string(sid++);
+ nid_ = ++sid;
+ id_ = std::string("#") + std::to_string(nid_) + " ";
tag = nullptr;
@@ -63,16 +71,16 @@ Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
server_.add (*this);
- if (journal_.trace) journal_.trace <<
- id_ << " created";
+ if (journal_.trace) journal_.trace << id_ <<
+ "created";
}
-
Peer::~Peer ()
{
if (callClose_)
{
Stat stat;
+ stat.id = nid_;
stat.when = std::move (when_str_);
stat.elapsed = std::chrono::duration_cast <
std::chrono::seconds> (clock_type::now() - when_);
@@ -80,32 +88,32 @@ Peer::~Peer ()
stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_);
-
server_.report (std::move (stat));
- if (journal_.trace) journal_.trace <<
- id_ << " onClose (" << ec_ << ")";
server_.handler().onClose (session(), ec_);
}
server_.remove (*this);
- if (journal_.trace) journal_.trace <<
- id_ << " destroyed";
+ if (journal_.trace) journal_.trace << id_ <<
+ "destroyed: " << request_count_ <<
+ ((request_count_ == 1) ? " request" : " requests");
+
+ --s_count_;
}
//------------------------------------------------------------------------------
// Called when the acceptor accepts our socket.
void
-Peer::accept ()
+Peer::accept (boost::asio::ip::tcp::endpoint endpoint)
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
- std::bind (&Peer::accept, shared_from_this())));
+ std::bind (&Peer::accept, shared_from_this(), endpoint)));
- if (journal_.trace) journal_.trace <<
- id_ << " accept";
+ if (journal_.trace) journal_.trace << id_ <<
+ "accept: " << endpoint.address();
callClose_ = true;
@@ -113,57 +121,42 @@ Peer::accept ()
when_str_ = beast::Time::getCurrentTime().formatted (
"%Y-%b-%d %H:%M:%S").toStdString();
- if (journal_.trace) journal_.trace <<
- id_ << " onAccept";
-
server_.handler().onAccept (session());
// Handler might have closed
- if (closed_)
+ if (state_ == State::closed)
{
// VFALCO TODO Is this the correct way to close the socket?
// See what state the socket is in and verify.
- cancel();
+ //closed();
return;
}
if (socket_->needs_handshake ())
{
start_timer();
-
socket_->async_handshake (beast::asio::abstract_socket::server,
- strand_.wrap (std::bind (&Peer::on_ssl_handshake, shared_from_this(),
+ strand_.wrap (std::bind (&Peer::on_handshake, shared_from_this(),
beast::asio::placeholders::error)));
}
else
{
- on_read_request (error_code{}, 0);
+ async_read();
}
}
-// Cancel all pending i/o and timers and send tcp shutdown.
-void
-Peer::cancel ()
-{
- if (journal_.trace) journal_.trace <<
- id_ << " cancel";
-
- error_code ec;
- timer_.cancel (ec);
- socket_->cancel (ec);
- socket_->shutdown (socket::shutdown_both, ec);
-}
-
// Called by a completion handler when error is not eof or aborted.
void
-Peer::failed (error_code const& ec)
+Peer::fail (error_code ec)
{
- if (journal_.trace) journal_.trace <<
- id_ << " failed, " << ec.message();
+ assert (ec);
+ assert (strand_.running_in_this_thread());
+
+ if (journal_.debug) journal_.debug << id_ <<
+ "fail: " << ec.message();
ec_ = ec;
- assert (ec_);
- cancel ();
+ socket_->cancel(ec);
}
// Start the timer.
@@ -172,9 +165,6 @@ Peer::failed (error_code const& ec)
void
Peer::start_timer()
{
- if (journal_.trace) journal_.trace <<
- id_ << " start_timer";
-
timer_.expires_from_now (
boost::posix_time::seconds (
timeoutSeconds));
@@ -184,29 +174,35 @@ Peer::start_timer()
beast::asio::placeholders::error)));
}
-// Send a shared buffer
void
-Peer::async_write (SharedBuffer const& buf)
+Peer::async_write ()
{
- assert (buf.get().size() > 0);
-
- ++pending_writes_;
-
- if (journal_.trace) journal_.trace <<
- id_ << " async_write, pending_writes = " << pending_writes_;
+ void const* data;
+ std::size_t bytes;
+ {
+ std::lock_guard lock (mutex_);
+ buffer& b = write_queue_.front();
+ data = b.data.get() + b.used;
+ bytes = b.bytes - b.used;
+ }
start_timer();
+ boost::asio::async_write (*socket_, boost::asio::buffer (data, bytes),
+ boost::asio::transfer_at_least (1), strand_.wrap (std::bind (
+ &Peer::on_write, shared_from_this(),
+ beast::asio::placeholders::error,
+ beast::asio::placeholders::bytes_transferred)));
+}
- // Send the copy. We pass the SharedBuffer in the last parameter
- // so that a reference is maintained as the handler gets copied.
- // When the final completion function returns, the reference
- // count will drop to zero and the buffer will be freed.
- //
- boost::asio::async_write (*socket_,
- boost::asio::const_buffers_1 (&(*buf)[0], buf->size()),
- strand_.wrap (std::bind (&Peer::on_write_response,
- shared_from_this(), beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred, buf)));
+void
+Peer::async_read()
+{
+ start_timer();
+ boost::asio::async_read (*socket_, read_buf_.prepare (bufferSize),
+ boost::asio::transfer_at_least (1), strand_.wrap (std::bind (
+ &Peer::on_read, shared_from_this(),
+ beast::asio::placeholders::error,
+ beast::asio::placeholders::bytes_transferred)));
}
//------------------------------------------------------------------------------
@@ -215,154 +211,310 @@ Peer::async_write (SharedBuffer const& buf)
void
Peer::on_timer (error_code ec)
{
- if (ec == boost::asio::error::operation_aborted)
+ if (state_ == State::closed)
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "timer: closed";
return;
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ // Disable this otherwise we log needlessly
+ // on every new read and write.
+ /*
+ if (journal_.trace) journal_.trace << id_ <<
+ "timer: aborted";
+ */
+ return;
+ }
if (! ec)
ec = boost::system::errc::make_error_code (
boost::system::errc::timed_out);
- failed (ec);
+ if (journal_.debug) journal_.debug << id_ <<
+ "timer: " << ec.message();
+ fail (ec);
}
// Called when the handshake completes
void
-Peer::on_ssl_handshake (error_code ec)
+Peer::on_handshake (error_code ec)
{
- if (journal_.trace) journal_.trace <<
- id_ << " on_ssl_handshake, " << ec.message();
-
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec != 0)
{
- failed (ec);
+ error_code ec;
+ timer_.cancel(ec);
+ }
+
+ bool const ssl = socket_->ssl_handle() != nullptr;
+
+ if (state_ == State::closed)
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "handshake: closed";
return;
}
- on_read_request (error_code{}, 0);
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "handshake: aborted";
+ return;
+ }
+
+ if (ec)
+ {
+ if (journal_.debug) journal_.debug << id_ <<
+ "handshake: " << ec.message();
+ return fail (ec);
+ }
+
+ if (journal_.trace) journal_.trace << id_ <<
+ "handshake" << (ssl ? ": ssl" : "");
+
+ async_read();
}
// Called repeatedly with the http request data
void
-Peer::on_read_request (error_code ec, std::size_t bytes_transferred)
+Peer::on_read (error_code ec, std::size_t bytes_transferred)
{
- if (journal_.trace)
+ // This needs to happen before the call to onRequest
+ // otherwise we could time out if the Handler takes too long.
{
- if (! ec_)
- journal_.trace <<
- id_ << " on_read_request, " <<
- bytes_transferred << " bytes";
- else
- journal_.trace <<
- id_ << " on_read_request failed, " <<
- ec.message();
+ error_code ec;
+ timer_.cancel(ec);
+ }
+
+ if (state_ == State::closed)
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "read: closed";
+ return;
}
if (ec == boost::asio::error::operation_aborted)
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "read: aborted";
return;
+ }
+
+ if (beast::asio::is_short_read (ec))
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "read: " << ec.message();
+ return fail (ec);
+ }
+
+ if (ec && ec != boost::asio::error::eof)
+ {
+ if (journal_.debug) journal_.debug << id_ <<
+ "read: " << ec.message();
+ return fail (ec);
+ }
bool const eof = ec == boost::asio::error::eof;
- if (eof)
- ec = error_code{};
-
- if (! ec)
+ if (! eof)
{
- bytes_in_ += bytes_transferred;
+ if (journal_.trace) journal_.trace << id_ <<
+ "read: " << bytes_transferred << " bytes";
+ }
+ else
+ {
+ // End of stream reached:
+ // http://www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/overview/core/streams.html
+ if (bytes_transferred != 0)
+ if (journal_.error) journal_.error << id_ <<
+ "read: eof (" << bytes_transferred << " bytes)";
+ if (journal_.debug) journal_.debug << id_ <<
+ "read: eof";
+ ec = error_code{};
+ }
- read_buf_.commit (bytes_transferred);
+ bytes_in_ += bytes_transferred;
+ read_buf_.commit (bytes_transferred);
- std::pair result;
-
- if (! eof)
- {
- result = parser_.write (read_buf_.data());
- if (result.first)
- read_buf_.consume (result.second);
- else
- ec = parser_.error();
- }
+ std::pair result;
+ if (! eof)
+ {
+ result = parser_.write (read_buf_.data());
+ if (result.first)
+ read_buf_.consume (result.second);
else
- {
- result.first = parser_.write_eof();
- if (! result.first)
- ec = parser_.error();
- }
+ ec = parser_.error();
+ }
+ else
+ {
+ result.first = parser_.write_eof();
+ if (! result.first)
+ ec = parser_.error();
+ }
+
+ // VFALCO TODO Currently parsing errors are treated the
+ // same as the connection dropping. Instead, we
+ // should request that the handler compose a proper HTTP error
+ // response. This requires refactoring HTTPReply() into
+ // something sensible.
+ //
+ if (! ec && parser_.complete())
+ {
+ // Perform half-close when Connection: close and not SSL
+ if (! message_.keep_alive() &&
+ ! socket_->needs_handshake())
+ socket_->shutdown (socket::shutdown_receive, ec);
if (! ec)
{
- if (parser_.complete())
- {
- // ???
- //if (! socket_->needs_handshake())
- // socket_->shutdown (socket::shutdown_receive, ec);
-
- if (journal_.trace) journal_.trace <<
- id_ << " onRequest";
-
- ++request_count_;
-
- server_.handler().onRequest (session());
- return;
- }
+ ++request_count_;
+ server_.handler().onRequest (session());
+ return;
}
}
if (ec)
{
- failed (ec);
- return;
+ if (journal_.debug) journal_.debug << id_ <<
+ "read: " << ec.message();
+ return fail (ec);
}
- start_timer();
-
- socket_->async_read_some (read_buf_.prepare (bufferSize), strand_.wrap (
- std::bind (&Peer::on_read_request, shared_from_this(),
- beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred)));
+ if (! eof)
+ async_read();
}
// Called when async_write completes.
void
-Peer::on_write_response (error_code ec, std::size_t bytes_transferred,
- SharedBuffer const& buf)
+Peer::on_write (error_code ec, std::size_t bytes_transferred)
{
- timer_.cancel (ec);
-
- if (journal_.trace)
{
- if (! ec_)
- journal_.trace <<
- id_ << " on_write_response, " <<
- bytes_transferred << " bytes";
- else
- journal_.trace <<
- id_ << " on_write_response failed, " <<
- ec.message();
+ error_code ec;
+ timer_.cancel (ec);
+ }
+
+ if (state_ == State::closed)
+ {
+ if (journal_.trace) journal_.trace << id_ <<
+ "write: closed";
+ return;
}
if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec != 0)
{
- failed (ec);
+ if (journal_.trace) journal_.trace << id_ <<
+ "write: aborted";
return;
}
+ if (ec)
+ {
+ if (journal_.debug) journal_.debug << id_ <<
+ "write: " << ec.message();
+ return fail (ec);
+ }
+
+ if (bytes_transferred == 0)
+ if (journal_.error) journal_.error << id_ <<
+ "write: 0 bytes";
+
+ if (journal_.trace) journal_.trace << id_ <<
+ "write: " << bytes_transferred << " bytes";
+
bytes_out_ += bytes_transferred;
- assert (pending_writes_ > 0);
- if (--pending_writes_ > 0)
- return;
-
- if (closed_)
+ bool empty;
{
- socket_->shutdown (socket::shutdown_send, ec);
+ std::lock_guard lock (mutex_);
+ buffer& b = write_queue_.front();
+ b.used += bytes_transferred;
+ if (b.used == b.bytes)
+ {
+ write_queue_.pop_front();
+ empty = write_queue_.empty();
+ }
+ else
+ {
+ assert (b.used < b.bytes);
+ empty = false;
+ }
+ }
+
+ if (! empty)
+ return async_write();
+
+ if (! complete_)
return;
+ // Handler is done writing, did we graceful close?
+ if (state_ == State::flush)
+ {
+ if (socket_->needs_handshake())
+ {
+ // ssl::stream
+ start_timer();
+ socket_->async_shutdown (strand_.wrap (std::bind (
+ &Peer::on_shutdown, shared_from_this(),
+ beast::asio::placeholders::error)));
+ return;
+ }
+
+ {
+ error_code ec;
+ socket_->shutdown (MultiSocket::shutdown_send, ec);
+ }
+ return;
}
+
+ // keep-alive
+ complete_ = false;
+ async_read();
+}
+
+// Called when async_shutdown completes
+void
+Peer::on_shutdown (error_code ec)
+{
+ {
+ error_code ec;
+ timer_.cancel (ec);
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ // We canceled i/o on the socket, or we called async_shutdown
+ // and then closed the socket before getting the completion.
+ if (journal_.trace) journal_.trace << id_ <<
+ "shutdown: aborted";
+ return;
+ }
+
+ if (ec == boost::asio::error::eof)
+ {
+ // Happens when ssl::stream::async_shutdown completes without error
+ if (journal_.trace) journal_.trace << id_ <<
+ "shutdown: eof";
+ return;
+ }
+
+ if ((ec.category() == boost::asio::error::get_ssl_category())
+ && (ERR_GET_REASON(ec.value()) == SSL_R_SHORT_READ))
+ {
+ // Remote peer failed to send SSL close_notify message.
+ if (journal_.trace) journal_.trace << id_ <<
+ "shutdown: missing close_notify";
+ return;
+ }
+
+ if (ec)
+ {
+ if (journal_.debug) journal_.debug << id_ <<
+ "shutdown: " << ec.message();
+ return fail (ec);
+ }
+
+ if (journal_.trace) journal_.trace << id_ <<
+ "shutdown";
}
//------------------------------------------------------------------------------
@@ -371,33 +523,28 @@ Peer::on_write_response (error_code ec, std::size_t bytes_transferred,
void
Peer::write (void const* buffer, std::size_t bytes)
{
- server_.get_io_service().dispatch (strand_.wrap (
- std::bind (&Peer::async_write, shared_from_this(),
- SharedBuffer (static_cast (buffer), bytes))));
-}
+ if (bytes == 0)
+ return;
-// Called to indicate the response has been written (but not sent)
-void
-Peer::complete()
-{
- if (! strand_.running_in_this_thread())
- return server_.get_io_service().dispatch (strand_.wrap (
- std::bind (&Peer::complete, shared_from_this())));
+ bool empty;
+ {
+ std::lock_guard lock (mutex_);
+ empty = write_queue_.empty();
+ write_queue_.emplace_back (buffer, bytes);
+ }
- message_ = beast::http::message{};
- parser_ = beast::http::parser{message_, true};
-
- on_read_request (error_code{}, 0);
+ if (empty)
+ server_.get_io_service().dispatch (strand_.wrap (
+ std::bind (&Peer::async_write, shared_from_this())));
}
// Make the Session asynchronous
void
Peer::detach ()
{
- if (detached_.exchange (1) == 0)
+ if (! detach_ref_)
{
assert (! work_);
- assert (detach_ref_ == nullptr);
// Maintain an additional reference while detached
detach_ref_ = shared_from_this();
@@ -411,27 +558,77 @@ Peer::detach ()
}
}
+// Called to indicate the response has been written (but not sent)
+void
+Peer::complete()
+{
+ if (! strand_.running_in_this_thread())
+ return server_.get_io_service().dispatch (strand_.wrap (
+ std::bind (&Peer::complete, shared_from_this())));
+
+ // Reattach
+ detach_ref_.reset();
+ work_ = boost::none;
+
+ message_ = beast::http::message{};
+ parser_ = beast::http::parser{message_, true};
+ complete_ = true;
+
+ bool empty;
+ {
+ std::lock_guard lock (mutex_);
+ empty = write_queue_.empty();
+ }
+
+ if (empty)
+ {
+ // keep-alive
+ complete_ = false;
+ async_read();
+ }
+}
+
// Called from the Handler to close the session.
void
Peer::close (bool graceful)
{
- closed_ = true;
-
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::close, shared_from_this(), graceful)));
- if (! graceful || pending_writes_ == 0)
- {
- // We should cancel pending i/o
+ // Reattach
+ detach_ref_.reset();
+ work_ = boost::none;
- if (pending_writes_ == 0)
+ assert (state_ == State::open);
+ state_ = graceful ? State::flush : State::closed;
+
+ complete_ = true;
+
+ if (graceful)
+ {
+ bool empty;
{
+ std::lock_guard lock (mutex_);
+ empty = write_queue_.empty();
+ }
+
+ if (! empty)
+ return;
+
+ if (socket_->needs_handshake())
+ {
+ start_timer();
+ socket_->async_shutdown (strand_.wrap (std::bind (
+ &Peer::on_shutdown, shared_from_this(),
+ beast::asio::placeholders::error)));
+ return;
}
}
- // Release our additional reference
- detach_ref_.reset();
+ error_code ec;
+ timer_.cancel (ec);
+ socket_->close (ec);
}
}
diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h
index 53559db9b..030428901 100644
--- a/src/ripple/http/impl/Peer.h
+++ b/src/ripple/http/impl/Peer.h
@@ -33,13 +33,19 @@
#include
#include
#include
+#include
+#include
#include
#include
+#include
#include
+#include
namespace ripple {
namespace HTTP {
+//------------------------------------------------------------------------------
+
// Holds the copy of buffers being sent
// VFALCO TODO Replace with std::shared_ptr
//
@@ -54,50 +60,79 @@ class Peer
{
private:
typedef std::chrono::system_clock clock_type;
+ typedef MultiSocket socket_type;
enum
{
// Size of our receive buffer
- bufferSize = 2048,
+ bufferSize = 4 * 1024,
// Largest HTTP request allowed
maxRequestBytes = 32 * 1024,
// Max seconds without completing a message
timeoutSeconds = 30
- //timeoutSeconds = 3
};
+ enum class State
+ {
+ open,
+ flush,
+ closed
+ };
+
+ struct buffer
+ {
+ buffer (void const* ptr, std::size_t len)
+ : data (new char[len])
+ , bytes (len)
+ , used (0)
+ {
+ memcpy (data.get(), ptr, len);
+ }
+
+ std::unique_ptr data;
+ std::size_t bytes;
+ std::size_t used;
+ };
+
beast::Journal journal_;
ServerImpl& server_;
std::string id_;
+ std::size_t nid_;
boost::asio::io_service::strand strand_;
boost::asio::deadline_timer timer_;
- std::unique_ptr socket_;
+ std::unique_ptr socket_;
boost::asio::streambuf read_buf_;
beast::http::message message_;
beast::http::parser parser_;
- int pending_writes_;
- bool closed_;
- bool finished_;
- bool callClose_;
+ std::list write_queue_;
+ std::mutex mutex_;
+ State state_ = State::open;
+ bool complete_ = false;
+ bool callClose_ = false;
std::shared_ptr detach_ref_;
boost::optional work_;
boost::system::error_code ec_;
- std::atomic detached_;
clock_type::time_point when_;
std::string when_str_;
- int request_count_;
- std::size_t bytes_in_;
- std::size_t bytes_out_;
+ int request_count_ = 0;
+ std::size_t bytes_in_ = 0;
+ std::size_t bytes_out_ = 0;
//--------------------------------------------------------------------------
+ static std::atomic s_count_;
+
public:
+ static
+ std::size_t
+ count();
+
Peer (ServerImpl& impl, Port const& port, beast::Journal journal);
~Peer ();
@@ -114,20 +149,20 @@ public:
}
void
- accept();
+ accept (boost::asio::ip::tcp::endpoint endpoint);
private:
void
- cancel();
-
- void
- failed (error_code const& ec);
+ fail (error_code ec);
void
start_timer();
void
- async_write (SharedBuffer const& buf);
+ async_write();
+
+ void
+ async_read();
//--------------------------------------------------------------------------
//
@@ -138,17 +173,19 @@ private:
on_timer (error_code ec);
void
- on_ssl_handshake (error_code ec);
+ on_handshake (error_code ec);
void
- on_read_request (error_code ec, std::size_t bytes_transferred);
+ on_read (error_code ec, std::size_t bytes_transferred);
void
- on_write_response (error_code ec, std::size_t bytes_transferred,
- SharedBuffer const& buf);
+ on_write (error_code ec, std::size_t bytes_transferred);
void
- on_close ();
+ on_shutdown (error_code ec);
+
+ void
+ on_close (error_code ec);
//--------------------------------------------------------------------------
//
@@ -177,10 +214,10 @@ private:
write (void const* buffer, std::size_t bytes) override;
void
- complete() override;
+ detach() override;
void
- detach() override;
+ complete() override;
void
close (bool graceful) override;
diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp
index 632a6d2e5..741a7fd3e 100644
--- a/src/ripple/http/impl/ServerImpl.cpp
+++ b/src/ripple/http/impl/ServerImpl.cpp
@@ -38,6 +38,7 @@ ServerImpl::ServerImpl (Server& server,
, m_strand (io_service_)
, m_work (boost::in_place (std::ref (io_service_)))
, m_stopped (true)
+ , hist_{}
{
thread_ = std::thread (std::bind (
&ServerImpl::run, this));
@@ -132,15 +133,44 @@ void
ServerImpl::remove (Door& door)
{
std::lock_guard lock (mutex_);
- state_.doors.push_back (door);
+ state_.doors.erase (state_.doors.iterator_to (door));
}
//--------------------------------------------------------------------------
+int
+ServerImpl::ceil_log2 (unsigned long long x)
+{
+ static const unsigned long long t[6] = {
+ 0xFFFFFFFF00000000ull,
+ 0x00000000FFFF0000ull,
+ 0x000000000000FF00ull,
+ 0x00000000000000F0ull,
+ 0x000000000000000Cull,
+ 0x0000000000000002ull
+ };
+
+ int y = (((x & (x - 1)) == 0) ? 0 : 1);
+ int j = 32;
+ int i;
+
+ for(i = 0; i < 6; i++) {
+ int k = (((x & t[i]) == 0) ? 0 : j);
+ y += k;
+ x >>= k;
+ j >>= 1;
+ }
+
+ return y;
+}
+
void
ServerImpl::report (Stat&& stat)
{
+ int const bucket = ceil_log2 (stat.requests);
std::lock_guard lock (mutex_);
+ ++hist_[bucket];
+ high_ = std::max (high_, bucket);
if (stats_.size() >= historySize)
stats_.pop_back();
stats_.emplace_front (std::move(stat));
@@ -153,12 +183,26 @@ ServerImpl::onWrite (beast::PropertyStream::Map& map)
// VFALCO TODO Write the list of doors
+ map ["active"] = Peer::count();
+
{
- beast::PropertyStream::Set set ("sessions", map);
+ std::string s;
+ for (int i = 0; i <= high_; ++i)
+ {
+ if (i)
+ s += ", ";
+ s += std::to_string (hist_[i]);
+ }
+ map ["hist"] = s;
+ }
+
+ {
+ beast::PropertyStream::Set set ("history", map);
for (auto const& stat : stats_)
{
beast::PropertyStream::Map item (set);
+ item ["id"] = stat.id;
item ["when"] = stat.when;
{
diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h
index ecd48da9a..4aede59ad 100644
--- a/src/ripple/http/impl/ServerImpl.h
+++ b/src/ripple/http/impl/ServerImpl.h
@@ -28,6 +28,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -43,6 +44,7 @@ class Peer;
struct Stat
{
+ std::size_t id;
std::string when;
std::chrono::seconds elapsed;
int requests;
@@ -88,6 +90,8 @@ private:
State state_;
Doors m_doors;
std::deque stats_;
+ std::array hist_;
+ int high_ = 0;
public:
ServerImpl (Server& server, Handler& handler, beast::Journal journal);
@@ -136,6 +140,10 @@ public:
onWrite (beast::PropertyStream::Map& map);
private:
+ static
+ int
+ ceil_log2 (unsigned long long x);
+
static
int
compare (Port const& lhs, Port const& rhs);
diff --git a/src/ripple/module/app/main/Application.cpp b/src/ripple/module/app/main/Application.cpp
index 257a37a86..dcab531e7 100644
--- a/src/ripple/module/app/main/Application.cpp
+++ b/src/ripple/module/app/main/Application.cpp
@@ -303,7 +303,7 @@ public:
, m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue))
, m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs,
- m_logs.journal("HTTPServer"), *m_jobQueue, *m_networkOPs, *m_resourceManager))
+ *m_jobQueue, *m_networkOPs, *m_resourceManager))
, m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service
diff --git a/src/ripple/module/app/main/RPCHTTPServer.cpp b/src/ripple/module/app/main/RPCHTTPServer.cpp
index c70b0b849..f6c0d7ec9 100644
--- a/src/ripple/module/app/main/RPCHTTPServer.cpp
+++ b/src/ripple/module/app/main/RPCHTTPServer.cpp
@@ -39,16 +39,15 @@ public:
HTTP::Server m_server;
std::unique_ptr m_context;
- RPCHTTPServerImp (Stoppable& parent,
- beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs,
- Resource::Manager& resourceManager)
+ RPCHTTPServerImp (Stoppable& parent, JobQueue& jobQueue,
+ NetworkOPs& networkOPs, Resource::Manager& resourceManager)
: RPCHTTPServer (parent)
, m_resourceManager (resourceManager)
- , m_journal (journal)
+ , m_journal (deprecatedLogs().journal("HTTP-RPC"))
, m_jobQueue (jobQueue)
, m_networkOPs (networkOPs)
, m_deprecatedHandler (networkOPs, resourceManager)
- , m_server (*this, journal)
+ , m_server (*this, deprecatedLogs().journal("HTTP"))
{
if (getConfig ().RPC_SECURE == 0)
{
@@ -307,19 +306,18 @@ public:
RPCHTTPServer::RPCHTTPServer (Stoppable& parent)
: Stoppable ("RPCHTTPServer", parent)
- , Source ("rpc")
+ , Source ("http")
{
}
//------------------------------------------------------------------------------
std::unique_ptr
-make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal,
- JobQueue& jobQueue, NetworkOPs& networkOPs,
- Resource::Manager& resourceManager)
+make_RPCHTTPServer (beast::Stoppable& parent, JobQueue& jobQueue,
+ NetworkOPs& networkOPs, Resource::Manager& resourceManager)
{
return std::make_unique (
- parent, journal, jobQueue, networkOPs, resourceManager);
+ parent, jobQueue, networkOPs, resourceManager);
}
}
diff --git a/src/ripple/module/app/main/RPCHTTPServer.h b/src/ripple/module/app/main/RPCHTTPServer.h
index 0a2273676..51e5295f4 100644
--- a/src/ripple/module/app/main/RPCHTTPServer.h
+++ b/src/ripple/module/app/main/RPCHTTPServer.h
@@ -1,4 +1,4 @@
-//------------------------------------------------------------------------------
+ //------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
@@ -37,17 +37,19 @@ public:
virtual
~RPCHTTPServer() = default;
- /** Opens listening ports based on the Config settings. */
+ /** Opens listening ports based on the Config settings
+ This is implemented outside the constructor to support
+ two-stage initialization in the Application object.
+ */
virtual
void
setup (beast::Journal journal) = 0;
};
std::unique_ptr
-make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal,
- JobQueue& jobQueue, NetworkOPs& networkOPs,
- Resource::Manager& resourceManager);
+make_RPCHTTPServer (beast::Stoppable& parent, JobQueue& jobQueue,
+ NetworkOPs& networkOPs, Resource::Manager& resourceManager);
-}
+} // ripple
#endif