Implemented user duplicate input message check.

This commit is contained in:
Ravin Perera
2019-11-05 11:47:42 +05:30
parent 83189556de
commit 8a153d5bb5
7 changed files with 103 additions and 66 deletions

View File

@@ -51,19 +51,18 @@ void consensus()
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals);
}
std::cout << "Started stage " << std::to_string(ctx.stage) << "\n";
LOG_DBG << "Started stage " << std::to_string(ctx.stage);
for (auto p : ctx.candidate_proposals)
{
bool self = p.pubkey == conf::cfg.pubkey;
std::cout << "[stage" << std::to_string(p.stage)
LOG_DBG << "[stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " hinp:" << p.hash_inputs.size()
<< " hout:" << p.hash_outputs.size()
<< " lcl:" << p.lcl
<< " self:" << self
<< "\n";
<< " self:" << self;
}
std::cout << "timenow:" << std::to_string(ctx.time_now) << "\n";
LOG_DBG << "timenow: " << std::to_string(ctx.time_now);
if (ctx.stage == 0)
{
@@ -207,29 +206,39 @@ void verify_and_populate_candidate_user_inputs()
for (const usr::user_submitted_message &umsg : umsgs)
{
// Verify the signature of the message content.
if (crypto::verify(umsg.content, umsg.sig, pubkey) == 0)
std::string sig_hash = crypto::get_hash(umsg.sig);
// Check for duplicate messages using hash of the signature.
if (ctx.recent_userinput_hashes.try_emplace(sig_hash))
{
// TODO: Also verify XRP payment token.
std::string nonce;
std::string input;
uint64_t maxledgerseqno;
jusrmsg::extract_input_container(nonce, input, maxledgerseqno, umsg.content);
// Ignore the input if our ledger has passed the input TTL.
if (maxledgerseqno > ctx.led_seq_no)
// Verify the signature of the message content.
if (crypto::verify(umsg.content, umsg.sig, pubkey) == 0)
{
// Hash is prefixed with the nonce to support user-defined sort order.
std::string hash = std::move(nonce);
// Append the hash of the message signature to get the final hash.
hash.append(crypto::get_hash(umsg.sig));
// TODO: Also verify XRP payment token/AppBill requirements.
ctx.candidate_user_inputs.try_emplace(
hash,
candidate_user_input(pubkey, std::move(input), maxledgerseqno));
std::string nonce;
std::string input;
uint64_t maxledgerseqno;
jusrmsg::extract_input_container(nonce, input, maxledgerseqno, umsg.content);
// Ignore the input if our ledger has passed the input TTL.
if (maxledgerseqno > ctx.led_seq_no)
{
// Hash is prefixed with the nonce to support user-defined sort order.
std::string hash = std::move(nonce);
// Append the hash of the message signature to get the final hash.
hash.append(sig_hash);
ctx.candidate_user_inputs.try_emplace(
hash,
candidate_user_input(pubkey, std::move(input), maxledgerseqno));
}
}
}
else
{
LOG_DBG << "Duplicate user message.";
}
}
}
}

View File

@@ -2,6 +2,7 @@
#define _HP_CONS_
#include "../pchheader.hpp"
#include "../util.hpp"
#include "../proc.hpp"
#include "../p2p/p2p.hpp"
#include "../usr/user_input.hpp"
@@ -69,6 +70,8 @@ struct consensus_context
// all users. We will use this map to distribute outputs back to connected users once consensus is achieved.
std::unordered_map<std::string, candidate_user_output> candidate_user_outputs;
util::rollover_hashset recent_userinput_hashes;
uint8_t stage;
uint64_t novel_proposal_time;
uint64_t time_now;
@@ -77,6 +80,10 @@ struct consensus_context
std::string novel_proposal;
int32_t next_sleep;
consensus_context() : recent_userinput_hashes(200)
{
}
};
struct vote_counter

View File

