Files
rippled/beast/wsproto/impl/socket.ipp
Vinnie Falco 54f6f0ceba Beast.WebSocket:
Beast.WebSocket provides developers with a robust WebSocket
implementation built on Boost.Asio with a consistent asynchronous
model using a modern C++ approach.
2016-04-20 12:01:24 -04:00

816 lines
24 KiB
C++

//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_WSPROTO_IMPL_SOCKET_IPP_INCLUDED
#define BEAST_WSPROTO_IMPL_SOCKET_IPP_INCLUDED
#include <beast/wsproto/teardown.h>
#include <beast/wsproto/detail/hybi13.h>
#include <beast/wsproto/impl/accept_op.ipp>
#include <beast/wsproto/impl/close_op.ipp>
#include <beast/wsproto/impl/handshake_op.ipp>
#include <beast/wsproto/impl/read_op.ipp>
#include <beast/wsproto/impl/read_frame_op.ipp>
#include <beast/wsproto/impl/response_op.ipp>
#include <beast/wsproto/impl/write_op.ipp>
#include <beast/wsproto/impl/write_frame_op.ipp>
#include <beast/asio/append_buffers.h>
#include <beast/asio/async_completion.h>
#include <beast/asio/consuming_buffers.h>
#include <beast/asio/prepare_buffers.h>
#include <beast/asio/static_streambuf.h>
#include <beast/asio/streambuf.h>
#include <beast/asio/type_check.h>
#include <beast/http/read.h>
#include <beast/http/write.h>
#include <beast/http/reason.h>
#include <beast/http/rfc2616.h>
#include <boost/endian/buffers.hpp>
#include <algorithm>
#include <cassert>
#include <memory>
#include <utility>
namespace beast {
namespace wsproto {
namespace detail {
// Checks the frame header stored in rd_fh_ against the state of the
// message currently being received and updates the read bookkeeping
// (rd_key_, rd_size_, rd_opcode_, rd_need_, rd_cont_).
//
// When the header is not acceptable, `code` is set to the close code
// to report and the function returns early. Otherwise `code` is left
// untouched.
template<class _>
void
socket_base::prepare_fh(close_code& code)
{
    auto const continuation = rd_fh_.op == opcode::cont;
    // a continuation frame needs a message in progress
    if(continuation && ! rd_cont_)
    {
        code = close_code::protocol_error;
        return;
    }
    auto const control = is_control(rd_fh_.op);
    // while a message is in progress, only control frames and
    // continuation frames may arrive
    if(rd_cont_ && ! control && ! continuation)
    {
        code = close_code::protocol_error;
        return;
    }
    if(rd_fh_.mask)
        prepare_key(rd_key_, rd_fh_.key);
    // control frames leave the message bookkeeping alone
    if(control)
        return;
    if(continuation)
    {
        // refuse a length that would wrap the 64-bit running total
        auto const room = std::numeric_limits<
            std::uint64_t>::max() - rd_fh_.len;
        if(rd_size_ > room)
        {
            code = close_code::too_big;
            return;
        }
        rd_size_ += rd_fh_.len;
    }
    else
    {
        // first frame of a new message
        rd_size_ = rd_fh_.len;
        rd_opcode_ = rd_fh_.op;
    }
    // enforce the configured message size limit
    if(rd_size_ > rd_msg_max_)
    {
        code = close_code::too_big;
        return;
    }
    rd_need_ = rd_fh_.len;
    rd_cont_ = ! rd_fh_.fin;
}
// Serializes a close frame describing `cr` into `sb`.
//
// The frame carries no payload when cr.code is close_code::none.
// Otherwise the payload is the 16-bit close code in network byte
// order followed by the reason text. The frame is masked with a key
// from maskgen_ when role_ is role_type::client.
template<class Streambuf>
void
socket_base::write_close(
    Streambuf& sb, close_reason const& cr)
{
    auto const has_code = cr.code != close_code::none;
    frame_header fh;
    fh.op = opcode::close;
    fh.fin = true;
    fh.rsv1 = false;
    fh.rsv2 = false;
    fh.rsv3 = false;
    fh.len = has_code ? 2 + cr.reason.size() : 0;
    fh.mask = role_ == role_type::client;
    if(fh.mask)
        fh.key = maskgen_();
    detail::write(sb, fh);
    if(! has_code)
        return;
    detail::prepared_key_type key;
    if(fh.mask)
        detail::prepare_key(key, fh.key);
    // Appends `n` bytes starting at `p` to the output, masking them
    // when the frame is masked. The same prepared key is used for
    // successive calls, so the call order matters.
    auto const append =
        [&](void const* p, std::size_t n)
        {
            auto d = sb.prepare(n);
            boost::asio::buffer_copy(d,
                boost::asio::const_buffer(p, n));
            if(fh.mask)
                detail::mask_inplace(d, key);
            sb.commit(n);
        };
    // close code, most significant byte first
    auto const value = static_cast<std::uint16_t>(cr.code);
    std::uint8_t const wire[2] = {
        static_cast<std::uint8_t>(value >> 8),
        static_cast<std::uint8_t>(value & 0xff)};
    append(wire, sizeof(wire));
    if(! cr.reason.empty())
        append(cr.reason.data(), cr.reason.size());
}
// Serializes a single-frame control message with opcode `op` and
// payload `data` into `sb`. The read path uses it with opcode::pong
// to answer pings.
//
// The frame is masked with a key from maskgen_ when role_ is
// role_type::client.
template<class Streambuf>
void
socket_base::write_ping(Streambuf& sb,
    opcode op, ping_payload_type const& data)
{
    auto const size = data.size();
    frame_header fh;
    fh.op = op;
    fh.fin = true;
    fh.rsv1 = false;
    fh.rsv2 = false;
    fh.rsv3 = false;
    fh.len = size;
    fh.mask = role_ == role_type::client;
    if(fh.mask)
        fh.key = maskgen_();
    detail::write(sb, fh);
    // a header-only frame needs no payload handling
    if(data.empty())
        return;
    auto d = sb.prepare(size);
    boost::asio::buffer_copy(d,
        boost::asio::const_buffers_1(
            data.data(), size));
    if(fh.mask)
    {
        detail::prepared_key_type key;
        detail::prepare_key(key, fh.key);
        detail::mask_inplace(d, key);
    }
    sb.commit(size);
}
} // detail
//------------------------------------------------------------------------------
// Constructs the socket. All arguments are forwarded to the
// constructor of next_layer_, and stream_ is then initialized from
// next_layer_.
template<class Stream>
template<class... Args>
socket<Stream>::socket(Args&&... args)
    : next_layer_(std::forward<Args>(args)...)
    , stream_(next_layer_)
{
    // reject unsuitable next layer types at compile time
    using layer_type = next_layer_type;
    static_assert(is_Stream<layer_type>::value,
        "Stream requirements not met");
}
// Performs the server side of the handshake with no previously
// received bytes: delegates to the buffer-taking overload with an
// empty buffer sequence. Errors are reported through `ec`.
template<class Stream>
void
socket<Stream>::accept(error_code& ec)
{
    boost::asio::null_buffers const no_data{};
    accept(no_data, ec);
}
// Asynchronous accept with no previously received bytes: delegates
// to the buffer-taking overload with an empty buffer sequence and
// returns whatever that overload returns.
template<class Stream>
template<class AcceptHandler>
auto
socket<Stream>::async_accept(AcceptHandler&& handler)
{
    boost::asio::null_buffers const no_data{};
    return async_accept(no_data,
        std::forward<AcceptHandler>(handler));
}
// Throwing form of accept(buffers, ec): runs the error_code overload
// and hands the outcome to detail::maybe_throw with the context
// string "accept".
template<class Stream>
template<class ConstBufferSequence>
void
socket<Stream>::accept(
    ConstBufferSequence const& buffers)
{
    static_assert(is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    error_code result;
    accept(buffers, result);
    detail::maybe_throw(result, "accept");
}
// Performs the server side of the handshake.
//
// The bytes in `buffers` are first copied into the stream's read
// buffer so that they are treated as the start of the request. The
// HTTP request is then read and handed to the message-taking
// overload. A read error is returned in `ec` without sending a
// response.
template<class Stream>
template<class ConstBufferSequence>
void
socket<Stream>::accept(
    ConstBufferSequence const& buffers, error_code& ec)
{
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    using boost::asio::buffer_copy;
    using boost::asio::buffer_size;
    // seed the read buffer with the caller supplied bytes
    auto& rdbuf = stream_.buffer();
    auto const size = buffer_size(buffers);
    auto const copied = buffer_copy(
        rdbuf.prepare(size), buffers);
    rdbuf.commit(copied);
    http::request<http::empty_body> request;
    http::read(next_layer_, stream_.buffer(), request, ec);
    if(! ec)
        accept(request, ec);
}
// Asynchronous counterpart of accept(buffers, ec).
//
// Builds the completion machinery for `handler`, constructs an
// accept_op from the resulting handler, this socket and `bs`, and
// returns the value obtained from the completion's result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in accept_op.ipp.
template<class Stream>
template<class ConstBufferSequence, class AcceptHandler>
auto
socket<Stream>::async_accept(
    ConstBufferSequence const& bs, AcceptHandler&& handler)
{
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    using completion_type = beast::async_completion<
        AcceptHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    accept_op<handler_type>{init.handler, *this, bs};
    return init.result.get();
}
// Throwing form of accept(request, ec): runs the error_code overload
// and hands the outcome to detail::maybe_throw with the context
// string "accept".
template<class Stream>
template<class Body, class Headers>
void
socket<Stream>::accept(
    http::message<true, Body, Headers> const& request)
{
    error_code result;
    accept(request, result);
    detail::maybe_throw(result, "accept");
}
// Responds to an already received HTTP upgrade request.
//
// The response produced by build_response is written to the stream.
// If the write fails, the transport error is returned in `ec` and the
// socket's role is left unchanged. If the response was written but is
// not a 101, `ec` is set to error::handshake_failed. Only a
// successfully written 101 response switches the socket to
// role_type::server.
template<class Stream>
template<class Body, class Headers>
void
socket<Stream>::accept(
    http::message<true, Body, Headers> const& req,
    error_code& ec)
{
    auto resp = build_response(req);
    http::write(stream_, resp, ec);
    // Report a failed write as is. It must not be replaced by
    // handshake_failed, and a socket whose response never went out
    // must not enter the server role.
    if(ec)
        return;
    if(resp.status != 101)
    {
        ec = error::handshake_failed;
        // VFALCO TODO Respect keep alive setting, perform
        //             teardown if Connection: close.
        return;
    }
    role_ = role_type::server;
}
// Asynchronous counterpart of accept(req, ec).
//
// Builds the completion machinery for `handler`, constructs a
// response_op from the resulting handler, this socket, `req` and the
// handler's continuation flag, and returns the value obtained from
// the completion's result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in response_op.ipp.
template<class Stream>
template<class Body, class Headers, class AcceptHandler>
auto
socket<Stream>::async_accept(
    http::message<true, Body, Headers> const& req,
    AcceptHandler&& handler)
{
    using completion_type = beast::async_completion<
        AcceptHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    auto const cont = boost_asio_handler_cont_helpers::
        is_continuation(init.handler);
    response_op<handler_type>{
        init.handler, *this, req, cont};
    return init.result.get();
}
// Performs the client side of the handshake.
//
// Writes the upgrade request produced by build_request, reads the
// HTTP response and passes it, together with the key generated for
// the request, to do_response. The first error encountered is
// returned in `ec` and ends the sequence.
template<class Stream>
void
socket<Stream>::handshake(boost::string_ref const& host,
    boost::string_ref const& resource, error_code& ec)
{
    std::string key;
    auto req = build_request(host, resource, key);
    http::write(stream_, req, ec);
    if(! ec)
    {
        http::response<http::string_body> resp;
        http::read(next_layer_, stream_.buffer(), resp, ec);
        if(! ec)
            do_response(resp, key, ec);
    }
}
// Asynchronous counterpart of handshake.
//
// Builds the completion machinery for `handler`, constructs a
// handshake_op from the resulting handler, this socket, `host` and
// `resource`, and returns the value obtained from the completion's
// result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in handshake_op.ipp.
template<class Stream>
template<class HandshakeHandler>
auto
socket<Stream>::async_handshake(boost::string_ref const& host,
    boost::string_ref const& resource, HandshakeHandler&& handler)
{
    using completion_type = beast::async_completion<
        HandshakeHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    handshake_op<handler_type>{
        init.handler, *this, host, resource};
    return init.result.get();
}
// Sends a close frame describing `cr`.
//
// Must not be called once a close frame has been written (asserted
// through wr_close_). The outcome of the write is returned in `ec`
// and recorded in error_.
template<class Stream>
void
socket<Stream>::close(
    close_reason const& cr, error_code& ec)
{
    assert(! wr_close_);
    // from here on the read path will not send a close of its own
    wr_close_ = true;
    detail::frame_streambuf frame;
    write_close<static_streambuf>(frame, cr);
    boost::asio::write(stream_, frame.data(), ec);
    error_ = ec != 0;
}
// Asynchronous counterpart of close.
//
// Builds the completion machinery for `handler`, constructs a
// close_op from the resulting handler, this socket and `cr`, and
// returns the value obtained from the completion's result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in close_op.ipp.
template<class Stream>
template<class CloseHandler>
auto
socket<Stream>::async_close(
    close_reason const& cr, CloseHandler&& handler)
{
    using completion_type = beast::async_completion<
        CloseHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    close_op<handler_type>{init.handler, *this, cr};
    return init.result.get();
}
// Reads a complete message into `streambuf`.
//
// Calls read_frame repeatedly until a frame reports fin or an error
// occurs. `op` receives the opcode reported by each successfully
// read frame. An error is returned in `ec`.
template<class Stream>
template<class Streambuf>
void
socket<Stream>::
read(opcode& op, Streambuf& streambuf, error_code& ec)
{
    frame_info fi;
    do
    {
        read_frame(fi, streambuf, ec);
        if(ec)
            return;
        op = fi.op;
    }
    while(! fi.fin);
}
// Asynchronous counterpart of read.
//
// Builds the completion machinery for `handler`, constructs a
// read_op from the resulting handler, this socket, `op` and
// `streambuf`, and returns the value obtained from the completion's
// result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in read_op.ipp.
template<class Stream>
template<class Streambuf, class ReadHandler>
auto
socket<Stream>::
async_read(opcode& op,
    Streambuf& streambuf, ReadHandler&& handler)
{
    static_assert(beast::is_Streambuf<Streambuf>::value,
        "Streambuf requirements not met");
    using completion_type = beast::async_completion<
        ReadHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    read_op<Streambuf, handler_type>{
        init.handler, *this, op, streambuf};
    return init.result.get();
}
// Reads message payload into `streambuf`, handling any control
// frames encountered on the way.
//
// On success some payload bytes of the current data frame have been
// committed to `streambuf`, fi.op holds the opcode of the message
// and fi.fin is true when the final frame of the message has been
// read completely.
//
// Pings are answered with a pong, pongs are parsed and discarded. A
// close frame is answered with a close frame (unless one was already
// sent), the next layer is torn down and `ec` is set to
// error::closed. When the peer violates the protocol a close frame
// with the corresponding code is sent (unless one was already sent),
// the next layer is torn down and `ec` is set to error::failed.
// Transport errors are returned in `ec` unchanged. error_ records
// whether the call ended with an error.
template<class Stream>
template<class Streambuf>
void
socket<Stream>::read_frame(frame_info& fi,
    Streambuf& streambuf, error_code& ec)
{
    close_code code{};
    for(;;)
    {
        if(rd_need_ == 0)
        {
            // read header
            detail::frame_streambuf fb;
            do_read_fh(fb, code, ec);
            if((error_ = ec != 0))
                return;
            if(code != close_code::none)
                break;
            if(detail::is_control(rd_fh_.op))
            {
                // read control payload
                if(rd_fh_.len > 0)
                {
                    auto const mb = fb.prepare(
                        static_cast<std::size_t>(rd_fh_.len));
                    // boost::asio::read only returns without an
                    // error once the whole buffer is filled, so
                    // this commit covers the entire payload.
                    fb.commit(boost::asio::read(stream_, mb, ec));
                    if((error_ = ec != 0))
                        return;
                    if(rd_fh_.mask)
                        detail::mask_inplace(mb, rd_key_);
                }
                if(rd_fh_.op == opcode::ping)
                {
                    ping_payload_type data;
                    detail::read(data, fb.data(), code);
                    if(code != close_code::none)
                        break;
                    // answer with a pong carrying the same payload
                    fb.reset();
                    write_ping<static_streambuf>(
                        fb, opcode::pong, data);
                    boost::asio::write(stream_, fb.data(), ec);
                    if((error_ = ec != 0))
                        return;
                    continue;
                }
                else if(rd_fh_.op == opcode::pong)
                {
                    ping_payload_type data;
                    detail::read(data, fb.data(), code);
                    // Parsing reports problems through `code`, not
                    // `ec`; a malformed pong fails the connection
                    // just like a malformed ping.
                    if(code != close_code::none)
                        break;
                    // VFALCO How to notify callers using
                    //        the synchronous interface?
                    continue;
                }
                assert(rd_fh_.op == opcode::close);
                {
                    detail::read(cr_, fb.data(), code);
                    if(code != close_code::none)
                        break;
                    if(! wr_close_)
                    {
                        // reply with a close frame that carries the
                        // peer's code (or normal) and no reason
                        auto cr = cr_;
                        if(cr.code == close_code::none)
                            cr.code = close_code::normal;
                        cr.reason = "";
                        fb.reset();
                        wr_close_ = true;
                        write_close<static_streambuf>(fb, cr);
                        boost::asio::write(stream_, fb.data(), ec);
                        if((error_ = ec != 0))
                            return;
                    }
                    // leave the loop with code == none: clean close
                    break;
                }
            }
            if(rd_need_ == 0 && ! rd_fh_.fin)
            {
                // empty frame
                continue;
            }
        }
        // read payload
        auto smb = streambuf.prepare(
            detail::clamp(rd_need_));
        auto const bytes_transferred =
            stream_.read_some(smb, ec);
        if((error_ = ec != 0))
            return;
        rd_need_ -= bytes_transferred;
        auto const pb = prepare_buffers(
            bytes_transferred, smb);
        if(rd_fh_.mask)
            detail::mask_inplace(pb, rd_key_);
        if(rd_opcode_ == opcode::text)
        {
            // validate text incrementally, and completely once the
            // last byte of the final frame has arrived
            if(! rd_utf8_check_.write(pb) ||
                (rd_need_ == 0 && rd_fh_.fin &&
                    ! rd_utf8_check_.finish()))
            {
                code = close_code::bad_payload;
                break;
            }
        }
        streambuf.commit(bytes_transferred);
        fi.op = rd_opcode_;
        fi.fin = rd_fh_.fin && rd_need_ == 0;
        return;
    }
    if(code != close_code::none)
    {
        // Fail the connection (per rfc6455)
        if(! wr_close_)
        {
            wr_close_ = true;
            detail::frame_streambuf fb;
            write_close<static_streambuf>(fb, code);
            boost::asio::write(stream_, fb.data(), ec);
            if((error_ = ec != 0))
                return;
        }
        wsproto_helpers::call_teardown(next_layer_, ec);
        if((error_ = ec != 0))
            return;
        ec = error::failed;
        error_ = true;
        return;
    }
    // a close frame was received and handled
    if(! ec)
        wsproto_helpers::call_teardown(next_layer_, ec);
    if(! ec)
        ec = error::closed;
    error_ = ec != 0;
}
// Asynchronous counterpart of read_frame.
//
// Builds the completion machinery for `handler`, constructs a
// read_frame_op from the resulting handler, this socket, `fi` and
// `streambuf`, and returns the value obtained from the completion's
// result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in read_frame_op.ipp.
template<class Stream>
template<class Streambuf, class ReadHandler>
auto
socket<Stream>::async_read_frame(frame_info& fi,
    Streambuf& streambuf, ReadHandler&& handler)
{
    static_assert(beast::is_Streambuf<Streambuf>::value,
        "Streambuf requirements not met");
    using completion_type = beast::async_completion<
        ReadHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    read_frame_op<Streambuf, handler_type>{
        init.handler, *this, fi, streambuf};
    return init.result.get();
}
// Writes `bs` as one complete message.
//
// The data is cut into pieces of at most wr_frag_size_ bytes (as
// computed by detail::clamp) and each piece is sent with
// write_frame; the piece that exhausts the input is sent with fin
// set. An empty input still produces one frame. Writing stops at the
// first error, which is returned in `ec`.
template<class Stream>
template<class ConstBufferSequence>
void
socket<Stream>::write(
    ConstBufferSequence const& bs, error_code& ec)
{
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    using boost::asio::buffer_size;
    consuming_buffers<ConstBufferSequence> cb(bs);
    auto left = buffer_size(cb);
    bool last;
    do
    {
        auto const chunk =
            detail::clamp(left, wr_frag_size_);
        left -= chunk;
        last = ! (left > 0);
        write_frame(last, prepare_buffers(chunk, cb), ec);
        cb.consume(chunk);
    }
    while(! ec && ! last);
}
// Asynchronous counterpart of write.
//
// Builds the completion machinery for `handler`, constructs a
// write_op from the resulting handler, this socket and `bs`, and
// returns the value obtained from the completion's result object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in write_op.ipp.
template<class Stream>
template<class ConstBufferSequence, class WriteHandler>
auto
socket<Stream>::async_write(
    ConstBufferSequence const& bs, WriteHandler&& handler)
{
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    using completion_type = beast::async_completion<
        WriteHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    write_op<ConstBufferSequence, handler_type>{
        init.handler, *this, bs};
    return init.result.get();
}
// Writes `bs` as a single frame.
//
// The frame is a continuation frame when the previous frame of the
// message was not final, otherwise it carries wr_opcode_. `fin`
// marks the last frame of the message.
//
// When the socket is not in the client role, header and payload are
// written in one call straight from `bs`. In the client role the
// payload is copied through a temporary buffer of at most
// wr_buf_size_ bytes (as computed by detail::clamp), masked there
// and written piece by piece; the header goes out together with the
// first piece. The first error ends the write and is returned in
// `ec`.
template<class Stream>
template<class ConstBufferSequence>
void
socket<Stream>::write_frame(bool fin,
    ConstBufferSequence const& bs, error_code& ec)
{
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    using boost::asio::buffer_copy;
    using boost::asio::buffer_size;
    using boost::asio::mutable_buffers_1;
    // build and serialize the frame header
    detail::frame_header fh;
    fh.op = wr_cont_ ? opcode::cont : wr_opcode_;
    wr_cont_ = ! fin;
    fh.fin = fin;
    fh.rsv1 = false;
    fh.rsv2 = false;
    fh.rsv3 = false;
    fh.len = buffer_size(bs);
    fh.mask = role_ == role_type::client;
    if(fh.mask)
        fh.key = maskgen_();
    detail::fh_streambuf fh_buf;
    detail::write<static_streambuf>(fh_buf, fh);
    if(! fh.mask)
    {
        // unmasked: send header and payload without copying
        boost::asio::write(stream_,
            append_buffers(fh_buf.data(), bs), ec);
        error_ = ec != 0;
        return;
    }
    detail::prepared_key_type key;
    detail::prepare_key(key, fh.key);
    auto const tmp_size = detail::clamp(
        fh.len, wr_buf_size_);
    std::unique_ptr<std::uint8_t[]> up(
        new std::uint8_t[tmp_size]);
    auto const tmp = up.get();
    std::uint64_t remain = fh.len;
    consuming_buffers<ConstBufferSequence> cb(bs);
    // Moves the next piece of the payload into the temporary
    // buffer, masks it there and returns a buffer describing it.
    auto const next_chunk =
        [&]
        {
            auto const n =
                detail::clamp(remain, tmp_size);
            mutable_buffers_1 mb{tmp, n};
            buffer_copy(mb, cb);
            cb.consume(n);
            remain -= n;
            detail::mask_inplace(mb, key);
            return mb;
        };
    // send header and first piece of the payload
    auto chunk = next_chunk();
    boost::asio::write(stream_,
        append_buffers(fh_buf.data(), chunk), ec);
    // send the rest of the payload
    while(! ec && remain > 0)
    {
        chunk = next_chunk();
        boost::asio::write(stream_, chunk, ec);
    }
    if(ec)
        error_ = ec != 0;
}
// Asynchronous counterpart of write_frame.
//
// Builds the completion machinery for `handler`, constructs a
// write_frame_op from the resulting handler, this socket, `fin` and
// `bs`, and returns the value obtained from the completion's result
// object.
// NOTE(review): constructing the op is presumably what launches the
// operation; its definition is in write_frame_op.ipp.
template<class Stream>
template<class ConstBufferSequence, class WriteHandler>
auto
socket<Stream>::async_write_frame(bool fin,
    ConstBufferSequence const& bs, WriteHandler&& handler)
{
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
            "ConstBufferSequence requirements not met");
    using completion_type = beast::async_completion<
        WriteHandler, void(error_code)>;
    completion_type init(handler);
    using handler_type = decltype(init.handler);
    write_frame_op<ConstBufferSequence, handler_type>{
        init.handler, *this, fin, bs};
    return init.result.get();
}
//------------------------------------------------------------------------------
// Builds the HTTP upgrade request sent by the client handshake.
//
// host      value of the Host header
// resource  request target; "/" is used when it is empty
// key       receives the generated Sec-WebSocket-Key, which the
//           caller needs again to verify the server's response
//
// The decorator (*d_) is applied to the request before it is
// returned.
template<class Stream>
http::request<http::empty_body>
socket<Stream>::build_request(boost::string_ref const& host,
    boost::string_ref const& resource, std::string& key)
{
    http::request<http::empty_body> req;
    // Use the caller's resource as the request target; it was
    // previously ignored in favor of a hard-coded "/". An empty
    // target is not a valid request line, so fall back to "/".
    if(resource.empty())
        req.url = "/";
    else
        req.url = std::string(
            resource.data(), resource.size());
    req.version = 11;
    req.method = http::method_t::http_get;
    req.headers.insert("Host", host);
    req.headers.insert("Connection", "upgrade");
    req.headers.insert("Upgrade", "websocket");
    key = detail::make_sec_ws_key(maskgen_);
    req.headers.insert("Sec-WebSocket-Key", key);
    req.headers.insert("Sec-WebSocket-Version", "13");
    (*d_)(req);
    return req;
}
// Builds the HTTP response to an upgrade request.
//
// The request is checked in a fixed order; the first failed check
// yields a 400 response whose body names the problem. A request that
// passes every check yields a 101 response carrying the computed
// Sec-WebSocket-Accept value, to which the decorator (*d_) is
// applied before it is returned.
template<class Stream>
template<class Body, class Headers>
http::response<http::string_body>
socket<Stream>::build_response(
    http::message<true, Body, Headers> const& req)
{
    // produces the 400 response for a rejected request
    auto const bad_request =
        [&](char const* text)
        {
            http::response<http::string_body> resp(
                {400, http::reason_string(400), req.version});
            resp.body = text;
            // VFALCO TODO respect keep-alive here
            return resp;
        };
    // returns the reason the request is unacceptable, or nullptr
    auto const rejection =
        [&]() -> char const*
        {
            if(req.version < 11)
                return "HTTP version 1.1 required";
            if(req.method != http::method_t::http_get)
                return "Wrong method";
            if(! is_upgrade(req))
                return "Expected Upgrade request";
            if(! req.headers.exists("Host"))
                return "Missing Host";
            if(! req.headers.exists("Sec-WebSocket-Key"))
                return "Missing Sec-WebSocket-Key";
            {
                auto const version =
                    req.headers["Sec-WebSocket-Version"];
                if(version.empty())
                    return "Missing Sec-WebSocket-Version";
                if(version != "13")
                    return "Unsupported Sec-WebSocket-Version";
            }
            if(! rfc2616::token_in_list(
                    req.headers["Upgrade"], "websocket"))
                return "Missing websocket Upgrade token";
            return nullptr;
        };
    if(auto const text = rejection())
        return bad_request(text);
    // the request is acceptable: switch protocols
    http::response<http::string_body> resp(
        {101, http::reason_string(101), req.version});
    resp.headers.insert("Upgrade", "websocket");
    resp.headers.insert("Connection", "upgrade");
    auto const key =
        req.headers["Sec-WebSocket-Key"];
    resp.headers.insert("Sec-WebSocket-Key", key);
    resp.headers.insert("Sec-WebSocket-Accept",
        detail::make_sec_ws_accept(key));
    resp.headers.replace("Server", "Beast.WSProto");
    (*d_)(resp);
    return resp;
}
// Validates the server's response to the upgrade request.
//
// The response is accepted when its status is 101, is_upgrade holds
// for it, its Upgrade header equals "websocket" ignoring case, and
// its Sec-WebSocket-Accept header is present and matches the value
// computed from `key`. On acceptance the socket takes the client
// role and `ec` is left untouched; otherwise `ec` is set to
// error::response_failed.
template<class Stream>
template<class Body, class Headers>
void
socket<Stream>::do_response(
    http::message<false, Body, Headers> const& resp,
    boost::string_ref const& key, error_code& ec)
{
    // evaluated left to right; the first failed check ends it
    auto const accepted =
        resp.status == 101 &&
        is_upgrade(resp) &&
        rfc2616::ci_equal(
            resp.headers["Upgrade"], "websocket") &&
        resp.headers.exists("Sec-WebSocket-Accept") &&
        ! (resp.headers["Sec-WebSocket-Accept"] !=
            detail::make_sec_ws_accept(key));
    if(! accepted)
    {
        // VFALCO Review these error codes
        ec = error::response_failed;
        return;
    }
    role_ = role_type::client;
}
// Reads and decodes one frame header from the stream into rd_fh_.
//
// The first two bytes are read and passed to detail::read_fh1,
// which reports how many more header bytes are needed; those are
// read and the header is completed by detail::read_fh2. A header
// that decodes cleanly is then checked by prepare_fh.
//
// Transport errors are returned in `ec`; decoding or validation
// problems are returned in `code`. Either one ends the function
// early.
template<class Stream>
void
socket<Stream>::do_read_fh(
    detail::frame_streambuf& fb,
    close_code& code, error_code& ec)
{
    // reads exactly `n` more bytes into fb, true on success
    auto const fill =
        [&](std::size_t n)
        {
            fb.commit(boost::asio::read(
                stream_, fb.prepare(n), ec));
            return ! ec;
        };
    if(! fill(2))
        return;
    auto const more = detail::read_fh1(
        rd_fh_, fb, role_, code);
    if(code != close_code::none)
        return;
    if(more > 0 && ! fill(more))
        return;
    detail::read_fh2(
        rd_fh_, fb, role_, code);
    if(code == close_code::none)
        prepare_fh(code);
}
} // wsproto
} // beast
#endif