Refactored user I/O with signed inputs and NUPs. (#53)

This commit is contained in:
Ravin Perera
2019-11-03 22:45:23 +05:30
committed by GitHub
parent cb364cc420
commit 83189556de
27 changed files with 958 additions and 596 deletions

View File

@@ -2,8 +2,10 @@
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../usr/usr.hpp"
#include "../usr/user_input.hpp"
#include "../p2p/p2p.hpp"
#include "../fbschema/p2pmsg_helpers.hpp"
#include "../jsonschema/usrmsg_helpers.hpp"
#include "../p2p/peer_session_handler.hpp"
#include "../hplog.hpp"
#include "../crypto.hpp"
@@ -12,33 +14,19 @@
#include "cons.hpp"
namespace p2pmsg = fbschema::p2pmsg;
namespace jusrmsg = jsonschema::usrmsg;
namespace cons
{
consensus_context ctx;
/**
* Increment voting table counter.
*
* @param counter The counter map in which a vote should be incremented.
* @param candidate The candidate whose vote should be increased by 1.
*/
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate)
{
if (counter.count(candidate))
counter[candidate]++;
else
counter.try_emplace(candidate, 1);
}
int init()
{
//set start stage
ctx.stage = 0;
//load lcl detals from lcl history.
//load lcl details from lcl history.
const ledger_history ldr_hist = load_ledger();
ctx.led_seq_no = ldr_hist.led_seq_no;
ctx.lcl = ldr_hist.lcl;
@@ -56,8 +44,8 @@ void consensus()
ctx.time_now = util::get_epoch_milliseconds();
// Throughout consensus, we move over the incoming proposals collected via the network so far into
// the candidate proposal set (move and append). This is to have a private working set for the consensus and avoid
// threading conflicts with network incoming proposals.
// the candidate proposal set (move and append). This is to have a private working set for the consensus
// and avoid threading conflicts with network incoming proposals.
{
std::lock_guard<std::mutex> lock(p2p::collected_msgs.proposals_mutex);
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals);
@@ -69,9 +57,7 @@ void consensus()
bool self = p.pubkey == conf::cfg.pubkey;
std::cout << "[stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " rinp:" << p.raw_inputs.size()
<< " hinp:" << p.hash_inputs.size()
<< " rout:" << p.raw_outputs.size()
<< " hout:" << p.hash_outputs.size()
<< " lcl:" << p.lcl
<< " self:" << self
@@ -99,17 +85,16 @@ void consensus()
}
}
// Transfer connected user data onto consensus candidate data.
populate_candidate_users_and_inputs();
// Broadcast non-unl proposals (NUP) containing inputs from locally connected users.
broadcast_nonunl_proposal();
util::sleep(conf::cfg.roundtime / 10);
// Verify and transfer user inputs from incoming NUPs onto consensus candidate data.
verify_and_populate_candidate_user_inputs();
// In stage 0 we create a novel proposal and broadcast it.
const p2p::proposal stg_prop = create_stage0_proposal();
if (broadcast_proposal(stg_prop) != 0)
{
// No peers to broadcast stage0 proposal (not even self). So we wait and try stage 0 again.
timewait_stage(true);
return;
}
broadcast_proposal(stg_prop);
}
else // Stage 1, 2, 3
{
@@ -173,28 +158,82 @@ void consensus()
// after a stage 0 novel proposal we will just busy wait for proposals
if (ctx.stage == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 100));
util::sleep(conf::cfg.roundtime / 100);
else
std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 4));
util::sleep(conf::cfg.roundtime / 4);
}
/**
* Populate connected users and their inputs (if any) into consensus candidate data.
* Broadcasts any inputs from locally connected users via an NUP.
* @return 0 for successful broadcast. -1 for failure.
*/
void populate_candidate_users_and_inputs()
void broadcast_nonunl_proposal()
{
// Lock the connected user list until we do this operation.
std::lock_guard<std::mutex> lock(usr::ctx.users_mutex);
for (auto &[sid, con_user] : usr::ctx.users)
// Construct NUP.
p2p::nonunl_proposal nup;
std::lock_guard<std::mutex> lock(p2p::collected_msgs.nonunl_proposals_mutex);
for (auto &[sid, user] : usr::ctx.users)
{
// Populate the user into candidate user inputs map.
// We do this regardless of whether the user has any inputs or not.
std::list<usr::user_submitted_message> usermsgs;
usermsgs.splice(usermsgs.end(), user.submitted_inputs);
std::list<util::hash_buffer> &inplist = ctx.candidate_users[con_user.pubkey];
// Transfer the connected user's inputs (if any) to the candidate user's inputs list.
inplist.splice(inplist.end(), con_user.inputs);
// We should create an entry for each user pubkey, even if the user has no inputs. This is
// because this data map will be used to track connected users as well in addition to inputs.
nup.user_messages.try_emplace(user.pubkey, std::move(usermsgs));
}
p2p::peer_outbound_message msg(std::make_shared<flatbuffers::FlatBufferBuilder>(1024));
p2pmsg::create_msg_from_nonunl_proposal(msg.builder(), nup);
p2p::broadcast_message(msg);
LOG_DBG << "NUP sent." << " users:" << nup.user_messages.size();
}
/**
* Verifies the user signatures and populate non-expired user inputs from collected
* non-unl proposals (if any) into consensus candidate data.
*/
void verify_and_populate_candidate_user_inputs()
{
// Lock the list so any network activity is blocked.
std::lock_guard<std::mutex> lock(p2p::collected_msgs.nonunl_proposals_mutex);
for (const p2p::nonunl_proposal &p : p2p::collected_msgs.nonunl_proposals)
{
for (const auto &[pubkey, umsgs] : p.user_messages)
{
// Populate user list.
ctx.candidate_users.emplace(pubkey);
for (const usr::user_submitted_message &umsg : umsgs)
{
// Verify the signature of the message content.
if (crypto::verify(umsg.content, umsg.sig, pubkey) == 0)
{
// TODO: Also verify XRP payment token.
std::string nonce;
std::string input;
uint64_t maxledgerseqno;
jusrmsg::extract_input_container(nonce, input, maxledgerseqno, umsg.content);
// Ignore the input if our ledger has passed the input TTL.
if (maxledgerseqno > ctx.led_seq_no)
{
// Hash is prefixed with the nonce to support user-defined sort order.
std::string hash = std::move(nonce);
// Append the hash of the message signature to get the final hash.
hash.append(crypto::get_hash(umsg.sig));
ctx.candidate_user_inputs.try_emplace(
hash,
candidate_user_input(pubkey, std::move(input), maxledgerseqno));
}
}
}
}
}
p2p::collected_msgs.nonunl_proposals.clear();
}
p2p::proposal create_stage0_proposal()
@@ -206,39 +245,23 @@ p2p::proposal create_stage0_proposal()
stg_prop.stage = 0;
stg_prop.lcl = ctx.lcl;
// Populate the poposal with users list (user pubkey list) and their inputs.
for (auto [pubkey, inputs] : ctx.candidate_users)
{
// Add all the user connections we host.
// Populate the proposal with set of candidate user pubkeys.
for (const std::string &pubkey : ctx.candidate_users)
stg_prop.users.emplace(pubkey);
// Add all their pending inputs.
if (!inputs.empty())
{
std::vector<util::hash_buffer> inpvec;
for (util::hash_buffer &hashbuf : inputs)
inpvec.push_back(hashbuf); // Copy all hashbufs from candidate inputs into the proposal.
// We don't need candidate_users anymore, so clear it. It will be repopulated during next censensus round.
ctx.candidate_users.clear();
stg_prop.raw_inputs.emplace(pubkey, std::move(inpvec));
}
}
// Populate the proposal with hashes of user inputs.
for (const auto &[hash, cand_input] : ctx.candidate_user_inputs)
stg_prop.hash_inputs.emplace(hash);
// Populate the stg_prop with any contract outputs from previous round's stage 3.
for (auto &[pubkey, bufpair] : ctx.useriobufmap)
{
if (!bufpair.output.empty())
{
std::string rawoutput;
rawoutput.swap(bufpair.output);
stg_prop.raw_outputs.try_emplace(pubkey, util::hash_buffer(rawoutput, pubkey));
}
}
ctx.useriobufmap.clear();
// Populate the proposal with hashes of user outputs.
for (const auto &[hash, cand_output] : ctx.candidate_user_outputs)
stg_prop.hash_outputs.emplace(hash);
// todo: set propsal states
// todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same stg_prop again.
// todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same proposal again.
return stg_prop;
}
@@ -254,69 +277,27 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
// our peers or we will halt depending on level of consensus on the sides of the fork
stg_prop.lcl = ctx.lcl;
//todo:check lcl votes and wait for proposals
// Vote for rest of the proposal fields by looking at candidate proposals.
for (const p2p::proposal &cp : ctx.candidate_proposals)
{
// Vote for times.
// Everyone votes on an arbitrary time, as long as its within the round time and not in the future
// Everyone votes on an arbitrary time, as long as its within the round time and not in the future.
if (ctx.time_now > cp.time && (ctx.time_now - cp.time) < conf::cfg.roundtime)
increment(votes.time, cp.time);
// Vote for user connections
for (const std::string &user : cp.users)
increment(votes.users, user);
// Vote for user pubkeys.
for (const std::string &pubkey : cp.users)
increment(votes.users, pubkey);
// Vote for user inputs
// Vote for user inputs (hashes). Only vote for the inputs that are in our candidate_inputs set.
for (const std::string &hash : cp.hash_inputs)
if (ctx.candidate_user_inputs.count(hash) > 0)
increment(votes.inputs, hash);
// Proposals from stage 0 will have raw inputs (and their hashes) in them.
if (!cp.raw_inputs.empty())
{
for (auto &[pubkey, inputs] : cp.raw_inputs)
{
// Vote for the input hash.
for (util::hash_buffer input : inputs)
{
increment(votes.inputs, input.hash);
std::string inputbuffer;
inputbuffer.swap(input.buffer);
// Remember the actual input along with the hash for future use for apply-ledger.
ctx.possible_inputs.try_emplace(input.hash, std::make_pair(pubkey, inputbuffer));
}
}
}
// Proposals from stage 1, 2, 3 will have only input hashes in them.
else if (!cp.hash_inputs.empty())
{
for (const std::string &inputhash : cp.hash_inputs)
increment(votes.inputs, inputhash);
}
// Vote for contract outputs
// Proposals from stage 0 will have raw user outputs in them.
if (!cp.raw_outputs.empty())
{
for (auto [pubkey, output] : cp.raw_outputs)
{
// Vote for the hash.
increment<std::string>(votes.outputs, output.hash);
std::string outputbuf;
outputbuf.swap(output.buffer);
// Remember the actual output along with the hash for future use for apply-ledger and sending back to user.
ctx.possible_outputs.try_emplace(output.hash, std::make_pair(pubkey, outputbuf));
}
}
// Proposals from stage 1, 2, 3 will have hashed user outputs in them.
else if (!cp.hash_outputs.empty())
{
for (auto outputhash : cp.hash_outputs)
increment<std::string>(votes.outputs, outputhash);
}
// Vote for contract outputs (hashes). Only vote for the outputs that are in our candidate_outputs set.
for (const std::string &hash : cp.hash_outputs)
if (ctx.candidate_user_outputs.count(hash) > 0)
increment(votes.outputs, hash);
// todo: repeat above for state
}
@@ -328,14 +309,14 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
// if we're at proposal stage 1 we'll accept any input and connection that has 1 or more vote.
// Add user connections which have votes over stage threshold to proposal.
for (auto &[userpubkey, numvotes] : votes.users)
if (numvotes >= vote_threshold || (numvotes > 0 && ctx.stage == 1))
stg_prop.users.emplace(userpubkey);
// Add user pubkeys which have votes over stage threshold to proposal.
for (auto &[pubkey, numvotes] : votes.users)
if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0))
stg_prop.users.emplace(pubkey);
// Add inputs which have votes over stage threshold to proposal.
for (auto &[hash, numvotes] : votes.inputs)
if (numvotes >= vote_threshold || (numvotes > 0 && ctx.stage == 1))
if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0))
stg_prop.hash_inputs.emplace(hash);
// Add outputs which have votes over stage threshold to proposal.
@@ -363,33 +344,16 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
* Broadcasts the given proposal to all connected peers.
* @return 0 on success. -1 if no peers to broadcast.
*/
int broadcast_proposal(const p2p::proposal &p)
void broadcast_proposal(const p2p::proposal &p)
{
p2p::peer_outbound_message msg(std::make_shared<flatbuffers::FlatBufferBuilder>(1024));
p2pmsg::create_msg_from_proposal(msg.builder(), p);
{
//Broadcast while locking the peer_connections.
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
if (p2p::peer_connections.size() == 0)
{
LOG_DBG << "No peers to broadcast";
return -1;
}
for (auto &[k, session] : p2p::peer_connections)
session->send(msg);
}
p2p::broadcast_message(msg);
LOG_DBG << "Proposed [stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " rinp:" << p.raw_inputs.size()
<< " hinp:" << p.hash_inputs.size()
<< " rout:" << p.raw_outputs.size()
<< " hout:" << p.hash_outputs.size();
return 0;
}
/**
@@ -520,7 +484,7 @@ void timewait_stage(bool reset)
if (reset)
ctx.stage = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 100));
util::sleep(conf::cfg.roundtime / 100);
}
/**
@@ -529,112 +493,167 @@ void timewait_stage(bool reset)
*/
void apply_ledger(const p2p::proposal &cons_prop)
{
// todo:write lcl.
ctx.led_seq_no++;
ctx.lcl = cons::save_ledger(cons_prop, ctx.led_seq_no);
// Send any output from the previous consensus round to users.
// After the current ledger seq no is updated, we remove any newly expired inputs from candidate set.
{
auto itr = ctx.candidate_user_inputs.begin();
while (itr != ctx.candidate_user_inputs.end())
{
if (itr->second.maxledgerseqno <= ctx.led_seq_no)
ctx.candidate_user_inputs.erase(itr++);
else
++itr;
}
}
// Send any output from the previous consensus round to locally connected users.
dispatch_user_outputs(cons_prop);
// todo:check state against the winning / canonical state
// and act accordingly (rollback, ask state from peer, etc.)
proc::contract_bufmap_t useriobufmap;
feed_inputs_to_contract_bufmap(useriobufmap, cons_prop);
run_contract_binary(cons_prop.time, useriobufmap);
extract_outputs_from_contract_bufmap(useriobufmap);
}
/**
* Dispatch any consensus-reached outputs to matching users if they are connected to us locally.
* @param cons_prop The proposal that achieved consensus.
*/
void dispatch_user_outputs(const p2p::proposal &cons_prop)
{
std::lock_guard<std::mutex> lock(usr::ctx.users_mutex);
for (const std::string &hash : cons_prop.hash_outputs)
{
auto itr = ctx.possible_outputs.find(hash);
bool hashfound = (itr != ctx.possible_outputs.end());
auto cu_itr = ctx.candidate_user_outputs.find(hash);
bool hashfound = (cu_itr != ctx.candidate_user_outputs.end());
if (!hashfound)
{
// There's no possiblity for this to happen.
LOG_ERR << "Output required but wasn't in our possible output dict, this will potentially cause desync.";
LOG_ERR << "Output required but wasn't in our candidate outputs map, this will potentially cause desync.";
// todo: consider fatal
}
else
{
// Send outputs to users.
auto &[pubkey, output] = itr->second;
std::string outputtosend;
outputtosend.swap(output);
// Send matching outputs to locally connected users.
candidate_user_output &cand_output = cu_itr->second;
// Find the user session by user pubkey.
auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey);
if (sess_itr != usr::ctx.sessionids.end()) // match found
{
std::lock_guard<std::mutex> lock(usr::ctx.users_mutex);
// Find the user by session id.
const std::string sessionid = usr::ctx.sessionids[pubkey];
auto itr = usr::ctx.users.find(sessionid);
if (itr != usr::ctx.users.end())
auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
if (user_itr != usr::ctx.users.end()) // match found
{
const usr::connected_user &user = itr->second;
std::string outputtosend;
outputtosend.swap(cand_output.output);
usr::user_outbound_message outmsg(std::move(outputtosend));
const usr::connected_user &user = user_itr->second;
user.session->send(std::move(outmsg));
}
}
}
}
// now we can safely clear our candidate outputs.
ctx.candidate_user_outputs.clear();
}
// now we can safely clear our outputs.
ctx.possible_outputs.empty();
//todo:check state against the winning / canonical state
//and act accordingly (rollback, ask state from peer, etc.)
//create input to feed to binary contract run
/**
* Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process.
* @param bufmap The contract bufmap which needs to be populated with inputs.
* @param cons_prop The proposal that achieved consensus.
*/
void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop)
{
// Populate the buf map with all currently connected users regardless of whether they have inputs or not.
// This is in case the contract wanted to emit some data to a user without needing any input.
for (const std::string &pubkey : cons_prop.users)
bufmap.try_emplace(pubkey, proc::contract_iobuf_pair());
for (const std::string &hash : cons_prop.hash_inputs)
{
auto itr = ctx.possible_inputs.find(hash);
bool hashfound = (itr != ctx.possible_inputs.end());
// For each consensus input hash, we need to find the actual input content to feed the contract.
auto itr = ctx.candidate_user_inputs.find(hash);
bool hashfound = (itr != ctx.candidate_user_inputs.end());
if (!hashfound)
{
// There's no possiblity for this to happen.
LOG_ERR << "input required but wasn't in our possible input dict, this will potentially cause desync";
// todo: consider fatal
LOG_ERR << "input required but wasn't in our candidate inputs map, this will potentially cause desync.";
// TODO: consider fatal
}
else
{
// Prepare ctx.useriobufmap with user inputs to feed to the contract.
// Populate the input content into the bufmap.
const std::string &pubkey = itr->second.first;
std::string rawinput = itr->second.second;
candidate_user_input &cand_input = itr->second;
std::string inputtofeed;
inputtofeed.swap(rawinput);
inputtofeed.swap(cand_input.input);
proc::contract_iobuf_pair &bufpair = ctx.useriobufmap[pubkey];
proc::contract_iobuf_pair &bufpair = bufmap[cand_input.userpubkey];
bufpair.inputs.push_back(std::move(inputtofeed));
// Remove the input from the candidate set because we no longer need it.
ctx.candidate_user_inputs.erase(itr);
}
}
ctx.possible_inputs.clear();
run_contract_binary(cons_prop.time);
// Remove entries from candidate inputs that made their way into a closed ledger
auto cu_itr = ctx.candidate_users.begin();
while (cu_itr != ctx.candidate_users.end())
{
// Delete any ledger inputs for this user.
std::list<util::hash_buffer> &inputs = cu_itr->second;
auto inp_itr = inputs.begin();
while (inp_itr != inputs.end())
{
// Delete the input from the list, if it was part of consensus proposal.
if (cons_prop.hash_inputs.count(inp_itr->hash))
inputs.erase(inp_itr++);
else
++inp_itr;
}
// Delete the user from the list if there are no more unprocessed inputs.
if (cu_itr->second.empty())
ctx.candidate_users.erase(cu_itr++);
else
++cu_itr;
}
}
void run_contract_binary(int64_t time_now)
/**
* Reads any outputs the contract has produced on the provided buf map and transfers them to candidate outputs
* for the next consensus round.
* @param bufmap The contract bufmap containing the outputs produced by the contract.
*/
void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap)
{
// todo:implement proper data structures to exchange npl and hpsc bufs
proc::contract_bufmap_t nplbufs;
for (auto &[pubkey, bufpair] : bufmap)
{
if (!bufpair.output.empty())
{
std::string output;
output.swap(bufpair.output);
std::string hash = crypto::get_hash(pubkey, output);
ctx.candidate_user_outputs.try_emplace(
std::move(hash),
candidate_user_output(pubkey, std::move(output)));
}
}
}
/**
* Executes the smart contract with the specified time and provided I/O buf maps.
* @param time_now The time that must be passed on to the contract.
* @param useriobufmap The contract bufmap which holds user I/O buffers.
*/
void run_contract_binary(int64_t time_now, proc::contract_bufmap_t &useriobufmap)
{
// todo:implement exchange of npl and hpsc bufs
proc::contract_bufmap_t nplbufmap;
proc::contract_iobuf_pair hpscbufpair;
proc::ContractExecArgs eargs(time_now, ctx.useriobufmap, nplbufs, hpscbufpair);
proc::contract_exec_args eargs(time_now, useriobufmap, nplbufmap, hpscbufpair);
proc::exec_contract(eargs);
}
/**
* Increment voting table counter.
* @param counter The counter map in which a vote should be incremented.
* @param candidate The candidate whose vote should be increased by 1.
*/
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate)
{
if (counter.count(candidate))
counter[candidate]++;
else
counter.try_emplace(candidate, 1);
}
} // namespace cons