From ab06c272d3f68e87146fbca6da0bbe4e1184147e Mon Sep 17 00:00:00 2001 From: Ravidu Lashan Date: Wed, 9 Oct 2019 14:31:20 +0530 Subject: [PATCH] Websocket initial implementation (#9) * Added listener and session classes. * Added client session class. * Fixed minor code issues. * Initial server implementation p2p connection * Added a seperate thread to run the two servers * Implemented basic web socket architecture * Implemented basic peer to peer socket network * Added a sample message * Initial socket architecture completed * Improved code readability * Improved comments --- CMakeLists.txt | 4 + src/conf.h | 4 +- src/main.cpp | 4 +- src/sock/socket_client.cpp | 75 +++++++++++++++++ src/sock/socket_client.h | 51 ++++++++++++ src/sock/socket_server.cpp | 100 ++++++++++++++++++++++ src/sock/socket_server.h | 37 +++++++++ src/sock/socket_session.cpp | 133 ++++++++++++++++++++++++++++++ src/sock/socket_session.h | 55 ++++++++++++ src/sock/socket_session_handler.h | 27 ++++++ 10 files changed, 486 insertions(+), 4 deletions(-) create mode 100644 src/sock/socket_client.cpp create mode 100644 src/sock/socket_client.h create mode 100644 src/sock/socket_server.cpp create mode 100644 src/sock/socket_server.h create mode 100644 src/sock/socket_session.cpp create mode 100644 src/sock/socket_session.h create mode 100644 src/sock/socket_session_handler.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 3903752c..50e2fe66 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,10 +12,14 @@ add_executable(hpcore src/shared.cpp src/usr/usr.cpp src/shared.cpp + src/sock/socket_client.cpp + src/sock/socket_server.cpp + src/sock/socket_session.cpp ) target_link_libraries(hpcore libsodium.a libboost_system.a libboost_filesystem.a + pthread ) \ No newline at end of file diff --git a/src/conf.h b/src/conf.h index b137a8ed..34ec228b 100644 --- a/src/conf.h +++ b/src/conf.h @@ -48,9 +48,9 @@ struct ContractConfig string listenip; vector peers; vector unl; - int peerport; + unsigned short peerport; int roundtime; - int pubport; + unsigned short pubport; int pubmaxsize; int pubmaxcpm; }; diff --git a/src/main.cpp b/src/main.cpp index 46586f59..b50b9774 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -82,5 +82,5 @@ int parse_cmd(int argc, char **argv) cout << "hpcore (command = run | new | rekey)\n"; cout << "Example: hpcore run ~/mycontract\n"; - return -1; -} \ No newline at end of file + return 0; +} diff --git a/src/sock/socket_client.cpp b/src/sock/socket_client.cpp new file mode 100644 index 00000000..1ee785da --- /dev/null +++ b/src/sock/socket_client.cpp @@ -0,0 +1,75 @@ +#include +#include "socket_client.h" + +using tcp = net::ip::tcp; +using error = boost::system::error_code; + +namespace sock +{ + +socket_client::socket_client(net::io_context &ioc, socket_session_handler &session_handler) + : resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), socket_(ioc), sess_handler_(session_handler) +{ +} + +void socket_client::run(char const *host, char const *port) +{ + host_ = host; + port_ = (unsigned short)std::strtoul(port, NULL, 0); + + // Look up the domain name + resolver_.async_resolve( + host, + port, + [self = shared_from_this()](error ec, tcp::resolver::results_type results) { + self->on_resolve(ec, results); + }); +} + +void socket_client::on_resolve(error ec, tcp::resolver::results_type results) +{ + if (ec) + socket_client_fail(ec, "socket_client_resolve"); + + // Make the connection on the IP address we get from a lookup + beast::get_lowest_layer(ws_).async_connect( + results, + [self = shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) { + self->on_connect(ec, type); + }); +} + +void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type) +{ + if (ec) + socket_client_fail(ec, "socket_client_connect"); + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws_).expires_never(); + + // Set suggested timeout settings for the websocket + ws_.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + // Perform the websocket handshake + ws_.async_handshake(host_, "/", + [self = shared_from_this()](error ec) { + self->on_handshake(ec); + }); +} + +void socket_client::on_handshake(error ec) +{ + std::make_shared( + std::move(socket_), sess_handler_) + ->client_run(port_, host_, ec); +} + +void socket_client::socket_client_fail(beast::error_code ec, char const *what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +} // namespace sock diff --git a/src/sock/socket_client.h b/src/sock/socket_client.h new file mode 100644 index 00000000..8c7a544e --- /dev/null +++ b/src/sock/socket_client.h @@ -0,0 +1,51 @@ +#ifndef _SOCK_CLIENT_SESSION_H_ +#define _SOCK_CLIENT_SESSION_H_ + +#include +#include +#include "socket_session.h" +#include "socket_session_handler.h" + +namespace beast = boost::beast; +namespace net = boost::asio; +namespace websocket = boost::beast::websocket; +namespace http = boost::beast::http; + +using tcp = net::ip::tcp; +using error = boost::system::error_code; + +namespace sock +{ + +/** + * Represents an active WebSocket client connection + * Based on the implementation from https://github.com/vinniefalco/CppCon2018 +*/ +class socket_client : public std::enable_shared_from_this +{ + tcp::resolver resolver_; + tcp::socket socket_; + websocket::stream ws_; + std::string host_; + unsigned short port_; + socket_session_handler &sess_handler_; + + void on_resolve(error ec, tcp::resolver::results_type results); + + void on_connect(error ec, tcp::resolver::results_type::endpoint_type); + + void on_handshake(error ec); + + void on_close(error ec); + + void socket_client_fail(beast::error_code ec, char const *what); + +public: + // Resolver and socket require an io_context + socket_client(net::io_context &ioc, socket_session_handler &session_handler); + + //Entry point to the client which requires an active host and port + void run(char const *host, char const *port); +}; +} // namespace sock +#endif \ No newline at end of file diff --git a/src/sock/socket_server.cpp b/src/sock/socket_server.cpp new file mode 100644 index 00000000..f2296375 --- /dev/null +++ b/src/sock/socket_server.cpp @@ -0,0 +1,100 @@ + +#include +#include +#include +#include +#include +#include "socket_server.h" + +namespace net = boost::asio; // namespace asio + +using tcp = net::ip::tcp; +using error_code = boost::system::error_code; + +namespace sock +{ + +socket_server::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler) + : acceptor_(ioc), socket_(ioc), sess_handler_(session_handler) +{ + error_code ec; + + // Open the acceptor + acceptor_.open(endpoint.protocol(), ec); + if (ec) + { + fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor_.set_option(net::socket_base::reuse_address(true)); + if (ec) + { + fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if (ec) + { + fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor_.listen( + net::socket_base::max_listen_connections, ec); + if (ec) + { + fail(ec, "listen"); + return; + } +} + +void socket_server::run() +{ + // Start accepting a connection + acceptor_.async_accept( + socket_, + [self = shared_from_this()](error_code ec) { + self->on_accept(ec); + }); +} + +// Report a failure +void socket_server::fail(error_code ec, char const *what) +{ + // Don't report on canceled operations + if (ec == net::error::operation_aborted) + return; + std::cerr << what << ": " << ec.message() << "\n"; +} + +// Handle a connection +void socket_server::on_accept(error_code ec) +{ + if (ec) + { + return fail(ec, "accept"); + } + else + { + unsigned short port = socket_.remote_endpoint().port(); + std::string address = socket_.remote_endpoint().address().to_string(); + + // Launch a new session for this connection + std::make_shared( + std::move(socket_), sess_handler_) + ->server_run(port, address); + } + + // Accept another connection + acceptor_.async_accept( + socket_, + [self = shared_from_this()](error_code ec) { + self->on_accept(ec); + }); +} +} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_server.h b/src/sock/socket_server.h new file mode 100644 index 00000000..2b51003e --- /dev/null +++ b/src/sock/socket_server.h @@ -0,0 +1,37 @@ +#ifndef _SOCK_SERVER_LISTENER_H_ +#define _SOCK_SERVER_LISTENER_H_ + +#include +#include "socket_session_handler.h" + +namespace net = boost::asio; // namespace asio + +using tcp = net::ip::tcp; +using error = boost::system::error_code; // from + +namespace sock +{ + +/** + * Represents an active WebSocket server connection + * Based on the implementation from https://github.com/vinniefalco/CppCon2018 +*/ +class socket_server : public std::enable_shared_from_this +{ + tcp::acceptor acceptor_; + tcp::socket socket_; + socket_session_handler &sess_handler_; + + void fail(error ec, char const *what); + + void on_accept(error ec); + +public: + socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler); + + // Start accepting incoming connections + void run(); +}; +} // namespace sock + +#endif \ No newline at end of file diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp new file mode 100644 index 00000000..4f78b63b --- /dev/null +++ b/src/sock/socket_session.cpp @@ -0,0 +1,133 @@ +#include +#include +#include +#include "socket_session.h" + +namespace net = boost::asio; + +using tcp = net::ip::tcp; +using error_code = boost::system::error_code; + +namespace sock +{ + +socket_session::socket_session(tcp::socket socket, socket_session_handler &sess_handler) + : ws_(std::move(socket)), sess_handler_(sess_handler) +{ +} + +void socket_session::server_run(const unsigned short &port, const std::string &address) +{ + port_ = port; + address_ = address; + + // Accept the websocket handshake + ws_.async_accept( + [sp = shared_from_this()]( + error ec) { + sp->on_accept(ec); + }); +} + +void socket_session::client_run(const unsigned short &port, const std::string &address, error ec) +{ + port_ = port; + address_ = address; + + sess_handler_.on_connect(this, ec); + if (ec) + return fail(ec, "handshake"); + + ws_.async_read( + buffer_, + [sp = shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +void socket_session::fail(error_code ec, char const *what) +{ + // Don't report these + if (ec == net::error::operation_aborted || + ec == websocket::error::closed) + return; + + std::cerr << what << ": " << ec.message() << "\n"; +} + +void socket_session::on_accept(error_code ec) +{ + sess_handler_.on_connect(this, ec); + + // Handle the error, if any + if (ec) + return fail(ec, "accept"); + + // Read a message + ws_.async_read( + buffer_, + [sp = shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +void socket_session::on_read(error_code ec, std::size_t) +{ + auto const string_message = std::make_shared(std::move(beast::buffers_to_string(buffer_.data()))); + sess_handler_.on_message(this, string_message, ec); + + // Handle the error, if any + if (ec) + return fail(ec, "read"); + + // Clear the buffer + buffer_.consume(buffer_.size()); + + // Read another message + ws_.async_read( + buffer_, + [sp = shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +void socket_session::send(std::shared_ptr const &ss) +{ + // Always add to queue + queue_.push_back(ss); + + // Are we already writing? + if (queue_.size() > 1) + return; + + // We are not currently writing, so send this immediately + ws_.async_write( + net::buffer(*queue_.front()), + [sp = shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); +} + +void socket_session::on_write(error_code ec, std::size_t) +{ + // Handle the error, if any + if (ec) + return fail(ec, "write"); + + // Remove the string from the queue + queue_.erase(queue_.begin()); + + // Send the next message if any + if (!queue_.empty()) + ws_.async_write( + net::buffer(*queue_.front()), + [sp = shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); +} +} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_session.h b/src/sock/socket_session.h new file mode 100644 index 00000000..14d5c217 --- /dev/null +++ b/src/sock/socket_session.h @@ -0,0 +1,55 @@ +#ifndef _SOCK_SERVER_SESSION_H_ +#define _SOCK_SERVER_SESSION_H_ + +#include +#include +#include +#include +#include "socket_session_handler.h" + +namespace beast = boost::beast; +namespace net = boost::asio; +namespace websocket = boost::beast::websocket; +namespace http = boost::beast::http; + +using tcp = net::ip::tcp; +using error = boost::system::error_code; + +namespace sock +{ + +//Forward Declaration +class socket_session_handler; + +/** + * Represents an active WebSocket connection +*/ +class socket_session : public std::enable_shared_from_this +{ + beast::flat_buffer buffer_; + websocket::stream ws_; + std::vector> queue_; + socket_session_handler &sess_handler_; + + void fail(error ec, char const *what); + + void on_accept(error ec); + + void on_read(error ec, std::size_t bytes_transferred); + + void on_write(error ec, std::size_t bytes_transferred); + +public: + socket_session(tcp::socket socket, socket_session_handler &sess_handler); + + unsigned short port_; + std::string address_; + + void server_run(const unsigned short &port, const std::string &address); + void client_run(const unsigned short &port, const std::string &address, error ec); + + //Used to send message through an active websocket connection + void send(std::shared_ptr const &ss); +}; +} // namespace sock +#endif diff --git a/src/sock/socket_session_handler.h b/src/sock/socket_session_handler.h new file mode 100644 index 00000000..dc16cdaa --- /dev/null +++ b/src/sock/socket_session_handler.h @@ -0,0 +1,27 @@ +#ifndef _SOCK_SESSION_HANDLER_H_ +#define _SOCK_SESSION_HANDLER_H_ + +#include "socket_session.h" +#include + +using error = boost::system::error_code; + +namespace sock +{ + +// Forward declaration +class socket_session; + +/** + * Represents a WebSocket sessions handler. Can inherit from this class and access websocket events +*/ +class socket_session_handler +{ +public: + virtual void on_connect(socket_session *session, error ec) = 0; + virtual void on_message(socket_session *session, std::shared_ptr const &message, error ec) = 0; + virtual void on_close(socket_session *session) = 0; +}; +} // namespace sock + +#endif \ No newline at end of file