diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index 38e2a61142..fc1b5b5ff6 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -351,23 +351,25 @@
-
+
+
+
-
+
True
-
+
True
-
+
True
@@ -386,15 +388,14 @@
True
-
- True
-
True
True
+
+
@@ -405,9 +406,6 @@
-
- True
-
True
@@ -417,6 +415,9 @@
True
+
+ True
+
True
@@ -1940,6 +1941,9 @@
+
+ True
+
True
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index 34f7c5f85e..8daa6415f0 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -337,6 +337,9 @@
{43D68742-4714-D103-EE00-EB10BD045FB6}
+
+ {AA0D98CC-99E6-61CE-86D7-35156DC4EE55}
+
{BEDCC703-A2C8-FF25-7E1E-3471BD39ED98}
@@ -957,28 +960,31 @@
beast
-
+
beast\http
beast\http
+
+ beast\http
+
beast\http
beast\http\detail
-
+
beast\http
beast\http
-
+
beast\http\impl
-
+
beast\http\impl
@@ -999,15 +1005,15 @@
beast\http\impl
-
- beast\http\impl
-
beast\http\impl
beast\http\impl
+
+ beast\http
+
beast\http
@@ -1023,9 +1029,6 @@
beast\http
-
- beast\http\tests
-
beast\http\tests
@@ -1035,6 +1038,9 @@
beast\http\tests
+
+ beast\http\tests
+
beast\http\tests
@@ -2946,6 +2952,9 @@
ripple\http
+
+ ripple\http\tests
+
ripple\json\impl
diff --git a/src/ripple/common/MultiSocket.h b/src/ripple/common/MultiSocket.h
index e116164d5c..02843880a3 100644
--- a/src/ripple/common/MultiSocket.h
+++ b/src/ripple/common/MultiSocket.h
@@ -110,12 +110,14 @@ public:
virtual SSL* ssl_handle () = 0;
// Caller owns the socket
+ // VFALCO TODO return std::unique_ptr
static MultiSocket* New (
boost::asio::ip::tcp::socket& socket,
boost::asio::ssl::context& ssl_context,
int flags = 0);
// Caller owns the io_service
+ // VFALCO TODO return std::unique_ptr
static MultiSocket* New (
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context,
diff --git a/src/ripple/http/Server.h b/src/ripple/http/Server.h
index 10de42c116..dfd759f769 100644
--- a/src/ripple/http/Server.h
+++ b/src/ripple/http/Server.h
@@ -23,6 +23,8 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
@@ -35,7 +37,7 @@ namespace HTTP {
/** Configuration information for a server listening port. */
struct Port
{
- enum Security
+ enum class Security
{
no_ssl,
allow_ssl,
@@ -74,19 +76,15 @@ struct Handler
/** Called when the connection is accepted and we know remoteAddress. */
virtual void onAccept (Session& session) = 0;
- /** Called repeatedly as new HTTP headers are received.
- Guaranteed to be called at least once.
- */
- virtual void onHeaders (Session& session) = 0;
-
- /** Called when we have the full Content-Body. */
+ /** Called when we have a complete HTTP request. */
virtual void onRequest (Session& session) = 0;
/** Called when the session ends.
Guaranteed to be called once.
@param errorCode Non zero for a failed connection.
*/
- virtual void onClose (Session& session, int errorCode) = 0;
+ virtual void onClose (Session& session,
+ boost::system::error_code const& ec) = 0;
/** Called when the server has finished its stop. */
virtual void onStopped (Server& server) = 0;
@@ -108,6 +106,7 @@ public:
*/
virtual
~Server ();
+
/** Returns the Journal associated with the server. */
beast::Journal
journal () const;
@@ -145,6 +144,9 @@ public:
void
stop ();
+ void
+ onWrite (beast::PropertyStream::Map& map);
+
private:
std::unique_ptr m_impl;
};
diff --git a/src/ripple/http/Session.h b/src/ripple/http/Session.h
index 54542c5e61..85a280af0e 100644
--- a/src/ripple/http/Session.h
+++ b/src/ripple/http/Session.h
@@ -20,6 +20,7 @@
#ifndef RIPPLE_HTTP_SESSION_H_INCLUDED
#define RIPPLE_HTTP_SESSION_H_INCLUDED
+#include
#include
#include
#include
@@ -88,26 +89,24 @@ public:
void* tag;
/** Returns the Journal to use for logging. */
- virtual beast::Journal journal() = 0;
+ virtual
+ beast::Journal
+ journal() = 0;
/** Returns the remote address of the connection. */
- virtual beast::IP::Endpoint remoteAddress() = 0;
-
- /** Returns `true` if the full HTTP headers have been received. */
- virtual bool headersComplete() = 0;
+ virtual
+ beast::IP::Endpoint
+ remoteAddress() = 0;
/** Returns the currently known set of headers. */
- virtual beast::HTTPHeaders headers() = 0;
-
- /** Returns the complete HTTP request when it is known. */
- virtual beast::SharedPtr const& request() = 0;
-
- /** Returns the entire Content-Body, if the request is complete. */
- virtual std::string content() = 0;
+ virtual
+ beast::http::message&
+ message() = 0;
/** Send a copy of data asynchronously. */
/** @{ */
- void write (std::string const& s)
+ void
+ write (std::string const& s)
{
if (! s.empty())
write (&s[0],
@@ -115,7 +114,8 @@ public:
}
template
- void write (BufferSequence const& buffers)
+ void
+ write (BufferSequence const& buffers)
{
for (typename BufferSequence::const_iterator iter (buffers.begin());
iter != buffers.end(); ++iter)
@@ -126,35 +126,54 @@ public:
}
}
- virtual void write (void const* buffer, std::size_t bytes) = 0;
+ virtual
+ void
+ write (void const* buffer, std::size_t bytes) = 0;
/** @} */
/** Output support using ostream. */
/** @{ */
- ScopedStream operator<< (std::ostream& manip (std::ostream&))
+ ScopedStream
+ operator<< (std::ostream& manip (std::ostream&))
{
return ScopedStream (*this, manip);
}
template
- ScopedStream operator<< (T const& t)
+ ScopedStream
+ operator<< (T const& t)
{
return ScopedStream (*this, t);
}
/** @} */
+ /** Indicate that the response is complete.
+ The handler should call this when it has completed writing
+ the response. If Keep-Alive is indicated on the connection,
+ this will trigger a read for the next request; else, the
+ connection will be closed when all remaining data has been sent.
+ */
+ virtual
+ void
+ complete() = 0;
+
/** Detach the session.
This holds the session open so that the response can be sent
asynchronously. Calls to io_service::run made by the server
will not return until all detached sessions are closed.
*/
- virtual void detach() = 0;
+ virtual
+ void
+ detach() = 0;
/** Close the session.
This will be performed asynchronously. The session will be
closed gracefully after all pending writes have completed.
+ @param graceful `true` to wait until all data has finished sending.
*/
- virtual void close() = 0;
+ virtual
+ void
+ close (bool graceful) = 0;
};
} // namespace HTTP
diff --git a/src/ripple/http/impl/Door.cpp b/src/ripple/http/impl/Door.cpp
index aa00157e0f..9b0c0fea88 100644
--- a/src/ripple/http/impl/Door.cpp
+++ b/src/ripple/http/impl/Door.cpp
@@ -25,31 +25,31 @@ namespace ripple {
namespace HTTP {
Door::Door (ServerImpl& impl, Port const& port)
- : impl_ (impl)
- , acceptor_ (impl_.get_io_service(), to_asio (port))
+ : server_ (impl)
+ , acceptor_ (server_.get_io_service(), to_asio (port))
, port_ (port)
{
- impl_.add (*this);
+ server_.add (*this);
error_code ec;
acceptor_.set_option (acceptor::reuse_address (true), ec);
if (ec)
{
- impl_.journal().error <<
+ server_.journal().error <<
"Error setting acceptor socket option: " << ec.message();
}
if (! ec)
{
- impl_.journal().info << "Bound to endpoint " <<
+ server_.journal().info << "Bound to endpoint " <<
to_string (acceptor_.local_endpoint());
async_accept();
}
else
{
- impl_.journal().error << "Error binding to endpoint " <<
+ server_.journal().error << "Error binding to endpoint " <<
to_string (acceptor_.local_endpoint()) <<
", '" << ec.message() << "'";
}
@@ -57,7 +57,7 @@ Door::Door (ServerImpl& impl, Port const& port)
Door::~Door ()
{
- impl_.remove (*this);
+ server_.remove (*this);
}
Port const&
@@ -80,7 +80,7 @@ Door::failed (error_code ec)
void
Door::async_accept ()
{
- auto const peer (std::make_shared (impl_, port_));
+ auto const peer (std::make_shared (server_, port_, server_.journal()));
acceptor_.async_accept (peer->get_socket(), std::bind (
&Door::handle_accept, Ptr(this),
beast::asio::placeholders::error, peer));
@@ -95,7 +95,7 @@ Door::handle_accept (error_code ec,
if (ec)
{
- impl_.journal().error << "Accept failed: " << ec.message();
+ server_.journal().error << "Accept failed: " << ec.message();
return;
}
diff --git a/src/ripple/http/impl/Door.h b/src/ripple/http/impl/Door.h
index 35a8f645eb..356c157ade 100644
--- a/src/ripple/http/impl/Door.h
+++ b/src/ripple/http/impl/Door.h
@@ -37,7 +37,7 @@ private:
// VFALCO TODO Use shared_ptr
typedef beast::SharedPtr Ptr;
- ServerImpl& impl_;
+ ServerImpl& server_;
acceptor acceptor_;
Port port_;
diff --git a/src/ripple/http/impl/Peer.cpp b/src/ripple/http/impl/Peer.cpp
index 5f11172bf4..ec4197c071 100644
--- a/src/ripple/http/impl/Peer.cpp
+++ b/src/ripple/http/impl/Peer.cpp
@@ -18,91 +18,386 @@
//==============================================================================
#include
+#include
namespace ripple {
namespace HTTP {
-Peer::Peer (ServerImpl& impl, Port const& port)
- : impl_ (impl)
- , strand_ (impl_.get_io_service())
- , data_timer_ (impl_.get_io_service())
- , request_timer_ (impl_.get_io_service())
- , buffer_ (bufferSize)
- , writesPending_ (0)
+Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
+ : journal_ (journal)
+ , server_ (server)
+ , strand_ (server_.get_io_service())
+ , timer_ (server_.get_io_service())
+ , parser_ (message_, true)
+ , pending_writes_ (0)
, closed_ (false)
, callClose_ (false)
- , errorCode_ (0)
, detached_ (0)
+ , request_count_ (0)
+ , bytes_in_ (0)
+ , bytes_out_ (0)
{
+ static std::atomic sid;
+ id_ = std::string("#") + std::to_string(sid++);
+
tag = nullptr;
int flags = MultiSocket::Flag::server_role;
+
switch (port.security)
{
- default:
- bassertfalse;
-
- case Port::no_ssl:
+ case Port::Security::no_ssl:
break;
- case Port::allow_ssl:
+ case Port::Security::allow_ssl:
flags |= MultiSocket::Flag::ssl;
break;
- case Port::require_ssl:
+ case Port::Security::require_ssl:
flags |= MultiSocket::Flag::ssl_required;
break;
}
socket_.reset (MultiSocket::New (
- impl_.get_io_service(), port.context->get(), flags));
+ server_.get_io_service(), port.context->get(), flags));
- impl_.add (*this);
+ server_.add (*this);
+
+ if (journal_.trace) journal_.trace <<
+ id_ << " created";
}
+
Peer::~Peer ()
{
if (callClose_)
- impl_.handler().onClose (session(), errorCode_);
+ {
+ Stat stat;
+ stat.when = std::move (when_str_);
+ stat.elapsed = std::chrono::duration_cast <
+ std::chrono::seconds> (clock_type::now() - when_);
+ stat.requests = request_count_;
+ stat.bytes_in = bytes_in_;
+ stat.bytes_out = bytes_out_;
+ stat.ec = std::move (ec_);
- impl_.remove (*this);
+ server_.report (std::move (stat));
+ if (journal_.trace) journal_.trace <<
+ id_ << " onClose (" << ec_ << ")";
+
+ server_.handler().onClose (session(), ec_);
+ }
+
+ server_.remove (*this);
+
+ if (journal_.trace) journal_.trace <<
+ id_ << " destroyed";
}
-//--------------------------------------------------------------------------
+//------------------------------------------------------------------------------
-// Returns the Content-Body as a single buffer.
-// VFALCO NOTE This is inefficient...
-std::string
-Peer::content()
+// Called when the acceptor accepts our socket.
+void
+Peer::accept ()
{
- std::string s;
- beast::DynamicBuffer const& body (
- parser_.request()->body ());
- s.resize (body.size ());
- boost::asio::buffer_copy (
- boost::asio::buffer (&s[0],
- s.size()), body.data ());
- return s;
+ if (! strand_.running_in_this_thread())
+ return server_.get_io_service().dispatch (strand_.wrap (
+ std::bind (&Peer::accept, shared_from_this())));
+
+ if (journal_.trace) journal_.trace <<
+ id_ << " accept";
+
+ callClose_ = true;
+
+ when_ = clock_type::now();
+ when_str_ = beast::Time::getCurrentTime().formatted (
+ "%Y-%b-%d %H:%M:%S").toStdString();
+
+ if (journal_.trace) journal_.trace <<
+ id_ << " onAccept";
+
+ server_.handler().onAccept (session());
+
+ // Handler might have closed
+ if (closed_)
+ {
+ // VFALCO TODO Is this the correct way to close the socket?
+ // See what state the socket is in and verify.
+ cancel();
+ return;
+ }
+
+ if (socket_->needs_handshake ())
+ {
+ start_timer();
+
+ socket_->async_handshake (beast::asio::abstract_socket::server,
+ strand_.wrap (std::bind (&Peer::on_ssl_handshake, shared_from_this(),
+ beast::asio::placeholders::error)));
+ }
+ else
+ {
+ on_read_request (error_code{}, 0);
+ }
}
+// Cancel all pending i/o and timers and send tcp shutdown.
+void
+Peer::cancel ()
+{
+ if (journal_.trace) journal_.trace <<
+ id_ << " cancel";
+
+ error_code ec;
+ timer_.cancel (ec);
+ socket_->cancel (ec);
+ socket_->shutdown (socket::shutdown_both, ec);
+}
+
+// Called by a completion handler when error is not eof or aborted.
+void
+Peer::failed (error_code const& ec)
+{
+ if (journal_.trace) journal_.trace <<
+ id_ << " failed, " << ec.message();
+
+ ec_ = ec;
+ assert (ec_);
+ cancel ();
+}
+
+// Start the timer.
+// If the timer expires, the session is considered
+// timed out and will be forcefully closed.
+void
+Peer::start_timer()
+{
+ if (journal_.trace) journal_.trace <<
+ id_ << " start_timer";
+
+ timer_.expires_from_now (
+ boost::posix_time::seconds (
+ timeoutSeconds));
+
+ timer_.async_wait (strand_.wrap (std::bind (
+ &Peer::on_timer, shared_from_this(),
+ beast::asio::placeholders::error)));
+}
+
+// Send a shared buffer
+void
+Peer::async_write (SharedBuffer const& buf)
+{
+ assert (buf.get().size() > 0);
+
+ ++pending_writes_;
+
+ if (journal_.trace) journal_.trace <<
+ id_ << " async_write, pending_writes = " << pending_writes_;
+
+ start_timer();
+
+ // Send the copy. We pass the SharedBuffer in the last parameter
+ // so that a reference is maintained as the handler gets copied.
+ // When the final completion function returns, the reference
+ // count will drop to zero and the buffer will be freed.
+ //
+ boost::asio::async_write (*socket_,
+ boost::asio::const_buffers_1 (&(*buf)[0], buf->size()),
+ strand_.wrap (std::bind (&Peer::on_write_response,
+ shared_from_this(), beast::asio::placeholders::error,
+ beast::asio::placeholders::bytes_transferred, buf)));
+}
+
+//------------------------------------------------------------------------------
+
+// Called when session times out
+void
+Peer::on_timer (error_code ec)
+{
+ if (ec == boost::asio::error::operation_aborted)
+ return;
+
+ if (! ec)
+ ec = boost::system::errc::make_error_code (
+ boost::system::errc::timed_out);
+
+ failed (ec);
+}
+
+// Called when the handshake completes
+void
+Peer::on_ssl_handshake (error_code ec)
+{
+ if (journal_.trace) journal_.trace <<
+ id_ << " on_ssl_handshake, " << ec.message();
+
+ if (ec == boost::asio::error::operation_aborted)
+ return;
+
+ if (ec != 0)
+ {
+ failed (ec);
+ return;
+ }
+
+ on_read_request (error_code{}, 0);
+}
+
+// Called repeatedly with the http request data
+void
+Peer::on_read_request (error_code ec, std::size_t bytes_transferred)
+{
+ if (journal_.trace)
+ {
+ if (! ec_)
+ journal_.trace <<
+ id_ << " on_read_request, " <<
+ bytes_transferred << " bytes";
+ else
+ journal_.trace <<
+ id_ << " on_read_request failed, " <<
+ ec.message();
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ return;
+
+ bool const eof = ec == boost::asio::error::eof;
+ if (eof)
+ ec = error_code{};
+
+ if (! ec)
+ {
+ bytes_in_ += bytes_transferred;
+
+ read_buf_.commit (bytes_transferred);
+
+ std::pair result;
+
+ if (! eof)
+ {
+ result = parser_.write (read_buf_.data());
+ if (result.first)
+ read_buf_.consume (result.second);
+ else
+ ec = parser_.error();
+ }
+ else
+ {
+ result.first = parser_.write_eof();
+ if (! result.first)
+ ec = parser_.error();
+ }
+
+ if (! ec)
+ {
+ if (parser_.complete())
+ {
+ // ???
+ //if (! socket_->needs_handshake())
+ // socket_->shutdown (socket::shutdown_receive, ec);
+
+ if (journal_.trace) journal_.trace <<
+ id_ << " onRequest";
+
+ ++request_count_;
+
+ server_.handler().onRequest (session());
+ return;
+ }
+ }
+ }
+
+ if (ec)
+ {
+ failed (ec);
+ return;
+ }
+
+ start_timer();
+
+ socket_->async_read_some (read_buf_.prepare (bufferSize), strand_.wrap (
+ std::bind (&Peer::on_read_request, shared_from_this(),
+ beast::asio::placeholders::error,
+ beast::asio::placeholders::bytes_transferred)));
+}
+
+// Called when async_write completes.
+void
+Peer::on_write_response (error_code ec, std::size_t bytes_transferred,
+ SharedBuffer const& buf)
+{
+ timer_.cancel (ec);
+
+ if (journal_.trace)
+ {
+ if (! ec_)
+ journal_.trace <<
+ id_ << " on_write_response, " <<
+ bytes_transferred << " bytes";
+ else
+ journal_.trace <<
+ id_ << " on_write_response failed, " <<
+ ec.message();
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ return;
+
+ if (ec != 0)
+ {
+ failed (ec);
+ return;
+ }
+
+ bytes_out_ += bytes_transferred;
+
+ assert (pending_writes_ > 0);
+ if (--pending_writes_ > 0)
+ return;
+
+ if (closed_)
+ {
+ socket_->shutdown (socket::shutdown_send, ec);
+ return;
+
+ }
+}
+
+//------------------------------------------------------------------------------
+
// Send a copy of the data.
void
Peer::write (void const* buffer, std::size_t bytes)
{
- // Make sure this happens on an io_service thread.
- impl_.get_io_service().dispatch (strand_.wrap (
+ server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::async_write, shared_from_this(),
SharedBuffer (static_cast (buffer), bytes))));
}
+// Called to indicate the response has been written (but not sent)
+void
+Peer::complete()
+{
+ if (! strand_.running_in_this_thread())
+ return server_.get_io_service().dispatch (strand_.wrap (
+ std::bind (&Peer::complete, shared_from_this())));
+
+ message_ = beast::http::message{};
+ parser_ = beast::http::parser{message_, true};
+
+ on_read_request (error_code{}, 0);
+}
+
// Make the Session asynchronous
void
Peer::detach ()
{
if (detached_.exchange (1) == 0)
{
- bassert (! work_);
- bassert (detach_ref_ == nullptr);
+ assert (! work_);
+ assert (detach_ref_ == nullptr);
// Maintain an additional reference while detached
detach_ref_ = shared_from_this();
@@ -112,298 +407,32 @@ Peer::detach ()
// after the Session is closed and handlers complete.
//
work_ = boost::in_place (std::ref (
- impl_.get_io_service()));
+ server_.get_io_service()));
}
}
-// Called by the Handler to close the session.
+// Called from the Handler to close the session.
void
-Peer::close ()
-{
- // Make sure this happens on an io_service thread.
- impl_.get_io_service().dispatch (strand_.wrap (
- std::bind (&Peer::handle_close, shared_from_this())));
-}
-
-//--------------------------------------------------------------------------
-
-// Called when the handshake completes
-//
-void
-Peer::handle_handshake (error_code ec)
-{
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec != 0)
- {
- failed (ec);
- return;
- }
-
- async_read_some();
-}
-
-// Called when the data timer expires
-//
-void
-Peer::handle_data_timer (error_code ec)
-{
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (closed_)
- return;
-
- if (ec != 0)
- {
- failed (ec);
- return;
- }
-
- failed (boost::system::errc::make_error_code (
- boost::system::errc::timed_out));
-}
-
-// Called when the request timer expires
-//
-void
-Peer::handle_request_timer (error_code ec)
-{
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (closed_)
- return;
-
- if (ec != 0)
- {
- failed (ec);
- return;
- }
-
- failed (boost::system::errc::make_error_code (
- boost::system::errc::timed_out));
-}
-
-// Called when async_write completes.
-void
-Peer::handle_write (error_code ec, std::size_t bytes_transferred,
- SharedBuffer const& buf)
-{
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec != 0)
- {
- failed (ec);
- return;
- }
-
- bassert (writesPending_ > 0);
- if (--writesPending_ == 0 && closed_)
- socket_->shutdown (socket::shutdown_send, ec);
-}
-
-// Called when async_read_some completes.
-void
-Peer::handle_read (error_code ec, std::size_t bytes_transferred)
-{
- if (ec == boost::asio::error::operation_aborted)
- return;
-
- if (ec != 0 && ec != boost::asio::error::eof)
- {
- failed (ec);
- return;
- }
-
- std::size_t const bytes_parsed (parser_.process (
- buffer_.getData(), bytes_transferred));
-
- if (parser_.error() ||
- bytes_parsed != bytes_transferred)
- {
- failed (boost::system::errc::make_error_code (
- boost::system::errc::bad_message));
- return;
- }
-
- if (ec == boost::asio::error::eof)
- {
- parser_.process_eof();
- ec = error_code();
- }
-
- if (parser_.error())
- {
- failed (boost::system::errc::make_error_code (
- boost::system::errc::bad_message));
- return;
- }
-
- if (! parser_.finished())
- {
- // Feed some headers to the callback
- if (parser_.fields().size() > 0)
- {
- handle_headers();
- if (closed_)
- return;
- }
- }
-
- if (parser_.finished ())
- {
- data_timer_.cancel();
- // VFALCO NOTE: Should we cancel this one?
- request_timer_.cancel();
-
- if (! socket_->needs_handshake())
- socket_->shutdown (socket::shutdown_receive, ec);
-
- handle_request ();
- return;
- }
-
- async_read_some();
-}
-
-// Called when we have some new headers.
-void
-Peer::handle_headers ()
-{
- impl_.handler().onHeaders (session());
-}
-
-// Called when we have a complete http request.
-void
-Peer::handle_request ()
-{
- // This is to guarantee onHeaders is called at least once.
- handle_headers();
-
- if (closed_)
- return;
-
- // Process the HTTPRequest
- impl_.handler().onRequest (session());
-}
-
-// Called to close the session.
-void
-Peer::handle_close ()
+Peer::close (bool graceful)
{
closed_ = true;
+ if (! strand_.running_in_this_thread())
+ return server_.get_io_service().dispatch (strand_.wrap (
+ std::bind (&Peer::close, shared_from_this(), graceful)));
+
+ if (! graceful || pending_writes_ == 0)
+ {
+ // We should cancel pending i/o
+
+ if (pending_writes_ == 0)
+ {
+ }
+ }
+
// Release our additional reference
detach_ref_.reset();
}
-//--------------------------------------------------------------------------
-//
-// Peer
-//
-
-// Called when the acceptor accepts our socket.
-void
-Peer::accept ()
-{
- callClose_ = true;
-
- impl_.handler().onAccept (session());
-
- if (closed_)
- {
- cancel();
- return;
- }
-
- request_timer_.expires_from_now (
- boost::posix_time::seconds (
- requestTimeoutSeconds));
-
- request_timer_.async_wait (strand_.wrap (std::bind (
- &Peer::handle_request_timer, shared_from_this(),
- beast::asio::placeholders::error)));
-
- if (socket_->needs_handshake ())
- {
- socket_->async_handshake (beast::asio::abstract_socket::server,
- strand_.wrap (std::bind (&Peer::handle_handshake, shared_from_this(),
- beast::asio::placeholders::error)));
- }
- else
- {
- async_read_some();
- }
-}
-
-// Cancel all pending i/o and timers and send tcp shutdown.
-void
-Peer::cancel ()
-{
- error_code ec;
- data_timer_.cancel (ec);
- request_timer_.cancel (ec);
- socket_->cancel (ec);
- socket_->shutdown (socket::shutdown_both, ec);
-}
-
-// Called by a completion handler when error is not eof or aborted.
-void
-Peer::failed (error_code const& ec)
-{
- errorCode_ = ec.value();
- bassert (errorCode_ != 0);
- cancel ();
-}
-
-// Call the async_read_some initiating function.
-void
-Peer::async_read_some ()
-{
- // re-arm the data timer
- // (this cancels the previous wait, if any)
- //
- data_timer_.expires_from_now (
- boost::posix_time::seconds (
- dataTimeoutSeconds));
-
- data_timer_.async_wait (strand_.wrap (std::bind (
- &Peer::handle_data_timer, shared_from_this(),
- beast::asio::placeholders::error)));
-
- // issue the read
- //
- boost::asio::mutable_buffers_1 buf (
- buffer_.getData (), buffer_.getSize ());
-
- socket_->async_read_some (buf, strand_.wrap (
- std::bind (&Peer::handle_read, shared_from_this(),
- beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred)));
-}
-
-// Send a shared buffer
-void
-Peer::async_write (SharedBuffer const& buf)
-{
- bassert (buf.get().size() > 0);
-
- ++writesPending_;
-
- // Send the copy. We pass the SharedBuffer in the last parameter
- // so that a reference is maintained as the handler gets copied.
- // When the final completion function returns, the reference
- // count will drop to zero and the buffer will be freed.
- //
- boost::asio::async_write (*socket_,
- boost::asio::const_buffers_1 (&(*buf)[0], buf->size()),
- strand_.wrap (std::bind (&Peer::handle_write,
- shared_from_this(), beast::asio::placeholders::error,
- beast::asio::placeholders::bytes_transferred, buf)));
-}
-
}
}
diff --git a/src/ripple/http/impl/Peer.h b/src/ripple/http/impl/Peer.h
index 427aab4c8c..53559db9bf 100644
--- a/src/ripple/http/impl/Peer.h
+++ b/src/ripple/http/impl/Peer.h
@@ -26,9 +26,14 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
+#include
+#include
+#include
#include
#include
@@ -48,129 +53,54 @@ class Peer
, public beast::LeakChecked
{
private:
+ typedef std::chrono::system_clock clock_type;
+
enum
{
// Size of our receive buffer
- bufferSize = 8192,
+ bufferSize = 2048,
// Largest HTTP request allowed
maxRequestBytes = 32 * 1024,
- // Max seconds without receiving a byte
- dataTimeoutSeconds = 10,
-
- // Max seconds without completing the request
- requestTimeoutSeconds = 30
+ // Max seconds without completing a message
+ timeoutSeconds = 30
+ //timeoutSeconds = 3
};
- ServerImpl& impl_;
+ beast::Journal journal_;
+ ServerImpl& server_;
+ std::string id_;
boost::asio::io_service::strand strand_;
- boost::asio::deadline_timer data_timer_;
- boost::asio::deadline_timer request_timer_;
+ boost::asio::deadline_timer timer_;
std::unique_ptr socket_;
- // VFALCO TODO Use c++11
- beast::MemoryBlock buffer_;
-
- beast::HTTPRequestParser parser_;
- int writesPending_;
+ boost::asio::streambuf read_buf_;
+ beast::http::message message_;
+ beast::http::parser parser_;
+ int pending_writes_;
bool closed_;
+ bool finished_;
bool callClose_;
std::shared_ptr detach_ref_;
boost::optional work_;
- int errorCode_;
+
+ boost::system::error_code ec_;
std::atomic detached_;
+ clock_type::time_point when_;
+ std::string when_str_;
+ int request_count_;
+ std::size_t bytes_in_;
+ std::size_t bytes_out_;
+
//--------------------------------------------------------------------------
public:
- Peer (ServerImpl& impl, Port const& port);
+ Peer (ServerImpl& impl, Port const& port, beast::Journal journal);
~Peer ();
-private:
- //--------------------------------------------------------------------------
- //
- // Session
- //
-
- beast::Journal
- journal()
- {
- return impl_.journal();
- }
-
- beast::IP::Endpoint
- remoteAddress()
- {
- return from_asio (get_socket().remote_endpoint());
- }
-
- bool
- headersComplete()
- {
- return parser_.headersComplete();
- }
-
- beast::HTTPHeaders
- headers()
- {
- return beast::HTTPHeaders (parser_.fields());
- }
-
- beast::SharedPtr const&
- request()
- {
- return parser_.request();
- }
-
- std::string
- content();
-
- void
- write (void const* buffer, std::size_t bytes);
-
- void
- detach ();
-
- void
- close ();
-
- //--------------------------------------------------------------------------
- //
- // Completion Handlers
- //
-
- void
- handle_handshake (error_code ec);
-
- void
- handle_data_timer (error_code ec);
-
- void
- handle_request_timer (error_code ec);
-
- void
- handle_write (error_code ec, std::size_t bytes_transferred,
- SharedBuffer const& buf);
-
- void
- handle_read (error_code ec, std::size_t bytes_transferred);
-
- void
- handle_headers ();
-
- void
- handle_request ();
-
- void
- handle_close ();
-
-public:
- //--------------------------------------------------------------------------
- //
- // Peer
- //
socket&
get_socket()
{
@@ -184,18 +114,76 @@ public:
}
void
- accept ();
+ accept();
+private:
void
- cancel ();
+ cancel();
void
failed (error_code const& ec);
void
- async_read_some ();
+ start_timer();
- void async_write (SharedBuffer const& buf);
+ void
+ async_write (SharedBuffer const& buf);
+
+ //--------------------------------------------------------------------------
+ //
+ // Completion Handlers
+ //
+
+ void
+ on_timer (error_code ec);
+
+ void
+ on_ssl_handshake (error_code ec);
+
+ void
+ on_read_request (error_code ec, std::size_t bytes_transferred);
+
+ void
+ on_write_response (error_code ec, std::size_t bytes_transferred,
+ SharedBuffer const& buf);
+
+ void
+ on_close ();
+
+ //--------------------------------------------------------------------------
+ //
+ // Session
+ //
+
+ beast::Journal
+ journal() override
+ {
+ return server_.journal();
+ }
+
+ beast::IP::Endpoint
+ remoteAddress() override
+ {
+ return from_asio (get_socket().remote_endpoint());
+ }
+
+ beast::http::message&
+ message() override
+ {
+ return message_;
+ }
+
+ void
+ write (void const* buffer, std::size_t bytes) override;
+
+ void
+ complete() override;
+
+ void
+ detach() override;
+
+ void
+ close (bool graceful) override;
};
}
diff --git a/src/ripple/http/impl/Port.cpp b/src/ripple/http/impl/Port.cpp
index 629250ca1a..ddf28e036b 100644
--- a/src/ripple/http/impl/Port.cpp
+++ b/src/ripple/http/impl/Port.cpp
@@ -21,7 +21,7 @@ namespace ripple {
namespace HTTP {
Port::Port ()
- : security (no_ssl)
+ : security (Security::no_ssl)
, port (0)
, context (nullptr)
{
diff --git a/src/ripple/http/impl/Server.cpp b/src/ripple/http/impl/Server.cpp
index cbceaf7350..70b6efa1f3 100644
--- a/src/ripple/http/impl/Server.cpp
+++ b/src/ripple/http/impl/Server.cpp
@@ -64,5 +64,11 @@ Server::stop ()
m_impl->stop(true);
}
+void
+Server::onWrite (beast::PropertyStream::Map& map)
+{
+ m_impl->onWrite (map);
+}
+
}
}
diff --git a/src/ripple/http/impl/ServerImpl.cpp b/src/ripple/http/impl/ServerImpl.cpp
index f77ff26d1f..632a6d2e50 100644
--- a/src/ripple/http/impl/ServerImpl.cpp
+++ b/src/ripple/http/impl/ServerImpl.cpp
@@ -18,53 +18,66 @@
//==============================================================================
#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
namespace ripple {
namespace HTTP {
-ServerImpl::ServerImpl (Server& server, Handler& handler, beast::Journal journal)
- : Thread ("HTTP::Server")
- , m_server (server)
+ServerImpl::ServerImpl (Server& server,
+ Handler& handler, beast::Journal journal)
+ : m_server (server)
, m_handler (handler)
, journal_ (journal)
- , m_strand (m_io_service)
- , m_work (boost::in_place (std::ref (m_io_service)))
+ , m_strand (io_service_)
+ , m_work (boost::in_place (std::ref (io_service_)))
, m_stopped (true)
{
- startThread ();
+ thread_ = std::thread (std::bind (
+ &ServerImpl::run, this));
}
ServerImpl::~ServerImpl ()
{
- stopThread ();
+ thread_.join();
}
-Ports const& ServerImpl::getPorts () const
+Ports const&
+ServerImpl::getPorts () const
{
- SharedState::ConstUnlockedAccess state (m_state);
- return state->ports;
+ std::lock_guard lock (mutex_);
+ return state_.ports;
}
-void ServerImpl::setPorts (Ports const& ports)
+void
+ServerImpl::setPorts (Ports const& ports)
{
- SharedState::Access state (m_state);
- state->ports = ports;
+ std::lock_guard lock (mutex_);
+ state_.ports = ports;
update();
}
-bool ServerImpl::stopping () const
+bool
+ServerImpl::stopping () const
{
return ! m_work;
}
-void ServerImpl::stop (bool wait)
+void
+ServerImpl::stop (bool wait)
{
if (! stopping())
{
m_work = boost::none;
update();
}
-
+
if (wait)
m_stopped.wait();
}
@@ -74,55 +87,99 @@ void ServerImpl::stop (bool wait)
// Server
//
-Handler& ServerImpl::handler()
+Handler&
+ServerImpl::handler()
{
return m_handler;
}
-boost::asio::io_service& ServerImpl::get_io_service()
+boost::asio::io_service&
+ServerImpl::get_io_service()
{
- return m_io_service;
+ return io_service_;
}
// Inserts the peer into our list of peers. We only remove it
// from the list inside the destructor of the Peer object. This
// way, the Peer can never outlive the server.
//
-void ServerImpl::add (Peer& peer)
+void
+ServerImpl::add (Peer& peer)
{
- SharedState::Access state (m_state);
- state->peers.push_back (peer);
+ std::lock_guard lock (mutex_);
+ state_.peers.push_back (peer);
}
-void ServerImpl::add (Door& door)
+void
+ServerImpl::add (Door& door)
{
- SharedState::Access state (m_state);
- state->doors.push_back (door);
+ std::lock_guard lock (mutex_);
+ state_.doors.push_back (door);
}
// Removes the peer from our list of peers. This is only called from
// the destructor of Peer. Essentially, the item in the list functions
// as a weak_ptr.
//
-void ServerImpl::remove (Peer& peer)
+void
+ServerImpl::remove (Peer& peer)
{
- SharedState::Access state (m_state);
- state->peers.erase (state->peers.iterator_to (peer));
+ std::lock_guard lock (mutex_);
+ state_.peers.erase (state_.peers.iterator_to (peer));
}
-void ServerImpl::remove (Door& door)
+void
+ServerImpl::remove (Door& door)
{
- SharedState::Access state (m_state);
- state->doors.push_back (door);
+ std::lock_guard lock (mutex_);
+ state_.doors.push_back (door);
}
//--------------------------------------------------------------------------
-//
-// Thread
-//
+
+void
+ServerImpl::report (Stat&& stat)
+{
+ std::lock_guard lock (mutex_);
+ if (stats_.size() >= historySize)
+ stats_.pop_back();
+ stats_.emplace_front (std::move(stat));
+}
+
+void
+ServerImpl::onWrite (beast::PropertyStream::Map& map)
+{
+ std::lock_guard lock (mutex_);
+
+ // VFALCO TODO Write the list of doors
+
+ {
+ beast::PropertyStream::Set set ("sessions", map);
+ for (auto const& stat : stats_)
+ {
+ beast::PropertyStream::Map item (set);
+
+ item ["when"] = stat.when;
+
+ {
+ std::stringstream ss;
+ ss << stat.elapsed;
+ item ["elapsed"] = ss.str();
+ }
+
+ item ["requests"] = stat.requests;
+ item ["bytes_in"] = stat.bytes_in;
+ item ["bytes_out"] = stat.bytes_out;
+ if (stat.ec)
+ item ["error"] = stat.ec.message();
+ }
+ }
+}
+
//--------------------------------------------------------------------------
-int ServerImpl::compare (Port const& lhs, Port const& rhs)
+int
+ServerImpl::compare (Port const& lhs, Port const& rhs)
{
if (lhs < rhs)
return -1;
@@ -131,18 +188,31 @@ int ServerImpl::compare (Port const& lhs, Port const& rhs)
return 0;
}
-// Updates our Door list based on settings.
-//
-void ServerImpl::handle_update ()
+void
+ServerImpl::update()
{
+ io_service_.post (m_strand.wrap (std::bind (
+ &ServerImpl::on_update, this)));
+}
+
+// Updates our Door list based on settings.
+void
+ServerImpl::on_update ()
+{
+ /*
+ if (! m_strand.running_in_this_thread())
+ io_service_.dispatch (m_strand.wrap (std::bind (
+ &ServerImpl::update, this)));
+ */
+
if (! stopping())
{
// Make a local copy to shorten the lock
//
Ports ports;
{
- SharedState::ConstAccess state (m_state);
- ports = state->ports;
+ std::lock_guard lock (mutex_);
+ ports = state_.ports;
}
std::sort (ports.begin(), ports.end());
@@ -206,19 +276,16 @@ void ServerImpl::handle_update ()
}
}
-// Causes handle_update to run on the io_service
-//
-void ServerImpl::update ()
+// Thread entry point to perform io_service work
+void
+ServerImpl::run()
{
- m_io_service.post (m_strand.wrap (std::bind (
- &ServerImpl::handle_update, this)));
-}
+ static std::atomic id;
-// The main i/o processing loop.
-//
-void ServerImpl::run ()
-{
- m_io_service.run ();
+ beast::Thread::setCurrentThreadName (
+ std::string("HTTP::Server #") + std::to_string (++id));
+
+ io_service_.run();
m_stopped.signal();
m_handler.onStopped (m_server);
diff --git a/src/ripple/http/impl/ServerImpl.h b/src/ripple/http/impl/ServerImpl.h
index 0443b32215..ecd48da9a1 100644
--- a/src/ripple/http/impl/ServerImpl.h
+++ b/src/ripple/http/impl/ServerImpl.h
@@ -20,6 +20,7 @@
#ifndef RIPPLE_HTTP_SERVERIMPL_H_INCLUDED
#define RIPPLE_HTTP_SERVERIMPL_H_INCLUDED
+#include
#include
#include
#include
@@ -27,6 +28,11 @@
#include
#include
#include
+#include
+#include
+#include
+#include
+#include
#include
namespace ripple {
@@ -35,9 +41,26 @@ namespace HTTP {
class Door;
class Peer;
-class ServerImpl : public beast::Thread
+struct Stat
+{
+ std::string when;
+ std::chrono::seconds elapsed;
+ int requests;
+ std::size_t bytes_in;
+ std::size_t bytes_out;
+ boost::system::error_code ec;
+};
+
+class ServerImpl
{
private:
+ typedef std::chrono::system_clock clock_type;
+
+ enum
+ {
+ historySize = 100
+ };
+
struct State
{
// Attributes for our listening ports
@@ -50,18 +73,21 @@ private:
beast::List doors;
};
- typedef beast::SharedData SharedState;
- typedef std::vector > Doors;
+ typedef std::vector > Doors;
Server& m_server;
Handler& m_handler;
+ std::thread thread_;
+ std::mutex mutable mutex_;
+ std::condition_variable cond_;
beast::Journal journal_;
- boost::asio::io_service m_io_service;
+ boost::asio::io_service io_service_;
boost::asio::io_service::strand m_strand;
boost::optional m_work;
beast::WaitableEvent m_stopped;
- SharedState m_state;
+ State state_;
Doors m_doors;
+ std::deque stats_;
public:
ServerImpl (Server& server, Handler& handler, beast::Journal journal);
@@ -104,15 +130,24 @@ public:
remove (Door& door);
void
- handle_update ();
+ report (Stat&& stat);
void
- update ();
+ onWrite (beast::PropertyStream::Map& map);
+
+private:
+ static
+ int
+ compare (Port const& lhs, Port const& rhs);
void
- run ();
+ update();
+
+ void
+ on_update();
- static int compare (Port const& lhs, Port const& rhs);
+ void
+ run();
};
diff --git a/src/ripple/http/tests/Server.test.cpp b/src/ripple/http/tests/Server.test.cpp
new file mode 100644
index 0000000000..687259259b
--- /dev/null
+++ b/src/ripple/http/tests/Server.test.cpp
@@ -0,0 +1,254 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace ripple {
+namespace HTTP {
+
+class Server_test : public beast::unit_test::suite
+{
+public:
+ enum
+ {
+ testPort = 1001
+ };
+
+ class TestSink : public beast::Journal::Sink
+ {
+ beast::unit_test::suite& suite_;
+
+ public:
+ TestSink (beast::unit_test::suite& suite)
+ : suite_ (suite)
+ {
+ }
+
+ void
+ write (beast::Journal::Severity level,
+ std::string const& text) override
+ {
+ suite_.log << text;
+ }
+ };
+
+ struct TestHandler : Handler
+ {
+ void
+ onAccept (Session& session) override
+ {
+ }
+
+ void
+ onRequest (Session& session) override
+ {
+ session << "Hello, world!\n";
+ if (session.message().keep_alive())
+ session.complete();
+ else
+ session.close (true);
+ }
+
+ void
+ onClose (Session& session,
+ boost::system::error_code const&) override
+ {
+ }
+
+ void
+ onStopped (Server& server)
+ {
+ }
+ };
+
+ // Connect to an address
+ template
+ bool
+ connect (Socket& s, std::string const& addr, int port)
+ {
+ try
+ {
+ typename Socket::endpoint_type ep (
+ boost::asio::ip::address::from_string (addr), port);
+ s.connect (ep);
+ pass();
+ return true;
+ }
+ catch (std::exception const& e)
+ {
+ fail (e.what());
+ }
+
+ return false;
+ }
+
+ // Write a string to the stream
+ template
+ bool
+ write (SyncWriteStream& s, std::string const& text)
+ {
+ try
+ {
+ boost::asio::write (s, boost::asio::buffer (text));
+ pass();
+ return true;
+ }
+ catch (std::exception const& e)
+ {
+ fail (e.what());
+ }
+ return false;
+ }
+
+ // Expect that reading the stream produces a matching string
+ template
+ bool
+ expect_read (SyncReadStream& s, std::string const& match)
+ {
+ boost::asio::streambuf b (1000); // limit on read
+ try
+ {
+ auto const n = boost::asio::read_until (s, b, '\n');
+ if (expect (n == match.size()))
+ {
+ std::string got;
+ got.resize (n);
+ boost::asio::buffer_copy (boost::asio::buffer (
+ &got[0], n), b.data());
+ return expect (got == match);
+ }
+ }
+ catch (std::length_error const& e)
+ {
+ fail(e.what());
+ }
+ return false;
+ }
+
+ void
+ test_request()
+ {
+ testcase("request");
+
+ boost::asio::io_service ios;
+ typedef boost::asio::ip::tcp::socket socket;
+ socket s (ios);
+
+ if (! connect (s, "127.0.0.1", testPort))
+ return;
+
+ if (! write (s,
+ "GET / HTTP/1.1\r\n"
+ "Connection: close\r\n"
+ "\r\n"))
+ return;
+
+ if (! expect_read (s, "Hello, world!\n"))
+ return ;
+
+ try
+ {
+ s.shutdown (socket::shutdown_both);
+ pass();
+ }
+ catch (std::exception const& e)
+ {
+ fail (e.what());
+ }
+
+ std::this_thread::sleep_for (std::chrono::seconds (1));
+ }
+
+ void
+ test_keepalive()
+ {
+ testcase("keepalive");
+
+ boost::asio::io_service ios;
+ typedef boost::asio::ip::tcp::socket socket;
+ socket s (ios);
+
+ if (! connect (s, "127.0.0.1", testPort))
+ return;
+
+ if (! write (s,
+ "GET / HTTP/1.1\r\n"
+ "Connection: Keep-Alive\r\n"
+ "\r\n"))
+ return;
+
+ if (! expect_read (s, "Hello, world!\n"))
+ return ;
+
+ if (! write (s,
+ "GET / HTTP/1.1\r\n"
+ "Connection: close\r\n"
+ "\r\n"))
+ return;
+
+ if (! expect_read (s, "Hello, world!\n"))
+ return ;
+
+ try
+ {
+ s.shutdown (socket::shutdown_both);
+ pass();
+ }
+ catch (std::exception const& e)
+ {
+ fail (e.what());
+ }
+ }
+
+ void
+ run()
+ {
+ TestSink sink {*this};
+ sink.severity (beast::Journal::Severity::kAll);
+ beast::Journal journal {sink};
+ TestHandler handler;
+ Server s (handler, journal);
+ Ports ports;
+ std::unique_ptr c (
+ RippleSSLContext::createBare ());
+ ports.emplace_back (testPort, beast::IP::Endpoint (
+ beast::IP::AddressV4 (127, 0, 0, 1), 0),
+ Port::Security::no_ssl, c.get());
+ s.setPorts (ports);
+
+ test_request();
+ //test_keepalive();
+
+ s.stop();
+
+ pass();
+ }
+};
+
+BEAST_DEFINE_TESTSUITE_MANUAL(Server,http,ripple);
+
+}
+}
diff --git a/src/ripple/module/app/main/Application.cpp b/src/ripple/module/app/main/Application.cpp
index 8df6edbc2f..257a37a866 100644
--- a/src/ripple/module/app/main/Application.cpp
+++ b/src/ripple/module/app/main/Application.cpp
@@ -302,7 +302,7 @@ public:
// VFALCO NOTE LocalCredentials starts the deprecated UNL service
, m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue))
- , m_rpcHTTPServer (RPCHTTPServer::New (*m_networkOPs,
+ , m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs,
m_logs.journal("HTTPServer"), *m_jobQueue, *m_networkOPs, *m_resourceManager))
, m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service
@@ -363,6 +363,7 @@ public:
m_nodeStoreScheduler.setJobQueue (*m_jobQueue);
add (m_ledgerMaster->getPropertySource ());
+ add (*m_rpcHTTPServer);
// VFALCO TODO remove these once the call is thread safe.
HashMaps::getInstance ().initializeNonce ();
diff --git a/src/ripple/module/app/main/RPCHTTPServer.cpp b/src/ripple/module/app/main/RPCHTTPServer.cpp
index 24759a2035..c70b0b8492 100644
--- a/src/ripple/module/app/main/RPCHTTPServer.cpp
+++ b/src/ripple/module/app/main/RPCHTTPServer.cpp
@@ -40,10 +40,8 @@ public:
std::unique_ptr m_context;
RPCHTTPServerImp (Stoppable& parent,
- beast::Journal journal,
- JobQueue& jobQueue,
- NetworkOPs& networkOPs,
- Resource::Manager& resourceManager)
+ beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs,
+ Resource::Manager& resourceManager)
: RPCHTTPServer (parent)
, m_resourceManager (resourceManager)
, m_journal (journal)
@@ -65,12 +63,13 @@ public:
}
}
- ~RPCHTTPServerImp ()
+ ~RPCHTTPServerImp()
{
m_server.stop();
}
- void setup (beast::Journal journal)
+ void
+ setup (beast::Journal journal) override
{
if (! getConfig ().getRpcIP().empty () &&
getConfig ().getRpcPort() != 0)
@@ -81,7 +80,7 @@ public:
//if (! is_unspecified (ep))
{
HTTP::Port port;
- port.security = HTTP::Port::allow_ssl;
+ port.security = HTTP::Port::Security::allow_ssl;
port.addr = ep.at_port(0);
if (getConfig ().getRpcPort() != 0)
port.port = getConfig ().getRpcPort();
@@ -105,12 +104,14 @@ public:
// Stoppable
//
- void onStop ()
+ void
+ onStop() override
{
m_server.stopAsync();
}
- void onChildrenStopped ()
+ void
+ onChildrenStopped() override
{
}
@@ -119,28 +120,26 @@ public:
// HTTP::Handler
//
- void onAccept (HTTP::Session& session)
+ void
+ onAccept (HTTP::Session& session) override
{
// Reject non-loopback connections if RPC_ALLOW_REMOTE is not set
if (! getConfig().RPC_ALLOW_REMOTE &&
! beast::IP::is_loopback (session.remoteAddress()))
{
- session.close();
+ session.close (false);
}
}
- void onHeaders (HTTP::Session& session)
- {
- }
-
- void onRequest (HTTP::Session& session)
+ void
+ onRequest (HTTP::Session& session) override
{
// Check user/password authorization
- auto const headers (session.request()->headers().build_map());
+ auto const headers (build_map (session.message().headers));
if (! HTTPAuthorized (headers))
{
session.write (HTTPReply (403, "Forbidden"));
- session.close();
+ session.close (true);
return;
}
@@ -158,17 +157,21 @@ public:
#endif
}
- void onClose (HTTP::Session& session, int errorCode)
+ void
+ onClose (HTTP::Session& session,
+ boost::system::error_code const&) override
{
}
- void onStopped (HTTP::Server&)
+ void
+ onStopped (HTTP::Server&) override
{
stopped();
}
//--------------------------------------------------------------------------
+ // Dispatched on the job queue
void processSession (Job& job, HTTP::Session& session)
{
#if 0
@@ -176,11 +179,19 @@ public:
session.write (m_deprecatedHandler.processRequest (
session.content(), session.remoteAddress().at_port(0)));
#else
- session.write (processRequest (session.content(),
+ auto const s (to_string(session.message().body));
+ session.write (processRequest (to_string(session.message().body),
session.remoteAddress().at_port(0)));
#endif
- session.close();
+ if (session.message().keep_alive())
+ {
+ session.complete();
+ }
+ else
+ {
+ session.close (true);
+ }
}
std::string createResponse (
@@ -280,24 +291,35 @@ public:
return createResponse (200, response);
}
+
+ //
+ // PropertyStream
+ //
+
+ void
+ onWrite (beast::PropertyStream::Map& map) override
+ {
+ m_server.onWrite (map);
+ }
};
//------------------------------------------------------------------------------
RPCHTTPServer::RPCHTTPServer (Stoppable& parent)
: Stoppable ("RPCHTTPServer", parent)
+ , Source ("rpc")
{
}
//------------------------------------------------------------------------------
-RPCHTTPServer* RPCHTTPServer::New (Stoppable& parent,
- beast::Journal journal,
- JobQueue& jobQueue,
- NetworkOPs& networkOPs,
- Resource::Manager& resourceManager)
+std::unique_ptr
+make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal,
+ JobQueue& jobQueue, NetworkOPs& networkOPs,
+ Resource::Manager& resourceManager)
{
- return new RPCHTTPServerImp (parent, journal, jobQueue, networkOPs, resourceManager);
+ return std::make_unique (
+ parent, journal, jobQueue, networkOPs, resourceManager);
}
}
diff --git a/src/ripple/module/app/main/RPCHTTPServer.h b/src/ripple/module/app/main/RPCHTTPServer.h
index 979a8a7076..0a22736763 100644
--- a/src/ripple/module/app/main/RPCHTTPServer.h
+++ b/src/ripple/module/app/main/RPCHTTPServer.h
@@ -20,24 +20,34 @@
#ifndef RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED
#define RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED
+#include
+#include
+#include //
+
namespace ripple {
-class RPCHTTPServer : public beast::Stoppable
+class RPCHTTPServer
+ : public beast::Stoppable
+ , public beast::PropertyStream::Source
{
protected:
RPCHTTPServer (Stoppable& parent);
public:
- static RPCHTTPServer* New (Stoppable& parent,
- beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs,
- Resource::Manager& resourceManager);
-
- virtual ~RPCHTTPServer () { }
+ virtual
+ ~RPCHTTPServer() = default;
/** Opens listening ports based on the Config settings. */
- virtual void setup (beast::Journal journal) = 0;
+ virtual
+ void
+ setup (beast::Journal journal) = 0;
};
+std::unique_ptr
+make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal,
+ JobQueue& jobQueue, NetworkOPs& networkOPs,
+ Resource::Manager& resourceManager);
+
}
#endif
diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp
index 0de0fbbe12..193a82e237 100644
--- a/src/ripple/overlay/impl/PeerImp.cpp
+++ b/src/ripple/overlay/impl/PeerImp.cpp
@@ -40,8 +40,6 @@ peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash,
//------------------------------------------------------------------------------
-//------------------------------------------------------------------------------
-
/* Completion handlers for client role.
Logic steps:
1. Establish outgoing connection
@@ -114,11 +112,11 @@ PeerImp::on_connect (error_code ec)
beast::asio::placeholders::error)));
}
-beast::http::basic_message
+beast::http::message
PeerImp::make_request()
{
assert (! m_inbound);
- beast::http::basic_message m;
+ beast::http::message m;
m.method (beast::http::method_t::http_get);
m.url ("/");
m.version (1, 1);
@@ -152,8 +150,8 @@ PeerImp::on_connect_ssl (error_code ec)
}
#if RIPPLE_STRUCTURED_OVERLAY_CLIENT
- beast::http::basic_message req (make_request());
- beast::http::xwrite (write_buffer_, req);
+ beast::http::message req (make_request());
+ beast::http::write (write_buffer_, req);
on_write_http_request (error_code(), 0);
#else
@@ -204,8 +202,11 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred)
if (! ec)
{
read_buffer_.commit (bytes_transferred);
+ bool success;
std::size_t bytes_consumed;
- std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data());
+ std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data());
+ if (! success)
+ ec = http_parser_->error();
if (! ec)
{
@@ -232,6 +233,12 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred)
}
}
+ if (ec == boost::asio::error::eof)
+ {
+ // remote closed their end
+ // VFALCO TODO Clean up the shutdown of the socket
+ }
+
if (ec)
{
m_journal.info <<
@@ -344,8 +351,12 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
if (! ec)
{
read_buffer_.commit (bytes_transferred);
+ bool success;
std::size_t bytes_consumed;
- std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data());
+ std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data());
+ if (! success)
+ ec = http_parser_->error();
+
if (! ec)
{
read_buffer_.consume (bytes_consumed);
@@ -363,7 +374,7 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
"Upgrade: Ripple/1.2\r\n"
"Connection: Upgrade\r\n"
"\r\n";
- beast::http::xwrite (write_buffer_, ss.str());
+ beast::http::write (write_buffer_, ss.str());
on_write_http_response(error_code(), 0);
}
else
@@ -377,7 +388,7 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
"400 Bad Request
"
"The server requires an Upgrade request."
"