Networking fixes related to peer connectivity issues (#384)

This commit is contained in:
Kithmini Gunawardhana
2023-09-23 10:03:45 +05:30
committed by GitHub
parent 6a2384cd03
commit e7e1268a99
23 changed files with 393 additions and 98 deletions

View File

@@ -30,10 +30,11 @@ namespace comm
VIOLATION_MSG_READ = 1,
VIOLATION_READ_ERROR = 2,
VIOLATION_THRESHOLD_EXCEEDED = 3,
VIOLATION_INACTIVITY = 4
VIOLATION_INACTIVITY = 4,
VIOLATION_IRRELEVANT_KNOWN_PEER = 5
};
/**
/**
* Represents an active WebSocket connection
*/
class comm_session
@@ -63,7 +64,7 @@ namespace comm
std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address.
std::string pubkey; // Pubkey in binary format.
const bool is_inbound;
const bool is_ipv4; // Whether the host is ipv4 or ipv6.
const bool is_ipv4; // Whether the host is ipv4 or ipv6.
const std::string host_address; // Connection host address of the remote party.
std::string issued_challenge;
SESSION_STATE state = SESSION_STATE::NONE;
@@ -71,7 +72,7 @@ namespace comm
uint64_t last_activity_timestamp; // Keep track of the last activity timestamp in milliseconds.
comm_session(corebill::tracker &violation_tracker,
std::string_view host_address, hpws::client &&hpws_client, const bool is_ipv4, const bool is_inbound, const uint64_t (&metric_thresholds)[5]);
std::string_view host_address, hpws::client &&hpws_client, const bool is_ipv4, const bool is_inbound, const uint64_t (&metric_thresholds)[5]);
int init();
int process_next_inbound_message(const uint16_t priority);
int send(const std::vector<uint8_t> &message, const uint16_t priority = 2);

View File

@@ -862,10 +862,7 @@ namespace conf
*/
int persist_updated_configs()
{
const bool contains_updated_config = cfg.mesh.peer_discovery.enabled;
bool changes_made = false;
if (!contains_updated_config)
return 0;
// Read the original config into a temp struct.
hp_config temp_cfg;
@@ -875,7 +872,7 @@ namespace conf
// Apply any actual changes to the temp struct.
// Apply known peer list updates.
if (conf::cfg.mesh.peer_discovery.enabled && !cfg.mesh.known_peers.empty())
if (!cfg.mesh.known_peers.empty())
{
temp_cfg.mesh.known_peers = cfg.mesh.known_peers;
changes_made = true;

View File

@@ -791,7 +791,12 @@ namespace consensus
{
std::scoped_lock lock(ctx.contract_ctx_mutex);
if (ctx.contract_ctx)
return ctx.contract_ctx->args.npl_messages.try_enqueue(std::move(npl_msg));
{
if (ctx.contract_ctx->args.lcl_id == npl_msg.lcl_id)
return ctx.contract_ctx->args.npl_messages.try_enqueue(std::move(npl_msg));
else
LOG_DEBUG << "Trying to add irrelevant NPL from " << util::to_hex(npl_msg.pubkey) << " | lcl-seq: " << npl_msg.lcl_id.seq_no;
}
return false;
}

View File

@@ -11,7 +11,6 @@ namespace msg::controlmsg
constexpr const char *FLD_REMOVE = "remove";
// Message types
constexpr const char *MSGTYPE_CONTRACT_END = "contract_end";
constexpr const char *MSGTYPE_PEER_CHANGESET = "peer_changeset";
} // namespace msg::controlmsg

View File

@@ -16,9 +16,9 @@ namespace msg::controlmsg
return jctlmsg::extract_type(extracted_type, jdoc);
}
int controlmsg_parser::extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers) const
int controlmsg_parser::extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers, bool &overwrite) const
{
return jctlmsg::extract_peer_changeset(added_peers, removed_peers, jdoc);
return jctlmsg::extract_peer_changeset(added_peers, removed_peers, overwrite, jdoc);
}
} // namespace msg::controlmsg

View File

@@ -13,7 +13,7 @@ namespace msg::controlmsg
public:
int parse(std::string_view message);
int extract_type(std::string &extracted_type) const;
int extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers) const;
int extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers, bool &overwrite) const;
};
} // namespace msg::controlmsg

View File

