diff --git a/examples/nodejs_client/.dockerignore b/examples/nodejs_client/.dockerignore new file mode 100644 index 00000000..5171c540 --- /dev/null +++ b/examples/nodejs_client/.dockerignore @@ -0,0 +1,2 @@ +node_modules +npm-debug.log \ No newline at end of file diff --git a/examples/nodejs_client/Dockerfile b/examples/nodejs_client/Dockerfile new file mode 100644 index 00000000..cd50b6ec --- /dev/null +++ b/examples/nodejs_client/Dockerfile @@ -0,0 +1,15 @@ +FROM node:12.18.3-buster-slim + +WORKDIR /text_client + +COPY package*.json ./ +RUN npm install + +COPY text-client.js ./ +COPY hp-client-lib.js ./ + +ENTRYPOINT ["node", "text-client.js"] + +# Comment this for localhost connection. +# Client connects to node1 of the cluster. +CMD ["node1", "8081"] \ No newline at end of file diff --git a/examples/nodejs_client/client-build.sh b/examples/nodejs_client/client-build.sh new file mode 100755 index 00000000..5e31709c --- /dev/null +++ b/examples/nodejs_client/client-build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +docker build -t hp:text_client . \ No newline at end of file diff --git a/examples/nodejs_client/client-start.sh b/examples/nodejs_client/client-start.sh new file mode 100755 index 00000000..a933c4c5 --- /dev/null +++ b/examples/nodejs_client/client-start.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# To connect to docker cluster node. +docker run -i -t --rm --network=hpnet hp:text_client + +# To connect to localhost node. +# docker run -i -t --rm --network=host hp:text_client diff --git a/src/bill/corebill.cpp b/src/bill/corebill.cpp index a3c8dff1..bbee5efa 100644 --- a/src/bill/corebill.cpp +++ b/src/bill/corebill.cpp @@ -6,82 +6,87 @@ namespace corebill { -// How many violations can occur for a host before being escalated. -constexpr uint32_t VIOLATION_THRESHOLD = 10; + // How many violations can occur for a host before being escalated. + constexpr uint32_t VIOLATION_THRESHOLD = 10; -// Violation cooldown interval. -constexpr uint32_t VIOLATION_REFRESH_INTERVAL = 600 * 1000; // 10 minutes + // Violation cooldown interval. + constexpr uint32_t VIOLATION_REFRESH_INTERVAL = 600 * 1000; // 10 minutes -// Keeps track of violation count against offending hosts. -std::unordered_map violation_counter; + // Keeps track of violation count against offending hosts. + std::unordered_map violation_counter; -// Keeps the graylisted hosts. -util::ttl_set graylist; + // Graylist mutex. + std::mutex graylist_mutex; -// Keeps the whitelisted hosts who would be ignored in all violation tracking. -std::unordered_set whitelist; + // Keeps the graylisted hosts. + util::ttl_set graylist; -/** + // Keeps the whitelisted hosts who would be ignored in all violation tracking. + std::unordered_set whitelist; + + /** * Report a violation. Violation means a force disconnection of a socket due to some threshold exceeding. */ -void report_violation(const std::string host) -{ - if (whitelist.find(host) != whitelist.end()) // Is in whitelist + void report_violation(const std::string host) { - LOG_DEBUG << host << " is whitelisted. Ignoring the violation."; - return; - } - - violation_stat &stat = violation_counter[host]; - - const uint64_t time_now = util::get_epoch_milliseconds(); - - stat.counter++; - - if (stat.timestamp == 0) - { - // Reset counter timestamp. - stat.timestamp = time_now; - } - else - { - // Check whether we have exceeded the threshold within the monitering interval. - const uint64_t elapsed_time = time_now - stat.timestamp; - if (elapsed_time <= VIOLATION_REFRESH_INTERVAL && stat.counter > VIOLATION_THRESHOLD) + if (whitelist.find(host) != whitelist.end()) // Is in whitelist { - // IP exceeded violation threshold. - - stat.timestamp = 0; - stat.counter = 0; - - graylist.emplace(host, VIOLATION_REFRESH_INTERVAL); - LOG_WARNING << host << " placed on graylist."; + LOG_DEBUG << host << " is whitelisted. Ignoring the violation."; + return; } - else if (elapsed_time > VIOLATION_REFRESH_INTERVAL) + + violation_stat &stat = violation_counter[host]; + + const uint64_t time_now = util::get_epoch_milliseconds(); + + stat.counter++; + + if (stat.timestamp == 0) { - // Start the counter fresh. + // Reset counter timestamp. stat.timestamp = time_now; - stat.counter = 1; + } + else + { + // Check whether we have exceeded the threshold within the monitering interval. + const uint64_t elapsed_time = time_now - stat.timestamp; + if (elapsed_time <= VIOLATION_REFRESH_INTERVAL && stat.counter > VIOLATION_THRESHOLD) + { + // IP exceeded violation threshold. + + stat.timestamp = 0; + stat.counter = 0; + std::scoped_lock gray_list_lock(graylist_mutex); + graylist.emplace(host, VIOLATION_REFRESH_INTERVAL); + LOG_WARNING << host << " placed on graylist."; + } + else if (elapsed_time > VIOLATION_REFRESH_INTERVAL) + { + // Start the counter fresh. + stat.timestamp = time_now; + stat.counter = 1; + } } } -} -void add_to_whitelist(const std::string host) -{ - // Add to whitelist and remove from all other offender lists. - whitelist.emplace(host); - graylist.erase(host); - violation_counter.erase(host); -} + void add_to_whitelist(const std::string host) + { + // Add to whitelist and remove from all other offender lists. + whitelist.emplace(host); + std::scoped_lock gray_list_lock(graylist_mutex); + graylist.erase(host); + violation_counter.erase(host); + } -void remove_from_whitelist(const std::string host) -{ - whitelist.erase(host); -} + void remove_from_whitelist(const std::string host) + { + whitelist.erase(host); + } -bool is_banned(const std::string &host) -{ - return graylist.exists(host); -} + bool is_banned(const std::string &host) + { + std::scoped_lock gray_list_lock(graylist_mutex); + return graylist.exists(host); + } } // namespace corebill \ No newline at end of file diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 04b89c13..d2015746 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -275,12 +275,12 @@ namespace comm const uint64_t elapsed_time = time_now - t.timestamp; if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) { - close(); + mark_for_closure(); t.timestamp = 0; t.counter_value = 0; - LOG_INFO << "Session " << uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; + LOG_INFO << "Session " << display_name() << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; corebill::report_violation(host_address); } else if (elapsed_time > t.intervalms) diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index b43c5d73..e79f7096 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -74,7 +74,7 @@ namespace p2p if (conf::cfg.peermaxcons != 0) p2p::send_available_capacity_announcement(p2p::get_available_capacity()); - // Start peer list request loop is dynamic peer discovery is enabled. + // Start peer list request loop if dynamic peer discovery is enabled. if (conf::cfg.dynamicpeerdiscovery && known_remote_count > 0) { // If max known peer connection cap is reached then periodically request peer list from random known peer. @@ -166,6 +166,14 @@ namespace p2p { const std::string &host_address = std::get(host_result); p2p::peer_comm_session session(host_address, std::move(client), false, metric_thresholds); + + // Skip if this peer is banned due to corebill violations. + if (corebill::is_banned(host_address)) + { + LOG_DEBUG << "Skipping peer " << host_address << " from connecting. This peer is banned."; + continue; + } + session.known_ipport.emplace(peer.ip_port); known_remote_count++; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index b43e0da3..4f307fce 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -48,6 +48,9 @@ namespace p2p // validate and handle each type of peer messages. int handle_peer_message(p2p::peer_comm_session &session, std::string_view message) { + // Adding message size to peer message characters(bytes) per minute counter. + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size()); + const p2pmsg::Container *container; if (p2pmsg::validate_and_extract_container(&container, message) != 0) return 0; diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 96049a4c..72853efe 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -17,18 +17,27 @@ namespace usr */ int handle_user_connect(usr::user_comm_session &session) { - LOG_DEBUG << "User client connected " << session.display_name(); + // Allow connection only if the maximum capacity is not reached. 0 means allowing unlimited connections. + if ((conf::cfg.pubmaxcons == 0) || (usr::ctx.users.size() < conf::cfg.pubmaxcons)) + { + corebill::add_to_whitelist(session.host_address); + LOG_DEBUG << "User client connected " << session.display_name(); - // As soon as a user connects, we issue them a challenge message. We remember the - // challenge we issued and later verify the user's response with it. - std::vector msg; - jusrmsg::create_user_challenge(msg, session.issued_challenge); - session.send(msg); + // As soon as a user connects, we issue them a challenge message. We remember the + // challenge we issued and later verify the user's response with it. + std::vector msg; + jusrmsg::create_user_challenge(msg, session.issued_challenge); + session.send(msg); - // Set the challenge-issued value to true. - session.challenge_status = comm::CHALLENGE_ISSUED; - - return 0; + // Set the challenge-issued value to true. + session.challenge_status = comm::CHALLENGE_ISSUED; + return 0; + } + else + { + LOG_DEBUG << "Dropping the user connection. Maximum user capacity reached. Session: " << session.display_name() << " (limit: " << conf::cfg.pubmaxcons << ")."; + return -1; + } } /** @@ -36,6 +45,9 @@ namespace usr */ int handle_user_message(usr::user_comm_session &session, std::string_view message) { + // Adding message size to user message characters(bytes) per minute counter. + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size()); + // First check whether this session is pending challenge. // Meaning we have previously issued a challenge to the client. if (session.challenge_status == comm::CHALLENGE_ISSUED) diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index aa93deaa..538a77dd 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -30,8 +30,8 @@ namespace usr int init() { metric_thresholds[0] = conf::cfg.pubmaxcpm; - metric_thresholds[1] = 0; - metric_thresholds[2] = 0; + metric_thresholds[1] = 0; // This metric doesn't apply to user context. + metric_thresholds[2] = 0; // This metric doesn't apply to user context. metric_thresholds[3] = conf::cfg.pubmaxbadmpm; // Start listening for incoming user connections.