From 61b38bb0a00da7b0fdb2c3979cb778812fd71dc2 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Wed, 23 Oct 2019 13:04:57 +0530 Subject: [PATCH] Implemented socket message templates. (#40) Implemented socket message templates to support broadcast (shared_ptr) and to achieve buffer zero-copy. --- CMakeLists.txt | 3 - src/p2p/p2p.cpp | 31 ++-- src/p2p/p2p.hpp | 3 +- src/p2p/peer_session_handler.cpp | 89 +++++---- src/p2p/peer_session_handler.hpp | 28 ++- src/sock/socket_client.cpp | 93 ---------- src/sock/socket_client.hpp | 105 ++++++++++- src/sock/socket_server.cpp | 112 ----------- src/sock/socket_server.hpp | 122 +++++++++++- src/sock/socket_session.cpp | 209 --------------------- src/sock/socket_session.hpp | 277 ++++++++++++++++++++++++++-- src/sock/socket_session_handler.hpp | 8 +- src/usr/user_session_handler.cpp | 52 ++++-- src/usr/user_session_handler.hpp | 30 ++- src/usr/usr.cpp | 6 +- src/usr/usr.hpp | 7 +- 16 files changed, 636 insertions(+), 539 deletions(-) delete mode 100644 src/sock/socket_client.cpp delete mode 100644 src/sock/socket_server.cpp delete mode 100644 src/sock/socket_session.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c6562f44..7e7c0863 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,9 +7,6 @@ find_package(Boost REQUIRED COMPONENTS system log) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build) add_executable(hpcore - src/sock/socket_client.cpp - src/sock/socket_server.cpp - src/sock/socket_session.cpp src/p2p/peer_session_handler.cpp src/p2p/p2p.cpp src/util.cpp diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 61a7d4a0..11fd6c69 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -5,7 +5,6 @@ #include "../crypto.hpp" #include "../util.hpp" #include "../hplog.hpp" -#include "peer_session_handler.hpp" #include "p2p.hpp" namespace p2p @@ -13,7 +12,7 @@ namespace p2p /** * Peer connections exposing to the application */ -std::unordered_map peer_connections; +std::unordered_map *> peer_connections; /** * Peer session handler instance. This instance's methods will be fired for any peer socket activity. @@ -51,7 +50,7 @@ void start_peer_connections() auto address = net::ip::make_address(conf::cfg.listenip); // Start listening to peers - std::make_shared( + std::make_shared>( ioc, tcp::endpoint{address, conf::cfg.peerport}, global_peer_session_handler) @@ -77,7 +76,8 @@ void peer_connection_watchdog() if (peer_connections.find(v.first) == peer_connections.end()) { LOG_DBG << "Trying to connect :" << v.second.first << ":" << v.second.second; - std::make_shared(ioc, global_peer_session_handler)->run(v.second.first, v.second.second); + std::make_shared>(ioc, global_peer_session_handler) + ->run(v.second.first, v.second.second); } } @@ -126,7 +126,18 @@ bool validate_peer_message(std::string_view message, std::string_view signature, return false; } - //get message hash and see wheteher message is already recieved -> abandon + //verify message signature. + //this should be the last validation since this is bit expensive + auto signature_verified = crypto::verify(message, signature, pubkey); + + if (signature_verified != 0) + { + LOG_DBG << "Signature verification failed"; + return false; + } + + // After signature is verified, get message hash and see wheteher + // message is already recieved -> abandon if duplicate. auto messageHash = crypto::sha_512_hash(message, "PEERMSG", 7); if (recent_peer_msghash.count(messageHash) == 0) @@ -139,16 +150,6 @@ bool validate_peer_message(std::string_view message, std::string_view signature, return false; } - //verify message signature. - //this should be the last validation since this is bit expensive - auto signature_verified = crypto::verify(message, signature, pubkey); - - if (signature_verified != 0) - { - LOG_DBG << "Signature verification failed"; - return false; - } - return true; } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 0bbf964b..3d5cba28 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -3,13 +3,14 @@ #include #include "../sock/socket_session.hpp" +#include "peer_session_handler.hpp" namespace p2p { /** * This is used to store active peer connections mapped by the unique key of socket session */ -extern std::unordered_map peer_connections; +extern std::unordered_map *> peer_connections; /** * This is used to store hash of recent peer messages: messagehash -> timestamp of message diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 3bbb1f49..a3770f13 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -1,20 +1,40 @@ #include +#include #include "../conf.hpp" #include "../crypto.hpp" -#include "p2p.hpp" #include "../util.hpp" #include "../hplog.hpp" +#include "p2p.hpp" #include "peer_session_handler.hpp" -#include "flatbuffers/flatbuffers.h" #include "message_content_generated.h" #include "message_container_generated.h" namespace p2p { +peer_outbound_message::peer_outbound_message( + std::shared_ptr _fbbuilder_ptr) +{ + fbbuilder_ptr = _fbbuilder_ptr; +} + +// Returns a reference to the flatbuffer builder object. +flatbuffers::FlatBufferBuilder &peer_outbound_message::builder() +{ + return *fbbuilder_ptr; +} + +// Returns a reference to the data buffer that must be written to the socket. +std::string_view peer_outbound_message::buffer() +{ + return std::string_view( + reinterpret_cast((*fbbuilder_ptr).GetBufferPointer()), + (*fbbuilder_ptr).GetSize()); +} + //private method used to create a proposal message with dummy data. //Will be similiar to consensus proposal creation in each stage. -const std::string create_message() +const std::string create_message(flatbuffers::FlatBufferBuilder &container_builder) { //todo:get a average propsal message size and allocate builder based on that. /* @@ -41,10 +61,6 @@ const std::string create_message() const char *content_str = reinterpret_cast(buf); std::string_view message_content(content_str, size); - //todo: set container builder defualt builder size to combination of serialized content length + signature length(which is fixed) - // Do this when implementing consensus. - flatbuffers::FlatBufferBuilder container_builder(1024); - //create container message content from serialised content from previous step. flatbuffers::Offset> content = container_builder.CreateVector(buf, size); @@ -63,45 +79,52 @@ const std::string create_message() return std::string((char *)message_buf, buf_size); } -//private method returns string_view from Flat Buffer vector of bytes. -std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *pointer) +/** + * Private method to return string_view from flat buffer data pointer and length. + */ +std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length) { - flatbuffers::uoffset_t pointer_length = pointer->size(); - const uint8_t *pointer_buf = pointer->Data(); + const char *signature_content_str = reinterpret_cast(data); + return std::string_view(signature_content_str, length); +} - const char *signature_content_str = reinterpret_cast(pointer_buf); - return std::string_view(signature_content_str, pointer_length); +/** + * Private method to return string_view from Flat Buffer vector of bytes. + */ +std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer) +{ + return flatbuff_bytes_to_sv(buffer->Data(), buffer->size()); } /** * This gets hit every time a peer connects to HP via the peer port (configured in contract config). */ -void peer_session_handler::on_connect(sock::socket_session *session) +void peer_session_handler::on_connect(sock::socket_session *session) { if (!session->flags_[util::SESSION_FLAG::INBOUND]) { // We init the session unique id to associate with the challenge. session->init_uniqueid(); - peer_connections.insert(std::make_pair(session->uniqueid_, session)); - LOG_DBG << "Adding peer to list :" << session->uniqueid_ + " " << session->address_ + " " << session->port_; + peer_connections.insert(std::make_pair(session->uniqueid, session)); + LOG_DBG << "Adding peer to list: " << session->uniqueid << " " << session->address << " " << session->port; } else { - std::string message = create_message(); - session->send(std::move(message)); + // todo: set container builder defualt builder size to combination of serialized content length + signature length(which is fixed) + peer_outbound_message msg(std::make_shared(1024)); + std::string message = create_message(msg.builder()); + session->send(msg); } } //peer session on message callback method //validate and handle each type of peer messages. -void peer_session_handler::on_message(sock::socket_session *session, std::string &&message) +void peer_session_handler::on_message(sock::socket_session *session, std::string_view message) { - // LOG_DBG << "on-message : " << message; - peer_connections.insert(std::make_pair(session->uniqueid_, session)); - //session->send(std::make_shared(message)); + peer_connections.insert(std::make_pair(session->uniqueid, session)); //Accessing message buffer - uint8_t *container_pointer = (uint8_t *)message.data(); + const uint8_t *container_pointer = reinterpret_cast(message.data()); size_t container_length = message.length(); //Defining Flatbuffer verifier (default max depth = 64, max_tables = 1000000,) @@ -116,16 +139,14 @@ void peer_session_handler::on_message(sock::socket_session *session, std::string //Get serialised message content. const flatbuffers::Vector *container_content = container->content(); - const uint8_t *container_content_buf = container_content->Data(); - std::string_view message_content = flatbuff_bytes_to_sv(container_content); - - //Accessing message content. + //Accessing message content and size. const uint8_t *content_pointer = container_content->Data(); + flatbuffers::uoffset_t content_size = container_content->size(); //Defining Flatbuffer verifier for content message verification. //Since content is also serialised by using Filterbuf we can verify it using Filterbuffer. - flatbuffers::Verifier content_verifier(content_pointer, message_content.size()); + flatbuffers::Verifier content_verifier(content_pointer, content_size); //verify content message conent using flatbuffer verifier. if (VerifyContainerBuffer(content_verifier)) @@ -140,12 +161,12 @@ void peer_session_handler::on_message(sock::socket_session *session, std::string uint64_t timestamp = proposal->timestamp(); //Get public key of message originating node. - const flatbuffers::Vector *pubkey = proposal->pubkey(); - std::string_view message_pubkey = flatbuff_bytes_to_sv(pubkey); + std::string_view message_pubkey = flatbuff_bytes_to_sv(proposal->pubkey()); //Get signature from container message. - const flatbuffers::Vector *signature = container->signature(); - std::string_view message_signature = flatbuff_bytes_to_sv(signature); + std::string_view message_signature = flatbuff_bytes_to_sv(container->signature()); + + std::string_view message_content = flatbuff_bytes_to_sv(content_pointer, content_size); //validate message for malleability, timeliness, signature and prune recieving messages. bool validated = p2p::validate_peer_message(message_content, message_signature, message_pubkey, timestamp, version); @@ -186,9 +207,9 @@ void peer_session_handler::on_message(sock::socket_session *session, std::string } //peer session on message callback method -void peer_session_handler::on_close(sock::socket_session *session) +void peer_session_handler::on_close(sock::socket_session *session) { - LOG_DBG << "on_closing peer :" << session->uniqueid_; + LOG_DBG << "on_closing peer :" << session->uniqueid; } } // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp index bf83f451..c0f306e4 100644 --- a/src/p2p/peer_session_handler.hpp +++ b/src/p2p/peer_session_handler.hpp @@ -2,20 +2,40 @@ #define _HP_P2P_SESSION_H_ #include +#include #include "../sock/socket_session_handler.hpp" #include "../sock/socket_session.hpp" namespace p2p { -class peer_session_handler : public sock::socket_session_handler +/** + * Represents a peer message generated using flatbuffer that must be sent to the socket. + * We keep a shared_ptr of flatbuffer builder to support broadcasting the same message + * on multiple connections without copying buffer contents. + */ +class peer_outbound_message : public sock::outbound_message +{ + std::shared_ptr fbbuilder_ptr; + +public: + peer_outbound_message(std::shared_ptr _fbbuilder_ptr); + + // Returns a reference to the flatbuffer builder object. + flatbuffers::FlatBufferBuilder& builder(); + + // Returns a reference to the data buffer that must be written to the socket. + virtual std::string_view buffer(); +}; + +class peer_session_handler : public sock::socket_session_handler { public: - void on_connect(sock::socket_session *session); + void on_connect(sock::socket_session *session); - void on_message(sock::socket_session *session, std::string &&message); + void on_message(sock::socket_session *session, std::string_view message); - void on_close(sock::socket_session *session); + void on_close(sock::socket_session *session); }; } // namespace p2p diff --git a/src/sock/socket_client.cpp b/src/sock/socket_client.cpp deleted file mode 100644 index 04ab1382..00000000 --- a/src/sock/socket_client.cpp +++ /dev/null @@ -1,93 +0,0 @@ -#include -#include "socket_client.hpp" -#include "../hplog.hpp" - -using tcp = net::ip::tcp; -using error = boost::system::error_code; - -namespace sock -{ - -socket_client::socket_client(net::io_context &ioc, socket_session_handler &session_handler) - : resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), sess_handler_(session_handler) -{ -} - -/** - * Entry point to socket client which will intiate a connection to server -*/ -// boost async_resolve function requires a port as a string because of that port is passed as a string -void socket_client::run(std::string_view host, std::string_view port) -{ - host_ = host; - port_ = port; - - // Look up the domain name - resolver_.async_resolve( - host, - port, - [self = shared_from_this()](error ec, tcp::resolver::results_type results) { - self->on_resolve(ec, results); - }); -} - -/** - * Executes on completion of resolving the server -*/ -void socket_client::on_resolve(error ec, tcp::resolver::results_type results) -{ - if (ec) - socket_client_fail(ec, "socket_client_resolve"); - - // Make the connection on the IP address we get from a lookup - beast::get_lowest_layer(ws_).async_connect( - results, - [self = shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) { - self->on_connect(ec, type); - }); -} - -/** - * Executes on completion of connecting to the server -*/ -void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type) -{ - if (ec) - socket_client_fail(ec, "socket_client_connect"); - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(ws_).expires_never(); - - // Set suggested timeout settings for the websocket - ws_.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::client)); - - // Perform the websocket handshake - ws_.async_handshake(host_, "/", - [self = shared_from_this()](error ec) { - self->on_handshake(ec); - }); -} - -/** - * Executes on completion of handshake -*/ -void socket_client::on_handshake(error ec) -{ - //Creates a new socket session object - std::make_shared( - ws_, sess_handler_) - ->client_run(std::move(host_), std::move(port_), ec); -} - -/** - * Executes on error -*/ -void socket_client::socket_client_fail(beast::error_code ec, char const *what) -{ - LOG_ERR << what << ": " << ec.message(); -} - -} // namespace sock diff --git a/src/sock/socket_client.hpp b/src/sock/socket_client.hpp index f552629e..cd7783b9 100644 --- a/src/sock/socket_client.hpp +++ b/src/sock/socket_client.hpp @@ -5,6 +5,7 @@ #include #include "socket_session.hpp" #include "socket_session_handler.hpp" +#include "../hplog.hpp" namespace beast = boost::beast; namespace net = boost::asio; @@ -20,13 +21,14 @@ namespace sock * Represents an active WebSocket client connection * Based on the implementation from https://github.com/vinniefalco/CppCon2018 */ -class socket_client : public std::enable_shared_from_this +template +class socket_client : public std::enable_shared_from_this> { - tcp::resolver resolver_; // resolver used to resolve host and the port - websocket::stream ws_; // web socket stream used to send and receive messages - std::string host_; // address of the server in which the client connects - std::string port_; // port of the server in which client connects - socket_session_handler &sess_handler_; // handler passed to gain access to websocket events + tcp::resolver resolver; // resolver used to resolve host and the port + websocket::stream ws; // web socket stream used to send and receive messages + std::string host; // address of the server in which the client connects + std::string port; // port of the server in which client connects + socket_session_handler &sess_handler_; // handler passed to gain access to websocket events void on_resolve(error ec, tcp::resolver::results_type results); @@ -42,10 +44,99 @@ class socket_client : public std::enable_shared_from_this public: // Resolver and socket require an io_context - socket_client(net::io_context &ioc, socket_session_handler &session_handler); + socket_client(net::io_context &ioc, socket_session_handler &session_handler); //Entry point to the client which requires an active host and port void run(std::string_view host, std::string_view port); }; + +template +socket_client::socket_client(net::io_context &ioc, socket_session_handler &session_handler) + : resolver(net::make_strand(ioc)), ws(net::make_strand(ioc)), sess_handler_(session_handler) +{ +} + +/** + * Entry point to socket client which will intiate a connection to server +*/ +// boost async_resolve function requires a port as a string because of that port is passed as a string +template +void socket_client::run(std::string_view host, std::string_view port) +{ + this->host = host; + this->port = port; + + // Look up the domain name + resolver.async_resolve( + host, + port, + [self = this->shared_from_this()](error ec, tcp::resolver::results_type results) { + self->on_resolve(ec, results); + }); +} + +/** + * Executes on completion of resolving the server +*/ +template +void socket_client::on_resolve(error ec, tcp::resolver::results_type results) +{ + if (ec) + socket_client_fail(ec, "socket_client_resolve"); + + // Make the connection on the IP address we get from a lookup + beast::get_lowest_layer(ws).async_connect( + results, + [self = this->shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) { + self->on_connect(ec, type); + }); +} + +/** + * Executes on completion of connecting to the server +*/ +template +void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type) +{ + if (ec) + socket_client_fail(ec, "socket_client_connect"); + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws).expires_never(); + + // Set suggested timeout settings for the websocket + ws.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + // Perform the websocket handshake + ws.async_handshake(host, "/", + [self = this->shared_from_this()](error ec) { + self->on_handshake(ec); + }); +} + +/** + * Executes on completion of handshake +*/ +template +void socket_client::on_handshake(error ec) +{ + //Creates a new socket session object + std::make_shared>( + ws, sess_handler_) + ->client_run(std::move(host), std::move(port), ec); +} + +/** + * Executes on error +*/ +template +void socket_client::socket_client_fail(beast::error_code ec, char const *what) +{ + LOG_ERR << what << ": " << ec.message(); +} + } // namespace sock #endif \ No newline at end of file diff --git a/src/sock/socket_server.cpp b/src/sock/socket_server.cpp deleted file mode 100644 index 304e96b4..00000000 --- a/src/sock/socket_server.cpp +++ /dev/null @@ -1,112 +0,0 @@ - -#include -#include -#include -#include -#include -#include "socket_server.hpp" -#include "../hplog.hpp" - -namespace net = boost::asio; // namespace asio - -using tcp = net::ip::tcp; -using error_code = boost::system::error_code; - -namespace sock -{ - -socket_server::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler) - : acceptor_(ioc), socket_(ioc),sess_handler_(session_handler) -{ - error_code ec; - - // Open the acceptor - acceptor_.open(endpoint.protocol(), ec); - if (ec) - { - fail(ec, "open"); - return; - } - - // Allow address reuse - acceptor_.set_option(net::socket_base::reuse_address(true)); - if (ec) - { - fail(ec, "set_option"); - return; - } - - // Bind to the server address - acceptor_.bind(endpoint, ec); - if (ec) - { - fail(ec, "bind"); - return; - } - - // Start listening for connections - acceptor_.listen( - net::socket_base::max_listen_connections, ec); - if (ec) - { - fail(ec, "listen"); - return; - } -} - -/** - * Entry point to socket server which accepts new connections -*/ -void socket_server::run() -{ - - // Start accepting a connection - acceptor_.async_accept( - socket_, - [self = shared_from_this()](error_code ec) { - self->on_accept(ec); - }); -} - -/** - * Executes on error -*/ -void socket_server::fail(error_code ec, char const *what) -{ - // Don't report on canceled operations - if (ec == net::error::operation_aborted) - return; - LOG_ERR << what << ": " << ec.message(); -} - -/** - * Executes on acceptance of new connection -*/ -void socket_server::on_accept(error_code ec) -{ - if (ec) - { - return fail(ec, "accept"); - } - else - { - std::string port = std::to_string(socket_.remote_endpoint().port()); - std::string address = socket_.remote_endpoint().address().to_string(); - - //Creating websocket stream required to pass to initiate a new session - websocket::stream ws(std::move(socket_)); - - // Launch a new session for this connection - std::make_shared( - ws, sess_handler_) - ->server_run(std::move(address), std::move(port)); - } - - // Accept another connection - acceptor_.async_accept( - socket_, - [self = shared_from_this()](error_code ec) { - self->on_accept(ec); - }); -} -} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_server.hpp b/src/sock/socket_server.hpp index f977712b..92b879c1 100644 --- a/src/sock/socket_server.hpp +++ b/src/sock/socket_server.hpp @@ -2,12 +2,16 @@ #define _SOCK_SERVER_LISTENER_H_ #include +#include +#include +#include #include "socket_session_handler.hpp" +#include "../hplog.hpp" namespace net = boost::asio; // namespace asio using tcp = net::ip::tcp; -using error = boost::system::error_code; // from +using error_code = boost::system::error_code; namespace sock { @@ -16,22 +20,124 @@ namespace sock * Represents an active WebSocket server connection * Based on the implementation from https://github.com/vinniefalco/CppCon2018 */ -class socket_server : public std::enable_shared_from_this +template +class socket_server : public std::enable_shared_from_this> { - tcp::acceptor acceptor_; // acceptor which accepts new connections - tcp::socket socket_; // socket in which the client connects - socket_session_handler &sess_handler_; // handler passed to gain access to websocket events + tcp::acceptor acceptor; // acceptor which accepts new connections + tcp::socket socket; // socket in which the client connects + socket_session_handler &sess_handler; // handler passed to gain access to websocket events - void fail(error ec, char const *what); + void fail(error_code ec, char const *what); - void on_accept(error ec); + void on_accept(error_code ec); public: - socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler); + socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler); // Start accepting incoming connections void run(); }; + + +template +socket_server::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler) + : acceptor(ioc), socket(ioc), sess_handler(session_handler) +{ + error_code ec; + + // Open the acceptor + acceptor.open(endpoint.protocol(), ec); + if (ec) + { + fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor.set_option(net::socket_base::reuse_address(true)); + if (ec) + { + fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor.bind(endpoint, ec); + if (ec) + { + fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor.listen( + net::socket_base::max_listen_connections, ec); + if (ec) + { + fail(ec, "listen"); + return; + } +} + +/** + * Entry point to socket server which accepts new connections +*/ +template +void socket_server::run() +{ + + // Start accepting a connection + acceptor.async_accept( + socket, + [self = this->shared_from_this()](error_code ec) { + self->on_accept(ec); + }); +} + +/** + * Executes on error +*/ +template +void socket_server::fail(error_code ec, char const *what) +{ + // Don't report on canceled operations + if (ec == net::error::operation_aborted) + return; + LOG_ERR << what << ": " << ec.message(); +} + +/** + * Executes on acceptance of new connection +*/ +template +void socket_server::on_accept(error_code ec) +{ + if (ec) + { + return fail(ec, "accept"); + } + else + { + std::string port = std::to_string(socket.remote_endpoint().port()); + std::string address = socket.remote_endpoint().address().to_string(); + + //Creating websocket stream required to pass to initiate a new session + websocket::stream ws(std::move(socket)); + + // Launch a new session for this connection + std::make_shared>( + ws, sess_handler) + ->server_run(std::move(address), std::move(port)); + } + + // Accept another connection + acceptor.async_accept( + socket, + [self = this->shared_from_this()](error_code ec) { + self->on_accept(ec); + }); +} + } // namespace sock #endif \ No newline at end of file diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp deleted file mode 100644 index 0dd3dfe3..00000000 --- a/src/sock/socket_session.cpp +++ /dev/null @@ -1,209 +0,0 @@ -#include -#include -#include -#include "socket_session.hpp" -#include "../util.hpp" - -namespace net = boost::asio; - -using tcp = net::ip::tcp; -using error_code = boost::system::error_code; - -namespace sock -{ - -socket_session::socket_session(websocket::stream &websocket, socket_session_handler &sess_handler) - : ws_(std::move(websocket)), sess_handler_(sess_handler) -{ - ws_.binary(true); -} - -socket_session::~socket_session() -{ - sess_handler_.on_close(this); -} - -//port and address will be used to identify from which client the message recieved in the handler -void socket_session::server_run(const std::string &&address, const std::string &&port) -{ - port_ = port; - address_ = address; - - //Set this flag to identify whether this socket session created when node acts as a server - flags_.set(util::SESSION_FLAG::INBOUND); - - // Accept the websocket handshake - ws_.async_accept( - [sp = shared_from_this()]( - error ec) { - sp->on_accept(ec); - }); -} - -//port and address will be used to identify from which server the message recieved in the handler -void socket_session::client_run(const std::string &&address, const std::string &&port, error ec) -{ - port_ = port; - address_ = address; - - if (ec) - return fail(ec, "handshake"); - - sess_handler_.on_connect(this); - - ws_.async_read( - buffer_, - [sp = shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_read(ec, bytes); - }); -} - -/** - * Executes on error -*/ -void socket_session::fail(error_code ec, char const *what) -{ - // LOG_ERR << what << ": " << ec.message(); - - // Don't report these - if (ec == net::error::operation_aborted || - ec == websocket::error::closed) - return; -} - -/** - * Executes on acceptance of new connection -*/ -void socket_session::on_accept(error_code ec) -{ - // Handle the error, if any - if (ec) - return fail(ec, "accept"); - - sess_handler_.on_connect(this); - - // Read a message - ws_.async_read( - buffer_, - [sp = shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_read(ec, bytes); - }); -} - -/* -* Executes on completion of recieiving a new message -*/ -void socket_session::on_read(error_code ec, std::size_t) -{ - //if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler - // read may get called when operation_aborted as well. - // We don't need to process read operation in that case. - if (ec == net::error::operation_aborted) - return; - - // Handle the error, if any - if (ec) - { - // if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler - on_close(ec, 1); - return fail(ec, "read"); - } - - std::string message = beast::buffers_to_string(buffer_.data()); - sess_handler_.on_message(this, std::move(message)); - - // Clear the buffer - buffer_.consume(buffer_.size()); - - // Read another message - ws_.async_read( - buffer_, - [sp = shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_read(ec, bytes); - }); -} - -/* -* Send message through an active websocket connection -*/ -void socket_session::send(std::string &&ss) -{ - // Always add to queue - queue_.push_back(ss); - - // Are we already writing? - if (queue_.size() > 1) - return; - - // We are not currently writing, so send this immediately - ws_.async_write( - net::buffer(queue_.front()), - [sp = shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_write(ec, bytes); - }); -} - -/* -* Executes on completion of write operation to a socket -*/ -void socket_session::on_write(error_code ec, std::size_t) -{ - // Handle the error, if any - if (ec) - return fail(ec, "write"); - - // Remove the string from the queue - queue_.erase(queue_.begin()); - - // Send the next message if any - if (!queue_.empty()) - ws_.async_write( - net::buffer(queue_.front()), - [sp = shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_write(ec, bytes); - }); -} - -/* -* Close an active websocket connection gracefully -*/ -void socket_session::close() -{ - // Close the WebSocket connection - ws_.async_close(websocket::close_code::normal, - [sp = shared_from_this()]( - error_code ec) { - sp->on_close(ec, 0); - }); -} - -/* -* Executes on completion of closing a socket connection -*/ -//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method -void socket_session::on_close(error_code ec, std::int8_t type) -{ - // sess_handler_.on_close(this); - - // if (type == 1) - // return; - - // if (ec) - // return fail(ec, "close"); -} - -// When called, initializes the unique id string for this session. -void socket_session::init_uniqueid() -{ - // Create a unique id for the session combining ip and port. - // We prepare this appended string here because we need to use it for finding elemends from the maps - // for validation purposes whenever a message is received. - uniqueid_.append(address_).append(":").append(port_); -} - -} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index 670b9085..7426a9b2 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -1,11 +1,14 @@ #ifndef _SOCK_SERVER_SESSION_H_ #define _SOCK_SERVER_SESSION_H_ -#include +#include #include #include #include #include +#include +#include +#include "../util.hpp" #include "socket_session_handler.hpp" namespace beast = boost::beast; @@ -14,51 +17,67 @@ namespace websocket = boost::beast::websocket; namespace http = boost::beast::http; using tcp = net::ip::tcp; -using error = boost::system::error_code; +using error_code = boost::system::error_code; namespace sock { +/** + * Represents an outbound message that is sent with a websocket. + * We use this class to wrap different object types holding actual message contents. + * We use this mechanism to achieve end-to-end zero-copy between original message + * content generator and websocket flush. + */ +class outbound_message +{ +public: + // Returns a pointer to the internal buffer owned by the message object. + // Contents of this buffer is the message that is sent/received with the socket. + virtual std::string_view buffer() = 0; +}; + //Forward Declaration +template class socket_session_handler; /** * Represents an active WebSocket connection */ -class socket_session : public std::enable_shared_from_this +template +class socket_session : public std::enable_shared_from_this> { - beast::flat_buffer buffer_; // used to store incoming messages - websocket::stream ws_; // websocket stream used send an recieve messages - std::vector queue_; // uses to store messages temporarily until it is sent to the relevant party - socket_session_handler &sess_handler_; // handler passed to gain access to websocket events + beast::flat_buffer buffer; // used to store incoming messages + websocket::stream ws; // websocket stream used send an recieve messages + std::vector queue; // used to store messages temporarily until it is sent to the relevant party + socket_session_handler &sess_handler; // handler passed to gain access to websocket events - void fail(error ec, char const *what); + void fail(error_code ec, char const *what); - void on_accept(error ec); + void on_accept(error_code ec); - void on_read(error ec, std::size_t bytes_transferred); + void on_read(error_code ec, std::size_t bytes_transferred); - void on_write(error ec, std::size_t bytes_transferred); + void on_write(error_code ec, std::size_t bytes_transferred); - void on_close(error ec, std::int8_t type); + void on_close(error_code ec, std::int8_t type); public: - socket_session(websocket::stream &websocket, socket_session_handler &sess_handler); - + socket_session(websocket::stream &websocket, socket_session_handler &sess_handler); + ~socket_session(); // Port and the address of the remote party is being saved to used in the session handler - // to identify from which remote party the message recieved. Since the port is passed as a string + // to identify from which remote party the message recieved. Since the port is passed as a string // from the parent we store as it is, since we are not going to pass it anywhere or used in a method // The port of the remote party. - std::string port_; + std::string port; // The IP address of the remote party. - std::string address_; + std::string address; // The unique identifier of the remote party (format :). - std::string uniqueid_; + std::string uniqueid; // The set of util::SESSION_FLAG enum flags that will be set by user-code of this calss. // We mainly use this to store contexual information about this session based on the use case. @@ -66,14 +85,232 @@ public: std::bitset<8> flags_; void server_run(const std::string &&address, const std::string &&port); - void client_run(const std::string &&address, const std::string &&port, error ec); + void client_run(const std::string &&address, const std::string &&port, error_code ec); - void send(std::string &&ss); + void send(T msg); // When called, initializes the unique id string for this session. void init_uniqueid(); void close(); }; + +template +socket_session::socket_session(websocket::stream &websocket, socket_session_handler &sess_handler) + : ws(std::move(websocket)), sess_handler(sess_handler) +{ + // We use binary data instead of ASCII/UTF8 character data. + ws.binary(true); +} + +template +socket_session::~socket_session() +{ + sess_handler.on_close(this); +} + +//port and address will be used to identify from which client the message recieved in the handler +template +void socket_session::server_run(const std::string &&address, const std::string &&port) +{ + this->port = port; + this->address = address; + + //Set this flag to identify whether this socket session created when node acts as a server + flags_.set(util::SESSION_FLAG::INBOUND); + + // Accept the websocket handshake + ws.async_accept( + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_accept(ec); + }); +} + +//port and address will be used to identify from which server the message recieved in the handler +template +void socket_session::client_run(const std::string &&address, const std::string &&port, error_code ec) +{ + this->port = port; + this->address = address; + + if (ec) + return fail(ec, "handshake"); + + sess_handler.on_connect(this); + + ws.async_read( + buffer, + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +/** + * Executes on error +*/ +template +void socket_session::fail(error_code ec, char const *what) +{ + // LOG_ERR << what << ": " << ec.message(); + + // Don't report these + if (ec == net::error::operation_aborted || + ec == websocket::error::closed) + return; +} + +/** + * Executes on acceptance of new connection +*/ +template +void socket_session::on_accept(error_code ec) +{ + // Handle the error, if any + if (ec) + return fail(ec, "accept"); + + sess_handler.on_connect(this); + + // Read a message + ws.async_read( + buffer, + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +/* +* Executes on completion of recieiving a new message +*/ +template +void socket_session::on_read(error_code ec, std::size_t) +{ + //if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler + // read may get called when operation_aborted as well. + // We don't need to process read operation in that case. + if (ec == net::error::operation_aborted) + return; + + // Handle the error, if any + if (ec) + { + // if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler + on_close(ec, 1); + return fail(ec, "read"); + } + + // Wrap the buffer data in a string_view and call session handler. + // We DO NOT transfer ownership of buffer data to the session handler. It should + // read and process the message and we will clear the buffer after its done with it. + const char *buffer_data = net::buffer_cast(buffer.data()); + std::string_view message(buffer_data, buffer.size()); + sess_handler.on_message(this, message); + + // Clear the buffer + buffer.consume(buffer.size()); + + // Read another message + ws.async_read( + buffer, + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +/* +* Send message through an active websocket connection +*/ +template +void socket_session::send(T msg) +{ + // Always add to queue + queue.push_back(std::move(msg)); + + // Are we already writing? + if (queue.size() > 1) + return; + + std::string_view sv = queue.front().buffer(); + + // We are not currently writing, so send this immediately + ws.async_write( + // Project the outbound_message buffer from the queue front into the asio buffer. + net::buffer(sv.data(), sv.length()), + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); +} + +/* +* Executes on completion of write operation to a socket +*/ +template +void socket_session::on_write(error_code ec, std::size_t) +{ + // Handle the error, if any + if (ec) + return fail(ec, "write"); + + // Remove the string from the queue + queue.erase(queue.begin()); + + // Send the next message if any + if (!queue.empty()) + { + std::string_view sv = queue.front().buffer(); + ws.async_write( + net::buffer(sv.data(), sv.length()), + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); + } +} + +/* +* Close an active websocket connection gracefully +*/ +template +void socket_session::close() +{ + // Close the WebSocket connection + ws.async_close(websocket::close_code::normal, + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_close(ec, 0); + }); +} + +/* +* Executes on completion of closing a socket connection +*/ +//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method +template +void socket_session::on_close(error_code ec, std::int8_t type) +{ + // sess_handler.on_close(this); + + // if (type == 1) + // return; + + // if (ec) + // return fail(ec, "close"); +} + +// When called, initializes the unique id string for this session. +template +void socket_session::init_uniqueid() +{ + // Create a unique id for the session combining ip and port. + // We prepare this appended string here because we need to use it for finding elemends from the maps + // for validation purposes whenever a message is received. + uniqueid.append(address).append(":").append(port); +} + } // namespace sock #endif diff --git a/src/sock/socket_session_handler.hpp b/src/sock/socket_session_handler.hpp index 99a95c23..e4f5b3eb 100644 --- a/src/sock/socket_session_handler.hpp +++ b/src/sock/socket_session_handler.hpp @@ -7,28 +7,30 @@ namespace sock { // Forward declaration +template class socket_session; /** * Represents a WebSocket sessions handler. Can inherit from this class and access websocket events */ +template class socket_session_handler { public: /** * Executes on initiation of a new connection */ - virtual void on_connect(socket_session *session) = 0; + virtual void on_connect(socket_session *session) = 0; /** * Executes on recieval of new message */ - virtual void on_message(socket_session *session, std::string &&message) = 0; + virtual void on_message(socket_session *session, std::string_view message) = 0; /** * Executes on websocket connection close */ - virtual void on_close(socket_session *session) = 0; + virtual void on_close(socket_session *session) = 0; }; } // namespace sock diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index f33bde7e..4cdfdd57 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -19,27 +19,39 @@ using error = boost::system::error_code; namespace usr { +user_outbound_message::user_outbound_message(std::string &&_msg) +{ + msg = std::move(_msg); +} + +// Returns the buffer that should be written to the socket. +std::string_view user_outbound_message::buffer() +{ + return std::string_view(msg.data(), msg.size()); +} + /** * This gets hit every time a client connects to HP via the public port (configured in contract config). */ -void user_session_handler::on_connect(sock::socket_session *session) +void user_session_handler::on_connect(sock::socket_session *session) { - LOG_INFO << "User client connected " << session->address_ << ":" << session->port_; + LOG_INFO << "User client connected " << session->address << ":" << session->port; // As soon as a user connects, we issue them a challenge message. We remember the // challenge we issued and later verifies the user's response with it. - std::string msg; + std::string msgstr; std::string challengehex; - usr::create_user_challenge(msg, challengehex); + usr::create_user_challenge(msgstr, challengehex); // We init the session unique id to associate with the challenge. session->init_uniqueid(); // Create an entry in pending_challenges for later tracking upon challenge response. - usr::pending_challenges[session->uniqueid_] = challengehex; + usr::pending_challenges[session->uniqueid] = challengehex; - session->send(std::move(msg)); + user_outbound_message outmsg(std::move(msgstr)); + session->send(std::move(outmsg)); // Set the challenge-issued flag to help later checks in on_message. session->flags_.set(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); @@ -48,14 +60,16 @@ void user_session_handler::on_connect(sock::socket_session *session) /** * This gets hit every time we receive some data from a client connected to the HP public port. */ -void user_session_handler::on_message(sock::socket_session *session, std::string &&message) +void user_session_handler::on_message( + sock::socket_session *session, + std::string_view message) { // First check whether this session is pending challenge. // Meaning we have previously issued a challenge to the client, if (session->flags_[util::SESSION_FLAG::USER_CHALLENGE_ISSUED]) { // The received message must be the challenge response. We need to verify it. - auto itr = usr::pending_challenges.find(session->uniqueid_); + auto itr = usr::pending_challenges.find(session->uniqueid); if (itr != usr::pending_challenges.end()) { std::string userpubkeyhex; @@ -82,20 +96,20 @@ void user_session_handler::on_message(sock::socket_session *session, std::string session->flags_.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag session->flags_.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag usr::add_user(session, userpubkey); // Add the user to the global authed user list - usr::pending_challenges.erase(session->uniqueid_); // Remove the stored challenge + usr::pending_challenges.erase(session->uniqueid); // Remove the stored challenge - LOG_INFO << "User connection " << session->uniqueid_ << " authenticated. Public key " - << userpubkeyhex; + LOG_INFO << "User connection " << session->uniqueid << " authenticated. Public key " + << userpubkeyhex; return; } else { - LOG_INFO << "Duplicate user public key " << session->uniqueid_; + LOG_INFO << "Duplicate user public key " << session->uniqueid; } } else { - LOG_INFO << "Challenge verification failed " << session->uniqueid_; + LOG_INFO << "Challenge verification failed " << session->uniqueid; } } } @@ -105,7 +119,7 @@ void user_session_handler::on_message(sock::socket_session *session, std::string // Check whether this user is among authenticated users // and perform authenticated msg processing. - auto itr = usr::users.find(session->uniqueid_); + auto itr = usr::users.find(session->uniqueid); if (itr != usr::users.end()) { // This is an authed user. @@ -121,30 +135,30 @@ void user_session_handler::on_message(sock::socket_session *session, std::string // If for any reason we reach this point, we should drop the connection. session->close(); - LOG_INFO << "Dropped the user connection " << session->address_ << ":" << session->port_; + LOG_INFO << "Dropped the user connection " << session->address << ":" << session->port; } /** * This gets hit every time a client disconnects from the HP public port. */ -void user_session_handler::on_close(sock::socket_session *session) +void user_session_handler::on_close(sock::socket_session *session) { // Cleanup any resources related to this session. // Session is awaiting challenge response. if (session->flags_[util::SESSION_FLAG::USER_CHALLENGE_ISSUED]) { - usr::pending_challenges.erase(session->uniqueid_); + usr::pending_challenges.erase(session->uniqueid); } // Session belongs to an authed user. else if (session->flags_[util::SESSION_FLAG::USER_AUTHED]) { // Wait for SC process completion before we remove existing user. proc::await_contract_execution(); - usr::remove_user(session->uniqueid_); + usr::remove_user(session->uniqueid); } - LOG_INFO << "User disconnected " << session->uniqueid_; + LOG_INFO << "User disconnected " << session->uniqueid; } } // namespace usr \ No newline at end of file diff --git a/src/usr/user_session_handler.hpp b/src/usr/user_session_handler.hpp index 158a40aa..bece2c45 100644 --- a/src/usr/user_session_handler.hpp +++ b/src/usr/user_session_handler.hpp @@ -1,3 +1,6 @@ +#ifndef _HP_USER_SESSION_HANDLER_H_ +#define _HP_USER_SESSION_HANDLER_H_ + #include #include "../sock/socket_session_handler.hpp" #include "../sock/socket_session.hpp" @@ -5,12 +8,29 @@ namespace usr { -class user_session_handler : public sock::socket_session_handler +/** + * Represents a message (bytes) that is sent to a user. + */ +class user_outbound_message : public sock::outbound_message { + // Contains message bytes. + std::string msg; + public: - void on_connect(sock::socket_session *session); - void on_message(sock::socket_session *session, std::string &&message); - void on_close(sock::socket_session *session); + user_outbound_message(std::string &&_msg); + + // Returns the buffer that should be written to the socket. + virtual std::string_view buffer(); }; -} // namespace usr \ No newline at end of file +class user_session_handler : public sock::socket_session_handler +{ +public: + void on_connect(sock::socket_session *session); + void on_message(sock::socket_session *session, std::string_view message); + void on_close(sock::socket_session *session); +}; + +} // namespace usr + +#endif \ No newline at end of file diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index db645245..ec1392dc 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -202,9 +202,9 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string * @param pubkey User's binary public key. * @return 0 on successful additions. -1 on failure. */ -int add_user(sock::socket_session *session, const std::string &pubkey) +int add_user(sock::socket_session *session, const std::string &pubkey) { - const std::string &sessionid = session->uniqueid_; + const std::string &sessionid = session->uniqueid; if (users.count(sessionid) == 1) { LOG_INFO << sessionid << " already exist. Cannot add user."; @@ -249,7 +249,7 @@ int remove_user(const std::string &sessionid) void start_listening() { auto address = net::ip::make_address(conf::cfg.listenip); - std::make_shared( + std::make_shared>( ioc, tcp::endpoint{address, conf::cfg.pubport}, global_usr_session_handler) diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 9207ca1f..a6f092d2 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -6,6 +6,7 @@ #include #include "../util.hpp" #include "../sock/socket_session.hpp" +#include "user_session_handler.hpp" /** * Maintains the global user list with pending input outputs and manages user connections. @@ -27,12 +28,12 @@ struct connected_user // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. - sock::socket_session *session; + sock::socket_session *session; /** * @param _pubkey The public key of the user in binary format. */ - connected_user(sock::socket_session *_session, std::string_view _pubkey) + connected_user(sock::socket_session *_session, std::string_view _pubkey) { session = _session; pubkey = _pubkey; @@ -64,7 +65,7 @@ void create_user_challenge(std::string &msg, std::string &challengehex); int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge); -int add_user(sock::socket_session *session, const std::string &pubkey); +int add_user(sock::socket_session *session, const std::string &pubkey); int remove_user(const std::string &sessionid);