@@ -16,7 +16,8 @@ union P2PMsgContent {
PeerListRequestMsg,
PeerListResponseMsg,
HpfsLogRequest,
HpfsLogResponse
HpfsLogResponse,
SuppressMsg
}
table P2PMsg {
@@ -82,6 +83,13 @@ table NplMsg {
// Make sure to update signature generation/verification whenever these fields are changed.
}
enum SuppressReason : byte { ContractIdMismatch = 0 }
table SuppressMsg {
pubkey:[ubyte]; // Sender pubkey.
reason: SuppressReason;
}
//--hpfs requests and responses--//
enum HpfsFsEntryResponseType : byte { Matched = 0, Mismatched = 1, Responded = 2, NotAvailable = 3 }

View File

@@ -298,6 +298,14 @@ namespace msg::fbuf::p2pmsg
return map;
}
const p2p::suppress_message create_suppress_from_msg(const p2p::peer_message_info &mi)
{
const auto &msg = *mi.p2p_msg->content_as_SuppressMsg();
return {
std::string(flatbuf_bytes_to_sv(msg.pubkey())),
(p2p::SUPPRESS_REASON)msg.reason()};
}
void flatbuf_hpfsfshashentries_to_hpfsfshashentries(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>> *fhashes)
{
for (const HpfsFSHashEntry *f_hash : *fhashes)
@@ -602,6 +610,16 @@ namespace msg::fbuf::p2pmsg
create_p2p_msg(builder, P2PMsgContent_PeerListResponseMsg, msg.Union());
}
void create_suppress_msg(flatbuffers::FlatBufferBuilder &builder, const uint8_t reason)
{
const auto msg = CreateSuppressMsg(
builder,
sv_to_flatbuf_bytes(builder, conf::cfg.node.public_key),
(SuppressReason)reason);
create_p2p_msg(builder, P2PMsgContent_SuppressMsg, msg.Union());
}
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>>>
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::submitted_user_input>> &map)
{

View File

@@ -42,6 +42,8 @@ namespace msg::fbuf::p2pmsg
const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi);
const p2p::suppress_message create_suppress_from_msg(const p2p::peer_message_info &mi);
util::sequence_hash flatbuf_seqhash_to_seqhash(const msg::fbuf::p2pmsg::SequenceHash *fbseqhash);
const std::set<std::string> flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector<flatbuffers::Offset<ByteArray>> *fbvec);
@@ -101,6 +103,8 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_peer_list_response(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port);
void create_suppress_msg(flatbuffers::FlatBufferBuilder &builder, const uint8_t reason);
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>>>
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::submitted_user_input>> &map);

View File

