Refactoring hpfs, hpfs sync and hpfs serve code. (#231)

* Refactoring hpfs code to a class so it can support multiple mounts.
* Refactoring hpfs serve into a class to support mulitiple mount serving.
* Refactoring hpfs sync into class to support multiple instances.
* Code improvements in hpfs_sync.
* Taking a sync target list for hpfs syncing target set.
This commit is contained in:
Savinda Senevirathne
2021-02-02 13:17:21 +05:30
committed by GitHub
parent 08680ee8d4
commit d08d2630f6
21 changed files with 948 additions and 736 deletions

View File

@@ -37,6 +37,7 @@ add_executable(hpcore
src/sc.cpp
src/bill/corebill.cpp
src/hpfs/hpfs.cpp
src/hpfs/hpfs_mount.cpp
src/hpfs/hpfs_serve.cpp
src/hpfs/hpfs_sync.cpp
src/comm/comm_session.cpp

View File

@@ -667,7 +667,7 @@ namespace conf
jsoncons::ojson jdoc;
populate_contract_section_json(jdoc, cfg.contract, true);
const std::string patch_file_path = hpfs::physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
const std::string patch_file_path = hpfs::contract_fs.physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
return write_json_file(patch_file_path, jdoc);
}
@@ -679,7 +679,7 @@ namespace conf
*/
int apply_patch_config(std::string_view hpfs_session_name)
{
const std::string path = hpfs::physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH);
const std::string path = hpfs::contract_fs.physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH);
if (!util::is_file_exists(path))
return 0;

View File

@@ -113,8 +113,9 @@ namespace consensus
// Get current lcl and state.
std::string lcl = ledger::ctx.get_lcl();
const uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
util::h32 state_hash = hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE);
util::h32 patch_hash = hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH);
hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; // Ref of the contract_fs object.
util::h32 state_hash = contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH);
util::h32 patch_hash = contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH);
if (ctx.stage == 0)
{
@@ -199,11 +200,21 @@ namespace consensus
if (is_patch_desync)
is_patch_update_pending = false;
// Start hpfs sync if we are out-of-sync with majority hpfs state.
// Start hpfs sync if we are out-of-sync with majority hpfs patch hash or state hash.
if (is_state_desync || is_patch_desync)
{
conf::change_role(conf::ROLE::OBSERVER);
hpfs_sync::set_target(majority_state_hash, majority_patch_hash);
// This queue holds all the sync targets which needs to get synced in contract fs.
std::queue<hpfs::sync_target> sync_target_list;
if (is_patch_desync)
sync_target_list.push(hpfs::sync_target{"patch", majority_patch_hash, hpfs::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE});
if (is_state_desync)
sync_target_list.push(hpfs::sync_target{"state", majority_state_hash, hpfs::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR});
// Set sync targets for contract fs.
hpfs::contract_sync.set_target(std::move(sync_target_list));
}
// Proceed further only if both lcl and state are in sync with majority.
@@ -213,7 +224,7 @@ namespace consensus
return 0;
}
// lcl, hpfs or unl desync.
// lcl or hpfs desync.
return -1;
}
@@ -227,7 +238,7 @@ namespace consensus
*/
void check_sync_completion()
{
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !hpfs_sync::ctx.is_syncing && !ledger::sync_ctx.is_syncing)
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !hpfs::contract_sync.ctx.is_syncing && !ledger::sync_ctx.is_syncing)
conf::change_role(conf::ROLE::VALIDATOR);
}
@@ -759,7 +770,7 @@ namespace consensus
}
}
is_state_desync = (hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE) != majority_state_hash);
is_state_desync = (hpfs::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH) != majority_state_hash);
}
/**
@@ -785,7 +796,7 @@ namespace consensus
}
}
is_patch_desync = (hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH) != majority_patch_hash);
is_patch_desync = (hpfs::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH) != majority_patch_hash);
}
/**
@@ -871,7 +882,8 @@ namespace consensus
return -1;
}
hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE, args.post_execution_state_hash);
// Update state hash in contract fs global hash tracker.
hpfs::contract_fs.set_parent_hash(hpfs::STATE_DIR_PATH, args.post_execution_state_hash);
new_state_hash = args.post_execution_state_hash;
extract_user_outputs_from_contract_bufmap(args.userbufs);
@@ -1040,16 +1052,18 @@ namespace consensus
*/
int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 &current_patch_hash)
{
hpfs::hpfs_mount &contract_fs = hpfs::contract_fs;
// Check whether is there any patch changes to be applied which reached consensus.
if (is_patch_update_pending && current_patch_hash == prop_patch_hash)
{
if (hpfs::start_ro_session(HPFS_SESSION_NAME, false) != -1)
if (contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1)
{
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(HPFS_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after consensus failed.";
hpfs::stop_ro_session(HPFS_SESSION_NAME);
contract_fs.stop_ro_session(HPFS_SESSION_NAME);
return -1;
}
else
@@ -1059,7 +1073,7 @@ namespace consensus
}
}
if (hpfs::stop_ro_session(HPFS_SESSION_NAME) == -1)
if (contract_fs.stop_ro_session(HPFS_SESSION_NAME) == -1)
return -1;
}
return 0;

View File

