HTTP(S)-RPC server improvements (RIPD-489, RIPD-533):

* Correct handling of Keep-Alive in socket handlers
* Report session history in print command
This commit is contained in:
Vinnie Falco
2014-08-26 09:25:28 -07:00
committed by Tom Ritchford
parent f97ef7039a
commit 04bcd93ba3
21 changed files with 1055 additions and 594 deletions

View File

@@ -351,23 +351,25 @@
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\HeapBlock.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\basic_message.h">
<ClInclude Include="..\..\src\beast\beast\http\basic_parser.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\basic_url.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\body.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\client_session.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\detail\header_traits.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\get.h">
<ClInclude Include="..\..\src\beast\beast\http\headers.h">
</ClInclude>
<ClCompile Include="..\..\src\beast\beast\http\HTTP.unity.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\basic_url.cpp">
<ClCompile Include="..\..\src\beast\beast\http\impl\basic_parser.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\get.cpp">
<ClCompile Include="..\..\src\beast\beast\http\impl\basic_url.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\http-parser\http_parser.c">
@@ -386,15 +388,14 @@
<ClCompile Include="..\..\src\beast\beast\http\impl\ParsedURL.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\parser.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\raw_parser.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\URL.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\beast\beast\http\message.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\method.h">
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\ParsedURL.h">
@@ -405,9 +406,6 @@
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\rfc2616.h">
</ClInclude>
<ClCompile Include="..\..\src\beast\beast\http\tests\basic_message.test.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\tests\basic_url.test.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
@@ -417,6 +415,9 @@
<ClCompile Include="..\..\src\beast\beast\http\tests\ParsedURL.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\tests\parser.test.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\tests\rfc2616.test.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
@@ -1940,6 +1941,9 @@
</ClInclude>
<ClInclude Include="..\..\src\ripple\http\Session.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\http\tests\Server.test.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple\json\impl\JsonPropertyStream.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>

View File

@@ -337,6 +337,9 @@
<Filter Include="ripple\http\impl">
<UniqueIdentifier>{43D68742-4714-D103-EE00-EB10BD045FB6}</UniqueIdentifier>
</Filter>
<Filter Include="ripple\http\tests">
<UniqueIdentifier>{AA0D98CC-99E6-61CE-86D7-35156DC4EE55}</UniqueIdentifier>
</Filter>
<Filter Include="ripple\json">
<UniqueIdentifier>{BEDCC703-A2C8-FF25-7E1E-3471BD39ED98}</UniqueIdentifier>
</Filter>
@@ -957,28 +960,31 @@
<ClInclude Include="..\..\src\beast\beast\HeapBlock.h">
<Filter>beast</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\basic_message.h">
<ClInclude Include="..\..\src\beast\beast\http\basic_parser.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\basic_url.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\body.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\client_session.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\detail\header_traits.h">
<Filter>beast\http\detail</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\get.h">
<ClInclude Include="..\..\src\beast\beast\http\headers.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClCompile Include="..\..\src\beast\beast\http\HTTP.unity.cpp">
<Filter>beast\http</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\basic_url.cpp">
<ClCompile Include="..\..\src\beast\beast\http\impl\basic_parser.cpp">
<Filter>beast\http\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\get.cpp">
<ClCompile Include="..\..\src\beast\beast\http\impl\basic_url.cpp">
<Filter>beast\http\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\http-parser\http_parser.c">
@@ -999,15 +1005,15 @@
<ClCompile Include="..\..\src\beast\beast\http\impl\ParsedURL.cpp">
<Filter>beast\http\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\parser.cpp">
<Filter>beast\http\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\raw_parser.cpp">
<Filter>beast\http\impl</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\impl\URL.cpp">
<Filter>beast\http\impl</Filter>
</ClCompile>
<ClInclude Include="..\..\src\beast\beast\http\message.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClInclude Include="..\..\src\beast\beast\http\method.h">
<Filter>beast\http</Filter>
</ClInclude>
@@ -1023,9 +1029,6 @@
<ClInclude Include="..\..\src\beast\beast\http\rfc2616.h">
<Filter>beast\http</Filter>
</ClInclude>
<ClCompile Include="..\..\src\beast\beast\http\tests\basic_message.test.cpp">
<Filter>beast\http\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\tests\basic_url.test.cpp">
<Filter>beast\http\tests</Filter>
</ClCompile>
@@ -1035,6 +1038,9 @@
<ClCompile Include="..\..\src\beast\beast\http\tests\ParsedURL.cpp">
<Filter>beast\http\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\tests\parser.test.cpp">
<Filter>beast\http\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\beast\beast\http\tests\rfc2616.test.cpp">
<Filter>beast\http\tests</Filter>
</ClCompile>
@@ -2946,6 +2952,9 @@
<ClInclude Include="..\..\src\ripple\http\Session.h">
<Filter>ripple\http</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\http\tests\Server.test.cpp">
<Filter>ripple\http\tests</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple\json\impl\JsonPropertyStream.cpp">
<Filter>ripple\json\impl</Filter>
</ClCompile>

View File

