Send a websocket ping before timing out in server:

This fixes a problem where idle websocket client
connections could be disconnected due to inactivity.
This commit is contained in:
Vinnie Falco
2017-02-24 19:14:03 -05:00
committed by Brad Chase
parent bc5a74057d
commit 15f969a469
4 changed files with 84 additions and 60 deletions

View File

@@ -24,6 +24,7 @@
#include <ripple/server/impl/io_list.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <boost/asio.hpp>
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
@@ -87,7 +88,7 @@ BasePeer(Port const& port, Handler& handler,
, sink_(journal.sink(),
[]
{
static int id = 0;
static std::atomic<unsigned> id{0};
return "##" + std::to_string(++id) + " ";
}())
, j_(sink_)

View File

@@ -22,6 +22,8 @@
#include <ripple/server/impl/BasePeer.h>
#include <ripple/protocol/BuildInfo.h>
#include <ripple/beast/utility/rngfill.h>
#include <ripple/crypto/csprng.h>
#include <beast/websocket.hpp>
#include <beast/core/streambuf.hpp>
#include <beast/http/message.hpp>
@@ -60,6 +62,9 @@ private:
bool do_close_ = false;
beast::websocket::close_reason cr_;
waitable_timer timer_;
bool close_on_timer_ = false;
bool ping_active_ = false;
beast::websocket::ping_data payload_;
public:
template<class Body, class Headers>
@@ -152,16 +157,19 @@ protected:
void
on_close(error_code const& ec);
virtual
void
do_close() = 0;
void
start_timer();
void
cancel_timer();
void
on_ping(error_code const& ec);
void
on_ping_pong(bool is_pong,
beast::websocket::ping_data const& payload);
void
on_timer(error_code ec);
};
@@ -195,7 +203,12 @@ run()
&BaseWSPeer::run, impl().shared_from_this()));
impl().ws_.set_option(beast::websocket::decorate(identity{}));
impl().ws_.set_option(port().pmd_options);
impl().ws_.set_option(beast::websocket::ping_callback{
std::bind(&BaseWSPeer::on_ping_pong, this,
std::placeholders::_1, std::placeholders::_2)});
using namespace beast::asio;
start_timer();
close_on_timer_ = true;
impl().ws_.async_accept(request_, strand_.wrap(std::bind(
&BaseWSPeer::on_ws_handshake, impl().shared_from_this(),
placeholders::error)));
@@ -212,12 +225,17 @@ send(std::shared_ptr<WSMsg> w)
return strand_.post(std::bind(
&BaseWSPeer::send, impl().shared_from_this(),
std::move(w)));
if(do_close_)
return;
if(wq_.size() >= limit)
{
JLOG(this->j_.info()) <<
"closing slow client";
cr_.code = static_cast<beast::websocket::close_code::value>(4000);
cr_.reason = "Client is too slow.";
do_close_ = true;
static_assert(limit >= 1, "");
wq_.erase(std::next(wq_.begin()), wq_.end());
close();
return;
}
wq_.emplace_back(std::move(w));
@@ -233,9 +251,8 @@ close()
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::close, impl().shared_from_this()));
if(wq_.size() > 0)
do_close_ = true;
else
do_close_ = true;
if(wq_.empty())
impl().ws_.async_close({}, strand_.wrap(std::bind(
&BaseWSPeer::on_close, impl().shared_from_this(),
beast::asio::placeholders::error)));
@@ -259,6 +276,7 @@ on_ws_handshake(error_code const& ec)
{
if(ec)
return fail(ec, "on_ws_handshake");
close_on_timer_ = false;
do_read();
}
@@ -278,7 +296,6 @@ void
BaseWSPeer<Handler, Impl>::
on_write(error_code const& ec)
{
cancel_timer();
if(ec)
return fail(ec, "write");
auto& w = *wq_.front();
@@ -329,7 +346,6 @@ do_read()
impl().ws_.async_read(op_, rb_, strand_.wrap(
std::bind(&BaseWSPeer::on_read,
impl().shared_from_this(), placeholders::error)));
cancel_timer();
}
template<class Handler, class Impl>
@@ -338,7 +354,7 @@ BaseWSPeer<Handler, Impl>::
on_read(error_code const& ec)
{
if(ec == beast::websocket::error::closed)
return do_close();
return on_close({});
if(ec)
return fail(ec, "read");
auto const& data = rb_.data();
@@ -355,7 +371,7 @@ void
BaseWSPeer<Handler, Impl>::
on_close(error_code const& ec)
{
// great
cancel_timer();
}
template<class Handler, class Impl>
@@ -387,7 +403,41 @@ cancel_timer()
timer_.cancel(ec);
}
// Called when session times out
template<class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
on_ping(error_code const& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
ping_active_ = false;
if(! ec)
return;
fail(ec, "on_ping");
}
template<class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
on_ping_pong(bool is_pong,
beast::websocket::ping_data const& payload)
{
if(is_pong)
{
if(payload == payload_)
{
close_on_timer_ = false;
JLOG(this->j_.trace()) <<
"got matching pong";
}
else
{
JLOG(this->j_.trace()) <<
"got pong";
}
}
}
template<class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
@@ -396,10 +446,29 @@ on_timer(error_code ec)
if(ec == boost::asio::error::operation_aborted)
return;
if(! ec)
{
if(! close_on_timer_ || !ping_active_)
{
start_timer();
ping_active_ = true;
// cryptographic is probably overkill..
beast::rngfill(payload_.begin(),
payload_.size(), crypto_prng());
impl().ws_.async_ping(payload_,
strand_.wrap(std::bind(
&BaseWSPeer::on_ping,
impl().shared_from_this(),
std::placeholders::_1)));
JLOG(this->j_.trace()) <<
"sent pong";
return;
}
ec = boost::system::errc::make_error_code(
boost::system::errc::timed_out);
}
fail(ec, "timer");
}
} // ripple
#endif

