diff --git a/src/consensus.cpp b/src/consensus.cpp index 6127c752..32ec55e1 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -23,13 +23,7 @@ namespace p2pmsg = msg::fbuf::p2pmsg; namespace consensus { - - /** - * Voting thresholds for consensus stages. - */ - constexpr float STAGE1_THRESHOLD = 0.5; - constexpr float STAGE2_THRESHOLD = 0.65; - constexpr float STAGE3_THRESHOLD = 0.8; + constexpr float STAGE_THRESHOLDS[] = {0.5, 0.65, 0.8}; // Voting thresholds for stage 1,2,3 constexpr float MAJORITY_THRESHOLD = 0.8; constexpr size_t ROUND_NONCE_SIZE = 64; @@ -112,16 +106,14 @@ namespace consensus // arived ones and expired ones. revise_candidate_proposals(); - // If possible, switch back to proposer mode before stage processing. + // If possible, switch back to proposer mode before stage processing. (if we were syncing before) check_sync_completion(); // Get current lcl and state. std::string lcl = ledger::ctx.get_lcl(); const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); - const size_t unl_count = unl::count(); - std::string unl_hash = unl::get_hash(); hpfs::h32 state = state_common::ctx.get_state(); - vote_counter votes; + std::string unl_hash = unl::get_hash(); if (ctx.stage == 0) { @@ -130,53 +122,54 @@ namespace consensus if (verify_and_populate_candidate_user_inputs(lcl_seq_no) == -1) return -1; - const p2p::proposal new_round_prop = create_stage0_proposal(lcl, state, unl_hash); - broadcast_proposal(new_round_prop); + const p2p::proposal p = create_stage0_proposal(lcl, state, unl_hash); + broadcast_proposal(p); + + ctx.stage = 1; // Transition to next stage. } - else if (ctx.stage == 1) + else { - if (is_in_sync(lcl, unl_hash, unl_count, votes)) + // Stages 1,2,3 + + const size_t unl_count = unl::count(); + vote_counter votes; + const int sync_status = check_sync_status(lcl, unl_hash, unl_count, votes); + + if (sync_status == 0) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, unl_count, state, unl_hash); - broadcast_proposal(p); - } - } - else if (ctx.stage == 2) - { - if (is_in_sync(lcl, unl_hash, unl_count, votes)) - { - // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, unl_count, state, unl_hash); - broadcast_proposal(p); - } - - // During stage 2, broadcast non-unl proposal (NUP) containing inputs from locally connected users. - // This will be captured and verified during every round stage 0. - // (We broadcast this at stage 2 instead of 3 to give it enough time to reach others before next round stage 0) - broadcast_nonunl_proposal(); - } - else if (ctx.stage == 3) - { - if (is_in_sync(lcl, unl_hash, unl_count, votes)) - { - // If we are in sync, vote and get the final winning votes. - // This is the consensus proposal which makes it into the ledger and contract execution - const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state, unl_hash); + const p2p::proposal p = create_stage123_proposal(votes, lcl, unl_count, state, unl_hash); broadcast_proposal(p); - // Update the ledger and execute the contract using the consensus proposal. - if (update_ledger_and_execute_contract(p, lcl, state) == -1) + // Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal. + if (ctx.stage == 3 && update_ledger_and_execute_contract(p, lcl, state) == -1) LOG_ERROR << "Error occured in Stage 3 consensus execution."; } + + if (ctx.stage == 2) + { + // At end of stage 2, broadcast non-unl proposal (NUP) containing inputs from locally connected users. + // This will be captured and verified during every round stage 0. + // (We broadcast this at stage 2 in order to give it enough time to reach others before next round stage 0) + broadcast_nonunl_proposal(); + } + + // We have finished a consensus stage. Transition or reset stage based on sync status. + + if (sync_status == -2) + ctx.stage = 0; // Majority lcl unreliable. Reset to stage 0. + else + ctx.stage = (ctx.stage + 1) % 4; // Transition to next stage. (if at stage 3 go to next round stage 0) } - // We have finished a consensus stage. Transition to next stage. (if at stage 3 go to next round stage 0) - ctx.stage = (ctx.stage + 1) % 4; return 0; } - bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes) + /** + * Checks whether we are in sync with the received votes. + * @return 0 if we are in sync. -1 on lcl or state desync. -2 if majority lcl unreliable. + */ + int check_sync_status(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes) { // Check if we're ahead/behind of consensus lcl. bool is_lcl_desync = false; @@ -219,11 +212,15 @@ namespace consensus if (!is_lcl_desync && !is_state_desync && !is_unl_desync) { conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); - return true; + return 0; } + + // lcl or state desync. + return -1; } - return false; + // Majority lcl couldn't be detected reliably. + return -2; } /** @@ -374,6 +371,28 @@ namespace consensus << " users:" << nup.user_inputs.size(); } + /** + * Broadcasts the given proposal to all connected peers if in PROPOSER mode. Does not send in OBSERVER mode. + * @return 0 on success. -1 if no peers to broadcast. + */ + void broadcast_proposal(const p2p::proposal &p) + { + // In observer mode, we do not send out proposals. + if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER || !conf::cfg.is_unl) // If we are a non-unl node, do not broadcast proposals. + return; + + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_proposal(fbuf, p); + p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public); + + LOG_DEBUG << "Proposed u/i/o:" << p.users.size() + << "/" << p.hash_inputs.size() + << "/" << p.hash_outputs.size() + << " ts:" << std::to_string(p.time) + << " lcl:" << p.lcl.substr(0, 15) + << " state:" << p.state; + } + /** * Enqueue npl messages to the npl messages queue. * @param npl_msg Constructed npl message. @@ -542,7 +561,7 @@ namespace consensus return p; } - p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash) + p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash) { // The proposal to be emited at the end of this stage. p2p::proposal p; @@ -595,7 +614,7 @@ namespace consensus increment(votes.unl_removals, pubkey); } - uint32_t required_votes = ceil(vote_threshold * unl_count); + uint32_t required_votes = ceil(STAGE_THRESHOLDS[ctx.stage - 1] * unl_count); // todo: check if inputs being proposed by another node are actually spoofed inputs // from a user locally connected to this node. @@ -664,28 +683,6 @@ namespace consensus return p; } - /** - * Broadcasts the given proposal to all connected peers if in PROPOSER mode. Does not send in OBSERVER mode. - * @return 0 on success. -1 if no peers to broadcast. - */ - void broadcast_proposal(const p2p::proposal &p) - { - // In observer mode, we do not send out proposals. - if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER || !conf::cfg.is_unl) // If we are a non-unl node, do not broadcast proposals. - return; - - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_proposal(fbuf, p); - p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public); - - LOG_DEBUG << "Proposed u/i/o:" << p.users.size() - << "/" << p.hash_inputs.size() - << "/" << p.hash_outputs.size() - << " ts:" << std::to_string(p.time) - << " lcl:" << p.lcl.substr(0, 15) - << " state:" << p.state; - } - /** * Check whether our lcl is consistent with the proposals being made by our UNL peers lcl votes. * @param is_desync Indicates whether our lcl is out-of-sync with majority lcl. Only valid if this method returns True. @@ -708,7 +705,7 @@ namespace consensus const uint32_t min_required = ceil(MAJORITY_THRESHOLD * unl_count); if (total_lcl_votes < min_required) { - LOG_DEBUG << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required; + LOG_INFO << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required; return false; } @@ -736,7 +733,7 @@ namespace consensus const uint32_t min_wins_required = ceil(MAJORITY_THRESHOLD * ctx.candidate_proposals.size()); if (winning_votes < min_wins_required) { - LOG_DEBUG << "No consensus on lcl. Possible fork condition. won:" << winning_votes << " needed:" << min_wins_required; + LOG_INFO << "No consensus on lcl. Possible fork condition. won:" << winning_votes << " needed:" << min_wins_required; return false; } else diff --git a/src/consensus.hpp b/src/consensus.hpp index 8620427b..756e31f2 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -103,7 +103,7 @@ namespace consensus int consensus(); - bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes); + int check_sync_status(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes); void check_sync_completion(); @@ -119,7 +119,7 @@ namespace consensus p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state, std::string_view unl_hash); - p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash); + p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash); void broadcast_proposal(const p2p::proposal &p); diff --git a/src/ledger.cpp b/src/ledger.cpp index 9c82d514..b9803f47 100644 --- a/src/ledger.cpp +++ b/src/ledger.cpp @@ -792,11 +792,12 @@ namespace ledger } auto &[cache_seq_no, cache_lcl] = get_ledger_cache_top(); + ctx.set_lcl(cache_seq_no, cache_lcl); // Comparing the sequence number and the lcl to validate the joining point. - if ((history_itr->first - cache_seq_no != 1) && (history_first_proposal.lcl != cache_lcl)) + if ((history_itr->first - cache_seq_no != 1) || (history_first_proposal.lcl != cache_lcl)) { - LOG_ERROR << "lcl sync: Ledger integrity check at history joining point failed"; + LOG_ERROR << "lcl sync: Ledger integrity check at history joining point failed."; return -1; } }