diff --git a/CMakeLists.txt b/CMakeLists.txt index f905e265..92ee45ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,6 +59,7 @@ add_executable(hpcore src/ledger/ledger_sync.cpp src/ledger/ledger_serve.cpp src/ledger/ledger.cpp + src/debug_shell/debug_shell.cpp src/status.cpp src/consensus.cpp src/main.cpp @@ -77,7 +78,7 @@ target_link_libraries(hpcore add_custom_command(TARGET hpcore POST_BUILD # COMMAND strip ./build/hpcore - COMMAND cp ./test/bin/hpws ./test/bin/hpfs ./evernode-license.pdf ./build/ + COMMAND cp ./test/bin/hpws ./test/bin/hpfs ./test/bin/hpsh ./evernode-license.pdf ./build/ ) target_precompile_headers(hpcore PUBLIC src/pchheader.hpp) @@ -87,9 +88,9 @@ target_precompile_headers(hpcore PUBLIC src/pchheader.hpp) add_custom_target(docker COMMAND mkdir -p ./test/local-cluster/bin COMMAND cp ./build/hpcore ./test/local-cluster/bin/ - COMMAND cp ./test/bin/libblake3.so ./test/bin/hpws ./test/bin/hpfs ./test/local-cluster/bin/ + COMMAND cp ./test/bin/libblake3.so ./test/bin/hpws ./test/bin/hpfs ./test/bin/hpsh ./test/local-cluster/bin/ COMMAND cp ./evernode-license.pdf ./test/local-cluster/bin/ - COMMAND docker build -t hpcore:latest -t hpcore:0.6.4 -f ./test/local-cluster/Dockerfile ./test/local-cluster/bin/ + COMMAND docker build -t hpcore:latest -t hpcore:0.6.5 -f ./test/local-cluster/Dockerfile ./test/local-cluster/bin/ ) set_target_properties(docker PROPERTIES EXCLUDE_FROM_ALL TRUE) add_dependencies(docker hpcore) diff --git a/examples/js_client/file-client.js b/examples/js_client/file-client.js index d6a481ac..43ac05e0 100644 --- a/examples/js_client/file-client.js +++ b/examples/js_client/file-client.js @@ -67,22 +67,6 @@ async function main() { }); }) - // This will get fired when contract sends a read response. - hpc.on(HotPocket.events.contractReadResponse, (response) => { - const result = bson.deserialize(response); - if (result.type == "downloadResult") { - if (result.status == "ok") { - fs.writeFileSync(result.fileName, result.content.buffer); - console.log("File " + result.fileName + " downloaded to current directory."); - } - else { - console.log("File " + result.fileName + " download failed. reason: " + result.status); - } - } - else { - console.log("Unknown read request result."); - } - }) console.log("Ready to accept inputs."); @@ -122,10 +106,16 @@ async function main() { else if (inp.startsWith("download ")) { const fileName = inp.substr(9); - hpc.sendContractReadRequest(bson.serialize({ + hpc.submitContractReadRequest(bson.serialize({ type: "download", fileName: fileName - })); + })).then(reply => { + const res = bson.deserialize(reply); + if (res && res.type === 'downloadResult' && res.status === 'ok') + console.log(res.content); + else + console.log(res); + }); } else { console.log("Invalid command. [upload | delete | download ] expected.") diff --git a/examples/js_client/text-client.js b/examples/js_client/text-client.js index 1c3a61d4..1e34fa1f 100644 --- a/examples/js_client/text-client.js +++ b/examples/js_client/text-client.js @@ -123,6 +123,16 @@ async function main() { else if (inp === "stat") { hpc.getStatus().then(stat => console.log(stat)); } + else if (inp.startsWith("debug_shell ")) { + hpc.submitDebugShellRequest(inp.substr(12)).then(id => { + hpc.on(id, (reply) => { + if (reply.data) + console.log(reply.data); + else + console.error(reply.error); + }) + }); + } else { if (inp.startsWith("upload ")) { diff --git a/examples/nodejs_contract/diagnostic_contract.js b/examples/nodejs_contract/diagnostic_contract.js index 48650af9..d5c14e59 100644 --- a/examples/nodejs_contract/diagnostic_contract.js +++ b/examples/nodejs_contract/diagnostic_contract.js @@ -6,7 +6,7 @@ const filename = "file.dat"; const autofilePrefix = "autofile"; const autofileSize = 1 * 1024 * 1024; -const diagnosticContract = async (ctx) => { +const diagnosticContract = async (ctx, readOnly = false) => { // Collection of per-user promises to wait for. Each promise completes when inputs for that user is processed. const userHandlers = []; @@ -134,5 +134,16 @@ const diagnosticContract = async (ctx) => { } +const fallback = async (ctx) => { + console.log(`Fallback mode: Non consensus execution count: ${ctx.nonConsensusRounds}`); +} + + const hpc = new HotPocket.Contract(); -hpc.init(diagnosticContract); \ No newline at end of file +hpc.init({ + "consensus": async (ctx) => { await diagnosticContract(ctx); }, + "consensus_fallback": async (ctx) => { await fallback(ctx); }, + "read_req": async (ctx) => { await diagnosticContract(ctx, true); } +}); + + diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 7b152b55..f065c9a5 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -5,10 +5,10 @@ const exectsFile = "exects.txt"; // HP smart contract is defined as a function which takes HP ExecutionContext as an argument. // HP considers execution as complete, when this function completes and all the NPL message callbacks are complete. -const echoContract = async (ctx) => { +const contract = async (ctx, readonly = false) => { // We just save execution timestamp as an example state file change. - if (!ctx.readonly) { + if (!readonly) { fs.appendFileSync(exectsFile, "ts:" + ctx.timestamp + "\n"); const stats = fs.statSync(exectsFile); @@ -56,7 +56,7 @@ const echoContract = async (ctx) => { // ctx.unl.find(""); // NPL messages example. - // if (!ctx.readonly) { + // if (!readonly) { // // Start listening to incoming NPL messages before we send ours. // const promise = new Promise((resolve, reject) => { // let timeout = setTimeout(() => { @@ -84,5 +84,33 @@ const echoContract = async (ctx) => { // await ctx.updateConfig(config); } +const fallback = async (ctx) => { + console.log(`Fallback mode: Non consensus execution count: ${ctx.nonConsensusRounds}`); + // NPL messages example. + // Start listening to incoming NPL messages before we send ours. + const promise = new Promise((resolve, reject) => { + let timeout = setTimeout(() => { + reject('NPL timeout.'); + }, 2000); + + let list = []; + ctx.unl.onMessage((node, msg) => { + console.log(`${node.publicKey} said ${msg} to me.`); + list.push(msg); + if (list.length == ctx.unl.list().length) { + clearTimeout(timeout); + resolve(); + } + }); + }); + + await ctx.unl.send("Hello"); + await promise; +} + const hpc = new HotPocket.Contract(); -hpc.init(echoContract); \ No newline at end of file +hpc.init({ + "consensus": async (ctx) => { await contract(ctx, false); }, + "consensus_fallback": async (ctx) => { await fallback(ctx); }, + "read_req": async (ctx) => { await contract(ctx, true); } +}); \ No newline at end of file diff --git a/examples/nodejs_contract/file_contract.js b/examples/nodejs_contract/file_contract.js index 55c3820b..6c239ace 100644 --- a/examples/nodejs_contract/file_contract.js +++ b/examples/nodejs_contract/file_contract.js @@ -2,7 +2,7 @@ const HotPocket = require("hotpocket-nodejs-contract"); const fs = require('fs'); const bson = require('bson'); -const fileContract = async (ctx) => { +const fileContract = async (ctx, readonly = false) => { for (const user of ctx.users.list()) { @@ -54,7 +54,7 @@ const fileContract = async (ctx) => { })); } } - else if (msg.type == "download") { + else if (readonly && msg.type == "download") { if (fs.existsSync(msg.fileName)) { const fileContent = fs.readFileSync(msg.fileName); await user.send(bson.serialize({ @@ -76,5 +76,14 @@ const fileContract = async (ctx) => { } }; + +const fallback = async (ctx) => { + console.log(`Fallback mode: Non consensus execution count: ${ctx.nonConsensusRounds}`); +} + const hpc = new HotPocket.Contract(); -hpc.init(fileContract, HotPocket.clientProtocols.bson); +hpc.init({ + "consensus": async (ctx) => { await fileContract(ctx); }, + "consensus_fallback": async (ctx) => { await fallback(ctx); }, + "read_req": async (ctx) => { await fileContract(ctx, true); } +}, HotPocket.clientProtocols.bson); diff --git a/examples/nodejs_contract/package-lock.json b/examples/nodejs_contract/package-lock.json index 4ae9c899..4b24d58a 100644 --- a/examples/nodejs_contract/package-lock.json +++ b/examples/nodejs_contract/package-lock.json @@ -6,7 +6,7 @@ "": { "dependencies": { "bson": "4.0.4", - "hotpocket-nodejs-contract": "0.5.10", + "hotpocket-nodejs-contract": "0.7.0", "seedrandom": "3.0.5" } }, @@ -37,9 +37,9 @@ } }, "node_modules/hotpocket-nodejs-contract": { - "version": "0.5.10", - "resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.5.10.tgz", - "integrity": "sha512-1Cw9WcyoQmJabGkjRbM4FZ1HMvUI3z6IVTbhi0BDiD+K1c6bHROPgNKbIHCSL15+Z5xvbHpeuOOPLGogGtdC7Q==" + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.7.0.tgz", + "integrity": "sha512-MHIhGGzPBsKkmXMZrr8L3ze6cSqlSKQ+V2J59/fSS7FDeo5+Z983ZOwt1aBvaLmkN22YOvtLVdBUn1EEamryOg==" }, "node_modules/ieee754": { "version": "1.1.13", @@ -82,9 +82,9 @@ } }, "hotpocket-nodejs-contract": { - "version": "0.5.10", - "resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.5.10.tgz", - "integrity": "sha512-1Cw9WcyoQmJabGkjRbM4FZ1HMvUI3z6IVTbhi0BDiD+K1c6bHROPgNKbIHCSL15+Z5xvbHpeuOOPLGogGtdC7Q==" + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.7.0.tgz", + "integrity": "sha512-MHIhGGzPBsKkmXMZrr8L3ze6cSqlSKQ+V2J59/fSS7FDeo5+Z983ZOwt1aBvaLmkN22YOvtLVdBUn1EEamryOg==" }, "ieee754": { "version": "1.1.13", diff --git a/examples/nodejs_contract/package.json b/examples/nodejs_contract/package.json index 31bca1ba..afe6db89 100644 --- a/examples/nodejs_contract/package.json +++ b/examples/nodejs_contract/package.json @@ -5,7 +5,7 @@ "build-diag": "ncc build diagnostic_contract.js -o dist/diagnostic-contract" }, "dependencies": { - "hotpocket-nodejs-contract": "0.5.10", + "hotpocket-nodejs-contract": "0.7.0", "bson": "4.0.4", "seedrandom": "3.0.5" } diff --git a/src/conf.cpp b/src/conf.cpp index 47e6e696..99ff3a11 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -194,6 +194,8 @@ namespace conf cfg.log.loggers.emplace("console"); cfg.log.loggers.emplace("file"); + cfg.debug_shell.enabled = false; + // Save the default settings into the config file. if (write_config(cfg) != 0) return -1; @@ -242,6 +244,7 @@ namespace conf ctx.hpws_exe_path = ctx.exe_dir + "/" + "hpws"; ctx.hpfs_exe_path = ctx.exe_dir + "/" + "hpfs"; + ctx.hpsh_exe_path = ctx.exe_dir + "/" + "hpsh"; ctx.contract_dir = basedir; ctx.config_dir = basedir + "/cfg"; @@ -511,6 +514,42 @@ namespace conf } } + // debug_shell + { + jpath = "debug_shell"; + + try + { + const jsoncons::ojson &debug_shell = d["debug_shell"]; + cfg.debug_shell.enabled = debug_shell["enabled"].as(); + + if (cfg.debug_shell.run_as.from_string(debug_shell["run_as"].as()) == -1) + { + std::cerr << "Invalid format for debug_shell run as config (\"uid>0:gid>0\" expected).\n"; + return -1; + } + + jpath = "debug_shell.users"; + cfg.debug_shell.users.clear(); + for (auto &userpk : debug_shell["users"].array_range()) + { + // Convert the public key hex of each node to binary and store it. + const std::string bin_pubkey = util::to_bin(userpk.as()); + if (bin_pubkey.empty()) + { + std::cerr << "Error decoding user pubkey list.\n"; + return -1; + } + cfg.debug_shell.users.emplace(bin_pubkey); + } + } + catch (const std::exception &e) + { + print_missing_field_error(jpath, e); + return -1; + } + } + return 0; } @@ -623,6 +662,20 @@ namespace conf d.insert_or_assign("log", log_config); } + // debug_shell configs + { + jsoncons::ojson debug_shell_config; + debug_shell_config.insert_or_assign("enabled", cfg.debug_shell.enabled); + debug_shell_config.insert_or_assign("run_as", cfg.debug_shell.run_as.to_string()); + jsoncons::ojson users(jsoncons::json_array_arg); + for (const auto &userpk : cfg.debug_shell.users) + { + users.push_back(util::to_hex(userpk)); + } + debug_shell_config.insert_or_assign("users", users); + d.insert_or_assign("debug_shell", debug_shell_config); + } + return write_json_file(ctx.config_file, d); } @@ -709,7 +762,7 @@ namespace conf */ int validate_contract_dir_paths() { - const std::string paths[8] = { + const std::string paths[9] = { ctx.contract_dir, ctx.config_file, ctx.contract_hpfs_dir, @@ -717,7 +770,8 @@ namespace conf ctx.tls_key_file, ctx.tls_cert_file, ctx.hpfs_exe_path, - ctx.hpws_exe_path}; + ctx.hpws_exe_path, + ctx.hpsh_exe_path}; for (const std::string &path : paths) { @@ -729,7 +783,7 @@ namespace conf << "openssl req -newkey rsa:2048 -new -nodes -x509 -days 365 -keyout tlskey.pem -out tlscert.pem\n" << "and add it to " + ctx.config_dir << std::endl; } - else if (path == ctx.hpfs_exe_path || path == ctx.hpws_exe_path) + else if (path == ctx.hpfs_exe_path || path == ctx.hpws_exe_path || path == ctx.hpsh_exe_path) { std::cerr << path << " binary does not exist.\n"; } @@ -952,10 +1006,14 @@ namespace conf jdoc.insert_or_assign("max_input_ledger_offset", contract.max_input_ledger_offset); jsoncons::ojson consensus; + jsoncons::ojson fallback; + fallback.insert_or_assign("execute", contract.consensus.fallback.execute); + consensus.insert_or_assign("mode", contract.consensus.mode == MODE::PUBLIC ? MODE_PUBLIC : MODE_PRIVATE); consensus.insert_or_assign("roundtime", contract.consensus.roundtime.load()); consensus.insert_or_assign("stage_slice", contract.consensus.stage_slice.load()); consensus.insert_or_assign("threshold", contract.consensus.threshold); + consensus.insert_or_assign("fallback", fallback); jdoc.insert_or_assign("consensus", consensus); jsoncons::ojson npl; @@ -1095,6 +1153,9 @@ namespace conf } contract.consensus.mode = jdoc["consensus"]["mode"].as() == MODE_PUBLIC ? MODE::PUBLIC : MODE::PRIVATE; + jpath = "contract.consensus.fallback"; + contract.consensus.fallback.execute = jdoc["consensus"]["fallback"]["execute"].as(); + jpath = "contract.npl"; if (jdoc["npl"]["mode"].as() != MODE_PUBLIC && jdoc["npl"]["mode"].as() != MODE_PRIVATE) { diff --git a/src/conf.hpp b/src/conf.hpp index 242d8d43..661fa84c 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -173,12 +173,18 @@ namespace conf size_t max_file_count = 0; // Max no. of log files to keep. }; + struct fallback_config + { + bool execute = false; // Whether or not to execute the contract on fallback mode. + }; + struct consensus_config { MODE mode; // If PUBLIC, consensus are broadcasted to non-unl nodes as well. std::atomic roundtime = 0; // Consensus round time in ms (max: 3,600,000). std::atomic stage_slice = 0; // Percentage slice of round time that stages 0,1,2 get (max: 33). - uint16_t threshold = 0; + uint16_t threshold = 0; // The minimum percentage of votes for accepting a stage 3 proposal to create a ledger. + fallback_config fallback; // Consensus fallback related configuration. }; struct npl_config @@ -208,6 +214,13 @@ namespace conf std::vector runtime_env_args; // Contract environment variables. }; + struct debug_shell_config + { + bool enabled = false; // Whether or not to enable debug_shell. + ugid run_as; // The user/groups id to execute the debug_shell as. + std::set users; // List of users who are allowed to perform debug_shell (list of binary public keys). + }; + struct user_config { uint16_t port = 0; // Listening port for public user connections @@ -263,6 +276,7 @@ namespace conf std::string exe_dir; // HotPocket executable dir. std::string hpws_exe_path; // hpws executable file path. std::string hpfs_exe_path; // hpfs executable file path. + std::string hpsh_exe_path; // debug_shell executable path file std::string contract_dir; // Contract base directory full path. std::string contract_hpfs_dir; // Contract hpfs metadata dir (The location of hpfs log file). @@ -316,14 +330,15 @@ namespace conf hpfs_config hpfs; log_config log; health_config health; // For debugging only. Not included in the config file. + debug_shell_config debug_shell; }; // Global contract context struct exposed to the application. - // Other modeuls will access context values via this. + // Other modules will access context values via this. extern contract_ctx ctx; // Global configuration struct exposed to the application. - // Other modeuls will access config values via this. + // Other modules will access config values via this. extern hp_config cfg; int init(); diff --git a/src/consensus.cpp b/src/consensus.cpp index 4efb7ee2..5e899b63 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -29,6 +29,7 @@ namespace consensus // Max no. of time to get unreliable votes before we try heuristics to increase vote receiving reliability. constexpr uint16_t MAX_UNRELIABLE_VOTES_ATTEMPTS = 5; + constexpr uint16_t FALLBACK_CONTRACT_TERMINATE_MARGIN = 100; consensus_context ctx; bool init_success = false; @@ -140,6 +141,9 @@ namespace consensus broadcast_nonunl_proposal(); } + // Keep copy of current stage since we might change the stage in following conditions. + uint8_t cur_stage = ctx.stage; + if (ctx.stage == 0) { // Prepare the consensus candidate user inputs that we have accumulated so far. (We receive them periodically via NUPs) @@ -147,7 +151,7 @@ namespace consensus if (verify_and_populate_candidate_user_inputs(lcl_id.seq_no) == -1) return -1; - const p2p::proposal p = create_stage0_proposal(state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); + const p2p::proposal p = create_stage0_proposal(state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id, lcl_id); broadcast_proposal(p); ctx.stage = 1; // Transition to next stage. @@ -200,6 +204,7 @@ namespace consensus { ctx.stage = 0; ctx.unreliable_votes_attempts++; + ctx.non_consensus_rounds++; // If we get too many consecutive unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes // are caused because our roundtime config information is different from other nodes. @@ -208,15 +213,19 @@ namespace consensus refresh_time_config(true); ctx.unreliable_votes_attempts = 0; } + + if (execute_contract_in_fallback(cur_stage, lcl_id)) + apply_consensed_patch_file_changes(); } else { ctx.unreliable_votes_attempts = 0; + ctx.non_consensus_rounds = 0; if (new_vote_status == status::VOTE_STATUS::SYNCED) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id); + const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id, lcl_id); broadcast_proposal(p); // This marks the moment we finish a sync cycle. We are in stage 1 and we just detected that our votes are in sync. @@ -238,34 +247,33 @@ namespace consensus } /** - * Scan the stage 3 proposals from last round and create the ledger if all majority critertia are met. + * Evaluates stage 3 proposals and populates winning info. + * @param proposal_count Number of stage proposals. + * @param winning_votes Number of votes fo winning proposal. + * @param winning_proposal The winning proposal. + * @param stage Stage to evaluate. */ - void attempt_ledger_close() + int evaluate_proposals(uint32_t &proposal_count, uint32_t &winning_votes, p2p::proposal &winning_proposal, const uint8_t stage) { std::map> proposal_groups; // Stores sets of proposals against grouped by their root hashes. - uint32_t stage3_prop_count = 0; // Keep track of the number of stage 3 proposals received. + proposal_count = 0; // Keep track of the number of stage 3 proposals received. // Count votes of all stage 3 proposal hashes. for (const auto &[pubkey, cp] : ctx.candidate_proposals) { - if (cp.stage == 3) + if (cp.stage == stage) { - stage3_prop_count++; + proposal_count++; proposal_groups[cp.root_hash].push_back(cp); } } - // Threshold is devided by 100 to convert average to decimal. - const uint32_t min_votes_required = ceil((conf::cfg.contract.consensus.threshold * unl::count()) / 100.0); - if (stage3_prop_count < min_votes_required) - { - // We don't have enough stage 3 proposals to create a ledger. - LOG_DEBUG << "Not enough stage 3 proposals to create a ledger. received:" << stage3_prop_count << " needed:" << min_votes_required; - return; - } + // Return -1 if there are no proposals to evaluate. + if (proposal_count == 0) + return -1; // Find the winning hash and no. of votes for it. - uint32_t winning_votes = 0; + winning_votes = 0; util::h32 winning_hash = util::h32_empty; for (const auto [hash, proposals] : proposal_groups) { @@ -276,16 +284,44 @@ namespace consensus } } - if (winning_votes < min_votes_required) + // Return -1 if there are no winning proposal. + if (winning_votes == 0) + return -1; + + // Consensus reached. This is the winning set of proposals. + std::vector &winning_group = proposal_groups[winning_hash]; + + winning_proposal = std::move(winning_group.front()); + + return winning_votes; + } + + /** + * Scan the stage 3 proposals from last round and create the ledger if all majority criteria are met. + * @return true on successful ledger closure and false on failure. + */ + void attempt_ledger_close() + { + uint32_t stage3_prop_count = 0; + uint32_t winning_votes = 0; + p2p::proposal winning_prop; + const uint32_t min_votes_required = ceil((conf::cfg.contract.consensus.threshold * unl::count()) / 100.0); + + evaluate_proposals(stage3_prop_count, winning_votes, winning_prop, 3); + + // Threshold is divided by 100 to convert average to decimal. + if (stage3_prop_count < min_votes_required) + { + // We don't have enough stage 3 proposals to create a ledger. + LOG_DEBUG << "Not enough stage 3 proposals to create a ledger. received:" << stage3_prop_count << " needed:" << min_votes_required; + return; + } + else if (winning_votes < min_votes_required) { LOG_INFO << "Cannot close ledger. Possible fork condition. won:" << winning_votes << " needed:" << min_votes_required; return; } - // Consensus reached. This is the winning set of proposals. - std::vector &winning_group = proposal_groups[winning_hash]; - - p2p::proposal &winning_prop = winning_group.front(); LOG_DEBUG << "Closing ledger with proposal:" << winning_prop.root_hash; // Upon successful ledger close condition, update the ledger and execute the contract using the consensus proposal. @@ -293,16 +329,18 @@ namespace consensus if (prepare_consensed_users(consensed_users, winning_prop) == -1 || commit_consensus_results(winning_prop, consensed_users) == -1) { - LOG_ERROR << "Error occured when closing ledger"; + LOG_ERROR << "Error occurred when closing ledger"; // Cleanup obsolete information before next round starts. cleanup_output_collections(); cleanup_consensed_user_inputs(consensed_users); } + + return; } /** - * Performs the consensus finalalization activities with the provided consensused information. + * Performs the consensus finalization activities with the provided consensed information. * @param cons_prop The proposal which reached consensus. * @param consensed_users Set of consensed users and their consensed inputs and outputs. */ @@ -338,7 +376,8 @@ namespace consensus // Apply consensed config patch file changes to the hpcore runtime and hp.cfg. const util::h32 patch_hash = sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH); - if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1) + // Check whether is there any patch changes to be applied which reached consensus. + if (patch_hash == cons_prop.patch_hash && apply_consensed_patch_file_changes() == -1) return -1; // Execute the smart contract with the consensed user inputs. @@ -795,7 +834,7 @@ namespace consensus if (ctx.contract_ctx->args.lcl_id == npl_msg.lcl_id) return ctx.contract_ctx->args.npl_messages.try_enqueue(std::move(npl_msg)); else - LOG_DEBUG << "Trying to add irrelevant NPL from " << util::to_hex(npl_msg.pubkey) << " | lcl-seq: " << npl_msg.lcl_id.seq_no; + LOG_DEBUG << "Trying to add irrelevant NPL from " << util::to_hex(npl_msg.pubkey) << " | lcl-seq: " << npl_msg.lcl_id.seq_no; } return false; } @@ -904,8 +943,8 @@ namespace consensus return 0; } - p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, - const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id) + p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, const util::sequence_hash &last_primary_shard_id, + const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id) { // This is the proposal that stage 0 votes on. // We report our own values in stage 0. @@ -917,6 +956,7 @@ namespace consensus p.last_primary_shard_id = last_primary_shard_id; p.last_raw_shard_id = last_raw_shard_id; p.time_config = CURRENT_TIME_CONFIG; + p.lcl_id = lcl_id; // In stage 0 proposals, we calculate a random nonce from this node to contribute to the group nonce std::string rand_bytes; @@ -939,7 +979,7 @@ namespace consensus } p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash, - const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id) + const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id) { // The proposal to be emited at the end of this stage. p2p::proposal p; @@ -955,6 +995,7 @@ namespace consensus p.time_config = CURRENT_TIME_CONFIG; p.output_hash.resize(BLAKE3_OUT_LEN); // Default empty hash. p.node_nonce = ctx.round_nonce; + p.lcl_id = lcl_id; const uint64_t time_now = util::get_epoch_milliseconds(); @@ -1208,7 +1249,7 @@ namespace consensus } sc::contract_execution_args &args = ctx.contract_ctx->args; - args.readonly = false; + args.mode = sc::EXECUTION_MODE::CONSENSUS; args.time = time; // lcl to be passed to the contract. @@ -1252,6 +1293,65 @@ namespace consensus return 0; } + /** + * Executes the contract on consensus failure. + * @param stage Stage the consensus are currently in. + * @param lcl_id Current lcl id of the node. + */ + bool execute_contract_in_fallback(const uint8_t stage, const util::sequence_hash &lcl_id) + { + // Do not execute if fallback is not enabled, HotPocket is shutting down. + if (!conf::cfg.contract.consensus.fallback.execute || ctx.is_shutting_down) + return false; + + // Pass new user store for inputs, We are not handling inputs in fallback mode right now. + util::buffer_store fallback_store; + { + std::scoped_lock lock(ctx.contract_ctx_mutex); + ctx.contract_ctx.emplace(fallback_store); + } + + sc::contract_execution_args &args = ctx.contract_ctx->args; + args.mode = sc::EXECUTION_MODE::CONSENSUS_FALLBACK; + + // lcl to be passed to the contract. + args.lcl_id = lcl_id; + args.non_consensus_rounds = ctx.non_consensus_rounds; + + uint32_t stage3_prop_count = 0; + uint32_t winning_votes = 0; + p2p::proposal winning_prop; + // Evaluate proposals and take tike from winning proposal. + if (evaluate_proposals(stage3_prop_count, winning_votes, winning_prop, (stage + 3) % 4) > 0) + args.time = winning_prop.time; + + // We should end the contact before this round end, Otherwise we'll trap inside a round reset loop. + // We keep a margin of 100 milliseconds to avoid contract monitor thread hangs. + args.end_before = (ctx.round_start_time + conf::cfg.contract.consensus.roundtime - FALLBACK_CONTRACT_TERMINATE_MARGIN); + + bool executed = false; + // Execute contract in fallback mode without user inputs or outputs. + if (sc::execute_contract(ctx.contract_ctx.value()) == -1) + { + LOG_ERROR << "Contract execution for fallback mode failed."; + } + else + { + // Get the new state hash after contract execution. + const util::h32 &new_state_hash = args.post_execution_state_hash; + // Update state hash in contract fs global hash tracker. + sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, new_state_hash); + executed = true; + } + + { + std::scoped_lock lock(ctx.contract_ctx_mutex); + ctx.contract_ctx.reset(); + } + + return executed; + } + /** * Dispatch acceptence status responses to consensed user inputs, if the recipients are connected to us locally. * @param consensed_users The map of consensed users containing their inputs. @@ -1417,14 +1517,12 @@ namespace consensus /** * Apply patch file changes after verification from consensus. - * @param prop_patch_hash Hash of patch file which reached consensus. - * @param current_patch_hash Hash of the current patch file. * @return 0 on success. -1 on failure. */ - int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 ¤t_patch_hash) + int apply_consensed_patch_file_changes() { - // Check whether is there any patch changes to be applied which reached consensus. - if (is_patch_update_pending && current_patch_hash == prop_patch_hash) + // Check whether is there any patch changes to be applied. + if (is_patch_update_pending) { if (sc::contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1) { diff --git a/src/consensus.hpp b/src/consensus.hpp index 11434779..4d0e2b51 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -113,6 +113,7 @@ namespace consensus uint64_t round_boundry_offset = 0; // Time window boundry offset based on contract id. uint16_t unreliable_votes_attempts = 0; // No. of times we failed to get reliable votes continously. util::h32 round_nonce; // The random nonce generated by this node for this consensus round. + uint16_t non_consensus_rounds = 0; // No. of times we failed reach the consensus. // Indicates whether we are inside a sync cycle or not. Sync cycle is considered to being when we first detect that we are out of sync // and considered to end when we detect to be in sync inside stage 1 of a round for the first time after we began a sync. @@ -167,6 +168,8 @@ namespace consensus int consensus(); + int evaluate_proposals(uint32_t &proposal_count, uint32_t &winning_votes, p2p::proposal &winning_proposal, const uint8_t stage); + void attempt_ledger_close(); int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users); @@ -191,11 +194,11 @@ namespace consensus int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); - p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, - const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id); + p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, const util::sequence_hash &last_primary_shard_id, + const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id); p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash, - const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id); + const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id); void broadcast_proposal(const p2p::proposal &p); @@ -215,6 +218,8 @@ namespace consensus int execute_contract(const uint64_t time, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); + bool execute_contract_in_fallback(const uint8_t stage, const util::sequence_hash &lcl_id); + void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id); @@ -232,7 +237,7 @@ namespace consensus bool push_control_message(const std::string &control_msg); - int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 ¤t_patch_hash); + int apply_consensed_patch_file_changes(); void refresh_time_config(const bool perform_detection); diff --git a/src/debug_shell/debug_shell.cpp b/src/debug_shell/debug_shell.cpp new file mode 100644 index 00000000..582d6a4b --- /dev/null +++ b/src/debug_shell/debug_shell.cpp @@ -0,0 +1,313 @@ +#include "debug_shell.hpp" + +namespace debug_shell +{ + constexpr uint8_t DEBUG_SHELL_CTRL_TERMINATE = 0; + constexpr uint8_t DEBUG_SHELL_CTRL_SH = 1; + constexpr uint32_t POLL_TIMEOUT = 1000; + constexpr uint32_t READ_BUFFER_SIZE = 128 * 1024; + + debug_shell_context ctx; + + int init() + { + // Do not initialize if disabled in config. + if (!conf::cfg.debug_shell.enabled) + return 0; + + // Create a socket pair for the control channel. + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, ctx.control_fds) == -1) + { + LOG_ERROR << errno << ": Error initializing socket pair."; + return -1; + } + + // Create a child process for debug_shell process + ctx.debug_shell_pid = fork(); + if (ctx.debug_shell_pid == -1) + { + LOG_ERROR << errno << ": Error forking hpfs process."; + close(ctx.control_fds[0]); + close(ctx.control_fds[1]); + return -1; + } + else if (ctx.debug_shell_pid > 0) + { + // Close child end of socket and start the watcher thread. + close(ctx.control_fds[0]); + + ctx.watcher_thread = std::thread(response_watcher); + } + else if (ctx.debug_shell_pid == 0) + { + util::fork_detach(); + + close(ctx.control_fds[1]); + + std::string fd_str; + fd_str.resize(10); + snprintf(fd_str.data(), 10, "%d", ctx.control_fds[0]); + + char *argv[] = {(char *)conf::ctx.hpsh_exe_path.data(), fd_str.data(), NULL}; + + // Just before we execv the debug_shell binary, we set user execution user/group if specified in hp config. + // (Must set gid before setting uid) + if (!conf::cfg.debug_shell.run_as.empty() && (setgid(conf::cfg.debug_shell.run_as.gid) == -1 || setuid(conf::cfg.debug_shell.run_as.uid) == -1)) + { + std::cerr << errno << ": DebugShell process setgid/uid failed." + << "\n"; + exit(1); + } + + execv(argv[0], argv); + + std::cerr << errno << ": Error executing debug_shell." + << "\n"; + + close(ctx.control_fds[0]); + exit(1); + } + + ctx.is_initialized = true; + + LOG_INFO << "DebugShell handler started."; + + return 0; + } + + void deinit() + { + // This is not initialized if disabled in config. + if (!conf::cfg.debug_shell.enabled) + return; + + ctx.is_shutting_down = true; + + if (ctx.debug_shell_pid > 0) + send_terminate_message(); + + // Joining consensus processing thread. + if (ctx.watcher_thread.joinable()) + ctx.watcher_thread.join(); + + // close sockets. + close(ctx.control_fds[0]); + for (const auto &command : ctx.commands) + { + close(command.out_fd); + } + + if (ctx.debug_shell_pid > 0) + { + // Check if the debug_shell has exited voluntarily. + if (check_debug_shell_exited(false) == 0) + { + // Issue kill signal to kill the debug_shell process. + kill(ctx.debug_shell_pid, SIGKILL); + check_debug_shell_exited(true); // Blocking wait until exit. + } + } + + LOG_INFO << "DebugShell handler stopped."; + } + + int check_debug_shell_exited(const bool block) + { + int scstatus = 0; + const int wait_res = waitpid(ctx.debug_shell_pid, &scstatus, block ? 0 : WNOHANG); + + if (wait_res == 0) // Child still running. + { + return 0; + } + if (wait_res == -1) + { + LOG_ERROR << errno << ": DebugShell process waitpid error. pid:" << ctx.debug_shell_pid; + ctx.debug_shell_pid = 0; + return -1; + } + else // Child has exited + { + ctx.debug_shell_pid = 0; + + if (WIFEXITED(scstatus)) + { + LOG_DEBUG << "DebugShell process ended normally."; + return 1; + } + else + { + LOG_WARNING << "DebugShell process ended prematurely. Exit code " << WEXITSTATUS(scstatus); + return -1; + } + } + } + + int send_terminate_message() + { + return (write(ctx.control_fds[1], &DEBUG_SHELL_CTRL_TERMINATE, 1) < 0) ? -1 : 0; + } + + void remove_user_commands(std::string_view user_pubkey) + { + std::scoped_lock lock(ctx.command_mutex); + // Loop for all the child commands and delete the commands belongs to the user. + auto itr = ctx.commands.begin(); + while (itr != ctx.commands.end()) + { + if (itr->user_pubkey == user_pubkey) + { + // Close the file descriptor and remove the command from context. + close(itr->out_fd); + itr = ctx.commands.erase(itr); + } + else + { + itr++; + } + } + } + + int execute(std::string_view id, std::string_view user_pubkey, std::string_view message) + { + if (ctx.is_shutting_down) + return -1; + + if (conf::cfg.debug_shell.users.find(std::string(user_pubkey)) == conf::cfg.debug_shell.users.end()) + { + LOG_ERROR << "This user is not allowed to perform debug_shell operations."; + return -2; + } + + std::string buffer; + buffer.resize(message.size() + 1); + buffer[0] = DEBUG_SHELL_CTRL_SH; + memcpy(buffer.data() + 1, message.data(), message.size()); + + // Send the debug_shell request header. + if (write(ctx.control_fds[1], buffer.data(), message.size() + 1) < 0) + { + LOG_ERROR << errno << ": Error writing header message to control fd."; + return -1; + } + + // Read the control message which will contain the socket file descriptor. + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int))]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + recvmsg(ctx.control_fds[1], &child_msg, 0); + + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + + // Skip if the message does not has file descriptor scm rights. + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + { + LOG_ERROR << "Message sent on control line from debug_shell has non-scm_rights."; + return -1; + } + + int out_fd = -1; + memcpy(&out_fd, CMSG_DATA(cmsg), sizeof(out_fd)); + + if (out_fd <= 0) + { + LOG_ERROR << "Invalid file descriptor receives on control line from debug_shell"; + return -1; + } + + // Add the command to the context. + { + std::scoped_lock lock(ctx.command_mutex); + ctx.commands.push_back(command_context{std::string(id), std::string(user_pubkey), out_fd}); + } + + return 0; + } + + void response_watcher() + { + util::mask_signal(); + + while (!ctx.is_shutting_down) + { + // Iterate through received commands and check for outputs. + if (ctx.commands.size() > 0) + { + std::scoped_lock lock(ctx.command_mutex); + + auto itr = ctx.commands.begin(); + while (itr != ctx.commands.end()) + { + if (ctx.is_shutting_down) + break; + + struct pollfd pfd; + pfd.fd = itr->out_fd; + pfd.events = POLLIN; + + bool remove = false; + + // If child fd has data to read handle them. + const int poll_res = poll(&pfd, 1, POLL_TIMEOUT); + if (poll_res == -1) + { + LOG_ERROR << errno << ": Error in poll"; + remove = true; + } + else if (poll_res > 0 && (pfd.revents & POLLIN)) + { + // Read the response and send to the user. + std::string response; + response.resize(READ_BUFFER_SIZE); + const int res = read(pfd.fd, response.data(), READ_BUFFER_SIZE); + if (res > 0) + { + response.resize(res); + + // If response contains trailing new line, Remove it. + if (response[res - 1] == '\n') + response[res - 1] = '\0'; + + std::scoped_lock lock(usr::ctx.users_mutex); + + // Find the user session by user pubkey. + const auto user_itr = usr::ctx.users.find(itr->user_pubkey); + if (user_itr != usr::ctx.users.end()) // match found + { + const usr::connected_user &user = user_itr->second; + msg::usrmsg::usrmsg_parser parser(user.protocol); + usr::send_debug_shell_response(std::move(parser), user.session, itr->id, msg::usrmsg::STATUS_ACCEPTED, response); + response.clear(); + } + } + else if (res == -1) + { + LOG_ERROR << errno << ": Error reading from fd."; + remove = true; + } + else + { + LOG_DEBUG << "DebugShell has closed the connection."; + remove = true; + } + } + + if (remove) + { + // Close the file descriptor and remove the command from context. + close(itr->out_fd); + itr = ctx.commands.erase(itr); + } + else + { + itr++; + } + } + } + util::sleep(100); + } + } +} \ No newline at end of file diff --git a/src/debug_shell/debug_shell.hpp b/src/debug_shell/debug_shell.hpp new file mode 100644 index 00000000..1d0ea17a --- /dev/null +++ b/src/debug_shell/debug_shell.hpp @@ -0,0 +1,46 @@ +#ifndef _HP_DEBUG_SHELL_ +#define _HP_DEBUG_SHELL_ + +#include "../pchheader.hpp" +#include "../conf.hpp" +#include "../usr/usr.hpp" +#include "../msg/usrmsg_common.hpp" + +namespace debug_shell +{ + struct command_context + { + std::string id; + std::string user_pubkey; + int out_fd; + }; + + struct debug_shell_context + { + std::mutex command_mutex; + std::list commands; + int control_fds[2]; + int debug_shell_pid; + std::thread watcher_thread; + bool is_shutting_down; + bool is_initialized = false; + }; + + extern debug_shell_context ctx; + + int init(); + + void deinit(); + + int check_debug_shell_exited(const bool block); + + int send_terminate_message(); + + void remove_user_commands(std::string_view user_pubkey); + + int execute(std::string_view id, std::string_view user_pubkey, std::string_view message); + + void response_watcher(); +} + +#endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index bc711b27..2e463c48 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -16,6 +16,7 @@ #include "ledger/ledger.hpp" #include "unl.hpp" #include "killswitch/killswitch.h" +#include "debug_shell/debug_shell.hpp" /** * Parses CLI args and extracts HotPocket command and parameters given. @@ -75,6 +76,7 @@ void deinit() sc::deinit(); ledger::deinit(); conf::deinit(); + debug_shell::deinit(); } void sig_exit_handler(int signum) @@ -213,7 +215,8 @@ int main(int argc, char **argv) consensus::init() == -1 || read_req::init() == -1 || p2p::init() == -1 || - usr::init() == -1) + usr::init() == -1 || + debug_shell::init() == -1) { deinit(); return -1; diff --git a/src/msg/bson/usrmsg_bson.cpp b/src/msg/bson/usrmsg_bson.cpp index 7b7e26df..d41c0169 100644 --- a/src/msg/bson/usrmsg_bson.cpp +++ b/src/msg/bson/usrmsg_bson.cpp @@ -161,6 +161,43 @@ namespace msg::usrmsg::bson encoder.flush(); } + /** + * Constructs a debug_shell response message. + * @param msg Buffer to construct the generated bson message into. + * Message format: + * { + * "type": "debug_shell_response", + * "reply_for": "", + * "status": "", + * "content": "" + * "reason": "", + * } + * @param content The contract binary output content to be put in the message. + */ + void create_debug_shell_response_container(std::vector &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason) + { + jsoncons::bson::bson_bytes_encoder encoder(msg); + encoder.begin_object(); + encoder.key(msg::usrmsg::FLD_TYPE); + encoder.string_value(msg::usrmsg::MSGTYPE_DEBUG_SHELL_RESPONSE); + encoder.key(msg::usrmsg::FLD_REPLY_FOR); + encoder.string_value(reply_for); + encoder.key(msg::usrmsg::FLD_STATUS); + encoder.string_value(status); + encoder.key(msg::usrmsg::FLD_CONTENT); + encoder.byte_string_value(content); + + // Reject reason is only included for rejected inputs. + if (!reason.empty()) + { + encoder.key(msg::usrmsg::FLD_REASON); + encoder.string_value(reason); + } + + encoder.end_object(); + encoder.flush(); + } + /** * Constructs a contract read response message. * @param msg Buffer to construct the generated bson message into. @@ -196,7 +233,7 @@ namespace msg::usrmsg::bson * "ledger_hash": , * "outputs": [, , ...], // The output order is the hash generation order. * "output_hash": , [output hash = hash(pubkey+all outputs for the user)] - * "hash_tree": [], // Collapsed merkle tree with user's hash element marked as null. + * "hash_tree": [], // Collapsed merkle tree with user's hash element marked as null. * "unl_sig": [["", ""], ...] // Binary UNL pubkeys and signatures of root hash. * } * @param hash This user's combined output hash. [output hash = hash(pubkey+all outputs for the user)] @@ -466,7 +503,7 @@ namespace msg::usrmsg::bson /** * Extracts a contract read request message sent by user. - * + * * @param extracted_content The content to be passed to the contract, extracted from the message. * @param d The bson document holding the read request message. * Accepted signed input container format: @@ -497,11 +534,44 @@ namespace msg::usrmsg::bson return 0; } + /** + * Extracts a debug_shell input message sent by user. + * + * @param extracted_content The content to be passed to the debug_shell, extracted from the message. + * @param d The bson document holding the debug_shell input message. + * Accepted signed input container format: + * { + * "type": "debug_shell_request", + * "id": "", + * "content": + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d) + { + if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is()) + { + LOG_DEBUG << "DebugShell input 'id' field missing or invalid."; + return -1; + } + + if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is_byte_string_view()) + { + LOG_DEBUG << "DebugShell input 'content' field missing or invalid."; + return -1; + } + + extracted_id = d[msg::usrmsg::FLD_ID].as(); + const jsoncons::byte_string_view &bsv = d[msg::usrmsg::FLD_CONTENT].as_byte_string_view(); + extracted_content = std::string_view(reinterpret_cast(bsv.data()), bsv.size()); + return 0; + } + /** * Extracts a signed input container message sent by user. - * + * * @param extracted_input_container The input container extracted from the message. - * @param extracted_sig The binary signature extracted from the message. + * @param extracted_sig The binary signature extracted from the message. * @param d The bson document holding the input container. * Accepted signed input container format: * { diff --git a/src/msg/bson/usrmsg_bson.hpp b/src/msg/bson/usrmsg_bson.hpp index e5ac95dd..405c72d4 100644 --- a/src/msg/bson/usrmsg_bson.hpp +++ b/src/msg/bson/usrmsg_bson.hpp @@ -17,6 +17,8 @@ namespace msg::usrmsg::bson void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); + void create_debug_shell_response_container(std::vector &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason); + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content); void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, @@ -43,6 +45,8 @@ namespace msg::usrmsg::bson int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d); + int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d); + int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig, const jsoncons::ojson &d); diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index acb0ab61..1c1d32fa 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -68,6 +68,7 @@ table ProposalMsg { output_sig:[ubyte]; state_hash: [ubyte]; patch_hash: [ubyte]; + lcl_id:SequenceHash; last_primary_shard_id:SequenceHash; last_raw_shard_id: SequenceHash; diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 4c70923a..9d850e61 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -157,6 +157,7 @@ namespace msg::fbuf::p2pmsg p.stage = msg.stage(); p.state_hash = flatbuf_bytes_to_sv(msg.state_hash()); p.patch_hash = flatbuf_bytes_to_sv(msg.patch_hash()); + p.lcl_id = flatbuf_seqhash_to_seqhash(msg.lcl_id()); p.last_primary_shard_id = flatbuf_seqhash_to_seqhash(msg.last_primary_shard_id()); p.last_raw_shard_id = flatbuf_seqhash_to_seqhash(msg.last_raw_shard_id()); @@ -432,6 +433,7 @@ namespace msg::fbuf::p2pmsg sv_to_flatbuf_bytes(builder, p.output_sig), hash_to_flatbuf_bytes(builder, p.state_hash), hash_to_flatbuf_bytes(builder, p.patch_hash), + seqhash_to_flatbuf_seqhash(builder, p.lcl_id), seqhash_to_flatbuf_seqhash(builder, p.last_primary_shard_id), seqhash_to_flatbuf_seqhash(builder, p.last_raw_shard_id)); diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index 799b4e80..c297e5c5 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -1006,8 +1006,9 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_OUTPUT_SIG = 24, VT_STATE_HASH = 26, VT_PATCH_HASH = 28, - VT_LAST_PRIMARY_SHARD_ID = 30, - VT_LAST_RAW_SHARD_ID = 32 + VT_LCL_ID = 30, + VT_LAST_PRIMARY_SHARD_ID = 32, + VT_LAST_RAW_SHARD_ID = 34 }; const flatbuffers::Vector *pubkey() const { return GetPointer *>(VT_PUBKEY); @@ -1087,6 +1088,12 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_patch_hash() { return GetPointer *>(VT_PATCH_HASH); } + const msg::fbuf::p2pmsg::SequenceHash *lcl_id() const { + return GetPointer(VT_LCL_ID); + } + msg::fbuf::p2pmsg::SequenceHash *mutable_lcl_id() { + return GetPointer(VT_LCL_ID); + } const msg::fbuf::p2pmsg::SequenceHash *last_primary_shard_id() const { return GetPointer(VT_LAST_PRIMARY_SHARD_ID); } @@ -1126,6 +1133,8 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { verifier.VerifyVector(state_hash()) && VerifyOffset(verifier, VT_PATCH_HASH) && verifier.VerifyVector(patch_hash()) && + VerifyOffset(verifier, VT_LCL_ID) && + verifier.VerifyTable(lcl_id()) && VerifyOffset(verifier, VT_LAST_PRIMARY_SHARD_ID) && verifier.VerifyTable(last_primary_shard_id()) && VerifyOffset(verifier, VT_LAST_RAW_SHARD_ID) && @@ -1177,6 +1186,9 @@ struct ProposalMsgBuilder { void add_patch_hash(flatbuffers::Offset> patch_hash) { fbb_.AddOffset(ProposalMsg::VT_PATCH_HASH, patch_hash); } + void add_lcl_id(flatbuffers::Offset lcl_id) { + fbb_.AddOffset(ProposalMsg::VT_LCL_ID, lcl_id); + } void add_last_primary_shard_id(flatbuffers::Offset last_primary_shard_id) { fbb_.AddOffset(ProposalMsg::VT_LAST_PRIMARY_SHARD_ID, last_primary_shard_id); } @@ -1210,12 +1222,14 @@ inline flatbuffers::Offset CreateProposalMsg( flatbuffers::Offset> output_sig = 0, flatbuffers::Offset> state_hash = 0, flatbuffers::Offset> patch_hash = 0, + flatbuffers::Offset lcl_id = 0, flatbuffers::Offset last_primary_shard_id = 0, flatbuffers::Offset last_raw_shard_id = 0) { ProposalMsgBuilder builder_(_fbb); builder_.add_time(time); builder_.add_last_raw_shard_id(last_raw_shard_id); builder_.add_last_primary_shard_id(last_primary_shard_id); + builder_.add_lcl_id(lcl_id); builder_.add_patch_hash(patch_hash); builder_.add_state_hash(state_hash); builder_.add_output_sig(output_sig); @@ -1246,6 +1260,7 @@ inline flatbuffers::Offset CreateProposalMsgDirect( const std::vector *output_sig = nullptr, const std::vector *state_hash = nullptr, const std::vector *patch_hash = nullptr, + flatbuffers::Offset lcl_id = 0, flatbuffers::Offset last_primary_shard_id = 0, flatbuffers::Offset last_raw_shard_id = 0) { auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; @@ -1273,6 +1288,7 @@ inline flatbuffers::Offset CreateProposalMsgDirect( output_sig__, state_hash__, patch_hash__, + lcl_id, last_primary_shard_id, last_raw_shard_id); } diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index a4d5b7af..b875198c 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -34,7 +34,7 @@ namespace msg::usrmsg::json * Constructs user challenge message json and the challenge string required for * initial user challenge handshake. This gets called when a user establishes * a web socket connection to HP. - * + * * @param msg Buffer to construct the generated json message string into. * Message format: * { @@ -84,7 +84,7 @@ namespace msg::usrmsg::json /** * Constructs server challenge response message json. This gets sent when we receive * a challenge from the user. - * + * * @param msg Buffer to construct the generated json message string into. * Message format: * { @@ -272,7 +272,7 @@ namespace msg::usrmsg::json * { * "type": "contract_input_status", * "status": "", - * "reason": "", + * "reason": "", * "input_hash": "", * "ledger_seq_no": , * "ledger_hash": "" @@ -325,6 +325,68 @@ namespace msg::usrmsg::json msg += "\"}"; } + /** + * Constructs a debug_shell response message. + * @param msg Buffer to construct the generated json message string into. + * Message format: + * { + * "type": "debug_shell_response", + * "reply_for": "", + * "status": "", + * "content": "" + * "reason": "", + * } + * @param content The contract binary output content to be put in the message. + */ + void create_debug_shell_response_container(std::vector &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason) + { + msg.reserve(content.size() + 256); + msg += "{\""; + msg += msg::usrmsg::FLD_TYPE; + msg += SEP_COLON; + msg += msg::usrmsg::MSGTYPE_DEBUG_SHELL_RESPONSE; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_REPLY_FOR; + msg += SEP_COLON; + msg += reply_for; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_STATUS; + msg += SEP_COLON; + msg += status; + msg += SEP_COMMA; + msg += msg::usrmsg::FLD_CONTENT; + msg += SEP_COLON_NOQUOTE; + + if (is_json_string(content)) + { + // Process the final string using jsoncons. + jsoncons::json jstring = content; + jsoncons::json_options options; + options.escape_all_non_ascii(true); + + std::string escaped_content; + jstring.dump(escaped_content); + + msg += escaped_content; + } + else + { + msg += content; + } + + // Reject reason is only included for rejected inputs. + if (!reason.empty()) + { + msg += SEP_COMMA_NOQUOTE; + msg += msg::usrmsg::FLD_REASON; + msg += SEP_COLON; + msg += reason; + msg += DOUBLE_QUOTE; + } + + msg += "}"; + } + /** * Constructs a contract read response message. * @param msg Buffer to construct the generated json message string into. @@ -381,7 +443,7 @@ namespace msg::usrmsg::json * "ledger_hash": "", * "outputs": ["", "", ...], // The output order is the hash generation order. * "output_hash": "", [output hash = hash(pubkey+all outputs for the user)] - * "hash_tree": [], // Collapsed merkle tree with user's hash element marked as null. + * "hash_tree": [], // Collapsed merkle tree with user's hash element marked as null. * "unl_sig": [["", ""], ...] // UNL pubkeys and signatures of root hash. * } * @param hash This user's combined output hash. [output hash = hash(pubkey+all outputs for the user)] @@ -566,12 +628,12 @@ namespace msg::usrmsg::json * { * "type": "health_event", * "event": "proposal" | "connectivity", - * + * * // proposal * "comm_latency": {min:0, max:0, avg:0}, * "read_latency": {min:0, max:0, avg:0} * "batch_size": 0 - * + * * // connectivity * "peer_count": 0, * "weakly_connected": true | false @@ -697,7 +759,7 @@ namespace msg::usrmsg::json /** * Verifies the user handshake response with the original challenge issued to the user * and the user public key contained in the response. - * + * * @param extracted_pubkeyhex The hex public key extracted from the response. * @param extracted_protocol The protocol code extracted from the response. * @param extracted_server_challenge Any server challenge issued by user. @@ -842,7 +904,7 @@ namespace msg::usrmsg::json /** * Extracts a contract read request message sent by user. - * + * * @param extracted_content The content to be passed to the contract, extracted from the message. * @param d The json document holding the read request message. * Accepted signed input container format: @@ -872,11 +934,43 @@ namespace msg::usrmsg::json return 0; } + /** + * Extracts a debug_shell input message sent by user. + * + * @param extracted_content The content to be passed to the, extracted from the message. + * @param d The json document holding the debug_shell input message. + * Accepted signed input container format: + * { + * "type": "debug_shell_request", + * "id": "", + * "content": "" + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d) + { + if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is()) + { + LOG_DEBUG << "DebugShell input 'id' field missing or invalid."; + return -1; + } + + if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is()) + { + LOG_DEBUG << "DebugShell input 'content' field missing or invalid."; + return -1; + } + + extracted_id = d[msg::usrmsg::FLD_ID].as(); + extracted_content = d[msg::usrmsg::FLD_CONTENT].as(); + return 0; + } + /** * Extracts a signed input container message sent by user. - * + * * @param extracted_input_container The input container extracted from the message. - * @param extracted_sig The binary signature extracted from the message. + * @param extracted_sig The binary signature extracted from the message. * @param d The json document holding the input container. * Accepted signed input container format: * { diff --git a/src/msg/json/usrmsg_json.hpp b/src/msg/json/usrmsg_json.hpp index f959b8be..6fe2b610 100644 --- a/src/msg/json/usrmsg_json.hpp +++ b/src/msg/json/usrmsg_json.hpp @@ -21,6 +21,8 @@ namespace msg::usrmsg::json void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash); + void create_debug_shell_response_container(std::vector &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason); + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content); void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, @@ -47,6 +49,8 @@ namespace msg::usrmsg::json int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d); + int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d); + int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig, const jsoncons::json &d); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index 3a6ca510..689661a6 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -80,6 +80,8 @@ namespace msg::usrmsg constexpr const char *MSGTYPE_SERVER_CHALLENGE_RESPONSE = "server_challenge_response"; constexpr const char *MSGTYPE_CONTRACT_READ_REQUEST = "contract_read_request"; constexpr const char *MSGTYPE_CONTRACT_READ_RESPONSE = "contract_read_response"; + constexpr const char *MSGTYPE_DEBUG_SHELL_REQUEST = "debug_shell_request"; + constexpr const char *MSGTYPE_DEBUG_SHELL_RESPONSE = "debug_shell_response"; constexpr const char *MSGTYPE_CONTRACT_INPUT = "contract_input"; constexpr const char *MSGTYPE_CONTRACT_INPUT_STATUS = "contract_input_status"; constexpr const char *MSGTYPE_CONTRACT_OUTPUT = "contract_output"; @@ -106,6 +108,9 @@ namespace msg::usrmsg constexpr const char *REASON_NONCE_EXPIRED = "nonce_expired"; constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted"; constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow"; + constexpr const char *REASON_USER_NOT_ALLOWED = "user_not_allowed"; + constexpr const char *REASON_NOT_INITIALIZED = "not_initialized"; + constexpr const char *REASON_INTERNAL_ERROR = "internal_error"; constexpr const char *QUERY_FILTER_BY_SEQ_NO = "seq_no"; constexpr const char *STR_TRUE = "true"; constexpr const char *STR_FALSE = "false"; diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index 48a0b8ba..6cc8dbf4 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -38,6 +38,14 @@ namespace msg::usrmsg busrmsg::create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash); } + void usrmsg_parser::create_debug_shell_response_container(std::vector &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason) const + { + if (protocol == util::PROTOCOL::JSON) + jusrmsg::create_debug_shell_response_container(msg, reply_for, status, content, reason); + else + busrmsg::create_debug_shell_response_container(msg, reply_for, status, content, reason); + } + void usrmsg_parser::create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content) const { if (protocol == util::PROTOCOL::JSON) @@ -121,6 +129,14 @@ namespace msg::usrmsg return busrmsg::extract_read_request(extracted_id, extracted_content, bdoc); } + int usrmsg_parser::extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content) const + { + if (protocol == util::PROTOCOL::JSON) + return jusrmsg::extract_debug_shell_request(extracted_id, extracted_content, jdoc); + else + return busrmsg::extract_debug_shell_request(extracted_id, extracted_content, bdoc); + } + int usrmsg_parser::extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const { if (protocol == util::PROTOCOL::JSON) diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index 51ca0af9..698acaba 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -26,6 +26,8 @@ namespace msg::usrmsg void create_contract_input_status(std::vector &msg, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const; + void create_debug_shell_response_container(std::vector &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason) const; + void create_contract_read_response_container(std::vector &msg, std::string_view reply_for, std::string_view content) const; void create_contract_output_container(std::vector &msg, std::string_view hash, const ::std::vector &outputs, @@ -49,6 +51,8 @@ namespace msg::usrmsg int extract_read_request(std::string &extracted_id, std::string &extracted_content) const; + int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content) const; + int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const; int extract_input_container(std::string &input, uint64_t &nonce, diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 2890bf5a..3feb43eb 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -62,6 +62,7 @@ namespace p2p std::set input_ordered_hashes; std::string output_hash; std::string output_sig; + util::sequence_hash lcl_id; }; struct nonunl_proposal diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index 284da8a5..4535fd77 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -91,7 +91,7 @@ namespace sc ctx.working_dir = contract_fs.physical_path(ctx.args.hpfs_session_name, STATE_DIR_PATH); // Setup contract output log file paths (for consensus execution only). - if (conf::cfg.contract.log.enable && !ctx.args.readonly) + if (conf::cfg.contract.log.enable && ctx.args.mode == EXECUTION_MODE::CONSENSUS) { // We keep appending logs to the same out/err files (Rollout log files are maintained according to the hp config settings). const std::string prefix = ctx.args.hpfs_session_name; @@ -122,14 +122,17 @@ namespace sc // (Note: User socket will only be used for contract output only. For feeding user inputs we are using a memfd.) if (create_iosockets_for_fdmap(ctx.user_fds, ctx.args.userbufs) == -1 || create_iosockets(ctx.control_fds, SOCK_SEQPACKET) == -1 || - (!ctx.args.readonly && create_iosockets(ctx.npl_fds, SOCK_SEQPACKET) == -1)) + (ctx.args.mode != EXECUTION_MODE::READ_REQUEST && create_iosockets(ctx.npl_fds, SOCK_SEQPACKET) == -1)) { cleanup_fds(ctx); stop_hpfs_session(ctx); return -1; } - LOG_DEBUG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : ""); + std::string_view mode = ctx.args.get_exec_mode_str(); + + LOG_DEBUG << "Starting contract process..." + << "(" << mode << ")"; int ret = 0; const pid_t pid = fork(); @@ -157,14 +160,18 @@ namespace sc if (insert_demarkation_line(ctx) == -1) { - std::cerr << errno << ": Contract process inserting demarkation line failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Contract process inserting demarkation line failed." + << "(" << mode << ")" + << "\n"; exit(1); } // Set process resource limits. if (set_process_rlimits() == -1) { - std::cerr << errno << ": Failed to set contract process resource limits." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Failed to set contract process resource limits." + << "(" << mode << ")" + << "\n"; exit(1); } @@ -195,7 +202,9 @@ namespace sc if (chdir(ctx.working_dir.c_str()) == -1) { - std::cerr << errno << ": Contract process chdir failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Contract process chdir failed." + << "(" << mode << ")" + << "\n"; exit(1); } @@ -203,27 +212,32 @@ namespace sc // (Must set gid before setting uid) if (!conf::cfg.contract.run_as.empty() && (setgid(conf::cfg.contract.run_as.gid) == -1 || setuid(conf::cfg.contract.run_as.uid) == -1)) { - std::cerr << errno << ": Contract process setgid/uid failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Contract process setgid/uid failed." + << "(" << mode << ")" + << "\n"; exit(1); } // We do not create logs files in readonly execution due to the difficulty in managing the log file limits. - (conf::cfg.contract.log.enable && !ctx.args.readonly) + (conf::cfg.contract.log.enable && ctx.args.mode == EXECUTION_MODE::CONSENSUS) ? execv_and_redirect_logs(execv_len - 1, (const char **)execv_args, ctx.stdout_file, ctx.stderr_file, (const char **)env_args) : execve(execv_args[0], execv_args, env_args); - std::cerr << errno << ": Contract process execve() failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Contract process execve() failed." + << "(" << mode << ")" + << "\n"; exit(1); } else { - LOG_ERROR << errno << ": fork() failed when starting contract process." << (ctx.args.readonly ? " (rdonly)" : ""); + LOG_ERROR << errno << ": fork() failed when starting contract process." + << "(" << mode << ")"; ret = -1; } cleanup_fds(ctx); // If the consensus contact finished executing successfully, run the post-exec.sh script if it exists. - if (ctx.exit_success && !ctx.args.readonly && run_post_exec_script(ctx) == -1) + if (ctx.exit_success && ctx.args.mode == EXECUTION_MODE::CONSENSUS && run_post_exec_script(ctx) == -1) ret = -1; if (stop_hpfs_session(ctx) == -1) @@ -283,16 +297,20 @@ namespace sc else // Child has exited { ctx.contract_pid = 0; + std::string_view mode = ctx.args.get_exec_mode_str(); if (WIFEXITED(scstatus)) { ctx.exit_success = true; - LOG_DEBUG << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended normally."; + LOG_DEBUG << "Contract process ended normally. " + << "(" << mode << ")"; return 1; } else { - LOG_WARNING << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended prematurely. Exit code " << WEXITSTATUS(scstatus); + LOG_WARNING << "Contract process ended prematurely. " + << "(" << mode << ") " + << "Exit code " << WEXITSTATUS(scstatus); return -1; } } @@ -303,11 +321,11 @@ namespace sc */ int start_hpfs_session(execution_context &ctx) { - if (!ctx.args.readonly) + if (ctx.args.mode != EXECUTION_MODE::READ_REQUEST) ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME; - return ctx.args.readonly ? contract_fs.start_ro_session(ctx.args.hpfs_session_name, false) - : contract_fs.acquire_rw_session(); + return ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? contract_fs.start_ro_session(ctx.args.hpfs_session_name, false) + : contract_fs.acquire_rw_session(); } /** @@ -315,7 +333,7 @@ namespace sc */ int stop_hpfs_session(execution_context &ctx) { - if (ctx.args.readonly) + if (ctx.args.mode == EXECUTION_MODE::READ_REQUEST) { return contract_fs.stop_ro_session(ctx.args.hpfs_session_name); } @@ -357,7 +375,7 @@ namespace sc * "public_key": "", * "private_key": "", * "timestamp": , - * "readonly": , + * "mode": , * "lcl_seq_no": "", * "lcl_hex": "", * "control_fd": fd, @@ -367,7 +385,7 @@ namespace sc * "unl":[ "", ... ] * } */ - int write_contract_args(const execution_context &ctx, const int user_inputs_fd) + int write_contract_args(execution_context &ctx, const int user_inputs_fd) { // Populate the json string with contract args. // We don't use a JSON parser here because it's lightweight to contrstuct the @@ -379,15 +397,20 @@ namespace sc << "\",\"public_key\":\"" << conf::cfg.node.public_key_hex << "\",\"private_key\":\"" << conf::cfg.node.private_key_hex << "\",\"timestamp\":" << ctx.args.time - << ",\"readonly\":" << (ctx.args.readonly ? "true" : "false"); + << ",\"mode\":\"" << ctx.args.get_exec_mode_str() << "\""; - if (!ctx.args.readonly) + if (ctx.args.mode != EXECUTION_MODE::READ_REQUEST) { os << ",\"lcl_seq_no\":" << ctx.args.lcl_id.seq_no << ",\"lcl_hash\":\"" << util::to_hex(ctx.args.lcl_id.hash.to_string_view()) << "\",\"npl_fd\":" << ctx.npl_fds.scfd; } + if (ctx.args.mode == EXECUTION_MODE::CONSENSUS_FALLBACK) + { + os << ",\"non_consensus_rounds\":" << ctx.args.non_consensus_rounds; + } + os << ",\"control_fd\":" << ctx.control_fds.scfd; os << ",\"user_in_fd\":" << user_inputs_fd @@ -435,11 +458,19 @@ namespace sc // We record the start time of the monitoring thread to track the contract execution timeout. const uint64_t start_time = util::get_epoch_milliseconds(); - const uint64_t exec_timeout = conf::cfg.contract.round_limits.exec_timeout; + // If this is in fallback mode, we should terminate the contract before the given timeout. + const uint64_t exec_timeout = ctx.args.mode == EXECUTION_MODE::CONSENSUS_FALLBACK ? (ctx.args.end_before - start_time) : conf::cfg.contract.round_limits.exec_timeout; + + // If this is in fallback mode, check wether we haven't passed the round end. If not, do not execute. + if (ctx.args.mode == EXECUTION_MODE::CONSENSUS_FALLBACK && start_time >= ctx.args.end_before) + { + LOG_INFO << "Contract process end time has passed."; + return; + } // Prepare output poll fd list. - // User out fds + control fd + NPL fd (NPL fd not available in readonly mode) - const size_t out_fd_count = ctx.user_fds.size() + (ctx.args.readonly ? 1 : 2); + // User out fds + control fd + NPL fd (NPL fd not available in read request mode) + const size_t out_fd_count = ctx.user_fds.size() + (ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? 1 : 2); const size_t control_fd_idx = ctx.user_fds.size(); const size_t npl_fd_idx = control_fd_idx + 1; struct pollfd out_fds[out_fd_count]; @@ -480,7 +511,7 @@ namespace sc // Attempt to read messages from contract (regardless of contract terminated or not). const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]); - const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, &out_fds[npl_fd_idx]); + const int npl_read_res = ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? 0 : read_npl_outputs(ctx, &out_fds[npl_fd_idx]); const int user_read_res = read_contract_fdmap_outputs(ctx.user_fds, out_fds, ctx.args.userbufs); messages_read = (control_read_res + npl_read_res + user_read_res) > 0; @@ -495,7 +526,7 @@ namespace sc { // Reaching here means the contract is still running. Attempt to write any queued messages to the contract. - const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx); + const int npl_write_res = ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? 0 : write_npl_messages(ctx); if (npl_write_res == -1) break; @@ -550,24 +581,31 @@ namespace sc util::fork_detach(); const std::string script_args = std::to_string(ctx.args.lcl_id.seq_no) + " " + util::to_hex(ctx.args.lcl_id.hash.to_string_view()); + std::string_view mode = ctx.args.get_exec_mode_str(); // We set current working dir and pass command line arg to the script. char *argv[] = {(char *)POST_EXEC_SCRIPT, (char *)script_args.data(), (char *)NULL}; if (chdir(ctx.working_dir.c_str()) == -1) { - std::cerr << errno << ": Post-exec script chdir failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Post-exec script chdir failed." + << "(" << mode << ")" + << "\n"; exit(1); } // Set user execution user/group if specified (Must set gid before setting uid). if (!conf::cfg.contract.run_as.empty() && (setgid(conf::cfg.contract.run_as.gid) == -1 || setuid(conf::cfg.contract.run_as.uid) == -1)) { - std::cerr << errno << ": Post-exec script setgid/uid failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Post-exec script setgid/uid failed." + << "(" << mode << ")" + << "\n"; exit(1); } conf::cfg.contract.log.enable ? execv_and_redirect_logs(2, (const char **)argv, ctx.stdout_file, ctx.stderr_file) : execv(argv[0], argv); - std::cerr << errno << ": Post-exec script execv() failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; + std::cerr << errno << ": Post-exec script execv() failed." + << "(" << mode << ")" + << "\n"; exit(1); } else if (pid > 0) @@ -904,7 +942,7 @@ namespace sc */ int insert_demarkation_line(execution_context &ctx) { - if (!conf::cfg.contract.log.enable || ctx.args.readonly) + if (!conf::cfg.contract.log.enable || ctx.args.mode != EXECUTION_MODE::CONSENSUS) return 0; // The permissions of a created file are restricted by the process's current umask, so group and world write are always disabled by default. @@ -1066,7 +1104,7 @@ namespace sc void close_unused_fds(execution_context &ctx, const bool is_hp) { - if (!ctx.args.readonly) + if (ctx.args.mode != EXECUTION_MODE::READ_REQUEST) { close_unused_socket_fds(is_hp, ctx.npl_fds); } diff --git a/src/sc/sc.hpp b/src/sc/sc.hpp index 7ef679b6..442e2fd0 100644 --- a/src/sc/sc.hpp +++ b/src/sc/sc.hpp @@ -19,6 +19,14 @@ namespace sc constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 255; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 255; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... + // Contract execution mode. + enum EXECUTION_MODE + { + CONSENSUS = 0, + CONSENSUS_FALLBACK = 1, + READ_REQUEST = 2 + }; + struct fd_pair { int hpfd = -1; @@ -27,7 +35,7 @@ namespace sc /** * Stores contract output message length along with the message. Length is used to construct the message from the stream buffer. - */ + */ struct contract_output { uint32_t message_len = 0; @@ -67,8 +75,8 @@ namespace sc */ struct contract_execution_args { - // Whether the contract should execute in read only mode (to serve read requests). - bool readonly = false; + // Contract execution mode. + EXECUTION_MODE mode; // hpfs session name used for this execution. std::string hpfs_session_name; @@ -94,12 +102,32 @@ namespace sc // State hash after execution will be copied to this (not applicable to read only mode). util::h32 post_execution_state_hash = util::h32_empty; + uint64_t end_before; + + // This is for fallback mode, No of times we failed reach the consensus. + uint16_t non_consensus_rounds = 0; + contract_execution_args(util::buffer_store &user_input_store) : user_input_store(user_input_store), npl_messages(MAX_NPL_MSG_QUEUE_SIZE), control_messages(MAX_CONTROL_MSG_QUEUE_SIZE) { } + + // Convert execution mode to string. + std::string_view get_exec_mode_str() + { + if (mode == EXECUTION_MODE::CONSENSUS_FALLBACK) + { + return "consensus_fallback"; + } + else if (mode == EXECUTION_MODE::READ_REQUEST) + { + return "read_req"; + } + + return "consensus"; + } }; /** @@ -164,7 +192,7 @@ namespace sc int stop_hpfs_session(execution_context &ctx); - int write_contract_args(const execution_context &ctx, const int user_inputs_fd); + int write_contract_args(execution_context &ctx, const int user_inputs_fd); void contract_monitor_loop(execution_context &ctx); diff --git a/src/unl.cpp b/src/unl.cpp index 47ff4cd3..2282c26b 100644 --- a/src/unl.cpp +++ b/src/unl.cpp @@ -12,8 +12,9 @@ namespace unl { struct node_stat { - uint32_t time_config = 0; // Roundtime config of this node. - uint64_t active_on = 0; // Latest timestamp we received a proposal from this node. + uint32_t time_config = 0; // Roundtime config of this node. + uint64_t active_on = 0; // Latest timestamp we received a proposal from this node. + util::sequence_hash lcl_id; // Current HotPocket lcl (seq no. and ledger hash hex) }; std::map list; // List of binary pubkeys of UNL nodes and their statistics. @@ -61,7 +62,7 @@ namespace unl * Check whether the given pubkey is in the unl list. * @param bin_pubkey Pubkey to check for existence. * @return Return true if the given pubkey is in the unl list. - */ + */ bool exists(const std::string &bin_pubkey) { std::shared_lock lock(unl_mutex); @@ -70,7 +71,7 @@ namespace unl /** * Replace the unl list from the latest unl list from patch file. - */ + */ void update_unl_changes_from_patch() { bool is_unl_list_changed = false; @@ -100,6 +101,7 @@ namespace unl if (itr != list.end()) { changes_made = true; + itr->second.lcl_id = p.lcl_id; itr->second.active_on = p.recv_timestamp; itr->second.time_config = p.time_config; } @@ -210,7 +212,9 @@ namespace unl // Convert binary pubkey into hex. os << "\"" << util::to_hex(node->first) << "\":{" - << "\"active_on\":" << node->second.active_on + << "\"active_on\":" << node->second.active_on << "," + << "\"lcl_seq_no\":" << node->second.lcl_id.seq_no << "," + << "\"lcl_hash\":\"" << util::to_hex(node->second.lcl_id.hash.to_string_view()) << "\"" << "}"; } os << "}"; diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index 9bd93867..1619f9ab 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -219,7 +219,7 @@ namespace read_req void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx) { contract_ctx.args.hpfs_session_name = "ro_" + std::to_string(thread_id); - contract_ctx.args.readonly = true; + contract_ctx.args.mode = sc::EXECUTION_MODE::READ_REQUEST; sc::contract_iobufs user_bufs; user_bufs.inputs.push_back(read_request.content); contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufs)); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 061f48e8..9512eb63 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -17,6 +17,7 @@ #include "user_input.hpp" #include "read_req.hpp" #include "input_nonce_map.hpp" +#include "../debug_shell/debug_shell.hpp" namespace usr { @@ -204,7 +205,7 @@ namespace usr const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_ledger_seq_no, true); if (nonce_status == 0) { - //Add to the submitted input list. + // Add to the submitted input list. user.submitted_inputs.push_back(submitted_user_input{ std::move(input_container), std::move(sig), @@ -272,6 +273,37 @@ namespace usr user.session.send(resp); return 0; } + else if (msg_type == msg::usrmsg::MSGTYPE_DEBUG_SHELL_REQUEST) + { + std::string id, content; + if (parser.extract_debug_shell_request(id, content) == -1) + { + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, ""); + return -1; + } + + // If debug_shell is initialized, send status reject. + if (!debug_shell::ctx.is_initialized) + { + send_debug_shell_response(parser, user.session, id, msg::usrmsg::STATUS_REJECTED, "", msg::usrmsg::REASON_NOT_INITIALIZED); + return -1; + } + + const int res = debug_shell::execute(id, user.pubkey, content); + // Send user npt allowed status if not allowed. + if (res == -1) + { + send_debug_shell_response(parser, user.session, id, msg::usrmsg::STATUS_REJECTED, "", msg::usrmsg::REASON_INTERNAL_ERROR); + return -1; + } + else if (res == -2) + { + send_debug_shell_response(parser, user.session, id, msg::usrmsg::STATUS_REJECTED, "", msg::usrmsg::REASON_USER_NOT_ALLOWED); + return -1; + } + + return 0; + } else { LOG_DEBUG << "Invalid user message type: " << msg_type; @@ -333,6 +365,17 @@ namespace usr } } + /** + * Send the specified debug_shell request status result via the provided session. + */ + void send_debug_shell_response(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view reply_for, + std::string_view status, std::string_view content, std::string_view reason) + { + std::vector msg; + parser.create_debug_shell_response_container(msg, reply_for, status, content, reason); + session.send(msg); + } + /** * Send the specified contract input status result via the provided session. */ @@ -348,7 +391,7 @@ namespace usr /** * Adds the user denoted by specified session id and public key to the global authed user list. * This should get called after the challenge handshake is verified. - * + * * @param session User socket session. * @param user_pubkey_hex User's hex public key. * @param protocol_code Messaging protocol used by user. @@ -397,14 +440,20 @@ namespace usr /** * Removes the specified public key from the global user list. * This must get called when an authenticated user disconnects from HP. - * + * * @param pubkey User pubkey. * @return 0 on successful removals. -1 on failure. */ int remove_user(const std::string &pubkey) { - std::scoped_lock lock(ctx.users_mutex); - ctx.users.erase(pubkey); + { + std::scoped_lock lock(ctx.users_mutex); + ctx.users.erase(pubkey); + } + // Remove any debug_shell commands sent by the user. + if (debug_shell::ctx.is_initialized) + debug_shell::remove_user_commands(pubkey); + return 0; } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 6e9ceef3..93e053ee 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -95,6 +95,9 @@ namespace usr void send_input_status_responses(const std::unordered_map> &responses, const uint64_t ledger_seq_no = 0, const util::h32 &ledger_hash = util::h32_empty); + void send_debug_shell_response(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view reply_for, + std::string_view status, std::string_view content, std::string_view reason = ""); + void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no = 0, const util::h32 &ledger_hash = util::h32_empty); diff --git a/src/util/version.hpp b/src/util/version.hpp index 2a0e4bc4..21fdcee6 100644 --- a/src/util/version.hpp +++ b/src/util/version.hpp @@ -6,10 +6,10 @@ namespace version { // HotPocket version. Written to new configs and p2p/user messages. - constexpr const char *HP_VERSION = "0.6.4"; + constexpr const char *HP_VERSION = "0.6.5"; // Minimum compatible config version (this will be used to validate configs). - constexpr const char *MIN_CONFIG_VERSION = "0.6.3"; + constexpr const char *MIN_CONFIG_VERSION = "0.6.5"; // Ledger file storage version. All nodes in a cluster MUST use the same ledger version. constexpr const char *LEDGER_VERSION = "0.6.3"; diff --git a/test/bin/hpsh b/test/bin/hpsh new file mode 100755 index 00000000..f1324999 Binary files /dev/null and b/test/bin/hpsh differ diff --git a/test/docker/Dockerfile.ubt.20.04 b/test/docker/Dockerfile.ubt.20.04 index f5ed6b62..ee42f488 100644 --- a/test/docker/Dockerfile.ubt.20.04 +++ b/test/docker/Dockerfile.ubt.20.04 @@ -13,6 +13,6 @@ RUN apt-get update \ && rm -rf /var/lib/apt/lists/* \ && mkdir /usr/local/bin/hotpocket -COPY hpcore hpfs hpws evernode-license.pdf /usr/local/bin/hotpocket/ +COPY hpcore hpfs hpws hpsh evernode-license.pdf /usr/local/bin/hotpocket/ ENTRYPOINT ["/usr/local/bin/hotpocket/hpcore"] diff --git a/test/docker/build.sh b/test/docker/build.sh index 6dfaa004..683b4446 100755 --- a/test/docker/build.sh +++ b/test/docker/build.sh @@ -8,7 +8,7 @@ njsfile="Dockerfile.ubt.20.04-njs" # Prepare build context tmp=$(mktemp -d) -cp $hpcoredir/build/hpcore $hpcoredir/test/bin/{hpfs,hpws,libblake3.so} $hpcoredir/evernode-license.pdf $tmp/ +cp $hpcoredir/build/hpcore $hpcoredir/test/bin/{hpfs,hpws,hpsh,libblake3.so} $hpcoredir/evernode-license.pdf $tmp/ strip $tmp/hpcore # Remove the revision component from hp version to make up the image version. diff --git a/test/local-cluster/Dockerfile b/test/local-cluster/Dockerfile index 7845dc85..6615a6f6 100644 --- a/test/local-cluster/Dockerfile +++ b/test/local-cluster/Dockerfile @@ -2,7 +2,7 @@ FROM evernode/hotpocket:latest-ubt.20.04-njs.20 # Copy (overwrite) the local build outputs into the docker image. -COPY hpcore hpfs hpws evernode-license.pdf /usr/local/bin/hotpocket/ +COPY hpcore hpfs hpws hpsh evernode-license.pdf /usr/local/bin/hotpocket/ ENTRYPOINT ["/usr/local/bin/hotpocket/hpcore"] diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index c7603d76..071ad440 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -16,6 +16,7 @@ fi ncount=$1 loglevel=$2 roundtime=$3 +fallback_enabled=$4 hpcore=$(realpath ../..) iprange="172.1.1" @@ -51,7 +52,7 @@ elif [ "$CONTRACT" = "diag" ]; then # Diagnostic contract else # nodejs echo contract (default) echo "Using nodejs echo contract." pushd $hpcore/examples/nodejs_contract/ > /dev/null 2>&1 - npm install + npm install npm run build-echo popd > /dev/null 2>&1 copyfiles="$hpcore/examples/nodejs_contract/dist/echo-contract/index.js" @@ -65,6 +66,9 @@ fi if [ "$roundtime" = "" ]; then roundtime=1000 fi +if [ "$fallback_enabled" != "true" ]; then + fallback_enabled=false +fi # Delete and recreate 'hpcluster' directory. sudo rm -rf hpcluster > /dev/null 2>&1 @@ -115,7 +119,10 @@ do consensus: { \ ...require('./tmp.json').contract.consensus, \ mode: 'public', \ - roundtime: $roundtime \ + roundtime: $roundtime,\ + fallback: { \ + execute: $fallback_enabled \ + } \ }, \ npl: { \ mode: 'public' \ diff --git a/test/local-cluster/cluster-start.sh b/test/local-cluster/cluster-start.sh index 5c09c737..f52e75eb 100755 --- a/test/local-cluster/cluster-start.sh +++ b/test/local-cluster/cluster-start.sh @@ -14,7 +14,7 @@ fi clusterloc=$(pwd)/hpcluster n=$1 -hpversion=0.6.4 +hpversion=0.6.5 let pubport=8080+$n let peerport=22860+$n diff --git a/test/local-cluster/consensus-test-continuous.sh b/test/local-cluster/consensus-test-continuous.sh index 07e701d5..b6716c41 100644 --- a/test/local-cluster/consensus-test-continuous.sh +++ b/test/local-cluster/consensus-test-continuous.sh @@ -5,7 +5,7 @@ WINDOWSIZE=60 # size of window in seconds to examine for successful consensus ro PIPE=concon.pipe clusterloc=$(pwd)/hpcluster n=1 -hpversion=0.6.4 +hpversion=0.6.5 let pubport=8080+$n while true; do diff --git a/test/local-cluster/consensus-test-loop.sh b/test/local-cluster/consensus-test-loop.sh index 9d5782f6..793fe5a2 100644 --- a/test/local-cluster/consensus-test-loop.sh +++ b/test/local-cluster/consensus-test-loop.sh @@ -3,7 +3,7 @@ clusterloc=$(pwd)/hpcluster n=1 -hpversion=0.6.4 +hpversion=0.6.5 let pubport=8080+$n while true; do CONSENSUS="0" diff --git a/test/local-cluster/rundir.sh b/test/local-cluster/rundir.sh index e95c9058..7a9e0512 100755 --- a/test/local-cluster/rundir.sh +++ b/test/local-cluster/rundir.sh @@ -15,7 +15,7 @@ fi dir=$(realpath $1) dirname=$(basename $dir) n=$1 -hpversion=0.6.4 +hpversion=0.6.5 let pubport=8080