diff --git a/src/conf.cpp b/src/conf.cpp index 879daab3..5e71ee5b 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -115,7 +115,7 @@ namespace conf cfg.pubidletimeout = 0; cfg.peeridletimeout = 120; - cfg.is_consensus_public = true; + cfg.is_consensus_public = false; cfg.is_npl_public = false; cfg.msgforwarding = false; diff --git a/src/consensus.cpp b/src/consensus.cpp index fd6e6e37..11a0b909 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -103,19 +103,18 @@ namespace consensus // A consensus round consists of 4 stages (0,1,2,3). // For a given stage, this function may get visited multiple times due to time-wait conditions. - uint64_t stage_start = 0; - if (!wait_and_proceed_stage(stage_start)) + if (!wait_and_proceed_stage()) return 0; // This means the stage has been reset. LOG_DEBUG << "Started stage " << std::to_string(ctx.stage); - // We consider stage start time as the current discreet time throughout the stage. - ctx.time_now = stage_start; - // Throughout consensus, we continously update and prune the candidate proposals for newly // arived ones and expired ones. revise_candidate_proposals(); + // If possible, switch back to proposer mode before stage processing. + check_sync_completion(); + // Get current lcl and state. std::string lcl = ledger::ctx.get_lcl(); const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); @@ -164,9 +163,7 @@ namespace consensus // If we are in sync, vote and get the final winning votes. // This is the consensus proposal which makes it into the ledger and contract execution const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state, unl_hash); - - // Update the unl with the unl changeset that subjected to the consensus. - unl::apply_changeset(p.unl_changeset.additions, p.unl_changeset.removals); + broadcast_proposal(p); // Update the ledger and execute the contract using the consensus proposal. 
if (update_ledger_and_execute_contract(p, lcl, state) == -1) @@ -229,6 +226,16 @@ namespace consensus return false; } + /** + * Checks whether we can switch back from currently ongoing observer-mode sync operation + * that has been completed. + */ + void check_sync_completion() + { + if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER && !state_sync::ctx.is_syncing && !ledger::sync_ctx.is_syncing) + conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); + } + /** * Moves proposals collected from the network into candidate proposals and * cleans up any outdated proposals from the candidate set. @@ -247,24 +254,17 @@ namespace consensus // Add propsals of new nodes and replace proposals from old nodes to reflect current status of nodes. for (const auto &proposal : collected_proposals) { - auto prop_itr = ctx.candidate_proposals.find(proposal.pubkey); - if (prop_itr != ctx.candidate_proposals.end()) - { - ctx.candidate_proposals.erase(prop_itr); - ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); - } - else - { - ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); - } + ctx.candidate_proposals.erase(proposal.pubkey); // Erase if already exists. + ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); } // Prune any outdated proposals. auto itr = ctx.candidate_proposals.begin(); + const uint64_t time_now = util::get_epoch_milliseconds(); while (itr != ctx.candidate_proposals.end()) { const p2p::proposal &cp = itr->second; - const uint64_t time_diff = (ctx.time_now > cp.sent_timestamp) ? (ctx.time_now - cp.sent_timestamp) : 0; + const uint64_t time_diff = (time_now > cp.sent_timestamp) ? (time_now - cp.sent_timestamp) : 0; const int8_t stage_diff = ctx.stage - cp.stage; // only consider recent proposals and proposals from previous stage and current stage. 
@@ -277,7 +277,8 @@ namespace consensus << " ts:" << std::to_string(cp.time) << " lcl:" << cp.lcl.substr(0, 15) << " state:" << cp.state - << " [from:" << ((cp.pubkey == conf::cfg.pubkey) ? "self" : util::get_hex(cp.pubkey, 1, 5)) << "]"; + << " [from:" << ((cp.pubkey == conf::cfg.pubkey) ? "self" : util::get_hex(cp.pubkey, 1, 5)) << "]" + << "(" << std::to_string(cp.recv_timestamp > cp.sent_timestamp ? cp.recv_timestamp - cp.sent_timestamp : 0) << "ms)"; if (keep_candidate) ++itr; @@ -290,7 +291,7 @@ namespace consensus * Syncrhonise the stage/round time for fixed intervals and reset the stage. * @return True if consensus can proceed in the current round. False if stage is reset. */ - bool wait_and_proceed_stage(uint64_t &stage_start) + bool wait_and_proceed_stage() { // Here, nodes try to synchronise nodes stages using network clock. // We devide universal time to windows of equal size of roundtime. Each round must be synced with the @@ -299,23 +300,24 @@ namespace consensus const uint64_t now = util::get_epoch_milliseconds(); // Rrounds are discreet windows of roundtime. - // This gets the start time of current round window. Stage 0 must start in the next round window. - const uint64_t current_round_start = (((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime); if (ctx.stage == 0) { + // This gets the start time of current round window. Stage 0 must start in the window after that. + const uint64_t previous_round_start = (((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime); + // Stage 0 must start in the next round window. 
// (This makes sure stage 3 gets whichever the remaining time in the round after stages 0,1,2) - stage_start = current_round_start + conf::cfg.roundtime; - const uint64_t to_wait = stage_start - now; + ctx.round_start_time = previous_round_start + conf::cfg.roundtime; + const uint64_t to_wait = ctx.round_start_time - now; - LOG_DEBUG << "Waiting " << to_wait << "ms for next round stage 0"; + LOG_DEBUG << "Waiting " << to_wait << "ms for next round stage 0."; util::sleep(to_wait); return true; } else { - stage_start = current_round_start + (ctx.stage * ctx.stage_time); + const uint64_t stage_start = ctx.round_start_time + (ctx.stage * ctx.stage_time); // Compute stage time wait. // Node wait between stages to collect enough proposals from previous stages from other nodes. @@ -325,7 +327,7 @@ namespace consensus // it will join in next round. Otherwise it will continue particapating in this round. if (to_wait < ctx.stage_reset_wait_threshold) //todo: self claculating/adjusting network delay { - LOG_DEBUG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 0"; + LOG_DEBUG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 0."; ctx.stage = 1; return false; } @@ -515,51 +517,54 @@ namespace consensus { // This is the proposal that stage 0 votes on. // We report our own values in stage 0. - p2p::proposal stg_prop; - stg_prop.time = ctx.time_now; - stg_prop.stage = 0; - stg_prop.lcl = lcl; - stg_prop.state = state; - stg_prop.unl_hash = unl_hash; - crypto::random_bytes(stg_prop.nonce, ROUND_NONCE_SIZE); + p2p::proposal p; + p.time = ctx.round_start_time; + p.stage = 0; + p.lcl = lcl; + p.state = state; + p.unl_hash = unl_hash; + crypto::random_bytes(p.nonce, ROUND_NONCE_SIZE); // Populate the proposal with set of candidate user pubkeys. - stg_prop.users.swap(ctx.candidate_users); + p.users.swap(ctx.candidate_users); // Populate the proposal with hashes of user inputs. 
for (const auto &[hash, cand_input] : ctx.candidate_user_inputs) - stg_prop.hash_inputs.emplace(hash); + p.hash_inputs.emplace(hash); // Populate the proposal with hashes of user outputs. for (const auto &[hash, cand_output] : ctx.candidate_user_outputs) - stg_prop.hash_outputs.emplace(hash); + p.hash_outputs.emplace(hash); - // Populate the proposal wil unl changeset. - stg_prop.unl_changeset = ctx.candidate_unl_changeset; + // Populate the proposal with unl changeset. + p.unl_changeset = ctx.candidate_unl_changeset; - return stg_prop; + return p; } p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash) { // The proposal to be emited at the end of this stage. - p2p::proposal stg_prop; - stg_prop.stage = ctx.stage; - stg_prop.state = state; + p2p::proposal p; + p.stage = ctx.stage; + p.state = state; - // we always vote for our current lcl and state regardless of what other peers are saying - // if there's a fork condition we will either request history and state from + // We always vote for our current lcl and state regardless of what other peers are saying. + // If there's a fork condition we will either request history and state from // our peers or we will halt depending on level of consensus on the sides of the fork. - stg_prop.lcl = lcl; + p.lcl = lcl; - stg_prop.unl_hash = unl_hash; + // We always vote for our current unl hash. + p.unl_hash = unl_hash; + + const uint64_t time_now = util::get_epoch_milliseconds(); // Vote for rest of the proposal fields by looking at candidate proposals. for (const auto &[pubkey, cp] : ctx.candidate_proposals) { // Vote for times. - // Everyone votes on an arbitrary time, as long as it's not in the future and within the round time. 
- if (ctx.time_now > cp.time && (ctx.time_now - cp.time) <= conf::cfg.roundtime) + // Everyone votes on the discreet time, as long as it's not in the future and within 2 round times. + if (time_now > cp.time && (time_now - cp.time) <= (conf::cfg.roundtime * 2)) increment(votes.time, cp.time); // Vote for round nonce. @@ -600,32 +605,32 @@ namespace consensus // Add user pubkeys which have votes over stage threshold to proposal. for (const auto &[pubkey, numvotes] : votes.users) if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0)) - stg_prop.users.emplace(pubkey); + p.users.emplace(pubkey); // Add inputs which have votes over stage threshold to proposal. for (const auto &[hash, numvotes] : votes.inputs) if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0)) - stg_prop.hash_inputs.emplace(hash); + p.hash_inputs.emplace(hash); // Add outputs which have votes over stage threshold to proposal. for (const auto &[hash, numvotes] : votes.outputs) if (numvotes >= required_votes) - stg_prop.hash_outputs.emplace(hash); + p.hash_outputs.emplace(hash); - // For the unl changeset reset required votes for majority votes. + // For the unl changeset, reset required votes for majority votes. required_votes = ceil(MAJORITY_THRESHOLD * unl_count); // Add unl additions which have votes over majority threshold to proposal. for (const auto &[pubkey, numvotes] : votes.unl_additions) if (numvotes >= required_votes) - stg_prop.unl_changeset.additions.emplace(pubkey); + p.unl_changeset.additions.emplace(pubkey); // Add unl removals which have votes over majority threshold to proposal. for (const auto &[pubkey, numvotes] : votes.unl_removals) if (numvotes >= required_votes) - stg_prop.unl_changeset.removals.emplace(pubkey); + p.unl_changeset.removals.emplace(pubkey); - // time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement. + // time is voted on a simple sorted (highest to lowest) and majority basis. 
uint32_t highest_time_vote = 0; for (auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr) { @@ -635,9 +640,12 @@ namespace consensus if (numvotes > highest_time_vote) { highest_time_vote = numvotes; - stg_prop.time = time; + p.time = time; } } + // If final time happens to be 0 (this can happen if there were no proposals to vote for), we set the time manually. + if (p.time == 0) + p.time = ctx.round_start_time; // Round nonce is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement. uint32_t highest_nonce_vote = 0; @@ -648,31 +656,29 @@ namespace consensus if (numvotes > highest_nonce_vote) { - highest_time_vote = numvotes; - stg_prop.nonce = nonce; + highest_nonce_vote = numvotes; + p.nonce = nonce; } } - return stg_prop; + return p; } /** - * Broadcasts the given proposal to all connected peers if in PROPOSER mode. Otherwise - * only send to self in OBSERVER mode. + * Broadcasts the given proposal to all connected peers if in PROPOSER mode. Does not send in OBSERVER mode. * @return 0 on success. -1 if no peers to broadcast. */ void broadcast_proposal(const p2p::proposal &p) { + // In observer mode, we do not send out proposals. + if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER) + return; + flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_msg_from_proposal(fbuf, p); + p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public); - // In observer mode, we only send out the proposal to ourselves. 
- if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER) - p2p::send_message_to_self(fbuf); - else - p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public); - - LOG_DEBUG << "Proposed u/i/o:" << p.users.size() + LOG_DEBUG << "Proposed u/i/o:" << p.users.size() << "/" << p.hash_inputs.size() << "/" << p.hash_outputs.size() << " ts:" << std::to_string(p.time) @@ -843,7 +849,8 @@ namespace consensus } } - // Clear candidate unl changset after consensus rounds are completed. + // Update the unl with the unl changeset that subjected to the consensus. + unl::apply_changeset(cons_prop.unl_changeset.additions, cons_prop.unl_changeset.removals); ctx.candidate_unl_changeset.clear(); // Send any output from the previous consensus round to locally connected users. diff --git a/src/consensus.hpp b/src/consensus.hpp index efeb9da3..8620427b 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -68,7 +68,7 @@ namespace consensus p2p::contract_unl_changeset candidate_unl_changeset; uint8_t stage = 1; - uint64_t time_now = 0; + uint64_t round_start_time = 0; uint16_t stage_time = 0; // Time allocated to a consensus stage. uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. @@ -105,9 +105,11 @@ namespace consensus bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes); + void check_sync_completion(); + void revise_candidate_proposals(); - bool wait_and_proceed_stage(uint64_t &stage_start); + bool wait_and_proceed_stage(); void broadcast_nonunl_proposal(); diff --git a/src/hpfs/h32.cpp b/src/hpfs/h32.cpp index 78c07e1c..d0dfdd0a 100644 --- a/src/hpfs/h32.cpp +++ b/src/hpfs/h32.cpp @@ -52,14 +52,14 @@ namespace hpfs std::ostream &operator<<(std::ostream &output, const h32 &h) { const uint8_t *buf = reinterpret_cast(&h); - for (int i = 0; i < 8; i++) + for (int i = 0; i < 5; i++) // Only print first 5 bytes in hex. 
output << std::hex << std::setfill('0') << std::setw(2) << (int)buf[i]; return output; } - // Helper class to support std::map/std::unordered_map custom hashing function. - // This is needed to use B2H as the std map container key. + // Helper func to support std::map/std::unordered_map custom hashing function. + // This is needed to use h32 as the std map container key. size_t h32_std_key_hasher::operator()(const h32 h) const { // Compute individual hash values. http://stackoverflow.com/a/1646913/126995 diff --git a/src/hpfs/hpfs.cpp b/src/hpfs/hpfs.cpp index 17506718..2bdb5d40 100644 --- a/src/hpfs/hpfs.cpp +++ b/src/hpfs/hpfs.cpp @@ -7,7 +7,7 @@ namespace hpfs { constexpr const char *HPFS_TRACE_ARG_ERROR = "trace=error"; - constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=debug"; + constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=error"; constexpr const char *HPFS_HMAP_HASH = "::hpfs.hmap.hash"; constexpr const char *HPFS_HMAP_CHILDREN = "::hpfs.hmap.children"; constexpr const char *HPFS_SESSION = "::hpfs.session"; diff --git a/src/ledger.cpp b/src/ledger.cpp index 3436a781..5b6d6fd0 100644 --- a/src/ledger.cpp +++ b/src/ledger.cpp @@ -16,6 +16,12 @@ namespace ledger constexpr uint64_t MAX_LEDGER_SEQUENCE = 256; // Max ledger block count to keep. constexpr uint16_t SYNCER_IDLE_WAIT = 20; // lcl syncer loop sleep time (milliseconds). + // Max no. of repetitive reqeust resubmissions before abandoning the sync. + constexpr uint16_t ABANDON_THRESHOLD = 10; + + // No. of milliseconds to wait before resubmitting a request. + uint16_t REQUEST_RESUBMIT_TIMEOUT; + ledger_context ctx; sync_context sync_ctx; bool init_success = false; @@ -25,6 +31,8 @@ namespace ledger */ int init() { + REQUEST_RESUBMIT_TIMEOUT = conf::cfg.roundtime; + // Filename list of the history folder. std::list sorted_folder_entries = util::fetch_dir_entries(conf::ctx.hist_dir); // Sorting to make filenames in seq_no order. 
@@ -138,24 +146,17 @@ namespace ledger return; } - const std::string current_lcl = ctx.get_lcl(); - { std::scoped_lock lock(sync_ctx.target_lcl_mutex); if (sync_ctx.target_lcl == target_lcl) return; sync_ctx.target_lcl = target_lcl; sync_ctx.target_lcl_seq_no = target_seq_no; + sync_ctx.target_requested_on = 0; + sync_ctx.request_submissions = 0; sync_ctx.is_syncing = true; - LOG_INFO << "lcl sync: Syncing for target:" << sync_ctx.target_lcl.substr(0, 15) << " (current:" << current_lcl.substr(0, 15) << ")"; - } - - // Request history from a random peer if needed. - // We do not send a request if the target is GENESIS block (nothing to request). - if (target_lcl != GENESIS_LEDGER) - { - send_ledger_history_request(current_lcl, target_lcl); + LOG_INFO << "lcl sync: Syncing for target:" << sync_ctx.target_lcl.substr(0, 15) << " (current:" << ctx.get_lcl().substr(0, 15) << ")"; } } @@ -168,112 +169,154 @@ namespace ledger LOG_INFO << "lcl sync: Worker started."; - std::list> history_requests; - std::list history_responses; - - // Indicates whether any requests/responses were processed in the previous loop iteration. - bool prev_processed = false; - while (!sync_ctx.is_shutting_down) { - // Wait a small delay if there were no requests/responses processed during previous iteration. - if (!prev_processed) - util::sleep(SYNCER_IDLE_WAIT); + // Indicates whether any requests/responses were processed in the loop iteration. + bool processed = false; - const std::string current_lcl = ctx.get_lcl(); - - // Move over the collected sync items to the local lists. - { - std::scoped_lock(sync_ctx.list_mutex); - history_requests.splice(history_requests.end(), sync_ctx.collected_history_requests); - history_responses.splice(history_responses.end(), sync_ctx.collected_history_responses); - } - - prev_processed = !history_requests.empty() || !history_responses.empty(); - - // Process any target lcl sync activities. + // Perform lcl sync activities. 
{ std::scoped_lock lock(sync_ctx.target_lcl_mutex); - if (!sync_ctx.target_lcl.empty()) + send_lcl_sync_request(); // Send lcl requests if needed (or abandon if sync timeout). + + // Process any history responses from other nodes. + if (!sync_ctx.target_lcl.empty() && check_lcl_sync_responses() == 1) + processed = true; + } + + // Serve any history requests from other nodes. + if (check_lcl_sync_requests() == 1) + processed = true; + + // Wait a small delay if there were no requests/responses processed during previous iteration. + if (!processed) + util::sleep(SYNCER_IDLE_WAIT); + } + + LOG_INFO << "lcl sync: Worker stopped."; + } + + /** + * Submits/resubmits lcl history requests as needed. Abandons sync if threshold reached. + */ + void send_lcl_sync_request() + { + // If target lcl is genesis lcl, Clear the ledger history and reset target sequence number. + if (sync_ctx.target_lcl == GENESIS_LEDGER) + { + LOG_INFO << "lcl sync: Target is GENESIS. Clearing our history."; + clear_ledger(); + sync_ctx.clear_target(); + } + else + { + // Check whether we need to send any requests or abandon the sync due to timeout. + const uint64_t time_now = util::get_epoch_milliseconds(); + if ((sync_ctx.target_requested_on == 0) || // Initial request. + (time_now - sync_ctx.target_requested_on) > REQUEST_RESUBMIT_TIMEOUT) // Request resubmission. + { + if (sync_ctx.request_submissions < ABANDON_THRESHOLD) { - // If target lcl is genesis lcl, Clear the ledger history and reset target sequence number. - if (sync_ctx.target_lcl == GENESIS_LEDGER) - { - LOG_INFO << "lcl sync: Target is GENESIS. Clearing our history."; - clear_ledger(); - sync_ctx.target_lcl.clear(); - sync_ctx.target_lcl_seq_no = 0; - sync_ctx.is_syncing = false; - } - // If full history mode is not enabled check the target lcl seq no - // to see whether it's too far ahead. That means no one probably has our - // lcl in their ledgers. So we should clear our entire ledger history before requesting from peers. 
- else if (!conf::cfg.fullhistory && current_lcl != GENESIS_LEDGER && sync_ctx.target_lcl_seq_no > (ctx.get_seq_no() + MAX_LEDGER_SEQUENCE)) + // Before first request, if full history mode is not enabled check the target lcl seq no to see whether + // it's too far ahead. That means no one probably has our lcl in their ledgers. So we should clear our + // entire ledger history before requesting from peers. + if (sync_ctx.target_requested_on == 0 && !conf::cfg.fullhistory && sync_ctx.target_lcl_seq_no > (ctx.get_seq_no() + MAX_LEDGER_SEQUENCE)) { LOG_INFO << "lcl sync: Target " << sync_ctx.target_lcl.substr(0, 15) << " is too far ahead. Clearing our history."; clear_ledger(); } - else - { - // Scan any queued lcl history responses. - // Only process the first successful item which matches with our current lcl. - for (const p2p::history_response &hr : history_responses) - { - if (hr.requester_lcl == current_lcl) - { - std::string new_lcl; - if (handle_ledger_history_response(hr, new_lcl) != -1) - { - LOG_INFO << "lcl sync: Sync complete. New lcl:" << new_lcl.substr(0, 15); - sync_ctx.target_lcl.clear(); - sync_ctx.target_lcl_seq_no = 0; - sync_ctx.is_syncing = false; - break; - } - } - } - } + + send_ledger_history_request(ctx.get_lcl(), sync_ctx.target_lcl); + sync_ctx.target_requested_on = time_now; + sync_ctx.request_submissions++; } - - history_responses.clear(); - } - - // Serve any history requests from other nodes. - { - // Acquire lock so consensus does not update the ledger while we are reading the ledger. - std::scoped_lock ledger_lock(ctx.ledger_mutex); - - for (const auto &[session_id, hr] : history_requests) + else { - // First check whether we have the required lcl available. 
- if (!check_required_lcl_availability(hr.required_lcl)) - continue; - - p2p::history_response resp; - if (ledger::retrieve_ledger_history(hr, resp) != -1) - { - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_history_response(fbuf, resp); - std::string_view msg = msg::fbuf::flatbuff_bytes_to_sv(fbuf.GetBufferPointer(), fbuf.GetSize()); - - // Find the peer that we should send the state response to. - std::scoped_lock lock(p2p::ctx.peer_connections_mutex); - const auto peer_itr = p2p::ctx.peer_connections.find(session_id); - - if (peer_itr != p2p::ctx.peer_connections.end()) - { - comm::comm_session *session = peer_itr->second; - session->send(msg); - } - } + LOG_INFO << "lcl sync: Resubmission threshold exceeded. Abandoning sync."; + sync_ctx.clear_target(); } + } + } + } - history_requests.clear(); + /** + * Processes any lcl responses we have received from other peers. + * @return 0 if no responses were processed. 1 if at least one response was processed. + */ + int check_lcl_sync_responses() + { + // Move over the collected responses to the local list. + std::list<p2p::history_response> history_responses; + { + std::scoped_lock list_lock(sync_ctx.list_mutex); // Named lock: stays held until the end of this scope. + history_responses.splice(history_responses.end(), sync_ctx.collected_history_responses); + } + + const std::string current_lcl = ctx.get_lcl(); + + // Scan any queued lcl history responses. + // Only process the first successful item which matches with our current lcl. + for (const p2p::history_response &hr : history_responses) + { + if (hr.requester_lcl == current_lcl) + { + std::string new_lcl; + if (handle_ledger_history_response(hr, new_lcl) != -1) + { + LOG_INFO << "lcl sync: Sync complete. New lcl:" << new_lcl.substr(0, 15); + sync_ctx.clear_target(); + + break; + } } } - LOG_INFO << "lcl sync: Worker stopped."; + return history_responses.empty() ? 0 : 1; + } + + /** + * Serves any lcl requests we have received from other peers. + * @return 0 if no requests were served. 1 if at least one request was served. 
+ */ + int check_lcl_sync_requests() + { + // Move over the collected requests to the local list. + std::list> history_requests; + { + std::scoped_lock list_lock(sync_ctx.list_mutex); // Named lock: stays held until the end of this scope. + history_requests.splice(history_requests.end(), sync_ctx.collected_history_requests); + } + + // Acquire lock so consensus does not update the ledger while we are reading the ledger. + std::scoped_lock ledger_lock(ctx.ledger_mutex); + + for (const auto &[session_id, hr] : history_requests) + { + // First check whether we have the required lcl available. + if (!check_required_lcl_availability(hr.required_lcl)) + continue; + + p2p::history_response resp; + if (ledger::retrieve_ledger_history(hr, resp) != -1) + { + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_history_response(fbuf, resp); + std::string_view msg = msg::fbuf::flatbuff_bytes_to_sv(fbuf.GetBufferPointer(), fbuf.GetSize()); + + // Find the peer that we should send the state response to. + std::scoped_lock lock(p2p::ctx.peer_connections_mutex); + const auto peer_itr = p2p::ctx.peer_connections.find(session_id); + + if (peer_itr != p2p::ctx.peer_connections.end()) + { + comm::comm_session *session = peer_itr->second; + session->send(msg); + } + } + } + + return history_requests.empty() ? 0 : 1; } /** diff --git a/src/ledger.hpp b/src/ledger.hpp index afdc65fc..f7cf5cde 100644 --- a/src/ledger.hpp +++ b/src/ledger.hpp @@ -14,7 +14,9 @@ namespace ledger { // The current target lcl that we are syncing towards. std::string target_lcl; - uint64_t target_lcl_seq_no; + uint64_t target_lcl_seq_no = 0; + uint64_t target_requested_on = 0; + uint16_t request_submissions = 0; std::mutex target_lcl_mutex; // Lists holding history requests and responses collected from incoming p2p messages. 
@@ -25,6 +27,15 @@ namespace ledger std::thread lcl_sync_thread; std::atomic is_syncing = false; std::atomic is_shutting_down = false; + + void clear_target() + { + target_lcl.clear(); + target_lcl_seq_no = 0; + target_requested_on = 0; + request_submissions = 0; + is_syncing = false; + } }; struct ledger_context @@ -70,9 +81,15 @@ namespace ledger void deinit(); + void set_sync_target(const std::string &target_lcl); + void lcl_syncer_loop(); - void set_sync_target(const std::string &target_lcl); + void send_lcl_sync_request(); + + int check_lcl_sync_responses(); + + int check_lcl_sync_requests(); const std::pair get_ledger_cache_top(); diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 30141a95..9ea84245 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -226,6 +226,7 @@ namespace msg::fbuf::p2pmsg p.pubkey = flatbuff_bytes_to_sv(pubkey); p.sent_timestamp = timestamp; + p.recv_timestamp = util::get_epoch_milliseconds(); p.time = msg.time(); p.nonce = flatbuff_bytes_to_sv(msg.nonce()); p.stage = msg.stage(); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 7b22f768..70023c9d 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -47,6 +47,7 @@ namespace p2p std::string pubkey; uint64_t sent_timestamp = 0; // The timestamp of the sender when this proposal was sent. + uint64_t recv_timestamp = 0; // The timestamp when we received the proposal. (used for statistics) uint64_t time = 0; // The time value that is voted on. uint8_t stage = 0; std::string nonce; // Random nonce that is used to reduce lcl predictability. 
diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 48ae58dc..9f97ae06 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -214,7 +214,7 @@ namespace p2p if (ctx.collected_msgs.state_responses.size() < p2p::STATE_RES_LIST_CAP) { std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_responses.push_back(std::make_pair(session.pubkey, std::move(response))); + ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); } else { diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp index f4293dc5..ac3fa111 100644 --- a/src/state/state_sync.cpp +++ b/src/state/state_sync.cpp @@ -22,6 +22,9 @@ namespace state_sync // Request loop sleep time (milliseconds). constexpr uint16_t REQUEST_LOOP_WAIT = 10; + // Max no. of repetitive reqeust resubmissions before abandoning the sync. + constexpr uint16_t ABANDON_THRESHOLD = 20; + constexpr int FILE_PERMS = 0644; // No. of milliseconds to wait before resubmitting a request. @@ -93,9 +96,9 @@ namespace state_sync while (!ctx.is_shutting_down) { hpfs::h32 new_state = hpfs::h32_empty; - request_loop(ctx.target_state, new_state); + const int result = request_loop(ctx.target_state, new_state); - if (ctx.is_shutting_down) + if (result == -1 || ctx.is_shutting_down) break; ctx.pending_requests.clear(); @@ -133,13 +136,16 @@ namespace state_sync LOG_INFO << "State sync: Worker stopped."; } - void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state) + int request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state) { std::string lcl = ledger::ctx.get_lcl(); // Indicates whether any responses were processed in the previous loop iteration. bool prev_responses_processed = false; + // No. of repetitive resubmissions so far. (This is reset whenever we receive a state response) + uint16_t resubmissions_count = 0; + // Send the initial root state request. 
submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target}, lcl); @@ -162,12 +168,16 @@ namespace state_sync prev_responses_processed = !ctx.candidate_state_responses.empty(); + // Reset resubmissions counter whenever we have a resposne. + if (!ctx.candidate_state_responses.empty()) + resubmissions_count = 0; + for (auto &response : ctx.candidate_state_responses) { if (should_stop_request_loop(current_target)) - return; + return 0; - LOG_DEBUG << "State sync: Processing state response from [" << response.first.substr(0, 10) << "]"; + LOG_DEBUG << "State sync: Processing state response from [" << response.first.substr(2, 10) << "]"; const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data()); const msg::fbuf::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message(); @@ -201,7 +211,7 @@ namespace state_sync if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/") < 1) { LOG_ERROR << "State sync: exiting due to hash check error."; - return; + return -1; } // Update the central state tracker. @@ -209,7 +219,7 @@ namespace state_sync LOG_DEBUG << "State sync: current:" << updated_state << " | target:" << current_target; if (updated_state == current_target) - return; + return 0; } ctx.candidate_state_responses.clear(); @@ -218,7 +228,7 @@ namespace state_sync for (auto &[hash, request] : ctx.submitted_requests) { if (should_stop_request_loop(current_target)) - return; + return 0; if (request.waiting_time < REQUEST_RESUBMIT_TIMEOUT) { @@ -227,6 +237,12 @@ namespace state_sync } else { + if (++resubmissions_count > ABANDON_THRESHOLD) + { + LOG_INFO << "State sync: Resubmission threshold exceeded. Abandoning sync."; + return -1; + } + // Reset the counter and re-submit request. 
request.waiting_time = 0; LOG_DEBUG << "State sync: Resubmitting request..."; @@ -241,7 +257,7 @@ namespace state_sync for (int i = 0; i < available_slots && !ctx.pending_requests.empty(); i++) { if (should_stop_request_loop(current_target)) - return; + return 0; const backlog_item &request = ctx.pending_requests.front(); submit_request(request, lcl); @@ -249,6 +265,8 @@ namespace state_sync } } } + + return 0; } /** @@ -299,9 +317,10 @@ namespace state_sync std::string target_pubkey; request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey); - LOG_DEBUG << "State sync: Requesting from [" << target_pubkey.substr(0, 10) << "]. type:" << request.type - << " path:" << request.path << " block_id:" << request.block_id - << " hash:" << request.expected_hash; + if (!target_pubkey.empty()) + LOG_DEBUG << "State sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type + << " path:" << request.path << " block_id:" << request.block_id + << " hash:" << request.expected_hash; } /** diff --git a/src/state/state_sync.hpp b/src/state/state_sync.hpp index bd084e87..999db4bb 100644 --- a/src/state/state_sync.hpp +++ b/src/state/state_sync.hpp @@ -62,7 +62,7 @@ namespace state_sync void state_syncer_loop(); - void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state); + int request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state); bool should_stop_request_loop(const hpfs::h32 current_target); diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 4c5a375c..1e424e1b 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -1,7 +1,9 @@ #!/bin/bash # Script to generate docker container clusters for local development testing. # Generate contract sub-directories within "hpcluster" directory for the given no. of cluster nodes. 
-# Usage (to generate 5-node cluster): ./cluster-create.sh 5 +# Usage: To generate 5-node cluster: ./cluster-create.sh 5 +# Specify log level (default: inf): ./cluster-create.sh 5 dbg +# Specify round time (default: 1000): ./cluster-create.sh 5 inf 2000 # Validate the node count arg. if [ -n "$1" ] && [ "$1" -eq "$1" ] 2>/dev/null; then @@ -12,6 +14,8 @@ else fi ncount=$1 +loglevel=$2 +roundtime=$3 hpcore=$(realpath ../..) # Contract can be set with 'export CONTRACT='. Defaults to nodejs echo contract. @@ -39,6 +43,13 @@ else # nodejs echo contract (default) binargs="/contract/bin/echo_contract.js" fi +if [ "$loglevel" = "" ]; then + loglevel=inf +fi +if [ "$roundtime" = "" ]; then + roundtime=1000 +fi + # Delete and recreate 'hpcluster' directory. rm -rf hpcluster > /dev/null 2>&1 mkdir hpcluster @@ -80,8 +91,8 @@ do appbillargs: '', \ peerport: ${peerport}, \ pubport: ${pubport}, \ - roundtime: 1000, \ - loglevel: 'inf', \ + roundtime: $roundtime, \ + loglevel: '$loglevel', \ loggers:['console', 'file'] \ }, null, 2)" > hp.cfg rm tmp.json diff --git a/test/vm-cluster/cluster.sh b/test/vm-cluster/cluster.sh index 9d3a351f..7177eeb1 100755 --- a/test/vm-cluster/cluster.sh +++ b/test/vm-cluster/cluster.sh @@ -51,12 +51,12 @@ let nodeid=$2-1 if [ "$mode" = "info" ] || [ "$mode" = "new" ] || [ "$mode" = "update" ] || [ "$mode" = "reconfig" ] || \ [ "$mode" = "start" ] || [ "$mode" = "stop" ] || [ "$mode" = "check" ] || [ "$mode" = "log" ] || [ "$mode" = "kill" ] || \ - [ "$mode" = "ssh" ] || [ "$mode" = "reboot" ] || [ "$mode" = "dns" ] || [ "$mode" = "ssl" ] || [ "$mode" = "lcl" ]; then + [ "$mode" = "ssh" ] || [ "$mode" = "reboot" ] || [ "$mode" = "dns" ] || [ "$mode" = "ssl" ] || [ "$mode" = "lcl" ] || [ "$mode" = "pubkey" ]; then echo "mode: $mode ($contdir)" else echo "Invalid command. [ info | new | update | reconfig" \ " | start [N] | stop [N] | check [N] | log | kill [N] | reboot | ssh or" \ - " | dns | ssl | lcl ] expected." 
+ " | dns | ssl | lcl | pubkey ] expected." exit 1 fi @@ -75,6 +75,7 @@ fi # dns - Uploads given zerossl domain verification file to vm and starts http server for DNS check. # ssl - Uploads matching zerossl certificate bundle from ~/Downloads/ to the contract. # lcl - Displays the lcls of all nodes. +# pubkey - Displays the pubkey on specified vm node or entire cluster. if [ $mode = "info" ]; then echo "${vmaddrs[*]}" | tr ' ' '\n' @@ -246,6 +247,23 @@ if [ $mode = "lcl" ]; then exit 0 fi +if [ $mode = "pubkey" ]; then + command="cat $contdir/cfg/hp.cfg | grep pubkeyhex | cut -d '\"' -f4" + if [ $nodeid = -1 ]; then + for (( i=0; i<$vmcount; i++ )) + do + vmaddr=${vmaddrs[i]} + let nodeid=$i+1 + echo "node"$nodeid":" $(sshpass -p $vmpass ssh $vmuser@$vmaddr $command) & + done + wait + else + vmaddr=${vmaddrs[$nodeid]} + sshpass -p $vmpass ssh $vmuser@$vmaddr $command + fi + exit 0 +fi + # All code below this will only execute in 'new', 'update' or 'reconfig' mode. # Run setup/configuration of entire cluster. @@ -307,7 +325,7 @@ function joinarr { arr=("${!arrname}") skip=$2 - let prevlast=$ncount-2 + let prevlast=$vmcount-2 # Resetting prevlast if nothing is given to skip. if [ $skip -lt 0 ] then @@ -331,7 +349,7 @@ function joinarr { done str="$str]" - echo $str + echo $str # This returns the result. } # Loop through all nodes hp.cfg.