#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace xrpl { /** Represents an active WebSocket connection. */ template class BaseWSPeer : public BasePeer, public WSSession { protected: using clock_type = std::chrono::system_clock; using error_code = boost::system::error_code; using endpoint_type = boost::asio::ip::tcp::endpoint; using waitable_timer = boost::asio::basic_waitable_timer; using BasePeer::strand_; private: friend class BasePeer; http_request_type request_; boost::beast::multi_buffer rb_; boost::beast::multi_buffer wb_; std::list> wq_; /// The socket has been closed, or will close after the next write /// finishes. Do not do any more writes, and don't try to close /// again. bool do_close_ = false; boost::beast::websocket::close_reason cr_; waitable_timer timer_; bool close_on_timer_ = false; bool ping_active_ = false; boost::beast::websocket::ping_data payload_; error_code ec_; std::function control_callback_; public: template BaseWSPeer( Port const& port, Handler& handler, boost::asio::executor const& executor, waitable_timer timer, endpoint_type remoteAddress, boost::beast::http::request&& request, beast::Journal journal); void run() override; // // WSSession // [[nodiscard]] Port const& port() const override { return this->port_; } [[nodiscard]] http_request_type const& request() const override { return this->request_; } [[nodiscard]] boost::asio::ip::tcp::endpoint const& remoteEndpoint() const override { return this->remote_address_; } void send(std::shared_ptr w) override; void close() override; void close(boost::beast::websocket::close_reason const& reason) override; void complete() override; protected: Impl& impl() { return *static_cast(this); } void onWsHandshake(error_code const& ec); void doWrite(); void onWrite(error_code const& ec); void onWriteFin(error_code const& ec); void doRead(); void onRead(error_code const& ec); void onClose(error_code const& ec); void startTimer(); void cancelTimer(); void onPing(error_code const& ec); void onPingPong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload); void onTimer(error_code ec); template void fail(error_code ec, String const& what); }; //------------------------------------------------------------------------------ template template BaseWSPeer::BaseWSPeer( Port const& port, Handler& handler, boost::asio::executor const& executor, waitable_timer timer, endpoint_type remoteAddress, boost::beast::http::request&& request, beast::Journal journal) : BasePeer(port, handler, executor, remoteAddress, journal) , request_(std::move(request)) , timer_(std::move(timer)) , payload_("12345678") // ensures size is 8 bytes { } template void BaseWSPeer::run() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this())); impl().ws_.set_option(port().pmd_options); // Must manage the control callback memory outside of the `control_callback` // function control_callback_ = std::bind(&BaseWSPeer::onPingPong, this, std::placeholders::_1, std::placeholders::_2); impl().ws_.control_callback(control_callback_); startTimer(); close_on_timer_ = true; impl().ws_.set_option(boost::beast::websocket::stream_base::decorator([](auto& res) { res.set(boost::beast::http::field::server, BuildInfo::getFullVersionString()); })); impl().ws_.async_accept( request_, bind_executor( strand_, std::bind( &BaseWSPeer::onWsHandshake, impl().shared_from_this(), std::placeholders::_1))); } template void BaseWSPeer::send(std::shared_ptr w) { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&BaseWSPeer::send, impl().shared_from_this(), std::move(w))); if (do_close_) return; if (wq_.size() > port().ws_queue_limit) { cr_.code = safeCast(boost::beast::websocket::close_code::policy_error); cr_.reason = "Policy error: client is too slow."; JLOG(this->j_.info()) << cr_.reason; wq_.erase(std::next(wq_.begin()), wq_.end()); close(cr_); return; } wq_.emplace_back(std::move(w)); if (wq_.size() == 1) onWrite({}); } template void BaseWSPeer::close() { close(boost::beast::websocket::close_reason{}); } template void BaseWSPeer::close(boost::beast::websocket::close_reason const& reason) { if (!strand_.running_in_this_thread()) return post(strand_, [self = impl().shared_from_this(), reason] { self->close(reason); }); if (do_close_) return; do_close_ = true; if (wq_.empty()) { impl().ws_.async_close( reason, bind_executor( strand_, [self = impl().shared_from_this()](boost::beast::error_code const& ec) { self->onClose(ec); })); } else { cr_ = reason; } } template void BaseWSPeer::complete() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&BaseWSPeer::complete, impl().shared_from_this())); doRead(); } template void BaseWSPeer::onWsHandshake(error_code const& ec) { if (ec) return fail(ec, "on_ws_handshake"); close_on_timer_ = false; doRead(); } template void BaseWSPeer::doWrite() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&BaseWSPeer::doWrite, impl().shared_from_this())); onWrite({}); } template void BaseWSPeer::onWrite(error_code const& ec) { if (ec) return fail(ec, "write"); auto& w = *wq_.front(); auto const result = w.prepare(65536, std::bind(&BaseWSPeer::doWrite, impl().shared_from_this())); if (boost::indeterminate(result.first)) return; startTimer(); if (!result.first) { impl().ws_.async_write_some( static_cast(result.first), result.second, bind_executor( strand_, std::bind(&BaseWSPeer::onWrite, impl().shared_from_this(), std::placeholders::_1))); } else { impl().ws_.async_write_some( static_cast(result.first), result.second, bind_executor( strand_, std::bind( &BaseWSPeer::onWriteFin, impl().shared_from_this(), std::placeholders::_1))); } } template void BaseWSPeer::onWriteFin(error_code const& ec) { if (ec) return fail(ec, "write_fin"); wq_.pop_front(); if (do_close_) { impl().ws_.async_close( cr_, bind_executor( strand_, std::bind(&BaseWSPeer::onClose, impl().shared_from_this(), std::placeholders::_1))); } else if (!wq_.empty()) { onWrite({}); } } template void BaseWSPeer::doRead() { if (!strand_.running_in_this_thread()) return post(strand_, std::bind(&BaseWSPeer::doRead, impl().shared_from_this())); impl().ws_.async_read( rb_, bind_executor( strand_, std::bind(&BaseWSPeer::onRead, impl().shared_from_this(), std::placeholders::_1))); } template void BaseWSPeer::onRead(error_code const& ec) { if (ec == boost::beast::websocket::error::closed) return onClose({}); if (ec) return fail(ec, "read"); auto const& data = rb_.data(); std::vector b; b.reserve(std::distance(data.begin(), data.end())); std::ranges::copy(data, std::back_inserter(b)); this->handler_.onWSMessage(impl().shared_from_this(), b); rb_.consume(rb_.size()); } template void BaseWSPeer::onClose(error_code const& ec) { cancelTimer(); } template void BaseWSPeer::startTimer() { // Max seconds without completing a message static constexpr std::chrono::seconds kTimeout{30}; static constexpr std::chrono::seconds kTimeoutLocal{3}; try { timer_.expires_after(remoteEndpoint().address().is_loopback() ? kTimeoutLocal : kTimeout); } catch (boost::system::system_error const& e) { return fail(e.code(), "start_timer"); } timer_.async_wait(bind_executor( strand_, std::bind( &BaseWSPeer::onTimer, impl().shared_from_this(), std::placeholders::_1))); } // Convenience for discarding the error code template void BaseWSPeer::cancelTimer() { try { timer_.cancel(); } catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch) { // ignored } } template void BaseWSPeer::onPing(error_code const& ec) { if (ec == boost::asio::error::operation_aborted) return; ping_active_ = false; if (!ec) return; fail(ec, "on_ping"); } template void BaseWSPeer::onPingPong( boost::beast::websocket::frame_type kind, boost::beast::string_view payload) { if (kind == boost::beast::websocket::frame_type::pong) { boost::beast::string_view const p(payload_.begin()); if (payload == p) { close_on_timer_ = false; JLOG(this->j_.trace()) << "got matching pong"; } else { JLOG(this->j_.trace()) << "got pong"; } } } template void BaseWSPeer::onTimer(error_code ec) { if (ec == boost::asio::error::operation_aborted) return; if (!ec) { if (!close_on_timer_ || !ping_active_) { startTimer(); close_on_timer_ = true; ping_active_ = true; // cryptographic is probably overkill.. beast::rngfill(payload_.begin(), payload_.size(), cryptoPrng()); impl().ws_.async_ping( payload_, bind_executor( strand_, std::bind( &BaseWSPeer::onPing, impl().shared_from_this(), std::placeholders::_1))); JLOG(this->j_.trace()) << "sent ping"; return; } ec = boost::system::errc::make_error_code(boost::system::errc::timed_out); } fail(ec, "timer"); } template template void BaseWSPeer::fail(error_code ec, String const& what) { XRPL_ASSERT(strand_.running_in_this_thread(), "xrpl::BaseWSPeer::fail : strand in this thread"); cancelTimer(); if (!ec_ && ec != boost::asio::error::operation_aborted) { ec_ = ec; JLOG(this->j_.trace()) << what << ": " << ec.message(); xrpl::getLowestLayer(impl().ws_).socket().close(ec); } } } // namespace xrpl