@@ -1,360 +1,47 @@
#include "hpfs.hpp"
#include "./hpfs.hpp"
#include "../conf.hpp"
#include "../hplog.hpp"
#include "../util/util.hpp"
#include "../util/h32.hpp"
#include "../sc.hpp"
#include "./hpfs_serve.hpp"
namespace hpfs
{
constexpr const char *TRACE_ARG_ERROR = "trace=error";
constexpr const char *TRACE_ARG_DEBUG = "trace=error";
constexpr const char *RW_SESSION = "/::hpfs.rw.hmap";
constexpr const char *RO_SESSION = "/::hpfs.ro.";
constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap.";
constexpr const char *HMAP_HASH = "::hpfs.hmap.hash";
constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children";
constexpr ino_t ROOT_INO = 1;
constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000;
constexpr uint16_t INIT_CHECK_INTERVAL = 20;
bool init_success = false;
hpfs_context ctx;
hpfs::hpfs_mount contract_fs; // Global contract file system instance.
hpfs::hpfs_sync contract_sync; // Global contract file system sync instance.
hpfs::hpfs_serve contract_serve;
/**
* Performs system startup activitites related to hpfs execution.
*/
* Initialize necessary file system mounts to hpcore.
*/
int init()
{
if (start_hpfs_process(ctx.hpfs_pid) == -1)
return -1;
if (prepare_fs() == -1)
if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.hpfs_dir, conf::ctx.hpfs_mount_dir, conf::ctx.hpfs_rw_dir, conf::cfg.node.full_history) == -1)
{
util::kill_process(ctx.hpfs_pid, true);
LOG_ERROR << "Contract file system initialization failed.";
return -1;
}
if (contract_serve.init("contract", &contract_fs) == -1)
{
LOG_ERROR << "Contract file system serve worker initialization failed.";
return -1;
}
if (contract_sync.init("contract", &contract_fs) == -1)
{
LOG_ERROR << "Contract file system sync worker initialization failed.";
return -1;
}
init_success = true;
return 0;
}
/**
* Performs global cleanup related to hpfs execution.
*/
* Perform cleanups on created mounts.
*/
void deinit()
{
if (init_success)
{
LOG_DEBUG << "Stopping hpfs process... pid:" << ctx.hpfs_pid;
if (ctx.hpfs_pid > 0 && util::kill_process(ctx.hpfs_pid, true) == 0)
LOG_INFO << "Stopped hpfs process.";
}
}
/**
* Performs initial patch file population and loads initial hashes for later use.
* During startup, we always populate patch.cfg with current values from hp.cfg.
* @return 0 on success. -1 on failure.
*/
int prepare_fs()
{
util::h32 initial_state_hash;
util::h32 initial_patch_hash;
if (acquire_rw_session() == -1 ||
conf::populate_patch_config() == -1 ||
get_hash(initial_state_hash, RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 ||
get_hash(initial_patch_hash, RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 ||
release_rw_session() == -1)
{
LOG_ERROR << "Failed to prepare initial fs.";
return -1;
}
ctx.set_hash(HPFS_PARENT_COMPONENTS::STATE, initial_state_hash);
ctx.set_hash(HPFS_PARENT_COMPONENTS::PATCH, initial_patch_hash);
LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash;
return 0;
}
/**
* Starts the hpfs process used for all fs sessions.
*/
int start_hpfs_process(pid_t &hpfs_pid)
{
const pid_t pid = fork();
if (pid > 0)
{
// HotPocket process.
LOG_DEBUG << "Starting hpfs process.";
// Wait until hpfs is initialized properly.
const uint16_t max_retries = PROCESS_INIT_TIMEOUT / INIT_CHECK_INTERVAL;
bool hpfs_initialized = false;
uint16_t retry_count = 0;
do
{
util::sleep(INIT_CHECK_INTERVAL);
// Check if hpfs process is still running.
// Sending signal 0 to test whether process exist.
if (util::kill_process(pid, false, 0) == -1)
{
LOG_ERROR << "hpfs process " << pid << " has stopped.";
break;
}
// We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up.
struct stat st;
if (stat(conf::ctx.hpfs_mount_dir.c_str(), &st) == -1)
{
LOG_ERROR << errno << ": Error in checking hpfs status.";
break;
}
hpfs_initialized = (st.st_ino == ROOT_INO);
// Keep retrying until root inode no. matches or timeout occurs.
} while (!hpfs_initialized && ++retry_count <= max_retries);
// Kill the process if hpfs couldn't be initialized properly.
if (!hpfs_initialized)
{
LOG_ERROR << "Couldn't initialize hpfs process.";
util::kill_process(pid, true);
return -1;
}
hpfs_pid = pid;
LOG_DEBUG << "hpfs process started. pid:" << hpfs_pid;
}
else if (pid == 0)
{
// hpfs process.
util::fork_detach();
const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? TRACE_ARG_DEBUG : TRACE_ARG_ERROR);
// Fill process args.
char *execv_args[] = {
conf::ctx.hpfs_exe_path.data(),
(char *)"fs",
conf::ctx.hpfs_dir.data(),
conf::ctx.hpfs_mount_dir.data(),
// In full history mode, we disable log merge of hpfs.
(char *)(conf::cfg.node.full_history ? "merge=false" : "merge=true"),
(char *)active_hpfs_trace_arg,
NULL};
const int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": hpfs process execv failed.\n";
exit(1);
}
else
{
LOG_ERROR << errno << ": fork() failed when starting hpfs process.";
return -1;
}
return 0;
}
/**
* Starts a virtual fs ReadWrite session with hash map enabled.
* If RW session already started, this will simply acquire a consumer reference.
* @return 0 on success. -1 on failure.
*/
int acquire_rw_session()
{
std::scoped_lock lock(ctx.rw_mutex);
LOG_DEBUG << "Starting hpfs rw session at " << conf::ctx.hpfs_rw_dir;
const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION;
// The sessions creation either should be succesful or should report as already exists (errno=EEXIST).
// Otherwise we consider it as failure.
if (mknod(session_file.c_str(), 0, 0) == -1 && errno != EEXIST)
{
LOG_ERROR << errno << ": Error starting hpfs rw session at " << conf::ctx.hpfs_rw_dir;
return -1;
}
ctx.rw_consumers++;
return 0;
}
/**
* Releases a consumer reference to the RW session. If there are no more references,
* actually stops the running RW session.
* @return 0 on success. -1 on failure.
*/
int release_rw_session()
{
std::scoped_lock lock(ctx.rw_mutex);
if (ctx.rw_consumers > 0)
ctx.rw_consumers--;
if (ctx.rw_consumers == 0)
{
const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION;
if (unlink(session_file.c_str()) == -1)
{
LOG_ERROR << errno << ": Error stopping hpfs rw session at " << conf::ctx.hpfs_rw_dir;
return -1;
}
}
return 0;
}
/**
* Starts a virtual fs ReadOnly session.
* @return 0 on success. -1 on failure.
*/
int start_ro_session(const std::string &name, const bool hmap_enabled)
{
LOG_DEBUG << "Starting hpfs ro session " << name << " hmap:" << hmap_enabled;
const std::string session_file = conf::ctx.hpfs_mount_dir + (hmap_enabled ? RO_SESSION_HMAP : RO_SESSION) + name;
if (mknod(session_file.c_str(), 0, 0) == -1)
{
LOG_ERROR << errno << ": Error starting hpfs ro session " << name;
return -1;
}
return 0;
}
/**
* Stops the specified ReadOnly fs session.
* @return 0 on success. -1 on failure.
*/
int stop_ro_session(const std::string &name)
{
LOG_DEBUG << "Stopping hpfs ro session " << name;
const std::string session_file = conf::ctx.hpfs_mount_dir + RO_SESSION + name;
if (unlink(session_file.c_str()) == -1)
{
LOG_ERROR << errno << ": Error stopping hpfs ro session " << name;
return -1;
}
return 0;
}
/**
* Populates the hash of the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath)
{
const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_HASH));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
LOG_DEBUG << "Cannot get hash. vpath not found. " << vpath;
return 0;
}
else if (fd == -1)
{
LOG_ERROR << errno << ": Error opening hash file. " << vpath;
return -1;
}
const int res = read(fd, &hash, sizeof(util::h32));
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading hash file. " << vpath;
return -1;
}
return 1;
}
/**
* Populates the list of file block hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int get_file_block_hashes(std::vector<util::h32> &hashes, std::string_view session_name, std::string_view vpath)
{
const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_CHILDREN));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
LOG_DEBUG << "Cannot get file block hashes. vpath not found. " << vpath;
return 0;
}
else if (fd == -1)
{
LOG_DEBUG << errno << ": Error opening hashmap children. " << vpath;
return -1;
}
struct stat st;
if (fstat(fd, &st) == -1)
{
close(fd);
LOG_ERROR << errno << ": Error reading block hashes length. " << vpath;
return -1;
}
const int children_count = st.st_size / sizeof(util::h32);
hashes.resize(children_count);
const int res = read(fd, hashes.data(), st.st_size);
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading block hashes. " << vpath;
return -1;
}
return 1;
}
/**
* Populates the list of dir entry hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, std::string_view session_name, std::string_view dir_vpath)
{
const std::string path = physical_path(session_name, std::string(dir_vpath).append(HMAP_CHILDREN));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
LOG_DEBUG << "Cannot get dir children hashes. Dir vpath not found. " << dir_vpath;
return 0;
}
else if (fd == -1)
{
LOG_ERROR << errno << ": Error opening dir hash children nodes. " << dir_vpath;
return -1;
}
struct stat st;
if (fstat(fd, &st) == -1)
{
close(fd);
LOG_ERROR << errno << ": Error reading hash children nodes length. " << dir_vpath;
return -1;
}
const int children_count = st.st_size / sizeof(child_hash_node);
hash_nodes.resize(children_count);
const int res = read(fd, hash_nodes.data(), st.st_size);
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading hash children nodes. " << dir_vpath;
return -1;
}
return 1;
}
const std::string physical_path(std::string_view session_name, std::string_view vpath)
{
return conf::ctx.hpfs_mount_dir + "/" + session_name.data() + vpath.data();
contract_fs.deinit();
contract_serve.deinit();
contract_sync.deinit();
}
} // namespace hpfs

View File

@@ -1,91 +1,17 @@
#ifndef _HP_HPFS_HPFS_
#define _HP_HPFS_HPFS_
#ifndef _HP_HPFS_HPFS
#define _HP_HPFS_HPFS
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "./hpfs_mount.hpp"
#include "./hpfs_sync.hpp"
namespace hpfs
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions.
constexpr const char *STATE_DIR_PATH = "/state"; // State directory name.
constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename.
struct child_hash_node
{
bool is_file = false;
char name[256];
util::h32 hash;
child_hash_node()
{
memset(name, 0, sizeof(name));
}
};
inline uint16_t get_request_resubmit_timeout()
{
return conf::cfg.contract.roundtime;
}
enum HPFS_PARENT_COMPONENTS
{
STATE,
PATCH
};
struct hpfs_context
{
private:
std::vector<util::h32> parent_hashes; // Keep hashes of each hpfs parent.
std::shared_mutex parent_mutexes[2] = {std::shared_mutex(), std::shared_mutex()}; // Mutexes for each parent.
public:
pid_t hpfs_pid = 0;
// No. of consumers for RW session.
// We use this as a reference counting mechanism to cleanup RW session when no one requires it.
uint32_t rw_consumers = 0;
std::mutex rw_mutex;
hpfs_context()
{
parent_hashes.reserve(2);
for (size_t i = 0; i < 2; i++)
{
parent_hashes.push_back(util::h32_empty);
}
}
util::h32 get_hash(const HPFS_PARENT_COMPONENTS parent)
{
std::shared_lock lock(parent_mutexes[parent]);
return parent_hashes[parent];
}
void set_hash(const HPFS_PARENT_COMPONENTS parent, util::h32 new_state)
{
std::unique_lock lock(parent_mutexes[parent]);
parent_hashes[parent] = new_state;
}
};
extern hpfs_context ctx;
constexpr int32_t CONTRACT_FS_ID = 0;
extern hpfs::hpfs_mount contract_fs; // Global contract file system instance.
extern hpfs::hpfs_sync contract_sync; // Global contract file system sync instance.
int init();
void deinit();
int prepare_fs();
int start_hpfs_process(pid_t &hpfs_pid);
int acquire_rw_session();
int release_rw_session();
int start_ro_session(const std::string &name, const bool hmap_enabled);
int stop_ro_session(const std::string &name);
int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath);
int get_file_block_hashes(std::vector<util::h32> &hashes, std::string_view session_name, std::string_view vpath);
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, std::string_view session_name, std::string_view dir_vpath);
const std::string physical_path(std::string_view session_name, std::string_view vpath);
} // namespace hpfs
#endif

400
src/hpfs/hpfs_mount.cpp Normal file
View File

@@ -0,0 +1,400 @@
#include "hpfs_mount.hpp"
#include "../conf.hpp"
#include "../hplog.hpp"
#include "../util/util.hpp"
#include "../util/h32.hpp"
#include "../sc.hpp"
namespace hpfs
{
constexpr const char *TRACE_ARG_ERROR = "trace=error";
// Trace is set to error intentionally to prevent log pollution in debug mode. Change this in hpfs specific debugging.
constexpr const char *TRACE_ARG_DEBUG = "trace=error";
constexpr const char *RW_SESSION = "/::hpfs.rw.hmap";
constexpr const char *RO_SESSION = "/::hpfs.ro.";
constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap.";
constexpr const char *HMAP_HASH = "::hpfs.hmap.hash";
constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children";
constexpr ino_t ROOT_INO = 1;
constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000;
constexpr uint16_t INIT_CHECK_INTERVAL = 20;
/**
* This should be called to activate the hpfs mount process.
*/
int hpfs_mount::init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history)
{
this->mount_id = mount_id;
this->fs_dir = fs_dir;
this->mount_dir = mount_dir;
this->rw_dir = rw_dir;
this->is_full_history = is_full_history;
if (start_hpfs_process() == -1)
return -1;
if (prepare_fs() == -1)
{
util::kill_process(hpfs_pid, true);
return -1;
}
init_success = true;
return 0;
}
/**
* Performs cleanup related to hpfs mount execution.
*/
void hpfs_mount::deinit()
{
if (init_success)
{
LOG_DEBUG << "Stopping hpfs process... pid:" << hpfs_pid;
if (hpfs_pid > 0 && util::kill_process(hpfs_pid, true) == 0)
LOG_INFO << "Stopped hpfs process.";
}
}
/**
* This perform file system preparation tasks.
* @return 0 on success. -1 on failure.
*/
int hpfs_mount::prepare_fs()
{
// This contract mount specific preparation logic will be moved to a seprate child class in the next PBI.
util::h32 initial_state_hash;
util::h32 initial_patch_hash;
if (acquire_rw_session() == -1 ||
conf::populate_patch_config() == -1 ||
get_hash(initial_state_hash, RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 ||
get_hash(initial_patch_hash, RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 ||
release_rw_session() == -1)
{
LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << ".";
return -1;
}
set_parent_hash(hpfs::STATE_DIR_PATH, initial_state_hash);
set_parent_hash(hpfs::PATCH_FILE_PATH, initial_patch_hash);
LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash;
return 0;
}
/**
* Starts the hpfs process used for all fs sessions of the mount.
*/
int hpfs_mount::start_hpfs_process()
{
const pid_t pid = fork();
if (pid > 0)
{
// HotPocket process.
LOG_DEBUG << "Starting hpfs process at " << mount_dir << ".";
// Wait until hpfs is initialized properly.
const uint16_t max_retries = PROCESS_INIT_TIMEOUT / INIT_CHECK_INTERVAL;
bool hpfs_initialized = false;
uint16_t retry_count = 0;
do
{
util::sleep(INIT_CHECK_INTERVAL);
// Check if hpfs process is still running.
// Sending signal 0 to test whether process exist.
if (util::kill_process(pid, false, 0) == -1)
{
LOG_ERROR << "hpfs process " << pid << " has stopped.";
break;
}
// We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up.
struct stat st;
if (stat(mount_dir.data(), &st) == -1)
{
LOG_ERROR << errno << ": Error in checking hpfs status at mount " << mount_dir << ".";
break;
}
hpfs_initialized = (st.st_ino == ROOT_INO);
// Keep retrying until root inode no. matches or timeout occurs.
} while (!hpfs_initialized && ++retry_count <= max_retries);
// Kill the process if hpfs couldn't be initialized properly.
if (!hpfs_initialized)
{
LOG_ERROR << "Couldn't initialize hpfs process at mount " << mount_dir << ".";
util::kill_process(pid, true);
return -1;
}
hpfs_pid = pid;
LOG_DEBUG << "hpfs process started. pid:" << hpfs_pid;
}
else if (pid == 0)
{
// hpfs process.
util::fork_detach();
const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? TRACE_ARG_DEBUG : TRACE_ARG_ERROR);
// Fill process args.
char *execv_args[] = {
conf::ctx.hpfs_exe_path.data(),
(char *)"fs",
(char *)fs_dir.data(),
(char *)mount_dir.data(),
// In full history mode, we disable log merge of hpfs.
(char *)(is_full_history ? "merge=false" : "merge=true"),
(char *)active_hpfs_trace_arg,
NULL};
const int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": hpfs process execv failed at mount " << mount_dir << ".\n";
exit(1);
}
else
{
LOG_ERROR << errno << ": fork() failed when starting hpfs process at mount " << mount_dir << ".";
return -1;
}
return 0;
}
/**
* Starts a virtual fs ReadWrite session with hash map enabled.
* If RW session already started, this will simply acquire a consumer reference.
* @return 0 on success. -1 on failure.
*/
int hpfs_mount::acquire_rw_session()
{
std::scoped_lock lock(rw_mutex);
LOG_DEBUG << "Starting hpfs rw session at " << rw_dir;
const std::string session_file = mount_dir + RW_SESSION;
// The sessions creation either should be succesful or should report as already exists (errno=EEXIST).
// Otherwise we consider it as failure.
if (mknod(session_file.c_str(), 0, 0) == -1 && errno != EEXIST)
{
LOG_ERROR << errno << ": Error starting hpfs rw session at " << rw_dir;
return -1;
}
rw_consumers++;
return 0;
}
/**
* Releases a consumer reference to the RW session. If there are no more references,
* actually stops the running RW session.
* @return 0 on success. -1 on failure.
*/
int hpfs_mount::release_rw_session()
{
std::scoped_lock lock(rw_mutex);
if (rw_consumers > 0)
rw_consumers--;
if (rw_consumers == 0)
{
const std::string session_file = mount_dir + RW_SESSION;
if (unlink(session_file.c_str()) == -1)
{
LOG_ERROR << errno << ": Error stopping hpfs rw session at " << rw_dir;
return -1;
}
}
return 0;
}
/**
* Starts a virtual fs ReadOnly session.
* @return 0 on success. -1 on failure.
*/
int hpfs_mount::start_ro_session(const std::string &name, const bool hmap_enabled)
{
LOG_DEBUG << "Starting hpfs ro session " << name << " hmap:" << hmap_enabled;
const std::string session_file = mount_dir + (hmap_enabled ? RO_SESSION_HMAP : RO_SESSION) + name;
if (mknod(session_file.c_str(), 0, 0) == -1)
{
LOG_ERROR << errno << ": Error starting hpfs ro session " << name;
return -1;
}
return 0;
}
/**
* Stops the specified ReadOnly fs session.
* @return 0 on success. -1 on failure.
*/
int hpfs_mount::stop_ro_session(const std::string &name)
{
LOG_DEBUG << "Stopping hpfs ro session " << name;
const std::string session_file = mount_dir + RO_SESSION + name;
if (unlink(session_file.c_str()) == -1)
{
LOG_ERROR << errno << ": Error stopping hpfs ro session " << name;
return -1;
}
return 0;
}
/**
* Populates the hash of the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int hpfs_mount::get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath)
{
const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_HASH));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
LOG_DEBUG << "Cannot get hash. vpath not found. " << vpath;
return 0;
}
else if (fd == -1)
{
LOG_ERROR << errno << ": Error opening hash file. " << vpath;
return -1;
}
const int res = read(fd, &hash, sizeof(util::h32));
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading hash file. " << vpath;
return -1;
}
return 1;
}
/**
* Populates the list of file block hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int hpfs_mount::get_file_block_hashes(std::vector<util::h32> &hashes, std::string_view session_name, std::string_view vpath)
{
const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_CHILDREN));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
LOG_DEBUG << "Cannot get file block hashes. vpath not found. " << vpath;
return 0;
}
else if (fd == -1)
{
LOG_DEBUG << errno << ": Error opening hashmap children. " << vpath;
return -1;
}
struct stat st;
if (fstat(fd, &st) == -1)
{
close(fd);
LOG_ERROR << errno << ": Error reading block hashes length. " << vpath;
return -1;
}
const int children_count = st.st_size / sizeof(util::h32);
hashes.resize(children_count);
const int res = read(fd, hashes.data(), st.st_size);
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading block hashes. " << vpath;
return -1;
}
return 1;
}
/**
* Populates the list of dir entry hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int hpfs_mount::get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, std::string_view session_name, std::string_view dir_vpath)
{
const std::string path = physical_path(session_name, std::string(dir_vpath).append(HMAP_CHILDREN));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
LOG_DEBUG << "Cannot get dir children hashes. Dir vpath not found. " << dir_vpath;
return 0;
}
else if (fd == -1)
{
LOG_ERROR << errno << ": Error opening dir hash children nodes. " << dir_vpath;
return -1;
}
struct stat st;
if (fstat(fd, &st) == -1)
{
close(fd);
LOG_ERROR << errno << ": Error reading hash children nodes length. " << dir_vpath;
return -1;
}
const int children_count = st.st_size / sizeof(child_hash_node);
hash_nodes.resize(children_count);
const int res = read(fd, hash_nodes.data(), st.st_size);
close(fd);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading hash children nodes. " << dir_vpath;
return -1;
}
return 1;
}
const std::string hpfs_mount::physical_path(std::string_view session_name, std::string_view vpath)
{
return mount_dir + "/" + session_name.data() + vpath.data();
}
/**
* This returns the hash of a given parent.
* @param parent_vpath vpath of the parent file or directory.
* @return Returns the hash of the given vpath if available or
* an empth h32 hash if parent vpath not available.
*/
const util::h32 hpfs_mount::get_parent_hash(const std::string &parent_vpath)
{
std::shared_lock lock(parent_hashes_mutex);
const auto itr = parent_hashes.find(parent_vpath);
if (itr == parent_hashes.end())
{
return util::h32_empty; // Looking parent_vpath is not found.
}
return itr->second;
}
/**
* This set the hash of a given parent.
* @param parent_vpath vpath of the parent file or directory.
* @param new_state Hash of the parent.
*/
void hpfs_mount::set_parent_hash(const std::string &parent_vpath, const util::h32 new_state)
{
std::unique_lock lock(parent_hashes_mutex);
const auto itr = parent_hashes.find(parent_vpath);
if (itr == parent_hashes.end())
{
parent_hashes.try_emplace(parent_vpath, new_state);
}
else
{
itr->second = new_state;
}
}
} // namespace hpfs

