Beast.WebSocket:

Beast.WebSocket provides developers with a robust WebSocket
implementation built on Boost.Asio with a consistent asynchronous
model using a modern C++ approach.
This commit is contained in:
Vinnie Falco
2016-02-25 16:17:19 -05:00
parent 5e8d028da2
commit 2cb3834bbb
106 changed files with 14671 additions and 772 deletions

View File

@@ -21,6 +21,7 @@
#define RIPPLE_SERVER_HANDLER_H_INCLUDED
#include <ripple/server/Handoff.h>
#include <ripple/server/WSSession.h>
#include <beast/asio/ssl_bundle.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -83,6 +84,20 @@ struct Handler
/** Called when the server has finished its stop. */
virtual void onStopped (Server& server) = 0;
//
// WebSockets
//
/** Called on a WebSocket Upgrade request. */
/** Called for each complete WebSocket message. */
virtual
void
onWSMessage(std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers) = 0;
};
} // ripple

View File

@@ -26,7 +26,12 @@
namespace ripple {
/** Multi-threaded, asynchronous HTTP server. */
/** A multi-protocol server.
This server maintains multiple configured listening ports,
with each listening port allows for multiple protocols including
HTTP, HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol.
*/
class Server
{
public:

View File

@@ -21,6 +21,7 @@
#define RIPPLE_SERVER_SESSION_H_INCLUDED
#include <ripple/server/Writer.h>
#include <ripple/server/WSSession.h>
#include <beast/http/body.h>
#include <beast/http/message.h>
#include <ripple/beast/net/IPEndpoint.h>
@@ -130,6 +131,11 @@ public:
virtual
void
close (bool graceful) = 0;
/** Convert the connection to WebSocket. */
virtual
std::shared_ptr<WSSession>
websocketUpgrade() = 0;
};
} // ripple

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_WSSESSION_H_INCLUDED
#define RIPPLE_SERVER_WSSESSION_H_INCLUDED
#include <ripple/server/Port.h>
#include <ripple/server/Writer.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/logic/tribool.hpp>
#include <algorithm>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
namespace ripple {
class WSMsg
{
public:
WSMsg() = default;
WSMsg(WSMsg const&) = delete;
WSMsg& operator=(WSMsg const&) = delete;
virtual ~WSMsg() = default;
/** Retrieve message data.
Returns a tribool indicating whether or not
data is available, and a ConstBufferSequence
representing the data.
tribool values:
maybe: Data is not ready yet
false: Data is available
true: Data is available, and
it is the last chunk of bytes.
Derived classes that do not know when the data
ends (for example, when returning the output of a
paged database query) may return `true` and an
empty vector.
*/
virtual
std::pair<boost::tribool,
std::vector<boost::asio::const_buffer>>
prepare(std::size_t bytes,
std::function<void(void)> resume) = 0;
};
template<class Streambuf>
class StreambufWSMsg : public WSMsg
{
Streambuf sb_;
std::size_t n_ = 0;
public:
StreambufWSMsg(Streambuf&& sb)
: sb_(std::move(sb))
{
}
std::pair<boost::tribool,
std::vector<boost::asio::const_buffer>>
prepare(std::size_t bytes,
std::function<void(void)>) override
{
if(sb_.size() == 0)
return { true, {} };
sb_.consume(n_);
boost::tribool done;
// VFALCO TODO respect `bytes` fully
if(bytes < sb_.size())
{
n_ = bytes;
done = boost::indeterminate;
}
else
{
n_ = sb_.size();
done = true;
}
std::vector<boost::asio::const_buffer> vb;
auto const& data = sb_.data();
vb.reserve(std::distance(
data.begin(), data.end()));
std::copy(data.begin(), data.end(),
std::back_inserter(vb));
return { done, vb };
}
};
struct WSSession
{
std::shared_ptr<void> appDefined;
virtual
Port const&
port() const = 0;
virtual
boost::asio::ip::tcp::endpoint const&
remote_endpoint() const = 0;
/** Send a WebSockets message. */
virtual
void
send(std::shared_ptr<WSMsg> w) = 0;
virtual
void
close() = 0;
};
} // ripple
#endif