@@ -34,6 +34,9 @@ struct ProposalMsgBuilder;
struct NplMsg;
struct NplMsgBuilder;
struct SuppressMsg;
struct SuppressMsgBuilder;
struct HpfsFSHashEntry;
struct HpfsFSHashEntryBuilder;
@@ -100,11 +103,12 @@ enum P2PMsgContent {
P2PMsgContent_PeerListResponseMsg = 11,
P2PMsgContent_HpfsLogRequest = 12,
P2PMsgContent_HpfsLogResponse = 13,
P2PMsgContent_SuppressMsg = 14,
P2PMsgContent_MIN = P2PMsgContent_NONE,
P2PMsgContent_MAX = P2PMsgContent_HpfsLogResponse
P2PMsgContent_MAX = P2PMsgContent_SuppressMsg
};
inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] {
inline const P2PMsgContent (&EnumValuesP2PMsgContent())[15] {
static const P2PMsgContent values[] = {
P2PMsgContent_NONE,
P2PMsgContent_PeerChallengeMsg,
@@ -119,13 +123,14 @@ inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] {
P2PMsgContent_PeerListRequestMsg,
P2PMsgContent_PeerListResponseMsg,
P2PMsgContent_HpfsLogRequest,
P2PMsgContent_HpfsLogResponse
P2PMsgContent_HpfsLogResponse,
P2PMsgContent_SuppressMsg
};
return values;
}
inline const char * const *EnumNamesP2PMsgContent() {
static const char * const names[15] = {
static const char * const names[16] = {
"NONE",
"PeerChallengeMsg",
"PeerChallengeResponseMsg",
@@ -140,13 +145,14 @@ inline const char * const *EnumNamesP2PMsgContent() {
"PeerListResponseMsg",
"HpfsLogRequest",
"HpfsLogResponse",
"SuppressMsg",
nullptr
};
return names;
}
inline const char *EnumNameP2PMsgContent(P2PMsgContent e) {
if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_HpfsLogResponse)) return "";
if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_SuppressMsg)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesP2PMsgContent()[index];
}
@@ -207,9 +213,40 @@ template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::HpfsLogResponse> {
static const P2PMsgContent enum_value = P2PMsgContent_HpfsLogResponse;
};
template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::SuppressMsg> {
static const P2PMsgContent enum_value = P2PMsgContent_SuppressMsg;
};
bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type);
bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
enum SuppressReason {
SuppressReason_ContractIdMismatch = 0,
SuppressReason_MIN = SuppressReason_ContractIdMismatch,
SuppressReason_MAX = SuppressReason_ContractIdMismatch
};
inline const SuppressReason (&EnumValuesSuppressReason())[1] {
static const SuppressReason values[] = {
SuppressReason_ContractIdMismatch
};
return values;
}
inline const char * const *EnumNamesSuppressReason() {
static const char * const names[2] = {
"ContractIdMismatch",
nullptr
};
return names;
}
inline const char *EnumNameSuppressReason(SuppressReason e) {
if (flatbuffers::IsOutRange(e, SuppressReason_ContractIdMismatch, SuppressReason_ContractIdMismatch)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesSuppressReason()[index];
}
enum HpfsFsEntryResponseType {
HpfsFsEntryResponseType_Matched = 0,
HpfsFsEntryResponseType_Mismatched = 1,
@@ -415,6 +452,9 @@ struct P2PMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
const msg::fbuf::p2pmsg::HpfsLogResponse *content_as_HpfsLogResponse() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_HpfsLogResponse ? static_cast<const msg::fbuf::p2pmsg::HpfsLogResponse *>(content()) : nullptr;
}
const msg::fbuf::p2pmsg::SuppressMsg *content_as_SuppressMsg() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_SuppressMsg ? static_cast<const msg::fbuf::p2pmsg::SuppressMsg *>(content()) : nullptr;
}
void *mutable_content() {
return GetPointer<void *>(VT_CONTENT);
}
@@ -482,6 +522,10 @@ template<> inline const msg::fbuf::p2pmsg::HpfsLogResponse *P2PMsg::content_as<m
return content_as_HpfsLogResponse();
}
template<> inline const msg::fbuf::p2pmsg::SuppressMsg *P2PMsg::content_as<msg::fbuf::p2pmsg::SuppressMsg>() const {
return content_as_SuppressMsg();
}
struct P2PMsgBuilder {
typedef P2PMsg Table;
flatbuffers::FlatBufferBuilder &fbb_;
@@ -1338,6 +1382,76 @@ inline flatbuffers::Offset<NplMsg> CreateNplMsgDirect(
lcl_id);
}
struct SuppressMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef SuppressMsgBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_PUBKEY = 4,
VT_REASON = 6
};
const flatbuffers::Vector<uint8_t> *pubkey() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_PUBKEY);
}
flatbuffers::Vector<uint8_t> *mutable_pubkey() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_PUBKEY);
}
msg::fbuf::p2pmsg::SuppressReason reason() const {
return static_cast<msg::fbuf::p2pmsg::SuppressReason>(GetField<int8_t>(VT_REASON, 0));
}
bool mutate_reason(msg::fbuf::p2pmsg::SuppressReason _reason) {
return SetField<int8_t>(VT_REASON, static_cast<int8_t>(_reason), 0);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_PUBKEY) &&
verifier.VerifyVector(pubkey()) &&
VerifyField<int8_t>(verifier, VT_REASON) &&
verifier.EndTable();
}
};
struct SuppressMsgBuilder {
typedef SuppressMsg Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_pubkey(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> pubkey) {
fbb_.AddOffset(SuppressMsg::VT_PUBKEY, pubkey);
}
void add_reason(msg::fbuf::p2pmsg::SuppressReason reason) {
fbb_.AddElement<int8_t>(SuppressMsg::VT_REASON, static_cast<int8_t>(reason), 0);
}
explicit SuppressMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
SuppressMsgBuilder &operator=(const SuppressMsgBuilder &);
flatbuffers::Offset<SuppressMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<SuppressMsg>(end);
return o;
}
};
inline flatbuffers::Offset<SuppressMsg> CreateSuppressMsg(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> pubkey = 0,
msg::fbuf::p2pmsg::SuppressReason reason = msg::fbuf::p2pmsg::SuppressReason_ContractIdMismatch) {
SuppressMsgBuilder builder_(_fbb);
builder_.add_pubkey(pubkey);
builder_.add_reason(reason);
return builder_.Finish();
}
inline flatbuffers::Offset<SuppressMsg> CreateSuppressMsgDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<uint8_t> *pubkey = nullptr,
msg::fbuf::p2pmsg::SuppressReason reason = msg::fbuf::p2pmsg::SuppressReason_ContractIdMismatch) {
auto pubkey__ = pubkey ? _fbb.CreateVector<uint8_t>(*pubkey) : 0;
return msg::fbuf::p2pmsg::CreateSuppressMsg(
_fbb,
pubkey__,
reason);
}
struct HpfsFSHashEntry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsFSHashEntryBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
@@ -2692,6 +2806,10 @@ inline bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsLogResponse *>(obj);
return verifier.VerifyTable(ptr);
}
case P2PMsgContent_SuppressMsg: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::SuppressMsg *>(obj);
return verifier.VerifyTable(ptr);
}
default: return true;
}
}

