Improved user inputs handling in consensus (#46)

Updated flatbuffer proposal raw_inputs, raw_outputs data structure.
Improved user inputs handling in consensus.
This commit is contained in:
Ravin Perera
2019-10-28 00:17:35 +05:30
committed by GitHub
parent d6acee4e09
commit 5ea2bef62a
16 changed files with 539 additions and 196 deletions

View File

@@ -1,5 +1,3 @@
#include <unordered_map>
#include <list>
#include <math.h>
#include <thread>
#include <flatbuffers/flatbuffers.h>
@@ -43,8 +41,8 @@ void consensus()
ctx.time_now = util::get_epoch_milliseconds();
// Throughout consensus, we move over the incoming proposals collected via the network so far into
// the candidate proposal set (move and append). This is to have a private working set for the consensus and void
// threading conflicts with network incoming proposals list.
// the candidate proposal set (move and append). This is to have a private working set for the consensus and avoid
// threading conflicts with network incoming proposals.
{
std::lock_guard<std::mutex> lock(p2p::collected_msgs.proposals_mutex);
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals);
@@ -54,20 +52,25 @@ void consensus()
{
// Stage 0 means begining of a consensus round.
// Remove any useless candidate proposals so we'll have a cleaner proposal set to look at
// when we transition to stage 1.
auto itr = ctx.candidate_proposals.begin();
while (itr != ctx.candidate_proposals.end())
{
// Remove any proposal from previous round's stage 3.
// Remove any proposal from self (pubkey match).
// todo: check the state of these to ensure we're running consensus ledger
if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey)
ctx.candidate_proposals.erase(itr++);
else
++itr;
// Remove any useless candidate proposals so we'll have a cleaner proposal set to look at
// when we transition to stage 1.
auto itr = ctx.candidate_proposals.begin();
while (itr != ctx.candidate_proposals.end())
{
// Remove any proposal from previous round's stage 3.
// Remove any proposal from self (pubkey match).
// todo: check the state of these to ensure we're running consensus ledger
if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey)
ctx.candidate_proposals.erase(itr++);
else
++itr;
}
}
// Transfer connected user data onto consensus candidate data.
populate_candidate_users_and_inputs();
// In stage 0 we create a novel proposal and broadcast it.
const p2p::proposal stg_prop = create_stage0_proposal();
if (broadcast_proposal(stg_prop) != 0)
@@ -127,6 +130,25 @@ void consensus()
std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 4));
}
/**
* Populate connected users and their inputs (if any) into consensus candidate data.
*/
void populate_candidate_users_and_inputs()
{
// Lock the connected user list until we do this operation.
std::lock_guard<std::mutex> lock(usr::users_mutex);
for (auto &[sid, con_user] : usr::users)
{
// Populate the user into candidate user inputs map.
// We do this regardless of whether the user has any inputs or not.
std::list<util::hash_buffer> &inplist = ctx.candidate_users[con_user.pubkey];
// Transfer the connected user's inputs (if any) to the candidate user's inputs list.
inplist.splice(inplist.end(), con_user.inputs);
}
}
p2p::proposal create_stage0_proposal()
{
// The proposal we are going to emit in stage 0.
@@ -137,36 +159,36 @@ p2p::proposal create_stage0_proposal()
stg_prop.stage = 0;
stg_prop.lcl = ctx.lcl;
// Populate the stg_prop with users list (user pubkey list) and their inputs.
{
std::lock_guard<std::mutex> lock(usr::users_mutex);
for (auto &[sid, user] : usr::users)
{
// add all the user connections we host
stg_prop.users.emplace(user.pubkey);
// Populate the poposal with users list (user pubkey list) and their inputs.
// and all their pending messages
if (!user.inbuffer.empty())
{
std::string input;
input.swap(user.inbuffer);
stg_prop.raw_inputs.try_emplace(user.pubkey, std::move(input));
}
for (auto [pubkey, inputs] : ctx.candidate_users)
{
// Add all the user connections we host.
stg_prop.users.emplace(pubkey);
// Add all their pending inputs.
if (!inputs.empty())
{
std::vector<util::hash_buffer> inpvec;
for (util::hash_buffer &hashbuf : inputs)
inpvec.push_back(hashbuf); // Copy all hashbufs from candidate inputs into the proposal.
stg_prop.raw_inputs.emplace(pubkey, std::move(inpvec));
}
}
// Populate the stg_prop with any contract outputs from previous round's stage 3.
for (auto &[pubkey, bufpair] : ctx.local_userbuf)
for (auto &[pubkey, bufpair] : ctx.useriobufmap)
{
if (!bufpair.second.empty()) // bufpair.second is the output buffer.
if (!bufpair.output.empty())
{
std::string rawoutput;
rawoutput.swap(bufpair.second);
rawoutput.swap(bufpair.output);
stg_prop.raw_outputs.try_emplace(pubkey, std::move(rawoutput));
stg_prop.raw_outputs.try_emplace(pubkey, util::hash_buffer(rawoutput, pubkey));
}
}
ctx.local_userbuf.clear();
ctx.useriobufmap.clear();
// todo: set propsal states
// todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same stg_prop again.
@@ -188,7 +210,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
//todo:check lcl votes and wait for proposals
// Vote for rest of the proposal fields
// Vote for rest of the proposal fields by looking at candidate proposals.
for (const p2p::proposal &cp : ctx.candidate_proposals)
{
// Vote for times.
@@ -202,64 +224,52 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
// Vote for user inputs
// Proposals from stage 0 will have raw inputs in them.
// Proposals from stage 0 will have raw inputs (and their hashes) in them.
if (!cp.raw_inputs.empty())
{
for (auto &[pubkey, input] : cp.raw_inputs)
for (auto &[pubkey, inputs] : cp.raw_inputs)
{
// Hash the pubkey+input.
std::string str_to_hash;
str_to_hash.reserve(pubkey.size() + input.size());
str_to_hash.append(pubkey);
str_to_hash.append(input);
std::string hash = crypto::sha_512_hash(str_to_hash, "INP", 3);
// Vote for the input hash.
for (util::hash_buffer input : inputs)
{
increment(votes.inputs, input.hash);
// Vote for the hash.
increment(votes.inputs, hash);
// Remember the actual input along with the hash for future use for apply-ledger.
ctx.possible_inputs.try_emplace(
std::move(hash),
std::make_pair(pubkey, input));
std::string inputbuffer;
inputbuffer.swap(input.buffer);
// Remember the actual input along with the hash for future use for apply-ledger.
ctx.possible_inputs.try_emplace(input.hash, std::make_pair(pubkey, inputbuffer));
}
}
}
// Proposals from stage 1, 2, 3 will have hashed inputs in them.
// Proposals from stage 1, 2, 3 will have only input hashes in them.
else if (!cp.hash_inputs.empty())
{
for (const std::string &inputhash : cp.hash_inputs)
increment(votes.inputs, inputhash);
}
// Vote for user outputs
// Vote for contract outputs
// Proposals from stage 0 will have raw user outputs in them.
if (!cp.raw_outputs.empty())
{
for (auto &[pubkey, output] : cp.raw_outputs)
for (auto [pubkey, output] : cp.raw_outputs)
{
// Hash the pubkey+input.
std::string str_to_hash;
str_to_hash.reserve(pubkey.size() + output.size());
str_to_hash.append(pubkey);
str_to_hash.append(output);
std::string hash = crypto::sha_512_hash(str_to_hash, "OUT", 3);
// Vote for the hash.
increment<std::string>(votes.outputs, hash);
increment<std::string>(votes.outputs, output.hash);
// Remember the actual output along with the hash for future use for apply-ledger.
ctx.possible_outputs.try_emplace(
std::move(hash),
std::make_pair(pubkey, output));
std::string outputbuf;
outputbuf.swap(output.buffer);
// Remember the actual output along with the hash for future use for apply-ledger and sending back to user.
ctx.possible_outputs.try_emplace(output.hash, std::make_pair(pubkey, outputbuf));
}
}
// Proposals from stage 1, 2, 3 will have hashed user outputs in them.
else if (!cp.hash_outputs.empty())
{
for (auto outputhash : cp.hash_outputs)
{
increment<std::string>(votes.outputs, outputhash);
}
}
// todo: repeat above for state
@@ -458,7 +468,6 @@ void apply_ledger(const p2p::proposal &cons_prop)
//create input to feed to binary contract run
//todo:remove entries from pending inputs that made their way into a closed ledger
for (const std::string &hash : cons_prop.hash_inputs)
{
auto itr = ctx.possible_inputs.find(hash);
@@ -471,7 +480,7 @@ void apply_ledger(const p2p::proposal &cons_prop)
}
else
{
// Prepare ctx.local_userbuf with user inputs to feed to the contract.
// Prepare ctx.useriobufmap with user inputs to feed to the contract.
const std::string &pubkey = itr->second.first;
std::string rawinput = itr->second.second;
@@ -479,21 +488,45 @@ void apply_ledger(const p2p::proposal &cons_prop)
std::string inputtofeed;
inputtofeed.swap(rawinput);
std::pair<std::string, std::string> bufpair;
bufpair.first = std::move(inputtofeed);
ctx.local_userbuf.try_emplace(pubkey, std::move(bufpair));
proc::contract_iobuf_pair &bufpair = ctx.useriobufmap[pubkey];
bufpair.inputs.push_back(std::move(inputtofeed));
}
}
ctx.possible_inputs.clear();
run_contract_binary(cons_prop.time);
// Remove entries from candidate inputs that made their way into a closed ledger
auto cu_itr = ctx.candidate_users.begin();
while (cu_itr != ctx.candidate_users.end())
{
// Delete any ledger inputs for this user.
std::list<util::hash_buffer> &inputs = cu_itr->second;
auto inp_itr = inputs.begin();
while (inp_itr != inputs.end())
{
// Delete the input from the list, if it was part of consensus proposal.
if (cons_prop.hash_inputs.count(inp_itr->hash))
inputs.erase(inp_itr++);
else
++inp_itr;
}
// Delete the user from the list if there are no more unprocessed inputs.
if (cu_itr->second.empty())
ctx.candidate_users.erase(cu_itr++);
else
++cu_itr;
}
}
void run_contract_binary(int64_t time_now)
{
std::pair<std::string, std::string> hpscbufpair;
std::unordered_map<std::string, std::pair<std::string, std::string>> nplbufs;
// todo:implement proper data structures to exchange npl and hpsc bufs
proc::contract_bufmap_t nplbufs;
proc::contract_iobuf_pair hpscbufpair;
proc::ContractExecArgs eargs(time_now, ctx.local_userbuf, nplbufs, hpscbufpair);
proc::ContractExecArgs eargs(time_now, ctx.useriobufmap, nplbufs, hpscbufpair);
proc::exec_contract(eargs);
}