Fixed issue in ledger sync top ledger removal. (#193)

* History response ledger removal fix.
* Consensus code refactor.
This commit is contained in:
Ravin Perera
2020-12-14 12:35:07 +05:30
committed by GitHub
parent 17313dae29
commit bfbf41fb32
3 changed files with 75 additions and 77 deletions

View File

@@ -23,13 +23,7 @@ namespace p2pmsg = msg::fbuf::p2pmsg;
namespace consensus
{
/**
* Voting thresholds for consensus stages.
*/
constexpr float STAGE1_THRESHOLD = 0.5;
constexpr float STAGE2_THRESHOLD = 0.65;
constexpr float STAGE3_THRESHOLD = 0.8;
constexpr float STAGE_THRESHOLDS[] = {0.5, 0.65, 0.8}; // Voting thresholds for stage 1,2,3
constexpr float MAJORITY_THRESHOLD = 0.8;
constexpr size_t ROUND_NONCE_SIZE = 64;
@@ -112,16 +106,14 @@ namespace consensus
// arived ones and expired ones.
revise_candidate_proposals();
// If possible, switch back to proposer mode before stage processing.
// If possible, switch back to proposer mode before stage processing. (if we were syncing before)
check_sync_completion();
// Get current lcl and state.
std::string lcl = ledger::ctx.get_lcl();
const uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
const size_t unl_count = unl::count();
std::string unl_hash = unl::get_hash();
hpfs::h32 state = state_common::ctx.get_state();
vote_counter votes;
std::string unl_hash = unl::get_hash();
if (ctx.stage == 0)
{
@@ -130,53 +122,54 @@ namespace consensus
if (verify_and_populate_candidate_user_inputs(lcl_seq_no) == -1)
return -1;
const p2p::proposal new_round_prop = create_stage0_proposal(lcl, state, unl_hash);
broadcast_proposal(new_round_prop);
const p2p::proposal p = create_stage0_proposal(lcl, state, unl_hash);
broadcast_proposal(p);
ctx.stage = 1; // Transition to next stage.
}
else if (ctx.stage == 1)
else
{
if (is_in_sync(lcl, unl_hash, unl_count, votes))
// Stages 1,2,3
const size_t unl_count = unl::count();
vote_counter votes;
const int sync_status = check_sync_status(lcl, unl_hash, unl_count, votes);
if (sync_status == 0)
{
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, unl_count, state, unl_hash);
broadcast_proposal(p);
}
}
else if (ctx.stage == 2)
{
if (is_in_sync(lcl, unl_hash, unl_count, votes))
{
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, unl_count, state, unl_hash);
broadcast_proposal(p);
}
// During stage 2, broadcast non-unl proposal (NUP) containing inputs from locally connected users.
// This will be captured and verified during every round stage 0.
// (We broadcast this at stage 2 instead of 3 to give it enough time to reach others before next round stage 0)
broadcast_nonunl_proposal();
}
else if (ctx.stage == 3)
{
if (is_in_sync(lcl, unl_hash, unl_count, votes))
{
// If we are in sync, vote and get the final winning votes.
// This is the consensus proposal which makes it into the ledger and contract execution
const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state, unl_hash);
const p2p::proposal p = create_stage123_proposal(votes, lcl, unl_count, state, unl_hash);
broadcast_proposal(p);
// Update the ledger and execute the contract using the consensus proposal.
if (update_ledger_and_execute_contract(p, lcl, state) == -1)
// Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal.
if (ctx.stage == 3 && update_ledger_and_execute_contract(p, lcl, state) == -1)
LOG_ERROR << "Error occured in Stage 3 consensus execution.";
}
if (ctx.stage == 2)
{
// At end of stage 2, broadcast non-unl proposal (NUP) containing inputs from locally connected users.
// This will be captured and verified during every round stage 0.
// (We broadcast this at stage 2 in order to give it enough time to reach others before next round stage 0)
broadcast_nonunl_proposal();
}
// We have finished a consensus stage. Transition or reset stage based on sync status.
if (sync_status == -2)
ctx.stage = 0; // Majority lcl unreliable. Reset to stage 0.
else
ctx.stage = (ctx.stage + 1) % 4; // Transition to next stage. (if at stage 3 go to next round stage 0)
}
// We have finished a consensus stage. Transition to next stage. (if at stage 3 go to next round stage 0)
ctx.stage = (ctx.stage + 1) % 4;
return 0;
}
bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes)
/**
* Checks whether we are in sync with the received votes.
* @return 0 if we are in sync. -1 on lcl or state desync. -2 if majority lcl unreliable.
*/
int check_sync_status(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes)
{
// Check if we're ahead/behind of consensus lcl.
bool is_lcl_desync = false;
@@ -219,11 +212,15 @@ namespace consensus
if (!is_lcl_desync && !is_state_desync && !is_unl_desync)
{
conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER);
return true;
return 0;
}
// lcl or state desync.
return -1;
}
return false;
// Majority lcl couldn't be detected reliably.
return -2;
}
/**
@@ -374,6 +371,28 @@ namespace consensus
<< " users:" << nup.user_inputs.size();
}
/**
* Broadcasts the given proposal to all connected peers if in PROPOSER mode. Does not send in OBSERVER mode.
* @return 0 on success. -1 if no peers to broadcast.
*/
void broadcast_proposal(const p2p::proposal &p)
{
// In observer mode, we do not send out proposals.
if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER || !conf::cfg.is_unl) // If we are a non-unl node, do not broadcast proposals.
return;
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_proposal(fbuf, p);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public);
LOG_DEBUG << "Proposed <s" << std::to_string(p.stage) << "> u/i/o:" << p.users.size()
<< "/" << p.hash_inputs.size()
<< "/" << p.hash_outputs.size()
<< " ts:" << std::to_string(p.time)
<< " lcl:" << p.lcl.substr(0, 15)
<< " state:" << p.state;
}
/**
* Enqueue npl messages to the npl messages queue.
* @param npl_msg Constructed npl message.
@@ -542,7 +561,7 @@ namespace consensus
return p;
}
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash)
p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash)
{
// The proposal to be emited at the end of this stage.
p2p::proposal p;
@@ -595,7 +614,7 @@ namespace consensus
increment(votes.unl_removals, pubkey);
}
uint32_t required_votes = ceil(vote_threshold * unl_count);
uint32_t required_votes = ceil(STAGE_THRESHOLDS[ctx.stage - 1] * unl_count);
// todo: check if inputs being proposed by another node are actually spoofed inputs
// from a user locally connected to this node.
@@ -664,28 +683,6 @@ namespace consensus
return p;
}
/**
* Broadcasts the given proposal to all connected peers if in PROPOSER mode. Does not send in OBSERVER mode.
* @return 0 on success. -1 if no peers to broadcast.
*/
void broadcast_proposal(const p2p::proposal &p)
{
// In observer mode, we do not send out proposals.
if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER || !conf::cfg.is_unl) // If we are a non-unl node, do not broadcast proposals.
return;
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_proposal(fbuf, p);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public);
LOG_DEBUG << "Proposed <s" << std::to_string(p.stage) << "> u/i/o:" << p.users.size()
<< "/" << p.hash_inputs.size()
<< "/" << p.hash_outputs.size()
<< " ts:" << std::to_string(p.time)
<< " lcl:" << p.lcl.substr(0, 15)
<< " state:" << p.state;
}
/**
* Check whether our lcl is consistent with the proposals being made by our UNL peers lcl votes.
* @param is_desync Indicates whether our lcl is out-of-sync with majority lcl. Only valid if this method returns True.
@@ -708,7 +705,7 @@ namespace consensus
const uint32_t min_required = ceil(MAJORITY_THRESHOLD * unl_count);
if (total_lcl_votes < min_required)
{
LOG_DEBUG << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required;
LOG_INFO << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required;
return false;
}
@@ -736,7 +733,7 @@ namespace consensus
const uint32_t min_wins_required = ceil(MAJORITY_THRESHOLD * ctx.candidate_proposals.size());
if (winning_votes < min_wins_required)
{
LOG_DEBUG << "No consensus on lcl. Possible fork condition. won:" << winning_votes << " needed:" << min_wins_required;
LOG_INFO << "No consensus on lcl. Possible fork condition. won:" << winning_votes << " needed:" << min_wins_required;
return false;
}
else

View File

@@ -103,7 +103,7 @@ namespace consensus
int consensus();
bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes);
int check_sync_status(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes);
void check_sync_completion();
@@ -119,7 +119,7 @@ namespace consensus
p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state, std::string_view unl_hash);
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash);
p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash);
void broadcast_proposal(const p2p::proposal &p);

View File

@@ -792,11 +792,12 @@ namespace ledger
}
auto &[cache_seq_no, cache_lcl] = get_ledger_cache_top();
ctx.set_lcl(cache_seq_no, cache_lcl);
// Comparing the sequence number and the lcl to validate the joining point.
if ((history_itr->first - cache_seq_no != 1) && (history_first_proposal.lcl != cache_lcl))
if ((history_itr->first - cache_seq_no != 1) || (history_first_proposal.lcl != cache_lcl))
{
LOG_ERROR << "lcl sync: Ledger integrity check at history joining point failed";
LOG_ERROR << "lcl sync: Ledger integrity check at history joining point failed.";
return -1;
}
}