From b172152cbae8fa54bda7b6a388e3f5761afc2094 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Tue, 25 May 2021 21:43:08 +0530 Subject: [PATCH] Sync and consensus improvements. (#312) * Refactored hpfs sync to support parallel target sync. * Removed role change in consensus. --- src/consensus.cpp | 54 +-- src/consensus.hpp | 3 +- src/hpfs/hpfs_sync.cpp | 673 ++++++++++++++++++------------------- src/hpfs/hpfs_sync.hpp | 92 +++-- src/ledger/ledger.cpp | 4 +- src/ledger/ledger_sync.cpp | 28 +- src/ledger/ledger_sync.hpp | 4 +- src/sc/contract_sync.cpp | 6 +- src/sc/contract_sync.hpp | 2 +- src/util/util.cpp | 3 + 10 files changed, 435 insertions(+), 434 deletions(-) diff --git a/src/consensus.cpp b/src/consensus.cpp index b260938a..1ed8122f 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -106,9 +106,6 @@ namespace consensus // arived ones and expired ones. revise_candidate_proposals(); - // If possible, switch back to validator mode before stage processing. (if we were syncing before) - check_sync_completion(); - // Get current lcl, state, patch, primary shard and raw shard info. p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); util::h32 state_hash = sc::contract_fs.get_parent_hash(sc::STATE_DIR_PATH); @@ -142,9 +139,9 @@ namespace consensus const size_t unl_count = unl::count(); vote_counter votes; - const int sync_status = check_sync_status(unl_count, votes, lcl_id); + ctx.sync_status = check_sync_status(unl_count, votes, lcl_id); - if (sync_status == -2) // Unreliable votes. + if (ctx.sync_status == -2) // Unreliable votes. { ctx.unreliable_votes_attempts++; if (ctx.unreliable_votes_attempts >= MAX_UNRELIABLE_VOTES_ATTEMPTS) @@ -158,7 +155,7 @@ namespace consensus ctx.unreliable_votes_attempts = 0; } - if (sync_status == 0) + if (ctx.sync_status == 0) { // If we are in sync, vote and broadcast the winning votes to next stage. const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); @@ -180,12 +177,9 @@ namespace consensus } } - // We have finished a consensus stage. Transition or reset stage based on sync status. - - if (sync_status == -2) - ctx.stage = 0; // Majority last primary shard unreliable. Reset to stage 0. - else - ctx.stage = (ctx.stage + 1) % 4; // Transition to next stage. (if at stage 3 go to next round stage 0) + // We have finished a consensus stage. + // Transition to next stage. (if at stage 3 go to next round stage 0) + ctx.stage = (ctx.stage + 1) % 4; } return 0; @@ -228,7 +222,7 @@ namespace consensus /** * Checks whether we are in sync with the received votes. - * @return 0 if we are in sync. -1 on ledger or hpfs desync. -2 if majority last ledger primary shard hash unreliable. + * @return 0 if we are in sync. -1 on ledger or contract state desync. -2 if majority last ledger primary shard hash unreliable. */ int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id) { @@ -240,13 +234,11 @@ namespace consensus // Last primary shard hash sync is commenced if we are out-of-sync with majority last primary shard hash. if (is_last_primary_shard_desync) { - conf::change_role(conf::ROLE::OBSERVER); - // We first request the latest shard. const std::string majority_shard_seq_no_str = std::to_string(majority_primary_shard_id.seq_no); const std::string shard_path = std::string(ledger::PRIMARY_DIR).append("/").append(majority_shard_seq_no_str); ledger::ledger_sync_worker.is_last_primary_shard_syncing = true; - ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger::ledger_sync_worker.set_target(true, shard_path, majority_primary_shard_id.hash, true); } // Check out raw shard hash with majority raw shard hash. @@ -270,8 +262,6 @@ namespace consensus // Start hpfs sync if we are out-of-sync with majority hpfs patch hash or state hash. if (is_state_desync || is_patch_desync) { - conf::change_role(conf::ROLE::OBSERVER); - if (conf::cfg.node.history == conf::HISTORY::FULL) { // If state or patch is desync set target for the hpfs log sync with the next lcl seq_no. @@ -284,21 +274,20 @@ namespace consensus { // Patch file sync is prioritized, Therefore it is set in the front of the sync target list. if (is_patch_desync) - sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{majority_patch_hash, sc::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE}); + sc::contract_sync_worker.set_target(false, sc::PATCH_FILE_PATH, majority_patch_hash, true); if (is_state_desync) - sc::contract_sync_worker.set_target_push_back(hpfs::sync_target{majority_state_hash, sc::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); + sc::contract_sync_worker.set_target(true, sc::STATE_DIR_PATH, majority_state_hash); } } // If ledger raw shard is desync, We first request the latest raw shard. if (is_last_raw_shard_desync) { - conf::change_role(conf::ROLE::OBSERVER); const std::string majority_shard_seq_no_str = std::to_string(majority_raw_shard_id.seq_no); const std::string shard_path = std::string(ledger::RAW_DIR).append("/").append(majority_shard_seq_no_str); ledger::ledger_sync_worker.is_last_raw_shard_syncing = true; - ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{majority_raw_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger::ledger_sync_worker.set_target(true, shard_path, majority_raw_shard_id.hash); } // If shards aren't aligned with max shard count, do the relevant shard cleanups and requests. @@ -317,7 +306,6 @@ namespace consensus // Proceed further only if last primary shard, last raw shard, state and patch hashes are in sync with majority. if (!is_last_primary_shard_desync && !is_last_raw_shard_desync && !is_state_desync && !is_patch_desync) { - conf::change_role(conf::ROLE::VALIDATOR); return 0; } @@ -329,19 +317,6 @@ namespace consensus return -2; } - /** - * Checks whether we can switch back from currently ongoing observer-mode sync operation - * that has been completed. - */ - void check_sync_completion() - { - const bool is_contract_syncing = (conf::cfg.node.history == conf::HISTORY::FULL) ? sc::hpfs_log_sync::sync_ctx.is_syncing : sc::contract_sync_worker.is_syncing; - // In ledger sync we only concern about last shard sync status to proceed with consensus. - const bool is_ledger_syncing = ledger::ledger_sync_worker.is_last_primary_shard_syncing || ledger::ledger_sync_worker.is_last_raw_shard_syncing; - if (conf::cfg.node.role == conf::ROLE::OBSERVER && !is_contract_syncing && !is_ledger_syncing) - conf::change_role(conf::ROLE::VALIDATOR); - } - /** * Moves proposals collected from the network into candidate proposals and * cleans up any outdated proposals from the candidate set. @@ -394,8 +369,9 @@ namespace consensus { const p2p::proposal &cp = itr->second; - // Only consider this round's proposals which are from current or previous stage. - const bool stage_valid = ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1; + // If we are in sync, only consider this round's proposals which are from current or previous stage. + // Otherwise consider all proposals as long as they are from the same round. + const bool stage_valid = ctx.sync_status == 0 ? (ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1) : true; const bool keep_candidate = (ctx.round_start_time == cp.time) && stage_valid; LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased") << " [s" << std::to_string(cp.stage) @@ -880,7 +856,7 @@ namespace consensus } } - // time is voted on a simple sorted (highest to lowest) and majority basis. + // time is voted on majority basis. uint32_t highest_time_vote = 0; for (const auto &[time, numvotes] : votes.time) { diff --git a/src/consensus.hpp b/src/consensus.hpp index b4248b92..8fa94808 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -111,6 +111,7 @@ namespace consensus uint32_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. uint64_t round_boundry_offset = 0; // Time window boundry offset based on contract id. uint16_t unreliable_votes_attempts = 0; // No. of times we failed to get reliable votes continously. + int sync_status = 0; // Current sync status. std::optional contract_ctx; std::mutex contract_ctx_mutex; @@ -153,8 +154,6 @@ namespace consensus int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id); - void check_sync_completion(); - void revise_candidate_proposals(); int prepare_consensed_users(consensed_user_map &consensed_users, const p2p::proposal &cons_prop); diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index a13092c2..fc662828 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -19,9 +19,6 @@ namespace hpfs // Max number of requests that can be awaiting response at any given time. constexpr uint16_t MAX_AWAITING_REQUESTS = 4; - // Request loop sleep time (milliseconds). - constexpr uint16_t REQUEST_LOOP_WAIT = 10; - // Max no. of repetitive reqeust resubmissions before abandoning the sync. constexpr uint16_t ABANDON_THRESHOLD = 20; @@ -30,17 +27,8 @@ namespace hpfs constexpr int FILE_PERMS = 0644; -#define SYNC_ERROR -1 -#define SYNC_ABANDONED 0 -#define SYNC_ACHIEVED 1 -#define SYNC_PRIORITY_CHANGED 2 -#define SYNC_HASH_CHANGED 3 -#define REQUEST_LOOP_INTERRUPT \ - { \ - const int res = sync_interrupt(target); \ - if (res != -1) \ - return res; \ - } +// Locates the ongoing target for the provided request vpath. (Matched if target vpath is an ancestor path of the request vpath) +#define TARGET_OF_REQUEST(req_vpath) std::find_if(ongoing_targets.begin(), ongoing_targets.end(), [&](sync_item &t) { return req_vpath.rfind(t.vpath, 0) == 0; }) /** * This should be called to activate the hpfs sync. @@ -71,50 +59,18 @@ namespace hpfs } /** - * This sets a prioritized sync target. This target will replace current sync target. - * This target will immediately starting to sync and the interupted sync will resume - * once this sync target is acheived. - */ - void hpfs_sync::set_target_push_front(const sync_target &target) + * Add or update a sync target. + * @param is_dir whether the sync target is a dir or file. + * @param vpath The vpath of to sync. + * @param hash Target hash to achieve. + * @param high_priority Whether this target should be given higher priority over other ongoing targets. + */ + void hpfs_sync::set_target(const bool is_dir, const std::string &vpath, + const util::h32 &hash, const bool high_priority) { - std::unique_lock lock(target_mutex); - - if (is_shutting_down || (is_syncing && target_list.front() == target)) - return; - - // Remove any previous sync targets for the same target vpath. - target_list.remove_if([&target](const hpfs::sync_target &element) { - return element.vpath == target.vpath; - }); - - target_list.push_front(target); - is_syncing = true; - } - - /** - * Adds a new target to the syncing list. If the list was previously empty, current target - * will be updated and syncing will start. - */ - void hpfs_sync::set_target_push_back(const sync_target &target) - { - std::unique_lock lock(target_mutex); - - if (is_shutting_down) - return; - - // Check whether the same vpath target is already in the sync target list. If so, update it's information. - const auto itr = std::find_if(target_list.begin(), target_list.end(), - [&target](const hpfs::sync_target &element) { - return element.vpath == target.vpath; - }); - if (itr != target_list.end()) - { - itr->hash = target.hash; - itr->item_type = target.item_type; - return; - } - - target_list.push_back(target); + std::unique_lock lock(incoming_targets_mutex); + incoming_targets.emplace(sync_item{ + (is_dir ? SYNC_ITEM_TYPE::DIR : SYNC_ITEM_TYPE::FILE), vpath, -1, hash, high_priority}); is_syncing = true; } @@ -124,303 +80,360 @@ namespace hpfs void hpfs_sync::hpfs_syncer_loop() { util::mask_signal(); - LOG_INFO << "Hpfs " << name << " sync: Worker started."; + // Indicates whether any responses were processed in the previous loop iteration. + bool prev_responses_processed = false; + while (!is_shutting_down) { - util::sleep(IDLE_WAIT); + // Wait a small delay if there were no responses processed during previous iteration. + if (!prev_responses_processed) + util::sleep(IDLE_WAIT); - // Keep idling if we are not doing any sync activity. + // Check whether we have any new/changed targets. + if (check_incoming_targets() == -1) + break; - if (!is_syncing) - continue; + // Move the received hpfs responses to the local response list. + swap_collected_responses(); - if (fs_mount->acquire_rw_session() == -1) + if (ongoing_targets.empty()) { - LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session"; - continue; - } - - while (!is_shutting_down) - { - hpfs::sync_target current_target = {}; - { - std::shared_lock lock(target_mutex); - - if (target_list.empty()) - { - LOG_INFO << "Hpfs " << name << " sync: All targets complete."; - is_syncing = false; - break; - } - - // Set the target to be top of the list. - current_target = target_list.front(); - } - - // Start syncing towards specified target. - const int result = request_loop(current_target); - - pending_requests.clear(); candidate_hpfs_responses.clear(); - submitted_requests.clear(); - - if (is_shutting_down) - break; - - if (result != SYNC_HASH_CHANGED && result != SYNC_PRIORITY_CHANGED) - { - // The finished target can be removed from the list. - { - std::unique_lock lock(target_mutex); - target_list.pop_front(); - } - - // After every sync target abandon or completion, release and reacquire hpfs rw session so hpfs gets some room - // to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state. - reacquire_rw_session(); - - if (result == SYNC_ABANDONED) - on_sync_target_abandoned(); - else if (result == SYNC_ACHIEVED) - on_sync_target_acheived(current_target); - } + continue; } - fs_mount->release_rw_session(); + + // Process any sync responses we have received. + prev_responses_processed = process_candidate_responses(); + + if (is_shutting_down) + break; + + // Submit any pending requests to peers. + perform_request_submissions(); } + if (rw_session_active) + fs_mount->release_rw_session(); + LOG_INFO << "Hpfs " << name << " sync: Worker stopped."; } /** - * Reqest loop which syncs towards the specified target. - * @return 0 when sync is abandoned due to resubmission threshold or shutdown. 1 when target sync hash acheived. - * 2 when target has been re-prioritized. 3 when target hash has changed. -1 on error. + * Checks for any new/updated targets that we have received and safely incorporates them into ongoing sync activity. + * @return o on success. -1 on error. */ - int hpfs_sync::request_loop(const hpfs::sync_target &target) + int hpfs_sync::check_incoming_targets() { - LOG_INFO << "Hpfs " << name << " sync: Starting target:" << target.hash << " " << target.vpath; - - // Send the initial root hpfs request of the current target. - submit_request(backlog_item{target.item_type, target.vpath, -1, target.hash}); - - // Indicates whether any responses were processed in the previous loop iteration. - bool prev_responses_processed = false; - - // No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response) - uint16_t resubmissions_count = 0; - - while (true) { - REQUEST_LOOP_INTERRUPT - - // Wait a small delay if there were no responses processed during previous iteration. - if (!prev_responses_processed) - util::sleep(REQUEST_LOOP_WAIT); - - // Move the received hpfs responses to the local response list. - swap_collected_responses(); - - prev_responses_processed = !candidate_hpfs_responses.empty(); - - // Reset resubmissions counter whenever we have a resposne. - if (!candidate_hpfs_responses.empty()) - resubmissions_count = 0; - - for (auto &response : candidate_hpfs_responses) + std::unique_lock lock(incoming_targets_mutex); + for (const sync_item &target : incoming_targets) { - REQUEST_LOOP_INTERRUPT + // If we have an ongoing target with the same vpath but having a different hash, we need to destroy + // that target and insert the updated one. - const std::string from = response.first.substr(2, 10); // Sender pubkey. - const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data()); - const p2pmsg::HpfsResponseMsg &resp_msg = *msg.content_as_HpfsResponseMsg(); - - // Check whether we are actually waiting for this response. If not, ignore it. - std::string_view hash = msg::fbuf::flatbuf_bytes_to_sv(resp_msg.hash()); - std::string_view vpath = msg::fbuf::flatbuf_str_to_sv(resp_msg.path()); - - const std::string key = std::string(vpath).append(hash); - const auto pending_resp_itr = submitted_requests.find(key); - if (pending_resp_itr == submitted_requests.end()) + const auto ex_target = std::find_if(ongoing_targets.begin(), ongoing_targets.end(), + [&](sync_item &t) { return t.vpath == target.vpath; }); + if (ex_target == ongoing_targets.end()) { - LOG_DEBUG << "Hpfs " << name << " sync: Skipping response from [" << from << "] because we are not looking for hash:" - << util::to_hex(hash).substr(0, 10) << " of " << vpath; - continue; + ongoing_targets.push_back(target); + pending_requests.emplace(target); // Places the root request for this target according to priority sorting. + + LOG_INFO << "Hpfs " << name << " sync: Target added. Hash:" << target.expected_hash << " " << target.vpath; } - - // Process the message based on response type. - const p2pmsg::HpfsResponse msg_type = resp_msg.content_type(); - - if (msg_type == p2pmsg::HpfsResponse_HpfsFsEntryResponse) + else if (ex_target->expected_hash != target.expected_hash) { - const p2pmsg::HpfsFsEntryResponse &fs_resp = *resp_msg.content_as_HpfsFsEntryResponse(); + // Existing target's expected hash is obsolete now. Therefore clear all ongoing activity for the obsolete target. + clear_target(ex_target); - // Get fs entries we have received. - std::vector peer_fs_entries; - p2pmsg::flatbuf_hpfsfshashentries_to_hpfsfshashentries(peer_fs_entries, fs_resp.entries()); + ongoing_targets.push_back(target); // Insert the new one to replace the obsolete target. + pending_requests.emplace(target); // Places the root request for this target according to 'sync_item' priority sorting. - // Validate received fs data against the hash. - if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entries)) - { - LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched fs entries response from [" << from << "] for " << vpath; - continue; - } - - LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response from [" << from << "] for " << vpath; - handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entries); + LOG_INFO << "Hpfs " << name << " sync: Target updated. New hash:" << target.expected_hash << " " << target.vpath; } - else if (msg_type == p2pmsg::HpfsResponse_HpfsFileHashMapResponse) - { - const p2pmsg::HpfsFileHashMapResponse &file_resp = *resp_msg.content_as_HpfsFileHashMapResponse(); - - // File block hashes we received from the peer. - const util::h32 *block_hashes = reinterpret_cast(file_resp.hash_map()->data()); - const size_t block_hash_count = file_resp.hash_map()->size() / sizeof(util::h32); - - // Validate received hashmap against the hash. - if (!validate_file_hashmap_hash(vpath, hash, file_resp.file_mode(), block_hashes, block_hash_count)) - { - LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched hashmap response from [" << from << "] for " << vpath; - continue; - } - - std::set responded_block_ids; - { - const flatbuffers::Vector *fbvec = file_resp.responded_block_ids(); - const uint32_t *ptr = file_resp.responded_block_ids()->data(); - const size_t count = file_resp.responded_block_ids()->size(); - for (size_t i = 0; i < count; i++) - responded_block_ids.emplace(ptr[i]); - } - - LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response from [" << from << "] for " << vpath; - handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count, - responded_block_ids, file_resp.file_length()); - } - else if (msg_type == p2pmsg::HpfsResponse_HpfsBlockResponse) - { - const p2pmsg::HpfsBlockResponse &block_resp = *resp_msg.content_as_HpfsBlockResponse(); - - // Get the file path of the block data we have received. - const uint32_t block_id = block_resp.block_id(); - std::string_view buf = msg::fbuf::flatbuf_bytes_to_sv(block_resp.data()); - - // Validate received block data against the hash. - if (!validate_file_block_hash(hash, block_id, buf)) - { - LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched block response from [" << from << "] for block_id:" << block_id - << " (len:" << buf.length() << ") of " << vpath; - continue; - } - - LOG_DEBUG << "Hpfs " << name << " sync: Processing block response from [" << from << "] for block_id:" << block_id - << " (len:" << buf.length() << ") of " << vpath; - handle_file_block_response(vpath, block_id, buf); - } - - // Now that we have received matching hash and handled it, remove it from the waiting list. - submitted_requests.erase(pending_resp_itr); - - // After handling each response, check whether we have reached target hpfs state. - // get_hash returns 0 incase target parent is not existing in our side. - util::h32 updated_state = util::h32_empty; - if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, target.vpath) == -1) - { - LOG_ERROR << "Hpfs " << name << " sync: Abandoning due to hash check error. " << target.vpath; - return SYNC_ERROR; - } - - // Update the central hpfs state tracker. - fs_mount->set_parent_hash(target.vpath, updated_state); - - // End the loop if sync is complete. - if (updated_state == target.hash) - { - LOG_INFO << "Hpfs " << name << " sync: Achieved target:" << target.hash << " " << target.vpath; - return SYNC_ACHIEVED; - } - - LOG_DEBUG << "Hpfs " << name << " sync: Current:" << updated_state << " | target:" << target.hash << " " << target.vpath; } - candidate_hpfs_responses.clear(); + incoming_targets.clear(); + } - // No. of milliseconds to wait before resubmitting a request. - const uint32_t request_resubmit_timeout = hpfs::get_request_resubmit_timeout(); - - // Check for long-awaited responses and re-request them. - for (auto &[hash, request] : submitted_requests) + // Acquire/release hpfs rw session as needed. + if (!rw_session_active && !ongoing_targets.empty()) + { + if (fs_mount->acquire_rw_session() == -1) { - REQUEST_LOOP_INTERRUPT + LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session"; + return -1; + } + rw_session_active = true; + } + else if (rw_session_active && ongoing_targets.empty()) + { + if (fs_mount->release_rw_session() == -1) + { + LOG_ERROR << "Hpfs " << name << " sync: Failed to release hpfs rw session"; + return -1; + } + rw_session_active = false; + } - if (request.waiting_time < request_resubmit_timeout) + return 0; + } + + /** + * Clears the specified ongoing target and its associated requests. + * @param target_itr Iterator in the ongoing targets to be erased. + */ + void hpfs_sync::clear_target(const std::vector::iterator &target_itr) + { + // Clear pending requests under the obsolete target. + { + auto itr = pending_requests.begin(); + while (itr != pending_requests.end()) + { + if (itr->vpath.rfind(target_itr->vpath, 0) == 0) // If the request is a sub path of the target's vpath. + pending_requests.erase(itr++); + else + ++itr; + } + } + + // Clear submitted requests under the obsolete target. + { + auto itr = submitted_requests.begin(); + while (itr != submitted_requests.end()) + { + if (itr->second.vpath.rfind(target_itr->vpath, 0) == 0) // If the request is a sub path of the target's vpath. + submitted_requests.erase(itr++); + else + ++itr; + } + } + + ongoing_targets.erase(target_itr); // Clear the obsolete target. + } + + /** + * Submits requests from pending collection to peers, based on request throughput availabilty. + */ + void hpfs_sync::perform_request_submissions() + { + // No. of milliseconds to wait before resubmitting a request. + const uint32_t request_resubmit_timeout = hpfs::get_request_resubmit_timeout(); + + // Check for long-awaited responses and re-request them. + for (auto &[hash, request] : submitted_requests) + { + if (is_shutting_down) + return; + + if (request.waiting_time < request_resubmit_timeout) + { + // Increment wait time. + request.waiting_time += IDLE_WAIT; + } + else + { + // If we have exceeded continous resubmission threshold, clear everything (all targets) and go back to idle state. + if (++resubmissions_count > ABANDON_THRESHOLD) { - // Increment wait time. - request.waiting_time += REQUEST_LOOP_WAIT; + pending_requests.clear(); + submitted_requests.clear(); + ongoing_targets.clear(); + update_sync_status(); + LOG_INFO << "Hpfs " << name << " sync: All targets abandoned due to resubmission threshold."; + + on_sync_abandoned(); } else { - if (++resubmissions_count > ABANDON_THRESHOLD) - { - LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to resubmission threshold. " << target.vpath; - return SYNC_ABANDONED; - } - // Reset the counter and re-submit request. request.waiting_time = 0; submit_request(request, false, true); } } + } - // Check whether we can submit any more requests. - if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS) + // Check whether we can submit any more requests from the pending collection. + if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS) + { + const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size(); + for (int i = 0; i < available_slots && !pending_requests.empty(); i++) { - const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size(); - for (int i = 0; i < available_slots && !pending_requests.empty(); i++) - { - REQUEST_LOOP_INTERRUPT + if (is_shutting_down) + return; - submit_request(pending_requests.front()); - pending_requests.pop_front(); - } + submit_request(*pending_requests.begin()); + pending_requests.erase(pending_requests.begin()); } } - return SYNC_ABANDONED; } /** - * Indicates whether to break out of hpfs request processing loop and the reason. - * @return 0 if interrupted due to shutdown. 2 when target has been re-prioritized. - * 3 when target hash has changed. -1 if not interrupted. + * Safely updates the global sync status flag based on ongoing and incoming targets. */ - int hpfs_sync::sync_interrupt(const hpfs::sync_target &target) + void hpfs_sync::update_sync_status() { - if (is_shutting_down) + std::unique_lock lock(incoming_targets_mutex); + is_syncing = (!incoming_targets.empty() || !ongoing_targets.empty()); + } + + /** + * Processes any sync responses we have received and updates the local file system state. + * @return Whether any responses were processed or not. + */ + bool hpfs_sync::process_candidate_responses() + { + // Reset resubmissions counter whenever we have a resposne. + if (!candidate_hpfs_responses.empty()) + resubmissions_count = 0; + + const bool responses_processed = !candidate_hpfs_responses.empty(); + + for (auto &response : candidate_hpfs_responses) { - LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to shutdown. " << target.vpath; - return SYNC_ABANDONED; + if (is_shutting_down) + return false; + + const std::string from = response.first.substr(2, 10); // Sender pubkey. + const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data()); + const p2pmsg::HpfsResponseMsg &resp_msg = *msg.content_as_HpfsResponseMsg(); + + // Check whether we are actually waiting for this response. If not, ignore it. + std::string_view hash = msg::fbuf::flatbuf_bytes_to_sv(resp_msg.hash()); + std::string_view vpath = msg::fbuf::flatbuf_str_to_sv(resp_msg.path()); + + const std::string key = std::string(vpath).append(hash); + const auto pending_resp_itr = submitted_requests.find(key); + if (pending_resp_itr == submitted_requests.end()) + { + LOG_DEBUG << "Hpfs " << name << " sync: Skipping response from [" << from << "] because we are not looking for hash:" + << util::to_hex(hash).substr(0, 10) << " of " << vpath; + continue; + } + + // Process the message based on response type. + const p2pmsg::HpfsResponse msg_type = resp_msg.content_type(); + + if (msg_type == p2pmsg::HpfsResponse_HpfsFsEntryResponse) + { + const p2pmsg::HpfsFsEntryResponse &fs_resp = *resp_msg.content_as_HpfsFsEntryResponse(); + + // Get fs entries we have received. + std::vector peer_fs_entries; + p2pmsg::flatbuf_hpfsfshashentries_to_hpfsfshashentries(peer_fs_entries, fs_resp.entries()); + + // Validate received fs data against the hash. + if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entries)) + { + LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched fs entries response from [" << from << "] for " << vpath; + continue; + } + + LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response from [" << from << "] for " << vpath; + handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entries); + } + else if (msg_type == p2pmsg::HpfsResponse_HpfsFileHashMapResponse) + { + const p2pmsg::HpfsFileHashMapResponse &file_resp = *resp_msg.content_as_HpfsFileHashMapResponse(); + + // File block hashes we received from the peer. + const util::h32 *block_hashes = reinterpret_cast(file_resp.hash_map()->data()); + const size_t block_hash_count = file_resp.hash_map()->size() / sizeof(util::h32); + + // Validate received hashmap against the hash. + if (!validate_file_hashmap_hash(vpath, hash, file_resp.file_mode(), block_hashes, block_hash_count)) + { + LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched hashmap response from [" << from << "] for " << vpath; + continue; + } + + std::set responded_block_ids; + { + const flatbuffers::Vector *fbvec = file_resp.responded_block_ids(); + const uint32_t *ptr = file_resp.responded_block_ids()->data(); + const size_t count = file_resp.responded_block_ids()->size(); + for (size_t i = 0; i < count; i++) + responded_block_ids.emplace(ptr[i]); + } + + LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response from [" << from << "] for " << vpath; + handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count, + responded_block_ids, file_resp.file_length()); + } + else if (msg_type == p2pmsg::HpfsResponse_HpfsBlockResponse) + { + const p2pmsg::HpfsBlockResponse &block_resp = *resp_msg.content_as_HpfsBlockResponse(); + + // Get the file path of the block data we have received. + const uint32_t block_id = block_resp.block_id(); + std::string_view buf = msg::fbuf::flatbuf_bytes_to_sv(block_resp.data()); + + // Validate received block data against the hash. + if (!validate_file_block_hash(hash, block_id, buf)) + { + LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched block response from [" << from << "] for block_id:" << block_id + << " (len:" << buf.length() << ") of " << vpath; + continue; + } + + LOG_DEBUG << "Hpfs " << name << " sync: Processing block response from [" << from << "] for block_id:" << block_id + << " (len:" << buf.length() << ") of " << vpath; + handle_file_block_response(vpath, block_id, buf); + } + + // Now that we have received matching hash and handled it successfully, remove it from the waiting list. + submitted_requests.erase(pending_resp_itr); + + // After handling each response, check whether we have achieved the target hash. + { + // Find the ongoing target that this response belongs to. + const auto target_itr = TARGET_OF_REQUEST(vpath); + + if (target_itr != ongoing_targets.end()) + { + const std::string target_vpath = target_itr->vpath; + const util::h32 target_hash = target_itr->expected_hash; + + // get_hash returns 0 incase target parent is not existing in our side. + util::h32 updated_hash = util::h32_empty; + if (fs_mount->get_hash(updated_hash, hpfs::RW_SESSION_NAME, target_vpath) == -1) + { + LOG_ERROR << "Hpfs " << name << " sync: Hash check error. " << target_vpath; + } + + // Update the central hpfs state tracker. + fs_mount->set_parent_hash(target_vpath, updated_hash); + + // This target's sync is complete. + if (updated_hash == target_hash) + { + clear_target(target_itr); // Clear the completed target. + update_sync_status(); + LOG_INFO << "Hpfs " << name << " sync: Achieved target:" << target_hash << " " << target_vpath; + + // When target achieved, release and reacquire the hpfs rw session. This helps any upcoming + // ro sessions to get updated file system state. + fs_mount->release_rw_session(); + util::sleep(HPFS_REAQUIRE_WAIT); + fs_mount->acquire_rw_session(); + + on_sync_target_acheived(target_vpath, target_hash); + } + + LOG_DEBUG << "Hpfs " << name << " sync: Current:" << updated_hash << " | target:" << target_hash << " " << target_vpath; + } + else + { + // We should never hit this error. + LOG_ERROR << "Hpfs " << name << " sync: Process response: Failed to locate target matching " << vpath; + } + } } - // Stop request loop if the target has changed. - std::shared_lock lock(target_mutex); + candidate_hpfs_responses.clear(); - const hpfs::sync_target &top_target = target_list.front(); - - if (target.vpath != top_target.vpath) - { - // A new high priority target has been set. - LOG_INFO << "Hpfs " << name << " sync: Higher priority target found. Abandoned: " << target.vpath; - return SYNC_PRIORITY_CHANGED; - } - else if (target.hash != top_target.hash) - { - // Hash changed of current target. - LOG_INFO << "Hpfs " << name << " sync: Target updated. New hash:" << top_target.hash << " " << top_target.vpath; - return SYNC_HASH_CHANGED; - } - - return -1; // No interrupt. + return responses_processed; } /** @@ -551,27 +564,27 @@ namespace hpfs * Used for hint response monitoring. * @param is_resubmit Whether this is a request resubmission or not. */ - void hpfs_sync::submit_request(const backlog_item &request, const bool watch_only, const bool is_resubmit) + void hpfs_sync::submit_request(const sync_item &request, const bool watch_only, const bool is_resubmit) { - const std::string key = std::string(request.path) + const std::string key = std::string(request.vpath) .append(reinterpret_cast(&request.expected_hash), sizeof(util::h32)); submitted_requests.try_emplace(key, request); if (watch_only) { LOG_DEBUG << "Hpfs " << name << " sync: Watching response for request. type:" << request.type - << " path:" << request.path << " block_id:" << request.block_id + << " path:" << request.vpath << " block_id:" << request.block_id << " hash:" << request.expected_hash; } else { - const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; + const bool is_file = request.type != SYNC_ITEM_TYPE::DIR; std::string target_pubkey; - request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, target_pubkey); + request_state_from_peer(request.vpath, is_file, request.block_id, request.expected_hash, target_pubkey); LOG_DEBUG << "Hpfs " << name << " sync: " << (is_resubmit ? "Re-submitting" : "Submitting") << " request to [" << (target_pubkey.empty() ? "" : target_pubkey.substr(2, 10)) << "]. type:" << request.type - << " path:" << request.path << " block_id:" << request.block_id + << " path:" << request.vpath << " block_id:" << request.block_id << " hash:" << request.expected_hash; } } @@ -608,16 +621,19 @@ namespace hpfs if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED) { - // We must request for this entry. Prioritize file hpfs requests over directories. - if (entry.is_file) - pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, entry.hash}); + // We must request for this entry using the same priority level of the root target. + const auto target_itr = TARGET_OF_REQUEST(child_vpath); + if (target_itr != ongoing_targets.end()) + pending_requests.emplace(sync_item{ + (entry.is_file ? SYNC_ITEM_TYPE::FILE : SYNC_ITEM_TYPE::DIR), child_vpath, -1, entry.hash, target_itr->high_priority}); else - pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}); + // We should never hit this error. + LOG_ERROR << "Hpfs " << name << " sync: Handle fs entry response: Failed to locate target matching " << vpath; } else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED) { // The peer has already responded with a pre-emptive hint response. So we must start watching for it. - submit_request(backlog_item{entry.is_file ? BACKLOG_ITEM_TYPE::FILE : BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}, true); + submit_request(sync_item{entry.is_file ? SYNC_ITEM_TYPE::FILE : SYNC_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}, true); } else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::NOT_AVAILABLE) { @@ -665,12 +681,11 @@ namespace hpfs if (responded_block_ids.count(block_id) == 1) { // The peer has already responded with a hint response. So we must start watching for it. - submit_request(backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}, true); + submit_request(sync_item{SYNC_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}, true); } else if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id]) { - // Insert at front to give priority to block requests while preserving block order. - pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}); + pending_requests.emplace(sync_item{SYNC_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}); } } @@ -773,7 +788,7 @@ namespace hpfs * This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after * a sync target is acheived. */ - void hpfs_sync::on_sync_target_acheived(const sync_target &synced_target) + void hpfs_sync::on_sync_target_acheived(const std::string &vpath, const util::h32 &hash) { } @@ -781,36 +796,8 @@ namespace hpfs * This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after * a sync is abondened. */ - void hpfs_sync::on_sync_target_abandoned() + void hpfs_sync::on_sync_abandoned() { } - /** - * This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after - * a full sync is complete. - */ - void hpfs_sync::on_sync_complete(const sync_target &last_sync_target) - { - LOG_INFO << "Hpfs " << name << " sync: All targets synced."; - } - - /** - * Releases and reacquires the rw session after a short delay. - * This is used to give hpfs some room to update the last checkpoint during long runinng sync operations. - * @return 0 on success. -1 on failure. - */ - int hpfs_sync::reacquire_rw_session() - { - fs_mount->release_rw_session(); - util::sleep(HPFS_REAQUIRE_WAIT); - - if (fs_mount->acquire_rw_session() == -1) - { - LOG_ERROR << "Hpfs " << name << " sync: Error reacquring rw session."; - return -1; - } - - return 0; - } - } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index a032873a..07f1410a 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -9,7 +9,7 @@ namespace hpfs { - enum BACKLOG_ITEM_TYPE + enum SYNC_ITEM_TYPE { DIR = 0, FILE = 1, @@ -17,27 +17,54 @@ namespace hpfs }; // Represents a queued up state sync operation which needs to be performed. - struct backlog_item + struct sync_item { - BACKLOG_ITEM_TYPE type = BACKLOG_ITEM_TYPE::DIR; - std::string path; + SYNC_ITEM_TYPE type = SYNC_ITEM_TYPE::DIR; + std::string vpath; int32_t block_id = -1; // Only relevant if type=BLOCK util::h32 expected_hash; + bool high_priority = false; // No. of millisconds that this item has been waiting in pending state. // Used by pending_responses list to increase waiting time and resubmit request. uint32_t waiting_time = 0; - }; - struct sync_target - { - util::h32 hash = util::h32_empty; - std::string vpath; - BACKLOG_ITEM_TYPE item_type = BACKLOG_ITEM_TYPE::DIR; - - bool operator==(const sync_target &target) const + uint32_t priority() const { - return this->vpath == target.vpath && this->hash == target.hash; + // Lesser value means higher priority. + /** + * Priority order: + * High prio file block + * High prio file hashmap + * High prio dir children + * Low prio file block + * Low prio file hashmap + * Low prio dir children + */ + + return ((high_priority ? 1 : 2) * 10) + (type == SYNC_ITEM_TYPE::BLOCK ? 1 : (type == SYNC_ITEM_TYPE::FILE ? 2 : 3)); + } + + bool operator==(const sync_item &other) const + { + return type == other.type && vpath == other.vpath && block_id == other.block_id && expected_hash == other.expected_hash; + } + + bool operator<(const sync_item &other) const + { + const uint32_t prio = priority(); + const uint32_t other_prio = other.priority(); + if (prio == other_prio) + { + if (vpath == other.vpath) + return block_id < other.block_id; + else + return vpath < other.vpath; + } + else + { + return prio < other_prio; + } } }; @@ -47,22 +74,34 @@ namespace hpfs bool init_success = false; std::string name; // Name used for logging. - std::shared_mutex target_mutex; - std::list target_list; // The current target hashes we are syncing towards. - - std::list pending_requests; // List of pending sync requests to be sent out. + std::shared_mutex incoming_targets_mutex; + std::set incoming_targets; // The targets that we need to sync towards but have not looked at yet. + std::vector ongoing_targets; // The targets that we have taken into processing. + std::set pending_requests; // List of pending sync requests to be sent out. // List of submitted requests we are awaiting responses for, keyed by expected response path+hash. - std::unordered_map submitted_requests; + std::unordered_map submitted_requests; + + // No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response) + uint16_t resubmissions_count = 0; + + // Whether the hpfs rw session is running or not. + bool rw_session_active = false; std::thread hpfs_sync_thread; std::atomic is_shutting_down = false; void hpfs_syncer_loop(); - int request_loop(const hpfs::sync_target &target); + int check_incoming_targets(); - int sync_interrupt(const hpfs::sync_target &target); + void clear_target(const std::vector::iterator &ex_target); + + void perform_request_submissions(); + + void update_sync_status(); + + bool process_candidate_responses(); bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode, const std::vector &peer_fs_entries); @@ -75,7 +114,7 @@ namespace hpfs void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const util::h32 expected_hash, std::string &target_pubkey); - void submit_request(const backlog_item &request, const bool watch_only = false, const bool is_resubmit = false); + void submit_request(const sync_item &request, const bool watch_only = false, const bool is_resubmit = false); int handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, const std::vector &peer_fs_entries); @@ -92,11 +131,9 @@ namespace hpfs hpfs::hpfs_mount *fs_mount = NULL; - virtual void on_sync_target_acheived(const sync_target &synced_target); + virtual void on_sync_target_acheived(const std::string &vpath, const util::h32 &hash); - virtual void on_sync_target_abandoned(); - - virtual void on_sync_complete(const sync_target &last_sync_target); + virtual void on_sync_abandoned(); // Move the collected responses from hpfs responses to a local response list. virtual void swap_collected_responses() = 0; // Must override in child classes. @@ -110,9 +147,8 @@ namespace hpfs void deinit(); - void set_target_push_front(const sync_target &target); - - void set_target_push_back(const sync_target &target); + void set_target(const bool is_dir, const std::string &vpath, + const util::h32 &hash, const bool high_priority = false); }; } // namespace hpfs diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 9ef5129e..94e8535a 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -615,7 +615,7 @@ namespace ledger const int fd = open(prev_shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1) { - LOG_DEBUG << "Cannot read " << prev_shard_hash_file_path; + LOG_ERROR << errno << ": Error reading prev.shard file " << prev_shard_hash_file_path; return; } @@ -630,7 +630,7 @@ namespace ledger } const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(seq_no)); - ledger_sync_worker.set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger_sync_worker.set_target(true, shard_path, prev_shard_hash_from_file); } } diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 66c5e8de..395c3bfe 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -7,13 +7,13 @@ namespace ledger { constexpr const char *HPFS_SESSION_NAME = "ro_shard_sync_status"; - void ledger_sync::on_sync_target_acheived(const hpfs::sync_target &synced_target) + void ledger_sync::on_sync_target_acheived(const std::string &vpath, const util::h32 &hash) { - const std::string shard_hash_file_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, synced_target.vpath) + PREV_SHARD_HASH_FILENAME; + const std::string shard_hash_file_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath) + PREV_SHARD_HASH_FILENAME; const int fd = open(shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1) { - LOG_DEBUG << "Cannot read " << shard_hash_file_path; + LOG_ERROR << errno << ": Error at target achived. Cannot read " << shard_hash_file_path; return; } @@ -26,13 +26,13 @@ namespace ledger LOG_ERROR << errno << ": Error reading hash file. " << shard_hash_file_path; return; } - const size_t pos = synced_target.vpath.find_last_of("/"); + const size_t pos = vpath.find_last_of("/"); if (pos == std::string::npos) { - LOG_ERROR << "Error retreiving shard no from " << synced_target.vpath; + LOG_ERROR << "Error retreiving shard no from " << vpath; return; } - const std::string synced_shard_seq_no_str = synced_target.vpath.substr(pos + 1); + const std::string synced_shard_seq_no_str = vpath.substr(pos + 1); uint64_t synced_shard_seq_no; if (util::stoull(synced_shard_seq_no_str, synced_shard_seq_no) == -1) { @@ -40,7 +40,7 @@ namespace ledger return; } - const std::string shard_parent_dir = synced_target.vpath.substr(0, pos); + const std::string shard_parent_dir = vpath.substr(0, pos); if (shard_parent_dir == PRIMARY_DIR) { @@ -52,14 +52,14 @@ namespace ledger // Persist the lastest synced shard seq number to the max shard meta file. if (persist_max_shard_seq_no(PRIMARY_DIR, synced_shard_seq_no) == -1) { - LOG_ERROR << "Error updating max shard meta file in primary shard sync. " << synced_target.vpath; + LOG_ERROR << "Error updating max shard meta file in primary shard sync. " << vpath; return; } - const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, synced_target.hash}; + const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, hash}; if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id, false) == -1) { - LOG_ERROR << "Error updating context from the synced shard " << synced_target.vpath; + LOG_ERROR << "Error updating context from the synced shard " << vpath; return; } ctx.set_last_primary_shard_id(updated_primary_shard_id); @@ -85,7 +85,7 @@ namespace ledger && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. { const std::string shard_path = std::string(PRIMARY_DIR).append("/").append(std::to_string(synced_shard_seq_no)); - set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + set_target(true, shard_path, prev_shard_hash_from_file); } else { @@ -114,7 +114,7 @@ namespace ledger } last_raw_shard_seq_no = synced_shard_seq_no; - ctx.set_last_raw_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash}); + ctx.set_last_raw_shard_id(p2p::sequence_hash{synced_shard_seq_no, hash}); is_last_raw_shard_syncing = false; // If existing max shard is older than the max we can keep. Then delete all the existing shards. @@ -136,7 +136,7 @@ namespace ledger && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. { const std::string shard_path = std::string(RAW_DIR).append("/").append(std::to_string(synced_shard_seq_no)); - set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + set_target(true, shard_path, prev_shard_hash_from_file); } else { @@ -161,7 +161,7 @@ namespace ledger candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.ledger_hpfs_responses); } - void ledger_sync::on_sync_target_abandoned() + void ledger_sync::on_sync_abandoned() { // Reset these flags since we are abandoning the sync. is_last_primary_shard_syncing = false; diff --git a/src/ledger/ledger_sync.hpp b/src/ledger/ledger_sync.hpp index 19e416f9..d98fa05d 100644 --- a/src/ledger/ledger_sync.hpp +++ b/src/ledger/ledger_sync.hpp @@ -12,8 +12,8 @@ namespace ledger { private: void swap_collected_responses(); - void on_sync_target_acheived(const hpfs::sync_target &synced_target); - void on_sync_target_abandoned(); + void on_sync_target_acheived(const std::string &vpath, const util::h32 &hash); + void on_sync_abandoned(); public: std::atomic is_last_primary_shard_syncing = false; diff --git a/src/sc/contract_sync.cpp b/src/sc/contract_sync.cpp index 1c4e8cfd..4c438b73 100644 --- a/src/sc/contract_sync.cpp +++ b/src/sc/contract_sync.cpp @@ -8,9 +8,9 @@ namespace sc { - void contract_sync::on_sync_target_acheived(const hpfs::sync_target &synced_target) + void contract_sync::on_sync_target_acheived(const std::string &vpath, const util::h32 &hash) { - if (synced_target.vpath == PATCH_FILE_PATH) + if (vpath == PATCH_FILE_PATH) { // Appling new patch file changes to hpcore runtime. if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1) @@ -23,7 +23,7 @@ namespace sc consensus::refresh_time_config(false); // Update global hash tracker with the new patch file hash. - fs_mount->set_parent_hash(synced_target.vpath, synced_target.hash); + fs_mount->set_parent_hash(vpath, hash); } } } diff --git a/src/sc/contract_sync.hpp b/src/sc/contract_sync.hpp index cc91d4c5..6be04682 100644 --- a/src/sc/contract_sync.hpp +++ b/src/sc/contract_sync.hpp @@ -11,7 +11,7 @@ namespace sc class contract_sync : public hpfs::hpfs_sync { private: - void on_sync_target_acheived(const hpfs::sync_target &synced_target); + void on_sync_target_acheived(const std::string &vpath, const util::h32 &hash); void swap_collected_responses(); }; } // namespace sc diff --git a/src/util/util.cpp b/src/util/util.cpp index 97cfb204..07e480e9 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -162,7 +162,10 @@ namespace util // Create this dir. if (!error_thrown && mkdir(path.data(), S_IRWXU | S_IRWXG | S_IROTH) == -1) + { + LOG_ERROR << errno << ": Error in recursive dir creation. " << path; error_thrown = true; + } if (error_thrown) return -1;