diff --git a/src/conf.cpp b/src/conf.cpp index 0b7f2cff..a8a6d5c3 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -62,6 +62,9 @@ namespace conf { if (init_success) { + if (persist_updated_configs() == -1) + LOG_ERROR << "Failed to persist config updates."; + // Releases the config file lock at the termination. release_config_lock(); } @@ -135,7 +138,7 @@ namespace conf cfg.hp_version = util::HP_VERSION; - cfg.node.role = startup_role = ROLE::VALIDATOR; + cfg.node.role = ROLE::VALIDATOR; cfg.node.history = HISTORY::CUSTOM; cfg.node.history_config.max_primary_shards = 1; cfg.node.history_config.max_blob_shards = 1; @@ -318,15 +321,14 @@ namespace conf } if (node["role"] == ROLE_OBSERVER) - cfg.node.role = ROLE::OBSERVER; + startup_role = cfg.node.role = ROLE::OBSERVER; else if (node["role"] == ROLE_VALIDATOR) - cfg.node.role = ROLE::VALIDATOR; + startup_role = cfg.node.role = ROLE::VALIDATOR; else { std::cerr << "Invalid mode. 'observer' or 'validator' expected.\n"; return -1; } - startup_role = cfg.node.role; if (node["history"] == HISTORY_FULL) cfg.node.history = HISTORY::FULL; @@ -399,11 +401,10 @@ namespace conf return -1; } - peer_properties peer; - peer.ip_port.host_address = splitted_peers.front(); - peer.ip_port.port = std::stoi(splitted_peers.back()); - - cfg.mesh.known_peers.push_back(peer); + peer_ip_port ipp; + ipp.host_address = splitted_peers.front(); + ipp.port = std::stoi(splitted_peers.back()); + cfg.mesh.known_peers.emplace(ipp); splitted_peers.clear(); } cfg.mesh.msg_forwarding = mesh["msg_forwarding"].as(); @@ -514,7 +515,6 @@ namespace conf jsoncons::ojson node_config; node_config.insert_or_assign("public_key", cfg.node.public_key_hex); node_config.insert_or_assign("private_key", cfg.node.private_key_hex); - // We always save the startup role to config. Not the current role which might get changed dynamically during syncing. node_config.insert_or_assign("role", cfg.node.role == ROLE::OBSERVER ? ROLE_OBSERVER : ROLE_VALIDATOR); node_config.insert_or_assign("history", cfg.node.history == HISTORY::FULL ? HISTORY_FULL : HISTORY_CUSTOM); @@ -541,9 +541,9 @@ namespace conf mesh_config.insert_or_assign("idle_timeout", cfg.mesh.idle_timeout); jsoncons::ojson peers(jsoncons::json_array_arg); - for (const auto &peer : cfg.mesh.known_peers) + for (const auto &ipp : cfg.mesh.known_peers) { - const std::string concat_str = std::string(peer.ip_port.host_address).append(":").append(std::to_string(peer.ip_port.port)); + const std::string concat_str = std::string(ipp.host_address).append(":").append(std::to_string(ipp.port)); peers.push_back(concat_str); } mesh_config.insert_or_assign("known_peers", peers); @@ -815,9 +815,12 @@ namespace conf } buf.clear(); - // Persist new changes to HP config file. - if (parse_contract_section_json(cfg.contract, jdoc, true) == -1 || - write_config(cfg) == -1) + // Persist new changes to config file and runtime config. + hp_config temp_cfg; + if (read_config(temp_cfg) == -1 || + parse_contract_section_json(temp_cfg.contract, jdoc, true) == -1 || + parse_contract_section_json(cfg.contract, jdoc, true) == -1 || + write_config(temp_cfg) == -1) { LOG_ERROR << "Error applying patch config."; return -1; @@ -828,18 +831,31 @@ namespace conf } /** - * Persists the specified known peers to the config file. + * Persists any updated config fields back to config file. */ - int persist_known_peers_config(const std::vector &peers) + int persist_updated_configs() { - if (peers.empty()) + const bool contains_updated_config = cfg.mesh.peer_discovery.enabled; + bool changes_made = false; + if (!contains_updated_config) return 0; - const size_t max_count = conf::cfg.mesh.max_known_connections == 0 - ? peers.size() - : MIN(peers.size(), conf::cfg.mesh.max_known_connections); - cfg.mesh.known_peers = std::vector(peers.begin(), peers.begin() + max_count); - return write_config(cfg); + // Read the original config into a temp struct. + hp_config temp_cfg; + if (read_config(temp_cfg) == -1) + return -1; + + // Apply any actual changes to the temp struct. + + // Apply known peer list updates. + if (conf::cfg.mesh.peer_discovery.enabled && !cfg.mesh.known_peers.empty()) + { + temp_cfg.mesh.known_peers = cfg.mesh.known_peers; + changes_made = true; + } + + // Persis the temp struct if any changes made to values. + return changes_made ? write_config(temp_cfg) : 0; } /** diff --git a/src/conf.hpp b/src/conf.hpp index 3c945a1d..d85184b3 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -17,25 +17,20 @@ namespace conf std::string host_address; uint16_t port; - bool operator==(const peer_ip_port &ip_port) + bool operator==(const peer_ip_port &other) const { - return host_address == ip_port.host_address && port == ip_port.port; + return host_address == other.host_address && port == other.port; } - bool operator!=(const peer_ip_port &ip_port) + bool operator!=(const peer_ip_port &other) const { - return !(host_address == ip_port.host_address && port == ip_port.port); + return !(host_address == other.host_address && port == other.port); } - }; - // Struct to represent information about a peer. - // Initially available capacity is set to -1 and timestamp is set to 0. - // Later it will be updated according to the capacity anouncement from the peers. - struct peer_properties - { - peer_ip_port ip_port; - int16_t available_capacity = -1; - uint64_t timestamp = 0; + bool operator<(const peer_ip_port &other) const + { + return (host_address == other.host_address) ? port < other.port : host_address < other.host_address; + } }; // The role of the contract node. @@ -154,7 +149,7 @@ namespace conf uint16_t port = 0; // Listening port for peer connections bool listen = true; // Whether to listen for incoming peer connections. uint32_t idle_timeout = 0; // Idle connection timeout ms for peer connections. - std::vector known_peers; // Vector of peers with ip_port, timestamp, capacity. + std::set known_peers; // Ordered set of peers with ip_port. bool msg_forwarding = false; // Whether peer message forwarding is on/off. uint16_t max_connections = 0; // Max peer connections. uint16_t max_known_connections = 0; // Max known peer connections. @@ -248,7 +243,7 @@ namespace conf int apply_patch_config(std::string_view hpfs_session_name); - int persist_known_peers_config(const std::vector &peers); + int persist_updated_configs(); int set_config_lock(); diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 5ff823a9..0d2d2679 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -156,7 +156,7 @@ namespace msg::fbuf::p2pmsg return nup; } - const std::vector create_peer_list_response_from_msg(const p2p::peer_message_info &mi) + const std::vector create_peer_list_response_from_msg(const p2p::peer_message_info &mi) { const auto &msg = *mi.p2p_msg->content_as_PeerListResponseMsg(); return flatbuf_peer_propertieslist_to_peer_propertiesvector(msg.peer_list()); @@ -239,14 +239,14 @@ namespace msg::fbuf::p2pmsg } } - const std::vector + const std::vector flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector> *fbvec) { - std::vector peers; + std::vector peers; for (const PeerProperties *peer : *fbvec) { - conf::peer_properties properties; + p2p::peer_properties properties; properties.ip_port.host_address = flatbuf_str_to_sv(peer->host_address()); properties.ip_port.port = peer->port(); @@ -465,7 +465,7 @@ namespace msg::fbuf::p2pmsg create_p2p_msg(builder, P2PMsgContent_PeerListRequestMsg, msg.Union()); } - void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port) + void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port) { const auto msg = CreatePeerListResponseMsg( builder, @@ -520,7 +520,7 @@ namespace msg::fbuf::p2pmsg } const flatbuffers::Offset>> - peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port) + peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port) { std::vector> fbvec; fbvec.reserve(peers.size()); diff --git a/src/msg/fbuf/p2pmsg_conversion.hpp b/src/msg/fbuf/p2pmsg_conversion.hpp index ac6d69bb..371bc0c2 100644 --- a/src/msg/fbuf/p2pmsg_conversion.hpp +++ b/src/msg/fbuf/p2pmsg_conversion.hpp @@ -29,7 +29,7 @@ namespace msg::fbuf::p2pmsg const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const p2p::peer_message_info &mi); - const std::vector create_peer_list_response_from_msg(const p2p::peer_message_info &mi); + const std::vector create_peer_list_response_from_msg(const p2p::peer_message_info &mi); const p2p::peer_capacity_announcement create_peer_capacity_announcement_from_msg(const p2p::peer_message_info &mi); @@ -46,7 +46,7 @@ namespace msg::fbuf::p2pmsg void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map &fs_entries, const flatbuffers::Vector> *fhashes); - const std::vector + const std::vector flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector> *fbvec); //---std to Flatbuf---// @@ -85,7 +85,7 @@ namespace msg::fbuf::p2pmsg void create_msg_from_peer_list_request(flatbuffers::FlatBufferBuilder &builder); - void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); + void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); const flatbuffers::Offset>> user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); @@ -96,7 +96,7 @@ namespace msg::fbuf::p2pmsg std::vector &hash_nodes); const flatbuffers::Offset>> - peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); + peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); const flatbuffers::Offset seqhash_to_flatbuf_seqhash(flatbuffers::FlatBufferBuilder &builder, const p2p::sequence_hash &seqhash); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 8637afb0..54228346 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -12,6 +12,9 @@ namespace p2pmsg = msg::fbuf::p2pmsg; +// Maximum no. of peers that will be persisted back to config upon exit. +constexpr size_t MAX_PERSISTED_KNOWN_PEERS = 50; + namespace p2p { @@ -48,10 +51,16 @@ namespace p2p { if (init_success) { - // Persist latest known peers information to config before the peer server is stopped. + // If peer discovery was enabled, update latest known peers information to config + // before the peer server is stopped. (config will permanently save it to disk upon exit) + if (conf::cfg.mesh.peer_discovery.enabled) { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); - conf::persist_known_peers_config(ctx.server->req_known_remotes); + const std::vector &peers = ctx.server->req_known_remotes; + const size_t count = MIN(MAX_PERSISTED_KNOWN_PEERS, peers.size()); + conf::cfg.mesh.known_peers.clear(); + for (size_t i = 0; i < count; i++) + conf::cfg.mesh.known_peers.emplace(peers[i].ip_port); } ctx.server->stop(); @@ -61,8 +70,11 @@ namespace p2p int start_peer_connections() { const uint16_t listen_port = conf::cfg.mesh.listen ? conf::cfg.mesh.port : 0; + std::vector known_peers; + for (const conf::peer_ip_port &ipp : conf::cfg.mesh.known_peers) + known_peers.push_back(peer_properties{ipp, -1, 0}); ctx.server.emplace(listen_port, metric_thresholds, conf::cfg.mesh.max_bytes_per_msg, - conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, conf::cfg.mesh.known_peers); + conf::cfg.mesh.max_connections, conf::cfg.mesh.max_in_connections_per_host, std::move(known_peers)); if (ctx.server->start() == -1) return -1; @@ -367,7 +379,7 @@ namespace p2p { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); - const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](conf::peer_properties &p) { return p.ip_port == ip_port; }); + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == ip_port; }); if (itr != ctx.server->req_known_remotes.end()) { LOG_DEBUG << "Updating peer available capacity: Host address: " << itr->ip_port.host_address << ":" << itr->ip_port.port << ", Capacity: " << std::to_string(available_capacity); @@ -395,13 +407,13 @@ namespace p2p * Merging the response peer list with the own known peer list. * @param peers Incoming peer list. */ - void merge_peer_list(const std::vector &peers) + void merge_peer_list(const std::vector &peers) { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); - for (const conf::peer_properties &peer : peers) + for (const peer_properties &peer : peers) { - const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](conf::peer_properties &p) { return p.ip_port == peer.ip_port; }); + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == peer.ip_port; }); // If the new peer is not in the peer list then add to the req_known_remotes // Otherwise if new peer is recently updated (timestamp >) replace with the current one. @@ -436,7 +448,7 @@ namespace p2p void sort_known_remotes() { std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), - [](const conf::peer_properties &p1, const conf::peer_properties &p2) { + [](const peer_properties &p1, const peer_properties &p2) { return get_peer_weight(p1) < 0 || get_peer_weight(p1) > get_peer_weight(p2); }); } @@ -446,7 +458,7 @@ namespace p2p * @param peer Properties of the peer. * @returns -1 if available capacity is unlimited otherwise weight value. */ - int32_t get_peer_weight(const conf::peer_properties &peer) + int32_t get_peer_weight(const peer_properties &peer) { const uint64_t time_now = util::get_epoch_milliseconds(); return peer.available_capacity >= 0 ? peer.available_capacity * 1000 * 60 / ceil(time_now - peer.timestamp) : -1; diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 43ef3462..e425534c 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -18,6 +18,16 @@ namespace p2p constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count. constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count. + // Struct to represent information about a peer. + // Initially available capacity is set to -1 and timestamp is set to 0. + // Later it will be updated according to the capacity anouncement from the peers. + struct peer_properties + { + conf::peer_ip_port ip_port; + int16_t available_capacity = -1; + uint64_t timestamp = 0; + }; + struct sequence_hash { uint64_t seq_no = 0; @@ -212,9 +222,9 @@ namespace p2p void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t ×tamp); - void merge_peer_list(const std::vector &peers); + void merge_peer_list(const std::vector &peers); - int32_t get_peer_weight(const conf::peer_properties &peer); + int32_t get_peer_weight(const peer_properties &peer); void sort_known_remotes(); diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index b3f5f8be..adcb282d 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -15,7 +15,7 @@ namespace p2p peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, - const std::vector &req_known_remotes) + const std::vector &req_known_remotes) : comm::comm_server("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host), req_known_remotes(req_known_remotes) // Copy over known peers into internal collection. { diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp index 30e7480d..1c0a4703 100644 --- a/src/p2p/peer_comm_server.hpp +++ b/src/p2p/peer_comm_server.hpp @@ -9,6 +9,8 @@ namespace p2p // Globally exposed weakly connected status variable. extern bool is_weakly_connected; + struct peer_properties; + class peer_comm_server : public comm::comm_server { private: @@ -30,10 +32,10 @@ namespace p2p public: std::atomic known_remote_count = 0; std::mutex req_known_remotes_mutex; - std::vector req_known_remotes; + std::vector req_known_remotes; peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, - const std::vector &req_known_remotes); + const std::vector &req_known_remotes); }; } // namespace p2p diff --git a/src/util/util.cpp b/src/util/util.cpp index ac3e8863..683b60c4 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -378,7 +378,7 @@ namespace util buf.resize(st.st_size); - return read(fd, buf.data(), buf.size()); + return pread(fd, buf.data(), buf.size(), 0); } /**