Integrated merkel map buildup after contract execution.

This commit is contained in:
ravinsp
2019-11-09 11:41:00 +05:30
parent 32ef41bfdf
commit a3bc504bf7
8 changed files with 233 additions and 26 deletions

View File

@@ -81,6 +81,7 @@ target_link_libraries(hpusr hpsupport hpsock hpschema)
add_library(hpcons
src/cons/cons.cpp
src/cons/ledger_handler.cpp
src/cons/statemap_handler.cpp
)
target_link_libraries(hpcons hpsupport hpproc hpp2p hpusr)

View File

@@ -3,19 +3,22 @@ process.on('uncaughtException', (err) => {
})
const fs = require('fs')
let input = fs.readFileSync(0, 'utf8');
//console.log("===Sample contract started===");
//console.log("Contract args received from hp: " + input);
let hpargs = JSON.parse(input);
let hpargsstr = fs.readFileSync(0, 'utf8');
let hpargs = JSON.parse(hpargsstr);
// We just save execution args as an example state file change.
fs.appendFileSync("state/execargs.txt", hpargsstr + "\n");
Object.keys(hpargs.usrfd).forEach(function (key, index) {
let userfds = hpargs.usrfd[key];
let userinput = fs.readFileSync(userfds[0], 'utf8');
if (userinput.length > 0) {
console.log("Input received from user " + key + ":");
console.log(userinput);
// Append user input to a state file.
fs.appendFileSync("state/userinputs.txt", userinput + "\n");
fs.writeSync(userfds[1], "Echoing: " + userinput);
}
});

View File

