Single-process hpfs integration. (#212)

This commit is contained in:
Ravin Perera
2021-01-06 21:53:39 +05:30
committed by GitHub
parent bed8205ca8
commit e8e7921ac1
14 changed files with 245 additions and 293 deletions

View File

@@ -3,7 +3,6 @@
#include "crypto.hpp"
#include "hpfs/hpfs.hpp"
#include "util/util.hpp"
#include "sc.hpp"
namespace conf
{
@@ -17,11 +16,11 @@ namespace conf
// Stores the initial startup mode of the node.
ROLE startup_mode;
const static char *ROLE_OBSERVER = "observer";
const static char *ROLE_VALIDATOR = "validator";
const static char *PUBLIC = "public";
const static char *PRIVATE = "private";
constexpr const char *ROLE_OBSERVER = "observer";
constexpr const char *ROLE_VALIDATOR = "validator";
constexpr const char *PUBLIC = "public";
constexpr const char *PRIVATE = "private";
constexpr const char *HPFS_PATCH_SESSION_NAME = "ro_patch";
bool init_success = false;
@@ -34,16 +33,16 @@ namespace conf
// The validations/loading needs to be in this order.
// 1. Validate contract directories
// 2. Read and load the contract config into memory
// 3. Update contract config if patch file exists.
// 4. Validate the loaded config values
// 5. Locking the config file at the startup.
// 5. Initialize logging subsystem.
// 6. Update and validate contract config if patch file exists.
if (validate_contract_dir_paths() == -1 ||
set_config_lock() == -1 ||
read_config(cfg) == -1 ||
apply_patch_changes(cfg.contract) == -1 ||
validate_config(cfg) == -1 ||
set_config_lock() == -1)
validate_config(cfg) == -1)
{
release_config_lock();
return -1;
}
@@ -110,11 +109,7 @@ namespace conf
util::create_dir_tree_recursive(ctx.hist_dir);
util::create_dir_tree_recursive(ctx.full_hist_dir);
util::create_dir_tree_recursive(ctx.log_dir);
util::create_dir_tree_recursive(ctx.hpfs_dir);
// Creating hpfs seed dir in advance so hpfs doesn't cause mkdir race conditions during first-run.
util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed");
util::create_dir_tree_recursive(ctx.hpfs_dir + std::string("/seed").append(sc::STATE_DIR_PATH));
util::create_dir_tree_recursive(ctx.hpfs_dir + "/seed" + hpfs::STATE_DIR_PATH);
//Create config file with default settings.
@@ -201,8 +196,8 @@ namespace conf
ctx.hist_dir = basedir + "/hist";
ctx.full_hist_dir = basedir + "/fullhist";
ctx.hpfs_dir = basedir + "/hpfs";
ctx.hpfs_rw_dir = ctx.hpfs_dir + "/rw";
ctx.hpfs_serve_dir = ctx.hpfs_dir + "/ss";
ctx.hpfs_mount_dir = ctx.hpfs_dir + "/mnt";
ctx.hpfs_rw_dir = ctx.hpfs_mount_dir + "/rw";
ctx.log_dir = basedir + "/log";
}
@@ -769,22 +764,19 @@ namespace conf
* @param contract_config Contract section of config structure.
* @return Returns -1 on error and 0 on successful update.
*/
int apply_patch_changes(contract_params &contract_config)
int apply_patch_changes()
{
pid_t hpfs_ro_pid = 0;
std::string mount_dir; // Holds the mount directory of the newly created hpfs session.
int res = 0;
if (hpfs::start_ro_session(HPFS_PATCH_SESSION_NAME, false) == -1)
return -1;
if (hpfs::start_ro_rw_process(hpfs_ro_pid, mount_dir,
true, false, true) == -1 || // Creating a hpfs process and then starts a virtual hpfs session.
validate_and_apply_patch_config(contract_config, mount_dir) == -1 || // Validate content in patch file and update contract section in config.
hpfs::stop_fs_session(mount_dir) == -1) // Stop the created hpfs session.
res = -1;
// Validate content in patch file and update contract section in config.
if (validate_and_apply_patch_config(cfg.contract, HPFS_PATCH_SESSION_NAME) == -1)
{
hpfs::stop_ro_session(HPFS_PATCH_SESSION_NAME);
return -1;
}
// Created hpfs process should be killed even the patch validation failed.
if (hpfs_ro_pid > 0 && util::kill_process(hpfs_ro_pid, true) == -1)
res = -1;
return res;
return hpfs::stop_ro_session(HPFS_PATCH_SESSION_NAME);
}
/**
@@ -793,9 +785,9 @@ namespace conf
* @param mount_dir hpfs process mount directory path.
* @return Returns -1 on error and 0 in successful update.
*/
int validate_and_apply_patch_config(contract_params &contract_config, std::string_view mount_dir)
int validate_and_apply_patch_config(contract_params &contract_config, std::string_view hpfs_session_name)
{
const std::string path = std::string(mount_dir).append(PATCH_FILE_PATH);
const std::string path = hpfs::physical_path(hpfs_session_name, hpfs::PATCH_FILE_PATH);
if (util::is_file_exists(path))
{
std::ifstream ifs(path);
@@ -889,7 +881,7 @@ namespace conf
if (!contract_config.appbill.bin_args.empty())
util::split_string(contract_config.appbill.runtime_args, contract_config.appbill.bin_args, " ");
contract_config.appbill.runtime_args.insert(contract_config.appbill.runtime_args.begin(), (contract_config.appbill.mode[0] == '/' ? contract_config.appbill.mode : util::realpath(conf::ctx.contract_dir + "/bin/" + contract_config.appbill.mode)));
std::cout << "Contract config updated from patch file\n";
}
catch (const std::exception &e)

View File

@@ -137,20 +137,20 @@ namespace conf
std::string hpws_exe_path; // hpws executable file path.
std::string hpfs_exe_path; // hpfs executable file path.
std::string contract_dir; // Contract base directory full path
std::string full_hist_dir; // Contract full history dir full path
std::string hist_dir; // Contract ledger history dir full path
std::string hpfs_dir; // Hpfs file system mount path (hpfs path)
std::string hpfs_rw_dir; // Hpfs read/write mount path.
std::string hpfs_serve_dir; // Hpfs server hpfs mount path.
std::string log_dir; // Contract log dir full path
std::string config_dir; // Contract config dir full path
std::string config_file; // Full path to the contract config file
std::string tls_key_file; // Full path to the tls private key file
std::string tls_cert_file; // Full path to the tls certificate
std::string contract_dir; // Contract base directory full path.
std::string full_hist_dir; // Contract full history dir full path.
std::string hist_dir; // Contract ledger history dir full path.
std::string hpfs_dir; // hpfs metdata dir (The location of hpfs log file).
std::string hpfs_mount_dir; // hpfs fuse file system mount path.
std::string hpfs_rw_dir; // hpfs read/write fs session path.
std::string log_dir; // Contract log dir full path.
std::string config_dir; // Contract config dir full path.
std::string config_file; // Full path to the contract config file.
std::string tls_key_file; // Full path to the tls private key file.
std::string tls_cert_file; // Full path to the tls certificate.
int config_fd; // Config file file descriptor
struct flock config_lock; // Config file record log
int config_fd; // Config file file descriptor.
struct flock config_lock; // Config file lock.
};
// Holds all the contract config values.
@@ -173,8 +173,6 @@ namespace conf
// Other modeuls will access config values via this.
extern contract_config cfg;
const static char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename.
int init();
void deinit();
@@ -203,9 +201,9 @@ namespace conf
std::string_view extract_missing_field(std::string err_message);
int apply_patch_changes(contract_params &contract_config);
int apply_patch_changes();
int validate_and_apply_patch_config(contract_params &contract_config, std::string_view mount_dir);
int validate_and_apply_patch_config(contract_params &contract_config, std::string_view hpfs_session_name);
int set_config_lock();