View File

@@ -30,7 +30,6 @@ class PlainWSPeer
: public BaseWSPeer<Handler, PlainWSPeer<Handler>>
, public std::enable_shared_from_this<PlainWSPeer<Handler>>
{
private:
friend class BasePeer<Handler, PlainWSPeer>;
friend class BaseWSPeer<Handler, PlainWSPeer>;
@@ -51,10 +50,6 @@ public:
beast::http::request<Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal);
private:
void
do_close() override;
};
//------------------------------------------------------------------------------
@@ -75,18 +70,6 @@ PlainWSPeer(
{
}
template<class Handler>
void
PlainWSPeer<Handler>::
do_close()
{
error_code ec;
auto& sock = ws_.next_layer();
sock.shutdown(socket_type::shutdown_both, ec);
if(ec)
return this->fail(ec, "do_close");
}
} // ripple
#endif

View File

@@ -34,7 +34,6 @@ class SSLWSPeer
: public BaseWSPeer<Handler, SSLWSPeer<Handler>>
, public std::enable_shared_from_this<SSLWSPeer<Handler>>
{
private:
friend class BasePeer<Handler, SSLWSPeer>;
friend class BaseWSPeer<Handler, SSLWSPeer>;
@@ -58,13 +57,6 @@ public:
std::unique_ptr<
beast::asio::ssl_bundle>&& ssl_bundle,
beast::Journal journal);
private:
void
do_close() override;
void
on_shutdown(error_code ec);
};
//------------------------------------------------------------------------------
@@ -88,27 +80,6 @@ SSLWSPeer(
{
}
template<class Handler>
void
SSLWSPeer<Handler>::
do_close()
{
//start_timer();
using namespace beast::asio;
ws_.next_layer().async_shutdown(
this->strand_.wrap(std::bind(&SSLWSPeer::on_shutdown,
this->shared_from_this(), placeholders::error)));
}
template<class Handler>
void
SSLWSPeer<Handler>::
on_shutdown(error_code ec)
{
//cancel_timer();
ws_.lowest_layer().close(ec);
}
} // ripple
#endif