diff --git a/examples/nodejs_client/file-client.js b/examples/nodejs_client/file-client.js index 1bb59098..8c81e2c0 100644 --- a/examples/nodejs_client/file-client.js +++ b/examples/nodejs_client/file-client.js @@ -1,35 +1,21 @@ const fs = require('fs'); const readline = require('readline'); -const sodium = require('libsodium-wrappers'); const { exit } = require('process'); -const { HotPocketClient, HotPocketProtocols, HotPocketEvents } = require('./hp-client-lib'); const bson = require('bson'); var path = require("path"); +const { HotPocketClient, HotPocketKeyGenerator, HotPocketEvents } = require('./hp-client-lib'); async function main() { - await sodium.ready; + const keys = await HotPocketKeyGenerator.generate(); - let keys = {}; - const key_file = '.hp_client_keys'; - if (!fs.existsSync(key_file)) { - keys = sodium.crypto_sign_keypair(); - keys.privateKey = sodium.to_hex(keys.privateKey) - keys.publicKey = sodium.to_hex(keys.publicKey) - fs.writeFileSync(key_file, JSON.stringify(keys)) - } else { - keys = JSON.parse(fs.readFileSync(key_file)) - keys.privateKey = Uint8Array.from(Buffer.from(keys.privateKey, 'hex')) - keys.publicKey = Uint8Array.from(Buffer.from(keys.publicKey, 'hex')) - } - - const pkhex = 'ed' + Buffer.from(keys.publicKey).toString('hex'); + const pkhex = Buffer.from(keys.publicKey).toString('hex'); console.log('My public key is: ' + pkhex); let server = 'wss://localhost:8080' if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2] if (process.argv.length == 4) server = 'wss://' + process.argv[2] + ':' + process.argv[3] - const hpc = new HotPocketClient(server, HotPocketProtocols.BSON, keys); + const hpc = new HotPocketClient(server, keys); // Establish HotPocket connection. if (!await hpc.connect()) { diff --git a/examples/nodejs_client/hp-client-lib.js b/examples/nodejs_client/hp-client-lib.js index 2a4f8222..a90dc1f0 100644 --- a/examples/nodejs_client/hp-client-lib.js +++ b/examples/nodejs_client/hp-client-lib.js @@ -1,8 +1,11 @@ -const ws_api = require('ws'); +const WebSocket = require('isomorphic-ws'); const sodium = require('libsodium-wrappers'); const EventEmitter = require('events'); const bson = require('bson'); +// Whether we are in NodeJS or Browser. +const isNodeJS = (typeof window === 'undefined'); + const protocols = { JSON: "json", BSON: "bson" @@ -16,17 +19,35 @@ const events = { } Object.freeze(events); -function HotPocketClient(server, protocol, keys) { +const HotPocketKeyGenerator = { + generate: async function (privateKeyHex = null) { + await sodium.ready; - if (protocol != protocols.JSON && protocol != protocols.BSON) - throw new Error("Protocol: 'json' or 'bson' expected."); + if (!privateKeyHex) { + const keys = sodium.crypto_sign_keypair(); + return { + privateKey: keys.privateKey, + publicKey: keys.publicKey + } + } + else { + const binPrivateKey = Buffer.from(privateKeyHex, "hex"); + return { + privateKey: Uint8Array.from(binPrivateKey), + publicKey: Uint8Array.from(binPrivateKey.slice(32)) + } + } + }, +} + +function HotPocketClient(server, keys, protocol = protocols.BSON) { let ws = null; const msgHelper = new MessageHelper(keys, protocol); const emitter = new EventEmitter(); let handshakeResolver = null; - let statResponseResolver = null; + let statResponseResolvers = []; let contractInputResolvers = {}; this.connect = function () { @@ -34,31 +55,47 @@ function HotPocketClient(server, protocol, keys) { handshakeResolver = resolve; - ws = new ws_api(server, { - rejectUnauthorized: false - }) + if (isNodeJS) { + ws = new WebSocket(server, { + rejectUnauthorized: false + }) + } + else { + ws = new WebSocket(server); + } - ws.on('close', () => { + ws.onclose = () => { // If there are any ongoing resolvers resolve them with error output. handshakeResolver && handshakeResolver(false); handshakeResolver = null; - statResponseResolver && statResponseResolver(null); - statResponseResolver = null; + statResponseResolvers.forEach(resolver => resolver(null)); + statResponseResolvers = []; Object.values(contractInputResolvers).forEach(resolver => resolver(null)); contractInputResolvers = {}; emitter.emit(events.disconnect); - }); + }; + + ws.onmessage = async (rcvd) => { + + if (isNodeJS) { + msg = rcvd.data; + } + else { + msg = (handshakeResolver || protocol == protocols.JSON) ? + await rcvd.data.text() : + Buffer.from(await rcvd.data.arrayBuffer()); + } - ws.on('message', (msg) => { try { // Use JSON if we are still in handshake phase. m = handshakeResolver ? JSON.parse(msg) : msgHelper.deserializeMessage(msg); } catch (e) { + console.log(e); console.log("Exception deserializing: "); console.log(msg) return; @@ -68,7 +105,7 @@ function HotPocketClient(server, protocol, keys) { // sign the challenge and send back the response const response = msgHelper.createHandshakeResponse(m.challenge); ws.send(JSON.stringify(response)); - + setTimeout(() => { // If we are still connected, report handshaking as successful. // (If websocket disconnects, handshakeResolver will be null) @@ -96,16 +133,18 @@ function HotPocketClient(server, protocol, keys) { emitter.emit(events.contractOutput, decoded); } else if (m.type == "stat_response") { - statResponseResolver && statResponseResolver({ - lcl: m.lcl, - lclSeqNo: m.lcl_seqno - }); - statResponseResolver = null; + statResponseResolvers.forEach(resolver => { + resolver({ + lcl: m.lcl, + lclSeqNo: m.lcl_seqno + }); + }) + statResponseResolvers = []; } else { console.log("Received unrecognized message: type:" + m.type); } - }); + } }); }; @@ -116,7 +155,7 @@ function HotPocketClient(server, protocol, keys) { this.close = function () { return new Promise(resolve => { try { - ws.removeAllListeners("close"); + ws.onclose = resolve; ws.on("close", resolve); ws.close(); } catch (error) { @@ -126,12 +165,16 @@ function HotPocketClient(server, protocol, keys) { } this.getStatus = function () { - const msg = msgHelper.createStatusRequest(); const p = new Promise(resolve => { - statResponseResolver = resolve; + statResponseResolvers.push(resolve); }); - ws.send(msgHelper.serializeObject(msg)); + // If this is the only awaiting stat request, then send an actual stat request. + // Otherwise simply wait for the previously sent request. + if (statResponseResolvers.length == 1) { + const msg = msgHelper.createStatusRequest(); + ws.send(msgHelper.serializeObject(msg)); + } return p; } @@ -146,7 +189,7 @@ function HotPocketClient(server, protocol, keys) { // Acquire the current lcl and add the specified offset. const stat = await this.getStatus(); if (!stat) - return new Promise(resolve => resolve(null)); + return new Promise(resolve => resolve("ledger_status_error")); const maxLclSeqNo = stat.lclSeqNo + maxLclOffset; const msg = msgHelper.createContractInput(input, nonce, maxLclSeqNo); @@ -185,7 +228,7 @@ function MessageHelper(keys, protocol) { } this.createHandshakeResponse = function (challenge) { - // For handshake response encoding we Hot Pocket always use json. + // For handshake response encoding Hot Pocket always uses json. // Handshake response will specify the protocol to use for subsequent messages. const sigBytes = sodium.crypto_sign_detached(challenge, keys.privateKey); return { @@ -236,8 +279,17 @@ function MessageHelper(keys, protocol) { } } -module.exports = { - HotPocketClient, - HotPocketProtocols: protocols, - HotPocketEvents: events -}; +if (isNodeJS) { + module.exports = { + HotPocketKeyGenerator, + HotPocketClient, + HotPocketEvents: events + }; +} +else { + window.HotPocket = { + KeyGenerator: HotPocketKeyGenerator, + Client: HotPocketClient, + Events: events + } +} \ No newline at end of file diff --git a/examples/nodejs_client/package-lock.json b/examples/nodejs_client/package-lock.json index 6487a803..d63ba334 100644 --- a/examples/nodejs_client/package-lock.json +++ b/examples/nodejs_client/package-lock.json @@ -30,11 +30,24 @@ "ieee754": "^1.1.4" } }, + "bufferutil": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.1.tgz", + "integrity": "sha512-xowrxvpxojqkagPcWRQVXZl0YXhRhAtBEIq3VoER1NH5Mw1n1o0ojdspp+GS2J//2gCVyrzQDApQ4unGF+QOoA==", + "requires": { + "node-gyp-build": "~3.7.0" + } + }, "ieee754": { "version": "1.1.13", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.1.13.tgz", "integrity": "sha512-4vf7I2LYV/HaWerSo3XmlMkp5eZ83i+/CDluXi/IGTs/O1sejBNhTtnxzmRZfvOUqj7lZjqHkeTvpgSFDlWZTg==" }, + "isomorphic-ws": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-4.0.1.tgz", + "integrity": "sha512-BhBvN2MBpWTaSHdWRb/bwdZJ1WaehQ2L1KngkCkfLUGF0mAWAT1sQUQacEmQ0jXkFw/czDXPNQSL5u2/Krsz1w==" + }, "libsodium": { "version": "0.7.6", "resolved": "https://registry.npmjs.org/libsodium/-/libsodium-0.7.6.tgz", @@ -53,6 +66,19 @@ "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" }, + "node-gyp-build": { + "version": "3.7.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.7.0.tgz", + "integrity": "sha512-L/Eg02Epx6Si2NXmedx+Okg+4UHqmaf3TNcxd50SF9NQGcJaON3AtU++kax69XV7YWz4tUspqZSAsVofhFKG2w==" + }, + "utf-8-validate": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.2.tgz", + "integrity": "sha512-SwV++i2gTD5qh2XqaPzBnNX88N6HdyhQrNNRykvcS0QKvItV9u3vPEJr+X5Hhfb1JC0r0e1alL0iB09rY8+nmw==", + "requires": { + "node-gyp-build": "~3.7.0" + } + }, "ws": { "version": "7.1.2", "resolved": "https://registry.npmjs.org/ws/-/ws-7.1.2.tgz", diff --git a/examples/nodejs_client/package.json b/examples/nodejs_client/package.json index d28333ae..435afe9c 100644 --- a/examples/nodejs_client/package.json +++ b/examples/nodejs_client/package.json @@ -2,6 +2,9 @@ "dependencies": { "libsodium-wrappers": "0.7.6", "ws": "7.1.2", - "bson": "4.0.4" + "isomorphic-ws": "4.0.1", + "bson": "4.0.4", + "utf-8-validate": "5.0.2", + "bufferutil": "4.0.1" } } diff --git a/examples/nodejs_client/text-client.js b/examples/nodejs_client/text-client.js index 179b1d1a..e6787b33 100644 --- a/examples/nodejs_client/text-client.js +++ b/examples/nodejs_client/text-client.js @@ -1,33 +1,19 @@ const fs = require('fs'); const readline = require('readline'); -const sodium = require('libsodium-wrappers'); const { exit } = require('process'); -const { HotPocketClient, HotPocketProtocols, HotPocketEvents } = require('./hp-client-lib'); +const { HotPocketClient, HotPocketKeyGenerator, HotPocketEvents } = require('./hp-client-lib'); async function main() { - await sodium.ready; + const keys = await HotPocketKeyGenerator.generate(); - let keys = {}; - const key_file = '.hp_client_keys'; - if (!fs.existsSync(key_file)) { - keys = sodium.crypto_sign_keypair(); - keys.privateKey = sodium.to_hex(keys.privateKey) - keys.publicKey = sodium.to_hex(keys.publicKey) - fs.writeFileSync(key_file, JSON.stringify(keys)) - } else { - keys = JSON.parse(fs.readFileSync(key_file)) - keys.privateKey = Uint8Array.from(Buffer.from(keys.privateKey, 'hex')) - keys.publicKey = Uint8Array.from(Buffer.from(keys.publicKey, 'hex')) - } - - const pkhex = 'ed' + Buffer.from(keys.publicKey).toString('hex'); + const pkhex = Buffer.from(keys.publicKey).toString('hex'); console.log('My public key is: ' + pkhex); let server = 'wss://localhost:8080' if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2] if (process.argv.length == 4) server = 'wss://' + process.argv[2] + ':' + process.argv[3] - const hpc = new HotPocketClient(server, HotPocketProtocols.JSON, keys); + const hpc = new HotPocketClient(server, keys); // Establish HotPocket connection. if (!await hpc.connect()) { @@ -68,13 +54,15 @@ async function main() { const input_pump = () => { rl.question('', (inp) => { - if (inp.startsWith("read ")) - hpc.sendContractReadRequest(inp.substr(5)) - else { - hpc.sendContractInput(inp).then(submissionStatus => { - if (submissionStatus && submissionStatus != "ok") - console.log("Input submission failed. reason: " + submissionStatus); - }); + if (inp.length > 0) { + if (inp.startsWith("read ")) + hpc.sendContractReadRequest(inp.substr(5)) + else { + hpc.sendContractInput(inp).then(submissionStatus => { + if (submissionStatus && submissionStatus != "ok") + console.log("Input submission failed. reason: " + submissionStatus); + }); + } } input_pump(); diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 2874041d..6ee0ae03 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -1,56 +1,42 @@ const { HotPocketContract } = require("./hp-contract-lib"); const fs = require('fs'); +// HP smart contract is defined as a function which takes HP ExecutionContext as an argument. +// HP considers execution as complete, when this function completes and all the user message callbacks are complete. +const echoContract = (ctx) => { + + // We just save execution timestamp as an example state file change. + if (!ctx.readonly) + fs.appendFileSync("exects.txt", "ts:" + ctx.timestamp + "\n"); + + ctx.users.onMessage(async (user, buf) => { + + // This user's pubkey can be accessed from 'user.pubKey' + // A reply message can be sent to the user by 'user.send(msg)' + + const msg = buf.toString("utf8"); + if (msg == "ts") { + await user.send(fs.readFileSync("exects.txt")); + } + else { + await user.send("Echoing: " + msg); + } + }); + + // Broadcast message to all connected users. + // ctx.users.get().forEach(u => u.send("Hello")); + + // Send message to specific user (identified by public key). + // await ctx.users.find().send("Hello"); + + // Peer messages example. + // if (!ctx.readonly) { + // ctx.peers.onMessage((peer, msg) => { + // console.log(msg + " from " + peer.pubKey); + // }) + // await ctx.peers.send("Hello"); + // } +} + const hpc = new HotPocketContract(); - -//console.log("===Echo contract started==="); - -// We just save execution timestamp as an example state file change. -if (!hpc.readonly) - fs.appendFileSync("exects.txt", "ts:" + hpc.timestamp + "\n"); - -hpc.events.on("user_message", async (pubKey, message) => { - const userInput = message.toString("utf8"); - const user = hpc.users[pubKey]; - if (userInput == "ts") { - user.sendOutput(fs.readFileSync("exects.txt")); - } - else { - user.sendOutput("Echoing: " + userInput); - } -}); - -hpc.events.on("all_users_completed", () => { - hpc.terminate(); -}); - -// Developer should call run method after all the event subscriptions are done. -hpc.run(); - -// Control message sending and receiving template. -// const hp = hpc.control; -// hpc.events.on('control_message', (msg) => { -// console.log('control msg - ' + msg); -// hp.sendOutput(msg); -// }) - -// Npl message sending and receiving template. -// const npl = hpc.npl; -// if (npl) { -// let i = 0; -// let interval = setInterval(() => { -// npl.sendOutput(`npl${i} from contract`); -// if (i == 5) { -// clearInterval(interval); -// } -// i++; -// }, 500); - -// hpc.events.on("npl_message", msg => { -// if (msg) { -// console.log(msg); -// } -// }); -// } - -//console.log("===Echo contract ended==="); +hpc.init(echoContract); \ No newline at end of file diff --git a/examples/nodejs_contract/file_contract.js b/examples/nodejs_contract/file_contract.js index a7f3bad2..b11ce475 100644 --- a/examples/nodejs_contract/file_contract.js +++ b/examples/nodejs_contract/file_contract.js @@ -2,29 +2,20 @@ const { HotPocketContract } = require("./hp-contract-lib"); const fs = require('fs'); const bson = require('bson'); -const hpc = new HotPocketContract(); - -//console.log("===File contract started==="); - -Object.keys(hpc.users).forEach(function (key) { - const user = hpc.users[key]; - - user.readInput().then(input => { - if (!input) - return; - - const msg = bson.deserialize(input); +const fileContract = (ctx) => { + ctx.users.onMessage(async (user, buf) => { + const msg = bson.deserialize(buf); if (msg.type == "upload") { if (fs.existsSync(msg.fileName)) { - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "uploadResult", status: "already_exists", fileName: msg.fileName })); } else if (msg.content.length > 10 * 1024 * 1024) { // 10MB - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "uploadResult", status: "too_large", fileName: msg.fileName @@ -35,7 +26,7 @@ Object.keys(hpc.users).forEach(function (key) { // Save the file. fs.writeFileSync(msg.fileName, msg.content.buffer); - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "uploadResult", status: "ok", fileName: msg.fileName @@ -45,14 +36,14 @@ Object.keys(hpc.users).forEach(function (key) { else if (msg.type == "delete") { if (fs.existsSync(msg.fileName)) { fs.unlinkSync(msg.fileName); - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "deleteResult", status: "ok", fileName: msg.fileName })); } else { - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "deleteResult", status: "not_found", fileName: msg.fileName @@ -62,7 +53,7 @@ Object.keys(hpc.users).forEach(function (key) { else if (msg.type == "download") { if (fs.existsSync(msg.fileName)) { const fileContent = fs.readFileSync(msg.fileName); - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "downloadResult", status: "ok", fileName: msg.fileName, @@ -70,7 +61,7 @@ Object.keys(hpc.users).forEach(function (key) { })); } else { - user.sendOutput(bson.serialize({ + await user.send(bson.serialize({ type: "downloadResult", status: "not_found", fileName: msg.fileName @@ -78,6 +69,7 @@ Object.keys(hpc.users).forEach(function (key) { } } }); -}); +}; -//console.log("===File contract ended==="); \ No newline at end of file +const hpc = new HotPocketContract(); +hpc.init(fileContract); diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 82d0af2b..8c3b525d 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -1,206 +1,377 @@ +const { EventEmitter } = require('events'); const fs = require('fs'); const MAX_SEQ_PACKET_SIZE = 128 * 1024; -let incompleteUserCount = 0; +class HotPocketContract { -function AsyncCallbackEmitter() { - this.callbacks = {}; + events = new EventEmitter(); + #controlChannel = null; - this.on = (event, callback) => { - if (!this.callbacks[event]) { - this.callbacks[event] = []; - } - this.callbacks[event].push(callback); - }; + init(contractFunc) { - this.emit = async (event, ...args) => { - let eventCallbacks = this.callbacks[event]; - if (eventCallbacks && eventCallbacks.length) { - await Promise.all(eventCallbacks.map(async callback => { - if (callback.constructor.name === 'AsyncFunction') { - await callback(...args); - } - else { - callback(...args); - } - })); - } - }; + if (this.#controlChannel) // Already initialized. + return; - this.removeAllListeners = () => { - this.callbacks = {}; - }; + // Parse HotPocket args. + const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); - this.removeListener = (event) => { - delete this.callbacks[event]; - }; + this.#controlChannel = new ControlChannel(hpargs.hpfd); + this.#executeContract(hpargs, contractFunc); + } + + #executeContract = (hpargs, contractFunc) => { + // Keeps track of all the tasks (promises) that must be awaited before the termination. + const pendingTasks = []; + + const users = new UsersCollection(hpargs.usrfd, pendingTasks, this.events); + const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events); + const executionContext = new ContractExecutionContext(hpargs, users, peers); + + this.events.emit("session_start"); + invokeCallback(contractFunc, executionContext).then(() => { + // Wait for any pending tasks added during execution. + Promise.all(pendingTasks).then(() => { + this.events.emit("session_end"); + this.#terminate(); + }); + }); + } + + #terminate = () => { + this.#controlChannel.send("Terminated") + this.#controlChannel.close(); + } } -function HotPocketContract() { - const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); - this.readonly = hpargs.readonly; - this.timestamp = hpargs.ts; - this.events = new AsyncCallbackEmitter(); +class ContractExecutionContext { - this.run = () => { - if (!this.readonly) { + constructor(hpargs, users, peers) { + this.readonly = hpargs.readonly; + this.timestamp = hpargs.ts; + this.users = users; + this.peers = peers; + + if (!hpargs.readonly) { const lclParts = hpargs.lcl.split("-"); this.lcl = { seqNo: parseInt(lclParts[0]), hash: lclParts[1] }; - - this.npl = new HotPocketNplChannel(this.events, hpargs.nplfd); } - - this.control = new HotPocketControlChannel(this.events, hpargs.hpfd); - - this.users = {}; - Object.keys(hpargs.usrfd).forEach((userPubKey) => { - this.users[userPubKey] = new HotPocketUserChannel(this.events, hpargs.usrfd[userPubKey], userPubKey); - incompleteUserCount++; - }); - - this.terminate = () => { - this.control.sendOutput("Terminated") - } - - if (!Object.keys(hpargs.usrfd).length) { - - this.events.emit("all_users_completed"); - } - }; + } } -function HotPocketUserChannel(events, fd, userPubKey) { - let socket = null; - if (fd > 0) { - socket = fs.createReadStream(null, { fd: fd }); +class UsersCollection { + + #users = {}; + #totalUsers = 0; + #pendingTasks = null + + constructor(usrfds, pendingTasks, events) { + const userKeys = Object.keys(usrfds); + + userKeys.forEach((pubKey) => { + const channel = new UserChannel(usrfds[pubKey]); + const user = new User(pubKey, channel); + this.#users[pubKey] = { + user: user, + channel: channel + } + }); + + this.#totalUsers = userKeys.length; + this.#pendingTasks = pendingTasks; + + events.on("session_end", () => Object.values(this.#users).forEach(u => u.channel.close())); + } + + // Returns the User for the specified pubkey. Returns null if not found. + find(pubKey) { + const u = this.#users[pubKey]; + return u && u.user; + } + + // Returns all the currently connected users. + get() { + return Object.values(this.#users).map(u => u.user); + } + + count() { + return Object.keys(this.#users).length; + } + + onMessage(callback) { + + if (this.#totalUsers == 0) + return Promise.resolve(); + + // We create a promise which would get resolved when all users' message emissions have completed. + const allUsersCompletedTask = new Promise(allUsersCompletionResolver => { + + let pendingUserCount = this.#totalUsers; + const userMessageTasks = []; + + const onUserMessage = (user, msg) => { + userMessageTasks.push(invokeCallback(callback, user, msg)); + }; + + const onUserComplete = () => { + pendingUserCount--; + if (pendingUserCount == 0) { + // All user message events has been emitted. + // Now start waiting for queued up user message callback completion. + Promise.all(userMessageTasks).then(allUsersCompletionResolver) + } + } + + // Register callback to consume all users messages. + Object.values(this.#users).forEach(u => { + u.channel.consume((msg) => onUserMessage(u.user, msg), onUserComplete); + }) + }); + + // We add the all users completed task to the global pending tasks list so the contract execution will not + // wrap up before this task is complete. + this.#pendingTasks.push(allUsersCompletedTask); + return allUsersCompletedTask; + } +} + +class User { + pubKey = null; + #channel = null; + + constructor(pubKey, channel) { + this.pubKey = pubKey; + this.#channel = channel; + } + + async send(msg) { + await this.#channel.send(msg); + } +} + +class UserChannel { + #readStream = null; + #fd = -1; + + constructor(fd) { + this.#fd = fd; + } + + consume(onMessage, onComplete) { + + this.#readStream = fs.createReadStream(null, { fd: this.#fd }); let dataParts = []; - let msgCount = -1; - let msgLen = -1; + let remainingMsgCount = -1; + let currentMsgLen = -1; let pos = 0; - socket.on("data", async (buf) => { + + // Read bytes from the given buffer. + const readBytes = (buf, pos, count) => { + if (pos + count > buf.byteLength) + return null; + return buf.slice(pos, pos + count); + } + + this.#readStream.on("data", (buf) => { pos = 0; - if (msgCount == -1) { + if (remainingMsgCount == -1) { const msgCountBuf = readBytes(buf, 0, 4) - msgCount = msgCountBuf.readUInt32BE(); + remainingMsgCount = msgCountBuf.readUInt32BE(); pos += 4; } + while (pos < buf.byteLength) { - if (msgLen == -1) { + if (currentMsgLen == -1) { const msgLenBuf = readBytes(buf, pos, 4); pos += 4; - msgLen = msgLenBuf.readUInt32BE(); + currentMsgLen = msgLenBuf.readUInt32BE(); } let possible_read_len; - if (((buf.byteLength - pos) - msgLen) >= 0) { + if (((buf.byteLength - pos) - currentMsgLen) >= 0) { // Can finish reading a full message. - possible_read_len = msgLen; - msgLen = -1; + possible_read_len = currentMsgLen; + currentMsgLen = -1; } else { // Only partial message is recieved. possible_read_len = buf.byteLength - pos - msgLen -= possible_read_len; + currentMsgLen -= possible_read_len; } const msgBuf = readBytes(buf, pos, possible_read_len); pos += possible_read_len; dataParts.push(msgBuf) - if (msgLen == -1) { - await events.emit("user_message", userPubKey, Buffer.concat(dataParts)); + if (currentMsgLen == -1) { + onMessage(Buffer.concat(dataParts)); dataParts = []; - msgCount-- + remainingMsgCount-- } } - if (msgCount == 0) { - msgCount = -1; - incompleteUserCount--; - if (incompleteUserCount == 0) { - events.emit("all_users_completed"); - } - events.emit("user_completed", userPubKey); + if (remainingMsgCount == 0) { + remainingMsgCount = -1; + onComplete(); } }); - - socket.on("error", (e) => { - events.emit("user_error", userPubKey, e); - }) } - // Read bytes from the given buffer. - const readBytes = function (buf, pos, count) { - if (pos + count > buf.byteLength) - return null; - return buf.slice(pos, pos + count); - } - - this.sendOutput = function (output) { - const outputStringBuf = Buffer.from(output); + send(msg) { + const messageBuf = Buffer.from(msg); let headerBuf = Buffer.alloc(4); // Writing message length in big endian format. - headerBuf.writeUInt32BE(outputStringBuf.byteLength) - fs.writeSync(fd, headerBuf); - fs.writeSync(fd, outputStringBuf); + headerBuf.writeUInt32BE(messageBuf.byteLength) + return writevAsync(this.#fd, [headerBuf, messageBuf]); + } + + close() { + this.#readStream && this.#readStream.close(); } } -function HotPocketNplChannel(events, fd) { +class PeersCollection { + #peers = {}; + #channel = null; + #readonly = false; + #pendingTasks = null; + + constructor(readonly, unl, nplfd, pendingTasks, events) { + this.#readonly = readonly; + this.#pendingTasks = pendingTasks; + + if (!readonly) { + unl.forEach(pubKey => { + this.#peers[pubKey] = new Peer(pubKey); + }); + + this.#channel = new NplChannel(nplfd); + events.on("session_end", () => this.#channel.close()); + } + } + + // Returns the Peer for the specified pubkey. Returns null if not found. + find(pubKey) { + return this.#peers[pubKey]; + } + + // Returns all the peers. + get() { + return Object.values(this.#peers); + } + + count() { + return Object.keys(this.#peers).length; + } + + // Registers for peer messages. + onMessage(callback) { + + if (this.#readonly) + throw "Peer messages not available in readonly mode."; + + this.#channel.consume((pubKey, msg) => { + this.#pendingTasks.push(invokeCallback(callback, this.#peers[pubKey], msg)); + }); + } + + // Broadcasts a message to all peers (including self). + async send(msg) { + if (this.#readonly) + throw "Peer messages not available in readonly mode."; + + await this.#channel.send(msg); + } +} + +class Peer { + pubKey = null; + + constructor(pubKey) { + this.pubKey = pubKey; + } +} + +class NplChannel { + + #readStream = null; + #fd = -1; + + constructor(fd) { + this.#fd = fd; + } + + consume(onMessage) { + + this.#readStream = fs.createReadStream(null, { fd: this.#fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); - let socket = null; - let isPubKeyReceived = false; - let pubKey; - if (fd > 0) { // From the hotpocket when sending the npl messages first it sends the pubkey of the particular node // and then the message, First data buffer is taken as pubkey and the second one as message, // then npl message object is constructed and the event is emmited. - socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); - socket.on("data", d => { - if (!isPubKeyReceived) { - pubKey = d.toString('hex'); - isPubKeyReceived = true; + let pubKey = null; + + this.#readStream.on("data", (data) => { + if (!pubKey) { + pubKey = data.toString('hex'); } else { - events.emit("npl_message", { - pubkey: pubKey, - input: d - }); + onMessage(pubKey, data); pubKey = null; - isPubKeyReceived = false; } }); - socket.on("error", (e) => { - events.emit("npl_error", e); - }); } - this.sendOutput = (output) => { - if (fd > 0) { - fs.writeSync(fd, output); - } + send(msg) { + const buf = Buffer.from(msg); + if (buf.length > MAX_SEQ_PACKET_SIZE) + throw ("Peer message exceeds max size " + MAX_SEQ_PACKET_SIZE); + return writeAsync(this.#fd, buf); + } + + close() { + this.#readStream && this.#readStream.close(); } } -function HotPocketControlChannel(events, fd) { - let socket = null; - if (fd > 0) { - socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); - socket.on("data", d => { - events.emit("control_message", d); - }); +class ControlChannel { - socket.on("error", (e) => { - events.emit("control_error", e); - }); + #readStream = null; + #fd = -1; + + constructor(fd) { + this.#fd = fd; } - this.sendOutput = (output) => { - if (fd > 0) { - fs.writeSync(fd, output); - } + consume(onMessage) { + this.#readStream = fs.createReadStream(null, { fd: this.#fd, highWaterMark: MAX_SEQ_PACKET_SIZE }); + this.#readStream.on("data", onMessage); + } + + send(msg) { + const buf = Buffer.from(msg); + if (buf.length > MAX_SEQ_PACKET_SIZE) + throw ("Control message exceeds max size " + MAX_SEQ_PACKET_SIZE); + return writeAsync(this.#fd, buf); + } + + close() { + this.#readStream && this.#readStream.close(); + } +} + +const writeAsync = (fd, buf) => new Promise(resolve => fs.write(fd, buf, resolve)); +const writevAsync = (fd, bufList) => new Promise(resolve => fs.writev(fd, bufList, resolve)); + +const invokeCallback = async (callback, ...args) => { + if (!callback) + return; + + if (callback.constructor.name === 'AsyncFunction') { + await callback(...args); + } + else { + callback(...args); } } diff --git a/src/sc.cpp b/src/sc.cpp index d66d55aa..c8469ad5 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -418,8 +418,8 @@ namespace sc { if (npl_msg.lcl == ctx.args.lcl) { - // Writing the public key to the contract's fd. - if (write(writefd, npl_msg.pubkey.data(), npl_msg.pubkey.size()) == -1) + // Writing the public key to the contract's fd (Skip first byte for key type prefix). + if (write(writefd, npl_msg.pubkey.data() + 1, npl_msg.pubkey.size() - 1) == -1) return -1; // Writing the message to the contract's fd. if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1) diff --git a/test/local-cluster/Dockerfile b/test/local-cluster/Dockerfile index f18238e7..91771870 100644 --- a/test/local-cluster/Dockerfile +++ b/test/local-cluster/Dockerfile @@ -3,7 +3,7 @@ FROM node:12.18.3-buster-slim RUN apt-get update -RUN apt-get install -y libgomp1 libssl-dev +RUN apt-get install -y libgomp1 libssl-dev gdb # Install shared libraries. # Copy shared libraries and register it.