diff --git a/CMakeLists.txt b/CMakeLists.txt index afbaef35..86da05d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,12 +34,14 @@ add_executable(hpcore src/crypto.cpp src/conf.cpp src/hplog.cpp - src/sc.cpp src/bill/corebill.cpp - src/hpfs/hpfs.cpp src/hpfs/hpfs_mount.cpp src/hpfs/hpfs_serve.cpp src/hpfs/hpfs_sync.cpp + src/sc/contract_mount.cpp + src/sc/contract_serve.cpp + src/sc/contract_sync.cpp + src/sc/sc.cpp src/comm/comm_session.cpp src/msg/fbuf/common_helpers.cpp src/msg/fbuf/p2pmsg_helpers.cpp @@ -61,6 +63,9 @@ add_executable(hpcore src/usr/read_req.cpp src/ledger.cpp src/ledger/sqlite.cpp + src/ledger/ledger_mount.cpp + src/ledger/ledger_sync.cpp + src/ledger/ledger_serve.cpp src/ledger/ledger_sample.cpp src/consensus.cpp src/main.cpp diff --git a/src/conf.cpp b/src/conf.cpp index 26e60183..35cdadaa 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -1,7 +1,7 @@ #include "pchheader.hpp" #include "conf.hpp" #include "crypto.hpp" -#include "hpfs/hpfs.hpp" +#include "sc/sc.hpp" #include "util/util.hpp" namespace conf @@ -109,9 +109,13 @@ namespace conf if (util::create_dir_tree_recursive(ctx.config_dir) == -1 || util::create_dir_tree_recursive(ctx.hist_dir) == -1 || util::create_dir_tree_recursive(ctx.full_hist_dir) == -1 || - util::create_dir_tree_recursive(ctx.contract_log_dir) == -1 || - util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH) == -1 || - util::create_dir_tree_recursive(ctx.hpfs_mount_dir) == -1) + util::create_dir_tree_recursive(ctx.log_dir) == -1 || + util::create_dir_tree_recursive(ctx.contract_hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH) == -1 || + util::create_dir_tree_recursive(ctx.contract_hpfs_mount_dir) == -1 || + util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + hpfs::LEDGER_PRIMARY_DIR) == -1 || + util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + hpfs::LEDGER_BLOB_DIR) == -1 || + util::create_dir_tree_recursive(ctx.ledger_hpfs_mount_dir) == -1 || + util::create_dir_tree_recursive(ctx.contract_log_dir) == -1) { std::cerr << "ERROR: unable to create directories.\n"; return -1; @@ -204,9 +208,12 @@ namespace conf ctx.tls_cert_file = ctx.config_dir + "/tlscert.pem"; ctx.hist_dir = basedir + "/hist"; ctx.full_hist_dir = basedir + "/fullhist"; - ctx.hpfs_dir = basedir + "/hpfs"; - ctx.hpfs_mount_dir = ctx.hpfs_dir + "/mnt"; - ctx.hpfs_rw_dir = ctx.hpfs_mount_dir + "/rw"; + ctx.contract_hpfs_dir = basedir + "/contract_fs"; + ctx.contract_hpfs_mount_dir = ctx.contract_hpfs_dir + "/mnt"; + ctx.contract_hpfs_rw_dir = ctx.contract_hpfs_mount_dir + "/rw"; + ctx.ledger_hpfs_dir = basedir + "/ledger_fs"; + ctx.ledger_hpfs_mount_dir = ctx.ledger_hpfs_dir + "/mnt"; + ctx.ledger_hpfs_rw_dir = ctx.ledger_hpfs_mount_dir + "/rw"; ctx.log_dir = basedir + "/log"; ctx.contract_log_dir = ctx.log_dir + "/contract"; } @@ -586,12 +593,13 @@ namespace conf */ int validate_contract_dir_paths() { - const std::string paths[9] = { + const std::string paths[10] = { ctx.contract_dir, ctx.config_file, ctx.hist_dir, ctx.full_hist_dir, - ctx.hpfs_dir, + ctx.contract_hpfs_dir, + ctx.ledger_hpfs_dir, ctx.tls_key_file, ctx.tls_cert_file, ctx.hpfs_exe_path, @@ -674,7 +682,7 @@ namespace conf jsoncons::ojson jdoc; populate_contract_section_json(jdoc, cfg.contract, true); - const std::string patch_file_path = hpfs::contract_fs.physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); + const std::string patch_file_path = sc::contract_fs.physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); return write_json_file(patch_file_path, jdoc); } @@ -686,7 +694,7 @@ namespace conf */ int apply_patch_config(std::string_view hpfs_session_name) { - const std::string path = hpfs::contract_fs.physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH); + const std::string path = sc::contract_fs.physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH); if (!util::is_file_exists(path)) return 0; @@ -909,7 +917,7 @@ namespace conf // Uncomment for docker-based execution. // std::string volumearg; - // volumearg.append("type=bind,source=").append(ctx.hpfs_dir).append(",target=/hpfs"); + // volumearg.append("type=bind,source=").append(ctx.contract_hpfs_dir).append(",target=/hpfs"); // const char *dockerargs[] = {"/usr/bin/docker", "run", "--rm", "-i", "--mount", volumearg.data(), contract.bin_path.data()}; // contract.runtime_binexec_args.insert(contract.runtime_binexec_args.begin(), std::begin(dockerargs), std::end(dockerargs)); diff --git a/src/conf.hpp b/src/conf.hpp index 668c2ce4..f318d06b 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -143,18 +143,21 @@ namespace conf std::string hpws_exe_path; // hpws executable file path. std::string hpfs_exe_path; // hpfs executable file path. - std::string contract_dir; // Contract base directory full path. - std::string full_hist_dir; // Contract full history dir full path. - std::string hist_dir; // Contract ledger history dir full path. - std::string hpfs_dir; // hpfs metdata dir (The location of hpfs log file). - std::string hpfs_mount_dir; // hpfs fuse file system mount path. - std::string hpfs_rw_dir; // hpfs read/write fs session path. - std::string log_dir; // HotPocket log dir full path. - std::string contract_log_dir; // Contract log dir full path. - std::string config_dir; // Config dir full path. - std::string config_file; // Full path to the config file. - std::string tls_key_file; // Full path to the tls private key file. - std::string tls_cert_file; // Full path to the tls certificate. + std::string contract_dir; // Contract base directory full path. + std::string full_hist_dir; // Contract full history dir full path. + std::string hist_dir; // Contract ledger history dir full path. + std::string contract_hpfs_dir; // Contract hpfs metdata dir (The location of hpfs log file). + std::string contract_hpfs_mount_dir; // Contract hpfs fuse file system mount path. + std::string contract_hpfs_rw_dir; // Contract hpfs read/write fs session path. + std::string ledger_hpfs_dir; // Ledger hpfs metdata dir (The location of hpfs log file). + std::string ledger_hpfs_mount_dir; // Ledger hpfs fuse file system mount path. + std::string ledger_hpfs_rw_dir; // Ledger hpfs read/write fs session path. + std::string log_dir; // HotPocket log dir full path. + std::string contract_log_dir; // Contract log dir full path. + std::string config_dir; // Config dir full path. + std::string config_file; // Full path to the config file. + std::string tls_key_file; // Full path to the tls private key file. + std::string tls_cert_file; // Full path to the tls certificate. int config_fd; // Config file file descriptor. struct flock config_lock; // Config file lock. diff --git a/src/consensus.cpp b/src/consensus.cpp index 8e6cb1d7..2bb4ca5d 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -10,10 +10,7 @@ #include "p2p/peer_session_handler.hpp" #include "hplog.hpp" #include "crypto.hpp" -#include "sc.hpp" #include "util/h32.hpp" -#include "hpfs/hpfs.hpp" -#include "hpfs/hpfs_sync.hpp" #include "unl.hpp" #include "ledger.hpp" #include "consensus.hpp" @@ -113,9 +110,8 @@ namespace consensus // Get current lcl and state. std::string lcl = ledger::ctx.get_lcl(); const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); - hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; // Ref of the contract_fs object. - util::h32 state_hash = contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH); - util::h32 patch_hash = contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH); + util::h32 state_hash = sc::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH); + util::h32 patch_hash = sc::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH); if (ctx.stage == 0) { @@ -214,7 +210,7 @@ namespace consensus sync_target_list.push(hpfs::sync_target{"state", majority_state_hash, hpfs::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR}); // Set sync targets for contract fs. - hpfs::contract_sync.set_target(std::move(sync_target_list)); + sc::contract_sync_worker.set_target(std::move(sync_target_list)); } // Proceed further only if both lcl and state are in sync with majority. @@ -238,7 +234,7 @@ namespace consensus */ void check_sync_completion() { - if (conf::cfg.node.role == conf::ROLE::OBSERVER && !hpfs::contract_sync.ctx.is_syncing && !ledger::sync_ctx.is_syncing) + if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::sync_ctx.is_syncing) conf::change_role(conf::ROLE::VALIDATOR); } @@ -770,7 +766,7 @@ namespace consensus } } - is_state_desync = (hpfs::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH) != majority_state_hash); + is_state_desync = (sc::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH) != majority_state_hash); } /** @@ -796,7 +792,7 @@ namespace consensus } } - is_patch_desync = (hpfs::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH) != majority_patch_hash); + is_patch_desync = (sc::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH) != majority_patch_hash); } /** @@ -883,7 +879,7 @@ namespace consensus } // Update state hash in contract fs global hash tracker. - hpfs::contract_fs.set_parent_hash(hpfs::STATE_DIR_PATH, args.post_execution_state_hash); + sc::contract_fs.set_parent_hash(hpfs::STATE_DIR_PATH, args.post_execution_state_hash); new_state_hash = args.post_execution_state_hash; extract_user_outputs_from_contract_bufmap(args.userbufs); @@ -1052,18 +1048,16 @@ namespace consensus */ int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 ¤t_patch_hash) { - hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; - // Check whether is there any patch changes to be applied which reached consensus. if (is_patch_update_pending && current_patch_hash == prop_patch_hash) { - if (contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1) + if (sc::contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1) { // Appling new patch file changes to hpcore runtime. if (conf::apply_patch_config(HPFS_SESSION_NAME) == -1) { LOG_ERROR << "Appling patch file changes after consensus failed."; - contract_fs.stop_ro_session(HPFS_SESSION_NAME); + sc::contract_fs.stop_ro_session(HPFS_SESSION_NAME); return -1; } else @@ -1073,7 +1067,7 @@ namespace consensus } } - if (contract_fs.stop_ro_session(HPFS_SESSION_NAME) == -1) + if (sc::contract_fs.stop_ro_session(HPFS_SESSION_NAME) == -1) return -1; } return 0; diff --git a/src/consensus.hpp b/src/consensus.hpp index 68a45f57..d6e9863f 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -5,11 +5,10 @@ #include "util/util.hpp" #include "util/buffer_store.hpp" #include "util/merkle_hash_tree.hpp" -#include "sc.hpp" +#include "./sc/sc.hpp" #include "p2p/p2p.hpp" #include "usr/user_input.hpp" #include "util/h32.hpp" -#include "sc.hpp" namespace consensus { diff --git a/src/hpfs/hpfs.cpp b/src/hpfs/hpfs.cpp deleted file mode 100644 index 0c142070..00000000 --- a/src/hpfs/hpfs.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "./hpfs.hpp" -#include "../conf.hpp" -#include "./hpfs_serve.hpp" - -namespace hpfs -{ - hpfs::hpfs_mount contract_fs; // Global contract file system instance. - hpfs::hpfs_sync contract_sync; // Global contract file system sync instance. - hpfs::hpfs_serve contract_serve; - - /** - * Initialize necessary file system mounts to hpcore. - */ - int init() - { - if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.hpfs_dir, conf::ctx.hpfs_mount_dir, conf::ctx.hpfs_rw_dir, conf::cfg.node.full_history) == -1) - { - LOG_ERROR << "Contract file system initialization failed."; - return -1; - } - - if (contract_serve.init("contract", &contract_fs) == -1) - { - LOG_ERROR << "Contract file system serve worker initialization failed."; - return -1; - } - - if (contract_sync.init("contract", &contract_fs) == -1) - { - LOG_ERROR << "Contract file system sync worker initialization failed."; - return -1; - } - - return 0; - } - - /** - * Perform cleanups on created mounts. - */ - void deinit() - { - contract_fs.deinit(); - contract_serve.deinit(); - contract_sync.deinit(); - } - -} // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs.hpp b/src/hpfs/hpfs.hpp deleted file mode 100644 index 36678058..00000000 --- a/src/hpfs/hpfs.hpp +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef _HP_HPFS_HPFS -#define _HP_HPFS_HPFS - -#include "./hpfs_mount.hpp" -#include "./hpfs_sync.hpp" - -namespace hpfs -{ - constexpr int32_t CONTRACT_FS_ID = 0; - - extern hpfs::hpfs_mount contract_fs; // Global contract file system instance. - extern hpfs::hpfs_sync contract_sync; // Global contract file system sync instance. - int init(); - void deinit(); - -} // namespace hpfs -#endif \ No newline at end of file diff --git a/src/hpfs/hpfs_mount.cpp b/src/hpfs/hpfs_mount.cpp index 8b0fa9f7..3e37595d 100644 --- a/src/hpfs/hpfs_mount.cpp +++ b/src/hpfs/hpfs_mount.cpp @@ -3,7 +3,7 @@ #include "../hplog.hpp" #include "../util/util.hpp" #include "../util/h32.hpp" -#include "../sc.hpp" +#include "../sc/sc.hpp" namespace hpfs { @@ -23,7 +23,7 @@ namespace hpfs /** * This should be called to activate the hpfs mount process. */ - int hpfs_mount::init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history) + int hpfs_mount::init(const uint32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history) { this->mount_id = mount_id; this->fs_dir = fs_dir; @@ -62,23 +62,6 @@ namespace hpfs */ int hpfs_mount::prepare_fs() { - // This contract mount specific preparation logic will be moved to a seprate child class in the next PBI. - util::h32 initial_state_hash; - util::h32 initial_patch_hash; - - if (acquire_rw_session() == -1 || - conf::populate_patch_config() == -1 || - get_hash(initial_state_hash, RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 || - get_hash(initial_patch_hash, RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 || - release_rw_session() == -1) - { - LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << "."; - return -1; - } - - set_parent_hash(hpfs::STATE_DIR_PATH, initial_state_hash); - set_parent_hash(hpfs::PATCH_FILE_PATH, initial_patch_hash); - LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash; return 0; } diff --git a/src/hpfs/hpfs_mount.hpp b/src/hpfs/hpfs_mount.hpp index 51b43305..a37e1b9c 100644 --- a/src/hpfs/hpfs_mount.hpp +++ b/src/hpfs/hpfs_mount.hpp @@ -7,10 +7,12 @@ namespace hpfs { - constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; - constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. - constexpr const char *STATE_DIR_PATH = "/state"; // State directory name. - constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename. + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions. + constexpr const char *STATE_DIR_PATH = "/state"; // State directory name. + constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename. + constexpr const char *LEDGER_PRIMARY_DIR = "/primary"; // Ledger primary directory name. + constexpr const char *LEDGER_BLOB_DIR = "/blob"; // Ledger blob directory name. struct child_hash_node { @@ -37,7 +39,6 @@ namespace hpfs private: pid_t hpfs_pid = 0; std::string fs_dir; - std::string mount_dir; bool is_full_history; bool init_success = false; // Keeps the hashes of hpfs parents against its vpath. @@ -49,12 +50,13 @@ namespace hpfs std::mutex rw_mutex; protected: + std::string mount_dir; virtual int prepare_fs(); public: - int32_t mount_id; // Used in hpfs serving and syncing. + uint32_t mount_id; // Used in hpfs serving and syncing. std::string rw_dir; - int init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history); + int init(const uint32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history); void deinit(); int start_hpfs_process(); diff --git a/src/hpfs/hpfs_serve.cpp b/src/hpfs/hpfs_serve.cpp index 30d84686..24ae5477 100644 --- a/src/hpfs/hpfs_serve.cpp +++ b/src/hpfs/hpfs_serve.cpp @@ -57,7 +57,6 @@ namespace hpfs LOG_INFO << "Hpfs " << name << " server started."; - std::list> hpfs_requests; // Indicates whether any requests were processed in the previous loop iteration. bool prev_requests_processed = false; @@ -68,7 +67,7 @@ namespace hpfs if (!prev_requests_processed) util::sleep(LOOP_WAIT); - swap_collected_requests(hpfs_requests); + swap_collected_requests(); prev_requests_processed = !hpfs_requests.empty(); const uint64_t time_start = util::get_epoch_milliseconds(); @@ -353,15 +352,4 @@ namespace hpfs return result; } - /** - * Move the collected requests from hpfs requests to the local hpfs request list. - */ - void hpfs_serve::swap_collected_requests(std::list> &hpfs_requests) - { - std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_requests_mutex); - - // Move collected hpfs requests for contract fs over to local requests list. - if (!p2p::ctx.collected_msgs.contract_hpfs_requests.empty()) - hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.contract_hpfs_requests); - } } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_serve.hpp b/src/hpfs/hpfs_serve.hpp index 367d2ccd..53b73a2c 100644 --- a/src/hpfs/hpfs_serve.hpp +++ b/src/hpfs/hpfs_serve.hpp @@ -21,7 +21,9 @@ namespace hpfs void hpfs_serve_loop(); protected: - virtual void swap_collected_requests(std::list> &hpfs_requests); // Must override in child classes. + std::list> hpfs_requests; + // Move the collected requests from hpfs requests to a local response list. + virtual void swap_collected_requests() = 0; // Must override in child classes. public: int init(std::string_view name, hpfs::hpfs_mount *fs_mount); diff --git a/src/hpfs/hpfs_sync.cpp b/src/hpfs/hpfs_sync.cpp index b192111d..82975a75 100644 --- a/src/hpfs/hpfs_sync.cpp +++ b/src/hpfs/hpfs_sync.cpp @@ -8,7 +8,7 @@ #include "../util/util.hpp" #include "../util/h32.hpp" #include "hpfs_sync.hpp" -#include "../sc.hpp" +#include "../sc/sc.hpp" #include "../unl.hpp" namespace hpfs @@ -38,7 +38,7 @@ namespace hpfs this->name = name; this->fs_mount = fs_mount; REQUEST_RESUBMIT_TIMEOUT = hpfs::get_request_resubmit_timeout(); - ctx.hpfs_sync_thread = std::thread(&hpfs_sync::hpfs_syncer_loop, this); + hpfs_sync_thread = std::thread(&hpfs_sync::hpfs_syncer_loop, this); init_success = true; return 0; } @@ -50,9 +50,9 @@ namespace hpfs { if (init_success) { - ctx.is_syncing = false; - ctx.is_shutting_down = true; - ctx.hpfs_sync_thread.join(); + is_syncing = false; + is_shutting_down = true; + hpfs_sync_thread.join(); } } @@ -67,15 +67,15 @@ namespace hpfs return; // Do not do anything if we are already syncing towards the specified target states. - if (ctx.is_shutting_down || (ctx.is_syncing && ctx.original_target_list == target_list)) + if (is_shutting_down || (is_syncing && original_target_list == target_list)) return; - ctx.original_target_list = target_list; - ctx.target_list = std::move(target_list); + this->original_target_list = target_list; + this->target_list = std::move(target_list); - std::unique_lock lock(ctx.current_target_mutex); - ctx.current_target = ctx.target_list.front(); // Make the first element of the list the first target to sync. - ctx.is_syncing = true; + std::unique_lock lock(current_target_mutex); + current_target = target_list.front(); // Make the first element of the list the first target to sync. + is_syncing = true; } /** @@ -85,42 +85,42 @@ namespace hpfs { util::mask_signal(); - LOG_INFO << "hpfs " << name << " sync: Worker started."; + LOG_INFO << "Hpfs " << name << " sync: Worker started."; - while (!ctx.is_shutting_down) + while (!is_shutting_down) { util::sleep(IDLE_WAIT); // Keep idling if we are not doing any sync activity. - if (!ctx.is_syncing) + if (!is_syncing) continue; if (fs_mount->acquire_rw_session() != -1) { - while (!ctx.is_shutting_down) + while (!is_shutting_down) { { - std::shared_lock lock(ctx.current_target_mutex); - LOG_INFO << "hpfs " << name << " sync: Starting sync for target " << ctx.current_target.name << " hash: " << ctx.current_target.hash; + std::shared_lock lock(current_target_mutex); + LOG_INFO << "Hpfs " << name << " sync: Starting sync for target " << current_target.name << " hash: " << current_target.hash; } util::h32 new_state = util::h32_empty; - const int result = request_loop(ctx.current_target.hash, new_state); + const int result = request_loop(current_target.hash, new_state); - ctx.pending_requests.clear(); - ctx.candidate_hpfs_responses.clear(); - ctx.submitted_requests.clear(); + pending_requests.clear(); + candidate_hpfs_responses.clear(); + submitted_requests.clear(); - if (result == -1 || result == 1 || ctx.is_shutting_down) + if (result == -1 || result == 1 || is_shutting_down) break; { - std::shared_lock lock(ctx.current_target_mutex); + std::shared_lock lock(current_target_mutex); - if (new_state == ctx.current_target.hash) + if (new_state == current_target.hash) { - LOG_INFO << "hpfs " << name << " sync: Target " << ctx.current_target.name << " hash achieved: " << new_state; - on_current_sync_state_acheived(); + LOG_INFO << "Hpfs " << name << " sync: Target " << current_target.name << " hash achieved: " << new_state; + on_current_sync_state_acheived(new_state); // Start syncing to next target. const int result = start_syncing_next_target(); @@ -131,26 +131,26 @@ namespace hpfs } else { - LOG_INFO << "hpfs " << name << " sync: Continuing sync for new target: " << ctx.current_target.hash; + LOG_INFO << "Hpfs " << name << " sync: Continuing sync for new target: " << current_target.hash; continue; } } } - LOG_INFO << "hpfs " << name << " sync: All parents synced."; + LOG_INFO << "Hpfs " << name << " sync: All parents synced."; fs_mount->release_rw_session(); } else { - LOG_ERROR << "hpfs " << name << " sync: Failed to start hpfs rw session"; + LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session"; } // Clear target list and original target list since the sync is complete. - ctx.target_list = {}; - ctx.original_target_list = {}; - ctx.is_syncing = false; + target_list = {}; + original_target_list = {}; + is_syncing = false; } - LOG_INFO << "hpfs " << name << " sync: Worker stopped."; + LOG_INFO << "Hpfs " << name << " sync: Worker stopped."; } /** @@ -158,12 +158,12 @@ namespace hpfs * @return -1 on error. 0 when current sync state acheived or sync is stopped due to target change. * Returns 1 on successfully finishing all the sync targets. */ - int hpfs_sync::request_loop(const util::h32 current_target, util::h32 &updated_state) + int hpfs_sync::request_loop(const util::h32 current_target_hash, util::h32 &updated_state) { std::string lcl = ledger::ctx.get_lcl(); // Send the initial root hpfs request of the current target. - submit_request(backlog_item{ctx.current_target.item_type, ctx.current_target.vpath, -1, current_target}, lcl); + submit_request(backlog_item{this->current_target.item_type, this->current_target.vpath, -1, current_target_hash}, lcl); // Indicates whether any responses were processed in the previous loop iteration. bool prev_responses_processed = false; @@ -171,7 +171,7 @@ namespace hpfs // No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response) uint16_t resubmissions_count = 0; - while (!should_stop_request_loop(current_target)) + while (!should_stop_request_loop(current_target_hash)) { // Wait a small delay if there were no responses processed during previous iteration. if (!prev_responses_processed) @@ -183,18 +183,18 @@ namespace hpfs // Move the received hpfs responses to the local response list. swap_collected_responses(); - prev_responses_processed = !ctx.candidate_hpfs_responses.empty(); + prev_responses_processed = !candidate_hpfs_responses.empty(); // Reset resubmissions counter whenever we have a resposne. - if (!ctx.candidate_hpfs_responses.empty()) + if (!candidate_hpfs_responses.empty()) resubmissions_count = 0; - for (auto &response : ctx.candidate_hpfs_responses) + for (auto &response : candidate_hpfs_responses) { - if (should_stop_request_loop(current_target)) + if (should_stop_request_loop(current_target_hash)) return 0; - LOG_DEBUG << "hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]"; + LOG_DEBUG << "Hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]"; const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data()); const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message(); @@ -204,10 +204,10 @@ namespace hpfs std::string_view vpath = msg::fbuf::flatbuff_str_to_sv(resp_msg->path()); const std::string key = std::string(vpath).append(hash); - const auto pending_resp_itr = ctx.submitted_requests.find(key); - if (pending_resp_itr == ctx.submitted_requests.end()) + const auto pending_resp_itr = submitted_requests.find(key); + if (pending_resp_itr == submitted_requests.end()) { - LOG_DEBUG << "hpfs " << name << " sync: Skipping hpfs response due to hash mismatch."; + LOG_DEBUG << "Hpfs " << name << " sync: Skipping hpfs response due to hash mismatch."; continue; } @@ -225,7 +225,7 @@ namespace hpfs // Validate received fs data against the hash. if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map)) { - LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch."; + LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch."; continue; } @@ -242,7 +242,7 @@ namespace hpfs // Validate received hashmap against the hash. if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count)) { - LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch."; + LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch."; continue; } @@ -259,7 +259,7 @@ namespace hpfs // Validate received block data against the hash. if (!validate_file_block_hash(hash, block_id, buf)) { - LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch."; + LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch."; continue; } @@ -267,30 +267,30 @@ namespace hpfs } // Now that we have received matching hash and handled it, remove it from the waiting list. - ctx.submitted_requests.erase(pending_resp_itr); + submitted_requests.erase(pending_resp_itr); // After handling each response, check whether we have reached target hpfs state. // get_hash returns 0 incase target parent is not existing in our side. - if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, ctx.current_target.vpath) == -1) + if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, this->current_target.vpath) == -1) { - LOG_ERROR << "hpfs " << name << " sync: exiting due to hash check error."; + LOG_ERROR << "Hpfs " << name << " sync: exiting due to hash check error."; return -1; } // Update the central hpfs state tracker. - fs_mount->set_parent_hash(ctx.current_target.vpath, updated_state); + fs_mount->set_parent_hash(this->current_target.vpath, updated_state); - LOG_DEBUG << "hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target; - if (updated_state == current_target) + LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash; + if (updated_state == current_target_hash) return 0; } - ctx.candidate_hpfs_responses.clear(); + candidate_hpfs_responses.clear(); // Check for long-awaited responses and re-request them. - for (auto &[hash, request] : ctx.submitted_requests) + for (auto &[hash, request] : submitted_requests) { - if (should_stop_request_loop(current_target)) + if (should_stop_request_loop(current_target_hash)) return 0; if (request.waiting_time < REQUEST_RESUBMIT_TIMEOUT) @@ -302,9 +302,9 @@ namespace hpfs { if (++resubmissions_count > ABANDON_THRESHOLD) { - LOG_INFO << "hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync."; + LOG_INFO << "Hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync."; - std::shared_lock lock(ctx.current_target_mutex); + std::shared_lock lock(current_target_mutex); const int result = start_syncing_next_target(); if (result == 0) return 1; // To stop syncing since we have sync all the targets. @@ -313,23 +313,23 @@ namespace hpfs // Reset the counter and re-submit request. request.waiting_time = 0; - LOG_DEBUG << "hpfs " << name << " sync: Resubmitting request..."; + LOG_DEBUG << "Hpfs " << name << " sync: Resubmitting request..."; submit_request(request, lcl); } } // Check whether we can submit any more requests. - if (!ctx.pending_requests.empty() && ctx.submitted_requests.size() < MAX_AWAITING_REQUESTS) + if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS) { - const uint16_t available_slots = MAX_AWAITING_REQUESTS - ctx.submitted_requests.size(); - for (int i = 0; i < available_slots && !ctx.pending_requests.empty(); i++) + const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size(); + for (int i = 0; i < available_slots && !pending_requests.empty(); i++) { - if (should_stop_request_loop(current_target)) + if (should_stop_request_loop(current_target_hash)) return 0; - const backlog_item &request = ctx.pending_requests.front(); + const backlog_item &request = pending_requests.front(); submit_request(request, lcl); - ctx.pending_requests.pop_front(); + pending_requests.pop_front(); } } } @@ -405,14 +405,14 @@ namespace hpfs /** * Indicates whether to break out of hpfs request processing loop. */ - bool hpfs_sync::should_stop_request_loop(const util::h32 ¤t_target) + bool hpfs_sync::should_stop_request_loop(const util::h32 ¤t_target_hash) { - if (ctx.is_shutting_down) + if (is_shutting_down) return true; // Stop request loop if the target has changed. - std::shared_lock lock(ctx.current_target_mutex); - return current_target != ctx.current_target.hash; + std::shared_lock lock(current_target_mutex); + return current_target_hash != this->current_target.hash; } /** @@ -445,14 +445,14 @@ namespace hpfs { const std::string key = std::string(request.path) .append(reinterpret_cast(&request.expected_hash), sizeof(util::h32)); - ctx.submitted_requests.try_emplace(key, request); + submitted_requests.try_emplace(key, request); const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; std::string target_pubkey; request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey); if (!target_pubkey.empty()) - LOG_DEBUG << "hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type + LOG_DEBUG << "Hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type << " path:" << request.path << " block_id:" << request.block_id << " hash:" << request.expected_hash; } @@ -466,7 +466,7 @@ namespace hpfs int hpfs_sync::handle_fs_entry_response(std::string_view vpath, std::unordered_map &fs_entry_map) { // Get the parent path of the fs entries we have received. - LOG_DEBUG << "hpfs " << name << " sync: Processing fs entries response for " << vpath; + LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response for " << vpath; // Create physical directory on our side if not exist. std::string parent_physical_path = fs_mount->rw_dir + vpath.data(); @@ -494,9 +494,9 @@ namespace hpfs { // Prioritize file hpfs requests over directories. if (ex_entry.is_file) - ctx.pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash}); + pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash}); else - ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash}); + pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash}); } fs_entry_map.erase(peer_itr); @@ -510,7 +510,7 @@ namespace hpfs !ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1) return -1; - LOG_DEBUG << "hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; + LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; } } @@ -524,9 +524,9 @@ namespace hpfs // Prioritize file hpfs requests over directories. if (fs_entry.is_file) - ctx.pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash}); + pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash}); else - ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash}); + pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash}); } return 0; @@ -543,7 +543,7 @@ namespace hpfs int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length) { // Get the file path of the block hashes we have received. - LOG_DEBUG << "hpfs " << name << " sync: Processing file block hashes response for " << vpath; + LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response for " << vpath; // File block hashes on our side (file might not exist on our side). std::vector existing_hashes; @@ -552,13 +552,13 @@ namespace hpfs const size_t existing_hash_count = existing_hashes.size(); // Compare the block hashes and request any differences. - auto insert_itr = ctx.pending_requests.begin(); + auto insert_itr = pending_requests.begin(); const int32_t max_block_id = MAX(existing_hash_count, hash_count) - 1; for (int32_t block_id = 0; block_id <= max_block_id; block_id++) { // Insert at front to give priority to block requests while preserving block order. if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id]) - ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}); + pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]}); } if (existing_hashes.size() >= hash_count) @@ -581,7 +581,7 @@ namespace hpfs */ int hpfs_sync::handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf) { - LOG_DEBUG << "hpfs " << name << " sync: Writing block_id " << block_id + LOG_DEBUG << "Hpfs " << name << " sync: Writing block_id " << block_id << " (len:" << buf.length() << ") of " << vpath; @@ -609,25 +609,8 @@ namespace hpfs * This method can be used to invoke mount specific custom logic (after extending this super class) to be executed after * a sync target is acheived. */ - void hpfs_sync::on_current_sync_state_acheived() + void hpfs_sync::on_current_sync_state_acheived(const util::h32 &acheived_hash) { - if (ctx.current_target.vpath == hpfs::PATCH_FILE_PATH) - { - // Appling new patch file changes to hpcore runtime. - if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1) - { - LOG_ERROR << "Appling patch file changes after sync failed"; - } - else - { - unl::update_unl_changes_from_patch(); - - // Update global hash tracker with the new patch file hash. - util::h32 updated_patch_hash; - fs_mount->get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH); - fs_mount->set_parent_hash(ctx.current_target.vpath, updated_patch_hash); - } - } } /** @@ -636,30 +619,17 @@ namespace hpfs */ int hpfs_sync::start_syncing_next_target() { - ctx.target_list.pop(); // Remove the synced parent from the target list. - if (ctx.target_list.empty()) + target_list.pop(); // Remove the synced parent from the target list. + if (target_list.empty()) { - ctx.current_target = {}; + current_target = {}; return 0; } else { - ctx.current_target = ctx.target_list.front(); + current_target = target_list.front(); return 1; } } - /** - * Move the collected responses from hpfs responses to a local response list. - */ - void hpfs_sync::swap_collected_responses() - { - // This logic will be added to a child class in next PBI. - std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_responses_mutex); - - // Move collected hpfs responses over to local candidate responses list. - if (!p2p::ctx.collected_msgs.contract_hpfs_responses.empty()) - ctx.candidate_hpfs_responses.splice(ctx.candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.contract_hpfs_responses); - } - } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs_sync.hpp b/src/hpfs/hpfs_sync.hpp index a9af42ac..2fd4d2a1 100644 --- a/src/hpfs/hpfs_sync.hpp +++ b/src/hpfs/hpfs_sync.hpp @@ -6,6 +6,7 @@ #include "../msg/fbuf/p2pmsg_content_generated.h" #include "../util/h32.hpp" #include "../crypto.hpp" +#include "./hpfs_mount.hpp" namespace hpfs { @@ -43,50 +44,49 @@ namespace hpfs } }; - struct sync_context + class hpfs_sync { - // The current target hashes we are syncing towards. - std::queue target_list; + private: + bool init_success = false; + uint16_t REQUEST_RESUBMIT_TIMEOUT; // No. of milliseconds to wait before resubmitting a request. + std::string name; // Name used for logging. + + std::queue target_list; // The current target hashes we are syncing towards. + // Store the originally submitted sync target list. This list is used to avoid submitting same list multiple times // because target list is updated when the sync targets are acheived. std::queue original_target_list; - sync_target current_target = {}; - // List of sender pubkeys and hpfs responses(flatbuffer messages) to be processed. - std::list> candidate_hpfs_responses; - - // List of pending sync requests to be sent out. - std::list pending_requests; + std::list pending_requests; // List of pending sync requests to be sent out. // List of submitted requests we are awaiting responses for, keyed by expected response path+hash. std::unordered_map submitted_requests; std::thread hpfs_sync_thread; std::shared_mutex current_target_mutex; - std::atomic is_syncing = false; std::atomic is_shutting_down = false; - }; - - class hpfs_sync - { - private: - bool init_success = false; - uint16_t REQUEST_RESUBMIT_TIMEOUT; // No. of milliseconds to wait before resubmitting a request. - hpfs::hpfs_mount *fs_mount = NULL; - std::string name; void hpfs_syncer_loop(); - int request_loop(const util::h32 current_target, util::h32 &updated_state); + int request_loop(const util::h32 current_target_hash, util::h32 &updated_state); int start_syncing_next_target(); protected: - virtual void on_current_sync_state_acheived(); - virtual void swap_collected_responses(); // Must override in child classes. + sync_target current_target = {}; + + // List of sender pubkeys and hpfs responses(flatbuffer messages) to be processed. + std::list> candidate_hpfs_responses; + + hpfs::hpfs_mount *fs_mount = NULL; + + virtual void on_current_sync_state_acheived(const util::h32 &acheived_hash); + + // Move the collected responses from hpfs responses to a local response list. + virtual void swap_collected_responses() = 0; // Must override in child classes. public: - sync_context ctx; + std::atomic is_syncing = false; int init(std::string_view name, hpfs::hpfs_mount *fs_mount); @@ -100,7 +100,7 @@ namespace hpfs bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf); - bool should_stop_request_loop(const util::h32 ¤t_target); + bool should_stop_request_loop(const util::h32 ¤t_target_hash); void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey); diff --git a/src/ledger/ledger_mount.cpp b/src/ledger/ledger_mount.cpp new file mode 100644 index 00000000..4ee4f739 --- /dev/null +++ b/src/ledger/ledger_mount.cpp @@ -0,0 +1,15 @@ +#include "./ledger_mount.hpp" + +namespace ledger +{ + /** + * Perform ledger file system mount related preparation tasks. + * @return Returns -1 on error and 0 on success. + */ + int ledger_mount::prepare_fs() + { + // Add ledger fs preparation logic here. + return 0; + } + +} // namespace ledger \ No newline at end of file diff --git a/src/ledger/ledger_mount.hpp b/src/ledger/ledger_mount.hpp new file mode 100644 index 00000000..4551b9ba --- /dev/null +++ b/src/ledger/ledger_mount.hpp @@ -0,0 +1,20 @@ +#ifndef _HP_LEDGER_LEDGER_MOUNT_ +#define _HP_LEDGER_LEDGER_MOUNT_ + +#include "../pchheader.hpp" +#include "../util/h32.hpp" +#include "../conf.hpp" +#include "../hpfs/hpfs_mount.hpp" + +namespace ledger +{ + /** + * Represents ledger file system mount. + */ + class ledger_mount : public hpfs::hpfs_mount + { + private: + int prepare_fs(); + }; +} // namespace ledger +#endif \ No newline at end of file diff --git a/src/ledger/ledger_sample.cpp b/src/ledger/ledger_sample.cpp index 66c002f7..0846812f 100644 --- a/src/ledger/ledger_sample.cpp +++ b/src/ledger/ledger_sample.cpp @@ -4,10 +4,52 @@ #include "../util/util.hpp" #include "../msg/fbuf/ledger_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" +#include "ledger_serve.hpp" // Currently this namespace is added for sqlite testing, later this can be modified and renamed as 'ledger::ledger_sample' -> 'ledger' for ledger implementations. namespace ledger::ledger_sample { + constexpr uint32_t LEDGER_FS_ID = 1; + ledger::ledger_mount ledger_fs; // Global ledger file system instance. + ledger::ledger_sync ledger_sync_worker; // Global ledger file system sync instance. + ledger::ledger_serve ledger_server; // Ledger file server instance. + + /** + * Perform ledger related initializations. + */ + int init() + { + if (ledger_fs.init(LEDGER_FS_ID, conf::ctx.ledger_hpfs_dir, conf::ctx.ledger_hpfs_mount_dir, conf::ctx.ledger_hpfs_rw_dir, conf::cfg.node.full_history) == -1) + { + LOG_ERROR << "Ledger file system initialization failed."; + return -1; + } + + if (ledger_server.init("ledger", &ledger_fs) == -1) + { + LOG_ERROR << "Ledger file system serve worker initialization failed."; + return -1; + } + + if (ledger_sync_worker.init("ledger", &ledger_fs) == -1) + { + LOG_ERROR << "Ledger file system sync worker initialization failed."; + return -1; + } + + return 0; + } + + /** + * Perform deinit tasks related to ledger. + */ + void deinit() + { + ledger_fs.deinit(); + ledger_server.deinit(); + ledger_sync_worker.deinit(); + } + /** * Create and save ledger record from the given proposal message. * @param proposal Consensus-reached Stage 3 proposal. @@ -47,14 +89,14 @@ namespace ledger::ledger_sample // Get binary hash of the serialized lcl. std::string_view ledger_str_buf = msg::fbuf::flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize()); const std::string lcl_hash = crypto::get_hash(ledger_str_buf); - + // Get binary hash of users and inputs. const std::string user_hash = crypto::get_hash(proposal.users); const std::string input_hash = crypto::get_hash(proposal.input_hashes); const std::string seq_no_str = std::to_string(seq_no); const std::string time_str = std::to_string(proposal.time); - + // Contruct binary string for data hash. std::string data; data.reserve(seq_no_str.size() + time_str.size() + (32 * 5)); diff --git a/src/ledger/ledger_sample.hpp b/src/ledger/ledger_sample.hpp index eef4c1de..7a8775d4 100644 --- a/src/ledger/ledger_sample.hpp +++ b/src/ledger/ledger_sample.hpp @@ -1,12 +1,21 @@ #include "../p2p/p2p.hpp" #include "sqlite.hpp" +#include "ledger_sync.hpp" +#include "ledger_mount.hpp" namespace ledger::ledger_sample { constexpr const char *GENESIS_LEDGER = "0-genesis"; + extern ledger::ledger_mount ledger_fs; // Global ledger file system instance. + extern ledger::ledger_sync ledger_sync_worker; // Global ledger file system sync instance. + + int init(); + + void deinit(); + int save_ledger(const p2p::proposal &proposal); int extract_lcl(const std::string &lcl, uint64_t &seq_no, std::string &hash); - + } // namespace ledger::ledger_sample \ No newline at end of file diff --git a/src/ledger/ledger_serve.cpp b/src/ledger/ledger_serve.cpp new file mode 100644 index 00000000..794b2a72 --- /dev/null +++ b/src/ledger/ledger_serve.cpp @@ -0,0 +1,14 @@ + +#include "./ledger_serve.hpp" + +namespace ledger +{ + void ledger_serve::swap_collected_requests() + { + std::scoped_lock lock(p2p::ctx.collected_msgs.ledger_hpfs_requests_mutex); + + // Move collected hpfs requests for contract fs over to local requests list. + if (!p2p::ctx.collected_msgs.ledger_hpfs_requests.empty()) + hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.ledger_hpfs_requests); + } +} // namespace ledger \ No newline at end of file diff --git a/src/ledger/ledger_serve.hpp b/src/ledger/ledger_serve.hpp new file mode 100644 index 00000000..c536dea5 --- /dev/null +++ b/src/ledger/ledger_serve.hpp @@ -0,0 +1,15 @@ +#ifndef _HP_LEDGER_LEDGER_SERVE_ +#define _HP_LEDGER_LEDGER_SERVE_ + +#include "../pchheader.hpp" +#include "../hpfs/hpfs_serve.hpp" + +namespace ledger +{ + class ledger_serve : public hpfs::hpfs_serve + { + private: + void swap_collected_requests(); + }; +} // namespace ledger +#endif \ No newline at end of file diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp new file mode 100644 index 00000000..27dd81e8 --- /dev/null +++ b/src/ledger/ledger_sync.cpp @@ -0,0 +1,19 @@ + +#include "./ledger_sync.hpp" + +namespace ledger +{ + void ledger_sync::on_current_sync_state_acheived(const util::h32 &acheived_hash) + { + // Logic when a sync state is acheived can be performed here. + } + + void ledger_sync::swap_collected_responses() + { + std::scoped_lock lock(p2p::ctx.collected_msgs.ledger_hpfs_responses_mutex); + + // Move collected hpfs responses over to local candidate responses list. + if (!p2p::ctx.collected_msgs.ledger_hpfs_responses.empty()) + candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.ledger_hpfs_responses); + } +} // namespace ledger \ No newline at end of file diff --git a/src/ledger/ledger_sync.hpp b/src/ledger/ledger_sync.hpp new file mode 100644 index 00000000..9c80a48d --- /dev/null +++ b/src/ledger/ledger_sync.hpp @@ -0,0 +1,18 @@ +#ifndef _HP_LEDGER_LEDGER_SYNC_ +#define _HP_LEDGER_LEDGER_SYNC_ + +#include "../pchheader.hpp" +#include "../util/h32.hpp" +#include "../conf.hpp" +#include "../hpfs/hpfs_sync.hpp" + +namespace ledger +{ + class ledger_sync : public hpfs::hpfs_sync + { + private: + void on_current_sync_state_acheived(const util::h32 &acheived_hash); + void swap_collected_responses(); + }; +} // namespace ledger +#endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index e03e4c2f..6b650c4e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,14 +6,14 @@ #include "util/util.hpp" #include "conf.hpp" #include "crypto.hpp" -#include "sc.hpp" +#include "./sc/sc.hpp" #include "hplog.hpp" #include "usr/usr.hpp" #include "usr/read_req.hpp" #include "p2p/p2p.hpp" #include "consensus.hpp" #include "ledger.hpp" -#include "hpfs/hpfs.hpp" +#include "ledger/ledger_sample.hpp" #include "unl.hpp" /** @@ -71,7 +71,8 @@ void deinit() p2p::deinit(); read_req::deinit(); consensus::deinit(); - hpfs::deinit(); + ledger::ledger_sample::deinit(); // Deinit method in new ledger implementation. + sc::deinit(); ledger::deinit(); conf::deinit(); } @@ -195,7 +196,8 @@ int main(int argc, char **argv) LOG_INFO << "Public key: " << conf::cfg.node.public_key_hex; LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")"; - if (hpfs::init() == -1 || + if (sc::init() == -1 || + ledger::ledger_sample::init() == -1 || // Init method of new ledger implementaiton. ledger::init() == -1 || unl::init() == -1 || consensus::init() == -1 || diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index f54d21bf..e807f272 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -93,7 +93,7 @@ table HistoryLedgerBlock { } table Hpfs_Request_Message { //Hpfs request message schema - mount_id: int32; + mount_id: uint32; parent_path:string; is_file:bool; block_id:int32; @@ -106,7 +106,7 @@ table Hpfs_Response_Message{ hpfs_response:Hpfs_Response; hash:[ubyte]; path: string; - mount_id: int32; + mount_id: uint32; } table Fs_Entry_Response{ diff --git a/src/msg/fbuf/p2pmsg_content_generated.h b/src/msg/fbuf/p2pmsg_content_generated.h index f1e000cc..2afbe856 100644 --- a/src/msg/fbuf/p2pmsg_content_generated.h +++ b/src/msg/fbuf/p2pmsg_content_generated.h @@ -1317,11 +1317,11 @@ struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table VT_BLOCK_ID = 10, VT_EXPECTED_HASH = 12 }; - int32_t mount_id() const { - return GetField(VT_MOUNT_ID, 0); + uint32_t mount_id() const { + return GetField(VT_MOUNT_ID, 0); } - bool mutate_mount_id(int32_t _mount_id) { - return SetField(VT_MOUNT_ID, _mount_id, 0); + bool mutate_mount_id(uint32_t _mount_id) { + return SetField(VT_MOUNT_ID, _mount_id, 0); } const flatbuffers::String *parent_path() const { return GetPointer(VT_PARENT_PATH); @@ -1349,7 +1349,7 @@ struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyField(verifier, VT_MOUNT_ID) && + VerifyField(verifier, VT_MOUNT_ID) && VerifyOffset(verifier, VT_PARENT_PATH) && verifier.VerifyString(parent_path()) && VerifyField(verifier, VT_IS_FILE) && @@ -1364,8 +1364,8 @@ struct Hpfs_Request_MessageBuilder { typedef Hpfs_Request_Message Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_mount_id(int32_t mount_id) { - fbb_.AddElement(Hpfs_Request_Message::VT_MOUNT_ID, mount_id, 0); + void add_mount_id(uint32_t mount_id) { + fbb_.AddElement(Hpfs_Request_Message::VT_MOUNT_ID, mount_id, 0); } void add_parent_path(flatbuffers::Offset parent_path) { fbb_.AddOffset(Hpfs_Request_Message::VT_PARENT_PATH, parent_path); @@ -1392,7 +1392,7 @@ struct Hpfs_Request_MessageBuilder { inline flatbuffers::Offset CreateHpfs_Request_Message( flatbuffers::FlatBufferBuilder &_fbb, - int32_t mount_id = 0, + uint32_t mount_id = 0, flatbuffers::Offset parent_path = 0, bool is_file = false, int32_t block_id = 0, @@ -1408,7 +1408,7 @@ inline flatbuffers::Offset CreateHpfs_Request_Message( inline flatbuffers::Offset CreateHpfs_Request_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, - int32_t mount_id = 0, + uint32_t mount_id = 0, const char *parent_path = nullptr, bool is_file = false, int32_t block_id = 0, @@ -1464,11 +1464,11 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl flatbuffers::String *mutable_path() { return GetPointer(VT_PATH); } - int32_t mount_id() const { - return GetField(VT_MOUNT_ID, 0); + uint32_t mount_id() const { + return GetField(VT_MOUNT_ID, 0); } - bool mutate_mount_id(int32_t _mount_id) { - return SetField(VT_MOUNT_ID, _mount_id, 0); + bool mutate_mount_id(uint32_t _mount_id) { + return SetField(VT_MOUNT_ID, _mount_id, 0); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && @@ -1479,7 +1479,7 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl verifier.VerifyVector(hash()) && VerifyOffset(verifier, VT_PATH) && verifier.VerifyString(path()) && - VerifyField(verifier, VT_MOUNT_ID) && + VerifyField(verifier, VT_MOUNT_ID) && verifier.EndTable(); } }; @@ -1512,8 +1512,8 @@ struct Hpfs_Response_MessageBuilder { void add_path(flatbuffers::Offset path) { fbb_.AddOffset(Hpfs_Response_Message::VT_PATH, path); } - void add_mount_id(int32_t mount_id) { - fbb_.AddElement(Hpfs_Response_Message::VT_MOUNT_ID, mount_id, 0); + void add_mount_id(uint32_t mount_id) { + fbb_.AddElement(Hpfs_Response_Message::VT_MOUNT_ID, mount_id, 0); } explicit Hpfs_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { @@ -1532,7 +1532,7 @@ inline flatbuffers::Offset CreateHpfs_Response_Message( flatbuffers::Offset hpfs_response = 0, flatbuffers::Offset> hash = 0, flatbuffers::Offset path = 0, - int32_t mount_id = 0) { + uint32_t mount_id = 0) { Hpfs_Response_MessageBuilder builder_(_fbb); builder_.add_mount_id(mount_id); builder_.add_path(path); @@ -1548,7 +1548,7 @@ inline flatbuffers::Offset CreateHpfs_Response_MessageDir flatbuffers::Offset hpfs_response = 0, const std::vector *hash = nullptr, const char *path = nullptr, - int32_t mount_id = 0) { + uint32_t mount_id = 0) { auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; auto path__ = path ? _fbb.CreateString(path) : 0; return msg::fbuf::p2pmsg::CreateHpfs_Response_Message( diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 5661bc98..c4d85839 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -492,12 +492,13 @@ namespace msg::fbuf::p2pmsg * Create content response message from the given content response. * @param container_builder Flatbuffer builder for the container message. * @param path The path of the directory. + * @param mount_id The mount id of the relavent hpfs mount. * @param hash_nodes File or directory entries with hashes in the given parent path. * @param expected_hash The exptected hash of the requested path. * @param lcl Lcl to be include in the container msg. */ void create_msg_from_fsentry_response( - flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id, + flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const uint32_t mount_id, std::vector &hash_nodes, util::h32 expected_hash, std::string_view lcl) { flatbuffers::FlatBufferBuilder builder(1024); @@ -525,11 +526,12 @@ namespace msg::fbuf::p2pmsg * Create content response message from the given content response. * @param container_builder Flatbuffer builder for the container message. * @param path The path of the directory. + * @param mount_id The mount id of the relavent hpfs mount. * @param hashmap Hashmap of the file * @param lcl Lcl to be include in the container msg. */ void create_msg_from_filehashmap_response( - flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id, + flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const uint32_t mount_id, std::vector &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl) { // todo:get a average propsal message size and allocate content builder based on that. @@ -561,10 +563,11 @@ namespace msg::fbuf::p2pmsg /** * Create content response message from the given content response. * @param container_builder Flatbuffer builder for the container message. - * @param block_resp Block response struct to place in the message + * @param block_resp Block response struct to place in the message. + * @param mount_id The mount id of the relavent hpfs mount. * @param lcl Lcl to be include in the container message. */ - void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl) + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const uint32_t mount_id, std::string_view lcl) { // todo:get a average propsal message size and allocate content builder based on that. flatbuffers::FlatBufferBuilder builder(1024); diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index 0a044564..309b478d 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -58,14 +58,14 @@ namespace msg::fbuf::p2pmsg void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl); void create_msg_from_fsentry_response( - flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id, + flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const uint32_t mount_id, std::vector &hash_nodes, util::h32 expected_hash, std::string_view lcl); void create_msg_from_filehashmap_response( - flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id, + flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const uint32_t mount_id, std::vector &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl); - void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl); + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const uint32_t mount_id, std::string_view lcl); void create_containermsg_from_content( flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign); @@ -101,10 +101,10 @@ namespace msg::fbuf::p2pmsg peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map &fs_entries, - const flatbuffers::Vector> *fhashes); + const flatbuffers::Vector> *fhashes); void hpfsfilehash_to_flatbuf_hpfsfilehash(flatbuffers::FlatBufferBuilder &builder, std::vector> &list, - std::string_view full_path, bool is_file, std::string_view hash); + std::string_view full_path, bool is_file, std::string_view hash); flatbuffers::Offset>> hpfsfshashentry_to_flatbuff_hpfsfshashentry( diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index d8e14bc8..1c143fc0 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -14,8 +14,8 @@ namespace p2p { constexpr uint16_t PROPOSAL_LIST_CAP = 64; // Maximum proposal count. constexpr uint16_t NONUNL_PROPOSAL_LIST_CAP = 64; // Maximum nonunl proposal count. - constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count. - constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count. + constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count. + constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count. constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count. struct proposal @@ -91,7 +91,7 @@ namespace p2p // Represents a hpfs request sent to a peer. struct hpfs_request { - int32_t mount_id; // Relavent file system id. + uint32_t mount_id; // Relavent file system id. std::string parent_path; // The requested file or dir path. bool is_file = false; // Whether the path is a file or dir. int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1. @@ -127,9 +127,17 @@ namespace p2p std::list> contract_hpfs_requests; std::mutex contract_hpfs_requests_mutex; // Mutex for contract fs hpfs requests access race conditions. + // List of pairs indicating the session pubkey hex and the ledger fs hpfs requests. + std::list> ledger_hpfs_requests; + std::mutex ledger_hpfs_requests_mutex; // Mutex for ledger fs hpfs requests access race conditions. + // List of pairs indicating the session pubkey hex and the contract fs hpfs responses. std::list> contract_hpfs_responses; std::mutex contract_hpfs_responses_mutex; // Mutex for contract fs hpfs responses access race conditions. + + // List of pairs indicating the session pubkey hex and the ledger fs hpfs responses. + std::list> ledger_hpfs_responses; + std::mutex ledger_hpfs_responses_mutex; // Mutex for ledger fs hpfs responses access race conditions. }; struct connected_context diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 6d258319..157122af 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -9,12 +9,11 @@ #include "../msg/fbuf/p2pmsg_content_generated.h" #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" -#include "../hpfs/hpfs_sync.hpp" +#include "../ledger/ledger_sample.hpp" #include "../ledger.hpp" #include "peer_comm_session.hpp" #include "p2p.hpp" #include "../unl.hpp" -#include "../hpfs/hpfs.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -195,7 +194,7 @@ namespace p2p { const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr); const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message()); - if (hr.mount_id == hpfs::contract_fs.mount_id) + if (hr.mount_id == sc::contract_fs.mount_id) { // Check the cap and insert request with lock. std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_requests_mutex); @@ -210,6 +209,21 @@ namespace p2p LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name(); } } + else if (hr.mount_id == ledger::ledger_sample::ledger_fs.mount_id) + { + // Check the cap and insert request with lock. + std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_requests_mutex); + + // If max number of state requests reached skip the rest. + if (ctx.collected_msgs.ledger_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP) + { + ctx.collected_msgs.ledger_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); + } + else + { + LOG_DEBUG << "Hpfs ledger fs request rejected. Maximum hpfs ledger fs request count reached. " << session.display_name(); + } + } } else if (content_message_type == p2pmsg::Message_Hpfs_Response_Message) { @@ -217,7 +231,7 @@ namespace p2p const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message(); // Only accept hpfs responses if hpfs fs is syncing. - if (hpfs::contract_sync.ctx.is_syncing && resp_msg->mount_id() == hpfs::contract_fs.mount_id) + if (sc::contract_sync_worker.is_syncing && resp_msg->mount_id() == sc::contract_fs.mount_id) { // Check the cap and insert state_response with lock. std::scoped_lock lock(ctx.collected_msgs.contract_hpfs_responses_mutex); @@ -233,6 +247,22 @@ namespace p2p LOG_DEBUG << "Contract hpfs response rejected. Maximum contract hpfs response count reached. " << session.display_name(); } } + else if (ledger::ledger_sample::ledger_sync_worker.is_syncing && resp_msg->mount_id() == ledger::ledger_sample::ledger_fs.mount_id) + { + // Check the cap and insert state_response with lock. + std::scoped_lock lock(ctx.collected_msgs.ledger_hpfs_responses_mutex); + + // If max number of state responses reached skip the rest. + if (ctx.collected_msgs.ledger_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP) + { + std::string response(reinterpret_cast(content_ptr), content_size); + ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); + } + else + { + LOG_DEBUG << "Ledger hpfs response rejected. Maximum ledger hpfs response count reached. " << session.display_name(); + } + } } else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message { diff --git a/src/sc/contract_mount.cpp b/src/sc/contract_mount.cpp new file mode 100644 index 00000000..f97a5c9b --- /dev/null +++ b/src/sc/contract_mount.cpp @@ -0,0 +1,30 @@ +#include "./contract_mount.hpp" + +namespace sc +{ + /** + * Perform contract file system mount related preparation tasks. + * @return Returns -1 on error and 0 on success. + */ + int contract_mount::prepare_fs() + { + util::h32 initial_state_hash; + util::h32 initial_patch_hash; + + if (acquire_rw_session() == -1 || + conf::populate_patch_config() == -1 || + get_hash(initial_state_hash, hpfs::RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 || + get_hash(initial_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 || + release_rw_session() == -1) + { + LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << "."; + return -1; + } + + set_parent_hash(hpfs::STATE_DIR_PATH, initial_state_hash); + set_parent_hash(hpfs::PATCH_FILE_PATH, initial_patch_hash); + LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash; + return 0; + } + +} \ No newline at end of file diff --git a/src/sc/contract_mount.hpp b/src/sc/contract_mount.hpp new file mode 100644 index 00000000..d7cc516a --- /dev/null +++ b/src/sc/contract_mount.hpp @@ -0,0 +1,20 @@ +#ifndef _HP_SC_CONTRACT_MOUNT_ +#define _HP_SC_CONTRACT_MOUNT_ + +#include "../pchheader.hpp" +#include "../util/h32.hpp" +#include "../conf.hpp" +#include "../hpfs/hpfs_mount.hpp" + +namespace sc +{ + /** + * Represents contract file system mount. + */ + class contract_mount : public hpfs::hpfs_mount + { + private: + int prepare_fs(); + }; +} // namespace sc +#endif \ No newline at end of file diff --git a/src/sc/contract_serve.cpp b/src/sc/contract_serve.cpp new file mode 100644 index 00000000..961bee2d --- /dev/null +++ b/src/sc/contract_serve.cpp @@ -0,0 +1,14 @@ + +#include "./contract_serve.hpp" + +namespace sc +{ + void contract_serve::swap_collected_requests() + { + std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_requests_mutex); + + // Move collected hpfs requests for contract fs over to local requests list. + if (!p2p::ctx.collected_msgs.contract_hpfs_requests.empty()) + hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.contract_hpfs_requests); + } +} // namespace sc \ No newline at end of file diff --git a/src/sc/contract_serve.hpp b/src/sc/contract_serve.hpp new file mode 100644 index 00000000..79170bce --- /dev/null +++ b/src/sc/contract_serve.hpp @@ -0,0 +1,17 @@ +#ifndef _HP_SC_CONTRACT_SERVE_ +#define _HP_SC_CONTRACT_SERVE_ + +#include "../pchheader.hpp" +#include "../util/h32.hpp" +#include "../conf.hpp" +#include "../hpfs/hpfs_serve.hpp" + +namespace sc +{ + class contract_serve : public hpfs::hpfs_serve + { + private: + void swap_collected_requests(); + }; +} // namespace sc +#endif \ No newline at end of file diff --git a/src/sc/contract_sync.cpp b/src/sc/contract_sync.cpp new file mode 100644 index 00000000..87414f5f --- /dev/null +++ b/src/sc/contract_sync.cpp @@ -0,0 +1,36 @@ + +#include "./contract_sync.hpp" +#include "../unl.hpp" +#include "../hpfs/hpfs_mount.hpp" + +namespace sc +{ + + void contract_sync::on_current_sync_state_acheived(const util::h32 &acheived_hash) + { + if (current_target.vpath == hpfs::PATCH_FILE_PATH) + { + // Appling new patch file changes to hpcore runtime. + if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1) + { + LOG_ERROR << "Appling patch file changes after sync failed"; + } + else + { + unl::update_unl_changes_from_patch(); + + // Update global hash tracker with the new patch file hash. + fs_mount->set_parent_hash(current_target.vpath, acheived_hash); + } + } + } + + void contract_sync::swap_collected_responses() + { + std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_responses_mutex); + + // Move collected hpfs responses over to local candidate responses list. + if (!p2p::ctx.collected_msgs.contract_hpfs_responses.empty()) + candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.contract_hpfs_responses); + } +} // namespace sc \ No newline at end of file diff --git a/src/sc/contract_sync.hpp b/src/sc/contract_sync.hpp new file mode 100644 index 00000000..b9812c1f --- /dev/null +++ b/src/sc/contract_sync.hpp @@ -0,0 +1,18 @@ +#ifndef _HP_SC_CONTRACT_SYNC_ +#define _HP_SC_CONTRACT_SYNC_ + +#include "../pchheader.hpp" +#include "../util/h32.hpp" +#include "../conf.hpp" +#include "../hpfs/hpfs_sync.hpp" + +namespace sc +{ + class contract_sync : public hpfs::hpfs_sync + { + private: + void on_current_sync_state_acheived(const util::h32 &acheived_hash); + void swap_collected_responses(); + }; +} // namespace sc +#endif \ No newline at end of file diff --git a/src/sc.cpp b/src/sc/sc.cpp similarity index 94% rename from src/sc.cpp rename to src/sc/sc.cpp index 636a9a20..32e7df0a 100644 --- a/src/sc.cpp +++ b/src/sc/sc.cpp @@ -1,14 +1,14 @@ -#include "pchheader.hpp" -#include "conf.hpp" -#include "consensus.hpp" -#include "hplog.hpp" -#include "ledger.hpp" +#include "../pchheader.hpp" +#include "../conf.hpp" +#include "../consensus.hpp" +#include "../hplog.hpp" +#include "../ledger.hpp" #include "sc.hpp" -#include "hpfs/hpfs.hpp" -#include "msg/fbuf/p2pmsg_helpers.hpp" -#include "msg/controlmsg_common.hpp" -#include "msg/controlmsg_parser.hpp" -#include "unl.hpp" +#include "../msg/fbuf/p2pmsg_helpers.hpp" +#include "../msg/controlmsg_common.hpp" +#include "../msg/controlmsg_parser.hpp" +#include "../unl.hpp" +#include "contract_serve.hpp" namespace sc { @@ -17,6 +17,40 @@ namespace sc constexpr const char *STDOUT_LOG = ".stdout.log"; constexpr const char *STDERR_LOG = ".stderr.log"; + constexpr uint32_t CONTRACT_FS_ID = 0; + + sc::contract_mount contract_fs; // Global contract file system instance. + sc::contract_sync contract_sync_worker; // Global contract file system sync instance. + sc::contract_serve contract_server; // Contract file server instance. + + int init() + { + if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.contract_hpfs_dir, conf::ctx.contract_hpfs_mount_dir, conf::ctx.contract_hpfs_rw_dir, conf::cfg.node.full_history) == -1) + { + LOG_ERROR << "Contract file system initialization failed."; + return -1; + } + + if (contract_server.init("contract", &contract_fs) == -1) + { + LOG_ERROR << "Contract file system serve worker initialization failed."; + return -1; + } + + if (contract_sync_worker.init("contract", &contract_fs) == -1) + { + LOG_ERROR << "Contract file system sync worker initialization failed."; + return -1; + } + return 0; + } + + void deinit() + { + contract_fs.deinit(); + contract_server.deinit(); + contract_sync_worker.deinit(); + } /** * Executes the contract process and passes the specified context arguments. * @return 0 on successful process creation. -1 on failure or contract process is already running. @@ -92,7 +126,7 @@ namespace sc execv_args[j] = conf::cfg.contract.runtime_binexec_args[i].data(); execv_args[len - 1] = NULL; - const std::string current_dir = hpfs::contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH); + const std::string current_dir = contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH); chdir(current_dir.c_str()); if (create_contract_log_files(ctx) == -1) @@ -165,8 +199,8 @@ namespace sc if (!ctx.args.readonly) ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME; - return ctx.args.readonly ? hpfs::contract_fs.start_ro_session(ctx.args.hpfs_session_name, false) - : hpfs::contract_fs.acquire_rw_session(); + return ctx.args.readonly ? contract_fs.start_ro_session(ctx.args.hpfs_session_name, false) + : contract_fs.acquire_rw_session(); } /** @@ -174,7 +208,6 @@ namespace sc */ int stop_hpfs_session(execution_context &ctx) { - hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; if (ctx.args.readonly) { return contract_fs.stop_ro_session(ctx.args.hpfs_session_name); diff --git a/src/sc.hpp b/src/sc/sc.hpp similarity index 92% rename from src/sc.hpp rename to src/sc/sc.hpp index 8a439ec3..7e1f06b1 100644 --- a/src/sc.hpp +++ b/src/sc/sc.hpp @@ -1,12 +1,14 @@ -#ifndef _HP_SC_ -#define _HP_SC_ +#ifndef _HP_SC_SC_ +#define _HP_SC_SC_ -#include "pchheader.hpp" -#include "usr/usr.hpp" -#include "util/h32.hpp" -#include "util/util.hpp" -#include "util/buffer_store.hpp" -#include "p2p/p2p.hpp" +#include "../pchheader.hpp" +#include "../usr/usr.hpp" +#include "../util/h32.hpp" +#include "../util/util.hpp" +#include "../util/buffer_store.hpp" +#include "../p2p/p2p.hpp" +#include "contract_mount.hpp" +#include "contract_sync.hpp" /** * Contains helper functions regarding POSIX process execution and IPC between HP and SC. @@ -125,6 +127,13 @@ namespace sc } }; + extern sc::contract_mount contract_fs; // Global contract file system instance. + extern sc::contract_sync contract_sync_worker; // Global contract file system sync instance. + + int init(); + + void deinit(); + int execute_contract(execution_context &ctx); //------Internal-use functions for this namespace. @@ -172,7 +181,7 @@ namespace sc void cleanup_fds(execution_context &ctx); void cleanup_fd_pair(fd_pair &fds); - + void stop(execution_context &ctx); void handle_control_msg(execution_context &ctx, std::string_view msg); diff --git a/src/usr/read_req.hpp b/src/usr/read_req.hpp index 619eb0d8..9721be46 100644 --- a/src/usr/read_req.hpp +++ b/src/usr/read_req.hpp @@ -1,7 +1,7 @@ #ifndef _HP_USR_READ_REQ_ #define _HP_USR_READ_REQ_ -#include "../sc.hpp" +#include "../sc/sc.hpp" #include "../util/buffer_store.hpp" namespace read_req diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 38291af2..18555dd1 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -16,7 +16,6 @@ #include "user_input.hpp" #include "read_req.hpp" #include "input_nonce_map.hpp" -#include "../hpfs/hpfs.hpp" namespace usr { @@ -392,7 +391,7 @@ namespace usr util::fork_detach(); // before execution chdir into a valid the latest state data directory that contains an appbill.table - const std::string appbill_dir = hpfs::contract_fs.rw_dir + hpfs::STATE_DIR_PATH; + const std::string appbill_dir = sc::contract_fs.rw_dir + hpfs::STATE_DIR_PATH; chdir(appbill_dir.c_str()); int ret = execv(execv_args[0], execv_args); std::cerr << errno << ": Appbill process execv failed.\n"; diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 637c5281..8bf3aa88 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -110,6 +110,8 @@ do user: ${user_json}, \ log: {\ loglevel: '$loglevel', \ + max_mbytes_per_file: 10, \ + max_file_count": 50, \ loggers:['console', 'file'] \ }\ }, null, 2)" > hp.cfg @@ -180,9 +182,9 @@ done for (( i=1; i<=$ncount; i++ )) do - mkdir -p ./node$i/hpfs/seed/ > /dev/null 2>&1 + mkdir -p ./node$i/contract_fs/seed/ > /dev/null 2>&1 - pushd ./node$i/hpfs/seed/state/ > /dev/null 2>&1 + pushd ./node$i/contract_fs/seed/state/ > /dev/null 2>&1 # Load credit balance for user for appbill testing purposes. >appbill.table