From e7e1268a9915c5f17da2181acfbf85c8428f5579 Mon Sep 17 00:00:00 2001 From: Kithmini Gunawardhana Date: Sat, 23 Sep 2023 10:03:45 +0530 Subject: [PATCH] Networking fixes related to peer connectivity issues (#384) --- src/comm/comm_session.hpp | 9 +- src/conf.cpp | 5 +- src/consensus.cpp | 7 +- src/msg/controlmsg_common.hpp | 1 - src/msg/controlmsg_parser.cpp | 4 +- src/msg/controlmsg_parser.hpp | 2 +- src/msg/fbuf/p2pmsg.fbs | 10 +- src/msg/fbuf/p2pmsg_conversion.cpp | 18 +++ src/msg/fbuf/p2pmsg_conversion.hpp | 4 + src/msg/fbuf/p2pmsg_generated.h | 128 +++++++++++++++++- src/msg/json/controlmsg_json.cpp | 72 +++++++--- src/msg/json/controlmsg_json.hpp | 2 +- src/p2p/p2p.cpp | 106 +++++++++++---- src/p2p/p2p.hpp | 27 +++- src/p2p/peer_comm_server.cpp | 4 +- src/p2p/peer_comm_session.cpp | 29 +++- src/sc/sc.cpp | 50 ++++--- src/sc/sc.hpp | 3 - src/util/version.hpp | 2 +- test/local-cluster/cluster-start.sh | 2 +- .../consensus-test-continuous.sh | 2 +- test/local-cluster/consensus-test-loop.sh | 2 +- test/local-cluster/rundir.sh | 2 +- 23 files changed, 393 insertions(+), 98 deletions(-) diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 76a6ae8e..84aa1fa2 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -30,10 +30,11 @@ namespace comm VIOLATION_MSG_READ = 1, VIOLATION_READ_ERROR = 2, VIOLATION_THRESHOLD_EXCEEDED = 3, - VIOLATION_INACTIVITY = 4 + VIOLATION_INACTIVITY = 4, + VIOLATION_IRRELEVANT_KNOWN_PEER = 5 }; - /** + /** * Represents an active WebSocket connection */ class comm_session @@ -63,7 +64,7 @@ namespace comm std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address. std::string pubkey; // Pubkey in binary format. const bool is_inbound; - const bool is_ipv4; // Whether the host is ipv4 or ipv6. + const bool is_ipv4; // Whether the host is ipv4 or ipv6. const std::string host_address; // Connection host address of the remote party. std::string issued_challenge; SESSION_STATE state = SESSION_STATE::NONE; @@ -71,7 +72,7 @@ namespace comm uint64_t last_activity_timestamp; // Keep track of the last activity timestamp in milliseconds. comm_session(corebill::tracker &violation_tracker, - std::string_view host_address, hpws::client &&hpws_client, const bool is_ipv4, const bool is_inbound, const uint64_t (&metric_thresholds)[5]); + std::string_view host_address, hpws::client &&hpws_client, const bool is_ipv4, const bool is_inbound, const uint64_t (&metric_thresholds)[5]); int init(); int process_next_inbound_message(const uint16_t priority); int send(const std::vector &message, const uint16_t priority = 2); diff --git a/src/conf.cpp b/src/conf.cpp index 4043d8b5..47e6e696 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -862,10 +862,7 @@ namespace conf */ int persist_updated_configs() { - const bool contains_updated_config = cfg.mesh.peer_discovery.enabled; bool changes_made = false; - if (!contains_updated_config) - return 0; // Read the original config into a temp struct. hp_config temp_cfg; @@ -875,7 +872,7 @@ namespace conf // Apply any actual changes to the temp struct. // Apply known peer list updates. - if (conf::cfg.mesh.peer_discovery.enabled && !cfg.mesh.known_peers.empty()) + if (!cfg.mesh.known_peers.empty()) { temp_cfg.mesh.known_peers = cfg.mesh.known_peers; changes_made = true; diff --git a/src/consensus.cpp b/src/consensus.cpp index d67f6780..4efb7ee2 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -791,7 +791,12 @@ namespace consensus { std::scoped_lock lock(ctx.contract_ctx_mutex); if (ctx.contract_ctx) - return ctx.contract_ctx->args.npl_messages.try_enqueue(std::move(npl_msg)); + { + if (ctx.contract_ctx->args.lcl_id == npl_msg.lcl_id) + return ctx.contract_ctx->args.npl_messages.try_enqueue(std::move(npl_msg)); + else + LOG_DEBUG << "Trying to add irrelevant NPL from " << util::to_hex(npl_msg.pubkey) << " | lcl-seq: " << npl_msg.lcl_id.seq_no; + } return false; } diff --git a/src/msg/controlmsg_common.hpp b/src/msg/controlmsg_common.hpp index 391f5d1d..195337e4 100644 --- a/src/msg/controlmsg_common.hpp +++ b/src/msg/controlmsg_common.hpp @@ -11,7 +11,6 @@ namespace msg::controlmsg constexpr const char *FLD_REMOVE = "remove"; // Message types - constexpr const char *MSGTYPE_CONTRACT_END = "contract_end"; constexpr const char *MSGTYPE_PEER_CHANGESET = "peer_changeset"; } // namespace msg::controlmsg diff --git a/src/msg/controlmsg_parser.cpp b/src/msg/controlmsg_parser.cpp index 461cef2c..12f22b82 100644 --- a/src/msg/controlmsg_parser.cpp +++ b/src/msg/controlmsg_parser.cpp @@ -16,9 +16,9 @@ namespace msg::controlmsg return jctlmsg::extract_type(extracted_type, jdoc); } - int controlmsg_parser::extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers) const + int controlmsg_parser::extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, bool &overwrite) const { - return jctlmsg::extract_peer_changeset(added_peers, removed_peers, jdoc); + return jctlmsg::extract_peer_changeset(added_peers, removed_peers, overwrite, jdoc); } } // namespace msg::controlmsg \ No newline at end of file diff --git a/src/msg/controlmsg_parser.hpp b/src/msg/controlmsg_parser.hpp index e21eeeb0..6fb0c2a8 100644 --- a/src/msg/controlmsg_parser.hpp +++ b/src/msg/controlmsg_parser.hpp @@ -13,7 +13,7 @@ namespace msg::controlmsg public: int parse(std::string_view message); int extract_type(std::string &extracted_type) const; - int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers) const; + int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, bool &overwrite) const; }; } // namespace msg::controlmsg diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index 14b91ca6..acb0ab61 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -16,7 +16,8 @@ union P2PMsgContent { PeerListRequestMsg, PeerListResponseMsg, HpfsLogRequest, - HpfsLogResponse + HpfsLogResponse, + SuppressMsg } table P2PMsg { @@ -82,6 +83,13 @@ table NplMsg { // Make sure to update signature generation/verification whenever these fields are changed. } +enum SuppressReason : byte { ContractIdMismatch = 0 } + +table SuppressMsg { + pubkey:[ubyte]; // Sender pubkey. + reason: SuppressReason; +} + //--hpfs requests and responses--// enum HpfsFsEntryResponseType : byte { Matched = 0, Mismatched = 1, Responded = 2, NotAvailable = 3 } diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index ad67e652..4c70923a 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -298,6 +298,14 @@ namespace msg::fbuf::p2pmsg return map; } + const p2p::suppress_message create_suppress_from_msg(const p2p::peer_message_info &mi) + { + const auto &msg = *mi.p2p_msg->content_as_SuppressMsg(); + return { + std::string(flatbuf_bytes_to_sv(msg.pubkey())), + (p2p::SUPPRESS_REASON)msg.reason()}; + } + void flatbuf_hpfsfshashentries_to_hpfsfshashentries(std::vector &fs_entries, const flatbuffers::Vector> *fhashes) { for (const HpfsFSHashEntry *f_hash : *fhashes) @@ -602,6 +610,16 @@ namespace msg::fbuf::p2pmsg create_p2p_msg(builder, P2PMsgContent_PeerListResponseMsg, msg.Union()); } + void create_suppress_msg(flatbuffers::FlatBufferBuilder &builder, const uint8_t reason) + { + const auto msg = CreateSuppressMsg( + builder, + sv_to_flatbuf_bytes(builder, conf::cfg.node.public_key), + (SuppressReason)reason); + + create_p2p_msg(builder, P2PMsgContent_SuppressMsg, msg.Union()); + } + const flatbuffers::Offset>> user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) { diff --git a/src/msg/fbuf/p2pmsg_conversion.hpp b/src/msg/fbuf/p2pmsg_conversion.hpp index 2b47ff45..d117fb7f 100644 --- a/src/msg/fbuf/p2pmsg_conversion.hpp +++ b/src/msg/fbuf/p2pmsg_conversion.hpp @@ -42,6 +42,8 @@ namespace msg::fbuf::p2pmsg const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi); + const p2p::suppress_message create_suppress_from_msg(const p2p::peer_message_info &mi); + util::sequence_hash flatbuf_seqhash_to_seqhash(const msg::fbuf::p2pmsg::SequenceHash *fbseqhash); const std::set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); @@ -101,6 +103,8 @@ namespace msg::fbuf::p2pmsg void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); + void create_suppress_msg(flatbuffers::FlatBufferBuilder &builder, const uint8_t reason); + const flatbuffers::Offset>> user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index e64ea9e2..799b4e80 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -34,6 +34,9 @@ struct ProposalMsgBuilder; struct NplMsg; struct NplMsgBuilder; +struct SuppressMsg; +struct SuppressMsgBuilder; + struct HpfsFSHashEntry; struct HpfsFSHashEntryBuilder; @@ -100,11 +103,12 @@ enum P2PMsgContent { P2PMsgContent_PeerListResponseMsg = 11, P2PMsgContent_HpfsLogRequest = 12, P2PMsgContent_HpfsLogResponse = 13, + P2PMsgContent_SuppressMsg = 14, P2PMsgContent_MIN = P2PMsgContent_NONE, - P2PMsgContent_MAX = P2PMsgContent_HpfsLogResponse + P2PMsgContent_MAX = P2PMsgContent_SuppressMsg }; -inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] { +inline const P2PMsgContent (&EnumValuesP2PMsgContent())[15] { static const P2PMsgContent values[] = { P2PMsgContent_NONE, P2PMsgContent_PeerChallengeMsg, @@ -119,13 +123,14 @@ inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] { P2PMsgContent_PeerListRequestMsg, P2PMsgContent_PeerListResponseMsg, P2PMsgContent_HpfsLogRequest, - P2PMsgContent_HpfsLogResponse + P2PMsgContent_HpfsLogResponse, + P2PMsgContent_SuppressMsg }; return values; } inline const char * const *EnumNamesP2PMsgContent() { - static const char * const names[15] = { + static const char * const names[16] = { "NONE", "PeerChallengeMsg", "PeerChallengeResponseMsg", @@ -140,13 +145,14 @@ inline const char * const *EnumNamesP2PMsgContent() { "PeerListResponseMsg", "HpfsLogRequest", "HpfsLogResponse", + "SuppressMsg", nullptr }; return names; } inline const char *EnumNameP2PMsgContent(P2PMsgContent e) { - if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_HpfsLogResponse)) return ""; + if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_SuppressMsg)) return ""; const size_t index = static_cast(e); return EnumNamesP2PMsgContent()[index]; } @@ -207,9 +213,40 @@ template<> struct P2PMsgContentTraits { static const P2PMsgContent enum_value = P2PMsgContent_HpfsLogResponse; }; +template<> struct P2PMsgContentTraits { + static const P2PMsgContent enum_value = P2PMsgContent_SuppressMsg; +}; + bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type); bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); +enum SuppressReason { + SuppressReason_ContractIdMismatch = 0, + SuppressReason_MIN = SuppressReason_ContractIdMismatch, + SuppressReason_MAX = SuppressReason_ContractIdMismatch +}; + +inline const SuppressReason (&EnumValuesSuppressReason())[1] { + static const SuppressReason values[] = { + SuppressReason_ContractIdMismatch + }; + return values; +} + +inline const char * const *EnumNamesSuppressReason() { + static const char * const names[2] = { + "ContractIdMismatch", + nullptr + }; + return names; +} + +inline const char *EnumNameSuppressReason(SuppressReason e) { + if (flatbuffers::IsOutRange(e, SuppressReason_ContractIdMismatch, SuppressReason_ContractIdMismatch)) return ""; + const size_t index = static_cast(e); + return EnumNamesSuppressReason()[index]; +} + enum HpfsFsEntryResponseType { HpfsFsEntryResponseType_Matched = 0, HpfsFsEntryResponseType_Mismatched = 1, @@ -415,6 +452,9 @@ struct P2PMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const msg::fbuf::p2pmsg::HpfsLogResponse *content_as_HpfsLogResponse() const { return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_HpfsLogResponse ? static_cast(content()) : nullptr; } + const msg::fbuf::p2pmsg::SuppressMsg *content_as_SuppressMsg() const { + return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_SuppressMsg ? static_cast(content()) : nullptr; + } void *mutable_content() { return GetPointer(VT_CONTENT); } @@ -482,6 +522,10 @@ template<> inline const msg::fbuf::p2pmsg::HpfsLogResponse *P2PMsg::content_as inline const msg::fbuf::p2pmsg::SuppressMsg *P2PMsg::content_as() const { + return content_as_SuppressMsg(); +} + struct P2PMsgBuilder { typedef P2PMsg Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -1338,6 +1382,76 @@ inline flatbuffers::Offset CreateNplMsgDirect( lcl_id); } +struct SuppressMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef SuppressMsgBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_PUBKEY = 4, + VT_REASON = 6 + }; + const flatbuffers::Vector *pubkey() const { + return GetPointer *>(VT_PUBKEY); + } + flatbuffers::Vector *mutable_pubkey() { + return GetPointer *>(VT_PUBKEY); + } + msg::fbuf::p2pmsg::SuppressReason reason() const { + return static_cast(GetField(VT_REASON, 0)); + } + bool mutate_reason(msg::fbuf::p2pmsg::SuppressReason _reason) { + return SetField(VT_REASON, static_cast(_reason), 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_PUBKEY) && + verifier.VerifyVector(pubkey()) && + VerifyField(verifier, VT_REASON) && + verifier.EndTable(); + } +}; + +struct SuppressMsgBuilder { + typedef SuppressMsg Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_pubkey(flatbuffers::Offset> pubkey) { + fbb_.AddOffset(SuppressMsg::VT_PUBKEY, pubkey); + } + void add_reason(msg::fbuf::p2pmsg::SuppressReason reason) { + fbb_.AddElement(SuppressMsg::VT_REASON, static_cast(reason), 0); + } + explicit SuppressMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + SuppressMsgBuilder &operator=(const SuppressMsgBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateSuppressMsg( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> pubkey = 0, + msg::fbuf::p2pmsg::SuppressReason reason = msg::fbuf::p2pmsg::SuppressReason_ContractIdMismatch) { + SuppressMsgBuilder builder_(_fbb); + builder_.add_pubkey(pubkey); + builder_.add_reason(reason); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateSuppressMsgDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *pubkey = nullptr, + msg::fbuf::p2pmsg::SuppressReason reason = msg::fbuf::p2pmsg::SuppressReason_ContractIdMismatch) { + auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; + return msg::fbuf::p2pmsg::CreateSuppressMsg( + _fbb, + pubkey__, + reason); +} + struct HpfsFSHashEntry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef HpfsFSHashEntryBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { @@ -2692,6 +2806,10 @@ inline bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } + case P2PMsgContent_SuppressMsg: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } default: return true; } } diff --git a/src/msg/json/controlmsg_json.cpp b/src/msg/json/controlmsg_json.cpp index e9ceb129..d6e54948 100644 --- a/src/msg/json/controlmsg_json.cpp +++ b/src/msg/json/controlmsg_json.cpp @@ -60,45 +60,73 @@ namespace msg::controlmsg::json * { * 'type': 'peer_changeset', * 'add': ['','', ...], - * 'remove': ['','', ...] + * 'remove': ['','', ...] | "*" * } */ - int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, const jsoncons::json &d) + int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, bool &overwrite, const jsoncons::json &d) { - if (extract_peers_from_array(added_peers, msg::controlmsg::FLD_ADD, d) == -1 || - extract_peers_from_array(removed_peers, msg::controlmsg::FLD_REMOVE, d) == -1) + overwrite = false; + + if (!d.contains(msg::controlmsg::FLD_ADD) || + extract_peers_from_array(added_peers, msg::controlmsg::FLD_ADD, d) == -1) + { return -1; + } + + if (d.contains(msg::controlmsg::FLD_REMOVE)) + { + // Remove field can be string "*" or an array of strings. + // "*" means peers should be overwritten. (remove all peers and add the specified peers) + if (d[msg::controlmsg::FLD_REMOVE].is_string()) + { + const std::string remove_str = d[msg::controlmsg::FLD_REMOVE].as(); + if (remove_str == "*") + { + overwrite = true; + } + else + { + return -1; + } + } + else + { + if (extract_peers_from_array(removed_peers, msg::controlmsg::FLD_REMOVE, d) == -1) + return -1; + } + } + else + { + return -1; + } return 0; } int extract_peers_from_array(std::vector &peers, std::string_view field, const jsoncons::json &d) { - if (d.contains(field)) + if (!d[field].is_array()) { - if (!d[field].is_array()) + LOG_ERROR << "Extract peers: Invalid array field: " << field; + return -1; + } + + for (auto &peer : d[field].array_range()) + { + if (!peer.is()) { - LOG_ERROR << "Extract peers: Invalid array field: " << field; + LOG_ERROR << "Extract peers: Invalid peer entry in field: " << field; return -1; } - for (auto &peer : d[field].array_range()) + conf::peer_ip_port ipp; + if (ipp.from_string(peer.as()) == -1) { - if (!peer.is()) - { - LOG_ERROR << "Extract peers: Invalid peer entry in field: " << field; - return -1; - } - - conf::peer_ip_port ipp; - if (ipp.from_string(peer.as()) == -1) - { - LOG_ERROR << "Extract peers: Invalid peer format in field: " << field; - return -1; - } - - peers.push_back(p2p::peer_properties{ipp, -1, 0, 0}); + LOG_ERROR << "Extract peers: Invalid peer format in field: " << field; + return -1; } + + peers.push_back(p2p::peer_properties{ipp, -1, 0, 0}); } return 0; diff --git a/src/msg/json/controlmsg_json.hpp b/src/msg/json/controlmsg_json.hpp index 3bb126c5..aa9b4870 100644 --- a/src/msg/json/controlmsg_json.hpp +++ b/src/msg/json/controlmsg_json.hpp @@ -13,7 +13,7 @@ namespace msg::controlmsg::json int extract_type(std::string &extracted_type, const jsoncons::json &d); - int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, const jsoncons::json &d); + int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, bool &overwrite, const jsoncons::json &d); int extract_peers_from_array(std::vector &peers, std::string_view field, const jsoncons::json &d); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 046e314a..c48016fc 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -37,7 +37,7 @@ namespace p2p metric_thresholds[3] = conf::cfg.mesh.max_bad_msgs_per_min; metric_thresholds[4] = conf::cfg.mesh.idle_timeout; - //Entry point for p2p which will start peer connections to other nodes + // Entry point for p2p which will start peer connections to other nodes if (start_peer_connections() == -1) return -1; @@ -52,9 +52,11 @@ namespace p2p { if (init_success) { - // If peer discovery was enabled, update latest known peers information to config - // before the peer server is stopped. (config will permanently save it to disk upon exit) - if (conf::cfg.mesh.peer_discovery.enabled) + + /** + * Update latest known peers information to config + * before the peer server is stopped. (config will permanently save it to disk upon exit). + */ { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); const std::vector &peers = ctx.server->req_known_remotes; @@ -223,7 +225,7 @@ namespace p2p if (send_to_self) self::send(message); - //Broadcast while locking the peer_connections. + // Broadcast while locking the peer_connections. std::scoped_lock lock(ctx.peer_connections_mutex); for (const auto &[k, session] : ctx.peer_connections) @@ -244,7 +246,7 @@ namespace p2p * @param msg_type The message type. * @param originated_on The originated epoch of the received message. * @return Returns true if the message is qualified for forwarding to peers. False otherwise. - */ + */ bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const enum msg::fbuf::p2pmsg::P2PMsgContent msg_type, const uint64_t originated_on) { // Checking whether the message forwarding is enabled. @@ -291,7 +293,7 @@ namespace p2p */ void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey, const bool full_history_only) { - //Send while locking the peer_connections. + // Send while locking the peer_connections. std::scoped_lock lock(ctx.peer_connections_mutex); const size_t connected_peers = ctx.peer_connections.size(); @@ -333,14 +335,14 @@ namespace p2p session = it->second; } - //send message to selected peer. + // send message to selected peer. session->send(msg::fbuf::builder_to_string_view(fbuf)); target_pubkey = session->uniqueid; } /** * Handle proposal message. This is called from peer and self message handlers. - */ + */ void handle_proposal_message(const p2p::proposal &p) { // Check the cap and insert proposal with lock. @@ -355,7 +357,7 @@ namespace p2p /** * Handle nonunl proposal message. This is called from peer and self message handlers. - */ + */ void handle_nonunl_proposal_message(const p2p::nonunl_proposal &nup) { // Check the cap and insert proposal with lock. @@ -379,6 +381,26 @@ namespace p2p } } + /** + * Handle a suppress message. This message is issued by a peer to suppress connection. + */ + void handle_suppress_message(const p2p::suppress_message &suppression, peer_comm_session *session) + { + if (suppression.reason == SUPPRESS_REASON::CONTRACT_MISMATCH) + { + LOG_DEBUG << "Peer " << session->known_ipport->to_string() << " suppressed us. Reason (" << SUPPRESS_REASON::CONTRACT_MISMATCH << ")."; + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) + { return p.ip_port == session->known_ipport; }); + if (itr != ctx.server->req_known_remotes.end()) + { + LOG_DEBUG << "Marking to omit peer " << session->known_ipport->to_string(); + itr->has_suppressed_us = true; + } + } + else + LOG_DEBUG << "Invalid suppressing reason."; + } + /** * Sends the peer requirement to the given peer session. If a session is not given, broadcast to all the connected peers. * @param need_consensus_msg_forwarding True if the number of connections are below the threshold value. @@ -465,22 +487,41 @@ namespace p2p } /** - * Merging the response peer list with the own known peer list. - * @param merge_peers Peers that must be merged with existing known peers. - * @param remove_peers Peers that must be removed from existing known peers. - * @param from The session that sent us the peer list. + * Update the known peer list with the specified modifications. + * @param mode Update applying priority. + * @param add_peers Peers that must be added to existing known peers. + * @param remove_peers Peers that must be removed from existing known peers. Ignored if mode is OVERWRITE. + * @param from The session that sent us the peer list. Optional. */ - void merge_peer_list(const std::string &caller, const std::vector *merge_peers, const std::vector *remove_peers, const p2p::peer_comm_session *from) + void update_peer_list(const p2p::PEERS_UPDATE_MODE mode, const std::vector *add_peers, + const std::vector *remove_peers, const p2p::peer_comm_session *from) { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); - if (merge_peers) + if (mode == p2p::PEERS_UPDATE_MODE::OVERWRITE) { - for (const peer_properties &peer : *merge_peers) + for (const peer_properties &kp : ctx.server->req_known_remotes) + { + std::scoped_lock lock(ctx.peer_connections_mutex); + const auto itr = std::find_if(ctx.peer_connections.begin(), ctx.peer_connections.end(), [&](const std::pair &pc) + { return pc.second->known_ipport == kp.ip_port; }); + if (itr != ctx.peer_connections.end()) + { + LOG_DEBUG << "Marking to close the session of removing peer conn:" << kp.ip_port.to_string(); + peer_comm_session &session_to_close = *itr->second; + session_to_close.mark_for_closure(comm::CLOSE_VIOLATION::VIOLATION_IRRELEVANT_KNOWN_PEER); + } + } + ctx.server->req_known_remotes.clear(); + } + + if (add_peers) + { + for (const peer_properties &peer : *add_peers) { if (peer.ip_port.host_address.empty()) { - LOG_DEBUG << caller << " : Skip received peer with blank host address " << peer.ip_port.to_string() << " from " << peer.ip_port.to_string(); + LOG_DEBUG << "Skip received peer with blank host address " << peer.ip_port.to_string() << " from " << peer.ip_port.to_string(); continue; } @@ -498,10 +539,17 @@ namespace p2p continue; } - if (ctx.server->dead_known_peers.exists(peer.ip_port.to_string())) + if (mode == p2p::PEERS_UPDATE_MODE::MERGE) { - LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Peer was removed prior due to unavailability."; - continue; + if (ctx.server->dead_known_peers.exists(peer.ip_port.to_string())) + { + LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Peer was removed prior due to unavailability."; + continue; + } + } + else + { + ctx.server->dead_known_peers.erase(peer.ip_port.to_string()); } const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) @@ -531,7 +579,7 @@ namespace p2p } } - if (remove_peers) + if (mode != p2p::PEERS_UPDATE_MODE::OVERWRITE && remove_peers) { for (const peer_properties &peer : *remove_peers) { @@ -541,13 +589,23 @@ namespace p2p if (itr != ctx.server->req_known_remotes.end()) { LOG_DEBUG << "Removing " << peer.ip_port.to_string() << " from known peer list."; + std::scoped_lock lock(ctx.peer_connections_mutex); + const auto conn_itr = std::find_if(ctx.peer_connections.begin(), ctx.peer_connections.end(), [&](const std::pair &pc) + { return pc.second->known_ipport == itr->ip_port; }); + + if (conn_itr != ctx.peer_connections.end()) + { + LOG_DEBUG << "Marking to close the session of removing peer conn:" << peer.ip_port.to_string(); + peer_comm_session &session_to_close = *conn_itr->second; + session_to_close.mark_for_closure(comm::CLOSE_VIOLATION::VIOLATION_IRRELEVANT_KNOWN_PEER); + } ctx.server->req_known_remotes.erase(itr); } } } // Sorting the known remote list according to the weight value after merging the peer list. - if (merge_peers || remove_peers) + if (add_peers || remove_peers) sort_known_remotes(); } @@ -593,7 +651,7 @@ namespace p2p /** * Update the peer trusted status on unl list updates. - */ + */ void update_unl_connections() { std::scoped_lock lock(ctx.peer_connections_mutex); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 2d7db49c..2890bf5a 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -19,7 +19,14 @@ namespace p2p constexpr uint16_t HPFS_RES_LIST_CAP = 255; // Maximum state response count. constexpr uint16_t LOG_RECORD_REQ_LIST_CAP = 255; // Maximum log record request count. constexpr uint16_t LOG_RECORD_RES_LIST_CAP = 255; // Maximum log record response count. - constexpr uint16_t PEER_LIST_CAP = 128; // Maximum peer count. + constexpr uint16_t PEER_LIST_CAP = 256; // Maximum peer count. + + enum PEERS_UPDATE_MODE + { + MERGE = 0, // Gracefully merge new changes to our known peers. + FORCE = 1, // Update our known peers while giving priority to specified changes. + OVERWRITE = 2 // Completely overwrite our known peers with the given set of peers. + }; // Struct to represent information about a peer. // Initially available capacity is set to -1 and timestamp is set to 0. @@ -31,6 +38,7 @@ namespace p2p uint64_t timestamp = 0; int64_t weight = 0; int32_t failed_attempts = 0; + bool has_suppressed_us = false; }; struct proposal @@ -117,6 +125,18 @@ namespace p2p NOT_AVAILABLE = 3 // The entry does not exist on responder side. Requester must delete this on his side. }; + enum SUPPRESS_REASON + { + CONTRACT_MISMATCH = 0 // Suppress due to contract mismatch. + }; + + // Represents a peer suppression. + struct suppress_message + { + std::string pubkey; // Peer binary pubkey. + SUPPRESS_REASON reason; + }; + // Represents hpfs file system entry. struct hpfs_fs_hash_entry { @@ -226,6 +246,8 @@ namespace p2p void handle_npl_message(const p2p::npl_message &npl); + void handle_suppress_message(const p2p::suppress_message &suppression, peer_comm_session *session); + bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const enum msg::fbuf::p2pmsg::P2PMsgContent msg_type, const uint64_t originated_on); void send_peer_requirement_announcement(const bool need_consensus_msg_forwarding, peer_comm_session *session = NULL); @@ -238,7 +260,8 @@ namespace p2p void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t ×tamp); - void merge_peer_list(const std::string &caller, const std::vector *merge_peers, const std::vector *remove_peers, const p2p::peer_comm_session *from = NULL); + void update_peer_list(const p2p::PEERS_UPDATE_MODE mode, const std::vector *add_peers, + const std::vector *remove_peers, const p2p::peer_comm_session *from = NULL); void sort_known_remotes(); diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index cbfd0dc1..e8c92996 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -153,8 +153,8 @@ namespace p2p if (conf::cfg.mesh.max_connections != 0 && known_remote_count == conf::cfg.mesh.max_connections) break; - // Continue if the peer has no free slots. - if (peer.available_capacity == 0) + // Continue if the peer has no free slots or the peer has issued a suppression. + if (peer.available_capacity == 0 || peer.has_suppressed_us) continue; if (peer.ip_port.host_address.empty()) diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp index cc749859..95d7b54c 100644 --- a/src/p2p/peer_comm_session.cpp +++ b/src/p2p/peer_comm_session.cpp @@ -106,12 +106,28 @@ namespace p2p if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeMsg) { const p2p::peer_challenge chall = p2pmsg::create_peer_challenge_from_msg(mi); + flatbuffers::FlatBufferBuilder fbuf; // Check whether contract ids match. if (chall.contract_id != conf::cfg.contract.id) { LOG_ERROR << "Contract id mismatch. Dropping connection " << display_name(); - return -1; + + if (is_inbound) + { + // Sending the a graceful rejection. + p2pmsg::create_suppress_msg(fbuf, p2p::SUPPRESS_REASON::CONTRACT_MISMATCH); + + /** + * Returning with suppression message. + * If we do return -1 here, the session will be closed before the sending the message. + */ + return send(msg::fbuf::builder_to_string_view(fbuf)); + + } + + // Returning 0, but the session will be removed when corresponding peer suppression is received. + return 0; } // Remember the time config reported by this peer. @@ -121,7 +137,6 @@ namespace p2p is_full_history = chall.is_full_history; // Sending the challenge response to the sender. - flatbuffers::FlatBufferBuilder fbuf; p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge); return send(msg::fbuf::builder_to_string_view(fbuf)); } @@ -131,6 +146,14 @@ namespace p2p if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_ISSUED) return p2p::resolve_peer_challenge(*this, p2pmsg::create_peer_challenge_response_from_msg(mi)); } + else if (mi.type == p2pmsg::P2PMsgContent_SuppressMsg) + { + LOG_DEBUG << "Received suppress message. " << display_name(); + handle_suppress_message(p2pmsg::create_suppress_from_msg(mi), this); + + // Returning -1 to close the session as this should not be progressed. + return -1; + } if (challenge_status != comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) { @@ -141,7 +164,7 @@ namespace p2p if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg) { const std::vector merge_peers = p2pmsg::create_peer_list_response_from_msg(mi); - p2p::merge_peer_list("Peer_Discovery", &merge_peers, NULL, this); + p2p::update_peer_list(p2p::PEERS_UPDATE_MODE::MERGE, &merge_peers, NULL, this); } else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg) { diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index dca58633..284da8a5 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -478,13 +478,13 @@ namespace sc break; } - // Atempt to read messages from contract (regardless of contract terminated or not). + // Attempt to read messages from contract (regardless of contract terminated or not). const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]); const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, &out_fds[npl_fd_idx]); const int user_read_res = read_contract_fdmap_outputs(ctx.user_fds, out_fds, ctx.args.userbufs); messages_read = (control_read_res + npl_read_res + user_read_res) > 0; - if (ctx.termination_signaled || ctx.contract_pid == 0) + if (ctx.contract_pid == 0) { // If no messages were read after contract finished execution, exit the polling loop. // Otherwise keep running the loop becaue there might be further messages to read. @@ -523,9 +523,8 @@ namespace sc // Check if the contract has exited voluntarily. if (check_contract_exited(ctx, false) == 0) { - // Issue kill signal if the contract hasn't indicated the termination control message. - if (!ctx.termination_signaled) - kill(ctx.contract_pid, SIGTERM); + // Issue kill signal to kill the contract process. + kill(ctx.contract_pid, SIGKILL); check_contract_exited(ctx, true); // Blocking wait until exit. } } @@ -620,6 +619,9 @@ namespace sc { if (write_iosocket_seq_packet(ctx.control_fds, control_msg) == -1) { + // Consider that no write operation occurred; assume that contract termination might have caused these errors. + if (errno == EPIPE || errno == ECONNRESET) + return 0; LOG_ERROR << "Error writing HP inputs to SC"; return -1; } @@ -658,6 +660,9 @@ namespace sc // Writing the public key to the contract's fd (Skip first byte for key type prefix). if (write(writefd, pubkeyhex.data(), pubkeyhex.size()) == -1) { + // Consider that no write operation occurred; assume that contract termination might have caused these errors. if (errno == EPIPE || errno == ECONNRESET) + if (errno == EPIPE || errno == ECONNRESET) + return 0; LOG_ERROR << errno << ": Error writing npl message pubkey."; return -1; } @@ -665,6 +670,9 @@ namespace sc // Writing the message to the contract's fd. if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1) { + // Consider that no write operation occurred; assume that contract termination might have caused these errors. + if (errno == EPIPE || errno == ECONNRESET) + return 0; LOG_ERROR << errno << ": Error writing npl message data."; return -1; } @@ -676,7 +684,6 @@ namespace sc LOG_DEBUG << "NPL message dropped due to last primary shard mismatch."; } } - return 0; } @@ -1030,7 +1037,7 @@ namespace sc * @param is_stream_socket Indicates whether socket is steam socket or not. * @param pfd The pollfd struct containing poll status. * @param output The buffer to place the read output. - * @return -1 on error. Otherwise no. of bytes read. + * @return Returns -2 on neutral read, -1 on error, Otherwise no. of bytes read. */ int read_iosocket(const bool is_stream_socket, const pollfd pfd, std::string &output) { @@ -1045,7 +1052,11 @@ namespace sc if (res == -1) { - LOG_ERROR << errno << ": Error reading from contract socket. stream:" << is_stream_socket; + // Assuming that EPIPE or ECONNRESET resulted from contract termination, consider this as a neutral read. + if (errno == EPIPE || errno == ECONNRESET) + return -2; + else + LOG_ERROR << errno << ": Error reading from contract socket. stream:" << is_stream_socket; } return res; @@ -1139,16 +1150,21 @@ namespace sc if (parser.parse(msg) == -1 || parser.extract_type(type) == -1) return; - if (type == msg::controlmsg::MSGTYPE_CONTRACT_END) + if (type == msg::controlmsg::MSGTYPE_PEER_CHANGESET) { - ctx.termination_signaled = true; - } - else if (type == msg::controlmsg::MSGTYPE_PEER_CHANGESET) - { - std::vector added_peers; - std::vector removed_peers; - if (parser.extract_peer_changeset(added_peers, removed_peers) != -1) - p2p::merge_peer_list("Control_MSG", &added_peers, &removed_peers); + if (!conf::cfg.mesh.peer_discovery.enabled) + { + std::vector added_peers; + std::vector removed_peers; + bool overwrite = false; + if (parser.extract_peer_changeset(added_peers, removed_peers, overwrite) != -1) + { + const p2p::PEERS_UPDATE_MODE update_mode = (overwrite ? p2p::PEERS_UPDATE_MODE::OVERWRITE : p2p::PEERS_UPDATE_MODE::FORCE); + p2p::update_peer_list(update_mode, &added_peers, &removed_peers); + } + } + else + LOG_WARNING << "Not allowed to update peers via control msgs, as peer discovery is enabled."; } } diff --git a/src/sc/sc.hpp b/src/sc/sc.hpp index 0aa101f9..7ef679b6 100644 --- a/src/sc/sc.hpp +++ b/src/sc/sc.hpp @@ -134,9 +134,6 @@ namespace sc std::string stdout_file; std::string stderr_file; - // Indicates that the contract has sent termination control message. - bool termination_signaled = false; - // Indicates whether the contract exited normally without any errors. bool exit_success = false; diff --git a/src/util/version.hpp b/src/util/version.hpp index 7cf6d3b5..2a0e4bc4 100644 --- a/src/util/version.hpp +++ b/src/util/version.hpp @@ -6,7 +6,7 @@ namespace version { // HotPocket version. Written to new configs and p2p/user messages. - constexpr const char *HP_VERSION = "0.6.3"; + constexpr const char *HP_VERSION = "0.6.4"; // Minimum compatible config version (this will be used to validate configs). constexpr const char *MIN_CONFIG_VERSION = "0.6.3"; diff --git a/test/local-cluster/cluster-start.sh b/test/local-cluster/cluster-start.sh index 8b7ea043..5c09c737 100755 --- a/test/local-cluster/cluster-start.sh +++ b/test/local-cluster/cluster-start.sh @@ -14,7 +14,7 @@ fi clusterloc=$(pwd)/hpcluster n=$1 -hpversion=0.6.3 +hpversion=0.6.4 let pubport=8080+$n let peerport=22860+$n diff --git a/test/local-cluster/consensus-test-continuous.sh b/test/local-cluster/consensus-test-continuous.sh index bc914d52..07e701d5 100644 --- a/test/local-cluster/consensus-test-continuous.sh +++ b/test/local-cluster/consensus-test-continuous.sh @@ -5,7 +5,7 @@ WINDOWSIZE=60 # size of window in seconds to examine for successful consensus ro PIPE=concon.pipe clusterloc=$(pwd)/hpcluster n=1 -hpversion=0.6.3 +hpversion=0.6.4 let pubport=8080+$n while true; do diff --git a/test/local-cluster/consensus-test-loop.sh b/test/local-cluster/consensus-test-loop.sh index 306e09a5..9d5782f6 100644 --- a/test/local-cluster/consensus-test-loop.sh +++ b/test/local-cluster/consensus-test-loop.sh @@ -3,7 +3,7 @@ clusterloc=$(pwd)/hpcluster n=1 -hpversion=0.6.3 +hpversion=0.6.4 let pubport=8080+$n while true; do CONSENSUS="0" diff --git a/test/local-cluster/rundir.sh b/test/local-cluster/rundir.sh index 087ced31..e95c9058 100755 --- a/test/local-cluster/rundir.sh +++ b/test/local-cluster/rundir.sh @@ -15,7 +15,7 @@ fi dir=$(realpath $1) dirname=$(basename $dir) n=$1 -hpversion=0.6.3 +hpversion=0.6.4 let pubport=8080