Implemented socket message templates. (#40)

Implemented socket message templates to support broadcast (shared_ptr) and to achieve buffer zero-copy.
This commit is contained in:
Ravin Perera
2019-10-23 13:04:57 +05:30
committed by GitHub
parent b4237f1285
commit 61b38bb0a0
16 changed files with 636 additions and 539 deletions

View File

@@ -7,9 +7,6 @@ find_package(Boost REQUIRED COMPONENTS system log)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build)
add_executable(hpcore
src/sock/socket_client.cpp
src/sock/socket_server.cpp
src/sock/socket_session.cpp
src/p2p/peer_session_handler.cpp
src/p2p/p2p.cpp
src/util.cpp

View File

@@ -5,7 +5,6 @@
#include "../crypto.hpp"
#include "../util.hpp"
#include "../hplog.hpp"
#include "peer_session_handler.hpp"
#include "p2p.hpp"
namespace p2p
@@ -13,7 +12,7 @@ namespace p2p
/**
* Peer connections exposing to the application
*/
std::unordered_map<std::string, sock::socket_session *> peer_connections;
std::unordered_map<std::string, sock::socket_session<peer_outbound_message> *> peer_connections;
/**
* Peer session handler instance. This instance's methods will be fired for any peer socket activity.
@@ -51,7 +50,7 @@ void start_peer_connections()
auto address = net::ip::make_address(conf::cfg.listenip);
// Start listening to peers
std::make_shared<sock::socket_server>(
std::make_shared<sock::socket_server<peer_outbound_message>>(
ioc,
tcp::endpoint{address, conf::cfg.peerport},
global_peer_session_handler)
@@ -77,7 +76,8 @@ void peer_connection_watchdog()
if (peer_connections.find(v.first) == peer_connections.end())
{
LOG_DBG << "Trying to connect :" << v.second.first << ":" << v.second.second;
std::make_shared<sock::socket_client>(ioc, global_peer_session_handler)->run(v.second.first, v.second.second);
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, global_peer_session_handler)
->run(v.second.first, v.second.second);
}
}
@@ -126,7 +126,18 @@ bool validate_peer_message(std::string_view message, std::string_view signature,
return false;
}
//get message hash and see wheteher message is already recieved -> abandon
//verify message signature.
//this should be the last validation since this is bit expensive
auto signature_verified = crypto::verify(message, signature, pubkey);
if (signature_verified != 0)
{
LOG_DBG << "Signature verification failed";
return false;
}
// After signature is verified, get message hash and see wheteher
// message is already recieved -> abandon if duplicate.
auto messageHash = crypto::sha_512_hash(message, "PEERMSG", 7);
if (recent_peer_msghash.count(messageHash) == 0)
@@ -139,16 +150,6 @@ bool validate_peer_message(std::string_view message, std::string_view signature,
return false;
}
//verify message signature.
//this should be the last validation since this is bit expensive
auto signature_verified = crypto::verify(message, signature, pubkey);
if (signature_verified != 0)
{
LOG_DBG << "Signature verification failed";
return false;
}
return true;
}

View File

@@ -3,13 +3,14 @@
#include <unordered_map>
#include "../sock/socket_session.hpp"
#include "peer_session_handler.hpp"
namespace p2p
{
/**
* This is used to store active peer connections mapped by the unique key of socket session
*/
extern std::unordered_map<std::string, sock::socket_session *> peer_connections;
extern std::unordered_map<std::string, sock::socket_session<peer_outbound_message> *> peer_connections;
/**
* This is used to store hash of recent peer messages: messagehash -> timestamp of message

View File

@@ -1,20 +1,40 @@
#include <iostream>
#include <flatbuffers/flatbuffers.h>
#include "../conf.hpp"
#include "../crypto.hpp"
#include "p2p.hpp"
#include "../util.hpp"
#include "../hplog.hpp"
#include "p2p.hpp"
#include "peer_session_handler.hpp"
#include "flatbuffers/flatbuffers.h"
#include "message_content_generated.h"
#include "message_container_generated.h"
namespace p2p
{
peer_outbound_message::peer_outbound_message(
std::shared_ptr<flatbuffers::FlatBufferBuilder> _fbbuilder_ptr)
{
fbbuilder_ptr = _fbbuilder_ptr;
}
// Returns a reference to the flatbuffer builder object.
flatbuffers::FlatBufferBuilder &peer_outbound_message::builder()
{
return *fbbuilder_ptr;
}
// Returns a reference to the data buffer that must be written to the socket.
std::string_view peer_outbound_message::buffer()
{
return std::string_view(
reinterpret_cast<const char *>((*fbbuilder_ptr).GetBufferPointer()),
(*fbbuilder_ptr).GetSize());
}
//private method used to create a proposal message with dummy data.
//Will be similiar to consensus proposal creation in each stage.
const std::string create_message()
const std::string create_message(flatbuffers::FlatBufferBuilder &container_builder)
{
//todo:get a average propsal message size and allocate builder based on that.
/*
@@ -41,10 +61,6 @@ const std::string create_message()
const char *content_str = reinterpret_cast<const char *>(buf);
std::string_view message_content(content_str, size);
//todo: set container builder defualt builder size to combination of serialized content length + signature length(which is fixed)
// Do this when implementing consensus.
flatbuffers::FlatBufferBuilder container_builder(1024);
//create container message content from serialised content from previous step.
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> content = container_builder.CreateVector(buf, size);
@@ -63,45 +79,52 @@ const std::string create_message()
return std::string((char *)message_buf, buf_size);
}
//private method returns string_view from Flat Buffer vector of bytes.
std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector<uint8_t> *pointer)
/**
* Private method to return string_view from flat buffer data pointer and length.
*/
std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length)
{
flatbuffers::uoffset_t pointer_length = pointer->size();
const uint8_t *pointer_buf = pointer->Data();
const char *signature_content_str = reinterpret_cast<const char *>(data);
return std::string_view(signature_content_str, length);
}
const char *signature_content_str = reinterpret_cast<const char *>(pointer_buf);
return std::string_view(signature_content_str, pointer_length);
/**
* Private method to return string_view from Flat Buffer vector of bytes.
*/
std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector<uint8_t> *buffer)
{
return flatbuff_bytes_to_sv(buffer->Data(), buffer->size());
}
/**
* This gets hit every time a peer connects to HP via the peer port (configured in contract config).
*/
void peer_session_handler::on_connect(sock::socket_session *session)
void peer_session_handler::on_connect(sock::socket_session<peer_outbound_message> *session)
{
if (!session->flags_[util::SESSION_FLAG::INBOUND])
{
// We init the session unique id to associate with the challenge.
session->init_uniqueid();
peer_connections.insert(std::make_pair(session->uniqueid_, session));
LOG_DBG << "Adding peer to list :" << session->uniqueid_ + " " << session->address_ + " " << session->port_;
peer_connections.insert(std::make_pair(session->uniqueid, session));
LOG_DBG << "Adding peer to list: " << session->uniqueid << " " << session->address << " " << session->port;
}
else
{
std::string message = create_message();
session->send(std::move(message));
// todo: set container builder defualt builder size to combination of serialized content length + signature length(which is fixed)
peer_outbound_message msg(std::make_shared<flatbuffers::FlatBufferBuilder>(1024));
std::string message = create_message(msg.builder());
session->send(msg);
}
}
//peer session on message callback method
//validate and handle each type of peer messages.
void peer_session_handler::on_message(sock::socket_session *session, std::string &&message)
void peer_session_handler::on_message(sock::socket_session<peer_outbound_message> *session, std::string_view message)
{
// LOG_DBG << "on-message : " << message;
peer_connections.insert(std::make_pair(session->uniqueid_, session));
//session->send(std::make_shared<std::string>(message));
peer_connections.insert(std::make_pair(session->uniqueid, session));
//Accessing message buffer
uint8_t *container_pointer = (uint8_t *)message.data();
const uint8_t *container_pointer = reinterpret_cast<const uint8_t *>(message.data());
size_t container_length = message.length();
//Defining Flatbuffer verifier (default max depth = 64, max_tables = 1000000,)
@@ -116,16 +139,14 @@ void peer_session_handler::on_message(sock::socket_session *session, std::string
//Get serialised message content.
const flatbuffers::Vector<uint8_t> *container_content = container->content();
const uint8_t *container_content_buf = container_content->Data();
std::string_view message_content = flatbuff_bytes_to_sv(container_content);
//Accessing message content.
//Accessing message content and size.
const uint8_t *content_pointer = container_content->Data();
flatbuffers::uoffset_t content_size = container_content->size();
//Defining Flatbuffer verifier for content message verification.
//Since content is also serialised by using Filterbuf we can verify it using Filterbuffer.
flatbuffers::Verifier content_verifier(content_pointer, message_content.size());
flatbuffers::Verifier content_verifier(content_pointer, content_size);
//verify content message conent using flatbuffer verifier.
if (VerifyContainerBuffer(content_verifier))
@@ -140,12 +161,12 @@ void peer_session_handler::on_message(sock::socket_session *session, std::string
uint64_t timestamp = proposal->timestamp();
//Get public key of message originating node.
const flatbuffers::Vector<uint8_t> *pubkey = proposal->pubkey();
std::string_view message_pubkey = flatbuff_bytes_to_sv(pubkey);
std::string_view message_pubkey = flatbuff_bytes_to_sv(proposal->pubkey());
//Get signature from container message.
const flatbuffers::Vector<uint8_t> *signature = container->signature();
std::string_view message_signature = flatbuff_bytes_to_sv(signature);
std::string_view message_signature = flatbuff_bytes_to_sv(container->signature());
std::string_view message_content = flatbuff_bytes_to_sv(content_pointer, content_size);
//validate message for malleability, timeliness, signature and prune recieving messages.
bool validated = p2p::validate_peer_message(message_content, message_signature, message_pubkey, timestamp, version);
@@ -186,9 +207,9 @@ void peer_session_handler::on_message(sock::socket_session *session, std::string
}
//peer session on message callback method
void peer_session_handler::on_close(sock::socket_session *session)
void peer_session_handler::on_close(sock::socket_session<peer_outbound_message> *session)
{
LOG_DBG << "on_closing peer :" << session->uniqueid_;
LOG_DBG << "on_closing peer :" << session->uniqueid;
}
} // namespace p2p

View File

@@ -2,20 +2,40 @@
#define _HP_P2P_SESSION_H_
#include <boost/beast/core.hpp>
#include <flatbuffers/flatbuffers.h>
#include "../sock/socket_session_handler.hpp"
#include "../sock/socket_session.hpp"
namespace p2p
{
class peer_session_handler : public sock::socket_session_handler
/**
* Represents a peer message generated using flatbuffer that must be sent to the socket.
* We keep a shared_ptr of flatbuffer builder to support broadcasting the same message
* on multiple connections without copying buffer contents.
*/
class peer_outbound_message : public sock::outbound_message
{
std::shared_ptr<flatbuffers::FlatBufferBuilder> fbbuilder_ptr;
public:
peer_outbound_message(std::shared_ptr<flatbuffers::FlatBufferBuilder> _fbbuilder_ptr);
// Returns a reference to the flatbuffer builder object.
flatbuffers::FlatBufferBuilder& builder();
// Returns a reference to the data buffer that must be written to the socket.
virtual std::string_view buffer();
};
class peer_session_handler : public sock::socket_session_handler<peer_outbound_message>
{
public:
void on_connect(sock::socket_session *session);
void on_connect(sock::socket_session<peer_outbound_message> *session);
void on_message(sock::socket_session *session, std::string &&message);
void on_message(sock::socket_session<peer_outbound_message> *session, std::string_view message);
void on_close(sock::socket_session *session);
void on_close(sock::socket_session<peer_outbound_message> *session);
};
} // namespace p2p

View File

@@ -1,93 +0,0 @@
#include <iostream>
#include "socket_client.hpp"
#include "../hplog.hpp"
using tcp = net::ip::tcp;
using error = boost::system::error_code;
namespace sock
{
socket_client::socket_client(net::io_context &ioc, socket_session_handler &session_handler)
: resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), sess_handler_(session_handler)
{
}
/**
* Entry point to socket client which will intiate a connection to server
*/
// boost async_resolve function requires a port as a string because of that port is passed as a string
void socket_client::run(std::string_view host, std::string_view port)
{
host_ = host;
port_ = port;
// Look up the domain name
resolver_.async_resolve(
host,
port,
[self = shared_from_this()](error ec, tcp::resolver::results_type results) {
self->on_resolve(ec, results);
});
}
/**
* Executes on completion of resolving the server
*/
void socket_client::on_resolve(error ec, tcp::resolver::results_type results)
{
if (ec)
socket_client_fail(ec, "socket_client_resolve");
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
[self = shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) {
self->on_connect(ec, type);
});
}
/**
* Executes on completion of connecting to the server
*/
void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type)
{
if (ec)
socket_client_fail(ec, "socket_client_connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
[self = shared_from_this()](error ec) {
self->on_handshake(ec);
});
}
/**
* Executes on completion of handshake
*/
void socket_client::on_handshake(error ec)
{
//Creates a new socket session object
std::make_shared<socket_session>(
ws_, sess_handler_)
->client_run(std::move(host_), std::move(port_), ec);
}
/**
* Executes on error
*/
void socket_client::socket_client_fail(beast::error_code ec, char const *what)
{
LOG_ERR << what << ": " << ec.message();
}
} // namespace sock

