Added configs to max in connections per remote host. (#229)

This commit is contained in:
Ravin Perera
2021-01-29 13:05:39 +05:30
committed by GitHub
parent 70493d827d
commit 3d36912c25
7 changed files with 104 additions and 78 deletions

View File

@@ -11,6 +11,7 @@
namespace comm
{
constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 4 * 1024 * 1024;
constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 99999;
template <typename T>
class comm_server
@@ -18,6 +19,8 @@ namespace comm
protected:
const uint64_t (&metric_thresholds)[5];
const uint64_t max_msg_size;
const uint64_t max_in_connections;
const uint64_t max_in_connections_per_host;
bool is_shutting_down = false;
std::list<T> sessions;
std::list<T> new_sessions; // Sessions that haven't been initialized properly which are yet to be merge to "sessions" list.
@@ -190,8 +193,8 @@ namespace comm
conf::ctx.hpws_exe_path,
max_msg_size,
listen_port,
512, // Max connections
2, // Max connections per IP.
max_in_connections,
max_in_connections_per_host,
conf::ctx.tls_cert_file,
conf::ctx.tls_key_file,
{},
@@ -210,11 +213,14 @@ namespace comm
}
public:
comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size)
comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host)
: name(name),
listen_port(port),
metric_thresholds(metric_thresholds),
max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE)
max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE),
max_in_connections(max_in_connections > 0 ? max_in_connections : DEFAULT_MAX_CONNECTIONS),
max_in_connections_per_host(max_in_connections_per_host > 0 ? max_in_connections_per_host : DEFAULT_MAX_CONNECTIONS)
{
}

View File

