diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 66996f7a..145d1ff1 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -494,6 +494,12 @@ namespace p2p continue; } + if (ctx.server->dead_known_peers.exists(peer.ip_port.to_string())) + { + LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Peer was removed prior due to unavailability."; + continue; + } + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == peer.ip_port; }); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 688d0802..534d13ac 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -31,6 +31,7 @@ namespace p2p int16_t available_capacity = -1; uint64_t timestamp = 0; int64_t weight = 0; + int32_t failed_attempts = 0; }; struct proposal diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index badcef52..a68e2faf 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -11,6 +11,9 @@ namespace p2p { constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7; + constexpr int16_t PEER_FAILED_THRESHOLD = 10; + // Peer will be removed from the dead known peers collection after this period of time. + constexpr int32_t DEAD_PEER_TIMEOUT = 5 * 60 * 1000; // 5 minutes. peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, @@ -134,6 +137,9 @@ namespace p2p peer_check_list = req_known_remotes; } + bool connections_changed = false; + std::vector failed_nodes; + for (const auto &peer : peer_check_list) { if (is_shutting_down) @@ -171,7 +177,14 @@ namespace p2p { const hpws::error error = std::get(client_result); if (error.first != 202) + { LOG_DEBUG << "Outbound connection hpws error:" << error.first << " " << error.second; + if (conf::cfg.mesh.peer_discovery.enabled) + { + failed_nodes.push_back(peer.ip_port); + connections_changed = true; + } + } } else { @@ -199,9 +212,34 @@ namespace p2p std::scoped_lock lock(new_sessions_mutex); new_sessions.emplace_back(std::move(session)); + connections_changed = true; } } } + if (conf::cfg.mesh.peer_discovery.enabled && connections_changed) + { + // Copy failed attempt data from failed_nodes to req_known_remotes. + std::scoped_lock lock(req_known_remotes_mutex); + + for (auto it = req_known_remotes.begin(); it != req_known_remotes.end();) + { + const auto itr = std::find(failed_nodes.begin(), failed_nodes.end(), it->ip_port); + if (itr != failed_nodes.end()) + it->failed_attempts++; + else if (it->failed_attempts > 0) // Reset failed attempts count if the connection succeeds. + it->failed_attempts = 0; + + if (it->failed_attempts >= PEER_FAILED_THRESHOLD) + { + LOG_INFO << "Removed " << it->ip_port.to_string() << " from known peer list due to unavailability."; + // Add the dead nodes ip data to reject same peer from peer discovery responses. + dead_known_peers.emplace(it->ip_port.to_string(), DEAD_PEER_TIMEOUT); + it = req_known_remotes.erase(it); + } + else + ++it; + } + } } /** * Check whether the node is weakly connected or strongly connected. diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp index 33dce2ec..1ecc259c 100644 --- a/src/p2p/peer_comm_server.hpp +++ b/src/p2p/peer_comm_server.hpp @@ -2,6 +2,7 @@ #define _HP_P2P_PEER_COMM_SERVER_ #include "../comm/comm_server.hpp" +#include "../util/ttl_set.hpp" #include "peer_comm_session.hpp" namespace p2p @@ -28,6 +29,7 @@ namespace p2p std::atomic known_remote_count = 0; std::mutex req_known_remotes_mutex; std::vector req_known_remotes; + util::ttl_set dead_known_peers; peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, const std::vector &req_known_remotes);