Separate contract and ledger file system mounts. (#235)

- Creating two separate hpfs folders for contract and ledger file systems (contract_fs and ledger_fs).
- Added separate mounts for contract and ledger file systems.
- Added separate instances for contract serving and ledger serving.
- Added separate instances for contract syncing and ledger syncing.
- Modified cluster creating script to accompany folder name changes in contract folder.
This commit is contained in:
Savinda Senevirathne
2021-02-05 11:48:52 +05:30
committed by GitHub
parent ae55a6ea5a
commit a4399544b9
39 changed files with 622 additions and 359 deletions

View File

@@ -34,12 +34,14 @@ add_executable(hpcore
src/crypto.cpp
src/conf.cpp
src/hplog.cpp
src/sc.cpp
src/bill/corebill.cpp
src/hpfs/hpfs.cpp
src/hpfs/hpfs_mount.cpp
src/hpfs/hpfs_serve.cpp
src/hpfs/hpfs_sync.cpp
src/sc/contract_mount.cpp
src/sc/contract_serve.cpp
src/sc/contract_sync.cpp
src/sc/sc.cpp
src/comm/comm_session.cpp
src/msg/fbuf/common_helpers.cpp
src/msg/fbuf/p2pmsg_helpers.cpp
@@ -61,6 +63,9 @@ add_executable(hpcore
src/usr/read_req.cpp
src/ledger.cpp
src/ledger/sqlite.cpp
src/ledger/ledger_mount.cpp
src/ledger/ledger_sync.cpp
src/ledger/ledger_serve.cpp
src/ledger/ledger_sample.cpp
src/consensus.cpp
src/main.cpp

View File

@@ -1,7 +1,7 @@
#include "pchheader.hpp"
#include "conf.hpp"
#include "crypto.hpp"
#include "hpfs/hpfs.hpp"
#include "sc/sc.hpp"
#include "util/util.hpp"
namespace conf
@@ -109,9 +109,13 @@ namespace conf
if (util::create_dir_tree_recursive(ctx.config_dir) == -1 ||
util::create_dir_tree_recursive(ctx.hist_dir) == -1 ||
util::create_dir_tree_recursive(ctx.full_hist_dir) == -1 ||
util::create_dir_tree_recursive(ctx.contract_log_dir) == -1 ||
util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH) == -1 ||
util::create_dir_tree_recursive(ctx.hpfs_mount_dir) == -1)
util::create_dir_tree_recursive(ctx.log_dir) == -1 ||
util::create_dir_tree_recursive(ctx.contract_hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH) == -1 ||
util::create_dir_tree_recursive(ctx.contract_hpfs_mount_dir) == -1 ||
util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + hpfs::LEDGER_PRIMARY_DIR) == -1 ||
util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + hpfs::LEDGER_BLOB_DIR) == -1 ||
util::create_dir_tree_recursive(ctx.ledger_hpfs_mount_dir) == -1 ||
util::create_dir_tree_recursive(ctx.contract_log_dir) == -1)
{
std::cerr << "ERROR: unable to create directories.\n";
return -1;
@@ -204,9 +208,12 @@ namespace conf
ctx.tls_cert_file = ctx.config_dir + "/tlscert.pem";
ctx.hist_dir = basedir + "/hist";
ctx.full_hist_dir = basedir + "/fullhist";
ctx.hpfs_dir = basedir + "/hpfs";
ctx.hpfs_mount_dir = ctx.hpfs_dir + "/mnt";
ctx.hpfs_rw_dir = ctx.hpfs_mount_dir + "/rw";
ctx.contract_hpfs_dir = basedir + "/contract_fs";
ctx.contract_hpfs_mount_dir = ctx.contract_hpfs_dir + "/mnt";
ctx.contract_hpfs_rw_dir = ctx.contract_hpfs_mount_dir + "/rw";
ctx.ledger_hpfs_dir = basedir + "/ledger_fs";
ctx.ledger_hpfs_mount_dir = ctx.ledger_hpfs_dir + "/mnt";
ctx.ledger_hpfs_rw_dir = ctx.ledger_hpfs_mount_dir + "/rw";
ctx.log_dir = basedir + "/log";
ctx.contract_log_dir = ctx.log_dir + "/contract";
}
@@ -586,12 +593,13 @@ namespace conf
*/
int validate_contract_dir_paths()
{
const std::string paths[9] = {
const std::string paths[10] = {
ctx.contract_dir,
ctx.config_file,
ctx.hist_dir,
ctx.full_hist_dir,
ctx.hpfs_dir,
ctx.contract_hpfs_dir,
ctx.ledger_hpfs_dir,
ctx.tls_key_file,
ctx.tls_cert_file,
ctx.hpfs_exe_path,
@@ -674,7 +682,7 @@ namespace conf
jsoncons::ojson jdoc;
populate_contract_section_json(jdoc, cfg.contract, true);
const std::string patch_file_path = hpfs::contract_fs.physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
const std::string patch_file_path = sc::contract_fs.physical_path(hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
return write_json_file(patch_file_path, jdoc);
}
@@ -686,7 +694,7 @@ namespace conf
*/
int apply_patch_config(std::string_view hpfs_session_name)
{
const std::string path = hpfs::contract_fs.physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH);
const std::string path = sc::contract_fs.physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH);
if (!util::is_file_exists(path))
return 0;
@@ -909,7 +917,7 @@ namespace conf
// Uncomment for docker-based execution.
// std::string volumearg;
// volumearg.append("type=bind,source=").append(ctx.hpfs_dir).append(",target=/hpfs");
// volumearg.append("type=bind,source=").append(ctx.contract_hpfs_dir).append(",target=/hpfs");
// const char *dockerargs[] = {"/usr/bin/docker", "run", "--rm", "-i", "--mount", volumearg.data(), contract.bin_path.data()};
// contract.runtime_binexec_args.insert(contract.runtime_binexec_args.begin(), std::begin(dockerargs), std::end(dockerargs));

View File

@@ -143,18 +143,21 @@ namespace conf
std::string hpws_exe_path; // hpws executable file path.
std::string hpfs_exe_path; // hpfs executable file path.
std::string contract_dir; // Contract base directory full path.
std::string full_hist_dir; // Contract full history dir full path.
std::string hist_dir; // Contract ledger history dir full path.
std::string hpfs_dir; // hpfs metdata dir (The location of hpfs log file).
std::string hpfs_mount_dir; // hpfs fuse file system mount path.
std::string hpfs_rw_dir; // hpfs read/write fs session path.
std::string log_dir; // HotPocket log dir full path.
std::string contract_log_dir; // Contract log dir full path.
std::string config_dir; // Config dir full path.
std::string config_file; // Full path to the config file.
std::string tls_key_file; // Full path to the tls private key file.
std::string tls_cert_file; // Full path to the tls certificate.
std::string contract_dir; // Contract base directory full path.
std::string full_hist_dir; // Contract full history dir full path.
std::string hist_dir; // Contract ledger history dir full path.
std::string contract_hpfs_dir; // Contract hpfs metdata dir (The location of hpfs log file).
std::string contract_hpfs_mount_dir; // Contract hpfs fuse file system mount path.
std::string contract_hpfs_rw_dir; // Contract hpfs read/write fs session path.
std::string ledger_hpfs_dir; // Ledger hpfs metdata dir (The location of hpfs log file).
std::string ledger_hpfs_mount_dir; // Ledger hpfs fuse file system mount path.
std::string ledger_hpfs_rw_dir; // Ledger hpfs read/write fs session path.
std::string log_dir; // HotPocket log dir full path.
std::string contract_log_dir; // Contract log dir full path.
std::string config_dir; // Config dir full path.
std::string config_file; // Full path to the config file.
std::string tls_key_file; // Full path to the tls private key file.
std::string tls_cert_file; // Full path to the tls certificate.
int config_fd; // Config file file descriptor.
struct flock config_lock; // Config file lock.

View File

@@ -10,10 +10,7 @@
#include "p2p/peer_session_handler.hpp"
#include "hplog.hpp"
#include "crypto.hpp"
#include "sc.hpp"
#include "util/h32.hpp"
#include "hpfs/hpfs.hpp"
#include "hpfs/hpfs_sync.hpp"
#include "unl.hpp"
#include "ledger.hpp"
#include "consensus.hpp"
@@ -113,9 +110,8 @@ namespace consensus
// Get current lcl and state.
std::string lcl = ledger::ctx.get_lcl();
const uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
hpfs::hpfs_mount &contract_fs = hpfs::contract_fs; // Ref of the contract_fs object.
util::h32 state_hash = contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH);
util::h32 patch_hash = contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH);
util::h32 state_hash = sc::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH);
util::h32 patch_hash = sc::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH);
if (ctx.stage == 0)
{
@@ -214,7 +210,7 @@ namespace consensus
sync_target_list.push(hpfs::sync_target{"state", majority_state_hash, hpfs::STATE_DIR_PATH, hpfs::BACKLOG_ITEM_TYPE::DIR});
// Set sync targets for contract fs.
hpfs::contract_sync.set_target(std::move(sync_target_list));
sc::contract_sync_worker.set_target(std::move(sync_target_list));
}
// Proceed further only if both lcl and state are in sync with majority.
@@ -238,7 +234,7 @@ namespace consensus
*/
void check_sync_completion()
{
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !hpfs::contract_sync.ctx.is_syncing && !ledger::sync_ctx.is_syncing)
if (conf::cfg.node.role == conf::ROLE::OBSERVER && !sc::contract_sync_worker.is_syncing && !ledger::sync_ctx.is_syncing)
conf::change_role(conf::ROLE::VALIDATOR);
}
@@ -770,7 +766,7 @@ namespace consensus
}
}
is_state_desync = (hpfs::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH) != majority_state_hash);
is_state_desync = (sc::contract_fs.get_parent_hash(hpfs::STATE_DIR_PATH) != majority_state_hash);
}
/**
@@ -796,7 +792,7 @@ namespace consensus
}
}
is_patch_desync = (hpfs::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH) != majority_patch_hash);
is_patch_desync = (sc::contract_fs.get_parent_hash(hpfs::PATCH_FILE_PATH) != majority_patch_hash);
}
/**
@@ -883,7 +879,7 @@ namespace consensus
}
// Update state hash in contract fs global hash tracker.
hpfs::contract_fs.set_parent_hash(hpfs::STATE_DIR_PATH, args.post_execution_state_hash);
sc::contract_fs.set_parent_hash(hpfs::STATE_DIR_PATH, args.post_execution_state_hash);
new_state_hash = args.post_execution_state_hash;
extract_user_outputs_from_contract_bufmap(args.userbufs);
@@ -1052,18 +1048,16 @@ namespace consensus
*/
int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 &current_patch_hash)
{
hpfs::hpfs_mount &contract_fs = hpfs::contract_fs;
// Check whether is there any patch changes to be applied which reached consensus.
if (is_patch_update_pending && current_patch_hash == prop_patch_hash)
{
if (contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1)
if (sc::contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1)
{
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(HPFS_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after consensus failed.";
contract_fs.stop_ro_session(HPFS_SESSION_NAME);
sc::contract_fs.stop_ro_session(HPFS_SESSION_NAME);
return -1;
}
else
@@ -1073,7 +1067,7 @@ namespace consensus
}
}
if (contract_fs.stop_ro_session(HPFS_SESSION_NAME) == -1)
if (sc::contract_fs.stop_ro_session(HPFS_SESSION_NAME) == -1)
return -1;
}
return 0;

