#include "../pchheader.hpp" #include "../conf.hpp" #include "../consensus.hpp" #include "../crypto.hpp" #include "../util/util.hpp" #include "../util/rollover_hashset.hpp" #include "../hplog.hpp" #include "../msg/fbuf/p2pmsg_container_generated.h" #include "../msg/fbuf/p2pmsg_content_generated.h" #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" #include "../ledger/ledger.hpp" #include "peer_comm_session.hpp" #include "p2p.hpp" #include "../unl.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p { // The set of recent peer message hashes used for duplicate detection. util::rollover_hashset recent_peermsg_hashes(200); /** * This gets hit every time a peer connects to HP via the peer port (configured in config). * @param session connected session. * @return returns 0 if connection is successful and peer challenge is sent otherwise, -1. */ int handle_peer_connect(p2p::peer_comm_session &session) { // Skip new inbound connection if max inbound connection cap is reached. if (session.is_inbound && get_available_capacity() == 0) { LOG_DEBUG << "Max peer connection cap reached. Rejecting new peer connection [" << session.display_name() << "]"; return -1; } // Send peer challenge. flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge); std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); session.send(msg); session.challenge_status = comm::CHALLENGE_ISSUED; return 0; } /** * Peer session on message callback method. Validate and handle each type of peer messages. * @return 0 on normal execution. -1 when session needs to be closed as a result of message handling. */ int handle_peer_message(p2p::peer_comm_session &session, std::string_view message) { // Adding message size to peer message characters(bytes) per minute counter. session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size()); const p2pmsg::Container *container; if (p2pmsg::validate_and_extract_container(&container, message) != 0) return 0; //Get serialised message content. const flatbuffers::Vector *container_content = container->content(); //Accessing message content and size. const uint8_t *content_ptr = container_content->Data(); const flatbuffers::uoffset_t content_size = container_content->size(); const p2pmsg::Content *content; if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0) return 0; if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); LOG_DEBUG << "Duplicate peer message. " << session.display_name(); return 0; } const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc // Check whether the message is qualified for message forwarding. if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type)) { // Npl messages and consensus proposals are forwarded only to unl nodes if relavent flags (npl and consensus) are set to private. // If consensus and npl flags are public, these messages are forward to all the connected nodes. const bool unl_only = (!conf::cfg.contract.is_npl_public && content_message_type == p2pmsg::Message_Npl_Message) || (!conf::cfg.contract.is_consensus_public && content_message_type == p2pmsg::Message_Proposal_Message); if (session.need_consensus_msg_forwarding) { // Forward messages received by weakly connected nodes to other peers. p2p::broadcast_message(message, false, false, unl_only, &session); } else { // Forward message received from other nodes to weakly connected peers. p2p::broadcast_message(message, false, true, unl_only, &session); } } if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement { const p2p::peer_challenge chall = p2pmsg::get_peer_challenge_from_msg(*content->message_as_Peer_Challenge_Message()); // Check whether contract ids match. if (chall.contract_id != conf::cfg.contract.id) { LOG_ERROR << "Contract id mismatch. Dropping connection " << session.display_name(); return -1; } // Remember the roundtime reported by this peer. session.reported_roundtime = chall.roundtime; // Sending the challenge response to the sender. flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge); std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); return session.send(msg); } else if (content_message_type == p2pmsg::Message_Peer_Challenge_Response_Message) // message is a peer challenge response { // Ignore if challenge is already resolved. if (session.challenge_status == comm::CHALLENGE_ISSUED) { const p2p::peer_challenge_response challenge_resp = p2pmsg::create_peer_challenge_response_from_msg(*content->message_as_Peer_Challenge_Response_Message(), container->pubkey()); return p2p::resolve_peer_challenge(session, challenge_resp); } } if (session.challenge_status != comm::CHALLENGE_VERIFIED) { LOG_DEBUG << "Cannot accept messages. Peer challenge unresolved. " << session.display_name(); return 0; } if (content_message_type == p2pmsg::Message_Peer_List_Response_Message) // This message is the peer list response message. { p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(*content->message_as_Peer_List_Response_Message())); } else if (content_message_type == p2pmsg::Message_Peer_List_Request_Message) // This message is the peer list request message. { p2p::send_known_peer_list(&session); } else if (content_message_type == p2pmsg::Message_Available_Capacity_Announcement_Message) // This message is the available capacity announcement message. { if (session.known_ipport.has_value()) { const p2pmsg::Available_Capacity_Announcement_Message *announcement_msg = content->message_as_Available_Capacity_Announcement_Message(); p2p::update_known_peer_available_capacity(session.known_ipport.value(), announcement_msg->available_capacity(), announcement_msg->timestamp()); } } else if (content_message_type == p2pmsg::Message_Peer_Requirement_Announcement_Message) // This message is a peer requirement announcement message. { const p2pmsg::Peer_Requirement_Announcement_Message *announcement_msg = content->message_as_Peer_Requirement_Announcement_Message(); session.need_consensus_msg_forwarding = announcement_msg->need_consensus_msg_forwarding(); if (session.need_consensus_msg_forwarding) LOG_DEBUG << "Consensus message forwaring is required for " << session.display_name(); else LOG_DEBUG << "Consensus message forwaring is not required for " << session.display_name(); } else if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message { // We only trust proposals coming from UNL peers. if (p2pmsg::validate_container_trust(container) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "Proposal rejected due to trust failure. " << session.display_name(); return 0; } if (handle_proposal_message(container, content) != 0) LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. " << session.display_name(); } else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message { if (handle_nonunl_proposal_message(container, content) != 0) LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. " << session.display_name(); } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message { if (p2pmsg::validate_container_trust(container) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "NPL message rejected due to trust failure. " << session.display_name(); return 0; } handle_npl_message(container, content); } else if (content_message_type == p2pmsg::Message_Hpfs_Request_Message) { const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr); const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message()); if (hr.mount_id == sc::contract_fs.mount_id) { // Check the cap and insert request with lock. std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_requests_mutex); // If max number of state requests reached skip the rest. if (ctx.collected_msgs.contract_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) { ctx.collected_msgs.contract_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); } else { LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name(); } } else if (hr.mount_id == ledger::ledger_fs.mount_id) { // Check the cap and insert request with lock. std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_requests_mutex); // If max number of state requests reached skip the rest. if (ctx.collected_msgs.ledger_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) { ctx.collected_msgs.ledger_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); } else { LOG_DEBUG << "Hpfs ledger fs request rejected. Maximum hpfs ledger fs request count reached. " << session.display_name(); } } } else if (content_message_type == p2pmsg::Message_Hpfs_Response_Message) { const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr); const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message(); // Only accept hpfs responses if hpfs fs is syncing. if (sc::contract_sync_worker.is_syncing && resp_msg->mount_id() == sc::contract_fs.mount_id) { // Check the cap and insert state_response with lock. std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_responses_mutex); // If max number of state responses reached skip the rest. if (ctx.collected_msgs.contract_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) { std::string response(reinterpret_cast(content_ptr), content_size); ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); } else { LOG_DEBUG << "Contract hpfs response rejected. Maximum contract hpfs response count reached. " << session.display_name(); } } else if (ledger::ledger_sync_worker.is_syncing && resp_msg->mount_id() == ledger::ledger_fs.mount_id) { // Check the cap and insert state_response with lock. std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_responses_mutex); // If max number of state responses reached skip the rest. if (ctx.collected_msgs.ledger_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) { std::string response(reinterpret_cast(content_ptr), content_size); ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); } else { LOG_DEBUG << "Ledger hpfs response rejected. Maximum ledger hpfs response count reached. " << session.display_name(); } } } else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); LOG_DEBUG << "Received invalid peer message type. " << session.display_name(); } return 0; } /** * Handles messages that we receive from ourselves. */ int handle_self_message(std::string_view message) { const p2pmsg::Container *container; if (p2pmsg::validate_and_extract_container(&container, message) != 0) return 0; //Get serialised message content. const flatbuffers::Vector *container_content = container->content(); //Accessing message content and size. const uint8_t *content_ptr = container_content->Data(); const flatbuffers::uoffset_t content_size = container_content->size(); const p2pmsg::Content *content; if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0) return 0; const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message { if (handle_proposal_message(container, content) != 0) LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. self"; } else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message { if (handle_nonunl_proposal_message(container, content) != 0) LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. self"; } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message handle_npl_message(container, content); return 0; } /** * Handle proposal message. * @param container Message container. * @param content Message content. * @return returns 0 if proposal is pushed to the list, otherwise -1. */ int handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) { // Check the cap and insert proposal with lock. std::scoped_lock lock(ctx.collected_msgs.proposals_mutex); // If max number of proposals reached skip the rest. if (ctx.collected_msgs.proposals.size() == p2p::PROPOSAL_LIST_CAP) return -1; ctx.collected_msgs.proposals.push_back( p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl(), *container->last_primary_shard_id())); return 0; } /** * Handle nonunl proposal message. * @param container Message container. * @param content Message content. * @return returns 0 if nonunl proposal is pushed to the list, otherwise -1. */ int handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) { // Check the cap and insert proposal with lock. std::scoped_lock lock(ctx.collected_msgs.nonunl_proposals_mutex); // If max number of nonunl proposals reached skip the rest. if (ctx.collected_msgs.nonunl_proposals.size() == p2p::NONUNL_PROPOSAL_LIST_CAP) return -1; ctx.collected_msgs.nonunl_proposals.push_back( p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); return 0; } void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content) { const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message(); npl_message msg; msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data()); msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey()); msg.lcl = msg::fbuf::flatbuff_bytes_to_sv(container->lcl()); if (!consensus::push_npl_message(msg)) { LOG_DEBUG << "NPL message from self enqueue failure."; } } //peer session on message callback method int handle_peer_close(const p2p::peer_comm_session &session) { { // Erase the corresponding pubkey peer connection if it's this session. std::scoped_lock lock(ctx.peer_connections_mutex); const auto itr = ctx.peer_connections.find(session.pubkey); if (itr != ctx.peer_connections.end() && itr->second == &session) { ctx.peer_connections.erase(itr); } } // Update peer properties to default on peer close. if (session.known_ipport.has_value()) p2p::update_known_peer_available_capacity(session.known_ipport.value(), -1, 0); return 0; } /** * Logic related to peer sessions on verfied is invoked here. */ void handle_peer_on_verified(p2p::peer_comm_session &session) { // Sending newly verified node the requirement of consensus msg fowarding if this node is weakly connected. if (p2p::is_weakly_connected) { p2p::send_peer_requirement_announcement(is_weakly_connected, &session); } } } // namespace p2p