diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 6ee0ae03..78125b55 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -21,13 +21,19 @@ const echoContract = (ctx) => { else { await user.send("Echoing: " + msg); } - }); + }); - // Broadcast message to all connected users. - // ctx.users.get().forEach(u => u.send("Hello")); + // Get list of all users who are connected. + // ctx.users.get(); - // Send message to specific user (identified by public key). - // await ctx.users.find().send("Hello"); + // Get the user identified by public key. + // ctx.users.find(""); + + // Get list of all peers in the cluster. + // ctx.peers.get(); + + // Get the peer identified by public key. + // ctx.peers.find(""); // Peer messages example. // if (!ctx.readonly) { diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 8c3b525d..1bf83ad7 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -29,9 +29,9 @@ class HotPocketContract { const executionContext = new ContractExecutionContext(hpargs, users, peers); this.events.emit("session_start"); - invokeCallback(contractFunc, executionContext).then(() => { + invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => { // Wait for any pending tasks added during execution. - Promise.all(pendingTasks).then(() => { + Promise.all(pendingTasks).catch(errHandler).finally(() => { this.events.emit("session_end"); this.#terminate(); }); @@ -121,7 +121,7 @@ class UsersCollection { if (pendingUserCount == 0) { // All user message events has been emitted. // Now start waiting for queued up user message callback completion. - Promise.all(userMessageTasks).then(allUsersCompletionResolver) + Promise.all(userMessageTasks).catch(errHandler).finally(allUsersCompletionResolver) } } @@ -368,13 +368,15 @@ const invokeCallback = async (callback, ...args) => { return; if (callback.constructor.name === 'AsyncFunction') { - await callback(...args); + await callback(...args).catch(errHandler); } else { callback(...args); } } +const errHandler = (err) => console.log(err); + module.exports = { HotPocketContract } \ No newline at end of file diff --git a/src/consensus.cpp b/src/consensus.cpp index 3c22bd6e..927e5f39 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -40,9 +40,6 @@ namespace consensus ctx.stage_time = (conf::cfg.roundtime * 2) / 7; ctx.stage_reset_wait_threshold = conf::cfg.roundtime / 10; - ctx.contract_ctx.args.state_dir = conf::ctx.state_rw_dir; - ctx.contract_ctx.args.readonly = false; - // Starting consensus processing thread. ctx.consensus_thread = std::thread(run_consensus); @@ -61,7 +58,11 @@ namespace consensus ctx.is_shutting_down = true; // Stop the contract if running. - sc::stop(ctx.contract_ctx); + { + std::scoped_lock lock(ctx.contract_ctx_mutex); + if (ctx.contract_ctx) + sc::stop(ctx.contract_ctx.value()); + } // Joining consensus processing thread. if (ctx.consensus_thread.joinable()) @@ -361,7 +362,10 @@ namespace consensus */ bool push_npl_message(p2p::npl_message &npl_msg) { - return ctx.contract_ctx.args.npl_messages.try_enqueue(npl_msg); + std::scoped_lock lock(ctx.contract_ctx_mutex); + if (ctx.contract_ctx) + return ctx.contract_ctx->args.npl_messages.try_enqueue(npl_msg); + return false; } /** @@ -371,7 +375,10 @@ namespace consensus */ bool push_control_message(const std::string &control_msg) { - return ctx.contract_ctx.args.control_messages.try_enqueue(control_msg); + std::scoped_lock lock(ctx.contract_ctx_mutex); + if (ctx.contract_ctx) + return ctx.contract_ctx->args.control_messages.try_enqueue(control_msg); + return false; } /** @@ -721,16 +728,23 @@ namespace consensus dispatch_user_outputs(cons_prop, new_lcl_seq_no, new_lcl); // Execute the contract + if (!ctx.is_shutting_down) { - sc::contract_execution_args &args = ctx.contract_ctx.args; + { + std::scoped_lock lock(ctx.contract_ctx_mutex); + ctx.contract_ctx.emplace(); + } + + sc::contract_execution_args &args = ctx.contract_ctx->args; + args.state_dir = conf::ctx.state_rw_dir; + args.readonly = false; args.time = cons_prop.time; args.lcl = new_lcl; // Populate user bufs. feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop); - // TODO: Do something usefull with HP<-->SC channel. - if (sc::execute_contract(ctx.contract_ctx) == -1) + if (sc::execute_contract(ctx.contract_ctx.value()) == -1) { LOG_ERROR << "Contract execution failed."; return -1; @@ -741,7 +755,10 @@ namespace consensus extract_user_outputs_from_contract_bufmap(args.userbufs); - sc::clear_args(args); + { + std::scoped_lock lock(ctx.contract_ctx_mutex); + ctx.contract_ctx.reset(); + } } return 0; diff --git a/src/consensus.hpp b/src/consensus.hpp index ec7d876c..ae039165 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -68,7 +68,8 @@ namespace consensus uint16_t stage_time = 0; // Time allocated to a consensus stage. uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. - sc::execution_context contract_ctx; + std::optional contract_ctx; + std::mutex contract_ctx_mutex; bool is_shutting_down = false; std::thread consensus_thread; diff --git a/src/sc.cpp b/src/sc.cpp index c8469ad5..8e453cd0 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -62,7 +62,7 @@ namespace sc if (start_hpfs_session(ctx) == -1) return -1; - // Setup io sockets and feed all inputs to them. + // Setup user io sockets and feed all inputs to them. create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); if (!ctx.args.readonly) @@ -87,38 +87,12 @@ namespace sc // Close all fds unused by HP process. close_unused_fds(ctx, true); - // Start the contract output collection thread. - ctx.contract_io_thread = std::thread(handle_contract_io, std::ref(ctx)); + // Start the contract monitor thread. + ctx.contract_monitor_thread = std::thread(contract_monitor_loop, std::ref(ctx)); - // Write the inputs into the contract process. - if (feed_inputs(ctx) == -1) - { - util::kill_process(pid, true); - ctx.contract_pid = 0; - ctx.args.contract_terminated = true; - goto failure; - } - - // Wait for child process (contract process) to complete execution. - const int presult = await_process_execution(ctx.contract_pid); - ctx.contract_pid = 0; - ctx.args.contract_terminated = true; - LOG_DEBUG << "Contract process ended." << (ctx.args.readonly ? " (rdonly)" : ""); - - // There could be 2 reasons for the contract to end; the contract voluntary finished execution or - // it was killed due to Hot Pocket shutting down. - - // Wait for the i/o thread to gracefully stop if this is voluntary contract termination. - // 'ctx.should_stop' indicates Hot Pocket is shutting down. If that's the case ouput collection thread - // is joined by the deinit logic. - if (!ctx.should_stop && ctx.contract_io_thread.joinable()) - ctx.contract_io_thread.join(); - - if (presult != 0) - { - LOG_ERROR << "Contract process exited with non-normal status code: " << presult; - goto failure; - } + // Wait for the contract monitor thread to gracefully stop along with the contract process. + if (ctx.contract_monitor_thread.joinable()) + ctx.contract_monitor_thread.join(); } else if (pid == 0) { @@ -153,7 +127,7 @@ namespace sc chdir(ctx.args.state_dir.c_str()); - int ret = execv(execv_args[0], execv_args); + execv(execv_args[0], execv_args); std::cerr << errno << ": Contract process execv failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n"; exit(1); } @@ -171,25 +145,45 @@ namespace sc if (stop_hpfs_session(ctx) == -1) ret = -1; - // Cleaning the user fdmap after executing the contract. - cleanup_fdmap(ctx.userfds); return ret; } /** - * Blocks the calling thread until the specified process completed exeution (if running). - * @return 0 if process exited normally or exit code of process if abnormally exited. + * Checks whether the contract process has exited. + * @param ctx Contract execution context. + * @param block Whether to block and wait until the contract has exited. + * @return 0 if child has not exited. 1 if exited successfully. -1 if exited abnormally */ - int await_process_execution(pid_t pid) + int check_contract_exited(execution_context &ctx, const bool block) { - if (pid > 0) + int scstatus = 0; + const int wait_res = waitpid(ctx.contract_pid, &scstatus, block ? 0 : WNOHANG); + + if (wait_res == 0) // Child still running. { - int scstatus = 0; - waitpid(pid, &scstatus, 0); - if (!WIFEXITED(scstatus)) - return WEXITSTATUS(scstatus); + return 0; + } + if (wait_res == -1) + { + LOG_ERROR << errno << ": Contract process waitpid error. pid:" << ctx.contract_pid; + ctx.contract_pid = 0; + return -1; + } + else // Child has exited + { + ctx.contract_pid = 0; + + if (WIFEXITED(scstatus)) + { + LOG_DEBUG << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended normally."; + return 1; + } + else + { + LOG_ERROR << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended with code " << WEXITSTATUS(scstatus); + return -1; + } } - return 0; } /** @@ -324,56 +318,81 @@ namespace sc return 0; } - int feed_inputs(execution_context &ctx) - { - // Write any user inputs to user sockets. - if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1) - { - LOG_ERROR << "Failed to write user inputs to contract."; - return -1; - } - - return 0; - } - /** - * Collect contract outputs and feed npl messages while contract is running. + * Feeds and collect contract messages. * @param ctx Contract execution context. - * @return Returns -1 if the operation fails otherwise 0. */ - int handle_contract_io(execution_context &ctx) + void contract_monitor_loop(execution_context &ctx) { util::mask_signal(); - while (true) + // Write any user inputs to the contract. + if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1) { - if (ctx.should_stop) - break; - - const int hpsc_res = read_contract_hp_outputs(ctx); - const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); - const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs, ctx.args.contract_terminated); - - if (!ctx.args.contract_terminated) + LOG_ERROR << "Failed to write user inputs to contract."; + } + else + { + while (!ctx.is_shutting_down) { - if (!ctx.args.readonly) - write_npl_messages(ctx); - write_contract_hp_inputs(ctx); - } + // Atempt to read messages from contract (regardless of contract terminated or not). + const int hpsc_read_res = read_contract_hp_outputs(ctx); + const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); + const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); - // If no bytes were read after contract finished execution, exit the read loop. - if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0 && ctx.args.contract_terminated) - { - break; - } - else if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0) - { - util::sleep(20); + if (ctx.termination_signaled || ctx.contract_pid == 0) + { + // If no bytes were read after contract finished execution, exit the loop. + // Otherwise keep running the loop becaue there might be further messages to read. + if ((hpsc_read_res + npl_read_res + user_read_res) == 0) + break; + } + else + { + // We assume contract is still running. Attempt to write any queued messages to the contract. + + const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx); + if (npl_write_res == -1) + break; + + const int hpsc_write_res = write_contract_hp_inputs(ctx); + if (hpsc_write_res == -1) + break; + + // If no operation was performed during this iteration, wait for a small delay until the next iteration. + // This means there were no queued messages from either side. + if ((hpsc_read_res + npl_read_res + user_read_res + hpsc_write_res + hpsc_write_res) == 0) + util::sleep(20); + } + + // Check if contract process has exited on its own during the loop. + if (ctx.contract_pid > 0) + check_contract_exited(ctx, false); } } - LOG_DEBUG << "Contract outputs collected."; - return 0; + // Close all fds. + cleanup_vectorfds(ctx.hpscfds); + cleanup_vectorfds(ctx.nplfds); + for (auto &[pubkey, fds] : ctx.userfds) + cleanup_vectorfds(fds); + ctx.userfds.clear(); + + // If we reach this point but the contract is still running, then we need to kill the contract by force. + // This can be the case if HP is shutting down, or there was an error in initial feeding of inputs. + if (ctx.contract_pid > 0) + { + // Check if the contract has exited voluntarily. + if (check_contract_exited(ctx, false) == 0) + { + // Issue kill signal if the contract hasn't indicated the termination control message. + if (!ctx.termination_signaled) + kill(ctx.contract_pid, SIGTERM); + check_contract_exited(ctx, true); // Blocking wait until exit. + } + } + + LOG_DEBUG << "Contract monitor stopped"; } /** @@ -398,7 +417,7 @@ namespace sc /** * Write npl messages to the contract. * @param ctx Contract execution context. - * @return Returns -1 when fails otherwise 0. + * @return Returns -1 when fails. 0 if no messages were written. 1 if some messages were written. */ int write_npl_messages(execution_context &ctx) { @@ -420,10 +439,19 @@ namespace sc { // Writing the public key to the contract's fd (Skip first byte for key type prefix). if (write(writefd, npl_msg.pubkey.data() + 1, npl_msg.pubkey.size() - 1) == -1) + { + LOG_ERROR << errno << ": Error writing npl message pubkey."; return -1; + } + // Writing the message to the contract's fd. if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1) + { + LOG_ERROR << errno << ": Error writing npl message data."; return -1; + } + + return 1; } else { @@ -438,41 +466,37 @@ namespace sc * Read all HP output messages produced by the contract process and store them in * the buffer for later processing. * - * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. + * @return 0 if no bytes were read. 1 if bytes were read.. */ int read_contract_hp_outputs(execution_context &ctx) { std::string output; - const int hpsc_res = read_iosocket(false, ctx.hpscfds, output, ctx.args.contract_terminated); + const int hpsc_res = read_iosocket(false, ctx.hpscfds, output); if (hpsc_res == -1) { LOG_ERROR << "Error reading HP output from the contract."; - return -1; } else if (hpsc_res > 0) { - // ctx.args.hpscbufs.outputs.push_back(output); - - handle_control_msgs(ctx.args, output); + handle_control_msgs(ctx, output); } - return (hpsc_res == 0) ? 0 : 1; + return (hpsc_res > 0) ? 1 : 0; } /** * Read all NPL output messages produced by the contract process and broadcast them. * @param ctx contract execution context. - * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. + * @return 0 if no bytes were read. 1 if bytes were read. */ int read_contract_npl_outputs(execution_context &ctx) { std::string output; - const int npl_res = read_iosocket(false, ctx.nplfds, output, ctx.args.contract_terminated); + const int npl_res = read_iosocket(false, ctx.nplfds, output); if (npl_res == -1) { LOG_ERROR << "Error reading NPL output from the contract."; - return -1; } else if (npl_res > 0) { @@ -480,7 +504,7 @@ namespace sc broadcast_npl_output(output); } - return (npl_res == 0) ? 0 : 1; + return (npl_res > 0) ? 1 : 0; } /** @@ -543,15 +567,6 @@ namespace sc return 0; } - /** - * Common function to create the sockets and write buffer inputs to the fdmap. - * We take mutable parameters since the internal entries in the maps will be - * modified (eg. fd close, buffer clear). - * - * @param fdmap A map which has public key and a vector as fd list for that public key. - * @param bufmap A map which has a public key and input/output buffer lists for that public key. - * @return 0 on success. -1 on failure. - */ int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) { // Loop through input buffers for each pubkey. @@ -570,10 +585,9 @@ namespace sc * * @param fdmap A map which has public key and a vector as fd list for that public key. * @param bufmap A map which has a public key and input/output buffer pair for that public key. - * @param contract_terminated Indicates whether the contract termination signal recieved. - * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. + * @return 0 if no bytes were read. 1 if bytes were read. */ - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated) + int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) { bool bytes_read = false; for (auto &[pubkey, bufs] : bufmap) @@ -583,9 +597,13 @@ namespace sc std::vector &fds = fdmap[pubkey]; // This returns the total bytes read from the socket. - const int total_bytes_read = read_iosocket(true, fds, output, contract_terminated); + const int total_bytes_read = read_iosocket(true, fds, output); - if (total_bytes_read > 0) + if (total_bytes_read == -1) + { + LOG_ERROR << "Error reading user outputs from contract."; + } + else if (total_bytes_read > 0) { // Current reading position of the received buffer chunk. int pos = 0; @@ -641,15 +659,6 @@ namespace sc return bytes_read ? 1 : 0; } - /** - * Common function to close any open fds in the map after an error. - * @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds) - */ - void cleanup_fdmap(contract_fdmap_t &fdmap) - { - fdmap.clear(); - } - /** * Common function to create a socket (Hp->SC, SC->HP). * @param fds Vector to populate fd list. @@ -741,7 +750,6 @@ namespace sc if (write(writefd, input.data(), input.length()) == -1) { LOG_ERROR << errno << ": Error writing to sequece packet socket."; - //cleanup_vectorfds(fds); return -1; } @@ -753,10 +761,9 @@ namespace sc * @param is_stream_socket Indicates whether socket is steam socket or not * @param fds Vector representing the socket fd list. * @param output The buffer to place the read output. - * @param contract_terminated Indicates whether the contract termination signal recieved. * @return -1 on error. Otherwise no. of bytes read. */ - int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output, const bool contract_terminated) + int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output) { // Read any available data that have been written by the contract process // from the output socket and store in the output buffer. @@ -791,6 +798,7 @@ namespace sc else { res = -1; + LOG_ERROR << errno << ": Error reading from contract socket."; } } } @@ -799,11 +807,6 @@ namespace sc res = -1; } - if (res == -1 || (res == 0 && contract_terminated)) - { - cleanup_vectorfds(fds); - } - return res; } @@ -869,42 +872,21 @@ namespace sc fds.clear(); } - void clear_args(contract_execution_args &args) - { - args.userbufs.clear(); - // args.hpscbufs.inputs.clear(); - // args.hpscbufs.outputs.clear(); - // Empty npl message queue. - while (args.npl_messages.pop()) - { - } - args.time = 0; - args.lcl.clear(); - args.post_execution_state_hash = hpfs::h32_empty; - args.contract_terminated = false; - } - /** - * Cleanup any running processes for the specified execution context. + * Force cleanup any running processes for the specified execution context. */ void stop(execution_context &ctx) { - ctx.should_stop = true; - - if (ctx.contract_pid > 0) - util::kill_process(ctx.contract_pid, true); - - if (ctx.contract_io_thread.joinable()) - ctx.contract_io_thread.join(); + ctx.is_shutting_down = true; } - void handle_control_msgs(contract_execution_args &args, std::string &output) + void handle_control_msgs(execution_context &ctx, std::string &msg) { - if (output == "Terminated") + if (msg == "Terminated") { - args.contract_terminated = true; + ctx.termination_signaled = true; } - output.clear(); + msg.clear(); } } // namespace sc diff --git a/src/sc.hpp b/src/sc.hpp index 2c48da2d..29bc8fcc 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -31,7 +31,6 @@ namespace sc { uint32_t message_len = 0; std::string message; - }; /** * Represents list of inputs to the contract and the accumulated contract output for those inputs. @@ -74,10 +73,6 @@ namespace sc // Contol messages to be passed into contract. moodycamel::ReaderWriterQueue control_messages; - - // Pair of HP<->SC JSON message buffers (mainly used for control messages). - // Input buffers for HP->SC messages, Output buffers for SC->HP messages. - // contract_iobuf_pair hpscbufs; // Current HotPocket consensus time. int64_t time = 0; @@ -87,9 +82,6 @@ namespace sc // State hash after execution will be copied to this (not applicable to read only mode). hpfs::h32 post_execution_state_hash = hpfs::h32_empty; - - // Indicates that the contract has sent termination control message or it has exited. - bool contract_terminated = false; }; /** @@ -116,10 +108,13 @@ namespace sc pid_t hpfs_pid = 0; // Thread to collect contract inputs and outputs and feed npl messages while contract is running. - std::thread contract_io_thread; + std::thread contract_monitor_thread; + + // Indicates that the contract has sent termination control message. + bool termination_signaled = false; // Indicates that the deinit procedure has begun. - bool should_stop = false; + bool is_shutting_down = false; }; int init(); @@ -130,7 +125,7 @@ namespace sc //------Internal-use functions for this namespace. - int await_process_execution(pid_t pid); + int check_contract_exited(execution_context &ctx, const bool block); int start_hpfs_session(execution_context &ctx); @@ -138,9 +133,7 @@ namespace sc int write_contract_args(const execution_context &ctx); - int feed_inputs(execution_context &ctx); - - int handle_contract_io(execution_context &ctx); + void contract_monitor_loop(execution_context &ctx); int write_contract_hp_inputs(execution_context &ctx); @@ -160,9 +153,7 @@ namespace sc int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated); - - void cleanup_fdmap(contract_fdmap_t &fdmap); + int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); int create_iosockets(std::vector &fds, const int socket_type); @@ -170,7 +161,7 @@ namespace sc int write_iosocket_stream(std::vector &fds, std::list &inputs); - int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output, const bool contract_terminated); + int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output); void close_unused_fds(execution_context &ctx, const bool is_hp); @@ -178,11 +169,9 @@ namespace sc void cleanup_vectorfds(std::vector &fds); - void clear_args(contract_execution_args &args); - void stop(execution_context &ctx); - void handle_control_msgs(contract_execution_args &args, std::string &output); + void handle_control_msgs(execution_context &ctx, std::string &msg); } // namespace sc