@@ -75,6 +75,7 @@ int create_contract()
boost::filesystem::create_directories(ctx.configDir);
boost::filesystem::create_directories(ctx.histDir);
boost::filesystem::create_directories(ctx.stateDir);
boost::filesystem::create_directories(ctx.stateMapDir);
//Create config file with default settings.
@@ -121,6 +122,7 @@ void set_contract_dir_paths(std::string basedir)
ctx.tlsCertFile = ctx.configDir + "/tlscert.pem";
ctx.histDir = basedir + "/hist";
ctx.stateDir = basedir + "/state";
ctx.stateMapDir = basedir + "/statemap";
ctx.logDir = basedir + "/log";
}
@@ -217,7 +219,7 @@ int load_config()
cfg.peerport = d["peerport"].GetInt();
cfg.roundtime = d["roundtime"].GetInt();
cfg.pubport = d["pubport"].GetInt();
cfg.pubmaxsize = d["pubmaxsize"].GetUint64();
cfg.pubmaxcpm = d["pubmaxcpm"].GetUint64();
cfg.pubmaxbadmpm = d["pubmaxbadmpm"].GetUint64();
@@ -450,7 +452,14 @@ int validate_config()
*/
int validate_contract_dir_paths()
{
const std::string paths[6] = {ctx.contractDir, ctx.configFile, ctx.histDir, ctx.stateDir, ctx.tlsKeyFile, ctx.tlsCertFile};
const std::string paths[7] = {
ctx.contractDir,
ctx.configFile,
ctx.histDir,
ctx.stateDir,
ctx.stateMapDir,
ctx.tlsKeyFile,
ctx.tlsCertFile};
for (const std::string &path : paths)
{
@@ -506,7 +515,7 @@ int is_schema_valid(const rapidjson::Document &d)
"\"peerport\": { \"type\": \"integer\" },"
"\"roundtime\": { \"type\": \"integer\" },"
"\"pubport\": { \"type\": \"integer\" },"
"\"pubmaxsize\": { \"type\": \"integer\" },"
"\"pubmaxcpm\": { \"type\": \"integer\" },"
"\"pubmaxbadmpm\": { \"type\": \"integer\" },"

View File

@@ -21,6 +21,7 @@ struct contract_ctx
std::string contractDir; // Contract base directory
std::string histDir; // Contract history dir
std::string stateDir; // Contract state dir
std::string stateMapDir; // Contract state map dir (.merkel files)
std::string logDir; // Contract log dir
std::string configDir; // Contract config dir
std::string configFile; // Full path to the contract config file

View File

@@ -11,6 +11,7 @@
#include "../crypto.hpp"
#include "../proc/proc.hpp"
#include "ledger_handler.hpp"
#include "statemap_handler.hpp"
#include "cons.hpp"
namespace p2pmsg = fbschema::p2pmsg;
@@ -56,11 +57,11 @@ void consensus()
{
const bool self = p.pubkey == conf::cfg.pubkey;
LOG_DBG << "[stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " hinp:" << p.hash_inputs.size()
<< " hout:" << p.hash_outputs.size()
<< " lcl:" << p.lcl
<< " self:" << self;
<< "] users:" << p.users.size()
<< " hinp:" << p.hash_inputs.size()
<< " hout:" << p.hash_outputs.size()
<< " lcl:" << p.lcl
<< " self:" << self;
}
LOG_DBG << "timenow: " << std::to_string(ctx.time_now);
@@ -170,7 +171,7 @@ void broadcast_nonunl_proposal()
{
// Construct NUP.
p2p::nonunl_proposal nup;
std::lock_guard<std::mutex> lock(p2p::collected_msgs.nonunl_proposals_mutex);
for (auto &[sid, user] : usr::ctx.users)
{
@@ -186,7 +187,8 @@ void broadcast_nonunl_proposal()
p2pmsg::create_msg_from_nonunl_proposal(msg.builder(), nup);
p2p::broadcast_message(msg);
LOG_DBG << "NUP sent." << " users:" << nup.user_messages.size();
LOG_DBG << "NUP sent."
<< " users:" << nup.user_messages.size();
}
/**
@@ -201,7 +203,7 @@ void verify_and_populate_candidate_user_inputs()
{
for (const auto &[pubkey, umsgs] : p.user_messages)
{
// Populate user list.
// Populate user list.
ctx.candidate_users.emplace(pubkey);
for (const usr::user_submitted_message &umsg : umsgs)
@@ -523,10 +525,17 @@ void apply_ledger(const p2p::proposal &cons_prop)
// todo:check state against the winning / canonical state
// and act accordingly (rollback, ask state from peer, etc.)
// This will hold a list of file blocks that was updated by the contract process.
// We then feed this information to state tracking logic.
proc::contract_fblockmap_t updated_blocks;
proc::contract_bufmap_t useriobufmap;
feed_inputs_to_contract_bufmap(useriobufmap, cons_prop);
run_contract_binary(cons_prop.time, useriobufmap);
run_contract_binary(cons_prop.time, useriobufmap, updated_blocks);
extract_outputs_from_contract_bufmap(useriobufmap);
update_state_blockmap(updated_blocks);
}
/**
@@ -536,7 +545,7 @@ void apply_ledger(const p2p::proposal &cons_prop)
void dispatch_user_outputs(const p2p::proposal &cons_prop)
{
std::lock_guard<std::mutex> lock(usr::ctx.users_mutex);
for (const std::string &hash : cons_prop.hash_outputs)
{
const auto cu_itr = ctx.candidate_user_outputs.find(hash);
@@ -556,8 +565,8 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop)
const auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey);
if (sess_itr != usr::ctx.sessionids.end()) // match found
{
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
if (user_itr != usr::ctx.users.end()) // match found
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
if (user_itr != usr::ctx.users.end()) // match found
{
std::string outputtosend;
outputtosend.swap(cand_output.output);
@@ -569,7 +578,7 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop)
}
}
}
// now we can safely clear our candidate outputs.
ctx.candidate_user_outputs.clear();
}
@@ -641,16 +650,12 @@ void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap)
* @param time_now The time that must be passed on to the contract.
* @param useriobufmap The contract bufmap which holds user I/O buffers.
*/
void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap)
void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_fblockmap_t &state_updates)
{
// todo:implement exchange of npl and hpsc bufs
proc::contract_bufmap_t nplbufmap;
proc::contract_iobuf_pair hpscbufpair;
// This will hold a list of file blocks that was updated by the contract process.
// We then feed this information to state tracking logic.
proc::contract_fblockmap_t state_updates;
proc::exec_contract(
proc::contract_exec_args(time_now, useriobufmap, nplbufmap, hpscbufpair, state_updates));
}

View File

@@ -122,7 +122,7 @@ void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::
void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap);
void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap);
void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_fblockmap_t &state_updates);
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate);

View File

