diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b8fcf0f..50be2e08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,6 +81,7 @@ target_link_libraries(hpusr hpsupport hpsock hpschema) add_library(hpcons src/cons/cons.cpp src/cons/ledger_handler.cpp + src/cons/statemap_handler.cpp ) target_link_libraries(hpcons hpsupport hpproc hpp2p hpusr) diff --git a/examples/echocontract/contract.js b/examples/echocontract/contract.js index 7ed22bdb..44796a22 100644 --- a/examples/echocontract/contract.js +++ b/examples/echocontract/contract.js @@ -3,19 +3,22 @@ process.on('uncaughtException', (err) => { }) const fs = require('fs') -let input = fs.readFileSync(0, 'utf8'); //console.log("===Sample contract started==="); //console.log("Contract args received from hp: " + input); -let hpargs = JSON.parse(input); +let hpargsstr = fs.readFileSync(0, 'utf8'); +let hpargs = JSON.parse(hpargsstr); + +// We just save execution args as an example state file change. +fs.appendFileSync("state/execargs.txt", hpargsstr + "\n"); Object.keys(hpargs.usrfd).forEach(function (key, index) { let userfds = hpargs.usrfd[key]; let userinput = fs.readFileSync(userfds[0], 'utf8'); if (userinput.length > 0) { - console.log("Input received from user " + key + ":"); - console.log(userinput); + // Append user input to a state file. + fs.appendFileSync("state/userinputs.txt", userinput + "\n"); fs.writeSync(userfds[1], "Echoing: " + userinput); } }); diff --git a/src/conf.cpp b/src/conf.cpp index de8bf151..94bf8465 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -75,6 +75,7 @@ int create_contract() boost::filesystem::create_directories(ctx.configDir); boost::filesystem::create_directories(ctx.histDir); boost::filesystem::create_directories(ctx.stateDir); + boost::filesystem::create_directories(ctx.stateMapDir); //Create config file with default settings. @@ -121,6 +122,7 @@ void set_contract_dir_paths(std::string basedir) ctx.tlsCertFile = ctx.configDir + "/tlscert.pem"; ctx.histDir = basedir + "/hist"; ctx.stateDir = basedir + "/state"; + ctx.stateMapDir = basedir + "/statemap"; ctx.logDir = basedir + "/log"; } @@ -217,7 +219,7 @@ int load_config() cfg.peerport = d["peerport"].GetInt(); cfg.roundtime = d["roundtime"].GetInt(); cfg.pubport = d["pubport"].GetInt(); - + cfg.pubmaxsize = d["pubmaxsize"].GetUint64(); cfg.pubmaxcpm = d["pubmaxcpm"].GetUint64(); cfg.pubmaxbadmpm = d["pubmaxbadmpm"].GetUint64(); @@ -450,7 +452,14 @@ int validate_config() */ int validate_contract_dir_paths() { - const std::string paths[6] = {ctx.contractDir, ctx.configFile, ctx.histDir, ctx.stateDir, ctx.tlsKeyFile, ctx.tlsCertFile}; + const std::string paths[7] = { + ctx.contractDir, + ctx.configFile, + ctx.histDir, + ctx.stateDir, + ctx.stateMapDir, + ctx.tlsKeyFile, + ctx.tlsCertFile}; for (const std::string &path : paths) { @@ -506,7 +515,7 @@ int is_schema_valid(const rapidjson::Document &d) "\"peerport\": { \"type\": \"integer\" }," "\"roundtime\": { \"type\": \"integer\" }," "\"pubport\": { \"type\": \"integer\" }," - + "\"pubmaxsize\": { \"type\": \"integer\" }," "\"pubmaxcpm\": { \"type\": \"integer\" }," "\"pubmaxbadmpm\": { \"type\": \"integer\" }," diff --git a/src/conf.hpp b/src/conf.hpp index ea67cc09..e81de946 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -21,6 +21,7 @@ struct contract_ctx std::string contractDir; // Contract base directory std::string histDir; // Contract history dir std::string stateDir; // Contract state dir + std::string stateMapDir; // Contract state map dir (.merkel files) std::string logDir; // Contract log dir std::string configDir; // Contract config dir std::string configFile; // Full path to the contract config file diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 5116fb99..44c3b478 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -11,6 +11,7 @@ #include "../crypto.hpp" #include "../proc/proc.hpp" #include "ledger_handler.hpp" +#include "statemap_handler.hpp" #include "cons.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -56,11 +57,11 @@ void consensus() { const bool self = p.pubkey == conf::cfg.pubkey; LOG_DBG << "[stage" << std::to_string(p.stage) - << "] users:" << p.users.size() - << " hinp:" << p.hash_inputs.size() - << " hout:" << p.hash_outputs.size() - << " lcl:" << p.lcl - << " self:" << self; + << "] users:" << p.users.size() + << " hinp:" << p.hash_inputs.size() + << " hout:" << p.hash_outputs.size() + << " lcl:" << p.lcl + << " self:" << self; } LOG_DBG << "timenow: " << std::to_string(ctx.time_now); @@ -170,7 +171,7 @@ void broadcast_nonunl_proposal() { // Construct NUP. p2p::nonunl_proposal nup; - + std::lock_guard lock(p2p::collected_msgs.nonunl_proposals_mutex); for (auto &[sid, user] : usr::ctx.users) { @@ -186,7 +187,8 @@ void broadcast_nonunl_proposal() p2pmsg::create_msg_from_nonunl_proposal(msg.builder(), nup); p2p::broadcast_message(msg); - LOG_DBG << "NUP sent." << " users:" << nup.user_messages.size(); + LOG_DBG << "NUP sent." + << " users:" << nup.user_messages.size(); } /** @@ -201,7 +203,7 @@ void verify_and_populate_candidate_user_inputs() { for (const auto &[pubkey, umsgs] : p.user_messages) { - // Populate user list. + // Populate user list. ctx.candidate_users.emplace(pubkey); for (const usr::user_submitted_message &umsg : umsgs) @@ -523,10 +525,17 @@ void apply_ledger(const p2p::proposal &cons_prop) // todo:check state against the winning / canonical state // and act accordingly (rollback, ask state from peer, etc.) + // This will hold a list of file blocks that was updated by the contract process. + // We then feed this information to state tracking logic. + proc::contract_fblockmap_t updated_blocks; + proc::contract_bufmap_t useriobufmap; feed_inputs_to_contract_bufmap(useriobufmap, cons_prop); - run_contract_binary(cons_prop.time, useriobufmap); + + run_contract_binary(cons_prop.time, useriobufmap, updated_blocks); + extract_outputs_from_contract_bufmap(useriobufmap); + update_state_blockmap(updated_blocks); } /** @@ -536,7 +545,7 @@ void apply_ledger(const p2p::proposal &cons_prop) void dispatch_user_outputs(const p2p::proposal &cons_prop) { std::lock_guard lock(usr::ctx.users_mutex); - + for (const std::string &hash : cons_prop.hash_outputs) { const auto cu_itr = ctx.candidate_user_outputs.find(hash); @@ -556,8 +565,8 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop) const auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey); if (sess_itr != usr::ctx.sessionids.end()) // match found { - const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. - if (user_itr != usr::ctx.users.end()) // match found + const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. + if (user_itr != usr::ctx.users.end()) // match found { std::string outputtosend; outputtosend.swap(cand_output.output); @@ -569,7 +578,7 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop) } } } - + // now we can safely clear our candidate outputs. ctx.candidate_user_outputs.clear(); } @@ -641,16 +650,12 @@ void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap) * @param time_now The time that must be passed on to the contract. * @param useriobufmap The contract bufmap which holds user I/O buffers. */ -void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap) +void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_fblockmap_t &state_updates) { // todo:implement exchange of npl and hpsc bufs proc::contract_bufmap_t nplbufmap; proc::contract_iobuf_pair hpscbufpair; - // This will hold a list of file blocks that was updated by the contract process. - // We then feed this information to state tracking logic. - proc::contract_fblockmap_t state_updates; - proc::exec_contract( proc::contract_exec_args(time_now, useriobufmap, nplbufmap, hpscbufpair, state_updates)); } diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index c07bfad5..18aab271 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -122,7 +122,7 @@ void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p:: void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap); -void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap); +void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_fblockmap_t &state_updates); template void increment(std::map &counter, const T &candidate); diff --git a/src/cons/statemap_handler.cpp b/src/cons/statemap_handler.cpp new file mode 100644 index 00000000..26c7a1e0 --- /dev/null +++ b/src/cons/statemap_handler.cpp @@ -0,0 +1,177 @@ +#include "../pchheader.hpp" +#include "../conf.hpp" +#include "../hplog.hpp" +#include "../proc/proc.hpp" + +namespace cons +{ + +constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; //this is the logical block size files are decomposed in, it's also the size of the merkle tree +constexpr size_t HASH_SIZE = crypto_generichash_blake2b_BYTES; +constexpr size_t MAX_HASHES = BLOCK_SIZE / HASH_SIZE; +constexpr const char *MERKLE_EXTENSION = ".merkle"; + +struct B2H // blake2b hash is 32 bytes which we store as 4 quad words +{ + uint64_t data[4]; +}; + +// provide some helper functions for working with 32 byte hash type +bool operator==(B2H &lhs, B2H &rhs) +{ + return lhs.data[0] == rhs.data[0] && lhs.data[1] == rhs.data[1] && lhs.data[2] == rhs.data[2] && lhs.data[3] == rhs.data[3]; +} + +// the actual hash function, note that the B2H datatype is always passed by value being only 4 quadwords +B2H hash(const void *ptr, size_t len) +{ + B2H ret; + crypto_generichash_blake2b_state state; + crypto_generichash_blake2b_init(&state, NULL, 0, HASH_SIZE); + crypto_generichash_blake2b_update(&state, + reinterpret_cast(ptr), len); + crypto_generichash_blake2b_final( + &state, + reinterpret_cast(&ret), + HASH_SIZE); + return ret; +} + +/** + * Updates the .merkel block map for the given state file. + * @param filepath Full path of the state file. + * @param hinted_blocks Set of updated file block ids. If empty full merkel block map will be recomputed. + */ +int update_file_blockmap(const std::string &filepath, const std::set &hinted_blocks) +{ + // .merkel file path will be corresponding path in "statemap" directory. + std::string merkle_fn; + const size_t relative_path_len = filepath.length() - conf::ctx.stateDir.length(); + merkle_fn.reserve(conf::ctx.stateMapDir.length() + relative_path_len + 7); + merkle_fn.append(conf::ctx.stateMapDir); + merkle_fn.append(filepath.substr(conf::ctx.stateDir.length(), relative_path_len)); + merkle_fn.append(MERKLE_EXTENSION); + + // To benefit from hint mode, the .merkle file must already exist. If not we simply disable hint mode + // because we anyway have to rebuild entire merkle file from scratch. + bool hint_mode = !hinted_blocks.empty(); + if (access(merkle_fn.c_str(), F_OK) == -1) + hint_mode = false; + + // open the target file for which we are building or updating a merkle tree + FILE *f = fopen(filepath.c_str(), "rb"); + if (!f) + { + LOG_ERR << "Failed to open state file: " << filepath << " for reading."; + return -1; + } + + // the merkle tree structure is only 4mb and could technically sit on stack in most cases but + // TODO: ******Why can't we allocate this on the stack? + auto merkle_tree = std::make_unique(MAX_HASHES); + // same with the read buffer + auto read_buffer = std::make_unique(BLOCK_SIZE); + + // this iterator will be used if we are in hint mode + auto hint = hinted_blocks.begin(); + + size_t block_counter = 0; + size_t block_location = 0; + + while (true) + { + // if hint blocks have been specified we'll seek to specific blocks in the file based on hint list. + if (hint_mode) + { + // check if we've run out of elements + if (hint == hinted_blocks.end()) + break; + + // get the next block on their list + block_location = *hint++; + + // seek the file cursor to the block + fseek(f, block_location * BLOCK_SIZE, SEEK_SET); + } + + // read the block + int bytesread = fread(read_buffer.get(), 1, BLOCK_SIZE, f); + if (bytesread <= 0) + break; + + // calculate the block hash + merkle_tree[block_location++] = hash(read_buffer.get(), std::min(BLOCK_SIZE, (size_t)bytesread)); + block_counter++; + } + + fclose(f); + + // now that we've computed all the block hashes we are interested in we have to deal with the .merkle file + + // open the .merkle file + // if we are in hint_mode we will open it in rb+ which will preserve its contents + // otherwise we will truncate it if it already exists, because we will have to overwrite everything anyway + FILE *fm = fopen(merkle_fn.c_str(), (hint_mode ? "rb+" : "wb+")); + if (!fm) + { + LOG_ERR << "Failed to open merkle file: " << filepath << " for writing."; + return -1; + } + + // get the size of the .merkle file + fseek(fm, 0L, SEEK_END); + const size_t len = ftell(fm); + rewind(fm); + + // write the updated hashes + if (hint_mode) + { + // write selectively the updated block hashes + const int fd = fileno(fm); + for (int block : hinted_blocks) + pwrite(fd, &(merkle_tree[block]), HASH_SIZE, HASH_SIZE * block); + } + else + { + // write the whole tree to the file + fwrite(reinterpret_cast(merkle_tree.get()), 1, std::min(BLOCK_SIZE, HASH_SIZE * block_counter), fm); + } + + // compute the root hash + + B2H root_hash; + + if (hint_mode) + { + // if we only updated selective hashes (hint mode) then now we need to compute a hash over the whole merkle file + // so we first need to read it in + rewind(f); + int bytesread = fread(read_buffer.get(), 1, BLOCK_SIZE, f); + if (bytesread <= 0) + fprintf(stderr, "could not read merkle file after writing to it?!\n"); + + // now simply compute the hash of what we just read, that's our root hash + root_hash = hash(read_buffer.get(), std::min(BLOCK_SIZE, (size_t)bytesread)); + } + else + { + // if we've just written out the whole merkle tree we already know it + root_hash = hash(merkle_tree.get(), std::min(BLOCK_SIZE, HASH_SIZE * block_counter)); + } + + fclose(fm); +} + +/** + * Updates the state block map for the files updated as specified by the provided blockmap. + * TODO: This doesn't currently support deleted file tracking. + */ +void update_state_blockmap(const proc::contract_fblockmap_t &updates) +{ + for (const auto &[filepath, blocks] : updates) + { + update_file_blockmap(filepath, blocks); + } +} + +} // namespace cons \ No newline at end of file diff --git a/src/cons/statemap_handler.hpp b/src/cons/statemap_handler.hpp new file mode 100644 index 00000000..7e295acb --- /dev/null +++ b/src/cons/statemap_handler.hpp @@ -0,0 +1,11 @@ +#ifndef _HP_CONS_STATEMAP_HANDLER_ +#define _HP_CONS_STATEMAP_HANDLER_ + +#include "../proc/proc.hpp" + +namespace cons +{ +void update_state_blockmap(const proc::contract_fblockmap_t &updates); +} + +#endif \ No newline at end of file