Input responses while syncing. (#323)

* Send pending input responses after syncing.
* Send input statuses on expired inputs.
* Sqlite no journal.
This commit is contained in:
Ravin Perera
2021-06-12 15:14:21 +05:30
committed by GitHub
parent aae1617ec0
commit c1e1cd12a8
17 changed files with 336 additions and 68 deletions

View File

@@ -125,7 +125,7 @@ namespace conf
uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute
uint16_t max_connections = 0; // Max inbound user connections
uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP).
uint64_t concurrent_read_reqeuests = 10; // Supported concurrent read requests count.
uint64_t concurrent_read_reqeuests = 4; // Supported concurrent read requests count.
};
struct peer_discovery_config

View File

@@ -14,6 +14,7 @@
#include "util/sequence_hash.hpp"
#include "unl.hpp"
#include "ledger/ledger.hpp"
#include "ledger/ledger_query.hpp"
#include "consensus.hpp"
#include "sc/hpfs_log_sync.hpp"
#include "status.hpp"
@@ -106,7 +107,7 @@ namespace consensus
// Throughout consensus, we continously update and prune the candidate proposals for newly
// arived ones and expired ones.
revise_candidate_proposals(ctx.sync_status == 0);
revise_candidate_proposals(ctx.vote_status == VOTES_SYNCED);
// Get current lcl, state, patch, primary shard and raw shard info.
util::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
@@ -146,28 +147,43 @@ namespace consensus
{
int new_sync_status = check_sync_status(unl_count, votes, lcl_id);
if (ctx.sync_status != 0 && new_sync_status == 0)
if (ctx.vote_status != VOTES_SYNCED && new_sync_status == VOTES_UNRELIABLE)
{
// If we are just becoming 'in-sync' after being out-of-sync, check the sync status again after the proper
// If we are just becoming 'in-sync' after being out-of-sync, check the vote status again after the proper
// pruning of candidate proposals. This is because we relax the proposal pruning rules when we are not in sync,
// and we need to make the final sync status check after proper pruning rules are applied.
// and we need to make the final vote status check after proper pruning rules are applied.
LOG_DEBUG << "Rechecking sync status after becoming in-sync.";
LOG_DEBUG << "Rechecking vote status after becoming in-sync.";
revise_candidate_proposals(true);
new_sync_status = check_sync_status(unl_count, votes, lcl_id);
}
// Update the sync status if we went from in-sync to not-in-sync. We will report back as being in-sync
// only when we hit stage 3.
if (ctx.sync_status == 0 && new_sync_status != 0)
// Update the node's status if we went from in-sync to not-in-sync. We will report back as being in-sync only when ledger is created.
if (ctx.vote_status == VOTES_SYNCED && new_sync_status != VOTES_SYNCED)
status::sync_status_changed(false);
ctx.sync_status = new_sync_status;
// This marks entering into a new sync cycle.
if (new_sync_status == VOTES_DESYNC && !ctx.sync_ongoing)
{
// Cleanup any unconsensed contract outputs we may have had before the sync cycle began because those are going to be
// irrelavant after the sync.
cleanup_output_collections();
ctx.sync_ongoing = true;
}
// If we just bacame in-sync after being in desync, we need to restore consensus context information from the synced ledger.
if (ctx.vote_status != VOTES_SYNCED && new_sync_status == VOTES_SYNCED && ctx.sync_ongoing)
dispatch_synced_ledger_input_statuses(lcl_id);
ctx.vote_status = new_sync_status;
}
if (ctx.sync_status == -2) // Unreliable votes.
if (ctx.vote_status == VOTES_UNRELIABLE)
{
ctx.unreliable_votes_attempts++;
// If we get too many consecative unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes
// are caused because our roundtime config information is different from other nodes.
if (ctx.unreliable_votes_attempts >= MAX_UNRELIABLE_VOTES_ATTEMPTS)
{
refresh_time_config(true);
@@ -179,16 +195,23 @@ namespace consensus
ctx.unreliable_votes_attempts = 0;
}
if (ctx.sync_status == 0)
if (ctx.vote_status == VOTES_SYNCED)
{
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
broadcast_proposal(p);
// Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal.
if (ctx.stage == 3)
// This marks the moment we finish a sync cycle. We are in stage 1 and we ditect that our votes are in sync.
if (ctx.stage == 1 && ctx.sync_ongoing)
{
status::sync_status_changed(true); // Creating a new ledger means we are in sync.
// Clear any sync recovery pending state if we enter stage 1 while being in sync.
ctx.sync_ongoing = false;
status::sync_status_changed(true);
LOG_DEBUG << "Sync recovery completed.";
}
else if (ctx.stage == 3)
{
// Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal.
consensed_user_map consensed_users;
if (prepare_consensed_users(consensed_users, p) == -1 ||
@@ -219,12 +242,22 @@ namespace consensus
*/
int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash)
{
// Creating a ledger while sync ongoing happens when we discover that our ledger votes are in sync at stage 2 or 3. At this point,
// we can create the ledger with majority votes. However we dont't have the raw contract outputs we should have had in the previous ledger
// (because we were syncing the ledger and didn't execute the contract). So after this ledger creation we will most probably get a raw ledger
// desync again because our raw outputs were different from other nodes. Therefore we don't consider this a proper/synced ledger creation until
// we are fully out of the sync cycle. Hence we pass the sync_ongoing flag to indicate whether we are still inside a sync cycle or not.
// Sync cycle is considered trully complete after the raw ledger is synced again and we discover in next round Stage 1 that our ledger votes
// are in sync.
// Persist the new ledger with the consensus results.
if (ledger::update_ledger(cons_prop, consensed_users) == -1)
if (ledger::update_ledger(cons_prop, consensed_users, ctx.sync_ongoing) == -1)
return -1;
util::sequence_hash lcl_id = ledger::ctx.get_lcl_id();
LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")";
if (!ctx.sync_ongoing)
LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")";
// Now that there's a new ledger, prune any newly-expired candidate inputs.
expire_candidate_inputs(lcl_id);
@@ -240,7 +273,7 @@ namespace consensus
return -1;
// Execute the smart contract with the consensed user inputs.
if (execute_contract(cons_prop, consensed_users, lcl_id) == -1)
if (execute_contract(cons_prop.time, consensed_users, lcl_id) == -1)
return -1;
return 0;
@@ -331,16 +364,14 @@ namespace consensus
// Proceed further only if last primary shard, last raw shard, state and patch hashes are in sync with majority.
if (!is_last_primary_shard_desync && !is_last_raw_shard_desync && !is_state_desync && !is_patch_desync)
{
return 0;
}
return VOTES_SYNCED;
// Last primary shard hash, last raw shard hash, patch or state desync.
return -1;
return VOTES_DESYNC;
}
// Majority last primary shard hash couldn't be detected reliably.
return -2;
return VOTES_UNRELIABLE;
}
/**
@@ -460,8 +491,10 @@ namespace consensus
}
// If final elected output hash matches our output hash, move the outputs into consensed outputs.
// However, do not perform the safety matching check if we have just completed a sync cycle as we will not possess the outputs
// generated during the previous ledger.
{
if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash())
if (ctx.sync_ongoing || cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash())
{
for (const auto &[hash, gen_output] : ctx.generated_user_outputs)
{
@@ -474,7 +507,7 @@ namespace consensus
}
else
{
LOG_WARNING << "Consenses output hash didn't match our output hash.";
LOG_WARNING << "Consensus output hash didn't match our output hash.";
ret = -1;
}
}
@@ -483,15 +516,20 @@ namespace consensus
}
/**
* Removes any candidate inputs that has lived passed the current ledger seq no.
* Removes any candidate inputs that has lived past the current ledger seq no.
*/
void expire_candidate_inputs(const util::sequence_hash &lcl_id)
{
std::unordered_map<std::string, std::vector<usr::input_status_response>> rejections;
auto itr = ctx.candidate_user_inputs.begin();
while (itr != ctx.candidate_user_inputs.end())
{
if (itr->second.max_ledger_seq_no <= lcl_id.seq_no)
{
const std::string input_hash = std::string(util::get_string_suffix(itr->first, BLAKE3_OUT_LEN));
rejections[itr->second.user_pubkey].push_back(usr::input_status_response{input_hash, msg::usrmsg::REASON_MAX_LEDGER_EXPIRED});
// Erase the candidate input along with its data buffer in the input store.
usr::input_store.purge(itr->second.input);
ctx.candidate_user_inputs.erase(itr++);
@@ -501,6 +539,9 @@ namespace consensus
++itr;
}
}
// Inform any connected users about their expired inputs.
usr::send_input_status_responses(rejections);
}
/**
@@ -1053,11 +1094,11 @@ namespace consensus
/**
* Executes the contract after consensus.
* @param cons_prop The proposal that reached consensus.
* @param time The consensus time.
* @param consensed_users Consensed users and their inputs.
* @param lcl_id Current lcl id of the node.
*/
int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id)
int execute_contract(const uint64_t time, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id)
{
if (!conf::cfg.contract.execute || ctx.is_shutting_down)
return 0;
@@ -1069,7 +1110,7 @@ namespace consensus
sc::contract_execution_args &args = ctx.contract_ctx->args;
args.readonly = false;
args.time = cons_prop.time;
args.time = time;
// lcl to be passed to the contract.
args.lcl_id = lcl_id;
@@ -1180,6 +1221,39 @@ namespace consensus
cleanup_output_collections();
}
/**
* Dispatches any input responses corresponsing to candidate inputs that we have been holding during while syncing.
*/
void dispatch_synced_ledger_input_statuses(const util::sequence_hash &lcl_id)
{
// Find out any inputs we were holding that may have made their way into the ledger while we were syncing,
// and reply with 'accepted' input statuses if the user is conencted to us.
std::unordered_map<std::string, std::vector<usr::input_status_response>> responses;
auto itr = ctx.candidate_user_inputs.begin();
while (itr != ctx.candidate_user_inputs.end())
{
std::string_view ordered_hash = itr->first;
const std::string input_hash = std::string(util::get_string_suffix(ordered_hash, BLAKE3_OUT_LEN));
std::optional<ledger::ledger_user_input> input;
std::optional<ledger::ledger_record> ledger;
if (ledger::query::get_input_by_hash(lcl_id.seq_no, input_hash, input, ledger) != -1 && input)
{
// Each 'accepted' status response must be associated with the ledger seqno/hash that contained the input.
responses[itr->second.user_pubkey].push_back(usr::input_status_response{
input_hash, NULL, input->ledger_seq_no, *(util::h32 *)ledger->ledger_hash.data()});
// Erase the candidate input since we no longer need to hold it.
ctx.candidate_user_inputs.erase(itr++);
}
else
{
++itr;
}
}
usr::send_input_status_responses(responses);
}
/**
* Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process.
* @param bufmap The contract bufmap which needs to be populated with inputs.

View File

@@ -80,6 +80,11 @@ namespace consensus
}
};
#define VOTES_UNKNOWN -3
#define VOTES_UNRELIABLE -2
#define VOTES_DESYNC -1
#define VOTES_SYNCED 0
/**
* This is used to store consensus information
*/
@@ -112,7 +117,11 @@ namespace consensus
uint32_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
uint64_t round_boundry_offset = 0; // Time window boundry offset based on contract id.
uint16_t unreliable_votes_attempts = 0; // No. of times we failed to get reliable votes continously.
int sync_status = 0; // Current sync status.
int vote_status = VOTES_UNKNOWN; // Current status of votes.
// Indicates whether we are inside a sync cycle or not. Sync cycle is considered to being when we first detect that we are out of sync
// and considered to end when we detect to be in sync inside stage 1 of a round for the first time after we began a sync.
bool sync_ongoing = false;
std::optional<sc::execution_context> contract_ctx;
std::mutex contract_ctx_mutex;
@@ -195,12 +204,14 @@ namespace consensus
uint64_t get_stage_time_resolution(const uint64_t time);
int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
int execute_contract(const uint64_t time, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
void dispatch_synced_ledger_input_statuses(const util::sequence_hash &lcl_id);
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users);
void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap);

