Files
rippled/include/xrpl/server/detail/BaseWSPeer.h
2025-10-23 11:04:30 -04:00

525 lines
13 KiB
C++

#ifndef XRPL_SERVER_BASEWSPEER_H_INCLUDED
#define XRPL_SERVER_BASEWSPEER_H_INCLUDED
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/crypto/csprng.h>
#include <xrpl/protocol/BuildInfo.h>
#include <xrpl/server/WSSession.h>
#include <xrpl/server/detail/BasePeer.h>
#include <xrpl/server/detail/LowestLayer.h>
#include <boost/asio/error.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/logic/tribool.hpp>
#include <functional>
#include <list>
namespace ripple {
/** Represents an active WebSocket connection. */
template <class Handler, class Impl>
class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
{
protected:
using clock_type = std::chrono::system_clock;
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
using BasePeer<Handler, Impl>::strand_;
private:
friend class BasePeer<Handler, Impl>;
http_request_type request_;
boost::beast::multi_buffer rb_;
boost::beast::multi_buffer wb_;
std::list<std::shared_ptr<WSMsg>> wq_;
/// The socket has been closed, or will close after the next write
/// finishes. Do not do any more writes, and don't try to close
/// again.
bool do_close_ = false;
boost::beast::websocket::close_reason cr_;
waitable_timer timer_;
bool close_on_timer_ = false;
bool ping_active_ = false;
boost::beast::websocket::ping_data payload_;
error_code ec_;
std::function<
void(boost::beast::websocket::frame_type, boost::beast::string_view)>
control_callback_;
public:
template <class Body, class Headers>
BaseWSPeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
waitable_timer timer,
endpoint_type remote_address,
boost::beast::http::request<Body, Headers>&& request,
beast::Journal journal);
void
run() override;
//
// WSSession
//
Port const&
port() const override
{
return this->port_;
}
http_request_type const&
request() const override
{
return this->request_;
}
boost::asio::ip::tcp::endpoint const&
remote_endpoint() const override
{
return this->remote_address_;
}
void
send(std::shared_ptr<WSMsg> w) override;
void
close() override;
void
close(boost::beast::websocket::close_reason const& reason) override;
void
complete() override;
protected:
Impl&
impl()
{
return *static_cast<Impl*>(this);
}
void
on_ws_handshake(error_code const& ec);
void
do_write();
void
on_write(error_code const& ec);
void
on_write_fin(error_code const& ec);
void
do_read();
void
on_read(error_code const& ec);
void
on_close(error_code const& ec);
void
start_timer();
void
cancel_timer();
void
on_ping(error_code const& ec);
void
on_ping_pong(
boost::beast::websocket::frame_type kind,
boost::beast::string_view payload);
void
on_timer(error_code ec);
template <class String>
void
fail(error_code ec, String const& what);
};
//------------------------------------------------------------------------------
template <class Handler, class Impl>
template <class Body, class Headers>
BaseWSPeer<Handler, Impl>::BaseWSPeer(
Port const& port,
Handler& handler,
boost::asio::executor const& executor,
waitable_timer timer,
endpoint_type remote_address,
boost::beast::http::request<Body, Headers>&& request,
beast::Journal journal)
: BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
, request_(std::move(request))
, timer_(std::move(timer))
, payload_("12345678") // ensures size is 8 bytes
{
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::run()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
impl().ws_.set_option(port().pmd_options);
// Must manage the control callback memory outside of the `control_callback`
// function
control_callback_ = std::bind(
&BaseWSPeer::on_ping_pong,
this,
std::placeholders::_1,
std::placeholders::_2);
impl().ws_.control_callback(control_callback_);
start_timer();
close_on_timer_ = true;
impl().ws_.set_option(
boost::beast::websocket::stream_base::decorator([](auto& res) {
res.set(
boost::beast::http::field::server,
BuildInfo::getFullVersionString());
}));
impl().ws_.async_accept(
request_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_ws_handshake,
impl().shared_from_this(),
std::placeholders::_1)));
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(
&BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
if (do_close_)
return;
if (wq_.size() > port().ws_queue_limit)
{
cr_.code = safe_cast<decltype(cr_.code)>(
boost::beast::websocket::close_code::policy_error);
cr_.reason = "Policy error: client is too slow.";
JLOG(this->j_.info()) << cr_.reason;
wq_.erase(std::next(wq_.begin()), wq_.end());
close(cr_);
return;
}
wq_.emplace_back(std::move(w));
if (wq_.size() == 1)
on_write({});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close()
{
close(boost::beast::websocket::close_reason{});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close(
boost::beast::websocket::close_reason const& reason)
{
if (!strand_.running_in_this_thread())
return post(strand_, [self = impl().shared_from_this(), reason] {
self->close(reason);
});
if (do_close_)
return;
do_close_ = true;
if (wq_.empty())
{
impl().ws_.async_close(
reason,
bind_executor(
strand_,
[self = impl().shared_from_this()](
boost::beast::error_code const& ec) {
self->on_close(ec);
}));
}
else
{
cr_ = reason;
}
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::complete()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
do_read();
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ws_handshake(error_code const& ec)
{
if (ec)
return fail(ec, "on_ws_handshake");
close_on_timer_ = false;
do_read();
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::do_write()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
on_write({});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_write(error_code const& ec)
{
if (ec)
return fail(ec, "write");
auto& w = *wq_.front();
auto const result = w.prepare(
65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
if (boost::indeterminate(result.first))
return;
start_timer();
if (!result.first)
impl().ws_.async_write_some(
static_cast<bool>(result.first),
result.second,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_write,
impl().shared_from_this(),
std::placeholders::_1)));
else
impl().ws_.async_write_some(
static_cast<bool>(result.first),
result.second,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_write_fin,
impl().shared_from_this(),
std::placeholders::_1)));
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_write_fin(error_code const& ec)
{
if (ec)
return fail(ec, "write_fin");
wq_.pop_front();
if (do_close_)
{
impl().ws_.async_close(
cr_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_close,
impl().shared_from_this(),
std::placeholders::_1)));
}
else if (!wq_.empty())
on_write({});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::do_read()
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
impl().ws_.async_read(
rb_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_read,
impl().shared_from_this(),
std::placeholders::_1)));
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_read(error_code const& ec)
{
if (ec == boost::beast::websocket::error::closed)
return on_close({});
if (ec)
return fail(ec, "read");
auto const& data = rb_.data();
std::vector<boost::asio::const_buffer> b;
b.reserve(std::distance(data.begin(), data.end()));
std::copy(data.begin(), data.end(), std::back_inserter(b));
this->handler_.onWSMessage(impl().shared_from_this(), b);
rb_.consume(rb_.size());
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_close(error_code const& ec)
{
cancel_timer();
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::start_timer()
{
// Max seconds without completing a message
static constexpr std::chrono::seconds timeout{30};
static constexpr std::chrono::seconds timeoutLocal{3};
try
{
timer_.expires_after(
remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
}
catch (boost::system::system_error const& e)
{
return fail(e.code(), "start_timer");
}
timer_.async_wait(bind_executor(
strand_,
std::bind(
&BaseWSPeer<Handler, Impl>::on_timer,
impl().shared_from_this(),
std::placeholders::_1)));
}
// Convenience for discarding the error code
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::cancel_timer()
{
try
{
timer_.cancel();
}
catch (boost::system::system_error const&)
{
// ignored
}
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping(error_code const& ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
ping_active_ = false;
if (!ec)
return;
fail(ec, "on_ping");
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping_pong(
boost::beast::websocket::frame_type kind,
boost::beast::string_view payload)
{
if (kind == boost::beast::websocket::frame_type::pong)
{
boost::beast::string_view p(payload_.begin());
if (payload == p)
{
close_on_timer_ = false;
JLOG(this->j_.trace()) << "got matching pong";
}
else
{
JLOG(this->j_.trace()) << "got pong";
}
}
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_timer(error_code ec)
{
if (ec == boost::asio::error::operation_aborted)
return;
if (!ec)
{
if (!close_on_timer_ || !ping_active_)
{
start_timer();
close_on_timer_ = true;
ping_active_ = true;
// cryptographic is probably overkill..
beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
impl().ws_.async_ping(
payload_,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_ping,
impl().shared_from_this(),
std::placeholders::_1)));
JLOG(this->j_.trace()) << "sent ping";
return;
}
ec = boost::system::errc::make_error_code(
boost::system::errc::timed_out);
}
fail(ec, "timer");
}
template <class Handler, class Impl>
template <class String>
void
BaseWSPeer<Handler, Impl>::fail(error_code ec, String const& what)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::BaseWSPeer::fail : strand in this thread");
cancel_timer();
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(this->j_.trace()) << what << ": " << ec.message();
ripple::get_lowest_layer(impl().ws_).socket().close(ec);
}
}
} // namespace ripple
#endif