Add support for Beast Websockets (RIPD-1097)

This commit is contained in:
Miguel Portilla
2016-03-25 17:13:46 -04:00
committed by Howard Hinnant
parent f45e279e06
commit d7a778ce6a
11 changed files with 294 additions and 158 deletions

View File

@@ -24,7 +24,7 @@
#include <ripple/json/json_forwards.h>
#include <ripple/json/json_value.h>
#include <boost/asio/buffer.hpp>
#include <stack>
namespace Json
@@ -64,6 +64,16 @@ public:
/// \see Json::operator>>(std::istream&, Json::Value&).
bool parse ( std::istream& is, Value& root);
/** \brief Read a Value from a <a HREF="http://www.json.org">JSON</a> buffer sequence.
* \param root [out] Contains the root value of the document if it was
* successfully parsed.
* \param UTF-8 encoded buffer sequence.
* \return \c true if the buffer was successfully parsed, \c false if an error occurred.
*/
template<class BufferSequence>
bool
parse(Value& root, BufferSequence const& bs);
/** \brief Returns a user friendly string that list errors in the parsed document.
* \return Formatted error message with the list of errors with their location in
* the parsed document. An empty string is returned if no error occurred
@@ -161,6 +171,20 @@ private:
Value* lastValue_;
};
template<class BufferSequence>
bool
Reader::parse(Value& root, BufferSequence const& bs)
{
using namespace boost::asio;
for (auto const& b : bs)
{
auto begin = buffer_cast<const char*>(b);
if(! parse(begin, begin + buffer_size(b), root))
return false;
}
return true;
}
/** \brief Read from 'sin' into 'root'.
Always keep comments from the input JSON.

View File

@@ -118,6 +118,7 @@ public:
};
public:
InfoSub (Source& source);
InfoSub (Source& source, Consumer consumer);
virtual ~InfoSub ();
@@ -156,6 +157,14 @@ private:
hash_set <AccountID> normalSubscriptions_;
std::shared_ptr <PathRequest> mPathRequest;
std::uint64_t mSeq;
static
int
assign_id()
{
static std::atomic<std::uint64_t> id(0);
return ++id;
}
};
} // ripple

View File

@@ -43,12 +43,17 @@ InfoSub::Source::Source (char const* name, Stoppable& parent)
//------------------------------------------------------------------------------
InfoSub::InfoSub (Source& source, Consumer consumer)
: m_consumer (consumer)
, m_source (source)
InfoSub::InfoSub(Source& source)
: m_source(source)
, mSeq(assign_id())
{
}
InfoSub::InfoSub(Source& source, Consumer consumer)
: m_consumer(consumer)
, m_source(source)
, mSeq(assign_id())
{
static std::atomic <int> s_seq_id (0);
mSeq = ++s_seq_id;
}
InfoSub::~InfoSub ()

View File

@@ -111,10 +111,18 @@ struct WSSession
{
std::shared_ptr<void> appDefined;
virtual
void
run() = 0;
virtual
Port const&
port() const = 0;
virtual
http_request_type const&
request() const = 0;
virtual
boost::asio::ip::tcp::endpoint const&
remote_endpoint() const = 0;
@@ -127,6 +135,14 @@ struct WSSession
virtual
void
close() = 0;
/** Indicate that the response is complete.
The handler should call this when it has completed writing
the response.
*/
virtual
void
complete() = 0;
};
} // ripple

View File

@@ -67,7 +67,6 @@ protected:
// Max seconds without completing a message
timeoutSeconds = 30
};
struct buffer
@@ -105,7 +104,6 @@ protected:
bool complete_ = false;
boost::system::error_code ec_;
clock_type::time_point when_;
int request_count_ = 0;
std::size_t bytes_in_ = 0;
std::size_t bytes_out_ = 0;
@@ -233,7 +231,6 @@ BaseHTTPPeer<Impl>::BaseHTTPPeer (Port const& port, Handler& handler,
id_ = std::string("#") + std::to_string(nid_) + " ";
JLOG(journal_.trace()) << id_ <<
"accept: " << remote_address_.address();
when_ = clock_type::now();
}
template <class Impl>

View File

