From b77a3fc924cb5f5b59d366cedcf23288196fa194 Mon Sep 17 00:00:00 2001 From: Chalith Desaman Date: Thu, 10 Dec 2020 17:32:18 +0530 Subject: [PATCH] Validate peer state response content against the hash. (#190) --- src/hpfs/hpfs.hpp | 3 + src/state/state_sync.cpp | 191 ++++++++++++++++++++++++++++++--------- src/state/state_sync.hpp | 13 ++- 3 files changed, 162 insertions(+), 45 deletions(-) diff --git a/src/hpfs/hpfs.hpp b/src/hpfs/hpfs.hpp index 96028d53..332b7d03 100644 --- a/src/hpfs/hpfs.hpp +++ b/src/hpfs/hpfs.hpp @@ -18,6 +18,9 @@ namespace hpfs } }; + // File block size; + constexpr size_t BLOCK_SIZE = 4194304; // 4MB + int start_merge_process(pid_t &hpfs_pid); int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly, const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout = 4000); diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp index d5c88e1a..9895479f 100644 --- a/src/state/state_sync.cpp +++ b/src/state/state_sync.cpp @@ -194,18 +194,63 @@ namespace state_sync continue; } - // Now that we have received matching hash, remove it from the waiting list. - ctx.submitted_requests.erase(pending_resp_itr); - // Process the message based on response type. const msg::fbuf::p2pmsg::State_Response msg_type = resp_msg->state_response_type(); if (msg_type == msg::fbuf::p2pmsg::State_Response_Fs_Entry_Response) - handle_fs_entry_response(vpath, resp_msg->state_response_as_Fs_Entry_Response()); + { + const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_resp = resp_msg->state_response_as_Fs_Entry_Response(); + + // Get fs entries we have received. + std::unordered_map peer_fs_entry_map; + msg::fbuf::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(peer_fs_entry_map, fs_resp->entries()); + + // Validate received fs data against the hash. + if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map)) + { + LOG_INFO << "State sync: Skipping state response due to fs entry hash mismatch."; + continue; + } + + handle_fs_entry_response(vpath, peer_fs_entry_map); + } else if (msg_type == msg::fbuf::p2pmsg::State_Response_File_HashMap_Response) - handle_file_hashmap_response(vpath, resp_msg->state_response_as_File_HashMap_Response()); + { + const msg::fbuf::p2pmsg::File_HashMap_Response *file_resp = resp_msg->state_response_as_File_HashMap_Response(); + + // File block hashes we received from the peer. + const hpfs::h32 *peer_hashes = reinterpret_cast(file_resp->hash_map()->data()); + const size_t peer_hash_count = file_resp->hash_map()->size() / sizeof(hpfs::h32); + + // Validate received hashmap against the hash. + if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count)) + { + LOG_INFO << "State sync: Skipping state response due to file hashmap hash mismatch."; + continue; + } + + handle_file_hashmap_response(vpath, peer_hashes, peer_hash_count, file_resp->file_length()); + } else if (msg_type == msg::fbuf::p2pmsg::State_Response_Block_Response) - handle_file_block_response(vpath, resp_msg->state_response_as_Block_Response()); + { + const msg::fbuf::p2pmsg::Block_Response *block_resp = resp_msg->state_response_as_Block_Response(); + + // Get the file path of the block data we have received. + const uint32_t block_id = block_resp->block_id(); + std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_resp->data()); + + // Validate received block data against the hash. + if (!validate_file_block_hash(hash, block_id, buf)) + { + LOG_INFO << "State sync: Skipping state response due to file block hash mismatch."; + continue; + } + + handle_file_block_response(vpath, block_id, buf); + } + + // Now that we have received matching hash and handled it, remove it from the waiting list. + ctx.submitted_requests.erase(pending_resp_itr); // After handling each response, check whether we have reached target state. if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/") < 1) @@ -269,6 +314,68 @@ namespace state_sync return 0; } + /** + * Vadidated the received hash against the received fs entry map. + * @param vpath Virtual path of the fs. + * @param hash Received hash. + * @param fs_entry_map Received fs entry map. + * @returns true if hash is valid, otherwise false. + */ + bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map &fs_entry_map) + { + hpfs::h32 content_hash; + + // Initilal hash is vpath hash. + content_hash = crypto::get_hash(vpath); + + // Then XOR the file hashes to the initial hash. + for (const auto &[name, fs_entry] : fs_entry_map) + { + content_hash ^= fs_entry.hash; + } + + return content_hash.to_string_view() == hash; + } + + /** + * Vadidated the received hash against the received file hash map. + * @param vpath Virtual path of the file. + * @param hash Received hash. + * @param hashes Received block hashes. + * @param hash_count Size of the hash list. + * @returns true if hash is valid, otherwise false. + */ + bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const hpfs::h32 *hashes, const size_t hash_count) + { + hpfs::h32 content_hash = hpfs::h32_empty; + + // Initilal hash is vpath hash. + content_hash = crypto::get_hash(vpath); + + // Then XOR the block hashes to the initial hash. + for (int32_t block_id = 0; block_id < hash_count; block_id++) + { + content_hash ^= hashes[block_id]; + } + + return content_hash.to_string_view() == hash; + } + + /** + * Vadidated the received hash against the received block. + * @param hash Received hash. + * @param block_id Id of the block. + * @param buf Block buffer. + * @returns true if hash is valid, otherwise false. + */ + bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf) + { + // Calculate block offset of this block. + const off_t block_offset = block_id * hpfs::BLOCK_SIZE; + std::string_view offset = std::string_view(reinterpret_cast(&block_offset), sizeof(off_t)); + return crypto::get_hash(offset, buf) == hash; + } + /** * Indicates whether to break out of state request processing loop. */ @@ -325,36 +432,35 @@ namespace state_sync /** * Process dir children response. + * @param vpath Virtual path of the fs. + * @param fs_entry_map Received fs entry map. + * @returns 0 on success, otherwise -1. */ - int handle_fs_entry_response(std::string_view parent_vpath, const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_entry_resp) + int handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map) { // Get the parent path of the fs entries we have received. - LOG_DEBUG << "State sync: Processing fs entries response for " << parent_vpath; - - // Get fs entries we have received. - std::unordered_map peer_fs_entry_map; - msg::fbuf::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(peer_fs_entry_map, fs_entry_resp->entries()); + LOG_DEBUG << "State sync: Processing fs entries response for " << vpath; // Create physical directory on our side if not exist. - std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(parent_vpath); + std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath); if (util::create_dir_tree_recursive(parent_physical_path) == -1) return -1; // Get the children hash entries and compare with what we got from peer. std::vector existing_fs_entries; - if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, parent_vpath) == -1) + if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, vpath) == -1) return -1; // Request more info on fs entries that exist on both sides but are different. for (const auto &ex_entry : existing_fs_entries) { // Construct child vpath. - std::string child_vpath = std::string(parent_vpath) - .append(parent_vpath.back() != '/' ? "/" : "") + std::string child_vpath = std::string(vpath) + .append(vpath.back() != '/' ? "/" : "") .append(ex_entry.name); - const auto peer_itr = peer_fs_entry_map.find(ex_entry.name); - if (peer_itr != peer_fs_entry_map.end()) + const auto peer_itr = fs_entry_map.find(ex_entry.name); + if (peer_itr != fs_entry_map.end()) { // Request state if hash is different. if (peer_itr->second.hash != ex_entry.hash) @@ -366,7 +472,7 @@ namespace state_sync ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash}); } - peer_fs_entry_map.erase(peer_itr); + fs_entry_map.erase(peer_itr); } else { @@ -382,11 +488,11 @@ namespace state_sync } // Queue the remaining peer fs entries (that our side does not have at all) to request. - for (const auto &[name, fs_entry] : peer_fs_entry_map) + for (const auto &[name, fs_entry] : fs_entry_map) { // Construct child vpath. - std::string child_vpath = std::string(parent_vpath) - .append(parent_vpath.back() != '/' ? "/" : "") + std::string child_vpath = std::string(vpath) + .append(vpath.back() != '/' ? "/" : "") .append(name); // Prioritize file state requests over directories. @@ -401,37 +507,38 @@ namespace state_sync /** * Process file block hash map response. + * @param vpath Virtual path of the file. + * @param hash Received hash. + * @param hashes Received block hashes. + * @param file_length Size of the file. + * @returns 0 on success, otherwise -1. */ - int handle_file_hashmap_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::File_HashMap_Response *file_resp) + int handle_file_hashmap_response(std::string_view vpath, const hpfs::h32 *hashes, const size_t hash_count, const uint64_t file_length) { // Get the file path of the block hashes we have received. - LOG_DEBUG << "State sync: Processing file block hashes response for " << file_vpath; + LOG_DEBUG << "State sync: Processing file block hashes response for " << vpath; // File block hashes on our side (file might not exist on our side). std::vector existing_hashes; - if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, file_vpath) == -1 && errno != ENOENT) + if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, vpath) == -1 && errno != ENOENT) return -1; const size_t existing_hash_count = existing_hashes.size(); - // File block hashes we received from the peer. - const hpfs::h32 *peer_hashes = reinterpret_cast(file_resp->hash_map()->data()); - const size_t peer_hash_count = file_resp->hash_map()->size() / sizeof(hpfs::h32); - // Compare the block hashes and request any differences. auto insert_itr = ctx.pending_requests.begin(); - const int32_t max_block_id = MAX(existing_hash_count, peer_hash_count) - 1; + const int32_t max_block_id = MAX(existing_hash_count, hash_count) - 1; for (int32_t block_id = 0; block_id <= max_block_id; block_id++) { // Insert at front to give priority to block requests while preserving block order. - if (block_id >= existing_hash_count || existing_hashes[block_id] != peer_hashes[block_id]) - ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(file_vpath), block_id, peer_hashes[block_id]}); + if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id]) + ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}); } - if (existing_hashes.size() >= peer_hash_count) + if (existing_hashes.size() >= hash_count) { // If peer file might be smaller, truncate our file to match with peer file. - std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath); - if (truncate(file_physical_path.c_str(), file_resp->file_length()) == -1) + std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath); + if (truncate(file_physical_path.c_str(), file_length) == -1) return -1; } @@ -440,18 +547,18 @@ namespace state_sync /** * Process file block response. + * @param vpath Virtual path of the file. + * @param block_id Id of the block. + * @param buf Block buffer. + * @returns 0 on success, otherwise -1. */ - int handle_file_block_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::Block_Response *block_msg) + int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf) { - // Get the file path of the block data we have received. - const uint32_t block_id = block_msg->block_id(); - std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_msg->data()); - LOG_DEBUG << "State sync: Writing block_id " << block_id << " (len:" << buf.length() - << ") of " << file_vpath; + << ") of " << vpath; - std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath); + std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath); const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS); if (fd == -1) { diff --git a/src/state/state_sync.hpp b/src/state/state_sync.hpp index 999db4bb..3d2c0fb8 100644 --- a/src/state/state_sync.hpp +++ b/src/state/state_sync.hpp @@ -5,6 +5,7 @@ #include "../p2p/p2p.hpp" #include "../msg/fbuf/p2pmsg_content_generated.h" #include "../hpfs/h32.hpp" +#include "../crypto.hpp" namespace state_sync { @@ -64,6 +65,12 @@ namespace state_sync int request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state); + bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map &fs_entry_map); + + bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const hpfs::h32 *hashes, const size_t hash_count); + + bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf); + bool should_stop_request_loop(const hpfs::h32 current_target); void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, @@ -71,11 +78,11 @@ namespace state_sync void submit_request(const backlog_item &request, std::string_view lcl); - int handle_fs_entry_response(std::string_view parent_vpath, const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_entry_resp); + int handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map); - int handle_file_hashmap_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::File_HashMap_Response *file_resp); + int handle_file_hashmap_response(std::string_view vpath, const hpfs::h32 *hashes, const size_t hash_count, const uint64_t file_length); - int handle_file_block_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::Block_Response *block_msg); + int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf); } // namespace state_sync