Squashed 'src/beast/' changes from c00cd37..06f74f0

06f74f0 Set version to 1.0.0-b26
68f535f Tidy up warnings and tests:
4ee5fa9 Set version to 1.0.0-b25
229d390 Update README.md for CppCast 2017
c3e3a55 Fix deflate setup bug
439a224 WebSocket server examples and test tidying:
29565c8 Remove unnecessary include
caa3b39 Fix 32-bit arm7 warnings
0474cc5 Better handler_ptr (API Change):
ca38657 Fixes for websocket echo server:
797631c Set version to 1.0.0-b24
a450968 Add permessage-deflate WebSocket extension:
67e965e Make decorator copyable
42899fc Add optional yield_to arguments
61aef03 Simplify Travis package install specification
9d0d7c9 bjam use clang on MACOSX

git-subtree-dir: src/beast
git-subtree-split: 06f74f05f7de51d7f791a17c2b06840183332cbe
This commit is contained in:
Vinnie Falco
2017-02-02 09:05:27 -05:00
parent 7028579170
commit c652cf066d
56 changed files with 3678 additions and 1680 deletions

View File

@@ -83,7 +83,8 @@ class buffer_cat_helper<Bn...>::const_iterator
iter()
{
return *reinterpret_cast<
iter_t<I>*>(buf_.data());
iter_t<I>*>(static_cast<void*>(
buf_.data()));
}
template<std::size_t I>
@@ -91,7 +92,8 @@ class buffer_cat_helper<Bn...>::const_iterator
iter() const
{
return *reinterpret_cast<
iter_t<I> const*>(buf_.data());
iter_t<I> const*>(static_cast<
void const*>(buf_.data()));
}
public:

View File

@@ -11,23 +11,25 @@
#include <beast/core/detail/type_traits.hpp>
#include <atomic>
#include <cstdint>
#include <type_traits>
#include <utility>
namespace beast {
/** A smart pointer container.
/** A smart pointer container with associated completion handler.
This is a smart pointer that retains shared ownership of an
object through a pointer. Memory is managed using the allocation
and deallocation functions associated with a completion handler,
which is also stored in the object. The object is destroyed and
its memory deallocated when one of the following happens:
which is also stored in the object. The managed object is
destroyed and its memory deallocated when one of the following
happens:
@li The function @ref invoke is called.
@li The function @ref release_handler is called
@li The last remaining container owning the object is destroyed
@li The function @ref release_handler is called.
@li The last remaining container owning the object is destroyed.
Objects of this type are used in the implementation of
composed operations. Typically the composed operation's shared
@@ -38,6 +40,10 @@ namespace beast {
@note The reference count is stored using a 16 bit unsigned
integer. Making more than 2^16 copies of one object results
in undefined behavior.
@tparam T The type of the owned object.
@tparam Handler The type of the completion handler.
*/
template<class T, class Handler>
class handler_ptr
@@ -60,10 +66,10 @@ class handler_ptr
P* p_;
template<class DeducedHandler, class... Args>
handler_ptr(int, DeducedHandler&& handler, Args&&... args);
public:
/// The type of element this object stores
using element_type = T;
/// The type of handler this object stores
using handler_type = Handler;
@@ -88,6 +94,46 @@ public:
/// Copy constructor
handler_ptr(handler_ptr const& other);
/** Construct a new @ref handler_ptr
This creates a new @ref handler_ptr with an owned object
of type `T`. The allocator associated with the handler will
be used to allocate memory for the owned object. The constructor
for the owned object will be called thusly:
@code
T(handler, std::forward<Args>(args)...)
@endcode
@param handler The handler to associate with the owned
object. The argument will be moved.
@param args Optional arguments forwarded to
the owned object's constructor.
*/
template<class... Args>
handler_ptr(Handler&& handler, Args&&... args);
/** Construct a new @ref handler_ptr
This creates a new @ref handler_ptr with an owned object
of type `T`. The allocator associated with the handler will
be used to allocate memory for the owned object. The constructor
for the owned object will be called thusly:
@code
T(handler, std::forward<Args>(args)...)
@endcode
@param handler The handler to associate with the owned
object. The argument will be copied.
@param args Optional arguments forwarded to
the owned object's constructor.
*/
template<class... Args>
handler_ptr(Handler const& handler, Args&&... args);
/// Returns a reference to the handler
handler_type&
handler() const
@@ -95,25 +141,36 @@ public:
return p_->handler;
}
/// Returns a pointer to the owned object
/// Returns `true` if `*this` owns an object.
explicit
operator bool() const
{
return p_ && p_->t;
}
/** Returns a pointer to the owned object.
If `*this` owns an object, a pointer to the
object is returned, else `nullptr` is returned.
*/
T*
get() const
{
return p_->t;
return p_ ? p_->t : nullptr;
}
/// Return a reference to the owned object.
T&
operator*() const
{
return *get();
return *p_->t;
}
/// Return a pointer to the owned object.
T*
operator->() const
{
return get();
return p_->t;
}
/** Release ownership of the handler
@@ -137,33 +194,6 @@ public:
template<class... Args>
void
invoke(Args&&... args);
// VFALCO The free function interface works around
// a horrible Visual Studio 15 Update 3 bug
/** Construct a new `handler_ptr`.
@param handler The handler. The allocator associated with
the handler will be used to allocate memory for the owned
object. This argument will be forwarded to the owned object's
constructor.
@param args Optional arguments forwarded to
the owned object's constructor.
*/
/** @{ */
template<class U, class CompletionHandler, class... Args>
friend
handler_ptr<U, CompletionHandler>
make_handler_ptr(
CompletionHandler&& handler, Args&&... args);
template<class U, class CompletionHandler, class... Args>
friend
handler_ptr<U, CompletionHandler>
make_handler_ptr(
CompletionHandler const& handler, Args&&... args);
/** @} */
};
} // beast

View File

@@ -598,9 +598,9 @@ basic_streambuf<Allocator>::prepare(size_type n) ->
while(n > 0)
{
auto const size = std::max(alloc_size_, n);
auto& e = *reinterpret_cast<element*>(
alloc_traits::allocate(this->member(),
sizeof(element) + size));
auto& e = *reinterpret_cast<element*>(static_cast<
void*>(alloc_traits::allocate(this->member(),
sizeof(element) + size)));
alloc_traits::construct(this->member(), &e, size);
list_.push_back(e);
if(out_ == list_.end())

View File

@@ -45,9 +45,8 @@ public:
template<class DeducedHandler, class... Args>
read_some_op(DeducedHandler&& h,
dynabuf_readstream& srs, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), srs,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
srs, std::forward<Args>(args)...)
{
(*this)(error_code{}, 0);
}

View File

@@ -39,15 +39,6 @@ P(DeducedHandler&& h, Args&&... args)
}
}
template<class T, class Handler>
template<class DeducedHandler, class... Args>
handler_ptr<T, Handler>::
handler_ptr(int, DeducedHandler&& handler, Args&&... args)
: p_(new P(std::forward<DeducedHandler>(handler),
std::forward<Args>(args)...))
{
}
template<class T, class Handler>
handler_ptr<T, Handler>::
~handler_ptr()
@@ -82,6 +73,27 @@ handler_ptr(handler_ptr const& other)
++p_->n;
}
template<class T, class Handler>
template<class... Args>
handler_ptr<T, Handler>::
handler_ptr(Handler&& handler, Args&&... args)
: p_(new P{std::move(handler),
std::forward<Args>(args)...})
{
static_assert(! std::is_array<T>::value,
"T must not be an array type");
}
template<class T, class Handler>
template<class... Args>
handler_ptr<T, Handler>::
handler_ptr(Handler const& handler, Args&&... args)
: p_(new P{handler, std::forward<Args>(args)...})
{
static_assert(! std::is_array<T>::value,
"T must not be an array type");
}
template<class T, class Handler>
auto
handler_ptr<T, Handler>::
@@ -112,27 +124,6 @@ invoke(Args&&... args)
p_->handler(std::forward<Args>(args)...);
}
template<
class T, class CompletionHandler, class... Args>
handler_ptr<T, CompletionHandler>
make_handler_ptr(
CompletionHandler&& handler, Args&&... args)
{
return handler_ptr<T, CompletionHandler>{0,
std::move(handler),
std::forward<Args>(args)...};
}
template<
class T, class CompletionHandler, class... Args>
handler_ptr<T, CompletionHandler>
make_handler_ptr(
CompletionHandler const& handler, Args&&... args)
{
return handler_ptr<T, CompletionHandler>{0,
handler, std::forward<Args>(args)...};
}
} // beast
#endif

View File

@@ -53,9 +53,8 @@ public:
template<class DeducedHandler, class... Args>
parse_op(DeducedHandler&& h, Stream& s, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), s,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...)
{
(*this)(error_code{}, 0, false);
}

View File

@@ -64,9 +64,8 @@ public:
template<class DeducedHandler, class... Args>
read_header_op(
DeducedHandler&& h, Stream& s, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), s,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}
@@ -236,9 +235,8 @@ public:
template<class DeducedHandler, class... Args>
read_op(DeducedHandler&& h, Stream& s, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), s,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}

View File

@@ -126,9 +126,8 @@ public:
template<class DeducedHandler, class... Args>
write_streambuf_op(DeducedHandler&& h, Stream& s,
Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...)
{
(*this)(error_code{}, 0, false);
}
@@ -373,9 +372,8 @@ public:
template<class DeducedHandler, class... Args>
write_op(DeducedHandler&& h, Stream& s, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), s,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
s, std::forward<Args>(args)...)
{
auto& d = *d_;
auto sp = d_;

View File

@@ -16,6 +16,6 @@
//
#define BEAST_VERSION 100000
#define BEAST_VERSION_STRING "1.0.0-b23"
#define BEAST_VERSION_STRING "1.0.0-b26"
#endif

View File

@@ -12,6 +12,7 @@
#include <beast/http/message.hpp>
#include <beast/http/string_body.hpp>
#include <beast/version.hpp>
#include <type_traits>
#include <utility>
namespace beast {
@@ -29,104 +30,136 @@ struct abstract_decorator
virtual
void
operator()(request_type& req) = 0;
operator()(request_type& req) const = 0;
virtual
void
operator()(response_type& resp) = 0;
operator()(response_type& res) const = 0;
};
template<class T>
template<class F>
class decorator : public abstract_decorator
{
T t_;
F f_;
class call_req_possible
{
template<class U, class R = decltype(
std::declval<U>().operator()(
std::declval<U const>().operator()(
std::declval<request_type&>()),
std::true_type{})>
static R check(int);
template<class>
static std::false_type check(...);
public:
using type = decltype(check<T>(0));
using type = decltype(check<F>(0));
};
class call_res_possible
{
template<class U, class R = decltype(
std::declval<U>().operator()(
std::declval<U const>().operator()(
std::declval<response_type&>()),
std::true_type{})>
static R check(int);
template<class>
static std::false_type check(...);
public:
using type = decltype(check<T>(0));
using type = decltype(check<F>(0));
};
public:
decorator() = default;
decorator(T&& t)
: t_(std::move(t))
decorator(F&& t)
: f_(std::move(t))
{
}
decorator(T const& t)
: t_(t)
decorator(F const& t)
: f_(t)
{
}
void
operator()(request_type& req) override
operator()(request_type& req) const override
{
(*this)(req, typename call_req_possible::type{});
}
void
operator()(response_type& resp) override
operator()(response_type& res) const override
{
(*this)(resp, typename call_res_possible::type{});
(*this)(res, typename call_res_possible::type{});
}
private:
void
operator()(request_type& req, std::true_type)
operator()(request_type& req, std::true_type) const
{
t_(req);
f_(req);
}
void
operator()(request_type& req, std::false_type)
operator()(request_type& req, std::false_type) const
{
req.fields.replace("User-Agent",
std::string{"Beast/"} + BEAST_VERSION_STRING);
}
void
operator()(response_type& res, std::true_type)
operator()(response_type& res, std::true_type) const
{
t_(res);
f_(res);
}
void
operator()(response_type& res, std::false_type)
operator()(response_type& res, std::false_type) const
{
res.fields.replace("Server",
std::string{"Beast/"} + BEAST_VERSION_STRING);
}
};
class decorator_type
{
std::shared_ptr<abstract_decorator> p_;
public:
decorator_type() = delete;
decorator_type(decorator_type&&) = default;
decorator_type(decorator_type const&) = default;
decorator_type& operator=(decorator_type&&) = default;
decorator_type& operator=(decorator_type const&) = default;
template<class F, class =
typename std::enable_if<! std::is_same<
typename std::decay<F>::type,
decorator_type>::value>>
decorator_type(F&& f)
: p_(std::make_shared<decorator<F>>(
std::forward<F>(f)))
{
BOOST_ASSERT(p_);
}
void
operator()(request_type& req)
{
(*p_)(req);
BOOST_ASSERT(p_);
}
void
operator()(response_type& res)
{
(*p_)(res);
BOOST_ASSERT(p_);
}
};
struct default_decorator
{
};
using decorator_type =
std::unique_ptr<abstract_decorator>;
} // detail
} // websocket
} // beast