View File

@@ -5,6 +5,7 @@
#include <boost/beast.hpp>
#include "socket_session.hpp"
#include "socket_session_handler.hpp"
#include "../hplog.hpp"
namespace beast = boost::beast;
namespace net = boost::asio;
@@ -20,13 +21,14 @@ namespace sock
* Represents an active WebSocket client connection
* Based on the implementation from https://github.com/vinniefalco/CppCon2018
*/
class socket_client : public std::enable_shared_from_this<socket_client>
template <class T>
class socket_client : public std::enable_shared_from_this<socket_client<T>>
{
tcp::resolver resolver_; // resolver used to resolve host and the port
websocket::stream<beast::tcp_stream> ws_; // web socket stream used to send and receive messages
std::string host_; // address of the server in which the client connects
std::string port_; // port of the server in which client connects
socket_session_handler &sess_handler_; // handler passed to gain access to websocket events
tcp::resolver resolver; // resolver used to resolve host and the port
websocket::stream<beast::tcp_stream> ws; // web socket stream used to send and receive messages
std::string host; // address of the server in which the client connects
std::string port; // port of the server in which client connects
socket_session_handler<T> &sess_handler_; // handler passed to gain access to websocket events
void on_resolve(error ec, tcp::resolver::results_type results);
@@ -42,10 +44,99 @@ class socket_client : public std::enable_shared_from_this<socket_client>
public:
// Resolver and socket require an io_context
socket_client(net::io_context &ioc, socket_session_handler &session_handler);
socket_client(net::io_context &ioc, socket_session_handler<T> &session_handler);
//Entry point to the client which requires an active host and port
void run(std::string_view host, std::string_view port);
};
template <class T>
socket_client<T>::socket_client(net::io_context &ioc, socket_session_handler<T> &session_handler)
: resolver(net::make_strand(ioc)), ws(net::make_strand(ioc)), sess_handler_(session_handler)
{
}
/**
* Entry point to socket client which will intiate a connection to server
*/
// boost async_resolve function requires a port as a string because of that port is passed as a string
template <class T>
void socket_client<T>::run(std::string_view host, std::string_view port)
{
this->host = host;
this->port = port;
// Look up the domain name
resolver.async_resolve(
host,
port,
[self = this->shared_from_this()](error ec, tcp::resolver::results_type results) {
self->on_resolve(ec, results);
});
}
/**
* Executes on completion of resolving the server
*/
template <class T>
void socket_client<T>::on_resolve(error ec, tcp::resolver::results_type results)
{
if (ec)
socket_client_fail(ec, "socket_client_resolve");
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws).async_connect(
results,
[self = this->shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) {
self->on_connect(ec, type);
});
}
/**
* Executes on completion of connecting to the server
*/
template <class T>
void socket_client<T>::on_connect(error ec, tcp::resolver::results_type::endpoint_type)
{
if (ec)
socket_client_fail(ec, "socket_client_connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws).expires_never();
// Set suggested timeout settings for the websocket
ws.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Perform the websocket handshake
ws.async_handshake(host, "/",
[self = this->shared_from_this()](error ec) {
self->on_handshake(ec);
});
}
/**
* Executes on completion of handshake
*/
template <class T>
void socket_client<T>::on_handshake(error ec)
{
//Creates a new socket session object
std::make_shared<socket_session<T>>(
ws, sess_handler_)
->client_run(std::move(host), std::move(port), ec);
}
/**
* Executes on error
*/
template <class T>
void socket_client<T>::socket_client_fail(beast::error_code ec, char const *what)
{
LOG_ERR << what << ": " << ec.message();
}
} // namespace sock
#endif

