mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Weakly connected status announcement. (#135)
* Forward others' messages only to the weakly connected nodes instead of broadcasting to all the connected peers. * Announcing connected status depends on a threshold to other connected peers. * Forwarding messages of weakly connected peers to other peers.
This commit is contained in:
committed by
GitHub
parent
5c4edfafb9
commit
fabfdcce89
@@ -5,10 +5,12 @@
|
||||
#include "../util.hpp"
|
||||
#include "../bill/corebill.h"
|
||||
#include "../hpws/hpws.hpp"
|
||||
#include "../p2p/p2p.hpp"
|
||||
|
||||
namespace comm
|
||||
{
|
||||
constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 16 * 1024 * 1024;
|
||||
constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7;
|
||||
|
||||
int comm_server::start(
|
||||
const uint16_t port, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4],
|
||||
@@ -66,6 +68,30 @@ namespace comm
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
|
||||
flatbuffers::FlatBufferBuilder fbuf(1024);
|
||||
if (sessions.size() > 1)
|
||||
{
|
||||
if (is_weakly_connected())
|
||||
{
|
||||
if (!weakly_connected_status_sent)
|
||||
{
|
||||
LOG_DEBUG << "Weakly connected status announcement sent";
|
||||
p2p::send_connected_status_announcement(fbuf, true);
|
||||
// Mark that the p2p message forwarding is requested.
|
||||
weakly_connected_status_sent = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (weakly_connected_status_sent)
|
||||
{
|
||||
LOG_DEBUG << "Strongly connected status announcement sent";
|
||||
p2p::send_connected_status_announcement(fbuf, false);
|
||||
weakly_connected_status_sent = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we reach this point that means we are shutting down.
|
||||
@@ -79,6 +105,11 @@ namespace comm
|
||||
LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " listener stopped.";
|
||||
}
|
||||
|
||||
bool comm_server::is_weakly_connected()
|
||||
{
|
||||
return (sessions.size() - 1) < (conf::cfg.unl.size() * WEAKLY_CONNECTED_THRESHOLD);
|
||||
}
|
||||
|
||||
void comm_server::check_for_new_connection(
|
||||
std::list<comm_session> &sessions, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4])
|
||||
{
|
||||
@@ -123,6 +154,12 @@ namespace comm
|
||||
// in accessing class member variables inside the thread.
|
||||
// Class member variables gives unacceptable values if the thread starts before the move operation.
|
||||
inserted_session.start_messaging_threads();
|
||||
// Making sure the newly connected node get the weakly connected announcement if the number of
|
||||
// connected peers are under the threshold.
|
||||
if ((sessions.size() > 1) && is_weakly_connected())
|
||||
{
|
||||
weakly_connected_status_sent = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ namespace comm
|
||||
std::thread watchdog_thread; // Connection watcher thread.
|
||||
std::thread inbound_message_processor_thread; // Incoming message processor thread.
|
||||
bool should_stop_listening = false;
|
||||
bool weakly_connected_status_sent = false; // keep track whether the weakly connected status announcement is sent or not.
|
||||
|
||||
std::list<comm_session> sessions;
|
||||
std::mutex sessions_mutex;
|
||||
@@ -37,6 +38,8 @@ namespace comm
|
||||
|
||||
std::string get_cgi_ip(const int fd);
|
||||
|
||||
bool is_weakly_connected();
|
||||
|
||||
public:
|
||||
// Start accepting incoming connections
|
||||
int start(
|
||||
|
||||
@@ -179,6 +179,7 @@ namespace comm
|
||||
if (out_msg_queue.try_dequeue(msg_to_send))
|
||||
{
|
||||
process_outbound_message(msg_to_send);
|
||||
util::sleep(1);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -38,12 +38,12 @@ namespace comm
|
||||
private:
|
||||
std::optional<hpws::client> hpws_client;
|
||||
const SESSION_TYPE session_type;
|
||||
std::vector<session_threshold> thresholds; // track down various communication thresholds
|
||||
|
||||
std::thread reader_thread; // The thread responsible for reading messages from the read fd.
|
||||
std::thread writer_thread; // The thread responsible for writing messages to the write fd.
|
||||
moodycamel::ReaderWriterQueue<std::vector<char>> in_msg_queue; // Holds incoming messages waiting to be processed.
|
||||
moodycamel::ConcurrentQueue<std::string> out_msg_queue; // Holds outgoing messages waiting to be processed.
|
||||
std::vector<session_threshold> thresholds; // track down various communication thresholds
|
||||
|
||||
std::thread reader_thread; // The thread responsible for reading messages from the read fd.
|
||||
std::thread writer_thread; // The thread responsible for writing messages to the write fd.
|
||||
moodycamel::ReaderWriterQueue<std::vector<char>> in_msg_queue; // Holds incoming messages waiting to be processed.
|
||||
moodycamel::ConcurrentQueue<std::string> out_msg_queue; // Holds outgoing messages waiting to be processed.
|
||||
|
||||
void reader_loop();
|
||||
|
||||
@@ -56,6 +56,7 @@ namespace comm
|
||||
conf::ip_port_pair known_ipport;
|
||||
SESSION_STATE state = SESSION_STATE::NOT_INITIALIZED;
|
||||
CHALLENGE_STATUS challenge_status = CHALLENGE_STATUS::NOT_ISSUED;
|
||||
bool is_weakly_connected = false; // Holds whether this node is weakly connected to the other nodes.
|
||||
|
||||
comm_session(
|
||||
std::string_view ip, hpws::client &&hpws_client, const SESSION_TYPE session_type,
|
||||
@@ -76,4 +77,4 @@ namespace comm
|
||||
|
||||
} // namespace comm
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
||||
@@ -24,7 +24,18 @@ table UserInputGroup {
|
||||
messages:[UserInput];
|
||||
}
|
||||
|
||||
union Message { Peer_Challenge_Response_Message, Peer_Challenge_Message, NonUnl_Proposal_Message, Proposal_Message, Npl_Message, State_Request_Message, State_Response_Message, History_Request_Message, History_Response_Message } //message content type
|
||||
union Message {
|
||||
Peer_Challenge_Response_Message,
|
||||
Peer_Challenge_Message,
|
||||
NonUnl_Proposal_Message,
|
||||
Proposal_Message,
|
||||
Npl_Message,
|
||||
State_Request_Message,
|
||||
State_Response_Message,
|
||||
History_Request_Message,
|
||||
History_Response_Message,
|
||||
Connected_Status_Announcement_Message
|
||||
} //message content type
|
||||
|
||||
table Content {
|
||||
message:Message;
|
||||
@@ -110,4 +121,8 @@ table State_FS_Hash_Entry{
|
||||
hash: [ubyte];
|
||||
}
|
||||
|
||||
table Connected_Status_Announcement_Message{
|
||||
is_weakly_connected: bool;
|
||||
}
|
||||
|
||||
root_type Content; //root type for message content
|
||||
@@ -66,6 +66,9 @@ struct Block_ResponseBuilder;
|
||||
struct State_FS_Hash_Entry;
|
||||
struct State_FS_Hash_EntryBuilder;
|
||||
|
||||
struct Connected_Status_Announcement_Message;
|
||||
struct Connected_Status_Announcement_MessageBuilder;
|
||||
|
||||
enum Message {
|
||||
Message_NONE = 0,
|
||||
Message_Peer_Challenge_Response_Message = 1,
|
||||
@@ -77,11 +80,12 @@ enum Message {
|
||||
Message_State_Response_Message = 7,
|
||||
Message_History_Request_Message = 8,
|
||||
Message_History_Response_Message = 9,
|
||||
Message_Connected_Status_Announcement_Message = 10,
|
||||
Message_MIN = Message_NONE,
|
||||
Message_MAX = Message_History_Response_Message
|
||||
Message_MAX = Message_Connected_Status_Announcement_Message
|
||||
};
|
||||
|
||||
inline const Message (&EnumValuesMessage())[10] {
|
||||
inline const Message (&EnumValuesMessage())[11] {
|
||||
static const Message values[] = {
|
||||
Message_NONE,
|
||||
Message_Peer_Challenge_Response_Message,
|
||||
@@ -92,13 +96,14 @@ inline const Message (&EnumValuesMessage())[10] {
|
||||
Message_State_Request_Message,
|
||||
Message_State_Response_Message,
|
||||
Message_History_Request_Message,
|
||||
Message_History_Response_Message
|
||||
Message_History_Response_Message,
|
||||
Message_Connected_Status_Announcement_Message
|
||||
};
|
||||
return values;
|
||||
}
|
||||
|
||||
inline const char * const *EnumNamesMessage() {
|
||||
static const char * const names[11] = {
|
||||
static const char * const names[12] = {
|
||||
"NONE",
|
||||
"Peer_Challenge_Response_Message",
|
||||
"Peer_Challenge_Message",
|
||||
@@ -109,13 +114,14 @@ inline const char * const *EnumNamesMessage() {
|
||||
"State_Response_Message",
|
||||
"History_Request_Message",
|
||||
"History_Response_Message",
|
||||
"Connected_Status_Announcement_Message",
|
||||
nullptr
|
||||
};
|
||||
return names;
|
||||
}
|
||||
|
||||
inline const char *EnumNameMessage(Message e) {
|
||||
if (flatbuffers::IsOutRange(e, Message_NONE, Message_History_Response_Message)) return "";
|
||||
if (flatbuffers::IsOutRange(e, Message_NONE, Message_Connected_Status_Announcement_Message)) return "";
|
||||
const size_t index = static_cast<size_t>(e);
|
||||
return EnumNamesMessage()[index];
|
||||
}
|
||||
@@ -160,6 +166,10 @@ template<> struct MessageTraits<msg::fbuf::p2pmsg::History_Response_Message> {
|
||||
static const Message enum_value = Message_History_Response_Message;
|
||||
};
|
||||
|
||||
template<> struct MessageTraits<msg::fbuf::p2pmsg::Connected_Status_Announcement_Message> {
|
||||
static const Message enum_value = Message_Connected_Status_Announcement_Message;
|
||||
};
|
||||
|
||||
bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type);
|
||||
bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
|
||||
|
||||
@@ -574,6 +584,9 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
|
||||
const msg::fbuf::p2pmsg::History_Response_Message *message_as_History_Response_Message() const {
|
||||
return message_type() == msg::fbuf::p2pmsg::Message_History_Response_Message ? static_cast<const msg::fbuf::p2pmsg::History_Response_Message *>(message()) : nullptr;
|
||||
}
|
||||
const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *message_as_Connected_Status_Announcement_Message() const {
|
||||
return message_type() == msg::fbuf::p2pmsg::Message_Connected_Status_Announcement_Message ? static_cast<const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *>(message()) : nullptr;
|
||||
}
|
||||
void *mutable_message() {
|
||||
return GetPointer<void *>(VT_MESSAGE);
|
||||
}
|
||||
@@ -622,6 +635,10 @@ template<> inline const msg::fbuf::p2pmsg::History_Response_Message *Content::me
|
||||
return message_as_History_Response_Message();
|
||||
}
|
||||
|
||||
template<> inline const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *Content::message_as<msg::fbuf::p2pmsg::Connected_Status_Announcement_Message>() const {
|
||||
return message_as_Connected_Status_Announcement_Message();
|
||||
}
|
||||
|
||||
struct ContentBuilder {
|
||||
typedef Content Table;
|
||||
flatbuffers::FlatBufferBuilder &fbb_;
|
||||
@@ -1688,6 +1705,50 @@ inline flatbuffers::Offset<State_FS_Hash_Entry> CreateState_FS_Hash_EntryDirect(
|
||||
hash__);
|
||||
}
|
||||
|
||||
struct Connected_Status_Announcement_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
|
||||
typedef Connected_Status_Announcement_MessageBuilder Builder;
|
||||
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
|
||||
VT_IS_WEAKLY_CONNECTED = 4
|
||||
};
|
||||
bool is_weakly_connected() const {
|
||||
return GetField<uint8_t>(VT_IS_WEAKLY_CONNECTED, 0) != 0;
|
||||
}
|
||||
bool mutate_is_weakly_connected(bool _is_weakly_connected) {
|
||||
return SetField<uint8_t>(VT_IS_WEAKLY_CONNECTED, static_cast<uint8_t>(_is_weakly_connected), 0);
|
||||
}
|
||||
bool Verify(flatbuffers::Verifier &verifier) const {
|
||||
return VerifyTableStart(verifier) &&
|
||||
VerifyField<uint8_t>(verifier, VT_IS_WEAKLY_CONNECTED) &&
|
||||
verifier.EndTable();
|
||||
}
|
||||
};
|
||||
|
||||
struct Connected_Status_Announcement_MessageBuilder {
|
||||
typedef Connected_Status_Announcement_Message Table;
|
||||
flatbuffers::FlatBufferBuilder &fbb_;
|
||||
flatbuffers::uoffset_t start_;
|
||||
void add_is_weakly_connected(bool is_weakly_connected) {
|
||||
fbb_.AddElement<uint8_t>(Connected_Status_Announcement_Message::VT_IS_WEAKLY_CONNECTED, static_cast<uint8_t>(is_weakly_connected), 0);
|
||||
}
|
||||
explicit Connected_Status_Announcement_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
|
||||
: fbb_(_fbb) {
|
||||
start_ = fbb_.StartTable();
|
||||
}
|
||||
flatbuffers::Offset<Connected_Status_Announcement_Message> Finish() {
|
||||
const auto end = fbb_.EndTable(start_);
|
||||
auto o = flatbuffers::Offset<Connected_Status_Announcement_Message>(end);
|
||||
return o;
|
||||
}
|
||||
};
|
||||
|
||||
inline flatbuffers::Offset<Connected_Status_Announcement_Message> CreateConnected_Status_Announcement_Message(
|
||||
flatbuffers::FlatBufferBuilder &_fbb,
|
||||
bool is_weakly_connected = false) {
|
||||
Connected_Status_Announcement_MessageBuilder builder_(_fbb);
|
||||
builder_.add_is_weakly_connected(is_weakly_connected);
|
||||
return builder_.Finish();
|
||||
}
|
||||
|
||||
inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type) {
|
||||
switch (type) {
|
||||
case Message_NONE: {
|
||||
@@ -1729,6 +1790,10 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess
|
||||
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::History_Response_Message *>(obj);
|
||||
return verifier.VerifyTable(ptr);
|
||||
}
|
||||
case Message_Connected_Status_Announcement_Message: {
|
||||
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::Connected_Status_Announcement_Message *>(obj);
|
||||
return verifier.VerifyTable(ptr);
|
||||
}
|
||||
default: return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
#include "../../crypto.hpp"
|
||||
#include "../../util.hpp"
|
||||
#include "../../hplog.hpp"
|
||||
#include "../../p2p/p2p.hpp"
|
||||
#include "../../hpfs/h32.hpp"
|
||||
#include "../../hpfs/hpfs.hpp"
|
||||
#include "p2pmsg_container_generated.h"
|
||||
@@ -415,7 +414,7 @@ namespace msg::fbuf::p2pmsg
|
||||
|
||||
// Now that we have built the content message,
|
||||
// we need to sign it and place it inside a container message.
|
||||
create_containermsg_from_content(container_builder, builder, {}, true);
|
||||
create_containermsg_from_content(container_builder, builder, {}, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -439,7 +438,7 @@ namespace msg::fbuf::p2pmsg
|
||||
|
||||
// Now that we have built the content message,
|
||||
// we need to sign it and place it inside a container message.
|
||||
create_containermsg_from_content(container_builder, builder, {}, true);
|
||||
create_containermsg_from_content(container_builder, builder, {}, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -464,7 +463,7 @@ namespace msg::fbuf::p2pmsg
|
||||
|
||||
// Now that we have built the content message,
|
||||
// we need to sign it and place it inside a container message.
|
||||
create_containermsg_from_content(container_builder, builder, lcl, true);
|
||||
create_containermsg_from_content(container_builder, builder, lcl, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -569,6 +568,28 @@ namespace msg::fbuf::p2pmsg
|
||||
create_containermsg_from_content(container_builder, builder, lcl, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create connected status announcement message.
|
||||
* @param container_builder Flatbuffer builder for the container message.
|
||||
* @param is_weakly_connected True if number of connections are below threshold and false otherwise.
|
||||
* @param lcl Lcl value to be passed in the container message.
|
||||
*/
|
||||
void create_msg_for_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl)
|
||||
{
|
||||
flatbuffers::FlatBufferBuilder builder(1024);
|
||||
|
||||
const flatbuffers::Offset<Connected_Status_Announcement_Message> announcement =
|
||||
CreateConnected_Status_Announcement_Message(
|
||||
builder,
|
||||
is_weakly_connected);
|
||||
|
||||
const flatbuffers::Offset<Content> message = CreateContent(builder, Message_Connected_Status_Announcement_Message, announcement.Union());
|
||||
builder.Finish(message); // Finished building message content to get serialised content.
|
||||
|
||||
// Now that we have built the content message,
|
||||
create_containermsg_from_content(container_builder, builder, lcl, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Flatbuffer container message from the given Content message.
|
||||
* @param container_builder The Flatbuffer builder to which the final container message should be written to.
|
||||
@@ -739,5 +760,4 @@ namespace msg::fbuf::p2pmsg
|
||||
}
|
||||
return builder.CreateVector(fbvec);
|
||||
}
|
||||
|
||||
} // namespace msg::fbuf::p2pmsg
|
||||
@@ -95,6 +95,8 @@ namespace msg::fbuf::p2pmsg
|
||||
flatbuffers::FlatBufferBuilder &builder,
|
||||
std::vector<hpfs::child_hash_node> &hash_nodes);
|
||||
|
||||
void create_msg_for_connected_status_announcement(flatbuffers::FlatBufferBuilder &container_builder, const bool is_weakly_connected, std::string_view lcl);
|
||||
|
||||
} // namespace msg::fbuf::p2pmsg
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
#include "../util.hpp"
|
||||
#include "../hplog.hpp"
|
||||
#include "p2p.hpp"
|
||||
#include "../msg/fbuf/p2pmsg_helpers.hpp"
|
||||
#include "../ledger.hpp"
|
||||
|
||||
namespace p2p
|
||||
{
|
||||
@@ -124,6 +126,9 @@ namespace p2p
|
||||
|
||||
ex_session.mark_for_closure();
|
||||
p2p::ctx.peer_connections.erase(iter); // remove existing session.
|
||||
// We have to keep the weekly connected status of the removed session object.
|
||||
// If not, connected status received prior to connection dropping will be lost.
|
||||
session.is_weakly_connected = ex_session.is_weakly_connected;
|
||||
p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session.
|
||||
|
||||
LOG_DEBUG << "Replacing existing connection [" << session.uniqueid.substr(0, 10) << "]";
|
||||
@@ -146,20 +151,23 @@ namespace p2p
|
||||
* Broadcasts the given message to all currently connected outbound peers.
|
||||
* @param fbuf Peer outbound message to be broadcasted.
|
||||
* @param send_to_self Whether to also send the message to self (this node).
|
||||
* @param is_msg_forwarding Whether this broadcast is for message forwarding.
|
||||
*/
|
||||
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self)
|
||||
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding)
|
||||
{
|
||||
std::string_view msg = std::string_view(
|
||||
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
|
||||
broadcast_message(msg, send_to_self);
|
||||
|
||||
broadcast_message(msg, send_to_self, is_msg_forwarding);
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast the given message to all connected outbound peers.
|
||||
* @param message Message to be forwarded.
|
||||
* @param is_msg_forwarding Whether this broadcast is for message forwarding.
|
||||
* @param skipping_session Session to be skipped in message forwarding(optional).
|
||||
*/
|
||||
void broadcast_message(std::string_view message, const bool send_to_self, const comm::comm_session *skipping_session)
|
||||
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const comm::comm_session *skipping_session)
|
||||
{
|
||||
if (ctx.peer_connections.size() == 0)
|
||||
{
|
||||
@@ -173,7 +181,10 @@ namespace p2p
|
||||
for (const auto &[k, session] : ctx.peer_connections)
|
||||
{
|
||||
// Exclude given session and self if provided.
|
||||
if ((!send_to_self && session->is_self) || (skipping_session && skipping_session == session))
|
||||
// Messages are forwarded only to the weakly connected nodes only in the message forwarding mode.
|
||||
if ((!send_to_self && session->is_self) ||
|
||||
(skipping_session && skipping_session == session) ||
|
||||
(is_msg_forwarding && !session->is_weakly_connected))
|
||||
continue;
|
||||
|
||||
session->send(message);
|
||||
@@ -272,4 +283,15 @@ namespace p2p
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the connected status broadcast announcement to all the connected peers.
|
||||
* @param fbuf Peer outbound message to be sent to peer.
|
||||
* @param is_weakly_connected True if the number of connections are below the threshold value.
|
||||
*/
|
||||
void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected)
|
||||
{
|
||||
msg::fbuf::p2pmsg::create_msg_for_connected_status_announcement(fbuf, is_weakly_connected, ledger::ctx.get_lcl());
|
||||
p2p::broadcast_message(fbuf, false);
|
||||
}
|
||||
|
||||
} // namespace p2p
|
||||
@@ -137,15 +137,17 @@ namespace p2p
|
||||
|
||||
int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp);
|
||||
|
||||
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self);
|
||||
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false);
|
||||
|
||||
void broadcast_message(std::string_view message, const bool send_to_self, const comm::comm_session *skipping_session = NULL);
|
||||
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const comm::comm_session *skipping_session = NULL);
|
||||
|
||||
void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf);
|
||||
|
||||
void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf);
|
||||
|
||||
bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type);
|
||||
|
||||
void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected);
|
||||
} // namespace p2p
|
||||
|
||||
#endif
|
||||
@@ -77,8 +77,16 @@ namespace p2p
|
||||
// Check whether the message is qualified for forwarding.
|
||||
if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type))
|
||||
{
|
||||
// Forward message to peers.
|
||||
p2p::broadcast_message(message, false, &session);
|
||||
if (session.is_weakly_connected)
|
||||
{
|
||||
// Forward messages received by weakly connected nodes to other peers.
|
||||
p2p::broadcast_message(message, false, false, &session);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Forward message received from other nodes to weakly connected peers.
|
||||
p2p::broadcast_message(message, false, true, &session);
|
||||
}
|
||||
}
|
||||
|
||||
if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement
|
||||
@@ -150,14 +158,21 @@ namespace p2p
|
||||
LOG_DEBUG << "NPL message enqueue failure. " << session.uniqueid.substr(0, 10);
|
||||
}
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message.
|
||||
{
|
||||
const p2pmsg::Connected_Status_Announcement_Message *announcement_msg = content->message_as_Connected_Status_Announcement_Message();
|
||||
session.is_weakly_connected = announcement_msg->is_weakly_connected();
|
||||
if (session.is_weakly_connected)
|
||||
{
|
||||
LOG_DEBUG << "Weakly connected announcement received from " << session.uniqueid.substr(0, 10);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG << "Strongly connected announcement received from " << session.uniqueid.substr(0, 10);
|
||||
}
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_State_Request_Message)
|
||||
{
|
||||
if (p2pmsg::validate_container_trust(container) != 0)
|
||||
{
|
||||
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
|
||||
LOG_DEBUG << "State request message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Insert request with lock.
|
||||
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.state_requests_mutex);
|
||||
@@ -166,13 +181,6 @@ namespace p2p
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_State_Response_Message)
|
||||
{
|
||||
if (p2pmsg::validate_container_trust(container) != 0)
|
||||
{
|
||||
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
|
||||
LOG_DEBUG << "State response message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing.
|
||||
{
|
||||
// Insert state_response with lock.
|
||||
@@ -183,26 +191,12 @@ namespace p2p
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message
|
||||
{
|
||||
if (p2pmsg::validate_container_trust(container) != 0)
|
||||
{
|
||||
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
|
||||
LOG_DEBUG << "History request message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message());
|
||||
std::scoped_lock<std::mutex> lock(ledger::sync_ctx.list_mutex);
|
||||
ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr)));
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message
|
||||
{
|
||||
if (p2pmsg::validate_container_trust(container) != 0)
|
||||
{
|
||||
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
|
||||
LOG_DEBUG << "History response message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message());
|
||||
std::scoped_lock<std::mutex> lock(ledger::sync_ctx.list_mutex);
|
||||
ledger::sync_ctx.collected_history_responses.push_back(std::move(hr));
|
||||
|
||||
Reference in New Issue
Block a user