commit 949364701926cc4c3a078f52364d4b5e730e5e11 Author: CJ Cobb Date: Mon Dec 14 20:39:54 2020 -0500 Initial Commit * Copy and paste boost beast websocket server * Compile and link a simple protobuf message * Include gRPC diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 00000000..ce8abb14 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,73 @@ +# +# Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com) +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/boostorg/beast +# + +project(reporting) +cmake_minimum_required(VERSION 3.17) +FIND_PACKAGE( Boost 1.70 COMPONENTS thread REQUIRED ) + +add_executable (reporting + websocket_server_async.cpp +) + + + +include(FetchContent) + +FetchContent_Declare( + gRPC + GIT_REPOSITORY https://github.com/grpc/grpc + GIT_TAG v1.28.0 + ) +set(FETCHCONTENT_QUIET OFF) +FetchContent_MakeAvailable(gRPC) + + + +set(_PROTOBUF_LIBPROTOBUF libprotobuf) +set(_REFLECTION grpc++_reflection) +set(_PROTOBUF_PROTOC $) +set(_GRPC_GRPCPP grpc++) +set(_GRPC_CPP_PLUGIN_EXECUTABLE $) +message(${CMAKE_CURRENT_BINARY_DIR}) + +# Proto file +get_filename_component(proto "message.proto" ABSOLUTE) +get_filename_component(proto_path "${proto}" PATH) + +# Generated sources +set(proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/message.pb.cc") +set(proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/message.pb.h") +set(grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/message.grpc.pb.cc") +set(grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/message.grpc.pb.h") +add_custom_command( + OUTPUT "${proto_srcs}" "${proto_hdrs}" "${grpc_srcs}" "${grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${proto}" + DEPENDS "${proto}") + +# Include generated *.pb.h files +# Targets greeter_[async_](client|server) +target_sources(reporting PRIVATE message.pb.cc message.grpc.pb.cc) + + + + + + + + + + + +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIR}) +TARGET_LINK_LIBRARIES(reporting LINK_PUBLIC ${Boost_LIBRARIES} ${_REFLECTION} ${_GRPC_GRPCPP} ${_PROTOBUF_LIBPROTOBUF}) diff --git a/message.proto b/message.proto new file mode 100644 index 00000000..9fa18a60 --- /dev/null +++ b/message.proto @@ -0,0 +1,5 @@ +package message; + +message Message { + repeated int32 id = 1; +} diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp new file mode 100644 index 00000000..4b957086 --- /dev/null +++ b/websocket_server_async.cpp @@ -0,0 +1,281 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +//------------------------------------------------------------------------------ +// +// Example: WebSocket server, asynchronous +// +//------------------------------------------------------------------------------ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace websocket = beast::websocket; // from +namespace net = boost::asio; // from +using tcp = boost::asio::ip::tcp; // from + +//------------------------------------------------------------------------------ + +// Report a failure +void +fail(beast::error_code ec, char const* what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +// Echoes back all received WebSocket messages +class session : public std::enable_shared_from_this +{ + websocket::stream ws_; + beast::flat_buffer buffer_; + +public: + // Take ownership of the socket + explicit + session(tcp::socket&& socket) + : ws_(std::move(socket)) + { + } + + // Get on the correct executor + void + run() + { + // We need to be executing within a strand to perform async operations + // on the I/O objects in this session. Although not strictly necessary + // for single-threaded contexts, this example code is written to be + // thread-safe by default. + net::dispatch(ws_.get_executor(), + beast::bind_front_handler( + &session::on_run, + shared_from_this())); + } + + // Start the asynchronous operation + void + on_run() + { + // Set suggested timeout settings for the websocket + ws_.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::server)); + + // Set a decorator to change the Server of the handshake + ws_.set_option(websocket::stream_base::decorator( + [](websocket::response_type& res) + { + res.set(http::field::server, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-server-async"); + })); + // Accept the websocket handshake + ws_.async_accept( + beast::bind_front_handler( + &session::on_accept, + shared_from_this())); + } + + void + on_accept(beast::error_code ec) + { + if(ec) + return fail(ec, "accept"); + + // Read a message + do_read(); + } + + void + do_read() + { + // Read a message into our buffer + ws_.async_read( + buffer_, + beast::bind_front_handler( + &session::on_read, + shared_from_this())); + } + + void + on_read( + beast::error_code ec, + std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if(ec == websocket::error::closed) + return; + + if(ec) + fail(ec, "read"); + + // Echo the message + ws_.text(ws_.got_text()); + ws_.async_write( + buffer_.data(), + beast::bind_front_handler( + &session::on_write, + shared_from_this())); + } + + void + on_write( + beast::error_code ec, + std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if(ec) + return fail(ec, "write"); + + // Clear the buffer + buffer_.consume(buffer_.size()); + + // Do another read + do_read(); + } +}; + +//------------------------------------------------------------------------------ + +// Accepts incoming connections and launches the sessions +class listener : public std::enable_shared_from_this +{ + net::io_context& ioc_; + tcp::acceptor acceptor_; + +public: + listener( + net::io_context& ioc, + tcp::endpoint endpoint) + : ioc_(ioc) + , acceptor_(ioc) + { + beast::error_code ec; + + // Open the acceptor + acceptor_.open(endpoint.protocol(), ec); + if(ec) + { + fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor_.set_option(net::socket_base::reuse_address(true), ec); + if(ec) + { + fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if(ec) + { + fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor_.listen( + net::socket_base::max_listen_connections, ec); + if(ec) + { + fail(ec, "listen"); + return; + } + } + + // Start accepting incoming connections + void + run() + { + do_accept(); + } + +private: + void + do_accept() + { + // The new connection gets its own strand + acceptor_.async_accept( + net::make_strand(ioc_), + beast::bind_front_handler( + &listener::on_accept, + shared_from_this())); + } + + void + on_accept(beast::error_code ec, tcp::socket socket) + { + if(ec) + { + fail(ec, "accept"); + } + else + { + // Create the session and run it + std::make_shared(std::move(socket))->run(); + } + + // Accept another connection + do_accept(); + } +}; + +//------------------------------------------------------------------------------ + +int main(int argc, char* argv[]) +{ + // Check command line arguments. + if (argc != 4) + { + std::cerr << + "Usage: websocket-server-async
\n" << + "Example:\n" << + " websocket-server-async 0.0.0.0 8080 1\n"; + return EXIT_FAILURE; + } + auto const address = net::ip::make_address(argv[1]); + auto const port = static_cast(std::atoi(argv[2])); + auto const threads = std::max(1, std::atoi(argv[3])); + + // The io_context is required for all I/O + net::io_context ioc{threads}; + + // Create and launch a listening port + std::make_shared(ioc, tcp::endpoint{address, port})->run(); + + // Run the I/O service on the requested number of threads + std::vector v; + v.reserve(threads - 1); + for(auto i = threads - 1; i > 0; --i) + v.emplace_back( + [&ioc] + { + ioc.run(); + }); + ioc.run(); + + return EXIT_SUCCESS; +}