View File

@@ -36,7 +36,10 @@ public:
bool
complete() = 0;
/** Removes bytes from the input sequence. */
/** Removes bytes from the input sequence.
Can be called with 0.
*/
virtual
void
consume (std::size_t bytes) = 0;

View File

@@ -0,0 +1,128 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_BASEPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEPEER_H_INCLUDED
#include <ripple/server/Handler.h>
#include <ripple/server/Port.h>
#include <ripple/server/impl/io_list.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <boost/asio.hpp>
#include <cassert>
#include <functional>
#include <string>
namespace ripple {
// Common part of all peers
template<class Impl>
class BasePeer
: public io_list::work
{
protected:
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer <clock_type>;
Port const& port_;
Handler& handler_;
endpoint_type remote_address_;
beast::WrappedSink sink_;
beast::Journal j_;
boost::asio::io_service::work work_;
boost::asio::io_service::strand strand_;
error_code ec_;
public:
BasePeer(Port const& port, Handler& handler,
endpoint_type remote_address,
boost::asio::io_service& io_service,
beast::Journal journal);
void
close() override;
protected:
template<class String>
void
fail(error_code ec, String const& what);
private:
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
};
//------------------------------------------------------------------------------
template<class Impl>
BasePeer<Impl>::BasePeer(Port const& port, Handler& handler,
endpoint_type remote_address,
boost::asio::io_service& io_service,
beast::Journal journal)
: port_(port)
, handler_(handler)
, remote_address_(remote_address)
, sink_(journal.sink(),
[]
{
static int id = 0;
return "##" + std::to_string(++id) + " ";
}())
, j_(sink_)
, work_(io_service)
, strand_(io_service)
{
}
template<class Impl>
void
BasePeer<Impl>::close()
{
if (! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BasePeer::close, impl().shared_from_this()));
error_code ec;
impl().ws_.lowest_layer().close(ec);
}
template<class Impl>
template<class String>
void
BasePeer<Impl>::fail(error_code ec, String const& what)
{
assert(strand_.running_in_this_thread());
if(! ec_ &&
ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(j_.trace()) <<
what << ": " << ec.message();
impl().ws_.lowest_layer().close(ec);
}
}
} // ripple
#endif

View File

