Ledger close refactor. (#330)

This commit is contained in:
Ravin Perera
2021-07-14 10:09:13 +05:30
committed by GitHub
parent d315f9c316
commit 7ba84e8e7a
12 changed files with 177 additions and 77 deletions

View File

@@ -105,6 +105,8 @@ namespace consensus
int consensus()
{
// A consensus round consists of 4 stages (0,1,2,3).
// In stage 0 we perform closing of ledger based on previous round stage 3 votes. In stages 1,2,3
// we progressively narrow down the votes based on increasing majority thresholds.
// For a given stage, this function may get visited multiple times due to time-wait conditions.
if (!wait_and_proceed_stage())
@@ -116,6 +118,10 @@ namespace consensus
// arived ones and expired ones.
revise_candidate_proposals(ctx.vote_status == VOTES_SYNCED);
// Attempt to close the ledger after scanning last round stage 3 proposals.
if (ctx.stage == 0)
attempt_ledger_close();
// Get current lcl, state, patch, primary shard and raw shard info.
util::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
util::h32 state_hash = sc::contract_fs.get_parent_hash(sc::STATE_DIR_PATH);
@@ -154,7 +160,7 @@ namespace consensus
{
int new_sync_status = check_sync_status(unl_count, votes, lcl_id);
if (ctx.vote_status != VOTES_SYNCED && new_sync_status == VOTES_UNRELIABLE)
if (ctx.vote_status != VOTES_SYNCED && new_sync_status == VOTES_SYNCED)
{
// If we are just becoming 'in-sync' after being out-of-sync, check the vote status again after the proper
// pruning of candidate proposals. This is because we relax the proposal pruning rules when we are not in sync,
@@ -210,7 +216,7 @@ namespace consensus
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
broadcast_proposal(p);
// This marks the moment we finish a sync cycle. We are in stage 1 and we ditect that our votes are in sync.
// This marks the moment we finish a sync cycle. We are in stage 1 and we detect that our votes are in sync.
if (ctx.stage == 1 && ctx.sync_ongoing)
{
// Clear any sync recovery pending state if we enter stage 1 while being in sync.
@@ -218,21 +224,6 @@ namespace consensus
status::sync_status_changed(true);
LOG_DEBUG << "Sync recovery completed.";
}
else if (ctx.stage == 3)
{
// Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal.
consensed_user_map consensed_users;
if (prepare_consensed_users(consensed_users, p) == -1 ||
commit_consensus_results(p, consensed_users, patch_hash) == -1)
{
LOG_ERROR << "Error occured in Stage 3 consensus execution.";
// Cleanup obsolete information before next round starts.
cleanup_output_collections();
cleanup_consensed_user_inputs(consensed_users);
}
}
}
// We have finished a consensus stage.
@@ -243,13 +234,76 @@ namespace consensus
return 0;
}
/**
* Scan the stage 3 proposals from last round and create the ledger if all majority critertia are met.
*/
void attempt_ledger_close()
{
std::map<util::h32, uint32_t> hash_votes; // Votes on the proposal hash.
util::h32 self_hash = util::h32_empty;
util::h32 majority_hash = util::h32_empty;
// Find the stage 3 proposal that we have made.
const auto itr = ctx.candidate_proposals.find(conf::cfg.node.public_key);
if (itr == ctx.candidate_proposals.end() || itr->second.stage != 3)
{
LOG_DEBUG << "We haven't proposed to close any ledger.";
return;
}
const p2p::proposal self_prop = itr->second;
// Count votes of all stage 3 proposal hashes.
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
if (cp.stage == 3)
increment(hash_votes, cp.root_hash);
}
// Find the winning hash and no. of votes for it.
uint32_t winning_votes = 0;
for (const auto [hash, votes] : hash_votes)
{
if (votes > winning_votes)
{
winning_votes = votes;
majority_hash = hash;
}
}
const uint32_t min_votes_required = ceil(MAJORITY_THRESHOLD * unl::count());
if (winning_votes < min_votes_required)
{
LOG_INFO << "Cannot close ledger. Possible fork condition. won:" << winning_votes << " needed:" << min_votes_required;
return;
}
if (self_prop.root_hash != majority_hash)
{
LOG_INFO << "Cannot close ledger. Our proposal:" << self_prop.root_hash << " does not match with majority:" << majority_hash;
return;
}
LOG_DEBUG << "Closing ledger with proposal:" << self_prop.root_hash;
// Upon successful ledger close condition, update the ledger and execute the contract using the consensus proposal.
consensed_user_map consensed_users;
if (prepare_consensed_users(consensed_users, self_prop) == -1 ||
commit_consensus_results(self_prop, consensed_users) == -1)
{
LOG_ERROR << "Error occured when closing ledger";
// Cleanup obsolete information before next round starts.
cleanup_output_collections();
cleanup_consensed_user_inputs(consensed_users);
}
}
/**
* Performs the consensus finalalization activities with the provided consensused information.
* @param cons_prop The proposal which reached consensus.
* @param consensed_users Set of consensed users and their consensed inputs and outputs.
* @param patch_hash The current config patch hash.
*/
int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash)
int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users)
{
// Creating a ledger while sync ongoing happens when we discover that our ledger votes are in sync at stage 2 or 3. At this point,
// we can create the ledger with majority votes. However we dont't have the raw contract outputs we should have had in the previous ledger
@@ -278,6 +332,7 @@ namespace consensus
dispatch_consensed_user_outputs(consensed_users, lcl_id);
// Apply consensed config patch file changes to the hpcore runtime and hp.cfg.
const util::h32 patch_hash = sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH);
if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1)
return -1;
@@ -436,21 +491,41 @@ namespace consensus
{
const p2p::proposal &cp = itr->second;
// If we are in sync, only consider this round's proposals which are from current or previous stage.
// Otherwise consider all proposals as long as they are from the same round.
const bool stage_valid = in_sync ? (ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1) : true;
const bool keep_candidate = (ctx.round_start_time == cp.time) && stage_valid;
LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased")
<< " [s" << std::to_string(cp.stage)
<< "] u/i:" << cp.users.size()
<< "/" << cp.input_ordered_hashes.size()
<< " ts:" << cp.time
<< " state:" << cp.state_hash
<< " patch:" << cp.patch_hash
<< " lps:" << cp.last_primary_shard_id
<< " lrs:" << cp.last_raw_shard_id
<< " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]"
<< "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)";
// Drop all proposals which are older than previous round.
// If we are not in sync, consider all remaining proposals.
// If we are in sync, consider proposals from previous stage only.
bool keep_candidate = false;
if (ctx.round_start_time >= cp.time && (ctx.round_start_time - cp.time) <= conf::cfg.contract.roundtime)
{
if (!in_sync)
keep_candidate = true;
else
keep_candidate = ctx.round_start_time == cp.time
? ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1 // This round previous stage.
: ctx.stage == 0 && cp.stage == 3; // Previous round stage 3.
}
if (keep_candidate)
{
LOG_DEBUG << "[s" << std::to_string(cp.stage)
<< "-" << cp.root_hash
<< "] u/i/t:" << cp.users.size()
<< "/" << cp.input_ordered_hashes.size()
<< "/" << cp.time
<< " s:" << cp.state_hash
<< " p:" << cp.patch_hash
<< " ps:" << cp.last_primary_shard_id
<< " rs:" << cp.last_raw_shard_id
<< " [frm:" << (cp.from_self ? "self" : util::to_hex(cp.pubkey).substr(2, 8))
<< "<" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms]";
}
else
{
LOG_DEBUG << "Erased [s" << std::to_string(cp.stage)
<< "-" << cp.root_hash
<< "] [frm:" << (cp.from_self ? "self" : util::to_hex(cp.pubkey).substr(2, 8)) << "]";
}
if (keep_candidate)
++itr;
@@ -616,7 +691,9 @@ namespace consensus
}
else
{
const uint64_t stage_start = ctx.round_start_time + (ctx.stage * ctx.stage_time);
// Stage start time is calculated based on the duration each stage gets. Stages 1,2,3 gets equal duration as configured
// by stage slice. Stage 0 gets entire remaining time out of the round window.
const uint64_t stage_start = ctx.round_start_time + ctx.stage0_duration + ((ctx.stage - 1) * ctx.stage123_duration);
// Compute stage time wait.
// Node wait between stages to collect enough proposals from previous stages from other nodes.
@@ -688,13 +765,14 @@ namespace consensus
p2pmsg::create_msg_from_proposal(fbuf, p);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_consensus_public, 1); // Use high priority send.
LOG_DEBUG << "Proposed <s" << std::to_string(p.stage) << "> u/i:" << p.users.size()
LOG_DEBUG << "Proposed-s" << std::to_string(p.stage)
<< " u/i/t:" << p.users.size()
<< "/" << p.input_ordered_hashes.size()
<< " ts:" << p.time
<< " state:" << p.state_hash
<< " patch:" << p.patch_hash
<< " lps:" << p.last_primary_shard_id
<< " lrs:" << p.last_raw_shard_id;
<< "/" << p.time
<< " s:" << p.state_hash
<< " p:" << p.patch_hash
<< " ps:" << p.last_primary_shard_id
<< " rs:" << p.last_raw_shard_id;
}
/**
@@ -980,10 +1058,10 @@ namespace consensus
}
// Check whether we have received enough votes in total.
const uint32_t min_required = ceil(MAJORITY_THRESHOLD * unl_count);
if (total_ledger_primary_hash_votes < min_required)
const uint32_t min_votes_required = ceil(MAJORITY_THRESHOLD * unl_count);
if (total_ledger_primary_hash_votes < min_votes_required)
{
LOG_INFO << "Not enough peers proposing to perform consensus. votes:" << total_ledger_primary_hash_votes << " needed:" << min_required;
LOG_INFO << "Not enough peers proposing to perform consensus. votes:" << total_ledger_primary_hash_votes << " needed:" << min_votes_required;
return false;
}
@@ -997,10 +1075,9 @@ namespace consensus
}
}
const uint32_t min_wins_required = ceil(MAJORITY_THRESHOLD * ctx.candidate_proposals.size());
if (winning_votes < min_wins_required)
if (winning_votes < 2) // min_votes_required
{
LOG_INFO << "No consensus on last shard hash. Possible fork condition. won:" << winning_votes << " needed:" << min_wins_required;
LOG_INFO << "No consensus on last shard hash. Possible fork condition. won:" << winning_votes << " needed:" << min_votes_required;
return false;
}
else if (ledger::ctx.get_last_primary_shard_id() != majority_primary_shard_id)
@@ -1375,8 +1452,9 @@ namespace consensus
conf::cfg.contract.stage_slice = majority_time_config - (conf::cfg.contract.roundtime * 100);
}
// We allocate configured stage slice for stages 0, 1, 2. Stage 3 gets the entire remaining time from the round window.
ctx.stage_time = conf::cfg.contract.roundtime * conf::cfg.contract.stage_slice / 100;
// We allocate configured stage slice for stages 1, 2, 3. Stage 0 gets the entire remaining time from the round window.
ctx.stage123_duration = conf::cfg.contract.roundtime * conf::cfg.contract.stage_slice / 100;
ctx.stage0_duration = conf::cfg.contract.roundtime - (ctx.stage123_duration * 3);
ctx.stage_reset_wait_threshold = conf::cfg.contract.roundtime / 10;
// We use a time window boundry offset based on contract id to vary the window boundries between