WebSocket refactoring and tests:

websocket:

* Move echo server to test/
* Fix warnings
* Fix maskgen being uncopyable
* Simplify utf8_checker special member declarations
* Fix stream move assignable when owning the next layer
* Add javadocs for stream special members
* Add stream unit tests
* Move throwing member definitions to the .ipp file
* Use get_lowest_layer in stream declaration
* Perform type checks at each call site instead of constructor
* Demote close_code to a non-class enum:
    Otherwise, application specific close codes
    cannot be assigned without using static_cast.

core:

* Add streambuf_readstream special members tests
* Add move assignment operator to streambuf_readstream
* Add detail/get_lowest_layer trait
* Add static_string tests
* Move static_string from websocket to core
This commit is contained in:
Vinnie Falco
2016-04-30 13:00:33 -04:00
parent 47dc31d8c2
commit 9390eb016c
38 changed files with 1792 additions and 567 deletions

View File

@@ -16,6 +16,7 @@ add_executable (core-tests
placeholders.cpp
prepare_buffers.cpp
static_streambuf.cpp
static_string.cpp
streambuf.cpp
streambuf_readstream.cpp
to_string.cpp
@@ -54,21 +55,6 @@ if (NOT WIN32)
target_link_libraries(http-tests ${Boost_LIBRARIES})
endif()
add_executable (websocket-tests
${BEAST_INCLUDES}
main.cpp
websocket/error.cpp
websocket/option.cpp
websocket/rfc6455.cpp
websocket/static_string.cpp
websocket/teardown.cpp
websocket/utf8_checker.cpp
)
if (NOT WIN32)
target_link_libraries(websocket-tests ${Boost_LIBRARIES})
endif()
add_executable (parser-bench
${BEAST_INCLUDES}
main.cpp
@@ -80,3 +66,31 @@ if (NOT WIN32)
target_link_libraries(parser-bench ${Boost_LIBRARIES})
endif()
add_executable (websocket-tests
${BEAST_INCLUDES}
websocket/websocket_async_echo_peer.hpp
websocket/websocket_sync_echo_peer.hpp
main.cpp
websocket/error.cpp
websocket/option.cpp
websocket/rfc6455.cpp
websocket/stream.cpp
websocket/teardown.cpp
websocket/utf8_checker.cpp
)
if (NOT WIN32)
target_link_libraries(websocket-tests ${Boost_LIBRARIES})
endif()
add_executable (websocket-echo
${BEAST_INCLUDES}
sig_wait.hpp
websocket/websocket_async_echo_peer.hpp
websocket/websocket_sync_echo_peer.hpp
websocket/websocket_echo.cpp
)
if (NOT WIN32)
target_link_libraries(websocket-echo ${Boost_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
endif()

View File

@@ -19,6 +19,7 @@ unit-test core-tests :
placeholders.cpp
prepare_buffers.cpp
static_streambuf.cpp
static_string.cpp
streambuf.cpp
streambuf_readstream.cpp
to_string.cpp
@@ -48,18 +49,23 @@ unit-test http-tests :
http/write.cpp
;
unit-test websocket-tests :
main.cpp
websocket/error.cpp
websocket/option.cpp
websocket/rfc6455.cpp
websocket/static_string.cpp
websocket/teardown.cpp
websocket/utf8_checker.cpp
;
unit-test parser-bench :
main.cpp
http/nodejs_parser.cpp
http/parser_bench.cpp
;
unit-test websocket-tests :
main.cpp
websocket/error.cpp
websocket/option.cpp
websocket/rfc6455.cpp
websocket/stream.cpp
websocket/teardown.cpp
websocket/utf8_checker.cpp
;
exe websocket-echo :
websocket/websocket_echo.cpp
;

42
test/sig_wait.hpp Normal file
View File

@@ -0,0 +1,42 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_EXAMPLE_SIG_WAIT_H_INCLUDED
#define BEAST_EXAMPLE_SIG_WAIT_H_INCLUDED
#include <boost/asio.hpp>
#include <condition_variable>
#include <mutex>
// Block until SIGINT or SIGTERM
inline
void
sig_wait()
{
boost::asio::io_service ios;
boost::asio::signal_set signals(
ios, SIGINT, SIGTERM);
signals.async_wait(
[&](boost::system::error_code const&, int)
{
});
ios.run();
}
#endif

191
test/static_string.cpp Normal file
View File

@@ -0,0 +1,191 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Test that header file is self-contained.
#include <beast/static_string.hpp>
#include <beast/detail/unit_test/suite.hpp>
namespace beast {
namespace websocket {
class static_string_test : public beast::detail::unit_test::suite
{
public:
void testMembers()
{
using str1 = static_string<1>;
using str2 = static_string<2>;
{
str1 s1;
expect(s1 == "");
expect(s1.empty());
expect(s1.size() == 0);
expect(s1.max_size() == 1);
expect(s1.capacity() == 1);
expect(s1.begin() == s1.end());
expect(s1.cbegin() == s1.cend());
expect(s1.rbegin() == s1.rend());
expect(s1.crbegin() == s1.crend());
try
{
expect(s1.at(0) == 0);
fail();
}
catch(std::exception const&)
{
pass();
}
expect(s1.data()[0] == 0);
expect(*s1.c_str() == 0);
expect(std::distance(s1.begin(), s1.end()) == 0);
expect(std::distance(s1.cbegin(), s1.cend()) == 0);
expect(std::distance(s1.rbegin(), s1.rend()) == 0);
expect(std::distance(s1.crbegin(), s1.crend()) == 0);
expect(s1.compare(s1) == 0);
expect(s1.to_string() == std::string{});
}
{
str1 const s1;
expect(s1 == "");
expect(s1.empty());
expect(s1.size() == 0);
expect(s1.max_size() == 1);
expect(s1.capacity() == 1);
expect(s1.begin() == s1.end());
expect(s1.cbegin() == s1.cend());
expect(s1.rbegin() == s1.rend());
expect(s1.crbegin() == s1.crend());
try
{
expect(s1.at(0) == 0);
fail();
}
catch(std::exception const&)
{
pass();
}
expect(s1.data()[0] == 0);
expect(*s1.c_str() == 0);
expect(std::distance(s1.begin(), s1.end()) == 0);
expect(std::distance(s1.cbegin(), s1.cend()) == 0);
expect(std::distance(s1.rbegin(), s1.rend()) == 0);
expect(std::distance(s1.crbegin(), s1.crend()) == 0);
expect(s1.compare(s1) == 0);
expect(s1.to_string() == std::string{});
}
{
str1 s1;
str1 s2("x");
expect(s2 == "x");
expect(s2[0] == 'x');
expect(s2.at(0) == 'x');
expect(s2.front() == 'x');
expect(s2.back() == 'x');
str1 const s3(s2);
expect(s3 == "x");
expect(s3[0] == 'x');
expect(s3.at(0) == 'x');
expect(s3.front() == 'x');
expect(s3.back() == 'x');
s2 = "y";
expect(s2 == "y");
s1 = s2;
expect(s1 == "y");
s1.clear();
expect(s1.empty());
expect(s1.size() == 0);
}
{
str2 s1("x");
str1 s2(s1);
expect(s2 == "x");
str1 s3;
s3 = s2;
expect(s3 == "x");
s1 = "xy";
expect(s1.size() == 2);
expect(s1[0] == 'x');
expect(s1[1] == 'y');
expect(s1.at(0) == 'x');
expect(s1.at(1) == 'y');
expect(s1.front() == 'x');
expect(s1.back() == 'y');
auto const s4 = s1;
expect(s4[0] == 'x');
expect(s4[1] == 'y');
expect(s4.at(0) == 'x');
expect(s4.at(1) == 'y');
expect(s4.front() == 'x');
expect(s4.back() == 'y');
try
{
s3 = s1;
fail();
}
catch(std::exception const&)
{
pass();
}
try
{
str1 s5(s1);
fail();
}
catch(std::exception const&)
{
pass();
}
}
{
str2 s1("x");
str2 s2("x");
expect(s1 == s2);
expect(s1 <= s2);
expect(s1 >= s2);
expect(! (s1 < s2));
expect(! (s1 > s2));
expect(! (s1 != s2));
}
{
str1 s1("x");
str2 s2("x");
expect(s1 == s2);
expect(s1 <= s2);
expect(s1 >= s2);
expect(! (s1 < s2));
expect(! (s1 > s2));
expect(! (s1 != s2));
}
{
str2 s("x");
expect(s == "x");
expect(s <= "x");
expect(s >= "x");
expect(! (s < "x"));
expect(! (s > "x"));
expect(! (s != "x"));
expect("x" == s);
expect("x" <= s);
expect("x" >= s);
expect(! ("x" < s));
expect(! ("x" > s));
expect(! ("x" != s));
}
pass();
}
void run() override
{
testMembers();
}
};
BEAST_DEFINE_TESTSUITE(static_string,websocket,beast);
} // websocket
} // beast

View File

@@ -7,3 +7,40 @@
// Test that header file is self-contained.
#include <beast/streambuf_readstream.hpp>
#include <beast/streambuf.hpp>
#include <beast/detail/unit_test/suite.hpp>
#include <boost/asio.hpp>
namespace beast {
class streambuf_readstream_test : public beast::detail::unit_test::suite
{
public:
void testSpecial()
{
using socket_type = boost::asio::ip::tcp::socket;
boost::asio::io_service ios;
{
streambuf_readstream<socket_type, streambuf> srs(ios);
streambuf_readstream<socket_type, streambuf> srs2(std::move(srs));
srs = std::move(srs2);
}
{
socket_type sock(ios);
streambuf_readstream<socket_type&, streambuf> srs(sock);
streambuf_readstream<socket_type&, streambuf> srs2(std::move(srs));
}
pass();
}
void run() override
{
testSpecial();
}
};
BEAST_DEFINE_TESTSUITE(streambuf_readstream,core,beast);
} // beast

View File

@@ -1,9 +0,0 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Test that header file is self-contained.
#include <beast/websocket/static_string.hpp>

454
test/websocket/stream.cpp Normal file
View File

@@ -0,0 +1,454 @@
//
// Copyright (c) 2013-2016 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Test that header file is self-contained.
#include <beast/websocket/stream.hpp>
#include "websocket_async_echo_peer.hpp"
#include "websocket_sync_echo_peer.hpp"
#include <beast/bind_handler.hpp>
#include <beast/streambuf.hpp>
#include <beast/detail/unit_test/suite.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/optional.hpp>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <beast/http/parser.hpp>
namespace beast {
namespace websocket {
class stream_test : public beast::detail::unit_test::suite
{
boost::asio::io_service ios_;
boost::optional<boost::asio::io_service::work> work_;
std::thread thread_;
std::mutex m_;
std::condition_variable cv_;
bool running_ = false;;
public:
using endpoint_type = boost::asio::ip::tcp::endpoint;
using address_type = boost::asio::ip::address;
using socket_type = boost::asio::ip::tcp::socket;
// meets the requirements of AsyncStream, SyncStream
class string_Stream
{
std::string s_;
boost::asio::io_service& ios_;
public:
string_Stream(boost::asio::io_service& ios,
std::string s)
: s_(s)
, ios_(ios)
{
}
boost::asio::io_service&
get_io_service()
{
return ios_;
}
template<class MutableBufferSequence>
std::size_t
read_some(MutableBufferSequence const& buffers)
{
error_code ec;
auto const n = read_some(buffers, ec);
if(ec)
throw boost::system::system_error{ec};
return n;
}
template<class MutableBufferSequence>
std::size_t
read_some(MutableBufferSequence const& buffers,
error_code& ec)
{
auto const n = boost::asio::buffer_copy(
buffers, boost::asio::buffer(s_));
s_.erase(0, n);
return n;
}
template<class MutableBufferSequence, class ReadHandler>
typename async_completion<ReadHandler,
void(std::size_t, error_code)>::result_type
async_read_some(MutableBufferSequence const& buffers,
ReadHandler&& handler)
{
auto const n = boost::asio::buffer_copy(
buffers, boost::asio::buffer(s_));
s_.erase(0, n);
async_completion<ReadHandler,
void(error_code, std::size_t)> completion(handler);
ios_.post(bind_handler(
completion.handler, error_code{}, n));
return completion.result.get();
}
template<class ConstBufferSequence>
std::size_t
write_some(ConstBufferSequence const& buffers)
{
error_code ec;
auto const n = write_some(buffers, ec);
if(ec)
throw boost::system::system_error{ec};
return n;
}
template<class ConstBufferSequence>
std::size_t
write_some(ConstBufferSequence const& buffers,
error_code&)
{
return boost::asio::buffer_size(buffers);
}
template<class ConstBuffeSequence, class WriteHandler>
typename async_completion<WriteHandler,
void(std::size_t, error_code)>::result_type
async_write_some(ConstBuffeSequence const& buffers,
WriteHandler&& handler)
{
async_completion<WriteHandler,
void(error_code, std::size_t)> completion(handler);
ios_.post(bind_handler(completion.handler,
error_code{}, boost::asio::buffer_size(buffers)));
return completion.result.get();
}
};
stream_test()
: work_(ios_)
, thread_([&]{ ios_.run(); })
{
}
~stream_test()
{
work_ = boost::none;
thread_.join();
}
template<class Function>
void exec(Function&& f)
{
{
std::lock_guard<std::mutex> lock(m_);
running_ = true;
}
boost::asio::spawn(ios_,
[&](boost::asio::yield_context do_yield)
{
f(do_yield);
std::lock_guard<std::mutex> lock(m_);
running_ = false;
cv_.notify_all();
}
, boost::coroutines::attributes(2 * 1024 * 1024));
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&]{ return ! running_; });
}
void testSpecialMembers()
{
stream<socket_type> ws(ios_);
{
stream<socket_type> ws2(std::move(ws));
}
{
stream<socket_type> ws2(ios_);
ws = std::move(ws2);
}
pass();
}
void testOptions()
{
stream<socket_type> ws(ios_);
ws.set_option(message_type(opcode::binary));
ws.set_option(read_buffer_size(8192));
ws.set_option(read_message_max(1 * 1024 * 1024));
ws.set_option(write_buffer_size(2048));
pass();
}
template<std::size_t N>
static
boost::asio::const_buffers_1
strbuf(const char (&s)[N])
{
return boost::asio::const_buffers_1(&s[0], N-1);
}
void testAccept(boost::asio::yield_context do_yield)
{
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n");
try
{
ws.accept();
pass();
}
catch(...)
{
fail();
}
}
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"\r\n");
try
{
ws.accept();
fail();
}
catch(...)
{
pass();
}
}
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n");
error_code ec;
ws.accept(ec);
expect(! ec, ec.message());
}
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"\r\n");
error_code ec;
ws.accept(ec);
expect(ec);
}
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n");
error_code ec;
ws.async_accept(do_yield[ec]);
expect(! ec, ec.message());
}
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"\r\n");
error_code ec;
ws.async_accept(do_yield[ec]);
expect(ec);
}
{
stream<string_Stream> ws(ios_,
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n");
try
{
ws.accept(strbuf(
"GET / HTTP/1.1\r\n"));
pass();
}
catch(...)
{
fail();
}
}
{
stream<string_Stream> ws(ios_,
"\r\n");
try
{
ws.accept(strbuf(
"GET / HTTP/1.1\r\n"));
fail();
}
catch(...)
{
pass();
}
}
{
stream<string_Stream> ws(ios_,
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n");
error_code ec;
ws.accept(strbuf(
"GET / HTTP/1.1\r\n"), ec);
expect(! ec, ec.message());
}
{
stream<string_Stream> ws(ios_,
"GET / HTTP/1.1\r\n"
"\r\n");
error_code ec;
ws.accept(ec);
expect(ec);
}
{
stream<string_Stream> ws(ios_,
"Host: localhost:80\r\n"
"Upgrade: WebSocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"\r\n");
error_code ec;
ws.async_accept(strbuf(
"GET / HTTP/1.1\r\n"), do_yield[ec]);
expect(! ec, ec.message());
}
{
stream<string_Stream> ws(ios_,
"\r\n");
error_code ec;
ws.async_accept(strbuf(
"GET / HTTP/1.1\r\n"), do_yield[ec]);
expect(ec);
}
}
void testHandshake(endpoint_type const& ep,
boost::asio::yield_context do_yield)
{
{
// disconnected socket
socket_type sock(ios_);
stream<decltype(sock)&> ws(sock);
try
{
ws.handshake("localhost", "/");
fail();
}
catch(boost::system::system_error const&)
{
pass();
}
catch(...)
{
fail();
}
error_code ec;
ws.handshake("localhost", "/", ec);
if(! expect(ec))
return;
ws.async_handshake("localhost", "/", do_yield[ec]);
if(! expect(ec))
return;
}
{
error_code ec;
socket_type sock(ios_);
sock.connect(ep, ec);
if(! expect(! ec, ec.message()))
return;
stream<decltype(sock)&> ws(sock);
ws.handshake("localhost", "/", ec);
if(! expect(! ec, ec.message()))
return;
ws.close({}, ec);
if(! expect(! ec, ec.message()))
return;
streambuf sb;
opcode op;
ws.read(op, sb, ec);
if(! expect(ec == error::closed, ec.message()))
return;
expect(ws.reason().code == close_code::normal);
}
{
error_code ec;
socket_type sock(ios_);
sock.connect(ep, ec);
if(! expect(! ec, ec.message()))
return;
stream<decltype(sock)&> ws(sock);
ws.async_handshake("localhost", "/", do_yield[ec]);
if(! expect(! ec, ec.message()))
return;
ws.async_close({}, do_yield[ec]);
if(! expect(! ec, ec.message()))
return;
streambuf sb;
opcode op;
ws.async_read(op, sb, do_yield[ec]);
if(! expect(ec == error::closed, ec.message()))
return;
expect(ws.reason().code == close_code::normal);
}
}
void run() override
{
testSpecialMembers();
testOptions();
exec(std::bind(&stream_test::testAccept,
this, std::placeholders::_1));
auto const any = endpoint_type{
address_type::from_string("127.0.0.1"), 0};
{
sync_echo_peer server(true, any);
exec(std::bind(&stream_test::testHandshake,
this, server.local_endpoint(),
std::placeholders::_1));
}
{
async_echo_peer server(true, any, 1);
exec(std::bind(&stream_test::testHandshake,
this, server.local_endpoint(),
std::placeholders::_1));
}
pass();
}
};
BEAST_DEFINE_TESTSUITE(stream,websocket,beast);
} // websocket
} // beast

