From af48f3b01f6911da46c265e13884a9021cff99b2 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Thu, 1 Apr 2021 13:08:29 +0530 Subject: [PATCH] Hpfs log file sync and fork detection. (#279) Basic infrastructure for log file sync. A fork is detected by comparing the hpfs log index file against the ledger. Min and max ledger sequence hashes are sent to a full history node to request log history. --- CMakeLists.txt | 1 + src/consensus.cpp | 43 ++-- src/hpfs/hpfs_mount.cpp | 105 +++++++- src/hpfs/hpfs_mount.hpp | 10 +- src/ledger/ledger.cpp | 43 +++- src/ledger/ledger.hpp | 2 + src/ledger/sqlite.cpp | 2 +- src/msg/fbuf/p2pmsg.fbs | 18 +- src/msg/fbuf/p2pmsg_conversion.cpp | 48 +++- src/msg/fbuf/p2pmsg_conversion.hpp | 8 + src/msg/fbuf/p2pmsg_generated.h | 239 ++++++++++++++--- src/p2p/p2p.cpp | 45 +++- src/p2p/p2p.hpp | 32 ++- src/p2p/peer_comm_session.hpp | 1 + src/p2p/peer_session_handler.cpp | 38 +++ src/sc/hpfs_log_sync.cpp | 400 +++++++++++++++++++++++++++++ src/sc/hpfs_log_sync.hpp | 60 +++++ src/sc/sc.cpp | 20 +- 18 files changed, 1041 insertions(+), 74 deletions(-) create mode 100644 src/sc/hpfs_log_sync.cpp create mode 100644 src/sc/hpfs_log_sync.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d440341..38e7be4d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,7 @@ add_executable(hpcore src/sc/contract_serve.cpp src/sc/contract_sync.cpp src/sc/sc.cpp + src/sc/hpfs_log_sync.cpp src/comm/comm_session.cpp src/msg/fbuf/common_helpers.cpp src/msg/fbuf/ledger_helpers.cpp diff --git a/src/consensus.cpp b/src/consensus.cpp index 50ab7b9d..1e9111fd 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -14,6 +14,7 @@ #include "unl.hpp" #include "ledger/ledger.hpp" #include "consensus.hpp" +#include "sc/hpfs_log_sync.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -222,26 +223,34 @@ namespace consensus is_patch_update_pending = false; // Start hpfs sync if we are out-of-sync with majority hpfs patch hash or state hash. - if (is_state_desync || is_patch_desync || is_last_blob_shard_desync) + if (is_state_desync || is_patch_desync) { conf::change_role(conf::ROLE::OBSERVER); - if (is_state_desync) - sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{"state", majority_state_hash, sc::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); - - // Patch file sync is prioritized, Therefore it is set in the front of the sync target list. - if (is_patch_desync) - sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{"patch", majority_patch_hash, sc::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE}); - - // If ledger blob shard is desync, We first request the latest blob shard. - if (is_last_blob_shard_desync) + if (conf::cfg.node.history == conf::HISTORY::FULL) { - const std::string majority_shard_seq_no_str = std::to_string(majority_blob_shard_id.seq_no); - const std::string sync_name = "blob shard " + majority_shard_seq_no_str; - const std::string shard_path = std::string(ledger::BLOB_DIR).append("/").append(majority_shard_seq_no_str); - ledger::ledger_sync_worker.is_last_blob_shard_syncing = true; - ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_blob_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + sc::hpfs_log_sync::set_sync_target(p2p::sequence_hash{ledger::ctx.get_lcl_id().seq_no + 1, hpfs::get_root_hash(majority_patch_hash, majority_state_hash)}); } + else + { + if (is_state_desync) + sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{"state", majority_state_hash, sc::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); + + // Patch file sync is prioritized, Therefore it is set in the front of the sync target list. + if (is_patch_desync) + sc::contract_sync_worker.set_target_push_front(hpfs::sync_target{"patch", majority_patch_hash, sc::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE}); + } + } + + // If ledger blob shard is desync, We first request the latest blob shard. + if (is_last_blob_shard_desync) + { + conf::change_role(conf::ROLE::OBSERVER); + const std::string majority_shard_seq_no_str = std::to_string(majority_blob_shard_id.seq_no); + const std::string sync_name = "blob shard " + majority_shard_seq_no_str; + const std::string shard_path = std::string(ledger::BLOB_DIR).append("/").append(majority_shard_seq_no_str); + ledger::ledger_sync_worker.is_last_blob_shard_syncing = true; + ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_blob_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } // If shards aren't aligned with max shard count, Do the relevant shard cleanups and requests. @@ -278,8 +287,10 @@ namespace consensus */ void check_sync_completion() { + const bool is_contract_syncing = (conf::cfg.node.history == conf::HISTORY::FULL) ? sc::hpfs_log_sync::sync_ctx.is_syncing : sc::contract_sync_worker.is_syncing; // In ledger sync we only concern about last shard sync status to proceed with consensus. - if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::ledger_sync_worker.is_last_primary_shard_syncing && !ledger::ledger_sync_worker.is_last_blob_shard_syncing) + const bool is_ledger_syncing = ledger::ledger_sync_worker.is_last_primary_shard_syncing || ledger::ledger_sync_worker.is_last_blob_shard_syncing; + if (conf::cfg.node.role == conf::ROLE::OBSERVER && !is_contract_syncing && !is_ledger_syncing) conf::change_role(conf::ROLE::VALIDATOR); } diff --git a/src/hpfs/hpfs_mount.cpp b/src/hpfs/hpfs_mount.cpp index 3617e654..1bd41a9a 100644 --- a/src/hpfs/hpfs_mount.cpp +++ b/src/hpfs/hpfs_mount.cpp @@ -4,6 +4,7 @@ #include "../util/util.hpp" #include "../util/h32.hpp" #include "../sc/sc.hpp" +#include "../crypto.hpp" namespace hpfs { @@ -15,7 +16,11 @@ namespace hpfs constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap."; constexpr const char *HMAP_HASH = "::hpfs.hmap.hash"; constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children"; + + constexpr const char *ROOT_PATH = "/"; constexpr const char *INDEX_UPDATE = "/::hpfs.index"; + constexpr const char *LOG_INDEX_FILENAME = "/log.hpfs.idx"; + constexpr ino_t ROOT_INO = 1; constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000; @@ -359,8 +364,7 @@ namespace hpfs /** * This returns the hash of a given parent. * @param parent_vpath vpath of the parent file or directory. - * @return Returns the hash of the given vpath if available or - * an empth h32 hash if parent vpath not available. + * @return The hash of the given vpath if available or an empth h32 hash if parent vpath not available. */ const util::h32 hpfs_mount::get_parent_hash(const std::string &parent_vpath) { @@ -399,9 +403,9 @@ namespace hpfs const int fd = open(index_file.c_str(), O_RDWR); if (fd == -1) return -1; - + // We just send empty buffer with write size 1 to invoke the hpfs index update. - // Write syscall isn't invoking with write size 0. + // Write syscall isn't invoking with write size 0. if (write(fd, "", 1) == -1) { close(fd); @@ -415,7 +419,7 @@ namespace hpfs /** * Invoke log file and hpfs index file starting from the given sequence number. This function is a blocking call. * @param seq_no Sequence number to start truncation from. - * @return Returns -1 on error and 0 on success. + * @return -1 on error and 0 on success. */ int hpfs_mount::truncate_log_file(const uint64_t seq_no) { @@ -430,4 +434,95 @@ namespace hpfs return 0; } + /** + * Get the last sequence number updated in the index file. + * @param seq_no The last sequence number. + * @return -1 on error and 0 on success. + */ + int hpfs_mount::get_last_seq_no_from_index(uint64_t &seq_no) + { + const std::string path = fs_dir + "/" + LOG_INDEX_FILENAME; + const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + + if (fd == -1) + { + LOG_ERROR << errno << ": Error opening hpfs index file " << path; + return -1; + } + + struct stat st; + if (fstat(fd, &st) == -1) + { + close(fd); + LOG_ERROR << errno << ": Error reading hpfs index file " << path; + return -1; + } + close(fd); + seq_no = st.st_size / (sizeof(uint64_t) + sizeof(util::h32)); + return 0; + } + + /** + * Get the root hash for the given sequence number from hpfs index file. + * @param hash Root hash in the state of given sequence number. + * @param seq_no Sequence number to get the root hash of. + * @return -1 on error and 0 on success. + */ + int hpfs_mount::get_hash_from_index_by_seq_no(util::h32 &hash, const uint64_t seq_no) + { + const std::string path = fs_dir + "/" + LOG_INDEX_FILENAME; + const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + + if (fd == -1) + { + LOG_ERROR << errno << ": Error opening hpfs index file " << path; + return -1; + } + const off_t offset = ((seq_no - 1) * (sizeof(uint64_t) + sizeof(util::h32))) + sizeof(uint64_t); + if (pread(fd, &hash, sizeof(util::h32), offset) < sizeof(util::h32)) + { + LOG_ERROR << errno << ": Error reading hash from the given offset " << std::to_string(offset); + close(fd); + return -1; + } + close(fd); + return 0; + } + + /** + * Returns root hash when the two childrens are given. + * @param child_one First child of the root. + * @param child_two Second child of the root. + * @return The calculated root hash. + */ + const util::h32 get_root_hash(const util::h32 &child_one, const util::h32 &child_two) + { + util::h32 name_hash; + name_hash = crypto::get_hash(ROOT_PATH); + + util::h32 root_hash = name_hash; + root_hash ^= child_one; + root_hash ^= child_two; + + return root_hash; + } + + /** + * Returns root hash when the two childrens are given. + * @param child_one First child of the root. + * @param child_two Second child of the root. + * @return The calculated root hash. + */ + const util::h32 get_root_hash(std::string_view child_one, std::string_view child_two) + { + + util::h32 h32_child_one; + util::h32 h32_child_two; + + h32_child_one = child_one; + h32_child_two = child_two; + + return get_root_hash(h32_child_one, h32_child_two); + } + } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_mount.hpp b/src/hpfs/hpfs_mount.hpp index 61113f88..e9fa9660 100644 --- a/src/hpfs/hpfs_mount.hpp +++ b/src/hpfs/hpfs_mount.hpp @@ -7,8 +7,8 @@ namespace hpfs { - constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; - constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. struct child_hash_node { @@ -27,6 +27,10 @@ namespace hpfs return conf::cfg.contract.roundtime; } + const util::h32 get_root_hash(const util::h32 &child_one, const util::h32 &child_two); + + const util::h32 get_root_hash(std::string_view child_one, std::string_view child_two); + /** * This class represents a hpfs file system mount. */ @@ -69,6 +73,8 @@ namespace hpfs void set_parent_hash(const std::string &parent_vpath, const util::h32 new_state); int update_hpfs_log_index(); int truncate_log_file(const uint64_t seq_no); + int get_last_seq_no_from_index(uint64_t &seq_no); + int get_hash_from_index_by_seq_no(util::h32 &hash, const uint64_t seq_no); }; } // namespace hpfs diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index f592f9a4..5eb945aa 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -554,7 +554,7 @@ namespace ledger sqlite3 *db = NULL; const std::string shard_path = ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(last_primary_shard_id.seq_no); - if (last_primary_shard_id.seq_no == 0 && last_primary_shard_id.hash == util::h32_empty) + if (last_primary_shard_id.empty()) { // This is the genesis ledger. ctx.set_lcl_id(p2p::sequence_hash{0, util::h32_empty}); @@ -563,7 +563,7 @@ namespace ledger if (sqlite::open_db(shard_path + "/" + DATABASE, &db) == -1) { - LOG_ERROR << errno << ": Error openning the shard database, shard: " << last_primary_shard_id.seq_no; + LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(last_primary_shard_id.seq_no); return -1; } @@ -671,4 +671,43 @@ namespace ledger close(fd); return 0; } + + /** + * Calculate root hash of contract_fs from the ledger record of given seq_no. + * @param root_hash The calculated root hash as of the given seq_no. + * @param seq_no Ledger's sequence number. + * @return Returns -1 on error and 0 on success. + */ + int get_root_hash_from_ledger(util::h32 &root_hash, const uint64_t seq_no) + { + sqlite3 *db = NULL; + const char *session_name = "root_hash_from_ledger"; + if (ledger_fs.start_ro_session(session_name, false) == -1) + return -1; + + const uint64_t shard_seq_no = (seq_no - 1) / PRIMARY_SHARD_SIZE; + + const std::string shard_path = ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no); + + if (sqlite::open_db(shard_path + "/" + DATABASE, &db) == -1) + { + LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(shard_seq_no); + ledger_fs.stop_ro_session(session_name); + return -1; + } + + ledger::ledger_record ledger; + if (sqlite::get_ledger_by_seq_no(db, seq_no, ledger) == -1) + { + LOG_ERROR << "Error getting ledger by sequence number: " << std::to_string(seq_no); + sqlite::close_db(&db); + ledger_fs.stop_ro_session(session_name); + return -1; + } + sqlite::close_db(&db); + ledger_fs.stop_ro_session(session_name); + + root_hash = hpfs::get_root_hash(ledger.config_hash, ledger.state_hash); + return 0; + } } // namespace ledger \ No newline at end of file diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index 74dc4e64..ad4128a5 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -88,6 +88,8 @@ namespace ledger int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no); + int get_root_hash_from_ledger(util::h32 &root_hash, const uint64_t seq_no); + } // namespace ledger #endif diff --git a/src/ledger/sqlite.cpp b/src/ledger/sqlite.cpp index bbe38c35..2acc216a 100644 --- a/src/ledger/sqlite.cpp +++ b/src/ledger/sqlite.cpp @@ -377,4 +377,4 @@ namespace ledger::sqlite ledger.output_hash = GET_H32_BLOB(9); } -} // namespace ledger::sqlite \ No newline at end of file +} // namespace ledger::sqlite diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index 81fd4d8c..8e4fa514 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -14,7 +14,9 @@ union P2PMsgContent { PeerRequirementAnnouncementMsg, PeerCapacityAnnouncementMsg, PeerListRequestMsg, - PeerListResponseMsg + PeerListResponseMsg, + LogRecordRequest, + LogRecordResponse } table P2PMsg { @@ -26,6 +28,7 @@ table P2PMsg { table PeerChallengeMsg { contract_id:string; roundtime:uint32; + is_full_history:bool; challenge:[ubyte]; } @@ -121,6 +124,19 @@ table HpfsFSHashEntry{ hash: [ubyte]; } +table LogRecordRequest +{ + target_record_id:SequenceHash; + min_record_id:SequenceHash; +} + +table LogRecordResponse +{ + min_record_id:SequenceHash; + max_record_id:SequenceHash; + log_record_bytes:[ubyte]; +} + table PeerRequirementAnnouncementMsg{ need_consensus_msg_forwarding: bool; } diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index d75b43c5..cd073bb0 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -58,7 +58,7 @@ namespace msg::fbuf::p2pmsg const auto &msg = *mi.p2p_msg->content_as_ProposalMsg(); std::string_view pubkey = flatbuf_bytes_to_sv(msg.pubkey()); - + // Before verifying the hash, Validate if the message is from a trusted node. if (!unl::exists(std::string(pubkey))) { @@ -89,7 +89,7 @@ namespace msg::fbuf::p2pmsg const auto &msg = *mi.p2p_msg->content_as_NplMsg(); std::string_view pubkey = flatbuf_bytes_to_sv(msg.pubkey()); - + // Before verifying the hash, Validate if the message is from a trusted node. if (!unl::exists(std::string(pubkey))) { @@ -111,6 +111,7 @@ namespace msg::fbuf::p2pmsg return { std::string(flatbuf_str_to_sv(msg.contract_id())), msg.roundtime(), + msg.is_full_history(), std::string(flatbuf_bytes_to_sv(msg.challenge()))}; } @@ -208,6 +209,27 @@ namespace msg::fbuf::p2pmsg return hr; } + const p2p::hpfs_log_request create_hpfs_log_request_from_msg(const p2p::peer_message_info &mi) + { + const auto &msg = *mi.p2p_msg->content_as_LogRecordRequest(); + p2p::hpfs_log_request log_record; + log_record.target_record_id = flatbuf_seqhash_to_seqhash(msg.target_record_id()); + log_record.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id()); + return log_record; + } + + const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi) + { + const auto &msg = *mi.p2p_msg->content_as_LogRecordResponse(); + p2p::hpfs_log_response hpfs_log_response; + hpfs_log_response.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id()); + hpfs_log_response.max_record_id = flatbuf_seqhash_to_seqhash(msg.max_record_id()); + hpfs_log_response.log_record_bytes.reserve(msg.log_record_bytes()->size()); + for (const auto byte: *msg.log_record_bytes()) + hpfs_log_response.log_record_bytes.push_back(byte); + return hpfs_log_response; + } + p2p::sequence_hash flatbuf_seqhash_to_seqhash(const SequenceHash *fbseqhash) { return { @@ -327,6 +349,7 @@ namespace msg::fbuf::p2pmsg builder, sv_to_flatbuf_str(builder, conf::cfg.contract.id), conf::cfg.contract.roundtime, + conf::cfg.node.history == conf::HISTORY::FULL, sv_to_flatbuf_bytes(builder, challenge)); create_p2p_msg(builder, P2PMsgContent_PeerChallengeMsg, msg.Union()); } @@ -399,6 +422,27 @@ namespace msg::fbuf::p2pmsg create_p2p_msg(builder, P2PMsgContent_HpfsRequestMsg, msg.Union()); } + void create_msg_from_hpfs_log_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_request &hpfs_log_request) + { + const auto msg = CreateLogRecordRequest( + builder, + seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.target_record_id), + seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.min_record_id)); + + create_p2p_msg(builder, P2PMsgContent_LogRecordRequest, msg.Union()); + } + + void create_msg_from_hpfs_log_response(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_response &hpfs_log_response) + { + const auto msg = CreateLogRecordResponse( + builder, + seqhash_to_flatbuf_seqhash(builder, hpfs_log_response.min_record_id), + seqhash_to_flatbuf_seqhash(builder, hpfs_log_response.max_record_id), + builder.CreateVector(hpfs_log_response.log_record_bytes)); + + create_p2p_msg(builder, P2PMsgContent_LogRecordResponse, msg.Union()); + } + void create_msg_from_fsentry_response( flatbuffers::FlatBufferBuilder &builder, const std::string_view path, const uint32_t mount_id, const mode_t dir_mode, std::vector &hash_nodes, const util::h32 &expected_hash) diff --git a/src/msg/fbuf/p2pmsg_conversion.hpp b/src/msg/fbuf/p2pmsg_conversion.hpp index b62e20b2..b49db197 100644 --- a/src/msg/fbuf/p2pmsg_conversion.hpp +++ b/src/msg/fbuf/p2pmsg_conversion.hpp @@ -37,6 +37,10 @@ namespace msg::fbuf::p2pmsg const p2p::hpfs_request create_hpfs_request_from_msg(const p2p::peer_message_info &mi); + const p2p::hpfs_log_request create_hpfs_log_request_from_msg(const p2p::peer_message_info &mi); + + const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi); + p2p::sequence_hash flatbuf_seqhash_to_seqhash(const msg::fbuf::p2pmsg::SequenceHash *fbseqhash); const std::set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); @@ -69,6 +73,10 @@ namespace msg::fbuf::p2pmsg void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr); + void create_msg_from_hpfs_log_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_request &hpfs_log_request); + + void create_msg_from_hpfs_log_response(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_response &hpfs_log_response); + void create_msg_from_fsentry_response( flatbuffers::FlatBufferBuilder &builder, const std::string_view path, const uint32_t mount_id, const mode_t dir_mode, std::vector &hash_nodes, const util::h32 &expected_hash); diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index 571db1d6..9844b5ed 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -52,6 +52,12 @@ struct HpfsBlockResponseBuilder; struct HpfsFSHashEntry; struct HpfsFSHashEntryBuilder; +struct LogRecordRequest; +struct LogRecordRequestBuilder; + +struct LogRecordResponse; +struct LogRecordResponseBuilder; + struct PeerRequirementAnnouncementMsg; struct PeerRequirementAnnouncementMsgBuilder; @@ -73,7 +79,7 @@ struct SequenceHashBuilder; struct ByteArray; struct ByteArrayBuilder; -enum P2PMsgContent { +enum P2PMsgContent : uint8_t { P2PMsgContent_NONE = 0, P2PMsgContent_PeerChallengeMsg = 1, P2PMsgContent_PeerChallengeResponseMsg = 2, @@ -86,11 +92,13 @@ enum P2PMsgContent { P2PMsgContent_PeerCapacityAnnouncementMsg = 9, P2PMsgContent_PeerListRequestMsg = 10, P2PMsgContent_PeerListResponseMsg = 11, + P2PMsgContent_LogRecordRequest = 12, + P2PMsgContent_LogRecordResponse = 13, P2PMsgContent_MIN = P2PMsgContent_NONE, - P2PMsgContent_MAX = P2PMsgContent_PeerListResponseMsg + P2PMsgContent_MAX = P2PMsgContent_LogRecordResponse }; -inline const P2PMsgContent (&EnumValuesP2PMsgContent())[12] { +inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] { static const P2PMsgContent values[] = { P2PMsgContent_NONE, P2PMsgContent_PeerChallengeMsg, @@ -103,13 +111,15 @@ inline const P2PMsgContent (&EnumValuesP2PMsgContent())[12] { P2PMsgContent_PeerRequirementAnnouncementMsg, P2PMsgContent_PeerCapacityAnnouncementMsg, P2PMsgContent_PeerListRequestMsg, - P2PMsgContent_PeerListResponseMsg + P2PMsgContent_PeerListResponseMsg, + P2PMsgContent_LogRecordRequest, + P2PMsgContent_LogRecordResponse }; return values; } inline const char * const *EnumNamesP2PMsgContent() { - static const char * const names[13] = { + static const char * const names[15] = { "NONE", "PeerChallengeMsg", "PeerChallengeResponseMsg", @@ -122,13 +132,15 @@ inline const char * const *EnumNamesP2PMsgContent() { "PeerCapacityAnnouncementMsg", "PeerListRequestMsg", "PeerListResponseMsg", + "LogRecordRequest", + "LogRecordResponse", nullptr }; return names; } inline const char *EnumNameP2PMsgContent(P2PMsgContent e) { - if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_PeerListResponseMsg)) return ""; + if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_LogRecordResponse)) return ""; const size_t index = static_cast(e); return EnumNamesP2PMsgContent()[index]; } @@ -181,10 +193,18 @@ template<> struct P2PMsgContentTraits { static const P2PMsgContent enum_value = P2PMsgContent_PeerListResponseMsg; }; +template<> struct P2PMsgContentTraits { + static const P2PMsgContent enum_value = P2PMsgContent_LogRecordRequest; +}; + +template<> struct P2PMsgContentTraits { + static const P2PMsgContent enum_value = P2PMsgContent_LogRecordResponse; +}; + bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type); bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); -enum HpfsResponse { +enum HpfsResponse : uint8_t { HpfsResponse_NONE = 0, HpfsResponse_HpfsFileHashMapResponse = 1, HpfsResponse_HpfsBlockResponse = 2, @@ -299,6 +319,12 @@ struct P2PMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const msg::fbuf::p2pmsg::PeerListResponseMsg *content_as_PeerListResponseMsg() const { return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_PeerListResponseMsg ? static_cast(content()) : nullptr; } + const msg::fbuf::p2pmsg::LogRecordRequest *content_as_LogRecordRequest() const { + return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_LogRecordRequest ? static_cast(content()) : nullptr; + } + const msg::fbuf::p2pmsg::LogRecordResponse *content_as_LogRecordResponse() const { + return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_LogRecordResponse ? static_cast(content()) : nullptr; + } void *mutable_content() { return GetPointer(VT_CONTENT); } @@ -358,6 +384,14 @@ template<> inline const msg::fbuf::p2pmsg::PeerListResponseMsg *P2PMsg::content_ return content_as_PeerListResponseMsg(); } +template<> inline const msg::fbuf::p2pmsg::LogRecordRequest *P2PMsg::content_as() const { + return content_as_LogRecordRequest(); +} + +template<> inline const msg::fbuf::p2pmsg::LogRecordResponse *P2PMsg::content_as() const { + return content_as_LogRecordResponse(); +} + struct P2PMsgBuilder { typedef P2PMsg Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -378,7 +412,6 @@ struct P2PMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - P2PMsgBuilder &operator=(const P2PMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -420,7 +453,8 @@ struct PeerChallengeMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_CONTRACT_ID = 4, VT_ROUNDTIME = 6, - VT_CHALLENGE = 8 + VT_IS_FULL_HISTORY = 8, + VT_CHALLENGE = 10 }; const flatbuffers::String *contract_id() const { return GetPointer(VT_CONTRACT_ID); @@ -434,6 +468,12 @@ struct PeerChallengeMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { bool mutate_roundtime(uint32_t _roundtime) { return SetField(VT_ROUNDTIME, _roundtime, 0); } + bool is_full_history() const { + return GetField(VT_IS_FULL_HISTORY, 0) != 0; + } + bool mutate_is_full_history(bool _is_full_history) { + return SetField(VT_IS_FULL_HISTORY, static_cast(_is_full_history), 0); + } const flatbuffers::Vector *challenge() const { return GetPointer *>(VT_CHALLENGE); } @@ -445,6 +485,7 @@ struct PeerChallengeMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyOffset(verifier, VT_CONTRACT_ID) && verifier.VerifyString(contract_id()) && VerifyField(verifier, VT_ROUNDTIME) && + VerifyField(verifier, VT_IS_FULL_HISTORY) && VerifyOffset(verifier, VT_CHALLENGE) && verifier.VerifyVector(challenge()) && verifier.EndTable(); @@ -461,6 +502,9 @@ struct PeerChallengeMsgBuilder { void add_roundtime(uint32_t roundtime) { fbb_.AddElement(PeerChallengeMsg::VT_ROUNDTIME, roundtime, 0); } + void add_is_full_history(bool is_full_history) { + fbb_.AddElement(PeerChallengeMsg::VT_IS_FULL_HISTORY, static_cast(is_full_history), 0); + } void add_challenge(flatbuffers::Offset> challenge) { fbb_.AddOffset(PeerChallengeMsg::VT_CHALLENGE, challenge); } @@ -468,7 +512,6 @@ struct PeerChallengeMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerChallengeMsgBuilder &operator=(const PeerChallengeMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -480,11 +523,13 @@ inline flatbuffers::Offset CreatePeerChallengeMsg( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset contract_id = 0, uint32_t roundtime = 0, + bool is_full_history = false, flatbuffers::Offset> challenge = 0) { PeerChallengeMsgBuilder builder_(_fbb); builder_.add_challenge(challenge); builder_.add_roundtime(roundtime); builder_.add_contract_id(contract_id); + builder_.add_is_full_history(is_full_history); return builder_.Finish(); } @@ -492,6 +537,7 @@ inline flatbuffers::Offset CreatePeerChallengeMsgDirect( flatbuffers::FlatBufferBuilder &_fbb, const char *contract_id = nullptr, uint32_t roundtime = 0, + bool is_full_history = false, const std::vector *challenge = nullptr) { auto contract_id__ = contract_id ? _fbb.CreateString(contract_id) : 0; auto challenge__ = challenge ? _fbb.CreateVector(*challenge) : 0; @@ -499,6 +545,7 @@ inline flatbuffers::Offset CreatePeerChallengeMsgDirect( _fbb, contract_id__, roundtime, + is_full_history, challenge__); } @@ -556,7 +603,6 @@ struct PeerChallengeResponseMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerChallengeResponseMsgBuilder &operator=(const PeerChallengeResponseMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -644,7 +690,6 @@ struct UserInputBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - UserInputBuilder &operator=(const UserInputBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -721,7 +766,6 @@ struct UserInputGroupBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - UserInputGroupBuilder &operator=(const UserInputGroupBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -782,7 +826,6 @@ struct NonUnlProposalMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - NonUnlProposalMsgBuilder &operator=(const NonUnlProposalMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -992,7 +1035,6 @@ struct ProposalMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - ProposalMsgBuilder &operator=(const ProposalMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1143,7 +1185,6 @@ struct NplMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - NplMsgBuilder &operator=(const NplMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1257,7 +1298,6 @@ struct HpfsRequestMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - HpfsRequestMsgBuilder &operator=(const HpfsRequestMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1394,7 +1434,6 @@ struct HpfsResponseMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - HpfsResponseMsgBuilder &operator=(const HpfsResponseMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1478,7 +1517,6 @@ struct HpfsFsEntryResponseBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - HpfsFsEntryResponseBuilder &operator=(const HpfsFsEntryResponseBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1559,7 +1597,6 @@ struct HpfsFileHashMapResponseBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - HpfsFileHashMapResponseBuilder &operator=(const HpfsFileHashMapResponseBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1633,7 +1670,6 @@ struct HpfsBlockResponseBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - HpfsBlockResponseBuilder &operator=(const HpfsBlockResponseBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1715,7 +1751,6 @@ struct HpfsFSHashEntryBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1749,6 +1784,151 @@ inline flatbuffers::Offset CreateHpfsFSHashEntryDirect( hash__); } +struct LogRecordRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef LogRecordRequestBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_TARGET_RECORD_ID = 4, + VT_MIN_RECORD_ID = 6 + }; + const msg::fbuf::p2pmsg::SequenceHash *target_record_id() const { + return GetPointer(VT_TARGET_RECORD_ID); + } + msg::fbuf::p2pmsg::SequenceHash *mutable_target_record_id() { + return GetPointer(VT_TARGET_RECORD_ID); + } + const msg::fbuf::p2pmsg::SequenceHash *min_record_id() const { + return GetPointer(VT_MIN_RECORD_ID); + } + msg::fbuf::p2pmsg::SequenceHash *mutable_min_record_id() { + return GetPointer(VT_MIN_RECORD_ID); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_TARGET_RECORD_ID) && + verifier.VerifyTable(target_record_id()) && + VerifyOffset(verifier, VT_MIN_RECORD_ID) && + verifier.VerifyTable(min_record_id()) && + verifier.EndTable(); + } +}; + +struct LogRecordRequestBuilder { + typedef LogRecordRequest Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_target_record_id(flatbuffers::Offset target_record_id) { + fbb_.AddOffset(LogRecordRequest::VT_TARGET_RECORD_ID, target_record_id); + } + void add_min_record_id(flatbuffers::Offset min_record_id) { + fbb_.AddOffset(LogRecordRequest::VT_MIN_RECORD_ID, min_record_id); + } + explicit LogRecordRequestBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateLogRecordRequest( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset target_record_id = 0, + flatbuffers::Offset min_record_id = 0) { + LogRecordRequestBuilder builder_(_fbb); + builder_.add_min_record_id(min_record_id); + builder_.add_target_record_id(target_record_id); + return builder_.Finish(); +} + +struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef LogRecordResponseBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_MIN_RECORD_ID = 4, + VT_MAX_RECORD_ID = 6, + VT_LOG_RECORD_BYTES = 8 + }; + const msg::fbuf::p2pmsg::SequenceHash *min_record_id() const { + return GetPointer(VT_MIN_RECORD_ID); + } + msg::fbuf::p2pmsg::SequenceHash *mutable_min_record_id() { + return GetPointer(VT_MIN_RECORD_ID); + } + const msg::fbuf::p2pmsg::SequenceHash *max_record_id() const { + return GetPointer(VT_MAX_RECORD_ID); + } + msg::fbuf::p2pmsg::SequenceHash *mutable_max_record_id() { + return GetPointer(VT_MAX_RECORD_ID); + } + const flatbuffers::Vector *log_record_bytes() const { + return GetPointer *>(VT_LOG_RECORD_BYTES); + } + flatbuffers::Vector *mutable_log_record_bytes() { + return GetPointer *>(VT_LOG_RECORD_BYTES); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_MIN_RECORD_ID) && + verifier.VerifyTable(min_record_id()) && + VerifyOffset(verifier, VT_MAX_RECORD_ID) && + verifier.VerifyTable(max_record_id()) && + VerifyOffset(verifier, VT_LOG_RECORD_BYTES) && + verifier.VerifyVector(log_record_bytes()) && + verifier.EndTable(); + } +}; + +struct LogRecordResponseBuilder { + typedef LogRecordResponse Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_min_record_id(flatbuffers::Offset min_record_id) { + fbb_.AddOffset(LogRecordResponse::VT_MIN_RECORD_ID, min_record_id); + } + void add_max_record_id(flatbuffers::Offset max_record_id) { + fbb_.AddOffset(LogRecordResponse::VT_MAX_RECORD_ID, max_record_id); + } + void add_log_record_bytes(flatbuffers::Offset> log_record_bytes) { + fbb_.AddOffset(LogRecordResponse::VT_LOG_RECORD_BYTES, log_record_bytes); + } + explicit LogRecordResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateLogRecordResponse( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset min_record_id = 0, + flatbuffers::Offset max_record_id = 0, + flatbuffers::Offset> log_record_bytes = 0) { + LogRecordResponseBuilder builder_(_fbb); + builder_.add_log_record_bytes(log_record_bytes); + builder_.add_max_record_id(max_record_id); + builder_.add_min_record_id(min_record_id); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateLogRecordResponseDirect( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset min_record_id = 0, + flatbuffers::Offset max_record_id = 0, + const std::vector *log_record_bytes = nullptr) { + auto log_record_bytes__ = log_record_bytes ? _fbb.CreateVector(*log_record_bytes) : 0; + return msg::fbuf::p2pmsg::CreateLogRecordResponse( + _fbb, + min_record_id, + max_record_id, + log_record_bytes__); +} + struct PeerRequirementAnnouncementMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef PeerRequirementAnnouncementMsgBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { @@ -1778,7 +1958,6 @@ struct PeerRequirementAnnouncementMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerRequirementAnnouncementMsgBuilder &operator=(const PeerRequirementAnnouncementMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1834,7 +2013,6 @@ struct PeerCapacityAnnouncementMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerCapacityAnnouncementMsgBuilder &operator=(const PeerCapacityAnnouncementMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1868,7 +2046,6 @@ struct PeerListRequestMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerListRequestMsgBuilder &operator=(const PeerListRequestMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1913,7 +2090,6 @@ struct PeerListResponseMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerListResponseMsgBuilder &operator=(const PeerListResponseMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2001,7 +2177,6 @@ struct PeerPropertiesBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - PeerPropertiesBuilder &operator=(const PeerPropertiesBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2079,7 +2254,6 @@ struct SequenceHashBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - SequenceHashBuilder &operator=(const SequenceHashBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2138,7 +2312,6 @@ struct ByteArrayBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } - ByteArrayBuilder &operator=(const ByteArrayBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2212,6 +2385,14 @@ inline bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } + case P2PMsgContent_LogRecordRequest: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + case P2PMsgContent_LogRecordResponse: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } default: return true; } } diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 18fa7ff7..161fdcfa 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -112,7 +112,7 @@ namespace p2p if (res == 0) { LOG_DEBUG << "Pubkey violation. Rejecting new peer connection [" << session.display_name() << "]"; - + // It's possible, Self node might've been added to the known peers by peer discovery. // If so remove the self from known peers. if (session.known_ipport.has_value()) @@ -282,8 +282,9 @@ namespace p2p * Sends the given message to a random peer (except self). * @param fbuf Peer outbound message to be sent to peer. * @param target_pubkey Randomly selected target peer pubkey. + * @param full_history_only Should send only to a random full history node. */ - void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey) + void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey, const bool full_history_only) { //Send while locking the peer_connections. std::scoped_lock lock(ctx.peer_connections_mutex); @@ -295,19 +296,41 @@ namespace p2p return; } - while (true) + peer_comm_session *session = NULL; + + if (full_history_only) + { + // Stores full history session list. + std::vector full_history_sessions; + for (auto [key, session] : ctx.peer_connections) + { + if (session->is_full_history) + full_history_sessions.push_back(session); + } + + if (full_history_sessions.size() == 0) + { + LOG_DEBUG << "No full history peers to random send."; + return; + } + auto it = full_history_sessions.begin(); + // Initialize random number generator with current timestamp. + const int random_peer_index = (rand() % full_history_sessions.size()); // Select a random peer index. + std::advance(it, random_peer_index); // Move iterator to point to random selected peer. + session = *it; + } + else { // Initialize random number generator with current timestamp. - const int random_peer_index = (rand() % connected_peers); // select a random peer index. auto it = ctx.peer_connections.begin(); - std::advance(it, random_peer_index); //move iterator to point to random selected peer. - - //send message to selected peer. - peer_comm_session *session = it->second; - session->send(msg::fbuf::builder_to_string_view(fbuf)); - target_pubkey = session->uniqueid; - break; + const int random_peer_index = (rand() % connected_peers); // Select a random peer index. + std::advance(it, random_peer_index); // Move iterator to point to random selected peer. + session = it->second; } + + //send message to selected peer. + session->send(msg::fbuf::builder_to_string_view(fbuf)); + target_pubkey = session->uniqueid; } /** diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index f13b1cb3..23e43153 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -17,6 +17,8 @@ namespace p2p constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count. constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count. constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count. + constexpr uint16_t LOG_RECORD_REQ_LIST_CAP = 64; // Maximum log record request count. + constexpr uint16_t LOG_RECORD_RES_LIST_CAP = 64; // Maximum log record response count. // Struct to represent information about a peer. // Initially available capacity is set to -1 and timestamp is set to 0. @@ -52,6 +54,11 @@ namespace p2p { return std::to_string(seq_no) + "-" + util::to_hex(hash.to_string_view()); } + + const bool empty() const + { + return seq_no == 0 && hash == util::h32_empty; + } }; // This is a helper method for sequence_hash structure which enables printing it straight away. std::ostream &operator<<(std::ostream &output, const sequence_hash &seq_hash); @@ -85,6 +92,7 @@ namespace p2p { std::string contract_id; uint32_t roundtime = 0; + bool is_full_history = false; std::string challenge; }; @@ -124,6 +132,21 @@ namespace p2p util::h32 expected_hash; // The expected hash of the requested result. }; + // Represents hpfs log sync request. + struct hpfs_log_request + { + sequence_hash target_record_id; + sequence_hash min_record_id; + }; + + // Represents hpfs log sync response. + struct hpfs_log_response + { + sequence_hash min_record_id; + sequence_hash max_record_id; + std::vector log_record_bytes; + }; + // Represents hpfs file system entry. struct hpfs_fs_hash_entry { @@ -171,6 +194,13 @@ namespace p2p // List of pairs indicating the session pubkey hex and the ledger fs hpfs responses. std::list> ledger_hpfs_responses; std::mutex ledger_hpfs_responses_mutex; // Mutex for ledger fs hpfs responses access race conditions. + + // Lists holding hpfs log requests and responses collected from incoming p2p messages. + std::list> log_record_requests; + std::mutex log_record_request_mutex; // Mutex for hpfs log request access race conditions. + + std::list> log_record_responses; + std::mutex log_record_response_mutex; // Mutex for hpfs log responses access race conditions. }; struct connected_context @@ -202,7 +232,7 @@ namespace p2p void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); - void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey); + void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey, const bool full_history_only = false); void handle_proposal_message(const p2p::proposal &p); diff --git a/src/p2p/peer_comm_session.hpp b/src/p2p/peer_comm_session.hpp index 37cd7b3e..0793efe9 100644 --- a/src/p2p/peer_comm_session.hpp +++ b/src/p2p/peer_comm_session.hpp @@ -25,6 +25,7 @@ namespace p2p bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding. bool is_unl = false; // Whether this session's pubkey is in unl list. uint32_t reported_roundtime = 0; // Initial roundtime reported by this peer on peer challenge. + bool is_full_history; // Stores whether the connection is to a full history node or not. }; } // namespace p2p diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index ca5e58e6..fbd056f8 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -12,6 +12,7 @@ #include "peer_comm_session.hpp" #include "p2p.hpp" #include "../unl.hpp" +#include "../sc/hpfs_log_sync.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -108,6 +109,9 @@ namespace p2p // Remember the roundtime reported by this peer. session.reported_roundtime = chall.roundtime; + // Whether this node is a full history node or not. + session.is_full_history = chall.is_full_history; + // Sending the challenge response to the sender. flatbuffers::FlatBufferBuilder fbuf; p2pmsg::create_peer_challenge_response_from_challenge(fbuf, chall.challenge); @@ -228,6 +232,40 @@ namespace p2p LOG_DEBUG << "Ledger hpfs response rejected. Maximum response count reached. " << session.display_name(); } } + else if (mi.type == p2pmsg::P2PMsgContent_LogRecordRequest) + { + if (conf::cfg.node.history == conf::HISTORY::FULL) + { + // Check the cap and insert log record request with lock. + std::scoped_lock lock(ctx.collected_msgs.log_record_request_mutex); + + // If max number of log record requests reached, skip the rest. + if (ctx.collected_msgs.log_record_requests.size() < p2p::LOG_RECORD_REQ_LIST_CAP) + { + const p2p::hpfs_log_request hpfs_log_request = p2pmsg::create_hpfs_log_request_from_msg(mi); + ctx.collected_msgs.log_record_requests.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_request))); + } + else + LOG_DEBUG << "Hpfs log request rejected. Maximum request count reached. " << session.display_name(); + } + } + else if (mi.type == p2pmsg::P2PMsgContent_LogRecordResponse) + { + if (conf::cfg.node.history == conf::HISTORY::FULL && sc::hpfs_log_sync::sync_ctx.is_syncing) + { + // Check the cap and insert log record response with lock. + std::scoped_lock lock(ctx.collected_msgs.log_record_response_mutex); + + // If max number of log record responses reached, skip the rest. + if (ctx.collected_msgs.log_record_responses.size() < p2p::LOG_RECORD_RES_LIST_CAP) + { + const p2p::hpfs_log_response hpfs_log_response = p2pmsg::create_hpfs_log_response_from_msg(mi); + ctx.collected_msgs.log_record_responses.push_back(std::make_pair(session.uniqueid, std::move(hpfs_log_response))); + } + else + LOG_DEBUG << "Hpfs log response rejected. Maximum response count reached. " << session.display_name(); + } + } else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); diff --git a/src/sc/hpfs_log_sync.cpp b/src/sc/hpfs_log_sync.cpp new file mode 100644 index 00000000..a534b327 --- /dev/null +++ b/src/sc/hpfs_log_sync.cpp @@ -0,0 +1,400 @@ +#include "hpfs_log_sync.hpp" +#include "../conf.hpp" +#include "../crypto.hpp" +#include "../ledger/ledger.hpp" +#include "../msg/fbuf/p2pmsg_conversion.hpp" +#include "../ledger/sqlite.hpp" + +namespace p2pmsg = msg::fbuf::p2pmsg; + +/** + * This namespace is responsible for contract state syncing in full history modes. Full history nodes cannot use normal hpfs sync since replay ability should be preserved. + * Hence log file records are requested from another full history node. +*/ +namespace sc::hpfs_log_sync +{ + constexpr int FILE_PERMS = 0644; + constexpr uint16_t SYNCER_IDLE_WAIT = 20; // log syncer loop sleep time (milliseconds). + + // Max no. of repetitive reqeust resubmissions before abandoning the sync. + constexpr uint16_t ABANDON_THRESHOLD = 10; + + // No. of milliseconds to wait before resubmitting a request. + uint16_t REQUEST_RESUBMIT_TIMEOUT; + + sync_context sync_ctx; + bool init_success = false; + + /** + * Initialize log record syncer. + */ + int init() + { + REQUEST_RESUBMIT_TIMEOUT = conf::cfg.contract.roundtime; + + sync_ctx.log_record_sync_thread = std::thread(hpfs_log_syncer_loop); + + init_success = true; + return 0; + } + + void deinit() + { + if (init_success) + { + sync_ctx.is_shutting_down = true; + sync_ctx.log_record_sync_thread.join(); + } + } + + void set_sync_target(const p2p::sequence_hash target) + { + { + std::scoped_lock lock(sync_ctx.target_log_record_mutex); + if (sync_ctx.is_shutting_down || (sync_ctx.is_syncing && sync_ctx.target_log_record == target)) + return; + + sync_ctx.target_log_record = target; + } + + if (get_verified_min_record() == -1) + return; + + sync_ctx.target_requested_on = 0; + sync_ctx.request_submissions = 0; + sync_ctx.is_syncing = true; + } + + /** + * Runs the log sync worker loop. + */ + void hpfs_log_syncer_loop() + { + util::mask_signal(); + + LOG_INFO << "Hpfs log sync: Worker started."; + + while (!sync_ctx.is_shutting_down) + { + // Indicates whether any requests/responses were processed in the loop iteration. + bool processed = false; + + // Perform log sync activities. + { + std::scoped_lock lock(sync_ctx.target_log_record_mutex); + if (!sync_ctx.target_log_record.empty()) + send_hpfs_log_sync_request(); // Send log record requests if needed (or abandon if sync timeout). + + // Process any history responses from other nodes. + if (!sync_ctx.target_log_record.empty() && check_hpfs_log_sync_responses() == 1) + processed = true; + } + + // Serve any history requests from other nodes. + if (check_hpfs_log_sync_requests() == 1) + processed = true; + + // Wait a small delay if there were no requests/responses processed during previous iteration. + if (!processed) + util::sleep(SYNCER_IDLE_WAIT); + } + + LOG_INFO << "Hpfs log sync: Worker stopped."; + } + + /** + * Submits/resubmits log record requests as needed. Abandons sync if threshold reached. + */ + void send_hpfs_log_sync_request() + { + // Check whether we need to send any requests or abandon the sync due to timeout. + const uint64_t time_now = util::get_epoch_milliseconds(); + if ((sync_ctx.target_requested_on == 0) || // Initial request. + (time_now - sync_ctx.target_requested_on) > REQUEST_RESUBMIT_TIMEOUT) // Request resubmission. + { + if (sync_ctx.request_submissions < ABANDON_THRESHOLD) + { + flatbuffers::FlatBufferBuilder fbuf; + p2pmsg::create_msg_from_hpfs_log_request(fbuf, {sync_ctx.target_log_record, sync_ctx.min_log_record}); + std::string target_pubkey; + p2p::send_message_to_random_peer(fbuf, target_pubkey, true); + if (!target_pubkey.empty()) + LOG_DEBUG << "Hpfs log sync: Requesting from [" << target_pubkey.substr(2, 10) << "]." + << " min:" << sync_ctx.min_log_record + << " target:" << sync_ctx.target_log_record; + + sync_ctx.target_requested_on = time_now; + sync_ctx.request_submissions++; + } + else + { + LOG_INFO << "Hpfs log sync: Resubmission threshold exceeded. Abandoning sync."; + sync_ctx.clear_target(); + } + } + } + + /** + * Processes any log record responses we have received from other peers. + * @return 0 if no respones were processed. 1 if at least one response was processed. + */ + int check_hpfs_log_sync_responses() + { + // Move over the collected responses to the local list. + std::list> log_record_responses; + + { + std::scoped_lock lock(p2p::ctx.collected_msgs.log_record_response_mutex); + + // Move collected hpfs responses over to local candidate responses list. + if (!p2p::ctx.collected_msgs.log_record_responses.empty()) + log_record_responses.splice(log_record_responses.end(), p2p::ctx.collected_msgs.log_record_responses); + } + + return log_record_responses.empty() ? 0 : 1; + } + + /** + * Serves any log record requests we have received from other peers. + * @return 0 if no requests were served. 1 if at least one request was served. + */ + int check_hpfs_log_sync_requests() + { + // // Move over the collected requests to the local list. + std::list> log_record_requests; + + { + std::scoped_lock lock(p2p::ctx.collected_msgs.log_record_request_mutex); + + // Move collected hpfs responses over to local candidate responses list. + if (!p2p::ctx.collected_msgs.log_record_requests.empty()) + log_record_requests.splice(log_record_requests.end(), p2p::ctx.collected_msgs.log_record_requests); + } + + for (const auto &[session_id, lr] : log_record_requests) + { + flatbuffers::FlatBufferBuilder fbuf(1024); + p2p::hpfs_log_response resp; + resp.max_record_id = lr.target_record_id; + resp.min_record_id = lr.min_record_id; + resp.log_record_bytes = std::vector(); + p2pmsg::create_msg_from_hpfs_log_response(fbuf, resp); + std::string_view msg = std::string_view(reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + + // Find the peer that we should send the history response to. + std::scoped_lock lock(p2p::ctx.peer_connections_mutex); + const auto peer_itr = p2p::ctx.peer_connections.find(util::to_bin(session_id)); + + if (peer_itr != p2p::ctx.peer_connections.end()) + { + comm::comm_session *session = peer_itr->second; + session->send(msg); + } + } + + return log_record_requests.empty() ? 0 : 1; + } + + /** + * Check requested sequence number is in node's log file. + * @param lr log record request information. + * @return true if requested sequence number is in node's log file. + */ + bool check_required_log_record_availability(const p2p::sequence_hash &min_log_record) + { + return true; + } + + /** + * Handle recieved ledger history response. + * @param lr log record request information. + * @return 0 on successful log update. -1 on failure. + */ + int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &hr, std::string &new_log_record_seqno) + { + return 0; + } + + /** + * Get the verified minimum required ledger. + * @return -1 on error, 0 on successfully setting minimum target and returns 1 if already in sync. + */ + int get_verified_min_record() + { + p2p::sequence_hash last_from_index; + if (sc::contract_fs.get_last_seq_no_from_index(last_from_index.seq_no) == -1 || + sc::contract_fs.get_hash_from_index_by_seq_no(last_from_index.hash, last_from_index.seq_no) == -1) + { + LOG_ERROR << "Error getting last ledger record data from index file."; + return -1; + } + + const p2p::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id(); + + if (last_from_index.seq_no == 0) + { + // Request full ledger. + std::scoped_lock lock(sync_ctx.min_log_record_mutex); + sync_ctx.min_log_record = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + return 0; + } + + if (last_from_index == last_from_ledger) + { + // In sync. No need to sync. + return 1; + } + + if (last_from_index.seq_no == last_from_ledger.seq_no) + { + // In a fork because hashes are not equal though the sequence numbers are equal. + if (set_joining_point_for_fork(last_from_index.seq_no - 1) == -1) + { + LOG_ERROR << "Error detecting forked position"; + return -1; + } + } + else if (last_from_ledger.seq_no > last_from_index.seq_no) + { + util::h32 root_hash_from_ledger; + if (ledger::get_root_hash_from_ledger(root_hash_from_ledger, last_from_index.seq_no) == -1) + { + LOG_ERROR << "Error getting root hash from ledger for sequence number: " << last_from_index.seq_no; + return -1; + } + + if (root_hash_from_ledger == last_from_index.hash) + { + std::scoped_lock lock(sync_ctx.min_log_record_mutex); + sync_ctx.min_log_record = last_from_index; + } + else + { + // Fork. + if (set_joining_point_for_fork(last_from_index.seq_no - 1) == -1) + { + LOG_ERROR << "Error detecting forked position"; + return -1; + } + } + } + else + { + // When index seq is greater than ledger, start from ledger and go back. + if (set_joining_point_for_fork(last_from_ledger.seq_no - 1) == -1) + { + LOG_ERROR << "Error detecting forked position"; + return -1; + } + } + + return 0; + } + + /** + * Set the joining point as the minimum log record in a case of fork condition by checking index file data + * against synced ledger data. + * @param starting_point Starting sequence number to backtrack until a joining state is found. If no joining point is found, min is set to genesis. + * @return -1 on error and 0 on success. + */ + int set_joining_point_for_fork(const uint64_t starting_point) + { + if (starting_point == 0) + { + // Request full ledger. + std::scoped_lock lock(sync_ctx.min_log_record_mutex); + sync_ctx.min_log_record = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + return 0; + } + + const char *session_name = "get_min_verified_ledger_record"; + if (ledger::ledger_fs.start_ro_session(session_name, false) == -1) + return -1; + + std::string prev_shard_path; + sqlite3 *db = NULL; + + util::h32 ledger_root_hash; + util::h32 index_root_hash; + uint64_t current_seq_no = starting_point; + + do + { + const uint64_t shard_seq_no = (current_seq_no - 1) / ledger::PRIMARY_SHARD_SIZE; + const std::string shard_path = ledger::ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no); + + // Change db connection if the shard changes. + if (prev_shard_path != shard_path) + { + // Close previous session if any. + if (db != NULL) + ledger::sqlite::close_db(&db); + + if (ledger::sqlite::open_db(shard_path + "/" + ledger::DATABASE, &db) == -1) + { + LOG_ERROR << errno << ": Error openning the shard database, shard: " << shard_seq_no; + ledger::ledger_fs.stop_ro_session(session_name); + return -1; + } + prev_shard_path = shard_path; + } + + // Get root hash for the current sequence number from the ledger. + ledger::ledger_record ledger; + if (ledger::sqlite::get_ledger_by_seq_no(db, current_seq_no, ledger) == -1) + { + LOG_ERROR << "Error getting ledger by sequence number: " << current_seq_no; + ledger::sqlite::close_db(&db); + ledger::ledger_fs.stop_ro_session(session_name); + return -1; + } + // Root hash is calculated from its children(patch and state). + ledger_root_hash = hpfs::get_root_hash(ledger.config_hash, ledger.state_hash); + + // Get root hash for the current seq number from index file. + if (sc::contract_fs.get_hash_from_index_by_seq_no(index_root_hash, current_seq_no) == -1) + { + LOG_ERROR << "Error getting hash from index by sequence number: " << current_seq_no; + ledger::sqlite::close_db(&db); + ledger::ledger_fs.stop_ro_session(session_name); + return -1; + } + + current_seq_no--; + } while (current_seq_no > 0 && ledger_root_hash != index_root_hash); + + ledger::sqlite::close_db(&db); + ledger::ledger_fs.stop_ro_session(session_name); + + // Didn't found a match point until it reaches genesis. Request full ledger. + if (ledger_root_hash != index_root_hash) + { + // Remove the full log and index file data and start from scratch. + if (sc::contract_fs.truncate_log_file(1) == -1) + { + LOG_ERROR << "Error truncating hpfs log file and index file from : " << (current_seq_no - 1); + return -1; + } + // Request full ledger + std::scoped_lock lock(sync_ctx.min_log_record_mutex); + sync_ctx.min_log_record = {ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + } + else + { + // To account current_seq_no-- at the loop end. + current_seq_no++; + + // We have to truncate keeping the joining record. +1 is added to account that. + if (sc::contract_fs.truncate_log_file(current_seq_no + 1) == -1) + { + LOG_ERROR << "Error truncating hpfs log file and index file from : " << (current_seq_no + 1); + return -1; + } + // we have found the joining point + std::scoped_lock lock(sync_ctx.min_log_record_mutex); + sync_ctx.min_log_record = {current_seq_no, ledger_root_hash}; + } + return 0; + } + +} // namespace ledger \ No newline at end of file diff --git a/src/sc/hpfs_log_sync.hpp b/src/sc/hpfs_log_sync.hpp new file mode 100644 index 00000000..3150e4c6 --- /dev/null +++ b/src/sc/hpfs_log_sync.hpp @@ -0,0 +1,60 @@ +#ifndef _HP_SC_HPFS_LOG_SYNC +#define _HP_SC_HPFS_LOG_SYNC + +#include "../pchheader.hpp" +#include "../p2p/p2p.hpp" + +/** + * This namespace is responsible for contract state syncing in full history modes. Full history nodes cannot use normal hpfs sync since replay ability should be preserved. + * Hence log file records are requested from another full history node. +*/ +namespace sc::hpfs_log_sync +{ + struct sync_context + { + // The current target log record that we are syncing towards. + p2p::sequence_hash target_log_record; + std::mutex target_log_record_mutex; + p2p::sequence_hash min_log_record; + std::mutex min_log_record_mutex; + uint64_t target_requested_on = 0; + uint16_t request_submissions = 0; + + std::thread log_record_sync_thread; + std::atomic is_syncing = false; + std::atomic is_shutting_down = false; + + void clear_target() + { + target_log_record = {}; + min_log_record = {}; + target_requested_on = 0; + request_submissions = 0; + is_syncing = false; + } + }; + extern sync_context sync_ctx; + + int init(); + + void deinit(); + + void set_sync_target(const p2p::sequence_hash target); + + void hpfs_log_syncer_loop(); + + void send_hpfs_log_sync_request(); + + int check_hpfs_log_sync_responses(); + + int check_hpfs_log_sync_requests(); + + bool check_required_log_record_availability(const p2p::sequence_hash &min_log_record); + + int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &hr, std::string &new_lcl); + + int get_verified_min_record(); + + int set_joining_point_for_fork(const uint64_t starting_point); +} +#endif \ No newline at end of file diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index 3529f4de..4283d741 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -10,6 +10,7 @@ #include "../util/version.hpp" #include "contract_serve.hpp" #include "sc.hpp" +#include "hpfs_log_sync.hpp" namespace sc { @@ -40,17 +41,28 @@ namespace sc return -1; } - if (contract_sync_worker.init("contract", &contract_fs) == -1) + if (conf::cfg.node.history == conf::HISTORY::FULL) { - LOG_ERROR << "Contract file system sync worker initialization failed."; - return -1; + hpfs_log_sync::init(); + } + else + { + if (contract_sync_worker.init("contract", &contract_fs) == -1) + { + LOG_ERROR << "Contract file system sync worker initialization failed."; + return -1; + } } return 0; } void deinit() { - contract_sync_worker.deinit(); + if (conf::cfg.node.history == conf::HISTORY::FULL) + hpfs_log_sync::deinit(); + else + contract_sync_worker.deinit(); + contract_server.deinit(); contract_fs.deinit(); }