From ff197a8bb63ff06d7afd6de208e8962740998704 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Mon, 22 Feb 2021 10:47:55 +0530 Subject: [PATCH] Optimizing finding last shard sequence number. (#253) * Optimizing finding last shard in primary and blob. * Resolving PR comments and bug fix. * Resolving PR comments. --- src/ledger/ledger.cpp | 169 ++++++++++++++++++++++-------------- src/ledger/ledger.hpp | 6 +- src/ledger/ledger_mount.cpp | 1 + src/ledger/ledger_mount.hpp | 1 + src/ledger/ledger_sync.cpp | 20 ++++- 5 files changed, 125 insertions(+), 72 deletions(-) diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 9287e872..0a5159b2 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -49,12 +49,6 @@ namespace ledger return -1; } - if (get_last_ledger_and_update_context() == -1) - { - LOG_ERROR << "Getting last ledger faild."; - return -1; - } - return 0; } @@ -237,6 +231,13 @@ namespace ledger return -1; } close(fd); + + // Persist newly created shard seq number as the max primary shard seq number. + if (persist_max_shard_seq_no(PRIMARY_DIR, shard_seq_no) == -1) + { + LOG_ERROR << "Error persisting maximum primary shard sequnce number."; + return -1; + } } else if (sqlite::open_db(shard_path + "/" + DATEBASE, db) == -1) { @@ -343,6 +344,13 @@ namespace ledger return -1; } close(fd); + + // Persist newly created shard seq number as the max blob shard seq number. + if (persist_max_shard_seq_no(BLOB_DIR, last_blob_shard_seq_no) == -1) + { + LOG_ERROR << "Error persisting maximum blob shard sequnce number."; + return -1; + } } ledger_blob blob; @@ -412,91 +420,118 @@ namespace ledger /** * Get last ledger and update the context. + * @param session_name Hpfs session name. + * @param last_primary_shard_id Last primary shard id. * @return Returns 0 on success -1 on error. */ - int get_last_ledger_and_update_context() + int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id) { - // Aquire hpfs rw session before accessing shards and insert ledger records. - if (ledger_fs.acquire_rw_session() == -1) + sqlite3 *db = NULL; + const std::string shard_path = ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(last_primary_shard_id.seq_no); + + if (last_primary_shard_id.seq_no == 0 && last_primary_shard_id.hash == util::h32_empty) + { + // This is the genesis ledger. + ctx.set_lcl_id(p2p::sequence_hash{0, util::h32_empty}); + return 0; + } + + if (sqlite::open_db(shard_path + "/" + DATEBASE, &db) == -1) + { + LOG_ERROR << errno << ": Error openning the shard database, shard: " << last_primary_shard_id.seq_no; return -1; - - const std::string shard_dir_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, ledger::PRIMARY_DIR); - std::list shards = util::fetch_dir_entries(shard_dir_path); - - if (shards.size() == 0) - { - ledger_fs.release_rw_session(); - p2p::sequence_hash lcl_id; - lcl_id.seq_no = 0; - // This is the genesis ledger. - lcl_id.hash = util::h32_empty; - ctx.set_lcl_id(lcl_id); } - else - { - sqlite3 *db = NULL; - shards.sort([](std::string &a, std::string &b) { - uint64_t seq_no_a, seq_no_b; - util::stoull(a, seq_no_a); - util::stoull(b, seq_no_b); - return seq_no_a > seq_no_b; - }); + const sqlite::ledger last_ledger = sqlite::get_last_ledger(db); + sqlite::close_db(&db); - uint64_t last_primary_shard_seq_no; - util::stoull(shards.front(), last_primary_shard_seq_no); - const std::string shard_path = std::string(shard_dir_path).append("/").append(shards.front()); - - // Open a database connection. - if (sqlite::open_db(shard_path + "/" + DATEBASE, &db) == -1) - { - LOG_ERROR << errno << ": Error openning the shard database, shard: " << shards.front(); - ledger_fs.release_rw_session(); - return -1; - } - - sqlite::ledger last_ledger = sqlite::get_last_ledger(db); - sqlite::close_db(&db); - - p2p::sequence_hash lcl_id; - lcl_id.seq_no = last_ledger.seq_no; - lcl_id.hash = util::to_bin(last_ledger.ledger_hash_hex); - ctx.set_lcl_id(lcl_id); - - ledger_fs.release_rw_session(); - } + // Update new lcl information. + p2p::sequence_hash lcl_id; + lcl_id.seq_no = last_ledger.seq_no; + lcl_id.hash = util::to_bin(last_ledger.ledger_hash_hex); + ctx.set_lcl_id(lcl_id); return 0; } /** - * Get the hash and shard sequence number of the last shard in the ledger primary directory. + * Get the hash and shard sequence number of the last shard in the given parent directory. * @param session_name Hpfs session name. * @param last_shard_id Struct which holds last shard data. (sequence number and hash). * @param shard_parent_dir Parent director vpath of the shards. * @return */ - int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, std::string_view shard_parent_dir) + int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir) { - const std::string shard_dir_path = ledger_fs.physical_path(session_name, shard_parent_dir); - std::list shards = util::fetch_dir_entries(shard_dir_path); + const std::string last_shard_seq_no_vpath = shard_parent_dir + SHARD_SEQ_NO_FILENAME; + const std::string last_shard_seq_no_path = ledger_fs.physical_path(session_name, last_shard_seq_no_vpath); - if (shards.size() > 0) + const int fd = open(last_shard_seq_no_path.data(), O_RDONLY, FILE_PERMS); + if (fd == -1) { - shards.sort([](std::string &a, std::string &b) { - uint64_t seq_no_a, seq_no_b; - util::stoull(a, seq_no_a); - util::stoull(b, seq_no_b); - return seq_no_a > seq_no_b; - }); - - const std::string shard_path = std::string(shard_parent_dir).append("/").append(shards.front()); - if (ledger_fs.get_hash(last_shard_id.hash, session_name, shard_path) == -1 || util::stoull(shards.front(), last_shard_id.seq_no) == -1) + if (errno == ENOENT) { - LOG_ERROR << "Error reading last shard hash in " << shard_path; + LOG_DEBUG << "Max shard sequence meta file not found. Starting from zero. " << last_shard_seq_no_path; + // Return defaults of sequence hash(0 for shard_seq_no and empty hash for shard hash). + last_shard_id = {}; + return 0; + } + else + { + LOG_ERROR << errno << ": Error opening meta " << last_shard_seq_no_path; return -1; } } + uint8_t last_shard_seq_no_buf[8]; + if (read(fd, last_shard_seq_no_buf, 8) == -1) + { + + LOG_ERROR << errno << ": Error reading " << last_shard_seq_no_path; + close(fd); + return -1; + } + close(fd); + + last_shard_id.seq_no = util::uint64_from_bytes(last_shard_seq_no_buf); + const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(last_shard_id.seq_no)); + if (ledger_fs.get_hash(last_shard_id.hash, session_name, shard_path) == -1) + { + LOG_ERROR << "Error reading last shard hash in " << shard_path; + return -1; + } + + return 0; + } + + + /** + * Update max_shard.seq_no meta file with the given latest shard sequence number which can be used to identify last shard + * sequence number in startup. + * @param shard_parent_dir Shard's parent directory. (primary or blob). + * @param last_shard_seq_no Last shard sequence number of the given parent. + * @return Return -1 on error and 0 on success. + */ + int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no) + { + const std::string last_shard_seq_no_vpath = shard_parent_dir + SHARD_SEQ_NO_FILENAME; + const std::string last_shard_seq_no_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, last_shard_seq_no_vpath); + + // Write the prev_shard.hash to the new folder. + const int fd = open(last_shard_seq_no_path.data(), O_CREAT | O_RDWR, FILE_PERMS); + if (fd == -1) + { + LOG_ERROR << errno << ": Error opening " << last_shard_seq_no_path; + return -1; + } + uint8_t seq_no_byte_str[8]; + util::uint64_to_bytes(seq_no_byte_str, last_shard_seq_no); + if (write(fd, &seq_no_byte_str, 8) == -1) + { + LOG_ERROR << errno << ": Error updating the max_shard.seq_no file for shard " << std::to_string(last_shard_seq_no); + close(fd); + return -1; + } + close(fd); return 0; } } // namespace ledger \ No newline at end of file diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index b311fb1c..948a36ec 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -87,9 +87,11 @@ namespace ledger void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir); - int get_last_ledger_and_update_context(); + int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id); - int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, std::string_view shard_parent_dir); + int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir); + + int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no); } // namespace ledger diff --git a/src/ledger/ledger_mount.cpp b/src/ledger/ledger_mount.cpp index b2090a9c..fbc4a478 100644 --- a/src/ledger/ledger_mount.cpp +++ b/src/ledger/ledger_mount.cpp @@ -16,6 +16,7 @@ namespace ledger if (start_ro_session(session_name, true) == -1 || get_last_shard_info(session_name, last_primary_shard_id, PRIMARY_DIR) == -1 || + get_last_ledger_and_update_context(session_name, last_primary_shard_id) == -1 || get_last_shard_info(session_name, last_blob_shard_id, BLOB_DIR) == -1 || stop_ro_session(session_name) == -1) { diff --git a/src/ledger/ledger_mount.hpp b/src/ledger/ledger_mount.hpp index b0b947ff..996eee6a 100644 --- a/src/ledger/ledger_mount.hpp +++ b/src/ledger/ledger_mount.hpp @@ -11,6 +11,7 @@ namespace ledger constexpr const char *PRIMARY_DIR = "/primary"; // Ledger primary directory name. constexpr const char *BLOB_DIR = "/blob"; // Ledger blob directory name. constexpr const char *PREV_SHARD_HASH_FILENAME = "/prev_shard.hash"; // Previous shard hash file name. + constexpr const char *SHARD_SEQ_NO_FILENAME = "/max_shard.seq_no"; // Meta file containing the maximum shard seq number information. /** * Represents ledger file system mount. */ diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 82031811..1933b9ea 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -48,13 +48,20 @@ namespace ledger uint64_t last_primary_shard_seq_no = ctx.get_last_primary_shard_id().seq_no; if (last_primary_shard_seq_no <= synced_shard_seq_no) { - if (get_last_ledger_and_update_context() == -1) + // Persist the lastest synced shard seq number to the max shard meta file. + if (persist_max_shard_seq_no(PRIMARY_DIR, synced_shard_seq_no) == -1) + { + LOG_ERROR << "Error updating max shard meta file in primary shard sync."; + return; + } + const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, synced_target.hash}; + if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id) == -1) { LOG_ERROR << "Error updating context from the synced shard " << synced_target.name; return; } + ctx.set_last_primary_shard_id(updated_primary_shard_id); last_primary_shard_seq_no = synced_shard_seq_no; - ctx.set_last_primary_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash}); } if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node. @@ -90,10 +97,17 @@ namespace ledger uint64_t last_blob_shard_seq_no = ctx.get_last_blob_shard_id().seq_no; if (last_blob_shard_seq_no <= synced_shard_seq_no) { + // Persist the lastest synced shard seq number to the max shard meta file. + if (persist_max_shard_seq_no(BLOB_DIR, synced_shard_seq_no) == -1) + { + LOG_ERROR << "Error updating max shard meta file in blob shard sync."; + return; + } + last_blob_shard_seq_no = synced_shard_seq_no; ctx.set_last_blob_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash}); } - + if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all blob shards if this is a full history node. last_blob_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_blob_shards) {