View File

@@ -0,0 +1,275 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_WEBSOCKET_ASYNC_ECHO_PEER_H_INCLUDED
#define BEAST_WEBSOCKET_ASYNC_ECHO_PEER_H_INCLUDED
#include <beast/placeholders.hpp>
#include <beast/streambuf.hpp>
#include <beast/websocket.hpp>
#include <boost/optional.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
namespace beast {
namespace websocket {
// Asynchronous WebSocket echo client/server
//
class async_echo_peer
{
public:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using address_type = boost::asio::ip::address;
using socket_type = boost::asio::ip::tcp::socket;
private:
boost::asio::io_service ios_;
socket_type sock_;
boost::asio::ip::tcp::acceptor acceptor_;
std::vector<std::thread> thread_;
public:
async_echo_peer(bool server,
endpoint_type const& ep, std::size_t threads)
: sock_(ios_)
, acceptor_(ios_)
{
if(server)
{
error_code ec;
acceptor_.open(ep.protocol(), ec);
maybe_throw(ec, "open");
acceptor_.set_option(
boost::asio::socket_base::reuse_address{true});
acceptor_.bind(ep, ec);
maybe_throw(ec, "bind");
acceptor_.listen(
boost::asio::socket_base::max_connections, ec);
maybe_throw(ec, "listen");
acceptor_.async_accept(sock_,
std::bind(&async_echo_peer::on_accept, this,
beast::asio::placeholders::error));
}
else
{
Peer{std::move(sock_), ep};
}
thread_.reserve(threads);
for(std::size_t i = 0; i < threads; ++i)
thread_.emplace_back(
[&]{ ios_.run(); });
}
~async_echo_peer()
{
error_code ec;
ios_.dispatch(
[&]{ acceptor_.close(ec); });
for(auto& t : thread_)
t.join();
}
endpoint_type
local_endpoint() const
{
return acceptor_.local_endpoint();
}
private:
class Peer
{
struct data
{
int state = 0;
boost::optional<endpoint_type> ep;
websocket::stream<socket_type> ws;
websocket::opcode op;
beast::streambuf sb;
int id;
data(socket_type&& sock_)
: ws(std::move(sock_))
, id([]
{
static int n = 0;
return ++n;
}())
{
}
data(socket_type&& sock_,
endpoint_type const& ep_)
: ep(ep_)
, ws(std::move(sock_))
, id([]
{
static int n = 0;
return ++n;
}())
{
}
};
std::shared_ptr<data> d_;
public:
Peer(Peer&&) = default;
Peer(Peer const&) = default;
Peer& operator=(Peer&&) = delete;
Peer& operator=(Peer const&) = delete;
struct identity
{
template<class Body, class Headers>
void
operator()(http::message<true, Body, Headers>& req)
{
req.headers.replace("User-Agent", "async_echo_client");
}
template<class Body, class Headers>
void
operator()(http::message<false, Body, Headers>& resp)
{
resp.headers.replace("Server", "async_echo_server");
}
};
template<class... Args>
explicit
Peer(socket_type&& sock, Args&&... args)
: d_(std::make_shared<data>(
std::forward<socket_type>(sock),
std::forward<Args>(args)...))
{
auto& d = *d_;
d.ws.set_option(decorate(identity{}));
d.ws.set_option(read_message_max(64 * 1024 * 1024));
run();
}
void run()
{
auto& d = *d_;
if(! d.ep)
{
d.ws.async_accept(std::move(*this));
}
else
{
d.state = 4;
d.ws.next_layer().async_connect(
*d.ep, std::move(*this));
}
}
void operator()(error_code ec)
{
auto& d = *d_;
switch(d_->state)
{
// did accept
case 0:
if(ec)
return fail(ec, "async_accept");
// start
case 1:
if(ec)
return fail(ec, "async_handshake");
d.sb.consume(d.sb.size());
// read message
d.state = 2;
d.ws.async_read(d.op, d.sb, std::move(*this));
return;
// got message
case 2:
if(ec == websocket::error::closed)
return;
if(ec)
return fail(ec, "async_read");
// write message
d.state = 1;
d.ws.set_option(websocket::message_type(d.op));
d.ws.async_write(d.sb.data(), std::move(*this));
return;
// connected
case 4:
if(ec)
return fail(ec, "async_connect");
d.state = 1;
d.ws.async_handshake(
d.ep->address().to_string() + ":" +
std::to_string(d.ep->port()),
"/", std::move(*this));
return;
}
}
private:
void
fail(error_code ec, std::string what)
{
if(ec != websocket::error::closed)
std::cerr << "#" << d_->id << " " <<
what << ": " << ec.message() << std::endl;
}
};
void
fail(error_code ec, std::string what)
{
std::cerr <<
what << ": " << ec.message() << std::endl;
}
void
maybe_throw(error_code ec, std::string what)
{
if(ec)
{
fail(ec, what);
throw ec;
}
}
void
on_accept(error_code ec)
{
if(! acceptor_.is_open())
return;
maybe_throw(ec, "accept");
socket_type sock(std::move(sock_));
acceptor_.async_accept(sock_,
std::bind(&async_echo_peer::on_accept, this,
beast::asio::placeholders::error));
Peer{std::move(sock)};
}
};
} // websocket
} // beast
#endif

