From fabfdcce89c7572ca45fa8d4073a387f9b9e8622 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Fri, 23 Oct 2020 16:57:01 +0530 Subject: [PATCH] Weakly connected status announcement. (#135) * Forward others' messages only to the weakly connected nodes instead of broadcasting to all the connected peers. * Announcing connected status depends on a threshold to other connected peers. * Forwarding messages of weakly connected peers to other peers. --- src/comm/comm_server.cpp | 37 ++++++++++++ src/comm/comm_server.hpp | 3 + src/comm/comm_session.cpp | 1 + src/comm/comm_session.hpp | 15 ++--- src/msg/fbuf/p2pmsg_content.fbs | 17 +++++- src/msg/fbuf/p2pmsg_content_generated.h | 75 +++++++++++++++++++++++-- src/msg/fbuf/p2pmsg_helpers.cpp | 30 ++++++++-- src/msg/fbuf/p2pmsg_helpers.hpp | 4 +- src/p2p/p2p.cpp | 30 ++++++++-- src/p2p/p2p.hpp | 6 +- src/p2p/peer_session_handler.cpp | 52 ++++++++--------- 11 files changed, 216 insertions(+), 54 deletions(-) diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index afa4aedd..1681ac69 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -5,10 +5,12 @@ #include "../util.hpp" #include "../bill/corebill.h" #include "../hpws/hpws.hpp" +#include "../p2p/p2p.hpp" namespace comm { constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 16 * 1024 * 1024; + constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7; int comm_server::start( const uint16_t port, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], @@ -66,6 +68,30 @@ namespace comm else ++itr; } + + flatbuffers::FlatBufferBuilder fbuf(1024); + if (sessions.size() > 1) + { + if (is_weakly_connected()) + { + if (!weakly_connected_status_sent) + { + LOG_DEBUG << "Weakly connected status announcement sent"; + p2p::send_connected_status_announcement(fbuf, true); + // Mark that the p2p message forwarding is requested. + weakly_connected_status_sent = true; + } + } + else + { + if (weakly_connected_status_sent) + { + LOG_DEBUG << "Strongly connected status announcement sent"; + p2p::send_connected_status_announcement(fbuf, false); + weakly_connected_status_sent = false; + } + } + } } // If we reach this point that means we are shutting down. @@ -79,6 +105,11 @@ namespace comm LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " listener stopped."; } + bool comm_server::is_weakly_connected() + { + return (sessions.size() - 1) < (conf::cfg.unl.size() * WEAKLY_CONNECTED_THRESHOLD); + } + void comm_server::check_for_new_connection( std::list &sessions, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4]) { @@ -123,6 +154,12 @@ namespace comm // in accessing class member variables inside the thread. // Class member variables gives unacceptable values if the thread starts before the move operation. inserted_session.start_messaging_threads(); + // Making sure the newly connected node get the weakly connected announcement if the number of + // connected peers are under the threshold. + if ((sessions.size() > 1) && is_weakly_connected()) + { + weakly_connected_status_sent = false; + } } } } diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index fdddcd79..bc8bb5a9 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -14,6 +14,7 @@ namespace comm std::thread watchdog_thread; // Connection watcher thread. std::thread inbound_message_processor_thread; // Incoming message processor thread. bool should_stop_listening = false; + bool weakly_connected_status_sent = false; // keep track whether the weakly connected status announcement is sent or not. std::list sessions; std::mutex sessions_mutex; @@ -37,6 +38,8 @@ namespace comm std::string get_cgi_ip(const int fd); + bool is_weakly_connected(); + public: // Start accepting incoming connections int start( diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index d01889cc..afec3d35 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -179,6 +179,7 @@ namespace comm if (out_msg_queue.try_dequeue(msg_to_send)) { process_outbound_message(msg_to_send); + util::sleep(1); } else { diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index cb75ede7..d7b4c751 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -38,12 +38,12 @@ namespace comm private: std::optional hpws_client; const SESSION_TYPE session_type; - std::vector thresholds; // track down various communication thresholds - - std::thread reader_thread; // The thread responsible for reading messages from the read fd. - std::thread writer_thread; // The thread responsible for writing messages to the write fd. - moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. - moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. + std::vector thresholds; // track down various communication thresholds + + std::thread reader_thread; // The thread responsible for reading messages from the read fd. + std::thread writer_thread; // The thread responsible for writing messages to the write fd. + moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. + moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. void reader_loop(); @@ -56,6 +56,7 @@ namespace comm conf::ip_port_pair known_ipport; SESSION_STATE state = SESSION_STATE::NOT_INITIALIZED; CHALLENGE_STATUS challenge_status = CHALLENGE_STATUS::NOT_ISSUED; + bool is_weakly_connected = false; // Holds whether this node is weakly connected to the other nodes. comm_session( std::string_view ip, hpws::client &&hpws_client, const SESSION_TYPE session_type, @@ -76,4 +77,4 @@ namespace comm } // namespace comm -#endif \ No newline at end of file +#endif diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index df213ded..5a8bd5e8 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -24,7 +24,18 @@ table UserInputGroup { messages:[UserInput]; } -union Message { Peer_Challenge_Response_Message, Peer_Challenge_Message, NonUnl_Proposal_Message, Proposal_Message, Npl_Message, State_Request_Message, State_Response_Message, History_Request_Message, History_Response_Message } //message content type +union Message { + Peer_Challenge_Response_Message, + Peer_Challenge_Message, + NonUnl_Proposal_Message, + Proposal_Message, + Npl_Message, + State_Request_Message, + State_Response_Message, + History_Request_Message, + History_Response_Message, + Connected_Status_Announcement_Message +} //message content type table Content { message:Message; @@ -110,4 +121,8 @@ table State_FS_Hash_Entry{ hash: [ubyte]; } +table Connected_Status_Announcement_Message{ + is_weakly_connected: bool; +} + root_type Content; //root type for message content \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_content_generated.h b/src/msg/fbuf/p2pmsg_content_generated.h index 6f0f9a10..2511e1a2 100644 --- a/src/msg/fbuf/p2pmsg_content_generated.h +++ b/src/msg/fbuf/p2pmsg_content_generated.h @@ -66,6 +66,9 @@ struct Block_ResponseBuilder; struct State_FS_Hash_Entry; struct State_FS_Hash_EntryBuilder; +struct Connected_Status_Announcement_Message; +struct Connected_Status_Announcement_MessageBuilder; + enum Message { Message_NONE = 0, Message_Peer_Challenge_Response_Message = 1, @@ -77,11 +80,12 @@ enum Message { Message_State_Response_Message = 7, Message_History_Request_Message = 8, Message_History_Response_Message = 9, + Message_Connected_Status_Announcement_Message = 10, Message_MIN = Message_NONE, - Message_MAX = Message_History_Response_Message + Message_MAX = Message_Connected_Status_Announcement_Message }; -inline const Message (&EnumValuesMessage())[10] { +inline const Message (&EnumValuesMessage())[11] { static const Message values[] = { Message_NONE, Message_Peer_Challenge_Response_Message, @@ -92,13 +96,14 @@ inline const Message (&EnumValuesMessage())[10] { Message_State_Request_Message, Message_State_Response_Message, Message_History_Request_Message, - Message_History_Response_Message + Message_History_Response_Message, + Message_Connected_Status_Announcement_Message }; return values; } inline const char * const *EnumNamesMessage() { - static const char * const names[11] = { + static const char * const names[12] = { "NONE", "Peer_Challenge_Response_Message", "Peer_Challenge_Message", @@ -109,13 +114,14 @@ inline const char * const *EnumNamesMessage() { "State_Response_Message", "History_Request_Message", "History_Response_Message", + "Connected_Status_Announcement_Message", nullptr }; return names; } inline const char *EnumNameMessage(Message e) { - if (flatbuffers::IsOutRange(e, Message_NONE, Message_History_Response_Message)) return ""; + if (flatbuffers::IsOutRange(e, Message_NONE, Message_Connected_Status_Announcement_Message)) return ""; const size_t index = static_cast(e); return EnumNamesMessage()[index]; } @@ -160,6 +166,10 @@ template<> struct MessageTraits { static const Message enum_value = Message_History_Response_Message; }; +template<> struct MessageTraits { + static const Message enum_value = Message_Connected_Status_Announcement_Message; +}; + bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); @@ -574,6 +584,9 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const msg::fbuf::p2pmsg::History_Response_Message *message_as_History_Response_Message() const { return message_type() == msg::fbuf::p2pmsg::Message_History_Response_Message ? static_cast(message()) : nullptr; } + const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *message_as_Connected_Status_Announcement_Message() const { + return message_type() == msg::fbuf::p2pmsg::Message_Connected_Status_Announcement_Message ? static_cast(message()) : nullptr; + } void *mutable_message() { return GetPointer(VT_MESSAGE); } @@ -622,6 +635,10 @@ template<> inline const msg::fbuf::p2pmsg::History_Response_Message *Content::me return message_as_History_Response_Message(); } +template<> inline const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *Content::message_as() const { + return message_as_Connected_Status_Announcement_Message(); +} + struct ContentBuilder { typedef Content Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -1688,6 +1705,50 @@ inline flatbuffers::Offset CreateState_FS_Hash_EntryDirect( hash__); } +struct Connected_Status_Announcement_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Connected_Status_Announcement_MessageBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_IS_WEAKLY_CONNECTED = 4 + }; + bool is_weakly_connected() const { + return GetField(VT_IS_WEAKLY_CONNECTED, 0) != 0; + } + bool mutate_is_weakly_connected(bool _is_weakly_connected) { + return SetField(VT_IS_WEAKLY_CONNECTED, static_cast(_is_weakly_connected), 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_IS_WEAKLY_CONNECTED) && + verifier.EndTable(); + } +}; + +struct Connected_Status_Announcement_MessageBuilder { + typedef Connected_Status_Announcement_Message Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_is_weakly_connected(bool is_weakly_connected) { + fbb_.AddElement(Connected_Status_Announcement_Message::VT_IS_WEAKLY_CONNECTED, static_cast(is_weakly_connected), 0); + } + explicit Connected_Status_Announcement_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateConnected_Status_Announcement_Message( + flatbuffers::FlatBufferBuilder &_fbb, + bool is_weakly_connected = false) { + Connected_Status_Announcement_MessageBuilder builder_(_fbb); + builder_.add_is_weakly_connected(is_weakly_connected); + return builder_.Finish(); +} + inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type) { switch (type) { case Message_NONE: { @@ -1729,6 +1790,10 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } + case Message_Connected_Status_Announcement_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } default: return true; } } diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 4d375591..df3915dc 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -3,7 +3,6 @@ #include "../../crypto.hpp" #include "../../util.hpp" #include "../../hplog.hpp" -#include "../../p2p/p2p.hpp" #include "../../hpfs/h32.hpp" #include "../../hpfs/hpfs.hpp" #include "p2pmsg_container_generated.h" @@ -415,7 +414,7 @@ namespace msg::fbuf::p2pmsg // Now that we have built the content message, // we need to sign it and place it inside a container message. - create_containermsg_from_content(container_builder, builder, {}, true); + create_containermsg_from_content(container_builder, builder, {}, false); } /** @@ -439,7 +438,7 @@ namespace msg::fbuf::p2pmsg // Now that we have built the content message, // we need to sign it and place it inside a container message. - create_containermsg_from_content(container_builder, builder, {}, true); + create_containermsg_from_content(container_builder, builder, {}, false); } /** @@ -464,7 +463,7 @@ namespace msg::fbuf::p2pmsg // Now that we have built the content message, // we need to sign it and place it inside a container message. - create_containermsg_from_content(container_builder, builder, lcl, true); + create_containermsg_from_content(container_builder, builder, lcl, false); } /** @@ -569,6 +568,28 @@ namespace msg::fbuf::p2pmsg create_containermsg_from_content(container_builder, builder, lcl, true); } + /** + * Create connected status announcement message. + * @param container_builder Flatbuffer builder for the container message. + * @param is_weakly_connected True if number of connections are below threshold and false otherwise. + * @param lcl Lcl value to be passed in the container message. + */ + void create_msg_for_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl) + { + flatbuffers::FlatBufferBuilder builder(1024); + + const flatbuffers::Offset announcement = + CreateConnected_Status_Announcement_Message( + builder, + is_weakly_connected); + + const flatbuffers::Offset message = CreateContent(builder, Message_Connected_Status_Announcement_Message, announcement.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + create_containermsg_from_content(container_builder, builder, lcl, false); + } + /** * Creates a Flatbuffer container message from the given Content message. * @param container_builder The Flatbuffer builder to which the final container message should be written to. @@ -739,5 +760,4 @@ namespace msg::fbuf::p2pmsg } return builder.CreateVector(fbvec); } - } // namespace msg::fbuf::p2pmsg \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index 9fc4e4ba..e3656103 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -95,6 +95,8 @@ namespace msg::fbuf::p2pmsg flatbuffers::FlatBufferBuilder &builder, std::vector &hash_nodes); + void create_msg_for_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl); + } // namespace msg::fbuf::p2pmsg -#endif \ No newline at end of file +#endif diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index e708be5d..6236146f 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -5,6 +5,8 @@ #include "../util.hpp" #include "../hplog.hpp" #include "p2p.hpp" +#include "../msg/fbuf/p2pmsg_helpers.hpp" +#include "../ledger.hpp" namespace p2p { @@ -124,6 +126,9 @@ namespace p2p ex_session.mark_for_closure(); p2p::ctx.peer_connections.erase(iter); // remove existing session. + // We have to keep the weekly connected status of the removed session object. + // If not, connected status received prior to connection dropping will be lost. + session.is_weakly_connected = ex_session.is_weakly_connected; p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session. LOG_DEBUG << "Replacing existing connection [" << session.uniqueid.substr(0, 10) << "]"; @@ -146,20 +151,23 @@ namespace p2p * Broadcasts the given message to all currently connected outbound peers. * @param fbuf Peer outbound message to be broadcasted. * @param send_to_self Whether to also send the message to self (this node). + * @param is_msg_forwarding Whether this broadcast is for message forwarding. */ - void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self) + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding) { std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - broadcast_message(msg, send_to_self); + + broadcast_message(msg, send_to_self, is_msg_forwarding); } /** * Broadcast the given message to all connected outbound peers. * @param message Message to be forwarded. + * @param is_msg_forwarding Whether this broadcast is for message forwarding. * @param skipping_session Session to be skipped in message forwarding(optional). */ - void broadcast_message(std::string_view message, const bool send_to_self, const comm::comm_session *skipping_session) + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const comm::comm_session *skipping_session) { if (ctx.peer_connections.size() == 0) { @@ -173,7 +181,10 @@ namespace p2p for (const auto &[k, session] : ctx.peer_connections) { // Exclude given session and self if provided. - if ((!send_to_self && session->is_self) || (skipping_session && skipping_session == session)) + // Messages are forwarded only to the weakly connected nodes only in the message forwarding mode. + if ((!send_to_self && session->is_self) || + (skipping_session && skipping_session == session) || + (is_msg_forwarding && !session->is_weakly_connected)) continue; session->send(message); @@ -272,4 +283,15 @@ namespace p2p } } + /** + * Sends the connected status broadcast announcement to all the connected peers. + * @param fbuf Peer outbound message to be sent to peer. + * @param is_weakly_connected True if the number of connections are below the threshold value. + */ + void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected) + { + msg::fbuf::p2pmsg::create_msg_for_connected_status_announcement(fbuf, is_weakly_connected, ledger::ctx.get_lcl()); + p2p::broadcast_message(fbuf, false); + } + } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index f063aa97..04d0d5fb 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -137,15 +137,17 @@ namespace p2p int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp); - void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self); + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false); - void broadcast_message(std::string_view message, const bool send_to_self, const comm::comm_session *skipping_session = NULL); + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const comm::comm_session *skipping_session = NULL); void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf); bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type); + + void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected); } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 8c274c56..79491d4b 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -77,8 +77,16 @@ namespace p2p // Check whether the message is qualified for forwarding. if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type)) { - // Forward message to peers. - p2p::broadcast_message(message, false, &session); + if (session.is_weakly_connected) + { + // Forward messages received by weakly connected nodes to other peers. + p2p::broadcast_message(message, false, false, &session); + } + else + { + // Forward message received from other nodes to weakly connected peers. + p2p::broadcast_message(message, false, true, &session); + } } if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement @@ -150,14 +158,21 @@ namespace p2p LOG_DEBUG << "NPL message enqueue failure. " << session.uniqueid.substr(0, 10); } } + else if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message. + { + const p2pmsg::Connected_Status_Announcement_Message *announcement_msg = content->message_as_Connected_Status_Announcement_Message(); + session.is_weakly_connected = announcement_msg->is_weakly_connected(); + if (session.is_weakly_connected) + { + LOG_DEBUG << "Weakly connected announcement received from " << session.uniqueid.substr(0, 10); + } + else + { + LOG_DEBUG << "Strongly connected announcement received from " << session.uniqueid.substr(0, 10); + } + } else if (content_message_type == p2pmsg::Message_State_Request_Message) { - if (p2pmsg::validate_container_trust(container) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "State request message rejected due to trust failure. " << session.uniqueid.substr(0, 10); - return 0; - } // Insert request with lock. std::scoped_lock lock(ctx.collected_msgs.state_requests_mutex); @@ -166,13 +181,6 @@ namespace p2p } else if (content_message_type == p2pmsg::Message_State_Response_Message) { - if (p2pmsg::validate_container_trust(container) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "State response message rejected due to trust failure. " << session.uniqueid.substr(0, 10); - return 0; - } - if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing. { // Insert state_response with lock. @@ -183,26 +191,12 @@ namespace p2p } else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message { - if (p2pmsg::validate_container_trust(container) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "History request message rejected due to trust failure. " << session.uniqueid.substr(0, 10); - return 0; - } - const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); std::scoped_lock lock(ledger::sync_ctx.list_mutex); ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr))); } else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message { - if (p2pmsg::validate_container_trust(container) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "History response message rejected due to trust failure. " << session.uniqueid.substr(0, 10); - return 0; - } - const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message()); std::scoped_lock lock(ledger::sync_ctx.list_mutex); ledger::sync_ctx.collected_history_responses.push_back(std::move(hr));