From d4646179c22c14555846e65df3bb11f717301f1e Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Wed, 7 Apr 2021 21:22:29 +0530 Subject: [PATCH] Returned input hash and ledger info for input submissions. (#283) - Introduced input hash which can later be used to query the ledger. - Returned input hash and ledger info at input submission. - Updated js client lib to support input hash. - Updated consensus proposal candidate expiration rules. --- examples/js_client/browser-example.html | 11 +- examples/js_client/file-client.js | 18 +- examples/js_client/hp-client-lib.js | 106 ++++--- examples/js_client/text-client.js | 13 +- src/consensus.cpp | 361 +++++++++++++++--------- src/consensus.hpp | 53 +++- src/crypto.cpp | 45 +-- src/crypto.hpp | 5 +- src/ledger/ledger.cpp | 171 ++++++----- src/ledger/ledger.hpp | 7 +- src/ledger/ledger_common.hpp | 4 +- src/ledger/sqlite.cpp | 2 +- src/msg/bson/usrmsg_bson.cpp | 46 ++- src/msg/bson/usrmsg_bson.hpp | 4 +- src/msg/fbuf/p2pmsg_conversion.cpp | 6 +- src/msg/json/usrmsg_json.cpp | 54 +++- src/msg/json/usrmsg_json.hpp | 4 +- src/msg/usrmsg_common.hpp | 6 +- src/msg/usrmsg_parser.cpp | 14 +- src/msg/usrmsg_parser.hpp | 8 +- src/p2p/p2p.hpp | 2 +- src/sc/sc.cpp | 5 - src/sc/sc.hpp | 5 + src/usr/input_nonce_map.cpp | 6 +- src/usr/input_nonce_map.hpp | 2 +- src/usr/user_input.hpp | 2 +- src/usr/usr.cpp | 65 +++-- src/usr/usr.hpp | 11 +- src/util/buffer_store.cpp | 5 +- src/util/merkle_hash_tree.cpp | 2 +- src/util/util.cpp | 8 + src/util/util.hpp | 2 + test/metrics/metrics.js | 8 +- 33 files changed, 668 insertions(+), 393 deletions(-) diff --git a/examples/js_client/browser-example.html b/examples/js_client/browser-example.html index 864ee936..f1c9f35e 100644 --- a/examples/js_client/browser-example.html +++ b/examples/js_client/browser-example.html @@ -80,10 +80,13 @@ console.log('HotPocket Connected.'); hpc.sendContractReadRequest("Hello"); - hpc.sendContractInput("World!").then(status => { - if (status != "ok") - console.log(status); - }); + + const input = await hpc.submitContractInput("World!") + + // console.log(input.hash); + const r = await input.submissionStatus; + if (r.status != "accepted") + console.log(r.reason); // When we need to close HP connection: // await hpc.close(); diff --git a/examples/js_client/file-client.js b/examples/js_client/file-client.js index ef2006eb..5cfa6272 100644 --- a/examples/js_client/file-client.js +++ b/examples/js_client/file-client.js @@ -78,7 +78,7 @@ async function main() { console.log("Unknown read request result."); } }) - + console.log("Ready to accept inputs."); const input_pump = () => { @@ -92,25 +92,27 @@ async function main() { const sizeKB = Math.round(fileContent.length / 1024); console.log("Uploading file " + fileName + " (" + sizeKB + " KB)"); - const submissionStatus = await hpc.sendContractInput(bson.serialize({ + const input = await hpc.submitContractInput(bson.serialize({ type: "upload", fileName: fileName, content: fileContent - }), null, 100); + })); - if (submissionStatus && submissionStatus != "ok") - console.log("Upload failed. reason: " + submissionStatus); + const submission = await input.submissionStatus; + if (submission.status != "accepted") + console.log("Upload failed. reason: " + submission.reason); } else if (inp.startsWith("delete ")) { const fileName = inp.substr(7); - const submissionStatus = await hpc.sendContractInput(bson.serialize({ + const input = await hpc.submitContractInput(bson.serialize({ type: "delete", fileName: fileName })); - if (submissionStatus && submissionStatus != "ok") - console.log("Delete failed. reason: " + submissionStatus); + const submission = await input.submissionStatus; + if (submission.status != "accepted") + console.log("Delete failed. reason: " + submission.reason); } else if (inp.startsWith("download ")) { diff --git a/examples/js_client/hp-client-lib.js b/examples/js_client/hp-client-lib.js index 527dbcb5..d8ae2937 100644 --- a/examples/js_client/hp-client-lib.js +++ b/examples/js_client/hp-client-lib.js @@ -321,9 +321,9 @@ emitter.clear(event); } - this.sendContractInput = (input, nonce = null, maxLclOffset = null) => { + this.submitContractInput = (input, nonce = null, maxLedger = null, isOffset = true) => { // We always only submit contract input to a single node (even if we are connected to multiple nodes). - return getMultiConnectionResult(con => con.sendContractInput(input, nonce, maxLclOffset), 1); + return getMultiConnectionResult(con => con.submitContractInput(input, nonce, maxLedger, isOffset), 1); } this.sendContractReadRequest = (request) => { @@ -356,7 +356,7 @@ let handshakeResolver = null; let closeResolver = null; let statResponseResolvers = []; - let contractInputResolvers = {}; + let contractInputResolvers = {}; // Contract input status-awaiting resolvers (keyed by input hash). let ledgerQueryResolvers = {}; // Message resolvers that uses request/reply associations. // Calcualtes the blake3 hash of all array items. @@ -536,14 +536,21 @@ emitter && emitter.emit(events.contractReadResponse, msgHelper.deserializeOutput(m.content)); } else if (m.type == "contract_input_status") { - const sigKey = msgHelper.stringifyValue(m.input_sig); - const resolver = contractInputResolvers[sigKey]; + const inputHashHex = msgHelper.stringifyValue(m.input_hash); + const resolver = contractInputResolvers[inputHashHex]; if (resolver) { - if (m.status == "accepted") - resolver("ok"); - else - resolver(m.reason); - delete contractInputResolvers[sigKey]; + const result = { status: m.status } + + if (m.status == "accepted") { + result.ledgerSeqNo = m.ledger_seq_no; + result.ledgerHash = m.ledger_hash; + } + else { + result.reason = m.reason; + } + + resolver(result); + delete contractInputResolvers[inputHashHex]; } } else if (m.type == "contract_output") { @@ -679,7 +686,10 @@ statResponseResolvers.forEach(resolver => resolver(null)); statResponseResolvers = []; - Object.values(contractInputResolvers).forEach(resolver => resolver(null)); + Object.values(contractInputResolvers).forEach(resolver => resolver({ + status: "failed", + reason: "connection_error" + })); contractInputResolvers = {}; this.onClose && this.onClose(); @@ -745,33 +755,51 @@ return p; } - this.sendContractInput = async (input, nonce = null, maxLclOffset = null) => { + this.submitContractInput = async (input, nonce, maxLedger, isOffset) => { if (connectionStatus != 2) - return Promise.resolve(null); - - if (!maxLclOffset) - maxLclOffset = 10; + throw "Connection error."; + if (maxLedger == 0) + throw "Max ledger seq no. or offset cannot be 0."; + if (!isOffset && !maxLedger) + throw "Max ledger seq. no not specified."; + // Use time-based incrementing nonce if not specified. if (!nonce) nonce = (new Date()).getTime().toString(); else nonce = nonce.toString(); - // Acquire the current lcl and add the specified offset. - const stat = await this.getStatus(); - if (!stat) - return new Promise(resolve => resolve("ledger_status_error")); - const maxLclSeqNo = stat.lclSeqNo + maxLclOffset; + // If max ledger is specified as offset, we need to get current ledger status and add the offset to it. + if (isOffset) { + if (!maxLedger) + maxLedger = 10; // Default offset applied if not specified. - const msg = msgHelper.createContractInput(input, nonce, maxLclSeqNo); - const sigKey = msgHelper.stringifyValue(msg.sig); + // Acquire the current ledger status and add the specified offset. + const stat = await this.getStatus(); + if (!stat) + throw "Error retrieving ledger status." + + maxLedger += stat.lclSeqNo; + } + + const inp = msgHelper.createContractInputComponents(input, nonce, maxLedger); + + const inputHashHex = msgHelper.stringifyValue(inp.hash); + + // Start waiting for this input's accept/rejected status response. const p = new Promise(resolve => { - contractInputResolvers[sigKey] = resolve; + contractInputResolvers[inputHashHex] = resolve; }); + const msg = msgHelper.createContractInputMessage(inp.container, inp.sig); wsSend(msgHelper.serializeObject(msg)); - return p; + + // We return the input hash and a promise which can be resolved to get the input submission status. + return { + hash: msgHelper.binaryEncode(inp.hash), + submissionStatus: p + }; } this.sendContractReadRequest = (request) => { @@ -862,7 +890,8 @@ } } - this.createContractInput = (input, nonce, maxLclSeqNo) => { + // Creates a signed contract input components + this.createContractInputComponents = (input, nonce, maxLedgerSeqNo) => { if (input.length == 0) return null; @@ -870,19 +899,30 @@ const inpContainer = { input: this.serializeInput(input), nonce: nonce, - max_lcl_seq_no: maxLclSeqNo + max_ledger_seq_no: maxLedgerSeqNo } const serlializedInpContainer = this.serializeObject(inpContainer); const sigBytes = sodium.crypto_sign_detached(serlializedInpContainer, keys.privateKey.slice(1)); - const signedInpContainer = { - type: "contract_input", - input_container: serlializedInpContainer, - sig: this.binaryEncode(sigBytes) - } + // Input hash is the blake3 hash of the input signature. + // The input hash can later be used to query input details from the ledger. + const inputHash = new Uint8Array(blake3.hash(sigBytes)); - return signedInpContainer; + return { + hash: inputHash, + container: serlializedInpContainer, + sig: sigBytes + } + } + + this.createContractInputMessage = (container, sig) => { + + return { + type: "contract_input", + input_container: container, + sig: this.binaryEncode(sig) + } } this.createReadRequest = (request) => { diff --git a/examples/js_client/text-client.js b/examples/js_client/text-client.js index 97faec99..39a818c2 100644 --- a/examples/js_client/text-client.js +++ b/examples/js_client/text-client.js @@ -1,6 +1,7 @@ const readline = require('readline'); const HotPocket = require('./hp-client-lib'); + async function main() { const keys = await HotPocket.generateKeys(); @@ -93,10 +94,14 @@ async function main() { if (inp.startsWith("read ")) hpc.sendContractReadRequest(inp.substr(5)); else { - hpc.sendContractInput(inp).then(status => { - if (status != "ok") - console.log(status); - }); + hpc.submitContractInput(inp).then(input => { + // console.log(input.hash); + input.submissionStatus.then(s => { + if (s.status != "accepted") + console.log(s.reason); + }); + }) + } } diff --git a/src/consensus.cpp b/src/consensus.cpp index 10714a1d..795029d1 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -116,12 +116,16 @@ namespace consensus const p2p::sequence_hash last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); const p2p::sequence_hash last_blob_shard_id = ledger::ctx.get_last_blob_shard_id(); - if (ctx.stage == 0) + if (ctx.stage == 0 || ctx.stage == 2) { // Broadcast non-unl proposal (NUP) containing inputs from locally connected users. - // This is performed at stage 0, so we can to make sure this happens regardless of whether we are in-sync or not. + // This is performed at stage 0 so we can to make sure this happens regardless of whether we are in-sync or not. + // This is also performed at stage 2, so the next round receives the inputs before it starts. broadcast_nonunl_proposal(); + } + if (ctx.stage == 0) + { // Prepare the consensus candidate user inputs that we have accumulated so far. (We receive them periodically via NUPs) // The candidate inputs will be included in the stage 0 proposal. if (verify_and_populate_candidate_user_inputs(lcl_id.seq_no) == -1) @@ -161,8 +165,17 @@ namespace consensus broadcast_proposal(p); // Upon successful consensus at stage 3, update the ledger and execute the contract using the consensus proposal. - if (ctx.stage == 3 && update_ledger_and_execute_contract(p, state_hash, patch_hash, lcl_id) == -1) - LOG_ERROR << "Error occured in Stage 3 consensus execution."; + if (ctx.stage == 3) + { + consensed_user_map consensed_users; + if (prepare_consensed_users_and_inputs(consensed_users, p) == -1 || + update_ledger(p, consensed_users, patch_hash, lcl_id) == -1 || + execute_contract(p, consensed_users, lcl_id) == -1) + LOG_ERROR << "Error occured in Stage 3 consensus execution."; + + // Cleanup any buffers occupied by consensed inputs regardless of any errors occured. + purge_user_input_buffers(consensed_users); + } } // We have finished a consensus stage. Transition or reset stage based on sync status. @@ -320,20 +333,21 @@ namespace consensus while (itr != ctx.candidate_proposals.end()) { const p2p::proposal &cp = itr->second; - const uint64_t time_diff = (time_now > cp.sent_timestamp) ? (time_now - cp.sent_timestamp) : 0; const int8_t stage_diff = ctx.stage - cp.stage; - // only consider recent proposals and proposals from previous stage and current stage. - const bool keep_candidate = (time_diff < (conf::cfg.contract.roundtime * 4)) && (stage_diff == -3 || stage_diff <= 1); + // Only consider this round's proposals which are from previous stage. + const bool keep_candidate = (ctx.round_start_time == cp.time) && (stage_diff == 1); LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased") << " [s" << std::to_string(cp.stage) << "] u/i:" << cp.users.size() - << "/" << cp.input_hashes.size() - << " ts:" << std::to_string(cp.time) + << "/" << cp.input_ordered_hashes.size() + << " ts:" << cp.time << " state:" << cp.state_hash << " patch:" << cp.patch_hash + << " lps:" << cp.last_primary_shard_id + << " lbs:" << cp.last_blob_shard_id << " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]" - << "(" << std::to_string(cp.recv_timestamp > cp.sent_timestamp ? cp.recv_timestamp - cp.sent_timestamp : 0) << "ms)"; + << "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)"; if (keep_candidate) ++itr; @@ -342,6 +356,70 @@ namespace consensus } } + /** + * Prepare the consensed user map including the consensed user inputs based on the consensus proposal. + * @param consensed_users The consensed user map to populate. + * @param cons_prop The proposal that reached consensus. + * @return 0 on success. -1 on failure. + */ + int prepare_consensed_users_and_inputs(consensed_user_map &consensed_users, const p2p::proposal &cons_prop) + { + // Prepare consensed user input set by joining consensus proposal input ordered hashes and candidate user input set. + // consensed inputs are removed from the candidate set. + + int ret = 0; + + // Populate the users map with all consensed users regardless of whether they have inputs or not. + for (const std::string &pubkey : cons_prop.users) + consensed_users.try_emplace(pubkey, std::vector()); + + for (const std::string &ordered_hash : cons_prop.input_ordered_hashes) + { + // For each consensus input ordered hash, we need to find the candidate input. + const auto itr = ctx.candidate_user_inputs.find(ordered_hash); + const bool hash_found = (itr != ctx.candidate_user_inputs.end()); + if (!hash_found) + { + LOG_ERROR << "Input required but wasn't in our candidate inputs map, this will potentially cause desync."; + + // We set error return value but keep on moving candidate inputs to consensed inputs. + // This is so that their underlying buffers can get deallocated during stage 3 execution steps. + ret = -1; + } + else + { + candidate_user_input &ci = itr->second; + consensed_users[ci.user_pubkey].emplace_back(ordered_hash, ci.input, ci.protocol); + + // Erase the consensed input from the candidate set. + ctx.candidate_user_inputs.erase(itr); + } + } + + return ret; + } + + /** + * Purges the underyling buffers that belong to provided consensed user inputs. + * @param consensed_users The consensed user map that contains input pointers. + * @return 0 on success. -1 on failure. + */ + int purge_user_input_buffers(const consensed_user_map &consensed_users) + { + int ret = 0; + + for (const auto &[pubkey, inputs] : consensed_users) + { + for (const consensed_user_input &ci : inputs) + { + if (usr::input_store.purge(ci.input) == -1) + ret = -1; + } + } + + return ret; + } + /** * Syncrhonise the stage/round time for fixed intervals and reset the stage. * @return True if consensus can proceed in the current round. False if stage is reset. @@ -445,8 +523,8 @@ namespace consensus p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_consensus_public); LOG_DEBUG << "Proposed u/i:" << p.users.size() - << "/" << p.input_hashes.size() - << " ts:" << std::to_string(p.time) + << "/" << p.input_ordered_hashes.size() + << " ts:" << p.time << " state:" << p.state_hash << " patch:" << p.patch_hash << " last_primary_shard_id:" << p.last_primary_shard_id @@ -485,9 +563,9 @@ namespace consensus */ int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) { - // Maintains users and any input-acceptance responses we should send to them. + // Maintains users and any input-rejected responses we should send to them. // Key: user pubkey. Value: List of responses for that user. - std::unordered_map> responses; + std::unordered_map> rejections; // Maintains merged list of users with each user's inputs grouped under the user. // Key: user pubkey, Value: List of inputs from the user. @@ -527,7 +605,7 @@ namespace consensus if (reject_reason == NULL) extracted_inputs.push_back(std::move(extracted)); else - responses[pubkey].push_back(usr::input_status_response{submitted_input.protocol, submitted_input.sig, reject_reason}); + rejections[pubkey].push_back(usr::input_status_response{submitted_input.protocol, crypto::get_hash(submitted_input.sig), reject_reason}); } // This will sort the inputs in nonce order so the validation will follow the same order on all nodes. @@ -540,26 +618,32 @@ namespace consensus for (const usr::extracted_user_input &extracted_input : extracted_inputs) { util::buffer_view stored_input; // Contains pointer to the input data stored in memfd accessed by the contract. - std::string hash; + std::string ordered_hash; // Validate the input against all submission criteria. - const char *reject_reason = usr::validate_user_input_submission(pubkey, extracted_input, lcl_seq_no, total_input_size, hash, stored_input); + const char *reject_reason = usr::validate_user_input_submission(pubkey, extracted_input, lcl_seq_no, total_input_size, ordered_hash, stored_input); if (reject_reason == NULL && !stored_input.is_null()) { // No reject reason means we should go ahead and subject the input to consensus. ctx.candidate_user_inputs.try_emplace( - hash, - candidate_user_input(pubkey, stored_input, extracted_input.max_lcl_seq_no)); + ordered_hash, + candidate_user_input(pubkey, stored_input, extracted_input.max_ledger_seq_no, extracted_input.protocol)); } - responses[pubkey].push_back(usr::input_status_response{extracted_input.protocol, extracted_input.sig, reject_reason}); + // If the input was rejected we need to inform the user. + if (reject_reason != NULL) + { + // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. + const std::string input_hash = std::string(util::get_string_suffix(ordered_hash, BLAKE3_OUT_LEN)); + rejections[pubkey].push_back(usr::input_status_response{extracted_input.protocol, std::move(input_hash), reject_reason}); + } } } input_groups.clear(); - usr::send_input_status_responses(responses); + usr::send_input_status_responses(rejections); return 0; } @@ -583,7 +667,7 @@ namespace consensus // Populate the proposal with hashes of user inputs. for (const auto &[hash, cand_input] : ctx.candidate_user_inputs) - p.input_hashes.emplace(hash); + p.input_ordered_hashes.emplace(hash); // Populate the output hash and our signature. This is the merkle tree root hash of user outputs and state hash. p.output_hash = ctx.user_outputs_hashtree.root_hash(); @@ -625,9 +709,9 @@ namespace consensus increment(votes.users, pubkey); // Vote for user inputs (hashes). Only vote for the inputs that are in our candidate_inputs set. - for (const std::string &hash : cp.input_hashes) - if (ctx.candidate_user_inputs.count(hash) > 0) - increment(votes.inputs, hash); + for (const std::string &ordered_hash : cp.input_ordered_hashes) + if (ctx.candidate_user_inputs.count(ordered_hash) > 0) + increment(votes.inputs, ordered_hash); // Vote for contract output hash. increment(votes.output_hash, cp.output_hash); @@ -648,7 +732,7 @@ namespace consensus // Add inputs which have votes over stage threshold to proposal. for (const auto &[hash, numvotes] : votes.inputs) if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0)) - p.input_hashes.emplace(hash); + p.input_ordered_hashes.emplace(hash); // Reset required votes for majority votes. required_votes = ceil(MAJORITY_THRESHOLD * unl_count); @@ -800,7 +884,7 @@ namespace consensus /** * Check state hash against the winning and canonical state hash. * @param is_state_desync Flag to determine whether contract state is out of sync. - * @param majority_state_hash Consensused state hash. + * @param majority_state_hash consensed state hash. * @param votes The voting table. */ void check_state_votes(bool &is_state_desync, util::h32 &majority_state_hash, vote_counter &votes) @@ -826,7 +910,7 @@ namespace consensus /** * Check state hash against the winning and canonical state hash. * @param is_patch_desync Flag to determine whether patch file is out of sync. - * @param majority_patch_hash Consensused patch hash. + * @param majority_patch_hash consensed patch hash. * @param votes The voting table. */ void check_patch_votes(bool &is_patch_desync, util::h32 &majority_patch_hash, vote_counter &votes) @@ -850,16 +934,16 @@ namespace consensus } /** - * Update the ledger and execute the contract after consensus. + * Update the ledger after reaching consensus. * @param cons_prop The proposal that reached consensus. - * @param new_state_hash The state hash. + * @param consensed_users Consensed users and their inputs. * @param patch_hash The patch hash. * @param lcl_id Last lcl seq_no and hash. - * @param last_primary_shard_id Last primary shard id. + * @return 0 on success. -1 on error. */ - int update_ledger_and_execute_contract(const p2p::proposal &cons_prop, util::h32 &new_state_hash, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id) + int update_ledger(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id) { - if (ledger::save_ledger(cons_prop, ctx.candidate_user_inputs, ctx.generated_user_outputs) == -1) + if (ledger::save_ledger(cons_prop, consensed_users, ctx.generated_user_outputs) == -1) return -1; new_lcl_id = ledger::ctx.get_lcl_id(); @@ -867,6 +951,13 @@ namespace consensus LOG_INFO << "****Ledger created**** (lcl:" << new_lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; + // Send back the inputs "accepted" responses to the user. + if (dispatch_consensed_user_input_responses(consensed_users, new_lcl_id) == -1) + return -1; + + // Send any output from the previous consensus round to locally connected users. + dispatch_user_outputs(cons_prop, new_lcl_id); + // Apply consensed patch file changes to the hpcore runtime and hp.cfg. if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1) return -1; @@ -877,64 +968,107 @@ namespace consensus while (itr != ctx.candidate_user_inputs.end()) { if (itr->second.max_ledger_seq_no <= new_lcl_id.seq_no) + { + // Erase the candidate input along with its data buffer in the input store. + usr::input_store.purge(itr->second.input); ctx.candidate_user_inputs.erase(itr++); + } else + { ++itr; + } } } - // Send any output from the previous consensus round to locally connected users. - if (dispatch_user_outputs(cons_prop, new_lcl_id) == -1) - return -1; + return 0; + } + + /** + * Executes the contract after consensus. + * @param cons_prop The proposal that reached consensus. + * @param consensed_users Consensed users and their inputs. + * @param lcl_id Current lcl id of the node. + */ + int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + { + if (!conf::cfg.contract.execute || ctx.is_shutting_down) + return 0; - // Execute the contract - if (conf::cfg.contract.execute && !ctx.is_shutting_down) { + std::scoped_lock lock(ctx.contract_ctx_mutex); + ctx.contract_ctx.emplace(usr::input_store); + } + + sc::contract_execution_args &args = ctx.contract_ctx->args; + args.readonly = false; + args.time = cons_prop.time; + + // lcl to be passed to the contract. + args.lcl_id = lcl_id; + + // Populate contract user bufs. + feed_user_inputs_to_contract_bufmap(args.userbufs, consensed_users); + + if (sc::execute_contract(ctx.contract_ctx.value()) == -1) + { + LOG_ERROR << "Consensus contract execution failed."; + return -1; + } + + // Get the new state hash after contract execution. + const util::h32 &new_state_hash = args.post_execution_state_hash; + + // Update state hash in contract fs global hash tracker. + sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, new_state_hash); + + extract_user_outputs_from_contract_bufmap(args.userbufs); + + // Generate user output hash merkle tree and signature with state hash included. + if (!ctx.generated_user_outputs.empty()) + { + std::vector hashes; + for (const auto &[hash, output] : ctx.generated_user_outputs) + hashes.push_back(hash); + hashes.push_back(new_state_hash.to_string_view()); + ctx.user_outputs_hashtree.populate(hashes); + ctx.user_outputs_our_sig = crypto::sign(ctx.user_outputs_hashtree.root_hash(), conf::cfg.node.private_key); + } + + { + std::scoped_lock lock(ctx.contract_ctx_mutex); + ctx.contract_ctx.reset(); + } + + return 0; + } + + /** + * Dispatch acceptance status responses to consensed user inputs, if the recipients are connected to us locally. + * @param consensed_users The map of consensed users and their inputs. + * @param lcl_id The ledger the inputs got included in. + * @return 0 on success. -1 on failure. + */ + int dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + { + std::unordered_map> responses; + + for (const auto &[pubkey, inputs] : consensed_users) + { + if (inputs.empty()) + continue; + + const auto [itr, success] = responses.emplace(pubkey, std::vector()); + + for (const consensed_user_input &ci : inputs) { - std::scoped_lock lock(ctx.contract_ctx_mutex); - ctx.contract_ctx.emplace(usr::input_store); - } - - sc::contract_execution_args &args = ctx.contract_ctx->args; - args.readonly = false; - args.time = cons_prop.time; - - // lcl to be passed to the contract. - args.lcl_id = new_lcl_id; - - // Populate user bufs. - if (feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop) == -1) - return -1; - - if (sc::execute_contract(ctx.contract_ctx.value()) == -1) - { - LOG_ERROR << "Contract execution failed."; - return -1; - } - - // Update state hash in contract fs global hash tracker. - sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, args.post_execution_state_hash); - new_state_hash = args.post_execution_state_hash; - - extract_user_outputs_from_contract_bufmap(args.userbufs); - - // Generate user output hash merkle tree and signature with state hash included. - if (!ctx.generated_user_outputs.empty()) - { - std::vector hashes; - for (const auto &[hash, output] : ctx.generated_user_outputs) - hashes.push_back(hash); - hashes.push_back(new_state_hash.to_string_view()); - ctx.user_outputs_hashtree.populate(hashes); - ctx.user_outputs_our_sig = crypto::sign(ctx.user_outputs_hashtree.root_hash(), conf::cfg.node.private_key); - } - - { - std::scoped_lock lock(ctx.contract_ctx_mutex); - ctx.contract_ctx.reset(); + // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. + const std::string input_hash = std::string(util::get_string_suffix(ci.ordered_hash, BLAKE3_OUT_LEN)); + itr->second.push_back(usr::input_status_response{ci.protocol, input_hash, NULL}); } } + usr::send_input_status_responses(responses, lcl_id.seq_no, lcl_id.hash); + return 0; } @@ -943,7 +1077,7 @@ namespace consensus * @param cons_prop The proposal that achieved consensus. * @param lcl_id Lcl sequnce no hash info. */ - int dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id) + void dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id) { if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash()) { @@ -954,7 +1088,7 @@ namespace consensus for (auto &[hash, user_output] : ctx.generated_user_outputs) { // Find user to send by pubkey. - const auto user_itr = usr::ctx.users.find(user_output.userpubkey); + const auto user_itr = usr::ctx.users.find(user_output.user_pubkey); if (user_itr != usr::ctx.users.end()) // match found { const usr::connected_user &user = user_itr->second; @@ -988,48 +1122,26 @@ namespace consensus ctx.user_outputs_our_sig.clear(); ctx.user_outputs_unl_sig.clear(); ctx.generated_user_outputs.clear(); - - return 0; } /** * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. * @param bufmap The contract bufmap which needs to be populated with inputs. - * @param cons_prop The proposal that achieved consensus. + * @param consensed_users Set of consensed users keyed by user binary pubkey and any inputs. */ - int feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) + void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users) { - // Populate the buf map with all currently connected users regardless of whether they have inputs or not. - // This is in case the contract wanted to emit some data to a user without needing any input. - for (const std::string &pubkey : cons_prop.users) + for (const auto &[pubkey, inputs] : consensed_users) { - bufmap.try_emplace(pubkey, sc::contract_iobufs()); + // Populate the buf map with user pubkey regardless of whether user has any inputs or not. + // This is in case the contract wanted to emit some data to a user without needing any input. + const auto [itr, success] = bufmap.emplace(pubkey, sc::contract_iobufs()); + + // Populate the input contents into the bufmap. + // It's VERY important that we preserve the original input order when feeding to the contract as well. + for (const consensed_user_input &ci : inputs) + itr->second.inputs.push_back(ci.input); } - - for (const std::string &hash : cons_prop.input_hashes) - { - // For each consensus input hash, we need to find the actual input content to feed the contract. - const auto itr = ctx.candidate_user_inputs.find(hash); - const bool hashfound = (itr != ctx.candidate_user_inputs.end()); - if (!hashfound) - { - LOG_ERROR << "Input required but wasn't in our candidate inputs map, this will potentially cause desync."; - return -1; - } - else - { - // Populate the input content into the bufmap. - // It's VERY important that we preserve the proposal input hash order when feeding to the contract as well. - candidate_user_input &cand_input = itr->second; - sc::contract_iobufs &contract_user = bufmap[cand_input.user_pubkey]; - contract_user.inputs.push_back(cand_input.input); - - // Remove the input from the candidate set because we no longer need it. - ctx.candidate_user_inputs.erase(itr); - } - } - - return 0; } /** @@ -1039,24 +1151,21 @@ namespace consensus */ void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap) { - for (auto &[pubkey, bufs] : bufmap) + for (const auto &[pubkey, bufs] : bufmap) { + // For each user calculate the total hash of their outputs. + // Final hash for user = hash(pubkey + outputs...) + if (!bufs.outputs.empty()) { // Generate hash of all sorted outputs combined with user pubkey. - - std::vector vect; - for (sc::contract_output &output : bufs.outputs) - vect.push_back(output.message); - - // We sort all outputs so every node calculates the final hash the same way. - std::sort(vect.begin(), vect.end()); - - // Adding user public key. - vect.push_back(pubkey); + std::vector to_hash; + to_hash.push_back(pubkey); + for (const sc::contract_output &con_out : bufs.outputs) + to_hash.push_back(con_out.message); ctx.generated_user_outputs.try_emplace( - crypto::get_hash(vect), + crypto::get_list_hash(to_hash), generated_user_output(pubkey, std::move(bufs.outputs))); } } diff --git a/src/consensus.hpp b/src/consensus.hpp index 35212e77..4d7cdd07 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -14,29 +14,52 @@ namespace consensus { /** * Represents a contract input that takes part in consensus. + * This is used in a map keyed by input ordered hash. */ struct candidate_user_input { const std::string user_pubkey; - const uint64_t max_ledger_seq_no = 0; const util::buffer_view input; + const uint64_t max_ledger_seq_no = 0; + const util::PROTOCOL protocol; - candidate_user_input(const std::string user_pubkey, const util::buffer_view input, const uint64_t max_ledger_seq_no) - : user_pubkey(std::move(user_pubkey)), input(input), max_ledger_seq_no(max_ledger_seq_no) + candidate_user_input(const std::string &user_pubkey, const util::buffer_view input, const uint64_t max_ledger_seq_no, const util::PROTOCOL protocol) + : user_pubkey(user_pubkey), input(input), max_ledger_seq_no(max_ledger_seq_no), protocol(protocol) { } }; + /** + * Represents consensus reached user input. + * This is used in a map keyed by user pubkey. + */ + struct consensed_user_input + { + const std::string ordered_hash; // [nonce] + [input signature hash] + const util::buffer_view input; // The input data buffer pointer. + const util::PROTOCOL protocol; // json/bson protocol used by the user when submitting the input. + + consensed_user_input(const std::string &ordered_hash, const util::buffer_view input, const util::PROTOCOL protocol) + : ordered_hash(ordered_hash), input(input), protocol(protocol) + { + } + }; + + /** + * Consensed inputs map keyed by user binary pubkey. + */ + typedef std::map> consensed_user_map; + /** * Represents a contract-generated user output that takes part in consensus. */ struct generated_user_output { - const std::string userpubkey; + const std::string user_pubkey; std::list outputs; - generated_user_output(const std::string userpubkey, const std::list outputs) - : userpubkey(std::move(userpubkey)), outputs(std::move(outputs)) + generated_user_output(const std::string &user_pubkey, const std::list &&outputs) + : user_pubkey(user_pubkey), outputs(outputs) { } }; @@ -54,8 +77,8 @@ namespace consensus // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. std::set candidate_users; - // Map of candidate user inputs with input hash as map key. Inputs will stay here until they - // achieve consensus or expire (due to max_ledger_seq_no). Input hash is globally unique among inputs + // Map of candidate user inputs with ordered hash as map key. Inputs will stay here until they + // achieve consensus or expire (due to max_ledger_seq_no). Ordered hash is globally unique among inputs // from all users. We will use this map to feed inputs into the contract once consensus is achieved. std::map candidate_user_inputs; @@ -117,6 +140,10 @@ namespace consensus void revise_candidate_proposals(); + int prepare_consensed_users_and_inputs(consensed_user_map &consensed_users, const p2p::proposal &cons_prop); + + int purge_user_input_buffers(const consensed_user_map &consensed_users); + bool wait_and_proceed_stage(); void broadcast_nonunl_proposal(); @@ -147,11 +174,15 @@ namespace consensus uint64_t get_stage_time_resolution(const uint64_t time); - int update_ledger_and_execute_contract(const p2p::proposal &cons_prop, util::h32 &new_state_hash, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id); + int update_ledger(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id); - int dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id); + int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); - int feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); + int dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + + void dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id); + + void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users); void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); diff --git a/src/crypto.cpp b/src/crypto.cpp index b1156f47..9014f8c8 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -154,14 +154,17 @@ namespace crypto } /** - * Generates blake3 hash for the given string view vector using stream hashing. + * Generates blake3 hash for the given list of strings using stream hashing. + * @param str_list Any list container of list of strings or string_views. + * @return The combined blake32 hash of elements in listed order. */ - const std::string get_hash(const std::vector &sw_vect) + template + const std::string get_list_hash(const T &str_list) { std::string hash; hash.resize(BLAKE3_OUT_LEN); - - if (sw_vect.empty()) + + if (str_list.empty()) { return hash; } @@ -171,41 +174,17 @@ namespace crypto blake3_hasher_init(&hasher); // Hash is generated only using message in contract output struct. - for (std::string_view sw : sw_vect) - blake3_hasher_update(&hasher, reinterpret_cast(sw.data()), sw.length()); - - // Get the final hash. - blake3_hasher_finalize(&hasher, reinterpret_cast(hash.data()), hash.length()); - - return hash; - } - - /** - * Generates blake3 hash for the given string set using stream hashing. - */ - const std::string get_hash(const std::set &sw_set) - { - std::string hash; - hash.resize(BLAKE3_OUT_LEN); - - if (sw_set.empty()) - { - return hash; - } - - // Init stream hashing. - blake3_hasher hasher; - blake3_hasher_init(&hasher); - - // Hash is generated only using message in contract output struct. - for (std::string_view sw : sw_set) - blake3_hasher_update(&hasher, reinterpret_cast(sw.data()), sw.length()); + for (std::string_view sv : str_list) + blake3_hasher_update(&hasher, reinterpret_cast(sv.data()), sv.length()); // Get the final hash. blake3_hasher_finalize(&hasher, reinterpret_cast(hash.data()), hash.length()); return hash; } + template const std::string get_list_hash>(const std::set &str_list); + template const std::string get_list_hash>(const std::vector &str_list); + template const std::string get_list_hash>(const std::vector &str_list); const std::string generate_uuid() { diff --git a/src/crypto.hpp b/src/crypto.hpp index 963183d5..630eeb3e 100644 --- a/src/crypto.hpp +++ b/src/crypto.hpp @@ -29,9 +29,8 @@ namespace crypto const std::string get_hash(std::string_view s1, std::string_view s2); - const std::string get_hash(const std::vector &sw_vect); - - const std::string get_hash(const std::set &sw_set); + template + const std::string get_list_hash(const T &str_list); const std::string generate_uuid(); diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index 5eb945aa..c0041604 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -85,23 +85,21 @@ namespace ledger /** * Create and save ledger record from the given proposal message. * @param proposal Consensus-reached Stage 3 proposal. - * @param candidate_user_inputs Raw inputs received in this consensus round. + * @param consensed_users Users and their raw inputs received in this consensus round. * @param generated_user_outputs Generated raw outputs in this consensus round. * @return Returns 0 on success -1 on error. */ - int save_ledger(const p2p::proposal &proposal, const std::map &candidate_user_inputs, + int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const std::map &generated_user_outputs) { - const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); - uint64_t seq_no = lcl_id.seq_no; - const std::string prev_ledger_hash(lcl_id.hash.to_string_view()); - - seq_no++; // New lcl sequence number. - // Aqure hpfs rw session before accessing shards and insert ledger records. if (ledger_fs.acquire_rw_session() == -1) return -1; + const p2p::sequence_hash lcl_id = ctx.get_lcl_id(); + uint64_t seq_no = lcl_id.seq_no; + seq_no++; // New ledger sequence number. + sqlite3 *db = NULL; // Prepare shard folders and database and get the primary shard sequence number. @@ -109,62 +107,20 @@ namespace ledger if (prepare_shard(&db, primary_shard_seq_no, seq_no) == -1) LEDGER_CREATE_ERROR; - // Combined binary hash of consensus user binary pub keys. - const std::string user_hash = crypto::get_hash(proposal.users); - // Combined binary hash of consensus input hashes. - const std::string input_hash = crypto::get_hash(proposal.input_hashes); - - uint8_t seq_no_byte_str[8], time_byte_str[8]; - util::uint64_to_bytes(seq_no_byte_str, seq_no); - util::uint64_to_bytes(time_byte_str, proposal.time); - - // Contruct binary string for data hash. - - std::string data; - data.reserve(sizeof(seq_no_byte_str) + sizeof(time_byte_str) + (sizeof(util::h32) * 5)); - data.append((char *)seq_no_byte_str); - data.append((char *)time_byte_str); - data.append(proposal.state_hash.to_string_view()); - data.append(proposal.patch_hash.to_string_view()); - data.append(user_hash); - data.append(input_hash); - data.append(proposal.output_hash); - - // Combined binary hash of data fields. blake3(seq_no + time + state_hash + patch_hash + user_hash + input_hash + output_hash) - const std::string data_hash = crypto::get_hash(data); - - // Ledger hash is the combined hash of previous ledger hash and the new data hash. - const std::string ledger_hash = crypto::get_hash(prev_ledger_hash, data_hash); - - // Construct ledger struct with binary hashes. - const ledger_record ledger{ - seq_no, - proposal.time, - ledger_hash, - prev_ledger_hash, - data_hash, - std::string(proposal.state_hash.to_string_view()), - std::string(proposal.patch_hash.to_string_view()), - user_hash, - input_hash, - proposal.output_hash}; // Merkle root output hash. - - if (sqlite::insert_ledger_row(db, ledger) == -1) - { - LOG_ERROR << errno << ": Error creating the ledger, shard: " << std::to_string(primary_shard_seq_no); + // Insert primary ledger record. + std::string new_ledger_hash; + if (insert_ledger_record(db, lcl_id, primary_shard_seq_no, proposal, new_ledger_hash) == -1) LEDGER_CREATE_ERROR; - } - if ((!candidate_user_inputs.empty() || !generated_user_outputs.empty()) && save_ledger_blob(ledger_hash, candidate_user_inputs, generated_user_outputs) == -1) - { - LOG_ERROR << errno << ": Error saving the raw inputs/outputs, shard: " << std::to_string(primary_shard_seq_no); + // Save blob data. + if ((!proposal.input_ordered_hashes.empty() || !generated_user_outputs.empty()) && + save_ledger_blob(new_ledger_hash, consensed_users, generated_user_outputs) == -1) LEDGER_CREATE_ERROR; - } // Update the latest seq_no and lcl when ledger is created. p2p::sequence_hash new_lcl_id; new_lcl_id.seq_no = seq_no; - new_lcl_id.hash = ledger_hash; + new_lcl_id.hash = new_ledger_hash; ctx.set_lcl_id(new_lcl_id); const std::string shard_vpath = std::string(ledger::PRIMARY_DIR).append("/").append(std::to_string(primary_shard_seq_no)); @@ -195,11 +151,79 @@ namespace ledger return ledger_fs.release_rw_session(); } + /** + * Inserts new ledger record to the sqlite database. + * @param db Database connection to use. + * @param current_lcl_id Current lcl id. + * @param primary_shard_seq_no Current primary shard seq no. + * @param proposal The consensus proposal. + * @param new_ledger_hash Hash of the ledger that got isnerted. + * @return 0 on success. -1 on failure. + */ + int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t primary_shard_seq_no, + const p2p::proposal &proposal, std::string &new_ledger_hash) + { + // Combined binary hash of consensus user binary pub keys. + const std::string user_hash = crypto::get_list_hash(proposal.users); + + // Combined binary hash of consensus input hashes. + std::vector inp_hashes; + for (const std::string &o_hash : proposal.input_ordered_hashes) + { + // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. + inp_hashes.push_back(util::get_string_suffix(o_hash, BLAKE3_OUT_LEN)); + } + const std::string input_hash = crypto::get_list_hash(inp_hashes); + + uint8_t seq_no_bytes[8], time_bytes[8]; + util::uint64_to_bytes(seq_no_bytes, current_lcl_id.seq_no); + util::uint64_to_bytes(time_bytes, proposal.time); + + // Contruct binary string for data hash. + std::vector data; + data.emplace_back((char *)seq_no_bytes, sizeof(seq_no_bytes)); + data.emplace_back((char *)time_bytes, sizeof(time_bytes)); + data.push_back(proposal.state_hash.to_string_view()); + data.push_back(proposal.patch_hash.to_string_view()); + data.push_back(user_hash); + data.push_back(input_hash); + data.push_back(proposal.output_hash); + + // Combined binary hash of data fields. blake3(seq_no + time + state_hash + patch_hash + user_hash + input_hash + output_hash) + const std::string data_hash = crypto::get_list_hash(data); + + const std::string prev_ledger_hash(current_lcl_id.hash.to_string_view()); + + // Ledger hash is the combined hash of previous ledger hash and the new data hash. + new_ledger_hash = crypto::get_hash(prev_ledger_hash, data_hash); + + // Construct ledger struct with binary hashes. + const ledger_record ledger{ + current_lcl_id.seq_no + 1, + proposal.time, + new_ledger_hash, + prev_ledger_hash, + data_hash, + std::string(proposal.state_hash.to_string_view()), + std::string(proposal.patch_hash.to_string_view()), + user_hash, + input_hash, + proposal.output_hash}; // Merkle root output hash. + + if (sqlite::insert_ledger_row(db, ledger) == -1) + { + LOG_ERROR << errno << ": Error creating the ledger, shard: " << primary_shard_seq_no; + return -1; + } + + return 0; + } + /** * Opens a db connection to a shard and populates the shard_seq_no. * @param db Database connection to be openned. * @param ledger_seq_no Ledger sequence number. - * @return Returns 0 on success -1 on failure. + * @return 0 on success. -1 on failure. */ int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no) { @@ -386,14 +410,14 @@ namespace ledger } /** - * Save raw data from the consensused proposal. A blob file is only created if there is any user inputs or contract outputs + * Save raw data from the consensed proposal. A blob file is only created if there is any user inputs or contract outputs * to save disk space. * @param ledger_hash Hash of this ledger we are saving. - * @param candidate_user_inputs Raw inputs received in this consensus round. + * @param consensed_users Users and their raw inputs consensed in this consensus round. * @param generated_user_outputs Generated raw outputs in this consensus round. * @return Returns 0 on success -1 on error. */ - int save_ledger_blob(std::string_view ledger_hash, const std::map &candidate_user_inputs, + int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users, const std::map &generated_user_outputs) { // Construct shard path. @@ -474,25 +498,28 @@ namespace ledger ledger_blob blob; blob.ledger_hash = ledger_hash; - for (const auto &[hash, user_input] : candidate_user_inputs) + + // Include consensed user inputs. + for (const auto &[pubkey, inputs] : consensed_users) { - std::string input; - if (usr::input_store.read_buf(user_input.input, input) != -1) + const auto [itr, success] = blob.inputs.try_emplace(pubkey, std::vector()); + + for (const consensus::consensed_user_input &ci : inputs) { - const auto itr = blob.inputs.find(user_input.user_pubkey); - if (itr == blob.inputs.end()) - blob.inputs.emplace(user_input.user_pubkey, std::vector()); - blob.inputs[user_input.user_pubkey].push_back(input); + std::string input; + if (usr::input_store.read_buf(ci.input, input) != -1) + itr->second.push_back(input); } } + + // Include consensed user outputs. for (const auto &[hash, user_output] : generated_user_outputs) { std::vector outputs; for (const auto &output : user_output.outputs) - { - outputs.push_back(output.message); - } - blob.outputs.emplace(user_output.userpubkey, std::move(outputs)); + outputs.push_back(std::move(output.message)); + + blob.outputs.emplace(user_output.user_pubkey, std::move(outputs)); } flatbuffers::FlatBufferBuilder builder(1024); diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index ad4128a5..c4b80d45 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -70,12 +70,15 @@ namespace ledger void deinit(); - int save_ledger(const p2p::proposal &proposal, const std::map &candidate_user_inputs, + int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, const std::map &generated_user_outputs); + int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t primary_shard_seq_no, + const p2p::proposal &proposal, std::string &new_ledger_hash); + int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no); - int save_ledger_blob(std::string_view ledger_hash, const std::map &candidate_user_inputs, + int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users, const std::map &generated_user_outputs); void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir); diff --git a/src/ledger/ledger_common.hpp b/src/ledger/ledger_common.hpp index 390bf329..03d6f890 100644 --- a/src/ledger/ledger_common.hpp +++ b/src/ledger/ledger_common.hpp @@ -16,8 +16,8 @@ namespace ledger */ struct ledger_record { - uint64_t seq_no; - uint64_t timestamp; + uint64_t seq_no = 0; + uint64_t timestamp = 0; std::string ledger_hash; std::string prev_ledger_hash; std::string data_hash; diff --git a/src/ledger/sqlite.cpp b/src/ledger/sqlite.cpp index 2acc216a..3a86dafc 100644 --- a/src/ledger/sqlite.cpp +++ b/src/ledger/sqlite.cpp @@ -24,7 +24,7 @@ namespace ledger::sqlite "state_hash, patch_hash, user_hash, input_hash, output_hash" ") VALUES(?,?,?,?,?,?,?,?,?,?)"; -#define BIND_H32_BLOB(idx, field) (sqlite3_bind_blob(stmt, idx, field.data(), sizeof(util::h32), SQLITE_STATIC) == SQLITE_OK) +#define BIND_H32_BLOB(idx, field) (field.size() == sizeof(util::h32) && sqlite3_bind_blob(stmt, idx, field.data(), sizeof(util::h32), SQLITE_STATIC) == SQLITE_OK) #define GET_H32_BLOB(idx) std::string((char *)sqlite3_column_blob(stmt, idx), sizeof(util::h32)) /** diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index a387e57b..8495f6cc 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -75,13 +75,17 @@ namespace msg::usrmsg::bson * "type": "contract_input_status", * "status": "", * "reason": "", - * "input_sig": + * "input_hash": , + * "ledger_seq_no": , + * "ledger_hash": "" * } * @param is_accepted Whether the original message was accepted or not. * @param reason Rejected reason. Empty if accepted. - * @param input_sig Binary signature of the original input message which generated this result. + * @param input_hash Binary Hash of the original input signature. This is used by user + * to tie the response with the input submission. */ - void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_sig) + void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, + std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) { jsoncons::bson::bson_bytes_encoder encoder(msg); encoder.begin_object(); @@ -89,10 +93,26 @@ namespace msg::usrmsg::bson encoder.string_value(msg::usrmsg::MSGTYPE_CONTRACT_INPUT_STATUS); encoder.key(msg::usrmsg::FLD_STATUS); encoder.string_value(status); - encoder.key(msg::usrmsg::FLD_REASON); - encoder.string_value(reason); - encoder.key(msg::usrmsg::FLD_INPUT_SIG); - encoder.byte_string_value(input_sig); + + // Reject reason is only included for rejected inputs. + if (!reason.empty()) + { + encoder.key(msg::usrmsg::FLD_REASON); + encoder.string_value(reason); + } + + encoder.key(msg::usrmsg::FLD_INPUT_HASH); + encoder.byte_string_value(input_hash); + + // Ledger information is only included in 'accepted' input statuses. + if (ledger_seq_no > 0) + { + encoder.key(msg::usrmsg::FLD_LEDGER_SEQ_NO); + encoder.uint64_value(ledger_seq_no); + encoder.key(msg::usrmsg::FLD_LEDGER_HASH); + encoder.byte_string_value(ledger_hash.to_string_view()); + } + encoder.end_object(); encoder.flush(); } @@ -334,16 +354,16 @@ namespace msg::usrmsg::bson * Extract the individual components of a given input container bson. * @param input The extracted input. * @param nonce The extracted nonce. - * @param max_lcl_seq_no The extracted max ledger sequence no. + * @param max_ledger_seq_no The extracted max ledger sequence no. * @param contentjson The bson input container message. * { * "input": , * "nonce": "", - * "max_lcl_seq_no": + * "max_ledger_seq_no": * } * @return 0 on succesful extraction. -1 on failure. */ - int extract_input_container(std::string &input, std::string &nonce, uint64_t &max_lcl_seq_no, std::string_view contentbson) + int extract_input_container(std::string &input, std::string &nonce, uint64_t &max_ledger_seq_no, std::string_view contentbson) { jsoncons::ojson d; try @@ -356,13 +376,13 @@ namespace msg::usrmsg::bson return -1; } - if (!d.contains(msg::usrmsg::FLD_INPUT) || !d.contains(msg::usrmsg::FLD_NONCE) || !d.contains(msg::usrmsg::FLD_MAX_LCL_SEQ)) + if (!d.contains(msg::usrmsg::FLD_INPUT) || !d.contains(msg::usrmsg::FLD_NONCE) || !d.contains(msg::usrmsg::FLD_MAX_LEDGER_SEQ_NO)) { LOG_DEBUG << "User input container required fields missing or invalid."; return -1; } - if (!d[msg::usrmsg::FLD_INPUT].is_byte_string_view() || !d[msg::usrmsg::FLD_NONCE].is_string() || !d[msg::usrmsg::FLD_MAX_LCL_SEQ].is_uint64()) + if (!d[msg::usrmsg::FLD_INPUT].is_byte_string_view() || !d[msg::usrmsg::FLD_NONCE].is_string() || !d[msg::usrmsg::FLD_MAX_LEDGER_SEQ_NO].is_uint64()) { LOG_DEBUG << "User input container invalid field values."; return -1; @@ -372,7 +392,7 @@ namespace msg::usrmsg::bson input = std::string_view(reinterpret_cast(bsv.data()), bsv.size()); nonce = d[msg::usrmsg::FLD_NONCE].as(); - max_lcl_seq_no = d[msg::usrmsg::FLD_MAX_LCL_SEQ].as(); + max_ledger_seq_no = d[msg::usrmsg::FLD_MAX_LEDGER_SEQ_NO].as(); return 0; } diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index 89e53a9b..e97c99b7 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -11,7 +11,7 @@ namespace msg::usrmsg::bson void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash); void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, - std::string_view input_sig); + std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); void create_contract_read_response_container(std::vector &msg, std::string_view content); @@ -37,7 +37,7 @@ namespace msg::usrmsg::bson const jsoncons::ojson &d); int extract_input_container(std::string &input, std::string &nonce, - uint64_t &max_lcl_seq_no, std::string_view contentbson); + uint64_t &max_ledger_seq_no, std::string_view contentbson); int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::ojson &d); diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 1a03faee..bbee94d3 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -145,7 +145,7 @@ namespace msg::fbuf::p2pmsg p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); if (msg.input_hashes()) - p.input_hashes = flatbuf_bytearrayvector_to_stringlist(msg.input_hashes()); + p.input_ordered_hashes = flatbuf_bytearrayvector_to_stringlist(msg.input_hashes()); if (msg.output_hash()) p.output_hash = flatbuf_bytes_to_sv(msg.output_hash()); @@ -308,7 +308,7 @@ namespace msg::fbuf::p2pmsg hasher.add(p.roundtime); hasher.add(p.nonce); hasher.add(p.users); - hasher.add(p.input_hashes); + hasher.add(p.input_ordered_hashes); hasher.add(p.output_hash); hasher.add(p.output_sig); hasher.add(p.state_hash); @@ -385,7 +385,7 @@ namespace msg::fbuf::p2pmsg p.roundtime, sv_to_flatbuf_bytes(builder, p.nonce), stringlist_to_flatbuf_bytearrayvector(builder, p.users), - stringlist_to_flatbuf_bytearrayvector(builder, p.input_hashes), + stringlist_to_flatbuf_bytearrayvector(builder, p.input_ordered_hashes), sv_to_flatbuf_bytes(builder, p.output_hash), sv_to_flatbuf_bytes(builder, p.output_sig), hash_to_flatbuf_bytes(builder, p.state_hash), diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index 31e512c4..74a6b14b 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -224,13 +224,17 @@ namespace msg::usrmsg::json * "type": "contract_input_status", * "status": "", * "reason": "", - * "input_sig": "" + * "input_hash": "", + * "ledger_seq_no": , + * "ledger_hash": "" * } * @param is_accepted Whether the original message was accepted or not. * @param reason Rejected reason. Empty if accepted. - * @param input_sig Binary signature of the original input message which generated this result. + * @param input_hash Binary Hash of the original input signature. This is used by user + * to tie the response with the input submission. */ - void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_sig) + void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, + std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) { msg.reserve(256); msg += "{\""; @@ -242,13 +246,33 @@ namespace msg::usrmsg::json msg += SEP_COLON; msg += status; msg += SEP_COMMA; - msg += msg::usrmsg::FLD_REASON; + + // Reject reason is only included for rejected inputs. + if (!reason.empty()) + { + msg += msg::usrmsg::FLD_REASON; + msg += SEP_COLON; + msg += reason; + msg += SEP_COMMA; + } + + msg += msg::usrmsg::FLD_INPUT_HASH; msg += SEP_COLON; - msg += reason; - msg += SEP_COMMA; - msg += msg::usrmsg::FLD_INPUT_SIG; - msg += SEP_COLON; - msg += util::to_hex(input_sig); + msg += util::to_hex(input_hash); + + // Ledger information is only included in 'accepted' input statuses. + if (ledger_seq_no > 0) + { + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_LEDGER_SEQ_NO; + msg += SEP_COLON_NOQUOTE; + msg += std::to_string(ledger_seq_no); + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_LEDGER_HASH; + msg += SEP_COLON; + msg += util::to_hex(ledger_hash.to_string_view()); + } + msg += "\"}"; } @@ -676,16 +700,16 @@ namespace msg::usrmsg::json * Extract the individual components of a given input container json. * @param input The extracted input. * @param nonce The extracted nonce. - * @param max_lcl_seq_no The extracted max ledger sequence no. + * @param max_ledger_seq_no The extracted max ledger sequence no. * @param contentjson The json string containing the input container message. * { * "input": "", * "nonce": "", - * "max_lcl_seq_no": + * "max_ledger_seq_no": * } * @return 0 on succesful extraction. -1 on failure. */ - int extract_input_container(std::string &input, std::string &nonce, uint64_t &max_lcl_seq_no, std::string_view contentjson) + int extract_input_container(std::string &input, std::string &nonce, uint64_t &max_ledger_seq_no, std::string_view contentjson) { jsoncons::json d; try @@ -698,13 +722,13 @@ namespace msg::usrmsg::json return -1; } - if (!d.contains(msg::usrmsg::FLD_INPUT) || !d.contains(msg::usrmsg::FLD_NONCE) || !d.contains(msg::usrmsg::FLD_MAX_LCL_SEQ)) + if (!d.contains(msg::usrmsg::FLD_INPUT) || !d.contains(msg::usrmsg::FLD_NONCE) || !d.contains(msg::usrmsg::FLD_MAX_LEDGER_SEQ_NO)) { LOG_DEBUG << "User input container required fields missing."; return -1; } - if (!d[msg::usrmsg::FLD_INPUT].is() || !d[msg::usrmsg::FLD_NONCE].is() || !d[msg::usrmsg::FLD_MAX_LCL_SEQ].is()) + if (!d[msg::usrmsg::FLD_INPUT].is() || !d[msg::usrmsg::FLD_NONCE].is() || !d[msg::usrmsg::FLD_MAX_LEDGER_SEQ_NO].is()) { LOG_DEBUG << "User input container invalid field values."; return -1; @@ -712,7 +736,7 @@ namespace msg::usrmsg::json input = d[msg::usrmsg::FLD_INPUT].as(); nonce = d[msg::usrmsg::FLD_NONCE].as(); - max_lcl_seq_no = d[msg::usrmsg::FLD_MAX_LCL_SEQ].as(); + max_ledger_seq_no = d[msg::usrmsg::FLD_MAX_LEDGER_SEQ_NO].as(); return 0; } diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 8341b7b6..2cc3fdfe 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -15,7 +15,7 @@ namespace msg::usrmsg::json void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash); void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, - std::string_view input_sig); + std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); void create_contract_read_response_container(std::vector &msg, std::string_view content); @@ -41,7 +41,7 @@ namespace msg::usrmsg::json const jsoncons::json &d); int extract_input_container(std::string &input, std::string &nonce, - uint64_t &max_lcl_seq_no, std::string_view contentjson); + uint64_t &max_ledger_seq_no, std::string_view contentjson); int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id, const jsoncons::json &d); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index bfb25456..53f70490 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -22,7 +22,10 @@ namespace msg::usrmsg constexpr const char *FLD_INPUT = "input"; constexpr const char *FLD_INPUT_CONTAINER = "input_container"; constexpr const char *FLD_INPUT_SIG = "input_sig"; - constexpr const char *FLD_MAX_LCL_SEQ = "max_lcl_seq_no"; + constexpr const char *FLD_INPUT_HASH = "input_hash"; + constexpr const char *FLD_LEDGER_SEQ_NO = "ledger_seq_no"; + constexpr const char *FLD_LEDGER_HASH = "ledger_hash"; + constexpr const char *FLD_MAX_LEDGER_SEQ_NO = "max_ledger_seq_no"; constexpr const char *FLD_CONTENT = "content"; constexpr const char *FLD_OUTPUTS = "outputs"; constexpr const char *FLD_HASHES = "hashes"; @@ -52,7 +55,6 @@ namespace msg::usrmsg constexpr const char *FLD_STATE_HASH = "state_hash"; constexpr const char *FLD_CONFIG_HASH = "config_hash"; constexpr const char *FLD_USER_HASH = "user_hash"; - constexpr const char *FLD_INPUT_HASH = "input_hash"; constexpr const char *FLD_OUTPUT_HASH = "output_hash"; constexpr const char *FLD_RAW_INPUTS = "raw_inputs"; constexpr const char *FLD_RAW_OUTPUTS = "raw_outputs"; diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index fa68dd64..bee7d3ac 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -21,13 +21,13 @@ namespace msg::usrmsg busrmsg::create_status_response(msg, lcl_seq_no, lcl_hash); } - void usrmsg_parser::create_contract_input_status(std::vector &msg, std::string_view status, - std::string_view reason, std::string_view input_sig) const + void usrmsg_parser::create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, + std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_contract_input_status(msg, status, reason, input_sig); + jusrmsg::create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash); else - busrmsg::create_contract_input_status(msg, status, reason, input_sig); + busrmsg::create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash); } void usrmsg_parser::create_contract_read_response_container(std::vector &msg, std::string_view content) const @@ -98,12 +98,12 @@ namespace msg::usrmsg } int usrmsg_parser::extract_input_container(std::string &input, std::string &nonce, - uint64_t &max_lcl_seq_no, std::string_view encoded_content) const + uint64_t &max_ledger_seq_no, std::string_view encoded_content) const { if (protocol == util::PROTOCOL::JSON) - return jusrmsg::extract_input_container(input, nonce, max_lcl_seq_no, encoded_content); + return jusrmsg::extract_input_container(input, nonce, max_ledger_seq_no, encoded_content); else - return busrmsg::extract_input_container(input, nonce, max_lcl_seq_no, encoded_content); + return busrmsg::extract_input_container(input, nonce, max_ledger_seq_no, encoded_content); } int usrmsg_parser::extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index b5638437..b99d8481 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -22,8 +22,8 @@ namespace msg::usrmsg void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash) const; - void create_contract_input_status(std::vector &msg, std::string_view status, - std::string_view reason, std::string_view input_sig) const; + void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, + std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const; void create_contract_read_response_container(std::vector &msg, std::string_view content) const; @@ -34,7 +34,7 @@ namespace msg::usrmsg void create_unl_list_container(std::vector &msg, const ::std::set &unl_list) const; void create_ledger_query_response(std::vector &msg, std::string_view reply_for, - const ledger::query::query_result &result) const; + const ledger::query::query_result &result) const; int parse(std::string_view message); @@ -45,7 +45,7 @@ namespace msg::usrmsg int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const; int extract_input_container(std::string &input, std::string &nonce, - uint64_t &max_lcl_seq_no, std::string_view encoded_content) const; + uint64_t &max_ledger_seq_no, std::string_view encoded_content) const; int extract_ledger_query(ledger::query::query_request &extracted_query, std::string &extracted_id) const; }; diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 6c861633..70c2ee16 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -78,7 +78,7 @@ namespace p2p util::h32 state_hash; // Contract state hash. util::h32 patch_hash; // Patch file hash. std::set users; - std::set input_hashes; + std::set input_ordered_hashes; std::string output_hash; std::string output_sig; }; diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index 4283d741..a1c8422d 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -456,11 +456,6 @@ namespace sc // Close all fds. cleanup_fds(ctx); - // Purge any inputs we passed to the contract. - for (const auto &[pubkey, bufs] : ctx.args.userbufs) - for (const util::buffer_view &input : bufs.inputs) - ctx.args.user_input_store.purge(input); - // If we reach this point but the contract is still running, then we need to kill the contract by force. // This can be the case if HP is shutting down, or there was an error in initial feeding of inputs. if (ctx.contract_pid > 0) diff --git a/src/sc/sc.hpp b/src/sc/sc.hpp index f020ca4b..5ce25fc3 100644 --- a/src/sc/sc.hpp +++ b/src/sc/sc.hpp @@ -31,6 +31,11 @@ namespace sc { uint32_t message_len = 0; std::string message; + + bool operator<(const contract_output &other) const + { + return message < other.message; + } }; /** diff --git a/src/usr/input_nonce_map.cpp b/src/usr/input_nonce_map.cpp index a26c4891..e0c011f6 100644 --- a/src/usr/input_nonce_map.cpp +++ b/src/usr/input_nonce_map.cpp @@ -15,7 +15,7 @@ namespace usr * 1 if nonce has expired. * 2 if message with same nonce/sig has already been submitted. */ - int input_nonce_map::check(const std::string &pubkey, const std::string &nonce, const std::string &sig, const uint64_t &max_lcl_seq_no, const bool no_add) + int input_nonce_map::check(const std::string &pubkey, const std::string &nonce, const std::string &sig, const uint64_t &max_ledger_seq_no, const bool no_add) { int result = 0; @@ -25,7 +25,7 @@ namespace usr { result = 0; if (!no_add) - nonce_map.emplace(pubkey, std::tuple(nonce, sig, max_lcl_seq_no)); + nonce_map.emplace(pubkey, std::tuple(nonce, sig, max_ledger_seq_no)); } else { @@ -38,7 +38,7 @@ namespace usr if (!no_add) { std::get<0>(itr->second) = nonce; - std::get<2>(itr->second) = max_lcl_seq_no; + std::get<2>(itr->second) = max_ledger_seq_no; } result = 0; } diff --git a/src/usr/input_nonce_map.hpp b/src/usr/input_nonce_map.hpp index 471c7d6e..e396a306 100644 --- a/src/usr/input_nonce_map.hpp +++ b/src/usr/input_nonce_map.hpp @@ -13,7 +13,7 @@ namespace usr void cleanup(); public: - int check(const std::string &pubkey, const std::string &nonce, const std::string &sig, const uint64_t &max_lcl_seq_no, const bool no_add = false); + int check(const std::string &pubkey, const std::string &nonce, const std::string &sig, const uint64_t &max_ledger_seq_no, const bool no_add = false); }; } // namespace usr diff --git a/src/usr/user_input.hpp b/src/usr/user_input.hpp index bfd27baf..55ba7e02 100644 --- a/src/usr/user_input.hpp +++ b/src/usr/user_input.hpp @@ -21,7 +21,7 @@ namespace usr { std::string input; std::string nonce; - uint64_t max_lcl_seq_no; + uint64_t max_ledger_seq_no; std::string sig; util::PROTOCOL protocol; // The message protocol used by the user. diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 5655fced..4368bf2e 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -173,21 +173,28 @@ namespace usr std::string input_data; std::string nonce; - uint64_t max_lcl_seq_no; - if (parser.extract_input_container(input_data, nonce, max_lcl_seq_no, input_container) != -1) + uint64_t max_ledger_seq_no; + if (parser.extract_input_container(input_data, nonce, max_ledger_seq_no, input_container) != -1) { const p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); // Ignore the input if the max ledger seq number specified is beyond the max offeset. - if (conf::cfg.contract.max_input_ledger_offset != 0 && max_lcl_seq_no > lcl_id.seq_no + conf::cfg.contract.max_input_ledger_offset) + if (conf::cfg.contract.max_input_ledger_offset != 0 && max_ledger_seq_no > lcl_id.seq_no + conf::cfg.contract.max_input_ledger_offset) { - send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_MAX_LEDGER_OFFSET_EXCEEDED, sig); + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_MAX_LEDGER_OFFSET_EXCEEDED, crypto::get_hash(sig)); + return -1; + } + + // Ignore the input if our ledger has passed the input TTL. + if (max_ledger_seq_no <= lcl_id.seq_no) + { + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_MAX_LEDGER_EXPIRED, crypto::get_hash(sig)); return -1; } // Check for max nonce size. if (nonce.size() > MAX_INPUT_NONCE_SIZE) { - send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_NONCE_OVERFLOW, sig); + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_NONCE_OVERFLOW, crypto::get_hash(sig)); return -1; } @@ -195,11 +202,11 @@ namespace usr if (conf::cfg.contract.round_limits.user_input_bytes > 0 && (user.collected_input_size + input_data.size()) > conf::cfg.contract.round_limits.user_input_bytes) { - send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW, sig); + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW, crypto::get_hash(sig)); return -1; } - const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_lcl_seq_no, true); + const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_ledger_seq_no, true); if (nonce_status == 0) { //Add to the submitted input list. @@ -216,19 +223,19 @@ namespace usr else { const char *reason = nonce_status == 1 ? msg::usrmsg::REASON_NONCE_EXPIRED : msg::usrmsg::REASON_ALREADY_SUBMITTED; - send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, reason, sig); + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, reason, crypto::get_hash(sig)); return -1; } } else { - send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, sig); + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, crypto::get_hash(sig)); return -1; } } else { - send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, sig); + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, crypto::get_hash(sig)); return -1; } } @@ -272,8 +279,12 @@ namespace usr /** * Sends multiple user input responses grouped by user. */ - void send_input_status_responses(const std::unordered_map> &responses) + void send_input_status_responses(const std::unordered_map> &responses, + const uint64_t ledger_seq_no, const util::h32 &ledger_hash) { + if (responses.empty()) + return; + // Lock the user sessions. std::scoped_lock lock(usr::ctx.users_mutex); @@ -295,7 +306,9 @@ namespace usr user_itr->second.session, resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, resp.reject_reason == NULL ? "" : resp.reject_reason, - resp.sig); + resp.input_hash, + ledger_seq_no, + ledger_hash); } } } @@ -306,10 +319,11 @@ namespace usr * Send the specified contract input status result via the provided session. */ void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, - std::string_view status, std::string_view reason, std::string_view input_sig) + std::string_view status, std::string_view reason, std::string_view input_hash, + const uint64_t ledger_seq_no, const util::h32 &ledger_hash) { std::vector msg; - parser.create_contract_input_status(msg, status, reason, input_sig); + parser.create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash); session.send(msg); } @@ -387,7 +401,7 @@ namespace usr // Extract information from input container. msg::usrmsg::usrmsg_parser parser(submitted.protocol); - if (parser.extract_input_container(extracted.input, extracted.nonce, extracted.max_lcl_seq_no, submitted.input_container) == -1) + if (parser.extract_input_container(extracted.input, extracted.nonce, extracted.max_ledger_seq_no, submitted.input_container) == -1) { LOG_DEBUG << "User input bad input container format."; return msg::usrmsg::REASON_BAD_MSG_FORMAT; @@ -404,17 +418,25 @@ namespace usr * @return The rejection reason if input rejected. NULL if the input can be accepted. */ const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input, - const uint64_t lcl_seq_no, size_t &total_input_size, std::string &hash, util::buffer_view &input) + const uint64_t lcl_seq_no, size_t &total_input_size, std::string &ordered_hash, util::buffer_view &input) { + // Ordered hash is used as the globally unqiue 'key' to represent this input for this consensus round. + // It is prefixed with the nonce to support user-defined sort order and the input hash is appended + // to make it unique among inputs from all users. + // Ordered hash = nonce + input hash + // Nonce length is not fixed. So last 32 bytes of ordered hash always contains the input hash. + // In the ledger, we will store the nonce and input hash separately. + ordered_hash = extracted_input.nonce + crypto::get_hash(extracted_input.sig); + // Ignore the input if the max ledger seq number specified is beyond the max offeset. - if (conf::cfg.contract.max_input_ledger_offset != 0 && extracted_input.max_lcl_seq_no > lcl_seq_no + conf::cfg.contract.max_input_ledger_offset) + if (conf::cfg.contract.max_input_ledger_offset != 0 && extracted_input.max_ledger_seq_no > lcl_seq_no + conf::cfg.contract.max_input_ledger_offset) { LOG_DEBUG << "User input bad max ledger seq beyond the max offset."; return msg::usrmsg::REASON_MAX_LEDGER_OFFSET_EXCEEDED; } // Ignore the input if our ledger has passed the input TTL. - if (extracted_input.max_lcl_seq_no <= lcl_seq_no) + if (extracted_input.max_ledger_seq_no <= lcl_seq_no) { LOG_DEBUG << "User input bad max ledger seq expired."; return msg::usrmsg::REASON_MAX_LEDGER_EXPIRED; @@ -429,7 +451,7 @@ namespace usr return msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW; } - const int nonce_status = nonce_map.check(user_pubkey, extracted_input.nonce, extracted_input.sig, extracted_input.max_lcl_seq_no); + const int nonce_status = nonce_map.check(user_pubkey, extracted_input.nonce, extracted_input.sig, extracted_input.max_ledger_seq_no); if (nonce_status > 0) { LOG_DEBUG << (nonce_status == 1 ? "User input nonce expired." : "User input with same nonce/sig already submitted."); @@ -444,11 +466,6 @@ namespace usr // Reaching here means the input is successfully validated and we can submit it to consensus. - // Hash is used as the globally unqiue 'key' to represent this input for this consensus round. - // It is prefixed with the nonce to support user-defined sort order and signature hash is appended - // to make it unique among inputs from all users. - hash = extracted_input.nonce + crypto::get_hash(extracted_input.sig); - // Copy the input data into the input store. Contract will read the input from this location. input = input_store.write_buf(extracted_input.input.data(), extracted_input.input.size()); diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 4276581f..a38f0dec 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -3,6 +3,7 @@ #include "../pchheader.hpp" #include "../util/util.hpp" +#include "../util/h32.hpp" #include "../util/rollover_hashset.hpp" #include "../util/buffer_store.hpp" #include "../msg/usrmsg_parser.hpp" @@ -66,7 +67,7 @@ namespace usr struct input_status_response { const util::PROTOCOL protocol; - const std::string sig; + const std::string input_hash; const char *reject_reason; }; @@ -83,10 +84,12 @@ namespace usr int handle_authed_user_message(connected_user &user, std::string_view message); - void send_input_status_responses(const std::unordered_map> &responses); + void send_input_status_responses(const std::unordered_map> &responses, + const uint64_t ledger_seq_no = 0, const util::h32 &ledger_hash = util::h32_empty); void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, - std::string_view status, std::string_view reason, std::string_view input_sig); + std::string_view status, std::string_view reason, std::string_view input_hash, + const uint64_t ledger_seq_no = 0, const util::h32 &ledger_hash = util::h32_empty); int add_user(usr::user_comm_session &session, const std::string &user_pubkey_hex, std::string_view protocol_code); @@ -95,7 +98,7 @@ namespace usr const char *extract_submitted_input(const std::string &user_pubkey, const usr::submitted_user_input &submitted, usr::extracted_user_input &extracted); const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input, - const uint64_t lcl_seq_no, size_t &total_input_size, std::string &hash, util::buffer_view &input); + const uint64_t lcl_seq_no, size_t &total_input_size, std::string &ordered_hash, util::buffer_view &input); bool verify_appbill_check(std::string_view pubkey, const size_t input_len); diff --git a/src/util/buffer_store.cpp b/src/util/buffer_store.cpp index 1805edc0..1fca5a79 100644 --- a/src/util/buffer_store.cpp +++ b/src/util/buffer_store.cpp @@ -62,9 +62,10 @@ namespace util int buffer_store::purge(const buffer_view &buf) { const size_t purge_size = BLOCK_ALIGN(buf.size); - if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, buf.offset, purge_size) == -1) + if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, buf.offset, purge_size) == -1 && + errno != EBADF) // errno=EBADF is ignored, since if the memfd is closed, we don't need to cleanup anyway. { - LOG_ERROR << errno << ": Error when purging buffer store fd " << fd; + LOG_ERROR << errno << ": Error when purging buffer store fd " << fd << " (" << buf.offset << "," << buf.size << ")"; return -1; } return 0; diff --git a/src/util/merkle_hash_tree.cpp b/src/util/merkle_hash_tree.cpp index a8495da8..ad8c3368 100644 --- a/src/util/merkle_hash_tree.cpp +++ b/src/util/merkle_hash_tree.cpp @@ -40,7 +40,7 @@ namespace util std::vector hashes; for (const util::merkle_hash_node &child : parent.children) hashes.push_back(child.hash); - parent.hash = crypto::get_hash(hashes); + parent.hash = crypto::get_list_hash(hashes); } else { diff --git a/src/util/util.cpp b/src/util/util.cpp index ae1f9b85..63a9432c 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -458,4 +458,12 @@ namespace util ((uint64_t)data[7]); } + /** + * Returns the substring view from the end of the provided string view. + */ + std::string_view get_string_suffix(std::string_view sv, const size_t suffix_len) + { + return sv.substr(sv.size() - suffix_len, suffix_len); + } + } // namespace util diff --git a/src/util/util.hpp b/src/util/util.hpp index 3ee4d9d5..ac9fafe1 100644 --- a/src/util/util.hpp +++ b/src/util/util.hpp @@ -79,6 +79,8 @@ namespace util uint64_t uint64_from_bytes(const uint8_t *data); + std::string_view get_string_suffix(std::string_view sv, const size_t suffix_len); + } // namespace util #endif diff --git a/test/metrics/metrics.js b/test/metrics/metrics.js index 830ce73c..146cc1e3 100644 --- a/test/metrics/metrics.js +++ b/test/metrics/metrics.js @@ -109,8 +109,8 @@ function singleUserInputOutput(payloadKB, requestCount) { for (let i = 0; i < requestCount; i++) { const nonce = i.toString().padStart(5); hpc.sendContractInput(payload, nonce, 10).then(r => { - if (r != "ok") - console.log(r); + if (r.status != "ok") + console.log(r.reason); }); } }) @@ -152,8 +152,8 @@ function largePayload(payloadMB) { timer.start(); await hpc.sendContractInput(payload).then(r => { - if (r != "ok") - console.log(r); + if (r.status != "ok") + console.log(r.reason); });; }) }