mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Seperated out templated headers and implementation in sock namespace (#43)
This commit is contained in:
committed by
Ravin Perera
parent
46d114f44b
commit
b4b9132d18
@@ -7,6 +7,9 @@ find_package(Boost REQUIRED COMPONENTS system log)
|
||||
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build)
|
||||
|
||||
add_executable(hpcore
|
||||
src/sock/socket_client.cpp
|
||||
src/sock/socket_server.cpp
|
||||
src/sock/socket_session.cpp
|
||||
src/p2p/peer_session_handler.cpp
|
||||
src/p2p/p2p.cpp
|
||||
src/util.cpp
|
||||
|
||||
@@ -29,13 +29,15 @@ function main() {
|
||||
}
|
||||
|
||||
|
||||
var server = 'ws://localhost:8080'
|
||||
var server = 'wss://localhost:8080'
|
||||
|
||||
if (process.argv.length == 3) server = 'ws://localhost:' + process.argv[2]
|
||||
if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2]
|
||||
|
||||
if (process.argv.length == 4) server = 'ws://' + process.argv[2] + ':' + process.argv[3]
|
||||
if (process.argv.length == 4) server = 'wss://' + process.argv[2] + ':' + process.argv[3]
|
||||
|
||||
var ws = new ws_api(server)
|
||||
var ws = new ws_api(server, {
|
||||
rejectUnauthorized: false
|
||||
})
|
||||
|
||||
/* anatomy of a public challenge
|
||||
{
|
||||
|
||||
86
src/sock/socket_client.cpp
Normal file
86
src/sock/socket_client.cpp
Normal file
@@ -0,0 +1,86 @@
|
||||
#include "socket_client.hpp"
|
||||
#include "../p2p/peer_session_handler.hpp"
|
||||
#include "../usr/user_session_handler.hpp"
|
||||
|
||||
namespace sock
|
||||
{
|
||||
|
||||
template <class T>
|
||||
socket_client<T>::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<T> &session_handler, const session_options &session_options)
|
||||
: resolver(net::make_strand(ioc)), ws(net::make_strand(ioc), ctx), sess_handler(session_handler), sess_opts(session_options)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point to socket client which will intiate a connection to server
|
||||
*/
|
||||
// boost async_resolve function requires a port as a string because of that port is passed as a string
|
||||
template <class T>
|
||||
void socket_client<T>::run(std::string_view host, std::string_view port)
|
||||
{
|
||||
this->host = host;
|
||||
this->port = port;
|
||||
|
||||
// Look up the domain name
|
||||
resolver.async_resolve(
|
||||
host,
|
||||
port,
|
||||
[self = this->shared_from_this()](error ec, tcp::resolver::results_type results) {
|
||||
self->on_resolve(ec, results);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of resolving the server
|
||||
*/
|
||||
template <class T>
|
||||
void socket_client<T>::on_resolve(error ec, tcp::resolver::results_type results)
|
||||
{
|
||||
if (ec)
|
||||
socket_client_fail(ec, "socket_client_resolve");
|
||||
|
||||
// Make the connection on the IP address we get from a lookup
|
||||
beast::get_lowest_layer(ws).async_connect(
|
||||
results,
|
||||
[self = this->shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) {
|
||||
self->on_connect(ec, type);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of connecting to the server
|
||||
*/
|
||||
template <class T>
|
||||
void socket_client<T>::on_connect(error ec, tcp::resolver::results_type::endpoint_type)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
socket_client_fail(ec, "socket_client_connect");
|
||||
}
|
||||
else
|
||||
{
|
||||
//Creates a new socket session object
|
||||
std::make_shared<socket_session<T>>(
|
||||
std::move(ws), sess_handler)
|
||||
->run(std::move(host), std::move(port), false, sess_opts);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
template <class T>
|
||||
void socket_client<T>::socket_client_fail(beast::error_code ec, char const *what)
|
||||
{
|
||||
LOG_ERR << what << ": " << ec.message();
|
||||
}
|
||||
|
||||
/**
|
||||
* Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time
|
||||
*/
|
||||
template socket_client<p2p::peer_outbound_message>::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<p2p::peer_outbound_message> &session_handler, const session_options &session_options);
|
||||
template void socket_client<p2p::peer_outbound_message>::run(std::string_view host, std::string_view port);
|
||||
|
||||
template socket_client<usr::user_outbound_message>::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<usr::user_outbound_message> &session_handler, const session_options &session_options);
|
||||
template void socket_client<usr::user_outbound_message>::run(std::string_view host, std::string_view port);
|
||||
} // namespace sock
|
||||
@@ -45,76 +45,5 @@ public:
|
||||
//Entry point to the client which requires an active host and port
|
||||
void run(std::string_view host, std::string_view port);
|
||||
};
|
||||
|
||||
template <class T>
|
||||
socket_client<T>::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<T> &session_handler, const session_options &session_options)
|
||||
: resolver(net::make_strand(ioc)), ws(net::make_strand(ioc), ctx), sess_handler(session_handler), sess_opts(session_options)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point to socket client which will intiate a connection to server
|
||||
*/
|
||||
// boost async_resolve function requires a port as a string because of that port is passed as a string
|
||||
template <class T>
|
||||
void socket_client<T>::run(std::string_view host, std::string_view port)
|
||||
{
|
||||
this->host = host;
|
||||
this->port = port;
|
||||
|
||||
// Look up the domain name
|
||||
resolver.async_resolve(
|
||||
host,
|
||||
port,
|
||||
[self = this->shared_from_this()](error ec, tcp::resolver::results_type results) {
|
||||
self->on_resolve(ec, results);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of resolving the server
|
||||
*/
|
||||
template <class T>
|
||||
void socket_client<T>::on_resolve(error ec, tcp::resolver::results_type results)
|
||||
{
|
||||
if (ec)
|
||||
socket_client_fail(ec, "socket_client_resolve");
|
||||
|
||||
// Make the connection on the IP address we get from a lookup
|
||||
beast::get_lowest_layer(ws).async_connect(
|
||||
results,
|
||||
[self = this->shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) {
|
||||
self->on_connect(ec, type);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of connecting to the server
|
||||
*/
|
||||
template <class T>
|
||||
void socket_client<T>::on_connect(error ec, tcp::resolver::results_type::endpoint_type)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
socket_client_fail(ec, "socket_client_connect");
|
||||
}
|
||||
else
|
||||
{
|
||||
//Creates a new socket session object
|
||||
std::make_shared<socket_session<T>>(
|
||||
std::move(ws), sess_handler)
|
||||
->run(std::move(host), std::move(port), false, sess_opts);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
template <class T>
|
||||
void socket_client<T>::socket_client_fail(beast::error_code ec, char const *what)
|
||||
{
|
||||
LOG_ERR << what << ": " << ec.message();
|
||||
}
|
||||
|
||||
} // namespace sock
|
||||
#endif
|
||||
129
src/sock/socket_server.cpp
Normal file
129
src/sock/socket_server.cpp
Normal file
@@ -0,0 +1,129 @@
|
||||
#include "socket_server.hpp"
|
||||
#include "../p2p/peer_session_handler.hpp"
|
||||
#include "../usr/user_session_handler.hpp"
|
||||
|
||||
namespace sock
|
||||
{
|
||||
|
||||
template <class T>
|
||||
socket_server<T>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<T> &session_handler, const session_options &session_options)
|
||||
: acceptor(net::make_strand(ioc)), ioc(ioc), ctx(ctx), sess_handler(session_handler), sess_opts(session_options)
|
||||
{
|
||||
error_code ec;
|
||||
|
||||
// Open the acceptor
|
||||
acceptor.open(endpoint.protocol(), ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "open");
|
||||
return;
|
||||
}
|
||||
|
||||
// Allow address reuse
|
||||
acceptor.set_option(net::socket_base::reuse_address(true));
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "set_option");
|
||||
return;
|
||||
}
|
||||
|
||||
// Bind to the server address
|
||||
acceptor.bind(endpoint, ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "bind");
|
||||
return;
|
||||
}
|
||||
|
||||
// Start listening for connections
|
||||
acceptor.listen(
|
||||
net::socket_base::max_listen_connections, ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "listen");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point to socket server which accepts new connections
|
||||
*/
|
||||
template <class T>
|
||||
void socket_server<T>::run()
|
||||
{
|
||||
// Adding ssl context options disallowing requests which supports sslv2 and sslv3 which have security vulnerabilitis
|
||||
ctx.set_options(
|
||||
boost::asio::ssl::context::default_workarounds |
|
||||
boost::asio::ssl::context::no_sslv2 |
|
||||
boost::asio::ssl::context::no_sslv3);
|
||||
|
||||
//Providing the certification file for ssl context
|
||||
ctx.use_certificate_chain_file(conf::ctx.tlsCertFile);
|
||||
|
||||
// Providing key file for the ssl context
|
||||
ctx.use_private_key_file(
|
||||
conf::ctx.tlsKeyFile,
|
||||
boost::asio::ssl::context::pem);
|
||||
|
||||
// Start accepting a connection
|
||||
acceptor.async_accept(
|
||||
net::make_strand(ioc),
|
||||
beast::bind_front_handler(
|
||||
&socket_server<T>::on_accept,
|
||||
this->shared_from_this()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on acceptance of new connection
|
||||
*/
|
||||
template <class T>
|
||||
void socket_server<T>::on_accept(error_code ec, tcp::socket socket)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
return fail(ec, "accept");
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string port = std::to_string(socket.remote_endpoint().port());
|
||||
std::string address = socket.remote_endpoint().address().to_string();
|
||||
|
||||
//Creating websocket stream required to pass to initiate a new session
|
||||
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws(std::move(socket), ctx);
|
||||
|
||||
// Launch a new session for this connection
|
||||
std::make_shared<socket_session<T>>(
|
||||
std::move(ws), sess_handler)
|
||||
->run(std::move(port), std::move(address), true, sess_opts);
|
||||
}
|
||||
|
||||
// Accept another connection
|
||||
acceptor.async_accept(
|
||||
net::make_strand(ioc),
|
||||
beast::bind_front_handler(
|
||||
&socket_server<T>::on_accept,
|
||||
this->shared_from_this()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
template <class T>
|
||||
void socket_server<T>::fail(error_code ec, char const *what)
|
||||
{
|
||||
// Don't report on canceled operations
|
||||
if (ec == net::error::operation_aborted)
|
||||
return;
|
||||
LOG_ERR << what << ": " << ec.message();
|
||||
}
|
||||
|
||||
/**
|
||||
* Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time
|
||||
*/
|
||||
template socket_server<p2p::peer_outbound_message>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<p2p::peer_outbound_message> &session_handler, const session_options &session_options);
|
||||
template void socket_server<p2p::peer_outbound_message>::run();
|
||||
|
||||
template socket_server<usr::user_outbound_message>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<usr::user_outbound_message> &session_handler, const session_options &session_options);
|
||||
template void socket_server<usr::user_outbound_message>::run();
|
||||
|
||||
} // namespace sock
|
||||
@@ -38,114 +38,6 @@ public:
|
||||
void run();
|
||||
};
|
||||
|
||||
template <class T>
|
||||
socket_server<T>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<T> &session_handler, const session_options &session_options)
|
||||
: acceptor(net::make_strand(ioc)), ioc(ioc), ctx(ctx), sess_handler(session_handler), sess_opts(session_options)
|
||||
{
|
||||
error_code ec;
|
||||
|
||||
// Open the acceptor
|
||||
acceptor.open(endpoint.protocol(), ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "open");
|
||||
return;
|
||||
}
|
||||
|
||||
// Allow address reuse
|
||||
acceptor.set_option(net::socket_base::reuse_address(true));
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "set_option");
|
||||
return;
|
||||
}
|
||||
|
||||
// Bind to the server address
|
||||
acceptor.bind(endpoint, ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "bind");
|
||||
return;
|
||||
}
|
||||
|
||||
// Start listening for connections
|
||||
acceptor.listen(
|
||||
net::socket_base::max_listen_connections, ec);
|
||||
if (ec)
|
||||
{
|
||||
fail(ec, "listen");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point to socket server which accepts new connections
|
||||
*/
|
||||
template <class T>
|
||||
void socket_server<T>::run()
|
||||
{
|
||||
// Adding ssl context options disallowing requests which supports sslv2 and sslv3 which have security vulnerabilitis
|
||||
ctx.set_options(
|
||||
boost::asio::ssl::context::default_workarounds |
|
||||
boost::asio::ssl::context::no_sslv2 |
|
||||
boost::asio::ssl::context::no_sslv3);
|
||||
|
||||
//Providing the certification file for ssl context
|
||||
ctx.use_certificate_chain_file(conf::ctx.tlsCertFile);
|
||||
|
||||
// Providing key file for the ssl context
|
||||
ctx.use_private_key_file(
|
||||
conf::ctx.tlsKeyFile,
|
||||
boost::asio::ssl::context::pem);
|
||||
|
||||
// Start accepting a connection
|
||||
acceptor.async_accept(
|
||||
net::make_strand(ioc),
|
||||
beast::bind_front_handler(
|
||||
&socket_server<T>::on_accept,
|
||||
this->shared_from_this()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on acceptance of new connection
|
||||
*/
|
||||
template <class T>
|
||||
void socket_server<T>::on_accept(error_code ec, tcp::socket socket)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
return fail(ec, "accept");
|
||||
}
|
||||
else
|
||||
{
|
||||
//Creating websocket stream required to pass to initiate a new session
|
||||
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws(std::move(socket), ctx);
|
||||
|
||||
// Launch a new session for this connection
|
||||
std::make_shared<socket_session<T>>(
|
||||
std::move(ws), sess_handler)
|
||||
->run(std::to_string(socket.remote_endpoint().port()), socket.remote_endpoint().address().to_string(), true, sess_opts);
|
||||
}
|
||||
|
||||
// Accept another connection
|
||||
acceptor.async_accept(
|
||||
net::make_strand(ioc),
|
||||
beast::bind_front_handler(
|
||||
&socket_server<T>::on_accept,
|
||||
this->shared_from_this()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
template <class T>
|
||||
void socket_server<T>::fail(error_code ec, char const *what)
|
||||
{
|
||||
// Don't report on canceled operations
|
||||
if (ec == net::error::operation_aborted)
|
||||
return;
|
||||
LOG_ERR << what << ": " << ec.message();
|
||||
}
|
||||
|
||||
} // namespace sock
|
||||
|
||||
|
||||
293
src/sock/socket_session.cpp
Normal file
293
src/sock/socket_session.cpp
Normal file
@@ -0,0 +1,293 @@
|
||||
#include "socket_session.hpp"
|
||||
#include "../p2p/peer_session_handler.hpp"
|
||||
#include "../usr/user_session_handler.hpp"
|
||||
|
||||
namespace sock{
|
||||
|
||||
template <class T>
|
||||
socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<T> &sess_handler)
|
||||
: ws(std::move(websocket)), sess_handler(sess_handler)
|
||||
{
|
||||
// We use binary data instead of ASCII/UTF8 character data.
|
||||
ws.binary(true);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
socket_session<T>::~socket_session()
|
||||
{
|
||||
sess_handler.on_close(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the largest permissible incoming message size. If exceeds over this limit will cause a
|
||||
* protocol failure
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::set_message_max_size(std::uint64_t size)
|
||||
{
|
||||
ws.read_message_max(size);
|
||||
}
|
||||
|
||||
//port and address will be used to identify from which remote party the message recieved in the handler
|
||||
template <class T>
|
||||
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts)
|
||||
{
|
||||
ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client;
|
||||
|
||||
if (sess_opts.max_message_size > 0)
|
||||
{
|
||||
// Setting maximum file size
|
||||
set_message_max_size(sess_opts.max_message_size);
|
||||
}
|
||||
|
||||
if (is_server_session)
|
||||
{
|
||||
/**
|
||||
* Set this flag to identify whether this socket session created when node acts as a server
|
||||
* INBOUND true - when node acts as server
|
||||
* INBOUND false (OUTBOUND) - when node acts as client
|
||||
*/
|
||||
flags.set(util::SESSION_FLAG::INBOUND);
|
||||
handshake_type = ssl::stream_base::server;
|
||||
}
|
||||
|
||||
this->port = port;
|
||||
this->address = address;
|
||||
|
||||
// Set the timeout.
|
||||
beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
|
||||
|
||||
// Perform the SSL handshake
|
||||
ws.next_layer().async_handshake(
|
||||
handshake_type,
|
||||
[sp = this->shared_from_this()](error_code ec) {
|
||||
sp->on_ssl_handshake(ec);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Close an active websocket connection gracefully
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_ssl_handshake(error_code ec)
|
||||
{
|
||||
if (ec)
|
||||
return fail(ec, "handshake");
|
||||
|
||||
// Turn off the timeout on the tcp_stream, because
|
||||
// the websocket stream has its own timeout system.
|
||||
beast::get_lowest_layer(ws).expires_never();
|
||||
|
||||
if (flags[util::SESSION_FLAG::INBOUND])
|
||||
{
|
||||
// Set suggested timeout settings for the websocket
|
||||
ws.set_option(
|
||||
websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::server));
|
||||
|
||||
// Accept the websocket handshake
|
||||
ws.async_accept(
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec) {
|
||||
sp->on_accept(ec);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
ws.set_option(
|
||||
websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::client));
|
||||
|
||||
// Perform the websocket handshake
|
||||
ws.async_handshake(this->address, "/",
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec) {
|
||||
sp->on_accept(ec);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on acceptance of new connection
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_accept(error_code ec)
|
||||
{
|
||||
// Handle the error, if any
|
||||
if (ec)
|
||||
return fail(ec, "accept");
|
||||
|
||||
sess_handler.on_connect(this);
|
||||
|
||||
// Read a message
|
||||
ws.async_read(
|
||||
buffer,
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_read(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of recieiving a new message
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_read(error_code ec, std::size_t)
|
||||
{
|
||||
//if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
|
||||
// read may get called when operation_aborted as well.
|
||||
// We don't need to process read operation in that case.
|
||||
if (ec == net::error::operation_aborted)
|
||||
return;
|
||||
|
||||
// Handle the error, if any
|
||||
if (ec)
|
||||
{
|
||||
// if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
|
||||
on_close(ec, 1);
|
||||
return fail(ec, "read");
|
||||
}
|
||||
|
||||
// Wrap the buffer data in a string_view and call session handler.
|
||||
// We DO NOT transfer ownership of buffer data to the session handler. It should
|
||||
// read and process the message and we will clear the buffer after its done with it.
|
||||
const char *buffer_data = net::buffer_cast<const char *>(buffer.data());
|
||||
std::string_view message(buffer_data, buffer.size());
|
||||
sess_handler.on_message(this, message);
|
||||
|
||||
// Clear the buffer
|
||||
buffer.consume(buffer.size());
|
||||
|
||||
// Read another message
|
||||
ws.async_read(
|
||||
buffer,
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_read(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Send message through an active websocket connection
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::send(T msg)
|
||||
{
|
||||
// Always add to queue
|
||||
queue.push_back(std::move(msg));
|
||||
|
||||
// Are we already writing?
|
||||
if (queue.size() > 1)
|
||||
return;
|
||||
|
||||
std::string_view sv = queue.front().buffer();
|
||||
|
||||
// We are not currently writing, so send this immediately
|
||||
ws.async_write(
|
||||
// Project the outbound_message buffer from the queue front into the asio buffer.
|
||||
net::buffer(sv.data(), sv.length()),
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_write(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of write operation to a socket
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_write(error_code ec, std::size_t)
|
||||
{
|
||||
// Handle the error, if any
|
||||
if (ec)
|
||||
return fail(ec, "write");
|
||||
|
||||
// Remove the string from the queue
|
||||
queue.erase(queue.begin());
|
||||
|
||||
// Send the next message if any
|
||||
if (!queue.empty())
|
||||
{
|
||||
std::string_view sv = queue.front().buffer();
|
||||
ws.async_write(
|
||||
net::buffer(sv.data(), sv.length()),
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_write(ec, bytes);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Close an active websocket connection gracefully
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::close()
|
||||
{
|
||||
// Close the WebSocket connection
|
||||
ws.async_close(websocket::close_code::normal,
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec) {
|
||||
sp->on_close(ec, 0);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of closing a socket connection
|
||||
*/
|
||||
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
|
||||
template <class T>
|
||||
void socket_session<T>::on_close(error_code ec, std::int8_t type)
|
||||
{
|
||||
if (type == 1)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
return fail(ec, "close");
|
||||
}
|
||||
|
||||
// When called, initializes the unique id string for this session.
|
||||
template <class T>
|
||||
void socket_session<T>::init_uniqueid()
|
||||
{
|
||||
// Create a unique id for the session combining ip and port.
|
||||
// We prepare this appended string here because we need to use it for finding elemends from the maps
|
||||
// for validation purposes whenever a message is received.
|
||||
uniqueid.append(address).append(":").append(port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::fail(error_code ec, char const *what)
|
||||
{
|
||||
LOG_ERR << what << ": " << ec.message();
|
||||
|
||||
// Don't report these
|
||||
if (ec == net::error::operation_aborted ||
|
||||
ec == websocket::error::closed)
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time
|
||||
*/
|
||||
template socket_session<p2p::peer_outbound_message>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<p2p::peer_outbound_message> &sess_handler);
|
||||
template socket_session<p2p::peer_outbound_message>::~socket_session();
|
||||
template void socket_session<p2p::peer_outbound_message>::set_message_max_size(std::uint64_t size);
|
||||
template void socket_session<p2p::peer_outbound_message>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts);
|
||||
template void socket_session<p2p::peer_outbound_message>::send(p2p::peer_outbound_message msg);
|
||||
template void socket_session<p2p::peer_outbound_message>::init_uniqueid();
|
||||
template void socket_session<p2p::peer_outbound_message>::close();
|
||||
|
||||
template socket_session<usr::user_outbound_message>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<usr::user_outbound_message> &sess_handler);
|
||||
template socket_session<usr::user_outbound_message>::~socket_session();
|
||||
template void socket_session<usr::user_outbound_message>::set_message_max_size(std::uint64_t size);
|
||||
template void socket_session<usr::user_outbound_message>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts);
|
||||
template void socket_session<usr::user_outbound_message>::send(usr::user_outbound_message msg);
|
||||
template void socket_session<usr::user_outbound_message>::init_uniqueid();
|
||||
template void socket_session<usr::user_outbound_message>::close();
|
||||
|
||||
}
|
||||
@@ -108,272 +108,7 @@ public:
|
||||
void close();
|
||||
};
|
||||
|
||||
template <class T>
|
||||
socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<T> &sess_handler)
|
||||
: ws(std::move(websocket)), sess_handler(sess_handler)
|
||||
{
|
||||
// We use binary data instead of ASCII/UTF8 character data.
|
||||
ws.binary(true);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
socket_session<T>::~socket_session()
|
||||
{
|
||||
sess_handler.on_close(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the largest permissible incoming message size. If exceeds over this limit will cause a
|
||||
* protocol failure
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::set_message_max_size(std::uint64_t size)
|
||||
{
|
||||
ws.read_message_max(size);
|
||||
}
|
||||
|
||||
//port and address will be used to identify from which remote party the message recieved in the handler
|
||||
template <class T>
|
||||
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts)
|
||||
{
|
||||
ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client;
|
||||
|
||||
if (sess_opts.max_message_size > 0)
|
||||
{
|
||||
// Setting maximum file size
|
||||
set_message_max_size(sess_opts.max_message_size);
|
||||
}
|
||||
|
||||
if (is_server_session)
|
||||
{
|
||||
/**
|
||||
* Set this flag to identify whether this socket session created when node acts as a server
|
||||
* INBOUND true - when node acts as server
|
||||
* INBOUND false (OUTBOUND) - when node acts as client
|
||||
*/
|
||||
flags.set(util::SESSION_FLAG::INBOUND);
|
||||
handshake_type = ssl::stream_base::server;
|
||||
}
|
||||
|
||||
this->port = port;
|
||||
this->address = address;
|
||||
|
||||
// Set the timeout.
|
||||
beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
|
||||
|
||||
// Perform the SSL handshake
|
||||
ws.next_layer().async_handshake(
|
||||
handshake_type,
|
||||
[sp = this->shared_from_this()](error_code ec) {
|
||||
sp->on_ssl_handshake(ec);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Close an active websocket connection gracefully
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_ssl_handshake(error_code ec)
|
||||
{
|
||||
if (ec)
|
||||
return fail(ec, "handshake");
|
||||
|
||||
// Turn off the timeout on the tcp_stream, because
|
||||
// the websocket stream has its own timeout system.
|
||||
beast::get_lowest_layer(ws).expires_never();
|
||||
|
||||
if (flags[util::SESSION_FLAG::INBOUND])
|
||||
{
|
||||
// Set suggested timeout settings for the websocket
|
||||
ws.set_option(
|
||||
websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::server));
|
||||
|
||||
// Accept the websocket handshake
|
||||
ws.async_accept(
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec) {
|
||||
sp->on_accept(ec);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
ws.set_option(
|
||||
websocket::stream_base::timeout::suggested(
|
||||
beast::role_type::client));
|
||||
|
||||
// Perform the websocket handshake
|
||||
ws.async_handshake(this->address, "/",
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec) {
|
||||
sp->on_accept(ec);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on acceptance of new connection
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_accept(error_code ec)
|
||||
{
|
||||
// Handle the error, if any
|
||||
if (ec)
|
||||
return fail(ec, "accept");
|
||||
|
||||
sess_handler.on_connect(this);
|
||||
|
||||
// Read a message
|
||||
ws.async_read(
|
||||
buffer,
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_read(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of recieiving a new message
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_read(error_code ec, std::size_t)
|
||||
{
|
||||
//if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
|
||||
// read may get called when operation_aborted as well.
|
||||
// We don't need to process read operation in that case.
|
||||
if (ec == net::error::operation_aborted)
|
||||
return;
|
||||
|
||||
// Handle the error, if any
|
||||
if (ec)
|
||||
{
|
||||
// if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
|
||||
on_close(ec, 1);
|
||||
return fail(ec, "read");
|
||||
}
|
||||
|
||||
// Wrap the buffer data in a string_view and call session handler.
|
||||
// We DO NOT transfer ownership of buffer data to the session handler. It should
|
||||
// read and process the message and we will clear the buffer after its done with it.
|
||||
const char *buffer_data = net::buffer_cast<const char *>(buffer.data());
|
||||
std::string_view message(buffer_data, buffer.size());
|
||||
sess_handler.on_message(this, message);
|
||||
|
||||
// Clear the buffer
|
||||
buffer.consume(buffer.size());
|
||||
|
||||
// Read another message
|
||||
ws.async_read(
|
||||
buffer,
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_read(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Send message through an active websocket connection
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::send(T msg)
|
||||
{
|
||||
// Always add to queue
|
||||
queue.push_back(std::move(msg));
|
||||
|
||||
// Are we already writing?
|
||||
if (queue.size() > 1)
|
||||
return;
|
||||
|
||||
std::string_view sv = queue.front().buffer();
|
||||
|
||||
// We are not currently writing, so send this immediately
|
||||
ws.async_write(
|
||||
// Project the outbound_message buffer from the queue front into the asio buffer.
|
||||
net::buffer(sv.data(), sv.length()),
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_write(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of write operation to a socket
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::on_write(error_code ec, std::size_t)
|
||||
{
|
||||
// Handle the error, if any
|
||||
if (ec)
|
||||
return fail(ec, "write");
|
||||
|
||||
// Remove the string from the queue
|
||||
queue.erase(queue.begin());
|
||||
|
||||
// Send the next message if any
|
||||
if (!queue.empty())
|
||||
{
|
||||
std::string_view sv = queue.front().buffer();
|
||||
ws.async_write(
|
||||
net::buffer(sv.data(), sv.length()),
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_write(ec, bytes);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Close an active websocket connection gracefully
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::close()
|
||||
{
|
||||
// Close the WebSocket connection
|
||||
ws.async_close(websocket::close_code::normal,
|
||||
[sp = this->shared_from_this()](
|
||||
error_code ec) {
|
||||
sp->on_close(ec, 0);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of closing a socket connection
|
||||
*/
|
||||
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
|
||||
template <class T>
|
||||
void socket_session<T>::on_close(error_code ec, std::int8_t type)
|
||||
{
|
||||
if (type == 1)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
return fail(ec, "close");
|
||||
}
|
||||
|
||||
// When called, initializes the unique id string for this session.
|
||||
template <class T>
|
||||
void socket_session<T>::init_uniqueid()
|
||||
{
|
||||
// Create a unique id for the session combining ip and port.
|
||||
// We prepare this appended string here because we need to use it for finding elemends from the maps
|
||||
// for validation purposes whenever a message is received.
|
||||
uniqueid.append(address).append(":").append(port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
template <class T>
|
||||
void socket_session<T>::fail(error_code ec, char const *what)
|
||||
{
|
||||
LOG_ERR << what << ": " << ec.message();
|
||||
|
||||
// Don't report these
|
||||
if (ec == net::error::operation_aborted ||
|
||||
ec == websocket::error::closed)
|
||||
return;
|
||||
}
|
||||
|
||||
} // namespace sock
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user