diff --git a/src/ripple/server/impl/BasePeer.h b/src/ripple/server/impl/BasePeer.h index 77874949c..e7eac1ade 100644 --- a/src/ripple/server/impl/BasePeer.h +++ b/src/ripple/server/impl/BasePeer.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -87,7 +88,7 @@ BasePeer(Port const& port, Handler& handler, , sink_(journal.sink(), [] { - static int id = 0; + static std::atomic id{0}; return "##" + std::to_string(++id) + " "; }()) , j_(sink_) diff --git a/src/ripple/server/impl/BaseWSPeer.h b/src/ripple/server/impl/BaseWSPeer.h index a1acb8e33..24e83309f 100644 --- a/src/ripple/server/impl/BaseWSPeer.h +++ b/src/ripple/server/impl/BaseWSPeer.h @@ -22,6 +22,8 @@ #include #include +#include +#include #include #include #include @@ -60,6 +62,9 @@ private: bool do_close_ = false; beast::websocket::close_reason cr_; waitable_timer timer_; + bool close_on_timer_ = false; + bool ping_active_ = false; + beast::websocket::ping_data payload_; public: template @@ -152,16 +157,19 @@ protected: void on_close(error_code const& ec); - virtual - void - do_close() = 0; - void start_timer(); void cancel_timer(); + void + on_ping(error_code const& ec); + + void + on_ping_pong(bool is_pong, + beast::websocket::ping_data const& payload); + void on_timer(error_code ec); }; @@ -195,7 +203,12 @@ run() &BaseWSPeer::run, impl().shared_from_this())); impl().ws_.set_option(beast::websocket::decorate(identity{})); impl().ws_.set_option(port().pmd_options); + impl().ws_.set_option(beast::websocket::ping_callback{ + std::bind(&BaseWSPeer::on_ping_pong, this, + std::placeholders::_1, std::placeholders::_2)}); using namespace beast::asio; + start_timer(); + close_on_timer_ = true; impl().ws_.async_accept(request_, strand_.wrap(std::bind( &BaseWSPeer::on_ws_handshake, impl().shared_from_this(), placeholders::error))); @@ -212,12 +225,17 @@ send(std::shared_ptr w) return strand_.post(std::bind( &BaseWSPeer::send, impl().shared_from_this(), std::move(w))); + if(do_close_) + return; if(wq_.size() >= limit) { + JLOG(this->j_.info()) << + "closing slow client"; cr_.code = static_cast(4000); cr_.reason = "Client is too slow."; - do_close_ = true; + static_assert(limit >= 1, ""); wq_.erase(std::next(wq_.begin()), wq_.end()); + close(); return; } wq_.emplace_back(std::move(w)); @@ -233,9 +251,8 @@ close() if(! strand_.running_in_this_thread()) return strand_.post(std::bind( &BaseWSPeer::close, impl().shared_from_this())); - if(wq_.size() > 0) - do_close_ = true; - else + do_close_ = true; + if(wq_.empty()) impl().ws_.async_close({}, strand_.wrap(std::bind( &BaseWSPeer::on_close, impl().shared_from_this(), beast::asio::placeholders::error))); @@ -259,6 +276,7 @@ on_ws_handshake(error_code const& ec) { if(ec) return fail(ec, "on_ws_handshake"); + close_on_timer_ = false; do_read(); } @@ -278,7 +296,6 @@ void BaseWSPeer:: on_write(error_code const& ec) { - cancel_timer(); if(ec) return fail(ec, "write"); auto& w = *wq_.front(); @@ -329,7 +346,6 @@ do_read() impl().ws_.async_read(op_, rb_, strand_.wrap( std::bind(&BaseWSPeer::on_read, impl().shared_from_this(), placeholders::error))); - cancel_timer(); } template @@ -338,7 +354,7 @@ BaseWSPeer:: on_read(error_code const& ec) { if(ec == beast::websocket::error::closed) - return do_close(); + return on_close({}); if(ec) return fail(ec, "read"); auto const& data = rb_.data(); @@ -355,7 +371,7 @@ void BaseWSPeer:: on_close(error_code const& ec) { - // great + cancel_timer(); } template @@ -387,7 +403,41 @@ cancel_timer() timer_.cancel(ec); } -// Called when session times out +template +void +BaseWSPeer:: +on_ping(error_code const& ec) +{ + if(ec == boost::asio::error::operation_aborted) + return; + ping_active_ = false; + if(! ec) + return; + fail(ec, "on_ping"); +} + +template +void +BaseWSPeer:: +on_ping_pong(bool is_pong, + beast::websocket::ping_data const& payload) +{ + if(is_pong) + { + if(payload == payload_) + { + close_on_timer_ = false; + JLOG(this->j_.trace()) << + "got matching pong"; + } + else + { + JLOG(this->j_.trace()) << + "got pong"; + } + } +} + template void BaseWSPeer:: @@ -396,10 +446,29 @@ on_timer(error_code ec) if(ec == boost::asio::error::operation_aborted) return; if(! ec) + { + if(! close_on_timer_ || !ping_active_) + { + start_timer(); + ping_active_ = true; + // cryptographic is probably overkill.. + beast::rngfill(payload_.begin(), + payload_.size(), crypto_prng()); + impl().ws_.async_ping(payload_, + strand_.wrap(std::bind( + &BaseWSPeer::on_ping, + impl().shared_from_this(), + std::placeholders::_1))); + JLOG(this->j_.trace()) << + "sent pong"; + return; + } ec = boost::system::errc::make_error_code( boost::system::errc::timed_out); + } fail(ec, "timer"); } + } // ripple #endif diff --git a/src/ripple/server/impl/PlainWSPeer.h b/src/ripple/server/impl/PlainWSPeer.h index 8e84140a5..18f8ae4ca 100644 --- a/src/ripple/server/impl/PlainWSPeer.h +++ b/src/ripple/server/impl/PlainWSPeer.h @@ -30,7 +30,6 @@ class PlainWSPeer : public BaseWSPeer> , public std::enable_shared_from_this> { -private: friend class BasePeer; friend class BaseWSPeer; @@ -51,10 +50,6 @@ public: beast::http::request&& request, socket_type&& socket, beast::Journal journal); - -private: - void - do_close() override; }; //------------------------------------------------------------------------------ @@ -75,18 +70,6 @@ PlainWSPeer( { } -template -void -PlainWSPeer:: -do_close() -{ - error_code ec; - auto& sock = ws_.next_layer(); - sock.shutdown(socket_type::shutdown_both, ec); - if(ec) - return this->fail(ec, "do_close"); -} - } // ripple #endif diff --git a/src/ripple/server/impl/SSLWSPeer.h b/src/ripple/server/impl/SSLWSPeer.h index 2f208217e..383ebfd28 100644 --- a/src/ripple/server/impl/SSLWSPeer.h +++ b/src/ripple/server/impl/SSLWSPeer.h @@ -34,7 +34,6 @@ class SSLWSPeer : public BaseWSPeer> , public std::enable_shared_from_this> { -private: friend class BasePeer; friend class BaseWSPeer; @@ -58,13 +57,6 @@ public: std::unique_ptr< beast::asio::ssl_bundle>&& ssl_bundle, beast::Journal journal); - -private: - void - do_close() override; - - void - on_shutdown(error_code ec); }; //------------------------------------------------------------------------------ @@ -88,27 +80,6 @@ SSLWSPeer( { } -template -void -SSLWSPeer:: -do_close() -{ - //start_timer(); - using namespace beast::asio; - ws_.next_layer().async_shutdown( - this->strand_.wrap(std::bind(&SSLWSPeer::on_shutdown, - this->shared_from_this(), placeholders::error))); -} - -template -void -SSLWSPeer:: -on_shutdown(error_code ec) -{ - //cancel_timer(); - ws_.lowest_layer().close(ec); -} - } // ripple #endif