View File

@@ -817,7 +817,7 @@ namespace consensus
new_lcl = ledger::ctx.get_lcl();
const uint64_t new_lcl_seq_no = ledger::ctx.get_seq_no();
LOG_INFO << "****Ledger created**** (lcl:" << new_lcl.substr(0, 15) << " state hash:" << cons_prop.state_hash << " patch hash:" << cons_prop.patch_hash << ")";
LOG_INFO << "****Ledger created**** (lcl:" << new_lcl.substr(0, 15) << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")";
// After the current ledger seq no is updated, we remove any newly expired inputs from candidate set.
{
@@ -844,7 +844,6 @@ namespace consensus
}
sc::contract_execution_args &args = ctx.contract_ctx->args;
args.hpfs_dir = conf::ctx.hpfs_rw_dir;
args.readonly = false;
args.time = cons_prop.time;
args.lcl = new_lcl;

View File

@@ -7,12 +7,17 @@
namespace hpfs
{
constexpr const char *HPFS_TRACE_ARG_ERROR = "trace=error";
constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=error";
constexpr const char *HPFS_HMAP_HASH = "::hpfs.hmap.hash";
constexpr const char *HPFS_HMAP_CHILDREN = "::hpfs.hmap.children";
constexpr const char *HPFS_SESSION = "::hpfs.session";
constexpr ino_t HPFS_ROOT_INO = 2;
constexpr const char *TRACE_ARG_ERROR = "trace=error";
constexpr const char *TRACE_ARG_DEBUG = "trace=error";
constexpr const char *RW_SESSION = "/::hpfs.rw.hmap";
constexpr const char *RO_SESSION = "/::hpfs.ro.";
constexpr const char *RO_SESSION_HMAP = "/::hpfs.ro.hmap.";
constexpr const char *HMAP_HASH = "::hpfs.hmap.hash";
constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children";
constexpr ino_t ROOT_INO = 1;
constexpr const char *INIT_SESSION_NAME = "init";
constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000;
constexpr uint16_t INIT_CHECK_INTERVAL = 20;
bool init_success = false;
hpfs_context ctx;
@@ -22,32 +27,25 @@ namespace hpfs
*/
int init()
{
if (start_merge_process(ctx.hpfs_merge_pid) == -1)
if (start_hpfs_process(ctx.hpfs_pid) == -1)
return -1;
if (start_ro_rw_process(ctx.hpfs_rw_pid, conf::ctx.hpfs_rw_dir, false, true, false) == -1)
{
// Stop the merge process in case of failure.
util::kill_process(ctx.hpfs_merge_pid, true);
return -1;
}
util::h32 initial_state_hash;
util::h32 initial_patch_hash;
if (start_fs_session(conf::ctx.hpfs_rw_dir) == -1 ||
get_hash(initial_state_hash, conf::ctx.hpfs_rw_dir, sc::STATE_DIR_PATH) == -1 ||
get_hash(initial_patch_hash, conf::ctx.hpfs_rw_dir, conf::PATCH_FILE_PATH) == -1 ||
stop_fs_session(conf::ctx.hpfs_rw_dir) == -1)
if (start_ro_session(INIT_SESSION_NAME, true) == -1 ||
get_hash(initial_state_hash, INIT_SESSION_NAME, hpfs::STATE_DIR_PATH) == -1 ||
get_hash(initial_patch_hash, INIT_SESSION_NAME, hpfs::PATCH_FILE_PATH) == -1 ||
stop_ro_session(INIT_SESSION_NAME) == -1)
{
LOG_ERROR << "Failed to get initial state hash.";
LOG_ERROR << "Failed to get initial hpfs hashes.";
util::kill_process(ctx.hpfs_pid, true);
return -1;
}
ctx.set_hash(HPFS_PARENT_COMPONENTS::STATE, initial_state_hash);
ctx.set_hash(HPFS_PARENT_COMPONENTS::PATCH, initial_patch_hash);
LOG_INFO << "Initial state hash: " << initial_state_hash;
LOG_INFO << "Initial patch hash: " << initial_patch_hash;
LOG_INFO << "Initial state hash: " << initial_state_hash << " | patch hash: " << initial_patch_hash;
init_success = true;
return 0;
}
@@ -59,88 +57,26 @@ namespace hpfs
{
if (init_success)
{
LOG_DEBUG << "Stopping hpfs rw process... pid:" << ctx.hpfs_rw_pid;
if (ctx.hpfs_rw_pid > 0 && util::kill_process(ctx.hpfs_rw_pid, true) == 0)
LOG_INFO << "Stopped hpfs rw process.";
LOG_DEBUG << "Stopping hpfs merge process... pid:" << ctx.hpfs_merge_pid;
if (ctx.hpfs_merge_pid > 0 && util::kill_process(ctx.hpfs_merge_pid, true) == 0)
LOG_INFO << "Stopped hpfs merge process.";
LOG_DEBUG << "Stopping hpfs process... pid:" << ctx.hpfs_pid;
if (ctx.hpfs_pid > 0 && util::kill_process(ctx.hpfs_pid, true) == 0)
LOG_INFO << "Stopped hpfs process.";
}
}
/**
* Starts hpfs merge process.
* Starts the hpfs process used for all fs sessions.
*/
int start_merge_process(pid_t &hpfs_pid)
int start_hpfs_process(pid_t &hpfs_pid)
{
const pid_t pid = fork();
if (pid > 0)
{
LOG_DEBUG << "Starting hpfs merge process...";
// HotPocket process.
util::sleep(INIT_CHECK_INTERVAL);
// Check if hpfs process is still running.
// Sending signal 0 to test whether process exist.
if (util::kill_process(pid, false, 0) == -1)
return -1;
hpfs_pid = pid;
LOG_DEBUG << "hpfs merge process started. pid:" << hpfs_pid;
}
else if (pid == 0)
{
// hpfs process.
util::fork_detach();
const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR);
// Fill process args.
char *execv_args[] = {
conf::ctx.hpfs_exe_path.data(),
(char *)"merge",
conf::ctx.hpfs_dir.data(),
(char *)active_hpfs_trace_arg,
NULL};
const int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": hpfs merge process execv failed.\n";
exit(1);
}
else
{
LOG_ERROR << errno << ": fork() failed when starting hpfs merge process.";
return -1;
}
return 0;
}
/**
* Starts hpfs readonly/readwrite process and also starts a virtual fs session.
*/
int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly,
const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout)
{
const pid_t pid = fork();
const char *mode = readonly ? "ro" : "rw";
if (pid > 0)
{
// HotPocket process.
LOG_DEBUG << "Starting hpfs " << mode << " process at " << mount_dir;
// If the mount dir is not specified, assign a mount dir based on hpfs process id.
if (mount_dir.empty())
mount_dir = std::string(conf::ctx.hpfs_dir)
.append("/")
.append(std::to_string(pid));
LOG_DEBUG << "Starting hpfs process.";
// Wait until hpfs is initialized properly.
const uint16_t max_retries = timeout / INIT_CHECK_INTERVAL;
const uint16_t max_retries = PROCESS_INIT_TIMEOUT / INIT_CHECK_INTERVAL;
bool hpfs_initialized = false;
uint16_t retry_count = 0;
do
@@ -151,65 +87,54 @@ namespace hpfs
// Sending signal 0 to test whether process exist.
if (util::kill_process(pid, false, 0) == -1)
{
LOG_ERROR << "hpfs process " << pid << " has stopped at " << mount_dir;
LOG_ERROR << "hpfs process " << pid << " has stopped.";
break;
}
// We check for the specific inode no. of the mounted root dir. That means hpfs FUSE interface is up.
struct stat st;
if (stat(mount_dir.c_str(), &st) == -1)
if (stat(conf::ctx.hpfs_mount_dir.c_str(), &st) == -1)
{
LOG_ERROR << errno << ": Error in checking hpfs status at " << mount_dir;
LOG_ERROR << errno << ": Error in checking hpfs status.";
break;
}
hpfs_initialized = (st.st_ino == HPFS_ROOT_INO);
hpfs_initialized = (st.st_ino == ROOT_INO);
// Keep retrying until root inode no. matches or timeout occurs.
} while (!hpfs_initialized && ++retry_count <= max_retries);
// If hpfs FUSE interface initialized within the timeout period, we then attempt to start up a virtual fs session.
// hpfs achieves this by having a 'session' file created.
if (hpfs_initialized && auto_start_session)
start_fs_session(mount_dir);
// Kill the process if hpfs couldn't be initialized properly.
if (!hpfs_initialized)
{
LOG_ERROR << "Couldn't initialize hpfs session at " << mount_dir;
LOG_ERROR << "Couldn't initialize hpfs process.";
util::kill_process(pid, true);
return -1;
}
hpfs_pid = pid;
LOG_DEBUG << "hpfs " << mode << " process started at " << mount_dir << " pid:" << hpfs_pid;
LOG_DEBUG << "hpfs process started. pid:" << hpfs_pid;
}
else if (pid == 0)
{
// hpfs process.
util::fork_detach();
// If the mount dir is not specified, assign a mount dir based on hpfs process id.
const pid_t self_pid = getpid();
if (mount_dir.empty())
mount_dir = std::string(conf::ctx.hpfs_dir)
.append("/")
.append(std::to_string(self_pid));
const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? HPFS_TRACE_ARG_DEBUG : HPFS_TRACE_ARG_ERROR);
const char *active_hpfs_trace_arg = (conf::cfg.log.loglevel_type == conf::LOG_SEVERITY::DEBUG ? TRACE_ARG_DEBUG : TRACE_ARG_ERROR);
// Fill process args.
char *execv_args[] = {
conf::ctx.hpfs_exe_path.data(),
(char *)mode, // hpfs mode: rw | ro
(char *)"fs",
conf::ctx.hpfs_dir.data(),
mount_dir.data(),
(char *)(hash_map_enabled ? "hmap=true" : "hmap=false"),
conf::ctx.hpfs_mount_dir.data(),
// In full history mode, we disable log merge of hpfs.
(char *)(conf::cfg.node.full_history ? "merge=false" : "merge=true"),
(char *)active_hpfs_trace_arg,
NULL};
const int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": hpfs session process execv failed.\n";
std::cerr << errno << ": hpfs process execv failed.\n";
exit(1);
}
else
@@ -222,32 +147,82 @@ namespace hpfs
}
/**
* Starts a virtual fs session on the hpfs process attached to the specified mount dir.
* Starts a virtual fs ReadWrite session with hash map enabled.
* If RW session already started, this will simply acquire a consumer reference.
* @return 0 on success. -1 on failure.
*/
int start_fs_session(std::string_view mount_dir)
int acquire_rw_session()
{
LOG_DEBUG << "Starting hpfs fs session at " << mount_dir;
std::scoped_lock lock(ctx.rw_mutex);
const std::string session_file = std::string(mount_dir).append("/").append(HPFS_SESSION);
LOG_DEBUG << "Starting hpfs rw session at " << conf::ctx.hpfs_rw_dir;
const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION;
// The sessions creation either should be succesful or should report as already exists (errno=EEXIST).
// Otherwise we consider it as failure.
if (mknod(session_file.c_str(), 0, 0) == -1 && errno != EEXIST)
{
LOG_ERROR << errno << ": Error starting hpfs rw session at " << conf::ctx.hpfs_rw_dir;
return -1;
}
ctx.rw_consumers++;
return 0;
}
/**
* Releases a consumer reference to the RW session. If there are no more references,
* actually stops the running RW session.
* @return 0 on success. -1 on failure.
*/
int release_rw_session()
{
std::scoped_lock lock(ctx.rw_mutex);
if (ctx.rw_consumers > 0)
ctx.rw_consumers--;
if (ctx.rw_consumers == 0)
{
const std::string session_file = conf::ctx.hpfs_mount_dir + RW_SESSION;
if (unlink(session_file.c_str()) == -1)
{
LOG_ERROR << errno << ": Error stopping hpfs rw session at " << conf::ctx.hpfs_rw_dir;
return -1;
}
}
return 0;
}
/**
* Starts a virtual fs ReadOnly session.
* @return 0 on success. -1 on failure.
*/
int start_ro_session(const std::string &name, const bool hmap_enabled)
{
LOG_DEBUG << "Starting hpfs ro session " << name << " hmap:" << hmap_enabled;
const std::string session_file = conf::ctx.hpfs_mount_dir + (hmap_enabled ? RO_SESSION_HMAP : RO_SESSION) + name;
if (mknod(session_file.c_str(), 0, 0) == -1)
{
LOG_ERROR << errno << ": Error starting hpfs fs session at " << mount_dir;
LOG_ERROR << errno << ": Error starting hpfs ro session " << name;
return -1;
}
return 0;
}
/**
* Stops the active virtual fs session on the hpfs process attached to the specified mount dir.
* Stops the specified ReadOnly fs session.
* @return 0 on success. -1 on failure.
*/
int stop_fs_session(std::string_view mount_dir)
int stop_ro_session(const std::string &name)
{
LOG_DEBUG << "Stopping hpfs fs session at " << mount_dir;
LOG_DEBUG << "Stopping hpfs ro session " << name;
const std::string session_file = std::string(mount_dir).append("/").append(HPFS_SESSION);
const std::string session_file = conf::ctx.hpfs_mount_dir + RO_SESSION + name;
if (unlink(session_file.c_str()) == -1)
{
LOG_ERROR << errno << ": Error stopping hpfs fs session at " << mount_dir;
LOG_ERROR << errno << ": Error stopping hpfs ro session " << name;
return -1;
}
return 0;
@@ -257,9 +232,9 @@ namespace hpfs
* Populates the hash of the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int get_hash(util::h32 &hash, const std::string_view mount_dir, const std::string_view vpath)
int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath)
{
const std::string path = std::string(mount_dir).append(vpath).append(HPFS_HMAP_HASH);
const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_HASH));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
@@ -286,9 +261,9 @@ namespace hpfs
* Populates the list of file block hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int get_file_block_hashes(std::vector<util::h32> &hashes, const std::string_view mount_dir, const std::string_view vpath)
int get_file_block_hashes(std::vector<util::h32> &hashes, std::string_view session_name, std::string_view vpath)
{
const std::string path = std::string(mount_dir).append(vpath).append(HPFS_HMAP_CHILDREN);
const std::string path = physical_path(session_name, std::string(vpath).append(HMAP_CHILDREN));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
@@ -326,9 +301,9 @@ namespace hpfs
* Populates the list of dir entry hashes for the specified vpath.
* @return 1 on success. 0 if vpath not found. -1 on error.
*/
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath)
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, std::string_view session_name, std::string_view dir_vpath)
{
const std::string path = std::string(mount_dir).append(dir_vpath).append(HPFS_HMAP_CHILDREN);
const std::string path = physical_path(session_name, std::string(dir_vpath).append(HMAP_CHILDREN));
const int fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1 && errno == ENOENT)
{
@@ -362,4 +337,9 @@ namespace hpfs
return 1;
}
const std::string physical_path(std::string_view session_name, std::string_view vpath)
{
return conf::ctx.hpfs_mount_dir + "/" + session_name.data() + vpath.data();
}
} // namespace hpfs