View File

@@ -60,45 +60,73 @@ namespace msg::controlmsg::json
* {
* 'type': 'peer_changeset',
* 'add': ['<ip1>','<ip2>', ...],
* 'remove': ['<ip1>','<ip2>', ...]
* 'remove': ['<ip1>','<ip2>', ...] | "*"
* }
*/
int extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers, const jsoncons::json &d)
int extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers, bool &overwrite, const jsoncons::json &d)
{
if (extract_peers_from_array(added_peers, msg::controlmsg::FLD_ADD, d) == -1 ||
extract_peers_from_array(removed_peers, msg::controlmsg::FLD_REMOVE, d) == -1)
overwrite = false;
if (!d.contains(msg::controlmsg::FLD_ADD) ||
extract_peers_from_array(added_peers, msg::controlmsg::FLD_ADD, d) == -1)
{
return -1;
}
if (d.contains(msg::controlmsg::FLD_REMOVE))
{
// Remove field can be string "*" or an array of strings.
// "*" means peers should be overwritten. (remove all peers and add the specified peers)
if (d[msg::controlmsg::FLD_REMOVE].is_string())
{
const std::string remove_str = d[msg::controlmsg::FLD_REMOVE].as<std::string>();
if (remove_str == "*")
{
overwrite = true;
}
else
{
return -1;
}
}
else
{
if (extract_peers_from_array(removed_peers, msg::controlmsg::FLD_REMOVE, d) == -1)
return -1;
}
}
else
{
return -1;
}
return 0;
}
int extract_peers_from_array(std::vector<p2p::peer_properties> &peers, std::string_view field, const jsoncons::json &d)
{
if (d.contains(field))
if (!d[field].is_array())
{
if (!d[field].is_array())
LOG_ERROR << "Extract peers: Invalid array field: " << field;
return -1;
}
for (auto &peer : d[field].array_range())
{
if (!peer.is<std::string>())
{
LOG_ERROR << "Extract peers: Invalid array field: " << field;
LOG_ERROR << "Extract peers: Invalid peer entry in field: " << field;
return -1;
}
for (auto &peer : d[field].array_range())
conf::peer_ip_port ipp;
if (ipp.from_string(peer.as<std::string_view>()) == -1)
{
if (!peer.is<std::string>())
{
LOG_ERROR << "Extract peers: Invalid peer entry in field: " << field;
return -1;
}
conf::peer_ip_port ipp;
if (ipp.from_string(peer.as<std::string_view>()) == -1)
{
LOG_ERROR << "Extract peers: Invalid peer format in field: " << field;
return -1;
}
peers.push_back(p2p::peer_properties{ipp, -1, 0, 0});
LOG_ERROR << "Extract peers: Invalid peer format in field: " << field;
return -1;
}
peers.push_back(p2p::peer_properties{ipp, -1, 0, 0});
}
return 0;

View File

@@ -13,7 +13,7 @@ namespace msg::controlmsg::json
int extract_type(std::string &extracted_type, const jsoncons::json &d);
int extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers, const jsoncons::json &d);
int extract_peer_changeset(std::vector<p2p::peer_properties> &added_peers, std::vector<p2p::peer_properties> &removed_peers, bool &overwrite, const jsoncons::json &d);
int extract_peers_from_array(std::vector<p2p::peer_properties> &peers, std::string_view field, const jsoncons::json &d);

View File

