Pre-emptive hpfs sync responses. (#299)

Hpfs sync requester reports its own filesystem state to the responder. Responder uses that information to pre-emptively send additional filesystem data the requester may need.
This commit is contained in:
Ravin Perera
2021-05-01 08:27:42 +05:30
committed by GitHub
parent 3381608afd
commit 4f82afe5b1
12 changed files with 1023 additions and 424 deletions

View File

@@ -407,7 +407,7 @@ namespace consensus
<< " state:" << cp.state_hash
<< " patch:" << cp.patch_hash
<< " lps:" << cp.last_primary_shard_id
<< " lbs:" << cp.last_raw_shard_id
<< " lrs:" << cp.last_raw_shard_id
<< " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]"
<< "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)";
@@ -642,8 +642,8 @@ namespace consensus
<< " ts:" << p.time
<< " state:" << p.state_hash
<< " patch:" << p.patch_hash
<< " last_primary_shard_id:" << p.last_primary_shard_id
<< " last_raw_shard_id:" << p.last_raw_shard_id;
<< " lps:" << p.last_primary_shard_id
<< " lrs:" << p.last_raw_shard_id;
}
/**

View File

@@ -24,7 +24,7 @@ namespace hpfs
inline uint32_t get_request_resubmit_timeout()
{
return conf::cfg.contract.roundtime;
return conf::cfg.contract.roundtime * 0.7;
}
const util::h32 get_root_hash(const util::h32 &child_one, const util::h32 &child_two);

View File

@@ -11,12 +11,14 @@
namespace p2pmsg = msg::fbuf::p2pmsg;
/**
* Class for serving hpfs requests from other peers.
* Class for serving hpfs sync requests from other peers.
*/
namespace hpfs
{
constexpr uint16_t LOOP_WAIT = 20; // Milliseconds
constexpr const char *HPFS_SESSION_NAME = "rw";
constexpr uint16_t MAX_HASHMAP_RESPONSES_PER_REQUEST = 4;
constexpr uint16_t MAX_BLOCK_RESPONSES_PER_REQUEST = 1;
/**
* @param server_name The name of the serving instance. (For identification purpose)
@@ -90,25 +92,27 @@ namespace hpfs
break;
}
// Session id is in binary format. Converting to hex before printing.
LOG_DEBUG << "Serving hpfs request from [" << util::to_hex(session_id).substr(2, 10) << "]";
flatbuffers::FlatBufferBuilder fbuf;
if (hpfs_serve::create_hpfs_response(fbuf, hr) == 1)
std::list<flatbuffers::FlatBufferBuilder> fbufs;
if (hpfs_serve::generate_sync_responses(fbufs, hr) == 0 && !fbufs.empty())
{
// Find the peer that we should send the hpfs response to.
// Find the peer that we should send the sync responses to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
if (peer_itr != p2p::ctx.peer_connections.end())
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
comm::comm_session *session = peer_itr->second;
session->send(msg);
for (const flatbuffers::FlatBufferBuilder &fbuf : fbufs)
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
session->send(msg);
}
}
}
LOG_DEBUG << "Hpfs " << name << " serve: Sent " << fbufs.size() << " replies to [" << util::to_hex(session_id).substr(2, 10) << "]";
}
fs_mount->release_rw_session();
@@ -120,13 +124,12 @@ namespace hpfs
}
/**
* Creates the reply message for a given hpfs request.
* @param fbuf The flatbuffer builder to construct the reply message.
* Creates reply messages for a given hpfs sync request.
* @param fbufs List of flatbuffer builders containing the generated reply messages.
* @param hr The hpfs request which should be replied to.
* @return 1 if successful hpfs response was generated. 0 if request is invalid
* and no response was generated. -1 on error.
* @return 0 on success. -1 on error.
*/
int hpfs_serve::create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr)
int hpfs_serve::generate_sync_responses(std::list<flatbuffers::FlatBufferBuilder> &fbufs, const p2p::hpfs_request &hr)
{
LOG_DEBUG << "Serving hpfs req. path:" << hr.parent_path << " block_id:" << hr.block_id;
@@ -136,7 +139,7 @@ namespace hpfs
// Vector to hold the block bytes. Normally block size is constant BLOCK_SIZE (4MB), but the
// last block of a file may have a smaller size.
std::vector<uint8_t> block;
const int result = get_data_block(block, hr.parent_path, hr.block_id, hr.expected_hash);
const int result = get_data_block_with_hash_check(block, hr.parent_path, hr.block_id, hr.expected_hash);
if (result == -1)
{
@@ -145,14 +148,7 @@ namespace hpfs
}
else if (result == 1)
{
p2p::block_response resp;
resp.path = hr.parent_path;
resp.block_id = hr.block_id;
resp.hash = hr.expected_hash;
resp.data = std::string_view(reinterpret_cast<const char *>(block.data()), block.size());
p2pmsg::create_msg_from_block_response(fbuf, resp, fs_mount->mount_id);
return 1; // Success.
p2pmsg::create_msg_from_block_response(fbufs.emplace_back(), hr.block_id, block, hr.expected_hash, hr.parent_path, fs_mount->mount_id);
}
}
else
@@ -163,7 +159,7 @@ namespace hpfs
std::vector<util::h32> block_hashes;
size_t file_length = 0;
mode_t file_mode = 0;
const int result = get_data_block_hashes(block_hashes, file_length, file_mode, hr.parent_path, hr.expected_hash);
const int result = get_file_block_hashes_with_hash_check(block_hashes, file_length, file_mode, hr.parent_path, hr.expected_hash);
if (result == -1)
{
@@ -172,18 +168,40 @@ namespace hpfs
}
else if (result == 1)
{
// By looking at the block hashmap hints the requester has provided, we also include pre-emptive data block responses
// that the requester needs.
std::vector<uint32_t> responded_block_ids;
for (uint32_t block_id = 0; block_id < block_hashes.size(); block_id++)
{
if (responded_block_ids.size() < MAX_BLOCK_RESPONSES_PER_REQUEST &&
(hr.file_hashmap_hints.size() <= block_id || hr.file_hashmap_hints[block_id] != block_hashes[block_id]))
{
std::vector<uint8_t> block;
if (get_data_block(block, hr.parent_path, block_id) != -1)
{
p2pmsg::create_msg_from_block_response(fbufs.emplace_back(), block_id, block, block_hashes[block_id], hr.parent_path, fs_mount->mount_id);
responded_block_ids.push_back(block_id);
if (responded_block_ids.size() == MAX_BLOCK_RESPONSES_PER_REQUEST)
break;
}
}
}
// Generate parent reply. We must insert it at the begning of replies.
// This is the reply the requester originally requested. But we also indicate in it any pre-emptive hint responses
// we are sending along with it.
p2pmsg::create_msg_from_filehashmap_response(
fbuf, hr.parent_path, fs_mount->mount_id, block_hashes,
file_length, file_mode, hr.expected_hash);
return 1; // Success.
fbufs.emplace_front(), hr.parent_path, fs_mount->mount_id, block_hashes,
responded_block_ids, file_length, file_mode, hr.expected_hash);
}
}
else
{
// If the hpfs request is for a directory we need to reply with the
// file system entries and their hashes inside that dir.
std::vector<hpfs::child_hash_node> child_hash_nodes;
const int result = get_fs_entry_hashes(child_hash_nodes, hr.parent_path, hr.expected_hash);
std::vector<p2p::hpfs_fs_hash_entry> fs_entries;
const int result = get_fs_entry_hashes_with_hash_check(fs_entries, hr.parent_path, hr.expected_hash);
if (result == -1)
{
@@ -193,7 +211,7 @@ namespace hpfs
else if (result == 1)
{
// Get dir mode.
const std::string dir_path = fs_mount->rw_dir + hr.parent_path.data();
const std::string dir_path = fs_mount->physical_path(HPFS_SESSION_NAME, hr.parent_path);
struct stat st;
if (stat(dir_path.data(), &st) == -1)
{
@@ -201,23 +219,137 @@ namespace hpfs
return -1;
}
// By looking at the fs entry hints the requester has provided, we also include pre-emptive hashmap and data block
// responses that the requester needs.
generate_fs_entry_hint_responses(fbufs, fs_entries, hr.fs_entry_hints, hr.parent_path);
// Generate parent reply. We must insert it at the begning of replies.
// This is the reply the requester originally requested. But we also indicate in it any pre-emptive hint responses
// we are sending along with it. In this case, the 'fs entries' we are replying with are already marked as having an
// accompanying pre-emptive hint response.
p2pmsg::create_msg_from_fsentry_response(
fbuf, hr.parent_path, fs_mount->mount_id, st.st_mode, child_hash_nodes, hr.expected_hash);
return 1; // Success.
fbufs.emplace_front(), hr.parent_path, fs_mount->mount_id, st.st_mode, fs_entries, hr.expected_hash);
}
}
}
LOG_DEBUG << "No hpfs response generated.";
return 0;
}
/**
* Generates flatbuffer messages for any pre-emptive hint responses that we should send according to the fs entry hints provided by the requester.
* @param fbufs The flatbuffer message list to populate with hint responses.
* @param fs_entries The fs entry collection that is going to be sent with the parent fs entry reply.
* @param fs_entry_hints The fs entry hints the requester has provided.
* @param parent_path The vpath of the parent directory which contains the fs entries.
*/
void hpfs_serve::generate_fs_entry_hint_responses(std::list<flatbuffers::FlatBufferBuilder> &fbufs, std::vector<p2p::hpfs_fs_hash_entry> &fs_entries,
const std::vector<p2p::hpfs_fs_hash_entry> &fs_entry_hints, std::string_view parent_vpath)
{
// Counters tracking how many pre-emptive hint responses of each type we have generated so far.
size_t hashmap_responses = 0;
size_t block_responses = 0;
// Prepare hint map to provide match comparisons based on hints provided by the requester.
std::map<std::string, p2p::hpfs_fs_hash_entry> hint_fs_entry_map;
for (const p2p::hpfs_fs_hash_entry &hint : fs_entry_hints)
hint_fs_entry_map.emplace(hint.name, std::move(hint));
// For each fs entry we are replying with, look for the possibilty of generating hint responses.
for (p2p::hpfs_fs_hash_entry &entry : fs_entries)
{
// Check with provided hints to include match information.
const auto itr = hint_fs_entry_map.find(entry.name);
// Whether fs entry exists on the requesting party.
const bool exists_on_requester = itr != hint_fs_entry_map.end();
// Remove the entry from the hint list so at the end, the hint map will only contain children we don't possess on our side.
if (exists_on_requester)
hint_fs_entry_map.erase(entry.name);
entry.response_type = (exists_on_requester && itr->second.hash == entry.hash) ? p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MATCHED : p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED;
// Send hashmap hint response if we haven't reached the limit.
const bool send_hashmap_response = (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED) && (hashmap_responses < MAX_HASHMAP_RESPONSES_PER_REQUEST);
if (!send_hashmap_response)
continue;
// Reaching this point means we have to generate the hashmap hint response along with the parent fs entry reply.
std::string child_vpath = std::string(parent_vpath)
.append(parent_vpath.back() != '/' ? "/" : "")
.append(entry.name);
if (entry.is_file)
{
std::vector<util::h32> block_hashes;
size_t file_length = 0;
mode_t file_mode = 0;
if (get_file_block_hashes(block_hashes, file_length, file_mode, child_vpath) != -1)
{
std::vector<uint32_t> responded_block_ids;
// Can additionally send block data hint response for block 0, if we know that the entire file does not exist on other side.
const bool send_block_response = !exists_on_requester && block_responses < MAX_BLOCK_RESPONSES_PER_REQUEST;
if (send_block_response)
{
std::vector<uint8_t> block;
if (get_data_block(block, child_vpath, 0) != -1)
{
p2pmsg::create_msg_from_block_response(fbufs.emplace_back(), 0, block, block_hashes[0], child_vpath, fs_mount->mount_id);
block_responses++;
responded_block_ids.push_back(0);
}
}
// If block response already inserted, we must insert hashmap response before that. This is because the hint resposnes must be
// sent in the logical dependency order. In this case, the hashmap hint response will indicate to the requester of any pre-emptive
// block data responses we are sending. Therefore, block data hint response must follow the hashmap hint response.
auto pos = fbufs.end();
if (!responded_block_ids.empty())
pos--;
p2pmsg::create_msg_from_filehashmap_response(
*fbufs.emplace(pos), child_vpath, fs_mount->mount_id, block_hashes,
responded_block_ids, file_length, file_mode, entry.hash);
entry.response_type = p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED;
hashmap_responses++;
}
}
else // Is dir.
{
// This is a directory, generate an fs entry resposne for that directory.
std::vector<p2p::hpfs_fs_hash_entry> fs_entries;
if (get_fs_entry_hashes(fs_entries, child_vpath) > 0)
{
struct stat st;
if (stat(child_vpath.data(), &st) == -1)
{
LOG_ERROR << errno << ": Error in getting dir metadata: " << child_vpath;
}
else
{
p2pmsg::create_msg_from_fsentry_response(
fbufs.emplace_back(), child_vpath, fs_mount->mount_id, st.st_mode, fs_entries, entry.hash);
entry.response_type = p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED;
hashmap_responses++;
}
}
}
}
// Take the reamainig entries in the hint list and include them in the fs entry response as not exist.
// When the requester sees this, they will remove those entries from their side.
for (const auto &[name, hint] : hint_fs_entry_map)
fs_entries.push_back(p2p::hpfs_fs_hash_entry{name, hint.is_file, util::h32_empty, p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::NOT_AVAILABLE});
}
/**
* Retrieves the specified data block from a hpfs file if expected hash matches.
* @return 1 if block data was succefully fetched. 0 if vpath or block does not exist. -1 on error.
*/
int hpfs_serve::get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash)
int hpfs_serve::get_data_block_with_hash_check(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash)
{
// Check whether the existing block hash matches expected hash.
std::vector<util::h32> block_hashes;
@@ -236,53 +368,10 @@ namespace hpfs
}
else // Get actual block data.
{
struct stat st;
const std::string file_path = fs_mount->rw_dir + vpath.data();
const off_t block_offset = block_id * hpfs::BLOCK_SIZE;
const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
{
LOG_ERROR << errno << ": Open failed " << file_path;
if (get_data_block(block, vpath, block_id) == -1)
result = -1;
}
else
{
if (fstat(fd, &st) == -1)
{
LOG_ERROR << errno << ": Stat failed. " << file_path;
result = -1;
}
else if (!S_ISREG(st.st_mode))
{
LOG_ERROR << "Not a file. " << file_path;
result = -1;
}
else if (block_offset > st.st_size)
{
LOG_ERROR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path;
result = -1;
}
else
{
const size_t read_len = MIN(hpfs::BLOCK_SIZE, (st.st_size - block_offset));
block.resize(read_len);
lseek(fd, block_offset, SEEK_SET);
const int res = read(fd, block.data(), read_len);
if (res < read_len)
{
LOG_ERROR << errno << ": Read failed (result:" << res
<< " off:" << block_offset << " len:" << read_len << "). " << file_path;
result = -1;
}
else
{
result = 1; // Success.
}
}
close(fd);
}
result = 1; // Success.
}
}
@@ -291,10 +380,10 @@ namespace hpfs
/**
* Retrieves the specified file block hashes if expected hash matches.
* @return 1 if block hashes were successfuly fetched. 0 if vpath does not exist. -1 on error.
* @return 1 if block hashes were successfuly fetched. 0 if hash mismatch. -1 on error.
*/
int hpfs_serve::get_data_block_hashes(std::vector<util::h32> &hashes, size_t &file_length, mode_t &file_mode,
const std::string_view vpath, const util::h32 expected_hash)
int hpfs_serve::get_file_block_hashes_with_hash_check(std::vector<util::h32> &hashes, size_t &file_length, mode_t &file_mode,
const std::string_view vpath, const util::h32 expected_hash)
{
// Check whether the existing file hash matches expected hash.
util::h32 file_hash = util::h32_empty;
@@ -306,24 +395,12 @@ namespace hpfs
LOG_DEBUG << "Expected hash mismatch.";
result = 0;
}
// Get the block hashes.
else if (fs_mount->get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0)
{
result = -1;
}
else
{
// Get actual file metadata.
const std::string file_path = fs_mount->rw_dir + vpath.data();
struct stat st;
if (stat(file_path.c_str(), &st) == -1)
{
LOG_ERROR << errno << ": Stat failed when getting file metadata. " << file_path;
if (get_file_block_hashes(hashes, file_length, file_mode, vpath) == -1)
result = -1;
}
file_length = st.st_size;
file_mode = st.st_mode;
result = 1; // Success.
else
result = 1; // Success.
}
}
@@ -334,8 +411,8 @@ namespace hpfs
* Retrieves the specified dir entry hashes if expected fir hash matches.
* @return 1 if fs entry hashes were successfuly fetched. 0 if vpath does not exist. -1 on error.
*/
int hpfs_serve::get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
const std::string_view vpath, const util::h32 expected_hash)
int hpfs_serve::get_fs_entry_hashes_with_hash_check(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries,
const std::string_view vpath, const util::h32 expected_hash)
{
// Check whether the existing dir hash matches expected hash.
util::h32 dir_hash = util::h32_empty;
@@ -348,7 +425,7 @@ namespace hpfs
result = 0;
}
// Get the children hash nodes.
else if (fs_mount->get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0)
else if (get_fs_entry_hashes(fs_entries, vpath) < 0)
{
result = -1;
}
@@ -361,4 +438,104 @@ namespace hpfs
return result;
}
/**
* Fetches the specified file data block.
* @return 0 on success. -1 on error.
*/
int hpfs_serve::get_data_block(std::vector<uint8_t> &block, const std::string_view vpath, const uint32_t block_id)
{
struct stat st;
const std::string file_path = fs_mount->physical_path(HPFS_SESSION_NAME, vpath);
const off_t block_offset = block_id * hpfs::BLOCK_SIZE;
const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
{
LOG_ERROR << errno << ": Open failed " << file_path;
return -1;
}
int result = 0;
if (fstat(fd, &st) == -1)
{
LOG_ERROR << errno << ": Stat failed. " << file_path;
result = -1;
}
else if (!S_ISREG(st.st_mode))
{
LOG_ERROR << "Not a file. " << file_path;
result = -1;
}
else if (block_offset > st.st_size)
{
LOG_ERROR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path;
result = -1;
}
else
{
const size_t read_len = MIN(hpfs::BLOCK_SIZE, (st.st_size - block_offset));
block.resize(read_len);
lseek(fd, block_offset, SEEK_SET);
const int res = read(fd, block.data(), read_len);
if (res < read_len)
{
LOG_ERROR << errno << ": Read failed (result:" << res
<< " off:" << block_offset << " len:" << read_len << "). " << file_path;
result = -1;
}
else
{
result = 0; // Success.
}
}
close(fd);
return result;
}
/**
* Fetches the file data block hash list.
* @return 0 on success. -1 on error.
*/
int hpfs_serve::get_file_block_hashes(std::vector<util::h32> &hashes, size_t &file_length, mode_t &file_mode, const std::string_view vpath)
{
// Get the block hashes.
if (fs_mount->get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0)
{
return -1;
}
else
{
// Get actual file metadata.
const std::string file_path = fs_mount->physical_path(HPFS_SESSION_NAME, vpath);
struct stat st;
if (stat(file_path.c_str(), &st) == -1)
{
LOG_ERROR << errno << ": Stat failed when getting file metadata. " << file_path;
return -1;
}
file_length = st.st_size;
file_mode = st.st_mode;
return 0;
}
}
/**
* Populates the list of dir entry hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int hpfs_serve::get_fs_entry_hashes(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const std::string_view vpath)
{
std::vector<hpfs::child_hash_node> child_hash_nodes;
int res = fs_mount->get_dir_children_hashes(child_hash_nodes, HPFS_SESSION_NAME, vpath);
if (res > 0)
{
for (const hpfs::child_hash_node &hn : child_hash_nodes)
fs_entries.push_back(p2p::hpfs_fs_hash_entry{hn.name, hn.is_file, hn.hash});
}
return res;
}
} // namespace hpfs

View File

@@ -16,13 +16,22 @@ namespace hpfs
hpfs::hpfs_mount *fs_mount = NULL;
std::string_view name;
void hpfs_serve_loop();
int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr);
int get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash);
int get_data_block_hashes(std::vector<util::h32> &hashes, size_t &file_length, mode_t &file_mode,
const std::string_view vpath, const util::h32 expected_hash);
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
const std::string_view vpath, const util::h32 expected_hash);
int generate_sync_responses(std::list<flatbuffers::FlatBufferBuilder> &fbufs, const p2p::hpfs_request &hr);
void generate_fs_entry_hint_responses(std::list<flatbuffers::FlatBufferBuilder> &fbufs, std::vector<p2p::hpfs_fs_hash_entry> &our_fs_entries,
const std::vector<p2p::hpfs_fs_hash_entry> &fs_entry_hints, std::string_view parent_vpath);
int get_data_block_with_hash_check(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash);
int get_file_block_hashes_with_hash_check(std::vector<util::h32> &hashes, size_t &file_length, mode_t &file_mode,
const std::string_view vpath, const util::h32 expected_hash);
int get_fs_entry_hashes_with_hash_check(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries,
const std::string_view vpath, const util::h32 expected_hash);
int get_data_block(std::vector<uint8_t> &block, const std::string_view vpath, const uint32_t block_id);
int get_file_block_hashes(std::vector<util::h32> &hashes, size_t &file_length, mode_t &file_mode, const std::string_view vpath);
int get_fs_entry_hashes(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const std::string_view vpath);
protected:
std::list<std::pair<std::string, p2p::hpfs_request>> hpfs_requests;

View File

@@ -251,8 +251,7 @@ namespace hpfs
if (should_stop_request_loop(current_target_hash))
return 0;
LOG_DEBUG << "Hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]";
const std::string from = response.first.substr(2, 10); // Sender pubkey.
const p2pmsg::P2PMsg &msg = *p2pmsg::GetP2PMsg(response.second.data());
const p2pmsg::HpfsResponseMsg &resp_msg = *msg.content_as_HpfsResponseMsg();
@@ -264,7 +263,8 @@ namespace hpfs
const auto pending_resp_itr = submitted_requests.find(key);
if (pending_resp_itr == submitted_requests.end())
{
LOG_DEBUG << "Hpfs " << name << " sync: Skipping hpfs response due to hash mismatch.";
LOG_DEBUG << "Hpfs " << name << " sync: Skipping response from [" << from << "] because we are not looking for hash:"
<< util::to_hex(hash).substr(0, 10) << " of " << vpath;
continue;
}
@@ -276,17 +276,18 @@ namespace hpfs
const p2pmsg::HpfsFsEntryResponse &fs_resp = *resp_msg.content_as_HpfsFsEntryResponse();
// Get fs entries we have received.
std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> peer_fs_entry_map;
p2pmsg::flatbuf_hpfsfshashentry_to_hpfsfshashentry(peer_fs_entry_map, fs_resp.entries());
std::vector<p2p::hpfs_fs_hash_entry> peer_fs_entries;
p2pmsg::flatbuf_hpfsfshashentries_to_hpfsfshashentries(peer_fs_entries, fs_resp.entries());
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entry_map))
if (!validate_fs_entry_hash(vpath, hash, fs_resp.dir_mode(), peer_fs_entries))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch.";
LOG_INFO << "Hpfs " << name << " sync: Skipping response from [" << from << "] due to fs entry hash mismatch.";
continue;
}
handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entry_map);
LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response from [" << from << "] for " << vpath;
handle_fs_entry_response(vpath, fs_resp.dir_mode(), peer_fs_entries);
}
else if (msg_type == p2pmsg::HpfsResponse_HpfsFileHashMapResponse)
{
@@ -299,11 +300,22 @@ namespace hpfs
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, file_resp.file_mode(), block_hashes, block_hash_count))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch.";
LOG_INFO << "Hpfs " << name << " sync: Skipping response from [" << from << "] due to file hashmap hash mismatch.";
continue;
}
handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count, file_resp.file_length());
std::set<uint32_t> responded_block_ids;
{
const flatbuffers::Vector<uint32_t> *fbvec = file_resp.responded_block_ids();
const uint32_t *ptr = file_resp.responded_block_ids()->data();
const size_t count = file_resp.responded_block_ids()->size();
for (size_t i = 0; i < count; i++)
responded_block_ids.emplace(ptr[i]);
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response from [" << from << "] for " << vpath;
handle_file_hashmap_response(vpath, file_resp.file_mode(), block_hashes, block_hash_count,
responded_block_ids, file_resp.file_length());
}
else if (msg_type == p2pmsg::HpfsResponse_HpfsBlockResponse)
{
@@ -316,10 +328,12 @@ namespace hpfs
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch.";
LOG_INFO << "Hpfs " << name << " sync: Skipping response from [" << from << "] due to file block hash mismatch.";
continue;
}
LOG_DEBUG << "Hpfs " << name << " sync: Processing block response from [" << from << "] for block_id:" << block_id
<< " (len:" << buf.length() << ") of " << vpath;
handle_file_block_response(vpath, block_id, buf);
}
@@ -337,7 +351,8 @@ namespace hpfs
// Update the central hpfs state tracker.
fs_mount->set_parent_hash(current_target.vpath, updated_state);
LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash;
LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash
<< " (" << current_target.vpath << ")";
if (updated_state == current_target_hash)
return 0;
}
@@ -377,8 +392,7 @@ namespace hpfs
// Reset the counter and re-submit request.
request.waiting_time = 0;
LOG_DEBUG << "Hpfs " << name << " sync: Resubmitting request...";
submit_request(request);
submit_request(request, false, true);
}
}
@@ -391,8 +405,7 @@ namespace hpfs
if (should_stop_request_loop(current_target_hash))
return 0;
const backlog_item &request = pending_requests.front();
submit_request(request);
submit_request(pending_requests.front());
pending_requests.pop_front();
}
}
@@ -405,11 +418,11 @@ namespace hpfs
* @param vpath Virtual path of the fs.
* @param hash Received hash.
* @param dir_mode Metadata 'mode' of the directory containing the fs entries.
* @param fs_entry_map Received fs entry map.
* @param peer_fs_entries Received peer fs entries.
* @returns true if hash is valid, otherwise false.
*/
bool hpfs_sync::validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode,
const std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
const std::vector<p2p::hpfs_fs_hash_entry> &peer_fs_entries)
{
util::h32 content_hash;
@@ -423,7 +436,7 @@ namespace hpfs
content_hash ^= crypto::get_hash(mode_bytes, sizeof(mode_bytes));
// Then XOR the file hashes to the initial hash.
for (const auto &[name, fs_entry] : fs_entry_map)
for (const p2p::hpfs_fs_hash_entry &fs_entry : peer_fs_entries)
{
content_hash ^= fs_entry.hash;
}
@@ -511,6 +524,24 @@ namespace hpfs
hr.expected_hash = expected_hash;
hr.mount_id = fs_mount->mount_id;
// Include appropriate hints in the request, so the peer can send pre-emptive responses that are useful to us without having
// to submit additional requests.
if (!hr.is_file) // Dir fs entry request.
{
// Include fs entry information from our side in the request.
std::vector<hpfs::child_hash_node> child_hash_nodes;
fs_mount->get_dir_children_hashes(child_hash_nodes, hpfs::RW_SESSION_NAME, path);
for (const hpfs::child_hash_node &hn : child_hash_nodes)
hr.fs_entry_hints.push_back(p2p::hpfs_fs_hash_entry{hn.name, hn.is_file, hn.hash});
}
else if (hr.is_file && hr.block_id == -1) // File hash map request.
{
// Include file hash map information from our side in the request (file might not exist on our side).
if (fs_mount->get_file_block_hashes(hr.file_hashmap_hints, hpfs::RW_SESSION_NAME, hr.parent_path) == -1)
hr.file_hashmap_hints.clear();
}
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_msg_from_hpfs_request(fbuf, hr);
p2p::send_message_to_random_peer(fbuf, target_pubkey); //todo: send to a node that hold the expected hash to improve reliability of retrieving hpfs state.
@@ -518,39 +549,49 @@ namespace hpfs
/**
* Submits a pending hpfs request to the peer.
* @param request The request to submit and start watching for response.
* @param watch_only Whether to actually send the request or watch for corresponding response only.
* Used for hint response monitoring.
* @param is_resubmit Whether this is a request resubmission or not.
*/
void hpfs_sync::submit_request(const backlog_item &request)
void hpfs_sync::submit_request(const backlog_item &request, const bool watch_only, const bool is_resubmit)
{
const std::string key = std::string(request.path)
.append(reinterpret_cast<const char *>(&request.expected_hash), sizeof(util::h32));
submitted_requests.try_emplace(key, request);
const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR;
std::string target_pubkey;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, target_pubkey);
if (!target_pubkey.empty())
LOG_DEBUG << "Hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type
if (watch_only)
{
LOG_DEBUG << "Hpfs " << name << " sync: Watching response for request. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
else
{
const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR;
std::string target_pubkey;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, target_pubkey);
LOG_DEBUG << "Hpfs " << name << " sync: " << (is_resubmit ? "Re-submitting" : "Submitting")
<< " request to [" << (target_pubkey.empty() ? "" : target_pubkey.substr(2, 10)) << "]. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
}
/**
* Process dir children response.
* @param vpath Virtual path of the fs.
* @param dir_mode Metadata 'mode' of dir.
* @param fs_entry_map Received fs entry map.
* @param peer_fs_entries Received peer fs entries.
* @returns 0 on success and no fs write peformed. 1 if write performed. -1 on failure.
*/
int hpfs_sync::handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
int hpfs_sync::handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, const std::vector<p2p::hpfs_fs_hash_entry> &peer_fs_entries)
{
// Get the parent path of the fs entries we have received.
LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response for " << vpath;
bool write_performed = false;
// Create physical directory on our side if not exist.
std::string parent_physical_path = fs_mount->rw_dir + vpath.data();
std::string parent_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath);
if (util::create_dir_tree_recursive(parent_physical_path) == -1)
return -1;
@@ -561,63 +602,40 @@ namespace hpfs
else if (metadata_res == 1)
write_performed = true;
// Get the children hash entries and compare with what we got from peer.
std::vector<hpfs::child_hash_node> existing_fs_entries;
if (fs_mount->get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1)
return -1;
// Request more info on fs entries that exist on both sides but are different.
for (const auto &ex_entry : existing_fs_entries)
for (const p2p::hpfs_fs_hash_entry &entry : peer_fs_entries)
{
// Construct child vpath.
std::string child_vpath = std::string(vpath)
.append(vpath.back() != '/' ? "/" : "")
.append(ex_entry.name);
.append(entry.name);
const auto peer_itr = fs_entry_map.find(ex_entry.name);
if (peer_itr != fs_entry_map.end())
if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::MISMATCHED)
{
// Request hpfs state if hash is different.
if (peer_itr->second.hash != ex_entry.hash)
{
// Prioritize file hpfs requests over directories.
if (ex_entry.is_file)
pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash});
else
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash});
}
fs_entry_map.erase(peer_itr);
// We must request for this entry. Prioritize file hpfs requests over directories.
if (entry.is_file)
pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, entry.hash});
else
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash});
}
else
else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::RESPONDED)
{
// If there was an entry that does not exist on other side, delete it.
std::string child_physical_path = fs_mount->rw_dir + child_vpath.data();
// The peer has already responded with a pre-emptive hint response. So we must start watching for it.
submit_request(backlog_item{entry.is_file ? BACKLOG_ITEM_TYPE::FILE : BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, entry.hash}, true);
}
else if (entry.response_type == p2p::HPFS_FS_ENTRY_RESPONSE_TYPE::NOT_AVAILABLE)
{
// This entry is not available in peer. So we must delete it from our side.
std::string child_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, child_vpath);
if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) ||
!ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1)
if ((entry.is_file && unlink(child_physical_path.c_str()) == -1) ||
!entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1)
return -1;
write_performed = true;
LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath;
LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (entry.is_file ? "file" : "dir") << " path " << child_vpath;
}
}
// Queue the remaining peer fs entries (that our side does not have at all) to request.
for (const auto &[name, fs_entry] : fs_entry_map)
{
// Construct child vpath.
std::string child_vpath = std::string(vpath)
.append(vpath.back() != '/' ? "/" : "")
.append(name);
// Prioritize file hpfs requests over directories.
if (fs_entry.is_file)
pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash});
else
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash});
}
return write_performed ? 1 : 0;
}
@@ -627,14 +645,13 @@ namespace hpfs
* @param file_mode Received metadata mode of the file.
* @param hashes Received block hashes.
* @param hash_count No. of received block hashes.
* @param responded_block_ids List of block ids already responded by the peer.
* @param file_length Size of the file.
* @returns 0 on success and no write operation performed. 1 if write opreation peformed. -1 on failure.
*/
int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length)
int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count,
const std::set<uint32_t> &responded_block_ids, const uint64_t file_length)
{
// Get the file path of the block hashes we have received.
LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response for " << vpath;
bool write_performed = false;
// File block hashes on our side (file might not exist on our side).
@@ -648,15 +665,22 @@ namespace hpfs
const int32_t max_block_id = MAX(existing_hash_count, hash_count) - 1;
for (int32_t block_id = 0; block_id <= max_block_id; block_id++)
{
// Insert at front to give priority to block requests while preserving block order.
if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id])
if (responded_block_ids.count(block_id) == 1)
{
// The peer has already responded with a hint response. So we must start watching for it.
submit_request(backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}, true);
}
else if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id])
{
// Insert at front to give priority to block requests while preserving block order.
pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]});
}
}
if (existing_hashes.size() >= hash_count)
{
// If peer file might be smaller, truncate our file to match with peer file.
std::string file_physical_path = fs_mount->rw_dir + vpath.data();
std::string file_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath);
if (truncate(file_physical_path.c_str(), file_length) == -1)
return -1;
@@ -664,7 +688,7 @@ namespace hpfs
}
// Apply physical file mode if received mode is different from our side.
const std::string physical_path = fs_mount->rw_dir + vpath.data();
const std::string physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath);
const int metadata_res = apply_metadata_mode(physical_path, file_mode, false);
if (metadata_res == -1)
return -1;
@@ -683,11 +707,7 @@ namespace hpfs
*/
int hpfs_sync::handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf)
{
LOG_DEBUG << "Hpfs " << name << " sync: Writing block_id " << block_id
<< " (len:" << buf.length()
<< ") of " << vpath;
std::string file_physical_path = fs_mount->rw_dir + vpath.data();
std::string file_physical_path = fs_mount->physical_path(hpfs::RW_SESSION_NAME, vpath);
const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS);
if (fd == -1)
{

View File

@@ -71,7 +71,7 @@ namespace hpfs
int start_syncing_next_target();
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const mode_t dir_mode,
const std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map);
const std::vector<p2p::hpfs_fs_hash_entry> &peer_fs_entries);
bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const mode_t file_mode,
const util::h32 *hashes, const size_t hash_count);
@@ -83,11 +83,12 @@ namespace hpfs
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string &target_pubkey);
void submit_request(const backlog_item &request);
void submit_request(const backlog_item &request, const bool watch_only = false, const bool is_resubmit = false);
int handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map);
int handle_fs_entry_response(std::string_view vpath, const mode_t dir_mode, const std::vector<p2p::hpfs_fs_hash_entry> &peer_fs_entries);
int handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length);
int handle_file_hashmap_response(std::string_view vpath, const mode_t file_mode, const util::h32 *hashes, const size_t hash_count,
const std::set<uint32_t> &responded_block_ids, const uint64_t file_length);
int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf);