View File

@@ -1,112 +0,0 @@
#include <iostream>
#include <string>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/strand.hpp>
#include "socket_server.hpp"
#include "../hplog.hpp"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp;
using error_code = boost::system::error_code;
namespace sock
{
socket_server::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler)
: acceptor_(ioc), socket_(ioc),sess_handler_(session_handler)
{
error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec)
{
fail(ec, "open");
return;
}
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true));
if (ec)
{
fail(ec, "set_option");
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec)
{
fail(ec, "bind");
return;
}
// Start listening for connections
acceptor_.listen(
net::socket_base::max_listen_connections, ec);
if (ec)
{
fail(ec, "listen");
return;
}
}
/**
* Entry point to socket server which accepts new connections
*/
void socket_server::run()
{
// Start accepting a connection
acceptor_.async_accept(
socket_,
[self = shared_from_this()](error_code ec) {
self->on_accept(ec);
});
}
/**
* Executes on error
*/
void socket_server::fail(error_code ec, char const *what)
{
// Don't report on canceled operations
if (ec == net::error::operation_aborted)
return;
LOG_ERR << what << ": " << ec.message();
}
/**
* Executes on acceptance of new connection
*/
void socket_server::on_accept(error_code ec)
{
if (ec)
{
return fail(ec, "accept");
}
else
{
std::string port = std::to_string(socket_.remote_endpoint().port());
std::string address = socket_.remote_endpoint().address().to_string();
//Creating websocket stream required to pass to initiate a new session
websocket::stream<beast::tcp_stream> ws(std::move(socket_));
// Launch a new session for this connection
std::make_shared<socket_session>(
ws, sess_handler_)
->server_run(std::move(address), std::move(port));
}
// Accept another connection
acceptor_.async_accept(
socket_,
[self = shared_from_this()](error_code ec) {
self->on_accept(ec);
});
}
} // namespace sock