@@ -37,7 +37,7 @@ namespace p2p
metric_thresholds[3] = conf::cfg.mesh.max_bad_msgs_per_min;
metric_thresholds[4] = conf::cfg.mesh.idle_timeout;
//Entry point for p2p which will start peer connections to other nodes
// Entry point for p2p which will start peer connections to other nodes
if (start_peer_connections() == -1)
return -1;
@@ -52,9 +52,11 @@ namespace p2p
{
if (init_success)
{
// If peer discovery was enabled, update latest known peers information to config
// before the peer server is stopped. (config will permanently save it to disk upon exit)
if (conf::cfg.mesh.peer_discovery.enabled)
/**
* Update latest known peers information to config
* before the peer server is stopped. (config will permanently save it to disk upon exit).
*/
{
std::scoped_lock lock(ctx.server->req_known_remotes_mutex);
const std::vector<peer_properties> &peers = ctx.server->req_known_remotes;
@@ -223,7 +225,7 @@ namespace p2p
if (send_to_self)
self::send(message);
//Broadcast while locking the peer_connections.
// Broadcast while locking the peer_connections.
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
for (const auto &[k, session] : ctx.peer_connections)
@@ -244,7 +246,7 @@ namespace p2p
* @param msg_type The message type.
* @param originated_on The originated epoch of the received message.
* @return Returns true if the message is qualified for forwarding to peers. False otherwise.
*/
*/
bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const enum msg::fbuf::p2pmsg::P2PMsgContent msg_type, const uint64_t originated_on)
{
// Checking whether the message forwarding is enabled.
@@ -291,7 +293,7 @@ namespace p2p
*/
void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey, const bool full_history_only)
{
//Send while locking the peer_connections.
// Send while locking the peer_connections.
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
const size_t connected_peers = ctx.peer_connections.size();
@@ -333,14 +335,14 @@ namespace p2p
session = it->second;
}
//send message to selected peer.
// send message to selected peer.
session->send(msg::fbuf::builder_to_string_view(fbuf));
target_pubkey = session->uniqueid;
}
/**
* Handle proposal message. This is called from peer and self message handlers.
*/
*/
void handle_proposal_message(const p2p::proposal &p)
{
// Check the cap and insert proposal with lock.
@@ -355,7 +357,7 @@ namespace p2p
/**
* Handle nonunl proposal message. This is called from peer and self message handlers.
*/
*/
void handle_nonunl_proposal_message(const p2p::nonunl_proposal &nup)
{
// Check the cap and insert proposal with lock.
@@ -379,6 +381,26 @@ namespace p2p
}
}
/**
* Handle a suppress message. This message is issued by a peer to suppress connection.
*/
void handle_suppress_message(const p2p::suppress_message &suppression, peer_comm_session *session)
{
if (suppression.reason == SUPPRESS_REASON::CONTRACT_MISMATCH)
{
LOG_DEBUG << "Peer " << session->known_ipport->to_string() << " suppressed us. Reason (" << SUPPRESS_REASON::CONTRACT_MISMATCH << ").";
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p)
{ return p.ip_port == session->known_ipport; });
if (itr != ctx.server->req_known_remotes.end())
{
LOG_DEBUG << "Marking to omit peer " << session->known_ipport->to_string();
itr->has_suppressed_us = true;
}
}
else
LOG_DEBUG << "Invalid suppressing reason.";
}
/**
* Sends the peer requirement to the given peer session. If a session is not given, broadcast to all the connected peers.
* @param need_consensus_msg_forwarding True if the number of connections are below the threshold value.
@@ -465,22 +487,41 @@ namespace p2p
}
/**
* Merging the response peer list with the own known peer list.
* @param merge_peers Peers that must be merged with existing known peers.
* @param remove_peers Peers that must be removed from existing known peers.
* @param from The session that sent us the peer list.
* Update the known peer list with the specified modifications.
* @param mode Update applying priority.
* @param add_peers Peers that must be added to existing known peers.
* @param remove_peers Peers that must be removed from existing known peers. Ignored if mode is OVERWRITE.
* @param from The session that sent us the peer list. Optional.
*/
void merge_peer_list(const std::string &caller, const std::vector<peer_properties> *merge_peers, const std::vector<peer_properties> *remove_peers, const p2p::peer_comm_session *from)
void update_peer_list(const p2p::PEERS_UPDATE_MODE mode, const std::vector<peer_properties> *add_peers,
const std::vector<peer_properties> *remove_peers, const p2p::peer_comm_session *from)
{
std::scoped_lock<std::mutex> lock(ctx.server->req_known_remotes_mutex);
if (merge_peers)
if (mode == p2p::PEERS_UPDATE_MODE::OVERWRITE)
{
for (const peer_properties &peer : *merge_peers)
for (const peer_properties &kp : ctx.server->req_known_remotes)
{
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
const auto itr = std::find_if(ctx.peer_connections.begin(), ctx.peer_connections.end(), [&](const std::pair<std::string, peer_comm_session *> &pc)
{ return pc.second->known_ipport == kp.ip_port; });
if (itr != ctx.peer_connections.end())
{
LOG_DEBUG << "Marking to close the session of removing peer conn:" << kp.ip_port.to_string();
peer_comm_session &session_to_close = *itr->second;
session_to_close.mark_for_closure(comm::CLOSE_VIOLATION::VIOLATION_IRRELEVANT_KNOWN_PEER);
}
}
ctx.server->req_known_remotes.clear();
}
if (add_peers)
{
for (const peer_properties &peer : *add_peers)
{
if (peer.ip_port.host_address.empty())
{
LOG_DEBUG << caller << " : Skip received peer with blank host address " << peer.ip_port.to_string() << " from " << peer.ip_port.to_string();
LOG_DEBUG << "Skip received peer with blank host address " << peer.ip_port.to_string() << " from " << peer.ip_port.to_string();
continue;
}
@@ -498,10 +539,17 @@ namespace p2p
continue;
}
if (ctx.server->dead_known_peers.exists(peer.ip_port.to_string()))
if (mode == p2p::PEERS_UPDATE_MODE::MERGE)
{
LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Peer was removed prior due to unavailability.";
continue;
if (ctx.server->dead_known_peers.exists(peer.ip_port.to_string()))
{
LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Peer was removed prior due to unavailability.";
continue;
}
}
else
{
ctx.server->dead_known_peers.erase(peer.ip_port.to_string());
}
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p)
@@ -531,7 +579,7 @@ namespace p2p
}
}
if (remove_peers)
if (mode != p2p::PEERS_UPDATE_MODE::OVERWRITE && remove_peers)
{
for (const peer_properties &peer : *remove_peers)
{
@@ -541,13 +589,23 @@ namespace p2p
if (itr != ctx.server->req_known_remotes.end())
{
LOG_DEBUG << "Removing " << peer.ip_port.to_string() << " from known peer list.";
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
const auto conn_itr = std::find_if(ctx.peer_connections.begin(), ctx.peer_connections.end(), [&](const std::pair<std::string, peer_comm_session *> &pc)
{ return pc.second->known_ipport == itr->ip_port; });
if (conn_itr != ctx.peer_connections.end())
{
LOG_DEBUG << "Marking to close the session of removing peer conn:" << peer.ip_port.to_string();
peer_comm_session &session_to_close = *conn_itr->second;
session_to_close.mark_for_closure(comm::CLOSE_VIOLATION::VIOLATION_IRRELEVANT_KNOWN_PEER);
}
ctx.server->req_known_remotes.erase(itr);
}
}
}
// Sorting the known remote list according to the weight value after merging the peer list.
if (merge_peers || remove_peers)
if (add_peers || remove_peers)
sort_known_remotes();
}
@@ -593,7 +651,7 @@ namespace p2p
/**
* Update the peer trusted status on unl list updates.
*/
*/
void update_unl_connections()
{
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);

