Introduce peer requirement announcement. (#156)

This commit is contained in:
Savinda Senevirathne
2020-11-20 16:01:46 +05:30
committed by GitHub
parent beedfb2709
commit fe24ef2388
15 changed files with 144 additions and 70 deletions

View File

@@ -305,6 +305,15 @@ namespace comm
}
}
/**
* Mark the connection as a verified connection.
*/
void comm_session::mark_as_verified()
{
challenge_status = CHALLENGE_STATUS::CHALLENGE_VERIFIED;
handle_on_verified();
}
int comm_session::handle_connect()
{
return 0;
@@ -319,4 +328,8 @@ namespace comm
{
}
void comm_session::handle_on_verified()
{
}
} // namespace comm

View File

@@ -44,6 +44,7 @@ namespace comm
virtual int handle_connect();
virtual int handle_message(std::string_view msg);
virtual void handle_close();
virtual void handle_on_verified();
public:
std::string uniqueid;
@@ -65,6 +66,7 @@ namespace comm
void check_last_activity_rules();
void mark_for_closure();
void close(const bool invoke_handler = true);
void mark_as_verified();
virtual const std::string display_name();
void set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms);

View File

@@ -34,7 +34,7 @@ union Message {
State_Response_Message,
History_Request_Message,
History_Response_Message,
Connected_Status_Announcement_Message,
Peer_Requirement_Announcement_Message,
Peer_List_Request_Message,
Peer_List_Response_Message,
Available_Capacity_Announcement_Message
@@ -124,8 +124,8 @@ table State_FS_Hash_Entry{
hash: [ubyte];
}
table Connected_Status_Announcement_Message{
is_weakly_connected: bool;
table Peer_Requirement_Announcement_Message{
need_consensus_msg_forwarding: bool;
}
table Available_Capacity_Announcement_Message{

View File

@@ -66,8 +66,8 @@ struct Block_ResponseBuilder;
struct State_FS_Hash_Entry;
struct State_FS_Hash_EntryBuilder;
struct Connected_Status_Announcement_Message;
struct Connected_Status_Announcement_MessageBuilder;
struct Peer_Requirement_Announcement_Message;
struct Peer_Requirement_Announcement_MessageBuilder;
struct Available_Capacity_Announcement_Message;
struct Available_Capacity_Announcement_MessageBuilder;
@@ -92,7 +92,7 @@ enum Message {
Message_State_Response_Message = 7,
Message_History_Request_Message = 8,
Message_History_Response_Message = 9,
Message_Connected_Status_Announcement_Message = 10,
Message_Peer_Requirement_Announcement_Message = 10,
Message_Peer_List_Request_Message = 11,
Message_Peer_List_Response_Message = 12,
Message_Available_Capacity_Announcement_Message = 13,
@@ -112,7 +112,7 @@ inline const Message (&EnumValuesMessage())[14] {
Message_State_Response_Message,
Message_History_Request_Message,
Message_History_Response_Message,
Message_Connected_Status_Announcement_Message,
Message_Peer_Requirement_Announcement_Message,
Message_Peer_List_Request_Message,
Message_Peer_List_Response_Message,
Message_Available_Capacity_Announcement_Message
@@ -132,7 +132,7 @@ inline const char * const *EnumNamesMessage() {
"State_Response_Message",
"History_Request_Message",
"History_Response_Message",
"Connected_Status_Announcement_Message",
"Peer_Requirement_Announcement_Message",
"Peer_List_Request_Message",
"Peer_List_Response_Message",
"Available_Capacity_Announcement_Message",
@@ -187,8 +187,8 @@ template<> struct MessageTraits<msg::fbuf::p2pmsg::History_Response_Message> {
static const Message enum_value = Message_History_Response_Message;
};
template<> struct MessageTraits<msg::fbuf::p2pmsg::Connected_Status_Announcement_Message> {
static const Message enum_value = Message_Connected_Status_Announcement_Message;
template<> struct MessageTraits<msg::fbuf::p2pmsg::Peer_Requirement_Announcement_Message> {
static const Message enum_value = Message_Peer_Requirement_Announcement_Message;
};
template<> struct MessageTraits<msg::fbuf::p2pmsg::Peer_List_Request_Message> {
@@ -617,8 +617,8 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
const msg::fbuf::p2pmsg::History_Response_Message *message_as_History_Response_Message() const {
return message_type() == msg::fbuf::p2pmsg::Message_History_Response_Message ? static_cast<const msg::fbuf::p2pmsg::History_Response_Message *>(message()) : nullptr;
}
const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *message_as_Connected_Status_Announcement_Message() const {
return message_type() == msg::fbuf::p2pmsg::Message_Connected_Status_Announcement_Message ? static_cast<const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *>(message()) : nullptr;
const msg::fbuf::p2pmsg::Peer_Requirement_Announcement_Message *message_as_Peer_Requirement_Announcement_Message() const {
return message_type() == msg::fbuf::p2pmsg::Message_Peer_Requirement_Announcement_Message ? static_cast<const msg::fbuf::p2pmsg::Peer_Requirement_Announcement_Message *>(message()) : nullptr;
}
const msg::fbuf::p2pmsg::Peer_List_Request_Message *message_as_Peer_List_Request_Message() const {
return message_type() == msg::fbuf::p2pmsg::Message_Peer_List_Request_Message ? static_cast<const msg::fbuf::p2pmsg::Peer_List_Request_Message *>(message()) : nullptr;
@@ -677,8 +677,8 @@ template<> inline const msg::fbuf::p2pmsg::History_Response_Message *Content::me
return message_as_History_Response_Message();
}
template<> inline const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *Content::message_as<msg::fbuf::p2pmsg::Connected_Status_Announcement_Message>() const {
return message_as_Connected_Status_Announcement_Message();
template<> inline const msg::fbuf::p2pmsg::Peer_Requirement_Announcement_Message *Content::message_as<msg::fbuf::p2pmsg::Peer_Requirement_Announcement_Message>() const {
return message_as_Peer_Requirement_Announcement_Message();
}
template<> inline const msg::fbuf::p2pmsg::Peer_List_Request_Message *Content::message_as<msg::fbuf::p2pmsg::Peer_List_Request_Message>() const {
@@ -1759,47 +1759,47 @@ inline flatbuffers::Offset<State_FS_Hash_Entry> CreateState_FS_Hash_EntryDirect(
hash__);
}
struct Connected_Status_Announcement_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef Connected_Status_Announcement_MessageBuilder Builder;
struct Peer_Requirement_Announcement_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef Peer_Requirement_Announcement_MessageBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_IS_WEAKLY_CONNECTED = 4
VT_NEED_CONSENSUS_MSG_FORWARDING = 4
};
bool is_weakly_connected() const {
return GetField<uint8_t>(VT_IS_WEAKLY_CONNECTED, 0) != 0;
bool need_consensus_msg_forwarding() const {
return GetField<uint8_t>(VT_NEED_CONSENSUS_MSG_FORWARDING, 0) != 0;
}
bool mutate_is_weakly_connected(bool _is_weakly_connected) {
return SetField<uint8_t>(VT_IS_WEAKLY_CONNECTED, static_cast<uint8_t>(_is_weakly_connected), 0);
bool mutate_need_consensus_msg_forwarding(bool _need_consensus_msg_forwarding) {
return SetField<uint8_t>(VT_NEED_CONSENSUS_MSG_FORWARDING, static_cast<uint8_t>(_need_consensus_msg_forwarding), 0);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint8_t>(verifier, VT_IS_WEAKLY_CONNECTED) &&
VerifyField<uint8_t>(verifier, VT_NEED_CONSENSUS_MSG_FORWARDING) &&
verifier.EndTable();
}
};
struct Connected_Status_Announcement_MessageBuilder {
typedef Connected_Status_Announcement_Message Table;
struct Peer_Requirement_Announcement_MessageBuilder {
typedef Peer_Requirement_Announcement_Message Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_is_weakly_connected(bool is_weakly_connected) {
fbb_.AddElement<uint8_t>(Connected_Status_Announcement_Message::VT_IS_WEAKLY_CONNECTED, static_cast<uint8_t>(is_weakly_connected), 0);
void add_need_consensus_msg_forwarding(bool need_consensus_msg_forwarding) {
fbb_.AddElement<uint8_t>(Peer_Requirement_Announcement_Message::VT_NEED_CONSENSUS_MSG_FORWARDING, static_cast<uint8_t>(need_consensus_msg_forwarding), 0);
}
explicit Connected_Status_Announcement_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
explicit Peer_Requirement_Announcement_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
flatbuffers::Offset<Connected_Status_Announcement_Message> Finish() {
flatbuffers::Offset<Peer_Requirement_Announcement_Message> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<Connected_Status_Announcement_Message>(end);
auto o = flatbuffers::Offset<Peer_Requirement_Announcement_Message>(end);
return o;
}
};
inline flatbuffers::Offset<Connected_Status_Announcement_Message> CreateConnected_Status_Announcement_Message(
inline flatbuffers::Offset<Peer_Requirement_Announcement_Message> CreatePeer_Requirement_Announcement_Message(
flatbuffers::FlatBufferBuilder &_fbb,
bool is_weakly_connected = false) {
Connected_Status_Announcement_MessageBuilder builder_(_fbb);
builder_.add_is_weakly_connected(is_weakly_connected);
bool need_consensus_msg_forwarding = false) {
Peer_Requirement_Announcement_MessageBuilder builder_(_fbb);
builder_.add_need_consensus_msg_forwarding(need_consensus_msg_forwarding);
return builder_.Finish();
}
@@ -2084,8 +2084,8 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::History_Response_Message *>(obj);
return verifier.VerifyTable(ptr);
}
case Message_Connected_Status_Announcement_Message: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *>(obj);
case Message_Peer_Requirement_Announcement_Message: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::Peer_Requirement_Announcement_Message *>(obj);
return verifier.VerifyTable(ptr);
}
case Message_Peer_List_Request_Message: {

View File

@@ -581,19 +581,19 @@ namespace msg::fbuf::p2pmsg
/**
* Create connected status announcement message.
* @param container_builder Flatbuffer builder for the container message.
* @param is_weakly_connected True if number of connections are below threshold and false otherwise.
* @param need_consensus_msg_forwarding True if number of connections are below threshold and false otherwise.
* @param lcl Lcl value to be passed in the container message.
*/
void create_msg_from_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl)
void create_msg_from_peer_requirement_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool need_consensus_msg_forwarding, std::string_view lcl)
{
flatbuffers::FlatBufferBuilder builder(1024);
const flatbuffers::Offset<Connected_Status_Announcement_Message> announcement =
CreateConnected_Status_Announcement_Message(
const flatbuffers::Offset<Peer_Requirement_Announcement_Message> announcement =
CreatePeer_Requirement_Announcement_Message(
builder,
is_weakly_connected);
need_consensus_msg_forwarding);
const flatbuffers::Offset<Content> message = CreateContent(builder, Message_Connected_Status_Announcement_Message, announcement.Union());
const flatbuffers::Offset<Content> message = CreateContent(builder, Message_Peer_Requirement_Announcement_Message, announcement.Union());
builder.Finish(message); // Finished building message content to get serialised content.
// Now that we have built the content message,

View File

@@ -70,7 +70,7 @@ namespace msg::fbuf::p2pmsg
void create_containermsg_from_content(
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign);
void create_msg_from_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl);
void create_msg_from_peer_requirement_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool need_consensus_msg_forwarding, std::string_view lcl);
void create_msg_from_available_capacity_announcement(flatbuffers::FlatBufferBuilder &container_builder, const int16_t &available_capacity, const uint64_t &timestamp, std::string_view lcl);

View File

@@ -97,7 +97,8 @@ namespace p2p
{
// Add the new connection straight away, if we haven't seen it before.
session.uniqueid.swap(pubkeyhex);
session.challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED;
// Mark the connection as a verified connection.
session.mark_as_verified();
ctx.peer_connections.try_emplace(session.uniqueid, &session);
LOG_DEBUG << "Accepted verified connection [" << session.display_name() << "]";
@@ -117,13 +118,14 @@ namespace p2p
if (!session.known_ipport.has_value())
session.known_ipport.swap(ex_session.known_ipport);
session.uniqueid.swap(pubkeyhex);
session.challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED;
// Mark the connection as a verified connection.
session.mark_as_verified();
ex_session.mark_for_closure();
ctx.peer_connections.erase(iter); // remove existing session.
// We have to keep the weekly connected status of the removed session object.
// If not, connected status received prior to connection dropping will be lost.
session.is_weakly_connected = ex_session.is_weakly_connected;
// We have to keep the peer requirements of the removed session object.
// If not, requirements received prior to connection dropping will be lost.
session.need_consensus_msg_forwarding = ex_session.need_consensus_msg_forwarding;
ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session.
LOG_DEBUG << "Replacing existing connection [" << ex_session.display_name() << "] with [" << session.display_name() << "]";
@@ -174,9 +176,9 @@ namespace p2p
for (const auto &[k, session] : ctx.peer_connections)
{
// Exclude given session if provided.
// Messages are forwarded only to the weakly connected nodes only in the message forwarding mode.
// Messages are forwarded only to the requested nodes only in the message forwarding mode.
if ((skipping_session && skipping_session == session) ||
(is_msg_forwarding && !session->is_weakly_connected))
(is_msg_forwarding && !session->need_consensus_msg_forwarding))
continue;
session->send(message);
@@ -261,14 +263,24 @@ namespace p2p
}
/**
* Sends the connected status broadcast announcement to all the connected peers.
* @param fbuf Peer outbound message to be sent to peer.
* @param is_weakly_connected True if the number of connections are below the threshold value.
* Sends the peer requirement to the given peer session. If a session is not given, broadcast to all the connected peers.
* @param need_consensus_msg_forwarding True if the number of connections are below the threshold value.
* @param session The destination peer node.
*/
void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected)
void send_peer_requirement_announcement(const bool need_consensus_msg_forwarding, peer_comm_session *session)
{
msg::fbuf::p2pmsg::create_msg_from_connected_status_announcement(fbuf, is_weakly_connected, ledger::ctx.get_lcl());
broadcast_message(fbuf, false);
flatbuffers::FlatBufferBuilder fbuf(1024);
msg::fbuf::p2pmsg::create_msg_from_peer_requirement_announcement(fbuf, need_consensus_msg_forwarding, ledger::ctx.get_lcl());
if (session)
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
session->send(msg);
}
else
{
broadcast_message(fbuf, false);
}
}
/**

View File

@@ -151,7 +151,7 @@ namespace p2p
bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type);
void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected);
void send_peer_requirement_announcement(const bool need_consensus_msg_forwarding, peer_comm_session *session = NULL);
void send_available_capacity_announcement(const int16_t &available_capacity);

View File

@@ -8,6 +8,10 @@
namespace p2p
{
constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7;
// Globally exposed weakly connected status variable.
bool is_weakly_connected = false;
peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[4],
const uint64_t max_msg_size, std::vector<conf::peer_properties> &req_known_remotes)
: comm::comm_server<peer_comm_session>("Peer", port, metric_thresholds, max_msg_size),
@@ -95,6 +99,10 @@ namespace p2p
}
}
// Check connected status of the node and sends the announcment
// about the consensus message forwarding requirement.
detect_if_weakly_connected();
util::sleep(100);
}
@@ -166,7 +174,7 @@ namespace p2p
{
const std::string &host_address = std::get<std::string>(host_result);
p2p::peer_comm_session session(host_address, std::move(client), false, metric_thresholds);
// Skip if this peer is banned due to corebill violations.
if (corebill::is_banned(host_address))
{
@@ -183,5 +191,22 @@ namespace p2p
}
}
}
/**
* Check whether the node is weakly connected or strongly connected in every 60 seconds.
*/
void peer_comm_server::detect_if_weakly_connected()
{
if (connected_status_check_counter == 600)
{
// One is added to session list size to reflect the loop back connection.
const bool current_state = (sessions.size() + 1) < (conf::cfg.unl.size() * WEAKLY_CONNECTED_THRESHOLD);
if (is_weakly_connected != current_state)
{
is_weakly_connected = !is_weakly_connected;
send_peer_requirement_announcement(is_weakly_connected);
}
connected_status_check_counter = 0;
}
connected_status_check_counter++;
}
} // namespace p2p

View File

@@ -6,15 +6,20 @@
namespace p2p
{
// Globally exposed weakly connected status variable.
extern bool is_weakly_connected;
class peer_comm_server : public comm::comm_server<peer_comm_session>
{
private:
int custom_connection_invocations = -1;
// std::thread known_peers_thread; // Known peers connection establishment thread.
std::thread peer_managing_thread; // Thread to request known peer list from a random peer and announce available capacity.
uint16_t connected_status_check_counter = 0;
void maintain_known_connections();
void peer_managing_loop();
void detect_if_weakly_connected();
protected:
void start_custom_jobs();
@@ -22,7 +27,6 @@ namespace p2p
int process_custom_messages();
void custom_connections();
public:
std::atomic<uint16_t> known_remote_count = 0;
std::mutex req_known_remotes_mutex;

View File

@@ -19,6 +19,11 @@ namespace p2p
p2p::handle_peer_close(*this);
}
void peer_comm_session::handle_on_verified()
{
p2p::handle_peer_on_verified(*this);
}
/**
* Returns printable name for the session based on uniqueid (used for logging).
*/

View File

@@ -18,10 +18,11 @@ namespace p2p
int handle_connect();
int handle_message(std::string_view msg);
void handle_close();
void handle_on_verified();
public:
std::optional<conf::ip_port_prop> known_ipport; // A known ip/port information that matches with our peer list configuration.
bool is_weakly_connected = false; // Holds whether this node is weakly connected to the other nodes.
bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding.
const std::string display_name();
};

View File

@@ -75,10 +75,10 @@ namespace p2p
const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
// Check whether the message is qualified for forwarding.
// Check whether the message is qualified for message forwarding.
if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type))
{
if (session.is_weakly_connected)
if (session.need_consensus_msg_forwarding)
{
// Forward messages received by weakly connected nodes to other peers.
p2p::broadcast_message(message, false, false, &session);
@@ -132,17 +132,17 @@ namespace p2p
p2p::update_known_peer_available_capacity(session.known_ipport.value(), announcement_msg->available_capacity(), announcement_msg->timestamp());
}
}
else if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message.
else if (content_message_type == p2pmsg::Message_Peer_Requirement_Announcement_Message) // This message is a peer requirement announcement message.
{
const p2pmsg::Connected_Status_Announcement_Message *announcement_msg = content->message_as_Connected_Status_Announcement_Message();
session.is_weakly_connected = announcement_msg->is_weakly_connected();
if (session.is_weakly_connected)
const p2pmsg::Peer_Requirement_Announcement_Message *announcement_msg = content->message_as_Peer_Requirement_Announcement_Message();
session.need_consensus_msg_forwarding = announcement_msg->need_consensus_msg_forwarding();
if (session.need_consensus_msg_forwarding)
{
LOG_DEBUG << "Weakly connected announcement received from " << session.display_name();
LOG_DEBUG << "Consensus message forwaring is required for " << session.display_name();
}
else
{
LOG_DEBUG << "Strongly connected announcement received from " << session.display_name();
LOG_DEBUG << "Consensus message forwaring is not required for " << session.display_name();
}
}
else if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message
@@ -294,4 +294,15 @@ namespace p2p
return 0;
}
/**
* Logic related to peer sessions on verfied is invoked here.
*/
void handle_peer_on_verified(p2p::peer_comm_session &session)
{
// Sending newly verified node the requirement of consensus msg fowarding if this node is weakly connected.
if (p2p::is_weakly_connected)
{
p2p::send_peer_requirement_announcement(is_weakly_connected, &session);
}
}
} // namespace p2p

View File

@@ -14,6 +14,7 @@ namespace p2p
int handle_peer_message(p2p::peer_comm_session &session, std::string_view message);
int handle_self_message(std::string_view message);
int handle_peer_close(const p2p::peer_comm_session &session);
void handle_peer_on_verified(p2p::peer_comm_session &session);
void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content);
void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content);

View File

@@ -212,7 +212,7 @@ namespace usr
const util::PROTOCOL protocol = (protocol_code == "json" ? util::PROTOCOL::JSON : util::PROTOCOL::BSON);
session.challenge_status = comm::CHALLENGE_VERIFIED; // Set as challenge verified
session.mark_as_verified(); // Mark connection as a verified connection.
session.issued_challenge.clear(); // Remove the stored challenge
session.uniqueid = pubkey;