View File

@@ -105,14 +105,14 @@ namespace ledger
* @param consensed_users Users and their raw inputs/outputs received in this consensus round.
* @return Returns 0 on success -1 on error.
*/
int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users)
int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending)
{
// Acquire hpfs rw session before writing into shards.
if (ledger_fs.acquire_rw_session() == -1)
return -1;
util::sequence_hash lcl_id;
if (update_primary_ledger(proposal, consensed_users, lcl_id) == -1 ||
if (update_primary_ledger(proposal, consensed_users, sync_recovery_pending, lcl_id) == -1 ||
update_ledger_raw_data(proposal, consensed_users, lcl_id) == -1)
{
ledger_fs.release_rw_session();
@@ -129,7 +129,7 @@ namespace ledger
* @param new_lcl_id The new ledger seq no. and hash.
* @return 0 on success. -1 on failure.
*/
int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, util::sequence_hash &new_lcl_id)
int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending, util::sequence_hash &new_lcl_id)
{
const util::sequence_hash lcl_id = ctx.get_lcl_id();
new_lcl_id.seq_no = lcl_id.seq_no + 1;
@@ -169,8 +169,9 @@ namespace ledger
if (shard_res == 1)
remove_old_shards(new_lcl_id.seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR);
// Update the node's status.
status::ledger_created(new_lcl_id, ledger);
// Update the node's status if we are not inside a sync recovery cycle.
if (!sync_recovery_pending)
status::ledger_created(new_lcl_id, ledger);
return 0;
}

View File

@@ -70,9 +70,9 @@ namespace ledger
void deinit();
int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users);
int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending);
int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, util::sequence_hash &new_lcl_id);
int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const bool sync_recovery_pending, util::sequence_hash &new_lcl_id);
int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
@@ -98,7 +98,6 @@ namespace ledger
int persist_max_shard_seq_no(const std::string &shard_parent_dir, const uint64_t last_shard_seq_no);
int get_root_hash_from_ledger(util::h32 &root_hash, const uint64_t seq_no);
} // namespace ledger
#endif

