Added read message max size in socket communication (#42)

* Added peermaxmsg size to config

* Completed inital implementation

* Completed adding message max size

* Refactored code

* Refactored code to have data types using std library

* Added peer maxmpm

* Added const to session_options
This commit is contained in:
Ravidu Lashan
2019-10-25 11:34:54 +05:30
committed by GitHub
parent 40358890af
commit 46d114f44b
8 changed files with 107 additions and 57 deletions

View File

@@ -93,7 +93,9 @@ int create_contract()
cfg.roundtime = 1000;
cfg.pubport = 8080;
cfg.pubmaxsize = 65536;
cfg.pubmaxcpm = 100;
cfg.pubmaxmpm = 100;
cfg.peermaxsize = 65536;
cfg.peermaxmpm = 1000;
#ifndef NDEBUG
cfg.loglevel = "debug";
@@ -197,9 +199,9 @@ int load_config()
boost::split(splitted_peers, ipport_concat, boost::is_any_of(":"));
if (splitted_peers.size() == 2)
{
// Push the peer address and the port to peers array
cfg.peers.emplace(std::make_pair(ipport_concat, std::make_pair(splitted_peers.front(), splitted_peers.back())));
splitted_peers.clear();
// Push the peer address and the port to peers array
cfg.peers.emplace(std::make_pair(ipport_concat, std::make_pair(splitted_peers.front(), splitted_peers.back())));
splitted_peers.clear();
}
}
@@ -224,7 +226,9 @@ int load_config()
cfg.roundtime = d["roundtime"].GetInt();
cfg.pubport = d["pubport"].GetInt();
cfg.pubmaxsize = d["pubmaxsize"].GetInt();
cfg.pubmaxcpm = d["pubmaxcpm"].GetInt();
cfg.pubmaxmpm = d["pubmaxmpm"].GetInt();
cfg.peermaxsize = d["peermaxsize"].GetInt();
cfg.peermaxmpm = d["peermaxmpm"].GetInt();
cfg.loglevel = d["loglevel"].GetString();
cfg.loggers.clear();
@@ -236,7 +240,7 @@ int load_config()
return -1;
return 0;
}
}
/**
* Saves the current values of the 'cfg' struct into the config file.
*
@@ -287,7 +291,9 @@ int save_config()
d.AddMember("roundtime", cfg.roundtime, allocator);
d.AddMember("pubport", cfg.pubport, allocator);
d.AddMember("pubmaxsize", cfg.pubmaxsize, allocator);
d.AddMember("pubmaxcpm", cfg.pubmaxcpm, allocator);
d.AddMember("pubmaxmpm", cfg.pubmaxmpm, allocator);
d.AddMember("peermaxsize", cfg.peermaxsize, allocator);
d.AddMember("peermaxmpm", cfg.peermaxmpm, allocator);
d.AddMember("loglevel", rapidjson::StringRef(cfg.loglevel.data()), allocator);
rapidjson::Value loggers(rapidjson::kArrayType);
@@ -390,8 +396,8 @@ int validate_config()
// Other required fields.
if (cfg.binary.empty() || cfg.listenip.empty() ||
cfg.peerport == 0 || cfg.roundtime == 0 || cfg.pubport == 0 || cfg.pubmaxsize == 0 || cfg.pubmaxcpm == 0 ||
cfg.loglevel.empty() || cfg.loggers.empty())
cfg.peerport == 0 || cfg.roundtime == 0 || cfg.pubport == 0 || cfg.pubmaxsize == 0 || cfg.pubmaxmpm == 0 || cfg.peermaxsize == 0 ||
cfg.peermaxmpm == 0 || cfg.loglevel.empty() || cfg.loggers.empty())
{
std::cout << "Required configuration fields missing at " << ctx.configFile << std::endl;
return -1;
@@ -447,11 +453,11 @@ int validate_contract_dir_paths()
{
if (!std::experimental::filesystem::exists(path))
{
if(path == ctx.tlsKeyFile || path == ctx.tlsCertFile)
if (path == ctx.tlsKeyFile || path == ctx.tlsCertFile)
{
std::cout << path << " does not exist. Please provide self-signed certificates. Can generate using command\n" <<
"openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem\n" <<
"and add it to "+ ctx.configDir;
std::cout << path << " does not exist. Please provide self-signed certificates. Can generate using command\n"
<< "openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem\n"
<< "and add it to " + ctx.configDir;
}
else
{
@@ -476,8 +482,8 @@ int is_schema_valid(rapidjson::Document &d)
"{"
"\"type\": \"object\","
"\"required\": [ \"version\", \"pubkeyhex\", \"seckeyhex\", \"binary\", \"binargs\", \"listenip\""
", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxcpm\""
", \"loglevel\", \"loggers\" ],"
", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxmpm\""
", \"peermaxsize\", \"peermaxmpm\", \"loglevel\", \"loggers\" ],"
"\"properties\": {"
"\"version\": { \"type\": \"string\" },"
"\"pubkeyhex\": { \"type\": \"string\" },"
@@ -497,7 +503,9 @@ int is_schema_valid(rapidjson::Document &d)
"\"roundtime\": { \"type\": \"integer\" },"
"\"pubport\": { \"type\": \"integer\" },"
"\"pubmaxsize\": { \"type\": \"integer\" },"
"\"pubmaxcpm\": { \"type\": \"integer\" },"
"\"pubmaxmpm\": { \"type\": \"integer\" },"
"\"peermaxsize\": { \"type\": \"integer\" },"
"\"peermaxmpm\": { \"type\": \"integer\" },"
"\"loglevel\": { \"type\": \"string\" },"
"\"loggers\": {"
"\"type\": \"array\","

View File

@@ -41,21 +41,23 @@ struct contract_config
// Config elements which are loaded from the config file.
std::string pubkeyhex; // Contract hex public key
std::string seckeyhex; // Contract hex secret key
std::string keytype; // Key generation algorithm used by libsodium
std::string binary; // Full path to the contract binary
std::string binargs; // CLI arguments to pass to the contract binary
std::string listenip; // The IPs to listen on for incoming connections
std::unordered_map<std::string, ip_port_pair> peers; // Map of peers keyed by "<ip address>:<port>" concatenated format
std::unordered_set<std::string> unl; // Unique node list (list of binary public keys)
std::uint16_t peerport; // Listening port for peer connections
int roundtime; // Consensus round time in ms
std::uint16_t pubport; // Listening port for public user connections
int pubmaxsize; // User message max size in bytes
int pubmaxcpm; // User message rate
std::string loglevel; // Log severity level (debug, info, warn, error)
std::unordered_set<std::string> loggers; // List of enabled loggers (console, file)
std::string pubkeyhex; // Contract hex public key
std::string seckeyhex; // Contract hex secret key
std::string keytype; // Key generation algorithm used by libsodium
std::string binary; // Full path to the contract binary
std::string binargs; // CLI arguments to pass to the contract binary
std::string listenip; // The IPs to listen on for incoming connections
std::unordered_map<std::string, ip_port_pair> peers; // Map of peers keyed by "<ip address>:<port>" concatenated format
std::unordered_set<std::string> unl; // Unique node list (list of binary public keys)
std::uint16_t peerport; // Listening port for peer connections
std::uint16_t roundtime; // Consensus round time in ms
std::uint16_t pubport; // Listening port for public user connections
std::uint64_t pubmaxsize; // User message max size in bytes
std::uint16_t pubmaxmpm; // User message rate (in minutes)
std::uint64_t peermaxsize; // Peer message max size in bytes
std::uint64_t peermaxmpm; // Peer message rate (in minutes)
std::string loglevel; // Log severity level (debug, info, warn, error)
std::unordered_set<std::string> loggers; // List of enabled loggers (console, file)
};
// Global contract context struct exposed to the application.

View File

@@ -42,6 +42,11 @@ std::thread peer_watchdog_thread;
*/
std::thread peer_thread;
/**
* Used to pass down the default settings to the socket session
*/
sock::session_options sess_opts;
std::map<std::string, time_t> recent_peer_msghash;
int init()
@@ -56,12 +61,17 @@ void start_peer_connections()
{
auto address = net::ip::make_address(conf::cfg.listenip);
//setting up the message max size. Retrieve it from config
// At the moment same settings are used to initialize a new server and client
sess_opts.max_message_size = conf::cfg.peermaxsize;
// Start listening to peers
std::make_shared<sock::socket_server<peer_outbound_message>>(
ioc,
ctx,
tcp::endpoint{address, conf::cfg.peerport},
global_peer_session_handler)
global_peer_session_handler,
sess_opts)
->run();
LOG_INFO << "Started listening for incoming peer connections on " << conf::cfg.listenip << ":" << conf::cfg.peerport;
@@ -84,7 +94,7 @@ void peer_connection_watchdog()
if (peer_connections.find(v.first) == peer_connections.end())
{
LOG_DBG << "Trying to connect :" << v.second.first << ":" << v.second.second;
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, ctx, global_peer_session_handler)
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, ctx, global_peer_session_handler, sess_opts)
->run(v.second.first, v.second.second);
}
}

