Compare commits

..

4 Commits

Author SHA1 Message Date
seelabs
0d4fe469c6 Set version to 0.60.1 2017-03-29 15:53:21 -04:00
Miguel Portilla
8b43d67a73 Cancel websocket timer on failure:
This fixes a problem where the timeout timer
would not be canceled with some errors.
fix #2026
2017-03-29 15:53:16 -04:00
Vinnie Falco
128f7cefb1 Send a websocket ping before timing out in server:
This fixes a problem where idle websocket client
connections could be disconnected due to inactivity.
2017-03-29 15:53:16 -04:00
seelabs
09f9720ebb Correctly calculate Escrow and PayChan identifiers:
This change fixes a technical flaw that resulted from using
32-bit space identifiers instead of the protocol-defined
16-bit values.

Details: https://ripple.com/build/ledger-format/#tree-format
2017-03-29 15:52:42 -04:00
6 changed files with 113 additions and 94 deletions

View File

@@ -33,7 +33,7 @@ char const* const versionString =
// The build version number. You must edit this for each release
// and follow the format described at http://semver.org/
//
"0.60.0"
"0.60.1"
#if defined(DEBUG) || defined(SANITIZER)
"+"

View File

@@ -317,7 +317,7 @@ escrow (AccountID const& source, std::uint32_t seq)
{
sha512_half_hasher h;
using beast::hash_append;
hash_append(h, spaceEscrow);
hash_append(h, std::uint16_t(spaceEscrow));
hash_append(h, source);
hash_append(h, seq);
return { ltESCROW, static_cast<uint256>(h) };
@@ -328,7 +328,7 @@ payChan (AccountID const& source, AccountID const& dst, std::uint32_t seq)
{
sha512_half_hasher h;
using beast::hash_append;
hash_append(h, spaceXRPUChannel);
hash_append(h, std::uint16_t(spaceXRPUChannel));
hash_append(h, source);
hash_append(h, dst);
hash_append(h, seq);

View File

@@ -24,6 +24,7 @@
#include <ripple/server/impl/io_list.h>
#include <ripple/beast/utility/WrappedSink.h>
#include <boost/asio.hpp>
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
@@ -49,7 +50,6 @@ protected:
boost::asio::io_service::work work_;
boost::asio::io_service::strand strand_;
error_code ec_;
public:
BasePeer(Port const& port, Handler& handler,
@@ -60,11 +60,6 @@ public:
void
close() override;
protected:
template<class String>
void
fail(error_code ec, String const& what);
private:
Impl&
impl()
@@ -87,7 +82,7 @@ BasePeer(Port const& port, Handler& handler,
, sink_(journal.sink(),
[]
{
static int id = 0;
static std::atomic<unsigned> id{0};
return "##" + std::to_string(++id) + " ";
}())
, j_(sink_)
@@ -108,23 +103,6 @@ close()
impl().ws_.lowest_layer().close(ec);
}
template<class Handler, class Impl>
template<class String>
void
BasePeer<Handler, Impl>::
fail(error_code ec, String const& what)
{
assert(strand_.running_in_this_thread());
if(! ec_ &&
ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(j_.trace()) <<
what << ": " << ec.message();
impl().ws_.lowest_layer().close(ec);
}
}
} // ripple
#endif

View File

@@ -22,6 +22,8 @@
#include <ripple/server/impl/BasePeer.h>
#include <ripple/protocol/BuildInfo.h>
#include <ripple/beast/utility/rngfill.h>
#include <ripple/crypto/csprng.h>
#include <beast/websocket.hpp>
#include <beast/core/streambuf.hpp>
#include <beast/http/message.hpp>
@@ -40,15 +42,8 @@ protected:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using waitable_timer = boost::asio::basic_waitable_timer <clock_type>;
using BasePeer<Handler, Impl>::fail;
using BasePeer<Handler, Impl>::strand_;
enum
{
// Max seconds without completing a message
timeoutSeconds = 30
};
private:
friend class BasePeer<Handler, Impl>;
@@ -60,6 +55,10 @@ private:
bool do_close_ = false;
beast::websocket::close_reason cr_;
waitable_timer timer_;
bool close_on_timer_ = false;
bool ping_active_ = false;
beast::websocket::ping_data payload_;
error_code ec_;
public:
template<class Body, class Headers>
@@ -152,18 +151,25 @@ protected:
void
on_close(error_code const& ec);
virtual
void
do_close() = 0;
void
start_timer();
void
cancel_timer();
void
on_ping(error_code const& ec);
void
on_ping_pong(bool is_pong,
beast::websocket::ping_data const& payload);
void
on_timer(error_code ec);
template<class String>
void
fail(error_code ec, String const& what);
};
//------------------------------------------------------------------------------
@@ -195,7 +201,12 @@ run()
&BaseWSPeer::run, impl().shared_from_this()));
impl().ws_.set_option(beast::websocket::decorate(identity{}));
impl().ws_.set_option(port().pmd_options);
impl().ws_.set_option(beast::websocket::ping_callback{
std::bind(&BaseWSPeer::on_ping_pong, this,
std::placeholders::_1, std::placeholders::_2)});
using namespace beast::asio;
start_timer();
close_on_timer_ = true;
impl().ws_.async_accept(request_, strand_.wrap(std::bind(
&BaseWSPeer::on_ws_handshake, impl().shared_from_this(),
placeholders::error)));
@@ -212,12 +223,17 @@ send(std::shared_ptr<WSMsg> w)
return strand_.post(std::bind(
&BaseWSPeer::send, impl().shared_from_this(),
std::move(w)));
if(do_close_)
return;
if(wq_.size() >= limit)
{
JLOG(this->j_.info()) <<
"closing slow client";
cr_.code = static_cast<beast::websocket::close_code::value>(4000);
cr_.reason = "Client is too slow.";
do_close_ = true;
static_assert(limit >= 1, "");
wq_.erase(std::next(wq_.begin()), wq_.end());
close();
return;
}
wq_.emplace_back(std::move(w));
@@ -233,9 +249,8 @@ close()
if(! strand_.running_in_this_thread())
return strand_.post(std::bind(
&BaseWSPeer::close, impl().shared_from_this()));
if(wq_.size() > 0)
do_close_ = true;
else
do_close_ = true;
if(wq_.empty())
impl().ws_.async_close({}, strand_.wrap(std::bind(
&BaseWSPeer::on_close, impl().shared_from_this(),
beast::asio::placeholders::error)));
@@ -259,6 +274,7 @@ on_ws_handshake(error_code const& ec)
{
if(ec)
return fail(ec, "on_ws_handshake");
close_on_timer_ = false;
do_read();
}
@@ -278,7 +294,6 @@ void
BaseWSPeer<Handler, Impl>::
on_write(error_code const& ec)
{
cancel_timer();
if(ec)
return fail(ec, "write");
auto& w = *wq_.front();
@@ -329,7 +344,6 @@ do_read()
impl().ws_.async_read(op_, rb_, strand_.wrap(
std::bind(&BaseWSPeer::on_read,
impl().shared_from_this(), placeholders::error)));
cancel_timer();
}
template<class Handler, class Impl>
@@ -338,7 +352,7 @@ BaseWSPeer<Handler, Impl>::
on_read(error_code const& ec)
{
if(ec == beast::websocket::error::closed)
return do_close();
return on_close({});
if(ec)
return fail(ec, "read");
auto const& data = rb_.data();
@@ -355,7 +369,7 @@ void
BaseWSPeer<Handler, Impl>::
on_close(error_code const& ec)
{
// great
cancel_timer();
}
template<class Handler, class Impl>
@@ -387,7 +401,41 @@ cancel_timer()
timer_.cancel(ec);
}
// Called when session times out
template<class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
on_ping(error_code const& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
ping_active_ = false;
if(! ec)
return;
fail(ec, "on_ping");
}
template<class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
on_ping_pong(bool is_pong,
beast::websocket::ping_data const& payload)
{
if(is_pong)
{
if(payload == payload_)
{
close_on_timer_ = false;
JLOG(this->j_.trace()) <<
"got matching pong";
}
else
{
JLOG(this->j_.trace()) <<
"got pong";
}
}
}
template<class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::
@@ -396,10 +444,49 @@ on_timer(error_code ec)
if(ec == boost::asio::error::operation_aborted)
return;
if(! ec)
{
if(! close_on_timer_ || ! ping_active_)
{
start_timer();
close_on_timer_ = true;
ping_active_ = true;
// cryptographic is probably overkill..
beast::rngfill(payload_.begin(),
payload_.size(), crypto_prng());
impl().ws_.async_ping(payload_,
strand_.wrap(std::bind(
&BaseWSPeer::on_ping,
impl().shared_from_this(),
std::placeholders::_1)));
JLOG(this->j_.trace()) <<
"sent ping";
return;
}
ec = boost::system::errc::make_error_code(
boost::system::errc::timed_out);
}
fail(ec, "timer");
}
template<class Handler, class Impl>
template<class String>
void
BaseWSPeer<Handler, Impl>::
fail(error_code ec, String const& what)
{
assert(strand_.running_in_this_thread());
cancel_timer();
if(! ec_ &&
ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
JLOG(this->j_.trace()) <<
what << ": " << ec.message();
impl().ws_.lowest_layer().close(ec);
}
}
} // ripple
#endif