View File

@@ -24,7 +24,7 @@ namespace ledger
uint64_t nonce; // Nonce the user had submitted for this input.
off_t blob_offset; // Blob file offset of this input blob.
size_t blob_size; // Length of the input.
std::string blob; // THe actual input blob.
std::string blob; // The actual input blob.
};
/**

View File

@@ -44,6 +44,9 @@ namespace ledger::query
// Fill raw data if required.
if (seq_q.inputs || seq_q.outputs)
{
// Do not return other users' blobs if consensus is private.
const std::string filter_user = conf::cfg.contract.is_consensus_public ? "" : std::string(user_pubkey);
for (ledger_record &ledger : ledgers)
{
if (seq_q.inputs)
@@ -52,7 +55,7 @@ namespace ledger::query
ledger.outputs = std::vector<ledger::ledger_user_output>();
// No need to actually query raw data for genesis ledger.
if (seq_q.seq_no == 0 || get_ledger_raw_data(ledger, user_pubkey, fs_sess_name) != -1)
if (seq_q.seq_no == 0 || get_ledger_raw_data(ledger, filter_user, fs_sess_name) != -1)
res = ledgers;
}
}
@@ -84,7 +87,7 @@ namespace ledger::query
}
// Construct shard path based on provided ledger seq no.
const uint64_t shard_seq_no = (q.seq_no - 1) / ledger::PRIMARY_SHARD_SIZE;
const uint64_t shard_seq_no = SHARD_SEQ(q.seq_no, ledger::PRIMARY_SHARD_SIZE);
const std::string db_vpath = std::string(ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no) + "/" + ledger::PRIMARY_DB;
const std::string db_path = ledger::ledger_fs.physical_path(fs_sess_name, db_vpath);
@@ -103,7 +106,7 @@ namespace ledger::query
/**
* Retrieve user inputs and outputs by ledger seq no. If consensus is private, this only fills blobs of the requesting user.
* @param ledger Ledger record to populate with inputs and outputs.
* @param user_pubkey Binary pubkey of the user executing the query.
* @param user_pubkey Binary user pubkey. If not empty, include raw data only for this user.
* @param fs_sess_name The ledger hosting fs session name.
* @returns 0 on success. -1 on failure.
*/
@@ -113,7 +116,7 @@ namespace ledger::query
if (!ledger.inputs && !ledger.outputs)
return 0;
const uint64_t shard_seq_no = (ledger.seq_no - 1) / ledger::RAW_SHARD_SIZE;
const uint64_t shard_seq_no = SHARD_SEQ(ledger.seq_no, ledger::RAW_SHARD_SIZE);
const std::string shard_path = ledger::ledger_fs.physical_path(fs_sess_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(shard_seq_no) + "/");
const std::string db_path = shard_path + RAW_DB;
@@ -141,7 +144,7 @@ namespace ledger::query
* @param inputs User input collection to populate.
* @param seq_no Ledger seq no. to query.
* @param shard_path The shard physical path.
* @param user_pubkey Binary pubkey of the user executing the query.
* @param user_pubkey Binary user pubkey. If not empty, include raw data only for this user.
* @param fs_sess_name The ledger hosting fs session name.
* @returns 0 on success. -1 on failure.
*/
@@ -163,8 +166,8 @@ namespace ledger::query
for (ledger_user_input &inp : inputs)
{
// Do not return other users' blobs if consensus is private.
if (!conf::cfg.contract.is_consensus_public && inp.pubkey != user_pubkey)
// Apply user filter.
if (!user_pubkey.empty() && inp.pubkey != user_pubkey)
continue;
inp.blob.resize(inp.blob_size);
@@ -185,7 +188,7 @@ namespace ledger::query
* @param outputs User output collection to populate.
* @param seq_no Ledger seq no. to query.
* @param shard_path The shard physical path.
* @param user_pubkey Binary pubkey of the user executing the query.
* @param user_pubkey Binary user pubkey. If not empty, include raw data only for this user.
* @param fs_sess_name The ledger hosting fs session name.
* @returns 0 on success. -1 on failure.
*/
@@ -208,8 +211,8 @@ namespace ledger::query
// Loop through each user's blob groups.
for (ledger_user_output &user : outputs)
{
// Do not return other users' blobs if consensus is private.
if (!conf::cfg.contract.is_consensus_public && user.pubkey != user_pubkey)
// Apply user filter.
if (!user_pubkey.empty() && user.pubkey != user_pubkey)
continue;
// Output blobs for each user are grouped. Group header contains all individual blob offsets and sizes
@@ -246,4 +249,124 @@ namespace ledger::query
close(fd);
return 0;
}
/**
* Loads inputs and connected users from the specified ledger.
*/
int get_input_users_from_ledger(const uint64_t seq_no, std::vector<std::string> &users, std::vector<ledger_user_input> &inputs)
{
const char *session_name = "input_users";
if (ledger_fs.start_ro_session(session_name, false) == -1)
return -1;
const uint64_t shard_seq_no = SHARD_SEQ(seq_no, ledger::RAW_SHARD_SIZE);
const std::string shard_path = ledger::ledger_fs.physical_path(session_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(shard_seq_no) + "/");
const std::string db_path = shard_path + RAW_DB;
sqlite3 *db = NULL;
if (sqlite::open_db(db_path, &db) == -1)
{
LOG_ERROR << errno << ": Error openning the shard database for input_users, shard: " << shard_seq_no;
ledger_fs.stop_ro_session(session_name);
return -1;
}
if (sqlite::get_users_by_seq_no(db, seq_no, users) == -1 ||
sqlite::get_user_inputs_by_seq_no(db, seq_no, inputs) == -1)
{
LOG_ERROR << errno << ": Error querying ledger input_users, seq_no: " << seq_no;
sqlite::close_db(&db);
ledger_fs.stop_ro_session(session_name);
return -1;
}
sqlite::close_db(&db);
ledger_fs.stop_ro_session(session_name);
return 0;
}
/**
* Attempts to find the provided input hash by scanning all the shards starting with the latest shard.
* @param lcl_seq_no Latest ledger seq no. used to determine latest shard.
* @param hash Input hash to find.
* @param input Popualted input data, if found.
* @param ledger Popualted ledger data which contains the input, if input found.
* @return 0 on successful attempt. -1 on error.
*/
int get_input_by_hash(const uint64_t lcl_seq_no, std::string_view hash, std::optional<ledger::ledger_user_input> &input, std::optional<ledger::ledger_record> &ledger)
{
const char *session_name = "input_by_hash";
if (ledger_fs.start_ro_session(session_name, false) == -1)
return -1;
// Search all the shards starting with last shard for the input hash.
uint64_t raw_shard = SHARD_SEQ(lcl_seq_no, ledger::RAW_SHARD_SIZE);
while (raw_shard >= 0 && !input)
{
const std::string shard_path = ledger::ledger_fs.physical_path(session_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(raw_shard) + "/");
const std::string db_path = shard_path + RAW_DB;
if (!util::is_file_exists(db_path))
{
ledger_fs.stop_ro_session(session_name);
return 0; // Shard not found. So we abandon the search.
}
sqlite3 *db = NULL;
if (sqlite::open_db(db_path, &db) == -1)
{
LOG_ERROR << errno << ": Error openning the raw shard database to find input hash, shard: " << raw_shard;
ledger_fs.stop_ro_session(session_name);
return -1;
}
if (sqlite::get_user_input_by_hash(db, hash, input) == -1)
{
LOG_ERROR << errno << ": Error finding input hash in shard " << raw_shard;
sqlite::close_db(&db);
ledger_fs.stop_ro_session(session_name);
return -1;
}
sqlite::close_db(&db);
if (raw_shard == 0)
break;
// Keep scanning shards backwards.
raw_shard--;
}
if (input)
{
// Find the ledger containing the input.
const uint64_t primary_shard = SHARD_SEQ(input->ledger_seq_no, ledger::PRIMARY_SHARD_SIZE);
const std::string shard_path = ledger::ledger_fs.physical_path(session_name, std::string(ledger::PRIMARY_DIR) + "/" + std::to_string(primary_shard) + "/");
const std::string db_path = shard_path + PRIMARY_DB;
sqlite3 *db = NULL;
if (sqlite::open_db(db_path, &db) == -1)
{
LOG_ERROR << errno << ": Error openning the primary database to find ledger hash for input, shard: " << primary_shard;
ledger_fs.stop_ro_session(session_name);
return -1;
}
ledger::ledger_record rec;
if (sqlite::get_ledger_by_seq_no(db, input->ledger_seq_no, rec) != 1)
{
LOG_ERROR << errno << ": Error getting ledger for input in shard " << raw_shard;
sqlite::close_db(&db);
ledger_fs.stop_ro_session(session_name);
return -1;
}
ledger = std::move(rec);
sqlite::close_db(&db);
}
ledger_fs.stop_ro_session(session_name);
return 0;
}
}