View File

@@ -5,11 +5,10 @@
#include "util/util.hpp"
#include "util/buffer_store.hpp"
#include "util/merkle_hash_tree.hpp"
#include "sc.hpp"
#include "./sc/sc.hpp"
#include "p2p/p2p.hpp"
#include "usr/user_input.hpp"
#include "util/h32.hpp"
#include "sc.hpp"
namespace consensus
{

View File

@@ -1,47 +0,0 @@
#include "./hpfs.hpp"
#include "../conf.hpp"
#include "./hpfs_serve.hpp"
namespace hpfs
{
hpfs::hpfs_mount contract_fs; // Global contract file system instance.
hpfs::hpfs_sync contract_sync; // Global contract file system sync instance.
hpfs::hpfs_serve contract_serve;
/**
* Initialize necessary file system mounts to hpcore.
*/
int init()
{
if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.hpfs_dir, conf::ctx.hpfs_mount_dir, conf::ctx.hpfs_rw_dir, conf::cfg.node.full_history) == -1)
{
LOG_ERROR << "Contract file system initialization failed.";
return -1;
}
if (contract_serve.init("contract", &contract_fs) == -1)
{
LOG_ERROR << "Contract file system serve worker initialization failed.";
return -1;
}
if (contract_sync.init("contract", &contract_fs) == -1)
{
LOG_ERROR << "Contract file system sync worker initialization failed.";
return -1;
}
return 0;
}
/**
* Perform cleanups on created mounts.
*/
void deinit()
{
contract_fs.deinit();
contract_serve.deinit();
contract_sync.deinit();
}
} // namespace hpfs

View File

@@ -1,17 +0,0 @@
#ifndef _HP_HPFS_HPFS
#define _HP_HPFS_HPFS
#include "./hpfs_mount.hpp"
#include "./hpfs_sync.hpp"
namespace hpfs
{
constexpr int32_t CONTRACT_FS_ID = 0;
extern hpfs::hpfs_mount contract_fs; // Global contract file system instance.
extern hpfs::hpfs_sync contract_sync; // Global contract file system sync instance.
int init();
void deinit();
} // namespace hpfs
#endif

View File

