diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index f6d2366f..afa4aedd 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -158,7 +158,8 @@ namespace comm if (std::holds_alternative(client_result)) { const hpws::error error = std::get(client_result); - LOG_ERROR << "Outbound connection hpws error:" << error.first << " " << error.second; + if (error.first != 202) + LOG_ERROR << "Outbound connection hpws error:" << error.first << " " << error.second; } else { diff --git a/src/conf.cpp b/src/conf.cpp index a4b19dee..1ab27d01 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -77,8 +77,8 @@ namespace conf // Recursivly create contract directories. util::create_dir_tree_recursive(ctx.config_dir); util::create_dir_tree_recursive(ctx.hist_dir); - util::create_dir_tree_recursive(ctx.state_rw_dir); util::create_dir_tree_recursive(ctx.log_dir); + util::create_dir_tree_recursive(ctx.state_dir); //Create config file with default settings. @@ -153,6 +153,7 @@ namespace conf ctx.hist_dir = basedir + "/hist"; ctx.state_dir = basedir + "/state"; ctx.state_rw_dir = ctx.state_dir + "/rw"; + ctx.state_serve_dir = ctx.state_dir + "/ss"; ctx.log_dir = basedir + "/log"; } diff --git a/src/conf.hpp b/src/conf.hpp index 8b7f0a47..9588ed2b 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -34,20 +34,21 @@ namespace conf // Holds contextual information about the currently loaded contract. struct contract_ctx { - std::string command; // The CLI command issued to launch HotPocket - std::string exe_dir; // Hot Pocket executable dir. - std::string hpws_exe_path; // hpws executable file path. - std::string hpfs_exe_path; // hpfs executable file path. + std::string command; // The CLI command issued to launch HotPocket + std::string exe_dir; // Hot Pocket executable dir. + std::string hpws_exe_path; // hpws executable file path. + std::string hpfs_exe_path; // hpfs executable file path. - std::string contract_dir; // Contract base directory full path - std::string hist_dir; // Contract ledger history dir full path - std::string state_dir; // Contract state maintenence path (hpfs path) - std::string state_rw_dir; // Contract executation read/write state path. - std::string log_dir; // Contract log dir full path - std::string config_dir; // Contract config dir full path - std::string config_file; // Full path to the contract config file - std::string tls_key_file; // Full path to the tls secret key file - std::string tls_cert_file; // Full path to the tls certificate + std::string contract_dir; // Contract base directory full path + std::string hist_dir; // Contract ledger history dir full path + std::string state_dir; // Contract state maintenence path (hpfs path) + std::string state_rw_dir; // Contract executation read/write state path. + std::string state_serve_dir; // State server hpfs mount path. + std::string log_dir; // Contract log dir full path + std::string config_dir; // Contract config dir full path + std::string config_file; // Full path to the contract config file + std::string tls_key_file; // Full path to the tls secret key file + std::string tls_cert_file; // Full path to the tls certificate }; // Holds all the contract config values. @@ -88,7 +89,7 @@ namespace conf uint64_t peermaxbadsigpm = 0; // Peer bad signatures per minute uint16_t peermaxcons = 0; // Max inbound peer connections - bool msgforwarding = false; // Whether peer message forwarding is on/off. + bool msgforwarding = false; // Whether peer message forwarding is on/off. std::string loglevel; // Log severity level (debug, info, warn, error) LOG_SEVERITY loglevel_type; // Log severity level enum (debug, info, warn, error) diff --git a/src/consensus.cpp b/src/consensus.cpp index 446afd5b..f38b733f 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -79,7 +79,7 @@ namespace consensus /** * Joins the consensus processing thread. - */ + */ void wait() { ctx.consensus_thread.join(); @@ -222,8 +222,8 @@ namespace consensus } /** - * Cleanup any outdated proposals from the candidate set. - */ + * Cleanup any outdated proposals from the candidate set. + */ void purify_candidate_proposals() { auto itr = ctx.candidate_proposals.begin(); @@ -253,9 +253,9 @@ namespace consensus } /** - * Syncrhonise the stage/round time for fixed intervals and reset the stage. - * @return True if consensus can proceed in the current round. False if stage is reset. - */ + * Syncrhonise the stage/round time for fixed intervals and reset the stage. + * @return True if consensus can proceed in the current round. False if stage is reset. + */ bool wait_and_proceed_stage(uint64_t &stage_start) { // Here, nodes try to synchronise nodes stages using network clock. @@ -304,9 +304,9 @@ namespace consensus } /** - * Broadcasts any inputs from locally connected users via an NUP. - * @return 0 for successful broadcast. -1 for failure. - */ + * Broadcasts any inputs from locally connected users via an NUP. + * @return 0 for successful broadcast. -1 for failure. + */ void broadcast_nonunl_proposal() { if (usr::ctx.users.empty()) @@ -347,9 +347,9 @@ namespace consensus } /** - * Verifies the user signatures and populate non-expired user inputs from collected - * non-unl proposals (if any) into consensus candidate data. - */ + * Verifies the user signatures and populate non-expired user inputs from collected + * non-unl proposals (if any) into consensus candidate data. + */ void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) { // Lock the user sessions and the list so any network activity is blocked. @@ -452,11 +452,11 @@ namespace consensus } /** - * Executes the appbill and verifies whether the user has enough account balance to process the provided input. - * @param pubkey User binary pubkey. - * @param input_len Total bytes length of user input. - * @return Whether the user is allowed to process the input or not. - */ + * Executes the appbill and verifies whether the user has enough account balance to process the provided input. + * @param pubkey User binary pubkey. + * @param input_len Total bytes length of user input. + * @return Whether the user is allowed to process the input or not. + */ bool verify_appbill_check(std::string_view pubkey, const size_t input_len) { // If appbill not enabled always green light the input. @@ -618,9 +618,9 @@ namespace consensus } /** - * Broadcasts the given proposal to all connected peers. - * @return 0 on success. -1 if no peers to broadcast. - */ + * Broadcasts the given proposal to all connected peers. + * @return 0 on success. -1 if no peers to broadcast. + */ void broadcast_proposal(const p2p::proposal &p) { flatbuffers::FlatBufferBuilder fbuf(1024); @@ -641,8 +641,8 @@ namespace consensus } /** - * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. - */ + * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. + */ void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes, std::string_view lcl) { int32_t total_lcl_votes = 0; @@ -694,9 +694,9 @@ namespace consensus } /** - * Check state against the winning and canonical state - * @param votes The voting table. - */ + * Check state against the winning and canonical state + * @param votes The voting table. + */ void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes) { for (const auto &[pubkey, cp] : ctx.candidate_proposals) @@ -721,9 +721,9 @@ namespace consensus } /** - * Returns the consensus percentage threshold for the specified stage. - * @param stage The consensus stage [1, 2, 3] - */ + * Returns the consensus percentage threshold for the specified stage. + * @param stage The consensus stage [1, 2, 3] + */ float_t get_stage_threshold(const uint8_t stage) { switch (stage) @@ -739,9 +739,9 @@ namespace consensus } /** - * Finalize the ledger after consensus. - * @param cons_prop The proposal that reached consensus. - */ + * Finalize the ledger after consensus. + * @param cons_prop The proposal that reached consensus. + */ int apply_ledger(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl) { if (ledger::save_ledger(cons_prop) == -1) @@ -787,9 +787,9 @@ namespace consensus } /** - * Dispatch any consensus-reached outputs to matching users if they are connected to us locally. - * @param cons_prop The proposal that achieved consensus. - */ + * Dispatch any consensus-reached outputs to matching users if they are connected to us locally. + * @param cons_prop The proposal that achieved consensus. + */ void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl) { std::scoped_lock lock(usr::ctx.users_mutex); @@ -836,10 +836,10 @@ namespace consensus } /** - * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. - * @param bufmap The contract bufmap which needs to be populated with inputs. - * @param cons_prop The proposal that achieved consensus. - */ + * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. + * @param bufmap The contract bufmap which needs to be populated with inputs. + * @param cons_prop The proposal that achieved consensus. + */ void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) { // Populate the buf map with all currently connected users regardless of whether they have inputs or not. @@ -877,10 +877,10 @@ namespace consensus } /** - * Reads any outputs the contract has produced on the provided buf map and transfers them to candidate outputs - * for the next consensus round. - * @param bufmap The contract bufmap containing the outputs produced by the contract. - */ + * Reads any outputs the contract has produced on the provided buf map and transfers them to candidate outputs + * for the next consensus round. + * @param bufmap The contract bufmap containing the outputs produced by the contract. + */ void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap) { for (auto &[pubkey, bufpair] : bufmap) @@ -899,10 +899,10 @@ namespace consensus } /** - * Increment voting table counter. - * @param counter The counter map in which a vote should be incremented. - * @param candidate The candidate whose vote should be increased by 1. - */ + * Increment voting table counter. + * @param counter The counter map in which a vote should be incremented. + * @param candidate The candidate whose vote should be increased by 1. + */ template void increment(std::map &counter, const T &candidate) { @@ -913,18 +913,15 @@ namespace consensus } /** - * Get the contract state hash. - */ + * Get the contract state hash. + */ int get_initial_state_hash(hpfs::h32 &hash) { - pid_t pid; - std::string mount_dir; - if (hpfs::start_fs_session(pid, mount_dir, "ro", true, 60000) == -1) + if (hpfs::start_fs_session(conf::ctx.state_rw_dir) == -1 || + hpfs::get_hash(ctx.state, conf::ctx.state_rw_dir, "/") == -1 || + hpfs::stop_fs_session(conf::ctx.state_rw_dir) == -1) return -1; - - int res = get_hash(hash, mount_dir, "/"); - util::kill_process(pid, true); - return res; + return 0; } void on_state_sync_completion(const hpfs::h32 new_state) diff --git a/src/hpfs/hpfs.cpp b/src/hpfs/hpfs.cpp index 0fbccab3..7a729b7b 100644 --- a/src/hpfs/hpfs.cpp +++ b/src/hpfs/hpfs.cpp @@ -8,41 +8,23 @@ namespace hpfs { constexpr const char *HPFS_TRACE_ARG_ERROR = "trace=error"; constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=debug"; + constexpr const char *HPFS_HMAP_HASH = "::hpfs.hmap.hash"; + constexpr const char *HPFS_HMAP_CHILDREN = "::hpfs.hmap.children"; + constexpr const char *HPFS_SESSION = "::hpfs.session"; + constexpr ino_t HPFS_ROOT_INO = 2; constexpr uint16_t INIT_CHECK_INTERVAL = 20; - pid_t merge_pid = 0; - bool init_success = false; - const char *active_hpfs_trace_arg; - - int init() - { - active_hpfs_trace_arg = (conf::cfg.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR); - - LOG_INFO << "Starting hpfs merge process..."; - if (start_merge_process() == -1) - return -1; - - LOG_INFO << "Started hpfs merge process. pid:" << merge_pid; - init_success = true; - return 0; - } - - void deinit() - { - if (init_success) - { - LOG_INFO << "Stopping hpfs merge process... pid:" << merge_pid; - if (merge_pid > 0 && util::kill_process(merge_pid, true) == 0) - LOG_INFO << "Stopped hpfs merge process."; - } - } - - int start_merge_process() + /** + * Starts hpfs merge process. + */ + int start_merge_process(pid_t &hpfs_pid) { const pid_t pid = fork(); if (pid > 0) { + LOG_DEBUG << "Starting hpfs merge process..."; + // HotPocket process. util::sleep(INIT_CHECK_INTERVAL); @@ -51,13 +33,16 @@ namespace hpfs if (util::kill_process(pid, false, 0) == -1) return -1; - merge_pid = pid; + hpfs_pid = pid; + LOG_DEBUG << "hpfs merge process started. pid:" << hpfs_pid; } else if (pid == 0) { // hpfs process. util::fork_detach(); + const char *active_hpfs_trace_arg = (conf::cfg.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR); + // Fill process args. char *execv_args[] = { conf::ctx.hpfs_exe_path.data(), @@ -79,15 +64,19 @@ namespace hpfs return 0; } - int start_fs_session(pid_t &session_pid, std::string &mount_dir, - const char *mode, const bool hash_map_enabled, const uint16_t timeout) + /** + * Starts hpfs readonly/readwrite process and also starts a virtual fs session. + */ + int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly, + const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout) { const pid_t pid = fork(); + const char *mode = readonly ? "ro" : "rw"; if (pid > 0) { // HotPocket process. - LOG_DEBUG << "Starting hpfs " << mode << " session..."; + LOG_DEBUG << "Starting hpfs " << mode << " process at " << mount_dir; // If the mount dir is not specified, assign a mount dir based on hpfs process id. if (mount_dir.empty()) @@ -95,11 +84,6 @@ namespace hpfs .append("/") .append(std::to_string(pid)); - // The path used for checking whether hpfs has finished initializing. - const std::string check_path = hash_map_enabled - ? std::string(mount_dir).append("/::hpfs.hmap.hash") - : mount_dir; - // Wait until hpfs is initialized properly. const uint16_t max_retries = timeout / INIT_CHECK_INTERVAL; bool hpfs_initialized = false; @@ -112,35 +96,38 @@ namespace hpfs // Sending signal 0 to test whether process exist. if (util::kill_process(pid, false, 0) == -1) { - LOG_ERROR << "hpfs process " << pid << " has stopped."; + LOG_ERROR << "hpfs process " << pid << " has stopped at " << mount_dir; break; } - // If hash map is enabled we check whether stat succeeds on the root hash. - // If not, we check whether the inode no. of the mounted root dir is 1. + // We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up. struct stat st; - hpfs_initialized = (stat(check_path.c_str(), &st) == 0 && - (hash_map_enabled || st.st_ino == 1)); - - // The only error that warrants a retry is ENOENT (no entry). - // When hpfs is fully initialized we should receive some file from check_path. - if (!hpfs_initialized && errno != ENOENT) + if (stat(mount_dir.c_str(), &st) == -1) { - LOG_ERROR << errno << ": Error in checking hpfs status."; + LOG_ERROR << errno << ": Error in checking hpfs status at " << mount_dir; break; } + hpfs_initialized = (st.st_ino == HPFS_ROOT_INO); + // Keep retrying until root inode no. matches or timeout occurs. + } while (!hpfs_initialized && ++retry_count <= max_retries); - // Kill the process if hpfs couldn't be initialized after the wait period. + // If hpfs FUSE interface initialized within the timeout period, we then attempt to start up a virtual fs session. + // hpfs achieves this by having a 'session' file created. + if (hpfs_initialized && auto_start_session) + start_fs_session(mount_dir); + + // Kill the process if hpfs couldn't be initialized properly. if (!hpfs_initialized) { - LOG_ERROR << "Couldn't initialize hpfs session."; + LOG_ERROR << "Couldn't initialize hpfs session at " << mount_dir; util::kill_process(pid, true); return -1; } - session_pid = pid; + hpfs_pid = pid; + LOG_DEBUG << "hpfs " << mode << " process started at " << mount_dir << " pid:" << hpfs_pid; } else if (pid == 0) { @@ -154,6 +141,8 @@ namespace hpfs .append("/") .append(std::to_string(self_pid)); + const char *active_hpfs_trace_arg = (conf::cfg.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR); + // Fill process args. char *execv_args[] = { conf::ctx.hpfs_exe_path.data(), @@ -170,20 +159,52 @@ namespace hpfs } else { - LOG_ERROR << errno << ": fork() failed when starting hpfs session process."; + LOG_ERROR << errno << ": fork() failed when starting hpfs process."; return -1; } return 0; } + /** + * Starts a virtual fs session on the hpfs process attached to the specified mount dir. + */ + int start_fs_session(std::string_view mount_dir) + { + LOG_DEBUG << "Starting hpfs fs session at " << mount_dir; + + const std::string session_file = std::string(mount_dir).append("/").append(HPFS_SESSION); + if (mknod(session_file.c_str(), 0, 0) == -1) + { + LOG_ERROR << errno << ": Error starting hpfs fs session at " << mount_dir; + return -1; + } + return 0; + } + + /** + * Stops the active virtual fs session on the hpfs process attached to the specified mount dir. + */ + int stop_fs_session(std::string_view mount_dir) + { + LOG_DEBUG << "Stopping hpfs fs session at " << mount_dir; + + const std::string session_file = std::string(mount_dir).append("/").append(HPFS_SESSION); + if (unlink(session_file.c_str()) == -1) + { + LOG_ERROR << errno << ": Error stopping hpfs fs session at " << mount_dir; + return -1; + } + return 0; + } + /** * Populates the hash of the specified vpath. * @return 1 on success. 0 if vpath not found. -1 on error. */ int get_hash(h32 &hash, const std::string_view mount_dir, const std::string_view vpath) { - const std::string path = std::string(mount_dir).append(vpath).append("::hpfs.hmap.hash"); + const std::string path = std::string(mount_dir).append(vpath).append(HPFS_HMAP_HASH); const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1 && errno == ENOENT) { @@ -212,7 +233,7 @@ namespace hpfs */ int get_file_block_hashes(std::vector &hashes, const std::string_view mount_dir, const std::string_view vpath) { - const std::string path = std::string(mount_dir).append(vpath).append("::hpfs.hmap.children"); + const std::string path = std::string(mount_dir).append(vpath).append(HPFS_HMAP_CHILDREN); const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1 && errno == ENOENT) { diff --git a/src/hpfs/hpfs.hpp b/src/hpfs/hpfs.hpp index f27dca44..96028d53 100644 --- a/src/hpfs/hpfs.hpp +++ b/src/hpfs/hpfs.hpp @@ -18,11 +18,11 @@ namespace hpfs } }; - int init(); - void deinit(); - int start_merge_process(); - int start_fs_session(pid_t &session_pid, std::string &mount_dir, - const char *mode, const bool hash_map_enabled, const uint16_t timeout = 4000); + int start_merge_process(pid_t &hpfs_pid); + int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly, + const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout = 4000); + int start_fs_session(std::string_view mount_dir); + int stop_fs_session(std::string_view mount_dir); int get_hash(h32 &hash, const std::string_view mount_dir, const std::string_view vpath); int get_file_block_hashes(std::vector &hashes, const std::string_view mount_dir, const std::string_view vpath); int get_dir_children_hashes(std::vector &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath); diff --git a/src/main.cpp b/src/main.cpp index e3f84152..9c3f953f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -73,9 +73,9 @@ void deinit() state_sync::deinit(); state_serve::deinit(); read_req::deinit(); + sc::deinit(); usr::deinit(); p2p::deinit(); - hpfs::deinit(); } void sigint_handler(int signum) @@ -192,8 +192,14 @@ int main(int argc, char **argv) << (conf::cfg.startup_mode == conf::OPERATING_MODE::OBSERVER ? "Observer" : "Proposer"); LOG_INFO << "Public key: " << conf::cfg.pubkeyhex.substr(2); // Public key without 'ed' prefix. - if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || read_req::init() != 0 || - state_serve::init() != 0 || state_sync::init() != 0 || ledger::init() || consensus::init() != 0) + if (p2p::init() != 0 || + usr::init() != 0 || + sc::init() || + read_req::init() != 0 || + state_serve::init() != 0 || + state_sync::init() != 0 || + ledger::init() || + consensus::init() != 0) { deinit(); return -1; diff --git a/src/sc.cpp b/src/sc.cpp index cee50f11..7a74f2a7 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -10,6 +10,47 @@ namespace sc { const int MAX_NPL_BUF_SIZE = 128 * 1024; + bool init_success = false; + + // We maintain two hpfs global processes for merging and rw sessions. + pid_t hpfs_merge_pid = 0; + pid_t hpfs_rw_pid = 0; + + /** + * Performs system startup activitites related to smart contract execution. + */ + int init() + { + if (hpfs::start_merge_process(hpfs_merge_pid) == -1) + return -1; + + if (hpfs::start_ro_rw_process(hpfs_rw_pid, conf::ctx.state_rw_dir, false, true, false) == -1) + { + // Stop the merge process in case of failure. + util::kill_process(hpfs_merge_pid, true); + return -1; + } + + init_success = true; + return 0; + } + + /** + * Performs global cleanup related to smart contract execution. + */ + void deinit() + { + if (init_success) + { + LOG_DEBUG << "Stopping hpfs rw process... pid:" << hpfs_rw_pid; + if (hpfs_rw_pid > 0 && util::kill_process(hpfs_rw_pid, true) == 0) + LOG_INFO << "Stopped hpfs rw process."; + + LOG_DEBUG << "Stopping hpfs merge process... pid:" << hpfs_merge_pid; + if (hpfs_merge_pid > 0 && util::kill_process(hpfs_merge_pid, true) == 0) + LOG_INFO << "Stopped hpfs merge process."; + } + } /** * Executes the contract process and passes the specified context arguments. @@ -18,7 +59,7 @@ namespace sc int execute_contract(execution_context &ctx) { // Start the hpfs rw session before starting the contract process. - if (start_hpfs_rw_session(ctx) == -1) + if (start_hpfs_session(ctx) == -1) return -1; // Setup io pipes and feed all inputs to them. @@ -122,7 +163,7 @@ namespace sc ret = -1; success: - if (stop_hpfs_rw_session(ctx) == -1) + if (stop_hpfs_session(ctx) == -1) ret = -1; cleanup_fdmap(ctx.userfds); @@ -154,30 +195,41 @@ namespace sc /** * Starts the hpfs read/write state filesystem. */ - int start_hpfs_rw_session(execution_context &ctx) + int start_hpfs_session(execution_context &ctx) { - if (hpfs::start_fs_session(ctx.hpfs_pid, ctx.args.state_dir, ctx.args.readonly ? "ro" : "rw", true) == -1) + // In readonly mode, we must start the hpfs process first. + // In RW mode, there is a global hpfs RW process so we only need to create an fs session. + if (ctx.args.readonly && hpfs::start_ro_rw_process(ctx.hpfs_pid, ctx.args.state_dir, true, false, false) == -1) + return -1; + else + ctx.hpfs_pid = hpfs_rw_pid; + + if (hpfs::start_fs_session(ctx.args.state_dir) == -1) return -1; - LOG_DEBUG << "hpfs session started. pid:" << ctx.hpfs_pid << (ctx.args.readonly ? " (rdonly)" : ""); return 0; } /** * Stops the hpfs state filesystem. */ - int stop_hpfs_rw_session(execution_context &ctx) + int stop_hpfs_session(execution_context &ctx) { int result = 0; // Read the root hash if not in readonly mode. if (!ctx.args.readonly && hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.state_dir, "/") < 1) result = -1; - LOG_DEBUG << "Stopping hpfs session... pid:" << ctx.hpfs_pid << (ctx.args.readonly ? " (rdonly)" : ""); + LOG_DEBUG << "Stopping hpfs contract session..." << (ctx.args.readonly ? " (rdonly)" : ""); - if (util::kill_process(ctx.hpfs_pid, true) == -1) + // In readonly mode, we must also stop the hpfs process itself. + // In RW mode, we only need to stop the fs session and let the RW process keep running. + if (ctx.args.readonly && util::kill_process(ctx.hpfs_pid, true) == -1) result = -1; + if (hpfs::stop_fs_session(ctx.args.state_dir) == -1) + return -1; + ctx.hpfs_pid = 0; return result; } diff --git a/src/sc.hpp b/src/sc.hpp index a4178830..567979b2 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -121,15 +121,19 @@ namespace sc bool should_stop = false; }; + int init(); + + void deinit(); + int execute_contract(execution_context &ctx); //------Internal-use functions for this namespace. int await_process_execution(pid_t pid); - int start_hpfs_rw_session(execution_context &ctx); + int start_hpfs_session(execution_context &ctx); - int stop_hpfs_rw_session(execution_context &ctx); + int stop_hpfs_session(execution_context &ctx); int write_contract_args(const execution_context &ctx); diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp index c4d6df84..2dadc697 100644 --- a/src/state/state_serve.cpp +++ b/src/state/state_serve.cpp @@ -24,11 +24,16 @@ namespace state_serve bool is_shutting_down = false; bool init_success = false; + pid_t hpfs_pid; std::thread state_serve_thread; int init() { REQUEST_BATCH_TIMEOUT = state_common::get_request_resubmit_timeout() * 0.9; + + if (hpfs::start_ro_rw_process(hpfs_pid, conf::ctx.state_serve_dir, true, true, false) == -1) + return -1; + state_serve_thread = std::thread(state_serve_loop); init_success = true; return 0; @@ -40,6 +45,10 @@ namespace state_serve { is_shutting_down = true; state_serve_thread.join(); + + LOG_DEBUG << "Stopping hpfs state serve process... pid:" << hpfs_pid; + if (hpfs_pid > 0 && util::kill_process(hpfs_pid, true) == 0) + LOG_INFO << "Stopped hpfs state serve process."; } } @@ -66,37 +75,45 @@ namespace state_serve const uint64_t time_start = util::get_epoch_milliseconds(); const std::string lcl = ledger::ctx.get_lcl(); - for (auto &[session_id, request] : state_requests) + if (state_requests.empty()) + continue; + + if (hpfs::start_fs_session(conf::ctx.state_serve_dir) != -1) { - if (is_shutting_down) - break; - - // If we have spent too much time handling state requests, abandon the entire batch - // because the requester would have stopped waiting for us. - const uint64_t time_now = util::get_epoch_milliseconds(); - if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT) - break; - - const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data()); - - const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); - flatbuffers::FlatBufferBuilder fbuf(1024); - - if (state_serve::create_state_response(fbuf, sr, lcl) == 1) + for (auto &[session_id, request] : state_requests) { - // Find the peer that we should send the state response to. - std::scoped_lock lock(p2p::ctx.peer_connections_mutex); - const auto peer_itr = p2p::ctx.peer_connections.find(session_id); + if (is_shutting_down) + break; - if (peer_itr != p2p::ctx.peer_connections.end()) + // If we have spent too much time handling state requests, abandon the entire batch + // because the requester would have stopped waiting for us. + const uint64_t time_now = util::get_epoch_milliseconds(); + if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT) + break; + + const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data()); + + const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); + flatbuffers::FlatBufferBuilder fbuf(1024); + + if (state_serve::create_state_response(fbuf, sr, lcl) == 1) { - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + // Find the peer that we should send the state response to. + std::scoped_lock lock(p2p::ctx.peer_connections_mutex); + const auto peer_itr = p2p::ctx.peer_connections.find(session_id); - comm::comm_session *session = peer_itr->second; - session->send(msg); + if (peer_itr != p2p::ctx.peer_connections.end()) + { + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + + comm::comm_session *session = peer_itr->second; + session->send(msg); + } } } + + hpfs::stop_fs_session(conf::ctx.state_serve_dir); } state_requests.clear(); @@ -195,14 +212,9 @@ namespace state_serve int get_data_block(std::vector &block, const std::string_view vpath, const uint32_t block_id, const hpfs::h32 expected_hash) { - pid_t hpfs_pid = 0; - std::string mount_dir; - if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1) - return -1; - // Check whether the existing block hash matches expected hash. std::vector block_hashes; - int result = hpfs::get_file_block_hashes(block_hashes, mount_dir, vpath); + int result = hpfs::get_file_block_hashes(block_hashes, conf::ctx.state_serve_dir, vpath); if (result == 1) { if (block_id >= block_hashes.size()) @@ -218,7 +230,7 @@ namespace state_serve else // Get actual block data. { struct stat st; - const std::string file_path = std::string(mount_dir).append(vpath); + const std::string file_path = std::string(conf::ctx.state_serve_dir).append(vpath); const off_t block_offset = block_id * state_common::BLOCK_SIZE; const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC); if (fd == -1) @@ -253,7 +265,7 @@ namespace state_serve if (res < read_len) { LOG_ERROR << errno << ": Read failed (result:" << res - << " off:" << block_offset << " len:" << read_len << "). " << file_path; + << " off:" << block_offset << " len:" << read_len << "). " << file_path; result = -1; } else @@ -267,8 +279,6 @@ namespace state_serve } } - if (util::kill_process(hpfs_pid, true) == -1) - return -1; return result; } @@ -279,14 +289,9 @@ namespace state_serve int get_data_block_hashes(std::vector &hashes, size_t &file_length, const std::string_view vpath, const hpfs::h32 expected_hash) { - pid_t hpfs_pid = 0; - std::string mount_dir; - if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1) - return -1; - // Check whether the existing file hash matches expected hash. hpfs::h32 file_hash = hpfs::h32_empty; - int result = hpfs::get_hash(file_hash, mount_dir, vpath); + int result = hpfs::get_hash(file_hash, conf::ctx.state_serve_dir, vpath); if (result == 1) { if (file_hash != expected_hash) @@ -295,14 +300,14 @@ namespace state_serve result = 0; } // Get the block hashes. - else if (hpfs::get_file_block_hashes(hashes, mount_dir, vpath) < 0) + else if (hpfs::get_file_block_hashes(hashes, conf::ctx.state_serve_dir, vpath) < 0) { result = -1; } else { // Get actual file length. - const std::string file_path = std::string(mount_dir).append(vpath); + const std::string file_path = std::string(conf::ctx.state_serve_dir).append(vpath); struct stat st; if (stat(file_path.c_str(), &st) == -1) { @@ -314,8 +319,6 @@ namespace state_serve } } - if (util::kill_process(hpfs_pid, true) == -1) - return -1; return result; } @@ -326,14 +329,9 @@ namespace state_serve int get_fs_entry_hashes(std::vector &hash_nodes, const std::string_view vpath, const hpfs::h32 expected_hash) { - pid_t hpfs_pid = 0; - std::string mount_dir; - if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1) - return -1; - // Check whether the existing dir hash matches expected hash. hpfs::h32 dir_hash = hpfs::h32_empty; - int result = hpfs::get_hash(dir_hash, mount_dir, vpath); + int result = hpfs::get_hash(dir_hash, conf::ctx.state_serve_dir, vpath); if (result == 1) { if (dir_hash != expected_hash) @@ -342,7 +340,7 @@ namespace state_serve result = 0; } // Get the children hash nodes. - else if (hpfs::get_dir_children_hashes(hash_nodes, mount_dir, vpath) < 0) + else if (hpfs::get_dir_children_hashes(hash_nodes, conf::ctx.state_serve_dir, vpath) < 0) { result = -1; } @@ -352,8 +350,6 @@ namespace state_serve } } - if (util::kill_process(hpfs_pid, true) == -1) - return -1; return result; } } // namespace state_serve \ No newline at end of file diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp index e77ed8df..bcaf4a81 100644 --- a/src/state/state_sync.cpp +++ b/src/state/state_sync.cpp @@ -34,6 +34,7 @@ namespace state_sync REQUEST_RESUBMIT_TIMEOUT = state_common::get_request_resubmit_timeout(); ctx.target_state = hpfs::h32_empty; ctx.state_sync_thread = std::thread(state_syncer_loop); + ctx.hpfs_mount_dir = conf::ctx.state_rw_dir; init_success = true; return 0; } @@ -88,8 +89,7 @@ namespace state_sync LOG_INFO << "State sync: Starting sync for target state: " << ctx.target_state; } - pid_t hpfs_pid = 0; - if (hpfs::start_fs_session(hpfs_pid, ctx.hpfs_mount_dir, "rw", true) != -1) + if (hpfs::start_fs_session(ctx.hpfs_mount_dir) != -1) { while (!ctx.is_shutting_down) { @@ -119,10 +119,8 @@ namespace state_sync } } } - - // Stop hpfs rw session. - LOG_DEBUG << "State sync: Stopping hpfs session... pid:" << hpfs_pid; - util::kill_process(hpfs_pid, true); + + hpfs::stop_fs_session(ctx.hpfs_mount_dir); } else { diff --git a/test/bin/hpfs b/test/bin/hpfs index a6bec3d3..1777cc73 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