Refactor Server (RIPD-1120):

* Make Handler a template argument
This commit is contained in:
Vinnie Falco
2016-05-20 08:44:13 -04:00
parent 80a9a2bf5d
commit 289c8c9f09
53 changed files with 997 additions and 1176 deletions

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Copyright(c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -22,9 +22,7 @@
#include <ripple/basics/Log.h>
#include <ripple/server/Session.h>
#include <ripple/server/impl/Door.h>
#include <ripple/server/impl/io_list.h>
#include <ripple/server/impl/ServerImpl.h>
#include <ripple/beast/net/IPAddressConversion.h>
#include <beast/core/placeholders.hpp>
#include <ripple/beast/asio/ssl_error.h> // for is_short_read?
@@ -48,7 +46,7 @@
namespace ripple {
/** Represents an active connection. */
template <class Impl>
template<class Handler, class Impl>
class BaseHTTPPeer
: public io_list::work
, public Session
@@ -71,12 +69,12 @@ protected:
struct buffer
{
buffer (void const* ptr, std::size_t len)
: data (new char[len])
, bytes (len)
, used (0)
buffer(void const* ptr, std::size_t len)
: data(new char[len])
, bytes(len)
, used(0)
{
memcpy (data.get(), ptr, len);
memcpy(data.get(), ptr, len);
}
std::unique_ptr <char[]> data;
@@ -111,8 +109,8 @@ protected:
//--------------------------------------------------------------------------
public:
template <class ConstBufferSequence>
BaseHTTPPeer (Port const& port, Handler& handler,
template<class ConstBufferSequence>
BaseHTTPPeer(Port const& port, Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal,
endpoint_type remote_address, ConstBufferSequence const& buffers);
@@ -135,7 +133,7 @@ protected:
}
void
fail (error_code ec, char const* what);
fail(error_code ec, char const* what);
void
start_timer();
@@ -144,18 +142,18 @@ protected:
cancel_timer();
void
on_timer (error_code ec);
on_timer(error_code ec);
void
do_read (yield_context yield);
do_read(yield_context do_yield);
void
on_write(error_code const& ec,
std::size_t bytes_transferred);
void
do_writer (std::shared_ptr <Writer> const& writer,
bool keep_alive, yield_context yield);
do_writer(std::shared_ptr <Writer> const& writer,
bool keep_alive, yield_context do_yield);
virtual
void
@@ -192,10 +190,10 @@ protected:
}
void
write (void const* buffer, std::size_t bytes) override;
write(void const* buffer, std::size_t bytes) override;
void
write (std::shared_ptr <Writer> const& writer,
write(std::shared_ptr <Writer> const& writer,
bool keep_alive) override;
std::shared_ptr<Session>
@@ -205,27 +203,28 @@ protected:
complete() override;
void
close (bool graceful) override;
close(bool graceful) override;
};
//------------------------------------------------------------------------------
template <class Impl>
template <class ConstBufferSequence>
BaseHTTPPeer<Impl>::BaseHTTPPeer (Port const& port, Handler& handler,
template<class Handler, class Impl>
template<class ConstBufferSequence>
BaseHTTPPeer<Handler, Impl>::
BaseHTTPPeer(Port const& port, Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal,
endpoint_type remote_address,
ConstBufferSequence const& buffers)
: port_(port)
, handler_(handler)
, work_ (io_service)
, strand_ (io_service)
, timer_ (io_service)
, remote_address_ (remote_address)
, journal_ (journal)
, work_(io_service)
, strand_(io_service)
, timer_(io_service)
, remote_address_(remote_address)
, journal_(journal)
{
read_buf_.commit(boost::asio::buffer_copy(read_buf_.prepare (
boost::asio::buffer_size (buffers)), buffers));
read_buf_.commit(boost::asio::buffer_copy(read_buf_.prepare(
boost::asio::buffer_size(buffers)), buffers));
static std::atomic <int> sid;
nid_ = ++sid;
id_ = std::string("#") + std::to_string(nid_) + " ";
@@ -233,22 +232,24 @@ BaseHTTPPeer<Impl>::BaseHTTPPeer (Port const& port, Handler& handler,
"accept: " << remote_address_.address();
}
template <class Impl>
BaseHTTPPeer<Impl>::~BaseHTTPPeer()
template<class Handler, class Impl>
BaseHTTPPeer<Handler, Impl>::
~BaseHTTPPeer()
{
handler_.onClose(session(), ec_);
JLOG(journal_.trace()) << id_ <<
"destroyed: " << request_count_ <<
((request_count_ == 1) ? " request" : " requests");
((request_count_ == 1) ? " request" : " requests");
}
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::close()
BaseHTTPPeer<Handler, Impl>::
close()
{
if (! strand_.running_in_this_thread())
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
(void(BaseHTTPPeer::*)(void))&BaseHTTPPeer::close,
(void(BaseHTTPPeer::*)(void))&BaseHTTPPeer::close,
impl().shared_from_this()));
error_code ec;
impl().stream_.lowest_layer().close(ec);
@@ -256,64 +257,69 @@ BaseHTTPPeer<Impl>::close()
//------------------------------------------------------------------------------
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::fail (error_code ec, char const* what)
BaseHTTPPeer<Handler, Impl>::
fail(error_code ec, char const* what)
{
if (! ec_ && ec != boost::asio::error::operation_aborted)
if(! ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(journal_.trace()) << id_ <<
std::string(what) << ": " << ec.message();
impl().stream_.lowest_layer().close (ec);
impl().stream_.lowest_layer().close(ec);
}
}
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::start_timer()
BaseHTTPPeer<Handler, Impl>::
start_timer()
{
error_code ec;
timer_.expires_from_now (std::chrono::seconds(timeoutSeconds), ec);
if (ec)
return fail (ec, "start_timer");
timer_.async_wait (strand_.wrap (std::bind (
&BaseHTTPPeer<Impl>::on_timer, impl().shared_from_this(),
timer_.expires_from_now(std::chrono::seconds(timeoutSeconds), ec);
if(ec)
return fail(ec, "start_timer");
timer_.async_wait(strand_.wrap(std::bind(
&BaseHTTPPeer<Handler, Impl>::on_timer, impl().shared_from_this(),
beast::asio::placeholders::error)));
}
// Convenience for discarding the error code
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::cancel_timer()
BaseHTTPPeer<Handler, Impl>::
cancel_timer()
{
error_code ec;
timer_.cancel(ec);
}
// Called when session times out
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::on_timer (error_code ec)
BaseHTTPPeer<Handler, Impl>::
on_timer(error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
if(ec == boost::asio::error::operation_aborted)
return;
if (! ec)
ec = boost::system::errc::make_error_code (
if(! ec)
ec = boost::system::errc::make_error_code(
boost::system::errc::timed_out);
fail (ec, "timer");
fail(ec, "timer");
}
//------------------------------------------------------------------------------
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::do_read (yield_context yield)
BaseHTTPPeer<Handler, Impl>::
do_read(yield_context do_yield)
{
complete_ = false;
error_code ec;
beast::http::async_read(impl().stream_,
read_buf_, message_, yield[ec]);
read_buf_, message_, do_yield[ec]);
// VFALCO What if the connection was closed?
cancel_timer();
do_request();
@@ -321,9 +327,10 @@ BaseHTTPPeer<Impl>::do_read (yield_context yield)
// Send everything in the write queue.
// The write queue must not be empty upon entry.
template<class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::on_write(error_code const& ec,
BaseHTTPPeer<Handler, Impl>::
on_write(error_code const& ec,
std::size_t bytes_transferred)
{
cancel_timer();
@@ -349,19 +356,20 @@ BaseHTTPPeer<Impl>::on_write(error_code const& ec,
impl().shared_from_this(), placeholders::error,
placeholders::bytes_transferred)));
}
if (! complete_)
if(! complete_)
return;
if (graceful_)
if(graceful_)
return do_close();
boost::asio::spawn(strand_,
std::bind (&BaseHTTPPeer<Impl>::do_read,
std::bind(&BaseHTTPPeer<Handler, Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
}
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::do_writer (std::shared_ptr <Writer> const& writer,
bool keep_alive, yield_context yield)
BaseHTTPPeer<Handler, Impl>::
do_writer(std::shared_ptr <Writer> const& writer,
bool keep_alive, yield_context do_yield)
{
std::function <void(void)> resume;
{
@@ -369,43 +377,44 @@ BaseHTTPPeer<Impl>::do_writer (std::shared_ptr <Writer> const& writer,
resume = std::function <void(void)>(
[this, p, writer, keep_alive]()
{
boost::asio::spawn (strand_, std::bind (
&BaseHTTPPeer<Impl>::do_writer, p, writer, keep_alive,
boost::asio::spawn(strand_, std::bind(
&BaseHTTPPeer<Handler, Impl>::do_writer, p, writer, keep_alive,
std::placeholders::_1));
});
}
for(;;)
{
if (! writer->prepare (bufferSize, resume))
if(! writer->prepare(bufferSize, resume))
return;
error_code ec;
auto const bytes_transferred = boost::asio::async_write (
auto const bytes_transferred = boost::asio::async_write(
impl().stream_, writer->data(), boost::asio::transfer_at_least(1),
yield[ec]);
if (ec)
return fail (ec, "writer");
do_yield[ec]);
if(ec)
return fail(ec, "writer");
writer->consume(bytes_transferred);
if (writer->complete())
if(writer->complete())
break;
}
if (! keep_alive)
if(! keep_alive)
return do_close();
boost::asio::spawn (strand_, std::bind (&BaseHTTPPeer<Impl>::do_read,
boost::asio::spawn(strand_, std::bind(&BaseHTTPPeer<Handler, Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
}
//------------------------------------------------------------------------------
// Send a copy of the data.
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::write(
BaseHTTPPeer<Handler, Impl>::
write(
void const* buffer, std::size_t bytes)
{
if (bytes == 0)
if(bytes == 0)
return;
if([&]
{
@@ -414,7 +423,7 @@ BaseHTTPPeer<Impl>::write(
return wq_.size() == 1 && wq2_.size() == 0;
}())
{
if (strand_.running_in_this_thread())
if(strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseHTTPPeer::on_write,
impl().shared_from_this(),
@@ -424,33 +433,36 @@ BaseHTTPPeer<Impl>::write(
}
}
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::write (std::shared_ptr <Writer> const& writer,
BaseHTTPPeer<Handler, Impl>::
write(std::shared_ptr <Writer> const& writer,
bool keep_alive)
{
boost::asio::spawn (strand_, std::bind (
&BaseHTTPPeer<Impl>::do_writer, impl().shared_from_this(),
boost::asio::spawn(strand_, std::bind(
&BaseHTTPPeer<Handler, Impl>::do_writer, impl().shared_from_this(),
writer, keep_alive, std::placeholders::_1));
}
// DEPRECATED
// Make the Session asynchronous
template <class Impl>
template<class Handler, class Impl>
std::shared_ptr<Session>
BaseHTTPPeer<Impl>::detach()
BaseHTTPPeer<Handler, Impl>::
detach()
{
return impl().shared_from_this();
}
// DEPRECATED
// Called to indicate the response has been written (but not sent)
template <class Impl>
// Called to indicate the response has been written(but not sent)
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::complete()
BaseHTTPPeer<Handler, Impl>::
complete()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind (&BaseHTTPPeer<Impl>::complete,
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(&BaseHTTPPeer<Handler, Impl>::complete,
impl().shared_from_this()));
message_ = {};
@@ -458,40 +470,41 @@ BaseHTTPPeer<Impl>::complete()
{
std::lock_guard<std::mutex> lock(mutex_);
if (! wq_.empty() && ! wq2_.empty())
if(! wq_.empty() && ! wq2_.empty())
return;
}
// keep-alive
boost::asio::spawn (strand_, std::bind (&BaseHTTPPeer<Impl>::do_read,
boost::asio::spawn(strand_, std::bind(&BaseHTTPPeer<Handler, Impl>::do_read,
impl().shared_from_this(), std::placeholders::_1));
}
// DEPRECATED
// Called from the Handler to close the session.
template <class Impl>
template<class Handler, class Impl>
void
BaseHTTPPeer<Impl>::close (bool graceful)
BaseHTTPPeer<Handler, Impl>::
close(bool graceful)
{
if (! strand_.running_in_this_thread())
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
(void(BaseHTTPPeer::*)(bool))&BaseHTTPPeer<Impl>::close,
(void(BaseHTTPPeer::*)(bool))&BaseHTTPPeer<Handler, Impl>::close,
impl().shared_from_this(), graceful));
complete_ = true;
if (graceful)
if(graceful)
{
graceful_ = true;
{
std::lock_guard<std::mutex> lock(mutex_);
if (! wq_.empty() || ! wq2_.empty())
if(! wq_.empty() || ! wq2_.empty())
return;
}
return do_close();
}
error_code ec;
impl().stream_.lowest_layer().close (ec);
impl().stream_.lowest_layer().close(ec);
}
} // ripple

View File

@@ -20,7 +20,6 @@
#ifndef RIPPLE_SERVER_BASEPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEPEER_H_INCLUDED
#include <ripple/server/Handler.h>
#include <ripple/server/Port.h>
#include <ripple/server/impl/io_list.h>
#include <ripple/beast/utility/WrappedSink.h>
@@ -32,7 +31,7 @@
namespace ripple {
// Common part of all peers
template<class Impl>
template<class Handler, class Impl>
class BasePeer
: public io_list::work
{
@@ -76,8 +75,9 @@ private:
//------------------------------------------------------------------------------
template<class Impl>
BasePeer<Impl>::BasePeer(Port const& port, Handler& handler,
template<class Handler, class Impl>
BasePeer<Handler, Impl>::
BasePeer(Port const& port, Handler& handler,
endpoint_type remote_address,
boost::asio::io_service& io_service,
beast::Journal journal)
@@ -96,9 +96,10 @@ BasePeer<Impl>::BasePeer(Port const& port, Handler& handler,
{
}
template<class Impl>
template<class Handler, class Impl>
void
BasePeer<Impl>::close()
BasePeer<Handler, Impl>::
close()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
@@ -107,10 +108,11 @@ BasePeer<Impl>::close()
impl().ws_.lowest_layer().close(ec);
}
template<class Impl>
template<class Handler, class Impl>
template<class String>
void
BasePeer<Impl>::fail(error_code ec, String const& what)
BasePeer<Handler, Impl>::
fail(error_code ec, String const& what)
{
assert(strand_.running_in_this_thread());
if(! ec_ &&

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Copyright(c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -30,9 +30,9 @@
namespace ripple {
/** Represents an active WebSocket connection. */
template <class Impl>
template<class Handler, class Impl>
class BaseWSPeer
: public BasePeer<Impl>
: public BasePeer<Handler, Impl>
, public WSSession
{
protected:
@@ -40,8 +40,8 @@ protected:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer <clock_type>;
using BasePeer<Impl>::fail;
using BasePeer<Impl>::strand_;
using BasePeer<Handler, Impl>::fail;
using BasePeer<Handler, Impl>::strand_;
enum
{
@@ -50,7 +50,7 @@ protected:
};
private:
friend class BasePeer<Impl>;
friend class BasePeer<Handler, Impl>;
http_request_type request_;
beast::websocket::opcode op_;
@@ -168,25 +168,27 @@ protected:
//------------------------------------------------------------------------------
template<class Impl>
template<class Handler, class Impl>
template<class Body, class Headers>
BaseWSPeer<Impl>::BaseWSPeer(
BaseWSPeer<Handler, Impl>::
BaseWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
beast::http::request_v1<Body, Headers>&& request,
boost::asio::io_service& io_service,
beast::Journal journal)
: BasePeer<Impl>(port, handler, remote_address,
: BasePeer<Handler, Impl>(port, handler, remote_address,
io_service, journal)
, request_(std::move(request))
, timer_(io_service)
{
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::run()
BaseWSPeer<Handler, Impl>::
run()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
@@ -198,9 +200,10 @@ BaseWSPeer<Impl>::run()
placeholders::error)));
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::send(std::shared_ptr<WSMsg> w)
BaseWSPeer<Handler, Impl>::
send(std::shared_ptr<WSMsg> w)
{
// Maximum send queue size
static std::size_t constexpr limit = 100;
@@ -208,7 +211,7 @@ BaseWSPeer<Impl>::send(std::shared_ptr<WSMsg> w)
return strand_.post(std::bind(
&BaseWSPeer::send, impl().shared_from_this(),
std::move(w)));
if (wq_.size() >= limit)
if(wq_.size() >= limit)
{
cr_.code = static_cast<beast::websocket::close_code::value>(4000);
cr_.reason = "Client is too slow.";
@@ -221,9 +224,10 @@ BaseWSPeer<Impl>::send(std::shared_ptr<WSMsg> w)
on_write({});
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::close()
BaseWSPeer<Handler, Impl>::
close()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
@@ -236,28 +240,31 @@ BaseWSPeer<Impl>::close()
beast::asio::placeholders::error)));
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::complete()
BaseWSPeer<Handler, Impl>::
complete()
{
if (! strand_.running_in_this_thread())
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::complete, impl().shared_from_this()));
do_read();
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::on_write_sb(error_code const& ec)
BaseWSPeer<Handler, Impl>::
on_write_sb(error_code const& ec)
{
if(ec)
return fail(ec, "write_resp");
do_read();
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::do_write()
BaseWSPeer<Handler, Impl>::
do_write()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
@@ -265,9 +272,10 @@ BaseWSPeer<Impl>::do_write()
on_write({});
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::on_write(error_code const& ec)
BaseWSPeer<Handler, Impl>::
on_write(error_code const& ec)
{
cancel_timer();
if(ec)
@@ -292,9 +300,10 @@ BaseWSPeer<Impl>::on_write(error_code const& ec)
placeholders::error)));
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::on_write_fin(error_code const& ec)
BaseWSPeer<Handler, Impl>::
on_write_fin(error_code const& ec)
{
if(ec)
return fail(ec, "write_fin");
@@ -307,9 +316,10 @@ BaseWSPeer<Impl>::on_write_fin(error_code const& ec)
on_write({});
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::do_read()
BaseWSPeer<Handler, Impl>::
do_read()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
@@ -321,9 +331,10 @@ BaseWSPeer<Impl>::do_read()
cancel_timer();
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::on_read(error_code const& ec)
BaseWSPeer<Handler, Impl>::
on_read(error_code const& ec)
{
if(ec == beast::websocket::error::closed)
return do_close();
@@ -338,48 +349,52 @@ BaseWSPeer<Impl>::on_read(error_code const& ec)
rb_.consume(rb_.size());
}
template<class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::on_close(error_code const& ec)
BaseWSPeer<Handler, Impl>::
on_close(error_code const& ec)
{
// great
}
template <class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::start_timer()
BaseWSPeer<Handler, Impl>::
start_timer()
{
// Max seconds without completing a message
static constexpr std::chrono::seconds timeout{30};
error_code ec;
timer_.expires_from_now (timeout, ec);
if (ec)
return fail (ec, "start_timer");
timer_.async_wait (strand_.wrap (std::bind (
&BaseWSPeer<Impl>::on_timer, impl().shared_from_this(),
timer_.expires_from_now(timeout, ec);
if(ec)
return fail(ec, "start_timer");
timer_.async_wait(strand_.wrap(std::bind(
&BaseWSPeer<Handler, Impl>::on_timer, impl().shared_from_this(),
beast::asio::placeholders::error)));
}
// Convenience for discarding the error code
template <class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::cancel_timer()
BaseWSPeer<Handler, Impl>::
cancel_timer()
{
error_code ec;
timer_.cancel(ec);
}
// Called when session times out
template <class Impl>
template<class Handler, class Impl>
void
BaseWSPeer<Impl>::on_timer (error_code ec)
BaseWSPeer<Handler, Impl>::
on_timer(error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
if(ec == boost::asio::error::operation_aborted)
return;
if (! ec)
ec = boost::system::errc::make_error_code (
if(! ec)
ec = boost::system::errc::make_error_code(
boost::system::errc::timed_out);
fail (ec, "timer");
fail(ec, "timer");
}
} // ripple

View File

@@ -1,296 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/basics/contract.h>
#include <ripple/basics/Log.h>
#include <ripple/server/impl/Door.h>
#include <ripple/server/impl/PlainHTTPPeer.h>
#include <ripple/server/impl/SSLHTTPPeer.h>
#include <boost/asio/buffer.hpp>
#include <beast/core/placeholders.hpp>
#include <ripple/beast/asio/ssl_bundle.h>
#include <functional>
namespace ripple {
/** Detect SSL client handshakes.
Analyzes the bytes in the provided buffer to detect the SSL client
handshake. If the buffer contains insufficient data, more data will be
read from the stream until there is enough to determine a result.
No bytes are discarded from buf. Any additional bytes read are retained.
buf must provide an interface compatible with boost::asio::streambuf
http://boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html
See
http://www.ietf.org/rfc/rfc2246.txt
Section 7.4. Handshake protocol
@param socket The stream to read from
@param buf A buffer to hold the received data
@param yield A yield context
@return The error code if an error occurs, otherwise `true` if
the data read indicates the SSL client handshake.
*/
template <class Socket, class StreamBuf, class Yield>
std::pair <boost::system::error_code, bool>
detect_ssl (Socket& socket, StreamBuf& buf, Yield yield)
{
std::pair <boost::system::error_code, bool> result;
result.second = false;
for(;;)
{
std::size_t const max = 4; // the most bytes we could need
unsigned char data[max];
auto const bytes = boost::asio::buffer_copy (
boost::asio::buffer(data), buf.data());
if (bytes > 0)
{
if (data[0] != 0x16) // message type 0x16 = "SSL Handshake"
break;
}
if (bytes >= max)
{
result.second = true;
break;
}
buf.commit(boost::asio::async_read (socket,
buf.prepare(max - bytes), boost::asio::transfer_at_least(1),
yield[result.first]));
if (result.first)
break;
}
return result;
}
//------------------------------------------------------------------------------
Door::Detector::Detector(Port const& port,
Handler& handler, socket_type&& socket,
endpoint_type remote_address, beast::Journal j)
: port_(port)
, handler_(handler)
, socket_(std::move(socket))
, timer_(socket_.get_io_service())
, remote_address_(remote_address)
, strand_(socket_.get_io_service())
, j_(j)
{
}
void
Door::Detector::run()
{
// do_detect must be called before do_timer or else
// the timer can be canceled before it gets set.
boost::asio::spawn (strand_, std::bind (&Detector::do_detect,
shared_from_this(), std::placeholders::_1));
boost::asio::spawn (strand_, std::bind (&Detector::do_timer,
shared_from_this(), std::placeholders::_1));
}
void
Door::Detector::close()
{
error_code ec;
socket_.close(ec);
timer_.cancel(ec);
}
void
Door::Detector::do_timer (yield_context yield)
{
error_code ec; // ignored
while (socket_.is_open())
{
timer_.async_wait (yield[ec]);
if (timer_.expires_from_now() <= std::chrono::seconds(0))
socket_.close(ec);
}
}
void
Door::Detector::do_detect (boost::asio::yield_context yield)
{
bool ssl;
error_code ec;
beast::streambuf buf(16);
timer_.expires_from_now(std::chrono::seconds(15));
std::tie(ec, ssl) = detect_ssl(socket_, buf, yield);
error_code unused;
timer_.cancel(unused);
if (! ec)
{
if (ssl)
{
if(auto sp = ios().emplace<SSLHTTPPeer>(port_, handler_,
j_, remote_address_, buf.data(),
std::move(socket_)))
sp->run();
return;
}
if(auto sp = ios().emplace<PlainHTTPPeer>(port_, handler_,
j_, remote_address_, buf.data(),
std::move(socket_)))
sp->run();
return;
}
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.trace()) <<
"Error detecting ssl: " << ec.message() <<
" from " << remote_address_;
}
}
//------------------------------------------------------------------------------
Door::Door (Handler& handler, boost::asio::io_service& io_service,
Port const& port, beast::Journal j)
: j_(j)
, port_(port)
, handler_(handler)
, acceptor_(io_service)
, strand_(io_service)
, ssl_(
port_.protocol.count("https") > 0 ||
//port_.protocol.count("wss") > 0 ||
port_.protocol.count("wss2") > 0 ||
port_.protocol.count("peer") > 0)
, plain_(
port_.protocol.count("http") > 0 ||
//port_.protocol.count("ws") > 0 ||
port_.protocol.count("ws2"))
{
error_code ec;
endpoint_type const local_address =
endpoint_type(port.ip, port.port);
acceptor_.open(local_address.protocol(), ec);
if (ec)
{
JLOG(j_.error()) <<
"Open port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
acceptor_.set_option(
boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec)
{
JLOG(j_.error()) <<
"Option for port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
acceptor_.bind(local_address, ec);
if (ec)
{
JLOG(j_.error()) <<
"Bind port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
acceptor_.listen(boost::asio::socket_base::max_connections, ec);
if (ec)
{
JLOG(j_.error()) <<
"Listen on port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
JLOG(j_.info()) <<
"Opened " << port;
}
void
Door::run()
{
boost::asio::spawn (strand_, std::bind(&Door::do_accept,
shared_from_this(), std::placeholders::_1));
}
void
Door::close()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
&Door::close, shared_from_this()));
error_code ec;
acceptor_.close(ec);
}
//------------------------------------------------------------------------------
template <class ConstBufferSequence>
void
Door::create (bool ssl, ConstBufferSequence const& buffers,
socket_type&& socket, endpoint_type remote_address)
{
if (ssl)
{
if(auto sp = ios().emplace<SSLHTTPPeer>(port_, handler_,
j_, remote_address, buffers,
std::move(socket)))
sp->run();
return;
}
if(auto sp = ios().emplace<PlainHTTPPeer>(port_, handler_,
j_, remote_address, buffers,
std::move(socket)))
sp->run();
}
void
Door::do_accept (boost::asio::yield_context yield)
{
for(;;)
{
error_code ec;
endpoint_type remote_address;
socket_type socket (acceptor_.get_io_service());
acceptor_.async_accept (socket, remote_address, yield[ec]);
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) <<
"accept: " << ec.message();
}
if (ec == boost::asio::error::operation_aborted)
break;
if (ec)
continue;
if (ssl_ && plain_)
{
if(auto sp = ios().emplace<Detector>(port_,
handler_, std::move(socket), remote_address,
j_))
sp->run();
}
else if (ssl_ || plain_)
{
create(ssl_, boost::asio::null_buffers{},
std::move(socket), remote_address);
}
}
}
} // ripple

View File

@@ -21,24 +21,32 @@
#define RIPPLE_SERVER_DOOR_H_INCLUDED
#include <ripple/server/impl/io_list.h>
#include <ripple/server/impl/ServerImpl.h>
#include <ripple/basics/contract.h>
#include <ripple/basics/Log.h>
#include <ripple/server/impl/PlainHTTPPeer.h>
#include <ripple/server/impl/SSLHTTPPeer.h>
#include <ripple/beast/asio/ssl_bundle.h>
#include <beast/core/placeholders.hpp>
#include <beast/core/streambuf.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/container/flat_map.hpp>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
namespace ripple {
/** A listening socket. */
template<class Handler>
class Door
: public io_list::work
, public std::enable_shared_from_this<Door>
, public std::enable_shared_from_this<Door<Handler>>
{
private:
using clock_type = std::chrono::steady_clock;
@@ -107,6 +115,287 @@ private:
void do_accept (yield_context yield);
};
/** Detect SSL client handshakes.
Analyzes the bytes in the provided buffer to detect the SSL client
handshake. If the buffer contains insufficient data, more data will be
read from the stream until there is enough to determine a result.
No bytes are discarded from buf. Any additional bytes read are retained.
buf must provide an interface compatible with boost::asio::streambuf
http://boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/streambuf.html
See
http://www.ietf.org/rfc/rfc2246.txt
Section 7.4. Handshake protocol
@param socket The stream to read from
@param buf A buffer to hold the received data
@param do_yield A do_yield context
@return The error code if an error occurs, otherwise `true` if
the data read indicates the SSL client handshake.
*/
template <class Socket, class StreamBuf, class Yield>
std::pair <boost::system::error_code, bool>
detect_ssl (Socket& socket, StreamBuf& buf, Yield do_yield)
{
std::pair <boost::system::error_code, bool> result;
result.second = false;
for(;;)
{
std::size_t const max = 4; // the most bytes we could need
unsigned char data[max];
auto const bytes = boost::asio::buffer_copy (
boost::asio::buffer(data), buf.data());
if (bytes > 0)
{
if (data[0] != 0x16) // message type 0x16 = "SSL Handshake"
break;
}
if (bytes >= max)
{
result.second = true;
break;
}
buf.commit(boost::asio::async_read (socket,
buf.prepare(max - bytes), boost::asio::transfer_at_least(1),
do_yield[result.first]));
if (result.first)
break;
}
return result;
}
template<class Handler>
Door<Handler>::Detector::
Detector(Port const& port,
Handler& handler, socket_type&& socket,
endpoint_type remote_address, beast::Journal j)
: port_(port)
, handler_(handler)
, socket_(std::move(socket))
, timer_(socket_.get_io_service())
, remote_address_(remote_address)
, strand_(socket_.get_io_service())
, j_(j)
{
}
template<class Handler>
void
Door<Handler>::Detector::
run()
{
// do_detect must be called before do_timer or else
// the timer can be canceled before it gets set.
boost::asio::spawn(strand_, std::bind (&Detector::do_detect,
this->shared_from_this(), std::placeholders::_1));
boost::asio::spawn(strand_, std::bind (&Detector::do_timer,
this->shared_from_this(), std::placeholders::_1));
}
template<class Handler>
void
Door<Handler>::Detector::
close()
{
error_code ec;
socket_.close(ec);
timer_.cancel(ec);
}
template<class Handler>
void
Door<Handler>::Detector::
do_timer(yield_context do_yield)
{
error_code ec; // ignored
while (socket_.is_open())
{
timer_.async_wait (do_yield[ec]);
if (timer_.expires_from_now() <= std::chrono::seconds(0))
socket_.close(ec);
}
}
template<class Handler>
void
Door<Handler>::Detector::
do_detect(boost::asio::yield_context do_yield)
{
bool ssl;
error_code ec;
beast::streambuf buf(16);
timer_.expires_from_now(std::chrono::seconds(15));
std::tie(ec, ssl) = detect_ssl(socket_, buf, do_yield);
error_code unused;
timer_.cancel(unused);
if (! ec)
{
if (ssl)
{
if(auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
port_, handler_, j_, remote_address_,
buf.data(), std::move(socket_)))
sp->run();
return;
}
if(auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
port_, handler_, j_, remote_address_,
buf.data(), std::move(socket_)))
sp->run();
return;
}
if (ec != boost::asio::error::operation_aborted)
{
JLOG(j_.trace()) <<
"Error detecting ssl: " << ec.message() <<
" from " << remote_address_;
}
}
//------------------------------------------------------------------------------
template<class Handler>
Door<Handler>::
Door(Handler& handler, boost::asio::io_service& io_service,
Port const& port, beast::Journal j)
: j_(j)
, port_(port)
, handler_(handler)
, acceptor_(io_service)
, strand_(io_service)
, ssl_(
port_.protocol.count("https") > 0 ||
//port_.protocol.count("wss") > 0 ||
port_.protocol.count("wss2") > 0 ||
port_.protocol.count("peer") > 0)
, plain_(
port_.protocol.count("http") > 0 ||
//port_.protocol.count("ws") > 0 ||
port_.protocol.count("ws2"))
{
error_code ec;
endpoint_type const local_address =
endpoint_type(port.ip, port.port);
acceptor_.open(local_address.protocol(), ec);
if (ec)
{
JLOG(j_.error()) <<
"Open port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
acceptor_.set_option(
boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec)
{
JLOG(j_.error()) <<
"Option for port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
acceptor_.bind(local_address, ec);
if (ec)
{
JLOG(j_.error()) <<
"Bind port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
acceptor_.listen(boost::asio::socket_base::max_connections, ec);
if (ec)
{
JLOG(j_.error()) <<
"Listen on port '" << port.name << "' failed:" << ec.message();
Throw<std::exception> ();
}
JLOG(j_.info()) <<
"Opened " << port;
}
template<class Handler>
void
Door<Handler>::
run()
{
boost::asio::spawn(strand_, std::bind(&Door<Handler>::do_accept,
this->shared_from_this(), std::placeholders::_1));
}
template<class Handler>
void
Door<Handler>::
close()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
&Door<Handler>::close, this->shared_from_this()));
error_code ec;
acceptor_.close(ec);
}
//------------------------------------------------------------------------------
template<class Handler>
template<class ConstBufferSequence>
void
Door<Handler>::
create(bool ssl, ConstBufferSequence const& buffers,
socket_type&& socket, endpoint_type remote_address)
{
if (ssl)
{
if(auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
port_, handler_, j_, remote_address,
buffers, std::move(socket)))
sp->run();
return;
}
if(auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
port_, handler_, j_, remote_address,
buffers, std::move(socket)))
sp->run();
}
template<class Handler>
void
Door<Handler>::
do_accept(boost::asio::yield_context do_yield)
{
for(;;)
{
error_code ec;
endpoint_type remote_address;
socket_type socket (acceptor_.get_io_service());
acceptor_.async_accept (socket, remote_address, do_yield[ec]);
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) <<
"accept: " << ec.message();
}
if (ec == boost::asio::error::operation_aborted)
break;
if (ec)
continue;
if (ssl_ && plain_)
{
if(auto sp = ios().template emplace<Detector>(
port_, handler_, std::move(socket),
remote_address, j_))
sp->run();
}
else if (ssl_ || plain_)
{
create(ssl_, boost::asio::null_buffers{},
std::move(socket), remote_address);
}
}
}
} // ripple
#endif

View File

@@ -26,19 +26,21 @@
namespace ripple {
template<class Handler>
class PlainHTTPPeer
: public BaseHTTPPeer<PlainHTTPPeer>
, public std::enable_shared_from_this <PlainHTTPPeer>
: public BaseHTTPPeer<Handler, PlainHTTPPeer<Handler>>
, public std::enable_shared_from_this<PlainHTTPPeer<Handler>>
{
private:
friend class BaseHTTPPeer<PlainHTTPPeer>;
friend class BaseHTTPPeer<Handler, PlainHTTPPeer>;
using socket_type = boost::asio::ip::tcp::socket;
using endpoint_type = boost::asio::ip::tcp::endpoint;
socket_type stream_;
public:
template <class ConstBufferSequence>
PlainHTTPPeer (Port const& port, Handler& handler,
template<class ConstBufferSequence>
PlainHTTPPeer(Port const& port, Handler& handler,
beast::Journal journal, endpoint_type remote_address,
ConstBufferSequence const& buffers, socket_type&& socket);
@@ -58,12 +60,14 @@ private:
//------------------------------------------------------------------------------
template <class ConstBufferSequence>
PlainHTTPPeer::PlainHTTPPeer (Port const& port, Handler& handler,
template<class Handler>
template<class ConstBufferSequence>
PlainHTTPPeer<Handler>::
PlainHTTPPeer(Port const& port, Handler& handler,
beast::Journal journal, endpoint_type remote_endpoint,
ConstBufferSequence const& buffers, socket_type&& socket)
: BaseHTTPPeer(port, handler, socket.get_io_service(),
journal, remote_endpoint, buffers)
: BaseHTTPPeer<Handler, PlainHTTPPeer>(port, handler,
socket.get_io_service(), journal, remote_endpoint, buffers)
, stream_(std::move(socket))
{
// Set TCP_NODELAY on loopback interfaces,
@@ -74,67 +78,75 @@ PlainHTTPPeer::PlainHTTPPeer (Port const& port, Handler& handler,
stream_.set_option(boost::asio::ip::tcp::no_delay{true});
}
template<class Handler>
void
PlainHTTPPeer::run()
PlainHTTPPeer<Handler>::
run()
{
if (!handler_.onAccept (session(), remote_address_))
if (! this->handler_.onAccept(this->session(), this->remote_address_))
{
boost::asio::spawn (strand_,
boost::asio::spawn(this->strand_,
std::bind (&PlainHTTPPeer::do_close,
shared_from_this()));
this->shared_from_this()));
return;
}
if (! stream_.is_open())
return;
boost::asio::spawn (strand_, std::bind (&PlainHTTPPeer::do_read,
shared_from_this(), std::placeholders::_1));
boost::asio::spawn(this->strand_, std::bind(&PlainHTTPPeer::do_read,
this->shared_from_this(), std::placeholders::_1));
}
template<class Handler>
std::shared_ptr<WSSession>
PlainHTTPPeer::websocketUpgrade()
PlainHTTPPeer<Handler>::
websocketUpgrade()
{
auto ws = ios().emplace<PlainWSPeer>(
port_, handler_, remote_address_,
std::move(message_), std::move(stream_),
journal_);
auto ws = this->ios().template emplace<PlainWSPeer<Handler>>(
this->port_, this->handler_, this->remote_address_,
std::move(this->message_), std::move(stream_),
this->journal_);
return ws;
}
template<class Handler>
void
PlainHTTPPeer::do_request()
PlainHTTPPeer<Handler>::
do_request()
{
++request_count_;
auto const what = handler_.onHandoff (session(),
std::move(stream_), std::move(message_), remote_address_);
++this->request_count_;
auto const what = this->handler_.onHandoff(this->session(),
std::move(stream_), std::move(this->message_), this->remote_address_);
if (what.moved)
return;
error_code ec;
boost::system::error_code ec;
if (what.response)
{
// half-close on Connection: close
if (! what.keep_alive)
stream_.shutdown (socket_type::shutdown_receive, ec);
stream_.shutdown(socket_type::shutdown_receive, ec);
if (ec)
return fail (ec, "request");
return write(what.response, what.keep_alive);
return this->fail(ec, "request");
return this->write(what.response, what.keep_alive);
}
// Perform half-close when Connection: close and not SSL
if (! is_keep_alive(message_))
stream_.shutdown (socket_type::shutdown_receive, ec);
if (! is_keep_alive(this->message_))
stream_.shutdown(socket_type::shutdown_receive, ec);
if (ec)
return fail (ec, "request");
return this->fail(ec, "request");
// legacy
handler_.onRequest (session());
this->handler_.onRequest(this->session());
}
template<class Handler>
void
PlainHTTPPeer::do_close()
PlainHTTPPeer<Handler>::
do_close()
{
error_code ec;
stream_.shutdown (socket_type::shutdown_send, ec);
boost::system::error_code ec;
stream_.shutdown(socket_type::shutdown_send, ec);
}
} // ripple

View File

@@ -25,13 +25,14 @@
namespace ripple {
template<class Handler>
class PlainWSPeer
: public BaseWSPeer<PlainWSPeer>
, public std::enable_shared_from_this<PlainWSPeer>
: public BaseWSPeer<Handler, PlainWSPeer<Handler>>
, public std::enable_shared_from_this<PlainWSPeer<Handler>>
{
private:
friend class BasePeer<PlainWSPeer>;
friend class BaseWSPeer<PlainWSPeer>;
friend class BasePeer<Handler, PlainWSPeer>;
friend class BaseWSPeer<Handler, PlainWSPeer>;
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
@@ -58,28 +59,32 @@ private:
//------------------------------------------------------------------------------
template<class Handler>
template<class Body, class Headers>
PlainWSPeer::PlainWSPeer(
PlainWSPeer<Handler>::
PlainWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
beast::http::request_v1<Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal)
: BaseWSPeer(port, handler, remote_address, std::move(request),
socket.get_io_service(), journal)
: BaseWSPeer<Handler, PlainWSPeer>(port, handler, remote_address,
std::move(request), socket.get_io_service(), journal)
, ws_(std::move(socket))
{
}
template<class Handler>
void
PlainWSPeer::do_close()
PlainWSPeer<Handler>::
do_close()
{
error_code ec;
auto& sock = ws_.next_layer();
sock.shutdown(socket_type::shutdown_both, ec);
if(ec)
return fail(ec, "do_close");
return this->fail(ec, "do_close");
}
} // ripple

View File

@@ -1,114 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/server/Role.h>
namespace ripple {
bool
passwordUnrequiredOrSentCorrect (Port const& port,
Json::Value const& params) {
assert(! port.admin_ip.empty ());
bool const passwordRequired = (!port.admin_user.empty() ||
!port.admin_password.empty());
return !passwordRequired ||
((params["admin_password"].isString() &&
params["admin_password"].asString() == port.admin_password) &&
(params["admin_user"].isString() &&
params["admin_user"].asString() == port.admin_user));
}
bool
ipAllowed (beast::IP::Address const& remoteIp,
std::vector<beast::IP::Address> const& adminIp)
{
return std::find_if (adminIp.begin (), adminIp.end (),
[&remoteIp](beast::IP::Address const& ip) { return ip.is_any () ||
ip == remoteIp; }) != adminIp.end ();
}
bool
isAdmin (Port const& port, Json::Value const& params,
beast::IP::Address const& remoteIp)
{
return ipAllowed (remoteIp, port.admin_ip) &&
passwordUnrequiredOrSentCorrect (port, params);
}
Role
requestRole (Role const& required, Port const& port,
Json::Value const& params, beast::IP::Endpoint const& remoteIp,
std::string const& user)
{
if (isAdmin(port, params, remoteIp.address()))
return Role::ADMIN;
if (required == Role::ADMIN)
return Role::FORBID;
if (isIdentified(port, remoteIp.address(), user))
return Role::IDENTIFIED;
return Role::GUEST;
}
/**
* ADMIN and IDENTIFIED roles shall have unlimited resources.
*/
bool
isUnlimited (Role const& required, Port const& port,
Json::Value const&params, beast::IP::Endpoint const& remoteIp,
std::string const& user)
{
Role role = requestRole(required, port, params, remoteIp, user);
if (role == Role::ADMIN || role == Role::IDENTIFIED)
return true;
else
return false;
}
bool
isUnlimited (Role const& role)
{
return role == Role::ADMIN || role == Role::IDENTIFIED;
}
Resource::Consumer
requestInboundEndpoint (Resource::Manager& manager,
beast::IP::Endpoint const& remoteAddress,
Port const& port, std::string const& user)
{
if (isUnlimited (Role::GUEST, port, Json::Value(), remoteAddress, user))
return manager.newUnlimitedEndpoint (to_string (remoteAddress));
return manager.newInboundEndpoint(remoteAddress);
}
bool
isIdentified (Port const& port, beast::IP::Address const& remoteIp,
std::string const& user)
{
return ! user.empty() && ipAllowed (remoteIp, port.secure_gateway_ip);
}
}

View File

@@ -27,21 +27,25 @@
namespace ripple {
template<class Handler>
class SSLHTTPPeer
: public BaseHTTPPeer<SSLHTTPPeer>
, public std::enable_shared_from_this <SSLHTTPPeer>
: public BaseHTTPPeer<Handler, SSLHTTPPeer<Handler>>
, public std::enable_shared_from_this<SSLHTTPPeer<Handler>>
{
private:
friend class BaseHTTPPeer<SSLHTTPPeer>;
friend class BaseHTTPPeer<Handler, SSLHTTPPeer>;
using socket_type = boost::asio::ip::tcp::socket;
using stream_type = boost::asio::ssl::stream <socket_type&>;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using yield_context = boost::asio::yield_context;
using error_code = boost::system::error_code;
std::unique_ptr<beast::asio::ssl_bundle> ssl_bundle_;
stream_type& stream_;
public:
template <class ConstBufferSequence>
SSLHTTPPeer (Port const& port, Handler& handler,
template<class ConstBufferSequence>
SSLHTTPPeer(Port const& port, Handler& handler,
beast::Journal journal, endpoint_type remote_address,
ConstBufferSequence const& buffers, socket_type&& socket);
@@ -53,7 +57,7 @@ public:
private:
void
do_handshake (yield_context yield);
do_handshake(yield_context do_yield);
void
do_request() override;
@@ -62,16 +66,19 @@ private:
do_close() override;
void
on_shutdown (error_code ec);
on_shutdown(error_code ec);
};
//------------------------------------------------------------------------------
template <class ConstBufferSequence>
SSLHTTPPeer::SSLHTTPPeer (Port const& port, Handler& handler,
template<class Handler>
template<class ConstBufferSequence>
SSLHTTPPeer<Handler>::
SSLHTTPPeer(Port const& port, Handler& handler,
beast::Journal journal, endpoint_type remote_address,
ConstBufferSequence const& buffers, socket_type&& socket)
: BaseHTTPPeer (port, handler, socket.get_io_service(), journal, remote_address, buffers)
: BaseHTTPPeer<Handler, SSLHTTPPeer>(port, handler,
socket.get_io_service(), journal, remote_address, buffers)
, ssl_bundle_(std::make_unique<beast::asio::ssl_bundle>(
port.context, std::move(socket)))
, stream_(ssl_bundle_->stream)
@@ -79,86 +86,99 @@ SSLHTTPPeer::SSLHTTPPeer (Port const& port, Handler& handler,
}
// Called when the acceptor accepts our socket.
template<class Handler>
void
SSLHTTPPeer::run()
SSLHTTPPeer<Handler>::
run()
{
if (!handler_.onAccept (session(), remote_address_))
if(! this->handler_.onAccept(this->session(), this->remote_address_))
{
boost::asio::spawn (strand_,
std::bind (&SSLHTTPPeer::do_close,
shared_from_this()));
boost::asio::spawn(this->strand_,
std::bind(&SSLHTTPPeer::do_close,
this->shared_from_this()));
return;
}
if (! stream_.lowest_layer().is_open())
return;
boost::asio::spawn (strand_, std::bind (&SSLHTTPPeer::do_handshake,
shared_from_this(), std::placeholders::_1));
boost::asio::spawn(this->strand_, std::bind(
&SSLHTTPPeer::do_handshake, this->shared_from_this(),
std::placeholders::_1));
}
template<class Handler>
std::shared_ptr<WSSession>
SSLHTTPPeer::websocketUpgrade()
SSLHTTPPeer<Handler>::
websocketUpgrade()
{
auto ws = ios().emplace<SSLWSPeer>(
port_, handler_, remote_address_,
std::move(message_), std::move(ssl_bundle_),
journal_);
auto ws = this->ios().template emplace<SSLWSPeer<Handler>>(
this->port_, this->handler_, this->remote_address_,
std::move(this->message_), std::move(this->ssl_bundle_),
this->journal_);
return ws;
}
template<class Handler>
void
SSLHTTPPeer::do_handshake (yield_context yield)
SSLHTTPPeer<Handler>::
do_handshake(yield_context do_yield)
{
error_code ec;
stream_.set_verify_mode (boost::asio::ssl::verify_none);
start_timer();
read_buf_.consume(stream_.async_handshake(
stream_type::server, read_buf_.data(), yield[ec]));
cancel_timer();
boost::system::error_code ec;
stream_.set_verify_mode(boost::asio::ssl::verify_none);
this->start_timer();
this->read_buf_.consume(stream_.async_handshake(
stream_type::server, this->read_buf_.data(), do_yield[ec]));
this->cancel_timer();
if (ec)
return fail(ec, "handshake");
return this->fail(ec, "handshake");
bool const http =
port().protocol.count("peer") > 0 ||
this->port().protocol.count("peer") > 0 ||
//port().protocol.count("wss") > 0 ||
port().protocol.count("wss2") > 0 ||
port().protocol.count("https") > 0;
if (http)
this->port().protocol.count("wss2") > 0 ||
this->port().protocol.count("https") > 0;
if(http)
{
boost::asio::spawn (strand_, std::bind (&SSLHTTPPeer::do_read,
shared_from_this(), std::placeholders::_1));
boost::asio::spawn(this->strand_,
std::bind(&SSLHTTPPeer::do_read,
this->shared_from_this(), std::placeholders::_1));
return;
}
// this will be destroyed
// `this` will be destroyed
}
template<class Handler>
void
SSLHTTPPeer::do_request()
SSLHTTPPeer<Handler>::
do_request()
{
++request_count_;
auto const what = handler_.onHandoff (session(),
std::move(ssl_bundle_), std::move(message_), remote_address_);
if (what.moved)
++this->request_count_;
auto const what = this->handler_.onHandoff(this->session(),
std::move(ssl_bundle_), std::move(this->message_),
this->remote_address_);
if(what.moved)
return;
if (what.response)
return write(what.response, what.keep_alive);
if(what.response)
return this->write(what.response, what.keep_alive);
// legacy
handler_.onRequest (session());
this->handler_.onRequest(this->session());
}
template<class Handler>
void
SSLHTTPPeer::do_close()
SSLHTTPPeer<Handler>::
do_close()
{
start_timer();
stream_.async_shutdown (strand_.wrap (std::bind (
&SSLHTTPPeer::on_shutdown, shared_from_this(),
this->start_timer();
stream_.async_shutdown(this->strand_.wrap(std::bind (
&SSLHTTPPeer::on_shutdown, this->shared_from_this(),
std::placeholders::_1)));
}
template<class Handler>
void
SSLHTTPPeer::on_shutdown (error_code ec)
SSLHTTPPeer<Handler>::
on_shutdown(error_code ec)
{
cancel_timer();
this->cancel_timer();
stream_.lowest_layer().close(ec);
}

View File

@@ -29,13 +29,14 @@
namespace ripple {
template<class Handler>
class SSLWSPeer
: public BaseWSPeer<SSLWSPeer>
, public std::enable_shared_from_this<SSLWSPeer>
: public BaseWSPeer<Handler, SSLWSPeer<Handler>>
, public std::enable_shared_from_this<SSLWSPeer<Handler>>
{
private:
friend class BasePeer<SSLWSPeer>;
friend class BaseWSPeer<SSLWSPeer>;
friend class BasePeer<Handler, SSLWSPeer>;
friend class BaseWSPeer<Handler, SSLWSPeer>;
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
@@ -68,8 +69,10 @@ private:
//------------------------------------------------------------------------------
template<class Handler>
template<class Body, class Headers>
SSLWSPeer::SSLWSPeer(
SSLWSPeer<Handler>::
SSLWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_endpoint,
@@ -77,25 +80,30 @@ SSLWSPeer::SSLWSPeer(
std::unique_ptr<
beast::asio::ssl_bundle>&& ssl_bundle,
beast::Journal journal)
: BaseWSPeer(port, handler, remote_endpoint, std::move(request),
ssl_bundle->socket.get_io_service(), journal)
: BaseWSPeer<Handler, SSLWSPeer>(port, handler,
remote_endpoint, std::move(request),
ssl_bundle->socket.get_io_service(), journal)
, ssl_bundle_(std::move(ssl_bundle))
, ws_(ssl_bundle_->stream)
{
}
template<class Handler>
void
SSLWSPeer::do_close()
SSLWSPeer<Handler>::
do_close()
{
//start_timer();
using namespace beast::asio;
ws_.next_layer().async_shutdown(
strand_.wrap(std::bind(&SSLWSPeer::on_shutdown,
shared_from_this(), placeholders::error)));
this->strand_.wrap(std::bind(&SSLWSPeer::on_shutdown,
this->shared_from_this(), placeholders::error)));
}
template<class Handler>
void
SSLWSPeer::on_shutdown(error_code ec)
SSLWSPeer<Handler>::
on_shutdown(error_code ec)
{
//cancel_timer();
ws_.lowest_layer().close(ec);

View File

@@ -1,881 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/beast/rfc2616.h>
#include <ripple/beast/net/IPAddressConversion.h>
#include <ripple/json/json_reader.h>
#include <ripple/server/json_body.h>
#include <ripple/server/make_ServerHandler.h>
#include <ripple/server/impl/JSONRPCUtil.h>
#include <ripple/server/impl/ServerHandlerImp.h>
#include <ripple/basics/contract.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/make_SSLContext.h>
#include <ripple/core/JobQueue.h>
#include <ripple/json/to_string.h>
#include <ripple/net/RPCErr.h>
#include <ripple/server/make_Server.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/impl/Tuning.h>
#include <ripple/rpc/RPCHandler.h>
#include <beast/core/detail/base64.hpp>
#include <beast/http/headers.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/type_traits.hpp>
#include <boost/optional.hpp>
#include <boost/regex.hpp>
#include <algorithm>
#include <stdexcept>
namespace ripple {
ServerHandler::ServerHandler (Stoppable& parent)
: Stoppable ("ServerHandler", parent)
{
}
//------------------------------------------------------------------------------
ServerHandlerImp::ServerHandlerImp (Application& app, Stoppable& parent,
boost::asio::io_service& io_service, JobQueue& jobQueue,
NetworkOPs& networkOPs, Resource::Manager& resourceManager,
CollectorManager& cm)
: ServerHandler (parent)
, app_ (app)
, m_resourceManager (resourceManager)
, m_journal (app_.journal("Server"))
, m_networkOPs (networkOPs)
, m_server (make_Server(
*this, io_service, app_.journal("Server")))
, m_jobQueue (jobQueue)
{
auto const& group (cm.group ("rpc"));
rpc_requests_ = group->make_counter ("requests");
rpc_size_ = group->make_event ("size");
rpc_time_ = group->make_event ("time");
}
ServerHandlerImp::~ServerHandlerImp()
{
m_server = nullptr;
}
void
ServerHandlerImp::setup (Setup const& setup, beast::Journal journal)
{
setup_ = setup;
m_server->ports (setup.ports);
}
//------------------------------------------------------------------------------
void
ServerHandlerImp::onStop()
{
m_server->close();
}
//------------------------------------------------------------------------------
bool
ServerHandlerImp::onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint)
{
std::lock_guard<std::mutex> l(countlock_);
auto const c = ++count_[session.port()];
if (session.port().limit && c >= session.port().limit)
{
JLOG (m_journal.trace()) <<
session.port().name << " is full; dropping " <<
endpoint;
return false;
}
return true;
}
auto
ServerHandlerImp::onHandoff (Session& session,
std::unique_ptr <beast::asio::ssl_bundle>&& bundle,
http_request_type&& request,
boost::asio::ip::tcp::endpoint remote_address) ->
Handoff
{
if(isWebsocketUpgrade(request))
{
Handoff handoff;
if(session.port().protocol.count("wss2") > 0)
{
auto const ws = session.websocketUpgrade();
auto is = std::make_shared<WSInfoSub>(m_networkOPs, ws);
is->getConsumer() = requestInboundEndpoint(
m_resourceManager,
beast::IPAddressConversion::from_asio(remote_address),
session.port(), is->user());
ws->appDefined = std::move(is);
ws->run();
handoff.moved = true;
return handoff;
}
if(session.port().protocol.count("wss") > 0)
return handoff; // Pass to websocket
}
if(session.port().protocol.count("peer") > 0)
{
return app_.overlay().onHandoff(std::move(bundle),
std::move(request), remote_address);
}
// Pass to legacy onRequest
return {};
}
auto
ServerHandlerImp::onHandoff (Session& session,
boost::asio::ip::tcp::socket&& socket,
http_request_type&& request,
boost::asio::ip::tcp::endpoint remote_address) ->
Handoff
{
Handoff handoff;
if(session.port().protocol.count("ws2") > 0 &&
isWebsocketUpgrade (request))
{
auto const ws = session.websocketUpgrade();
auto is = std::make_shared<WSInfoSub>(m_networkOPs, ws);
is->getConsumer() = requestInboundEndpoint(
m_resourceManager,
beast::IPAddressConversion::from_asio(remote_address),
session.port(), is->user());
ws->appDefined = std::move(is);
ws->run();
handoff.moved = true;
}
// Otherwise pass to legacy onRequest or websocket
return handoff;
}
static inline
Json::Output makeOutput (Session& session)
{
return [&](boost::string_ref const& b)
{
session.write (b.data(), b.size());
};
}
// HACK!
static
std::map<std::string, std::string>
build_map(beast::http::headers const& h)
{
std::map <std::string, std::string> c;
for (auto const& e : h)
{
auto key (e.first);
// TODO Replace with safe C++14 version
std::transform (key.begin(), key.end(), key.begin(), ::tolower);
c [key] = e.second;
}
return c;
}
template<class ConstBufferSequence>
static
std::string
buffers_to_string(ConstBufferSequence const& bs)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
std::string s;
s.reserve(buffer_size(bs));
for(auto const& b : bs)
s.append(buffer_cast<char const*>(b),
buffer_size(b));
return s;
}
void
ServerHandlerImp::onRequest (Session& session)
{
// Make sure RPC is enabled on the port
if (session.port().protocol.count("http") == 0 &&
session.port().protocol.count("https") == 0)
{
HTTPReply (403, "Forbidden", makeOutput (session), app_.journal ("RPC"));
session.close (true);
return;
}
// Check user/password authorization
if (! authorized (
session.port(), build_map(session.request().headers)))
{
HTTPReply (403, "Forbidden", makeOutput (session), app_.journal ("RPC"));
session.close (true);
return;
}
m_jobQueue.postCoro(jtCLIENT, "RPC-Client",
[this, detach = session.detach()](std::shared_ptr<JobCoro> jc)
{
processSession(detach, jc);
});
}
void
ServerHandlerImp::onWSMessage(
std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers)
{
Json::Value jv;
auto const size = boost::asio::buffer_size(buffers);
if (size > RPC::Tuning::maxRequestSize ||
! Json::Reader{}.parse(jv, buffers) ||
! jv ||
! jv.isObject())
{
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid";
jvResult[jss::value] = buffers_to_string(buffers);
beast::streambuf sb;
Json::stream(jvResult,
[&sb](auto const p, auto const n)
{
sb.commit(boost::asio::buffer_copy(
sb.prepare(n), boost::asio::buffer(p, n)));
});
JLOG(m_journal.trace())
<< "Websocket sending '" << jvResult << "'";
session->send(std::make_shared<
StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
return;
}
JLOG(m_journal.trace())
<< "Websocket received '" << jv << "'";
m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session = std::move(session),
jv = std::move(jv)](auto const& jc)
{
auto const jr =
this->processSession(session, jc, jv);
auto const s = to_string(jr);
auto const n = s.length();
beast::streambuf sb(n);
sb.commit(boost::asio::buffer_copy(
sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<
StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
});
}
void
ServerHandlerImp::onClose (Session& session,
boost::system::error_code const&)
{
std::lock_guard<std::mutex> l(countlock_);
--count_[session.port()];
}
void
ServerHandlerImp::onStopped (Server&)
{
stopped();
}
//------------------------------------------------------------------------------
Json::Value
ServerHandlerImp::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobCoro> const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub> (session->appDefined);
if (is->getConsumer().disconnect())
{
session->close();
return rpcError(rpcSLOW_DOWN);
}
// Requests without "command" are invalid.
Json::Value jr(Json::objectValue);
if (! jv.isMember (jss::command))
{
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = jss::missingCommand;
jr[jss::request] = jv;
if (jv.isMember (jss::id))
jr[jss::id] = jv[jss::id];
is->getConsumer().charge(Resource::feeInvalidRPC);
return jr;
}
Resource::Charge loadType = Resource::feeReferenceRPC;
auto required = RPC::roleRequired(jv[jss::command].asString());
auto role = requestRole(
required,
session->port(),
jv,
beast::IP::from_asio(session->remote_endpoint().address()),
is->user());
if (Role::FORBID == role)
{
jr[jss::result] = rpcError (rpcFORBIDDEN);
}
else
{
RPC::Context context{
app_.journal("RPCHandler"),
jv,
app_,
loadType,
app_.getOPs(),
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
{is->user(), is->forwarded_for()}
};
RPC::doCommand(context, jr[jss::result]);
}
is->getConsumer().charge(loadType);
if (is->getConsumer().warn())
jr[jss::warning] = jss::load;
// Currently we will simply unwrap errors returned by the RPC
// API, in the future maybe we can make the responses
// consistent.
//
// Regularize result. This is duplicate code.
if (jr[jss::result].isMember(jss::error))
{
jr = jr[jss::result];
jr[jss::status] = jss::error;
jr[jss::request] = jv;
}
else
{
jr[jss::status] = jss::success;
// For testing resource limits on this connection.
if (is->getConsumer().isUnlimited() &&
jv[jss::command].asString() == "ping")
jr[jss::unlimited] = true;
}
if (jv.isMember(jss::id))
jr[jss::id] = jv[jss::id];
jr[jss::type] = jss::response;
return jr;
}
// Run as a coroutine.
void
ServerHandlerImp::processSession (std::shared_ptr<Session> const& session,
std::shared_ptr<JobCoro> jobCoro)
{
processRequest (
session->port(), buffers_to_string(
session->request().body.data()),
session->remoteAddress().at_port (0),
makeOutput (*session), jobCoro,
[&]
{
auto const iter =
session->request().headers.find(
"X-Forwarded-For");
if(iter != session->request().headers.end())
return iter->second;
return std::string{};
}(),
[&]
{
auto const iter =
session->request().headers.find(
"X-User");
if(iter != session->request().headers.end())
return iter->second;
return std::string{};
}());
if(is_keep_alive(session->request()))
session->complete();
else
session->close (true);
}
void
ServerHandlerImp::processRequest (Port const& port,
std::string const& request, beast::IP::Endpoint const& remoteIPAddress,
Output&& output, std::shared_ptr<JobCoro> jobCoro,
std::string forwardedFor, std::string user)
{
auto rpcJ = app_.journal ("RPC");
Json::Value jsonRPC;
{
Json::Reader reader;
if ((request.size () > RPC::Tuning::maxRequestSize) ||
! reader.parse (request, jsonRPC) ||
! jsonRPC ||
! jsonRPC.isObject ())
{
HTTPReply (400, "Unable to parse request", output, rpcJ);
return;
}
}
// Parse id now so errors from here on will have the id
//
// VFALCO NOTE Except that "id" isn't included in the following errors.
//
Json::Value const& id = jsonRPC ["id"];
Json::Value const& method = jsonRPC ["method"];
if (! method) {
HTTPReply (400, "Null method", output, rpcJ);
return;
}
if (!method.isString ()) {
HTTPReply (400, "method is not string", output, rpcJ);
return;
}
/* ---------------------------------------------------------------------- */
auto role = Role::FORBID;
auto required = RPC::roleRequired(id.asString());
if (jsonRPC.isMember("params") &&
jsonRPC["params"].isArray() &&
jsonRPC["params"].size() > 0 &&
jsonRPC["params"][Json::UInt(0)].isObject())
{
role = requestRole(required, port, jsonRPC["params"][Json::UInt(0)],
remoteIPAddress, user);
}
else
{
role = requestRole(required, port, Json::objectValue,
remoteIPAddress, user);
}
/**
* Clear header-assigned values if not positively identified from a
* secure_gateway.
*/
if (role != Role::IDENTIFIED)
{
forwardedFor.clear();
user.clear();
}
Resource::Consumer usage;
if (isUnlimited(role))
{
usage = m_resourceManager.newUnlimitedEndpoint(
remoteIPAddress.to_string());
}
else
{
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
if (usage.disconnect())
{
HTTPReply(503, "Server is overloaded", output, rpcJ);
return;
}
}
std::string strMethod = method.asString ();
if (strMethod.empty())
{
HTTPReply (400, "method is empty", output, rpcJ);
return;
}
// Extract request parameters from the request Json as `params`.
//
// If the field "params" is empty, `params` is an empty object.
//
// Otherwise, that field must be an array of length 1 (why?)
// and we take that first entry and validate that it's an object.
Json::Value params = jsonRPC [jss::params];
if (! params)
params = Json::Value (Json::objectValue);
else if (!params.isArray () || params.size() != 1)
{
HTTPReply (400, "params unparseable", output, rpcJ);
return;
}
else
{
params = std::move (params[0u]);
if (!params.isObject())
{
HTTPReply (400, "params unparseable", output, rpcJ);
return;
}
}
// VFALCO TODO Shouldn't we handle this earlier?
//
if (role == Role::FORBID)
{
// VFALCO TODO Needs implementing
// FIXME Needs implementing
// XXX This needs rate limiting to prevent brute forcing password.
HTTPReply (403, "Forbidden", output, rpcJ);
return;
}
JLOG(m_journal.debug()) << "Query: " << strMethod << params;
// Provide the JSON-RPC method as the field "command" in the request.
params[jss::command] = strMethod;
JLOG (m_journal.trace())
<< "doRpcCommand:" << strMethod << ":" << params;
Resource::Charge loadType = Resource::feeReferenceRPC;
auto const start (std::chrono::high_resolution_clock::now ());
RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
app_.getLedgerMaster(), usage, role, jobCoro, InfoSub::pointer(),
{user, forwardedFor}};
Json::Value result;
RPC::doCommand (context, result);
// Always report "status". On an error report the request as received.
if (result.isMember (jss::error))
{
result[jss::status] = jss::error;
result[jss::request] = params;
JLOG (m_journal.debug()) <<
"rpcError: " << result [jss::error] <<
": " << result [jss::error_message];
}
else
{
result[jss::status] = jss::success;
}
Json::Value reply (Json::objectValue);
reply[jss::result] = std::move (result);
auto response = to_string (reply);
rpc_time_.notify (static_cast <beast::insight::Event::value_type> (
std::chrono::duration_cast <std::chrono::milliseconds> (
std::chrono::high_resolution_clock::now () - start)));
++rpc_requests_;
rpc_size_.notify (static_cast <beast::insight::Event::value_type> (
response.size ()));
response += '\n';
usage.charge (loadType);
if (auto stream = m_journal.debug())
{
static const int maxSize = 10000;
if (response.size() <= maxSize)
stream << "Reply: " << response;
else
stream << "Reply: " << response.substr (0, maxSize);
}
HTTPReply (200, response, output, rpcJ);
}
//------------------------------------------------------------------------------
// Returns `true` if the HTTP request is a Websockets Upgrade
// http://en.wikipedia.org/wiki/HTTP/1.1_Upgrade_header#Use_with_WebSockets
bool
ServerHandlerImp::isWebsocketUpgrade (http_request_type const& request)
{
if (is_upgrade(request))
return request.headers["Upgrade"] == "websocket";
return false;
}
// VFALCO TODO Rewrite to use beast::http::headers
bool
ServerHandlerImp::authorized (Port const& port,
std::map<std::string, std::string> const& h)
{
if (port.user.empty() || port.password.empty())
return true;
auto const it = h.find ("authorization");
if ((it == h.end ()) || (it->second.substr (0, 6) != "Basic "))
return false;
std::string strUserPass64 = it->second.substr (6);
boost::trim (strUserPass64);
std::string strUserPass = beast::detail::base64_decode (strUserPass64);
std::string::size_type nColon = strUserPass.find (":");
if (nColon == std::string::npos)
return false;
std::string strUser = strUserPass.substr (0, nColon);
std::string strPassword = strUserPass.substr (nColon + 1);
return strUser == port.user && strPassword == port.password;
}
//------------------------------------------------------------------------------
void
ServerHandler::Setup::makeContexts()
{
for(auto& p : ports)
{
if (p.secure())
{
if (p.ssl_key.empty() && p.ssl_cert.empty() &&
p.ssl_chain.empty())
p.context = make_SSLContext();
else
p.context = make_SSLContextAuthed (
p.ssl_key, p.ssl_cert, p.ssl_chain);
}
else
{
p.context = std::make_shared<
boost::asio::ssl::context>(
boost::asio::ssl::context::sslv23);
}
}
}
static
Port
to_Port(ParsedPort const& parsed, std::ostream& log)
{
Port p;
p.name = parsed.name;
if (! parsed.ip)
{
log << "Missing 'ip' in [" << p.name << "]\n";
Throw<std::exception> ();
}
p.ip = *parsed.ip;
if (! parsed.port)
{
log << "Missing 'port' in [" << p.name << "]\n";
Throw<std::exception> ();
}
else if (*parsed.port == 0)
{
log << "Port " << *parsed.port << "in [" << p.name << "] is invalid\n";
Throw<std::exception> ();
}
p.port = *parsed.port;
if (parsed.admin_ip)
p.admin_ip = *parsed.admin_ip;
if (parsed.secure_gateway_ip)
p.secure_gateway_ip = *parsed.secure_gateway_ip;
if (parsed.protocol.empty())
{
log << "Missing 'protocol' in [" << p.name << "]\n";
Throw<std::exception> ();
}
p.protocol = parsed.protocol;
if (p.websockets() &&
(parsed.protocol.count("peer") > 0 ||
parsed.protocol.count("http") > 0 ||
parsed.protocol.count("https") > 0))
{
log << "Invalid protocol combination in [" << p.name << "]\n";
Throw<std::exception> ();
}
p.user = parsed.user;
p.password = parsed.password;
p.admin_user = parsed.admin_user;
p.admin_password = parsed.admin_password;
p.ssl_key = parsed.ssl_key;
p.ssl_cert = parsed.ssl_cert;
p.ssl_chain = parsed.ssl_chain;
return p;
}
static
std::vector<Port>
parse_Ports (
Config const& config,
std::ostream& log)
{
std::vector<Port> result;
if (! config.exists("server"))
{
log <<
"Required section [server] is missing\n";
Throw<std::exception> ();
}
ParsedPort common;
parse_Port (common, config["server"], log);
auto const& names = config.section("server").values();
result.reserve(names.size());
for (auto const& name : names)
{
if (! config.exists(name))
{
log <<
"Missing section: [" << name << "]\n";
Throw<std::exception> ();
}
ParsedPort parsed = common;
parsed.name = name;
parse_Port(parsed, config[name], log);
result.push_back(to_Port(parsed, log));
}
if (config.standalone())
{
auto it = result.begin ();
while (it != result.end())
{
auto& p = it->protocol;
// Remove the peer protocol, and if that would
// leave the port empty, remove the port as well
if (p.erase ("peer") && p.empty())
it = result.erase (it);
else
++it;
}
}
else
{
auto const count = std::count_if (
result.cbegin(), result.cend(),
[](Port const& p)
{
return p.protocol.count("peer") != 0;
});
if (count > 1)
{
log << "Error: More than one peer protocol configured in [server]\n";
Throw<std::exception> ();
}
if (count == 0)
log << "Warning: No peer protocol configured\n";
}
return result;
}
// Fill out the client portion of the Setup
static
void
setup_Client (ServerHandler::Setup& setup)
{
decltype(setup.ports)::const_iterator iter;
for (iter = setup.ports.cbegin();
iter != setup.ports.cend(); ++iter)
if (iter->protocol.count("http") > 0 ||
iter->protocol.count("https") > 0)
break;
if (iter == setup.ports.cend())
return;
setup.client.secure =
iter->protocol.count("https") > 0;
setup.client.ip = iter->ip.to_string();
// VFALCO HACK! to make localhost work
if (setup.client.ip == "0.0.0.0")
setup.client.ip = "127.0.0.1";
setup.client.port = iter->port;
setup.client.user = iter->user;
setup.client.password = iter->password;
setup.client.admin_user = iter->admin_user;
setup.client.admin_password = iter->admin_password;
}
// Fill out the overlay portion of the Setup
static
void
setup_Overlay (ServerHandler::Setup& setup)
{
auto const iter = std::find_if(
setup.ports.cbegin(), setup.ports.cend(),
[](Port const& port)
{
return port.protocol.count("peer") != 0;
});
if (iter == setup.ports.cend())
{
setup.overlay.port = 0;
return;
}
setup.overlay.ip = iter->ip;
setup.overlay.port = iter->port;
}
ServerHandler::Setup
setup_ServerHandler(
Config const& config,
std::ostream& log)
{
ServerHandler::Setup setup;
setup.ports = parse_Ports(config, log);
setup_Client(setup);
setup_Overlay(setup);
return setup;
}
std::unique_ptr <ServerHandler>
make_ServerHandler (Application& app, Stoppable& parent,
boost::asio::io_service& io_service, JobQueue& jobQueue,
NetworkOPs& networkOPs, Resource::Manager& resourceManager,
CollectorManager& cm)
{
return std::make_unique<ServerHandlerImp>(app, parent,
io_service, jobQueue, networkOPs, resourceManager, cm);
}
} // ripple

View File

@@ -1,211 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SERVERHANDLERIMP_H_INCLUDED
#define RIPPLE_SERVER_SERVERHANDLERIMP_H_INCLUDED
#include <ripple/core/Job.h>
#include <ripple/core/JobCoro.h>
#include <ripple/json/Output.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/server/Handler.h>
#include <ripple/server/ServerHandler.h>
#include <ripple/server/Session.h>
#include <ripple/server/WSSession.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/app/main/CollectorManager.h>
#include <map>
#include <mutex>
namespace ripple {
inline
bool operator< (Port const& lhs, Port const& rhs)
{
return lhs.name < rhs.name;
}
class WSInfoSub : public InfoSub
{
std::weak_ptr<WSSession> ws_;
std::string user_;
std::string fwdfor_;
public:
WSInfoSub(Source& source,
std::shared_ptr<WSSession> const& ws)
: InfoSub(source)
, ws_(ws)
{
auto const& h = ws->request().headers;
auto it = h.find("X-User");
if (it != h.end() &&
isIdentified(
ws->port(), beast::IPAddressConversion::from_asio(
ws->remote_endpoint()).address(), it->second))
{
user_ = it->second;
it = h.find("X-Forwarded-For");
if (it != h.end())
fwdfor_ = it->second;
}
}
std::string
user() const
{
return user_;
}
std::string
forwarded_for() const
{
return fwdfor_;
}
void
send(Json::Value const& jv, bool)
{
auto sp = ws_.lock();
if(! sp)
return;
beast::streambuf sb;
stream(jv,
[&](void const* data, std::size_t n)
{
sb.commit(boost::asio::buffer_copy(
sb.prepare(n), boost::asio::buffer(data, n)));
});
auto m = std::make_shared<
StreambufWSMsg<decltype(sb)>>(
std::move(sb));
sp->send(m);
}
};
// Private implementation
class ServerHandlerImp
: public ServerHandler
, public Handler
{
private:
Application& app_;
Resource::Manager& m_resourceManager;
beast::Journal m_journal;
NetworkOPs& m_networkOPs;
std::unique_ptr<Server> m_server;
Setup setup_;
JobQueue& m_jobQueue;
beast::insight::Counter rpc_requests_;
beast::insight::Event rpc_size_;
beast::insight::Event rpc_time_;
std::mutex countlock_;
std::map<std::reference_wrapper<Port const>, int> count_;
public:
ServerHandlerImp (Application& app, Stoppable& parent,
boost::asio::io_service& io_service, JobQueue& jobQueue,
NetworkOPs& networkOPs, Resource::Manager& resourceManager,
CollectorManager& cm);
~ServerHandlerImp();
private:
using Output = Json::Output;
void
setup (Setup const& setup, beast::Journal journal) override;
Setup const&
setup() const override
{
return setup_;
}
//
// Stoppable
//
void
onStop() override;
//
// Handler
//
bool
onAccept (Session& session,
boost::asio::ip::tcp::endpoint endpoint) override;
Handoff
onHandoff (Session& session,
std::unique_ptr <beast::asio::ssl_bundle>&& bundle,
http_request_type&& request,
boost::asio::ip::tcp::endpoint remote_address) override;
Handoff
onHandoff (Session& session, boost::asio::ip::tcp::socket&& socket,
http_request_type&& request,
boost::asio::ip::tcp::endpoint remote_address) override;
void
onRequest (Session& session) override;
void
onWSMessage(std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers) override;
void
onClose (Session& session,
boost::system::error_code const&) override;
void
onStopped (Server&) override;
//--------------------------------------------------------------------------
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobCoro> const& coro,
Json::Value const& jv);
void
processSession (std::shared_ptr<Session> const&,
std::shared_ptr<JobCoro> jobCoro);
void
processRequest (Port const& port, std::string const& request,
beast::IP::Endpoint const& remoteIPAddress, Output&&,
std::shared_ptr<JobCoro> jobCoro,
std::string forwardedFor, std::string user);
private:
bool
isWebsocketUpgrade (http_request_type const& request);
bool
authorized (Port const& port,
std::map<std::string, std::string> const& h);
};
}
#endif

View File

@@ -1,101 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <BeastConfig.h>
#include <ripple/basics/contract.h>
#include <ripple/server/impl/ServerImpl.h>
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <boost/chrono/chrono_io.hpp>
#include <boost/utility/in_place_factory.hpp>
#include <cassert>
#include <ctime>
#include <iomanip>
#include <iostream>
#include <string>
#include <stdio.h>
#include <time.h>
namespace ripple {
ServerImpl::ServerImpl (Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal)
: handler_(handler)
, j_(journal)
, io_service_(io_service)
, strand_(io_service_)
, work_(io_service_)
{
}
ServerImpl::~ServerImpl()
{
// Handler::onStopped will not be called
work_ = boost::none;
ios_.close();
ios_.join();
}
void
ServerImpl::ports (std::vector<Port> const& ports)
{
if (closed())
Throw<std::logic_error> ("ports() on closed Server");
ports_.reserve(ports.size());
for(auto const& port : ports)
{
if (! port.websockets())
{
ports_.push_back(port);
if(auto sp = ios_.emplace<Door>(handler_,
io_service_, ports_.back(), j_))
{
list_.push_back(sp);
sp->run();
}
}
}
}
void
ServerImpl::close()
{
ios_.close(
[&]
{
work_ = boost::none;
handler_.onStopped(*this);
});
}
bool
ServerImpl::closed()
{
return ios_.closed();
}
//--------------------------------------------------------------------------
std::unique_ptr<Server>
make_Server (Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal)
{
return std::make_unique<ServerImpl>(handler, io_service, journal);
}
} // ripple

View File

@@ -21,8 +21,8 @@
#define RIPPLE_SERVER_SERVERIMPL_H_INCLUDED
#include <ripple/basics/chrono.h>
#include <ripple/server/Handler.h>
#include <ripple/server/Server.h>
#include <ripple/server/impl/Door.h>
#include <ripple/server/impl/io_list.h>
#include <ripple/beast/core/List.h>
#include <ripple/beast/core/Thread.h>
@@ -30,17 +30,52 @@
#include <boost/optional.hpp>
#include <array>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
namespace ripple {
class BasicPeer;
class Door;
/** A multi-protocol server.
This server maintains multiple configured listening ports,
with each listening port allows for multiple protocols including
HTTP, HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol.
*/
class Server
{
public:
/** Destroy the server.
The server is closed if it is not already closed. This call
blocks until the server has stopped.
*/
virtual
~Server() = default;
/** Returns the Journal associated with the server. */
virtual
beast::Journal
journal() = 0;
/** Set the listening port settings.
This may only be called once.
*/
virtual
void
ports (std::vector<Port> const& v) = 0;
/** Close the server.
The close is performed asynchronously. The handler will be notified
when the server has stopped. The server is considered stopped when
there are no pending I/O completion handlers and all connections
have closed.
Thread safety:
Safe to call concurrently from any thread.
*/
virtual
void
close() = 0;
};
template<class Handler>
class ServerImpl : public Server
{
private:
@@ -51,7 +86,7 @@ private:
historySize = 100
};
using Doors = std::vector <std::shared_ptr<Door>>;
using Doors = std::vector <std::shared_ptr<Door<Handler>>>;
Handler& handler_;
beast::Journal j_;
@@ -61,14 +96,14 @@ private:
std::mutex m_;
std::vector<Port> ports_;
std::vector<std::weak_ptr<Door>> list_;
std::vector<std::weak_ptr<Door<Handler>>> list_;
int high_ = 0;
std::array <std::size_t, 64> hist_;
io_list ios_;
public:
ServerImpl (Handler& handler,
ServerImpl(Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal);
~ServerImpl();
@@ -91,7 +126,6 @@ public:
return ios_;
}
public:
boost::asio::io_service&
get_io_service()
{
@@ -107,7 +141,71 @@ private:
ceil_log2 (unsigned long long x);
};
template<class Handler>
ServerImpl<Handler>::
ServerImpl(Handler& handler,
boost::asio::io_service& io_service, beast::Journal journal)
: handler_(handler)
, j_(journal)
, io_service_(io_service)
, strand_(io_service_)
, work_(io_service_)
{
}
template<class Handler>
ServerImpl<Handler>::
~ServerImpl()
{
// Handler::onStopped will not be called
work_ = boost::none;
ios_.close();
ios_.join();
}
template<class Handler>
void
ServerImpl<Handler>::
ports (std::vector<Port> const& ports)
{
if (closed())
Throw<std::logic_error> ("ports() on closed Server");
ports_.reserve(ports.size());
for(auto const& port : ports)
{
if (! port.websockets())
{
ports_.push_back(port);
if(auto sp = ios_.emplace<Door<Handler>>(handler_,
io_service_, ports_.back(), j_))
{
list_.push_back(sp);
sp->run();
}
}
}
}
template<class Handler>
void
ServerImpl<Handler>::
close()
{
ios_.close(
[&]
{
work_ = boost::none;
handler_.onStopped(*this);
});
}
template<class Handler>
bool
ServerImpl<Handler>::
closed()
{
return ios_.closed();
}
} // ripple
#endif