Added bad and duplicate message counters to socket_session (#55)

This commit is contained in:
Ravin Perera
2019-11-07 06:57:34 +05:30
committed by GitHub
parent 8a153d5bb5
commit d16b43406b
13 changed files with 210 additions and 215 deletions

View File

@@ -43,7 +43,6 @@ add_library(hpsock
src/sock/socket_client.cpp
src/sock/socket_server.cpp
src/sock/socket_message.cpp
src/sock/socket_monitor.cpp
src/sock/socket_session.cpp
src/sock/socket_session_lambda.cpp
)

View File

@@ -27,8 +27,8 @@ int init()
return -1;
// Append self peer to peer list.
std::string portstr = std::to_string(cfg.peerport);
std::string peerid = "0.0.0.0:" + portstr;
const std::string portstr = std::to_string(cfg.peerport);
const std::string peerid = "0.0.0.0:" + portstr;
cfg.peers.emplace(std::move(peerid), std::make_pair("0.0.0.0", portstr));
// Append self pubkey to unl list.
@@ -88,10 +88,6 @@ int create_contract()
cfg.peerport = 22860;
cfg.roundtime = 1000;
cfg.pubport = 8080;
cfg.pubmaxsize = 0;
cfg.pubmaxcpm = 0;
cfg.peermaxsize = 0;
cfg.peermaxcpm = 0;
#ifndef NDEBUG
cfg.loglevel = "debug";
@@ -221,10 +217,16 @@ int load_config()
cfg.peerport = d["peerport"].GetInt();
cfg.roundtime = d["roundtime"].GetInt();
cfg.pubport = d["pubport"].GetInt();
cfg.pubmaxsize = d["pubmaxsize"].GetInt();
cfg.pubmaxcpm = d["pubmaxcpm"].GetInt();
cfg.peermaxsize = d["peermaxsize"].GetInt();
cfg.peermaxcpm = d["peermaxcpm"].GetInt();
cfg.pubmaxsize = d["pubmaxsize"].GetUint64();
cfg.pubmaxcpm = d["pubmaxcpm"].GetUint64();
cfg.pubmaxbadmpm = d["pubmaxbadmpm"].GetUint64();
cfg.peermaxsize = d["peermaxsize"].GetUint64();
cfg.peermaxcpm = d["peermaxcpm"].GetUint64();
cfg.peermaxdupmpm = d["peermaxdupmpm"].GetUint64();
cfg.peermaxbadmpm = d["peermaxbadmpm"].GetUint64();
cfg.peermaxbadsigpm = d["peermaxbadsigpm"].GetUint64();
cfg.loglevel = d["loglevel"].GetString();
cfg.loggers.clear();
@@ -257,7 +259,7 @@ int save_config()
d.AddMember("listenip", rapidjson::StringRef(cfg.listenip.data()), allocator);
rapidjson::Value peers(rapidjson::kArrayType);
for (auto &[ipport_concat, ipport_pair] : cfg.peers)
for (const auto &[ipport_concat, ipport_pair] : cfg.peers)
{
rapidjson::Value v;
v.SetString(rapidjson::StringRef(ipport_concat.data()), allocator);
@@ -266,7 +268,7 @@ int save_config()
d.AddMember("peers", peers, allocator);
rapidjson::Value unl(rapidjson::kArrayType);
for (auto &nodepk : cfg.unl)
for (const auto &nodepk : cfg.unl)
{
rapidjson::Value v;
std::string hex_pubkey;
@@ -286,14 +288,20 @@ int save_config()
d.AddMember("peerport", cfg.peerport, allocator);
d.AddMember("roundtime", cfg.roundtime, allocator);
d.AddMember("pubport", cfg.pubport, allocator);
d.AddMember("pubmaxsize", cfg.pubmaxsize, allocator);
d.AddMember("pubmaxcpm", cfg.pubmaxcpm, allocator);
d.AddMember("pubmaxbadmpm", cfg.pubmaxbadmpm, allocator);
d.AddMember("peermaxsize", cfg.peermaxsize, allocator);
d.AddMember("peermaxcpm", cfg.peermaxcpm, allocator);
d.AddMember("peermaxdupmpm", cfg.peermaxdupmpm, allocator);
d.AddMember("peermaxbadmpm", cfg.peermaxbadmpm, allocator);
d.AddMember("peermaxbadsigpm", cfg.peermaxbadsigpm, allocator);
d.AddMember("loglevel", rapidjson::StringRef(cfg.loglevel.data()), allocator);
rapidjson::Value loggers(rapidjson::kArrayType);
for (const std::string &logger : cfg.loggers)
for (std::string_view logger : cfg.loggers)
{
rapidjson::Value v;
v.SetString(rapidjson::StringRef(logger.data()), allocator);
@@ -424,8 +432,8 @@ int validate_config()
}
//Sign and verify a sample message to ensure we have a matching signing key pair.
std::string msg = "hotpocket";
std::string sighex = crypto::sign_hex(msg, cfg.seckeyhex);
const std::string msg = "hotpocket";
const std::string sighex = crypto::sign_hex(msg, cfg.seckeyhex);
if (crypto::verify_hex(msg, sighex, cfg.pubkeyhex) != 0)
{
std::cout << "Invalid signing keys. Run with 'rekey' to generate new keys.\n";
@@ -442,9 +450,9 @@ int validate_config()
*/
int validate_contract_dir_paths()
{
std::string paths[6] = {ctx.contractDir, ctx.configFile, ctx.histDir, ctx.stateDir, ctx.tlsKeyFile, ctx.tlsCertFile};
const std::string paths[6] = {ctx.contractDir, ctx.configFile, ctx.histDir, ctx.stateDir, ctx.tlsKeyFile, ctx.tlsCertFile};
for (std::string &path : paths)
for (const std::string &path : paths)
{
if (!boost::filesystem::exists(path))
{
@@ -471,14 +479,15 @@ int validate_contract_dir_paths()
*
* @return 0 for successful validation. -1 for failure.
*/
int is_schema_valid(rapidjson::Document &d)
int is_schema_valid(const rapidjson::Document &d)
{
const char *cfg_schema =
"{"
"\"type\": \"object\","
"\"required\": [ \"version\", \"pubkeyhex\", \"seckeyhex\", \"binary\", \"binargs\", \"listenip\""
", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxcpm\""
", \"peermaxsize\", \"peermaxcpm\", \"loglevel\", \"loggers\" ],"
", \"pubmaxbadmpm\", \"peermaxsize\", \"peermaxcpm\""
", \"peermaxdupmpm\", \"peermaxbadmpm\", \"peermaxbadsigpm\", \"loglevel\", \"loggers\" ],"
"\"properties\": {"
"\"version\": { \"type\": \"string\" },"
"\"pubkeyhex\": { \"type\": \"string\" },"
@@ -497,10 +506,17 @@ int is_schema_valid(rapidjson::Document &d)
"\"peerport\": { \"type\": \"integer\" },"
"\"roundtime\": { \"type\": \"integer\" },"
"\"pubport\": { \"type\": \"integer\" },"
"\"pubmaxsize\": { \"type\": \"integer\" },"
"\"pubmaxcpm\": { \"type\": \"integer\" },"
"\"pubmaxbadmpm\": { \"type\": \"integer\" },"
"\"peermaxsize\": { \"type\": \"integer\" },"
"\"peermaxcpm\": { \"type\": \"integer\" },"
"\"peermaxdupmpm\": { \"type\": \"integer\" },"
"\"peermaxbadmpm\": { \"type\": \"integer\" },"
"\"peermaxbadsigpm\": { \"type\": \"integer\" },"
"\"loglevel\": { \"type\": \"string\" },"
"\"loggers\": {"
"\"type\": \"array\","

View File

@@ -46,13 +46,20 @@ struct contract_config
std::string listenip; // The IPs to listen on for incoming connections
std::unordered_map<std::string, ip_port_pair> peers; // Map of peers keyed by "<ip address>:<port>" concatenated format
std::unordered_set<std::string> unl; // Unique node list (list of binary public keys)
uint16_t peerport; // Listening port for peer connections
uint16_t roundtime; // Consensus round time in ms
uint16_t pubport; // Listening port for public user connections
uint64_t pubmaxsize; // User message max size in bytes
uint16_t pubmaxcpm; // User message rate (characters(bytes) per minute)
uint64_t peermaxsize; // Peer message max size in bytes
uint64_t peermaxcpm; // Peer message rate (characters(bytes) per minute)
uint16_t peerport; // Listening port for peer connections
uint16_t roundtime; // Consensus round time in ms
uint16_t pubport; // Listening port for public user connections
uint64_t pubmaxsize; // User message max size in bytes
uint64_t pubmaxcpm; // User message rate (characters(bytes) per minute)
uint64_t pubmaxbadmpm; // User bad messages per minute
uint64_t peermaxsize; // Peer message max size in bytes
uint64_t peermaxcpm; // Peer message rate (characters(bytes) per minute)
uint64_t peermaxdupmpm; // Peer max duplicate messages per minute
uint64_t peermaxbadmpm; // Peer bad messages per minute
uint64_t peermaxbadsigpm; // Peer bad signatures per minute
std::string loglevel; // Log severity level (debug, info, warn, error)
std::unordered_set<std::string> loggers; // List of enabled loggers (console, file)
};
@@ -83,7 +90,7 @@ int validate_config();
int validate_contract_dir_paths();
int is_schema_valid(rapidjson::Document &d);
int is_schema_valid(const rapidjson::Document &d);
int binpair_to_hex();

View File

@@ -53,7 +53,7 @@ std::thread peer_thread;
/**
* Used to pass down the default settings to the socket session
*/
sock::session_options sess_opts;
sock::session_options default_sess_opts;
int init()
{
@@ -67,10 +67,12 @@ void start_peer_connections()
{
auto address = net::ip::make_address(conf::cfg.listenip);
//setting up the message max size. Retrieve it from config
// At the moment same settings are used to initialize a new server and client
sess_opts.max_message_size = conf::cfg.peermaxsize;
sess_opts.max_bytes_per_minute = conf::cfg.peermaxcpm;
// Setting up the message max size. Retrieve it from config
default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize;
default_sess_opts.max_rawbytes_per_minute = conf::cfg.peermaxcpm;
default_sess_opts.max_dupmsgs_per_minute = conf::cfg.peermaxdupmpm;
default_sess_opts.max_badmsgs_per_minute = conf::cfg.peermaxbadmpm;
default_sess_opts.max_badsigmsgs_per_minute = conf::cfg.peermaxbadsigpm;
// Start listening to peers
std::make_shared<sock::socket_server<peer_outbound_message>>(
@@ -78,7 +80,7 @@ void start_peer_connections()
ctx,
tcp::endpoint{address, conf::cfg.peerport},
global_peer_session_handler,
sess_opts)
default_sess_opts)
->run();
LOG_INFO << "Started listening for incoming peer connections on " << conf::cfg.listenip << ":" << conf::cfg.peerport;
@@ -101,7 +103,7 @@ void peer_connection_watchdog()
if (peer_connections.find(v.first) == peer_connections.end())
{
LOG_DBG << "Trying to connect :" << v.second.first << ":" << v.second.second;
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, ctx, global_peer_session_handler, sess_opts)
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, ctx, global_peer_session_handler, default_sess_opts)
->run(v.second.first, v.second.second);
}
}

View File

@@ -7,6 +7,7 @@
#include "../fbschema/p2pmsg_content_generated.h"
#include "../fbschema/p2pmsg_helpers.hpp"
#include "../sock/socket_message.hpp"
#include "../sock/socket_session.hpp"
#include "p2p.hpp"
#include "peer_session_handler.hpp"
@@ -23,14 +24,10 @@ util::rollover_hashset recent_peermsg_hashes(200);
*/
void peer_session_handler::on_connect(sock::socket_session<peer_outbound_message> *session)
{
if (!session->flags[util::SESSION_FLAG::INBOUND])
if (!session->flags[sock::SESSION_FLAG::INBOUND])
{
// We init the session unique id to associate with the peer.
session->init_uniqueid();
{
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
peer_connections.insert(std::make_pair(session->uniqueid, session));
}
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
peer_connections.insert(std::make_pair(session->uniqueid, session));
LOG_DBG << "Adding peer to list: " << session->uniqueid;
}
}
@@ -56,6 +53,7 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message)))
{
session->increment_metric(sock::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1);
LOG_DBG << "Duplicate peer message.";
return;
}
@@ -67,6 +65,7 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
// We only trust proposals coming from trusted peers.
if (p2pmsg::validate_container_trust(container) != 0)
{
session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DBG << "Proposal rejected due to trust failure.";
return;
}
@@ -91,9 +90,8 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
}
else
{
//warn received invalid message from peer.
session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
LOG_DBG << "Received invalid message type from peer";
//TODO: remove/penalize node who sent the message.
}
}