View File

@@ -11,7 +11,7 @@
namespace p2p
{
peer_outbound_message::peer_outbound_message(
std::shared_ptr<flatbuffers::FlatBufferBuilder> _fbbuilder_ptr)
{

View File

@@ -26,6 +26,7 @@ class socket_client : public std::enable_shared_from_this<socket_client<T>>
std::string host; // address of the server in which the client connects
std::string port; // port of the server in which client connects
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
const session_options &sess_opts; // session options needed to pass to session
void on_resolve(error ec, tcp::resolver::results_type results);
@@ -39,15 +40,15 @@ class socket_client : public std::enable_shared_from_this<socket_client<T>>
public:
// Resolver and socket require an io_context
socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<T> &session_handler);
socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<T> &session_handler, const session_options &session_options);
//Entry point to the client which requires an active host and port
void run(std::string_view host, std::string_view port);
};
template <class T>
socket_client<T>::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<T> &session_handler)
: resolver(net::make_strand(ioc)), ws(net::make_strand(ioc), ctx), sess_handler(session_handler)
socket_client<T>::socket_client(net::io_context &ioc, ssl::context &ctx, socket_session_handler<T> &session_handler, const session_options &session_options)
: resolver(net::make_strand(ioc)), ws(net::make_strand(ioc), ctx), sess_handler(session_handler), sess_opts(session_options)
{
}
@@ -102,7 +103,7 @@ void socket_client<T>::on_connect(error ec, tcp::resolver::results_type::endpoin
//Creates a new socket session object
std::make_shared<socket_session<T>>(
std::move(ws), sess_handler)
->run(std::move(host), std::move(port), false);
->run(std::move(host), std::move(port), false, sess_opts);
}
}

