WebSocket ping, fixes, coverage:

* Improve test coverage
* tests for invokable in composed ops

* Update documentation
* Add License badge to README
* Target Windows 7 SDK and later
* Make role_type private
* Remove extra unused masking functions
* Allow stream reuse / reconnect after failure
* Restructure logic of composed operations
* Allow 0 for read_message_max meaning no limit
* Respect keep alive when building HTTP responses
* Check version in upgrade request
* Response with 426 status on unsupported WebSocket version
* Remove unnecessary Sec-WebSocket-Key in HTTP responses
* Rename to mask_buffer_size

* Remove maybe_throw
* Add ping, async_ping, async_on_pong
* Add ping_op
* Add pong_op
* Fix crash in accept_op
* Fix suspend in close_op
* Fix read_frame_op logic
* Fix crash in read_op
* Fix races in echo sync and async echo servers
This commit is contained in:
Vinnie Falco
2016-05-15 16:22:25 -04:00
parent a570b74038
commit eb7bd6a2f1
40 changed files with 2757 additions and 1365 deletions

View File

@@ -13,6 +13,7 @@
#include <beast/websocket/impl/accept_op.ipp>
#include <beast/websocket/impl/close_op.ipp>
#include <beast/websocket/impl/handshake_op.ipp>
#include <beast/websocket/impl/ping_op.ipp>
#include <beast/websocket/impl/read_op.ipp>
#include <beast/websocket/impl/read_frame_op.ipp>
#include <beast/websocket/impl/response_op.ipp>
@@ -40,6 +41,13 @@ namespace websocket {
namespace detail {
template<class _>
void
stream_base::open(role_type role)
{
role_ = role;
}
template<class _>
void
stream_base::prepare_fh(close_code::value& code)
@@ -76,7 +84,7 @@ stream_base::prepare_fh(close_code::value& code)
}
rd_size_ += rd_fh_.len;
}
if(rd_size_ > rd_msg_max_)
if(rd_msg_max_ && rd_size_ > rd_msg_max_)
{
code = close_code::too_big;
return;
@@ -100,7 +108,7 @@ stream_base::write_close(
fh.rsv3 = false;
fh.len = cr.code == close_code::none ?
0 : 2 + cr.reason.size();
fh.mask = role_ == role_type::client;
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::write(sb, fh);
@@ -136,7 +144,7 @@ stream_base::write_close(
template<class Streambuf>
void
stream_base::write_ping(Streambuf& sb,
opcode op, ping_payload_type const& data)
opcode op, ping_data const& data)
{
frame_header fh;
fh.op = op;
@@ -184,7 +192,8 @@ accept()
"SyncStream requirements not met");
error_code ec;
accept(boost::asio::null_buffers{}, ec);
detail::maybe_throw(ec, "accept");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -223,7 +232,8 @@ accept(ConstBufferSequence const& buffers)
"ConstBufferSequence requirements not met");
error_code ec;
accept(buffers, ec);
detail::maybe_throw(ec, "accept");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -239,6 +249,7 @@ accept(ConstBufferSequence const& buffers, error_code& ec)
"ConstBufferSequence requirements not met");
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
reset();
stream_.buffer().commit(buffer_copy(
stream_.buffer().prepare(
buffer_size(buffers)), buffers));
@@ -279,7 +290,8 @@ accept(http::request_v1<Body, Headers> const& request)
"SyncStream requirements not met");
error_code ec;
accept(request, ec);
detail::maybe_throw(ec, "accept");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -291,8 +303,11 @@ accept(http::request_v1<Body, Headers> const& req,
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
reset();
auto const res = build_response(req);
http::write(stream_, res, ec);
if(ec)
return;
if(res.status != 101)
{
ec = error::handshake_failed;
@@ -300,7 +315,7 @@ accept(http::request_v1<Body, Headers> const& req,
// teardown if Connection: close.
return;
}
role_ = role_type::server;
open(detail::role_type::server);
}
template<class NextLayer>
@@ -316,6 +331,7 @@ async_accept(http::request_v1<Body, Headers> const& req,
beast::async_completion<
AcceptHandler, void(error_code)
> completion(handler);
reset();
response_op<decltype(completion.handler)>{
completion.handler, *this, req,
boost_asio_handler_cont_helpers::
@@ -333,7 +349,8 @@ handshake(boost::string_ref const& host,
"SyncStream requirements not met");
error_code ec;
handshake(host, resource, ec);
detail::maybe_throw(ec, "upgrade");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -344,6 +361,7 @@ handshake(boost::string_ref const& host,
{
static_assert(is_SyncStream<next_layer_type>::value,
"SyncStream requirements not met");
reset();
std::string key;
http::write(stream_,
build_request(host, resource, key), ec);
@@ -383,7 +401,8 @@ close(close_reason const& cr)
"SyncStream requirements not met");
error_code ec;
close(cr, ec);
detail::maybe_throw(ec, "close");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -398,7 +417,7 @@ close(close_reason const& cr, error_code& ec)
detail::frame_streambuf fb;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
failed_ = ec != 0;
}
template<class NextLayer>
@@ -418,6 +437,45 @@ async_close(close_reason const& cr, CloseHandler&& handler)
return completion.result.get();
}
template<class NextLayer>
void
stream<NextLayer>::
ping(ping_data const& payload)
{
error_code ec;
ping(payload, ec);
if(ec)
throw system_error{ec};
}
template<class NextLayer>
void
stream<NextLayer>::
ping(ping_data const& payload, error_code& ec)
{
detail::frame_streambuf sb;
write_ping<static_streambuf>(
sb, opcode::ping, payload);
boost::asio::write(stream_, sb.data(), ec);
}
template<class NextLayer>
template<class PingHandler>
typename async_completion<
PingHandler, void(error_code)>::result_type
stream<NextLayer>::
async_ping(ping_data const& payload, PingHandler&& handler)
{
static_assert(is_AsyncStream<next_layer_type>::value,
"AsyncStream requirements requirements not met");
beast::async_completion<
PingHandler, void(error_code)
> completion(handler);
ping_op<decltype(completion.handler)>{
completion.handler, *this, payload};
return completion.result.get();
}
template<class NextLayer>
template<class Streambuf>
void
@@ -428,7 +486,8 @@ read(opcode& op, Streambuf& streambuf)
"SyncStream requirements not met");
error_code ec;
read(op, streambuf, ec);
detail::maybe_throw(ec, "read");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -481,7 +540,8 @@ read_frame(frame_info& fi, Streambuf& streambuf)
"SyncStream requirements not met");
error_code ec;
read_frame(fi, streambuf, ec);
detail::maybe_throw(ec, "read_some");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -500,8 +560,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
// read header
detail::frame_streambuf fb;
do_read_fh(fb, code, ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
if(code != close_code::none)
break;
@@ -513,8 +573,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
auto const mb = fb.prepare(
static_cast<std::size_t>(rd_fh_.len));
fb.commit(boost::asio::read(stream_, mb, ec));
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
if(rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
@@ -522,27 +582,23 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
}
if(rd_fh_.op == opcode::ping)
{
ping_payload_type data;
detail::read(data, fb.data(), code);
if(code != close_code::none)
break;
ping_data data;
detail::read(data, fb.data());
fb.reset();
write_ping<static_streambuf>(
fb, opcode::pong, data);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
continue;
}
else if(rd_fh_.op == opcode::pong)
{
ping_payload_type data;
detail::read(data, fb.data(), code);
if(code != close_code::none)
break;
// VFALCO How to notify callers using
// the synchronous interface?
ping_data payload;
detail::read(payload, fb.data());
if(pong_cb_)
pong_cb_(payload);
continue;
}
assert(rd_fh_.op == opcode::close);
@@ -560,8 +616,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
wr_close_ = true;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
}
break;
@@ -578,8 +634,8 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
detail::clamp(rd_need_));
auto const bytes_transferred =
stream_.read_some(smb, ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
rd_need_ -= bytes_transferred;
auto const pb = prepare_buffers(
@@ -610,23 +666,23 @@ read_frame(frame_info& fi, Streambuf& streambuf, error_code& ec)
detail::frame_streambuf fb;
write_close<static_streambuf>(fb, code);
boost::asio::write(stream_, fb.data(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
}
websocket_helpers::call_teardown(next_layer(), ec);
error_ = ec != 0;
if(error_)
failed_ = ec != 0;
if(failed_)
return;
ec = error::failed;
error_ = true;
failed_ = true;
return;
}
if(! ec)
websocket_helpers::call_teardown(next_layer(), ec);
if(! ec)
ec = error::closed;
error_ = ec != 0;
failed_ = ec != 0;
}
template<class NextLayer>
@@ -658,7 +714,8 @@ write(ConstBufferSequence const& buffers)
"SyncStream requirements not met");
error_code ec;
write(buffers, ec);
detail::maybe_throw(ec, "write");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -719,7 +776,8 @@ write_frame(bool fin, ConstBufferSequence const& buffers)
"SyncStream requirements not met");
error_code ec;
write_frame(fin, buffers, ec);
detail::maybe_throw(ec, "write");
if(ec)
throw system_error{ec};
}
template<class NextLayer>
@@ -744,7 +802,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = buffer_size(bs);
fh.mask = role_ == role_type::client;
fh.mask = role_ == detail::role_type::client;
if(fh.mask)
fh.key = maskgen_();
detail::fh_streambuf fh_buf;
@@ -754,22 +812,21 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
// send header and payload
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), bs), ec);
error_ = ec != 0;
failed_ = ec != 0;
return;
}
detail::prepared_key_type key;
detail::prepare_key(key, fh.key);
auto const tmp_size = detail::clamp(
fh.len, wr_buf_size_);
auto const tmp_size =
detail::clamp(fh.len, mask_buf_size_);
std::unique_ptr<std::uint8_t[]> up(
new std::uint8_t[tmp_size]);
auto const tmp = up.get();
std::uint64_t remain = fh.len;
consuming_buffers<ConstBufferSequence> cb(bs);
{
auto const n =
detail::clamp(remain, tmp_size);
mutable_buffers_1 mb{tmp, n};
mutable_buffers_1 mb{up.get(), n};
buffer_copy(mb, cb);
cb.consume(n);
remain -= n;
@@ -779,7 +836,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
buffer_cat(fh_buf.data(), mb), ec);
if(ec)
{
error_ = ec != 0;
failed_ = ec != 0;
return;
}
}
@@ -787,7 +844,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
{
auto const n =
detail::clamp(remain, tmp_size);
mutable_buffers_1 mb{tmp, n};
mutable_buffers_1 mb{up.get(), n};
buffer_copy(mb, cb);
cb.consume(n);
remain -= n;
@@ -796,7 +853,7 @@ write_frame(bool fin, ConstBufferSequence const& bs, error_code& ec)
boost::asio::write(stream_, mb, ec);
if(ec)
{
error_ = ec != 0;
failed_ = ec != 0;
return;
}
}
@@ -826,6 +883,23 @@ async_write_frame(bool fin,
//------------------------------------------------------------------------------
template<class NextLayer>
void
stream<NextLayer>::
reset()
{
failed_ = false;
rd_need_ = 0;
rd_cont_ = false;
wr_close_ = false;
wr_cont_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
pong_data_ = nullptr; // should be nullptr on close anyway
stream_.buffer().consume(
stream_.buffer().size());
}
template<class NextLayer>
http::request_v1<http::empty_body>
stream<NextLayer>::
@@ -860,8 +934,11 @@ build_response(http::request_v1<Body, Headers> const& req)
res.reason = http::reason_string(res.status);
res.version = req.version;
res.body = text;
// VFALCO TODO respect keep-alive here
prepare(res);
(*d_)(res);
prepare(res,
(is_keep_alive(req) && keep_alive_) ?
http::connection::keep_alive :
http::connection::close);
return res;
};
if(req.version < 11)
@@ -874,17 +951,28 @@ build_response(http::request_v1<Body, Headers> const& req)
return err("Missing Host");
if(! req.headers.exists("Sec-WebSocket-Key"))
return err("Missing Sec-WebSocket-Key");
if(! rfc2616::token_in_list(
req.headers["Upgrade"], "websocket"))
return err("Missing websocket Upgrade token");
{
auto const version =
req.headers["Sec-WebSocket-Version"];
if(version.empty())
return err("Missing Sec-WebSocket-Version");
if(version != "13")
return err("Unsupported Sec-WebSocket-Version");
{
http::response_v1<http::string_body> res;
res.status = 426;
res.reason = http::reason_string(res.status);
res.version = req.version;
res.headers.insert("Sec-WebSocket-Version", "13");
prepare(res,
(is_keep_alive(req) && keep_alive_) ?
http::connection::keep_alive :
http::connection::close);
return res;
}
}
if(! rfc2616::token_in_list(
req.headers["Upgrade"], "websocket"))
return err("Missing websocket Upgrade token");
http::response_v1<http::string_body> res;
res.status = 101;
res.reason = http::reason_string(res.status);
@@ -893,7 +981,6 @@ build_response(http::request_v1<Body, Headers> const& req)
{
auto const key =
req.headers["Sec-WebSocket-Key"];
res.headers.insert("Sec-WebSocket-Key", key);
res.headers.insert("Sec-WebSocket-Accept",
detail::make_sec_ws_accept(key));
}
@@ -912,6 +999,8 @@ do_response(http::response_v1<Body, Headers> const& res,
{
// VFALCO Review these error codes
auto fail = [&]{ ec = error::response_failed; };
if(res.version < 11)
return fail();
if(res.status != 101)
return fail();
if(! is_upgrade(res))
@@ -924,7 +1013,7 @@ do_response(http::response_v1<Body, Headers> const& res,
if(res.headers["Sec-WebSocket-Accept"] !=
detail::make_sec_ws_accept(key))
return fail();
role_ = role_type::client;
open(detail::role_type::client);
}
template<class NextLayer>