P2P Flatbuffers schema refactor. (#255)

This commit is contained in:
Ravin Perera
2021-02-23 13:15:03 +05:30
committed by GitHub
parent 312713c296
commit 137c199b07
36 changed files with 3456 additions and 4211 deletions

View File

@@ -5,9 +5,8 @@
#include "../util/util.hpp"
#include "../util/rollover_hashset.hpp"
#include "../hplog.hpp"
#include "../msg/fbuf/p2pmsg_container_generated.h"
#include "../msg/fbuf/p2pmsg_content_generated.h"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../msg/fbuf/p2pmsg_generated.h"
#include "../msg/fbuf/p2pmsg_conversion.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "../ledger/ledger.hpp"
#include "peer_comm_session.hpp"
@@ -36,7 +35,7 @@ namespace p2p
}
// Send peer challenge.
flatbuffers::FlatBufferBuilder fbuf(1024);
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge);
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
@@ -54,37 +53,35 @@ namespace p2p
// Adding message size to peer message characters(bytes) per minute counter.
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size());
const p2pmsg::Container *container;
if (p2pmsg::validate_and_extract_container(&container, message) != 0)
if (!p2pmsg::verify_peer_message(message))
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
LOG_DEBUG << "Flatbuffer verify: Bad peer message.";
return 0;
}
//Get serialised message content.
const flatbuffers::Vector<uint8_t> *container_content = container->content();
const peer_message_info mi = p2pmsg::get_peer_message_info(message);
//Accessing message content and size.
const uint8_t *content_ptr = container_content->Data();
const flatbuffers::uoffset_t content_size = container_content->size();
const p2pmsg::Content *content;
if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0)
if (mi.type == p2pmsg::P2PMsgContent_NONE)
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
LOG_DEBUG << "Received invalid peer message type. " << session.display_name();
return 0;
if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message)))
}
else if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message)))
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1);
LOG_DEBUG << "Duplicate peer message. " << session.display_name();
return 0;
}
const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
// Check whether the message is qualified for message forwarding.
if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type))
if (p2p::validate_for_peer_msg_forwarding(session, mi.type, mi.originated_on))
{
// Npl messages and consensus proposals are forwarded only to unl nodes if relavent flags (npl and consensus) are set to private.
// If consensus and npl flags are public, these messages are forward to all the connected nodes.
const bool unl_only = (!conf::cfg.contract.is_npl_public && content_message_type == p2pmsg::Message_Npl_Message) ||
(!conf::cfg.contract.is_consensus_public && content_message_type == p2pmsg::Message_Proposal_Message);
const bool unl_only = (!conf::cfg.contract.is_npl_public && mi.type == p2pmsg::P2PMsgContent_NplMsg) ||
(!conf::cfg.contract.is_consensus_public && mi.type == p2pmsg::P2PMsgContent_ProposalMsg);
if (session.need_consensus_msg_forwarding)
{
// Forward messages received by weakly connected nodes to other peers.
@@ -97,9 +94,9 @@ namespace p2p
}
}
if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement
if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeMsg)
{
const p2p::peer_challenge chall = p2pmsg::get_peer_challenge_from_msg(*content->message_as_Peer_Challenge_Message());
const p2p::peer_challenge chall = p2pmsg::create_peer_challenge_from_msg(mi);
// Check whether contract ids match.
if (chall.contract_id != conf::cfg.contract.id)
@@ -112,20 +109,15 @@ namespace p2p
session.reported_roundtime = chall.roundtime;
// Sending the challenge response to the sender.
flatbuffers::FlatBufferBuilder fbuf(1024);
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge);
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
return session.send(msg);
return session.send(msg::fbuf::builder_to_string_view(fbuf));
}
else if (content_message_type == p2pmsg::Message_Peer_Challenge_Response_Message) // message is a peer challenge response
else if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeResponseMsg)
{
// Ignore if challenge is already resolved.
if (session.challenge_status == comm::CHALLENGE_ISSUED)
{
const p2p::peer_challenge_response challenge_resp = p2pmsg::create_peer_challenge_response_from_msg(*content->message_as_Peer_Challenge_Response_Message(), container->pubkey());
return p2p::resolve_peer_challenge(session, challenge_resp);
}
return p2p::resolve_peer_challenge(session, p2pmsg::create_peer_challenge_response_from_msg(mi));
}
if (session.challenge_status != comm::CHALLENGE_VERIFIED)
@@ -134,64 +126,57 @@ namespace p2p
return 0;
}
if (content_message_type == p2pmsg::Message_Peer_List_Response_Message) // This message is the peer list response message.
if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg)
{
p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(*content->message_as_Peer_List_Response_Message()));
p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(mi));
}
else if (content_message_type == p2pmsg::Message_Peer_List_Request_Message) // This message is the peer list request message.
else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg)
{
p2p::send_known_peer_list(&session);
}
else if (content_message_type == p2pmsg::Message_Available_Capacity_Announcement_Message) // This message is the available capacity announcement message.
else if (mi.type == p2pmsg::P2PMsgContent_PeerCapacityAnnouncementMsg)
{
if (session.known_ipport.has_value())
{
const p2pmsg::Available_Capacity_Announcement_Message *announcement_msg = content->message_as_Available_Capacity_Announcement_Message();
p2p::update_known_peer_available_capacity(session.known_ipport.value(), announcement_msg->available_capacity(), announcement_msg->timestamp());
const p2p::peer_capacity_announcement ann = p2pmsg::create_peer_capacity_announcement_from_msg(mi);
p2p::update_known_peer_available_capacity(session.known_ipport.value(), ann.available_capacity, ann.timestamp);
}
}
else if (content_message_type == p2pmsg::Message_Peer_Requirement_Announcement_Message) // This message is a peer requirement announcement message.
else if (mi.type == p2pmsg::P2PMsgContent_PeerRequirementAnnouncementMsg)
{
const p2pmsg::Peer_Requirement_Announcement_Message *announcement_msg = content->message_as_Peer_Requirement_Announcement_Message();
session.need_consensus_msg_forwarding = announcement_msg->need_consensus_msg_forwarding();
if (session.need_consensus_msg_forwarding)
LOG_DEBUG << "Consensus message forwaring is required for " << session.display_name();
else
LOG_DEBUG << "Consensus message forwaring is not required for " << session.display_name();
const p2p::peer_requirement_announcement ann = p2pmsg::create_peer_requirement_announcement_from_msg(mi);
session.need_consensus_msg_forwarding = ann.need_consensus_msg_forwarding;
LOG_DEBUG << "Peer requirement: " << session.display_name() << " consensus msg forwarding:" << ann.need_consensus_msg_forwarding;
}
else if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message
else if (mi.type == p2pmsg::P2PMsgContent_NonUnlProposalMsg)
{
// We only trust proposals coming from UNL peers.
if (p2pmsg::validate_container_trust(container) != 0)
handle_nonunl_proposal_message(p2pmsg::create_nonunl_proposal_from_msg(mi));
}
else if (mi.type == p2pmsg::P2PMsgContent_ProposalMsg)
{
if (!p2pmsg::verify_proposal_msg_signature(mi))
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "Proposal rejected due to trust failure. " << session.display_name();
return 0;
}
if (handle_proposal_message(container, content) != 0)
LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. " << session.display_name();
handle_proposal_message(p2pmsg::create_proposal_from_msg(mi));
}
else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message
else if (mi.type == p2pmsg::P2PMsgContent_NplMsg)
{
if (handle_nonunl_proposal_message(container, content) != 0)
LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. " << session.display_name();
}
else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message
{
if (p2pmsg::validate_container_trust(container) != 0)
if (!p2pmsg::verify_npl_msg_signature(mi))
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "NPL message rejected due to trust failure. " << session.display_name();
LOG_DEBUG << "Npl message rejected due to trust failure. " << session.display_name();
return 0;
}
handle_npl_message(container, content);
handle_npl_message(p2pmsg::create_npl_from_msg(mi));
}
else if (content_message_type == p2pmsg::Message_Hpfs_Request_Message)
else if (mi.type == p2pmsg::P2PMsgContent_HpfsRequestMsg)
{
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr);
const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message());
const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(mi);
if (hr.mount_id == sc::contract_fs.mount_id)
{
// Check the cap and insert request with lock.
@@ -199,13 +184,9 @@ namespace p2p
// If max number of state requests reached skip the rest.
if (ctx.collected_msgs.contract_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP)
{
ctx.collected_msgs.contract_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr)));
}
else
{
LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name();
}
}
else if (hr.mount_id == ledger::ledger_fs.mount_id)
{
@@ -214,52 +195,37 @@ namespace p2p
// If max number of state requests reached skip the rest.
if (ctx.collected_msgs.ledger_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP)
{
ctx.collected_msgs.ledger_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr)));
}
else
{
LOG_DEBUG << "Hpfs ledger fs request rejected. Maximum hpfs ledger fs request count reached. " << session.display_name();
}
}
}
else if (content_message_type == p2pmsg::Message_Hpfs_Response_Message)
else if (mi.type == p2pmsg::P2PMsgContent_HpfsResponseMsg)
{
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr);
const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message();
const p2pmsg::HpfsResponseMsg &resp_msg = *mi.p2p_msg->content_as_HpfsResponseMsg();
// Only accept hpfs responses if hpfs fs is syncing.
if (sc::contract_sync_worker.is_syncing && resp_msg->mount_id() == sc::contract_fs.mount_id)
if (sc::contract_sync_worker.is_syncing && resp_msg.mount_id() == sc::contract_fs.mount_id)
{
// Check the cap and insert state_response with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.contract_hpfs_responses_mutex);
// If max number of state responses reached skip the rest.
if (ctx.collected_msgs.contract_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
}
ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::string(message)));
else
{
LOG_DEBUG << "Contract hpfs response rejected. Maximum contract hpfs response count reached. " << session.display_name();
}
LOG_DEBUG << "Contract hpfs response rejected. Maximum response count reached. " << session.display_name();
}
else if (ledger::ledger_sync_worker.is_syncing && resp_msg->mount_id() == ledger::ledger_fs.mount_id)
else if (ledger::ledger_sync_worker.is_syncing && resp_msg.mount_id() == ledger::ledger_fs.mount_id)
{
// Check the cap and insert state_response with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.ledger_hpfs_responses_mutex);
// If max number of state responses reached skip the rest.
if (ctx.collected_msgs.ledger_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
}
ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::string(message)));
else
{
LOG_DEBUG << "Ledger hpfs response rejected. Maximum ledger hpfs response count reached. " << session.display_name();
}
LOG_DEBUG << "Ledger hpfs response rejected. Maximum response count reached. " << session.display_name();
}
}
else
@@ -275,96 +241,18 @@ namespace p2p
*/
int handle_self_message(std::string_view message)
{
const p2pmsg::Container *container;
if (p2pmsg::validate_and_extract_container(&container, message) != 0)
return 0;
const peer_message_info mi = p2pmsg::get_peer_message_info(message);
//Get serialised message content.
const flatbuffers::Vector<uint8_t> *container_content = container->content();
//Accessing message content and size.
const uint8_t *content_ptr = container_content->Data();
const flatbuffers::uoffset_t content_size = container_content->size();
const p2pmsg::Content *content;
if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0)
return 0;
const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message
{
if (handle_proposal_message(container, content) != 0)
LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. self";
}
else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message
{
if (handle_nonunl_proposal_message(container, content) != 0)
LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. self";
}
else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message
handle_npl_message(container, content);
if (mi.type == p2pmsg::P2PMsgContent_ProposalMsg)
handle_proposal_message(p2pmsg::create_proposal_from_msg(mi));
else if (mi.type == p2pmsg::P2PMsgContent_NonUnlProposalMsg)
handle_nonunl_proposal_message(p2pmsg::create_nonunl_proposal_from_msg(mi));
else if (mi.type == p2pmsg::P2PMsgContent_NplMsg)
handle_npl_message(p2pmsg::create_npl_from_msg(mi));
return 0;
}
/**
* Handle proposal message.
* @param container Message container.
* @param content Message content.
* @return returns 0 if proposal is pushed to the list, otherwise -1.
*/
int handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
{
// Check the cap and insert proposal with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.proposals_mutex);
// If max number of proposals reached skip the rest.
if (ctx.collected_msgs.proposals.size() == p2p::PROPOSAL_LIST_CAP)
return -1;
ctx.collected_msgs.proposals.push_back(
p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), *container->last_primary_shard_id()));
return 0;
}
/**
* Handle nonunl proposal message.
* @param container Message container.
* @param content Message content.
* @return returns 0 if nonunl proposal is pushed to the list, otherwise -1.
*/
int handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
{
// Check the cap and insert proposal with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.nonunl_proposals_mutex);
// If max number of nonunl proposals reached skip the rest.
if (ctx.collected_msgs.nonunl_proposals.size() == p2p::NONUNL_PROPOSAL_LIST_CAP)
return -1;
ctx.collected_msgs.nonunl_proposals.push_back(
p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp()));
return 0;
}
void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content)
{
const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message();
npl_message msg;
msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data());
msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey());
msg.last_primary_shard_id.seq_no = container->last_primary_shard_id()->shard_seq_no();
msg.last_primary_shard_id.hash = msg::fbuf::flatbuff_bytes_to_hash(container->last_primary_shard_id()->shard_hash());
if (!consensus::push_npl_message(msg))
{
LOG_DEBUG << "NPL message from self enqueue failure.";
}
}
//peer session on message callback method
int handle_peer_close(const p2p::peer_comm_session &session)
{