75
src/hpfs/hpfs_mount.hpp Normal file
View File

@@ -0,0 +1,75 @@
#ifndef _HP_HPFS_HPFS_MOUNT_
#define _HP_HPFS_HPFS_MOUNT_
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
namespace hpfs
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions.
constexpr const char *STATE_DIR_PATH = "/state"; // State directory name.
constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename.
struct child_hash_node
{
bool is_file = false;
char name[256];
util::h32 hash;
child_hash_node()
{
memset(name, 0, sizeof(name));
}
};
inline uint16_t get_request_resubmit_timeout()
{
return conf::cfg.contract.roundtime;
}
/**
* This class represents a hpfs file system mount.
*/
class hpfs_mount
{
private:
pid_t hpfs_pid = 0;
std::string fs_dir;
std::string mount_dir;
bool is_full_history;
bool init_success = false;
// Keeps the hashes of hpfs parents against its vpath.
std::unordered_map<std::string, util::h32> parent_hashes;
std::shared_mutex parent_hashes_mutex;
// No. of consumers for RW session.
// We use this as a reference counting mechanism to cleanup RW session when no one requires it.
uint32_t rw_consumers = 0;
std::mutex rw_mutex;
protected:
virtual int prepare_fs();
public:
int32_t mount_id; // Used in hpfs serving and syncing.
std::string rw_dir;
int init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history);
void deinit();
int start_hpfs_process();
int acquire_rw_session();
int release_rw_session();
int start_ro_session(const std::string &name, const bool hmap_enabled);
int stop_ro_session(const std::string &name);
int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath);
int get_file_block_hashes(std::vector<util::h32> &hashes, std::string_view session_name, std::string_view vpath);
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, std::string_view session_name, std::string_view dir_vpath);
const std::string physical_path(std::string_view session_name, std::string_view vpath);
const util::h32 get_parent_hash(const std::string &parent_vpath);
void set_parent_hash(const std::string &parent_vpath, const util::h32 new_state);
};
} // namespace hpfs
#endif

View File