View File

@@ -1,31 +0,0 @@
#include "socket_monitor.hpp"
#include "../p2p/peer_session_handler.hpp"
#include "../usr/user_session_handler.hpp"
namespace sock
{
/**
* Act upon exceeding various thresholds in socket communication
*
* @param threshold Type of threshold which has exceeded.
* @param threshold_limit Threshold limit at the time of exceedance.
* @param session Websocket session which exceeds the threshold.
*/
template <class T>
void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<T> *session)
{
if (threshold == util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE)
{
// Can act accordingly
session->close();
}
}
/**
* Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time
*/
template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<p2p::peer_outbound_message> *session);
template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<usr::user_outbound_message> *session);
} // namespace sock

View File

@@ -1,13 +0,0 @@
#ifndef _HP_SOCK_MONITOR_
#define _HP_SOCK_MONITOR_
#include "../util.hpp"
#include "socket_session.hpp"
namespace sock{
template <class T>
void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session<T> *session);
}
#endif

View File

@@ -1,11 +1,11 @@
#include "socket_session.hpp"
#include "socket_message.hpp"
#include "socket_monitor.hpp"
#include "socket_session_handler.hpp"
namespace sock
{
// Constructor
template <class T>
socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp_stream>> websocket, socket_session_handler<T> &sess_handler)
: ws(std::move(websocket)), sess_handler(sess_handler)
@@ -14,18 +14,13 @@ socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp
ws.binary(true);
}
template <class T>
socket_session<T>::~socket_session()
{
sess_handler.on_close(this);
}
/**
* Sets the largest permissible incoming message size. If exceeds over this limit will cause a
* protocol failure
* Sets the largest permissible incoming data length in a single receive. If exceeds over this limit will cause
* a protocol failure. Because this is internally handled by beast socket, we don't use socket_threshold struct
* to handle this.
*/
template <class T>
void socket_session<T>::set_message_max_size(uint64_t size)
void socket_session<T>::set_max_socket_read_len(uint64_t size)
{
ws.read_message_max(size);
}
@@ -34,28 +29,72 @@ void socket_session<T>::set_message_max_size(uint64_t size)
* Set thresholds to the socket session
*/
template <class T>
void socket_session<T>::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms)
void socket_session<T>::set_threshold(SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint32_t intervalms)
{
thresholds[threshold_type].counter_value = 0;
thresholds[threshold_type].intervalms = intervalms;
thresholds[threshold_type].threshold_limit = threshold_limit;
}
/*
* Increment the provided thresholds counter value with the provided amount and validate it against the
* configured threshold limit.
*/
template <class T>
void socket_session<T>::increment_metric(SESSION_THRESHOLDS threshold_type, uint64_t amount)
{
sock::session_threshold &t = thresholds[threshold_type];
// Ignore the counter if limit is set as 0.
if (t.threshold_limit == 0)
return;
uint64_t time_now = util::get_epoch_milliseconds();
t.counter_value += amount;
if (t.timestamp == 0)
{
// Reset counter timestamp.
t.timestamp = time_now;
}
else
{
// Check whether we have exceeded the threshold within the monitering interval.
auto elapsed_time = time_now - t.timestamp;
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
{
t.timestamp = 0;
t.counter_value = 0;
LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")";
this->close();
}
else if (elapsed_time > t.intervalms)
{
t.timestamp = time_now;
t.counter_value = amount;
}
}
}
//port and address will be used to identify from which remote party the message recieved in the handler
template <class T>
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts)
{
if (sess_opts.max_message_size > 0)
if (sess_opts.max_socket_read_len > 0)
{
// Setting maximum file size
set_message_max_size(sess_opts.max_message_size);
// Setting maximum data size within a single message. This is handled within the beast socket.
set_max_socket_read_len(sess_opts.max_socket_read_len);
}
// Create new session_threshold and insert it to thresholds array
// Create new session_thresholds and insert it to thresholds vector.
// Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector
// since enum's value is used as index in the vector to update vector values
session_threshold max_byte_per_message_threshold{sess_opts.max_bytes_per_minute, 0, 0, 60000};
thresholds.push_back(std::move(max_byte_per_message_threshold));
// since enum's value is used as index in the vector to update vector values.
thresholds.reserve(4);
thresholds.push_back(session_threshold(sess_opts.max_rawbytes_per_minute, 60000));
thresholds.push_back(session_threshold(sess_opts.max_dupmsgs_per_minute, 60000));
thresholds.push_back(session_threshold(sess_opts.max_badsigmsgs_per_minute, 60000));
thresholds.push_back(session_threshold(sess_opts.max_badmsgs_per_minute, 60000));
ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client;
if (is_server_session)
@@ -65,12 +104,17 @@ void socket_session<T>::run(const std::string &&address, const std::string &&por
* INBOUND true - when node acts as server
* INBOUND false (OUTBOUND) - when node acts as client
*/
flags.set(util::SESSION_FLAG::INBOUND);
flags.set(SESSION_FLAG::INBOUND);
handshake_type = ssl::stream_base::server;
}
this->port = port;
this->address = address;
this->port = std::move(port);
this->address = std::move(address);
// Create a unique id for the session combining ip and port.
// We prepare this appended string here because we need to use it as an identifier of the session in various places.
this->uniqueid.reserve(port.size() + address.size() + 1);
this->uniqueid.append(address).append(":").append(port);
// Set the timeout.
beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30));
@@ -92,7 +136,7 @@ void socket_session<T>::on_ssl_handshake(error_code ec)
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws).expires_never();
if (flags[util::SESSION_FLAG::INBOUND])
if (flags[SESSION_FLAG::INBOUND])
{
// Set suggested timeout settings for the websocket
ws.set_option(
@@ -149,7 +193,7 @@ void socket_session<T>::on_read(error_code ec, std::size_t)
return fail(ec, "read");
}
increment(util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE, buffer.size());
increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, buffer.size());
// Wrap the buffer data in a string_view and call session handler.
// We DO NOT transfer ownership of buffer data to the session handler. It should
@@ -165,46 +209,6 @@ void socket_session<T>::on_read(error_code ec, std::size_t)
ws_async_read();
}
/*
* Increment the provided thresholds counter value with the provided amount and validate it
*/
template <class T>
void socket_session<T>::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount)
{
sock::session_threshold &t = thresholds[threshold_type];
// Ignore the counter if limit is set as 0.
if (t.threshold_limit == 0)
return;
uint64_t time_now = util::get_epoch_milliseconds();
t.counter_value += amount;
if (t.timestamp == 0)
{
t.timestamp = time_now;
}
else
{
auto elapsed_time = time_now - t.timestamp;
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
{
t.timestamp = 0;
t.counter_value = 0;
LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")";
// Invoke the threshold monitor so any actions will be performed.
threshold_monitor(threshold_type, t.threshold_limit, this);
}
else if (elapsed_time > t.intervalms)
{
t.timestamp = time_now;
t.counter_value = amount;
}
}
}
/*
* Send message through an active websocket connection
*/
@@ -269,16 +273,6 @@ void socket_session<T>::on_close(error_code ec, int8_t type)
return fail(ec, "close");
}
// When called, initializes the unique id string for this session.
template <class T>
void socket_session<T>::init_uniqueid()
{
// Create a unique id for the session combining ip and port.
// We prepare this appended string here because we need to use it for finding elemends from the maps
// for validation purposes whenever a message is received.
uniqueid.append(address).append(":").append(port);
}
/**
* Executes on error
*/
@@ -293,6 +287,12 @@ void socket_session<T>::fail(error_code ec, char const *what)
return;
}
template <class T>
socket_session<T>::~socket_session()
{
sess_handler.on_close(this);
}
// Template instantiations.
template class socket_session<p2p::peer_outbound_message>;
template class socket_session<usr::user_outbound_message>;