View File

@@ -125,7 +125,7 @@ public:
void
emplace(F&& f);
void
bool
maybe_invoke()
{
if(base_)
@@ -133,7 +133,9 @@ public:
auto const basep = base_;
base_ = nullptr;
(*basep)();
return true;
}
return false;
}
};

View File

@@ -60,11 +60,17 @@ void
maskgen_t<_>::rekey()
{
std::random_device rng;
#if 0
std::array<std::uint32_t, 32> e;
for(auto& i : e)
i = rng();
// VFALCO This constructor causes
// address sanitizer to fail, no idea why.
std::seed_seq ss(e.begin(), e.end());
g_.seed(ss);
#else
g_.seed(rng());
#endif
}
// VFALCO NOTE This generator has 5KB of state!
@@ -73,7 +79,7 @@ using maskgen = maskgen_t<std::minstd_rand>;
//------------------------------------------------------------------------------
using prepared_key_type =
using prepared_key =
std::conditional<sizeof(void*) == 8,
std::uint64_t, std::uint32_t>::type;

View File

@@ -0,0 +1,472 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BEAST_WEBSOCKET_DETAIL_PMD_EXTENSION_HPP
#define BEAST_WEBSOCKET_DETAIL_PMD_EXTENSION_HPP
#include <beast/core/error.hpp>
#include <beast/core/consuming_buffers.hpp>
#include <beast/core/detail/ci_char_traits.hpp>
#include <beast/zlib/deflate_stream.hpp>
#include <beast/zlib/inflate_stream.hpp>
#include <beast/websocket/option.hpp>
#include <beast/http/rfc7230.hpp>
#include <boost/asio/buffer.hpp>
#include <utility>
namespace beast {
namespace websocket {
namespace detail {
// permessage-deflate offer parameters
//
// "context takeover" means:
// preserve sliding window across messages
//
struct pmd_offer
{
bool accept;
// 0 = absent, or 8..15
int server_max_window_bits;
// -1 = present, 0 = absent, or 8..15
int client_max_window_bits;
// `true` if server_no_context_takeover offered
bool server_no_context_takeover;
// `true` if client_no_context_takeover offered
bool client_no_context_takeover;
};
template<class = void>
int
parse_bits(boost::string_ref const& s)
{
if(s.size() == 0)
return -1;
if(s.size() > 2)
return -1;
if(s[0] < '1' || s[0] > '9')
return -1;
int i = 0;
for(auto c : s)
{
if(c < '0' || c > '9')
return -1;
i = 10 * i + (c - '0');
}
return i;
}
// Parse permessage-deflate request fields
//
template<class Fields>
void
pmd_read(pmd_offer& offer, Fields const& fields)
{
offer.accept = false;
offer.server_max_window_bits= 0;
offer.client_max_window_bits = 0;
offer.server_no_context_takeover = false;
offer.client_no_context_takeover = false;
using beast::detail::ci_equal;
http::ext_list list{
fields["Sec-WebSocket-Extensions"]};
for(auto const& ext : list)
{
if(ci_equal(ext.first, "permessage-deflate"))
{
for(auto const& param : ext.second)
{
if(ci_equal(param.first,
"server_max_window_bits"))
{
if(offer.server_max_window_bits != 0)
{
// The negotiation offer contains multiple
// extension parameters with the same name.
//
return; // MUST decline
}
if(param.second.empty())
{
// The negotiation offer extension
// parameter is missing the value.
//
return; // MUST decline
}
offer.server_max_window_bits =
parse_bits(param.second);
if( offer.server_max_window_bits < 8 ||
offer.server_max_window_bits > 15)
{
// The negotiation offer contains an
// extension parameter with an invalid value.
//
return; // MUST decline
}
}
else if(ci_equal(param.first,
"client_max_window_bits"))
{
if(offer.client_max_window_bits != 0)
{
// The negotiation offer contains multiple
// extension parameters with the same name.
//
return; // MUST decline
}
if(! param.second.empty())
{
offer.client_max_window_bits =
parse_bits(param.second);
if( offer.client_max_window_bits < 8 ||
offer.client_max_window_bits > 15)
{
// The negotiation offer contains an
// extension parameter with an invalid value.
//
return; // MUST decline
}
}
else
{
offer.client_max_window_bits = -1;
}
}
else if(ci_equal(param.first,
"server_no_context_takeover"))
{
if(offer.server_no_context_takeover)
{
// The negotiation offer contains multiple
// extension parameters with the same name.
//
return; // MUST decline
}
if(! param.second.empty())
{
// The negotiation offer contains an
// extension parameter with an invalid value.
//
return; // MUST decline
}
offer.server_no_context_takeover = true;
}
else if(ci_equal(param.first,
"client_no_context_takeover"))
{
if(offer.client_no_context_takeover)
{
// The negotiation offer contains multiple
// extension parameters with the same name.
//
return; // MUST decline
}
if(! param.second.empty())
{
// The negotiation offer contains an
// extension parameter with an invalid value.
//
return; // MUST decline
}
offer.client_no_context_takeover = true;
}
else
{
// The negotiation offer contains an extension
// parameter not defined for use in an offer.
//
return; // MUST decline
}
}
offer.accept = true;
return;
}
}
}
// Set permessage-deflate fields for a client offer
//
template<class Fields>
void
pmd_write(Fields& fields, pmd_offer const& offer)
{
std::string s;
s = "permessage-deflate";
if(offer.server_max_window_bits != 0)
{
if(offer.server_max_window_bits != -1)
{
s += "; server_max_window_bits=";
s += std::to_string(
offer.server_max_window_bits);
}
else
{
s += "; server_max_window_bits";
}
}
if(offer.client_max_window_bits != 0)
{
if(offer.client_max_window_bits != -1)
{
s += "; client_max_window_bits=";
s += std::to_string(
offer.client_max_window_bits);
}
else
{
s += "; client_max_window_bits";
}
}
if(offer.server_no_context_takeover)
{
s += "; server_no_context_takeover";
}
if(offer.client_no_context_takeover)
{
s += "; client_no_context_takeover";
}
fields.replace("Sec-WebSocket-Extensions", s);
}
// Negotiate a permessage-deflate client offer
//
template<class Fields>
void
pmd_negotiate(
Fields& fields,
pmd_offer& config,
pmd_offer const& offer,
permessage_deflate const& o)
{
if(! (offer.accept && o.server_enable))
{
config.accept = false;
return;
}
config.accept = true;
std::string s = "permessage-deflate";
config.server_no_context_takeover =
offer.server_no_context_takeover ||
o.server_no_context_takeover;
if(config.server_no_context_takeover)
s += "; server_no_context_takeover";
config.client_no_context_takeover =
o.client_no_context_takeover ||
offer.client_no_context_takeover;
if(config.client_no_context_takeover)
s += "; client_no_context_takeover";
if(offer.server_max_window_bits != 0)
config.server_max_window_bits = std::min(
offer.server_max_window_bits,
o.server_max_window_bits);
else
config.server_max_window_bits =
o.server_max_window_bits;
if(config.server_max_window_bits < 15)
{
// ZLib's deflateInit silently treats 8 as
// 9 due to a bug, so prevent 8 from being used.
//
if(config.server_max_window_bits < 9)
config.server_max_window_bits = 9;
s += "; server_max_window_bits=";
s += std::to_string(
config.server_max_window_bits);
}
switch(offer.client_max_window_bits)
{
case -1:
// extension parameter is present with no value
config.client_max_window_bits =
o.client_max_window_bits;
if(config.client_max_window_bits < 15)
{
s += "; client_max_window_bits=";
s += std::to_string(
config.client_max_window_bits);
}
break;
case 0:
/* extension parameter is absent.
If a received extension negotiation offer doesn't have the
"client_max_window_bits" extension parameter, the corresponding
extension negotiation response to the offer MUST NOT include the
"client_max_window_bits" extension parameter.
*/
if(o.client_max_window_bits == 15)
config.client_max_window_bits = 15;
else
config.accept = false;
break;
default:
// extension parameter has value in [8..15]
config.client_max_window_bits = std::min(
o.client_max_window_bits,
offer.client_max_window_bits);
s += "; client_max_window_bits=";
s += std::to_string(
config.client_max_window_bits);
break;
}
if(config.accept)
fields.replace("Sec-WebSocket-Extensions", s);
}
// Normalize the server's response
//
inline
void
pmd_normalize(pmd_offer& offer)
{
if(offer.accept)
{
if( offer.server_max_window_bits == 0)
offer.server_max_window_bits = 15;
if( offer.client_max_window_bits == 0 ||
offer.client_max_window_bits == -1)
offer.client_max_window_bits = 15;
}
}
//--------------------------------------------------------------------
// Decompress into a DynamicBuffer
//
template<class InflateStream, class DynamicBuffer>
void
inflate(
InflateStream& zi,
DynamicBuffer& dynabuf,
boost::asio::const_buffer const& in,
error_code& ec)
{
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
zlib::z_params zs;
zs.avail_in = buffer_size(in);
zs.next_in = buffer_cast<void const*>(in);
for(;;)
{
// VFALCO we could be smarter about the size
auto const bs = dynabuf.prepare(
read_size_helper(dynabuf, 65536));
auto const out = *bs.begin();
zs.avail_out = buffer_size(out);
zs.next_out = buffer_cast<void*>(out);
zi.write(zs, zlib::Flush::sync, ec);
dynabuf.commit(zs.total_out);
zs.total_out = 0;
if( ec == zlib::error::need_buffers ||
ec == zlib::error::end_of_stream)
{
ec = {};
break;
}
if(ec)
return;
}
}
// Compress a buffer sequence
// Returns: `true` if more calls are needed
//
template<class DeflateStream, class ConstBufferSequence>
bool
deflate(
DeflateStream& zo,
boost::asio::mutable_buffer& out,
consuming_buffers<ConstBufferSequence>& cb,
bool fin,
error_code& ec)
{
using boost::asio::buffer;
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
BOOST_ASSERT(buffer_size(out) >= 6);
zlib::z_params zs;
zs.avail_in = 0;
zs.next_in = nullptr;
zs.avail_out = buffer_size(out);
zs.next_out = buffer_cast<void*>(out);
for(auto const& in : cb)
{
zs.avail_in = buffer_size(in);
if(zs.avail_in == 0)
continue;
zs.next_in = buffer_cast<void const*>(in);
zo.write(zs, zlib::Flush::none, ec);
if(ec)
{
if(ec != zlib::error::need_buffers)
return false;
BOOST_ASSERT(zs.avail_out == 0);
BOOST_ASSERT(zs.total_out == buffer_size(out));
ec = {};
break;
}
if(zs.avail_out == 0)
{
BOOST_ASSERT(zs.total_out == buffer_size(out));
break;
}
BOOST_ASSERT(zs.avail_in == 0);
}
cb.consume(zs.total_in);
if(zs.avail_out > 0 && fin)
{
auto const remain = buffer_size(cb);
if(remain == 0)
{
// Inspired by Mark Adler
// https://github.com/madler/zlib/issues/149
//
// VFALCO We could do this flush twice depending
// on how much space is in the output.
zo.write(zs, zlib::Flush::block, ec);
BOOST_ASSERT(! ec || ec == zlib::error::need_buffers);
if(ec == zlib::error::need_buffers)
ec = {};
if(ec)
return false;
if(zs.avail_out >= 6)
{
zo.write(zs, zlib::Flush::full, ec);
BOOST_ASSERT(! ec);
// remove flush marker
zs.total_out -= 4;
out = buffer(
buffer_cast<void*>(out), zs.total_out);
return false;
}
}
}
out = buffer(
buffer_cast<void*>(out), zs.total_out);
return true;
}
} // detail
} // websocket
} // beast
#endif