@@ -3,7 +3,7 @@
#include "../hplog.hpp"
#include "../util/util.hpp"
#include "../util/h32.hpp"
#include "../sc.hpp"
#include "../sc/sc.hpp"
namespace hpfs
{
@@ -23,7 +23,7 @@ namespace hpfs
/**
* This should be called to activate the hpfs mount process.
*/
int hpfs_mount::init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history)
int hpfs_mount::init(const uint32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history)
{
this->mount_id = mount_id;
this->fs_dir = fs_dir;
@@ -62,23 +62,6 @@ namespace hpfs
*/
int hpfs_mount::prepare_fs()
{
// This contract mount specific preparation logic will be moved to a seprate child class in the next PBI.
util::h32 initial_state_hash;
util::h32 initial_patch_hash;
if (acquire_rw_session() == -1 ||
conf::populate_patch_config() == -1 ||
get_hash(initial_state_hash, RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 ||
get_hash(initial_patch_hash, RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 ||
release_rw_session() == -1)
{
LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << ".";
return -1;
}
set_parent_hash(hpfs::STATE_DIR_PATH, initial_state_hash);
set_parent_hash(hpfs::PATCH_FILE_PATH, initial_patch_hash);
LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash;
return 0;
}

View File

@@ -7,10 +7,12 @@
namespace hpfs
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions.
constexpr const char *STATE_DIR_PATH = "/state"; // State directory name.
constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename.
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions.
constexpr const char *STATE_DIR_PATH = "/state"; // State directory name.
constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename.
constexpr const char *LEDGER_PRIMARY_DIR = "/primary"; // Ledger primary directory name.
constexpr const char *LEDGER_BLOB_DIR = "/blob"; // Ledger blob directory name.
struct child_hash_node
{
@@ -37,7 +39,6 @@ namespace hpfs
private:
pid_t hpfs_pid = 0;
std::string fs_dir;
std::string mount_dir;
bool is_full_history;
bool init_success = false;
// Keeps the hashes of hpfs parents against its vpath.
@@ -49,12 +50,13 @@ namespace hpfs
std::mutex rw_mutex;
protected:
std::string mount_dir;
virtual int prepare_fs();
public:
int32_t mount_id; // Used in hpfs serving and syncing.
uint32_t mount_id; // Used in hpfs serving and syncing.
std::string rw_dir;
int init(const int32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history);
int init(const uint32_t mount_id, std::string_view fs_dir, std::string_view mount_dir, std::string_view rw_dir, const bool is_full_history);
void deinit();
int start_hpfs_process();

View File

@@ -57,7 +57,6 @@ namespace hpfs
LOG_INFO << "Hpfs " << name << " server started.";
std::list<std::pair<std::string, p2p::hpfs_request>> hpfs_requests;
// Indicates whether any requests were processed in the previous loop iteration.
bool prev_requests_processed = false;
@@ -68,7 +67,7 @@ namespace hpfs
if (!prev_requests_processed)
util::sleep(LOOP_WAIT);
swap_collected_requests(hpfs_requests);
swap_collected_requests();
prev_requests_processed = !hpfs_requests.empty();
const uint64_t time_start = util::get_epoch_milliseconds();
@@ -353,15 +352,4 @@ namespace hpfs
return result;
}
/**
* Move the collected requests from hpfs requests to the local hpfs request list.
*/
void hpfs_serve::swap_collected_requests(std::list<std::pair<std::string, p2p::hpfs_request>> &hpfs_requests)
{
std::scoped_lock<std::mutex> lock(p2p::ctx.collected_msgs.contract_hpfs_requests_mutex);
// Move collected hpfs requests for contract fs over to local requests list.
if (!p2p::ctx.collected_msgs.contract_hpfs_requests.empty())
hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.contract_hpfs_requests);
}
} // namespace hpfs

View File

@@ -21,7 +21,9 @@ namespace hpfs
void hpfs_serve_loop();
protected:
virtual void swap_collected_requests(std::list<std::pair<std::string, p2p::hpfs_request>> &hpfs_requests); // Must override in child classes.
std::list<std::pair<std::string, p2p::hpfs_request>> hpfs_requests;
// Move the collected requests from hpfs requests to a local response list.
virtual void swap_collected_requests() = 0; // Must override in child classes.
public:
int init(std::string_view name, hpfs::hpfs_mount *fs_mount);

View File

@@ -8,7 +8,7 @@
#include "../util/util.hpp"
#include "../util/h32.hpp"
#include "hpfs_sync.hpp"
#include "../sc.hpp"
#include "../sc/sc.hpp"
#include "../unl.hpp"
namespace hpfs
@@ -38,7 +38,7 @@ namespace hpfs
this->name = name;
this->fs_mount = fs_mount;
REQUEST_RESUBMIT_TIMEOUT = hpfs::get_request_resubmit_timeout();
ctx.hpfs_sync_thread = std::thread(&hpfs_sync::hpfs_syncer_loop, this);
hpfs_sync_thread = std::thread(&hpfs_sync::hpfs_syncer_loop, this);
init_success = true;
return 0;
}
@@ -50,9 +50,9 @@ namespace hpfs
{
if (init_success)
{
ctx.is_syncing = false;
ctx.is_shutting_down = true;
ctx.hpfs_sync_thread.join();
is_syncing = false;
is_shutting_down = true;
hpfs_sync_thread.join();
}
}
@@ -67,15 +67,15 @@ namespace hpfs
return;
// Do not do anything if we are already syncing towards the specified target states.
if (ctx.is_shutting_down || (ctx.is_syncing && ctx.original_target_list == target_list))
if (is_shutting_down || (is_syncing && original_target_list == target_list))
return;
ctx.original_target_list = target_list;
ctx.target_list = std::move(target_list);
this->original_target_list = target_list;
this->target_list = std::move(target_list);
std::unique_lock lock(ctx.current_target_mutex);
ctx.current_target = ctx.target_list.front(); // Make the first element of the list the first target to sync.
ctx.is_syncing = true;
std::unique_lock lock(current_target_mutex);
current_target = target_list.front(); // Make the first element of the list the first target to sync.
is_syncing = true;
}
/**
@@ -85,42 +85,42 @@ namespace hpfs
{
util::mask_signal();
LOG_INFO << "hpfs " << name << " sync: Worker started.";
LOG_INFO << "Hpfs " << name << " sync: Worker started.";
while (!ctx.is_shutting_down)
while (!is_shutting_down)
{
util::sleep(IDLE_WAIT);
// Keep idling if we are not doing any sync activity.
if (!ctx.is_syncing)
if (!is_syncing)
continue;
if (fs_mount->acquire_rw_session() != -1)
{
while (!ctx.is_shutting_down)
while (!is_shutting_down)
{
{
std::shared_lock lock(ctx.current_target_mutex);
LOG_INFO << "hpfs " << name << " sync: Starting sync for target " << ctx.current_target.name << " hash: " << ctx.current_target.hash;
std::shared_lock lock(current_target_mutex);
LOG_INFO << "Hpfs " << name << " sync: Starting sync for target " << current_target.name << " hash: " << current_target.hash;
}
util::h32 new_state = util::h32_empty;
const int result = request_loop(ctx.current_target.hash, new_state);
const int result = request_loop(current_target.hash, new_state);
ctx.pending_requests.clear();
ctx.candidate_hpfs_responses.clear();
ctx.submitted_requests.clear();
pending_requests.clear();
candidate_hpfs_responses.clear();
submitted_requests.clear();
if (result == -1 || result == 1 || ctx.is_shutting_down)
if (result == -1 || result == 1 || is_shutting_down)
break;
{
std::shared_lock lock(ctx.current_target_mutex);
std::shared_lock lock(current_target_mutex);
if (new_state == ctx.current_target.hash)
if (new_state == current_target.hash)
{
LOG_INFO << "hpfs " << name << " sync: Target " << ctx.current_target.name << " hash achieved: " << new_state;
on_current_sync_state_acheived();
LOG_INFO << "Hpfs " << name << " sync: Target " << current_target.name << " hash achieved: " << new_state;
on_current_sync_state_acheived(new_state);
// Start syncing to next target.
const int result = start_syncing_next_target();
@@ -131,26 +131,26 @@ namespace hpfs
}
else
{
LOG_INFO << "hpfs " << name << " sync: Continuing sync for new target: " << ctx.current_target.hash;
LOG_INFO << "Hpfs " << name << " sync: Continuing sync for new target: " << current_target.hash;
continue;
}
}
}
LOG_INFO << "hpfs " << name << " sync: All parents synced.";
LOG_INFO << "Hpfs " << name << " sync: All parents synced.";
fs_mount->release_rw_session();
}
else
{
LOG_ERROR << "hpfs " << name << " sync: Failed to start hpfs rw session";
LOG_ERROR << "Hpfs " << name << " sync: Failed to start hpfs rw session";
}
// Clear target list and original target list since the sync is complete.
ctx.target_list = {};
ctx.original_target_list = {};
ctx.is_syncing = false;
target_list = {};
original_target_list = {};
is_syncing = false;
}
LOG_INFO << "hpfs " << name << " sync: Worker stopped.";
LOG_INFO << "Hpfs " << name << " sync: Worker stopped.";
}
/**
@@ -158,12 +158,12 @@ namespace hpfs
* @return -1 on error. 0 when current sync state acheived or sync is stopped due to target change.
* Returns 1 on successfully finishing all the sync targets.
*/
int hpfs_sync::request_loop(const util::h32 current_target, util::h32 &updated_state)
int hpfs_sync::request_loop(const util::h32 current_target_hash, util::h32 &updated_state)
{
std::string lcl = ledger::ctx.get_lcl();
// Send the initial root hpfs request of the current target.
submit_request(backlog_item{ctx.current_target.item_type, ctx.current_target.vpath, -1, current_target}, lcl);
submit_request(backlog_item{this->current_target.item_type, this->current_target.vpath, -1, current_target_hash}, lcl);
// Indicates whether any responses were processed in the previous loop iteration.
bool prev_responses_processed = false;
@@ -171,7 +171,7 @@ namespace hpfs
// No. of repetitive resubmissions so far. (This is reset whenever we receive a hpfs response)
uint16_t resubmissions_count = 0;
while (!should_stop_request_loop(current_target))
while (!should_stop_request_loop(current_target_hash))
{
// Wait a small delay if there were no responses processed during previous iteration.
if (!prev_responses_processed)
@@ -183,18 +183,18 @@ namespace hpfs
// Move the received hpfs responses to the local response list.
swap_collected_responses();
prev_responses_processed = !ctx.candidate_hpfs_responses.empty();
prev_responses_processed = !candidate_hpfs_responses.empty();
// Reset resubmissions counter whenever we have a resposne.
if (!ctx.candidate_hpfs_responses.empty())
if (!candidate_hpfs_responses.empty())
resubmissions_count = 0;
for (auto &response : ctx.candidate_hpfs_responses)
for (auto &response : candidate_hpfs_responses)
{
if (should_stop_request_loop(current_target))
if (should_stop_request_loop(current_target_hash))
return 0;
LOG_DEBUG << "hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]";
LOG_DEBUG << "Hpfs " << name << " sync: Processing hpfs response from [" << response.first.substr(2, 10) << "]";
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data());
const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message();
@@ -204,10 +204,10 @@ namespace hpfs
std::string_view vpath = msg::fbuf::flatbuff_str_to_sv(resp_msg->path());
const std::string key = std::string(vpath).append(hash);
const auto pending_resp_itr = ctx.submitted_requests.find(key);
if (pending_resp_itr == ctx.submitted_requests.end())
const auto pending_resp_itr = submitted_requests.find(key);
if (pending_resp_itr == submitted_requests.end())
{
LOG_DEBUG << "hpfs " << name << " sync: Skipping hpfs response due to hash mismatch.";
LOG_DEBUG << "Hpfs " << name << " sync: Skipping hpfs response due to hash mismatch.";
continue;
}
@@ -225,7 +225,7 @@ namespace hpfs
// Validate received fs data against the hash.
if (!validate_fs_entry_hash(vpath, hash, peer_fs_entry_map))
{
LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch.";
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to fs entry hash mismatch.";
continue;
}
@@ -242,7 +242,7 @@ namespace hpfs
// Validate received hashmap against the hash.
if (!validate_file_hashmap_hash(vpath, hash, peer_hashes, peer_hash_count))
{
LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch.";
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file hashmap hash mismatch.";
continue;
}
@@ -259,7 +259,7 @@ namespace hpfs
// Validate received block data against the hash.
if (!validate_file_block_hash(hash, block_id, buf))
{
LOG_INFO << "hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch.";
LOG_INFO << "Hpfs " << name << " sync: Skipping hpfs response due to file block hash mismatch.";
continue;
}
@@ -267,30 +267,30 @@ namespace hpfs
}
// Now that we have received matching hash and handled it, remove it from the waiting list.
ctx.submitted_requests.erase(pending_resp_itr);
submitted_requests.erase(pending_resp_itr);
// After handling each response, check whether we have reached target hpfs state.
// get_hash returns 0 incase target parent is not existing in our side.
if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, ctx.current_target.vpath) == -1)
if (fs_mount->get_hash(updated_state, hpfs::RW_SESSION_NAME, this->current_target.vpath) == -1)
{
LOG_ERROR << "hpfs " << name << " sync: exiting due to hash check error.";
LOG_ERROR << "Hpfs " << name << " sync: exiting due to hash check error.";
return -1;
}
// Update the central hpfs state tracker.
fs_mount->set_parent_hash(ctx.current_target.vpath, updated_state);
fs_mount->set_parent_hash(this->current_target.vpath, updated_state);
LOG_DEBUG << "hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target;
if (updated_state == current_target)
LOG_DEBUG << "Hpfs " << name << " sync: current:" << updated_state << " | target:" << current_target_hash;
if (updated_state == current_target_hash)
return 0;
}
ctx.candidate_hpfs_responses.clear();
candidate_hpfs_responses.clear();
// Check for long-awaited responses and re-request them.
for (auto &[hash, request] : ctx.submitted_requests)
for (auto &[hash, request] : submitted_requests)
{
if (should_stop_request_loop(current_target))
if (should_stop_request_loop(current_target_hash))
return 0;
if (request.waiting_time < REQUEST_RESUBMIT_TIMEOUT)
@@ -302,9 +302,9 @@ namespace hpfs
{
if (++resubmissions_count > ABANDON_THRESHOLD)
{
LOG_INFO << "hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync.";
LOG_INFO << "Hpfs " << name << " sync: Resubmission threshold exceeded. Abandoning sync.";
std::shared_lock lock(ctx.current_target_mutex);
std::shared_lock lock(current_target_mutex);
const int result = start_syncing_next_target();
if (result == 0)
return 1; // To stop syncing since we have sync all the targets.
@@ -313,23 +313,23 @@ namespace hpfs
// Reset the counter and re-submit request.
request.waiting_time = 0;
LOG_DEBUG << "hpfs " << name << " sync: Resubmitting request...";
LOG_DEBUG << "Hpfs " << name << " sync: Resubmitting request...";
submit_request(request, lcl);
}
}
// Check whether we can submit any more requests.
if (!ctx.pending_requests.empty() && ctx.submitted_requests.size() < MAX_AWAITING_REQUESTS)
if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS)
{
const uint16_t available_slots = MAX_AWAITING_REQUESTS - ctx.submitted_requests.size();
for (int i = 0; i < available_slots && !ctx.pending_requests.empty(); i++)
const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size();
for (int i = 0; i < available_slots && !pending_requests.empty(); i++)
{
if (should_stop_request_loop(current_target))
if (should_stop_request_loop(current_target_hash))
return 0;
const backlog_item &request = ctx.pending_requests.front();
const backlog_item &request = pending_requests.front();
submit_request(request, lcl);
ctx.pending_requests.pop_front();
pending_requests.pop_front();
}
}
}
@@ -405,14 +405,14 @@ namespace hpfs
/**
* Indicates whether to break out of hpfs request processing loop.
*/
bool hpfs_sync::should_stop_request_loop(const util::h32 &current_target)
bool hpfs_sync::should_stop_request_loop(const util::h32 &current_target_hash)
{
if (ctx.is_shutting_down)
if (is_shutting_down)
return true;
// Stop request loop if the target has changed.
std::shared_lock lock(ctx.current_target_mutex);
return current_target != ctx.current_target.hash;
std::shared_lock lock(current_target_mutex);
return current_target_hash != this->current_target.hash;
}
/**
@@ -445,14 +445,14 @@ namespace hpfs
{
const std::string key = std::string(request.path)
.append(reinterpret_cast<const char *>(&request.expected_hash), sizeof(util::h32));
ctx.submitted_requests.try_emplace(key, request);
submitted_requests.try_emplace(key, request);
const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR;
std::string target_pubkey;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey);
if (!target_pubkey.empty())
LOG_DEBUG << "hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type
LOG_DEBUG << "Hpfs " << name << " sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
@@ -466,7 +466,7 @@ namespace hpfs
int hpfs_sync::handle_fs_entry_response(std::string_view vpath, std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entry_map)
{
// Get the parent path of the fs entries we have received.
LOG_DEBUG << "hpfs " << name << " sync: Processing fs entries response for " << vpath;
LOG_DEBUG << "Hpfs " << name << " sync: Processing fs entries response for " << vpath;
// Create physical directory on our side if not exist.
std::string parent_physical_path = fs_mount->rw_dir + vpath.data();
@@ -494,9 +494,9 @@ namespace hpfs
{
// Prioritize file hpfs requests over directories.
if (ex_entry.is_file)
ctx.pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash});
pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash});
else
ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash});
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash});
}
fs_entry_map.erase(peer_itr);
@@ -510,7 +510,7 @@ namespace hpfs
!ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1)
return -1;
LOG_DEBUG << "hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath;
LOG_DEBUG << "Hpfs " << name << " sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath;
}
}
@@ -524,9 +524,9 @@ namespace hpfs
// Prioritize file hpfs requests over directories.
if (fs_entry.is_file)
ctx.pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash});
pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash});
else
ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash});
pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash});
}
return 0;
@@ -543,7 +543,7 @@ namespace hpfs
int hpfs_sync::handle_file_hashmap_response(std::string_view vpath, const util::h32 *hashes, const size_t hash_count, const uint64_t file_length)
{
// Get the file path of the block hashes we have received.
LOG_DEBUG << "hpfs " << name << " sync: Processing file block hashes response for " << vpath;
LOG_DEBUG << "Hpfs " << name << " sync: Processing file block hashes response for " << vpath;
// File block hashes on our side (file might not exist on our side).
std::vector<util::h32> existing_hashes;
@@ -552,13 +552,13 @@ namespace hpfs
const size_t existing_hash_count = existing_hashes.size();
// Compare the block hashes and request any differences.
auto insert_itr = ctx.pending_requests.begin();
auto insert_itr = pending_requests.begin();
const int32_t max_block_id = MAX(existing_hash_count, hash_count) - 1;
for (int32_t block_id = 0; block_id <= max_block_id; block_id++)
{
// Insert at front to give priority to block requests while preserving block order.
if (block_id >= existing_hash_count || existing_hashes[block_id] != hashes[block_id])
ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]});
pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(vpath), block_id, hashes[block_id]});
}
if (existing_hashes.size() >= hash_count)
@@ -581,7 +581,7 @@ namespace hpfs
*/
int hpfs_sync::handle_file_block_response(std::string_view vpath, const uint32_t block_id, std::string_view buf)
{
LOG_DEBUG << "hpfs " << name << " sync: Writing block_id " << block_id
LOG_DEBUG << "Hpfs " << name << " sync: Writing block_id " << block_id
<< " (len:" << buf.length()
<< ") of " << vpath;
@@ -609,25 +609,8 @@ namespace hpfs
* This method can be used to invoke mount specific custom logic (after extending this super class) to be executed after
* a sync target is acheived.
*/
void hpfs_sync::on_current_sync_state_acheived()
void hpfs_sync::on_current_sync_state_acheived(const util::h32 &acheived_hash)
{
if (ctx.current_target.vpath == hpfs::PATCH_FILE_PATH)
{
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after sync failed";
}
else
{
unl::update_unl_changes_from_patch();
// Update global hash tracker with the new patch file hash.
util::h32 updated_patch_hash;
fs_mount->get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
fs_mount->set_parent_hash(ctx.current_target.vpath, updated_patch_hash);
}
}
}
/**
@@ -636,30 +619,17 @@ namespace hpfs
*/
int hpfs_sync::start_syncing_next_target()
{
ctx.target_list.pop(); // Remove the synced parent from the target list.
if (ctx.target_list.empty())
target_list.pop(); // Remove the synced parent from the target list.
if (target_list.empty())
{
ctx.current_target = {};
current_target = {};
return 0;
}
else
{
ctx.current_target = ctx.target_list.front();
current_target = target_list.front();
return 1;
}
}
/**
* Move the collected responses from hpfs responses to a local response list.
*/
void hpfs_sync::swap_collected_responses()
{
// This logic will be added to a child class in next PBI.
std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_responses_mutex);
// Move collected hpfs responses over to local candidate responses list.
if (!p2p::ctx.collected_msgs.contract_hpfs_responses.empty())
ctx.candidate_hpfs_responses.splice(ctx.candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.contract_hpfs_responses);
}
} // namespace hpfs

