diff --git a/CMakeLists.txt b/CMakeLists.txt index 4fffd9cc..67a36fba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ add_executable(hpcore src/sock/socket_client.cpp src/sock/socket_server.cpp src/sock/socket_session.cpp + src/sock/socket_monitor.cpp src/p2p/peer_session_handler.cpp src/p2p/p2p.cpp src/usr/user_session_handler.cpp diff --git a/examples/hpclient/malicious-client.js b/examples/hpclient/malicious-client.js new file mode 100644 index 00000000..389acc21 --- /dev/null +++ b/examples/hpclient/malicious-client.js @@ -0,0 +1,116 @@ +// +// HotPocket client example code adopted from: +// https://github.com/codetsunami/hotpocket/blob/master/hp_client.js +// + +const fs = require('fs') +const ws_api = require('ws'); +const sodium = require('libsodium-wrappers') +const readline = require('readline') + +// sodium has a trigger when it's ready, we will wait and execute from there +sodium.ready.then(main).catch((e) => { console.log(e) }) + + +function main() { + + if (process.argv.length != 6) { + console.log("Incorrect format"); + console.log("node client.js "); + process.exit(); + } + + var args = process.argv.slice(2); + var keys = sodium.crypto_sign_keypair() + + + // check for client keys + if (!fs.existsSync('.hp_client_keys')) { + keys.privateKey = sodium.to_hex(keys.privateKey) + keys.publicKey = sodium.to_hex(keys.publicKey) + fs.writeFileSync('.hp_client_keys', JSON.stringify(keys)) + } else { + keys = JSON.parse(fs.readFileSync('.hp_client_keys')) + keys.privateKey = Uint8Array.from(Buffer.from(keys.privateKey, 'hex')) + keys.publicKey = Uint8Array.from(Buffer.from(keys.publicKey, 'hex')) + } + + + var server = 'wss://localhost:8080' + + if (process.argv.length == 3) server = 'wss://localhost:' + args[0] + + if (process.argv.length == 4) server = 'wss://' + args[0] + ':' + args[1] + + var ws = new ws_api(server, { + rejectUnauthorized: false + }) + + /* anatomy of a public challenge + { + hotpocket: 0.1, + type: 'public_challenge', + challenge: '' + } + */ + + + // if the console ctrl + c's us we should close ws gracefully + process.once('SIGINT', function (code) { + console.log('SIGINT received...'); + ws.close() + }); + + ws.on('message', (m) => { + console.log("-----Received raw message-----") + console.log(m.toString()) + console.log("------------------------------") + + try { + m = JSON.parse(m) + } catch (e) { + return + } + + if (m.type != 'public_challenge') return + + console.log("Received challenge message") + console.log(m) + + let pkhex = 'ed' + Buffer.from(keys.publicKey).toString('hex'); + console.log('My public key is: ' + pkhex); + + // sign the challenge and send back the response + var sigbytes = sodium.crypto_sign_detached(m.challenge, keys.privateKey); + var response = { + type: 'challenge_response', + challenge: m.challenge, + sig: Buffer.from(sigbytes).toString('hex'), + pubkey: pkhex + } + + console.log('Sending challenge response...'); + ws.send(JSON.stringify(response)); + + setInterval(() => { + var message = generateRandomMessage(args[2]); + console.log("Message :" + message); + ws.send(message); + }, args[3]); + + }); + + ws.on('close', () => { + console.log('Server disconnected.'); + }); +} + +function generateRandomMessage(length) { + var result = ''; + var characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; + var charactersLength = characters.length; + for (var i = 0; i < length; i++) { + result += characters.charAt(Math.floor(Math.random() * charactersLength)); + } + return result; +} diff --git a/src/conf.cpp b/src/conf.cpp index ab48a675..b6382f48 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -100,9 +100,9 @@ int create_contract() cfg.roundtime = 1000; cfg.pubport = 8080; cfg.pubmaxsize = 65536; - cfg.pubmaxmpm = 100; + cfg.pubmaxcpm = 1000; cfg.peermaxsize = 65536; - cfg.peermaxmpm = 1000; + cfg.peermaxcpm = 1000; #ifndef NDEBUG cfg.loglevel = "debug"; @@ -233,9 +233,9 @@ int load_config() cfg.roundtime = d["roundtime"].GetInt(); cfg.pubport = d["pubport"].GetInt(); cfg.pubmaxsize = d["pubmaxsize"].GetInt(); - cfg.pubmaxmpm = d["pubmaxmpm"].GetInt(); + cfg.pubmaxcpm = d["pubmaxcpm"].GetInt(); cfg.peermaxsize = d["peermaxsize"].GetInt(); - cfg.peermaxmpm = d["peermaxmpm"].GetInt(); + cfg.peermaxcpm = d["peermaxcpm"].GetInt(); cfg.loglevel = d["loglevel"].GetString(); cfg.loggers.clear(); @@ -298,9 +298,9 @@ int save_config() d.AddMember("roundtime", cfg.roundtime, allocator); d.AddMember("pubport", cfg.pubport, allocator); d.AddMember("pubmaxsize", cfg.pubmaxsize, allocator); - d.AddMember("pubmaxmpm", cfg.pubmaxmpm, allocator); + d.AddMember("pubmaxcpm", cfg.pubmaxcpm, allocator); d.AddMember("peermaxsize", cfg.peermaxsize, allocator); - d.AddMember("peermaxmpm", cfg.peermaxmpm, allocator); + d.AddMember("peermaxcpm", cfg.peermaxcpm, allocator); d.AddMember("loglevel", rapidjson::StringRef(cfg.loglevel.data()), allocator); rapidjson::Value loggers(rapidjson::kArrayType); @@ -403,8 +403,8 @@ int validate_config() // Other required fields. if (cfg.binary.empty() || cfg.listenip.empty() || - cfg.peerport == 0 || cfg.roundtime == 0 || cfg.pubport == 0 || cfg.pubmaxsize == 0 || cfg.pubmaxmpm == 0 || cfg.peermaxsize == 0 || - cfg.peermaxmpm == 0 || cfg.loglevel.empty() || cfg.loggers.empty()) + cfg.peerport == 0 || cfg.roundtime == 0 || cfg.pubport == 0 || cfg.pubmaxsize == 0 || cfg.pubmaxcpm == 0 || cfg.peermaxsize == 0 || + cfg.peermaxcpm == 0 || cfg.loglevel.empty() || cfg.loggers.empty()) { std::cout << "Required configuration fields missing at " << ctx.configFile << std::endl; return -1; @@ -489,8 +489,8 @@ int is_schema_valid(rapidjson::Document &d) "{" "\"type\": \"object\"," "\"required\": [ \"version\", \"pubkeyhex\", \"seckeyhex\", \"binary\", \"binargs\", \"listenip\"" - ", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxmpm\"" - ", \"peermaxsize\", \"peermaxmpm\", \"loglevel\", \"loggers\" ]," + ", \"peers\", \"unl\", \"peerport\", \"roundtime\", \"pubport\", \"pubmaxsize\", \"pubmaxcpm\"" + ", \"peermaxsize\", \"peermaxcpm\", \"loglevel\", \"loggers\" ]," "\"properties\": {" "\"version\": { \"type\": \"string\" }," "\"pubkeyhex\": { \"type\": \"string\" }," @@ -510,9 +510,9 @@ int is_schema_valid(rapidjson::Document &d) "\"roundtime\": { \"type\": \"integer\" }," "\"pubport\": { \"type\": \"integer\" }," "\"pubmaxsize\": { \"type\": \"integer\" }," - "\"pubmaxmpm\": { \"type\": \"integer\" }," + "\"pubmaxcpm\": { \"type\": \"integer\" }," "\"peermaxsize\": { \"type\": \"integer\" }," - "\"peermaxmpm\": { \"type\": \"integer\" }," + "\"peermaxcpm\": { \"type\": \"integer\" }," "\"loglevel\": { \"type\": \"string\" }," "\"loggers\": {" "\"type\": \"array\"," diff --git a/src/conf.hpp b/src/conf.hpp index 389b57ec..745e0f65 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -49,13 +49,13 @@ struct contract_config std::string listenip; // The IPs to listen on for incoming connections std::unordered_map peers; // Map of peers keyed by ":" concatenated format std::unordered_set unl; // Unique node list (list of binary public keys) - std::uint16_t peerport; // Listening port for peer connections - std::uint16_t roundtime; // Consensus round time in ms - std::uint16_t pubport; // Listening port for public user connections - std::uint64_t pubmaxsize; // User message max size in bytes - std::uint16_t pubmaxmpm; // User message rate (in minutes) - std::uint64_t peermaxsize; // Peer message max size in bytes - std::uint64_t peermaxmpm; // Peer message rate (in minutes) + uint16_t peerport; // Listening port for peer connections + uint16_t roundtime; // Consensus round time in ms + uint16_t pubport; // Listening port for public user connections + uint64_t pubmaxsize; // User message max size in bytes + uint16_t pubmaxcpm; // User message rate (characters(bytes) per minute) + uint64_t peermaxsize; // Peer message max size in bytes + uint64_t peermaxcpm; // Peer message rate (characters(bytes) per minute) std::string loglevel; // Log severity level (debug, info, warn, error) std::unordered_set loggers; // List of enabled loggers (console, file) }; diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 14439a7f..b6a15517 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -70,7 +70,7 @@ int broadcast_proposal(const p2p::proposal &p); void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes); -void run_contract_binary(std::int64_t time); +void run_contract_binary(int64_t time); } // namespace cons diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 16c2ad4a..425ee5a2 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -80,6 +80,7 @@ void start_peer_connections() //setting up the message max size. Retrieve it from config // At the moment same settings are used to initialize a new server and client sess_opts.max_message_size = conf::cfg.peermaxsize; + sess_opts.max_bytes_per_minute = conf::cfg.peermaxcpm; // Start listening to peers std::make_shared>( diff --git a/src/sock/socket_monitor.cpp b/src/sock/socket_monitor.cpp new file mode 100644 index 00000000..ca602c78 --- /dev/null +++ b/src/sock/socket_monitor.cpp @@ -0,0 +1,31 @@ +#include "socket_monitor.hpp" +#include "../p2p/peer_session_handler.hpp" +#include "../usr/user_session_handler.hpp" + +namespace sock +{ + +/** + * Act upon exceeding various thresholds in socket communication + * + * @param threshold Type of threshold which has exceeded. + * @param threshold_limit Threshold limit at the time of exceedance. + * @param session Websocket session which exceeds the threshold. + */ +template +void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session) +{ + if (threshold == util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE) + { + // Can act accordingly + session->close(); + } +} + +/** + * Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time + */ +template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session); + +template void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session); +} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_monitor.hpp b/src/sock/socket_monitor.hpp new file mode 100644 index 00000000..9cde0d4e --- /dev/null +++ b/src/sock/socket_monitor.hpp @@ -0,0 +1,13 @@ +#ifndef _SOCK_MONITOR_H_ +#define _SOCK_MONITOR_H_ + +#include "../util.hpp" +#include "socket_session.hpp" + +namespace sock{ + +template +void threshold_monitor(util::SESSION_THRESHOLDS threshold, int64_t threshold_limit, socket_session *session); +} + +#endif \ No newline at end of file diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index b4676752..ba2bb20f 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -1,10 +1,12 @@ #include "socket_session.hpp" #include "../p2p/peer_session_handler.hpp" #include "../usr/user_session_handler.hpp" +#include "socket_monitor.hpp" -namespace sock{ +namespace sock +{ - template +template socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler) : ws(std::move(websocket)), sess_handler(sess_handler) { @@ -23,11 +25,22 @@ socket_session::~socket_session() * protocol failure */ template -void socket_session::set_message_max_size(std::uint64_t size) +void socket_session::set_message_max_size(uint64_t size) { ws.read_message_max(size); } +/** + * Set thresholds to the socket session +*/ +template +void socket_session::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms) +{ + thresholds[threshold_type].counter_value = 0; + thresholds[threshold_type].intervalms = intervalms; + thresholds[threshold_type].threshold_limit = threshold_limit; +} + //port and address will be used to identify from which remote party the message recieved in the handler template void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts) @@ -40,6 +53,12 @@ void socket_session::run(const std::string &&address, const std::string &&por set_message_max_size(sess_opts.max_message_size); } + // Create new session_threshold and insert it to thresholds array + // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector + // since enum's value is used as index in the vector to update vector values + session_threshold max_byte_per_message_threshold{sess_opts.max_bytes_per_minute, 0, 0, 60000}; + thresholds.push_back(std::move(max_byte_per_message_threshold)); + if (is_server_session) { /** @@ -94,7 +113,6 @@ void socket_session::on_ssl_handshake(error_code ec) } else { - ws.set_option( websocket::stream_base::timeout::suggested( beast::role_type::client)); @@ -149,6 +167,8 @@ void socket_session::on_read(error_code ec, std::size_t) return fail(ec, "read"); } + increment(util::SESSION_THRESHOLDS::MAX_BYTES_PER_MINUTE, buffer.size()); + // Wrap the buffer data in a string_view and call session handler. // We DO NOT transfer ownership of buffer data to the session handler. It should // read and process the message and we will clear the buffer after its done with it. @@ -168,6 +188,44 @@ void socket_session::on_read(error_code ec, std::size_t) }); } +/* +* Increment the provided thresholds counter value with the provided amount and validate it +*/ +template +void socket_session::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount) +{ + sock::session_threshold &t = thresholds[threshold_type]; + + // Ignore the counter if limit is set as 0. + if (t.threshold_limit == 0) + return; + + uint64_t time_now = util::get_epoch_milliseconds(); + + t.counter_value += amount; + if (t.timestamp == 0) + { + t.timestamp = time_now; + } + else + { + auto elapsed_time = time_now - t.timestamp; + if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) + { + t.timestamp = 0; + t.counter_value = 0; + + // Invoke the threshold monitor so any actions will be performed. + threshold_monitor(threshold_type, t.threshold_limit, this); + } + else if (elapsed_time > t.intervalms) + { + t.timestamp = time_now; + t.counter_value = amount; + } + } +} + /* * Send message through an active websocket connection */ @@ -238,7 +296,7 @@ void socket_session::close() */ //type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method template -void socket_session::on_close(error_code ec, std::int8_t type) +void socket_session::on_close(error_code ec, int8_t type) { if (type == 1) return; @@ -273,21 +331,25 @@ void socket_session::fail(error_code ec, char const *what) /** * Declaring templates with possible values for T because keeping all those in hpp file makes compile take a long time - */ + */ template socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler); template socket_session::~socket_session(); -template void socket_session::set_message_max_size(std::uint64_t size); +template void socket_session::set_message_max_size(uint64_t size); template void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts); template void socket_session::send(p2p::peer_outbound_message msg); +template void socket_session::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms); +template void socket_session::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount); template void socket_session::init_uniqueid(); template void socket_session::close(); template socket_session::socket_session(websocket::stream> websocket, socket_session_handler &sess_handler); template socket_session::~socket_session(); -template void socket_session::set_message_max_size(std::uint64_t size); +template void socket_session::set_message_max_size(uint64_t size); template void socket_session::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts); template void socket_session::send(usr::user_outbound_message msg); +template void socket_session::set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t intervalms); +template void socket_session::increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount); template void socket_session::init_uniqueid(); template void socket_session::close(); -} \ No newline at end of file +} // namespace sock \ No newline at end of file diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index 3189b129..8725eb6a 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -40,10 +41,29 @@ public: virtual std::string_view buffer() = 0; }; +/* +* Use this to keep in track of different thresholds which we need to deal with. e.g - maximum amount of bytes allowed per minute through a session +* threshold_limit - Maximum threshold value which is allowed +* counter_value - Counter which keeps incrementing per every message +* timestamp - Timestamp when counter value changes +* intervalms - Time interval in miliseconds in which the threshold and the counter value should be compared +*/ +struct session_threshold +{ + uint64_t threshold_limit; + uint64_t counter_value; + uint64_t timestamp; + uint64_t intervalms; + + // session_threshold(uint64_t threshold_limit, uint64_t intervalms) + // : threshold_limit(threshold_limit), intervalms(intervalms), counter_value(0), timestamp(0) {} +}; + // Use this to feed the session with default options from the config file struct session_options { - std::uint64_t max_message_size; + uint64_t max_message_size; + uint64_t max_bytes_per_minute; }; //Forward Declaration @@ -60,6 +80,7 @@ class socket_session : public std::enable_shared_from_this> websocket::stream> ws; // websocket stream used send an recieve messages std::vector queue; // used to store messages temporarily until it is sent to the relevant party socket_session_handler &sess_handler; // handler passed to gain access to websocket events + std::vector thresholds; // track down various thresholdsls void fail(error_code ec, char const *what); @@ -71,7 +92,8 @@ class socket_session : public std::enable_shared_from_this> void on_write(error_code ec, std::size_t bytes_transferred); - void on_close(error_code ec, std::int8_t type); + void on_close(error_code ec, int8_t type); + public: socket_session(websocket::stream> websocket, socket_session_handler &sess_handler); @@ -100,7 +122,11 @@ public: void send(T msg); - void set_message_max_size(std::uint64_t size); + void set_message_max_size(uint64_t size); + + void set_threshold(util::SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint64_t interval); + + void increment(util::SESSION_THRESHOLDS threshold_type, uint64_t amount); // When called, initializes the unique id string for this session. void init_uniqueid(); @@ -108,7 +134,5 @@ public: void close(); }; - - } // namespace sock #endif diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 96ab5cb7..7fcd4a89 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -270,6 +270,7 @@ void start_listening() auto address = net::ip::make_address(conf::cfg.listenip); sess_opts.max_message_size = conf::cfg.pubmaxsize; + sess_opts.max_bytes_per_minute = conf::cfg.pubmaxcpm; std::make_shared>( ioc, diff --git a/src/util.hpp b/src/util.hpp index 1cafb567..8b606eb7 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -37,6 +37,14 @@ enum SESSION_FLAG USER_AUTHED = 2 }; +/** + * Enum used to track down various thresholds used in socket communication + */ +enum SESSION_THRESHOLDS +{ + MAX_BYTES_PER_MINUTE = 0 +}; + int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_len); int hex2bin(unsigned char *decoded, size_t decoded_len, std::string_view hex_str);