diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 0d5a64ce..e376d7e4 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -22,8 +22,8 @@ namespace cons { /** - * Voting thresholds for consensus stages. - */ + * Voting thresholds for consensus stages. + */ constexpr float STAGE1_THRESHOLD = 0.5; constexpr float STAGE2_THRESHOLD = 0.65; constexpr float STAGE3_THRESHOLD = 0.8; @@ -33,6 +33,11 @@ namespace cons bool init_success = false; + bool is_shutting_down = false; + + // Consensus processing thread. + std::thread consensus_thread; + int init() { //load lcl details from lcl history. @@ -58,28 +63,56 @@ namespace cons ctx.contract_ctx.args.state_dir = conf::ctx.state_rw_dir; ctx.contract_ctx.args.readonly = false; + // Starting consensus processing thread. + consensus_thread = std::thread(cons::run_consensus); + init_success = true; return 0; } /** - * Cleanup any resources. - */ + * Cleanup any resources. + */ void deinit() { - // Stop the contract if running. - sc::stop(ctx.contract_ctx); + if (init_success) + { + // Making the consensus while loop stop. + is_shutting_down = true; + + // Stop the contract if running. + sc::stop(ctx.contract_ctx); + + // Joining consensus processing thread. + if (consensus_thread.joinable()) + consensus_thread.join(); + } } - int run_consensus() + /** + * Joins the consensus processing thread. + */ + void wait() { - while (true) + consensus_thread.join(); + } + + void run_consensus() + { + util::mask_signal(); + + LOG_INFO << "Consensus processor started."; + + while (!is_shutting_down) { if (consensus() == -1) - return -1; + { + LOG_ERR << "Consensus thread exited due to an error."; + break; + } } - return 0; + LOG_INFO << "Consensus processor stopped."; } int consensus() diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 8aac2272..789153e4 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -12,155 +12,156 @@ namespace cons { - -/** - * Represents a contract input that takes part in consensus. - */ -struct candidate_user_input -{ - const std::string userpubkey; - const uint64_t maxledgerseqno = 0; - std::string input; - - candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno) - : userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno) + /** + * Represents a contract input that takes part in consensus. + */ + struct candidate_user_input { - } -}; + const std::string userpubkey; + const uint64_t maxledgerseqno = 0; + std::string input; -/** - * Represents a contract output that takes part in consensus. - */ -struct candidate_user_output -{ - const std::string userpubkey; - std::string output; + candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno) + : userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno) + { + } + }; - candidate_user_output(const std::string userpubkey, const std::string output) - : userpubkey(std::move(userpubkey)), output(std::move(output)) + /** + * Represents a contract output that takes part in consensus. + */ + struct candidate_user_output { - } -}; + const std::string userpubkey; + std::string output; -/** - * This is used to store consensus information - */ -struct consensus_context -{ - // The map of proposals that are being collected as consensus stages are progressing. - // peer public key is the key. - // todo: having a queue of proposals against peer pubkey. - std::unordered_map candidate_proposals; + candidate_user_output(const std::string userpubkey, const std::string output) + : userpubkey(std::move(userpubkey)), output(std::move(output)) + { + } + }; - // The set of npl messages that are being collected as consensus stages are progressing. - std::list candidate_npl_messages; - - // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. - std::unordered_set candidate_users; - - // Map of candidate user inputs with input hash as map key. Inputs will stay here until they - // achieve consensus or expire (due to maxledgerseqno). Input hash is globally unique among inputs - // from all users. We will use this map to feed inputs into the contract once consensus is achieved. - std::unordered_map candidate_user_inputs; - - // Map of outputs generated by the contract with output hash is the map key. Outputs will stay - // here until the end of the current consensus round. Output hash is globally unique among outputs for - // all users. We will use this map to distribute outputs back to connected users once consensus is achieved. - std::unordered_map candidate_user_outputs; - - util::rollover_hashset recent_userinput_hashes; - - uint8_t stage = 0; - uint64_t time_now = 0; - std::string lcl; - uint64_t led_seq_no = 0; - hpfs::h32 state = hpfs::h32_empty; - - //Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key. - //contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE. - //this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range. - //We will use this to track lcls related logic.- track state, lcl request, response. - std::map ledger_cache; - std::string last_requested_lcl; - - //ledger close time of previous hash - uint16_t stage_time = 0; // Time allocated to a consensus stage. - uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. - - std::mutex state_sync_lock; - sc::execution_context contract_ctx; - bool is_shutting_down = false; - - consensus_context() - : recent_userinput_hashes(200) + /** + * This is used to store consensus information + */ + struct consensus_context { - } -}; + // The map of proposals that are being collected as consensus stages are progressing. + // peer public key is the key. + // todo: having a queue of proposals against peer pubkey. + std::unordered_map candidate_proposals; -struct vote_counter -{ - std::map time; - std::map lcl; - std::map users; - std::map inputs; - std::map outputs; - std::map state; -}; + // The set of npl messages that are being collected as consensus stages are progressing. + std::list candidate_npl_messages; -extern consensus_context ctx; + // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. + std::unordered_set candidate_users; -int init(); + // Map of candidate user inputs with input hash as map key. Inputs will stay here until they + // achieve consensus or expire (due to maxledgerseqno). Input hash is globally unique among inputs + // from all users. We will use this map to feed inputs into the contract once consensus is achieved. + std::unordered_map candidate_user_inputs; -void deinit(); + // Map of outputs generated by the contract with output hash is the map key. Outputs will stay + // here until the end of the current consensus round. Output hash is globally unique among outputs for + // all users. We will use this map to distribute outputs back to connected users once consensus is achieved. + std::unordered_map candidate_user_outputs; -int run_consensus(); + util::rollover_hashset recent_userinput_hashes; -int consensus(); + uint8_t stage = 0; + uint64_t time_now = 0; + std::string lcl; + uint64_t led_seq_no = 0; + hpfs::h32 state = hpfs::h32_empty; -void purify_candidate_proposals(); + //Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key. + //contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE. + //this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range. + //We will use this to track lcls related logic.- track state, lcl request, response. + std::map ledger_cache; + std::string last_requested_lcl; -bool wait_and_proceed_stage(uint64_t &stage_start); + //ledger close time of previous hash + uint16_t stage_time = 0; // Time allocated to a consensus stage. + uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. -void broadcast_nonunl_proposal(); + std::mutex state_sync_lock; + sc::execution_context contract_ctx; + bool is_shutting_down = false; -void verify_and_populate_candidate_user_inputs(); + consensus_context() + : recent_userinput_hashes(200) + { + } + }; -bool verify_appbill_check(std::string_view pubkey, const size_t input_len); + struct vote_counter + { + std::map time; + std::map lcl; + std::map users; + std::map inputs; + std::map outputs; + std::map state; + }; -p2p::proposal create_stage0_proposal(); + extern consensus_context ctx; -p2p::proposal create_stage123_proposal(vote_counter &votes); + int init(); -void broadcast_proposal(const p2p::proposal &p); + void deinit(); -void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); + void wait(); -void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); + void run_consensus(); -float_t get_stage_threshold(const uint8_t stage); + int consensus(); -void timewait_stage(const bool reset, const uint64_t time); + void purify_candidate_proposals(); -uint64_t get_ledger_time_resolution(const uint64_t time); + bool wait_and_proceed_stage(uint64_t &stage_start); -uint64_t get_stage_time_resolution(const uint64_t time); + void broadcast_nonunl_proposal(); -int apply_ledger(const p2p::proposal &proposal); + void verify_and_populate_candidate_user_inputs(); -void dispatch_user_outputs(const p2p::proposal &cons_prop); + bool verify_appbill_check(std::string_view pubkey, const size_t input_len); -void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); + p2p::proposal create_stage0_proposal(); -void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); + p2p::proposal create_stage123_proposal(vote_counter &votes); -void broadcast_npl_output(std::string &output); + void broadcast_proposal(const p2p::proposal &p); -template -void increment(std::map &counter, const T &candidate); + void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); -int get_initial_state_hash(hpfs::h32 &hash); + void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); -void on_state_sync_completion(const hpfs::h32 new_state); + float_t get_stage_threshold(const uint8_t stage); + + void timewait_stage(const bool reset, const uint64_t time); + + uint64_t get_ledger_time_resolution(const uint64_t time); + + uint64_t get_stage_time_resolution(const uint64_t time); + + int apply_ledger(const p2p::proposal &proposal); + + void dispatch_user_outputs(const p2p::proposal &cons_prop); + + void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); + + void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); + + void broadcast_npl_output(std::string &output); + + template + void increment(std::map &counter, const T &candidate); + + int get_initial_state_hash(hpfs::h32 &hash); + + void on_state_sync_completion(const hpfs::h32 new_state); } // namespace cons diff --git a/src/main.cpp b/src/main.cpp index 1831a375..1bc7befa 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -201,13 +201,11 @@ int main(int argc, char **argv) // After initializing primary subsystems, register the SIGINT handler. signal(SIGINT, &sigint_handler); - if (cons::run_consensus() == -1) - { - LOG_ERR << "Error occured in consensus."; - deinit(); - return -1; - } + // Wait until consensus thread finishes. + cons::wait(); + // deinit() here only gets called when there is an error in consensus. + // If not deinit in the sigint handler is called when a SIGINT is received. deinit(); } } diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 6ade8ca6..a5b8eda1 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -153,9 +153,8 @@ namespace p2p { if (ctx.peer_connections.size() == 0) { - LOG_DBG << "No peers to broadcast (not even self). Waiting until at least one peer connects."; - while (ctx.peer_connections.size() == 0) - util::sleep(100); + LOG_DBG << "No peers to broadcast (not even self). Cannot broadcast."; + return; } //Broadcast while locking the peer_connections.