View File

@@ -2,12 +2,16 @@
#define _SOCK_SERVER_LISTENER_H_
#include <boost/asio.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include "socket_session_handler.hpp"
#include "../hplog.hpp"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp;
using error = boost::system::error_code; // from <boost/system/error_code.hpp>
using error_code = boost::system::error_code;
namespace sock
{
@@ -16,22 +20,124 @@ namespace sock
* Represents an active WebSocket server connection
* Based on the implementation from https://github.com/vinniefalco/CppCon2018
*/
class socket_server : public std::enable_shared_from_this<socket_server>
template <class T>
class socket_server : public std::enable_shared_from_this<socket_server<T>>
{
tcp::acceptor acceptor_; // acceptor which accepts new connections
tcp::socket socket_; // socket in which the client connects
socket_session_handler &sess_handler_; // handler passed to gain access to websocket events
tcp::acceptor acceptor; // acceptor which accepts new connections
tcp::socket socket; // socket in which the client connects
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
void fail(error ec, char const *what);
void fail(error_code ec, char const *what);
void on_accept(error ec);
void on_accept(error_code ec);
public:
socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler);
socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler<T> &session_handler);
// Start accepting incoming connections
void run();
};
template <class T>
socket_server<T>::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler<T> &session_handler)
: acceptor(ioc), socket(ioc), sess_handler(session_handler)
{
error_code ec;
// Open the acceptor
acceptor.open(endpoint.protocol(), ec);
if (ec)
{
fail(ec, "open");
return;
}
// Allow address reuse
acceptor.set_option(net::socket_base::reuse_address(true));
if (ec)
{
fail(ec, "set_option");
return;
}
// Bind to the server address
acceptor.bind(endpoint, ec);
if (ec)
{
fail(ec, "bind");
return;
}
// Start listening for connections
acceptor.listen(
net::socket_base::max_listen_connections, ec);
if (ec)
{
fail(ec, "listen");
return;
}
}
/**
* Entry point to socket server which accepts new connections
*/
template <class T>
void socket_server<T>::run()
{
// Start accepting a connection
acceptor.async_accept(
socket,
[self = this->shared_from_this()](error_code ec) {
self->on_accept(ec);
});
}
/**
* Executes on error
*/
template <class T>
void socket_server<T>::fail(error_code ec, char const *what)
{
// Don't report on canceled operations
if (ec == net::error::operation_aborted)
return;
LOG_ERR << what << ": " << ec.message();
}
/**
* Executes on acceptance of new connection
*/
template <class T>
void socket_server<T>::on_accept(error_code ec)
{
if (ec)
{
return fail(ec, "accept");
}
else
{
std::string port = std::to_string(socket.remote_endpoint().port());
std::string address = socket.remote_endpoint().address().to_string();
//Creating websocket stream required to pass to initiate a new session
websocket::stream<beast::tcp_stream> ws(std::move(socket));
// Launch a new session for this connection
std::make_shared<socket_session<T>>(
ws, sess_handler)
->server_run(std::move(address), std::move(port));
}
// Accept another connection
acceptor.async_accept(
socket,
[self = this->shared_from_this()](error_code ec) {
self->on_accept(ec);
});
}
} // namespace sock
#endif

