diff --git a/src/consensus.cpp b/src/consensus.cpp index bdf373be..91b99769 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -199,6 +199,7 @@ namespace consensus const std::string majority_shard_seq_no_str = std::to_string(majority_primary_shard_id.seq_no); const std::string sync_name = "primary shard " + majority_shard_seq_no_str; const std::string shard_path = std::string(ledger::PRIMARY_DIR).append("/").append(majority_shard_seq_no_str); + ledger::ledger_sync_worker.is_last_primary_shard_syncing = true; ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{sync_name, majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } @@ -238,10 +239,24 @@ namespace consensus const std::string majority_shard_seq_no_str = std::to_string(majority_blob_shard_id.seq_no); const std::string sync_name = "blob shard " + majority_shard_seq_no_str; const std::string shard_path = std::string(ledger::BLOB_DIR).append("/").append(majority_shard_seq_no_str); + ledger::ledger_sync_worker.is_last_blob_shard_syncing = true; ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_blob_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } } + // If shards aren't aligned with max shard count, Do the relevant shard cleanups and requests. + // In the first consensus round sync completion after the startup. + if (!ledger::ledger_sync_worker.is_syncing && (!ledger::ctx.primary_shards_persisted || !ledger::ctx.blob_shards_persisted) && ledger::ledger_fs.acquire_rw_session() != -1) + { + if (!ledger::ctx.primary_shards_persisted) + ledger::persist_shard_history(majority_primary_shard_id.seq_no, ledger::PRIMARY_DIR); + + if (!ledger::ctx.blob_shards_persisted) + ledger::persist_shard_history(majority_blob_shard_id.seq_no, ledger::BLOB_DIR); + + ledger::ledger_fs.release_rw_session(); + } + // Proceed further only if last primary shard, last blob shard, state and patch hashes are in sync with majority. if (!is_last_primary_shard_desync && !is_last_blob_shard_desync && !is_state_desync && !is_patch_desync) { @@ -263,7 +278,8 @@ namespace consensus */ void check_sync_completion() { - if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::ledger_sync_worker.is_syncing) + // In ledger sync we only concern about last shard sync status to proceed with consensus. + if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::ledger_sync_worker.is_last_primary_shard_syncing && !ledger::ledger_sync_worker.is_last_blob_shard_syncing) conf::change_role(conf::ROLE::VALIDATOR); } diff --git a/src/hpfs/hpfs_serve.cpp b/src/hpfs/hpfs_serve.cpp index 37448c30..0784ef62 100644 --- a/src/hpfs/hpfs_serve.cpp +++ b/src/hpfs/hpfs_serve.cpp @@ -242,7 +242,7 @@ namespace hpfs const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1) { - LOG_ERROR << errno << ": Open failed. " << file_path; + LOG_ERROR << errno << ": Open failed " << file_path; result = -1; } else diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 1a5d1deb..a8fa9e73 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -274,7 +274,7 @@ namespace ledger /** * Remove old shards that exceeds max shard range from file system. - * @param led_shard_no minimum shard number to be in history. + * @param led_shard_no Minimum shard number to be in history. */ void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir) { @@ -297,6 +297,76 @@ namespace ledger } } + /** + * Cleanup and request historical shards according to the max we can keep. + * @param shard_seq_no Latest shard sequence number. + * @param shard_parent_dir Shard parent directory. + */ + void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir) + { + // Skip if shard cleanup and requesting has been already done. + if ((shard_parent_dir == PRIMARY_DIR && ctx.primary_shards_persisted) || (shard_parent_dir == BLOB_DIR && ctx.blob_shards_persisted)) + return; + + // Set persisted flag to true. So this cleanup won't get executed again. + shard_parent_dir == PRIMARY_DIR ? ctx.primary_shards_persisted = true : ctx.blob_shards_persisted = true; + + const std::string shard_dir_path = std::string(ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_parent_dir)); + const uint64_t max_shard_count = shard_dir_path == PRIMARY_DIR ? conf::cfg.node.history_config.max_primary_shards : conf::cfg.node.history_config.max_blob_shards; + const std::list shard_list = util::fetch_dir_entries(shard_dir_path); + // Skip the sequence no file from the count. + uint64_t shard_count = shard_list.size() - 1; + + // First, In history custom mode remove all the historical shards which are older than the min we can keep. + if (conf::cfg.node.history == conf::HISTORY::CUSTOM && shard_seq_no >= max_shard_count) + { + for (const std::string &shard : shard_list) + { + // Skip the sequence no file. + if (("/" + shard) == SHARD_SEQ_NO_FILENAME) + continue; + + uint64_t seq_no; + if (util::stoull(shard, seq_no) != -1 && seq_no <= (shard_seq_no - max_shard_count)) + { + const std::string shard_path = std::string(shard_dir_path).append("/").append(shard); + if (util::is_dir_exists(shard_path) && util::remove_directory_recursively(shard_path) == -1) + LOG_ERROR << errno << ": Error deleting shard: " << shard; + else + shard_count--; + } + } + } + + // In full history mode request for all the historical nodes if not exists, Otherwise request if max count haven't reached + if (shard_seq_no >= shard_count && (conf::cfg.node.history == conf::HISTORY::FULL || shard_count < max_shard_count)) + { + const uint64_t seq_no = shard_seq_no - shard_count; + + const std::string prev_shard_hash_file_path = shard_dir_path + "/" + std::to_string(seq_no + 1) + PREV_SHARD_HASH_FILENAME; + const int fd = open(prev_shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC); + if (fd == -1) + { + LOG_DEBUG << "Cannot read " << prev_shard_hash_file_path; + return; + } + + util::h32 prev_shard_hash_from_file; + // Start reading hash excluding hp_version header. + const int res = pread(fd, &prev_shard_hash_from_file, sizeof(util::h32), util::HP_VERSION_HEADER_SIZE); + close(fd); + if (res == -1) + { + LOG_ERROR << errno << ": Error reading hash file. " << prev_shard_hash_file_path; + return; + } + + const std::string sync_name = (shard_parent_dir == PRIMARY_DIR ? "primary" : "blob") + std::string(" shard ") + std::to_string(seq_no); + const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(seq_no)); + ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + } + } + /** * Save raw data from the consensused proposal. A blob file is only created if there is any user inputs or contract outputs * to save disk space. diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index 948a36ec..68271e0c 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -25,6 +25,11 @@ namespace ledger p2p::sequence_hash last_blob_shard_id; public: + // These flags will be marked as true after doing the shards cleanup and requesting + // at the first consensus round to align with the max shard counts. + std::atomic primary_shards_persisted = false; + std::atomic blob_shards_persisted = false; + const p2p::sequence_hash get_lcl_id() { std::shared_lock lock(lcl_mutex); @@ -87,6 +92,8 @@ namespace ledger void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir); + void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir); + int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id); int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir); diff --git a/src/ledger/ledger_mount.cpp b/src/ledger/ledger_mount.cpp index 1c8b108e..206c6cc4 100644 --- a/src/ledger/ledger_mount.cpp +++ b/src/ledger/ledger_mount.cpp @@ -27,17 +27,6 @@ namespace ledger return -1; } - if (conf::cfg.node.history == conf::HISTORY::CUSTOM) - { - //Remove old primary shards that exceeds max shard range. - if (last_primary_shard_id.seq_no >= conf::cfg.node.history_config.max_primary_shards) - remove_old_shards(last_primary_shard_id.seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR); - - //Remove old blob shards that exceeds max shard range. - if (last_blob_shard_id.seq_no >= conf::cfg.node.history_config.max_blob_shards) - remove_old_shards(last_blob_shard_id.seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR); - } - if (release_rw_session() == -1) { LOG_ERROR << "Failed to release rw session at mount " << mount_dir << "."; diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 6d9d0b92..9dc64d4c 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -68,6 +68,7 @@ namespace ledger } ctx.set_last_primary_shard_id(updated_primary_shard_id); last_primary_shard_seq_no = synced_shard_seq_no; + is_last_primary_shard_syncing = false; } if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node. @@ -116,6 +117,7 @@ namespace ledger last_blob_shard_seq_no = synced_shard_seq_no; ctx.set_last_blob_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash}); + is_last_blob_shard_syncing = false; } if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all blob shards if this is a full history node. diff --git a/src/ledger/ledger_sync.hpp b/src/ledger/ledger_sync.hpp index 14000376..4500b7a1 100644 --- a/src/ledger/ledger_sync.hpp +++ b/src/ledger/ledger_sync.hpp @@ -13,6 +13,10 @@ namespace ledger private: void swap_collected_responses(); void on_current_sync_state_acheived(const hpfs::sync_target &synced_target); + + public: + std::atomic is_last_primary_shard_syncing = false; + std::atomic is_last_blob_shard_syncing = false; }; } // namespace ledger #endif \ No newline at end of file diff --git a/test/bin/hpfs b/test/bin/hpfs index e67bfc32..c2a5b7dc 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