diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index 02dbe677..c8175d49 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -11,6 +11,7 @@ namespace comm { constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 4 * 1024 * 1024; + constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 99999; template class comm_server @@ -18,6 +19,8 @@ namespace comm protected: const uint64_t (&metric_thresholds)[5]; const uint64_t max_msg_size; + const uint64_t max_in_connections; + const uint64_t max_in_connections_per_host; bool is_shutting_down = false; std::list sessions; std::list new_sessions; // Sessions that haven't been initialized properly which are yet to be merge to "sessions" list. @@ -190,8 +193,8 @@ namespace comm conf::ctx.hpws_exe_path, max_msg_size, listen_port, - 512, // Max connections - 2, // Max connections per IP. + max_in_connections, + max_in_connections_per_host, conf::ctx.tls_cert_file, conf::ctx.tls_key_file, {}, @@ -210,11 +213,14 @@ namespace comm } public: - comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size) + comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, + const uint64_t max_in_connections, const uint64_t max_in_connections_per_host) : name(name), listen_port(port), metric_thresholds(metric_thresholds), - max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE) + max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE), + max_in_connections(max_in_connections > 0 ? max_in_connections : DEFAULT_MAX_CONNECTIONS), + max_in_connections_per_host(max_in_connections_per_host > 0 ? max_in_connections_per_host : DEFAULT_MAX_CONNECTIONS) { } diff --git a/src/conf.cpp b/src/conf.cpp index 80090526..c138faec 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -345,6 +345,7 @@ namespace conf cfg.mesh.msg_forwarding = mesh["msg_forwarding"].as(); cfg.mesh.max_connections = mesh["max_connections"].as(); cfg.mesh.max_known_connections = mesh["max_known_connections"].as(); + cfg.mesh.max_in_connections_per_host = mesh["max_in_connections_per_host"].as(); // If max_connections is greater than max_known_connections then show error and stop execution. if (cfg.mesh.max_known_connections > cfg.mesh.max_connections) { @@ -383,7 +384,8 @@ namespace conf { const jsoncons::ojson &user = d["user"]; cfg.user.port = user["port"].as(); - cfg.user.max_connections = user["max_connections"].as(); + cfg.user.max_connections = user["max_connections"].as(); + cfg.user.max_in_connections_per_host = user["max_in_connections_per_host"].as(); cfg.user.max_bytes_per_msg = user["max_bytes_per_msg"].as(); cfg.user.max_bytes_per_min = user["max_bytes_per_min"].as(); cfg.user.max_bad_msgs_per_min = user["max_bad_msgs_per_min"].as(); @@ -429,69 +431,80 @@ namespace conf jsoncons::ojson d; d.insert_or_assign("hp_version", cfg.hp_version); - // Node configs. - jsoncons::ojson node_config; - node_config.insert_or_assign("public_key", cfg.node.public_key_hex); - node_config.insert_or_assign("private_key", cfg.node.private_key_hex); - node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR); - // node_config.insert_or_assign("full_history", cfg.node.full_history); - d.insert_or_assign("node", node_config); + // Node config. + { + jsoncons::ojson node_config; + node_config.insert_or_assign("public_key", cfg.node.public_key_hex); + node_config.insert_or_assign("private_key", cfg.node.private_key_hex); + node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR); + d.insert_or_assign("node", node_config); + } // Contract config section. - jsoncons::ojson contract; - populate_contract_section_json(contract, cfg.contract, false); - d.insert_or_assign("contract", contract); + { + jsoncons::ojson contract; + populate_contract_section_json(contract, cfg.contract, false); + d.insert_or_assign("contract", contract); + } // Mesh configs. - jsoncons::ojson mesh_config; - mesh_config.insert_or_assign("port", cfg.mesh.port); - - jsoncons::ojson peers(jsoncons::json_array_arg); - for (const auto &peer : cfg.mesh.known_peers) { - const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port)); - peers.push_back(concat_str); + jsoncons::ojson mesh_config; + mesh_config.insert_or_assign("port", cfg.mesh.port); + + jsoncons::ojson peers(jsoncons::json_array_arg); + for (const auto &peer : cfg.mesh.known_peers) + { + const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port)); + peers.push_back(concat_str); + } + mesh_config.insert_or_assign("known_peers", peers); + mesh_config.insert_or_assign("msg_forwarding", cfg.mesh.msg_forwarding); + mesh_config.insert_or_assign("max_connections", cfg.mesh.max_connections); + mesh_config.insert_or_assign("max_known_connections", cfg.mesh.max_known_connections); + mesh_config.insert_or_assign("max_in_connections_per_host", cfg.mesh.max_in_connections_per_host); + mesh_config.insert_or_assign("max_bytes_per_msg", cfg.mesh.max_bytes_per_msg); + mesh_config.insert_or_assign("max_bytes_per_min", cfg.mesh.max_bytes_per_min); + mesh_config.insert_or_assign("max_bad_msgs_per_min", cfg.mesh.max_bad_msgs_per_min); + mesh_config.insert_or_assign("max_bad_msgsigs_per_min", cfg.mesh.max_bad_msgsigs_per_min); + mesh_config.insert_or_assign("max_dup_msgs_per_min", cfg.mesh.max_dup_msgs_per_min); + mesh_config.insert_or_assign("idle_timeout", cfg.mesh.idle_timeout); + + jsoncons::ojson peer_discovery_config; + peer_discovery_config.insert_or_assign("enabled", cfg.mesh.peer_discovery.enabled); + peer_discovery_config.insert_or_assign("interval", cfg.mesh.peer_discovery.interval); + + mesh_config.insert_or_assign("peer_discovery", peer_discovery_config); + d.insert_or_assign("mesh", mesh_config); } - mesh_config.insert_or_assign("known_peers", peers); - mesh_config.insert_or_assign("msg_forwarding", cfg.mesh.msg_forwarding); - mesh_config.insert_or_assign("max_connections", cfg.mesh.max_connections); - mesh_config.insert_or_assign("max_known_connections", cfg.mesh.max_known_connections); - mesh_config.insert_or_assign("max_bytes_per_msg", cfg.mesh.max_bytes_per_msg); - mesh_config.insert_or_assign("max_bytes_per_min", cfg.mesh.max_bytes_per_min); - mesh_config.insert_or_assign("max_bad_msgs_per_min", cfg.mesh.max_bad_msgs_per_min); - mesh_config.insert_or_assign("max_bad_msgsigs_per_min", cfg.mesh.max_bad_msgsigs_per_min); - mesh_config.insert_or_assign("max_dup_msgs_per_min", cfg.mesh.max_dup_msgs_per_min); - mesh_config.insert_or_assign("idle_timeout", cfg.mesh.idle_timeout); - - jsoncons::ojson peer_discovery_config; - peer_discovery_config.insert_or_assign("enabled", cfg.mesh.peer_discovery.enabled); - peer_discovery_config.insert_or_assign("interval", cfg.mesh.peer_discovery.interval); - - mesh_config.insert_or_assign("peer_discovery", peer_discovery_config); - d.insert_or_assign("mesh", mesh_config); // User configs. - jsoncons::ojson user_config; - user_config.insert_or_assign("port", cfg.user.port); - user_config.insert_or_assign("idle_timeout", cfg.user.idle_timeout); - user_config.insert_or_assign("max_bytes_per_msg", cfg.user.max_bytes_per_msg); - user_config.insert_or_assign("max_bytes_per_min", cfg.user.max_bytes_per_min); - user_config.insert_or_assign("max_bad_msgs_per_min", cfg.user.max_bad_msgs_per_min); - user_config.insert_or_assign("max_connections", cfg.user.max_connections); - user_config.insert_or_assign("enabled", cfg.user.enabled); - d.insert_or_assign("user", user_config); + { + jsoncons::ojson user_config; + user_config.insert_or_assign("port", cfg.user.port); + user_config.insert_or_assign("idle_timeout", cfg.user.idle_timeout); + user_config.insert_or_assign("max_bytes_per_msg", cfg.user.max_bytes_per_msg); + user_config.insert_or_assign("max_bytes_per_min", cfg.user.max_bytes_per_min); + user_config.insert_or_assign("max_bad_msgs_per_min", cfg.user.max_bad_msgs_per_min); + user_config.insert_or_assign("max_connections", cfg.user.max_connections); + user_config.insert_or_assign("max_in_connections_per_host", cfg.user.max_in_connections_per_host); + user_config.insert_or_assign("enabled", cfg.user.enabled); + d.insert_or_assign("user", user_config); + } // Log configs. - jsoncons::ojson log_config; - log_config.insert_or_assign("loglevel", cfg.log.loglevel); - - jsoncons::ojson loggers(jsoncons::json_array_arg); - for (std::string_view logger : cfg.log.loggers) { - loggers.push_back(logger); + jsoncons::ojson log_config; + log_config.insert_or_assign("loglevel", cfg.log.loglevel); + + jsoncons::ojson loggers(jsoncons::json_array_arg); + for (std::string_view logger : cfg.log.loggers) + { + loggers.push_back(logger); + } + log_config.insert_or_assign("loggers", loggers); + d.insert_or_assign("log", log_config); } - log_config.insert_or_assign("loggers", loggers); - d.insert_or_assign("log", log_config); return write_json_file(ctx.config_file, d); } diff --git a/src/conf.hpp b/src/conf.hpp index c784087e..d17d23a4 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -99,13 +99,14 @@ namespace conf struct user_config { - uint16_t port = 0; // Listening port for public user connections - uint16_t idle_timeout = 0; // Idle connection timeout for user connections in seconds. - uint64_t max_bytes_per_msg = 0; // User message max size in bytes - uint64_t max_bytes_per_min = 0; // User message rate (characters(bytes) per minute) - uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute - uint16_t max_connections = 0; // Max inbound user connections - bool enabled = true; // User connections enable/disable. + uint16_t port = 0; // Listening port for public user connections + uint16_t idle_timeout = 0; // Idle connection timeout for user connections in seconds. + uint64_t max_bytes_per_msg = 0; // User message max size in bytes + uint64_t max_bytes_per_min = 0; // User message rate (characters(bytes) per minute) + uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute + uint16_t max_connections = 0; // Max inbound user connections + uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP). + bool enabled = true; // User connections enable/disable. }; struct peer_discovery_config @@ -117,15 +118,16 @@ namespace conf struct mesh_config { uint16_t port = 0; // Listening port for peer connections - std::vector known_peers; // Vector of peers with ip_port, timestamp, capacity + std::vector known_peers; // Vector of peers with ip_port, timestamp, capacity. bool msg_forwarding = false; // Whether peer message forwarding is on/off. - uint16_t max_connections = 0; // Max peer connections - uint16_t max_known_connections = 0; // Max known peer connections - uint64_t max_bytes_per_msg = 0; // Peer message max size in bytes - uint64_t max_bytes_per_min = 0; // Peer message rate (characters(bytes) per minute) - uint64_t max_bad_msgs_per_min = 0; // Peer bad messages per minute - uint64_t max_bad_msgsigs_per_min = 0; // Peer bad signatures per minute - uint64_t max_dup_msgs_per_min = 0; // Peer max duplicate messages per minute + uint16_t max_connections = 0; // Max peer connections. + uint16_t max_known_connections = 0; // Max known peer connections. + uint16_t max_in_connections_per_host = 0; // Max inbound peer connections per remote host (IP). + uint64_t max_bytes_per_msg = 0; // Peer message max size in bytes. + uint64_t max_bytes_per_min = 0; // Peer message rate (characters(bytes) per minute). + uint64_t max_bad_msgs_per_min = 0; // Peer bad messages per minute. + uint64_t max_bad_msgsigs_per_min = 0; // Peer bad signatures per minute. + uint64_t max_dup_msgs_per_min = 0; // Peer max duplicate messages per minute. uint16_t idle_timeout = 0; // Idle connection timeout for peer connections in seconds. peer_discovery_config peer_discovery; // Peer discovery configs. }; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 32b774ff..96b97268 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -49,7 +49,8 @@ namespace p2p int start_peer_connections() { - ctx.server.emplace(conf::cfg.mesh.port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg, conf::cfg.mesh.known_peers); + ctx.server.emplace(conf::cfg.mesh.port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg, + conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, conf::cfg.mesh.known_peers); if (ctx.server->start() == -1) return -1; diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index 334584b2..5f62159a 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -3,6 +3,7 @@ #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../ledger.hpp" #include "../unl.hpp" +#include "../conf.hpp" #include "peer_comm_server.hpp" #include "peer_comm_session.hpp" #include "self_node.hpp" @@ -13,9 +14,10 @@ namespace p2p // Globally exposed weakly connected status variable. bool is_weakly_connected = false; - peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], - const uint64_t max_msg_size, std::vector &req_known_remotes) - : comm::comm_server("Peer", port, metric_thresholds, max_msg_size), + peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, + const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, + std::vector &req_known_remotes) + : comm::comm_server("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host), req_known_remotes(req_known_remotes) { } diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp index f98145aa..87b51434 100644 --- a/src/p2p/peer_comm_server.hpp +++ b/src/p2p/peer_comm_server.hpp @@ -31,8 +31,9 @@ namespace p2p std::atomic known_remote_count = 0; std::mutex req_known_remotes_mutex; std::vector &req_known_remotes; - peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], - const uint64_t max_msg_size, std::vector &req_known_remotes); + peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, + const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, + std::vector &req_known_remotes); }; } // namespace p2p diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index b97da187..e595054b 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -79,7 +79,8 @@ namespace usr */ int start_listening() { - ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg); + ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg, + conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host); if (ctx.server->start() == -1) return -1;