@@ -0,0 +1,301 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
#define RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
#include <ripple/server/impl/BasePeer.h>
#include <ripple/protocol/BuildInfo.h>
#include <beast/wsproto.h>
#include <beast/asio/streambuf.h>
#include <beast/http/message.h>
#include <cassert>
namespace ripple {
/** Represents an active WebSocket connection. */
template <class Impl>
class BaseWSPeer
: public BasePeer<Impl>
, public WSSession
{
protected:
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer <clock_type>;
using BasePeer<Impl>::fail;
using BasePeer<Impl>::strand_;
private:
friend class BasePeer<Impl>;
http_request_type request_;
beast::wsproto::opcode op_;
beast::streambuf rb_;
beast::streambuf wb_;
std::list<std::shared_ptr<WSMsg>> wq_;
bool do_close_ = false;
public:
template<class Body, class Headers>
BaseWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
beast::http::message<true, Body, Headers>&& request,
boost::asio::io_service& io_service,
beast::Journal journal);
void
run();
//
// WSSession
//
Port const&
port() const override
{
return this->port_;
}
boost::asio::ip::tcp::endpoint const&
remote_endpoint() const override
{
return this->remote_address_;
}
void
send(std::shared_ptr<WSMsg> w) override;
void
close() override;
protected:
struct identity
{
template<class Body, class Headers>
void
operator()(beast::http::message<true, Body, Headers>& req)
{
req.headers.replace("User-Agent",
BuildInfo::getFullVersionString());
}
template<class Body, class Headers>
void
operator()(beast::http::message<false, Body, Headers>& resp)
{
resp.headers.replace("Server",
BuildInfo::getFullVersionString());
}
};
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
void
on_write_sb(error_code const& ec);
void
do_write();
void
on_write(error_code const& ec);
void
on_write_fin(error_code const& ec);
void
do_read();
void
on_read(error_code const& ec);
void
on_close(error_code const& ec);
virtual
void
do_close() = 0;
};
//------------------------------------------------------------------------------
template<class Impl>
template<class Body, class Headers>
BaseWSPeer<Impl>::BaseWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
beast::http::message<true, Body, Headers>&& request,
boost::asio::io_service& io_service,
beast::Journal journal)
: BasePeer<Impl>(port, handler, remote_address,
io_service, journal)
, request_(std::move(request))
{
}
template<class Impl>
void
BaseWSPeer<Impl>::run()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::run, impl().shared_from_this()));
impl().ws_.set_option(beast::wsproto::decorate(identity{}));
using namespace beast::asio;
impl().ws_.async_accept(request_, strand_.wrap(std::bind(
&BaseWSPeer::on_write_sb, impl().shared_from_this(),
placeholders::error)));
}
template<class Impl>
void
BaseWSPeer<Impl>::send(std::shared_ptr<WSMsg> w)
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::send, impl().shared_from_this(),
std::move(w)));
wq_.emplace_back(std::move(w));
if(wq_.size() == 1)
on_write({});
}
template<class Impl>
void
BaseWSPeer<Impl>::close()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::close, impl().shared_from_this()));
if(wq_.size() > 0)
do_close_ = true;
else
impl().ws_.async_close({}, strand_.wrap(std::bind(
&BaseWSPeer::on_close, impl().shared_from_this(),
beast::asio::placeholders::error)));
}
template<class Impl>
void
BaseWSPeer<Impl>::on_write_sb(error_code const& ec)
{
if(ec)
return fail(ec, "write_resp");
do_read();
}
template<class Impl>
void
BaseWSPeer<Impl>::do_write()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::do_write, impl().shared_from_this()));
on_write({});
}
template<class Impl>
void
BaseWSPeer<Impl>::on_write(error_code const& ec)
{
if(ec)
return fail(ec, "write");
auto& w = *wq_.front();
using namespace beast::asio;
auto const result = w.prepare(65536,
std::bind(&BaseWSPeer::do_write,
impl().shared_from_this()));
if(boost::indeterminate(result.first))
return;
if(! result.first)
impl().ws_.async_write_frame(
result.first, result.second, strand_.wrap(std::bind(
&BaseWSPeer::on_write, impl().shared_from_this(),
placeholders::error)));
else
impl().ws_.async_write_frame(
result.first, result.second, strand_.wrap(std::bind(
&BaseWSPeer::on_write_fin, impl().shared_from_this(),
placeholders::error)));
}
template<class Impl>
void
BaseWSPeer<Impl>::on_write_fin(error_code const& ec)
{
if(ec)
return fail(ec, "write_fin");
wq_.pop_front();
if(do_close_)
impl().ws_.async_close({}, strand_.wrap(std::bind(
&BaseWSPeer::on_close, impl().shared_from_this(),
beast::asio::placeholders::error)));
else if(! wq_.empty())
on_write({});
}
template<class Impl>
void
BaseWSPeer<Impl>::do_read()
{
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::do_read, impl().shared_from_this()));
using namespace beast::asio;
impl().ws_.async_read(op_, rb_, strand_.wrap(
std::bind(&BaseWSPeer::on_read,
impl().shared_from_this(), placeholders::error)));
}
template<class Impl>
void
BaseWSPeer<Impl>::on_read(error_code const& ec)
{
if(ec == beast::wsproto::error::closed)
return do_close();
if(ec)
return fail(ec, "read");
auto const& data = rb_.data();
std::vector<boost::asio::const_buffer> b;
b.reserve(std::distance(data.begin(), data.end()));
std::copy(data.begin(), data.end(),
std::back_inserter(b));
this->handler_.onWSMessage(impl().shared_from_this(), b);
rb_.consume(rb_.size());
do_read();
}
template<class Impl>
void
BaseWSPeer<Impl>::on_close(error_code const& ec)
{
// great
}
} // ripple
#endif

