diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 20a61ad0..2874041d 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -9,56 +9,44 @@ const hpc = new HotPocketContract(); if (!hpc.readonly) fs.appendFileSync("exects.txt", "ts:" + hpc.timestamp + "\n"); -hpc.events.on("user_message", (pubKey, message) => { +hpc.events.on("user_message", async (pubKey, message) => { const userInput = message.toString("utf8"); const user = hpc.users[pubKey]; if (userInput == "ts") { user.sendOutput(fs.readFileSync("exects.txt")); - user.closeChannel(); } else { user.sendOutput("Echoing: " + userInput); } }); -hpc.events.on("user_finished", (pubKey) => { - hpc.users[pubKey].closeChannel(); +hpc.events.on("all_users_completed", () => { + hpc.terminate(); }); -const npl = hpc.npl; +// Developer should call run method after all the event subscriptions are done. +hpc.run(); -// Npl channel always connected if contract is not in readonly mode. -// Smart contract developer has to mannually close the channel once the execution logic is complete. -if (npl) { - npl.closeNplChannel(); -} - -// HP <--> SC -const hp = hpc.control; -hp.closeControlChannel(); - -// let i = 0; -// hp.events.on('message', (msg) => { +// Control message sending and receiving template. +// const hp = hpc.control; +// hpc.events.on('control_message', (msg) => { // console.log('control msg - ' + msg); // hp.sendOutput(msg); -// i++; -// if (i == 2) -// hp.closeControlChannel(); // }) // Npl message sending and receiving template. +// const npl = hpc.npl; // if (npl) { // let i = 0; // let interval = setInterval(() => { // npl.sendOutput(`npl${i} from contract`); // if (i == 5) { // clearInterval(interval); -// npl.closeNplChannel(); // } // i++; // }, 500); -// npl.events.on("message", msg => { +// hpc.events.on("npl_message", msg => { // if (msg) { // console.log(msg); // } diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 9b8b2a0b..b1cac04e 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -1,33 +1,82 @@ const fs = require('fs'); -const events = require('events'); const MAX_SEQ_PACKET_SIZE = 128 * 1024; +let incompleteUserCount = 0; + +function AsyncCallbackEmitter() { + this.callbacks = {}; + + this.on = (event, callback) => { + if (!this.callbacks[event]) { + this.callbacks[event] = []; + } + this.callbacks[event].push(callback); + }; + + this.emit = async (event, ...args) => { + let eventCallbacks = this.callbacks[event]; + if (eventCallbacks && eventCallbacks.length) { + await Promise.all(eventCallbacks.map(async callback => { + if (callback.constructor.name === 'AsyncFunction') { + await callback(...args); + } + else { + callback(...args); + } + })); + } + }; + + this.removeAllListeners = () => { + this.callbacks = {}; + }; + + this.removeListener = (event) => { + delete this.callbacks[event]; + }; +} + function HotPocketContract() { const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); this.readonly = hpargs.readonly; this.timestamp = hpargs.ts; + this.events = new AsyncCallbackEmitter(); - if (!this.readonly) { - const lclParts = hpargs.lcl.split("-"); - this.lcl = { - seqNo: parseInt(lclParts[0]), - hash: lclParts[1] - }; + this.run = () => { + if (!this.readonly) { + const lclParts = hpargs.lcl.split("-"); + this.lcl = { + seqNo: parseInt(lclParts[0]), + hash: lclParts[1] + }; - this.npl = new HotPocketNplChannel(hpargs.nplfd); - } + this.npl = new HotPocketNplChannel(this.events, hpargs.nplfd); + } - this.control = new HotPocketControlChannel(hpargs.hpfd); - this.events = new events.EventEmitter(); + this.control = new HotPocketControlChannel(this.events, hpargs.hpfd); - this.users = {}; - Object.keys(hpargs.usrfd).forEach((userPubKey) => { - this.users[userPubKey] = new HotPocketChannel(hpargs.usrfd[userPubKey], userPubKey, this.events); - }); + this.users = {}; + Object.keys(hpargs.usrfd).forEach((userPubKey) => { + this.users[userPubKey] = new HotPocketUserChannel(this.events, hpargs.usrfd[userPubKey], userPubKey); + incompleteUserCount++; + }); + + this.terminate = () => { + this.control.sendOutput("Terminated") + // We are still using process.kill(0) temporarily to stop contract hanging. + // This will be removed after the control message is implemented. + process.kill(0); + } + + if (!Object.keys(hpargs.usrfd).length) { + + this.events.emit("all_users_completed"); + } + }; } -function HotPocketChannel(fd, userPubKey, events) { +function HotPocketUserChannel(events, fd, userPubKey) { let socket = null; if (fd > 0) { socket = fs.createReadStream(null, { fd: fd }); @@ -35,7 +84,7 @@ function HotPocketChannel(fd, userPubKey, events) { let msgCount = -1; let msgLen = -1; let pos = 0; - socket.on("data", (buf) => { + socket.on("data", async (buf) => { pos = 0; if (msgCount == -1) { const msgCountBuf = readBytes(buf, 0, 4) @@ -61,16 +110,21 @@ function HotPocketChannel(fd, userPubKey, events) { const msgBuf = readBytes(buf, pos, possible_read_len); pos += possible_read_len; dataParts.push(msgBuf) - + if (msgLen == -1) { - events.emit("user_message", userPubKey, Buffer.concat(dataParts)); + await events.emit("user_message", userPubKey, Buffer.concat(dataParts)); dataParts = []; msgCount-- } - if (msgCount == 0) { - msgCount = -1 - events.emit("user_finished", userPubKey); + } + + if (msgCount == 0) { + msgCount = -1; + incompleteUserCount--; + if (incompleteUserCount == 0) { + events.emit("all_users_completed"); } + events.emit("user_completed", userPubKey); } }); @@ -94,17 +148,10 @@ function HotPocketChannel(fd, userPubKey, events) { fs.writeSync(fd, headerBuf); fs.writeSync(fd, outputStringBuf); } - - this.closeChannel = function () { - if (fd > 0) { - socket.destroy(); - } - } } -function HotPocketNplChannel(fd) { +function HotPocketNplChannel(events, fd) { - this.events = new events.EventEmitter(); let socket = null; let isPubKeyReceived = false; let pubKey; @@ -119,7 +166,7 @@ function HotPocketNplChannel(fd) { isPubKeyReceived = true; } else { - this.events.emit("message", { + events.emit("npl_message", { pubkey: pubKey, input: d }); @@ -128,7 +175,7 @@ function HotPocketNplChannel(fd) { } }); socket.on("error", (e) => { - this.events.emit("error", e); + events.emit("npl_error", e); }); } @@ -137,26 +184,19 @@ function HotPocketNplChannel(fd) { fs.writeSync(fd, output); } } - - this.closeNplChannel = () => { - if (fd > 0) { - socket.destroy(); - } - } } -function HotPocketControlChannel(fd) { +function HotPocketControlChannel(events, fd) { - this.events = new events.EventEmitter(); let socket = null; if (fd > 0) { socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); socket.on("data", d => { - this.events.emit("message", d); + events.emit("control_message", d); }); socket.on("error", (e) => { - this.events.emit("error", e); + events.emit("control_error", e); }); } @@ -165,12 +205,6 @@ function HotPocketControlChannel(fd) { fs.writeSync(fd, output); } } - - this.closeControlChannel = () => { - if (fd > 0) { - socket.destroy(); - } - } } module.exports = { diff --git a/src/sc.cpp b/src/sc.cpp index 487c709e..546f9054 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -562,7 +562,7 @@ namespace sc // Loop through input buffers for each pubkey. for (auto &[pubkey, buflist] : bufmap) { - if (write_iosocket_stream(fdmap[pubkey], buflist.inputs, true) == -1) + if (write_iosocket_stream(fdmap[pubkey], buflist.inputs) == -1) return -1; } @@ -692,7 +692,7 @@ namespace sc * @param inputs Buffer to write into the HP write fd. * @param close_if_empty Close the socket after writing if this is true. */ - int write_iosocket_stream(std::vector &fds, std::list &inputs, const bool close_if_empty) + int write_iosocket_stream(std::vector &fds, std::list &inputs) { // Write the inputs (if any) into the contract. @@ -702,45 +702,37 @@ namespace sc bool write_error = false; - if (!inputs.empty()) + // Prepare the input memory segments to write with wrtiev. + // Extra one element for the header. + iovec memsegs[inputs.size() * 2 + 1]; + uint8_t header[inputs.size() * 4 + 4]; + header[0] = inputs.size() >> 24; + header[1] = inputs.size() >> 16; + header[2] = inputs.size() >> 8; + header[3] = inputs.size(); + // Message count header. + memsegs[0].iov_base = header; + memsegs[0].iov_len = 4; + size_t i = 1; + for (std::string &input : inputs) { - // Prepare the input memory segments to write with wrtiev. - // Extra one element for the header. - iovec memsegs[inputs.size() * 2 + 1]; - uint8_t header[inputs.size() * 4 + 4]; - header[0] = inputs.size() >> 24; - header[1] = inputs.size() >> 16; - header[2] = inputs.size() >> 8; - header[3] = inputs.size(); - // Message count header. - memsegs[0].iov_base = header; - memsegs[0].iov_len = 4; - size_t i = 1; - for (std::string &input : inputs) - { - // 4 bytes for message len header. - header[i * 4] = input.length() >> 24; - header[i * 4 + 1] = input.length() >> 16; - header[i * 4 + 2] = input.length() >> 8; - header[i * 4 + 3] = input.length(); - memsegs[i * 2 - 1].iov_base = &header[i * 4]; - memsegs[i * 2 - 1].iov_len = 4; - memsegs[i * 2].iov_base = input.data(); - memsegs[i * 2].iov_len = input.length(); - i++; - } - - if (writev(writefd, memsegs, (inputs.size() * 2 + 1)) == -1) - write_error = true; - - inputs.clear(); - } - else if (close_if_empty) - { - close(writefd); - fds[SOCKETFDTYPE::HPREADWRITE] = -1; + // 4 bytes for message len header. + header[i * 4] = input.length() >> 24; + header[i * 4 + 1] = input.length() >> 16; + header[i * 4 + 2] = input.length() >> 8; + header[i * 4 + 3] = input.length(); + memsegs[i * 2 - 1].iov_base = &header[i * 4]; + memsegs[i * 2 - 1].iov_len = 4; + memsegs[i * 2].iov_base = input.data(); + memsegs[i * 2].iov_len = input.length(); + i++; } + if (writev(writefd, memsegs, (inputs.size() * 2 + 1)) == -1) + write_error = true; + + inputs.clear(); + if (write_error) LOG_ERROR << errno << ": Error writing to stream socket."; diff --git a/src/sc.hpp b/src/sc.hpp index e5b86b43..724b5699 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -162,7 +162,7 @@ namespace sc int write_iosocket_seq_packet(std::vector &fds, std::list &inputs, const bool close_if_empty); - int write_iosocket_stream(std::vector &fds, std::list &inputs, const bool close_if_empty); + int write_iosocket_stream(std::vector &fds, std::list &inputs); int read_iosocket_seq_packet(std::vector &fds, std::string &output);