View File

@@ -40,7 +40,6 @@ namespace ledger
return;
}
util::h32 prev_shard_hash_from_hpfs;
const std::string shard_parent_dir = synced_target.vpath.substr(0, pos);
if (shard_parent_dir == PRIMARY_DIR)
@@ -75,8 +74,12 @@ namespace ledger
last_primary_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_primary_shards)
{
// Check whether the hash of the previous shard matches with the hash in the prev_shard.hash file.
const std::string prev_shard_vpath = std::string(PRIMARY_DIR).append("/").append(std::to_string(--synced_shard_seq_no));
fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath);
util::h32 prev_shard_hash_from_hpfs = util::h32_empty;
if (synced_shard_seq_no > 0)
{
const std::string prev_shard_vpath = std::string(PRIMARY_DIR).append("/").append(std::to_string(--synced_shard_seq_no));
fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath);
}
if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then.
&& prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs.
@@ -123,8 +126,12 @@ namespace ledger
last_raw_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_raw_shards)
{
// Check whether the hash of the previous raw shard matches with the hash in the prev_shard.hash file.
const std::string prev_shard_vpath = std::string(RAW_DIR).append("/").append(std::to_string(--synced_shard_seq_no));
fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath);
util::h32 prev_shard_hash_from_hpfs = util::h32_empty;
if (synced_shard_seq_no > 0)
{
const std::string prev_shard_vpath = std::string(RAW_DIR).append("/").append(std::to_string(--synced_shard_seq_no));
fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath);
}
if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then.
&& prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs.

