From 8f00c5e7d42763668d2b92a7672d502999007322 Mon Sep 17 00:00:00 2001 From: priyadharsun <30920822+priyadharsun@users.noreply.github.com> Date: Tue, 10 Nov 2020 14:09:24 +0530 Subject: [PATCH] Introduced contract control channel with 'terminate' message (#147) --- examples/nodejs_contract/hp-contract-lib.js | 3 - src/consensus.cpp | 12 +- src/consensus.hpp | 2 + src/sc.cpp | 223 ++++++++------------ src/sc.hpp | 16 +- 5 files changed, 112 insertions(+), 144 deletions(-) diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index b1cac04e..82d0af2b 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -64,9 +64,6 @@ function HotPocketContract() { this.terminate = () => { this.control.sendOutput("Terminated") - // We are still using process.kill(0) temporarily to stop contract hanging. - // This will be removed after the control message is implemented. - process.kill(0); } if (!Object.keys(hpargs.usrfd).length) { diff --git a/src/consensus.cpp b/src/consensus.cpp index 5bfaef19..d8b0f0cb 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -355,7 +355,7 @@ namespace consensus } /** - * Equeue npl messages to the npl messages queue. + * Enqueue npl messages to the npl messages queue. * @param npl_msg Constructed npl message. * @return Returns true if enqueue is success otherwise false. */ @@ -364,6 +364,16 @@ namespace consensus return ctx.contract_ctx.args.npl_messages.try_enqueue(npl_msg); } + /** + * Enqueue conrol messages to the control messages queue. + * @param control_msg Constructed control message. + * @return Returns true if enqueue is success otherwise false. + */ + bool push_control_message(const std::string &control_msg) + { + return ctx.contract_ctx.args.control_messages.try_enqueue(control_msg); + } + /** * Verifies the user signatures and populate non-expired user inputs from collected * non-unl proposals (if any) into consensus candidate data. diff --git a/src/consensus.hpp b/src/consensus.hpp index 027276c5..ec7d876c 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -135,6 +135,8 @@ namespace consensus int get_initial_state_hash(hpfs::h32 &hash); + bool push_control_message(const std::string &control_msg); + } // namespace consensus #endif diff --git a/src/sc.cpp b/src/sc.cpp index 546f9054..d66d55aa 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -67,11 +67,13 @@ namespace sc if (!ctx.args.readonly) { - // create sequential packet sockets for npl and hp messages. + // Create sequential packet sockets for npl messages. create_iosockets(ctx.nplfds, SOCK_SEQPACKET); - create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); } + // Create sequential packet sockets for hp messages. + create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); + int ret = 0; LOG_DEBUG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : ""); @@ -93,12 +95,14 @@ namespace sc { util::kill_process(pid, true); ctx.contract_pid = 0; + ctx.args.contract_terminated = true; goto failure; } // Wait for child process (contract process) to complete execution. const int presult = await_process_execution(ctx.contract_pid); ctx.contract_pid = 0; + ctx.args.contract_terminated = true; LOG_DEBUG << "Contract process ended." << (ctx.args.readonly ? " (rdonly)" : ""); // There could be 2 reasons for the contract to end; the contract voluntary finished execution or @@ -167,13 +171,8 @@ namespace sc if (stop_hpfs_session(ctx) == -1) ret = -1; + // Cleaning the user fdmap after executing the contract. cleanup_fdmap(ctx.userfds); - if (!ctx.args.readonly) - { - cleanup_vectorfds(ctx.hpscfds); - cleanup_vectorfds(ctx.nplfds); - } - return ret; } @@ -270,10 +269,10 @@ namespace sc if (!ctx.args.readonly) { os << ",\"lcl\":\"" << ctx.args.lcl - << "\",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE] - << ",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE]; + << "\",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE]; } + os << ",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE]; os << ",\"usrfd\":{"; fdmap_json_to_stream(ctx.userfds, os); @@ -327,13 +326,6 @@ namespace sc int feed_inputs(execution_context &ctx) { - // Write any input messages to hp->sc socket. - if (!ctx.args.readonly && write_contract_hp_inputs(ctx) == -1) - { - LOG_ERROR << "Error when writing contract hp inputs."; - return -1; - } - // Write any user inputs to user sockets. if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1) { @@ -358,30 +350,26 @@ namespace sc if (ctx.should_stop) break; - const int hpsc_res = ctx.args.readonly ? 0 : read_contract_hp_outputs(ctx); - if (hpsc_res == -1) - return -1; - + const int hpsc_res = read_contract_hp_outputs(ctx); const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); - if (npl_read_res == -1) - return -1; + const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs, ctx.args.contract_terminated); - const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx); - if (npl_write_res == -1) - return -1; - - const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); - if (user_res == -1) + if (!ctx.args.contract_terminated) { - LOG_ERROR << "Error reading user outputs from the contract."; - return -1; + if (!ctx.args.readonly) + write_npl_messages(ctx); + write_contract_hp_inputs(ctx); } // If no bytes were read after contract finished execution, exit the read loop. - if (hpsc_res == 0 && npl_read_res == 0 && user_res == 0 && ctx.contract_pid == 0) + if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0 && ctx.args.contract_terminated) + { break; - - util::sleep(20); + } + else if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0) + { + util::sleep(20); + } } LOG_DEBUG << "Contract outputs collected."; @@ -393,11 +381,16 @@ namespace sc */ int write_contract_hp_inputs(execution_context &ctx) { - // if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1) - // { - // LOG_ERROR << "Error writing HP inputs to SC"; - // return -1; - // } + std::string control_msg; + + if (ctx.args.control_messages.try_dequeue(control_msg)) + { + if (write_iosocket_seq_packet(ctx.hpscfds, control_msg) == -1) + { + LOG_ERROR << "Error writing HP inputs to SC"; + return -1; + } + } return 0; } @@ -423,7 +416,7 @@ namespace sc p2p::npl_message npl_msg; if (ctx.args.npl_messages.try_dequeue(npl_msg)) { - if (npl_msg.lcl == ledger::ctx.get_lcl()) + if (npl_msg.lcl == ctx.args.lcl) { // Writing the public key to the contract's fd. if (write(writefd, npl_msg.pubkey.data(), npl_msg.pubkey.size()) == -1) @@ -450,7 +443,7 @@ namespace sc int read_contract_hp_outputs(execution_context &ctx) { std::string output; - const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, output); + const int hpsc_res = read_iosocket(false, ctx.hpscfds, output, ctx.args.contract_terminated); if (hpsc_res == -1) { LOG_ERROR << "Error reading HP output from the contract."; @@ -459,6 +452,8 @@ namespace sc else if (hpsc_res > 0) { // ctx.args.hpscbufs.outputs.push_back(output); + + handle_control_msgs(ctx.args, output); } return (hpsc_res == 0) ? 0 : 1; @@ -472,7 +467,7 @@ namespace sc int read_contract_npl_outputs(execution_context &ctx) { std::string output; - const int npl_res = read_iosocket_seq_packet(ctx.nplfds, output); + const int npl_res = read_iosocket(false, ctx.nplfds, output, ctx.args.contract_terminated); if (npl_res == -1) { @@ -575,9 +570,10 @@ namespace sc * * @param fdmap A map which has public key and a vector as fd list for that public key. * @param bufmap A map which has a public key and input/output buffer pair for that public key. + * @param contract_terminated Indicates whether the contract termination signal recieved. * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. */ - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) + int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated) { bool bytes_read = false; for (auto &[pubkey, bufs] : bufmap) @@ -587,7 +583,7 @@ namespace sc std::vector &fds = fdmap[pubkey]; // This returns the total bytes read from the socket. - const int total_bytes_read = read_iosocket_stream(fds, output); + const int total_bytes_read = read_iosocket(true, fds, output, contract_terminated); if (total_bytes_read > 0) { @@ -640,11 +636,6 @@ namespace sc bytes_read = true; } - - if (total_bytes_read == -1) - { - return -1; - } } return bytes_read ? 1 : 0; @@ -656,9 +647,6 @@ namespace sc */ void cleanup_fdmap(contract_fdmap_t &fdmap) { - for (auto &[pubkey, fds] : fdmap) - cleanup_vectorfds(fds); - fdmap.clear(); } @@ -690,7 +678,6 @@ namespace sc * Common function to write the given input buffer into the write fd from the HP side socket. * @param fds Vector of fd list. * @param inputs Buffer to write into the HP write fd. - * @param close_if_empty Close the socket after writing if this is true. */ int write_iosocket_stream(std::vector &fds, std::list &inputs) { @@ -740,52 +727,43 @@ namespace sc } /** - * Common function to write the given input buffer into the write fd from the HP side socket. + * Common function to write the given input into the write fd from the HP side socket. * @param fds Vector of fd list. - * @param inputs Buffer to write into the HP write fd. - * @param close_if_empty Close the socket after writing if this is true. + * @param input Input to write into the HP write fd. */ - int write_iosocket_seq_packet(std::vector &fds, std::list &inputs, const bool close_if_empty) + int write_iosocket_seq_packet(std::vector &fds, std::string_view input) { // Write the inputs (if any) into the contract. const int writefd = fds[SOCKETFDTYPE::HPREADWRITE]; if (writefd == -1) return 0; - bool write_error = false; - - if (!inputs.empty()) + if (write(writefd, input.data(), input.length()) == -1) { - for (std::string &input : inputs) - { - if (write(writefd, input.data(), input.length()) == -1) - write_error = true; - } - } - else if (close_if_empty) - { - close(writefd); - fds[SOCKETFDTYPE::HPREADWRITE] = -1; - } - if (write_error) LOG_ERROR << errno << ": Error writing to sequece packet socket."; + //cleanup_vectorfds(fds); + return -1; + } - return write_error ? -1 : 0; + return 0; } /** - * Common function to read buffered output from the sequence packet socket and populate the output. + * Common function to read buffered output from the socket and populate the output. + * @param is_stream_socket Indicates whether socket is steam socket or not * @param fds Vector representing the socket fd list. * @param output The buffer to place the read output. + * @param contract_terminated Indicates whether the contract termination signal recieved. * @return -1 on error. Otherwise no. of bytes read. */ - int read_iosocket_seq_packet(std::vector &fds, std::string &output) + int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output, const bool contract_terminated) { // Read any available data that have been written by the contract process // from the output socket and store in the output buffer. // Outputs will be read by the consensus process later when it wishes so. const int readfd = fds[SOCKETFDTYPE::HPREADWRITE]; + int res = 0; if (readfd == -1) return 0; @@ -795,84 +773,49 @@ namespace sc if (ioctl(readfd, FIONREAD, &available_bytes) != -1) { if (available_bytes == 0) - return 0; - - output.resize(MIN(MAX_SEQ_PACKET_SIZE, available_bytes)); - const int res = read(readfd, output.data(), MAX_SEQ_PACKET_SIZE); - output.resize(res); - - if (res >= 0) { - if (res == 0) // EOF + res = 0; + } + else + { + const size_t bytes_to_read = is_stream_socket ? available_bytes : MIN(MAX_SEQ_PACKET_SIZE, available_bytes); + output.resize(bytes_to_read); + const int read_res = read(readfd, output.data(), bytes_to_read); + + if (read_res >= 0) { - close(readfd); - fds[SOCKETFDTYPE::HPREADWRITE] = -1; + res = read_res; + if (is_stream_socket) + output.resize(read_res); + } + else + { + res = -1; } - return res; } } - - close(readfd); - fds[SOCKETFDTYPE::HPREADWRITE] = -1; - LOG_ERROR << errno << ": Error reading sequence packet socket."; - - return -1; - } - - /** - * Common function to read buffered output from the stream socket and populate the output list. - * @param fds Vector representing the sockets fd list. - * @param output The buffer to place the read output. - * @return -1 on error. Otherwise no. of bytes read. - */ - int read_iosocket_stream(std::vector &fds, std::string &output) - { - // Read any available data that have been written by the contract process - // from the output socket and store in the output buffer. - // Outputs will be read by the consensus process later when it wishes so. - - const int readfd = fds[SOCKETFDTYPE::HPREADWRITE]; - if (readfd == -1) - return 0; - - bool read_error = false; - size_t available_bytes = 0; - if (ioctl(readfd, FIONREAD, &available_bytes) != -1) + else { - if (available_bytes == 0) - { - return 0; - } - - output.resize(available_bytes); - const int res = read(readfd, output.data(), available_bytes); - - if (res >= 0) - { - if (res == 0) // EOF - { - close(readfd); - fds[SOCKETFDTYPE::HPREADWRITE] = -1; - } - return res; - } + res = -1; } - close(readfd); - fds[SOCKETFDTYPE::HPREADWRITE] = -1; - LOG_ERROR << errno << ": Error reading stream socket."; + if (res == -1 || (res == 0 && contract_terminated)) + { + cleanup_vectorfds(fds); + } - return -1; + return res; } void close_unused_fds(execution_context &ctx, const bool is_hp) { if (!ctx.args.readonly) { - close_unused_socket_vectorfds(is_hp, ctx.hpscfds); close_unused_socket_vectorfds(is_hp, ctx.nplfds); } + close_unused_socket_vectorfds(is_hp, ctx.hpscfds); + // Loop through user fds. for (auto &[pubkey, fds] : ctx.userfds) close_unused_socket_vectorfds(is_hp, fds); @@ -938,6 +881,7 @@ namespace sc args.time = 0; args.lcl.clear(); args.post_execution_state_hash = hpfs::h32_empty; + args.contract_terminated = false; } /** @@ -954,4 +898,13 @@ namespace sc ctx.contract_io_thread.join(); } + void handle_control_msgs(contract_execution_args &args, std::string &output) + { + if (output == "Terminated") + { + args.contract_terminated = true; + } + output.clear(); + } + } // namespace sc diff --git a/src/sc.hpp b/src/sc.hpp index 724b5699..2c48da2d 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -71,6 +71,9 @@ namespace sc // NPL messages to be passed into contract. moodycamel::ReaderWriterQueue npl_messages; + + // Contol messages to be passed into contract. + moodycamel::ReaderWriterQueue control_messages; // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffers for HP->SC messages, Output buffers for SC->HP messages. @@ -84,6 +87,9 @@ namespace sc // State hash after execution will be copied to this (not applicable to read only mode). hpfs::h32 post_execution_state_hash = hpfs::h32_empty; + + // Indicates that the contract has sent termination control message or it has exited. + bool contract_terminated = false; }; /** @@ -154,19 +160,17 @@ namespace sc int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); + int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated); void cleanup_fdmap(contract_fdmap_t &fdmap); int create_iosockets(std::vector &fds, const int socket_type); - int write_iosocket_seq_packet(std::vector &fds, std::list &inputs, const bool close_if_empty); + int write_iosocket_seq_packet(std::vector &fds, std::string_view input); int write_iosocket_stream(std::vector &fds, std::list &inputs); - int read_iosocket_seq_packet(std::vector &fds, std::string &output); - - int read_iosocket_stream(std::vector &fds, std::string &output); + int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output, const bool contract_terminated); void close_unused_fds(execution_context &ctx, const bool is_hp); @@ -178,6 +182,8 @@ namespace sc void stop(execution_context &ctx); + void handle_control_msgs(contract_execution_args &args, std::string &output); + } // namespace sc #endif \ No newline at end of file