Sync and consensus improvements. (#312)

* Refactored hpfs sync to support parallel target sync.
* Removed role change in consensus.
This commit is contained in:
Ravin Perera
2021-05-25 21:43:08 +05:30
committed by GitHub
parent 6cfa47418c
commit b172152cba
10 changed files with 435 additions and 434 deletions

View File

@@ -106,9 +106,6 @@ namespace consensus
// arived ones and expired ones.
revise_candidate_proposals();
// If possible, switch back to validator mode before stage processing. (if we were syncing before)
check_sync_completion();
// Get current lcl, state, patch, primary shard and raw shard info.
p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
util::h32 state_hash = sc::contract_fs.get_parent_hash(sc::STATE_DIR_PATH);
@@ -142,9 +139,9 @@ namespace consensus
const size_t unl_count = unl::count();
vote_counter votes;
const int sync_status = check_sync_status(unl_count, votes, lcl_id);
ctx.sync_status = check_sync_status(unl_count, votes, lcl_id);
if (sync_status == -2) // Unreliable votes.
if (ctx.sync_status == -2) // Unreliable votes.
{
ctx.unreliable_votes_attempts++;
if (ctx.unreliable_votes_attempts >= MAX_UNRELIABLE_VOTES_ATTEMPTS)
@@ -158,7 +155,7 @@ namespace consensus
ctx.unreliable_votes_attempts = 0;
}
if (sync_status == 0)
if (ctx.sync_status == 0)
{
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
@@ -180,12 +177,9 @@ namespace consensus
}
}
// We have finished a consensus stage. Transition or reset stage based on sync status.
if (sync_status == -2)
ctx.stage = 0; // Majority last primary shard unreliable. Reset to stage 0.
else
ctx.stage = (ctx.stage + 1) % 4; // Transition to next stage. (if at stage 3 go to next round stage 0)
// We have finished a consensus stage.
// Transition to next stage. (if at stage 3 go to next round stage 0)
ctx.stage = (ctx.stage + 1) % 4;
}
return 0;
@@ -228,7 +222,7 @@ namespace consensus
/**
* Checks whether we are in sync with the received votes.
* @return 0 if we are in sync. -1 on ledger or hpfs desync. -2 if majority last ledger primary shard hash unreliable.
* @return 0 if we are in sync. -1 on ledger or contract state desync. -2 if majority last ledger primary shard hash unreliable.
*/
int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id)
{
@@ -240,13 +234,11 @@ namespace consensus
// Last primary shard hash sync is commenced if we are out-of-sync with majority last primary shard hash.
if (is_last_primary_shard_desync)
{
conf::change_role(conf::ROLE::OBSERVER);
// We first request the latest shard.
const std::string majority_shard_seq_no_str = std::to_string(majority_primary_shard_id.seq_no);
const std::string shard_path = std::string(ledger::PRIMARY_DIR).append("/").append(majority_shard_seq_no_str);
ledger::ledger_sync_worker.is_last_primary_shard_syncing = true;
ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
ledger::ledger_sync_worker.set_target(true, shard_path, majority_primary_shard_id.hash, true);
}
// Check out raw shard hash with majority raw shard hash.
@@ -270,8 +262,6 @@ namespace consensus
// Start hpfs sync if we are out-of-sync with majority hpfs patch hash or state hash.
if (is_state_desync || is_patch_desync)
{
conf::change_role(conf::ROLE::OBSERVER);
if (conf::cfg.node.history == conf::HISTORY::FULL)
{
// If state or patch is desync set target for the hpfs log sync with the next lcl seq_no.
@@ -284,21 +274,20 @@ namespace consensus
{
// Patch file sync is prioritized, Therefore it is set in the front of the sync target list.
if (is_patch_desync)
sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{majority_patch_hash, sc::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE});
sc::contract_sync_worker.set_target(false, sc::PATCH_FILE_PATH, majority_patch_hash, true);
if (is_state_desync)
sc::contract_sync_worker.set_target_push_back(hpfs::sync_target{majority_state_hash, sc::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR});
sc::contract_sync_worker.set_target(true, sc::STATE_DIR_PATH, majority_state_hash);
}
}
// If ledger raw shard is desync, We first request the latest raw shard.
if (is_last_raw_shard_desync)
{
conf::change_role(conf::ROLE::OBSERVER);
const std::string majority_shard_seq_no_str = std::to_string(majority_raw_shard_id.seq_no);
const std::string shard_path = std::string(ledger::RAW_DIR).append("/").append(majority_shard_seq_no_str);
ledger::ledger_sync_worker.is_last_raw_shard_syncing = true;
ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{majority_raw_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
ledger::ledger_sync_worker.set_target(true, shard_path, majority_raw_shard_id.hash);
}
// If shards aren't aligned with max shard count, do the relevant shard cleanups and requests.
@@ -317,7 +306,6 @@ namespace consensus
// Proceed further only if last primary shard, last raw shard, state and patch hashes are in sync with majority.
if (!is_last_primary_shard_desync && !is_last_raw_shard_desync && !is_state_desync && !is_patch_desync)
{
conf::change_role(conf::ROLE::VALIDATOR);
return 0;
}
@@ -329,19 +317,6 @@ namespace consensus
return -2;
}
/**
* Checks whether we can switch back from currently ongoing observer-mode sync operation
* that has been completed.
*/
void check_sync_completion()
{
const bool is_contract_syncing = (conf::cfg.node.history == conf::HISTORY::FULL) ? sc::hpfs_log_sync::sync_ctx.is_syncing : sc::contract_sync_worker.is_syncing;
// In ledger sync we only concern about last shard sync status to proceed with consensus.
const bool is_ledger_syncing = ledger::ledger_sync_worker.is_last_primary_shard_syncing || ledger::ledger_sync_worker.is_last_raw_shard_syncing;
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !is_contract_syncing && !is_ledger_syncing)
conf::change_role(conf::ROLE::VALIDATOR);
}
/**
* Moves proposals collected from the network into candidate proposals and
* cleans up any outdated proposals from the candidate set.
@@ -394,8 +369,9 @@ namespace consensus
{
const p2p::proposal &cp = itr->second;
// Only consider this round's proposals which are from current or previous stage.
const bool stage_valid = ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1;
// If we are in sync, only consider this round's proposals which are from current or previous stage.
// Otherwise consider all proposals as long as they are from the same round.
const bool stage_valid = ctx.sync_status == 0 ? (ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1) : true;
const bool keep_candidate = (ctx.round_start_time == cp.time) && stage_valid;
LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased")
<< " [s" << std::to_string(cp.stage)
@@ -880,7 +856,7 @@ namespace consensus
}
}
// time is voted on a simple sorted (highest to lowest) and majority basis.
// time is voted on majority basis.
uint32_t highest_time_vote = 0;
for (const auto &[time, numvotes] : votes.time)
{

View File

@@ -111,6 +111,7 @@ namespace consensus
uint32_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
uint64_t round_boundry_offset = 0; // Time window boundry offset based on contract id.
uint16_t unreliable_votes_attempts = 0; // No. of times we failed to get reliable votes continously.
int sync_status = 0; // Current sync status.
std::optional<sc::execution_context> contract_ctx;
std::mutex contract_ctx_mutex;
@@ -153,8 +154,6 @@ namespace consensus
int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id);
void check_sync_completion();
void revise_candidate_proposals();
int prepare_consensed_users(consensed_user_map &consensed_users, const p2p::proposal &cons_prop);

View File

@@ -19,9 +19,6 @@ namespace hpfs
// Max number of requests that can be awaiting response at any given time.
constexpr uint16_t MAX_AWAITING_REQUESTS = 4;
// Request loop sleep time (milliseconds).
constexpr uint16_t REQUEST_LOOP_WAIT = 10;
// Max no. of repetitive reqeust resubmissions before abandoning the sync.
constexpr uint16_t ABANDON_THRESHOLD = 20;
@@ -30,17 +27,8 @@ namespace hpfs
constexpr int FILE_PERMS = 0644;
#define SYNC_ERROR -1
#define SYNC_ABANDONED 0
#define SYNC_ACHIEVED 1
#define SYNC_PRIORITY_CHANGED 2
#define SYNC_HASH_CHANGED 3
#define REQUEST_LOOP_INTERRUPT \
{ \
const int res = sync_interrupt(target); \
if (res != -1) \
return res; \
}
// Locates the ongoing target for the provided request vpath. (Matched if target vpath is an ancestor path of the request vpath)
#define TARGET_OF_REQUEST(req_vpath) std::find_if(ongoing_targets.begin(), ongoing_targets.end(), [&](sync_item &t) { return req_vpath.rfind(t.vpath, 0) == 0; })
/**
* This should be called to activate the hpfs sync.
@@ -71,50 +59,18 @@ namespace hpfs
}
/**
* This sets a prioritized sync target. This target will replace current sync target.
* This target will immediately starting to sync and the interupted sync will resume
* once this sync target is acheived.
*/
void hpfs_sync::set_target_push_front(const sync_target &target)
* Add or update a sync target.
* @param is_dir whether the sync target is a dir or file.
* @param vpath The vpath of to sync.
* @param hash Target hash to achieve.
* @param high_priority Whether this target should be given higher priority over other ongoing targets.
*/
void hpfs_sync::set_target(const bool is_dir, const std::string &vpath,
const util::h32 &hash, const bool high_priority)
{
std::unique_lock lock(target_mutex);
if (is_shutting_down || (is_syncing && target_list.front() == target))
return;
// Remove any previous sync targets for the same target vpath.
target_list.remove_if([&target](const hpfs::sync_target &element) {
return element.vpath == target.vpath;
});
target_list.push_front(target);
is_syncing = true;
}
/**
* Adds a new target to the syncing list. If the list was previously empty, current target
* will be updated and syncing will start.
*/
void hpfs_sync::set_target_push_back(const sync_target &target)
{
std::unique_lock lock(target_mutex);
if (is_shutting_down)
return;
// Check whether the same vpath target is already in the sync target list. If so, update it's information.
const auto itr = std::find_if(target_list.begin(), target_list.end(),
[&target](const hpfs::sync_target &element) {
return element.vpath == target.vpath;
});
if (itr != target_list.end())
{
itr->hash = target.hash;
itr->item_type = target.item_type;
return;
}
target_list.push_back(target);
std::unique_lock lock(incoming_targets_mutex);
incoming_targets.emplace(sync_item{
(is_dir ? SYNC_ITEM_TYPE::DIR : SYNC_ITEM_TYPE::FILE), vpath, -1, hash, high_priority});
is_syncing = true;
}
@@ -124,303 +80,360 @@ namespace hpfs
void hpfs_sync::hpfs_syncer_loop()
{
util::mask_signal();
LOG_INFO << "Hpfs " << name << " sync: Worker started.";
// Indicates whether any responses were processed in the previous loop iteration.
bool prev_responses_processed = false;
while (!is_shutting_down)
{
util::sleep(IDLE_WAIT);
// Wait a small delay if there were no responses processed during previous iteration.
if (!prev_responses_processed)
util::sleep(IDLE_WAIT);
// Keep idling if we are not doing any sync activity.
// Check whether we have any new/changed targets.
if (check_incoming_targets() == -1)
break;
if (!is_syncing)
continue;
// Move the received hpfs responses to the local response list.
swap_collected_responses();
if (fs_mount->acquire_rw_session() == -1)
if (ongoing_targets.empty())
{
LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session";
continue;
}
while (!is_shutting_down)
{
hpfs::sync_target current_target = {};
{
std::shared_lock lock(target_mutex);
if (target_list.empty())
{
LOG_INFO << "Hpfs " << name << " sync: All targets complete.";
is_syncing = false;
break;
}
// Set the target to be top of the list.
current_target = target_list.front();
}
// Start syncing towards specified target.
const int result = request_loop(current_target);
pending_requests.clear();
candidate_hpfs_responses.clear();
submitted_requests.clear();
if (is_shutting_down)
break;
if (result != SYNC_HASH_CHANGED && result != SYNC_PRIORITY_CHANGED)
{
// The finished target can be removed from the list.
{
std::unique_lock lock(target_mutex);
target_list.pop_front();
}
// After every sync target abandon or completion, release and reacquire hpfs rw session so hpfs gets some room
// to update the last checkpoint. This helps any upcoming ro sessions to get updated file system state.
reacquire_rw_session();
if (result == SYNC_ABANDONED)
on_sync_target_abandoned();
else if (result == SYNC_ACHIEVED)
on_sync_target_acheived(current_target);
}
continue;
}
fs_mount->release_rw_session();
// Process any sync responses we have received.
prev_responses_processed = process_candidate_responses();
if (is_shutting_down)
break;
// Submit any pending requests to peers.
perform_request_submissions();
}
if (rw_session_active)
fs_mount->release_rw_session();
LOG_INFO << "Hpfs " << name << " sync: Worker stopped.";
}
/**
* Reqest loop which syncs towards the specified target.
* @return 0 when sync is abandoned due to resubmission threshold or shutdown. 1 when target sync hash acheived.
* 2 when target has been re-prioritized. 3 when target hash has changed. -1 on error.
* Checks for any new/updated targets that we have received and safely incorporates them into ongoing sync activity.
* @return o on success. -1 on error.
*/
int hpfs_sync::request_loop(const hpfs::sync_target &target)
int hpfs_sync::check_incoming_targets()
{
LOG_INFO << "Hpfs " << name << " sync: Starting target:" << target.hash << " " << target.vpath;
// Send the initial root hpfs request of the current target.
submit_request(backlog_item{target.item_type, target.vpath, -1, target.hash});
// Indicates whether any responses were processed in the previous loop iteration.
bool prev_responses_processed = false;
// No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response)
uint16_t resubmissions_count = 0;
while (true)
{
REQUEST_LOOP_INTERRUPT
// Wait a small delay if there were no responses processed during previous iteration.
if (!prev_responses_processed)
util::sleep(REQUEST_LOOP_WAIT);
// Move the received hpfs responses to the local response list.
swap_collected_responses();
prev_responses_processed = !candidate_hpfs_responses.empty();
// Reset resubmissions counter whenever we have a resposne.
if (!candidate_hpfs_responses.empty())
resubmissions_count = 0;
for (auto &response : candidate_hpfs_responses)
std::unique_lock lock(incoming_targets_mutex);
for (const sync_item &target : incoming_targets)
{
REQUEST_LOOP_INTERRUPT
// If we have an ongoing target with the same vpath but having a different hash, we need to destroy
// that target and insert the updated one.
const std::string from = response.first.substr(2, 10); // Sender pubkey.
const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data());
const p2pmsg::HpfsResponseMsg &resp_msg = *msg.content_as_HpfsResponseMsg();
// Check whether we are actually waiting for this response. If not, ignore it.
std::string_view hash = msg::fbuf::flatbuf_bytes_to_sv(resp_msg.hash());
std::string_view vpath = msg::fbuf::flatbuf_str_to_sv(resp_msg.path());
const std::string key = std::string(vpath).append(hash);
const auto pending_resp_itr = submitted_requests.find(key);
if (pending_resp_itr == submitted_requests.end())
const auto ex_target = std::find_if(ongoing_targets.begin(), ongoing_targets.end(),
[&](sync_item &t) { return t.vpath == target.vpath; });
if (ex_target == ongoing_targets.end())
{
LOG_DEBUG << "Hpfs " << name << " sync: Skipping response from [" << from << "] because we are not looking for hash:"
<< util::to_hex(hash).substr(0, 10) << " of " << vpath;
continue;
ongoing_targets.push_back(target);
pending_requests.emplace(target); // Places the root request for this target according to priority sorting.
LOG_INFO << "Hpfs " << name << " sync: Target added. Hash:" << target.expected_hash << " " << target.vpath;
}
// Process the message based on response type.
const p2pmsg::HpfsResponse msg_type = resp_msg.content_type();
if (msg_type == p2pmsg::HpfsResponse_HpfsFsEntryResponse)
else if (ex_target->expected_hash != target.expected_hash)
{
const p2pmsg::HpfsFsEntryResponse &fs_resp = *resp_msg.content_as_HpfsFsEntryResponse();
// Existing target's expected hash is obsolete now. Therefore clear all ongoing activity for the obsolete target.
clear_target(ex_target);
// Get fs entries we have received.
std::vector<p2p::hpfs_fs_hash_entry> peer_fs_entries;
p2pmsg::flatbuf_hpfsfshashentries_to_hpfsfshashentries(peer_fs_entries, fs_resp.entries());
ongoing_targets.push_back(target); // Insert the new one to replace the obsolete target.
pending_requests.emplace(target); // Places the root request for this target according to 'sync_item' priority sorting.
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entries))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched fs entries response from [" << from << "] for " << vpath;
continue;
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response from [" << from << "] for " << vpath;
handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entries);
LOG_INFO << "Hpfs " << name << " sync: Target updated. New hash:" << target.expected_hash << " " << target.vpath;
}
else if (msg_type == p2pmsg::HpfsResponse_HpfsFileHashMapResponse)
{
const p2pmsg::HpfsFileHashMapResponse &file_resp = *resp_msg.content_as_HpfsFileHashMapResponse();
// File block hashes we received from the peer.
const util::h32 *block_hashes = reinterpret_cast<const util::h32 *>(file_resp.hash_map()->data());
const size_t block_hash_count = file_resp.hash_map()->size() / sizeof(util::h32);
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, file_resp.file_mode(), block_hashes, block_hash_count))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched hashmap response from [" << from << "] for " << vpath;
continue;
}
std::set<uint32_t> responded_block_ids;
{
const flatbuffers::Vector<uint32_t> *fbvec = file_resp.responded_block_ids();
const uint32_t *ptr = file_resp.responded_block_ids()->data();
const size_t count = file_resp.responded_block_ids()->size();
for (size_t i = 0; i < count; i++)
responded_block_ids.emplace(ptr[i]);
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response from [" << from << "] for " << vpath;
handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count,
responded_block_ids, file_resp.file_length());
}
else if (msg_type == p2pmsg::HpfsResponse_HpfsBlockResponse)
{
const p2pmsg::HpfsBlockResponse &block_resp = *resp_msg.content_as_HpfsBlockResponse();
// Get the file path of the block data we have received.
const uint32_t block_id = block_resp.block_id();
std::string_view buf = msg::fbuf::flatbuf_bytes_to_sv(block_resp.data());
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched block response from [" << from << "] for block_id:" << block_id
<< " (len:" << buf.length() << ") of " << vpath;
continue;
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing block response from [" << from << "] for block_id:" << block_id
<< " (len:" << buf.length() << ") of " << vpath;
handle_file_block_response(vpath, block_id, buf);
}
// Now that we have received matching hash and handled it, remove it from the waiting list.
submitted_requests.erase(pending_resp_itr);
// After handling each response, check whether we have reached target hpfs state.
// get_hash returns 0 incase target parent is not existing in our side.
util::h32 updated_state = util::h32_empty;
if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, target.vpath) == -1)
{
LOG_ERROR << "Hpfs " << name << " sync: Abandoning due to hash check error. " << target.vpath;
return SYNC_ERROR;
}
// Update the central hpfs state tracker.
fs_mount->set_parent_hash(target.vpath, updated_state);
// End the loop if sync is complete.
if (updated_state == target.hash)
{
LOG_INFO << "Hpfs " << name << " sync: Achieved target:" << target.hash << " " << target.vpath;
return SYNC_ACHIEVED;
}
LOG_DEBUG << "Hpfs " << name << " sync: Current:" << updated_state << " | target:" << target.hash << " " << target.vpath;
}
candidate_hpfs_responses.clear();
incoming_targets.clear();
}
// No. of milliseconds to wait before resubmitting a request.
const uint32_t request_resubmit_timeout = hpfs::get_request_resubmit_timeout();
// Check for long-awaited responses and re-request them.
for (auto &[hash, request] : submitted_requests)
// Acquire/release hpfs rw session as needed.
if (!rw_session_active && !ongoing_targets.empty())
{
if (fs_mount->acquire_rw_session() == -1)
{
REQUEST_LOOP_INTERRUPT
LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session";
return -1;
}
rw_session_active = true;
}
else if (rw_session_active && ongoing_targets.empty())
{
if (fs_mount->release_rw_session() == -1)
{
LOG_ERROR << "Hpfs " << name << " sync: Failed to release hpfs rw session";
return -1;
}
rw_session_active = false;
}
if (request.waiting_time < request_resubmit_timeout)
return 0;
}
/**
* Clears the specified ongoing target and its associated requests.
* @param target_itr Iterator in the ongoing targets to be erased.
*/
void hpfs_sync::clear_target(const std::vector<hpfs::sync_item>::iterator &target_itr)
{
// Clear pending requests under the obsolete target.
{
auto itr = pending_requests.begin();
while (itr != pending_requests.end())
{
if (itr->vpath.rfind(target_itr->vpath, 0) == 0) // If the request is a sub path of the target's vpath.
pending_requests.erase(itr++);
else
++itr;
}
}
// Clear submitted requests under the obsolete target.
{
auto itr = submitted_requests.begin();
while (itr != submitted_requests.end())
{
if (itr->second.vpath.rfind(target_itr->vpath, 0) == 0) // If the request is a sub path of the target's vpath.
submitted_requests.erase(itr++);
else
++itr;
}
}
ongoing_targets.erase(target_itr); // Clear the obsolete target.
}
/**
* Submits requests from pending collection to peers, based on request throughput availabilty.
*/
void hpfs_sync::perform_request_submissions()
{
// No. of milliseconds to wait before resubmitting a request.
const uint32_t request_resubmit_timeout = hpfs::get_request_resubmit_timeout();
// Check for long-awaited responses and re-request them.
for (auto &[hash, request] : submitted_requests)
{
if (is_shutting_down)
return;
if (request.waiting_time < request_resubmit_timeout)
{
// Increment wait time.
request.waiting_time += IDLE_WAIT;
}
else
{
// If we have exceeded continous resubmission threshold, clear everything (all targets) and go back to idle state.
if (++resubmissions_count > ABANDON_THRESHOLD)
{
// Increment wait time.
request.waiting_time += REQUEST_LOOP_WAIT;
pending_requests.clear();
submitted_requests.clear();
ongoing_targets.clear();
update_sync_status();
LOG_INFO << "Hpfs " << name << " sync: All targets abandoned due to resubmission threshold.";
on_sync_abandoned();
}
else
{
if (++resubmissions_count > ABANDON_THRESHOLD)
{
LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to resubmission threshold. " << target.vpath;
return SYNC_ABANDONED;
}
// Reset the counter and re-submit request.
request.waiting_time = 0;
submit_request(request, false, true);
}
}
}
// Check whether we can submit any more requests.
if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS)
// Check whether we can submit any more requests from the pending collection.
if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS)
{
const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size();
for (int i = 0; i < available_slots && !pending_requests.empty(); i++)
{
const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size();
for (int i = 0; i < available_slots && !pending_requests.empty(); i++)
{
REQUEST_LOOP_INTERRUPT
if (is_shutting_down)
return;
submit_request(pending_requests.front());
pending_requests.pop_front();
}
submit_request(*pending_requests.begin());
pending_requests.erase(pending_requests.begin());
}
}
return SYNC_ABANDONED;
}
/**
* Indicates whether to break out of hpfs request processing loop and the reason.
* @return 0 if interrupted due to shutdown. 2 when target has been re-prioritized.
* 3 when target hash has changed. -1 if not interrupted.
* Safely updates the global sync status flag based on ongoing and incoming targets.
*/
int hpfs_sync::sync_interrupt(const hpfs::sync_target &target)
void hpfs_sync::update_sync_status()
{
if (is_shutting_down)
std::unique_lock lock(incoming_targets_mutex);
is_syncing = (!incoming_targets.empty() || !ongoing_targets.empty());
}
/**
* Processes any sync responses we have received and updates the local file system state.
* @return Whether any responses were processed or not.
*/
bool hpfs_sync::process_candidate_responses()
{
// Reset resubmissions counter whenever we have a resposne.
if (!candidate_hpfs_responses.empty())
resubmissions_count = 0;
const bool responses_processed = !candidate_hpfs_responses.empty();
for (auto &response : candidate_hpfs_responses)
{
LOG_INFO << "Hpfs " << name << " sync: Sync abandoned due to shutdown. " << target.vpath;
return SYNC_ABANDONED;
if (is_shutting_down)
return false;
const std::string from = response.first.substr(2, 10); // Sender pubkey.
const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data());
const p2pmsg::HpfsResponseMsg &resp_msg = *msg.content_as_HpfsResponseMsg();
// Check whether we are actually waiting for this response. If not, ignore it.
std::string_view hash = msg::fbuf::flatbuf_bytes_to_sv(resp_msg.hash());
std::string_view vpath = msg::fbuf::flatbuf_str_to_sv(resp_msg.path());
const std::string key = std::string(vpath).append(hash);
const auto pending_resp_itr = submitted_requests.find(key);
if (pending_resp_itr == submitted_requests.end())
{
LOG_DEBUG << "Hpfs " << name << " sync: Skipping response from [" << from << "] because we are not looking for hash:"
<< util::to_hex(hash).substr(0, 10) << " of " << vpath;
continue;
}
// Process the message based on response type.
const p2pmsg::HpfsResponse msg_type = resp_msg.content_type();
if (msg_type == p2pmsg::HpfsResponse_HpfsFsEntryResponse)
{
const p2pmsg::HpfsFsEntryResponse &fs_resp = *resp_msg.content_as_HpfsFsEntryResponse();
// Get fs entries we have received.
std::vector<p2p::hpfs_fs_hash_entry> peer_fs_entries;
p2pmsg::flatbuf_hpfsfshashentries_to_hpfsfshashentries(peer_fs_entries, fs_resp.entries());
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entries))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched fs entries response from [" << from << "] for " << vpath;
continue;
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response from [" << from << "] for " << vpath;
handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entries);
}
else if (msg_type == p2pmsg::HpfsResponse_HpfsFileHashMapResponse)
{
const p2pmsg::HpfsFileHashMapResponse &file_resp = *resp_msg.content_as_HpfsFileHashMapResponse();
// File block hashes we received from the peer.
const util::h32 *block_hashes = reinterpret_cast<const util::h32 *>(file_resp.hash_map()->data());
const size_t block_hash_count = file_resp.hash_map()->size() / sizeof(util::h32);
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, file_resp.file_mode(), block_hashes, block_hash_count))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched hashmap response from [" << from << "] for " << vpath;
continue;
}
std::set<uint32_t> responded_block_ids;
{
const flatbuffers::Vector<uint32_t> *fbvec = file_resp.responded_block_ids();
const uint32_t *ptr = file_resp.responded_block_ids()->data();
const size_t count = file_resp.responded_block_ids()->size();
for (size_t i = 0; i < count; i++)
responded_block_ids.emplace(ptr[i]);
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response from [" << from << "] for " << vpath;
handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count,
responded_block_ids, file_resp.file_length());
}
else if (msg_type == p2pmsg::HpfsResponse_HpfsBlockResponse)
{
const p2pmsg::HpfsBlockResponse &block_resp = *resp_msg.content_as_HpfsBlockResponse();
// Get the file path of the block data we have received.
const uint32_t block_id = block_resp.block_id();
std::string_view buf = msg::fbuf::flatbuf_bytes_to_sv(block_resp.data());
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping mismatched block response from [" << from << "] for block_id:" << block_id
<< " (len:" << buf.length() << ") of " << vpath;
continue;
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing block response from [" << from << "] for block_id:" << block_id
<< " (len:" << buf.length() << ") of " << vpath;
handle_file_block_response(vpath, block_id, buf);
}
// Now that we have received matching hash and handled it successfully, remove it from the waiting list.
submitted_requests.erase(pending_resp_itr);
// After handling each response, check whether we have achieved the target hash.
{
// Find the ongoing target that this response belongs to.
const auto target_itr = TARGET_OF_REQUEST(vpath);
if (target_itr != ongoing_targets.end())
{
const std::string target_vpath = target_itr->vpath;
const util::h32 target_hash = target_itr->expected_hash;
// get_hash returns 0 incase target parent is not existing in our side.
util::h32 updated_hash = util::h32_empty;
if (fs_mount->get_hash(updated_hash, hpfs::RW_SESSION_NAME, target_vpath) == -1)
{
LOG_ERROR << "Hpfs " << name << " sync: Hash check error. " << target_vpath;
}
// Update the central hpfs state tracker.
fs_mount->set_parent_hash(target_vpath, updated_hash);
// This target's sync is complete.
if (updated_hash == target_hash)
{
clear_target(target_itr); // Clear the completed target.
update_sync_status();
LOG_INFO << "Hpfs " << name << " sync: Achieved target:" << target_hash << " " << target_vpath;
// When target achieved, release and reacquire the hpfs rw session. This helps any upcoming
// ro sessions to get updated file system state.
fs_mount->release_rw_session();
util::sleep(HPFS_REAQUIRE_WAIT);
fs_mount->acquire_rw_session();
on_sync_target_acheived(target_vpath, target_hash);
}
LOG_DEBUG << "Hpfs " << name << " sync: Current:" << updated_hash << " | target:" << target_hash << " " << target_vpath;
}
else
{
// We should never hit this error.
LOG_ERROR << "Hpfs " << name << " sync: Process response: Failed to locate target matching " << vpath;
}
}
}
// Stop request loop if the target has changed.
std::shared_lock lock(target_mutex);
candidate_hpfs_responses.clear();
const hpfs::sync_target &top_target = target_list.front();
if (target.vpath != top_target.vpath)
{
// A new high priority target has been set.
LOG_INFO << "Hpfs " << name << " sync: Higher priority target found. Abandoned: " << target.vpath;
return SYNC_PRIORITY_CHANGED;
}
else if (target.hash != top_target.hash)
{
// Hash changed of current target.
LOG_INFO << "Hpfs " << name << " sync: Target updated. New hash:" << top_target.hash << " " << top_target.vpath;
return SYNC_HASH_CHANGED;
}
return -1; // No interrupt.
return responses_processed;
}
/**
@@ -551,27 +564,27 @@ namespace hpfs
* Used for hint response monitoring.
* @param is_resubmit Whether this is a request resubmission or not.
*/
void hpfs_sync::submit_request(const backlog_item &request, const bool watch_only, const bool is_resubmit)
void hpfs_sync::submit_request(const sync_item &request, const bool watch_only, const bool is_resubmit)
{
const std::string key = std::string(request.path)
const std::string key = std::string(request.vpath)
.append(reinterpret_cast<const char *>(&request.expected_hash), sizeof(util::h32));
submitted_requests.try_emplace(key, request);
if (watch_only)
{
LOG_DEBUG << "Hpfs " << name << " sync: Watching response for request. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " path:" << request.vpath << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
else
{
const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR;
const bool is_file = request.type != SYNC_ITEM_TYPE::DIR;
std::string target_pubkey;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, target_pubkey);
request_state_from_peer(request.vpath, is_file, request.block_id, request.expected_hash, target_pubkey);
LOG_DEBUG << "Hpfs " << name << " sync: " << (is_resubmit ? "Re-submitting" : "Submitting")
<< " request to [" << (target_pubkey.empty() ? "" : target_pubkey.substr(2, 10)) << "]. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " path:" << request.vpath << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
}
@@ -608,16 +621,19 @@ namespace hpfs
if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED)
{
// We must request for this entry. Prioritize file hpfs requests over directories.
if (entry.is_file)
pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, entry.hash});
// We must request for this entry using the same priority level of the root target.
const auto target_itr = TARGET_OF_REQUEST(child_vpath);
if (target_itr != ongoing_targets.end())
pending_requests.emplace(sync_item{
(entry.is_file ? SYNC_ITEM_TYPE::FILE : SYNC_ITEM_TYPE::DIR), child_vpath, -1, entry.hash, target_itr->high_priority});
else
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash});
// We should never hit this error.
LOG_ERROR << "Hpfs " << name << " sync: Handle fs entry response: Failed to locate target matching " << vpath;
}
else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED)
{
// The peer has already responded with a pre-emptive hint response. So we must start watching for it.
submit_request(backlog_item{entry.is_file ? BACKLOG_ITEM_TYPE::FILE : BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}, true);
submit_request(sync_item{entry.is_file ? SYNC_ITEM_TYPE::FILE : SYNC_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}, true);
}
else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::NOT_AVAILABLE)
{
@@ -665,12 +681,11 @@ namespace hpfs
if (responded_block_ids.count(block_id) == 1)
{
// The peer has already responded with a hint response. So we must start watching for it.
submit_request(backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}, true);
submit_request(sync_item{SYNC_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}, true);
}
else if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id])
{
// Insert at front to give priority to block requests while preserving block order.
pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]});
pending_requests.emplace(sync_item{SYNC_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]});
}
}
@@ -773,7 +788,7 @@ namespace hpfs
* This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after
* a sync target is acheived.
*/
void hpfs_sync::on_sync_target_acheived(const sync_target &synced_target)
void hpfs_sync::on_sync_target_acheived(const std::string &vpath, const util::h32 &hash)
{
}
@@ -781,36 +796,8 @@ namespace hpfs
* This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after
* a sync is abondened.
*/
void hpfs_sync::on_sync_target_abandoned()
void hpfs_sync::on_sync_abandoned()
{
}
/**
* This method can be used to invoke mount specific custom logic (after overriding this method) to be executed after
* a full sync is complete.
*/
void hpfs_sync::on_sync_complete(const sync_target &last_sync_target)
{
LOG_INFO << "Hpfs " << name << " sync: All targets synced.";
}
/**
* Releases and reacquires the rw session after a short delay.
* This is used to give hpfs some room to update the last checkpoint during long runinng sync operations.
* @return 0 on success. -1 on failure.
*/
int hpfs_sync::reacquire_rw_session()
{
fs_mount->release_rw_session();
util::sleep(HPFS_REAQUIRE_WAIT);
if (fs_mount->acquire_rw_session() == -1)
{
LOG_ERROR << "Hpfs " << name << " sync: Error reacquring rw session.";
return -1;
}
return 0;
}
} // namespace hpfs

