mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Reacquire hpfs rw session after each sync target completion. (#296)
This commit is contained in:
@@ -867,7 +867,7 @@ namespace conf
|
||||
/**
|
||||
* Locks the config file. If already locked means there's another hpcore instance running in the same directory.
|
||||
* If so, log error and return, Otherwise lock the config.
|
||||
* @return Returns 0 if lock is successfully aquired, -1 on error.
|
||||
* @return Returns 0 if lock is successfully acquired, -1 on error.
|
||||
*/
|
||||
int set_config_lock()
|
||||
{
|
||||
@@ -891,7 +891,7 @@ namespace conf
|
||||
|
||||
/**
|
||||
* Releases the config file and closes the opened file descriptor.
|
||||
* @return Returns 0 if lock is successfully aquired, -1 on error.
|
||||
* @return Returns 0 if lock is successfully acquired, -1 on error.
|
||||
*/
|
||||
int release_config_lock()
|
||||
{
|
||||
|
||||
@@ -25,6 +25,9 @@ namespace hpfs
|
||||
// Max no. of repetitive reqeust resubmissions before abandoning the sync.
|
||||
constexpr uint16_t ABANDON_THRESHOLD = 20;
|
||||
|
||||
// No. of mulliseconds to wait before reacquiring hpfs rw session.
|
||||
constexpr uint16_t HPFS_REAQUIRE_WAIT = 10;
|
||||
|
||||
constexpr int FILE_PERMS = 0644;
|
||||
|
||||
/**
|
||||
@@ -171,6 +174,11 @@ namespace hpfs
|
||||
if (new_state == current_target.hash)
|
||||
{
|
||||
LOG_INFO << "Hpfs " << name << " sync: Target " << current_target.name << " hash achieved: " << new_state;
|
||||
|
||||
// After every sync target completion, release and reacquire hpfs rw session so hpfs gets some room
|
||||
// to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state.
|
||||
reacquire_rw_session();
|
||||
|
||||
on_current_sync_state_acheived(current_target);
|
||||
|
||||
// Start syncing to next target.
|
||||
@@ -532,21 +540,26 @@ namespace hpfs
|
||||
* @param vpath Virtual path of the fs.
|
||||
* @param dir_mode Metadata 'mode' of dir.
|
||||
* @param fs_entry_map Received fs entry map.
|
||||
* @returns 0 on success, otherwise -1.
|
||||
* @returns 0 on success and no fs write peformed. 1 if write performed. -1 on failure.
|
||||
*/
|
||||
int hpfs_sync::handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
|
||||
{
|
||||
// Get the parent path of the fs entries we have received.
|
||||
LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response for " << vpath;
|
||||
|
||||
bool write_performed = false;
|
||||
|
||||
// Create physical directory on our side if not exist.
|
||||
std::string parent_physical_path = fs_mount->rw_dir + vpath.data();
|
||||
if (util::create_dir_tree_recursive(parent_physical_path) == -1)
|
||||
return -1;
|
||||
|
||||
// Apply physical dir mode if received mode is different from our side.
|
||||
if (apply_metadata_mode(parent_physical_path, dir_mode, true) == -1)
|
||||
const int metadata_res = apply_metadata_mode(parent_physical_path, dir_mode, true);
|
||||
if (metadata_res == -1)
|
||||
return -1;
|
||||
else if (metadata_res == 1)
|
||||
write_performed = true;
|
||||
|
||||
// Get the children hash entries and compare with what we got from peer.
|
||||
std::vector<hpfs::child_hash_node> existing_fs_entries;
|
||||
@@ -585,6 +598,7 @@ namespace hpfs
|
||||
!ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1)
|
||||
return -1;
|
||||
|
||||
write_performed = true;
|
||||
LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath;
|
||||
}
|
||||
}
|
||||
@@ -604,7 +618,7 @@ namespace hpfs
|
||||
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash});
|
||||
}
|
||||
|
||||
return 0;
|
||||
return write_performed ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -614,13 +628,15 @@ namespace hpfs
|
||||
* @param hashes Received block hashes.
|
||||
* @param hash_count No. of received block hashes.
|
||||
* @param file_length Size of the file.
|
||||
* @returns 0 on success, otherwise -1.
|
||||
* @returns 0 on success and no write operation performed. 1 if write opreation peformed. -1 on failure.
|
||||
*/
|
||||
int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length)
|
||||
{
|
||||
// Get the file path of the block hashes we have received.
|
||||
LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response for " << vpath;
|
||||
|
||||
bool write_performed = false;
|
||||
|
||||
// File block hashes on our side (file might not exist on our side).
|
||||
std::vector<util::h32> existing_hashes;
|
||||
if (fs_mount->get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT)
|
||||
@@ -643,14 +659,19 @@ namespace hpfs
|
||||
std::string file_physical_path = fs_mount->rw_dir + vpath.data();
|
||||
if (truncate(file_physical_path.c_str(), file_length) == -1)
|
||||
return -1;
|
||||
|
||||
write_performed = true;
|
||||
}
|
||||
|
||||
// Apply physical file mode if received mode is different from our side.
|
||||
const std::string physical_path = fs_mount->rw_dir + vpath.data();
|
||||
if (apply_metadata_mode(physical_path, file_mode, false) == -1)
|
||||
const int metadata_res = apply_metadata_mode(physical_path, file_mode, false);
|
||||
if (metadata_res == -1)
|
||||
return -1;
|
||||
else if (metadata_res == 1)
|
||||
write_performed = true;
|
||||
|
||||
return 0;
|
||||
return write_performed ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -689,7 +710,7 @@ namespace hpfs
|
||||
/**
|
||||
* Applies the specified to local file/dir if different. If it's a file, this will create the file
|
||||
* if not exist.
|
||||
* @returns 0 on success, otherwise -1.
|
||||
* @returns 0 if no change made. 1 if a change was made. -1 on failure.
|
||||
*/
|
||||
int hpfs_sync::apply_metadata_mode(std::string_view physical_path, const mode_t mode, const bool is_dir)
|
||||
{
|
||||
@@ -708,7 +729,7 @@ namespace hpfs
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -724,9 +745,11 @@ namespace hpfs
|
||||
LOG_ERROR << errno << ": Error in applying file/dir mode. " << physical_path;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return 0; // No change made.
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -772,4 +795,23 @@ namespace hpfs
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases and reacquires the rw session after a short delay.
|
||||
* This is used to give hpfs some room to update the last checkpoint during long runinng sync operations.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int hpfs_sync::reacquire_rw_session()
|
||||
{
|
||||
fs_mount->release_rw_session();
|
||||
util::sleep(HPFS_REAQUIRE_WAIT);
|
||||
|
||||
if (fs_mount->acquire_rw_session() == -1)
|
||||
{
|
||||
LOG_ERROR << "Hpfs " << name << " sync: Error reacquring rw session.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
} // namespace hpfs
|
||||
@@ -108,6 +108,8 @@ namespace hpfs
|
||||
// Move the collected responses from hpfs responses to a local response list.
|
||||
virtual void swap_collected_responses() = 0; // Must override in child classes.
|
||||
|
||||
int reacquire_rw_session();
|
||||
|
||||
public:
|
||||
std::atomic<bool> is_syncing = false;
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ namespace ledger
|
||||
*/
|
||||
int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users)
|
||||
{
|
||||
// Aquire hpfs rw session before writing into shards.
|
||||
// Acquire hpfs rw session before writing into shards.
|
||||
if (ledger_fs.acquire_rw_session() == -1)
|
||||
return -1;
|
||||
|
||||
|
||||
@@ -69,18 +69,6 @@ namespace ledger
|
||||
|
||||
// If existing max shard is older than the max we can keep. Then delete all the existing shards.
|
||||
remove_old_shards(ctx.get_lcl_id().seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR);
|
||||
|
||||
// If node is in full history mode. Restarting the fs mount, So primary ledger shard sync changes would be reflected in the ro sessions.
|
||||
// Which is used for hpfs log sync.
|
||||
if (conf::cfg.node.history == conf::HISTORY::FULL)
|
||||
{
|
||||
fs_mount->release_rw_session();
|
||||
if (fs_mount->acquire_rw_session() == -1)
|
||||
{
|
||||
LOG_ERROR << "Error acquring rw session after achieving primary shard.";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node.
|
||||
|
||||
@@ -382,7 +382,7 @@ namespace util
|
||||
* @param is_rwlock Whether the record lock is a write lock.
|
||||
* @param start Starting offset for the lock.
|
||||
* @param len Number of bytes to lock.
|
||||
* @return Returns 0 if lock is successfully aquired, -1 on error.
|
||||
* @return Returns 0 if lock is successfully acquired, -1 on error.
|
||||
*/
|
||||
int set_lock(const int fd, struct flock &lock, const bool is_rwlock, const off_t start, const off_t len)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user