@@ -345,6 +345,7 @@ namespace conf
cfg.mesh.msg_forwarding = mesh["msg_forwarding"].as<bool>();
cfg.mesh.max_connections = mesh["max_connections"].as<uint16_t>();
cfg.mesh.max_known_connections = mesh["max_known_connections"].as<uint16_t>();
cfg.mesh.max_in_connections_per_host = mesh["max_in_connections_per_host"].as<uint16_t>();
// If max_connections is greater than max_known_connections then show error and stop execution.
if (cfg.mesh.max_known_connections > cfg.mesh.max_connections)
{
@@ -383,7 +384,8 @@ namespace conf
{
const jsoncons::ojson &user = d["user"];
cfg.user.port = user["port"].as<uint16_t>();
cfg.user.max_connections = user["max_connections"].as<unsigned int>();
cfg.user.max_connections = user["max_connections"].as<uint64_t>();
cfg.user.max_in_connections_per_host = user["max_in_connections_per_host"].as<uint64_t>();
cfg.user.max_bytes_per_msg = user["max_bytes_per_msg"].as<uint64_t>();
cfg.user.max_bytes_per_min = user["max_bytes_per_min"].as<uint64_t>();
cfg.user.max_bad_msgs_per_min = user["max_bad_msgs_per_min"].as<uint64_t>();
@@ -429,69 +431,80 @@ namespace conf
jsoncons::ojson d;
d.insert_or_assign("hp_version", cfg.hp_version);
// Node configs.
jsoncons::ojson node_config;
node_config.insert_or_assign("public_key", cfg.node.public_key_hex);
node_config.insert_or_assign("private_key", cfg.node.private_key_hex);
node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR);
// node_config.insert_or_assign("full_history", cfg.node.full_history);
d.insert_or_assign("node", node_config);
// Node config.
{
jsoncons::ojson node_config;
node_config.insert_or_assign("public_key", cfg.node.public_key_hex);
node_config.insert_or_assign("private_key", cfg.node.private_key_hex);
node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR);
d.insert_or_assign("node", node_config);
}
// Contract config section.
jsoncons::ojson contract;
populate_contract_section_json(contract, cfg.contract, false);
d.insert_or_assign("contract", contract);
{
jsoncons::ojson contract;
populate_contract_section_json(contract, cfg.contract, false);
d.insert_or_assign("contract", contract);
}
// Mesh configs.
jsoncons::ojson mesh_config;
mesh_config.insert_or_assign("port", cfg.mesh.port);
jsoncons::ojson peers(jsoncons::json_array_arg);
for (const auto &peer : cfg.mesh.known_peers)
{
const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port));
peers.push_back(concat_str);
jsoncons::ojson mesh_config;
mesh_config.insert_or_assign("port", cfg.mesh.port);
jsoncons::ojson peers(jsoncons::json_array_arg);
for (const auto &peer : cfg.mesh.known_peers)
{
const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port));
peers.push_back(concat_str);
}
mesh_config.insert_or_assign("known_peers", peers);
mesh_config.insert_or_assign("msg_forwarding", cfg.mesh.msg_forwarding);
mesh_config.insert_or_assign("max_connections", cfg.mesh.max_connections);
mesh_config.insert_or_assign("max_known_connections", cfg.mesh.max_known_connections);
mesh_config.insert_or_assign("max_in_connections_per_host", cfg.mesh.max_in_connections_per_host);
mesh_config.insert_or_assign("max_bytes_per_msg", cfg.mesh.max_bytes_per_msg);
mesh_config.insert_or_assign("max_bytes_per_min", cfg.mesh.max_bytes_per_min);
mesh_config.insert_or_assign("max_bad_msgs_per_min", cfg.mesh.max_bad_msgs_per_min);
mesh_config.insert_or_assign("max_bad_msgsigs_per_min", cfg.mesh.max_bad_msgsigs_per_min);
mesh_config.insert_or_assign("max_dup_msgs_per_min", cfg.mesh.max_dup_msgs_per_min);
mesh_config.insert_or_assign("idle_timeout", cfg.mesh.idle_timeout);
jsoncons::ojson peer_discovery_config;
peer_discovery_config.insert_or_assign("enabled", cfg.mesh.peer_discovery.enabled);
peer_discovery_config.insert_or_assign("interval", cfg.mesh.peer_discovery.interval);
mesh_config.insert_or_assign("peer_discovery", peer_discovery_config);
d.insert_or_assign("mesh", mesh_config);
}
mesh_config.insert_or_assign("known_peers", peers);
mesh_config.insert_or_assign("msg_forwarding", cfg.mesh.msg_forwarding);
mesh_config.insert_or_assign("max_connections", cfg.mesh.max_connections);
mesh_config.insert_or_assign("max_known_connections", cfg.mesh.max_known_connections);
mesh_config.insert_or_assign("max_bytes_per_msg", cfg.mesh.max_bytes_per_msg);
mesh_config.insert_or_assign("max_bytes_per_min", cfg.mesh.max_bytes_per_min);
mesh_config.insert_or_assign("max_bad_msgs_per_min", cfg.mesh.max_bad_msgs_per_min);
mesh_config.insert_or_assign("max_bad_msgsigs_per_min", cfg.mesh.max_bad_msgsigs_per_min);
mesh_config.insert_or_assign("max_dup_msgs_per_min", cfg.mesh.max_dup_msgs_per_min);
mesh_config.insert_or_assign("idle_timeout", cfg.mesh.idle_timeout);
jsoncons::ojson peer_discovery_config;
peer_discovery_config.insert_or_assign("enabled", cfg.mesh.peer_discovery.enabled);
peer_discovery_config.insert_or_assign("interval", cfg.mesh.peer_discovery.interval);
mesh_config.insert_or_assign("peer_discovery", peer_discovery_config);
d.insert_or_assign("mesh", mesh_config);
// User configs.
jsoncons::ojson user_config;
user_config.insert_or_assign("port", cfg.user.port);
user_config.insert_or_assign("idle_timeout", cfg.user.idle_timeout);
user_config.insert_or_assign("max_bytes_per_msg", cfg.user.max_bytes_per_msg);
user_config.insert_or_assign("max_bytes_per_min", cfg.user.max_bytes_per_min);
user_config.insert_or_assign("max_bad_msgs_per_min", cfg.user.max_bad_msgs_per_min);
user_config.insert_or_assign("max_connections", cfg.user.max_connections);
user_config.insert_or_assign("enabled", cfg.user.enabled);
d.insert_or_assign("user", user_config);
{
jsoncons::ojson user_config;
user_config.insert_or_assign("port", cfg.user.port);
user_config.insert_or_assign("idle_timeout", cfg.user.idle_timeout);
user_config.insert_or_assign("max_bytes_per_msg", cfg.user.max_bytes_per_msg);
user_config.insert_or_assign("max_bytes_per_min", cfg.user.max_bytes_per_min);
user_config.insert_or_assign("max_bad_msgs_per_min", cfg.user.max_bad_msgs_per_min);
user_config.insert_or_assign("max_connections", cfg.user.max_connections);
user_config.insert_or_assign("max_in_connections_per_host", cfg.user.max_in_connections_per_host);
user_config.insert_or_assign("enabled", cfg.user.enabled);
d.insert_or_assign("user", user_config);
}
// Log configs.
jsoncons::ojson log_config;
log_config.insert_or_assign("loglevel", cfg.log.loglevel);
jsoncons::ojson loggers(jsoncons::json_array_arg);
for (std::string_view logger : cfg.log.loggers)
{
loggers.push_back(logger);
jsoncons::ojson log_config;
log_config.insert_or_assign("loglevel", cfg.log.loglevel);
jsoncons::ojson loggers(jsoncons::json_array_arg);
for (std::string_view logger : cfg.log.loggers)
{
loggers.push_back(logger);
}
log_config.insert_or_assign("loggers", loggers);
d.insert_or_assign("log", log_config);
}
log_config.insert_or_assign("loggers", loggers);
d.insert_or_assign("log", log_config);
return write_json_file(ctx.config_file, d);
}

