From 33bd63ac642f722a1fe9cb0ba19b31c0f37831d2 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Fri, 4 Jun 2021 15:08:10 +0530 Subject: [PATCH] Status tracking infrastructure. (#317) * Added node's current status information tracker. * Used the status tracker for responding to status messages. * Used change-event notifications to broadcast UNL change event. --- CMakeLists.txt | 2 + src/comm/comm_server.hpp | 10 ++-- src/consensus.cpp | 47 ++++++++-------- src/consensus.hpp | 23 ++++---- src/hpfs/hpfs_serve.cpp | 5 +- src/ledger/ledger.cpp | 47 +++++++++------- src/ledger/ledger.hpp | 32 +++++------ src/ledger/ledger_mount.cpp | 7 +-- src/ledger/ledger_sync.cpp | 4 +- src/msg/bson/usrmsg_bson.cpp | 32 ++++++----- src/msg/bson/usrmsg_bson.hpp | 2 +- src/msg/fbuf/flatbuf_hasher.hpp | 2 +- src/msg/fbuf/p2pmsg_conversion.cpp | 8 +-- src/msg/fbuf/p2pmsg_conversion.hpp | 8 +-- src/msg/json/usrmsg_json.cpp | 39 +++++++------- src/msg/json/usrmsg_json.hpp | 2 +- src/msg/usrmsg_common.hpp | 2 +- src/msg/usrmsg_parser.cpp | 6 +-- src/msg/usrmsg_parser.hpp | 2 +- src/p2p/p2p.cpp | 10 +--- src/p2p/p2p.hpp | 43 +++------------ src/p2p/peer_comm_server.cpp | 15 +++++- src/sc/hpfs_log_sync.cpp | 7 +-- src/sc/hpfs_log_sync.hpp | 2 +- src/sc/sc.hpp | 3 +- src/status.cpp | 87 ++++++++++++++++++++++++++++++ src/status.hpp | 35 ++++++++++++ src/unl.cpp | 18 +++---- src/unl.hpp | 2 +- src/usr/user_comm_server.hpp | 13 +++++ src/usr/usr.cpp | 45 ++++++++++------ src/usr/usr.hpp | 2 +- src/util/sequence_hash.cpp | 36 +++++++++++++ src/util/sequence_hash.hpp | 27 ++++++++++ 34 files changed, 414 insertions(+), 211 deletions(-) create mode 100644 src/status.cpp create mode 100644 src/status.hpp create mode 100644 src/util/sequence_hash.cpp create mode 100644 src/util/sequence_hash.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 308afecb..3c5fa8f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,7 @@ add_executable(hpcore src/util/buffer_store.cpp src/util/merkle_hash_tree.cpp src/util/h32.cpp + src/util/sequence_hash.cpp src/unl.cpp src/crypto.cpp src/conf.cpp @@ -68,6 +69,7 @@ add_executable(hpcore src/ledger/ledger_sync.cpp src/ledger/ledger_serve.cpp src/ledger/ledger.cpp + src/status.cpp src/consensus.cpp src/main.cpp ) diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index 99891fd7..be939ce3 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -46,8 +46,8 @@ namespace comm const std::string name; const uint16_t listen_port; std::optional hpws_server; - std::thread watchdog_thread; // Connection watcher thread. - std::thread inbound_message_processor_thread; // Incoming message processor thread. + std::thread watchdog_thread; // Connection watcher thread. + std::thread message_processor_thread; // Message processor thread. void connection_watchdog() { @@ -151,7 +151,7 @@ namespace comm // If the hpws client object was not added to a session so far, in will get dstructed and the channel will close. } - void inbound_message_processor_loop() + void message_processor_loop() { util::mask_signal(); @@ -251,7 +251,7 @@ namespace comm return -1; watchdog_thread = std::thread(&comm_server::connection_watchdog, this); - inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this); + message_processor_thread = std::thread(&comm_server::message_processor_loop, this); start_custom_jobs(); return 0; @@ -266,7 +266,7 @@ namespace comm watchdog_thread.join(); hpws_server.reset(); - inbound_message_processor_thread.join(); + message_processor_thread.join(); } }; diff --git a/src/consensus.cpp b/src/consensus.cpp index f6b423f3..5b37bb8b 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -11,10 +11,12 @@ #include "hplog.hpp" #include "crypto.hpp" #include "util/h32.hpp" +#include "util/sequence_hash.hpp" #include "unl.hpp" #include "ledger/ledger.hpp" #include "consensus.hpp" #include "sc/hpfs_log_sync.hpp" +#include "status.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -107,11 +109,11 @@ namespace consensus revise_candidate_proposals(ctx.sync_status == 0); // Get current lcl, state, patch, primary shard and raw shard info. - p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); + util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); util::h32 state_hash = sc::contract_fs.get_parent_hash(sc::STATE_DIR_PATH); const util::h32 patch_hash = sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH); - const p2p::sequence_hash last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); - const p2p::sequence_hash last_raw_shard_id = ledger::ctx.get_last_raw_shard_id(); + const util::sequence_hash last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); + const util::sequence_hash last_raw_shard_id = ledger::ctx.get_last_raw_shard_id(); if (ctx.stage == 0 || ctx.stage == 2) { @@ -142,7 +144,8 @@ namespace consensus // Check whether we are in sync with other nodes using proposals. { - const int new_sync_status = check_sync_status(unl_count, votes, lcl_id); + int new_sync_status = check_sync_status(unl_count, votes, lcl_id); + if (ctx.sync_status != 0 && new_sync_status == 0) { // If we are just becoming 'in-sync' after being out-of-sync, check the sync status again after the proper @@ -151,12 +154,14 @@ namespace consensus LOG_DEBUG << "Rechecking sync status after becoming in-sync."; revise_candidate_proposals(true); - ctx.sync_status = check_sync_status(unl_count, votes, lcl_id); - } - else - { - ctx.sync_status = new_sync_status; + new_sync_status = check_sync_status(unl_count, votes, lcl_id); } + + // Update the status if the sync status changed. + if ((ctx.sync_status != 0 && new_sync_status == 0) || (ctx.sync_status == 0 && new_sync_status != 0)) + status::sync_status_changed(new_sync_status == 0); + + ctx.sync_status = new_sync_status; } if (ctx.sync_status == -2) // Unreliable votes. @@ -215,7 +220,7 @@ namespace consensus if (ledger::update_ledger(cons_prop, consensed_users) == -1) return -1; - p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); + util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; // Now that there's a new ledger, prune any newly-expired candidate inputs. @@ -242,10 +247,10 @@ namespace consensus * Checks whether we are in sync with the received votes. * @return 0 if we are in sync. -1 on ledger or contract state desync. -2 if majority last ledger primary shard hash unreliable. */ - int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id) + int check_sync_status(const size_t unl_count, vote_counter &votes, const util::sequence_hash &lcl_id) { bool is_last_primary_shard_desync = false; - p2p::sequence_hash majority_primary_shard_id; + util::sequence_hash majority_primary_shard_id; if (check_last_primary_shard_hash_votes(is_last_primary_shard_desync, majority_primary_shard_id, votes, unl_count)) { // We proceed further only if last primary shard hash check was success (meaning last primary shard hash check could be reliably performed). @@ -261,7 +266,7 @@ namespace consensus // Check out raw shard hash with majority raw shard hash. bool is_last_raw_shard_desync = false; - p2p::sequence_hash majority_raw_shard_id; + util::sequence_hash majority_raw_shard_id; check_last_raw_shard_hash_votes(is_last_raw_shard_desync, majority_raw_shard_id, votes); // Check our state with majority state. @@ -477,7 +482,7 @@ namespace consensus /** * Removes any candidate inputs that has lived passed the current ledger seq no. */ - void expire_candidate_inputs(const p2p::sequence_hash &lcl_id) + void expire_candidate_inputs(const util::sequence_hash &lcl_id) { auto itr = ctx.candidate_user_inputs.begin(); while (itr != ctx.candidate_user_inputs.end()) @@ -757,7 +762,7 @@ namespace consensus } p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id) + const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id) { // This is the proposal that stage 0 votes on. // We report our own values in stage 0. @@ -786,7 +791,7 @@ namespace consensus } p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id) + const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id) { // The proposal to be emited at the end of this stage. p2p::proposal p; @@ -911,7 +916,7 @@ namespace consensus * @param unl_count Number of unl peers. * @return True if majority ledger primary hash could be calculated reliably. False if shard index hash check failed due to unreliable votes. */ - bool check_last_primary_shard_hash_votes(bool &is_desync, p2p::sequence_hash &majority_primary_shard_id, vote_counter &votes, const size_t unl_count) + bool check_last_primary_shard_hash_votes(bool &is_desync, util::sequence_hash &majority_primary_shard_id, vote_counter &votes, const size_t unl_count) { uint32_t total_ledger_primary_hash_votes = 0; @@ -971,7 +976,7 @@ namespace consensus * @param majority_primary_shard_id Majority primary shard id. * @param votes Vote counter for this stage. */ - void check_last_raw_shard_hash_votes(bool &is_ledger_blob_desync, p2p::sequence_hash &majority_raw_shard_id, vote_counter &votes) + void check_last_raw_shard_hash_votes(bool &is_ledger_blob_desync, util::sequence_hash &majority_raw_shard_id, vote_counter &votes) { for (const auto &[pubkey, cp] : ctx.candidate_proposals) { @@ -1049,7 +1054,7 @@ namespace consensus * @param consensed_users Consensed users and their inputs. * @param lcl_id Current lcl id of the node. */ - int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) { if (!conf::cfg.contract.execute || ctx.is_shutting_down) return 0; @@ -1109,7 +1114,7 @@ namespace consensus * @param consensed_users The map of consensed users containing their inputs. * @param lcl_id The ledger the inputs got included in. */ - void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) { if (consensed_users.empty()) return; @@ -1139,7 +1144,7 @@ namespace consensus * @param consensed_users The map of consensed users containing their outputs. * @param lcl_id The ledger the outputs got included in. */ - void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) { if (!consensed_users.empty()) { diff --git a/src/consensus.hpp b/src/consensus.hpp index beea1625..5a609280 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -9,6 +9,7 @@ #include "p2p/p2p.hpp" #include "usr/user_input.hpp" #include "util/h32.hpp" +#include "util/sequence_hash.hpp" namespace consensus { @@ -134,8 +135,8 @@ namespace consensus std::map output_hash; std::map state_hash; std::map patch_hash; - std::map last_ledger_primary_shard; - std::map last_ledger_raw_shard; + std::map last_ledger_primary_shard; + std::map last_ledger_raw_shard; }; extern std::atomic is_patch_update_pending; // Keep track whether the patch file is changed by the SC and is not yet applied to runtime. @@ -152,13 +153,13 @@ namespace consensus int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash); - int check_sync_status(const size_t unl_count, vote_counter &votes, const p2p::sequence_hash &lcl_id); + int check_sync_status(const size_t unl_count, vote_counter &votes, const util::sequence_hash &lcl_id); void revise_candidate_proposals(const bool in_sync); int prepare_consensed_users(consensed_user_map &consensed_users, const p2p::proposal &cons_prop); - void expire_candidate_inputs(const p2p::sequence_hash &lcl_id); + void expire_candidate_inputs(const util::sequence_hash &lcl_id); int cleanup_consensed_user_inputs(const consensed_user_map &consensed_users); @@ -173,16 +174,16 @@ namespace consensus int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id); + const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id); p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id); + const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id); void broadcast_proposal(const p2p::proposal &p); - bool check_last_primary_shard_hash_votes(bool &is_desync, p2p::sequence_hash &majority_primary_shard_id, vote_counter &votes, const size_t unl_count); + bool check_last_primary_shard_hash_votes(bool &is_desync, util::sequence_hash &majority_primary_shard_id, vote_counter &votes, const size_t unl_count); - void check_last_raw_shard_hash_votes(bool &is_ledger_blob_desync, p2p::sequence_hash &majority_raw_shard_id, vote_counter &votes); + void check_last_raw_shard_hash_votes(bool &is_ledger_blob_desync, util::sequence_hash &majority_raw_shard_id, vote_counter &votes); void check_state_votes(bool &is_state_desync, util::h32 &majority_state_hash, vote_counter &votes); @@ -194,11 +195,11 @@ namespace consensus uint64_t get_stage_time_resolution(const uint64_t time); - int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); - void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); - void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users); diff --git a/src/hpfs/hpfs_serve.cpp b/src/hpfs/hpfs_serve.cpp index 2e62c44d..b5283900 100644 --- a/src/hpfs/hpfs_serve.cpp +++ b/src/hpfs/hpfs_serve.cpp @@ -1,5 +1,6 @@ #include "../pchheader.hpp" #include "../util/h32.hpp" +#include "../util/sequence_hash.hpp" #include "../util/util.hpp" #include "../p2p/p2p.hpp" #include "../msg/fbuf/p2pmsg_conversion.hpp" @@ -69,8 +70,8 @@ namespace hpfs prev_requests_processed = !hpfs_requests.empty(); const uint64_t time_start = util::get_epoch_milliseconds(); - const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); - const p2p::sequence_hash last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); + const util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); + const util::sequence_hash last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); const uint32_t request_batch_timeout = hpfs::get_request_resubmit_timeout() * 0.9; if (hpfs_requests.empty()) diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 94e8535a..5c083f5f 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -5,7 +5,7 @@ #include "../conf.hpp" #include "../util/version.hpp" #include "../util/util.hpp" -#include "../msg/fbuf/common_helpers.hpp" +#include "../status.hpp" #include "ledger_common.hpp" #include "ledger_serve.hpp" @@ -79,7 +79,7 @@ namespace ledger } // Remove old shards that exceeds max shard range. - const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); + const util::sequence_hash lcl_id = ctx.get_lcl_id(); remove_old_shards(lcl_id.seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); remove_old_shards(lcl_id.seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); @@ -111,7 +111,7 @@ namespace ledger if (ledger_fs.acquire_rw_session() == -1) return -1; - p2p::sequence_hash lcl_id; + util::sequence_hash lcl_id; if (update_primary_ledger(proposal, consensed_users, lcl_id) == -1 || update_ledger_raw_data(proposal, consensed_users, lcl_id) == -1) { @@ -129,9 +129,9 @@ namespace ledger * @param new_lcl_id The new ledger seq no. and hash. * @return 0 on success. -1 on failure. */ - int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, p2p::sequence_hash &new_lcl_id) + int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, util::sequence_hash &new_lcl_id) { - const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); + const util::sequence_hash lcl_id = ctx.get_lcl_id(); new_lcl_id.seq_no = lcl_id.seq_no + 1; sqlite3 *db = NULL; @@ -141,7 +141,8 @@ namespace ledger const int shard_res = prepare_shard(&db, shard_seq_no, new_lcl_id.seq_no, PRIMARY_SHARD_SIZE, PRIMARY_DIR, PRIMARY_DB, true); // Insert primary ledger record. - if (shard_res >= 0 && insert_ledger_record(db, lcl_id, shard_seq_no, proposal, new_lcl_id) != -1) + ledger_record ledger; + if (shard_res >= 0 && insert_ledger_record(db, lcl_id, shard_seq_no, proposal, new_lcl_id, ledger) != -1) { sqlite::close_db(&db); ctx.set_lcl_id(new_lcl_id); @@ -155,7 +156,7 @@ namespace ledger } // Update the last shard hash and shard seqence number tracker when a new ledger is created. - ctx.set_last_primary_shard_id(p2p::sequence_hash{shard_seq_no, last_primary_shard_hash}); + ctx.set_last_primary_shard_id(util::sequence_hash{shard_seq_no, last_primary_shard_hash}); // Update the hpfs log index file only in full history mode. if (conf::cfg.node.history == conf::HISTORY::FULL && sc::contract_fs.update_hpfs_log_index(new_lcl_id.seq_no) == -1) @@ -168,6 +169,9 @@ namespace ledger if (shard_res == 1) remove_old_shards(new_lcl_id.seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); + // Update the node's status. + status::ledger_created(new_lcl_id, ledger); + return 0; } @@ -175,7 +179,7 @@ namespace ledger return -1; } - int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) { if ((conf::cfg.node.history != conf::HISTORY::FULL && conf::cfg.node.history_config.max_raw_shards == 0)) return 0; @@ -194,7 +198,7 @@ namespace ledger // Update in-memory context raw shard hash after inserting new record. util::h32 last_raw_shard_hash; if (ledger_fs.get_hash(last_raw_shard_hash, hpfs::RW_SESSION_NAME, std::string(RAW_DIR).append("/").append(std::to_string(shard_seq_no))) != -1) - ctx.set_last_raw_shard_id(p2p::sequence_hash{shard_seq_no, last_raw_shard_hash}); + ctx.set_last_raw_shard_id(util::sequence_hash{shard_seq_no, last_raw_shard_hash}); // Remove old shards if new one got created. if (shard_res == 1) @@ -214,21 +218,22 @@ namespace ledger * @param shard_seq_no Current primary shard seq no. * @param proposal The consensus proposal. * @param new_lcl_id Newly created ledger id. + * @param ledger Newly created ledger record. * @return 0 on success. -1 on failure. */ - int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t shard_seq_no, - const p2p::proposal &proposal, p2p::sequence_hash &new_lcl_id) + int insert_ledger_record(sqlite3 *db, const util::sequence_hash ¤t_lcl_id, const uint64_t shard_seq_no, + const p2p::proposal &proposal, util::sequence_hash &new_lcl_id, ledger_record &ledger) { // Combined binary hash of consensus user binary pub keys. const std::string user_hash = crypto::get_list_hash(proposal.users); // Combined binary hash of consensus input hashes. std::vector inp_hashes; + + // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. for (const std::string &o_hash : proposal.input_ordered_hashes) - { - // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. inp_hashes.push_back(util::get_string_suffix(o_hash, BLAKE3_OUT_LEN)); - } + const std::string input_hash = crypto::get_list_hash(inp_hashes); uint8_t seq_no_bytes[8], time_bytes[8]; @@ -254,7 +259,7 @@ namespace ledger new_lcl_id.hash = crypto::get_hash(prev_ledger_hash, data_hash); // Construct ledger struct with binary hashes. - const ledger_record ledger{ + ledger = ledger_record{ current_lcl_id.seq_no + 1, proposal.time, std::string(new_lcl_id.hash.to_string_view()), @@ -285,7 +290,7 @@ namespace ledger * @return 0 on success. -1 on failure. */ int insert_raw_data_records(sqlite3 *db, const uint64_t shard_seq_no, const p2p::proposal &proposal, - const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + const consensus::consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) { // We keep sqlite records about users, inputs and outputs. To store raw input and output content, we use the corresponding blob file // within the shard. Each shard has a sqlite db, raw inputs blob file and raw outputs blob file. @@ -641,7 +646,7 @@ namespace ledger * @param genesis_fallback Whether to automaticaly fallback to genesis ledger on ledger db read error. * @return Returns 0 on success -1 on error. */ - int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id, const bool genesis_fallback) + int get_last_ledger_and_update_context(std::string_view session_name, const util::sequence_hash &last_primary_shard_id, const bool genesis_fallback) { sqlite3 *db = NULL; const std::string shard_path = ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(last_primary_shard_id.seq_no); @@ -649,7 +654,7 @@ namespace ledger if (last_primary_shard_id.empty()) { // This is the genesis ledger. - ctx.set_lcl_id(p2p::sequence_hash{0, util::h32_empty}); + ctx.set_lcl_id(util::sequence_hash{0, util::h32_empty}); return 0; } @@ -677,11 +682,13 @@ namespace ledger sqlite::close_db(&db); // Update new lcl information. - p2p::sequence_hash lcl_id; + util::sequence_hash lcl_id; lcl_id.seq_no = last_ledger.seq_no; lcl_id.hash = last_ledger.ledger_hash; ctx.set_lcl_id(lcl_id); + status::init_ledger(lcl_id, last_ledger); + return 0; } @@ -692,7 +699,7 @@ namespace ledger * @param shard_parent_dir Parent director vpath of the shards. * @return 0 on success. -1 on error. */ - int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir) + int get_last_shard_info(std::string_view session_name, util::sequence_hash &last_shard_id, const std::string &shard_parent_dir) { const std::string last_shard_seq_no_vpath = shard_parent_dir + SHARD_SEQ_NO_FILENAME; const std::string last_shard_seq_no_path = ledger_fs.physical_path(session_name, last_shard_seq_no_vpath); diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index 5af0e3c1..6cff336a 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -13,11 +13,11 @@ namespace ledger { private: std::shared_mutex lcl_mutex; - p2p::sequence_hash lcl_id; + util::sequence_hash lcl_id; std::shared_mutex last_primary_shard_mutex; - p2p::sequence_hash last_primary_shard_id; + util::sequence_hash last_primary_shard_id; std::shared_mutex last_raw_shard_mutex; - p2p::sequence_hash last_raw_shard_id; + util::sequence_hash last_raw_shard_id; public: // These flags will be marked as true after doing the shards cleanup and requesting @@ -25,37 +25,37 @@ namespace ledger std::atomic primary_shards_persisted = false; std::atomic raw_shards_persisted = false; - const p2p::sequence_hash get_lcl_id() + const util::sequence_hash get_lcl_id() { std::shared_lock lock(lcl_mutex); return lcl_id; } - void set_lcl_id(const p2p::sequence_hash &sequence_hash_id) + void set_lcl_id(const util::sequence_hash &sequence_hash_id) { std::unique_lock lock(lcl_mutex); lcl_id = sequence_hash_id; } - const p2p::sequence_hash get_last_primary_shard_id() + const util::sequence_hash get_last_primary_shard_id() { std::shared_lock lock(last_primary_shard_mutex); return last_primary_shard_id; } - void set_last_primary_shard_id(const p2p::sequence_hash &sequence_hash_id) + void set_last_primary_shard_id(const util::sequence_hash &sequence_hash_id) { std::unique_lock lock(last_primary_shard_mutex); last_primary_shard_id = sequence_hash_id; } - const p2p::sequence_hash get_last_raw_shard_id() + const util::sequence_hash get_last_raw_shard_id() { std::shared_lock lock(last_raw_shard_mutex); return last_raw_shard_id; } - void set_last_raw_shard_id(const p2p::sequence_hash &sequence_hash_id) + void set_last_raw_shard_id(const util::sequence_hash &sequence_hash_id) { std::unique_lock lock(last_raw_shard_mutex); last_raw_shard_id = sequence_hash_id; @@ -72,15 +72,15 @@ namespace ledger int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users); - int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, p2p::sequence_hash &new_lcl_id); + int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, util::sequence_hash &new_lcl_id); - int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); - int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t shard_seq_no, - const p2p::proposal &proposal, p2p::sequence_hash &new_lcl_id); + int insert_ledger_record(sqlite3 *db, const util::sequence_hash ¤t_lcl_id, const uint64_t shard_seq_no, + const p2p::proposal &proposal, util::sequence_hash &new_lcl_id, ledger_record &ledger); int insert_raw_data_records(sqlite3 *db, const uint64_t shard_seq_no, const p2p::proposal &proposal, - const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + const consensus::consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); int create_raw_data_blob_file(const std::string &shard_path, const char *file_name, size_t &file_size); @@ -91,9 +91,9 @@ namespace ledger void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir); - int get_last_ledger_and_update_context(std::string_view session_name, const p2p::sequence_hash &last_primary_shard_id, const bool genesis_fallback); + int get_last_ledger_and_update_context(std::string_view session_name, const util::sequence_hash &last_primary_shard_id, const bool genesis_fallback); - int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir); + int get_last_shard_info(std::string_view session_name, util::sequence_hash &last_shard_id, const std::string &shard_parent_dir); int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no); diff --git a/src/ledger/ledger_mount.cpp b/src/ledger/ledger_mount.cpp index d4d622b7..6aec8ab2 100644 --- a/src/ledger/ledger_mount.cpp +++ b/src/ledger/ledger_mount.cpp @@ -1,4 +1,5 @@ -#include "./ledger_mount.hpp" +#include "../util/sequence_hash.hpp" +#include "ledger_mount.hpp" #include "ledger.hpp" namespace ledger @@ -10,8 +11,8 @@ namespace ledger int ledger_mount::prepare_fs() { // Add ledger fs preparation logic here. - p2p::sequence_hash last_primary_shard_id; - p2p::sequence_hash last_raw_shard_id; + util::sequence_hash last_primary_shard_id; + util::sequence_hash last_raw_shard_id; if (acquire_rw_session() == -1) { diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 395c3bfe..a2ca8ee5 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -56,7 +56,7 @@ namespace ledger return; } - const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, hash}; + const util::sequence_hash updated_primary_shard_id{synced_shard_seq_no, hash}; if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id, false) == -1) { LOG_ERROR << "Error updating context from the synced shard " << vpath; @@ -114,7 +114,7 @@ namespace ledger } last_raw_shard_seq_no = synced_shard_seq_no; - ctx.set_last_raw_shard_id(p2p::sequence_hash{synced_shard_seq_no, hash}); + ctx.set_last_raw_shard_id(util::sequence_hash{synced_shard_seq_no, hash}); is_last_raw_shard_syncing = false; // If existing max shard is older than the max we can keep. Then delete all the existing shards. diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index 96c7a8bb..6c044cd4 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -1,10 +1,11 @@ #include "../../conf.hpp" -#include "../../p2p/p2p.hpp" #include "../../pchheader.hpp" #include "../../util/version.hpp" #include "../../util/util.hpp" +#include "../../util/sequence_hash.hpp" #include "../../hplog.hpp" #include "../../ledger/ledger_query.hpp" +#include "../../status.hpp" #include "../usrmsg_common.hpp" #include "usrmsg_bson.hpp" @@ -20,8 +21,11 @@ namespace msg::usrmsg::bson * "ledger_hash": * } */ - void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash) + void create_status_response(std::vector &msg) { + const util::sequence_hash lcl_id = status::get_lcl_id(); + const std::set unl = status::get_unl(); + jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); encoder.key(msg::usrmsg::FLD_TYPE); @@ -29,9 +33,9 @@ namespace msg::usrmsg::bson encoder.key(msg::usrmsg::FLD_HP_VERSION); encoder.string_value(version::HP_VERSION); encoder.key(msg::usrmsg::FLD_LEDGER_SEQ_NO); - encoder.int64_value(lcl_seq_no); + encoder.int64_value(lcl_id.seq_no); encoder.key(msg::usrmsg::FLD_LEDGER_HASH); - encoder.byte_string_value(lcl_hash); + encoder.byte_string_value(lcl_id.hash.to_string_view()); encoder.key(msg::usrmsg::FLD_ROUND_TIME); encoder.uint64_value(conf::cfg.contract.roundtime); encoder.key(msg::usrmsg::FLD_CONTARCT_EXECUTION_ENABLED); @@ -43,27 +47,21 @@ namespace msg::usrmsg::bson encoder.key(msg::usrmsg::FLD_CURRENT_UNL); encoder.begin_array(); - for (std::string_view unl : conf::cfg.contract.unl) - encoder.byte_string_value(unl); + for (std::string_view pubkey : unl) + encoder.byte_string_value(pubkey); encoder.end_array(); encoder.key(msg::usrmsg::FLD_PEERS); { - std::scoped_lock lock(p2p::ctx.peer_connections_mutex); - - const size_t max_peers_count = MIN(MAX_KNOWN_PEERS_INFO, p2p::ctx.peer_connections.size()); + const std::set peers = status::get_peers(); + const size_t max_peers_count = MIN(MAX_KNOWN_PEERS_INFO, peers.size()); size_t count = 1; encoder.begin_array(); - // Currently all peers, up to a max of 10 are sent regardless of state. - for (auto peer = p2p::ctx.peer_connections.begin(); peer != p2p::ctx.peer_connections.end() && count <= max_peers_count; peer++) + for (auto peer = peers.begin(); peer != peers.end() && count <= max_peers_count; peer++) { - const p2p::peer_comm_session *sess = peer->second; - if (sess->known_ipport) - { - encoder.string_value(sess->known_ipport->to_string()); - count++; - } + encoder.string_value(peer->to_string()); + count++; } encoder.end_array(); } diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index 0b9dbef5..82a2d39b 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -8,7 +8,7 @@ namespace msg::usrmsg::bson { - void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash); + void create_status_response(std::vector &msg); void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); diff --git a/src/msg/fbuf/flatbuf_hasher.hpp b/src/msg/fbuf/flatbuf_hasher.hpp index 664bdb7b..94e2ab69 100644 --- a/src/msg/fbuf/flatbuf_hasher.hpp +++ b/src/msg/fbuf/flatbuf_hasher.hpp @@ -64,7 +64,7 @@ namespace msg::fbuf::p2pmsg add(h.to_string_view()); } - void add(const p2p::sequence_hash &h) + void add(const util::sequence_hash &h) { add(h.seq_no); add(h.hash); diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index c7caf6ff..d5f29664 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -243,7 +243,7 @@ namespace msg::fbuf::p2pmsg return hpfs_log_response; } - p2p::sequence_hash flatbuf_seqhash_to_seqhash(const SequenceHash *fbseqhash) + util::sequence_hash flatbuf_seqhash_to_seqhash(const SequenceHash *fbseqhash) { return { fbseqhash->seq_no(), @@ -334,7 +334,7 @@ namespace msg::fbuf::p2pmsg return crypto::sign(hasher.hash(), conf::cfg.node.private_key); } - const std::string generate_npl_signature(std::string_view data, const p2p::sequence_hash &lcl_id) + const std::string generate_npl_signature(std::string_view data, const util::sequence_hash &lcl_id) { flatbuf_hasher hasher; hasher.add(data); @@ -411,7 +411,7 @@ namespace msg::fbuf::p2pmsg create_p2p_msg(builder, P2PMsgContent_ProposalMsg, msg.Union()); } - void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &builder, std::string_view data, const p2p::sequence_hash &lcl_id) + void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &builder, std::string_view data, const util::sequence_hash &lcl_id) { const auto msg = CreateNplMsg( builder, @@ -647,7 +647,7 @@ namespace msg::fbuf::p2pmsg } const flatbuffers::Offset - seqhash_to_flatbuf_seqhash(flatbuffers::FlatBufferBuilder &builder, const p2p::sequence_hash &seqhash) + seqhash_to_flatbuf_seqhash(flatbuffers::FlatBufferBuilder &builder, const util::sequence_hash &seqhash) { return CreateSequenceHash(builder, seqhash.seq_no, hash_to_flatbuf_bytes(builder, seqhash.hash)); } diff --git a/src/msg/fbuf/p2pmsg_conversion.hpp b/src/msg/fbuf/p2pmsg_conversion.hpp index cf31eccc..3e6733f6 100644 --- a/src/msg/fbuf/p2pmsg_conversion.hpp +++ b/src/msg/fbuf/p2pmsg_conversion.hpp @@ -40,7 +40,7 @@ namespace msg::fbuf::p2pmsg const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi); - p2p::sequence_hash flatbuf_seqhash_to_seqhash(const msg::fbuf::p2pmsg::SequenceHash *fbseqhash); + util::sequence_hash flatbuf_seqhash_to_seqhash(const msg::fbuf::p2pmsg::SequenceHash *fbseqhash); const std::set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); @@ -56,7 +56,7 @@ namespace msg::fbuf::p2pmsg const std::string generate_proposal_signature(const p2p::proposal &p); - const std::string generate_npl_signature(std::string_view data, const p2p::sequence_hash &lcl_id); + const std::string generate_npl_signature(std::string_view data, const util::sequence_hash &lcl_id); void create_p2p_msg(flatbuffers::FlatBufferBuilder &builder, const msg::fbuf::p2pmsg::P2PMsgContent content_type, const flatbuffers::Offset content); @@ -68,7 +68,7 @@ namespace msg::fbuf::p2pmsg void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p); - void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &builder, std::string_view data, const p2p::sequence_hash &lcl_id); + void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &builder, std::string_view data, const util::sequence_hash &lcl_id); void create_msg_from_hpfs_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_request &hr); @@ -109,7 +109,7 @@ namespace msg::fbuf::p2pmsg peer_propertiesvector_to_flatbuf_peer_propertieslist(flatbuffers::FlatBufferBuilder &builder, const std::vector &peers, const std::optional &skipping_ip_port); const flatbuffers::Offset - seqhash_to_flatbuf_seqhash(flatbuffers::FlatBufferBuilder &builder, const p2p::sequence_hash &seqhash); + seqhash_to_flatbuf_seqhash(flatbuffers::FlatBufferBuilder &builder, const util::sequence_hash &seqhash); const flatbuffers::Offset>> stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::set &set); diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index 783ffceb..58c506a6 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -1,5 +1,6 @@ #include "../../pchheader.hpp" #include "../../util/version.hpp" +#include "../../util/sequence_hash.hpp" #include "../../util/util.hpp" #include "../../util/merkle_hash_tree.hpp" #include "../../unl.hpp" @@ -7,6 +8,7 @@ #include "../../hplog.hpp" #include "../../conf.hpp" #include "../../ledger/ledger_query.hpp" +#include "../../status.hpp" #include "../usrmsg_common.hpp" #include "usrmsg_json.hpp" @@ -138,9 +140,12 @@ namespace msg::usrmsg::json * "ledger_hash": "" * } */ - void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash) + void create_status_response(std::vector &msg) { - const uint16_t msg_length = 406 + (69 * conf::cfg.contract.unl.size()); + const util::sequence_hash lcl_id = status::get_lcl_id(); + const std::set unl = status::get_unl(); + + const uint16_t msg_length = 406 + (69 * unl.size()); msg.reserve(msg_length); msg += "{\""; @@ -154,11 +159,11 @@ namespace msg::usrmsg::json msg += SEP_COMMA; msg += msg::usrmsg::FLD_LEDGER_SEQ_NO; msg += SEP_COLON_NOQUOTE; - msg += std::to_string(lcl_seq_no); + msg += std::to_string(lcl_id.seq_no); msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_LEDGER_HASH; msg += SEP_COLON; - msg += util::to_hex(lcl_hash); + msg += util::to_hex(lcl_id.hash.to_string_view()); msg += SEP_COMMA; msg += msg::usrmsg::FLD_ROUND_TIME; msg += SEP_COLON_NOQUOTE; @@ -180,11 +185,11 @@ namespace msg::usrmsg::json msg += SEP_COLON_NOQUOTE; msg += OPEN_SQR_BRACKET; - for (auto node = conf::cfg.contract.unl.begin(); node != conf::cfg.contract.unl.end(); node++) + for (auto pubkey = unl.begin(); pubkey != unl.end(); pubkey++) { - msg += DOUBLE_QUOTE + util::to_hex(*node) + DOUBLE_QUOTE; + msg += DOUBLE_QUOTE + util::to_hex(*pubkey) + DOUBLE_QUOTE; - if (std::next(node) != conf::cfg.contract.unl.end()) + if (std::next(pubkey) != unl.end()) msg += ","; } @@ -195,22 +200,16 @@ namespace msg::usrmsg::json msg += OPEN_SQR_BRACKET; { - std::scoped_lock lock(p2p::ctx.peer_connections_mutex); - - const size_t max_peers_count = MIN(MAX_KNOWN_PEERS_INFO, p2p::ctx.peer_connections.size()); + const std::set peers = status::get_peers(); + const size_t max_peers_count = MIN(MAX_KNOWN_PEERS_INFO, peers.size()); size_t count = 1; - // Currently all peers, up to a max of 10 are sent regardless of state. - for (auto peer = p2p::ctx.peer_connections.begin(); peer != p2p::ctx.peer_connections.end() && count <= max_peers_count; peer++) + for (auto peer = peers.begin(); peer != peers.end() && count <= max_peers_count; peer++) { - const p2p::peer_comm_session *sess = peer->second; - if (sess->known_ipport) - { - if (count > 1) - msg += ","; - msg += DOUBLE_QUOTE + sess->known_ipport->to_string() + DOUBLE_QUOTE; - count++; - } + if (count > 1) + msg += ","; + msg += DOUBLE_QUOTE + peer->to_string() + DOUBLE_QUOTE; + count++; } } diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 68d64ad8..e004191b 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -12,7 +12,7 @@ namespace msg::usrmsg::json void create_server_challenge_response(std::vector &msg, const std::string &original_challenge); - void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash); + void create_status_response(std::vector &msg); void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index 4117fce7..09e62a2f 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -9,7 +9,7 @@ namespace msg::usrmsg constexpr size_t CHALLENGE_LEN = 16; // Max no. of known peers to return in get status. - constexpr const size_t MAX_KNOWN_PEERS_INFO = 10; + constexpr const size_t MAX_KNOWN_PEERS_INFO = 16; // Message field names constexpr const char *FLD_HP_VERSION = "hp_version"; diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index 0ce4a945..355e6a7a 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -13,12 +13,12 @@ namespace msg::usrmsg { } - void usrmsg_parser::create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash) const + void usrmsg_parser::create_status_response(std::vector &msg) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_status_response(msg, lcl_seq_no, lcl_hash); + jusrmsg::create_status_response(msg); else - busrmsg::create_status_response(msg, lcl_seq_no, lcl_hash); + busrmsg::create_status_response(msg); } void usrmsg_parser::create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index 2a767bf5..089675b7 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -20,7 +20,7 @@ namespace msg::usrmsg public: usrmsg_parser(const util::PROTOCOL protocol); - void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash) const; + void create_status_response(std::vector &msg) const; void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 469cd7ae..c40e98f7 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -2,6 +2,7 @@ #include "../conf.hpp" #include "../crypto.hpp" #include "../util/util.hpp" +#include "../util/sequence_hash.hpp" #include "../hplog.hpp" #include "../msg/fbuf/common_helpers.hpp" #include "../msg/fbuf/p2pmsg_conversion.hpp" @@ -548,13 +549,4 @@ namespace p2p } } - /** - * This is a helper method for sequence_hash structure which enables printing it straight away. - */ - std::ostream &operator<<(std::ostream &output, const sequence_hash &seq_hash) - { - output << seq_hash.seq_no << "-" << seq_hash.hash; - return output; - } - } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 5d43fd38..d17ce64b 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -4,6 +4,7 @@ #include "../pchheader.hpp" #include "../usr/user_input.hpp" #include "../util/h32.hpp" +#include "../util/sequence_hash.hpp" #include "../conf.hpp" #include "../hpfs/hpfs_mount.hpp" #include "../msg/fbuf/p2pmsg_generated.h" @@ -32,38 +33,6 @@ namespace p2p int64_t weight = 0; }; - struct sequence_hash - { - uint64_t seq_no = 0; - util::h32 hash = util::h32_empty; - - bool operator!=(const sequence_hash &seq_hash) const - { - return seq_no != seq_hash.seq_no || hash != seq_hash.hash; - } - - bool operator==(const sequence_hash &seq_hash) const - { - return seq_no == seq_hash.seq_no && hash == seq_hash.hash; - } - - bool operator<(const sequence_hash &seq_hash) const - { - return (seq_no == seq_hash.seq_no) ? hash < seq_hash.hash : seq_no < seq_hash.seq_no; - } - - const std::string to_string() - { - return std::to_string(seq_no) + "-" + util::to_hex(hash.to_string_view()); - } - - const bool empty() const - { - return seq_no == 0 && hash == util::h32_empty; - } - }; - // This is a helper method for sequence_hash structure which enables printing it straight away. - std::ostream &operator<<(std::ostream &output, const sequence_hash &seq_hash); struct proposal { @@ -75,8 +44,8 @@ namespace p2p uint8_t stage = 0; // The round-stage that this proposal belongs to. uint32_t time_config = 0; // Time config of the proposer. std::string nonce; // Random nonce that is used to reduce lcl predictability. - sequence_hash last_primary_shard_id; - sequence_hash last_raw_shard_id; + util::sequence_hash last_primary_shard_id; + util::sequence_hash last_raw_shard_id; util::h32 state_hash; // Contract state hash. util::h32 patch_hash; // Patch file hash. std::set users; @@ -120,7 +89,7 @@ namespace p2p struct npl_message { std::string pubkey; // Peer binary pubkey. - p2p::sequence_hash lcl_id; // lcl of the peer. + util::sequence_hash lcl_id; // lcl of the peer. std::string data; }; @@ -128,13 +97,13 @@ namespace p2p struct hpfs_log_request { uint64_t target_seq_no; - sequence_hash min_record_id; + util::sequence_hash min_record_id; }; // Represents hpfs log sync response. struct hpfs_log_response { - sequence_hash min_record_id; + util::sequence_hash min_record_id; std::vector log_record_bytes; }; diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index 241e52a1..c1b8eef9 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -6,6 +6,7 @@ #include "peer_comm_server.hpp" #include "peer_comm_session.hpp" #include "self_node.hpp" +#include "../status.hpp" namespace p2p { @@ -96,15 +97,27 @@ namespace p2p // Find already connected known remote parties list. std::vector known_remotes; + // Keeps challenge-verified known peers list. + std::set verified_remotes; + { std::scoped_lock lock(sessions_mutex); for (const p2p::peer_comm_session &session : sessions) { - if (session.state != comm::SESSION_STATE::CLOSED && session.known_ipport.has_value()) + if (!session.known_ipport) + continue; + + if (session.state != comm::SESSION_STATE::CLOSED) known_remotes.push_back(session.known_ipport.value()); + + if (session.challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + verified_remotes.emplace(session.known_ipport.value()); } } + // Update the central status holder. + status::set_peers(verified_remotes); + // Update global known remote count when new connections are made. known_remote_count = known_remotes.size(); diff --git a/src/sc/hpfs_log_sync.cpp b/src/sc/hpfs_log_sync.cpp index c3695685..17d2a4ce 100644 --- a/src/sc/hpfs_log_sync.cpp +++ b/src/sc/hpfs_log_sync.cpp @@ -1,4 +1,5 @@ #include "hpfs_log_sync.hpp" +#include "../util/sequence_hash.hpp" #include "../conf.hpp" #include "../crypto.hpp" #include "../ledger/ledger.hpp" @@ -23,7 +24,7 @@ namespace sc::hpfs_log_sync bool init_success = false; // Represent sequence number and the root hash of the genesis ledger. - p2p::sequence_hash genesis_seq_hash; + util::sequence_hash genesis_seq_hash; /** * Initialize log record syncer. @@ -269,7 +270,7 @@ namespace sc::hpfs_log_sync */ int get_verified_min_record() { - p2p::sequence_hash last_from_index; + util::sequence_hash last_from_index; if (sc::contract_fs.get_last_seq_no_from_index(last_from_index.seq_no) == -1 || sc::contract_fs.get_hash_from_index_by_seq_no(last_from_index.hash, last_from_index.seq_no) == -1) { @@ -277,7 +278,7 @@ namespace sc::hpfs_log_sync return -1; } - p2p::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id(); + util::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id(); if (last_from_index.seq_no == ledger::genesis.seq_no || last_from_ledger.seq_no == ledger::genesis.seq_no) { // Request full ledger. diff --git a/src/sc/hpfs_log_sync.hpp b/src/sc/hpfs_log_sync.hpp index 20e97e0b..90709294 100644 --- a/src/sc/hpfs_log_sync.hpp +++ b/src/sc/hpfs_log_sync.hpp @@ -16,7 +16,7 @@ namespace sc::hpfs_log_sync uint64_t target_log_seq_no; util::h32 target_root_hash; std::mutex target_log_seq_no_mutex; - p2p::sequence_hash min_log_record; + util::sequence_hash min_log_record; uint64_t target_requested_on = 0; uint16_t request_submissions = 0; diff --git a/src/sc/sc.hpp b/src/sc/sc.hpp index 5ce25fc3..64e1e59a 100644 --- a/src/sc/sc.hpp +++ b/src/sc/sc.hpp @@ -4,6 +4,7 @@ #include "../pchheader.hpp" #include "../usr/usr.hpp" #include "../util/h32.hpp" +#include "../util/sequence_hash.hpp" #include "../util/util.hpp" #include "../util/buffer_store.hpp" #include "../p2p/p2p.hpp" @@ -88,7 +89,7 @@ namespace sc uint64_t time = 0; // Current HotPocket lcl (seq no. and ledger hash hex) - p2p::sequence_hash lcl_id; + util::sequence_hash lcl_id; // State hash after execution will be copied to this (not applicable to read only mode). util::h32 post_execution_state_hash = util::h32_empty; diff --git a/src/status.cpp b/src/status.cpp new file mode 100644 index 00000000..77bc0d3f --- /dev/null +++ b/src/status.cpp @@ -0,0 +1,87 @@ +#include "status.hpp" +#include "util/sequence_hash.hpp" +#include "ledger/ledger_common.hpp" +#include "conf.hpp" + +namespace status +{ + moodycamel::ConcurrentQueue event_queue; + + std::shared_mutex ledger_mutex; + util::sequence_hash lcl_id; // Last ledger id/hash pair. + ledger::ledger_record last_ledger; // Last ledger record that the node created. + bool is_in_sync = false; // Indicates whether this node is in sync with other nodes or not. + + std::shared_mutex unl_mutex; + std::set unl; // List of last reported unl binary pubkeys. + + std::shared_mutex peers_mutex; + std::set peers; // Known ip:port pairs for connection verified peers. + + //----- Ledger status + + void init_ledger(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger) + { + // Not acquiring the mutex lock since this is called during startup only. + lcl_id = ledger_id; + last_ledger = ledger; + is_in_sync = true; + } + + void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger) + { + std::unique_lock lock(ledger_mutex); + lcl_id = ledger_id; + last_ledger = ledger; + is_in_sync = true; // Creating a ledger automatically means we are in sync. + } + + void sync_status_changed(const bool in_sync) + { + std::unique_lock lock(ledger_mutex); + is_in_sync = in_sync; + } + + const util::sequence_hash get_lcl_id() + { + std::shared_lock lock(ledger_mutex); + return lcl_id; + } + + //----- UNL status + + void init_unl(const std::set &init_unl) + { + // Not acquiring the mutex lock since this is called during startup only. + unl = init_unl; + } + + void unl_changed(const std::set &new_unl) + { + std::unique_lock lock(unl_mutex); + unl = new_unl; + + event_queue.try_enqueue(unl_change_event{unl}); + } + + const std::set get_unl() + { + std::shared_lock lock(unl_mutex); + return unl; + } + + //----- Peers status + + void set_peers(const std::set &updated_peers) + { + std::unique_lock lock(peers_mutex); + peers = std::move(updated_peers); + } + + const std::set get_peers() + { + std::unique_lock lock(peers_mutex); + return peers; + } + +} // namespace status \ No newline at end of file diff --git a/src/status.hpp b/src/status.hpp new file mode 100644 index 00000000..ebbf656b --- /dev/null +++ b/src/status.hpp @@ -0,0 +1,35 @@ +#ifndef _HP_STATUS_ +#define _HP_STATUS_ + +#include "pchheader.hpp" +#include "util/sequence_hash.hpp" +#include "ledger/ledger_common.hpp" +#include "conf.hpp" + +namespace status +{ + struct unl_change_event + { + std::set unl; + }; + + // Represents any kind of change that has happened in the node. + typedef std::variant change_event; + + extern moodycamel::ConcurrentQueue event_queue; + + void init_ledger(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger); + void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger); + void sync_status_changed(const bool in_sync); + const util::sequence_hash get_lcl_id(); + + void init_unl(const std::set &init_unl); + void unl_changed(const std::set &new_unl); + const std::set get_unl(); + + void set_peers(const std::set &updated_peers); + const std::set get_peers(); + +} // namespace status + +#endif diff --git a/src/unl.cpp b/src/unl.cpp index bd1cfdf1..662082b2 100644 --- a/src/unl.cpp +++ b/src/unl.cpp @@ -3,6 +3,7 @@ #include "conf.hpp" #include "unl.hpp" #include "crypto.hpp" +#include "status.hpp" /** * Manages the UNL public keys of this node. @@ -23,7 +24,8 @@ namespace unl return -1; std::unique_lock lock(unl_mutex); - update_unl_list(conf::cfg.contract.unl); + merge_latest_unl_config(); + status::init_unl(conf::cfg.contract.unl); return 0; } @@ -68,15 +70,13 @@ namespace unl bool is_unl_list_changed = false; { std::unique_lock lock(unl_mutex); - is_unl_list_changed = update_unl_list(conf::cfg.contract.unl); + is_unl_list_changed = merge_latest_unl_config(); } - // Update the is_unl flag of peer sessions. - // Broadcast changed unl list to all the connected users. if (is_unl_list_changed) { - p2p::update_unl_connections(); - usr::announce_unl_list(conf::cfg.contract.unl); + p2p::update_unl_connections(); // Update the is_unl flag of peer sessions. + status::unl_changed(conf::cfg.contract.unl); // Update the central node status holder. } } @@ -141,10 +141,10 @@ namespace unl } /** - * Updates the unl list using the provided new list. + * Updates the unl list using the latest config unl. * @return Whether or not any unl list changes were made. */ - bool update_unl_list(const std::set &new_list) + bool merge_latest_unl_config() { bool changes_made = false; @@ -176,7 +176,7 @@ namespace unl return false; // Update the prepared json list which will be fed into contract args. - json_list = prepare_json_list(new_list); + json_list = prepare_json_list(conf::cfg.contract.unl); // Update the own node's unl status. conf::cfg.node.is_unl = (list.count(conf::cfg.node.public_key) == 1); diff --git a/src/unl.hpp b/src/unl.hpp index c4a9cb78..32d3cef1 100644 --- a/src/unl.hpp +++ b/src/unl.hpp @@ -19,7 +19,7 @@ namespace unl void update_unl_changes_from_patch(); void update_time_config_stats(const std::list &proposals); uint32_t get_majority_time_config(); - bool update_unl_list(const std::set &new_list); + bool merge_latest_unl_config(); const std::string prepare_json_list(const std::set &new_list); } // namespace unl diff --git a/src/usr/user_comm_server.hpp b/src/usr/user_comm_server.hpp index 4ccf0403..961695e1 100644 --- a/src/usr/user_comm_server.hpp +++ b/src/usr/user_comm_server.hpp @@ -1,14 +1,27 @@ #ifndef _HP_USR_USER_COMM_SERVER_ #define _HP_USR_USER_COMM_SERVER_ +#include "../status.hpp" #include "../comm/comm_server.hpp" +#include "../msg/usrmsg_parser.hpp" +#include "usr.hpp" #include "user_comm_session.hpp" namespace usr { + // Forward declaration. Defined in usr.cpp. + void dispatch_change_events(); + class user_comm_server : public comm::comm_server { using comm::comm_server::comm_server; // Inherit constructors. + + protected: + int process_custom_messages() + { + usr::dispatch_change_events(); + return 0; + } }; } // namespace usr diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 7f912a7f..c297de87 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -3,12 +3,14 @@ #include "../msg/usrmsg_parser.hpp" #include "../msg/usrmsg_common.hpp" #include "../util/util.hpp" +#include "../util/sequence_hash.hpp" #include "../conf.hpp" #include "../crypto.hpp" #include "../hplog.hpp" #include "../ledger/ledger.hpp" #include "../util/buffer_store.hpp" #include "../hpfs/hpfs_mount.hpp" +#include "../status.hpp" #include "usr.hpp" #include "user_session_handler.hpp" #include "user_comm_session.hpp" @@ -174,7 +176,7 @@ namespace usr uint64_t max_ledger_seq_no; if (parser.extract_input_container(input_data, nonce, max_ledger_seq_no, input_container) != -1) { - const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); + const util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); // Ignore the input if the max ledger seq number specified is beyond the max offeset. if (conf::cfg.contract.max_input_ledger_offset != 0 && max_ledger_seq_no > lcl_id.seq_no + conf::cfg.contract.max_input_ledger_offset) { @@ -233,8 +235,7 @@ namespace usr else if (msg_type == msg::usrmsg::MSGTYPE_STAT) { std::vector resp; - const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); - parser.create_status_response(resp, lcl_id.seq_no, lcl_id.hash.to_string_view()); + parser.create_status_response(resp); user.session.send(resp); return 0; } @@ -529,22 +530,36 @@ namespace usr } /** - * Send unl list to all the connected users. - * @param unl_list Set of unl pubkeys. - */ - void announce_unl_list(const std::set &unl_list) + * Sends any change event notifications to relevant users who are currently connected to the node. + */ + void dispatch_change_events() { - std::scoped_lock lock(ctx.users_mutex); - - for (const auto &user : ctx.users) + status::change_event ev; + while (status::event_queue.try_dequeue(ev)) { - const usr::connected_user &connected_user = user.second; - msg::usrmsg::usrmsg_parser parser(connected_user.protocol); + // Array to hold constructed message cache from each protocol. + std::vector protocol_msgs[2]; - std::vector msg; - parser.create_unl_list_container(msg, unl_list); + if (ev.index() == 0) // UNL change event. Broadcast for all users. + { + const status::unl_change_event &unl_ev = std::get(ev); - connected_user.session.send(msg); + std::scoped_lock lock(ctx.users_mutex); + for (auto &[sid, user] : ctx.users) + { + std::vector &msg = protocol_msgs[user.protocol]; + if (msg.empty()) // Construct the message with relevant protocol if not done so already. + { + msg::usrmsg::usrmsg_parser parser(user.protocol); + parser.create_unl_list_container(msg, unl_ev.unl); + } + user.session.send(msg); + } + + // Clear the caches for the next event. + protocol_msgs[util::PROTOCOL::JSON].clear(); + protocol_msgs[util::PROTOCOL::BSON].clear(); + } } } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index a81ac859..141af417 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -101,7 +101,7 @@ namespace usr bool verify_appbill_check(std::string_view pubkey, const size_t input_len); - void announce_unl_list(const std::set &unl_list); + void dispatch_change_events(); } // namespace usr diff --git a/src/util/sequence_hash.cpp b/src/util/sequence_hash.cpp new file mode 100644 index 00000000..7a40ef13 --- /dev/null +++ b/src/util/sequence_hash.cpp @@ -0,0 +1,36 @@ +#include "sequence_hash.hpp" + +namespace util +{ + bool sequence_hash::operator!=(const sequence_hash &seq_hash) const + { + return seq_no != seq_hash.seq_no || hash != seq_hash.hash; + } + + bool sequence_hash::operator==(const sequence_hash &seq_hash) const + { + return seq_no == seq_hash.seq_no && hash == seq_hash.hash; + } + + bool sequence_hash::operator<(const sequence_hash &seq_hash) const + { + return (seq_no == seq_hash.seq_no) ? hash < seq_hash.hash : seq_no < seq_hash.seq_no; + } + + const std::string sequence_hash::to_string() + { + return std::to_string(seq_no) + "-" + util::to_hex(hash.to_string_view()); + } + + const bool sequence_hash::empty() const + { + return seq_no == 0 && hash == util::h32_empty; + } + + std::ostream &operator<<(std::ostream &output, const sequence_hash &seq_hash) + { + output << seq_hash.seq_no << "-" << seq_hash.hash; + return output; + } + +} // namespace util \ No newline at end of file diff --git a/src/util/sequence_hash.hpp b/src/util/sequence_hash.hpp new file mode 100644 index 00000000..fec8d21f --- /dev/null +++ b/src/util/sequence_hash.hpp @@ -0,0 +1,27 @@ +#ifndef _HP_UTIL_SEQUENCE_HASH_ +#define _HP_UTIL_SEQUENCE_HASH_ + +#include "../pchheader.hpp" +#include "util.hpp" +#include "h32.hpp" + +namespace util +{ + struct sequence_hash + { + uint64_t seq_no = 0; + util::h32 hash = util::h32_empty; + + bool operator!=(const sequence_hash &seq_hash) const; + bool operator==(const sequence_hash &seq_hash) const; + bool operator<(const sequence_hash &seq_hash) const; + const std::string to_string(); + const bool empty() const; + }; + + // This is a helper method for sequence_hash structure which enables printing it straight away. + std::ostream &operator<<(std::ostream &output, const sequence_hash &seq_hash); + +} // namespace util + +#endif \ No newline at end of file