Duplicate check on message enqueue and broadcast

This commit is contained in:
chalith
2025-03-14 18:45:51 +05:30
parent 7c97aaa18f
commit 7f705d4bc4
8 changed files with 46 additions and 7 deletions

View File

@@ -91,7 +91,7 @@ namespace comm
else
should_disconnect = true; // Disconnect if we receive a bad message before challenge verification.
}
else if (priority == 1 || priority == 2)
else if ((priority == 1 || priority == 2) && accept_msg(data))
{
std::vector<char> msg(data.size());
memcpy(msg.data(), data.data(), data.size());
@@ -409,4 +409,9 @@ namespace comm
{
}
bool comm_session::accept_msg(std::string_view msg)
{
return true;
}
} // namespace comm

View File

@@ -60,6 +60,7 @@ namespace comm
virtual int handle_message(std::string_view msg);
virtual void handle_close();
virtual void handle_on_verified();
virtual bool accept_msg(std::string_view msg);
public:
std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address.

View File

@@ -237,6 +237,14 @@ namespace p2p
(unl_only && !session->is_unl))
continue;
// Messages larger than the duplicate message threshold are ignored from the duplicate message check
// due to the overhead in hash generation for larger messages.
if (message.size() <= conf::MAX_SIZE_FOR_DUP_CHECK && !session->recent_sent_peermsg_hashes.try_emplace(crypto::get_hash(message)))
{
LOG_DEBUG << "Trying to send duplicate peer message. to:" << session->display_name();
continue;
}
session->send(message, priority);
}
}

View File

@@ -75,7 +75,7 @@ namespace p2p
if (!mi.p2p_msg) // Message buffer will be null if peer message was too old.
return 0;
// Messages larger than the duplicate message threshold is ignored from the duplicate message check
// Messages larger than the duplicate message threshold are ignored from the duplicate message check
// due to the overhead in hash generation for larger messages.
if (message_size <= conf::MAX_SIZE_FOR_DUP_CHECK && !recent_peermsg_hashes.try_emplace(crypto::get_hash(msg)))
{
@@ -334,4 +334,17 @@ namespace p2p
p2p::send_peer_requirement_announcement(true, this);
}
bool peer_comm_session::accept_msg(std::string_view msg)
{
// Messages larger than the duplicate message threshold are ignored from the duplicate message check
// due to the overhead in hash generation for larger messages.
if (msg.size() <= conf::MAX_SIZE_FOR_DUP_CHECK && !recent_recvd_peermsg_hashes.try_emplace(crypto::get_hash(msg)))
{
LOG_DEBUG << "Duplicate peer message received. from:" << display_name();
return false;
}
return true;
}
} // namespace p2p

View File

@@ -4,6 +4,7 @@
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../comm/comm_session.hpp"
#include "../util/rollover_hashset.hpp"
namespace p2p
{
@@ -20,13 +21,16 @@ namespace p2p
int handle_message(std::string_view msg);
void handle_close();
void handle_on_verified();
bool accept_msg(std::string_view msg);
public:
std::optional<conf::peer_ip_port> known_ipport; // A known ip/port information that matches with our peer list configuration.
bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding.
bool is_unl = false; // Whether this session's pubkey is in unl list.
uint32_t reported_time_config = 0; // Initial time config reported by this peer on peer challenge.
bool is_full_history; // Stores whether the connection is to a full history node or not.
std::optional<conf::peer_ip_port> known_ipport; // A known ip/port information that matches with our peer list configuration.
bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding.
bool is_unl = false; // Whether this session's pubkey is in unl list.
uint32_t reported_time_config = 0; // Initial time config reported by this peer on peer challenge.
bool is_full_history; // Stores whether the connection is to a full history node or not.
util::rollover_hashset recent_sent_peermsg_hashes = util::rollover_hashset(200); // The set of recent sent peer message hashes used for duplicate detection.
util::rollover_hashset recent_recvd_peermsg_hashes = util::rollover_hashset(200); // The set of recent received peer message hashes used for duplicate detection.
};
} // namespace p2p

View File

@@ -95,4 +95,9 @@ namespace usr
remove_user(pubkey);
}
bool user_comm_session::accept_msg(std::string_view msg)
{
return true;
}
} // namespace usr

View File

@@ -17,6 +17,7 @@ namespace usr
int handle_connect();
int handle_message(std::string_view msg);
void handle_close();
bool accept_msg(std::string_view msg);
};
} // namespace usr

View File

@@ -18,6 +18,7 @@ hpversion=0.6.4
let pubport=8080+$n
let peerport=22860+$n
let gpport=22880+$n
# Mount the node<id> contract directory into hpcore docker container and run.
# We specify --network=hpnet so all nodes will communicate via 'hpnet' docker virtual network.
@@ -25,6 +26,7 @@ let peerport=22860+$n
docker run --rm -t -i --network=hpnet --ip=172.1.1.${n} --name=node${n} \
-p ${pubport}:${pubport} \
-p ${peerport}:${peerport} \
-p ${gpport}:${gpport} \
--device /dev/fuse --cap-add SYS_ADMIN --security-opt apparmor:unconfined \
--mount type=bind,source=${clusterloc}/node${n},target=/contract \
hpcore:${hpversion} run /contract