View File

@@ -170,13 +170,15 @@ Door::Door (Handler& handler, boost::asio::io_service& io_service,
, handler_(handler)
, acceptor_(io_service)
, strand_(io_service)
, ssl_ (
, ssl_(
port_.protocol.count("https") > 0 ||
//port_.protocol.count("wss") > 0 ||
port_.protocol.count("peer") > 0)
, plain_ (
port_.protocol.count("wss2") > 0 ||
port_.protocol.count("peer") > 0)
, plain_(
port_.protocol.count("http") > 0 ||
//port_.protocol.count("ws") > 0 ||
port_.protocol.count("http") > 0)
port_.protocol.count("ws2"))
{
error_code ec;
endpoint_type const local_address =

View File

@@ -21,6 +21,7 @@
#define RIPPLE_SERVER_PLAINHTTPPEER_H_INCLUDED
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <ripple/server/impl/PlainWSPeer.h>
#include <memory>
namespace ripple {
@@ -44,12 +45,15 @@ public:
void
run();
std::shared_ptr<WSSession>
websocketUpgrade() override;
private:
void
do_request();
do_request() override;
void
do_close();
do_close() override;
};
//------------------------------------------------------------------------------
@@ -71,7 +75,7 @@ PlainHTTPPeer::PlainHTTPPeer (Port const& port, Handler& handler,
}
void
PlainHTTPPeer::run ()
PlainHTTPPeer::run()
{
if (!handler_.onAccept (session(), remote_address_))
{
@@ -88,6 +92,17 @@ PlainHTTPPeer::run ()
shared_from_this(), std::placeholders::_1));
}
std::shared_ptr<WSSession>
PlainHTTPPeer::websocketUpgrade()
{
auto ws = ios().emplace<PlainWSPeer>(
port_, handler_, remote_address_,
std::move(message_), std::move(stream_),
journal_);
ws->run();
return ws;
}
void
PlainHTTPPeer::do_request()
{

View File

@@ -0,0 +1,87 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_PLAINWSPEER_H_INCLUDED
#define RIPPLE_SERVER_PLAINWSPEER_H_INCLUDED
#include <ripple/server/impl/BaseWSPeer.h>
#include <memory>
namespace ripple {
class PlainWSPeer
: public BaseWSPeer<PlainWSPeer>
, public std::enable_shared_from_this<PlainWSPeer>
{
private:
friend class BasePeer<PlainWSPeer>;
friend class BaseWSPeer<PlainWSPeer>;
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer <clock_type>;
using socket_type = boost::asio::ip::tcp::socket;
beast::wsproto::socket<socket_type> ws_;
public:
template<class Body, class Headers>
PlainWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
beast::http::message<true, Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal);
private:
void
do_close() override;
};
//------------------------------------------------------------------------------
template<class Body, class Headers>
PlainWSPeer::PlainWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_address,
beast::http::message<true, Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal)
: BaseWSPeer(port, handler, remote_address, std::move(request),
socket.get_io_service(), journal)
, ws_(std::move(socket))
{
}
void
PlainWSPeer::do_close()
{
error_code ec;
auto& sock = ws_.next_layer();
sock.shutdown(socket_type::shutdown_both, ec);
if(ec)
return fail(ec, "do_close");
}
} // ripple
#endif

View File

@@ -21,6 +21,7 @@
#define RIPPLE_SERVER_SSLHTTPPEER_H_INCLUDED
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <ripple/server/impl/SSLWSPeer.h>
#include <beast/asio/ssl_bundle.h>
#include <memory>
@@ -47,15 +48,18 @@ public:
void
run();
std::shared_ptr<WSSession>
websocketUpgrade() override;
private:
void
do_handshake (yield_context yield);
void
do_request();
do_request() override;
void
do_close();
do_close() override;
void
on_shutdown (error_code ec);
@@ -93,6 +97,17 @@ SSLHTTPPeer::run()
shared_from_this(), std::placeholders::_1));
}
std::shared_ptr<WSSession>
SSLHTTPPeer::websocketUpgrade()
{
auto ws = ios().emplace<SSLWSPeer>(
port_, handler_, remote_address_,
std::move(message_), std::move(ssl_bundle_),
journal_);
ws->run();
return ws;
}
void
SSLHTTPPeer::do_handshake (yield_context yield)
{

View File

@@ -0,0 +1,106 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_SERVER_SSLWSPEER_H_INCLUDED
#define RIPPLE_SERVER_SSLWSPEER_H_INCLUDED
#include <ripple/server/impl/BaseHTTPPeer.h>
#include <ripple/server/WSSession.h>
#include <beast/asio/ssl_bundle.h>
#include <beast/asio/placeholders.h>
#include <beast/wsproto/ssl.h>
#include <memory>
namespace ripple {
class SSLWSPeer
: public BaseWSPeer<SSLWSPeer>
, public std::enable_shared_from_this<SSLWSPeer>
{
private:
friend class BasePeer<SSLWSPeer>;
friend class BaseWSPeer<SSLWSPeer>;
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer =
boost::asio::basic_waitable_timer <clock_type>;
std::unique_ptr<beast::asio::ssl_bundle> ssl_bundle_;
beast::wsproto::socket<
beast::asio::ssl_bundle::stream_type&> ws_;
public:
template<class Body, class Headers>
SSLWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_endpoint,
beast::http::message<true, Body, Headers>&& request,
std::unique_ptr<
beast::asio::ssl_bundle>&& ssl_bundle,
beast::Journal journal);
private:
void
do_close() override;
void
on_shutdown(error_code ec);
};
//------------------------------------------------------------------------------
template<class Body, class Headers>
SSLWSPeer::SSLWSPeer(
Port const& port,
Handler& handler,
endpoint_type remote_endpoint,
beast::http::message<true, Body, Headers>&& request,
std::unique_ptr<
beast::asio::ssl_bundle>&& ssl_bundle,
beast::Journal journal)
: BaseWSPeer(port, handler, remote_endpoint, std::move(request),
ssl_bundle->socket.get_io_service(), journal)
, ssl_bundle_(std::move(ssl_bundle))
, ws_(ssl_bundle_->stream)
{
}
void
SSLWSPeer::do_close()
{
//start_timer();
using namespace beast::asio;
ws_.next_layer().async_shutdown(
strand_.wrap(std::bind(&SSLWSPeer::on_shutdown,
shared_from_this(), placeholders::error)));
}
void
SSLWSPeer::on_shutdown(error_code ec)
{
//cancel_timer();
ws_.lowest_layer().close(ec);
}
} // ripple
#endif

View File