View File

@@ -81,82 +81,100 @@ table NplMsg {
// Make sure to update signature generation/verification whenever these fields are changed.
}
table HpfsRequestMsg {
mount_id: uint32;
parent_path:string;
is_file:bool;
block_id:int32;
expected_hash:[ubyte];
}
//--hpfs requests and responses--//
union HpfsResponse {
HpfsFileHashMapResponse,
HpfsBlockResponse,
HpfsFsEntryResponse
}
enum HpfsFsEntryResponseType : byte { Matched = 0, Mismatched = 1, Responded = 2, NotAvailable = 3 }
table HpfsResponseMsg{
hash:[ubyte];
path: string;
mount_id: uint32;
content:HpfsResponse;
}
table HpfsFsEntryResponse{
dir_mode:uint32;
entries: [HpfsFSHashEntry];
}
table HpfsFileHashMapResponse{
file_length:uint64;
file_mode:uint32;
hash_map:[ubyte];
}
table HpfsBlockResponse{
block_id:uint32;
data: [ubyte];
}
table HpfsFSHashEntry{
table HpfsFSHashEntry {
name: string;
is_file: bool;
hash: [ubyte];
response_type: HpfsFsEntryResponseType;
}
table HpfsLogRequest
{
target_seq_no:uint64;
min_record_id:SequenceHash;
table HpfsFsEntryHint {
entries: [HpfsFSHashEntry];
}
table HpfsLogResponse
{
min_record_id:SequenceHash;
log_record_bytes:[ubyte];
table HpfsFileHashMapHint {
hash_map: [ubyte];
}
table PeerRequirementAnnouncementMsg{
union HpfsRequestHint {
HpfsFsEntryHint,
HpfsFileHashMapHint
}
table HpfsRequestMsg {
mount_id: uint32;
parent_path: string;
is_file: bool;
block_id: int32;
expected_hash: [ubyte];
hint: HpfsRequestHint;
}
union HpfsResponse {
HpfsFsEntryResponse,
HpfsFileHashMapResponse,
HpfsBlockResponse
}
table HpfsResponseMsg {
hash: [ubyte];
path: string;
mount_id: uint32;
content: HpfsResponse;
}
table HpfsFsEntryResponse {
dir_mode: uint32;
entries: [HpfsFSHashEntry];
}
table HpfsFileHashMapResponse {
file_length: uint64;
file_mode: uint32;
hash_map: [ubyte];
responded_block_ids: [uint32];
}
table HpfsBlockResponse {
block_id: uint32;
data: [ubyte];
}
table HpfsLogRequest {
target_seq_no: uint64;
min_record_id: SequenceHash;
}
table HpfsLogResponse {
min_record_id: SequenceHash;
log_record_bytes: [ubyte];
}
table PeerRequirementAnnouncementMsg {
need_consensus_msg_forwarding: bool;
}
table PeerCapacityAnnouncementMsg{
available_capacity:int16;
timestamp:uint64;
table PeerCapacityAnnouncementMsg {
available_capacity: int16;
timestamp: uint64;
}
table PeerListRequestMsg{
table PeerListRequestMsg {
}
table PeerListResponseMsg{
table PeerListResponseMsg {
peer_list: [PeerProperties];
}
table PeerProperties {
host_address:string;
port:uint16;
available_capacity:int16;
timestamp:uint64;
host_address: string;
port: uint16;
available_capacity: int16;
timestamp: uint64;
}
table SequenceHash {
@@ -165,7 +183,7 @@ table SequenceHash {
}
table ByteArray { // To help represent list of byte arrays
array:[ubyte];
array: [ubyte];
}
root_type P2PMsg; //root type for all messages

View File

@@ -206,6 +206,19 @@ namespace msg::fbuf::p2pmsg
hr.is_file = msg.is_file();
hr.parent_path = flatbuf_str_to_sv(msg.parent_path());
hr.expected_hash = flatbuf_bytes_to_hash(msg.expected_hash());
if (msg.hint_type() == HpfsRequestHint_HpfsFsEntryHint)
{
flatbuf_hpfsfshashentries_to_hpfsfshashentries(hr.fs_entry_hints, msg.hint_as_HpfsFsEntryHint()->entries());
}
else if (msg.hint_type() == HpfsRequestHint_HpfsFileHashMapHint)
{
const HpfsFileHashMapHint *hint = msg.hint_as_HpfsFileHashMapHint();
const size_t block_hash_count = hint->hash_map()->size() / sizeof(util::h32);
hr.file_hashmap_hints.resize(block_hash_count);
memcpy(hr.file_hashmap_hints.data(), hint->hash_map()->data(), hint->hash_map()->size());
}
return hr;
}
@@ -266,7 +279,7 @@ namespace msg::fbuf::p2pmsg
return map;
}
void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entries, const flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>> *fhashes)
void flatbuf_hpfsfshashentries_to_hpfsfshashentries(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>> *fhashes)
{
for (const HpfsFSHashEntry *f_hash : *fhashes)
{
@@ -274,8 +287,9 @@ namespace msg::fbuf::p2pmsg
entry.name = flatbuf_str_to_sv(f_hash->name());
entry.is_file = f_hash->is_file();
entry.hash = flatbuf_bytes_to_hash(f_hash->hash());
entry.response_type = (p2p::HPFS_FS_ENTRY_RESPONSE_TYPE)f_hash->response_type();
fs_entries.emplace(entry.name, std::move(entry));
fs_entries.push_back(std::move(entry));
}
}
@@ -409,6 +423,33 @@ namespace msg::fbuf::p2pmsg
}
void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr)
{
if (!hr.is_file) // Dir fs entry request.
{
const auto hint = CreateHpfsFsEntryHint(
builder,
hpfsfshashentry_to_flatbuf_hpfsfshashentry(builder, hr.fs_entry_hints));
create_hpfs_request_msg(builder, hr, HpfsRequestHint_HpfsFsEntryHint, hint.Union());
}
else if (hr.is_file && hr.block_id == -1) // File hash map request.
{
std::string_view hashmap_sv(reinterpret_cast<const char *>(hr.file_hashmap_hints.data()), hr.file_hashmap_hints.size() * sizeof(util::h32));
const auto hint = CreateHpfsFileHashMapHint(
builder,
sv_to_flatbuf_bytes(builder, hashmap_sv));
create_hpfs_request_msg(builder, hr, HpfsRequestHint_HpfsFileHashMapHint, hint.Union());
}
else
{
create_hpfs_request_msg(builder, hr);
}
}
void create_hpfs_request_msg(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr,
msg::fbuf::p2pmsg::HpfsRequestHint hint_type, flatbuffers::Offset<void> hint)
{
const auto msg = CreateHpfsRequestMsg(
builder,
@@ -416,7 +457,9 @@ namespace msg::fbuf::p2pmsg
sv_to_flatbuf_str(builder, hr.parent_path),
hr.is_file,
hr.block_id,
hash_to_flatbuf_bytes(builder, hr.expected_hash));
hash_to_flatbuf_bytes(builder, hr.expected_hash),
hint_type,
hint);
create_p2p_msg(builder, P2PMsgContent_HpfsRequestMsg, msg.Union());
}
@@ -443,12 +486,12 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_fsentry_response(
flatbuffers::FlatBufferBuilder &builder, const std::string_view path, const uint32_t mount_id, const mode_t dir_mode,
std::vector<hpfs::child_hash_node> &hash_nodes, const util::h32 &expected_hash)
std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const util::h32 &expected_hash)
{
const auto child_msg = CreateHpfsFsEntryResponse(
builder,
dir_mode,
hpfsfshashentry_to_flatbuf_hpfsfshashentry(builder, hash_nodes));
hpfsfshashentry_to_flatbuf_hpfsfshashentry(builder, fs_entries));
const auto msg = CreateHpfsResponseMsg(
builder,
@@ -463,7 +506,8 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_filehashmap_response(
flatbuffers::FlatBufferBuilder &builder, std::string_view path, const uint32_t mount_id,
std::vector<util::h32> &hashmap, const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash)
const std::vector<util::h32> &hashmap, const std::vector<uint32_t> &responded_block_ids,
const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash)
{
std::string_view hashmap_sv(reinterpret_cast<const char *>(hashmap.data()), hashmap.size() * sizeof(util::h32));
@@ -471,7 +515,8 @@ namespace msg::fbuf::p2pmsg
builder,
file_length,
file_mode,
sv_to_flatbuf_bytes(builder, hashmap_sv));
sv_to_flatbuf_bytes(builder, hashmap_sv),
builder.CreateVector(responded_block_ids));
const auto msg = CreateHpfsResponseMsg(
builder,
@@ -484,17 +529,18 @@ namespace msg::fbuf::p2pmsg
create_p2p_msg(builder, P2PMsgContent_HpfsResponseMsg, msg.Union());
}
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, p2p::block_response &block_resp, const uint32_t mount_id)
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, const uint32_t block_id, const std::vector<uint8_t> &block_data,
const util::h32 &block_hash, std::string_view parent_path, const uint32_t mount_id)
{
const auto child_msg = CreateHpfsBlockResponse(
builder,
block_resp.block_id,
sv_to_flatbuf_bytes(builder, block_resp.data));
block_id,
builder.CreateVector(block_data));
const auto msg = CreateHpfsResponseMsg(
builder,
hash_to_flatbuf_bytes(builder, block_resp.hash),
sv_to_flatbuf_str(builder, block_resp.path),
hash_to_flatbuf_bytes(builder, block_hash),
sv_to_flatbuf_str(builder, parent_path),
mount_id,
HpfsResponse_HpfsBlockResponse,
child_msg.Union());
@@ -562,19 +608,18 @@ namespace msg::fbuf::p2pmsg
}
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>>>
hpfsfshashentry_to_flatbuf_hpfsfshashentry(
flatbuffers::FlatBufferBuilder &builder,
std::vector<hpfs::child_hash_node> &hash_nodes)
hpfsfshashentry_to_flatbuf_hpfsfshashentry(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::hpfs_fs_hash_entry> &fs_entries)
{
std::vector<flatbuffers::Offset<HpfsFSHashEntry>> fbvec;
fbvec.reserve(hash_nodes.size());
for (auto const &hash_node : hash_nodes)
fbvec.reserve(fs_entries.size());
for (auto const &fs_entry : fs_entries)
{
flatbuffers::Offset<HpfsFSHashEntry> hpfs_fs_entry = CreateHpfsFSHashEntry(
builder,
sv_to_flatbuf_str(builder, hash_node.name),
hash_node.is_file,
hash_to_flatbuf_bytes(builder, hash_node.hash));
sv_to_flatbuf_str(builder, fs_entry.name),
fs_entry.is_file,
hash_to_flatbuf_bytes(builder, fs_entry.hash),
(HpfsFsEntryResponseType)fs_entry.response_type);
fbvec.push_back(hpfs_fs_entry);
}

