diff --git a/src/consensus.cpp b/src/consensus.cpp index 63748df2..20bbc01e 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -407,7 +407,7 @@ namespace consensus << " state:" << cp.state_hash << " patch:" << cp.patch_hash << " lps:" << cp.last_primary_shard_id - << " lbs:" << cp.last_raw_shard_id + << " lrs:" << cp.last_raw_shard_id << " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]" << "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)"; @@ -642,8 +642,8 @@ namespace consensus << " ts:" << p.time << " state:" << p.state_hash << " patch:" << p.patch_hash - << " last_primary_shard_id:" << p.last_primary_shard_id - << " last_raw_shard_id:" << p.last_raw_shard_id; + << " lps:" << p.last_primary_shard_id + << " lrs:" << p.last_raw_shard_id; } /** diff --git a/src/hpfs/hpfs_mount.hpp b/src/hpfs/hpfs_mount.hpp index 55a547dd..dddba97c 100644 --- a/src/hpfs/hpfs_mount.hpp +++ b/src/hpfs/hpfs_mount.hpp @@ -24,7 +24,7 @@ namespace hpfs inline uint32_t get_request_resubmit_timeout() { - return conf::cfg.contract.roundtime; + return conf::cfg.contract.roundtime * 0.7; } const util::h32 get_root_hash(const util::h32 &child_one, const util::h32 &child_two); diff --git a/src/hpfs/hpfs_serve.cpp b/src/hpfs/hpfs_serve.cpp index 0784ef62..2e62c44d 100644 --- a/src/hpfs/hpfs_serve.cpp +++ b/src/hpfs/hpfs_serve.cpp @@ -11,12 +11,14 @@ namespace p2pmsg = msg::fbuf::p2pmsg; /** - * Class for serving hpfs requests from other peers. + * Class for serving hpfs sync requests from other peers. */ namespace hpfs { constexpr uint16_t LOOP_WAIT = 20; // Milliseconds constexpr const char *HPFS_SESSION_NAME = "rw"; + constexpr uint16_t MAX_HASHMAP_RESPONSES_PER_REQUEST = 4; + constexpr uint16_t MAX_BLOCK_RESPONSES_PER_REQUEST = 1; /** * @param server_name The name of the serving instance. (For identification purpose) @@ -90,25 +92,27 @@ namespace hpfs break; } - // Session id is in binary format. Converting to hex before printing. - LOG_DEBUG << "Serving hpfs request from [" << util::to_hex(session_id).substr(2, 10) << "]"; - flatbuffers::FlatBufferBuilder fbuf; - - if (hpfs_serve::create_hpfs_response(fbuf, hr) == 1) + std::list fbufs; + if (hpfs_serve::generate_sync_responses(fbufs, hr) == 0 && !fbufs.empty()) { - // Find the peer that we should send the hpfs response to. + // Find the peer that we should send the sync responses to. std::scoped_lock lock(p2p::ctx.peer_connections_mutex); const auto peer_itr = p2p::ctx.peer_connections.find(session_id); if (peer_itr != p2p::ctx.peer_connections.end()) { - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - comm::comm_session *session = peer_itr->second; - session->send(msg); + + for (const flatbuffers::FlatBufferBuilder &fbuf : fbufs) + { + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + session->send(msg); + } } } + + LOG_DEBUG << "Hpfs " << name << " serve: Sent " << fbufs.size() << " replies to [" << util::to_hex(session_id).substr(2, 10) << "]"; } fs_mount->release_rw_session(); @@ -120,13 +124,12 @@ namespace hpfs } /** - * Creates the reply message for a given hpfs request. - * @param fbuf The flatbuffer builder to construct the reply message. + * Creates reply messages for a given hpfs sync request. + * @param fbufs List of flatbuffer builders containing the generated reply messages. * @param hr The hpfs request which should be replied to. - * @return 1 if successful hpfs response was generated. 0 if request is invalid - * and no response was generated. -1 on error. + * @return 0 on success. -1 on error. */ - int hpfs_serve::create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr) + int hpfs_serve::generate_sync_responses(std::list &fbufs, const p2p::hpfs_request &hr) { LOG_DEBUG << "Serving hpfs req. path:" << hr.parent_path << " block_id:" << hr.block_id; @@ -136,7 +139,7 @@ namespace hpfs // Vector to hold the block bytes. Normally block size is constant BLOCK_SIZE (4MB), but the // last block of a file may have a smaller size. std::vector block; - const int result = get_data_block(block, hr.parent_path, hr.block_id, hr.expected_hash); + const int result = get_data_block_with_hash_check(block, hr.parent_path, hr.block_id, hr.expected_hash); if (result == -1) { @@ -145,14 +148,7 @@ namespace hpfs } else if (result == 1) { - p2p::block_response resp; - resp.path = hr.parent_path; - resp.block_id = hr.block_id; - resp.hash = hr.expected_hash; - resp.data = std::string_view(reinterpret_cast(block.data()), block.size()); - - p2pmsg::create_msg_from_block_response(fbuf, resp, fs_mount->mount_id); - return 1; // Success. + p2pmsg::create_msg_from_block_response(fbufs.emplace_back(), hr.block_id, block, hr.expected_hash, hr.parent_path, fs_mount->mount_id); } } else @@ -163,7 +159,7 @@ namespace hpfs std::vector block_hashes; size_t file_length = 0; mode_t file_mode = 0; - const int result = get_data_block_hashes(block_hashes, file_length, file_mode, hr.parent_path, hr.expected_hash); + const int result = get_file_block_hashes_with_hash_check(block_hashes, file_length, file_mode, hr.parent_path, hr.expected_hash); if (result == -1) { @@ -172,18 +168,40 @@ namespace hpfs } else if (result == 1) { + // By looking at the block hashmap hints the requester has provided, we also include pre-emptive data block responses + // that the requester needs. + std::vector responded_block_ids; + for (uint32_t block_id = 0; block_id < block_hashes.size(); block_id++) + { + if (responded_block_ids.size() < MAX_BLOCK_RESPONSES_PER_REQUEST && + (hr.file_hashmap_hints.size() <= block_id || hr.file_hashmap_hints[block_id] != block_hashes[block_id])) + { + std::vector block; + if (get_data_block(block, hr.parent_path, block_id) != -1) + { + p2pmsg::create_msg_from_block_response(fbufs.emplace_back(), block_id, block, block_hashes[block_id], hr.parent_path, fs_mount->mount_id); + responded_block_ids.push_back(block_id); + + if (responded_block_ids.size() == MAX_BLOCK_RESPONSES_PER_REQUEST) + break; + } + } + } + + // Generate parent reply. We must insert it at the begning of replies. + // This is the reply the requester originally requested. But we also indicate in it any pre-emptive hint responses + // we are sending along with it. p2pmsg::create_msg_from_filehashmap_response( - fbuf, hr.parent_path, fs_mount->mount_id, block_hashes, - file_length, file_mode, hr.expected_hash); - return 1; // Success. + fbufs.emplace_front(), hr.parent_path, fs_mount->mount_id, block_hashes, + responded_block_ids, file_length, file_mode, hr.expected_hash); } } else { // If the hpfs request is for a directory we need to reply with the // file system entries and their hashes inside that dir. - std::vector child_hash_nodes; - const int result = get_fs_entry_hashes(child_hash_nodes, hr.parent_path, hr.expected_hash); + std::vector fs_entries; + const int result = get_fs_entry_hashes_with_hash_check(fs_entries, hr.parent_path, hr.expected_hash); if (result == -1) { @@ -193,7 +211,7 @@ namespace hpfs else if (result == 1) { // Get dir mode. - const std::string dir_path = fs_mount->rw_dir + hr.parent_path.data(); + const std::string dir_path = fs_mount->physical_path(HPFS_SESSION_NAME, hr.parent_path); struct stat st; if (stat(dir_path.data(), &st) == -1) { @@ -201,23 +219,137 @@ namespace hpfs return -1; } + // By looking at the fs entry hints the requester has provided, we also include pre-emptive hashmap and data block + // responses that the requester needs. + generate_fs_entry_hint_responses(fbufs, fs_entries, hr.fs_entry_hints, hr.parent_path); + + // Generate parent reply. We must insert it at the begning of replies. + // This is the reply the requester originally requested. But we also indicate in it any pre-emptive hint responses + // we are sending along with it. In this case, the 'fs entries' we are replying with are already marked as having an + // accompanying pre-emptive hint response. p2pmsg::create_msg_from_fsentry_response( - fbuf, hr.parent_path, fs_mount->mount_id, st.st_mode, child_hash_nodes, hr.expected_hash); - return 1; // Success. + fbufs.emplace_front(), hr.parent_path, fs_mount->mount_id, st.st_mode, fs_entries, hr.expected_hash); } } } - LOG_DEBUG << "No hpfs response generated."; return 0; } + /** + * Generates flatbuffer messages for any pre-emptive hint responses that we should send according to the fs entry hints provided by the requester. + * @param fbufs The flatbuffer message list to populate with hint responses. + * @param fs_entries The fs entry collection that is going to be sent with the parent fs entry reply. + * @param fs_entry_hints The fs entry hints the requester has provided. + * @param parent_path The vpath of the parent directory which contains the fs entries. + */ + void hpfs_serve::generate_fs_entry_hint_responses(std::list &fbufs, std::vector &fs_entries, + const std::vector &fs_entry_hints, std::string_view parent_vpath) + { + // Counters tracking how many pre-emptive hint responses of each type we have generated so far. + size_t hashmap_responses = 0; + size_t block_responses = 0; + + // Prepare hint map to provide match comparisons based on hints provided by the requester. + std::map hint_fs_entry_map; + for (const p2p::hpfs_fs_hash_entry &hint : fs_entry_hints) + hint_fs_entry_map.emplace(hint.name, std::move(hint)); + + // For each fs entry we are replying with, look for the possibilty of generating hint responses. + for (p2p::hpfs_fs_hash_entry &entry : fs_entries) + { + // Check with provided hints to include match information. + const auto itr = hint_fs_entry_map.find(entry.name); + // Whether fs entry exists on the requesting party. + const bool exists_on_requester = itr != hint_fs_entry_map.end(); + + // Remove the entry from the hint list so at the end, the hint map will only contain children we don't possess on our side. + if (exists_on_requester) + hint_fs_entry_map.erase(entry.name); + + entry.response_type = (exists_on_requester && itr->second.hash == entry.hash) ? p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MATCHED : p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED; + + // Send hashmap hint response if we haven't reached the limit. + const bool send_hashmap_response = (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED) && (hashmap_responses < MAX_HASHMAP_RESPONSES_PER_REQUEST); + if (!send_hashmap_response) + continue; + + // Reaching this point means we have to generate the hashmap hint response along with the parent fs entry reply. + + std::string child_vpath = std::string(parent_vpath) + .append(parent_vpath.back() != '/' ? "/" : "") + .append(entry.name); + if (entry.is_file) + { + std::vector block_hashes; + size_t file_length = 0; + mode_t file_mode = 0; + if (get_file_block_hashes(block_hashes, file_length, file_mode, child_vpath) != -1) + { + std::vector responded_block_ids; + + // Can additionally send block data hint response for block 0, if we know that the entire file does not exist on other side. + const bool send_block_response = !exists_on_requester && block_responses < MAX_BLOCK_RESPONSES_PER_REQUEST; + if (send_block_response) + { + std::vector block; + if (get_data_block(block, child_vpath, 0) != -1) + { + p2pmsg::create_msg_from_block_response(fbufs.emplace_back(), 0, block, block_hashes[0], child_vpath, fs_mount->mount_id); + block_responses++; + responded_block_ids.push_back(0); + } + } + + // If block response already inserted, we must insert hashmap response before that. This is because the hint resposnes must be + // sent in the logical dependency order. In this case, the hashmap hint response will indicate to the requester of any pre-emptive + // block data responses we are sending. Therefore, block data hint response must follow the hashmap hint response. + auto pos = fbufs.end(); + if (!responded_block_ids.empty()) + pos--; + p2pmsg::create_msg_from_filehashmap_response( + *fbufs.emplace(pos), child_vpath, fs_mount->mount_id, block_hashes, + responded_block_ids, file_length, file_mode, entry.hash); + + entry.response_type = p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED; + hashmap_responses++; + } + } + else // Is dir. + { + // This is a directory, generate an fs entry resposne for that directory. + std::vector fs_entries; + if (get_fs_entry_hashes(fs_entries, child_vpath) > 0) + { + struct stat st; + if (stat(child_vpath.data(), &st) == -1) + { + LOG_ERROR << errno << ": Error in getting dir metadata: " << child_vpath; + } + else + { + p2pmsg::create_msg_from_fsentry_response( + fbufs.emplace_back(), child_vpath, fs_mount->mount_id, st.st_mode, fs_entries, entry.hash); + + entry.response_type = p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED; + hashmap_responses++; + } + } + } + } + + // Take the reamainig entries in the hint list and include them in the fs entry response as not exist. + // When the requester sees this, they will remove those entries from their side. + for (const auto &[name, hint] : hint_fs_entry_map) + fs_entries.push_back(p2p::hpfs_fs_hash_entry{name, hint.is_file, util::h32_empty, p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::NOT_AVAILABLE}); + } + /** * Retrieves the specified data block from a hpfs file if expected hash matches. * @return 1 if block data was succefully fetched. 0 if vpath or block does not exist. -1 on error. */ - int hpfs_serve::get_data_block(std::vector &block, const std::string_view vpath, - const uint32_t block_id, const util::h32 expected_hash) + int hpfs_serve::get_data_block_with_hash_check(std::vector &block, const std::string_view vpath, + const uint32_t block_id, const util::h32 expected_hash) { // Check whether the existing block hash matches expected hash. std::vector block_hashes; @@ -236,53 +368,10 @@ namespace hpfs } else // Get actual block data. { - struct stat st; - const std::string file_path = fs_mount->rw_dir + vpath.data(); - const off_t block_offset = block_id * hpfs::BLOCK_SIZE; - const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC); - if (fd == -1) - { - LOG_ERROR << errno << ": Open failed " << file_path; + if (get_data_block(block, vpath, block_id) == -1) result = -1; - } else - { - if (fstat(fd, &st) == -1) - { - LOG_ERROR << errno << ": Stat failed. " << file_path; - result = -1; - } - else if (!S_ISREG(st.st_mode)) - { - LOG_ERROR << "Not a file. " << file_path; - result = -1; - } - else if (block_offset > st.st_size) - { - LOG_ERROR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path; - result = -1; - } - else - { - const size_t read_len = MIN(hpfs::BLOCK_SIZE, (st.st_size - block_offset)); - block.resize(read_len); - - lseek(fd, block_offset, SEEK_SET); - const int res = read(fd, block.data(), read_len); - if (res < read_len) - { - LOG_ERROR << errno << ": Read failed (result:" << res - << " off:" << block_offset << " len:" << read_len << "). " << file_path; - result = -1; - } - else - { - result = 1; // Success. - } - } - - close(fd); - } + result = 1; // Success. } } @@ -291,10 +380,10 @@ namespace hpfs /** * Retrieves the specified file block hashes if expected hash matches. - * @return 1 if block hashes were successfuly fetched. 0 if vpath does not exist. -1 on error. + * @return 1 if block hashes were successfuly fetched. 0 if hash mismatch. -1 on error. */ - int hpfs_serve::get_data_block_hashes(std::vector &hashes, size_t &file_length, mode_t &file_mode, - const std::string_view vpath, const util::h32 expected_hash) + int hpfs_serve::get_file_block_hashes_with_hash_check(std::vector &hashes, size_t &file_length, mode_t &file_mode, + const std::string_view vpath, const util::h32 expected_hash) { // Check whether the existing file hash matches expected hash. util::h32 file_hash = util::h32_empty; @@ -306,24 +395,12 @@ namespace hpfs LOG_DEBUG << "Expected hash mismatch."; result = 0; } - // Get the block hashes. - else if (fs_mount->get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0) - { - result = -1; - } else { - // Get actual file metadata. - const std::string file_path = fs_mount->rw_dir + vpath.data(); - struct stat st; - if (stat(file_path.c_str(), &st) == -1) - { - LOG_ERROR << errno << ": Stat failed when getting file metadata. " << file_path; + if (get_file_block_hashes(hashes, file_length, file_mode, vpath) == -1) result = -1; - } - file_length = st.st_size; - file_mode = st.st_mode; - result = 1; // Success. + else + result = 1; // Success. } } @@ -334,8 +411,8 @@ namespace hpfs * Retrieves the specified dir entry hashes if expected fir hash matches. * @return 1 if fs entry hashes were successfuly fetched. 0 if vpath does not exist. -1 on error. */ - int hpfs_serve::get_fs_entry_hashes(std::vector &hash_nodes, - const std::string_view vpath, const util::h32 expected_hash) + int hpfs_serve::get_fs_entry_hashes_with_hash_check(std::vector &fs_entries, + const std::string_view vpath, const util::h32 expected_hash) { // Check whether the existing dir hash matches expected hash. util::h32 dir_hash = util::h32_empty; @@ -348,7 +425,7 @@ namespace hpfs result = 0; } // Get the children hash nodes. - else if (fs_mount->get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0) + else if (get_fs_entry_hashes(fs_entries, vpath) < 0) { result = -1; } @@ -361,4 +438,104 @@ namespace hpfs return result; } + /** + * Fetches the specified file data block. + * @return 0 on success. -1 on error. + */ + int hpfs_serve::get_data_block(std::vector &block, const std::string_view vpath, const uint32_t block_id) + { + struct stat st; + const std::string file_path = fs_mount->physical_path(HPFS_SESSION_NAME, vpath); + const off_t block_offset = block_id * hpfs::BLOCK_SIZE; + const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC); + + if (fd == -1) + { + LOG_ERROR << errno << ": Open failed " << file_path; + return -1; + } + + int result = 0; + if (fstat(fd, &st) == -1) + { + LOG_ERROR << errno << ": Stat failed. " << file_path; + result = -1; + } + else if (!S_ISREG(st.st_mode)) + { + LOG_ERROR << "Not a file. " << file_path; + result = -1; + } + else if (block_offset > st.st_size) + { + LOG_ERROR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path; + result = -1; + } + else + { + const size_t read_len = MIN(hpfs::BLOCK_SIZE, (st.st_size - block_offset)); + block.resize(read_len); + + lseek(fd, block_offset, SEEK_SET); + const int res = read(fd, block.data(), read_len); + if (res < read_len) + { + LOG_ERROR << errno << ": Read failed (result:" << res + << " off:" << block_offset << " len:" << read_len << "). " << file_path; + result = -1; + } + else + { + result = 0; // Success. + } + } + + close(fd); + return result; + } + + /** + * Fetches the file data block hash list. + * @return 0 on success. -1 on error. + */ + int hpfs_serve::get_file_block_hashes(std::vector &hashes, size_t &file_length, mode_t &file_mode, const std::string_view vpath) + { + // Get the block hashes. + if (fs_mount->get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0) + { + return -1; + } + else + { + // Get actual file metadata. + const std::string file_path = fs_mount->physical_path(HPFS_SESSION_NAME, vpath); + struct stat st; + if (stat(file_path.c_str(), &st) == -1) + { + LOG_ERROR << errno << ": Stat failed when getting file metadata. " << file_path; + return -1; + } + file_length = st.st_size; + file_mode = st.st_mode; + return 0; + } + } + + /** + * Populates the list of dir entry hashes for the specified vpath. + * @return 1 on success. 0 if vpath not found. -1 on error. + */ + int hpfs_serve::get_fs_entry_hashes(std::vector &fs_entries, const std::string_view vpath) + { + std::vector child_hash_nodes; + int res = fs_mount->get_dir_children_hashes(child_hash_nodes, HPFS_SESSION_NAME, vpath); + if (res > 0) + { + for (const hpfs::child_hash_node &hn : child_hash_nodes) + fs_entries.push_back(p2p::hpfs_fs_hash_entry{hn.name, hn.is_file, hn.hash}); + } + + return res; + } + } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_serve.hpp b/src/hpfs/hpfs_serve.hpp index 9e35c3b9..6abde18f 100644 --- a/src/hpfs/hpfs_serve.hpp +++ b/src/hpfs/hpfs_serve.hpp @@ -16,13 +16,22 @@ namespace hpfs hpfs::hpfs_mount *fs_mount = NULL; std::string_view name; void hpfs_serve_loop(); - int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr); - int get_data_block(std::vector &block, const std::string_view vpath, - const uint32_t block_id, const util::h32 expected_hash); - int get_data_block_hashes(std::vector &hashes, size_t &file_length, mode_t &file_mode, - const std::string_view vpath, const util::h32 expected_hash); - int get_fs_entry_hashes(std::vector &hash_nodes, - const std::string_view vpath, const util::h32 expected_hash); + + int generate_sync_responses(std::list &fbufs, const p2p::hpfs_request &hr); + + void generate_fs_entry_hint_responses(std::list &fbufs, std::vector &our_fs_entries, + const std::vector &fs_entry_hints, std::string_view parent_vpath); + + int get_data_block_with_hash_check(std::vector &block, const std::string_view vpath, + const uint32_t block_id, const util::h32 expected_hash); + int get_file_block_hashes_with_hash_check(std::vector &hashes, size_t &file_length, mode_t &file_mode, + const std::string_view vpath, const util::h32 expected_hash); + int get_fs_entry_hashes_with_hash_check(std::vector &fs_entries, + const std::string_view vpath, const util::h32 expected_hash); + + int get_data_block(std::vector &block, const std::string_view vpath, const uint32_t block_id); + int get_file_block_hashes(std::vector &hashes, size_t &file_length, mode_t &file_mode, const std::string_view vpath); + int get_fs_entry_hashes(std::vector &fs_entries, const std::string_view vpath); protected: std::list> hpfs_requests; diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index e27a2512..3f9a2a04 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -251,8 +251,7 @@ namespace hpfs if (should_stop_request_loop(current_target_hash)) return 0; - LOG_DEBUG << "Hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]"; - + const std::string from = response.first.substr(2, 10); // Sender pubkey. const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data()); const p2pmsg::HpfsResponseMsg &resp_msg = *msg.content_as_HpfsResponseMsg(); @@ -264,7 +263,8 @@ namespace hpfs const auto pending_resp_itr = submitted_requests.find(key); if (pending_resp_itr == submitted_requests.end()) { - LOG_DEBUG << "Hpfs " << name << " sync: Skipping hpfs response due to hash mismatch."; + LOG_DEBUG << "Hpfs " << name << " sync: Skipping response from [" << from << "] because we are not looking for hash:" + << util::to_hex(hash).substr(0, 10) << " of " << vpath; continue; } @@ -276,17 +276,18 @@ namespace hpfs const p2pmsg::HpfsFsEntryResponse &fs_resp = *resp_msg.content_as_HpfsFsEntryResponse(); // Get fs entries we have received. - std::unordered_map peer_fs_entry_map; - p2pmsg::flatbuf_hpfsfshashentry_to_hpfsfshashentry(peer_fs_entry_map, fs_resp.entries()); + std::vector peer_fs_entries; + p2pmsg::flatbuf_hpfsfshashentries_to_hpfsfshashentries(peer_fs_entries, fs_resp.entries()); // Validate received fs data against the hash. - if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entry_map)) + if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entries)) { - LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch."; + LOG_INFO << "Hpfs " << name << " sync: Skipping response from [" << from << "] due to fs entry hash mismatch."; continue; } - handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entry_map); + LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response from [" << from << "] for " << vpath; + handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entries); } else if (msg_type == p2pmsg::HpfsResponse_HpfsFileHashMapResponse) { @@ -299,11 +300,22 @@ namespace hpfs // Validate received hashmap against the hash. if (!validate_file_hashmap_hash(vpath, hash, file_resp.file_mode(), block_hashes, block_hash_count)) { - LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch."; + LOG_INFO << "Hpfs " << name << " sync: Skipping response from [" << from << "] due to file hashmap hash mismatch."; continue; } - handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count, file_resp.file_length()); + std::set responded_block_ids; + { + const flatbuffers::Vector *fbvec = file_resp.responded_block_ids(); + const uint32_t *ptr = file_resp.responded_block_ids()->data(); + const size_t count = file_resp.responded_block_ids()->size(); + for (size_t i = 0; i < count; i++) + responded_block_ids.emplace(ptr[i]); + } + + LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response from [" << from << "] for " << vpath; + handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count, + responded_block_ids, file_resp.file_length()); } else if (msg_type == p2pmsg::HpfsResponse_HpfsBlockResponse) { @@ -316,10 +328,12 @@ namespace hpfs // Validate received block data against the hash. if (!validate_file_block_hash(hash, block_id, buf)) { - LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch."; + LOG_INFO << "Hpfs " << name << " sync: Skipping response from [" << from << "] due to file block hash mismatch."; continue; } + LOG_DEBUG << "Hpfs " << name << " sync: Processing block response from [" << from << "] for block_id:" << block_id + << " (len:" << buf.length() << ") of " << vpath; handle_file_block_response(vpath, block_id, buf); } @@ -337,7 +351,8 @@ namespace hpfs // Update the central hpfs state tracker. fs_mount->set_parent_hash(current_target.vpath, updated_state); - LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash; + LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash + << " (" << current_target.vpath << ")"; if (updated_state == current_target_hash) return 0; } @@ -377,8 +392,7 @@ namespace hpfs // Reset the counter and re-submit request. request.waiting_time = 0; - LOG_DEBUG << "Hpfs " << name << " sync: Resubmitting request..."; - submit_request(request); + submit_request(request, false, true); } } @@ -391,8 +405,7 @@ namespace hpfs if (should_stop_request_loop(current_target_hash)) return 0; - const backlog_item &request = pending_requests.front(); - submit_request(request); + submit_request(pending_requests.front()); pending_requests.pop_front(); } } @@ -405,11 +418,11 @@ namespace hpfs * @param vpath Virtual path of the fs. * @param hash Received hash. * @param dir_mode Metadata 'mode' of the directory containing the fs entries. - * @param fs_entry_map Received fs entry map. + * @param peer_fs_entries Received peer fs entries. * @returns true if hash is valid, otherwise false. */ bool hpfs_sync::validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode, - const std::unordered_map &fs_entry_map) + const std::vector &peer_fs_entries) { util::h32 content_hash; @@ -423,7 +436,7 @@ namespace hpfs content_hash ^= crypto::get_hash(mode_bytes, sizeof(mode_bytes)); // Then XOR the file hashes to the initial hash. - for (const auto &[name, fs_entry] : fs_entry_map) + for (const p2p::hpfs_fs_hash_entry &fs_entry : peer_fs_entries) { content_hash ^= fs_entry.hash; } @@ -511,6 +524,24 @@ namespace hpfs hr.expected_hash = expected_hash; hr.mount_id = fs_mount->mount_id; + // Include appropriate hints in the request, so the peer can send pre-emptive responses that are useful to us without having + // to submit additional requests. + if (!hr.is_file) // Dir fs entry request. + { + // Include fs entry information from our side in the request. + std::vector child_hash_nodes; + fs_mount->get_dir_children_hashes(child_hash_nodes, hpfs::RW_SESSION_NAME, path); + + for (const hpfs::child_hash_node &hn : child_hash_nodes) + hr.fs_entry_hints.push_back(p2p::hpfs_fs_hash_entry{hn.name, hn.is_file, hn.hash}); + } + else if (hr.is_file && hr.block_id == -1) // File hash map request. + { + // Include file hash map information from our side in the request (file might not exist on our side). + if (fs_mount->get_file_block_hashes(hr.file_hashmap_hints, hpfs::RW_SESSION_NAME, hr.parent_path) == -1) + hr.file_hashmap_hints.clear(); + } + flatbuffers::FlatBufferBuilder fbuf; p2pmsg::create_msg_from_hpfs_request(fbuf, hr); p2p::send_message_to_random_peer(fbuf, target_pubkey); //todo: send to a node that hold the expected hash to improve reliability of retrieving hpfs state. @@ -518,39 +549,49 @@ namespace hpfs /** * Submits a pending hpfs request to the peer. + * @param request The request to submit and start watching for response. + * @param watch_only Whether to actually send the request or watch for corresponding response only. + * Used for hint response monitoring. + * @param is_resubmit Whether this is a request resubmission or not. */ - void hpfs_sync::submit_request(const backlog_item &request) + void hpfs_sync::submit_request(const backlog_item &request, const bool watch_only, const bool is_resubmit) { const std::string key = std::string(request.path) .append(reinterpret_cast(&request.expected_hash), sizeof(util::h32)); submitted_requests.try_emplace(key, request); - const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; - std::string target_pubkey; - request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, target_pubkey); - - if (!target_pubkey.empty()) - LOG_DEBUG << "Hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type + if (watch_only) + { + LOG_DEBUG << "Hpfs " << name << " sync: Watching response for request. type:" << request.type << " path:" << request.path << " block_id:" << request.block_id << " hash:" << request.expected_hash; + } + else + { + const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; + std::string target_pubkey; + request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, target_pubkey); + + LOG_DEBUG << "Hpfs " << name << " sync: " << (is_resubmit ? "Re-submitting" : "Submitting") + << " request to [" << (target_pubkey.empty() ? "" : target_pubkey.substr(2, 10)) << "]. type:" << request.type + << " path:" << request.path << " block_id:" << request.block_id + << " hash:" << request.expected_hash; + } } /** * Process dir children response. * @param vpath Virtual path of the fs. * @param dir_mode Metadata 'mode' of dir. - * @param fs_entry_map Received fs entry map. + * @param peer_fs_entries Received peer fs entries. * @returns 0 on success and no fs write peformed. 1 if write performed. -1 on failure. */ - int hpfs_sync::handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, std::unordered_map &fs_entry_map) + int hpfs_sync::handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, const std::vector &peer_fs_entries) { - // Get the parent path of the fs entries we have received. - LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response for " << vpath; - bool write_performed = false; // Create physical directory on our side if not exist. - std::string parent_physical_path = fs_mount->rw_dir + vpath.data(); + std::string parent_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath); if (util::create_dir_tree_recursive(parent_physical_path) == -1) return -1; @@ -561,63 +602,40 @@ namespace hpfs else if (metadata_res == 1) write_performed = true; - // Get the children hash entries and compare with what we got from peer. - std::vector existing_fs_entries; - if (fs_mount->get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1) - return -1; - - // Request more info on fs entries that exist on both sides but are different. - for (const auto &ex_entry : existing_fs_entries) + for (const p2p::hpfs_fs_hash_entry &entry : peer_fs_entries) { // Construct child vpath. std::string child_vpath = std::string(vpath) .append(vpath.back() != '/' ? "/" : "") - .append(ex_entry.name); + .append(entry.name); - const auto peer_itr = fs_entry_map.find(ex_entry.name); - if (peer_itr != fs_entry_map.end()) + if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED) { - // Request hpfs state if hash is different. - if (peer_itr->second.hash != ex_entry.hash) - { - // Prioritize file hpfs requests over directories. - if (ex_entry.is_file) - pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash}); - else - pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash}); - } - - fs_entry_map.erase(peer_itr); + // We must request for this entry. Prioritize file hpfs requests over directories. + if (entry.is_file) + pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, entry.hash}); + else + pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}); } - else + else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED) { - // If there was an entry that does not exist on other side, delete it. - std::string child_physical_path = fs_mount->rw_dir + child_vpath.data(); + // The peer has already responded with a pre-emptive hint response. So we must start watching for it. + submit_request(backlog_item{entry.is_file ? BACKLOG_ITEM_TYPE::FILE : BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}, true); + } + else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::NOT_AVAILABLE) + { + // This entry is not available in peer. So we must delete it from our side. + std::string child_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, child_vpath); - if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) || - !ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1) + if ((entry.is_file && unlink(child_physical_path.c_str()) == -1) || + !entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1) return -1; write_performed = true; - LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; + LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (entry.is_file ? "file" : "dir") << " path " << child_vpath; } } - // Queue the remaining peer fs entries (that our side does not have at all) to request. - for (const auto &[name, fs_entry] : fs_entry_map) - { - // Construct child vpath. - std::string child_vpath = std::string(vpath) - .append(vpath.back() != '/' ? "/" : "") - .append(name); - - // Prioritize file hpfs requests over directories. - if (fs_entry.is_file) - pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash}); - else - pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash}); - } - return write_performed ? 1 : 0; } @@ -627,14 +645,13 @@ namespace hpfs * @param file_mode Received metadata mode of the file. * @param hashes Received block hashes. * @param hash_count No. of received block hashes. + * @param responded_block_ids List of block ids already responded by the peer. * @param file_length Size of the file. * @returns 0 on success and no write operation performed. 1 if write opreation peformed. -1 on failure. */ - int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length) + int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, + const std::set &responded_block_ids, const uint64_t file_length) { - // Get the file path of the block hashes we have received. - LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response for " << vpath; - bool write_performed = false; // File block hashes on our side (file might not exist on our side). @@ -648,15 +665,22 @@ namespace hpfs const int32_t max_block_id = MAX(existing_hash_count, hash_count) - 1; for (int32_t block_id = 0; block_id <= max_block_id; block_id++) { - // Insert at front to give priority to block requests while preserving block order. - if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id]) + if (responded_block_ids.count(block_id) == 1) + { + // The peer has already responded with a hint response. So we must start watching for it. + submit_request(backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}, true); + } + else if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id]) + { + // Insert at front to give priority to block requests while preserving block order. pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}); + } } if (existing_hashes.size() >= hash_count) { // If peer file might be smaller, truncate our file to match with peer file. - std::string file_physical_path = fs_mount->rw_dir + vpath.data(); + std::string file_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath); if (truncate(file_physical_path.c_str(), file_length) == -1) return -1; @@ -664,7 +688,7 @@ namespace hpfs } // Apply physical file mode if received mode is different from our side. - const std::string physical_path = fs_mount->rw_dir + vpath.data(); + const std::string physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath); const int metadata_res = apply_metadata_mode(physical_path, file_mode, false); if (metadata_res == -1) return -1; @@ -683,11 +707,7 @@ namespace hpfs */ int hpfs_sync::handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf) { - LOG_DEBUG << "Hpfs " << name << " sync: Writing block_id " << block_id - << " (len:" << buf.length() - << ") of " << vpath; - - std::string file_physical_path = fs_mount->rw_dir + vpath.data(); + std::string file_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath); const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS); if (fd == -1) { diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index 8c06e51f..c98e1b15 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -71,7 +71,7 @@ namespace hpfs int start_syncing_next_target(); bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode, - const std::unordered_map &fs_entry_map); + const std::vector &peer_fs_entries); bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count); @@ -83,11 +83,12 @@ namespace hpfs void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const util::h32 expected_hash, std::string &target_pubkey); - void submit_request(const backlog_item &request); + void submit_request(const backlog_item &request, const bool watch_only = false, const bool is_resubmit = false); - int handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, std::unordered_map &fs_entry_map); + int handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, const std::vector &peer_fs_entries); - int handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length); + int handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, + const std::set &responded_block_ids, const uint64_t file_length); int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf); diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index a4d472d9..9f171b49 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -40,7 +40,6 @@ namespace ledger return; } - util::h32 prev_shard_hash_from_hpfs; const std::string shard_parent_dir = synced_target.vpath.substr(0, pos); if (shard_parent_dir == PRIMARY_DIR) @@ -75,8 +74,12 @@ namespace ledger last_primary_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_primary_shards) { // Check whether the hash of the previous shard matches with the hash in the prev_shard.hash file. - const std::string prev_shard_vpath = std::string(PRIMARY_DIR).append("/").append(std::to_string(--synced_shard_seq_no)); - fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath); + util::h32 prev_shard_hash_from_hpfs = util::h32_empty; + if (synced_shard_seq_no > 0) + { + const std::string prev_shard_vpath = std::string(PRIMARY_DIR).append("/").append(std::to_string(--synced_shard_seq_no)); + fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath); + } if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then. && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. @@ -123,8 +126,12 @@ namespace ledger last_raw_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_raw_shards) { // Check whether the hash of the previous raw shard matches with the hash in the prev_shard.hash file. - const std::string prev_shard_vpath = std::string(RAW_DIR).append("/").append(std::to_string(--synced_shard_seq_no)); - fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath); + util::h32 prev_shard_hash_from_hpfs = util::h32_empty; + if (synced_shard_seq_no > 0) + { + const std::string prev_shard_vpath = std::string(RAW_DIR).append("/").append(std::to_string(--synced_shard_seq_no)); + fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath); + } if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then. && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index a9106842..1883e8a1 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -81,82 +81,100 @@ table NplMsg { // Make sure to update signature generation/verification whenever these fields are changed. } -table HpfsRequestMsg { - mount_id: uint32; - parent_path:string; - is_file:bool; - block_id:int32; - expected_hash:[ubyte]; -} +//--hpfs requests and responses--// -union HpfsResponse { - HpfsFileHashMapResponse, - HpfsBlockResponse, - HpfsFsEntryResponse -} +enum HpfsFsEntryResponseType : byte { Matched = 0, Mismatched = 1, Responded = 2, NotAvailable = 3 } -table HpfsResponseMsg{ - hash:[ubyte]; - path: string; - mount_id: uint32; - content:HpfsResponse; -} - -table HpfsFsEntryResponse{ - dir_mode:uint32; - entries: [HpfsFSHashEntry]; -} - -table HpfsFileHashMapResponse{ - file_length:uint64; - file_mode:uint32; - hash_map:[ubyte]; -} - -table HpfsBlockResponse{ - block_id:uint32; - data: [ubyte]; -} - -table HpfsFSHashEntry{ +table HpfsFSHashEntry { name: string; is_file: bool; hash: [ubyte]; + response_type: HpfsFsEntryResponseType; } -table HpfsLogRequest -{ - target_seq_no:uint64; - min_record_id:SequenceHash; +table HpfsFsEntryHint { + entries: [HpfsFSHashEntry]; } -table HpfsLogResponse -{ - min_record_id:SequenceHash; - log_record_bytes:[ubyte]; +table HpfsFileHashMapHint { + hash_map: [ubyte]; } -table PeerRequirementAnnouncementMsg{ +union HpfsRequestHint { + HpfsFsEntryHint, + HpfsFileHashMapHint +} + +table HpfsRequestMsg { + mount_id: uint32; + parent_path: string; + is_file: bool; + block_id: int32; + expected_hash: [ubyte]; + hint: HpfsRequestHint; +} + +union HpfsResponse { + HpfsFsEntryResponse, + HpfsFileHashMapResponse, + HpfsBlockResponse +} + +table HpfsResponseMsg { + hash: [ubyte]; + path: string; + mount_id: uint32; + content: HpfsResponse; +} + +table HpfsFsEntryResponse { + dir_mode: uint32; + entries: [HpfsFSHashEntry]; +} + +table HpfsFileHashMapResponse { + file_length: uint64; + file_mode: uint32; + hash_map: [ubyte]; + responded_block_ids: [uint32]; +} + +table HpfsBlockResponse { + block_id: uint32; + data: [ubyte]; +} + +table HpfsLogRequest { + target_seq_no: uint64; + min_record_id: SequenceHash; +} + +table HpfsLogResponse { + min_record_id: SequenceHash; + log_record_bytes: [ubyte]; +} + +table PeerRequirementAnnouncementMsg { need_consensus_msg_forwarding: bool; } -table PeerCapacityAnnouncementMsg{ - available_capacity:int16; - timestamp:uint64; +table PeerCapacityAnnouncementMsg { + available_capacity: int16; + timestamp: uint64; } -table PeerListRequestMsg{ +table PeerListRequestMsg { } -table PeerListResponseMsg{ +table PeerListResponseMsg { peer_list: [PeerProperties]; } table PeerProperties { - host_address:string; - port:uint16; - available_capacity:int16; - timestamp:uint64; + host_address: string; + port: uint16; + available_capacity: int16; + timestamp: uint64; } table SequenceHash { @@ -165,7 +183,7 @@ table SequenceHash { } table ByteArray { // To help represent list of byte arrays - array:[ubyte]; + array: [ubyte]; } root_type P2PMsg; //root type for all messages \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index bc1b2041..7cb6cac7 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -206,6 +206,19 @@ namespace msg::fbuf::p2pmsg hr.is_file = msg.is_file(); hr.parent_path = flatbuf_str_to_sv(msg.parent_path()); hr.expected_hash = flatbuf_bytes_to_hash(msg.expected_hash()); + + if (msg.hint_type() == HpfsRequestHint_HpfsFsEntryHint) + { + flatbuf_hpfsfshashentries_to_hpfsfshashentries(hr.fs_entry_hints, msg.hint_as_HpfsFsEntryHint()->entries()); + } + else if (msg.hint_type() == HpfsRequestHint_HpfsFileHashMapHint) + { + const HpfsFileHashMapHint *hint = msg.hint_as_HpfsFileHashMapHint(); + const size_t block_hash_count = hint->hash_map()->size() / sizeof(util::h32); + hr.file_hashmap_hints.resize(block_hash_count); + memcpy(hr.file_hashmap_hints.data(), hint->hash_map()->data(), hint->hash_map()->size()); + } + return hr; } @@ -266,7 +279,7 @@ namespace msg::fbuf::p2pmsg return map; } - void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map &fs_entries, const flatbuffers::Vector> *fhashes) + void flatbuf_hpfsfshashentries_to_hpfsfshashentries(std::vector &fs_entries, const flatbuffers::Vector> *fhashes) { for (const HpfsFSHashEntry *f_hash : *fhashes) { @@ -274,8 +287,9 @@ namespace msg::fbuf::p2pmsg entry.name = flatbuf_str_to_sv(f_hash->name()); entry.is_file = f_hash->is_file(); entry.hash = flatbuf_bytes_to_hash(f_hash->hash()); + entry.response_type = (p2p::HPFS_FS_ENTRY_RESPONSE_TYPE)f_hash->response_type(); - fs_entries.emplace(entry.name, std::move(entry)); + fs_entries.push_back(std::move(entry)); } } @@ -409,6 +423,33 @@ namespace msg::fbuf::p2pmsg } void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr) + { + if (!hr.is_file) // Dir fs entry request. + { + const auto hint = CreateHpfsFsEntryHint( + builder, + hpfsfshashentry_to_flatbuf_hpfsfshashentry(builder, hr.fs_entry_hints)); + + create_hpfs_request_msg(builder, hr, HpfsRequestHint_HpfsFsEntryHint, hint.Union()); + } + else if (hr.is_file && hr.block_id == -1) // File hash map request. + { + std::string_view hashmap_sv(reinterpret_cast(hr.file_hashmap_hints.data()), hr.file_hashmap_hints.size() * sizeof(util::h32)); + + const auto hint = CreateHpfsFileHashMapHint( + builder, + sv_to_flatbuf_bytes(builder, hashmap_sv)); + + create_hpfs_request_msg(builder, hr, HpfsRequestHint_HpfsFileHashMapHint, hint.Union()); + } + else + { + create_hpfs_request_msg(builder, hr); + } + } + + void create_hpfs_request_msg(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr, + msg::fbuf::p2pmsg::HpfsRequestHint hint_type, flatbuffers::Offset hint) { const auto msg = CreateHpfsRequestMsg( builder, @@ -416,7 +457,9 @@ namespace msg::fbuf::p2pmsg sv_to_flatbuf_str(builder, hr.parent_path), hr.is_file, hr.block_id, - hash_to_flatbuf_bytes(builder, hr.expected_hash)); + hash_to_flatbuf_bytes(builder, hr.expected_hash), + hint_type, + hint); create_p2p_msg(builder, P2PMsgContent_HpfsRequestMsg, msg.Union()); } @@ -443,12 +486,12 @@ namespace msg::fbuf::p2pmsg void create_msg_from_fsentry_response( flatbuffers::FlatBufferBuilder &builder, const std::string_view path, const uint32_t mount_id, const mode_t dir_mode, - std::vector &hash_nodes, const util::h32 &expected_hash) + std::vector &fs_entries, const util::h32 &expected_hash) { const auto child_msg = CreateHpfsFsEntryResponse( builder, dir_mode, - hpfsfshashentry_to_flatbuf_hpfsfshashentry(builder, hash_nodes)); + hpfsfshashentry_to_flatbuf_hpfsfshashentry(builder, fs_entries)); const auto msg = CreateHpfsResponseMsg( builder, @@ -463,7 +506,8 @@ namespace msg::fbuf::p2pmsg void create_msg_from_filehashmap_response( flatbuffers::FlatBufferBuilder &builder, std::string_view path, const uint32_t mount_id, - std::vector &hashmap, const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash) + const std::vector &hashmap, const std::vector &responded_block_ids, + const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash) { std::string_view hashmap_sv(reinterpret_cast(hashmap.data()), hashmap.size() * sizeof(util::h32)); @@ -471,7 +515,8 @@ namespace msg::fbuf::p2pmsg builder, file_length, file_mode, - sv_to_flatbuf_bytes(builder, hashmap_sv)); + sv_to_flatbuf_bytes(builder, hashmap_sv), + builder.CreateVector(responded_block_ids)); const auto msg = CreateHpfsResponseMsg( builder, @@ -484,17 +529,18 @@ namespace msg::fbuf::p2pmsg create_p2p_msg(builder, P2PMsgContent_HpfsResponseMsg, msg.Union()); } - void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, p2p::block_response &block_resp, const uint32_t mount_id) + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, const uint32_t block_id, const std::vector &block_data, + const util::h32 &block_hash, std::string_view parent_path, const uint32_t mount_id) { const auto child_msg = CreateHpfsBlockResponse( builder, - block_resp.block_id, - sv_to_flatbuf_bytes(builder, block_resp.data)); + block_id, + builder.CreateVector(block_data)); const auto msg = CreateHpfsResponseMsg( builder, - hash_to_flatbuf_bytes(builder, block_resp.hash), - sv_to_flatbuf_str(builder, block_resp.path), + hash_to_flatbuf_bytes(builder, block_hash), + sv_to_flatbuf_str(builder, parent_path), mount_id, HpfsResponse_HpfsBlockResponse, child_msg.Union()); @@ -562,19 +608,18 @@ namespace msg::fbuf::p2pmsg } const flatbuffers::Offset>> - hpfsfshashentry_to_flatbuf_hpfsfshashentry( - flatbuffers::FlatBufferBuilder &builder, - std::vector &hash_nodes) + hpfsfshashentry_to_flatbuf_hpfsfshashentry(flatbuffers::FlatBufferBuilder &builder, const std::vector &fs_entries) { std::vector> fbvec; - fbvec.reserve(hash_nodes.size()); - for (auto const &hash_node : hash_nodes) + fbvec.reserve(fs_entries.size()); + for (auto const &fs_entry : fs_entries) { flatbuffers::Offset hpfs_fs_entry = CreateHpfsFSHashEntry( builder, - sv_to_flatbuf_str(builder, hash_node.name), - hash_node.is_file, - hash_to_flatbuf_bytes(builder, hash_node.hash)); + sv_to_flatbuf_str(builder, fs_entry.name), + fs_entry.is_file, + hash_to_flatbuf_bytes(builder, fs_entry.hash), + (HpfsFsEntryResponseType)fs_entry.response_type); fbvec.push_back(hpfs_fs_entry); } diff --git a/src/msg/fbuf/p2pmsg_conversion.hpp b/src/msg/fbuf/p2pmsg_conversion.hpp index b49db197..ab670015 100644 --- a/src/msg/fbuf/p2pmsg_conversion.hpp +++ b/src/msg/fbuf/p2pmsg_conversion.hpp @@ -3,7 +3,6 @@ #include "../../pchheader.hpp" #include "../../p2p/p2p.hpp" -#include "../../hpfs/hpfs_mount.hpp" #include "p2pmsg_generated.h" namespace msg::fbuf::p2pmsg @@ -48,7 +47,7 @@ namespace msg::fbuf::p2pmsg const std::unordered_map> flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector> *fbvec); - void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map &fs_entries, const flatbuffers::Vector> *fhashes); + void flatbuf_hpfsfshashentries_to_hpfsfshashentries(std::vector &fs_entries, const flatbuffers::Vector> *fhashes); const std::vector flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector> *fbvec); @@ -73,19 +72,24 @@ namespace msg::fbuf::p2pmsg void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr); + void create_hpfs_request_msg(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr, + msg::fbuf::p2pmsg::HpfsRequestHint hint_type = HpfsRequestHint_NONE, flatbuffers::Offset hint = 0); + void create_msg_from_hpfs_log_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_request &hpfs_log_request); void create_msg_from_hpfs_log_response(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_response &hpfs_log_response); void create_msg_from_fsentry_response( flatbuffers::FlatBufferBuilder &builder, const std::string_view path, const uint32_t mount_id, const mode_t dir_mode, - std::vector &hash_nodes, const util::h32 &expected_hash); + std::vector &fs_entries, const util::h32 &expected_hash); void create_msg_from_filehashmap_response( flatbuffers::FlatBufferBuilder &builder, std::string_view path, const uint32_t mount_id, - std::vector &hashmap, const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash); + const std::vector &hashmap, const std::vector &responded_block_ids, + const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash); - void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, p2p::block_response &block_resp, const uint32_t mount_id); + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, const uint32_t block_id, const std::vector &block_data, + const util::h32 &block_hash, std::string_view parent_path, const uint32_t mount_id); void create_msg_from_peer_requirement_announcement(flatbuffers::FlatBufferBuilder &builder, const bool need_consensus_msg_forwarding); @@ -99,9 +103,7 @@ namespace msg::fbuf::p2pmsg user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); const flatbuffers::Offset>> - hpfsfshashentry_to_flatbuf_hpfsfshashentry( - flatbuffers::FlatBufferBuilder &builder, - std::vector &hash_nodes); + hpfsfshashentry_to_flatbuf_hpfsfshashentry(flatbuffers::FlatBufferBuilder &builder, const std::vector &fs_entries); const flatbuffers::Offset>> peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index c86f7108..d5ea79c4 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -34,6 +34,15 @@ struct ProposalMsgBuilder; struct NplMsg; struct NplMsgBuilder; +struct HpfsFSHashEntry; +struct HpfsFSHashEntryBuilder; + +struct HpfsFsEntryHint; +struct HpfsFsEntryHintBuilder; + +struct HpfsFileHashMapHint; +struct HpfsFileHashMapHintBuilder; + struct HpfsRequestMsg; struct HpfsRequestMsgBuilder; @@ -49,9 +58,6 @@ struct HpfsFileHashMapResponseBuilder; struct HpfsBlockResponse; struct HpfsBlockResponseBuilder; -struct HpfsFSHashEntry; -struct HpfsFSHashEntryBuilder; - struct HpfsLogRequest; struct HpfsLogRequestBuilder; @@ -204,21 +210,105 @@ template<> struct P2PMsgContentTraits { bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type); bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); +enum HpfsFsEntryResponseType { + HpfsFsEntryResponseType_Matched = 0, + HpfsFsEntryResponseType_Mismatched = 1, + HpfsFsEntryResponseType_Responded = 2, + HpfsFsEntryResponseType_NotAvailable = 3, + HpfsFsEntryResponseType_MIN = HpfsFsEntryResponseType_Matched, + HpfsFsEntryResponseType_MAX = HpfsFsEntryResponseType_NotAvailable +}; + +inline const HpfsFsEntryResponseType (&EnumValuesHpfsFsEntryResponseType())[4] { + static const HpfsFsEntryResponseType values[] = { + HpfsFsEntryResponseType_Matched, + HpfsFsEntryResponseType_Mismatched, + HpfsFsEntryResponseType_Responded, + HpfsFsEntryResponseType_NotAvailable + }; + return values; +} + +inline const char * const *EnumNamesHpfsFsEntryResponseType() { + static const char * const names[5] = { + "Matched", + "Mismatched", + "Responded", + "NotAvailable", + nullptr + }; + return names; +} + +inline const char *EnumNameHpfsFsEntryResponseType(HpfsFsEntryResponseType e) { + if (flatbuffers::IsOutRange(e, HpfsFsEntryResponseType_Matched, HpfsFsEntryResponseType_NotAvailable)) return ""; + const size_t index = static_cast(e); + return EnumNamesHpfsFsEntryResponseType()[index]; +} + +enum HpfsRequestHint { + HpfsRequestHint_NONE = 0, + HpfsRequestHint_HpfsFsEntryHint = 1, + HpfsRequestHint_HpfsFileHashMapHint = 2, + HpfsRequestHint_MIN = HpfsRequestHint_NONE, + HpfsRequestHint_MAX = HpfsRequestHint_HpfsFileHashMapHint +}; + +inline const HpfsRequestHint (&EnumValuesHpfsRequestHint())[3] { + static const HpfsRequestHint values[] = { + HpfsRequestHint_NONE, + HpfsRequestHint_HpfsFsEntryHint, + HpfsRequestHint_HpfsFileHashMapHint + }; + return values; +} + +inline const char * const *EnumNamesHpfsRequestHint() { + static const char * const names[4] = { + "NONE", + "HpfsFsEntryHint", + "HpfsFileHashMapHint", + nullptr + }; + return names; +} + +inline const char *EnumNameHpfsRequestHint(HpfsRequestHint e) { + if (flatbuffers::IsOutRange(e, HpfsRequestHint_NONE, HpfsRequestHint_HpfsFileHashMapHint)) return ""; + const size_t index = static_cast(e); + return EnumNamesHpfsRequestHint()[index]; +} + +template struct HpfsRequestHintTraits { + static const HpfsRequestHint enum_value = HpfsRequestHint_NONE; +}; + +template<> struct HpfsRequestHintTraits { + static const HpfsRequestHint enum_value = HpfsRequestHint_HpfsFsEntryHint; +}; + +template<> struct HpfsRequestHintTraits { + static const HpfsRequestHint enum_value = HpfsRequestHint_HpfsFileHashMapHint; +}; + +bool VerifyHpfsRequestHint(flatbuffers::Verifier &verifier, const void *obj, HpfsRequestHint type); +bool VerifyHpfsRequestHintVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); + enum HpfsResponse { HpfsResponse_NONE = 0, - HpfsResponse_HpfsFileHashMapResponse = 1, - HpfsResponse_HpfsBlockResponse = 2, - HpfsResponse_HpfsFsEntryResponse = 3, + HpfsResponse_HpfsFsEntryResponse = 1, + HpfsResponse_HpfsFileHashMapResponse = 2, + HpfsResponse_HpfsBlockResponse = 3, HpfsResponse_MIN = HpfsResponse_NONE, - HpfsResponse_MAX = HpfsResponse_HpfsFsEntryResponse + HpfsResponse_MAX = HpfsResponse_HpfsBlockResponse }; inline const HpfsResponse (&EnumValuesHpfsResponse())[4] { static const HpfsResponse values[] = { HpfsResponse_NONE, + HpfsResponse_HpfsFsEntryResponse, HpfsResponse_HpfsFileHashMapResponse, - HpfsResponse_HpfsBlockResponse, - HpfsResponse_HpfsFsEntryResponse + HpfsResponse_HpfsBlockResponse }; return values; } @@ -226,16 +316,16 @@ inline const HpfsResponse (&EnumValuesHpfsResponse())[4] { inline const char * const *EnumNamesHpfsResponse() { static const char * const names[5] = { "NONE", + "HpfsFsEntryResponse", "HpfsFileHashMapResponse", "HpfsBlockResponse", - "HpfsFsEntryResponse", nullptr }; return names; } inline const char *EnumNameHpfsResponse(HpfsResponse e) { - if (flatbuffers::IsOutRange(e, HpfsResponse_NONE, HpfsResponse_HpfsFsEntryResponse)) return ""; + if (flatbuffers::IsOutRange(e, HpfsResponse_NONE, HpfsResponse_HpfsBlockResponse)) return ""; const size_t index = static_cast(e); return EnumNamesHpfsResponse()[index]; } @@ -244,6 +334,10 @@ template struct HpfsResponseTraits { static const HpfsResponse enum_value = HpfsResponse_NONE; }; +template<> struct HpfsResponseTraits { + static const HpfsResponse enum_value = HpfsResponse_HpfsFsEntryResponse; +}; + template<> struct HpfsResponseTraits { static const HpfsResponse enum_value = HpfsResponse_HpfsFileHashMapResponse; }; @@ -252,10 +346,6 @@ template<> struct HpfsResponseTraits { static const HpfsResponse enum_value = HpfsResponse_HpfsBlockResponse; }; -template<> struct HpfsResponseTraits { - static const HpfsResponse enum_value = HpfsResponse_HpfsFsEntryResponse; -}; - bool VerifyHpfsResponse(flatbuffers::Verifier &verifier, const void *obj, HpfsResponse type); bool VerifyHpfsResponseVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); @@ -1231,6 +1321,219 @@ inline flatbuffers::Offset CreateNplMsgDirect( lcl_id); } +struct HpfsFSHashEntry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef HpfsFSHashEntryBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_NAME = 4, + VT_IS_FILE = 6, + VT_HASH = 8, + VT_RESPONSE_TYPE = 10 + }; + const flatbuffers::String *name() const { + return GetPointer(VT_NAME); + } + flatbuffers::String *mutable_name() { + return GetPointer(VT_NAME); + } + bool is_file() const { + return GetField(VT_IS_FILE, 0) != 0; + } + bool mutate_is_file(bool _is_file) { + return SetField(VT_IS_FILE, static_cast(_is_file), 0); + } + const flatbuffers::Vector *hash() const { + return GetPointer *>(VT_HASH); + } + flatbuffers::Vector *mutable_hash() { + return GetPointer *>(VT_HASH); + } + msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type() const { + return static_cast(GetField(VT_RESPONSE_TYPE, 0)); + } + bool mutate_response_type(msg::fbuf::p2pmsg::HpfsFsEntryResponseType _response_type) { + return SetField(VT_RESPONSE_TYPE, static_cast(_response_type), 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_NAME) && + verifier.VerifyString(name()) && + VerifyField(verifier, VT_IS_FILE) && + VerifyOffset(verifier, VT_HASH) && + verifier.VerifyVector(hash()) && + VerifyField(verifier, VT_RESPONSE_TYPE) && + verifier.EndTable(); + } +}; + +struct HpfsFSHashEntryBuilder { + typedef HpfsFSHashEntry Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_name(flatbuffers::Offset name) { + fbb_.AddOffset(HpfsFSHashEntry::VT_NAME, name); + } + void add_is_file(bool is_file) { + fbb_.AddElement(HpfsFSHashEntry::VT_IS_FILE, static_cast(is_file), 0); + } + void add_hash(flatbuffers::Offset> hash) { + fbb_.AddOffset(HpfsFSHashEntry::VT_HASH, hash); + } + void add_response_type(msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type) { + fbb_.AddElement(HpfsFSHashEntry::VT_RESPONSE_TYPE, static_cast(response_type), 0); + } + explicit HpfsFSHashEntryBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHpfsFSHashEntry( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset name = 0, + bool is_file = false, + flatbuffers::Offset> hash = 0, + msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type = msg::fbuf::p2pmsg::HpfsFsEntryResponseType_Matched) { + HpfsFSHashEntryBuilder builder_(_fbb); + builder_.add_hash(hash); + builder_.add_name(name); + builder_.add_response_type(response_type); + builder_.add_is_file(is_file); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateHpfsFSHashEntryDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const char *name = nullptr, + bool is_file = false, + const std::vector *hash = nullptr, + msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type = msg::fbuf::p2pmsg::HpfsFsEntryResponseType_Matched) { + auto name__ = name ? _fbb.CreateString(name) : 0; + auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; + return msg::fbuf::p2pmsg::CreateHpfsFSHashEntry( + _fbb, + name__, + is_file, + hash__, + response_type); +} + +struct HpfsFsEntryHint FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef HpfsFsEntryHintBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_ENTRIES = 4 + }; + const flatbuffers::Vector> *entries() const { + return GetPointer> *>(VT_ENTRIES); + } + flatbuffers::Vector> *mutable_entries() { + return GetPointer> *>(VT_ENTRIES); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_ENTRIES) && + verifier.VerifyVector(entries()) && + verifier.VerifyVectorOfTables(entries()) && + verifier.EndTable(); + } +}; + +struct HpfsFsEntryHintBuilder { + typedef HpfsFsEntryHint Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_entries(flatbuffers::Offset>> entries) { + fbb_.AddOffset(HpfsFsEntryHint::VT_ENTRIES, entries); + } + explicit HpfsFsEntryHintBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + HpfsFsEntryHintBuilder &operator=(const HpfsFsEntryHintBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHpfsFsEntryHint( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset>> entries = 0) { + HpfsFsEntryHintBuilder builder_(_fbb); + builder_.add_entries(entries); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateHpfsFsEntryHintDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector> *entries = nullptr) { + auto entries__ = entries ? _fbb.CreateVector>(*entries) : 0; + return msg::fbuf::p2pmsg::CreateHpfsFsEntryHint( + _fbb, + entries__); +} + +struct HpfsFileHashMapHint FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef HpfsFileHashMapHintBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_HASH_MAP = 4 + }; + const flatbuffers::Vector *hash_map() const { + return GetPointer *>(VT_HASH_MAP); + } + flatbuffers::Vector *mutable_hash_map() { + return GetPointer *>(VT_HASH_MAP); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_HASH_MAP) && + verifier.VerifyVector(hash_map()) && + verifier.EndTable(); + } +}; + +struct HpfsFileHashMapHintBuilder { + typedef HpfsFileHashMapHint Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_hash_map(flatbuffers::Offset> hash_map) { + fbb_.AddOffset(HpfsFileHashMapHint::VT_HASH_MAP, hash_map); + } + explicit HpfsFileHashMapHintBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + HpfsFileHashMapHintBuilder &operator=(const HpfsFileHashMapHintBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHpfsFileHashMapHint( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> hash_map = 0) { + HpfsFileHashMapHintBuilder builder_(_fbb); + builder_.add_hash_map(hash_map); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateHpfsFileHashMapHintDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *hash_map = nullptr) { + auto hash_map__ = hash_map ? _fbb.CreateVector(*hash_map) : 0; + return msg::fbuf::p2pmsg::CreateHpfsFileHashMapHint( + _fbb, + hash_map__); +} + struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef HpfsRequestMsgBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { @@ -1238,7 +1541,9 @@ struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_PARENT_PATH = 6, VT_IS_FILE = 8, VT_BLOCK_ID = 10, - VT_EXPECTED_HASH = 12 + VT_EXPECTED_HASH = 12, + VT_HINT_TYPE = 14, + VT_HINT = 16 }; uint32_t mount_id() const { return GetField(VT_MOUNT_ID, 0); @@ -1270,6 +1575,22 @@ struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_expected_hash() { return GetPointer *>(VT_EXPECTED_HASH); } + msg::fbuf::p2pmsg::HpfsRequestHint hint_type() const { + return static_cast(GetField(VT_HINT_TYPE, 0)); + } + const void *hint() const { + return GetPointer(VT_HINT); + } + template const T *hint_as() const; + const msg::fbuf::p2pmsg::HpfsFsEntryHint *hint_as_HpfsFsEntryHint() const { + return hint_type() == msg::fbuf::p2pmsg::HpfsRequestHint_HpfsFsEntryHint ? static_cast(hint()) : nullptr; + } + const msg::fbuf::p2pmsg::HpfsFileHashMapHint *hint_as_HpfsFileHashMapHint() const { + return hint_type() == msg::fbuf::p2pmsg::HpfsRequestHint_HpfsFileHashMapHint ? static_cast(hint()) : nullptr; + } + void *mutable_hint() { + return GetPointer(VT_HINT); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_MOUNT_ID) && @@ -1279,10 +1600,21 @@ struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyField(verifier, VT_BLOCK_ID) && VerifyOffset(verifier, VT_EXPECTED_HASH) && verifier.VerifyVector(expected_hash()) && + VerifyField(verifier, VT_HINT_TYPE) && + VerifyOffset(verifier, VT_HINT) && + VerifyHpfsRequestHint(verifier, hint(), hint_type()) && verifier.EndTable(); } }; +template<> inline const msg::fbuf::p2pmsg::HpfsFsEntryHint *HpfsRequestMsg::hint_as() const { + return hint_as_HpfsFsEntryHint(); +} + +template<> inline const msg::fbuf::p2pmsg::HpfsFileHashMapHint *HpfsRequestMsg::hint_as() const { + return hint_as_HpfsFileHashMapHint(); +} + struct HpfsRequestMsgBuilder { typedef HpfsRequestMsg Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -1302,6 +1634,12 @@ struct HpfsRequestMsgBuilder { void add_expected_hash(flatbuffers::Offset> expected_hash) { fbb_.AddOffset(HpfsRequestMsg::VT_EXPECTED_HASH, expected_hash); } + void add_hint_type(msg::fbuf::p2pmsg::HpfsRequestHint hint_type) { + fbb_.AddElement(HpfsRequestMsg::VT_HINT_TYPE, static_cast(hint_type), 0); + } + void add_hint(flatbuffers::Offset hint) { + fbb_.AddOffset(HpfsRequestMsg::VT_HINT, hint); + } explicit HpfsRequestMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -1320,12 +1658,16 @@ inline flatbuffers::Offset CreateHpfsRequestMsg( flatbuffers::Offset parent_path = 0, bool is_file = false, int32_t block_id = 0, - flatbuffers::Offset> expected_hash = 0) { + flatbuffers::Offset> expected_hash = 0, + msg::fbuf::p2pmsg::HpfsRequestHint hint_type = msg::fbuf::p2pmsg::HpfsRequestHint_NONE, + flatbuffers::Offset hint = 0) { HpfsRequestMsgBuilder builder_(_fbb); + builder_.add_hint(hint); builder_.add_expected_hash(expected_hash); builder_.add_block_id(block_id); builder_.add_parent_path(parent_path); builder_.add_mount_id(mount_id); + builder_.add_hint_type(hint_type); builder_.add_is_file(is_file); return builder_.Finish(); } @@ -1336,7 +1678,9 @@ inline flatbuffers::Offset CreateHpfsRequestMsgDirect( const char *parent_path = nullptr, bool is_file = false, int32_t block_id = 0, - const std::vector *expected_hash = nullptr) { + const std::vector *expected_hash = nullptr, + msg::fbuf::p2pmsg::HpfsRequestHint hint_type = msg::fbuf::p2pmsg::HpfsRequestHint_NONE, + flatbuffers::Offset hint = 0) { auto parent_path__ = parent_path ? _fbb.CreateString(parent_path) : 0; auto expected_hash__ = expected_hash ? _fbb.CreateVector(*expected_hash) : 0; return msg::fbuf::p2pmsg::CreateHpfsRequestMsg( @@ -1345,7 +1689,9 @@ inline flatbuffers::Offset CreateHpfsRequestMsgDirect( parent_path__, is_file, block_id, - expected_hash__); + expected_hash__, + hint_type, + hint); } struct HpfsResponseMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { @@ -1382,15 +1728,15 @@ struct HpfsResponseMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { return GetPointer(VT_CONTENT); } template const T *content_as() const; + const msg::fbuf::p2pmsg::HpfsFsEntryResponse *content_as_HpfsFsEntryResponse() const { + return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsFsEntryResponse ? static_cast(content()) : nullptr; + } const msg::fbuf::p2pmsg::HpfsFileHashMapResponse *content_as_HpfsFileHashMapResponse() const { return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsFileHashMapResponse ? static_cast(content()) : nullptr; } const msg::fbuf::p2pmsg::HpfsBlockResponse *content_as_HpfsBlockResponse() const { return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsBlockResponse ? static_cast(content()) : nullptr; } - const msg::fbuf::p2pmsg::HpfsFsEntryResponse *content_as_HpfsFsEntryResponse() const { - return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsFsEntryResponse ? static_cast(content()) : nullptr; - } void *mutable_content() { return GetPointer(VT_CONTENT); } @@ -1408,6 +1754,10 @@ struct HpfsResponseMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } }; +template<> inline const msg::fbuf::p2pmsg::HpfsFsEntryResponse *HpfsResponseMsg::content_as() const { + return content_as_HpfsFsEntryResponse(); +} + template<> inline const msg::fbuf::p2pmsg::HpfsFileHashMapResponse *HpfsResponseMsg::content_as() const { return content_as_HpfsFileHashMapResponse(); } @@ -1416,10 +1766,6 @@ template<> inline const msg::fbuf::p2pmsg::HpfsBlockResponse *HpfsResponseMsg::c return content_as_HpfsBlockResponse(); } -template<> inline const msg::fbuf::p2pmsg::HpfsFsEntryResponse *HpfsResponseMsg::content_as() const { - return content_as_HpfsFsEntryResponse(); -} - struct HpfsResponseMsgBuilder { typedef HpfsResponseMsg Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -1561,7 +1907,8 @@ struct HpfsFileHashMapResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Ta enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_FILE_LENGTH = 4, VT_FILE_MODE = 6, - VT_HASH_MAP = 8 + VT_HASH_MAP = 8, + VT_RESPONDED_BLOCK_IDS = 10 }; uint64_t file_length() const { return GetField(VT_FILE_LENGTH, 0); @@ -1581,12 +1928,20 @@ struct HpfsFileHashMapResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Ta flatbuffers::Vector *mutable_hash_map() { return GetPointer *>(VT_HASH_MAP); } + const flatbuffers::Vector *responded_block_ids() const { + return GetPointer *>(VT_RESPONDED_BLOCK_IDS); + } + flatbuffers::Vector *mutable_responded_block_ids() { + return GetPointer *>(VT_RESPONDED_BLOCK_IDS); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_FILE_LENGTH) && VerifyField(verifier, VT_FILE_MODE) && VerifyOffset(verifier, VT_HASH_MAP) && verifier.VerifyVector(hash_map()) && + VerifyOffset(verifier, VT_RESPONDED_BLOCK_IDS) && + verifier.VerifyVector(responded_block_ids()) && verifier.EndTable(); } }; @@ -1604,6 +1959,9 @@ struct HpfsFileHashMapResponseBuilder { void add_hash_map(flatbuffers::Offset> hash_map) { fbb_.AddOffset(HpfsFileHashMapResponse::VT_HASH_MAP, hash_map); } + void add_responded_block_ids(flatbuffers::Offset> responded_block_ids) { + fbb_.AddOffset(HpfsFileHashMapResponse::VT_RESPONDED_BLOCK_IDS, responded_block_ids); + } explicit HpfsFileHashMapResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -1620,9 +1978,11 @@ inline flatbuffers::Offset CreateHpfsFileHashMapRespons flatbuffers::FlatBufferBuilder &_fbb, uint64_t file_length = 0, uint32_t file_mode = 0, - flatbuffers::Offset> hash_map = 0) { + flatbuffers::Offset> hash_map = 0, + flatbuffers::Offset> responded_block_ids = 0) { HpfsFileHashMapResponseBuilder builder_(_fbb); builder_.add_file_length(file_length); + builder_.add_responded_block_ids(responded_block_ids); builder_.add_hash_map(hash_map); builder_.add_file_mode(file_mode); return builder_.Finish(); @@ -1632,13 +1992,16 @@ inline flatbuffers::Offset CreateHpfsFileHashMapRespons flatbuffers::FlatBufferBuilder &_fbb, uint64_t file_length = 0, uint32_t file_mode = 0, - const std::vector *hash_map = nullptr) { + const std::vector *hash_map = nullptr, + const std::vector *responded_block_ids = nullptr) { auto hash_map__ = hash_map ? _fbb.CreateVector(*hash_map) : 0; + auto responded_block_ids__ = responded_block_ids ? _fbb.CreateVector(*responded_block_ids) : 0; return msg::fbuf::p2pmsg::CreateHpfsFileHashMapResponse( _fbb, file_length, file_mode, - hash_map__); + hash_map__, + responded_block_ids__); } struct HpfsBlockResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { @@ -1711,93 +2074,6 @@ inline flatbuffers::Offset CreateHpfsBlockResponseDirect( data__); } -struct HpfsFSHashEntry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef HpfsFSHashEntryBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_NAME = 4, - VT_IS_FILE = 6, - VT_HASH = 8 - }; - const flatbuffers::String *name() const { - return GetPointer(VT_NAME); - } - flatbuffers::String *mutable_name() { - return GetPointer(VT_NAME); - } - bool is_file() const { - return GetField(VT_IS_FILE, 0) != 0; - } - bool mutate_is_file(bool _is_file) { - return SetField(VT_IS_FILE, static_cast(_is_file), 0); - } - const flatbuffers::Vector *hash() const { - return GetPointer *>(VT_HASH); - } - flatbuffers::Vector *mutable_hash() { - return GetPointer *>(VT_HASH); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_NAME) && - verifier.VerifyString(name()) && - VerifyField(verifier, VT_IS_FILE) && - VerifyOffset(verifier, VT_HASH) && - verifier.VerifyVector(hash()) && - verifier.EndTable(); - } -}; - -struct HpfsFSHashEntryBuilder { - typedef HpfsFSHashEntry Table; - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_name(flatbuffers::Offset name) { - fbb_.AddOffset(HpfsFSHashEntry::VT_NAME, name); - } - void add_is_file(bool is_file) { - fbb_.AddElement(HpfsFSHashEntry::VT_IS_FILE, static_cast(is_file), 0); - } - void add_hash(flatbuffers::Offset> hash) { - fbb_.AddOffset(HpfsFSHashEntry::VT_HASH, hash); - } - explicit HpfsFSHashEntryBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &); - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateHpfsFSHashEntry( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset name = 0, - bool is_file = false, - flatbuffers::Offset> hash = 0) { - HpfsFSHashEntryBuilder builder_(_fbb); - builder_.add_hash(hash); - builder_.add_name(name); - builder_.add_is_file(is_file); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateHpfsFSHashEntryDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const char *name = nullptr, - bool is_file = false, - const std::vector *hash = nullptr) { - auto name__ = name ? _fbb.CreateString(name) : 0; - auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; - return msg::fbuf::p2pmsg::CreateHpfsFSHashEntry( - _fbb, - name__, - is_file, - hash__); -} - struct HpfsLogRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef HpfsLogRequestBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { @@ -2415,11 +2691,44 @@ inline bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const fla return true; } +inline bool VerifyHpfsRequestHint(flatbuffers::Verifier &verifier, const void *obj, HpfsRequestHint type) { + switch (type) { + case HpfsRequestHint_NONE: { + return true; + } + case HpfsRequestHint_HpfsFsEntryHint: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + case HpfsRequestHint_HpfsFileHashMapHint: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + default: return true; + } +} + +inline bool VerifyHpfsRequestHintVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types) { + if (!values || !types) return !values && !types; + if (values->size() != types->size()) return false; + for (flatbuffers::uoffset_t i = 0; i < values->size(); ++i) { + if (!VerifyHpfsRequestHint( + verifier, values->Get(i), types->GetEnum(i))) { + return false; + } + } + return true; +} + inline bool VerifyHpfsResponse(flatbuffers::Verifier &verifier, const void *obj, HpfsResponse type) { switch (type) { case HpfsResponse_NONE: { return true; } + case HpfsResponse_HpfsFsEntryResponse: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } case HpfsResponse_HpfsFileHashMapResponse: { auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); @@ -2428,10 +2737,6 @@ inline bool VerifyHpfsResponse(flatbuffers::Verifier &verifier, const void *obj, auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } - case HpfsResponse_HpfsFsEntryResponse: { - auto ptr = reinterpret_cast(obj); - return verifier.VerifyTable(ptr); - } default: return true; } } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 4ab1d2ae..75929807 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -5,6 +5,7 @@ #include "../usr/user_input.hpp" #include "../util/h32.hpp" #include "../conf.hpp" +#include "../hpfs/hpfs_mount.hpp" #include "../msg/fbuf/p2pmsg_generated.h" #include "peer_comm_server.hpp" #include "peer_comm_session.hpp" @@ -122,16 +123,6 @@ namespace p2p std::string data; }; - // Represents a hpfs request sent to a peer. - struct hpfs_request - { - uint32_t mount_id; // Relavent file system id. - std::string parent_path; // The requested file or dir path. - bool is_file = false; // Whether the path is a file or dir. - int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1. - util::h32 expected_hash; // The expected hash of the requested result. - }; - // Represents hpfs log sync request. struct hpfs_log_request { @@ -146,12 +137,24 @@ namespace p2p std::vector log_record_bytes; }; + enum HPFS_FS_ENTRY_RESPONSE_TYPE + { + MATCHED = 0, // The entry matches between requester and responder. No sync needed. + MISMATCHED = 1, // The entry does not match (either hash mismatch or new entry). Requester must request for this entry. + RESPONDED = 2, // The entry does not match and the repsonder has dispatched the sync response. + NOT_AVAILABLE = 3 // The entry does not exist on responder side. Requester must delete this on his side. + }; + // Represents hpfs file system entry. struct hpfs_fs_hash_entry { std::string name; // Name of the file/dir. bool is_file = false; // Whether this is a file or dir. util::h32 hash; // Hash of the file or dir. + + // Only relevant for hpfs responses. Indicates about the availabilty and status of this + // fs entry as reported by the responder. + HPFS_FS_ENTRY_RESPONSE_TYPE response_type = HPFS_FS_ENTRY_RESPONSE_TYPE::MATCHED; }; // Represents a file block data resposne. @@ -163,6 +166,18 @@ namespace p2p util::h32 hash; // Hash of the bloc data. }; + // Represents a hpfs request sent to a peer. + struct hpfs_request + { + uint32_t mount_id = 0; // Relavent file system id. + std::string parent_path; // The requested file or dir path. + bool is_file = false; // Whether the path is a file or dir. + int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1. + util::h32 expected_hash; // The expected hash of the requested result. + std::vector fs_entry_hints; // Included fs entry entry hints for the responder. + std::vector file_hashmap_hints; // Included file hash map hints for the responder. + }; + struct peer_message_info { const msg::fbuf::p2pmsg::P2PMsg *p2p_msg = NULL;