View File

@@ -15,6 +15,36 @@ using error_code = boost::system::error_code;
namespace sock
{
/**
* Set of flags used to mark status information on the session.
* usr and p2p subsystems makes use of this to mark status information of user and peer sessions.
* Set flags are stored in 'flags' bitset of socket_session.
*/
enum SESSION_FLAG
{
INBOUND = 0,
USER_CHALLENGE_ISSUED = 1,
USER_AUTHED = 2
};
/**
* Enum used to track down various thresholds used in socket communication.
*/
enum SESSION_THRESHOLDS
{
// Max incoming bytes per minute.
MAX_RAWBYTES_PER_MINUTE = 0,
// Max duplicate messages per minute.
MAX_DUPMSGS_PER_MINUTE = 1,
// Max messages with invalid signature per minute.
MAX_BADSIGMSGS_PER_MINUTE = 2,
// Max messages with bad structure per minute.
MAX_BADMSGS_PER_MINUTE = 3
};
/*
* Use this to keep in track of different thresholds which we need to deal with. e.g - maximum amount of bytes allowed per minute through a session
* threshold_limit - Maximum threshold value which is allowed
@@ -27,14 +57,23 @@ struct session_threshold
uint64_t threshold_limit;
uint64_t counter_value;
uint64_t timestamp;
uint64_t intervalms;
uint32_t intervalms;
session_threshold(uint64_t threshold_limit, uint32_t intervalms)
{
this->threshold_limit = threshold_limit;
this->intervalms = intervalms;
}
};
// Use this to feed the session with default options from the config file
struct session_options
{
uint64_t max_message_size;
uint64_t max_bytes_per_minute;
uint64_t max_socket_read_len;
uint64_t max_rawbytes_per_minute;
uint64_t max_dupmsgs_per_minute;
uint64_t max_badsigmsgs_per_minute;
uint64_t max_badmsgs_per_minute;
};
//Forward Declaration
@@ -51,7 +90,7 @@ class socket_session : public std::enable_shared_from_this<socket_session<T>>
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws; // websocket stream used send an recieve messages
std::vector<T> queue; // used to store messages temporarily until it is sent to the relevant party
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
std::vector<session_threshold> thresholds; // track down various thresholdsls
std::vector<session_threshold> thresholds; // track down various communication thresholds
void fail(error_code ec, char const *what);
@@ -101,7 +140,7 @@ public:
// The unique identifier of the remote party (format <ip>:<port>).
std::string uniqueid;
// The set of util::SESSION_FLAG enum flags that will be set by user-code of this calss.
// The set of sock::SESSION_FLAG enum flags that will be set by user-code of this calss.
// We mainly use this to store contexual information about this session based on the use case.
// Setting and reading flags to this is completely managed by user-code.
std::bitset<8> flags;
@@ -110,14 +149,11 @@ public:
void send(T msg);
void set_message_max_size(uint64_t size);
void set_max_socket_read_len(uint64_t size);
void set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t interval);
void set_threshold(SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint32_t intervalms);
void increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount);
// When called, initializes the unique id string for this session.
void init_uniqueid();
void increment_metric(SESSION_THRESHOLDS threshold_type, uint64_t amount);
void close();
};

View File

@@ -1,5 +1,4 @@
#include "../pchheader.hpp"
#include "../util.hpp"
#include "../hplog.hpp"
#include "../jsonschema/usrmsg_helpers.hpp"
#include "../sock/socket_session.hpp"
@@ -23,14 +22,11 @@ void user_session_handler::on_connect(sock::socket_session<user_outbound_message
// As soon as a user connects, we issue them a challenge message. We remember the
// challenge we issued and later verifies the user's response with it.
// We init the session unique id to associate with the challenge.
session->init_uniqueid();
user_outbound_message outmsg(issue_challenge(session->uniqueid));
session->send(std::move(outmsg));
// Set the challenge-issued flag to help later checks in on_message.
session->flags.set(util::SESSION_FLAG::USER_CHALLENGE_ISSUED);
session->flags.set(sock::SESSION_FLAG::USER_CHALLENGE_ISSUED);
}
/**
@@ -42,13 +38,13 @@ void user_session_handler::on_message(
{
// First check whether this session is pending challenge.
// Meaning we have previously issued a challenge to the client,
if (session->flags[util::SESSION_FLAG::USER_CHALLENGE_ISSUED])
if (session->flags[sock::SESSION_FLAG::USER_CHALLENGE_ISSUED])
{
if (verify_challenge(message, session) == 0)
return;
}
// Check whether this session belongs to an authenticated (challenge-verified) user.
else if (session->flags[util::SESSION_FLAG::USER_AUTHED])
else if (session->flags[sock::SESSION_FLAG::USER_AUTHED])
{
// Check whether this user is among authenticated users
// and perform authenticated msg processing.
@@ -58,16 +54,19 @@ void user_session_handler::on_message(
{
// This is an authed user.
connected_user &user = itr->second;
if (handle_user_message(user, message) == 0)
return;
LOG_DBG << "Bad message from user " << session->uniqueid;
// TODO: Increase session bad message count.
if (handle_user_message(user, message) != 0)
{
session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
LOG_DBG << "Bad message from user " << session->uniqueid;
}
}
else
{
session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
LOG_DBG << "User session id not found: " << session->uniqueid;
}
return;
}
// If for any reason we reach this point, we should drop the connection because none of the
@@ -84,11 +83,11 @@ void user_session_handler::on_close(sock::socket_session<user_outbound_message>
// Cleanup any resources related to this session.
// Session is awaiting challenge response.
if (session->flags[util::SESSION_FLAG::USER_CHALLENGE_ISSUED])
if (session->flags[sock::SESSION_FLAG::USER_CHALLENGE_ISSUED])
ctx.pending_challenges.erase(session->uniqueid);
// Session belongs to an authed user.
else if (session->flags[util::SESSION_FLAG::USER_AUTHED])
else if (session->flags[sock::SESSION_FLAG::USER_AUTHED])
remove_user(session->uniqueid);
LOG_INFO << "User disconnected " << session->uniqueid;

View File

@@ -1,6 +1,7 @@
#include "../pchheader.hpp"
#include "../jsonschema/usrmsg_helpers.hpp"
#include "../sock/socket_server.hpp"
#include "../sock/socket_session.hpp"
#include "../sock/socket_session_handler.hpp"
#include "../util.hpp"
#include "../conf.hpp"
@@ -81,8 +82,8 @@ int verify_challenge(std::string_view message, sock::socket_session<user_outboun
// All good. Unique public key.
// Promote the connection from pending-challenges to authenticated users.
session->flags.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag
session->flags.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag
session->flags.reset(sock::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag
session->flags.set(sock::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag
add_user(session, userpubkey); // Add the user to the global authed user list
ctx.pending_challenges.erase(session->uniqueid); // Remove the stored challenge
@@ -203,15 +204,16 @@ void start_listening()
{
auto address = net::ip::make_address(conf::cfg.listenip);
listener_ctx.sess_opts.max_message_size = conf::cfg.pubmaxsize;
listener_ctx.sess_opts.max_bytes_per_minute = conf::cfg.pubmaxcpm;
listener_ctx.default_sess_opts.max_socket_read_len = conf::cfg.pubmaxsize;
listener_ctx.default_sess_opts.max_rawbytes_per_minute = conf::cfg.pubmaxcpm;
listener_ctx.default_sess_opts.max_badmsgs_per_minute = conf::cfg.pubmaxbadmpm;
std::make_shared<sock::socket_server<user_outbound_message>>(
listener_ctx.ioc,
listener_ctx.ssl_ctx,
tcp::endpoint{address, conf::cfg.pubport},
listener_ctx.global_usr_session_handler,
listener_ctx.sess_opts)
listener_ctx.default_sess_opts)
->run();
listener_ctx.listener_thread = std::thread([&] { listener_ctx.ioc.run(); });

View File

@@ -79,7 +79,7 @@ struct listener_context
std::thread listener_thread;
// Used to pass down the default settings to the socket session
sock::session_options sess_opts;
sock::session_options default_sess_opts;
};
int init();

View File

@@ -13,36 +13,16 @@ namespace util
// Hot Pocket version. Displayed on 'hotpocket version' and written to new contract configs.
static const char *HP_VERSION = "0.1";
// Current version of the peer message protocol.
static const int PEERMSG_VERSION = 1;
// Minimum compatible contract config version (this will be used to validate contract configs)
static const char *MIN_CONTRACT_VERSION = "0.1";
// Current version of the peer message protocol.
static const int PEERMSG_VERSION = 1;
// Minimum compatible peer message version (this will be used to accept/reject incoming peer connections)
// (Keeping this as int for effcient msg payload and comparison)
static const int MIN_PEERMSG_VERSION = 1;
/**
* Set of flags used to mark status information on the session.
* usr and p2p subsystems makes use of this to mark status information of user and peer sessions.
* Set flags are stored in 'flags_' bitset.
*/
enum SESSION_FLAG
{
INBOUND = 0,
USER_CHALLENGE_ISSUED = 1,
USER_AUTHED = 2
};
/**
* Enum used to track down various thresholds used in socket communication
*/
enum SESSION_THRESHOLDS
{
MAX_BYTES_PER_MINUTE = 0
};
/**
* FIFO hash set with a max size.
*/