View File

@@ -30,7 +30,6 @@ class PlainWSPeer
: public BaseWSPeer<Handler, PlainWSPeer<Handler>>
, public std::enable_shared_from_this<PlainWSPeer<Handler>>
{
private:
friend class BasePeer<Handler, PlainWSPeer>;
friend class BaseWSPeer<Handler, PlainWSPeer>;
@@ -51,10 +50,6 @@ public:
beast::http::request<Body, Headers>&& request,
socket_type&& socket,
beast::Journal journal);
private:
void
do_close() override;
};
//------------------------------------------------------------------------------
@@ -75,18 +70,6 @@ PlainWSPeer(
{
}
template<class Handler>
void
PlainWSPeer<Handler>::
do_close()
{
error_code ec;
auto& sock = ws_.next_layer();
sock.shutdown(socket_type::shutdown_both, ec);
if(ec)
return this->fail(ec, "do_close");
}
} // ripple
#endif

View File

@@ -34,7 +34,6 @@ class SSLWSPeer
: public BaseWSPeer<Handler, SSLWSPeer<Handler>>
, public std::enable_shared_from_this<SSLWSPeer<Handler>>
{
private:
friend class BasePeer<Handler, SSLWSPeer>;
friend class BaseWSPeer<Handler, SSLWSPeer>;
@@ -58,13 +57,6 @@ public:
std::unique_ptr<
beast::asio::ssl_bundle>&& ssl_bundle,
beast::Journal journal);
private:
void
do_close() override;
void
on_shutdown(error_code ec);
};
//------------------------------------------------------------------------------
@@ -88,27 +80,6 @@ SSLWSPeer(
{
}
template<class Handler>
void
SSLWSPeer<Handler>::
do_close()
{
//start_timer();
using namespace beast::asio;
ws_.next_layer().async_shutdown(
this->strand_.wrap(std::bind(&SSLWSPeer::on_shutdown,
this->shared_from_this(), placeholders::error)));
}
template<class Handler>
void
SSLWSPeer<Handler>::
on_shutdown(error_code ec)
{
//cancel_timer();
ws_.lowest_layer().close(ec);
}
} // ripple
#endif