@@ -43,6 +43,12 @@ protected:
using BasePeer<Impl>::fail;
using BasePeer<Impl>::strand_;
enum
{
// Max seconds without completing a message
timeoutSeconds = 30
};
private:
friend class BasePeer<Impl>;
@@ -52,6 +58,8 @@ private:
beast::streambuf wb_;
std::list<std::shared_ptr<WSMsg>> wq_;
bool do_close_ = false;
beast::websocket::close_reason cr_;
waitable_timer timer_;
public:
template<class Body, class Headers>
@@ -64,7 +72,7 @@ public:
beast::Journal journal);
void
run();
run() override;
//
// WSSession
@@ -76,6 +84,12 @@ public:
return this->port_;
}
http_request_type const&
request() const override
{
return this->request_;
}
boost::asio::ip::tcp::endpoint const&
remote_endpoint() const override
{
@@ -88,6 +102,9 @@ public:
void
close() override;
void
complete() override;
protected:
struct identity
{
@@ -138,6 +155,15 @@ protected:
virtual
void
do_close() = 0;
void
start_timer();
void
cancel_timer();
void
on_timer(error_code ec);
};
//------------------------------------------------------------------------------
@@ -154,6 +180,7 @@ BaseWSPeer<Impl>::BaseWSPeer(
: BasePeer<Impl>(port, handler, remote_address,
io_service, journal)
, request_(std::move(request))
, timer_(io_service)
{
}
@@ -175,11 +202,21 @@ template<class Impl>
void
BaseWSPeer<Impl>::send(std::shared_ptr<WSMsg> w)
{
// Maximum send queue size
static std::size_t constexpr limit = 100;
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::send, impl().shared_from_this(),
std::move(w)));
wq_.emplace_back(std::move(w));
if (wq_.size() >= limit)
{
cr_.code = static_cast<beast::websocket::close_code::value>(4000);
cr_.reason = "Client is too slow.";
do_close_ = true;
wq_.erase(std::next(wq_.begin()), wq_.end());
}
else
wq_.emplace_back(std::move(w));
if(wq_.size() == 1)
on_write({});
}
@@ -199,6 +236,16 @@ BaseWSPeer<Impl>::close()
beast::asio::placeholders::error)));
}
template<class Impl>
void
BaseWSPeer<Impl>::complete()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::complete, impl().shared_from_this()));
do_read();
}
template<class Impl>
void
BaseWSPeer<Impl>::on_write_sb(error_code const& ec)
@@ -222,6 +269,7 @@ template<class Impl>
void
BaseWSPeer<Impl>::on_write(error_code const& ec)
{
cancel_timer();
if(ec)
return fail(ec, "write");
auto& w = *wq_.front();
@@ -231,6 +279,7 @@ BaseWSPeer<Impl>::on_write(error_code const& ec)
impl().shared_from_this()));
if(boost::indeterminate(result.first))
return;
start_timer();
if(! result.first)
impl().ws_.async_write_frame(
result.first, result.second, strand_.wrap(std::bind(
@@ -251,7 +300,7 @@ BaseWSPeer<Impl>::on_write_fin(error_code const& ec)
return fail(ec, "write_fin");
wq_.pop_front();
if(do_close_)
impl().ws_.async_close({}, strand_.wrap(std::bind(
impl().ws_.async_close(cr_, strand_.wrap(std::bind(
&BaseWSPeer::on_close, impl().shared_from_this(),
beast::asio::placeholders::error)));
else if(! wq_.empty())
@@ -269,6 +318,7 @@ BaseWSPeer<Impl>::do_read()
impl().ws_.async_read(op_, rb_, strand_.wrap(
std::bind(&BaseWSPeer::on_read,
impl().shared_from_this(), placeholders::error)));
cancel_timer();
}
template<class Impl>
@@ -286,7 +336,6 @@ BaseWSPeer<Impl>::on_read(error_code const& ec)
std::back_inserter(b));
this->handler_.onWSMessage(impl().shared_from_this(), b);
rb_.consume(rb_.size());
do_read();
}
template<class Impl>
@@ -296,6 +345,42 @@ BaseWSPeer<Impl>::on_close(error_code const& ec)
// great
}
template <class Impl>
void
BaseWSPeer<Impl>::start_timer()
{
// Max seconds without completing a message
static constexpr std::chrono::seconds timeout{30};
error_code ec;
timer_.expires_from_now (timeout, ec);
if (ec)
return fail (ec, "start_timer");
timer_.async_wait (strand_.wrap (std::bind (
&BaseWSPeer<Impl>::on_timer, impl().shared_from_this(),
beast::asio::placeholders::error)));
}
// Convenience for discarding the error code
template <class Impl>
void
BaseWSPeer<Impl>::cancel_timer()
{
error_code ec;
timer_.cancel(ec);
}
// Called when session times out
template <class Impl>
void
BaseWSPeer<Impl>::on_timer (error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (! ec)
ec = boost::system::errc::make_error_code (
boost::system::errc::timed_out);
fail (ec, "timer");
}
} // ripple
#endif

View File

@@ -99,7 +99,6 @@ PlainHTTPPeer::websocketUpgrade()
port_, handler_, remote_address_,
std::move(message_), std::move(stream_),
journal_);
ws->run();
return ws;
}

