hpfs sync target refactoring. (#302)

This commit is contained in:
Ravin Perera
2021-05-04 16:33:19 +05:30
committed by GitHub
parent f85059ea2a
commit ef2bb22b67
13 changed files with 177 additions and 213 deletions

View File

@@ -30,6 +30,18 @@ namespace hpfs
constexpr int FILE_PERMS = 0644;
#define SYNC_ERROR -1
#define SYNC_ABANDONED 0
#define SYNC_ACHIEVED 1
#define SYNC_PRIORITY_CHANGED 2
#define SYNC_HASH_CHANGED 3
#define REQUEST_LOOP_INTERRUPT \
{ \
const int res = sync_interrupt(target); \
if (res != -1) \
return res; \
}
/**
* This should be called to activate the hpfs sync.
*/
@@ -58,28 +70,6 @@ namespace hpfs
}
}
/**
* Sets a list of sync targets. Sync finishes when all the targets are synced.
* Syncing happens sequentially.
* @param sync_target_list List of sync targets to sync towards.
*/
void hpfs_sync::set_target(const std::list<sync_target> &sync_target_list)
{
if (sync_target_list.empty())
return;
// Do not do anything if we are already syncing towards the specified target states.
if (is_shutting_down || (is_syncing && original_target_list == sync_target_list))
return;
original_target_list = sync_target_list;
target_list = std::move(sync_target_list);
std::unique_lock lock(current_target_mutex);
current_target = target_list.front(); // Make the first element of the list the first target to sync.
is_syncing = true;
}
/**
* This sets a prioritized sync target. This target will replace current sync target.
* This target will immediately starting to sync and the interupted sync will resume
@@ -87,11 +77,10 @@ namespace hpfs
*/
void hpfs_sync::set_target_push_front(const sync_target &target)
{
{
std::shared_lock lock(current_target_mutex);
if (is_shutting_down || (is_syncing && current_target == target))
return;
}
std::unique_lock lock(target_mutex);
if (is_shutting_down || (is_syncing && target_list.front() == target))
return;
// Remove any previous sync targets for the same target vpath.
target_list.remove_if([&target](const hpfs::sync_target &element) {
@@ -100,9 +89,6 @@ namespace hpfs
target_list.push_front(target);
is_syncing = true;
std::unique_lock lock(current_target_mutex);
// Make the first element of the list the first target to sync.
current_target = target_list.front();
}
/**
@@ -111,24 +97,25 @@ namespace hpfs
*/
void hpfs_sync::set_target_push_back(const sync_target &target)
{
// Current_target_mutex is not required since this function is currently used in a unique_lock
// scope.
if (is_shutting_down || (is_syncing && current_target == target))
std::unique_lock lock(target_mutex);
if (is_shutting_down)
return;
// Check whether this target is already in the sync target list.
const auto itr = std::find(target_list.begin(), target_list.end(), target);
// Check whether the same vpath target is already in the sync target list. If so, update it's information.
const auto itr = std::find_if(target_list.begin(), target_list.end(),
[&target](const hpfs::sync_target &element) {
return element.vpath == target.vpath;
});
if (itr != target_list.end())
{
itr->hash = target.hash;
itr->item_type = target.item_type;
return;
}
target_list.push_back(target);
if (!is_syncing)
{
std::unique_lock lock(current_target_mutex);
// Make the first element of the list the first target to sync.
current_target = target_list.front();
is_syncing = true;
}
is_syncing = true;
}
/**
@@ -149,81 +136,74 @@ namespace hpfs
if (!is_syncing)
continue;
bool is_sync_complete = false;
if (fs_mount->acquire_rw_session() != -1)
{
while (!is_shutting_down)
{
{
std::shared_lock lock(current_target_mutex);
LOG_INFO << "Hpfs " << name << " sync: Starting sync for target " << current_target.name << " hash: " << current_target.hash;
}
util::h32 new_state = util::h32_empty;
const int result = request_loop(current_target.hash, new_state);
pending_requests.clear();
candidate_hpfs_responses.clear();
submitted_requests.clear();
if (result == -1 || result == 1 || is_shutting_down)
break;
{
std::unique_lock lock(current_target_mutex);
if (new_state == current_target.hash)
{
LOG_INFO << "Hpfs " << name << " sync: Target " << current_target.name << " hash achieved: " << new_state;
// After every sync target completion, release and reacquire hpfs rw session so hpfs gets some room
// to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state.
reacquire_rw_session();
on_current_sync_state_acheived(current_target);
// Start syncing to next target.
const int result = start_syncing_next_target();
if (result == 0)
break;
else if (result == 1)
continue;
}
else
{
LOG_INFO << "Hpfs " << name << " sync: Continuing sync for new " << current_target.name << " hash: " << current_target.hash;
continue;
}
}
}
fs_mount->release_rw_session();
is_sync_complete = true;
}
else
if (fs_mount->acquire_rw_session() == -1)
{
LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session";
continue;
}
// Clear target list and original target list since the sync is complete.
target_list = {};
original_target_list = {};
is_syncing = false;
const sync_target last_sync_target = current_target;
current_target = {};
if (is_sync_complete)
on_sync_complete(last_sync_target);
while (!is_shutting_down)
{
hpfs::sync_target current_target = {};
{
std::shared_lock lock(target_mutex);
if (target_list.empty())
{
LOG_INFO << "Hpfs " << name << " sync: All targets complete.";
is_syncing = false;
break;
}
// Set the target to be top of the list.
current_target = target_list.front();
}
// Start syncing towards specified target.
const int result = request_loop(current_target);
pending_requests.clear();
candidate_hpfs_responses.clear();
submitted_requests.clear();
if (is_shutting_down)
break;
if (result != SYNC_HASH_CHANGED)
{
// The finished target can be removed from the list.
{
std::unique_lock lock(target_mutex);
target_list.pop_front();
}
// After every sync target abandon or completion, release and reacquire hpfs rw session so hpfs gets some room
// to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state.
reacquire_rw_session();
if (result == SYNC_ABANDONED)
on_sync_target_abandoned();
else if (result == SYNC_ACHIEVED)
on_sync_target_acheived(current_target);
}
}
fs_mount->release_rw_session();
}
LOG_INFO << "Hpfs " << name << " sync: Worker stopped.";
}
/**
* Reqest loop.
* @return -1 on error. 0 when current sync state acheived or sync is stopped due to target change.
* Returns 1 on successfully finishing all the sync targets.
*/
int hpfs_sync::request_loop(const util::h32 current_target_hash, util::h32 &updated_state)
* Reqest loop which syncs towards the specified target.
* @return 0 when sync is abandoned due to resubmission threshold or shutdown. 1 when target sync hash acheived.
* 2 when target has been re-prioritized. 3 when target hash has changed. -1 on error.
*/
int hpfs_sync::request_loop(const hpfs::sync_target &target)
{
LOG_INFO << "Hpfs " << name << " sync: Starting target:" << target.hash << " " << target.vpath;
// Send the initial root hpfs request of the current target.
submit_request(backlog_item{current_target.item_type, current_target.vpath, -1, current_target_hash});
submit_request(backlog_item{target.item_type, target.vpath, -1, target.hash});
// Indicates whether any responses were processed in the previous loop iteration.
bool prev_responses_processed = false;
@@ -231,8 +211,10 @@ namespace hpfs
// No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response)
uint16_t resubmissions_count = 0;
while (!should_stop_request_loop(current_target_hash))
while (true)
{
REQUEST_LOOP_INTERRUPT
// Wait a small delay if there were no responses processed during previous iteration.
if (!prev_responses_processed)
util::sleep(REQUEST_LOOP_WAIT);
@@ -248,8 +230,7 @@ namespace hpfs
for (auto &response : candidate_hpfs_responses)
{
if (should_stop_request_loop(current_target_hash))
return 0;
REQUEST_LOOP_INTERRUPT
const std::string from = response.first.substr(2, 10); // Sender pubkey.
const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data());
@@ -342,19 +323,24 @@ namespace hpfs
// After handling each response, check whether we have reached target hpfs state.
// get_hash returns 0 incase target parent is not existing in our side.
if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, current_target.vpath) == -1)
util::h32 updated_state = util::h32_empty;
if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, target.vpath) == -1)
{
LOG_ERROR << "Hpfs " << name << " sync: exiting due to hash check error.";
return -1;
LOG_ERROR << "Hpfs " << name << " sync: Abandoning due to hash check error. " << target.vpath;
return SYNC_ERROR;
}
// Update the central hpfs state tracker.
fs_mount->set_parent_hash(current_target.vpath, updated_state);
fs_mount->set_parent_hash(target.vpath, updated_state);
LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash
<< " (" << current_target.vpath << ")";
if (updated_state == current_target_hash)
return 0;
// End the loop if sync is complete.
if (updated_state == target.hash)
{
LOG_INFO << "Hpfs " << name << " sync: Achieved target:" << target.hash << " " << target.vpath;
return SYNC_ACHIEVED;
}
LOG_DEBUG << "Hpfs " << name << " sync: Current:" << updated_state << " | target:" << target.hash << " " << target.vpath;
}
candidate_hpfs_responses.clear();
@@ -365,8 +351,7 @@ namespace hpfs
// Check for long-awaited responses and re-request them.
for (auto &[hash, request] : submitted_requests)
{
if (should_stop_request_loop(current_target_hash))
return 0;
REQUEST_LOOP_INTERRUPT
if (request.waiting_time < request_resubmit_timeout)
{
@@ -377,17 +362,8 @@ namespace hpfs
{
if (++resubmissions_count > ABANDON_THRESHOLD)
{
LOG_INFO << "Hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync.";
std::unique_lock lock(current_target_mutex);
const int result = start_syncing_next_target();
if (result == 0)
{
current_target = {};
on_sync_abandoned();
return 1; // To stop syncing since we have sync all the targets.
}
return 0;
LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to resubmission threshold. " << target.vpath;
return SYNC_ABANDONED;
}
// Reset the counter and re-submit request.
@@ -402,15 +378,48 @@ namespace hpfs
const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size();
for (int i = 0; i < available_slots && !pending_requests.empty(); i++)
{
if (should_stop_request_loop(current_target_hash))
return 0;
REQUEST_LOOP_INTERRUPT
submit_request(pending_requests.front());
pending_requests.pop_front();
}
}
}
return 0;
return SYNC_ABANDONED;
}
/**
* Indicates whether to break out of hpfs request processing loop and the reason.
* @return 0 if interrupted due to shutdown. 2 when target has been re-prioritized.
* 3 when target hash has changed. -1 if not interrupted.
*/
int hpfs_sync::sync_interrupt(const hpfs::sync_target &target)
{
if (is_shutting_down)
{
LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to shutdown. " << target.vpath;
return SYNC_ABANDONED;
}
// Stop request loop if the target has changed.
std::shared_lock lock(target_mutex);
const hpfs::sync_target &top_target = target_list.front();
if (target.vpath != top_target.vpath)
{
// A new high priority target has been set.
LOG_INFO << "Hpfs " << name << " sync: Higher priority target found. Abandoned: " << target.vpath;
return SYNC_PRIORITY_CHANGED;
}
else if (target.hash != top_target.hash)
{
// Hash changed of current target.
LOG_INFO << "Hpfs " << name << " sync: Target updated. New hash:" << top_target.hash << " " << top_target.vpath;
return SYNC_HASH_CHANGED;
}
return -1; // No interrupt.
}
/**
@@ -493,19 +502,6 @@ namespace hpfs
return crypto::get_hash(offset, buf) == hash;
}
/**
* Indicates whether to break out of hpfs request processing loop.
*/
bool hpfs_sync::should_stop_request_loop(const util::h32 &current_target_hash)
{
if (is_shutting_down)
return true;
// Stop request loop if the target has changed.
std::shared_lock lock(current_target_mutex);
return current_target_hash != current_target.hash;
}
/**
* Sends a hpfs request to a random peer.
* @param path Requested file or dir path.
@@ -776,7 +772,7 @@ namespace hpfs
* This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after
* a sync target is acheived.
*/
void hpfs_sync::on_current_sync_state_acheived(const sync_target &synced_target)
void hpfs_sync::on_sync_target_acheived(const sync_target &synced_target)
{
}
@@ -784,7 +780,7 @@ namespace hpfs
* This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after
* a sync is abondened.
*/
void hpfs_sync::on_sync_abandoned()
void hpfs_sync::on_sync_target_abandoned()
{
}
@@ -794,25 +790,7 @@ namespace hpfs
*/
void hpfs_sync::on_sync_complete(const sync_target &last_sync_target)
{
LOG_INFO << "Hpfs " << name << " sync: All parents synced.";
}
/**
* Starts syncing next target if available after current target finishes.
* @return returns 0 when the full sync is complete and 1 when more sync targets are available.
*/
int hpfs_sync::start_syncing_next_target()
{
target_list.pop_front(); // Remove the synced parent from the target list.
if (target_list.empty())
{
return 0;
}
else
{
current_target = target_list.front();
return 1;
}
LOG_INFO << "Hpfs " << name << " sync: All targets synced.";
}
/**