From 817ccd6a88c2faeeb0c4fbeefbed7aebc9f932f1 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Fri, 19 Jun 2020 21:52:03 +0530 Subject: [PATCH] Implemented contract read requests. (#98) --- CMakeLists.txt | 1 + examples/echo_contract/contract.js | 43 +- examples/hpclient/client.js | 141 ------- examples/hpclient/text-client.js | 22 +- src/conf.cpp | 3 + src/conf.hpp | 1 + src/cons/cons.cpp | 42 +- src/cons/cons.hpp | 4 +- src/jsonschema/usrmsg_helpers.cpp | 612 ++++++++++++++++------------- src/jsonschema/usrmsg_helpers.hpp | 6 + src/main.cpp | 5 +- src/sc.cpp | 148 ++++--- src/sc.hpp | 59 +-- src/usr/read_req.cpp | 141 +++++++ src/usr/read_req.hpp | 16 + src/usr/usr.cpp | 22 +- src/usr/usr.hpp | 81 ++-- 17 files changed, 760 insertions(+), 587 deletions(-) delete mode 100644 examples/hpclient/client.js create mode 100644 src/usr/read_req.cpp create mode 100644 src/usr/read_req.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ef0b1ab5..8f8ac6f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -52,6 +52,7 @@ add_executable(hpcore src/p2p/p2p.cpp src/usr/user_session_handler.cpp src/usr/usr.cpp + src/usr/read_req.cpp src/cons/cons.cpp src/cons/ledger_handler.cpp src/state/state_sync.cpp diff --git a/examples/echo_contract/contract.js b/examples/echo_contract/contract.js index 3a8086c4..3bfe710e 100644 --- a/examples/echo_contract/contract.js +++ b/examples/echo_contract/contract.js @@ -4,36 +4,45 @@ process.on('uncaughtException', (err) => { const fs = require('fs') //console.log("===Sample contract started==="); -//console.log("Contract args received from hp: " + input); - let hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); +//console.log(hpargs); // We just save execution args as an example state file change. -fs.appendFileSync("exects.txt", "ts:" + hpargs.ts + "\n"); +if (!hpargs.readonly) + fs.appendFileSync("exects.txt", "ts:" + hpargs.ts + "\n"); Object.keys(hpargs.usrfd).forEach(function (key, index) { let userfds = hpargs.usrfd[key]; if (userfds[0] != -1) { let userinput = fs.readFileSync(userfds[0], 'utf8'); - // Append user input to a state file. - fs.appendFileSync("userinputs.txt", userinput + "\n"); - fs.writeSync(userfds[1], "Echoing: " + userinput); + + // Append user input to a state file if not in read only mode. + if (!hpargs.readonly) + fs.appendFileSync("userinputs.txt", userinput + "\n"); + + if (userinput == "ts") + fs.writeSync(userfds[1], fs.readFileSync("exects.txt")); + else + fs.writeSync(userfds[1], "Echoing: " + userinput); } }); -if (hpargs.nplfd[0] != -1) { - let nplinput = fs.readFileSync(hpargs.nplfd[0], 'utf8'); - console.log("Input received from peers:"); - console.log(nplinput); - fs.writeSync(hpargs.nplfd[1], "Echoing: " + nplinput); -} +if (!hpargs.readonly) { -if (hpargs.hpfd[0] != -1) { - let hpinput = fs.readFileSync(hpargs.hpfd[0], 'utf8'); - console.log("Input received from hp:"); - console.log(hpinput); - fs.writeSync(hpargs.hpfd[1], "Echoing: " + hpinput); + if (hpargs.nplfd[0] != -1) { + let nplinput = fs.readFileSync(hpargs.nplfd[0], 'utf8'); + console.log("Input received from peers:"); + console.log(nplinput); + fs.writeSync(hpargs.nplfd[1], "Echoing: " + nplinput); + } + + if (hpargs.hpfd[0] != -1) { + let hpinput = fs.readFileSync(hpargs.hpfd[0], 'utf8'); + console.log("Input received from hp:"); + console.log(hpinput); + fs.writeSync(hpargs.hpfd[1], "Echoing: " + hpinput); + } } //console.log("===Sample contract ended==="); diff --git a/examples/hpclient/client.js b/examples/hpclient/client.js deleted file mode 100644 index 1d89a542..00000000 --- a/examples/hpclient/client.js +++ /dev/null @@ -1,141 +0,0 @@ -// -// HotPocket client example code adopted from: -// https://github.com/codetsunami/hotpocket/blob/master/hp_client.js -// - -const fs = require('fs') -const ws_api = require('ws'); -const sodium = require('libsodium-wrappers') -const readline = require('readline') - -// sodium has a trigger when it's ready, we will wait and execute from there -sodium.ready.then(main).catch((e) => { console.log(e) }) - - -function main() { - - var keys = sodium.crypto_sign_keypair() - - - // check for client keys - if (!fs.existsSync('.hp_client_keys')) { - keys.privateKey = sodium.to_hex(keys.privateKey) - keys.publicKey = sodium.to_hex(keys.publicKey) - fs.writeFileSync('.hp_client_keys', JSON.stringify(keys)) - } else { - keys = JSON.parse(fs.readFileSync('.hp_client_keys')) - keys.privateKey = Uint8Array.from(Buffer.from(keys.privateKey, 'hex')) - keys.publicKey = Uint8Array.from(Buffer.from(keys.publicKey, 'hex')) - } - - - var server = 'wss://localhost:8080' - - if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2] - - if (process.argv.length == 4) server = 'wss://' + process.argv[2] + ':' + process.argv[3] - - var ws = new ws_api(server, { - rejectUnauthorized: false - }) - - /* anatomy of a public challenge - { - version: '0.1', - type: 'public_challenge', - challenge: '' - } - */ - - - // if the console ctrl + c's us we should close ws gracefully - process.once('SIGINT', function (code) { - console.log('SIGINT received...'); - ws.close() - }); - - function create_input_container(inp) { - let inp_container = { - nonce: (new Date()).getTime().toString(), - input: Buffer.from(inp).toString('hex'), - max_ledger_seqno: 9999999 - } - let inp_container_bytes = JSON.stringify(inp_container); - let sig_bytes = sodium.crypto_sign_detached(inp_container_bytes, keys.privateKey); - - let signed_inp_container = { - type: "contract_input", - content: inp_container_bytes.toString('hex'), - sig: Buffer.from(sig_bytes).toString('hex') - } - - return JSON.stringify(signed_inp_container); - } - - function create_status_request() { - let statreq = { type: 'stat' } - return JSON.stringify(statreq); - } - - ws.on('message', (m) => { - console.log("-----Received raw message-----") - console.log(m.toString()) - console.log("------------------------------") - - try { - m = JSON.parse(m) - } catch (e) { - return - } - - if (m.type != 'public_challenge') return - - console.log("Received challenge message") - console.log(m) - - let pkhex = 'ed' + Buffer.from(keys.publicKey).toString('hex'); - console.log('My public key is: ' + pkhex); - - // sign the challenge and send back the response - var sigbytes = sodium.crypto_sign_detached(m.challenge, keys.privateKey); - var response = { - type: 'challenge_resp', - challenge: m.challenge, - sig: Buffer.from(sigbytes).toString('hex'), - pubkey: pkhex - } - - console.log('Sending challenge response.'); - ws.send(JSON.stringify(response)) - - // start listening for stdin - const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout - }); - - // Capture user input from the console. - var input_pump = () => { - rl.question('\nProvide an input: ', (inp) => { - - let msgtosend = ""; - - if (inp == "stat") - msgtosend = create_status_request(); - else - msgtosend = create_input_container(inp); - - console.log("Sending message: " + msgtosend); - ws.send(msgtosend) - - input_pump() - }) - } - input_pump() - - }); - - ws.on('close', () => { - console.log('Server disconnected.'); - }); -} diff --git a/examples/hpclient/text-client.js b/examples/hpclient/text-client.js index 8749e2c1..f070e349 100644 --- a/examples/hpclient/text-client.js +++ b/examples/hpclient/text-client.js @@ -72,6 +72,19 @@ function main() { return JSON.stringify(signed_inp_container); } + function create_read_request_container(inp) { + + if (inp.length == 0) + return ""; + + let container = { + type: "contract_read_request", + content: Buffer.from(inp).toString('hex'), + } + + return JSON.stringify(container); + } + function create_status_request() { let statreq = { type: 'stat' } return JSON.stringify(statreq); @@ -108,12 +121,15 @@ function main() { if (inp == "stat") msgtosend = create_status_request(); + else if (inp.startsWith("read ")) + msgtosend = create_read_request_container(inp.substr(5)); else msgtosend = create_input_container(inp); - ws.send(msgtosend) + if (msgtosend.length > 0) + ws.send(msgtosend) - input_pump() + input_pump(); }) } input_pump(); @@ -131,7 +147,7 @@ function main() { if (m.type == 'public_challenge') { handle_public_challange(m); } - else if (m.type == 'contract_output') { + else if (m.type == 'contract_output' || m.type == 'contract_read_response') { console.log(Buffer.from(m.content, 'hex').toString()); } else if (m.type == 'request_status_result') { diff --git a/src/conf.cpp b/src/conf.cpp index 7a9633f6..c41da3ce 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -80,6 +80,8 @@ int create_contract() boost::filesystem::create_directories(ctx.config_dir); boost::filesystem::create_directories(ctx.hist_dir); boost::filesystem::create_directories(ctx.state_rw_dir); + boost::filesystem::create_directories(ctx.state_read_req_dir); + //Create config file with default settings. @@ -147,6 +149,7 @@ void set_contract_dir_paths(std::string exepath, std::string basedir) ctx.hist_dir = basedir + "/hist"; ctx.state_dir = basedir + "/state"; ctx.state_rw_dir = ctx.state_dir + "/rw"; + ctx.state_read_req_dir = ctx.state_dir + "/rr"; ctx.log_dir = basedir + "/log"; } diff --git a/src/conf.hpp b/src/conf.hpp index 84066c6f..d49ea0cd 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -35,6 +35,7 @@ struct contract_ctx std::string hist_dir; // Contract ledger history dir full path std::string state_dir; // Contract state maintenence path (hpfs path) std::string state_rw_dir; // Contract executation read/write state path. + std::string state_read_req_dir; // Contract executation state path for read requests. std::string log_dir; // Contract log dir full path std::string config_dir; // Contract config dir full path std::string config_file; // Full path to the contract config file diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 5c3add80..0bf20e3e 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -56,6 +56,9 @@ namespace cons ctx.stage_time = conf::cfg.roundtime / 5; ctx.stage_reset_wait_threshold = conf::cfg.roundtime / 10; + ctx.contract_ctx.args.state_dir = conf::ctx.state_rw_dir; + ctx.contract_ctx.args.readonly = false; + init_success = true; return 0; } @@ -65,6 +68,8 @@ namespace cons */ void deinit() { + // Stop the contract if running. + sc::stop(ctx.contract_ctx); } int run_consensus() @@ -757,18 +762,25 @@ namespace cons // Send any output from the previous consensus round to locally connected users. dispatch_user_outputs(cons_prop); - sc::contract_bufmap_t useriobufmap; + // Execute the contract + { + sc::contract_execution_args &args = ctx.contract_ctx.args; + args.time = cons_prop.time; - sc::contract_iobuf_pair nplbufpair; - nplbufpair.inputs.splice(nplbufpair.inputs.end(), ctx.candidate_npl_messages); + // Populate npl bufs and user bufs. + args.nplbufs.inputs.splice(args.nplbufs.inputs.end(), ctx.candidate_npl_messages); + feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop); + // TODO: Do something usefull with HP<-->SC channel. - feed_user_inputs_to_contract_bufmap(useriobufmap, cons_prop); + if (sc::execute_contract(ctx.contract_ctx) == -1) + return -1; - if (run_contract_binary(cons_prop.time, useriobufmap, nplbufpair) == -1) - return -1; + ctx.state = args.post_execution_state_hash; + extract_user_outputs_from_contract_bufmap(args.userbufs); + broadcast_npl_output(args.nplbufs.output); - extract_user_outputs_from_contract_bufmap(useriobufmap); - broadcast_npl_output(nplbufpair.output); + sc::clear_args(args); + } return 0; } @@ -895,20 +907,6 @@ namespace cons } } - /** - * Executes the smart contract with the specified time and provided I/O buf maps. - * @param time_now The time that must be passed on to the contract. - * @param useriobufmap The contract bufmap which holds user I/O buffers. - */ - int run_contract_binary(const int64_t time_now, sc::contract_bufmap_t &useriobufmap, sc::contract_iobuf_pair &nplbufpair) - { - // todo:implement exchange of hpsc bufs - sc::contract_iobuf_pair hpscbufpair; - return sc::exec_contract( - sc::contract_exec_args(time_now, useriobufmap, nplbufpair, hpscbufpair), - ctx.state); - } - /** * Increment voting table counter. * @param counter The counter map in which a vote should be incremented. diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 05cafb48..252784f2 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -7,6 +7,7 @@ #include "../p2p/p2p.hpp" #include "../usr/user_input.hpp" #include "../hpfs/h32.hpp" +#include "../sc.hpp" #include "ledger_handler.hpp" namespace cons @@ -87,6 +88,7 @@ struct consensus_context uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. std::mutex state_sync_lock; + sc::execution_context contract_ctx; bool is_shutting_down = false; consensus_context() @@ -153,8 +155,6 @@ void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); void broadcast_npl_output(std::string &output); -int run_contract_binary(const int64_t time_now, sc::contract_bufmap_t &useriobufmap, sc::contract_iobuf_pair &nplbufpair); - template void increment(std::map &counter, const T &candidate); diff --git a/src/jsonschema/usrmsg_helpers.cpp b/src/jsonschema/usrmsg_helpers.cpp index ba26952c..38a37a4b 100644 --- a/src/jsonschema/usrmsg_helpers.cpp +++ b/src/jsonschema/usrmsg_helpers.cpp @@ -8,35 +8,35 @@ namespace jsonschema::usrmsg { -// User JSON message schema version -constexpr const char *SCHEMA_VERSION = "0.1"; + // User JSON message schema version + constexpr const char *SCHEMA_VERSION = "0.1"; -// Separators -constexpr const char *SEP_COMMA = "\",\""; -constexpr const char *SEP_COLON = "\":\""; -constexpr const char *SEP_COMMA_NOQUOTE = ",\""; -constexpr const char *SEP_COLON_NOQUOTE = "\":"; + // Separators + constexpr const char *SEP_COMMA = "\",\""; + constexpr const char *SEP_COLON = "\":\""; + constexpr const char *SEP_COMMA_NOQUOTE = ",\""; + constexpr const char *SEP_COLON_NOQUOTE = "\":"; -// Message field names -const char *const FLD_VERSION = "version"; -constexpr const char *FLD_TYPE = "type"; -constexpr const char *FLD_CHALLENGE = "challenge"; -constexpr const char *FLD_SIG = "sig"; -constexpr const char *FLD_PUBKEY = "pubkey"; -constexpr const char *FLD_INPUT = "input"; -constexpr const char *FLD_MAX_LED_SEQ = "max_ledger_seqno"; -constexpr const char *FLD_CONTENT = "content"; -constexpr const char *FLD_NONCE = "nonce"; -constexpr const char *FLD_LCL = "lcl"; -constexpr const char *FLD_LCL_SEQ = "lcl_seqno"; -constexpr const char *FLD_STATUS = "status"; -constexpr const char *FLD_ORIGIN = "origin"; -constexpr const char *FLD_REASON = "reason"; + // Message field names + const char *const FLD_VERSION = "version"; + constexpr const char *FLD_TYPE = "type"; + constexpr const char *FLD_CHALLENGE = "challenge"; + constexpr const char *FLD_SIG = "sig"; + constexpr const char *FLD_PUBKEY = "pubkey"; + constexpr const char *FLD_INPUT = "input"; + constexpr const char *FLD_MAX_LED_SEQ = "max_ledger_seqno"; + constexpr const char *FLD_CONTENT = "content"; + constexpr const char *FLD_NONCE = "nonce"; + constexpr const char *FLD_LCL = "lcl"; + constexpr const char *FLD_LCL_SEQ = "lcl_seqno"; + constexpr const char *FLD_STATUS = "status"; + constexpr const char *FLD_ORIGIN = "origin"; + constexpr const char *FLD_REASON = "reason"; -// Length of user random challenge bytes. -const size_t CHALLENGE_LEN = 16; + // Length of user random challenge bytes. + const size_t CHALLENGE_LEN = 16; -/** + /** * Constructs user challenge message json and the challenge string required for * initial user challenge handshake. This gets called when a user establishes * a web socket connection to HP. @@ -50,40 +50,40 @@ const size_t CHALLENGE_LEN = 16; * } * @param challengehex String reference to copy the generated hex challenge string into. */ -void create_user_challenge(std::string &msg, std::string &challengehex) -{ - // Use libsodium to generate the random challenge bytes. - unsigned char challenge_bytes[CHALLENGE_LEN]; - randombytes_buf(challenge_bytes, CHALLENGE_LEN); + void create_user_challenge(std::string &msg, std::string &challengehex) + { + // Use libsodium to generate the random challenge bytes. + unsigned char challenge_bytes[CHALLENGE_LEN]; + randombytes_buf(challenge_bytes, CHALLENGE_LEN); - // We pass the hex challenge string separately to the caller even though - // we also include it in the challenge msg as well. + // We pass the hex challenge string separately to the caller even though + // we also include it in the challenge msg as well. - util::bin2hex(challengehex, challenge_bytes, CHALLENGE_LEN); + util::bin2hex(challengehex, challenge_bytes, CHALLENGE_LEN); - // Construct the challenge msg json. - // We do not use RapidJson here in favour of performance because this is a simple json message. + // Construct the challenge msg json. + // We do not use RapidJson here in favour of performance because this is a simple json message. - // Since we know the rough size of the challenge message we reserve adequate amount for the holder. - // Only Hot Pocket version number is variable length. Therefore message size is roughly 90 bytes - // so allocating 128bytes for heap padding. - msg.reserve(128); - msg.append("{\"") - .append(FLD_VERSION) - .append(SEP_COLON) - .append(SCHEMA_VERSION) - .append(SEP_COMMA) - .append(FLD_TYPE) - .append(SEP_COLON) - .append(MSGTYPE_CHALLENGE) - .append(SEP_COMMA) - .append(FLD_CHALLENGE) - .append(SEP_COLON) - .append(challengehex) - .append("\"}"); -} + // Since we know the rough size of the challenge message we reserve adequate amount for the holder. + // Only Hot Pocket version number is variable length. Therefore message size is roughly 90 bytes + // so allocating 128bytes for heap padding. + msg.reserve(128); + msg.append("{\"") + .append(FLD_VERSION) + .append(SEP_COLON) + .append(SCHEMA_VERSION) + .append(SEP_COMMA) + .append(FLD_TYPE) + .append(SEP_COLON) + .append(MSGTYPE_CHALLENGE) + .append(SEP_COMMA) + .append(FLD_CHALLENGE) + .append(SEP_COLON) + .append(challengehex) + .append("\"}"); + } -/** + /** * Constructs a status response message. * @param msg String reference to copy the generated json message string into. * Message format: @@ -93,25 +93,25 @@ void create_user_challenge(std::string &msg, std::string &challengehex) * "lcl_seqno": * } */ -void create_status_response(std::string &msg) -{ - msg.reserve(128); - msg.append("{\"") - .append(FLD_TYPE) - .append(SEP_COLON) - .append(MSGTYPE_STAT_RESP) - .append(SEP_COMMA) - .append(FLD_LCL) - .append(SEP_COLON) - .append(cons::ctx.lcl) - .append(SEP_COMMA) - .append(FLD_LCL_SEQ) - .append(SEP_COLON_NOQUOTE) - .append(std::to_string(cons::ctx.led_seq_no)) - .append("}"); -} + void create_status_response(std::string &msg) + { + msg.reserve(128); + msg.append("{\"") + .append(FLD_TYPE) + .append(SEP_COLON) + .append(MSGTYPE_STAT_RESP) + .append(SEP_COMMA) + .append(FLD_LCL) + .append(SEP_COLON) + .append(cons::ctx.lcl) + .append(SEP_COMMA) + .append(FLD_LCL_SEQ) + .append(SEP_COLON_NOQUOTE) + .append(std::to_string(cons::ctx.led_seq_no)) + .append("}"); + } -/** + /** * Constructs a request result message. * @param msg String reference to copy the generated json message string into. * Message format: @@ -129,52 +129,83 @@ void create_status_response(std::string &msg) * @param origin_type Original message type which generated this result. * @param origin_extra_data Extra field data string to be injected into origin. */ -void create_request_status_result(std::string &msg, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data) -{ - msg.reserve(128); - msg.append("{\"") - .append(FLD_TYPE) - .append(SEP_COLON) - .append(MSGTYPE_REQUEST_STATUS_RESULT) - .append(SEP_COMMA) - .append(FLD_STATUS) - .append(SEP_COLON) - .append(status) - .append(SEP_COMMA) - .append(FLD_REASON) - .append(SEP_COLON) - .append(reason) - .append(SEP_COMMA) - .append(FLD_ORIGIN) - .append("\":{\"") - .append(FLD_TYPE) - .append(SEP_COLON) - .append(origin_type) - .append("\"") - .append(origin_extra_data) - .append("}}"); -} + void create_request_status_result(std::string &msg, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data) + { + msg.reserve(128); + msg.append("{\"") + .append(FLD_TYPE) + .append(SEP_COLON) + .append(MSGTYPE_REQUEST_STATUS_RESULT) + .append(SEP_COMMA) + .append(FLD_STATUS) + .append(SEP_COLON) + .append(status) + .append(SEP_COMMA) + .append(FLD_REASON) + .append(SEP_COLON) + .append(reason) + .append(SEP_COMMA) + .append(FLD_ORIGIN) + .append("\":{\"") + .append(FLD_TYPE) + .append(SEP_COLON) + .append(origin_type) + .append("\"") + .append(origin_extra_data) + .append("}}"); + } -/** + /** * Returns concatenated string for contract input origin data fields to be included in request result. * @param sig Binary singature of the original contract input. */ -std::string origin_data_for_contract_input(std::string_view sig) -{ - std::string sighex; - util::bin2hex(sighex, reinterpret_cast(sig.data()), sig.length()); + std::string origin_data_for_contract_input(std::string_view sig) + { + std::string sighex; + util::bin2hex(sighex, reinterpret_cast(sig.data()), sig.length()); - std::string extra_data; - extra_data.append(",\"") - .append(FLD_SIG) - .append(SEP_COLON) - .append(sighex) - .append("\""); + std::string extra_data; + extra_data.append(",\"") + .append(FLD_SIG) + .append(SEP_COLON) + .append(sighex) + .append("\""); - return extra_data; -} + return extra_data; + } -/** + + /** + * Constructs a contract read response message. + * @param msg String reference to copy the generated json message string into. + * Message format: + * { + * "type": "contract_read_response", + * "content": "" + * } + * @param content The contract binary output content to be put in the message. + */ + void create_contract_read_response_container(std::string &msg, std::string_view content) + { + std::string contenthex; + util::bin2hex( + contenthex, + reinterpret_cast(content.data()), + content.length()); + + msg.reserve(256); + msg.append("{\"") + .append(FLD_TYPE) + .append(SEP_COLON) + .append(MSGTYPE_CONTRACT_READ_RESPONSE) + .append(SEP_COMMA) + .append(FLD_CONTENT) + .append(SEP_COLON) + .append(contenthex) + .append("\"}"); + } + + /** * Constructs a contract output container message. * @param msg String reference to copy the generated json message string into. * Message format: @@ -186,35 +217,35 @@ std::string origin_data_for_contract_input(std::string_view sig) * } * @param content The contract binary output content to be put in the message. */ -void create_contract_output_container(std::string &msg, std::string_view content) -{ - std::string contenthex; - util::bin2hex( - contenthex, - reinterpret_cast(content.data()), - content.length()); + void create_contract_output_container(std::string &msg, std::string_view content) + { + std::string contenthex; + util::bin2hex( + contenthex, + reinterpret_cast(content.data()), + content.length()); - msg.reserve(256); - msg.append("{\"") - .append(FLD_TYPE) - .append(SEP_COLON) - .append(MSGTYPE_CONTRACT_OUTPUT) - .append(SEP_COMMA) - .append(FLD_LCL) - .append(SEP_COLON) - .append(cons::ctx.lcl) - .append(SEP_COMMA) - .append(FLD_LCL_SEQ) - .append(SEP_COLON_NOQUOTE) - .append(std::to_string(cons::ctx.led_seq_no)) - .append(SEP_COMMA_NOQUOTE) - .append(FLD_CONTENT) - .append(SEP_COLON) - .append(contenthex) - .append("\"}"); -} + msg.reserve(256); + msg.append("{\"") + .append(FLD_TYPE) + .append(SEP_COLON) + .append(MSGTYPE_CONTRACT_OUTPUT) + .append(SEP_COMMA) + .append(FLD_LCL) + .append(SEP_COLON) + .append(cons::ctx.lcl) + .append(SEP_COMMA) + .append(FLD_LCL_SEQ) + .append(SEP_COLON_NOQUOTE) + .append(std::to_string(cons::ctx.led_seq_no)) + .append(SEP_COMMA_NOQUOTE) + .append(FLD_CONTENT) + .append(SEP_COLON) + .append(contenthex) + .append("\"}"); + } -/** + /** * Verifies the user challenge response with the original challenge issued to the user * and the user public key contained in the response. * @@ -230,57 +261,100 @@ void create_contract_output_container(std::string &msg, std::string_view content * @param original_challenge The original hex challenge string issued to the user. * @return 0 if challenge response is verified. -1 if challenge not met or an error occurs. */ -int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge) -{ - rapidjson::Document d; - if (parse_user_message(d, response) != 0) - return -1; - - // Validate msg type. - if (d[FLD_TYPE] != MSGTYPE_CHALLENGE_RESP) + int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge) { - LOG_DBG << "User challenge response type invalid. 'challenge_response' expected."; - return -1; + rapidjson::Document d; + if (parse_user_message(d, response) != 0) + return -1; + + // Validate msg type. + if (d[FLD_TYPE] != MSGTYPE_CHALLENGE_RESP) + { + LOG_DBG << "User challenge response type invalid. 'challenge_response' expected."; + return -1; + } + + // Compare the response challenge string with the original issued challenge. + if (!d.HasMember(FLD_CHALLENGE) || d[FLD_CHALLENGE] != original_challenge.data()) + { + LOG_DBG << "User challenge response challenge invalid."; + return -1; + } + + // Check for the 'sig' field existence. + if (!d.HasMember(FLD_SIG) || !d[FLD_SIG].IsString()) + { + LOG_DBG << "User challenge response signature invalid."; + return -1; + } + + // Check for the 'pubkey' field existence. + if (!d.HasMember(FLD_PUBKEY) || !d[FLD_PUBKEY].IsString()) + { + LOG_DBG << "User challenge response public key invalid."; + return -1; + } + + // Verify the challenge signature. We do this last due to signature verification cost. + std::string_view pubkeysv = util::getsv(d[FLD_PUBKEY]); + if (crypto::verify_hex( + original_challenge, + util::getsv(d[FLD_SIG]), + pubkeysv) != 0) + { + LOG_DBG << "User challenge response signature verification failed."; + return -1; + } + + extracted_pubkeyhex = pubkeysv; + + return 0; } - // Compare the response challenge string with the original issued challenge. - if (!d.HasMember(FLD_CHALLENGE) || d[FLD_CHALLENGE] != original_challenge.data()) + /** + * Extracts a contract read request message sent by user. + * + * @param extracted_content The content to be passed to the contract, extracted from the message. +* @param d The json document holding the read request message. + * Accepted signed input container format: + * { + * "type": "contract_read_request", + * "content": "" + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_read_request(std::string &extracted_content, const rapidjson::Document &d) { - LOG_DBG << "User challenge response challenge invalid."; - return -1; + if (!d.HasMember(FLD_CONTENT)) + { + LOG_DBG << "Read request required fields missing."; + return -1; + } + + if (!d[FLD_CONTENT].IsString()) + { + LOG_DBG << "Read request invalid field values."; + return -1; + } + + std::string_view contenthex(d[FLD_CONTENT].GetString(), d[FLD_CONTENT].GetStringLength()); + + std::string content; + content.resize(contenthex.length() / 2); + if (util::hex2bin( + reinterpret_cast(content.data()), + content.length(), + contenthex) != 0) + { + LOG_DBG << "Read request format invalid."; + return -1; + } + + extracted_content = std::move(content); + return 0; } - // Check for the 'sig' field existence. - if (!d.HasMember(FLD_SIG) || !d[FLD_SIG].IsString()) - { - LOG_DBG << "User challenge response signature invalid."; - return -1; - } - - // Check for the 'pubkey' field existence. - if (!d.HasMember(FLD_PUBKEY) || !d[FLD_PUBKEY].IsString()) - { - LOG_DBG << "User challenge response public key invalid."; - return -1; - } - - // Verify the challenge signature. We do this last due to signature verification cost. - std::string_view pubkeysv = util::getsv(d[FLD_PUBKEY]); - if (crypto::verify_hex( - original_challenge, - util::getsv(d[FLD_SIG]), - pubkeysv) != 0) - { - LOG_DBG << "User challenge response signature verification failed."; - return -1; - } - - extracted_pubkeyhex = pubkeysv; - - return 0; -} - -/** + /** * Extracts a signed input container message sent by user. * * @param extracted_content The content extracted from the message. @@ -294,37 +368,37 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string * } * @return 0 on successful extraction. -1 for failure. */ -int extract_signed_input_container( - std::string &extracted_content, std::string &extracted_sig, const rapidjson::Document &d) -{ - if (!d.HasMember(FLD_CONTENT) || !d.HasMember(FLD_SIG)) + int extract_signed_input_container( + std::string &extracted_content, std::string &extracted_sig, const rapidjson::Document &d) { - LOG_DBG << "User signed input required fields missing."; - return -1; + if (!d.HasMember(FLD_CONTENT) || !d.HasMember(FLD_SIG)) + { + LOG_DBG << "User signed input required fields missing."; + return -1; + } + + if (!d[FLD_CONTENT].IsString() || !d[FLD_SIG].IsString()) + { + LOG_DBG << "User signed input invalid field values."; + return -1; + } + + // We do not verify the signature of the content here since we need to let each node + // (including self) to verify that individually after we broadcast the NUP proposal. + + const std::string content(d[FLD_CONTENT].GetString(), d[FLD_CONTENT].GetStringLength()); + + const std::string_view sighex(d[FLD_SIG].GetString(), d[FLD_SIG].GetStringLength()); + std::string sig; + sig.resize(crypto_sign_ed25519_BYTES); + util::hex2bin(reinterpret_cast(sig.data()), sig.length(), sighex); + + extracted_content = std::move(content); + extracted_sig = std::move(sig); + return 0; } - if (!d[FLD_CONTENT].IsString() || !d[FLD_SIG].IsString()) - { - LOG_DBG << "User signed input invalid field values."; - return -1; - } - - // We do not verify the signature of the content here since we need to let each node - // (including self) to verify that individually after we broadcast the NUP proposal. - - const std::string content(d[FLD_CONTENT].GetString(), d[FLD_CONTENT].GetStringLength()); - - const std::string_view sighex(d[FLD_SIG].GetString(), d[FLD_SIG].GetStringLength()); - std::string sig; - sig.resize(crypto_sign_ed25519_BYTES); - util::hex2bin(reinterpret_cast(sig.data()), sig.length(), sighex); - - extracted_content = std::move(content); - extracted_sig = std::move(sig); - return 0; -} - -/** + /** * Extract the individual components of a given input container json. * @param nonce The extracted nonce. * @param input The extracted input. @@ -337,49 +411,49 @@ int extract_signed_input_container( * } * @return 0 on succesful extraction. -1 on failure. */ -int extract_input_container(std::string &nonce, std::string &input, uint64_t &max_ledger_seqno, std::string_view contentjson) -{ - rapidjson::Document d; - d.Parse(contentjson.data()); - if (d.HasParseError()) + int extract_input_container(std::string &nonce, std::string &input, uint64_t &max_ledger_seqno, std::string_view contentjson) { - LOG_DBG << "User input container json parsing failed."; - return -1; + rapidjson::Document d; + d.Parse(contentjson.data()); + if (d.HasParseError()) + { + LOG_DBG << "User input container json parsing failed."; + return -1; + } + + if (!d.HasMember(FLD_NONCE) || !d.HasMember(FLD_INPUT) || !d.HasMember(FLD_MAX_LED_SEQ)) + { + LOG_DBG << "User input container required fields missing."; + return -1; + } + + if (!d[FLD_NONCE].IsString() || !d[FLD_INPUT].IsString() || !d[FLD_MAX_LED_SEQ].IsUint64()) + { + LOG_DBG << "User input container invalid field values."; + return -1; + } + + const rapidjson::Value &inputval = d[FLD_INPUT]; + std::string_view inputhex(inputval.GetString(), inputval.GetStringLength()); + + // Convert hex input to binary. + input.resize(inputhex.length() / 2); + if (util::hex2bin( + reinterpret_cast(input.data()), + input.length(), + inputhex) != 0) + { + LOG_DBG << "Contract input format invalid."; + return -1; + } + + nonce = d[FLD_NONCE].GetString(); + max_ledger_seqno = d[FLD_MAX_LED_SEQ].GetUint64(); + + return 0; } - if (!d.HasMember(FLD_NONCE) || !d.HasMember(FLD_INPUT) || !d.HasMember(FLD_MAX_LED_SEQ)) - { - LOG_DBG << "User input container required fields missing."; - return -1; - } - - if (!d[FLD_NONCE].IsString() || !d[FLD_INPUT].IsString() || !d[FLD_MAX_LED_SEQ].IsUint64()) - { - LOG_DBG << "User input container invalid field values."; - return -1; - } - - const rapidjson::Value &inputval = d[FLD_INPUT]; - std::string_view inputhex(inputval.GetString(), inputval.GetStringLength()); - - // Convert hex input to binary. - input.resize(inputhex.length() / 2); - if (util::hex2bin( - reinterpret_cast(input.data()), - input.length(), - inputhex) != 0) - { - LOG_DBG << "Contract input format invalid."; - return -1; - } - - nonce = d[FLD_NONCE].GetString(); - max_ledger_seqno = d[FLD_MAX_LED_SEQ].GetUint64(); - - return 0; -} - -/** + /** * Parses a json message sent by a user. * @param d RapidJson document to which the parsed json should be loaded. * @param message The message to parse. @@ -390,26 +464,26 @@ int extract_input_container(std::string &nonce, std::string &input, uint64_t &ma * } * @return 0 on successful parsing. -1 for failure. */ -int parse_user_message(rapidjson::Document &d, std::string_view message) -{ - // We load response raw bytes into json document. - // Because we project the response message directly from the binary socket buffer in a zero-copy manner, the response - // string is not null terminated. 'kParseStopWhenDoneFlag' avoids rapidjson error in this case. - d.Parse(message.data()); - if (d.HasParseError()) + int parse_user_message(rapidjson::Document &d, std::string_view message) { - LOG_DBG << "User json message parsing failed."; - return -1; - } + // We load response raw bytes into json document. + // Because we project the response message directly from the binary socket buffer in a zero-copy manner, the response + // string is not null terminated. 'kParseStopWhenDoneFlag' avoids rapidjson error in this case. + d.Parse(message.data()); + if (d.HasParseError()) + { + LOG_DBG << "User json message parsing failed."; + return -1; + } - // Check existence of msg type field. - if (!d.HasMember(FLD_TYPE) || !d[FLD_TYPE].IsString()) - { - LOG_DBG << "User json message 'type' missing or invalid."; - return -1; - } + // Check existence of msg type field. + if (!d.HasMember(FLD_TYPE) || !d[FLD_TYPE].IsString()) + { + LOG_DBG << "User json message 'type' missing or invalid."; + return -1; + } - return 0; -} + return 0; + } } // namespace jsonschema::usrmsg \ No newline at end of file diff --git a/src/jsonschema/usrmsg_helpers.hpp b/src/jsonschema/usrmsg_helpers.hpp index 215d6d5c..782cb946 100644 --- a/src/jsonschema/usrmsg_helpers.hpp +++ b/src/jsonschema/usrmsg_helpers.hpp @@ -12,6 +12,8 @@ extern const char* const FLD_TYPE; // Message types constexpr const char* MSGTYPE_CHALLENGE = "public_challenge"; constexpr const char* MSGTYPE_CHALLENGE_RESP = "challenge_resp"; +constexpr const char* MSGTYPE_CONTRACT_READ_REQUEST = "contract_read_request"; +constexpr const char* MSGTYPE_CONTRACT_READ_RESPONSE = "contract_read_response"; constexpr const char* MSGTYPE_CONTRACT_INPUT = "contract_input"; constexpr const char* MSGTYPE_CONTRACT_OUTPUT = "contract_output"; constexpr const char* MSGTYPE_STAT = "stat"; @@ -36,10 +38,14 @@ void create_request_status_result(std::string &msg, std::string_view status, std std::string origin_data_for_contract_input(std::string_view sig); +void create_contract_read_response_container(std::string &msg, std::string_view content); + void create_contract_output_container(std::string &msg, std::string_view content); int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge); +int extract_read_request(std::string &extracted_content, const rapidjson::Document &d); + int extract_signed_input_container(std::string &extracted_content, std::string &extracted_sig, const rapidjson::Document &d); int extract_input_container(std::string &nonce, std::string &input, uint64_t &max_ledger_seqno, std::string_view contentjson); diff --git a/src/main.cpp b/src/main.cpp index e5138c7c..aed0e732 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -9,6 +9,7 @@ #include "sc.hpp" #include "hplog.hpp" #include "usr/usr.hpp" +#include "usr/read_req.hpp" #include "p2p/p2p.hpp" #include "cons/cons.hpp" #include "hpfs/hpfs.hpp" @@ -67,9 +68,9 @@ int parse_cmd(int argc, char **argv) void deinit() { cons::deinit(); - sc::deinit(); state_sync::deinit(); state_serve::deinit(); + read_req::deinit(); usr::deinit(); p2p::deinit(); hpfs::deinit(); @@ -192,7 +193,7 @@ int main(int argc, char **argv) LOG_INFO << "Operating mode: " << (conf::cfg.startup_mode == conf::OPERATING_MODE::OBSERVER ? "Observer" : "Proposer"); - if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || + if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || read_req::init() != 0 || state_serve::init() != 0 || state_sync::init() != 0 || cons::init() != 0) { deinit(); diff --git a/src/sc.cpp b/src/sc.cpp index b9dd01c9..caf0ed18 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -9,22 +9,24 @@ namespace sc { - execution_context ctx; - /** - * Executes the contract process and passes the specified arguments. + * Executes the contract process and passes the specified context arguments. * @return 0 on successful process creation. -1 on failure or contract process is already running. */ - int exec_contract(const contract_exec_args &args, hpfs::h32 &state_hash) + int execute_contract(execution_context &ctx) { // Start the hpfs rw session before starting the contract process. - if (start_hpfs_rw_session() != 0) + if (start_hpfs_rw_session(ctx) != 0) return -1; // Setup io pipes and feed all inputs to them. - create_iopipes_for_fdmap(ctx.userfds, args.userbufs); - create_iopipes(ctx.nplfds, !args.nplbuff.inputs.empty()); - create_iopipes(ctx.hpscfds, !args.hpscbufs.inputs.empty()); + create_iopipes_for_fdmap(ctx.userfds, ctx.args.userbufs); + + if (!ctx.args.readonly) + { + create_iopipes(ctx.nplfds, !ctx.args.nplbufs.inputs.empty()); + create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty()); + } int ret = 0; const pid_t pid = fork(); @@ -34,19 +36,20 @@ namespace sc ctx.contract_pid = pid; // Close all fds unused by HP process. - close_unused_fds(true); + close_unused_fds(ctx, true); // Start the contract output collection thread. - ctx.output_fetcher_thread = std::thread(fetch_outputs, std::ref(args)); + ctx.output_fetcher_thread = std::thread(fetch_outputs, std::ref(ctx)); // Write the inputs into the contract process. - if (feed_inputs(args) != 0) + if (feed_inputs(ctx) != 0) goto failure; // Wait for child process (contract process) to complete execution. const int presult = await_process_execution(ctx.contract_pid); ctx.contract_pid = 0; - LOG_DBG << "Contract process ended."; + + LOG_DBG << "Contract process ended." << (ctx.args.readonly ? " (rdonly)" : ""); // Wait for the output collection thread to gracefully stop. ctx.output_fetcher_thread.join(); @@ -61,18 +64,18 @@ namespace sc { // Contract process. util::unmask_signal(); - + // Set up the process environment and overlay the contract binary program with execv(). // Close all fds unused by SC process. - close_unused_fds(false); + close_unused_fds(ctx, false); // Write the contract input message from HotPocket to the stdin (0) of the contract process. - write_contract_args(args); + write_contract_args(ctx); - LOG_DBG << "Starting contract process..."; + LOG_DBG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : ""); - const bool using_appbill = !conf::cfg.appbill.empty(); + const bool using_appbill = !ctx.args.readonly && !conf::cfg.appbill.empty(); int len = conf::cfg.runtime_binexec_args.size() + 1; if (using_appbill) len += conf::cfg.runtime_appbill_args.size(); @@ -81,22 +84,24 @@ namespace sc char *execv_args[len]; int j = 0; if (using_appbill) + { for (int i = 0; i < conf::cfg.runtime_appbill_args.size(); i++, j++) execv_args[i] = conf::cfg.runtime_appbill_args[i].data(); + } for (int i = 0; i < conf::cfg.runtime_binexec_args.size(); i++, j++) execv_args[j] = conf::cfg.runtime_binexec_args[i].data(); execv_args[len - 1] = NULL; - chdir(conf::ctx.state_rw_dir.c_str()); + chdir(ctx.args.state_dir.c_str()); int ret = execv(execv_args[0], execv_args); - LOG_ERR << errno << ": Contract process execv failed."; + LOG_ERR << errno << ": Contract process execv failed." << (ctx.args.readonly ? " (rdonly)" : ""); exit(1); } else { - LOG_ERR << "fork() failed when starting contract process."; + LOG_ERR << "fork() failed when starting contract process." << (ctx.args.readonly ? " (rdonly)" : ""); goto failure; } @@ -105,10 +110,13 @@ namespace sc ret = -1; success: - stop_hpfs_rw_session(state_hash); + stop_hpfs_rw_session(ctx); cleanup_fdmap(ctx.userfds); - cleanup_vectorfds(ctx.hpscfds); - cleanup_vectorfds(ctx.nplfds); + if (!ctx.args.readonly) + { + cleanup_vectorfds(ctx.hpscfds); + cleanup_vectorfds(ctx.nplfds); + } return ret; } @@ -132,30 +140,29 @@ namespace sc /** * Starts the hpfs read/write state filesystem. */ - int start_hpfs_rw_session() + int start_hpfs_rw_session(execution_context &ctx) { - LOG_DBG << "Starting hpfs rw session..."; - if (hpfs::start_fs_session(ctx.hpfs_pid, conf::ctx.state_rw_dir, "rw", true) == -1) + if (hpfs::start_fs_session(ctx.hpfs_pid, ctx.args.state_dir, ctx.args.readonly ? "ro" : "rw", true) == -1) return -1; - LOG_DBG << "hpfs rw session started. pid:" << ctx.hpfs_pid; + LOG_DBG << "hpfs session started. pid:" << ctx.hpfs_pid << (ctx.args.readonly ? " (rdonly)" : ""); } /** * Stops the hpfs state filesystem. */ - int stop_hpfs_rw_session(hpfs::h32 &state_hash) + int stop_hpfs_rw_session(execution_context &ctx) { - // Read the root hash. - if (hpfs::get_hash(state_hash, conf::ctx.state_rw_dir, "/") == -1) + // Read the root hash if not in readonly mode. + if (!ctx.args.readonly && hpfs::get_hash(ctx.args.post_execution_state_hash, ctx.args.state_dir, "/") == -1) return -1; - LOG_DBG << "Stopping hpfs rw session... pid:" << ctx.hpfs_pid; + LOG_DBG << "Stopping hpfs session... pid:" << ctx.hpfs_pid << (ctx.args.readonly ? " (rdonly)" : ""); + ; if (util::kill_process(ctx.hpfs_pid, true) == -1) return -1; ctx.hpfs_pid = 0; - LOG_DBG << "hpfs rw session stopped."; return 0; } @@ -172,7 +179,7 @@ namespace sc * "unl":[ "pkhex", ... ] * } */ - int write_contract_args(const contract_exec_args &args) + int write_contract_args(const execution_context &ctx) { // Populate the json string with contract args. // We don't use a JSON parser here because it's lightweight to contrstuct the @@ -181,14 +188,20 @@ namespace sc std::ostringstream os; os << "{\"version\":\"" << util::HP_VERSION << "\",\"pubkey\":\"" << conf::cfg.pubkeyhex - << "\",\"ts\":" << args.timestamp - << ",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE] - << "],\"usrfd\":{"; + << "\",\"ts\":" << ctx.args.time + << ",\"readonly\":" << (ctx.args.readonly ? "true" : "false"); + + if (!ctx.args.readonly) + { + os << ",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE] + << "],\"nplfd\":[" << ctx.nplfds[FDTYPE::SCREAD] << "," << ctx.nplfds[FDTYPE::SCWRITE] << "]"; + } + + os << ",\"usrfd\":{"; fdmap_json_to_stream(ctx.userfds, os); - os << "},\"nplfd\":[" << ctx.nplfds[FDTYPE::SCREAD] << "," << ctx.nplfds[FDTYPE::SCWRITE] - << "],\"unl\":["; + os << "},\"unl\":["; for (auto nodepk = conf::cfg.unl.begin(); nodepk != conf::cfg.unl.end(); nodepk++) { @@ -234,16 +247,14 @@ namespace sc return 0; } - int feed_inputs(const contract_exec_args &args) + int feed_inputs(execution_context &ctx) { // Write any hp or npl input messages to hp->sc and npl->sc pipe. - if (write_contract_hp_npl_inputs(args) != 0) - { + if (!ctx.args.readonly && write_contract_hp_npl_inputs(ctx) != 0) return -1; - } // Write any verified (consensus-reached) user inputs to user pipes. - if (write_contract_fdmap_inputs(ctx.userfds, args.userbufs) != 0) + if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) != 0) { LOG_ERR << "Failed to write user inputs to contract."; return -1; @@ -252,20 +263,20 @@ namespace sc return 0; } - int fetch_outputs(const contract_exec_args &args) + int fetch_outputs(execution_context &ctx) { util::mask_signal(); while (true) { - if (ctx.should_deinit) + if (ctx.should_stop) break; - const int hpsc_npl_res = read_contract_hp_npl_outputs(args); + const int hpsc_npl_res = ctx.args.readonly ? 0 : read_contract_hp_npl_outputs(ctx); if (hpsc_npl_res == -1) return -1; - const int user_res = read_contract_fdmap_outputs(ctx.userfds, args.userbufs); + const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); if (user_res == -1) { LOG_ERR << "Error reading user outputs from the contract."; @@ -286,15 +297,15 @@ namespace sc /** * Writes any hp input messages to the contract. */ - int write_contract_hp_npl_inputs(const contract_exec_args &args) + int write_contract_hp_npl_inputs(execution_context &ctx) { - if (write_iopipe(ctx.hpscfds, args.hpscbufs.inputs) != 0) + if (write_iopipe(ctx.hpscfds, ctx.args.hpscbufs.inputs) != 0) { LOG_ERR << "Error writing HP inputs to SC"; return -1; } - if (write_npl_iopipe(ctx.nplfds, args.nplbuff.inputs) != 0) + if (write_npl_iopipe(ctx.nplfds, ctx.args.nplbufs.inputs) != 0) { LOG_ERR << "Error writing NPL inputs to SC"; return -1; @@ -309,16 +320,16 @@ namespace sc * * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. */ - int read_contract_hp_npl_outputs(const contract_exec_args &args) + int read_contract_hp_npl_outputs(execution_context &ctx) { - const int hpsc_res = read_iopipe(ctx.hpscfds, args.hpscbufs.output); + const int hpsc_res = read_iopipe(ctx.hpscfds, ctx.args.hpscbufs.output); if (hpsc_res == -1) { LOG_ERR << "Error reading HP output from the contract."; return -1; } - const int npl_res = read_iopipe(ctx.nplfds, args.nplbuff.output); + const int npl_res = read_iopipe(ctx.nplfds, ctx.args.nplbufs.output); if (npl_res == -1) { LOG_ERR << "Error reading NPL output from the contract."; @@ -631,11 +642,13 @@ namespace sc return -1; } - void close_unused_fds(const bool is_hp) + void close_unused_fds(execution_context &ctx, const bool is_hp) { - close_unused_vectorfds(is_hp, ctx.hpscfds); - - close_unused_vectorfds(is_hp, ctx.nplfds); + if (!ctx.args.readonly) + { + close_unused_vectorfds(is_hp, ctx.hpscfds); + close_unused_vectorfds(is_hp, ctx.nplfds); + } // Loop through user fds. for (auto &[pubkey, fds] : ctx.userfds) @@ -682,12 +695,23 @@ namespace sc fds.clear(); } - /** - * Cleanup any running processes. - */ - void deinit() + void clear_args(contract_execution_args &args) { - ctx.should_deinit = true; + args.userbufs.clear(); + args.hpscbufs.inputs.clear(); + args.hpscbufs.output.clear(); + args.nplbufs.inputs.clear(); + args.nplbufs.output.clear(); + args.time = 0; + args.post_execution_state_hash = hpfs::h32_empty; + } + + /** + * Cleanup any running processes for the specified execution context. + */ + void stop(execution_context &ctx) + { + ctx.should_stop = true; if (ctx.contract_pid > 0) util::kill_process(ctx.contract_pid, true); diff --git a/src/sc.hpp b/src/sc.hpp index 73601c0c..81436632 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -50,33 +50,31 @@ namespace sc /** * Holds information that should be passed into the contract process. */ - struct contract_exec_args + struct contract_execution_args { + // Whether the contract should execute in read only mode (to serve read requests). + bool readonly = false; + + // State dir path to be used for this execution. + std::string state_dir; + // Map of user I/O buffers (map key: user binary public key). // The value is a pair holding consensus-verified inputs and contract-generated outputs. - contract_bufmap_t &userbufs; + contract_bufmap_t userbufs; // Pair of NPL<->SC byte array message buffers. // Input buffers for NPL->SC messages, Output buffers for SC->NPL messages. - contract_iobuf_pair &nplbuff; + contract_iobuf_pair nplbufs; // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffers for HP->SC messages, Output buffers for SC->HP messages. - contract_iobuf_pair &hpscbufs; + contract_iobuf_pair hpscbufs; - // Current HotPocket timestamp. - const int64_t timestamp; + // Current HotPocket consensus time. + int64_t time = 0; - contract_exec_args( - int64_t timestamp, - contract_bufmap_t &userbufs, - contract_iobuf_pair &nplbuff, - contract_iobuf_pair &hpscbufs) : userbufs(userbufs), - nplbuff(nplbuff), - hpscbufs(hpscbufs), - timestamp(timestamp) - { - } + // State hash after execution will be copied to this (not applicable to read only mode). + hpfs::h32 post_execution_state_hash = hpfs::h32_empty; }; /** @@ -84,6 +82,9 @@ namespace sc */ struct execution_context { + // The arguments that was used to initiate this execution. + contract_execution_args args; + // Map of user pipe fds (map key: user public key) contract_fdmap_t userfds; @@ -103,30 +104,28 @@ namespace sc std::thread output_fetcher_thread; // Indicates that the deinit procedure has begun. - bool should_deinit = false; + bool should_stop = false; }; - int exec_contract(const contract_exec_args &args, hpfs::h32 &state_hash); - - void deinit(); + int execute_contract(execution_context &ctx); //------Internal-use functions for this namespace. int await_process_execution(pid_t pid); - int start_hpfs_rw_session(); + int start_hpfs_rw_session(execution_context &ctx); - int stop_hpfs_rw_session(hpfs::h32 &state_hash); + int stop_hpfs_rw_session(execution_context &ctx); - int write_contract_args(const contract_exec_args &args); + int write_contract_args(const execution_context &ctx); - int feed_inputs(const contract_exec_args &args); + int feed_inputs(execution_context &ctx); - int fetch_outputs(const contract_exec_args &args); + int fetch_outputs(execution_context &ctx); - int write_contract_hp_npl_inputs(const contract_exec_args &args); + int write_contract_hp_npl_inputs(execution_context &ctx); - int read_contract_hp_npl_outputs(const contract_exec_args &args); + int read_contract_hp_npl_outputs(execution_context &ctx); // Common helper functions @@ -148,12 +147,16 @@ namespace sc int read_iopipe(std::vector &fds, std::string &output); - void close_unused_fds(const bool is_hp); + void close_unused_fds(execution_context &ctx, const bool is_hp); void close_unused_vectorfds(const bool is_hp, std::vector &fds); void cleanup_vectorfds(std::vector &fds); + void clear_args(contract_execution_args &args); + + void stop(execution_context &ctx); + } // namespace sc #endif \ No newline at end of file diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp new file mode 100644 index 00000000..e52e31c9 --- /dev/null +++ b/src/usr/read_req.cpp @@ -0,0 +1,141 @@ +#include "../pchheader.hpp" +#include "../hplog.hpp" +#include "../util.hpp" +#include "../sc.hpp" +#include "../conf.hpp" +#include "../jsonschema/usrmsg_helpers.hpp" +#include "usr.hpp" +#include "read_req.hpp" + +namespace jusrmsg = jsonschema::usrmsg; + +/** + * Helper functions for serving read requests from users. + */ +namespace read_req +{ + constexpr uint16_t LOOP_WAIT = 100; // Milliseconds + bool is_shutting_down = false; + bool init_success = false; + std::thread read_req_thread; + sc::execution_context contract_ctx; + + int init() + { + contract_ctx.args.state_dir = conf::ctx.state_read_req_dir; + contract_ctx.args.readonly = true; + + read_req_thread = std::thread(read_request_processor); + init_success = true; + return 0; + } + + void deinit() + { + if (init_success) + { + is_shutting_down = true; + + // Stop the contract if running. + sc::stop(contract_ctx); + + read_req_thread.join(); + } + } + + void read_request_processor() + { + util::mask_signal(); + + LOG_INFO << "Read request server started."; + + // Lists of read requests submitted by users keyed by user pubkey. + std::unordered_map> read_requests; + + while (!is_shutting_down) + { + util::sleep(LOOP_WAIT); + + { + std::lock_guard lock(usr::ctx.users_mutex); + + // Move collected read requests from users over to local requests list. + for (auto &[sid, user] : usr::ctx.users) + { + if (!user.read_requests.empty()) + { + std::list user_read_requests; + user_read_requests.splice(user_read_requests.end(), user.read_requests); + + read_requests.try_emplace(user.pubkey, std::move(user_read_requests)); + } + } + } + + if (!read_requests.empty()) + { + LOG_DBG << "Processing read requests... count:" << read_requests.size(); + + // Process the read requests by executing the contract. + if (execute_contract(read_requests) != -1) + { + // If contract execution was succcessful, send the outputs back to users. + std::lock_guard lock(usr::ctx.users_mutex); + + uint32_t dispatch_count = 0; + for (auto &[pubkey, bufpair] : contract_ctx.args.userbufs) + { + if (!bufpair.output.empty()) + { + // Find the user session by user pubkey. + const auto sess_itr = usr::ctx.sessionids.find(pubkey); + if (sess_itr != usr::ctx.sessionids.end()) // match found + { + const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. + if (user_itr != usr::ctx.users.end()) // match found + { + std::string outputtosend; + outputtosend.swap(bufpair.output); + + std::string msg; + jusrmsg::create_contract_read_response_container(msg, outputtosend); + + const usr::connected_user &user = user_itr->second; + user.session.send(msg); + dispatch_count++; + } + } + } + } + + sc::clear_args(contract_ctx.args); + LOG_DBG << "Dispatched read request responses. count:" << dispatch_count; + } + else + { + LOG_ERR << "Contract execution for read requests failed."; + } + + read_requests.clear(); + } + } + + LOG_INFO << "Read request server stopped."; + } + + int execute_contract(std::unordered_map> &read_requests) + { + // Populate read requests to user buf map. + for (auto &[pubkey, requests] : read_requests) + { + sc::contract_iobuf_pair user_bufpair; + user_bufpair.inputs.splice(user_bufpair.inputs.end(), requests); + + contract_ctx.args.userbufs.try_emplace(pubkey, std::move(user_bufpair)); + } + + // Execute the contract. + return sc::execute_contract(contract_ctx); + } + +} // namespace read_req \ No newline at end of file diff --git a/src/usr/read_req.hpp b/src/usr/read_req.hpp new file mode 100644 index 00000000..ccaa5a99 --- /dev/null +++ b/src/usr/read_req.hpp @@ -0,0 +1,16 @@ +#ifndef _HP_CONS_READ_REQ_ +#define _HP_CONS_READ_REQ_ + +namespace read_req +{ + int init(); + + void deinit(); + + void read_request_processor(); + + int execute_contract(std::unordered_map> &read_requests); + +} // namespace read_req + +#endif \ No newline at end of file diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 183aeecf..281631ce 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -134,9 +134,27 @@ namespace usr { const char *msg_type = d[jusrmsg::FLD_TYPE].GetString(); - // Message is a contract input message. - if (d[jusrmsg::FLD_TYPE] == jusrmsg::MSGTYPE_CONTRACT_INPUT) + if (d[jusrmsg::FLD_TYPE] == jusrmsg::MSGTYPE_CONTRACT_READ_REQUEST) { + std::string content; + if (jusrmsg::extract_read_request(content, d) == 0) + { + std::lock_guard lock(ctx.users_mutex); + + //Add to the user's pending read requests list. + user.read_requests.push_back(std::move(content)); + return 0; + } + else + { + send_request_status_result(user.session, jusrmsg::STATUS_REJECTED, jusrmsg::REASON_BAD_MSG_FORMAT, msg_type, ""); + return -1; + } + } + else if (d[jusrmsg::FLD_TYPE] == jusrmsg::MSGTYPE_CONTRACT_INPUT) + { + // Message is a contract input message. + std::string contentjson; std::string sig; if (jusrmsg::extract_signed_input_container(contentjson, sig, d) == 0) diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index c6181e90..b7e0ec00 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -14,68 +14,71 @@ namespace usr { -/** + /** * Holds information about an authenticated (challenge-verified) user * connected to the HotPocket node. */ -struct connected_user -{ - // User binary public key - const std::string pubkey; + struct connected_user + { + // User binary public key + const std::string pubkey; - // Holds the unprocessed user inputs collected from websocket. - std::list submitted_inputs; + // Holds the unprocessed user inputs collected from websocket. + std::list submitted_inputs; - // Holds the websocket session of this user. - // We don't need to own the session object since the lifetime of user and session are coupled. - const comm::comm_session &session; + // Holds the unprocessed read requests collected from websocket. + std::list read_requests; - /** + // Holds the websocket session of this user. + // We don't need to own the session object since the lifetime of user and session are coupled. + const comm::comm_session &session; + + /** * @param session The web socket session the user is connected to. * @param pubkey The public key of the user in binary format. */ - connected_user(const comm::comm_session &session, std::string_view pubkey) - : session(session), pubkey(pubkey) - { - } -}; + connected_user(const comm::comm_session &session, std::string_view pubkey) + : session(session), pubkey(pubkey) + { + } + }; -/** + /** * The context struct to hold global connected-users and related objects. */ -struct connected_context -{ - // Connected (authenticated) user list. - // Map key: User socket session id () - std::unordered_map users; - std::mutex users_mutex; // Mutex for users access race conditions. + struct connected_context + { + // Connected (authenticated) user list. + // Map key: User socket session id () + std::unordered_map users; + std::mutex users_mutex; // Mutex for users access race conditions. - // Holds set of connected user session ids and public keys for lookups. - // This is used for pubkey duplicate checks as well. - // Map key: User binary pubkey - std::unordered_map sessionids; + // Holds set of connected user session ids and public keys for lookups. + // This is used for pubkey duplicate checks as well. + // Map key: User binary pubkey + std::unordered_map sessionids; - comm::comm_server listener; -}; -extern connected_context ctx; + comm::comm_server listener; + }; + extern connected_context ctx; -int init(); + int init(); -void deinit(); + void deinit(); -int start_listening(); + int start_listening(); -int verify_challenge(std::string_view message, comm::comm_session &session); + int verify_challenge(std::string_view message, comm::comm_session &session); -int handle_user_message(connected_user &user, std::string_view message); + int handle_user_message(connected_user &user, std::string_view message); -void send_request_status_result(const comm::comm_session &session, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data); + void send_request_status_result(const comm::comm_session &session, std::string_view status, std::string_view reason, std::string_view origin_type, std::string_view origin_extra_data); -int add_user(const comm::comm_session &session, const std::string &pubkey); + int add_user(const comm::comm_session &session, const std::string &pubkey); -int remove_user(const std::string &sessionid); + int remove_user(const std::string &sessionid); -const comm::comm_session *get_session_by_pubkey(const std::string &pubkey); + const comm::comm_session *get_session_by_pubkey(const std::string &pubkey); } // namespace usr