diff --git a/CMakeLists.txt b/CMakeLists.txt index 3be708c7..2cbbc4f0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,7 @@ add_executable(hpcore src/crypto.cpp src/conf.cpp src/hplog.cpp - src/bill/corebill.cpp + src/corebill/tracker.cpp src/hpfs/hpfs_mount.cpp src/hpfs/hpfs_serve.cpp src/hpfs/hpfs_sync.cpp @@ -57,11 +57,9 @@ add_executable(hpcore src/msg/usrmsg_parser.cpp src/p2p/peer_comm_server.cpp src/p2p/peer_comm_session.cpp - src/p2p/peer_session_handler.cpp src/p2p/self_node.cpp src/p2p/p2p.cpp src/usr/user_comm_session.cpp - src/usr/user_session_handler.cpp src/usr/input_nonce_map.cpp src/usr/usr.cpp src/usr/read_req.cpp diff --git a/src/bill/corebill.cpp b/src/bill/corebill.cpp deleted file mode 100644 index 53a77f36..00000000 --- a/src/bill/corebill.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include "../pchheader.hpp" -#include "../util/util.hpp" -#include "../util/ttl_set.hpp" -#include "../hplog.hpp" -#include "corebill.h" - -namespace corebill -{ - - // How many violations can occur for a host before being escalated. - constexpr uint32_t VIOLATION_THRESHOLD = 10; - - // Violation cooldown interval. - constexpr uint32_t VIOLATION_REFRESH_INTERVAL = 600 * 1000; // 10 minutes - - // Keeps track of violation count against offending hosts. - std::unordered_map violation_counter; - - // Graylist mutex. - std::mutex graylist_mutex; - - // Keeps the graylisted hosts. - util::ttl_set graylist; - - // Keeps the whitelisted hosts who would be ignored in all violation tracking. - std::unordered_set whitelist; - - /** - * Report a violation. Violation means a force disconnection of a socket due to some threshold exceeding. - */ - void report_violation(const std::string host) - { - if (whitelist.find(host) != whitelist.end()) // Is in whitelist - { - LOG_DEBUG << host << " is whitelisted. Ignoring the violation."; - return; - } - - violation_stat &stat = violation_counter[host]; - - const uint64_t time_now = util::get_epoch_milliseconds(); - - stat.counter++; - - if (stat.timestamp == 0) - { - // Reset counter timestamp. - stat.timestamp = time_now; - } - - // Check whether we have exceeded the threshold within the monitering interval. - const uint64_t elapsed_time = time_now - stat.timestamp; - if (elapsed_time <= VIOLATION_REFRESH_INTERVAL && stat.counter > VIOLATION_THRESHOLD) - { - // IP exceeded violation threshold. - - stat.timestamp = 0; - stat.counter = 0; - std::scoped_lock gray_list_lock(graylist_mutex); - graylist.emplace(host, VIOLATION_REFRESH_INTERVAL); - LOG_WARNING << host << " placed on graylist."; - } - else if (elapsed_time > VIOLATION_REFRESH_INTERVAL) - { - // Start the counter fresh. - stat.timestamp = time_now; - stat.counter = 1; - } - } - - void add_to_whitelist(const std::string host) - { - // Add to whitelist and remove from all other offender lists. - whitelist.emplace(host); - std::scoped_lock gray_list_lock(graylist_mutex); - graylist.erase(host); - violation_counter.erase(host); - } - - void remove_from_whitelist(const std::string host) - { - whitelist.erase(host); - } - - bool is_banned(const std::string &host) - { - std::scoped_lock gray_list_lock(graylist_mutex); - return graylist.exists(host); - } - -} // namespace corebill \ No newline at end of file diff --git a/src/bill/corebill.h b/src/bill/corebill.h deleted file mode 100644 index 8ac0bec3..00000000 --- a/src/bill/corebill.h +++ /dev/null @@ -1,24 +0,0 @@ -#ifndef _HP_COREBILL_ -#define _HP_COREBILL_ - -#include "../pchheader.hpp" - -namespace corebill -{ - -/** - * Keeps the violation counter and the timestamp of the monitoring window. - */ -struct violation_stat -{ - uint32_t counter = 0; - uint64_t timestamp = 0; -}; - -void report_violation(const std::string host); -void add_to_whitelist(const std::string host); -bool is_banned(const std::string &host); - -} // namespace corebill - -#endif \ No newline at end of file diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index be939ce3..3bffe279 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -4,7 +4,8 @@ #include "../pchheader.hpp" #include "../hplog.hpp" #include "../util/util.hpp" -#include "../bill/corebill.h" +#include "../corebill/corebill.hpp" +#include "../corebill/tracker.hpp" #include "hpws.hpp" #include "comm_session.hpp" @@ -57,6 +58,8 @@ namespace comm { util::sleep(100); + apply_ban_updates(); + // Accept any new incoming connection if available. check_for_new_connection(); @@ -106,6 +109,29 @@ namespace comm LOG_INFO << name << " listener stopped."; } + void apply_ban_updates() + { + corebill::ban_update b; + while (violation_tracker.ban_updates.try_dequeue(b)) + { + in_addr ia4 = {}; + in6_addr ia6 = {}; + + if (inet_pton((b.is_ipv4 ? AF_INET : AF_INET6), b.host.c_str(), (b.is_ipv4 ? (void *)&ia4 : (void *)&ia6)) == 1) + { + const uint32_t *addr = b.is_ipv4 ? (uint32_t *)&ia4.s_addr : ia6.__in6_u.__u6_addr32; + if (b.is_ban) + hpws_server->ban_ip(addr, b.ttl_sec, b.is_ipv4); + else + hpws_server->unban_ip(addr, b.is_ipv4); + } + else + { + LOG_ERROR << "Invalid host " << b.host << " in ban update."; + } + } + } + void check_for_new_connection() { if (listen_port == 0) @@ -134,18 +160,12 @@ namespace comm else { const std::string &host_address = std::get(host_result); - if (!corebill::is_banned(host_address)) - { - // We do not directly add to sessions list. We simply add to new_sessions list under a lock so the main server - // loop will take care of initialize the new sessions. This is because inherited classes (eg. peer_comm_server) - // need a way to safely inject new sessions from another thread. - std::scoped_lock lock(new_sessions_mutex); - new_sessions.emplace_back(host_address, std::move(client), true, metric_thresholds); - } - else - { - LOG_DEBUG << "Dropping " << name << " connection for banned host " << host_address; - } + + // We do not directly add to sessions list. We simply add to new_sessions list under a lock so the main server + // loop will take care of initialize the new sessions. This is because inherited classes (eg. peer_comm_server) + // need a way to safely inject new sessions from another thread. + std::scoped_lock lock(new_sessions_mutex); + new_sessions.emplace_back(this->violation_tracker, host_address, std::move(client), client.is_ipv4, true, metric_thresholds); } // If the hpws client object was not added to a session so far, in will get dstructed and the channel will close. @@ -181,7 +201,7 @@ namespace comm messages_processed = true; if (result == -1) - session.mark_for_closure(); + session.mark_for_closure(CLOSE_VIOLATION::VIOLATION_MSG_READ); } } } @@ -195,7 +215,7 @@ namespace comm messages_processed = true; if (result == -1) - session.mark_for_closure(); + session.mark_for_closure(CLOSE_VIOLATION::VIOLATION_MSG_READ); } } @@ -233,6 +253,8 @@ namespace comm } public: + corebill::tracker violation_tracker; + comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, const bool use_priority_queues) : name(name), diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 9ca23fad..45868c7b 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -2,7 +2,7 @@ #include "../hplog.hpp" #include "../util/util.hpp" #include "../conf.hpp" -#include "../bill/corebill.h" +#include "../corebill/tracker.hpp" #include "hpws.hpp" #include "comm_session.hpp" @@ -12,11 +12,13 @@ namespace comm constexpr uint32_t UNVERIFIED_INACTIVE_TIMEOUT = 5000; // Time threshold ms for unverified inactive connections. constexpr uint16_t MAX_IN_MSG_QUEUE_SIZE = 255; // Maximum in message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... - comm_session::comm_session( - std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[5]) - : uniqueid(host_address), + comm_session::comm_session(corebill::tracker &violation_tracker, + std::string_view host_address, hpws::client &&hpws_client, const bool is_ipv4, const bool is_inbound, const uint64_t (&metric_thresholds)[5]) + : violation_tracker(violation_tracker), + uniqueid(host_address), host_address(host_address), hpws_client(std::move(hpws_client)), + is_ipv4(is_ipv4), is_inbound(is_inbound), in_msg_queue1(MAX_IN_MSG_QUEUE_SIZE), in_msg_queue2(MAX_IN_MSG_QUEUE_SIZE) @@ -81,7 +83,10 @@ namespace comm const int priority = get_message_priority(data); if (priority == 0) // priority 0 means a bad message. { - increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + else + should_disconnect = true; // Disconnect if we receive a bad message before challenge verification. } else if (priority == 1 || priority == 2) { @@ -110,9 +115,14 @@ namespace comm if (should_disconnect) { + // Report bad behaviour for connection drops occuring prior to challenge verification. + const CLOSE_VIOLATION reason = (challenge_status != comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + ? CLOSE_VIOLATION::VIOLATION_READ_ERROR + : CLOSE_VIOLATION::VIOLATION_NONE; + // Here we mark the session as needing to close. // The session will be properly "closed" and cleaned up by the global comm_server thread. - mark_for_closure(); + mark_for_closure(reason); break; } } @@ -242,12 +252,15 @@ namespace comm * Mark the session as needing to close. The session will be properly "closed" * and cleaned up by the global comm_server thread. */ - void comm_session::mark_for_closure() + void comm_session::mark_for_closure(const CLOSE_VIOLATION reason) { - if (state == SESSION_STATE::CLOSED) + if (state == SESSION_STATE::MUST_CLOSE || state == SESSION_STATE::CLOSED) return; state = SESSION_STATE::MUST_CLOSE; + + if (reason != CLOSE_VIOLATION::VIOLATION_NONE) + violation_tracker.report_violation(host_address, is_ipv4, std::to_string(reason)); } /** @@ -323,17 +336,16 @@ namespace comm t.timestamp = time_now; } - // Check whether we have exceeded the threshold within the monitering interval. + // Check whether we have exceeded the threshold within the monitoring interval. const uint64_t elapsed_time = time_now - t.timestamp; if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) { - mark_for_closure(); t.timestamp = 0; t.counter_value = 0; LOG_INFO << "Session " << display_name() << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; - corebill::report_violation(host_address); + mark_for_closure(CLOSE_VIOLATION::VIOLATION_THRESHOLD_EXCEEDED); } else if (elapsed_time > t.intervalms) { @@ -356,7 +368,7 @@ namespace comm if (util::get_epoch_milliseconds() - last_activity_timestamp >= timeout) { LOG_DEBUG << "Closing " << display_name() << " connection due to inactivity."; - mark_for_closure(); + mark_for_closure(CLOSE_VIOLATION::VIOLATION_INACTIVITY); } } diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 6c381a0f..76a6ae8e 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -3,12 +3,12 @@ #include "../pchheader.hpp" #include "../conf.hpp" +#include "../corebill/tracker.hpp" #include "hpws.hpp" #include "comm_session_threshold.hpp" namespace comm { - enum CHALLENGE_STATUS { NOT_ISSUED, @@ -24,12 +24,22 @@ namespace comm CLOSED // Session is fully closed. }; + enum CLOSE_VIOLATION + { + VIOLATION_NONE = 0, + VIOLATION_MSG_READ = 1, + VIOLATION_READ_ERROR = 2, + VIOLATION_THRESHOLD_EXCEEDED = 3, + VIOLATION_INACTIVITY = 4 + }; + /** * Represents an active WebSocket connection */ class comm_session { private: + corebill::tracker &violation_tracker; std::optional hpws_client; std::vector thresholds; // track down various communication thresholds @@ -53,14 +63,15 @@ namespace comm std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address. std::string pubkey; // Pubkey in binary format. const bool is_inbound; + const bool is_ipv4; // Whether the host is ipv4 or ipv6. const std::string host_address; // Connection host address of the remote party. std::string issued_challenge; SESSION_STATE state = SESSION_STATE::NONE; CHALLENGE_STATUS challenge_status = CHALLENGE_STATUS::NOT_ISSUED; uint64_t last_activity_timestamp; // Keep track of the last activity timestamp in milliseconds. - comm_session( - std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[5]); + comm_session(corebill::tracker &violation_tracker, + std::string_view host_address, hpws::client &&hpws_client, const bool is_ipv4, const bool is_inbound, const uint64_t (&metric_thresholds)[5]); int init(); int process_next_inbound_message(const uint16_t priority); int send(const std::vector &message, const uint16_t priority = 2); @@ -68,7 +79,7 @@ namespace comm int process_outbound_message(std::string_view message); void process_outbound_msg_queue(); void check_last_activity_rules(); - void mark_for_closure(); + void mark_for_closure(const CLOSE_VIOLATION reason = CLOSE_VIOLATION::VIOLATION_NONE); void close(); void mark_as_verified(); virtual const std::string display_name() const; diff --git a/src/comm/hpws.hpp b/src/comm/hpws.hpp index b6b63e4b..aa287bff 100644 --- a/src/comm/hpws.hpp +++ b/src/comm/hpws.hpp @@ -2,7 +2,6 @@ #define HPWS_INCLUDE #include #include -#include #include #include #include @@ -18,6 +17,7 @@ #include #include #include +#include #define DECODE_O_SIZE(control_msg, into) \ { \ @@ -84,6 +84,7 @@ namespace hpws pid_t child_pid, int buffer_fd[4], void *buffer[4]) : endpoint(endpoint), + is_ipv4(endpoint.sa.sa_family == AF_INET), max_buffer_size(max_buffer_size), child_pid(child_pid), get(get) { @@ -100,6 +101,8 @@ namespace hpws } public: + bool is_ipv4 = false; + // No copy constructor client(const client &) = delete; @@ -107,6 +110,7 @@ namespace hpws client(client &&old) : child_pid(old.child_pid), max_buffer_size(old.max_buffer_size), endpoint(old.endpoint), + is_ipv4(old.is_ipv4), get(old.get) { old.moved = true; @@ -152,9 +156,11 @@ namespace hpws const std::variant host_address() { char hostname[NI_MAXHOST]; - const int ret = getnameinfo((sockaddr *)&endpoint, sizeof(sockaddr), hostname, sizeof(hostname), NULL, 0, NI_NUMERICHOST); - if (ret != 0) - return error{10, gai_strerror(ret)}; + const char *ret = (endpoint.sa.sa_family == AF_INET) + ? inet_ntop(AF_INET, &endpoint.sin.sin_addr, hostname, NI_MAXHOST) + : inet_ntop(AF_INET6, &endpoint.sin6.sin6_addr, hostname, NI_MAXHOST); + if (!ret) + return error{10, "error in inet_ntop"}; return hostname; } @@ -605,6 +611,56 @@ namespace hpws return max_buffer_size_; } + void ban_ip(const uint32_t *addr, const uint32_t ttl_sec, const bool ipv4) + { + const size_t len = ipv4 ? 11 : 23; + char buf[len]; + buf[0] = 'i'; + buf[1] = '+'; + buf[2] = ipv4 ? '4' : '6'; + + uint32_t *addr_buf = (uint32_t *)&buf[3]; + if (ipv4) + { + addr_buf[0] = addr[0]; + } + else + { + addr_buf[0] = addr[0]; + addr_buf[1] = addr[2]; + addr_buf[2] = addr[3]; + addr_buf[3] = addr[4]; + } + + *(uint32_t *)&buf[len - 4] = ttl_sec; + + write(this->master_control_fd_, buf, len); + } + + void unban_ip(const uint32_t *addr, const bool ipv4) + { + const size_t len = ipv4 ? 7 : 19; + char buf[len]; + buf[0] = 'i'; + buf[1] = '-'; + buf[2] = ipv4 ? '4' : '6'; + + uint32_t *addr_buf = (uint32_t *)&buf[3]; + if (ipv4) + { + addr_buf[0] = addr[0]; + } + else + { + addr_buf[0] = addr[0]; + addr_buf[1] = addr[2]; + addr_buf[2] = addr[3]; + addr_buf[3] = addr[4]; + } + + write(this->master_control_fd_, buf, len); + } + std::variant accept(const bool no_block = false) { diff --git a/src/consensus.cpp b/src/consensus.cpp index 8b6d46e2..14c0c270 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -7,7 +7,6 @@ #include "msg/fbuf/p2pmsg_conversion.hpp" #include "msg/usrmsg_parser.hpp" #include "msg/usrmsg_common.hpp" -#include "p2p/peer_session_handler.hpp" #include "hplog.hpp" #include "crypto.hpp" #include "util/h32.hpp" diff --git a/src/corebill/corebill.hpp b/src/corebill/corebill.hpp new file mode 100644 index 00000000..16427946 --- /dev/null +++ b/src/corebill/corebill.hpp @@ -0,0 +1,28 @@ +#ifndef _HP_COREBILL_ +#define _HP_COREBILL_ + +#include "../pchheader.hpp" + +namespace corebill +{ + + /** + * Keeps the violation counter and the timestamp of the monitoring window. + */ + struct violation_stat + { + uint32_t counter = 0; + uint64_t timestamp = 0; + }; + + struct ban_update + { + bool is_ban = false; // Whether to ban or unban. + bool is_ipv4 = false; // If host is ipv4 or ipv6. + std::string host; + uint32_t ttl_sec; // Time in seconds to enforce the ban. Relevent only for bans. + }; + +} // namespace corebill + +#endif \ No newline at end of file diff --git a/src/corebill/tracker.cpp b/src/corebill/tracker.cpp new file mode 100644 index 00000000..4601be48 --- /dev/null +++ b/src/corebill/tracker.cpp @@ -0,0 +1,60 @@ +#include "../pchheader.hpp" +#include "corebill.hpp" +#include "tracker.hpp" +#include "../util/util.hpp" + +namespace corebill +{ + // How many violations can a host make within the refresh interval before being banned. + constexpr uint32_t VIOLATION_THRESHOLD = 5; + + // Violation cooldown interval. + constexpr uint32_t VIOLATION_REFRESH_INTERVAL = 600 * 1000; // 10 minutes + + // Ban period. + constexpr uint32_t BAN_TTL_SEC = 600; // 10 minutes. + + /** + * Report a violation. Violation means the connection has displayed a bad behaviour. + * When multiple violations occur within a time window, we ban that host from connecting again for a certain duration. + */ + void tracker::report_violation(const std::string &host, const bool ipv4, const std::string &reason) + { + std::scoped_lock lock(ban_mutex); + + violation_stat &stat = violation_counter[host]; + const uint64_t time_now = util::get_epoch_milliseconds(); + + LOG_INFO << "Reported violation '" << reason << "' from " << host; + + // Check whether we have exceeded the violation threshold within the time window. + const uint64_t elapsed_time = time_now - stat.timestamp; + if (elapsed_time <= VIOLATION_REFRESH_INTERVAL && (stat.counter + 1) > VIOLATION_THRESHOLD) + { + violation_counter.erase(host); + + // IP exceeded violation threshold. We must ban the host. + LOG_WARNING << "Banning " << host << " for " << BAN_TTL_SEC << "s"; + ban_updates.enqueue(ban_update{true, ipv4, host, BAN_TTL_SEC}); // Inform hpws about the ban. + banned_hosts.emplace(host, BAN_TTL_SEC * 1000); // Add to local ban list to cross-check outgoing connections. + return; + } + + if (stat.timestamp == 0 || elapsed_time > VIOLATION_REFRESH_INTERVAL) + { + // Start the counter fresh. + stat.timestamp = time_now; + stat.counter = 1; + } + else + { + stat.counter++; + } + } + + bool tracker::is_banned(const std::string &host) + { + std::scoped_lock lock(ban_mutex); + return banned_hosts.exists(host); + } +} \ No newline at end of file diff --git a/src/corebill/tracker.hpp b/src/corebill/tracker.hpp new file mode 100644 index 00000000..ae164248 --- /dev/null +++ b/src/corebill/tracker.hpp @@ -0,0 +1,25 @@ +#ifndef _HP_COREBILL_TRACKER_ +#define _HP_COREBILL_TRACKER_ + +#include "../pchheader.hpp" +#include "../util/ttl_set.hpp" +#include "corebill.hpp" + +namespace corebill +{ + class tracker + { + private: + // Keeps track of violation count against offending hosts. + std::unordered_map violation_counter; + util::ttl_set banned_hosts; + std::mutex ban_mutex; + + public: + moodycamel::ConcurrentQueue ban_updates; + void report_violation(const std::string &host, const bool ipv4, const std::string &reason); + bool is_banned(const std::string &host); + }; +} + +#endif \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 12ede0cc..f250c667 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -17,8 +17,6 @@ namespace msg::fbuf::p2pmsg /** * This section contains Flatbuffer message reading/writing helpers. - * These helpers are mainly used by peer_session_handler and other components which sends outgoing p2p messages. - * * A p2p flatbuffer message is a bucket with hp version and the message 'content'. */ diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 534d13ac..ab5dc95e 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -10,7 +10,6 @@ #include "../msg/fbuf/p2pmsg_generated.h" #include "peer_comm_server.hpp" #include "peer_comm_session.hpp" -#include "peer_session_handler.hpp" namespace p2p { diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index a68e2faf..ac9e569c 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -198,12 +198,12 @@ namespace p2p else { const std::string &host_address = std::get(host_result); - p2p::peer_comm_session session(host_address, std::move(client), false, metric_thresholds); + p2p::peer_comm_session session(this->violation_tracker, host_address, std::move(client), client.is_ipv4, false, metric_thresholds); // Skip if this peer is banned due to corebill violations. - if (corebill::is_banned(host_address)) + if (violation_tracker.is_banned(host_address)) { - LOG_DEBUG << "Skipping peer " << host_address << " from connecting. This peer is banned."; + LOG_DEBUG << "Skipping connecting to banned peer " << host_address; continue; } diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp index 93054596..6bfcd152 100644 --- a/src/p2p/peer_comm_session.cpp +++ b/src/p2p/peer_comm_session.cpp @@ -1,32 +1,317 @@ #include "../pchheader.hpp" +#include "../util/rollover_hashset.hpp" +#include "../msg/fbuf/p2pmsg_generated.h" +#include "../msg/fbuf/p2pmsg_conversion.hpp" +#include "../msg/fbuf/common_helpers.hpp" +#include "../crypto.hpp" +#include "../sc/hpfs_log_sync.hpp" +#include "../sc/sc.hpp" +#include "../ledger/ledger.hpp" #include "peer_comm_session.hpp" -#include "peer_session_handler.hpp" + +namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p { + // Max size of messages which are subjected to duplicate message check. + constexpr size_t MAX_SIZE_FOR_DUP_CHECK = 1 * 1024 * 1024; // 1 MB + + // The set of recent peer message hashes used for duplicate detection. + util::rollover_hashset recent_peermsg_hashes(200); + + /** + * This gets hit every time a peer connects to HP via the peer port (configured in config). + * @param session connected session. + * @return returns 0 if connection is successful and peer challenge is sent otherwise, -1. + */ int peer_comm_session::handle_connect() { - return p2p::handle_peer_connect(*this); + // Skip new inbound connection if max inbound connection cap is reached. + if (is_inbound && calculate_available_capacity() == 0) + { + LOG_DEBUG << "Max peer connection cap reached. Rejecting new peer connection [" << display_name() << "]"; + return -1; + } + + // Send peer challenge. + flatbuffers::FlatBufferBuilder fbuf; + p2pmsg::create_msg_from_peer_challenge(fbuf, issued_challenge); + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + send(msg); + challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_ISSUED; + return 0; } + /** + * Returns the priority that should be assigned to the message. + * @return 0 if bad message. 1 or 2 if correct priority was assigned. + */ int peer_comm_session::get_message_priority(std::string_view msg) { - return p2p::get_message_priority(msg); + if (!p2pmsg::verify_peer_message(msg)) + { + LOG_DEBUG << "Flatbuffer verify: Bad peer message."; + return 0; + } + + const auto p2p_msg = p2pmsg::GetP2PMsg(msg.data()); + const msg::fbuf::p2pmsg::P2PMsgContent type = p2p_msg->content_type(); + + if (type == p2pmsg::P2PMsgContent_ProposalMsg || type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) + return 1; // High priority + else + return 2; // Low priority } + /** + * Peer session on message callback method. Validate and handle each type of peer messages. + * @return 0 on normal execution. -1 when session needs to be closed as a result of message handling. + */ int peer_comm_session::handle_message(std::string_view msg) { - return p2p::handle_peer_message(*this, msg); + const size_t message_size = msg.size(); + // Adding message size to peer message characters(bytes) per minute counter. + increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message_size); + + const peer_message_info mi = p2pmsg::get_peer_message_info(msg, this); + if (!mi.p2p_msg) // Message buffer will be null if peer message was too old. + return 0; + + // Messages larger than the duplicate message threshold is ignored from the duplicate message check + // due to the overhead in hash generation for larger messages. + if (message_size <= MAX_SIZE_FOR_DUP_CHECK && !recent_peermsg_hashes.try_emplace(crypto::get_hash(msg))) + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Duplicate peer message. type:" << mi.type << " from:" << display_name(); + return 0; + } + + // Check whether the message is qualified for message forwarding. + if (p2p::validate_for_peer_msg_forwarding(*this, mi.type, mi.originated_on)) + { + // Npl messages and consensus proposals are forwarded only to unl nodes if relavent flags (npl and consensus) are set to private. + // If consensus and npl flags are public, these messages are forward to all the connected nodes. + const bool unl_only = (!conf::cfg.contract.is_npl_public && mi.type == p2pmsg::P2PMsgContent_NplMsg) || + (!conf::cfg.contract.is_consensus_public && mi.type == p2pmsg::P2PMsgContent_ProposalMsg); + if (need_consensus_msg_forwarding) + { + // Forward messages received by weakly connected nodes to other peers. + p2p::broadcast_message(msg, false, false, unl_only, this); + } + else + { + // Forward message received from other nodes to weakly connected peers. + p2p::broadcast_message(msg, false, true, unl_only, this); + } + } + + if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeMsg) + { + const p2p::peer_challenge chall = p2pmsg::create_peer_challenge_from_msg(mi); + + // Check whether contract ids match. + if (chall.contract_id != conf::cfg.contract.id) + { + LOG_ERROR << "Contract id mismatch. Dropping connection " << display_name(); + return -1; + } + + // Remember the time config reported by this peer. + reported_time_config = chall.time_config; + + // Whether this node is a full history node or not. + is_full_history = chall.is_full_history; + + // Sending the challenge response to the sender. + flatbuffers::FlatBufferBuilder fbuf; + p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge); + return send(msg::fbuf::builder_to_string_view(fbuf)); + } + else if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeResponseMsg) + { + // Ignore if challenge is already resolved. + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_ISSUED) + return p2p::resolve_peer_challenge(*this, p2pmsg::create_peer_challenge_response_from_msg(mi)); + } + + if (challenge_status != comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + { + LOG_DEBUG << "Cannot accept messages. Peer challenge unresolved. " << display_name(); + return 0; + } + + if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg) + { + const std::vector merge_peers = p2pmsg::create_peer_list_response_from_msg(mi); + p2p::merge_peer_list("Peer_Discovery", &merge_peers, NULL, this); + } + else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg) + { + p2p::send_known_peer_list(this); + } + else if (mi.type == p2pmsg::P2PMsgContent_PeerCapacityAnnouncementMsg) + { + if (known_ipport.has_value()) + { + const p2p::peer_capacity_announcement ann = p2pmsg::create_peer_capacity_announcement_from_msg(mi); + p2p::update_known_peer_available_capacity(known_ipport.value(), ann.available_capacity, ann.timestamp); + } + } + else if (mi.type == p2pmsg::P2PMsgContent_PeerRequirementAnnouncementMsg) + { + const p2p::peer_requirement_announcement ann = p2pmsg::create_peer_requirement_announcement_from_msg(mi); + need_consensus_msg_forwarding = ann.need_consensus_msg_forwarding; + LOG_DEBUG << "Peer requirement: " << display_name() << " consensus msg forwarding:" << ann.need_consensus_msg_forwarding; + } + else if (mi.type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) + { + handle_nonunl_proposal_message(p2pmsg::create_nonunl_proposal_from_msg(mi)); + } + else if (mi.type == p2pmsg::P2PMsgContent_ProposalMsg) + { + const util::h32 hash = p2pmsg::verify_proposal_msg_trust(mi); + if (hash == util::h32_empty) + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Proposal rejected due to trust failure. " << display_name(); + return 0; + } + + handle_proposal_message(p2pmsg::create_proposal_from_msg(mi, hash)); + } + else if (mi.type == p2pmsg::P2PMsgContent_NplMsg) + { + if (!p2pmsg::verify_npl_msg_trust(mi)) + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Npl message rejected due to trust failure. " << display_name(); + return 0; + } + + handle_npl_message(p2pmsg::create_npl_from_msg(mi)); + } + else if (mi.type == p2pmsg::P2PMsgContent_HpfsRequestMsg) + { + const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(mi); + if (hr.mount_id == sc::contract_fs.mount_id) + { + // Check the cap and insert request with lock. + std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_requests_mutex); + + // If max number of state requests reached skip the rest. + if (ctx.collected_msgs.contract_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) + ctx.collected_msgs.contract_hpfs_requests.push_back(std::make_pair(pubkey, std::move(hr))); + else + LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << display_name(); + } + else if (hr.mount_id == ledger::ledger_fs.mount_id) + { + // Check the cap and insert request with lock. + std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_requests_mutex); + + // If max number of state requests reached skip the rest. + if (ctx.collected_msgs.ledger_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) + ctx.collected_msgs.ledger_hpfs_requests.push_back(std::make_pair(pubkey, std::move(hr))); + else + LOG_DEBUG << "Hpfs ledger fs request rejected. Maximum hpfs ledger fs request count reached. " << display_name(); + } + } + else if (mi.type == p2pmsg::P2PMsgContent_HpfsResponseMsg) + { + const p2pmsg::HpfsResponseMsg &resp_msg = *mi.p2p_msg->content_as_HpfsResponseMsg(); + + // Only accept hpfs responses if hpfs fs is syncing. + if (sc::contract_sync_worker.is_syncing && resp_msg.mount_id() == sc::contract_fs.mount_id) + { + // Check the cap and insert state_response with lock. + std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_responses_mutex); + + // If max number of state responses reached skip the rest. + if (ctx.collected_msgs.contract_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) + ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(uniqueid, std::string(msg))); + else + LOG_DEBUG << "Contract hpfs response rejected. Maximum response count reached. " << display_name(); + } + else if (ledger::ledger_sync_worker.is_syncing && resp_msg.mount_id() == ledger::ledger_fs.mount_id) + { + // Check the cap and insert state_response with lock. + std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_responses_mutex); + + // If max number of state responses reached skip the rest. + if (ctx.collected_msgs.ledger_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) + ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(uniqueid, std::string(msg))); + else + LOG_DEBUG << "Ledger hpfs response rejected. Maximum response count reached. " << display_name(); + } + } + else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogRequest) + { + if (conf::cfg.node.history == conf::HISTORY::FULL) + { + // Check the cap and insert log record request with lock. + std::scoped_lock lock(ctx.collected_msgs.hpfs_log_request_mutex); + + // If max number of log record requests reached, skip the rest. + if (ctx.collected_msgs.hpfs_log_requests.size() < p2p::LOG_RECORD_REQ_LIST_CAP) + { + const p2p::hpfs_log_request hpfs_log_request = p2pmsg::create_hpfs_log_request_from_msg(mi); + ctx.collected_msgs.hpfs_log_requests.push_back(std::make_pair(uniqueid, std::move(hpfs_log_request))); + } + else + LOG_DEBUG << "Hpfs log request rejected. Maximum request count reached. " << display_name(); + } + } + else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogResponse) + { + if (conf::cfg.node.history == conf::HISTORY::FULL && sc::hpfs_log_sync::sync_ctx.is_syncing) + { + // Check the cap and insert log record response with lock. + std::scoped_lock lock(ctx.collected_msgs.hpfs_log_response_mutex); + + // If max number of log record responses reached, skip the rest. + if (ctx.collected_msgs.hpfs_log_responses.size() < p2p::LOG_RECORD_RES_LIST_CAP) + { + const p2p::hpfs_log_response hpfs_log_response = p2pmsg::create_hpfs_log_response_from_msg(mi); + ctx.collected_msgs.hpfs_log_responses.push_back(std::make_pair(uniqueid, std::move(hpfs_log_response))); + } + else + LOG_DEBUG << "Hpfs log response rejected. Maximum response count reached. " << display_name(); + } + } + else + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Received invalid peer message type [" << mi.type << "]. " << display_name(); + } + return 0; } void peer_comm_session::handle_close() { - p2p::handle_peer_close(*this); + { + // Erase the corresponding pubkey peer connection if it's this session. + std::scoped_lock lock(ctx.peer_connections_mutex); + const auto itr = ctx.peer_connections.find(pubkey); + if (itr != ctx.peer_connections.end() && itr->second == this) + { + ctx.peer_connections.erase(itr); + } + } + + // Update peer properties to default on peer close. + if (known_ipport.has_value()) + p2p::update_known_peer_available_capacity(known_ipport.value(), -1, 0); } + /** + * Logic related to peer sessions on verfied is invoked here. + */ void peer_comm_session::handle_on_verified() { - p2p::handle_peer_on_verified(*this); + // Sending newly verified node the requirement of consensus msg fowarding if this node is weakly connected. + if (status::get_weakly_connected()) + p2p::send_peer_requirement_announcement(true, this); } } // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp deleted file mode 100644 index a2c604c7..00000000 --- a/src/p2p/peer_session_handler.cpp +++ /dev/null @@ -1,342 +0,0 @@ -#include "../pchheader.hpp" -#include "../conf.hpp" -#include "../consensus.hpp" -#include "../crypto.hpp" -#include "../util/util.hpp" -#include "../util/rollover_hashset.hpp" -#include "../hplog.hpp" -#include "../msg/fbuf/p2pmsg_generated.h" -#include "../msg/fbuf/p2pmsg_conversion.hpp" -#include "../msg/fbuf/common_helpers.hpp" -#include "../ledger/ledger.hpp" -#include "peer_comm_session.hpp" -#include "p2p.hpp" -#include "../unl.hpp" -#include "../sc/hpfs_log_sync.hpp" -#include "../status.hpp" - -namespace p2pmsg = msg::fbuf::p2pmsg; - -namespace p2p -{ - // Max size of messages which are subjected to duplicate message check. - constexpr size_t MAX_SIZE_FOR_DUP_CHECK = 1 * 1024 * 1024; // 1 MB - - // The set of recent peer message hashes used for duplicate detection. - util::rollover_hashset recent_peermsg_hashes(200); - - /** - * This gets hit every time a peer connects to HP via the peer port (configured in config). - * @param session connected session. - * @return returns 0 if connection is successful and peer challenge is sent otherwise, -1. - */ - int handle_peer_connect(p2p::peer_comm_session &session) - { - // Skip new inbound connection if max inbound connection cap is reached. - if (session.is_inbound && calculate_available_capacity() == 0) - { - LOG_DEBUG << "Max peer connection cap reached. Rejecting new peer connection [" << session.display_name() << "]"; - return -1; - } - - // Send peer challenge. - flatbuffers::FlatBufferBuilder fbuf; - p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge); - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - session.send(msg); - session.challenge_status = comm::CHALLENGE_ISSUED; - return 0; - } - - /** - * Returns the priority that should be assigned to the message. - * @return 0 if bad message. 1 or 2 if correct priority was assigned. - */ - int get_message_priority(std::string_view message) - { - if (!p2pmsg::verify_peer_message(message)) - { - LOG_DEBUG << "Flatbuffer verify: Bad peer message."; - return 0; - } - - const auto p2p_msg = p2pmsg::GetP2PMsg(message.data()); - const msg::fbuf::p2pmsg::P2PMsgContent type = p2p_msg->content_type(); - - if (type == p2pmsg::P2PMsgContent_ProposalMsg || type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) - return 1; // High priority - else - return 2; // Low priority - } - - /** - * Peer session on message callback method. Validate and handle each type of peer messages. - * @return 0 on normal execution. -1 when session needs to be closed as a result of message handling. - */ - int handle_peer_message(p2p::peer_comm_session &session, std::string_view message) - { - const size_t message_size = message.size(); - // Adding message size to peer message characters(bytes) per minute counter. - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message_size); - - const peer_message_info mi = p2pmsg::get_peer_message_info(message, &session); - if (!mi.p2p_msg) // Message buffer will be null if peer message was too old. - return 0; - - // Messages larger than the duplicate message threshold is ignored from the duplicate message check - // due to the overhead in hash generation for larger messages. - if (message_size <= MAX_SIZE_FOR_DUP_CHECK && !recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Duplicate peer message. type:" << mi.type << " from:" << session.display_name(); - return 0; - } - - // Check whether the message is qualified for message forwarding. - if (p2p::validate_for_peer_msg_forwarding(session, mi.type, mi.originated_on)) - { - // Npl messages and consensus proposals are forwarded only to unl nodes if relavent flags (npl and consensus) are set to private. - // If consensus and npl flags are public, these messages are forward to all the connected nodes. - const bool unl_only = (!conf::cfg.contract.is_npl_public && mi.type == p2pmsg::P2PMsgContent_NplMsg) || - (!conf::cfg.contract.is_consensus_public && mi.type == p2pmsg::P2PMsgContent_ProposalMsg); - if (session.need_consensus_msg_forwarding) - { - // Forward messages received by weakly connected nodes to other peers. - p2p::broadcast_message(message, false, false, unl_only, &session); - } - else - { - // Forward message received from other nodes to weakly connected peers. - p2p::broadcast_message(message, false, true, unl_only, &session); - } - } - - if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeMsg) - { - const p2p::peer_challenge chall = p2pmsg::create_peer_challenge_from_msg(mi); - - // Check whether contract ids match. - if (chall.contract_id != conf::cfg.contract.id) - { - LOG_ERROR << "Contract id mismatch. Dropping connection " << session.display_name(); - return -1; - } - - // Remember the time config reported by this peer. - session.reported_time_config = chall.time_config; - - // Whether this node is a full history node or not. - session.is_full_history = chall.is_full_history; - - // Sending the challenge response to the sender. - flatbuffers::FlatBufferBuilder fbuf; - p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge); - return session.send(msg::fbuf::builder_to_string_view(fbuf)); - } - else if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeResponseMsg) - { - // Ignore if challenge is already resolved. - if (session.challenge_status == comm::CHALLENGE_ISSUED) - return p2p::resolve_peer_challenge(session, p2pmsg::create_peer_challenge_response_from_msg(mi)); - } - - if (session.challenge_status != comm::CHALLENGE_VERIFIED) - { - LOG_DEBUG << "Cannot accept messages. Peer challenge unresolved. " << session.display_name(); - return 0; - } - - if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg) - { - const std::vector merge_peers = p2pmsg::create_peer_list_response_from_msg(mi); - p2p::merge_peer_list("Peer_Discovery", &merge_peers, NULL, &session); - } - else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg) - { - p2p::send_known_peer_list(&session); - } - else if (mi.type == p2pmsg::P2PMsgContent_PeerCapacityAnnouncementMsg) - { - if (session.known_ipport.has_value()) - { - const p2p::peer_capacity_announcement ann = p2pmsg::create_peer_capacity_announcement_from_msg(mi); - p2p::update_known_peer_available_capacity(session.known_ipport.value(), ann.available_capacity, ann.timestamp); - } - } - else if (mi.type == p2pmsg::P2PMsgContent_PeerRequirementAnnouncementMsg) - { - const p2p::peer_requirement_announcement ann = p2pmsg::create_peer_requirement_announcement_from_msg(mi); - session.need_consensus_msg_forwarding = ann.need_consensus_msg_forwarding; - LOG_DEBUG << "Peer requirement: " << session.display_name() << " consensus msg forwarding:" << ann.need_consensus_msg_forwarding; - } - else if (mi.type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) - { - handle_nonunl_proposal_message(p2pmsg::create_nonunl_proposal_from_msg(mi)); - } - else if (mi.type == p2pmsg::P2PMsgContent_ProposalMsg) - { - const util::h32 hash = p2pmsg::verify_proposal_msg_trust(mi); - if (hash == util::h32_empty) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Proposal rejected due to trust failure. " << session.display_name(); - return 0; - } - - handle_proposal_message(p2pmsg::create_proposal_from_msg(mi, hash)); - } - else if (mi.type == p2pmsg::P2PMsgContent_NplMsg) - { - if (!p2pmsg::verify_npl_msg_trust(mi)) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Npl message rejected due to trust failure. " << session.display_name(); - return 0; - } - - handle_npl_message(p2pmsg::create_npl_from_msg(mi)); - } - else if (mi.type == p2pmsg::P2PMsgContent_HpfsRequestMsg) - { - const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(mi); - if (hr.mount_id == sc::contract_fs.mount_id) - { - // Check the cap and insert request with lock. - std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_requests_mutex); - - // If max number of state requests reached skip the rest. - if (ctx.collected_msgs.contract_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) - ctx.collected_msgs.contract_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); - else - LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name(); - } - else if (hr.mount_id == ledger::ledger_fs.mount_id) - { - // Check the cap and insert request with lock. - std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_requests_mutex); - - // If max number of state requests reached skip the rest. - if (ctx.collected_msgs.ledger_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) - ctx.collected_msgs.ledger_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); - else - LOG_DEBUG << "Hpfs ledger fs request rejected. Maximum hpfs ledger fs request count reached. " << session.display_name(); - } - } - else if (mi.type == p2pmsg::P2PMsgContent_HpfsResponseMsg) - { - const p2pmsg::HpfsResponseMsg &resp_msg = *mi.p2p_msg->content_as_HpfsResponseMsg(); - - // Only accept hpfs responses if hpfs fs is syncing. - if (sc::contract_sync_worker.is_syncing && resp_msg.mount_id() == sc::contract_fs.mount_id) - { - // Check the cap and insert state_response with lock. - std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_responses_mutex); - - // If max number of state responses reached skip the rest. - if (ctx.collected_msgs.contract_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) - ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::string(message))); - else - LOG_DEBUG << "Contract hpfs response rejected. Maximum response count reached. " << session.display_name(); - } - else if (ledger::ledger_sync_worker.is_syncing && resp_msg.mount_id() == ledger::ledger_fs.mount_id) - { - // Check the cap and insert state_response with lock. - std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_responses_mutex); - - // If max number of state responses reached skip the rest. - if (ctx.collected_msgs.ledger_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) - ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::string(message))); - else - LOG_DEBUG << "Ledger hpfs response rejected. Maximum response count reached. " << session.display_name(); - } - } - else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogRequest) - { - if (conf::cfg.node.history == conf::HISTORY::FULL) - { - // Check the cap and insert log record request with lock. - std::scoped_lock lock(ctx.collected_msgs.hpfs_log_request_mutex); - - // If max number of log record requests reached, skip the rest. - if (ctx.collected_msgs.hpfs_log_requests.size() < p2p::LOG_RECORD_REQ_LIST_CAP) - { - const p2p::hpfs_log_request hpfs_log_request = p2pmsg::create_hpfs_log_request_from_msg(mi); - ctx.collected_msgs.hpfs_log_requests.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_request))); - } - else - LOG_DEBUG << "Hpfs log request rejected. Maximum request count reached. " << session.display_name(); - } - } - else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogResponse) - { - if (conf::cfg.node.history == conf::HISTORY::FULL && sc::hpfs_log_sync::sync_ctx.is_syncing) - { - // Check the cap and insert log record response with lock. - std::scoped_lock lock(ctx.collected_msgs.hpfs_log_response_mutex); - - // If max number of log record responses reached, skip the rest. - if (ctx.collected_msgs.hpfs_log_responses.size() < p2p::LOG_RECORD_RES_LIST_CAP) - { - const p2p::hpfs_log_response hpfs_log_response = p2pmsg::create_hpfs_log_response_from_msg(mi); - ctx.collected_msgs.hpfs_log_responses.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_response))); - } - else - LOG_DEBUG << "Hpfs log response rejected. Maximum response count reached. " << session.display_name(); - } - } - else - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Received invalid peer message type [" << mi.type << "]. " << session.display_name(); - } - return 0; - } - - /** - * Handles messages that we receive from ourselves. - */ - int handle_self_message(std::string_view message) - { - const peer_message_info mi = p2pmsg::get_peer_message_info(message); - - if (mi.type == p2pmsg::P2PMsgContent_ProposalMsg) - handle_proposal_message(p2pmsg::create_proposal_from_msg(mi, hash_proposal_msg(*mi.p2p_msg->content_as_ProposalMsg()))); - else if (mi.type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) - handle_nonunl_proposal_message(p2pmsg::create_nonunl_proposal_from_msg(mi)); - else if (mi.type == p2pmsg::P2PMsgContent_NplMsg) - handle_npl_message(p2pmsg::create_npl_from_msg(mi)); - - return 0; - } - - //peer session on message callback method - int handle_peer_close(const p2p::peer_comm_session &session) - { - { - // Erase the corresponding pubkey peer connection if it's this session. - std::scoped_lock lock(ctx.peer_connections_mutex); - const auto itr = ctx.peer_connections.find(session.pubkey); - if (itr != ctx.peer_connections.end() && itr->second == &session) - { - ctx.peer_connections.erase(itr); - } - } - - // Update peer properties to default on peer close. - if (session.known_ipport.has_value()) - p2p::update_known_peer_available_capacity(session.known_ipport.value(), -1, 0); - - return 0; - } - - /** - * Logic related to peer sessions on verfied is invoked here. - */ - void handle_peer_on_verified(p2p::peer_comm_session &session) - { - // Sending newly verified node the requirement of consensus msg fowarding if this node is weakly connected. - if (status::get_weakly_connected()) - p2p::send_peer_requirement_announcement(true, &session); - } -} // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp deleted file mode 100644 index 3757275c..00000000 --- a/src/p2p/peer_session_handler.hpp +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef _HP_P2P_PEER_SESSION_HANDLER_ -#define _HP_P2P_PEER_SESSION_HANDLER_ - -#include "../pchheader.hpp" -#include "peer_comm_session.hpp" - -namespace p2p -{ - int handle_peer_connect(p2p::peer_comm_session &session); - int get_message_priority(std::string_view message); - int handle_peer_message(p2p::peer_comm_session &session, std::string_view message); - int handle_self_message(std::string_view message); - int handle_peer_close(const p2p::peer_comm_session &session); - void handle_peer_on_verified(p2p::peer_comm_session &session); - -} // namespace p2p -#endif \ No newline at end of file diff --git a/src/p2p/self_node.cpp b/src/p2p/self_node.cpp index 33cb66e6..64414947 100644 --- a/src/p2p/self_node.cpp +++ b/src/p2p/self_node.cpp @@ -1,5 +1,11 @@ #include "../pchheader.hpp" -#include "peer_session_handler.hpp" +#include "../conf.hpp" +#include "p2p.hpp" +#include "../msg/fbuf/p2pmsg_generated.h" +#include "../msg/fbuf/p2pmsg_conversion.hpp" +#include "../msg/fbuf/common_helpers.hpp" + +namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p::self { @@ -16,7 +22,17 @@ namespace p2p::self { std::string msg; if (msg_queue.try_dequeue(msg)) - return p2p::handle_self_message(msg); + { + // Handle the message we received from ourselves. + const peer_message_info mi = p2pmsg::get_peer_message_info(msg); + + if (mi.type == p2pmsg::P2PMsgContent_ProposalMsg) + handle_proposal_message(p2pmsg::create_proposal_from_msg(mi, hash_proposal_msg(*mi.p2p_msg->content_as_ProposalMsg()))); + else if (mi.type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) + handle_nonunl_proposal_message(p2pmsg::create_nonunl_proposal_from_msg(mi)); + else if (mi.type == p2pmsg::P2PMsgContent_NplMsg) + handle_npl_message(p2pmsg::create_npl_from_msg(mi)); + } return 0; } diff --git a/src/usr/user_comm_session.cpp b/src/usr/user_comm_session.cpp index c4d19fdc..92ffa86f 100644 --- a/src/usr/user_comm_session.cpp +++ b/src/usr/user_comm_session.cpp @@ -1,24 +1,98 @@ #include "../pchheader.hpp" #include "../util/util.hpp" +#include "../msg/json/usrmsg_json.hpp" #include "user_comm_session.hpp" -#include "user_session_handler.hpp" +#include "usr.hpp" + +namespace jusrmsg = msg::usrmsg::json; namespace usr { + /** + * This gets hit every time a client connects to HP via the public port (configured in config). + * @return returns 0 if connection is successful and user challenge is sent, otherwise -1. + */ int user_comm_session::handle_connect() { - return usr::handle_user_connect(*this); + // Allow connection only if the maximum capacity is not reached. 0 means allowing unlimited connections. + if ((conf::cfg.user.max_connections == 0) || (ctx.users.size() < conf::cfg.user.max_connections)) + { + LOG_DEBUG << "User client connected " << display_name(); + + // As soon as a user connects, we issue them a challenge message. We remember the + // challenge we issued and later verify the user's response with it. + std::vector msg; + jusrmsg::create_user_challenge(msg, issued_challenge); + send(msg); + + // Set the challenge-issued value to true. + challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_ISSUED; + return 0; + } + else + { + LOG_DEBUG << "Dropping the user connection. Maximum user capacity reached. [" << display_name() << "] (limit: " << conf::cfg.user.max_connections << ")."; + return -1; + } } + /** + * This gets hit every time we receive some data from a client connected to the HP public port. + */ int user_comm_session::handle_message(std::string_view msg) { - return usr::handle_user_message(*this, msg); + // Adding message size to user message characters(bytes) per minute counter. + increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, msg.size()); + + // First check whether this session is pending challenge. + // Meaning we have previously issued a challenge to the client. + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_ISSUED) + { + if (verify_challenge(msg, *this) == 0) + return 0; + + LOG_DEBUG << "User challenge verification failed. " << display_name(); + } + // Check whether this session belongs to an authenticated (challenge-verified) user. + else if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + { + // Check whether this user is among authenticated users + // and perform authenticated msg processing. + + const auto itr = ctx.users.find(pubkey); + if (itr != ctx.users.end()) + { + // This is an authed user. + connected_user &user = itr->second; + if (handle_authed_user_message(user, msg) != 0) + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Bad message from user " << display_name(); + } + } + else + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DEBUG << "User session id not found: " << display_name(); + } + + return 0; + } + + // If for any reason we reach this point, we should drop the connection because none of the + // valid cases match. + LOG_DEBUG << "Dropping the user connection " << display_name(); + return -1; } + /** + * This gets hit every time a client disconnects from the HP public port. + */ void user_comm_session::handle_close() { - usr::handle_user_close(*this); + // Session belongs to an authed user. + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + remove_user(pubkey); } - } // namespace usr \ No newline at end of file diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp deleted file mode 100644 index 0dcde16b..00000000 --- a/src/usr/user_session_handler.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "../pchheader.hpp" -#include "../hplog.hpp" -#include "../msg/json/usrmsg_json.hpp" -#include "../bill/corebill.h" -#include "usr.hpp" -#include "user_session_handler.hpp" -#include "user_comm_session.hpp" - -namespace jusrmsg = msg::usrmsg::json; - -namespace usr -{ - /** - * This gets hit every time a client connects to HP via the public port (configured in config). - * @param session connected session. - * @return returns 0 if connection is successful and user challenge is sent, otherwise -1. - */ - int handle_user_connect(usr::user_comm_session &session) - { - // Allow connection only if the maximum capacity is not reached. 0 means allowing unlimited connections. - if ((conf::cfg.user.max_connections == 0) || (usr::ctx.users.size() < conf::cfg.user.max_connections)) - { - LOG_DEBUG << "User client connected " << session.display_name(); - - // As soon as a user connects, we issue them a challenge message. We remember the - // challenge we issued and later verify the user's response with it. - std::vector msg; - jusrmsg::create_user_challenge(msg, session.issued_challenge); - session.send(msg); - - // Set the challenge-issued value to true. - session.challenge_status = comm::CHALLENGE_ISSUED; - return 0; - } - else - { - LOG_DEBUG << "Dropping the user connection. Maximum user capacity reached. Session: " << session.display_name() << " (limit: " << conf::cfg.user.max_connections << ")."; - return -1; - } - } - - /** - * This gets hit every time we receive some data from a client connected to the HP public port. - */ - int handle_user_message(usr::user_comm_session &session, std::string_view message) - { - // Adding message size to user message characters(bytes) per minute counter. - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size()); - - // First check whether this session is pending challenge. - // Meaning we have previously issued a challenge to the client. - if (session.challenge_status == comm::CHALLENGE_ISSUED) - { - if (verify_challenge(message, session) == 0) - return 0; - } - // Check whether this session belongs to an authenticated (challenge-verified) user. - else if (session.challenge_status == comm::CHALLENGE_VERIFIED) - { - // Check whether this user is among authenticated users - // and perform authenticated msg processing. - - const auto itr = ctx.users.find(session.pubkey); - if (itr != ctx.users.end()) - { - // This is an authed user. - connected_user &user = itr->second; - if (handle_authed_user_message(user, message) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Bad message from user " << session.display_name(); - } - } - else - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "User session id not found: " << session.display_name(); - } - - return 0; - } - - // If for any reason we reach this point, we should drop the connection because none of the - // valid cases match. - LOG_DEBUG << "Dropping the user connection " << session.display_name(); - corebill::report_violation(session.host_address); - return -1; - } - - /** - * This gets hit every time a client disconnects from the HP public port. - */ - int handle_user_close(const usr::user_comm_session &session) - { - // Session belongs to an authed user. - if (session.challenge_status == comm::CHALLENGE_VERIFIED) - remove_user(session.pubkey); - - return 0; - } - -} // namespace usr \ No newline at end of file diff --git a/src/usr/user_session_handler.hpp b/src/usr/user_session_handler.hpp deleted file mode 100644 index 3c72590b..00000000 --- a/src/usr/user_session_handler.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef _HP_USER_SESSION_HANDLER_ -#define _HP_USER_SESSION_HANDLER_ - -#include "../pchheader.hpp" -#include "user_comm_session.hpp" - -namespace usr -{ - int handle_user_connect(usr::user_comm_session &session); - int handle_user_message(usr::user_comm_session &session, std::string_view message); - int handle_user_close(const usr::user_comm_session &session); - -} // namespace usr - -#endif \ No newline at end of file diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 0c1a4190..564ef771 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -12,7 +12,6 @@ #include "../hpfs/hpfs_mount.hpp" #include "../status.hpp" #include "usr.hpp" -#include "user_session_handler.hpp" #include "user_comm_session.hpp" #include "user_comm_server.hpp" #include "user_input.hpp" diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 313adec0..90ae1d11 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -9,7 +9,6 @@ #include "../msg/usrmsg_parser.hpp" #include "user_comm_session.hpp" #include "user_comm_server.hpp" -#include "user_session_handler.hpp" #include "user_input.hpp" #include "user_common.hpp" diff --git a/test/bin/hpws b/test/bin/hpws index 26dd64b3..91634d23 100755 Binary files a/test/bin/hpws and b/test/bin/hpws differ