View File

@@ -99,13 +99,14 @@ namespace conf
struct user_config
{
uint16_t port = 0; // Listening port for public user connections
uint16_t idle_timeout = 0; // Idle connection timeout for user connections in seconds.
uint64_t max_bytes_per_msg = 0; // User message max size in bytes
uint64_t max_bytes_per_min = 0; // User message rate (characters(bytes) per minute)
uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute
uint16_t max_connections = 0; // Max inbound user connections
bool enabled = true; // User connections enable/disable.
uint16_t port = 0; // Listening port for public user connections
uint16_t idle_timeout = 0; // Idle connection timeout for user connections in seconds.
uint64_t max_bytes_per_msg = 0; // User message max size in bytes
uint64_t max_bytes_per_min = 0; // User message rate (characters(bytes) per minute)
uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute
uint16_t max_connections = 0; // Max inbound user connections
uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP).
bool enabled = true; // User connections enable/disable.
};
struct peer_discovery_config
@@ -117,15 +118,16 @@ namespace conf
struct mesh_config
{
uint16_t port = 0; // Listening port for peer connections
std::vector<peer_properties> known_peers; // Vector of peers with ip_port, timestamp, capacity
std::vector<peer_properties> known_peers; // Vector of peers with ip_port, timestamp, capacity.
bool msg_forwarding = false; // Whether peer message forwarding is on/off.
uint16_t max_connections = 0; // Max peer connections
uint16_t max_known_connections = 0; // Max known peer connections
uint64_t max_bytes_per_msg = 0; // Peer message max size in bytes
uint64_t max_bytes_per_min = 0; // Peer message rate (characters(bytes) per minute)
uint64_t max_bad_msgs_per_min = 0; // Peer bad messages per minute
uint64_t max_bad_msgsigs_per_min = 0; // Peer bad signatures per minute
uint64_t max_dup_msgs_per_min = 0; // Peer max duplicate messages per minute
uint16_t max_connections = 0; // Max peer connections.
uint16_t max_known_connections = 0; // Max known peer connections.
uint16_t max_in_connections_per_host = 0; // Max inbound peer connections per remote host (IP).
uint64_t max_bytes_per_msg = 0; // Peer message max size in bytes.
uint64_t max_bytes_per_min = 0; // Peer message rate (characters(bytes) per minute).
uint64_t max_bad_msgs_per_min = 0; // Peer bad messages per minute.
uint64_t max_bad_msgsigs_per_min = 0; // Peer bad signatures per minute.
uint64_t max_dup_msgs_per_min = 0; // Peer max duplicate messages per minute.
uint16_t idle_timeout = 0; // Idle connection timeout for peer connections in seconds.
peer_discovery_config peer_discovery; // Peer discovery configs.
};

View File

@@ -49,7 +49,8 @@ namespace p2p
int start_peer_connections()
{
ctx.server.emplace(conf::cfg.mesh.port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg, conf::cfg.mesh.known_peers);
ctx.server.emplace(conf::cfg.mesh.port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg,
conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, conf::cfg.mesh.known_peers);
if (ctx.server->start() == -1)
return -1;

View File

@@ -3,6 +3,7 @@
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../ledger.hpp"
#include "../unl.hpp"
#include "../conf.hpp"
#include "peer_comm_server.hpp"
#include "peer_comm_session.hpp"
#include "self_node.hpp"
@@ -13,9 +14,10 @@ namespace p2p
// Globally exposed weakly connected status variable.
bool is_weakly_connected = false;
peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5],
const uint64_t max_msg_size, std::vector<conf::peer_properties> &req_known_remotes)
: comm::comm_server<peer_comm_session>("Peer", port, metric_thresholds, max_msg_size),
peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host,
std::vector<conf::peer_properties> &req_known_remotes)
: comm::comm_server<peer_comm_session>("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host),
req_known_remotes(req_known_remotes)
{
}

View File

@@ -31,8 +31,9 @@ namespace p2p
std::atomic<uint16_t> known_remote_count = 0;
std::mutex req_known_remotes_mutex;
std::vector<conf::peer_properties> &req_known_remotes;
peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5],
const uint64_t max_msg_size, std::vector<conf::peer_properties> &req_known_remotes);
peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host,
std::vector<conf::peer_properties> &req_known_remotes);
};
} // namespace p2p

View File

@@ -79,7 +79,8 @@ namespace usr
*/
int start_listening()
{
ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg);
ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg,
conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host);
if (ctx.server->start() == -1)
return -1;