From 79b55258defb0650473fdc4f141c0c8fa3fffe82 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Fri, 27 Nov 2020 15:57:36 +0530 Subject: [PATCH] Poll-based contract output reading. (#172) --- examples/c_contract/hotpocket_contract.h | 2 +- examples/nodejs_contract/hp-contract-lib.js | 2 +- src/sc.cpp | 266 ++++++++++---------- src/sc.hpp | 61 +++-- 4 files changed, 161 insertions(+), 170 deletions(-) diff --git a/examples/c_contract/hotpocket_contract.h b/examples/c_contract/hotpocket_contract.h index ead3278a..e412bdb5 100644 --- a/examples/c_contract/hotpocket_contract.h +++ b/examples/c_contract/hotpocket_contract.h @@ -478,7 +478,7 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj } } } - else if (strcmp(k->string, "hpfd") == 0) + else if (strcmp(k->string, "controlfd") == 0) { __HP_ASSIGN_INT(gctx.control_fd, elem); } diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 7f3cc48b..4c600874 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -22,7 +22,7 @@ class HotPocketContract { const argsJson = fs.readFileSync(0, 'utf8'); const hpargs = JSON.parse(argsJson); - this.#controlChannel = new ControlChannel(hpargs.hpfd); + this.#controlChannel = new ControlChannel(hpargs.controlfd); this.#executeContract(hpargs, contractFunc); } diff --git a/src/sc.cpp b/src/sc.cpp index 302593e7..a850754b 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -12,7 +12,7 @@ namespace sc { - const uint32_t MAX_SEQ_PACKET_SIZE = 128 * 1024; + const uint32_t READ_BUFFER_SIZE = 128 * 1024; // This has to be minimum 128KB to support sequence packets. bool init_success = false; // We maintain two hpfs global processes for merging and rw sessions. @@ -65,17 +65,19 @@ namespace sc if (start_hpfs_session(ctx) == -1) return -1; - create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); // User output socket. - create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); // Control socket. - if (!ctx.args.readonly) - create_iosockets(ctx.nplfds, SOCK_SEQPACKET); // NPL socket. - - // Clone the user inputs fd to be passed on to the contract. - const int user_inputs_fd = dup(ctx.args.user_input_store.fd); - - int ret = 0; + // Create the IO sockets for users, control channel and npl. + // (Note: User socket will only be used for contract output only. For feeding user inputs we are using a memfd.) + if (create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs) == -1 || + create_iosockets(ctx.controlfds, SOCK_SEQPACKET) == -1 || + (!ctx.args.readonly && create_iosockets(ctx.nplfds, SOCK_SEQPACKET) == -1)) + { + cleanup_fds(ctx); + stop_hpfs_session(ctx); + return -1; + } LOG_DEBUG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : ""); + int ret = 0; const pid_t pid = fork(); if (pid > 0) @@ -85,7 +87,6 @@ namespace sc // Close all fds unused by HP process. close_unused_fds(ctx, true); - close(user_inputs_fd); // Start the contract monitor thread. ctx.contract_monitor_thread = std::thread(contract_monitor_loop, std::ref(ctx)); @@ -104,10 +105,11 @@ namespace sc // Close all fds unused by SC process. close_unused_fds(ctx, false); - // Reset the seek position for the contract's copy of user inputs fd. - lseek(user_inputs_fd, 0, SEEK_SET); + // Clone the user inputs fd to be passed on to the contract. + const int user_inputs_fd = dup(ctx.args.user_input_store.fd); + lseek(user_inputs_fd, 0, SEEK_SET); // Reset seek position. - // Write the contract input message from HotPocket to the stdin (0) of the contract process. + // Write the contract execution args from HotPocket to the stdin (0) of the contract process. write_contract_args(ctx, user_inputs_fd); const bool using_appbill = !ctx.args.readonly && !conf::cfg.appbill.empty(); @@ -137,14 +139,10 @@ namespace sc else { LOG_ERROR << errno << ": fork() failed when starting contract process." << (ctx.args.readonly ? " (rdonly)" : ""); - goto failure; + ret = -1; } - goto success; - failure: - ret = -1; - - success: + cleanup_fds(ctx); if (stop_hpfs_session(ctx) == -1) ret = -1; @@ -245,7 +243,7 @@ namespace sc * "ts": , * "readonly": , * "lcl": "", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb) - * "hpfd": fd, + * "controlfd": fd, * "nplfd":fd, * "userinfd":fd, // User inputs fd. * "users":{ "":[outfd, [msg1_off, msg1_len], ...], ... }, @@ -267,10 +265,10 @@ namespace sc if (!ctx.args.readonly) { os << ",\"lcl\":\"" << ctx.args.lcl - << "\",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE]; + << "\",\"nplfd\":" << ctx.nplfds.scfd; } - os << ",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE]; + os << ",\"controlfd\":" << ctx.controlfds.scfd; os << ",\"userinfd\":" << user_inputs_fd << ",\"users\":{"; @@ -315,18 +313,43 @@ namespace sc { util::mask_signal(); + // Prepare output poll fd list. + // User out fds + control fd + NPL fd (NPL fd not available in readonly mode) + const size_t out_fd_count = ctx.userfds.size() + (ctx.args.readonly ? 1 : 2); + const size_t control_fd_idx = ctx.userfds.size(); + const size_t npl_fd_idx = control_fd_idx + 1; + struct pollfd out_fds[out_fd_count]; + + auto user_itr = ctx.userfds.begin(); + for (int i = 0; i < out_fd_count; i++) + { + const int fd = (user_itr != ctx.userfds.end()) ? (user_itr++)->second.hpfd + : (i == control_fd_idx ? ctx.controlfds.hpfd : ctx.nplfds.hpfd); + out_fds[i] = {fd, POLLIN, 0}; + } + while (!ctx.is_shutting_down) { + // Reset the revents because we are reusing same pollfd list. + for (int i = 0; i < out_fd_count; i++) + out_fds[i].revents = 0; + + if (poll(out_fds, out_fd_count, 20) == -1) + { + LOG_ERROR << errno << ": Poll error in contract outputs."; + break; + } + // Atempt to read messages from contract (regardless of contract terminated or not). - const int hpsc_read_res = read_control_outputs(ctx); - const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); - const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); + const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]); + const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, out_fds[npl_fd_idx]); + const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, out_fds, ctx.args.userbufs); if (ctx.termination_signaled || ctx.contract_pid == 0) { // If no bytes were read after contract finished execution, exit the loop. // Otherwise keep running the loop becaue there might be further messages to read. - if ((hpsc_read_res + npl_read_res + user_read_res) == 0) + if ((control_read_res + npl_read_res + user_read_res) == 0) break; } else @@ -340,11 +363,6 @@ namespace sc const int control_write_res = write_control_inputs(ctx); if (control_write_res == -1) break; - - // If no operation was performed during this iteration, wait for a small delay until the next iteration. - // This means there were no queued messages from either side. - if ((hpsc_read_res + npl_read_res + user_read_res + control_write_res + control_write_res) == 0) - util::sleep(20); } // Check if contract process has exited on its own during the loop. @@ -353,11 +371,7 @@ namespace sc } // Close all fds. - cleanup_vectorfds(ctx.hpscfds); - cleanup_vectorfds(ctx.nplfds); - for (auto &[pubkey, fds] : ctx.userfds) - cleanup_vectorfds(fds); - ctx.userfds.clear(); + cleanup_fds(ctx); // Purge any inputs we passed to the contract. for (const auto &[pubkey, bufs] : ctx.args.userbufs) @@ -390,7 +404,7 @@ namespace sc if (ctx.args.control_messages.try_dequeue(control_msg)) { - if (write_iosocket_seq_packet(ctx.hpscfds, control_msg) == -1) + if (write_iosocket_seq_packet(ctx.controlfds, control_msg) == -1) { LOG_ERROR << "Error writing HP inputs to SC"; return -1; @@ -411,7 +425,7 @@ namespace sc * npl inputs are feed into the contract as sequence packets. It first sends the pubkey and then * the data. */ - const int writefd = ctx.nplfds[SOCKETFDTYPE::HPREADWRITE]; + const int writefd = ctx.nplfds.hpfd; if (writefd == -1) return 0; @@ -460,10 +474,10 @@ namespace sc * * @return 0 if no bytes were read. 1 if bytes were read.. */ - int read_control_outputs(execution_context &ctx) + int read_control_outputs(execution_context &ctx, const pollfd pfd) { std::string output; - const int res = read_iosocket(false, ctx.hpscfds, output); + const int res = read_iosocket(false, pfd, output); if (res == -1) { LOG_ERROR << "Error reading control message from the contract."; @@ -481,10 +495,10 @@ namespace sc * @param ctx contract execution context. * @return 0 if no bytes were read. 1 if bytes were read. */ - int read_contract_npl_outputs(execution_context &ctx) + int read_npl_outputs(execution_context &ctx, const pollfd pfd) { std::string output; - const int res = read_iosocket(false, ctx.nplfds, output); + const int res = read_iosocket(false, pfd, output); if (res == -1) { @@ -532,7 +546,7 @@ namespace sc // Write hex pubkey as key and output fd as first element of array. os << "\"" << pubkeyhex << "\":[" - << itr->second[SOCKETFDTYPE::SCREADWRITE]; + << itr->second.scfd; // Write input offsets into the same array. for (auto inp_itr = user_inputs.begin(); inp_itr != user_inputs.end(); inp_itr++) @@ -544,7 +558,7 @@ namespace sc /** * Creates io sockets for all pubkeys specified in bufmap. - * @param fdmap A map which has public key and a vector as fd list for that public key. + * @param fdmap A map which has public key and fd pair for that public key. * @param bufmap A map which has a public key and input/output buffer lists for that public key. * @return 0 on success. -1 on failure. */ @@ -552,7 +566,7 @@ namespace sc { for (auto &[pubkey, buflist] : bufmap) { - std::vector fds = std::vector(); + fd_pair fds = {}; if (create_iosockets(fds, SOCK_STREAM) == -1) return -1; @@ -566,21 +580,23 @@ namespace sc * Common function to read all outputs produced by the contract process and store them in * output buffers for later processing. * - * @param fdmap A map which has public key and a vector as fd list for that public key. + * @param fdmap A map which has public key and fd pair for that public key. + * @param pfds Poll fd set for users (must be in same order as user fdmap). * @param bufmap A map which has a public key and input/output buffer pair for that public key. * @return 0 if no bytes were read. 1 if bytes were read. */ - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) + int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, const pollfd *pfds, contract_bufmap_t &bufmap) { bool bytes_read = false; + int i = 0; for (auto &[pubkey, bufs] : bufmap) { // Get fds for the pubkey. std::string output; - std::vector &fds = fdmap[pubkey]; + fd_pair &fds = fdmap[pubkey]; // This returns the total bytes read from the socket. - const int total_bytes_read = read_iosocket(true, fds, output); + const int total_bytes_read = read_iosocket(true, pfds[i++], output); if (total_bytes_read == -1) { @@ -644,11 +660,11 @@ namespace sc /** * Common function to create a socket (Hp->SC, SC->HP). - * @param fds Vector to populate fd list. + * @param fds fd pair to populate. * @param socket_type Type of the socket. (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET) * @return Returns -1 if socket creation fails otherwise 0. */ - int create_iosockets(std::vector &fds, const int socket_type) + int create_iosockets(fd_pair &fds, const int socket_type) { int socket[2] = {-1, -1}; // Create the socket of given type. @@ -658,23 +674,22 @@ namespace sc return -1; } - // If socket got created, assign them to the fd vector. - fds.clear(); - fds.push_back(socket[0]); //SCREADWRITE - fds.push_back(socket[1]); //HPREADWRITE + // If socket got created, assign them to the fd pair. + fds.scfd = socket[0]; + fds.hpfd = socket[1]; return 0; } /** * Common function to write the given input into the write fd from the HP side socket. - * @param fds Vector of fd list. + * @param fds fd pair. * @param input Input to write into the HP write fd. */ - int write_iosocket_seq_packet(std::vector &fds, std::string_view input) + int write_iosocket_seq_packet(fd_pair &fds, std::string_view input) { // Write the inputs (if any) into the contract. - const int writefd = fds[SOCKETFDTYPE::HPREADWRITE]; + const int writefd = fds.hpfd; if (writefd == -1) return 0; @@ -689,119 +704,98 @@ namespace sc /** * Common function to read buffered output from the socket and populate the output. - * @param is_stream_socket Indicates whether socket is steam socket or not - * @param fds Vector representing the socket fd list. + * @param is_stream_socket Indicates whether socket is steam socket or not. + * @param pfd The pollfd struct containing poll status. * @param output The buffer to place the read output. * @return -1 on error. Otherwise no. of bytes read. */ - int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output) + int read_iosocket(const bool is_stream_socket, const pollfd pfd, std::string &output) { // Read any available data that have been written by the contract process // from the output socket and store in the output buffer. - // Outputs will be read by the consensus process later when it wishes so. - - const int readfd = fds[SOCKETFDTYPE::HPREADWRITE]; - int res = 0; - - if (readfd == -1) - return 0; - - // Available bytes returns the total number of bytes to read of multiple messages. - size_t available_bytes = 0; - if (ioctl(readfd, FIONREAD, &available_bytes) != -1) + if (pfd.revents & POLLIN) { - if (available_bytes == 0) - { - res = 0; - } - else - { - const size_t bytes_to_read = is_stream_socket ? available_bytes : MIN(MAX_SEQ_PACKET_SIZE, available_bytes); - output.resize(bytes_to_read); - const int read_res = read(readfd, output.data(), bytes_to_read); - output.resize(read_res); + output.resize(READ_BUFFER_SIZE); + const int res = read(pfd.fd, output.data(), READ_BUFFER_SIZE); + output.resize(res); // Resize back to the actual bytes read. - if (read_res >= 0) - { - res = read_res; - if (is_stream_socket) - output.resize(read_res); - } - else - { - res = -1; - LOG_ERROR << errno << ": Error reading from contract socket."; - } - } - } - else - { - res = -1; - } + if (res == -1) + LOG_ERROR << errno << ": Error reading from contract socket. stream:" << is_stream_socket; - return res; + return res; + } + return 0; } void close_unused_fds(execution_context &ctx, const bool is_hp) { if (!ctx.args.readonly) { - close_unused_socket_vectorfds(is_hp, ctx.nplfds); + close_unused_socket_fds(is_hp, ctx.nplfds); } - close_unused_socket_vectorfds(is_hp, ctx.hpscfds); + close_unused_socket_fds(is_hp, ctx.controlfds); // Loop through user fds. for (auto &[pubkey, fds] : ctx.userfds) - close_unused_socket_vectorfds(is_hp, fds); + close_unused_socket_fds(is_hp, fds); } /** * Common function for closing unused fds based on which process this gets called from. * This also marks active fds with O_CLOEXEC for close-on-exec behaviour. * @param is_hp Specify 'true' when calling from HP process. 'false' from SC process. - * @param fds Vector of fds to close. + * @param fds fd pair to close. */ - void close_unused_socket_vectorfds(const bool is_hp, std::vector &fds) + void close_unused_socket_fds(const bool is_hp, fd_pair &fds) { - for (int fd_type = 0; fd_type <= 1; fd_type++) + if (is_hp) { - const int fd = fds[fd_type]; - if (fd != -1) + if (fds.scfd != -1) { - if ((is_hp && fd_type == SOCKETFDTYPE::SCREADWRITE) || - (!is_hp && fd_type == SOCKETFDTYPE::HPREADWRITE)) - { - close(fd); - fds[fd_type] = -1; - } - else if (is_hp && (fd_type == SOCKETFDTYPE::HPREADWRITE)) - { - // The fd must be kept open in HP process. But we must - // mark it to close on exec in a potential forked process. - int flags = fcntl(fd, F_GETFD, NULL); - flags |= FD_CLOEXEC; - fcntl(fd, F_SETFD, flags); - } + close(fds.scfd); + fds.scfd = -1; + } + + // The hp fd must be kept open in HP process. But we must + // mark it to close on exec in a potential forked process. + if (fds.hpfd != -1) + { + int flags = fcntl(fds.hpfd, F_GETFD, NULL); + flags |= FD_CLOEXEC; + fcntl(fds.hpfd, F_SETFD, flags); + } + } + else + { + if (fds.hpfd != -1) + { + close(fds.hpfd); + fds.hpfd = -1; } } } - /** - * Closes all fds in a vector fd set. - */ - void cleanup_vectorfds(std::vector &fds) + void cleanup_fds(execution_context &ctx) { - for (int i = 0; i < fds.size(); i++) - { - if (fds[i] != -1) - { - close(fds[i]); - fds[i] = -1; - } - } + cleanup_fd_pair(ctx.controlfds); + cleanup_fd_pair(ctx.nplfds); + for (auto &[pubkey, fds] : ctx.userfds) + cleanup_fd_pair(fds); + ctx.userfds.clear(); + } - fds.clear(); + /** + * Closes fds in a fd pair. + */ + void cleanup_fd_pair(fd_pair &fds) + { + if (fds.hpfd != -1) + close(fds.hpfd); + if (fds.scfd != -1) + close(fds.scfd); + fds.hpfd = -1; + fds.scfd = -1; } /** diff --git a/src/sc.hpp b/src/sc.hpp index 314b55fa..75a633a7 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -16,15 +16,10 @@ namespace sc constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 64; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 64; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... - // Enum used to differenciate socket fds maintained for SC socket. - enum SOCKETFDTYPE + struct fd_pair { - // Used by Smart Contract to read input sent by Hot Pocket. - // Used by Smart Contract to write output back to Hot Pocket. - SCREADWRITE = 0, - // Used by Hot Pocket to write input to the smart contract. - // Used by Hot Pocket to read output from the smart contract. - HPREADWRITE = 1 + int hpfd = -1; + int scfd = -1; }; /** @@ -35,10 +30,10 @@ namespace sc uint32_t message_len = 0; std::string message; }; - /** - * Represents list of inputs to the contract and the accumulated contract output for those inputs. - */ + /** + * Represents list of inputs to the contract and the accumulated contract output for those inputs. + */ struct contract_iobufs { // List of inputs to be fed into the contract. @@ -48,17 +43,17 @@ namespace sc std::list outputs; }; - // Common typedef for a map of pubkey->fdlist. - // This is used to keep track of fdlist quadruplet with a public key (eg. user, npl). - typedef std::unordered_map> contract_fdmap_t; + // Common typedef for a map of pubkey->fdpair. + // This is used to keep track of fdpair with a public key (eg. user). + typedef std::map contract_fdmap_t; // Common typedef for a map of pubkey->I/O list pair (input list and output list). - // This is used to keep track of input/output buffers for a given public key (eg. user, npl) - typedef std::unordered_map contract_bufmap_t; + // This is used to keep track of input/output buffers for a given public key (eg. user) + typedef std::map contract_bufmap_t; /** - * Holds information that should be passed into the contract process. - */ + * Holds information that should be passed into the contract process. + */ struct contract_execution_args { // Whether the contract should execute in read only mode (to serve read requests). @@ -97,8 +92,8 @@ namespace sc }; /** - * Holds context information relating to contract execution environment. - */ + * Holds context information relating to contract execution environment. + */ struct execution_context { // The arguments that was used to initiate this execution. @@ -107,11 +102,11 @@ namespace sc // Map of user socket fds (map key: user public key) contract_fdmap_t userfds; - // Socket fds for NPL <--> messages. - std::vector nplfds; + // Socket fds for NPL messages. + fd_pair nplfds; - // Socket fds for HP <--> messages. - std::vector hpscfds; + // Socket fds for control messages. + fd_pair controlfds; // Holds the contract process id (if currently executing). pid_t contract_pid = 0; @@ -155,9 +150,9 @@ namespace sc int write_npl_messages(execution_context &ctx); - int read_control_outputs(execution_context &ctx); + int read_control_outputs(execution_context &ctx, const pollfd pfd); - int read_contract_npl_outputs(execution_context &ctx); + int read_npl_outputs(execution_context &ctx, const pollfd pfd); void broadcast_npl_output(std::string_view output); @@ -167,19 +162,21 @@ namespace sc int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); + int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, const pollfd *pfds, contract_bufmap_t &bufmap); - int create_iosockets(std::vector &fds, const int socket_type); + int create_iosockets(fd_pair &fds, const int socket_type); - int write_iosocket_seq_packet(std::vector &fds, std::string_view input); + int write_iosocket_seq_packet(fd_pair &fds, std::string_view input); - int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output); + int read_iosocket(const bool is_stream_socket, const pollfd pfd, std::string &output); void close_unused_fds(execution_context &ctx, const bool is_hp); - void close_unused_socket_vectorfds(const bool is_hp, std::vector &fds); + void close_unused_socket_fds(const bool is_hp, fd_pair &fds); - void cleanup_vectorfds(std::vector &fds); + void cleanup_fds(execution_context &ctx); + + void cleanup_fd_pair(fd_pair &fds); void stop(execution_context &ctx);