View File

@@ -3,7 +3,6 @@
#include "../../pchheader.hpp"
#include "../../p2p/p2p.hpp"
#include "../../hpfs/hpfs_mount.hpp"
#include "p2pmsg_generated.h"
namespace msg::fbuf::p2pmsg
@@ -48,7 +47,7 @@ namespace msg::fbuf::p2pmsg
const std::unordered_map<std::string, std::list<usr::submitted_user_input>>
flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector<flatbuffers::Offset<UserInputGroup>> *fbvec);
void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entries, const flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>> *fhashes);
void flatbuf_hpfsfshashentries_to_hpfsfshashentries(std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>> *fhashes);
const std::vector<p2p::peer_properties>
flatbuf_peer_propertieslist_to_peer_propertiesvector(const flatbuffers::Vector<flatbuffers::Offset<PeerProperties>> *fbvec);
@@ -73,19 +72,24 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr);
void create_hpfs_request_msg(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr,
msg::fbuf::p2pmsg::HpfsRequestHint hint_type = HpfsRequestHint_NONE, flatbuffers::Offset<void> hint = 0);
void create_msg_from_hpfs_log_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_request &hpfs_log_request);
void create_msg_from_hpfs_log_response(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_response &hpfs_log_response);
void create_msg_from_fsentry_response(
flatbuffers::FlatBufferBuilder &builder, const std::string_view path, const uint32_t mount_id, const mode_t dir_mode,
std::vector<hpfs::child_hash_node> &hash_nodes, const util::h32 &expected_hash);
std::vector<p2p::hpfs_fs_hash_entry> &fs_entries, const util::h32 &expected_hash);
void create_msg_from_filehashmap_response(
flatbuffers::FlatBufferBuilder &builder, std::string_view path, const uint32_t mount_id,
std::vector<util::h32> &hashmap, const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash);
const std::vector<util::h32> &hashmap, const std::vector<uint32_t> &responded_block_ids,
const std::size_t file_length, const mode_t file_mode, const util::h32 &expected_hash);
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, p2p::block_response &block_resp, const uint32_t mount_id);
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &builder, const uint32_t block_id, const std::vector<uint8_t> &block_data,
const util::h32 &block_hash, std::string_view parent_path, const uint32_t mount_id);
void create_msg_from_peer_requirement_announcement(flatbuffers::FlatBufferBuilder &builder, const bool need_consensus_msg_forwarding);
@@ -99,9 +103,7 @@ namespace msg::fbuf::p2pmsg
user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map<std::string, std::list<usr::submitted_user_input>> &map);
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<HpfsFSHashEntry>>>
hpfsfshashentry_to_flatbuf_hpfsfshashentry(
flatbuffers::FlatBufferBuilder &builder,
std::vector<hpfs::child_hash_node> &hash_nodes);
hpfsfshashentry_to_flatbuf_hpfsfshashentry(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::hpfs_fs_hash_entry> &fs_entries);
const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<PeerProperties>>>
peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector<p2p::peer_properties> &peers, const std::optional<conf::peer_ip_port> &skipping_ip_port);