View File

@@ -19,7 +19,14 @@ namespace p2p
constexpr uint16_t HPFS_RES_LIST_CAP = 255; // Maximum state response count.
constexpr uint16_t LOG_RECORD_REQ_LIST_CAP = 255; // Maximum log record request count.
constexpr uint16_t LOG_RECORD_RES_LIST_CAP = 255; // Maximum log record response count.
constexpr uint16_t PEER_LIST_CAP = 128; // Maximum peer count.
constexpr uint16_t PEER_LIST_CAP = 256; // Maximum peer count.
enum PEERS_UPDATE_MODE
{
MERGE = 0, // Gracefully merge new changes to our known peers.
FORCE = 1, // Update our known peers while giving priority to specified changes.
OVERWRITE = 2 // Completely overwrite our known peers with the given set of peers.
};
// Struct to represent information about a peer.
// Initially available capacity is set to -1 and timestamp is set to 0.
@@ -31,6 +38,7 @@ namespace p2p
uint64_t timestamp = 0;
int64_t weight = 0;
int32_t failed_attempts = 0;
bool has_suppressed_us = false;
};
struct proposal
@@ -117,6 +125,18 @@ namespace p2p
NOT_AVAILABLE = 3 // The entry does not exist on responder side. Requester must delete this on his side.
};
enum SUPPRESS_REASON
{
CONTRACT_MISMATCH = 0 // Suppress due to contract mismatch.
};
// Represents a peer suppression.
struct suppress_message
{
std::string pubkey; // Peer binary pubkey.
SUPPRESS_REASON reason;
};
// Represents hpfs file system entry.
struct hpfs_fs_hash_entry
{
@@ -226,6 +246,8 @@ namespace p2p
void handle_npl_message(const p2p::npl_message &npl);
void handle_suppress_message(const p2p::suppress_message &suppression, peer_comm_session *session);
bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const enum msg::fbuf::p2pmsg::P2PMsgContent msg_type, const uint64_t originated_on);
void send_peer_requirement_announcement(const bool need_consensus_msg_forwarding, peer_comm_session *session = NULL);
@@ -238,7 +260,8 @@ namespace p2p
void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t &timestamp);
void merge_peer_list(const std::string &caller, const std::vector<peer_properties> *merge_peers, const std::vector<peer_properties> *remove_peers, const p2p::peer_comm_session *from = NULL);
void update_peer_list(const p2p::PEERS_UPDATE_MODE mode, const std::vector<peer_properties> *add_peers,
const std::vector<peer_properties> *remove_peers, const p2p::peer_comm_session *from = NULL);
void sort_known_remotes();