View File

@@ -15,10 +15,13 @@
#include <beast/websocket/detail/frame.hpp>
#include <beast/websocket/detail/invokable.hpp>
#include <beast/websocket/detail/mask.hpp>
#include <beast/websocket/detail/pmd_extension.hpp>
#include <beast/websocket/detail/utf8_checker.hpp>
#include <beast/http/empty_body.hpp>
#include <beast/http/message.hpp>
#include <beast/http/string_body.hpp>
#include <beast/zlib/deflate_stream.hpp>
#include <beast/zlib/inflate_stream.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <cstdint>
@@ -53,20 +56,13 @@ protected:
std::size_t rd_msg_max_ =
16 * 1024 * 1024; // max message size
bool wr_autofrag_ = true; // auto fragment
std::size_t wr_buf_size_ = 4096; // mask buffer size
std::size_t wr_buf_size_ = 4096; // write buffer size
std::size_t rd_buf_size_ = 4096; // read buffer size
opcode wr_opcode_ = opcode::text; // outgoing message type
pong_cb pong_cb_; // pong callback
role_type role_; // server or client
bool failed_; // the connection failed
detail::frame_header rd_fh_; // current frame header
detail::prepared_key_type rd_key_; // prepared masking key
detail::utf8_checker rd_utf8_check_; // for current text msg
std::uint64_t rd_size_; // size of the current message so far
std::uint64_t rd_need_ = 0; // bytes left in msg frame payload
opcode rd_opcode_; // opcode of current msg
bool rd_cont_; // expecting a continuation frame
bool wr_close_; // sent close frame
op* wr_block_; // op currenly writing
@@ -75,6 +71,34 @@ protected:
invokable wr_op_; // invoked after read completes
close_reason cr_; // set from received close frame
// State information for the message being received
//
struct rd_t
{
// opcode of current message being read
opcode op;
// `true` if the next frame is a continuation.
bool cont;
// Checks that test messages are valid utf8
detail::utf8_checker utf8;
// Size of the current message so far.
std::uint64_t size;
// Size of the read buffer.
// This gets set to the read buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t buf_size;
// The read buffer. Used for compression and masking.
std::unique_ptr<std::uint8_t[]> buf;
};
rd_t rd_;
// State information for the message being sent
//
struct wr_t
@@ -99,36 +123,43 @@ protected:
// This gets set to the write buffer size option at the
// beginning of sending a message, so that the option can be
// changed mid-send without affecting the current message.
std::size_t size;
std::size_t buf_size;
// The write buffer.
// The write buffer. Used for compression and masking.
// The buffer is allocated or reallocated at the beginning of
// sending a message.
std::unique_ptr<std::uint8_t[]> buf;
void
open()
{
cont = false;
size = 0;
}
void
close()
{
buf.reset();
}
};
wr_t wr_;
// State information for the permessage-deflate extension
struct pmd_t
{
// `true` if current read message is compressed
bool rd_set;
zlib::deflate_stream zo;
zlib::inflate_stream zi;
};
// If not engaged, then permessage-deflate is not
// enabled for the currently active session.
std::unique_ptr<pmd_t> pmd_;
// Local options for permessage-deflate
permessage_deflate pmd_opts_;
// Offer for clients, negotiated result for servers
pmd_offer pmd_config_;
stream_base(stream_base&&) = default;
stream_base(stream_base const&) = delete;
stream_base& operator=(stream_base&&) = default;
stream_base& operator=(stream_base const&) = delete;
stream_base()
: d_(new decorator<default_decorator>{})
: d_(detail::default_decorator{})
{
}
@@ -142,15 +173,24 @@ protected:
template<class DynamicBuffer>
std::size_t
read_fh1(DynamicBuffer& db, close_code::value& code);
read_fh1(detail::frame_header& fh,
DynamicBuffer& db, close_code::value& code);
template<class DynamicBuffer>
void
read_fh2(DynamicBuffer& db, close_code::value& code);
read_fh2(detail::frame_header& fh,
DynamicBuffer& db, close_code::value& code);
// Called before receiving the first frame of each message
template<class = void>
void
wr_prepare(bool compress);
rd_begin();
// Called before sending the first frame of each message
//
template<class = void>
void
wr_begin();
template<class DynamicBuffer>
void
@@ -161,7 +201,7 @@ protected:
write_ping(DynamicBuffer& db, opcode op, ping_data const& data);
};
template<class _>
template<class>
void
stream_base::
open(role_type role)
@@ -169,30 +209,61 @@ open(role_type role)
// VFALCO TODO analyze and remove dupe code in reset()
role_ = role;
failed_ = false;
rd_need_ = 0;
rd_cont_ = false;
rd_.cont = false;
wr_close_ = false;
wr_block_ = nullptr; // should be nullptr on close anyway
pong_data_ = nullptr; // should be nullptr on close anyway
wr_.open();
wr_.cont = false;
wr_.buf_size = 0;
if(((role_ == role_type::client && pmd_opts_.client_enable) ||
(role_ == role_type::server && pmd_opts_.server_enable)) &&
pmd_config_.accept)
{
pmd_normalize(pmd_config_);
pmd_.reset(new pmd_t);
if(role_ == role_type::client)
{
pmd_->zi.reset(
pmd_config_.server_max_window_bits);
pmd_->zo.reset(
pmd_opts_.compLevel,
pmd_config_.client_max_window_bits,
pmd_opts_.memLevel,
zlib::Strategy::normal);
}
else
{
pmd_->zi.reset(
pmd_config_.client_max_window_bits);
pmd_->zo.reset(
pmd_opts_.compLevel,
pmd_config_.server_max_window_bits,
pmd_opts_.memLevel,
zlib::Strategy::normal);
}
}
}
template<class _>
template<class>
void
stream_base::
close()
{
wr_.close();
rd_.buf.reset();
wr_.buf.reset();
pmd_.reset();
}
// Read fixed frame header
// Read fixed frame header from buffer
// Requires at least 2 bytes
//
template<class DynamicBuffer>
std::size_t
stream_base::
read_fh1(DynamicBuffer& db, close_code::value& code)
read_fh1(detail::frame_header& fh,
DynamicBuffer& db, close_code::value& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
@@ -204,48 +275,51 @@ read_fh1(DynamicBuffer& db, close_code::value& code)
return 0;
};
std::uint8_t b[2];
assert(buffer_size(db.data()) >= sizeof(b));
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
std::size_t need;
rd_fh_.len = b[1] & 0x7f;
switch(rd_fh_.len)
fh.len = b[1] & 0x7f;
switch(fh.len)
{
case 126: need = 2; break;
case 127: need = 8; break;
default:
need = 0;
}
rd_fh_.mask = (b[1] & 0x80) != 0;
if(rd_fh_.mask)
fh.mask = (b[1] & 0x80) != 0;
if(fh.mask)
need += 4;
rd_fh_.op = static_cast<opcode>(b[0] & 0x0f);
rd_fh_.fin = (b[0] & 0x80) != 0;
rd_fh_.rsv1 = (b[0] & 0x40) != 0;
rd_fh_.rsv2 = (b[0] & 0x20) != 0;
rd_fh_.rsv3 = (b[0] & 0x10) != 0;
switch(rd_fh_.op)
fh.op = static_cast<opcode>(b[0] & 0x0f);
fh.fin = (b[0] & 0x80) != 0;
fh.rsv1 = (b[0] & 0x40) != 0;
fh.rsv2 = (b[0] & 0x20) != 0;
fh.rsv3 = (b[0] & 0x10) != 0;
switch(fh.op)
{
case opcode::binary:
case opcode::text:
if(rd_cont_)
if(rd_.cont)
{
// new data frame when continuation expected
return err(close_code::protocol_error);
}
if(rd_fh_.rsv1 || rd_fh_.rsv2 || rd_fh_.rsv3)
if((fh.rsv1 && ! pmd_) ||
fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
}
if(pmd_)
pmd_->rd_set = fh.rsv1;
break;
case opcode::cont:
if(! rd_cont_)
if(! rd_.cont)
{
// continuation without an active message
return err(close_code::protocol_error);
}
if(rd_fh_.rsv1 || rd_fh_.rsv2 || rd_fh_.rsv3)
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
@@ -253,22 +327,22 @@ read_fh1(DynamicBuffer& db, close_code::value& code)
break;
default:
if(is_reserved(rd_fh_.op))
if(is_reserved(fh.op))
{
// reserved opcode
return err(close_code::protocol_error);
}
if(! rd_fh_.fin)
if(! fh.fin)
{
// fragmented control message
return err(close_code::protocol_error);
}
if(rd_fh_.len > 125)
if(fh.len > 125)
{
// invalid length for control message
return err(close_code::protocol_error);
}
if(rd_fh_.rsv1 || rd_fh_.rsv2 || rd_fh_.rsv3)
if(fh.rsv1 || fh.rsv2 || fh.rsv3)
{
// reserved bits not cleared
return err(close_code::protocol_error);
@@ -276,13 +350,13 @@ read_fh1(DynamicBuffer& db, close_code::value& code)
break;
}
// unmasked frame from client
if(role_ == role_type::server && ! rd_fh_.mask)
if(role_ == role_type::server && ! fh.mask)
{
code = close_code::protocol_error;
return 0;
}
// masked frame from server
if(role_ == role_type::client && rd_fh_.mask)
if(role_ == role_type::client && fh.mask)
{
code = close_code::protocol_error;
return 0;
@@ -291,27 +365,28 @@ read_fh1(DynamicBuffer& db, close_code::value& code)
return need;
}
// Decode variable frame header from stream
// Decode variable frame header from buffer
//
template<class DynamicBuffer>
void
stream_base::
read_fh2(DynamicBuffer& db, close_code::value& code)
read_fh2(detail::frame_header& fh,
DynamicBuffer& db, close_code::value& code)
{
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
using namespace boost::endian;
switch(rd_fh_.len)
switch(fh.len)
{
case 126:
{
std::uint8_t b[2];
assert(buffer_size(db.data()) >= sizeof(b));
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
rd_fh_.len = big_uint16_to_native(&b[0]);
fh.len = big_uint16_to_native(&b[0]);
// length not canonical
if(rd_fh_.len < 126)
if(fh.len < 126)
{
code = close_code::protocol_error;
return;
@@ -321,11 +396,11 @@ read_fh2(DynamicBuffer& db, close_code::value& code)
case 127:
{
std::uint8_t b[8];
assert(buffer_size(db.data()) >= sizeof(b));
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
rd_fh_.len = big_uint64_to_native(&b[0]);
fh.len = big_uint64_to_native(&b[0]);
// length not canonical
if(rd_fh_.len < 65536)
if(fh.len < 65536)
{
code = close_code::protocol_error;
return;
@@ -333,67 +408,76 @@ read_fh2(DynamicBuffer& db, close_code::value& code)
break;
}
}
if(rd_fh_.mask)
if(fh.mask)
{
std::uint8_t b[4];
assert(buffer_size(db.data()) >= sizeof(b));
BOOST_ASSERT(buffer_size(db.data()) >= sizeof(b));
db.consume(buffer_copy(buffer(b), db.data()));
rd_fh_.key = little_uint32_to_native(&b[0]);
fh.key = little_uint32_to_native(&b[0]);
}
else
{
// initialize this otherwise operator== breaks
rd_fh_.key = 0;
fh.key = 0;
}
if(rd_fh_.mask)
prepare_key(rd_key_, rd_fh_.key);
if(! is_control(rd_fh_.op))
if(! is_control(fh.op))
{
if(rd_fh_.op != opcode::cont)
if(fh.op != opcode::cont)
{
rd_size_ = rd_fh_.len;
rd_opcode_ = rd_fh_.op;
rd_.size = 0;
rd_.op = fh.op;
}
else
{
if(rd_size_ > (std::numeric_limits<
std::uint64_t>::max)() - rd_fh_.len)
if(rd_.size > (std::numeric_limits<
std::uint64_t>::max)() - fh.len)
{
code = close_code::too_big;
return;
}
rd_size_ += rd_fh_.len;
}
if(rd_msg_max_ && rd_size_ > rd_msg_max_)
{
code = close_code::too_big;
return;
}
rd_need_ = rd_fh_.len;
rd_cont_ = ! rd_fh_.fin;
rd_.cont = ! fh.fin;
}
code = close_code::none;
}
template<class _>
template<class>
void
stream_base::
wr_prepare(bool compress)
rd_begin()
{
// Maintain the read buffer
if(pmd_)
{
if(! rd_.buf || rd_.buf_size != rd_buf_size_)
{
rd_.buf_size = rd_buf_size_;
rd_.buf.reset(new std::uint8_t[rd_.buf_size]);
}
}
}
template<class>
void
stream_base::
wr_begin()
{
wr_.autofrag = wr_autofrag_;
wr_.compress = compress;
if(compress || wr_.autofrag ||
wr_.compress = static_cast<bool>(pmd_);
// Maintain the write buffer
if( wr_.compress ||
role_ == detail::role_type::client)
{
if(! wr_.buf || wr_.size != wr_buf_size_)
if(! wr_.buf || wr_.buf_size != wr_buf_size_)
{
wr_.size = wr_buf_size_;
wr_.buf.reset(new std::uint8_t[wr_.size]);
wr_.buf_size = wr_buf_size_;
wr_.buf.reset(new std::uint8_t[wr_.buf_size]);
}
}
else
{
wr_.size = wr_buf_size_;
wr_.buf_size = wr_buf_size_;
wr_.buf.reset();
}
}
@@ -418,7 +502,7 @@ write_close(DynamicBuffer& db, close_reason const& cr)
detail::write(db, fh);
if(cr.code != close_code::none)
{
detail::prepared_key_type key;
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
{
@@ -464,7 +548,7 @@ write_ping(
detail::write(db, fh);
if(data.empty())
return;
detail::prepared_key_type key;
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
auto d = db.prepare(data.size());

View File

@@ -35,7 +35,7 @@ class stream<NextLayer>::response_op
{
bool cont;
stream<NextLayer>& ws;
http::response<http::string_body> resp;
http::response<http::string_body> res;
error_code final_ec;
int state = 0;
@@ -45,12 +45,12 @@ class stream<NextLayer>::response_op
bool cont_)
: cont(cont_)
, ws(ws_)
, resp(ws_.build_response(req))
, res(ws_.build_response(req))
{
// can't call stream::reset() here
// otherwise accept_op will malfunction
//
if(resp.status != 101)
if(res.status != 101)
final_ec = error::handshake_failed;
}
};
@@ -64,9 +64,8 @@ public:
template<class DeducedHandler, class... Args>
response_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}
@@ -121,7 +120,7 @@ operator()(error_code ec, bool again)
// send response
d.state = 1;
http::async_write(d.ws.next_layer(),
d.resp, std::move(*this));
d.res, std::move(*this));
return;
// sent response
@@ -129,7 +128,11 @@ operator()(error_code ec, bool again)
d.state = 99;
ec = d.final_ec;
if(! ec)
{
pmd_read(
d.ws.pmd_config_, d.res.fields);
d.ws.open(detail::role_type::server);
}
break;
}
}
@@ -176,9 +179,8 @@ public:
template<class DeducedHandler, class... Args>
accept_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, 0, false);
}
@@ -412,6 +414,7 @@ accept(http::request<Body, Fields> const& req,
// teardown if Connection: close.
return;
}
pmd_read(pmd_config_, req.fields);
open(detail::role_type::server);
}