View File

@@ -0,0 +1,36 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "websocket_async_echo_peer.hpp"
#include "websocket_sync_echo_peer.hpp"
#include "../sig_wait.hpp"
int main()
{
using endpoint_type = boost::asio::ip::tcp::endpoint;
using address_type = boost::asio::ip::address;
beast::websocket::async_echo_peer s1(true, endpoint_type{
address_type::from_string("127.0.0.1"), 6000 }, 4);
beast::websocket::sync_echo_peer s2(true, endpoint_type{
address_type::from_string("127.0.0.1"), 6001 });
sig_wait();
}

View File

@@ -0,0 +1,200 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_WEBSOCKET_SYNC_ECHO_PEER_H_INCLUDED
#define BEAST_WEBSOCKET_SYNC_ECHO_PEER_H_INCLUDED
#include <beast/streambuf.hpp>
#include <beast/websocket.hpp>
#include <boost/optional.hpp>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
namespace beast {
namespace websocket {
// Synchronous WebSocket echo client/server
//
class sync_echo_peer
{
public:
using error_code = boost::system::error_code;
using endpoint_type = boost::asio::ip::tcp::endpoint;
using address_type = boost::asio::ip::address;
using socket_type = boost::asio::ip::tcp::socket;
private:
boost::asio::io_service ios_;
socket_type sock_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread thread_;
public:
sync_echo_peer(bool server, endpoint_type ep)
: sock_(ios_)
, acceptor_(ios_)
{
error_code ec;
acceptor_.open(ep.protocol(), ec);
maybe_throw(ec, "open");
acceptor_.set_option(
boost::asio::socket_base::reuse_address{true});
acceptor_.bind(ep, ec);
maybe_throw(ec, "bind");
acceptor_.listen(
boost::asio::socket_base::max_connections, ec);
maybe_throw(ec, "listen");
acceptor_.async_accept(sock_,
std::bind(&sync_echo_peer::on_accept, this,
beast::asio::placeholders::error));
thread_ = std::thread{[&]{ ios_.run(); }};
}
~sync_echo_peer()
{
error_code ec;
ios_.dispatch(
[&]{ acceptor_.close(ec); });
thread_.join();
}
endpoint_type
local_endpoint() const
{
return acceptor_.local_endpoint();
}
private:
static
void
fail(error_code ec, std::string what)
{
std::cerr <<
what << ": " << ec.message() << std::endl;
}
static
void
fail(int id, error_code ec, std::string what)
{
std::cerr << "#" << std::to_string(id) << " " <<
what << ": " << ec.message() << std::endl;
}
static
void
maybe_throw(error_code ec, std::string what)
{
if(ec)
{
fail(ec, what);
throw ec;
}
}
struct lambda
{
int id;
sync_echo_peer& self;
socket_type sock;
boost::asio::io_service::work work;
lambda(int id_, sync_echo_peer& self_,
socket_type&& sock_)
: id(id_)
, self(self_)
, sock(std::move(sock_))
, work(sock.get_io_service())
{
}
void operator()()
{
self.do_peer(id, std::move(sock));
}
};
void
on_accept(error_code ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
maybe_throw(ec, "accept");
static int id_ = 0;
std::thread{lambda{++id_, *this, std::move(sock_)}}.detach();
acceptor_.async_accept(sock_,
std::bind(&sync_echo_peer::on_accept, this,
beast::asio::placeholders::error));
}
struct identity
{
template<class Body, class Headers>
void
operator()(http::message<true, Body, Headers>& req)
{
req.headers.replace("User-Agent", "sync_echo_client");
}
template<class Body, class Headers>
void
operator()(http::message<false, Body, Headers>& resp)
{
resp.headers.replace("Server", "sync_echo_server");
}
};
void
do_peer(int id, socket_type&& sock)
{
websocket::stream<socket_type> ws(std::move(sock));
ws.set_option(decorate(identity{}));
ws.set_option(read_message_max(64 * 1024 * 1024));
error_code ec;
ws.accept(ec);
if(ec)
{
fail(id, ec, "accept");
return;
}
for(;;)
{
websocket::opcode op;
beast::streambuf sb;
ws.read(op, sb, ec);
if(ec)
break;
ws.set_option(websocket::message_type(op));
ws.write(sb.data(), ec);
if(ec)
break;
}
if(ec && ec != websocket::error::closed)
{
fail(id, ec, "read");
}
}
};
} // websocket
} // beast
#endif