diff --git a/CMakeLists.txt b/CMakeLists.txt index 02e82797..afbaef35 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ add_executable(hpcore src/sc.cpp src/bill/corebill.cpp src/hpfs/hpfs.cpp + src/hpfs/hpfs_mount.cpp src/hpfs/hpfs_serve.cpp src/hpfs/hpfs_sync.cpp src/comm/comm_session.cpp diff --git a/src/conf.cpp b/src/conf.cpp index c138faec..2c125f86 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -667,7 +667,7 @@ namespace conf jsoncons::ojson jdoc; populate_contract_section_json(jdoc, cfg.contract, true); - const std::string patch_file_path = hpfs::physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); + const std::string patch_file_path = hpfs::contract_fs.physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); return write_json_file(patch_file_path, jdoc); } @@ -679,7 +679,7 @@ namespace conf */ int apply_patch_config(std::string_view hpfs_session_name) { - const std::string path = hpfs::physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH); + const std::string path = hpfs::contract_fs.physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH); if (!util::is_file_exists(path)) return 0; diff --git a/src/consensus.cpp b/src/consensus.cpp index b59bedfa..8e6cb1d7 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -113,8 +113,9 @@ namespace consensus // Get current lcl and state. std::string lcl = ledger::ctx.get_lcl(); const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); - util::h32 state_hash = hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE); - util::h32 patch_hash = hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH); + hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; // Ref of the contract_fs object. + util::h32 state_hash = contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH); + util::h32 patch_hash = contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH); if (ctx.stage == 0) { @@ -199,11 +200,21 @@ namespace consensus if (is_patch_desync) is_patch_update_pending = false; - // Start hpfs sync if we are out-of-sync with majority hpfs state. + // Start hpfs sync if we are out-of-sync with majority hpfs patch hash or state hash. if (is_state_desync || is_patch_desync) { conf::change_role(conf::ROLE::OBSERVER); - hpfs_sync::set_target(majority_state_hash, majority_patch_hash); + + // This queue holds all the sync targets which needs to get synced in contract fs. + std::queue sync_target_list; + if (is_patch_desync) + sync_target_list.push(hpfs::sync_target{"patch", majority_patch_hash, hpfs::PATCH_FILE_PATH, hpfs::BACKLOG_ITEM_TYPE::FILE}); + + if (is_state_desync) + sync_target_list.push(hpfs::sync_target{"state", majority_state_hash, hpfs::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); + + // Set sync targets for contract fs. + hpfs::contract_sync.set_target(std::move(sync_target_list)); } // Proceed further only if both lcl and state are in sync with majority. @@ -213,7 +224,7 @@ namespace consensus return 0; } - // lcl, hpfs or unl desync. + // lcl or hpfs desync. return -1; } @@ -227,7 +238,7 @@ namespace consensus */ void check_sync_completion() { - if (conf::cfg.node.role == conf::ROLE::OBSERVER && !hpfs_sync::ctx.is_syncing && !ledger::sync_ctx.is_syncing) + if (conf::cfg.node.role == conf::ROLE::OBSERVER && !hpfs::contract_sync.ctx.is_syncing && !ledger::sync_ctx.is_syncing) conf::change_role(conf::ROLE::VALIDATOR); } @@ -759,7 +770,7 @@ namespace consensus } } - is_state_desync = (hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE) != majority_state_hash); + is_state_desync = (hpfs::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH) != majority_state_hash); } /** @@ -785,7 +796,7 @@ namespace consensus } } - is_patch_desync = (hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH) != majority_patch_hash); + is_patch_desync = (hpfs::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH) != majority_patch_hash); } /** @@ -871,7 +882,8 @@ namespace consensus return -1; } - hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE, args.post_execution_state_hash); + // Update state hash in contract fs global hash tracker. + hpfs::contract_fs.set_parent_hash(hpfs::STATE_DIR_PATH, args.post_execution_state_hash); new_state_hash = args.post_execution_state_hash; extract_user_outputs_from_contract_bufmap(args.userbufs); @@ -1040,16 +1052,18 @@ namespace consensus */ int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 ¤t_patch_hash) { + hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; + // Check whether is there any patch changes to be applied which reached consensus. if (is_patch_update_pending && current_patch_hash == prop_patch_hash) { - if (hpfs::start_ro_session(HPFS_SESSION_NAME, false) != -1) + if (contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1) { // Appling new patch file changes to hpcore runtime. if (conf::apply_patch_config(HPFS_SESSION_NAME) == -1) { LOG_ERROR << "Appling patch file changes after consensus failed."; - hpfs::stop_ro_session(HPFS_SESSION_NAME); + contract_fs.stop_ro_session(HPFS_SESSION_NAME); return -1; } else @@ -1059,7 +1073,7 @@ namespace consensus } } - if (hpfs::stop_ro_session(HPFS_SESSION_NAME) == -1) + if (contract_fs.stop_ro_session(HPFS_SESSION_NAME) == -1) return -1; } return 0; diff --git a/src/hpfs/hpfs.cpp b/src/hpfs/hpfs.cpp index 779ac1a5..0c142070 100644 --- a/src/hpfs/hpfs.cpp +++ b/src/hpfs/hpfs.cpp @@ -1,360 +1,47 @@ -#include "hpfs.hpp" +#include "./hpfs.hpp" #include "../conf.hpp" -#include "../hplog.hpp" -#include "../util/util.hpp" -#include "../util/h32.hpp" -#include "../sc.hpp" +#include "./hpfs_serve.hpp" namespace hpfs { - constexpr const char *TRACE_ARG_ERROR = "trace=error"; - constexpr const char *TRACE_ARG_DEBUG = "trace=error"; - constexpr const char *RW_SESSION = "/::hpfs.rw.hmap"; - constexpr const char *RO_SESSION = "/::hpfs.ro."; - constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap."; - constexpr const char *HMAP_HASH = "::hpfs.hmap.hash"; - constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children"; - constexpr ino_t ROOT_INO = 1; - - constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000; - constexpr uint16_t INIT_CHECK_INTERVAL = 20; - bool init_success = false; - hpfs_context ctx; + hpfs::hpfs_mount contract_fs; // Global contract file system instance. + hpfs::hpfs_sync contract_sync; // Global contract file system sync instance. + hpfs::hpfs_serve contract_serve; /** - * Performs system startup activitites related to hpfs execution. - */ + * Initialize necessary file system mounts to hpcore. + */ int init() { - if (start_hpfs_process(ctx.hpfs_pid) == -1) - return -1; - - if (prepare_fs() == -1) + if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.hpfs_dir, conf::ctx.hpfs_mount_dir, conf::ctx.hpfs_rw_dir, conf::cfg.node.full_history) == -1) { - util::kill_process(ctx.hpfs_pid, true); + LOG_ERROR << "Contract file system initialization failed."; + return -1; + } + + if (contract_serve.init("contract", &contract_fs) == -1) + { + LOG_ERROR << "Contract file system serve worker initialization failed."; + return -1; + } + + if (contract_sync.init("contract", &contract_fs) == -1) + { + LOG_ERROR << "Contract file system sync worker initialization failed."; return -1; } - init_success = true; return 0; } /** - * Performs global cleanup related to hpfs execution. - */ + * Perform cleanups on created mounts. + */ void deinit() { - if (init_success) - { - LOG_DEBUG << "Stopping hpfs process... pid:" << ctx.hpfs_pid; - if (ctx.hpfs_pid > 0 && util::kill_process(ctx.hpfs_pid, true) == 0) - LOG_INFO << "Stopped hpfs process."; - } - } - - /** - * Performs initial patch file population and loads initial hashes for later use. - * During startup, we always populate patch.cfg with current values from hp.cfg. - * @return 0 on success. -1 on failure. - */ - int prepare_fs() - { - util::h32 initial_state_hash; - util::h32 initial_patch_hash; - - if (acquire_rw_session() == -1 || - conf::populate_patch_config() == -1 || - get_hash(initial_state_hash, RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 || - get_hash(initial_patch_hash, RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 || - release_rw_session() == -1) - { - LOG_ERROR << "Failed to prepare initial fs."; - return -1; - } - - ctx.set_hash(HPFS_PARENT_COMPONENTS::STATE, initial_state_hash); - ctx.set_hash(HPFS_PARENT_COMPONENTS::PATCH, initial_patch_hash); - LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash; - return 0; - } - - /** - * Starts the hpfs process used for all fs sessions. - */ - int start_hpfs_process(pid_t &hpfs_pid) - { - const pid_t pid = fork(); - if (pid > 0) - { - // HotPocket process. - - LOG_DEBUG << "Starting hpfs process."; - - // Wait until hpfs is initialized properly. - const uint16_t max_retries = PROCESS_INIT_TIMEOUT / INIT_CHECK_INTERVAL; - bool hpfs_initialized = false; - uint16_t retry_count = 0; - do - { - util::sleep(INIT_CHECK_INTERVAL); - - // Check if hpfs process is still running. - // Sending signal 0 to test whether process exist. - if (util::kill_process(pid, false, 0) == -1) - { - LOG_ERROR << "hpfs process " << pid << " has stopped."; - break; - } - - // We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up. - struct stat st; - if (stat(conf::ctx.hpfs_mount_dir.c_str(), &st) == -1) - { - LOG_ERROR << errno << ": Error in checking hpfs status."; - break; - } - - hpfs_initialized = (st.st_ino == ROOT_INO); - // Keep retrying until root inode no. matches or timeout occurs. - - } while (!hpfs_initialized && ++retry_count <= max_retries); - - // Kill the process if hpfs couldn't be initialized properly. - if (!hpfs_initialized) - { - LOG_ERROR << "Couldn't initialize hpfs process."; - util::kill_process(pid, true); - return -1; - } - - hpfs_pid = pid; - LOG_DEBUG << "hpfs process started. pid:" << hpfs_pid; - } - else if (pid == 0) - { - // hpfs process. - util::fork_detach(); - - const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? TRACE_ARG_DEBUG : TRACE_ARG_ERROR); - - // Fill process args. - char *execv_args[] = { - conf::ctx.hpfs_exe_path.data(), - (char *)"fs", - conf::ctx.hpfs_dir.data(), - conf::ctx.hpfs_mount_dir.data(), - // In full history mode, we disable log merge of hpfs. - (char *)(conf::cfg.node.full_history ? "merge=false" : "merge=true"), - (char *)active_hpfs_trace_arg, - NULL}; - - const int ret = execv(execv_args[0], execv_args); - std::cerr << errno << ": hpfs process execv failed.\n"; - exit(1); - } - else - { - LOG_ERROR << errno << ": fork() failed when starting hpfs process."; - return -1; - } - - return 0; - } - - /** - * Starts a virtual fs ReadWrite session with hash map enabled. - * If RW session already started, this will simply acquire a consumer reference. - * @return 0 on success. -1 on failure. - */ - int acquire_rw_session() - { - std::scoped_lock lock(ctx.rw_mutex); - - LOG_DEBUG << "Starting hpfs rw session at " << conf::ctx.hpfs_rw_dir; - - const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION; - - // The sessions creation either should be succesful or should report as already exists (errno=EEXIST). - // Otherwise we consider it as failure. - if (mknod(session_file.c_str(), 0, 0) == -1 && errno != EEXIST) - { - LOG_ERROR << errno << ": Error starting hpfs rw session at " << conf::ctx.hpfs_rw_dir; - return -1; - } - ctx.rw_consumers++; - return 0; - } - - /** - * Releases a consumer reference to the RW session. If there are no more references, - * actually stops the running RW session. - * @return 0 on success. -1 on failure. - */ - int release_rw_session() - { - std::scoped_lock lock(ctx.rw_mutex); - - if (ctx.rw_consumers > 0) - ctx.rw_consumers--; - - if (ctx.rw_consumers == 0) - { - const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION; - if (unlink(session_file.c_str()) == -1) - { - LOG_ERROR << errno << ": Error stopping hpfs rw session at " << conf::ctx.hpfs_rw_dir; - return -1; - } - } - return 0; - } - - /** - * Starts a virtual fs ReadOnly session. - * @return 0 on success. -1 on failure. - */ - int start_ro_session(const std::string &name, const bool hmap_enabled) - { - LOG_DEBUG << "Starting hpfs ro session " << name << " hmap:" << hmap_enabled; - - const std::string session_file = conf::ctx.hpfs_mount_dir + (hmap_enabled ? RO_SESSION_HMAP : RO_SESSION) + name; - if (mknod(session_file.c_str(), 0, 0) == -1) - { - LOG_ERROR << errno << ": Error starting hpfs ro session " << name; - return -1; - } - return 0; - } - - /** - * Stops the specified ReadOnly fs session. - * @return 0 on success. -1 on failure. - */ - int stop_ro_session(const std::string &name) - { - LOG_DEBUG << "Stopping hpfs ro session " << name; - - const std::string session_file = conf::ctx.hpfs_mount_dir + RO_SESSION + name; - if (unlink(session_file.c_str()) == -1) - { - LOG_ERROR << errno << ": Error stopping hpfs ro session " << name; - return -1; - } - return 0; - } - - /** - * Populates the hash of the specified vpath. - * @return 1 on success. 0 if vpath not found. -1 on error. - */ - int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath) - { - const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_HASH)); - const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); - if (fd == -1 && errno == ENOENT) - { - LOG_DEBUG << "Cannot get hash. vpath not found. " << vpath; - return 0; - } - else if (fd == -1) - { - LOG_ERROR << errno << ": Error opening hash file. " << vpath; - return -1; - } - - const int res = read(fd, &hash, sizeof(util::h32)); - close(fd); - if (res == -1) - { - LOG_ERROR << errno << ": Error reading hash file. " << vpath; - return -1; - } - return 1; - } - - /** - * Populates the list of file block hashes for the specified vpath. - * @return 1 on success. 0 if vpath not found. -1 on error. - */ - int get_file_block_hashes(std::vector &hashes, std::string_view session_name, std::string_view vpath) - { - const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_CHILDREN)); - const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); - if (fd == -1 && errno == ENOENT) - { - LOG_DEBUG << "Cannot get file block hashes. vpath not found. " << vpath; - return 0; - } - else if (fd == -1) - { - LOG_DEBUG << errno << ": Error opening hashmap children. " << vpath; - return -1; - } - - struct stat st; - if (fstat(fd, &st) == -1) - { - close(fd); - LOG_ERROR << errno << ": Error reading block hashes length. " << vpath; - return -1; - } - - const int children_count = st.st_size / sizeof(util::h32); - hashes.resize(children_count); - - const int res = read(fd, hashes.data(), st.st_size); - close(fd); - if (res == -1) - { - LOG_ERROR << errno << ": Error reading block hashes. " << vpath; - return -1; - } - return 1; - } - - /** - * Populates the list of dir entry hashes for the specified vpath. - * @return 1 on success. 0 if vpath not found. -1 on error. - */ - int get_dir_children_hashes(std::vector &hash_nodes, std::string_view session_name, std::string_view dir_vpath) - { - const std::string path = physical_path(session_name, std::string(dir_vpath).append(HMAP_CHILDREN)); - const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); - if (fd == -1 && errno == ENOENT) - { - LOG_DEBUG << "Cannot get dir children hashes. Dir vpath not found. " << dir_vpath; - return 0; - } - else if (fd == -1) - { - LOG_ERROR << errno << ": Error opening dir hash children nodes. " << dir_vpath; - return -1; - } - - struct stat st; - if (fstat(fd, &st) == -1) - { - close(fd); - LOG_ERROR << errno << ": Error reading hash children nodes length. " << dir_vpath; - return -1; - } - - const int children_count = st.st_size / sizeof(child_hash_node); - hash_nodes.resize(children_count); - - const int res = read(fd, hash_nodes.data(), st.st_size); - close(fd); - if (res == -1) - { - LOG_ERROR << errno << ": Error reading hash children nodes. " << dir_vpath; - return -1; - } - return 1; - } - - const std::string physical_path(std::string_view session_name, std::string_view vpath) - { - return conf::ctx.hpfs_mount_dir + "/" + session_name.data() + vpath.data(); + contract_fs.deinit(); + contract_serve.deinit(); + contract_sync.deinit(); } } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs.hpp b/src/hpfs/hpfs.hpp index 726421b2..36678058 100644 --- a/src/hpfs/hpfs.hpp +++ b/src/hpfs/hpfs.hpp @@ -1,91 +1,17 @@ -#ifndef _HP_HPFS_HPFS_ -#define _HP_HPFS_HPFS_ +#ifndef _HP_HPFS_HPFS +#define _HP_HPFS_HPFS -#include "../pchheader.hpp" -#include "../util/h32.hpp" -#include "../conf.hpp" +#include "./hpfs_mount.hpp" +#include "./hpfs_sync.hpp" namespace hpfs { - constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; - constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. - constexpr const char *STATE_DIR_PATH = "/state"; // State directory name. - constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename. - - struct child_hash_node - { - bool is_file = false; - char name[256]; - util::h32 hash; - - child_hash_node() - { - memset(name, 0, sizeof(name)); - } - }; - - inline uint16_t get_request_resubmit_timeout() - { - return conf::cfg.contract.roundtime; - } - - enum HPFS_PARENT_COMPONENTS - { - STATE, - PATCH - }; - - struct hpfs_context - { - private: - std::vector parent_hashes; // Keep hashes of each hpfs parent. - std::shared_mutex parent_mutexes[2] = {std::shared_mutex(), std::shared_mutex()}; // Mutexes for each parent. - - public: - pid_t hpfs_pid = 0; - - // No. of consumers for RW session. - // We use this as a reference counting mechanism to cleanup RW session when no one requires it. - uint32_t rw_consumers = 0; - std::mutex rw_mutex; - - hpfs_context() - { - parent_hashes.reserve(2); - for (size_t i = 0; i < 2; i++) - { - parent_hashes.push_back(util::h32_empty); - } - } - - util::h32 get_hash(const HPFS_PARENT_COMPONENTS parent) - { - std::shared_lock lock(parent_mutexes[parent]); - return parent_hashes[parent]; - } - - void set_hash(const HPFS_PARENT_COMPONENTS parent, util::h32 new_state) - { - std::unique_lock lock(parent_mutexes[parent]); - parent_hashes[parent] = new_state; - } - }; - - extern hpfs_context ctx; + constexpr int32_t CONTRACT_FS_ID = 0; + extern hpfs::hpfs_mount contract_fs; // Global contract file system instance. + extern hpfs::hpfs_sync contract_sync; // Global contract file system sync instance. int init(); void deinit(); - int prepare_fs(); - int start_hpfs_process(pid_t &hpfs_pid); - int acquire_rw_session(); - int release_rw_session(); - int start_ro_session(const std::string &name, const bool hmap_enabled); - int stop_ro_session(const std::string &name); - int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath); - int get_file_block_hashes(std::vector &hashes, std::string_view session_name, std::string_view vpath); - int get_dir_children_hashes(std::vector &hash_nodes, std::string_view session_name, std::string_view dir_vpath); - const std::string physical_path(std::string_view session_name, std::string_view vpath); } // namespace hpfs - #endif \ No newline at end of file diff --git a/src/hpfs/hpfs_mount.cpp b/src/hpfs/hpfs_mount.cpp new file mode 100644 index 00000000..8b0fa9f7 --- /dev/null +++ b/src/hpfs/hpfs_mount.cpp @@ -0,0 +1,400 @@ +#include "hpfs_mount.hpp" +#include "../conf.hpp" +#include "../hplog.hpp" +#include "../util/util.hpp" +#include "../util/h32.hpp" +#include "../sc.hpp" + +namespace hpfs +{ + constexpr const char *TRACE_ARG_ERROR = "trace=error"; + // Trace is set to error intentionally to prevent log pollution in debug mode. Change this in hpfs specific debugging. + constexpr const char *TRACE_ARG_DEBUG = "trace=error"; + constexpr const char *RW_SESSION = "/::hpfs.rw.hmap"; + constexpr const char *RO_SESSION = "/::hpfs.ro."; + constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap."; + constexpr const char *HMAP_HASH = "::hpfs.hmap.hash"; + constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children"; + constexpr ino_t ROOT_INO = 1; + + constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000; + constexpr uint16_t INIT_CHECK_INTERVAL = 20; + + /** + * This should be called to activate the hpfs mount process. + */ + int hpfs_mount::init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history) + { + this->mount_id = mount_id; + this->fs_dir = fs_dir; + this->mount_dir = mount_dir; + this->rw_dir = rw_dir; + this->is_full_history = is_full_history; + if (start_hpfs_process() == -1) + return -1; + + if (prepare_fs() == -1) + { + util::kill_process(hpfs_pid, true); + return -1; + } + + init_success = true; + return 0; + } + + /** + * Performs cleanup related to hpfs mount execution. + */ + void hpfs_mount::deinit() + { + if (init_success) + { + LOG_DEBUG << "Stopping hpfs process... pid:" << hpfs_pid; + if (hpfs_pid > 0 && util::kill_process(hpfs_pid, true) == 0) + LOG_INFO << "Stopped hpfs process."; + } + } + + /** + * This perform file system preparation tasks. + * @return 0 on success. -1 on failure. + */ + int hpfs_mount::prepare_fs() + { + // This contract mount specific preparation logic will be moved to a seprate child class in the next PBI. + util::h32 initial_state_hash; + util::h32 initial_patch_hash; + + if (acquire_rw_session() == -1 || + conf::populate_patch_config() == -1 || + get_hash(initial_state_hash, RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 || + get_hash(initial_patch_hash, RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 || + release_rw_session() == -1) + { + LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << "."; + return -1; + } + + set_parent_hash(hpfs::STATE_DIR_PATH, initial_state_hash); + set_parent_hash(hpfs::PATCH_FILE_PATH, initial_patch_hash); + LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash; + return 0; + } + + /** + * Starts the hpfs process used for all fs sessions of the mount. + */ + int hpfs_mount::start_hpfs_process() + { + const pid_t pid = fork(); + if (pid > 0) + { + // HotPocket process. + + LOG_DEBUG << "Starting hpfs process at " << mount_dir << "."; + + // Wait until hpfs is initialized properly. + const uint16_t max_retries = PROCESS_INIT_TIMEOUT / INIT_CHECK_INTERVAL; + bool hpfs_initialized = false; + uint16_t retry_count = 0; + do + { + util::sleep(INIT_CHECK_INTERVAL); + + // Check if hpfs process is still running. + // Sending signal 0 to test whether process exist. + if (util::kill_process(pid, false, 0) == -1) + { + LOG_ERROR << "hpfs process " << pid << " has stopped."; + break; + } + + // We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up. + struct stat st; + if (stat(mount_dir.data(), &st) == -1) + { + LOG_ERROR << errno << ": Error in checking hpfs status at mount " << mount_dir << "."; + break; + } + + hpfs_initialized = (st.st_ino == ROOT_INO); + // Keep retrying until root inode no. matches or timeout occurs. + + } while (!hpfs_initialized && ++retry_count <= max_retries); + + // Kill the process if hpfs couldn't be initialized properly. + if (!hpfs_initialized) + { + LOG_ERROR << "Couldn't initialize hpfs process at mount " << mount_dir << "."; + util::kill_process(pid, true); + return -1; + } + + hpfs_pid = pid; + LOG_DEBUG << "hpfs process started. pid:" << hpfs_pid; + } + else if (pid == 0) + { + // hpfs process. + util::fork_detach(); + + const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? TRACE_ARG_DEBUG : TRACE_ARG_ERROR); + + // Fill process args. + char *execv_args[] = { + conf::ctx.hpfs_exe_path.data(), + (char *)"fs", + (char *)fs_dir.data(), + (char *)mount_dir.data(), + // In full history mode, we disable log merge of hpfs. + (char *)(is_full_history ? "merge=false" : "merge=true"), + (char *)active_hpfs_trace_arg, + NULL}; + + const int ret = execv(execv_args[0], execv_args); + std::cerr << errno << ": hpfs process execv failed at mount " << mount_dir << ".\n"; + exit(1); + } + else + { + LOG_ERROR << errno << ": fork() failed when starting hpfs process at mount " << mount_dir << "."; + return -1; + } + + return 0; + } + + /** + * Starts a virtual fs ReadWrite session with hash map enabled. + * If RW session already started, this will simply acquire a consumer reference. + * @return 0 on success. -1 on failure. + */ + int hpfs_mount::acquire_rw_session() + { + std::scoped_lock lock(rw_mutex); + + LOG_DEBUG << "Starting hpfs rw session at " << rw_dir; + + const std::string session_file = mount_dir + RW_SESSION; + + // The sessions creation either should be succesful or should report as already exists (errno=EEXIST). + // Otherwise we consider it as failure. + if (mknod(session_file.c_str(), 0, 0) == -1 && errno != EEXIST) + { + LOG_ERROR << errno << ": Error starting hpfs rw session at " << rw_dir; + return -1; + } + rw_consumers++; + return 0; + } + + /** + * Releases a consumer reference to the RW session. If there are no more references, + * actually stops the running RW session. + * @return 0 on success. -1 on failure. + */ + int hpfs_mount::release_rw_session() + { + std::scoped_lock lock(rw_mutex); + + if (rw_consumers > 0) + rw_consumers--; + + if (rw_consumers == 0) + { + const std::string session_file = mount_dir + RW_SESSION; + if (unlink(session_file.c_str()) == -1) + { + LOG_ERROR << errno << ": Error stopping hpfs rw session at " << rw_dir; + return -1; + } + } + return 0; + } + + /** + * Starts a virtual fs ReadOnly session. + * @return 0 on success. -1 on failure. + */ + int hpfs_mount::start_ro_session(const std::string &name, const bool hmap_enabled) + { + LOG_DEBUG << "Starting hpfs ro session " << name << " hmap:" << hmap_enabled; + + const std::string session_file = mount_dir + (hmap_enabled ? RO_SESSION_HMAP : RO_SESSION) + name; + if (mknod(session_file.c_str(), 0, 0) == -1) + { + LOG_ERROR << errno << ": Error starting hpfs ro session " << name; + return -1; + } + return 0; + } + + /** + * Stops the specified ReadOnly fs session. + * @return 0 on success. -1 on failure. + */ + int hpfs_mount::stop_ro_session(const std::string &name) + { + LOG_DEBUG << "Stopping hpfs ro session " << name; + + const std::string session_file = mount_dir + RO_SESSION + name; + if (unlink(session_file.c_str()) == -1) + { + LOG_ERROR << errno << ": Error stopping hpfs ro session " << name; + return -1; + } + return 0; + } + + /** + * Populates the hash of the specified vpath. + * @return 1 on success. 0 if vpath not found. -1 on error. + */ + int hpfs_mount::get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath) + { + const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_HASH)); + const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + if (fd == -1 && errno == ENOENT) + { + LOG_DEBUG << "Cannot get hash. vpath not found. " << vpath; + return 0; + } + else if (fd == -1) + { + LOG_ERROR << errno << ": Error opening hash file. " << vpath; + return -1; + } + + const int res = read(fd, &hash, sizeof(util::h32)); + close(fd); + if (res == -1) + { + LOG_ERROR << errno << ": Error reading hash file. " << vpath; + return -1; + } + return 1; + } + + /** + * Populates the list of file block hashes for the specified vpath. + * @return 1 on success. 0 if vpath not found. -1 on error. + */ + int hpfs_mount::get_file_block_hashes(std::vector &hashes, std::string_view session_name, std::string_view vpath) + { + const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_CHILDREN)); + const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + if (fd == -1 && errno == ENOENT) + { + LOG_DEBUG << "Cannot get file block hashes. vpath not found. " << vpath; + return 0; + } + else if (fd == -1) + { + LOG_DEBUG << errno << ": Error opening hashmap children. " << vpath; + return -1; + } + + struct stat st; + if (fstat(fd, &st) == -1) + { + close(fd); + LOG_ERROR << errno << ": Error reading block hashes length. " << vpath; + return -1; + } + + const int children_count = st.st_size / sizeof(util::h32); + hashes.resize(children_count); + + const int res = read(fd, hashes.data(), st.st_size); + close(fd); + if (res == -1) + { + LOG_ERROR << errno << ": Error reading block hashes. " << vpath; + return -1; + } + return 1; + } + + /** + * Populates the list of dir entry hashes for the specified vpath. + * @return 1 on success. 0 if vpath not found. -1 on error. + */ + int hpfs_mount::get_dir_children_hashes(std::vector &hash_nodes, std::string_view session_name, std::string_view dir_vpath) + { + const std::string path = physical_path(session_name, std::string(dir_vpath).append(HMAP_CHILDREN)); + const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); + if (fd == -1 && errno == ENOENT) + { + LOG_DEBUG << "Cannot get dir children hashes. Dir vpath not found. " << dir_vpath; + return 0; + } + else if (fd == -1) + { + LOG_ERROR << errno << ": Error opening dir hash children nodes. " << dir_vpath; + return -1; + } + + struct stat st; + if (fstat(fd, &st) == -1) + { + close(fd); + LOG_ERROR << errno << ": Error reading hash children nodes length. " << dir_vpath; + return -1; + } + + const int children_count = st.st_size / sizeof(child_hash_node); + hash_nodes.resize(children_count); + + const int res = read(fd, hash_nodes.data(), st.st_size); + close(fd); + if (res == -1) + { + LOG_ERROR << errno << ": Error reading hash children nodes. " << dir_vpath; + return -1; + } + return 1; + } + + const std::string hpfs_mount::physical_path(std::string_view session_name, std::string_view vpath) + { + return mount_dir + "/" + session_name.data() + vpath.data(); + } + + /** + * This returns the hash of a given parent. + * @param parent_vpath vpath of the parent file or directory. + * @return Returns the hash of the given vpath if available or + * an empth h32 hash if parent vpath not available. + */ + const util::h32 hpfs_mount::get_parent_hash(const std::string &parent_vpath) + { + std::shared_lock lock(parent_hashes_mutex); + const auto itr = parent_hashes.find(parent_vpath); + if (itr == parent_hashes.end()) + { + return util::h32_empty; // Looking parent_vpath is not found. + } + return itr->second; + } + + /** + * This set the hash of a given parent. + * @param parent_vpath vpath of the parent file or directory. + * @param new_state Hash of the parent. + */ + void hpfs_mount::set_parent_hash(const std::string &parent_vpath, const util::h32 new_state) + { + std::unique_lock lock(parent_hashes_mutex); + const auto itr = parent_hashes.find(parent_vpath); + if (itr == parent_hashes.end()) + { + parent_hashes.try_emplace(parent_vpath, new_state); + } + else + { + itr->second = new_state; + } + } + +} // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_mount.hpp b/src/hpfs/hpfs_mount.hpp new file mode 100644 index 00000000..51b43305 --- /dev/null +++ b/src/hpfs/hpfs_mount.hpp @@ -0,0 +1,75 @@ +#ifndef _HP_HPFS_HPFS_MOUNT_ +#define _HP_HPFS_HPFS_MOUNT_ + +#include "../pchheader.hpp" +#include "../util/h32.hpp" +#include "../conf.hpp" + +namespace hpfs +{ + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. + constexpr const char *STATE_DIR_PATH = "/state"; // State directory name. + constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename. + + struct child_hash_node + { + bool is_file = false; + char name[256]; + util::h32 hash; + + child_hash_node() + { + memset(name, 0, sizeof(name)); + } + }; + + inline uint16_t get_request_resubmit_timeout() + { + return conf::cfg.contract.roundtime; + } + + /** + * This class represents a hpfs file system mount. + */ + class hpfs_mount + { + private: + pid_t hpfs_pid = 0; + std::string fs_dir; + std::string mount_dir; + bool is_full_history; + bool init_success = false; + // Keeps the hashes of hpfs parents against its vpath. + std::unordered_map parent_hashes; + std::shared_mutex parent_hashes_mutex; + // No. of consumers for RW session. + // We use this as a reference counting mechanism to cleanup RW session when no one requires it. + uint32_t rw_consumers = 0; + std::mutex rw_mutex; + + protected: + virtual int prepare_fs(); + + public: + int32_t mount_id; // Used in hpfs serving and syncing. + std::string rw_dir; + int init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history); + void deinit(); + + int start_hpfs_process(); + int acquire_rw_session(); + int release_rw_session(); + int start_ro_session(const std::string &name, const bool hmap_enabled); + int stop_ro_session(const std::string &name); + int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath); + int get_file_block_hashes(std::vector &hashes, std::string_view session_name, std::string_view vpath); + int get_dir_children_hashes(std::vector &hash_nodes, std::string_view session_name, std::string_view dir_vpath); + const std::string physical_path(std::string_view session_name, std::string_view vpath); + const util::h32 get_parent_hash(const std::string &parent_vpath); + void set_parent_hash(const std::string &parent_vpath, const util::h32 new_state); + }; + +} // namespace hpfs + +#endif \ No newline at end of file diff --git a/src/hpfs/hpfs_serve.cpp b/src/hpfs/hpfs_serve.cpp index 3f337221..30d84686 100644 --- a/src/hpfs/hpfs_serve.cpp +++ b/src/hpfs/hpfs_serve.cpp @@ -13,28 +13,36 @@ namespace p2pmsg = msg::fbuf::p2pmsg; /** - * Helper functions for serving hpfs requests from other peers. + * Class for serving hpfs requests from other peers. */ -namespace hpfs_serve +namespace hpfs { constexpr uint16_t LOOP_WAIT = 20; // Milliseconds constexpr const char *HPFS_SESSION_NAME = "rw"; - uint16_t REQUEST_BATCH_TIMEOUT; - - bool is_shutting_down = false; - bool init_success = false; - std::thread hpfs_serve_thread; - - int init() + /** + * @param name The name of the serving instance. (For identification purpose) + * @param fs_mount The pointer to the relavent hpfs mount instance this server is serving. + * @return This returns -1 on error and 0 on success. + */ + int hpfs_serve::init(std::string_view name, hpfs::hpfs_mount *fs_mount) { + if (fs_mount == NULL) + return -1; + + this->name = name; + this->fs_mount = fs_mount; + REQUEST_BATCH_TIMEOUT = hpfs::get_request_resubmit_timeout() * 0.9; - hpfs_serve_thread = std::thread(hpfs_serve_loop); + hpfs_serve_thread = std::thread(&hpfs_serve::hpfs_serve_loop, this); init_success = true; return 0; } - void deinit() + /** + * Perform cleanup activities. + */ + void hpfs_serve::deinit() { if (init_success) { @@ -43,13 +51,13 @@ namespace hpfs_serve } } - void hpfs_serve_loop() + void hpfs_serve::hpfs_serve_loop() { util::mask_signal(); - LOG_INFO << "Hpfs server started."; + LOG_INFO << "Hpfs " << name << " server started."; - std::list> hpfs_requests; + std::list> hpfs_requests; // Indicates whether any requests were processed in the previous loop iteration. bool prev_requests_processed = false; @@ -60,13 +68,7 @@ namespace hpfs_serve if (!prev_requests_processed) util::sleep(LOOP_WAIT); - { - std::scoped_lock lock(p2p::ctx.collected_msgs.hpfs_requests_mutex); - - // Move collected hpfs requests over to local requests list. - if (!p2p::ctx.collected_msgs.hpfs_requests.empty()) - hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.hpfs_requests); - } + swap_collected_requests(hpfs_requests); prev_requests_processed = !hpfs_requests.empty(); const uint64_t time_start = util::get_epoch_milliseconds(); @@ -75,9 +77,9 @@ namespace hpfs_serve if (hpfs_requests.empty()) continue; - if (hpfs::acquire_rw_session() != -1) + if (fs_mount->acquire_rw_session() != -1) { - for (auto &[session_id, request] : hpfs_requests) + for (auto &[session_id, hr] : hpfs_requests) { if (is_shutting_down) break; @@ -87,19 +89,15 @@ namespace hpfs_serve const uint64_t time_now = util::get_epoch_milliseconds(); if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT) { - LOG_DEBUG << "Hpfs serve batch timeout. Abandonding hpfs requests."; + LOG_DEBUG << "Hpfs " << name << " serve batch timeout. Abandonding hpfs requests."; break; } // Session id is in binary format. Converting to hex before printing. LOG_DEBUG << "Serving hpfs request from [" << util::to_hex(session_id).substr(2, 10) << "]"; - - const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data()); - - const p2p::hpfs_request sr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message()); flatbuffers::FlatBufferBuilder fbuf(1024); - if (hpfs_serve::create_hpfs_response(fbuf, sr, lcl) == 1) + if (hpfs_serve::create_hpfs_response(fbuf, hr, lcl) == 1) { // Find the peer that we should send the hpfs response to. std::scoped_lock lock(p2p::ctx.peer_connections_mutex); @@ -116,13 +114,12 @@ namespace hpfs_serve } } - hpfs::release_rw_session(); + fs_mount->release_rw_session(); } hpfs_requests.clear(); } - - LOG_INFO << "Hpfs server stopped."; + LOG_INFO << "Hpfs " << name << " server stopped."; } /** @@ -132,7 +129,7 @@ namespace hpfs_serve * @return 1 if successful hpfs response was generated. 0 if request is invalid * and no response was generated. -1 on error. */ - int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr, std::string_view lcl) + int hpfs_serve::create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &hr, std::string_view lcl) { LOG_DEBUG << "Serving hpfs req. path:" << hr.parent_path << " block_id:" << hr.block_id; @@ -157,7 +154,7 @@ namespace hpfs_serve resp.hash = hr.expected_hash; resp.data = std::string_view(reinterpret_cast(block.data()), block.size()); - msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, lcl); + msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, fs_mount->mount_id, lcl); return 1; // Success. } } @@ -178,7 +175,7 @@ namespace hpfs_serve else if (result == 1) { msg::fbuf::p2pmsg::create_msg_from_filehashmap_response( - fbuf, hr.parent_path, block_hashes, + fbuf, hr.parent_path, fs_mount->mount_id, block_hashes, file_length, hr.expected_hash, lcl); return 1; // Success. } @@ -198,7 +195,7 @@ namespace hpfs_serve else if (result == 1) { msg::fbuf::p2pmsg::create_msg_from_fsentry_response( - fbuf, hr.parent_path, child_hash_nodes, hr.expected_hash, lcl); + fbuf, hr.parent_path, fs_mount->mount_id, child_hash_nodes, hr.expected_hash, lcl); return 1; // Success. } } @@ -212,12 +209,12 @@ namespace hpfs_serve * Retrieves the specified data block from a hpfs file if expected hash matches. * @return 1 if block data was succefully fetched. 0 if vpath or block does not exist. -1 on error. */ - int get_data_block(std::vector &block, const std::string_view vpath, - const uint32_t block_id, const util::h32 expected_hash) + int hpfs_serve::get_data_block(std::vector &block, const std::string_view vpath, + const uint32_t block_id, const util::h32 expected_hash) { // Check whether the existing block hash matches expected hash. std::vector block_hashes; - int result = hpfs::get_file_block_hashes(block_hashes, HPFS_SESSION_NAME, vpath); + int result = fs_mount->get_file_block_hashes(block_hashes, HPFS_SESSION_NAME, vpath); if (result == 1) { if (block_id >= block_hashes.size()) @@ -233,7 +230,7 @@ namespace hpfs_serve else // Get actual block data. { struct stat st; - const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data(); + const std::string file_path = fs_mount->rw_dir + vpath.data(); const off_t block_offset = block_id * hpfs::BLOCK_SIZE; const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1) @@ -289,12 +286,12 @@ namespace hpfs_serve * Retrieves the specified file block hashes if expected hash matches. * @return 1 if block hashes were successfuly fetched. 0 if vpath does not exist. -1 on error. */ - int get_data_block_hashes(std::vector &hashes, size_t &file_length, - const std::string_view vpath, const util::h32 expected_hash) + int hpfs_serve::get_data_block_hashes(std::vector &hashes, size_t &file_length, + const std::string_view vpath, const util::h32 expected_hash) { // Check whether the existing file hash matches expected hash. util::h32 file_hash = util::h32_empty; - int result = hpfs::get_hash(file_hash, HPFS_SESSION_NAME, vpath); + int result = fs_mount->get_hash(file_hash, HPFS_SESSION_NAME, vpath); if (result == 1) { if (file_hash != expected_hash) @@ -303,14 +300,14 @@ namespace hpfs_serve result = 0; } // Get the block hashes. - else if (hpfs::get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0) + else if (fs_mount->get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0) { result = -1; } else { // Get actual file length. - const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data(); + const std::string file_path = fs_mount->rw_dir + vpath.data(); struct stat st; if (stat(file_path.c_str(), &st) == -1) { @@ -329,12 +326,12 @@ namespace hpfs_serve * Retrieves the specified dir entry hashes if expected fir hash matches. * @return 1 if fs entry hashes were successfuly fetched. 0 if vpath does not exist. -1 on error. */ - int get_fs_entry_hashes(std::vector &hash_nodes, - const std::string_view vpath, const util::h32 expected_hash) + int hpfs_serve::get_fs_entry_hashes(std::vector &hash_nodes, + const std::string_view vpath, const util::h32 expected_hash) { // Check whether the existing dir hash matches expected hash. util::h32 dir_hash = util::h32_empty; - int result = hpfs::get_hash(dir_hash, HPFS_SESSION_NAME, vpath); + int result = fs_mount->get_hash(dir_hash, HPFS_SESSION_NAME, vpath); if (result == 1) { if (dir_hash != expected_hash) @@ -343,7 +340,7 @@ namespace hpfs_serve result = 0; } // Get the children hash nodes. - else if (hpfs::get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0) + else if (fs_mount->get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0) { result = -1; } @@ -355,4 +352,16 @@ namespace hpfs_serve return result; } -} // namespace hpfs_serve \ No newline at end of file + + /** + * Move the collected requests from hpfs requests to the local hpfs request list. + */ + void hpfs_serve::swap_collected_requests(std::list> &hpfs_requests) + { + std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_requests_mutex); + + // Move collected hpfs requests for contract fs over to local requests list. + if (!p2p::ctx.collected_msgs.contract_hpfs_requests.empty()) + hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.contract_hpfs_requests); + } +} // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_serve.hpp b/src/hpfs/hpfs_serve.hpp index 6426ac8a..367d2ccd 100644 --- a/src/hpfs/hpfs_serve.hpp +++ b/src/hpfs/hpfs_serve.hpp @@ -2,28 +2,43 @@ #define _HP_HPFS_HPFS_SERVE_ #include "../util/h32.hpp" -#include "hpfs.hpp" +#include "hpfs_mount.hpp" #include "../p2p/p2p.hpp" #include "../msg/fbuf/p2pmsg_content_generated.h" -namespace hpfs_serve +namespace hpfs { - int init(); + class hpfs_serve + { + private: + uint16_t REQUEST_BATCH_TIMEOUT; - void deinit(); - - void hpfs_serve_loop(); + bool is_shutting_down = false; + bool init_success = false; + std::thread hpfs_serve_thread; + hpfs::hpfs_mount *fs_mount = NULL; + std::string_view name; + void hpfs_serve_loop(); - int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &sr, std::string_view lcl); + protected: + virtual void swap_collected_requests(std::list> &hpfs_requests); // Must override in child classes. - int get_data_block(std::vector &vec, const std::string_view vpath, - const uint32_t block_id, const util::h32 expected_hash); + public: + int init(std::string_view name, hpfs::hpfs_mount *fs_mount); - int get_data_block_hashes(std::vector &hashes, size_t &file_length, - const std::string_view vpath, const util::h32 expected_hash); + void deinit(); - int get_fs_entry_hashes(std::vector &hash_nodes, + int create_hpfs_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::hpfs_request &sr, std::string_view lcl); + + int get_data_block(std::vector &block, const std::string_view vpath, + const uint32_t block_id, const util::h32 expected_hash); + + int get_data_block_hashes(std::vector &hashes, size_t &file_length, + const std::string_view vpath, const util::h32 expected_hash); + + int get_fs_entry_hashes(std::vector &hash_nodes, const std::string_view vpath, const util::h32 expected_hash); -} // namespace hpfs_sync + }; +} // namespace hpfs #endif \ No newline at end of file diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index b64e14bf..b192111d 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -6,13 +6,12 @@ #include "../ledger.hpp" #include "../hplog.hpp" #include "../util/util.hpp" -#include "../hpfs/hpfs.hpp" #include "../util/h32.hpp" #include "hpfs_sync.hpp" #include "../sc.hpp" #include "../unl.hpp" -namespace hpfs_sync +namespace hpfs { // Idle loop sleep time (milliseconds). constexpr uint16_t IDLE_WAIT = 40; @@ -28,25 +27,26 @@ namespace hpfs_sync constexpr int FILE_PERMS = 0644; - // No. of milliseconds to wait before resubmitting a request. - uint16_t REQUEST_RESUBMIT_TIMEOUT; - sync_context ctx; - bool init_success = false; - - int init() + /** + * This should be called to activate the hpfs sync. + */ + int hpfs_sync::init(std::string_view name, hpfs::hpfs_mount *fs_mount) { + if (fs_mount == NULL) + return -1; + + this->name = name; + this->fs_mount = fs_mount; REQUEST_RESUBMIT_TIMEOUT = hpfs::get_request_resubmit_timeout(); - ctx.target_state_hash = util::h32_empty; - ctx.target_patch_hash = util::h32_empty; - ctx.current_parent_target_hash = util::h32_empty; - // Patch file sync has the highest priority. - ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::PATCH; - ctx.hpfs_sync_thread = std::thread(hpfs_syncer_loop); + ctx.hpfs_sync_thread = std::thread(&hpfs_sync::hpfs_syncer_loop, this); init_success = true; return 0; } - void deinit() + /** + * Perform relavent cleaning. + */ + void hpfs_sync::deinit() { if (init_success) { @@ -57,41 +57,35 @@ namespace hpfs_sync } /** - * Sets a new target states for the syncing process. - * @param target_state_hash The target hpfs state which we should sync towards. - * @param target_patch_hash The target hpfs patch state which we should sync towards. + * Sets a list of sync targets. Sync finishes when all the targets are synced. + * Syncing happens sequentially. + * @param target_list List of sync targets to sync towards. */ - void set_target(const util::h32 target_state_hash, const util::h32 target_patch_hash) + void hpfs_sync::set_target(const std::queue &target_list) { - std::unique_lock lock(ctx.target_state_mutex); - - // Do not do anything if we are already syncing towards the specified target states. - if (ctx.is_shutting_down || (ctx.is_syncing && ctx.target_state_hash == target_state_hash && ctx.target_patch_hash == target_patch_hash)) + if (target_list.empty()) return; - ctx.target_state_hash = target_state_hash; - ctx.target_patch_hash = target_patch_hash; - if (hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH) != target_patch_hash) - { - ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::PATCH; - ctx.current_parent_target_hash = ctx.target_patch_hash; - } - else - { - ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::STATE; - ctx.current_parent_target_hash = ctx.target_state_hash; - } + // Do not do anything if we are already syncing towards the specified target states. + if (ctx.is_shutting_down || (ctx.is_syncing && ctx.original_target_list == target_list)) + return; + + ctx.original_target_list = target_list; + ctx.target_list = std::move(target_list); + + std::unique_lock lock(ctx.current_target_mutex); + ctx.current_target = ctx.target_list.front(); // Make the first element of the list the first target to sync. ctx.is_syncing = true; } /** * Runs the hpfs sync worker loop. */ - void hpfs_syncer_loop() + void hpfs_sync::hpfs_syncer_loop() { util::mask_signal(); - LOG_INFO << "hpfs sync: Worker started."; + LOG_INFO << "hpfs " << name << " sync: Worker started."; while (!ctx.is_shutting_down) { @@ -102,115 +96,81 @@ namespace hpfs_sync if (!ctx.is_syncing) continue; - if (hpfs::acquire_rw_session() != -1) + if (fs_mount->acquire_rw_session() != -1) { while (!ctx.is_shutting_down) { { - std::shared_lock lock(ctx.target_state_mutex); - if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH) - LOG_INFO << "hpfs sync: Starting sync for target patch hash: " << ctx.target_patch_hash; - else - LOG_INFO << "hpfs sync: Starting sync for target state hash: " << ctx.target_state_hash; + std::shared_lock lock(ctx.current_target_mutex); + LOG_INFO << "hpfs " << name << " sync: Starting sync for target " << ctx.current_target.name << " hash: " << ctx.current_target.hash; } util::h32 new_state = util::h32_empty; - const int result = request_loop(ctx.current_parent_target_hash, new_state); + const int result = request_loop(ctx.current_target.hash, new_state); ctx.pending_requests.clear(); ctx.candidate_hpfs_responses.clear(); ctx.submitted_requests.clear(); - if (result == -1 || ctx.is_shutting_down) + if (result == -1 || result == 1 || ctx.is_shutting_down) break; { - std::shared_lock lock(ctx.target_state_mutex); + std::shared_lock lock(ctx.current_target_mutex); - if (new_state == ctx.current_parent_target_hash) + if (new_state == ctx.current_target.hash) { - if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH) - { - ctx.target_patch_hash = util::h32_empty; - LOG_INFO << "hpfs sync: Target patch state achieved: " << new_state; + LOG_INFO << "hpfs " << name << " sync: Target " << ctx.current_target.name << " hash achieved: " << new_state; + on_current_sync_state_acheived(); - // Appling new patch file changes to hpcore runtime. - if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1) - { - LOG_ERROR << "Appling patch file changes after sync failed"; - } - else - { - unl::update_unl_changes_from_patch(); - - // Update global hash tracker with the new patch file hash. - util::h32 updated_patch_hash; - hpfs::get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); - hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, updated_patch_hash); - } - - if (ctx.target_state_hash == hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::STATE)) - break; - - ctx.current_parent_target_hash = ctx.target_state_hash; - ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::STATE; - continue; - } - else if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::STATE) - { - ctx.target_state_hash = util::h32_empty; - LOG_INFO << "hpfs sync: Target state achieved: " << new_state; + // Start syncing to next target. + const int result = start_syncing_next_target(); + if (result == 0) break; - } + else if (result == 1) + continue; } else { - LOG_INFO << "hpfs sync: Continuing sync for new target: " << ctx.current_parent_target_hash; + LOG_INFO << "hpfs " << name << " sync: Continuing sync for new target: " << ctx.current_target.hash; continue; } } } - LOG_INFO << "hpfs sync: All parents synced."; - hpfs::release_rw_session(); + LOG_INFO << "hpfs " << name << " sync: All parents synced."; + fs_mount->release_rw_session(); } else { - LOG_ERROR << "hpfs sync: Failed to start hpfs rw session"; + LOG_ERROR << "hpfs " << name << " sync: Failed to start hpfs rw session"; } - - std::unique_lock lock(ctx.target_state_mutex); - ctx.current_parent_target_hash = util::h32_empty; + // Clear target list and original target list since the sync is complete. + ctx.target_list = {}; + ctx.original_target_list = {}; ctx.is_syncing = false; } - LOG_INFO << "hpfs sync: Worker stopped."; + LOG_INFO << "hpfs " << name << " sync: Worker stopped."; } - int request_loop(const util::h32 current_target, util::h32 &updated_state) + /** + * Reqest loop. + * @return -1 on error. 0 when current sync state acheived or sync is stopped due to target change. + * Returns 1 on successfully finishing all the sync targets. + */ + int hpfs_sync::request_loop(const util::h32 current_target, util::h32 &updated_state) { - std::string target_parent_vpath; - BACKLOG_ITEM_TYPE target_parent_backlog_item_type; - if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::STATE) - { - target_parent_vpath = hpfs::STATE_DIR_PATH; - target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::DIR; - } - else if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH) - { - target_parent_vpath = hpfs::PATCH_FILE_PATH; - target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::FILE; - } std::string lcl = ledger::ctx.get_lcl(); + // Send the initial root hpfs request of the current target. + submit_request(backlog_item{ctx.current_target.item_type, ctx.current_target.vpath, -1, current_target}, lcl); + // Indicates whether any responses were processed in the previous loop iteration. bool prev_responses_processed = false; // No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response) uint16_t resubmissions_count = 0; - // Send the initial root hpfs request. - submit_request(backlog_item{target_parent_backlog_item_type, target_parent_vpath, -1, current_target}, lcl); - while (!should_stop_request_loop(current_target)) { // Wait a small delay if there were no responses processed during previous iteration. @@ -220,13 +180,8 @@ namespace hpfs_sync // Get current lcl. std::string lcl = ledger::ctx.get_lcl(); - { - std::scoped_lock lock(p2p::ctx.collected_msgs.hpfs_responses_mutex); - - // Move collected hpfs responses over to local candidate responses list. - if (!p2p::ctx.collected_msgs.hpfs_responses.empty()) - ctx.candidate_hpfs_responses.splice(ctx.candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.hpfs_responses); - } + // Move the received hpfs responses to the local response list. + swap_collected_responses(); prev_responses_processed = !ctx.candidate_hpfs_responses.empty(); @@ -239,7 +194,7 @@ namespace hpfs_sync if (should_stop_request_loop(current_target)) return 0; - LOG_DEBUG << "hpfs sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]"; + LOG_DEBUG << "hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]"; const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data()); const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message(); @@ -252,7 +207,7 @@ namespace hpfs_sync const auto pending_resp_itr = ctx.submitted_requests.find(key); if (pending_resp_itr == ctx.submitted_requests.end()) { - LOG_DEBUG << "hpfs sync: Skipping hpfs response due to hash mismatch."; + LOG_DEBUG << "hpfs " << name << " sync: Skipping hpfs response due to hash mismatch."; continue; } @@ -270,7 +225,7 @@ namespace hpfs_sync // Validate received fs data against the hash. if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map)) { - LOG_INFO << "hpfs sync: Skipping hpfs response due to fs entry hash mismatch."; + LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch."; continue; } @@ -287,7 +242,7 @@ namespace hpfs_sync // Validate received hashmap against the hash. if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count)) { - LOG_INFO << "hpfs sync: Skipping hpfs response due to file hashmap hash mismatch."; + LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch."; continue; } @@ -304,7 +259,7 @@ namespace hpfs_sync // Validate received block data against the hash. if (!validate_file_block_hash(hash, block_id, buf)) { - LOG_INFO << "hpfs sync: Skipping hpfs response due to file block hash mismatch."; + LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch."; continue; } @@ -316,16 +271,16 @@ namespace hpfs_sync // After handling each response, check whether we have reached target hpfs state. // get_hash returns 0 incase target parent is not existing in our side. - if (hpfs::get_hash(updated_state, hpfs::RW_SESSION_NAME, target_parent_vpath) == -1) + if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, ctx.current_target.vpath) == -1) { - LOG_ERROR << "hpfs sync: exiting due to hash check error."; + LOG_ERROR << "hpfs " << name << " sync: exiting due to hash check error."; return -1; } // Update the central hpfs state tracker. - hpfs::ctx.set_hash(ctx.current_syncing_parent, updated_state); + fs_mount->set_parent_hash(ctx.current_target.vpath, updated_state); - LOG_DEBUG << "hpfs sync: current:" << updated_state << " | target:" << current_target; + LOG_DEBUG << "hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target; if (updated_state == current_target) return 0; } @@ -347,13 +302,18 @@ namespace hpfs_sync { if (++resubmissions_count > ABANDON_THRESHOLD) { - LOG_INFO << "hpfs sync: Resubmission threshold exceeded. Abandoning sync."; - return -1; + LOG_INFO << "hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync."; + + std::shared_lock lock(ctx.current_target_mutex); + const int result = start_syncing_next_target(); + if (result == 0) + return 1; // To stop syncing since we have sync all the targets. + return 0; } // Reset the counter and re-submit request. request.waiting_time = 0; - LOG_DEBUG << "hpfs sync: Resubmitting request..."; + LOG_DEBUG << "hpfs " << name << " sync: Resubmitting request..."; submit_request(request, lcl); } } @@ -373,7 +333,6 @@ namespace hpfs_sync } } } - return 0; } @@ -384,7 +343,7 @@ namespace hpfs_sync * @param fs_entry_map Received fs entry map. * @returns true if hash is valid, otherwise false. */ - bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map &fs_entry_map) + bool hpfs_sync::validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map &fs_entry_map) { util::h32 content_hash; @@ -410,7 +369,7 @@ namespace hpfs_sync * @param hash_count Size of the hash list. * @returns true if hash is valid, otherwise false. */ - bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count) + bool hpfs_sync::validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count) { util::h32 content_hash = util::h32_empty; @@ -435,7 +394,7 @@ namespace hpfs_sync * @param buf Block buffer. * @returns true if hash is valid, otherwise false. */ - bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf) + bool hpfs_sync::validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf) { // Calculate block offset of this block. const off_t block_offset = block_id * hpfs::BLOCK_SIZE; @@ -446,14 +405,14 @@ namespace hpfs_sync /** * Indicates whether to break out of hpfs request processing loop. */ - bool should_stop_request_loop(const util::h32 current_target) + bool hpfs_sync::should_stop_request_loop(const util::h32 ¤t_target) { if (ctx.is_shutting_down) return true; // Stop request loop if the target has changed. - std::shared_lock lock(ctx.target_state_mutex); - return current_target != ctx.current_parent_target_hash; + std::shared_lock lock(ctx.current_target_mutex); + return current_target != ctx.current_target.hash; } /** @@ -464,24 +423,25 @@ namespace hpfs_sync * @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different. * @param target_pubkey The peer pubkey the request was submitted to. */ - void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, - const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey) + void hpfs_sync::request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, + const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey) { p2p::hpfs_request hr; hr.parent_path = path; hr.is_file = is_file; hr.block_id = block_id; hr.expected_hash = expected_hash; + hr.mount_id = fs_mount->mount_id; flatbuffers::FlatBufferBuilder fbuf(1024); - msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, hr, lcl); + msg::fbuf::p2pmsg::create_msg_from_hpfs_request(fbuf, hr, lcl); p2p::send_message_to_random_peer(fbuf, target_pubkey); //todo: send to a node that hold the majority hpfs state to improve reliability of retrieving hpfs state. } /** * Submits a pending hpfs request to the peer. */ - void submit_request(const backlog_item &request, std::string_view lcl) + void hpfs_sync::submit_request(const backlog_item &request, std::string_view lcl) { const std::string key = std::string(request.path) .append(reinterpret_cast(&request.expected_hash), sizeof(util::h32)); @@ -492,7 +452,7 @@ namespace hpfs_sync request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey); if (!target_pubkey.empty()) - LOG_DEBUG << "hpfs sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type + LOG_DEBUG << "hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type << " path:" << request.path << " block_id:" << request.block_id << " hash:" << request.expected_hash; } @@ -503,19 +463,19 @@ namespace hpfs_sync * @param fs_entry_map Received fs entry map. * @returns 0 on success, otherwise -1. */ - int handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map) + int hpfs_sync::handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map) { // Get the parent path of the fs entries we have received. - LOG_DEBUG << "hpfs sync: Processing fs entries response for " << vpath; + LOG_DEBUG << "hpfs " << name << " sync: Processing fs entries response for " << vpath; // Create physical directory on our side if not exist. - std::string parent_physical_path = conf::ctx.hpfs_rw_dir + vpath.data(); + std::string parent_physical_path = fs_mount->rw_dir + vpath.data(); if (util::create_dir_tree_recursive(parent_physical_path) == -1) return -1; // Get the children hash entries and compare with what we got from peer. std::vector existing_fs_entries; - if (hpfs::get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1) + if (fs_mount->get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1) return -1; // Request more info on fs entries that exist on both sides but are different. @@ -544,13 +504,13 @@ namespace hpfs_sync else { // If there was an entry that does not exist on other side, delete it. - std::string child_physical_path = conf::ctx.hpfs_rw_dir + child_vpath.data(); + std::string child_physical_path = fs_mount->rw_dir + child_vpath.data(); if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) || !ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1) return -1; - LOG_DEBUG << "hpfs sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; + LOG_DEBUG << "hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; } } @@ -580,14 +540,14 @@ namespace hpfs_sync * @param file_length Size of the file. * @returns 0 on success, otherwise -1. */ - int handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length) + int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length) { // Get the file path of the block hashes we have received. - LOG_DEBUG << "hpfs sync: Processing file block hashes response for " << vpath; + LOG_DEBUG << "hpfs " << name << " sync: Processing file block hashes response for " << vpath; // File block hashes on our side (file might not exist on our side). std::vector existing_hashes; - if (hpfs::get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT) + if (fs_mount->get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT) return -1; const size_t existing_hash_count = existing_hashes.size(); @@ -604,7 +564,7 @@ namespace hpfs_sync if (existing_hashes.size() >= hash_count) { // If peer file might be smaller, truncate our file to match with peer file. - std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data(); + std::string file_physical_path = fs_mount->rw_dir + vpath.data(); if (truncate(file_physical_path.c_str(), file_length) == -1) return -1; } @@ -619,13 +579,13 @@ namespace hpfs_sync * @param buf Block buffer. * @returns 0 on success, otherwise -1. */ - int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf) + int hpfs_sync::handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf) { - LOG_DEBUG << "hpfs sync: Writing block_id " << block_id + LOG_DEBUG << "hpfs " << name << " sync: Writing block_id " << block_id << " (len:" << buf.length() << ") of " << vpath; - std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data(); + std::string file_physical_path = fs_mount->rw_dir + vpath.data(); const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS); if (fd == -1) { @@ -645,4 +605,61 @@ namespace hpfs_sync return 0; } -} // namespace hpfs_sync \ No newline at end of file + /** + * This method can be used to invoke mount specific custom logic (after extending this super class) to be executed after + * a sync target is acheived. + */ + void hpfs_sync::on_current_sync_state_acheived() + { + if (ctx.current_target.vpath == hpfs::PATCH_FILE_PATH) + { + // Appling new patch file changes to hpcore runtime. + if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1) + { + LOG_ERROR << "Appling patch file changes after sync failed"; + } + else + { + unl::update_unl_changes_from_patch(); + + // Update global hash tracker with the new patch file hash. + util::h32 updated_patch_hash; + fs_mount->get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); + fs_mount->set_parent_hash(ctx.current_target.vpath, updated_patch_hash); + } + } + } + + /** + * Starts syncing next target if available after current target finishes. + * @return returns 0 when the full sync is complete and 1 when more sync targets are available. + */ + int hpfs_sync::start_syncing_next_target() + { + ctx.target_list.pop(); // Remove the synced parent from the target list. + if (ctx.target_list.empty()) + { + ctx.current_target = {}; + return 0; + } + else + { + ctx.current_target = ctx.target_list.front(); + return 1; + } + } + + /** + * Move the collected responses from hpfs responses to a local response list. + */ + void hpfs_sync::swap_collected_responses() + { + // This logic will be added to a child class in next PBI. + std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_responses_mutex); + + // Move collected hpfs responses over to local candidate responses list. + if (!p2p::ctx.collected_msgs.contract_hpfs_responses.empty()) + ctx.candidate_hpfs_responses.splice(ctx.candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.contract_hpfs_responses); + } + +} // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index b1b1930d..a9af42ac 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -7,7 +7,7 @@ #include "../util/h32.hpp" #include "../crypto.hpp" -namespace hpfs_sync +namespace hpfs { enum BACKLOG_ITEM_TYPE @@ -30,14 +30,27 @@ namespace hpfs_sync uint16_t waiting_time = 0; }; + struct sync_target + { + std::string name; // Used for logging. + util::h32 hash = util::h32_empty; + std::string vpath; + BACKLOG_ITEM_TYPE item_type = BACKLOG_ITEM_TYPE::DIR; + + bool operator==(const sync_target &target) const + { + return this->hash == target.hash; + } + }; + struct sync_context { // The current target hashes we are syncing towards. - util::h32 target_state_hash; - util::h32 target_patch_hash; - util::h32 current_parent_target_hash; - - hpfs::HPFS_PARENT_COMPONENTS current_syncing_parent; + std::queue target_list; + // Store the originally submitted sync target list. This list is used to avoid submitting same list multiple times + // because target list is updated when the sync targets are acheived. + std::queue original_target_list; + sync_target current_target = {}; // List of sender pubkeys and hpfs responses(flatbuffer messages) to be processed. std::list> candidate_hpfs_responses; @@ -49,42 +62,58 @@ namespace hpfs_sync std::unordered_map submitted_requests; std::thread hpfs_sync_thread; - std::shared_mutex target_state_mutex; + std::shared_mutex current_target_mutex; std::atomic is_syncing = false; std::atomic is_shutting_down = false; }; - extern sync_context ctx; + class hpfs_sync + { + private: + bool init_success = false; + uint16_t REQUEST_RESUBMIT_TIMEOUT; // No. of milliseconds to wait before resubmitting a request. + hpfs::hpfs_mount *fs_mount = NULL; + std::string name; - int init(); + void hpfs_syncer_loop(); - void deinit(); + int request_loop(const util::h32 current_target, util::h32 &updated_state); - void set_target(const util::h32 target_state_hash, const util::h32 target_patch_hash); + int start_syncing_next_target(); - void hpfs_syncer_loop(); + protected: + virtual void on_current_sync_state_acheived(); + virtual void swap_collected_responses(); // Must override in child classes. - int request_loop(const util::h32 current_target, util::h32 &updated_state); + public: + sync_context ctx; - bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map &fs_entry_map); + int init(std::string_view name, hpfs::hpfs_mount *fs_mount); - bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count); + void deinit(); - bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf); + void set_target(const std::queue &target_list); - bool should_stop_request_loop(const util::h32 current_target); + bool validate_fs_entry_hash(std::string_view vpath, std::string_view hash, const std::unordered_map &fs_entry_map); - void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, - const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey); + bool validate_file_hashmap_hash(std::string_view vpath, std::string_view hash, const util::h32 *hashes, const size_t hash_count); - void submit_request(const backlog_item &request, std::string_view lcl); + bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf); - int handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map); + bool should_stop_request_loop(const util::h32 ¤t_target); - int handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length); + void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, + const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey); - int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf); + void submit_request(const backlog_item &request, std::string_view lcl); -} // namespace hpfs_sync + int handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map); + + int handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length); + + int handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf); + }; + +} // namespace hpfs #endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 867389d0..e03e4c2f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -14,8 +14,6 @@ #include "consensus.hpp" #include "ledger.hpp" #include "hpfs/hpfs.hpp" -#include "hpfs/hpfs_sync.hpp" -#include "hpfs/hpfs_serve.hpp" #include "unl.hpp" /** @@ -73,8 +71,6 @@ void deinit() p2p::deinit(); read_req::deinit(); consensus::deinit(); - hpfs_sync::deinit(); - hpfs_serve::deinit(); hpfs::deinit(); ledger::deinit(); conf::deinit(); @@ -200,8 +196,6 @@ int main(int argc, char **argv) LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")"; if (hpfs::init() == -1 || - hpfs_serve::init() == -1 || - hpfs_sync::init() == -1 || ledger::init() == -1 || unl::init() == -1 || consensus::init() == -1 || diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index b352c866..f54d21bf 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -93,6 +93,7 @@ table HistoryLedgerBlock { } table Hpfs_Request_Message { //Hpfs request message schema + mount_id: int32; parent_path:string; is_file:bool; block_id:int32; @@ -105,6 +106,7 @@ table Hpfs_Response_Message{ hpfs_response:Hpfs_Response; hash:[ubyte]; path: string; + mount_id: int32; } table Fs_Entry_Response{ diff --git a/src/msg/fbuf/p2pmsg_content_generated.h b/src/msg/fbuf/p2pmsg_content_generated.h index 8724b9e8..f1e000cc 100644 --- a/src/msg/fbuf/p2pmsg_content_generated.h +++ b/src/msg/fbuf/p2pmsg_content_generated.h @@ -1311,11 +1311,18 @@ inline flatbuffers::Offset CreateHistoryLedgerBlockDirect( struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef Hpfs_Request_MessageBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PARENT_PATH = 4, - VT_IS_FILE = 6, - VT_BLOCK_ID = 8, - VT_EXPECTED_HASH = 10 + VT_MOUNT_ID = 4, + VT_PARENT_PATH = 6, + VT_IS_FILE = 8, + VT_BLOCK_ID = 10, + VT_EXPECTED_HASH = 12 }; + int32_t mount_id() const { + return GetField(VT_MOUNT_ID, 0); + } + bool mutate_mount_id(int32_t _mount_id) { + return SetField(VT_MOUNT_ID, _mount_id, 0); + } const flatbuffers::String *parent_path() const { return GetPointer(VT_PARENT_PATH); } @@ -1342,6 +1349,7 @@ struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && + VerifyField(verifier, VT_MOUNT_ID) && VerifyOffset(verifier, VT_PARENT_PATH) && verifier.VerifyString(parent_path()) && VerifyField(verifier, VT_IS_FILE) && @@ -1356,6 +1364,9 @@ struct Hpfs_Request_MessageBuilder { typedef Hpfs_Request_Message Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; + void add_mount_id(int32_t mount_id) { + fbb_.AddElement(Hpfs_Request_Message::VT_MOUNT_ID, mount_id, 0); + } void add_parent_path(flatbuffers::Offset parent_path) { fbb_.AddOffset(Hpfs_Request_Message::VT_PARENT_PATH, parent_path); } @@ -1381,6 +1392,7 @@ struct Hpfs_Request_MessageBuilder { inline flatbuffers::Offset CreateHpfs_Request_Message( flatbuffers::FlatBufferBuilder &_fbb, + int32_t mount_id = 0, flatbuffers::Offset parent_path = 0, bool is_file = false, int32_t block_id = 0, @@ -1389,12 +1401,14 @@ inline flatbuffers::Offset CreateHpfs_Request_Message( builder_.add_expected_hash(expected_hash); builder_.add_block_id(block_id); builder_.add_parent_path(parent_path); + builder_.add_mount_id(mount_id); builder_.add_is_file(is_file); return builder_.Finish(); } inline flatbuffers::Offset CreateHpfs_Request_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, + int32_t mount_id = 0, const char *parent_path = nullptr, bool is_file = false, int32_t block_id = 0, @@ -1403,6 +1417,7 @@ inline flatbuffers::Offset CreateHpfs_Request_MessageDirec auto expected_hash__ = expected_hash ? _fbb.CreateVector(*expected_hash) : 0; return msg::fbuf::p2pmsg::CreateHpfs_Request_Message( _fbb, + mount_id, parent_path__, is_file, block_id, @@ -1415,7 +1430,8 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl VT_HPFS_RESPONSE_TYPE = 4, VT_HPFS_RESPONSE = 6, VT_HASH = 8, - VT_PATH = 10 + VT_PATH = 10, + VT_MOUNT_ID = 12 }; msg::fbuf::p2pmsg::Hpfs_Response hpfs_response_type() const { return static_cast(GetField(VT_HPFS_RESPONSE_TYPE, 0)); @@ -1448,6 +1464,12 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl flatbuffers::String *mutable_path() { return GetPointer(VT_PATH); } + int32_t mount_id() const { + return GetField(VT_MOUNT_ID, 0); + } + bool mutate_mount_id(int32_t _mount_id) { + return SetField(VT_MOUNT_ID, _mount_id, 0); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_HPFS_RESPONSE_TYPE) && @@ -1457,6 +1479,7 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl verifier.VerifyVector(hash()) && VerifyOffset(verifier, VT_PATH) && verifier.VerifyString(path()) && + VerifyField(verifier, VT_MOUNT_ID) && verifier.EndTable(); } }; @@ -1489,6 +1512,9 @@ struct Hpfs_Response_MessageBuilder { void add_path(flatbuffers::Offset path) { fbb_.AddOffset(Hpfs_Response_Message::VT_PATH, path); } + void add_mount_id(int32_t mount_id) { + fbb_.AddElement(Hpfs_Response_Message::VT_MOUNT_ID, mount_id, 0); + } explicit Hpfs_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -1505,8 +1531,10 @@ inline flatbuffers::Offset CreateHpfs_Response_Message( msg::fbuf::p2pmsg::Hpfs_Response hpfs_response_type = msg::fbuf::p2pmsg::Hpfs_Response_NONE, flatbuffers::Offset hpfs_response = 0, flatbuffers::Offset> hash = 0, - flatbuffers::Offset path = 0) { + flatbuffers::Offset path = 0, + int32_t mount_id = 0) { Hpfs_Response_MessageBuilder builder_(_fbb); + builder_.add_mount_id(mount_id); builder_.add_path(path); builder_.add_hash(hash); builder_.add_hpfs_response(hpfs_response); @@ -1519,7 +1547,8 @@ inline flatbuffers::Offset CreateHpfs_Response_MessageDir msg::fbuf::p2pmsg::Hpfs_Response hpfs_response_type = msg::fbuf::p2pmsg::Hpfs_Response_NONE, flatbuffers::Offset hpfs_response = 0, const std::vector *hash = nullptr, - const char *path = nullptr) { + const char *path = nullptr, + int32_t mount_id = 0) { auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; auto path__ = path ? _fbb.CreateString(path) : 0; return msg::fbuf::p2pmsg::CreateHpfs_Response_Message( @@ -1527,7 +1556,8 @@ inline flatbuffers::Offset CreateHpfs_Response_MessageDir hpfs_response_type, hpfs_response, hash__, - path__); + path__, + mount_id); } struct Fs_Entry_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index e5c383cb..5661bc98 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -4,7 +4,6 @@ #include "../../util/util.hpp" #include "../../hplog.hpp" #include "../../util/h32.hpp" -#include "../../hpfs/hpfs.hpp" #include "../../unl.hpp" #include "p2pmsg_container_generated.h" #include "p2pmsg_content_generated.h" @@ -275,7 +274,7 @@ namespace msg::fbuf::p2pmsg const p2p::hpfs_request create_hpfs_request_from_msg(const Hpfs_Request_Message &msg) { p2p::hpfs_request hr; - + hr.mount_id = msg.mount_id(); hr.block_id = msg.block_id(); hr.is_file = msg.is_file(); hr.parent_path = flatbuff_str_to_sv(msg.parent_path()); @@ -468,13 +467,14 @@ namespace msg::fbuf::p2pmsg * @param container_builder Flatbuffer builder for the container message. * @param hr The hpfs request struct to be placed in the container message. */ - void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl) + void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl) { flatbuffers::FlatBufferBuilder builder(1024); flatbuffers::Offset srmsg = CreateHpfs_Request_Message( builder, + hr.mount_id, sv_to_flatbuff_str(builder, hr.parent_path), hr.is_file, hr.block_id, @@ -497,7 +497,7 @@ namespace msg::fbuf::p2pmsg * @param lcl Lcl to be include in the container msg. */ void create_msg_from_fsentry_response( - flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, + flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id, std::vector &hash_nodes, util::h32 expected_hash, std::string_view lcl) { flatbuffers::FlatBufferBuilder builder(1024); @@ -511,7 +511,7 @@ namespace msg::fbuf::p2pmsg builder, Hpfs_Response_Fs_Entry_Response, resp.Union(), hash_to_flatbuff_bytes(builder, expected_hash), - sv_to_flatbuff_str(builder, path)); + sv_to_flatbuff_str(builder, path), mount_id); flatbuffers::Offset message = CreateContent(builder, Message_Hpfs_Response_Message, st_resp.Union()); builder.Finish(message); // Finished building message content to get serialised content. @@ -529,7 +529,7 @@ namespace msg::fbuf::p2pmsg * @param lcl Lcl to be include in the container msg. */ void create_msg_from_filehashmap_response( - flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, + flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id, std::vector &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl) { // todo:get a average propsal message size and allocate content builder based on that. @@ -548,7 +548,7 @@ namespace msg::fbuf::p2pmsg Hpfs_Response_File_HashMap_Response, resp.Union(), hash_to_flatbuff_bytes(builder, expected_hash), - sv_to_flatbuff_str(builder, path)); + sv_to_flatbuff_str(builder, path), mount_id); flatbuffers::Offset message = CreateContent(builder, Message_Hpfs_Response_Message, st_resp.Union()); builder.Finish(message); // Finished building message content to get serialised content. @@ -564,7 +564,7 @@ namespace msg::fbuf::p2pmsg * @param block_resp Block response struct to place in the message * @param lcl Lcl to be include in the container message. */ - void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, std::string_view lcl) + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl) { // todo:get a average propsal message size and allocate content builder based on that. flatbuffers::FlatBufferBuilder builder(1024); @@ -580,7 +580,7 @@ namespace msg::fbuf::p2pmsg Hpfs_Response_Block_Response, resp.Union(), hash_to_flatbuff_bytes(builder, block_resp.hash), - sv_to_flatbuff_str(builder, block_resp.path)); + sv_to_flatbuff_str(builder, block_resp.path), mount_id); flatbuffers::Offset message = CreateContent(builder, Message_Hpfs_Response_Message, st_resp.Union()); builder.Finish(message); // Finished building message content to get serialised content. diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index 4e3d178c..0a044564 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -4,7 +4,7 @@ #include "../../pchheader.hpp" #include "../../p2p/p2p.hpp" #include "../../util/h32.hpp" -#include "../../hpfs/hpfs.hpp" +#include "../../hpfs/hpfs_mount.hpp" #include "p2pmsg_container_generated.h" #include "p2pmsg_content_generated.h" @@ -55,17 +55,17 @@ namespace msg::fbuf::p2pmsg void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view &msg, std::string_view lcl); - void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl); + void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl); void create_msg_from_fsentry_response( - flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, + flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id, std::vector &hash_nodes, util::h32 expected_hash, std::string_view lcl); void create_msg_from_filehashmap_response( - flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, + flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id, std::vector &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl); - void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, std::string_view lcl); + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl); void create_containermsg_from_content( flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 60523b33..d8e14bc8 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -14,8 +14,8 @@ namespace p2p { constexpr uint16_t PROPOSAL_LIST_CAP = 64; // Maximum proposal count. constexpr uint16_t NONUNL_PROPOSAL_LIST_CAP = 64; // Maximum nonunl proposal count. - constexpr uint16_t STATE_REQ_LIST_CAP = 64; // Maximum state request count. - constexpr uint16_t STATE_RES_LIST_CAP = 64; // Maximum state response count. + constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count. + constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count. constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count. struct proposal @@ -28,8 +28,8 @@ namespace p2p uint8_t stage = 0; std::string nonce; // Random nonce that is used to reduce lcl predictability. std::string lcl; - util::h32 state_hash; // Contract state hash. - util::h32 patch_hash; // Patch file hash. + util::h32 state_hash; // Contract state hash. + util::h32 patch_hash; // Patch file hash. std::set users; std::set input_hashes; std::string output_hash; @@ -91,6 +91,7 @@ namespace p2p // Represents a hpfs request sent to a peer. struct hpfs_request { + int32_t mount_id; // Relavent file system id. std::string parent_path; // The requested file or dir path. bool is_file = false; // Whether the path is a file or dir. int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1. @@ -122,13 +123,13 @@ namespace p2p std::list nonunl_proposals; std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. - // List of pairs indicating the session pubkey hex and the hpfs requests. - std::list> hpfs_requests; - std::mutex hpfs_requests_mutex; // Mutex for hpfs requests access race conditions. + // List of pairs indicating the session pubkey hex and the contract fs hpfs requests. + std::list> contract_hpfs_requests; + std::mutex contract_hpfs_requests_mutex; // Mutex for contract fs hpfs requests access race conditions. - // List of pairs indicating the session pubkey hex and the hpfs responses. - std::list> hpfs_responses; - std::mutex hpfs_responses_mutex; // Mutex for hpfs responses access race conditions. + // List of pairs indicating the session pubkey hex and the contract fs hpfs responses. + std::list> contract_hpfs_responses; + std::mutex contract_hpfs_responses_mutex; // Mutex for contract fs hpfs responses access race conditions. }; struct connected_context diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index f03ff9a2..6d258319 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -14,6 +14,7 @@ #include "peer_comm_session.hpp" #include "p2p.hpp" #include "../unl.hpp" +#include "../hpfs/hpfs.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -192,36 +193,44 @@ namespace p2p } else if (content_message_type == p2pmsg::Message_Hpfs_Request_Message) { - // Check the cap and insert request with lock. - std::scoped_lock lock(ctx.collected_msgs.hpfs_requests_mutex); + const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr); + const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message()); + if (hr.mount_id == hpfs::contract_fs.mount_id) + { + // Check the cap and insert request with lock. + std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_requests_mutex); - // If max number of state requests reached skip the rest. - if (ctx.collected_msgs.hpfs_requests.size() < p2p::STATE_REQ_LIST_CAP) - { - std::string state_request_msg(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(state_request_msg))); - } - else - { - LOG_DEBUG << "State request rejected. Maximum state request count reached. " << session.display_name(); + // If max number of state requests reached skip the rest. + if (ctx.collected_msgs.contract_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) + { + ctx.collected_msgs.contract_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); + } + else + { + LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name(); + } } } else if (content_message_type == p2pmsg::Message_Hpfs_Response_Message) { - if (hpfs_sync::ctx.is_syncing) // Only accept state responses if state is syncing. + const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr); + const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message(); + + // Only accept hpfs responses if hpfs fs is syncing. + if (hpfs::contract_sync.ctx.is_syncing && resp_msg->mount_id() == hpfs::contract_fs.mount_id) { // Check the cap and insert state_response with lock. - std::scoped_lock lock(ctx.collected_msgs.hpfs_responses_mutex); + std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_responses_mutex); // If max number of state responses reached skip the rest. - if (ctx.collected_msgs.hpfs_responses.size() < p2p::STATE_RES_LIST_CAP) + if (ctx.collected_msgs.contract_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) { std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); + ctx.collected_msgs.contract_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); } else { - LOG_DEBUG << "State response rejected. Maximum state response count reached. " << session.display_name(); + LOG_DEBUG << "Contract hpfs response rejected. Maximum contract hpfs response count reached. " << session.display_name(); } } } diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 15043ddf..504a71a5 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -49,5 +49,6 @@ #include #include #include +#include #endif \ No newline at end of file diff --git a/src/sc.cpp b/src/sc.cpp index 11edc536..69fbcb35 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -89,7 +89,7 @@ namespace sc execv_args[j] = conf::cfg.contract.runtime_binexec_args[i].data(); execv_args[len - 1] = NULL; - const std::string current_dir = hpfs::physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH); + const std::string current_dir = hpfs::contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH); chdir(current_dir.c_str()); execv(execv_args[0], execv_args); @@ -156,8 +156,8 @@ namespace sc if (!ctx.args.readonly) ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME; - return ctx.args.readonly ? hpfs::start_ro_session(ctx.args.hpfs_session_name, false) - : hpfs::acquire_rw_session(); + return ctx.args.readonly ? hpfs::contract_fs.start_ro_session(ctx.args.hpfs_session_name, false) + : hpfs::contract_fs.acquire_rw_session(); } /** @@ -165,36 +165,37 @@ namespace sc */ int stop_hpfs_session(execution_context &ctx) { + hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; if (ctx.args.readonly) { - return hpfs::stop_ro_session(ctx.args.hpfs_session_name); + return contract_fs.stop_ro_session(ctx.args.hpfs_session_name); } else { // Read the state hash if not in readonly mode. - if (hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH) < 1) + if (contract_fs.get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH) < 1) { - hpfs::release_rw_session(); + contract_fs.release_rw_session(); return -1; } util::h32 patch_hash; - const int patch_hash_result = hpfs::get_hash(patch_hash, ctx.args.hpfs_session_name, hpfs::PATCH_FILE_PATH); + const int patch_hash_result = contract_fs.get_hash(patch_hash, ctx.args.hpfs_session_name, hpfs::PATCH_FILE_PATH); if (patch_hash_result == -1) { - hpfs::release_rw_session(); + contract_fs.release_rw_session(); return -1; } - else if (patch_hash_result == 1 && patch_hash != hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH)) + else if (patch_hash_result == 1 && patch_hash != contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH)) { - // Update global hash tracker with the new patch file hash. - hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, patch_hash); + // Update global hash tracker of contract fs with the new patch file hash. + contract_fs.set_parent_hash(hpfs::PATCH_FILE_PATH, patch_hash); // Denote that the patch file was updated by the SC. consensus::is_patch_update_pending = true; } - return hpfs::release_rw_session(); + return contract_fs.release_rw_session(); } } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index e595054b..38291af2 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -8,7 +8,7 @@ #include "../hplog.hpp" #include "../ledger.hpp" #include "../util/buffer_store.hpp" -#include "../hpfs/hpfs.hpp" +#include "../hpfs/hpfs_mount.hpp" #include "usr.hpp" #include "user_session_handler.hpp" #include "user_comm_session.hpp" @@ -16,6 +16,7 @@ #include "user_input.hpp" #include "read_req.hpp" #include "input_nonce_map.hpp" +#include "../hpfs/hpfs.hpp" namespace usr { @@ -391,7 +392,7 @@ namespace usr util::fork_detach(); // before execution chdir into a valid the latest state data directory that contains an appbill.table - const std::string appbill_dir = conf::ctx.hpfs_rw_dir + hpfs::STATE_DIR_PATH; + const std::string appbill_dir = hpfs::contract_fs.rw_dir + hpfs::STATE_DIR_PATH; chdir(appbill_dir.c_str()); int ret = execv(execv_args[0], execv_args); std::cerr << errno << ": Appbill process execv failed.\n";