From b506b34b4f03a0965fb27c3977317839455eb657 Mon Sep 17 00:00:00 2001 From: Asanka Indrajith Date: Fri, 6 Dec 2019 05:08:51 -0500 Subject: [PATCH] Consensus reliability enhancements (#62) * Implemented going observer mode, fixed genesis lcl retrieval issue and stage closing time. * Fixed clearing all user output instead of consensed outputs * Modified waiting time to improve performance. * Fixed deadlock of waiting for insufficient peers because of recent changes. * Removed initial waiting time for peer connections to start consensus. --- src/conf.cpp | 25 +++++++----- src/conf.hpp | 8 ++-- src/cons/cons.cpp | 80 ++++++++++++++++++++++++------------- src/cons/cons.hpp | 4 +- src/cons/ledger_handler.cpp | 12 ++++-- src/main.cpp | 5 +-- 6 files changed, 85 insertions(+), 49 deletions(-) diff --git a/src/conf.cpp b/src/conf.cpp index ed46ca94..6e009686 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -14,8 +14,8 @@ contract_ctx ctx; // Global configuration struct exposed to the application. contract_config cfg; -const static char *MODE_PASSIVE = "passive"; -const static char *MODE_ACTIVE = "active"; +const static char *MODE_OBSERVING = "observing"; +const static char *MODE_PROPOSING = "proposing"; /** * Loads and initializes the contract config for execution. Must be called once during application startup. @@ -31,7 +31,7 @@ int init() if (validate_contract_dir_paths() != 0 || load_config() != 0 || validate_config() != 0) return -1; - if (cfg.mode == OPERATING_MODE::ACTIVE) + if (cfg.mode == OPERATING_MODE::PROPOSING) { // Append self peer to peer list. const std::string portstr = std::to_string(cfg.peerport); @@ -91,7 +91,7 @@ int create_contract() crypto::generate_signing_keys(cfg.pubkey, cfg.seckey); binpair_to_hex(); - cfg.mode = OPERATING_MODE::ACTIVE; + cfg.mode = OPERATING_MODE::PROPOSING; cfg.listenip = "0.0.0.0"; cfg.peerport = 22860; cfg.roundtime = 1000; @@ -202,13 +202,13 @@ int load_config() // Load up the values into the struct. 
- if (d["mode"] == MODE_PASSIVE) - cfg.mode = OPERATING_MODE::PASSIVE; - else if (d["mode"] == MODE_ACTIVE) - cfg.mode = OPERATING_MODE::ACTIVE; + if (d["mode"] == MODE_OBSERVING) + cfg.mode = OPERATING_MODE::OBSERVING; + else if (d["mode"] == MODE_PROPOSING) + cfg.mode = OPERATING_MODE::PROPOSING; else { - std::cout << "Invalid mode. 'passive' or 'active' expected.\n"; + std::cout << "Invalid mode. 'observing' or 'proposing' expected.\n"; return -1; } @@ -304,7 +304,7 @@ int save_config() d.SetObject(); rapidjson::Document::AllocatorType &allocator = d.GetAllocator(); d.AddMember("version", rapidjson::StringRef(util::HP_VERSION), allocator); - d.AddMember("mode", rapidjson::StringRef(cfg.mode == OPERATING_MODE::PASSIVE ? MODE_PASSIVE : MODE_ACTIVE), + d.AddMember("mode", rapidjson::StringRef(cfg.mode == OPERATING_MODE::OBSERVING ? MODE_OBSERVING : MODE_PROPOSING), allocator); d.AddMember("pubkeyhex", rapidjson::StringRef(cfg.pubkeyhex.data()), allocator); @@ -594,4 +594,9 @@ int is_schema_valid(const rapidjson::Document &d) return 0; } +void change_operating_mode(const OPERATING_MODE mode) +{ + cfg.mode = mode; +} + } // namespace conf diff --git a/src/conf.hpp b/src/conf.hpp index 286f6ac1..5a924f4d 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -16,8 +16,8 @@ typedef std::pair ip_port_pair; // The operating mode of the contract node. enum OPERATING_MODE { - PASSIVE = 0, // Observer mode. Only emits NUPs. Does not participate in voting. - ACTIVE = 1 // Consensus participant mode. + OBSERVING = 0, // Observer mode. Only emits NUPs. Does not participate in voting. + PROPOSING = 1 // Consensus participant mode. }; // Holds contextual information about the currently loaded contract. @@ -49,7 +49,7 @@ struct contract_config // Config elements which are loaded from the config file. - OPERATING_MODE mode; // Operating mode of the contract (Passive/Active). + OPERATING_MODE mode; // Operating mode of the contract (Observing/Proposing). 
std::string pubkeyhex; // Contract hex public key std::string seckeyhex; // Contract hex secret key std::string keytype; // Key generation algorithm used by libsodium @@ -110,6 +110,8 @@ int binpair_to_hex(); int hexpair_to_bin(); +void change_operating_mode(const OPERATING_MODE mode); + } // namespace conf #endif \ No newline at end of file diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 135255bb..5cb31392 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -91,11 +91,11 @@ void consensus() p2p::ctx.collected_msgs.npl_messages.clear(); } - if (ctx.stage == 0) // Stage 0 means begining of a consensus round. - { + if (ctx.stage == 0) // Stage 0 means begining of a consensus round. + { // Broadcast non-unl proposals (NUP) containing inputs from locally connected users. broadcast_nonunl_proposal(); - util::sleep(conf::cfg.roundtime / 10); + //util::sleep(conf::cfg.roundtime / 10); // Verify and transfer user inputs from incoming NUPs onto consensus candidate data. verify_and_populate_candidate_user_inputs(); @@ -106,7 +106,7 @@ void consensus() } else // Stage 1, 2, 3 { - std::cout << "Started stage " << std::to_string(ctx.stage) << "\n"; + LOG_DBG << "Started stage " << std::to_string(ctx.stage) << "\n"; for (auto &[pubkey, proposal] : ctx.candidate_proposals) { bool self = proposal.pubkey == conf::cfg.pubkey; @@ -128,7 +128,7 @@ void consensus() check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes); if (is_stage_desync) { - timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 10)); + timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 20)); return; } @@ -145,13 +145,17 @@ void consensus() } if (is_lcl_desync) { - //We are resetting to stage 0 to avoid possible deadlock situations. - //Also we try to converge consensus by trying to reset every node in same time(close time range) - //by resetting node to max close time of candidate list of unl list peers. 
- timewait_stage(true, (time_off - ctx.time_now)); + //We reset to stage 0 to avoid possible deadlock situations, resetting every node at a random time using the max time. + //NOTE(review): this might no longer make sense after stage 1, since a stage time resolution is now applied. + timewait_stage(true, time_off - ctx.time_now); //LOG_DBG << "time off: " << std::to_string(time_off); return; } + else + { + //Node is in sync with current lcl -> switch to proposing mode. + conf::change_operating_mode(conf::OPERATING_MODE::PROPOSING); + } // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. const p2p::proposal stg_prop = create_stage123_proposal(votes); @@ -172,11 +176,8 @@ void consensus() // Transition to next stage. ctx.stage = (ctx.stage + 1) % 4; - // after a stage 0 novel proposal we will just busy wait for proposals - if (ctx.stage == 0) - util::sleep(conf::cfg.roundtime / 10); - else - util::sleep(conf::cfg.roundtime / 4); + // after a stage proposal we will just busy wait for proposals. + util::sleep(conf::cfg.roundtime / 4); } /** @@ -367,11 +368,14 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) stg_prop.time = time; } } - + //todo: apply a round time resolution to increase close time reliability (for stage 1, 2). NOTE(review): the values returned by the two resolution calls below are discarded and the argument is passed by value, so stg_prop.time is not modified by them. if (ctx.stage == 3) get_ledger_time_resolution(stg_prop.time); + else + get_stage_time_resolution(stg_prop.time); + return stg_prop; } @@ -381,8 +385,8 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) */ void broadcast_proposal(const p2p::proposal &p) { - // In passive mode, we do not send out any propopsals. - if (conf::cfg.mode == conf::OPERATING_MODE::PASSIVE) + // In observing mode, we do not send out any proposals. 
+ if (conf::cfg.mode == conf::OPERATING_MODE::OBSERVING) return; p2p::peer_outbound_message msg(std::make_shared(1024)); @@ -447,7 +451,7 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti total_lcl_votes++; } - //keep track of max time of peers, so we can reset nodes in a close time range to increase reliability. + //keep track of max time of peers, so we can reset nodes in a random time range to increase reliability. //This is very usefull especially boostrapping a node cluster. if (cp.time > time_off) time_off = cp.time; @@ -460,6 +464,10 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti { LOG_DBG << "Not enough peers proposing to perform consensus" << std::to_string(total_lcl_votes) << " needed " << std::to_string(0.8 * conf::cfg.unl.size()); is_desync = true; + + //Not enough nodes are proposing, so the node switches to proposing mode if it is in observing mode. + conf::change_operating_mode(conf::OPERATING_MODE::PROPOSING); + return; } @@ -480,6 +488,10 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti { LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer"; is_desync = true; + + //Node is not in sync with current lcl -> switch to observing mode. + conf::change_operating_mode(conf::OPERATING_MODE::OBSERVING); + should_request_history = true; return; } @@ -487,7 +499,6 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti if (winning_votes < 0.8 * ctx.candidate_proposals.size()) { // potential fork condition. - // critical!!! LOG_WARN << "No consensus on lcl. Possible fork condition. 
" << std::to_string(winning_votes) << std::to_string(ctx.candidate_proposals.size()); is_desync = true; return; @@ -521,7 +532,6 @@ void timewait_stage(const bool reset, const uint64_t time) { if (reset) { - ctx.candidate_proposals.clear(); ctx.stage = 0; } @@ -534,18 +544,33 @@ void timewait_stage(const bool reset, const uint64_t time) * also ensure it is sufficiently separated from the prior close time. * @param close_time voted/agreed closed time */ -const uint64_t get_ledger_time_resolution(uint64_t close_time) +uint64_t get_ledger_time_resolution(const uint64_t time) { - uint64_t closeResolution = conf::cfg.roundtime / 4; - //todo: change time resolution dynamically. + uint64_t closeResolution = conf::cfg.roundtime / 4; + //todo: change time resolution dynamically. //When nodes agree often reduce resolution time and increase if they don't. - + uint64_t close_time = time; close_time += (closeResolution / 2); close_time -= (close_time % closeResolution); return std::max(close_time, (ctx.prev_close_time + conf::cfg.roundtime)); } +/** +* Calculate the stage time +* Adjusting the stage time based on the current resolution. +* @param stage_time voted/agreed closed time +*/ +uint64_t get_stage_time_resolution(const uint64_t time) +{ + uint64_t closeResolution = conf::cfg.roundtime / 8; + uint64_t stage_time = time; + stage_time += (closeResolution / 2); + stage_time -= (stage_time % closeResolution); + + return stage_time; +} + /** * Finalize the ledger after consensus. * @param cons_prop The proposal that reached consensus. @@ -631,11 +656,11 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop) user.session->send(usr::user_outbound_message(std::move(msg))); } } + + // now we can safely delete this candidate output. + ctx.candidate_user_outputs.erase(cu_itr); } } - - // now we can safely clear our candidate outputs. 
- ctx.candidate_user_outputs.clear(); } /** @@ -673,6 +698,7 @@ void feed_user_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const bufpair.inputs.push_back(std::move(inputtofeed)); // Remove the input from the candidate set because we no longer need it. + //LOG_DBG << "candidate input deleted."; ctx.candidate_user_inputs.erase(itr); } } diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 59e007b8..697604a5 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -119,7 +119,9 @@ float_t get_stage_threshold(const uint8_t stage); void timewait_stage(const bool reset, const uint64_t time); -const uint64_t get_ledger_time_resolution(uint64_t close_time); +uint64_t get_ledger_time_resolution(const uint64_t time); + +uint64_t get_stage_time_resolution(const uint64_t time); void apply_ledger(const p2p::proposal &proposal); diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp index e09b3e56..4bb55f76 100644 --- a/src/cons/ledger_handler.cpp +++ b/src/cons/ledger_handler.cpp @@ -256,6 +256,10 @@ bool check_required_lcl_availability(const p2p::history_request &hr) return false; } } + else + { + return false; //Very rare case: node asking for the genesis lcl. + } return true; } @@ -285,14 +289,14 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request & //eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition. if (hr.minimum_lcl != itr->second) { - LOG_DBG << "Invalid minimum ledger. Recieved min hash: "<< min_lcl_hash << " Node hash: " << itr->second; + LOG_DBG << "Invalid minimum ledger. Recieved min hash: " << min_lcl_hash << " Node hash: " << itr->second; history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; return history_response; } } - else if (min_seq_no > cons::ctx.lcl_list.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence. 
+ else if (min_seq_no > cons::ctx.lcl_list.rbegin()->first) //Received minimum lcl sequence is ahead of node's lcl sequence. { - LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: "<< min_lcl_hash; + LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: " << min_lcl_hash; history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; return history_response; } @@ -313,7 +317,7 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request & lcl_list.begin(), lcl_list.lower_bound(min_seq_no)); - //Get raw content of lcls that going to be send. + //Get raw content of the lcls that are going to be sent. for (auto &[seq_no, lcl_hash] : lcl_list) { p2p::history_ledger ledger; diff --git a/src/main.cpp b/src/main.cpp index 3bccc4b9..adab7a3c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -168,7 +168,7 @@ int main(int argc, char **argv) hplog::init(); LOG_INFO << "Operating mode: " - << (conf::cfg.mode == conf::OPERATING_MODE::PASSIVE ? "Passive" : "Active"); + << (conf::cfg.mode == conf::OPERATING_MODE::OBSERVING ? "Observing" : "Proposing"); if (p2p::init() != 0 || usr::init() != 0 || cons::init() != 0) return -1; @@ -178,9 +178,6 @@ int main(int argc, char **argv) // After initializing primary subsystems, register the SIGINT handler. signal(SIGINT, signal_handler); - //we are waiting for peer to estasblish peer connections. - sleep(10); //todo: replace waiting with a check to peer check. - while (true) { cons::consensus();