1#ifndef XRPL_SERVER_BASEWSPEER_H_INCLUDED
2#define XRPL_SERVER_BASEWSPEER_H_INCLUDED
4#include <xrpl/basics/safe_cast.h>
5#include <xrpl/beast/utility/instrumentation.h>
6#include <xrpl/beast/utility/rngfill.h>
7#include <xrpl/crypto/csprng.h>
8#include <xrpl/protocol/BuildInfo.h>
9#include <xrpl/server/WSSession.h>
10#include <xrpl/server/detail/BasePeer.h>
11#include <xrpl/server/detail/LowestLayer.h>
13#include <boost/asio/error.hpp>
14#include <boost/beast/core/multi_buffer.hpp>
15#include <boost/beast/http/message.hpp>
16#include <boost/beast/websocket.hpp>
17#include <boost/logic/tribool.hpp>
25template <
class Handler,
class Impl>
36 friend class BasePeer<Handler, Impl>;
39 boost::beast::multi_buffer
rb_;
40 boost::beast::multi_buffer
wb_;
46 boost::beast::websocket::close_reason
cr_;
53 void(boost::beast::websocket::frame_type, boost::beast::string_view)>
57 template <
class Body,
class Headers>
61 boost::asio::executor
const& executor,
64 boost::beast::http::request<Body, Headers>&&
request,
86 boost::asio::ip::tcp::endpoint
const&
99 close(boost::beast::websocket::close_reason
const& reason)
override;
108 return *
static_cast<Impl*
>(
this);
143 boost::beast::websocket::frame_type kind,
144 boost::beast::string_view payload);
149 template <
class String>
156template <
class Handler,
class Impl>
157template <
class Body,
class Headers>
161 boost::asio::executor
const& executor,
164 boost::beast::http::request<Body, Headers>&& request,
166 :
BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
167 , request_(
std::move(request))
168 , timer_(
std::move(timer))
169 , payload_(
"12345678")
173template <
class Handler,
class Impl>
177 if (!strand_.running_in_this_thread())
180 impl().ws_.set_option(port().pmd_options);
186 std::placeholders::_1,
187 std::placeholders::_2);
188 impl().ws_.control_callback(control_callback_);
190 close_on_timer_ =
true;
191 impl().ws_.set_option(
192 boost::beast::websocket::stream_base::decorator([](
auto& res) {
194 boost::beast::http::field::server,
197 impl().ws_.async_accept(
203 impl().shared_from_this(),
204 std::placeholders::_1)));
207template <
class Handler,
class Impl>
211 if (!strand_.running_in_this_thread())
218 if (wq_.size() > port().ws_queue_limit)
220 cr_.code =
safe_cast<
decltype(cr_.code)>(
221 boost::beast::websocket::close_code::policy_error);
222 cr_.reason =
"Policy error: client is too slow.";
223 JLOG(this->j_.
info()) << cr_.reason;
224 wq_.erase(
std::next(wq_.begin()), wq_.end());
228 wq_.emplace_back(std::move(w));
233template <
class Handler,
class Impl>
237 close(boost::beast::websocket::close_reason{});
240template <
class Handler,
class Impl>
243 boost::beast::websocket::close_reason
const& reason)
245 if (!strand_.running_in_this_thread())
246 return post(strand_, [self = impl().shared_from_this(), reason] {
254 impl().ws_.async_close(
258 [self = impl().shared_from_this()](
259 boost::beast::error_code
const& ec) {
269template <
class Handler,
class Impl>
273 if (!strand_.running_in_this_thread())
280template <
class Handler,
class Impl>
285 return fail(ec,
"on_ws_handshake");
286 close_on_timer_ =
false;
290template <
class Handler,
class Impl>
294 if (!strand_.running_in_this_thread())
301template <
class Handler,
class Impl>
306 return fail(ec,
"write");
307 auto& w = *wq_.front();
308 auto const result = w.prepare(
310 if (boost::indeterminate(result.first))
314 impl().ws_.async_write_some(
315 static_cast<bool>(result.first),
321 impl().shared_from_this(),
322 std::placeholders::_1)));
324 impl().ws_.async_write_some(
325 static_cast<bool>(result.first),
331 impl().shared_from_this(),
332 std::placeholders::_1)));
335template <
class Handler,
class Impl>
340 return fail(ec,
"write_fin");
344 impl().ws_.async_close(
350 impl().shared_from_this(),
351 std::placeholders::_1)));
353 else if (!wq_.empty())
357template <
class Handler,
class Impl>
361 if (!strand_.running_in_this_thread())
365 impl().ws_.async_read(
371 impl().shared_from_this(),
372 std::placeholders::_1)));
375template <
class Handler,
class Impl>
379 if (ec == boost::beast::websocket::error::closed)
382 return fail(ec,
"read");
383 auto const& data = rb_.data();
387 this->handler_.onWSMessage(impl().shared_from_this(), b);
388 rb_.consume(rb_.size());
391template <
class Handler,
class Impl>
398template <
class Handler,
class Impl>
408 timer_.expires_after(
409 remote_endpoint().address().is_loopback() ? timeoutLocal : timeout);
411 catch (boost::system::system_error
const& e)
413 return fail(e.code(),
"start_timer");
416 timer_.async_wait(bind_executor(
420 impl().shared_from_this(),
421 std::placeholders::_1)));
425template <
class Handler,
class Impl>
433 catch (boost::system::system_error
const&)
439template <
class Handler,
class Impl>
443 if (ec == boost::asio::error::operation_aborted)
445 ping_active_ =
false;
451template <
class Handler,
class Impl>
454 boost::beast::websocket::frame_type kind,
455 boost::beast::string_view payload)
457 if (kind == boost::beast::websocket::frame_type::pong)
459 boost::beast::string_view p(payload_.begin());
462 close_on_timer_ =
false;
463 JLOG(this->j_.
trace()) <<
"got matching pong";
467 JLOG(this->j_.
trace()) <<
"got pong";
472template <
class Handler,
class Impl>
476 if (ec == boost::asio::error::operation_aborted)
480 if (!close_on_timer_ || !ping_active_)
483 close_on_timer_ =
true;
487 impl().ws_.async_ping(
493 impl().shared_from_this(),
494 std::placeholders::_1)));
495 JLOG(this->j_.
trace()) <<
"sent ping";
498 ec = boost::system::errc::make_error_code(
499 boost::system::errc::timed_out);
504template <
class Handler,
class Impl>
505template <
class String>
510 strand_.running_in_this_thread(),
511 "ripple::BaseWSPeer::fail : strand in this thread");
514 if (!ec_ && ec != boost::asio::error::operation_aborted)
517 JLOG(this->j_.
trace()) << what <<
": " << ec.message();
T back_inserter(T... args)
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
endpoint_type remote_address_
boost::asio::strand< boost::asio::executor > strand_
Represents an active WebSocket connection.
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
boost::asio::ip::tcp::endpoint endpoint_type
boost::system::error_code error_code
http_request_type const & request() const override
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
void fail(error_code ec, String const &what)
bool do_close_
The socket has been closed, or will close after the next write finishes.
void close(boost::beast::websocket::close_reason const &reason) override
http_request_type request_
void on_read(error_code const &ec)
void on_ws_handshake(error_code const &ec)
boost::beast::multi_buffer wb_
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
boost::asio::basic_waitable_timer< clock_type > waitable_timer
void on_close(error_code const &ec)
void on_write_fin(error_code const &ec)
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Port const & port() const override
void complete() override
Indicate that the response is complete.
boost::beast::websocket::close_reason cr_
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
std::list< std::shared_ptr< WSMsg > > wq_
void on_timer(error_code ec)
void on_ping(error_code const &ec)
boost::beast::multi_buffer rb_
void on_write(error_code const &ec)
boost::beast::websocket::ping_data payload_
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
std::string const & getFullVersionString()
Full server version string.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safe_cast(Src s) noexcept
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
decltype(auto) get_lowest_layer(T &t) noexcept
Configuration information for a Server listening port.