diff --git a/src/conf.hpp b/src/conf.hpp index 4e770e55..90fe1f7a 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -125,7 +125,7 @@ namespace conf uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute uint16_t max_connections = 0; // Max inbound user connections uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP). - uint64_t concurrent_read_reqeuests = 10; // Supported concurrent read requests count. + uint64_t concurrent_read_reqeuests = 4; // Supported concurrent read requests count. }; struct peer_discovery_config diff --git a/src/consensus.cpp b/src/consensus.cpp index b3018591..72c51404 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -14,6 +14,7 @@ #include "util/sequence_hash.hpp" #include "unl.hpp" #include "ledger/ledger.hpp" +#include "ledger/ledger_query.hpp" #include "consensus.hpp" #include "sc/hpfs_log_sync.hpp" #include "status.hpp" @@ -106,7 +107,7 @@ namespace consensus // Throughout consensus, we continously update and prune the candidate proposals for newly // arived ones and expired ones. - revise_candidate_proposals(ctx.sync_status == 0); + revise_candidate_proposals(ctx.vote_status == VOTES_SYNCED); // Get current lcl, state, patch, primary shard and raw shard info. util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); @@ -146,28 +147,43 @@ namespace consensus { int new_sync_status = check_sync_status(unl_count, votes, lcl_id); - if (ctx.sync_status != 0 && new_sync_status == 0) + if (ctx.vote_status != VOTES_SYNCED && new_sync_status == VOTES_UNRELIABLE) { - // If we are just becoming 'in-sync' after being out-of-sync, check the sync status again after the proper + // If we are just becoming 'in-sync' after being out-of-sync, check the vote status again after the proper // pruning of candidate proposals. This is because we relax the proposal pruning rules when we are not in sync, - // and we need to make the final sync status check after proper pruning rules are applied. + // and we need to make the final vote status check after proper pruning rules are applied. - LOG_DEBUG << "Rechecking sync status after becoming in-sync."; + LOG_DEBUG << "Rechecking vote status after becoming in-sync."; revise_candidate_proposals(true); new_sync_status = check_sync_status(unl_count, votes, lcl_id); } - // Update the sync status if we went from in-sync to not-in-sync. We will report back as being in-sync - // only when we hit stage 3. - if (ctx.sync_status == 0 && new_sync_status != 0) + // Update the node's status if we went from in-sync to not-in-sync. We will report back as being in-sync only when ledger is created. + if (ctx.vote_status == VOTES_SYNCED && new_sync_status != VOTES_SYNCED) status::sync_status_changed(false); - ctx.sync_status = new_sync_status; + // This marks entering into a new sync cycle. + if (new_sync_status == VOTES_DESYNC && !ctx.sync_ongoing) + { + // Cleanup any unconsensed contract outputs we may have had before the sync cycle began because those are going to be + // irrelavant after the sync. + cleanup_output_collections(); + ctx.sync_ongoing = true; + } + + // If we just bacame in-sync after being in desync, we need to restore consensus context information from the synced ledger. + if (ctx.vote_status != VOTES_SYNCED && new_sync_status == VOTES_SYNCED && ctx.sync_ongoing) + dispatch_synced_ledger_input_statuses(lcl_id); + + ctx.vote_status = new_sync_status; } - if (ctx.sync_status == -2) // Unreliable votes. + if (ctx.vote_status == VOTES_UNRELIABLE) { ctx.unreliable_votes_attempts++; + + // If we get too many consecative unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes + // are caused because our roundtime config information is different from other nodes. if (ctx.unreliable_votes_attempts >= MAX_UNRELIABLE_VOTES_ATTEMPTS) { refresh_time_config(true); @@ -179,16 +195,23 @@ namespace consensus ctx.unreliable_votes_attempts = 0; } - if (ctx.sync_status == 0) + if (ctx.vote_status == VOTES_SYNCED) { // If we are in sync, vote and broadcast the winning votes to next stage. const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); broadcast_proposal(p); - // Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal. - if (ctx.stage == 3) + // This marks the moment we finish a sync cycle. We are in stage 1 and we ditect that our votes are in sync. + if (ctx.stage == 1 && ctx.sync_ongoing) { - status::sync_status_changed(true); // Creating a new ledger means we are in sync. + // Clear any sync recovery pending state if we enter stage 1 while being in sync. + ctx.sync_ongoing = false; + status::sync_status_changed(true); + LOG_DEBUG << "Sync recovery completed."; + } + else if (ctx.stage == 3) + { + // Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal. consensed_user_map consensed_users; if (prepare_consensed_users(consensed_users, p) == -1 || @@ -219,12 +242,22 @@ namespace consensus */ int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash) { + // Creating a ledger while sync ongoing happens when we discover that our ledger votes are in sync at stage 2 or 3. At this point, + // we can create the ledger with majority votes. However we dont't have the raw contract outputs we should have had in the previous ledger + // (because we were syncing the ledger and didn't execute the contract). So after this ledger creation we will most probably get a raw ledger + // desync again because our raw outputs were different from other nodes. Therefore we don't consider this a proper/synced ledger creation until + // we are fully out of the sync cycle. Hence we pass the sync_ongoing flag to indicate whether we are still inside a sync cycle or not. + // Sync cycle is considered trully complete after the raw ledger is synced again and we discover in next round Stage 1 that our ledger votes + // are in sync. + // Persist the new ledger with the consensus results. - if (ledger::update_ledger(cons_prop, consensed_users) == -1) + if (ledger::update_ledger(cons_prop, consensed_users, ctx.sync_ongoing) == -1) return -1; util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); - LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; + + if (!ctx.sync_ongoing) + LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; // Now that there's a new ledger, prune any newly-expired candidate inputs. expire_candidate_inputs(lcl_id); @@ -240,7 +273,7 @@ namespace consensus return -1; // Execute the smart contract with the consensed user inputs. - if (execute_contract(cons_prop, consensed_users, lcl_id) == -1) + if (execute_contract(cons_prop.time, consensed_users, lcl_id) == -1) return -1; return 0; @@ -331,16 +364,14 @@ namespace consensus // Proceed further only if last primary shard, last raw shard, state and patch hashes are in sync with majority. if (!is_last_primary_shard_desync && !is_last_raw_shard_desync && !is_state_desync && !is_patch_desync) - { - return 0; - } + return VOTES_SYNCED; // Last primary shard hash, last raw shard hash, patch or state desync. - return -1; + return VOTES_DESYNC; } // Majority last primary shard hash couldn't be detected reliably. - return -2; + return VOTES_UNRELIABLE; } /** @@ -460,8 +491,10 @@ namespace consensus } // If final elected output hash matches our output hash, move the outputs into consensed outputs. + // However, do not perform the safety matching check if we have just completed a sync cycle as we will not possess the outputs + // generated during the previous ledger. { - if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash()) + if (ctx.sync_ongoing || cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash()) { for (const auto &[hash, gen_output] : ctx.generated_user_outputs) { @@ -474,7 +507,7 @@ namespace consensus } else { - LOG_WARNING << "Consenses output hash didn't match our output hash."; + LOG_WARNING << "Consensus output hash didn't match our output hash."; ret = -1; } } @@ -483,15 +516,20 @@ namespace consensus } /** - * Removes any candidate inputs that has lived passed the current ledger seq no. + * Removes any candidate inputs that has lived past the current ledger seq no. */ void expire_candidate_inputs(const util::sequence_hash &lcl_id) { + std::unordered_map> rejections; + auto itr = ctx.candidate_user_inputs.begin(); while (itr != ctx.candidate_user_inputs.end()) { if (itr->second.max_ledger_seq_no <= lcl_id.seq_no) { + const std::string input_hash = std::string(util::get_string_suffix(itr->first, BLAKE3_OUT_LEN)); + rejections[itr->second.user_pubkey].push_back(usr::input_status_response{input_hash, msg::usrmsg::REASON_MAX_LEDGER_EXPIRED}); + // Erase the candidate input along with its data buffer in the input store. usr::input_store.purge(itr->second.input); ctx.candidate_user_inputs.erase(itr++); @@ -501,6 +539,9 @@ namespace consensus ++itr; } } + + // Inform any connected users about their expired inputs. + usr::send_input_status_responses(rejections); } /** @@ -1053,11 +1094,11 @@ namespace consensus /** * Executes the contract after consensus. - * @param cons_prop The proposal that reached consensus. + * @param time The consensus time. * @param consensed_users Consensed users and their inputs. * @param lcl_id Current lcl id of the node. */ - int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) + int execute_contract(const uint64_t time, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id) { if (!conf::cfg.contract.execute || ctx.is_shutting_down) return 0; @@ -1069,7 +1110,7 @@ namespace consensus sc::contract_execution_args &args = ctx.contract_ctx->args; args.readonly = false; - args.time = cons_prop.time; + args.time = time; // lcl to be passed to the contract. args.lcl_id = lcl_id; @@ -1180,6 +1221,39 @@ namespace consensus cleanup_output_collections(); } + /** + * Dispatches any input responses corresponsing to candidate inputs that we have been holding during while syncing. + */ + void dispatch_synced_ledger_input_statuses(const util::sequence_hash &lcl_id) + { + // Find out any inputs we were holding that may have made their way into the ledger while we were syncing, + // and reply with 'accepted' input statuses if the user is conencted to us. + std::unordered_map> responses; + auto itr = ctx.candidate_user_inputs.begin(); + while (itr != ctx.candidate_user_inputs.end()) + { + std::string_view ordered_hash = itr->first; + const std::string input_hash = std::string(util::get_string_suffix(ordered_hash, BLAKE3_OUT_LEN)); + std::optional input; + std::optional ledger; + if (ledger::query::get_input_by_hash(lcl_id.seq_no, input_hash, input, ledger) != -1 && input) + { + // Each 'accepted' status response must be associated with the ledger seqno/hash that contained the input. + responses[itr->second.user_pubkey].push_back(usr::input_status_response{ + input_hash, NULL, input->ledger_seq_no, *(util::h32 *)ledger->ledger_hash.data()}); + + // Erase the candidate input since we no longer need to hold it. + ctx.candidate_user_inputs.erase(itr++); + } + else + { + ++itr; + } + } + + usr::send_input_status_responses(responses); + } + /** * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. * @param bufmap The contract bufmap which needs to be populated with inputs. diff --git a/src/consensus.hpp b/src/consensus.hpp index 5a609280..2a22b23d 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -80,6 +80,11 @@ namespace consensus } }; +#define VOTES_UNKNOWN -3 +#define VOTES_UNRELIABLE -2 +#define VOTES_DESYNC -1 +#define VOTES_SYNCED 0 + /** * This is used to store consensus information */ @@ -112,7 +117,11 @@ namespace consensus uint32_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. uint64_t round_boundry_offset = 0; // Time window boundry offset based on contract id. uint16_t unreliable_votes_attempts = 0; // No. of times we failed to get reliable votes continously. - int sync_status = 0; // Current sync status. + int vote_status = VOTES_UNKNOWN; // Current status of votes. + + // Indicates whether we are inside a sync cycle or not. Sync cycle is considered to being when we first detect that we are out of sync + // and considered to end when we detect to be in sync inside stage 1 of a round for the first time after we began a sync. + bool sync_ongoing = false; std::optional contract_ctx; std::mutex contract_ctx_mutex; @@ -195,12 +204,14 @@ namespace consensus uint64_t get_stage_time_resolution(const uint64_t time); - int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); + int execute_contract(const uint64_t time, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); + void dispatch_synced_ledger_input_statuses(const util::sequence_hash &lcl_id); + void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users); void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 5c083f5f..0b04fc83 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -105,14 +105,14 @@ namespace ledger * @param consensed_users Users and their raw inputs/outputs received in this consensus round. * @return Returns 0 on success -1 on error. */ - int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users) + int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending) { // Acquire hpfs rw session before writing into shards. if (ledger_fs.acquire_rw_session() == -1) return -1; util::sequence_hash lcl_id; - if (update_primary_ledger(proposal, consensed_users, lcl_id) == -1 || + if (update_primary_ledger(proposal, consensed_users, sync_recovery_pending, lcl_id) == -1 || update_ledger_raw_data(proposal, consensed_users, lcl_id) == -1) { ledger_fs.release_rw_session(); @@ -129,7 +129,7 @@ namespace ledger * @param new_lcl_id The new ledger seq no. and hash. * @return 0 on success. -1 on failure. */ - int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, util::sequence_hash &new_lcl_id) + int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending, util::sequence_hash &new_lcl_id) { const util::sequence_hash lcl_id = ctx.get_lcl_id(); new_lcl_id.seq_no = lcl_id.seq_no + 1; @@ -169,8 +169,9 @@ namespace ledger if (shard_res == 1) remove_old_shards(new_lcl_id.seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); - // Update the node's status. - status::ledger_created(new_lcl_id, ledger); + // Update the node's status if we are not inside a sync recovery cycle. + if (!sync_recovery_pending) + status::ledger_created(new_lcl_id, ledger); return 0; } diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index 6cff336a..af9a51b0 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -70,9 +70,9 @@ namespace ledger void deinit(); - int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users); + int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending); - int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, util::sequence_hash &new_lcl_id); + int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending, util::sequence_hash &new_lcl_id); int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); @@ -98,7 +98,6 @@ namespace ledger int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no); int get_root_hash_from_ledger(util::h32 &root_hash, const uint64_t seq_no); - } // namespace ledger #endif diff --git a/src/ledger/ledger_common.hpp b/src/ledger/ledger_common.hpp index 9a80e982..7ff57335 100644 --- a/src/ledger/ledger_common.hpp +++ b/src/ledger/ledger_common.hpp @@ -24,7 +24,7 @@ namespace ledger uint64_t nonce; // Nonce the user had submitted for this input. off_t blob_offset; // Blob file offset of this input blob. size_t blob_size; // Length of the input. - std::string blob; // THe actual input blob. + std::string blob; // The actual input blob. }; /** diff --git a/src/ledger/ledger_query.cpp b/src/ledger/ledger_query.cpp index a02c2099..40be6724 100644 --- a/src/ledger/ledger_query.cpp +++ b/src/ledger/ledger_query.cpp @@ -44,6 +44,9 @@ namespace ledger::query // Fill raw data if required. if (seq_q.inputs || seq_q.outputs) { + // Do not return other users' blobs if consensus is private. + const std::string filter_user = conf::cfg.contract.is_consensus_public ? "" : std::string(user_pubkey); + for (ledger_record &ledger : ledgers) { if (seq_q.inputs) @@ -52,7 +55,7 @@ namespace ledger::query ledger.outputs = std::vector(); // No need to actually query raw data for genesis ledger. - if (seq_q.seq_no == 0 || get_ledger_raw_data(ledger, user_pubkey, fs_sess_name) != -1) + if (seq_q.seq_no == 0 || get_ledger_raw_data(ledger, filter_user, fs_sess_name) != -1) res = ledgers; } } @@ -84,7 +87,7 @@ namespace ledger::query } // Construct shard path based on provided ledger seq no. - const uint64_t shard_seq_no = (q.seq_no - 1) / ledger::PRIMARY_SHARD_SIZE; + const uint64_t shard_seq_no = SHARD_SEQ(q.seq_no, ledger::PRIMARY_SHARD_SIZE); const std::string db_vpath = std::string(ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no) + "/" + ledger::PRIMARY_DB; const std::string db_path = ledger::ledger_fs.physical_path(fs_sess_name, db_vpath); @@ -103,7 +106,7 @@ namespace ledger::query /** * Retrieve user inputs and outputs by ledger seq no. If consensus is private, this only fills blobs of the requesting user. * @param ledger Ledger record to populate with inputs and outputs. - * @param user_pubkey Binary pubkey of the user executing the query. + * @param user_pubkey Binary user pubkey. If not empty, include raw data only for this user. * @param fs_sess_name The ledger hosting fs session name. * @returns 0 on success. -1 on failure. */ @@ -113,7 +116,7 @@ namespace ledger::query if (!ledger.inputs && !ledger.outputs) return 0; - const uint64_t shard_seq_no = (ledger.seq_no - 1) / ledger::RAW_SHARD_SIZE; + const uint64_t shard_seq_no = SHARD_SEQ(ledger.seq_no, ledger::RAW_SHARD_SIZE); const std::string shard_path = ledger::ledger_fs.physical_path(fs_sess_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(shard_seq_no) + "/"); const std::string db_path = shard_path + RAW_DB; @@ -141,7 +144,7 @@ namespace ledger::query * @param inputs User input collection to populate. * @param seq_no Ledger seq no. to query. * @param shard_path The shard physical path. - * @param user_pubkey Binary pubkey of the user executing the query. + * @param user_pubkey Binary user pubkey. If not empty, include raw data only for this user. * @param fs_sess_name The ledger hosting fs session name. * @returns 0 on success. -1 on failure. */ @@ -163,8 +166,8 @@ namespace ledger::query for (ledger_user_input &inp : inputs) { - // Do not return other users' blobs if consensus is private. - if (!conf::cfg.contract.is_consensus_public && inp.pubkey != user_pubkey) + // Apply user filter. + if (!user_pubkey.empty() && inp.pubkey != user_pubkey) continue; inp.blob.resize(inp.blob_size); @@ -185,7 +188,7 @@ namespace ledger::query * @param outputs User output collection to populate. * @param seq_no Ledger seq no. to query. * @param shard_path The shard physical path. - * @param user_pubkey Binary pubkey of the user executing the query. + * @param user_pubkey Binary user pubkey. If not empty, include raw data only for this user. * @param fs_sess_name The ledger hosting fs session name. * @returns 0 on success. -1 on failure. */ @@ -208,8 +211,8 @@ namespace ledger::query // Loop through each user's blob groups. for (ledger_user_output &user : outputs) { - // Do not return other users' blobs if consensus is private. - if (!conf::cfg.contract.is_consensus_public && user.pubkey != user_pubkey) + // Apply user filter. + if (!user_pubkey.empty() && user.pubkey != user_pubkey) continue; // Output blobs for each user are grouped. Group header contains all individual blob offsets and sizes @@ -246,4 +249,124 @@ namespace ledger::query close(fd); return 0; } + + /** + * Loads inputs and connected users from the specified ledger. + */ + int get_input_users_from_ledger(const uint64_t seq_no, std::vector &users, std::vector &inputs) + { + const char *session_name = "input_users"; + if (ledger_fs.start_ro_session(session_name, false) == -1) + return -1; + + const uint64_t shard_seq_no = SHARD_SEQ(seq_no, ledger::RAW_SHARD_SIZE); + const std::string shard_path = ledger::ledger_fs.physical_path(session_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(shard_seq_no) + "/"); + const std::string db_path = shard_path + RAW_DB; + + sqlite3 *db = NULL; + if (sqlite::open_db(db_path, &db) == -1) + { + LOG_ERROR << errno << ": Error openning the shard database for input_users, shard: " << shard_seq_no; + ledger_fs.stop_ro_session(session_name); + return -1; + } + + if (sqlite::get_users_by_seq_no(db, seq_no, users) == -1 || + sqlite::get_user_inputs_by_seq_no(db, seq_no, inputs) == -1) + { + LOG_ERROR << errno << ": Error querying ledger input_users, seq_no: " << seq_no; + sqlite::close_db(&db); + ledger_fs.stop_ro_session(session_name); + return -1; + } + + sqlite::close_db(&db); + ledger_fs.stop_ro_session(session_name); + return 0; + } + + /** + * Attempts to find the provided input hash by scanning all the shards starting with the latest shard. + * @param lcl_seq_no Latest ledger seq no. used to determine latest shard. + * @param hash Input hash to find. + * @param input Popualted input data, if found. + * @param ledger Popualted ledger data which contains the input, if input found. + * @return 0 on successful attempt. -1 on error. + */ + int get_input_by_hash(const uint64_t lcl_seq_no, std::string_view hash, std::optional &input, std::optional &ledger) + { + const char *session_name = "input_by_hash"; + if (ledger_fs.start_ro_session(session_name, false) == -1) + return -1; + + // Search all the shards starting with last shard for the input hash. + uint64_t raw_shard = SHARD_SEQ(lcl_seq_no, ledger::RAW_SHARD_SIZE); + while (raw_shard >= 0 && !input) + { + const std::string shard_path = ledger::ledger_fs.physical_path(session_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(raw_shard) + "/"); + const std::string db_path = shard_path + RAW_DB; + + if (!util::is_file_exists(db_path)) + { + ledger_fs.stop_ro_session(session_name); + return 0; // Shard not found. So we abandon the search. + } + + sqlite3 *db = NULL; + if (sqlite::open_db(db_path, &db) == -1) + { + LOG_ERROR << errno << ": Error openning the raw shard database to find input hash, shard: " << raw_shard; + ledger_fs.stop_ro_session(session_name); + return -1; + } + + if (sqlite::get_user_input_by_hash(db, hash, input) == -1) + { + LOG_ERROR << errno << ": Error finding input hash in shard " << raw_shard; + sqlite::close_db(&db); + ledger_fs.stop_ro_session(session_name); + return -1; + } + + sqlite::close_db(&db); + + if (raw_shard == 0) + break; + + // Keep scanning shards backwards. + raw_shard--; + } + + if (input) + { + // Find the ledger containing the input. + const uint64_t primary_shard = SHARD_SEQ(input->ledger_seq_no, ledger::PRIMARY_SHARD_SIZE); + const std::string shard_path = ledger::ledger_fs.physical_path(session_name, std::string(ledger::PRIMARY_DIR) + "/" + std::to_string(primary_shard) + "/"); + const std::string db_path = shard_path + PRIMARY_DB; + + sqlite3 *db = NULL; + if (sqlite::open_db(db_path, &db) == -1) + { + LOG_ERROR << errno << ": Error openning the primary database to find ledger hash for input, shard: " << primary_shard; + ledger_fs.stop_ro_session(session_name); + return -1; + } + + ledger::ledger_record rec; + if (sqlite::get_ledger_by_seq_no(db, input->ledger_seq_no, rec) != 1) + { + LOG_ERROR << errno << ": Error getting ledger for input in shard " << raw_shard; + sqlite::close_db(&db); + ledger_fs.stop_ro_session(session_name); + return -1; + } + + ledger = std::move(rec); + + sqlite::close_db(&db); + } + + ledger_fs.stop_ro_session(session_name); + return 0; + } } \ No newline at end of file diff --git a/src/ledger/ledger_query.hpp b/src/ledger/ledger_query.hpp index 1ecd3e6c..a8302abb 100644 --- a/src/ledger/ledger_query.hpp +++ b/src/ledger/ledger_query.hpp @@ -30,7 +30,8 @@ namespace ledger::query int get_ledger_raw_data(ledger_record &ledger, std::string_view user_pubkey, const std::string &fs_sess_name); int get_ledger_inputs(sqlite3 *db, std::vector &inputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name); int get_ledger_outputs(sqlite3 *db, std::vector &outputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name); - + int get_input_users_from_ledger(const uint64_t seq_no, std::vector &users, std::vector &inputs); + int get_input_by_hash(const uint64_t lcl_seq_no, std::string_view hash, std::optional &input, std::optional &ledger); } #endif \ No newline at end of file diff --git a/src/ledger/sqlite.cpp b/src/ledger/sqlite.cpp index 7f5a1791..915eff96 100644 --- a/src/ledger/sqlite.cpp +++ b/src/ledger/sqlite.cpp @@ -28,9 +28,10 @@ namespace ledger::sqlite constexpr const char *AND = " AND "; constexpr const char *SELECT_LAST_LEDGER = "SELECT * FROM ledger ORDER BY seq_no DESC LIMIT 1"; constexpr const char *SELECT_LEDGER_BY_SEQ_NO = "SELECT * FROM ledger WHERE seq_no=? LIMIT 1"; - + constexpr const char *SELECT_USERS_BY_SEQ_NO = "SELECT * FROM users WHERE ledger_seq_no=?"; constexpr const char *SELECT_INPUTS_BY_SEQ_NO = "SELECT * FROM inputs WHERE ledger_seq_no=?"; constexpr const char *SELECT_OUTPUTS_BY_SEQ_NO = "SELECT * FROM outputs WHERE ledger_seq_no=?"; + constexpr const char *SELECT_INPUT_BY_HASH = "SELECT * FROM inputs WHERE hash=?"; constexpr const char *INSERT_INTO_LEDGER = "INSERT INTO ledger(" "seq_no, time, ledger_hash, prev_ledger_hash, data_hash," @@ -579,6 +580,25 @@ namespace ledger::sqlite return -1; } + int get_users_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &users) + { + sqlite3_stmt *stmt; + + if (sqlite3_prepare_v2(db, SELECT_USERS_BY_SEQ_NO, -1, &stmt, 0) == SQLITE_OK && stmt != NULL && + sqlite3_bind_int64(stmt, 1, seq_no) == SQLITE_OK) + { + while (sqlite3_step(stmt) == SQLITE_ROW) + users.push_back(GET_PUBKEY_BLOB(1)); + + sqlite3_finalize(stmt); + return 0; + } + + LOG_ERROR << "Error when querying ledger users by seq no. from db. " << sqlite3_errmsg(db); + sqlite3_finalize(stmt); + return -1; + } + int get_user_inputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &inputs) { sqlite3_stmt *stmt; @@ -617,6 +637,25 @@ namespace ledger::sqlite return -1; } + int get_user_input_by_hash(sqlite3 *db, std::string_view hash, std::optional &input) + { + sqlite3_stmt *stmt; + + if (sqlite3_prepare_v2(db, SELECT_INPUT_BY_HASH, -1, &stmt, 0) == SQLITE_OK && stmt != NULL && + BIND_H32_BLOB(1, hash)) + { + if (sqlite3_step(stmt) == SQLITE_ROW) + input = populate_user_input_from_sql_record(stmt); + + sqlite3_finalize(stmt); + return 0; + } + + LOG_ERROR << "Error when querying ledger inputs by hash. from db. " << sqlite3_errmsg(db); + sqlite3_finalize(stmt); + return -1; + } + void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt) { ledger.seq_no = sqlite3_column_int64(stmt, 0); diff --git a/src/ledger/sqlite.hpp b/src/ledger/sqlite.hpp index 87fa143f..f26ff539 100644 --- a/src/ledger/sqlite.hpp +++ b/src/ledger/sqlite.hpp @@ -40,7 +40,7 @@ namespace ledger::sqlite }; // Generic methods. - int open_db(std::string_view db_name, sqlite3 **db, const bool writable = false, const bool journal = true); + int open_db(std::string_view db_name, sqlite3 **db, const bool writable = false, const bool journal = false); int exec_sql(sqlite3 *db, std::string_view sql, int (*callback)(void *, int, char **, char **) = NULL, void *callback_first_arg = NULL); @@ -89,10 +89,14 @@ namespace ledger::sqlite int get_ledger_by_seq_no(sqlite3 *db, const uint64_t seq_no, ledger::ledger_record &ledger); + int get_users_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &users); + int get_user_inputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &inputs); int get_user_outputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &outputs); + int get_user_input_by_hash(sqlite3 *db, std::string_view hash, std::optional &input); + void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt); ledger::ledger_user_input populate_user_input_from_sql_record(sqlite3_stmt *stmt); diff --git a/src/sc/hpfs_log_sync.cpp b/src/sc/hpfs_log_sync.cpp index 17d2a4ce..920754cc 100644 --- a/src/sc/hpfs_log_sync.cpp +++ b/src/sc/hpfs_log_sync.cpp @@ -411,7 +411,7 @@ namespace sc::hpfs_log_sync do { - const uint64_t shard_seq_no = (current_seq_no - 1) / ledger::PRIMARY_SHARD_SIZE; + const uint64_t shard_seq_no = SHARD_SEQ(current_seq_no, ledger::PRIMARY_SHARD_SIZE); const std::string shard_path = ledger::ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no); // Change db connection if the shard changes. diff --git a/src/status.cpp b/src/status.cpp index a598ddb9..6106858e 100644 --- a/src/status.cpp +++ b/src/status.cpp @@ -11,7 +11,9 @@ namespace status util::sequence_hash lcl_id; // Last ledger id/hash pair. ledger::ledger_record last_ledger; // Last ledger record that the node created. - std::atomic in_sync = false; // Indicates whether this node is in sync with other nodes or not. + // Indicates whether this node is in sync with other nodes or not. + // -1=unknown, 0=not-in-sync, 1=in-sync + std::atomic in_sync = -1; std::shared_mutex unl_mutex; std::set unl; // List of last reported unl binary pubkeys. @@ -26,13 +28,14 @@ namespace status // Not acquiring the mutex lock since this is called during startup only. lcl_id = ledger_id; last_ledger = ledger; - - // We assume we are not in sync unless otherwise found that we are. - in_sync = false; } void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger) { + // If currently not-in-sync, report it as in-sync when a ledger is created. + if (in_sync != 1) + sync_status_changed(true); + std::unique_lock lock(ledger_mutex); lcl_id = ledger_id; last_ledger = ledger; @@ -41,11 +44,8 @@ namespace status void sync_status_changed(const bool new_in_sync) { - if (in_sync != new_in_sync) - { - in_sync = new_in_sync; - event_queue.try_enqueue(sync_status_change_event{new_in_sync}); - } + in_sync = new_in_sync ? 1 : 0; + event_queue.try_enqueue(sync_status_change_event{new_in_sync}); } const util::sequence_hash get_lcl_id() @@ -56,7 +56,13 @@ namespace status const bool is_in_sync() { - return in_sync; + return in_sync == 1; + } + + const ledger::ledger_record get_last_ledger() + { + std::shared_lock lock(ledger_mutex); + return last_ledger; } //----- UNL status diff --git a/src/status.hpp b/src/status.hpp index e245da44..a84bc1e7 100644 --- a/src/status.hpp +++ b/src/status.hpp @@ -33,6 +33,7 @@ namespace status void sync_status_changed(const bool in_sync); const util::sequence_hash get_lcl_id(); const bool is_in_sync(); + const ledger::ledger_record get_last_ledger(); void init_unl(const std::set &init_unl); void unl_changed(const std::set &new_unl); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 00fa208c..ad2ec01b 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -288,6 +288,11 @@ namespace usr /** * Sends multiple user input responses grouped by user. + * @param responses The collection of status responses to be sent out. + * @param ledger_seq_no The ledger seq no to indicate in accepted responses. Ignored for 'rejected' responses or for + * individual responses having their own ledger information. + * @param ledger_hash The ledger hash to indicate in accepted responses. Ignored for 'rejected' responses or for + * individual responses having their own ledger information. */ void send_input_status_responses(const std::unordered_map> &responses, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) @@ -318,8 +323,9 @@ namespace usr resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, resp.reject_reason == NULL ? "" : resp.reject_reason, resp.input_hash, - ledger_seq_no, - ledger_hash); + // Give priority ledger seq no/hash contained in individual responses. + resp.ledger_seq_no == 0 ? ledger_seq_no : resp.ledger_seq_no, + resp.ledger_hash == util::h32_empty ? ledger_hash : resp.ledger_hash); } } } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index faeee854..02fa9ceb 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -74,7 +74,9 @@ namespace usr struct input_status_response { const std::string input_hash; - const char *reject_reason; + const char *reject_reason = NULL; + const uint64_t ledger_seq_no = 0; + const util::h32 ledger_hash = util::h32_empty; }; extern connected_context ctx; diff --git a/src/util/util.hpp b/src/util/util.hpp index 57e7b60e..35cb653c 100644 --- a/src/util/util.hpp +++ b/src/util/util.hpp @@ -9,6 +9,7 @@ #define MAX(a, b) ((a > b) ? a : b) #define MIN(a, b) ((a < b) ? a : b) +#define SHARD_SEQ(seq_no, shard_size) (MAX(seq_no, 1) - 1) / shard_size namespace util { diff --git a/test/vm-cluster/cluster.sh b/test/vm-cluster/cluster.sh index 0ae4d305..ba458008 100755 --- a/test/vm-cluster/cluster.sh +++ b/test/vm-cluster/cluster.sh @@ -38,7 +38,7 @@ if [ "$mode" = "info" ] || [ "$mode" = "select" ] || echo "mode: $mode" else echo "Invalid command." - echo " Expected: info | select | new | updatebin | updateconfig [N] | reconfig" \ + echo " Expected: info | select | new | updatebin [N] | updateconfig [N] | reconfig" \ " | start [N] | stop [N] | check [N] | log | kill [N] | reboot | ssh or" \ " | ssl or | lcl | pubkey [N]" echo " : Required node no. [N]: Optional node no."