@@ -19,6 +19,8 @@
#include <BeastConfig.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/beast/net/IPAddressConversion.h>
#include <ripple/json/json_reader.h>
#include <ripple/server/JsonWriter.h>
#include <ripple/server/make_ServerHandler.h>
@@ -29,13 +31,14 @@
#include <ripple/basics/make_SSLContext.h>
#include <ripple/core/JobQueue.h>
#include <ripple/json/to_string.h>
#include <ripple/net/RPCErr.h>
#include <ripple/server/make_Server.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/resource/ResourceManager.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/impl/Tuning.h>
#include <beast/crypto/base64.h>
#include <ripple/rpc/RPCHandler.h>
#include <beast/crypto/base64.h>
#include <beast/http/rfc2616.h>
#include <boost/algorithm/string.hpp>
#include <boost/type_traits.hpp>
@@ -120,6 +123,24 @@ ServerHandlerImp::onHandoff (Session& session,
boost::asio::ip::tcp::endpoint remote_address) ->
Handoff
{
if (session.port().protocol.count("wss2") > 0 &&
isWebsocketUpgrade (request))
{
// VFALCO TODO
Resource::Consumer usage;
//if (isUnlimited (role))
// usage = m_resourceManager.newUnlimitedEndpoint (
// remoteIPAddress.to_string());
//else
usage = m_resourceManager.newInboundEndpoint(
beast::IP::from_asio(remote_address));
auto const ws = session.websocketUpgrade();
ws->appDefined = std::make_shared<WSInfoSub>(
m_networkOPs, usage, ws);
Handoff handoff;
handoff.moved = true;
return handoff;
}
if (session.port().protocol.count("wss") > 0 &&
isWebsocketUpgrade (request))
{
@@ -142,6 +163,24 @@ ServerHandlerImp::onHandoff (Session& session,
boost::asio::ip::tcp::endpoint remote_address) ->
Handoff
{
if (session.port().protocol.count("ws2") > 0 &&
isWebsocketUpgrade (request))
{
// VFALCO TODO
Resource::Consumer usage;
//if (isUnlimited (role))
// usage = m_resourceManager.newUnlimitedEndpoint (
// remoteIPAddress.to_string());
//else
usage = m_resourceManager.newInboundEndpoint(
beast::IP::from_asio(remote_address));
auto const ws = session.websocketUpgrade();
ws->appDefined = std::make_shared<WSInfoSub>(
m_networkOPs, usage, ws);
Handoff handoff;
handoff.moved = true;
return handoff;
}
if (session.port().protocol.count("ws") > 0 &&
isWebsocketUpgrade (request))
{
@@ -208,6 +247,45 @@ ServerHandlerImp::onRequest (Session& session)
});
}
void
ServerHandlerImp::onWSMessage(
std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers)
{
// VFALCO This is inefficient, the JSON
// should be parsed from the buffer sequence.
std::string s;
s.reserve(boost::asio::buffer_size(buffers));
std::copy(boost::asio::buffers_begin(buffers),
boost::asio::buffers_end(buffers),
std::back_inserter(s));
//m_journal.error << "Recv: " << s;
Json::Value jv;
// VFALCO should we parse a coroutine instead?
if(! Json::Reader{}.parse(s, jv))
{
// TODO Send error
return;
}
m_jobQueue.postCoro(jtCLIENT, "WS-Client",
[this, session = std::move(session),
jv = std::move(jv)](auto const& coro)
{
auto const jr =
this->processSession(session, coro, jv);
beast::streambuf sb;
//m_journal.error << "Send: " << to_string(jr);
Json::stream(jr,
[&sb](auto const p, auto const n)
{
sb.commit(boost::asio::buffer_copy(
sb.prepare(n), boost::asio::buffer(p, n)));
});
session->send(std::make_shared<
StreambufWSMsg<decltype(sb)>>(std::move(sb)));
});
}
void
ServerHandlerImp::onClose (Session& session,
boost::system::error_code const&)
@@ -224,6 +302,122 @@ ServerHandlerImp::onStopped (Server&)
//------------------------------------------------------------------------------
Json::Value
ServerHandlerImp::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobCoro> const& coro,
Json::Value const& jv)
{
auto is = std::static_pointer_cast<InfoSub> (session->appDefined);
/*
if (getConsumer().disconnect ())
{
disconnect ();
return rpcError (rpcSLOW_DOWN);
}
*/
// Requests without "command" are invalid.
//
if (!jv.isMember (jss::command))
{
Json::Value jr (Json::objectValue);
jr[jss::type] = jss::response;
jr[jss::status] = jss::error;
jr[jss::error] = jss::missingCommand;
jr[jss::request] = jv;
if (jv.isMember (jss::id))
{
jr[jss::id] = jv[jss::id];
}
/*
getConsumer().charge (Resource::feeInvalidRPC);
*/
return jr;
}
Resource::Charge loadType = Resource::feeReferenceRPC;
Json::Value jr (Json::objectValue);
auto required = RPC::roleRequired (jv[jss::command].asString());
// VFALCO TODO Get identity/credentials from HTTP headers
std::string const user = "";
std::string const fwdfor = "";
auto role = requestRole (required, session->port(), jv,
beast::IP::from_asio(session->remote_endpoint().address()),
user);
if (Role::FORBID == role)
{
jr[jss::result] = rpcError (rpcFORBIDDEN);
}
else
{
// VFALCO TODO InfoSub parameter in context
RPC::Context context{
app_.journal ("RPCHandler"),
jv,
app_,
loadType,
app_.getOPs(),
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
{ user, fwdfor }
};
RPC::doCommand (context, jr[jss::result]);
}
/*
getConsumer().charge (loadType);
if (getConsumer().warn ())
{
jr[jss::warning] = jss::load;
}
*/
// Currently we will simply unwrap errors returned by the RPC
// API, in the future maybe we can make the responses
// consistent.
//
// Regularize result. This is duplicate code.
if (jr[jss::result].isMember (jss::error))
{
jr = jr[jss::result];
jr[jss::status] = jss::error;
jr[jss::request] = jv;
}
else
{
jr[jss::status] = jss::success;
// For testing resource limits on this connection.
if (jv[jss::command].asString() == "ping")
{
/*
if (getConsumer().isUnlimited())
jr[jss::unlimited] = true;
*/
}
}
if (jv.isMember (jss::id))
{
jr[jss::id] = jv[jss::id];
}
jr[jss::type] = jss::response;
return jr;
}
template<class ConstBufferSequence>
static
std::string

