From 5f40aebf0866019b77ac767efce27f4eb7626fb6 Mon Sep 17 00:00:00 2001 From: Chalith Desaman Date: Wed, 14 Oct 2020 15:18:00 +0530 Subject: [PATCH] NPL message refactor. (#132) * Implemented feeding and broadcasting npl messages from the contract execution in real-time. * Replaced npl pipe with domain sockets. * Refactored npl read and write in nodejs echo contract --- examples/nodejs_contract/echo_contract.js | 47 +++- examples/nodejs_contract/hp-contract-lib.js | 91 +++----- src/conf.cpp | 1 + src/consensus.cpp | 43 +--- src/consensus.hpp | 7 +- src/p2p/p2p.hpp | 4 - src/p2p/peer_session_handler.cpp | 9 +- src/sc.cpp | 247 ++++++++++++++------ src/sc.hpp | 36 ++- 9 files changed, 298 insertions(+), 187 deletions(-) diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index c1b3897b..d8a27a4f 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -9,19 +9,44 @@ const hpc = new HotPocketContract(); if (!hpc.readonly) fs.appendFileSync("exects.txt", "ts:" + hpc.timestamp + "\n"); -Object.keys(hpc.users).forEach(function (key) { +Object.keys(hpc.users).forEach(async (key) => { const user = hpc.users[key]; - user.readInput().then(inputBuf => { - if (inputBuf) { - const userInput = inputBuf.toString("utf8"); - - if (userInput == "ts") - user.sendOutput(fs.readFileSync("exects.txt")); - else - user.sendOutput("Echoing: " + userInput); - } - }) + const inputBuf = await user.readInput(); + if (inputBuf) { + const userInput = inputBuf.toString("utf8"); + if (userInput == "ts") + user.sendOutput(fs.readFileSync("exects.txt")); + else + user.sendOutput("Echoing: " + userInput); + } }); +const npl = hpc.npl; + +// Npl channel always connected if contract is not in readonly mode. +// Smart contract developer has to mannually close the channel once the execution logic is complete. +if (npl) { + npl.closeNplChannel(); +} + +// Npl message sending and receiving template. +// if (npl) { +// let i = 0; +// let interval = setInterval(() => { +// npl.sendOutput(`npl${i} from contract`); +// if (i == 5) { +// clearInterval(interval); +// npl.closeNplChannel(); +// } +// i++; +// }, 500); + +// npl.events.on("message", msg => { +// if (msg) { +// console.log(msg); +// } +// }); +// } + //console.log("===Echo contract ended==="); diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 46ad6df2..62e1b8ab 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -1,4 +1,7 @@ const fs = require('fs'); +const events = require('events'); + +MAX_NPL_BUF_SIZE = 128*1024; function HotPocketContract() { const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); @@ -12,7 +15,7 @@ function HotPocketContract() { hash: lclParts[1] }; - this.npl = new HotPocketNplChannel(hpargs.nplfd[0], hpargs.nplfd[1]); + this.npl = new HotPocketNplChannel(hpargs.nplfd); } this.users = {}; @@ -65,66 +68,46 @@ function HotPocketChannel(infd, outfd) { } } -function HotPocketNplChannel(infd, outfd) { +function HotPocketNplChannel(fd) { - const parseNplInputs = function (buf) { - - // Input may consist of multiple messages. - // Each message has the format: - // | NPL version (1 byte) | reserve (1 byte) | msg length (2 bytes BE) | peer pubkey (32 bytes) | msg | - - const inputs = []; // Peer inputs will be populated to this. - - let pos = 0; - while (pos < buf.byteLength) { - - pos += 2; // Skip version and reserve. - - // Read message len. - const msgLenBuf = readBytes(buf, pos, 2); - if (!msgLenBuf) break; - const msgLen = msgLenBuf.readUInt16BE(); - - pos += 2; - const pubKeyBuf = readBytes(buf, pos, 32); - if (!pubKeyBuf) break; - - pos += 32; - - const msgBuf = readBytes(buf, pos, msgLen) - if (!msgBuf) break; - - inputs.push({ - pubkey: pubKeyBuf.toString("hex"), - input: msgBuf - }); - - pos += msgLen; - } - - return inputs; - } - - const readBytes = function (buf, pos, count) { - if (pos + count > buf.byteLength) - return null; - return buf.slice(pos, pos + count); - } - - this.readInput = function () { - return new Promise((resolve) => { - if (infd == -1) { - resolve(null); + this.events = new events.EventEmitter(); + let socket = null; + let isPubKeyReceived = false; + let pubKey; + if (fd > 0) { + // From the hotpocket when sending the npl messages first it sends the pubkey of the particular node + // and then the message, First data buffer is taken as pubkey and the second one as message, + // then npl message object is constructed and the event is emmited. + socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_NPL_BUF_SIZE}); + socket.on("data", d => { + if (!isPubKeyReceived) { + pubKey = d.toString('hex'); + isPubKeyReceived = true; } else { - const s = fs.createReadStream(null, { fd: infd }); - drainStream(s).then(buf => resolve(parseNplInputs(buf))); + this.events.emit("message", { + pubkey: pubKey, + input: d + }); + pubKey = null; + isPubKeyReceived = false; } }); + socket.on("error", (e) => { + this.events.emit("error", e); + }); } - this.sendOutput = function (output) { - fs.writeFileSync(outfd, output); + this.sendOutput = (output) => { + if (fd > 0) { + fs.writeSync(fd, output); + } + } + + this.closeNplChannel = () => { + if (fd > 0) { + socket.destroy(); + } } } diff --git a/src/conf.cpp b/src/conf.cpp index acb794f7..6dbe30d4 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -78,6 +78,7 @@ namespace conf util::create_dir_tree_recursive(ctx.config_dir); util::create_dir_tree_recursive(ctx.hist_dir); util::create_dir_tree_recursive(ctx.state_rw_dir); + util::create_dir_tree_recursive(ctx.log_dir); //Create config file with default settings. diff --git a/src/consensus.cpp b/src/consensus.cpp index c133f413..446afd5b 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -144,25 +144,6 @@ namespace consensus } } - // Throughout consensus, we move over the incoming npl messages collected via the network so far into - // the candidate npl message set (move and append). This is to have a private working set for the consensus - // and avoid threading conflicts with network incoming npl messages. - { - std::scoped_lock lock(p2p::ctx.collected_msgs.npl_messages_mutex); - ctx.candidate_npl_messages.splice(ctx.candidate_npl_messages.end(), p2p::ctx.collected_msgs.npl_messages); - } - - // Only the npl messages with a valid lcl will be passed down to the contract. - // lcl should match the previous round's lcl. - auto itr = ctx.candidate_npl_messages.begin(); - while (itr != ctx.candidate_npl_messages.end()) - { - if (itr->lcl == lcl) - ++itr; - else - ctx.candidate_npl_messages.erase(itr++); - } - LOG_DEBUG << "Started stage " << std::to_string(ctx.stage); if (ctx.stage == 0) // Stage 0 means begining of a consensus round. @@ -355,6 +336,16 @@ namespace consensus << " users:" << nup.user_inputs.size(); } + /** + * Equeue npl messages to the npl messages queue. + * @param npl_msg Constructed npl message. + * @return Returns true if enqueue is success otherwise false. + */ + bool push_npl_message(p2p::npl_message &npl_msg) + { + return ctx.contract_ctx.args.npl_messages.try_enqueue(npl_msg); + } + /** * Verifies the user signatures and populate non-expired user inputs from collected * non-unl proposals (if any) into consensus candidate data. @@ -777,9 +768,6 @@ namespace consensus args.time = cons_prop.time; args.lcl = lcl; - // Feed NPL messages. - args.npl_messages.splice(args.npl_messages.end(), ctx.candidate_npl_messages); - // Populate user bufs. feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop); // TODO: Do something usefull with HP<-->SC channel. @@ -792,7 +780,6 @@ namespace consensus ctx.state = args.post_execution_state_hash; extract_user_outputs_from_contract_bufmap(args.userbufs); - broadcast_npl_output(args.npl_output, lcl); sc::clear_args(args); } @@ -911,16 +898,6 @@ namespace consensus } } - void broadcast_npl_output(std::string &output, std::string_view lcl) - { - if (!output.empty()) - { - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_npl_output(fbuf, output, lcl); - p2p::broadcast_message(fbuf, true); - } - } - /** * Increment voting table counter. * @param counter The counter map in which a vote should be incremented. diff --git a/src/consensus.hpp b/src/consensus.hpp index a2c23247..27deda92 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -50,9 +50,6 @@ namespace consensus // todo: having a queue of proposals against peer pubkey. std::unordered_map candidate_proposals; - // The set of npl messages that are being collected as consensus stages are progressing. - std::list candidate_npl_messages; - // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. std::unordered_set candidate_users; @@ -113,6 +110,8 @@ namespace consensus void broadcast_nonunl_proposal(); + bool push_npl_message(p2p::npl_message &npl_message); + void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); bool verify_appbill_check(std::string_view pubkey, const size_t input_len); @@ -143,8 +142,6 @@ namespace consensus void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); - void broadcast_npl_output(std::string &output, std::string_view lcl); - template void increment(std::map &counter, const T &candidate); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 93103754..4ed07075 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -106,10 +106,6 @@ namespace p2p std::list nonunl_proposals; std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. - // List of NPL messages collected from peers. - std::list npl_messages; - std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions. - // List of pairs indicating the session pubkey hex and the state requests. std::list> state_requests; std::mutex state_requests_mutex; // Mutex for state requests access race conditions. diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index d28b7006..93710f51 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -1,5 +1,6 @@ #include "../pchheader.hpp" #include "../conf.hpp" +#include "../consensus.hpp" #include "../crypto.hpp" #include "../util.hpp" #include "../hplog.hpp" @@ -133,14 +134,16 @@ namespace p2p return 0; } - std::scoped_lock lock(ctx.collected_msgs.npl_messages_mutex); // Insert npl message with lock. - const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message(); npl_message msg; msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data()); msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey()); msg.lcl = msg::fbuf::flatbuff_bytes_to_sv(container->lcl()); - ctx.collected_msgs.npl_messages.push_back(std::move(msg)); + + if (!consensus::push_npl_message(msg)) + { + LOG_DEBUG << "NPL message enqueue failure. " << session.uniqueid.substr(0, 10); + } } else if (content_message_type == p2pmsg::Message_State_Request_Message) { diff --git a/src/sc.cpp b/src/sc.cpp index 8efad6b4..cee50f11 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -1,11 +1,16 @@ #include "pchheader.hpp" #include "conf.hpp" +#include "consensus.hpp" #include "hplog.hpp" +#include "ledger.hpp" #include "sc.hpp" #include "hpfs/hpfs.hpp" +#include "msg/fbuf/p2pmsg_helpers.hpp" namespace sc { + const int MAX_NPL_BUF_SIZE = 128 * 1024; + /** * Executes the contract process and passes the specified context arguments. * @return 0 on successful process creation. -1 on failure or contract process is already running. @@ -21,7 +26,7 @@ namespace sc if (!ctx.args.readonly) { - create_iopipes(ctx.nplfds, !ctx.args.npl_messages.empty()); + create_iosockets(ctx.nplfds); create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty()); } @@ -39,7 +44,7 @@ namespace sc close_unused_fds(ctx, true); // Start the contract output collection thread. - ctx.output_fetcher_thread = std::thread(fetch_outputs, std::ref(ctx)); + ctx.contract_io_thread = std::thread(handle_contract_io, std::ref(ctx)); // Write the inputs into the contract process. if (feed_inputs(ctx) == -1) @@ -57,11 +62,11 @@ namespace sc // There could be 2 reasons for the contract to end; the contract voluntary finished execution or // it was killed due to Hot Pocket shutting down. - // Wait for the output collection thread to gracefully stop if this is voluntary contract termination. + // Wait for the i/o thread to gracefully stop if this is voluntary contract termination. // 'ctx.should_stop' indicates Hot Pocket is shutting down. If that's the case ouput collection thread // is joined by the deinit logic. - if (!ctx.should_stop && ctx.output_fetcher_thread.joinable()) - ctx.output_fetcher_thread.join(); + if (!ctx.should_stop && ctx.contract_io_thread.joinable()) + ctx.contract_io_thread.join(); if (presult != 0) { @@ -208,7 +213,7 @@ namespace sc { os << ",\"lcl\":\"" << ctx.args.lcl << "\",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE] - << "],\"nplfd\":[" << ctx.nplfds[FDTYPE::SCREAD] << "," << ctx.nplfds[FDTYPE::SCWRITE] << "]"; + << "],\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE]; } os << ",\"usrfd\":{"; @@ -267,10 +272,6 @@ namespace sc if (!ctx.args.readonly && write_contract_hp_inputs(ctx) == -1) return -1; - // Write any NPL messages to contract. - if (!ctx.args.readonly && write_npl_messages(ctx) == -1) - return -1; - // Write any verified (consensus-reached) user inputs to user pipes. if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1) { @@ -281,7 +282,12 @@ namespace sc return 0; } - int fetch_outputs(execution_context &ctx) + /** + * Collect contract outputs and feed npl messages while contract is running. + * @param ctx Contract execution context. + * @return Returns -1 if the operation fails otherwise 0. + */ + int handle_contract_io(execution_context &ctx) { util::mask_signal(); @@ -290,8 +296,16 @@ namespace sc if (ctx.should_stop) break; - const int hpsc_npl_res = ctx.args.readonly ? 0 : read_contract_hp_npl_outputs(ctx); - if (hpsc_npl_res == -1) + const int hpsc_res = ctx.args.readonly ? 0 : read_contract_hp_outputs(ctx); + if (hpsc_res == -1) + return -1; + + const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); + if (npl_read_res == -1) + return -1; + + const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx); + if (npl_write_res == -1) return -1; const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); @@ -302,7 +316,7 @@ namespace sc } // If no bytes were read after contract finished execution, exit the read loop. - if (hpsc_npl_res == 0 && user_res == 0 && ctx.contract_pid == 0) + if (hpsc_res == 0 && npl_read_res == 0 && user_res == 0 && ctx.contract_pid == 0) break; util::sleep(20); @@ -328,64 +342,41 @@ namespace sc /** * Write npl messages to the contract. + * @param ctx Contract execution context. + * @return Returns -1 when fails otherwise 0. */ int write_npl_messages(execution_context &ctx) { /** - * npl inputs are feed into the contract in a binary protocol. It follows the following pattern - * |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (32 bytes)**|**Npl message data**| - * Length of the message is calculated without including public key length + * npl inputs are feed into the contract as sequence packets. It first sends the pubkey and then + * the data. */ - const int writefd = ctx.nplfds[FDTYPE::HPWRITE]; + const int writefd = ctx.nplfds[SOCKETFDTYPE::HPREADWRITE]; + if (writefd == -1) return 0; - bool write_error = false; - if (!ctx.args.npl_messages.empty()) + // Dequeue the next npl message from the queue. + // Check the lcl against the latest lcl. + p2p::npl_message npl_msg; + if (ctx.args.npl_messages.try_dequeue(npl_msg)) { - const size_t total_memsegs = ctx.args.npl_messages.size() * 3; - iovec memsegs[total_memsegs]; - size_t i = 0; - for (const auto &npl_msg : ctx.args.npl_messages) + if (npl_msg.lcl == ledger::ctx.get_lcl()) { - const uint8_t pre_header_index = i * 3; - const uint8_t pubkey_index = pre_header_index + 1; - const uint8_t msg_index = pre_header_index + 2; - - const uint16_t msg_len = npl_msg.data.size(); - - // Header is |version(1byte)|reserve(1byte)|msg length(2bytes big endian)| - uint8_t header[4]; - header[0] = util::MIN_NPL_INPUT_VERSION; - - // Store msg length in big endian. - header[2] = msg_len << 8; - header[3] = msg_len; - - memsegs[pre_header_index].iov_base = header; - memsegs[pre_header_index].iov_len = sizeof(header); - - // Pubkey without the key type prefix. - memsegs[pubkey_index].iov_base = reinterpret_cast(const_cast(npl_msg.pubkey.data() + 1)); - memsegs[pubkey_index].iov_len = npl_msg.pubkey.size() - 1; - - memsegs[msg_index].iov_base = reinterpret_cast(const_cast(npl_msg.data.data())); - memsegs[msg_index].iov_len = msg_len; - - i++; + // Writing the public key to the contract's fd. + if (write(writefd, npl_msg.pubkey.data(), npl_msg.pubkey.size()) == -1) + return -1; + // Writing the message to the contract's fd. + if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1) + return -1; + } + else + { + LOG_DEBUG << "NPL message dropped due to lcl mismatch."; } - - if (writev(writefd, memsegs, total_memsegs) == -1) - write_error = true; - - ctx.args.npl_messages.clear(); } - // Close the writefd since we no longer need it. - close(writefd); - ctx.nplfds[FDTYPE::HPWRITE] = -1; - - return write_error ? -1 : 0; + return 0; } /** @@ -394,7 +385,7 @@ namespace sc * * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. */ - int read_contract_hp_npl_outputs(execution_context &ctx) + int read_contract_hp_outputs(execution_context &ctx) { const int hpsc_res = read_iopipe(ctx.hpscfds, ctx.args.hpscbufs.output); if (hpsc_res == -1) @@ -403,19 +394,50 @@ namespace sc return -1; } - const int npl_res = read_iopipe(ctx.nplfds, ctx.args.npl_output); + return (hpsc_res == 0) ? 0 : 1; + } + + /** + * Read all NPL output messages produced by the contract process and broadcast them. + * @param ctx contract execution context. + * @return 0 if no bytes were read. 1 if bytes were read. -1 on failure. + */ + int read_contract_npl_outputs(execution_context &ctx) + { + std::string output; + const int npl_res = read_iosocket(ctx.nplfds, output); + if (npl_res == -1) { LOG_ERROR << "Error reading NPL output from the contract."; return -1; } + else if (npl_res > 0) + { + // Broadcast npl messages once contract npl output is collected. + broadcast_npl_output(output); + } - return (hpsc_res == 0 && npl_res == 0) ? 0 : 1; + return (npl_res == 0) ? 0 : 1; + } + + /** + * Broadcast npl messages to peers. + * @param output Npl message to be broadcasted. + */ + void broadcast_npl_output(std::string_view output) + { + if (!output.empty()) + { + flatbuffers::FlatBufferBuilder fbuf(1024); + msg::fbuf::p2pmsg::create_msg_from_npl_output(fbuf, output, ledger::ctx.get_lcl()); + p2p::broadcast_message(fbuf, true); + } } /** * Common helper function to write json output of fdmap to given ostream. - * @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds) + * @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds) * @param os An output stream. */ void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os) @@ -553,6 +575,28 @@ namespace sc return 0; } + /** + * Common function to create a socket (Hp->SC, SC->HP). + * @param fds Vector to populate fd list. + * @return Returns -1 if socket creation fails otherwise 0. + */ + int create_iosockets(std::vector &fds) + { + int socket[2] = {-1, -1}; + // Create a sequence packet socket. + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socket) == -1) + { + return -1; + } + + // If socket got created, assign them to the fd vector. + fds.clear(); + fds.push_back(socket[0]); //SCREADWRITE + fds.push_back(socket[1]); //HPREADWRITE + + return 0; + } + /** * Common function to write the given input buffer into the write fd from the HP side. * @param fds Vector of fd list. @@ -636,12 +680,46 @@ namespace sc return -1; } + /** + * Common function to read buffered output from the socket and populate the output. + * @param fds Vector representing the socket fd list. + * @param output The buffer to place the read output. + * @return -1 on error. Otherwise no. of bytes read. + */ + int read_iosocket(std::vector &fds, std::string &output) + { + // Read any available data that have been written by the contract process + // from the output socket and store in the output buffer. + // Outputs will be read by the consensus process later when it wishes so. + + const int readfd = fds[SOCKETFDTYPE::HPREADWRITE]; + + if (readfd == -1) + return 0; + + // Available bytes returns the total number of bytes to read of multiple messages. + size_t available_bytes = 0; + if (ioctl(readfd, FIONREAD, &available_bytes) != -1) + { + if (available_bytes == 0) + return 0; + + output.resize(MAX_NPL_BUF_SIZE); + const int res = read(readfd, output.data(), MAX_NPL_BUF_SIZE); + output.resize(res); + + return res; + } + + return -1; + } + void close_unused_fds(execution_context &ctx, const bool is_hp) { if (!ctx.args.readonly) { close_unused_vectorfds(is_hp, ctx.hpscfds); - close_unused_vectorfds(is_hp, ctx.nplfds); + close_unused_socket_vectorfds(is_hp, ctx.nplfds); } // Loop through user fds. @@ -680,6 +758,37 @@ namespace sc } } + /** + * Common function for closing unused fds based on which process this gets called from. + * This also marks active fds with O_CLOEXEC for close-on-exec behaviour. + * @param is_hp Specify 'true' when calling from HP process. 'false' from SC process. + * @param fds Vector of fds to close. + */ + void close_unused_socket_vectorfds(const bool is_hp, std::vector &fds) + { + for (int fd_type = 0; fd_type <= 1; fd_type++) + { + const int fd = fds[fd_type]; + if (fd != -1) + { + if ((is_hp && fd_type == SOCKETFDTYPE::SCREADWRITE) || + (!is_hp && fd_type == SOCKETFDTYPE::HPREADWRITE)) + { + close(fd); + fds[fd_type] = -1; + } + else if (is_hp && (fd_type == SOCKETFDTYPE::HPREADWRITE)) + { + // The fd must be kept open in HP process. But we must + // mark it to close on exec in a potential forked process. + int flags = fcntl(fd, F_GETFD, NULL); + flags |= FD_CLOEXEC; + fcntl(fd, F_SETFD, flags); + } + } + } + } + /** * Closes all fds in a vector fd set. */ @@ -702,8 +811,10 @@ namespace sc args.userbufs.clear(); args.hpscbufs.inputs.clear(); args.hpscbufs.output.clear(); - args.npl_messages.clear(); - args.npl_output.clear(); + // Empty npl message queue. + while (args.npl_messages.pop()) + { + } args.time = 0; args.lcl.clear(); args.post_execution_state_hash = hpfs::h32_empty; @@ -719,8 +830,8 @@ namespace sc if (ctx.contract_pid > 0) util::kill_process(ctx.contract_pid, true); - if (ctx.output_fetcher_thread.joinable()) - ctx.output_fetcher_thread.join(); + if (ctx.contract_io_thread.joinable()) + ctx.contract_io_thread.join(); } } // namespace sc diff --git a/src/sc.hpp b/src/sc.hpp index c3f8a2e4..a4178830 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -16,7 +16,7 @@ namespace sc // Enum used to differenciate pipe fds maintained for SC I/O pipes. enum FDTYPE { - // Used by Smart Contract to read input sent by Hot Pocket + // Used by Smart Contract to read input sent by Hot Pocket. SCREAD = 0, // Used by Hot Pocket to write input to the smart contract. HPWRITE = 1, @@ -26,6 +26,17 @@ namespace sc SCWRITE = 3 }; + // Enum used to differenciate socket fds maintained for SC socket. + enum SOCKETFDTYPE + { + // Used by Smart Contract to read input sent by Hot Pocket. + // Used by Smart Contract to write output back to Hot Pocket. + SCREADWRITE = 0, + // Used by Hot Pocket to write input to the smart contract. + // Used by Hot Pocket to read output from the smart contract. + HPREADWRITE = 1 + }; + /** * Represents list of inputs to the contract and the accumulated contract output for those inputs. */ @@ -64,11 +75,8 @@ namespace sc contract_bufmap_t userbufs; // NPL messages to be passed into contract. - std::list npl_messages; + moodycamel::ReaderWriterQueue npl_messages; - // Output NPL buffer. - std::string npl_output; - // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffers for HP->SC messages, Output buffers for SC->HP messages. contract_iobuf_pair hpscbufs; @@ -106,8 +114,8 @@ namespace sc // Holds the hpfs rw process id (if currently executing). pid_t hpfs_pid = 0; - // Thread to collect contract outputs while contract is running. - std::thread output_fetcher_thread; + // Thread to collect contract inputs and outputs and feed npl messages while contract is running. + std::thread contract_io_thread; // Indicates that the deinit procedure has begun. bool should_stop = false; @@ -127,13 +135,17 @@ namespace sc int feed_inputs(execution_context &ctx); - int fetch_outputs(execution_context &ctx); + int handle_contract_io(execution_context &ctx); int write_contract_hp_inputs(execution_context &ctx); int write_npl_messages(execution_context &ctx); - int read_contract_hp_npl_outputs(execution_context &ctx); + int read_contract_hp_outputs(execution_context &ctx); + + int read_contract_npl_outputs(execution_context &ctx); + + void broadcast_npl_output(std::string_view output); // Common helper functions @@ -149,14 +161,20 @@ namespace sc int create_iopipes(std::vector &fds, const bool create_inpipe); + int create_iosockets(std::vector &fds); + int write_iopipe(std::vector &fds, std::list &inputs); int read_iopipe(std::vector &fds, std::string &output); + int read_iosocket(std::vector &fds, std::string &output); + void close_unused_fds(execution_context &ctx, const bool is_hp); void close_unused_vectorfds(const bool is_hp, std::vector &fds); + void close_unused_socket_vectorfds(const bool is_hp, std::vector &fds); + void cleanup_vectorfds(std::vector &fds); void clear_args(contract_execution_args &args);