Request historical shards when max shard range is increased (#266)

* Remove and request historical shards at the startup

* Shard history requesting only at the first consensus round

* Removed test log

* Skip max shard seq no file in removing loop

* Updated the code comments

* Persisting condition changed

* Fixed code comment typos

* Fixed code comment typos

* Resolved PR comments

* Halt consensus until completing only the latest shard sync

* Added meaningful comments

* Resolved PR comments and updated hpfs binary

* Logic enhancement and cleanup

* Cleanup the code comment
This commit is contained in:
Chalith Desaman
2021-03-12 12:13:46 +05:30
committed by GitHub
parent b45cdb999d
commit 00a3da9a2b
8 changed files with 102 additions and 14 deletions

View File

@@ -199,6 +199,7 @@ namespace consensus
const std::string majority_shard_seq_no_str = std::to_string(majority_primary_shard_id.seq_no);
const std::string sync_name = "primary shard " + majority_shard_seq_no_str;
const std::string shard_path = std::string(ledger::PRIMARY_DIR).append("/").append(majority_shard_seq_no_str);
ledger::ledger_sync_worker.is_last_primary_shard_syncing = true;
ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{sync_name, majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
}
@@ -238,10 +239,24 @@ namespace consensus
const std::string majority_shard_seq_no_str = std::to_string(majority_blob_shard_id.seq_no);
const std::string sync_name = "blob shard " + majority_shard_seq_no_str;
const std::string shard_path = std::string(ledger::BLOB_DIR).append("/").append(majority_shard_seq_no_str);
ledger::ledger_sync_worker.is_last_blob_shard_syncing = true;
ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_blob_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
}
}
// If shards aren't aligned with max shard count, Do the relevant shard cleanups and requests.
// In the first consensus round sync completion after the startup.
if (!ledger::ledger_sync_worker.is_syncing && (!ledger::ctx.primary_shards_persisted || !ledger::ctx.blob_shards_persisted) && ledger::ledger_fs.acquire_rw_session() != -1)
{
if (!ledger::ctx.primary_shards_persisted)
ledger::persist_shard_history(majority_primary_shard_id.seq_no, ledger::PRIMARY_DIR);
if (!ledger::ctx.blob_shards_persisted)
ledger::persist_shard_history(majority_blob_shard_id.seq_no, ledger::BLOB_DIR);
ledger::ledger_fs.release_rw_session();
}
// Proceed further only if last primary shard, last blob shard, state and patch hashes are in sync with majority.
if (!is_last_primary_shard_desync && !is_last_blob_shard_desync && !is_state_desync && !is_patch_desync)
{
@@ -263,7 +278,8 @@ namespace consensus
*/
void check_sync_completion()
{
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::ledger_sync_worker.is_syncing)
// In ledger sync we only concern about last shard sync status to proceed with consensus.
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::ledger_sync_worker.is_last_primary_shard_syncing && !ledger::ledger_sync_worker.is_last_blob_shard_syncing)
conf::change_role(conf::ROLE::VALIDATOR);
}

View File

@@ -242,7 +242,7 @@ namespace hpfs
const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
{
LOG_ERROR << errno << ": Open failed. " << file_path;
LOG_ERROR << errno << ": Open failed " << file_path;
result = -1;
}
else

View File

@@ -274,7 +274,7 @@ namespace ledger
/**
* Remove old shards that exceeds max shard range from file system.
* @param led_shard_no minimum shard number to be in history.
* @param led_shard_no Minimum shard number to be in history.
*/
void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir)
{
@@ -297,6 +297,76 @@ namespace ledger
}
}
/**
* Cleanup and request historical shards according to the max we can keep.
* @param shard_seq_no Latest shard sequence number.
* @param shard_parent_dir Shard parent directory.
*/
void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir)
{
// Skip if shard cleanup and requesting has been already done.
if ((shard_parent_dir == PRIMARY_DIR && ctx.primary_shards_persisted) || (shard_parent_dir == BLOB_DIR && ctx.blob_shards_persisted))
return;
// Set persisted flag to true. So this cleanup won't get executed again.
shard_parent_dir == PRIMARY_DIR ? ctx.primary_shards_persisted = true : ctx.blob_shards_persisted = true;
const std::string shard_dir_path = std::string(ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_parent_dir));
const uint64_t max_shard_count = shard_dir_path == PRIMARY_DIR ? conf::cfg.node.history_config.max_primary_shards : conf::cfg.node.history_config.max_blob_shards;
const std::list<std::string> shard_list = util::fetch_dir_entries(shard_dir_path);
// Skip the sequence no file from the count.
uint64_t shard_count = shard_list.size() - 1;
// First, In history custom mode remove all the historical shards which are older than the min we can keep.
if (conf::cfg.node.history == conf::HISTORY::CUSTOM && shard_seq_no >= max_shard_count)
{
for (const std::string &shard : shard_list)
{
// Skip the sequence no file.
if (("/" + shard) == SHARD_SEQ_NO_FILENAME)
continue;
uint64_t seq_no;
if (util::stoull(shard, seq_no) != -1 && seq_no <= (shard_seq_no - max_shard_count))
{
const std::string shard_path = std::string(shard_dir_path).append("/").append(shard);
if (util::is_dir_exists(shard_path) && util::remove_directory_recursively(shard_path) == -1)
LOG_ERROR << errno << ": Error deleting shard: " << shard;
else
shard_count--;
}
}
}
// In full history mode request for all the historical nodes if not exists, Otherwise request if max count haven't reached
if (shard_seq_no >= shard_count && (conf::cfg.node.history == conf::HISTORY::FULL || shard_count < max_shard_count))
{
const uint64_t seq_no = shard_seq_no - shard_count;
const std::string prev_shard_hash_file_path = shard_dir_path + "/" + std::to_string(seq_no + 1) + PREV_SHARD_HASH_FILENAME;
const int fd = open(prev_shard_hash_file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
{
LOG_DEBUG << "Cannot read " << prev_shard_hash_file_path;
return;
}
util::h32 prev_shard_hash_from_file;
// Start reading hash excluding hp_version header.
const int res = pread(fd, &prev_shard_hash_from_file, sizeof(util::h32), util::HP_VERSION_HEADER_SIZE);
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading hash file. " << prev_shard_hash_file_path;
return;
}
const std::string sync_name = (shard_parent_dir == PRIMARY_DIR ? "primary" : "blob") + std::string(" shard ") + std::to_string(seq_no);
const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(seq_no));
ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR});
}
}
/**
* Save raw data from the consensused proposal. A blob file is only created if there is any user inputs or contract outputs
* to save disk space.

View File

@@ -25,6 +25,11 @@ namespace ledger
p2p::sequence_hash last_blob_shard_id;
public:
// These flags will be marked as true after doing the shards cleanup and requesting
// at the first consensus round to align with the max shard counts.
std::atomic<bool> primary_shards_persisted = false;
std::atomic<bool> blob_shards_persisted = false;
const p2p::sequence_hash get_lcl_id()
{
std::shared_lock lock(lcl_mutex);
@@ -87,6 +92,8 @@ namespace ledger
void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir);
void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir);
int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id);
int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir);

View File

@@ -27,17 +27,6 @@ namespace ledger
return -1;
}
if (conf::cfg.node.history == conf::HISTORY::CUSTOM)
{
//Remove old primary shards that exceeds max shard range.
if (last_primary_shard_id.seq_no >= conf::cfg.node.history_config.max_primary_shards)
remove_old_shards(last_primary_shard_id.seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR);
//Remove old blob shards that exceeds max shard range.
if (last_blob_shard_id.seq_no >= conf::cfg.node.history_config.max_blob_shards)
remove_old_shards(last_blob_shard_id.seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR);
}
if (release_rw_session() == -1)
{
LOG_ERROR << "Failed to release rw session at mount " << mount_dir << ".";

View File

@@ -68,6 +68,7 @@ namespace ledger
}
ctx.set_last_primary_shard_id(updated_primary_shard_id);
last_primary_shard_seq_no = synced_shard_seq_no;
is_last_primary_shard_syncing = false;
}
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node.
@@ -116,6 +117,7 @@ namespace ledger
last_blob_shard_seq_no = synced_shard_seq_no;
ctx.set_last_blob_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash});
is_last_blob_shard_syncing = false;
}
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all blob shards if this is a full history node.

View File

@@ -13,6 +13,10 @@ namespace ledger
private:
void swap_collected_responses();
void on_current_sync_state_acheived(const hpfs::sync_target &synced_target);
public:
std::atomic<bool> is_last_primary_shard_syncing = false;
std::atomic<bool> is_last_blob_shard_syncing = false;
};
} // namespace ledger
#endif

Binary file not shown.