Validate peer state response content against the hash. (#190)

This commit is contained in:
Chalith Desaman
2020-12-10 17:32:18 +05:30
committed by GitHub
parent b86efb2d9b
commit b77a3fc924
3 changed files with 162 additions and 45 deletions

View File

@@ -18,6 +18,9 @@ namespace hpfs
}
};
// File block size in bytes.
constexpr size_t BLOCK_SIZE = 4194304; // 4MB
int start_merge_process(pid_t &hpfs_pid);
int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly,
const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout = 4000);

View File

@@ -194,18 +194,63 @@ namespace state_sync
continue;
}
// Now that we have received matching hash, remove it from the waiting list.
ctx.submitted_requests.erase(pending_resp_itr);
// Process the message based on response type.
const msg::fbuf::p2pmsg::State_Response msg_type = resp_msg->state_response_type();
if (msg_type == msg::fbuf::p2pmsg::State_Response_Fs_Entry_Response)
handle_fs_entry_response(vpath, resp_msg->state_response_as_Fs_Entry_Response());
{
const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_resp = resp_msg->state_response_as_Fs_Entry_Response();
// Get fs entries we have received.
std::unordered_map<std::string, p2p::state_fs_hash_entry> peer_fs_entry_map;
msg::fbuf::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(peer_fs_entry_map, fs_resp->entries());
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map))
{
LOG_INFO << "State sync: Skipping state response due to fs entry hash mismatch.";
continue;
}
handle_fs_entry_response(vpath, peer_fs_entry_map);
}
else if (msg_type == msg::fbuf::p2pmsg::State_Response_File_HashMap_Response)
handle_file_hashmap_response(vpath, resp_msg->state_response_as_File_HashMap_Response());
{
const msg::fbuf::p2pmsg::File_HashMap_Response *file_resp = resp_msg->state_response_as_File_HashMap_Response();
// File block hashes we received from the peer.
const hpfs::h32 *peer_hashes = reinterpret_cast<const hpfs::h32 *>(file_resp->hash_map()->data());
const size_t peer_hash_count = file_resp->hash_map()->size() / sizeof(hpfs::h32);
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count))
{
LOG_INFO << "State sync: Skipping state response due to file hashmap hash mismatch.";
continue;
}
handle_file_hashmap_response(vpath, peer_hashes, peer_hash_count, file_resp->file_length());
}
else if (msg_type == msg::fbuf::p2pmsg::State_Response_Block_Response)
handle_file_block_response(vpath, resp_msg->state_response_as_Block_Response());
{
const msg::fbuf::p2pmsg::Block_Response *block_resp = resp_msg->state_response_as_Block_Response();
// Get the file path of the block data we have received.
const uint32_t block_id = block_resp->block_id();
std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_resp->data());
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "State sync: Skipping state response due to file block hash mismatch.";
continue;
}
handle_file_block_response(vpath, block_id, buf);
}
// Now that we have received matching hash and handled it, remove it from the waiting list.
ctx.submitted_requests.erase(pending_resp_itr);
// After handling each response, check whether we have reached target state.
if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/") < 1)
@@ -269,6 +314,68 @@ namespace state_sync
return 0;
}
/**
* Vadidated the received hash against the received fs entry map.
* @param vpath Virtual path of the fs.
* @param hash Received hash.
* @param fs_entry_map Received fs entry map.
* @returns true if hash is valid, otherwise false.
*/
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map<std::string, p2p::state_fs_hash_entry> &fs_entry_map)
{
hpfs::h32 content_hash;
// Initilal hash is vpath hash.
content_hash = crypto::get_hash(vpath);
// Then XOR the file hashes to the initial hash.
for (const auto &[name, fs_entry] : fs_entry_map)
{
content_hash ^= fs_entry.hash;
}
return content_hash.to_string_view() == hash;
}
/**
 * Validates the received hash against the received file block hash map.
 * The expected hash is rebuilt locally: it starts as the hash of the vpath and
 * every received block hash is then XOR-ed into it.
 * @param vpath Virtual path of the file.
 * @param hash Received hash.
 * @param hashes Received block hashes (array of hash_count elements).
 * @param hash_count Number of elements in the hashes array.
 * @returns true if the rebuilt hash equals the received hash, otherwise false.
 */
bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const hpfs::h32 *hashes, const size_t hash_count)
{
    // A non-empty hash list without backing storage can never be validated.
    if (hashes == nullptr && hash_count > 0)
        return false;

    hpfs::h32 content_hash = hpfs::h32_empty;
    // Initial hash is the vpath hash.
    content_hash = crypto::get_hash(vpath);

    // Then XOR the block hashes into the initial hash.
    // The index is size_t so it matches hash_count's type and cannot overflow
    // or trigger a signed/unsigned comparison for large hash lists.
    for (size_t block_id = 0; block_id < hash_count; block_id++)
    {
        content_hash ^= hashes[block_id];
    }

    return content_hash.to_string_view() == hash;
}
/**
 * Validates the received hash against the received block.
 * The expected hash is computed over the raw bytes of the block's file offset
 * followed by the block buffer.
 * @param hash Received hash.
 * @param block_id Id of the block.
 * @param buf Block buffer.
 * @returns true if the computed hash equals the received hash, otherwise false.
 */
bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf)
{
    // Calculate block offset of this block.
    // Widen to 64 bits before multiplying so the product cannot wrap on targets
    // where size_t is only 32 bits wide (block ids beyond the first 4GB of a file).
    const off_t block_offset = static_cast<off_t>(static_cast<uint64_t>(block_id) * hpfs::BLOCK_SIZE);

    // The offset takes part in the hash as its in-memory byte representation.
    std::string_view offset = std::string_view(reinterpret_cast<const char *>(&block_offset), sizeof(off_t));

    return crypto::get_hash(offset, buf) == hash;
}
/**
* Indicates whether to break out of state request processing loop.
*/
@@ -325,36 +432,35 @@ namespace state_sync
/**
* Process dir children response.
* @param vpath Virtual path of the fs.
* @param fs_entry_map Received fs entry map.
* @returns 0 on success, otherwise -1.
*/
int handle_fs_entry_response(std::string_view parent_vpath, const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_entry_resp)
int handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::state_fs_hash_entry> &fs_entry_map)
{
// Get the parent path of the fs entries we have received.
LOG_DEBUG << "State sync: Processing fs entries response for " << parent_vpath;
// Get fs entries we have received.
std::unordered_map<std::string, p2p::state_fs_hash_entry> peer_fs_entry_map;
msg::fbuf::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(peer_fs_entry_map, fs_entry_resp->entries());
LOG_DEBUG << "State sync: Processing fs entries response for " << vpath;
// Create physical directory on our side if not exist.
std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(parent_vpath);
std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath);
if (util::create_dir_tree_recursive(parent_physical_path) == -1)
return -1;
// Get the children hash entries and compare with what we got from peer.
std::vector<hpfs::child_hash_node> existing_fs_entries;
if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, parent_vpath) == -1)
if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, vpath) == -1)
return -1;
// Request more info on fs entries that exist on both sides but are different.
for (const auto &ex_entry : existing_fs_entries)
{
// Construct child vpath.
std::string child_vpath = std::string(parent_vpath)
.append(parent_vpath.back() != '/' ? "/" : "")
std::string child_vpath = std::string(vpath)
.append(vpath.back() != '/' ? "/" : "")
.append(ex_entry.name);
const auto peer_itr = peer_fs_entry_map.find(ex_entry.name);
if (peer_itr != peer_fs_entry_map.end())
const auto peer_itr = fs_entry_map.find(ex_entry.name);
if (peer_itr != fs_entry_map.end())
{
// Request state if hash is different.
if (peer_itr->second.hash != ex_entry.hash)
@@ -366,7 +472,7 @@ namespace state_sync
ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash});
}
peer_fs_entry_map.erase(peer_itr);
fs_entry_map.erase(peer_itr);
}
else
{
@@ -382,11 +488,11 @@ namespace state_sync
}
// Queue the remaining peer fs entries (that our side does not have at all) to request.
for (const auto &[name, fs_entry] : peer_fs_entry_map)
for (const auto &[name, fs_entry] : fs_entry_map)
{
// Construct child vpath.
std::string child_vpath = std::string(parent_vpath)
.append(parent_vpath.back() != '/' ? "/" : "")
std::string child_vpath = std::string(vpath)
.append(vpath.back() != '/' ? "/" : "")
.append(name);
// Prioritize file state requests over directories.
@@ -401,37 +507,38 @@ namespace state_sync
/**
* Process file block hash map response.
* @param vpath Virtual path of the file.
* @param hashes Received block hashes.
* @param hash_count Number of received block hashes.
* @param file_length Size of the file.
* @returns 0 on success, otherwise -1.
*/
int handle_file_hashmap_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::File_HashMap_Response *file_resp)
int handle_file_hashmap_response(std::string_view vpath, const hpfs::h32 *hashes, const size_t hash_count, const uint64_t file_length)
{
// Get the file path of the block hashes we have received.
LOG_DEBUG << "State sync: Processing file block hashes response for " << file_vpath;
LOG_DEBUG << "State sync: Processing file block hashes response for " << vpath;
// File block hashes on our side (file might not exist on our side).
std::vector<hpfs::h32> existing_hashes;
if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, file_vpath) == -1 && errno != ENOENT)
if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, vpath) == -1 && errno != ENOENT)
return -1;
const size_t existing_hash_count = existing_hashes.size();
// File block hashes we received from the peer.
const hpfs::h32 *peer_hashes = reinterpret_cast<const hpfs::h32 *>(file_resp->hash_map()->data());
const size_t peer_hash_count = file_resp->hash_map()->size() / sizeof(hpfs::h32);
// Compare the block hashes and request any differences.
auto insert_itr = ctx.pending_requests.begin();
const int32_t max_block_id = MAX(existing_hash_count, peer_hash_count) - 1;
const int32_t max_block_id = MAX(existing_hash_count, hash_count) - 1;
for (int32_t block_id = 0; block_id <= max_block_id; block_id++)
{
// Insert at front to give priority to block requests while preserving block order.
if (block_id >= existing_hash_count || existing_hashes[block_id] != peer_hashes[block_id])
ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(file_vpath), block_id, peer_hashes[block_id]});
if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id])
ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]});
}
if (existing_hashes.size() >= peer_hash_count)
if (existing_hashes.size() >= hash_count)
{
// If peer file might be smaller, truncate our file to match with peer file.
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath);
if (truncate(file_physical_path.c_str(), file_resp->file_length()) == -1)
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath);
if (truncate(file_physical_path.c_str(), file_length) == -1)
return -1;
}
@@ -440,18 +547,18 @@ namespace state_sync
/**
* Process file block response.
* @param vpath Virtual path of the file.
* @param block_id Id of the block.
* @param buf Block buffer.
* @returns 0 on success, otherwise -1.
*/
int handle_file_block_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::Block_Response *block_msg)
int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf)
{
// Get the file path of the block data we have received.
const uint32_t block_id = block_msg->block_id();
std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_msg->data());
LOG_DEBUG << "State sync: Writing block_id " << block_id
<< " (len:" << buf.length()
<< ") of " << file_vpath;
<< ") of " << vpath;
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath);
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath);
const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS);
if (fd == -1)
{

View File

@@ -5,6 +5,7 @@
#include "../p2p/p2p.hpp"
#include "../msg/fbuf/p2pmsg_content_generated.h"
#include "../hpfs/h32.hpp"
#include "../crypto.hpp"
namespace state_sync
{
@@ -64,6 +65,12 @@ namespace state_sync
int request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state);
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map<std::string, p2p::state_fs_hash_entry> &fs_entry_map);
bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const hpfs::h32 *hashes, const size_t hash_count);
bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf);
bool should_stop_request_loop(const hpfs::h32 current_target);
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
@@ -71,11 +78,11 @@ namespace state_sync
void submit_request(const backlog_item &request, std::string_view lcl);
int handle_fs_entry_response(std::string_view parent_vpath, const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_entry_resp);
int handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::state_fs_hash_entry> &fs_entry_map);
int handle_file_hashmap_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::File_HashMap_Response *file_resp);
int handle_file_hashmap_response(std::string_view vpath, const hpfs::h32 *hashes, const size_t hash_count, const uint64_t file_length);
int handle_file_block_response(std::string_view file_vpath, const msg::fbuf::p2pmsg::Block_Response *block_msg);
int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf);
} // namespace state_sync