@@ -110,12 +110,14 @@ public:
virtual SSL* ssl_handle () = 0;
// Caller owns the socket
// VFALCO TODO return std::unique_ptr
static MultiSocket* New (
boost::asio::ip::tcp::socket& socket,
boost::asio::ssl::context& ssl_context,
int flags = 0);
// Caller owns the io_service
// VFALCO TODO return std::unique_ptr
static MultiSocket* New (
boost::asio::io_service& io_service,
boost::asio::ssl::context& ssl_context,

View File

@@ -23,6 +23,8 @@
#include <beast/net/IPEndpoint.h>
#include <beast/module/asio/basics/SSLContext.h>
#include <beast/utility/Journal.h>
#include <beast/utility/PropertyStream.h>
#include <boost/system/error_code.hpp>
#include <cstdint>
#include <memory>
#include <ostream>
@@ -35,7 +37,7 @@ namespace HTTP {
/** Configuration information for a server listening port. */
struct Port
{
enum Security
enum class Security
{
no_ssl,
allow_ssl,
@@ -74,19 +76,15 @@ struct Handler
/** Called when the connection is accepted and we know remoteAddress. */
virtual void onAccept (Session& session) = 0;
/** Called repeatedly as new HTTP headers are received.
Guaranteed to be called at least once.
*/
virtual void onHeaders (Session& session) = 0;
/** Called when we have the full Content-Body. */
/** Called when we have a complete HTTP request. */
virtual void onRequest (Session& session) = 0;
/** Called when the session ends.
Guaranteed to be called once.
@param errorCode Non zero for a failed connection.
*/
virtual void onClose (Session& session, int errorCode) = 0;
virtual void onClose (Session& session,
boost::system::error_code const& ec) = 0;
/** Called when the server has finished its stop. */
virtual void onStopped (Server& server) = 0;
@@ -108,6 +106,7 @@ public:
*/
virtual
~Server ();
/** Returns the Journal associated with the server. */
beast::Journal
journal () const;
@@ -145,6 +144,9 @@ public:
void
stop ();
void
onWrite (beast::PropertyStream::Map& map);
private:
std::unique_ptr <ServerImpl> m_impl;
};

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_HTTP_SESSION_H_INCLUDED
#define RIPPLE_HTTP_SESSION_H_INCLUDED
#include <beast/http/message.h>
#include <beast/smart_ptr/SharedPtr.h>
#include <beast/net/IPEndpoint.h>
#include <beast/utility/Journal.h>
@@ -88,26 +89,24 @@ public:
void* tag;
/** Returns the Journal to use for logging. */
virtual beast::Journal journal() = 0;
virtual
beast::Journal
journal() = 0;
/** Returns the remote address of the connection. */
virtual beast::IP::Endpoint remoteAddress() = 0;
/** Returns `true` if the full HTTP headers have been received. */
virtual bool headersComplete() = 0;
virtual
beast::IP::Endpoint
remoteAddress() = 0;
/** Returns the currently known set of headers. */
virtual beast::HTTPHeaders headers() = 0;
/** Returns the complete HTTP request when it is known. */
virtual beast::SharedPtr <beast::HTTPRequest> const& request() = 0;
/** Returns the entire Content-Body, if the request is complete. */
virtual std::string content() = 0;
virtual
beast::http::message&
message() = 0;
/** Send a copy of data asynchronously. */
/** @{ */
void write (std::string const& s)
void
write (std::string const& s)
{
if (! s.empty())
write (&s[0],
@@ -115,7 +114,8 @@ public:
}
template <typename BufferSequence>
void write (BufferSequence const& buffers)
void
write (BufferSequence const& buffers)
{
for (typename BufferSequence::const_iterator iter (buffers.begin());
iter != buffers.end(); ++iter)
@@ -126,35 +126,54 @@ public:
}
}
virtual void write (void const* buffer, std::size_t bytes) = 0;
virtual
void
write (void const* buffer, std::size_t bytes) = 0;
/** @} */
/** Output support using ostream. */
/** @{ */
ScopedStream operator<< (std::ostream& manip (std::ostream&))
ScopedStream
operator<< (std::ostream& manip (std::ostream&))
{
return ScopedStream (*this, manip);
}
template <typename T>
ScopedStream operator<< (T const& t)
ScopedStream
operator<< (T const& t)
{
return ScopedStream (*this, t);
}
/** @} */
/** Indicate that the response is complete.
The handler should call this when it has completed writing
the response. If Keep-Alive is indicated on the connection,
this will trigger a read for the next request; else, the
connection will be closed when all remaining data has been sent.
*/
virtual
void
complete() = 0;
/** Detach the session.
This holds the session open so that the response can be sent
asynchronously. Calls to io_service::run made by the server
will not return until all detached sessions are closed.
*/
virtual void detach() = 0;
virtual
void
detach() = 0;
/** Close the session.
This will be performed asynchronously. The session will be
closed gracefully after all pending writes have completed.
@param graceful `true` to wait until all data has finished sending.
*/
virtual void close() = 0;
virtual
void
close (bool graceful) = 0;
};
} // namespace HTTP

View File

@@ -25,31 +25,31 @@ namespace ripple {
namespace HTTP {
Door::Door (ServerImpl& impl, Port const& port)
: impl_ (impl)
, acceptor_ (impl_.get_io_service(), to_asio (port))
: server_ (impl)
, acceptor_ (server_.get_io_service(), to_asio (port))
, port_ (port)
{
impl_.add (*this);
server_.add (*this);
error_code ec;
acceptor_.set_option (acceptor::reuse_address (true), ec);
if (ec)
{
impl_.journal().error <<
server_.journal().error <<
"Error setting acceptor socket option: " << ec.message();
}
if (! ec)
{
impl_.journal().info << "Bound to endpoint " <<
server_.journal().info << "Bound to endpoint " <<
to_string (acceptor_.local_endpoint());
async_accept();
}
else
{
impl_.journal().error << "Error binding to endpoint " <<
server_.journal().error << "Error binding to endpoint " <<
to_string (acceptor_.local_endpoint()) <<
", '" << ec.message() << "'";
}
@@ -57,7 +57,7 @@ Door::Door (ServerImpl& impl, Port const& port)
Door::~Door ()
{
impl_.remove (*this);
server_.remove (*this);
}
Port const&
@@ -80,7 +80,7 @@ Door::failed (error_code ec)
void
Door::async_accept ()
{
auto const peer (std::make_shared <Peer> (impl_, port_));
auto const peer (std::make_shared <Peer> (server_, port_, server_.journal()));
acceptor_.async_accept (peer->get_socket(), std::bind (
&Door::handle_accept, Ptr(this),
beast::asio::placeholders::error, peer));
@@ -95,7 +95,7 @@ Door::handle_accept (error_code ec,
if (ec)
{
impl_.journal().error << "Accept failed: " << ec.message();
server_.journal().error << "Accept failed: " << ec.message();
return;
}

View File

@@ -37,7 +37,7 @@ private:
// VFALCO TODO Use shared_ptr
typedef beast::SharedPtr <Door> Ptr;
ServerImpl& impl_;
ServerImpl& server_;
acceptor acceptor_;
Port port_;

View File

@@ -18,91 +18,386 @@
//==============================================================================
#include <ripple/http/impl/Peer.h>
#include <cassert>
namespace ripple {
namespace HTTP {
Peer::Peer (ServerImpl& impl, Port const& port)
: impl_ (impl)
, strand_ (impl_.get_io_service())
, data_timer_ (impl_.get_io_service())
, request_timer_ (impl_.get_io_service())
, buffer_ (bufferSize)
, writesPending_ (0)
Peer::Peer (ServerImpl& server, Port const& port, beast::Journal journal)
: journal_ (journal)
, server_ (server)
, strand_ (server_.get_io_service())
, timer_ (server_.get_io_service())
, parser_ (message_, true)
, pending_writes_ (0)
, closed_ (false)
, callClose_ (false)
, errorCode_ (0)
, detached_ (0)
, request_count_ (0)
, bytes_in_ (0)
, bytes_out_ (0)
{
static std::atomic <int> sid;
id_ = std::string("#") + std::to_string(sid++);
tag = nullptr;
int flags = MultiSocket::Flag::server_role;
switch (port.security)
{
default:
bassertfalse;
case Port::no_ssl:
case Port::Security::no_ssl:
break;
case Port::allow_ssl:
case Port::Security::allow_ssl:
flags |= MultiSocket::Flag::ssl;
break;
case Port::require_ssl:
case Port::Security::require_ssl:
flags |= MultiSocket::Flag::ssl_required;
break;
}
socket_.reset (MultiSocket::New (
impl_.get_io_service(), port.context->get(), flags));
server_.get_io_service(), port.context->get(), flags));
impl_.add (*this);
server_.add (*this);
if (journal_.trace) journal_.trace <<
id_ << " created";
}
Peer::~Peer ()
{
if (callClose_)
impl_.handler().onClose (session(), errorCode_);
impl_.remove (*this);
}
//--------------------------------------------------------------------------
// Returns the Content-Body as a single buffer.
// VFALCO NOTE This is inefficient...
std::string
Peer::content()
{
std::string s;
beast::DynamicBuffer const& body (
parser_.request()->body ());
s.resize (body.size ());
boost::asio::buffer_copy (
boost::asio::buffer (&s[0],
s.size()), body.data <boost::asio::const_buffer>());
return s;
Stat stat;
stat.when = std::move (when_str_);
stat.elapsed = std::chrono::duration_cast <
std::chrono::seconds> (clock_type::now() - when_);
stat.requests = request_count_;
stat.bytes_in = bytes_in_;
stat.bytes_out = bytes_out_;
stat.ec = std::move (ec_);
server_.report (std::move (stat));
if (journal_.trace) journal_.trace <<
id_ << " onClose (" << ec_ << ")";
server_.handler().onClose (session(), ec_);
}
server_.remove (*this);
if (journal_.trace) journal_.trace <<
id_ << " destroyed";
}
//------------------------------------------------------------------------------
// Called when the acceptor accepts our socket.
void
Peer::accept ()
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::accept, shared_from_this())));
if (journal_.trace) journal_.trace <<
id_ << " accept";
callClose_ = true;
when_ = clock_type::now();
when_str_ = beast::Time::getCurrentTime().formatted (
"%Y-%b-%d %H:%M:%S").toStdString();
if (journal_.trace) journal_.trace <<
id_ << " onAccept";
server_.handler().onAccept (session());
// Handler might have closed
if (closed_)
{
// VFALCO TODO Is this the correct way to close the socket?
// See what state the socket is in and verify.
cancel();
return;
}
if (socket_->needs_handshake ())
{
start_timer();
socket_->async_handshake (beast::asio::abstract_socket::server,
strand_.wrap (std::bind (&Peer::on_ssl_handshake, shared_from_this(),
beast::asio::placeholders::error)));
}
else
{
on_read_request (error_code{}, 0);
}
}
// Cancel all pending i/o and timers and send tcp shutdown.
void
Peer::cancel ()
{
if (journal_.trace) journal_.trace <<
id_ << " cancel";
error_code ec;
timer_.cancel (ec);
socket_->cancel (ec);
socket_->shutdown (socket::shutdown_both, ec);
}
// Called by a completion handler when error is not eof or aborted.
void
Peer::failed (error_code const& ec)
{
if (journal_.trace) journal_.trace <<
id_ << " failed, " << ec.message();
ec_ = ec;
assert (ec_);
cancel ();
}
// Start the timer.
// If the timer expires, the session is considered
// timed out and will be forcefully closed.
void
Peer::start_timer()
{
if (journal_.trace) journal_.trace <<
id_ << " start_timer";
timer_.expires_from_now (
boost::posix_time::seconds (
timeoutSeconds));
timer_.async_wait (strand_.wrap (std::bind (
&Peer::on_timer, shared_from_this(),
beast::asio::placeholders::error)));
}
// Send a shared buffer
void
Peer::async_write (SharedBuffer const& buf)
{
assert (buf.get().size() > 0);
++pending_writes_;
if (journal_.trace) journal_.trace <<
id_ << " async_write, pending_writes = " << pending_writes_;
start_timer();
// Send the copy. We pass the SharedBuffer in the last parameter
// so that a reference is maintained as the handler gets copied.
// When the final completion function returns, the reference
// count will drop to zero and the buffer will be freed.
//
boost::asio::async_write (*socket_,
boost::asio::const_buffers_1 (&(*buf)[0], buf->size()),
strand_.wrap (std::bind (&Peer::on_write_response,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred, buf)));
}
//------------------------------------------------------------------------------
// Called when session times out
void
Peer::on_timer (error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (! ec)
ec = boost::system::errc::make_error_code (
boost::system::errc::timed_out);
failed (ec);
}
// Called when the handshake completes
void
Peer::on_ssl_handshake (error_code ec)
{
if (journal_.trace) journal_.trace <<
id_ << " on_ssl_handshake, " << ec.message();
if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0)
{
failed (ec);
return;
}
on_read_request (error_code{}, 0);
}
// Called repeatedly with the http request data
void
Peer::on_read_request (error_code ec, std::size_t bytes_transferred)
{
if (journal_.trace)
{
if (! ec_)
journal_.trace <<
id_ << " on_read_request, " <<
bytes_transferred << " bytes";
else
journal_.trace <<
id_ << " on_read_request failed, " <<
ec.message();
}
if (ec == boost::asio::error::operation_aborted)
return;
bool const eof = ec == boost::asio::error::eof;
if (eof)
ec = error_code{};
if (! ec)
{
bytes_in_ += bytes_transferred;
read_buf_.commit (bytes_transferred);
std::pair <bool, std::size_t> result;
if (! eof)
{
result = parser_.write (read_buf_.data());
if (result.first)
read_buf_.consume (result.second);
else
ec = parser_.error();
}
else
{
result.first = parser_.write_eof();
if (! result.first)
ec = parser_.error();
}
if (! ec)
{
if (parser_.complete())
{
// ???
//if (! socket_->needs_handshake())
// socket_->shutdown (socket::shutdown_receive, ec);
if (journal_.trace) journal_.trace <<
id_ << " onRequest";
++request_count_;
server_.handler().onRequest (session());
return;
}
}
}
if (ec)
{
failed (ec);
return;
}
start_timer();
socket_->async_read_some (read_buf_.prepare (bufferSize), strand_.wrap (
std::bind (&Peer::on_read_request, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}
// Called when async_write completes.
void
Peer::on_write_response (error_code ec, std::size_t bytes_transferred,
SharedBuffer const& buf)
{
timer_.cancel (ec);
if (journal_.trace)
{
if (! ec_)
journal_.trace <<
id_ << " on_write_response, " <<
bytes_transferred << " bytes";
else
journal_.trace <<
id_ << " on_write_response failed, " <<
ec.message();
}
if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0)
{
failed (ec);
return;
}
bytes_out_ += bytes_transferred;
assert (pending_writes_ > 0);
if (--pending_writes_ > 0)
return;
if (closed_)
{
socket_->shutdown (socket::shutdown_send, ec);
return;
}
}
//------------------------------------------------------------------------------
// Send a copy of the data.
void
Peer::write (void const* buffer, std::size_t bytes)
{
// Make sure this happens on an io_service thread.
impl_.get_io_service().dispatch (strand_.wrap (
server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::async_write, shared_from_this(),
SharedBuffer (static_cast <char const*> (buffer), bytes))));
}
// Called to indicate the response has been written (but not sent)
void
Peer::complete()
{
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::complete, shared_from_this())));
message_ = beast::http::message{};
parser_ = beast::http::parser{message_, true};
on_read_request (error_code{}, 0);
}
// Make the Session asynchronous
void
Peer::detach ()
{
if (detached_.exchange (1) == 0)
{
bassert (! work_);
bassert (detach_ref_ == nullptr);
assert (! work_);
assert (detach_ref_ == nullptr);
// Maintain an additional reference while detached
detach_ref_ = shared_from_this();
@@ -112,298 +407,32 @@ Peer::detach ()
// after the Session is closed and handlers complete.
//
work_ = boost::in_place (std::ref (
impl_.get_io_service()));
server_.get_io_service()));
}
}
// Called by the Handler to close the session.
// Called from the Handler to close the session.
void
Peer::close ()
{
// Make sure this happens on an io_service thread.
impl_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::handle_close, shared_from_this())));
}
//--------------------------------------------------------------------------
// Called when the handshake completes
//
void
Peer::handle_handshake (error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0)
{
failed (ec);
return;
}
async_read_some();
}
// Called when the data timer expires
//
void
Peer::handle_data_timer (error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (closed_)
return;
if (ec != 0)
{
failed (ec);
return;
}
failed (boost::system::errc::make_error_code (
boost::system::errc::timed_out));
}
// Called when the request timer expires
//
void
Peer::handle_request_timer (error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (closed_)
return;
if (ec != 0)
{
failed (ec);
return;
}
failed (boost::system::errc::make_error_code (
boost::system::errc::timed_out));
}
// Called when async_write completes.
void
Peer::handle_write (error_code ec, std::size_t bytes_transferred,
SharedBuffer const& buf)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0)
{
failed (ec);
return;
}
bassert (writesPending_ > 0);
if (--writesPending_ == 0 && closed_)
socket_->shutdown (socket::shutdown_send, ec);
}
// Called when async_read_some completes.
void
Peer::handle_read (error_code ec, std::size_t bytes_transferred)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (ec != 0 && ec != boost::asio::error::eof)
{
failed (ec);
return;
}
std::size_t const bytes_parsed (parser_.process (
buffer_.getData(), bytes_transferred));
if (parser_.error() ||
bytes_parsed != bytes_transferred)
{
failed (boost::system::errc::make_error_code (
boost::system::errc::bad_message));
return;
}
if (ec == boost::asio::error::eof)
{
parser_.process_eof();
ec = error_code();
}
if (parser_.error())
{
failed (boost::system::errc::make_error_code (
boost::system::errc::bad_message));
return;
}
if (! parser_.finished())
{
// Feed some headers to the callback
if (parser_.fields().size() > 0)
{
handle_headers();
if (closed_)
return;
}
}
if (parser_.finished ())
{
data_timer_.cancel();
// VFALCO NOTE: Should we cancel this one?
request_timer_.cancel();
if (! socket_->needs_handshake())
socket_->shutdown (socket::shutdown_receive, ec);
handle_request ();
return;
}
async_read_some();
}
// Called when we have some new headers.
void
Peer::handle_headers ()
{
impl_.handler().onHeaders (session());
}
// Called when we have a complete http request.
void
Peer::handle_request ()
{
// This is to guarantee onHeaders is called at least once.
handle_headers();
if (closed_)
return;
// Process the HTTPRequest
impl_.handler().onRequest (session());
}
// Called to close the session.
void
Peer::handle_close ()
Peer::close (bool graceful)
{
closed_ = true;
if (! strand_.running_in_this_thread())
return server_.get_io_service().dispatch (strand_.wrap (
std::bind (&Peer::close, shared_from_this(), graceful)));
if (! graceful || pending_writes_ == 0)
{
// We should cancel pending i/o
if (pending_writes_ == 0)
{
}
}
// Release our additional reference
detach_ref_.reset();
}
//--------------------------------------------------------------------------
//
// Peer
//
// Called when the acceptor accepts our socket.
void
Peer::accept ()
{
callClose_ = true;
impl_.handler().onAccept (session());
if (closed_)
{
cancel();
return;
}
request_timer_.expires_from_now (
boost::posix_time::seconds (
requestTimeoutSeconds));
request_timer_.async_wait (strand_.wrap (std::bind (
&Peer::handle_request_timer, shared_from_this(),
beast::asio::placeholders::error)));
if (socket_->needs_handshake ())
{
socket_->async_handshake (beast::asio::abstract_socket::server,
strand_.wrap (std::bind (&Peer::handle_handshake, shared_from_this(),
beast::asio::placeholders::error)));
}
else
{
async_read_some();
}
}
// Cancel all pending i/o and timers and send tcp shutdown.
void
Peer::cancel ()
{
error_code ec;
data_timer_.cancel (ec);
request_timer_.cancel (ec);
socket_->cancel (ec);
socket_->shutdown (socket::shutdown_both, ec);
}
// Called by a completion handler when error is not eof or aborted.
void
Peer::failed (error_code const& ec)
{
errorCode_ = ec.value();
bassert (errorCode_ != 0);
cancel ();
}
// Call the async_read_some initiating function.
void
Peer::async_read_some ()
{
// re-arm the data timer
// (this cancels the previous wait, if any)
//
data_timer_.expires_from_now (
boost::posix_time::seconds (
dataTimeoutSeconds));
data_timer_.async_wait (strand_.wrap (std::bind (
&Peer::handle_data_timer, shared_from_this(),
beast::asio::placeholders::error)));
// issue the read
//
boost::asio::mutable_buffers_1 buf (
buffer_.getData (), buffer_.getSize ());
socket_->async_read_some (buf, strand_.wrap (
std::bind (&Peer::handle_read, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}
// Send a shared buffer
void
Peer::async_write (SharedBuffer const& buf)
{
bassert (buf.get().size() > 0);
++writesPending_;
// Send the copy. We pass the SharedBuffer in the last parameter
// so that a reference is maintained as the handler gets copied.
// When the final completion function returns, the reference
// count will drop to zero and the buffer will be freed.
//
boost::asio::async_write (*socket_,
boost::asio::const_buffers_1 (&(*buf)[0], buf->size()),
strand_.wrap (std::bind (&Peer::handle_write,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred, buf)));
}
}
}

View File

@@ -26,9 +26,14 @@
#include <ripple/http/impl/ServerImpl.h>
#include <ripple/common/MultiSocket.h>
#include <beast/asio/placeholders.h>
#include <beast/http/message.h>
#include <beast/http/parser.h>
#include <beast/module/core/core.h>
#include <beast/module/asio/basics/SharedArg.h>
#include <beast/module/asio/http/HTTPRequestParser.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/streambuf.hpp>
#include <chrono>
#include <functional>
#include <memory>
@@ -48,129 +53,54 @@ class Peer
, public beast::LeakChecked <Peer>
{
private:
typedef std::chrono::system_clock clock_type;
enum
{
// Size of our receive buffer
bufferSize = 8192,
bufferSize = 2048,
// Largest HTTP request allowed
maxRequestBytes = 32 * 1024,
// Max seconds without receiving a byte
dataTimeoutSeconds = 10,
// Max seconds without completing the request
requestTimeoutSeconds = 30
// Max seconds without completing a message
timeoutSeconds = 30
//timeoutSeconds = 3
};
ServerImpl& impl_;
beast::Journal journal_;
ServerImpl& server_;
std::string id_;
boost::asio::io_service::strand strand_;
boost::asio::deadline_timer data_timer_;
boost::asio::deadline_timer request_timer_;
boost::asio::deadline_timer timer_;
std::unique_ptr <MultiSocket> socket_;
// VFALCO TODO Use c++11
beast::MemoryBlock buffer_;
beast::HTTPRequestParser parser_;
int writesPending_;
boost::asio::streambuf read_buf_;
beast::http::message message_;
beast::http::parser parser_;
int pending_writes_;
bool closed_;
bool finished_;
bool callClose_;
std::shared_ptr <Peer> detach_ref_;
boost::optional <boost::asio::io_service::work> work_;
int errorCode_;
boost::system::error_code ec_;
std::atomic <int> detached_;
clock_type::time_point when_;
std::string when_str_;
int request_count_;
std::size_t bytes_in_;
std::size_t bytes_out_;
//--------------------------------------------------------------------------
public:
Peer (ServerImpl& impl, Port const& port);
Peer (ServerImpl& impl, Port const& port, beast::Journal journal);
~Peer ();
private:
//--------------------------------------------------------------------------
//
// Session
//
beast::Journal
journal()
{
return impl_.journal();
}
beast::IP::Endpoint
remoteAddress()
{
return from_asio (get_socket().remote_endpoint());
}
bool
headersComplete()
{
return parser_.headersComplete();
}
beast::HTTPHeaders
headers()
{
return beast::HTTPHeaders (parser_.fields());
}
beast::SharedPtr <beast::HTTPRequest> const&
request()
{
return parser_.request();
}
std::string
content();
void
write (void const* buffer, std::size_t bytes);
void
detach ();
void
close ();
//--------------------------------------------------------------------------
//
// Completion Handlers
//
void
handle_handshake (error_code ec);
void
handle_data_timer (error_code ec);
void
handle_request_timer (error_code ec);
void
handle_write (error_code ec, std::size_t bytes_transferred,
SharedBuffer const& buf);
void
handle_read (error_code ec, std::size_t bytes_transferred);
void
handle_headers ();
void
handle_request ();
void
handle_close ();
public:
//--------------------------------------------------------------------------
//
// Peer
//
socket&
get_socket()
{
@@ -186,6 +116,7 @@ public:
void
accept();
private:
void
cancel();
@@ -193,9 +124,66 @@ public:
failed (error_code const& ec);
void
async_read_some ();
start_timer();
void async_write (SharedBuffer const& buf);
void
async_write (SharedBuffer const& buf);
//--------------------------------------------------------------------------
//
// Completion Handlers
//
void
on_timer (error_code ec);
void
on_ssl_handshake (error_code ec);
void
on_read_request (error_code ec, std::size_t bytes_transferred);
void
on_write_response (error_code ec, std::size_t bytes_transferred,
SharedBuffer const& buf);
void
on_close ();
//--------------------------------------------------------------------------
//
// Session
//
beast::Journal
journal() override
{
return server_.journal();
}
beast::IP::Endpoint
remoteAddress() override
{
return from_asio (get_socket().remote_endpoint());
}
beast::http::message&
message() override
{
return message_;
}
void
write (void const* buffer, std::size_t bytes) override;
void
complete() override;
void
detach() override;
void
close (bool graceful) override;
};
}

View File

@@ -21,7 +21,7 @@ namespace ripple {
namespace HTTP {
Port::Port ()
: security (no_ssl)
: security (Security::no_ssl)
, port (0)
, context (nullptr)
{

View File

@@ -64,5 +64,11 @@ Server::stop ()
m_impl->stop(true);
}
void
Server::onWrite (beast::PropertyStream::Map& map)
{
m_impl->onWrite (map);
}
}
}

View File

@@ -18,46 +18,59 @@
//==============================================================================
#include <ripple/http/impl/ServerImpl.h>
#include <beast/chrono/chrono_io.h>
#include <boost/chrono/chrono_io.hpp>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <string>
#include <stdio.h>
#include <time.h>
namespace ripple {
namespace HTTP {
ServerImpl::ServerImpl (Server& server, Handler& handler, beast::Journal journal)
: Thread ("HTTP::Server")
, m_server (server)
ServerImpl::ServerImpl (Server& server,
Handler& handler, beast::Journal journal)
: m_server (server)
, m_handler (handler)
, journal_ (journal)
, m_strand (m_io_service)
, m_work (boost::in_place (std::ref (m_io_service)))
, m_strand (io_service_)
, m_work (boost::in_place (std::ref (io_service_)))
, m_stopped (true)
{
startThread ();
thread_ = std::thread (std::bind (
&ServerImpl::run, this));
}
ServerImpl::~ServerImpl ()
{
stopThread ();
thread_.join();
}
Ports const& ServerImpl::getPorts () const
Ports const&
ServerImpl::getPorts () const
{
SharedState::ConstUnlockedAccess state (m_state);
return state->ports;
std::lock_guard <std::mutex> lock (mutex_);
return state_.ports;
}
void ServerImpl::setPorts (Ports const& ports)
void
ServerImpl::setPorts (Ports const& ports)
{
SharedState::Access state (m_state);
state->ports = ports;
std::lock_guard <std::mutex> lock (mutex_);
state_.ports = ports;
update();
}
bool ServerImpl::stopping () const
bool
ServerImpl::stopping () const
{
return ! m_work;
}
void ServerImpl::stop (bool wait)
void
ServerImpl::stop (bool wait)
{
if (! stopping())
{
@@ -74,55 +87,99 @@ void ServerImpl::stop (bool wait)
// Server
//
Handler& ServerImpl::handler()
Handler&
ServerImpl::handler()
{
return m_handler;
}
boost::asio::io_service& ServerImpl::get_io_service()
boost::asio::io_service&
ServerImpl::get_io_service()
{
return m_io_service;
return io_service_;
}
// Inserts the peer into our list of peers. We only remove it
// from the list inside the destructor of the Peer object. This
// way, the Peer can never outlive the server.
//
void ServerImpl::add (Peer& peer)
void
ServerImpl::add (Peer& peer)
{
SharedState::Access state (m_state);
state->peers.push_back (peer);
std::lock_guard <std::mutex> lock (mutex_);
state_.peers.push_back (peer);
}
void ServerImpl::add (Door& door)
void
ServerImpl::add (Door& door)
{
SharedState::Access state (m_state);
state->doors.push_back (door);
std::lock_guard <std::mutex> lock (mutex_);
state_.doors.push_back (door);
}
// Removes the peer from our list of peers. This is only called from
// the destructor of Peer. Essentially, the item in the list functions
// as a weak_ptr.
//
void ServerImpl::remove (Peer& peer)
void
ServerImpl::remove (Peer& peer)
{
SharedState::Access state (m_state);
state->peers.erase (state->peers.iterator_to (peer));
std::lock_guard <std::mutex> lock (mutex_);
state_.peers.erase (state_.peers.iterator_to (peer));
}
void ServerImpl::remove (Door& door)
void
ServerImpl::remove (Door& door)
{
SharedState::Access state (m_state);
state->doors.push_back (door);
std::lock_guard <std::mutex> lock (mutex_);
state_.doors.push_back (door);
}
//--------------------------------------------------------------------------
//
// Thread
//
void
ServerImpl::report (Stat&& stat)
{
std::lock_guard <std::mutex> lock (mutex_);
if (stats_.size() >= historySize)
stats_.pop_back();
stats_.emplace_front (std::move(stat));
}
void
ServerImpl::onWrite (beast::PropertyStream::Map& map)
{
std::lock_guard <std::mutex> lock (mutex_);
// VFALCO TODO Write the list of doors
{
beast::PropertyStream::Set set ("sessions", map);
for (auto const& stat : stats_)
{
beast::PropertyStream::Map item (set);
item ["when"] = stat.when;
{
std::stringstream ss;
ss << stat.elapsed;
item ["elapsed"] = ss.str();
}
item ["requests"] = stat.requests;
item ["bytes_in"] = stat.bytes_in;
item ["bytes_out"] = stat.bytes_out;
if (stat.ec)
item ["error"] = stat.ec.message();
}
}
}
//--------------------------------------------------------------------------
int ServerImpl::compare (Port const& lhs, Port const& rhs)
int
ServerImpl::compare (Port const& lhs, Port const& rhs)
{
if (lhs < rhs)
return -1;
@@ -131,18 +188,31 @@ int ServerImpl::compare (Port const& lhs, Port const& rhs)
return 0;
}
// Updates our Door list based on settings.
//
void ServerImpl::handle_update ()
void
ServerImpl::update()
{
io_service_.post (m_strand.wrap (std::bind (
&ServerImpl::on_update, this)));
}
// Updates our Door list based on settings.
void
ServerImpl::on_update ()
{
/*
if (! m_strand.running_in_this_thread())
io_service_.dispatch (m_strand.wrap (std::bind (
&ServerImpl::update, this)));
*/
if (! stopping())
{
// Make a local copy to shorten the lock
//
Ports ports;
{
SharedState::ConstAccess state (m_state);
ports = state->ports;
std::lock_guard <std::mutex> lock (mutex_);
ports = state_.ports;
}
std::sort (ports.begin(), ports.end());
@@ -206,19 +276,16 @@ void ServerImpl::handle_update ()
}
}
// Causes handle_update to run on the io_service
//
void ServerImpl::update ()
// Thread entry point to perform io_service work
void
ServerImpl::run()
{
m_io_service.post (m_strand.wrap (std::bind (
&ServerImpl::handle_update, this)));
}
static std::atomic <int> id;
// The main i/o processing loop.
//
void ServerImpl::run ()
{
m_io_service.run ();
beast::Thread::setCurrentThreadName (
std::string("HTTP::Server #") + std::to_string (++id));
io_service_.run();
m_stopped.signal();
m_handler.onStopped (m_server);

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_HTTP_SERVERIMPL_H_INCLUDED
#define RIPPLE_HTTP_SERVERIMPL_H_INCLUDED
#include <ripple/common/seconds_clock.h>
#include <ripple/http/Server.h>
#include <beast/intrusive/List.h>
#include <beast/threads/SharedData.h>
@@ -27,6 +28,11 @@
#include <beast/module/asio/basics/SharedArg.h>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
namespace ripple {
@@ -35,9 +41,26 @@ namespace HTTP {
class Door;
class Peer;
class ServerImpl : public beast::Thread
struct Stat
{
std::string when;
std::chrono::seconds elapsed;
int requests;
std::size_t bytes_in;
std::size_t bytes_out;
boost::system::error_code ec;
};
class ServerImpl
{
private:
typedef std::chrono::system_clock clock_type;
enum
{
historySize = 100
};
struct State
{
// Attributes for our listening ports
@@ -50,18 +73,21 @@ private:
beast::List <Door> doors;
};
typedef beast::SharedData <State> SharedState;
typedef std::vector <beast::SharedPtr <Door>> Doors;
Server& m_server;
Handler& m_handler;
std::thread thread_;
std::mutex mutable mutex_;
std::condition_variable cond_;
beast::Journal journal_;
boost::asio::io_service m_io_service;
boost::asio::io_service io_service_;
boost::asio::io_service::strand m_strand;
boost::optional <boost::asio::io_service::work> m_work;
beast::WaitableEvent m_stopped;
SharedState m_state;
State state_;
Doors m_doors;
std::deque <Stat> stats_;
public:
ServerImpl (Server& server, Handler& handler, beast::Journal journal);
@@ -104,15 +130,24 @@ public:
remove (Door& door);
void
handle_update ();
report (Stat&& stat);
void
onWrite (beast::PropertyStream::Map& map);
private:
static
int
compare (Port const& lhs, Port const& rhs);
void
update();
void
run ();
on_update();
static int compare (Port const& lhs, Port const& rhs);
void
run();
};

View File

@@ -0,0 +1,254 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/common/RippleSSLContext.h>
#include <ripple/http/Session.h>
#include <ripple/http/Server.h>
#include <beast/unit_test/suite.h>
#include <boost/asio/ip/tcp.hpp>
#include <chrono>
#include <stdexcept>
#include <thread>
namespace ripple {
namespace HTTP {
class Server_test : public beast::unit_test::suite
{
public:
enum
{
testPort = 1001
};
class TestSink : public beast::Journal::Sink
{
beast::unit_test::suite& suite_;
public:
TestSink (beast::unit_test::suite& suite)
: suite_ (suite)
{
}
void
write (beast::Journal::Severity level,
std::string const& text) override
{
suite_.log << text;
}
};
struct TestHandler : Handler
{
void
onAccept (Session& session) override
{
}
void
onRequest (Session& session) override
{
session << "Hello, world!\n";
if (session.message().keep_alive())
session.complete();
else
session.close (true);
}
void
onClose (Session& session,
boost::system::error_code const&) override
{
}
void
onStopped (Server& server)
{
}
};
// Connect to an address
template <class Socket>
bool
connect (Socket& s, std::string const& addr, int port)
{
try
{
typename Socket::endpoint_type ep (
boost::asio::ip::address::from_string (addr), port);
s.connect (ep);
pass();
return true;
}
catch (std::exception const& e)
{
fail (e.what());
}
return false;
}
// Write a string to the stream
template <class SyncWriteStream>
bool
write (SyncWriteStream& s, std::string const& text)
{
try
{
boost::asio::write (s, boost::asio::buffer (text));
pass();
return true;
}
catch (std::exception const& e)
{
fail (e.what());
}
return false;
}
// Expect that reading the stream produces a matching string
template <class SyncReadStream>
bool
expect_read (SyncReadStream& s, std::string const& match)
{
boost::asio::streambuf b (1000); // limit on read
try
{
auto const n = boost::asio::read_until (s, b, '\n');
if (expect (n == match.size()))
{
std::string got;
got.resize (n);
boost::asio::buffer_copy (boost::asio::buffer (
&got[0], n), b.data());
return expect (got == match);
}
}
catch (std::length_error const& e)
{
fail(e.what());
}
return false;
}
void
test_request()
{
testcase("request");
boost::asio::io_service ios;
typedef boost::asio::ip::tcp::socket socket;
socket s (ios);
if (! connect (s, "127.0.0.1", testPort))
return;
if (! write (s,
"GET / HTTP/1.1\r\n"
"Connection: close\r\n"
"\r\n"))
return;
if (! expect_read (s, "Hello, world!\n"))
return ;
try
{
s.shutdown (socket::shutdown_both);
pass();
}
catch (std::exception const& e)
{
fail (e.what());
}
std::this_thread::sleep_for (std::chrono::seconds (1));
}
void
test_keepalive()
{
testcase("keepalive");
boost::asio::io_service ios;
typedef boost::asio::ip::tcp::socket socket;
socket s (ios);
if (! connect (s, "127.0.0.1", testPort))
return;
if (! write (s,
"GET / HTTP/1.1\r\n"
"Connection: Keep-Alive\r\n"
"\r\n"))
return;
if (! expect_read (s, "Hello, world!\n"))
return ;
if (! write (s,
"GET / HTTP/1.1\r\n"
"Connection: close\r\n"
"\r\n"))
return;
if (! expect_read (s, "Hello, world!\n"))
return ;
try
{
s.shutdown (socket::shutdown_both);
pass();
}
catch (std::exception const& e)
{
fail (e.what());
}
}
void
run()
{
TestSink sink {*this};
sink.severity (beast::Journal::Severity::kAll);
beast::Journal journal {sink};
TestHandler handler;
Server s (handler, journal);
Ports ports;
std::unique_ptr <RippleSSLContext> c (
RippleSSLContext::createBare ());
ports.emplace_back (testPort, beast::IP::Endpoint (
beast::IP::AddressV4 (127, 0, 0, 1), 0),
Port::Security::no_ssl, c.get());
s.setPorts (ports);
test_request();
//test_keepalive();
s.stop();
pass();
}
};
BEAST_DEFINE_TESTSUITE_MANUAL(Server,http,ripple);
}
}

View File

@@ -302,7 +302,7 @@ public:
// VFALCO NOTE LocalCredentials starts the deprecated UNL service
, m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue))
, m_rpcHTTPServer (RPCHTTPServer::New (*m_networkOPs,
, m_rpcHTTPServer (make_RPCHTTPServer (*m_networkOPs,
m_logs.journal("HTTPServer"), *m_jobQueue, *m_networkOPs, *m_resourceManager))
, m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service
@@ -363,6 +363,7 @@ public:
m_nodeStoreScheduler.setJobQueue (*m_jobQueue);
add (m_ledgerMaster->getPropertySource ());
add (*m_rpcHTTPServer);
// VFALCO TODO remove these once the call is thread safe.
HashMaps::getInstance ().initializeNonce <size_t> ();

View File

@@ -40,9 +40,7 @@ public:
std::unique_ptr <RippleSSLContext> m_context;
RPCHTTPServerImp (Stoppable& parent,
beast::Journal journal,
JobQueue& jobQueue,
NetworkOPs& networkOPs,
beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs,
Resource::Manager& resourceManager)
: RPCHTTPServer (parent)
, m_resourceManager (resourceManager)
@@ -70,7 +68,8 @@ public:
m_server.stop();
}
void setup (beast::Journal journal)
void
setup (beast::Journal journal) override
{
if (! getConfig ().getRpcIP().empty () &&
getConfig ().getRpcPort() != 0)
@@ -81,7 +80,7 @@ public:
//if (! is_unspecified (ep))
{
HTTP::Port port;
port.security = HTTP::Port::allow_ssl;
port.security = HTTP::Port::Security::allow_ssl;
port.addr = ep.at_port(0);
if (getConfig ().getRpcPort() != 0)
port.port = getConfig ().getRpcPort();
@@ -105,12 +104,14 @@ public:
// Stoppable
//
void onStop ()
void
onStop() override
{
m_server.stopAsync();
}
void onChildrenStopped ()
void
onChildrenStopped() override
{
}
@@ -119,28 +120,26 @@ public:
// HTTP::Handler
//
void onAccept (HTTP::Session& session)
void
onAccept (HTTP::Session& session) override
{
// Reject non-loopback connections if RPC_ALLOW_REMOTE is not set
if (! getConfig().RPC_ALLOW_REMOTE &&
! beast::IP::is_loopback (session.remoteAddress()))
{
session.close();
session.close (false);
}
}
void onHeaders (HTTP::Session& session)
{
}
void onRequest (HTTP::Session& session)
void
onRequest (HTTP::Session& session) override
{
// Check user/password authorization
auto const headers (session.request()->headers().build_map());
auto const headers (build_map (session.message().headers));
if (! HTTPAuthorized (headers))
{
session.write (HTTPReply (403, "Forbidden"));
session.close();
session.close (true);
return;
}
@@ -158,17 +157,21 @@ public:
#endif
}
void onClose (HTTP::Session& session, int errorCode)
void
onClose (HTTP::Session& session,
boost::system::error_code const&) override
{
}
void onStopped (HTTP::Server&)
void
onStopped (HTTP::Server&) override
{
stopped();
}
//--------------------------------------------------------------------------
// Dispatched on the job queue
void processSession (Job& job, HTTP::Session& session)
{
#if 0
@@ -176,11 +179,19 @@ public:
session.write (m_deprecatedHandler.processRequest (
session.content(), session.remoteAddress().at_port(0)));
#else
session.write (processRequest (session.content(),
auto const s (to_string(session.message().body));
session.write (processRequest (to_string(session.message().body),
session.remoteAddress().at_port(0)));
#endif
session.close();
if (session.message().keep_alive())
{
session.complete();
}
else
{
session.close (true);
}
}
std::string createResponse (
@@ -280,24 +291,35 @@ public:
return createResponse (200, response);
}
//
// PropertyStream
//
void
onWrite (beast::PropertyStream::Map& map) override
{
m_server.onWrite (map);
}
};
//------------------------------------------------------------------------------
RPCHTTPServer::RPCHTTPServer (Stoppable& parent)
: Stoppable ("RPCHTTPServer", parent)
, Source ("rpc")
{
}
//------------------------------------------------------------------------------
RPCHTTPServer* RPCHTTPServer::New (Stoppable& parent,
beast::Journal journal,
JobQueue& jobQueue,
NetworkOPs& networkOPs,
std::unique_ptr <RPCHTTPServer>
make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal,
JobQueue& jobQueue, NetworkOPs& networkOPs,
Resource::Manager& resourceManager)
{
return new RPCHTTPServerImp (parent, journal, jobQueue, networkOPs, resourceManager);
return std::make_unique <RPCHTTPServerImp> (
parent, journal, jobQueue, networkOPs, resourceManager);
}
}

View File

@@ -20,24 +20,34 @@
#ifndef RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED
#define RIPPLE_APP_RPCHTTPSERVER_H_INCLUDED
#include <beast/utility/Journal.h>
#include <beast/utility/PropertyStream.h>
#include <beast/cxx14/memory.h> // <memory>
namespace ripple {
class RPCHTTPServer : public beast::Stoppable
class RPCHTTPServer
: public beast::Stoppable
, public beast::PropertyStream::Source
{
protected:
RPCHTTPServer (Stoppable& parent);
public:
static RPCHTTPServer* New (Stoppable& parent,
beast::Journal journal, JobQueue& jobQueue, NetworkOPs& networkOPs,
Resource::Manager& resourceManager);
virtual ~RPCHTTPServer () { }
virtual
~RPCHTTPServer() = default;
/** Opens listening ports based on the Config settings. */
virtual void setup (beast::Journal journal) = 0;
virtual
void
setup (beast::Journal journal) = 0;
};
std::unique_ptr <RPCHTTPServer>
make_RPCHTTPServer (beast::Stoppable& parent, beast::Journal journal,
JobQueue& jobQueue, NetworkOPs& networkOPs,
Resource::Manager& resourceManager);
}
#endif

View File

@@ -40,8 +40,6 @@ peerTXData (Job&, std::weak_ptr <Peer> wPeer, uint256 const& hash,
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
/* Completion handlers for client role.
Logic steps:
1. Establish outgoing connection
@@ -114,11 +112,11 @@ PeerImp::on_connect (error_code ec)
beast::asio::placeholders::error)));
}
beast::http::basic_message
beast::http::message
PeerImp::make_request()
{
assert (! m_inbound);
beast::http::basic_message m;
beast::http::message m;
m.method (beast::http::method_t::http_get);
m.url ("/");
m.version (1, 1);
@@ -152,8 +150,8 @@ PeerImp::on_connect_ssl (error_code ec)
}
#if RIPPLE_STRUCTURED_OVERLAY_CLIENT
beast::http::basic_message req (make_request());
beast::http::xwrite (write_buffer_, req);
beast::http::message req (make_request());
beast::http::write (write_buffer_, req);
on_write_http_request (error_code(), 0);
#else
@@ -204,8 +202,11 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred)
if (! ec)
{
read_buffer_.commit (bytes_transferred);
bool success;
std::size_t bytes_consumed;
std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data());
std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data());
if (! success)
ec = http_parser_->error();
if (! ec)
{
@@ -232,6 +233,12 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred)
}
}
if (ec == boost::asio::error::eof)
{
// remote closed their end
// VFALCO TODO Clean up the shutdown of the socket
}
if (ec)
{
m_journal.info <<
@@ -344,8 +351,12 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
if (! ec)
{
read_buffer_.commit (bytes_transferred);
bool success;
std::size_t bytes_consumed;
std::tie (ec, bytes_consumed) = http_parser_->write (read_buffer_.data());
std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data());
if (! success)
ec = http_parser_->error();
if (! ec)
{
read_buffer_.consume (bytes_consumed);
@@ -363,7 +374,7 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
"Upgrade: Ripple/1.2\r\n"
"Connection: Upgrade\r\n"
"\r\n";
beast::http::xwrite (write_buffer_, ss.str());
beast::http::write (write_buffer_, ss.str());
on_write_http_response(error_code(), 0);
}
else
@@ -377,7 +388,7 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
"400 Bad Request<br>"
"The server requires an Upgrade request."
"</body></html>";
beast::http::xwrite (write_buffer_, ss.str());
beast::http::write (write_buffer_, ss.str());
on_write_http_response(error_code(), 0);
}
return;
@@ -399,10 +410,10 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred)
beast::asio::placeholders::bytes_transferred)));
}
beast::http::basic_message
PeerImp::make_response (beast::http::basic_message const& req)
beast::http::message
PeerImp::make_response (beast::http::message const& req)
{
beast::http::basic_message resp;
beast::http::message resp;
// Unimplemented
return resp;
}

View File

@@ -39,7 +39,8 @@
#include <beast/asio/IPAddressConversion.h>
#include <beast/asio/placeholders.h>
#include <beast/http/basic_message.h>
#include <beast/http/message.h>
#include <beast/http/parser.h>
#include <cstdint>
@@ -167,8 +168,8 @@ public:
boost::asio::streambuf read_buffer_;
boost::optional <beast::http::basic_message> http_message_;
boost::optional <beast::http::basic_message::parser> http_parser_;
boost::optional <beast::http::message> http_message_;
boost::optional <beast::http::parser> http_parser_;
message_stream message_stream_;
boost::asio::streambuf write_buffer_;
@@ -280,7 +281,7 @@ private:
void
on_connect (error_code ec);
beast::http::basic_message
beast::http::message
make_request();
void
@@ -308,8 +309,8 @@ private:
void
on_read_http_request (error_code ec, std::size_t bytes_transferred);
beast::http::basic_message
make_response (beast::http::basic_message const& req);
beast::http::message
make_response (beast::http::message const& req);
void
on_write_http_response (error_code ec, std::size_t bytes_transferred);

View File

@@ -20,7 +20,7 @@
#ifndef RIPPLE_OVERLAY_PEER_INFO_H_INCLUDED
#define RIPPLE_OVERLAY_PEER_INFO_H_INCLUDED
#include <beast/http/basic_message.h>
#include <beast/http/message.h>
namespace ripple {

View File

@@ -25,3 +25,4 @@
#include <ripple/http/impl/ScopedStream.cpp>
#include <ripple/http/impl/ServerImpl.cpp>
#include <ripple/http/impl/Server.cpp>
#include <ripple/http/tests/Server.test.cpp>