View File

@@ -7,7 +7,10 @@
namespace hpfs
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB;
constexpr const char *RW_SESSION_NAME = "rw"; // The built-in session name used by hpfs for RW sessions.
constexpr const char *STATE_DIR_PATH = "/state"; // State directory name.
constexpr const char *PATCH_FILE_PATH = "/patch.cfg"; // Config patch filename.
struct child_hash_node
{
@@ -39,8 +42,12 @@ namespace hpfs
std::shared_mutex parent_mutexes[2] = {std::shared_mutex(), std::shared_mutex()}; // Mutexes for each parent.
public:
pid_t hpfs_merge_pid = 0;
pid_t hpfs_rw_pid = 0;
pid_t hpfs_pid = 0;
// No. of consumers for RW session.
// We use this as a reference counting mechanism to cleanup RW session when no one requires it.
uint32_t rw_consumers = 0;
std::mutex rw_mutex;
hpfs_context()
{
@@ -69,14 +76,15 @@ namespace hpfs
int init();
void deinit();
int start_merge_process(pid_t &hpfs_pid);
int start_ro_rw_process(pid_t &hpfs_pid, std::string &mount_dir, const bool readonly,
const bool hash_map_enabled, const bool auto_start_session, const uint16_t timeout = 4000);
int start_fs_session(std::string_view mount_dir);
int stop_fs_session(std::string_view mount_dir);
int get_hash(util::h32 &hash, const std::string_view mount_dir, const std::string_view vpath);
int get_file_block_hashes(std::vector<util::h32> &hashes, const std::string_view mount_dir, const std::string_view vpath);
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath);
int start_hpfs_process(pid_t &hpfs_pid);
int acquire_rw_session();
int release_rw_session();
int start_ro_session(const std::string &name, const bool hmap_enabled);
int stop_ro_session(const std::string &name);
int get_hash(util::h32 &hash, std::string_view session_name, std::string_view vpath);
int get_file_block_hashes(std::vector<util::h32> &hashes, std::string_view session_name, std::string_view vpath);
int get_dir_children_hashes(std::vector<child_hash_node> &hash_nodes, std::string_view session_name, std::string_view dir_vpath);
const std::string physical_path(std::string_view session_name, std::string_view vpath);
} // namespace hpfs
#endif

