Peer message duplicate detection.

This commit is contained in:
Ravin Perera
2019-10-28 23:28:51 +05:30
parent 1fa19c381f
commit eeadd1d744
5 changed files with 50 additions and 30 deletions

View File

@@ -34,7 +34,7 @@ namespace fbschema::p2pmsg
* Verifies Conatiner message structure and outputs faltbuffer Container pointer to access the given buffer.
*
* @param container_ref A pointer reference to assign the pointer to the Container object.
* @param container_bud The buffer containing the data that should validated and interpreted
* @param container_buf The buffer containing the data that should be validated and interpreted
* via the container pointer.
* @return 0 on successful verification. -1 for failure.
*/
@@ -57,9 +57,6 @@ int validate_and_extract_container(const Container **container_ref, std::string_
//Get message container
const Container *container = GetContainer(container_buf_ptr);
//Validation are prioritzed base on expensiveness of validation.
//i.e - signature validation is done at the end.
//check protocol version of message whether it is greater than minimum supported protocol version.
const uint16_t version = container->version();
if (version < util::MIN_PEERMSG_VERSION)
@@ -68,29 +65,14 @@ int validate_and_extract_container(const Container **container_ref, std::string_
return -1;
}
int64_t time_now = util::get_epoch_milliseconds();
//check message timestamp.
int64_t time_now = util::get_epoch_milliseconds();
if (container->timestamp() < (time_now - conf::cfg.roundtime * 4))
{
LOG_DBG << "Peer message is too old.";
return -1;
}
// After signature is verified, get message hash and see wheteher
// message is already recieved -> abandon if duplicate.
// auto messageHash = crypto::sha_512_hash(message, "PEERMSG", 7);
// if (recent_peer_msghash.count(messageHash) == 0)
// {
// recent_peer_msghash.try_emplace(std::move(messageHash), timestamp);
// }
// else
// {
// LOG_DBG << "Duplicate message";
// return -1;
// }
//Assign container and content out params.
*container_ref = container;
return 0;
@@ -363,4 +345,4 @@ hashbuffermap_to_flatbuf_rawoutputs(flatbuffers::FlatBufferBuilder &builder, con
return builder.CreateVector(fbvec);
}
} // namespace p2p
} // namespace fbschema::p2pmsg

View File

@@ -46,6 +46,6 @@ hashbuffermap_to_flatbuf_rawinputs(flatbuffers::FlatBufferBuilder &builder, cons
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<RawOutput>>>
hashbuffermap_to_flatbuf_rawoutputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, util::hash_buffer> &map);
} // namespace p2p
} // namespace fbschema::p2pmsg
#endif

View File

@@ -35,7 +35,7 @@ p2p::peer_session_handler global_peer_session_handler;
net::io_context ioc;
/**
* SSL context used by the boost library in providing tls support
* SSL context used by the boost library in providing tls support
*/
ssl::context ctx{ssl::context::tlsv13};
@@ -53,9 +53,17 @@ std::thread peer_thread;
/**
* Used to pass down the default settings to the socket session
*/
sock::session_options sess_opts;
sock::session_options sess_opts;
std::map<std::string, int64_t> recent_peer_msghash;
// The set of recent peer message hashes used for duplicate detection.
std::unordered_set<std::string> recent_peermsg_hashes;
// The supporting list of recent peer message hashes used for adding and removing hashes from
// the 'recent_peermsg_hashes' in a first-in-first-out manner.
std::list<const std::string *> recent_peermsg_hashes_list;
// Maximum number of recent message hashes to remember.
static const int16_t MAX_RECENT_MSG_HASHES = 200;
int init()
{
@@ -111,4 +119,34 @@ void peer_connection_watchdog()
}
}
bool is_message_duplicate(std::string_view message)
{
// Get message hash and see whether message is already recieved -> abandon if duplicate.
std::string hash = crypto::sha_512_hash(message);
auto itr = recent_peermsg_hashes.find(hash);
if (itr == recent_peermsg_hashes.end()) // Not found
{
// Add the new message hash to the list.
auto [newitr, success] = recent_peermsg_hashes.emplace(hash);
// Insert a pointer to the stored hash value into the ordered list of hashes.
recent_peermsg_hashes_list.push_back(&(*newitr));
// Remove old hashes if exceeding max hash count.
if (recent_peermsg_hashes_list.size() > MAX_RECENT_MSG_HASHES)
{
const std::string &oldesthash = *recent_peermsg_hashes_list.front();
recent_peermsg_hashes.erase(oldesthash);
recent_peermsg_hashes_list.pop_front();
}
return false;
}
LOG_DBG << "Duplicate peer message.";
return true;
}
} // namespace p2p

View File

@@ -42,11 +42,6 @@ extern message_collection collected_msgs;
extern std::unordered_map<std::string, sock::socket_session<peer_outbound_message> *> peer_connections;
extern std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
/**
* This is used to store hash of recent peer messages: messagehash -> timestamp of message
*/
extern std::map<std::string, int64_t> recent_peer_msghash;
int init();
//p2p message handling
@@ -54,6 +49,8 @@ void start_peer_connections();
void peer_connection_watchdog();
bool is_message_duplicate(std::string_view message);
} // namespace p2p
#endif

View File

@@ -71,6 +71,9 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0)
return;
if (is_message_duplicate(message))
return;
p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
if (content_message_type == p2pmsg::Message_Proposal_Message) //message is a proposal message