View File

@@ -56,9 +56,8 @@ public:
template<class DeducedHandler, class... Args>
close_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}

View File

@@ -59,9 +59,8 @@ public:
template<class DeducedHandler, class... Args>
handshake_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}
@@ -118,6 +117,8 @@ operator()(error_code ec, bool again)
d.state = 1;
// VFALCO Do we need the ability to move
// a message on the async_write?
pmd_read(
d.ws.pmd_config_, d.req.fields);
http::async_write(d.ws.stream_,
d.req, std::move(*this));
return;
@@ -187,8 +188,12 @@ handshake(boost::string_ref const& host,
"SyncStream requirements not met");
reset();
std::string key;
http::write(stream_,
build_request(host, resource, key), ec);
{
auto const req =
build_request(host, resource, key);
pmd_read(pmd_config_, req.fields);
http::write(stream_, req, ec);
}
if(ec)
return;
http::response<http::string_body> res;

View File

@@ -55,9 +55,8 @@ public:
template<class DeducedHandler, class... Args>
ping_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}

View File

@@ -18,6 +18,7 @@
#include <beast/core/detail/clamp.hpp>
#include <boost/assert.hpp>
#include <boost/optional.hpp>
#include <limits>
#include <memory>
namespace beast {
@@ -48,6 +49,9 @@ class stream<NextLayer>::read_frame_op
frame_info& fi;
DynamicBuffer& db;
fb_type fb;
std::uint64_t remain;
detail::frame_header fh;
detail::prepared_key key;
boost::optional<dmb_type> dmb;
boost::optional<fmb_type> fmb;
int state = 0;
@@ -72,9 +76,8 @@ public:
template<class DeducedHandler, class... Args>
read_frame_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, 0, false);
}
@@ -142,23 +145,26 @@ template<class NextLayer>
template<class DynamicBuffer, class Handler>
void
stream<NextLayer>::read_frame_op<DynamicBuffer, Handler>::
operator()(error_code ec,std::size_t bytes_transferred, bool again)
operator()(error_code ec,
std::size_t bytes_transferred, bool again)
{
using beast::detail::clamp;
using boost::asio::buffer;
enum
{
do_start = 0,
do_read_payload = 1,
do_frame_done = 3,
do_read_fh = 4,
do_control_payload = 7,
do_control = 8,
do_pong_resume = 9,
do_pong = 11,
do_close_resume = 13,
do_close = 15,
do_teardown = 16,
do_fail = 18,
do_inflate_payload = 30,
do_frame_done = 4,
do_read_fh = 5,
do_control_payload = 8,
do_control = 9,
do_pong_resume = 10,
do_pong = 12,
do_close_resume = 14,
do_close = 16,
do_teardown = 17,
do_fail = 19,
do_call_handler = 99
};
@@ -181,32 +187,51 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
boost::asio::error::operation_aborted, 0));
return;
}
d.state = d.ws.rd_need_ > 0 ?
do_read_payload : do_read_fh;
d.state = do_read_fh;
break;
//------------------------------------------------------------------
case do_read_payload:
d.state = do_read_payload + 1;
d.dmb = d.db.prepare(clamp(d.ws.rd_need_));
// receive payload data
if(d.fh.len == 0)
{
d.state = do_frame_done;
break;
}
// Enforce message size limit
if(d.ws.rd_msg_max_ && d.fh.len >
d.ws.rd_msg_max_ - d.ws.rd_.size)
{
code = close_code::too_big;
d.state = do_fail;
break;
}
d.ws.rd_.size += d.fh.len;
d.remain = d.fh.len;
if(d.fh.mask)
detail::prepare_key(d.key, d.fh.key);
// fall through
case do_read_payload + 1:
d.state = do_read_payload + 2;
d.dmb = d.db.prepare(clamp(d.remain));
// Read frame payload data
d.ws.stream_.async_read_some(
*d.dmb, std::move(*this));
return;
case do_read_payload + 1:
case do_read_payload + 2:
{
d.ws.rd_need_ -= bytes_transferred;
d.remain -= bytes_transferred;
auto const pb = prepare_buffers(
bytes_transferred, *d.dmb);
if(d.ws.rd_fh_.mask)
detail::mask_inplace(pb, d.ws.rd_key_);
if(d.ws.rd_opcode_ == opcode::text)
if(d.fh.mask)
detail::mask_inplace(pb, d.key);
if(d.ws.rd_.op == opcode::text)
{
if(! d.ws.rd_utf8_check_.write(pb) ||
(d.ws.rd_need_ == 0 && d.ws.rd_fh_.fin &&
! d.ws.rd_utf8_check_.finish()))
if(! d.ws.rd_.utf8.write(pb) ||
(d.remain == 0 && d.fh.fin &&
! d.ws.rd_.utf8.finish()))
{
// invalid utf8
code = close_code::bad_payload;
@@ -215,21 +240,102 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
}
}
d.db.commit(bytes_transferred);
if(d.ws.rd_need_ > 0)
if(d.remain > 0)
{
d.state = do_read_payload;
d.state = do_read_payload + 1;
break;
}
d.state = do_frame_done;
break;
}
//------------------------------------------------------------------
case do_inflate_payload:
d.remain = d.fh.len;
if(d.fh.len == 0)
{
// inflate even if fh.len == 0, otherwise we
// never emit the end-of-stream deflate block.
bytes_transferred = 0;
d.state = do_inflate_payload + 2;
break;
}
if(d.fh.mask)
detail::prepare_key(d.key, d.fh.key);
// fall through
case do_inflate_payload + 1:
{
d.state = do_inflate_payload + 2;
// Read compressed frame payload data
d.ws.stream_.async_read_some(
buffer(d.ws.rd_.buf.get(), clamp(
d.remain, d.ws.rd_.buf_size)),
std::move(*this));
return;
}
case do_inflate_payload + 2:
{
d.remain -= bytes_transferred;
auto const in = buffer(
d.ws.rd_.buf.get(), bytes_transferred);
if(d.fh.mask)
detail::mask_inplace(in, d.key);
auto const prev = d.db.size();
detail::inflate(d.ws.pmd_->zi, d.db, in, ec);
d.ws.failed_ = ec != 0;
if(d.ws.failed_)
break;
if(d.remain == 0 && d.fh.fin)
{
static std::uint8_t constexpr
empty_block[4] = {
0x00, 0x00, 0xff, 0xff };
detail::inflate(d.ws.pmd_->zi, d.db,
buffer(&empty_block[0], 4), ec);
d.ws.failed_ = ec != 0;
if(d.ws.failed_)
break;
}
if(d.ws.rd_.op == opcode::text)
{
consuming_buffers<typename
DynamicBuffer::const_buffers_type
> cb{d.db.data()};
cb.consume(prev);
if(! d.ws.rd_.utf8.write(cb) ||
(d.remain == 0 && d.fh.fin &&
! d.ws.rd_.utf8.finish()))
{
// invalid utf8
code = close_code::bad_payload;
d.state = do_fail;
break;
}
}
if(d.remain > 0)
{
d.state = do_inflate_payload + 1;
break;
}
if(d.fh.fin && (
(d.ws.role_ == detail::role_type::client &&
d.ws.pmd_config_.server_no_context_takeover) ||
(d.ws.role_ == detail::role_type::server &&
d.ws.pmd_config_.client_no_context_takeover)))
d.ws.pmd_->zi.reset();
d.state = do_frame_done;
break;
}
//------------------------------------------------------------------
case do_frame_done:
// call handler
d.fi.op = d.ws.rd_opcode_;
d.fi.fin = d.ws.rd_fh_.fin &&
d.ws.rd_need_ == 0;
d.fi.op = d.ws.rd_.op;
d.fi.fin = d.fh.fin;
goto upcall;
//------------------------------------------------------------------
@@ -244,7 +350,8 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
{
d.fb.commit(bytes_transferred);
code = close_code::none;
auto const n = d.ws.read_fh1(d.fb, code);
auto const n = d.ws.read_fh1(
d.fh, d.fb, code);
if(code != close_code::none)
{
// protocol error
@@ -266,21 +373,21 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
case do_read_fh + 2:
d.fb.commit(bytes_transferred);
code = close_code::none;
d.ws.read_fh2(d.fb, code);
d.ws.read_fh2(d.fh, d.fb, code);
if(code != close_code::none)
{
// protocol error
d.state = do_fail;
break;
}
if(detail::is_control(d.ws.rd_fh_.op))
if(detail::is_control(d.fh.op))
{
if(d.ws.rd_fh_.len > 0)
if(d.fh.len > 0)
{
// read control payload
d.state = do_control_payload;
d.fmb = d.fb.prepare(static_cast<
std::size_t>(d.ws.rd_fh_.len));
std::size_t>(d.fh.len));
boost::asio::async_read(d.ws.stream_,
*d.fmb, std::move(*this));
return;
@@ -288,21 +395,29 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
d.state = do_control;
break;
}
if(d.ws.rd_need_ > 0)
if(d.fh.op == opcode::text ||
d.fh.op == opcode::binary)
d.ws.rd_begin();
if(d.fh.len == 0 && ! d.fh.fin)
{
d.state = do_read_payload;
// Empty message frame
d.state = do_frame_done;
break;
}
// empty frame
d.state = do_frame_done;
if(! d.ws.pmd_ || ! d.ws.pmd_->rd_set)
d.state = do_read_payload;
else
d.state = do_inflate_payload;
break;
//------------------------------------------------------------------
case do_control_payload:
if(d.ws.rd_fh_.mask)
detail::mask_inplace(
*d.fmb, d.ws.rd_key_);
if(d.fh.mask)
{
detail::prepare_key(d.key, d.fh.key);
detail::mask_inplace(*d.fmb, d.key);
}
d.fb.commit(bytes_transferred);
d.state = do_control; // VFALCO fall through?
break;
@@ -310,7 +425,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
//------------------------------------------------------------------
case do_control:
if(d.ws.rd_fh_.op == opcode::ping)
if(d.fh.op == opcode::ping)
{
ping_data data;
detail::read(data, d.fb.data());
@@ -335,7 +450,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
d.state = do_pong;
break;
}
else if(d.ws.rd_fh_.op == opcode::pong)
else if(d.fh.op == opcode::pong)
{
code = close_code::none;
ping_data payload;
@@ -346,7 +461,7 @@ operator()(error_code ec,std::size_t bytes_transferred, bool again)
d.state = do_read_fh;
break;
}
BOOST_ASSERT(d.ws.rd_fh_.op == opcode::close);
BOOST_ASSERT(d.fh.op == opcode::close);
{
detail::read(d.ws.cr_, d.fb.data(), code);
if(code != close_code::none)
@@ -589,110 +704,218 @@ read_frame(frame_info& fi, DynamicBuffer& dynabuf, error_code& ec)
static_assert(beast::is_DynamicBuffer<DynamicBuffer>::value,
"DynamicBuffer requirements not met");
using beast::detail::clamp;
using boost::asio::buffer;
using boost::asio::buffer_cast;
using boost::asio::buffer_size;
close_code::value code{};
for(;;)
{
if(rd_need_ == 0)
// Read frame header
detail::frame_header fh;
detail::frame_streambuf fb;
{
// read header
detail::frame_streambuf fb;
do_read_fh(fb, code, ec);
fb.commit(boost::asio::read(
stream_, fb.prepare(2), ec));
failed_ = ec != 0;
if(failed_)
return;
{
auto const n = read_fh1(fh, fb, code);
if(code != close_code::none)
goto do_close;
if(n > 0)
{
fb.commit(boost::asio::read(
stream_, fb.prepare(n), ec));
failed_ = ec != 0;
if(failed_)
return;
}
}
read_fh2(fh, fb, code);
failed_ = ec != 0;
if(failed_)
return;
if(code != close_code::none)
break;
if(detail::is_control(rd_fh_.op))
goto do_close;
}
if(detail::is_control(fh.op))
{
// Read control frame payload
if(fh.len > 0)
{
// read control payload
if(rd_fh_.len > 0)
auto const mb = fb.prepare(
static_cast<std::size_t>(fh.len));
fb.commit(boost::asio::read(stream_, mb, ec));
failed_ = ec != 0;
if(failed_)
return;
if(fh.mask)
{
auto const mb = fb.prepare(
static_cast<std::size_t>(rd_fh_.len));
fb.commit(boost::asio::read(stream_, mb, ec));
failed_ = ec != 0;
if(failed_)
return;
if(rd_fh_.mask)
detail::mask_inplace(mb, rd_key_);
fb.commit(static_cast<std::size_t>(rd_fh_.len));
detail::prepared_key key;
detail::prepare_key(key, fh.key);
detail::mask_inplace(mb, key);
}
if(rd_fh_.op == opcode::ping)
fb.commit(static_cast<std::size_t>(fh.len));
}
// Process control frame
if(fh.op == opcode::ping)
{
ping_data data;
detail::read(data, fb.data());
fb.reset();
write_ping<static_streambuf>(
fb, opcode::pong, data);
boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0;
if(failed_)
return;
continue;
}
else if(fh.op == opcode::pong)
{
ping_data payload;
detail::read(payload, fb.data());
if(pong_cb_)
pong_cb_(payload);
continue;
}
BOOST_ASSERT(fh.op == opcode::close);
{
detail::read(cr_, fb.data(), code);
if(code != close_code::none)
goto do_close;
if(! wr_close_)
{
ping_data data;
detail::read(data, fb.data());
auto cr = cr_;
if(cr.code == close_code::none)
cr.code = close_code::normal;
cr.reason = "";
fb.reset();
write_ping<static_streambuf>(
fb, opcode::pong, data);
wr_close_ = true;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0;
if(failed_)
return;
continue;
}
else if(rd_fh_.op == opcode::pong)
{
ping_data payload;
detail::read(payload, fb.data());
if(pong_cb_)
pong_cb_(payload);
continue;
}
BOOST_ASSERT(rd_fh_.op == opcode::close);
{
detail::read(cr_, fb.data(), code);
if(code != close_code::none)
break;
if(! wr_close_)
{
auto cr = cr_;
if(cr.code == close_code::none)
cr.code = close_code::normal;
cr.reason = "";
fb.reset();
wr_close_ = true;
write_close<static_streambuf>(fb, cr);
boost::asio::write(stream_, fb.data(), ec);
failed_ = ec != 0;
if(failed_)
return;
}
break;
}
}
if(rd_need_ == 0 && ! rd_fh_.fin)
{
// empty frame
continue;
goto do_close;
}
}
// read payload
auto smb = dynabuf.prepare(clamp(rd_need_));
auto const bytes_transferred =
stream_.read_some(smb, ec);
failed_ = ec != 0;
if(failed_)
return;
rd_need_ -= bytes_transferred;
auto const pb = prepare_buffers(
bytes_transferred, smb);
if(rd_fh_.mask)
detail::mask_inplace(pb, rd_key_);
if(rd_opcode_ == opcode::text)
if(fh.op != opcode::cont)
rd_begin();
if(fh.len == 0 && ! fh.fin)
{
if(! rd_utf8_check_.write(pb) ||
(rd_need_ == 0 && rd_fh_.fin &&
! rd_utf8_check_.finish()))
// empty frame
continue;
}
auto remain = fh.len;
detail::prepared_key key;
if(fh.mask)
detail::prepare_key(key, fh.key);
if(! pmd_ || ! pmd_->rd_set)
{
// Enforce message size limit
if(rd_msg_max_ && fh.len >
rd_msg_max_ - rd_.size)
{
code = close_code::bad_payload;
break;
code = close_code::too_big;
goto do_close;
}
rd_.size += fh.len;
// Read message frame payload
while(remain > 0)
{
auto b =
dynabuf.prepare(clamp(remain));
auto const bytes_transferred =
stream_.read_some(b, ec);
failed_ = ec != 0;
if(failed_)
return;
BOOST_ASSERT(bytes_transferred > 0);
remain -= bytes_transferred;
auto const pb = prepare_buffers(
bytes_transferred, b);
if(fh.mask)
detail::mask_inplace(pb, key);
if(rd_.op == opcode::text)
{
if(! rd_.utf8.write(pb) ||
(remain == 0 && fh.fin &&
! rd_.utf8.finish()))
{
code = close_code::bad_payload;
goto do_close;
}
}
dynabuf.commit(bytes_transferred);
}
}
dynabuf.commit(bytes_transferred);
fi.op = rd_opcode_;
fi.fin = rd_fh_.fin && rd_need_ == 0;
else
{
// Read compressed message frame payload:
// inflate even if fh.len == 0, otherwise we
// never emit the end-of-stream deflate block.
for(;;)
{
auto const bytes_transferred =
stream_.read_some(buffer(rd_.buf.get(),
clamp(remain, rd_.buf_size)), ec);
failed_ = ec != 0;
if(failed_)
return;
remain -= bytes_transferred;
auto const in = buffer(
rd_.buf.get(), bytes_transferred);
if(fh.mask)
detail::mask_inplace(in, key);
auto const prev = dynabuf.size();
detail::inflate(pmd_->zi, dynabuf, in, ec);
failed_ = ec != 0;
if(failed_)
return;
if(remain == 0 && fh.fin)
{
static std::uint8_t constexpr
empty_block[4] = {
0x00, 0x00, 0xff, 0xff };
detail::inflate(pmd_->zi, dynabuf,
buffer(&empty_block[0], 4), ec);
failed_ = ec != 0;
if(failed_)
return;
}
if(rd_.op == opcode::text)
{
consuming_buffers<typename
DynamicBuffer::const_buffers_type
> cb{dynabuf.data()};
cb.consume(prev);
if(! rd_.utf8.write(cb) || (
remain == 0 && fh.fin &&
! rd_.utf8.finish()))
{
code = close_code::bad_payload;
goto do_close;
}
}
if(remain == 0)
break;
}
if(fh.fin && (
(role_ == detail::role_type::client &&
pmd_config_.server_no_context_takeover) ||
(role_ == detail::role_type::server &&
pmd_config_.client_no_context_takeover)))
pmd_->zi.reset();
}
fi.op = rd_.op;
fi.fin = fh.fin;
return;
}
do_close:
if(code != close_code::none)
{
// Fail the connection (per rfc6455)
@@ -759,9 +982,8 @@ public:
template<class DeducedHandler, class... Args>
read_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}

View File

@@ -59,9 +59,7 @@ public:
explicit
teardown_ssl_op(
DeducedHandler&& h, stream_type& stream)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(
h), stream))
: d_(std::forward<DeducedHandler>(h), stream)
{
(*this)(error_code{}, false);
}

View File

@@ -10,6 +10,7 @@
#include <beast/websocket/teardown.hpp>
#include <beast/websocket/detail/hybi13.hpp>
#include <beast/websocket/detail/pmd_extension.hpp>
#include <beast/http/read.hpp>
#include <beast/http/write.hpp>
#include <beast/http/reason.hpp>
@@ -25,6 +26,7 @@
#include <boost/endian/buffers.hpp>
#include <algorithm>
#include <memory>
#include <stdexcept>
#include <utility>
namespace beast {
@@ -38,6 +40,30 @@ stream(Args&&... args)
{
}
template<class NextLayer>
void
stream<NextLayer>::
set_option(permessage_deflate const& o)
{
if( o.server_max_window_bits > 15 ||
o.server_max_window_bits < 9)
throw std::invalid_argument{
"invalid server_max_window_bits"};
if( o.client_max_window_bits > 15 ||
o.client_max_window_bits < 9)
throw std::invalid_argument{
"invalid client_max_window_bits"};
if( o.compLevel < 0 ||
o.compLevel > 9)
throw std::invalid_argument{
"invalid compLevel"};
if( o.memLevel < 1 ||
o.memLevel > 9)
throw std::invalid_argument{
"invalid memLevel"};
pmd_opts_ = o;
}
//------------------------------------------------------------------------------
template<class NextLayer>
@@ -46,8 +72,7 @@ stream<NextLayer>::
reset()
{
failed_ = false;
rd_need_ = 0;
rd_cont_ = false;
rd_.cont = false;
wr_close_ = false;
wr_.cont = false;
wr_block_ = nullptr; // should be nullptr on close anyway
@@ -72,7 +97,22 @@ build_request(boost::string_ref const& host,
key = detail::make_sec_ws_key(maskgen_);
req.fields.insert("Sec-WebSocket-Key", key);
req.fields.insert("Sec-WebSocket-Version", "13");
(*d_)(req);
if(pmd_opts_.client_enable)
{
detail::pmd_offer config;
config.accept = true;
config.server_max_window_bits =
pmd_opts_.server_max_window_bits;
config.client_max_window_bits =
pmd_opts_.client_max_window_bits;
config.server_no_context_takeover =
pmd_opts_.server_no_context_takeover;
config.client_no_context_takeover =
pmd_opts_.client_no_context_takeover;
detail::pmd_write(
req.fields, config);
}
d_(req);
http::prepare(req, http::connection::upgrade);
return req;
}
@@ -91,7 +131,7 @@ build_response(http::request<Body, Fields> const& req)
res.reason = http::reason_string(res.status);
res.version = req.version;
res.body = text;
(*d_)(res);
d_(res);
prepare(res,
(is_keep_alive(req) && keep_alive_) ?
http::connection::keep_alive :
@@ -122,6 +162,7 @@ build_response(http::request<Body, Fields> const& req)
res.reason = http::reason_string(res.status);
res.version = req.version;
res.fields.insert("Sec-WebSocket-Version", "13");
d_(res);
prepare(res,
(is_keep_alive(req) && keep_alive_) ?
http::connection::keep_alive :
@@ -130,6 +171,13 @@ build_response(http::request<Body, Fields> const& req)
}
}
http::response<http::string_body> res;
{
detail::pmd_offer offer;
detail::pmd_offer unused;
pmd_read(offer, req.fields);
pmd_negotiate(
res.fields, unused, offer, pmd_opts_);
}
res.status = 101;
res.reason = http::reason_string(res.status);
res.version = req.version;
@@ -141,7 +189,7 @@ build_response(http::request<Body, Fields> const& req)
detail::make_sec_ws_accept(key));
}
res.fields.replace("Server", "Beast.WSProto");
(*d_)(res);
d_(res);
http::prepare(res, http::connection::upgrade);
return res;
}
@@ -168,32 +216,14 @@ do_response(http::response<Body, Fields> const& res,
if(res.fields["Sec-WebSocket-Accept"] !=
detail::make_sec_ws_accept(key))
return fail();
detail::pmd_offer offer;
pmd_read(offer, res.fields);
// VFALCO see if offer satisfies pmd_config_,
// return an error if not.
pmd_config_ = offer; // overwrite for now
open(detail::role_type::client);
}
template<class NextLayer>
void
stream<NextLayer>::
do_read_fh(detail::frame_streambuf& fb,
close_code::value& code, error_code& ec)
{
fb.commit(boost::asio::read(
stream_, fb.prepare(2), ec));
if(ec)
return;
auto const n = read_fh1(fb, code);
if(code != close_code::none)
return;
if(n > 0)
{
fb.commit(boost::asio::read(
stream_, fb.prepare(n), ec));
if(ec)
return;
}
read_fh2(fb, code);
}
} // websocket
} // beast

