diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 536f55be..25f2368f 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -2,36 +2,43 @@ const { HotPocketContract } = require("./hp-contract-lib"); const fs = require('fs'); // HP smart contract is defined as a function which takes HP ExecutionContext as an argument. -// HP considers execution as complete, when this function completes and all the callbacks are complete. +// HP considers execution as complete, when this function completes and all the peer message callbacks are complete. const echoContract = async (ctx) => { // We just save execution timestamp as an example state file change. if (!ctx.readonly) fs.appendFileSync("exects.txt", "ts:" + ctx.timestamp + "\n"); - // This will return after all user messages are processed. - await ctx.users.onMessage(async (user, buf) => { + // Collection of user input handler promises to wait for. + const inputHandlers = []; + + for (const user of ctx.users.list()) { // This user's pubkey can be accessed from 'user.pubKey' - // A reply message can be sent to the user by 'user.send(msg)' + + for (const input of user.inputs) { - const msg = buf.toString(); - if (msg == "ts") { - await user.send(fs.readFileSync("exects.txt")); - } - else { - await user.send("Echoing: " + msg); - } - }); + inputHandlers.push(new Promise(async (resolve) => { - // Get list of all users who are connected. - // ctx.users.get(); + const buf = await ctx.users.read(input); + const msg = buf.toString(); + + if (msg == "ts") + await user.send(fs.readFileSync("exects.txt")); + else + await user.send("Echoing: " + msg); + + resolve(); + })); + } + } + await Promise.all(inputHandlers); // Get the user identified by public key. // ctx.users.find(""); // Get list of all peers in the cluster. - // ctx.peers.get(); + // ctx.peers.list(); // Get the peer identified by public key. // ctx.peers.find(""); diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 732e0a9e..5678aeeb 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -1,4 +1,3 @@ -const { EventEmitter } = require('events'); const fs = require('fs'); const MAX_SEQ_PACKET_SIZE = 128 * 1024; @@ -10,7 +9,6 @@ Object.freeze(CONTROL_MESSAGE); class HotPocketContract { - events = new EventEmitter(); #controlChannel = null; init(contractFunc) { @@ -29,16 +27,16 @@ class HotPocketContract { #executeContract = (hpargs, contractFunc) => { // Keeps track of all the tasks (promises) that must be awaited before the termination. const pendingTasks = []; + const nplChannel = new NplChannel(hpargs.nplfd); const users = new UsersCollection(hpargs.userinfd, hpargs.users); - const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events); + const peers = new PeersCollection(hpargs.readonly, hpargs.unl, nplChannel, pendingTasks); const executionContext = new ContractExecutionContext(hpargs, users, peers, this.#controlChannel); - this.events.emit("session_start"); invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => { // Wait for any pending tasks added during execution. Promise.all(pendingTasks).catch(errHandler).finally(() => { - this.events.emit("session_end"); + nplChannel.close(); this.#terminate(); }); }); @@ -73,85 +71,51 @@ class ContractExecutionContext { class UsersCollection { #users = {}; - #totalUsers = 0; + #infd = null; constructor(userInputsFd, usersObj) { - const users = Object.entries(usersObj); + this.#infd = userInputsFd; - users.forEach(([pubKey, arr]) => { + Object.entries(usersObj).forEach(([pubKey, arr]) => { const outfd = arr[0]; // First array element is the output fd. arr.splice(0, 1); // Remove first element (output fd). The rest are pairs of msg offset/length tuples. - const channel = new UserChannel(userInputsFd, outfd, arr); - const user = new User(pubKey, channel); - - this.#users[pubKey] = { - user: user, - channel: channel - } + const channel = new UserChannel(outfd); + this.#users[pubKey] = new User(pubKey, channel, arr); }); - - this.#totalUsers = users.length; } // Returns the User for the specified pubkey. Returns null if not found. find(pubKey) { - const u = this.#users[pubKey]; - return u && u.user; + return this.#users[pubKey] } // Returns all the currently connected users. - get() { - return Object.values(this.#users).map(u => u.user); + list() { + return Object.values(this.#users); } count() { return Object.keys(this.#users).length; } - async onMessage(callback) { - - if (this.#totalUsers == 0) { - await Promise.resolve(); - return; - } - - // We create a promise which would get resolved when all users' message handling have completed. - const allUsersCompletedTask = new Promise(resolve => { - - let pendingUserCount = this.#totalUsers; - const userMessageTasks = []; - - const onUserMessage = (user, msg) => { - userMessageTasks.push(invokeCallback(callback, user, msg)); - }; - - const onUserComplete = () => { - pendingUserCount--; - if (pendingUserCount == 0) { - // All user message events has been emitted. - // Now start waiting for queued up user message callback completion. - Promise.all(userMessageTasks).catch(errHandler).finally(resolve); - } - } - - // Register callback to consume all users messages. - Object.values(this.#users).forEach(u => { - u.channel.consume((msg) => onUserMessage(u.user, msg), onUserComplete); - }) - }); - - await allUsersCompletedTask; + async read(input) { + const [offset, size] = input; + const buf = Buffer.alloc(size); + await readAsync(this.#infd, buf, offset, size); + return buf; } } class User { pubKey = null; + inputs = null; #channel = null; - constructor(pubKey, channel) { + constructor(pubKey, channel, inputs) { this.pubKey = pubKey; + this.inputs = inputs; this.#channel = channel; } @@ -161,27 +125,10 @@ class User { } class UserChannel { - #readStream = null; - #infd = -1; #outfd = -1; - #inputs = null; - constructor(infd, outfd, inputs) { - this.#infd = infd; + constructor(outfd) { this.#outfd = outfd; - this.#inputs = inputs; - } - - consume(onMessage, onComplete) { - - // Each input is 2 element array of [offset, length]. - for (const [offset, size] of this.#inputs) { - const buf = Buffer.alloc(size); - fs.readSync(this.#infd, buf, 0, size, offset); - onMessage(buf); - } - - onComplete(); } send(msg) { @@ -199,7 +146,7 @@ class PeersCollection { #readonly = false; #pendingTasks = null; - constructor(readonly, unl, nplfd, pendingTasks, events) { + constructor(readonly, unl, channel, pendingTasks) { this.#readonly = readonly; this.#pendingTasks = pendingTasks; @@ -208,8 +155,7 @@ class PeersCollection { this.#peers[pubKey] = new Peer(pubKey); }); - this.#channel = new NplChannel(nplfd); - events.on("session_end", () => this.#channel.close()); + this.#channel = channel; } } @@ -219,7 +165,7 @@ class PeersCollection { } // Returns all the peers. - get() { + list() { return Object.values(this.#peers); } @@ -328,6 +274,7 @@ class ControlChannel { const writeAsync = (fd, buf) => new Promise(resolve => fs.write(fd, buf, resolve)); const writevAsync = (fd, bufList) => new Promise(resolve => fs.writev(fd, bufList, resolve)); +const readAsync = (fd, buf, offset, size) => new Promise(resolve => fs.read(fd, buf, 0, size, offset, resolve)); const invokeCallback = async (callback, ...args) => { if (!callback) diff --git a/src/consensus.cpp b/src/consensus.cpp index 0f0ed3d7..27b01cab 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -885,6 +885,7 @@ namespace consensus else { // Populate the input content into the bufmap. + // It's VERY important that we preserve the proposal input hash order when feeding to the contract as well. candidate_user_input &cand_input = itr->second; sc::contract_iobufs &contract_user = bufmap[cand_input.userpubkey]; contract_user.inputs.push_back(cand_input.input);