diff --git a/CMakeLists.txt b/CMakeLists.txt index 0fbccbe0..ef0b1ab5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,8 @@ add_executable(hpcore src/usr/usr.cpp src/cons/cons.cpp src/cons/ledger_handler.cpp - src/cons/state_handler.cpp + src/state/state_sync.cpp + src/state/state_serve.cpp src/main.cpp ) target_link_libraries(hpcore @@ -75,8 +76,8 @@ add_dependencies(hpcore ) add_custom_command(TARGET hpcore POST_BUILD - COMMAND strip ./build/hpcore - COMMAND strip ./build/appbill + # COMMAND strip ./build/hpcore + # COMMAND strip ./build/appbill COMMAND cp ./test/bin/websocketd ./test/bin/websocat ./test/bin/hpfs ./build/ ) diff --git a/README.md b/README.md index 0bf740cd..fe80a571 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ Code is divided into subsystems via namespaces. **p2p::** Handles peer-to-peer connections and message exchange between nodes. Makes use of **crypto** and **comm**. -**cons::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **proc** +**cons::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **sc** **comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for websocketd/websocat. diff --git a/examples/echo_contract/contract.js b/examples/echo_contract/contract.js index c15cb925..3a8086c4 100644 --- a/examples/echo_contract/contract.js +++ b/examples/echo_contract/contract.js @@ -22,17 +22,17 @@ Object.keys(hpargs.usrfd).forEach(function (key, index) { } }); -let nplinput = fs.readFileSync(hpargs.nplfd[0], 'utf8'); -if (nplinput.length > 0) { - console.log("Input received from hp:"); +if (hpargs.nplfd[0] != -1) { + let nplinput = fs.readFileSync(hpargs.nplfd[0], 'utf8'); + console.log("Input received from peers:"); console.log(nplinput); fs.writeSync(hpargs.nplfd[1], "Echoing: " + nplinput); } -let hpinput = fs.readFileSync(hpargs.hpfd[0], 'utf8'); -if (hpinput.length > 0) { - //console.log("Input received from hp:"); - //console.log(hpinput); +if (hpargs.hpfd[0] != -1) { + let hpinput = fs.readFileSync(hpargs.hpfd[0], 'utf8'); + console.log("Input received from hp:"); + console.log(hpinput); fs.writeSync(hpargs.hpfd[1], "Echoing: " + hpinput); } diff --git a/src/comm/comm_client.cpp b/src/comm/comm_client.cpp index 658495d8..0638c669 100644 --- a/src/comm/comm_client.cpp +++ b/src/comm/comm_client.cpp @@ -59,6 +59,8 @@ namespace comm else if (pid == 0) { // Websocat process. + util::unmask_signal(); + close(write_pipe[1]); //parent write close(read_pipe[0]); //parent read diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index b01e99ab..ef7fe6b1 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -196,17 +196,24 @@ namespace comm { // New client connected. const std::string ip = get_cgi_ip(client_fd); - - if (corebill::is_banned(ip)) + if (!ip.empty()) { - LOG_DBG << "Dropping connection for banned host " << ip; - close(client_fd); + if (corebill::is_banned(ip)) + { + LOG_DBG << "Dropping connection for banned host " << ip; + close(client_fd); + } + else + { + comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds); + if (session.on_connect() == 0) + sessions.try_emplace(client_fd, std::move(session)); + } } else { - comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds); - if (session.on_connect() == 0) - sessions.try_emplace(client_fd, std::move(session)); + close(client_fd); + LOG_ERR << "Closed bad client socket: " << client_fd; } } } @@ -292,6 +299,8 @@ namespace comm else if (pid == 0) { // Websocketd process. + util::unmask_signal(); + // We are using websocketd forked repo: https://github.com/codetsunami/websocketd if (firewall_out > 0) @@ -303,9 +312,9 @@ namespace comm } std::string max_frame = std::string("--maxframe=") - .append(use_size_header - ? "4294967296" // 4GB - : std::to_string(max_msg_size)); + .append(use_size_header + ? "4294967296" // 4GB + : std::to_string(max_msg_size)); // Fill process args. char *execv_args[] = { diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index b4d87c2d..5c3add80 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -12,8 +12,8 @@ #include "../sc.hpp" #include "../hpfs/h32.hpp" #include "../hpfs/hpfs.hpp" +#include "../state/state_sync.hpp" #include "ledger_handler.hpp" -#include "state_handler.hpp" #include "cons.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -36,19 +36,19 @@ namespace cons int init() { - //set start stage - ctx.stage = 0; - //load lcl details from lcl history. ledger_history ldr_hist = load_ledger(); ctx.led_seq_no = ldr_hist.led_seq_no; ctx.lcl = ldr_hist.lcl; ctx.ledger_cache.swap(ldr_hist.cache); - if (hpfs::get_root_hash(ctx.curr_state_hash) == -1) + if (get_initial_state_hash(ctx.state) == -1) + { + LOG_ERR << "Failed to get initial state hash."; return -1; + } - LOG_INFO << "Initial state: " << ctx.curr_state_hash; + LOG_INFO << "Initial state: " << ctx.state; // We allocate 1/5 of the round time to each stage expect stage 3. For stage 3 we allocate 2/5. // Stage 3 is allocated an extra stage_time unit becayse a node needs enough time to @@ -65,10 +65,6 @@ namespace cons */ void deinit() { - if (init_success) - { - - } } int run_consensus() @@ -158,14 +154,12 @@ namespace cons vote_counter votes; // check if we're ahead/behind of consensus lcl - bool is_lcl_desync, should_request_history; + bool is_lcl_desync = false, should_request_history = false; std::string majority_lcl; check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes); if (is_lcl_desync) { - ctx.is_lcl_syncing = true; - if (should_request_history) { LOG_INFO << "Syncing lcl. Curr lcl:" << cons::ctx.lcl.substr(0, 15) << " majority:" << majority_lcl.substr(0, 15); @@ -191,14 +185,16 @@ namespace cons } else { - const bool lcl_syncing_just_finished = ctx.is_lcl_syncing; - ctx.is_lcl_syncing = false; + bool is_state_desync = false; + hpfs::h32 majority_state = hpfs::h32_empty; + check_state_votes(is_state_desync, majority_state, votes); - if (lcl_syncing_just_finished) - ; //TODO: Check and compare majotiry state and start state sync. - bool is_state_syncing = false; - - if (!is_state_syncing) + if (is_state_desync) + { + conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); + state_sync::set_target(majority_state, on_state_sync_completion); + } + else { conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); @@ -214,7 +210,7 @@ namespace cons // node has finished a consensus round (all 4 stages). LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl.substr(0, 15) - << " state:" << ctx.curr_state_hash << ")"; + << " state:" << ctx.state << ")"; } } } @@ -247,8 +243,8 @@ namespace cons << " hout:" << cp.hash_outputs.size() << " ts:" << std::to_string(cp.time) << " lcl:" << cp.lcl.substr(0, 15) - << " state:" << cp.curr_state_hash - << " self:" << self; + << " state:" << cp.state + << (self ? " [self]" : ""); } else { @@ -479,6 +475,9 @@ namespace cons int pid = fork(); if (pid == 0) { + // appbill process. + util::unmask_signal(); + // before execution chdir into a valid the latest state data directory that contains an appbill.table chdir(conf::ctx.state_rw_dir.c_str()); int ret = execv(execv_args[0], execv_args); @@ -513,7 +512,7 @@ namespace cons stg_prop.time = ctx.time_now; stg_prop.stage = 0; stg_prop.lcl = ctx.lcl; - stg_prop.curr_state_hash = ctx.curr_state_hash; + stg_prop.state = ctx.state; // Populate the proposal with set of candidate user pubkeys. for (const std::string &pubkey : ctx.candidate_users) @@ -545,7 +544,7 @@ namespace cons // if there's a fork condition we will either request history and state from // our peers or we will halt depending on level of consensus on the sides of the fork stg_prop.lcl = ctx.lcl; - stg_prop.curr_state_hash = ctx.curr_state_hash; + stg_prop.state = ctx.state; // Vote for rest of the proposal fields by looking at candidate proposals. for (const auto &[pubkey, cp] : ctx.candidate_proposals) @@ -651,10 +650,6 @@ namespace cons { LOG_DBG << "Not enough peers proposing to perform consensus. votes:" << std::to_string(total_lcl_votes) << " needed:" << std::to_string(MAJORITY_THRESHOLD * conf::cfg.unl.size()); is_desync = true; - - //Not enough nodes are propsing. So Node is switching to Proposer if it's in observer mode. - conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); - return; } @@ -692,6 +687,33 @@ namespace cons } } + /** + * Check state against the winning and canonical state + * @param votes The voting table. + */ + void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes) + { + for (const auto &[pubkey, cp] : ctx.candidate_proposals) + { + increment(votes.state, cp.state); + } + + int32_t winning_votes = 0; + for (const auto [state, votes] : votes.state) + { + if (votes > winning_votes) + { + winning_votes = votes; + majority_state = state; + } + } + + { + std::lock_guard(ctx.state_sync_lock); + is_desync = (ctx.state != majority_state); + } + } + /** * Returns the consensus percentage threshold for the specified stage. * @param stage The consensus stage [1, 2, 3] @@ -797,30 +819,6 @@ namespace cons } } - /** - * Check state against the winning and canonical state - * @param votes The voting table. - */ - void check_state(vote_counter &votes) - { - hpfs::h32 majority_state = hpfs::h32_empty; - - for (const auto &[pubkey, cp] : ctx.candidate_proposals) - { - increment(votes.state, cp.curr_state_hash); - } - - int32_t winning_votes = 0; - for (const auto [state, votes] : votes.state) - { - if (votes > winning_votes) - { - winning_votes = votes; - majority_state = state; - } - } - } - /** * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. * @param bufmap The contract bufmap which needs to be populated with inputs. @@ -908,7 +906,7 @@ namespace cons sc::contract_iobuf_pair hpscbufpair; return sc::exec_contract( sc::contract_exec_args(time_now, useriobufmap, nplbufpair, hpscbufpair), - ctx.curr_state_hash); + ctx.state); } /** @@ -925,4 +923,25 @@ namespace cons counter.try_emplace(candidate, 1); } + /** + * Get the contract state hash. + */ + int get_initial_state_hash(hpfs::h32 &hash) + { + pid_t pid; + std::string mount_dir; + if (hpfs::start_fs_session(pid, mount_dir, "ro", true) == -1) + return -1; + + int res = get_hash(hash, mount_dir, "/"); + util::kill_process(pid, true); + return res; + } + + void on_state_sync_completion(const hpfs::h32 new_state) + { + std::lock_guard(ctx.state_sync_lock); + ctx.state = new_state; + } + } // namespace cons diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 4eccda76..05cafb48 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -8,7 +8,6 @@ #include "../usr/user_input.hpp" #include "../hpfs/h32.hpp" #include "ledger_handler.hpp" -#include "state_handler.hpp" namespace cons { @@ -74,7 +73,7 @@ struct consensus_context uint64_t time_now = 0; std::string lcl; uint64_t led_seq_no = 0; - hpfs::h32 curr_state_hash; + hpfs::h32 state = hpfs::h32_empty; //Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key. //contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE. @@ -82,12 +81,12 @@ struct consensus_context //We will use this to track lcls related logic.- track state, lcl request, response. std::map ledger_cache; std::string last_requested_lcl; - bool is_lcl_syncing = false; //ledger close time of previous hash uint16_t stage_time = 0; // Time allocated to a consensus stage. uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. + std::mutex state_sync_lock; bool is_shutting_down = false; consensus_context() @@ -134,6 +133,8 @@ void broadcast_proposal(const p2p::proposal &p); void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); +void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); + float_t get_stage_threshold(const uint8_t stage); void timewait_stage(const bool reset, const uint64_t time); @@ -146,8 +147,6 @@ int apply_ledger(const p2p::proposal &proposal); void dispatch_user_outputs(const p2p::proposal &cons_prop); -void check_state(vote_counter &votes); - void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); @@ -159,6 +158,10 @@ int run_contract_binary(const int64_t time_now, sc::contract_bufmap_t &useriobuf template void increment(std::map &counter, const T &candidate); +int get_initial_state_hash(hpfs::h32 &hash); + +void on_state_sync_completion(const hpfs::h32 new_state); + } // namespace cons #endif diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp index 12c14367..0f1edab0 100644 --- a/src/cons/ledger_handler.cpp +++ b/src/cons/ledger_handler.cpp @@ -61,7 +61,7 @@ const std::tuple save_ledger(const p2p::proposal &p ledger_cache_entry c; c.lcl = file_name; - c.state = proposal.curr_state_hash.to_string_view(); + c.state = proposal.state.to_string_view(); cons::ctx.ledger_cache.emplace(led_seq_no, std::move(c)); //Remove old ledgers that exceeds max sequence range. diff --git a/src/cons/state_handler.cpp b/src/cons/state_handler.cpp deleted file mode 100644 index 40cc7b98..00000000 --- a/src/cons/state_handler.cpp +++ /dev/null @@ -1,361 +0,0 @@ -#include "state_handler.hpp" -#include "../fbschema/p2pmsg_helpers.hpp" -#include "../fbschema/p2pmsg_content_generated.h" -#include "../fbschema/common_helpers.hpp" -#include "../p2p/p2p.hpp" -#include "../pchheader.hpp" -#include "../cons/cons.hpp" -#include "../hplog.hpp" -#include "../util.hpp" - -namespace cons -{ - -// Max number of requests that can be awaiting response at any given time. -constexpr uint16_t MAX_AWAITING_REQUESTS = 1; -// Syncing loop sleep delay. -constexpr uint16_t SYNC_LOOP_WAIT = 100; - -// List of state responses flatbuffer messages to be processed. -std::list candidate_state_responses; - -// List of pending sync requests to be sent out. -std::list pending_requests; - -// List of submitted requests we are awaiting responses for, keyed by expected response hash. -std::unordered_map submitted_requests; - -/** - * Sends a state request to a random peer. - * @param path Requested file or dir path. - * @param is_file Whether the requested path if a file or dir. - * @param block_id The requested block id. Only relevant if requesting a file block. Otherwise -1. - * @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different. - */ -void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash) -{ - p2p::state_request sr; - sr.parent_path = path; - sr.is_file = is_file; - sr.block_id = block_id; - sr.expected_hash = expected_hash; - - flatbuffers::FlatBufferBuilder fbuf(1024); - fbschema::p2pmsg::create_msg_from_state_request(fbuf, sr, ctx.lcl); - p2p::send_message_to_random_peer(fbuf); //todo: send to a node that hold the majority state to improve reliability of retrieving state. -} - -/** - * Creats the reply message for a given state request. - * @param msg The peer outbound message reference to build up the reply message. - * @param sr The state request which should be replied to. - */ -int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr) -{ - // If block_id > -1 this means this is a file block data request. - if (sr.block_id > -1) - { - // Vector to hold the block bytes. Normally block size is constant BLOCK_SIZE (4MB), but the - // last block of a file may have a smaller size. - std::vector block; - - // TODO: get block - - p2p::block_response resp; - resp.path = sr.parent_path; - resp.block_id = sr.block_id; - resp.hash = sr.expected_hash; - resp.data = std::string_view(reinterpret_cast(block.data()), block.size()); - - fbschema::p2pmsg::create_msg_from_block_response(fbuf, resp, ctx.lcl); - } - else - { - // File state request means we have to reply with the file block hash map. - if (sr.is_file) - { - std::vector existing_block_hashmap; - - // TODO: get block hash list - // TODO: get file length - std::size_t file_length = 0; - - fbschema::p2pmsg::create_msg_from_filehashmap_response(fbuf, sr.parent_path, existing_block_hashmap, file_length, sr.expected_hash, ctx.lcl); - } - else - { - // If the state request is for a directory we need to reply with the file system entries and their hashes inside that dir. - std::unordered_map existing_fs_entries; - - // TODO: get fs entry hashes - - fbschema::p2pmsg::create_msg_from_fsentry_response(fbuf, sr.parent_path, existing_fs_entries, sr.expected_hash, ctx.lcl); - } - } - - return 0; -} - -/** - * Initiates state sync process by setting up context variables and sending the initial state request. - * @param state_hash_to_request Peer's expected state hash. If peer doesn't have this as its state hash the - * request will be ignord. - */ -void start_state_sync(const hpfs::h32 state_hash_to_request) -{ - { - std::lock_guard lock(p2p::ctx.collected_msgs.state_response_mutex); - p2p::ctx.collected_msgs.state_response.clear(); - } - - { - candidate_state_responses.clear(); - pending_requests.clear(); - submitted_requests.clear(); - } - - // Send the root state request. - submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, state_hash_to_request}); -} - -/** - * Runs the state sync loop. - */ -int run_state_sync_iterator() -{ - util::mask_signal(); - - while (true) - { - if (ctx.is_shutting_down) - break; - - util::sleep(SYNC_LOOP_WAIT); - - // TODO: Also bypass peer session handler state responses if we're not syncing. - - { - std::lock_guard lock(p2p::ctx.collected_msgs.state_response_mutex); - - // Move collected state responses over to local candidate responses list. - if (!p2p::ctx.collected_msgs.state_response.empty()) - candidate_state_responses.splice(candidate_state_responses.end(), p2p::ctx.collected_msgs.state_response); - } - - for (auto &response : candidate_state_responses) - { - if (ctx.is_shutting_down) - break; - - const fbschema::p2pmsg::Content *content = fbschema::p2pmsg::GetContent(response.data()); - const fbschema::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message(); - - // Check whether we are actually waiting for this response's hash. If not, ignore it. - hpfs::h32 response_hash = fbschema::flatbuff_bytes_to_hash(resp_msg->hash()); - const auto pending_resp_itr = submitted_requests.find(response_hash); - if (pending_resp_itr == submitted_requests.end()) - continue; - - // Now that we have received matching hash, remove it from the waiting list. - submitted_requests.erase(pending_resp_itr); - - // Process the message based on response type. - const fbschema::p2pmsg::State_Response msg_type = resp_msg->state_response_type(); - - if (msg_type == fbschema::p2pmsg::State_Response_Fs_Entry_Response) - { - if (handle_fs_entry_response(resp_msg->state_response_as_Fs_Entry_Response()) == -1) - return -1; - } - else if (msg_type == fbschema::p2pmsg::State_Response_File_HashMap_Response) - { - if (handle_file_hashmap_response(resp_msg->state_response_as_File_HashMap_Response()) == -1) - return -1; - } - else if (msg_type == fbschema::p2pmsg::State_Response_Block_Response) - { - if (handle_file_block_response(resp_msg->state_response_as_Block_Response()) == -1) - return -1; - } - } - - candidate_state_responses.clear(); - - // Check for long-awaited responses and re-request them. - for (auto &[hash, request] : submitted_requests) - { - if (ctx.is_shutting_down) - break; - - // We wait for half of round time before each request is resubmitted. - if (request.waiting_cycles < (conf::cfg.roundtime / (SYNC_LOOP_WAIT * 2))) - { - // Increment counter. - request.waiting_cycles++; - } - else - { - // Reset the counter and re-submit request. - request.waiting_cycles = 0; - LOG_DBG << "Resubmitting state request..."; - submit_request(request); - } - } - - // Check whether we can submit any more requests. - if (!pending_requests.empty() && submitted_requests.size() < MAX_AWAITING_REQUESTS) - { - const uint16_t available_slots = MAX_AWAITING_REQUESTS - submitted_requests.size(); - for (int i = 0; i < available_slots && !pending_requests.empty(); i++) - { - if (ctx.is_shutting_down) - break; - - const backlog_item &request = pending_requests.front(); - submit_request(request); - pending_requests.pop_front(); - } - } - } - - return 0; -} - -/** - * Submits a pending state request to the peer. - */ -void submit_request(const backlog_item &request) -{ - LOG_DBG << "Submitting state request. type:" << request.type << " path:" << request.path << " block_id:" << request.block_id; - - submitted_requests.try_emplace(request.expected_hash, request); - - const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; - request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash); -} - -/** - * Process state file system entry response for a directory. - */ -int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp) -{ - std::unordered_map state_fs_entry_list; - fbschema::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(state_fs_entry_list, fs_entry_resp->entries()); - - std::unordered_map existing_fs_entries; - std::string_view root_path_sv = fbschema::flatbuff_str_to_sv(fs_entry_resp->path()); - std::string root_path_str(root_path_sv.data(), root_path_sv.size()); - - // TODO: Create state path dir if not exist. - // TODO: Get existing fs entries hash map. - // if (!statefs::is_dir_exists(root_path_str)) - // { - // statefs::create_dir(root_path_str); - // } - // else - // { - // if (statefs::get_fs_entry_hashes(existing_fs_entries, std::move(root_path_str), hpfs::h32_empty) == -1) - // return -1; - // } - - // Request more info on fs entries that exist on both sides but are different. - for (const auto &[path, fs_entry] : existing_fs_entries) - { - const auto fs_itr = state_fs_entry_list.find(path); - if (fs_itr != state_fs_entry_list.end()) - { - if (fs_itr->second.hash != fs_entry.hash) - { - if (fs_entry.is_file) - pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, path, -1, fs_itr->second.hash}); - else - pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, path, -1, fs_itr->second.hash}); - } - - state_fs_entry_list.erase(fs_itr); - } - else - { - // If there was an entry that does not exist on other side, delete it from this node. - if (fs_entry.is_file) - { - //if (statefs::delete_file(path) == -1) - // return -1; - } - else - { - //if (statefs::delete_dir(path) == -1) - // return -1; - } - } - } - - // Queue the remaining fs entries (that this node does not have at all) to request. - for (const auto &[path, fs_entry] : state_fs_entry_list) - { - if (fs_entry.is_file) - pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, path, -1, fs_entry.hash}); - else - pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, path, -1, fs_entry.hash}); - } - - return 0; -} - -int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp) -{ - std::string_view path_sv = fbschema::flatbuff_str_to_sv(file_resp->path()); - const std::string path_str(path_sv.data(), path_sv.size()); - - std::vector existing_block_hashmap; - //if (statefs::get_block_hash_map(existing_block_hashmap, path_str, hpfs::h32_empty) == -1) - // return -1; - - const hpfs::h32 *existing_hashes = reinterpret_cast(existing_block_hashmap.data()); - auto existing_hash_count = existing_block_hashmap.size() / sizeof(hpfs::h32); - - const hpfs::h32 *resp_hashes = reinterpret_cast(file_resp->hash_map()->data()); - auto resp_hash_count = file_resp->hash_map()->size() / sizeof(hpfs::h32); - - auto insert_itr = pending_requests.begin(); - - for (int block_id = 0; block_id < existing_hash_count; ++block_id) - { - if (block_id >= resp_hash_count) - break; - - if (existing_hashes[block_id] != resp_hashes[block_id]) - { - // Insert at front to give priority to block requests while preserving block order. - pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, path_str, block_id, resp_hashes[block_id]}); - } - } - - if (existing_hash_count > resp_hash_count) - { - //if (statefs::truncate_file(path_str, file_resp->file_length()) == -1) - // return -1; - } - else if (existing_hash_count < resp_hash_count) - { - for (int block_id = existing_hash_count; block_id < resp_hash_count; ++block_id) - { - // Insert at front to give priority to block requests while preserving block order. - pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, path_str, block_id, resp_hashes[block_id]}); - } - } - - return 0; -} - -int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg) -{ - p2p::block_response block_resp = fbschema::p2pmsg::create_block_response_from_msg(*block_msg); - - //if (statefs::write_block(block_resp.path, block_resp.block_id, block_resp.data.data(), block_resp.data.size()) == -1) - // return -1; - - return 0; -} - -} // namespace cons \ No newline at end of file diff --git a/src/cons/state_handler.hpp b/src/cons/state_handler.hpp deleted file mode 100644 index 47963fd3..00000000 --- a/src/cons/state_handler.hpp +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef _HP_CONS_STATE_HANDLER_ -#define _HP_CONS_STATE_HANDLER_ - -#include "../pchheader.hpp" -#include "../p2p/p2p.hpp" -#include "../fbschema/p2pmsg_content_generated.h" -#include "../hpfs/h32.hpp" - -namespace cons -{ - -enum BACKLOG_ITEM_TYPE -{ - DIR = 0, - FILE = 1, - BLOCK = 2 -}; - -// Represents a queued up state sync operation which needs to be performed. -struct backlog_item -{ - BACKLOG_ITEM_TYPE type; - std::string path; - int32_t block_id = -1; // Only relevant if type=BLOCK - hpfs::h32 expected_hash; - - // No. of cycles that this item has been waiting in pending state. - // Used by pending_responses list to increase wait count. - int16_t waiting_cycles = 0; -}; - -extern std::list candidate_state_responses; - -int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr); - -void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash); - -void start_state_sync(const hpfs::h32 state_hash_to_request); - -int run_state_sync_iterator(); - -void submit_request(const backlog_item &request); - -int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp); - -int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp); - -int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg); - -} // namespace cons - -#endif \ No newline at end of file diff --git a/src/fbschema/ledger_helpers.cpp b/src/fbschema/ledger_helpers.cpp index 90f814c3..08206df1 100644 --- a/src/fbschema/ledger_helpers.cpp +++ b/src/fbschema/ledger_helpers.cpp @@ -19,7 +19,7 @@ const std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilde seq_no, p.time, sv_to_flatbuff_bytes(builder, p.lcl), - sv_to_flatbuff_bytes(builder, p.curr_state_hash.to_string_view()), + sv_to_flatbuff_bytes(builder, p.state.to_string_view()), stringlist_to_flatbuf_bytearrayvector(builder, p.users), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index ba169f34..bebe1142 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -1,4 +1,6 @@ -//IDL file for p2p message content schema. +// IDL file for p2p message content schema. +// flatc -o src/fbschema/ --gen-mutable --cpp src/fbschema/p2pmsg_content.fbs + include "common_schema.fbs"; namespace fbschema.p2pmsg; @@ -37,7 +39,7 @@ table Proposal_Message { //Proposal type message schema users:[ByteArray]; hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) - curr_state_hash: [ubyte]; + state: [ubyte]; } table Npl_Message { //NPL type message schema @@ -104,7 +106,7 @@ table Block_Response{ } table State_FS_Hash_Entry{ - path: string; + name: string; is_file: bool; hash: [ubyte]; } diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index 0df19a81..ba4e6f3d 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -700,7 +700,7 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_USERS = 8, VT_HASH_INPUTS = 10, VT_HASH_OUTPUTS = 12, - VT_CURR_STATE_HASH = 14 + VT_STATE = 14 }; uint8_t stage() const { return GetField(VT_STAGE, 0); @@ -732,11 +732,11 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector> *mutable_hash_outputs() { return GetPointer> *>(VT_HASH_OUTPUTS); } - const flatbuffers::Vector *curr_state_hash() const { - return GetPointer *>(VT_CURR_STATE_HASH); + const flatbuffers::Vector *state() const { + return GetPointer *>(VT_STATE); } - flatbuffers::Vector *mutable_curr_state_hash() { - return GetPointer *>(VT_CURR_STATE_HASH); + flatbuffers::Vector *mutable_state() { + return GetPointer *>(VT_STATE); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && @@ -751,8 +751,8 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyOffset(verifier, VT_HASH_OUTPUTS) && verifier.VerifyVector(hash_outputs()) && verifier.VerifyVectorOfTables(hash_outputs()) && - VerifyOffset(verifier, VT_CURR_STATE_HASH) && - verifier.VerifyVector(curr_state_hash()) && + VerifyOffset(verifier, VT_STATE) && + verifier.VerifyVector(state()) && verifier.EndTable(); } }; @@ -776,8 +776,8 @@ struct Proposal_MessageBuilder { void add_hash_outputs(flatbuffers::Offset>> hash_outputs) { fbb_.AddOffset(Proposal_Message::VT_HASH_OUTPUTS, hash_outputs); } - void add_curr_state_hash(flatbuffers::Offset> curr_state_hash) { - fbb_.AddOffset(Proposal_Message::VT_CURR_STATE_HASH, curr_state_hash); + void add_state(flatbuffers::Offset> state) { + fbb_.AddOffset(Proposal_Message::VT_STATE, state); } explicit Proposal_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { @@ -797,10 +797,10 @@ inline flatbuffers::Offset CreateProposal_Message( flatbuffers::Offset>> users = 0, flatbuffers::Offset>> hash_inputs = 0, flatbuffers::Offset>> hash_outputs = 0, - flatbuffers::Offset> curr_state_hash = 0) { + flatbuffers::Offset> state = 0) { Proposal_MessageBuilder builder_(_fbb); builder_.add_time(time); - builder_.add_curr_state_hash(curr_state_hash); + builder_.add_state(state); builder_.add_hash_outputs(hash_outputs); builder_.add_hash_inputs(hash_inputs); builder_.add_users(users); @@ -815,11 +815,11 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( const std::vector> *users = nullptr, const std::vector> *hash_inputs = nullptr, const std::vector> *hash_outputs = nullptr, - const std::vector *curr_state_hash = nullptr) { + const std::vector *state = nullptr) { auto users__ = users ? _fbb.CreateVector>(*users) : 0; auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; - auto curr_state_hash__ = curr_state_hash ? _fbb.CreateVector(*curr_state_hash) : 0; + auto state__ = state ? _fbb.CreateVector(*state) : 0; return fbschema::p2pmsg::CreateProposal_Message( _fbb, stage, @@ -827,7 +827,7 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( users__, hash_inputs__, hash_outputs__, - curr_state_hash__); + state__); } struct Npl_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { @@ -1623,15 +1623,15 @@ inline flatbuffers::Offset CreateBlock_ResponseDirect( struct State_FS_Hash_Entry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef State_FS_Hash_EntryBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PATH = 4, + VT_NAME = 4, VT_IS_FILE = 6, VT_HASH = 8 }; - const flatbuffers::String *path() const { - return GetPointer(VT_PATH); + const flatbuffers::String *name() const { + return GetPointer(VT_NAME); } - flatbuffers::String *mutable_path() { - return GetPointer(VT_PATH); + flatbuffers::String *mutable_name() { + return GetPointer(VT_NAME); } bool is_file() const { return GetField(VT_IS_FILE, 0) != 0; @@ -1647,8 +1647,8 @@ struct State_FS_Hash_Entry FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PATH) && - verifier.VerifyString(path()) && + VerifyOffset(verifier, VT_NAME) && + verifier.VerifyString(name()) && VerifyField(verifier, VT_IS_FILE) && VerifyOffset(verifier, VT_HASH) && verifier.VerifyVector(hash()) && @@ -1660,8 +1660,8 @@ struct State_FS_Hash_EntryBuilder { typedef State_FS_Hash_Entry Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_path(flatbuffers::Offset path) { - fbb_.AddOffset(State_FS_Hash_Entry::VT_PATH, path); + void add_name(flatbuffers::Offset name) { + fbb_.AddOffset(State_FS_Hash_Entry::VT_NAME, name); } void add_is_file(bool is_file) { fbb_.AddElement(State_FS_Hash_Entry::VT_IS_FILE, static_cast(is_file), 0); @@ -1682,26 +1682,26 @@ struct State_FS_Hash_EntryBuilder { inline flatbuffers::Offset CreateState_FS_Hash_Entry( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset path = 0, + flatbuffers::Offset name = 0, bool is_file = false, flatbuffers::Offset> hash = 0) { State_FS_Hash_EntryBuilder builder_(_fbb); builder_.add_hash(hash); - builder_.add_path(path); + builder_.add_name(name); builder_.add_is_file(is_file); return builder_.Finish(); } inline flatbuffers::Offset CreateState_FS_Hash_EntryDirect( flatbuffers::FlatBufferBuilder &_fbb, - const char *path = nullptr, + const char *name = nullptr, bool is_file = false, const std::vector *hash = nullptr) { - auto path__ = path ? _fbb.CreateString(path) : 0; + auto name__ = name ? _fbb.CreateString(name) : 0; auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; return fbschema::p2pmsg::CreateState_FS_Hash_Entry( _fbb, - path__, + name__, is_file, hash__); } diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index e08f8cbc..a5fc3004 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -4,6 +4,8 @@ #include "../util.hpp" #include "../hplog.hpp" #include "../p2p/p2p.hpp" +#include "../hpfs/h32.hpp" +#include "../hpfs/hpfs.hpp" #include "p2pmsg_container_generated.h" #include "p2pmsg_content_generated.h" #include "common_helpers.hpp" @@ -222,7 +224,7 @@ namespace fbschema::p2pmsg p.time = msg.time(); p.stage = msg.stage(); p.lcl = flatbuff_bytes_to_sv(lcl); - p.curr_state_hash = flatbuff_bytes_to_sv(msg.curr_state_hash()); + p.state = flatbuff_bytes_to_sv(msg.state()); if (msg.users()) p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); @@ -271,21 +273,6 @@ namespace fbschema::p2pmsg return sr; } - /** - * Creates a block response struct from the given block response message. - * @param msg Flatbuffer block response message received from the peer. - * @return A Block response struct representing the message. - */ - const p2p::block_response create_block_response_from_msg(const Block_Response &msg) - { - p2p::block_response br; - - br.path = flatbuff_str_to_sv(msg.path()); - br.block_id = msg.block_id(); - br.data = flatbuff_bytes_to_sv(msg.data()); - return br; - } - //---Message creation helpers---// /** @@ -372,7 +359,7 @@ namespace fbschema::p2pmsg stringlist_to_flatbuf_bytearrayvector(builder, p.users), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs), - sv_to_flatbuff_bytes(builder, p.curr_state_hash.to_string_view())); + sv_to_flatbuff_bytes(builder, p.state.to_string_view())); const flatbuffers::Offset message = CreateContent(builder, Message_Proposal_Message, proposal.Union()); builder.Finish(message); // Finished building message content to get serialised content. @@ -480,11 +467,13 @@ namespace fbschema::p2pmsg * Create content response message from the given content response. * @param container_builder Flatbuffer builder for the container message. * @param path The path of the directory. - * @param fs_entries File or directory entries in the given parent path. + * @param hash_nodes File or directory entries with hashes in the given parent path. * @param expected_hash The exptected hash of the requested path. * @param lcl Lcl to be include in the container msg. */ - void create_msg_from_fsentry_response(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, std::unordered_map &fs_entries, hpfs::h32 expected_hash, std::string_view lcl) + void create_msg_from_fsentry_response( + flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, + std::vector &hash_nodes, hpfs::h32 expected_hash, std::string_view lcl) { flatbuffers::FlatBufferBuilder builder(1024); @@ -492,7 +481,7 @@ namespace fbschema::p2pmsg CreateFs_Entry_Response( builder, sv_to_flatbuff_str(builder, path), - statefshashentry_to_flatbuff_statefshashentry(builder, fs_entries)); + statefshashentry_to_flatbuff_statefshashentry(builder, hash_nodes)); const flatbuffers::Offset st_resp = CreateState_Response_Message( builder, State_Response_Fs_Entry_Response, @@ -514,12 +503,14 @@ namespace fbschema::p2pmsg * @param hashmap Hashmap of the file * @param lcl Lcl to be include in the container msg. */ - void create_msg_from_filehashmap_response(flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, std::vector &hashmap, std::size_t file_length, hpfs::h32 expected_hash, std::string_view lcl) + void create_msg_from_filehashmap_response( + flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, + std::vector &hashmap, std::size_t file_length, hpfs::h32 expected_hash, std::string_view lcl) { // todo:get a average propsal message size and allocate content builder based on that. flatbuffers::FlatBufferBuilder builder(1024); - std::string_view hashmap_sv(reinterpret_cast(hashmap.data()), hashmap.size()); + std::string_view hashmap_sv(reinterpret_cast(hashmap.data()), hashmap.size() * sizeof(hpfs::h32)); const flatbuffers::Offset resp = CreateFile_HashMap_Response( @@ -574,21 +565,6 @@ namespace fbschema::p2pmsg create_containermsg_from_content(container_builder, builder, lcl, true); } - void create_msg_from_state_error_response(flatbuffers::FlatBufferBuilder &container_builder, std::string_view lcl) - { - // todo:get a average propsal message size and allocate content builder based on that. - flatbuffers::FlatBufferBuilder builder(1024); - - const flatbuffers::Offset st_resp = CreateState_Response_Message(builder, State_Response_NONE, 0, true); - - flatbuffers::Offset message = CreateContent(builder, Message_State_Response_Message, st_resp.Union()); - builder.Finish(message); // Finished building message content to get serialised content. - - // Now that we have built the content message, - // we need to sign it and place it inside a container message. - create_containermsg_from_content(container_builder, builder, lcl, true); - } - /** * Creates a Flatbuffer container message from the given Content message. * @param container_builder The Flatbuffer builder to which the final container message should be written to. @@ -728,29 +704,31 @@ namespace fbschema::p2pmsg void flatbuf_statefshashentry_to_statefshashentry(std::unordered_map &fs_entries, const flatbuffers::Vector> *fhashes) { - for (const State_FS_Hash_Entry *f_hash : *fhashes) { - p2p::state_fs_hash_entry h; - - h.is_file = f_hash->is_file(); - h.hash = flatbuff_bytes_to_hash(f_hash->hash()); - fs_entries.emplace(flatbuff_str_to_sv(f_hash->path()), std::move(h)); + p2p::state_fs_hash_entry entry; + entry.name = flatbuff_str_to_sv(f_hash->name()); + entry.is_file = f_hash->is_file(); + entry.hash = flatbuff_bytes_to_hash(f_hash->hash()); + + fs_entries.emplace(entry.name, std::move(entry)); } } flatbuffers::Offset>> - statefshashentry_to_flatbuff_statefshashentry(flatbuffers::FlatBufferBuilder &builder, std::unordered_map &fs_entries) + statefshashentry_to_flatbuff_statefshashentry( + flatbuffers::FlatBufferBuilder &builder, + std::vector &hash_nodes) { std::vector> fbvec; - fbvec.reserve(fs_entries.size()); - for (auto const &[path, fs_entry] : fs_entries) + fbvec.reserve(hash_nodes.size()); + for (auto const &hash_node : hash_nodes) { flatbuffers::Offset state_fs_entry = CreateState_FS_Hash_Entry( builder, - sv_to_flatbuff_str(builder, path), - fs_entry.is_file, - hash_to_flatbuff_bytes(builder, fs_entry.hash)); + sv_to_flatbuff_str(builder, hash_node.name), + hash_node.is_file, + hash_to_flatbuff_bytes(builder, hash_node.hash)); fbvec.push_back(state_fs_entry); } diff --git a/src/fbschema/p2pmsg_helpers.hpp b/src/fbschema/p2pmsg_helpers.hpp index 9e789418..d2b9ffd4 100644 --- a/src/fbschema/p2pmsg_helpers.hpp +++ b/src/fbschema/p2pmsg_helpers.hpp @@ -6,90 +6,94 @@ #include "p2pmsg_content_generated.h" #include "../p2p/p2p.hpp" #include "../hpfs/h32.hpp" +#include "../hpfs/hpfs.hpp" namespace fbschema::p2pmsg { -/** + /** * This section contains Flatbuffer p2p message reading/writing helpers. */ -//---Message validation helpers---/ + //---Message validation helpers---/ -int validate_and_extract_container(const Container **container_ref, std::string_view container_buf); + int validate_and_extract_container(const Container **container_ref, std::string_view container_buf); -int validate_container_trust(const Container *container); + int validate_container_trust(const Container *container); -int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, const flatbuffers::uoffset_t content_size); + int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, const flatbuffers::uoffset_t content_size); -//---Message reading helpers---/ + //---Message reading helpers---/ -const std::string_view get_peer_challenge_from_msg(const Peer_Challenge_Message &msg); + const std::string_view get_peer_challenge_from_msg(const Peer_Challenge_Message &msg); -const p2p::peer_challenge_response create_peer_challenge_response_from_msg(const Peer_Challenge_Response_Message &msg, const flatbuffers::Vector *pubkey); + const p2p::peer_challenge_response create_peer_challenge_response_from_msg(const Peer_Challenge_Response_Message &msg, const flatbuffers::Vector *pubkey); -const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, const uint64_t timestamp); + const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, const uint64_t timestamp); -const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp, const flatbuffers::Vector *lcl); + const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp, const flatbuffers::Vector *lcl); -const p2p::history_request create_history_request_from_msg(const History_Request_Message &msg); + const p2p::history_request create_history_request_from_msg(const History_Request_Message &msg); -const p2p::history_response create_history_response_from_msg(const History_Response_Message &msg); + const p2p::history_response create_history_response_from_msg(const History_Response_Message &msg); -const p2p::state_request create_state_request_from_msg(const State_Request_Message &msg); + const p2p::state_request create_state_request_from_msg(const State_Request_Message &msg); -const p2p::block_response create_block_response_from_msg(const Block_Response &msg); + //---Message creation helpers---// + void create_peer_challenge_response_from_challenge(flatbuffers::FlatBufferBuilder &container_builder, const std::string &challenge); -//---Message creation helpers---// -void create_peer_challenge_response_from_challenge(flatbuffers::FlatBufferBuilder &container_builder, const std::string &challenge); + void create_msg_from_peer_challenge(flatbuffers::FlatBufferBuilder &container_builder, std::string &challenge); -void create_msg_from_peer_challenge(flatbuffers::FlatBufferBuilder &container_builder, std::string &challenge); + void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::nonunl_proposal &nup); -void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::nonunl_proposal &nup); + void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::proposal &p); -void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::proposal &p); + void create_msg_from_history_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_request &hr); -void create_msg_from_history_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_request &hr); + void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_response &hr); -void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_response &hr); + void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &npl, std::string_view lcl); -void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &npl, std::string_view lcl); + void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::state_request &hr, std::string_view lcl); -void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::state_request &hr, std::string_view lcl); + void create_msg_from_fsentry_response( + flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, + std::vector &hash_nodes, hpfs::h32 expected_hash, std::string_view lcl); -void create_msg_from_fsentry_response(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, - std::unordered_map &fs_entries, hpfs::h32 expected_hash, std::string_view lcl); + void create_msg_from_filehashmap_response( + flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, + std::vector &hashmap, std::size_t file_length, hpfs::h32 expected_hash, std::string_view lcl); -void create_msg_from_filehashmap_response(flatbuffers::FlatBufferBuilder &container_builder, std::string_view path, std::vector &hashmap, std::size_t file_length, hpfs::h32 expected_hash, std::string_view lcl); + void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, std::string_view lcl); -void create_msg_from_block_response(flatbuffers::FlatBufferBuilder &container_builder, p2p::block_response &block_resp, std::string_view lcl); + void create_containermsg_from_content( + flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign); -void create_containermsg_from_content( - flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign); + //---Conversion helpers from flatbuffers data types to std data types---// -//---Conversion helpers from flatbuffers data types to std data types---// + const std::unordered_map> + flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vector> *fbvec); -const std::unordered_map> -flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vector> *fbvec); + //---Conversion helpers from std data types to flatbuffers data types---// -//---Conversion helpers from std data types to flatbuffers data types---// + const flatbuffers::Offset>> + usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); -const flatbuffers::Offset>> -usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); + const std::map + flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec); -const std::map -flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec); + const flatbuffers::Offset>> + historyledgermap_to_flatbuf_historyledgermap(flatbuffers::FlatBufferBuilder &builder, const std::map &map); -const flatbuffers::Offset>> -historyledgermap_to_flatbuf_historyledgermap(flatbuffers::FlatBufferBuilder &builder, const std::map &map); + void flatbuf_statefshashentry_to_statefshashentry(std::unordered_map &fs_entries, + const flatbuffers::Vector> *fhashes); -void flatbuf_statefshashentry_to_statefshashentry(std::unordered_map &fs_entries, - const flatbuffers::Vector> *fhashes); + void statefilehash_to_flatbuf_statefilehash(flatbuffers::FlatBufferBuilder &builder, std::vector> &list, + std::string_view full_path, bool is_file, std::string_view hash); -void statefilehash_to_flatbuf_statefilehash(flatbuffers::FlatBufferBuilder &builder, std::vector> &list, - std::string_view full_path, bool is_file, std::string_view hash); - -flatbuffers::Offset>> -statefshashentry_to_flatbuff_statefshashentry(flatbuffers::FlatBufferBuilder &builder, std::unordered_map &fs_entries); + flatbuffers::Offset>> + statefshashentry_to_flatbuff_statefshashentry( + flatbuffers::FlatBufferBuilder &builder, + std::vector &hash_nodes); } // namespace fbschema::p2pmsg diff --git a/src/hpfs/hpfs.cpp b/src/hpfs/hpfs.cpp index 20752804..63ba560e 100644 --- a/src/hpfs/hpfs.cpp +++ b/src/hpfs/hpfs.cpp @@ -48,6 +48,8 @@ namespace hpfs else if (pid == 0) { // hpfs process. + util::unmask_signal(); + // Fill process args. char *execv_args[] = { conf::ctx.hpfs_exe_path.data(), @@ -77,12 +79,17 @@ namespace hpfs { // HotPocket process. - // If the mound dir is not specified, assign a mount dir based on hpfs process id. + // If the mount dir is not specified, assign a mount dir based on hpfs process id. if (mount_dir.empty()) mount_dir = std::string(conf::ctx.state_dir) .append("/") .append(std::to_string(pid)); + // The path used for checking whether hpfs has finished initializing. + const std::string check_path = hash_map_enabled + ? std::string(mount_dir).append("/::hpfs.hmap.hash") + : mount_dir; + // Wait until hpfs is initialized properly. bool hpfs_initialized = false; uint8_t retry_count = 0; @@ -94,9 +101,11 @@ namespace hpfs if (kill(pid, 0) == -1) break; - // If hpfs is initialized, the inode no. of the mounted root dir is always 1. + // If hash map is enabled we check whether stat succeeds on the root hash. + // If not, we check whether the inode no. of the mounted root dir is 1. struct stat st; - hpfs_initialized = (stat(mount_dir.c_str(), &st) == 0 && st.st_ino == 1); + hpfs_initialized = (stat(check_path.c_str(), &st) == 0 && + (hash_map_enabled || st.st_ino == 1)); } while (!hpfs_initialized && ++retry_count < 100); @@ -113,8 +122,9 @@ namespace hpfs else if (pid == 0) { // hpfs process. + util::unmask_signal(); - // If the mound dir is not specified, assign a mount dir based on hpfs process id. + // If the mount dir is not specified, assign a mount dir based on hpfs process id. const pid_t self_pid = getpid(); if (mount_dir.empty()) mount_dir = std::string(conf::ctx.state_dir) @@ -127,7 +137,7 @@ namespace hpfs (char *)mode, // hpfs mode: rw | ro conf::ctx.state_dir.data(), mount_dir.data(), - (char *)(hash_map_enabled ? "hmap=true" : "hmap-false"), + (char *)(hash_map_enabled ? "hmap=true" : "hmap=false"), NULL}; const int ret = execv(execv_args[0], execv_args); @@ -143,19 +153,6 @@ namespace hpfs return 0; } - int get_root_hash(h32 &hash) - { - pid_t pid; - std::string mount_dir; - if (start_fs_session(pid, mount_dir, "ro", true) == -1) - return -1; - - int res = get_hash(hash, mount_dir, "/"); - util::kill_process(pid, true); - - return res; - } - int get_hash(h32 &hash, const std::string_view mount_dir, const std::string_view vpath) { std::string path = std::string(mount_dir).append(vpath).append("::hpfs.hmap.hash"); @@ -175,4 +172,63 @@ namespace hpfs return 0; } + int get_file_block_hashes(std::vector &hashes, const std::string_view mount_dir, const std::string_view vpath) + { + std::string path = std::string(mount_dir).append(vpath).append("::hpfs.hmap.children"); + int fd = open(path.c_str(), O_RDONLY); + if (fd == -1) + return -1; + + struct stat st; + if (fstat(fd, &st) == -1) + { + close(fd); + LOG_ERR << errno << ": Error reading block hashes length."; + return -1; + } + + const int children_count = st.st_size / sizeof(h32); + hashes.resize(children_count); + + int res = read(fd, hashes.data(), st.st_size); + close(fd); + if (res == -1) + { + LOG_ERR << errno << ": Error reading hash block hashes."; + return -1; + } + return 0; + } + + int get_dir_children_hashes(std::vector &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath) + { + std::string path = std::string(mount_dir).append(dir_vpath).append("::hpfs.hmap.children"); + int fd = open(path.c_str(), O_RDONLY); + if (fd == -1) + { + LOG_ERR << errno << ": Error opening hash children nodes."; + return -1; + } + + struct stat st; + if (fstat(fd, &st) == -1) + { + close(fd); + LOG_ERR << errno << ": Error reading hash children nodes length."; + return -1; + } + + const int children_count = st.st_size / sizeof(child_hash_node); + hash_nodes.resize(children_count); + + int res = read(fd, hash_nodes.data(), st.st_size); + close(fd); + if (res == -1) + { + LOG_ERR << errno << ": Error reading hash children nodes."; + return -1; + } + return 0; + } + } // namespace hpfs \ No newline at end of file diff --git a/src/hpfs/hpfs.hpp b/src/hpfs/hpfs.hpp index 41361b84..780624b1 100644 --- a/src/hpfs/hpfs.hpp +++ b/src/hpfs/hpfs.hpp @@ -6,13 +6,21 @@ namespace hpfs { + struct child_hash_node + { + bool is_file; + char name[256]; + h32 hash; + }; + int init(); void deinit(); int start_merge_process(); int start_fs_session(pid_t &session_pid, std::string &mount_dir, const char *mode, const bool hash_map_enabled); - int get_root_hash(h32 &hash); int get_hash(h32 &hash, const std::string_view mount_dir, const std::string_view vpath); + int get_file_block_hashes(std::vector &hashes, const std::string_view mount_dir, const std::string_view vpath); + int get_dir_children_hashes(std::vector &hash_nodes, const std::string_view mount_dir, const std::string_view dir_vpath); } // namespace hpfs #endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 83140166..396c82c1 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -12,6 +12,7 @@ #include "p2p/p2p.hpp" #include "cons/cons.hpp" #include "hpfs/hpfs.hpp" +#include "state/state_sync.hpp" /** * Parses CLI args and extracts hot pocket command and parameters given. @@ -64,10 +65,11 @@ int parse_cmd(int argc, char **argv) */ void deinit() { - usr::deinit(); - p2p::deinit(); cons::deinit(); sc::deinit(); + state_sync::deinit(); + usr::deinit(); + p2p::deinit(); hpfs::deinit(); hplog::deinit(); } @@ -188,7 +190,8 @@ int main(int argc, char **argv) LOG_INFO << "Operating mode: " << (conf::cfg.startup_mode == conf::OPERATING_MODE::OBSERVER ? "Observer" : "Proposer"); - if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || cons::init() != 0) + if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || + state_sync::init() != 0 || cons::init() != 0) { deinit(); return -1; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index e11af096..3787ff31 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -89,6 +89,7 @@ namespace p2p if (iter == p2p::ctx.peer_connections.end()) { // Add the new connection straight away, if we haven't seen it before. + session.is_self = (res == 0); session.uniqueid.swap(pubkeyhex); session.challenge_status = comm::CHALLENGE_VERIFIED; p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); @@ -198,12 +199,12 @@ namespace p2p const size_t connected_peers = ctx.peer_connections.size(); if (connected_peers == 0) { - LOG_DBG << "No peers to send (not even self)."; + LOG_DBG << "No peers to random send."; return; } else if (connected_peers == 1 && ctx.peer_connections.begin()->second->is_self) { - LOG_DBG << "Only self is connected."; + LOG_DBG << "Only self is connected. Cannot random send."; return; } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 81ba3f09..48b31bef 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -20,7 +20,7 @@ struct proposal uint64_t time; uint8_t stage; std::string lcl; - hpfs::h32 curr_state_hash; + hpfs::h32 state; std::set users; std::set hash_inputs; std::set hash_outputs; @@ -81,8 +81,9 @@ struct state_request // Represents state file system entry. struct state_fs_hash_entry { + std::string name; // Name of the file/dir. bool is_file; // Whether this is a file or dir. - hpfs::h32 hash; // Hash of the file or dir. + hpfs::h32 hash; // Hash of the file or dir. }; // Represents a file block data resposne. diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 8fe9f4f9..c3def4d2 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -12,7 +12,8 @@ #include "p2p.hpp" #include "peer_session_handler.hpp" #include "../cons/ledger_handler.hpp" -#include "../cons/state_handler.hpp" +#include "../state/state_sync.hpp" +#include "../state/state_serve.hpp" #include "../cons/cons.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -20,191 +21,195 @@ namespace p2pmsg = fbschema::p2pmsg; namespace p2p { -// The set of recent peer message hashes used for duplicate detection. -util::rollover_hashset recent_peermsg_hashes(200); + // The set of recent peer message hashes used for duplicate detection. + util::rollover_hashset recent_peermsg_hashes(200); -/** + /** * This gets hit every time a peer connects to HP via the peer port (configured in contract config). */ -int peer_session_handler::on_connect(comm::comm_session &session) const -{ - if (session.is_inbound) + int peer_session_handler::on_connect(comm::comm_session &session) const { - // Limit max number of inbound connections. - if (conf::cfg.peermaxcons > 0 && ctx.peer_connections.size() >= conf::cfg.peermaxcons) + if (session.is_inbound) { - LOG_DBG << "Max peer connections reached. Dropped connection " << session.uniqueid; - return -1; + // Limit max number of inbound connections. + if (conf::cfg.peermaxcons > 0 && ctx.peer_connections.size() >= conf::cfg.peermaxcons) + { + LOG_DBG << "Max peer connections reached. Dropped connection " << session.uniqueid; + return -1; + } } - } - // Send peer challenge. - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge); - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - session.send(msg); - session.challenge_status = comm::CHALLENGE_ISSUED; - return 0; -} - -//peer session on message callback method -//validate and handle each type of peer messages. -int peer_session_handler::on_message(comm::comm_session &session, std::string_view message) const -{ - const p2pmsg::Container *container; - if (p2pmsg::validate_and_extract_container(&container, message) != 0) - return 0; - - //Get serialised message content. - const flatbuffers::Vector *container_content = container->content(); - - //Accessing message content and size. - const uint8_t *content_ptr = container_content->Data(); - const flatbuffers::uoffset_t content_size = container_content->size(); - - const p2pmsg::Content *content; - if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0) - return 0; - - if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); - LOG_DBG << "Duplicate peer message. " << session.uniqueid; - return 0; - } - - const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc - - if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement - { - // Sending the challenge response to the respected peer. - const std::string challenge = std::string(p2pmsg::get_peer_challenge_from_msg(*content->message_as_Peer_Challenge_Message())); + // Send peer challenge. flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_peer_challenge_response_from_challenge(fbuf, challenge); + p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge); std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - return session.send(msg); - } - - if (content_message_type == p2pmsg::Message_Peer_Challenge_Response_Message) // message is a peer challenge response - { - // Ignore if challenge is already resolved. - if (session.challenge_status == comm::CHALLENGE_ISSUED) - { - const p2p::peer_challenge_response challenge_resp = p2pmsg::create_peer_challenge_response_from_msg(*content->message_as_Peer_Challenge_Response_Message(), container->pubkey()); - return p2p::resolve_peer_challenge(session, challenge_resp); - } - } - - if (session.challenge_status != comm::CHALLENGE_VERIFIED) - { - LOG_DBG << "Cannot accept messages. Peer challenge unresolved. " << session.uniqueid; + session.send(msg); + session.challenge_status = comm::CHALLENGE_ISSUED; return 0; } - if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message + //peer session on message callback method + //validate and handle each type of peer messages. + int peer_session_handler::on_message(comm::comm_session &session, std::string_view message) const { - // We only trust proposals coming from trusted peers. - if (p2pmsg::validate_container_trust(container) != 0) + const p2pmsg::Container *container; + if (p2pmsg::validate_and_extract_container(&container, message) != 0) + return 0; + + //Get serialised message content. + const flatbuffers::Vector *container_content = container->content(); + + //Accessing message content and size. + const uint8_t *content_ptr = container_content->Data(); + const flatbuffers::uoffset_t content_size = container_content->size(); + + const p2pmsg::Content *content; + if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0) + return 0; + + if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DBG << "Proposal rejected due to trust failure. " << session.uniqueid; + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); + LOG_DBG << "Duplicate peer message. " << session.uniqueid; return 0; } - std::lock_guard lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. + const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc - ctx.collected_msgs.proposals.push_back( - p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl())); - } - else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message - { - std::lock_guard lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. - - ctx.collected_msgs.nonunl_proposals.push_back( - p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); - } - else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message - { - if (p2pmsg::validate_container_trust(container) != 0) - { - LOG_DBG << "NPL message rejected due to trust failure. " << session.uniqueid; - return 0; - } - - std::lock_guard lock(ctx.collected_msgs.npl_messages_mutex); // Insert npl message with lock. - - // Npl messages are added to the npl message array as it is without deserealizing the content. The same content will be passed down - // to the contract as input in a binary format - const uint8_t *container_buf_ptr = reinterpret_cast(message.data()); - const size_t container_buf_size = message.length(); - const std::string npl_message(reinterpret_cast(container_buf_ptr), container_buf_size); - ctx.collected_msgs.npl_messages.push_back(std::move(npl_message)); - } - else if (content_message_type == p2pmsg::Message_State_Request_Message) - { - const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); - flatbuffers::FlatBufferBuilder fbuf(1024); - - if (cons::create_state_response(fbuf, sr) == 0) - { - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - session.send(msg); - } - } - else if (content_message_type == p2pmsg::Message_State_Response_Message) - { - if (p2pmsg::validate_container_trust(container) != 0) - { - LOG_DBG << "State response message rejected due to trust failure. " << session.uniqueid; - return 0; - } - - std::lock_guard lock(ctx.collected_msgs.state_response_mutex); // Insert state_response with lock. - std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_response.push_back(std::move(response)); - } - else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message - { - const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); - //first check node has the required lcl available. -> if so send lcl history accordingly. - const bool req_lcl_avail = cons::check_required_lcl_availability(hr); - if (req_lcl_avail) + if (content_message_type == p2pmsg::Message_Peer_Challenge_Message) // message is a peer challenge announcement { + // Sending the challenge response to the respected peer. + const std::string challenge = std::string(p2pmsg::get_peer_challenge_from_msg(*content->message_as_Peer_Challenge_Message())); flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_history_response(fbuf, cons::retrieve_ledger_history(hr)); + p2pmsg::create_peer_challenge_response_from_challenge(fbuf, challenge); std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - - session.send(msg); + return session.send(msg); } - } - else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message - { - if (p2pmsg::validate_container_trust(container) != 0) + + if (content_message_type == p2pmsg::Message_Peer_Challenge_Response_Message) // message is a peer challenge response { - LOG_DBG << "History response message rejected due to trust failure. " << session.uniqueid; + // Ignore if challenge is already resolved. + if (session.challenge_status == comm::CHALLENGE_ISSUED) + { + const p2p::peer_challenge_response challenge_resp = p2pmsg::create_peer_challenge_response_from_msg(*content->message_as_Peer_Challenge_Response_Message(), container->pubkey()); + return p2p::resolve_peer_challenge(session, challenge_resp); + } + } + + if (session.challenge_status != comm::CHALLENGE_VERIFIED) + { + LOG_DBG << "Cannot accept messages. Peer challenge unresolved. " << session.uniqueid; return 0; } - cons::handle_ledger_history_response( - p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message())); - } - else - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DBG << "Received invalid peer message type. " << session.uniqueid; - } - return 0; -} + if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message + { + // We only trust proposals coming from trusted peers. + if (p2pmsg::validate_container_trust(container) != 0) + { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); + LOG_DBG << "Proposal rejected due to trust failure. " << session.uniqueid; + return 0; + } -//peer session on message callback method -void peer_session_handler::on_close(const comm::comm_session &session) const -{ - std::lock_guard lock(ctx.peer_connections_mutex); - ctx.peer_connections.erase(session.uniqueid); -} + std::lock_guard lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. + + ctx.collected_msgs.proposals.push_back( + p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl())); + } + else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message + { + std::lock_guard lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. + + ctx.collected_msgs.nonunl_proposals.push_back( + p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); + } + else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message + { + if (p2pmsg::validate_container_trust(container) != 0) + { + LOG_DBG << "NPL message rejected due to trust failure. " << session.uniqueid; + return 0; + } + + std::lock_guard lock(ctx.collected_msgs.npl_messages_mutex); // Insert npl message with lock. + + // Npl messages are added to the npl message array as it is without deserealizing the content. The same content will be passed down + // to the contract as input in a binary format + const uint8_t *container_buf_ptr = reinterpret_cast(message.data()); + const size_t container_buf_size = message.length(); + const std::string npl_message(reinterpret_cast(container_buf_ptr), container_buf_size); + ctx.collected_msgs.npl_messages.push_back(std::move(npl_message)); + } + else if (content_message_type == p2pmsg::Message_State_Request_Message) + { + const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); + flatbuffers::FlatBufferBuilder fbuf(1024); + + if (state_serve::create_state_response(fbuf, sr) == 0) + { + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + session.send(msg); + } + } + else if (content_message_type == p2pmsg::Message_State_Response_Message) + { + if (p2pmsg::validate_container_trust(container) != 0) + { + LOG_DBG << "State response message rejected due to trust failure. " << session.uniqueid; + return 0; + } + + if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing. + { + // Insert state_response with lock. + std::lock_guard lock(ctx.collected_msgs.state_response_mutex); + std::string response(reinterpret_cast(content_ptr), content_size); + ctx.collected_msgs.state_response.push_back(std::move(response)); + } + } + else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message + { + const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); + //first check node has the required lcl available. -> if so send lcl history accordingly. + const bool req_lcl_avail = cons::check_required_lcl_availability(hr); + if (req_lcl_avail) + { + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_history_response(fbuf, cons::retrieve_ledger_history(hr)); + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + + session.send(msg); + } + } + else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message + { + if (p2pmsg::validate_container_trust(container) != 0) + { + LOG_DBG << "History response message rejected due to trust failure. " << session.uniqueid; + return 0; + } + + cons::handle_ledger_history_response( + p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message())); + } + else + { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DBG << "Received invalid peer message type. " << session.uniqueid; + } + return 0; + } + + //peer session on message callback method + void peer_session_handler::on_close(const comm::comm_session &session) const + { + std::lock_guard lock(ctx.peer_connections_mutex); + ctx.peer_connections.erase(session.uniqueid); + } } // namespace p2p \ No newline at end of file diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 8926a262..72be1a20 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/src/sc.cpp b/src/sc.cpp index e7b9e985..b9dd01c9 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -56,13 +56,12 @@ namespace sc LOG_ERR << "Contract process exited with non-normal status code: " << presult; goto failure; } - - if (stop_hpfs_rw_session(state_hash) != 0) - goto failure; } else if (pid == 0) { // Contract process. + util::unmask_signal(); + // Set up the process environment and overlay the contract binary program with execv(). // Close all fds unused by SC process. @@ -106,6 +105,7 @@ namespace sc ret = -1; success: + stop_hpfs_rw_session(state_hash); cleanup_fdmap(ctx.userfds); cleanup_vectorfds(ctx.hpscfds); cleanup_vectorfds(ctx.nplfds); @@ -254,6 +254,8 @@ namespace sc int fetch_outputs(const contract_exec_args &args) { + util::mask_signal(); + while (true) { if (ctx.should_deinit) diff --git a/src/sc.hpp b/src/sc.hpp index 2c1e1b79..73601c0c 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -1,5 +1,5 @@ -#ifndef _HP_PROC_ -#define _HP_PROC_ +#ifndef _HP_SC_ +#define _HP_SC_ #include "pchheader.hpp" #include "usr/usr.hpp" diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp new file mode 100644 index 00000000..bcad23c6 --- /dev/null +++ b/src/state/state_serve.cpp @@ -0,0 +1,252 @@ +#include "../pchheader.hpp" +#include "../hpfs/hpfs.hpp" +#include "../hpfs/h32.hpp" +#include "../util.hpp" +#include "../p2p/p2p.hpp" +#include "../fbschema/p2pmsg_content_generated.h" +#include "../fbschema/p2pmsg_helpers.hpp" +#include "../fbschema/common_helpers.hpp" +#include "../cons/cons.hpp" +#include "../hplog.hpp" +#include "state_serve.hpp" + +/** + * Helper functions for serving state requests from other peers. + */ +namespace state_serve +{ + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + + /** + * Creates the reply message for a given state request. + * @param fbuf The flatbuffer builder to construct the reply message. + * @param sr The state request which should be replied to. + */ + int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr) + { + LOG_DBG << "Serving state req. path:" << sr.parent_path << " block_id:" << sr.block_id; + + // If block_id > -1 this means this is a file block data request. + if (sr.block_id > -1) + { + // Vector to hold the block bytes. Normally block size is constant BLOCK_SIZE (4MB), but the + // last block of a file may have a smaller size. + std::vector block; + if (get_file_block(block, sr.parent_path, sr.block_id, sr.expected_hash) == -1) + { + LOG_ERR << "Error in getting file block."; + return -1; + } + + p2p::block_response resp; + resp.path = sr.parent_path; + resp.block_id = sr.block_id; + resp.hash = sr.expected_hash; + resp.data = std::string_view(reinterpret_cast(block.data()), block.size()); + + fbschema::p2pmsg::create_msg_from_block_response(fbuf, resp, cons::ctx.lcl); + } + else + { + // File state request means we have to reply with the file block hash map. + if (sr.is_file) + { + std::vector block_hashes; + std::size_t file_length = 0; + if (get_file_block_hashes(block_hashes, file_length, sr.parent_path, sr.expected_hash) == -1) + { + LOG_ERR << "Error in getting block hashes."; + return -1; + } + + fbschema::p2pmsg::create_msg_from_filehashmap_response( + fbuf, sr.parent_path, block_hashes, + file_length, sr.expected_hash, cons::ctx.lcl); + } + else + { + // If the state request is for a directory we need to reply with the + // file system entries and their hashes inside that dir. + std::vector child_hash_nodes; + if (get_fs_entry_hashes(child_hash_nodes, sr.parent_path, sr.expected_hash) == -1) + { + LOG_ERR << "Error in getting fs entries."; + return -1; + } + + fbschema::p2pmsg::create_msg_from_fsentry_response( + fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, cons::ctx.lcl); + } + } + + return 0; + } + + /** + * Retrieves the specified data block from a state file if expected hash matches. + * @return Number of bytes read on success. -1 on failure. + */ + int get_file_block(std::vector &block, const std::string_view vpath, + const uint32_t block_id, const hpfs::h32 expected_hash) + { + int fd = 0; + pid_t hpfs_pid = 0; + std::string mount_dir; + if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1) + return -1; + + // Check whether the existing block hash matches expected hash. + std::vector block_hashes; + if (hpfs::get_file_block_hashes(block_hashes, mount_dir, vpath) == -1) + goto failure; + + if (block_id >= block_hashes.size()) + { + LOG_DBG << "Requested block_id " << block_id << " does not exist."; + goto failure; + } + + if (block_hashes[block_id] != expected_hash) + { + LOG_DBG << "Expected hash mismatch."; + goto failure; + } + + // Get actual block data. + { + const std::string file_path = std::string(mount_dir).append(vpath); + const off_t block_offset = block_id * BLOCK_SIZE; + fd = open(file_path.c_str(), O_RDONLY); + if (fd == -1) + { + LOG_ERR << errno << ": Open failed. " << file_path; + goto failure; + } + + struct stat st; + if (fstat(fd, &st) == -1) + { + LOG_ERR << errno << ": Stat failed. " << file_path; + goto failure; + } + + if (!S_ISREG(st.st_mode)) + { + LOG_ERR << "Not a file. " << file_path; + goto failure; + } + + if (block_offset > st.st_size) + { + LOG_ERR << "Block offset " << block_offset << " larger than file " << st.st_size << " - " << file_path; + goto failure; + } + + const size_t read_len = MIN(BLOCK_SIZE, (st.st_size - block_offset)); + block.resize(read_len); + + lseek(fd, block_offset, SEEK_SET); + const int res = read(fd, block.data(), read_len); + if (res < read_len) + { + LOG_ERR << errno << ": Read failed (result:" << res + << " off:" << block_offset << " len:" << read_len << "). " << file_path; + goto failure; + } + } + + goto success; + + failure: + if (fd > 0) + close(fd); + util::kill_process(hpfs_pid, true); + return -1; + success: + if (fd > 0) + close(fd); + if (util::kill_process(hpfs_pid, true) == -1) + return -1; + return 0; + } + + int get_file_block_hashes(std::vector &hashes, size_t &file_length, + const std::string_view vpath, const hpfs::h32 expected_hash) + { + pid_t hpfs_pid = 0; + std::string mount_dir; + if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1) + return -1; + + // Check whether the existing file hash matches expected hash. + hpfs::h32 file_hash = hpfs::h32_empty; + if (hpfs::get_hash(file_hash, mount_dir, vpath) == -1) + goto failure; + + if (file_hash != expected_hash) + { + LOG_DBG << "Expected hash mismatch."; + goto failure; + } + + // Get the block hashes. + if (hpfs::get_file_block_hashes(hashes, mount_dir, vpath) == -1) + goto failure; + + // Get actual file length. + { + const std::string file_path = std::string(mount_dir).append(vpath); + struct stat st; + if (stat(file_path.c_str(), &st) == -1) + { + LOG_ERR << errno << ": Stat failed. " << file_path; + goto failure; + } + file_length = st.st_size; + } + + goto success; + + failure: + util::kill_process(hpfs_pid, true); + return -1; + success: + if (util::kill_process(hpfs_pid, true) == -1) + return -1; + return 0; + } + + int get_fs_entry_hashes(std::vector &hash_nodes, + const std::string_view vpath, const hpfs::h32 expected_hash) + { + pid_t hpfs_pid = 0; + std::string mount_dir; + if (hpfs::start_fs_session(hpfs_pid, mount_dir, "ro", true) == -1) + return -1; + + // Check whether the existing dir hash matches expected hash. + hpfs::h32 dir_hash = hpfs::h32_empty; + if (hpfs::get_hash(dir_hash, mount_dir, vpath) == -1) + goto failure; + + if (dir_hash != expected_hash) + { + LOG_DBG << "Expected hash mismatch."; + goto failure; + } + + // Get the children hash nodes. + if (hpfs::get_dir_children_hashes(hash_nodes, mount_dir, vpath) == -1) + goto failure; + + goto success; + + failure: + util::kill_process(hpfs_pid, true); + return -1; + success: + if (util::kill_process(hpfs_pid, true) == -1) + return -1; + return 0; + } +} // namespace state_serve \ No newline at end of file diff --git a/src/state/state_serve.hpp b/src/state/state_serve.hpp new file mode 100644 index 00000000..e3e1efc5 --- /dev/null +++ b/src/state/state_serve.hpp @@ -0,0 +1,23 @@ +#ifndef _HP_CONS_STATE_SERVE_ +#define _HP_CONS_STATE_SERVE_ + +#include "../hpfs/h32.hpp" +#include "../hpfs/hpfs.hpp" +#include "../p2p/p2p.hpp" +#include "../fbschema/p2pmsg_content_generated.h" + +namespace state_serve +{ + int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr); + + int get_file_block(std::vector &vec, const std::string_view vpath, + const uint32_t block_id, const hpfs::h32 expected_hash); + + int get_file_block_hashes(std::vector &hashes, size_t &file_length, + const std::string_view vpath, const hpfs::h32 expected_hash); + + int get_fs_entry_hashes(std::vector &hash_nodes, + const std::string_view vpath, const hpfs::h32 expected_hash); +} // namespace state_sync + +#endif \ No newline at end of file diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp new file mode 100644 index 00000000..d1dbb6b4 --- /dev/null +++ b/src/state/state_sync.cpp @@ -0,0 +1,429 @@ +#include "../state/state_sync.hpp" +#include "../fbschema/p2pmsg_helpers.hpp" +#include "../fbschema/p2pmsg_content_generated.h" +#include "../fbschema/common_helpers.hpp" +#include "../p2p/p2p.hpp" +#include "../pchheader.hpp" +#include "../cons/cons.hpp" +#include "../hplog.hpp" +#include "../util.hpp" +#include "../hpfs/hpfs.hpp" +#include "../hpfs/h32.hpp" + +namespace state_sync +{ + // Idle loop sleep time (milliseconds). + constexpr uint16_t IDLE_WAIT = 50; + + // Max number of requests that can be awaiting response at any given time. + constexpr uint16_t MAX_AWAITING_REQUESTS = 1; + + // Request loop sleep time (milliseconds). + constexpr uint16_t REQUEST_LOOP_WAIT = 20; + + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + + constexpr int FILE_PERMS = 0644; + + // No. of milliseconds to wait before resubmitting a request. + uint16_t REQUEST_RESUBMIT_TIMEOUT; + + sync_context ctx; + + int init() + { + REQUEST_RESUBMIT_TIMEOUT = conf::cfg.roundtime / 2; + ctx.target_state = hpfs::h32_empty; + ctx.state_sync_thread = std::thread(state_syncer_loop); + return 0; + } + + void deinit() + { + ctx.is_syncing = false; + ctx.is_shutting_down = true; + ctx.state_sync_thread.join(); + } + + /** + * Sets a new target state for the syncing process. + * @param target_state The target state which we should sync towards. + * @param completion_callback The callback function to call upon state sync completion. + */ + void set_target(const hpfs::h32 target_state, void (*const completion_callback)(const hpfs::h32)) + { + std::lock_guard lock(ctx.target_state_update_lock); + + // Do not do anything if we are already syncing towards the specified target state. + if (ctx.is_shutting_down || (ctx.is_syncing && ctx.target_state == target_state)) + return; + + ctx.completion_callback = completion_callback; + ctx.target_state = target_state; + ctx.is_syncing = true; + } + + /** + * Runs the state sync worker loop. + */ + void state_syncer_loop() + { + util::mask_signal(); + + LOG_INFO << "State sync: Worker started."; + + while (!ctx.is_shutting_down) + { + util::sleep(IDLE_WAIT); + + // Keep idling if we are not doing any sync activity. + { + std::lock_guard lock(ctx.target_state_update_lock); + if (!ctx.is_syncing) + continue; + + LOG_INFO << "State sync: Starting sync for target state: " << ctx.target_state; + } + + LOG_DBG << "State sync: Starting hpfs rw session..."; + pid_t hpfs_pid = 0; + if (hpfs::start_fs_session(hpfs_pid, ctx.hpfs_mount_dir, "rw", true) != -1) + { + while (!ctx.is_shutting_down) + { + hpfs::h32 new_state = hpfs::h32_empty; + request_loop(ctx.target_state, new_state); + + if (ctx.is_shutting_down) + break; + + ctx.pending_requests.clear(); + ctx.candidate_state_responses.clear(); + ctx.submitted_requests.clear(); + + { + std::lock_guard lock(ctx.target_state_update_lock); + + if (new_state == ctx.target_state) + { + LOG_INFO << "State sync: Target state achieved: " << new_state; + ctx.completion_callback(new_state); + break; + } + else + { + LOG_INFO << "State sync: Continuing sync for new target: " << ctx.target_state; + continue; + } + } + } + + // Stop hpfs rw session. + LOG_DBG << "State sync: Stopping hpfs session... pid:" << hpfs_pid; + util::kill_process(hpfs_pid, true); + } + else + { + LOG_ERR << "State sync: Failed to start hpfs rw session"; + } + + ctx.target_state = hpfs::h32_empty; + ctx.is_syncing = false; + } + + LOG_INFO << "State sync: Worker stopped."; + } + + void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state) + { + // Send the initial root state request. + submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target}); + + while (!should_stop_request_loop(current_target)) + { + util::sleep(REQUEST_LOOP_WAIT); + + { + std::lock_guard lock(p2p::ctx.collected_msgs.state_response_mutex); + + // Move collected state responses over to local candidate responses list. + if (!p2p::ctx.collected_msgs.state_response.empty()) + ctx.candidate_state_responses.splice(ctx.candidate_state_responses.end(), p2p::ctx.collected_msgs.state_response); + } + + for (auto &response : ctx.candidate_state_responses) + { + if (should_stop_request_loop(current_target)) + return; + + const fbschema::p2pmsg::Content *content = fbschema::p2pmsg::GetContent(response.data()); + const fbschema::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message(); + + // Check whether we are actually waiting for this response's hash. If not, ignore it. + const hpfs::h32 response_hash = fbschema::flatbuff_bytes_to_hash(resp_msg->hash()); + const auto pending_resp_itr = ctx.submitted_requests.find(response_hash); + if (pending_resp_itr == ctx.submitted_requests.end()) + { + LOG_DBG << "Skipping state response due to hash mismatch. Received:" << response_hash; + continue; + } + + // Now that we have received matching hash, remove it from the waiting list. + ctx.submitted_requests.erase(pending_resp_itr); + + // Process the message based on response type. + const fbschema::p2pmsg::State_Response msg_type = resp_msg->state_response_type(); + + if (msg_type == fbschema::p2pmsg::State_Response_Fs_Entry_Response) + handle_fs_entry_response(resp_msg->state_response_as_Fs_Entry_Response()); + else if (msg_type == fbschema::p2pmsg::State_Response_File_HashMap_Response) + handle_file_hashmap_response(resp_msg->state_response_as_File_HashMap_Response()); + else if (msg_type == fbschema::p2pmsg::State_Response_Block_Response) + handle_file_block_response(resp_msg->state_response_as_Block_Response()); + + // After handling each response, check whether we have reached target state. + hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/"); + LOG_DBG << "State sync: current:" << updated_state << " | target:" << current_target; + if (updated_state == current_target) + return; + } + + ctx.candidate_state_responses.clear(); + + // Check for long-awaited responses and re-request them. + for (auto &[hash, request] : ctx.submitted_requests) + { + if (should_stop_request_loop(current_target)) + return; + + if (request.waiting_time < REQUEST_RESUBMIT_TIMEOUT) + { + // Increment wait time. + request.waiting_time += REQUEST_LOOP_WAIT; + } + else + { + // Reset the counter and re-submit request. + request.waiting_time = 0; + LOG_DBG << "State sync: Resubmitting request..."; + submit_request(request); + } + } + + // Check whether we can submit any more requests. + if (!ctx.pending_requests.empty() && ctx.submitted_requests.size() < MAX_AWAITING_REQUESTS) + { + const uint16_t available_slots = MAX_AWAITING_REQUESTS - ctx.submitted_requests.size(); + for (int i = 0; i < available_slots && !ctx.pending_requests.empty(); i++) + { + if (should_stop_request_loop(current_target)) + return; + + const backlog_item &request = ctx.pending_requests.front(); + submit_request(request); + ctx.pending_requests.pop_front(); + } + } + } + } + + /** + * Indicates whether to break out of state request processing loop. + */ + bool should_stop_request_loop(const hpfs::h32 current_target) + { + if (ctx.is_shutting_down) + return true; + + // Stop request loop if the target has changed. + std::lock_guard lock(ctx.target_state_update_lock); + return current_target != ctx.target_state; + } + + /** + * Sends a state request to a random peer. + * @param path Requested file or dir path. + * @param is_file Whether the requested path if a file or dir. + * @param block_id The requested block id. Only relevant if requesting a file block. Otherwise -1. + * @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different. + */ + void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash) + { + p2p::state_request sr; + sr.parent_path = path; + sr.is_file = is_file; + sr.block_id = block_id; + sr.expected_hash = expected_hash; + + flatbuffers::FlatBufferBuilder fbuf(1024); + fbschema::p2pmsg::create_msg_from_state_request(fbuf, sr, cons::ctx.lcl); + p2p::send_message_to_random_peer(fbuf); //todo: send to a node that hold the majority state to improve reliability of retrieving state. + } + + /** + * Submits a pending state request to the peer. + */ + void submit_request(const backlog_item &request) + { + LOG_DBG << "State sync: Submitting request. type:" << request.type + << " path:" << request.path << " block_id:" << request.block_id + << " hash:" << request.expected_hash; + + ctx.submitted_requests.try_emplace(request.expected_hash, request); + + const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; + request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash); + } + + /** + * Process dir children response. + */ + int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp) + { + // Get the parent path of the fs entries we have received. + std::string_view parent_vpath = fbschema::flatbuff_str_to_sv(fs_entry_resp->path()); + LOG_DBG << "State sync: Processing fs entries response for " << parent_vpath; + + // Get fs entries we have received. + std::unordered_map peer_fs_entry_map; + fbschema::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(peer_fs_entry_map, fs_entry_resp->entries()); + + // Create physical directory on our side if not exist. + std::string parent_physical_path = std::string(ctx.hpfs_mount_dir).append(parent_vpath); + if (util::create_dir_tree_recursive(parent_physical_path) == -1) + return -1; + + // Get the children hash entries and compare with what we got from peer. + std::vector existing_fs_entries; + if (hpfs::get_dir_children_hashes(existing_fs_entries, ctx.hpfs_mount_dir, parent_vpath) == -1) + return -1; + + // Request more info on fs entries that exist on both sides but are different. + for (const auto &ex_entry : existing_fs_entries) + { + // Construct child vpath. + std::string child_vpath = std::string(parent_vpath) + .append(parent_vpath.back() != '/' ? "/" : "") + .append(ex_entry.name); + + const auto peer_itr = peer_fs_entry_map.find(ex_entry.name); + if (peer_itr != peer_fs_entry_map.end()) + { + // Request state if hash is different. + if (peer_itr->second.hash != ex_entry.hash) + { + // Prioritize file state requests over directories. + if (ex_entry.is_file) + ctx.pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, peer_itr->second.hash}); + else + ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, peer_itr->second.hash}); + } + + peer_fs_entry_map.erase(peer_itr); + } + else + { + // If there was an entry that does not exist on other side, delete it. + std::string child_physical_path = std::string(ctx.hpfs_mount_dir).append(child_vpath); + + if ((ex_entry.is_file && unlink(child_physical_path.c_str()) == -1) || + !ex_entry.is_file && rmdir(child_physical_path.c_str()) == -1) + return -1; + + LOG_DBG << "State sync: Deleted " << (ex_entry.is_file ? "file" : "dir") << " path " << child_vpath; + } + } + + // Queue the remaining peer fs entries (that our side does not have at all) to request. + for (const auto &[name, fs_entry] : peer_fs_entry_map) + { + // Construct child vpath. + std::string child_vpath = std::string(parent_vpath) + .append(parent_vpath.back() != '/' ? "/" : "") + .append(name); + + // Prioritize file state requests over directories. + if (fs_entry.is_file) + ctx.pending_requests.push_front(backlog_item{BACKLOG_ITEM_TYPE::FILE, child_vpath, -1, fs_entry.hash}); + else + ctx.pending_requests.push_back(backlog_item{BACKLOG_ITEM_TYPE::DIR, child_vpath, -1, fs_entry.hash}); + } + + return 0; + } + + /** + * Process file block hash map response. + */ + int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp) + { + // Get the file path of the block hashes we have received. + std::string file_vpath = std::string(fbschema::flatbuff_str_to_sv(file_resp->path())); + LOG_DBG << "State sync: Processing file block hashes response for " << file_vpath; + + // File block hashes on our side (file might not exist on our side). + std::vector existing_hashes; + if (hpfs::get_file_block_hashes(existing_hashes, ctx.hpfs_mount_dir, file_vpath) == -1 && errno != ENOENT) + return -1; + const size_t existing_hash_count = existing_hashes.size(); + + // File block hashes we received from the peer. + const hpfs::h32 *peer_hashes = reinterpret_cast(file_resp->hash_map()->data()); + const size_t peer_hash_count = file_resp->hash_map()->size() / sizeof(hpfs::h32); + + // Compare the block hashes and request any differences. + auto insert_itr = ctx.pending_requests.begin(); + const int32_t max_block_id = MAX(existing_hash_count, peer_hash_count) - 1; + for (int32_t block_id = 0; block_id <= max_block_id; block_id++) + { + // Insert at front to give priority to block requests while preserving block order. + if (block_id >= existing_hash_count || existing_hashes[block_id] != peer_hashes[block_id]) + ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, file_vpath, block_id, peer_hashes[block_id]}); + } + + if (existing_hashes.size() >= peer_hash_count) + { + // If peer file might be smaller, truncate our file to match with peer file. + std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath); + if (truncate(file_physical_path.c_str(), file_resp->file_length()) == -1) + return -1; + } + + return 0; + } + + /** + * Process file block response. + */ + int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg) + { + // Get the file path of the block data we have received. + std::string_view file_vpath = fbschema::flatbuff_str_to_sv(block_msg->path()); + const uint32_t block_id = block_msg->block_id(); + std::string_view buf = fbschema::flatbuff_bytes_to_sv(block_msg->data()); + + LOG_DBG << "State sync: Writing block_id " << block_id + << " (len:" << buf.length() + << ") of " << file_vpath; + + std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath); + const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT, FILE_PERMS); + if (fd == -1) + { + LOG_ERR << errno << " Open failed " << file_physical_path; + return -1; + } + + const off_t offset = block_id * BLOCK_SIZE; + const int res = pwrite(fd, buf.data(), buf.length(), offset); + close(fd); + if (res < buf.length()) + { + LOG_ERR << errno << " Write failed " << file_physical_path; + return -1; + } + + return 0; + } + +} // namespace state_sync \ No newline at end of file diff --git a/src/state/state_sync.hpp b/src/state/state_sync.hpp new file mode 100644 index 00000000..599b7323 --- /dev/null +++ b/src/state/state_sync.hpp @@ -0,0 +1,83 @@ +#ifndef _HP_CONS_STATE_SYNC_ +#define _HP_CONS_STATE_SYNC_ + +#include "../pchheader.hpp" +#include "../p2p/p2p.hpp" +#include "../fbschema/p2pmsg_content_generated.h" +#include "../hpfs/h32.hpp" + +namespace state_sync +{ + + enum BACKLOG_ITEM_TYPE + { + DIR = 0, + FILE = 1, + BLOCK = 2 + }; + + // Represents a queued up state sync operation which needs to be performed. + struct backlog_item + { + BACKLOG_ITEM_TYPE type; + std::string path; + int32_t block_id = -1; // Only relevant if type=BLOCK + hpfs::h32 expected_hash; + + // No. of millisconds that this item has been waiting in pending state. + // Used by pending_responses list to increase waiting time and resubmit request. + uint16_t waiting_time = 0; + }; + + struct sync_context + { + // The current target state we are syncing towards. + hpfs::h32 target_state; + + // List of state responses flatbuffer messages to be processed. + std::list candidate_state_responses; + + // List of pending sync requests to be sent out. + std::list pending_requests; + + // List of submitted requests we are awaiting responses for, keyed by expected response hash. + std::unordered_map submitted_requests; + + std::thread state_sync_thread; + std::mutex target_state_update_lock; + bool is_syncing = false; + bool is_shutting_down = false; + std::string hpfs_mount_dir; + + void (*completion_callback)(const hpfs::h32); + }; + + extern sync_context ctx; + + extern std::list candidate_state_responses; + + int init(); + + void deinit(); + + void set_target(const hpfs::h32 target_state, void (*const completion_callback)(const hpfs::h32)); + + void state_syncer_loop(); + + void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state); + + bool should_stop_request_loop(const hpfs::h32 current_target); + + void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash); + + void submit_request(const backlog_item &request); + + int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp); + + int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp); + + int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg); + +} // namespace state_sync + +#endif \ No newline at end of file diff --git a/src/util.cpp b/src/util.cpp index 2b5e020c..c08ebd7b 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -207,23 +207,61 @@ namespace util pthread_sigmask(SIG_BLOCK, &mask, NULL); } - // Kill a process with SIGINT and wait until it stops running. - int kill_process(const pid_t pid, const bool wait) + // Clears signal mask from the calling thread. + // Used for other processes forked from hpcore threads. + void unmask_signal() { - if (kill(pid, SIGINT) == -1) + sigset_t mask; + sigemptyset(&mask); + pthread_sigmask(SIG_SETMASK, &mask, NULL); + } + + // Kill a process with a signal and wait until it stops running. + int kill_process(const pid_t pid, const bool wait, int signal) + { + if (kill(pid, signal) == -1) { - LOG_ERR << errno << ": Error issuing SIGINT to pid " << pid; + LOG_ERR << errno << ": Error issuing signal to pid " << pid; return -1; } int pid_status; if (wait && waitpid(pid, &pid_status, 0) == -1) { - LOG_ERR << errno << ": waitpid failed."; + LOG_ERR << errno << ": waitpid after kill failed."; return -1; } return 0; } + bool is_dir_exists(std::string_view path) + { + struct stat st; + return (stat(path.data(), &st) == 0 && S_ISDIR(st.st_mode)); + } + + int create_dir_tree_recursive(std::string_view path) + { + if (strcmp(path.data(), "/") == 0) // No need of checking if we are at root. + return 0; + + // Check whether this dir exists or not. + struct stat st; + if (stat(path.data(), &st) != 0 || !S_ISDIR(st.st_mode)) + { + // Check and create parent dir tree first. + char *path2 = strdup(path.data()); + char *parent_dir_path = dirname(path2); + if (create_dir_tree_recursive(parent_dir_path) == -1) + return -1; + + // Create this dir. + if (mkdir(path.data(), S_IRWXU | S_IRWXG | S_IROTH) == -1) + return -1; + } + + return 0; + } + } // namespace util diff --git a/src/util.hpp b/src/util.hpp index a0162f11..032654cd 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -7,80 +7,87 @@ /** * Contains helper functions and data structures used by multiple other subsystems. */ + +#define MAX(a, b) ((a > b) ? a : b) +#define MIN(a, b) ((a < b) ? a : b) + namespace util { + // Hot Pocket version. Displayed on 'hotpocket version' and written to new contract configs. + constexpr const char *HP_VERSION = "0.1"; -// Hot Pocket version. Displayed on 'hotpocket version' and written to new contract configs. -constexpr const char *HP_VERSION = "0.1"; + // Minimum compatible contract config version (this will be used to validate contract configs) + constexpr const char *MIN_CONTRACT_VERSION = "0.1"; -// Minimum compatible contract config version (this will be used to validate contract configs) -constexpr const char *MIN_CONTRACT_VERSION = "0.1"; + // Current version of the peer message protocol. + constexpr uint8_t PEERMSG_VERSION = 1; -// Current version of the peer message protocol. -constexpr uint8_t PEERMSG_VERSION = 1; + // Minimum compatible peer message version (this will be used to accept/reject incoming peer connections) + // (Keeping this as int for effcient msg payload and comparison) + constexpr uint8_t MIN_PEERMSG_VERSION = 1; -// Minimum compatible peer message version (this will be used to accept/reject incoming peer connections) -// (Keeping this as int for effcient msg payload and comparison) -constexpr uint8_t MIN_PEERMSG_VERSION = 1; + // Minimum compatible npl contract input version (this will be used to generate the npl input to feed the contract) + // (Keeping this as int for effcient msg payload and comparison) + constexpr uint8_t MIN_NPL_INPUT_VERSION = 1; -// Minimum compatible npl contract input version (this will be used to generate the npl input to feed the contract) -// (Keeping this as int for effcient msg payload and comparison) -constexpr uint8_t MIN_NPL_INPUT_VERSION = 1; - - - -/** + /** * FIFO hash set with a max size. */ -class rollover_hashset -{ -private: - // The set of recent hashes used for duplicate detection. - std::unordered_set recent_hashes; + class rollover_hashset + { + private: + // The set of recent hashes used for duplicate detection. + std::unordered_set recent_hashes; - // The supporting list of recent hashes used for adding and removing hashes from - // the 'recent_hashes' in a first-in-first-out manner. - std::list recent_hashes_list; + // The supporting list of recent hashes used for adding and removing hashes from + // the 'recent_hashes' in a first-in-first-out manner. + std::list recent_hashes_list; - uint32_t maxsize; + uint32_t maxsize; -public: - rollover_hashset(const uint32_t maxsize); - bool try_emplace(const std::string hash); -}; + public: + rollover_hashset(const uint32_t maxsize); + bool try_emplace(const std::string hash); + }; -/** + /** * A string set with expiration for elements. */ -class ttl_set -{ -private: - // Keeps short-lived items with their absolute expiration time. - std::unordered_map ttlmap; + class ttl_set + { + private: + // Keeps short-lived items with their absolute expiration time. + std::unordered_map ttlmap; -public: - void emplace(const std::string key, uint64_t ttl_milli); - void erase(const std::string &key); - bool exists(const std::string &key); -}; + public: + void emplace(const std::string key, uint64_t ttl_milli); + void erase(const std::string &key); + bool exists(const std::string &key); + }; -int bin2hex(std::string &encoded_string, const unsigned char *bin, const size_t bin_len); + int bin2hex(std::string &encoded_string, const unsigned char *bin, const size_t bin_len); -int hex2bin(unsigned char *decoded, const size_t decoded_len, std::string_view hex_str); + int hex2bin(unsigned char *decoded, const size_t decoded_len, std::string_view hex_str); -int64_t get_epoch_milliseconds(); + int64_t get_epoch_milliseconds(); -void sleep(const uint64_t milliseconds); + void sleep(const uint64_t milliseconds); -int version_compare(const std::string &x, const std::string &y); + int version_compare(const std::string &x, const std::string &y); -std::string_view getsv(const rapidjson::Value &v); + std::string_view getsv(const rapidjson::Value &v); -std::string realpath(std::string path); + std::string realpath(std::string path); -void mask_signal(); + void mask_signal(); -int kill_process(const pid_t pid, const bool wait); + void unmask_signal(); + + int kill_process(const pid_t pid, const bool wait, int signal = SIGINT); + + bool is_dir_exists(std::string_view path); + + int create_dir_tree_recursive(std::string_view path); } // namespace util diff --git a/test/bin/hpfs b/test/bin/hpfs index 0c51e627..57bb1edd 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 4ac5d43a..40d150c0 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -117,14 +117,16 @@ do mkdir -p ./node$i/state/seed > /dev/null 2>&1 - # Load credit balance for user for appbill testing purposes. pushd ./node$i/state/seed/ > /dev/null 2>&1 + + # Load credit balance for user for appbill testing purposes. >appbill.table ../../../../bin/appbill --credit "705bf26354ee4c63c0e5d5d883c07cefc3196d049bd3825f827eb3bc23ead035" 10000 - popd > /dev/null 2>&1 # Copy any more initial state files for testing. - #cp ~/my_big_file ~/hpcore/hpcluster/node$i/state/seed/ + # cp ~/my_big_file . + + popd > /dev/null 2>&1 done