View File

@@ -47,9 +47,7 @@ public:
teardown_tcp_op(
DeducedHandler&& h,
socket_type& socket)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(
h), socket))
: d_(std::forward<DeducedHandler>(h), socket)
{
(*this)(error_code{}, 0, false);
}

View File

@@ -26,74 +26,6 @@
namespace beast {
namespace websocket {
/*
template<class ConstBufferSequence>
void
write_frame(bool fin, ConstBufferSequence const& buffer)
Depending on the settings of autofragment role, and compression,
different algorithms are used.
1. autofragment: false
compression: false
In the server role, this will send a single frame in one
system call, by concatenating the frame header and the payload.
In the client role, this will send a single frame in one system
call, using the write buffer to calculate masked data.
2. autofragment: true
compression: false
In the server role, this will send one or more frames in one
system call per sent frame. Each frame is sent by concatenating
the frame header and payload. The size of each sent frame will
not exceed the write buffer size option.
In the client role, this will send one or more frames in one
system call per sent frame, using the write buffer to calculate
masked data. The size of each sent frame will not exceed the
write buffer size option.
3. autofragment: false
compression: true
In the server role, this will...
*/
/*
if(compress)
compress buffers into write_buffer
if(write_buffer_avail == write_buffer_size || fin`)
if(mask)
apply mask to write buffer
write frame header, write_buffer as one frame
else if(auto-fragment)
if(fin || write_buffer_avail + buffers size == write_buffer_size)
if(mask)
append buffers to write buffer
apply mask to write buffer
write frame header, write buffer as one frame
else:
write frame header, write buffer, and buffers as one frame
else:
append buffers to write buffer
else if(mask)
copy buffers to write_buffer
apply mask to write_buffer
write frame header and possibly full write_buffer in a single call
loop:
copy buffers to write_buffer
apply mask to write_buffer
write write_buffer in a single call
else
write frame header, buffers as one frame
*/
//------------------------------------------------------------------------------
template<class NextLayer>
template<class Buffers, class Handler>
class stream<NextLayer>::write_frame_op
@@ -104,53 +36,23 @@ class stream<NextLayer>::write_frame_op
bool cont;
stream<NextLayer>& ws;
consuming_buffers<Buffers> cb;
bool fin;
detail::frame_header fh;
detail::fh_streambuf fh_buf;
detail::prepared_key_type key;
void* tmp;
std::size_t tmp_size;
detail::prepared_key key;
std::uint64_t remain;
int state = 0;
int entry;
data(Handler& handler_, stream<NextLayer>& ws_,
bool fin, Buffers const& bs)
bool fin_, Buffers const& bs)
: handler(handler_)
, cont(beast_asio_helpers::
is_continuation(handler))
, ws(ws_)
, cb(bs)
, fin(fin_)
{
using beast::detail::clamp;
fh.op = ws.wr_.cont ?
opcode::cont : ws.wr_opcode_;
ws.wr_.cont = ! fin;
fh.fin = fin;
fh.rsv1 = false;
fh.rsv2 = false;
fh.rsv3 = false;
fh.len = boost::asio::buffer_size(cb);
fh.mask = ws.role_ == detail::role_type::client;
if(fh.mask)
{
fh.key = ws.maskgen_();
detail::prepare_key(key, fh.key);
tmp_size = clamp(fh.len, ws.wr_buf_size_);
tmp = beast_asio_helpers::
allocate(tmp_size, handler);
remain = fh.len;
}
else
{
tmp = nullptr;
}
detail::write<static_streambuf>(fh_buf, fh);
}
~data()
{
if(tmp)
beast_asio_helpers::
deallocate(tmp, tmp_size, handler);
}
};
@@ -163,21 +65,27 @@ public:
template<class DeducedHandler, class... Args>
write_frame_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
(*this)(error_code{}, 0, false);
}
void operator()()
{
(*this)(error_code{});
(*this)(error_code{}, 0, true);
}
void operator()(error_code ec, std::size_t);
void operator()(error_code const& ec)
{
(*this)(ec, 0, true);
}
void operator()(error_code ec, bool again = true);
void operator()(error_code ec,
std::size_t bytes_transferred);
void operator()(error_code ec,
std::size_t bytes_transferred, bool again);
friend
void* asio_handler_allocate(
@@ -215,12 +123,12 @@ template<class Buffers, class Handler>
void
stream<NextLayer>::
write_frame_op<Buffers, Handler>::
operator()(error_code ec, std::size_t)
operator()(error_code ec, std::size_t bytes_transferred)
{
auto& d = *d_;
if(ec)
d.ws.failed_ = true;
(*this)(ec);
(*this)(ec, bytes_transferred, true);
}
template<class NextLayer>
@@ -228,11 +136,24 @@ template<class Buffers, class Handler>
void
stream<NextLayer>::
write_frame_op<Buffers, Handler>::
operator()(error_code ec, bool again)
operator()(error_code ec,
std::size_t bytes_transferred, bool again)
{
using beast::detail::clamp;
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::mutable_buffers_1;
using boost::asio::buffer_size;
enum
{
do_init = 0,
do_nomask_nofrag = 20,
do_nomask_frag = 30,
do_mask_nofrag = 40,
do_mask_frag = 50,
do_deflate = 60,
do_maybe_suspend = 80,
do_upcall = 99
};
auto& d = *d_;
d.cont = d.cont || again;
if(ec)
@@ -241,11 +162,299 @@ operator()(error_code ec, bool again)
{
switch(d.state)
{
case 0:
case do_init:
if(! d.ws.wr_.cont)
{
d.ws.wr_begin();
d.fh.rsv1 = d.ws.wr_.compress;
}
else
{
d.fh.rsv1 = false;
}
d.fh.rsv2 = false;
d.fh.rsv3 = false;
d.fh.op = d.ws.wr_.cont ?
opcode::cont : d.ws.wr_opcode_;
d.fh.mask =
d.ws.role_ == detail::role_type::client;
if(d.ws.wr_.compress)
{
d.entry = do_deflate;
}
else if(! d.fh.mask)
{
if(! d.ws.wr_.autofrag)
{
d.entry = do_nomask_nofrag;
}
else
{
BOOST_ASSERT(d.ws.wr_.buf_size != 0);
d.remain = buffer_size(d.cb);
if(d.remain > d.ws.wr_.buf_size)
d.entry = do_nomask_frag;
else
d.entry = do_nomask_nofrag;
}
}
else
{
if(! d.ws.wr_.autofrag)
{
d.entry = do_mask_nofrag;
}
else
{
BOOST_ASSERT(d.ws.wr_.buf_size != 0);
d.remain = buffer_size(d.cb);
if(d.remain > d.ws.wr_.buf_size)
d.entry = do_mask_frag;
else
d.entry = do_mask_nofrag;
}
}
d.state = do_maybe_suspend;
break;
//----------------------------------------------------------------------
case do_nomask_nofrag:
{
d.fh.fin = d.fin;
d.fh.len = buffer_size(d.cb);
detail::write<static_streambuf>(
d.fh_buf, d.fh);
d.ws.wr_.cont = ! d.fin;
// Send frame
d.state = do_upcall;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(d.fh_buf.data(), d.cb),
std::move(*this));
return;
}
//----------------------------------------------------------------------
case do_nomask_frag:
{
auto const n = clamp(
d.remain, d.ws.wr_.buf_size);
d.remain -= n;
d.fh.len = n;
d.fh.fin = d.fin ? d.remain == 0 : false;
detail::write<static_streambuf>(
d.fh_buf, d.fh);
d.ws.wr_.cont = ! d.fin;
// Send frame
d.state = d.remain == 0 ?
do_upcall : do_nomask_frag + 1;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(d.fh_buf.data(),
prepare_buffers(n, d.cb)),
std::move(*this));
return;
}
case do_nomask_frag + 1:
d.cb.consume(
bytes_transferred - d.fh_buf.size());
d.fh_buf.reset();
d.fh.op = opcode::cont;
if(d.ws.wr_block_ == &d)
d.ws.wr_block_ = nullptr;
if(d.ws.rd_op_.maybe_invoke())
{
d.state = do_maybe_suspend;
d.ws.get_io_service().post(
std::move(*this));
return;
}
d.state = d.entry;
break;
//----------------------------------------------------------------------
case do_mask_nofrag:
{
d.remain = buffer_size(d.cb);
d.fh.fin = d.fin;
d.fh.len = d.remain;
d.fh.key = d.ws.maskgen_();
detail::prepare_key(d.key, d.fh.key);
detail::write<static_streambuf>(
d.fh_buf, d.fh);
auto const n =
clamp(d.remain, d.ws.wr_.buf_size);
auto const b =
buffer(d.ws.wr_.buf.get(), n);
buffer_copy(b, d.cb);
detail::mask_inplace(b, d.key);
d.remain -= n;
d.ws.wr_.cont = ! d.fin;
// Send frame header and partial payload
d.state = d.remain == 0 ?
do_upcall : do_mask_nofrag + 1;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(d.fh_buf.data(), b),
std::move(*this));
return;
}
case do_mask_nofrag + 1:
{
d.cb.consume(d.ws.wr_.buf_size);
auto const n =
clamp(d.remain, d.ws.wr_.buf_size);
auto const b =
buffer(d.ws.wr_.buf.get(), n);
buffer_copy(b, d.cb);
detail::mask_inplace(b, d.key);
d.remain -= n;
// Send parial payload
if(d.remain == 0)
d.state = do_upcall;
boost::asio::async_write(
d.ws.stream_, b, std::move(*this));
return;
}
//----------------------------------------------------------------------
case do_mask_frag:
{
auto const n = clamp(
d.remain, d.ws.wr_.buf_size);
d.remain -= n;
d.fh.len = n;
d.fh.key = d.ws.maskgen_();
d.fh.fin = d.fin ? d.remain == 0 : false;
detail::prepare_key(d.key, d.fh.key);
auto const b = buffer(
d.ws.wr_.buf.get(), n);
buffer_copy(b, d.cb);
detail::mask_inplace(b, d.key);
detail::write<static_streambuf>(
d.fh_buf, d.fh);
d.ws.wr_.cont = ! d.fin;
// Send frame
d.state = d.remain == 0 ?
do_upcall : do_mask_frag + 1;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(d.fh_buf.data(), b),
std::move(*this));
return;
}
case do_mask_frag + 1:
d.cb.consume(
bytes_transferred - d.fh_buf.size());
d.fh_buf.reset();
d.fh.op = opcode::cont;
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
if(d.ws.rd_op_.maybe_invoke())
{
d.state = do_maybe_suspend;
d.ws.get_io_service().post(
std::move(*this));
return;
}
d.state = d.entry;
break;
//----------------------------------------------------------------------
case do_deflate:
{
auto b = buffer(d.ws.wr_.buf.get(),
d.ws.wr_.buf_size);
auto const more = detail::deflate(
d.ws.pmd_->zo, b, d.cb, d.fin, ec);
d.ws.failed_ = ec != 0;
if(d.ws.failed_)
goto upcall;
auto const n = buffer_size(b);
if(n == 0)
{
// The input was consumed, but there
// is no output due to compression
// latency.
BOOST_ASSERT(! d.fin);
BOOST_ASSERT(buffer_size(d.cb) == 0);
// We can skip the dispatch if the
// asynchronous initiation function is
// not on call stack but its hard to
// figure out so be safe and dispatch.
d.state = do_upcall;
d.ws.get_io_service().post(std::move(*this));
return;
}
if(d.fh.mask)
{
d.fh.key = d.ws.maskgen_();
detail::prepared_key key;
detail::prepare_key(key, d.fh.key);
detail::mask_inplace(b, key);
}
d.fh.fin = ! more;
d.fh.len = n;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, d.fh);
d.ws.wr_.cont = ! d.fin;
// Send frame
d.state = more ?
do_deflate + 1 : do_deflate + 2;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(fh_buf.data(), b),
std::move(*this));
return;
}
case do_deflate + 1:
d.fh.op = opcode::cont;
d.fh.rsv1 = false;
BOOST_ASSERT(d.ws.wr_block_ == &d);
d.ws.wr_block_ = nullptr;
if(d.ws.rd_op_.maybe_invoke())
{
d.state = do_maybe_suspend;
d.ws.get_io_service().post(
std::move(*this));
return;
}
d.state = d.entry;
break;
case do_deflate + 2:
if(d.fh.fin && (
(d.ws.role_ == detail::role_type::client &&
d.ws.pmd_config_.client_no_context_takeover) ||
(d.ws.role_ == detail::role_type::server &&
d.ws.pmd_config_.server_no_context_takeover)))
d.ws.pmd_->zo.reset();
goto upcall;
//----------------------------------------------------------------------
case do_maybe_suspend:
{
if(d.ws.wr_block_)
{
// suspend
d.state = 3;
d.state = do_maybe_suspend + 1;
d.ws.wr_op_.template emplace<
write_frame_op>(std::move(*this));
return;
@@ -253,79 +462,35 @@ operator()(error_code ec, bool again)
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
d.state = 99;
d.state = do_upcall;
d.ws.get_io_service().post(
bind_handler(std::move(*this),
boost::asio::error::operation_aborted));
return;
}
// fall through
case 1:
{
if(! d.fh.mask)
{
// send header and entire payload
d.state = 99;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(d.fh_buf.data(), d.cb),
std::move(*this));
return;
}
auto const n = clamp(d.remain, d.tmp_size);
mutable_buffers_1 mb{d.tmp, n};
buffer_copy(mb, d.cb);
d.cb.consume(n);
d.remain -= n;
detail::mask_inplace(mb, d.key);
// send header and payload
d.state = d.remain > 0 ? 2 : 99;
BOOST_ASSERT(! d.ws.wr_block_);
d.ws.wr_block_ = &d;
boost::asio::async_write(d.ws.stream_,
buffer_cat(d.fh_buf.data(),
mb), std::move(*this));
return;
d.state = d.entry;
break;
}
// sent masked payload
case 2:
{
auto const n = clamp(d.remain, d.tmp_size);
mutable_buffers_1 mb{d.tmp,
static_cast<std::size_t>(n)};
buffer_copy(mb, d.cb);
d.cb.consume(n);
d.remain -= n;
detail::mask_inplace(mb, d.key);
// send payload
if(d.remain == 0)
d.state = 99;
BOOST_ASSERT(d.ws.wr_block_ == &d);
boost::asio::async_write(
d.ws.stream_, mb, std::move(*this));
return;
}
case 3:
d.state = 4;
case do_maybe_suspend + 1:
d.state = do_maybe_suspend + 2;
d.ws.get_io_service().post(bind_handler(
std::move(*this), ec));
return;
case 4:
case do_maybe_suspend + 2:
if(d.ws.failed_ || d.ws.wr_close_)
{
// call handler
ec = boost::asio::error::operation_aborted;
goto upcall;
}
d.state = 1;
d.state = d.entry;
break;
case 99:
//----------------------------------------------------------------------
case do_upcall:
goto upcall;
}
}
@@ -391,120 +556,182 @@ write_frame(bool fin,
using boost::asio::buffer;
using boost::asio::buffer_copy;
using boost::asio::buffer_size;
bool const compress = false;
if(! wr_.cont)
wr_prepare(compress);
detail::frame_header fh;
fh.op = wr_.cont ? opcode::cont : wr_opcode_;
fh.rsv1 = false;
if(! wr_.cont)
{
wr_begin();
fh.rsv1 = wr_.compress;
}
else
{
fh.rsv1 = false;
}
fh.rsv2 = false;
fh.rsv3 = false;
fh.op = wr_.cont ? opcode::cont : wr_opcode_;
fh.mask = role_ == detail::role_type::client;
wr_.cont = ! fin;
auto remain = buffer_size(buffers);
if(compress)
if(wr_.compress)
{
// TODO
}
else if(! fh.mask && ! wr_.autofrag)
{
fh.fin = fin;
fh.len = remain;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), buffers), ec);
failed_ = ec != 0;
if(failed_)
return;
return;
}
else if(! fh.mask && wr_.autofrag)
{
BOOST_ASSERT(wr_.size != 0);
consuming_buffers<
ConstBufferSequence> cb(buffers);
ConstBufferSequence> cb{buffers};
for(;;)
{
auto const n = clamp(remain, wr_.size);
fh.len = n;
remain -= n;
fh.fin = fin ? remain == 0 : false;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(),
prepare_buffers(n, cb)), ec);
auto b = buffer(
wr_.buf.get(), wr_.buf_size);
auto const more = detail::deflate(
pmd_->zo, b, cb, fin, ec);
failed_ = ec != 0;
if(failed_)
return;
if(remain == 0)
auto const n = buffer_size(b);
if(n == 0)
{
// The input was consumed, but there
// is no output due to compression
// latency.
BOOST_ASSERT(! fin);
BOOST_ASSERT(buffer_size(cb) == 0);
fh.fin = false;
break;
}
if(fh.mask)
{
fh.key = maskgen_();
detail::prepared_key key;
detail::prepare_key(key, fh.key);
detail::mask_inplace(b, key);
}
fh.fin = ! more;
fh.len = n;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), b), ec);
failed_ = ec != 0;
if(failed_)
return;
if(! more)
break;
fh.op = opcode::cont;
cb.consume(n);
fh.rsv1 = false;
}
if(fh.fin && (
(role_ == detail::role_type::client &&
pmd_config_.client_no_context_takeover) ||
(role_ == detail::role_type::server &&
pmd_config_.server_no_context_takeover)))
pmd_->zo.reset();
return;
}
if(! fh.mask)
{
if(! wr_.autofrag)
{
// no mask, no autofrag
fh.fin = fin;
fh.len = remain;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), buffers), ec);
failed_ = ec != 0;
if(failed_)
return;
}
else
{
// no mask, autofrag
BOOST_ASSERT(wr_.buf_size != 0);
consuming_buffers<
ConstBufferSequence> cb{buffers};
for(;;)
{
auto const n = clamp(remain, wr_.buf_size);
remain -= n;
fh.len = n;
fh.fin = fin ? remain == 0 : false;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(),
prepare_buffers(n, cb)), ec);
failed_ = ec != 0;
if(failed_)
return;
if(remain == 0)
break;
fh.op = opcode::cont;
cb.consume(n);
}
}
return;
}
else if(fh.mask && ! wr_.autofrag)
if(! wr_.autofrag)
{
fh.key = maskgen_();
detail::prepared_key_type key;
detail::prepare_key(key, fh.key);
// mask, no autofrag
fh.fin = fin;
fh.len = remain;
fh.key = maskgen_();
detail::prepared_key key;
detail::prepare_key(key, fh.key);
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
consuming_buffers<
ConstBufferSequence> cb(buffers);
ConstBufferSequence> cb{buffers};
{
auto const n = clamp(remain, wr_.size);
auto const mb = buffer(wr_.buf.get(), n);
buffer_copy(mb, cb);
auto const n = clamp(remain, wr_.buf_size);
auto const b = buffer(wr_.buf.get(), n);
buffer_copy(b, cb);
cb.consume(n);
remain -= n;
detail::mask_inplace(mb, key);
detail::mask_inplace(b, key);
wr_.cont = ! fin;
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), mb), ec);
buffer_cat(fh_buf.data(), b), ec);
failed_ = ec != 0;
if(failed_)
return;
}
while(remain > 0)
{
auto const n = clamp(remain, wr_.size);
auto const mb = buffer(wr_.buf.get(), n);
buffer_copy(mb, cb);
auto const n = clamp(remain, wr_.buf_size);
auto const b = buffer(wr_.buf.get(), n);
buffer_copy(b, cb);
cb.consume(n);
remain -= n;
detail::mask_inplace(mb, key);
boost::asio::write(stream_, mb, ec);
detail::mask_inplace(b, key);
boost::asio::write(stream_, b, ec);
failed_ = ec != 0;
if(failed_)
return;
}
return;
}
else if(fh.mask && wr_.autofrag)
{
BOOST_ASSERT(wr_.size != 0);
// mask, autofrag
BOOST_ASSERT(wr_.buf_size != 0);
consuming_buffers<
ConstBufferSequence> cb(buffers);
ConstBufferSequence> cb{buffers};
for(;;)
{
fh.key = maskgen_();
detail::prepared_key_type key;
detail::prepared_key key;
detail::prepare_key(key, fh.key);
auto const n = clamp(remain, wr_.size);
auto const mb = buffer(wr_.buf.get(), n);
buffer_copy(mb, cb);
detail::mask_inplace(mb, key);
auto const n = clamp(remain, wr_.buf_size);
auto const b = buffer(wr_.buf.get(), n);
buffer_copy(b, cb);
detail::mask_inplace(b, key);
fh.len = n;
remain -= n;
fh.fin = fin ? remain == 0 : false;
detail::fh_streambuf fh_buf;
detail::write<static_streambuf>(fh_buf, fh);
boost::asio::write(stream_,
buffer_cat(fh_buf.data(), mb), ec);
buffer_cat(fh_buf.data(), b), ec);
failed_ = ec != 0;
if(failed_)
return;
@@ -552,9 +779,8 @@ public:
explicit
write_op(DeducedHandler&& h,
stream<NextLayer>& ws, Args&&... args)
: d_(make_handler_ptr<data, Handler>(
std::forward<DeducedHandler>(h), ws,
std::forward<Args>(args)...))
: d_(std::forward<DeducedHandler>(h),
ws, std::forward<Args>(args)...)
{
(*this)(error_code{}, false);
}
@@ -675,8 +901,6 @@ write(ConstBufferSequence const& buffers, error_code& ec)
write_frame(true, buffers, ec);
}
//------------------------------------------------------------------------------
} // websocket
} // beast

