mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Per XLS-0095, we are taking steps to rename ripple(d) to xrpl(d). This change specifically removes all copyright notices referencing Ripple, XRPLF, and certain affiliated contributors upon mutual agreement, so the notice in the LICENSE.md file applies throughout. Copyright notices referencing external contributions remain as-is. Duplicate verbiage is also removed.
525 lines
13 KiB
C++
525 lines
13 KiB
C++
#ifndef XRPL_SERVER_BASEWSPEER_H_INCLUDED
|
|
#define XRPL_SERVER_BASEWSPEER_H_INCLUDED
|
|
|
|
#include <xrpl/basics/safe_cast.h>
|
|
#include <xrpl/beast/utility/instrumentation.h>
|
|
#include <xrpl/beast/utility/rngfill.h>
|
|
#include <xrpl/crypto/csprng.h>
|
|
#include <xrpl/protocol/BuildInfo.h>
|
|
#include <xrpl/server/WSSession.h>
|
|
#include <xrpl/server/detail/BasePeer.h>
|
|
#include <xrpl/server/detail/LowestLayer.h>
|
|
|
|
#include <boost/asio/error.hpp>
|
|
#include <boost/beast/core/multi_buffer.hpp>
|
|
#include <boost/beast/http/message.hpp>
|
|
#include <boost/beast/websocket.hpp>
|
|
#include <boost/logic/tribool.hpp>
|
|
|
|
#include <functional>
|
|
#include <list>
|
|
|
|
namespace ripple {
|
|
|
|
/** Represents an active WebSocket connection. */
|
|
template <class Handler, class Impl>
|
|
class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
|
|
{
|
|
protected:
|
|
using clock_type = std::chrono::system_clock;
|
|
using error_code = boost::system::error_code;
|
|
using endpoint_type = boost::asio::ip::tcp::endpoint;
|
|
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
|
|
using BasePeer<Handler, Impl>::strand_;
|
|
|
|
private:
|
|
friend class BasePeer<Handler, Impl>;
|
|
|
|
http_request_type request_;
|
|
boost::beast::multi_buffer rb_;
|
|
boost::beast::multi_buffer wb_;
|
|
std::list<std::shared_ptr<WSMsg>> wq_;
|
|
/// The socket has been closed, or will close after the next write
|
|
/// finishes. Do not do any more writes, and don't try to close
|
|
/// again.
|
|
bool do_close_ = false;
|
|
boost::beast::websocket::close_reason cr_;
|
|
waitable_timer timer_;
|
|
bool close_on_timer_ = false;
|
|
bool ping_active_ = false;
|
|
boost::beast::websocket::ping_data payload_;
|
|
error_code ec_;
|
|
std::function<
|
|
void(boost::beast::websocket::frame_type, boost::beast::string_view)>
|
|
control_callback_;
|
|
|
|
public:
|
|
template <class Body, class Headers>
|
|
BaseWSPeer(
|
|
Port const& port,
|
|
Handler& handler,
|
|
boost::asio::executor const& executor,
|
|
waitable_timer timer,
|
|
endpoint_type remote_address,
|
|
boost::beast::http::request<Body, Headers>&& request,
|
|
beast::Journal journal);
|
|
|
|
void
|
|
run() override;
|
|
|
|
//
|
|
// WSSession
|
|
//
|
|
|
|
Port const&
|
|
port() const override
|
|
{
|
|
return this->port_;
|
|
}
|
|
|
|
http_request_type const&
|
|
request() const override
|
|
{
|
|
return this->request_;
|
|
}
|
|
|
|
boost::asio::ip::tcp::endpoint const&
|
|
remote_endpoint() const override
|
|
{
|
|
return this->remote_address_;
|
|
}
|
|
|
|
void
|
|
send(std::shared_ptr<WSMsg> w) override;
|
|
|
|
void
|
|
close() override;
|
|
|
|
void
|
|
close(boost::beast::websocket::close_reason const& reason) override;
|
|
|
|
void
|
|
complete() override;
|
|
|
|
protected:
|
|
Impl&
|
|
impl()
|
|
{
|
|
return *static_cast<Impl*>(this);
|
|
}
|
|
|
|
void
|
|
on_ws_handshake(error_code const& ec);
|
|
|
|
void
|
|
do_write();
|
|
|
|
void
|
|
on_write(error_code const& ec);
|
|
|
|
void
|
|
on_write_fin(error_code const& ec);
|
|
|
|
void
|
|
do_read();
|
|
|
|
void
|
|
on_read(error_code const& ec);
|
|
|
|
void
|
|
on_close(error_code const& ec);
|
|
|
|
void
|
|
start_timer();
|
|
|
|
void
|
|
cancel_timer();
|
|
|
|
void
|
|
on_ping(error_code const& ec);
|
|
|
|
void
|
|
on_ping_pong(
|
|
boost::beast::websocket::frame_type kind,
|
|
boost::beast::string_view payload);
|
|
|
|
void
|
|
on_timer(error_code ec);
|
|
|
|
template <class String>
|
|
void
|
|
fail(error_code ec, String const& what);
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
template <class Handler, class Impl>
|
|
template <class Body, class Headers>
|
|
BaseWSPeer<Handler, Impl>::BaseWSPeer(
|
|
Port const& port,
|
|
Handler& handler,
|
|
boost::asio::executor const& executor,
|
|
waitable_timer timer,
|
|
endpoint_type remote_address,
|
|
boost::beast::http::request<Body, Headers>&& request,
|
|
beast::Journal journal)
|
|
: BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
|
|
, request_(std::move(request))
|
|
, timer_(std::move(timer))
|
|
, payload_("12345678") // ensures size is 8 bytes
|
|
{
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::run()
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
|
|
impl().ws_.set_option(port().pmd_options);
|
|
// Must manage the control callback memory outside of the `control_callback`
|
|
// function
|
|
control_callback_ = std::bind(
|
|
&BaseWSPeer::on_ping_pong,
|
|
this,
|
|
std::placeholders::_1,
|
|
std::placeholders::_2);
|
|
impl().ws_.control_callback(control_callback_);
|
|
start_timer();
|
|
close_on_timer_ = true;
|
|
impl().ws_.set_option(
|
|
boost::beast::websocket::stream_base::decorator([](auto& res) {
|
|
res.set(
|
|
boost::beast::http::field::server,
|
|
BuildInfo::getFullVersionString());
|
|
}));
|
|
impl().ws_.async_accept(
|
|
request_,
|
|
bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::on_ws_handshake,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
|
|
if (do_close_)
|
|
return;
|
|
if (wq_.size() > port().ws_queue_limit)
|
|
{
|
|
cr_.code = safe_cast<decltype(cr_.code)>(
|
|
boost::beast::websocket::close_code::policy_error);
|
|
cr_.reason = "Policy error: client is too slow.";
|
|
JLOG(this->j_.info()) << cr_.reason;
|
|
wq_.erase(std::next(wq_.begin()), wq_.end());
|
|
close(cr_);
|
|
return;
|
|
}
|
|
wq_.emplace_back(std::move(w));
|
|
if (wq_.size() == 1)
|
|
on_write({});
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::close()
|
|
{
|
|
close(boost::beast::websocket::close_reason{});
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::close(
|
|
boost::beast::websocket::close_reason const& reason)
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(strand_, [self = impl().shared_from_this(), reason] {
|
|
self->close(reason);
|
|
});
|
|
if (do_close_)
|
|
return;
|
|
do_close_ = true;
|
|
if (wq_.empty())
|
|
{
|
|
impl().ws_.async_close(
|
|
reason,
|
|
bind_executor(
|
|
strand_,
|
|
[self = impl().shared_from_this()](
|
|
boost::beast::error_code const& ec) {
|
|
self->on_close(ec);
|
|
}));
|
|
}
|
|
else
|
|
{
|
|
cr_ = reason;
|
|
}
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::complete()
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_,
|
|
std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
|
|
do_read();
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_ws_handshake(error_code const& ec)
|
|
{
|
|
if (ec)
|
|
return fail(ec, "on_ws_handshake");
|
|
close_on_timer_ = false;
|
|
do_read();
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::do_write()
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_,
|
|
std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
|
|
on_write({});
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_write(error_code const& ec)
|
|
{
|
|
if (ec)
|
|
return fail(ec, "write");
|
|
auto& w = *wq_.front();
|
|
auto const result = w.prepare(
|
|
65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
|
|
if (boost::indeterminate(result.first))
|
|
return;
|
|
start_timer();
|
|
if (!result.first)
|
|
impl().ws_.async_write_some(
|
|
static_cast<bool>(result.first),
|
|
result.second,
|
|
bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::on_write,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
else
|
|
impl().ws_.async_write_some(
|
|
static_cast<bool>(result.first),
|
|
result.second,
|
|
bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::on_write_fin,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_write_fin(error_code const& ec)
|
|
{
|
|
if (ec)
|
|
return fail(ec, "write_fin");
|
|
wq_.pop_front();
|
|
if (do_close_)
|
|
{
|
|
impl().ws_.async_close(
|
|
cr_,
|
|
bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::on_close,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
}
|
|
else if (!wq_.empty())
|
|
on_write({});
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::do_read()
|
|
{
|
|
if (!strand_.running_in_this_thread())
|
|
return post(
|
|
strand_,
|
|
std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
|
|
impl().ws_.async_read(
|
|
rb_,
|
|
bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::on_read,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_read(error_code const& ec)
|
|
{
|
|
if (ec == boost::beast::websocket::error::closed)
|
|
return on_close({});
|
|
if (ec)
|
|
return fail(ec, "read");
|
|
auto const& data = rb_.data();
|
|
std::vector<boost::asio::const_buffer> b;
|
|
b.reserve(std::distance(data.begin(), data.end()));
|
|
std::copy(data.begin(), data.end(), std::back_inserter(b));
|
|
this->handler_.onWSMessage(impl().shared_from_this(), b);
|
|
rb_.consume(rb_.size());
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_close(error_code const& ec)
|
|
{
|
|
cancel_timer();
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::start_timer()
|
|
{
|
|
// Max seconds without completing a message
|
|
static constexpr std::chrono::seconds timeout{30};
|
|
static constexpr std::chrono::seconds timeoutLocal{3};
|
|
|
|
try
|
|
{
|
|
timer_.expires_after(
|
|
remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
|
|
}
|
|
catch (boost::system::system_error const& e)
|
|
{
|
|
return fail(e.code(), "start_timer");
|
|
}
|
|
|
|
timer_.async_wait(bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer<Handler, Impl>::on_timer,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
}
|
|
|
|
// Convenience for discarding the error code
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::cancel_timer()
|
|
{
|
|
try
|
|
{
|
|
timer_.cancel();
|
|
}
|
|
catch (boost::system::system_error const&)
|
|
{
|
|
// ignored
|
|
}
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_ping(error_code const& ec)
|
|
{
|
|
if (ec == boost::asio::error::operation_aborted)
|
|
return;
|
|
ping_active_ = false;
|
|
if (!ec)
|
|
return;
|
|
fail(ec, "on_ping");
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_ping_pong(
|
|
boost::beast::websocket::frame_type kind,
|
|
boost::beast::string_view payload)
|
|
{
|
|
if (kind == boost::beast::websocket::frame_type::pong)
|
|
{
|
|
boost::beast::string_view p(payload_.begin());
|
|
if (payload == p)
|
|
{
|
|
close_on_timer_ = false;
|
|
JLOG(this->j_.trace()) << "got matching pong";
|
|
}
|
|
else
|
|
{
|
|
JLOG(this->j_.trace()) << "got pong";
|
|
}
|
|
}
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::on_timer(error_code ec)
|
|
{
|
|
if (ec == boost::asio::error::operation_aborted)
|
|
return;
|
|
if (!ec)
|
|
{
|
|
if (!close_on_timer_ || !ping_active_)
|
|
{
|
|
start_timer();
|
|
close_on_timer_ = true;
|
|
ping_active_ = true;
|
|
// cryptographic is probably overkill..
|
|
beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
|
|
impl().ws_.async_ping(
|
|
payload_,
|
|
bind_executor(
|
|
strand_,
|
|
std::bind(
|
|
&BaseWSPeer::on_ping,
|
|
impl().shared_from_this(),
|
|
std::placeholders::_1)));
|
|
JLOG(this->j_.trace()) << "sent ping";
|
|
return;
|
|
}
|
|
ec = boost::system::errc::make_error_code(
|
|
boost::system::errc::timed_out);
|
|
}
|
|
fail(ec, "timer");
|
|
}
|
|
|
|
template <class Handler, class Impl>
|
|
template <class String>
|
|
void
|
|
BaseWSPeer<Handler, Impl>::fail(error_code ec, String const& what)
|
|
{
|
|
XRPL_ASSERT(
|
|
strand_.running_in_this_thread(),
|
|
"ripple::BaseWSPeer::fail : strand in this thread");
|
|
|
|
cancel_timer();
|
|
if (!ec_ && ec != boost::asio::error::operation_aborted)
|
|
{
|
|
ec_ = ec;
|
|
JLOG(this->j_.trace()) << what << ": " << ec.message();
|
|
ripple::get_lowest_layer(impl().ws_).socket().close(ec);
|
|
}
|
|
}
|
|
|
|
} // namespace ripple
|
|
|
|
#endif
|