@@ -13,28 +13,36 @@
namespace p2pmsg = msg::fbuf::p2pmsg;
/**
* Helper functions for serving hpfs requests from other peers.
* Class for serving hpfs requests from other peers.
*/
namespace hpfs_serve
namespace hpfs
{
constexpr uint16_t LOOP_WAIT = 20; // Milliseconds
constexpr const char *HPFS_SESSION_NAME = "rw";
uint16_t REQUEST_BATCH_TIMEOUT;
bool is_shutting_down = false;
bool init_success = false;
std::thread hpfs_serve_thread;
int init()
/**
* @param name The name of the serving instance. (For identification purpose)
* @param fs_mount The pointer to the relavent hpfs mount instance this server is serving.
* @return This returns -1 on error and 0 on success.
*/
int hpfs_serve::init(std::string_view name, hpfs::hpfs_mount *fs_mount)
{
if (fs_mount == NULL)
return -1;
this->name = name;
this->fs_mount = fs_mount;
REQUEST_BATCH_TIMEOUT = hpfs::get_request_resubmit_timeout() * 0.9;
hpfs_serve_thread = std::thread(hpfs_serve_loop);
hpfs_serve_thread = std::thread(&hpfs_serve::hpfs_serve_loop, this);
init_success = true;
return 0;
}
void deinit()
/**
* Perform cleanup activities.
*/
void hpfs_serve::deinit()
{
if (init_success)
{
@@ -43,13 +51,13 @@ namespace hpfs_serve
}
}
void hpfs_serve_loop()
void hpfs_serve::hpfs_serve_loop()
{
util::mask_signal();
LOG_INFO << "Hpfs server started.";
LOG_INFO << "Hpfs " << name << " server started.";
std::list<std::pair<std::string, std::string>> hpfs_requests;
std::list<std::pair<std::string, p2p::hpfs_request>> hpfs_requests;
// Indicates whether any requests were processed in the previous loop iteration.
bool prev_requests_processed = false;
@@ -60,13 +68,7 @@ namespace hpfs_serve
if (!prev_requests_processed)
util::sleep(LOOP_WAIT);
{
std::scoped_lock<std::mutex> lock(p2p::ctx.collected_msgs.hpfs_requests_mutex);
// Move collected hpfs requests over to local requests list.
if (!p2p::ctx.collected_msgs.hpfs_requests.empty())
hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.hpfs_requests);
}
swap_collected_requests(hpfs_requests);
prev_requests_processed = !hpfs_requests.empty();
const uint64_t time_start = util::get_epoch_milliseconds();
@@ -75,9 +77,9 @@ namespace hpfs_serve
if (hpfs_requests.empty())
continue;
if (hpfs::acquire_rw_session() != -1)
if (fs_mount->acquire_rw_session() != -1)
{
for (auto &[session_id, request] : hpfs_requests)
for (auto &[session_id, hr] : hpfs_requests)
{
if (is_shutting_down)
break;
@@ -87,19 +89,15 @@ namespace hpfs_serve
const uint64_t time_now = util::get_epoch_milliseconds();
if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT)
{
LOG_DEBUG << "Hpfs serve batch timeout. Abandonding hpfs requests.";
LOG_DEBUG << "Hpfs " << name << " serve batch timeout. Abandonding hpfs requests.";
break;
}
// Session id is in binary format. Converting to hex before printing.
LOG_DEBUG << "Serving hpfs request from [" << util::to_hex(session_id).substr(2, 10) << "]";
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data());
const p2p::hpfs_request sr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message());
flatbuffers::FlatBufferBuilder fbuf(1024);
if (hpfs_serve::create_hpfs_response(fbuf, sr, lcl) == 1)
if (hpfs_serve::create_hpfs_response(fbuf, hr, lcl) == 1)
{
// Find the peer that we should send the hpfs response to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
@@ -116,13 +114,12 @@ namespace hpfs_serve
}
}
hpfs::release_rw_session();
fs_mount->release_rw_session();
}
hpfs_requests.clear();
}
LOG_INFO << "Hpfs server stopped.";
LOG_INFO << "Hpfs " << name << " server stopped.";
}
/**
@@ -132,7 +129,7 @@ namespace hpfs_serve
* @return 1 if successful hpfs response was generated. 0 if request is invalid
* and no response was generated. -1 on error.
*/
int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr, std::string_view lcl)
int hpfs_serve::create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr, std::string_view lcl)
{
LOG_DEBUG << "Serving hpfs req. path:" << hr.parent_path << " block_id:" << hr.block_id;
@@ -157,7 +154,7 @@ namespace hpfs_serve
resp.hash = hr.expected_hash;
resp.data = std::string_view(reinterpret_cast<const char *>(block.data()), block.size());
msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, lcl);
msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, fs_mount->mount_id, lcl);
return 1; // Success.
}
}
@@ -178,7 +175,7 @@ namespace hpfs_serve
else if (result == 1)
{
msg::fbuf::p2pmsg::create_msg_from_filehashmap_response(
fbuf, hr.parent_path, block_hashes,
fbuf, hr.parent_path, fs_mount->mount_id, block_hashes,
file_length, hr.expected_hash, lcl);
return 1; // Success.
}
@@ -198,7 +195,7 @@ namespace hpfs_serve
else if (result == 1)
{
msg::fbuf::p2pmsg::create_msg_from_fsentry_response(
fbuf, hr.parent_path, child_hash_nodes, hr.expected_hash, lcl);
fbuf, hr.parent_path, fs_mount->mount_id, child_hash_nodes, hr.expected_hash, lcl);
return 1; // Success.
}
}
@@ -212,12 +209,12 @@ namespace hpfs_serve
* Retrieves the specified data block from a hpfs file if expected hash matches.
* @return 1 if block data was succefully fetched. 0 if vpath or block does not exist. -1 on error.
*/
int get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash)
int hpfs_serve::get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash)
{
// Check whether the existing block hash matches expected hash.
std::vector<util::h32> block_hashes;
int result = hpfs::get_file_block_hashes(block_hashes, HPFS_SESSION_NAME, vpath);
int result = fs_mount->get_file_block_hashes(block_hashes, HPFS_SESSION_NAME, vpath);
if (result == 1)
{
if (block_id >= block_hashes.size())
@@ -233,7 +230,7 @@ namespace hpfs_serve
else // Get actual block data.
{
struct stat st;
const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data();
const std::string file_path = fs_mount->rw_dir + vpath.data();
const off_t block_offset = block_id * hpfs::BLOCK_SIZE;
const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
@@ -289,12 +286,12 @@ namespace hpfs_serve
* Retrieves the specified file block hashes if expected hash matches.
* @return 1 if block hashes were successfuly fetched. 0 if vpath does not exist. -1 on error.
*/
int get_data_block_hashes(std::vector<util::h32> &hashes, size_t &file_length,
const std::string_view vpath, const util::h32 expected_hash)
int hpfs_serve::get_data_block_hashes(std::vector<util::h32> &hashes, size_t &file_length,
const std::string_view vpath, const util::h32 expected_hash)
{
// Check whether the existing file hash matches expected hash.
util::h32 file_hash = util::h32_empty;
int result = hpfs::get_hash(file_hash, HPFS_SESSION_NAME, vpath);
int result = fs_mount->get_hash(file_hash, HPFS_SESSION_NAME, vpath);
if (result == 1)
{
if (file_hash != expected_hash)
@@ -303,14 +300,14 @@ namespace hpfs_serve
result = 0;
}
// Get the block hashes.
else if (hpfs::get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0)
else if (fs_mount->get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0)
{
result = -1;
}
else
{
// Get actual file length.
const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data();
const std::string file_path = fs_mount->rw_dir + vpath.data();
struct stat st;
if (stat(file_path.c_str(), &st) == -1)
{
@@ -329,12 +326,12 @@ namespace hpfs_serve
* Retrieves the specified dir entry hashes if expected fir hash matches.
* @return 1 if fs entry hashes were successfuly fetched. 0 if vpath does not exist. -1 on error.
*/
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
const std::string_view vpath, const util::h32 expected_hash)
int hpfs_serve::get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
const std::string_view vpath, const util::h32 expected_hash)
{
// Check whether the existing dir hash matches expected hash.
util::h32 dir_hash = util::h32_empty;
int result = hpfs::get_hash(dir_hash, HPFS_SESSION_NAME, vpath);
int result = fs_mount->get_hash(dir_hash, HPFS_SESSION_NAME, vpath);
if (result == 1)
{
if (dir_hash != expected_hash)
@@ -343,7 +340,7 @@ namespace hpfs_serve
result = 0;
}
// Get the children hash nodes.
else if (hpfs::get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0)
else if (fs_mount->get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0)
{
result = -1;
}
@@ -355,4 +352,16 @@ namespace hpfs_serve
return result;
}
} // namespace hpfs_serve
/**
* Move the collected requests from hpfs requests to the local hpfs request list.
*/
void hpfs_serve::swap_collected_requests(std::list<std::pair<std::string, p2p::hpfs_request>> &hpfs_requests)
{
std::scoped_lock<std::mutex> lock(p2p::ctx.collected_msgs.contract_hpfs_requests_mutex);
// Move collected hpfs requests for contract fs over to local requests list.
if (!p2p::ctx.collected_msgs.contract_hpfs_requests.empty())
hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.contract_hpfs_requests);
}
} // namespace hpfs

View File

@@ -2,28 +2,43 @@
#define _HP_HPFS_HPFS_SERVE_
#include "../util/h32.hpp"
#include "hpfs.hpp"
#include "hpfs_mount.hpp"
#include "../p2p/p2p.hpp"
#include "../msg/fbuf/p2pmsg_content_generated.h"
namespace hpfs_serve
namespace hpfs
{
int init();
class hpfs_serve
{
private:
uint16_t REQUEST_BATCH_TIMEOUT;
void deinit();
void hpfs_serve_loop();
bool is_shutting_down = false;
bool init_success = false;
std::thread hpfs_serve_thread;
hpfs::hpfs_mount *fs_mount = NULL;
std::string_view name;
void hpfs_serve_loop();
int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &sr, std::string_view lcl);
protected:
virtual void swap_collected_requests(std::list<std::pair<std::string, p2p::hpfs_request>> &hpfs_requests); // Must override in child classes.
int get_data_block(std::vector<uint8_t> &vec, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash);
public:
int init(std::string_view name, hpfs::hpfs_mount *fs_mount);
int get_data_block_hashes(std::vector<util::h32> &hashes, size_t &file_length,
const std::string_view vpath, const util::h32 expected_hash);
void deinit();
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &sr, std::string_view lcl);
int get_data_block(std::vector<uint8_t> &block, const std::string_view vpath,
const uint32_t block_id, const util::h32 expected_hash);
int get_data_block_hashes(std::vector<util::h32> &hashes, size_t &file_length,
const std::string_view vpath, const util::h32 expected_hash);
int get_fs_entry_hashes(std::vector<hpfs::child_hash_node> &hash_nodes,
const std::string_view vpath, const util::h32 expected_hash);
} // namespace hpfs_sync
};
} // namespace hpfs
#endif

View File