View File

@@ -105,15 +105,7 @@ struct auto_fragment
#if GENERATING_DOCS
using decorate = implementation_defined;
#else
template<class Decorator>
inline
detail::decorator_type
decorate(Decorator&& d)
{
return detail::decorator_type{new
detail::decorator<typename std::decay<Decorator>::type>{
std::forward<Decorator>(d)}};
}
using decorate = detail::decorator_type;
#endif
/** Keep-alive option.
@@ -200,6 +192,47 @@ using pong_cb = std::function<void(ping_data const&)>;
} // detail
/** permessage-deflate extension options.
These settings control the permessage-deflate extension,
which allows messages to be compressed.
@note Objects of this type are used with
@ref beast::websocket::stream::set_option.
*/
struct permessage_deflate
{
/// `true` to offer the extension in the server role
bool server_enable = false;
/// `true` to offer the extension in the client role
bool client_enable = false;
/** Maximum server window bits to offer
@note Due to a bug in ZLib, this value must be greater than 8.
*/
int server_max_window_bits = 15;
/** Maximum client window bits to offer
@note Due to a bug in ZLib, this value must be greater than 8.
*/
int client_max_window_bits = 15;
/// `true` if server_no_context_takeover desired
bool server_no_context_takeover = false;
/// `true` if client_no_context_takeover desired
bool client_no_context_takeover = false;
/// Deflate compression level 0..9
int compLevel = 8;
/// Deflate memory level, 1..9
int memLevel = 4;
};
/** Pong callback option.
Sets the callback to be invoked whenever a pong is received
@@ -250,12 +283,15 @@ struct pong_callback
/** Read buffer size option.
Sets the number of bytes allocated to the socket's read buffer.
If this is zero, then reads are not buffered. Setting this
higher can improve performance when expecting to receive
many small frames.
Sets the size of the read buffer used by the implementation to
receive frames. The read buffer is needed when permessage-deflate
is used.
The default is no buffering.
Lowering the size of the buffer can decrease the memory requirements
for each connection, while increasing the size of the buffer can reduce
the number of calls made to the next layer to read data.
The default setting is 4096. The minimum value is 8.
@note Objects of this type are used with
@ref beast::websocket::stream::set_option.

View File

@@ -194,10 +194,10 @@ public:
#if GENERATING_DOCS
set_option(implementation_defined o)
#else
set_option(detail::decorator_type o)
set_option(detail::decorator_type const& o)
#endif
{
d_ = std::move(o);
d_ = o;
}
/// Set the keep-alive option
@@ -214,6 +214,17 @@ public:
wr_opcode_ = o.value;
}
/// Set the permessage-deflate extension options
void
set_option(permessage_deflate const& o);
/// Get the permessage-deflate extension options
void
get_option(permessage_deflate& o)
{
o = pmd_opts_;
}
/// Set the pong callback
void
set_option(pong_callback o)
@@ -225,7 +236,9 @@ public:
void
set_option(read_buffer_size const& o)
{
stream_.capacity(o.value);
rd_buf_size_ = o.value;
// VFALCO What was the thinking here?
//stream_.capacity(o.value);
}
/// Set the maximum incoming message size allowed
@@ -1635,10 +1648,6 @@ private:
void
do_response(http::response<Body, Fields> const& resp,
boost::string_ref const& key, error_code& ec);
void
do_read_fh(detail::frame_streambuf& fb,
close_code::value& code, error_code& ec);
};
} // websocket

View File

@@ -807,7 +807,7 @@ deflate_stream::get_lut() ->
//std::uint16_t bl_count[maxBits+1];
// Initialize the mapping length (0..255) -> length code (0..28)
std::uint8_t length = 0;
int length = 0;
for(std::uint8_t code = 0; code < lengthCodes-1; ++code)
{
tables.base_length[code] = length;
@@ -815,11 +815,11 @@ deflate_stream::get_lut() ->
for(unsigned n = 0; n < run; ++n)
tables.length_code[length++] = code;
}
BOOST_ASSERT(length == 0);
BOOST_ASSERT(length == 256);
// Note that the length 255 (match length 258) can be represented
// in two different ways: code 284 + 5 bits or code 285, so we
// overwrite length_code[255] to use the best encoding:
tables.length_code[length-1] = lengthCodes-1;
tables.length_code[255] = lengthCodes-1;
// Initialize the mapping dist (0..32K) -> dist code (0..29)
{