diff --git a/examples/js_client/browser-example.html b/examples/js_client/browser-example.html index f1c9f35e..222f2969 100644 --- a/examples/js_client/browser-example.html +++ b/examples/js_client/browser-example.html @@ -63,8 +63,8 @@ }) // This will get fired when contract sends an output. - hpc.on(HotPocket.events.contractOutput, (output) => { - console.log("Contract output>> " + output); + hpc.on(HotPocket.events.contractOutput, (r) => { + r.outputs.forEach(o => console.log("Contract output>> " + o)); }) // This will get fired when contract sends a read response. diff --git a/examples/js_client/file-client.js b/examples/js_client/file-client.js index 5cfa6272..cd70f76c 100644 --- a/examples/js_client/file-client.js +++ b/examples/js_client/file-client.js @@ -43,23 +43,28 @@ async function main() { }) // This will get fired when contract sends an output. - hpc.on(HotPocket.events.contractOutput, (output) => { - const result = bson.deserialize(output); - if (result.type == "uploadResult") { - if (result.status == "ok") - console.log("File " + result.fileName + " uploaded successfully."); - else - console.log("File " + result.fileName + " upload failed. reason: " + result.status); - } - else if (result.type == "deleteResult") { - if (result.status == "ok") - console.log("File " + result.fileName + " deleted successfully."); - else - console.log("File " + result.fileName + " delete failed. reason: " + result.status); - } - else { - console.log("Unknown contract output."); - } + hpc.on(HotPocket.events.contractOutput, (r) => { + + r.outputs.forEach(output => { + + const result = bson.deserialize(output); + if (result.type == "uploadResult") { + if (result.status == "ok") + console.log("File " + result.fileName + " uploaded successfully."); + else + console.log("File " + result.fileName + " upload failed. reason: " + result.status); + } + else if (result.type == "deleteResult") { + if (result.status == "ok") + console.log("File " + result.fileName + " deleted successfully."); + else + console.log("File " + result.fileName + " delete failed. reason: " + result.status); + } + else { + console.log("Unknown contract output."); + } + + }); }) // This will get fired when contract sends a read response. diff --git a/examples/js_client/hp-client-lib.js b/examples/js_client/hp-client-lib.js index d8ae2937..45adbd8e 100644 --- a/examples/js_client/hp-client-lib.js +++ b/examples/js_client/hp-client-lib.js @@ -367,37 +367,47 @@ } // Get root hash of the given merkle hash tree. (called recursively) - // checkHashString specifies the hash that must be checked for existance. - const getMerkleHash = (tree, checkHashString) => { + // Merkle hash tree indicates the collapsed output hashes of this round for all users. + // This user's output hash is indicated in the tree as null. + // selfHash: specifies the output hash of this user. + const getMerkleHash = (tree, selfHash) => { const listToHash = []; // Collects elements to hash. - let checkHashFound = false; + let selfHashFound = false; for (let elem of tree) { if (Array.isArray(elem)) { // If the 'elem' is an array we should find the root hash of the array. - // Call this func recursively. If checkHash already found, pass null. - const result = getMerkleHash(elem, checkHashFound ? null : checkHashString); + // Call this func recursively. If self hash already found, pass null. + const result = getMerkleHash(elem, selfHashFound ? null : selfHash); if (result[0] == true) - checkHashFound = true; + selfHashFound = true; listToHash.push(result[1]); } - else { - // 'elem' is a single hash value. We get the hash bytes depending on the data type. - // (json encoding will use hex string and bson will use buffer) - const hashBytes = isString(elem) ? hexToUint8Array(elem) : elem.buffer; - listToHash.push(hashBytes); + else { // elem' is a single hash value + // We get the hash bytes depending on the data type. (json encoding will use hex string + // and bson will use buffer). If the elem contains null, that means it represents the + // self hash. So we should substitute the self hash to null. - // If checkHash is specified, compare the hashes. - if (checkHashString && msgHelper.stringifyValue(hashBytes) == checkHashString) - checkHashFound = true; + // If we have already found self hash (indicated by selfHash=null), we cannot encounter + // null element again. + if (!selfHash && !elem) { + liblog(1, "Self hash encountered more than once in output hash tree."); + return [false, null]; + } + + if (!elem) + selfHashFound = true; + + const hashBytes = elem ? msgHelper.binaryDecode(elem) : selfHash; // If element is null, use self hash. + listToHash.push(hashBytes); } } - // Return a tuple of whether check hash was found and the root hash of the provided merkle tree. - return [checkHashFound, getHash(listToHash)]; + // Return a tuple of whether self hash was found and the root hash of the provided merkle tree. + return [selfHashFound, getHash(listToHash)]; } // Verifies whether the provided root hash has enough signatures from unl. @@ -417,8 +427,8 @@ // Get the signature and issuer pubkey bytes based on the data type. // (json encoding will use hex string and bson will use buffer) - const binPubkey = isString(pair[0]) ? hexToUint8Array(pair[0]) : pair[0].buffer; - const sig = isString(pair[1]) ? hexToUint8Array(pair[1]) : pair[1].buffer; + const binPubkey = msgHelper.binaryDecode(pair[0]); + const sig = msgHelper.binaryDecode(pair[1]); // Check whether the pubkey is in unl and whether signature is valid. if (!passedKeys[pubkeyHex] && unlKeysLookup[pubkeyHex] && sodium.crypto_sign_verify_detached(sig, rootHash, binPubkey.slice(1))) @@ -430,14 +440,22 @@ return ((passed / totalUnl) >= outputValidationPassThreshold); } - const validateOutput = (msg, trustedKeys) => { + const verifyContractOutputTrust = (msg, trustedKeys) => { // Calculate combined output hash with user's pubkey. const outputHash = getHash([clientKeys.publicKey, ...msgHelper.spreadArrayField(msg.outputs)]); - const result = getMerkleHash(msg.hashes, msgHelper.stringifyValue(outputHash)); - if (result[0] == true) { - const rootHash = result[1]; + // Check whether calculated output hash is same as output hash indicated in the message. + if (!arraysEqual(outputHash, msgHelper.binaryDecode(msg.output_hash))) { + liblog(1, "Contract output hash mismatch."); + return false; + } + + const hashResult = getMerkleHash(msg.hash_tree, outputHash); + + // hashResult is a tuple containing whether self hash was found and the calculated root hash of the merkle hash tree. + if (hashResult[0] == true) { + const rootHash = hashResult[1]; // Verify the issued signatures against the root hash. return validateHashSignatures(rootHash, msg.unl_sig, trustedKeys); @@ -533,7 +551,7 @@ const contractMessageHandler = (m) => { if (m.type == "contract_read_response") { - emitter && emitter.emit(events.contractReadResponse, msgHelper.deserializeOutput(m.content)); + emitter && emitter.emit(events.contractReadResponse, msgHelper.deserializeValue(m.content)); } else if (m.type == "contract_input_status") { const inputHashHex = msgHelper.stringifyValue(m.input_hash); @@ -543,7 +561,7 @@ if (m.status == "accepted") { result.ledgerSeqNo = m.ledger_seq_no; - result.ledgerHash = m.ledger_hash; + result.ledgerHash = msgHelper.deserializeValue(m.ledger_hash); } else { result.reason = m.reason; @@ -557,8 +575,15 @@ if (emitter) { // Validate outputs if trusted keys is not null. (null means bypass validation) const trustedKeys = getTrustedKeys(); - if (!trustedKeys || validateOutput(m, trustedKeys)) - m.outputs.forEach(output => emitter.emit(events.contractOutput, msgHelper.deserializeOutput(output))); + + if (!trustedKeys || verifyContractOutputTrust(m, trustedKeys)) { + emitter.emit(events.contractOutput, { + ledgerSeqNo: m.ledger_seq_no, + ledgerHash: msgHelper.deserializeValue(m.ledger_hash), + outputHash: msgHelper.deserializeValue(m.output_hash), + outputs: m.outputs.map(o => msgHelper.deserializeValue(o)) + }); + } else liblog(1, "Output validation failed."); } @@ -567,8 +592,8 @@ statResponseResolvers.forEach(resolver => { resolver({ hpVersion: m.hp_version, - lclSeqNo: m.lcl_seq_no, - lclHash: m.lcl_hash, + ledgerSeqNo: m.ledger_seq_no, + ledgerHash: msgHelper.deserializeValue(m.ledger_hash), roundTime: m.round_time, contractExecutionEnabled: m.contract_execution_enabled, readRequestsEnabled: m.read_requests_enabled, @@ -780,7 +805,7 @@ if (!stat) throw "Error retrieving ledger status." - maxLedger += stat.lclSeqNo; + maxLedger += stat.ledgerSeqNo; } const inp = msgHelper.createContractInputComponents(input, nonce, maxLedger); @@ -841,6 +866,10 @@ (Buffer.isBuffer(data) ? data : Buffer.from(data)); } + this.binaryDecode = (data) => { + return protocol == protocols.json ? hexToUint8Array(data) : new Uint8Array(data.buffer); + } + this.serializeObject = (obj) => { return protocol == protocols.json ? JSON.stringify(obj) : bson.serialize(obj); } @@ -855,8 +884,8 @@ (Buffer.isBuffer(input) ? input : Buffer.from(input)); } - this.deserializeOutput = (content) => { - return protocol == protocols.json ? content : content.buffer; + this.deserializeValue = (val) => { + return protocol == protocols.json ? val : val.buffer; } // Used for generating strings to hold values as js object keys. @@ -967,6 +996,16 @@ return (typeof obj === "string" || obj instanceof String); } + function arraysEqual(a, b) { + if (a.length != b.length) + return false; + for (let i = 0; i < a.length; i++) { + if (a[i] !== b[i]) + return false; + } + return true; + } + function EventEmitter() { const registrations = {}; diff --git a/examples/js_client/text-client.js b/examples/js_client/text-client.js index 39a818c2..a69d6da3 100644 --- a/examples/js_client/text-client.js +++ b/examples/js_client/text-client.js @@ -1,9 +1,11 @@ const readline = require('readline'); const HotPocket = require('./hp-client-lib'); - async function main() { + // Set HP lib log level. 0=info, 1=error + // HotPocket.setLogLevel(1); + const keys = await HotPocket.generateKeys(); const pkhex = Buffer.from(keys.publicKey).toString('hex'); @@ -55,9 +57,9 @@ async function main() { console.log("New unl received: " + JSON.stringify(unl)); // unl is an array of hex public keys. }) - // This will get fired when contract sends an output. - hpc.on(HotPocket.events.contractOutput, (output) => { - console.log("Contract output>> " + output); + // This will get fired when contract sends outputs. + hpc.on(HotPocket.events.contractOutput, (r) => { + r.outputs.forEach(o => console.log("Contract output>> " + o)); }) // This will get fired when contract sends a read response. diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 7252c33d..9f266fe5 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -9,16 +9,19 @@ const echoContract = async (ctx) => { if (!ctx.readonly) fs.appendFileSync("exects.txt", "ts:" + ctx.timestamp + "\n"); - // Collection of user input handler promises to wait for. - const inputHandlers = []; + // Collection of per-user promises to wait for. Each promise completes when inputs for that user is processed. + const userHandlers = []; for (const user of ctx.users.list()) { - // This user's pubkey can be accessed from 'user.pubKey' + // This user's hex pubkey can be accessed from 'user.pubKey' - for (const input of user.inputs) { + // For each user we add a promise to list of promises. + userHandlers.push(new Promise(async (resolve) => { - inputHandlers.push(new Promise(async (resolve) => { + // The contract need to ensure that all outputs for a particular user is emitted + // in deterministic order. Hence, we are processing all inputs for each user sequentially. + for (const input of user.inputs) { const buf = await ctx.users.read(input); const msg = buf.toString(); @@ -26,11 +29,15 @@ const echoContract = async (ctx) => { const output = (msg == "ts") ? fs.readFileSync("exects.txt").toString() : ("Echoing: " + msg); await user.send(output); - resolve(); - })); - } + } + + // The promise gets complete when all inputs for this user are processed. + resolve(); + })); } - await Promise.all(inputHandlers); + + // Wait until all user promises are complete. + await Promise.all(userHandlers); // Get the user identified by public key. // ctx.users.find(""); diff --git a/src/consensus.cpp b/src/consensus.cpp index 795029d1..b41a07ab 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -168,13 +168,15 @@ namespace consensus if (ctx.stage == 3) { consensed_user_map consensed_users; - if (prepare_consensed_users_and_inputs(consensed_users, p) == -1 || - update_ledger(p, consensed_users, patch_hash, lcl_id) == -1 || - execute_contract(p, consensed_users, lcl_id) == -1) + if (prepare_consensed_users(consensed_users, p) == -1 || + commit_consensus_results(p, consensed_users, patch_hash) == -1) + { LOG_ERROR << "Error occured in Stage 3 consensus execution."; - // Cleanup any buffers occupied by consensed inputs regardless of any errors occured. - purge_user_input_buffers(consensed_users); + // Cleanup obsolete information before next round starts. + cleanup_output_collections(); + cleanup_consensed_user_inputs(consensed_users); + } } } @@ -189,6 +191,41 @@ namespace consensus return 0; } + /** + * Performs the consensus finalalization activities with the provided consensused information. + * @param cons_prop The proposal which reached consensus. + * @param consensed_users Set of consensed users and their consensed inputs and outputs. + * @param patch_hash The current config patch hash. + */ + int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash) + { + // Persist the new ledger with the consensus results. + if (ledger::save_ledger(cons_prop, consensed_users) == -1) + return -1; + + p2p::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); + LOG_INFO << "****Ledger created**** (lcl:" << lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; + + // Now that there's a new ledger, prune any newly-expired candidate inputs. + expire_candidate_inputs(lcl_id); + + // Inform locally connected users that their inputs made it into the ledger. + dispatch_consensed_user_input_responses(consensed_users, lcl_id); + + // Send consensed outputs to locally connected users. + dispatch_consensed_user_outputs(consensed_users, lcl_id); + + // Apply consensed config patch file changes to the hpcore runtime and hp.cfg. + if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1) + return -1; + + // Execute the smart contract with the consensed user inputs. + if (execute_contract(cons_prop, consensed_users, lcl_id) == -1) + return -1; + + return 0; + } + /** * Checks whether we are in sync with the received votes. * @return 0 if we are in sync. -1 on ledger or hpfs desync. -2 if majority last ledger primary shard hash unreliable. @@ -357,42 +394,61 @@ namespace consensus } /** - * Prepare the consensed user map including the consensed user inputs based on the consensus proposal. + * Prepare the consensed user map including the consensed inputs/outputs for those users based on the consensus proposal. * @param consensed_users The consensed user map to populate. * @param cons_prop The proposal that reached consensus. * @return 0 on success. -1 on failure. */ - int prepare_consensed_users_and_inputs(consensed_user_map &consensed_users, const p2p::proposal &cons_prop) + int prepare_consensed_users(consensed_user_map &consensed_users, const p2p::proposal &cons_prop) { - // Prepare consensed user input set by joining consensus proposal input ordered hashes and candidate user input set. - // consensed inputs are removed from the candidate set. - int ret = 0; // Populate the users map with all consensed users regardless of whether they have inputs or not. for (const std::string &pubkey : cons_prop.users) - consensed_users.try_emplace(pubkey, std::vector()); + consensed_users.try_emplace(pubkey, consensed_user{}); + // Prepare consensed user input set by joining consensus proposal input ordered hashes and candidate user input set. + // consensed inputs are removed from the candidate set. for (const std::string &ordered_hash : cons_prop.input_ordered_hashes) { // For each consensus input ordered hash, we need to find the candidate input. const auto itr = ctx.candidate_user_inputs.find(ordered_hash); const bool hash_found = (itr != ctx.candidate_user_inputs.end()); - if (!hash_found) + if (hash_found) { - LOG_ERROR << "Input required but wasn't in our candidate inputs map, this will potentially cause desync."; + candidate_user_input &ci = itr->second; + consensed_users[ci.user_pubkey].consensed_inputs.emplace_back(ordered_hash, ci.input); + + // Erase the consensed input from the candidate set. + ctx.candidate_user_inputs.erase(itr); + } + else + { + LOG_WARNING << "Input required but wasn't in our candidate inputs map, this will potentially cause desync."; // We set error return value but keep on moving candidate inputs to consensed inputs. // This is so that their underlying buffers can get deallocated during stage 3 execution steps. ret = -1; } + } + + // If final elected output hash matches our output hash, move the outputs into consensed outputs. + { + if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash()) + { + for (const auto &[hash, gen_output] : ctx.generated_user_outputs) + { + consensed_user_output &con_out = consensed_users[gen_output.user_pubkey].consensed_outputs; + con_out.hash = hash; + + for (const sc::contract_output &co : gen_output.outputs) + con_out.outputs.push_back(std::move(co.message)); + } + } else { - candidate_user_input &ci = itr->second; - consensed_users[ci.user_pubkey].emplace_back(ordered_hash, ci.input, ci.protocol); - - // Erase the consensed input from the candidate set. - ctx.candidate_user_inputs.erase(itr); + LOG_WARNING << "Consenses output hash didn't match our output hash."; + ret = -1; } } @@ -400,17 +456,39 @@ namespace consensus } /** - * Purges the underyling buffers that belong to provided consensed user inputs. - * @param consensed_users The consensed user map that contains input pointers. + * Removes any candidate inputs that has lived passed the current ledger seq no. + */ + void expire_candidate_inputs(const p2p::sequence_hash &lcl_id) + { + auto itr = ctx.candidate_user_inputs.begin(); + while (itr != ctx.candidate_user_inputs.end()) + { + if (itr->second.max_ledger_seq_no <= lcl_id.seq_no) + { + // Erase the candidate input along with its data buffer in the input store. + usr::input_store.purge(itr->second.input); + ctx.candidate_user_inputs.erase(itr++); + } + else + { + ++itr; + } + } + } + + /** + * Cleans up any consensused user inputs that are not relevant for the next round. + * @param consensed_users The consensed user map that contains consensed inputs. * @return 0 on success. -1 on failure. */ - int purge_user_input_buffers(const consensed_user_map &consensed_users) + int cleanup_consensed_user_inputs(const consensed_user_map &consensed_users) { int ret = 0; - for (const auto &[pubkey, inputs] : consensed_users) + // Purges the underyling buffers that belong to provided consensed user inputs. + for (const auto &[pubkey, user] : consensed_users) { - for (const consensed_user_input &ci : inputs) + for (const consensed_user_input &ci : user.consensed_inputs) { if (usr::input_store.purge(ci.input) == -1) ret = -1; @@ -420,6 +498,17 @@ namespace consensus return ret; } + /** + * Clears the contract output collections that are no longer needed for the next round. + */ + void cleanup_output_collections() + { + ctx.user_outputs_our_sig.clear(); + ctx.generated_user_outputs.clear(); + ctx.user_outputs_hashtree.clear(); + ctx.user_outputs_unl_sig.clear(); + } + /** * Syncrhonise the stage/round time for fixed intervals and reset the stage. * @return True if consensus can proceed in the current round. False if stage is reset. @@ -605,7 +694,7 @@ namespace consensus if (reject_reason == NULL) extracted_inputs.push_back(std::move(extracted)); else - rejections[pubkey].push_back(usr::input_status_response{submitted_input.protocol, crypto::get_hash(submitted_input.sig), reject_reason}); + rejections[pubkey].push_back(usr::input_status_response{crypto::get_hash(submitted_input.sig), reject_reason}); } // This will sort the inputs in nonce order so the validation will follow the same order on all nodes. @@ -628,7 +717,7 @@ namespace consensus // No reject reason means we should go ahead and subject the input to consensus. ctx.candidate_user_inputs.try_emplace( ordered_hash, - candidate_user_input(pubkey, stored_input, extracted_input.max_ledger_seq_no, extracted_input.protocol)); + candidate_user_input(pubkey, stored_input, extracted_input.max_ledger_seq_no)); } // If the input was rejected we need to inform the user. @@ -636,7 +725,7 @@ namespace consensus { // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. const std::string input_hash = std::string(util::get_string_suffix(ordered_hash, BLAKE3_OUT_LEN)); - rejections[pubkey].push_back(usr::input_status_response{extracted_input.protocol, std::move(input_hash), reject_reason}); + rejections[pubkey].push_back(usr::input_status_response{std::move(input_hash), reject_reason}); } } } @@ -933,56 +1022,6 @@ namespace consensus is_patch_desync = (sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH) != majority_patch_hash); } - /** - * Update the ledger after reaching consensus. - * @param cons_prop The proposal that reached consensus. - * @param consensed_users Consensed users and their inputs. - * @param patch_hash The patch hash. - * @param lcl_id Last lcl seq_no and hash. - * @return 0 on success. -1 on error. - */ - int update_ledger(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id) - { - if (ledger::save_ledger(cons_prop, consensed_users, ctx.generated_user_outputs) == -1) - return -1; - - new_lcl_id = ledger::ctx.get_lcl_id(); - const p2p::sequence_hash new_last_primary_shard_id = ledger::ctx.get_last_primary_shard_id(); - - LOG_INFO << "****Ledger created**** (lcl:" << new_lcl_id << " state:" << cons_prop.state_hash << " patch:" << cons_prop.patch_hash << ")"; - - // Send back the inputs "accepted" responses to the user. - if (dispatch_consensed_user_input_responses(consensed_users, new_lcl_id) == -1) - return -1; - - // Send any output from the previous consensus round to locally connected users. - dispatch_user_outputs(cons_prop, new_lcl_id); - - // Apply consensed patch file changes to the hpcore runtime and hp.cfg. - if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1) - return -1; - - // After the current ledger seq no is updated, we remove any newly expired inputs from candidate set. - { - auto itr = ctx.candidate_user_inputs.begin(); - while (itr != ctx.candidate_user_inputs.end()) - { - if (itr->second.max_ledger_seq_no <= new_lcl_id.seq_no) - { - // Erase the candidate input along with its data buffer in the input store. - usr::input_store.purge(itr->second.input); - ctx.candidate_user_inputs.erase(itr++); - } - else - { - ++itr; - } - } - } - - return 0; - } - /** * Executes the contract after consensus. * @param cons_prop The proposal that reached consensus. @@ -1015,14 +1054,16 @@ namespace consensus return -1; } + // Cleanup the fed inputs and extract the generated outputs. + cleanup_consensed_user_inputs(consensed_users); + extract_user_outputs_from_contract_bufmap(args.userbufs); + // Get the new state hash after contract execution. const util::h32 &new_state_hash = args.post_execution_state_hash; // Update state hash in contract fs global hash tracker. sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, new_state_hash); - extract_user_outputs_from_contract_bufmap(args.userbufs); - // Generate user output hash merkle tree and signature with state hash included. if (!ctx.generated_user_outputs.empty()) { @@ -1043,95 +1084,81 @@ namespace consensus } /** - * Dispatch acceptance status responses to consensed user inputs, if the recipients are connected to us locally. - * @param consensed_users The map of consensed users and their inputs. + * Dispatch acceptence status responses to consensed user inputs, if the recipients are connected to us locally. + * @param consensed_users The map of consensed users containing their inputs. * @param lcl_id The ledger the inputs got included in. - * @return 0 on success. -1 on failure. */ - int dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) + void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) { + if (consensed_users.empty()) + return; + std::unordered_map> responses; - for (const auto &[pubkey, inputs] : consensed_users) + for (const auto &[pubkey, user] : consensed_users) { - if (inputs.empty()) + if (user.consensed_inputs.empty()) continue; const auto [itr, success] = responses.emplace(pubkey, std::vector()); - for (const consensed_user_input &ci : inputs) + for (const consensed_user_input &ci : user.consensed_inputs) { // We need to consider the last 32 bytes of each ordered hash to get input hash without the nonce prefix. const std::string input_hash = std::string(util::get_string_suffix(ci.ordered_hash, BLAKE3_OUT_LEN)); - itr->second.push_back(usr::input_status_response{ci.protocol, input_hash, NULL}); + itr->second.push_back(usr::input_status_response{input_hash, NULL}); } } usr::send_input_status_responses(responses, lcl_id.seq_no, lcl_id.hash); - - return 0; } /** * Dispatch any consensus-reached outputs to matching users if they are connected to us locally. - * @param cons_prop The proposal that achieved consensus. - * @param lcl_id Lcl sequnce no hash info. + * @param consensed_users The map of consensed users containing their outputs. + * @param lcl_id The ledger the outputs got included in. */ - void dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id) + void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id) { - if (cons_prop.output_hash == ctx.user_outputs_hashtree.root_hash()) + if (!consensed_users.empty()) { std::scoped_lock lock(usr::ctx.users_mutex); - // If final elected output hash matches our output hash, distribute the outputs - // to locally connected users. - for (auto &[hash, user_output] : ctx.generated_user_outputs) + for (const auto &[pubkey, cu] : consensed_users) { + if (cu.consensed_outputs.outputs.empty()) + continue; + // Find user to send by pubkey. - const auto user_itr = usr::ctx.users.find(user_output.user_pubkey); + const auto user_itr = usr::ctx.users.find(pubkey); if (user_itr != usr::ctx.users.end()) // match found { const usr::connected_user &user = user_itr->second; msg::usrmsg::usrmsg_parser parser(user.protocol); + // Get the collapsed hash tree with this user's output hash remaining independently. + util::merkle_hash_node collapsed_hash_root = ctx.user_outputs_hashtree.collapse(cu.consensed_outputs.hash); + // Send the outputs and signatures to the user. std::vector msg; - - // Get the collapsed hash tree with this user's output hash remaining independently. - util::merkle_hash_node collapsed_hash_root = ctx.user_outputs_hashtree.collapse(hash); - - std::vector outputs; - for (const sc::contract_output &output : user_output.outputs) - outputs.emplace_back(output.message); - - parser.create_contract_output_container(msg, outputs, collapsed_hash_root, ctx.user_outputs_unl_sig, lcl_id.seq_no, lcl_id.hash.to_string_view()); - + parser.create_contract_output_container(msg, cu.consensed_outputs.hash, cu.consensed_outputs.outputs, collapsed_hash_root, ctx.user_outputs_unl_sig, + lcl_id.seq_no, lcl_id.hash.to_string_view()); user.session.send(msg); } - - user_output.outputs.clear(); // We no longer need this user's outputs. } } - else - { - LOG_INFO << "Output required but didn't match our output hash."; - } - // Clear the output hash tree and signature because we no longer need it. - ctx.user_outputs_hashtree.clear(); - ctx.user_outputs_our_sig.clear(); - ctx.user_outputs_unl_sig.clear(); - ctx.generated_user_outputs.clear(); + cleanup_output_collections(); } /** * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. * @param bufmap The contract bufmap which needs to be populated with inputs. - * @param consensed_users Set of consensed users keyed by user binary pubkey and any inputs. + * @param consensed_users Set of consensed users keyed by user binary pubkey. */ void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users) { - for (const auto &[pubkey, inputs] : consensed_users) + for (const auto &[pubkey, user] : consensed_users) { // Populate the buf map with user pubkey regardless of whether user has any inputs or not. // This is in case the contract wanted to emit some data to a user without needing any input. @@ -1139,7 +1166,7 @@ namespace consensus // Populate the input contents into the bufmap. // It's VERY important that we preserve the original input order when feeding to the contract as well. - for (const consensed_user_input &ci : inputs) + for (const consensed_user_input &ci : user.consensed_inputs) itr->second.inputs.push_back(ci.input); } } diff --git a/src/consensus.hpp b/src/consensus.hpp index 4d7cdd07..9ca5b034 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -21,34 +21,49 @@ namespace consensus const std::string user_pubkey; const util::buffer_view input; const uint64_t max_ledger_seq_no = 0; - const util::PROTOCOL protocol; - candidate_user_input(const std::string &user_pubkey, const util::buffer_view input, const uint64_t max_ledger_seq_no, const util::PROTOCOL protocol) - : user_pubkey(user_pubkey), input(input), max_ledger_seq_no(max_ledger_seq_no), protocol(protocol) + candidate_user_input(const std::string &user_pubkey, const util::buffer_view input, const uint64_t max_ledger_seq_no) + : user_pubkey(user_pubkey), input(input), max_ledger_seq_no(max_ledger_seq_no) { } }; /** - * Represents consensus reached user input. - * This is used in a map keyed by user pubkey. + * Represents a consensus reached user input. */ struct consensed_user_input { const std::string ordered_hash; // [nonce] + [input signature hash] const util::buffer_view input; // The input data buffer pointer. - const util::PROTOCOL protocol; // json/bson protocol used by the user when submitting the input. - consensed_user_input(const std::string &ordered_hash, const util::buffer_view input, const util::PROTOCOL protocol) - : ordered_hash(ordered_hash), input(input), protocol(protocol) + consensed_user_input(const std::string &ordered_hash, const util::buffer_view input) + : ordered_hash(ordered_hash), input(input) { } }; /** - * Consensed inputs map keyed by user binary pubkey. + * Represents all consensus reached outputs for a user. */ - typedef std::map> consensed_user_map; + struct consensed_user_output + { + std::string hash; // The hash of all outputs for the user. + std::vector outputs; + }; + + /** + * Represents a consensed user and any consensus-reached inputs/outputs for that user. + */ + struct consensed_user + { + std::vector consensed_inputs; + consensed_user_output consensed_outputs; + }; + + /** + * Consensed users map keyed by user binary pubkey. + */ + typedef std::map consensed_user_map; /** * Represents a contract-generated user output that takes part in consensus. @@ -134,15 +149,21 @@ namespace consensus int consensus(); + int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users, const util::h32 &patch_hash); + int check_sync_status(const size_t unl_count, vote_counter &votes); void check_sync_completion(); void revise_candidate_proposals(); - int prepare_consensed_users_and_inputs(consensed_user_map &consensed_users, const p2p::proposal &cons_prop); + int prepare_consensed_users(consensed_user_map &consensed_users, const p2p::proposal &cons_prop); - int purge_user_input_buffers(const consensed_user_map &consensed_users); + void expire_candidate_inputs(const p2p::sequence_hash &lcl_id); + + int cleanup_consensed_user_inputs(const consensed_user_map &consensed_users); + + void cleanup_output_collections(); bool wait_and_proceed_stage(); @@ -174,13 +195,11 @@ namespace consensus uint64_t get_stage_time_resolution(const uint64_t time); - int update_ledger(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const util::h32 &patch_hash, p2p::sequence_hash &new_lcl_id); - int execute_contract(const p2p::proposal &cons_prop, const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); - int dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); + void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); - void dispatch_user_outputs(const p2p::proposal &cons_prop, const p2p::sequence_hash &lcl_id); + void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const p2p::sequence_hash &lcl_id); void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const consensed_user_map &consensed_users); diff --git a/src/ledger/ledger.cpp b/src/ledger/ledger.cpp index c0041604..d80980e3 100644 --- a/src/ledger/ledger.cpp +++ b/src/ledger/ledger.cpp @@ -85,12 +85,10 @@ namespace ledger /** * Create and save ledger record from the given proposal message. * @param proposal Consensus-reached Stage 3 proposal. - * @param consensed_users Users and their raw inputs received in this consensus round. - * @param generated_user_outputs Generated raw outputs in this consensus round. + * @param consensed_users Users and their raw inputs/outputs received in this consensus round. * @return Returns 0 on success -1 on error. */ - int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, - const std::map &generated_user_outputs) + int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users) { // Aqure hpfs rw session before accessing shards and insert ledger records. if (ledger_fs.acquire_rw_session() == -1) @@ -113,8 +111,8 @@ namespace ledger LEDGER_CREATE_ERROR; // Save blob data. - if ((!proposal.input_ordered_hashes.empty() || !generated_user_outputs.empty()) && - save_ledger_blob(new_ledger_hash, consensed_users, generated_user_outputs) == -1) + if ((!proposal.input_ordered_hashes.empty() || proposal.output_hash != util::h32_empty.to_string_view()) && + save_ledger_blob(new_ledger_hash, consensed_users) == -1) LEDGER_CREATE_ERROR; // Update the latest seq_no and lcl when ledger is created. @@ -410,15 +408,13 @@ namespace ledger } /** - * Save raw data from the consensed proposal. A blob file is only created if there is any user inputs or contract outputs + * Save raw data from the consensed proposal. A blob file is only created if there is any user inputs or outputs * to save disk space. * @param ledger_hash Hash of this ledger we are saving. - * @param consensed_users Users and their raw inputs consensed in this consensus round. - * @param generated_user_outputs Generated raw outputs in this consensus round. + * @param consensed_users Users and their raw inputs/outputs consensed in this consensus round. * @return Returns 0 on success -1 on error. */ - int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users, - const std::map &generated_user_outputs) + int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users) { // Construct shard path. uint64_t last_blob_shard_seq_no = ctx.get_last_blob_shard_id().seq_no; @@ -500,11 +496,11 @@ namespace ledger blob.ledger_hash = ledger_hash; // Include consensed user inputs. - for (const auto &[pubkey, inputs] : consensed_users) + for (const auto &[pubkey, user] : consensed_users) { const auto [itr, success] = blob.inputs.try_emplace(pubkey, std::vector()); - for (const consensus::consensed_user_input &ci : inputs) + for (const consensus::consensed_user_input &ci : user.consensed_inputs) { std::string input; if (usr::input_store.read_buf(ci.input, input) != -1) @@ -513,13 +509,13 @@ namespace ledger } // Include consensed user outputs. - for (const auto &[hash, user_output] : generated_user_outputs) + for (const auto &[pubkey, user] : consensed_users) { std::vector outputs; - for (const auto &output : user_output.outputs) - outputs.push_back(std::move(output.message)); + for (const std::string &output : user.consensed_outputs.outputs) + outputs.push_back(std::move(output)); - blob.outputs.emplace(user_output.user_pubkey, std::move(outputs)); + blob.outputs.emplace(pubkey, std::move(outputs)); } flatbuffers::FlatBufferBuilder builder(1024); diff --git a/src/ledger/ledger.hpp b/src/ledger/ledger.hpp index c4b80d45..8d8ab26a 100644 --- a/src/ledger/ledger.hpp +++ b/src/ledger/ledger.hpp @@ -70,16 +70,14 @@ namespace ledger void deinit(); - int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users, - const std::map &generated_user_outputs); + int save_ledger(const p2p::proposal &proposal, const consensus::consensed_user_map &consensed_users); int insert_ledger_record(sqlite3 *db, const p2p::sequence_hash ¤t_lcl_id, const uint64_t primary_shard_seq_no, const p2p::proposal &proposal, std::string &new_ledger_hash); int prepare_shard(sqlite3 **db, uint64_t &shard_seq_no, const uint64_t ledger_seq_no); - int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users, - const std::map &generated_user_outputs); + int save_ledger_blob(std::string_view ledger_hash, const consensus::consensed_user_map &consensed_users); void remove_old_shards(const uint64_t led_shard_no, std::string_view shard_parent_dir); diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index 8495f6cc..1ce15a76 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -16,8 +16,8 @@ namespace msg::usrmsg::bson * Message format: * { * "type": "stat_response", - * "lcl_seq_no": , - * "lcl_hash": + * "ledger_seq_no": , + * "ledger_hash": * } */ constexpr const size_t MAX_KNOWN_PEERS_INFO = 10; @@ -30,9 +30,9 @@ namespace msg::usrmsg::bson encoder.string_value(msg::usrmsg::MSGTYPE_STAT_RESPONSE); encoder.key(msg::usrmsg::FLD_HP_VERSION); encoder.string_value(version::HP_VERSION); - encoder.key(msg::usrmsg::FLD_LCL_SEQ); + encoder.key(msg::usrmsg::FLD_LEDGER_SEQ_NO); encoder.int64_value(lcl_seq_no); - encoder.key(msg::usrmsg::FLD_LCL_HASH); + encoder.key(msg::usrmsg::FLD_LEDGER_HASH); encoder.byte_string_value(lcl_hash); encoder.key(msg::usrmsg::FLD_ROUND_TIME); encoder.uint64_value(conf::cfg.contract.roundtime); @@ -145,15 +145,21 @@ namespace msg::usrmsg::bson * Message format: * { * "type": "contract_output", - * "lcl_seq_no": , - * "lcl_hash": - * "outputs": [, , ...], // The output order is the hash order. - * "hashes": [], // Always includes user's output hash [output hash = hash(pubkey+all outputs for the user)] + * "ledger_seq_no": , + * "ledger_hash": , + * "outputs": [, , ...], // The output order is the hash generation order. + * "output_hash": , [output hash = hash(pubkey+all outputs for the user)] + * "hash_tree": [], // Collapsed merkle tree with user's hash element marked as null. * "unl_sig": [["", ""], ...] // Binary UNL pubkeys and signatures of root hash. * } - * @param content The contract binary output content to be put in the message. + * @param hash This user's combined output hash. [output hash = hash(pubkey+all outputs for the user)] + * @param outputs List of outputs for the user. + * @param hash_root Root node of the collapsed merkle hash tree for this round. + * @param unl_sig List of unl signatures issued on the root hash. (root hash = merkle root hash of hashes of all users) + * @param lcl_seq_no Current ledger seq no. + * @param lcl_hash Current ledger hash. */ - void create_contract_output_container(std::vector &msg, const ::std::vector &outputs, + void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash) { @@ -161,9 +167,9 @@ namespace msg::usrmsg::bson encoder.begin_object(); encoder.key(msg::usrmsg::FLD_TYPE); encoder.string_value(msg::usrmsg::MSGTYPE_CONTRACT_OUTPUT); - encoder.key(msg::usrmsg::FLD_LCL_SEQ); + encoder.key(msg::usrmsg::FLD_LEDGER_SEQ_NO); encoder.int64_value(lcl_seq_no); - encoder.key(msg::usrmsg::FLD_LCL_HASH); + encoder.key(msg::usrmsg::FLD_LEDGER_HASH); encoder.byte_string_value(lcl_hash); encoder.key(msg::usrmsg::FLD_OUTPUTS); @@ -172,7 +178,10 @@ namespace msg::usrmsg::bson encoder.byte_string_value(outputs[i]); encoder.end_array(); - encoder.key(msg::usrmsg::FLD_HASHES); + encoder.key(msg::usrmsg::FLD_OUTPUT_HASH); + encoder.byte_string_value(hash); + + encoder.key(msg::usrmsg::FLD_HASH_TREE); populate_output_hash_array(encoder, hash_root); encoder.key(msg::usrmsg::FLD_UNL_SIG); @@ -473,7 +482,12 @@ namespace msg::usrmsg::bson { if (node.children.empty()) { - encoder.byte_string_value(node.hash); + // The retained node is serialized as null. + // This is so the client can identify the self-hash position within the hash tree. + if (node.is_retained) + encoder.null_value(); + else + encoder.byte_string_value(node.hash); return; } else diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index e97c99b7..286c7d54 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -15,7 +15,7 @@ namespace msg::usrmsg::bson void create_contract_read_response_container(std::vector &msg, std::string_view content); - void create_contract_output_container(std::vector &msg, const ::std::vector &outputs, + void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash); diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index 74a6b14b..3144f0ed 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -136,8 +136,8 @@ namespace msg::usrmsg::json * Message format: * { * "type": "stat_response", - * "lcl_seq_no": , - * "lcl_hash": "" + * "ledger_seq_no": , + * "ledger_hash": "" * } */ void create_status_response(std::vector &msg, const uint64_t lcl_seq_no, std::string_view lcl_hash) @@ -154,11 +154,11 @@ namespace msg::usrmsg::json msg += SEP_COLON; msg += version::HP_VERSION; msg += SEP_COMMA; - msg += msg::usrmsg::FLD_LCL_SEQ; + msg += msg::usrmsg::FLD_LEDGER_SEQ_NO; msg += SEP_COLON_NOQUOTE; msg += std::to_string(lcl_seq_no); msg += SEP_COMMA_NOQUOTE; - msg += msg::usrmsg::FLD_LCL_HASH; + msg += msg::usrmsg::FLD_LEDGER_HASH; msg += SEP_COLON; msg += util::to_hex(lcl_hash); msg += SEP_COMMA; @@ -323,15 +323,21 @@ namespace msg::usrmsg::json * Message format: * { * "type": "contract_output", - * "lcl_seq_no": , - * "lcl_hash": "", - * "outputs": ["", "", ...], // The output order is the hash order. - * "hashes": [], // Always includes user's output hash [output hash = hash(pubkey+all outputs for the user)] + * "ledger_seq_no": , + * "ledger_hash": "", + * "outputs": ["", "", ...], // The output order is the hash generation order. + * "output_hash": "", [output hash = hash(pubkey+all outputs for the user)] + * "hash_tree": [], // Collapsed merkle tree with user's hash element marked as null. * "unl_sig": [["", ""], ...] // UNL pubkeys and signatures of root hash. * } - * @param content The contract binary output content to be put in the message. + * @param hash This user's combined output hash. [output hash = hash(pubkey+all outputs for the user)] + * @param outputs List of outputs for the user. + * @param hash_root Root node of the collapsed merkle hash tree. + * @param unl_sig List of unl signatures issued on the root hash. (root hash = combined merkle hash of hashes of all users) + * @param lcl_seq_no Current ledger seq no. + * @param lcl_hash Current ledger hash. */ - void create_contract_output_container(std::vector &msg, const ::std::vector &outputs, + void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash) { @@ -341,11 +347,11 @@ namespace msg::usrmsg::json msg += SEP_COLON; msg += msg::usrmsg::MSGTYPE_CONTRACT_OUTPUT; msg += SEP_COMMA; - msg += msg::usrmsg::FLD_LCL_SEQ; + msg += msg::usrmsg::FLD_LEDGER_SEQ_NO; msg += SEP_COLON_NOQUOTE; msg += std::to_string(lcl_seq_no); msg += SEP_COMMA_NOQUOTE; - msg += msg::usrmsg::FLD_LCL_HASH; + msg += msg::usrmsg::FLD_LEDGER_HASH; msg += SEP_COLON; msg += util::to_hex(lcl_hash); msg += SEP_COMMA; @@ -379,10 +385,15 @@ namespace msg::usrmsg::json msg += "],\""; - msg += msg::usrmsg::FLD_HASHES; - msg += "\":"; + msg += msg::usrmsg::FLD_OUTPUT_HASH; + msg += SEP_COLON; + msg += util::to_hex(hash); + msg += SEP_COMMA; + + msg += msg::usrmsg::FLD_HASH_TREE; + msg += SEP_COLON_NOQUOTE; populate_output_hash_array(msg, hash_root); - msg += ",\""; + msg += SEP_COMMA_NOQUOTE; msg += msg::usrmsg::FLD_UNL_SIG; msg += "\":["; @@ -391,7 +402,7 @@ namespace msg::usrmsg::json const auto &sig = unl_sig[i]; // Pubkey and Signature pair. msg += "[\""; msg += util::to_hex(sig.first); - msg += "\",\""; + msg += SEP_COMMA; msg += util::to_hex(sig.second); msg += "\"]"; @@ -845,9 +856,18 @@ namespace msg::usrmsg::json { if (node.children.empty()) { - msg += "\""; - msg += util::to_hex(node.hash); - msg += "\""; + if (node.is_retained) + { + // The retained node is serialized as null. + // This is so the client can identify the self-hash position within the hash tree. + msg += "null"; + } + else + { + msg += "\""; + msg += util::to_hex(node.hash); + msg += "\""; + } return; } else diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index 2cc3fdfe..4bfbcae7 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -19,7 +19,7 @@ namespace msg::usrmsg::json void create_contract_read_response_container(std::vector &msg, std::string_view content); - void create_contract_output_container(std::vector &msg, const ::std::vector &outputs, + void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index 53f70490..3dac89ef 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -28,11 +28,10 @@ namespace msg::usrmsg constexpr const char *FLD_MAX_LEDGER_SEQ_NO = "max_ledger_seq_no"; constexpr const char *FLD_CONTENT = "content"; constexpr const char *FLD_OUTPUTS = "outputs"; - constexpr const char *FLD_HASHES = "hashes"; + constexpr const char *FLD_OUTPUT_HASH = "output_hash"; + constexpr const char *FLD_HASH_TREE = "hash_tree"; constexpr const char *FLD_UNL_SIG = "unl_sig"; constexpr const char *FLD_NONCE = "nonce"; - constexpr const char *FLD_LCL_HASH = "lcl_hash"; - constexpr const char *FLD_LCL_SEQ = "lcl_seq_no"; constexpr const char *FLD_STATUS = "status"; constexpr const char *FLD_REASON = "reason"; constexpr const char *FLD_ROUND_TIME = "round_time"; @@ -55,7 +54,6 @@ namespace msg::usrmsg constexpr const char *FLD_STATE_HASH = "state_hash"; constexpr const char *FLD_CONFIG_HASH = "config_hash"; constexpr const char *FLD_USER_HASH = "user_hash"; - constexpr const char *FLD_OUTPUT_HASH = "output_hash"; constexpr const char *FLD_RAW_INPUTS = "raw_inputs"; constexpr const char *FLD_RAW_OUTPUTS = "raw_outputs"; constexpr const char *FLD_BLOBS = "blobs"; diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index bee7d3ac..06348f5a 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -38,14 +38,14 @@ namespace msg::usrmsg busrmsg::create_contract_read_response_container(msg, content); } - void usrmsg_parser::create_contract_output_container(std::vector &msg, const ::std::vector &outputs, + void usrmsg_parser::create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash) const { if (protocol == util::PROTOCOL::JSON) - jusrmsg::create_contract_output_container(msg, outputs, hash_root, unl_sig, lcl_seq_no, lcl_hash); + jusrmsg::create_contract_output_container(msg, hash, outputs, hash_root, unl_sig, lcl_seq_no, lcl_hash); else - busrmsg::create_contract_output_container(msg, outputs, hash_root, unl_sig, lcl_seq_no, lcl_hash); + busrmsg::create_contract_output_container(msg, hash, outputs, hash_root, unl_sig, lcl_seq_no, lcl_hash); } void usrmsg_parser::create_unl_list_container(std::vector &msg, const ::std::set &unl_list) const diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index b99d8481..6bed01fa 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -27,7 +27,7 @@ namespace msg::usrmsg void create_contract_read_response_container(std::vector &msg, std::string_view content) const; - void create_contract_output_container(std::vector &msg, const ::std::vector &outputs, + void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, const util::merkle_hash_node &hash_root, const std::vector> &unl_sig, const uint64_t lcl_seq_no, std::string_view lcl_hash) const; diff --git a/src/usr/user_input.hpp b/src/usr/user_input.hpp index 55ba7e02..71c3adf1 100644 --- a/src/usr/user_input.hpp +++ b/src/usr/user_input.hpp @@ -23,7 +23,6 @@ namespace usr std::string nonce; uint64_t max_ledger_seq_no; std::string sig; - util::PROTOCOL protocol; // The message protocol used by the user. // Comparison operator used for sorting user's inputs in nonce order. bool operator<(const extracted_user_input &other) diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 4368bf2e..00d64f62 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -294,6 +294,8 @@ namespace usr const auto user_itr = usr::ctx.users.find(pubkey); if (user_itr != usr::ctx.users.end()) { + msg::usrmsg::usrmsg_parser parser(user_itr->second.protocol); + // Send the request status result if this user is connected to us. for (const input_status_response &resp : user_responses) { @@ -301,7 +303,6 @@ namespace usr // would have gotten the proper status response during first submission. if (resp.reject_reason != msg::usrmsg::REASON_ALREADY_SUBMITTED) { - msg::usrmsg::usrmsg_parser parser(resp.protocol); send_input_status(parser, user_itr->second.session, resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, @@ -408,7 +409,6 @@ namespace usr } extracted.sig = std::move(submitted.sig); - extracted.protocol = submitted.protocol; return NULL; } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index a38f0dec..a81ac859 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -66,7 +66,6 @@ namespace usr struct input_status_response { - const util::PROTOCOL protocol; const std::string input_hash; const char *reject_reason; }; diff --git a/src/util/merkle_hash_tree.cpp b/src/util/merkle_hash_tree.cpp index ad8c3368..96b542ec 100644 --- a/src/util/merkle_hash_tree.cpp +++ b/src/util/merkle_hash_tree.cpp @@ -100,6 +100,10 @@ namespace util if (retain_hash.empty() || node.hash == retain_hash) { node.children.clear(); // No need to dig deeper. + + if (node.hash == retain_hash) + node.is_retained = true; // Mark this node as the retained node. + return true; } else diff --git a/src/util/merkle_hash_tree.hpp b/src/util/merkle_hash_tree.hpp index f49ecfb5..fe36016b 100644 --- a/src/util/merkle_hash_tree.hpp +++ b/src/util/merkle_hash_tree.hpp @@ -9,6 +9,7 @@ namespace util { std::string hash; std::list children; + bool is_retained = false; }; class merkle_hash_tree diff --git a/test/metrics/metrics.js b/test/metrics/metrics.js index 146cc1e3..0ca77f2d 100644 --- a/test/metrics/metrics.js +++ b/test/metrics/metrics.js @@ -97,21 +97,24 @@ function singleUserInputOutput(payloadKB, requestCount) { const hpc = await createClient(); const timer = new Timer(); - hpc.on(HotPocket.events.contractOutput, (response) => { - respCount++; - if (respCount == requestCount) { - const runPeriod = timer.stop(); - hpc.close().then(() => resolve(runPeriod)); - } + hpc.on(HotPocket.events.contractOutput, (r) => { + r.outputs.forEach(response => { + respCount++; + if (respCount == requestCount) { + const runPeriod = timer.stop(); + hpc.close().then(() => resolve(runPeriod)); + } + }); }); timer.start(); for (let i = 0; i < requestCount; i++) { const nonce = i.toString().padStart(5); - hpc.sendContractInput(payload, nonce, 10).then(r => { - if (r.status != "ok") - console.log(r.reason); - }); + const input = await hpc.submitContractInput(payload, nonce, 10); + input.submissionStatus.then(s => { + if (s.status != "accepted") + console.log(s.reason); + });; } }) } @@ -142,18 +145,21 @@ function largePayload(payloadMB) { const hpc = await createClient(); const timer = new Timer(); - hpc.on(HotPocket.events.contractOutput, (response) => { - if (response.length < payload.length) - console.log("Payload length mismatch."); + hpc.on(HotPocket.events.contractOutput, (r) => { + r.outputs.forEach(response => { + if (response.length < payload.length) + console.log("Payload length mismatch."); - const runPeriod = timer.stop(); - hpc.close().then(() => resolve(runPeriod)); + const runPeriod = timer.stop(); + hpc.close().then(() => resolve(runPeriod)); + }) }); timer.start(); - await hpc.sendContractInput(payload).then(r => { - if (r.status != "ok") - console.log(r.reason); + const input = await hpc.submitContractInput(payload); + input.submissionStatus.then(s => { + if (s.status != "accepted") + console.log(s.reason); });; }) }