View File

@@ -23,6 +23,7 @@
namespace ripple {
// Detects legacy websockets only.
bool
Port::websockets() const
{
@@ -33,7 +34,9 @@ bool
Port::secure() const
{
return protocol.count("peer") > 0 ||
protocol.count("https") > 0 || protocol.count("wss") > 0;
protocol.count("https") > 0 ||
protocol.count("wss") > 0 ||
protocol.count("wss2") > 0;
}
std::string

View File

@@ -104,7 +104,6 @@ SSLHTTPPeer::websocketUpgrade()
port_, handler_, remote_address_,
std::move(message_), std::move(ssl_bundle_),
journal_);
ws->run();
return ws;
}
@@ -118,10 +117,11 @@ SSLHTTPPeer::do_handshake (yield_context yield)
stream_type::server, read_buf_.data(), yield[ec]));
cancel_timer();
if (ec)
return fail (ec, "handshake");
return fail(ec, "handshake");
bool const http =
port().protocol.count("peer") > 0 ||
//|| port().protocol.count("wss") > 0
//port().protocol.count("wss") > 0 ||
port().protocol.count("wss2") > 0 ||
port().protocol.count("https") > 0;
if (http)
{

View File

@@ -124,37 +124,35 @@ ServerHandlerImp::onHandoff (Session& session,
boost::asio::ip::tcp::endpoint remote_address) ->
Handoff
{
if (session.port().protocol.count("wss2") > 0 &&
isWebsocketUpgrade (request))
if(isWebsocketUpgrade(request))
{
// VFALCO TODO
Resource::Consumer usage;
//if (isUnlimited (role))
// usage = m_resourceManager.newUnlimitedEndpoint (
// remoteIPAddress.to_string());
//else
usage = m_resourceManager.newInboundEndpoint(
beast::IP::from_asio(remote_address));
auto const ws = session.websocketUpgrade();
ws->appDefined = std::make_shared<WSInfoSub>(
m_networkOPs, usage, ws);
Handoff handoff;
handoff.moved = true;
return handoff;
if(session.port().protocol.count("wss2") > 0)
{
auto const ws = session.websocketUpgrade();
auto is = std::make_shared<WSInfoSub>(m_networkOPs, ws);
is->getConsumer() = requestInboundEndpoint(
m_resourceManager,
beast::IPAddressConversion::from_asio(remote_address),
session.port(), is->user());
ws->appDefined = std::move(is);
ws->run();
handoff.moved = true;
return handoff;
}
if(session.port().protocol.count("wss") > 0)
return handoff; // Pass to websocket
}
if (session.port().protocol.count("wss") > 0 &&
isWebsocketUpgrade (request))
if(session.port().protocol.count("peer") > 0)
{
// Pass to websockets
Handoff handoff;
// handoff.moved = true;
return handoff;
}
if (session.port().protocol.count("peer") > 0)
return app_.overlay().onHandoff (std::move(bundle),
return app_.overlay().onHandoff(std::move(bundle),
std::move(request), remote_address);
// Pass through to legacy onRequest
return Handoff{};
}
// Pass to legacy onRequest
return {};
}
auto
@@ -164,34 +162,22 @@ ServerHandlerImp::onHandoff (Session& session,
boost::asio::ip::tcp::endpoint remote_address) ->
Handoff
{
if (session.port().protocol.count("ws2") > 0 &&
Handoff handoff;
if(session.port().protocol.count("ws2") > 0 &&
isWebsocketUpgrade (request))
{
// VFALCO TODO
Resource::Consumer usage;
//if (isUnlimited (role))
// usage = m_resourceManager.newUnlimitedEndpoint (
// remoteIPAddress.to_string());
//else
usage = m_resourceManager.newInboundEndpoint(
beast::IP::from_asio(remote_address));
auto const ws = session.websocketUpgrade();
ws->appDefined = std::make_shared<WSInfoSub>(
m_networkOPs, usage, ws);
Handoff handoff;
auto is = std::make_shared<WSInfoSub>(m_networkOPs, ws);
is->getConsumer() = requestInboundEndpoint(
m_resourceManager,
beast::IPAddressConversion::from_asio(remote_address),
session.port(), is->user());
ws->appDefined = std::move(is);
ws->run();
handoff.moved = true;
return handoff;
}
if (session.port().protocol.count("ws") > 0 &&
isWebsocketUpgrade (request))
{
// Pass to websockets
Handoff handoff;
// handoff.moved = true;
return handoff;
}
// Pass through to legacy onRequest
return Handoff{};
// Otherwise pass to legacy onRequest or websocket
return handoff;
}
static inline
@@ -219,6 +205,21 @@ build_map(beast::http::headers const& h)
return c;
}
template<class ConstBufferSequence>
static
std::string
buffers_to_string(ConstBufferSequence const& bs)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
std::string s;
s.reserve(buffer_size(bs));
for(auto const& b : bs)
s.append(buffer_cast<char const*>(b),
buffer_size(b));
return s;
}
void
ServerHandlerImp::onRequest (Session& session)
{
@@ -252,29 +253,37 @@ ServerHandlerImp::onWSMessage(
std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers)
{
// VFALCO This is inefficient, the JSON
// should be parsed from the buffer sequence.
std::string s;
s.reserve(boost::asio::buffer_size(buffers));
std::copy(boost::asio::buffers_begin(buffers),
boost::asio::buffers_end(buffers),
std::back_inserter(s));
//m_journal.error << "Recv: " << s;
Json::Value jv;
// VFALCO should we parse a coroutine instead?
if(! Json::Reader{}.parse(s, jv))
auto const size = boost::asio::buffer_size(buffers);
if (size > RPC::Tuning::maxRequestSize ||
! Json::Reader{}.parse(jv, buffers) ||
! jv ||
! jv.isObject())
{
// TODO Send error
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid";
jvResult[jss::value] = buffers_to_string(buffers);
beast::streambuf sb;
Json::stream(jvResult,
[&sb](auto const p, auto const n)
{
sb.commit(boost::asio::buffer_copy(
sb.prepare(n), boost::asio::buffer(p, n)));
});
session->send(std::make_shared<
StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
return;
}
m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session = std::move(session),
jv = std::move(jv)](auto const& coro)
jv = std::move(jv)](auto const& jc)
{
auto const jr =
this->processSession(session, coro, jv);
this->processSession(session, jc, jv);
beast::streambuf sb;
//m_journal.error << "Send: " << to_string(jr);
Json::stream(jr,
[&sb](auto const p, auto const n)
{
@@ -283,6 +292,7 @@ ServerHandlerImp::onWSMessage(
});
session->send(std::make_shared<
StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
});
}
@@ -308,58 +318,44 @@ ServerHandlerImp::processSession(
std::shared_ptr<JobCoro> const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast<InfoSub> (session->appDefined);
/*
if (getConsumer().disconnect ())
auto is = std::static_pointer_cast<WSInfoSub> (session->appDefined);
if (is->getConsumer().disconnect())
{
disconnect ();
return rpcError (rpcSLOW_DOWN);
session->close();
return rpcError(rpcSLOW_DOWN);
}
*/
// Requests without "command" are invalid.
//
if (!jv.isMember (jss::command))
Json::Value jr(Json::objectValue);
if (! jv.isMember (jss::command))
{
Json::Value jr (Json::objectValue);
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = jss::missingCommand;
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = jss::missingCommand;
jr[jss::request] = jv;
if (jv.isMember (jss::id))
{
jr[jss::id] = jv[jss::id];
}
/*
getConsumer().charge (Resource::feeInvalidRPC);
*/
is->getConsumer().charge(Resource::feeInvalidRPC);
return jr;
}
Resource::Charge loadType = Resource::feeReferenceRPC;
Json::Value jr (Json::objectValue);
auto required = RPC::roleRequired (jv[jss::command].asString());
// VFALCO TODO Get identity/credentials from HTTP headers
std::string const user = "";
std::string const fwdfor = "";
auto role = requestRole (required, session->port(), jv,
auto required = RPC::roleRequired(jv[jss::command].asString());
auto role = requestRole(
required,
session->port(),
jv,
beast::IP::from_asio(session->remote_endpoint().address()),
user);
is->user());
if (Role::FORBID == role)
{
jr[jss::result] = rpcError (rpcFORBIDDEN);
jr[jss::result] = rpcError (rpcFORBIDDEN);
}
else
{
// VFALCO TODO InfoSub parameter in context
RPC::Context context{
app_.journal ("RPCHandler"),
app_.journal("RPCHandler"),
jv,
app_,
loadType,
@@ -369,28 +365,24 @@ ServerHandlerImp::processSession(
role,
coro,
is,
{ user, fwdfor }
{is->user(), is->forwarded_for()}
};
RPC::doCommand (context, jr[jss::result]);
RPC::doCommand(context, jr[jss::result]);
}
/*
getConsumer().charge (loadType);
if (getConsumer().warn ())
{
is->getConsumer().charge(loadType);
if (is->getConsumer().warn())
jr[jss::warning] = jss::load;
}
*/
// Currently we will simply unwrap errors returned by the RPC
// API, in the future maybe we can make the responses
// consistent.
//
// Regularize result. This is duplicate code.
if (jr[jss::result].isMember (jss::error))
if (jr[jss::result].isMember(jss::error))
{
jr = jr[jss::result];
jr[jss::status] = jss::error;
jr = jr[jss::result];
jr[jss::status] = jss::error;
jr[jss::request] = jv;
}
@@ -399,47 +391,27 @@ ServerHandlerImp::processSession(
jr[jss::status] = jss::success;
// For testing resource limits on this connection.
if (jv[jss::command].asString() == "ping")
{
/*
if (getConsumer().isUnlimited())
if (is->getConsumer().isUnlimited() &&
jv[jss::command].asString() == "ping")
jr[jss::unlimited] = true;
*/
}
}
if (jv.isMember (jss::id))
{
if (jv.isMember(jss::id))
jr[jss::id] = jv[jss::id];
}
jr[jss::type] = jss::response;
return jr;
}
template<class ConstBufferSequence>
static
std::string
buffers_to_string(ConstBufferSequence const& bs)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
std::string s;
s.reserve(buffer_size(bs));
for(auto const& b : bs)
s.append(buffer_cast<char const*>(b),
buffer_size(b));
return s;
}
// Run as a coroutine.
void
ServerHandlerImp::processSession (std::shared_ptr<Session> const& session,
std::shared_ptr<JobCoro> jobCoro)
{
processRequest (session->port(), buffers_to_string(session->request().body.data()),
session->remoteAddress().at_port (0), makeOutput (*session), jobCoro,
processRequest (
session->port(), buffers_to_string(
session->request().body.data()),
session->remoteAddress().at_port (0),
makeOutput (*session), jobCoro,
[&]
{
auto const iter =
@@ -506,10 +478,10 @@ ServerHandlerImp::processRequest (Port const& port,
/* ---------------------------------------------------------------------- */
auto role = Role::FORBID;
auto required = RPC::roleRequired(id.asString());
if (jsonRPC.isObject() && jsonRPC.isMember("params") &&
jsonRPC["params"].isArray() && jsonRPC["params"].size() > 0 &&
jsonRPC["params"][Json::UInt(0)].isObject())
if (jsonRPC.isMember("params") &&
jsonRPC["params"].isArray() &&
jsonRPC["params"].size() > 0 &&
jsonRPC["params"][Json::UInt(0)].isObject())
{
role = requestRole(required, port, jsonRPC["params"][Json::UInt(0)],
remoteIPAddress, user);

View File

@@ -45,13 +45,39 @@ bool operator< (Port const& lhs, Port const& rhs)
class WSInfoSub : public InfoSub
{
std::weak_ptr<WSSession> ws_;
std::string user_;
std::string fwdfor_;
public:
WSInfoSub(Source& source, Consumer consumer,
WSInfoSub(Source& source,
std::shared_ptr<WSSession> const& ws)
: InfoSub(source, consumer)
: InfoSub(source)
, ws_(ws)
{
auto const& h = ws->request().headers;
auto it = h.find("X-User");
if (it != h.end() &&
isIdentified(
ws->port(), beast::IPAddressConversion::from_asio(
ws->remote_endpoint()).address(), it->second))
{
user_ = it->second;
it = h.find("X-Forwarded-For");
if (it != h.end())
fwdfor_ = it->second;
}
}
std::string
user() const
{
return user_;
}
std::string
forwarded_for() const
{
return fwdfor_;
}
void