From ef2bb22b67a917072f90ee9a020a527370707664 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Tue, 4 May 2021 16:33:19 +0530 Subject: [PATCH] hpfs sync target refactoring. (#302) --- src/consensus.cpp | 14 +- src/hpfs/hpfs_sync.cpp | 312 ++++++++++++++--------------- src/hpfs/hpfs_sync.hpp | 20 +- src/ledger/ledger.cpp | 6 +- src/ledger/ledger_sync.cpp | 14 +- src/ledger/ledger_sync.hpp | 4 +- src/msg/fbuf/p2pmsg_conversion.cpp | 4 +- src/msg/fbuf/p2pmsg_conversion.hpp | 2 +- src/p2p/p2p.cpp | 2 +- src/p2p/peer_session_handler.cpp | 4 +- src/sc/contract_sync.cpp | 2 +- src/sc/contract_sync.hpp | 2 +- src/sc/sc.cpp | 4 +- 13 files changed, 177 insertions(+), 213 deletions(-) diff --git a/src/consensus.cpp b/src/consensus.cpp index dee7a8ab..36caef06 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -244,10 +244,9 @@ namespace consensus // We first request the latest shard. const std::string majority_shard_seq_no_str = std::to_string(majority_primary_shard_id.seq_no); - const std::string sync_name = "primary shard " + majority_shard_seq_no_str; const std::string shard_path = std::string(ledger::PRIMARY_DIR).append("/").append(majority_shard_seq_no_str); ledger::ledger_sync_worker.is_last_primary_shard_syncing = true; - ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{sync_name, majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } // Check out raw shard hash with majority raw shard hash. @@ -283,12 +282,12 @@ namespace consensus } else { - if (is_state_desync) - sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{"state", majority_state_hash, sc::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); - // Patch file sync is prioritized, Therefore it is set in the front of the sync target list. if (is_patch_desync) - sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{"patch", majority_patch_hash, sc::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE}); + sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{majority_patch_hash, sc::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE}); + + if (is_state_desync) + sc::contract_sync_worker.set_target_push_back(hpfs::sync_target{majority_state_hash, sc::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); } } @@ -297,10 +296,9 @@ namespace consensus { conf::change_role(conf::ROLE::OBSERVER); const std::string majority_shard_seq_no_str = std::to_string(majority_raw_shard_id.seq_no); - const std::string sync_name = "raw shard " + majority_shard_seq_no_str; const std::string shard_path = std::string(ledger::RAW_DIR).append("/").append(majority_shard_seq_no_str); ledger::ledger_sync_worker.is_last_raw_shard_syncing = true; - ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_raw_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{majority_raw_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } // If shards aren't aligned with max shard count, do the relevant shard cleanups and requests. diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index 3f9a2a04..2aa0acea 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -30,6 +30,18 @@ namespace hpfs constexpr int FILE_PERMS = 0644; +#define SYNC_ERROR -1 +#define SYNC_ABANDONED 0 +#define SYNC_ACHIEVED 1 +#define SYNC_PRIORITY_CHANGED 2 +#define SYNC_HASH_CHANGED 3 +#define REQUEST_LOOP_INTERRUPT \ + { \ + const int res = sync_interrupt(target); \ + if (res != -1) \ + return res; \ + } + /** * This should be called to activate the hpfs sync. */ @@ -58,28 +70,6 @@ namespace hpfs } } - /** - * Sets a list of sync targets. Sync finishes when all the targets are synced. - * Syncing happens sequentially. - * @param sync_target_list List of sync targets to sync towards. - */ - void hpfs_sync::set_target(const std::list &sync_target_list) - { - if (sync_target_list.empty()) - return; - - // Do not do anything if we are already syncing towards the specified target states. - if (is_shutting_down || (is_syncing && original_target_list == sync_target_list)) - return; - - original_target_list = sync_target_list; - target_list = std::move(sync_target_list); - - std::unique_lock lock(current_target_mutex); - current_target = target_list.front(); // Make the first element of the list the first target to sync. - is_syncing = true; - } - /** * This sets a prioritized sync target. This target will replace current sync target. * This target will immediately starting to sync and the interupted sync will resume @@ -87,11 +77,10 @@ namespace hpfs */ void hpfs_sync::set_target_push_front(const sync_target &target) { - { - std::shared_lock lock(current_target_mutex); - if (is_shutting_down || (is_syncing && current_target == target)) - return; - } + std::unique_lock lock(target_mutex); + + if (is_shutting_down || (is_syncing && target_list.front() == target)) + return; // Remove any previous sync targets for the same target vpath. target_list.remove_if([&target](const hpfs::sync_target &element) { @@ -100,9 +89,6 @@ namespace hpfs target_list.push_front(target); is_syncing = true; - std::unique_lock lock(current_target_mutex); - // Make the first element of the list the first target to sync. - current_target = target_list.front(); } /** @@ -111,24 +97,25 @@ namespace hpfs */ void hpfs_sync::set_target_push_back(const sync_target &target) { - // Current_target_mutex is not required since this function is currently used in a unique_lock - // scope. - if (is_shutting_down || (is_syncing && current_target == target)) + std::unique_lock lock(target_mutex); + + if (is_shutting_down) return; - // Check whether this target is already in the sync target list. - const auto itr = std::find(target_list.begin(), target_list.end(), target); + // Check whether the same vpath target is already in the sync target list. If so, update it's information. + const auto itr = std::find_if(target_list.begin(), target_list.end(), + [&target](const hpfs::sync_target &element) { + return element.vpath == target.vpath; + }); if (itr != target_list.end()) + { + itr->hash = target.hash; + itr->item_type = target.item_type; return; + } target_list.push_back(target); - if (!is_syncing) - { - std::unique_lock lock(current_target_mutex); - // Make the first element of the list the first target to sync. - current_target = target_list.front(); - is_syncing = true; - } + is_syncing = true; } /** @@ -149,81 +136,74 @@ namespace hpfs if (!is_syncing) continue; - bool is_sync_complete = false; - if (fs_mount->acquire_rw_session() != -1) - { - while (!is_shutting_down) - { - { - std::shared_lock lock(current_target_mutex); - LOG_INFO << "Hpfs " << name << " sync: Starting sync for target " << current_target.name << " hash: " << current_target.hash; - } - util::h32 new_state = util::h32_empty; - const int result = request_loop(current_target.hash, new_state); - - pending_requests.clear(); - candidate_hpfs_responses.clear(); - submitted_requests.clear(); - - if (result == -1 || result == 1 || is_shutting_down) - break; - - { - std::unique_lock lock(current_target_mutex); - - if (new_state == current_target.hash) - { - LOG_INFO << "Hpfs " << name << " sync: Target " << current_target.name << " hash achieved: " << new_state; - - // After every sync target completion, release and reacquire hpfs rw session so hpfs gets some room - // to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state. - reacquire_rw_session(); - - on_current_sync_state_acheived(current_target); - - // Start syncing to next target. - const int result = start_syncing_next_target(); - if (result == 0) - break; - else if (result == 1) - continue; - } - else - { - LOG_INFO << "Hpfs " << name << " sync: Continuing sync for new " << current_target.name << " hash: " << current_target.hash; - continue; - } - } - } - fs_mount->release_rw_session(); - is_sync_complete = true; - } - else + if (fs_mount->acquire_rw_session() == -1) { LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session"; + continue; } - // Clear target list and original target list since the sync is complete. - target_list = {}; - original_target_list = {}; - is_syncing = false; - const sync_target last_sync_target = current_target; - current_target = {}; - if (is_sync_complete) - on_sync_complete(last_sync_target); + + while (!is_shutting_down) + { + hpfs::sync_target current_target = {}; + { + std::shared_lock lock(target_mutex); + + if (target_list.empty()) + { + LOG_INFO << "Hpfs " << name << " sync: All targets complete."; + is_syncing = false; + break; + } + + // Set the target to be top of the list. + current_target = target_list.front(); + } + + // Start syncing towards specified target. + const int result = request_loop(current_target); + + pending_requests.clear(); + candidate_hpfs_responses.clear(); + submitted_requests.clear(); + + if (is_shutting_down) + break; + + if (result != SYNC_HASH_CHANGED) + { + // The finished target can be removed from the list. + { + std::unique_lock lock(target_mutex); + target_list.pop_front(); + } + + // After every sync target abandon or completion, release and reacquire hpfs rw session so hpfs gets some room + // to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state. + reacquire_rw_session(); + + if (result == SYNC_ABANDONED) + on_sync_target_abandoned(); + else if (result == SYNC_ACHIEVED) + on_sync_target_acheived(current_target); + } + } + fs_mount->release_rw_session(); } LOG_INFO << "Hpfs " << name << " sync: Worker stopped."; } /** - * Reqest loop. - * @return -1 on error. 0 when current sync state acheived or sync is stopped due to target change. - * Returns 1 on successfully finishing all the sync targets. - */ - int hpfs_sync::request_loop(const util::h32 current_target_hash, util::h32 &updated_state) + * Reqest loop which syncs towards the specified target. + * @return 0 when sync is abandoned due to resubmission threshold or shutdown. 1 when target sync hash acheived. + * 2 when target has been re-prioritized. 3 when target hash has changed. -1 on error. + */ + int hpfs_sync::request_loop(const hpfs::sync_target &target) { + LOG_INFO << "Hpfs " << name << " sync: Starting target:" << target.hash << " " << target.vpath; + // Send the initial root hpfs request of the current target. - submit_request(backlog_item{current_target.item_type, current_target.vpath, -1, current_target_hash}); + submit_request(backlog_item{target.item_type, target.vpath, -1, target.hash}); // Indicates whether any responses were processed in the previous loop iteration. bool prev_responses_processed = false; @@ -231,8 +211,10 @@ namespace hpfs // No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response) uint16_t resubmissions_count = 0; - while (!should_stop_request_loop(current_target_hash)) + while (true) { + REQUEST_LOOP_INTERRUPT + // Wait a small delay if there were no responses processed during previous iteration. if (!prev_responses_processed) util::sleep(REQUEST_LOOP_WAIT); @@ -248,8 +230,7 @@ namespace hpfs for (auto &response : candidate_hpfs_responses) { - if (should_stop_request_loop(current_target_hash)) - return 0; + REQUEST_LOOP_INTERRUPT const std::string from = response.first.substr(2, 10); // Sender pubkey. const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data()); @@ -342,19 +323,24 @@ namespace hpfs // After handling each response, check whether we have reached target hpfs state. // get_hash returns 0 incase target parent is not existing in our side. - if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, current_target.vpath) == -1) + util::h32 updated_state = util::h32_empty; + if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, target.vpath) == -1) { - LOG_ERROR << "Hpfs " << name << " sync: exiting due to hash check error."; - return -1; + LOG_ERROR << "Hpfs " << name << " sync: Abandoning due to hash check error. " << target.vpath; + return SYNC_ERROR; } // Update the central hpfs state tracker. - fs_mount->set_parent_hash(current_target.vpath, updated_state); + fs_mount->set_parent_hash(target.vpath, updated_state); - LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash - << " (" << current_target.vpath << ")"; - if (updated_state == current_target_hash) - return 0; + // End the loop if sync is complete. + if (updated_state == target.hash) + { + LOG_INFO << "Hpfs " << name << " sync: Achieved target:" << target.hash << " " << target.vpath; + return SYNC_ACHIEVED; + } + + LOG_DEBUG << "Hpfs " << name << " sync: Current:" << updated_state << " | target:" << target.hash << " " << target.vpath; } candidate_hpfs_responses.clear(); @@ -365,8 +351,7 @@ namespace hpfs // Check for long-awaited responses and re-request them. for (auto &[hash, request] : submitted_requests) { - if (should_stop_request_loop(current_target_hash)) - return 0; + REQUEST_LOOP_INTERRUPT if (request.waiting_time < request_resubmit_timeout) { @@ -377,17 +362,8 @@ namespace hpfs { if (++resubmissions_count > ABANDON_THRESHOLD) { - LOG_INFO << "Hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync."; - - std::unique_lock lock(current_target_mutex); - const int result = start_syncing_next_target(); - if (result == 0) - { - current_target = {}; - on_sync_abandoned(); - return 1; // To stop syncing since we have sync all the targets. - } - return 0; + LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to resubmission threshold. " << target.vpath; + return SYNC_ABANDONED; } // Reset the counter and re-submit request. @@ -402,15 +378,48 @@ namespace hpfs const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size(); for (int i = 0; i < available_slots && !pending_requests.empty(); i++) { - if (should_stop_request_loop(current_target_hash)) - return 0; + REQUEST_LOOP_INTERRUPT submit_request(pending_requests.front()); pending_requests.pop_front(); } } } - return 0; + return SYNC_ABANDONED; + } + + /** + * Indicates whether to break out of hpfs request processing loop and the reason. + * @return 0 if interrupted due to shutdown. 2 when target has been re-prioritized. + * 3 when target hash has changed. -1 if not interrupted. + */ + int hpfs_sync::sync_interrupt(const hpfs::sync_target &target) + { + if (is_shutting_down) + { + LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to shutdown. " << target.vpath; + return SYNC_ABANDONED; + } + + // Stop request loop if the target has changed. + std::shared_lock lock(target_mutex); + + const hpfs::sync_target &top_target = target_list.front(); + + if (target.vpath != top_target.vpath) + { + // A new high priority target has been set. + LOG_INFO << "Hpfs " << name << " sync: Higher priority target found. Abandoned: " << target.vpath; + return SYNC_PRIORITY_CHANGED; + } + else if (target.hash != top_target.hash) + { + // Hash changed of current target. + LOG_INFO << "Hpfs " << name << " sync: Target updated. New hash:" << top_target.hash << " " << top_target.vpath; + return SYNC_HASH_CHANGED; + } + + return -1; // No interrupt. } /** @@ -493,19 +502,6 @@ namespace hpfs return crypto::get_hash(offset, buf) == hash; } - /** - * Indicates whether to break out of hpfs request processing loop. - */ - bool hpfs_sync::should_stop_request_loop(const util::h32 ¤t_target_hash) - { - if (is_shutting_down) - return true; - - // Stop request loop if the target has changed. - std::shared_lock lock(current_target_mutex); - return current_target_hash != current_target.hash; - } - /** * Sends a hpfs request to a random peer. * @param path Requested file or dir path. @@ -776,7 +772,7 @@ namespace hpfs * This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after * a sync target is acheived. */ - void hpfs_sync::on_current_sync_state_acheived(const sync_target &synced_target) + void hpfs_sync::on_sync_target_acheived(const sync_target &synced_target) { } @@ -784,7 +780,7 @@ namespace hpfs * This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after * a sync is abondened. */ - void hpfs_sync::on_sync_abandoned() + void hpfs_sync::on_sync_target_abandoned() { } @@ -794,25 +790,7 @@ namespace hpfs */ void hpfs_sync::on_sync_complete(const sync_target &last_sync_target) { - LOG_INFO << "Hpfs " << name << " sync: All parents synced."; - } - - /** - * Starts syncing next target if available after current target finishes. - * @return returns 0 when the full sync is complete and 1 when more sync targets are available. - */ - int hpfs_sync::start_syncing_next_target() - { - target_list.pop_front(); // Remove the synced parent from the target list. - if (target_list.empty()) - { - return 0; - } - else - { - current_target = target_list.front(); - return 1; - } + LOG_INFO << "Hpfs " << name << " sync: All targets synced."; } /** diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index c98e1b15..a032873a 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -31,7 +31,6 @@ namespace hpfs struct sync_target { - std::string name; // Used for logging. util::h32 hash = util::h32_empty; std::string vpath; BACKLOG_ITEM_TYPE item_type = BACKLOG_ITEM_TYPE::DIR; @@ -48,27 +47,22 @@ namespace hpfs bool init_success = false; std::string name; // Name used for logging. - sync_target current_target = {}; + std::shared_mutex target_mutex; std::list target_list; // The current target hashes we are syncing towards. - // Store the originally submitted sync target list. This list is used to avoid submitting same list multiple times - // because target list is updated when the sync targets are acheived. - std::list original_target_list; - std::list pending_requests; // List of pending sync requests to be sent out. // List of submitted requests we are awaiting responses for, keyed by expected response path+hash. std::unordered_map submitted_requests; std::thread hpfs_sync_thread; - std::shared_mutex current_target_mutex; std::atomic is_shutting_down = false; void hpfs_syncer_loop(); - int request_loop(const util::h32 current_target_hash, util::h32 &updated_state); + int request_loop(const hpfs::sync_target &target); - int start_syncing_next_target(); + int sync_interrupt(const hpfs::sync_target &target); bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode, const std::vector &peer_fs_entries); @@ -78,8 +72,6 @@ namespace hpfs bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf); - bool should_stop_request_loop(const util::h32 ¤t_target_hash); - void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const util::h32 expected_hash, std::string &target_pubkey); @@ -100,9 +92,9 @@ namespace hpfs hpfs::hpfs_mount *fs_mount = NULL; - virtual void on_current_sync_state_acheived(const sync_target &synced_target); + virtual void on_sync_target_acheived(const sync_target &synced_target); - virtual void on_sync_abandoned(); + virtual void on_sync_target_abandoned(); virtual void on_sync_complete(const sync_target &last_sync_target); @@ -118,8 +110,6 @@ namespace hpfs void deinit(); - void set_target(const std::list &sync_target_list); - void set_target_push_front(const sync_target &target); void set_target_push_back(const sync_target &target); diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index eee01a3e..d06f9bc8 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -66,13 +66,13 @@ namespace ledger return -1; } - if (ledger_server.init("ledger", &ledger_fs) == -1) + if (ledger_server.init("ldgr", &ledger_fs) == -1) { LOG_ERROR << "Ledger file system serve worker initialization failed."; return -1; } - if (ledger_sync_worker.init("ledger", &ledger_fs) == -1) + if (ledger_sync_worker.init("ldgr", &ledger_fs) == -1) { LOG_ERROR << "Ledger file system sync worker initialization failed."; return -1; @@ -630,7 +630,7 @@ namespace ledger } const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(seq_no)); - ledger_sync_worker.set_target_push_back(hpfs::sync_target{shard_path, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger_sync_worker.set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } } diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 9f171b49..ea3c666e 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -7,7 +7,7 @@ namespace ledger { constexpr const char *HPFS_SESSION_NAME = "ro_shard_sync_status"; - void ledger_sync::on_current_sync_state_acheived(const hpfs::sync_target &synced_target) + void ledger_sync::on_sync_target_acheived(const hpfs::sync_target &synced_target) { const std::string shard_hash_file_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, synced_target.vpath) + PREV_SHARD_HASH_FILENAME; const int fd = open(shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC); @@ -52,14 +52,14 @@ namespace ledger // Persist the lastest synced shard seq number to the max shard meta file. if (persist_max_shard_seq_no(PRIMARY_DIR, synced_shard_seq_no) == -1) { - LOG_ERROR << "Error updating max shard meta file in primary shard sync."; + LOG_ERROR << "Error updating max shard meta file in primary shard sync. " << synced_target.vpath; return; } const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, synced_target.hash}; if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id) == -1) { - LOG_ERROR << "Error updating context from the synced shard " << synced_target.name; + LOG_ERROR << "Error updating context from the synced shard " << synced_target.vpath; return; } ctx.set_last_primary_shard_id(updated_primary_shard_id); @@ -84,9 +84,8 @@ namespace ledger if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then. && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. { - const std::string sync_name = "primary shard " + std::to_string(synced_shard_seq_no); const std::string shard_path = std::string(PRIMARY_DIR).append("/").append(std::to_string(synced_shard_seq_no)); - set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } else { @@ -136,9 +135,8 @@ namespace ledger if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then. && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. { - const std::string sync_name = "raw shard " + std::to_string(synced_shard_seq_no); const std::string shard_path = std::string(RAW_DIR).append("/").append(std::to_string(synced_shard_seq_no)); - set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } else { @@ -163,7 +161,7 @@ namespace ledger candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.ledger_hpfs_responses); } - void ledger_sync::on_sync_abandoned() + void ledger_sync::on_sync_target_abandoned() { // Reset these flags since we are abandoning the sync. is_last_primary_shard_syncing = false; diff --git a/src/ledger/ledger_sync.hpp b/src/ledger/ledger_sync.hpp index e6a5a0f8..19e416f9 100644 --- a/src/ledger/ledger_sync.hpp +++ b/src/ledger/ledger_sync.hpp @@ -12,8 +12,8 @@ namespace ledger { private: void swap_collected_responses(); - void on_current_sync_state_acheived(const hpfs::sync_target &synced_target); - void on_sync_abandoned(); + void on_sync_target_acheived(const hpfs::sync_target &synced_target); + void on_sync_target_abandoned(); public: std::atomic is_last_primary_shard_syncing = false; diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 7cb6cac7..bfb1412b 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -35,7 +35,7 @@ namespace msg::fbuf::p2pmsg return VerifyP2PMsgBuffer(verifier); } - const p2p::peer_message_info get_peer_message_info(std::string_view message) + const p2p::peer_message_info get_peer_message_info(std::string_view message, const p2p::peer_comm_session *session) { const auto p2p_msg = p2pmsg::GetP2PMsg(message.data()); @@ -45,7 +45,7 @@ namespace msg::fbuf::p2pmsg const uint64_t time_now = util::get_epoch_milliseconds(); if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 4))) { - LOG_DEBUG << "Peer message is too old. type:" << p2p_msg->content_type(); + LOG_DEBUG << "Peer message is too old. type:" << p2p_msg->content_type() << " [" << (session ? session->uniqueid : "") << "]"; return p2p::peer_message_info{NULL, P2PMsgContent_NONE, 0}; } } diff --git a/src/msg/fbuf/p2pmsg_conversion.hpp b/src/msg/fbuf/p2pmsg_conversion.hpp index ab670015..cf31eccc 100644 --- a/src/msg/fbuf/p2pmsg_conversion.hpp +++ b/src/msg/fbuf/p2pmsg_conversion.hpp @@ -12,7 +12,7 @@ namespace msg::fbuf::p2pmsg bool verify_peer_message(std::string_view message); - const p2p::peer_message_info get_peer_message_info(std::string_view message); + const p2p::peer_message_info get_peer_message_info(std::string_view message, const p2p::peer_comm_session *session = NULL); bool verify_proposal_msg_trust(const p2p::peer_message_info &mi); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 3ddf7061..159cd835 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -259,7 +259,7 @@ namespace p2p const uint64_t time_now = util::get_epoch_milliseconds(); if (originated_on < (time_now - (conf::cfg.contract.roundtime * 3))) { - LOG_DEBUG << "Peer message is too old for forwarding. type:" << msg_type; + LOG_DEBUG << "Peer message is too old for forwarding. type:" << msg_type << " [" << session.uniqueid << "]"; return false; } diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index eeb43d11..de2285ff 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -75,14 +75,14 @@ namespace p2p // Adding message size to peer message characters(bytes) per minute counter. session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size()); - const peer_message_info mi = p2pmsg::get_peer_message_info(message); + const peer_message_info mi = p2pmsg::get_peer_message_info(message, &session); if (!mi.p2p_msg) // Message buffer will be null if peer message was too old. return 0; if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Duplicate peer message. type:" << mi.type << " " << session.display_name(); + LOG_DEBUG << "Duplicate peer message. type:" << mi.type << " " << session.display_name() << " [" << session.uniqueid << "]"; return 0; } diff --git a/src/sc/contract_sync.cpp b/src/sc/contract_sync.cpp index 874935d8..b35b2c7b 100644 --- a/src/sc/contract_sync.cpp +++ b/src/sc/contract_sync.cpp @@ -8,7 +8,7 @@ namespace sc { - void contract_sync::on_current_sync_state_acheived(const hpfs::sync_target &synced_target) + void contract_sync::on_sync_target_acheived(const hpfs::sync_target &synced_target) { if (synced_target.vpath == PATCH_FILE_PATH) { diff --git a/src/sc/contract_sync.hpp b/src/sc/contract_sync.hpp index 14732437..cc91d4c5 100644 --- a/src/sc/contract_sync.hpp +++ b/src/sc/contract_sync.hpp @@ -11,7 +11,7 @@ namespace sc class contract_sync : public hpfs::hpfs_sync { private: - void on_current_sync_state_acheived(const hpfs::sync_target &synced_target); + void on_sync_target_acheived(const hpfs::sync_target &synced_target); void swap_collected_responses(); }; } // namespace sc diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index 743238d9..80654f48 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -35,7 +35,7 @@ namespace sc return -1; } - if (contract_server.init("contract", &contract_fs) == -1) + if (contract_server.init("cont", &contract_fs) == -1) { LOG_ERROR << "Contract file system serve worker initialization failed."; return -1; @@ -47,7 +47,7 @@ namespace sc } else { - if (contract_sync_worker.init("contract", &contract_fs) == -1) + if (contract_sync_worker.init("cont", &contract_fs) == -1) { LOG_ERROR << "Contract file system sync worker initialization failed."; return -1;