@@ -6,13 +6,12 @@
#include "../ledger.hpp"
#include "../hplog.hpp"
#include "../util/util.hpp"
#include "../hpfs/hpfs.hpp"
#include "../util/h32.hpp"
#include "hpfs_sync.hpp"
#include "../sc.hpp"
#include "../unl.hpp"
namespace hpfs_sync
namespace hpfs
{
// Idle loop sleep time (milliseconds).
constexpr uint16_t IDLE_WAIT = 40;
@@ -28,25 +27,26 @@ namespace hpfs_sync
constexpr int FILE_PERMS = 0644;
// No. of milliseconds to wait before resubmitting a request.
uint16_t REQUEST_RESUBMIT_TIMEOUT;
sync_context ctx;
bool init_success = false;
int init()
/**
* This should be called to activate the hpfs sync.
*/
int hpfs_sync::init(std::string_view name, hpfs::hpfs_mount *fs_mount)
{
if (fs_mount == NULL)
return -1;
this->name = name;
this->fs_mount = fs_mount;
REQUEST_RESUBMIT_TIMEOUT = hpfs::get_request_resubmit_timeout();
ctx.target_state_hash = util::h32_empty;
ctx.target_patch_hash = util::h32_empty;
ctx.current_parent_target_hash = util::h32_empty;
// Patch file sync has the highest priority.
ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::PATCH;
ctx.hpfs_sync_thread = std::thread(hpfs_syncer_loop);
ctx.hpfs_sync_thread = std::thread(&hpfs_sync::hpfs_syncer_loop, this);
init_success = true;
return 0;
}
void deinit()
/**
* Perform relavent cleaning.
*/
void hpfs_sync::deinit()
{
if (init_success)
{
@@ -57,41 +57,35 @@ namespace hpfs_sync
}
/**
* Sets a new target states for the syncing process.
* @param target_state_hash The target hpfs state which we should sync towards.
* @param target_patch_hash The target hpfs patch state which we should sync towards.
* Sets a list of sync targets. Sync finishes when all the targets are synced.
* Syncing happens sequentially.
* @param target_list List of sync targets to sync towards.
*/
void set_target(const util::h32 target_state_hash, const util::h32 target_patch_hash)
void hpfs_sync::set_target(const std::queue<sync_target> &target_list)
{
std::unique_lock lock(ctx.target_state_mutex);
// Do not do anything if we are already syncing towards the specified target states.
if (ctx.is_shutting_down || (ctx.is_syncing && ctx.target_state_hash == target_state_hash && ctx.target_patch_hash == target_patch_hash))
if (target_list.empty())
return;
ctx.target_state_hash = target_state_hash;
ctx.target_patch_hash = target_patch_hash;
if (hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH) != target_patch_hash)
{
ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::PATCH;
ctx.current_parent_target_hash = ctx.target_patch_hash;
}
else
{
ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::STATE;
ctx.current_parent_target_hash = ctx.target_state_hash;
}
// Do not do anything if we are already syncing towards the specified target states.
if (ctx.is_shutting_down || (ctx.is_syncing && ctx.original_target_list == target_list))
return;
ctx.original_target_list = target_list;
ctx.target_list = std::move(target_list);
std::unique_lock lock(ctx.current_target_mutex);
ctx.current_target = ctx.target_list.front(); // Make the first element of the list the first target to sync.
ctx.is_syncing = true;
}
/**
* Runs the hpfs sync worker loop.
*/
void hpfs_syncer_loop()
void hpfs_sync::hpfs_syncer_loop()
{
util::mask_signal();
LOG_INFO << "hpfs sync: Worker started.";
LOG_INFO << "hpfs " << name << " sync: Worker started.";
while (!ctx.is_shutting_down)
{
@@ -102,115 +96,81 @@ namespace hpfs_sync
if (!ctx.is_syncing)
continue;
if (hpfs::acquire_rw_session() != -1)
if (fs_mount->acquire_rw_session() != -1)
{
while (!ctx.is_shutting_down)
{
{
std::shared_lock lock(ctx.target_state_mutex);
if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH)
LOG_INFO << "hpfs sync: Starting sync for target patch hash: " << ctx.target_patch_hash;
else
LOG_INFO << "hpfs sync: Starting sync for target state hash: " << ctx.target_state_hash;
std::shared_lock lock(ctx.current_target_mutex);
LOG_INFO << "hpfs " << name << " sync: Starting sync for target " << ctx.current_target.name << " hash: " << ctx.current_target.hash;
}
util::h32 new_state = util::h32_empty;
const int result = request_loop(ctx.current_parent_target_hash, new_state);
const int result = request_loop(ctx.current_target.hash, new_state);
ctx.pending_requests.clear();
ctx.candidate_hpfs_responses.clear();
ctx.submitted_requests.clear();
if (result == -1 || ctx.is_shutting_down)
if (result == -1 || result == 1 || ctx.is_shutting_down)
break;
{
std::shared_lock lock(ctx.target_state_mutex);
std::shared_lock lock(ctx.current_target_mutex);
if (new_state == ctx.current_parent_target_hash)
if (new_state == ctx.current_target.hash)
{
if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH)
{
ctx.target_patch_hash = util::h32_empty;
LOG_INFO << "hpfs sync: Target patch state achieved: " << new_state;
LOG_INFO << "hpfs " << name << " sync: Target " << ctx.current_target.name << " hash achieved: " << new_state;
on_current_sync_state_acheived();
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after sync failed";
}
else
{
unl::update_unl_changes_from_patch();
// Update global hash tracker with the new patch file hash.
util::h32 updated_patch_hash;
hpfs::get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, updated_patch_hash);
}
if (ctx.target_state_hash == hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE))
break;
ctx.current_parent_target_hash = ctx.target_state_hash;
ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::STATE;
continue;
}
else if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::STATE)
{
ctx.target_state_hash = util::h32_empty;
LOG_INFO << "hpfs sync: Target state achieved: " << new_state;
// Start syncing to next target.
const int result = start_syncing_next_target();
if (result == 0)
break;
}
else if (result == 1)
continue;
}
else
{
LOG_INFO << "hpfs sync: Continuing sync for new target: " << ctx.current_parent_target_hash;
LOG_INFO << "hpfs " << name << " sync: Continuing sync for new target: " << ctx.current_target.hash;
continue;
}
}
}
LOG_INFO << "hpfs sync: All parents synced.";
hpfs::release_rw_session();
LOG_INFO << "hpfs " << name << " sync: All parents synced.";
fs_mount->release_rw_session();
}
else
{
LOG_ERROR << "hpfs sync: Failed to start hpfs rw session";
LOG_ERROR << "hpfs " << name << " sync: Failed to start hpfs rw session";
}
std::unique_lock lock(ctx.target_state_mutex);
ctx.current_parent_target_hash = util::h32_empty;
// Clear target list and original target list since the sync is complete.
ctx.target_list = {};
ctx.original_target_list = {};
ctx.is_syncing = false;
}
LOG_INFO << "hpfs sync: Worker stopped.";
LOG_INFO << "hpfs " << name << " sync: Worker stopped.";
}
int request_loop(const util::h32 current_target, util::h32 &updated_state)
/**
* Reqest loop.
* @return -1 on error. 0 when current sync state acheived or sync is stopped due to target change.
* Returns 1 on successfully finishing all the sync targets.
*/
int hpfs_sync::request_loop(const util::h32 current_target, util::h32 &updated_state)
{
std::string target_parent_vpath;
BACKLOG_ITEM_TYPE target_parent_backlog_item_type;
if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::STATE)
{
target_parent_vpath = hpfs::STATE_DIR_PATH;
target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::DIR;
}
else if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH)
{
target_parent_vpath = hpfs::PATCH_FILE_PATH;
target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::FILE;
}
std::string lcl = ledger::ctx.get_lcl();
// Send the initial root hpfs request of the current target.
submit_request(backlog_item{ctx.current_target.item_type, ctx.current_target.vpath, -1, current_target}, lcl);
// Indicates whether any responses were processed in the previous loop iteration.
bool prev_responses_processed = false;
// No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response)
uint16_t resubmissions_count = 0;
// Send the initial root hpfs request.
submit_request(backlog_item{target_parent_backlog_item_type, target_parent_vpath, -1, current_target}, lcl);
while (!should_stop_request_loop(current_target))
{
// Wait a small delay if there were no responses processed during previous iteration.
@@ -220,13 +180,8 @@ namespace hpfs_sync
// Get current lcl.
std::string lcl = ledger::ctx.get_lcl();
{
std::scoped_lock lock(p2p::ctx.collected_msgs.hpfs_responses_mutex);
// Move collected hpfs responses over to local candidate responses list.
if (!p2p::ctx.collected_msgs.hpfs_responses.empty())
ctx.candidate_hpfs_responses.splice(ctx.candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.hpfs_responses);
}
// Move the received hpfs responses to the local response list.
swap_collected_responses();
prev_responses_processed = !ctx.candidate_hpfs_responses.empty();
@@ -239,7 +194,7 @@ namespace hpfs_sync
if (should_stop_request_loop(current_target))
return 0;
LOG_DEBUG << "hpfs sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]";
LOG_DEBUG << "hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]";
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data());
const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message();
@@ -252,7 +207,7 @@ namespace hpfs_sync
const auto pending_resp_itr = ctx.submitted_requests.find(key);
if (pending_resp_itr == ctx.submitted_requests.end())
{
LOG_DEBUG << "hpfs sync: Skipping hpfs response due to hash mismatch.";
LOG_DEBUG << "hpfs " << name << " sync: Skipping hpfs response due to hash mismatch.";
continue;
}
@@ -270,7 +225,7 @@ namespace hpfs_sync
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map))
{
LOG_INFO << "hpfs sync: Skipping hpfs response due to fs entry hash mismatch.";
LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch.";
continue;
}
@@ -287,7 +242,7 @@ namespace hpfs_sync
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count))
{
LOG_INFO << "hpfs sync: Skipping hpfs response due to file hashmap hash mismatch.";
LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch.";
continue;
}
@@ -304,7 +259,7 @@ namespace hpfs_sync
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "hpfs sync: Skipping hpfs response due to file block hash mismatch.";
LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch.";
continue;
}
@@ -316,16 +271,16 @@ namespace hpfs_sync
// After handling each response, check whether we have reached target hpfs state.
// get_hash returns 0 incase target parent is not existing in our side.
if (hpfs::get_hash(updated_state, hpfs::RW_SESSION_NAME, target_parent_vpath) == -1)
if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, ctx.current_target.vpath) == -1)
{
LOG_ERROR << "hpfs sync: exiting due to hash check error.";
LOG_ERROR << "hpfs " << name << " sync: exiting due to hash check error.";
return -1;
}
// Update the central hpfs state tracker.
hpfs::ctx.set_hash(ctx.current_syncing_parent, updated_state);
fs_mount->set_parent_hash(ctx.current_target.vpath, updated_state);
LOG_DEBUG << "hpfs sync: current:" << updated_state << " | target:" << current_target;
LOG_DEBUG << "hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target;
if (updated_state == current_target)
return 0;
}
@@ -347,13 +302,18 @@ namespace hpfs_sync
{
if (++resubmissions_count > ABANDON_THRESHOLD)
{
LOG_INFO << "hpfs sync: Resubmission threshold exceeded. Abandoning sync.";
return -1;
LOG_INFO << "hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync.";
std::shared_lock lock(ctx.current_target_mutex);
const int result = start_syncing_next_target();
if (result == 0)
return 1; // To stop syncing since we have sync all the targets.
return 0;
}
// Reset the counter and re-submit request.
request.waiting_time = 0;
LOG_DEBUG << "hpfs sync: Resubmitting request...";
LOG_DEBUG << "hpfs " << name << " sync: Resubmitting request...";
submit_request(request, lcl);
}
}
@@ -373,7 +333,6 @@ namespace hpfs_sync
}
}
}
return 0;
}
@@ -384,7 +343,7 @@ namespace hpfs_sync
* @param fs_entry_map Received fs entry map.
* @returns true if hash is valid, otherwise false.
*/
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
bool hpfs_sync::validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
{
util::h32 content_hash;
@@ -410,7 +369,7 @@ namespace hpfs_sync
* @param hash_count Size of the hash list.
* @returns true if hash is valid, otherwise false.
*/
bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count)
bool hpfs_sync::validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count)
{
util::h32 content_hash = util::h32_empty;
@@ -435,7 +394,7 @@ namespace hpfs_sync
* @param buf Block buffer.
* @returns true if hash is valid, otherwise false.
*/
bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf)
bool hpfs_sync::validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf)
{
// Calculate block offset of this block.
const off_t block_offset = block_id * hpfs::BLOCK_SIZE;
@@ -446,14 +405,14 @@ namespace hpfs_sync
/**
* Indicates whether to break out of hpfs request processing loop.
*/
bool should_stop_request_loop(const util::h32 current_target)
bool hpfs_sync::should_stop_request_loop(const util::h32 &current_target)
{
if (ctx.is_shutting_down)
return true;
// Stop request loop if the target has changed.
std::shared_lock lock(ctx.target_state_mutex);
return current_target != ctx.current_parent_target_hash;
std::shared_lock lock(ctx.current_target_mutex);
return current_target != ctx.current_target.hash;
}
/**
@@ -464,24 +423,25 @@ namespace hpfs_sync
* @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different.
* @param target_pubkey The peer pubkey the request was submitted to.
*/
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey)
void hpfs_sync::request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey)
{
p2p::hpfs_request hr;
hr.parent_path = path;
hr.is_file = is_file;
hr.block_id = block_id;
hr.expected_hash = expected_hash;
hr.mount_id = fs_mount->mount_id;
flatbuffers::FlatBufferBuilder fbuf(1024);
msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, hr, lcl);
msg::fbuf::p2pmsg::create_msg_from_hpfs_request(fbuf, hr, lcl);
p2p::send_message_to_random_peer(fbuf, target_pubkey); //todo: send to a node that hold the majority hpfs state to improve reliability of retrieving hpfs state.
}
/**
* Submits a pending hpfs request to the peer.
*/
void submit_request(const backlog_item &request, std::string_view lcl)
void hpfs_sync::submit_request(const backlog_item &request, std::string_view lcl)
{
const std::string key = std::string(request.path)
.append(reinterpret_cast<const char *>(&request.expected_hash), sizeof(util::h32));
@@ -492,7 +452,7 @@ namespace hpfs_sync
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey);
if (!target_pubkey.empty())
LOG_DEBUG << "hpfs sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type
LOG_DEBUG << "hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
@@ -503,19 +463,19 @@ namespace hpfs_sync
* @param fs_entry_map Received fs entry map.
* @returns 0 on success, otherwise -1.
*/
int handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
int hpfs_sync::handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
{
// Get the parent path of the fs entries we have received.
LOG_DEBUG << "hpfs sync: Processing fs entries response for " << vpath;
LOG_DEBUG << "hpfs " << name << " sync: Processing fs entries response for " << vpath;
// Create physical directory on our side if not exist.
std::string parent_physical_path = conf::ctx.hpfs_rw_dir + vpath.data();
std::string parent_physical_path = fs_mount->rw_dir + vpath.data();
if (util::create_dir_tree_recursive(parent_physical_path) == -1)
return -1;
// Get the children hash entries and compare with what we got from peer.
std::vector<hpfs::child_hash_node> existing_fs_entries;
if (hpfs::get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1)
if (fs_mount->get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1)
return -1;
// Request more info on fs entries that exist on both sides but are different.
@@ -544,13 +504,13 @@ namespace hpfs_sync
else
{
// If there was an entry that does not exist on other side, delete it.
std::string child_physical_path = conf::ctx.hpfs_rw_dir + child_vpath.data();
std::string child_physical_path = fs_mount->rw_dir + child_vpath.data();
if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) ||
!ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1)
return -1;
LOG_DEBUG << "hpfs sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath;
LOG_DEBUG << "hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath;
}
}
@@ -580,14 +540,14 @@ namespace hpfs_sync
* @param file_length Size of the file.
* @returns 0 on success, otherwise -1.
*/
int handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length)
int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length)
{
// Get the file path of the block hashes we have received.
LOG_DEBUG << "hpfs sync: Processing file block hashes response for " << vpath;
LOG_DEBUG << "hpfs " << name << " sync: Processing file block hashes response for " << vpath;
// File block hashes on our side (file might not exist on our side).
std::vector<util::h32> existing_hashes;
if (hpfs::get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT)
if (fs_mount->get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT)
return -1;
const size_t existing_hash_count = existing_hashes.size();
@@ -604,7 +564,7 @@ namespace hpfs_sync
if (existing_hashes.size() >= hash_count)
{
// If peer file might be smaller, truncate our file to match with peer file.
std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data();
std::string file_physical_path = fs_mount->rw_dir + vpath.data();
if (truncate(file_physical_path.c_str(), file_length) == -1)
return -1;
}
@@ -619,13 +579,13 @@ namespace hpfs_sync
* @param buf Block buffer.
* @returns 0 on success, otherwise -1.
*/
int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf)
int hpfs_sync::handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf)
{
LOG_DEBUG << "hpfs sync: Writing block_id " << block_id
LOG_DEBUG << "hpfs " << name << " sync: Writing block_id " << block_id
<< " (len:" << buf.length()
<< ") of " << vpath;
std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data();
std::string file_physical_path = fs_mount->rw_dir + vpath.data();
const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS);
if (fd == -1)
{
@@ -645,4 +605,61 @@ namespace hpfs_sync
return 0;
}
} // namespace hpfs_sync
/**
* This method can be used to invoke mount specific custom logic (after extending this super class) to be executed after
* a sync target is acheived.
*/
void hpfs_sync::on_current_sync_state_acheived()
{
if (ctx.current_target.vpath == hpfs::PATCH_FILE_PATH)
{
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after sync failed";
}
else
{
unl::update_unl_changes_from_patch();
// Update global hash tracker with the new patch file hash.
util::h32 updated_patch_hash;
fs_mount->get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
fs_mount->set_parent_hash(ctx.current_target.vpath, updated_patch_hash);
}
}
}
/**
* Starts syncing next target if available after current target finishes.
* @return returns 0 when the full sync is complete and 1 when more sync targets are available.
*/
int hpfs_sync::start_syncing_next_target()
{
ctx.target_list.pop(); // Remove the synced parent from the target list.
if (ctx.target_list.empty())
{
ctx.current_target = {};
return 0;
}
else
{
ctx.current_target = ctx.target_list.front();
return 1;
}
}
/**
* Move the collected responses from hpfs responses to a local response list.
*/
void hpfs_sync::swap_collected_responses()
{
// This logic will be added to a child class in next PBI.
std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_responses_mutex);
// Move collected hpfs responses over to local candidate responses list.
if (!p2p::ctx.collected_msgs.contract_hpfs_responses.empty())
ctx.candidate_hpfs_responses.splice(ctx.candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.contract_hpfs_responses);
}
} // namespace hpfs