View File

@@ -34,6 +34,15 @@ struct ProposalMsgBuilder;
struct NplMsg;
struct NplMsgBuilder;
struct HpfsFSHashEntry;
struct HpfsFSHashEntryBuilder;
struct HpfsFsEntryHint;
struct HpfsFsEntryHintBuilder;
struct HpfsFileHashMapHint;
struct HpfsFileHashMapHintBuilder;
struct HpfsRequestMsg;
struct HpfsRequestMsgBuilder;
@@ -49,9 +58,6 @@ struct HpfsFileHashMapResponseBuilder;
struct HpfsBlockResponse;
struct HpfsBlockResponseBuilder;
struct HpfsFSHashEntry;
struct HpfsFSHashEntryBuilder;
struct HpfsLogRequest;
struct HpfsLogRequestBuilder;
@@ -204,21 +210,105 @@ template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::HpfsLogResponse> {
bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type);
bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
enum HpfsFsEntryResponseType {
HpfsFsEntryResponseType_Matched = 0,
HpfsFsEntryResponseType_Mismatched = 1,
HpfsFsEntryResponseType_Responded = 2,
HpfsFsEntryResponseType_NotAvailable = 3,
HpfsFsEntryResponseType_MIN = HpfsFsEntryResponseType_Matched,
HpfsFsEntryResponseType_MAX = HpfsFsEntryResponseType_NotAvailable
};
inline const HpfsFsEntryResponseType (&EnumValuesHpfsFsEntryResponseType())[4] {
static const HpfsFsEntryResponseType values[] = {
HpfsFsEntryResponseType_Matched,
HpfsFsEntryResponseType_Mismatched,
HpfsFsEntryResponseType_Responded,
HpfsFsEntryResponseType_NotAvailable
};
return values;
}
inline const char * const *EnumNamesHpfsFsEntryResponseType() {
static const char * const names[5] = {
"Matched",
"Mismatched",
"Responded",
"NotAvailable",
nullptr
};
return names;
}
inline const char *EnumNameHpfsFsEntryResponseType(HpfsFsEntryResponseType e) {
if (flatbuffers::IsOutRange(e, HpfsFsEntryResponseType_Matched, HpfsFsEntryResponseType_NotAvailable)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesHpfsFsEntryResponseType()[index];
}
enum HpfsRequestHint {
HpfsRequestHint_NONE = 0,
HpfsRequestHint_HpfsFsEntryHint = 1,
HpfsRequestHint_HpfsFileHashMapHint = 2,
HpfsRequestHint_MIN = HpfsRequestHint_NONE,
HpfsRequestHint_MAX = HpfsRequestHint_HpfsFileHashMapHint
};
inline const HpfsRequestHint (&EnumValuesHpfsRequestHint())[3] {
static const HpfsRequestHint values[] = {
HpfsRequestHint_NONE,
HpfsRequestHint_HpfsFsEntryHint,
HpfsRequestHint_HpfsFileHashMapHint
};
return values;
}
inline const char * const *EnumNamesHpfsRequestHint() {
static const char * const names[4] = {
"NONE",
"HpfsFsEntryHint",
"HpfsFileHashMapHint",
nullptr
};
return names;
}
inline const char *EnumNameHpfsRequestHint(HpfsRequestHint e) {
if (flatbuffers::IsOutRange(e, HpfsRequestHint_NONE, HpfsRequestHint_HpfsFileHashMapHint)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesHpfsRequestHint()[index];
}
template<typename T> struct HpfsRequestHintTraits {
static const HpfsRequestHint enum_value = HpfsRequestHint_NONE;
};
template<> struct HpfsRequestHintTraits<msg::fbuf::p2pmsg::HpfsFsEntryHint> {
static const HpfsRequestHint enum_value = HpfsRequestHint_HpfsFsEntryHint;
};
template<> struct HpfsRequestHintTraits<msg::fbuf::p2pmsg::HpfsFileHashMapHint> {
static const HpfsRequestHint enum_value = HpfsRequestHint_HpfsFileHashMapHint;
};
bool VerifyHpfsRequestHint(flatbuffers::Verifier &verifier, const void *obj, HpfsRequestHint type);
bool VerifyHpfsRequestHintVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
enum HpfsResponse {
HpfsResponse_NONE = 0,
HpfsResponse_HpfsFileHashMapResponse = 1,
HpfsResponse_HpfsBlockResponse = 2,
HpfsResponse_HpfsFsEntryResponse = 3,
HpfsResponse_HpfsFsEntryResponse = 1,
HpfsResponse_HpfsFileHashMapResponse = 2,
HpfsResponse_HpfsBlockResponse = 3,
HpfsResponse_MIN = HpfsResponse_NONE,
HpfsResponse_MAX = HpfsResponse_HpfsFsEntryResponse
HpfsResponse_MAX = HpfsResponse_HpfsBlockResponse
};
inline const HpfsResponse (&EnumValuesHpfsResponse())[4] {
static const HpfsResponse values[] = {
HpfsResponse_NONE,
HpfsResponse_HpfsFsEntryResponse,
HpfsResponse_HpfsFileHashMapResponse,
HpfsResponse_HpfsBlockResponse,
HpfsResponse_HpfsFsEntryResponse
HpfsResponse_HpfsBlockResponse
};
return values;
}
@@ -226,16 +316,16 @@ inline const HpfsResponse (&EnumValuesHpfsResponse())[4] {
inline const char * const *EnumNamesHpfsResponse() {
static const char * const names[5] = {
"NONE",
"HpfsFsEntryResponse",
"HpfsFileHashMapResponse",
"HpfsBlockResponse",
"HpfsFsEntryResponse",
nullptr
};
return names;
}
inline const char *EnumNameHpfsResponse(HpfsResponse e) {
if (flatbuffers::IsOutRange(e, HpfsResponse_NONE, HpfsResponse_HpfsFsEntryResponse)) return "";
if (flatbuffers::IsOutRange(e, HpfsResponse_NONE, HpfsResponse_HpfsBlockResponse)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesHpfsResponse()[index];
}
@@ -244,6 +334,10 @@ template<typename T> struct HpfsResponseTraits {
static const HpfsResponse enum_value = HpfsResponse_NONE;
};
template<> struct HpfsResponseTraits<msg::fbuf::p2pmsg::HpfsFsEntryResponse> {
static const HpfsResponse enum_value = HpfsResponse_HpfsFsEntryResponse;
};
template<> struct HpfsResponseTraits<msg::fbuf::p2pmsg::HpfsFileHashMapResponse> {
static const HpfsResponse enum_value = HpfsResponse_HpfsFileHashMapResponse;
};
@@ -252,10 +346,6 @@ template<> struct HpfsResponseTraits<msg::fbuf::p2pmsg::HpfsBlockResponse> {
static const HpfsResponse enum_value = HpfsResponse_HpfsBlockResponse;
};
template<> struct HpfsResponseTraits<msg::fbuf::p2pmsg::HpfsFsEntryResponse> {
static const HpfsResponse enum_value = HpfsResponse_HpfsFsEntryResponse;
};
bool VerifyHpfsResponse(flatbuffers::Verifier &verifier, const void *obj, HpfsResponse type);
bool VerifyHpfsResponseVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
@@ -1231,6 +1321,219 @@ inline flatbuffers::Offset<NplMsg> CreateNplMsgDirect(
lcl_id);
}
struct HpfsFSHashEntry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsFSHashEntryBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_NAME = 4,
VT_IS_FILE = 6,
VT_HASH = 8,
VT_RESPONSE_TYPE = 10
};
const flatbuffers::String *name() const {
return GetPointer<const flatbuffers::String *>(VT_NAME);
}
flatbuffers::String *mutable_name() {
return GetPointer<flatbuffers::String *>(VT_NAME);
}
bool is_file() const {
return GetField<uint8_t>(VT_IS_FILE, 0) != 0;
}
bool mutate_is_file(bool _is_file) {
return SetField<uint8_t>(VT_IS_FILE, static_cast<uint8_t>(_is_file), 0);
}
const flatbuffers::Vector<uint8_t> *hash() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
flatbuffers::Vector<uint8_t> *mutable_hash() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type() const {
return static_cast<msg::fbuf::p2pmsg::HpfsFsEntryResponseType>(GetField<int8_t>(VT_RESPONSE_TYPE, 0));
}
bool mutate_response_type(msg::fbuf::p2pmsg::HpfsFsEntryResponseType _response_type) {
return SetField<int8_t>(VT_RESPONSE_TYPE, static_cast<int8_t>(_response_type), 0);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_NAME) &&
verifier.VerifyString(name()) &&
VerifyField<uint8_t>(verifier, VT_IS_FILE) &&
VerifyOffset(verifier, VT_HASH) &&
verifier.VerifyVector(hash()) &&
VerifyField<int8_t>(verifier, VT_RESPONSE_TYPE) &&
verifier.EndTable();
}
};
struct HpfsFSHashEntryBuilder {
typedef HpfsFSHashEntry Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_name(flatbuffers::Offset<flatbuffers::String> name) {
fbb_.AddOffset(HpfsFSHashEntry::VT_NAME, name);
}
void add_is_file(bool is_file) {
fbb_.AddElement<uint8_t>(HpfsFSHashEntry::VT_IS_FILE, static_cast<uint8_t>(is_file), 0);
}
void add_hash(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash) {
fbb_.AddOffset(HpfsFSHashEntry::VT_HASH, hash);
}
void add_response_type(msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type) {
fbb_.AddElement<int8_t>(HpfsFSHashEntry::VT_RESPONSE_TYPE, static_cast<int8_t>(response_type), 0);
}
explicit HpfsFSHashEntryBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &);
flatbuffers::Offset<HpfsFSHashEntry> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFSHashEntry>(end);
return o;
}
};
inline flatbuffers::Offset<HpfsFSHashEntry> CreateHpfsFSHashEntry(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> name = 0,
bool is_file = false,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0,
msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type = msg::fbuf::p2pmsg::HpfsFsEntryResponseType_Matched) {
HpfsFSHashEntryBuilder builder_(_fbb);
builder_.add_hash(hash);
builder_.add_name(name);
builder_.add_response_type(response_type);
builder_.add_is_file(is_file);
return builder_.Finish();
}
inline flatbuffers::Offset<HpfsFSHashEntry> CreateHpfsFSHashEntryDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *name = nullptr,
bool is_file = false,
const std::vector<uint8_t> *hash = nullptr,
msg::fbuf::p2pmsg::HpfsFsEntryResponseType response_type = msg::fbuf::p2pmsg::HpfsFsEntryResponseType_Matched) {
auto name__ = name ? _fbb.CreateString(name) : 0;
auto hash__ = hash ? _fbb.CreateVector<uint8_t>(*hash) : 0;
return msg::fbuf::p2pmsg::CreateHpfsFSHashEntry(
_fbb,
name__,
is_file,
hash__,
response_type);
}
struct HpfsFsEntryHint FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsFsEntryHintBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_ENTRIES = 4
};
const flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>> *entries() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>> *>(VT_ENTRIES);
}
flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>> *mutable_entries() {
return GetPointer<flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>> *>(VT_ENTRIES);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_ENTRIES) &&
verifier.VerifyVector(entries()) &&
verifier.VerifyVectorOfTables(entries()) &&
verifier.EndTable();
}
};
struct HpfsFsEntryHintBuilder {
typedef HpfsFsEntryHint Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_entries(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>>> entries) {
fbb_.AddOffset(HpfsFsEntryHint::VT_ENTRIES, entries);
}
explicit HpfsFsEntryHintBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFsEntryHintBuilder &operator=(const HpfsFsEntryHintBuilder &);
flatbuffers::Offset<HpfsFsEntryHint> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFsEntryHint>(end);
return o;
}
};
inline flatbuffers::Offset<HpfsFsEntryHint> CreateHpfsFsEntryHint(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>>> entries = 0) {
HpfsFsEntryHintBuilder builder_(_fbb);
builder_.add_entries(entries);
return builder_.Finish();
}
inline flatbuffers::Offset<HpfsFsEntryHint> CreateHpfsFsEntryHintDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>> *entries = nullptr) {
auto entries__ = entries ? _fbb.CreateVector<flatbuffers::Offset<msg::fbuf::p2pmsg::HpfsFSHashEntry>>(*entries) : 0;
return msg::fbuf::p2pmsg::CreateHpfsFsEntryHint(
_fbb,
entries__);
}
struct HpfsFileHashMapHint FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsFileHashMapHintBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_HASH_MAP = 4
};
const flatbuffers::Vector<uint8_t> *hash_map() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_HASH_MAP);
}
flatbuffers::Vector<uint8_t> *mutable_hash_map() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_HASH_MAP);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_HASH_MAP) &&
verifier.VerifyVector(hash_map()) &&
verifier.EndTable();
}
};
struct HpfsFileHashMapHintBuilder {
typedef HpfsFileHashMapHint Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_hash_map(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash_map) {
fbb_.AddOffset(HpfsFileHashMapHint::VT_HASH_MAP, hash_map);
}
explicit HpfsFileHashMapHintBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFileHashMapHintBuilder &operator=(const HpfsFileHashMapHintBuilder &);
flatbuffers::Offset<HpfsFileHashMapHint> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFileHashMapHint>(end);
return o;
}
};
inline flatbuffers::Offset<HpfsFileHashMapHint> CreateHpfsFileHashMapHint(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash_map = 0) {
HpfsFileHashMapHintBuilder builder_(_fbb);
builder_.add_hash_map(hash_map);
return builder_.Finish();
}
inline flatbuffers::Offset<HpfsFileHashMapHint> CreateHpfsFileHashMapHintDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<uint8_t> *hash_map = nullptr) {
auto hash_map__ = hash_map ? _fbb.CreateVector<uint8_t>(*hash_map) : 0;
return msg::fbuf::p2pmsg::CreateHpfsFileHashMapHint(
_fbb,
hash_map__);
}
struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsRequestMsgBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
@@ -1238,7 +1541,9 @@ struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
VT_PARENT_PATH = 6,
VT_IS_FILE = 8,
VT_BLOCK_ID = 10,
VT_EXPECTED_HASH = 12
VT_EXPECTED_HASH = 12,
VT_HINT_TYPE = 14,
VT_HINT = 16
};
uint32_t mount_id() const {
return GetField<uint32_t>(VT_MOUNT_ID, 0);
@@ -1270,6 +1575,22 @@ struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
flatbuffers::Vector<uint8_t> *mutable_expected_hash() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_EXPECTED_HASH);
}
msg::fbuf::p2pmsg::HpfsRequestHint hint_type() const {
return static_cast<msg::fbuf::p2pmsg::HpfsRequestHint>(GetField<uint8_t>(VT_HINT_TYPE, 0));
}
const void *hint() const {
return GetPointer<const void *>(VT_HINT);
}
template<typename T> const T *hint_as() const;
const msg::fbuf::p2pmsg::HpfsFsEntryHint *hint_as_HpfsFsEntryHint() const {
return hint_type() == msg::fbuf::p2pmsg::HpfsRequestHint_HpfsFsEntryHint ? static_cast<const msg::fbuf::p2pmsg::HpfsFsEntryHint *>(hint()) : nullptr;
}
const msg::fbuf::p2pmsg::HpfsFileHashMapHint *hint_as_HpfsFileHashMapHint() const {
return hint_type() == msg::fbuf::p2pmsg::HpfsRequestHint_HpfsFileHashMapHint ? static_cast<const msg::fbuf::p2pmsg::HpfsFileHashMapHint *>(hint()) : nullptr;
}
void *mutable_hint() {
return GetPointer<void *>(VT_HINT);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint32_t>(verifier, VT_MOUNT_ID) &&
@@ -1279,10 +1600,21 @@ struct HpfsRequestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
VerifyField<int32_t>(verifier, VT_BLOCK_ID) &&
VerifyOffset(verifier, VT_EXPECTED_HASH) &&
verifier.VerifyVector(expected_hash()) &&
VerifyField<uint8_t>(verifier, VT_HINT_TYPE) &&
VerifyOffset(verifier, VT_HINT) &&
VerifyHpfsRequestHint(verifier, hint(), hint_type()) &&
verifier.EndTable();
}
};
template<> inline const msg::fbuf::p2pmsg::HpfsFsEntryHint *HpfsRequestMsg::hint_as<msg::fbuf::p2pmsg::HpfsFsEntryHint>() const {
return hint_as_HpfsFsEntryHint();
}
template<> inline const msg::fbuf::p2pmsg::HpfsFileHashMapHint *HpfsRequestMsg::hint_as<msg::fbuf::p2pmsg::HpfsFileHashMapHint>() const {
return hint_as_HpfsFileHashMapHint();
}
struct HpfsRequestMsgBuilder {
typedef HpfsRequestMsg Table;
flatbuffers::FlatBufferBuilder &fbb_;
@@ -1302,6 +1634,12 @@ struct HpfsRequestMsgBuilder {
void add_expected_hash(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> expected_hash) {
fbb_.AddOffset(HpfsRequestMsg::VT_EXPECTED_HASH, expected_hash);
}
void add_hint_type(msg::fbuf::p2pmsg::HpfsRequestHint hint_type) {
fbb_.AddElement<uint8_t>(HpfsRequestMsg::VT_HINT_TYPE, static_cast<uint8_t>(hint_type), 0);
}
void add_hint(flatbuffers::Offset<void> hint) {
fbb_.AddOffset(HpfsRequestMsg::VT_HINT, hint);
}
explicit HpfsRequestMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
@@ -1320,12 +1658,16 @@ inline flatbuffers::Offset<HpfsRequestMsg> CreateHpfsRequestMsg(
flatbuffers::Offset<flatbuffers::String> parent_path = 0,
bool is_file = false,
int32_t block_id = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> expected_hash = 0) {
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> expected_hash = 0,
msg::fbuf::p2pmsg::HpfsRequestHint hint_type = msg::fbuf::p2pmsg::HpfsRequestHint_NONE,
flatbuffers::Offset<void> hint = 0) {
HpfsRequestMsgBuilder builder_(_fbb);
builder_.add_hint(hint);
builder_.add_expected_hash(expected_hash);
builder_.add_block_id(block_id);
builder_.add_parent_path(parent_path);
builder_.add_mount_id(mount_id);
builder_.add_hint_type(hint_type);
builder_.add_is_file(is_file);
return builder_.Finish();
}
@@ -1336,7 +1678,9 @@ inline flatbuffers::Offset<HpfsRequestMsg> CreateHpfsRequestMsgDirect(
const char *parent_path = nullptr,
bool is_file = false,
int32_t block_id = 0,
const std::vector<uint8_t> *expected_hash = nullptr) {
const std::vector<uint8_t> *expected_hash = nullptr,
msg::fbuf::p2pmsg::HpfsRequestHint hint_type = msg::fbuf::p2pmsg::HpfsRequestHint_NONE,
flatbuffers::Offset<void> hint = 0) {
auto parent_path__ = parent_path ? _fbb.CreateString(parent_path) : 0;
auto expected_hash__ = expected_hash ? _fbb.CreateVector<uint8_t>(*expected_hash) : 0;
return msg::fbuf::p2pmsg::CreateHpfsRequestMsg(
@@ -1345,7 +1689,9 @@ inline flatbuffers::Offset<HpfsRequestMsg> CreateHpfsRequestMsgDirect(
parent_path__,
is_file,
block_id,
expected_hash__);
expected_hash__,
hint_type,
hint);
}
struct HpfsResponseMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
@@ -1382,15 +1728,15 @@ struct HpfsResponseMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
return GetPointer<const void *>(VT_CONTENT);
}
template<typename T> const T *content_as() const;
const msg::fbuf::p2pmsg::HpfsFsEntryResponse *content_as_HpfsFsEntryResponse() const {
return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsFsEntryResponse ? static_cast<const msg::fbuf::p2pmsg::HpfsFsEntryResponse *>(content()) : nullptr;
}
const msg::fbuf::p2pmsg::HpfsFileHashMapResponse *content_as_HpfsFileHashMapResponse() const {
return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsFileHashMapResponse ? static_cast<const msg::fbuf::p2pmsg::HpfsFileHashMapResponse *>(content()) : nullptr;
}
const msg::fbuf::p2pmsg::HpfsBlockResponse *content_as_HpfsBlockResponse() const {
return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsBlockResponse ? static_cast<const msg::fbuf::p2pmsg::HpfsBlockResponse *>(content()) : nullptr;
}
const msg::fbuf::p2pmsg::HpfsFsEntryResponse *content_as_HpfsFsEntryResponse() const {
return content_type() == msg::fbuf::p2pmsg::HpfsResponse_HpfsFsEntryResponse ? static_cast<const msg::fbuf::p2pmsg::HpfsFsEntryResponse *>(content()) : nullptr;
}
void *mutable_content() {
return GetPointer<void *>(VT_CONTENT);
}
@@ -1408,6 +1754,10 @@ struct HpfsResponseMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
};
template<> inline const msg::fbuf::p2pmsg::HpfsFsEntryResponse *HpfsResponseMsg::content_as<msg::fbuf::p2pmsg::HpfsFsEntryResponse>() const {
return content_as_HpfsFsEntryResponse();
}
template<> inline const msg::fbuf::p2pmsg::HpfsFileHashMapResponse *HpfsResponseMsg::content_as<msg::fbuf::p2pmsg::HpfsFileHashMapResponse>() const {
return content_as_HpfsFileHashMapResponse();
}
@@ -1416,10 +1766,6 @@ template<> inline const msg::fbuf::p2pmsg::HpfsBlockResponse *HpfsResponseMsg::c
return content_as_HpfsBlockResponse();
}
template<> inline const msg::fbuf::p2pmsg::HpfsFsEntryResponse *HpfsResponseMsg::content_as<msg::fbuf::p2pmsg::HpfsFsEntryResponse>() const {
return content_as_HpfsFsEntryResponse();
}
struct HpfsResponseMsgBuilder {
typedef HpfsResponseMsg Table;
flatbuffers::FlatBufferBuilder &fbb_;
@@ -1561,7 +1907,8 @@ struct HpfsFileHashMapResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Ta
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_FILE_LENGTH = 4,
VT_FILE_MODE = 6,
VT_HASH_MAP = 8
VT_HASH_MAP = 8,
VT_RESPONDED_BLOCK_IDS = 10
};
uint64_t file_length() const {
return GetField<uint64_t>(VT_FILE_LENGTH, 0);
@@ -1581,12 +1928,20 @@ struct HpfsFileHashMapResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Ta
flatbuffers::Vector<uint8_t> *mutable_hash_map() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_HASH_MAP);
}
const flatbuffers::Vector<uint32_t> *responded_block_ids() const {
return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_RESPONDED_BLOCK_IDS);
}
flatbuffers::Vector<uint32_t> *mutable_responded_block_ids() {
return GetPointer<flatbuffers::Vector<uint32_t> *>(VT_RESPONDED_BLOCK_IDS);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_FILE_LENGTH) &&
VerifyField<uint32_t>(verifier, VT_FILE_MODE) &&
VerifyOffset(verifier, VT_HASH_MAP) &&
verifier.VerifyVector(hash_map()) &&
VerifyOffset(verifier, VT_RESPONDED_BLOCK_IDS) &&
verifier.VerifyVector(responded_block_ids()) &&
verifier.EndTable();
}
};
@@ -1604,6 +1959,9 @@ struct HpfsFileHashMapResponseBuilder {
void add_hash_map(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash_map) {
fbb_.AddOffset(HpfsFileHashMapResponse::VT_HASH_MAP, hash_map);
}
void add_responded_block_ids(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> responded_block_ids) {
fbb_.AddOffset(HpfsFileHashMapResponse::VT_RESPONDED_BLOCK_IDS, responded_block_ids);
}
explicit HpfsFileHashMapResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
@@ -1620,9 +1978,11 @@ inline flatbuffers::Offset<HpfsFileHashMapResponse> CreateHpfsFileHashMapRespons
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t file_length = 0,
uint32_t file_mode = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash_map = 0) {
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash_map = 0,
flatbuffers::Offset<flatbuffers::Vector<uint32_t>> responded_block_ids = 0) {
HpfsFileHashMapResponseBuilder builder_(_fbb);
builder_.add_file_length(file_length);
builder_.add_responded_block_ids(responded_block_ids);
builder_.add_hash_map(hash_map);
builder_.add_file_mode(file_mode);
return builder_.Finish();
@@ -1632,13 +1992,16 @@ inline flatbuffers::Offset<HpfsFileHashMapResponse> CreateHpfsFileHashMapRespons
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t file_length = 0,
uint32_t file_mode = 0,
const std::vector<uint8_t> *hash_map = nullptr) {
const std::vector<uint8_t> *hash_map = nullptr,
const std::vector<uint32_t> *responded_block_ids = nullptr) {
auto hash_map__ = hash_map ? _fbb.CreateVector<uint8_t>(*hash_map) : 0;
auto responded_block_ids__ = responded_block_ids ? _fbb.CreateVector<uint32_t>(*responded_block_ids) : 0;
return msg::fbuf::p2pmsg::CreateHpfsFileHashMapResponse(
_fbb,
file_length,
file_mode,
hash_map__);
hash_map__,
responded_block_ids__);
}
struct HpfsBlockResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
@@ -1711,93 +2074,6 @@ inline flatbuffers::Offset<HpfsBlockResponse> CreateHpfsBlockResponseDirect(
data__);
}
struct HpfsFSHashEntry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsFSHashEntryBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_NAME = 4,
VT_IS_FILE = 6,
VT_HASH = 8
};
const flatbuffers::String *name() const {
return GetPointer<const flatbuffers::String *>(VT_NAME);
}
flatbuffers::String *mutable_name() {
return GetPointer<flatbuffers::String *>(VT_NAME);
}
bool is_file() const {
return GetField<uint8_t>(VT_IS_FILE, 0) != 0;
}
bool mutate_is_file(bool _is_file) {
return SetField<uint8_t>(VT_IS_FILE, static_cast<uint8_t>(_is_file), 0);
}
const flatbuffers::Vector<uint8_t> *hash() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
flatbuffers::Vector<uint8_t> *mutable_hash() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_NAME) &&
verifier.VerifyString(name()) &&
VerifyField<uint8_t>(verifier, VT_IS_FILE) &&
VerifyOffset(verifier, VT_HASH) &&
verifier.VerifyVector(hash()) &&
verifier.EndTable();
}
};
struct HpfsFSHashEntryBuilder {
typedef HpfsFSHashEntry Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_name(flatbuffers::Offset<flatbuffers::String> name) {
fbb_.AddOffset(HpfsFSHashEntry::VT_NAME, name);
}
void add_is_file(bool is_file) {
fbb_.AddElement<uint8_t>(HpfsFSHashEntry::VT_IS_FILE, static_cast<uint8_t>(is_file), 0);
}
void add_hash(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash) {
fbb_.AddOffset(HpfsFSHashEntry::VT_HASH, hash);
}
explicit HpfsFSHashEntryBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &);
flatbuffers::Offset<HpfsFSHashEntry> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFSHashEntry>(end);
return o;
}
};
inline flatbuffers::Offset<HpfsFSHashEntry> CreateHpfsFSHashEntry(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::String> name = 0,
bool is_file = false,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0) {
HpfsFSHashEntryBuilder builder_(_fbb);
builder_.add_hash(hash);
builder_.add_name(name);
builder_.add_is_file(is_file);
return builder_.Finish();
}
inline flatbuffers::Offset<HpfsFSHashEntry> CreateHpfsFSHashEntryDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const char *name = nullptr,
bool is_file = false,
const std::vector<uint8_t> *hash = nullptr) {
auto name__ = name ? _fbb.CreateString(name) : 0;
auto hash__ = hash ? _fbb.CreateVector<uint8_t>(*hash) : 0;
return msg::fbuf::p2pmsg::CreateHpfsFSHashEntry(
_fbb,
name__,
is_file,
hash__);
}
struct HpfsLogRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsLogRequestBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
@@ -2415,11 +2691,44 @@ inline bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const fla
return true;
}
inline bool VerifyHpfsRequestHint(flatbuffers::Verifier &verifier, const void *obj, HpfsRequestHint type) {
switch (type) {
case HpfsRequestHint_NONE: {
return true;
}
case HpfsRequestHint_HpfsFsEntryHint: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsFsEntryHint *>(obj);
return verifier.VerifyTable(ptr);
}
case HpfsRequestHint_HpfsFileHashMapHint: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsFileHashMapHint *>(obj);
return verifier.VerifyTable(ptr);
}
default: return true;
}
}
inline bool VerifyHpfsRequestHintVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types) {
if (!values || !types) return !values && !types;
if (values->size() != types->size()) return false;
for (flatbuffers::uoffset_t i = 0; i < values->size(); ++i) {
if (!VerifyHpfsRequestHint(
verifier, values->Get(i), types->GetEnum<HpfsRequestHint>(i))) {
return false;
}
}
return true;
}
inline bool VerifyHpfsResponse(flatbuffers::Verifier &verifier, const void *obj, HpfsResponse type) {
switch (type) {
case HpfsResponse_NONE: {
return true;
}
case HpfsResponse_HpfsFsEntryResponse: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsFsEntryResponse *>(obj);
return verifier.VerifyTable(ptr);
}
case HpfsResponse_HpfsFileHashMapResponse: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsFileHashMapResponse *>(obj);
return verifier.VerifyTable(ptr);
@@ -2428,10 +2737,6 @@ inline bool VerifyHpfsResponse(flatbuffers::Verifier &verifier, const void *obj,
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsBlockResponse *>(obj);
return verifier.VerifyTable(ptr);
}
case HpfsResponse_HpfsFsEntryResponse: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsFsEntryResponse *>(obj);
return verifier.VerifyTable(ptr);
}
default: return true;
}
}