View File

@@ -18,21 +18,17 @@ namespace p2pmsg = msg::fbuf::p2pmsg;
namespace hpfs_serve
{
constexpr uint16_t LOOP_WAIT = 20; // Milliseconds
constexpr const char *HPFS_SESSION_NAME = "rw";
uint16_t REQUEST_BATCH_TIMEOUT;
bool is_shutting_down = false;
bool init_success = false;
pid_t hpfs_pid;
std::thread hpfs_serve_thread;
int init()
{
REQUEST_BATCH_TIMEOUT = hpfs::get_request_resubmit_timeout() * 0.9;
if (hpfs::start_ro_rw_process(hpfs_pid, conf::ctx.hpfs_serve_dir, true, true, false) == -1)
return -1;
hpfs_serve_thread = std::thread(hpfs_serve_loop);
init_success = true;
return 0;
@@ -44,10 +40,6 @@ namespace hpfs_serve
{
is_shutting_down = true;
hpfs_serve_thread.join();
LOG_DEBUG << "Stopping hpfs state serve process... pid:" << hpfs_pid;
if (hpfs_pid > 0 && util::kill_process(hpfs_pid, true) == 0)
LOG_INFO << "Stopped hpfs state serve process.";
}
}
@@ -83,7 +75,7 @@ namespace hpfs_serve
if (hpfs_requests.empty())
continue;
if (hpfs::start_fs_session(conf::ctx.hpfs_serve_dir) != -1)
if (hpfs::acquire_rw_session() != -1)
{
for (auto &[session_id, request] : hpfs_requests)
{
@@ -124,7 +116,7 @@ namespace hpfs_serve
}
}
hpfs::stop_fs_session(conf::ctx.hpfs_serve_dir);
hpfs::release_rw_session();
}
hpfs_requests.clear();
@@ -225,7 +217,7 @@ namespace hpfs_serve
{
// Check whether the existing block hash matches expected hash.
std::vector<util::h32> block_hashes;
int result = hpfs::get_file_block_hashes(block_hashes, conf::ctx.hpfs_serve_dir, vpath);
int result = hpfs::get_file_block_hashes(block_hashes, HPFS_SESSION_NAME, vpath);
if (result == 1)
{
if (block_id >= block_hashes.size())
@@ -241,7 +233,7 @@ namespace hpfs_serve
else // Get actual block data.
{
struct stat st;
const std::string file_path = std::string(conf::ctx.hpfs_serve_dir).append(vpath);
const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data();
const off_t block_offset = block_id * hpfs::BLOCK_SIZE;
const int fd = open(file_path.c_str(), O_RDONLY | O_CLOEXEC);
if (fd == -1)
@@ -302,7 +294,7 @@ namespace hpfs_serve
{
// Check whether the existing file hash matches expected hash.
util::h32 file_hash = util::h32_empty;
int result = hpfs::get_hash(file_hash, conf::ctx.hpfs_serve_dir, vpath);
int result = hpfs::get_hash(file_hash, HPFS_SESSION_NAME, vpath);
if (result == 1)
{
if (file_hash != expected_hash)
@@ -311,14 +303,14 @@ namespace hpfs_serve
result = 0;
}
// Get the block hashes.
else if (hpfs::get_file_block_hashes(hashes, conf::ctx.hpfs_serve_dir, vpath) < 0)
else if (hpfs::get_file_block_hashes(hashes, HPFS_SESSION_NAME, vpath) < 0)
{
result = -1;
}
else
{
// Get actual file length.
const std::string file_path = std::string(conf::ctx.hpfs_serve_dir).append(vpath);
const std::string file_path = conf::ctx.hpfs_rw_dir + vpath.data();
struct stat st;
if (stat(file_path.c_str(), &st) == -1)
{
@@ -342,7 +334,7 @@ namespace hpfs_serve
{
// Check whether the existing dir hash matches expected hash.
util::h32 dir_hash = util::h32_empty;
int result = hpfs::get_hash(dir_hash, conf::ctx.hpfs_serve_dir, vpath);
int result = hpfs::get_hash(dir_hash, HPFS_SESSION_NAME, vpath);
if (result == 1)
{
if (dir_hash != expected_hash)
@@ -351,7 +343,7 @@ namespace hpfs_serve
result = 0;
}
// Get the children hash nodes.
else if (hpfs::get_dir_children_hashes(hash_nodes, conf::ctx.hpfs_serve_dir, vpath) < 0)
else if (hpfs::get_dir_children_hashes(hash_nodes, HPFS_SESSION_NAME, vpath) < 0)
{
result = -1;
}

View File

@@ -42,7 +42,6 @@ namespace hpfs_sync
// Patch file sync has the highest priority.
ctx.current_syncing_parent = hpfs::HPFS_PARENT_COMPONENTS::PATCH;
ctx.hpfs_sync_thread = std::thread(hpfs_syncer_loop);
ctx.hpfs_mount_dir = conf::ctx.hpfs_rw_dir;
init_success = true;
return 0;
}
@@ -103,7 +102,7 @@ namespace hpfs_sync
if (!ctx.is_syncing)
continue;
if (hpfs::start_fs_session(ctx.hpfs_mount_dir) != -1)
if (hpfs::acquire_rw_session() != -1)
{
while (!ctx.is_shutting_down)
{
@@ -135,7 +134,7 @@ namespace hpfs_sync
LOG_INFO << "hpfs sync: Target patch state achieved: " << new_state;
// Appling new patch file changes to hpcore runtime.
if (conf::validate_and_apply_patch_config(conf::cfg.contract, conf::ctx.hpfs_rw_dir) == -1)
if (conf::validate_and_apply_patch_config(conf::cfg.contract, hpfs::RW_SESSION_NAME) == -1)
{
LOG_ERROR << "Appling patch file changes after sync failed";
}
@@ -145,7 +144,7 @@ namespace hpfs_sync
// Update global hash tracker with the new patch file hash.
util::h32 updated_patch_hash;
hpfs::get_hash(updated_patch_hash, conf::ctx.hpfs_rw_dir, conf::PATCH_FILE_PATH);
hpfs::get_hash(updated_patch_hash, hpfs::RW_SESSION_NAME, hpfs::PATCH_FILE_PATH);
hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, updated_patch_hash);
}
@@ -172,7 +171,7 @@ namespace hpfs_sync
}
LOG_INFO << "hpfs sync: All parents synced.";
hpfs::stop_fs_session(ctx.hpfs_mount_dir);
hpfs::release_rw_session();
}
else
{
@@ -193,12 +192,12 @@ namespace hpfs_sync
BACKLOG_ITEM_TYPE target_parent_backlog_item_type;
if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::STATE)
{
target_parent_vpath = sc::STATE_DIR_PATH;
target_parent_vpath = hpfs::STATE_DIR_PATH;
target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::DIR;
}
else if (ctx.current_syncing_parent == hpfs::HPFS_PARENT_COMPONENTS::PATCH)
{
target_parent_vpath = conf::PATCH_FILE_PATH;
target_parent_vpath = hpfs::PATCH_FILE_PATH;
target_parent_backlog_item_type = BACKLOG_ITEM_TYPE::FILE;
}
std::string lcl = ledger::ctx.get_lcl();
@@ -317,7 +316,7 @@ namespace hpfs_sync
// After handling each response, check whether we have reached target hpfs state.
// get_hash returns 0 incase target parent is not existing in our side.
if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, target_parent_vpath) == -1)
if (hpfs::get_hash(updated_state, hpfs::RW_SESSION_NAME, target_parent_vpath) == -1)
{
LOG_ERROR << "hpfs sync: exiting due to hash check error.";
return -1;
@@ -510,13 +509,13 @@ namespace hpfs_sync
LOG_DEBUG << "hpfs sync: Processing fs entries response for " << vpath;
// Create physical directory on our side if not exist.
std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath);
std::string parent_physical_path = conf::ctx.hpfs_rw_dir + vpath.data();
if (util::create_dir_tree_recursive(parent_physical_path) == -1)
return -1;
// Get the children hash entries and compare with what we got from peer.
std::vector<hpfs::child_hash_node> existing_fs_entries;
if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, vpath) == -1)
if (hpfs::get_dir_children_hashes(existing_fs_entries, hpfs::RW_SESSION_NAME, vpath) == -1)
return -1;
// Request more info on fs entries that exist on both sides but are different.
@@ -545,7 +544,7 @@ namespace hpfs_sync
else
{
// If there was an entry that does not exist on other side, delete it.
std::string child_physical_path = std::string(ctx.hpfs_mount_dir).append(child_vpath);
std::string child_physical_path = conf::ctx.hpfs_rw_dir + child_vpath.data();
if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) ||
!ex_entry.is_file && util::remove_directory_recursively(child_physical_path.c_str()) == -1)
@@ -588,7 +587,7 @@ namespace hpfs_sync
// File block hashes on our side (file might not exist on our side).
std::vector<util::h32> existing_hashes;
if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, vpath) == -1 && errno != ENOENT)
if (hpfs::get_file_block_hashes(existing_hashes, hpfs::RW_SESSION_NAME, vpath) == -1 && errno != ENOENT)
return -1;
const size_t existing_hash_count = existing_hashes.size();
@@ -605,7 +604,7 @@ namespace hpfs_sync
if (existing_hashes.size() >= hash_count)
{
// If peer file might be smaller, truncate our file to match with peer file.
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath);
std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data();
if (truncate(file_physical_path.c_str(), file_length) == -1)
return -1;
}
@@ -626,7 +625,7 @@ namespace hpfs_sync
<< " (len:" << buf.length()
<< ") of " << vpath;
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(vpath);
std::string file_physical_path = conf::ctx.hpfs_rw_dir + vpath.data();
const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS);
if (fd == -1)
{

View File

@@ -52,7 +52,6 @@ namespace hpfs_sync
std::shared_mutex target_state_mutex;
std::atomic<bool> is_syncing = false;
std::atomic<bool> is_shutting_down = false;
std::string hpfs_mount_dir;
};
extern sync_context ctx;

