From 7b7e3b6750028d25bdfd390150d5bbe4743b36ef Mon Sep 17 00:00:00 2001 From: Mo Morsi Date: Mon, 17 Jun 2019 15:43:42 -0400 Subject: [PATCH] Return WS error on closure when balance threshold exceeds --- src/ripple/rpc/impl/ServerHandlerImp.cpp | 4 +-- src/ripple/server/WSSession.h | 5 ++- src/ripple/server/impl/BaseWSPeer.h | 46 ++++++++++++++++-------- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index 1e8043311..71a588f3e 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -359,7 +359,7 @@ ServerHandlerImp::onWSMessage( if (postResult == nullptr) { // The coroutine was rejected, probably because we're shutting down. - session->close(); + session->close({boost::beast::websocket::going_away, "Shutting Down"}); } } @@ -388,7 +388,7 @@ ServerHandlerImp::processSession( auto is = std::static_pointer_cast (session->appDefined); if (is->getConsumer().disconnect()) { - session->close(); + session->close({boost::beast::websocket::policy_error, "threshold exceeded"}); // FIX: This rpcError is not delivered since the session // was just closed. return rpcError(rpcSLOW_DOWN); diff --git a/src/ripple/server/WSSession.h b/src/ripple/server/WSSession.h index 230037384..9734f3640 100644 --- a/src/ripple/server/WSSession.h +++ b/src/ripple/server/WSSession.h @@ -136,9 +136,12 @@ struct WSSession void send(std::shared_ptr w) = 0; + virtual void + close() = 0; + virtual void - close() = 0; + close(boost::beast::websocket::close_reason const& reason) = 0; /** Indicate that the response is complete. The handler should call this when it has completed writing diff --git a/src/ripple/server/impl/BaseWSPeer.h b/src/ripple/server/impl/BaseWSPeer.h index 135a69fc0..729e24271 100644 --- a/src/ripple/server/impl/BaseWSPeer.h +++ b/src/ripple/server/impl/BaseWSPeer.h @@ -104,6 +104,9 @@ public: void close() override; + void + close(boost::beast::websocket::close_reason const& reason) override; + void complete() override; @@ -224,13 +227,12 @@ send(std::shared_ptr w) return; if(wq_.size() > port().ws_queue_limit) { - JLOG(this->j_.info()) << - "closing slow client"; cr_.code = safe_cast (boost::beast::websocket::close_code::policy_error); cr_.reason = "Policy error: client is too slow."; + JLOG(this->j_.info()) << cr_.reason; wq_.erase(std::next(wq_.begin()), wq_.end()); - close(); + close(cr_); return; } wq_.emplace_back(std::move(w)); @@ -238,24 +240,38 @@ send(std::shared_ptr w) on_write({}); } -template +template void -BaseWSPeer:: -close() +BaseWSPeer::close() { - if(! strand_.running_in_this_thread()) - return post( - strand_, std::bind(&BaseWSPeer::close, impl().shared_from_this())); + close(boost::beast::websocket::close_reason{}); +} + +template +void +BaseWSPeer::close( + boost::beast::websocket::close_reason const& reason) +{ + if (!strand_.running_in_this_thread()) + return post(strand_, [self = impl().shared_from_this(), reason] { + self->close(reason); + }); do_close_ = true; - if(wq_.empty()) + if (wq_.empty()) + { impl().ws_.async_close( - {}, + reason, bind_executor( strand_, - std::bind( - &BaseWSPeer::on_close, - impl().shared_from_this(), - std::placeholders::_1))); + [self = impl().shared_from_this()]( + boost::beast::error_code const& ec) { + self->on_close(ec); + })); + } + else + { + cr_ = reason; + } } template