View File

@@ -23,9 +23,13 @@
#include <ripple/core/Job.h>
#include <ripple/core/JobCoro.h>
#include <ripple/json/Output.h>
#include <ripple/json/to_string.h>
#include <ripple/net/InfoSub.h>
#include <ripple/server/Handler.h>
#include <ripple/server/JsonWriter.h>
#include <ripple/server/ServerHandler.h>
#include <ripple/server/Session.h>
#include <ripple/server/WSSession.h>
#include <ripple/rpc/RPCHandler.h>
#include <ripple/app/main/CollectorManager.h>
#include <map>
@@ -33,17 +37,46 @@
namespace ripple {
inline
bool operator< (Port const& lhs, Port const& rhs)
{
return lhs.name < rhs.name;
}
class WSInfoSub : public InfoSub
{
std::weak_ptr<WSSession> ws_;
public:
WSInfoSub(Source& source, Consumer consumer,
std::shared_ptr<WSSession> const& ws)
: InfoSub(source, consumer)
, ws_(ws)
{
}
void
send(Json::Value const& jv, bool)
{
auto sp = ws_.lock();
if(! sp)
return;
beast::streambuf sb;
write(sb, jv);
auto m = std::make_shared<
StreambufWSMsg<decltype(sb)>>(
std::move(sb));
sp->send(m);
}
};
// Private implementation
class ServerHandlerImp
: public ServerHandler
, public Handler
{
private:
Application& app_;
Resource::Manager& m_resourceManager;
beast::Journal m_journal;
@@ -85,7 +118,7 @@ private:
onStop() override;
//
// HTTP::Handler
// Handler
//
bool
@@ -105,6 +138,10 @@ private:
void
onRequest (Session& session) override;
void
onWSMessage(std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers) override;
void
onClose (Session& session,
boost::system::error_code const&) override;
@@ -114,6 +151,12 @@ private:
//--------------------------------------------------------------------------
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobCoro> const& coro,
Json::Value const& jv);
void
processSession (std::shared_ptr<Session> const&,
std::shared_ptr<JobCoro> jobCoro);

View File

@@ -131,6 +131,12 @@ public:
session.close (true);
}
void
onWSMessage(std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const&) override
{
}
void
onClose (Session& session,
boost::system::error_code const&) override
@@ -328,6 +334,12 @@ public:
{
}
void
onWSMessage(std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers) override
{
}
void
onClose (Session& session,
boost::system::error_code const&) override