View File

@@ -7,7 +7,7 @@
#include "../util/h32.hpp"
#include "../crypto.hpp"
namespace hpfs_sync
namespace hpfs
{
enum BACKLOG_ITEM_TYPE
@@ -30,14 +30,27 @@ namespace hpfs_sync
uint16_t waiting_time = 0;
};
struct sync_target
{
std::string name; // Used for logging.
util::h32 hash = util::h32_empty;
std::string vpath;
BACKLOG_ITEM_TYPE item_type = BACKLOG_ITEM_TYPE::DIR;
bool operator==(const sync_target &target) const
{
return this->hash == target.hash;
}
};
struct sync_context
{
// The current target hashes we are syncing towards.
util::h32 target_state_hash;
util::h32 target_patch_hash;
util::h32 current_parent_target_hash;
hpfs::HPFS_PARENT_COMPONENTS current_syncing_parent;
std::queue<sync_target> target_list;
// Store the originally submitted sync target list. This list is used to avoid submitting same list multiple times
// because target list is updated when the sync targets are acheived.
std::queue<sync_target> original_target_list;
sync_target current_target = {};
// List of sender pubkeys and hpfs responses(flatbuffer messages) to be processed.
std::list<std::pair<std::string, std::string>> candidate_hpfs_responses;
@@ -49,42 +62,58 @@ namespace hpfs_sync
std::unordered_map<std::string, backlog_item> submitted_requests;
std::thread hpfs_sync_thread;
std::shared_mutex target_state_mutex;
std::shared_mutex current_target_mutex;
std::atomic<bool> is_syncing = false;
std::atomic<bool> is_shutting_down = false;
};
extern sync_context ctx;
class hpfs_sync
{
private:
bool init_success = false;
uint16_t REQUEST_RESUBMIT_TIMEOUT; // No. of milliseconds to wait before resubmitting a request.
hpfs::hpfs_mount *fs_mount = NULL;
std::string name;
int init();
void hpfs_syncer_loop();
void deinit();
int request_loop(const util::h32 current_target, util::h32 &updated_state);
void set_target(const util::h32 target_state_hash, const util::h32 target_patch_hash);
int start_syncing_next_target();
void hpfs_syncer_loop();
protected:
virtual void on_current_sync_state_acheived();
virtual void swap_collected_responses(); // Must override in child classes.
int request_loop(const util::h32 current_target, util::h32 &updated_state);
public:
sync_context ctx;
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map);
int init(std::string_view name, hpfs::hpfs_mount *fs_mount);
bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count);
void deinit();
bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf);
void set_target(const std::queue<sync_target> &target_list);
bool should_stop_request_loop(const util::h32 current_target);
bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map);
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey);
bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count);
void submit_request(const backlog_item &request, std::string_view lcl);
bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf);
int handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map);
bool should_stop_request_loop(const util::h32 &current_target);
int handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length);
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey);
int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf);
void submit_request(const backlog_item &request, std::string_view lcl);
} // namespace hpfs_sync
int handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map);
int handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length);
int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf);
};
} // namespace hpfs
#endif

