From bebdace51969bd19149f42f7d9522b35f63ae1c8 Mon Sep 17 00:00:00 2001 From: Asanka Indrajith Date: Mon, 25 Nov 2019 07:40:22 -0500 Subject: [PATCH] Achieving reliable consensus. (#60) * Remove considering stage when voting and considering lcl. * Stage sync , lcl sync and candidate set changes. * Implemented ledger close time resolution and fixed ledger retrieval issues. * Code cleanup and added more comments on reliability changes. * Added further comments and clenup. --- cluster-create.sh | 2 +- src/cons/cons.cpp | 163 +++++++++++++----------- src/cons/cons.hpp | 18 ++- src/cons/ledger_handler.cpp | 133 ++++++++++++------- src/fbschema/p2pmsg_content.fbs | 8 ++ src/fbschema/p2pmsg_content_generated.h | 56 +++++++- src/fbschema/p2pmsg_helpers.cpp | 6 +- src/main.cpp | 3 +- src/p2p/p2p.cpp | 3 +- src/p2p/p2p.hpp | 12 +- 10 files changed, 267 insertions(+), 137 deletions(-) diff --git a/cluster-create.sh b/cluster-create.sh index 17809318..274380d4 100755 --- a/cluster-create.sh +++ b/cluster-create.sh @@ -50,7 +50,7 @@ do binargs: './bin/contract.js', \ peerport: ${peerport}, \ pubport: ${pubport}, \ - roundtime: 10000, + roundtime: 1000, loglevel: 'debug' }, null, 2)" > hp.cfg rm tmp.json diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index bb67e66d..07c34f86 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -40,7 +40,7 @@ int init() ctx.led_seq_no = ldr_hist.led_seq_no; ctx.lcl = ldr_hist.lcl; ctx.lcl_list.swap(ldr_hist.lcl_list); - + ctx.prev_close_time = util::get_epoch_milliseconds(); return 0; } @@ -52,15 +52,29 @@ void consensus() // Get the latest current time. ctx.time_now = util::get_epoch_milliseconds(); + std::list collected_proposals; // Throughout consensus, we move over the incoming proposals collected via the network so far into // the candidate proposal set (move and append). This is to have a private working set for the consensus // and avoid threading conflicts with network incoming proposals. { std::lock_guard lock(p2p::ctx.collected_msgs.proposals_mutex); - ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::ctx.collected_msgs.proposals); + collected_proposals.splice(collected_proposals.end(), p2p::ctx.collected_msgs.proposals); } + //Copy collected propsals to candidate set of proposals. + //Add mpropsals of new nodes and Replace messages from old nodes to reflect current status of nodes. + for (const auto &proposal : collected_proposals) + { + auto prop_itr = ctx.candidate_proposals.find(proposal.pubkey); + if (prop_itr != ctx.candidate_proposals.end()) + { + ctx.candidate_proposals.erase(prop_itr); + ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); + } + else + ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); + } // Throughout consensus, we move over the incoming npl messages collected via the network so far into // the candidate npl message set (move and append). This is to have a private working set for the consensus // and avoid threading conflicts with network incoming npl messages. @@ -78,25 +92,8 @@ void consensus() p2p::ctx.collected_msgs.npl_messages.clear(); } - if (ctx.stage == 0) - { - // Stage 0 means begining of a consensus round. - { - // Remove any useless candidate proposals so we'll have a cleaner proposal set to look at - // when we transition to stage 1. - auto itr = ctx.candidate_proposals.begin(); - while (itr != ctx.candidate_proposals.end()) - { - // Remove any proposal from previous round's stage 3. - // Remove any proposal from self (pubkey match). - // todo: check the state of these to ensure we're running consensus ledger - if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey) - ctx.candidate_proposals.erase(itr++); - else - ++itr; - } - } - + if (ctx.stage == 0) // Stage 0 means begining of a consensus round. + { // Broadcast non-unl proposals (NUP) containing inputs from locally connected users. broadcast_nonunl_proposal(); util::sleep(conf::cfg.roundtime / 10); @@ -110,21 +107,19 @@ void consensus() } else // Stage 1, 2, 3 { - std::cout << "Started stage " << std::to_string(ctx.stage) << "\n"; - for (auto p : ctx.candidate_proposals) + for (auto &[pubkey, proposal] : ctx.candidate_proposals) { - bool self = p.pubkey == conf::cfg.pubkey; - LOG_DBG << "[stage" << std::to_string(p.stage) - << "] users:" << p.users.size() - << " hinp:" << p.hash_inputs.size() - << " hout:" << p.hash_outputs.size() - << " lcl:" << p.lcl + bool self = proposal.pubkey == conf::cfg.pubkey; + LOG_DBG << "[stage" << std::to_string(proposal.stage) + << "] users:" << proposal.users.size() + << " hinp:" << proposal.hash_inputs.size() + << " hout:" << proposal.hash_outputs.size() + << " lcl:" << proposal.lcl << " self:" << self << "\n"; } - LOG_DBG << "timenow:" << std::to_string(ctx.time_now) << "\n"; // Initialize vote counters vote_counter votes; @@ -134,14 +129,15 @@ void consensus() check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes); if (is_stage_desync) { - timewait_stage(reset_to_stage0); + timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 10)); return; } // check if we're ahead/behind of consensus lcl bool is_lcl_desync, should_request_history; std::string majority_lcl; - check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes); + uint64_t time_off = 0; + check_lcl_votes(is_lcl_desync, should_request_history, time_off, majority_lcl, votes); if (should_request_history) { @@ -150,9 +146,11 @@ void consensus() } if (is_lcl_desync) { - bool should_reset = (ctx.time_now - ctx.novel_proposal_time) > (floor(conf::cfg.roundtime) + floor(rand() % conf::cfg.roundtime)); - //for now we are resetting to stage 0 to avoid possible deadlock situations - timewait_stage(should_reset); + //We are resetting to stage 0 to avoid possible deadlock situations. + //Also we try to converge consensus by trying to reset every node in same time(close time range) + //by resetting node to max close time of candidate list of unl list peers. + timewait_stage(true, (time_off - ctx.time_now)); + //LOG_DBG << "time off: " << std::to_string(time_off); return; } @@ -160,19 +158,9 @@ void consensus() const p2p::proposal stg_prop = create_stage123_proposal(votes); broadcast_proposal(stg_prop); - // Remove all candidate proposals that are behind our current stage. - auto itr = ctx.candidate_proposals.begin(); - while (itr != ctx.candidate_proposals.end()) - { - if (itr->stage < ctx.stage) - ctx.candidate_proposals.erase(itr++); - else - ++itr; - } - //ctx.candidate_proposals.clear(); - if (ctx.stage == 3) { + ctx.prev_close_time = stg_prop.time; apply_ledger(stg_prop); // We have finished a consensus round (all 4 stages). @@ -187,7 +175,7 @@ void consensus() // after a stage 0 novel proposal we will just busy wait for proposals if (ctx.stage == 0) - util::sleep(conf::cfg.roundtime / 100); + util::sleep(conf::cfg.roundtime / 10); else util::sleep(conf::cfg.roundtime / 4); } @@ -322,7 +310,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) stg_prop.lcl = ctx.lcl; // Vote for rest of the proposal fields by looking at candidate proposals. - for (const p2p::proposal &cp : ctx.candidate_proposals) + for (const auto &[pubkey, cp] : ctx.candidate_proposals) { // Vote for times. // Everyone votes on an arbitrary time, as long as its within the round time and not in the future. @@ -380,6 +368,10 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) stg_prop.time = time; } } + + //todo:apply a round time resolution to increase close time reliability(for stage 1,2) + if (ctx.stage == 3) + get_ledger_time_resolution(stg_prop.time); return stg_prop; } @@ -410,7 +402,7 @@ void broadcast_proposal(const p2p::proposal &p) void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes) { // Stage votes. - for (const p2p::proposal &cp : ctx.candidate_proposals) + for (const auto &[pubkey, cp] : ctx.candidate_proposals) { // Vote stages if only proposal lcl is match with node's last consensus lcl if (cp.lcl == ctx.lcl) @@ -438,32 +430,28 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage) << " is ahead of majority stage:" << std::to_string(majority_stage); } - else if (majority_stage > ctx.stage - 1) - { - should_reset = true; - is_desync = true; - - LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage) - << " is behind majority stage:" << std::to_string(majority_stage); - } } /** * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. */ -void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes) +void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes) { - // Stage votes. int32_t total_lcl_votes = 0; - for (const p2p::proposal &cp : ctx.candidate_proposals) + for (const auto &[pubkey, cp] : ctx.candidate_proposals) { - // only consider recent proposals and proposals from previous stage. - if ((ctx.time_now - cp.timestamp < conf::cfg.roundtime * 4) && (cp.stage == ctx.stage - 1)) + // only consider recent proposals and proposals from previous stage and current stage. + if ((ctx.time_now - cp.timestamp < conf::cfg.roundtime * 4) && cp.stage >= (ctx.stage - 1)) { increment(votes.lcl, cp.lcl); total_lcl_votes++; } + + //keep track of max time of peers, so we can reset nodes in a close time range to increase reliability. + //This is very usefull especially boostrapping a node cluster. + if (cp.time > time_off) + time_off = cp.time; } is_desync = false; @@ -486,26 +474,25 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string } } - double wining_votes_unl_ratio = winning_votes / conf::cfg.unl.size(); - if (wining_votes_unl_ratio < 0.8) - { - // potential fork condition. - LOG_DBG << "No consensus on lcl. Possible fork condition."; - is_desync = true; - return; - } - //if winning lcl is not matched node lcl, - //that means vode is not on the consensus ledger. + //that means vote is not on the consensus ledger. //Should request history from a peer. if (ctx.lcl != majority_lcl) { LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer"; is_desync = true; - should_request_history = true; return; } + + if (winning_votes < 0.8 * ctx.candidate_proposals.size()) + { + // potential fork condition. + // critical!!! + LOG_WARN << "No consensus on lcl. Possible fork condition. " << std::to_string(winning_votes) << std::to_string(ctx.candidate_proposals.size()); + is_desync = true; + return; + } } /** @@ -526,12 +513,38 @@ float_t get_stage_threshold(const uint8_t stage) return -1; } -void timewait_stage(const bool reset) +/** +* Awiat/Sleep consensus to time milliseconds and reset consensus. +* @param reset reset consensus stage to 0 or not. +* @param time milliseconds to sleep/await. +*/ +void timewait_stage(const bool reset, const uint64_t time) { if (reset) + { + ctx.candidate_proposals.clear(); ctx.stage = 0; + } - util::sleep(conf::cfg.roundtime / 100); + util::sleep(time); +} + +/** +* Calculate the effective ledger close time +* After adjusting the ledger close time based on the current resolution, +* also ensure it is sufficiently separated from the prior close time. +* @param close_time voted/agreed closed time +*/ +const uint64_t get_ledger_time_resolution(uint64_t close_time) +{ + uint64_t closeResolution = conf::cfg.roundtime / 4; + //todo: change time resolution dynamically. + //When nodes agree often reduce resolution time and increase if they don't. + + close_time += (closeResolution / 2); + close_time -= (close_time % closeResolution); + + return std::max(close_time, (ctx.prev_close_time + conf::cfg.roundtime)); } /** diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index b46bb242..59e007b8 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -44,8 +44,10 @@ struct candidate_user_output */ struct consensus_context { - // The set of proposals that are being collected as consensus stages are progressing. - std::list candidate_proposals; + // The map of proposals that are being collected as consensus stages are progressing. + // peer public key is the key. + // todo: having a queue of proposals against peer pubkey. + std::unordered_map candidate_proposals; // The set of npl messages that are being collected as consensus stages are progressing. std::list candidate_npl_messages; @@ -70,7 +72,13 @@ struct consensus_context uint64_t time_now; std::string lcl; uint64_t led_seq_no; + //Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key. + //contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE. + //this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range. + //We will use this to track lcls related logic.- track state, lcl request, response. std::map lcl_list; + //ledger close time of previous hash + uint64_t prev_close_time; consensus_context() : recent_userinput_hashes(200) { @@ -105,11 +113,13 @@ void broadcast_proposal(const p2p::proposal &p); void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes); -void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); +void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes); float_t get_stage_threshold(const uint8_t stage); -void timewait_stage(const bool reset); +void timewait_stage(const bool reset, const uint64_t time); + +const uint64_t get_ledger_time_resolution(uint64_t close_time); void apply_ledger(const p2p::proposal &proposal); diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp index 690117cb..e09b3e56 100644 --- a/src/cons/ledger_handler.cpp +++ b/src/cons/ledger_handler.cpp @@ -62,7 +62,7 @@ const std::tuple save_ledger(const p2p::proposal &p cons::ctx.lcl_list.emplace(led_seq_no, file_name); //Remove old ledgers that exceeds max sequence range. - if (led_seq_no > MAX_LEDGER_SEQUENCE) + if (led_seq_no > MAX_LEDGER_SEQUENCE) { remove_old_ledgers(led_seq_no - MAX_LEDGER_SEQUENCE); } @@ -98,7 +98,8 @@ void remove_old_ledgers(const uint64_t led_seq_no) if (boost::filesystem::exists(file_path)) boost::filesystem::remove(file_path); } - cons::ctx.lcl_list.erase(cons::ctx.lcl_list.begin(), cons::ctx.lcl_list.lower_bound(led_seq_no + 1)); + if (!cons::ctx.lcl_list.empty()) + cons::ctx.lcl_list.erase(cons::ctx.lcl_list.begin(), cons::ctx.lcl_list.lower_bound(led_seq_no + 1)); } /** @@ -244,7 +245,8 @@ bool check_required_lcl_availability(const p2p::history_request &hr) if (itr == cons::ctx.lcl_list.end()) { LOG_DBG << "Required lcl peer asked for is not in our lcl cache."; - //either this node is also not in consesnsus ledger or other node requesting a lcl that is older than maximum ledger range. + //either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current + // minimum lcl sequence becuase of maximum ledger history range. return false; } else if (itr->second != hr.required_lcl) @@ -265,9 +267,9 @@ bool check_required_lcl_availability(const p2p::history_request &hr) const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr) { p2p::history_response history_response; - size_t pos = hr.minimum_lcl.find("-"); uint64_t min_seq_no = 0; + std::string min_lcl_hash; //get sequence number of minimum lcl required if (pos != std::string::npos) @@ -278,22 +280,40 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request & const auto itr = cons::ctx.lcl_list.find(min_seq_no); if (itr != cons::ctx.lcl_list.end()) //requested minimum lcl is not in our lcl history cache { - LOG_DBG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl"; min_seq_no = itr->first; + //check whether minimum lcl node ask for is same as this node's. + //eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition. + if (hr.minimum_lcl != itr->second) + { + LOG_DBG << "Invalid minimum ledger. Recieved min hash: "<< min_lcl_hash << " Node hash: " << itr->second; + history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; + return history_response; + } + } + else if (min_seq_no > cons::ctx.lcl_list.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence. + { + LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: "<< min_lcl_hash; + history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; + return history_response; } else { + LOG_DBG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl"; min_seq_no = cons::ctx.lcl_list.begin()->first; } - //copy current history cache. - std::map lcl_list = cons::ctx.lcl_list; + //LOG_DBG << "history request min seq: " << std::to_string(min_seq_no); - //filter out cache and get raw files here. + //copy current history cache. + std::map + lcl_list = cons::ctx.lcl_list; + + //filter out cache from finalized minimum sequence. lcl_list.erase( lcl_list.begin(), lcl_list.lower_bound(min_seq_no)); + //Get raw content of lcls that going to be send. for (auto &[seq_no, lcl_hash] : lcl_list) { p2p::history_ledger ledger; @@ -350,49 +370,59 @@ void handle_ledger_history_response(const p2p::history_response &hr) return; } - //check whether recieved lcl history contains the current lcl node required. - bool have_requested_lcl = false; - for (auto &[seq_no, ledger] : hr.hist_ledgers) + if (hr.error == p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER) { - if (last_requested_lcl == ledger.lcl) + // This means we are in a fork ledger.Remove/rollback current ledger. + // Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger . + remove_ledger(ctx.lcl); + cons::ctx.lcl_list.erase(ctx.lcl_list.rbegin()->first); + } + else + { + //check whether recieved lcl history contains the current lcl node required. + bool have_requested_lcl = false; + for (auto &[seq_no, ledger] : hr.hist_ledgers) { - have_requested_lcl = true; - break; + if (last_requested_lcl == ledger.lcl) + { + have_requested_lcl = true; + break; + } } - } - if (!have_requested_lcl) - { - LOG_DBG << "Peer sent us a history response but not containing the lcl we asked for!"; - return; - } - - //Check integrity of recieved lcl list. - //By checking recieved lcl hashes matches lcl content by applying hashing for each raw content. - for (auto &[seq_no, ledger] : hr.hist_ledgers) - { - const size_t pos = ledger.lcl.find("-"); - std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1)); - - //Get binary hash of the the serialized lcl. - const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size()); - - //Get hex from binary hash - std::string lcl_hash; - - util::bin2hex(lcl_hash, - reinterpret_cast(lcl.data()), - lcl.size()); - - //LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash; - - //recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it - if (lcl_hash != rec_lcl_hash) + if (!have_requested_lcl) { - LOG_WARN << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes"; - //todo: we should penalize peer who send this? + LOG_DBG << "Peer sent us a history response but not containing the lcl we asked for! " << hr.hist_ledgers.size(); return; } + + //Check integrity of recieved lcl list. + //By checking recieved lcl hashes matches lcl content by applying hashing for each raw content. + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + const size_t pos = ledger.lcl.find("-"); + std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1)); + + //Get binary hash of the the serialized lcl. + const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size()); + + //Get hex from binary hash + std::string lcl_hash; + + util::bin2hex(lcl_hash, + reinterpret_cast(lcl.data()), + lcl.size()); + + //LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash; + + //recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it + if (lcl_hash != rec_lcl_hash) + { + LOG_WARN << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes"; + //todo: we should penalize peer who send this? + return; + } + } } //Execution to here means the history data sent checks out @@ -410,9 +440,18 @@ void handle_ledger_history_response(const p2p::history_response &hr) } last_requested_lcl = ""; - const auto latest_lcl_itr = cons::ctx.lcl_list.rbegin(); - cons::ctx.lcl = latest_lcl_itr->second; - cons::ctx.led_seq_no = latest_lcl_itr->first; + + if (cons::ctx.lcl_list.empty()) + { + cons::ctx.led_seq_no = 0; + cons::ctx.lcl = "0-genesis"; + } + else + { + const auto latest_lcl_itr = cons::ctx.lcl_list.rbegin(); + cons::ctx.lcl = latest_lcl_itr->second; + cons::ctx.led_seq_no = latest_lcl_itr->first; + } } } // namespace cons \ No newline at end of file diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index 9cc3d67c..a6482d2e 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -40,8 +40,16 @@ table History_Request_Message { //Ledger History request type message schema required_lcl:[ubyte]; } +enum Ledger_Response_Error : ubyte +{ + None = 0, + Invalid_Min_Ledger = 1, + Req_Ledger_Not_Found = 2 +} + table History_Response_Message { //Ledger History request type message schema hist_ledgers:[HistoryLedgerPair]; + error: Ledger_Response_Error; } table HistoryLedgerPair { //A key, value pair of byte[]. diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index 6a0e1535..07f42c41 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -104,6 +104,39 @@ template<> struct MessageTraits { bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); +enum Ledger_Response_Error { + Ledger_Response_Error_None = 0, + Ledger_Response_Error_Invalid_Min_Ledger = 1, + Ledger_Response_Error_Req_Ledger_Not_Found = 2, + Ledger_Response_Error_MIN = Ledger_Response_Error_None, + Ledger_Response_Error_MAX = Ledger_Response_Error_Req_Ledger_Not_Found +}; + +inline const Ledger_Response_Error (&EnumValuesLedger_Response_Error())[3] { + static const Ledger_Response_Error values[] = { + Ledger_Response_Error_None, + Ledger_Response_Error_Invalid_Min_Ledger, + Ledger_Response_Error_Req_Ledger_Not_Found + }; + return values; +} + +inline const char * const *EnumNamesLedger_Response_Error() { + static const char * const names[] = { + "None", + "Invalid_Min_Ledger", + "Req_Ledger_Not_Found", + nullptr + }; + return names; +} + +inline const char *EnumNameLedger_Response_Error(Ledger_Response_Error e) { + if (e < Ledger_Response_Error_None || e > Ledger_Response_Error_Req_Ledger_Not_Found) return ""; + const size_t index = static_cast(e); + return EnumNamesLedger_Response_Error()[index]; +} + struct UserSubmittedMessage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_CONTENT = 4, @@ -653,7 +686,8 @@ inline flatbuffers::Offset CreateHistory_Request_Messag struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_HIST_LEDGERS = 4 + VT_HIST_LEDGERS = 4, + VT_ERROR = 6 }; const flatbuffers::Vector> *hist_ledgers() const { return GetPointer> *>(VT_HIST_LEDGERS); @@ -661,11 +695,18 @@ struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::T flatbuffers::Vector> *mutable_hist_ledgers() { return GetPointer> *>(VT_HIST_LEDGERS); } + Ledger_Response_Error error() const { + return static_cast(GetField(VT_ERROR, 0)); + } + bool mutate_error(Ledger_Response_Error _error) { + return SetField(VT_ERROR, static_cast(_error), 0); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyOffset(verifier, VT_HIST_LEDGERS) && verifier.VerifyVector(hist_ledgers()) && verifier.VerifyVectorOfTables(hist_ledgers()) && + VerifyField(verifier, VT_ERROR) && verifier.EndTable(); } }; @@ -676,6 +717,9 @@ struct History_Response_MessageBuilder { void add_hist_ledgers(flatbuffers::Offset>> hist_ledgers) { fbb_.AddOffset(History_Response_Message::VT_HIST_LEDGERS, hist_ledgers); } + void add_error(Ledger_Response_Error error) { + fbb_.AddElement(History_Response_Message::VT_ERROR, static_cast(error), 0); + } explicit History_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -690,19 +734,23 @@ struct History_Response_MessageBuilder { inline flatbuffers::Offset CreateHistory_Response_Message( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset>> hist_ledgers = 0) { + flatbuffers::Offset>> hist_ledgers = 0, + Ledger_Response_Error error = Ledger_Response_Error_None) { History_Response_MessageBuilder builder_(_fbb); builder_.add_hist_ledgers(hist_ledgers); + builder_.add_error(error); return builder_.Finish(); } inline flatbuffers::Offset CreateHistory_Response_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, - const std::vector> *hist_ledgers = nullptr) { + const std::vector> *hist_ledgers = nullptr, + Ledger_Response_Error error = Ledger_Response_Error_None) { auto hist_ledgers__ = hist_ledgers ? _fbb.CreateVector>(*hist_ledgers) : 0; return fbschema::p2pmsg::CreateHistory_Response_Message( _fbb, - hist_ledgers__); + hist_ledgers__, + error); } struct HistoryLedgerPair FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index 1da6d07a..4d232def 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -166,6 +166,9 @@ const p2p::history_response create_history_response_from_msg(const History_Respo if (msg.hist_ledgers()) hr.hist_ledgers = flatbuf_historyledgermap_to_historyledgermap(msg.hist_ledgers()); + if (msg.error()) + hr.error = (p2p::LEDGER_RESPONSE_ERROR)msg.error(); + return hr; } @@ -318,7 +321,8 @@ void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_ flatbuffers::Offset hrmsg = CreateHistory_Response_Message( builder, - historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers)); + historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers), + (Ledger_Response_Error)hr.error); flatbuffers::Offset message = CreateContent(builder, Message_History_Response_Message, hrmsg.Union()); builder.Finish(message); // Finished building message content to get serialised content. diff --git a/src/main.cpp b/src/main.cpp index 346c1707..906beb29 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -176,8 +176,7 @@ int main(int argc, char **argv) signal(SIGINT, signal_handler); //we are waiting for peer to estasblish peer connections. - //otherwise we'll run into not enough peers propsing/stage desync deadlock directly now. - sleep(3); + sleep(10); //todo: replace waiting with a check to peer check. while (true) { diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index cc298516..fd939ae7 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -71,8 +71,7 @@ void peer_connection_watchdog() } } - //util::sleep(conf::cfg.roundtime * 4); - util::sleep(200); + util::sleep(conf::cfg.roundtime * 4); } } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index bcbc2991..13c39ea4 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -38,9 +38,19 @@ struct history_ledger std::vector raw_ledger; }; +enum LEDGER_RESPONSE_ERROR +{ + NONE = 0, + INVALID_MIN_LEDGER = 1, + REQ_LEDGER_NOT_FOUND = 2 +}; + + struct history_response { std::map hist_ledgers; + LEDGER_RESPONSE_ERROR error; + }; struct npl_message @@ -50,7 +60,7 @@ struct npl_message struct message_collection { - std::list proposals; + std::list proposals; std::mutex proposals_mutex; // Mutex for proposals access race conditions. std::list nonunl_proposals;