From 7a4515865db74ddca38f024248c4d959cb4625bc Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Mon, 2 Nov 2020 16:50:21 +0530 Subject: [PATCH] Replaced contract I/O pipes with domain sockets. (#140) --- examples/nodejs_contract/echo_contract.js | 33 ++- examples/nodejs_contract/hp-contract-lib.js | 111 +++++--- src/sc.cpp | 287 ++++++++++---------- src/sc.hpp | 33 +-- 4 files changed, 254 insertions(+), 210 deletions(-) diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index d8a27a4f..674698b5 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -9,16 +9,16 @@ const hpc = new HotPocketContract(); if (!hpc.readonly) fs.appendFileSync("exects.txt", "ts:" + hpc.timestamp + "\n"); -Object.keys(hpc.users).forEach(async (key) => { - - const user = hpc.users[key]; - const inputBuf = await user.readInput(); - if (inputBuf) { - const userInput = inputBuf.toString("utf8"); - if (userInput == "ts") - user.sendOutput(fs.readFileSync("exects.txt")); - else - user.sendOutput("Echoing: " + userInput); +hpc.events.on("user_message", (pubKey, message) => { + const userInput = message.toString("utf8"); + const user = hpc.users[pubKey]; + if (userInput == "ts") { + user.sendOutput(fs.readFileSync("exects.txt")); + user.closeChannel(); + } + else { + user.sendOutput("Echoing: " + userInput); + user.closeChannel(); } }); @@ -30,6 +30,19 @@ if (npl) { npl.closeNplChannel(); } +// HP <--> SC +const hp = hpc.control; +hp.closeControlChannel(); + +// let i = 0; +// hp.events.on('message', (msg) => { +// console.log('control msg - ' + msg); +// hp.sendOutput(msg); +// i++; +// if (i == 2) +// hp.closeControlChannel(); +// }) + // Npl message sending and receiving template. // if (npl) { // let i = 0; diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 62e1b8ab..45415716 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -1,7 +1,7 @@ const fs = require('fs'); const events = require('events'); -MAX_NPL_BUF_SIZE = 128*1024; +const MAX_SEQ_PACKET_SIZE = 128 * 1024; function HotPocketContract() { const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); @@ -18,53 +18,62 @@ function HotPocketContract() { this.npl = new HotPocketNplChannel(hpargs.nplfd); } + this.control = new HotPocketControlChannel(hpargs.hpfd); + this.events = new events.EventEmitter(); + this.users = {}; Object.keys(hpargs.usrfd).forEach((userPubKey) => { - const userfds = hpargs.usrfd[userPubKey]; - this.users[userPubKey] = new HotPocketChannel(userfds[0], userfds[1]); + this.users[userPubKey] = new HotPocketChannel(hpargs.usrfd[userPubKey], userPubKey, this.events); }); } -// Helper function to asynchronously read a stream to the end and fill a buffer. -const drainStream = function (stream) { - - return new Promise((resolve) => { - +function HotPocketChannel(fd, userPubKey, events) { + let socket = null; + if (fd > 0) { + socket = fs.createReadStream(null, { fd: fd }); const dataParts = []; - - const resolveBuffer = function () { - if (dataParts.length > 0) - return resolve(Buffer.concat(dataParts)); - else - return resolve(null); - } - - stream.on("data", d => { - dataParts.push(d); - }); - stream.on('end', resolveBuffer); - stream.on("close", resolveBuffer); - stream.on("error", () => { - resolve(null); - }); - }); -} - -function HotPocketChannel(infd, outfd) { - this.readInput = function () { - return new Promise((resolve) => { - if (infd == -1) { - resolve(null); + let msgLen = -1; + let bytesRead = 0; + socket.on("data", (buf) => { + if (msgLen == -1) { + // First two bytes indicate the message len. + const msgLenBuf = readBytes(buf, 0, 4); + if (msgLenBuf) { + msgLen = msgLenBuf.readUInt32BE(); + const msgBuf = readBytes(buf, 4, buf.byteLength - 4); + dataParts.push(msgBuf) + bytesRead = msgBuf.byteLength; + } + } else { + dataParts.push(buf); + bytesRead += buf.length; } - else { - const s = fs.createReadStream(null, { fd: infd }); - drainStream(s).then(buf => resolve(buf)); + if (bytesRead == msgLen) { + msgLen == -1; + events.emit("user_message", userPubKey, Buffer.concat(dataParts)); } }); + + socket.on("error", (e) => { + events.emit("user_error", userPubKey, e); + }) + } + + // Read bytes from the given buffer. + const readBytes = function (buf, pos, count) { + if (pos + count > buf.byteLength) + return null; + return buf.slice(pos, pos + count); } this.sendOutput = function (output) { - fs.writeFileSync(outfd, output); + fs.writeSync(fd, output); + } + + this.closeChannel = function () { + if (fd > 0) { + socket.destroy(); + } } } @@ -78,7 +87,7 @@ function HotPocketNplChannel(fd) { // From the hotpocket when sending the npl messages first it sends the pubkey of the particular node // and then the message, First data buffer is taken as pubkey and the second one as message, // then npl message object is constructed and the event is emmited. - socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_NPL_BUF_SIZE}); + socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); socket.on("data", d => { if (!isPubKeyReceived) { pubKey = d.toString('hex'); @@ -111,6 +120,34 @@ function HotPocketNplChannel(fd) { } } +function HotPocketControlChannel(fd) { + + this.events = new events.EventEmitter(); + let socket = null; + if (fd > 0) { + socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); + socket.on("data", d => { + this.events.emit("message", d); + }); + + socket.on("error", (e) => { + this.events.emit("error", e); + }); + } + + this.sendOutput = (output) => { + if (fd > 0) { + fs.writeSync(fd, output); + } + } + + this.closeControlChannel = () => { + if (fd > 0) { + socket.destroy(); + } + } +} + module.exports = { HotPocketContract } \ No newline at end of file diff --git a/src/sc.cpp b/src/sc.cpp index 3c3759d6..d303ab51 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -9,7 +9,7 @@ namespace sc { - const int MAX_NPL_BUF_SIZE = 128 * 1024; + const int MAX_SEQ_PACKET_SIZE = 128 * 1024; bool init_success = false; // We maintain two hpfs global processes for merging and rw sessions. @@ -62,13 +62,14 @@ namespace sc if (start_hpfs_session(ctx) == -1) return -1; - // Setup io pipes and feed all inputs to them. - create_iopipes_for_fdmap(ctx.userfds, ctx.args.userbufs); + // Setup io sockets and feed all inputs to them. + create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); if (!ctx.args.readonly) { - create_iosockets(ctx.nplfds); - create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty()); + // create sequential packet sockets for npl and hp messages. + create_iosockets(ctx.nplfds, SOCK_SEQPACKET); + create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); } int ret = 0; @@ -269,8 +270,8 @@ namespace sc if (!ctx.args.readonly) { os << ",\"lcl\":\"" << ctx.args.lcl - << "\",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE] - << "],\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE]; + << "\",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE] + << ",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE]; } os << ",\"usrfd\":{"; @@ -326,11 +327,14 @@ namespace sc int feed_inputs(execution_context &ctx) { - // Write any input messages to hp->sc pipe. + // Write any input messages to hp->sc socket. if (!ctx.args.readonly && write_contract_hp_inputs(ctx) == -1) + { + LOG_ERROR << "Error when writing contract hp inputs."; return -1; + } - // Write any verified (consensus-reached) user inputs to user pipes. + // Write any user inputs to user sockets. if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1) { LOG_ERROR << "Failed to write user inputs to contract."; @@ -389,7 +393,7 @@ namespace sc */ int write_contract_hp_inputs(execution_context &ctx) { - if (write_iopipe(ctx.hpscfds, ctx.args.hpscbufs.inputs) == -1) + if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1) { LOG_ERROR << "Error writing HP inputs to SC"; return -1; @@ -445,7 +449,8 @@ namespace sc */ int read_contract_hp_outputs(execution_context &ctx) { - const int hpsc_res = read_iopipe(ctx.hpscfds, ctx.args.hpscbufs.output); + std::string output; + const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.output); if (hpsc_res == -1) { LOG_ERROR << "Error reading HP output from the contract."; @@ -463,7 +468,7 @@ namespace sc int read_contract_npl_outputs(execution_context &ctx) { std::string output; - const int npl_res = read_iosocket(ctx.nplfds, output); + const int npl_res = read_iosocket_seq_packet(ctx.nplfds, output); if (npl_res == -1) { @@ -514,24 +519,23 @@ namespace sc pubkey.length() - 1); // Write hex pubkey and fds. - os << "\"" << pubkeyhex << "\":[" - << itr->second[FDTYPE::SCREAD] << "," - << itr->second[FDTYPE::SCWRITE] << "]"; + os << "\"" << pubkeyhex << "\":" + << itr->second[SOCKETFDTYPE::SCREADWRITE]; } } /** - * Creates io pipes for all pubkeys specified in bufmap. + * Creates io sockets for all pubkeys specified in bufmap. * @param fdmap A map which has public key and a vector as fd list for that public key. * @param bufmap A map which has a public key and input/output buffer lists for that public key. * @return 0 on success. -1 on failure. */ - int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) + int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) { for (auto &[pubkey, buflist] : bufmap) { std::vector fds = std::vector(); - if (create_iopipes(fds, !buflist.inputs.empty()) == -1) + if (create_iosockets(fds, SOCK_STREAM) == -1) return -1; fdmap.emplace(pubkey, std::move(fds)); @@ -541,7 +545,7 @@ namespace sc } /** - * Common function to create the pipes and write buffer inputs to the fdmap. + * Common function to create the sockets and write buffer inputs to the fdmap. * We take mutable parameters since the internal entries in the maps will be * modified (eg. fd close, buffer clear). * @@ -554,7 +558,7 @@ namespace sc // Loop through input buffers for each pubkey. for (auto &[pubkey, buflist] : bufmap) { - if (write_iopipe(fdmap[pubkey], buflist.inputs) == -1) + if (write_iosocket_stream(fdmap[pubkey], buflist.inputs, true) == -1) return -1; } @@ -577,9 +581,11 @@ namespace sc // Get fds for the pubkey. std::vector &fds = fdmap[pubkey]; - const int res = read_iopipe(fds, bufpair.output); + const int res = read_iosocket_stream(fds, bufpair.output); if (res == -1) + { return -1; + } if (res > 0) bytes_read = true; @@ -600,50 +606,19 @@ namespace sc fdmap.clear(); } - /** - * Common function to create a pair of pipes (Hp->SC, SC->HP). - * @param fds Vector to populate fd list. - * @param create_inpipe Whether to create the input pipe from HP to SC. - */ - int create_iopipes(std::vector &fds, const bool create_inpipe) - { - int inpipe[2] = {-1, -1}; - if (create_inpipe && pipe(inpipe) == -1) - return -1; - - int outpipe[2] = {-1, -1}; - if (pipe(outpipe) == -1) - { - if (create_inpipe) - { - // Close the earlier created pipe. - close(inpipe[0]); - close(inpipe[1]); - } - return -1; - } - - // If both pipes got created, assign them to the fd vector. - fds.clear(); - fds.push_back(inpipe[0]); //SCREAD - fds.push_back(inpipe[1]); //HPWRITE - fds.push_back(outpipe[0]); //HPREAD - fds.push_back(outpipe[1]); //SCWRITE - - return 0; - } - /** * Common function to create a socket (Hp->SC, SC->HP). * @param fds Vector to populate fd list. + * @param socket_type Type of the socket. (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET) * @return Returns -1 if socket creation fails otherwise 0. */ - int create_iosockets(std::vector &fds) + int create_iosockets(std::vector &fds, const int socket_type) { int socket[2] = {-1, -1}; - // Create a sequence packet socket. - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socket) == -1) + // Create the socket of given type. + if (socketpair(AF_UNIX, socket_type, 0, socket) == -1) { + LOG_ERROR << errno << ": Error when creating domain socket."; return -1; } @@ -656,15 +631,16 @@ namespace sc } /** - * Common function to write the given input buffer into the write fd from the HP side. + * Common function to write the given input buffer into the write fd from the HP side socket. * @param fds Vector of fd list. * @param inputs Buffer to write into the HP write fd. + * @param close_if_empty Close the socket after writing if this is true. */ - int write_iopipe(std::vector &fds, std::list &inputs) + int write_iosocket_stream(std::vector &fds, std::list &inputs, const bool close_if_empty) { - // Write the inputs (if any) into the contract and close the writefd. + // Write the inputs (if any) into the contract. - const int writefd = fds[FDTYPE::HPWRITE]; + const int writefd = fds[SOCKETFDTYPE::HPREADWRITE]; if (writefd == -1) return 0; @@ -673,78 +649,82 @@ namespace sc if (!inputs.empty()) { // Prepare the input memory segments to write with wrtiev. - size_t i = 0; - iovec memsegs[inputs.size()]; + iovec memsegs[2]; + std::string msg_buf; for (std::string &input : inputs) { - memsegs[i].iov_base = input.data(); - memsegs[i].iov_len = input.length(); - i++; + // Concat messages into one message segment. + msg_buf += input; } + // Storing message len in big endian. + uint8_t header[4]; + header[0] = msg_buf.length() >> 24; + header[1] = msg_buf.length() >> 16; + header[2] = msg_buf.length() >> 8; + header[3] = msg_buf.length(); + memsegs[0].iov_base = header; + memsegs[0].iov_len = sizeof(header); + memsegs[1].iov_base = msg_buf.data(); + memsegs[1].iov_len = msg_buf.length(); - if (writev(writefd, memsegs, inputs.size()) == -1) + if (writev(writefd, memsegs, 2) == -1) write_error = true; inputs.clear(); } + else if (close_if_empty) + { + close(writefd); + fds[SOCKETFDTYPE::HPREADWRITE] = -1; + } - // Close the writefd since we no longer need it. - close(writefd); - fds[FDTYPE::HPWRITE] = -1; + if (write_error) + LOG_ERROR << errno << ": Error writing to stream socket."; return write_error ? -1 : 0; } /** - * Common function to read buffered output from the pipe and populate the output list. - * @param fds Vector representing the pipes fd list. - * @param output The buffer to place the read output. - * @return -1 on error. Otherwise no. of bytes read. + * Common function to write the given input buffer into the write fd from the HP side socket. + * @param fds Vector of fd list. + * @param inputs Buffer to write into the HP write fd. + * @param close_if_empty Close the socket after writing if this is true. */ - int read_iopipe(std::vector &fds, std::string &output) + int write_iosocket_seq_packet(std::vector &fds, std::list &inputs, const bool close_if_empty) { - // Read any available data that have been written by the contract process - // from the output pipe and store in the output buffer. - // Outputs will be read by the consensus process later when it wishes so. - - const int readfd = fds[FDTYPE::HPREAD]; - if (readfd == -1) + // Write the inputs (if any) into the contract. + const int writefd = fds[SOCKETFDTYPE::HPREADWRITE]; + if (writefd == -1) return 0; - bool read_error = false; - size_t available_bytes = 0; - if (ioctl(readfd, FIONREAD, &available_bytes) != -1) + bool write_error = false; + + if (!inputs.empty()) { - if (available_bytes == 0) - return 0; - - const size_t current_size = output.size(); - output.resize(current_size + available_bytes); - const int res = read(readfd, output.data() + current_size, available_bytes); - - if (res >= 0) + for (std::string &input : inputs) { - if (res == 0) // EOF - { - close(readfd); - fds[FDTYPE::HPREAD] = -1; - } - return res; + if (write(writefd, input.data(), input.length()) == -1) + write_error = true; } } + else if (close_if_empty) + { + close(writefd); + fds[SOCKETFDTYPE::HPREADWRITE] = -1; + } + if (write_error) + LOG_ERROR << errno << ": Error writing to sequece packet socket."; - close(readfd); - fds[FDTYPE::HPREAD] = -1; - return -1; + return write_error ? -1 : 0; } /** - * Common function to read buffered output from the socket and populate the output. + * Common function to read buffered output from the sequence packet socket and populate the output. * @param fds Vector representing the socket fd list. * @param output The buffer to place the read output. * @return -1 on error. Otherwise no. of bytes read. */ - int read_iosocket(std::vector &fds, std::string &output) + int read_iosocket_seq_packet(std::vector &fds, std::string &output) { // Read any available data that have been written by the contract process // from the output socket and store in the output buffer. @@ -762,13 +742,73 @@ namespace sc if (available_bytes == 0) return 0; - output.resize(MAX_NPL_BUF_SIZE); - const int res = read(readfd, output.data(), MAX_NPL_BUF_SIZE); + output.resize(MIN(MAX_SEQ_PACKET_SIZE, available_bytes)); + const int res = read(readfd, output.data(), MAX_SEQ_PACKET_SIZE); output.resize(res); - return res; + if (res >= 0) + { + if (res == 0) // EOF + { + close(readfd); + fds[SOCKETFDTYPE::HPREADWRITE] = -1; + } + return res; + } + } + close(readfd); + fds[SOCKETFDTYPE::HPREADWRITE] = -1; + LOG_ERROR << errno << ": Error reading sequence packet socket."; + + return -1; + } + + /** + * Common function to read buffered output from the stream socket and populate the output list. + * @param fds Vector representing the sockets fd list. + * @param output The buffer to place the read output. + * @return -1 on error. Otherwise no. of bytes read. + */ + int read_iosocket_stream(std::vector &fds, std::string &output) + { + // Read any available data that have been written by the contract process + // from the output socket and store in the output buffer. + // Outputs will be read by the consensus process later when it wishes so. + + const int readfd = fds[SOCKETFDTYPE::HPREADWRITE]; + if (readfd == -1) + return 0; + + bool read_error = false; + size_t available_bytes = 0; + if (ioctl(readfd, FIONREAD, &available_bytes) != -1) + { + if (available_bytes == 0) + { + return 0; + } + + const size_t current_size = output.size(); + output.resize(current_size + available_bytes); + const int res = read(readfd, output.data() + current_size, available_bytes); + + if (res >= 0) + { + if (res == 0) // EOF + { + close(readfd); + fds[SOCKETFDTYPE::HPREADWRITE] = -1; + } + return res; + } + } + + close(readfd); + fds[SOCKETFDTYPE::HPREADWRITE] = -1; + LOG_ERROR << errno << ": Error reading stream socket."; + return -1; } @@ -776,44 +816,13 @@ namespace sc { if (!ctx.args.readonly) { - close_unused_vectorfds(is_hp, ctx.hpscfds); + close_unused_socket_vectorfds(is_hp, ctx.hpscfds); close_unused_socket_vectorfds(is_hp, ctx.nplfds); } // Loop through user fds. for (auto &[pubkey, fds] : ctx.userfds) - close_unused_vectorfds(is_hp, fds); - } - - /** - * Common function for closing unused fds based on which process this gets called from. - * This also marks active fds with O_CLOEXEC for close-on-exec behaviour. - * @param is_hp Specify 'true' when calling from HP process. 'false' from SC process. - * @param fds Vector of fds to close. - */ - void close_unused_vectorfds(const bool is_hp, std::vector &fds) - { - for (int fd_type = 0; fd_type <= 3; fd_type++) - { - const int fd = fds[fd_type]; - if (fd != -1) - { - if ((is_hp && (fd_type == FDTYPE::SCREAD || fd_type == FDTYPE::SCWRITE)) || - (!is_hp && (fd_type == FDTYPE::HPREAD || fd_type == FDTYPE::HPWRITE))) - { - close(fd); - fds[fd_type] = -1; - } - else if (is_hp && (fd_type == FDTYPE::HPREAD || fd_type == FDTYPE::HPWRITE)) - { - // The fd must be kept open in HP process. But we must - // mark it to close on exec in a potential forked process. - int flags = fcntl(fd, F_GETFD, NULL); - flags |= FD_CLOEXEC; - fcntl(fd, F_SETFD, flags); - } - } - } + close_unused_socket_vectorfds(is_hp, fds); } /** diff --git a/src/sc.hpp b/src/sc.hpp index 567979b2..f20f1225 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -13,19 +13,6 @@ namespace sc { - // Enum used to differenciate pipe fds maintained for SC I/O pipes. - enum FDTYPE - { - // Used by Smart Contract to read input sent by Hot Pocket. - SCREAD = 0, - // Used by Hot Pocket to write input to the smart contract. - HPWRITE = 1, - // Used by Hot Pocket to read output from the smart contract. - HPREAD = 2, - // Used by Smart Contract to write output back to Hot Pocket. - SCWRITE = 3 - }; - // Enum used to differenciate socket fds maintained for SC socket. enum SOCKETFDTYPE { @@ -99,13 +86,13 @@ namespace sc // The arguments that was used to initiate this execution. contract_execution_args args; - // Map of user pipe fds (map key: user public key) + // Map of user socket fds (map key: user public key) contract_fdmap_t userfds; - // Pipe fds for NPL <--> messages. + // Socket fds for NPL <--> messages. std::vector nplfds; - // Pipe fds for HP <--> messages. + // Socket fds for HP <--> messages. std::vector hpscfds; // Holds the contract process id (if currently executing). @@ -155,7 +142,7 @@ namespace sc void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os); - int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); + int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); @@ -163,20 +150,18 @@ namespace sc void cleanup_fdmap(contract_fdmap_t &fdmap); - int create_iopipes(std::vector &fds, const bool create_inpipe); + int create_iosockets(std::vector &fds, const int socket_type); - int create_iosockets(std::vector &fds); + int write_iosocket_seq_packet(std::vector &fds, std::list &inputs, const bool close_if_empty); - int write_iopipe(std::vector &fds, std::list &inputs); + int write_iosocket_stream(std::vector &fds, std::list &inputs, const bool close_if_empty); - int read_iopipe(std::vector &fds, std::string &output); + int read_iosocket_seq_packet(std::vector &fds, std::string &output); - int read_iosocket(std::vector &fds, std::string &output); + int read_iosocket_stream(std::vector &fds, std::string &output); void close_unused_fds(execution_context &ctx, const bool is_hp); - void close_unused_vectorfds(const bool is_hp, std::vector &fds); - void close_unused_socket_vectorfds(const bool is_hp, std::vector &fds); void cleanup_vectorfds(std::vector &fds);