View File

@@ -14,8 +14,6 @@
#include "consensus.hpp"
#include "ledger.hpp"
#include "hpfs/hpfs.hpp"
#include "hpfs/hpfs_sync.hpp"
#include "hpfs/hpfs_serve.hpp"
#include "unl.hpp"
/**
@@ -73,8 +71,6 @@ void deinit()
p2p::deinit();
read_req::deinit();
consensus::deinit();
hpfs_sync::deinit();
hpfs_serve::deinit();
hpfs::deinit();
ledger::deinit();
conf::deinit();
@@ -200,8 +196,6 @@ int main(int argc, char **argv)
LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")";
if (hpfs::init() == -1 ||
hpfs_serve::init() == -1 ||
hpfs_sync::init() == -1 ||
ledger::init() == -1 ||
unl::init() == -1 ||
consensus::init() == -1 ||

View File

@@ -93,6 +93,7 @@ table HistoryLedgerBlock {
}
table Hpfs_Request_Message { //Hpfs request message schema
mount_id: int32;
parent_path:string;
is_file:bool;
block_id:int32;
@@ -105,6 +106,7 @@ table Hpfs_Response_Message{
hpfs_response:Hpfs_Response;
hash:[ubyte];
path: string;
mount_id: int32;
}
table Fs_Entry_Response{

View File

@@ -1311,11 +1311,18 @@ inline flatbuffers::Offset<HistoryLedgerBlock> CreateHistoryLedgerBlockDirect(
struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef Hpfs_Request_MessageBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_PARENT_PATH = 4,
VT_IS_FILE = 6,
VT_BLOCK_ID = 8,
VT_EXPECTED_HASH = 10
VT_MOUNT_ID = 4,
VT_PARENT_PATH = 6,
VT_IS_FILE = 8,
VT_BLOCK_ID = 10,
VT_EXPECTED_HASH = 12
};
int32_t mount_id() const {
return GetField<int32_t>(VT_MOUNT_ID, 0);
}
bool mutate_mount_id(int32_t _mount_id) {
return SetField<int32_t>(VT_MOUNT_ID, _mount_id, 0);
}
const flatbuffers::String *parent_path() const {
return GetPointer<const flatbuffers::String *>(VT_PARENT_PATH);
}
@@ -1342,6 +1349,7 @@ struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<int32_t>(verifier, VT_MOUNT_ID) &&
VerifyOffset(verifier, VT_PARENT_PATH) &&
verifier.VerifyString(parent_path()) &&
VerifyField<uint8_t>(verifier, VT_IS_FILE) &&
@@ -1356,6 +1364,9 @@ struct Hpfs_Request_MessageBuilder {
typedef Hpfs_Request_Message Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_mount_id(int32_t mount_id) {
fbb_.AddElement<int32_t>(Hpfs_Request_Message::VT_MOUNT_ID, mount_id, 0);
}
void add_parent_path(flatbuffers::Offset<flatbuffers::String> parent_path) {
fbb_.AddOffset(Hpfs_Request_Message::VT_PARENT_PATH, parent_path);
}
@@ -1381,6 +1392,7 @@ struct Hpfs_Request_MessageBuilder {
inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_Message(
flatbuffers::FlatBufferBuilder &_fbb,
int32_t mount_id = 0,
flatbuffers::Offset<flatbuffers::String> parent_path = 0,
bool is_file = false,
int32_t block_id = 0,
@@ -1389,12 +1401,14 @@ inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_Message(
builder_.add_expected_hash(expected_hash);
builder_.add_block_id(block_id);
builder_.add_parent_path(parent_path);
builder_.add_mount_id(mount_id);
builder_.add_is_file(is_file);
return builder_.Finish();
}
inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_MessageDirect(
flatbuffers::FlatBufferBuilder &_fbb,
int32_t mount_id = 0,
const char *parent_path = nullptr,
bool is_file = false,
int32_t block_id = 0,
@@ -1403,6 +1417,7 @@ inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_MessageDirec
auto expected_hash__ = expected_hash ? _fbb.CreateVector<uint8_t>(*expected_hash) : 0;
return msg::fbuf::p2pmsg::CreateHpfs_Request_Message(
_fbb,
mount_id,
parent_path__,
is_file,
block_id,
@@ -1415,7 +1430,8 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl
VT_HPFS_RESPONSE_TYPE = 4,
VT_HPFS_RESPONSE = 6,
VT_HASH = 8,
VT_PATH = 10
VT_PATH = 10,
VT_MOUNT_ID = 12
};
msg::fbuf::p2pmsg::Hpfs_Response hpfs_response_type() const {
return static_cast<msg::fbuf::p2pmsg::Hpfs_Response>(GetField<uint8_t>(VT_HPFS_RESPONSE_TYPE, 0));
@@ -1448,6 +1464,12 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl
flatbuffers::String *mutable_path() {
return GetPointer<flatbuffers::String *>(VT_PATH);
}
int32_t mount_id() const {
return GetField<int32_t>(VT_MOUNT_ID, 0);
}
bool mutate_mount_id(int32_t _mount_id) {
return SetField<int32_t>(VT_MOUNT_ID, _mount_id, 0);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint8_t>(verifier, VT_HPFS_RESPONSE_TYPE) &&
@@ -1457,6 +1479,7 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl
verifier.VerifyVector(hash()) &&
VerifyOffset(verifier, VT_PATH) &&
verifier.VerifyString(path()) &&
VerifyField<int32_t>(verifier, VT_MOUNT_ID) &&
verifier.EndTable();
}
};
@@ -1489,6 +1512,9 @@ struct Hpfs_Response_MessageBuilder {
void add_path(flatbuffers::Offset<flatbuffers::String> path) {
fbb_.AddOffset(Hpfs_Response_Message::VT_PATH, path);
}
void add_mount_id(int32_t mount_id) {
fbb_.AddElement<int32_t>(Hpfs_Response_Message::VT_MOUNT_ID, mount_id, 0);
}
explicit Hpfs_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
@@ -1505,8 +1531,10 @@ inline flatbuffers::Offset<Hpfs_Response_Message> CreateHpfs_Response_Message(
msg::fbuf::p2pmsg::Hpfs_Response hpfs_response_type = msg::fbuf::p2pmsg::Hpfs_Response_NONE,
flatbuffers::Offset<void> hpfs_response = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0,
flatbuffers::Offset<flatbuffers::String> path = 0) {
flatbuffers::Offset<flatbuffers::String> path = 0,
int32_t mount_id = 0) {
Hpfs_Response_MessageBuilder builder_(_fbb);
builder_.add_mount_id(mount_id);
builder_.add_path(path);
builder_.add_hash(hash);
builder_.add_hpfs_response(hpfs_response);
@@ -1519,7 +1547,8 @@ inline flatbuffers::Offset<Hpfs_Response_Message> CreateHpfs_Response_MessageDir
msg::fbuf::p2pmsg::Hpfs_Response hpfs_response_type = msg::fbuf::p2pmsg::Hpfs_Response_NONE,
flatbuffers::Offset<void> hpfs_response = 0,
const std::vector<uint8_t> *hash = nullptr,
const char *path = nullptr) {
const char *path = nullptr,
int32_t mount_id = 0) {
auto hash__ = hash ? _fbb.CreateVector<uint8_t>(*hash) : 0;
auto path__ = path ? _fbb.CreateString(path) : 0;
return msg::fbuf::p2pmsg::CreateHpfs_Response_Message(
@@ -1527,7 +1556,8 @@ inline flatbuffers::Offset<Hpfs_Response_Message> CreateHpfs_Response_MessageDir
hpfs_response_type,
hpfs_response,
hash__,
path__);
path__,
mount_id);
}
struct Fs_Entry_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {

View File

@@ -4,7 +4,6 @@
#include "../../util/util.hpp"
#include "../../hplog.hpp"
#include "../../util/h32.hpp"
#include "../../hpfs/hpfs.hpp"
#include "../../unl.hpp"
#include "p2pmsg_container_generated.h"
#include "p2pmsg_content_generated.h"
@@ -275,7 +274,7 @@ namespace msg::fbuf::p2pmsg
const p2p::hpfs_request create_hpfs_request_from_msg(const Hpfs_Request_Message &msg)
{
p2p::hpfs_request hr;
hr.mount_id = msg.mount_id();
hr.block_id = msg.block_id();
hr.is_file = msg.is_file();
hr.parent_path = flatbuff_str_to_sv(msg.parent_path());
@@ -468,13 +467,14 @@ namespace msg::fbuf::p2pmsg
* @param container_builder Flatbuffer builder for the container message.
* @param hr The hpfs request struct to be placed in the container message.
*/
void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl)
void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl)
{
flatbuffers::FlatBufferBuilder builder(1024);
flatbuffers::Offset<Hpfs_Request_Message> srmsg =
CreateHpfs_Request_Message(
builder,
hr.mount_id,
sv_to_flatbuff_str(builder, hr.parent_path),
hr.is_file,
hr.block_id,
@@ -497,7 +497,7 @@ namespace msg::fbuf::p2pmsg
* @param lcl Lcl to be include in the container msg.
*/
void create_msg_from_fsentry_response(
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path,
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id,
std::vector<hpfs::child_hash_node> &hash_nodes, util::h32 expected_hash, std::string_view lcl)
{
flatbuffers::FlatBufferBuilder builder(1024);
@@ -511,7 +511,7 @@ namespace msg::fbuf::p2pmsg
builder, Hpfs_Response_Fs_Entry_Response,
resp.Union(),
hash_to_flatbuff_bytes(builder, expected_hash),
sv_to_flatbuff_str(builder, path));
sv_to_flatbuff_str(builder, path), mount_id);
flatbuffers::Offset<Content> message = CreateContent(builder, Message_Hpfs_Response_Message, st_resp.Union());
builder.Finish(message); // Finished building message content to get serialised content.
@@ -529,7 +529,7 @@ namespace msg::fbuf::p2pmsg
* @param lcl Lcl to be include in the container msg.
*/
void create_msg_from_filehashmap_response(
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path,
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id,
std::vector<util::h32> &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl)
{
// todo:get a average propsal message size and allocate content builder based on that.
@@ -548,7 +548,7 @@ namespace msg::fbuf::p2pmsg
Hpfs_Response_File_HashMap_Response,
resp.Union(),
hash_to_flatbuff_bytes(builder, expected_hash),
sv_to_flatbuff_str(builder, path));
sv_to_flatbuff_str(builder, path), mount_id);
flatbuffers::Offset<Content> message = CreateContent(builder, Message_Hpfs_Response_Message, st_resp.Union());
builder.Finish(message); // Finished building message content to get serialised content.
@@ -564,7 +564,7 @@ namespace msg::fbuf::p2pmsg
* @param block_resp Block response struct to place in the message
* @param lcl Lcl to be include in the container message.
*/
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, std::string_view lcl)
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl)
{
// todo:get a average propsal message size and allocate content builder based on that.
flatbuffers::FlatBufferBuilder builder(1024);
@@ -580,7 +580,7 @@ namespace msg::fbuf::p2pmsg
Hpfs_Response_Block_Response,
resp.Union(),
hash_to_flatbuff_bytes(builder, block_resp.hash),
sv_to_flatbuff_str(builder, block_resp.path));
sv_to_flatbuff_str(builder, block_resp.path), mount_id);
flatbuffers::Offset<Content> message = CreateContent(builder, Message_Hpfs_Response_Message, st_resp.Union());
builder.Finish(message); // Finished building message content to get serialised content.