View File

@@ -9,7 +9,7 @@
namespace hpfs
{
enum BACKLOG_ITEM_TYPE
enum SYNC_ITEM_TYPE
{
DIR = 0,
FILE = 1,
@@ -17,27 +17,54 @@ namespace hpfs
};
// Represents a queued up state sync operation which needs to be performed.
struct backlog_item
struct sync_item
{
BACKLOG_ITEM_TYPE type = BACKLOG_ITEM_TYPE::DIR;
std::string path;
SYNC_ITEM_TYPE type = SYNC_ITEM_TYPE::DIR;
std::string vpath;
int32_t block_id = -1; // Only relevant if type=BLOCK
util::h32 expected_hash;
bool high_priority = false;
// No. of millisconds that this item has been waiting in pending state.
// Used by pending_responses list to increase waiting time and resubmit request.
uint32_t waiting_time = 0;
};
struct sync_target
{
util::h32 hash = util::h32_empty;
std::string vpath;
BACKLOG_ITEM_TYPE item_type = BACKLOG_ITEM_TYPE::DIR;
bool operator==(const sync_target &target) const
uint32_t priority() const
{
return this->vpath == target.vpath && this->hash == target.hash;
// Lesser value means higher priority.
/**
* Priority order:
* High prio file block
* High prio file hashmap
* High prio dir children
* Low prio file block
* Low prio file hashmap
* Low prio dir children
*/
return ((high_priority ? 1 : 2) * 10) + (type == SYNC_ITEM_TYPE::BLOCK ? 1 : (type == SYNC_ITEM_TYPE::FILE ? 2 : 3));
}
bool operator==(const sync_item &other) const
{
return type == other.type && vpath == other.vpath && block_id == other.block_id && expected_hash == other.expected_hash;
}
bool operator<(const sync_item &other) const
{
const uint32_t prio = priority();
const uint32_t other_prio = other.priority();
if (prio == other_prio)
{
if (vpath == other.vpath)
return block_id < other.block_id;
else
return vpath < other.vpath;
}
else
{
return prio < other_prio;
}
}
};
@@ -47,22 +74,34 @@ namespace hpfs
bool init_success = false;
std::string name; // Name used for logging.
std::shared_mutex target_mutex;
std::list<sync_target> target_list; // The current target hashes we are syncing towards.
std::list<backlog_item> pending_requests; // List of pending sync requests to be sent out.
std::shared_mutex incoming_targets_mutex;
std::set<sync_item> incoming_targets; // The targets that we need to sync towards but have not looked at yet.
std::vector<sync_item> ongoing_targets; // The targets that we have taken into processing.
std::set<sync_item> pending_requests; // List of pending sync requests to be sent out.
// List of submitted requests we are awaiting responses for, keyed by expected response path+hash.
std::unordered_map<std::string, backlog_item> submitted_requests;
std::unordered_map<std::string, sync_item> submitted_requests;
// No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response)
uint16_t resubmissions_count = 0;
// Whether the hpfs rw session is running or not.
bool rw_session_active = false;
std::thread hpfs_sync_thread;
std::atomic<bool> is_shutting_down = false;
void hpfs_syncer_loop();
int request_loop(const hpfs::sync_target &target);
int check_incoming_targets();
int sync_interrupt(const hpfs::sync_target &target);
void clear_target(const std::vector<hpfs::sync_item>::iterator &ex_target);
void perform_request_submissions();
void update_sync_status();
bool process_candidate_responses();
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode,
const std::vector<p2p::hpfs_fs_hash_entry> &peer_fs_entries);
@@ -75,7 +114,7 @@ namespace hpfs
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string &target_pubkey);
void submit_request(const backlog_item &request, const bool watch_only = false, const bool is_resubmit = false);
void submit_request(const sync_item &request, const bool watch_only = false, const bool is_resubmit = false);
int handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, const std::vector<p2p::hpfs_fs_hash_entry> &peer_fs_entries);
@@ -92,11 +131,9 @@ namespace hpfs
hpfs::hpfs_mount *fs_mount = NULL;
virtual void on_sync_target_acheived(const sync_target &synced_target);
virtual void on_sync_target_acheived(const std::string &vpath, const util::h32 &hash);
virtual void on_sync_target_abandoned();
virtual void on_sync_complete(const sync_target &last_sync_target);
virtual void on_sync_abandoned();
// Move the collected responses from hpfs responses to a local response list.
virtual void swap_collected_responses() = 0; // Must override in child classes.
@@ -110,9 +147,8 @@ namespace hpfs
void deinit();
void set_target_push_front(const sync_target &target);
void set_target_push_back(const sync_target &target);
void set_target(const bool is_dir, const std::string &vpath,
const util::h32 &hash, const bool high_priority = false);
};
} // namespace hpfs