View File

@@ -21,25 +21,26 @@ namespace sock
template <class T>
class socket_server : public std::enable_shared_from_this<socket_server<T>>
{
tcp::acceptor acceptor; // acceptor which accepts new connections
net::io_context &ioc; // socket in which the client connects
ssl::context &ctx; // ssl context which provides support for tls
tcp::acceptor acceptor; // acceptor which accepts new connections
net::io_context &ioc; // socket in which the client connects
ssl::context &ctx; // ssl context which provides support for tls
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
const session_options &sess_opts; // session options needed to pass to session
void fail(error_code ec, char const *what);
void on_accept(error_code ec, tcp::socket socket);
public:
socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<T> &session_handler);
socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<T> &session_handler, const session_options &session_options);
// Start accepting incoming connections
void run();
};
template <class T>
socket_server<T>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<T> &session_handler)
: acceptor(net::make_strand(ioc)), ioc(ioc), ctx(ctx), sess_handler(session_handler)
socket_server<T>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::endpoint endpoint, socket_session_handler<T> &session_handler, const session_options &session_options)
: acceptor(net::make_strand(ioc)), ioc(ioc), ctx(ctx), sess_handler(session_handler), sess_opts(session_options)
{
error_code ec;
@@ -83,7 +84,7 @@ socket_server<T>::socket_server(net::io_context &ioc, ssl::context &ctx, tcp::en
template <class T>
void socket_server<T>::run()
{
// Adding ssl context options disallowing requests which supports sslv2 and sslv3 which have security vulnerabilitis
// Adding ssl context options disallowing requests which supports sslv2 and sslv3 which have security vulnerabilitis
ctx.set_options(
boost::asio::ssl::context::default_workarounds |
boost::asio::ssl::context::no_sslv2 |
@@ -117,17 +118,13 @@ void socket_server<T>::on_accept(error_code ec, tcp::socket socket)
}
else
{
std::string port = std::to_string(socket.remote_endpoint().port());
std::string address = socket.remote_endpoint().address().to_string();
//Creating websocket stream required to pass to initiate a new session
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws(std::move(socket), ctx);
// Launch a new session for this connection
std::make_shared<socket_session<T>>(
std::move(ws), sess_handler)
->run(std::move(address), std::move(port), true);
->run(std::to_string(socket.remote_endpoint().port()), socket.remote_endpoint().address().to_string(), true, sess_opts);
}
// Accept another connection

View File

@@ -14,7 +14,6 @@
#include "../util.hpp"
#include "../hplog.hpp"
namespace beast = boost::beast;
namespace net = boost::asio;
namespace websocket = boost::beast::websocket;
@@ -41,6 +40,12 @@ public:
virtual std::string_view buffer() = 0;
};
// Use this to feed the session with default options from the config file
struct session_options
{
std::uint64_t max_message_size;
};
//Forward Declaration
template <class T>
class socket_session_handler;
@@ -91,10 +96,12 @@ public:
// Setting and reading flags to this is completely managed by user-code.
std::bitset<8> flags;
void run(const std::string &&address, const std::string &&port, bool is_server_session);
void run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts);
void send(T msg);
void set_message_max_size(std::uint64_t size);
// When called, initializes the unique id string for this session.
void init_uniqueid();
@@ -112,15 +119,31 @@ socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp
template <class T>
socket_session<T>::~socket_session()
{
sess_handler.on_close(this);
sess_handler.on_close(this);
}
/**
* Sets the largest permissible incoming message size. If exceeds over this limit will cause a
* protocol failure
*/
template <class T>
void socket_session<T>::set_message_max_size(std::uint64_t size)
{
ws.read_message_max(size);
}
//port and address will be used to identify from which remote party the message recieved in the handler
template <class T>
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session)
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts)
{
ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client;
if (sess_opts.max_message_size > 0)
{
// Setting maximum file size
set_message_max_size(sess_opts.max_message_size);
}
if (is_server_session)
{
/**
@@ -199,7 +222,7 @@ void socket_session<T>::on_accept(error_code ec)
if (ec)
return fail(ec, "accept");
sess_handler.on_connect(this);
sess_handler.on_connect(this);
// Read a message
ws.async_read(
@@ -235,7 +258,7 @@ void socket_session<T>::on_read(error_code ec, std::size_t)
// read and process the message and we will clear the buffer after its done with it.
const char *buffer_data = net::buffer_cast<const char *>(buffer.data());
std::string_view message(buffer_data, buffer.size());
sess_handler.on_message(this, message);
sess_handler.on_message(this, message);
// Clear the buffer
buffer.consume(buffer.size());

View File

@@ -53,6 +53,11 @@ net::io_context ioc;
*/
std::thread listener_thread;
/**
* Used to pass down the default settings to the socket session
*/
sock::session_options sess_opts;
// Challenge response fields.
// These fields are used on challenge response validation.
static const char *CHALLENGE_RESP_TYPE = "type";
@@ -251,13 +256,17 @@ int remove_user(const std::string &sessionid)
*/
void start_listening()
{
auto address = net::ip::make_address(conf::cfg.listenip);
sess_opts.max_message_size = conf::cfg.pubmaxsize;
std::make_shared<sock::socket_server<user_outbound_message>>(
ioc,
ctx,
tcp::endpoint{address, conf::cfg.pubport},
global_usr_session_handler)
global_usr_session_handler,
sess_opts)
->run();
listener_thread = std::thread([&] { ioc.run(); });