View File

@@ -30,7 +30,8 @@ namespace ledger::query
int get_ledger_raw_data(ledger_record &ledger, std::string_view user_pubkey, const std::string &fs_sess_name);
int get_ledger_inputs(sqlite3 *db, std::vector<ledger_user_input> &inputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name);
int get_ledger_outputs(sqlite3 *db, std::vector<ledger_user_output> &outputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name);
int get_input_users_from_ledger(const uint64_t seq_no, std::vector<std::string> &users, std::vector<ledger_user_input> &inputs);
int get_input_by_hash(const uint64_t lcl_seq_no, std::string_view hash, std::optional<ledger::ledger_user_input> &input, std::optional<ledger::ledger_record> &ledger);
}
#endif

View File

@@ -28,9 +28,10 @@ namespace ledger::sqlite
constexpr const char *AND = " AND ";
constexpr const char *SELECT_LAST_LEDGER = "SELECT * FROM ledger ORDER BY seq_no DESC LIMIT 1";
constexpr const char *SELECT_LEDGER_BY_SEQ_NO = "SELECT * FROM ledger WHERE seq_no=? LIMIT 1";
constexpr const char *SELECT_USERS_BY_SEQ_NO = "SELECT * FROM users WHERE ledger_seq_no=?";
constexpr const char *SELECT_INPUTS_BY_SEQ_NO = "SELECT * FROM inputs WHERE ledger_seq_no=?";
constexpr const char *SELECT_OUTPUTS_BY_SEQ_NO = "SELECT * FROM outputs WHERE ledger_seq_no=?";
constexpr const char *SELECT_INPUT_BY_HASH = "SELECT * FROM inputs WHERE hash=?";
constexpr const char *INSERT_INTO_LEDGER = "INSERT INTO ledger("
"seq_no, time, ledger_hash, prev_ledger_hash, data_hash,"
@@ -579,6 +580,25 @@ namespace ledger::sqlite
return -1;
}
int get_users_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector<std::string> &users)
{
sqlite3_stmt *stmt;
if (sqlite3_prepare_v2(db, SELECT_USERS_BY_SEQ_NO, -1, &stmt, 0) == SQLITE_OK && stmt != NULL &&
sqlite3_bind_int64(stmt, 1, seq_no) == SQLITE_OK)
{
while (sqlite3_step(stmt) == SQLITE_ROW)
users.push_back(GET_PUBKEY_BLOB(1));
sqlite3_finalize(stmt);
return 0;
}
LOG_ERROR << "Error when querying ledger users by seq no. from db. " << sqlite3_errmsg(db);
sqlite3_finalize(stmt);
return -1;
}
int get_user_inputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector<ledger::ledger_user_input> &inputs)
{
sqlite3_stmt *stmt;
@@ -617,6 +637,25 @@ namespace ledger::sqlite
return -1;
}
int get_user_input_by_hash(sqlite3 *db, std::string_view hash, std::optional<ledger::ledger_user_input> &input)
{
sqlite3_stmt *stmt;
if (sqlite3_prepare_v2(db, SELECT_INPUT_BY_HASH, -1, &stmt, 0) == SQLITE_OK && stmt != NULL &&
BIND_H32_BLOB(1, hash))
{
if (sqlite3_step(stmt) == SQLITE_ROW)
input = populate_user_input_from_sql_record(stmt);
sqlite3_finalize(stmt);
return 0;
}
LOG_ERROR << "Error when querying ledger inputs by hash. from db. " << sqlite3_errmsg(db);
sqlite3_finalize(stmt);
return -1;
}
void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt)
{
ledger.seq_no = sqlite3_column_int64(stmt, 0);

View File

@@ -40,7 +40,7 @@ namespace ledger::sqlite
};
// Generic methods.
int open_db(std::string_view db_name, sqlite3 **db, const bool writable = false, const bool journal = true);
int open_db(std::string_view db_name, sqlite3 **db, const bool writable = false, const bool journal = false);
int exec_sql(sqlite3 *db, std::string_view sql, int (*callback)(void *, int, char **, char **) = NULL, void *callback_first_arg = NULL);
@@ -89,10 +89,14 @@ namespace ledger::sqlite
int get_ledger_by_seq_no(sqlite3 *db, const uint64_t seq_no, ledger::ledger_record &ledger);
int get_users_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector<std::string> &users);
int get_user_inputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector<ledger::ledger_user_input> &inputs);
int get_user_outputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector<ledger::ledger_user_output> &outputs);
int get_user_input_by_hash(sqlite3 *db, std::string_view hash, std::optional<ledger::ledger_user_input> &input);
void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt);
ledger::ledger_user_input populate_user_input_from_sql_record(sqlite3_stmt *stmt);

