diff --git a/CMakeLists.txt b/CMakeLists.txt index 855e86cf..32acdda5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,6 @@ add_library(hpsock src/sock/socket_client.cpp src/sock/socket_server.cpp src/sock/socket_message.cpp - src/sock/socket_monitor.cpp src/sock/socket_session.cpp src/sock/socket_session_lambda.cpp ) diff --git a/src/conf.cpp b/src/conf.cpp index a526a12d..c8eb29a7 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -27,8 +27,8 @@ int init() return -1; // Append self peer to peer list. - std::string portstr = std::to_string(cfg.peerport); - std::string peerid = "0.0.0.0:" + portstr; + const std::string portstr = std::to_string(cfg.peerport); + const std::string peerid = "0.0.0.0:" + portstr; cfg.peers.emplace(std::move(peerid), std::make_pair("0.0.0.0", portstr)); // Append self pubkey to unl list. @@ -88,10 +88,6 @@ int create_contract() cfg.peerport = 22860; cfg.roundtime = 1000; cfg.pubport = 8080; - cfg.pubmaxsize = 0; - cfg.pubmaxcpm = 0; - cfg.peermaxsize = 0; - cfg.peermaxcpm = 0; #ifndef NDEBUG cfg.loglevel = "debug"; @@ -221,10 +217,16 @@ int load_config() cfg.peerport = d["peerport"].GetInt(); cfg.roundtime = d["roundtime"].GetInt(); cfg.pubport = d["pubport"].GetInt(); - cfg.pubmaxsize = d["pubmaxsize"].GetInt(); - cfg.pubmaxcpm = d["pubmaxcpm"].GetInt(); - cfg.peermaxsize = d["peermaxsize"].GetInt(); - cfg.peermaxcpm = d["peermaxcpm"].GetInt(); + + cfg.pubmaxsize = d["pubmaxsize"].GetUint64(); + cfg.pubmaxcpm = d["pubmaxcpm"].GetUint64(); + cfg.pubmaxbadmpm = d["pubmaxbadmpm"].GetUint64(); + + cfg.peermaxsize = d["peermaxsize"].GetUint64(); + cfg.peermaxcpm = d["peermaxcpm"].GetUint64(); + cfg.peermaxdupmpm = d["peermaxdupmpm"].GetUint64(); + cfg.peermaxbadmpm = d["peermaxbadmpm"].GetUint64(); + cfg.peermaxbadsigpm = d["peermaxbadsigpm"].GetUint64(); cfg.loglevel = d["loglevel"].GetString(); cfg.loggers.clear(); @@ -257,7 +259,7 @@ int save_config() d.AddMember("listenip", rapidjson::StringRef(cfg.listenip.data()), allocator); rapidjson::Value peers(rapidjson::kArrayType); - for (auto &[ipport_concat, ipport_pair] : cfg.peers) + for (const auto &[ipport_concat, ipport_pair] : cfg.peers) { rapidjson::Value v; v.SetString(rapidjson::StringRef(ipport_concat.data()), allocator); @@ -266,7 +268,7 @@ int save_config() d.AddMember("peers", peers, allocator); rapidjson::Value unl(rapidjson::kArrayType); - for (auto &nodepk : cfg.unl) + for (const auto &nodepk : cfg.unl) { rapidjson::Value v; std::string hex_pubkey; @@ -286,14 +288,20 @@ int save_config() d.AddMember("peerport", cfg.peerport, allocator); d.AddMember("roundtime", cfg.roundtime, allocator); d.AddMember("pubport", cfg.pubport, allocator); + d.AddMember("pubmaxsize", cfg.pubmaxsize, allocator); d.AddMember("pubmaxcpm", cfg.pubmaxcpm, allocator); + d.AddMember("pubmaxbadmpm", cfg.pubmaxbadmpm, allocator); + d.AddMember("peermaxsize", cfg.peermaxsize, allocator); d.AddMember("peermaxcpm", cfg.peermaxcpm, allocator); + d.AddMember("peermaxdupmpm", cfg.peermaxdupmpm, allocator); + d.AddMember("peermaxbadmpm", cfg.peermaxbadmpm, allocator); + d.AddMember("peermaxbadsigpm", cfg.peermaxbadsigpm, allocator); d.AddMember("loglevel", rapidjson::StringRef(cfg.loglevel.data()), allocator); rapidjson::Value loggers(rapidjson::kArrayType); - for (const std::string &logger : cfg.loggers) + for (std::string_view logger : cfg.loggers) { rapidjson::Value v; v.SetString(rapidjson::StringRef(logger.data()), allocator); @@ -424,8 +432,8 @@ int validate_config() } //Sign and verify a sample message to ensure we have a matching signing key pair. - std::string msg = "hotpocket"; - std::string sighex = crypto::sign_hex(msg, cfg.seckeyhex); + const std::string msg = "hotpocket"; + const std::string sighex = crypto::sign_hex(msg, cfg.seckeyhex); if (crypto::verify_hex(msg, sighex, cfg.pubkeyhex) != 0) { std::cout << "Invalid signing keys. Run with 'rekey' to generate new keys.\n"; @@ -442,9 +450,9 @@ int validate_config() */ int validate_contract_dir_paths() { - std::string paths[6] = {ctx.contractDir, ctx.configFile, ctx.histDir, ctx.stateDir, ctx.tlsKeyFile, ctx.tlsCertFile}; + const std::string paths[6] = {ctx.contractDir, ctx.configFile, ctx.histDir, ctx.stateDir, ctx.tlsKeyFile, ctx.tlsCertFile}; - for (std::string &path : paths) + for (const std::string &path : paths) { if (!boost::filesystem::exists(path)) { @@ -471,14 +479,15 @@ int validate_contract_dir_paths() * * @return 0 for successful validation. -1 for failure. */ -int is_schema_valid(rapidjson::Document &d) +int is_schema_valid(const rapidjson::Document &d) { const char *cfg_schema = "{" "\"type\": \"object\"," "\"required\": [ \"version\", \"pubkeyhex\", \"seckeyhex\", \"binary\", \"binargs\", \"listenip\"" ", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxcpm\"" - ", \"peermaxsize\", \"peermaxcpm\", \"loglevel\", \"loggers\" ]," + ", \"pubmaxbadmpm\", \"peermaxsize\", \"peermaxcpm\"" + ", \"peermaxdupmpm\", \"peermaxbadmpm\", \"peermaxbadsigpm\", \"loglevel\", \"loggers\" ]," "\"properties\": {" "\"version\": { \"type\": \"string\" }," "\"pubkeyhex\": { \"type\": \"string\" }," @@ -497,10 +506,17 @@ int is_schema_valid(rapidjson::Document &d) "\"peerport\": { \"type\": \"integer\" }," "\"roundtime\": { \"type\": \"integer\" }," "\"pubport\": { \"type\": \"integer\" }," + "\"pubmaxsize\": { \"type\": \"integer\" }," "\"pubmaxcpm\": { \"type\": \"integer\" }," + "\"pubmaxbadmpm\": { \"type\": \"integer\" }," + "\"peermaxsize\": { \"type\": \"integer\" }," "\"peermaxcpm\": { \"type\": \"integer\" }," + "\"peermaxdupmpm\": { \"type\": \"integer\" }," + "\"peermaxbadmpm\": { \"type\": \"integer\" }," + "\"peermaxbadsigpm\": { \"type\": \"integer\" }," + "\"loglevel\": { \"type\": \"string\" }," "\"loggers\": {" "\"type\": \"array\"," diff --git a/src/conf.hpp b/src/conf.hpp index 00c4e0dc..ea67cc09 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -46,13 +46,20 @@ struct contract_config std::string listenip; // The IPs to listen on for incoming connections std::unordered_map peers; // Map of peers keyed by ":" concatenated format std::unordered_set unl; // Unique node list (list of binary public keys) - uint16_t peerport; // Listening port for peer connections - uint16_t roundtime; // Consensus round time in ms - uint16_t pubport; // Listening port for public user connections - uint64_t pubmaxsize; // User message max size in bytes - uint16_t pubmaxcpm; // User message rate (characters(bytes) per minute) - uint64_t peermaxsize; // Peer message max size in bytes - uint64_t peermaxcpm; // Peer message rate (characters(bytes) per minute) + uint16_t peerport; // Listening port for peer connections + uint16_t roundtime; // Consensus round time in ms + uint16_t pubport; // Listening port for public user connections + + uint64_t pubmaxsize; // User message max size in bytes + uint64_t pubmaxcpm; // User message rate (characters(bytes) per minute) + uint64_t pubmaxbadmpm; // User bad messages per minute + + uint64_t peermaxsize; // Peer message max size in bytes + uint64_t peermaxcpm; // Peer message rate (characters(bytes) per minute) + uint64_t peermaxdupmpm; // Peer max duplicate messages per minute + uint64_t peermaxbadmpm; // Peer bad messages per minute + uint64_t peermaxbadsigpm; // Peer bad signatures per minute + std::string loglevel; // Log severity level (debug, info, warn, error) std::unordered_set loggers; // List of enabled loggers (console, file) }; @@ -83,7 +90,7 @@ int validate_config(); int validate_contract_dir_paths(); -int is_schema_valid(rapidjson::Document &d); +int is_schema_valid(const rapidjson::Document &d); int binpair_to_hex(); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 7eee847c..dcb2892a 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -53,7 +53,7 @@ std::thread peer_thread; /** * Used to pass down the default settings to the socket session */ -sock::session_options sess_opts; +sock::session_options default_sess_opts; int init() { @@ -67,10 +67,12 @@ void start_peer_connections() { auto address = net::ip::make_address(conf::cfg.listenip); - //setting up the message max size. Retrieve it from config - // At the moment same settings are used to initialize a new server and client - sess_opts.max_message_size = conf::cfg.peermaxsize; - sess_opts.max_bytes_per_minute = conf::cfg.peermaxcpm; + // Setting up the message max size. Retrieve it from config + default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize; + default_sess_opts.max_rawbytes_per_minute = conf::cfg.peermaxcpm; + default_sess_opts.max_dupmsgs_per_minute = conf::cfg.peermaxdupmpm; + default_sess_opts.max_badmsgs_per_minute = conf::cfg.peermaxbadmpm; + default_sess_opts.max_badsigmsgs_per_minute = conf::cfg.peermaxbadsigpm; // Start listening to peers std::make_shared>( @@ -78,7 +80,7 @@ void start_peer_connections() ctx, tcp::endpoint{address, conf::cfg.peerport}, global_peer_session_handler, - sess_opts) + default_sess_opts) ->run(); LOG_INFO << "Started listening for incoming peer connections on " << conf::cfg.listenip << ":" << conf::cfg.peerport; @@ -101,7 +103,7 @@ void peer_connection_watchdog() if (peer_connections.find(v.first) == peer_connections.end()) { LOG_DBG << "Trying to connect :" << v.second.first << ":" << v.second.second; - std::make_shared>(ioc, ctx, global_peer_session_handler, sess_opts) + std::make_shared>(ioc, ctx, global_peer_session_handler, default_sess_opts) ->run(v.second.first, v.second.second); } } diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 357a85fe..89adccac 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -7,6 +7,7 @@ #include "../fbschema/p2pmsg_content_generated.h" #include "../fbschema/p2pmsg_helpers.hpp" #include "../sock/socket_message.hpp" +#include "../sock/socket_session.hpp" #include "p2p.hpp" #include "peer_session_handler.hpp" @@ -23,14 +24,10 @@ util::rollover_hashset recent_peermsg_hashes(200); */ void peer_session_handler::on_connect(sock::socket_session *session) { - if (!session->flags[util::SESSION_FLAG::INBOUND]) + if (!session->flags[sock::SESSION_FLAG::INBOUND]) { - // We init the session unique id to associate with the peer. - session->init_uniqueid(); - { - std::lock_guard lock(p2p::peer_connections_mutex); - peer_connections.insert(std::make_pair(session->uniqueid, session)); - } + std::lock_guard lock(p2p::peer_connections_mutex); + peer_connections.insert(std::make_pair(session->uniqueid, session)); LOG_DBG << "Adding peer to list: " << session->uniqueid; } } @@ -56,6 +53,7 @@ void peer_session_handler::on_message(sock::socket_sessionincrement_metric(sock::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); LOG_DBG << "Duplicate peer message."; return; } @@ -67,6 +65,7 @@ void peer_session_handler::on_message(sock::socket_sessionincrement_metric(sock::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DBG << "Proposal rejected due to trust failure."; return; } @@ -91,9 +90,8 @@ void peer_session_handler::on_message(sock::socket_sessionincrement_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); LOG_DBG << "Received invalid message type from peer"; - //TODO: remove/penalize node who sent the message. } } diff --git a/src/sock/socket_monitor.cpp b/src/sock/socket_monitor.cpp deleted file mode 100644 index ca602c78..00000000 --- a/src/sock/socket_monitor.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include "socket_monitor.hpp" -#include "../p2p/peer_session_handler.hpp" -#include "../usr/user_session_handler.hpp" - -namespace sock -{ - -/** - * Act upon exceeding various thresholds in socket communication - * - * @param threshold Type of threshold which has exceeded. - * @param threshold_limit Threshold limit at the time of exceedance. - * @param session Websocket session which exceeds the threshold. - */ -template -void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session) -{ - if (threshold == util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE) - { - // Can act accordingly - session->close(); - } -} - -/** - * Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time - */ -template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session); - -template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session); -} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_monitor.hpp b/src/sock/socket_monitor.hpp deleted file mode 100644 index 066d1dcd..00000000 --- a/src/sock/socket_monitor.hpp +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef _HP_SOCK_MONITOR_ -#define _HP_SOCK_MONITOR_ - -#include "../util.hpp" -#include "socket_session.hpp" - -namespace sock{ - -template -void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session); -} - -#endif \ No newline at end of file diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index 026e823c..e45910ad 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -1,11 +1,11 @@ #include "socket_session.hpp" #include "socket_message.hpp" -#include "socket_monitor.hpp" #include "socket_session_handler.hpp" namespace sock { +// Constructor template socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler) : ws(std::move(websocket)), sess_handler(sess_handler) @@ -14,18 +14,13 @@ socket_session::socket_session(websocket::stream -socket_session::~socket_session() -{ - sess_handler.on_close(this); -} - /** - * Sets the largest permissible incoming message size. If exceeds over this limit will cause a - * protocol failure + * Sets the largest permissible incoming data length in a single receive. If exceeds over this limit will cause + * a protocol failure. Because this is internally handled by beast socket, we don't use socket_threshold struct + * to handle this. */ template -void socket_session::set_message_max_size(uint64_t size) +void socket_session::set_max_socket_read_len(uint64_t size) { ws.read_message_max(size); } @@ -34,28 +29,72 @@ void socket_session::set_message_max_size(uint64_t size) * Set thresholds to the socket session */ template -void socket_session::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms) +void socket_session::set_threshold(SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint32_t intervalms) { thresholds[threshold_type].counter_value = 0; thresholds[threshold_type].intervalms = intervalms; thresholds[threshold_type].threshold_limit = threshold_limit; } +/* +* Increment the provided thresholds counter value with the provided amount and validate it against the +* configured threshold limit. +*/ +template +void socket_session::increment_metric(SESSION_THRESHOLDS threshold_type, uint64_t amount) +{ + sock::session_threshold &t = thresholds[threshold_type]; + + // Ignore the counter if limit is set as 0. + if (t.threshold_limit == 0) + return; + + uint64_t time_now = util::get_epoch_milliseconds(); + + t.counter_value += amount; + if (t.timestamp == 0) + { + // Reset counter timestamp. + t.timestamp = time_now; + } + else + { + // Check whether we have exceeded the threshold within the monitering interval. + auto elapsed_time = time_now - t.timestamp; + if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) + { + t.timestamp = 0; + t.counter_value = 0; + + LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; + this->close(); + } + else if (elapsed_time > t.intervalms) + { + t.timestamp = time_now; + t.counter_value = amount; + } + } +} + //port and address will be used to identify from which remote party the message recieved in the handler template void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts) { - if (sess_opts.max_message_size > 0) + if (sess_opts.max_socket_read_len > 0) { - // Setting maximum file size - set_message_max_size(sess_opts.max_message_size); + // Setting maximum data size within a single message. This is handled within the beast socket. + set_max_socket_read_len(sess_opts.max_socket_read_len); } - // Create new session_threshold and insert it to thresholds array + // Create new session_thresholds and insert it to thresholds vector. // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector - // since enum's value is used as index in the vector to update vector values - session_threshold max_byte_per_message_threshold{sess_opts.max_bytes_per_minute, 0, 0, 60000}; - thresholds.push_back(std::move(max_byte_per_message_threshold)); + // since enum's value is used as index in the vector to update vector values. + thresholds.reserve(4); + thresholds.push_back(session_threshold(sess_opts.max_rawbytes_per_minute, 60000)); + thresholds.push_back(session_threshold(sess_opts.max_dupmsgs_per_minute, 60000)); + thresholds.push_back(session_threshold(sess_opts.max_badsigmsgs_per_minute, 60000)); + thresholds.push_back(session_threshold(sess_opts.max_badmsgs_per_minute, 60000)); ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client; if (is_server_session) @@ -65,12 +104,17 @@ void socket_session::run(const std::string &&address, const std::string &&por * INBOUND true - when node acts as server * INBOUND false (OUTBOUND) - when node acts as client */ - flags.set(util::SESSION_FLAG::INBOUND); + flags.set(SESSION_FLAG::INBOUND); handshake_type = ssl::stream_base::server; } - this->port = port; - this->address = address; + this->port = std::move(port); + this->address = std::move(address); + + // Create a unique id for the session combining ip and port. + // We prepare this appended string here because we need to use it as an identifier of the session in various places. + this->uniqueid.reserve(port.size() + address.size() + 1); + this->uniqueid.append(address).append(":").append(port); // Set the timeout. beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); @@ -92,7 +136,7 @@ void socket_session::on_ssl_handshake(error_code ec) // the websocket stream has its own timeout system. beast::get_lowest_layer(ws).expires_never(); - if (flags[util::SESSION_FLAG::INBOUND]) + if (flags[SESSION_FLAG::INBOUND]) { // Set suggested timeout settings for the websocket ws.set_option( @@ -149,7 +193,7 @@ void socket_session::on_read(error_code ec, std::size_t) return fail(ec, "read"); } - increment(util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE, buffer.size()); + increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, buffer.size()); // Wrap the buffer data in a string_view and call session handler. // We DO NOT transfer ownership of buffer data to the session handler. It should @@ -165,46 +209,6 @@ void socket_session::on_read(error_code ec, std::size_t) ws_async_read(); } -/* -* Increment the provided thresholds counter value with the provided amount and validate it -*/ -template -void socket_session::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount) -{ - sock::session_threshold &t = thresholds[threshold_type]; - - // Ignore the counter if limit is set as 0. - if (t.threshold_limit == 0) - return; - - uint64_t time_now = util::get_epoch_milliseconds(); - - t.counter_value += amount; - if (t.timestamp == 0) - { - t.timestamp = time_now; - } - else - { - auto elapsed_time = time_now - t.timestamp; - if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) - { - t.timestamp = 0; - t.counter_value = 0; - - LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; - - // Invoke the threshold monitor so any actions will be performed. - threshold_monitor(threshold_type, t.threshold_limit, this); - } - else if (elapsed_time > t.intervalms) - { - t.timestamp = time_now; - t.counter_value = amount; - } - } -} - /* * Send message through an active websocket connection */ @@ -269,16 +273,6 @@ void socket_session::on_close(error_code ec, int8_t type) return fail(ec, "close"); } -// When called, initializes the unique id string for this session. -template -void socket_session::init_uniqueid() -{ - // Create a unique id for the session combining ip and port. - // We prepare this appended string here because we need to use it for finding elemends from the maps - // for validation purposes whenever a message is received. - uniqueid.append(address).append(":").append(port); -} - /** * Executes on error */ @@ -293,6 +287,12 @@ void socket_session::fail(error_code ec, char const *what) return; } +template +socket_session::~socket_session() +{ + sess_handler.on_close(this); +} + // Template instantiations. template class socket_session; template class socket_session; diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index b55a3dd5..83c74d21 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -15,6 +15,36 @@ using error_code = boost::system::error_code; namespace sock { +/** + * Set of flags used to mark status information on the session. + * usr and p2p subsystems makes use of this to mark status information of user and peer sessions. + * Set flags are stored in 'flags' bitset of socket_session. + */ +enum SESSION_FLAG +{ + INBOUND = 0, + USER_CHALLENGE_ISSUED = 1, + USER_AUTHED = 2 +}; + +/** + * Enum used to track down various thresholds used in socket communication. + */ +enum SESSION_THRESHOLDS +{ + // Max incoming bytes per minute. + MAX_RAWBYTES_PER_MINUTE = 0, + + // Max duplicate messages per minute. + MAX_DUPMSGS_PER_MINUTE = 1, + + // Max messages with invalid signature per minute. + MAX_BADSIGMSGS_PER_MINUTE = 2, + + // Max messages with bad structure per minute. + MAX_BADMSGS_PER_MINUTE = 3 +}; + /* * Use this to keep in track of different thresholds which we need to deal with. e.g - maximum amount of bytes allowed per minute through a session * threshold_limit - Maximum threshold value which is allowed @@ -27,14 +57,23 @@ struct session_threshold uint64_t threshold_limit; uint64_t counter_value; uint64_t timestamp; - uint64_t intervalms; + uint32_t intervalms; + + session_threshold(uint64_t threshold_limit, uint32_t intervalms) + { + this->threshold_limit = threshold_limit; + this->intervalms = intervalms; + } }; // Use this to feed the session with default options from the config file struct session_options { - uint64_t max_message_size; - uint64_t max_bytes_per_minute; + uint64_t max_socket_read_len; + uint64_t max_rawbytes_per_minute; + uint64_t max_dupmsgs_per_minute; + uint64_t max_badsigmsgs_per_minute; + uint64_t max_badmsgs_per_minute; }; //Forward Declaration @@ -51,7 +90,7 @@ class socket_session : public std::enable_shared_from_this> websocket::stream> ws; // websocket stream used send an recieve messages std::vector queue; // used to store messages temporarily until it is sent to the relevant party socket_session_handler &sess_handler; // handler passed to gain access to websocket events - std::vector thresholds; // track down various thresholdsls + std::vector thresholds; // track down various communication thresholds void fail(error_code ec, char const *what); @@ -101,7 +140,7 @@ public: // The unique identifier of the remote party (format :). std::string uniqueid; - // The set of util::SESSION_FLAG enum flags that will be set by user-code of this calss. + // The set of sock::SESSION_FLAG enum flags that will be set by user-code of this calss. // We mainly use this to store contexual information about this session based on the use case. // Setting and reading flags to this is completely managed by user-code. std::bitset<8> flags; @@ -110,14 +149,11 @@ public: void send(T msg); - void set_message_max_size(uint64_t size); + void set_max_socket_read_len(uint64_t size); - void set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t interval); + void set_threshold(SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint32_t intervalms); - void increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount); - - // When called, initializes the unique id string for this session. - void init_uniqueid(); + void increment_metric(SESSION_THRESHOLDS threshold_type, uint64_t amount); void close(); }; diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index e82f05c2..57a9f4f4 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -1,5 +1,4 @@ #include "../pchheader.hpp" -#include "../util.hpp" #include "../hplog.hpp" #include "../jsonschema/usrmsg_helpers.hpp" #include "../sock/socket_session.hpp" @@ -23,14 +22,11 @@ void user_session_handler::on_connect(sock::socket_sessioninit_uniqueid(); - user_outbound_message outmsg(issue_challenge(session->uniqueid)); session->send(std::move(outmsg)); // Set the challenge-issued flag to help later checks in on_message. - session->flags.set(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); + session->flags.set(sock::SESSION_FLAG::USER_CHALLENGE_ISSUED); } /** @@ -42,13 +38,13 @@ void user_session_handler::on_message( { // First check whether this session is pending challenge. // Meaning we have previously issued a challenge to the client, - if (session->flags[util::SESSION_FLAG::USER_CHALLENGE_ISSUED]) + if (session->flags[sock::SESSION_FLAG::USER_CHALLENGE_ISSUED]) { if (verify_challenge(message, session) == 0) return; } // Check whether this session belongs to an authenticated (challenge-verified) user. - else if (session->flags[util::SESSION_FLAG::USER_AUTHED]) + else if (session->flags[sock::SESSION_FLAG::USER_AUTHED]) { // Check whether this user is among authenticated users // and perform authenticated msg processing. @@ -58,16 +54,19 @@ void user_session_handler::on_message( { // This is an authed user. connected_user &user = itr->second; - if (handle_user_message(user, message) == 0) - return; - - LOG_DBG << "Bad message from user " << session->uniqueid; - // TODO: Increase session bad message count. + if (handle_user_message(user, message) != 0) + { + session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DBG << "Bad message from user " << session->uniqueid; + } } else { + session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); LOG_DBG << "User session id not found: " << session->uniqueid; } + + return; } // If for any reason we reach this point, we should drop the connection because none of the @@ -84,11 +83,11 @@ void user_session_handler::on_close(sock::socket_session // Cleanup any resources related to this session. // Session is awaiting challenge response. - if (session->flags[util::SESSION_FLAG::USER_CHALLENGE_ISSUED]) + if (session->flags[sock::SESSION_FLAG::USER_CHALLENGE_ISSUED]) ctx.pending_challenges.erase(session->uniqueid); // Session belongs to an authed user. - else if (session->flags[util::SESSION_FLAG::USER_AUTHED]) + else if (session->flags[sock::SESSION_FLAG::USER_AUTHED]) remove_user(session->uniqueid); LOG_INFO << "User disconnected " << session->uniqueid; diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index d99dd27e..42052e65 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -1,6 +1,7 @@ #include "../pchheader.hpp" #include "../jsonschema/usrmsg_helpers.hpp" #include "../sock/socket_server.hpp" +#include "../sock/socket_session.hpp" #include "../sock/socket_session_handler.hpp" #include "../util.hpp" #include "../conf.hpp" @@ -81,8 +82,8 @@ int verify_challenge(std::string_view message, sock::socket_sessionflags.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag - session->flags.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag + session->flags.reset(sock::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag + session->flags.set(sock::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag add_user(session, userpubkey); // Add the user to the global authed user list ctx.pending_challenges.erase(session->uniqueid); // Remove the stored challenge @@ -203,15 +204,16 @@ void start_listening() { auto address = net::ip::make_address(conf::cfg.listenip); - listener_ctx.sess_opts.max_message_size = conf::cfg.pubmaxsize; - listener_ctx.sess_opts.max_bytes_per_minute = conf::cfg.pubmaxcpm; + listener_ctx.default_sess_opts.max_socket_read_len = conf::cfg.pubmaxsize; + listener_ctx.default_sess_opts.max_rawbytes_per_minute = conf::cfg.pubmaxcpm; + listener_ctx.default_sess_opts.max_badmsgs_per_minute = conf::cfg.pubmaxbadmpm; std::make_shared>( listener_ctx.ioc, listener_ctx.ssl_ctx, tcp::endpoint{address, conf::cfg.pubport}, listener_ctx.global_usr_session_handler, - listener_ctx.sess_opts) + listener_ctx.default_sess_opts) ->run(); listener_ctx.listener_thread = std::thread([&] { listener_ctx.ioc.run(); }); diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 41855727..e5a66028 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -79,7 +79,7 @@ struct listener_context std::thread listener_thread; // Used to pass down the default settings to the socket session - sock::session_options sess_opts; + sock::session_options default_sess_opts; }; int init(); diff --git a/src/util.hpp b/src/util.hpp index de13da99..ac913ef7 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -13,36 +13,16 @@ namespace util // Hot Pocket version. Displayed on 'hotpocket version' and written to new contract configs. static const char *HP_VERSION = "0.1"; -// Current version of the peer message protocol. -static const int PEERMSG_VERSION = 1; - // Minimum compatible contract config version (this will be used to validate contract configs) static const char *MIN_CONTRACT_VERSION = "0.1"; +// Current version of the peer message protocol. +static const int PEERMSG_VERSION = 1; + // Minimum compatible peer message version (this will be used to accept/reject incoming peer connections) // (Keeping this as int for effcient msg payload and comparison) static const int MIN_PEERMSG_VERSION = 1; -/** - * Set of flags used to mark status information on the session. - * usr and p2p subsystems makes use of this to mark status information of user and peer sessions. - * Set flags are stored in 'flags_' bitset. - */ -enum SESSION_FLAG -{ - INBOUND = 0, - USER_CHALLENGE_ISSUED = 1, - USER_AUTHED = 2 -}; - -/** - * Enum used to track down various thresholds used in socket communication - */ -enum SESSION_THRESHOLDS -{ - MAX_BYTES_PER_MINUTE = 0 -}; - /** * FIFO hash set with a max size. */