Large cluster optimizations. (#348)

* Added sync log to streamer.
* Fixed ledger closing attempt while syncing.
* Added diagnostic contract.
* Reset to stage 0 on unreliable votes.
* Reduced peer msg age threshold.
* Added health tracking.
* Weakly-connected detection improvement.
* Increased version 0.5.1.
* Improved client lib server version check.
* Added health logging support to text client.
* Added weakly connected status in status response.
* Increased max peers limits when serializing.
* Local docker cluster manual ip.
* Updated vultr script vm region order.
* Sync status reporting improvement.
* Added milliseconds to logging.
This commit is contained in:
Ravin Perera
2021-09-17 11:53:49 +05:30
committed by GitHub
parent c686745c81
commit 6dc0776b56
32 changed files with 720 additions and 86 deletions

View File

@@ -256,6 +256,12 @@ namespace conf
size_t max_file_count = 0; // Max no. of log files to keep.
};
struct health_config
{
bool proposal_stats = false;
bool connectivity_stats = false;
};
// Holds all the config values.
struct hp_config
{
@@ -265,6 +271,7 @@ namespace conf
user_config user;
hpfs_config hpfs;
log_config log;
health_config health; // For debugging only. Not included in the config file.
};
// Global contract context struct exposed to the application.

View File

@@ -92,6 +92,9 @@ namespace consensus
break;
}
if (ctx.stage == 0)
status::emit_proposal_health();
if (consensus() == -1)
{
LOG_ERROR << "Consensus thread exited due to an error.";
@@ -119,7 +122,7 @@ namespace consensus
revise_candidate_proposals(ctx.vote_status == VOTES_SYNCED);
// Attempt to close the ledger after scanning last round stage 3 proposals.
if (ctx.stage == 0)
if (ctx.stage == 0 && ctx.vote_status == VOTES_SYNCED)
attempt_ledger_close();
// Get current lcl, state, patch, primary shard and raw shard info.
@@ -156,7 +159,7 @@ namespace consensus
const size_t unl_count = unl::count();
vote_counter votes;
// Check whether we are in sync with other nodes using proposals.
// Check whether we are in sync with other nodes using the proposals we received.
{
int new_sync_status = check_sync_status(unl_count, votes, lcl_id);
@@ -174,7 +177,7 @@ namespace consensus
}
// Update the node's status if we went from in-sync to not-in-sync. We will report back as being in-sync only when ledger is created.
if (ctx.vote_status == VOTES_SYNCED && new_sync_status != VOTES_SYNCED)
if (new_sync_status == VOTES_DESYNC)
status::sync_status_changed(false);
// This marks entering into a new sync cycle.
@@ -195,9 +198,10 @@ namespace consensus
if (ctx.vote_status == VOTES_UNRELIABLE)
{
ctx.stage = 0;
ctx.unreliable_votes_attempts++;
// If we get too many consecative unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes
// If we get too many consecutive unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes
// are caused because our roundtime config information is different from other nodes.
if (ctx.unreliable_votes_attempts >= MAX_UNRELIABLE_VOTES_ATTEMPTS)
{
@@ -208,21 +212,21 @@ namespace consensus
else
{
ctx.unreliable_votes_attempts = 0;
}
if (ctx.vote_status == VOTES_SYNCED)
{
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
broadcast_proposal(p);
// This marks the moment we finish a sync cycle. We are in stage 1 and we detect that our votes are in sync.
if (ctx.stage == 1 && ctx.sync_ongoing)
if (ctx.vote_status == VOTES_SYNCED)
{
// Clear any sync recovery pending state if we enter stage 1 while being in sync.
ctx.sync_ongoing = false;
status::sync_status_changed(true);
LOG_DEBUG << "Sync recovery completed.";
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
broadcast_proposal(p);
// This marks the moment we finish a sync cycle. We are in stage 1 and we just detected that our votes are in sync.
if (ctx.stage == 1 && ctx.sync_ongoing)
{
// Clear any sync recovery pending state if we enter stage 1 while being in sync.
ctx.sync_ongoing = false;
status::sync_status_changed(true);
LOG_DEBUG << "Sync recovery completed.";
}
}
}
@@ -447,7 +451,7 @@ namespace consensus
/**
* Moves proposals collected from the network into candidate proposals and
* cleans up any outdated proposals from the candidate set.
* @param in_sync Whether the node is currently on sync or not. We relax the pruning criteria if we are not in sync.
* @param in_sync Whether the node is currently in sync or not. We relax the pruning criteria if we are not in sync.
*/
void revise_candidate_proposals(const bool in_sync)
{
@@ -459,6 +463,8 @@ namespace consensus
collected_proposals.splice(collected_proposals.end(), p2p::ctx.collected_msgs.proposals);
}
status::report_proposal_batch(collected_proposals);
// Prune incoming proposals if they are older than existing proposal from same node.
{
auto itr = collected_proposals.begin();

View File

@@ -43,15 +43,13 @@ namespace hplog
plog::util::localtime_s(&t, &record.getTime().time); // local time
plog::util::nostringstream ss;
ss << t.tm_year + 1900 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mon + 1 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mday << PLOG_NSTR(" ");
ss << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_hour << PLOG_NSTR(":")
<< std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_min << PLOG_NSTR(":")
<< std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_sec << PLOG_NSTR(" ");
// Uncomment for millseconds.
// << std::setfill(PLOG_NSTR('0')) << std::setw(3) << record.getTime().millitm << PLOG_NSTR(" ");
ss << PLOG_NSTR("[") << severity_to_string(record.getSeverity()) << PLOG_NSTR("][hpc] ");
ss << record.getMessage() << PLOG_NSTR("\n");
ss << t.tm_year + 1900 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mon + 1 << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_mday
<< PLOG_NSTR(" ") << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_hour
<< PLOG_NSTR(":") << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_min
<< PLOG_NSTR(":") << std::setfill(PLOG_NSTR('0')) << std::setw(2) << t.tm_sec
<< PLOG_NSTR(".") << std::setfill(PLOG_NSTR('0')) << std::setw(3) << record.getTime().millitm // Uncomment for millseconds.
<< PLOG_NSTR(" ") << PLOG_NSTR("[") << severity_to_string(record.getSeverity())
<< PLOG_NSTR("][hpc] ") << record.getMessage() << PLOG_NSTR("\n");
return ss.str();
}

View File

@@ -24,6 +24,7 @@ namespace msg::usrmsg::bson
* "contract_execution_enabled": true | false,
* "read_requests_enabled": true | false,
* "is_full_history_node": true | false,
* "weakly_connected": true | false,
* "current_unl": [ <ed prefixed pubkey>, ... ],
* "peers": [ "ip:port", ... ]
* }
@@ -33,6 +34,7 @@ namespace msg::usrmsg::bson
const util::sequence_hash lcl_id = status::get_lcl_id();
const std::set<std::string> unl = status::get_unl();
const bool in_sync = status::is_in_sync();
const bool weakly_connected = status::get_weakly_connected();
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
@@ -54,6 +56,8 @@ namespace msg::usrmsg::bson
encoder.bool_value(conf::cfg.user.concurrent_read_requests != 0);
encoder.key(msg::usrmsg::FLD_IS_FULL_HISTORY_NODE);
encoder.bool_value(conf::cfg.node.history == conf::HISTORY::FULL);
encoder.key(msg::usrmsg::FLD_WEAKLY_CONNECTED);
encoder.bool_value(weakly_connected);
encoder.key(msg::usrmsg::FLD_CURRENT_UNL);
encoder.begin_array();
@@ -315,6 +319,70 @@ namespace msg::usrmsg::bson
encoder.flush();
}
/**
* Constructs health stat message.
* @param msg Buffer to construct the generated bson message into.
* Message format:
* {
* "type": "health_event",
* "proposals": {
* "comm_latency": {min:0, max:0, avg:0},
* "read_latency": {min:0, max:0, avg:0}
* "batch_size": 0
* },
* "peer_count": 0,
* "weakly_connected": true | false
* }
* @param ev Current health information.
*/
void create_health_notification(std::vector<uint8_t> &msg, const status::health_event &ev)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_HEALTH_EVENT);
encoder.key(msg::usrmsg::FLD_EVENT);
if (ev.index() == 0)
{
const status::proposal_health &phealth = std::get<status::proposal_health>(ev);
encoder.string_value(msg::usrmsg::HEALTH_EVENT_PROPOSAL);
encoder.key(msg::usrmsg::FLD_COMM_LATENCY);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_MIN);
encoder.uint64_value(phealth.comm_latency_min);
encoder.key(msg::usrmsg::FLD_MAX);
encoder.uint64_value(phealth.comm_latency_max);
encoder.key(msg::usrmsg::FLD_AVG);
encoder.uint64_value(phealth.comm_latency_avg);
encoder.end_object();
encoder.key(msg::usrmsg::FLD_READ_LATENCY);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_MIN);
encoder.uint64_value(phealth.read_latency_min);
encoder.key(msg::usrmsg::FLD_MAX);
encoder.uint64_value(phealth.read_latency_max);
encoder.key(msg::usrmsg::FLD_AVG);
encoder.uint64_value(phealth.read_latency_avg);
encoder.end_object();
encoder.key(msg::usrmsg::FLD_BATCH_SIZE);
encoder.uint64_value(phealth.batch_size);
}
else if (ev.index() == 1)
{
const status::connectivity_health &conn = std::get<status::connectivity_health>(ev);
encoder.string_value(msg::usrmsg::HEALTH_EVENT_CONNECTIVITY);
encoder.key(msg::usrmsg::FLD_PEER_COUNT);
encoder.uint64_value(conn.peer_count);
encoder.key(msg::usrmsg::FLD_WEAKLY_CONNECTED);
encoder.bool_value(conn.is_weakly_connected);
}
encoder.end_object();
encoder.flush();
}
/**
* Constructs a ledger query response.
* @param msg Buffer to construct the generated bson message string into.
@@ -533,6 +601,11 @@ namespace msg::usrmsg::bson
{
channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE;
}
else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_HEALTH_EVENT &&
(conf::cfg.health.proposal_stats || conf::cfg.health.connectivity_stats))
{
channel = usr::NOTIFICATION_CHANNEL::HEALTH_STAT;
}
else
{
LOG_DEBUG << "User subscription request invalid channel.";

View File

@@ -5,6 +5,7 @@
#include "../../util/merkle_hash_tree.hpp"
#include "../../ledger/ledger_query.hpp"
#include "../../usr/user_common.hpp"
#include "../../status.hpp"
namespace msg::usrmsg::bson
{
@@ -28,6 +29,8 @@ namespace msg::usrmsg::bson
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync);
void create_health_notification(std::vector<uint8_t> &msg, const status::health_event &ev);
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result);

View File

@@ -44,7 +44,7 @@ namespace msg::fbuf::p2pmsg
if (session && session->challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED && message.size() <= MAX_SIZE_FOR_TIME_CHECK)
{
const uint64_t time_now = util::get_epoch_milliseconds();
if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 4)))
if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 3)))
{
LOG_DEBUG << "Peer message is too old. type:" << p2p_msg->content_type() << " from:" << (session ? session->display_name() : "");
return p2p::peer_message_info{NULL, P2PMsgContent_NONE, 0};

View File

@@ -143,6 +143,7 @@ namespace msg::usrmsg::json
* "contract_execution_enabled": true | false,
* "read_requests_enabled": true | false,
* "is_full_history_node": true | false,
* "weakly_connected": true | false,
* "current_unl": [ "<ed prefixed pubkey hex>"", ... ],
* "peers": [ "ip:port", ... ]
* }
@@ -152,6 +153,7 @@ namespace msg::usrmsg::json
const util::sequence_hash lcl_id = status::get_lcl_id();
const std::set<std::string> unl = status::get_unl();
const bool in_sync = status::is_in_sync();
const bool weakly_connected = status::get_weakly_connected();
msg.reserve(1024);
msg += "{\"";
@@ -191,6 +193,11 @@ namespace msg::usrmsg::json
msg += SEP_COLON_NOQUOTE;
msg += conf::cfg.node.history == conf::HISTORY::FULL ? STR_TRUE : STR_FALSE;
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_WEAKLY_CONNECTED;
msg += SEP_COLON_NOQUOTE;
msg += weakly_connected ? STR_TRUE : STR_FALSE;
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_CURRENT_UNL;
msg += SEP_COLON_NOQUOTE;
msg += OPEN_SQR_BRACKET;
@@ -546,6 +553,97 @@ namespace msg::usrmsg::json
msg += "}";
}
/**
* Constructs health stat message.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "health_event",
* "event": "proposal" | "connectivity",
*
* // proposal
* "comm_latency": {min:0, max:0, avg:0},
* "read_latency": {min:0, max:0, avg:0}
* "batch_size": 0
*
* // connectivity
* "peer_count": 0,
* "weakly_connected": true | false
* }
* @param ev Current health information.
*/
void create_health_notification(std::vector<uint8_t> &msg, const status::health_event &ev)
{
msg.reserve(128);
msg += "{\"";
msg += msg::usrmsg::FLD_TYPE;
msg += SEP_COLON;
msg += msg::usrmsg::MSGTYPE_HEALTH_EVENT;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_EVENT;
msg += SEP_COLON;
if (ev.index() == 0)
{
const status::proposal_health &phealth = std::get<status::proposal_health>(ev);
msg += msg::usrmsg::HEALTH_EVENT_PROPOSAL;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_COMM_LATENCY;
msg += SEP_COLON_NOQUOTE;
msg += "{\"";
msg += msg::usrmsg::FLD_MIN;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.comm_latency_min);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_MAX;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.comm_latency_max);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_AVG;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.comm_latency_avg);
msg += "}";
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_READ_LATENCY;
msg += SEP_COLON_NOQUOTE;
msg += "{\"";
msg += msg::usrmsg::FLD_MIN;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.read_latency_min);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_MAX;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.read_latency_max);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_AVG;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.read_latency_avg);
msg += "}";
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_BATCH_SIZE;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(phealth.batch_size);
}
else if (ev.index() == 1)
{
const status::connectivity_health &conn = std::get<status::connectivity_health>(ev);
msg += msg::usrmsg::HEALTH_EVENT_CONNECTIVITY;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_PEER_COUNT;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(conn.peer_count);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_WEAKLY_CONNECTED;
msg += SEP_COLON_NOQUOTE;
msg += conn.is_weakly_connected ? STR_TRUE : STR_FALSE;
}
msg += "}";
}
/**
* Constructs a ledger query response.
* @param msg Buffer to construct the generated json message string into.
@@ -892,6 +990,11 @@ namespace msg::usrmsg::json
{
channel = usr::NOTIFICATION_CHANNEL::UNL_CHANGE;
}
else if (d[msg::usrmsg::FLD_CHANNEL] == msg::usrmsg::MSGTYPE_HEALTH_EVENT &&
(conf::cfg.health.proposal_stats || conf::cfg.health.connectivity_stats))
{
channel = usr::NOTIFICATION_CHANNEL::HEALTH_STAT;
}
else
{
LOG_DEBUG << "User subscription request invalid channel.";

View File

@@ -5,6 +5,7 @@
#include "../../util/merkle_hash_tree.hpp"
#include "../../ledger/ledger_query.hpp"
#include "../../usr/user_common.hpp"
#include "../../status.hpp"
namespace msg::usrmsg::json
{
@@ -32,6 +33,8 @@ namespace msg::usrmsg::json
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync);
void create_health_notification(std::vector<uint8_t> &msg, const status::health_event &ev);
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result);

View File

@@ -9,7 +9,7 @@ namespace msg::usrmsg
constexpr size_t CHALLENGE_LEN = 16;
// Max no. of known peers to return in get status.
constexpr const size_t MAX_KNOWN_PEERS_INFO = 16;
constexpr const size_t MAX_KNOWN_PEERS_INFO = 100;
// Message field names
constexpr const char *FLD_HP_VERSION = "hp_version";
@@ -65,6 +65,14 @@ namespace msg::usrmsg
constexpr const char *FLD_LEDGER = "ledger";
constexpr const char *FLD_CHANNEL = "channel";
constexpr const char *FLD_ENABLED = "enabled";
constexpr const char *FLD_COMM_LATENCY = "comm_latency";
constexpr const char *FLD_READ_LATENCY = "read_latency";
constexpr const char *FLD_BATCH_SIZE = "batch_size";
constexpr const char *FLD_MIN = "min";
constexpr const char *FLD_MAX = "max";
constexpr const char *FLD_AVG = "avg";
constexpr const char *FLD_PEER_COUNT = "peer_count";
constexpr const char *FLD_WEAKLY_CONNECTED = "weakly_connected";
// Message types
constexpr const char *MSGTYPE_USER_CHALLENGE = "user_challenge";
@@ -81,6 +89,7 @@ namespace msg::usrmsg
constexpr const char *MSGTYPE_LCL_RESPONSE = "lcl_response";
constexpr const char *MSGTYPE_UNL_CHANGE = "unl_change";
constexpr const char *MSGTYPE_LEDGER_EVENT = "ledger_event";
constexpr const char *MSGTYPE_HEALTH_EVENT = "health_event";
constexpr const char *MSGTYPE_LEDGER_QUERY = "ledger_query";
constexpr const char *MSGTYPE_LEDGER_QUERY_RESULT = "ledger_query_result";
constexpr const char *MSGTYPE_SUBSCRIPTION = "subscription";
@@ -103,7 +112,8 @@ namespace msg::usrmsg
constexpr const char *STR_FALSE = "false";
constexpr const char *LEDGER_EVENT_LEDGER_CREATED = "ledger_created";
constexpr const char *LEDGER_EVENT_SYNC_STATUS = "sync_status";
constexpr const char *HEALTH_EVENT_PROPOSAL = "proposal";
constexpr const char *HEALTH_EVENT_CONNECTIVITY = "connectivity";
} // namespace msg::usrmsg

View File

@@ -80,6 +80,14 @@ namespace msg::usrmsg
busrmsg::create_sync_status_notification(msg, in_sync);
}
void usrmsg_parser::create_health_notification(std::vector<uint8_t> &msg, const status::health_event &ev) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_health_notification(msg, ev);
else
busrmsg::create_health_notification(msg, ev);
}
void usrmsg_parser::create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result) const
{

View File

@@ -6,6 +6,7 @@
#include "../util/merkle_hash_tree.hpp"
#include "../ledger/ledger_query.hpp"
#include "../usr/user_common.hpp"
#include "../status.hpp"
namespace msg::usrmsg
{
@@ -37,6 +38,8 @@ namespace msg::usrmsg
void create_sync_status_notification(std::vector<uint8_t> &msg, const bool in_sync) const;
void create_health_notification(std::vector<uint8_t> &msg, const status::health_event &ev) const;
void create_ledger_query_response(std::vector<uint8_t> &msg, std::string_view reply_for,
const ledger::query::query_result &result) const;

View File

@@ -14,7 +14,7 @@
namespace p2pmsg = msg::fbuf::p2pmsg;
// Maximum no. of peers that will be persisted back to config upon exit.
constexpr size_t MAX_PERSISTED_KNOWN_PEERS = 50;
constexpr size_t MAX_PERSISTED_KNOWN_PEERS = 100;
namespace p2p
{
@@ -409,8 +409,16 @@ namespace p2p
*/
void send_known_peer_list(peer_comm_session *session)
{
const std::vector<peer_properties> &peers = ctx.server->req_known_remotes;
// Add self to known peer announcement (indicated as blank host address).
// peers.push_back(peer_properties{
// conf::peer_ip_port{"", conf::cfg.mesh.port},
// status::get_available_mesh_capacity(),
// util::get_epoch_milliseconds()});
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_msg_from_peer_list_response(fbuf, ctx.server->req_known_remotes, session->known_ipport);
p2pmsg::create_msg_from_peer_list_response(fbuf, peers, session->known_ipport);
session->send(msg::fbuf::builder_to_string_view(fbuf));
}
@@ -432,7 +440,7 @@ namespace p2p
itr->available_capacity = available_capacity;
itr->timestamp = timestamp;
// Sorting the known remote list according to the weight value after updating the peer properties.
// Sorting the known remote list according to the weight value after updating the peer properties.
sort_known_remotes();
}
}
@@ -452,13 +460,21 @@ namespace p2p
/**
* Merging the response peer list with the own known peer list.
* @param peers Incoming peer list.
* @param from The session that sent us the peer list.
*/
void merge_peer_list(const std::vector<peer_properties> &peers)
void merge_peer_list(const std::vector<peer_properties> &peers, const p2p::peer_comm_session &from)
{
std::scoped_lock<std::mutex> lock(ctx.server->req_known_remotes_mutex);
for (const peer_properties &peer : peers)
{
// If the peer address is indicated as empty, that is the entry for the peer who sent us this.
// We then fill that up with the host address we see for that peer.
// if (peer.ip_port.host_address.empty())
// {
// peer.ip_port.host_address = from.host_address;
// }
// If the peer is self, we won't add to the known peer list.
if (self::ip_port.has_value() && self::ip_port == peer.ip_port)
{
@@ -519,7 +535,7 @@ namespace p2p
* Calculate and retunrns the available capacity.
* @returns -1 if available capacity is unlimited otherwise available value.
*/
int16_t get_available_capacity()
int16_t calculate_available_capacity()
{
// If both max_connections and max_known_connections are configured calculate the capacity.
if (conf::cfg.mesh.max_connections != 0 && conf::cfg.mesh.max_known_connections != 0)

View File

@@ -237,11 +237,11 @@ namespace p2p
void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t &timestamp);
void merge_peer_list(const std::vector<peer_properties> &peers);
void merge_peer_list(const std::vector<peer_properties> &peers, const p2p::peer_comm_session &from);
void sort_known_remotes();
int16_t get_available_capacity();
int16_t calculate_available_capacity();
void update_unl_connections();

View File

@@ -11,8 +11,6 @@
namespace p2p
{
constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7;
// Globally exposed weakly connected status variable.
bool is_weakly_connected = false;
peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host,
@@ -45,24 +43,31 @@ namespace p2p
uint16_t peer_managing_counter = 0;
uint16_t known_connections_counter = 0;
uint16_t available_capacity_counter = 0;
while (!is_shutting_down)
{
peer_managing_counter++;
known_connections_counter++;
available_capacity_counter++;
if (known_connections_counter % 20 == 0)
if (known_connections_counter % 40 == 0)
{
maintain_known_connections();
known_connections_counter = 0;
}
// Send available peer capacity if peer max connections is configured.
if (conf::cfg.mesh.max_connections != 0)
p2p::send_available_capacity_announcement(p2p::get_available_capacity());
if (available_capacity_counter % 300 == 0)
{
status::set_available_mesh_capacity(p2p::calculate_available_capacity());
// Send available peer capacity if peer max connections is configured.
if (conf::cfg.mesh.max_connections != 0)
p2p::send_available_capacity_announcement(status::get_available_mesh_capacity());
}
// Start peer list request loop if dynamic peer discovery is enabled.
if (conf::cfg.mesh.peer_discovery.enabled && known_remote_count > 0)
if (conf::cfg.mesh.peer_discovery.enabled)
{
// If max known peer connection cap is reached then periodically request peer list from random known peer.
// Otherwise frequently request peer list from a random known peer.
@@ -193,23 +198,32 @@ namespace p2p
}
}
/**
* Check whether the node is weakly connected or strongly connected in every 60 seconds.
* Check whether the node is weakly connected or strongly connected.
*/
void peer_comm_server::detect_if_weakly_connected()
{
if (connected_status_check_counter == 600)
// If the node is already weakly connected, check every 2 seconds whether we are now strongly connected.
// Otherwise check every 60 seconds. This makes it harder to become weakly connected and easier to get out of it.
// This can help with unnessary flooding of forwarded messages across the network.
bool weakly_connected = status::get_weakly_connected();
if (connected_status_check_counter == (weakly_connected ? 20 : 600))
{
// Get the count of peers which are unl nodes.
// One is added to session list size only if we are a unl node, to reflect the self connection.
// One is added to peer count only if we are a unl node, to reflect the self connection.
const int connected_peer_count = std::count_if(sessions.begin(), sessions.end(), [](const p2p::peer_comm_session &session)
{ return session.is_unl; }) +
(conf::cfg.node.is_unl ? 1 : 0);
const bool current_state = connected_peer_count < (unl::count() * WEAKLY_CONNECTED_THRESHOLD);
if (is_weakly_connected != current_state)
if (weakly_connected != current_state)
{
is_weakly_connected = !is_weakly_connected;
send_peer_requirement_announcement(is_weakly_connected);
LOG_DEBUG << "Sent weakly connected announcement.";
weakly_connected = !weakly_connected;
send_peer_requirement_announcement(weakly_connected);
status::set_weakly_connected(weakly_connected);
if (weakly_connected)
LOG_WARNING << "Became weakly connected.";
else
LOG_INFO << "No longer weakly connected.";
}
connected_status_check_counter = 0;
}

View File

@@ -6,9 +6,6 @@
namespace p2p
{
// Globally exposed weakly connected status variable.
extern bool is_weakly_connected;
struct peer_properties;
class peer_comm_server : public comm::comm_server<peer_comm_session>

View File

@@ -13,6 +13,7 @@
#include "p2p.hpp"
#include "../unl.hpp"
#include "../sc/hpfs_log_sync.hpp"
#include "../status.hpp"
namespace p2pmsg = msg::fbuf::p2pmsg;
@@ -32,7 +33,7 @@ namespace p2p
int handle_peer_connect(p2p::peer_comm_session &session)
{
// Skip new inbound connection if max inbound connection cap is reached.
if (session.is_inbound && get_available_capacity() == 0)
if (session.is_inbound && calculate_available_capacity() == 0)
{
LOG_DEBUG << "Max peer connection cap reached. Rejecting new peer connection [" << session.display_name() << "]";
return -1;
@@ -148,7 +149,7 @@ namespace p2p
if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg)
{
p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(mi));
p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(mi), session);
}
else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg)
{
@@ -334,9 +335,7 @@ namespace p2p
void handle_peer_on_verified(p2p::peer_comm_session &session)
{
// Sending newly verified node the requirement of consensus msg fowarding if this node is weakly connected.
if (p2p::is_weakly_connected)
{
p2p::send_peer_requirement_announcement(is_weakly_connected, &session);
}
if (status::get_weakly_connected())
p2p::send_peer_requirement_announcement(true, &session);
}
} // namespace p2p

View File

@@ -2,6 +2,7 @@
#include "util/sequence_hash.hpp"
#include "ledger/ledger_common.hpp"
#include "conf.hpp"
#include "p2p/p2p.hpp"
namespace status
{
@@ -20,6 +21,11 @@ namespace status
std::shared_mutex peers_mutex;
std::set<conf::peer_ip_port> peers; // Known ip:port pairs for connection verified peers.
std::atomic<size_t> peer_count = 0;
std::atomic<bool> weakly_connected = false;
std::atomic<int16_t> available_mesh_capacity = -1;
proposal_health phealth = {};
//----- Ledger status
@@ -33,7 +39,7 @@ namespace status
void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger)
{
// If currently not-in-sync, report it as in-sync when a ledger is created.
if (in_sync != 1)
if (in_sync.load() != 1)
sync_status_changed(true);
std::unique_lock lock(ledger_mutex);
@@ -44,8 +50,12 @@ namespace status
void sync_status_changed(const bool new_in_sync)
{
in_sync = new_in_sync ? 1 : 0;
event_queue.try_enqueue(sync_status_change_event{new_in_sync});
const int new_value = new_in_sync ? 1 : 0;
if (new_value != in_sync.load())
{
in_sync = new_value;
event_queue.try_enqueue(sync_status_change_event{new_in_sync});
}
}
const util::sequence_hash get_lcl_id()
@@ -56,7 +66,7 @@ namespace status
const bool is_in_sync()
{
return in_sync == 1;
return in_sync.load() == 1;
}
const ledger::ledger_record get_last_ledger()
@@ -93,6 +103,14 @@ namespace status
{
std::unique_lock lock(peers_mutex);
peers = std::move(updated_peers);
if (peers.size() != peer_count)
{
peer_count = peers.size();
if (conf::cfg.health.connectivity_stats)
event_queue.try_enqueue(connectivity_health{peer_count.load(), weakly_connected.load()});
}
}
const std::set<conf::peer_ip_port> get_peers()
@@ -101,4 +119,90 @@ namespace status
return peers;
}
const size_t get_peers_count()
{
return peer_count.load();
}
void set_weakly_connected(const bool is_weakly_connected)
{
if (weakly_connected.load() != is_weakly_connected)
{
weakly_connected = is_weakly_connected;
if (conf::cfg.health.connectivity_stats)
event_queue.try_enqueue(connectivity_health{peer_count.load(), weakly_connected.load()});
}
}
const bool get_weakly_connected()
{
return weakly_connected.load();
}
void set_available_mesh_capacity(const int16_t new_capacity)
{
available_mesh_capacity = new_capacity;
}
const int16_t get_available_mesh_capacity()
{
return available_mesh_capacity.load();
}
//----- Node health
void report_proposal_batch(const std::list<p2p::proposal> &proposals)
{
if (!conf::cfg.health.proposal_stats)
return;
phealth.comm_latency_min = UINT64_MAX;
phealth.comm_latency_max = 0;
phealth.comm_latency_avg = 0;
phealth.read_latency_min = UINT64_MAX;
phealth.read_latency_max = 0;
phealth.read_latency_avg = 0;
phealth.batch_size = proposals.size();
if (phealth.batch_size == 0)
return;
const uint64_t now = util::get_epoch_milliseconds();
uint64_t total_comm_latency = 0;
uint64_t total_read_latency = 0;
for (const p2p::proposal &p : proposals)
{
const uint64_t comm_latency = (p.sent_timestamp < p.recv_timestamp) ? (p.recv_timestamp - p.sent_timestamp) : 0;
const uint64_t read_latency = now - p.recv_timestamp;
total_comm_latency += comm_latency;
total_read_latency += read_latency;
if (comm_latency < phealth.comm_latency_min)
phealth.comm_latency_min = comm_latency;
if (comm_latency > phealth.comm_latency_max)
phealth.comm_latency_max = comm_latency;
if (read_latency < phealth.read_latency_min)
phealth.read_latency_min = read_latency;
if (read_latency > phealth.read_latency_max)
phealth.read_latency_max = read_latency;
}
phealth.comm_latency_avg = total_comm_latency / phealth.batch_size;
phealth.read_latency_avg = total_read_latency / phealth.batch_size;
}
void emit_proposal_health()
{
if (!conf::cfg.health.proposal_stats)
return;
event_queue.try_enqueue(phealth);
}
} // namespace status

View File

@@ -5,6 +5,7 @@
#include "util/sequence_hash.hpp"
#include "ledger/ledger_common.hpp"
#include "conf.hpp"
#include "p2p/p2p.hpp"
namespace status
{
@@ -23,8 +24,27 @@ namespace status
bool in_sync = false;
};
struct proposal_health
{
uint64_t comm_latency_min = 0;
uint64_t comm_latency_max = 0;
uint64_t comm_latency_avg = 0;
uint64_t read_latency_min = 0;
uint64_t read_latency_max = 0;
uint64_t read_latency_avg = 0;
uint64_t batch_size = 0;
};
struct connectivity_health
{
size_t peer_count = 0;
bool is_weakly_connected = false;
};
typedef std::variant<proposal_health, connectivity_health> health_event;
// Represents any kind of change that has happened in the node.
typedef std::variant<unl_change_event, ledger_created_event, sync_status_change_event> change_event;
typedef std::variant<unl_change_event, ledger_created_event, sync_status_change_event, health_event> change_event;
extern moodycamel::ConcurrentQueue<change_event> event_queue;
@@ -41,6 +61,14 @@ namespace status
void set_peers(const std::set<conf::peer_ip_port> &updated_peers);
const std::set<conf::peer_ip_port> get_peers();
const size_t get_peers_count();
void set_weakly_connected(const bool is_weakly_connected);
const bool get_weakly_connected();
void set_available_mesh_capacity(const int16_t new_capacity);
const int16_t get_available_mesh_capacity();
void report_proposal_batch(const std::list<p2p::proposal> &proposals);
void emit_proposal_health();
} // namespace status

View File

@@ -7,7 +7,8 @@ namespace usr
enum NOTIFICATION_CHANNEL
{
UNL_CHANGE = 0,
LEDGER_EVENT = 1
LEDGER_EVENT = 1,
HEALTH_STAT = 2
};
} // namespace usr

View File

@@ -609,6 +609,24 @@ namespace usr
}
}
}
else if (ev.index() == 3) // Health events. Broadcast for subscribed users.
{
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
for (auto &[sid, user] : ctx.users)
{
if (user.subscriptions[NOTIFICATION_CHANNEL::HEALTH_STAT])
{
std::vector<uint8_t> &msg = protocol_msgs[user.protocol];
if (msg.empty()) // Construct the message with relevant protocol if not done so already.
{
msg::usrmsg::usrmsg_parser parser(user.protocol);
const status::health_event &health_ev = std::get<status::health_event>(ev);
parser.create_health_notification(msg, health_ev);
}
user.session.send(msg);
}
}
}
}
}

View File

@@ -36,7 +36,7 @@ namespace usr
size_t collected_input_size = 0;
// User's notification subscription toggles.
bool subscriptions[2];
bool subscriptions[3];
// Holds the websocket session of this user.
// We don't need to own the session object since the lifetime of user and session are coupled.
@@ -55,6 +55,7 @@ namespace usr
// Default subscriptions.
subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE] = false;
subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT] = false;
subscriptions[NOTIFICATION_CHANNEL::HEALTH_STAT] = false;
}
};

View File

@@ -6,7 +6,7 @@
namespace version
{
// Hot Pocket version. Written to new configs and p2p/user messages.
constexpr const char *HP_VERSION = "0.5.0";
constexpr const char *HP_VERSION = "0.5.1";
// Minimum compatible config version (this will be used to validate configs).
constexpr const char *MIN_CONFIG_VERSION = "0.5.0";