diff --git a/.dockerignore b/.dockerignore index bb73e4aa..dcc16fcd 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,2 +1,4 @@ **/** -!build/hpcore \ No newline at end of file +!build/hpcore +!build/hpstatemon +!libfuse3.so.3 \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 18ef9990..7f7ddbad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,10 +40,19 @@ target_link_libraries(hpsupport ${CMAKE_DL_LIBS} ) +add_library(hpstatefs + src/statefs/hasher.cpp + src/statefs/state_common.cpp + src/statefs/hashmap_builder.cpp + src/statefs/hashtree_builder.cpp + src/statefs/state_restore.cpp +) +target_link_libraries(hpstatefs hpsupport) + add_library(hpproc src/proc/proc.cpp ) -target_link_libraries(hpproc hpsupport) +target_link_libraries(hpproc hpsupport hpstatefs) add_library(hpbill src/bill/corebill.cpp @@ -82,7 +91,6 @@ target_link_libraries(hpusr hpsupport hpsock hpschema) add_library(hpcons src/cons/cons.cpp src/cons/ledger_handler.cpp - src/cons/statemap_handler.cpp ) target_link_libraries(hpcons hpsupport hpproc hpp2p hpusr) @@ -100,6 +108,21 @@ target_link_libraries(hpcore hpcons ) +add_executable(hpstatemon + src/statefs/state_monitor/fusefs.cpp + src/statefs/state_monitor/state_monitor.cpp + src/statefs/hasher.cpp + src/statefs/state_common.cpp +) +target_link_libraries(hpstatemon + hpsupport + libfuse3.so.3 +) + +add_dependencies(hpcore + hpstatemon +) + target_precompile_headers(hpsupport PUBLIC src/pchheader.hpp) target_precompile_headers(hpcore REUSE_FROM hpsupport) target_precompile_headers(hpsock REUSE_FROM hpsupport) @@ -107,6 +130,7 @@ target_precompile_headers(hpschema REUSE_FROM hpsupport) target_precompile_headers(hpp2p REUSE_FROM hpsupport) target_precompile_headers(hpusr REUSE_FROM hpsupport) target_precompile_headers(hpcons REUSE_FROM hpsupport) +target_precompile_headers(hpstatemon REUSE_FROM hpsupport) # Create docker image from hpcore build output with 'make docker' # Requires docker to be runnable without 
'sudo' diff --git a/Dockerfile b/Dockerfile index 9d59a341..dfcac61c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,14 @@ # Otherwise, hpcore itself can run on any docker image like ubuntu or debian without NodeJs. FROM node:10.17.0-buster-slim +# Copy fuse shared library and register it. +COPY ./libfuse3.so.3 /usr/local/lib/ +RUN ldconfig +# Install fuse. +RUN apt-get update && apt-get install -y fuse && rm -rf /var/lib/apt/lists/* + # hpcore binary is copied to /hp directory withtin the docker image. WORKDIR /hp COPY ./build/hpcore . +COPY ./build/hpstatemon . ENTRYPOINT ["/hp/hpcore"] \ No newline at end of file diff --git a/README.md b/README.md index 12fd407b..38c646b7 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ A C++ version of hotpocket designed for production envrionments, original protot * RapidJSON - http://rapidjson.org * P2P Protocol - https://google.github.io/flatbuffers/ * TLS - https://www.openssl.org/ +* Fuse filesystem - https://github.com/libfuse/libfuse ## Steps to setup Hot Pocket @@ -68,6 +69,13 @@ Example: When you make a change to `p2pmsg_content_.fbc` defnition file, you nee 3. Run `./config && make` 4. Run `sudo make install` +#### Install libfuse +1. `sudo apt-get install -y meson ninja-build pkg-config` +2. Download [libfuse 3.8](https://github.com/libfuse/libfuse/releases/download/fuse-3.8.0/fuse-3.8.0.tar.xz) and extract. +3. `mkdir build; cd build` +4. `meson .. && ninja` +5. `sudo ninja install` + #### Run ldconfig `sudo ldconfig` @@ -98,4 +106,6 @@ Code is divided into subsystems via namespaces. **sock::** Handles generic web sockets functionality. Mainly acts as a wrapper for boost/beast. -**util::** Contains shared data structures/helper functions used by multiple subsystems. \ No newline at end of file +**util::** Contains shared data structures/helper functions used by multiple subsystems. + +**statefs::** Fuse-based state filesystem monitoring and contract state maintenance subsystem. 
\ No newline at end of file diff --git a/cluster-create.sh b/cluster-create.sh index 274380d4..47bf3726 100755 --- a/cluster-create.sh +++ b/cluster-create.sh @@ -50,8 +50,9 @@ do binargs: './bin/contract.js', \ peerport: ${peerport}, \ pubport: ${pubport}, \ - roundtime: 1000, - loglevel: 'debug' + roundtime: 1000, \ + loglevel: 'debug', \ + loggers:['console'] \ }, null, 2)" > hp.cfg rm tmp.json diff --git a/cluster-start.sh b/cluster-start.sh index f253960b..977f8b63 100755 --- a/cluster-start.sh +++ b/cluster-start.sh @@ -23,5 +23,6 @@ let pubport=8080+$n # We specify --name for each node so it will be the virtual dns name for each node. docker run --rm -t -i --network=hpnet --name=node${n} \ -p ${pubport}:${pubport} \ + --device /dev/fuse --cap-add SYS_ADMIN --security-opt apparmor:unconfined \ --mount type=bind,source=${clusterloc}/node${n},target=/contract \ hpcore:latest run /contract \ No newline at end of file diff --git a/examples/echocontract/contract.js b/examples/echocontract/contract.js index 87b83456..abe94886 100644 --- a/examples/echocontract/contract.js +++ b/examples/echocontract/contract.js @@ -6,11 +6,10 @@ const fs = require('fs') //console.log("===Sample contract started==="); //console.log("Contract args received from hp: " + input); -let hpargsstr = fs.readFileSync(0, 'utf8'); -let hpargs = JSON.parse(hpargsstr); +let hpargs = JSON.parse( fs.readFileSync(0, 'utf8')); // We just save execution args as an example state file change. 
-fs.appendFileSync("state/execargs.txt", hpargsstr + "\n"); +fs.appendFileSync("state/exects.txt", "ts:" + hpargs.ts + "\n"); Object.keys(hpargs.usrfd).forEach(function (key, index) { let userfds = hpargs.usrfd[key]; diff --git a/libfuse3.so.3 b/libfuse3.so.3 new file mode 100755 index 00000000..e4eedf2a Binary files /dev/null and b/libfuse3.so.3 differ diff --git a/src/conf.cpp b/src/conf.cpp index 9bfad565..ed46ca94 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -17,15 +17,6 @@ contract_config cfg; const static char *MODE_PASSIVE = "passive"; const static char *MODE_ACTIVE = "active"; -// provide a safe std::string overload for realpath -std::string realpath(std::string path) -{ - std::array buffer; - ::realpath(path.c_str(), buffer.data()); - buffer[PATH_MAX] = '\0'; - return buffer.data(); -} - /** * Loads and initializes the contract config for execution. Must be called once during application startup. * @return 0 for success. -1 for failure. @@ -91,7 +82,7 @@ int create_contract() boost::filesystem::create_directories(ctx.configdir); boost::filesystem::create_directories(ctx.histdir); boost::filesystem::create_directories(ctx.statedir); - boost::filesystem::create_directories(ctx.statemapdir); + boost::filesystem::create_directories(ctx.statehistdir); //Create config file with default settings. @@ -128,9 +119,16 @@ int create_contract() * Updates the contract context with directory paths based on provided base directory. * This is called after parsing HP command line arg in order to populate the ctx. 
*/ -void set_contract_dir_paths(std::string basedir) +void set_contract_dir_paths(std::string exepath, std::string basedir) { - if (basedir == "") + if (exepath.empty()) + { + // this code branch will never execute the way main is currently coded, but it might change in future + std::cerr << "Executable path must be specified\n"; + exit(1); + } + + if (basedir.empty()) { // this code branch will never execute the way main is currently coded, but it might change in future std::cerr << "a contract directory must be specified\n"; @@ -138,7 +136,10 @@ void set_contract_dir_paths(std::string basedir) } // resolving the path through realpath will remove any trailing slash if present - basedir = realpath(basedir); + basedir = util::realpath(basedir); + + ctx.exedir = boost::filesystem::path(util::realpath(exepath)).parent_path().string(); + ctx.statemonexepath = ctx.exedir + "/" + "hpstatemon"; ctx.contractdir = basedir; ctx.configdir = basedir + "/cfg"; @@ -147,7 +148,7 @@ void set_contract_dir_paths(std::string basedir) ctx.tlscertfile = ctx.configdir + "/tlscert.pem"; ctx.histdir = basedir + "/hist"; ctx.statedir = basedir + "/state"; - ctx.statemapdir = basedir + "/statemap"; + ctx.statehistdir = basedir + "/statehist"; ctx.logdir = basedir + "/log"; } @@ -503,7 +504,7 @@ int validate_contract_dir_paths() ctx.configfile, ctx.histdir, ctx.statedir, - ctx.statemapdir, + ctx.statehistdir, ctx.tlskeyfile, ctx.tlscertfile}; diff --git a/src/conf.hpp b/src/conf.hpp index 9ea265a7..286f6ac1 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -24,11 +24,13 @@ enum OPERATING_MODE struct contract_ctx { std::string command; // The CLI command issued to launch HotPocket + std::string exedir; // Hot Pocket executable dir. + std::string statemonexepath;// State monitor executable file path. 
std::string contractdir; // Contract base directory full path std::string histdir; // Contract history dir full path - std::string statedir; // Contract state dir full path - std::string statemapdir; // Contract state map dir (.merkel files) full path + std::string statedir; // Contract executing state dir full path (This is the fuse mount point) + std::string statehistdir; // Contract state history dir full path std::string logdir; // Contract log dir full path std::string configdir; // Contract config dir full path std::string configfile; // Full path to the contract config file @@ -90,7 +92,7 @@ int rekey(); int create_contract(); -void set_contract_dir_paths(std::string basedir); +void set_contract_dir_paths(std::string exepath, std::string basedir); //------Internal-use functions for this namespace. diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 07c34f86..135255bb 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -12,7 +12,6 @@ #include "../crypto.hpp" #include "../proc/proc.hpp" #include "ledger_handler.hpp" -#include "statemap_handler.hpp" #include "cons.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -590,7 +589,6 @@ void apply_ledger(const p2p::proposal &cons_prop) extract_user_outputs_from_contract_bufmap(useriobufmap); broadcast_npl_output(nplbufpair.output); - update_state_blockmap(updated_blocks); } /** diff --git a/src/cons/statemap_handler.cpp b/src/cons/statemap_handler.cpp deleted file mode 100644 index 2e161192..00000000 --- a/src/cons/statemap_handler.cpp +++ /dev/null @@ -1,177 +0,0 @@ -#include "../pchheader.hpp" -#include "../conf.hpp" -#include "../hplog.hpp" -#include "../proc/proc.hpp" - -namespace cons -{ - -constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; //this is the logical block size files are decomposed in, it's also the size of the merkle tree -constexpr size_t HASH_SIZE = crypto_generichash_blake2b_BYTES; -constexpr size_t MAX_HASHES = BLOCK_SIZE / HASH_SIZE; -constexpr const char *MERKLE_EXTENSION = ".merkle"; - 
-struct B2H // blake2b hash is 32 bytes which we store as 4 quad words -{ - uint64_t data[4]; -}; - -// provide some helper functions for working with 32 byte hash type -bool operator==(B2H &lhs, B2H &rhs) -{ - return lhs.data[0] == rhs.data[0] && lhs.data[1] == rhs.data[1] && lhs.data[2] == rhs.data[2] && lhs.data[3] == rhs.data[3]; -} - -// the actual hash function, note that the B2H datatype is always passed by value being only 4 quadwords -B2H hash(const void *ptr, size_t len) -{ - B2H ret; - crypto_generichash_blake2b_state state; - crypto_generichash_blake2b_init(&state, NULL, 0, HASH_SIZE); - crypto_generichash_blake2b_update(&state, - reinterpret_cast(ptr), len); - crypto_generichash_blake2b_final( - &state, - reinterpret_cast(&ret), - HASH_SIZE); - return ret; -} - -/** - * Updates the .merkel block map for the given state file. - * @param filepath Full path of the state file. - * @param hinted_blocks Set of updated file block ids. If empty full merkel block map will be recomputed. - */ -int update_file_blockmap(const std::string &filepath, const std::set &hinted_blocks) -{ - // .merkel file path will be corresponding path in "statemap" directory. - std::string merkle_fn; - const size_t relative_path_len = filepath.length() - conf::ctx.statedir.length(); - merkle_fn.reserve(conf::ctx.statemapdir.length() + relative_path_len + 7); - merkle_fn.append(conf::ctx.statemapdir); - merkle_fn.append(filepath.substr(conf::ctx.statedir.length(), relative_path_len)); - merkle_fn.append(MERKLE_EXTENSION); - - // To benefit from hint mode, the .merkle file must already exist. If not we simply disable hint mode - // because we anyway have to rebuild entire merkle file from scratch. 
- bool hint_mode = !hinted_blocks.empty(); - if (access(merkle_fn.c_str(), F_OK) == -1) - hint_mode = false; - - // open the target file for which we are building or updating a merkle tree - FILE *f = fopen(filepath.c_str(), "rb"); - if (!f) - { - LOG_ERR << "Failed to open state file: " << filepath << " for reading."; - return -1; - } - - // the merkle tree structure is only 4mb and could technically sit on stack in most cases but - // TODO: ******Why can't we allocate this on the stack? - auto merkle_tree = std::make_unique(MAX_HASHES); - // same with the read buffer - auto read_buffer = std::make_unique(BLOCK_SIZE); - - // this iterator will be used if we are in hint mode - auto hint = hinted_blocks.begin(); - - size_t block_counter = 0; - size_t block_location = 0; - - while (true) - { - // if hint blocks have been specified we'll seek to specific blocks in the file based on hint list. - if (hint_mode) - { - // check if we've run out of elements - if (hint == hinted_blocks.end()) - break; - - // get the next block on their list - block_location = *hint++; - - // seek the file cursor to the block - fseek(f, block_location * BLOCK_SIZE, SEEK_SET); - } - - // read the block - int bytesread = fread(read_buffer.get(), 1, BLOCK_SIZE, f); - if (bytesread <= 0) - break; - - // calculate the block hash - merkle_tree[block_location++] = hash(read_buffer.get(), std::min(BLOCK_SIZE, (size_t)bytesread)); - block_counter++; - } - - fclose(f); - - // now that we've computed all the block hashes we are interested in we have to deal with the .merkle file - - // open the .merkle file - // if we are in hint_mode we will open it in rb+ which will preserve its contents - // otherwise we will truncate it if it already exists, because we will have to overwrite everything anyway - FILE *fm = fopen(merkle_fn.c_str(), (hint_mode ? 
"rb+" : "wb+")); - if (!fm) - { - LOG_ERR << "Failed to open merkle file: " << filepath << " for writing."; - return -1; - } - - // get the size of the .merkle file - fseek(fm, 0L, SEEK_END); - const size_t len = ftell(fm); - rewind(fm); - - // write the updated hashes - if (hint_mode) - { - // write selectively the updated block hashes - const int fd = fileno(fm); - for (int block : hinted_blocks) - pwrite(fd, &(merkle_tree[block]), HASH_SIZE, HASH_SIZE * block); - } - else - { - // write the whole tree to the file - fwrite(reinterpret_cast(merkle_tree.get()), 1, std::min(BLOCK_SIZE, HASH_SIZE * block_counter), fm); - } - - // compute the root hash - - B2H root_hash; - - if (hint_mode) - { - // if we only updated selective hashes (hint mode) then now we need to compute a hash over the whole merkle file - // so we first need to read it in - rewind(f); - int bytesread = fread(read_buffer.get(), 1, BLOCK_SIZE, f); - if (bytesread <= 0) - fprintf(stderr, "could not read merkle file after writing to it?!\n"); - - // now simply compute the hash of what we just read, that's our root hash - root_hash = hash(read_buffer.get(), std::min(BLOCK_SIZE, (size_t)bytesread)); - } - else - { - // if we've just written out the whole merkle tree we already know it - root_hash = hash(merkle_tree.get(), std::min(BLOCK_SIZE, HASH_SIZE * block_counter)); - } - - fclose(fm); -} - -/** - * Updates the state block map for the files updated as specified by the provided blockmap. - * TODO: This doesn't currently support deleted file tracking. 
- */ -void update_state_blockmap(const proc::contract_fblockmap_t &updates) -{ - for (const auto &[filepath, blocks] : updates) - { - update_file_blockmap(filepath, blocks); - } -} - -} // namespace cons \ No newline at end of file diff --git a/src/cons/statemap_handler.hpp b/src/cons/statemap_handler.hpp deleted file mode 100644 index 7e295acb..00000000 --- a/src/cons/statemap_handler.hpp +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef _HP_CONS_STATEMAP_HANDLER_ -#define _HP_CONS_STATEMAP_HANDLER_ - -#include "../proc/proc.hpp" - -namespace cons -{ -void update_state_blockmap(const proc::contract_fblockmap_t &updates); -} - -#endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 906beb29..3bccc4b9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -11,6 +11,7 @@ #include "usr/usr.hpp" #include "p2p/p2p.hpp" #include "cons/cons.hpp" +#include "statefs/state_common.hpp" /** * Parses CLI args and extracts hot pocket command and parameters given. @@ -35,7 +36,7 @@ int parse_cmd(int argc, char **argv) { // We inform the conf subsystem to populate the contract directory context values // based on the directory argument from the command line. - conf::set_contract_dir_paths(argv[2]); + conf::set_contract_dir_paths(argv[0], argv[2]); return 0; } @@ -114,7 +115,7 @@ void std_terminate() noexcept int main(int argc, char **argv) { //seed rand - srand(time(0)); + srand(time(0)); // Register exception handler for std exceptions. std::set_terminate(&std_terminate); @@ -172,6 +173,8 @@ int main(int argc, char **argv) if (p2p::init() != 0 || usr::init() != 0 || cons::init() != 0) return -1; + statefs::init(conf::ctx.statehistdir); + // After initializing primary subsystems, register the SIGINT handler. 
signal(SIGINT, signal_handler); diff --git a/src/proc/proc.cpp b/src/proc/proc.cpp index 167d6968..907ab697 100644 --- a/src/proc/proc.cpp +++ b/src/proc/proc.cpp @@ -4,6 +4,9 @@ #include "../fbschema/common_helpers.hpp" #include "../fbschema/p2pmsg_container_generated.h" #include "../fbschema/p2pmsg_content_generated.h" +#include "../statefs/hasher.hpp" +#include "../statefs/state_common.hpp" +#include "../statefs/hashtree_builder.hpp" #include "proc.hpp" namespace proc @@ -34,6 +37,9 @@ std::vector hpscfds; // Holds the contract process id (if currently executing). pid_t contract_pid; +// Holds the state monitor process id (if currently executing). +pid_t statemon_pid; + /** * Executes the contract process and passes the specified arguments. * @return 0 on successful process creation. -1 on failure or contract process is already running. @@ -48,6 +54,10 @@ int exec_contract(const contract_exec_args &args) if (feed_inputs(args) != 0) return -1; + // Start the state monitor before starting the contract process. + if (start_state_monitor() != 0) + return -1; + const pid_t pid = fork(); if (pid > 0) { @@ -58,7 +68,7 @@ int exec_contract(const contract_exec_args &args) close_unused_fds(true); // Wait for child process (contract process) to complete execution. - const int presult = await_contract_execution(); + const int presult = await_process_execution(contract_pid); LOG_INFO << "Contract process ended."; contract_pid = 0; @@ -68,6 +78,9 @@ int exec_contract(const contract_exec_args &args) return -1; } + if (stop_state_monitor() != 0) + return -1; + // After contract execution, collect contract outputs. 
if (fetch_outputs(args) != 0) return -1; @@ -92,11 +105,12 @@ int exec_contract(const contract_exec_args &args) execv_args[conf::cfg.runtime_binexec_args.size()] = NULL; int ret = execv(execv_args[0], execv_args); - LOG_ERR << "Execv failed: " << ret; + LOG_ERR << "Contract process execv failed: " << ret; + exit(1); } else { - LOG_ERR << "fork() failed."; + LOG_ERR << "fork() failed when starting contract process."; return -1; } @@ -104,21 +118,84 @@ int exec_contract(const contract_exec_args &args) } /** - * Blocks the calling thread until the contract process compelted exeution (if running). - * @return 0 if contract process exited normally, exit code of contract process if abnormally exited. + * Blocks the calling thread until the specified process compelted exeution (if running). + * @return 0 if process exited normally or exit code of process if abnormally exited. */ -int await_contract_execution() +int await_process_execution(pid_t pid) { - if (contract_pid > 0) + if (pid > 0) { int scstatus; - waitpid(contract_pid, &scstatus, 0); + waitpid(pid, &scstatus, 0); if (!WIFEXITED(scstatus)) return WEXITSTATUS(scstatus); } return 0; } +/** + * Mounts the fuse file system at the contract state dir by starting the state monitor process. + * State monitor will automatically create a state history checkpoint as well. + */ +int start_state_monitor() +{ + pid_t pid = fork(); + if (pid > 0) + { + // HP process. + statemon_pid = pid; + return 0; + } + else if (pid == 0) + { + // State monitor process. + LOG_DBG << "Starting state monitor..."; + + // Fill process args. 
+ char *execv_args[4]; + execv_args[0] = conf::ctx.statemonexepath.data(); + execv_args[1] = conf::ctx.statehistdir.data(); + execv_args[2] = conf::ctx.statedir.data(); + execv_args[3] = NULL; + + int ret = execv(execv_args[0], execv_args); + LOG_ERR << "State monitor execv failed: " << ret; + exit(1); + } + else if (pid < 0) + { + LOG_ERR << "fork() failed when starting state monitor."; + return -1; + } +} + +/** + * Terminate the state monitor and update the latest state hash tree. + */ +int stop_state_monitor() +{ + kill(statemon_pid, SIGINT); + + // Wait for state monitor process to complete execution after the SIGINT. + const int presult = await_process_execution(statemon_pid); + LOG_DBG << "State monitor stopped."; + + statemon_pid = 0; + + if (presult != 0) + LOG_ERR << "State monitor process exited with non-normal status code: " << presult; + + // Update the hash tree. + hasher::B2H statehash = {0, 0, 0, 0}; + statefs::hashtree_builder htreebuilder(statefs::get_statedir_context()); + if (htreebuilder.generate(statehash) != 0) + return -1; + + LOG_DBG << "State hash: " << std::hex << statehash << std::dec; + + return 0; +} + /** * Writes the contract args (JSON) into the stdin of the contract process. * Args format: diff --git a/src/proc/proc.hpp b/src/proc/proc.hpp index 370cbbf3..194641c1 100644 --- a/src/proc/proc.hpp +++ b/src/proc/proc.hpp @@ -80,7 +80,11 @@ int exec_contract(const contract_exec_args &args); //------Internal-use functions for this namespace. 
-int await_contract_execution(); +int await_process_execution(pid_t pid); + +int start_state_monitor(); + +int stop_state_monitor(); int write_contract_args(const contract_exec_args &args); diff --git a/src/statefs/hasher.cpp b/src/statefs/hasher.cpp new file mode 100644 index 00000000..18bef6b5 --- /dev/null +++ b/src/statefs/hasher.cpp @@ -0,0 +1,55 @@ +#include "hasher.hpp" + +namespace hasher +{ + +// provide some helper functions for working with 32 byte hash type +bool operator==(const B2H &lhs, const B2H &rhs) +{ + return lhs.data[0] == rhs.data[0] && lhs.data[1] == rhs.data[1] && lhs.data[2] == rhs.data[2] && lhs.data[3] == rhs.data[3]; +} + +bool operator!=(const B2H &lhs, const B2H &rhs) +{ + return lhs.data[0] != rhs.data[0] || lhs.data[1] != rhs.data[1] || lhs.data[2] != rhs.data[2] || lhs.data[3] != rhs.data[3]; +} + +void operator^=(B2H &lhs, const B2H &rhs) +{ + lhs.data[0] ^= rhs.data[0]; + lhs.data[1] ^= rhs.data[1]; + lhs.data[2] ^= rhs.data[2]; + lhs.data[3] ^= rhs.data[3]; +} + +std::ostream &operator<<(std::ostream &output, const B2H &h) +{ + output << h.data[0] << h.data[1] << h.data[2] << h.data[3]; + return output; +} + +std::stringstream &operator<<(std::stringstream &output, const B2H &h) +{ + output << std::hex << h; + return output; +} + +// the actual hash function, note that the B2H datatype is always passed by value being only 4 quadwords +B2H hash(const void *buf1, size_t buf1len, const void *buf2, size_t buf2len) +{ + crypto_generichash_blake2b_state state; + crypto_generichash_blake2b_init(&state, NULL, 0, HASH_SIZE); + + crypto_generichash_blake2b_update(&state, + reinterpret_cast(buf1), buf1len); + crypto_generichash_blake2b_update(&state, + reinterpret_cast(buf2), buf2len); + B2H ret; + crypto_generichash_blake2b_final( + &state, + reinterpret_cast(&ret), + HASH_SIZE); + return ret; +} + +} // namespace hasher \ No newline at end of file diff --git a/src/statefs/hasher.hpp b/src/statefs/hasher.hpp new file mode 100644 index 
00000000..04869b48 --- /dev/null +++ b/src/statefs/hasher.hpp @@ -0,0 +1,27 @@ +#ifndef _HASHER_ +#define _HASHER_ + +#include "../pchheader.hpp" + +namespace hasher +{ + +constexpr size_t HASH_SIZE = crypto_generichash_blake2b_BYTES; + +struct B2H // blake2b hash is 32 bytes which we store as 4 quad words +{ + uint64_t data[4]; +}; + +// provide some helper functions for working with 32 byte hash type +bool operator==(const B2H &lhs, const B2H &rhs); +bool operator!=(const B2H &lhs, const B2H &rhs); +void operator^=(B2H &lhs, const B2H &rhs); +std::ostream &operator<<(std::ostream &output, const B2H &h); +std::stringstream &operator<<(std::stringstream &output, const B2H &h); + +B2H hash(const void *buf1, size_t buf1len, const void *buf2, size_t buf2len); + +} // namespace hasher + +#endif \ No newline at end of file diff --git a/src/statefs/hashmap_builder.cpp b/src/statefs/hashmap_builder.cpp new file mode 100644 index 00000000..f2f0b344 --- /dev/null +++ b/src/statefs/hashmap_builder.cpp @@ -0,0 +1,334 @@ +#include "../pchheader.hpp" +#include "../hplog.hpp" +#include "state_common.hpp" +#include "hashmap_builder.hpp" +#include "hasher.hpp" + +namespace statefs +{ + +hashmap_builder::hashmap_builder(const statedir_context &ctx) : ctx(ctx) +{ +} + +int hashmap_builder::generate_hashmap_forfile(hasher::B2H &parentdirhash, const std::string &filepath) +{ + // We attempt to avoid a full rebuild of the block hash map file when possible. + // For this optimisation, both the block hash map (.bhmap) file and the + // delta block index (.bindex) file must exist. + + // If the block index exists, we generate/update the hashmap file with the aid of that. + // Block index file contains the updated blockids. If not, we simply rehash all the blocks. + + std::string relpath = get_relpath(filepath, ctx.datadir); + + // Open the actual data file and calculate the block count. 
+ int orifd = open(filepath.data(), O_RDONLY); + if (orifd == -1) + { + LOG_ERR << errno << ": Open failed " << filepath; + return -1; + } + const off_t filelength = lseek(orifd, 0, SEEK_END); + uint32_t blockcount = ceil((double)filelength / (double)BLOCK_SIZE); + + // Attempt to read the existing block hash map file. + std::string bhmapfile; + std::vector bhmapdata; + if (read_blockhashmap(bhmapdata, bhmapfile, relpath) == -1) + return -1; + + hasher::B2H oldfilehash = {0, 0, 0, 0}; + if (!bhmapdata.empty()) + memcpy(&oldfilehash, bhmapdata.data(), hasher::HASH_SIZE); + + // Attempt to read the delta block index file. + std::map bindex; + uint32_t original_blockcount; + if (get_blockindex(bindex, original_blockcount, relpath) == -1) + return -1; + + // Array to contain the updated block hashes. Slot 0 is for the root hash. + // Allocating hash array on the heap to avoid filling limited stack space. + std::unique_ptr hashes = std::make_unique(1 + blockcount); + const size_t hashes_size = (1 + blockcount) * hasher::HASH_SIZE; + + if (update_hashes(hashes.get(), hashes_size, relpath, orifd, blockcount, original_blockcount, bindex, bhmapdata) == -1) + return -1; + + if (write_blockhashmap(bhmapfile, hashes.get(), hashes_size) == -1) + return -1; + + if (update_hashtree_entry(parentdirhash, !bhmapdata.empty(), oldfilehash, hashes[0], bhmapfile, relpath) == -1) + return -1; + + return 0; +} + +int hashmap_builder::read_blockhashmap(std::vector &bhmapdata, std::string &bhmapfile, const std::string &relpath) +{ + bhmapfile.reserve(ctx.blockhashmapdir.length() + relpath.length() + HASHMAP_EXT_LEN); + bhmapfile.append(ctx.blockhashmapdir).append(relpath).append(HASHMAP_EXT); + + if (boost::filesystem::exists(bhmapfile)) + { + int hmapfd = open(bhmapfile.c_str(), O_RDONLY); + if (hmapfd == -1) + { + LOG_ERR << errno << ": Open failed " << bhmapfile; + return -1; + } + + off_t size = lseek(hmapfd, 0, SEEK_END); + bhmapdata.resize(size); + + if (pread(hmapfd, 
bhmapdata.data(), size, 0) == -1) + { + LOG_ERR << errno << ": Read failed " << bhmapfile; + return -1; + } + } + else + { + // Create directory tree if not exist so we are able to create the hashmap files. + boost::filesystem::path hmapsubdir = boost::filesystem::path(bhmapfile).parent_path(); + if (created_bhmapsubdirs.count(hmapsubdir.string()) == 0) + { + boost::filesystem::create_directories(hmapsubdir); + created_bhmapsubdirs.emplace(hmapsubdir.string()); + } + } + + return 0; +} + +int hashmap_builder::get_blockindex(std::map &idxmap, uint32_t &totalblockcount, const std::string &filerelpath) +{ + std::string bindexfile; + bindexfile.reserve(ctx.deltadir.length() + filerelpath.length() + BLOCKINDEX_EXT_LEN); + bindexfile.append(ctx.deltadir).append(filerelpath).append(BLOCKINDEX_EXT); + + if (boost::filesystem::exists(bindexfile)) + { + std::ifstream infile(bindexfile, std::ios::binary | std::ios::ate); + std::streamsize idxsize = infile.tellg(); + infile.seekg(0, std::ios::beg); + + // Read the block index file into a vector. + std::vector bindex(idxsize); + if (infile.read(bindex.data(), idxsize)) + { + // First 8 bytes contain the original file length. + off_t orifilelen; + memcpy(&orifilelen, bindex.data(), 8); + totalblockcount = ceil((double)orifilelen / (double)BLOCK_SIZE); + + // Skip the first 8 bytes and loop through index entries. + for (uint32_t idxoffset = 8; idxoffset < bindex.size();) + { + // Read the block no. (4 bytes) of where this block is from in the original file. + uint32_t blockno = 0; + memcpy(&blockno, bindex.data() + idxoffset, 4); + idxoffset += 12; // Skip the cached block offset (8 bytes) + + // Read the block hash (32 bytes). 
+ hasher::B2H hash; + memcpy(&hash, bindex.data() + idxoffset, 32); + idxoffset += 32; + + idxmap.try_emplace(blockno, hash); + } + } + else + { + LOG_ERR << errno << ": Read failed " << bindexfile; + return -1; + } + + infile.close(); + } + + return 0; +} + +int hashmap_builder::update_hashes( + hasher::B2H *hashes, const off_t hashes_size, const std::string &relpath, const int orifd, + const uint32_t blockcount, const uint32_t original_blockcount, const std::map &bindex, const std::vector &bhmapdata) +{ + uint32_t nohint_blockstart = 0; + + // If both existing delta block index and block hash map is available, we can just overlay the + // changed block hashes (mentioned in the delta block index) on top of the old block hashes. + // This would prevent unncessarily hashing lot of blocks. + if (!bhmapdata.empty() && !bindex.empty()) + { + // Load old hashes. + memcpy(hashes, bhmapdata.data(), hashes_size < bhmapdata.size() ? hashes_size : bhmapdata.size()); + + // Refer to the block index and rehash the changed blocks. + for (const auto [blockid, oldhash] : bindex) + { + // If the blockid from the block index is no longer there, that means the current file is + // shorter than the previous version. So we can stop hashing at this point. + if (blockid >= blockcount) + break; + + if (compute_blockhash(hashes[blockid + 1], blockid, orifd, relpath) == -1) + return -1; + } + + // If the current file has more blocks than the previous version, we need to hash those + // additional blocks as well. + if (blockcount > original_blockcount) + nohint_blockstart = original_blockcount; + else + nohint_blockstart = blockcount; // No additional blocks remaining. + } + + //Hash any additional blocks that has to be hashed without the guidance of block index. 
+ for (uint32_t blockid = nohint_blockstart; blockid < blockcount; blockid++) + { + if (compute_blockhash(hashes[blockid + 1], blockid, orifd, relpath) == -1) + return -1; + } + + // Calculate the new file hash: filehash = HASH(filename + XOR(block hashes)) + hasher::B2H filehash{0, 0, 0, 0}; + for (int i = 1; i <= blockcount; i++) + filehash ^= hashes[i]; + + // Rehash the file hash with filename included. + const std::string filename = boost::filesystem::path(relpath.data()).filename().string(); + filehash = hasher::hash(filename.c_str(), filename.length(), &filehash, hasher::HASH_SIZE); + + hashes[0] = filehash; + return 0; +} + +int hashmap_builder::compute_blockhash(hasher::B2H &hash, uint32_t blockid, int filefd, const std::string &relpath) +{ + // Allocating block buffer on the heap to avoid filling limited stack space. + std::unique_ptr blockbuf = std::make_unique(BLOCK_SIZE); + const off_t blockoffset = BLOCK_SIZE * blockid; + size_t bytesread = pread(filefd, blockbuf.get(), BLOCK_SIZE, blockoffset); + if (bytesread == -1) + { + LOG_ERR << errno << ": Read failed " << relpath; + return -1; + } + + hash = hasher::hash(&blockoffset, 8, blockbuf.get(), bytesread); + return 0; +} + +int hashmap_builder::write_blockhashmap(const std::string &bhmapfile, const hasher::B2H *hashes, const off_t hashes_size) +{ + int hmapfd = open(bhmapfile.c_str(), O_RDWR | O_TRUNC | O_CREAT, FILE_PERMS); + if (hmapfd == -1) + { + LOG_ERR << errno << ": Open failed " << bhmapfile; + return -1; + } + + // Write the updated hash list into the block hash map file. 
+ if (pwrite(hmapfd, hashes, hashes_size, 0) == -1) + { + LOG_ERR << errno << ": Write failed " << bhmapfile; + return -1; + } + + if (ftruncate(hmapfd, hashes_size) == -1) + { + LOG_ERR << errno << ": Truncate failed " << bhmapfile; + return -1; + } +} + +int hashmap_builder::update_hashtree_entry(hasher::B2H &parentdirhash, const bool oldbhmap_exists, const hasher::B2H oldfilehash, const hasher::B2H newfilehash, const std::string &bhmapfile, const std::string &relpath) +{ + std::string hardlinkdir(ctx.hashtreedir); + const std::string relpathdir = boost::filesystem::path(relpath).parent_path().string(); + + hardlinkdir.append(relpathdir); + if (relpathdir != "/") + hardlinkdir.append("/"); + + std::stringstream newhlpath; + newhlpath << hardlinkdir << newfilehash << ".rh"; + + if (oldbhmap_exists) + { + // Rename the existing hard link if old block hash map existed. + // We thereby assume the old hard link also existed. + std::stringstream oldhlpath; + oldhlpath << hardlinkdir << oldfilehash << ".rh"; + if (rename(oldhlpath.str().c_str(), newhlpath.str().c_str()) == -1) + return -1; + + // Subtract the old root hash and add the new root hash from the parent dir hash. + parentdirhash ^= oldfilehash; + parentdirhash ^= newfilehash; + } + else + { + // Create a new hard link with new root hash as the name. + if (link(bhmapfile.c_str(), newhlpath.str().c_str()) == -1) + return -1; + + // Add the new root hash to parent hash. + parentdirhash ^= newfilehash; + } + + return 0; +} + +int hashmap_builder::remove_hashmapfile(hasher::B2H &parentdirhash, const std::string &bhmapfile) +{ + if (boost::filesystem::exists(bhmapfile)) + { + int hmapfd = open(bhmapfile.data(), O_RDONLY); + if (hmapfd == -1) + { + LOG_ERR << errno << ": Open failed " << bhmapfile; + return -1; + } + + hasher::B2H filehash; + if (read(hmapfd, &filehash, hasher::HASH_SIZE) == -1) + { + LOG_ERR << errno << ": Read failed " << bhmapfile; + return -1; + } + + // Delete the .bhmap file. 
+ if (remove(bhmapfile.c_str()) == -1) + { + LOG_ERR << errno << ": Delete failed " << bhmapfile; + return -1; + } + + // Delete the hardlink of the .bhmap file. + std::string hardlinkdir(ctx.hashtreedir); + const std::string relpath = get_relpath(bhmapfile, ctx.blockhashmapdir); + const std::string relpathdir = boost::filesystem::path(relpath).parent_path().string(); + + hardlinkdir.append(relpathdir); + if (relpathdir != "/") + hardlinkdir.append("/"); + + std::stringstream hlpath; + hlpath << hardlinkdir << filehash << ".rh"; + if (remove(hlpath.str().c_str()) == -1) + { + LOG_ERR << errno << ": Delete failed for halrd link " << filehash << " of " << bhmapfile; + return -1; + } + + // XOR parent dir hash with file hash so the file hash gets removed from parent dir hash. + parentdirhash ^= filehash; + } + + return 0; +} + +} // namespace statefs \ No newline at end of file diff --git a/src/statefs/hashmap_builder.hpp b/src/statefs/hashmap_builder.hpp new file mode 100644 index 00000000..2aafc441 --- /dev/null +++ b/src/statefs/hashmap_builder.hpp @@ -0,0 +1,35 @@ +#ifndef _STATEFS_HASHMAP_BUILDER_ +#define _STATEFS_HASHMAP_BUILDER_ + +#include "../pchheader.hpp" +#include "hasher.hpp" +#include "state_common.hpp" + +namespace statefs +{ + +class hashmap_builder +{ +private: + const statedir_context ctx; + // List of new block hash map sub directories created during the session. 
+ std::unordered_set created_bhmapsubdirs; + + int read_blockhashmap(std::vector &bhmapdata, std::string &hmapfile, const std::string &relpath); + int get_blockindex(std::map &idxmap, uint32_t &totalblockcount, const std::string &filerelpath); + int update_hashes( + hasher::B2H *hashes, const off_t hashes_size, const std::string &relpath, const int orifd, + const uint32_t blockcount, const uint32_t original_blockcount, const std::map &bindex, const std::vector &bhmapdata); + int compute_blockhash(hasher::B2H &hash, uint32_t blockid, int filefd, const std::string &relpath); + int write_blockhashmap(const std::string &bhmapfile, const hasher::B2H *hashes, const off_t hashes_size); + int update_hashtree_entry(hasher::B2H &parentdirhash, const bool oldbhmap_exists, const hasher::B2H oldfilehash, const hasher::B2H newfilehash, const std::string &bhmapfile, const std::string &relpath); + +public: + hashmap_builder(const statedir_context &ctx); + int generate_hashmap_forfile(hasher::B2H &parentdirhash, const std::string &filepath); + int remove_hashmapfile(hasher::B2H &parentdirhash, const std::string &filepath); +}; + +} // namespace statefs + +#endif diff --git a/src/statefs/hashtree_builder.cpp b/src/statefs/hashtree_builder.cpp new file mode 100644 index 00000000..fd3d1fcd --- /dev/null +++ b/src/statefs/hashtree_builder.cpp @@ -0,0 +1,246 @@ +#include "../pchheader.hpp" +#include "hashtree_builder.hpp" +#include "state_restore.hpp" +#include "state_common.hpp" + +namespace statefs +{ + +hashtree_builder::hashtree_builder(const statedir_context &ctx) : ctx(ctx), hmapbuilder(ctx) +{ +} + +int hashtree_builder::generate(hasher::B2H &roothash) +{ + // Load modified file path hints if available. 
+ populate_hintpaths(IDX_TOUCHEDFILES); + populate_hintpaths(IDX_NEWFILES); + hintmode = !hintpaths.empty(); + + traversel_rootdir = ctx.datadir; + removal_mode = false; + if (update_hashtree(roothash) != 0) + return -1; + + // If there are any remaining hint files directly under this directory, that means + // those files are no longer there. So we need to delete the corresponding .bhmap and rh files + // and adjust the directory hash accordingly. + if (hintmode && !hintpaths.empty()) + { + traversel_rootdir = ctx.blockhashmapdir; + removal_mode = true; + if (update_hashtree(roothash) != 0) + return -1; + } + + return 0; +} + +int hashtree_builder::update_hashtree(hasher::B2H &roothash) +{ + hintpath_map::iterator hintdir_itr = hintpaths.end(); + if (!should_process_dir(hintdir_itr, traversel_rootdir)) + return 0; + + if (update_hashtree_fordir(roothash, traversel_rootdir, hintdir_itr, true) != 0) + return -1; + + return 0; +} + +int hashtree_builder::update_hashtree_fordir(hasher::B2H &parentdirhash, const std::string &dirpath, const hintpath_map::iterator hintdir_itr, const bool isrootlevel) +{ + const std::string htreedirpath = switch_basepath(dirpath, traversel_rootdir, ctx.hashtreedir); + + // Load current dir hash if exist. + const std::string dirhashfile = htreedirpath + "/" + DIRHASH_FNAME; + hasher::B2H dirhash = get_existingdirhash(dirhashfile); + + // Remember the dir hash before we mutate it. + hasher::B2H original_dirhash = dirhash; + + // Iterate files/subdirs inside this dir. 
+ const boost::filesystem::directory_iterator itrend; + for (boost::filesystem::directory_iterator itr(dirpath); itr != itrend; itr++) + { + const bool isdir = boost::filesystem::is_directory(itr->path()); + const std::string pathstr = itr->path().string(); + + if (isdir) + { + hintpath_map::iterator hintsubdir_itr = hintpaths.end(); + if (!should_process_dir(hintsubdir_itr, pathstr)) + continue; + + if (update_hashtree_fordir(dirhash, pathstr, hintsubdir_itr, false) != 0) + return -1; + } + else + { + if (!should_process_file(hintdir_itr, pathstr)) + continue; + + if (process_file(dirhash, pathstr, htreedirpath) != 0) + return -1; + } + } + + // If there are no more files in the hint dir, delete the hint dir entry as well. + if (hintdir_itr != hintpaths.end() && hintdir_itr->second.empty()) + hintpaths.erase(hintdir_itr); + + // In removalmode, we check whether the dir is empty. If so we remove the dir as well. + if (removal_mode && boost::filesystem::is_empty(dirpath)) + { + // We remove the dirs if we are below root level only. + // Otherwise we only remove root dir.hash file. + if (!isrootlevel) + { + boost::filesystem::remove_all(dirpath); + boost::filesystem::remove_all(htreedirpath); + } + else + { + boost::filesystem::remove(dirhashfile); + } + + // Subtract the original dir hash from the parent dir hash. + parentdirhash ^= original_dirhash; + } + else if (dirhash != original_dirhash) + { + // If dir hash has changed, write it back to dir hash file. + if (save_dirhash(dirhashfile, dirhash) == -1) + return -1; + + // Also update the parent dir hash by subtracting the old hash and adding the new hash. + parentdirhash ^= original_dirhash; + parentdirhash ^= dirhash; + } + + return 0; +} + +hasher::B2H hashtree_builder::get_existingdirhash(const std::string &dirhashfile) +{ + // Load current dir hash if exist. 
+ hasher::B2H dirhash{0, 0, 0, 0}; + int dirhashfd = open(dirhashfile.c_str(), O_RDONLY); + if (dirhashfd > 0) + { + read(dirhashfd, &dirhash, hasher::HASH_SIZE); + close(dirhashfd); + } + return dirhash; +} + +int hashtree_builder::save_dirhash(const std::string &dirhashfile, hasher::B2H dirhash) +{ + int dirhashfd = open(dirhashfile.c_str(), O_RDWR | O_TRUNC | O_CREAT, FILE_PERMS); + if (dirhashfd == -1) + return -1; + + if (write(dirhashfd, &dirhash, hasher::HASH_SIZE) == -1) + { + close(dirhashfd); + return -1; + } + + close(dirhashfd); + return 0; +} + +inline bool hashtree_builder::should_process_dir(hintpath_map::iterator &dir_itr, const std::string &dirpath) +{ + return (hintmode ? get_hinteddir_match(dir_itr, dirpath) : true); +} + +bool hashtree_builder::should_process_file(const hintpath_map::iterator hintdir_itr, const std::string filepath) +{ + if (hintmode) + { + if (hintdir_itr == hintpaths.end()) + return false; + + std::string relpath = get_relpath(filepath, traversel_rootdir); + + // If in removal mode, we are traversing .bhmap files. Hence we should truncate .bhmap extension + // before we search for the path in file hints. + if (removal_mode) + relpath = relpath.substr(0, relpath.length() - HASHMAP_EXT_LEN); + + std::unordered_set &hintfiles = hintdir_itr->second; + const auto hintfile_itr = hintfiles.find(relpath); + if (hintfile_itr == hintfiles.end()) + return false; + + // Erase the visiting filepath from hint files. + hintfiles.erase(hintfile_itr); + } + return true; +} + +int hashtree_builder::process_file(hasher::B2H &parentdirhash, const std::string &filepath, const std::string &htreedirpath) +{ + if (!removal_mode) + { + // Create directory tree if not exist so we are able to create the file root hash files (hard links). 
+ if (created_htreesubdirs.count(htreedirpath) == 0) + { + boost::filesystem::create_directories(htreedirpath); + created_htreesubdirs.emplace(htreedirpath); + } + + if (hmapbuilder.generate_hashmap_forfile(parentdirhash, filepath) == -1) + return -1; + } + else + { + if (hmapbuilder.remove_hashmapfile(parentdirhash, filepath) == -1) + return -1; + } + + return 0; +} + +void hashtree_builder::populate_hintpaths(const char *const idxfile) +{ + std::ifstream infile(std::string(ctx.deltadir).append(idxfile)); + if (!infile.fail()) + { + for (std::string relpath; std::getline(infile, relpath);) + { + std::string parentdir = boost::filesystem::path(relpath).parent_path().string(); + hintpaths[parentdir].emplace(relpath); + } + infile.close(); + } +} + +bool hashtree_builder::get_hinteddir_match(hintpath_map::iterator &matchitr, const std::string &dirpath) +{ + // First check whether there's an exact match. If not check for a partial match. + // Exact match will return the iterator. Partial match or not found will return end() iterator. + const std::string relpath = get_relpath(dirpath, traversel_rootdir); + const auto exactmatchitr = hintpaths.find(relpath); + + if (exactmatchitr != hintpaths.end()) + { + matchitr = exactmatchitr; + return true; + } + + for (auto itr = hintpaths.begin(); itr != hintpaths.end(); itr++) + { + if (strncmp(relpath.c_str(), itr->first.c_str(), relpath.length()) == 0) + { + // Partial match found. + matchitr = hintpaths.end(); + return true; + } + } + + return false; // Not found at all. 
+} + +} // namespace statefs \ No newline at end of file diff --git a/src/statefs/hashtree_builder.hpp b/src/statefs/hashtree_builder.hpp new file mode 100644 index 00000000..f9819a68 --- /dev/null +++ b/src/statefs/hashtree_builder.hpp @@ -0,0 +1,48 @@ +#ifndef _STATEFS_HASHTREE_BUILDER_ +#define _STATEFS_HASHTREE_BUILDER_ + +#include "../pchheader.hpp" +#include "hasher.hpp" +#include "hashmap_builder.hpp" +#include "state_common.hpp" + +namespace statefs +{ + +typedef std::unordered_map> hintpath_map; + +class hashtree_builder +{ +private: + const statedir_context ctx; + hashmap_builder hmapbuilder; + + // Hint path map with parent dir as key and list of file paths under each parent dir. + hintpath_map hintpaths; + bool hintmode; + bool removal_mode; + std::string traversel_rootdir; + + // List of new root hash map sub directories created during the session. + std::unordered_set created_htreesubdirs; + + int update_hashtree(hasher::B2H &roothash); + int update_hashtree_fordir(hasher::B2H &parentdirhash, const std::string &relpath, const hintpath_map::iterator hintdir_itr, const bool isrootlevel); + + hasher::B2H get_existingdirhash(const std::string &dirhashfile); + int save_dirhash(const std::string &dirhashfile, hasher::B2H dirhash); + bool should_process_dir(hintpath_map::iterator &hintsubdir_itr, const std::string &dirpath); + bool should_process_file(const hintpath_map::iterator hintdir_itr, const std::string filepath); + int process_file(hasher::B2H &parentdirhash, const std::string &filepath, const std::string &htreedirpath); + int update_hashtree_entry(hasher::B2H &parentdirhash, const bool oldbhmap_exists, const hasher::B2H oldfilehash, const hasher::B2H newfilehash, const std::string &bhmapfile, const std::string &relpath); + void populate_hintpaths(const char *const idxfile); + bool get_hinteddir_match(hintpath_map::iterator &matchitr, const std::string &dirpath); + +public: + hashtree_builder(const statedir_context &ctx); + int generate(hasher::B2H 
&roothash); +}; + +} // namespace statefs + +#endif diff --git a/src/statefs/state_common.cpp b/src/statefs/state_common.cpp new file mode 100644 index 00000000..a55bbc0a --- /dev/null +++ b/src/statefs/state_common.cpp @@ -0,0 +1,60 @@ +#include +#include +#include "state_common.hpp" + +namespace statefs +{ + +std::string statehistdir; + +void init(const std::string &statehist_dir_root) +{ + statehistdir = realpath(statehist_dir_root.c_str(), NULL); + + // Initialize 0 state (current state) directory. + get_statedir_context(0, true); +} + +std::string get_statedir_root(const int16_t checkpointid) +{ + return statehistdir + "/" + std::to_string(checkpointid); +} + +statedir_context get_statedir_context(const int16_t checkpointid, const bool createdirs) +{ + statedir_context ctx; + ctx.rootdir = get_statedir_root(checkpointid); + ctx.datadir = ctx.rootdir + DATA_DIR; + ctx.blockhashmapdir = ctx.rootdir + BHMAP_DIR; + ctx.hashtreedir = ctx.rootdir + HTREE_DIR; + ctx.deltadir = ctx.rootdir + DELTA_DIR; + + if (createdirs) + { + if (!boost::filesystem::exists(ctx.datadir)) + boost::filesystem::create_directories(ctx.datadir); + if (!boost::filesystem::exists(ctx.blockhashmapdir)) + boost::filesystem::create_directories(ctx.blockhashmapdir); + if (!boost::filesystem::exists(ctx.hashtreedir)) + boost::filesystem::create_directories(ctx.hashtreedir); + if (!boost::filesystem::exists(ctx.deltadir)) + boost::filesystem::create_directories(ctx.deltadir); + } + + return ctx; +} + +std::string get_relpath(const std::string &fullpath, const std::string &base_path) +{ + std::string relpath = fullpath.substr(base_path.length(), fullpath.length() - base_path.length()); + if (relpath.empty()) + relpath = "/"; + return relpath; +} + +std::string switch_basepath(const std::string &fullpath, const std::string &from_base_path, const std::string &to_base_path) +{ + return to_base_path + get_relpath(fullpath, from_base_path); +} + +} // namespace statefs \ No newline at end of file diff 
--git a/src/statefs/state_common.hpp b/src/statefs/state_common.hpp new file mode 100644 index 00000000..64dc00ea --- /dev/null +++ b/src/statefs/state_common.hpp @@ -0,0 +1,60 @@ +#ifndef _STATEFS_STATE_COMMON_ +#define _STATEFS_STATE_COMMON_ + +#include +#include +#include "hasher.hpp" + +namespace statefs +{ + +constexpr int16_t MAX_CHECKPOINTS = 5; + +// Cache block size. +constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB + +// Cache block index entry bytes length. +constexpr size_t BLOCKINDEX_ENTRY_SIZE = 44; +constexpr size_t MAX_HASHES = BLOCK_SIZE / hasher::HASH_SIZE; + +// Permissions used when creating block cache and index files. +constexpr int FILE_PERMS = 0644; + +const char *const HASHMAP_EXT = ".bhmap"; +constexpr size_t HASHMAP_EXT_LEN = 6; + +const char *const BLOCKINDEX_EXT = ".bindex"; +constexpr size_t BLOCKINDEX_EXT_LEN = 7; + +const char *const BLOCKCACHE_EXT = ".bcache"; +constexpr size_t BLOCKCACHE_EXT_LEN = 7; + +const char *const IDX_NEWFILES = "/idxnew.idx"; +const char *const IDX_TOUCHEDFILES = "/idxtouched.idx"; +const char *const DIRHASH_FNAME = "dir.hash"; + +const char *const DATA_DIR = "/data"; +const char *const BHMAP_DIR = "/bhmap"; +const char *const HTREE_DIR = "/htree"; +const char *const DELTA_DIR = "/delta"; + +extern std::string statehistdir; + +struct statedir_context +{ + std::string rootdir; + std::string datadir; + std::string blockhashmapdir; + std::string hashtreedir; + std::string deltadir; +}; + +void init(const std::string &statehist_dir_root); +std::string get_statedir_root(const int16_t checkpointid); +statedir_context get_statedir_context(int16_t checkpointid = 0, bool createdirs = false); +std::string get_relpath(const std::string &fullpath, const std::string &base_path); +std::string switch_basepath(const std::string &fullpath, const std::string &from_base_path, const std::string &to_base_path); + +} // namespace statefs + +#endif \ No newline at end of file diff --git a/src/statefs/state_monitor/fusefs.cpp 
b/src/statefs/state_monitor/fusefs.cpp new file mode 100644 index 00000000..e9e768cf --- /dev/null +++ b/src/statefs/state_monitor/fusefs.cpp @@ -0,0 +1,1342 @@ +/* + * Code copied and adopted from https://github.com/libfuse/libfuse/blob/master/example/passthrough_hp.cc + */ + +#define FUSE_USE_VERSION 35 + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +// C includes +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// C++ includes +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "state_monitor.hpp" +#include "../state_common.hpp" + +using namespace std; + +// Uniquely identifies a file in the source directory tree. This could +// be simplified to just ino_t since we require the source directory +// not to contain any mountpoints. This hasn't been done yet in case +// we need to reconsider this constraint (but relaxing this would have +// the drawback that we can no longer re-use inode numbers, and thus +// readdir() would need to do a full lookup() in order to report the +// right inode number). +typedef std::pair SrcId; + +// Define a hash function for SrcId +namespace std +{ +template <> +struct hash +{ + size_t operator()(const SrcId &id) const + { + return hash{}(id.first) ^ hash{}(id.second); + } +}; +} // namespace std + +namespace helpers +{ + +int getfilepath(std::string &filepath, int parentfd, const char *filename) +{ + // Get parent directory path using the parentfd. + char proclnk[32]; + char parentpath[PATH_MAX]; + sprintf(proclnk, "/proc/self/fd/%d", parentfd); + ssize_t parentlen = readlink(proclnk, parentpath, PATH_MAX); + if (parentlen > 0) + { + // Concat parent dir path and filename to get the full path. 
+ filepath.reserve(parentlen + strlen(filename) + 1); + filepath.append(parentpath, parentlen).append("/").append(filename); + return 0; + } + return -1; +} + +} // namespace helpers + +namespace fusefs +{ + +/* We are re-using pointers to our `struct sfs_inode` and `struct + sfs_dirp` elements as inodes and file handles. This means that we + must be able to store pointer a pointer in both a fuse_ino_t + variable and a uint64_t variable (used for file handles). */ +static_assert(sizeof(fuse_ino_t) >= sizeof(void *), + "void* must fit into fuse_ino_t"); +static_assert(sizeof(fuse_ino_t) >= sizeof(uint64_t), + "fuse_ino_t must be at least 64 bits"); + +/* Forward declarations */ +struct Inode; +static Inode &get_inode(fuse_ino_t ino); +static void forget_one(fuse_ino_t ino, uint64_t n); + +// Maps files in the source directory tree to inodes +typedef std::unordered_map InodeMap; + +struct Inode +{ + int fd{-1}; + bool is_symlink{false}; + dev_t src_dev{0}; + ino_t src_ino{0}; + uint64_t nlookup{0}; + std::mutex m; + + // Delete copy constructor and assignments. We could implement + // move if we need it. + Inode() = default; + Inode(const Inode &) = delete; + Inode(Inode &&inode) = delete; + Inode &operator=(Inode &&inode) = delete; + Inode &operator=(const Inode &) = delete; + + ~Inode() + { + if (fd > 0) + close(fd); + } +}; + +struct Fs +{ + // Must be acquired *after* any Inode.m locks. + std::mutex mutex; + InodeMap inodes; // protected by mutex + Inode root; + double timeout; + bool debug; + std::string source; + size_t blocksize; + dev_t src_dev; + bool nosplice; + bool nocache; +}; +static Fs fs{}; +static statefs::state_monitor statemonitor; + +#define FUSE_BUF_COPY_FLAGS \ + (fs.nosplice ? 
FUSE_BUF_NO_SPLICE : static_cast(0)) + +static Inode &get_inode(fuse_ino_t ino) +{ + if (ino == FUSE_ROOT_ID) + return fs.root; + + Inode *inode = reinterpret_cast(ino); + if (inode->fd == -1) + { + cerr << "INTERNAL ERROR: Unknown inode " << ino << endl; + abort(); + } + return *inode; +} + +static int get_fs_fd(fuse_ino_t ino) +{ + int fd = get_inode(ino).fd; + return fd; +} + +static void sfs_init(void *userdata, fuse_conn_info *conn) +{ + (void)userdata; + if (conn->capable & FUSE_CAP_EXPORT_SUPPORT) + conn->want |= FUSE_CAP_EXPORT_SUPPORT; + + if (fs.timeout && conn->capable & FUSE_CAP_WRITEBACK_CACHE) + conn->want |= FUSE_CAP_WRITEBACK_CACHE; + + if (conn->capable & FUSE_CAP_FLOCK_LOCKS) + conn->want |= FUSE_CAP_FLOCK_LOCKS; + + // Use splicing if supported. Since we are using writeback caching + // and readahead, individual requests should have a decent size so + // that splicing between fd's is well worth it. + if (conn->capable & FUSE_CAP_SPLICE_WRITE && !fs.nosplice) + conn->want |= FUSE_CAP_SPLICE_WRITE; + if (conn->capable & FUSE_CAP_SPLICE_READ && !fs.nosplice) + conn->want |= FUSE_CAP_SPLICE_READ; +} + +static void sfs_getattr(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) +{ + (void)fi; + Inode &inode = get_inode(ino); + struct stat attr; + auto res = fstatat(inode.fd, "", &attr, + AT_EMPTY_PATH | AT_SYMLINK_NOFOLLOW); + if (res == -1) + { + fuse_reply_err(req, errno); + return; + } + fuse_reply_attr(req, &attr, fs.timeout); +} + +#ifdef HAVE_UTIMENSAT +static int utimensat_empty_nofollow(Inode &inode, + const struct timespec *tv) +{ + if (inode.is_symlink) + { + /* Does not work on current kernels, but may in the future: + https://marc.info/?l=linux-kernel&m=154158217810354&w=2 */ + auto res = utimensat(inode.fd, "", tv, AT_EMPTY_PATH | AT_SYMLINK_NOFOLLOW); + if (res == -1 && errno == EINVAL) + { + /* Sorry, no race free way to set times on symlink. 
*/ + errno = EPERM; + } + return res; + } + + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", inode.fd); + + return utimensat(AT_FDCWD, procname, tv, 0); +} +#endif + +static void do_setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr, + int valid, struct fuse_file_info *fi) +{ + Inode &inode = get_inode(ino); + int ifd = inode.fd; + int res; + + if (valid & FUSE_SET_ATTR_MODE) + { + if (fi) + { + res = fchmod(fi->fh, attr->st_mode); + } + else + { + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", ifd); + res = chmod(procname, attr->st_mode); + } + if (res == -1) + goto out_err; + } + if (valid & (FUSE_SET_ATTR_UID | FUSE_SET_ATTR_GID)) + { + uid_t uid = (valid & FUSE_SET_ATTR_UID) ? attr->st_uid : static_cast(-1); + gid_t gid = (valid & FUSE_SET_ATTR_GID) ? attr->st_gid : static_cast(-1); + + res = fchownat(ifd, "", uid, gid, AT_EMPTY_PATH | AT_SYMLINK_NOFOLLOW); + if (res == -1) + goto out_err; + } + if (valid & FUSE_SET_ATTR_SIZE) + { + if (fi) + { + res = ftruncate(fi->fh, attr->st_size); + } + else + { + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", ifd); + res = truncate(procname, attr->st_size); + } + if (res == -1) + goto out_err; + } + if (valid & (FUSE_SET_ATTR_ATIME | FUSE_SET_ATTR_MTIME)) + { + struct timespec tv[2]; + + tv[0].tv_sec = 0; + tv[1].tv_sec = 0; + tv[0].tv_nsec = UTIME_OMIT; + tv[1].tv_nsec = UTIME_OMIT; + + if (valid & FUSE_SET_ATTR_ATIME_NOW) + tv[0].tv_nsec = UTIME_NOW; + else if (valid & FUSE_SET_ATTR_ATIME) + tv[0] = attr->st_atim; + + if (valid & FUSE_SET_ATTR_MTIME_NOW) + tv[1].tv_nsec = UTIME_NOW; + else if (valid & FUSE_SET_ATTR_MTIME) + tv[1] = attr->st_mtim; + + if (fi) + res = futimens(fi->fh, tv); + else + { +#ifdef HAVE_UTIMENSAT + res = utimensat_empty_nofollow(inode, tv); +#else + res = -1; + errno = EOPNOTSUPP; +#endif + } + if (res == -1) + goto out_err; + } + return sfs_getattr(req, ino, fi); + +out_err: + fuse_reply_err(req, errno); +} + +static void sfs_setattr(fuse_req_t req, 
fuse_ino_t ino, struct stat *attr, + int valid, fuse_file_info *fi) +{ + // We use some conditions to detect truncate call. + if (fi != NULL && fi->fh > 0 && attr->st_size > 0) + statemonitor.ontruncate(fi->fh, attr->st_size); + + (void)ino; + do_setattr(req, ino, attr, valid, fi); +} + +static int do_lookup(fuse_ino_t parent, const char *name, + fuse_entry_param *e) +{ + if (fs.debug) + cerr << "DEBUG: lookup(): name=" << name + << ", parent=" << parent << endl; + memset(e, 0, sizeof(*e)); + e->attr_timeout = fs.timeout; + e->entry_timeout = fs.timeout; + + auto newfd = openat(get_fs_fd(parent), name, O_PATH | O_NOFOLLOW); + if (newfd == -1) + return errno; + + auto res = fstatat(newfd, "", &e->attr, AT_EMPTY_PATH | AT_SYMLINK_NOFOLLOW); + if (res == -1) + { + auto saveerr = errno; + close(newfd); + if (fs.debug) + cerr << "DEBUG: lookup(): fstatat failed" << endl; + return saveerr; + } + + if (e->attr.st_dev != fs.src_dev) + { + cerr << "WARNING: Mountpoints in the source directory tree will be hidden." << endl; + return ENOTSUP; + } + else if (e->attr.st_ino == FUSE_ROOT_ID) + { + cerr << "ERROR: Source directory tree must not include inode " + << FUSE_ROOT_ID << endl; + return EIO; + } + + SrcId id{e->attr.st_ino, e->attr.st_dev}; + unique_lock fs_lock{fs.mutex}; + Inode *inode_p; + try + { + inode_p = &fs.inodes[id]; + } + catch (std::bad_alloc &) + { + return ENOMEM; + } + e->ino = reinterpret_cast(inode_p); + Inode &inode{*inode_p}; + + if (inode.fd != -1) + { // found existing inode + fs_lock.unlock(); + if (fs.debug) + cerr << "DEBUG: lookup(): inode " << e->attr.st_ino + << " (userspace) already known." << endl; + lock_guard g{inode.m}; + inode.nlookup++; + close(newfd); + } + else + { // no existing inode + /* This is just here to make Helgrind happy. 
It violates the + lock ordering requirement (inode.m must be acquired before + fs.mutex), but this is of no consequence because at this + point no other thread has access to the inode mutex */ + lock_guard g{inode.m}; + inode.src_ino = e->attr.st_ino; + inode.src_dev = e->attr.st_dev; + inode.is_symlink = S_ISLNK(e->attr.st_mode); + inode.nlookup = 1; + inode.fd = newfd; + fs_lock.unlock(); + + if (fs.debug) + cerr << "DEBUG: lookup(): created userspace inode " << e->attr.st_ino + << endl; + } + + return 0; +} + +static void sfs_lookup(fuse_req_t req, fuse_ino_t parent, const char *name) +{ + fuse_entry_param e{}; + auto err = do_lookup(parent, name, &e); + if (err == ENOENT) + { + e.attr_timeout = fs.timeout; + e.entry_timeout = fs.timeout; + e.ino = e.attr.st_ino = 0; + fuse_reply_entry(req, &e); + } + else if (err) + { + if (err == ENFILE || err == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." << endl; + fuse_reply_err(req, err); + } + else + { + fuse_reply_entry(req, &e); + } +} + +static void mknod_symlink(fuse_req_t req, fuse_ino_t parent, + const char *name, mode_t mode, dev_t rdev, + const char *link) +{ + int res; + Inode &inode_p = get_inode(parent); + auto saverr = ENOMEM; + + if (S_ISDIR(mode)) + res = mkdirat(inode_p.fd, name, mode); + else if (S_ISLNK(mode)) + res = symlinkat(link, inode_p.fd, name); + else + res = mknodat(inode_p.fd, name, mode, rdev); + saverr = errno; + if (res == -1) + goto out; + + fuse_entry_param e; + saverr = do_lookup(parent, name, &e); + if (saverr) + goto out; + + fuse_reply_entry(req, &e); + return; + +out: + if (saverr == ENFILE || saverr == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." 
<< endl; + fuse_reply_err(req, saverr); +} + +static void sfs_mknod(fuse_req_t req, fuse_ino_t parent, const char *name, + mode_t mode, dev_t rdev) +{ + mknod_symlink(req, parent, name, mode, rdev, nullptr); +} + +static void sfs_mkdir(fuse_req_t req, fuse_ino_t parent, const char *name, + mode_t mode) +{ + mknod_symlink(req, parent, name, S_IFDIR | mode, 0, nullptr); +} + +static void sfs_symlink(fuse_req_t req, const char *link, fuse_ino_t parent, + const char *name) +{ + mknod_symlink(req, parent, name, S_IFLNK, 0, link); +} + +static int linkat_empty_nofollow(Inode &inode, int dfd, const char *name) +{ + if (inode.is_symlink) + { + auto res = linkat(inode.fd, "", dfd, name, AT_EMPTY_PATH); + if (res == -1 && (errno == ENOENT || errno == EINVAL)) + { + /* Sorry, no race free way to hard-link a symlink. */ + errno = EOPNOTSUPP; + } + return res; + } + + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", inode.fd); + return linkat(AT_FDCWD, procname, dfd, name, AT_SYMLINK_FOLLOW); +} + +static void sfs_link(fuse_req_t req, fuse_ino_t ino, fuse_ino_t parent, + const char *name) +{ + Inode &inode = get_inode(ino); + Inode &inode_p = get_inode(parent); + fuse_entry_param e{}; + + e.attr_timeout = fs.timeout; + e.entry_timeout = fs.timeout; + + auto res = linkat_empty_nofollow(inode, inode_p.fd, name); + if (res == -1) + { + fuse_reply_err(req, errno); + return; + } + + res = fstatat(inode.fd, "", &e.attr, AT_EMPTY_PATH | AT_SYMLINK_NOFOLLOW); + if (res == -1) + { + fuse_reply_err(req, errno); + return; + } + e.ino = reinterpret_cast(&inode); + { + lock_guard g{inode.m}; + inode.nlookup++; + } + + fuse_reply_entry(req, &e); + return; +} + +static void sfs_rmdir(fuse_req_t req, fuse_ino_t parent, const char *name) +{ + Inode &inode_p = get_inode(parent); + lock_guard g{inode_p.m}; + auto res = unlinkat(inode_p.fd, name, AT_REMOVEDIR); + fuse_reply_err(req, res == -1 ? 
errno : 0); +} + +static void sfs_rename(fuse_req_t req, fuse_ino_t parent, const char *name, + fuse_ino_t newparent, const char *newname, + unsigned int flags) +{ + Inode &inode_p = get_inode(parent); + Inode &inode_np = get_inode(newparent); + if (flags) + { + fuse_reply_err(req, EINVAL); + return; + } + + std::string oldfilepath, newfilepath; + if (helpers::getfilepath(oldfilepath, inode_p.fd, name) == 0 && + helpers::getfilepath(newfilepath, inode_np.fd, newname) == 0) + { + statemonitor.onrename(oldfilepath, newfilepath); + } + + auto res = renameat(inode_p.fd, name, inode_np.fd, newname); + fuse_reply_err(req, res == -1 ? errno : 0); +} + +static void sfs_unlink(fuse_req_t req, fuse_ino_t parent, const char *name) +{ + Inode &inode_p = get_inode(parent); + + std::string filepath; + if (helpers::getfilepath(filepath, inode_p.fd, name) == 0) + statemonitor.ondelete(filepath); + + auto res = unlinkat(inode_p.fd, name, 0); + fuse_reply_err(req, res == -1 ? errno : 0); +} + +static void forget_one(fuse_ino_t ino, uint64_t n) +{ + Inode &inode = get_inode(ino); + unique_lock l{inode.m}; + + if (n > inode.nlookup) + { + cerr << "INTERNAL ERROR: Negative lookup count for inode " + << inode.src_ino << endl; + abort(); + } + inode.nlookup -= n; + if (!inode.nlookup) + { + if (fs.debug) + cerr << "DEBUG: forget: cleaning up inode " << inode.src_ino << endl; + { + lock_guard g_fs{fs.mutex}; + l.unlock(); + fs.inodes.erase({inode.src_ino, inode.src_dev}); + } + } + else if (fs.debug) + cerr << "DEBUG: forget: inode " << inode.src_ino + << " lookup count now " << inode.nlookup << endl; +} + +static void sfs_forget(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) +{ + forget_one(ino, nlookup); + fuse_reply_none(req); +} + +static void sfs_forget_multi(fuse_req_t req, size_t count, + fuse_forget_data *forgets) +{ + for (int i = 0; i < count; i++) + forget_one(forgets[i].ino, forgets[i].nlookup); + fuse_reply_none(req); +} + +static void sfs_readlink(fuse_req_t req, 
fuse_ino_t ino) +{ + Inode &inode = get_inode(ino); + char buf[PATH_MAX + 1]; + auto res = readlinkat(inode.fd, "", buf, sizeof(buf)); + if (res == -1) + fuse_reply_err(req, errno); + else if (res == sizeof(buf)) + fuse_reply_err(req, ENAMETOOLONG); + else + { + buf[res] = '\0'; + fuse_reply_readlink(req, buf); + } +} + +struct DirHandle +{ + DIR *dp{nullptr}; + off_t offset; + + DirHandle() = default; + DirHandle(const DirHandle &) = delete; + DirHandle &operator=(const DirHandle &) = delete; + + ~DirHandle() + { + if (dp) + closedir(dp); + } +}; + +static DirHandle *get_dir_handle(fuse_file_info *fi) +{ + return reinterpret_cast(fi->fh); +} + +static void sfs_opendir(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) +{ + Inode &inode = get_inode(ino); + auto d = new (nothrow) DirHandle; + if (d == nullptr) + { + fuse_reply_err(req, ENOMEM); + return; + } + + // Make Helgrind happy - it can't know that there's an implicit + // synchronization due to the fact that other threads cannot + // access d until we've called fuse_reply_*. + lock_guard g{inode.m}; + + auto fd = openat(inode.fd, ".", O_RDONLY); + if (fd == -1) + goto out_errno; + + // On success, dir stream takes ownership of fd, so we + // do not have to close it. + d->dp = fdopendir(fd); + if (d->dp == nullptr) + goto out_errno; + + d->offset = 0; + + fi->fh = reinterpret_cast(d); + if (fs.timeout) + { + fi->keep_cache = 1; + fi->cache_readdir = 1; + } + fuse_reply_open(req, fi); + return; + +out_errno: + auto error = errno; + delete d; + if (error == ENFILE || error == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." << endl; + fuse_reply_err(req, error); +} + +static bool is_dot_or_dotdot(const char *name) +{ + return name[0] == '.' && + (name[1] == '\0' || (name[1] == '.' 
&& name[2] == '\0')); +} + +static void do_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, + off_t offset, fuse_file_info *fi, int plus) +{ + auto d = get_dir_handle(fi); + Inode &inode = get_inode(ino); + lock_guard g{inode.m}; + char *p; + auto rem = size; + int err = 0, count = 0; + + if (fs.debug) + cerr << "DEBUG: readdir(): started with offset " + << offset << endl; + + auto buf = new (nothrow) char[size]; + if (!buf) + { + fuse_reply_err(req, ENOMEM); + return; + } + p = buf; + + if (offset != d->offset) + { + if (fs.debug) + cerr << "DEBUG: readdir(): seeking to " << offset << endl; + seekdir(d->dp, offset); + d->offset = offset; + } + + while (1) + { + struct dirent *entry; + errno = 0; + entry = readdir(d->dp); + if (!entry) + { + if (errno) + { + err = errno; + if (fs.debug) + warn("DEBUG: readdir(): readdir failed with"); + goto error; + } + break; // End of stream + } + d->offset = entry->d_off; + if (is_dot_or_dotdot(entry->d_name)) + continue; + + fuse_entry_param e{}; + size_t entsize; + if (plus) + { + err = do_lookup(ino, entry->d_name, &e); + if (err) + goto error; + entsize = fuse_add_direntry_plus(req, p, rem, entry->d_name, &e, entry->d_off); + + if (entsize > rem) + { + if (fs.debug) + cerr << "DEBUG: readdir(): buffer full, returning data. " << endl; + forget_one(e.ino, 1); + break; + } + } + else + { + e.attr.st_ino = entry->d_ino; + e.attr.st_mode = entry->d_type << 12; + entsize = fuse_add_direntry(req, p, rem, entry->d_name, &e.attr, entry->d_off); + + if (entsize > rem) + { + if (fs.debug) + cerr << "DEBUG: readdir(): buffer full, returning data. 
" << endl; + break; + } + } + + p += entsize; + rem -= entsize; + count++; + if (fs.debug) + { + cerr << "DEBUG: readdir(): added to buffer: " << entry->d_name + << ", ino " << e.attr.st_ino << ", offset " << entry->d_off << endl; + } + } + err = 0; +error: + + // If there's an error, we can only signal it if we haven't stored + // any entries yet - otherwise we'd end up with wrong lookup + // counts for the entries that are already in the buffer. So we + // return what we've collected until that point. + if (err && rem == size) + { + if (err == ENFILE || err == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." << endl; + fuse_reply_err(req, err); + } + else + { + if (fs.debug) + cerr << "DEBUG: readdir(): returning " << count + << " entries, curr offset " << d->offset << endl; + fuse_reply_buf(req, buf, size - rem); + } + delete[] buf; + return; +} + +static void sfs_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, + off_t offset, fuse_file_info *fi) +{ + // operation logging is done in readdir to reduce code duplication + do_readdir(req, ino, size, offset, fi, 0); +} + +static void sfs_readdirplus(fuse_req_t req, fuse_ino_t ino, size_t size, + off_t offset, fuse_file_info *fi) +{ + // operation logging is done in readdir to reduce code duplication + do_readdir(req, ino, size, offset, fi, 1); +} + +static void sfs_releasedir(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) +{ + (void)ino; + auto d = get_dir_handle(fi); + delete d; + fuse_reply_err(req, 0); +} + +static void sfs_create(fuse_req_t req, fuse_ino_t parent, const char *name, + mode_t mode, fuse_file_info *fi) +{ + Inode &inode_p = get_inode(parent); + + auto fd = openat(inode_p.fd, name, + (fi->flags | O_CREAT) & ~O_NOFOLLOW, mode); + if (fd == -1) + { + auto err = errno; + if (err == ENFILE || err == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." 
<< endl; + fuse_reply_err(req, err); + return; + } + + fi->fh = fd; + fuse_entry_param e; + auto err = do_lookup(parent, name, &e); + if (err) + { + if (err == ENFILE || err == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." << endl; + fuse_reply_err(req, err); + } + else + { + statemonitor.oncreate(fd); + fuse_reply_create(req, &e, fi); + } +} + +static void sfs_fsyncdir(fuse_req_t req, fuse_ino_t ino, int datasync, + fuse_file_info *fi) +{ + (void)ino; + int res; + int fd = dirfd(get_dir_handle(fi)->dp); + if (datasync) + res = fdatasync(fd); + else + res = fsync(fd); + fuse_reply_err(req, res == -1 ? errno : 0); +} + +static void sfs_open(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) +{ + Inode &inode = get_inode(ino); + + /* With writeback cache, kernel may send read requests even + when userspace opened write-only */ + if (fs.timeout && (fi->flags & O_ACCMODE) == O_WRONLY) + { + fi->flags &= ~O_ACCMODE; + fi->flags |= O_RDWR; + } + + /* With writeback cache, O_APPEND is handled by the kernel. This + breaks atomicity (since the file may change in the underlying + filesystem, so that the kernel's idea of the end of the file + isn't accurate anymore). However, no process should modify the + file in the underlying filesystem once it has been read, so + this is not a problem. */ + if (fs.timeout && fi->flags & O_APPEND) + fi->flags &= ~O_APPEND; + + /* Unfortunately we cannot use inode.fd, because this was opened + with O_PATH (so it doesn't allow read/write access). */ + char buf[64]; + sprintf(buf, "/proc/self/fd/%i", inode.fd); + + statemonitor.onopen(inode.fd, fi->flags); + + auto fd = open(buf, fi->flags & ~O_NOFOLLOW); + if (fd == -1) + { + auto err = errno; + if (err == ENFILE || err == EMFILE) + cerr << "ERROR: Reached maximum number of file descriptors." 
<< endl; + fuse_reply_err(req, err); + return; + } + + fi->keep_cache = (fs.timeout != 0); + fi->fh = fd; + + fuse_reply_open(req, fi); +} + +static void sfs_release(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) +{ + (void)ino; + close(fi->fh); + statemonitor.onclose(fi->fh); + fuse_reply_err(req, 0); +} + +static void sfs_flush(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) +{ + (void)ino; + auto res = close(dup(fi->fh)); + fuse_reply_err(req, res == -1 ? errno : 0); +} + +static void sfs_fsync(fuse_req_t req, fuse_ino_t ino, int datasync, + fuse_file_info *fi) +{ + (void)ino; + int res; + if (datasync) + res = fdatasync(fi->fh); + else + res = fsync(fi->fh); + fuse_reply_err(req, res == -1 ? errno : 0); +} + +static void do_read(fuse_req_t req, size_t size, off_t off, fuse_file_info *fi) +{ + + fuse_bufvec buf = FUSE_BUFVEC_INIT(size); + buf.buf[0].flags = static_cast( + FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK); + buf.buf[0].fd = fi->fh; + buf.buf[0].pos = off; + + fuse_reply_data(req, &buf, FUSE_BUF_COPY_FLAGS); +} + +static void sfs_read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, + fuse_file_info *fi) +{ + (void)ino; + do_read(req, size, off, fi); +} + +static void do_write_buf(fuse_req_t req, size_t size, off_t off, + fuse_bufvec *in_buf, fuse_file_info *fi) +{ + fuse_bufvec out_buf = FUSE_BUFVEC_INIT(size); + out_buf.buf[0].flags = static_cast( + FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK); + out_buf.buf[0].fd = fi->fh; + out_buf.buf[0].pos = off; + + auto res = fuse_buf_copy(&out_buf, in_buf, FUSE_BUF_COPY_FLAGS); + if (res < 0) + fuse_reply_err(req, -res); + else + fuse_reply_write(req, (size_t)res); +} + +static void sfs_write_buf(fuse_req_t req, fuse_ino_t ino, fuse_bufvec *in_buf, + off_t off, fuse_file_info *fi) +{ + (void)ino; + auto size{fuse_buf_size(in_buf)}; + + statemonitor.onwrite(fi->fh, off, size); + + do_write_buf(req, size, off, in_buf, fi); +} + +static void sfs_statfs(fuse_req_t req, fuse_ino_t ino) +{ + struct statvfs stbuf; + + 
auto res = fstatvfs(get_fs_fd(ino), &stbuf); + if (res == -1) + fuse_reply_err(req, errno); + else + fuse_reply_statfs(req, &stbuf); +} + +#ifdef HAVE_POSIX_FALLOCATE +static void sfs_fallocate(fuse_req_t req, fuse_ino_t ino, int mode, + off_t offset, off_t length, fuse_file_info *fi) +{ + (void)ino; + if (mode) + { + fuse_reply_err(req, EOPNOTSUPP); + return; + } + + auto err = posix_fallocate(fi->fh, offset, length); + fuse_reply_err(req, err); +} +#endif + +static void sfs_flock(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi, + int op) +{ + (void)ino; + auto res = flock(fi->fh, op); + fuse_reply_err(req, res == -1 ? errno : 0); +} + +#ifdef HAVE_SETXATTR +static void sfs_getxattr(fuse_req_t req, fuse_ino_t ino, const char *name, + size_t size) +{ + char *value = nullptr; + Inode &inode = get_inode(ino); + ssize_t ret; + int saverr; + + if (inode.is_symlink) + { + /* Sorry, no race free way to getxattr on symlink. */ + saverr = ENOTSUP; + goto out; + } + + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", inode.fd); + + if (size) + { + value = new (nothrow) char[size]; + if (value == nullptr) + { + saverr = ENOMEM; + goto out; + } + + ret = getxattr(procname, name, value, size); + if (ret == -1) + goto out_err; + saverr = 0; + if (ret == 0) + goto out; + + fuse_reply_buf(req, value, ret); + } + else + { + ret = getxattr(procname, name, nullptr, 0); + if (ret == -1) + goto out_err; + + fuse_reply_xattr(req, ret); + } +out_free: + delete[] value; + return; + +out_err: + saverr = errno; +out: + fuse_reply_err(req, saverr); + goto out_free; +} + +static void sfs_listxattr(fuse_req_t req, fuse_ino_t ino, size_t size) +{ + char *value = nullptr; + Inode &inode = get_inode(ino); + ssize_t ret; + int saverr; + + if (inode.is_symlink) + { + /* Sorry, no race free way to listxattr on symlink. 
*/ + saverr = ENOTSUP; + goto out; + } + + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", inode.fd); + + if (size) + { + value = new (nothrow) char[size]; + if (value == nullptr) + { + saverr = ENOMEM; + goto out; + } + + ret = listxattr(procname, value, size); + if (ret == -1) + goto out_err; + saverr = 0; + if (ret == 0) + goto out; + + fuse_reply_buf(req, value, ret); + } + else + { + ret = listxattr(procname, nullptr, 0); + if (ret == -1) + goto out_err; + + fuse_reply_xattr(req, ret); + } +out_free: + delete[] value; + return; +out_err: + saverr = errno; +out: + fuse_reply_err(req, saverr); + goto out_free; +} + +static void sfs_setxattr(fuse_req_t req, fuse_ino_t ino, const char *name, + const char *value, size_t size, int flags) +{ + Inode &inode = get_inode(ino); + ssize_t ret; + int saverr; + + if (inode.is_symlink) + { + /* Sorry, no race free way to setxattr on symlink. */ + saverr = ENOTSUP; + goto out; + } + + char procname[64]; + sprintf(procname, "/proc/self/fd/%i", inode.fd); + + ret = setxattr(procname, name, value, size, flags); + saverr = ret == -1 ? errno : 0; + +out: + fuse_reply_err(req, saverr); +} + +static void sfs_removexattr(fuse_req_t req, fuse_ino_t ino, const char *name) +{ + char procname[64]; + Inode &inode = get_inode(ino); + ssize_t ret; + int saverr; + + if (inode.is_symlink) + { + /* Sorry, no race free way to setxattr on symlink. */ + saverr = ENOTSUP; + goto out; + } + + sprintf(procname, "/proc/self/fd/%i", inode.fd); + ret = removexattr(procname, name); + saverr = ret == -1 ? 
errno : 0; + +out: + fuse_reply_err(req, saverr); +} +#endif + +static void assign_operations(fuse_lowlevel_ops &sfs_oper) +{ + sfs_oper.init = sfs_init; + sfs_oper.lookup = sfs_lookup; + sfs_oper.mkdir = sfs_mkdir; + sfs_oper.mknod = sfs_mknod; + sfs_oper.symlink = sfs_symlink; + sfs_oper.link = sfs_link; + sfs_oper.unlink = sfs_unlink; + sfs_oper.rmdir = sfs_rmdir; + sfs_oper.rename = sfs_rename; + sfs_oper.forget = sfs_forget; + sfs_oper.forget_multi = sfs_forget_multi; + sfs_oper.getattr = sfs_getattr; + sfs_oper.setattr = sfs_setattr; + sfs_oper.readlink = sfs_readlink; + sfs_oper.opendir = sfs_opendir; + sfs_oper.readdir = sfs_readdir; + sfs_oper.readdirplus = sfs_readdirplus; + sfs_oper.releasedir = sfs_releasedir; + sfs_oper.fsyncdir = sfs_fsyncdir; + sfs_oper.create = sfs_create; + sfs_oper.open = sfs_open; + sfs_oper.release = sfs_release; + sfs_oper.flush = sfs_flush; + sfs_oper.fsync = sfs_fsync; + sfs_oper.read = sfs_read; + sfs_oper.write_buf = sfs_write_buf; + sfs_oper.statfs = sfs_statfs; +#ifdef HAVE_POSIX_FALLOCATE + sfs_oper.fallocate = sfs_fallocate; +#endif + sfs_oper.flock = sfs_flock; +#ifdef HAVE_SETXATTR + sfs_oper.setxattr = sfs_setxattr; + sfs_oper.getxattr = sfs_getxattr; + sfs_oper.listxattr = sfs_listxattr; + sfs_oper.removexattr = sfs_removexattr; +#endif +} + +void maximize_fd_limit() +{ + struct rlimit lim + { + }; + auto res = getrlimit(RLIMIT_NOFILE, &lim); + if (res != 0) + { + warn("WARNING: getrlimit() failed with"); + return; + } + lim.rlim_cur = lim.rlim_max; + res = setrlimit(RLIMIT_NOFILE, &lim); + if (res != 0) + warn("WARNING: setrlimit() failed with"); +} + +int start(const char *arg0, const char *statehistdir, const char *fusemntdir) +{ + // We need an fd for every entry in our the filesystem that the + // kernel knows about. This is way more than most processes need, + // so try to get rid of any resource softlimit. + maximize_fd_limit(); + + // We consider this as the first run of the history dir is empty. 
+ const bool firstrun = boost::filesystem::is_empty(statehistdir); + + statefs::init(statehistdir); + statemonitor.ctx = statefs::get_statedir_context(); + fs.source = statemonitor.ctx.datadir; + + // Create a checkpoint from the second run onwards. + if (!firstrun) + statemonitor.create_checkpoint(); + + // Initialize filesystem root + fs.root.fd = -1; + fs.root.nlookup = 9999; + fs.root.is_symlink = false; + fs.timeout = 86400.0; + + struct stat stat; + auto ret = lstat(fs.source.c_str(), &stat); + if (ret == -1) + err(1, "ERROR: failed to stat source (\"%s\")", fs.source.c_str()); + if (!S_ISDIR(stat.st_mode)) + errx(1, "ERROR: source is not a directory"); + fs.src_dev = stat.st_dev; + + fs.root.fd = open(fs.source.c_str(), O_PATH); + if (fs.root.fd == -1) + err(1, "ERROR: open(\"%s\", O_PATH)", fs.source.c_str()); + + // Initialize fuse + fuse_args args = FUSE_ARGS_INIT(0, nullptr); + if (fuse_opt_add_arg(&args, arg0) || + fuse_opt_add_arg(&args, "-o") || + fuse_opt_add_arg(&args, "default_permissions,fsname=hpstatefs") + /*|| fuse_opt_add_arg(&args, "-odebug")*/) + errx(3, "ERROR: Out of memory"); + + fuse_lowlevel_ops sfs_oper{}; + assign_operations(sfs_oper); + auto se = fuse_session_new(&args, &sfs_oper, sizeof(sfs_oper), &fs); + if (se == nullptr) + goto err_out1; + + if (fuse_set_signal_handlers(se) != 0) + goto err_out2; + + // Don't apply umask, use modes exactly as specified + umask(0); + + // Mount and run main loop + struct fuse_loop_config loop_config; + loop_config.clone_fd = 0; + loop_config.max_idle_threads = 10; + if (fuse_session_mount(se, fusemntdir) != 0) + goto err_out3; + + ret = fuse_session_loop_mt(se, &loop_config); + + fuse_session_unmount(se); + +err_out3: + fuse_remove_signal_handlers(se); +err_out2: + fuse_session_destroy(se); +err_out1: + fuse_opt_free_args(&args); + + return ret ? 
1 : 0; +} + +} // namespace fusefs + +int main(int argc, char *argv[]) +{ + if (argc != 3) + { + std::cerr << "Incorrect arguments.\n"; + exit(1); + } + + fusefs::start(argv[0], argv[1], argv[2]); +} \ No newline at end of file diff --git a/src/statefs/state_monitor/fusefs.hpp b/src/statefs/state_monitor/fusefs.hpp new file mode 100644 index 00000000..762ca856 --- /dev/null +++ b/src/statefs/state_monitor/fusefs.hpp @@ -0,0 +1,9 @@ +#ifndef _FUSE_FS_ +#define _FUSE_FS_ + +namespace fusefs +{ +int start(const char *arg0, const char *source, const char *mountpoint, const char *deltadir); +} + +#endif \ No newline at end of file diff --git a/src/statefs/state_monitor/state_monitor.cpp b/src/statefs/state_monitor/state_monitor.cpp new file mode 100644 index 00000000..46d57736 --- /dev/null +++ b/src/statefs/state_monitor/state_monitor.cpp @@ -0,0 +1,519 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../hasher.hpp" +#include "../state_common.hpp" +#include "state_monitor.hpp" + +namespace statefs +{ + +void state_monitor::create_checkpoint() +{ + // Shift -1 and below checkpoints by 1 more. + // If MAX oldest checkpoint is there, remove it and work our way upwards. + int16_t oldest_chkpnt = (MAX_CHECKPOINTS + 1) * -1; // +1 because we maintain one extra checkpoint in case of rollbacks. + for (int16_t chkpnt = oldest_chkpnt; chkpnt <= -1; chkpnt++) + { + std::string dir = get_statedir_root(chkpnt); + + if (boost::filesystem::exists(dir)) + { + if (chkpnt == oldest_chkpnt) + { + boost::filesystem::remove_all(dir); + } + else + { + std::string dirshift = get_statedir_root(chkpnt - 1); + boost::filesystem::rename(dir, dirshift); + } + } + + if (chkpnt == -1) + { + statedir_context ctx = get_statedir_context(0, true); + + // Shift 0-state delta dir to -1. 
+ std::string delta_1 = dir + DELTA_DIR; + boost::filesystem::create_directories(delta_1); + + boost::filesystem::rename(ctx.deltadir, delta_1); + boost::filesystem::create_directories(ctx.deltadir); + } + } + + return; +} + +void state_monitor::oncreate(const int fd) +{ + std::lock_guard lock(monitor_mutex); + + std::string filepath; + if (extract_filepath(filepath, fd) == 0) + oncreate_filepath(filepath); +} + +void state_monitor::onopen(const int inodefd, const int flags) +{ + std::lock_guard lock(monitor_mutex); + + std::string filepath; + if (extract_filepath(filepath, inodefd) == 0) + { + state_file_info *fi; + if (get_tracked_fileinfo(&fi, filepath) == 0) + { + // Check whether fd is open in truncate mode. If so cache the entire file immediately. + if (flags & O_TRUNC) + cache_blocks(*fi, 0, fi->original_length); + } + } +} + +void state_monitor::onwrite(const int fd, const off_t offset, const size_t length) +{ + std::lock_guard lock(monitor_mutex); + + std::string filepath; + if (get_fd_filepath(filepath, fd) == 0) + { + state_file_info *fi; + if (get_tracked_fileinfo(&fi, filepath) == 0) + cache_blocks(*fi, offset, length); + } +} + +void state_monitor::onrename(const std::string &oldfilepath, const std::string &newfilepath) +{ + std::lock_guard lock(monitor_mutex); + + ondelete_filepath(oldfilepath); + oncreate_filepath(newfilepath); +} + +void state_monitor::ondelete(const std::string &filepath) +{ + std::lock_guard lock(monitor_mutex); + ondelete_filepath(filepath); +} + +void state_monitor::ontruncate(const int fd, const off_t newsize) +{ + std::lock_guard lock(monitor_mutex); + + std::string filepath; + if (get_fd_filepath(filepath, fd) == 0) + { + // If truncated size is less than the original, cache the entire file. 
+ state_file_info *fi; + if (get_tracked_fileinfo(&fi, filepath) == 0 && newsize < fi->original_length) + cache_blocks(*fi, 0, fi->original_length); + } +} + +void state_monitor::onclose(const int fd) +{ + std::lock_guard lock(monitor_mutex); + + auto pitr = fdpathmap.find(fd); + if (pitr != fdpathmap.end()) + { + // Close any block cache/index fds we have opened for this file. + auto fitr = fileinfomap.find(pitr->second); // pitr->second is the filepath string. + if (fitr != fileinfomap.end()) + close_cachingfds(fitr->second); // fitr->second is the fileinfo struct. + + fdpathmap.erase(pitr); + } +} + +/** + * Extracts the full physical file path for a given fd. + * @param filepath String to assign the extracted file path. + * @param fd The file descriptor to find the filepath. + * @return 0 on successful file path extraction. -1 on failure. + */ +int state_monitor::extract_filepath(std::string &filepath, const int fd) +{ + char proclnk[32]; + sprintf(proclnk, "/proc/self/fd/%d", fd); + + filepath.resize(PATH_MAX); + ssize_t len = readlink(proclnk, filepath.data(), PATH_MAX); + if (len > 0) + { + filepath.resize(len); + return 0; + } + return -1; +} + +/** + * Find the full physical file path for a given fd using the fd map. + * @param filepath String to assign the extracted file path. + * @param fd The file descriptor to find the filepath. + * @return 0 on successful file path extraction. -1 on failure. + */ +int state_monitor::get_fd_filepath(std::string &filepath, const int fd) +{ + // Return path from the map if found. + const auto itr = fdpathmap.find(fd); + if (itr != fdpathmap.end()) + { + filepath = itr->second; + return 0; + } + + // Extract the file path and populate the fd-->filepath map. + if (extract_filepath(filepath, fd) == 0) + { + fdpathmap[fd] = filepath; + return 0; + } + + return -1; +} + +void state_monitor::oncreate_filepath(const std::string &filepath) +{ + // Check whether we are already tracking this file path. 
+ // Only way this could happen is deleting an existing file and creating a new file with same name. + if (fileinfomap.count(filepath) == 0) + { + // Add an entry for the new file in the file info map. This information will be used to ignore + // future operations (eg. write/delete) done to this file. + state_file_info fi; + fi.isnew = true; + fi.filepath = filepath; + fileinfomap[filepath] = std::move(fi); + + // Add to the list of new files added during this session. + write_newfileentry(filepath); + } +} + +void state_monitor::ondelete_filepath(const std::string &filepath) +{ + state_file_info *fi; + if (get_tracked_fileinfo(&fi, filepath) == 0) + { + if (fi->isnew) + { + // If this is a new file, just remove from existing index entries. + // No need to cache the file blocks. + remove_newfileentry(fi->filepath); + fileinfomap.erase(filepath); + } + else + { + // If not a new file, cache the entire file. + cache_blocks(*fi, 0, fi->original_length); + } + } +} + +/** + * Finds the tracked state file information for the given filepath. + * @param fi Reference pointer to assign the state file info struct. + * @param filepath Full physical path of the file. + * @return 0 on successful find. -1 on failure. + */ +int state_monitor::get_tracked_fileinfo(state_file_info **fi, const std::string &filepath) +{ + // Return from filepath-->fileinfo map if found. + const auto itr = fileinfomap.find(filepath); + if (itr != fileinfomap.end()) + { + *fi = &itr->second; + return 0; + } + + // Initialize a new state file info struct for the given filepath. + state_file_info &fileinfo = fileinfomap[filepath]; + + // We use stat() to find out the length of the file. 
+ struct stat stat_buf; + if (stat(filepath.c_str(), &stat_buf) != 0) + { + std::cerr << errno << ": Error occured in stat() of " << filepath << "\n"; + return -1; + } + + fileinfo.original_length = stat_buf.st_size; + fileinfo.filepath = filepath; + *fi = &fileinfo; + return 0; +} + +/** + * Caches the specified bytes range of the given file. + * @param fi The file info struct pointing to the file to be cached. + * @param offset The start byte position for caching. + * @param length How many bytes to cache. + * @return 0 on successful execution. -1 on failure. + */ +int state_monitor::cache_blocks(state_file_info &fi, const off_t offset, const size_t length) +{ + // No caching required if this is a new file created during this session. + if (fi.isnew) + return 0; + + uint32_t original_blockcount = ceil((double)fi.original_length / (double)BLOCK_SIZE); + + // Check whether we have already cached the entire file. + if (original_blockcount == fi.cached_blockids.size()) + return 0; + + // Initialize fds and indexes required for caching. + if (prepare_caching(fi) != 0) + return -1; + + // Return if incoming write is outside any of the original blocks. + if (offset > original_blockcount * BLOCK_SIZE) + return 0; + + uint32_t startblock = offset / BLOCK_SIZE; + uint32_t endblock = (offset + length) / BLOCK_SIZE; + + // std::cout << "Cache blocks: '" << fi.filepath << "' [" << offset << "," << length << "] " << startblock << "," << endblock << "\n"; + + // If this is the first time we are caching this file, write an entry to the touched file index. + if (fi.cached_blockids.empty() && write_touchedfileentry(fi.filepath) != 0) + return -1; + + for (uint32_t i = startblock; i <= endblock; i++) + { + // Skip if we have already cached this block. + if (fi.cached_blockids.count(i) > 0) + continue; + + // Read the block being replaced and send to cache file. + // Allocating block buffer on the heap to avoid filling limited stack space. 
+ std::unique_ptr blockbuf = std::make_unique(BLOCK_SIZE); + off_t blockoffset = BLOCK_SIZE * i; + size_t bytesread = pread(fi.readfd, blockbuf.get(), BLOCK_SIZE, BLOCK_SIZE * i); + if (bytesread < 0) + { + std::cerr << errno << ": Read failed " << fi.filepath << "\n"; + return -1; + } + + // No more bytes to read in this file. + if (bytesread == 0) + return 0; + + if (write(fi.cachefd, blockbuf.get(), bytesread) < 0) + { + std::cerr << errno << ": Write to block cache failed. " << fi.filepath << "\n"; + return -1; + } + + // Append an entry (44 bytes) into the block cache index. We maintain this index to + // help random block access for external use cases. We currently do not sort this index here. + // Whoever is using the index must sort it if required. + // Entry format: [blocknum(4 bytes) | cacheoffset(8 bytes) | blockhash(32 bytes)] + + char entrybuf[BLOCKINDEX_ENTRY_SIZE]; + hasher::B2H hash = hasher::hash(&blockoffset, 8, blockbuf.get(), bytesread); + + // Original file block id. + memcpy(entrybuf, &i, 4); + // Position of the block within the cache file. + off_t cacheoffset = fi.cached_blockids.size() * BLOCK_SIZE; + memcpy(entrybuf + 4, &cacheoffset, 8); + // The block hash. + memcpy(entrybuf + 12, hash.data, 32); + if (write(fi.indexfd, entrybuf, BLOCKINDEX_ENTRY_SIZE) < 0) + { + std::cerr << errno << ": Write to block index failed. " << fi.filepath << "\n"; + return -1; + } + + // Mark the block as cached. + fi.cached_blockids.emplace(i); + } + + return 0; +} + +/** + * Initializes fds and indexes required for caching. + * @param fi The state file info struct pointing to the file being cached. + * @return 0 on succesful initialization. -1 on failure. + */ +int state_monitor::prepare_caching(state_file_info &fi) +{ + // If readfd is greater than 0 then we take it as caching being already initialized. + if (fi.readfd > 0) + return 0; + + // Open up the file using a read-only fd. This fd will be used to fetch blocks to be cached. 
+ fi.readfd = open(fi.filepath.c_str(), O_RDONLY); + if (fi.readfd < 0) + { + std::cerr << errno << ": Open failed " << fi.filepath << "\n"; + return -1; + } + + // Get the path of the file relative to the state dir. We maintain this same reative path for the + // corresponding cache and index files in the cache dir. + std::string relpath = get_relpath(fi.filepath, ctx.datadir); + + std::string tmppath; + tmppath.reserve(ctx.deltadir.length() + relpath.length() + BLOCKCACHE_EXT_LEN); + tmppath.append(ctx.deltadir).append(relpath).append(BLOCKCACHE_EXT); + + // Create directory tree if not exist so we are able to create the cache and index files. + boost::filesystem::path cachesubdir = boost::filesystem::path(tmppath).parent_path(); + if (created_cachesubdirs.count(cachesubdir.string()) == 0) + { + boost::filesystem::create_directories(cachesubdir); + created_cachesubdirs.emplace(cachesubdir.string()); + } + + // Create and open the block cache file. + fi.cachefd = open(tmppath.c_str(), O_WRONLY | O_APPEND | O_CREAT, FILE_PERMS); + if (fi.cachefd <= 0) + { + std::cerr << errno << ": Open failed " << tmppath << "\n"; + return -1; + } + + // Create and open the block index file. + tmppath.replace(tmppath.length() - BLOCKCACHE_EXT_LEN, BLOCKINDEX_EXT_LEN, BLOCKINDEX_EXT); + fi.indexfd = open(tmppath.c_str(), O_WRONLY | O_APPEND | O_CREAT, FILE_PERMS); + if (fi.indexfd <= 0) + { + std::cerr << errno << ": Open failed " << tmppath << "\n"; + return -1; + } + + // Write first entry (8 bytes) to the index file. First entry is the length of the original file. + // This will be helpful when restoring/rolling back a file. + if (write(fi.indexfd, &fi.original_length, 8) == -1) + { + std::cerr << errno << ": Error writing to index file " << tmppath << "\n"; + return -1; + } + + return 0; +} + +/** + * Closes any open caching fds for a given file. 
+ */ +void state_monitor::close_cachingfds(state_file_info &fi) +{ + if (fi.readfd > 0) + close(fi.readfd); + + if (fi.cachefd > 0) + close(fi.cachefd); + + if (fi.indexfd > 0) + close(fi.indexfd); + + fi.readfd = 0; + fi.cachefd = 0; + fi.indexfd = 0; +} + +/** + * Inserts a file into the modified files list of this session. + * This index is used to restore modified files during restore. + */ +int state_monitor::write_touchedfileentry(std::string_view filepath) +{ + if (touchedfileindexfd <= 0) + { + std::string indexfile = ctx.deltadir + "/idxtouched.idx"; + touchedfileindexfd = open(indexfile.c_str(), O_WRONLY | O_APPEND | O_CREAT, FILE_PERMS); + if (touchedfileindexfd <= 0) + { + std::cerr << errno << ": Open failed " << indexfile << "\n"; + return -1; + } + } + + // Write the relative file path line to the index. + filepath = filepath.substr(ctx.datadir.length(), filepath.length() - ctx.datadir.length()); + write(touchedfileindexfd, filepath.data(), filepath.length()); + write(touchedfileindexfd, "\n", 1); + return 0; +} + +/** + * Inserts a file into the list of new files created during this session. + * This index is used in deleting new files during restore. + */ +int state_monitor::write_newfileentry(std::string_view filepath) +{ + std::string indexfile = ctx.deltadir + "/idxnew.idx"; + int fd = open(indexfile.c_str(), O_WRONLY | O_APPEND | O_CREAT, FILE_PERMS); + if (fd <= 0) + { + std::cerr << errno << ": Open failed " << indexfile << "\n"; + return -1; + } + + // Write the relative file path line to the index. + filepath = filepath.substr(ctx.datadir.length(), filepath.length() - ctx.datadir.length()); + write(fd, filepath.data(), filepath.length()); + write(fd, "\n", 1); + close(fd); + return 0; +} + +/** + * Scans and removes the given filepath from the new files index. 
+ */ +void state_monitor::remove_newfileentry(std::string_view filepath) +{ + filepath = filepath.substr(ctx.datadir.length(), filepath.length() - ctx.datadir.length()); + + // We create a copy of the new file index and transfer lines from first file + // to the second file except the line matching the given filepath. + + std::string indexfile = ctx.deltadir + "/idxnew.idx"; + std::string indexfile_tmp = ctx.deltadir + "/idxnew.idx.tmp"; + + std::ifstream infile(indexfile); + std::ofstream outfile(indexfile_tmp); + + bool linestransferred = false; + for (std::string line; std::getline(infile, line);) + { + if (line != filepath) // Skip the file being removed. + { + outfile << line << "\n"; + linestransferred = true; + } + } + + infile.close(); + outfile.close(); + + // Remove the old index. + std::remove(indexfile.c_str()); + + // If no lines transferred, delete the temp file as well. + if (linestransferred) + std::rename(indexfile_tmp.c_str(), indexfile.c_str()); + else + std::remove(indexfile_tmp.c_str()); +} + +} // namespace statefs \ No newline at end of file diff --git a/src/statefs/state_monitor/state_monitor.hpp b/src/statefs/state_monitor/state_monitor.hpp new file mode 100644 index 00000000..7958bdc8 --- /dev/null +++ b/src/statefs/state_monitor/state_monitor.hpp @@ -0,0 +1,77 @@ +#ifndef _STATEFS_STATE_MONITOR_ +#define _STATEFS_STATE_MONITOR_ + +#include +#include +#include +#include +#include +#include +#include "../state_common.hpp" + +namespace statefs +{ + +// Holds information about an original file in state that we are tracking. +struct state_file_info +{ + bool isnew; + off_t original_length; + std::unordered_set cached_blockids; + std::string filepath; + int readfd; + int cachefd; + int indexfd; +}; + +// Invoked by fuse file system for relevent file system calls. 
+class state_monitor
+{
+private:
+    // NOTE(review): the template arguments of the four containers below were missing in this copy
+    // of the patch. They are restored from the member comments and the method signatures of this
+    // class (fds are int, file paths are std::string) - confirm against the original commit.
+
+    // Map of fd-->filepath
+    std::unordered_map<int, std::string> fdpathmap;
+
+    // Map of filepath-->fileinfo
+    std::unordered_map<std::string, state_file_info> fileinfomap;
+
+    // Complete list of modified files during the session.
+    std::unordered_set<std::string> touchedfiles;
+
+    // List of new cache sub directories created during the session.
+    std::unordered_set<std::string> created_cachesubdirs;
+
+    // Mutex to synchronize parallel file system calls into our custom state tracking logic.
+    std::mutex monitor_mutex;
+
+    // Holds the fd used to write into modified files index. This will be kept open for the entire
+    // life of the state monitor. 0 means the index has not been opened yet.
+    int touchedfileindexfd = 0;
+
+    int extract_filepath(std::string &filepath, const int fd);
+    int get_fd_filepath(std::string &filepath, const int fd);
+    void oncreate_filepath(const std::string &filepath);
+    void ondelete_filepath(const std::string &filepath);
+    int get_tracked_fileinfo(state_file_info **fileinfo, const std::string &filepath);
+
+    int cache_blocks(state_file_info &fi, const off_t offset, const size_t length);
+    int prepare_caching(state_file_info &fi);
+    // Closes readfd/cachefd/indexfd of the given file info and resets them to 0.
+    void close_cachingfds(state_file_info &fi);
+    // Appends the data-dir-relative path to "<deltadir>/idxtouched.idx". Returns 0 or -1.
+    int write_touchedfileentry(std::string_view filepath);
+    // Appends the data-dir-relative path to "<deltadir>/idxnew.idx". Returns 0 or -1.
+    int write_newfileentry(std::string_view filepath);
+    // Rewrites "<deltadir>/idxnew.idx" without the line matching the data-dir-relative path.
+    void remove_newfileentry(std::string_view filepath);
+
+public:
+    statedir_context ctx;
+    void create_checkpoint();
+    void oncreate(const int fd);
+    void onopen(const int inodefd, const int flags);
+    void onwrite(const int fd, const off_t offset, const size_t length);
+    void onrename(const std::string &oldfilepath, const std::string &newfilepath);
+    void ondelete(const std::string &filepath);
+    void ontruncate(const int fd, const off_t newsize);
+    void onclose(const int fd);
+};
+
+} // namespace statefs
+
+#endif
\ No newline at end of file
diff --git a/src/statefs/state_restore.cpp b/src/statefs/state_restore.cpp
new file mode 100644
index 00000000..f856a085
--- /dev/null
+++ b/src/statefs/state_restore.cpp
@@ -0,0 +1,200 @@
+#include "../pchheader.hpp"
+#include "../hplog.hpp"
+#include "hasher.hpp"
+#include "state_restore.hpp"
+#include "hashtree_builder.hpp"
+#include "state_common.hpp"
+
+namespace statefs
+{
+
+// Look at new files added and delete them if still exist.
+// Reads the index at ctx.deltadir + IDX_NEWFILES line by line and removes ctx.datadir + <line>
+// for each entry. A missing index yields no iterations; std::remove failures are ignored.
+void state_restore::delete_newfiles()
+{
+    std::string indexfile(ctx.deltadir);
+    indexfile.append(IDX_NEWFILES);
+
+    std::ifstream infile(indexfile);
+    for (std::string file; std::getline(infile, file);)
+    {
+        std::string filepath(ctx.datadir);
+        filepath.append(file);
+
+        std::remove(filepath.c_str());
+    }
+
+    infile.close();
+}
+
+// Look at touched files and restore them.
+// Reads the index at ctx.deltadir + IDX_TOUCHEDFILES line by line. Each distinct entry is
+// restored once from its block index. Returns 0 on success, -1 as soon as one entry fails.
+int state_restore::restore_touchedfiles()
+{
+    // Entries already restored; the same file may be listed more than once in the index.
+    std::unordered_set<std::string> processed;
+
+    std::string indexfile(ctx.deltadir);
+    indexfile.append(IDX_TOUCHEDFILES);
+
+    std::ifstream infile(indexfile);
+    for (std::string file; std::getline(infile, file);)
+    {
+        // Skip if already processed.
+        if (processed.count(file) > 0)
+            continue;
+
+        std::vector<char> bindex;
+        if (read_blockindex(bindex, file) != 0)
+            return -1;
+
+        if (restore_blocks(file, bindex) != 0)
+            return -1;
+
+        // Add to processed file list.
+        processed.emplace(file);
+    }
+
+    infile.close();
+    return 0;
+}
+
+// Read the delta block index.
+// Loads the whole file ctx.deltadir + file + BLOCKINDEX_EXT into 'buffer'.
+// Returns 0 on success, -1 if the file cannot be opened, sized or read.
+int state_restore::read_blockindex(std::vector<char> &buffer, std::string_view file)
+{
+    std::string bindexfile(ctx.deltadir);
+    bindexfile.append(file).append(BLOCKINDEX_EXT);
+
+    // Opened at the end (ios::ate) so that tellg() reports the file size.
+    std::ifstream infile(bindexfile, std::ios::binary | std::ios::ate);
+    if (!infile.is_open())
+    {
+        LOG_ERR << errno << ": Open failed " << bindexfile;
+        return -1;
+    }
+
+    // tellg() returns -1 on failure; that must never reach resize().
+    const std::streamsize idxsize = infile.tellg();
+    if (idxsize < 0)
+    {
+        LOG_ERR << errno << ": Read failed " << bindexfile;
+        return -1;
+    }
+    infile.seekg(0, std::ios::beg);
+
+    buffer.resize(idxsize);
+    if (!infile.read(buffer.data(), idxsize))
+    {
+        LOG_ERR << errno << ": Read failed " << bindexfile;
+        return -1;
+    }
+
+    return 0;
+}
+
+// Restore blocks mentioned in the delta block index.
+// Block index layout as consumed here:
+//   [8 bytes: length of the original file]
+//   then zero or more entries of [4 bytes: block no.][8 bytes: block cache offset][32 bytes: hash].
+// 'file' is the path relative to both ctx.datadir (target) and ctx.deltadir (block cache).
+// Returns 0 on success, -1 on an invalid index or on any open/copy/truncate failure.
+int state_restore::restore_blocks(std::string_view file, const std::vector<char> &bindex)
+{
+    constexpr size_t HEADER_SIZE = 8;         // Original file length.
+    constexpr size_t ENTRY_SIZE = 4 + 8 + 32; // Block no. + block cache offset + hash.
+
+    std::string bcachefile(ctx.deltadir);
+    bcachefile.append(file).append(BLOCKCACHE_EXT);
+
+    std::string originalfile(ctx.datadir);
+    originalfile.append(file);
+
+    // Validate the index before touching any file: the header must be complete and the rest
+    // must consist of whole entries, otherwise the memcpy calls below would read out of bounds.
+    if (bindex.size() < HEADER_SIZE || (bindex.size() - HEADER_SIZE) % ENTRY_SIZE != 0)
+    {
+        LOG_ERR << "Invalid block index size " << bindex.size() << " for " << originalfile;
+        return -1;
+    }
+
+    const char *idxptr = bindex.data();
+
+    // First 8 bytes of the index contains the supposed length of the original file.
+    off_t originallen = 0;
+    memcpy(&originallen, idxptr, HEADER_SIZE);
+
+    // Open block cache file. open() reports failure with -1; 0 is a valid descriptor.
+    const int bcachefd = open(bcachefile.c_str(), O_RDONLY);
+    if (bcachefd < 0)
+    {
+        LOG_ERR << errno << ": Open failed " << bcachefile;
+        return -1;
+    }
+
+    // Create directory tree if not exist so we are able to create the file.
+    boost::filesystem::path filedir = boost::filesystem::path(originalfile).parent_path();
+    if (created_dirs.count(filedir.string()) == 0)
+    {
+        boost::filesystem::create_directories(filedir);
+        created_dirs.emplace(filedir.string());
+    }
+
+    // Create or Open original file.
+    const int orifilefd = open(originalfile.c_str(), O_WRONLY | O_CREAT, FILE_PERMS);
+    if (orifilefd < 0)
+    {
+        LOG_ERR << errno << ": Open failed " << originalfile;
+        close(bcachefd); // Do not leak the block cache descriptor on this error path.
+        return -1;
+    }
+
+    int result = 0;
+
+    // Restore the blocks as specified in block index.
+    for (size_t idxoffset = HEADER_SIZE; result == 0 && idxoffset < bindex.size(); idxoffset += ENTRY_SIZE)
+    {
+        // Find the block no. of where this block is from in the original file.
+        uint32_t blockno = 0;
+        memcpy(&blockno, idxptr + idxoffset, 4);
+        // Widen before multiplying so that large block numbers do not overflow.
+        off_t orifileoffset = static_cast<off_t>(blockno) * BLOCK_SIZE;
+
+        // Find the offset where the block is located in the block cache file.
+        // (The 32 byte hash following it is not needed here.)
+        off_t bcacheoffset = 0;
+        memcpy(&bcacheoffset, idxptr + idxoffset + 4, 8);
+
+        // Transfer the cached block to the target file. copy_file_range may copy fewer bytes
+        // than requested and advances both offsets itself, so continue until the block is done.
+        size_t remaining = BLOCK_SIZE;
+        while (remaining > 0)
+        {
+            const ssize_t copied = copy_file_range(bcachefd, &bcacheoffset, orifilefd, &orifileoffset, remaining, 0);
+            if (copied < 0)
+            {
+                LOG_ERR << errno << ": Block copy failed " << originalfile;
+                result = -1;
+                break;
+            }
+            if (copied == 0)
+                break; // End of the block cache file.
+
+            remaining -= copied;
+        }
+    }
+
+    // If the target file is bigger than the original size, truncate it to the original size.
+    if (result == 0)
+    {
+        const off_t currentlen = lseek(orifilefd, 0, SEEK_END);
+        if (currentlen == -1 || (currentlen > originallen && ftruncate(orifilefd, originallen) == -1))
+        {
+            LOG_ERR << errno << ": Truncate failed " << originalfile;
+            result = -1;
+        }
+    }
+
+    close(bcachefd);
+    close(orifilefd);
+
+    return result;
+}
+
+// This is called after a rollback so the all checkpoint dirs shift by 1.
+void state_restore::rewind_checkpoints()
+{
+    // Assuming we have restored the current state with current delta,
+    // we need to shift each history delta by 1 place.
+
+    // Delete the state 0 (current) delta.
+    boost::filesystem::remove_all(ctx.deltadir);
+
+    int16_t oldest_chkpnt = (MAX_CHECKPOINTS + 1) * -1; // +1 because we maintain one extra checkpoint in case of rollbacks.
+    for (int16_t chkpnt = -1; chkpnt >= oldest_chkpnt; chkpnt--)
+    {
+        std::string dir = get_statedir_root(chkpnt);
+
+        if (boost::filesystem::exists(dir))
+        {
+            if (chkpnt == -1)
+            {
+                // Shift -1 state delta dir to 0-state and delete -1 dir.
+                std::string delta_1 = dir + DELTA_DIR;
+                boost::filesystem::rename(delta_1, ctx.deltadir);
+                boost::filesystem::remove_all(dir);
+            }
+            else
+            {
+                // Older checkpoints move one place towards the current state as a whole dir.
+                std::string dirshift = get_statedir_root(chkpnt + 1);
+                boost::filesystem::rename(dir, dirshift);
+            }
+        }
+    }
+}
+
+// Rolls back current state to previous state.
+// Steps: delete files listed as new, restore touched files from their cached blocks,
+// regenerate the hash tree, then shift the checkpoint dirs.
+// 'roothash' is handed to hashtree_builder::generate (presumably receives the new root hash - confirm).
+// Returns 0 on success, -1 if restoring the touched files failed.
+int state_restore::rollback(hasher::B2H &roothash)
+{
+    ctx = get_statedir_context();
+
+    delete_newfiles();
+    if (restore_touchedfiles() == -1)
+        return -1;
+
+    // Update hash tree.
+    hashtree_builder htreebuilder(ctx);
+    htreebuilder.generate(roothash);
+
+    rewind_checkpoints();
+
+    return 0;
+}
+
+} // namespace statefs
\ No newline at end of file
diff --git a/src/statefs/state_restore.hpp b/src/statefs/state_restore.hpp
new file mode 100644
index 00000000..1038c441
--- /dev/null
+++ b/src/statefs/state_restore.hpp
@@ -0,0 +1,28 @@
+#ifndef _STATEFS_STATE_RESTORE_
+#define _STATEFS_STATE_RESTORE_
+
+#include "../pchheader.hpp"
+#include "hasher.hpp"
+#include "state_common.hpp"
+
+namespace statefs
+{
+
+// Restores the state data dir to the previous checkpoint using the delta dir contents.
+class state_restore
+{
+private:
+    statedir_context ctx;
+    // Parent directories already passed to create_directories() by restore_blocks().
+    // (Element types here and in the two signatures below were missing in this copy of the
+    // patch; restored from how state_restore.cpp uses them.)
+    std::unordered_set<std::string> created_dirs;
+    void delete_newfiles();
+    int restore_touchedfiles();
+    int read_blockindex(std::vector<char> &buffer, std::string_view file);
+    int restore_blocks(std::string_view file, const std::vector<char> &bindex);
+    void rewind_checkpoints();
+
+public:
+    int rollback(hasher::B2H &roothash);
+};
+
+} // namespace statefs
+
+#endif
diff --git a/src/util.cpp b/src/util.cpp
index 50dffdc3..918cecb9 100644
--- a/src/util.cpp
+++ b/src/util.cpp
@@ -188,4 +188,13 @@ std::string_view getsv(const rapidjson::Value &v)
     return std::string_view(v.GetString(), v.GetStringLength());
 }
 
+// Provides a std::string overload for ::realpath.
+// Returns the resolved path, or an empty string if ::realpath fails (errno is left as set by it).
+std::string realpath(std::string path)
+{
+    // Zero-filled, with one spare byte so the result is always null terminated.
+    std::array<char, PATH_MAX + 1> buffer{};
+
+    // On failure the buffer content is unspecified, so it must not be returned.
+    if (::realpath(path.c_str(), buffer.data()) == nullptr)
+        return {};
+
+    return buffer.data();
+}
+
 } // namespace util
\ No newline at end of file
diff --git a/src/util.hpp b/src/util.hpp
index 3ebae8d0..f485fcf3 100644
--- a/src/util.hpp
+++ b/src/util.hpp
@@ -11,10 +11,10 @@ namespace util
 {
 
 // Hot Pocket version. Displayed on 'hotpocket version' and written to new contract configs.
-constexpr const char* HP_VERSION = "0.1";
+constexpr const char *HP_VERSION = "0.1";
 
 // Minimum compatible contract config version (this will be used to validate contract configs)
-constexpr const char* MIN_CONTRACT_VERSION = "0.1";
+constexpr const char *MIN_CONTRACT_VERSION = "0.1";
 
 // Current version of the peer message protocol.
 constexpr uint8_t PEERMSG_VERSION = 1;
@@ -74,6 +74,8 @@ int version_compare(const std::string &x, const std::string &y);
 
 std::string_view getsv(const rapidjson::Value &v);
 
+// std::string overload of ::realpath. The implementation in util.cpp passes path.c_str() to
+// ::realpath and returns the contents of the buffer it fills.
+std::string realpath(std::string path);
+
 } // namespace util
 
 #endif
\ No newline at end of file