From 9d1163c8c381e7c47064edb9f0bb1b355cce8eb8 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Tue, 16 Feb 2021 14:27:03 +0530 Subject: [PATCH] Config flag to disable peer port listening. (#245) --- src/comm/comm_server.hpp | 7 +++++-- src/comm/comm_session.cpp | 4 ++-- src/conf.cpp | 20 ++++++++++++-------- src/conf.hpp | 3 ++- src/p2p/p2p.cpp | 3 ++- src/usr/usr.cpp | 9 ++++----- 6 files changed, 27 insertions(+), 19 deletions(-) diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index c8175d49..580e7f41 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -113,7 +113,10 @@ namespace comm void check_for_new_connection() { - std::variant accept_result = hpws_server.value().accept(true); + if (listen_port == 0) + return; + + std::variant accept_result = hpws_server->accept(true); if (std::holds_alternative(accept_result)) { @@ -226,7 +229,7 @@ namespace comm int start() { - if (start_hpws_server() == -1) + if (listen_port > 0 && start_hpws_server() == -1) return -1; watchdog_thread = std::thread(&comm_server::connection_watchdog, this); diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 3cbb91ea..f66cd516 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -83,7 +83,7 @@ namespace comm std::optional error = hpws_client->ack(data); if (error.has_value()) { - LOG_DEBUG << "hpws client ack failed:" << error.value().first << " " << error.value().second; + LOG_DEBUG << "hpws client ack failed:" << error->first << " " << error->second; should_disconnect = true; } } @@ -159,7 +159,7 @@ namespace comm std::optional error = hpws_client->write(message); if (error.has_value()) { - LOG_DEBUG << "hpws client write failed:" << error.value().first << " " << error.value().second; + LOG_DEBUG << "hpws client write failed:" << error->first << " " << error->second; return -1; } return 0; diff --git a/src/conf.cpp b/src/conf.cpp index 8fbfd001..646f9148 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -13,8 +13,8 @@ namespace conf // Global configuration struct exposed to the application. hp_config cfg; - // Stores the initial startup mode of the node. - ROLE startup_mode; + // Stores the initial startup role of the node. + ROLE startup_role; constexpr int FILE_PERMS = 0644; @@ -133,7 +133,7 @@ namespace conf cfg.hp_version = util::HP_VERSION; - cfg.node.role = ROLE::VALIDATOR; + cfg.node.role = startup_role = ROLE::VALIDATOR; cfg.node.full_history = false; cfg.contract.id = crypto::generate_uuid(); @@ -323,7 +323,7 @@ namespace conf std::cerr << "Invalid mode. 'observer' or 'validator' expected.\n"; return -1; } - startup_mode = cfg.node.role; + startup_role = cfg.node.role; } catch (const std::exception &e) { @@ -346,6 +346,8 @@ namespace conf { const jsoncons::ojson &mesh = d["mesh"]; cfg.mesh.port = mesh["port"].as(); + cfg.mesh.listen = mesh["listen"].as(); + // Storing peers in unordered map keyed by the concatenated address:port and also saving address and port // seperately to retrieve easily when handling peer connections. std::vector splitted_peers; @@ -406,6 +408,7 @@ namespace conf { const jsoncons::ojson &user = d["user"]; cfg.user.port = user["port"].as(); + cfg.user.listen = user["listen"].as(); cfg.user.max_connections = user["max_connections"].as(); cfg.user.max_in_connections_per_host = user["max_in_connections_per_host"].as(); cfg.user.max_bytes_per_msg = user["max_bytes_per_msg"].as(); @@ -413,7 +416,6 @@ namespace conf cfg.user.max_bad_msgs_per_min = user["max_bad_msgs_per_min"].as(); cfg.user.idle_timeout = user["idle_timeout"].as(); cfg.user.concurrent_read_reqeuests = user["concurrent_read_reqeuests"].as(); - cfg.user.enabled = user["enabled"].as(); } catch (const std::exception &e) { @@ -479,7 +481,8 @@ namespace conf jsoncons::ojson node_config; node_config.insert_or_assign("public_key", cfg.node.public_key_hex); node_config.insert_or_assign("private_key", cfg.node.private_key_hex); - node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR); + // We always save the startup role to config. Not the current role which might get changed dynamically during syncing. + node_config.insert_or_assign("role", startup_role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR); d.insert_or_assign("node", node_config); } @@ -494,6 +497,7 @@ namespace conf { jsoncons::ojson mesh_config; mesh_config.insert_or_assign("port", cfg.mesh.port); + mesh_config.insert_or_assign("listen", cfg.mesh.listen); jsoncons::ojson peers(jsoncons::json_array_arg); for (const auto &peer : cfg.mesh.known_peers) @@ -525,13 +529,13 @@ namespace conf { jsoncons::ojson user_config; user_config.insert_or_assign("port", cfg.user.port); + user_config.insert_or_assign("listen", cfg.user.listen); user_config.insert_or_assign("idle_timeout", cfg.user.idle_timeout); user_config.insert_or_assign("max_bytes_per_msg", cfg.user.max_bytes_per_msg); user_config.insert_or_assign("max_bytes_per_min", cfg.user.max_bytes_per_min); user_config.insert_or_assign("max_bad_msgs_per_min", cfg.user.max_bad_msgs_per_min); user_config.insert_or_assign("max_connections", cfg.user.max_connections); user_config.insert_or_assign("max_in_connections_per_host", cfg.user.max_in_connections_per_host); - user_config.insert_or_assign("enabled", cfg.user.enabled); user_config.insert_or_assign("concurrent_read_reqeuests", cfg.user.concurrent_read_reqeuests); d.insert_or_assign("user", user_config); } @@ -679,7 +683,7 @@ namespace conf void change_role(const ROLE role) { // Do not allow to change the mode if the node was started as an observer. - if (startup_mode == ROLE::OBSERVER || cfg.node.role == role) + if (startup_role == ROLE::OBSERVER || cfg.node.role == role) return; cfg.node.role = role; diff --git a/src/conf.hpp b/src/conf.hpp index 4047f4c8..89eb807b 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -117,6 +117,7 @@ namespace conf struct user_config { uint16_t port = 0; // Listening port for public user connections + bool listen = true; // Whether to listen for incoming user connections. uint16_t idle_timeout = 0; // Idle connection timeout for user connections in seconds. uint64_t max_bytes_per_msg = 0; // User message max size in bytes uint64_t max_bytes_per_min = 0; // User message rate (characters(bytes) per minute) @@ -124,7 +125,6 @@ namespace conf uint16_t max_connections = 0; // Max inbound user connections uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP). uint64_t concurrent_read_reqeuests = 10; // Supported concurrent read requests count. - bool enabled = true; // User connections enable/disable. }; struct peer_discovery_config @@ -136,6 +136,7 @@ namespace conf struct mesh_config { uint16_t port = 0; // Listening port for peer connections + bool listen = true; // Whether to listen for incoming peer connections. std::vector known_peers; // Vector of peers with ip_port, timestamp, capacity. bool msg_forwarding = false; // Whether peer message forwarding is on/off. uint16_t max_connections = 0; // Max peer connections. diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 96b97268..e5f6a147 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -49,7 +49,8 @@ namespace p2p int start_peer_connections() { - ctx.server.emplace(conf::cfg.mesh.port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg, + const uint16_t listen_port = conf::cfg.mesh.listen ? conf::cfg.mesh.port : 0; + ctx.server.emplace(listen_port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg, conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, conf::cfg.mesh.known_peers); if (ctx.server->start() == -1) return -1; diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 76bdbff6..482d952d 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -45,8 +45,8 @@ namespace usr if (input_store.init() == -1) return -1; - // Start listening for incoming user connections only if user connections config is enabled. - if (conf::cfg.user.enabled) + // Start listening for incoming user connections only if user connection listening is enabled. + if (conf::cfg.user.listen) { if (start_listening() == -1) return -1; @@ -68,10 +68,9 @@ namespace usr if (init_success) { // Stop com server only if user connections config is enabled (Otherwise server hasn't been started). - if (conf::cfg.user.enabled) - { + if (conf::cfg.user.listen) ctx.server->stop(); - } + input_store.deinit(); } }