mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Included output hash and ledger info in output return message. (#284)
This commit is contained in:
@@ -168,13 +168,15 @@ namespace consensus
|
||||
if (ctx.stage == 3)
|
||||
{
|
||||
consensed_user_map consensed_users;
|
||||
if (prepare_consensed_users_and_inputs(consensed_users, p) == -1 ||
|
||||
update_ledger(p, consensed_users, patch_hash, lcl_id) == -1 ||
|
||||
execute_contract(p, consensed_users, lcl_id) == -1)
|
||||
if (prepare_consensed_users(consensed_users, p) == -1 ||
|
||||
commit_consensus_results(p, consensed_users, patch_hash) == -1)
|
||||
{
|
||||
LOG_ERROR << "Error occured in Stage 3 consensus execution.";
|
||||
|
||||
// Cleanup any buffers occupied by consensed inputs regardless of any errors occured.
|
||||
purge_user_input_buffers(consensed_users);
|
||||
// Cleanup obsolete information before next round starts.
|
||||
cleanup_output_collections();
|
||||
cleanup_consensed_user_inputs(consensed_users);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,6 +191,41 @@ namespace consensus
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the consensus finalalization activities with the provided consensused information.
|
||||
* @param cons_prop The proposal which reached consensus.
|
||||
* @param consensed_users Set of consensed users and their consensed inputs and outputs.
|
||||
* @param patch_hash The current config patch hash.
|
||||
*/
|
||||
int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash)
|
||||
{
|
||||
// Persist the new ledger with the consensus results.
|
||||
if (ledger::save_ledger(cons_prop, consensed_users) == -1)
|
||||
return -1;
|
||||
|
||||
p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
|
||||
LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")";
|
||||
|
||||
// Now that there's a new ledger, prune any newly-expired candidate inputs.
|
||||
expire_candidate_inputs(lcl_id);
|
||||
|
||||
// Inform locally connected users that their inputs made it into the ledger.
|
||||
dispatch_consensed_user_input_responses(consensed_users, lcl_id);
|
||||
|
||||
// Send consensed outputs to locally connected users.
|
||||
dispatch_consensed_user_outputs(consensed_users, lcl_id);
|
||||
|
||||
// Apply consensed config patch file changes to the hpcore runtime and hp.cfg.
|
||||
if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1)
|
||||
return -1;
|
||||
|
||||
// Execute the smart contract with the consensed user inputs.
|
||||
if (execute_contract(cons_prop, consensed_users, lcl_id) == -1)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether we are in sync with the received votes.
|
||||
* @return 0 if we are in sync. -1 on ledger or hpfs desync. -2 if majority last ledger primary shard hash unreliable.
|
||||
@@ -357,42 +394,61 @@ namespace consensus
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the consensed user map including the consensed user inputs based on the consensus proposal.
|
||||
* Prepare the consensed user map including the consensed inputs/outputs for those users based on the consensus proposal.
|
||||
* @param consensed_users The consensed user map to populate.
|
||||
* @param cons_prop The proposal that reached consensus.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int prepare_consensed_users_and_inputs(consensed_user_map &consensed_users, const p2p::proposal &cons_prop)
|
||||
int prepare_consensed_users(consensed_user_map &consensed_users, const p2p::proposal &cons_prop)
|
||||
{
|
||||
// Prepare consensed user input set by joining consensus proposal input ordered hashes and candidate user input set.
|
||||
// consensed inputs are removed from the candidate set.
|
||||
|
||||
int ret = 0;
|
||||
|
||||
// Populate the users map with all consensed users regardless of whether they have inputs or not.
|
||||
for (const std::string &pubkey : cons_prop.users)
|
||||
consensed_users.try_emplace(pubkey, std::vector<consensed_user_input>());
|
||||
consensed_users.try_emplace(pubkey, consensed_user{});
|
||||
|
||||
// Prepare consensed user input set by joining consensus proposal input ordered hashes and candidate user input set.
|
||||
// consensed inputs are removed from the candidate set.
|
||||
for (const std::string &ordered_hash : cons_prop.input_ordered_hashes)
|
||||
{
|
||||
// For each consensus input ordered hash, we need to find the candidate input.
|
||||
const auto itr = ctx.candidate_user_inputs.find(ordered_hash);
|
||||
const bool hash_found = (itr != ctx.candidate_user_inputs.end());
|
||||
if (!hash_found)
|
||||
if (hash_found)
|
||||
{
|
||||
LOG_ERROR << "Input required but wasn't in our candidate inputs map, this will potentially cause desync.";
|
||||
candidate_user_input &ci = itr->second;
|
||||
consensed_users[ci.user_pubkey].consensed_inputs.emplace_back(ordered_hash, ci.input);
|
||||
|
||||
// Erase the consensed input from the candidate set.
|
||||
ctx.candidate_user_inputs.erase(itr);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING << "Input required but wasn't in our candidate inputs map, this will potentially cause desync.";
|
||||
|
||||
// We set error return value but keep on moving candidate inputs to consensed inputs.
|
||||
// This is so that their underlying buffers can get deallocated during stage 3 execution steps.
|
||||
ret = -1;
|
||||
}
|
||||
}
|
||||
|
||||
// If final elected output hash matches our output hash, move the outputs into consensed outputs.
|
||||
{
|
||||
if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash())
|
||||
{
|
||||
for (const auto &[hash, gen_output] : ctx.generated_user_outputs)
|
||||
{
|
||||
consensed_user_output &con_out = consensed_users[gen_output.user_pubkey].consensed_outputs;
|
||||
con_out.hash = hash;
|
||||
|
||||
for (const sc::contract_output &co : gen_output.outputs)
|
||||
con_out.outputs.push_back(std::move(co.message));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
candidate_user_input &ci = itr->second;
|
||||
consensed_users[ci.user_pubkey].emplace_back(ordered_hash, ci.input, ci.protocol);
|
||||
|
||||
// Erase the consensed input from the candidate set.
|
||||
ctx.candidate_user_inputs.erase(itr);
|
||||
LOG_WARNING << "Consenses output hash didn't match our output hash.";
|
||||
ret = -1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,17 +456,39 @@ namespace consensus
|
||||
}
|
||||
|
||||
/**
|
||||
* Purges the underyling buffers that belong to provided consensed user inputs.
|
||||
* @param consensed_users The consensed user map that contains input pointers.
|
||||
* Removes any candidate inputs that has lived passed the current ledger seq no.
|
||||
*/
|
||||
void expire_candidate_inputs(const p2p::sequence_hash &lcl_id)
|
||||
{
|
||||
auto itr = ctx.candidate_user_inputs.begin();
|
||||
while (itr != ctx.candidate_user_inputs.end())
|
||||
{
|
||||
if (itr->second.max_ledger_seq_no <= lcl_id.seq_no)
|
||||
{
|
||||
// Erase the candidate input along with its data buffer in the input store.
|
||||
usr::input_store.purge(itr->second.input);
|
||||
ctx.candidate_user_inputs.erase(itr++);
|
||||
}
|
||||
else
|
||||
{
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up any consensused user inputs that are not relevant for the next round.
|
||||
* @param consensed_users The consensed user map that contains consensed inputs.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int purge_user_input_buffers(const consensed_user_map &consensed_users)
|
||||
int cleanup_consensed_user_inputs(const consensed_user_map &consensed_users)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
for (const auto &[pubkey, inputs] : consensed_users)
|
||||
// Purges the underyling buffers that belong to provided consensed user inputs.
|
||||
for (const auto &[pubkey, user] : consensed_users)
|
||||
{
|
||||
for (const consensed_user_input &ci : inputs)
|
||||
for (const consensed_user_input &ci : user.consensed_inputs)
|
||||
{
|
||||
if (usr::input_store.purge(ci.input) == -1)
|
||||
ret = -1;
|
||||
@@ -420,6 +498,17 @@ namespace consensus
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the contract output collections that are no longer needed for the next round.
|
||||
*/
|
||||
void cleanup_output_collections()
|
||||
{
|
||||
ctx.user_outputs_our_sig.clear();
|
||||
ctx.generated_user_outputs.clear();
|
||||
ctx.user_outputs_hashtree.clear();
|
||||
ctx.user_outputs_unl_sig.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncrhonise the stage/round time for fixed intervals and reset the stage.
|
||||
* @return True if consensus can proceed in the current round. False if stage is reset.
|
||||
@@ -605,7 +694,7 @@ namespace consensus
|
||||
if (reject_reason == NULL)
|
||||
extracted_inputs.push_back(std::move(extracted));
|
||||
else
|
||||
rejections[pubkey].push_back(usr::input_status_response{submitted_input.protocol, crypto::get_hash(submitted_input.sig), reject_reason});
|
||||
rejections[pubkey].push_back(usr::input_status_response{crypto::get_hash(submitted_input.sig), reject_reason});
|
||||
}
|
||||
|
||||
// This will sort the inputs in nonce order so the validation will follow the same order on all nodes.
|
||||
@@ -628,7 +717,7 @@ namespace consensus
|
||||
// No reject reason means we should go ahead and subject the input to consensus.
|
||||
ctx.candidate_user_inputs.try_emplace(
|
||||
ordered_hash,
|
||||
candidate_user_input(pubkey, stored_input, extracted_input.max_ledger_seq_no, extracted_input.protocol));
|
||||
candidate_user_input(pubkey, stored_input, extracted_input.max_ledger_seq_no));
|
||||
}
|
||||
|
||||
// If the input was rejected we need to inform the user.
|
||||
@@ -636,7 +725,7 @@ namespace consensus
|
||||
{
|
||||
// We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix.
|
||||
const std::string input_hash = std::string(util::get_string_suffix(ordered_hash, BLAKE3_OUT_LEN));
|
||||
rejections[pubkey].push_back(usr::input_status_response{extracted_input.protocol, std::move(input_hash), reject_reason});
|
||||
rejections[pubkey].push_back(usr::input_status_response{std::move(input_hash), reject_reason});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -933,56 +1022,6 @@ namespace consensus
|
||||
is_patch_desync = (sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH) != majority_patch_hash);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the ledger after reaching consensus.
|
||||
* @param cons_prop The proposal that reached consensus.
|
||||
* @param consensed_users Consensed users and their inputs.
|
||||
* @param patch_hash The patch hash.
|
||||
* @param lcl_id Last lcl seq_no and hash.
|
||||
* @return 0 on success. -1 on error.
|
||||
*/
|
||||
int update_ledger(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id)
|
||||
{
|
||||
if (ledger::save_ledger(cons_prop, consensed_users, ctx.generated_user_outputs) == -1)
|
||||
return -1;
|
||||
|
||||
new_lcl_id = ledger::ctx.get_lcl_id();
|
||||
const p2p::sequence_hash new_last_primary_shard_id = ledger::ctx.get_last_primary_shard_id();
|
||||
|
||||
LOG_INFO << "****Ledger created**** (lcl:" << new_lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")";
|
||||
|
||||
// Send back the inputs "accepted" responses to the user.
|
||||
if (dispatch_consensed_user_input_responses(consensed_users, new_lcl_id) == -1)
|
||||
return -1;
|
||||
|
||||
// Send any output from the previous consensus round to locally connected users.
|
||||
dispatch_user_outputs(cons_prop, new_lcl_id);
|
||||
|
||||
// Apply consensed patch file changes to the hpcore runtime and hp.cfg.
|
||||
if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1)
|
||||
return -1;
|
||||
|
||||
// After the current ledger seq no is updated, we remove any newly expired inputs from candidate set.
|
||||
{
|
||||
auto itr = ctx.candidate_user_inputs.begin();
|
||||
while (itr != ctx.candidate_user_inputs.end())
|
||||
{
|
||||
if (itr->second.max_ledger_seq_no <= new_lcl_id.seq_no)
|
||||
{
|
||||
// Erase the candidate input along with its data buffer in the input store.
|
||||
usr::input_store.purge(itr->second.input);
|
||||
ctx.candidate_user_inputs.erase(itr++);
|
||||
}
|
||||
else
|
||||
{
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the contract after consensus.
|
||||
* @param cons_prop The proposal that reached consensus.
|
||||
@@ -1015,14 +1054,16 @@ namespace consensus
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Cleanup the fed inputs and extract the generated outputs.
|
||||
cleanup_consensed_user_inputs(consensed_users);
|
||||
extract_user_outputs_from_contract_bufmap(args.userbufs);
|
||||
|
||||
// Get the new state hash after contract execution.
|
||||
const util::h32 &new_state_hash = args.post_execution_state_hash;
|
||||
|
||||
// Update state hash in contract fs global hash tracker.
|
||||
sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, new_state_hash);
|
||||
|
||||
extract_user_outputs_from_contract_bufmap(args.userbufs);
|
||||
|
||||
// Generate user output hash merkle tree and signature with state hash included.
|
||||
if (!ctx.generated_user_outputs.empty())
|
||||
{
|
||||
@@ -1043,95 +1084,81 @@ namespace consensus
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch acceptance status responses to consensed user inputs, if the recipients are connected to us locally.
|
||||
* @param consensed_users The map of consensed users and their inputs.
|
||||
* Dispatch acceptence status responses to consensed user inputs, if the recipients are connected to us locally.
|
||||
* @param consensed_users The map of consensed users containing their inputs.
|
||||
* @param lcl_id The ledger the inputs got included in.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id)
|
||||
void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id)
|
||||
{
|
||||
if (consensed_users.empty())
|
||||
return;
|
||||
|
||||
std::unordered_map<std::string, std::vector<usr::input_status_response>> responses;
|
||||
|
||||
for (const auto &[pubkey, inputs] : consensed_users)
|
||||
for (const auto &[pubkey, user] : consensed_users)
|
||||
{
|
||||
if (inputs.empty())
|
||||
if (user.consensed_inputs.empty())
|
||||
continue;
|
||||
|
||||
const auto [itr, success] = responses.emplace(pubkey, std::vector<usr::input_status_response>());
|
||||
|
||||
for (const consensed_user_input &ci : inputs)
|
||||
for (const consensed_user_input &ci : user.consensed_inputs)
|
||||
{
|
||||
// We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix.
|
||||
const std::string input_hash = std::string(util::get_string_suffix(ci.ordered_hash, BLAKE3_OUT_LEN));
|
||||
itr->second.push_back(usr::input_status_response{ci.protocol, input_hash, NULL});
|
||||
itr->second.push_back(usr::input_status_response{input_hash, NULL});
|
||||
}
|
||||
}
|
||||
|
||||
usr::send_input_status_responses(responses, lcl_id.seq_no, lcl_id.hash);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch any consensus-reached outputs to matching users if they are connected to us locally.
|
||||
* @param cons_prop The proposal that achieved consensus.
|
||||
* @param lcl_id Lcl sequnce no hash info.
|
||||
* @param consensed_users The map of consensed users containing their outputs.
|
||||
* @param lcl_id The ledger the outputs got included in.
|
||||
*/
|
||||
void dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id)
|
||||
void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id)
|
||||
{
|
||||
if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash())
|
||||
if (!consensed_users.empty())
|
||||
{
|
||||
std::scoped_lock<std::mutex> lock(usr::ctx.users_mutex);
|
||||
|
||||
// If final elected output hash matches our output hash, distribute the outputs
|
||||
// to locally connected users.
|
||||
for (auto &[hash, user_output] : ctx.generated_user_outputs)
|
||||
for (const auto &[pubkey, cu] : consensed_users)
|
||||
{
|
||||
if (cu.consensed_outputs.outputs.empty())
|
||||
continue;
|
||||
|
||||
// Find user to send by pubkey.
|
||||
const auto user_itr = usr::ctx.users.find(user_output.user_pubkey);
|
||||
const auto user_itr = usr::ctx.users.find(pubkey);
|
||||
if (user_itr != usr::ctx.users.end()) // match found
|
||||
{
|
||||
const usr::connected_user &user = user_itr->second;
|
||||
msg::usrmsg::usrmsg_parser parser(user.protocol);
|
||||
|
||||
// Get the collapsed hash tree with this user's output hash remaining independently.
|
||||
util::merkle_hash_node collapsed_hash_root = ctx.user_outputs_hashtree.collapse(cu.consensed_outputs.hash);
|
||||
|
||||
// Send the outputs and signatures to the user.
|
||||
std::vector<uint8_t> msg;
|
||||
|
||||
// Get the collapsed hash tree with this user's output hash remaining independently.
|
||||
util::merkle_hash_node collapsed_hash_root = ctx.user_outputs_hashtree.collapse(hash);
|
||||
|
||||
std::vector<std::string_view> outputs;
|
||||
for (const sc::contract_output &output : user_output.outputs)
|
||||
outputs.emplace_back(output.message);
|
||||
|
||||
parser.create_contract_output_container(msg, outputs, collapsed_hash_root, ctx.user_outputs_unl_sig, lcl_id.seq_no, lcl_id.hash.to_string_view());
|
||||
|
||||
parser.create_contract_output_container(msg, cu.consensed_outputs.hash, cu.consensed_outputs.outputs, collapsed_hash_root, ctx.user_outputs_unl_sig,
|
||||
lcl_id.seq_no, lcl_id.hash.to_string_view());
|
||||
user.session.send(msg);
|
||||
}
|
||||
|
||||
user_output.outputs.clear(); // We no longer need this user's outputs.
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_INFO << "Output required but didn't match our output hash.";
|
||||
}
|
||||
|
||||
// Clear the output hash tree and signature because we no longer need it.
|
||||
ctx.user_outputs_hashtree.clear();
|
||||
ctx.user_outputs_our_sig.clear();
|
||||
ctx.user_outputs_unl_sig.clear();
|
||||
ctx.generated_user_outputs.clear();
|
||||
cleanup_output_collections();
|
||||
}
|
||||
|
||||
/**
|
||||
* Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process.
|
||||
* @param bufmap The contract bufmap which needs to be populated with inputs.
|
||||
* @param consensed_users Set of consensed users keyed by user binary pubkey and any inputs.
|
||||
* @param consensed_users Set of consensed users keyed by user binary pubkey.
|
||||
*/
|
||||
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users)
|
||||
{
|
||||
for (const auto &[pubkey, inputs] : consensed_users)
|
||||
for (const auto &[pubkey, user] : consensed_users)
|
||||
{
|
||||
// Populate the buf map with user pubkey regardless of whether user has any inputs or not.
|
||||
// This is in case the contract wanted to emit some data to a user without needing any input.
|
||||
@@ -1139,7 +1166,7 @@ namespace consensus
|
||||
|
||||
// Populate the input contents into the bufmap.
|
||||
// It's VERY important that we preserve the original input order when feeding to the contract as well.
|
||||
for (const consensed_user_input &ci : inputs)
|
||||
for (const consensed_user_input &ci : user.consensed_inputs)
|
||||
itr->second.inputs.push_back(ci.input);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user