View File

@@ -411,7 +411,7 @@ namespace sc::hpfs_log_sync
do
{
const uint64_t shard_seq_no = (current_seq_no - 1) / ledger::PRIMARY_SHARD_SIZE;
const uint64_t shard_seq_no = SHARD_SEQ(current_seq_no, ledger::PRIMARY_SHARD_SIZE);
const std::string shard_path = ledger::ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no);
// Change db connection if the shard changes.

View File

@@ -11,7 +11,9 @@ namespace status
util::sequence_hash lcl_id; // Last ledger id/hash pair.
ledger::ledger_record last_ledger; // Last ledger record that the node created.
std::atomic<bool> in_sync = false; // Indicates whether this node is in sync with other nodes or not.
// Indicates whether this node is in sync with other nodes or not.
// -1=unknown, 0=not-in-sync, 1=in-sync
std::atomic<int> in_sync = -1;
std::shared_mutex unl_mutex;
std::set<std::string> unl; // List of last reported unl binary pubkeys.
@@ -26,13 +28,14 @@ namespace status
// Not acquiring the mutex lock since this is called during startup only.
lcl_id = ledger_id;
last_ledger = ledger;
// We assume we are not in sync unless otherwise found that we are.
in_sync = false;
}
void ledger_created(const util::sequence_hash &ledger_id, const ledger::ledger_record &ledger)
{
// If currently not-in-sync, report it as in-sync when a ledger is created.
if (in_sync != 1)
sync_status_changed(true);
std::unique_lock lock(ledger_mutex);
lcl_id = ledger_id;
last_ledger = ledger;
@@ -41,11 +44,8 @@ namespace status
void sync_status_changed(const bool new_in_sync)
{
if (in_sync != new_in_sync)
{
in_sync = new_in_sync;
event_queue.try_enqueue(sync_status_change_event{new_in_sync});
}
in_sync = new_in_sync ? 1 : 0;
event_queue.try_enqueue(sync_status_change_event{new_in_sync});
}
const util::sequence_hash get_lcl_id()
@@ -56,7 +56,13 @@ namespace status
const bool is_in_sync()
{
return in_sync;
return in_sync == 1;
}
const ledger::ledger_record get_last_ledger()
{
std::shared_lock lock(ledger_mutex);
return last_ledger;
}
//----- UNL status

View File

@@ -33,6 +33,7 @@ namespace status
void sync_status_changed(const bool in_sync);
const util::sequence_hash get_lcl_id();
const bool is_in_sync();
const ledger::ledger_record get_last_ledger();
void init_unl(const std::set<std::string> &init_unl);
void unl_changed(const std::set<std::string> &new_unl);

View File

@@ -288,6 +288,11 @@ namespace usr
/**
* Sends multiple user input responses grouped by user.
* @param responses The collection of status responses to be sent out.
* @param ledger_seq_no The ledger seq no to indicate in accepted responses. Ignored for 'rejected' responses or for
* individual responses having their own ledger information.
* @param ledger_hash The ledger hash to indicate in accepted responses. Ignored for 'rejected' responses or for
* individual responses having their own ledger information.
*/
void send_input_status_responses(const std::unordered_map<std::string, std::vector<input_status_response>> &responses,
const uint64_t ledger_seq_no, const util::h32 &ledger_hash)
@@ -318,8 +323,9 @@ namespace usr
resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED,
resp.reject_reason == NULL ? "" : resp.reject_reason,
resp.input_hash,
ledger_seq_no,
ledger_hash);
// Give priority ledger seq no/hash contained in individual responses.
resp.ledger_seq_no == 0 ? ledger_seq_no : resp.ledger_seq_no,
resp.ledger_hash == util::h32_empty ? ledger_hash : resp.ledger_hash);
}
}
}

View File

@@ -74,7 +74,9 @@ namespace usr
struct input_status_response
{
const std::string input_hash;
const char *reject_reason;
const char *reject_reason = NULL;
const uint64_t ledger_seq_no = 0;
const util::h32 ledger_hash = util::h32_empty;
};
extern connected_context ctx;

View File

@@ -9,6 +9,7 @@
#define MAX(a, b) ((a > b) ? a : b)
#define MIN(a, b) ((a < b) ? a : b)
#define SHARD_SEQ(seq_no, shard_size) (MAX(seq_no, 1) - 1) / shard_size
namespace util
{

View File

@@ -38,7 +38,7 @@ if [ "$mode" = "info" ] || [ "$mode" = "select" ] ||
echo "mode: $mode"
else
echo "Invalid command."
echo " Expected: info | select | new | updatebin <N> | updateconfig [N] | reconfig" \
echo " Expected: info | select | new | updatebin [N] | updateconfig [N] | reconfig" \
" | start [N] | stop [N] | check [N] | log <N> | kill [N] | reboot <N> | ssh <N>or<command>" \
" | ssl <email>or<N> <email> | lcl | pubkey [N]"
echo " <N>: Required node no. [N]: Optional node no."