diff --git a/src/conf.cpp b/src/conf.cpp index 1ca1ffce..8aeb0026 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -3,7 +3,6 @@ #include "crypto.hpp" #include "hpfs/hpfs.hpp" #include "util/util.hpp" -#include "sc.hpp" namespace conf { @@ -17,11 +16,11 @@ namespace conf // Stores the initial startup mode of the node. ROLE startup_mode; - const static char *ROLE_OBSERVER = "observer"; - const static char *ROLE_VALIDATOR = "validator"; - - const static char *PUBLIC = "public"; - const static char *PRIVATE = "private"; + constexpr const char *ROLE_OBSERVER = "observer"; + constexpr const char *ROLE_VALIDATOR = "validator"; + constexpr const char *PUBLIC = "public"; + constexpr const char *PRIVATE = "private"; + constexpr const char *HPFS_PATCH_SESSION_NAME = "ro_patch"; bool init_success = false; @@ -34,16 +33,16 @@ namespace conf // The validations/loading needs to be in this order. // 1. Validate contract directories // 2. Read and load the contract config into memory - // 3. Update contract config if patch file exists. // 4. Validate the loaded config values - // 5. Locking the config file at the startup. + // 5. Initialize logging subsystem. + // 6. Update and validate contract config if patch file exists. if (validate_contract_dir_paths() == -1 || + set_config_lock() == -1 || read_config(cfg) == -1 || - apply_patch_changes(cfg.contract) == -1 || - validate_config(cfg) == -1 || - set_config_lock() == -1) + validate_config(cfg) == -1) { + release_config_lock(); return -1; } @@ -110,11 +109,7 @@ namespace conf util::create_dir_tree_recursive(ctx.hist_dir); util::create_dir_tree_recursive(ctx.full_hist_dir); util::create_dir_tree_recursive(ctx.log_dir); - util::create_dir_tree_recursive(ctx.hpfs_dir); - - // Creating hpfs seed dir in advance so hpfs doesn't cause mkdir race conditions during first-run. - util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed"); - util::create_dir_tree_recursive(ctx.hpfs_dir + std::string("/seed").append(sc::STATE_DIR_PATH)); + util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH); //Create config file with default settings. @@ -201,8 +196,8 @@ namespace conf ctx.hist_dir = basedir + "/hist"; ctx.full_hist_dir = basedir + "/fullhist"; ctx.hpfs_dir = basedir + "/hpfs"; - ctx.hpfs_rw_dir = ctx.hpfs_dir + "/rw"; - ctx.hpfs_serve_dir = ctx.hpfs_dir + "/ss"; + ctx.hpfs_mount_dir = ctx.hpfs_dir + "/mnt"; + ctx.hpfs_rw_dir = ctx.hpfs_mount_dir + "/rw"; ctx.log_dir = basedir + "/log"; } @@ -769,22 +764,19 @@ namespace conf * @param contract_config Contract section of config structure. * @return Returns -1 on error and 0 on successful update. */ - int apply_patch_changes(contract_params &contract_config) + int apply_patch_changes() { - pid_t hpfs_ro_pid = 0; - std::string mount_dir; // Holds the mount directory of the newly created hpfs session. - int res = 0; + if (hpfs::start_ro_session(HPFS_PATCH_SESSION_NAME, false) == -1) + return -1; - if (hpfs::start_ro_rw_process(hpfs_ro_pid, mount_dir, - true, false, true) == -1 || // Creating a hpfs process and then starts a virtual hpfs session. - validate_and_apply_patch_config(contract_config, mount_dir) == -1 || // Validate content in patch file and update contract section in config. - hpfs::stop_fs_session(mount_dir) == -1) // Stop the created hpfs session. - res = -1; + // Validate content in patch file and update contract section in config. + if (validate_and_apply_patch_config(cfg.contract, HPFS_PATCH_SESSION_NAME) == -1) + { + hpfs::stop_ro_session(HPFS_PATCH_SESSION_NAME); + return -1; + } - // Created hpfs process should be killed even the patch validation failed. - if (hpfs_ro_pid > 0 && util::kill_process(hpfs_ro_pid, true) == -1) - res = -1; - return res; + return hpfs::stop_ro_session(HPFS_PATCH_SESSION_NAME); } /** @@ -793,9 +785,9 @@ namespace conf * @param mount_dir hpfs process mount directory path. * @return Returns -1 on error and 0 in successful update. */ - int validate_and_apply_patch_config(contract_params &contract_config, std::string_view mount_dir) + int validate_and_apply_patch_config(contract_params &contract_config, std::string_view hpfs_session_name) { - const std::string path = std::string(mount_dir).append(PATCH_FILE_PATH); + const std::string path = hpfs::physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH); if (util::is_file_exists(path)) { std::ifstream ifs(path); @@ -889,7 +881,7 @@ namespace conf if (!contract_config.appbill.bin_args.empty()) util::split_string(contract_config.appbill.runtime_args, contract_config.appbill.bin_args, " "); contract_config.appbill.runtime_args.insert(contract_config.appbill.runtime_args.begin(), (contract_config.appbill.mode[0] == '/' ? contract_config.appbill.mode : util::realpath(conf::ctx.contract_dir + "/bin/" + contract_config.appbill.mode))); - + std::cout << "Contract config updated from patch file\n"; } catch (const std::exception &e) diff --git a/src/conf.hpp b/src/conf.hpp index 9cc28893..322dc909 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -137,20 +137,20 @@ namespace conf std::string hpws_exe_path; // hpws executable file path. std::string hpfs_exe_path; // hpfs executable file path. - std::string contract_dir; // Contract base directory full path - std::string full_hist_dir; // Contract full history dir full path - std::string hist_dir; // Contract ledger history dir full path - std::string hpfs_dir; // Hpfs file system mount path (hpfs path) - std::string hpfs_rw_dir; // Hpfs read/write mount path. - std::string hpfs_serve_dir; // Hpfs server hpfs mount path. - std::string log_dir; // Contract log dir full path - std::string config_dir; // Contract config dir full path - std::string config_file; // Full path to the contract config file - std::string tls_key_file; // Full path to the tls private key file - std::string tls_cert_file; // Full path to the tls certificate + std::string contract_dir; // Contract base directory full path. + std::string full_hist_dir; // Contract full history dir full path. + std::string hist_dir; // Contract ledger history dir full path. + std::string hpfs_dir; // hpfs metdata dir (The location of hpfs log file). + std::string hpfs_mount_dir; // hpfs fuse file system mount path. + std::string hpfs_rw_dir; // hpfs read/write fs session path. + std::string log_dir; // Contract log dir full path. + std::string config_dir; // Contract config dir full path. + std::string config_file; // Full path to the contract config file. + std::string tls_key_file; // Full path to the tls private key file. + std::string tls_cert_file; // Full path to the tls certificate. - int config_fd; // Config file file descriptor - struct flock config_lock; // Config file record log + int config_fd; // Config file file descriptor. + struct flock config_lock; // Config file lock. }; // Holds all the contract config values. @@ -173,8 +173,6 @@ namespace conf // Other modeuls will access config values via this. extern contract_config cfg; - const static char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename. - int init(); void deinit(); @@ -203,9 +201,9 @@ namespace conf std::string_view extract_missing_field(std::string err_message); - int apply_patch_changes(contract_params &contract_config); + int apply_patch_changes(); - int validate_and_apply_patch_config(contract_params &contract_config, std::string_view mount_dir); + int validate_and_apply_patch_config(contract_params &contract_config, std::string_view hpfs_session_name); int set_config_lock(); diff --git a/src/consensus.cpp b/src/consensus.cpp index 10268fdf..13c9a973 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -817,7 +817,7 @@ namespace consensus new_lcl = ledger::ctx.get_lcl(); const uint64_t new_lcl_seq_no = ledger::ctx.get_seq_no(); - LOG_INFO << "****Ledger created**** (lcl:" << new_lcl.substr(0, 15) << " state hash:" << cons_prop.state_hash << " patch hash:" << cons_prop.patch_hash << ")"; + LOG_INFO << "****Ledger created**** (lcl:" << new_lcl.substr(0, 15) << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; // After the current ledger seq no is updated, we remove any newly expired inputs from candidate set. { @@ -844,7 +844,6 @@ namespace consensus } sc::contract_execution_args &args = ctx.contract_ctx->args; - args.hpfs_dir = conf::ctx.hpfs_rw_dir; args.readonly = false; args.time = cons_prop.time; args.lcl = new_lcl; diff --git a/src/hpfs/hpfs.cpp b/src/hpfs/hpfs.cpp index 276fa2dd..3a8b2dba 100644 --- a/src/hpfs/hpfs.cpp +++ b/src/hpfs/hpfs.cpp @@ -7,12 +7,17 @@ namespace hpfs { - constexpr const char *HPFS_TRACE_ARG_ERROR = "trace=error"; - constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=error"; - constexpr const char *HPFS_HMAP_HASH = "::hpfs.hmap.hash"; - constexpr const char *HPFS_HMAP_CHILDREN = "::hpfs.hmap.children"; - constexpr const char *HPFS_SESSION = "::hpfs.session"; - constexpr ino_t HPFS_ROOT_INO = 2; + constexpr const char *TRACE_ARG_ERROR = "trace=error"; + constexpr const char *TRACE_ARG_DEBUG = "trace=error"; + constexpr const char *RW_SESSION = "/::hpfs.rw.hmap"; + constexpr const char *RO_SESSION = "/::hpfs.ro."; + constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap."; + constexpr const char *HMAP_HASH = "::hpfs.hmap.hash"; + constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children"; + constexpr ino_t ROOT_INO = 1; + + constexpr const char *INIT_SESSION_NAME = "init"; + constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000; constexpr uint16_t INIT_CHECK_INTERVAL = 20; bool init_success = false; hpfs_context ctx; @@ -22,32 +27,25 @@ namespace hpfs */ int init() { - if (start_merge_process(ctx.hpfs_merge_pid) == -1) + if (start_hpfs_process(ctx.hpfs_pid) == -1) return -1; - if (start_ro_rw_process(ctx.hpfs_rw_pid, conf::ctx.hpfs_rw_dir, false, true, false) == -1) - { - // Stop the merge process in case of failure. - util::kill_process(ctx.hpfs_merge_pid, true); - return -1; - } - util::h32 initial_state_hash; util::h32 initial_patch_hash; - if (start_fs_session(conf::ctx.hpfs_rw_dir) == -1 || - get_hash(initial_state_hash, conf::ctx.hpfs_rw_dir, sc::STATE_DIR_PATH) == -1 || - get_hash(initial_patch_hash, conf::ctx.hpfs_rw_dir, conf::PATCH_FILE_PATH) == -1 || - stop_fs_session(conf::ctx.hpfs_rw_dir) == -1) + if (start_ro_session(INIT_SESSION_NAME, true) == -1 || + get_hash(initial_state_hash, INIT_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 || + get_hash(initial_patch_hash, INIT_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 || + stop_ro_session(INIT_SESSION_NAME) == -1) { - LOG_ERROR << "Failed to get initial state hash."; + LOG_ERROR << "Failed to get initial hpfs hashes."; + util::kill_process(ctx.hpfs_pid, true); return -1; } ctx.set_hash(HPFS_PARENT_COMPONENTS::STATE, initial_state_hash); ctx.set_hash(HPFS_PARENT_COMPONENTS::PATCH, initial_patch_hash); - LOG_INFO << "Initial state hash: " << initial_state_hash; - LOG_INFO << "Initial patch hash: " << initial_patch_hash; + LOG_INFO << "Initial state hash: " << initial_state_hash << " | patch hash: " << initial_patch_hash; init_success = true; return 0; } @@ -59,88 +57,26 @@ namespace hpfs { if (init_success) { - LOG_DEBUG << "Stopping hpfs rw process... pid:" << ctx.hpfs_rw_pid; - if (ctx.hpfs_rw_pid > 0 && util::kill_process(ctx.hpfs_rw_pid, true) == 0) - LOG_INFO << "Stopped hpfs rw process."; - - LOG_DEBUG << "Stopping hpfs merge process... pid:" << ctx.hpfs_merge_pid; - if (ctx.hpfs_merge_pid > 0 && util::kill_process(ctx.hpfs_merge_pid, true) == 0) - LOG_INFO << "Stopped hpfs merge process."; + LOG_DEBUG << "Stopping hpfs process... pid:" << ctx.hpfs_pid; + if (ctx.hpfs_pid > 0 && util::kill_process(ctx.hpfs_pid, true) == 0) + LOG_INFO << "Stopped hpfs process."; } } /** - * Starts hpfs merge process. + * Starts the hpfs process used for all fs sessions. */ - int start_merge_process(pid_t &hpfs_pid) + int start_hpfs_process(pid_t &hpfs_pid) { const pid_t pid = fork(); - - if (pid > 0) - { - LOG_DEBUG << "Starting hpfs merge process..."; - - // HotPocket process. - util::sleep(INIT_CHECK_INTERVAL); - - // Check if hpfs process is still running. - // Sending signal 0 to test whether process exist. - if (util::kill_process(pid, false, 0) == -1) - return -1; - - hpfs_pid = pid; - LOG_DEBUG << "hpfs merge process started. pid:" << hpfs_pid; - } - else if (pid == 0) - { - // hpfs process. - util::fork_detach(); - - const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR); - - // Fill process args. - char *execv_args[] = { - conf::ctx.hpfs_exe_path.data(), - (char *)"merge", - conf::ctx.hpfs_dir.data(), - (char *)active_hpfs_trace_arg, - NULL}; - - const int ret = execv(execv_args[0], execv_args); - std::cerr << errno << ": hpfs merge process execv failed.\n"; - exit(1); - } - else - { - LOG_ERROR << errno << ": fork() failed when starting hpfs merge process."; - return -1; - } - - return 0; - } - - /** - * Starts hpfs readonly/readwrite process and also starts a virtual fs session. - */ - int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly, - const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout) - { - const pid_t pid = fork(); - const char *mode = readonly ? "ro" : "rw"; - if (pid > 0) { // HotPocket process. - LOG_DEBUG << "Starting hpfs " << mode << " process at " << mount_dir; - // If the mount dir is not specified, assign a mount dir based on hpfs process id. - if (mount_dir.empty()) - mount_dir = std::string(conf::ctx.hpfs_dir) - .append("/") - .append(std::to_string(pid)); + LOG_DEBUG << "Starting hpfs process."; // Wait until hpfs is initialized properly. - const uint16_t max_retries = timeout / INIT_CHECK_INTERVAL; + const uint16_t max_retries = PROCESS_INIT_TIMEOUT / INIT_CHECK_INTERVAL; bool hpfs_initialized = false; uint16_t retry_count = 0; do @@ -151,65 +87,54 @@ namespace hpfs // Sending signal 0 to test whether process exist. if (util::kill_process(pid, false, 0) == -1) { - LOG_ERROR << "hpfs process " << pid << " has stopped at " << mount_dir; + LOG_ERROR << "hpfs process " << pid << " has stopped."; break; } // We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up. struct stat st; - if (stat(mount_dir.c_str(), &st) == -1) + if (stat(conf::ctx.hpfs_mount_dir.c_str(), &st) == -1) { - LOG_ERROR << errno << ": Error in checking hpfs status at " << mount_dir; + LOG_ERROR << errno << ": Error in checking hpfs status."; break; } - hpfs_initialized = (st.st_ino == HPFS_ROOT_INO); + hpfs_initialized = (st.st_ino == ROOT_INO); // Keep retrying until root inode no. matches or timeout occurs. } while (!hpfs_initialized && ++retry_count <= max_retries); - // If hpfs FUSE interface initialized within the timeout period, we then attempt to start up a virtual fs session. - // hpfs achieves this by having a 'session' file created. - if (hpfs_initialized && auto_start_session) - start_fs_session(mount_dir); - // Kill the process if hpfs couldn't be initialized properly. if (!hpfs_initialized) { - LOG_ERROR << "Couldn't initialize hpfs session at " << mount_dir; + LOG_ERROR << "Couldn't initialize hpfs process."; util::kill_process(pid, true); return -1; } hpfs_pid = pid; - LOG_DEBUG << "hpfs " << mode << " process started at " << mount_dir << " pid:" << hpfs_pid; + LOG_DEBUG << "hpfs process started. pid:" << hpfs_pid; } else if (pid == 0) { // hpfs process. util::fork_detach(); - // If the mount dir is not specified, assign a mount dir based on hpfs process id. - const pid_t self_pid = getpid(); - if (mount_dir.empty()) - mount_dir = std::string(conf::ctx.hpfs_dir) - .append("/") - .append(std::to_string(self_pid)); - - const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR); + const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? TRACE_ARG_DEBUG : TRACE_ARG_ERROR); // Fill process args. char *execv_args[] = { conf::ctx.hpfs_exe_path.data(), - (char *)mode, // hpfs mode: rw | ro + (char *)"fs", conf::ctx.hpfs_dir.data(), - mount_dir.data(), - (char *)(hash_map_enabled ? "hmap=true" : "hmap=false"), + conf::ctx.hpfs_mount_dir.data(), + // In full history mode, we disable log merge of hpfs. + (char *)(conf::cfg.node.full_history ? "merge=false" : "merge=true"), (char *)active_hpfs_trace_arg, NULL}; const int ret = execv(execv_args[0], execv_args); - std::cerr << errno << ": hpfs session process execv failed.\n"; + std::cerr << errno << ": hpfs process execv failed.\n"; exit(1); } else @@ -222,32 +147,82 @@ namespace hpfs } /** - * Starts a virtual fs session on the hpfs process attached to the specified mount dir. + * Starts a virtual fs ReadWrite session with hash map enabled. + * If RW session already started, this will simply acquire a consumer reference. + * @return 0 on success. -1 on failure. */ - int start_fs_session(std::string_view mount_dir) + int acquire_rw_session() { - LOG_DEBUG << "Starting hpfs fs session at " << mount_dir; + std::scoped_lock lock(ctx.rw_mutex); - const std::string session_file = std::string(mount_dir).append("/").append(HPFS_SESSION); + LOG_DEBUG << "Starting hpfs rw session at " << conf::ctx.hpfs_rw_dir; + + const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION; + + // The sessions creation either should be succesful or should report as already exists (errno=EEXIST). + // Otherwise we consider it as failure. + if (mknod(session_file.c_str(), 0, 0) == -1 && errno != EEXIST) + { + LOG_ERROR << errno << ": Error starting hpfs rw session at " << conf::ctx.hpfs_rw_dir; + return -1; + } + ctx.rw_consumers++; + return 0; + } + + /** + * Releases a consumer reference to the RW session. If there are no more references, + * actually stops the running RW session. + * @return 0 on success. -1 on failure. + */ + int release_rw_session() + { + std::scoped_lock lock(ctx.rw_mutex); + + if (ctx.rw_consumers > 0) + ctx.rw_consumers--; + + if (ctx.rw_consumers == 0) + { + const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION; + if (unlink(session_file.c_str()) == -1) + { + LOG_ERROR << errno << ": Error stopping hpfs rw session at " << conf::ctx.hpfs_rw_dir; + return -1; + } + } + return 0; + } + + /** + * Starts a virtual fs ReadOnly session. + * @return 0 on success. -1 on failure. + */ + int start_ro_session(const std::string &name, const bool hmap_enabled) + { + LOG_DEBUG << "Starting hpfs ro session " << name << " hmap:" << hmap_enabled; + + const std::string session_file = conf::ctx.hpfs_mount_dir + (hmap_enabled ? RO_SESSION_HMAP : RO_SESSION) + name; if (mknod(session_file.c_str(), 0, 0) == -1) { - LOG_ERROR << errno << ": Error starting hpfs fs session at " << mount_dir; + LOG_ERROR << errno << ": Error starting hpfs ro session " << name; return -1; } return 0; } /** - * Stops the active virtual fs session on the hpfs process attached to the specified mount dir. + * Stops the specified ReadOnly fs session. + * @return 0 on success. -1 on failure. */ - int stop_fs_session(std::string_view mount_dir) + int stop_ro_session(const std::string &name) { - LOG_DEBUG << "Stopping hpfs fs session at " << mount_dir; + LOG_DEBUG << "Stopping hpfs ro session " << name; - const std::string session_file = std::string(mount_dir).append("/").append(HPFS_SESSION); + const std::string session_file = conf::ctx.hpfs_mount_dir + RO_SESSION + name; if (unlink(session_file.c_str()) == -1) { - LOG_ERROR << errno << ": Error stopping hpfs fs session at " << mount_dir; + LOG_ERROR << errno << ": Error stopping hpfs ro session " << name; return -1; } return 0; @@ -257,9 +232,9 @@ namespace hpfs * Populates the hash of the specified vpath. * @return 1 on success. 0 if vpath not found. -1 on error. */ - int get_hash(util::h32 &hash, const std::string_view mount_dir, const std::string_view vpath) + int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath) { - const std::string path = std::string(mount_dir).append(vpath).append(HPFS_HMAP_HASH); + const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_HASH)); const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1 && errno == ENOENT) { @@ -286,9 +261,9 @@ namespace hpfs * Populates the list of file block hashes for the specified vpath. * @return 1 on success. 0 if vpath not found. -1 on error. */ - int get_file_block_hashes(std::vector &hashes, const std::string_view mount_dir, const std::string_view vpath) + int get_file_block_hashes(std::vector &hashes, std::string_view session_name, std::string_view vpath) { - const std::string path = std::string(mount_dir).append(vpath).append(HPFS_HMAP_CHILDREN); + const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_CHILDREN)); const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1 && errno == ENOENT) { @@ -326,9 +301,9 @@ namespace hpfs * Populates the list of dir entry hashes for the specified vpath. * @return 1 on success. 0 if vpath not found. -1 on error. */ - int get_dir_children_hashes(std::vector &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath) + int get_dir_children_hashes(std::vector &hash_nodes, std::string_view session_name, std::string_view dir_vpath) { - const std::string path = std::string(mount_dir).append(dir_vpath).append(HPFS_HMAP_CHILDREN); + const std::string path = physical_path(session_name, std::string(dir_vpath).append(HMAP_CHILDREN)); const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1 && errno == ENOENT) { @@ -362,4 +337,9 @@ namespace hpfs return 1; } + const std::string physical_path(std::string_view session_name, std::string_view vpath) + { + return conf::ctx.hpfs_mount_dir + "/" + session_name.data() + vpath.data(); + } + } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs.hpp b/src/hpfs/hpfs.hpp index 46839ea9..30551fce 100644 --- a/src/hpfs/hpfs.hpp +++ b/src/hpfs/hpfs.hpp @@ -7,7 +7,10 @@ namespace hpfs { - constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. + constexpr const char *STATE_DIR_PATH = "/state"; // State directory name. + constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename. struct child_hash_node { @@ -39,8 +42,12 @@ namespace hpfs std::shared_mutex parent_mutexes[2] = {std::shared_mutex(), std::shared_mutex()}; // Mutexes for each parent. public: - pid_t hpfs_merge_pid = 0; - pid_t hpfs_rw_pid = 0; + pid_t hpfs_pid = 0; + + // No. of consumers for RW session. + // We use this as a reference counting mechanism to cleanup RW session when no one requires it. + uint32_t rw_consumers = 0; + std::mutex rw_mutex; hpfs_context() { @@ -69,14 +76,15 @@ namespace hpfs int init(); void deinit(); - int start_merge_process(pid_t &hpfs_pid); - int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly, - const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout = 4000); - int start_fs_session(std::string_view mount_dir); - int stop_fs_session(std::string_view mount_dir); - int get_hash(util::h32 &hash, const std::string_view mount_dir, const std::string_view vpath); - int get_file_block_hashes(std::vector &hashes, const std::string_view mount_dir, const std::string_view vpath); - int get_dir_children_hashes(std::vector &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath); + int start_hpfs_process(pid_t &hpfs_pid); + int acquire_rw_session(); + int release_rw_session(); + int start_ro_session(const std::string &name, const bool hmap_enabled); + int stop_ro_session(const std::string &name); + int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath); + int get_file_block_hashes(std::vector &hashes, std::string_view session_name, std::string_view vpath); + int get_dir_children_hashes(std::vector &hash_nodes, std::string_view session_name, std::string_view dir_vpath); + const std::string physical_path(std::string_view session_name, std::string_view vpath); } // namespace hpfs #endif \ No newline at end of file diff --git a/src/hpfs/hpfs_serve.cpp b/src/hpfs/hpfs_serve.cpp index a90a47de..3f337221 100644 --- a/src/hpfs/hpfs_serve.cpp +++ b/src/hpfs/hpfs_serve.cpp @@ -18,21 +18,17 @@ namespace p2pmsg = msg::fbuf::p2pmsg; namespace hpfs_serve { constexpr uint16_t LOOP_WAIT = 20; // Milliseconds + constexpr const char *HPFS_SESSION_NAME = "rw"; uint16_t REQUEST_BATCH_TIMEOUT; bool is_shutting_down = false; bool init_success = false; - pid_t hpfs_pid; std::thread hpfs_serve_thread; int init() { REQUEST_BATCH_TIMEOUT = hpfs::get_request_resubmit_timeout() * 0.9; - - if (hpfs::start_ro_rw_process(hpfs_pid, conf::ctx.hpfs_serve_dir, true, true, false) == -1) - return -1; - hpfs_serve_thread = std::thread(hpfs_serve_loop); init_success = true; return 0; @@ -44,10 +40,6 @@ namespace hpfs_serve { is_shutting_down = true; hpfs_serve_thread.join(); - - LOG_DEBUG << "Stopping hpfs state serve process... pid:" << hpfs_pid; - if (hpfs_pid > 0 && util::kill_process(hpfs_pid, true) == 0) - LOG_INFO << "Stopped hpfs state serve process."; } } @@ -83,7 +75,7 @@ namespace hpfs_serve if (hpfs_requests.empty()) continue; - if (hpfs::start_fs_session(conf::ctx.hpfs_serve_dir) != -1) + if (hpfs::acquire_rw_session() != -1) { for (auto &[session_id, request] : hpfs_requests) { @@ -124,7 +116,7 @@ namespace hpfs_serve } } - hpfs::stop_fs_session(conf::ctx.hpfs_serve_dir); + hpfs::release_rw_session(); } hpfs_requests.clear(); @@ -225,7 +217,7 @@ namespace hpfs_serve { // Check whether the existing block hash matches expected hash. std::vector block_hashes; - int result = hpfs::get_file_block_hashes(block_hashes, conf::ctx.hpfs_serve_dir, vpath); + int result = hpfs::get_file_block_hashes(block_hashes, HPFS_SESSION_NAME, vpath); if (result == 1) { if (block_id >= block_hashes.size()) @@ -241,7 +233,7 @@ namespace hpfs_serve else // Get actual block data. { struct stat st; - const std::string file_path = std::string(conf::ctx.hpfs_serve_dir).append(vpath); + const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data(); const off_t block_offset = block_id * hpfs::BLOCK_SIZE; const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1) @@ -302,7 +294,7 @@ namespace hpfs_serve { // Check whether the existing file hash matches expected hash. util::h32 file_hash = util::h32_empty; - int result = hpfs::get_hash(file_hash, conf::ctx.hpfs_serve_dir, vpath); + int result = hpfs::get_hash(file_hash, HPFS_SESSION_NAME, vpath); if (result == 1) { if (file_hash != expected_hash) @@ -311,14 +303,14 @@ namespace hpfs_serve result = 0; } // Get the block hashes. - else if (hpfs::get_file_block_hashes(hashes, conf::ctx.hpfs_serve_dir, vpath) < 0) + else if (hpfs::get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0) { result = -1; } else { // Get actual file length. - const std::string file_path = std::string(conf::ctx.hpfs_serve_dir).append(vpath); + const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data(); struct stat st; if (stat(file_path.c_str(), &st) == -1) { @@ -342,7 +334,7 @@ namespace hpfs_serve { // Check whether the existing dir hash matches expected hash. util::h32 dir_hash = util::h32_empty; - int result = hpfs::get_hash(dir_hash, conf::ctx.hpfs_serve_dir, vpath); + int result = hpfs::get_hash(dir_hash, HPFS_SESSION_NAME, vpath); if (result == 1) { if (dir_hash != expected_hash) @@ -351,7 +343,7 @@ namespace hpfs_serve result = 0; } // Get the children hash nodes. - else if (hpfs::get_dir_children_hashes(hash_nodes, conf::ctx.hpfs_serve_dir, vpath) < 0) + else if (hpfs::get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0) { result = -1; } diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index 2ef9abae..3f437e7e 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -42,7 +42,6 @@ namespace hpfs_sync // Patch file sync has the highest priority. ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::PATCH; ctx.hpfs_sync_thread = std::thread(hpfs_syncer_loop); - ctx.hpfs_mount_dir = conf::ctx.hpfs_rw_dir; init_success = true; return 0; } @@ -103,7 +102,7 @@ namespace hpfs_sync if (!ctx.is_syncing) continue; - if (hpfs::start_fs_session(ctx.hpfs_mount_dir) != -1) + if (hpfs::acquire_rw_session() != -1) { while (!ctx.is_shutting_down) { @@ -135,7 +134,7 @@ namespace hpfs_sync LOG_INFO << "hpfs sync: Target patch state achieved: " << new_state; // Appling new patch file changes to hpcore runtime. - if (conf::validate_and_apply_patch_config(conf::cfg.contract, conf::ctx.hpfs_rw_dir) == -1) + if (conf::validate_and_apply_patch_config(conf::cfg.contract, hpfs::RW_SESSION_NAME) == -1) { LOG_ERROR << "Appling patch file changes after sync failed"; } @@ -145,7 +144,7 @@ namespace hpfs_sync // Update global hash tracker with the new patch file hash. util::h32 updated_patch_hash; - hpfs::get_hash(updated_patch_hash, conf::ctx.hpfs_rw_dir, conf::PATCH_FILE_PATH); + hpfs::get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, updated_patch_hash); } @@ -172,7 +171,7 @@ namespace hpfs_sync } LOG_INFO << "hpfs sync: All parents synced."; - hpfs::stop_fs_session(ctx.hpfs_mount_dir); + hpfs::release_rw_session(); } else { @@ -193,12 +192,12 @@ namespace hpfs_sync BACKLOG_ITEM_TYPE target_parent_backlog_item_type; if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::STATE) { - target_parent_vpath = sc::STATE_DIR_PATH; + target_parent_vpath = hpfs::STATE_DIR_PATH; target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::DIR; } else if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH) { - target_parent_vpath = conf::PATCH_FILE_PATH; + target_parent_vpath = hpfs::PATCH_FILE_PATH; target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::FILE; } std::string lcl = ledger::ctx.get_lcl(); @@ -317,7 +316,7 @@ namespace hpfs_sync // After handling each response, check whether we have reached target hpfs state. // get_hash returns 0 incase target parent is not existing in our side. - if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, target_parent_vpath) == -1) + if (hpfs::get_hash(updated_state, hpfs::RW_SESSION_NAME, target_parent_vpath) == -1) { LOG_ERROR << "hpfs sync: exiting due to hash check error."; return -1; @@ -510,13 +509,13 @@ namespace hpfs_sync LOG_DEBUG << "hpfs sync: Processing fs entries response for " << vpath; // Create physical directory on our side if not exist. - std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath); + std::string parent_physical_path = conf::ctx.hpfs_rw_dir + vpath.data(); if (util::create_dir_tree_recursive(parent_physical_path) == -1) return -1; // Get the children hash entries and compare with what we got from peer. std::vector existing_fs_entries; - if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, vpath) == -1) + if (hpfs::get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1) return -1; // Request more info on fs entries that exist on both sides but are different. @@ -545,7 +544,7 @@ namespace hpfs_sync else { // If there was an entry that does not exist on other side, delete it. - std::string child_physical_path = std::string(ctx.hpfs_mount_dir).append(child_vpath); + std::string child_physical_path = conf::ctx.hpfs_rw_dir + child_vpath.data(); if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) || !ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1) @@ -588,7 +587,7 @@ namespace hpfs_sync // File block hashes on our side (file might not exist on our side). std::vector existing_hashes; - if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, vpath) == -1 && errno != ENOENT) + if (hpfs::get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT) return -1; const size_t existing_hash_count = existing_hashes.size(); @@ -605,7 +604,7 @@ namespace hpfs_sync if (existing_hashes.size() >= hash_count) { // If peer file might be smaller, truncate our file to match with peer file. - std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath); + std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data(); if (truncate(file_physical_path.c_str(), file_length) == -1) return -1; } @@ -626,7 +625,7 @@ namespace hpfs_sync << " (len:" << buf.length() << ") of " << vpath; - std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath); + std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data(); const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS); if (fd == -1) { diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index 80744222..b1b1930d 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -52,7 +52,6 @@ namespace hpfs_sync std::shared_mutex target_state_mutex; std::atomic is_syncing = false; std::atomic is_shutting_down = false; - std::string hpfs_mount_dir; }; extern sync_context ctx; diff --git a/src/main.cpp b/src/main.cpp index b06d0318..0f2c9cb5 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -199,11 +199,12 @@ int main(int argc, char **argv) LOG_INFO << "Public key: " << conf::cfg.node.public_key_hex; LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")"; - if (ledger::init() == -1 || - unl::init() == -1 || - hpfs::init() == -1 || + if (hpfs::init() == -1 || + conf::apply_patch_changes() == -1 || hpfs_serve::init() == -1 || hpfs_sync::init() == -1 || + ledger::init() == -1 || + unl::init() == -1 || consensus::init() == -1 || read_req::init() == -1 || p2p::init() == -1 || diff --git a/src/sc.cpp b/src/sc.cpp index 06bb4f77..f8de1234 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -89,7 +89,7 @@ namespace sc execv_args[j] = conf::cfg.contract.runtime_binexec_args[i].data(); execv_args[len - 1] = NULL; - const std::string current_dir = std::string(ctx.args.hpfs_dir).append(STATE_DIR_PATH); + const std::string current_dir = hpfs::physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH); chdir(current_dir.c_str()); execv(execv_args[0], execv_args); @@ -104,27 +104,6 @@ namespace sc cleanup_fds(ctx); - util::h32 patch_hash; - if (hpfs::get_hash(patch_hash, ctx.args.hpfs_dir, conf::PATCH_FILE_PATH) == 1) - { - if (patch_hash != hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH)) - { - - // Appling new patch file changes to hpcore runtime. - if (conf::validate_and_apply_patch_config(conf::cfg.contract, ctx.args.hpfs_dir) == -1) - { - LOG_ERROR << "Appling patch file after contract execution failed"; - } - else - { - // Update global hash tracker with the new patch file hash. - hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, patch_hash); - - unl::update_unl_changes_from_patch(); - } - } - } - if (stop_hpfs_session(ctx) == -1) ret = -1; @@ -170,50 +149,59 @@ namespace sc } /** - * Starts the hpfs read/write virtual filesystem. + * Starts the hpfs virtual filesystem session used for contract execution. */ int start_hpfs_session(execution_context &ctx) { - // In readonly mode, we must start the hpfs process first. - // In RW mode, there is a global hpfs RW process so we only need to create an fs session. - if (ctx.args.readonly) - { - if (hpfs::start_ro_rw_process(ctx.hpfs_pid, ctx.args.hpfs_dir, true, false, false) == -1) - return -1; - } - else - { - ctx.hpfs_pid = hpfs::ctx.hpfs_rw_pid; - } + if (!ctx.args.readonly) + ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME; - if (hpfs::start_fs_session(ctx.args.hpfs_dir) == -1) - return -1; - - return 0; + return ctx.args.readonly ? hpfs::start_ro_session(ctx.args.hpfs_session_name, false) + : hpfs::acquire_rw_session(); } /** - * Stops the hpfs virtual filesystem. + * Stops the hpfs virtual filesystem session. */ int stop_hpfs_session(execution_context &ctx) { - int result = 0; - // Read the root hash if not in readonly mode. - if (!ctx.args.readonly && hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_dir, STATE_DIR_PATH) < 1) - result = -1; + if (ctx.args.readonly) + { + return hpfs::stop_ro_session(ctx.args.hpfs_session_name); + } + else + { + // Read the state hash if not in readonly mode. + if (hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH) < 1) + { + hpfs::release_rw_session(); + return -1; + } - LOG_DEBUG << "Stopping hpfs contract session..." << (ctx.args.readonly ? " (rdonly)" : ""); + util::h32 patch_hash; + const int patch_hash_result = hpfs::get_hash(patch_hash, ctx.args.hpfs_session_name, hpfs::PATCH_FILE_PATH); + if (patch_hash_result == -1) + { + hpfs::release_rw_session(); + return -1; + } + else if (patch_hash_result == 1 && patch_hash != hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH)) + { + // Appling new patch file changes to hpcore runtime. + if (conf::validate_and_apply_patch_config(conf::cfg.contract, ctx.args.hpfs_session_name) == -1) + { + LOG_ERROR << "Appling patch file after contract execution failed"; + } + else + { + // Update global hash tracker with the new patch file hash. + hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, patch_hash); + unl::update_unl_changes_from_patch(); + } + } - if (hpfs::stop_fs_session(ctx.args.hpfs_dir) == -1) - return -1; - - // In readonly mode, we must also stop the hpfs process itself after sopping the session. - // In RW mode, we only need to stop the fs session and let the process keep running. - if (ctx.args.readonly && util::kill_process(ctx.hpfs_pid, true) == -1) - result = -1; - - ctx.hpfs_pid = 0; - return result; + return hpfs::release_rw_session(); + } } /** diff --git a/src/sc.hpp b/src/sc.hpp index 8cc2bf60..f6eb1836 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -15,7 +15,6 @@ namespace sc { constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 64; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 64; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... - constexpr const char *STATE_DIR_PATH = "/state"; // State directory name. struct fd_pair { @@ -60,8 +59,8 @@ namespace sc // Whether the contract should execute in read only mode (to serve read requests). bool readonly = false; - // Hpfs dir path to be used for this execution. - std::string hpfs_dir; + // hpfs session name used for this execution. + std::string hpfs_session_name; // Map of user I/O buffers (map key: user binary public key). // The value is a pair holding consensus-verified inputs and contract-generated outputs. @@ -112,9 +111,6 @@ namespace sc // Holds the contract process id (if currently executing). pid_t contract_pid = 0; - // Holds the hpfs rw process id (if currently executing). - pid_t hpfs_pid = 0; - // Thread to collect contract inputs and outputs and feed npl messages while contract is running. std::thread contract_monitor_thread; diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index 36586538..bb301223 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -217,9 +217,7 @@ namespace read_req */ void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx) { - // Create new folder with the thread id per each thread. - contract_ctx.args.hpfs_dir = conf::ctx.hpfs_dir; - contract_ctx.args.hpfs_dir.append("/rr_").append(std::to_string(thread_id)); + contract_ctx.args.hpfs_session_name = "ro_" + std::to_string(thread_id); contract_ctx.args.readonly = true; sc::contract_iobufs user_bufs; user_bufs.inputs.push_back(read_request.content); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 26f41ec1..47205454 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -8,6 +8,7 @@ #include "../hplog.hpp" #include "../ledger.hpp" #include "../util/buffer_store.hpp" +#include "../hpfs/hpfs.hpp" #include "usr.hpp" #include "user_session_handler.hpp" #include "user_comm_session.hpp" @@ -389,7 +390,8 @@ namespace usr util::fork_detach(); // before execution chdir into a valid the latest state data directory that contains an appbill.table - chdir(conf::ctx.hpfs_rw_dir.c_str()); + const std::string appbill_dir = conf::ctx.hpfs_rw_dir + hpfs::STATE_DIR_PATH; + chdir(appbill_dir.c_str()); int ret = execv(execv_args[0], execv_args); std::cerr << errno << ": Appbill process execv failed.\n"; return false; diff --git a/test/bin/hpfs b/test/bin/hpfs index e3335f49..3a8a1c91 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