@@ -55,16 +55,6 @@ std::thread peer_thread;
*/
sock::session_options sess_opts;
// The set of recent peer message hashes used for duplicate detection.
std::unordered_set<std::string> recent_peermsg_hashes;
// The supporting list of recent peer message hashes used for adding and removing hashes from
// the 'recent_peermsg_hashes' in a first-in-first-out manner.
std::list<const std::string *> recent_peermsg_hashes_list;
// Maximum number of recent message hashes to remember.
static const int16_t MAX_RECENT_MSG_HASHES = 200;
int init()
{
//Entry point for p2p which will start peer connections to other nodes
@@ -120,36 +110,6 @@ void peer_connection_watchdog()
}
}
bool is_message_duplicate(std::string_view message)
{
// Get message hash and see whether message is already recieved -> abandon if duplicate.
std::string hash = crypto::get_hash(message);
auto itr = recent_peermsg_hashes.find(hash);
if (itr == recent_peermsg_hashes.end()) // Not found
{
// Add the new message hash to the list.
auto [newitr, success] = recent_peermsg_hashes.emplace(hash);
// Insert a pointer to the stored hash value into the ordered list of hashes.
recent_peermsg_hashes_list.push_back(&(*newitr));
// Remove old hashes if exceeding max hash count.
if (recent_peermsg_hashes_list.size() > MAX_RECENT_MSG_HASHES)
{
const std::string &oldesthash = *recent_peermsg_hashes_list.front();
recent_peermsg_hashes.erase(oldesthash);
recent_peermsg_hashes_list.pop_front();
}
return false;
}
LOG_DBG << "Duplicate peer message.";
return true;
}
/**
* Broadcasts the given message to all currently connected outbound peers.
*/

View File

@@ -53,8 +53,6 @@ void start_peer_connections();
void peer_connection_watchdog();
bool is_message_duplicate(std::string_view message);
void broadcast_message(peer_outbound_message msg);
} // namespace p2p

View File

@@ -15,6 +15,9 @@ namespace p2pmsg = fbschema::p2pmsg;
namespace p2p
{
// The set of recent peer message hashes used for duplicate detection.
util::rollover_hashset recent_peermsg_hashes(200);
/**
* This gets hit every time a peer connects to HP via the peer port (configured in contract config).
*/
@@ -51,8 +54,11 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0)
return;
if (is_message_duplicate(message))
if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message)))
{
LOG_DBG << "Duplicate peer message.";
return;
}
p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc

View File

@@ -1,8 +1,45 @@
#include "pchheader.hpp"
#include "util.hpp"
namespace util
{
// rollover_hashset class methods
rollover_hashset::rollover_hashset(uint32_t maxsize)
{
this->maxsize = maxsize == 0 ? 1 : maxsize;
}
/**
* Inserts the given hash to the list.
* @return True on succesful insertion. False if hash already exists.
*/
bool rollover_hashset::try_emplace(std::string hash)
{
auto itr = recent_hashes.find(hash);
if (itr == recent_hashes.end()) // Not found
{
// Add the new message hash to the set.
auto [newitr, success] = recent_hashes.emplace(hash);
// Insert a pointer to the stored hash value to the back of the ordered list of hashes.
recent_hashes_list.push_back(&(*newitr));
// Remove oldest hash if exceeding max size.
if (recent_hashes_list.size() > maxsize)
{
const std::string &oldest_hash = *recent_hashes_list.front();
recent_hashes.erase(oldest_hash);
recent_hashes_list.pop_front();
}
return true; // Hash was inserted successfuly.
}
return false; // Hash already exists.
}
/**
* Encodes provided bytes to hex string.
*

View File

@@ -43,6 +43,26 @@ enum SESSION_THRESHOLDS
MAX_BYTES_PER_MINUTE = 0
};
/**
* FIFO hash set with a max size.
*/
class rollover_hashset
{
private:
// The set of recent hashes used for duplicate detection.
std::unordered_set<std::string> recent_hashes;
// The supporting list of recent hashes used for adding and removing hashes from
// the 'recent_hashes' in a first-in-first-out manner.
std::list<const std::string *> recent_hashes_list;
uint32_t maxsize;
public:
rollover_hashset(uint32_t maxsize);
bool try_emplace(std::string hash);
};
int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_len);
int hex2bin(unsigned char *decoded, size_t decoded_len, std::string_view hex_str);