Config runtime re-save refactor. (#258)

* Check peer discovery config when persisting peers.

* Refactored config persistance.

* Fixed config read offset issue.

* Refactored peer list structs.

* Minor constexpr.
This commit is contained in:
Ravin Perera
2021-02-25 14:25:12 +05:30
committed by GitHub
parent 5b56d9c1b3
commit 1cebcb1c35
9 changed files with 98 additions and 63 deletions

View File

@@ -62,6 +62,9 @@ namespace conf
{
if (init_success)
{
if (persist_updated_configs() == -1)
LOG_ERROR << "Failed to persist config updates.";
// Releases the config file lock at the termination.
release_config_lock();
}
@@ -135,7 +138,7 @@ namespace conf
cfg.hp_version = util::HP_VERSION;
cfg.node.role = startup_role = ROLE::VALIDATOR;
cfg.node.role = ROLE::VALIDATOR;
cfg.node.history = HISTORY::CUSTOM;
cfg.node.history_config.max_primary_shards = 1;
cfg.node.history_config.max_blob_shards = 1;
@@ -318,15 +321,14 @@ namespace conf
}
if (node["role"] == ROLE_OBSERVER)
cfg.node.role = ROLE::OBSERVER;
startup_role = cfg.node.role = ROLE::OBSERVER;
else if (node["role"] == ROLE_VALIDATOR)
cfg.node.role = ROLE::VALIDATOR;
startup_role = cfg.node.role = ROLE::VALIDATOR;
else
{
std::cerr << "Invalid mode. 'observer' or 'validator' expected.\n";
return -1;
}
startup_role = cfg.node.role;
if (node["history"] == HISTORY_FULL)
cfg.node.history = HISTORY::FULL;
@@ -399,11 +401,10 @@ namespace conf
return -1;
}
peer_properties peer;
peer.ip_port.host_address = splitted_peers.front();
peer.ip_port.port = std::stoi(splitted_peers.back());
cfg.mesh.known_peers.push_back(peer);
peer_ip_port ipp;
ipp.host_address = splitted_peers.front();
ipp.port = std::stoi(splitted_peers.back());
cfg.mesh.known_peers.emplace(ipp);
splitted_peers.clear();
}
cfg.mesh.msg_forwarding = mesh["msg_forwarding"].as<bool>();
@@ -514,7 +515,6 @@ namespace conf
jsoncons::ojson node_config;
node_config.insert_or_assign("public_key", cfg.node.public_key_hex);
node_config.insert_or_assign("private_key", cfg.node.private_key_hex);
// We always save the startup role to config. Not the current role which might get changed dynamically during syncing.
node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR);
node_config.insert_or_assign("history", cfg.node.history == HISTORY::FULL ? HISTORY_FULL : HISTORY_CUSTOM);
@@ -541,9 +541,9 @@ namespace conf
mesh_config.insert_or_assign("idle_timeout", cfg.mesh.idle_timeout);
jsoncons::ojson peers(jsoncons::json_array_arg);
for (const auto &peer : cfg.mesh.known_peers)
for (const auto &ipp : cfg.mesh.known_peers)
{
const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port));
const std::string concat_str = std::string(ipp.host_address).append(":").append(std::to_string(ipp.port));
peers.push_back(concat_str);
}
mesh_config.insert_or_assign("known_peers", peers);
@@ -815,9 +815,12 @@ namespace conf
}
buf.clear();
// Persist new changes to HP config file.
if (parse_contract_section_json(cfg.contract, jdoc, true) == -1 ||
write_config(cfg) == -1)
// Persist new changes to config file and runtime config.
hp_config temp_cfg;
if (read_config(temp_cfg) == -1 ||
parse_contract_section_json(temp_cfg.contract, jdoc, true) == -1 ||
parse_contract_section_json(cfg.contract, jdoc, true) == -1 ||
write_config(temp_cfg) == -1)
{
LOG_ERROR << "Error applying patch config.";
return -1;
@@ -828,18 +831,31 @@ namespace conf
}
/**
* Persists the specified known peers to the config file.
* Persists any updated config fields back to config file.
*/
int persist_known_peers_config(const std::vector<peer_properties> &peers)
int persist_updated_configs()
{
if (peers.empty())
const bool contains_updated_config = cfg.mesh.peer_discovery.enabled;
bool changes_made = false;
if (!contains_updated_config)
return 0;
const size_t max_count = conf::cfg.mesh.max_known_connections == 0
? peers.size()
: MIN(peers.size(), conf::cfg.mesh.max_known_connections);
cfg.mesh.known_peers = std::vector<peer_properties>(peers.begin(), peers.begin() + max_count);
return write_config(cfg);
// Read the original config into a temp struct.
hp_config temp_cfg;
if (read_config(temp_cfg) == -1)
return -1;
// Apply any actual changes to the temp struct.
// Apply known peer list updates.
if (conf::cfg.mesh.peer_discovery.enabled && !cfg.mesh.known_peers.empty())
{
temp_cfg.mesh.known_peers = cfg.mesh.known_peers;
changes_made = true;
}
// Persis the temp struct if any changes made to values.
return changes_made ? write_config(temp_cfg) : 0;
}
/**

View File

@@ -17,25 +17,20 @@ namespace conf
std::string host_address;
uint16_t port;
bool operator==(const peer_ip_port &ip_port)
bool operator==(const peer_ip_port &other) const
{
return host_address == ip_port.host_address && port == ip_port.port;
return host_address == other.host_address && port == other.port;
}
bool operator!=(const peer_ip_port &ip_port)
bool operator!=(const peer_ip_port &other) const
{
return !(host_address == ip_port.host_address && port == ip_port.port);
return !(host_address == other.host_address && port == other.port);
}
};
// Struct to represent information about a peer.
// Initially available capacity is set to -1 and timestamp is set to 0.
// Later it will be updated according to the capacity anouncement from the peers.
struct peer_properties
{
peer_ip_port ip_port;
int16_t available_capacity = -1;
uint64_t timestamp = 0;
bool operator<(const peer_ip_port &other) const
{
return (host_address == other.host_address) ? port < other.port : host_address < other.host_address;
}
};
// The role of the contract node.
@@ -154,7 +149,7 @@ namespace conf
uint16_t port = 0; // Listening port for peer connections
bool listen = true; // Whether to listen for incoming peer connections.
uint32_t idle_timeout = 0; // Idle connection timeout ms for peer connections.
std::vector<peer_properties> known_peers; // Vector of peers with ip_port, timestamp, capacity.
std::set<peer_ip_port> known_peers; // Ordered set of peers with ip_port.
bool msg_forwarding = false; // Whether peer message forwarding is on/off.
uint16_t max_connections = 0; // Max peer connections.
uint16_t max_known_connections = 0; // Max known peer connections.
@@ -248,7 +243,7 @@ namespace conf
int apply_patch_config(std::string_view hpfs_session_name);
int persist_known_peers_config(const std::vector<peer_properties> &peers);
int persist_updated_configs();
int set_config_lock();

View File

@@ -156,7 +156,7 @@ namespace msg::fbuf::p2pmsg
return nup;
}
const std::vector<conf::peer_properties> create_peer_list_response_from_msg(const p2p::peer_message_info &mi)
const std::vector<p2p::peer_properties> create_peer_list_response_from_msg(const p2p::peer_message_info &mi)
{
const auto &msg = *mi.p2p_msg->content_as_PeerListResponseMsg();
return flatbuf_peer_propertieslist_to_peer_propertiesvector(msg.peer_list());
@@ -239,14 +239,14 @@ namespace msg::fbuf::p2pmsg
}
}
const std::vector<conf::peer_properties>
const std::vector<p2p::peer_properties>
flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector<flatbuffers::Offset<PeerProperties>> *fbvec)
{
std::vector<conf::peer_properties> peers;
std::vector<p2p::peer_properties> peers;
for (const PeerProperties *peer : *fbvec)
{
conf::peer_properties properties;
p2p::peer_properties properties;
properties.ip_port.host_address = flatbuf_str_to_sv(peer->host_address());
properties.ip_port.port = peer->port();
@@ -465,7 +465,7 @@ namespace msg::fbuf::p2pmsg
create_p2p_msg(builder, P2PMsgContent_PeerListRequestMsg, msg.Union());
}
void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector<conf::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port)
void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port)
{
const auto msg = CreatePeerListResponseMsg(
builder,
@@ -520,7 +520,7 @@ namespace msg::fbuf::p2pmsg
}
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<PeerProperties>>>
peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector<conf::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port)
peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port)
{
std::vector<flatbuffers::Offset<PeerProperties>> fbvec;
fbvec.reserve(peers.size());

View File

@@ -29,7 +29,7 @@ namespace msg::fbuf::p2pmsg
const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const p2p::peer_message_info &mi);
const std::vector<conf::peer_properties> create_peer_list_response_from_msg(const p2p::peer_message_info &mi);
const std::vector<p2p::peer_properties> create_peer_list_response_from_msg(const p2p::peer_message_info &mi);
const p2p::peer_capacity_announcement create_peer_capacity_announcement_from_msg(const p2p::peer_message_info &mi);
@@ -46,7 +46,7 @@ namespace msg::fbuf::p2pmsg
void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entries, const flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>> *fhashes);
const std::vector<conf::peer_properties>
const std::vector<p2p::peer_properties>
flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector<flatbuffers::Offset<PeerProperties>> *fbvec);
//---std to Flatbuf---//
@@ -85,7 +85,7 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_peer_list_request(flatbuffers::FlatBufferBuilder &builder);
void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector<conf::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port);
void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port);
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>>>
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::submitted_user_input>> &map);
@@ -96,7 +96,7 @@ namespace msg::fbuf::p2pmsg
std::vector<hpfs::child_hash_node> &hash_nodes);
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<PeerProperties>>>
peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector<conf::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port);
peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port);
const flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash>
seqhash_to_flatbuf_seqhash(flatbuffers::FlatBufferBuilder &builder, const p2p::sequence_hash &seqhash);

View File

@@ -12,6 +12,9 @@
namespace p2pmsg = msg::fbuf::p2pmsg;
// Maximum no. of peers that will be persisted back to config upon exit.
constexpr size_t MAX_PERSISTED_KNOWN_PEERS = 50;
namespace p2p
{
@@ -48,10 +51,16 @@ namespace p2p
{
if (init_success)
{
// Persist latest known peers information to config before the peer server is stopped.
// If peer discovery was enabled, update latest known peers information to config
// before the peer server is stopped. (config will permanently save it to disk upon exit)
if (conf::cfg.mesh.peer_discovery.enabled)
{
std::scoped_lock lock(ctx.server->req_known_remotes_mutex);
conf::persist_known_peers_config(ctx.server->req_known_remotes);
const std::vector<peer_properties> &peers = ctx.server->req_known_remotes;
const size_t count = MIN(MAX_PERSISTED_KNOWN_PEERS, peers.size());
conf::cfg.mesh.known_peers.clear();
for (size_t i = 0; i < count; i++)
conf::cfg.mesh.known_peers.emplace(peers[i].ip_port);
}
ctx.server->stop();
@@ -61,8 +70,11 @@ namespace p2p
int start_peer_connections()
{
const uint16_t listen_port = conf::cfg.mesh.listen ? conf::cfg.mesh.port : 0;
std::vector<peer_properties> known_peers;
for (const conf::peer_ip_port &ipp : conf::cfg.mesh.known_peers)
known_peers.push_back(peer_properties{ipp, -1, 0});
ctx.server.emplace(listen_port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg,
conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, conf::cfg.mesh.known_peers);
conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, std::move(known_peers));
if (ctx.server->start() == -1)
return -1;
@@ -367,7 +379,7 @@ namespace p2p
{
std::scoped_lock<std::mutex> lock(ctx.server->req_known_remotes_mutex);
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](conf::peer_properties &p) { return p.ip_port == ip_port; });
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == ip_port; });
if (itr != ctx.server->req_known_remotes.end())
{
LOG_DEBUG << "Updating peer available capacity: Host address: " << itr->ip_port.host_address << ":" << itr->ip_port.port << ", Capacity: " << std::to_string(available_capacity);
@@ -395,13 +407,13 @@ namespace p2p
* Merging the response peer list with the own known peer list.
* @param peers Incoming peer list.
*/
void merge_peer_list(const std::vector<conf::peer_properties> &peers)
void merge_peer_list(const std::vector<peer_properties> &peers)
{
std::scoped_lock<std::mutex> lock(ctx.server->req_known_remotes_mutex);
for (const conf::peer_properties &peer : peers)
for (const peer_properties &peer : peers)
{
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](conf::peer_properties &p) { return p.ip_port == peer.ip_port; });
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == peer.ip_port; });
// If the new peer is not in the peer list then add to the req_known_remotes
// Otherwise if new peer is recently updated (timestamp >) replace with the current one.
@@ -436,7 +448,7 @@ namespace p2p
void sort_known_remotes()
{
std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(),
[](const conf::peer_properties &p1, const conf::peer_properties &p2) {
[](const peer_properties &p1, const peer_properties &p2) {
return get_peer_weight(p1) < 0 || get_peer_weight(p1) > get_peer_weight(p2);
});
}
@@ -446,7 +458,7 @@ namespace p2p
* @param peer Properties of the peer.
* @returns -1 if available capacity is unlimited otherwise weight value.
*/
int32_t get_peer_weight(const conf::peer_properties &peer)
int32_t get_peer_weight(const peer_properties &peer)
{
const uint64_t time_now = util::get_epoch_milliseconds();
return peer.available_capacity >= 0 ? peer.available_capacity * 1000 * 60 / ceil(time_now - peer.timestamp) : -1;

View File

@@ -18,6 +18,16 @@ namespace p2p
constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count.
constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count.
// Struct to represent information about a peer.
// Initially available capacity is set to -1 and timestamp is set to 0.
// Later it will be updated according to the capacity anouncement from the peers.
struct peer_properties
{
conf::peer_ip_port ip_port;
int16_t available_capacity = -1;
uint64_t timestamp = 0;
};
struct sequence_hash
{
uint64_t seq_no = 0;
@@ -212,9 +222,9 @@ namespace p2p
void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t &timestamp);
void merge_peer_list(const std::vector<conf::peer_properties> &peers);
void merge_peer_list(const std::vector<peer_properties> &peers);
int32_t get_peer_weight(const conf::peer_properties &peer);
int32_t get_peer_weight(const peer_properties &peer);
void sort_known_remotes();

View File

@@ -15,7 +15,7 @@ namespace p2p
peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host,
const std::vector<conf::peer_properties> &req_known_remotes)
const std::vector<peer_properties> &req_known_remotes)
: comm::comm_server<peer_comm_session>("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host),
req_known_remotes(req_known_remotes) // Copy over known peers into internal collection.
{

View File

@@ -9,6 +9,8 @@ namespace p2p
// Globally exposed weakly connected status variable.
extern bool is_weakly_connected;
struct peer_properties;
class peer_comm_server : public comm::comm_server<peer_comm_session>
{
private:
@@ -30,10 +32,10 @@ namespace p2p
public:
std::atomic<uint16_t> known_remote_count = 0;
std::mutex req_known_remotes_mutex;
std::vector<conf::peer_properties> req_known_remotes;
std::vector<peer_properties> req_known_remotes;
peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host,
const std::vector<conf::peer_properties> &req_known_remotes);
const std::vector<peer_properties> &req_known_remotes);
};
} // namespace p2p

View File

@@ -378,7 +378,7 @@ namespace util
buf.resize(st.st_size);
return read(fd, buf.data(), buf.size());
return pread(fd, buf.data(), buf.size(), 0);
}
/**