View File

@@ -1,209 +0,0 @@
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include "socket_session.hpp"
#include "../util.hpp"
namespace net = boost::asio;
using tcp = net::ip::tcp;
using error_code = boost::system::error_code;
namespace sock
{
socket_session::socket_session(websocket::stream<beast::tcp_stream> &websocket, socket_session_handler &sess_handler)
: ws_(std::move(websocket)), sess_handler_(sess_handler)
{
ws_.binary(true);
}
socket_session::~socket_session()
{
sess_handler_.on_close(this);
}
//port and address will be used to identify from which client the message recieved in the handler
void socket_session::server_run(const std::string &&address, const std::string &&port)
{
port_ = port;
address_ = address;
//Set this flag to identify whether this socket session created when node acts as a server
flags_.set(util::SESSION_FLAG::INBOUND);
// Accept the websocket handshake
ws_.async_accept(
[sp = shared_from_this()](
error ec) {
sp->on_accept(ec);
});
}
//port and address will be used to identify from which server the message recieved in the handler
void socket_session::client_run(const std::string &&address, const std::string &&port, error ec)
{
port_ = port;
address_ = address;
if (ec)
return fail(ec, "handshake");
sess_handler_.on_connect(this);
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
/**
* Executes on error
*/
void socket_session::fail(error_code ec, char const *what)
{
// LOG_ERR << what << ": " << ec.message();
// Don't report these
if (ec == net::error::operation_aborted ||
ec == websocket::error::closed)
return;
}
/**
* Executes on acceptance of new connection
*/
void socket_session::on_accept(error_code ec)
{
// Handle the error, if any
if (ec)
return fail(ec, "accept");
sess_handler_.on_connect(this);
// Read a message
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
/*
* Executes on completion of recieiving a new message
*/
void socket_session::on_read(error_code ec, std::size_t)
{
//if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
// read may get called when operation_aborted as well.
// We don't need to process read operation in that case.
if (ec == net::error::operation_aborted)
return;
// Handle the error, if any
if (ec)
{
// if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
on_close(ec, 1);
return fail(ec, "read");
}
std::string message = beast::buffers_to_string(buffer_.data());
sess_handler_.on_message(this, std::move(message));
// Clear the buffer
buffer_.consume(buffer_.size());
// Read another message
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
/*
* Send message through an active websocket connection
*/
void socket_session::send(std::string &&ss)
{
// Always add to queue
queue_.push_back(ss);
// Are we already writing?
if (queue_.size() > 1)
return;
// We are not currently writing, so send this immediately
ws_.async_write(
net::buffer(queue_.front()),
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_write(ec, bytes);
});
}
/*
* Executes on completion of write operation to a socket
*/
void socket_session::on_write(error_code ec, std::size_t)
{
// Handle the error, if any
if (ec)
return fail(ec, "write");
// Remove the string from the queue
queue_.erase(queue_.begin());
// Send the next message if any
if (!queue_.empty())
ws_.async_write(
net::buffer(queue_.front()),
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_write(ec, bytes);
});
}
/*
* Close an active websocket connection gracefully
*/
void socket_session::close()
{
// Close the WebSocket connection
ws_.async_close(websocket::close_code::normal,
[sp = shared_from_this()](
error_code ec) {
sp->on_close(ec, 0);
});
}
/*
* Executes on completion of closing a socket connection
*/
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
void socket_session::on_close(error_code ec, std::int8_t type)
{
// sess_handler_.on_close(this);
// if (type == 1)
// return;
// if (ec)
// return fail(ec, "close");
}
// When called, initializes the unique id string for this session.
void socket_session::init_uniqueid()
{
// Create a unique id for the session combining ip and port.
// We prepare this appended string here because we need to use it for finding elemends from the maps
// for validation purposes whenever a message is received.
uniqueid_.append(address_).append(":").append(port_);
}
} // namespace sock

View File

@@ -1,11 +1,14 @@
#ifndef _SOCK_SERVER_SESSION_H_
#define _SOCK_SERVER_SESSION_H_
#include <string>
#include <memory>
#include <vector>
#include <bitset>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include "../util.hpp"
#include "socket_session_handler.hpp"
namespace beast = boost::beast;
@@ -14,51 +17,67 @@ namespace websocket = boost::beast::websocket;
namespace http = boost::beast::http;
using tcp = net::ip::tcp;
using error = boost::system::error_code;
using error_code = boost::system::error_code;
namespace sock
{
/**
* Represents an outbound message that is sent with a websocket.
* We use this class to wrap different object types holding actual message contents.
* We use this mechanism to achieve end-to-end zero-copy between original message
* content generator and websocket flush.
*/
class outbound_message
{
public:
// Returns a pointer to the internal buffer owned by the message object.
// Contents of this buffer is the message that is sent/received with the socket.
virtual std::string_view buffer() = 0;
};
//Forward Declaration
template <class T>
class socket_session_handler;
/**
* Represents an active WebSocket connection
*/
class socket_session : public std::enable_shared_from_this<socket_session>
template <class T>
class socket_session : public std::enable_shared_from_this<socket_session<T>>
{
beast::flat_buffer buffer_; // used to store incoming messages
websocket::stream<beast::tcp_stream> ws_; // websocket stream used send an recieve messages
std::vector<std::string> queue_; // uses to store messages temporarily until it is sent to the relevant party
socket_session_handler &sess_handler_; // handler passed to gain access to websocket events
beast::flat_buffer buffer; // used to store incoming messages
websocket::stream<beast::tcp_stream> ws; // websocket stream used send an recieve messages
std::vector<T> queue; // used to store messages temporarily until it is sent to the relevant party
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
void fail(error ec, char const *what);
void fail(error_code ec, char const *what);
void on_accept(error ec);
void on_accept(error_code ec);
void on_read(error ec, std::size_t bytes_transferred);
void on_read(error_code ec, std::size_t bytes_transferred);
void on_write(error ec, std::size_t bytes_transferred);
void on_write(error_code ec, std::size_t bytes_transferred);
void on_close(error ec, std::int8_t type);
void on_close(error_code ec, std::int8_t type);
public:
socket_session(websocket::stream<beast::tcp_stream> &websocket, socket_session_handler &sess_handler);
socket_session(websocket::stream<beast::tcp_stream> &websocket, socket_session_handler<T> &sess_handler);
~socket_session();
// Port and the address of the remote party is being saved to used in the session handler
// to identify from which remote party the message recieved. Since the port is passed as a string
// to identify from which remote party the message recieved. Since the port is passed as a string
// from the parent we store as it is, since we are not going to pass it anywhere or used in a method
// The port of the remote party.
std::string port_;
std::string port;
// The IP address of the remote party.
std::string address_;
std::string address;
// The unique identifier of the remote party (format <ip>:<port>).
std::string uniqueid_;
std::string uniqueid;
// The set of util::SESSION_FLAG enum flags that will be set by user-code of this calss.
// We mainly use this to store contexual information about this session based on the use case.
@@ -66,14 +85,232 @@ public:
std::bitset<8> flags_;
void server_run(const std::string &&address, const std::string &&port);
void client_run(const std::string &&address, const std::string &&port, error ec);
void client_run(const std::string &&address, const std::string &&port, error_code ec);
void send(std::string &&ss);
void send(T msg);
// When called, initializes the unique id string for this session.
void init_uniqueid();
void close();
};
template <class T>
socket_session<T>::socket_session(websocket::stream<beast::tcp_stream> &websocket, socket_session_handler<T> &sess_handler)
: ws(std::move(websocket)), sess_handler(sess_handler)
{
// We use binary data instead of ASCII/UTF8 character data.
ws.binary(true);
}
template <class T>
socket_session<T>::~socket_session()
{
sess_handler.on_close(this);
}
//port and address will be used to identify from which client the message recieved in the handler
template <class T>
void socket_session<T>::server_run(const std::string &&address, const std::string &&port)
{
this->port = port;
this->address = address;
//Set this flag to identify whether this socket session created when node acts as a server
flags_.set(util::SESSION_FLAG::INBOUND);
// Accept the websocket handshake
ws.async_accept(
[sp = this->shared_from_this()](
error_code ec) {
sp->on_accept(ec);
});
}
//port and address will be used to identify from which server the message recieved in the handler
template <class T>
void socket_session<T>::client_run(const std::string &&address, const std::string &&port, error_code ec)
{
this->port = port;
this->address = address;
if (ec)
return fail(ec, "handshake");
sess_handler.on_connect(this);
ws.async_read(
buffer,
[sp = this->shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
/**
* Executes on error
*/
template <class T>
void socket_session<T>::fail(error_code ec, char const *what)
{
// LOG_ERR << what << ": " << ec.message();
// Don't report these
if (ec == net::error::operation_aborted ||
ec == websocket::error::closed)
return;
}
/**
* Executes on acceptance of new connection
*/
template <class T>
void socket_session<T>::on_accept(error_code ec)
{
// Handle the error, if any
if (ec)
return fail(ec, "accept");
sess_handler.on_connect(this);
// Read a message
ws.async_read(
buffer,
[sp = this->shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
/*
* Executes on completion of recieiving a new message
*/
template <class T>
void socket_session<T>::on_read(error_code ec, std::size_t)
{
//if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
// read may get called when operation_aborted as well.
// We don't need to process read operation in that case.
if (ec == net::error::operation_aborted)
return;
// Handle the error, if any
if (ec)
{
// if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
on_close(ec, 1);
return fail(ec, "read");
}
// Wrap the buffer data in a string_view and call session handler.
// We DO NOT transfer ownership of buffer data to the session handler. It should
// read and process the message and we will clear the buffer after its done with it.
const char *buffer_data = net::buffer_cast<const char *>(buffer.data());
std::string_view message(buffer_data, buffer.size());
sess_handler.on_message(this, message);
// Clear the buffer
buffer.consume(buffer.size());
// Read another message
ws.async_read(
buffer,
[sp = this->shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
/*
* Send message through an active websocket connection
*/
template <class T>
void socket_session<T>::send(T msg)
{
// Always add to queue
queue.push_back(std::move(msg));
// Are we already writing?
if (queue.size() > 1)
return;
std::string_view sv = queue.front().buffer();
// We are not currently writing, so send this immediately
ws.async_write(
// Project the outbound_message buffer from the queue front into the asio buffer.
net::buffer(sv.data(), sv.length()),
[sp = this->shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_write(ec, bytes);
});
}
/*
* Executes on completion of write operation to a socket
*/
template <class T>
void socket_session<T>::on_write(error_code ec, std::size_t)
{
// Handle the error, if any
if (ec)
return fail(ec, "write");
// Remove the string from the queue
queue.erase(queue.begin());
// Send the next message if any
if (!queue.empty())
{
std::string_view sv = queue.front().buffer();
ws.async_write(
net::buffer(sv.data(), sv.length()),
[sp = this->shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_write(ec, bytes);
});
}
}
/*
* Close an active websocket connection gracefully
*/
template <class T>
void socket_session<T>::close()
{
// Close the WebSocket connection
ws.async_close(websocket::close_code::normal,
[sp = this->shared_from_this()](
error_code ec) {
sp->on_close(ec, 0);
});
}
/*
* Executes on completion of closing a socket connection
*/
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
template <class T>
void socket_session<T>::on_close(error_code ec, std::int8_t type)
{
// sess_handler.on_close(this);
// if (type == 1)
// return;
// if (ec)
// return fail(ec, "close");
}
// When called, initializes the unique id string for this session.
template <class T>
void socket_session<T>::init_uniqueid()
{
// Create a unique id for the session combining ip and port.
// We prepare this appended string here because we need to use it for finding elemends from the maps
// for validation purposes whenever a message is received.
uniqueid.append(address).append(":").append(port);
}
} // namespace sock
#endif

View File

@@ -7,28 +7,30 @@ namespace sock
{
// Forward declaration
template <class T>
class socket_session;
/**
* Represents a WebSocket sessions handler. Can inherit from this class and access websocket events
*/
template <class T>
class socket_session_handler
{
public:
/**
* Executes on initiation of a new connection
*/
virtual void on_connect(socket_session *session) = 0;
virtual void on_connect(socket_session<T> *session) = 0;
/**
* Executes on recieval of new message
*/
virtual void on_message(socket_session *session, std::string &&message) = 0;
virtual void on_message(socket_session<T> *session, std::string_view message) = 0;
/**
* Executes on websocket connection close
*/
virtual void on_close(socket_session *session) = 0;
virtual void on_close(socket_session<T> *session) = 0;
};
} // namespace sock

View File

@@ -19,27 +19,39 @@ using error = boost::system::error_code;
namespace usr
{
user_outbound_message::user_outbound_message(std::string &&_msg)
{
msg = std::move(_msg);
}
// Returns the buffer that should be written to the socket.
std::string_view user_outbound_message::buffer()
{
return std::string_view(msg.data(), msg.size());
}
/**
* This gets hit every time a client connects to HP via the public port (configured in contract config).
*/
void user_session_handler::on_connect(sock::socket_session *session)
void user_session_handler::on_connect(sock::socket_session<user_outbound_message> *session)
{
LOG_INFO << "User client connected " << session->address_ << ":" << session->port_;
LOG_INFO << "User client connected " << session->address << ":" << session->port;
// As soon as a user connects, we issue them a challenge message. We remember the
// challenge we issued and later verifies the user's response with it.
std::string msg;
std::string msgstr;
std::string challengehex;
usr::create_user_challenge(msg, challengehex);
usr::create_user_challenge(msgstr, challengehex);
// We init the session unique id to associate with the challenge.
session->init_uniqueid();
// Create an entry in pending_challenges for later tracking upon challenge response.
usr::pending_challenges[session->uniqueid_] = challengehex;
usr::pending_challenges[session->uniqueid] = challengehex;
session->send(std::move(msg));
user_outbound_message outmsg(std::move(msgstr));
session->send(std::move(outmsg));
// Set the challenge-issued flag to help later checks in on_message.
session->flags_.set(util::SESSION_FLAG::USER_CHALLENGE_ISSUED);
@@ -48,14 +60,16 @@ void user_session_handler::on_connect(sock::socket_session *session)
/**
* This gets hit every time we receive some data from a client connected to the HP public port.
*/
void user_session_handler::on_message(sock::socket_session *session, std::string &&message)
void user_session_handler::on_message(
sock::socket_session<user_outbound_message> *session,
std::string_view message)
{
// First check whether this session is pending challenge.
// Meaning we have previously issued a challenge to the client,
if (session->flags_[util::SESSION_FLAG::USER_CHALLENGE_ISSUED])
{
// The received message must be the challenge response. We need to verify it.
auto itr = usr::pending_challenges.find(session->uniqueid_);
auto itr = usr::pending_challenges.find(session->uniqueid);
if (itr != usr::pending_challenges.end())
{
std::string userpubkeyhex;
@@ -82,20 +96,20 @@ void user_session_handler::on_message(sock::socket_session *session, std::string
session->flags_.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag
session->flags_.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag
usr::add_user(session, userpubkey); // Add the user to the global authed user list
usr::pending_challenges.erase(session->uniqueid_); // Remove the stored challenge
usr::pending_challenges.erase(session->uniqueid); // Remove the stored challenge
LOG_INFO << "User connection " << session->uniqueid_ << " authenticated. Public key "
<< userpubkeyhex;
LOG_INFO << "User connection " << session->uniqueid << " authenticated. Public key "
<< userpubkeyhex;
return;
}
else
{
LOG_INFO << "Duplicate user public key " << session->uniqueid_;
LOG_INFO << "Duplicate user public key " << session->uniqueid;
}
}
else
{
LOG_INFO << "Challenge verification failed " << session->uniqueid_;
LOG_INFO << "Challenge verification failed " << session->uniqueid;
}
}
}
@@ -105,7 +119,7 @@ void user_session_handler::on_message(sock::socket_session *session, std::string
// Check whether this user is among authenticated users
// and perform authenticated msg processing.
auto itr = usr::users.find(session->uniqueid_);
auto itr = usr::users.find(session->uniqueid);
if (itr != usr::users.end())
{
// This is an authed user.
@@ -121,30 +135,30 @@ void user_session_handler::on_message(sock::socket_session *session, std::string
// If for any reason we reach this point, we should drop the connection.
session->close();
LOG_INFO << "Dropped the user connection " << session->address_ << ":" << session->port_;
LOG_INFO << "Dropped the user connection " << session->address << ":" << session->port;
}
/**
* This gets hit every time a client disconnects from the HP public port.
*/
void user_session_handler::on_close(sock::socket_session *session)
void user_session_handler::on_close(sock::socket_session<user_outbound_message> *session)
{
// Cleanup any resources related to this session.
// Session is awaiting challenge response.
if (session->flags_[util::SESSION_FLAG::USER_CHALLENGE_ISSUED])
{
usr::pending_challenges.erase(session->uniqueid_);
usr::pending_challenges.erase(session->uniqueid);
}
// Session belongs to an authed user.
else if (session->flags_[util::SESSION_FLAG::USER_AUTHED])
{
// Wait for SC process completion before we remove existing user.
proc::await_contract_execution();
usr::remove_user(session->uniqueid_);
usr::remove_user(session->uniqueid);
}
LOG_INFO << "User disconnected " << session->uniqueid_;
LOG_INFO << "User disconnected " << session->uniqueid;
}
} // namespace usr

View File

@@ -1,3 +1,6 @@
#ifndef _HP_USER_SESSION_HANDLER_H_
#define _HP_USER_SESSION_HANDLER_H_
#include <boost/beast/core.hpp>
#include "../sock/socket_session_handler.hpp"
#include "../sock/socket_session.hpp"
@@ -5,12 +8,29 @@
namespace usr
{
class user_session_handler : public sock::socket_session_handler
/**
* Represents a message (bytes) that is sent to a user.
*/
class user_outbound_message : public sock::outbound_message
{
// Contains message bytes.
std::string msg;
public:
void on_connect(sock::socket_session *session);
void on_message(sock::socket_session *session, std::string &&message);
void on_close(sock::socket_session *session);
user_outbound_message(std::string &&_msg);
// Returns the buffer that should be written to the socket.
virtual std::string_view buffer();
};
} // namespace usr
class user_session_handler : public sock::socket_session_handler<user_outbound_message>
{
public:
void on_connect(sock::socket_session<user_outbound_message> *session);
void on_message(sock::socket_session<user_outbound_message> *session, std::string_view message);
void on_close(sock::socket_session<user_outbound_message> *session);
};
} // namespace usr
#endif

View File

@@ -202,9 +202,9 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string
* @param pubkey User's binary public key.
* @return 0 on successful additions. -1 on failure.
*/
int add_user(sock::socket_session *session, const std::string &pubkey)
int add_user(sock::socket_session<user_outbound_message> *session, const std::string &pubkey)
{
const std::string &sessionid = session->uniqueid_;
const std::string &sessionid = session->uniqueid;
if (users.count(sessionid) == 1)
{
LOG_INFO << sessionid << " already exist. Cannot add user.";
@@ -249,7 +249,7 @@ int remove_user(const std::string &sessionid)
void start_listening()
{
auto address = net::ip::make_address(conf::cfg.listenip);
std::make_shared<sock::socket_server>(
std::make_shared<sock::socket_server<user_outbound_message>>(
ioc,
tcp::endpoint{address, conf::cfg.pubport},
global_usr_session_handler)

View File

@@ -6,6 +6,7 @@
#include <unordered_map>
#include "../util.hpp"
#include "../sock/socket_session.hpp"
#include "user_session_handler.hpp"
/**
* Maintains the global user list with pending input outputs and manages user connections.
@@ -27,12 +28,12 @@ struct connected_user
// Holds the websocket session of this user.
// We don't need to own the session object since the lifetime of user and session are coupled.
sock::socket_session *session;
sock::socket_session<user_outbound_message> *session;
/**
* @param _pubkey The public key of the user in binary format.
*/
connected_user(sock::socket_session *_session, std::string_view _pubkey)
connected_user(sock::socket_session<user_outbound_message> *_session, std::string_view _pubkey)
{
session = _session;
pubkey = _pubkey;
@@ -64,7 +65,7 @@ void create_user_challenge(std::string &msg, std::string &challengehex);
int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge);
int add_user(sock::socket_session *session, const std::string &pubkey);
int add_user(sock::socket_session<user_outbound_message> *session, const std::string &pubkey);
int remove_user(const std::string &sessionid);