Return WS error on closure when balance threshold exceeds

This commit is contained in:
Mo Morsi
2019-06-17 15:43:42 -04:00
committed by Nik Bougalis
parent a988b3224f
commit 7b7e3b6750
3 changed files with 37 additions and 18 deletions

View File

@@ -359,7 +359,7 @@ ServerHandlerImp::onWSMessage(
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.
session->close();
session->close({boost::beast::websocket::going_away, "Shutting Down"});
}
}
@@ -388,7 +388,7 @@ ServerHandlerImp::processSession(
auto is = std::static_pointer_cast<WSInfoSub> (session->appDefined);
if (is->getConsumer().disconnect())
{
session->close();
session->close({boost::beast::websocket::policy_error, "threshold exceeded"});
// FIX: This rpcError is not delivered since the session
// was just closed.
return rpcError(rpcSLOW_DOWN);

View File

@@ -136,9 +136,12 @@ struct WSSession
void
send(std::shared_ptr<WSMsg> w) = 0;
virtual void
close() = 0;
virtual
void
close() = 0;
close(boost::beast::websocket::close_reason const& reason) = 0;
/** Indicate that the response is complete.
The handler should call this when it has completed writing

View File

@@ -104,6 +104,9 @@ public:
void
close() override;
void
close(boost::beast::websocket::close_reason const& reason) override;
void
complete() override;
@@ -224,13 +227,12 @@ send(std::shared_ptr<WSMsg> w)
return;
if(wq_.size() > port().ws_queue_limit)
{
JLOG(this->j_.info()) <<
"closing slow client";
cr_.code = safe_cast<decltype(cr_.code)>
(boost::beast::websocket::close_code::policy_error);
cr_.reason = "Policy error: client is too slow.";
JLOG(this->j_.info()) << cr_.reason;
wq_.erase(std::next(wq_.begin()), wq_.end());
close();
close(cr_);
return;
}
wq_.emplace_back(std::move(w));
@@ -238,24 +240,38 @@ send(std::shared_ptr<WSMsg> w)
on_write({});
}
template<class Handler, class Impl>
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
close()
BaseWSPeer<Handler, Impl>::close()
{
if(! strand_.running_in_this_thread())
return post(
strand_, std::bind(&BaseWSPeer::close, impl().shared_from_this()));
close(boost::beast::websocket::close_reason{});
}
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close(
boost::beast::websocket::close_reason const& reason)
{
if (!strand_.running_in_this_thread())
return post(strand_, [self = impl().shared_from_this(), reason] {
self->close(reason);
});
do_close_ = true;
if(wq_.empty())
if (wq_.empty())
{
impl().ws_.async_close(
{},
reason,
bind_executor(
strand_,
std::bind(
&BaseWSPeer::on_close,
impl().shared_from_this(),
std::placeholders::_1)));
[self = impl().shared_from_this()](
boost::beast::error_code const& ec) {
self->on_close(ec);
}));
}
else
{
cr_ = reason;
}
}
template<class Handler, class Impl>