View File

@@ -199,11 +199,12 @@ int main(int argc, char **argv)
LOG_INFO << "Public key: " << conf::cfg.node.public_key_hex;
LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")";
if (ledger::init() == -1 ||
unl::init() == -1 ||
hpfs::init() == -1 ||
if (hpfs::init() == -1 ||
conf::apply_patch_changes() == -1 ||
hpfs_serve::init() == -1 ||
hpfs_sync::init() == -1 ||
ledger::init() == -1 ||
unl::init() == -1 ||
consensus::init() == -1 ||
read_req::init() == -1 ||
p2p::init() == -1 ||

View File

@@ -89,7 +89,7 @@ namespace sc
execv_args[j] = conf::cfg.contract.runtime_binexec_args[i].data();
execv_args[len - 1] = NULL;
const std::string current_dir = std::string(ctx.args.hpfs_dir).append(STATE_DIR_PATH);
const std::string current_dir = hpfs::physical_path(ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH);
chdir(current_dir.c_str());
execv(execv_args[0], execv_args);
@@ -104,27 +104,6 @@ namespace sc
cleanup_fds(ctx);
util::h32 patch_hash;
if (hpfs::get_hash(patch_hash, ctx.args.hpfs_dir, conf::PATCH_FILE_PATH) == 1)
{
if (patch_hash != hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH))
{
// Appling new patch file changes to hpcore runtime.
if (conf::validate_and_apply_patch_config(conf::cfg.contract, ctx.args.hpfs_dir) == -1)
{
LOG_ERROR << "Appling patch file after contract execution failed";
}
else
{
// Update global hash tracker with the new patch file hash.
hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, patch_hash);
unl::update_unl_changes_from_patch();
}
}
}
if (stop_hpfs_session(ctx) == -1)
ret = -1;
@@ -170,50 +149,59 @@ namespace sc
}
/**
* Starts the hpfs read/write virtual filesystem.
* Starts the hpfs virtual filesystem session used for contract execution.
*/
int start_hpfs_session(execution_context &ctx)
{
// In readonly mode, we must start the hpfs process first.
// In RW mode, there is a global hpfs RW process so we only need to create an fs session.
if (ctx.args.readonly)
{
if (hpfs::start_ro_rw_process(ctx.hpfs_pid, ctx.args.hpfs_dir, true, false, false) == -1)
return -1;
}
else
{
ctx.hpfs_pid = hpfs::ctx.hpfs_rw_pid;
}
if (!ctx.args.readonly)
ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME;
if (hpfs::start_fs_session(ctx.args.hpfs_dir) == -1)
return -1;
return 0;
return ctx.args.readonly ? hpfs::start_ro_session(ctx.args.hpfs_session_name, false)
: hpfs::acquire_rw_session();
}
/**
* Stops the hpfs virtual filesystem.
* Stops the hpfs virtual filesystem session.
*/
int stop_hpfs_session(execution_context &ctx)
{
int result = 0;
// Read the root hash if not in readonly mode.
if (!ctx.args.readonly && hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_dir, STATE_DIR_PATH) < 1)
result = -1;
if (ctx.args.readonly)
{
return hpfs::stop_ro_session(ctx.args.hpfs_session_name);
}
else
{
// Read the state hash if not in readonly mode.
if (hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.hpfs_session_name, hpfs::STATE_DIR_PATH) < 1)
{
hpfs::release_rw_session();
return -1;
}
LOG_DEBUG << "Stopping hpfs contract session..." << (ctx.args.readonly ? " (rdonly)" : "");
util::h32 patch_hash;
const int patch_hash_result = hpfs::get_hash(patch_hash, ctx.args.hpfs_session_name, hpfs::PATCH_FILE_PATH);
if (patch_hash_result == -1)
{
hpfs::release_rw_session();
return -1;
}
else if (patch_hash_result == 1 && patch_hash != hpfs::ctx.get_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH))
{
// Appling new patch file changes to hpcore runtime.
if (conf::validate_and_apply_patch_config(conf::cfg.contract, ctx.args.hpfs_session_name) == -1)
{
LOG_ERROR << "Appling patch file after contract execution failed";
}
else
{
// Update global hash tracker with the new patch file hash.
hpfs::ctx.set_hash(hpfs::HPFS_PARENT_COMPONENTS::PATCH, patch_hash);
unl::update_unl_changes_from_patch();
}
}
if (hpfs::stop_fs_session(ctx.args.hpfs_dir) == -1)
return -1;
// In readonly mode, we must also stop the hpfs process itself after sopping the session.
// In RW mode, we only need to stop the fs session and let the process keep running.
if (ctx.args.readonly && util::kill_process(ctx.hpfs_pid, true) == -1)
result = -1;
ctx.hpfs_pid = 0;
return result;
return hpfs::release_rw_session();
}
}
/**

View File

@@ -15,7 +15,6 @@ namespace sc
{
constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 64; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 64; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
constexpr const char *STATE_DIR_PATH = "/state"; // State directory name.
struct fd_pair
{
@@ -60,8 +59,8 @@ namespace sc
// Whether the contract should execute in read only mode (to serve read requests).
bool readonly = false;
// Hpfs dir path to be used for this execution.
std::string hpfs_dir;
// hpfs session name used for this execution.
std::string hpfs_session_name;
// Map of user I/O buffers (map key: user binary public key).
// The value is a pair holding consensus-verified inputs and contract-generated outputs.
@@ -112,9 +111,6 @@ namespace sc
// Holds the contract process id (if currently executing).
pid_t contract_pid = 0;
// Holds the hpfs rw process id (if currently executing).
pid_t hpfs_pid = 0;
// Thread to collect contract inputs and outputs and feed npl messages while contract is running.
std::thread contract_monitor_thread;

View File

@@ -217,9 +217,7 @@ namespace read_req
*/
void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx)
{
// Create new folder with the thread id per each thread.
contract_ctx.args.hpfs_dir = conf::ctx.hpfs_dir;
contract_ctx.args.hpfs_dir.append("/rr_").append(std::to_string(thread_id));
contract_ctx.args.hpfs_session_name = "ro_" + std::to_string(thread_id);
contract_ctx.args.readonly = true;
sc::contract_iobufs user_bufs;
user_bufs.inputs.push_back(read_request.content);

View File

@@ -8,6 +8,7 @@
#include "../hplog.hpp"
#include "../ledger.hpp"
#include "../util/buffer_store.hpp"
#include "../hpfs/hpfs.hpp"
#include "usr.hpp"
#include "user_session_handler.hpp"
#include "user_comm_session.hpp"
@@ -389,7 +390,8 @@ namespace usr
util::fork_detach();
// before execution chdir into a valid the latest state data directory that contains an appbill.table
chdir(conf::ctx.hpfs_rw_dir.c_str());
const std::string appbill_dir = conf::ctx.hpfs_rw_dir + hpfs::STATE_DIR_PATH;
chdir(appbill_dir.c_str());
int ret = execv(execv_args[0], execv_args);
std::cerr << errno << ": Appbill process execv failed.\n";
return false;

Binary file not shown.