diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f8aa947..eac653a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,8 +46,8 @@ add_executable(hpcore src/usr/user_session_handler.cpp src/usr/usr.cpp src/usr/read_req.cpp - src/cons/cons.cpp - src/cons/ledger_handler.cpp + src/ledger.cpp + src/consensus.cpp src/state/state_sync.cpp src/state/state_serve.cpp src/main.cpp diff --git a/README.md b/README.md index 36e48e65..11a14830 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,9 @@ Code is divided into subsystems via namespaces. **p2p::** Handles peer-to-peer connections and message exchange between nodes. Makes use of **crypto** and **comm**. -**cons::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **sc** +**consensus::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **sc** + +**ledger::** Maintains the ledger and handles ledger syncing activites. **comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for websocketd/websocat. diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp deleted file mode 100644 index 92b0472e..00000000 --- a/src/cons/ledger_handler.cpp +++ /dev/null @@ -1,476 +0,0 @@ -#include "../pchheader.hpp" -#include "../conf.hpp" -#include "../crypto.hpp" -#include "../p2p/p2p.hpp" -#include "../msg/fbuf/common_helpers.hpp" -#include "../msg/fbuf/ledger_helpers.hpp" -#include "../msg/fbuf/p2pmsg_helpers.hpp" -#include "../hplog.hpp" -#include "ledger_handler.hpp" -#include "cons.hpp" - -namespace cons -{ - - namespace p2pmsg = msg::fbuf::p2pmsg; - - /** - * Create and save ledger from the given proposal message. - * @param proposal consensus reached Satge 3 proposal. - * @return tuple of current lcl sequence number and file name of the saved lcl. - */ - const std::tuple save_ledger(const p2p::proposal &proposal) - { - const size_t pos = proposal.lcl.find("-"); - uint64_t led_seq_no = 0; - - if (pos != std::string::npos) - { - led_seq_no = std::stoull(proposal.lcl.substr(0, pos)); //get lcl sequence number. - led_seq_no++; //current lcl sequence number. - } - else - { - //lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. - LOG_ERROR << "Invalid lcl name: " << proposal.lcl << " when saving ledger."; - } - - //Serialize lcl using flatbuffer ledger schema. - flatbuffers::FlatBufferBuilder builder(1024); - const std::string_view ledger_str = msg::fbuf::ledger::create_ledger_from_proposal(builder, proposal, led_seq_no); - - //Get binary hash of the the serialized lcl. - const std::string lcl = crypto::get_hash(ledger_str); - - //Get hex from binary hash. - std::string lcl_hash; - util::bin2hex(lcl_hash, - reinterpret_cast(lcl.data()), - lcl.size()); - - //construct lcl file name. - //lcl file name should follow [ledger sequnce numer]-lcl[lcl hex] format. - const std::string seq_no_str = std::to_string(led_seq_no); - std::string file_name; - file_name.reserve(lcl_hash.size() + seq_no_str.size() + 1); - file_name.append(seq_no_str) - .append("-") - .append(lcl_hash); - - write_ledger(file_name, ledger_str.data(), ledger_str.size()); - - ledger_cache_entry c; - c.lcl = file_name; - c.state = proposal.state.to_string_view(); - cons::ctx.ledger_cache.emplace(led_seq_no, std::move(c)); - - //Remove old ledgers that exceeds max sequence range. - if (led_seq_no > MAX_LEDGER_SEQUENCE) - { - remove_old_ledgers(led_seq_no - MAX_LEDGER_SEQUENCE); - } - - return std::make_tuple(led_seq_no, std::move(file_name)); - } - - /** - * Remove old ledgers that exceeds max sequence range from file system and ledger history cache. - * @param led_seq_no minimum sequence number to be in history. - */ - void remove_old_ledgers(const uint64_t led_seq_no) - { - std::map::iterator itr; - - std::string dir_path; - - dir_path.reserve(conf::ctx.hist_dir.size() + 1); - dir_path.append(conf::ctx.hist_dir) - .append("/"); - - for (itr = cons::ctx.ledger_cache.begin(); - itr != cons::ctx.ledger_cache.lower_bound(led_seq_no + 1); - itr++) - { - const std::string file_name = itr->second.lcl; - std::string file_path; - file_path.reserve(dir_path.size() + itr->second.lcl.size() + 4); - file_path.append(dir_path) - .append(file_name) - .append(".lcl"); - - if (util::is_file_exists(file_path)) - util::remove_file(file_path); - } - - if (!cons::ctx.ledger_cache.empty()) - cons::ctx.ledger_cache.erase(cons::ctx.ledger_cache.begin(), cons::ctx.ledger_cache.lower_bound(led_seq_no + 1)); - } - - /** - * Write ledger to file system. - * @param file_name current ledger sequence number. - * @param ledger_raw raw lcl data. - * @param ledger_size size of the raw lcl data. - */ - void write_ledger(const std::string &file_name, const char *ledger_raw, size_t ledger_size) - { - //create file path to save ledger. - //file name -> [ledger sequnce numer]-[lcl hex] - - std::string path; - - path.reserve(file_name.size() + conf::ctx.hist_dir.size() + 5); - path.append(conf::ctx.hist_dir) - .append("/") - .append(file_name) - .append(".lcl"); - - //write ledger to file system - std::ofstream ofs(std::move(path)); - ofs.write(ledger_raw, ledger_size); - ofs.close(); - } - - /** - * Delete ledger from file system. - * @param file_name name of ledger to be deleted. - */ - void remove_ledger(const std::string &file_name) - { - std::string file_path; - file_path.reserve(conf::ctx.hist_dir.size() + file_name.size() + 5); - file_path.append(conf::ctx.hist_dir) - .append("/") - .append(file_name) - .append(".lcl"); - util::remove_file(file_path); - } - - /** - * Retrieve lcl(last closed ledger) information from ledger history. - * @return A ledger_history struct representing the lcl. - */ - const ledger_history load_ledger() - { - ledger_history ldg_hist; - //Get all records at lcl history direcory and find the last closed ledger. - size_t latest_pos = 0; - for (const auto &entry : util::fetch_dir_entries(conf::ctx.hist_dir)) - { - std::string file_path(conf::ctx.hist_dir); - file_path.append("/").append(entry.d_name); - - if (util::is_dir_exists(file_path)) - { - LOG_ERROR << "Found directory " << entry.d_name << " in " << conf::ctx.hist_dir << ". There should be no folders in this directory"; - } - else - { - const std::string_view extension = util::fetch_file_extension(file_path); - const std::string file_name(util::remove_file_extension(entry.d_name)); - - if (extension != ".lcl") - { - LOG_ERROR << "Found invalid file extension: " << extension << " for lcl file " << entry.d_name << " in " << conf::ctx.hist_dir; - } - - const size_t pos = file_name.find("-"); - uint64_t seq_no = 0; - - if (pos != std::string::npos) - { - seq_no = std::stoull(file_name.substr(0, pos)); - - std::ifstream file(file_path, std::ios::binary | std::ios::ate); - std::streamsize size = file.tellg(); - file.seekg(0, std::ios::beg); - - std::vector buffer(size); - if (file.read(buffer.data(), size)) - { - const uint8_t *ledger_buf_ptr = reinterpret_cast(buffer.data()); - const msg::fbuf::ledger::Ledger *ledger = msg::fbuf::ledger::GetLedger(ledger_buf_ptr); - ledger_cache_entry c; - c.lcl = file_name; - c.state = msg::fbuf::flatbuff_bytes_to_sv(ledger->state()); - - ldg_hist.cache.emplace(seq_no, std::move(c)); //lcl_cache -> [seq_no-hash] - } - } - else - { - //lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. - LOG_ERROR << "Invalid lcl file name: " << file_name << " in " << conf::ctx.hist_dir; - } - } - } - - //check if there is a saved lcl file -> if no send genesis lcl. - if (ldg_hist.cache.empty()) - { - ldg_hist.led_seq_no = 0; - ldg_hist.lcl = GENESIS_LEDGER; - } - else - { - ldg_hist.led_seq_no = ldg_hist.cache.rbegin()->first; - ldg_hist.lcl = ldg_hist.cache.rbegin()->second.lcl; - - //Remove old ledgers that exceeds max sequence range. - if (ldg_hist.led_seq_no > MAX_LEDGER_SEQUENCE) - { - remove_old_ledgers(ldg_hist.led_seq_no - MAX_LEDGER_SEQUENCE); - } - } - - return ldg_hist; - } - - /** - * Create and send ledger history request to random node from unl list. - * @param minimum_lcl hash of the minimum lcl from which node need lcl history. - * @param required_lcl hash of the required lcl. - */ - void send_ledger_history_request(const std::string &minimum_lcl, const std::string &required_lcl) - { - p2p::history_request hr; - hr.required_lcl = required_lcl; - hr.minimum_lcl = minimum_lcl; - - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_history_request(fbuf, hr); - p2p::send_message_to_random_peer(fbuf); - - ctx.last_requested_lcl = required_lcl; - - LOG_DEBUG << "Ledger history request sent. Required lcl:" << required_lcl.substr(0, 15); - } - - /** - * Check requested lcl is in node's lcl history cache. - * @param hr lcl history request information. - * @return true if requested lcl is in lcl history cache. - */ - bool check_required_lcl_availability(const p2p::history_request &hr) - { - size_t pos = hr.required_lcl.find("-"); - uint64_t req_seq_no = 0; - - //get sequence number of required lcl - if (pos != std::string::npos) - { - req_seq_no = std::stoull(hr.required_lcl.substr(0, pos)); //get required lcl sequence number - } - - if (req_seq_no > 0) - { - const auto itr = cons::ctx.ledger_cache.find(req_seq_no); - if (itr == cons::ctx.ledger_cache.end()) - { - LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache."; - //either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current - // minimum lcl sequence becuase of maximum ledger history range. - return false; - } - else if (itr->second.lcl != hr.required_lcl) - { - LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache."; - //either this node or requesting node is in a fork condition. - return false; - } - } - else - { - return false; //Very rare case: node asking for the genisis lcl. - } - return true; - } - - /** - * Retrieve lcl(last closed ledger) information from ledger history. - * @param hr lcl history request information. - * @return A ledger history response containing requested ledger details. - */ - const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr) - { - p2p::history_response history_response; - size_t pos = hr.minimum_lcl.find("-"); - uint64_t min_seq_no = 0; - - //get sequence number of minimum lcl required - if (pos != std::string::npos) - { - min_seq_no = std::stoull(hr.minimum_lcl.substr(0, pos)); //get required lcl sequence number - } - - const auto itr = cons::ctx.ledger_cache.find(min_seq_no); - if (itr != cons::ctx.ledger_cache.end()) //requested minimum lcl is not in our lcl history cache - { - min_seq_no = itr->first; - //check whether minimum lcl node ask for is same as this node's. - //eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition. - if (hr.minimum_lcl != itr->second.lcl) - { - LOG_DEBUG << "Invalid minimum ledger. Recieved min hash: " << hr.minimum_lcl << " Node hash: " << itr->second.lcl; - history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; - return history_response; - } - } - else if (min_seq_no > cons::ctx.ledger_cache.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence. - { - LOG_DEBUG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. Recvd hash: " << hr.minimum_lcl; - history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; - return history_response; - } - else - { - LOG_DEBUG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl"; - min_seq_no = cons::ctx.ledger_cache.begin()->first; - } - - //LOG_DBG << "history request min seq: " << std::to_string(min_seq_no); - - //copy current history cache. - std::map led_cache = cons::ctx.ledger_cache; - - //filter out cache and get raw files here. - led_cache.erase( - led_cache.begin(), - led_cache.lower_bound(min_seq_no)); - - //Get raw content of lcls that going to be send. - for (auto &[seq_no, cache] : led_cache) - { - p2p::history_ledger ledger; - ledger.lcl = cache.lcl; - ledger.state = cache.state; - - std::string path; - - path.reserve(conf::ctx.hist_dir.size() + cache.lcl.size() + 5); - path.append(conf::ctx.hist_dir) - .append("/") - .append(cache.lcl) - .append(".lcl"); - - //read lcl file - std::ifstream file(path, std::ios::binary | std::ios::ate); - std::streamsize size = file.tellg(); - file.seekg(0, std::ios::beg); - - std::vector buffer(size); - if (file.read(buffer.data(), size)) - { - ledger.raw_ledger = reinterpret_cast &>(buffer); - history_response.hist_ledgers.emplace(seq_no, ledger); - } - } - - return history_response; - } - - /** - * Handle recieved ledger history response. - * @param hr lcl history request information. - * @return peer outbound message object with ledger history response. - */ - void handle_ledger_history_response(const p2p::history_response &hr) - { - //check response object contains - if (ctx.last_requested_lcl.empty()) - { - LOG_DEBUG << "Peer sent us a history response but we never asked for one!"; - return; - } - - if (hr.error == p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER) - { - // This means we are in a fork ledger.Remove/rollback current ledger. - // Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger . - remove_ledger(ctx.lcl); - cons::ctx.ledger_cache.erase(ctx.ledger_cache.rbegin()->first); - LOG_DEBUG << "Invalid min ledger. Removed last ledger."; - } - else - { - //check whether recieved lcl history contains the current lcl node required. - bool have_requested_lcl = false; - for (auto &[seq_no, ledger] : hr.hist_ledgers) - { - if (ctx.last_requested_lcl == ledger.lcl) - { - have_requested_lcl = true; - break; - } - } - - if (!have_requested_lcl) - { - LOG_DEBUG << "Peer sent us a history response but not containing the lcl we asked for! " << hr.hist_ledgers.size(); - return; - } - - //Check integrity of recieved lcl list. - //By checking recieved lcl hashes matches lcl content by applying hashing for each raw content. - for (auto &[seq_no, ledger] : hr.hist_ledgers) - { - const size_t pos = ledger.lcl.find("-"); - std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1)); - - //Get binary hash of the the serialized lcl. - const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size()); - - //Get hex from binary hash - std::string lcl_hash; - - util::bin2hex(lcl_hash, - reinterpret_cast(lcl.data()), - lcl.size()); - - //LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash; - - //recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it - if (lcl_hash != rec_lcl_hash) - { - LOG_WARNING << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes"; - //todo: we should penalize peer who send this? - return; - } - } - } - - //Execution to here means the history data sent checks out - //Save recieved lcl in file system and update lcl history cache - for (auto &[seq_no, ledger] : hr.hist_ledgers) - { - auto prev_dup_itr = cons::ctx.ledger_cache.find(seq_no); - if (prev_dup_itr != cons::ctx.ledger_cache.end()) - { - remove_ledger(prev_dup_itr->second.lcl); - cons::ctx.ledger_cache.erase(prev_dup_itr); - } - write_ledger(ledger.lcl, reinterpret_cast(&ledger.raw_ledger[0]), ledger.raw_ledger.size()); - ledger_cache_entry l; - l.lcl = ledger.lcl; - l.state = ledger.state; - cons::ctx.ledger_cache.emplace(seq_no, std::move(l)); - } - - ctx.last_requested_lcl = ""; - - if (cons::ctx.ledger_cache.empty()) - { - cons::ctx.led_seq_no = 0; - cons::ctx.lcl = GENESIS_LEDGER; - } - else - { - const auto latest_lcl_itr = cons::ctx.ledger_cache.rbegin(); - cons::ctx.lcl = latest_lcl_itr->second.lcl; - cons::ctx.led_seq_no = latest_lcl_itr->first; - } - - LOG_INFO << "lcl sync complete. New lcl:" << cons::ctx.lcl.substr(0, 15); - } - -} // namespace cons \ No newline at end of file diff --git a/src/cons/ledger_handler.hpp b/src/cons/ledger_handler.hpp deleted file mode 100644 index 86ec07d8..00000000 --- a/src/cons/ledger_handler.hpp +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef _HP_CONS_LEDGER_ -#define _HP_CONS_LEDGER_ - -#include "../pchheader.hpp" -#include "../p2p/p2p.hpp" - -namespace cons -{ - -//max ledger count -constexpr uint64_t MAX_LEDGER_SEQUENCE = 200; -constexpr const char* GENESIS_LEDGER = "0-genesis"; -struct ledger_cache_entry -{ - std::string lcl; - std::string state; -}; - -struct ledger_history -{ - std::string lcl; - uint64_t led_seq_no = 0; - std::map cache; -}; - -const std::tuple save_ledger(const p2p::proposal &proposal); - -void remove_old_ledgers(const uint64_t led_seq_no); - -void write_ledger(const std::string &file_name, const char *ledger_raw, size_t ledger_size); - -void remove_ledger(const std::string &file_name); - -const ledger_history load_ledger(); - -void send_ledger_history_request(const std::string &minimum_lcl, const std::string &required_lcl); - -bool check_required_lcl_availability(const p2p::history_request &hr); - -const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr); - -void handle_ledger_history_response(const p2p::history_response &hr); - -} // namespace cons - -#endif \ No newline at end of file diff --git a/src/cons/cons.cpp b/src/consensus.cpp similarity index 90% rename from src/cons/cons.cpp rename to src/consensus.cpp index 5d16009c..c133f413 100644 --- a/src/cons/cons.cpp +++ b/src/consensus.cpp @@ -1,24 +1,24 @@ -#include "../pchheader.hpp" -#include "../conf.hpp" -#include "../usr/usr.hpp" -#include "../usr/user_input.hpp" -#include "../p2p/p2p.hpp" -#include "../msg/fbuf/p2pmsg_helpers.hpp" -#include "../msg/usrmsg_parser.hpp" -#include "../msg/usrmsg_common.hpp" -#include "../p2p/peer_session_handler.hpp" -#include "../hplog.hpp" -#include "../crypto.hpp" -#include "../sc.hpp" -#include "../hpfs/h32.hpp" -#include "../hpfs/hpfs.hpp" -#include "../state/state_sync.hpp" -#include "ledger_handler.hpp" -#include "cons.hpp" +#include "pchheader.hpp" +#include "conf.hpp" +#include "usr/usr.hpp" +#include "usr/user_input.hpp" +#include "p2p/p2p.hpp" +#include "msg/fbuf/p2pmsg_helpers.hpp" +#include "msg/usrmsg_parser.hpp" +#include "msg/usrmsg_common.hpp" +#include "p2p/peer_session_handler.hpp" +#include "hplog.hpp" +#include "crypto.hpp" +#include "sc.hpp" +#include "hpfs/h32.hpp" +#include "hpfs/hpfs.hpp" +#include "state/state_sync.hpp" +#include "ledger.hpp" +#include "consensus.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; -namespace cons +namespace consensus { /** @@ -30,22 +30,10 @@ namespace cons constexpr float MAJORITY_THRESHOLD = 0.8; consensus_context ctx; - bool init_success = false; - bool is_shutting_down = false; - - // Consensus processing thread. - std::thread consensus_thread; - int init() { - //load lcl details from lcl history. - ledger_history ldr_hist = load_ledger(); - ctx.led_seq_no = ldr_hist.led_seq_no; - ctx.lcl = ldr_hist.lcl; - ctx.ledger_cache.swap(ldr_hist.cache); - if (get_initial_state_hash(ctx.state) == -1) { LOG_ERROR << "Failed to get initial state hash."; @@ -64,7 +52,7 @@ namespace cons ctx.contract_ctx.args.readonly = false; // Starting consensus processing thread. - consensus_thread = std::thread(cons::run_consensus); + ctx.consensus_thread = std::thread(run_consensus); init_success = true; return 0; @@ -78,14 +66,14 @@ namespace cons if (init_success) { // Making the consensus while loop stop. - is_shutting_down = true; + ctx.is_shutting_down = true; // Stop the contract if running. sc::stop(ctx.contract_ctx); // Joining consensus processing thread. - if (consensus_thread.joinable()) - consensus_thread.join(); + if (ctx.consensus_thread.joinable()) + ctx.consensus_thread.join(); } } @@ -94,7 +82,7 @@ namespace cons */ void wait() { - consensus_thread.join(); + ctx.consensus_thread.join(); } void run_consensus() @@ -103,7 +91,7 @@ namespace cons LOG_INFO << "Consensus processor started."; - while (!is_shutting_down) + while (!ctx.is_shutting_down) { if (consensus() == -1) { @@ -128,6 +116,10 @@ namespace cons ctx.time_now = stage_start; std::list collected_proposals; + // Get current lcl and sequence no. + const std::string lcl = ledger::ctx.get_lcl(); + const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); + // Throughout consensus, we move over the incoming proposals collected via the network so far into // the candidate proposal set (move and append). This is to have a private working set for the consensus // and avoid threading conflicts with network incoming proposals. @@ -165,7 +157,7 @@ namespace cons auto itr = ctx.candidate_npl_messages.begin(); while (itr != ctx.candidate_npl_messages.end()) { - if (itr->lcl == ctx.lcl) + if (itr->lcl == lcl) ++itr; else ctx.candidate_npl_messages.erase(itr++); @@ -177,13 +169,12 @@ namespace cons { // Broadcast non-unl proposals (NUP) containing inputs from locally connected users. broadcast_nonunl_proposal(); - //util::sleep(conf::cfg.roundtime / 10); // Verify and transfer user inputs from incoming NUPs onto consensus candidate data. - verify_and_populate_candidate_user_inputs(); + verify_and_populate_candidate_user_inputs(lcl_seq_no); // In stage 0 we create a novel proposal and broadcast it. - const p2p::proposal stg_prop = create_stage0_proposal(); + const p2p::proposal stg_prop = create_stage0_proposal(lcl); broadcast_proposal(stg_prop); } else // Stage 1, 2, 3 @@ -196,31 +187,15 @@ namespace cons // check if we're ahead/behind of consensus lcl bool is_lcl_desync = false, should_request_history = false; std::string majority_lcl; - check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes); + check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes, lcl); if (is_lcl_desync) { if (should_request_history) { - LOG_INFO << "Syncing lcl. Curr lcl:" << cons::ctx.lcl.substr(0, 15) << " majority:" << majority_lcl.substr(0, 15); - - // TODO: If we are in a lcl fork condition try to rollback state with the help of - // state_restore to rollback state checkpoints before requesting new state. - - // Handle minority going forward when boostrapping cluster. - // Here we are mimicking invalid min ledger scenario. - if (majority_lcl == GENESIS_LEDGER) - { - ctx.last_requested_lcl = majority_lcl; - p2p::history_response res; - res.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; - handle_ledger_history_response(std::move(res)); - } - else - { - //create history request message and request history from a random peer. - send_ledger_history_request(ctx.lcl, majority_lcl); - } + //Node is not in sync with majority lcl. Switch to observer mode. + conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); + ledger::set_sync_target(majority_lcl); } } else @@ -239,16 +214,16 @@ namespace cons conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. - const p2p::proposal stg_prop = create_stage123_proposal(votes); + const p2p::proposal stg_prop = create_stage123_proposal(votes, lcl); broadcast_proposal(stg_prop); if (ctx.stage == 3) { - if (apply_ledger(stg_prop) != -1) + if (apply_ledger(stg_prop, lcl_seq_no, lcl) != -1) { // node has finished a consensus round (all 4 stages). - LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl.substr(0, 15) + LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << lcl.substr(0, 15) << " state:" << ctx.state << ")"; } else @@ -384,7 +359,7 @@ namespace cons * Verifies the user signatures and populate non-expired user inputs from collected * non-unl proposals (if any) into consensus candidate data. */ - void verify_and_populate_candidate_user_inputs() + void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) { // Lock the user sessions and the list so any network activity is blocked. std::scoped_lock lock(usr::ctx.users_mutex, p2p::ctx.collected_msgs.nonunl_proposals_mutex); @@ -422,7 +397,7 @@ namespace cons parser.extract_input_container(input, nonce, max_lcl_seqno, umsg.input_container); // Ignore the input if our ledger has passed the input TTL. - if (max_lcl_seqno > ctx.led_seq_no) + if (max_lcl_seqno > lcl_seq_no) { if (!appbill_balance_exceeded) { @@ -548,13 +523,13 @@ namespace cons } } - p2p::proposal create_stage0_proposal() + p2p::proposal create_stage0_proposal(std::string_view lcl) { // The proposal we are going to emit in stage 0. p2p::proposal stg_prop; stg_prop.time = ctx.time_now; stg_prop.stage = 0; - stg_prop.lcl = ctx.lcl; + stg_prop.lcl = lcl; stg_prop.state = ctx.state; // Populate the proposal with set of candidate user pubkeys. @@ -577,7 +552,7 @@ namespace cons return stg_prop; } - p2p::proposal create_stage123_proposal(vote_counter &votes) + p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl) { // The proposal to be emited at the end of this stage. p2p::proposal stg_prop; @@ -586,7 +561,7 @@ namespace cons // we always vote for our current lcl and state regardless of what other peers are saying // if there's a fork condition we will either request history and state from // our peers or we will halt depending on level of consensus on the sides of the fork - stg_prop.lcl = ctx.lcl; + stg_prop.lcl = lcl; stg_prop.state = ctx.state; // Vote for rest of the proposal fields by looking at candidate proposals. @@ -677,7 +652,7 @@ namespace cons /** * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. */ - void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes) + void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes, std::string_view lcl) { int32_t total_lcl_votes = 0; @@ -710,14 +685,10 @@ namespace cons //if winning lcl is not matched node lcl, //that means vote is not on the consensus ledger. //Should request history from a peer. - if (ctx.lcl != majority_lcl) + if (lcl != majority_lcl) { LOG_DEBUG << "We are not on the consensus ledger, requesting history from a random peer"; is_desync = true; - - //Node is not in sync with current lcl ->switch to observer mode. - conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); - should_request_history = true; return; } @@ -767,11 +738,11 @@ namespace cons switch (stage) { case 1: - return cons::STAGE1_THRESHOLD * conf::cfg.unl.size(); + return STAGE1_THRESHOLD * conf::cfg.unl.size(); case 2: - return cons::STAGE2_THRESHOLD * conf::cfg.unl.size(); + return STAGE2_THRESHOLD * conf::cfg.unl.size(); case 3: - return cons::STAGE3_THRESHOLD * conf::cfg.unl.size(); + return STAGE3_THRESHOLD * conf::cfg.unl.size(); } return -1; } @@ -780,18 +751,17 @@ namespace cons * Finalize the ledger after consensus. * @param cons_prop The proposal that reached consensus. */ - int apply_ledger(const p2p::proposal &cons_prop) + int apply_ledger(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl) { - const std::tuple new_lcl = save_ledger(cons_prop); - ctx.led_seq_no = std::get<0>(new_lcl); - ctx.lcl = std::get<1>(new_lcl); + if (ledger::save_ledger(cons_prop) == -1) + return -1; // After the current ledger seq no is updated, we remove any newly expired inputs from candidate set. { auto itr = ctx.candidate_user_inputs.begin(); while (itr != ctx.candidate_user_inputs.end()) { - if (itr->second.maxledgerseqno <= ctx.led_seq_no) + if (itr->second.maxledgerseqno <= lcl_seq_no) ctx.candidate_user_inputs.erase(itr++); else ++itr; @@ -799,13 +769,13 @@ namespace cons } // Send any output from the previous consensus round to locally connected users. - dispatch_user_outputs(cons_prop); + dispatch_user_outputs(cons_prop, lcl_seq_no, lcl); // Execute the contract { sc::contract_execution_args &args = ctx.contract_ctx.args; args.time = cons_prop.time; - args.lcl = ctx.lcl; + args.lcl = lcl; // Feed NPL messages. args.npl_messages.splice(args.npl_messages.end(), ctx.candidate_npl_messages); @@ -822,7 +792,7 @@ namespace cons ctx.state = args.post_execution_state_hash; extract_user_outputs_from_contract_bufmap(args.userbufs); - broadcast_npl_output(args.npl_output); + broadcast_npl_output(args.npl_output, lcl); sc::clear_args(args); } @@ -833,7 +803,7 @@ namespace cons * Dispatch any consensus-reached outputs to matching users if they are connected to us locally. * @param cons_prop The proposal that achieved consensus. */ - void dispatch_user_outputs(const p2p::proposal &cons_prop) + void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl) { std::scoped_lock lock(usr::ctx.users_mutex); @@ -866,7 +836,7 @@ namespace cons msg::usrmsg::usrmsg_parser parser(user.protocol); std::vector msg; - parser.create_contract_output_container(msg, outputtosend); + parser.create_contract_output_container(msg, outputtosend, lcl_seq_no, lcl); user.session.send(msg); } @@ -941,12 +911,12 @@ namespace cons } } - void broadcast_npl_output(std::string &output) + void broadcast_npl_output(std::string &output, std::string_view lcl) { if (!output.empty()) { flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_npl_output(fbuf, output, ctx.lcl); + p2pmsg::create_msg_from_npl_output(fbuf, output, lcl); p2p::broadcast_message(fbuf, true); } } @@ -986,4 +956,4 @@ namespace cons ctx.state = new_state; } -} // namespace cons +} // namespace consensus diff --git a/src/cons/cons.hpp b/src/consensus.hpp similarity index 77% rename from src/cons/cons.hpp rename to src/consensus.hpp index 789153e4..a2c23247 100644 --- a/src/cons/cons.hpp +++ b/src/consensus.hpp @@ -1,16 +1,15 @@ #ifndef _HP_CONS_ #define _HP_CONS_ -#include "../pchheader.hpp" -#include "../util.hpp" -#include "../sc.hpp" -#include "../p2p/p2p.hpp" -#include "../usr/user_input.hpp" -#include "../hpfs/h32.hpp" -#include "../sc.hpp" -#include "ledger_handler.hpp" +#include "pchheader.hpp" +#include "util.hpp" +#include "sc.hpp" +#include "p2p/p2p.hpp" +#include "usr/user_input.hpp" +#include "hpfs/h32.hpp" +#include "sc.hpp" -namespace cons +namespace consensus { /** * Represents a contract input that takes part in consensus. @@ -71,18 +70,8 @@ namespace cons uint8_t stage = 0; uint64_t time_now = 0; - std::string lcl; - uint64_t led_seq_no = 0; hpfs::h32 state = hpfs::h32_empty; - //Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key. - //contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE. - //this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range. - //We will use this to track lcls related logic.- track state, lcl request, response. - std::map ledger_cache; - std::string last_requested_lcl; - - //ledger close time of previous hash uint16_t stage_time = 0; // Time allocated to a consensus stage. uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. @@ -90,6 +79,8 @@ namespace cons sc::execution_context contract_ctx; bool is_shutting_down = false; + std::thread consensus_thread; + consensus_context() : recent_userinput_hashes(200) { @@ -106,8 +97,6 @@ namespace cons std::map state; }; - extern consensus_context ctx; - int init(); void deinit(); @@ -124,17 +113,17 @@ namespace cons void broadcast_nonunl_proposal(); - void verify_and_populate_candidate_user_inputs(); + void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); bool verify_appbill_check(std::string_view pubkey, const size_t input_len); - p2p::proposal create_stage0_proposal(); + p2p::proposal create_stage0_proposal(std::string_view lcl); - p2p::proposal create_stage123_proposal(vote_counter &votes); + p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl); void broadcast_proposal(const p2p::proposal &p); - void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); + void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes, std::string_view lcl); void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); @@ -146,15 +135,15 @@ namespace cons uint64_t get_stage_time_resolution(const uint64_t time); - int apply_ledger(const p2p::proposal &proposal); + int apply_ledger(const p2p::proposal &proposal, const uint64_t lcl_seq_no, std::string_view lcl); - void dispatch_user_outputs(const p2p::proposal &cons_prop); + void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl); void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); - void broadcast_npl_output(std::string &output); + void broadcast_npl_output(std::string &output, std::string_view lcl); template void increment(std::map &counter, const T &candidate); @@ -163,6 +152,6 @@ namespace cons void on_state_sync_completion(const hpfs::h32 new_state); -} // namespace cons +} // namespace consensus #endif diff --git a/src/ledger.cpp b/src/ledger.cpp new file mode 100644 index 00000000..94eab35e --- /dev/null +++ b/src/ledger.cpp @@ -0,0 +1,619 @@ +#include "pchheader.hpp" +#include "conf.hpp" +#include "crypto.hpp" +#include "p2p/p2p.hpp" +#include "msg/fbuf/common_helpers.hpp" +#include "msg/fbuf/ledger_helpers.hpp" +#include "msg/fbuf/p2pmsg_helpers.hpp" +#include "hplog.hpp" +#include "ledger.hpp" + +namespace p2pmsg = msg::fbuf::p2pmsg; + +namespace ledger +{ + constexpr int FILE_PERMS = 0644; + constexpr uint64_t MAX_LEDGER_SEQUENCE = 200; // Max ledger count. + constexpr uint16_t SYNCER_IDLE_WAIT = 20; // lcl syncer loop sleep time (milliseconds). + + ledger_context ctx; + sync_context sync_ctx; + bool init_success = false; + + /** + * Retrieve ledger history information from persisted ledgers. + */ + int init() + { + // Get all records at lcl history direcory and find the last closed ledger. + for (const auto &entry : util::fetch_dir_entries(conf::ctx.hist_dir)) + { + const std::string file_path = conf::ctx.hist_dir + "/" + entry.d_name; + + if (util::is_dir_exists(file_path)) + { + LOG_ERROR << "Found directory " << entry.d_name << " in " << conf::ctx.hist_dir << ". There should be no folders in this directory."; + return -1; + } + else + { + const std::string_view extension = util::fetch_file_extension(file_path); + const std::string file_name(util::remove_file_extension(entry.d_name)); + + if (extension != ".lcl") + { + LOG_ERROR << "Found invalid file extension: " << extension << " for lcl file " << entry.d_name << " in " << conf::ctx.hist_dir; + return -1; + } + + const size_t pos = file_name.find("-"); + + if (pos != std::string::npos) + { + std::vector buffer; + if (read_ledger(file_path, buffer) == -1) + return -1; + + if (!msg::fbuf::ledger::verify_ledger_buffer(buffer.data(), buffer.size())) + { + LOG_ERROR << "Ledger data verification failed. " << file_name; + return -1; + } + + const uint64_t seq_no = std::stoull(file_name.substr(0, pos)); + ctx.cache.emplace(seq_no, std::move(file_name)); // cache -> [seq_no - hash] + } + else + { + // lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. + LOG_ERROR << "Invalid lcl file name: " << file_name; + return -1; + } + } + } + + // Check if there is a saved lcl file -> if no send genesis lcl. + if (ctx.cache.empty()) + { + ctx.set_lcl(0, GENESIS_LEDGER); + } + else + { + const auto last_ledger = ctx.cache.rbegin(); + ctx.set_lcl(last_ledger->first, last_ledger->second); + + const uint64_t seq_no = ctx.get_seq_no(); + + // Remove old ledgers that exceeds max sequence range. + if (seq_no > MAX_LEDGER_SEQUENCE) + remove_old_ledgers(seq_no - MAX_LEDGER_SEQUENCE); + } + + sync_ctx.lcl_sync_thread = std::thread(lcl_syncer_loop); + + init_success = true; + return 0; + } + + void deinit() + { + if (init_success) + { + sync_ctx.is_shutting_down = true; + sync_ctx.lcl_sync_thread.join(); + } + } + + void set_sync_target(std::string_view target_lcl) + { + if (sync_ctx.is_shutting_down) + return; + + { + std::scoped_lock lock(sync_ctx.target_lcl_mutex); + sync_ctx.target_lcl = target_lcl; + } + + const std::string lcl = ctx.get_lcl(); + + LOG_INFO << "lcl sync: Syncing for target:" << sync_ctx.target_lcl.substr(0, 15) << " (current:" << lcl.substr(0, 15) << ")"; + + // Request history from a random peer if needed. + // If target is genesis ledger, we simply clear our ledger history without sending a + // history request. + if (target_lcl != GENESIS_LEDGER) + send_ledger_history_request(lcl, target_lcl); + } + + /** + * Runs the lcl sync worker loop. + */ + void lcl_syncer_loop() + { + util::mask_signal(); + + LOG_INFO << "lcl sync: Worker started."; + + std::list> history_requests; + std::list history_responses; + + while (!sync_ctx.is_shutting_down) + { + util::sleep(SYNCER_IDLE_WAIT); + + const std::string lcl = ctx.get_lcl(); + + // Move over the collected sync items to the local lists. + { + std::scoped_lock(sync_ctx.list_mutex); + history_requests.splice(history_requests.end(), sync_ctx.collected_history_requests); + history_responses.splice(history_responses.end(), sync_ctx.collected_history_responses); + } + + // Process any target lcl sync activities. + { + std::scoped_lock lock(sync_ctx.target_lcl_mutex); + + if (!sync_ctx.target_lcl.empty()) + { + if (sync_ctx.target_lcl == GENESIS_LEDGER) + { + clear_ledger(); + sync_ctx.target_lcl.clear(); + } + else + { + // Only process the first successful item which matches with our current lcl. + for (const p2p::history_response &hr : history_responses) + { + if (hr.requester_lcl == lcl && handle_ledger_history_response(hr) != -1) + { + sync_ctx.target_lcl.clear(); + break; + } + } + } + } + + history_responses.clear(); + } + + // Serve any history requests from other nodes. + { + // Acquire lock so consensus does not update the ledger while we are reading the ledger. + std::scoped_lock ledger_lock(ctx.ledger_mutex); + + for (const auto &[session_id, hr] : history_requests) + { + // First check whether we have the required lcl available. + if (!check_required_lcl_availability(hr.required_lcl)) + continue; + + p2p::history_response resp; + if (ledger::retrieve_ledger_history(hr, resp) != -1) + { + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_history_response(fbuf, resp); + std::string_view msg = msg::fbuf::flatbuff_bytes_to_sv(fbuf.GetBufferPointer(), fbuf.GetSize()); + + // Find the peer that we should send the state response to. + std::scoped_lock lock(p2p::ctx.peer_connections_mutex); + const auto peer_itr = p2p::ctx.peer_connections.find(session_id); + + if (peer_itr != p2p::ctx.peer_connections.end()) + { + comm::comm_session *session = peer_itr->second; + session->send(msg); + } + } + } + + history_requests.clear(); + } + } + + LOG_INFO << "lcl sync: Worker stopped."; + } + + /** + * Returns the current top ledger seq no and lcl. + */ + const std::pair get_ledger_cache_top() + { + const auto latest_lcl_itr = ctx.cache.rbegin(); + + if (latest_lcl_itr == ctx.cache.rend()) + return std::make_pair(0, GENESIS_LEDGER); + else + return std::make_pair(latest_lcl_itr->first, latest_lcl_itr->second); + } + + /** + * Create and save ledger from the given proposal message. Called by consensus. + * @param proposal Consensus-reached Stage 3 proposal. + */ + int save_ledger(const p2p::proposal &proposal) + { + const size_t pos = proposal.lcl.find("-"); + uint64_t seq_no = 0; + + if (pos != std::string::npos) + { + seq_no = std::stoull(proposal.lcl.substr(0, pos)); // Get lcl sequence number. + seq_no++; // New lcl sequence number. + } + else + { + // lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. + LOG_ERROR << "Invalid lcl name: " << proposal.lcl << " when saving ledger."; + return -1; + } + + // Serialize lcl using flatbuffer ledger schema. + flatbuffers::FlatBufferBuilder builder(1024); + msg::fbuf::ledger::create_ledger_from_proposal(builder, proposal, seq_no); + + // Get binary hash of the the serialized lcl. + std::string_view ledger_str_buf = msg::fbuf::flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize()); + const std::string lcl = crypto::get_hash(ledger_str_buf); + + // Get hex from binary hash. + std::string lcl_hash; + util::bin2hex(lcl_hash, + reinterpret_cast(lcl.data()), + lcl.size()); + + // Acquire lock so history request serving does not access the ledger while consensus is updating the ledger. + std::scoped_lock ledger_lock(ctx.ledger_mutex); + + // Construct lcl file name. + // lcl file name should follow [ledger sequnce numer]-lcl[lcl hex] format. + const std::string file_name = std::to_string(seq_no) + "-" + lcl_hash; + if (write_ledger(file_name, builder.GetBufferPointer(), builder.GetSize()) == -1) + return -1; + + ctx.set_lcl(seq_no, file_name); + + ctx.cache.emplace(seq_no, std::move(file_name)); + + //Remove old ledgers that exceeds max sequence range. + if (seq_no > MAX_LEDGER_SEQUENCE) + remove_old_ledgers(seq_no - MAX_LEDGER_SEQUENCE); + + return 0; + } + + /** + * Remove old ledgers that exceeds max sequence range from file system and ledger history cache. + * @param led_seq_no minimum sequence number to be in history. + */ + void remove_old_ledgers(const uint64_t led_seq_no) + { + std::map::iterator itr; + + for (itr = ctx.cache.begin(); + itr != ctx.cache.lower_bound(led_seq_no + 1); + itr++) + { + const std::string file_path = conf::ctx.hist_dir + "/" + itr->second + ".lcl"; + + if (util::is_file_exists(file_path)) + util::remove_file(file_path); + } + + if (!ctx.cache.empty()) + ctx.cache.erase(ctx.cache.begin(), ctx.cache.lower_bound(led_seq_no + 1)); + } + + /** + * Clears out entire ledger history. + */ + void clear_ledger() + { + util::clear_directory(conf::ctx.hist_dir); + ctx.cache.clear(); + ctx.set_lcl(0, GENESIS_LEDGER); + } + + /** + * Reads the specified ledger entry. + * @param file_path File path to read. + * @param buffer Buffer to populate with file contents. + * @return 0 on success. -1 on failure. + */ + int read_ledger(std::string_view file_path, std::vector &buffer) + { + const int fd = open(file_path.data(), O_RDONLY); + if (fd == -1) + { + LOG_ERROR << errno << ": Error opening ledger file for read. " << file_path; + return -1; + } + + struct stat st; + if (fstat(fd, &st) == -1) + { + LOG_ERROR << errno << ": Error in ledger file stat. " << file_path; + return -1; + } + + buffer.resize(st.st_size); + if (read(fd, buffer.data(), buffer.size()) == -1) + { + LOG_ERROR << errno << ": Error reading ledger file. " << file_path; + return -1; + } + + return 0; + } + + /** + * Write ledger to file system. + * @param file_name current ledger sequence number. + * @param ledger_raw raw lcl data. + * @param ledger_size size of the raw lcl data. + */ + int write_ledger(const std::string &file_name, const uint8_t *ledger_raw, const size_t ledger_size) + { + // Create file path to save ledger. + // file name -> [ledger sequnce numer]-[lcl hex] + + const std::string file_path = conf::ctx.hist_dir + "/" + file_name + ".lcl"; + + // Write ledger to file system + const int fd = open(file_path.data(), O_CREAT | O_RDWR, FILE_PERMS); + if (fd == -1) + { + LOG_ERROR << errno << ": Error creating ledger file. " << file_path; + return -1; + } + + if (write(fd, ledger_raw, ledger_size) == -1) + { + LOG_ERROR << errno << ": Error writing to new ledger file. " << file_path; + close(fd); + return -1; + } + + close(fd); + return 0; + } + + /** + * Delete ledger from file system. + * @param file_name name of ledger to be deleted. + */ + void remove_ledger(const std::string &file_name) + { + std::string file_path; + file_path.reserve(conf::ctx.hist_dir.size() + file_name.size() + 5); + file_path.append(conf::ctx.hist_dir) + .append("/") + .append(file_name) + .append(".lcl"); + util::remove_file(file_path); + } + + /** + * Create and send ledger history request to random node from unl list. + * @param minimum_lcl hash of the minimum lcl from which node need lcl history. + * @param required_lcl hash of the required lcl. + */ + void send_ledger_history_request(std::string_view minimum_lcl, std::string_view required_lcl) + { + p2p::history_request hr; + hr.required_lcl = required_lcl; + hr.minimum_lcl = minimum_lcl; + + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_history_request(fbuf, hr); + p2p::send_message_to_random_peer(fbuf); + + LOG_DEBUG << "Ledger history request sent. Required lcl:" << required_lcl.substr(0, 15); + } + + /** + * Check requested lcl is in node's lcl history cache. + * @param hr lcl history request information. + * @return true if requested lcl is in lcl history cache. + */ + bool check_required_lcl_availability(const std::string &required_lcl) + { + size_t pos = required_lcl.find("-"); + uint64_t req_seq_no = 0; + + // Get sequence number of required lcl + if (pos != std::string::npos) + { + req_seq_no = std::stoull(required_lcl.substr(0, pos)); // Get required lcl sequence number + } + + if (req_seq_no > 0) + { + const auto itr = ctx.cache.find(req_seq_no); + if (itr == ctx.cache.end()) + { + LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache."; + // Either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current + // minimum lcl sequence becuase of maximum ledger history range. + return false; + } + else if (itr->second != required_lcl) + { + LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache."; + // Either this node or requesting node is in a fork condition. + return false; + } + } + else + { + return false; // Very rare case: Peer asking for the genisis lcl. + } + + return true; + } + + /** + * Retrieve lcl(last closed ledger) information from ledger history. + * @param hr lcl history request information. + * @param history_response Ledger history response to populate requested ledger details + * @return 0 on success. -1 on failure. + */ + int retrieve_ledger_history(const p2p::history_request &hr, p2p::history_response &history_response) + { + // Get sequence number of minimum lcl required + const size_t pos = hr.minimum_lcl.find("-"); + if (pos == std::string::npos) + { + LOG_DEBUG << "lcl serve: Invalid lcl history request. Requested:" << hr.minimum_lcl; + return -1; + } + + // We put the requester's own lcl back in the response so they can validate the liveliness of the response. + history_response.requester_lcl = hr.minimum_lcl; + + uint64_t min_seq_no = std::stoull(hr.minimum_lcl.substr(0, pos)); // Get required lcl sequence number + + const auto itr = ctx.cache.find(min_seq_no); + if (itr != ctx.cache.end()) // Requested minimum lcl is not in our lcl history cache + { + min_seq_no = itr->first; + + // Check whether minimum lcl requested is same as this node's. + // Evenhough sequence number are same, lcl hash can be changed if one of node is in a fork condition. + if (hr.minimum_lcl != itr->second) + { + LOG_DEBUG << "lcl serve: Invalid minimum ledger. Requested min lcl:" << hr.minimum_lcl << " Node lcl:" << itr->second; + history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; + return 0; + } + } + else if (min_seq_no > ctx.cache.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence. + { + LOG_DEBUG << "lcl serve: Invalid minimum ledger. Recieved minimum seq no is ahead of node current seq no. Requested lcl:" << hr.minimum_lcl; + history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; + return 0; + } + else + { + LOG_DEBUG << "lcl serve: Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl."; + min_seq_no = ctx.cache.begin()->first; + } + + //copy current history cache. + std::map led_cache = ctx.cache; + + //filter out cache and get raw files here. + led_cache.erase( + led_cache.begin(), + led_cache.lower_bound(min_seq_no)); + + //Get raw content of lcls that going to be send. + for (const auto &[seq_no, lcl] : led_cache) + { + p2p::history_ledger ledger; + ledger.lcl = lcl; + + // Read lcl file. + const std::string file_path = conf::ctx.hist_dir + "/" + lcl + ".lcl"; + if (read_ledger(file_path, ledger.raw_ledger) == -1) + { + LOG_DEBUG << "lcl serve: Error when reading ledger file."; + return -1; + } + + history_response.hist_ledgers.emplace(seq_no, std::move(ledger)); + } + + return 0; + } + + /** + * Handle recieved ledger history response. + * @param hr lcl history request information. + * @return 0 on successful lcl update. -1 on failure. + */ + int handle_ledger_history_response(const p2p::history_response &hr) + { + if (hr.error == p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER) + { + // This means we are in a fork ledger. Remove/rollback current top ledger. + // Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger. + remove_ledger(ctx.get_lcl()); + ctx.cache.erase(ctx.cache.rbegin()->first); + + const auto [seq_no, lcl] = get_ledger_cache_top(); + ctx.set_lcl(seq_no, lcl); + + LOG_INFO << "lcl sync: Fork detected. Removed last ledger. New lcl:" << lcl.substr(0, 15); + return 0; + } + else + { + // Check whether recieved lcl history contains the current lcl node required. + bool contains_requested_lcl = false; + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + if (sync_ctx.target_lcl == ledger.lcl) + { + contains_requested_lcl = true; + break; + } + } + + if (!contains_requested_lcl) + { + LOG_DEBUG << "lcl sync: Peer sent us a history response but not containing the lcl we asked for."; + return -1; + } + + // Check integrity of recieved lcl list. + // By checking recieved lcl hashes matches lcl content by applying hashing for each raw content. + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + const size_t pos = ledger.lcl.find("-"); + const std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1)); + + // Get binary hash of the the serialized lcl. + const std::string lcl = crypto::get_hash(ledger.raw_ledger.data(), ledger.raw_ledger.size()); + + // Get hex from binary hash + std::string lcl_hash; + + util::bin2hex(lcl_hash, + reinterpret_cast(lcl.data()), + lcl.size()); + + // recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it + if (lcl_hash != rec_lcl_hash) + { + LOG_DEBUG << "lcl sync: Peer sent us a history response but the ledger data does not match the hashes."; + // todo: we should penalize peer who sent this. + return -1; + } + } + } + + // Execution to here means the history data sent checks out. + // Save recieved lcl in file system and update lcl history cache. + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + auto prev_dup_itr = ctx.cache.find(seq_no); + if (prev_dup_itr != ctx.cache.end()) + { + remove_ledger(prev_dup_itr->second); + ctx.cache.erase(prev_dup_itr); + } + + write_ledger(ledger.lcl, ledger.raw_ledger.data(), ledger.raw_ledger.size()); + ctx.cache.emplace(seq_no, ledger.lcl); + } + + const auto [seq_no, lcl] = get_ledger_cache_top(); + ctx.set_lcl(seq_no, lcl); + + LOG_INFO << "lcl sync: Sync complete. New lcl:" << lcl.substr(0, 15); + return 0; + } + +} // namespace ledger \ No newline at end of file diff --git a/src/ledger.hpp b/src/ledger.hpp new file mode 100644 index 00000000..7f1d8c55 --- /dev/null +++ b/src/ledger.hpp @@ -0,0 +1,97 @@ +#ifndef _HP_LEDGER_ +#define _HP_LEDGER_ + +#include "pchheader.hpp" +#include "p2p/p2p.hpp" + +namespace ledger +{ + constexpr const char *GENESIS_LEDGER = "0-genesis"; + + struct sync_context + { + // The current target lcl that we are syncing towards. + std::string target_lcl; + std::mutex target_lcl_mutex; + + // Lists holding history requests and responses collected from incoming p2p messages. + std::list> collected_history_requests; + std::list collected_history_responses; + std::mutex list_mutex; + + std::thread lcl_sync_thread; + bool is_shutting_down = false; + }; + + struct ledger_context + { + private: + std::string lcl; + uint64_t seq_no = 0; + std::shared_mutex lcl_mutex; + + public: + // Map of closed ledgers (lcl string) with sequence number as map key. + // Contains closed ledgers from oldest to latest - MAX_LEDGER_SEQUENCE. + // This is loaded when node started and updated throughout consensus. + // Deletes ledgers that falls behind MAX_LEDGER_SEQUENCE range. + std::map cache; + + std::mutex ledger_mutex; + + const std::string get_lcl() + { + std::shared_lock lock(lcl_mutex); + return lcl; + } + + uint64_t get_seq_no() + { + std::shared_lock lock(lcl_mutex); + return seq_no; + } + + void set_lcl(const uint64_t new_seq_no, std::string_view new_lcl) + { + std::unique_lock lock(lcl_mutex); + lcl = new_lcl; + seq_no = new_seq_no; + } + }; + + extern sync_context sync_ctx; + extern ledger_context ctx; + + int init(); + + void deinit(); + + void lcl_syncer_loop(); + + void set_sync_target(std::string_view target_lcl); + + const std::pair get_ledger_cache_top(); + + int save_ledger(const p2p::proposal &proposal); + + void remove_old_ledgers(const uint64_t led_seq_no); + + void clear_ledger(); + + int read_ledger(std::string_view file_path, std::vector &buffer); + + int write_ledger(const std::string &file_name, const uint8_t *ledger_raw, const size_t ledger_size); + + void remove_ledger(const std::string &file_name); + + void send_ledger_history_request(std::string_view minimum_lcl, std::string_view required_lcl); + + bool check_required_lcl_availability(const std::string &required_lcl); + + int retrieve_ledger_history(const p2p::history_request &hr, p2p::history_response &history_response); + + int handle_ledger_history_response(const p2p::history_response &hr); + +} // namespace ledger + +#endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index c35b06b4..e3f84152 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -11,7 +11,8 @@ #include "usr/usr.hpp" #include "usr/read_req.hpp" #include "p2p/p2p.hpp" -#include "cons/cons.hpp" +#include "consensus.hpp" +#include "ledger.hpp" #include "hpfs/hpfs.hpp" #include "state/state_sync.hpp" #include "state/state_serve.hpp" @@ -67,7 +68,8 @@ int parse_cmd(int argc, char **argv) */ void deinit() { - cons::deinit(); + consensus::deinit(); + ledger::deinit(); state_sync::deinit(); state_serve::deinit(); read_req::deinit(); @@ -191,7 +193,7 @@ int main(int argc, char **argv) LOG_INFO << "Public key: " << conf::cfg.pubkeyhex.substr(2); // Public key without 'ed' prefix. if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || read_req::init() != 0 || - state_serve::init() != 0 || state_sync::init() != 0 || cons::init() != 0) + state_serve::init() != 0 || state_sync::init() != 0 || ledger::init() || consensus::init() != 0) { deinit(); return -1; @@ -201,7 +203,7 @@ int main(int argc, char **argv) signal(SIGINT, &sigint_handler); // Wait until consensus thread finishes. - cons::wait(); + consensus::wait(); // deinit() here only gets called when there is an error in consensus. // If not deinit in the sigint handler is called when a SIGINT is received. diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index 5f6db7bc..6450a1ec 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -1,6 +1,5 @@ #include "../../pchheader.hpp" #include "../../util.hpp" -#include "../../cons/cons.hpp" #include "../../hplog.hpp" #include "../usrmsg_common.hpp" #include "usrmsg_bson.hpp" @@ -17,16 +16,16 @@ namespace msg::usrmsg::bson * "lcl_seqno": * } */ - void create_status_response(std::vector &msg) + void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl) { jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); encoder.key(msg::usrmsg::FLD_TYPE); encoder.string_value(msg::usrmsg::MSGTYPE_STAT_RESPONSE); encoder.key(msg::usrmsg::FLD_LCL); - encoder.string_value(cons::ctx.lcl); + encoder.string_value(lcl); encoder.key(msg::usrmsg::FLD_LCL_SEQ); - encoder.int64_value(cons::ctx.led_seq_no); + encoder.int64_value(lcl_seq_no); encoder.end_object(); encoder.flush(); } @@ -95,16 +94,16 @@ namespace msg::usrmsg::bson * } * @param content The contract binary output content to be put in the message. */ - void create_contract_output_container(std::vector &msg, std::string_view content) + void create_contract_output_container(std::vector &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl) { jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); encoder.key(msg::usrmsg::FLD_TYPE); encoder.string_value(msg::usrmsg::MSGTYPE_CONTRACT_OUTPUT); encoder.key(msg::usrmsg::FLD_LCL); - encoder.string_value(cons::ctx.lcl); + encoder.string_value(lcl); encoder.key(msg::usrmsg::FLD_LCL_SEQ); - encoder.int64_value(cons::ctx.led_seq_no); + encoder.int64_value(lcl_seq_no); encoder.key(msg::usrmsg::FLD_CONTENT); encoder.byte_string_value(content); encoder.end_object(); diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index db803650..f697ee2b 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -8,14 +8,14 @@ namespace msg::usrmsg::bson void create_user_challenge(std::vector &msg, std::string &challengehex); - void create_status_response(std::vector &msg); + void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl); void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_sig); void create_contract_read_response_container(std::vector &msg, std::string_view content); - void create_contract_output_container(std::vector &msg, std::string_view content); + void create_contract_output_container(std::vector &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl); int verify_user_handshake_response(std::string &extracted_pubkeyhex, std::string &extracted_protocol, std::string_view response, std::string_view original_challenge); diff --git a/src/msg/fbuf/ledger_helpers.cpp b/src/msg/fbuf/ledger_helpers.cpp index 89926b7a..04f880fa 100644 --- a/src/msg/fbuf/ledger_helpers.cpp +++ b/src/msg/fbuf/ledger_helpers.cpp @@ -7,25 +7,30 @@ namespace msg::fbuf::ledger { -/** + /** * Create ledger from the given proposal struct. * @param p The proposal struct to be placed in ledger. */ -const std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no) -{ - flatbuffers::Offset ledger = - ledger::CreateLedger( - builder, - seq_no, - p.time, - sv_to_flatbuff_bytes(builder, p.lcl), - sv_to_flatbuff_bytes(builder, p.state.to_string_view()), - stringlist_to_flatbuf_bytearrayvector(builder, p.users), - stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), - stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); + void create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no) + { + flatbuffers::Offset ledger = + ledger::CreateLedger( + builder, + seq_no, + p.time, + sv_to_flatbuff_bytes(builder, p.lcl), + sv_to_flatbuff_bytes(builder, p.state.to_string_view()), + stringlist_to_flatbuf_bytearrayvector(builder, p.users), + stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), + stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); - builder.Finish(ledger); // Finished building message content to get serialised content. + builder.Finish(ledger); // Finished building message content to get serialised content. + } + + bool verify_ledger_buffer(const uint8_t *ledger_buf_ptr, const size_t buf_len) + { + flatbuffers::Verifier ledger_verifier(ledger_buf_ptr, buf_len); + return VerifyLedgerBuffer(ledger_verifier); + } - return flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize()); -} } // namespace msg::fbuf::ledger diff --git a/src/msg/fbuf/ledger_helpers.hpp b/src/msg/fbuf/ledger_helpers.hpp index 6a8bad8a..a5608e60 100644 --- a/src/msg/fbuf/ledger_helpers.hpp +++ b/src/msg/fbuf/ledger_helpers.hpp @@ -8,7 +8,10 @@ namespace msg::fbuf::ledger { -const std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no); -} + void create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no); + + bool verify_ledger_buffer(const uint8_t *ledger_buf_ptr, const size_t buf_len); + +} // namespace msg::fbuf::ledger #endif \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index 76ca452c..df213ded 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -60,6 +60,7 @@ enum Ledger_Response_Error : ubyte } table History_Response_Message { //Ledger History request type message schema + requester_lcl:[ubyte]; hist_ledgers:[HistoryLedgerPair]; error: Ledger_Response_Error; } @@ -70,7 +71,6 @@ table HistoryLedgerPair { //A key, value pair of byte[]. } table HistoryLedger { - state:[ubyte]; lcl:[ubyte]; raw_ledger:[ubyte]; } diff --git a/src/msg/fbuf/p2pmsg_content_generated.h b/src/msg/fbuf/p2pmsg_content_generated.h index bd620de3..6f0f9a10 100644 --- a/src/msg/fbuf/p2pmsg_content_generated.h +++ b/src/msg/fbuf/p2pmsg_content_generated.h @@ -974,9 +974,16 @@ inline flatbuffers::Offset CreateHistory_Request_Messag struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef History_Response_MessageBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_HIST_LEDGERS = 4, - VT_ERROR = 6 + VT_REQUESTER_LCL = 4, + VT_HIST_LEDGERS = 6, + VT_ERROR = 8 }; + const flatbuffers::Vector *requester_lcl() const { + return GetPointer *>(VT_REQUESTER_LCL); + } + flatbuffers::Vector *mutable_requester_lcl() { + return GetPointer *>(VT_REQUESTER_LCL); + } const flatbuffers::Vector> *hist_ledgers() const { return GetPointer> *>(VT_HIST_LEDGERS); } @@ -991,6 +998,8 @@ struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::T } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_REQUESTER_LCL) && + verifier.VerifyVector(requester_lcl()) && VerifyOffset(verifier, VT_HIST_LEDGERS) && verifier.VerifyVector(hist_ledgers()) && verifier.VerifyVectorOfTables(hist_ledgers()) && @@ -1003,6 +1012,9 @@ struct History_Response_MessageBuilder { typedef History_Response_Message Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; + void add_requester_lcl(flatbuffers::Offset> requester_lcl) { + fbb_.AddOffset(History_Response_Message::VT_REQUESTER_LCL, requester_lcl); + } void add_hist_ledgers(flatbuffers::Offset>> hist_ledgers) { fbb_.AddOffset(History_Response_Message::VT_HIST_LEDGERS, hist_ledgers); } @@ -1022,21 +1034,26 @@ struct History_Response_MessageBuilder { inline flatbuffers::Offset CreateHistory_Response_Message( flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> requester_lcl = 0, flatbuffers::Offset>> hist_ledgers = 0, msg::fbuf::p2pmsg::Ledger_Response_Error error = msg::fbuf::p2pmsg::Ledger_Response_Error_None) { History_Response_MessageBuilder builder_(_fbb); builder_.add_hist_ledgers(hist_ledgers); + builder_.add_requester_lcl(requester_lcl); builder_.add_error(error); return builder_.Finish(); } inline flatbuffers::Offset CreateHistory_Response_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *requester_lcl = nullptr, const std::vector> *hist_ledgers = nullptr, msg::fbuf::p2pmsg::Ledger_Response_Error error = msg::fbuf::p2pmsg::Ledger_Response_Error_None) { + auto requester_lcl__ = requester_lcl ? _fbb.CreateVector(*requester_lcl) : 0; auto hist_ledgers__ = hist_ledgers ? _fbb.CreateVector>(*hist_ledgers) : 0; return msg::fbuf::p2pmsg::CreateHistory_Response_Message( _fbb, + requester_lcl__, hist_ledgers__, error); } @@ -1102,16 +1119,9 @@ inline flatbuffers::Offset CreateHistoryLedgerPair( struct HistoryLedger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef HistoryLedgerBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_STATE = 4, - VT_LCL = 6, - VT_RAW_LEDGER = 8 + VT_LCL = 4, + VT_RAW_LEDGER = 6 }; - const flatbuffers::Vector *state() const { - return GetPointer *>(VT_STATE); - } - flatbuffers::Vector *mutable_state() { - return GetPointer *>(VT_STATE); - } const flatbuffers::Vector *lcl() const { return GetPointer *>(VT_LCL); } @@ -1126,8 +1136,6 @@ struct HistoryLedger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_STATE) && - verifier.VerifyVector(state()) && VerifyOffset(verifier, VT_LCL) && verifier.VerifyVector(lcl()) && VerifyOffset(verifier, VT_RAW_LEDGER) && @@ -1140,9 +1148,6 @@ struct HistoryLedgerBuilder { typedef HistoryLedger Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_state(flatbuffers::Offset> state) { - fbb_.AddOffset(HistoryLedger::VT_STATE, state); - } void add_lcl(flatbuffers::Offset> lcl) { fbb_.AddOffset(HistoryLedger::VT_LCL, lcl); } @@ -1162,27 +1167,22 @@ struct HistoryLedgerBuilder { inline flatbuffers::Offset CreateHistoryLedger( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> state = 0, flatbuffers::Offset> lcl = 0, flatbuffers::Offset> raw_ledger = 0) { HistoryLedgerBuilder builder_(_fbb); builder_.add_raw_ledger(raw_ledger); builder_.add_lcl(lcl); - builder_.add_state(state); return builder_.Finish(); } inline flatbuffers::Offset CreateHistoryLedgerDirect( flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *state = nullptr, const std::vector *lcl = nullptr, const std::vector *raw_ledger = nullptr) { - auto state__ = state ? _fbb.CreateVector(*state) : 0; auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto raw_ledger__ = raw_ledger ? _fbb.CreateVector(*raw_ledger) : 0; return msg::fbuf::p2pmsg::CreateHistoryLedger( _fbb, - state__, lcl__, raw_ledger__); } diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 10cd8dd2..4d375591 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -201,6 +201,9 @@ namespace msg::fbuf::p2pmsg { p2p::history_response hr; + if (msg.requester_lcl()) + hr.requester_lcl = flatbuff_bytes_to_sv(msg.requester_lcl()); + if (msg.hist_ledgers()) hr.hist_ledgers = flatbuf_historyledgermap_to_historyledgermap(msg.hist_ledgers()); @@ -427,6 +430,7 @@ namespace msg::fbuf::p2pmsg flatbuffers::Offset hrmsg = CreateHistory_Response_Message( builder, + sv_to_flatbuff_bytes(builder, hr.requester_lcl), historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers), (Ledger_Response_Error)hr.error); @@ -692,7 +696,6 @@ namespace msg::fbuf::p2pmsg { flatbuffers::Offset history_ledger = CreateHistoryLedger( builder, - sv_to_flatbuff_bytes(builder, ledger.state), sv_to_flatbuff_bytes(builder, ledger.lcl), builder.CreateVector(ledger.raw_ledger)); diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index b1c943e5..55fdee5f 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -1,7 +1,6 @@ #include "../../pchheader.hpp" #include "../../util.hpp" #include "../../crypto.hpp" -#include "../../cons/cons.hpp" #include "../../hplog.hpp" #include "../usrmsg_common.hpp" #include "usrmsg_json.hpp" @@ -73,7 +72,7 @@ namespace msg::usrmsg::json * "lcl_seqno": * } */ - void create_status_response(std::vector &msg) + void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl) { msg.reserve(128); msg += "{\""; @@ -83,11 +82,11 @@ namespace msg::usrmsg::json msg += SEP_COMMA; msg += msg::usrmsg::FLD_LCL; msg += SEP_COLON; - msg += cons::ctx.lcl; + msg += lcl; msg += SEP_COMMA; msg += msg::usrmsg::FLD_LCL_SEQ; msg += SEP_COLON_NOQUOTE; - msg += std::to_string(cons::ctx.led_seq_no); + msg += std::to_string(lcl_seq_no); msg += "}"; } @@ -172,7 +171,7 @@ namespace msg::usrmsg::json * } * @param content The contract binary output content to be put in the message. */ - void create_contract_output_container(std::vector &msg, std::string_view content) + void create_contract_output_container(std::vector &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl) { std::string contenthex; util::bin2hex( @@ -188,11 +187,11 @@ namespace msg::usrmsg::json msg += SEP_COMMA; msg += msg::usrmsg::FLD_LCL; msg += SEP_COLON; - msg += cons::ctx.lcl; + msg += lcl; msg += SEP_COMMA; msg += msg::usrmsg::FLD_LCL_SEQ; msg += SEP_COLON_NOQUOTE; - msg += std::to_string(cons::ctx.led_seq_no); + msg += std::to_string(lcl_seq_no); msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_CONTENT; msg += SEP_COLON; diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 02ef312a..a0c11323 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -8,14 +8,14 @@ namespace msg::usrmsg::json void create_user_challenge(std::vector &msg, std::string &challengehex); - void create_status_response(std::vector &msg); + void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl); void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_sig); void create_contract_read_response_container(std::vector &msg, std::string_view content); - void create_contract_output_container(std::vector &msg, std::string_view content); + void create_contract_output_container(std::vector &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl); int verify_user_handshake_response(std::string &extracted_pubkeyhex, std::string &extracted_protocol, std::string_view response, std::string_view original_challenge); diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index 9b11c837..7f368cb8 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -13,12 +13,12 @@ namespace msg::usrmsg { } - void usrmsg_parser::create_status_response(std::vector &msg) const + void usrmsg_parser::create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_status_response(msg); + jusrmsg::create_status_response(msg, lcl_seq_no, lcl); else - busrmsg::create_status_response(msg); + busrmsg::create_status_response(msg, lcl_seq_no, lcl); } void usrmsg_parser::create_contract_input_status(std::vector &msg, std::string_view status, @@ -38,12 +38,12 @@ namespace msg::usrmsg busrmsg::create_contract_read_response_container(msg, content); } - void usrmsg_parser::create_contract_output_container(std::vector &msg, std::string_view content) const + void usrmsg_parser::create_contract_output_container(std::vector &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_contract_output_container(msg, content); + jusrmsg::create_contract_output_container(msg, content, lcl_seq_no, lcl); else - busrmsg::create_contract_output_container(msg, content); + busrmsg::create_contract_output_container(msg, content, lcl_seq_no, lcl); } int usrmsg_parser::parse(std::string_view message) diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index 4a67c453..05412d3c 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -18,14 +18,14 @@ namespace msg::usrmsg public: usrmsg_parser(const util::PROTOCOL protocol); - void create_status_response(std::vector &msg) const; + void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl) const; void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_sig) const; void create_contract_read_response_container(std::vector &msg, std::string_view content) const; - void create_contract_output_container(std::vector &msg, std::string_view content) const; + void create_contract_output_container(std::vector &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl) const; int parse(std::string_view message); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index f43d0bf8..93103754 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -39,7 +39,6 @@ namespace p2p struct history_ledger { - std::string state; std::string lcl; std::vector raw_ledger; }; @@ -60,6 +59,7 @@ namespace p2p struct history_response { + std::string requester_lcl; std::map hist_ledgers; LEDGER_RESPONSE_ERROR error = LEDGER_RESPONSE_ERROR::NONE; }; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 09164c3d..d28b7006 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -11,9 +11,8 @@ #include "../comm/comm_client.hpp" #include "p2p.hpp" #include "peer_session_handler.hpp" -#include "../cons/ledger_handler.hpp" #include "../state/state_sync.hpp" -#include "../cons/cons.hpp" +#include "../ledger.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -129,6 +128,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "NPL message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -146,6 +146,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "State request message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -159,6 +160,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "State response message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -175,33 +177,27 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "History request message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); - //first check node has the required lcl available. -> if so send lcl history accordingly. - const bool req_lcl_avail = cons::check_required_lcl_availability(hr); - if (req_lcl_avail) - { - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_history_response(fbuf, cons::retrieve_ledger_history(hr)); - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - - session.send(msg); - } + std::scoped_lock lock(ledger::sync_ctx.list_mutex); + ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr))); } else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message { if (p2pmsg::validate_container_trust(container) != 0) { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); LOG_DEBUG << "History response message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } - cons::handle_ledger_history_response( - p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message())); + const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message()); + std::scoped_lock lock(ledger::sync_ctx.list_mutex); + ledger::sync_ctx.collected_history_responses.push_back(std::move(hr)); } else { diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 49b4fcde..3b707ec1 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -38,6 +40,7 @@ #include #include #include +#include #include #include #include diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp index 90b2c444..c4d6df84 100644 --- a/src/state/state_serve.cpp +++ b/src/state/state_serve.cpp @@ -6,7 +6,7 @@ #include "../msg/fbuf/p2pmsg_content_generated.h" #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" -#include "../cons/cons.hpp" +#include "../ledger.hpp" #include "../hplog.hpp" #include "state_serve.hpp" #include "state_common.hpp" @@ -64,6 +64,7 @@ namespace state_serve } const uint64_t time_start = util::get_epoch_milliseconds(); + const std::string lcl = ledger::ctx.get_lcl(); for (auto &[session_id, request] : state_requests) { @@ -81,7 +82,7 @@ namespace state_serve const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); flatbuffers::FlatBufferBuilder fbuf(1024); - if (state_serve::create_state_response(fbuf, sr) == 1) + if (state_serve::create_state_response(fbuf, sr, lcl) == 1) { // Find the peer that we should send the state response to. std::scoped_lock lock(p2p::ctx.peer_connections_mutex); @@ -111,7 +112,7 @@ namespace state_serve * @return 1 if successful state response was generated. 0 if request is invalid * and no response was generated. -1 on error. */ - int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr) + int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr, std::string_view lcl) { LOG_DEBUG << "Serving state req. path:" << sr.parent_path << " block_id:" << sr.block_id; @@ -136,7 +137,7 @@ namespace state_serve resp.hash = sr.expected_hash; resp.data = std::string_view(reinterpret_cast(block.data()), block.size()); - msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, cons::ctx.lcl); + msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, lcl); return 1; // Success. } } @@ -158,7 +159,7 @@ namespace state_serve { msg::fbuf::p2pmsg::create_msg_from_filehashmap_response( fbuf, sr.parent_path, block_hashes, - file_length, sr.expected_hash, cons::ctx.lcl); + file_length, sr.expected_hash, lcl); return 1; // Success. } } @@ -177,7 +178,7 @@ namespace state_serve else if (result == 1) { msg::fbuf::p2pmsg::create_msg_from_fsentry_response( - fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, cons::ctx.lcl); + fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, lcl); return 1; // Success. } } diff --git a/src/state/state_serve.hpp b/src/state/state_serve.hpp index 9bf7619f..3ac847c2 100644 --- a/src/state/state_serve.hpp +++ b/src/state/state_serve.hpp @@ -14,7 +14,7 @@ namespace state_serve void state_serve_loop(); - int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr); + int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr, std::string_view lcl); int get_data_block(std::vector &vec, const std::string_view vpath, const uint32_t block_id, const hpfs::h32 expected_hash); diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp index 8102c14f..e77ed8df 100644 --- a/src/state/state_sync.cpp +++ b/src/state/state_sync.cpp @@ -3,7 +3,7 @@ #include "../msg/fbuf/common_helpers.hpp" #include "../p2p/p2p.hpp" #include "../pchheader.hpp" -#include "../cons/cons.hpp" +#include "../ledger.hpp" #include "../hplog.hpp" #include "../util.hpp" #include "../hpfs/hpfs.hpp" @@ -129,6 +129,7 @@ namespace state_sync LOG_ERROR << "State sync: Failed to start hpfs rw session"; } + std::scoped_lock lock(ctx.target_state_update_lock); ctx.target_state = hpfs::h32_empty; ctx.is_syncing = false; } @@ -138,13 +139,18 @@ namespace state_sync void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state) { + std::string lcl = ledger::ctx.get_lcl(); + // Send the initial root state request. - submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target}); + submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target}, lcl); while (!should_stop_request_loop(current_target)) { util::sleep(REQUEST_LOOP_WAIT); + // Get current lcl. + std::string lcl = ledger::ctx.get_lcl(); + { std::scoped_lock lock(p2p::ctx.collected_msgs.state_responses_mutex); @@ -216,7 +222,7 @@ namespace state_sync // Reset the counter and re-submit request. request.waiting_time = 0; LOG_DEBUG << "State sync: Resubmitting request..."; - submit_request(request); + submit_request(request, lcl); } } @@ -230,7 +236,7 @@ namespace state_sync return; const backlog_item &request = ctx.pending_requests.front(); - submit_request(request); + submit_request(request, lcl); ctx.pending_requests.pop_front(); } } @@ -257,7 +263,8 @@ namespace state_sync * @param block_id The requested block id. Only relevant if requesting a file block. Otherwise -1. * @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different. */ - void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash) + void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, + const hpfs::h32 expected_hash, std::string_view lcl) { p2p::state_request sr; sr.parent_path = path; @@ -266,25 +273,25 @@ namespace state_sync sr.expected_hash = expected_hash; flatbuffers::FlatBufferBuilder fbuf(1024); - msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, sr, cons::ctx.lcl); + msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, sr, lcl); p2p::send_message_to_random_peer(fbuf); //todo: send to a node that hold the majority state to improve reliability of retrieving state. } /** * Submits a pending state request to the peer. */ - void submit_request(const backlog_item &request) + void submit_request(const backlog_item &request, std::string_view lcl) { LOG_DEBUG << "State sync: Submitting request. type:" << request.type - << " path:" << request.path << " block_id:" << request.block_id - << " hash:" << request.expected_hash; + << " path:" << request.path << " block_id:" << request.block_id + << " hash:" << request.expected_hash; const std::string key = std::string(request.path) .append(reinterpret_cast(&request.expected_hash), sizeof(hpfs::h32)); ctx.submitted_requests.try_emplace(key, request); const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; - request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash); + request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl); } /** @@ -412,8 +419,8 @@ namespace state_sync std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_msg->data()); LOG_DEBUG << "State sync: Writing block_id " << block_id - << " (len:" << buf.length() - << ") of " << file_vpath; + << " (len:" << buf.length() + << ") of " << file_vpath; std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath); const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS); diff --git a/src/state/state_sync.hpp b/src/state/state_sync.hpp index cc6b6e9a..a8794e39 100644 --- a/src/state/state_sync.hpp +++ b/src/state/state_sync.hpp @@ -68,9 +68,10 @@ namespace state_sync bool should_stop_request_loop(const hpfs::h32 current_target); - void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash); + void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, + const hpfs::h32 expected_hash, std::string_view lcl); - void submit_request(const backlog_item &request); + void submit_request(const backlog_item &request, std::string_view lcl); int handle_fs_entry_response(std::string_view parent_vpath, const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_entry_resp); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 6ac6dc2c..a131abb2 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -8,6 +8,7 @@ #include "../conf.hpp" #include "../crypto.hpp" #include "../hplog.hpp" +#include "../ledger.hpp" #include "usr.hpp" #include "user_session_handler.hpp" #include "user_input.hpp" @@ -180,7 +181,7 @@ namespace usr else if (msg_type == msg::usrmsg::MSGTYPE_STAT) { std::vector msg; - parser.create_status_response(msg); + parser.create_status_response(msg, ledger::ctx.get_seq_no(), ledger::ctx.get_lcl()); user.session.send(msg); return 0; } diff --git a/src/util.cpp b/src/util.cpp index ce8e8595..207f75b1 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -375,6 +375,20 @@ namespace util return remove(path.data()); } + /** + * Clears all files from a directory (not recursive). + */ + int clear_directory(std::string_view dir_path) + { + return nftw( + dir_path.data(), [](const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) { + if (typeflag == FTW_F) // Is file. + return remove(fpath); + return 0; + }, + 1, FTW_PHYS); + } + void split_string(std::vector &collection, std::string_view str, std::string_view delimeter) { if (str.empty()) diff --git a/src/util.hpp b/src/util.hpp index f2ba1339..a838b37e 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -108,6 +108,8 @@ namespace util int remove_file(std::string_view path); + int clear_directory(std::string_view dir_path); + void split_string(std::vector &collection, std::string_view str, std::string_view delimeter); } // namespace util