diff --git a/src/conf.cpp b/src/conf.cpp index bdd03e26..a571327c 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -867,7 +867,7 @@ namespace conf /** * Locks the config file. If already locked means there's another hpcore instance running in the same directory. * If so, log error and return, Otherwise lock the config. - * @return Returns 0 if lock is successfully aquired, -1 on error. + * @return Returns 0 if lock is successfully acquired, -1 on error. */ int set_config_lock() { @@ -891,7 +891,7 @@ namespace conf /** * Releases the config file and closes the opened file descriptor. - * @return Returns 0 if lock is successfully aquired, -1 on error. + * @return Returns 0 if lock is successfully acquired, -1 on error. */ int release_config_lock() { diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index 984a33c6..e27a2512 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -25,6 +25,9 @@ namespace hpfs // Max no. of repetitive reqeust resubmissions before abandoning the sync. constexpr uint16_t ABANDON_THRESHOLD = 20; + // No. of mulliseconds to wait before reacquiring hpfs rw session. + constexpr uint16_t HPFS_REAQUIRE_WAIT = 10; + constexpr int FILE_PERMS = 0644; /** @@ -171,6 +174,11 @@ namespace hpfs if (new_state == current_target.hash) { LOG_INFO << "Hpfs " << name << " sync: Target " << current_target.name << " hash achieved: " << new_state; + + // After every sync target completion, release and reacquire hpfs rw session so hpfs gets some room + // to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state. + reacquire_rw_session(); + on_current_sync_state_acheived(current_target); // Start syncing to next target. @@ -532,21 +540,26 @@ namespace hpfs * @param vpath Virtual path of the fs. * @param dir_mode Metadata 'mode' of dir. * @param fs_entry_map Received fs entry map. - * @returns 0 on success, otherwise -1. + * @returns 0 on success and no fs write peformed. 1 if write performed. -1 on failure. */ int hpfs_sync::handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, std::unordered_map &fs_entry_map) { // Get the parent path of the fs entries we have received. LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response for " << vpath; + bool write_performed = false; + // Create physical directory on our side if not exist. std::string parent_physical_path = fs_mount->rw_dir + vpath.data(); if (util::create_dir_tree_recursive(parent_physical_path) == -1) return -1; // Apply physical dir mode if received mode is different from our side. - if (apply_metadata_mode(parent_physical_path, dir_mode, true) == -1) + const int metadata_res = apply_metadata_mode(parent_physical_path, dir_mode, true); + if (metadata_res == -1) return -1; + else if (metadata_res == 1) + write_performed = true; // Get the children hash entries and compare with what we got from peer. std::vector existing_fs_entries; @@ -585,6 +598,7 @@ namespace hpfs !ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1) return -1; + write_performed = true; LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; } } @@ -604,7 +618,7 @@ namespace hpfs pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash}); } - return 0; + return write_performed ? 1 : 0; } /** @@ -614,13 +628,15 @@ namespace hpfs * @param hashes Received block hashes. * @param hash_count No. of received block hashes. * @param file_length Size of the file. - * @returns 0 on success, otherwise -1. + * @returns 0 on success and no write operation performed. 1 if write opreation peformed. -1 on failure. */ int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length) { // Get the file path of the block hashes we have received. LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response for " << vpath; + bool write_performed = false; + // File block hashes on our side (file might not exist on our side). std::vector existing_hashes; if (fs_mount->get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT) @@ -643,14 +659,19 @@ namespace hpfs std::string file_physical_path = fs_mount->rw_dir + vpath.data(); if (truncate(file_physical_path.c_str(), file_length) == -1) return -1; + + write_performed = true; } // Apply physical file mode if received mode is different from our side. const std::string physical_path = fs_mount->rw_dir + vpath.data(); - if (apply_metadata_mode(physical_path, file_mode, false) == -1) + const int metadata_res = apply_metadata_mode(physical_path, file_mode, false); + if (metadata_res == -1) return -1; + else if (metadata_res == 1) + write_performed = true; - return 0; + return write_performed ? 1 : 0; } /** @@ -689,7 +710,7 @@ namespace hpfs /** * Applies the specified to local file/dir if different. If it's a file, this will create the file * if not exist. - * @returns 0 on success, otherwise -1. + * @returns 0 if no change made. 1 if a change was made. -1 on failure. */ int hpfs_sync::apply_metadata_mode(std::string_view physical_path, const mode_t mode, const bool is_dir) { @@ -708,7 +729,7 @@ namespace hpfs } else { - return 0; + return 1; } } @@ -724,9 +745,11 @@ namespace hpfs LOG_ERROR << errno << ": Error in applying file/dir mode. " << physical_path; return -1; } + + return 1; } - return 0; + return 0; // No change made. } /** @@ -772,4 +795,23 @@ namespace hpfs } } + /** + * Releases and reacquires the rw session after a short delay. + * This is used to give hpfs some room to update the last checkpoint during long runinng sync operations. + * @return 0 on success. -1 on failure. + */ + int hpfs_sync::reacquire_rw_session() + { + fs_mount->release_rw_session(); + util::sleep(HPFS_REAQUIRE_WAIT); + + if (fs_mount->acquire_rw_session() == -1) + { + LOG_ERROR << "Hpfs " << name << " sync: Error reacquring rw session."; + return -1; + } + + return 0; + } + } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index 97dded0a..8c06e51f 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -108,6 +108,8 @@ namespace hpfs // Move the collected responses from hpfs responses to a local response list. virtual void swap_collected_responses() = 0; // Must override in child classes. + int reacquire_rw_session(); + public: std::atomic is_syncing = false; diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 946c240b..eee01a3e 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -107,7 +107,7 @@ namespace ledger */ int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users) { - // Aquire hpfs rw session before writing into shards. + // Acquire hpfs rw session before writing into shards. if (ledger_fs.acquire_rw_session() == -1) return -1; diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 6b076dec..a4d472d9 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -69,18 +69,6 @@ namespace ledger // If existing max shard is older than the max we can keep. Then delete all the existing shards. remove_old_shards(ctx.get_lcl_id().seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); - - // If node is in full history mode. Restarting the fs mount, So primary ledger shard sync changes would be reflected in the ro sessions. - // Which is used for hpfs log sync. - if (conf::cfg.node.history == conf::HISTORY::FULL) - { - fs_mount->release_rw_session(); - if (fs_mount->acquire_rw_session() == -1) - { - LOG_ERROR << "Error acquring rw session after achieving primary shard."; - return; - } - } } if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node. diff --git a/src/util/util.cpp b/src/util/util.cpp index f6bb2a83..97cfb204 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -382,7 +382,7 @@ namespace util * @param is_rwlock Whether the record lock is a write lock. * @param start Starting offset for the lock. * @param len Number of bytes to lock. - * @return Returns 0 if lock is successfully aquired, -1 on error. + * @return Returns 0 if lock is successfully acquired, -1 on error. */ int set_lock(const int fd, struct flock &lock, const bool is_rwlock, const off_t start, const off_t len) {