View File

@@ -153,8 +153,8 @@ namespace p2p
if (conf::cfg.mesh.max_connections != 0 && known_remote_count == conf::cfg.mesh.max_connections)
break;
// Continue if the peer has no free slots.
if (peer.available_capacity == 0)
// Continue if the peer has no free slots or the peer has issued a suppression.
if (peer.available_capacity == 0 || peer.has_suppressed_us)
continue;
if (peer.ip_port.host_address.empty())

View File

@@ -106,12 +106,28 @@ namespace p2p
if (mi.type == p2pmsg::P2PMsgContent_PeerChallengeMsg)
{
const p2p::peer_challenge chall = p2pmsg::create_peer_challenge_from_msg(mi);
flatbuffers::FlatBufferBuilder fbuf;
// Check whether contract ids match.
if (chall.contract_id != conf::cfg.contract.id)
{
LOG_ERROR << "Contract id mismatch. Dropping connection " << display_name();
return -1;
if (is_inbound)
{
// Sending the a graceful rejection.
p2pmsg::create_suppress_msg(fbuf, p2p::SUPPRESS_REASON::CONTRACT_MISMATCH);
/**
* Returning with suppression message.
* If we do return -1 here, the session will be closed before the sending the message.
*/
return send(msg::fbuf::builder_to_string_view(fbuf));
}
// Returning 0, but the session will be removed when corresponding peer suppression is received.
return 0;
}
// Remember the time config reported by this peer.
@@ -121,7 +137,6 @@ namespace p2p
is_full_history = chall.is_full_history;
// Sending the challenge response to the sender.
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge);
return send(msg::fbuf::builder_to_string_view(fbuf));
}
@@ -131,6 +146,14 @@ namespace p2p
if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_ISSUED)
return p2p::resolve_peer_challenge(*this, p2pmsg::create_peer_challenge_response_from_msg(mi));
}
else if (mi.type == p2pmsg::P2PMsgContent_SuppressMsg)
{
LOG_DEBUG << "Received suppress message. " << display_name();
handle_suppress_message(p2pmsg::create_suppress_from_msg(mi), this);
// Returning -1 to close the session as this should not be progressed.
return -1;
}
if (challenge_status != comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED)
{
@@ -141,7 +164,7 @@ namespace p2p
if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg)
{
const std::vector<p2p::peer_properties> merge_peers = p2pmsg::create_peer_list_response_from_msg(mi);
p2p::merge_peer_list("Peer_Discovery", &merge_peers, NULL, this);
p2p::update_peer_list(p2p::PEERS_UPDATE_MODE::MERGE, &merge_peers, NULL, this);
}
else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg)
{

View File

@@ -478,13 +478,13 @@ namespace sc
break;
}
// Atempt to read messages from contract (regardless of contract terminated or not).
// Attempt to read messages from contract (regardless of contract terminated or not).
const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]);
const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, &out_fds[npl_fd_idx]);
const int user_read_res = read_contract_fdmap_outputs(ctx.user_fds, out_fds, ctx.args.userbufs);
messages_read = (control_read_res + npl_read_res + user_read_res) > 0;
if (ctx.termination_signaled || ctx.contract_pid == 0)
if (ctx.contract_pid == 0)
{
// If no messages were read after contract finished execution, exit the polling loop.
// Otherwise keep running the loop becaue there might be further messages to read.
@@ -523,9 +523,8 @@ namespace sc
// Check if the contract has exited voluntarily.
if (check_contract_exited(ctx, false) == 0)
{
// Issue kill signal if the contract hasn't indicated the termination control message.
if (!ctx.termination_signaled)
kill(ctx.contract_pid, SIGTERM);
// Issue kill signal to kill the contract process.
kill(ctx.contract_pid, SIGKILL);
check_contract_exited(ctx, true); // Blocking wait until exit.
}
}
@@ -620,6 +619,9 @@ namespace sc
{
if (write_iosocket_seq_packet(ctx.control_fds, control_msg) == -1)
{
// Consider that no write operation occurred; assume that contract termination might have caused these errors.
if (errno == EPIPE || errno == ECONNRESET)
return 0;
LOG_ERROR << "Error writing HP inputs to SC";
return -1;
}
@@ -658,6 +660,9 @@ namespace sc
// Writing the public key to the contract's fd (Skip first byte for key type prefix).
if (write(writefd, pubkeyhex.data(), pubkeyhex.size()) == -1)
{
// Consider that no write operation occurred; assume that contract termination might have caused these errors. if (errno == EPIPE || errno == ECONNRESET)
if (errno == EPIPE || errno == ECONNRESET)
return 0;
LOG_ERROR << errno << ": Error writing npl message pubkey.";
return -1;
}
@@ -665,6 +670,9 @@ namespace sc
// Writing the message to the contract's fd.
if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1)
{
// Consider that no write operation occurred; assume that contract termination might have caused these errors.
if (errno == EPIPE || errno == ECONNRESET)
return 0;
LOG_ERROR << errno << ": Error writing npl message data.";
return -1;
}
@@ -676,7 +684,6 @@ namespace sc
LOG_DEBUG << "NPL message dropped due to last primary shard mismatch.";
}
}
return 0;
}
@@ -1030,7 +1037,7 @@ namespace sc
* @param is_stream_socket Indicates whether socket is steam socket or not.
* @param pfd The pollfd struct containing poll status.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
* @return Returns -2 on neutral read, -1 on error, Otherwise no. of bytes read.
*/
int read_iosocket(const bool is_stream_socket, const pollfd pfd, std::string &output)
{
@@ -1045,7 +1052,11 @@ namespace sc
if (res == -1)
{
LOG_ERROR << errno << ": Error reading from contract socket. stream:" << is_stream_socket;
// Assuming that EPIPE or ECONNRESET resulted from contract termination, consider this as a neutral read.
if (errno == EPIPE || errno == ECONNRESET)
return -2;
else
LOG_ERROR << errno << ": Error reading from contract socket. stream:" << is_stream_socket;
}
return res;
@@ -1139,16 +1150,21 @@ namespace sc
if (parser.parse(msg) == -1 || parser.extract_type(type) == -1)
return;
if (type == msg::controlmsg::MSGTYPE_CONTRACT_END)
if (type == msg::controlmsg::MSGTYPE_PEER_CHANGESET)
{
ctx.termination_signaled = true;
}
else if (type == msg::controlmsg::MSGTYPE_PEER_CHANGESET)
{
std::vector<p2p::peer_properties> added_peers;
std::vector<p2p::peer_properties> removed_peers;
if (parser.extract_peer_changeset(added_peers, removed_peers) != -1)
p2p::merge_peer_list("Control_MSG", &added_peers, &removed_peers);
if (!conf::cfg.mesh.peer_discovery.enabled)
{
std::vector<p2p::peer_properties> added_peers;
std::vector<p2p::peer_properties> removed_peers;
bool overwrite = false;
if (parser.extract_peer_changeset(added_peers, removed_peers, overwrite) != -1)
{
const p2p::PEERS_UPDATE_MODE update_mode = (overwrite ? p2p::PEERS_UPDATE_MODE::OVERWRITE : p2p::PEERS_UPDATE_MODE::FORCE);
p2p::update_peer_list(update_mode, &added_peers, &removed_peers);
}
}
else
LOG_WARNING << "Not allowed to update peers via control msgs, as peer discovery is enabled.";
}
}