View File

@@ -5,6 +5,7 @@
#include "../usr/user_input.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "../hpfs/hpfs_mount.hpp"
#include "../msg/fbuf/p2pmsg_generated.h"
#include "peer_comm_server.hpp"
#include "peer_comm_session.hpp"
@@ -122,16 +123,6 @@ namespace p2p
std::string data;
};
// Represents a hpfs request sent to a peer.
struct hpfs_request
{
uint32_t mount_id; // Relavent file system id.
std::string parent_path; // The requested file or dir path.
bool is_file = false; // Whether the path is a file or dir.
int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1.
util::h32 expected_hash; // The expected hash of the requested result.
};
// Represents hpfs log sync request.
struct hpfs_log_request
{
@@ -146,12 +137,24 @@ namespace p2p
std::vector<uint8_t> log_record_bytes;
};
enum HPFS_FS_ENTRY_RESPONSE_TYPE
{
MATCHED = 0, // The entry matches between requester and responder. No sync needed.
MISMATCHED = 1, // The entry does not match (either hash mismatch or new entry). Requester must request for this entry.
RESPONDED = 2, // The entry does not match and the repsonder has dispatched the sync response.
NOT_AVAILABLE = 3 // The entry does not exist on responder side. Requester must delete this on his side.
};
// Represents hpfs file system entry.
struct hpfs_fs_hash_entry
{
std::string name; // Name of the file/dir.
bool is_file = false; // Whether this is a file or dir.
util::h32 hash; // Hash of the file or dir.
// Only relevant for hpfs responses. Indicates about the availabilty and status of this
// fs entry as reported by the responder.
HPFS_FS_ENTRY_RESPONSE_TYPE response_type = HPFS_FS_ENTRY_RESPONSE_TYPE::MATCHED;
};
// Represents a file block data resposne.
@@ -163,6 +166,18 @@ namespace p2p
util::h32 hash; // Hash of the bloc data.
};
// Represents a hpfs request sent to a peer.
struct hpfs_request
{
uint32_t mount_id = 0; // Relavent file system id.
std::string parent_path; // The requested file or dir path.
bool is_file = false; // Whether the path is a file or dir.
int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1.
util::h32 expected_hash; // The expected hash of the requested result.
std::vector<hpfs_fs_hash_entry> fs_entry_hints; // Included fs entry entry hints for the responder.
std::vector<util::h32> file_hashmap_hints; // Included file hash map hints for the responder.
};
struct peer_message_info
{
const msg::fbuf::p2pmsg::P2PMsg *p2p_msg = NULL;