diff --git a/src/consensus.cpp b/src/consensus.cpp index e9bf4cc5..63748df2 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -142,7 +142,7 @@ namespace consensus const size_t unl_count = unl::count(); vote_counter votes; - const int sync_status = check_sync_status(unl_count, votes); + const int sync_status = check_sync_status(unl_count, votes, lcl_id); if (sync_status == -2) // Unreliable votes. { @@ -230,7 +230,7 @@ namespace consensus * Checks whether we are in sync with the received votes. * @return 0 if we are in sync. -1 on ledger or hpfs desync. -2 if majority last ledger primary shard hash unreliable. */ - int check_sync_status(const size_t unl_count, vote_counter &votes) + int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id) { bool is_last_primary_shard_desync = false; p2p::sequence_hash majority_primary_shard_id; @@ -275,7 +275,11 @@ namespace consensus if (conf::cfg.node.history == conf::HISTORY::FULL) { - sc::hpfs_log_sync::set_sync_target(p2p::sequence_hash{ledger::ctx.get_lcl_id().seq_no + 1, hpfs::get_root_hash(majority_patch_hash, majority_state_hash)}); + // If state or patch is desync set target for the hpfs log sync with the next lcl seq_no. + // When requesting the next seq_no, serving peer will give all the hpfs logs upto it's latest. + // So hash mismatch won't happen in the next round. + if (!ledger::ledger_sync_worker.is_last_primary_shard_syncing) + sc::hpfs_log_sync::set_sync_target(lcl_id.seq_no + 1); } else { diff --git a/src/consensus.hpp b/src/consensus.hpp index 64ba7da6..9c797c94 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -151,7 +151,7 @@ namespace consensus int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash); - int check_sync_status(const size_t unl_count, vote_counter &votes); + int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id); void check_sync_completion(); diff --git a/src/hpfs/hpfs_mount.cpp b/src/hpfs/hpfs_mount.cpp index 9822e3e9..4a0c6126 100644 --- a/src/hpfs/hpfs_mount.cpp +++ b/src/hpfs/hpfs_mount.cpp @@ -19,6 +19,8 @@ namespace hpfs constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children"; constexpr const char *INDEX_CONTROL = "/::hpfs.index"; + constexpr const char *INDEX_READ_QUERY_FULLSTOP = "/::hpfs.index.read."; + constexpr const char *INDEX_WRITE_QUERY_FULLSTOP = "/::hpfs.index.write."; constexpr const char *ROOT_PATH = "/"; constexpr const char *LOG_INDEX_FILENAME = "/log.hpfs.idx"; @@ -27,7 +29,7 @@ namespace hpfs constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000; constexpr uint16_t INIT_CHECK_INTERVAL = 20; - constexpr uint64_t MAX_HPFS_LOG_READ_SIZE = 1 * 1024 * 1024; + constexpr uint64_t MAX_HPFS_LOG_READ_SIZE = 4 * 1024 * 1024; // 4MB /** * This should be called to activate the hpfs mount process. @@ -401,11 +403,14 @@ namespace hpfs /** * This updates the hpfs log index file with latest log offset and the root hash. + * @param seq_no Updating sequence number. * @return Returns 0 in success, otherwise -1. */ - int hpfs_mount::update_hpfs_log_index() + int hpfs_mount::update_hpfs_log_index(const uint64_t seq_no) { - const std::string index_file = mount_dir + INDEX_CONTROL; + // Sequence number is passed to hpfs by appending it to the path. + // File /::hpfs.index. + const std::string index_file = mount_dir + INDEX_CONTROL + "." + std::to_string(seq_no); // /::hpfs.index. const int fd = open(index_file.c_str(), O_RDWR); if (fd == -1) @@ -431,7 +436,7 @@ namespace hpfs int hpfs_mount::truncate_log_file(const uint64_t seq_no) { const std::string file_path = mount_dir + INDEX_CONTROL + "." + std::to_string(seq_no); - // File /hpfs::index. is truncated to invoke log file truncation in hpfs. + // File /::hpfs.index. is truncated to invoke log file truncation in hpfs. // This call waits until any running RW or RO sessions stop. if (truncate(file_path.c_str(), 0) == -1) { @@ -442,7 +447,8 @@ namespace hpfs } /** - * This reads the hpfs logs from given min to max ledger seq_no range. + * This reads the hpfs logs from given min to max ledger seq_no range. Read call will be handled as chuncks in multiple threads from the hpfs. + * So this function should only be called in a single thread. * @param min_ledger_seq_no Mininmum ledger seq number. * @param max_ledger_seq_no Maximum ledger seq number. * @param buf Buffer to read logs. @@ -450,7 +456,12 @@ namespace hpfs */ int hpfs_mount::read_hpfs_logs(const uint64_t min_ledger_seq_no, const uint64_t max_ledger_seq_no, std::vector &buf) { - const std::string index_file = mount_dir + INDEX_CONTROL + "." + std::to_string(min_ledger_seq_no) + "." + std::to_string(max_ledger_seq_no); + /** + * To complete the read operation. All the three open(), read() ad close() operations should be done in this order. + * This should be done within a single thread in atomic manner. + * File /::hpfs.index.read.. + */ + const std::string index_file = mount_dir + INDEX_READ_QUERY_FULLSTOP + std::to_string(min_ledger_seq_no) + "." + std::to_string(max_ledger_seq_no); const int fd = open(index_file.c_str(), O_RDONLY); if (fd == -1) @@ -474,13 +485,19 @@ namespace hpfs } /** - * This appends new log records to the hpfs log file. + * This appends new log records to the hpfs log file. Write call will be handled as chuncks in multiple threads from the hpfs. + * So this function should only be called in a single thread. * @param buf Hpfs log record buffer to write. * @return Returns 0 in success, otherwise -1. */ int hpfs_mount::append_hpfs_log_records(const std::vector &buf) { - const std::string index_file = mount_dir + INDEX_CONTROL; + /** + * To complete the read operation. All the three open(), write() ad close() operations should be done in this order. + * This should be done within a single thread in atomic manner. + * File /::hpfs.index.write. + */ + const std::string index_file = mount_dir + INDEX_WRITE_QUERY_FULLSTOP + std::to_string(buf.size()); const int fd = open(index_file.c_str(), O_RDWR); if (fd == -1) diff --git a/src/hpfs/hpfs_mount.hpp b/src/hpfs/hpfs_mount.hpp index 91df99f9..55a547dd 100644 --- a/src/hpfs/hpfs_mount.hpp +++ b/src/hpfs/hpfs_mount.hpp @@ -71,7 +71,7 @@ namespace hpfs const std::string physical_path(std::string_view session_name, std::string_view vpath); const util::h32 get_parent_hash(const std::string &parent_vpath); void set_parent_hash(const std::string &parent_vpath, const util::h32 new_state); - int update_hpfs_log_index(); + int update_hpfs_log_index(const uint64_t seq_no); int truncate_log_file(const uint64_t seq_no); int get_last_seq_no_from_index(uint64_t &seq_no); int get_hash_from_index_by_seq_no(util::h32 &hash, const uint64_t seq_no); diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 5d5ef10b..946c240b 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -36,8 +36,6 @@ namespace ledger ledger::ledger_sync ledger_sync_worker; // Global ledger file system sync instance. ledger::ledger_serve ledger_server; // Ledger file server instance. - std::shared_mutex primary_index_file_mutex; - constexpr uint32_t LEDGER_FS_ID = 1; constexpr int FILE_PERMS = 0644; @@ -160,7 +158,7 @@ namespace ledger ctx.set_last_primary_shard_id(p2p::sequence_hash{shard_seq_no, last_primary_shard_hash}); // Update the hpfs log index file only in full history mode. - if (conf::cfg.node.history == conf::HISTORY::FULL && sc::contract_fs.update_hpfs_log_index() == -1) + if (conf::cfg.node.history == conf::HISTORY::FULL && sc::contract_fs.update_hpfs_log_index(new_lcl_id.seq_no) == -1) { LOG_ERROR << errno << ": Error updating the hpfs log index file."; return -1; @@ -193,6 +191,11 @@ namespace ledger { sqlite::close_db(&db); + // Update in-memory context raw shard hash after inserting new record. + util::h32 last_raw_shard_hash; + if (ledger_fs.get_hash(last_raw_shard_hash, hpfs::RW_SESSION_NAME, std::string(RAW_DIR).append("/").append(std::to_string(shard_seq_no))) != -1) + ctx.set_last_raw_shard_id(p2p::sequence_hash{shard_seq_no, last_raw_shard_hash}); + // Remove old shards if new one got created. if (shard_res == 1) remove_old_shards(lcl_id.seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index a4d472d9..6b076dec 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -69,6 +69,18 @@ namespace ledger // If existing max shard is older than the max we can keep. Then delete all the existing shards. remove_old_shards(ctx.get_lcl_id().seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); + + // If node is in full history mode. Restarting the fs mount, So primary ledger shard sync changes would be reflected in the ro sessions. + // Which is used for hpfs log sync. + if (conf::cfg.node.history == conf::HISTORY::FULL) + { + fs_mount->release_rw_session(); + if (fs_mount->acquire_rw_session() == -1) + { + LOG_ERROR << "Error acquring rw session after achieving primary shard."; + return; + } + } } if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node. diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index c0faf616..a9106842 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -126,7 +126,7 @@ table HpfsFSHashEntry{ table HpfsLogRequest { - target_record_id:SequenceHash; + target_seq_no:uint64; min_record_id:SequenceHash; } diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index b4591914..34d959db 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -213,7 +213,7 @@ namespace msg::fbuf::p2pmsg { const auto &msg = *mi.p2p_msg->content_as_HpfsLogRequest(); p2p::hpfs_log_request log_record; - log_record.target_record_id = flatbuf_seqhash_to_seqhash(msg.target_record_id()); + log_record.target_seq_no = msg.target_seq_no(); log_record.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id()); return log_record; } @@ -425,7 +425,7 @@ namespace msg::fbuf::p2pmsg { const auto msg = CreateHpfsLogRequest( builder, - seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.target_record_id), + hpfs_log_request.target_seq_no, seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.min_record_id)); create_p2p_msg(builder, P2PMsgContent_HpfsLogRequest, msg.Union()); diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index ced6cc96..c86f7108 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -1801,14 +1801,14 @@ inline flatbuffers::Offset CreateHpfsFSHashEntryDirect( struct HpfsLogRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef HpfsLogRequestBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_TARGET_RECORD_ID = 4, + VT_TARGET_SEQ_NO = 4, VT_MIN_RECORD_ID = 6 }; - const msg::fbuf::p2pmsg::SequenceHash *target_record_id() const { - return GetPointer(VT_TARGET_RECORD_ID); + uint64_t target_seq_no() const { + return GetField(VT_TARGET_SEQ_NO, 0); } - msg::fbuf::p2pmsg::SequenceHash *mutable_target_record_id() { - return GetPointer(VT_TARGET_RECORD_ID); + bool mutate_target_seq_no(uint64_t _target_seq_no) { + return SetField(VT_TARGET_SEQ_NO, _target_seq_no, 0); } const msg::fbuf::p2pmsg::SequenceHash *min_record_id() const { return GetPointer(VT_MIN_RECORD_ID); @@ -1818,8 +1818,7 @@ struct HpfsLogRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_TARGET_RECORD_ID) && - verifier.VerifyTable(target_record_id()) && + VerifyField(verifier, VT_TARGET_SEQ_NO) && VerifyOffset(verifier, VT_MIN_RECORD_ID) && verifier.VerifyTable(min_record_id()) && verifier.EndTable(); @@ -1830,8 +1829,8 @@ struct HpfsLogRequestBuilder { typedef HpfsLogRequest Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_target_record_id(flatbuffers::Offset target_record_id) { - fbb_.AddOffset(HpfsLogRequest::VT_TARGET_RECORD_ID, target_record_id); + void add_target_seq_no(uint64_t target_seq_no) { + fbb_.AddElement(HpfsLogRequest::VT_TARGET_SEQ_NO, target_seq_no, 0); } void add_min_record_id(flatbuffers::Offset min_record_id) { fbb_.AddOffset(HpfsLogRequest::VT_MIN_RECORD_ID, min_record_id); @@ -1850,11 +1849,11 @@ struct HpfsLogRequestBuilder { inline flatbuffers::Offset CreateHpfsLogRequest( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset target_record_id = 0, + uint64_t target_seq_no = 0, flatbuffers::Offset min_record_id = 0) { HpfsLogRequestBuilder builder_(_fbb); + builder_.add_target_seq_no(target_seq_no); builder_.add_min_record_id(min_record_id); - builder_.add_target_record_id(target_record_id); return builder_.Finish(); } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 0665f980..4ab1d2ae 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -135,7 +135,7 @@ namespace p2p // Represents hpfs log sync request. struct hpfs_log_request { - sequence_hash target_record_id; + uint64_t target_seq_no; sequence_hash min_record_id; }; @@ -195,11 +195,11 @@ namespace p2p std::mutex ledger_hpfs_responses_mutex; // Mutex for ledger fs hpfs responses access race conditions. // Lists holding hpfs log requests and responses collected from incoming p2p messages. - std::list> log_record_requests; - std::mutex log_record_request_mutex; // Mutex for hpfs log request access race conditions. + std::list> hpfs_log_requests; + std::mutex hpfs_log_request_mutex; // Mutex for hpfs log request access race conditions. - std::list> log_record_responses; - std::mutex log_record_response_mutex; // Mutex for hpfs log responses access race conditions. + std::list> hpfs_log_responses; + std::mutex hpfs_log_response_mutex; // Mutex for hpfs log responses access race conditions. }; struct connected_context diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index be028027..17aa0d37 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -237,13 +237,13 @@ namespace p2p if (conf::cfg.node.history == conf::HISTORY::FULL) { // Check the cap and insert log record request with lock. - std::scoped_lock lock(ctx.collected_msgs.log_record_request_mutex); + std::scoped_lock lock(ctx.collected_msgs.hpfs_log_request_mutex); // If max number of log record requests reached, skip the rest. - if (ctx.collected_msgs.log_record_requests.size() < p2p::LOG_RECORD_REQ_LIST_CAP) + if (ctx.collected_msgs.hpfs_log_requests.size() < p2p::LOG_RECORD_REQ_LIST_CAP) { const p2p::hpfs_log_request hpfs_log_request = p2pmsg::create_hpfs_log_request_from_msg(mi); - ctx.collected_msgs.log_record_requests.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_request))); + ctx.collected_msgs.hpfs_log_requests.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_request))); } else LOG_DEBUG << "Hpfs log request rejected. Maximum request count reached. " << session.display_name(); @@ -254,13 +254,13 @@ namespace p2p if (conf::cfg.node.history == conf::HISTORY::FULL && sc::hpfs_log_sync::sync_ctx.is_syncing) { // Check the cap and insert log record response with lock. - std::scoped_lock lock(ctx.collected_msgs.log_record_response_mutex); + std::scoped_lock lock(ctx.collected_msgs.hpfs_log_response_mutex); // If max number of log record responses reached, skip the rest. - if (ctx.collected_msgs.log_record_responses.size() < p2p::LOG_RECORD_RES_LIST_CAP) + if (ctx.collected_msgs.hpfs_log_responses.size() < p2p::LOG_RECORD_RES_LIST_CAP) { const p2p::hpfs_log_response hpfs_log_response = p2pmsg::create_hpfs_log_response_from_msg(mi); - ctx.collected_msgs.log_record_responses.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_response))); + ctx.collected_msgs.hpfs_log_responses.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_response))); } else LOG_DEBUG << "Hpfs log response rejected. Maximum response count reached. " << session.display_name(); diff --git a/src/sc/hpfs_log_sync.cpp b/src/sc/hpfs_log_sync.cpp index aa38fc6c..78593696 100644 --- a/src/sc/hpfs_log_sync.cpp +++ b/src/sc/hpfs_log_sync.cpp @@ -25,6 +25,9 @@ namespace sc::hpfs_log_sync sync_context sync_ctx; bool init_success = false; + // Represent sequence number and the root hash of the genesis ledger. + p2p::sequence_hash genesis_seq_hash; + /** * Initialize log record syncer. */ @@ -34,6 +37,8 @@ namespace sc::hpfs_log_sync sync_ctx.log_record_sync_thread = std::thread(hpfs_log_syncer_loop); + genesis_seq_hash = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + init_success = true; return 0; } @@ -47,18 +52,21 @@ namespace sc::hpfs_log_sync } } - void set_sync_target(const p2p::sequence_hash target) + void set_sync_target(const uint64_t target) { { - std::scoped_lock lock(sync_ctx.target_log_record_mutex); - if (sync_ctx.is_shutting_down || (sync_ctx.is_syncing && sync_ctx.target_log_record == target)) + std::scoped_lock lock(sync_ctx.target_log_seq_no_mutex); + if (sync_ctx.is_shutting_down || (sync_ctx.is_syncing && sync_ctx.target_log_seq_no == target)) return; - sync_ctx.target_log_record = target; + sync_ctx.target_log_seq_no = target; + + // Finding the minimum seq_no to request hpfs logs. + if (get_verified_min_record() == -1) + return; } - if (get_verified_min_record() == -1) - return; + LOG_INFO << "Hpfs log sync: Starting sync for target: " << sync_ctx.target_log_seq_no << " min: " << sync_ctx.min_log_record.seq_no; sync_ctx.target_requested_on = 0; sync_ctx.request_submissions = 0; @@ -81,25 +89,44 @@ namespace sc::hpfs_log_sync // Perform log sync activities. { - std::scoped_lock lock(sync_ctx.target_log_record_mutex); - if (!sync_ctx.target_log_record.empty()) + std::scoped_lock lock(sync_ctx.target_log_seq_no_mutex); + if (sync_ctx.target_log_seq_no > 0) send_hpfs_log_sync_request(); // Send log record requests if needed (or abandon if sync timeout). - // Process any history responses from other nodes. - if (!sync_ctx.target_log_record.empty() && check_hpfs_log_sync_responses() == 1) + // Process any hpfs log responses from other nodes. + if (sync_ctx.target_log_seq_no > 0 && check_hpfs_log_sync_responses() == 1) processed = true; - // Here we check for the updated log records to check whether target has archived. - if (sync_ctx.is_syncing && get_verified_min_record() == 1) + // Here we check for the updated log records to check whether target has archived only if any responses have been processed. + if (sync_ctx.is_syncing && processed && get_verified_min_record() == 1) { - LOG_INFO << "Hpfs log sync: sync target archived " << sync_ctx.target_log_record; - sync_ctx.target_log_record = {}; - sync_ctx.min_log_record = {}; - sync_ctx.is_syncing = false; + LOG_INFO << "Hpfs log sync: sync target archived: " << sync_ctx.target_log_seq_no; + sync_ctx.clear_target(); + + // After archiving the target, update latest state and patch hash in the in memory map. + util::h32 state_hash, patch_hash; + const std::string session_name = "ro_hpfs_log_sync"; + + if (sc::contract_fs.start_ro_session(session_name, true) != -1) + { + if (sc::contract_fs.get_hash(state_hash, session_name, sc::STATE_DIR_PATH) != -1) + sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, state_hash); + else + LOG_ERROR << "Hpfs log sync: error getting the updated state hash"; + + if (sc::contract_fs.get_hash(patch_hash, session_name, sc::PATCH_FILE_PATH) != -1) + sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, state_hash); + else + LOG_ERROR << "Hpfs log sync: error getting the updated patch hash"; + + sc::contract_fs.stop_ro_session(session_name); + } + else + LOG_ERROR << "Hpfs log sync: error starting the hpfs ro session"; } } - // Serve any history requests from other nodes. + // Serve any hpfs log requests from other nodes. if (check_hpfs_log_sync_requests() == 1) processed = true; @@ -112,7 +139,7 @@ namespace sc::hpfs_log_sync } /** - * Submits/resubmits log record requests as needed. Abandons sync if threshold reached. + * Submits/resubmits hpfs log requests as needed. Abandons sync if threshold reached. */ void send_hpfs_log_sync_request() { @@ -124,13 +151,13 @@ namespace sc::hpfs_log_sync if (sync_ctx.request_submissions < ABANDON_THRESHOLD) { flatbuffers::FlatBufferBuilder fbuf; - p2pmsg::create_msg_from_hpfs_log_request(fbuf, {sync_ctx.target_log_record, sync_ctx.min_log_record}); + p2pmsg::create_msg_from_hpfs_log_request(fbuf, {sync_ctx.target_log_seq_no, sync_ctx.min_log_record}); std::string target_pubkey; p2p::send_message_to_random_peer(fbuf, target_pubkey, true); if (!target_pubkey.empty()) LOG_DEBUG << "Hpfs log sync: Requesting from [" << target_pubkey.substr(2, 10) << "]." - << " min:" << sync_ctx.min_log_record - << " target:" << sync_ctx.target_log_record; + << " min:" << sync_ctx.min_log_record.seq_no + << " target:" << sync_ctx.target_log_seq_no; sync_ctx.target_requested_on = time_now; sync_ctx.request_submissions++; @@ -144,30 +171,29 @@ namespace sc::hpfs_log_sync } /** - * Processes any log record responses we have received from other peers. + * Processes any hpfs log responses we have received from other peers. * @return 0 if no respones were processed. 1 if at least one response was processed. */ int check_hpfs_log_sync_responses() { // Move over the collected responses to the local list. - std::list> log_record_responses; - + std::list> hpfs_log_responses; { - std::scoped_lock lock(p2p::ctx.collected_msgs.log_record_response_mutex); + std::scoped_lock lock(p2p::ctx.collected_msgs.hpfs_log_response_mutex); // Move collected hpfs responses over to local candidate responses list. - if (!p2p::ctx.collected_msgs.log_record_responses.empty()) - log_record_responses.splice(log_record_responses.end(), p2p::ctx.collected_msgs.log_record_responses); + if (!p2p::ctx.collected_msgs.hpfs_log_responses.empty()) + hpfs_log_responses.splice(hpfs_log_responses.end(), p2p::ctx.collected_msgs.hpfs_log_responses); } - for (const auto &[sess_id, log_response] : log_record_responses) + for (const auto &[sess_id, log_response] : hpfs_log_responses) handle_hpfs_log_sync_response(log_response); - - return log_record_responses.empty() ? 0 : 1; + + return hpfs_log_responses.empty() ? 0 : 1; } /** - * Serves any log record requests we have received from other peers. + * Serves any hpfs log requests we have received from other peers. * @return 0 if no requests were served. 1 if at least one request was served. */ int check_hpfs_log_sync_requests() @@ -176,11 +202,11 @@ namespace sc::hpfs_log_sync std::list> log_record_requests; { - std::scoped_lock lock(p2p::ctx.collected_msgs.log_record_request_mutex); + std::scoped_lock lock(p2p::ctx.collected_msgs.hpfs_log_request_mutex); // Move collected hpfs responses over to local candidate responses list. - if (!p2p::ctx.collected_msgs.log_record_requests.empty()) - log_record_requests.splice(log_record_requests.end(), p2p::ctx.collected_msgs.log_record_requests); + if (!p2p::ctx.collected_msgs.hpfs_log_requests.empty()) + log_record_requests.splice(log_record_requests.end(), p2p::ctx.collected_msgs.hpfs_log_requests); } for (const auto &[session_id, lr] : log_record_requests) @@ -191,7 +217,7 @@ namespace sc::hpfs_log_sync continue; p2p::hpfs_log_response resp; - if (sc::contract_fs.read_hpfs_logs(lr.min_record_id.seq_no, lr.target_record_id.seq_no, resp.log_record_bytes) == -1) + if (sc::contract_fs.read_hpfs_logs(lr.min_record_id.seq_no, lr.target_seq_no, resp.log_record_bytes) == -1) continue; resp.min_record_id = lr.min_record_id; flatbuffers::FlatBufferBuilder fbuf(1024); @@ -220,8 +246,7 @@ namespace sc::hpfs_log_sync bool check_required_log_record_availability(const p2p::hpfs_log_request &log_request) { // If requested min is the genesis we serve without checking. - const p2p::sequence_hash genesis{ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; - if (log_request.min_record_id == genesis) + if (log_request.min_record_id == genesis_seq_hash) return true; util::h32 root_hash; @@ -230,7 +255,7 @@ namespace sc::hpfs_log_sync if (root_hash != log_request.min_record_id.hash) { - LOG_DEBUG << "Requested root hash does not match with ours: seq no " << log_request.min_record_id.seq_no; + LOG_DEBUG << "Requested root hash does not match with ours: " << log_request.min_record_id; return false; } @@ -244,15 +269,8 @@ namespace sc::hpfs_log_sync */ int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &log_response) { - p2p::sequence_hash requested_min_seq_id; - { - std::scoped_lock lock(sync_ctx.min_log_record_mutex); - requested_min_seq_id = sync_ctx.min_log_record; - } - // Append only if the response contains min_seq_no staring from requested min seq_no. - const p2p::sequence_hash genesis{ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; - if (log_response.min_record_id != requested_min_seq_id) + if (log_response.min_record_id != sync_ctx.min_log_record) { LOG_DEBUG << "Invalid joining point in the received hpfs log response"; return -1; @@ -260,7 +278,7 @@ namespace sc::hpfs_log_sync if (sc::contract_fs.append_hpfs_log_records(log_response.log_record_bytes) == -1) { - LOG_ERROR << errno << ": Error persisting hpfs log responses"; + LOG_ERROR << "Error persisting hpfs log responses"; return -1; } return 0; @@ -284,8 +302,7 @@ namespace sc::hpfs_log_sync if (last_from_index.seq_no == ledger::genesis.seq_no || last_from_ledger.seq_no == ledger::genesis.seq_no) { // Request full ledger. - std::scoped_lock lock(sync_ctx.min_log_record_mutex); - sync_ctx.min_log_record = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + sync_ctx.min_log_record = genesis_seq_hash; return 0; } @@ -295,11 +312,10 @@ namespace sc::hpfs_log_sync return -1; } + // In sync. No need to sync. if (last_from_index == last_from_ledger) - { - // In sync. No need to sync. return 1; - } + if (last_from_index.seq_no == last_from_ledger.seq_no) { @@ -320,10 +336,7 @@ namespace sc::hpfs_log_sync } if (root_hash_from_ledger == last_from_index.hash) - { - std::scoped_lock lock(sync_ctx.min_log_record_mutex); sync_ctx.min_log_record = last_from_index; - } else { // Fork. @@ -358,12 +371,11 @@ namespace sc::hpfs_log_sync if (starting_point == 0) { // Request full ledger. - std::scoped_lock lock(sync_ctx.min_log_record_mutex); - sync_ctx.min_log_record = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + sync_ctx.min_log_record = genesis_seq_hash; return 0; } - const char *session_name = "get_min_verified_ledger_record"; + const char *session_name = "ro_get_min_verified_ledger_record"; if (ledger::ledger_fs.start_ro_session(session_name, false) == -1) return -1; @@ -426,28 +438,25 @@ namespace sc::hpfs_log_sync if (ledger_root_hash != index_root_hash) { // Remove the full log and index file data and start from scratch. - if (sc::contract_fs.truncate_log_file(1) == -1) + if (sc::contract_fs.truncate_log_file(genesis_seq_hash.seq_no) == -1) { - LOG_ERROR << "Error truncating hpfs log file and index file from : " << (current_seq_no - 1); + LOG_ERROR << "Error truncating hpfs log file and index file from : 0"; return -1; } // Request full ledger - std::scoped_lock lock(sync_ctx.min_log_record_mutex); - sync_ctx.min_log_record = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + sync_ctx.min_log_record = genesis_seq_hash; } else { // To account current_seq_no-- at the loop end. current_seq_no++; - // We have to truncate keeping the joining record. +1 is added to account that. - if (sc::contract_fs.truncate_log_file(current_seq_no + 1) == -1) + if (sc::contract_fs.truncate_log_file(current_seq_no) == -1) { - LOG_ERROR << "Error truncating hpfs log file and index file from : " << (current_seq_no + 1); + LOG_ERROR << "Error truncating hpfs log file and index file from : " << current_seq_no; return -1; } - // we have found the joining point - std::scoped_lock lock(sync_ctx.min_log_record_mutex); + // We have found the joining point. sync_ctx.min_log_record = {current_seq_no, ledger_root_hash}; } return 0; diff --git a/src/sc/hpfs_log_sync.hpp b/src/sc/hpfs_log_sync.hpp index 4508e9e9..4940c0f3 100644 --- a/src/sc/hpfs_log_sync.hpp +++ b/src/sc/hpfs_log_sync.hpp @@ -12,11 +12,10 @@ namespace sc::hpfs_log_sync { struct sync_context { - // The current target log record that we are syncing towards. - p2p::sequence_hash target_log_record; - std::mutex target_log_record_mutex; + // The current target log record seq no that we are syncing towards. + uint64_t target_log_seq_no; + std::mutex target_log_seq_no_mutex; p2p::sequence_hash min_log_record; - std::mutex min_log_record_mutex; uint64_t target_requested_on = 0; uint16_t request_submissions = 0; @@ -26,7 +25,7 @@ namespace sc::hpfs_log_sync void clear_target() { - target_log_record = {}; + target_log_seq_no = 0; min_log_record = {}; target_requested_on = 0; request_submissions = 0; @@ -39,7 +38,7 @@ namespace sc::hpfs_log_sync void deinit(); - void set_sync_target(const p2p::sequence_hash target); + void set_sync_target(const uint64_t target); void hpfs_log_syncer_loop(); diff --git a/test/bin/hpfs b/test/bin/hpfs index 06ec66cf..b5225742 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