View File

@@ -615,7 +615,7 @@ namespace ledger
const int fd = open(prev_shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
{
LOG_DEBUG << "Cannot read " << prev_shard_hash_file_path;
LOG_ERROR << errno << ": Error reading prev.shard file " << prev_shard_hash_file_path;
return;
}
@@ -630,7 +630,7 @@ namespace ledger
}
const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(seq_no));
ledger_sync_worker.set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
ledger_sync_worker.set_target(true, shard_path, prev_shard_hash_from_file);
}
}

View File

@@ -7,13 +7,13 @@ namespace ledger
{
constexpr const char *HPFS_SESSION_NAME = "ro_shard_sync_status";
void ledger_sync::on_sync_target_acheived(const hpfs::sync_target &synced_target)
void ledger_sync::on_sync_target_acheived(const std::string &vpath, const util::h32 &hash)
{
const std::string shard_hash_file_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, synced_target.vpath) + PREV_SHARD_HASH_FILENAME;
const std::string shard_hash_file_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath) + PREV_SHARD_HASH_FILENAME;
const int fd = open(shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
{
LOG_DEBUG << "Cannot read " << shard_hash_file_path;
LOG_ERROR << errno << ": Error at target achived. Cannot read " << shard_hash_file_path;
return;
}
@@ -26,13 +26,13 @@ namespace ledger
LOG_ERROR << errno << ": Error reading hash file. " << shard_hash_file_path;
return;
}
const size_t pos = synced_target.vpath.find_last_of("/");
const size_t pos = vpath.find_last_of("/");
if (pos == std::string::npos)
{
LOG_ERROR << "Error retreiving shard no from " << synced_target.vpath;
LOG_ERROR << "Error retreiving shard no from " << vpath;
return;
}
const std::string synced_shard_seq_no_str = synced_target.vpath.substr(pos + 1);
const std::string synced_shard_seq_no_str = vpath.substr(pos + 1);
uint64_t synced_shard_seq_no;
if (util::stoull(synced_shard_seq_no_str, synced_shard_seq_no) == -1)
{
@@ -40,7 +40,7 @@ namespace ledger
return;
}
const std::string shard_parent_dir = synced_target.vpath.substr(0, pos);
const std::string shard_parent_dir = vpath.substr(0, pos);
if (shard_parent_dir == PRIMARY_DIR)
{
@@ -52,14 +52,14 @@ namespace ledger
// Persist the lastest synced shard seq number to the max shard meta file.
if (persist_max_shard_seq_no(PRIMARY_DIR, synced_shard_seq_no) == -1)
{
LOG_ERROR << "Error updating max shard meta file in primary shard sync. " << synced_target.vpath;
LOG_ERROR << "Error updating max shard meta file in primary shard sync. " << vpath;
return;
}
const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, synced_target.hash};
const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, hash};
if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id, false) == -1)
{
LOG_ERROR << "Error updating context from the synced shard " << synced_target.vpath;
LOG_ERROR << "Error updating context from the synced shard " << vpath;
return;
}
ctx.set_last_primary_shard_id(updated_primary_shard_id);
@@ -85,7 +85,7 @@ namespace ledger
&& prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs.
{
const std::string shard_path = std::string(PRIMARY_DIR).append("/").append(std::to_string(synced_shard_seq_no));
set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
set_target(true, shard_path, prev_shard_hash_from_file);
}
else
{
@@ -114,7 +114,7 @@ namespace ledger
}
last_raw_shard_seq_no = synced_shard_seq_no;
ctx.set_last_raw_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash});
ctx.set_last_raw_shard_id(p2p::sequence_hash{synced_shard_seq_no, hash});
is_last_raw_shard_syncing = false;
// If existing max shard is older than the max we can keep. Then delete all the existing shards.
@@ -136,7 +136,7 @@ namespace ledger
&& prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs.
{
const std::string shard_path = std::string(RAW_DIR).append("/").append(std::to_string(synced_shard_seq_no));
set_target_push_back(hpfs::sync_target{prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
set_target(true, shard_path, prev_shard_hash_from_file);
}
else
{
@@ -161,7 +161,7 @@ namespace ledger
candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.ledger_hpfs_responses);
}
void ledger_sync::on_sync_target_abandoned()
void ledger_sync::on_sync_abandoned()
{
// Reset these flags since we are abandoning the sync.
is_last_primary_shard_syncing = false;

View File

@@ -12,8 +12,8 @@ namespace ledger
{
private:
void swap_collected_responses();
void on_sync_target_acheived(const hpfs::sync_target &synced_target);
void on_sync_target_abandoned();
void on_sync_target_acheived(const std::string &vpath, const util::h32 &hash);
void on_sync_abandoned();
public:
std::atomic<bool> is_last_primary_shard_syncing = false;

View File

@@ -8,9 +8,9 @@
namespace sc
{
void contract_sync::on_sync_target_acheived(const hpfs::sync_target &synced_target)
void contract_sync::on_sync_target_acheived(const std::string &vpath, const util::h32 &hash)
{
if (synced_target.vpath == PATCH_FILE_PATH)
if (vpath == PATCH_FILE_PATH)
{
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1)
@@ -23,7 +23,7 @@ namespace sc
consensus::refresh_time_config(false);
// Update global hash tracker with the new patch file hash.
fs_mount->set_parent_hash(synced_target.vpath, synced_target.hash);
fs_mount->set_parent_hash(vpath, hash);
}
}
}

View File

@@ -11,7 +11,7 @@ namespace sc
class contract_sync : public hpfs::hpfs_sync
{
private:
void on_sync_target_acheived(const hpfs::sync_target &synced_target);
void on_sync_target_acheived(const std::string &vpath, const util::h32 &hash);
void swap_collected_responses();
};
} // namespace sc

View File

@@ -162,7 +162,10 @@ namespace util
// Create this dir.
if (!error_thrown && mkdir(path.data(), S_IRWXU | S_IRWXG | S_IROTH) == -1)
{
LOG_ERROR << errno << ": Error in recursive dir creation. " << path;
error_thrown = true;
}
if (error_thrown)
return -1;