View File

@@ -4,7 +4,7 @@
#include "../../pchheader.hpp"
#include "../../p2p/p2p.hpp"
#include "../../util/h32.hpp"
#include "../../hpfs/hpfs.hpp"
#include "../../hpfs/hpfs_mount.hpp"
#include "p2pmsg_container_generated.h"
#include "p2pmsg_content_generated.h"
@@ -55,17 +55,17 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view &msg, std::string_view lcl);
void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl);
void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl);
void create_msg_from_fsentry_response(
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path,
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id,
std::vector<hpfs::child_hash_node> &hash_nodes, util::h32 expected_hash, std::string_view lcl);
void create_msg_from_filehashmap_response(
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path,
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id,
std::vector<util::h32> &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl);
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, std::string_view lcl);
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl);
void create_containermsg_from_content(
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign);

View File

@@ -14,8 +14,8 @@ namespace p2p
{
constexpr uint16_t PROPOSAL_LIST_CAP = 64; // Maximum proposal count.
constexpr uint16_t NONUNL_PROPOSAL_LIST_CAP = 64; // Maximum nonunl proposal count.
constexpr uint16_t STATE_REQ_LIST_CAP = 64; // Maximum state request count.
constexpr uint16_t STATE_RES_LIST_CAP = 64; // Maximum state response count.
constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count.
constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count.
constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count.
struct proposal
@@ -28,8 +28,8 @@ namespace p2p
uint8_t stage = 0;
std::string nonce; // Random nonce that is used to reduce lcl predictability.
std::string lcl;
util::h32 state_hash; // Contract state hash.
util::h32 patch_hash; // Patch file hash.
util::h32 state_hash; // Contract state hash.
util::h32 patch_hash; // Patch file hash.
std::set<std::string> users;
std::set<std::string> input_hashes;
std::string output_hash;
@@ -91,6 +91,7 @@ namespace p2p
// Represents a hpfs request sent to a peer.
struct hpfs_request
{
int32_t mount_id; // Relavent file system id.
std::string parent_path; // The requested file or dir path.
bool is_file = false; // Whether the path is a file or dir.
int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1.
@@ -122,13 +123,13 @@ namespace p2p
std::list<nonunl_proposal> nonunl_proposals;
std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions.
// List of pairs indicating the session pubkey hex and the hpfs requests.
std::list<std::pair<std::string, std::string>> hpfs_requests;
std::mutex hpfs_requests_mutex; // Mutex for hpfs requests access race conditions.
// List of pairs indicating the session pubkey hex and the contract fs hpfs requests.
std::list<std::pair<std::string, p2p::hpfs_request>> contract_hpfs_requests;
std::mutex contract_hpfs_requests_mutex; // Mutex for contract fs hpfs requests access race conditions.
// List of pairs indicating the session pubkey hex and the hpfs responses.
std::list<std::pair<std::string, std::string>> hpfs_responses;
std::mutex hpfs_responses_mutex; // Mutex for hpfs responses access race conditions.
// List of pairs indicating the session pubkey hex and the contract fs hpfs responses.
std::list<std::pair<std::string, std::string>> contract_hpfs_responses;
std::mutex contract_hpfs_responses_mutex; // Mutex for contract fs hpfs responses access race conditions.
};
struct connected_context

View File

@@ -14,6 +14,7 @@
#include "peer_comm_session.hpp"
#include "p2p.hpp"
#include "../unl.hpp"
#include "../hpfs/hpfs.hpp"
namespace p2pmsg = msg::fbuf::p2pmsg;
@@ -192,36 +193,44 @@ namespace p2p
}
else if (content_message_type == p2pmsg::Message_Hpfs_Request_Message)
{
// Check the cap and insert request with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.hpfs_requests_mutex);
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr);
const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message());
if (hr.mount_id == hpfs::contract_fs.mount_id)
{
// Check the cap and insert request with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.contract_hpfs_requests_mutex);
// If max number of state requests reached skip the rest.
if (ctx.collected_msgs.hpfs_requests.size() < p2p::STATE_REQ_LIST_CAP)
{
std::string state_request_msg(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(state_request_msg)));
}
else
{
LOG_DEBUG << "State request rejected. Maximum state request count reached. " << session.display_name();
// If max number of state requests reached skip the rest.
if (ctx.collected_msgs.contract_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP)
{
ctx.collected_msgs.contract_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr)));
}
else
{
LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name();
}
}
}
else if (content_message_type == p2pmsg::Message_Hpfs_Response_Message)
{
if (hpfs_sync::ctx.is_syncing) // Only accept state responses if state is syncing.
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr);
const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message();
// Only accept hpfs responses if hpfs fs is syncing.
if (hpfs::contract_sync.ctx.is_syncing && resp_msg->mount_id() == hpfs::contract_fs.mount_id)
{
// Check the cap and insert state_response with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.hpfs_responses_mutex);
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.contract_hpfs_responses_mutex);
// If max number of state responses reached skip the rest.
if (ctx.collected_msgs.hpfs_responses.size() < p2p::STATE_RES_LIST_CAP)
if (ctx.collected_msgs.contract_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
}
else
{
LOG_DEBUG << "State response rejected. Maximum state response count reached. " << session.display_name();
LOG_DEBUG << "Contract hpfs response rejected. Maximum contract hpfs response count reached. " << session.display_name();
}
}
}

View File

@@ -49,5 +49,6 @@
#include <plog/Log.h>
#include <plog/Appenders/ColorConsoleAppender.h>
#include <sqlite3.h>
#include <queue>
#endif

View File

@@ -89,7 +89,7 @@ namespace sc
execv_args[j] = conf::cfg.contract.runtime_binexec_args[i].data();
execv_args[len - 1] = NULL;
const std::string current_dir = hpfs::physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH);
const std::string current_dir = hpfs::contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH);
chdir(current_dir.c_str());
execv(execv_args[0], execv_args);
@@ -156,8 +156,8 @@ namespace sc
if (!ctx.args.readonly)
ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME;
return ctx.args.readonly ? hpfs::start_ro_session(ctx.args.hpfs_session_name, false)
: hpfs::acquire_rw_session();
return ctx.args.readonly ? hpfs::contract_fs.start_ro_session(ctx.args.hpfs_session_name, false)
: hpfs::contract_fs.acquire_rw_session();
}
/**
@@ -165,36 +165,37 @@ namespace sc
*/
int stop_hpfs_session(execution_context &ctx)
{
hpfs::hpfs_mount &contract_fs = hpfs::contract_fs;
if (ctx.args.readonly)
{
return hpfs::stop_ro_session(ctx.args.hpfs_session_name);
return contract_fs.stop_ro_session(ctx.args.hpfs_session_name);
}
else
{
// Read the state hash if not in readonly mode.
if (hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH) < 1)
if (contract_fs.get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH) < 1)
{
hpfs::release_rw_session();
contract_fs.release_rw_session();
return -1;
}
util::h32 patch_hash;
const int patch_hash_result = hpfs::get_hash(patch_hash, ctx.args.hpfs_session_name, hpfs::PATCH_FILE_PATH);
const int patch_hash_result = contract_fs.get_hash(patch_hash, ctx.args.hpfs_session_name, hpfs::PATCH_FILE_PATH);
if (patch_hash_result == -1)
{
hpfs::release_rw_session();
contract_fs.release_rw_session();
return -1;
}
else if (patch_hash_result == 1 && patch_hash != hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH))
else if (patch_hash_result == 1 && patch_hash != contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH))
{
// Update global hash tracker with the new patch file hash.
hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, patch_hash);
// Update global hash tracker of contract fs with the new patch file hash.
contract_fs.set_parent_hash(hpfs::PATCH_FILE_PATH, patch_hash);
// Denote that the patch file was updated by the SC.
consensus::is_patch_update_pending = true;
}
return hpfs::release_rw_session();
return contract_fs.release_rw_session();
}
}

View File

@@ -8,7 +8,7 @@
#include "../hplog.hpp"
#include "../ledger.hpp"
#include "../util/buffer_store.hpp"
#include "../hpfs/hpfs.hpp"
#include "../hpfs/hpfs_mount.hpp"
#include "usr.hpp"
#include "user_session_handler.hpp"
#include "user_comm_session.hpp"
@@ -16,6 +16,7 @@
#include "user_input.hpp"
#include "read_req.hpp"
#include "input_nonce_map.hpp"
#include "../hpfs/hpfs.hpp"
namespace usr
{
@@ -391,7 +392,7 @@ namespace usr
util::fork_detach();
// before execution chdir into a valid the latest state data directory that contains an appbill.table
const std::string appbill_dir = conf::ctx.hpfs_rw_dir + hpfs::STATE_DIR_PATH;
const std::string appbill_dir = hpfs::contract_fs.rw_dir + hpfs::STATE_DIR_PATH;
chdir(appbill_dir.c_str());
int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": Appbill process execv failed.\n";