diff --git a/CMakeLists.txt b/CMakeLists.txt index 38e7be4d..308afecb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,7 +46,6 @@ add_executable(hpcore src/sc/hpfs_log_sync.cpp src/comm/comm_session.cpp src/msg/fbuf/common_helpers.cpp - src/msg/fbuf/ledger_helpers.cpp src/msg/fbuf/p2pmsg_conversion.cpp src/msg/json/controlmsg_json.cpp src/msg/controlmsg_parser.cpp diff --git a/examples/js_client/hp-client-lib.js b/examples/js_client/hp-client-lib.js index 45adbd8e..0cb7d60b 100644 --- a/examples/js_client/hp-client-lib.js +++ b/examples/js_client/hp-client-lib.js @@ -334,8 +334,8 @@ return getMultiConnectionResult(con => con.getStatus()); } - this.getLedgerBySeqNo = (seqNo, includeRawInputs, includeRawOutputs) => { - return getMultiConnectionResult(con => con.getLedgerBySeqNo(seqNo, includeRawInputs, includeRawOutputs)); + this.getLedgerBySeqNo = (seqNo, includeInputs, includeOutputs) => { + return getMultiConnectionResult(con => con.getLedgerBySeqNo(seqNo, includeInputs, includeOutputs)); } } @@ -618,18 +618,35 @@ const result = { seqNo: r.seq_no, timestamp: r.timestamp, - hash: r.hash, - prevHash: r.prev_hash, - stateHash: r.state_hash, - configHash: r.config_hash, - userHash: r.user_hash, - inputHash: r.input_hash, - outputHash: r.output_hash + hash: msgHelper.deserializeValue(r.hash), + prevHash: msgHelper.deserializeValue(r.prev_hash), + stateHash: msgHelper.deserializeValue(r.state_hash), + configHash: msgHelper.deserializeValue(r.config_hash), + userHash: msgHelper.deserializeValue(r.user_hash), + inputHash: msgHelper.deserializeValue(r.input_hash), + outputHash: msgHelper.deserializeValue(r.output_hash) } - if (r.raw_inputs) - result.rawInputs = r.raw_inputs; - if (r.raw_outputs) - result.rawOutputs = r.raw_outputs; + + if (r.inputs) { + result.inputs = r.inputs.map(i => { + return { + pubkey: msgHelper.deserializeValue(i.pubkey), + hash: msgHelper.deserializeValue(i.hash), + blob: msgHelper.deserializeValue(i.blob) + } + }); + } + + if (r.outputs) { + result.outputs = r.outputs.map(o => { + return { + pubkey: msgHelper.deserializeValue(o.pubkey), + hash: msgHelper.deserializeValue(o.hash), + blobs: o.blobs.map(b => msgHelper.deserializeValue(b)) + } + }); + } + return result; }); if (resolver.type == "seq_no") @@ -837,11 +854,11 @@ return Promise.resolve(); } - this.getLedgerBySeqNo = (seqNo, includeRawInputs, includeRawOutputs) => { + this.getLedgerBySeqNo = (seqNo, includeInputs, includeOutputs) => { if (connectionStatus != 2) return Promise.resolve(null); - const msg = msgHelper.createLedgerQuery("seq_no", { "seq_no": seqNo }, includeRawInputs, includeRawOutputs); + const msg = msgHelper.createLedgerQuery("seq_no", { "seq_no": seqNo }, includeInputs, includeOutputs); const p = new Promise(resolve => { ledgerQueryResolvers[msg.id] = { type: "seq_no", @@ -968,11 +985,11 @@ return { type: "stat" }; } - this.createLedgerQuery = (filterBy, params, includeRawInputs, includeRawOutputs) => { + this.createLedgerQuery = (filterBy, params, includeInputs, includeOutputs) => { const includes = []; - if (includeRawInputs) includes.push("raw_inputs"); - if (includeRawOutputs) includes.push("raw_outputs"); + if (includeInputs) includes.push("inputs"); + if (includeOutputs) includes.push("outputs"); return { type: "ledger_query", diff --git a/src/conf.cpp b/src/conf.cpp index bc8f05cc..70239538 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -133,7 +133,7 @@ namespace conf util::create_dir_tree_recursive(ctx.contract_hpfs_dir + "/seed" + sc::STATE_DIR_PATH) == -1 || util::create_dir_tree_recursive(ctx.contract_hpfs_mount_dir) == -1 || util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + ledger::PRIMARY_DIR) == -1 || - util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + ledger::BLOB_DIR) == -1 || + util::create_dir_tree_recursive(ctx.ledger_hpfs_dir + "/seed" + ledger::RAW_DIR) == -1 || util::create_dir_tree_recursive(ctx.ledger_hpfs_mount_dir) == -1 || util::create_dir_tree_recursive(ctx.contract_log_dir) == -1) { @@ -154,7 +154,7 @@ namespace conf cfg.node.role = ROLE::VALIDATOR; cfg.node.history = HISTORY::CUSTOM; cfg.node.history_config.max_primary_shards = 1; - cfg.node.history_config.max_blob_shards = 1; + cfg.node.history_config.max_raw_shards = 1; cfg.contract.id = crypto::generate_uuid(); cfg.contract.execute = true; @@ -356,9 +356,9 @@ namespace conf jpath = "node.history_config"; cfg.node.history_config.max_primary_shards = node["history_config"]["max_primary_shards"].as(); - cfg.node.history_config.max_blob_shards = node["history_config"]["max_blob_shards"].as(); + cfg.node.history_config.max_raw_shards = node["history_config"]["max_raw_shards"].as(); - // Max shards cannot be zero for primary and blob shards if the history mode is custom. + // Max shards cannot be zero for primary and raw shards if the history mode is custom. // In history = full, these configs are not used. if (cfg.node.history == HISTORY::CUSTOM) { @@ -368,9 +368,9 @@ namespace conf return -1; } - if (cfg.node.history_config.max_blob_shards == 0) + if (cfg.node.history_config.max_raw_shards == 0) { - std::cerr << "'max_blob_shards' cannot be zero in history=custom mode.\n"; + std::cerr << "'max_raw_shards' cannot be zero in history=custom mode.\n"; return -1; } } @@ -535,7 +535,7 @@ namespace conf jsoncons::ojson history_config; history_config.insert_or_assign("max_primary_shards", cfg.node.history_config.max_primary_shards); - history_config.insert_or_assign("max_blob_shards", cfg.node.history_config.max_blob_shards); + history_config.insert_or_assign("max_raw_shards", cfg.node.history_config.max_raw_shards); node_config.insert_or_assign("history_config", history_config); d.insert_or_assign("node", node_config); diff --git a/src/conf.hpp b/src/conf.hpp index 3b99c93b..7293ef3b 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -47,11 +47,11 @@ namespace conf CUSTOM }; - // Max number of shards to keep for primary and blob shards. + // Max number of shards to keep for primary and raw shards. struct history_configuration { uint64_t max_primary_shards; // Maximum number of shards for primary shards. - uint64_t max_blob_shards; // Maximum number of shards for blob shards. + uint64_t max_raw_shards; // Maximum number of shards for raw shards. }; // Log severity levels used in Hot Pocket. diff --git a/src/consensus.cpp b/src/consensus.cpp index b41a07ab..b0ec1a1b 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -109,12 +109,12 @@ namespace consensus // If possible, switch back to validator mode before stage processing. (if we were syncing before) check_sync_completion(); - // Get current lcl, state, patch, primary shard and blob shard info. + // Get current lcl, state, patch, primary shard and raw shard info. p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); util::h32 state_hash = sc::contract_fs.get_parent_hash(sc::STATE_DIR_PATH); const util::h32 patch_hash = sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH); const p2p::sequence_hash last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); - const p2p::sequence_hash last_blob_shard_id = ledger::ctx.get_last_blob_shard_id(); + const p2p::sequence_hash last_raw_shard_id = ledger::ctx.get_last_raw_shard_id(); if (ctx.stage == 0 || ctx.stage == 2) { @@ -131,7 +131,7 @@ namespace consensus if (verify_and_populate_candidate_user_inputs(lcl_id.seq_no) == -1) return -1; - const p2p::proposal p = create_stage0_proposal(state_hash, patch_hash, last_primary_shard_id, last_blob_shard_id); + const p2p::proposal p = create_stage0_proposal(state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); broadcast_proposal(p); ctx.stage = 1; // Transition to next stage. @@ -161,7 +161,7 @@ namespace consensus if (sync_status == 0) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_blob_shard_id); + const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); broadcast_proposal(p); // Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal. @@ -200,7 +200,7 @@ namespace consensus int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash) { // Persist the new ledger with the consensus results. - if (ledger::save_ledger(cons_prop, consensed_users) == -1) + if (ledger::update_ledger(cons_prop, consensed_users) == -1) return -1; p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); @@ -250,10 +250,10 @@ namespace consensus ledger::ledger_sync_worker.set_target_push_front(hpfs::sync_target{sync_name, majority_primary_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } - // Check out blob shard hash with majority blob shard hash. - bool is_last_blob_shard_desync = false; - p2p::sequence_hash majority_blob_shard_id; - check_last_blob_shard_hash_votes(is_last_blob_shard_desync, majority_blob_shard_id, votes); + // Check out raw shard hash with majority raw shard hash. + bool is_last_raw_shard_desync = false; + p2p::sequence_hash majority_raw_shard_id; + check_last_raw_shard_hash_votes(is_last_raw_shard_desync, majority_raw_shard_id, votes); // Check our state with majority state. bool is_state_desync = false; @@ -288,38 +288,38 @@ namespace consensus } } - // If ledger blob shard is desync, We first request the latest blob shard. - if (is_last_blob_shard_desync) + // If ledger raw shard is desync, We first request the latest raw shard. + if (is_last_raw_shard_desync) { conf::change_role(conf::ROLE::OBSERVER); - const std::string majority_shard_seq_no_str = std::to_string(majority_blob_shard_id.seq_no); - const std::string sync_name = "blob shard " + majority_shard_seq_no_str; - const std::string shard_path = std::string(ledger::BLOB_DIR).append("/").append(majority_shard_seq_no_str); - ledger::ledger_sync_worker.is_last_blob_shard_syncing = true; - ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_blob_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + const std::string majority_shard_seq_no_str = std::to_string(majority_raw_shard_id.seq_no); + const std::string sync_name = "raw shard " + majority_shard_seq_no_str; + const std::string shard_path = std::string(ledger::RAW_DIR).append("/").append(majority_shard_seq_no_str); + ledger::ledger_sync_worker.is_last_raw_shard_syncing = true; + ledger::ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, majority_raw_shard_id.hash, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } // If shards aren't aligned with max shard count, Do the relevant shard cleanups and requests. // In the first consensus round sync completion after the startup. - if (!ledger::ledger_sync_worker.is_syncing && (!ledger::ctx.primary_shards_persisted || !ledger::ctx.blob_shards_persisted) && ledger::ledger_fs.acquire_rw_session() != -1) + if (!ledger::ledger_sync_worker.is_syncing && (!ledger::ctx.primary_shards_persisted || !ledger::ctx.raw_shards_persisted) && ledger::ledger_fs.acquire_rw_session() != -1) { if (!ledger::ctx.primary_shards_persisted) ledger::persist_shard_history(majority_primary_shard_id.seq_no, ledger::PRIMARY_DIR); - if (!ledger::ctx.blob_shards_persisted) - ledger::persist_shard_history(majority_blob_shard_id.seq_no, ledger::BLOB_DIR); + if (!ledger::ctx.raw_shards_persisted) + ledger::persist_shard_history(majority_raw_shard_id.seq_no, ledger::RAW_DIR); ledger::ledger_fs.release_rw_session(); } - // Proceed further only if last primary shard, last blob shard, state and patch hashes are in sync with majority. - if (!is_last_primary_shard_desync && !is_last_blob_shard_desync && !is_state_desync && !is_patch_desync) + // Proceed further only if last primary shard, last raw shard, state and patch hashes are in sync with majority. + if (!is_last_primary_shard_desync && !is_last_raw_shard_desync && !is_state_desync && !is_patch_desync) { conf::change_role(conf::ROLE::VALIDATOR); return 0; } - // Last primary shard hash, last blob shard hash, patch or state desync. + // Last primary shard hash, last raw shard hash, patch or state desync. return -1; } @@ -335,7 +335,7 @@ namespace consensus { const bool is_contract_syncing = (conf::cfg.node.history == conf::HISTORY::FULL) ? sc::hpfs_log_sync::sync_ctx.is_syncing : sc::contract_sync_worker.is_syncing; // In ledger sync we only concern about last shard sync status to proceed with consensus. - const bool is_ledger_syncing = ledger::ledger_sync_worker.is_last_primary_shard_syncing || ledger::ledger_sync_worker.is_last_blob_shard_syncing; + const bool is_ledger_syncing = ledger::ledger_sync_worker.is_last_primary_shard_syncing || ledger::ledger_sync_worker.is_last_raw_shard_syncing; if (conf::cfg.node.role == conf::ROLE::OBSERVER && !is_contract_syncing && !is_ledger_syncing) conf::change_role(conf::ROLE::VALIDATOR); } @@ -382,7 +382,7 @@ namespace consensus << " state:" << cp.state_hash << " patch:" << cp.patch_hash << " lps:" << cp.last_primary_shard_id - << " lbs:" << cp.last_blob_shard_id + << " lbs:" << cp.last_raw_shard_id << " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]" << "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)"; @@ -617,7 +617,7 @@ namespace consensus << " state:" << p.state_hash << " patch:" << p.patch_hash << " last_primary_shard_id:" << p.last_primary_shard_id - << " last_blob_shard_id:" << p.last_blob_shard_id; + << " last_raw_shard_id:" << p.last_raw_shard_id; } /** @@ -738,7 +738,7 @@ namespace consensus } p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_blob_shard_id) + const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id) { // This is the proposal that stage 0 votes on. // We report our own values in stage 0. @@ -748,7 +748,7 @@ namespace consensus p.state_hash = state_hash; p.patch_hash = patch_hash; p.last_primary_shard_id = last_primary_shard_id; - p.last_blob_shard_id = last_blob_shard_id; + p.last_raw_shard_id = last_raw_shard_id; crypto::random_bytes(p.nonce, ROUND_NONCE_SIZE); // Populate the proposal with set of candidate user pubkeys. @@ -766,7 +766,7 @@ namespace consensus } p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_blob_shard_id) + const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id) { // The proposal to be emited at the end of this stage. p2p::proposal p; @@ -777,7 +777,7 @@ namespace consensus p.state_hash = state_hash; p.patch_hash = patch_hash; p.last_primary_shard_id = last_primary_shard_id; - p.last_blob_shard_id = last_blob_shard_id; + p.last_raw_shard_id = last_raw_shard_id; p.output_hash.resize(BLAKE3_OUT_LEN); // Default empty hash. const uint64_t time_now = util::get_epoch_milliseconds(); @@ -945,29 +945,29 @@ namespace consensus } /** - * Check whether our last blob shard hash is consistent with the proposals being made by our UNL peers last blob shard hash votes. - * @param is_ledger_blob_desync Indicates whether our ledger blob hash is out-of-sync with majority ledger blob hash. + * Check whether our last raw shard hash is consistent with the proposals being made by our UNL peers last raw shard hash votes. + * @param is_ledger_blob_desync Indicates whether our ledger raw hash is out-of-sync with majority ledger raw hash. * @param majority_primary_shard_id Majority primary shard id. * @param votes Vote counter for this stage. */ - void check_last_blob_shard_hash_votes(bool &is_ledger_blob_desync, p2p::sequence_hash &majority_blob_shard_id, vote_counter &votes) + void check_last_raw_shard_hash_votes(bool &is_ledger_blob_desync, p2p::sequence_hash &majority_raw_shard_id, vote_counter &votes) { for (const auto &[pubkey, cp] : ctx.candidate_proposals) { - increment(votes.last_ledger_blob_shard, cp.last_blob_shard_id); + increment(votes.last_ledger_raw_shard, cp.last_raw_shard_id); } uint32_t winning_votes = 0; - for (const auto [shard_id, votes] : votes.last_ledger_blob_shard) + for (const auto [shard_id, votes] : votes.last_ledger_raw_shard) { if (votes > winning_votes) { winning_votes = votes; - majority_blob_shard_id = shard_id; + majority_raw_shard_id = shard_id; } } - is_ledger_blob_desync = (ledger::ctx.get_last_blob_shard_id() != majority_blob_shard_id); + is_ledger_blob_desync = (ledger::ctx.get_last_raw_shard_id() != majority_raw_shard_id); } /** diff --git a/src/consensus.hpp b/src/consensus.hpp index 9ca5b034..64ba7da6 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -134,7 +134,7 @@ namespace consensus std::map state_hash; std::map patch_hash; std::map last_ledger_primary_shard; - std::map last_ledger_blob_shard; + std::map last_ledger_raw_shard; }; extern std::atomic is_patch_update_pending; // Keep track whether the patch file is changed by the SC and is not yet applied to runtime. @@ -174,16 +174,16 @@ namespace consensus int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_blob_shard_id); + const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id); p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash, - const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_blob_shard_id); + const p2p::sequence_hash &last_primary_shard_id, const p2p::sequence_hash &last_raw_shard_id); void broadcast_proposal(const p2p::proposal &p); bool check_last_primary_shard_hash_votes(bool &is_desync, p2p::sequence_hash &majority_primary_shard_id, vote_counter &votes, const size_t unl_count); - void check_last_blob_shard_hash_votes(bool &is_ledger_blob_desync, p2p::sequence_hash &majority_blob_shard_id, vote_counter &votes); + void check_last_raw_shard_hash_votes(bool &is_ledger_blob_desync, p2p::sequence_hash &majority_raw_shard_id, vote_counter &votes); void check_state_votes(bool &is_state_desync, util::h32 &majority_state_hash, vote_counter &votes); diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index d80980e3..2249c0b2 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -1,20 +1,27 @@ #include "ledger.hpp" +#include "../consensus.hpp" #include "../crypto.hpp" #include "../conf.hpp" #include "../util/version.hpp" #include "../util/util.hpp" -#include "../msg/fbuf/ledger_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" #include "ledger_common.hpp" #include "ledger_serve.hpp" -#define LEDGER_CREATE_ERROR \ - { \ - if (db != NULL) \ - sqlite::close_db(&db); \ - ledger_fs.release_rw_session(); \ - return -1; \ +#define RAW_DATA_RETURN(ret) \ + { \ + if (users_stmt != NULL) \ + sqlite3_finalize(users_stmt); \ + if (outputs_stmt != NULL) \ + sqlite3_finalize(outputs_stmt); \ + if (inputs_stmt != NULL) \ + sqlite3_finalize(inputs_stmt); \ + if (in_fd != -1) \ + close(in_fd); \ + if (out_fd != -1) \ + close(out_fd); \ + return ret; \ } namespace ledger @@ -69,6 +76,11 @@ namespace ledger return -1; } + // Remove old shards that exceeds max shard range. + const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); + remove_old_shards(lcl_id.seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); + remove_old_shards(lcl_id.seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); + return 0; } @@ -83,83 +95,119 @@ namespace ledger } /** - * Create and save ledger record from the given proposal message. + * Updates the ledger with the given proposal message. * @param proposal Consensus-reached Stage 3 proposal. * @param consensed_users Users and their raw inputs/outputs received in this consensus round. * @return Returns 0 on success -1 on error. */ - int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users) + int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users) { - // Aqure hpfs rw session before accessing shards and insert ledger records. + // Aquire hpfs rw session before writing into shards. if (ledger_fs.acquire_rw_session() == -1) return -1; - const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); - uint64_t seq_no = lcl_id.seq_no; - seq_no++; // New ledger sequence number. - - sqlite3 *db = NULL; - - // Prepare shard folders and database and get the primary shard sequence number. - uint64_t primary_shard_seq_no; - if (prepare_shard(&db, primary_shard_seq_no, seq_no) == -1) - LEDGER_CREATE_ERROR; - - // Insert primary ledger record. - std::string new_ledger_hash; - if (insert_ledger_record(db, lcl_id, primary_shard_seq_no, proposal, new_ledger_hash) == -1) - LEDGER_CREATE_ERROR; - - // Save blob data. - if ((!proposal.input_ordered_hashes.empty() || proposal.output_hash != util::h32_empty.to_string_view()) && - save_ledger_blob(new_ledger_hash, consensed_users) == -1) - LEDGER_CREATE_ERROR; - - // Update the latest seq_no and lcl when ledger is created. - p2p::sequence_hash new_lcl_id; - new_lcl_id.seq_no = seq_no; - new_lcl_id.hash = new_ledger_hash; - ctx.set_lcl_id(new_lcl_id); - - const std::string shard_vpath = std::string(ledger::PRIMARY_DIR).append("/").append(std::to_string(primary_shard_seq_no)); - util::h32 last_primary_shard_hash; - if (ledger_fs.get_hash(last_primary_shard_hash, hpfs::RW_SESSION_NAME, shard_vpath) == -1) + p2p::sequence_hash lcl_id; + if (update_primary_ledger(proposal, consensed_users, lcl_id) == -1 || + update_ledger_raw_data(proposal, consensed_users, lcl_id) == -1) { - LOG_ERROR << errno << ": Error reading shard hash: " << std::to_string(primary_shard_seq_no); - LEDGER_CREATE_ERROR; - } - - // Update the last shard hash and shard seqence number tracker when a new ledger is created. - ctx.set_last_primary_shard_id(p2p::sequence_hash{primary_shard_seq_no, last_primary_shard_hash}); - - // Update the hpfs log index file only in full history mode. - if (conf::cfg.node.history == conf::HISTORY::FULL && sc::contract_fs.update_hpfs_log_index() == -1) - { - LOG_ERROR << errno << ": Error updating the log index file."; + ledger_fs.release_rw_session(); return -1; } - //Remove old shards that exceeds max shard range. - if (conf::cfg.node.history == conf::HISTORY::CUSTOM && primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards) - { - remove_old_shards(primary_shard_seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR); - } - - sqlite::close_db(&db); return ledger_fs.release_rw_session(); } /** - * Inserts new ledger record to the sqlite database. - * @param db Database connection to use. - * @param current_lcl_id Current lcl id. - * @param primary_shard_seq_no Current primary shard seq no. - * @param proposal The consensus proposal. - * @param new_ledger_hash Hash of the ledger that got isnerted. + * Updates the primary ledger with the given consensus information. + * @param proposal Consensus-reached Stage 3 proposal. + * @param consensed_users Users and their raw inputs/outputs received in this consensus round. + * @param new_lcl_id The new ledger seq no. and hash. * @return 0 on success. -1 on failure. */ - int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t primary_shard_seq_no, - const p2p::proposal &proposal, std::string &new_ledger_hash) + int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, p2p::sequence_hash &new_lcl_id) + { + const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); + new_lcl_id.seq_no = lcl_id.seq_no + 1; + + sqlite3 *db = NULL; + + // Prepare shard folders and database and get the shard sequence number. + uint64_t shard_seq_no; + const int shard_res = prepare_shard(&db, shard_seq_no, new_lcl_id.seq_no, PRIMARY_SHARD_SIZE, PRIMARY_DIR, PRIMARY_DB, true); + + // Insert primary ledger record. + if (shard_res >= 0 && insert_ledger_record(db, lcl_id, shard_seq_no, proposal, new_lcl_id) != -1) + { + sqlite::close_db(&db); + ctx.set_lcl_id(new_lcl_id); + + const std::string shard_vpath = std::string(ledger::PRIMARY_DIR).append("/").append(std::to_string(shard_seq_no)); + util::h32 last_primary_shard_hash; + if (ledger_fs.get_hash(last_primary_shard_hash, hpfs::RW_SESSION_NAME, shard_vpath) == -1) + { + LOG_ERROR << errno << ": Error reading shard hash: " << shard_seq_no; + return -1; + } + + // Update the last shard hash and shard seqence number tracker when a new ledger is created. + ctx.set_last_primary_shard_id(p2p::sequence_hash{shard_seq_no, last_primary_shard_hash}); + + // Update the hpfs log index file only in full history mode. + if (conf::cfg.node.history == conf::HISTORY::FULL && sc::contract_fs.update_hpfs_log_index() == -1) + { + LOG_ERROR << errno << ": Error updating the hpfs log index file."; + return -1; + } + + // Remove old shards if new one got created. + if (shard_res == 1) + remove_old_shards(new_lcl_id.seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); + + return 0; + } + + sqlite::close_db(&db); + return -1; + } + + int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + { + if ((conf::cfg.node.history != conf::HISTORY::FULL && conf::cfg.node.history_config.max_raw_shards == 0)) + return 0; + + const bool has_updates = !consensed_users.empty(); + + // Prepare shard folders and database and get the shard sequence number. + sqlite3 *db = NULL; + uint64_t shard_seq_no; + const int shard_res = prepare_shard(&db, shard_seq_no, lcl_id.seq_no, RAW_SHARD_SIZE, RAW_DIR, RAW_DB, has_updates); + + if (shard_res >= 0 && insert_raw_data_records(db, shard_seq_no, proposal, consensed_users, lcl_id) != -1) + { + sqlite::close_db(&db); + + // Remove old shards if new one got created. + if (shard_res == 1) + remove_old_shards(lcl_id.seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); + + return 0; + } + + sqlite::close_db(&db); + return -1; + } + + /** + * Inserts new ledger record to the sqlite database. + * @param db The sqlite db connection for primary ledger db. + * @param current_lcl_id Current lcl id. + * @param shard_seq_no Current primary shard seq no. + * @param proposal The consensus proposal. + * @param new_lcl_id Newly created ledger id. + * @return 0 on success. -1 on failure. + */ + int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t shard_seq_no, + const p2p::proposal &proposal, p2p::sequence_hash &new_lcl_id) { // Combined binary hash of consensus user binary pub keys. const std::string user_hash = crypto::get_list_hash(proposal.users); @@ -193,13 +241,13 @@ namespace ledger const std::string prev_ledger_hash(current_lcl_id.hash.to_string_view()); // Ledger hash is the combined hash of previous ledger hash and the new data hash. - new_ledger_hash = crypto::get_hash(prev_ledger_hash, data_hash); + new_lcl_id.hash = crypto::get_hash(prev_ledger_hash, data_hash); // Construct ledger struct with binary hashes. const ledger_record ledger{ current_lcl_id.seq_no + 1, proposal.time, - new_ledger_hash, + std::string(new_lcl_id.hash.to_string_view()), prev_ledger_hash, data_hash, std::string(proposal.state_hash.to_string_view()), @@ -210,7 +258,7 @@ namespace ledger if (sqlite::insert_ledger_row(db, ledger) == -1) { - LOG_ERROR << errno << ": Error creating the ledger, shard: " << primary_shard_seq_no; + LOG_ERROR << errno << ": Error creating the ledger, shard: " << shard_seq_no; return -1; } @@ -218,53 +266,203 @@ namespace ledger } /** - * Opens a db connection to a shard and populates the shard_seq_no. - * @param db Database connection to be openned. - * @param ledger_seq_no Ledger sequence number. + * Populates the raw data db and blob files with consensed users, inputs and outputs records. + * @param db The sqlite db connection for raw data db. + * @param shard_seq_no Raw shard seq no. + * @param proposal The consensus proposal. + * @param consensed_users Consensed users and their inputs and outputs. + * @param lcl_id Current ledger id. * @return 0 on success. -1 on failure. */ - int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no) + int insert_raw_data_records(sqlite3 *db, const uint64_t shard_seq_no, const p2p::proposal &proposal, + const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + { + // We keep sqlite records about users, inputs and outputs. To store raw input and output content, we use the corresponding blob file + // within the shard. Each shard has a sqlite db, raw inputs blob file and raw outputs blob file. + + if (consensed_users.empty()) + return 0; + + const std::string shard_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, std::string(RAW_DIR).append("/").append(std::to_string(shard_seq_no)).append("/")); + + // We reuse sqlite prepared statements to improve looping performance. + + sqlite3_stmt *users_stmt = sqlite::prepare_user_insert(db); + sqlite3_stmt *outputs_stmt = NULL; + sqlite3_stmt *inputs_stmt = NULL; + + int in_fd = -1; // Raw inputs storage file for the shard. Only created and opened if there are any inputs. + int out_fd = -1; // Raw outputs storage file for the shard. Only created and opened if there are any outputs. + size_t in_pos = 0; // Current writing position offset of the inputs file. + size_t out_pos = 0; // Current writing position offset of the outputs file. + + for (const auto &[pubkey, cu] : consensed_users) + { + if (sqlite::insert_user_record(users_stmt, lcl_id.seq_no, pubkey) == -1) + RAW_DATA_RETURN(-1); + + if (!cu.consensed_inputs.empty()) + { + if (inputs_stmt == NULL) + inputs_stmt = sqlite::prepare_user_input_insert(db); + + for (const consensus::consensed_user_input &cui : cu.consensed_inputs) + { + // Create and open the raw inputs file for the shard if needed. + if (in_fd == -1 && (in_fd = create_raw_data_blob_file(shard_path, RAW_INPUTS_FILE, in_pos)) == -1) + RAW_DATA_RETURN(-1); + + // Write the input to the blob file. Then we save the written offset and blob size in sqlite record. + std::string buf; + usr::input_store.read_buf(cui.input, buf); + if (write(in_fd, buf.data(), buf.size()) == -1) + { + LOG_ERROR << errno << ": Error when writing input blob."; + RAW_DATA_RETURN(-1); + } + + // Insert sqlite record. + std::string_view hash = util::get_string_suffix(cui.ordered_hash, BLAKE3_OUT_LEN); + std::string_view nonce = cui.ordered_hash.substr(0, cui.ordered_hash.size() - hash.size()); + + if (sqlite::insert_user_input_record(inputs_stmt, lcl_id.seq_no, pubkey, hash, nonce, in_pos, buf.size()) == -1) + RAW_DATA_RETURN(-1); + + in_pos += buf.size(); // Increament the blob file write offset so next write will happen correctly. + } + } + + if (!cu.consensed_outputs.outputs.empty()) + { + // Create and open the raw outputs file for the shard if needed. + if (out_fd == -1 && (out_fd = create_raw_data_blob_file(shard_path, RAW_OUTPUTS_FILE, out_pos)) == -1) + RAW_DATA_RETURN(-1); + + // Write all the outputs of this user to the blob file. Then we save the written offset and output count in sqlite record. + // First we write the list of offsets and sizes of each output. Then the outputs themselves. + // [offset1][size1][offset2][size2]....[output1][output2]... + + // Prepare write header. + const uint64_t output_count = cu.consensed_outputs.outputs.size(); + std::vector header(output_count * (sizeof(off_t) + sizeof(size_t))); // Header containing list of [offset+size]. + off_t out_buf_offset = out_pos + header.size(); // Output buffers will be written after the header. + for (size_t i = 0; i < output_count; i++) + { + const size_t output_size = cu.consensed_outputs.outputs[i].size(); + uint8_t *header_pos = header.data() + (i * (sizeof(off_t) + sizeof(size_t))); + // Write the pair of offset+size of the individual output into the header. + util::uint64_to_bytes(header_pos, out_buf_offset); + util::uint64_to_bytes(header_pos + sizeof(size_t), output_size); + out_buf_offset += output_size; + } + + // Write the header and output buffers. + iovec memsegs[1 + output_count]; + memsegs[0] = iovec{header.data(), header.size()}; + uint64_t total_write_size = header.size(); + for (size_t i = 0; i < output_count; i++) + { + const std::string &output = cu.consensed_outputs.outputs[i]; + memsegs[i + 1] = iovec{(void *)output.data(), output.size()}; + total_write_size += output.size(); + } + if (writev(out_fd, memsegs, 1 + output_count) == -1) + { + LOG_ERROR << errno << ": Error when writing outputs blobs."; + RAW_DATA_RETURN(-1); + } + + // Insert sqlite record. + // Prepare the output insertion stamement only once. + if (outputs_stmt == NULL) + outputs_stmt = sqlite::prepare_user_output_insert(db); + + if (sqlite::insert_user_output_record(outputs_stmt, lcl_id.seq_no, pubkey, cu.consensed_outputs.hash, out_pos, output_count) == -1) + RAW_DATA_RETURN(-1); + + out_pos += total_write_size; // Increament the blob file write offset so next write will happen correctly. + } + } + + RAW_DATA_RETURN(0); + } + + /** + * Open or create the specified file name for appending raw blob data. + * @param shard_path Parent shard directory. + * @param file_name Name of the blob file. + * @param file_size Current file size. + * @return 0 on success. -1 on failure. + */ + int create_raw_data_blob_file(const std::string &shard_path, const char *file_name, size_t &file_size) + { + const std::string file_path = shard_path + file_name; + int fd = open(file_path.data(), O_WRONLY | O_APPEND | O_CREAT, FILE_PERMS); + if (fd == -1) + LOG_ERROR << errno << ": Error when creating file " << file_path; + + struct stat st; + if (fstat(fd, &st) == -1) + LOG_ERROR << errno << ": Error when stat of file " << file_path; + + file_size = st.st_size; + return fd; + } + + /** + * Creates or open a db connection to the shard based on the params. This is used to create primary and raw shards. + * @param db Database connection to be opened. + * @param ledger_seq_no Ledger sequence number. + * @param open_db Whether a connection to the sql db must be opened or not. + * @return 0 if shard already exists. 1 if new shard got created. -1 on failure. + */ + int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no, const uint64_t shard_size, + const char *shard_dir, const char *db_name, const bool open_db) { // Construct shard path. - shard_seq_no = (ledger_seq_no - 1) / PRIMARY_SHARD_SIZE; - const std::string shard_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, std::string(ledger::PRIMARY_DIR).append("/").append(std::to_string(shard_seq_no))); + shard_seq_no = (ledger_seq_no - 1) / shard_size; + const std::string shard_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, std::string(shard_dir).append("/").append(std::to_string(shard_seq_no))); - // If (seq_no - 1) % PRIMARY_SHARD_SIZE == 0 means this is the first ledger of the shard. - // So create the shard folder and ledger table. - if ((ledger_seq_no - 1) % PRIMARY_SHARD_SIZE == 0) + // This means this is the first ledger of the shard. + // So create the shard folder and other required files. + if ((ledger_seq_no - 1) % shard_size == 0) { // Creating the directory. if (util::create_dir_tree_recursive(shard_path) == -1) { - LOG_ERROR << errno << ": Error creating the shard, shard: " << std::to_string(shard_seq_no); + LOG_ERROR << errno << ": Error creating the shard " << shard_path; return -1; } // Creating ledger database and open a database connection. - if (sqlite::open_db(shard_path + "/" + DATABASE, db) == -1) + if (sqlite::open_db(shard_path + "/" + db_name, db) == -1) { - LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(shard_seq_no); + LOG_ERROR << errno << ": Error creating the database " << db_name << " in " << shard_path; return -1; } - // Create and update the hp_version table with current hp version. - if (sqlite::create_hp_version_table_and_update(*db, version::LEDGER_VERSION) == -1) + if ((shard_dir == PRIMARY_DIR && sqlite::initialize_ledger_db(*db) == -1) || + (shard_dir == RAW_DIR && sqlite::initialize_ledger_raw_db(*db) == -1)) { - LOG_ERROR << errno << ": Error creating and updating hp version table, shard: " << std::to_string(shard_seq_no); + LOG_ERROR << errno << ": Error initilizing the database " << db_name << " in " << shard_path; return -1; } - // Creating the ledger table. - if (sqlite::create_ledger_table(*db) == -1) + // Create and update the hp table with current ledger version. + if (sqlite::create_hp_table(*db, version::LEDGER_VERSION) == -1) { - LOG_ERROR << errno << ": Error creating the shard table, shard: " << std::to_string(shard_seq_no); + LOG_ERROR << errno << ": Error creating hp table in " << db_name << " in " << shard_path; return -1; } + // Close the connection if it doesn't need to be retained. + if (!open_db) + sqlite::close_db(db); + util::h32 prev_shard_hash; if (shard_seq_no > 0) { - const std::string prev_shard_vpath = std::string(ledger::PRIMARY_DIR).append("/").append(std::to_string(shard_seq_no - 1)); + const std::string prev_shard_vpath = std::string(shard_dir) + "/" + std::to_string(shard_seq_no - 1); if (ledger_fs.get_hash(prev_shard_hash, hpfs::RW_SESSION_NAME, prev_shard_vpath) < 1) { LOG_ERROR << errno << ": Error getting shard hash in vpath: " << prev_shard_vpath << " for previous shard hash."; @@ -273,66 +471,79 @@ namespace ledger } // Write the prev_shard.hash to the new folder. - const std::string shard_hash_file_path = shard_path + PREV_SHARD_HASH_FILENAME; - const int fd = open(shard_hash_file_path.data(), O_CREAT | O_RDWR, FILE_PERMS); - if (fd == -1) { - LOG_ERROR << errno << ": Error creating prev_shard.hash file in shard " << std::to_string(shard_seq_no); - return -1; - } + const std::string shard_hash_file_path = shard_path + PREV_SHARD_HASH_FILENAME; + const int fd = open(shard_hash_file_path.data(), O_CREAT | O_RDWR, FILE_PERMS); + if (fd == -1) + { + LOG_ERROR << errno << ": Error creating prev_shard.hash file in " << shard_path; + return -1; + } - struct iovec iov_vec[2]; - iov_vec[0].iov_base = version::LEDGER_VERSION_BYTES; - iov_vec[0].iov_len = version::VERSION_BYTES_LEN; + struct iovec iov_vec[2]; + iov_vec[0].iov_base = version::LEDGER_VERSION_BYTES; + iov_vec[0].iov_len = version::VERSION_BYTES_LEN; - iov_vec[1].iov_base = &prev_shard_hash; - iov_vec[1].iov_len = sizeof(util::h32); + iov_vec[1].iov_base = &prev_shard_hash; + iov_vec[1].iov_len = sizeof(util::h32); - if (writev(fd, iov_vec, 2) == -1) - { - LOG_ERROR << errno << ": Error writing to " << shard_hash_file_path << "."; + if (writev(fd, iov_vec, 2) == -1) + { + LOG_ERROR << errno << ": Error writing to " << shard_hash_file_path << "."; + close(fd); + return -1; + } close(fd); - return -1; } - close(fd); - // Persist newly created shard seq number as the max primary shard seq number. - if (persist_max_shard_seq_no(PRIMARY_DIR, shard_seq_no) == -1) + // Persist newly created shard seq number as the max shard seq number. + if (persist_max_shard_seq_no(shard_dir, shard_seq_no) == -1) { - LOG_ERROR << "Error persisting maximum primary shard sequnce number."; + LOG_ERROR << "Error persisting maximum raw shard sequnce number."; return -1; } - } - else if (sqlite::open_db(shard_path + "/" + DATABASE, db) == -1) - { - LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(shard_seq_no); - return -1; - } - return 0; + return 1; + } + else + { + if (open_db && sqlite::open_db(shard_path + "/" + db_name, db) == -1) + { + LOG_ERROR << errno << ": Error openning the shard database in " << shard_path; + return -1; + } + return 0; + } } /** * Remove old shards that exceeds max shard range from file system. - * @param led_shard_no Minimum shard number to be in history. + * @param lcl_seq_no Current ledger seq no. + * @param shard_size Shard size to use. + * @param max_shards Maximum shards to keep. + * @param shard_parent_dir Shard parent directory. */ - void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir) + void remove_old_shards(const uint64_t lcl_seq_no, const uint64_t shard_size, const uint64_t max_shards, std::string_view shard_parent_dir) { - // Remove old shards if this is not a full history node. - if (conf::cfg.node.history == conf::HISTORY::CUSTOM) + const uint64_t shard_seq_no = (lcl_seq_no - 1) / shard_size; + + // No removals if this is a full history node or we haven't yet reached the shard limit. + if (conf::cfg.node.history == conf::HISTORY::FULL || max_shards > shard_seq_no) + return; + + const uint64_t delete_from = shard_seq_no - max_shards; + + for (int i = delete_from; i >= 0; i--) { - for (int i = led_shard_no - 1; i >= 0; i--) + const std::string shard_path = std::string(ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_parent_dir)).append("/").append(std::to_string(i)); + // Break the loop if there's no corresponding shard. + // There cannot be shards which is less than this shard no. since shards are continous. + if (!util::is_dir_exists(shard_path)) + break; + else if (util::remove_directory_recursively(shard_path) == -1) { - const std::string shard_path = std::string(ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_parent_dir)).append("/").append(std::to_string(i)); - // Break the loop if there's no corresponding shard. - // There cannot be shards which is less than this shard no since shards are continous. - if (!util::is_dir_exists(shard_path)) - break; - else if (util::remove_directory_recursively(shard_path) == -1) - { - LOG_ERROR << errno << ": Error deleting shard: " << i; - break; - } + LOG_ERROR << errno << ": Error deleting shard: " << shard_path; + break; } } } @@ -345,14 +556,14 @@ namespace ledger void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir) { // Skip if shard cleanup and requesting has been already done. - if ((shard_parent_dir == PRIMARY_DIR && ctx.primary_shards_persisted) || (shard_parent_dir == BLOB_DIR && ctx.blob_shards_persisted)) + if ((shard_parent_dir == PRIMARY_DIR && ctx.primary_shards_persisted) || (shard_parent_dir == RAW_DIR && ctx.raw_shards_persisted)) return; // Set persisted flag to true. So this cleanup won't get executed again. - shard_parent_dir == PRIMARY_DIR ? ctx.primary_shards_persisted = true : ctx.blob_shards_persisted = true; + shard_parent_dir == PRIMARY_DIR ? ctx.primary_shards_persisted = true : ctx.raw_shards_persisted = true; const std::string shard_dir_path = std::string(ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_parent_dir)); - const uint64_t max_shard_count = shard_dir_path == PRIMARY_DIR ? conf::cfg.node.history_config.max_primary_shards : conf::cfg.node.history_config.max_blob_shards; + const uint64_t max_shard_count = shard_dir_path == PRIMARY_DIR ? conf::cfg.node.history_config.max_primary_shards : conf::cfg.node.history_config.max_raw_shards; const std::list shard_list = util::fetch_dir_entries(shard_dir_path); // Skip the sequence no file from the count. uint64_t shard_count = shard_list.size() - 1; @@ -401,171 +612,11 @@ namespace ledger return; } - const std::string sync_name = (shard_parent_dir == PRIMARY_DIR ? "primary" : "blob") + std::string(" shard ") + std::to_string(seq_no); const std::string shard_path = std::string(shard_parent_dir).append("/").append(std::to_string(seq_no)); - ledger_sync_worker.set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); + ledger_sync_worker.set_target_push_back(hpfs::sync_target{shard_path, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } } - /** - * Save raw data from the consensed proposal. A blob file is only created if there is any user inputs or outputs - * to save disk space. - * @param ledger_hash Hash of this ledger we are saving. - * @param consensed_users Users and their raw inputs/outputs consensed in this consensus round. - * @return Returns 0 on success -1 on error. - */ - int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users) - { - // Construct shard path. - uint64_t last_blob_shard_seq_no = ctx.get_last_blob_shard_id().seq_no; - std::string shard_vpath = std::string(ledger::BLOB_DIR).append("/").append(std::to_string(last_blob_shard_seq_no)); - std::string shard_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_vpath); - - bool should_create_folder = false; - if (util::is_dir_exists(shard_path)) - { - if ((util::fetch_dir_entries(shard_path).size() - 1) >= BLOB_SHARD_SIZE) - { - should_create_folder = true; - last_blob_shard_seq_no++; - shard_vpath = std::string(ledger::BLOB_DIR).append("/").append(std::to_string(last_blob_shard_seq_no)); - shard_path = ledger_fs.physical_path(hpfs::RW_SESSION_NAME, shard_vpath); - } - } - else - { - should_create_folder = true; - } - - // Create the required shard folder if not already existing. - if (should_create_folder) - { - // Creating the directory. - if (util::create_dir_tree_recursive(shard_path) == -1) - { - LOG_ERROR << errno << ": Error creating the blob shard, shard: " << std::to_string(last_blob_shard_seq_no); - ledger_fs.release_rw_session(); - return -1; - } - - util::h32 prev_shard_hash; - if (last_blob_shard_seq_no > 0) - { - const std::string prev_shard_vpath = std::string(ledger::BLOB_DIR).append("/").append(std::to_string(last_blob_shard_seq_no - 1)); - if (ledger_fs.get_hash(prev_shard_hash, hpfs::RW_SESSION_NAME, prev_shard_vpath) < 1) - { - LOG_ERROR << errno << ": Error getting blob shard hash in vpath: " << prev_shard_vpath << " for previous shard hash."; - ledger_fs.release_rw_session(); - return -1; - } - } - // Write the prev_shard.hash to the new folder. - const std::string shard_hash_file_path = shard_path + PREV_SHARD_HASH_FILENAME; - const int fd = open(shard_hash_file_path.data(), O_CREAT | O_RDWR, FILE_PERMS); - if (fd == -1) - { - LOG_ERROR << errno << ": Error creating prev_shard.hash file in blob shard " << std::to_string(last_blob_shard_seq_no); - return -1; - } - - struct iovec iov_vec[2]; - iov_vec[0].iov_base = version::LEDGER_VERSION_BYTES; - iov_vec[0].iov_len = version::VERSION_BYTES_LEN; - - iov_vec[1].iov_base = &prev_shard_hash; - iov_vec[1].iov_len = sizeof(util::h32); - - if (writev(fd, iov_vec, 2) == -1) - { - LOG_ERROR << errno << ": Error writing to " << shard_hash_file_path << "."; - close(fd); - return -1; - } - close(fd); - - // Persist newly created shard seq number as the max blob shard seq number. - if (persist_max_shard_seq_no(BLOB_DIR, last_blob_shard_seq_no) == -1) - { - LOG_ERROR << "Error persisting maximum blob shard sequnce number."; - return -1; - } - } - - ledger_blob blob; - - blob.ledger_hash = ledger_hash; - - // Include consensed user inputs. - for (const auto &[pubkey, user] : consensed_users) - { - const auto [itr, success] = blob.inputs.try_emplace(pubkey, std::vector()); - - for (const consensus::consensed_user_input &ci : user.consensed_inputs) - { - std::string input; - if (usr::input_store.read_buf(ci.input, input) != -1) - itr->second.push_back(input); - } - } - - // Include consensed user outputs. - for (const auto &[pubkey, user] : consensed_users) - { - std::vector outputs; - for (const std::string &output : user.consensed_outputs.outputs) - outputs.push_back(std::move(output)); - - blob.outputs.emplace(pubkey, std::move(outputs)); - } - - flatbuffers::FlatBufferBuilder builder(1024); - msg::fbuf::ledgermsg::create_ledger_blob_msg_from_ledger_blob(builder, blob); - - const std::string file_path = shard_path + "/" + util::to_hex(ledger_hash) + ".blob"; - - const int fd = open(file_path.data(), O_CREAT | O_RDWR, FILE_PERMS); - if (fd == -1) - { - LOG_ERROR << errno << ": Error creating ledger blob file. " << file_path; - return -1; - } - - struct iovec iov_vec[2]; - iov_vec[0].iov_base = version::LEDGER_VERSION_BYTES; - iov_vec[0].iov_len = version::VERSION_BYTES_LEN; - - iov_vec[1].iov_base = builder.GetBufferPointer(); - iov_vec[1].iov_len = builder.GetSize(); - - if (writev(fd, iov_vec, 2) == -1) - { - LOG_ERROR << errno << ": Error writing to ledger blob file. " << file_path; - close(fd); - return -1; - } - - close(fd); - - util::h32 last_shard_hash; - if (ledger_fs.get_hash(last_shard_hash, hpfs::RW_SESSION_NAME, shard_vpath) == -1) - { - LOG_ERROR << errno << ": Error reading blob shard hash: " << std::to_string(last_blob_shard_seq_no); - ledger_fs.release_rw_session(); - return -1; - } - - // Update the last blob shard hash and blob shard seqence number tracker when a new ledger is created. - ctx.set_last_blob_shard_id(p2p::sequence_hash{last_blob_shard_seq_no, last_shard_hash}); - - //Remove old shards that exceeds max shard range. - if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards) - { - remove_old_shards(last_blob_shard_seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR); - } - - return 0; - } - /** * Get last ledger and update the context. * @param session_name Hpfs session name. @@ -584,7 +635,7 @@ namespace ledger return 0; } - if (sqlite::open_db(shard_path + "/" + DATABASE, &db) == -1) + if (sqlite::open_db(shard_path + "/" + PRIMARY_DB, &db) == -1) { LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(last_primary_shard_id.seq_no); return -1; @@ -613,7 +664,7 @@ namespace ledger * @param session_name Hpfs session name. * @param last_shard_id Struct which holds last shard data. (sequence number and hash). * @param shard_parent_dir Parent director vpath of the shards. - * @return + * @return 0 on success. -1 on error. */ int get_last_shard_info(std::string_view session_name, p2p::sequence_hash &last_shard_id, const std::string &shard_parent_dir) { @@ -659,7 +710,7 @@ namespace ledger /** * Update max_shard.seq_no meta file with the given latest shard sequence number which can be used to identify last shard * sequence number in startup. - * @param shard_parent_dir Shard's parent directory. (primary or blob). + * @param shard_parent_dir Shard's parent directory. (primary or raw). * @param last_shard_seq_no Last shard sequence number of the given parent. * @return Return -1 on error and 0 on success. */ @@ -687,7 +738,7 @@ namespace ledger if (writev(fd, iov_vec, 2) == -1) { - LOG_ERROR << errno << ": Error updating the max_shard.seq_no file for shard " << std::to_string(last_shard_seq_no); + LOG_ERROR << errno << ": Error updating the max_shard.seq_no file for shard " << last_shard_seq_no; close(fd); return -1; } @@ -712,9 +763,9 @@ namespace ledger const std::string shard_path = ledger_fs.physical_path(session_name, ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no); - if (sqlite::open_db(shard_path + "/" + DATABASE, &db) == -1) + if (sqlite::open_db(shard_path + "/" + PRIMARY_DB, &db) == -1) { - LOG_ERROR << errno << ": Error openning the shard database, shard: " << std::to_string(shard_seq_no); + LOG_ERROR << errno << ": Error openning the shard database, shard: " << shard_seq_no; ledger_fs.stop_ro_session(session_name); return -1; } diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index 8d8ab26a..20567ac8 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -16,14 +16,14 @@ namespace ledger p2p::sequence_hash lcl_id; std::shared_mutex last_primary_shard_mutex; p2p::sequence_hash last_primary_shard_id; - std::shared_mutex last_blob_shard_mutex; - p2p::sequence_hash last_blob_shard_id; + std::shared_mutex last_raw_shard_mutex; + p2p::sequence_hash last_raw_shard_id; public: // These flags will be marked as true after doing the shards cleanup and requesting // at the first consensus round to align with the max shard counts. std::atomic primary_shards_persisted = false; - std::atomic blob_shards_persisted = false; + std::atomic raw_shards_persisted = false; const p2p::sequence_hash get_lcl_id() { @@ -49,16 +49,16 @@ namespace ledger last_primary_shard_id = sequence_hash_id; } - const p2p::sequence_hash get_last_blob_shard_id() + const p2p::sequence_hash get_last_raw_shard_id() { - std::shared_lock lock(last_blob_shard_mutex); - return last_blob_shard_id; + std::shared_lock lock(last_raw_shard_mutex); + return last_raw_shard_id; } - void set_last_blob_shard_id(const p2p::sequence_hash &sequence_hash_id) + void set_last_raw_shard_id(const p2p::sequence_hash &sequence_hash_id) { - std::unique_lock lock(last_blob_shard_mutex); - last_blob_shard_id = sequence_hash_id; + std::unique_lock lock(last_raw_shard_mutex); + last_raw_shard_id = sequence_hash_id; } }; @@ -70,16 +70,24 @@ namespace ledger void deinit(); - int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users); + int update_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users); - int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t primary_shard_seq_no, - const p2p::proposal &proposal, std::string &new_ledger_hash); + int update_primary_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, p2p::sequence_hash &new_lcl_id); - int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no); + int update_ledger_raw_data(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); - int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users); + int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t shard_seq_no, + const p2p::proposal &proposal, p2p::sequence_hash &new_lcl_id); - void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir); + int insert_raw_data_records(sqlite3 *db, const uint64_t shard_seq_no, const p2p::proposal &proposal, + const consensus::consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + + int create_raw_data_blob_file(const std::string &shard_path, const char *file_name, size_t &file_size); + + int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no, const uint64_t shard_size, + const char *shard_dir, const char *db_name, const bool open_db); + + void remove_old_shards(const uint64_t lcl_seq_no, const uint64_t shard_size, const uint64_t max_shards, std::string_view shard_parent_dir); void persist_shard_history(const uint64_t shard_seq_no, std::string_view shard_parent_dir); diff --git a/src/ledger/ledger_common.hpp b/src/ledger/ledger_common.hpp index 03d6f890..cffc1d3d 100644 --- a/src/ledger/ledger_common.hpp +++ b/src/ledger/ledger_common.hpp @@ -6,12 +6,42 @@ namespace ledger { - constexpr const char *DATABASE = "ledger.sqlite"; + constexpr const char *PRIMARY_DB = "ledger.sqlite"; + constexpr const char *RAW_DB = "raw.sqlite"; + constexpr const char *RAW_INPUTS_FILE = "raw_inputs.blob"; + constexpr const char *RAW_OUTPUTS_FILE = "raw_outputs.blob"; constexpr uint64_t PRIMARY_SHARD_SIZE = 262144; // 2^18 ledgers per shard. - constexpr uint64_t BLOB_SHARD_SIZE = 4096; + constexpr uint64_t RAW_SHARD_SIZE = 4096; /** - * Struct to hold ledger fields read. + * Holds an individual input for a user within a ledger. + */ + struct ledger_user_input + { + uint64_t ledger_seq_no; // Ledger seq no. + std::string pubkey; // The user pubkey. + std::string hash; // The hash of this input. + std::string nonce; // Nonce the user had submitted for this input. + off_t blob_offset; // Blob file offset of this input blob. + size_t blob_size; // Length of the input. + std::string blob; // THe actual input blob. + }; + + /** + * Holds all the outputs for an indivudual user within a ledger. + */ + struct ledger_user_output + { + uint64_t ledger_seq_no; // Ledger seq no. + std::string pubkey; // The user pubkey. + std::string hash; // Combined output hash for this user's outputs. + off_t blob_offset; // Blob file offset of the output group header. + size_t blob_count; // How many outputs the user has within this ledger. + std::vector outputs; // The actual output blobs for this user within the ledger. + }; + + /** + * Struct to hold ledger fields corresponding to sqlite table. * All the hashes are stored as 32 byte binary data. */ struct ledger_record @@ -26,17 +56,8 @@ namespace ledger std::string user_hash; std::string input_hash; std::string output_hash; - }; - - /** - * Struct to hold ledger raw inputs and outputs. - * This is used with flatbuffers to persist to disk. - */ - struct ledger_blob - { - util::h32 ledger_hash; - std::map> inputs; - std::map> outputs; + std::optional> inputs; + std::optional> outputs; }; // Holds the global genesis ledger. diff --git a/src/ledger/ledger_mount.cpp b/src/ledger/ledger_mount.cpp index 206c6cc4..31edacef 100644 --- a/src/ledger/ledger_mount.cpp +++ b/src/ledger/ledger_mount.cpp @@ -11,7 +11,7 @@ namespace ledger { // Add ledger fs preparation logic here. p2p::sequence_hash last_primary_shard_id; - p2p::sequence_hash last_blob_shard_id; + p2p::sequence_hash last_raw_shard_id; if (acquire_rw_session() == -1) { @@ -21,7 +21,7 @@ namespace ledger if (get_last_shard_info(hpfs::RW_SESSION_NAME, last_primary_shard_id, PRIMARY_DIR) == -1 || get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, last_primary_shard_id) == -1 || - get_last_shard_info(hpfs::RW_SESSION_NAME, last_blob_shard_id, BLOB_DIR) == -1) + get_last_shard_info(hpfs::RW_SESSION_NAME, last_raw_shard_id, RAW_DIR) == -1) { LOG_ERROR << "Failed to prepare initial fs at mount " << mount_dir << "."; return -1; @@ -33,12 +33,12 @@ namespace ledger return -1; } - LOG_INFO << "Initial primary: " << last_primary_shard_id << " | blob: " << last_blob_shard_id; + LOG_INFO << "Ledger primary:" << last_primary_shard_id << " | raw:" << last_raw_shard_id; // Update last shard hash and shard number tracker. ctx.set_last_primary_shard_id(last_primary_shard_id); - // Update last blob shard hash and blob shard number tracker. - ctx.set_last_blob_shard_id(last_blob_shard_id); + // Update last raw shard hash and raw shard number tracker. + ctx.set_last_raw_shard_id(last_raw_shard_id); return 0; } diff --git a/src/ledger/ledger_mount.hpp b/src/ledger/ledger_mount.hpp index 996eee6a..555bfa31 100644 --- a/src/ledger/ledger_mount.hpp +++ b/src/ledger/ledger_mount.hpp @@ -9,7 +9,7 @@ namespace ledger { constexpr const char *PRIMARY_DIR = "/primary"; // Ledger primary directory name. - constexpr const char *BLOB_DIR = "/blob"; // Ledger blob directory name. + constexpr const char *RAW_DIR = "/raw"; // Ledger raw data directory name. constexpr const char *PREV_SHARD_HASH_FILENAME = "/prev_shard.hash"; // Previous shard hash file name. constexpr const char *SHARD_SEQ_NO_FILENAME = "/max_shard.seq_no"; // Meta file containing the maximum shard seq number information. /** diff --git a/src/ledger/ledger_query.cpp b/src/ledger/ledger_query.cpp index a8e881d6..022f0635 100644 --- a/src/ledger/ledger_query.cpp +++ b/src/ledger/ledger_query.cpp @@ -2,7 +2,7 @@ #include "ledger_common.hpp" #include "ledger.hpp" #include "sqlite.hpp" -#include "../msg/fbuf/ledger_helpers.hpp" +#include "../conf.hpp" #include "../util/version.hpp" namespace ledger::query @@ -29,7 +29,7 @@ namespace ledger::query if (ledger::ledger_fs.start_ro_session(fs_sess_name, false) == -1) return res; - std::vector records; + std::vector ledgers; if (q.index() == 0) // Filter by seq no. { @@ -39,11 +39,26 @@ namespace ledger::query if (seq_no_res != -1) { if (seq_no_res == 1) // Ledger found. - records.push_back(query_result_record{std::move(ledger)}); + ledgers.push_back(std::move(ledger)); - // Fill raw input/output data into results. - if (fill_blob_data(records, seq_q.raw_inputs, seq_q.raw_outputs, fs_sess_name) != -1) - res = std::move(records); + // Fill raw data if required. + if (seq_q.inputs || seq_q.outputs) + { + for (ledger_record &ledger : ledgers) + { + if (seq_q.inputs) + ledger.inputs = std::vector(); + if (seq_q.outputs) + ledger.outputs = std::vector(); + + if (get_ledger_raw_data(ledger, user_pubkey, fs_sess_name) != -1) + res = ledgers; + } + } + else + { + res = ledgers; + } } } @@ -51,68 +66,6 @@ namespace ledger::query return res; } - /** - * Fills in the raw input/output blob data to the specified ledger query result records. - * @param records List of query results to fill in. - * @param raw_inputs Whether raw inputs must be filled. - * @param raw_outputs Whether raw outputs must be filled. - * @param fs_sess_name The ledger hosting fs session name. - */ - int fill_blob_data(std::vector &records, const bool raw_inputs, const bool raw_outputs, const std::string &fs_sess_name) - { - // If blob data is not requested to be filled, the relevant field (inputs or outputs) in each result will contain NULL. - // If blob data is requested to be filled, then the relevant field will contain the map of blobs or empty map. - - if (!raw_inputs && !raw_outputs) - return 0; // Nothing to fill. - - for (query_result_record &r : records) - { - // Populate with empty map if inputs/outputs requested. - if (raw_inputs) - r.raw_inputs = blob_map(); - if (raw_outputs) - r.raw_outputs = blob_map(); - - if (r.ledger.seq_no == 0) - return 0; // Nothing to fill for GENESIS ledger. - - const uint64_t shard_seq_no = (r.ledger.seq_no - 1) / ledger::BLOB_SHARD_SIZE; - const std::string file_vpath = std::string(ledger::BLOB_DIR) + "/" + std::to_string(shard_seq_no) + "/" + util::to_hex(r.ledger.ledger_hash) + ".blob"; - const std::string file_path = ledger::ledger_fs.physical_path(fs_sess_name, file_vpath); - std::string blob_msg; - const int fd = open(file_path.data(), O_RDONLY); - - // If file does not exist, skip this leadger. (it means there are no input/output data for this leadger) - if (fd == -1 && errno == ENOENT) - continue; - - if (fd != -1 && util::read_from_fd(fd, blob_msg, version::VERSION_BYTES_LEN) > 0) - { - ledger_blob raw_data; - if (msg::fbuf::ledgermsg::create_ledger_blob_from_msg(raw_data, blob_msg, raw_inputs, raw_outputs) != -1) - { - if (raw_inputs) - raw_data.inputs.swap(*r.raw_inputs); - - if (raw_outputs) - raw_data.outputs.swap(*r.raw_outputs); - - close(fd); - continue; - } - } - - if (fd != -1) - close(fd); - - // Reaching this point means loop has encountered an error. - return -1; - } - - return 0; - } - /** * Get the ledger record by seq no. * @param ledger Ledger structure to populate (if match found)). @@ -131,20 +84,165 @@ namespace ledger::query // Construct shard path based on provided ledger seq no. const uint64_t shard_seq_no = (q.seq_no - 1) / ledger::PRIMARY_SHARD_SIZE; - const std::string db_vpath = std::string(ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no) + "/" + ledger::DATABASE; - const std::string dbpath = ledger::ledger_fs.physical_path(fs_sess_name, db_vpath); + const std::string db_vpath = std::string(ledger::PRIMARY_DIR) + "/" + std::to_string(shard_seq_no) + "/" + ledger::PRIMARY_DB; + const std::string db_path = ledger::ledger_fs.physical_path(fs_sess_name, db_vpath); - if (!util::is_file_exists(dbpath)) + if (!util::is_file_exists(db_path)) return 0; // Not found. - query_result_record result; - sqlite3 *db = NULL; - if (sqlite::open_db(dbpath, &db) == -1) + if (sqlite::open_db(db_path, &db, true) == -1) return -1; const int sql_res = sqlite::get_ledger_by_seq_no(db, q.seq_no, ledger); sqlite::close_db(&db); return sql_res; } + + /** + * Retrieve user inputs and outputs by ledger seq no. If consensus is private, this only fills blobs of the requesting user. + * @param ledger Ledger record to populate with inputs and outputs. + * @param user_pubkey Binary pubkey of the user executing the query. + * @param fs_sess_name The ledger hosting fs session name. + * @returns 0 on success. -1 on failure. + */ + int get_ledger_raw_data(ledger_record &ledger, std::string_view user_pubkey, const std::string &fs_sess_name) + { + // If both inputs and outputs collections are null, don't proceed. + if (!ledger.inputs && !ledger.outputs) + return 0; + + const uint64_t shard_seq_no = (ledger.seq_no - 1) / ledger::RAW_SHARD_SIZE; + const std::string shard_path = ledger::ledger_fs.physical_path(fs_sess_name, std::string(ledger::RAW_DIR) + "/" + std::to_string(shard_seq_no) + "/"); + const std::string db_path = shard_path + RAW_DB; + + if (!util::is_file_exists(db_path)) + return 0; // Not found. + + sqlite3 *db = NULL; + if (sqlite::open_db(db_path, &db, true) == -1) + return -1; + + if ((ledger.inputs && get_ledger_inputs(db, *ledger.inputs, ledger.seq_no, shard_path, user_pubkey, fs_sess_name) == -1) || + (ledger.outputs && get_ledger_outputs(db, *ledger.outputs, ledger.seq_no, shard_path, user_pubkey, fs_sess_name) == -1)) + { + sqlite::close_db(&db); + return -1; + } + + sqlite::close_db(&db); + return 0; + } + + /** + * Retrieve user inputs by ledger seq no. If consensus is private, this only fills blobs of the requesting user. + * @param db Sqlite db connection for raw data db. + * @param inputs User input collection to populate. + * @param seq_no Ledger seq no. to query. + * @param shard_path The shard physical path. + * @param user_pubkey Binary pubkey of the user executing the query. + * @param fs_sess_name The ledger hosting fs session name. + * @returns 0 on success. -1 on failure. + */ + int get_ledger_inputs(sqlite3 *db, std::vector &inputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name) + { + if (sqlite::get_user_inputs_by_seq_no(db, seq_no, inputs) == -1) + return -1; + + if (inputs.empty()) + return 0; + + const std::string blob_file = shard_path + RAW_INPUTS_FILE; + const int fd = open(blob_file.data(), O_RDONLY); + if (fd == -1) + { + LOG_ERROR << errno << ": Error in query when opening " << blob_file; + return -1; + } + + for (ledger_user_input &inp : inputs) + { + // Do not return other users' blobs if consensus is private. + if (!conf::cfg.contract.is_consensus_public && inp.pubkey != user_pubkey) + continue; + + inp.blob.resize(inp.blob_size); + if (util::read_from_fd(fd, inp.blob.data(), inp.blob_size, inp.blob_offset, blob_file) == -1) + { + close(fd); + return -1; + } + } + + close(fd); + return 0; + } + + /** + * Retrieve user outputs by ledger seq no. If consensus is private, this only fills blobs of the requesting user. + * @param db Sqlite db connection for raw data db. + * @param outputs User output collection to populate. + * @param seq_no Ledger seq no. to query. + * @param shard_path The shard physical path. + * @param user_pubkey Binary pubkey of the user executing the query. + * @param fs_sess_name The ledger hosting fs session name. + * @returns 0 on success. -1 on failure. + */ + int get_ledger_outputs(sqlite3 *db, std::vector &outputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name) + { + if (sqlite::get_user_outputs_by_seq_no(db, seq_no, outputs) == -1) + return -1; + + if (outputs.empty()) + return 0; + + const std::string blob_file = shard_path + RAW_OUTPUTS_FILE; + const int fd = open(blob_file.data(), O_RDONLY); + if (fd == -1) + { + LOG_ERROR << errno << ": Error in query when opening " << blob_file; + return -1; + } + + // Loop through each user's blob groups. + for (ledger_user_output &user : outputs) + { + // Do not return other users' blobs if consensus is private. + if (!conf::cfg.contract.is_consensus_public && user.pubkey != user_pubkey) + continue; + + // Output blobs for each user are grouped. Group header contains all individual blob offsets and sizes + // for that user, followed by actual blobs. + + // Read the entire header. + const off_t header_pos = user.blob_offset; + std::vector header(user.blob_count * (sizeof(off_t) + sizeof(size_t))); + if (util::read_from_fd(fd, header.data(), header.size(), header_pos, blob_file) == -1) + { + close(fd); + return -1; + } + + for (size_t i = 0; i < user.blob_count; i++) + { + // Position inside the header which contains the offset of the individual output blob. + const off_t header_read_pos = i * (sizeof(off_t) + sizeof(size_t)); + const uint64_t offset = util::uint64_from_bytes(header.data() + header_read_pos); + const size_t size = util::uint64_from_bytes(header.data() + header_read_pos + sizeof(size_t)); + + // Read the output blob content. + std::string output; + output.resize(size); + if (util::read_from_fd(fd, output.data(), output.size(), offset, blob_file) == -1) + { + close(fd); + return -1; + } + user.outputs.push_back(std::move(output)); + } + } + + close(fd); + return 0; + } } \ No newline at end of file diff --git a/src/ledger/ledger_query.hpp b/src/ledger/ledger_query.hpp index 0f0ab2e0..1ecd3e6c 100644 --- a/src/ledger/ledger_query.hpp +++ b/src/ledger/ledger_query.hpp @@ -12,17 +12,8 @@ namespace ledger::query struct seq_no_query { uint64_t seq_no = 0; - bool raw_inputs = false; - bool raw_outputs = false; - }; - - typedef std::map> blob_map; - - struct query_result_record - { - ledger::ledger_record ledger; - std::optional raw_inputs; - std::optional raw_outputs; + bool inputs = false; + bool outputs = false; }; struct user_buffer_collection @@ -32,11 +23,14 @@ namespace ledger::query }; typedef std::variant query_request; - typedef std::variant> query_result; + typedef std::variant> query_result; const query_result execute(std::string_view user_pubkey, const query_request &q); - int fill_blob_data(std::vector &records, const bool raw_inputs, const bool raw_outputs, const std::string &fs_sess_name); int get_ledger_by_seq_no(ledger_record &ledger, const seq_no_query &q, const std::string &fs_sess_name); + int get_ledger_raw_data(ledger_record &ledger, std::string_view user_pubkey, const std::string &fs_sess_name); + int get_ledger_inputs(sqlite3 *db, std::vector &inputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name); + int get_ledger_outputs(sqlite3 *db, std::vector &outputs, const uint64_t seq_no, const std::string &shard_path, std::string_view user_pubkey, const std::string &fs_sess_name); + } #endif \ No newline at end of file diff --git a/src/ledger/ledger_sync.cpp b/src/ledger/ledger_sync.cpp index 123b485a..a4d472d9 100644 --- a/src/ledger/ledger_sync.cpp +++ b/src/ledger/ledger_sync.cpp @@ -57,10 +57,6 @@ namespace ledger return; } - // If existing max shard is older than the max we can keep. Then delete all the existing shards. - if (conf::cfg.node.history == conf::HISTORY::CUSTOM && synced_shard_seq_no - last_primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards) - remove_old_shards(last_primary_shard_seq_no + 1, PRIMARY_DIR); - const p2p::sequence_hash updated_primary_shard_id{synced_shard_seq_no, synced_target.hash}; if (get_last_ledger_and_update_context(hpfs::RW_SESSION_NAME, updated_primary_shard_id) == -1) { @@ -70,6 +66,9 @@ namespace ledger ctx.set_last_primary_shard_id(updated_primary_shard_id); last_primary_shard_seq_no = synced_shard_seq_no; is_last_primary_shard_syncing = false; + + // If existing max shard is older than the max we can keep. Then delete all the existing shards. + remove_old_shards(ctx.get_lcl_id().seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); } if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all shards if this is a full history node. @@ -86,65 +85,64 @@ namespace ledger const std::string shard_path = std::string(PRIMARY_DIR).append("/").append(std::to_string(synced_shard_seq_no)); set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } - else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards) + else { // When there are no more shards to sync, Remove old shards that exceeds max shard range. - remove_old_shards(last_primary_shard_seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR); + remove_old_shards(ctx.get_lcl_id().seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); } } - else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_primary_shard_seq_no >= conf::cfg.node.history_config.max_primary_shards) + else { // When there are no more shards to sync, Remove old shards that exceeds max shard range. - remove_old_shards(last_primary_shard_seq_no - conf::cfg.node.history_config.max_primary_shards + 1, PRIMARY_DIR); + remove_old_shards(ctx.get_lcl_id().seq_no, PRIMARY_SHARD_SIZE, conf::cfg.node.history_config.max_primary_shards, PRIMARY_DIR); } } - else if (shard_parent_dir == BLOB_DIR) + else if (shard_parent_dir == RAW_DIR) { - // If the synced blob shard sequence number is equal or greater than the current blob shard seq number, + // If the synced raw shard sequence number is equal or greater than the current raw shard seq number, // then the context information should be updated. - uint64_t last_blob_shard_seq_no = ctx.get_last_blob_shard_id().seq_no; - if (last_blob_shard_seq_no <= synced_shard_seq_no) + uint64_t last_raw_shard_seq_no = ctx.get_last_raw_shard_id().seq_no; + if (last_raw_shard_seq_no <= synced_shard_seq_no) { // Persist the lastest synced shard seq number to the max shard meta file. - if (persist_max_shard_seq_no(BLOB_DIR, synced_shard_seq_no) == -1) + if (persist_max_shard_seq_no(RAW_DIR, synced_shard_seq_no) == -1) { - LOG_ERROR << "Error updating max shard meta file in blob shard sync."; + LOG_ERROR << "Error updating max shard meta file in raw shard sync."; return; } - // If existing max shard is older than the max we can keep. Then delete all the existing shards. - if (conf::cfg.node.history == conf::HISTORY::CUSTOM && synced_shard_seq_no - last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards) - remove_old_shards(last_blob_shard_seq_no + 1, BLOB_DIR); + last_raw_shard_seq_no = synced_shard_seq_no; + ctx.set_last_raw_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash}); + is_last_raw_shard_syncing = false; - last_blob_shard_seq_no = synced_shard_seq_no; - ctx.set_last_blob_shard_id(p2p::sequence_hash{synced_shard_seq_no, synced_target.hash}); - is_last_blob_shard_syncing = false; + // If existing max shard is older than the max we can keep. Then delete all the existing shards. + remove_old_shards(ctx.get_lcl_id().seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); } - if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all blob shards if this is a full history node. - last_blob_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_blob_shards) + if (conf::cfg.node.history == conf::HISTORY::FULL || // Sync all raw shards if this is a full history node. + last_raw_shard_seq_no - synced_shard_seq_no + 1 < conf::cfg.node.history_config.max_raw_shards) { - // Check whether the blob hash of the previous blob shard matches with the hash in the prev_shard.hash file. - const std::string prev_shard_vpath = std::string(BLOB_DIR).append("/").append(std::to_string(--synced_shard_seq_no)); + // Check whether the hash of the previous raw shard matches with the hash in the prev_shard.hash file. + const std::string prev_shard_vpath = std::string(RAW_DIR).append("/").append(std::to_string(--synced_shard_seq_no)); fs_mount->get_hash(prev_shard_hash_from_hpfs, hpfs::RW_SESSION_NAME, prev_shard_vpath); if (prev_shard_hash_from_file != util::h32_empty // Hash in the prev_shard.hash of the 0th shard is h32 empty. Syncing should be stopped then. && prev_shard_hash_from_file != prev_shard_hash_from_hpfs) // Continue to sync backwards if the hash from prev_shard.hash is not matching with the shard hash from hpfs. { - const std::string sync_name = "blob shard " + std::to_string(synced_shard_seq_no); - const std::string shard_path = std::string(BLOB_DIR).append("/").append(std::to_string(synced_shard_seq_no)); + const std::string sync_name = "raw shard " + std::to_string(synced_shard_seq_no); + const std::string shard_path = std::string(RAW_DIR).append("/").append(std::to_string(synced_shard_seq_no)); set_target_push_back(hpfs::sync_target{sync_name, prev_shard_hash_from_file, shard_path, hpfs::BACKLOG_ITEM_TYPE::DIR}); } - else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards) + else { // When there are no more shards to sync, Remove old shards that exceeds max shard range. - remove_old_shards(last_blob_shard_seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR); + remove_old_shards(ctx.get_lcl_id().seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); } } - else if (conf::cfg.node.history == conf::HISTORY::CUSTOM && last_blob_shard_seq_no >= conf::cfg.node.history_config.max_blob_shards) + else { // When there are no more shards to sync, Remove old shards that exceeds max shard range. - remove_old_shards(last_blob_shard_seq_no - conf::cfg.node.history_config.max_blob_shards + 1, BLOB_DIR); + remove_old_shards(ctx.get_lcl_id().seq_no, RAW_SHARD_SIZE, conf::cfg.node.history_config.max_raw_shards, RAW_DIR); } } } @@ -162,7 +160,7 @@ namespace ledger { // Reset these flags since we are abandoning the sync. is_last_primary_shard_syncing = false; - is_last_blob_shard_syncing = false; + is_last_raw_shard_syncing = false; } } // namespace ledger \ No newline at end of file diff --git a/src/ledger/ledger_sync.hpp b/src/ledger/ledger_sync.hpp index 0c8f4938..e6a5a0f8 100644 --- a/src/ledger/ledger_sync.hpp +++ b/src/ledger/ledger_sync.hpp @@ -17,7 +17,7 @@ namespace ledger public: std::atomic is_last_primary_shard_syncing = false; - std::atomic is_last_blob_shard_syncing = false; + std::atomic is_last_raw_shard_syncing = false; }; } // namespace ledger #endif \ No newline at end of file diff --git a/src/ledger/sqlite.cpp b/src/ledger/sqlite.cpp index 3a86dafc..651dd39c 100644 --- a/src/ledger/sqlite.cpp +++ b/src/ledger/sqlite.cpp @@ -5,10 +5,16 @@ namespace ledger::sqlite { constexpr const char *LEDGER_TABLE = "ledger"; - constexpr const char *HP_VERSION_TABLE = "hp"; - constexpr const char *HP_VERSION_COLUMN = "hp_version"; + constexpr const char *USERS_TABLE = "users"; + constexpr const char *INPUTS_TABLE = "inputs"; + constexpr const char *OUTPUTS_TABLE = "outputs"; + constexpr const char *HP_TABLE = "hp"; + constexpr const char *LEDGER_VERSION_COLUMN = "ledger_version"; constexpr const char *COLUMN_DATA_TYPES[]{"INT", "TEXT", "BLOB"}; constexpr const char *CREATE_TABLE = "CREATE TABLE IF NOT EXISTS "; + constexpr const char *CREATE_INDEX = "CREATE INDEX "; + constexpr const char *CREATE_UNIQUE_INDEX = "CREATE UNIQUE INDEX "; + constexpr const char *JOURNAL_MODE_OFF = "PRAGMA journal_mode=OFF"; constexpr const char *INSERT_INTO = "INSERT INTO "; constexpr const char *PRIMARY_KEY = "PRIMARY KEY"; constexpr const char *NOT_NULL = "NOT NULL"; @@ -19,13 +25,26 @@ namespace ledger::sqlite constexpr const char *AND = " AND "; constexpr const char *SELECT_LAST_LEDGER = "SELECT * FROM ledger ORDER BY seq_no DESC LIMIT 1"; constexpr const char *SELECT_LEDGER_BY_SEQ_NO = "SELECT * FROM ledger WHERE seq_no=? LIMIT 1"; + + constexpr const char *SELECT_INPUTS_BY_SEQ_NO = "SELECT * FROM inputs WHERE ledger_seq_no=?"; + constexpr const char *SELECT_OUTPUTS_BY_SEQ_NO = "SELECT * FROM outputs WHERE ledger_seq_no=?"; + constexpr const char *INSERT_INTO_LEDGER = "INSERT INTO ledger(" "seq_no, time, ledger_hash, prev_ledger_hash, data_hash," "state_hash, patch_hash, user_hash, input_hash, output_hash" ") VALUES(?,?,?,?,?,?,?,?,?,?)"; + constexpr const char *INSERT_INTO_USERS = "INSERT INTO users(ledger_seq_no, pubkey) VALUES(?,?)"; + constexpr const char *INSERT_INTO_USER_INPUTS = "INSERT INTO inputs(ledger_seq_no, pubkey, hash, nonce," + " blob_offset, blob_size) VALUES(?,?,?,?,?,?)"; + constexpr const char *INSERT_INTO_USER_OUTPUTS = "INSERT INTO outputs(ledger_seq_no, pubkey, hash," + " blob_offset, blob_count) VALUES(?,?,?,?,?)"; +#define PUBKEY_SIZE 33 #define BIND_H32_BLOB(idx, field) (field.size() == sizeof(util::h32) && sqlite3_bind_blob(stmt, idx, field.data(), sizeof(util::h32), SQLITE_STATIC) == SQLITE_OK) +#define BIND_PUBKEY_BLOB(idx, field) (field.size() == PUBKEY_SIZE && sqlite3_bind_blob(stmt, idx, field.data(), PUBKEY_SIZE, SQLITE_STATIC) == SQLITE_OK) +#define BIND_BLOB(idx, field) (field.size() > 0 && sqlite3_bind_blob(stmt, idx, field.data(), field.size(), SQLITE_STATIC) == SQLITE_OK) #define GET_H32_BLOB(idx) std::string((char *)sqlite3_column_blob(stmt, idx), sizeof(util::h32)) +#define GET_PUBKEY_BLOB(idx) std::string((char *)sqlite3_column_blob(stmt, idx), PUBKEY_SIZE) /** * Opens a connection to a given databse and give the db pointer. @@ -33,15 +52,22 @@ namespace ledger::sqlite * @param db Pointer to the db pointer which is to be connected and pointed. * @returns returns 0 on success, or -1 on error. */ - int open_db(std::string_view db_name, sqlite3 **db) + int open_db(std::string_view db_name, sqlite3 **db, const bool read_only) { int ret; - if ((ret = sqlite3_open(db_name.data(), db)) != SQLITE_OK) + const int flags = read_only ? SQLITE_OPEN_READONLY : (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); + if ((ret = sqlite3_open_v2(db_name.data(), db, flags, 0)) != SQLITE_OK) { *db = NULL; LOG_ERROR << "Can't open database: " << ret << ", " << sqlite3_errmsg(*db); return -1; } + + // We turn off journaling for the db because we don't need transacion support. + // Journaling mode introduces lot of extra underyling file system operations which causes lot of overhead. + if (exec_sql(*db, JOURNAL_MODE_OFF) == -1) + return -1; + return 0; } @@ -75,9 +101,7 @@ namespace ledger::sqlite int create_table(sqlite3 *db, std::string_view table_name, const std::vector &column_info) { std::string sql; - sql.append(CREATE_TABLE); - sql.append(table_name); - sql.append(" ("); + sql.append(CREATE_TABLE).append(table_name).append(" ("); for (auto itr = column_info.begin(); itr != column_info.end(); ++itr) { @@ -102,8 +126,32 @@ namespace ledger::sqlite } sql.append(")"); - /* Execute SQL statement */ - return exec_sql(db, sql); + const int ret = exec_sql(db, sql); + if (ret == -1) + LOG_ERROR << "Error when creating sqlite table " << table_name; + + return ret; + } + + int create_index(sqlite3 *db, std::string_view table_name, std::string_view column_names, const bool is_unique) + { + std::string index_name = std::string("idx_").append(table_name).append("_").append(column_names); + std::replace(index_name.begin(), index_name.end(), ',', '_'); + + std::string sql; + sql.append(is_unique ? CREATE_UNIQUE_INDEX : CREATE_INDEX) + .append(index_name) + .append(" ON ") + .append(table_name) + .append("(") + .append(column_names) + .append(")"); + + const int ret = exec_sql(db, sql); + if (ret == -1) + LOG_ERROR << "Error when creating sqlite index '" << index_name << "' in table " << table_name; + + return ret; } /** @@ -210,6 +258,9 @@ namespace ledger::sqlite */ int close_db(sqlite3 **db) { + if (*db == NULL) + return 0; + if (sqlite3_close(*db) != SQLITE_OK) { LOG_ERROR << "Can't close database: " << sqlite3_errmsg(*db); @@ -221,13 +272,13 @@ namespace ledger::sqlite } /** - * Creates a table for ledger records. + * Sets up a blank ledger database. * @param db Pointer to the db. * @returns returns 0 on success, or -1 on error. */ - int create_ledger_table(sqlite3 *db) + int initialize_ledger_db(sqlite3 *db) { - const std::vector column_info{ + const std::vector columns{ table_column_info("seq_no", COLUMN_DATA_TYPE::INT, true), table_column_info("time", COLUMN_DATA_TYPE::INT), table_column_info("ledger_hash", COLUMN_DATA_TYPE::BLOB), @@ -239,29 +290,86 @@ namespace ledger::sqlite table_column_info("input_hash", COLUMN_DATA_TYPE::BLOB), table_column_info("output_hash", COLUMN_DATA_TYPE::BLOB)}; - if (create_table(db, LEDGER_TABLE, column_info) == -1) + if (create_table(db, LEDGER_TABLE, columns) == -1 || + create_index(db, LEDGER_TABLE, "time", true) == -1 || + create_index(db, LEDGER_TABLE, "ledger_hash", true) == -1) return -1; return 0; } /** - * Create and update the hp table from the hp version when creating a new shard. + * Sets up a blank ledger raw data database. * @param db Pointer to the db. - * @param version Hp version. + * @returns returns 0 on success, or -1 on error. + */ + int initialize_ledger_raw_db(sqlite3 *db) + { + // Users table. + { + const std::vector user_columns{ + table_column_info("ledger_seq_no", COLUMN_DATA_TYPE::INT), + table_column_info("pubkey", COLUMN_DATA_TYPE::BLOB)}; + + if (create_table(db, USERS_TABLE, user_columns) == -1 || + create_index(db, USERS_TABLE, "ledger_seq_no", false) == -1 || + create_index(db, USERS_TABLE, "pubkey", false) == -1) + return -1; + } + + // Inputs table. + { + const std::vector input_columns{ + table_column_info("ledger_seq_no", COLUMN_DATA_TYPE::INT), + table_column_info("pubkey", COLUMN_DATA_TYPE::BLOB), + table_column_info("hash", COLUMN_DATA_TYPE::BLOB), + table_column_info("nonce", COLUMN_DATA_TYPE::BLOB), + table_column_info("blob_offset", COLUMN_DATA_TYPE::INT), + table_column_info("blob_size", COLUMN_DATA_TYPE::INT)}; + + if (create_table(db, INPUTS_TABLE, input_columns) == -1 || + create_index(db, INPUTS_TABLE, "ledger_seq_no", false) == -1 || + create_index(db, INPUTS_TABLE, "hash", false) == -1 || + create_index(db, INPUTS_TABLE, "ledger_seq_no,pubkey", false) == -1) + return -1; + } + + // Outputs table. + { + const std::vector input_columns{ + table_column_info("ledger_seq_no", COLUMN_DATA_TYPE::INT), + table_column_info("pubkey", COLUMN_DATA_TYPE::BLOB), + table_column_info("hash", COLUMN_DATA_TYPE::BLOB), + table_column_info("blob_offset", COLUMN_DATA_TYPE::INT), + table_column_info("blob_count", COLUMN_DATA_TYPE::INT)}; + + if (create_table(db, OUTPUTS_TABLE, input_columns) == -1 || + create_index(db, OUTPUTS_TABLE, "ledger_seq_no", false) == -1 || + create_index(db, OUTPUTS_TABLE, "hash", false) == -1 || + create_index(db, OUTPUTS_TABLE, "ledger_seq_no,pubkey", false) == -1) + return -1; + } + + return 0; + } + + /** + * Create and update the hp system table when creating a new shard. + * @param db Pointer to the db. + * @param version Version string to be placed in the table. * @returns returns 0 on success, or -1 on error. * */ - int create_hp_version_table_and_update(sqlite3 *db, std::string_view version) + int create_hp_table(sqlite3 *db, std::string_view version) { const std::vector column_info{ - table_column_info(HP_VERSION_COLUMN, COLUMN_DATA_TYPE::TEXT)}; + table_column_info(LEDGER_VERSION_COLUMN, COLUMN_DATA_TYPE::TEXT)}; - if (create_table(db, HP_VERSION_TABLE, column_info) == -1) + if (create_table(db, HP_TABLE, column_info) == -1) return -1; const std::string value_string = "\"" + std::string(version) + "\""; - if (insert_row(db, HP_VERSION_TABLE, HP_VERSION_COLUMN, value_string) == -1) + if (insert_row(db, HP_TABLE, LEDGER_VERSION_COLUMN, value_string) == -1) return -1; return 0; @@ -293,18 +401,106 @@ namespace ledger::sqlite return 0; } + LOG_ERROR << "Error inserting ledger record. " << sqlite3_errmsg(db); sqlite3_finalize(stmt); return -1; } - /** - * Checks whether ledger table exist. - * @param db Pointer to the db. - * @returns returns true is exist, otherwise false. - */ - bool is_ledger_table_exist(sqlite3 *db) + sqlite3_stmt *prepare_user_insert(sqlite3 *db) { - return is_table_exists(db, LEDGER_TABLE); + sqlite3_stmt *stmt; + if (sqlite3_prepare_v2(db, INSERT_INTO_USERS, -1, &stmt, 0) == SQLITE_OK && stmt != NULL) + return stmt; + + return NULL; + } + + sqlite3_stmt *prepare_user_input_insert(sqlite3 *db) + { + sqlite3_stmt *stmt; + if (sqlite3_prepare_v2(db, INSERT_INTO_USER_INPUTS, -1, &stmt, 0) == SQLITE_OK && stmt != NULL) + return stmt; + + return NULL; + } + + sqlite3_stmt *prepare_user_output_insert(sqlite3 *db) + { + sqlite3_stmt *stmt; + if (sqlite3_prepare_v2(db, INSERT_INTO_USER_OUTPUTS, -1, &stmt, 0) == SQLITE_OK && stmt != NULL) + return stmt; + + LOG_ERROR << "Prepare sqlite statement failed."; + return NULL; + } + + int insert_user_record(sqlite3_stmt *stmt, const uint64_t ledger_seq_no, std::string_view pubkey) + { + if (stmt == NULL) + { + LOG_ERROR << "Sqlite statement null."; + return -1; + } + + if (sqlite3_reset(stmt) == SQLITE_OK && + sqlite3_bind_int64(stmt, 1, ledger_seq_no) == SQLITE_OK && + BIND_PUBKEY_BLOB(2, pubkey) && + sqlite3_step(stmt) == SQLITE_DONE) + { + return 0; + } + + LOG_ERROR << "Error inserting user record."; + return -1; + } + + int insert_user_input_record(sqlite3_stmt *stmt, const uint64_t ledger_seq_no, std::string_view pubkey, + std::string_view hash, std::string_view nonce, const uint64_t blob_offset, const uint64_t blob_size) + { + if (stmt == NULL) + { + LOG_ERROR << "Sqlite statement null."; + return -1; + } + + if (sqlite3_reset(stmt) == SQLITE_OK && + sqlite3_bind_int64(stmt, 1, ledger_seq_no) == SQLITE_OK && + BIND_PUBKEY_BLOB(2, pubkey) && + BIND_H32_BLOB(3, hash) && + BIND_BLOB(4, nonce) && + sqlite3_bind_int64(stmt, 5, blob_offset) == SQLITE_OK && + sqlite3_bind_int64(stmt, 6, blob_size) == SQLITE_OK && + sqlite3_step(stmt) == SQLITE_DONE) + { + return 0; + } + + LOG_ERROR << "Error inserting user input record."; + return -1; + } + + int insert_user_output_record(sqlite3_stmt *stmt, const uint64_t ledger_seq_no, std::string_view pubkey, + std::string_view hash, const uint64_t blob_offset, const uint64_t output_count) + { + if (stmt == NULL) + { + LOG_ERROR << "Sqlite statement null."; + return -1; + } + + if (sqlite3_reset(stmt) == SQLITE_OK && + sqlite3_bind_int64(stmt, 1, ledger_seq_no) == SQLITE_OK && + BIND_PUBKEY_BLOB(2, pubkey) && + BIND_H32_BLOB(3, hash) && + sqlite3_bind_int64(stmt, 4, blob_offset) == SQLITE_OK && + sqlite3_bind_int64(stmt, 5, output_count) == SQLITE_OK && + sqlite3_step(stmt) == SQLITE_DONE) + { + return 0; + } + + LOG_ERROR << "Error inserting user output record."; + return -1; } /** @@ -325,7 +521,7 @@ namespace ledger::sqlite return 0; } - LOG_ERROR << "Error when querying last ledger from db."; + LOG_ERROR << "Error when querying last ledger from db. " << sqlite3_errmsg(db); sqlite3_finalize(stmt); return -1; } @@ -358,7 +554,45 @@ namespace ledger::sqlite } } - LOG_ERROR << "Error when querying ledger by seq no. from db."; + LOG_ERROR << "Error when querying ledger by seq no. from db. " << sqlite3_errmsg(db); + sqlite3_finalize(stmt); + return -1; + } + + int get_user_inputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &inputs) + { + sqlite3_stmt *stmt; + + if (sqlite3_prepare_v2(db, SELECT_INPUTS_BY_SEQ_NO, -1, &stmt, 0) == SQLITE_OK && stmt != NULL && + sqlite3_bind_int64(stmt, 1, seq_no) == SQLITE_OK) + { + while (sqlite3_step(stmt) == SQLITE_ROW) + inputs.push_back(populate_user_input_from_sql_record(stmt)); + + sqlite3_finalize(stmt); + return 0; + } + + LOG_ERROR << "Error when querying ledger inputs by seq no. from db. " << sqlite3_errmsg(db); + sqlite3_finalize(stmt); + return -1; + } + + int get_user_outputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &outputs) + { + sqlite3_stmt *stmt; + + if (sqlite3_prepare_v2(db, SELECT_OUTPUTS_BY_SEQ_NO, -1, &stmt, 0) == SQLITE_OK && stmt != NULL && + sqlite3_bind_int64(stmt, 1, seq_no) == SQLITE_OK) + { + while (sqlite3_step(stmt) == SQLITE_ROW) + outputs.push_back(populate_user_output_from_sql_record(stmt)); + + sqlite3_finalize(stmt); + return 0; + } + + LOG_ERROR << "Error when querying ledger outputs by seq no. from db. " << sqlite3_errmsg(db); sqlite3_finalize(stmt); return -1; } @@ -377,4 +611,27 @@ namespace ledger::sqlite ledger.output_hash = GET_H32_BLOB(9); } + ledger::ledger_user_input populate_user_input_from_sql_record(sqlite3_stmt *stmt) + { + ledger::ledger_user_input inp; + inp.ledger_seq_no = sqlite3_column_int64(stmt, 0); + inp.pubkey = GET_PUBKEY_BLOB(1); + inp.hash = GET_H32_BLOB(2); + // inp.nonce = GET_BLOB(3); + inp.blob_offset = sqlite3_column_int64(stmt, 4); + inp.blob_size = sqlite3_column_int64(stmt, 5); + return inp; + } + + ledger::ledger_user_output populate_user_output_from_sql_record(sqlite3_stmt *stmt) + { + ledger::ledger_user_output out; + out.ledger_seq_no = sqlite3_column_int64(stmt, 0); + out.pubkey = GET_PUBKEY_BLOB(1); + out.hash = GET_H32_BLOB(2); + out.blob_offset = sqlite3_column_int64(stmt, 3); + out.blob_count = sqlite3_column_int64(stmt, 4); + return out; + } + } // namespace ledger::sqlite diff --git a/src/ledger/sqlite.hpp b/src/ledger/sqlite.hpp index 0493918e..aa269728 100644 --- a/src/ledger/sqlite.hpp +++ b/src/ledger/sqlite.hpp @@ -33,19 +33,21 @@ namespace ledger::sqlite bool is_key; bool is_null; - table_column_info(std::string_view name, const COLUMN_DATA_TYPE &column_type, const bool is_key = false, const bool is_null = false) + table_column_info(std::string_view name, const COLUMN_DATA_TYPE &column_type, const bool is_key = false, const bool is_null = true) : name(name), column_type(column_type), is_key(is_key), is_null(is_null) { } }; // Generic methods. - int open_db(std::string_view db_name, sqlite3 **db); + int open_db(std::string_view db_name, sqlite3 **db, const bool read_only = false); int exec_sql(sqlite3 *db, std::string_view sql, int (*callback)(void *, int, char **, char **) = NULL, void *callback_first_arg = NULL); int create_table(sqlite3 *db, std::string_view table_name, const std::vector &column_info); + int create_index(sqlite3 *db, std::string_view table_name, std::string_view column_names, const bool is_unique); + int insert_rows(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, const std::vector &value_strings); int insert_row(sqlite3 *db, std::string_view table_name, std::string_view column_names_string, std::string_view value_string); @@ -55,20 +57,42 @@ namespace ledger::sqlite int close_db(sqlite3 **db); // Ledger specific methdods. - int create_ledger_table(sqlite3 *db); + int initialize_ledger_db(sqlite3 *db); - int create_hp_version_table_and_update(sqlite3 *db, std::string_view version); + int initialize_ledger_raw_db(sqlite3 *db); + + int create_hp_table(sqlite3 *db, std::string_view version); int insert_ledger_row(sqlite3 *db, const ledger::ledger_record &ledger); - bool is_ledger_table_exist(sqlite3 *db); + sqlite3_stmt *prepare_user_insert(sqlite3 *db); + + sqlite3_stmt *prepare_user_input_insert(sqlite3 *db); + + sqlite3_stmt *prepare_user_output_insert(sqlite3 *db); + + int insert_user_record(sqlite3_stmt *stmt, const uint64_t ledger_seq_no, std::string_view pubkey); + + int insert_user_input_record(sqlite3_stmt *stmt, const uint64_t ledger_seq_no, std::string_view pubkey, + std::string_view hash, std::string_view nonce, const uint64_t blob_offset, const uint64_t blob_size); + + int insert_user_output_record(sqlite3_stmt *stmt, const uint64_t ledger_seq_no, std::string_view pubkey, + std::string_view hash, const uint64_t blob_offset, const uint64_t output_count); int get_last_ledger(sqlite3 *db, ledger::ledger_record &ledger); int get_ledger_by_seq_no(sqlite3 *db, const uint64_t seq_no, ledger::ledger_record &ledger); + int get_user_inputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &inputs); + + int get_user_outputs_by_seq_no(sqlite3 *db, const uint64_t seq_no, std::vector &outputs); + void populate_ledger_from_sql_record(ledger::ledger_record &ledger, sqlite3_stmt *stmt); + ledger::ledger_user_input populate_user_input_from_sql_record(sqlite3_stmt *stmt); + + ledger::ledger_user_output populate_user_output_from_sql_record(sqlite3_stmt *stmt); + } // namespace ledger::sqlite #endif \ No newline at end of file diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index 1ce15a76..5b29258a 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -254,7 +254,7 @@ namespace msg::usrmsg::bson encoder.key(msg::usrmsg::FLD_RESULTS); encoder.begin_array(); - populate_ledger_query_results(encoder, std::get>(result)); + populate_ledger_query_results(encoder, std::get>(result)); encoder.end_array(); encoder.end_object(); encoder.flush(); @@ -416,7 +416,7 @@ namespace msg::usrmsg::bson * "id": "", * "filter_by": "", * "params": {...}, // Params supported by the specified filter. - * "include": ["raw_inputs", "raw_outputs"] + * "include": ["inputs", "outputs"] * } * @return 0 on successful extraction. -1 for failure. */ @@ -445,14 +445,14 @@ namespace msg::usrmsg::bson extracted_id = std::move(id); // Detect includes. - bool raw_inputs = false; - bool raw_outputs = false; + bool inputs = false; + bool outputs = false; for (auto &val : d[msg::usrmsg::FLD_INCLUDE].array_range()) { - if (val == msg::usrmsg::FLD_RAW_INPUTS) - raw_inputs = true; - else if (val == msg::usrmsg::FLD_RAW_OUTPUTS) - raw_outputs = true; + if (val == msg::usrmsg::FLD_INPUTS) + inputs = true; + else if (val == msg::usrmsg::FLD_OUTPUTS) + outputs = true; } auto ¶ms_field = d[msg::usrmsg::FLD_PARAMS]; @@ -467,8 +467,8 @@ namespace msg::usrmsg::bson extracted_query = ledger::query::seq_no_query{ params_field[msg::usrmsg::FLD_SEQ_NO].as(), - raw_inputs, - raw_outputs}; + inputs, + outputs}; return 0; } else @@ -499,62 +499,83 @@ namespace msg::usrmsg::bson } } - void populate_ledger_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &results) + void populate_ledger_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &results) { - for (const ledger::query::query_result_record &r : results) + for (const ledger::ledger_record &ledger : results) { encoder.begin_object(); encoder.key(msg::usrmsg::FLD_SEQ_NO); - encoder.uint64_value(r.ledger.seq_no); + encoder.uint64_value(ledger.seq_no); encoder.key(msg::usrmsg::FLD_TIMESTAMP); - encoder.uint64_value(r.ledger.timestamp); + encoder.uint64_value(ledger.timestamp); encoder.key(msg::usrmsg::FLD_HASH); - encoder.byte_string_value(r.ledger.ledger_hash); + encoder.byte_string_value(ledger.ledger_hash); encoder.key(msg::usrmsg::FLD_PREV_HASH); - encoder.byte_string_value(r.ledger.prev_ledger_hash); + encoder.byte_string_value(ledger.prev_ledger_hash); encoder.key(msg::usrmsg::FLD_STATE_HASH); - encoder.byte_string_value(r.ledger.state_hash); + encoder.byte_string_value(ledger.state_hash); encoder.key(msg::usrmsg::FLD_CONFIG_HASH); - encoder.byte_string_value(r.ledger.config_hash); + encoder.byte_string_value(ledger.config_hash); encoder.key(msg::usrmsg::FLD_USER_HASH); - encoder.byte_string_value(r.ledger.user_hash); + encoder.byte_string_value(ledger.user_hash); encoder.key(msg::usrmsg::FLD_INPUT_HASH); - encoder.byte_string_value(r.ledger.input_hash); + encoder.byte_string_value(ledger.input_hash); encoder.key(msg::usrmsg::FLD_OUTPUT_HASH); - encoder.byte_string_value(r.ledger.output_hash); + encoder.byte_string_value(ledger.output_hash); // If raw inputs or outputs is not requested, we don't include that field at all in the response. // Otherwise the field will always contain an array (empty array if no data). - if (r.raw_inputs) + if (ledger.inputs) { - encoder.key(msg::usrmsg::FLD_RAW_INPUTS); - populate_ledger_blob_map(encoder, *r.raw_inputs); + encoder.key(msg::usrmsg::FLD_INPUTS); + populate_ledger_inputs(encoder, *ledger.inputs); } - if (r.raw_outputs) + if (ledger.outputs) { - encoder.key(msg::usrmsg::FLD_RAW_OUTPUTS); - populate_ledger_blob_map(encoder, *r.raw_outputs); + encoder.key(msg::usrmsg::FLD_OUTPUTS); + populate_ledger_outputs(encoder, *ledger.outputs); } encoder.end_object(); } } - void populate_ledger_blob_map(jsoncons::bson::bson_bytes_encoder &encoder, const ledger::query::blob_map &blob_map) + void populate_ledger_inputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &inputs) { encoder.begin_array(); - for (const auto &[pubkey, blobs] : blob_map) + for (const ledger::ledger_user_input &inp : inputs) { encoder.begin_object(); encoder.key(msg::usrmsg::FLD_PUBKEY); - encoder.byte_string_value(pubkey); + encoder.byte_string_value(inp.pubkey); + encoder.key(msg::usrmsg::FLD_HASH); + encoder.byte_string_value(inp.hash); + encoder.key(msg::usrmsg::FLD_BLOB); + encoder.byte_string_value(inp.blob); + + encoder.end_object(); + } + encoder.end_array(); + } + + void populate_ledger_outputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &users) + { + encoder.begin_array(); + for (const ledger::ledger_user_output &user : users) + { + encoder.begin_object(); + + encoder.key(msg::usrmsg::FLD_PUBKEY); + encoder.byte_string_value(user.pubkey); + encoder.key(msg::usrmsg::FLD_HASH); + encoder.byte_string_value(user.hash); encoder.key(msg::usrmsg::FLD_BLOBS); encoder.begin_array(); - for (const std::string &blob : blobs) - encoder.byte_string_value(blob); + for (const std::string &output : user.outputs) + encoder.byte_string_value(output); encoder.end_array(); encoder.end_object(); diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index 286c7d54..99cbd15f 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -43,9 +43,11 @@ namespace msg::usrmsg::bson void populate_output_hash_array(jsoncons::bson::bson_bytes_encoder &encoder, const util::merkle_hash_node &node); - void populate_ledger_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &results); + void populate_ledger_query_results(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &results); - void populate_ledger_blob_map(jsoncons::bson::bson_bytes_encoder &encoder, const ledger::query::blob_map &blob_map); + void populate_ledger_inputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &inputs); + + void populate_ledger_outputs(jsoncons::bson::bson_bytes_encoder &encoder, const std::vector &users); } // namespace msg::usrmsg::bson diff --git a/src/msg/fbuf/ledger_blob_schema.fbs b/src/msg/fbuf/ledger_blob_schema.fbs deleted file mode 100644 index 04c31637..00000000 --- a/src/msg/fbuf/ledger_blob_schema.fbs +++ /dev/null @@ -1,27 +0,0 @@ -namespace msg.fbuf.ledgermsg; - -table LedgerBlob { - ledger_hash:[ubyte]; - raw_inputs:[RawInputCollection]; - raw_outputs:[RawOutputCollection]; -} - -table RawInputCollection { - pubkey:[ubyte]; - inputs:[RawInput]; -} - -table RawInput { - input: [ubyte]; -} - -table RawOutputCollection { - pubkey:[ubyte]; - outputs:[RawOutput]; -} - -table RawOutput { - output: [ubyte]; -} - -root_type LedgerBlob; \ No newline at end of file diff --git a/src/msg/fbuf/ledger_blob_schema_generated.h b/src/msg/fbuf/ledger_blob_schema_generated.h deleted file mode 100644 index 432cced8..00000000 --- a/src/msg/fbuf/ledger_blob_schema_generated.h +++ /dev/null @@ -1,408 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify - - -#ifndef FLATBUFFERS_GENERATED_LEDGERBLOBSCHEMA_MSG_FBUF_LEDGERMSG_H_ -#define FLATBUFFERS_GENERATED_LEDGERBLOBSCHEMA_MSG_FBUF_LEDGERMSG_H_ - -#include "flatbuffers/flatbuffers.h" - -namespace msg { -namespace fbuf { -namespace ledgermsg { - -struct LedgerBlob; -struct LedgerBlobBuilder; - -struct RawInputCollection; -struct RawInputCollectionBuilder; - -struct RawInput; -struct RawInputBuilder; - -struct RawOutputCollection; -struct RawOutputCollectionBuilder; - -struct RawOutput; -struct RawOutputBuilder; - -struct LedgerBlob FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef LedgerBlobBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_LEDGER_HASH = 4, - VT_RAW_INPUTS = 6, - VT_RAW_OUTPUTS = 8 - }; - const flatbuffers::Vector *ledger_hash() const { - return GetPointer *>(VT_LEDGER_HASH); - } - flatbuffers::Vector *mutable_ledger_hash() { - return GetPointer *>(VT_LEDGER_HASH); - } - const flatbuffers::Vector> *raw_inputs() const { - return GetPointer> *>(VT_RAW_INPUTS); - } - flatbuffers::Vector> *mutable_raw_inputs() { - return GetPointer> *>(VT_RAW_INPUTS); - } - const flatbuffers::Vector> *raw_outputs() const { - return GetPointer> *>(VT_RAW_OUTPUTS); - } - flatbuffers::Vector> *mutable_raw_outputs() { - return GetPointer> *>(VT_RAW_OUTPUTS); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_LEDGER_HASH) && - verifier.VerifyVector(ledger_hash()) && - VerifyOffset(verifier, VT_RAW_INPUTS) && - verifier.VerifyVector(raw_inputs()) && - verifier.VerifyVectorOfTables(raw_inputs()) && - VerifyOffset(verifier, VT_RAW_OUTPUTS) && - verifier.VerifyVector(raw_outputs()) && - verifier.VerifyVectorOfTables(raw_outputs()) && - verifier.EndTable(); - } -}; - -struct LedgerBlobBuilder { - typedef LedgerBlob Table; - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_ledger_hash(flatbuffers::Offset> ledger_hash) { - fbb_.AddOffset(LedgerBlob::VT_LEDGER_HASH, ledger_hash); - } - void add_raw_inputs(flatbuffers::Offset>> raw_inputs) { - fbb_.AddOffset(LedgerBlob::VT_RAW_INPUTS, raw_inputs); - } - void add_raw_outputs(flatbuffers::Offset>> raw_outputs) { - fbb_.AddOffset(LedgerBlob::VT_RAW_OUTPUTS, raw_outputs); - } - explicit LedgerBlobBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateLedgerBlob( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> ledger_hash = 0, - flatbuffers::Offset>> raw_inputs = 0, - flatbuffers::Offset>> raw_outputs = 0) { - LedgerBlobBuilder builder_(_fbb); - builder_.add_raw_outputs(raw_outputs); - builder_.add_raw_inputs(raw_inputs); - builder_.add_ledger_hash(ledger_hash); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateLedgerBlobDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *ledger_hash = nullptr, - const std::vector> *raw_inputs = nullptr, - const std::vector> *raw_outputs = nullptr) { - auto ledger_hash__ = ledger_hash ? _fbb.CreateVector(*ledger_hash) : 0; - auto raw_inputs__ = raw_inputs ? _fbb.CreateVector>(*raw_inputs) : 0; - auto raw_outputs__ = raw_outputs ? _fbb.CreateVector>(*raw_outputs) : 0; - return msg::fbuf::ledgermsg::CreateLedgerBlob( - _fbb, - ledger_hash__, - raw_inputs__, - raw_outputs__); -} - -struct RawInputCollection FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef RawInputCollectionBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PUBKEY = 4, - VT_INPUTS = 6 - }; - const flatbuffers::Vector *pubkey() const { - return GetPointer *>(VT_PUBKEY); - } - flatbuffers::Vector *mutable_pubkey() { - return GetPointer *>(VT_PUBKEY); - } - const flatbuffers::Vector> *inputs() const { - return GetPointer> *>(VT_INPUTS); - } - flatbuffers::Vector> *mutable_inputs() { - return GetPointer> *>(VT_INPUTS); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PUBKEY) && - verifier.VerifyVector(pubkey()) && - VerifyOffset(verifier, VT_INPUTS) && - verifier.VerifyVector(inputs()) && - verifier.VerifyVectorOfTables(inputs()) && - verifier.EndTable(); - } -}; - -struct RawInputCollectionBuilder { - typedef RawInputCollection Table; - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(RawInputCollection::VT_PUBKEY, pubkey); - } - void add_inputs(flatbuffers::Offset>> inputs) { - fbb_.AddOffset(RawInputCollection::VT_INPUTS, inputs); - } - explicit RawInputCollectionBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateRawInputCollection( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> pubkey = 0, - flatbuffers::Offset>> inputs = 0) { - RawInputCollectionBuilder builder_(_fbb); - builder_.add_inputs(inputs); - builder_.add_pubkey(pubkey); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateRawInputCollectionDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *pubkey = nullptr, - const std::vector> *inputs = nullptr) { - auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; - auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; - return msg::fbuf::ledgermsg::CreateRawInputCollection( - _fbb, - pubkey__, - inputs__); -} - -struct RawInput FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef RawInputBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_INPUT = 4 - }; - const flatbuffers::Vector *input() const { - return GetPointer *>(VT_INPUT); - } - flatbuffers::Vector *mutable_input() { - return GetPointer *>(VT_INPUT); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_INPUT) && - verifier.VerifyVector(input()) && - verifier.EndTable(); - } -}; - -struct RawInputBuilder { - typedef RawInput Table; - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_input(flatbuffers::Offset> input) { - fbb_.AddOffset(RawInput::VT_INPUT, input); - } - explicit RawInputBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateRawInput( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> input = 0) { - RawInputBuilder builder_(_fbb); - builder_.add_input(input); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateRawInputDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *input = nullptr) { - auto input__ = input ? _fbb.CreateVector(*input) : 0; - return msg::fbuf::ledgermsg::CreateRawInput( - _fbb, - input__); -} - -struct RawOutputCollection FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef RawOutputCollectionBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PUBKEY = 4, - VT_OUTPUTS = 6 - }; - const flatbuffers::Vector *pubkey() const { - return GetPointer *>(VT_PUBKEY); - } - flatbuffers::Vector *mutable_pubkey() { - return GetPointer *>(VT_PUBKEY); - } - const flatbuffers::Vector> *outputs() const { - return GetPointer> *>(VT_OUTPUTS); - } - flatbuffers::Vector> *mutable_outputs() { - return GetPointer> *>(VT_OUTPUTS); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PUBKEY) && - verifier.VerifyVector(pubkey()) && - VerifyOffset(verifier, VT_OUTPUTS) && - verifier.VerifyVector(outputs()) && - verifier.VerifyVectorOfTables(outputs()) && - verifier.EndTable(); - } -}; - -struct RawOutputCollectionBuilder { - typedef RawOutputCollection Table; - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(RawOutputCollection::VT_PUBKEY, pubkey); - } - void add_outputs(flatbuffers::Offset>> outputs) { - fbb_.AddOffset(RawOutputCollection::VT_OUTPUTS, outputs); - } - explicit RawOutputCollectionBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateRawOutputCollection( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> pubkey = 0, - flatbuffers::Offset>> outputs = 0) { - RawOutputCollectionBuilder builder_(_fbb); - builder_.add_outputs(outputs); - builder_.add_pubkey(pubkey); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateRawOutputCollectionDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *pubkey = nullptr, - const std::vector> *outputs = nullptr) { - auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; - auto outputs__ = outputs ? _fbb.CreateVector>(*outputs) : 0; - return msg::fbuf::ledgermsg::CreateRawOutputCollection( - _fbb, - pubkey__, - outputs__); -} - -struct RawOutput FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef RawOutputBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_OUTPUT = 4 - }; - const flatbuffers::Vector *output() const { - return GetPointer *>(VT_OUTPUT); - } - flatbuffers::Vector *mutable_output() { - return GetPointer *>(VT_OUTPUT); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_OUTPUT) && - verifier.VerifyVector(output()) && - verifier.EndTable(); - } -}; - -struct RawOutputBuilder { - typedef RawOutput Table; - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_output(flatbuffers::Offset> output) { - fbb_.AddOffset(RawOutput::VT_OUTPUT, output); - } - explicit RawOutputBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateRawOutput( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> output = 0) { - RawOutputBuilder builder_(_fbb); - builder_.add_output(output); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateRawOutputDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *output = nullptr) { - auto output__ = output ? _fbb.CreateVector(*output) : 0; - return msg::fbuf::ledgermsg::CreateRawOutput( - _fbb, - output__); -} - -inline const msg::fbuf::ledgermsg::LedgerBlob *GetLedgerBlob(const void *buf) { - return flatbuffers::GetRoot(buf); -} - -inline const msg::fbuf::ledgermsg::LedgerBlob *GetSizePrefixedLedgerBlob(const void *buf) { - return flatbuffers::GetSizePrefixedRoot(buf); -} - -inline LedgerBlob *GetMutableLedgerBlob(void *buf) { - return flatbuffers::GetMutableRoot(buf); -} - -inline bool VerifyLedgerBlobBuffer( - flatbuffers::Verifier &verifier) { - return verifier.VerifyBuffer(nullptr); -} - -inline bool VerifySizePrefixedLedgerBlobBuffer( - flatbuffers::Verifier &verifier) { - return verifier.VerifySizePrefixedBuffer(nullptr); -} - -inline void FinishLedgerBlobBuffer( - flatbuffers::FlatBufferBuilder &fbb, - flatbuffers::Offset root) { - fbb.Finish(root); -} - -inline void FinishSizePrefixedLedgerBlobBuffer( - flatbuffers::FlatBufferBuilder &fbb, - flatbuffers::Offset root) { - fbb.FinishSizePrefixed(root); -} - -} // namespace ledgermsg -} // namespace fbuf -} // namespace msg - -#endif // FLATBUFFERS_GENERATED_LEDGERBLOBSCHEMA_MSG_FBUF_LEDGERMSG_H_ diff --git a/src/msg/fbuf/ledger_helpers.cpp b/src/msg/fbuf/ledger_helpers.cpp deleted file mode 100644 index d7022639..00000000 --- a/src/msg/fbuf/ledger_helpers.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "../../pchheader.hpp" -#include "../../p2p/p2p.hpp" -#include "ledger_blob_schema_generated.h" -#include "common_helpers.hpp" -#include "ledger_helpers.hpp" - -namespace msg::fbuf::ledgermsg -{ - /** - * Create ledger blob msg from ledger blob struct. - * @param ledger_blob Ledger blob to be placed in file. - */ - void create_ledger_blob_msg_from_ledger_blob(flatbuffers::FlatBufferBuilder &builder, const ledger::ledger_blob &ledger_blob) - { - std::vector> raw_inputs; - raw_inputs.reserve(ledger_blob.inputs.size()); - std::vector> raw_outputs; - raw_outputs.reserve(ledger_blob.outputs.size()); - - for (const auto &[key, value] : ledger_blob.inputs) - { - std::vector> inputs; - inputs.reserve(value.size()); - - for (const auto &input : value) - inputs.push_back(ledgermsg::CreateRawInput(builder, sv_to_flatbuf_bytes(builder, input))); - - raw_inputs.push_back(ledgermsg::CreateRawInputCollection( - builder, - sv_to_flatbuf_bytes(builder, key), - builder.CreateVector(inputs))); - } - - for (const auto &[key, value] : ledger_blob.outputs) - { - std::vector> outputs; - outputs.reserve(value.size()); - - for (const auto &output : value) - outputs.push_back(ledgermsg::CreateRawOutput(builder, sv_to_flatbuf_bytes(builder, output))); - - raw_outputs.push_back(ledgermsg::CreateRawOutputCollection( - builder, - sv_to_flatbuf_bytes(builder, key), - builder.CreateVector(outputs))); - } - - flatbuffers::Offset blob = - ledgermsg::CreateLedgerBlob( - builder, - hash_to_flatbuf_bytes(builder, ledger_blob.ledger_hash), - builder.CreateVector(raw_inputs), - builder.CreateVector(raw_outputs)); - - builder.Finish(blob); // Finished building message content to get serialised content. - } - - const int create_ledger_blob_from_msg(ledger::ledger_blob &blob_data, const std::string &msg, const bool read_inputs, const bool read_outputs) - { - // Verify ledger blob using flatbuffer verifier - flatbuffers::Verifier verifier((uint8_t *)msg.data(), msg.size(), 16, 100); - if (!VerifyLedgerBlobBuffer(verifier)) - { - LOG_ERROR << "Ledger blob flatbuffer verification failed."; - return -1; - } - - const auto ledger_msg = ledgermsg::GetLedgerBlob(msg.data()); - blob_data.ledger_hash = flatbuf_bytes_to_hash(ledger_msg->ledger_hash()); - - if (read_inputs) - { - std::vector inputs; - for (const auto collection : *ledger_msg->raw_inputs()) - { - for (const auto input_msg : *collection->inputs()) - { - inputs.push_back(std::string(flatbuf_bytes_to_sv(input_msg->input()))); - } - - blob_data.inputs.emplace(std::string(flatbuf_bytes_to_sv(collection->pubkey())), std::move(inputs)); - } - } - - if (read_outputs) - { - std::vector outputs; - for (const auto collection : *ledger_msg->raw_outputs()) - { - for (const auto output_msg : *collection->outputs()) - { - outputs.push_back(std::string(flatbuf_bytes_to_sv(output_msg->output()))); - } - - blob_data.outputs.emplace(std::string(flatbuf_bytes_to_sv(collection->pubkey())), std::move(outputs)); - } - } - - return 0; - } - -} // namespace msg::fbuf::ledgermsg diff --git a/src/msg/fbuf/ledger_helpers.hpp b/src/msg/fbuf/ledger_helpers.hpp deleted file mode 100644 index cf3f51e5..00000000 --- a/src/msg/fbuf/ledger_helpers.hpp +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef _HP_MSG_FBUF_LEDGER_HELPERS_ -#define _HP_MSG_FBUF_LEDGER_HELPERS_ - -#include "../../pchheader.hpp" -#include "../../p2p/p2p.hpp" -#include "../../ledger/ledger.hpp" - -namespace msg::fbuf::ledgermsg -{ - void create_ledger_blob_msg_from_ledger_blob(flatbuffers::FlatBufferBuilder &builder, const ledger::ledger_blob &ledger_blob); - - const int create_ledger_blob_from_msg(ledger::ledger_blob &blob_data, const std::string &msg, const bool read_inputs, const bool read_outputs); - -} // namespace msg::fbuf::ledgermsg - -#endif \ No newline at end of file diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index 27f29051..c0faf616 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -67,7 +67,7 @@ table ProposalMsg { state_hash: [ubyte]; patch_hash: [ubyte]; last_primary_shard_id:SequenceHash; - last_blob_shard_id: SequenceHash; + last_raw_shard_id: SequenceHash; // Make sure to update signature generation/verification whenever these fields are changed. } diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index bbee94d3..b4591914 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -79,7 +79,7 @@ namespace msg::fbuf::p2pmsg hasher.add(msg.state_hash()); hasher.add(msg.patch_hash()); hasher.add(msg.last_primary_shard_id()); - hasher.add(msg.last_blob_shard_id()); + hasher.add(msg.last_raw_shard_id()); return crypto::verify(hasher.hash(), flatbuf_bytes_to_sv(msg.sig()), pubkey) == 0; } @@ -139,7 +139,7 @@ namespace msg::fbuf::p2pmsg p.state_hash = flatbuf_bytes_to_sv(msg.state_hash()); p.patch_hash = flatbuf_bytes_to_sv(msg.patch_hash()); p.last_primary_shard_id = flatbuf_seqhash_to_seqhash(msg.last_primary_shard_id()); - p.last_blob_shard_id = flatbuf_seqhash_to_seqhash(msg.last_blob_shard_id()); + p.last_raw_shard_id = flatbuf_seqhash_to_seqhash(msg.last_raw_shard_id()); if (msg.users()) p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); @@ -314,7 +314,7 @@ namespace msg::fbuf::p2pmsg hasher.add(p.state_hash); hasher.add(p.patch_hash); hasher.add(p.last_primary_shard_id); - hasher.add(p.last_blob_shard_id); + hasher.add(p.last_raw_shard_id); return crypto::sign(hasher.hash(), conf::cfg.node.private_key); } @@ -391,7 +391,7 @@ namespace msg::fbuf::p2pmsg hash_to_flatbuf_bytes(builder, p.state_hash), hash_to_flatbuf_bytes(builder, p.patch_hash), seqhash_to_flatbuf_seqhash(builder, p.last_primary_shard_id), - seqhash_to_flatbuf_seqhash(builder, p.last_blob_shard_id)); + seqhash_to_flatbuf_seqhash(builder, p.last_raw_shard_id)); create_p2p_msg(builder, P2PMsgContent_ProposalMsg, msg.Union()); } diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index 64c2c897..ced6cc96 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -872,7 +872,7 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_STATE_HASH = 24, VT_PATCH_HASH = 26, VT_LAST_PRIMARY_SHARD_ID = 28, - VT_LAST_BLOB_SHARD_ID = 30 + VT_LAST_RAW_SHARD_ID = 30 }; const flatbuffers::Vector *pubkey() const { return GetPointer *>(VT_PUBKEY); @@ -952,11 +952,11 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { msg::fbuf::p2pmsg::SequenceHash *mutable_last_primary_shard_id() { return GetPointer(VT_LAST_PRIMARY_SHARD_ID); } - const msg::fbuf::p2pmsg::SequenceHash *last_blob_shard_id() const { - return GetPointer(VT_LAST_BLOB_SHARD_ID); + const msg::fbuf::p2pmsg::SequenceHash *last_raw_shard_id() const { + return GetPointer(VT_LAST_RAW_SHARD_ID); } - msg::fbuf::p2pmsg::SequenceHash *mutable_last_blob_shard_id() { - return GetPointer(VT_LAST_BLOB_SHARD_ID); + msg::fbuf::p2pmsg::SequenceHash *mutable_last_raw_shard_id() { + return GetPointer(VT_LAST_RAW_SHARD_ID); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && @@ -985,8 +985,8 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { verifier.VerifyVector(patch_hash()) && VerifyOffset(verifier, VT_LAST_PRIMARY_SHARD_ID) && verifier.VerifyTable(last_primary_shard_id()) && - VerifyOffset(verifier, VT_LAST_BLOB_SHARD_ID) && - verifier.VerifyTable(last_blob_shard_id()) && + VerifyOffset(verifier, VT_LAST_RAW_SHARD_ID) && + verifier.VerifyTable(last_raw_shard_id()) && verifier.EndTable(); } }; @@ -1034,8 +1034,8 @@ struct ProposalMsgBuilder { void add_last_primary_shard_id(flatbuffers::Offset last_primary_shard_id) { fbb_.AddOffset(ProposalMsg::VT_LAST_PRIMARY_SHARD_ID, last_primary_shard_id); } - void add_last_blob_shard_id(flatbuffers::Offset last_blob_shard_id) { - fbb_.AddOffset(ProposalMsg::VT_LAST_BLOB_SHARD_ID, last_blob_shard_id); + void add_last_raw_shard_id(flatbuffers::Offset last_raw_shard_id) { + fbb_.AddOffset(ProposalMsg::VT_LAST_RAW_SHARD_ID, last_raw_shard_id); } explicit ProposalMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { @@ -1064,10 +1064,10 @@ inline flatbuffers::Offset CreateProposalMsg( flatbuffers::Offset> state_hash = 0, flatbuffers::Offset> patch_hash = 0, flatbuffers::Offset last_primary_shard_id = 0, - flatbuffers::Offset last_blob_shard_id = 0) { + flatbuffers::Offset last_raw_shard_id = 0) { ProposalMsgBuilder builder_(_fbb); builder_.add_time(time); - builder_.add_last_blob_shard_id(last_blob_shard_id); + builder_.add_last_raw_shard_id(last_raw_shard_id); builder_.add_last_primary_shard_id(last_primary_shard_id); builder_.add_patch_hash(patch_hash); builder_.add_state_hash(state_hash); @@ -1098,7 +1098,7 @@ inline flatbuffers::Offset CreateProposalMsgDirect( const std::vector *state_hash = nullptr, const std::vector *patch_hash = nullptr, flatbuffers::Offset last_primary_shard_id = 0, - flatbuffers::Offset last_blob_shard_id = 0) { + flatbuffers::Offset last_raw_shard_id = 0) { auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; auto sig__ = sig ? _fbb.CreateVector(*sig) : 0; auto nonce__ = nonce ? _fbb.CreateVector(*nonce) : 0; @@ -1123,7 +1123,7 @@ inline flatbuffers::Offset CreateProposalMsgDirect( state_hash__, patch_hash__, last_primary_shard_id, - last_blob_shard_id); + last_raw_shard_id); } struct NplMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index 3144f0ed..2299ff23 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -487,7 +487,7 @@ namespace msg::usrmsg::json msg += msg::usrmsg::FLD_RESULTS; msg += "\":["; if (result.index() == 1) - populate_ledger_query_results(msg, std::get>(result)); + populate_ledger_query_results(msg, std::get>(result)); msg += "]}"; } @@ -763,7 +763,7 @@ namespace msg::usrmsg::json * "id": "", * "filter_by": "", * "params": {...}, // Params supported by the specified filter. - * "include": ["raw_inputs", "raw_outputs"] + * "include": ["inputs", "outputs"] * } * @return 0 on successful extraction. -1 for failure. */ @@ -792,14 +792,14 @@ namespace msg::usrmsg::json extracted_id = std::move(id); // Detect includes. - bool raw_inputs = false; - bool raw_outputs = false; + bool inputs = false; + bool outputs = false; for (auto &val : d[msg::usrmsg::FLD_INCLUDE].array_range()) { - if (val == msg::usrmsg::FLD_RAW_INPUTS) - raw_inputs = true; - else if (val == msg::usrmsg::FLD_RAW_OUTPUTS) - raw_outputs = true; + if (val == msg::usrmsg::FLD_INPUTS) + inputs = true; + else if (val == msg::usrmsg::FLD_OUTPUTS) + outputs = true; } auto ¶ms_field = d[msg::usrmsg::FLD_PARAMS]; @@ -814,8 +814,8 @@ namespace msg::usrmsg::json extracted_query = ledger::query::seq_no_query{ params_field[msg::usrmsg::FLD_SEQ_NO].as(), - raw_inputs, - raw_outputs}; + inputs, + outputs}; return 0; } else @@ -883,96 +883,124 @@ namespace msg::usrmsg::json } } - void populate_ledger_query_results(std::vector &msg, const std::vector &results) + void populate_ledger_query_results(std::vector &msg, const std::vector &results) { for (size_t i = 0; i < results.size(); i++) { - const ledger::query::query_result_record &r = results[i]; + const ledger::ledger_record &ledger = results[i]; msg += "{\""; msg += msg::usrmsg::FLD_SEQ_NO; msg += SEP_COLON_NOQUOTE; - msg += std::to_string(r.ledger.seq_no); + msg += std::to_string(ledger.seq_no); msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_TIMESTAMP; msg += SEP_COLON_NOQUOTE; - msg += std::to_string(r.ledger.timestamp); + msg += std::to_string(ledger.timestamp); msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.ledger_hash); + msg += util::to_hex(ledger.ledger_hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_PREV_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.prev_ledger_hash); + msg += util::to_hex(ledger.prev_ledger_hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_STATE_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.state_hash); + msg += util::to_hex(ledger.state_hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_CONFIG_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.config_hash); + msg += util::to_hex(ledger.config_hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_USER_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.user_hash); + msg += util::to_hex(ledger.user_hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_INPUT_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.input_hash); + msg += util::to_hex(ledger.input_hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_OUTPUT_HASH; msg += SEP_COLON; - msg += util::to_hex(r.ledger.output_hash); + msg += util::to_hex(ledger.output_hash); msg += "\""; // If raw inputs or outputs is not requested, we don't include that field at all in the response. // Otherwise the field will always contain an array (empty array if no data). - if (r.raw_inputs) + if (ledger.inputs) { msg += SEP_COMMA_NOQUOTE; - msg += msg::usrmsg::FLD_RAW_INPUTS; + msg += msg::usrmsg::FLD_INPUTS; msg += SEP_COLON_NOQUOTE; - populate_ledger_blob_map(msg, *r.raw_inputs); + populate_ledger_inputs(msg, *ledger.inputs); } - if (r.raw_outputs) + if (ledger.outputs) { msg += SEP_COMMA_NOQUOTE; - msg += msg::usrmsg::FLD_RAW_OUTPUTS; + msg += msg::usrmsg::FLD_OUTPUTS; msg += SEP_COLON_NOQUOTE; - populate_ledger_blob_map(msg, *r.raw_outputs); + populate_ledger_outputs(msg, *ledger.outputs); } msg += (i == (results.size() - 1) ? "}" : "},"); } } - void populate_ledger_blob_map(std::vector &msg, const ledger::query::blob_map &blob_map) + void populate_ledger_inputs(std::vector &msg, const std::vector &inputs) { msg += "["; - for (auto itr = blob_map.begin(); itr != blob_map.end();) + for (auto itr = inputs.begin(); itr != inputs.end();) { msg += "{\""; msg += msg::usrmsg::FLD_PUBKEY; msg += SEP_COLON; - msg += util::to_hex(itr->first); + msg += util::to_hex(itr->pubkey); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_HASH; + msg += SEP_COLON; + msg += util::to_hex(itr->hash); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_BLOB; + msg += SEP_COLON; + msg += util::to_hex(itr->blob); + + itr++; + msg += (itr == inputs.end() ? "\"}" : "\"},"); + } + msg += "]"; + } + + void populate_ledger_outputs(std::vector &msg, const std::vector &users) + { + msg += "["; + for (auto itr = users.begin(); itr != users.end();) + { + msg += "{\""; + msg += msg::usrmsg::FLD_PUBKEY; + msg += SEP_COLON; + msg += util::to_hex(itr->pubkey); + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_HASH; + msg += SEP_COLON; + msg += util::to_hex(itr->hash); msg += SEP_COMMA; msg += msg::usrmsg::FLD_BLOBS; msg += "\":["; - - const std::vector &blobs = itr->second; - for (size_t i = 0; i < blobs.size(); i++) + for (auto o_itr = itr->outputs.begin(); o_itr != itr->outputs.end();) { msg += "\""; - msg += util::to_hex(blobs[i]); - msg += (i == (blobs.size() - 1) ? "\"" : "\","); + msg += util::to_hex(*o_itr); + + o_itr++; + msg += (o_itr == itr->outputs.end() ? "\"" : "\","); } itr++; - msg += (itr == blob_map.end() ? "]}" : "]},"); + msg += (itr == users.end() ? "]}" : "]},"); } msg += "]"; } diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 4bfbcae7..0f7a1752 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -49,9 +49,11 @@ namespace msg::usrmsg::json void populate_output_hash_array(std::vector &msg, const util::merkle_hash_node &node); - void populate_ledger_query_results(std::vector &msg, const std::vector &results); + void populate_ledger_query_results(std::vector &msg, const std::vector &results); - void populate_ledger_blob_map(std::vector &msg, const ledger::query::blob_map &blob_map); + void populate_ledger_inputs(std::vector &msg, const std::vector &inputs); + + void populate_ledger_outputs(std::vector &msg, const std::vector &users); } // namespace msg::usrmsg::json diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index 3dac89ef..e164a2e4 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -54,8 +54,8 @@ namespace msg::usrmsg constexpr const char *FLD_STATE_HASH = "state_hash"; constexpr const char *FLD_CONFIG_HASH = "config_hash"; constexpr const char *FLD_USER_HASH = "user_hash"; - constexpr const char *FLD_RAW_INPUTS = "raw_inputs"; - constexpr const char *FLD_RAW_OUTPUTS = "raw_outputs"; + constexpr const char *FLD_INPUTS = "inputs"; + constexpr const char *FLD_BLOB = "blob"; constexpr const char *FLD_BLOBS = "blobs"; // Message types diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 70c2ee16..0665f980 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -74,7 +74,7 @@ namespace p2p uint32_t roundtime = 0; // Roundtime of the proposer. std::string nonce; // Random nonce that is used to reduce lcl predictability. sequence_hash last_primary_shard_id; - sequence_hash last_blob_shard_id; + sequence_hash last_raw_shard_id; util::h32 state_hash; // Contract state hash. util::h32 patch_hash; // Patch file hash. std::set users; diff --git a/src/sc/hpfs_log_sync.cpp b/src/sc/hpfs_log_sync.cpp index 8ef973ed..aa38fc6c 100644 --- a/src/sc/hpfs_log_sync.cpp +++ b/src/sc/hpfs_log_sync.cpp @@ -386,7 +386,7 @@ namespace sc::hpfs_log_sync if (db != NULL) ledger::sqlite::close_db(&db); - if (ledger::sqlite::open_db(shard_path + "/" + ledger::DATABASE, &db) == -1) + if (ledger::sqlite::open_db(shard_path + "/" + ledger::PRIMARY_DB, &db) == -1) { LOG_ERROR << errno << ": Error openning the shard database, shard: " << shard_seq_no; ledger::ledger_fs.stop_ro_session(session_name); diff --git a/src/util/util.cpp b/src/util/util.cpp index 63a9432c..11465de0 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -329,7 +329,7 @@ namespace util } /** - * Reads from a given file discriptor. + * Reads the entire file from given file discriptor. * @param fd File descriptor to be read. * @param buf String buffer to be populated. * @param offset Begin offset of the file to read. @@ -339,13 +339,42 @@ namespace util { struct stat st; if (fstat(fd, &st) == -1) + { + LOG_ERROR << errno << ": Error in stat for reading entire file."; return -1; + } buf.resize(st.st_size - offset); return pread(fd, buf.data(), buf.size(), offset); } + /** + * Reads the specified portion from the given file descriptor. + * @param fd File descriptor to be read. + * @param buf Buffer to populate. + * @param size How many bytes to read. + * @param offset Offset position in the file to start reading. + * @param file_name File name to print in error log. + * @return 0 on success. -1 on failure or when specified buffer size could not be read. + */ + int read_from_fd(const int fd, void *buf, const size_t size, const off_t offset, std::string_view file_name) + { + const int res = pread(fd, buf, size, offset); + if (res == -1) + { + LOG_ERROR << errno << ": Error when reading " << file_name; + return -1; + } + else if (res < size) + { + LOG_ERROR << "Not enough bytes read from " << file_name; + return -1; + } + + return 0; + } + /** * Create a record lock for the file descriptor. Lock is associated with the process (Not for forked child processes). * @param fd File descriptor to be locked. diff --git a/src/util/util.hpp b/src/util/util.hpp index ac9fafe1..fcf46cef 100644 --- a/src/util/util.hpp +++ b/src/util/util.hpp @@ -63,6 +63,8 @@ namespace util int read_from_fd(const int fd, std::string &buf, const off_t offset = 0); + int read_from_fd(const int fd, void *buf, const size_t size, const off_t offset, std::string_view file_name); + int set_lock(const int fd, struct flock &lock, const bool is_rwlock, const off_t start, const off_t len); int release_lock(const int fd, struct flock &lock); diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index cbc5b5ff..c5b7ca46 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -87,7 +87,7 @@ do history: 'custom',\ history_config: {\ max_primary_shards: 4,\ - max_blob_shards: 4\ + max_raw_shards: 4\ }\ }, null, 2)")