View File

@@ -6,6 +6,7 @@
#include "../msg/fbuf/p2pmsg_content_generated.h"
#include "../util/h32.hpp"
#include "../crypto.hpp"
#include "./hpfs_mount.hpp"
namespace hpfs
{
@@ -43,50 +44,49 @@ namespace hpfs
}
};
struct sync_context
class hpfs_sync
{
// The current target hashes we are syncing towards.
std::queue<sync_target> target_list;
private:
bool init_success = false;
uint16_t REQUEST_RESUBMIT_TIMEOUT; // No. of milliseconds to wait before resubmitting a request.
std::string name; // Name used for logging.
std::queue<sync_target> target_list; // The current target hashes we are syncing towards.
// Store the originally submitted sync target list. This list is used to avoid submitting same list multiple times
// because target list is updated when the sync targets are acheived.
std::queue<sync_target> original_target_list;
sync_target current_target = {};
// List of sender pubkeys and hpfs responses(flatbuffer messages) to be processed.
std::list<std::pair<std::string, std::string>> candidate_hpfs_responses;
// List of pending sync requests to be sent out.
std::list<backlog_item> pending_requests;
std::list<backlog_item> pending_requests; // List of pending sync requests to be sent out.
// List of submitted requests we are awaiting responses for, keyed by expected response path+hash.
std::unordered_map<std::string, backlog_item> submitted_requests;
std::thread hpfs_sync_thread;
std::shared_mutex current_target_mutex;
std::atomic<bool> is_syncing = false;
std::atomic<bool> is_shutting_down = false;
};
class hpfs_sync
{
private:
bool init_success = false;
uint16_t REQUEST_RESUBMIT_TIMEOUT; // No. of milliseconds to wait before resubmitting a request.
hpfs::hpfs_mount *fs_mount = NULL;
std::string name;
void hpfs_syncer_loop();
int request_loop(const util::h32 current_target, util::h32 &updated_state);
int request_loop(const util::h32 current_target_hash, util::h32 &updated_state);
int start_syncing_next_target();
protected:
virtual void on_current_sync_state_acheived();
virtual void swap_collected_responses(); // Must override in child classes.
sync_target current_target = {};
// List of sender pubkeys and hpfs responses(flatbuffer messages) to be processed.
std::list<std::pair<std::string, std::string>> candidate_hpfs_responses;
hpfs::hpfs_mount *fs_mount = NULL;
virtual void on_current_sync_state_acheived(const util::h32 &acheived_hash);
// Move the collected responses from hpfs responses to a local response list.
virtual void swap_collected_responses() = 0; // Must override in child classes.
public:
sync_context ctx;
std::atomic<bool> is_syncing = false;
int init(std::string_view name, hpfs::hpfs_mount *fs_mount);
@@ -100,7 +100,7 @@ namespace hpfs
bool validate_file_block_hash(std::string_view hash, const uint32_t block_id, std::string_view buf);
bool should_stop_request_loop(const util::h32 &current_target);
bool should_stop_request_loop(const util::h32 &current_target_hash);
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const util::h32 expected_hash, std::string_view lcl, std::string &target_pubkey);

