From b4b9132d18e4fc082347591c86d09aec0db5b25f Mon Sep 17 00:00:00 2001 From: Ravidu Lashan Date: Fri, 25 Oct 2019 21:57:28 +0530 Subject: [PATCH] Seperated out templated headers and implementation in sock namespace (#43) --- CMakeLists.txt | 3 + examples/hpclient/client.js | 10 +- src/sock/socket_client.cpp | 86 +++++++++++ src/sock/socket_client.hpp | 71 --------- src/sock/socket_server.cpp | 129 ++++++++++++++++ src/sock/socket_server.hpp | 108 ------------- src/sock/socket_session.cpp | 293 ++++++++++++++++++++++++++++++++++++ src/sock/socket_session.hpp | 265 -------------------------------- 8 files changed, 517 insertions(+), 448 deletions(-) create mode 100644 src/sock/socket_client.cpp create mode 100644 src/sock/socket_server.cpp create mode 100644 src/sock/socket_session.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index bc1ea96f..291dc5d0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,9 @@ find_package(Boost REQUIRED COMPONENTS system log) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build) add_executable(hpcore + src/sock/socket_client.cpp + src/sock/socket_server.cpp + src/sock/socket_session.cpp src/p2p/peer_session_handler.cpp src/p2p/p2p.cpp src/util.cpp diff --git a/examples/hpclient/client.js b/examples/hpclient/client.js index e90f50fe..5c4967cc 100644 --- a/examples/hpclient/client.js +++ b/examples/hpclient/client.js @@ -29,13 +29,15 @@ function main() { } - var server = 'ws://localhost:8080' + var server = 'wss://localhost:8080' - if (process.argv.length == 3) server = 'ws://localhost:' + process.argv[2] + if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2] - if (process.argv.length == 4) server = 'ws://' + process.argv[2] + ':' + process.argv[3] + if (process.argv.length == 4) server = 'wss://' + process.argv[2] + ':' + process.argv[3] - var ws = new ws_api(server) + var ws = new ws_api(server, { + rejectUnauthorized: false + }) /* anatomy of a public challenge { diff --git a/src/sock/socket_client.cpp b/src/sock/socket_client.cpp new file mode 100644 index 00000000..e4fdc301 --- /dev/null +++ b/src/sock/socket_client.cpp @@ -0,0 +1,86 @@ +#include "socket_client.hpp" +#include "../p2p/peer_session_handler.hpp" +#include "../usr/user_session_handler.hpp" + +namespace sock +{ + +template +socket_client::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler &session_handler, const session_options &session_options) + : resolver(net::make_strand(ioc)), ws(net::make_strand(ioc), ctx), sess_handler(session_handler), sess_opts(session_options) +{ +} + +/** + * Entry point to socket client which will intiate a connection to server +*/ +// boost async_resolve function requires a port as a string because of that port is passed as a string +template +void socket_client::run(std::string_view host, std::string_view port) +{ + this->host = host; + this->port = port; + + // Look up the domain name + resolver.async_resolve( + host, + port, + [self = this->shared_from_this()](error ec, tcp::resolver::results_type results) { + self->on_resolve(ec, results); + }); +} + +/** + * Executes on completion of resolving the server +*/ +template +void socket_client::on_resolve(error ec, tcp::resolver::results_type results) +{ + if (ec) + socket_client_fail(ec, "socket_client_resolve"); + + // Make the connection on the IP address we get from a lookup + beast::get_lowest_layer(ws).async_connect( + results, + [self = this->shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) { + self->on_connect(ec, type); + }); +} + +/** + * Executes on completion of connecting to the server +*/ +template +void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type) +{ + if (ec) + { + socket_client_fail(ec, "socket_client_connect"); + } + else + { + //Creates a new socket session object + std::make_shared>( + std::move(ws), sess_handler) + ->run(std::move(host), std::move(port), false, sess_opts); + } +} + +/** + * Executes on error +*/ +template +void socket_client::socket_client_fail(beast::error_code ec, char const *what) +{ + LOG_ERR << what << ": " << ec.message(); +} + +/** + * Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time + */ +template socket_client::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler &session_handler, const session_options &session_options); +template void socket_client::run(std::string_view host, std::string_view port); + +template socket_client::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler &session_handler, const session_options &session_options); +template void socket_client::run(std::string_view host, std::string_view port); +} // namespace sock diff --git a/src/sock/socket_client.hpp b/src/sock/socket_client.hpp index 4073cb97..85c877a7 100644 --- a/src/sock/socket_client.hpp +++ b/src/sock/socket_client.hpp @@ -45,76 +45,5 @@ public: //Entry point to the client which requires an active host and port void run(std::string_view host, std::string_view port); }; - -template -socket_client::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler &session_handler, const session_options &session_options) - : resolver(net::make_strand(ioc)), ws(net::make_strand(ioc), ctx), sess_handler(session_handler), sess_opts(session_options) -{ -} - -/** - * Entry point to socket client which will intiate a connection to server -*/ -// boost async_resolve function requires a port as a string because of that port is passed as a string -template -void socket_client::run(std::string_view host, std::string_view port) -{ - this->host = host; - this->port = port; - - // Look up the domain name - resolver.async_resolve( - host, - port, - [self = this->shared_from_this()](error ec, tcp::resolver::results_type results) { - self->on_resolve(ec, results); - }); -} - -/** - * Executes on completion of resolving the server -*/ -template -void socket_client::on_resolve(error ec, tcp::resolver::results_type results) -{ - if (ec) - socket_client_fail(ec, "socket_client_resolve"); - - // Make the connection on the IP address we get from a lookup - beast::get_lowest_layer(ws).async_connect( - results, - [self = this->shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) { - self->on_connect(ec, type); - }); -} - -/** - * Executes on completion of connecting to the server -*/ -template -void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type) -{ - if (ec) - { - socket_client_fail(ec, "socket_client_connect"); - } - else - { - //Creates a new socket session object - std::make_shared>( - std::move(ws), sess_handler) - ->run(std::move(host), std::move(port), false, sess_opts); - } -} - -/** - * Executes on error -*/ -template -void socket_client::socket_client_fail(beast::error_code ec, char const *what) -{ - LOG_ERR << what << ": " << ec.message(); -} - } // namespace sock #endif \ No newline at end of file diff --git a/src/sock/socket_server.cpp b/src/sock/socket_server.cpp new file mode 100644 index 00000000..973d1d62 --- /dev/null +++ b/src/sock/socket_server.cpp @@ -0,0 +1,129 @@ +#include "socket_server.hpp" +#include "../p2p/peer_session_handler.hpp" +#include "../usr/user_session_handler.hpp" + +namespace sock +{ + +template +socket_server::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler &session_handler, const session_options &session_options) + : acceptor(net::make_strand(ioc)), ioc(ioc), ctx(ctx), sess_handler(session_handler), sess_opts(session_options) +{ + error_code ec; + + // Open the acceptor + acceptor.open(endpoint.protocol(), ec); + if (ec) + { + fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor.set_option(net::socket_base::reuse_address(true)); + if (ec) + { + fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor.bind(endpoint, ec); + if (ec) + { + fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor.listen( + net::socket_base::max_listen_connections, ec); + if (ec) + { + fail(ec, "listen"); + return; + } +} + +/** + * Entry point to socket server which accepts new connections +*/ +template +void socket_server::run() +{ + // Adding ssl context options disallowing requests which supports sslv2 and sslv3 which have security vulnerabilitis + ctx.set_options( + boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2 | + boost::asio::ssl::context::no_sslv3); + + //Providing the certification file for ssl context + ctx.use_certificate_chain_file(conf::ctx.tlsCertFile); + + // Providing key file for the ssl context + ctx.use_private_key_file( + conf::ctx.tlsKeyFile, + boost::asio::ssl::context::pem); + + // Start accepting a connection + acceptor.async_accept( + net::make_strand(ioc), + beast::bind_front_handler( + &socket_server::on_accept, + this->shared_from_this())); +} + +/** + * Executes on acceptance of new connection +*/ +template +void socket_server::on_accept(error_code ec, tcp::socket socket) +{ + if (ec) + { + return fail(ec, "accept"); + } + else + { + std::string port = std::to_string(socket.remote_endpoint().port()); + std::string address = socket.remote_endpoint().address().to_string(); + + //Creating websocket stream required to pass to initiate a new session + websocket::stream> ws(std::move(socket), ctx); + + // Launch a new session for this connection + std::make_shared>( + std::move(ws), sess_handler) + ->run(std::move(port), std::move(address), true, sess_opts); + } + + // Accept another connection + acceptor.async_accept( + net::make_strand(ioc), + beast::bind_front_handler( + &socket_server::on_accept, + this->shared_from_this())); +} + +/** + * Executes on error +*/ +template +void socket_server::fail(error_code ec, char const *what) +{ + // Don't report on canceled operations + if (ec == net::error::operation_aborted) + return; + LOG_ERR << what << ": " << ec.message(); +} + +/** + * Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time + */ +template socket_server::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler &session_handler, const session_options &session_options); +template void socket_server::run(); + +template socket_server::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler &session_handler, const session_options &session_options); +template void socket_server::run(); + +} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_server.hpp b/src/sock/socket_server.hpp index b74e1dcb..bee1b1d2 100644 --- a/src/sock/socket_server.hpp +++ b/src/sock/socket_server.hpp @@ -38,114 +38,6 @@ public: void run(); }; -template -socket_server::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler &session_handler, const session_options &session_options) - : acceptor(net::make_strand(ioc)), ioc(ioc), ctx(ctx), sess_handler(session_handler), sess_opts(session_options) -{ - error_code ec; - - // Open the acceptor - acceptor.open(endpoint.protocol(), ec); - if (ec) - { - fail(ec, "open"); - return; - } - - // Allow address reuse - acceptor.set_option(net::socket_base::reuse_address(true)); - if (ec) - { - fail(ec, "set_option"); - return; - } - - // Bind to the server address - acceptor.bind(endpoint, ec); - if (ec) - { - fail(ec, "bind"); - return; - } - - // Start listening for connections - acceptor.listen( - net::socket_base::max_listen_connections, ec); - if (ec) - { - fail(ec, "listen"); - return; - } -} - -/** - * Entry point to socket server which accepts new connections -*/ -template -void socket_server::run() -{ - // Adding ssl context options disallowing requests which supports sslv2 and sslv3 which have security vulnerabilitis - ctx.set_options( - boost::asio::ssl::context::default_workarounds | - boost::asio::ssl::context::no_sslv2 | - boost::asio::ssl::context::no_sslv3); - - //Providing the certification file for ssl context - ctx.use_certificate_chain_file(conf::ctx.tlsCertFile); - - // Providing key file for the ssl context - ctx.use_private_key_file( - conf::ctx.tlsKeyFile, - boost::asio::ssl::context::pem); - - // Start accepting a connection - acceptor.async_accept( - net::make_strand(ioc), - beast::bind_front_handler( - &socket_server::on_accept, - this->shared_from_this())); -} - -/** - * Executes on acceptance of new connection -*/ -template -void socket_server::on_accept(error_code ec, tcp::socket socket) -{ - if (ec) - { - return fail(ec, "accept"); - } - else - { - //Creating websocket stream required to pass to initiate a new session - websocket::stream> ws(std::move(socket), ctx); - - // Launch a new session for this connection - std::make_shared>( - std::move(ws), sess_handler) - ->run(std::to_string(socket.remote_endpoint().port()), socket.remote_endpoint().address().to_string(), true, sess_opts); - } - - // Accept another connection - acceptor.async_accept( - net::make_strand(ioc), - beast::bind_front_handler( - &socket_server::on_accept, - this->shared_from_this())); -} - -/** - * Executes on error -*/ -template -void socket_server::fail(error_code ec, char const *what) -{ - // Don't report on canceled operations - if (ec == net::error::operation_aborted) - return; - LOG_ERR << what << ": " << ec.message(); -} } // namespace sock diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp new file mode 100644 index 00000000..b4676752 --- /dev/null +++ b/src/sock/socket_session.cpp @@ -0,0 +1,293 @@ +#include "socket_session.hpp" +#include "../p2p/peer_session_handler.hpp" +#include "../usr/user_session_handler.hpp" + +namespace sock{ + + template +socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler) + : ws(std::move(websocket)), sess_handler(sess_handler) +{ + // We use binary data instead of ASCII/UTF8 character data. + ws.binary(true); +} + +template +socket_session::~socket_session() +{ + sess_handler.on_close(this); +} + +/** + * Sets the largest permissible incoming message size. If exceeds over this limit will cause a + * protocol failure +*/ +template +void socket_session::set_message_max_size(std::uint64_t size) +{ + ws.read_message_max(size); +} + +//port and address will be used to identify from which remote party the message recieved in the handler +template +void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts) +{ + ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client; + + if (sess_opts.max_message_size > 0) + { + // Setting maximum file size + set_message_max_size(sess_opts.max_message_size); + } + + if (is_server_session) + { + /** + * Set this flag to identify whether this socket session created when node acts as a server + * INBOUND true - when node acts as server + * INBOUND false (OUTBOUND) - when node acts as client + */ + flags.set(util::SESSION_FLAG::INBOUND); + handshake_type = ssl::stream_base::server; + } + + this->port = port; + this->address = address; + + // Set the timeout. + beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); + + // Perform the SSL handshake + ws.next_layer().async_handshake( + handshake_type, + [sp = this->shared_from_this()](error_code ec) { + sp->on_ssl_handshake(ec); + }); +} + +/* +* Close an active websocket connection gracefully +*/ +template +void socket_session::on_ssl_handshake(error_code ec) +{ + if (ec) + return fail(ec, "handshake"); + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws).expires_never(); + + if (flags[util::SESSION_FLAG::INBOUND]) + { + // Set suggested timeout settings for the websocket + ws.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::server)); + + // Accept the websocket handshake + ws.async_accept( + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_accept(ec); + }); + } + else + { + + ws.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + // Perform the websocket handshake + ws.async_handshake(this->address, "/", + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_accept(ec); + }); + } +} + +/** + * Executes on acceptance of new connection +*/ +template +void socket_session::on_accept(error_code ec) +{ + // Handle the error, if any + if (ec) + return fail(ec, "accept"); + + sess_handler.on_connect(this); + + // Read a message + ws.async_read( + buffer, + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +/* +* Executes on completion of recieiving a new message +*/ +template +void socket_session::on_read(error_code ec, std::size_t) +{ + //if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler + // read may get called when operation_aborted as well. + // We don't need to process read operation in that case. + if (ec == net::error::operation_aborted) + return; + + // Handle the error, if any + if (ec) + { + // if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler + on_close(ec, 1); + return fail(ec, "read"); + } + + // Wrap the buffer data in a string_view and call session handler. + // We DO NOT transfer ownership of buffer data to the session handler. It should + // read and process the message and we will clear the buffer after its done with it. + const char *buffer_data = net::buffer_cast(buffer.data()); + std::string_view message(buffer_data, buffer.size()); + sess_handler.on_message(this, message); + + // Clear the buffer + buffer.consume(buffer.size()); + + // Read another message + ws.async_read( + buffer, + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); +} + +/* +* Send message through an active websocket connection +*/ +template +void socket_session::send(T msg) +{ + // Always add to queue + queue.push_back(std::move(msg)); + + // Are we already writing? + if (queue.size() > 1) + return; + + std::string_view sv = queue.front().buffer(); + + // We are not currently writing, so send this immediately + ws.async_write( + // Project the outbound_message buffer from the queue front into the asio buffer. + net::buffer(sv.data(), sv.length()), + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); +} + +/* +* Executes on completion of write operation to a socket +*/ +template +void socket_session::on_write(error_code ec, std::size_t) +{ + // Handle the error, if any + if (ec) + return fail(ec, "write"); + + // Remove the string from the queue + queue.erase(queue.begin()); + + // Send the next message if any + if (!queue.empty()) + { + std::string_view sv = queue.front().buffer(); + ws.async_write( + net::buffer(sv.data(), sv.length()), + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); + } +} + +/* +* Close an active websocket connection gracefully +*/ +template +void socket_session::close() +{ + // Close the WebSocket connection + ws.async_close(websocket::close_code::normal, + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_close(ec, 0); + }); +} + +/* +* Executes on completion of closing a socket connection +*/ +//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method +template +void socket_session::on_close(error_code ec, std::int8_t type) +{ + if (type == 1) + return; + + if (ec) + return fail(ec, "close"); +} + +// When called, initializes the unique id string for this session. +template +void socket_session::init_uniqueid() +{ + // Create a unique id for the session combining ip and port. + // We prepare this appended string here because we need to use it for finding elemends from the maps + // for validation purposes whenever a message is received. + uniqueid.append(address).append(":").append(port); +} + +/** + * Executes on error +*/ +template +void socket_session::fail(error_code ec, char const *what) +{ + LOG_ERR << what << ": " << ec.message(); + + // Don't report these + if (ec == net::error::operation_aborted || + ec == websocket::error::closed) + return; +} + +/** + * Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time + */ +template socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler); +template socket_session::~socket_session(); +template void socket_session::set_message_max_size(std::uint64_t size); +template void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts); +template void socket_session::send(p2p::peer_outbound_message msg); +template void socket_session::init_uniqueid(); +template void socket_session::close(); + +template socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler); +template socket_session::~socket_session(); +template void socket_session::set_message_max_size(std::uint64_t size); +template void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts); +template void socket_session::send(usr::user_outbound_message msg); +template void socket_session::init_uniqueid(); +template void socket_session::close(); + +} \ No newline at end of file diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index 4db72b8a..3189b129 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -108,272 +108,7 @@ public: void close(); }; -template -socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler) - : ws(std::move(websocket)), sess_handler(sess_handler) -{ - // We use binary data instead of ASCII/UTF8 character data. - ws.binary(true); -} -template -socket_session::~socket_session() -{ - sess_handler.on_close(this); -} - -/** - * Sets the largest permissible incoming message size. If exceeds over this limit will cause a - * protocol failure -*/ -template -void socket_session::set_message_max_size(std::uint64_t size) -{ - ws.read_message_max(size); -} - -//port and address will be used to identify from which remote party the message recieved in the handler -template -void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts) -{ - ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client; - - if (sess_opts.max_message_size > 0) - { - // Setting maximum file size - set_message_max_size(sess_opts.max_message_size); - } - - if (is_server_session) - { - /** - * Set this flag to identify whether this socket session created when node acts as a server - * INBOUND true - when node acts as server - * INBOUND false (OUTBOUND) - when node acts as client - */ - flags.set(util::SESSION_FLAG::INBOUND); - handshake_type = ssl::stream_base::server; - } - - this->port = port; - this->address = address; - - // Set the timeout. - beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); - - // Perform the SSL handshake - ws.next_layer().async_handshake( - handshake_type, - [sp = this->shared_from_this()](error_code ec) { - sp->on_ssl_handshake(ec); - }); -} - -/* -* Close an active websocket connection gracefully -*/ -template -void socket_session::on_ssl_handshake(error_code ec) -{ - if (ec) - return fail(ec, "handshake"); - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(ws).expires_never(); - - if (flags[util::SESSION_FLAG::INBOUND]) - { - // Set suggested timeout settings for the websocket - ws.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::server)); - - // Accept the websocket handshake - ws.async_accept( - [sp = this->shared_from_this()]( - error_code ec) { - sp->on_accept(ec); - }); - } - else - { - - ws.set_option( - websocket::stream_base::timeout::suggested( - beast::role_type::client)); - - // Perform the websocket handshake - ws.async_handshake(this->address, "/", - [sp = this->shared_from_this()]( - error_code ec) { - sp->on_accept(ec); - }); - } -} - -/** - * Executes on acceptance of new connection -*/ -template -void socket_session::on_accept(error_code ec) -{ - // Handle the error, if any - if (ec) - return fail(ec, "accept"); - - sess_handler.on_connect(this); - - // Read a message - ws.async_read( - buffer, - [sp = this->shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_read(ec, bytes); - }); -} - -/* -* Executes on completion of recieiving a new message -*/ -template -void socket_session::on_read(error_code ec, std::size_t) -{ - //if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler - // read may get called when operation_aborted as well. - // We don't need to process read operation in that case. - if (ec == net::error::operation_aborted) - return; - - // Handle the error, if any - if (ec) - { - // if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler - on_close(ec, 1); - return fail(ec, "read"); - } - - // Wrap the buffer data in a string_view and call session handler. - // We DO NOT transfer ownership of buffer data to the session handler. It should - // read and process the message and we will clear the buffer after its done with it. - const char *buffer_data = net::buffer_cast(buffer.data()); - std::string_view message(buffer_data, buffer.size()); - sess_handler.on_message(this, message); - - // Clear the buffer - buffer.consume(buffer.size()); - - // Read another message - ws.async_read( - buffer, - [sp = this->shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_read(ec, bytes); - }); -} - -/* -* Send message through an active websocket connection -*/ -template -void socket_session::send(T msg) -{ - // Always add to queue - queue.push_back(std::move(msg)); - - // Are we already writing? - if (queue.size() > 1) - return; - - std::string_view sv = queue.front().buffer(); - - // We are not currently writing, so send this immediately - ws.async_write( - // Project the outbound_message buffer from the queue front into the asio buffer. - net::buffer(sv.data(), sv.length()), - [sp = this->shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_write(ec, bytes); - }); -} - -/* -* Executes on completion of write operation to a socket -*/ -template -void socket_session::on_write(error_code ec, std::size_t) -{ - // Handle the error, if any - if (ec) - return fail(ec, "write"); - - // Remove the string from the queue - queue.erase(queue.begin()); - - // Send the next message if any - if (!queue.empty()) - { - std::string_view sv = queue.front().buffer(); - ws.async_write( - net::buffer(sv.data(), sv.length()), - [sp = this->shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_write(ec, bytes); - }); - } -} - -/* -* Close an active websocket connection gracefully -*/ -template -void socket_session::close() -{ - // Close the WebSocket connection - ws.async_close(websocket::close_code::normal, - [sp = this->shared_from_this()]( - error_code ec) { - sp->on_close(ec, 0); - }); -} - -/* -* Executes on completion of closing a socket connection -*/ -//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method -template -void socket_session::on_close(error_code ec, std::int8_t type) -{ - if (type == 1) - return; - - if (ec) - return fail(ec, "close"); -} - -// When called, initializes the unique id string for this session. -template -void socket_session::init_uniqueid() -{ - // Create a unique id for the session combining ip and port. - // We prepare this appended string here because we need to use it for finding elemends from the maps - // for validation purposes whenever a message is received. - uniqueid.append(address).append(":").append(port); -} - -/** - * Executes on error -*/ -template -void socket_session::fail(error_code ec, char const *what) -{ - LOG_ERR << what << ": " << ec.message(); - - // Don't report these - if (ec == net::error::operation_aborted || - ec == websocket::error::closed) - return; -} } // namespace sock #endif