View File

@@ -134,9 +134,6 @@ namespace sc
std::string stdout_file;
std::string stderr_file;
// Indicates that the contract has sent termination control message.
bool termination_signaled = false;
// Indicates whether the contract exited normally without any errors.
bool exit_success = false;

View File

@@ -6,7 +6,7 @@
namespace version
{
// HotPocket version. Written to new configs and p2p/user messages.
constexpr const char *HP_VERSION = "0.6.3";
constexpr const char *HP_VERSION = "0.6.4";
// Minimum compatible config version (this will be used to validate configs).
constexpr const char *MIN_CONFIG_VERSION = "0.6.3";

View File

@@ -14,7 +14,7 @@ fi
clusterloc=$(pwd)/hpcluster
n=$1
hpversion=0.6.3
hpversion=0.6.4
let pubport=8080+$n
let peerport=22860+$n

View File

@@ -5,7 +5,7 @@ WINDOWSIZE=60 # size of window in seconds to examine for successful consensus ro
PIPE=concon.pipe
clusterloc=$(pwd)/hpcluster
n=1
hpversion=0.6.3
hpversion=0.6.4
let pubport=8080+$n
while true; do

View File

@@ -3,7 +3,7 @@
clusterloc=$(pwd)/hpcluster
n=1
hpversion=0.6.3
hpversion=0.6.4
let pubport=8080+$n
while true; do
CONSENSUS="0"

View File

@@ -15,7 +15,7 @@ fi
dir=$(realpath $1)
dirname=$(basename $dir)
n=$1
hpversion=0.6.3
hpversion=0.6.4
let pubport=8080