View File

@@ -0,0 +1,15 @@
#include "./ledger_mount.hpp"
namespace ledger
{
/**
* Perform ledger file system mount related preparation tasks.
* @return Returns -1 on error and 0 on success.
*/
int ledger_mount::prepare_fs()
{
// Add ledger fs preparation logic here.
return 0;
}
} // namespace ledger

View File

@@ -0,0 +1,20 @@
#ifndef _HP_LEDGER_LEDGER_MOUNT_
#define _HP_LEDGER_LEDGER_MOUNT_
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "../hpfs/hpfs_mount.hpp"
namespace ledger
{
/**
* Represents ledger file system mount.
*/
class ledger_mount : public hpfs::hpfs_mount
{
private:
int prepare_fs();
};
} // namespace ledger
#endif

View File

@@ -4,10 +4,52 @@
#include "../util/util.hpp"
#include "../msg/fbuf/ledger_helpers.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "ledger_serve.hpp"
// Currently this namespace is added for sqlite testing, later this can be modified and renamed as 'ledger::ledger_sample' -> 'ledger' for ledger implementations.
namespace ledger::ledger_sample
{
constexpr uint32_t LEDGER_FS_ID = 1;
ledger::ledger_mount ledger_fs; // Global ledger file system instance.
ledger::ledger_sync ledger_sync_worker; // Global ledger file system sync instance.
ledger::ledger_serve ledger_server; // Ledger file server instance.
/**
* Perform ledger related initializations.
*/
int init()
{
if (ledger_fs.init(LEDGER_FS_ID, conf::ctx.ledger_hpfs_dir, conf::ctx.ledger_hpfs_mount_dir, conf::ctx.ledger_hpfs_rw_dir, conf::cfg.node.full_history) == -1)
{
LOG_ERROR << "Ledger file system initialization failed.";
return -1;
}
if (ledger_server.init("ledger", &ledger_fs) == -1)
{
LOG_ERROR << "Ledger file system serve worker initialization failed.";
return -1;
}
if (ledger_sync_worker.init("ledger", &ledger_fs) == -1)
{
LOG_ERROR << "Ledger file system sync worker initialization failed.";
return -1;
}
return 0;
}
/**
* Perform deinit tasks related to ledger.
*/
void deinit()
{
ledger_fs.deinit();
ledger_server.deinit();
ledger_sync_worker.deinit();
}
/**
* Create and save ledger record from the given proposal message.
* @param proposal Consensus-reached Stage 3 proposal.
@@ -47,14 +89,14 @@ namespace ledger::ledger_sample
// Get binary hash of the serialized lcl.
std::string_view ledger_str_buf = msg::fbuf::flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize());
const std::string lcl_hash = crypto::get_hash(ledger_str_buf);
// Get binary hash of users and inputs.
const std::string user_hash = crypto::get_hash(proposal.users);
const std::string input_hash = crypto::get_hash(proposal.input_hashes);
const std::string seq_no_str = std::to_string(seq_no);
const std::string time_str = std::to_string(proposal.time);
// Contruct binary string for data hash.
std::string data;
data.reserve(seq_no_str.size() + time_str.size() + (32 * 5));

View File

@@ -1,12 +1,21 @@
#include "../p2p/p2p.hpp"
#include "sqlite.hpp"
#include "ledger_sync.hpp"
#include "ledger_mount.hpp"
namespace ledger::ledger_sample
{
constexpr const char *GENESIS_LEDGER = "0-genesis";
extern ledger::ledger_mount ledger_fs; // Global ledger file system instance.
extern ledger::ledger_sync ledger_sync_worker; // Global ledger file system sync instance.
int init();
void deinit();
int save_ledger(const p2p::proposal &proposal);
int extract_lcl(const std::string &lcl, uint64_t &seq_no, std::string &hash);
} // namespace ledger::ledger_sample

View File

@@ -0,0 +1,14 @@
#include "./ledger_serve.hpp"
namespace ledger
{
void ledger_serve::swap_collected_requests()
{
std::scoped_lock<std::mutex> lock(p2p::ctx.collected_msgs.ledger_hpfs_requests_mutex);
// Move collected hpfs requests for contract fs over to local requests list.
if (!p2p::ctx.collected_msgs.ledger_hpfs_requests.empty())
hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.ledger_hpfs_requests);
}
} // namespace ledger

View File

@@ -0,0 +1,15 @@
#ifndef _HP_LEDGER_LEDGER_SERVE_
#define _HP_LEDGER_LEDGER_SERVE_
#include "../pchheader.hpp"
#include "../hpfs/hpfs_serve.hpp"
namespace ledger
{
class ledger_serve : public hpfs::hpfs_serve
{
private:
void swap_collected_requests();
};
} // namespace ledger
#endif

View File

@@ -0,0 +1,19 @@
#include "./ledger_sync.hpp"
namespace ledger
{
void ledger_sync::on_current_sync_state_acheived(const util::h32 &acheived_hash)
{
// Logic when a sync state is acheived can be performed here.
}
void ledger_sync::swap_collected_responses()
{
std::scoped_lock lock(p2p::ctx.collected_msgs.ledger_hpfs_responses_mutex);
// Move collected hpfs responses over to local candidate responses list.
if (!p2p::ctx.collected_msgs.ledger_hpfs_responses.empty())
candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.ledger_hpfs_responses);
}
} // namespace ledger

View File

@@ -0,0 +1,18 @@
#ifndef _HP_LEDGER_LEDGER_SYNC_
#define _HP_LEDGER_LEDGER_SYNC_
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "../hpfs/hpfs_sync.hpp"
namespace ledger
{
class ledger_sync : public hpfs::hpfs_sync
{
private:
void on_current_sync_state_acheived(const util::h32 &acheived_hash);
void swap_collected_responses();
};
} // namespace ledger
#endif

View File

@@ -6,14 +6,14 @@
#include "util/util.hpp"
#include "conf.hpp"
#include "crypto.hpp"
#include "sc.hpp"
#include "./sc/sc.hpp"
#include "hplog.hpp"
#include "usr/usr.hpp"
#include "usr/read_req.hpp"
#include "p2p/p2p.hpp"
#include "consensus.hpp"
#include "ledger.hpp"
#include "hpfs/hpfs.hpp"
#include "ledger/ledger_sample.hpp"
#include "unl.hpp"
/**
@@ -71,7 +71,8 @@ void deinit()
p2p::deinit();
read_req::deinit();
consensus::deinit();
hpfs::deinit();
ledger::ledger_sample::deinit(); // Deinit method in new ledger implementation.
sc::deinit();
ledger::deinit();
conf::deinit();
}
@@ -195,7 +196,8 @@ int main(int argc, char **argv)
LOG_INFO << "Public key: " << conf::cfg.node.public_key_hex;
LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")";
if (hpfs::init() == -1 ||
if (sc::init() == -1 ||
ledger::ledger_sample::init() == -1 || // Init method of new ledger implementaiton.
ledger::init() == -1 ||
unl::init() == -1 ||
consensus::init() == -1 ||

View File

@@ -93,7 +93,7 @@ table HistoryLedgerBlock {
}
table Hpfs_Request_Message { //Hpfs request message schema
mount_id: int32;
mount_id: uint32;
parent_path:string;
is_file:bool;
block_id:int32;
@@ -106,7 +106,7 @@ table Hpfs_Response_Message{
hpfs_response:Hpfs_Response;
hash:[ubyte];
path: string;
mount_id: int32;
mount_id: uint32;
}
table Fs_Entry_Response{

View File

@@ -1317,11 +1317,11 @@ struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
VT_BLOCK_ID = 10,
VT_EXPECTED_HASH = 12
};
int32_t mount_id() const {
return GetField<int32_t>(VT_MOUNT_ID, 0);
uint32_t mount_id() const {
return GetField<uint32_t>(VT_MOUNT_ID, 0);
}
bool mutate_mount_id(int32_t _mount_id) {
return SetField<int32_t>(VT_MOUNT_ID, _mount_id, 0);
bool mutate_mount_id(uint32_t _mount_id) {
return SetField<uint32_t>(VT_MOUNT_ID, _mount_id, 0);
}
const flatbuffers::String *parent_path() const {
return GetPointer<const flatbuffers::String *>(VT_PARENT_PATH);
@@ -1349,7 +1349,7 @@ struct Hpfs_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<int32_t>(verifier, VT_MOUNT_ID) &&
VerifyField<uint32_t>(verifier, VT_MOUNT_ID) &&
VerifyOffset(verifier, VT_PARENT_PATH) &&
verifier.VerifyString(parent_path()) &&
VerifyField<uint8_t>(verifier, VT_IS_FILE) &&
@@ -1364,8 +1364,8 @@ struct Hpfs_Request_MessageBuilder {
typedef Hpfs_Request_Message Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_mount_id(int32_t mount_id) {
fbb_.AddElement<int32_t>(Hpfs_Request_Message::VT_MOUNT_ID, mount_id, 0);
void add_mount_id(uint32_t mount_id) {
fbb_.AddElement<uint32_t>(Hpfs_Request_Message::VT_MOUNT_ID, mount_id, 0);
}
void add_parent_path(flatbuffers::Offset<flatbuffers::String> parent_path) {
fbb_.AddOffset(Hpfs_Request_Message::VT_PARENT_PATH, parent_path);
@@ -1392,7 +1392,7 @@ struct Hpfs_Request_MessageBuilder {
inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_Message(
flatbuffers::FlatBufferBuilder &_fbb,
int32_t mount_id = 0,
uint32_t mount_id = 0,
flatbuffers::Offset<flatbuffers::String> parent_path = 0,
bool is_file = false,
int32_t block_id = 0,
@@ -1408,7 +1408,7 @@ inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_Message(
inline flatbuffers::Offset<Hpfs_Request_Message> CreateHpfs_Request_MessageDirect(
flatbuffers::FlatBufferBuilder &_fbb,
int32_t mount_id = 0,
uint32_t mount_id = 0,
const char *parent_path = nullptr,
bool is_file = false,
int32_t block_id = 0,
@@ -1464,11 +1464,11 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl
flatbuffers::String *mutable_path() {
return GetPointer<flatbuffers::String *>(VT_PATH);
}
int32_t mount_id() const {
return GetField<int32_t>(VT_MOUNT_ID, 0);
uint32_t mount_id() const {
return GetField<uint32_t>(VT_MOUNT_ID, 0);
}
bool mutate_mount_id(int32_t _mount_id) {
return SetField<int32_t>(VT_MOUNT_ID, _mount_id, 0);
bool mutate_mount_id(uint32_t _mount_id) {
return SetField<uint32_t>(VT_MOUNT_ID, _mount_id, 0);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
@@ -1479,7 +1479,7 @@ struct Hpfs_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl
verifier.VerifyVector(hash()) &&
VerifyOffset(verifier, VT_PATH) &&
verifier.VerifyString(path()) &&
VerifyField<int32_t>(verifier, VT_MOUNT_ID) &&
VerifyField<uint32_t>(verifier, VT_MOUNT_ID) &&
verifier.EndTable();
}
};
@@ -1512,8 +1512,8 @@ struct Hpfs_Response_MessageBuilder {
void add_path(flatbuffers::Offset<flatbuffers::String> path) {
fbb_.AddOffset(Hpfs_Response_Message::VT_PATH, path);
}
void add_mount_id(int32_t mount_id) {
fbb_.AddElement<int32_t>(Hpfs_Response_Message::VT_MOUNT_ID, mount_id, 0);
void add_mount_id(uint32_t mount_id) {
fbb_.AddElement<uint32_t>(Hpfs_Response_Message::VT_MOUNT_ID, mount_id, 0);
}
explicit Hpfs_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
@@ -1532,7 +1532,7 @@ inline flatbuffers::Offset<Hpfs_Response_Message> CreateHpfs_Response_Message(
flatbuffers::Offset<void> hpfs_response = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0,
flatbuffers::Offset<flatbuffers::String> path = 0,
int32_t mount_id = 0) {
uint32_t mount_id = 0) {
Hpfs_Response_MessageBuilder builder_(_fbb);
builder_.add_mount_id(mount_id);
builder_.add_path(path);
@@ -1548,7 +1548,7 @@ inline flatbuffers::Offset<Hpfs_Response_Message> CreateHpfs_Response_MessageDir
flatbuffers::Offset<void> hpfs_response = 0,
const std::vector<uint8_t> *hash = nullptr,
const char *path = nullptr,
int32_t mount_id = 0) {
uint32_t mount_id = 0) {
auto hash__ = hash ? _fbb.CreateVector<uint8_t>(*hash) : 0;
auto path__ = path ? _fbb.CreateString(path) : 0;
return msg::fbuf::p2pmsg::CreateHpfs_Response_Message(

View File

@@ -492,12 +492,13 @@ namespace msg::fbuf::p2pmsg
* Create content response message from the given content response.
* @param container_builder Flatbuffer builder for the container message.
* @param path The path of the directory.
* @param mount_id The mount id of the relavent hpfs mount.
* @param hash_nodes File or directory entries with hashes in the given parent path.
* @param expected_hash The exptected hash of the requested path.
* @param lcl Lcl to be include in the container msg.
*/
void create_msg_from_fsentry_response(
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id,
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const uint32_t mount_id,
std::vector<hpfs::child_hash_node> &hash_nodes, util::h32 expected_hash, std::string_view lcl)
{
flatbuffers::FlatBufferBuilder builder(1024);
@@ -525,11 +526,12 @@ namespace msg::fbuf::p2pmsg
* Create content response message from the given content response.
* @param container_builder Flatbuffer builder for the container message.
* @param path The path of the directory.
* @param mount_id The mount id of the relavent hpfs mount.
* @param hashmap Hashmap of the file
* @param lcl Lcl to be include in the container msg.
*/
void create_msg_from_filehashmap_response(
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id,
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const uint32_t mount_id,
std::vector<util::h32> &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl)
{
// todo:get a average propsal message size and allocate content builder based on that.
@@ -561,10 +563,11 @@ namespace msg::fbuf::p2pmsg
/**
* Create content response message from the given content response.
* @param container_builder Flatbuffer builder for the container message.
* @param block_resp Block response struct to place in the message
* @param block_resp Block response struct to place in the message.
* @param mount_id The mount id of the relavent hpfs mount.
* @param lcl Lcl to be include in the container message.
*/
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl)
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const uint32_t mount_id, std::string_view lcl)
{
// todo:get a average propsal message size and allocate content builder based on that.
flatbuffers::FlatBufferBuilder builder(1024);

View File

@@ -58,14 +58,14 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::hpfs_request &hr, std::string_view lcl);
void create_msg_from_fsentry_response(
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const int32_t mount_id,
flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, const uint32_t mount_id,
std::vector<hpfs::child_hash_node> &hash_nodes, util::h32 expected_hash, std::string_view lcl);
void create_msg_from_filehashmap_response(
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const int32_t mount_id,
flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, const uint32_t mount_id,
std::vector<util::h32> &hashmap, std::size_t file_length, util::h32 expected_hash, std::string_view lcl);
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const int32_t mount_id, std::string_view lcl);
void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, const uint32_t mount_id, std::string_view lcl);
void create_containermsg_from_content(
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign);
@@ -101,10 +101,10 @@ namespace msg::fbuf::p2pmsg
peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector<conf::peer_properties> &peers, const std::optional<conf::ip_port_prop> &skipping_ip_port);
void flatbuf_hpfsfshashentry_to_hpfsfshashentry(std::unordered_map<std::string, p2p::hpfs_fs_hash_entry> &fs_entries,
const flatbuffers::Vector<flatbuffers::Offset<Hpfs_FS_Hash_Entry>> *fhashes);
const flatbuffers::Vector<flatbuffers::Offset<Hpfs_FS_Hash_Entry>> *fhashes);
void hpfsfilehash_to_flatbuf_hpfsfilehash(flatbuffers::FlatBufferBuilder &builder, std::vector<flatbuffers::Offset<Hpfs_FS_Hash_Entry>> &list,
std::string_view full_path, bool is_file, std::string_view hash);
std::string_view full_path, bool is_file, std::string_view hash);
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Hpfs_FS_Hash_Entry>>>
hpfsfshashentry_to_flatbuff_hpfsfshashentry(

View File

@@ -14,8 +14,8 @@ namespace p2p
{
constexpr uint16_t PROPOSAL_LIST_CAP = 64; // Maximum proposal count.
constexpr uint16_t NONUNL_PROPOSAL_LIST_CAP = 64; // Maximum nonunl proposal count.
constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count.
constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count.
constexpr uint16_t HPFS_REQ_LIST_CAP = 64; // Maximum state request count.
constexpr uint16_t HPFS_RES_LIST_CAP = 64; // Maximum state response count.
constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count.
struct proposal
@@ -91,7 +91,7 @@ namespace p2p
// Represents a hpfs request sent to a peer.
struct hpfs_request
{
int32_t mount_id; // Relavent file system id.
uint32_t mount_id; // Relavent file system id.
std::string parent_path; // The requested file or dir path.
bool is_file = false; // Whether the path is a file or dir.
int32_t block_id = 0; // Block id of the file if we are requesting for file block. Otherwise -1.
@@ -127,9 +127,17 @@ namespace p2p
std::list<std::pair<std::string, p2p::hpfs_request>> contract_hpfs_requests;
std::mutex contract_hpfs_requests_mutex; // Mutex for contract fs hpfs requests access race conditions.
// List of pairs indicating the session pubkey hex and the ledger fs hpfs requests.
std::list<std::pair<std::string, p2p::hpfs_request>> ledger_hpfs_requests;
std::mutex ledger_hpfs_requests_mutex; // Mutex for ledger fs hpfs requests access race conditions.
// List of pairs indicating the session pubkey hex and the contract fs hpfs responses.
std::list<std::pair<std::string, std::string>> contract_hpfs_responses;
std::mutex contract_hpfs_responses_mutex; // Mutex for contract fs hpfs responses access race conditions.
// List of pairs indicating the session pubkey hex and the ledger fs hpfs responses.
std::list<std::pair<std::string, std::string>> ledger_hpfs_responses;
std::mutex ledger_hpfs_responses_mutex; // Mutex for ledger fs hpfs responses access race conditions.
};
struct connected_context

View File

@@ -9,12 +9,11 @@
#include "../msg/fbuf/p2pmsg_content_generated.h"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "../hpfs/hpfs_sync.hpp"
#include "../ledger/ledger_sample.hpp"
#include "../ledger.hpp"
#include "peer_comm_session.hpp"
#include "p2p.hpp"
#include "../unl.hpp"
#include "../hpfs/hpfs.hpp"
namespace p2pmsg = msg::fbuf::p2pmsg;
@@ -195,7 +194,7 @@ namespace p2p
{
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(content_ptr);
const p2p::hpfs_request hr = p2pmsg::create_hpfs_request_from_msg(*content->message_as_Hpfs_Request_Message());
if (hr.mount_id == hpfs::contract_fs.mount_id)
if (hr.mount_id == sc::contract_fs.mount_id)
{
// Check the cap and insert request with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.contract_hpfs_requests_mutex);
@@ -210,6 +209,21 @@ namespace p2p
LOG_DEBUG << "Hpfs contract fs request rejected. Maximum hpfs contract fs request count reached. " << session.display_name();
}
}
else if (hr.mount_id == ledger::ledger_sample::ledger_fs.mount_id)
{
// Check the cap and insert request with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.ledger_hpfs_requests_mutex);
// If max number of state requests reached skip the rest.
if (ctx.collected_msgs.ledger_hpfs_requests.size() < p2p::HPFS_REQ_LIST_CAP)
{
ctx.collected_msgs.ledger_hpfs_requests.push_back(std::make_pair(session.pubkey, std::move(hr)));
}
else
{
LOG_DEBUG << "Hpfs ledger fs request rejected. Maximum hpfs ledger fs request count reached. " << session.display_name();
}
}
}
else if (content_message_type == p2pmsg::Message_Hpfs_Response_Message)
{
@@ -217,7 +231,7 @@ namespace p2p
const msg::fbuf::p2pmsg::Hpfs_Response_Message *resp_msg = content->message_as_Hpfs_Response_Message();
// Only accept hpfs responses if hpfs fs is syncing.
if (hpfs::contract_sync.ctx.is_syncing && resp_msg->mount_id() == hpfs::contract_fs.mount_id)
if (sc::contract_sync_worker.is_syncing && resp_msg->mount_id() == sc::contract_fs.mount_id)
{
// Check the cap and insert state_response with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.contract_hpfs_responses_mutex);
@@ -233,6 +247,22 @@ namespace p2p
LOG_DEBUG << "Contract hpfs response rejected. Maximum contract hpfs response count reached. " << session.display_name();
}
}
else if (ledger::ledger_sample::ledger_sync_worker.is_syncing && resp_msg->mount_id() == ledger::ledger_sample::ledger_fs.mount_id)
{
// Check the cap and insert state_response with lock.
std::scoped_lock<std::mutex> lock(ctx.collected_msgs.ledger_hpfs_responses_mutex);
// If max number of state responses reached skip the rest.
if (ctx.collected_msgs.ledger_hpfs_responses.size() < p2p::HPFS_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.ledger_hpfs_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
}
else
{
LOG_DEBUG << "Ledger hpfs response rejected. Maximum ledger hpfs response count reached. " << session.display_name();
}
}
}
else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message
{

30
src/sc/contract_mount.cpp Normal file
View File

@@ -0,0 +1,30 @@
#include "./contract_mount.hpp"
namespace sc
{
/**
* Perform contract file system mount related preparation tasks.
* @return Returns -1 on error and 0 on success.
*/
int contract_mount::prepare_fs()
{
util::h32 initial_state_hash;
util::h32 initial_patch_hash;
if (acquire_rw_session() == -1 ||
conf::populate_patch_config() == -1 ||
get_hash(initial_state_hash, hpfs::RW_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 ||
get_hash(initial_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 ||
release_rw_session() == -1)
{
LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << ".";
return -1;
}
set_parent_hash(hpfs::STATE_DIR_PATH, initial_state_hash);
set_parent_hash(hpfs::PATCH_FILE_PATH, initial_patch_hash);
LOG_INFO << "Initial state: " << initial_state_hash << " | patch: " << initial_patch_hash;
return 0;
}
}

20
src/sc/contract_mount.hpp Normal file
View File

@@ -0,0 +1,20 @@
#ifndef _HP_SC_CONTRACT_MOUNT_
#define _HP_SC_CONTRACT_MOUNT_
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "../hpfs/hpfs_mount.hpp"
namespace sc
{
/**
* Represents contract file system mount.
*/
class contract_mount : public hpfs::hpfs_mount
{
private:
int prepare_fs();
};
} // namespace sc
#endif

14
src/sc/contract_serve.cpp Normal file
View File

@@ -0,0 +1,14 @@
#include "./contract_serve.hpp"
namespace sc
{
void contract_serve::swap_collected_requests()
{
std::scoped_lock<std::mutex> lock(p2p::ctx.collected_msgs.contract_hpfs_requests_mutex);
// Move collected hpfs requests for contract fs over to local requests list.
if (!p2p::ctx.collected_msgs.contract_hpfs_requests.empty())
hpfs_requests.splice(hpfs_requests.end(), p2p::ctx.collected_msgs.contract_hpfs_requests);
}
} // namespace sc

17
src/sc/contract_serve.hpp Normal file
View File

@@ -0,0 +1,17 @@
#ifndef _HP_SC_CONTRACT_SERVE_
#define _HP_SC_CONTRACT_SERVE_
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "../hpfs/hpfs_serve.hpp"
namespace sc
{
class contract_serve : public hpfs::hpfs_serve
{
private:
void swap_collected_requests();
};
} // namespace sc
#endif

36
src/sc/contract_sync.cpp Normal file
View File

@@ -0,0 +1,36 @@
#include "./contract_sync.hpp"
#include "../unl.hpp"
#include "../hpfs/hpfs_mount.hpp"
namespace sc
{
void contract_sync::on_current_sync_state_acheived(const util::h32 &acheived_hash)
{
if (current_target.vpath == hpfs::PATCH_FILE_PATH)
{
// Appling new patch file changes to hpcore runtime.
if (conf::apply_patch_config(hpfs::RW_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after sync failed";
}
else
{
unl::update_unl_changes_from_patch();
// Update global hash tracker with the new patch file hash.
fs_mount->set_parent_hash(current_target.vpath, acheived_hash);
}
}
}
void contract_sync::swap_collected_responses()
{
std::scoped_lock lock(p2p::ctx.collected_msgs.contract_hpfs_responses_mutex);
// Move collected hpfs responses over to local candidate responses list.
if (!p2p::ctx.collected_msgs.contract_hpfs_responses.empty())
candidate_hpfs_responses.splice(candidate_hpfs_responses.end(), p2p::ctx.collected_msgs.contract_hpfs_responses);
}
} // namespace sc

18
src/sc/contract_sync.hpp Normal file
View File

@@ -0,0 +1,18 @@
#ifndef _HP_SC_CONTRACT_SYNC_
#define _HP_SC_CONTRACT_SYNC_
#include "../pchheader.hpp"
#include "../util/h32.hpp"
#include "../conf.hpp"
#include "../hpfs/hpfs_sync.hpp"
namespace sc
{
class contract_sync : public hpfs::hpfs_sync
{
private:
void on_current_sync_state_acheived(const util::h32 &acheived_hash);
void swap_collected_responses();
};
} // namespace sc
#endif

View File

@@ -1,14 +1,14 @@
#include "pchheader.hpp"
#include "conf.hpp"
#include "consensus.hpp"
#include "hplog.hpp"
#include "ledger.hpp"
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../consensus.hpp"
#include "../hplog.hpp"
#include "../ledger.hpp"
#include "sc.hpp"
#include "hpfs/hpfs.hpp"
#include "msg/fbuf/p2pmsg_helpers.hpp"
#include "msg/controlmsg_common.hpp"
#include "msg/controlmsg_parser.hpp"
#include "unl.hpp"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../msg/controlmsg_common.hpp"
#include "../msg/controlmsg_parser.hpp"
#include "../unl.hpp"
#include "contract_serve.hpp"
namespace sc
{
@@ -17,6 +17,40 @@ namespace sc
constexpr const char *STDOUT_LOG = ".stdout.log";
constexpr const char *STDERR_LOG = ".stderr.log";
constexpr uint32_t CONTRACT_FS_ID = 0;
sc::contract_mount contract_fs; // Global contract file system instance.
sc::contract_sync contract_sync_worker; // Global contract file system sync instance.
sc::contract_serve contract_server; // Contract file server instance.
int init()
{
if (contract_fs.init(CONTRACT_FS_ID, conf::ctx.contract_hpfs_dir, conf::ctx.contract_hpfs_mount_dir, conf::ctx.contract_hpfs_rw_dir, conf::cfg.node.full_history) == -1)
{
LOG_ERROR << "Contract file system initialization failed.";
return -1;
}
if (contract_server.init("contract", &contract_fs) == -1)
{
LOG_ERROR << "Contract file system serve worker initialization failed.";
return -1;
}
if (contract_sync_worker.init("contract", &contract_fs) == -1)
{
LOG_ERROR << "Contract file system sync worker initialization failed.";
return -1;
}
return 0;
}
void deinit()
{
contract_fs.deinit();
contract_server.deinit();
contract_sync_worker.deinit();
}
/**
* Executes the contract process and passes the specified context arguments.
* @return 0 on successful process creation. -1 on failure or contract process is already running.
@@ -92,7 +126,7 @@ namespace sc
execv_args[j] = conf::cfg.contract.runtime_binexec_args[i].data();
execv_args[len - 1] = NULL;
const std::string current_dir = hpfs::contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH);
const std::string current_dir = contract_fs.physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH);
chdir(current_dir.c_str());
if (create_contract_log_files(ctx) == -1)
@@ -165,8 +199,8 @@ namespace sc
if (!ctx.args.readonly)
ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME;
return ctx.args.readonly ? hpfs::contract_fs.start_ro_session(ctx.args.hpfs_session_name, false)
: hpfs::contract_fs.acquire_rw_session();
return ctx.args.readonly ? contract_fs.start_ro_session(ctx.args.hpfs_session_name, false)
: contract_fs.acquire_rw_session();
}
/**
@@ -174,7 +208,6 @@ namespace sc
*/
int stop_hpfs_session(execution_context &ctx)
{
hpfs::hpfs_mount &contract_fs = hpfs::contract_fs;
if (ctx.args.readonly)
{
return contract_fs.stop_ro_session(ctx.args.hpfs_session_name);

View File

@@ -1,12 +1,14 @@
#ifndef _HP_SC_
#define _HP_SC_
#ifndef _HP_SC_SC_
#define _HP_SC_SC_
#include "pchheader.hpp"
#include "usr/usr.hpp"
#include "util/h32.hpp"
#include "util/util.hpp"
#include "util/buffer_store.hpp"
#include "p2p/p2p.hpp"
#include "../pchheader.hpp"
#include "../usr/usr.hpp"
#include "../util/h32.hpp"
#include "../util/util.hpp"
#include "../util/buffer_store.hpp"
#include "../p2p/p2p.hpp"
#include "contract_mount.hpp"
#include "contract_sync.hpp"
/**
* Contains helper functions regarding POSIX process execution and IPC between HP and SC.
@@ -125,6 +127,13 @@ namespace sc
}
};
extern sc::contract_mount contract_fs; // Global contract file system instance.
extern sc::contract_sync contract_sync_worker; // Global contract file system sync instance.
int init();
void deinit();
int execute_contract(execution_context &ctx);
//------Internal-use functions for this namespace.
@@ -172,7 +181,7 @@ namespace sc
void cleanup_fds(execution_context &ctx);
void cleanup_fd_pair(fd_pair &fds);
void stop(execution_context &ctx);
void handle_control_msg(execution_context &ctx, std::string_view msg);

View File

@@ -1,7 +1,7 @@
#ifndef _HP_USR_READ_REQ_
#define _HP_USR_READ_REQ_
#include "../sc.hpp"
#include "../sc/sc.hpp"
#include "../util/buffer_store.hpp"
namespace read_req

View File

@@ -16,7 +16,6 @@
#include "user_input.hpp"
#include "read_req.hpp"
#include "input_nonce_map.hpp"
#include "../hpfs/hpfs.hpp"
namespace usr
{
@@ -392,7 +391,7 @@ namespace usr
util::fork_detach();
// before execution chdir into a valid the latest state data directory that contains an appbill.table
const std::string appbill_dir = hpfs::contract_fs.rw_dir + hpfs::STATE_DIR_PATH;
const std::string appbill_dir = sc::contract_fs.rw_dir + hpfs::STATE_DIR_PATH;
chdir(appbill_dir.c_str());
int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": Appbill process execv failed.\n";

View File

@@ -110,6 +110,8 @@ do
user: ${user_json}, \
log: {\
loglevel: '$loglevel', \
max_mbytes_per_file: 10, \
max_file_count": 50, \
loggers:['console', 'file'] \
}\
}, null, 2)" > hp.cfg
@@ -180,9 +182,9 @@ done
for (( i=1; i<=$ncount; i++ ))
do
mkdir -p ./node$i/hpfs/seed/ > /dev/null 2>&1
mkdir -p ./node$i/contract_fs/seed/ > /dev/null 2>&1
pushd ./node$i/hpfs/seed/state/ > /dev/null 2>&1
pushd ./node$i/contract_fs/seed/state/ > /dev/null 2>&1
# Load credit balance for user for appbill testing purposes.
>appbill.table