diff --git a/src/conf.cpp b/src/conf.cpp index d79f0236..a4b19dee 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -92,6 +92,8 @@ namespace conf cfg.roundtime = 1000; cfg.pubport = 8080; + cfg.msgforwarding = false; + #ifndef NDEBUG cfg.loglevel_type = conf::LOG_SEVERITY::DEBUG; cfg.loglevel = "dbg"; @@ -291,6 +293,8 @@ namespace conf cfg.peermaxbadsigpm = d["peermaxbadsigpm"].as(); cfg.peermaxcons = d["peermaxcons"].as(); + cfg.msgforwarding = d["msgforwarding"].as(); + cfg.loglevel = d["loglevel"].as(); cfg.loglevel_type = get_loglevel_type(cfg.loglevel); cfg.loggers.clear(); @@ -359,6 +363,8 @@ namespace conf d.insert_or_assign("peermaxbadsigpm", cfg.peermaxbadsigpm); d.insert_or_assign("peermaxcons", cfg.peermaxcons); + d.insert_or_assign("msgforwarding", cfg.msgforwarding); + d.insert_or_assign("loglevel", cfg.loglevel); jsoncons::ojson loggers(jsoncons::json_array_arg); diff --git a/src/conf.hpp b/src/conf.hpp index 11ed1a8f..8b7f0a47 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -88,6 +88,8 @@ namespace conf uint64_t peermaxbadsigpm = 0; // Peer bad signatures per minute uint16_t peermaxcons = 0; // Max inbound peer connections + bool msgforwarding = false; // Whether peer message forwarding is on/off. + std::string loglevel; // Log severity level (debug, info, warn, error) LOG_SEVERITY loglevel_type; // Log severity level enum (debug, info, warn, error) std::unordered_set loggers; // List of enabled loggers (console, file) diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 12d3341d..e708be5d 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -5,7 +5,6 @@ #include "../util.hpp" #include "../hplog.hpp" #include "p2p.hpp" -#include "peer_session_handler.hpp" namespace p2p { @@ -144,11 +143,23 @@ namespace p2p } /** - * Broadcasts the given message to all currently connected outbound peers. - * @param fbuf Peer outbound message to be broadcasted. - * @param send_to_self Whether to also send the message to self (this node). - */ + * Broadcasts the given message to all currently connected outbound peers. + * @param fbuf Peer outbound message to be broadcasted. + * @param send_to_self Whether to also send the message to self (this node). + */ void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self) + { + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + broadcast_message(msg, send_to_self); + } + + /** + * Broadcast the given message to all connected outbound peers. + * @param message Message to be forwarded. + * @param skipping_session Session to be skipped in message forwarding(optional). + */ + void broadcast_message(std::string_view message, const bool send_to_self, const comm::comm_session *skipping_session) { if (ctx.peer_connections.size() == 0) { @@ -161,14 +172,43 @@ namespace p2p for (const auto &[k, session] : ctx.peer_connections) { - if (!send_to_self && session->is_self) + // Exclude given session and self if provided. + if ((!send_to_self && session->is_self) || (skipping_session && skipping_session == session)) continue; - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - session->send(msg); + session->send(message); } } + /** + * Check whether the given message is qualified to be forwarded to peers. + * @param container The message container. + * @param content_message_type The message type. + * @return Returns true if the message is qualified for forwarding to peers. False otherwise. + */ + bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type) + { + // Checking whether the message forwarding is enabled and skip if the message is sent from self. + if (!conf::cfg.msgforwarding || session.is_self) + { + return false; + } + + const int64_t time_now = util::get_epoch_milliseconds(); + // Checking the time to live of the container. The time to live for forwarding is three times the round time. + if (container->timestamp() < (time_now - (conf::cfg.roundtime * 3))) + { + LOG_DEBUG << "Peer message is too old for forwarding."; + return false; + } + // Only the selected types of messages are forwarded. + if (content_message_type == msg::fbuf::p2pmsg::Message_Proposal_Message || + content_message_type == msg::fbuf::p2pmsg::Message_NonUnl_Proposal_Message || + content_message_type == msg::fbuf::p2pmsg::Message_Npl_Message) + { + return true; + } + return false; + } /** * Sends the given message to self (this node). diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 39809332..f063aa97 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -8,6 +8,7 @@ #include "peer_session_handler.hpp" #include "../hpfs/h32.hpp" #include "../conf.hpp" +#include "../msg/fbuf/p2pmsg_container_generated.h" namespace p2p { @@ -138,10 +139,13 @@ namespace p2p void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self); + void broadcast_message(std::string_view message, const bool send_to_self, const comm::comm_session *skipping_session = NULL); + void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf); + bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type); } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 19c55ae0..8c274c56 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -74,6 +74,12 @@ namespace p2p } const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc + // Check whether the message is qualified for forwarding. + if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type)) + { + // Forward message to peers. + p2p::broadcast_message(message, false, &session); + } if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement {