Optimizing finding last shard sequence number. (#253)

* Optimizing finding last shard in primary and blob.

* Resolving PR comments and bug fix.

* Resolving PR comments.
This commit is contained in:
Savinda Senevirathne
2021-02-22 10:47:55 +05:30
committed by GitHub
parent a775b0e419
commit ff197a8bb6
5 changed files with 125 additions and 72 deletions

View File

@@ -49,12 +49,6 @@ namespace ledger
return -1;
}
if (get_last_ledger_and_update_context() == -1)
{
LOG_ERROR << "Getting last ledger faild.";
return -1;
}
return 0;
}
@@ -237,6 +231,13 @@ namespace ledger
return -1;
}
close(fd);
// Persist newly created shard seq number as the max primary shard seq number.
if (persist_max_shard_seq_no(PRIMARY_DIR, shard_seq_no) == -1)
{
LOG_ERROR << "Error persisting maximum primary shard sequnce number.";
return -1;
}
}
else if (sqlite::open_db(shard_path + "/" + DATEBASE, db) == -1)
{
@@ -343,6 +344,13 @@ namespace ledger
return -1;
}
close(fd);
// Persist newly created shard seq number as the max blob shard seq number.
if (persist_max_shard_seq_no(BLOB_DIR, last_blob_shard_seq_no) == -1)
{
LOG_ERROR << "Error persisting maximum blob shard sequnce number.";
return -1;
}
}
ledger_blob blob;
@@ -412,91 +420,118 @@ namespace ledger
/**
* Get last ledger and update the context.
* Reads the last ledger record from the database of the given primary shard and sets its
* sequence number and hash as the lcl id in the context.
* When the given shard id has seq_no 0 and an empty hash, the lcl id is set to {0, empty hash}
* and the function returns before any database is opened.
* @param session_name Hpfs session name.
* @param last_primary_shard_id Last primary shard id.
* @return Returns 0 on success -1 on error.
*/
int get_last_ledger_and_update_context()
int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id)
{
// Acquire hpfs rw session before accessing shards and insert ledger records.
if (ledger_fs.acquire_rw_session() == -1)
sqlite3 *db = NULL;
// Physical path of the shard directory: <primary dir of the session>/<shard seq no>.
const std::string shard_path = ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(last_primary_shard_id.seq_no);
if (last_primary_shard_id.seq_no == 0 && last_primary_shard_id.hash == util::h32_empty)
{
// This is the genesis ledger.
ctx.set_lcl_id(p2p::sequence_hash{0, util::h32_empty});
return 0;
}
// Open a database connection to the shard named by the given shard id.
if (sqlite::open_db(shard_path + "/" + DATEBASE, &db) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database, shard: " << last_primary_shard_id.seq_no;
return -1;
const std::string shard_dir_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, ledger::PRIMARY_DIR);
std::list<std::string> shards = util::fetch_dir_entries(shard_dir_path);
if (shards.size() == 0)
{
ledger_fs.release_rw_session();
p2p::sequence_hash lcl_id;
lcl_id.seq_no = 0;
// This is the genesis ledger.
lcl_id.hash = util::h32_empty;
ctx.set_lcl_id(lcl_id);
}
else
{
sqlite3 *db = NULL;
// Order the shard directory names by their numeric value, largest first.
shards.sort([](std::string &a, std::string &b) {
uint64_t seq_no_a, seq_no_b;
util::stoull(a, seq_no_a);
util::stoull(b, seq_no_b);
return seq_no_a > seq_no_b;
});
// NOTE(review): no failure check on the result of get_last_ledger is visible here — confirm how it reports errors.
const sqlite::ledger last_ledger = sqlite::get_last_ledger(db);
sqlite::close_db(&db);
uint64_t last_primary_shard_seq_no;
util::stoull(shards.front(), last_primary_shard_seq_no);
const std::string shard_path = std::string(shard_dir_path).append("/").append(shards.front());
// Open a database connection.
if (sqlite::open_db(shard_path + "/" + DATEBASE, &db) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database, shard: " << shards.front();
ledger_fs.release_rw_session();
return -1;
}
sqlite::ledger last_ledger = sqlite::get_last_ledger(db);
sqlite::close_db(&db);
p2p::sequence_hash lcl_id;
lcl_id.seq_no = last_ledger.seq_no;
lcl_id.hash = util::to_bin(last_ledger.ledger_hash_hex);
ctx.set_lcl_id(lcl_id);
ledger_fs.release_rw_session();
}
// Update new lcl information.
// The ledger hash is kept as a hex string in the record and converted with util::to_bin before being stored.
p2p::sequence_hash lcl_id;
lcl_id.seq_no = last_ledger.seq_no;
lcl_id.hash = util::to_bin(last_ledger.ledger_hash_hex);
ctx.set_lcl_id(lcl_id);
return 0;
}
/**
* Get the hash and shard sequence number of the last shard in the ledger primary directory.
* Get the hash and shard sequence number of the last shard in the given parent directory.
* The sequence number is read from the max shard seq number meta file under the parent directory
* and the hash is then resolved for the shard directory named after that sequence number.
* If the meta file does not exist, last_shard_id is reset to its defaults and 0 is returned.
* @param session_name Hpfs session name.
* @param last_shard_id Struct which holds last shard data. (sequence number and hash).
* @param shard_parent_dir Parent directory vpath of the shards.
* @return Returns 0 on success -1 on error.
*/
int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, std::string_view shard_parent_dir)
int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir)
{
const std::string shard_dir_path = ledger_fs.physical_path(session_name, shard_parent_dir);
std::list<std::string> shards = util::fetch_dir_entries(shard_dir_path);
// Vpath and physical path of the meta file holding the max shard seq number for this parent directory.
const std::string last_shard_seq_no_vpath = shard_parent_dir + SHARD_SEQ_NO_FILENAME;
const std::string last_shard_seq_no_path = ledger_fs.physical_path(session_name, last_shard_seq_no_vpath);
if (shards.size() > 0)
const int fd = open(last_shard_seq_no_path.data(), O_RDONLY, FILE_PERMS);
if (fd == -1)
{
shards.sort([](std::string &a, std::string &b) {
uint64_t seq_no_a, seq_no_b;
util::stoull(a, seq_no_a);
util::stoull(b, seq_no_b);
return seq_no_a > seq_no_b;
});
const std::string shard_path = std::string(shard_parent_dir).append("/").append(shards.front());
if (ledger_fs.get_hash(last_shard_id.hash, session_name, shard_path) == -1 || util::stoull(shards.front(), last_shard_id.seq_no) == -1)
// A missing meta file is not treated as an error; any other open failure is.
if (errno == ENOENT)
{
LOG_ERROR << "Error reading last shard hash in " << shard_path;
LOG_DEBUG << "Max shard sequence meta file not found. Starting from zero. " << last_shard_seq_no_path;
// Return defaults of sequence hash(0 for shard_seq_no and empty hash for shard hash).
last_shard_id = {};
return 0;
}
else
{
LOG_ERROR << errno << ": Error opening meta " << last_shard_seq_no_path;
return -1;
}
}
// The sequence number is read as 8 raw bytes and decoded with util::uint64_from_bytes.
// NOTE(review): only a -1 return from read() is treated as an error; a short read (fewer than 8 bytes,
// e.g. an empty file) would leave part of this uninitialized buffer unset before it is decoded.
uint8_t last_shard_seq_no_buf[8];
if (read(fd, last_shard_seq_no_buf, 8) == -1)
{
LOG_ERROR << errno << ": Error reading " << last_shard_seq_no_path;
close(fd);
return -1;
}
close(fd);
last_shard_id.seq_no = util::uint64_from_bytes(last_shard_seq_no_buf);
// Resolve the hash of the shard directory named after the sequence number just read.
const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(last_shard_id.seq_no));
if (ledger_fs.get_hash(last_shard_id.hash, session_name, shard_path) == -1)
{
LOG_ERROR << "Error reading last shard hash in " << shard_path;
return -1;
}
return 0;
}
/**
* Update max_shard.seq_no meta file with the given latest shard sequence number which can be used to identify last shard
* sequence number in startup.
* The sequence number is encoded with util::uint64_to_bytes and written as 8 bytes at the start of the
* meta file, which is created under the given parent directory of the hpfs rw session if it does not exist.
* @param shard_parent_dir Shard's parent directory. (primary or blob).
* @param last_shard_seq_no Last shard sequence number of the given parent.
* @return Return -1 on error and 0 on success.
*/
int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no)
{
    const std::string last_shard_seq_no_vpath = shard_parent_dir + SHARD_SEQ_NO_FILENAME;
    const std::string last_shard_seq_no_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, last_shard_seq_no_vpath);

    // Open (creating if necessary) the meta file which holds the max shard sequence number.
    const int fd = open(last_shard_seq_no_path.c_str(), O_CREAT | O_RDWR, FILE_PERMS);
    if (fd == -1)
    {
        LOG_ERROR << errno << ": Error opening " << last_shard_seq_no_path;
        return -1;
    }

    uint8_t seq_no_byte_str[8];
    util::uint64_to_bytes(seq_no_byte_str, last_shard_seq_no);

    // write() may report success after writing fewer bytes than requested. A partially written
    // sequence number would be read back as a wrong value, so anything other than a full write is an error.
    const ssize_t written = write(fd, seq_no_byte_str, sizeof(seq_no_byte_str));
    if (written != static_cast<ssize_t>(sizeof(seq_no_byte_str)))
    {
        LOG_ERROR << errno << ": Error updating the max_shard.seq_no file for shard " << std::to_string(last_shard_seq_no);
        close(fd);
        return -1;
    }

    close(fd);
    return 0;
}
} // namespace ledger

View File

@@ -87,9 +87,11 @@ namespace ledger
void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir);
int get_last_ledger_and_update_context();
int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id);
int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, std::string_view shard_parent_dir);
int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir);
int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no);
} // namespace ledger

View File

@@ -16,6 +16,7 @@ namespace ledger
if (start_ro_session(session_name, true) == -1 ||
get_last_shard_info(session_name, last_primary_shard_id, PRIMARY_DIR) == -1 ||
get_last_ledger_and_update_context(session_name, last_primary_shard_id) == -1 ||
get_last_shard_info(session_name, last_blob_shard_id, BLOB_DIR) == -1 ||
stop_ro_session(session_name) == -1)
{

View File

@@ -11,6 +11,7 @@ namespace ledger
constexpr const char *PRIMARY_DIR = "/primary"; // Ledger primary directory name.
constexpr const char *BLOB_DIR = "/blob"; // Ledger blob directory name.
constexpr const char *PREV_SHARD_HASH_FILENAME = "/prev_shard.hash"; // Previous shard hash file name.
constexpr const char *SHARD_SEQ_NO_FILENAME = "/max_shard.seq_no"; // Meta file containing the maximum shard seq number information.
/**
* Represents ledger file system mount.
*/

View File

@@ -48,13 +48,20 @@ namespace ledger
uint64_t last_primary_shard_seq_no = ctx.get_last_primary_shard_id().seq_no;
if (last_primary_shard_seq_no <= synced_shard_seq_no)
{
if (get_last_ledger_and_update_context() == -1)
// Persist the latest synced shard seq number to the max shard meta file.
if (persist_max_shard_seq_no(PRIMARY_DIR, synced_shard_seq_no) == -1)
{
LOG_ERROR << "Error updating max shard meta file in primary shard sync.";
return;
}
const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, synced_target.hash};
if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id) == -1)
{
LOG_ERROR << "Error updating context from the synced shard " << synced_target.name;
return;
}
ctx.set_last_primary_shard_id(updated_primary_shard_id);
last_primary_shard_seq_no = synced_shard_seq_no;
ctx.set_last_primary_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash});
}
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node.
@@ -90,10 +97,17 @@ namespace ledger
uint64_t last_blob_shard_seq_no = ctx.get_last_blob_shard_id().seq_no;
if (last_blob_shard_seq_no <= synced_shard_seq_no)
{
// Persist the latest synced shard seq number to the max shard meta file.
if (persist_max_shard_seq_no(BLOB_DIR, synced_shard_seq_no) == -1)
{
LOG_ERROR << "Error updating max shard meta file in blob shard sync.";
return;
}
last_blob_shard_seq_no = synced_shard_seq_no;
ctx.set_last_blob_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash});
}
if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all blob shards if this is a full history node.
last_blob_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_blob_shards)
{