@@ -0,0 +1,177 @@
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../hplog.hpp"
#include "../proc/proc.hpp"
namespace cons
{
constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; //this is the logical block size files are decomposed in, it's also the size of the merkle tree
constexpr size_t HASH_SIZE = crypto_generichash_blake2b_BYTES;
constexpr size_t MAX_HASHES = BLOCK_SIZE / HASH_SIZE;
constexpr const char *MERKLE_EXTENSION = ".merkle";
struct B2H // blake2b hash is 32 bytes which we store as 4 quad words
{
uint64_t data[4];
};
// provide some helper functions for working with 32 byte hash type
bool operator==(B2H &lhs, B2H &rhs)
{
return lhs.data[0] == rhs.data[0] && lhs.data[1] == rhs.data[1] && lhs.data[2] == rhs.data[2] && lhs.data[3] == rhs.data[3];
}
// the actual hash function, note that the B2H datatype is always passed by value being only 4 quadwords
B2H hash(const void *ptr, size_t len)
{
B2H ret;
crypto_generichash_blake2b_state state;
crypto_generichash_blake2b_init(&state, NULL, 0, HASH_SIZE);
crypto_generichash_blake2b_update(&state,
reinterpret_cast<const unsigned char *>(ptr), len);
crypto_generichash_blake2b_final(
&state,
reinterpret_cast<unsigned char *>(&ret),
HASH_SIZE);
return ret;
}
/**
* Updates the .merkel block map for the given state file.
* @param filepath Full path of the state file.
* @param hinted_blocks Set of updated file block ids. If empty full merkel block map will be recomputed.
*/
int update_file_blockmap(const std::string &filepath, const std::set<uint32_t> &hinted_blocks)
{
// .merkel file path will be corresponding path in "statemap" directory.
std::string merkle_fn;
const size_t relative_path_len = filepath.length() - conf::ctx.stateDir.length();
merkle_fn.reserve(conf::ctx.stateMapDir.length() + relative_path_len + 7);
merkle_fn.append(conf::ctx.stateMapDir);
merkle_fn.append(filepath.substr(conf::ctx.stateDir.length(), relative_path_len));
merkle_fn.append(MERKLE_EXTENSION);
// To benefit from hint mode, the .merkle file must already exist. If not we simply disable hint mode
// because we anyway have to rebuild entire merkle file from scratch.
bool hint_mode = !hinted_blocks.empty();
if (access(merkle_fn.c_str(), F_OK) == -1)
hint_mode = false;
// open the target file for which we are building or updating a merkle tree
FILE *f = fopen(filepath.c_str(), "rb");
if (!f)
{
LOG_ERR << "Failed to open state file: " << filepath << " for reading.";
return -1;
}
// the merkle tree structure is only 4mb and could technically sit on stack in most cases but
// TODO: ******Why can't we allocate this on the stack?
auto merkle_tree = std::make_unique<B2H[]>(MAX_HASHES);
// same with the read buffer
auto read_buffer = std::make_unique<uint8_t[]>(BLOCK_SIZE);
// this iterator will be used if we are in hint mode
auto hint = hinted_blocks.begin();
size_t block_counter = 0;
size_t block_location = 0;
while (true)
{
// if hint blocks have been specified we'll seek to specific blocks in the file based on hint list.
if (hint_mode)
{
// check if we've run out of elements
if (hint == hinted_blocks.end())
break;
// get the next block on their list
block_location = *hint++;
// seek the file cursor to the block
fseek(f, block_location * BLOCK_SIZE, SEEK_SET);
}
// read the block
int bytesread = fread(read_buffer.get(), 1, BLOCK_SIZE, f);
if (bytesread <= 0)
break;
// calculate the block hash
merkle_tree[block_location++] = hash(read_buffer.get(), std::min(BLOCK_SIZE, (size_t)bytesread));
block_counter++;
}
fclose(f);
// now that we've computed all the block hashes we are interested in we have to deal with the .merkle file
// open the .merkle file
// if we are in hint_mode we will open it in rb+ which will preserve its contents
// otherwise we will truncate it if it already exists, because we will have to overwrite everything anyway
FILE *fm = fopen(merkle_fn.c_str(), (hint_mode ? "rb+" : "wb+"));
if (!fm)
{
LOG_ERR << "Failed to open merkle file: " << filepath << " for writing.";
return -1;
}
// get the size of the .merkle file
fseek(fm, 0L, SEEK_END);
const size_t len = ftell(fm);
rewind(fm);
// write the updated hashes
if (hint_mode)
{
// write selectively the updated block hashes
const int fd = fileno(fm);
for (int block : hinted_blocks)
pwrite(fd, &(merkle_tree[block]), HASH_SIZE, HASH_SIZE * block);
}
else
{
// write the whole tree to the file
fwrite(reinterpret_cast<void *>(merkle_tree.get()), 1, std::min(BLOCK_SIZE, HASH_SIZE * block_counter), fm);
}
// compute the root hash
B2H root_hash;
if (hint_mode)
{
// if we only updated selective hashes (hint mode) then now we need to compute a hash over the whole merkle file
// so we first need to read it in
rewind(f);
int bytesread = fread(read_buffer.get(), 1, BLOCK_SIZE, f);
if (bytesread <= 0)
fprintf(stderr, "could not read merkle file after writing to it?!\n");
// now simply compute the hash of what we just read, that's our root hash
root_hash = hash(read_buffer.get(), std::min(BLOCK_SIZE, (size_t)bytesread));
}
else
{
// if we've just written out the whole merkle tree we already know it
root_hash = hash(merkle_tree.get(), std::min(BLOCK_SIZE, HASH_SIZE * block_counter));
}
fclose(fm);
}
/**
* Updates the state block map for the files updated as specified by the provided blockmap.
* TODO: This doesn't currently support deleted file tracking.
*/
void update_state_blockmap(const proc::contract_fblockmap_t &updates)
{
for (const auto &[filepath, blocks] : updates)
{
update_file_blockmap(filepath, blocks);
}
}
} // namespace cons

View File

@@ -0,0 +1,11 @@
#ifndef _HP_CONS_STATEMAP_HANDLER_
#define _HP_CONS_STATEMAP_HANDLER_
#include "../proc/proc.hpp"
namespace cons
{
void update_state_blockmap(const proc::contract_fblockmap_t &updates);
}
#endif