Revamped NodeJS library examples. (#151)

This commit is contained in:
Ravin Perera
2020-11-16 17:08:47 +05:30
committed by GitHub
parent 645f0023a0
commit 0098c3ddab
10 changed files with 487 additions and 283 deletions

View File

@@ -1,56 +1,42 @@
const { HotPocketContract } = require("./hp-contract-lib");
const fs = require('fs');
// HP smart contract is defined as a function which takes HP ExecutionContext as an argument.
// HP considers execution as complete, when this function completes and all the user message callbacks are complete.
const echoContract = (ctx) => {
// We just save execution timestamp as an example state file change.
if (!ctx.readonly)
fs.appendFileSync("exects.txt", "ts:" + ctx.timestamp + "\n");
ctx.users.onMessage(async (user, buf) => {
// This user's pubkey can be accessed from 'user.pubKey'
// A reply message can be sent to the user by 'user.send(msg)'
const msg = buf.toString("utf8");
if (msg == "ts") {
await user.send(fs.readFileSync("exects.txt"));
}
else {
await user.send("Echoing: " + msg);
}
});
// Broadcast message to all connected users.
// ctx.users.get().forEach(u => u.send("Hello"));
// Send message to specific user (identified by public key).
// await ctx.users.find(<PubkeyHex>).send("Hello");
// Peer messages example.
// if (!ctx.readonly) {
// ctx.peers.onMessage((peer, msg) => {
// console.log(msg + " from " + peer.pubKey);
// })
// await ctx.peers.send("Hello");
// }
}
const hpc = new HotPocketContract();
//console.log("===Echo contract started===");
// We just save execution timestamp as an example state file change.
if (!hpc.readonly)
fs.appendFileSync("exects.txt", "ts:" + hpc.timestamp + "\n");
hpc.events.on("user_message", async (pubKey, message) => {
const userInput = message.toString("utf8");
const user = hpc.users[pubKey];
if (userInput == "ts") {
user.sendOutput(fs.readFileSync("exects.txt"));
}
else {
user.sendOutput("Echoing: " + userInput);
}
});
hpc.events.on("all_users_completed", () => {
hpc.terminate();
});
// Developer should call run method after all the event subscriptions are done.
hpc.run();
// Control message sending and receiving template.
// const hp = hpc.control;
// hpc.events.on('control_message', (msg) => {
// console.log('control msg - ' + msg);
// hp.sendOutput(msg);
// })
// Npl message sending and receiving template.
// const npl = hpc.npl;
// if (npl) {
// let i = 0;
// let interval = setInterval(() => {
// npl.sendOutput(`npl${i} from contract`);
// if (i == 5) {
// clearInterval(interval);
// }
// i++;
// }, 500);
// hpc.events.on("npl_message", msg => {
// if (msg) {
// console.log(msg);
// }
// });
// }
//console.log("===Echo contract ended===");
hpc.init(echoContract);

View File

@@ -2,29 +2,20 @@ const { HotPocketContract } = require("./hp-contract-lib");
const fs = require('fs');
const bson = require('bson');
const hpc = new HotPocketContract();
//console.log("===File contract started===");
Object.keys(hpc.users).forEach(function (key) {
const user = hpc.users[key];
user.readInput().then(input => {
if (!input)
return;
const msg = bson.deserialize(input);
const fileContract = (ctx) => {
ctx.users.onMessage(async (user, buf) => {
const msg = bson.deserialize(buf);
if (msg.type == "upload") {
if (fs.existsSync(msg.fileName)) {
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "uploadResult",
status: "already_exists",
fileName: msg.fileName
}));
}
else if (msg.content.length > 10 * 1024 * 1024) { // 10MB
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "uploadResult",
status: "too_large",
fileName: msg.fileName
@@ -35,7 +26,7 @@ Object.keys(hpc.users).forEach(function (key) {
// Save the file.
fs.writeFileSync(msg.fileName, msg.content.buffer);
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "uploadResult",
status: "ok",
fileName: msg.fileName
@@ -45,14 +36,14 @@ Object.keys(hpc.users).forEach(function (key) {
else if (msg.type == "delete") {
if (fs.existsSync(msg.fileName)) {
fs.unlinkSync(msg.fileName);
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "deleteResult",
status: "ok",
fileName: msg.fileName
}));
}
else {
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "deleteResult",
status: "not_found",
fileName: msg.fileName
@@ -62,7 +53,7 @@ Object.keys(hpc.users).forEach(function (key) {
else if (msg.type == "download") {
if (fs.existsSync(msg.fileName)) {
const fileContent = fs.readFileSync(msg.fileName);
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "downloadResult",
status: "ok",
fileName: msg.fileName,
@@ -70,7 +61,7 @@ Object.keys(hpc.users).forEach(function (key) {
}));
}
else {
user.sendOutput(bson.serialize({
await user.send(bson.serialize({
type: "downloadResult",
status: "not_found",
fileName: msg.fileName
@@ -78,6 +69,7 @@ Object.keys(hpc.users).forEach(function (key) {
}
}
});
});
};
//console.log("===File contract ended===");
const hpc = new HotPocketContract();
hpc.init(fileContract);

View File

@@ -1,206 +1,377 @@
const { EventEmitter } = require('events');
const fs = require('fs');
const MAX_SEQ_PACKET_SIZE = 128 * 1024;
let incompleteUserCount = 0;
class HotPocketContract {
function AsyncCallbackEmitter() {
this.callbacks = {};
events = new EventEmitter();
#controlChannel = null;
this.on = (event, callback) => {
if (!this.callbacks[event]) {
this.callbacks[event] = [];
}
this.callbacks[event].push(callback);
};
init(contractFunc) {
this.emit = async (event, ...args) => {
let eventCallbacks = this.callbacks[event];
if (eventCallbacks && eventCallbacks.length) {
await Promise.all(eventCallbacks.map(async callback => {
if (callback.constructor.name === 'AsyncFunction') {
await callback(...args);
}
else {
callback(...args);
}
}));
}
};
if (this.#controlChannel) // Already initialized.
return;
this.removeAllListeners = () => {
this.callbacks = {};
};
// Parse HotPocket args.
const hpargs = JSON.parse(fs.readFileSync(0, 'utf8'));
this.removeListener = (event) => {
delete this.callbacks[event];
};
this.#controlChannel = new ControlChannel(hpargs.hpfd);
this.#executeContract(hpargs, contractFunc);
}
#executeContract = (hpargs, contractFunc) => {
// Keeps track of all the tasks (promises) that must be awaited before the termination.
const pendingTasks = [];
const users = new UsersCollection(hpargs.usrfd, pendingTasks, this.events);
const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events);
const executionContext = new ContractExecutionContext(hpargs, users, peers);
this.events.emit("session_start");
invokeCallback(contractFunc, executionContext).then(() => {
// Wait for any pending tasks added during execution.
Promise.all(pendingTasks).then(() => {
this.events.emit("session_end");
this.#terminate();
});
});
}
#terminate = () => {
this.#controlChannel.send("Terminated")
this.#controlChannel.close();
}
}
function HotPocketContract() {
const hpargs = JSON.parse(fs.readFileSync(0, 'utf8'));
this.readonly = hpargs.readonly;
this.timestamp = hpargs.ts;
this.events = new AsyncCallbackEmitter();
class ContractExecutionContext {
this.run = () => {
if (!this.readonly) {
constructor(hpargs, users, peers) {
this.readonly = hpargs.readonly;
this.timestamp = hpargs.ts;
this.users = users;
this.peers = peers;
if (!hpargs.readonly) {
const lclParts = hpargs.lcl.split("-");
this.lcl = {
seqNo: parseInt(lclParts[0]),
hash: lclParts[1]
};
this.npl = new HotPocketNplChannel(this.events, hpargs.nplfd);
}
this.control = new HotPocketControlChannel(this.events, hpargs.hpfd);
this.users = {};
Object.keys(hpargs.usrfd).forEach((userPubKey) => {
this.users[userPubKey] = new HotPocketUserChannel(this.events, hpargs.usrfd[userPubKey], userPubKey);
incompleteUserCount++;
});
this.terminate = () => {
this.control.sendOutput("Terminated")
}
if (!Object.keys(hpargs.usrfd).length) {
this.events.emit("all_users_completed");
}
};
}
}
function HotPocketUserChannel(events, fd, userPubKey) {
let socket = null;
if (fd > 0) {
socket = fs.createReadStream(null, { fd: fd });
class UsersCollection {
#users = {};
#totalUsers = 0;
#pendingTasks = null
constructor(usrfds, pendingTasks, events) {
const userKeys = Object.keys(usrfds);
userKeys.forEach((pubKey) => {
const channel = new UserChannel(usrfds[pubKey]);
const user = new User(pubKey, channel);
this.#users[pubKey] = {
user: user,
channel: channel
}
});
this.#totalUsers = userKeys.length;
this.#pendingTasks = pendingTasks;
events.on("session_end", () => Object.values(this.#users).forEach(u => u.channel.close()));
}
// Returns the User for the specified pubkey. Returns null if not found.
find(pubKey) {
const u = this.#users[pubKey];
return u && u.user;
}
// Returns all the currently connected users.
get() {
return Object.values(this.#users).map(u => u.user);
}
count() {
return Object.keys(this.#users).length;
}
onMessage(callback) {
if (this.#totalUsers == 0)
return Promise.resolve();
// We create a promise which would get resolved when all users' message emissions have completed.
const allUsersCompletedTask = new Promise(allUsersCompletionResolver => {
let pendingUserCount = this.#totalUsers;
const userMessageTasks = [];
const onUserMessage = (user, msg) => {
userMessageTasks.push(invokeCallback(callback, user, msg));
};
const onUserComplete = () => {
pendingUserCount--;
if (pendingUserCount == 0) {
// All user message events has been emitted.
// Now start waiting for queued up user message callback completion.
Promise.all(userMessageTasks).then(allUsersCompletionResolver)
}
}
// Register callback to consume all users messages.
Object.values(this.#users).forEach(u => {
u.channel.consume((msg) => onUserMessage(u.user, msg), onUserComplete);
})
});
// We add the all users completed task to the global pending tasks list so the contract execution will not
// wrap up before this task is complete.
this.#pendingTasks.push(allUsersCompletedTask);
return allUsersCompletedTask;
}
}
class User {
pubKey = null;
#channel = null;
constructor(pubKey, channel) {
this.pubKey = pubKey;
this.#channel = channel;
}
async send(msg) {
await this.#channel.send(msg);
}
}
class UserChannel {
#readStream = null;
#fd = -1;
constructor(fd) {
this.#fd = fd;
}
consume(onMessage, onComplete) {
this.#readStream = fs.createReadStream(null, { fd: this.#fd });
let dataParts = [];
let msgCount = -1;
let msgLen = -1;
let remainingMsgCount = -1;
let currentMsgLen = -1;
let pos = 0;
socket.on("data", async (buf) => {
// Read bytes from the given buffer.
const readBytes = (buf, pos, count) => {
if (pos + count > buf.byteLength)
return null;
return buf.slice(pos, pos + count);
}
this.#readStream.on("data", (buf) => {
pos = 0;
if (msgCount == -1) {
if (remainingMsgCount == -1) {
const msgCountBuf = readBytes(buf, 0, 4)
msgCount = msgCountBuf.readUInt32BE();
remainingMsgCount = msgCountBuf.readUInt32BE();
pos += 4;
}
while (pos < buf.byteLength) {
if (msgLen == -1) {
if (currentMsgLen == -1) {
const msgLenBuf = readBytes(buf, pos, 4);
pos += 4;
msgLen = msgLenBuf.readUInt32BE();
currentMsgLen = msgLenBuf.readUInt32BE();
}
let possible_read_len;
if (((buf.byteLength - pos) - msgLen) >= 0) {
if (((buf.byteLength - pos) - currentMsgLen) >= 0) {
// Can finish reading a full message.
possible_read_len = msgLen;
msgLen = -1;
possible_read_len = currentMsgLen;
currentMsgLen = -1;
} else {
// Only partial message is recieved.
possible_read_len = buf.byteLength - pos
msgLen -= possible_read_len;
currentMsgLen -= possible_read_len;
}
const msgBuf = readBytes(buf, pos, possible_read_len);
pos += possible_read_len;
dataParts.push(msgBuf)
if (msgLen == -1) {
await events.emit("user_message", userPubKey, Buffer.concat(dataParts));
if (currentMsgLen == -1) {
onMessage(Buffer.concat(dataParts));
dataParts = [];
msgCount--
remainingMsgCount--
}
}
if (msgCount == 0) {
msgCount = -1;
incompleteUserCount--;
if (incompleteUserCount == 0) {
events.emit("all_users_completed");
}
events.emit("user_completed", userPubKey);
if (remainingMsgCount == 0) {
remainingMsgCount = -1;
onComplete();
}
});
socket.on("error", (e) => {
events.emit("user_error", userPubKey, e);
})
}
// Read bytes from the given buffer.
const readBytes = function (buf, pos, count) {
if (pos + count > buf.byteLength)
return null;
return buf.slice(pos, pos + count);
}
this.sendOutput = function (output) {
const outputStringBuf = Buffer.from(output);
send(msg) {
const messageBuf = Buffer.from(msg);
let headerBuf = Buffer.alloc(4);
// Writing message length in big endian format.
headerBuf.writeUInt32BE(outputStringBuf.byteLength)
fs.writeSync(fd, headerBuf);
fs.writeSync(fd, outputStringBuf);
headerBuf.writeUInt32BE(messageBuf.byteLength)
return writevAsync(this.#fd, [headerBuf, messageBuf]);
}
close() {
this.#readStream && this.#readStream.close();
}
}
function HotPocketNplChannel(events, fd) {
class PeersCollection {
#peers = {};
#channel = null;
#readonly = false;
#pendingTasks = null;
constructor(readonly, unl, nplfd, pendingTasks, events) {
this.#readonly = readonly;
this.#pendingTasks = pendingTasks;
if (!readonly) {
unl.forEach(pubKey => {
this.#peers[pubKey] = new Peer(pubKey);
});
this.#channel = new NplChannel(nplfd);
events.on("session_end", () => this.#channel.close());
}
}
// Returns the Peer for the specified pubkey. Returns null if not found.
find(pubKey) {
return this.#peers[pubKey];
}
// Returns all the peers.
get() {
return Object.values(this.#peers);
}
count() {
return Object.keys(this.#peers).length;
}
// Registers for peer messages.
onMessage(callback) {
if (this.#readonly)
throw "Peer messages not available in readonly mode.";
this.#channel.consume((pubKey, msg) => {
this.#pendingTasks.push(invokeCallback(callback, this.#peers[pubKey], msg));
});
}
// Broadcasts a message to all peers (including self).
async send(msg) {
if (this.#readonly)
throw "Peer messages not available in readonly mode.";
await this.#channel.send(msg);
}
}
class Peer {
pubKey = null;
constructor(pubKey) {
this.pubKey = pubKey;
}
}
class NplChannel {
#readStream = null;
#fd = -1;
constructor(fd) {
this.#fd = fd;
}
consume(onMessage) {
this.#readStream = fs.createReadStream(null, { fd: this.#fd, highWaterMark: MAX_SEQ_PACKET_SIZE });
let socket = null;
let isPubKeyReceived = false;
let pubKey;
if (fd > 0) {
// From the hotpocket when sending the npl messages first it sends the pubkey of the particular node
// and then the message, First data buffer is taken as pubkey and the second one as message,
// then npl message object is constructed and the event is emmited.
socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE });
socket.on("data", d => {
if (!isPubKeyReceived) {
pubKey = d.toString('hex');
isPubKeyReceived = true;
let pubKey = null;
this.#readStream.on("data", (data) => {
if (!pubKey) {
pubKey = data.toString('hex');
}
else {
events.emit("npl_message", {
pubkey: pubKey,
input: d
});
onMessage(pubKey, data);
pubKey = null;
isPubKeyReceived = false;
}
});
socket.on("error", (e) => {
events.emit("npl_error", e);
});
}
this.sendOutput = (output) => {
if (fd > 0) {
fs.writeSync(fd, output);
}
send(msg) {
const buf = Buffer.from(msg);
if (buf.length > MAX_SEQ_PACKET_SIZE)
throw ("Peer message exceeds max size " + MAX_SEQ_PACKET_SIZE);
return writeAsync(this.#fd, buf);
}
close() {
this.#readStream && this.#readStream.close();
}
}
function HotPocketControlChannel(events, fd) {
let socket = null;
if (fd > 0) {
socket = fs.createReadStream(null, { fd: fd, highWaterMark: MAX_SEQ_PACKET_SIZE });
socket.on("data", d => {
events.emit("control_message", d);
});
class ControlChannel {
socket.on("error", (e) => {
events.emit("control_error", e);
});
#readStream = null;
#fd = -1;
constructor(fd) {
this.#fd = fd;
}
this.sendOutput = (output) => {
if (fd > 0) {
fs.writeSync(fd, output);
}
consume(onMessage) {
this.#readStream = fs.createReadStream(null, { fd: this.#fd, highWaterMark: MAX_SEQ_PACKET_SIZE });
this.#readStream.on("data", onMessage);
}
send(msg) {
const buf = Buffer.from(msg);
if (buf.length > MAX_SEQ_PACKET_SIZE)
throw ("Control message exceeds max size " + MAX_SEQ_PACKET_SIZE);
return writeAsync(this.#fd, buf);
}
close() {
this.#readStream && this.#readStream.close();
}
}
const writeAsync = (fd, buf) => new Promise(resolve => fs.write(fd, buf, resolve));
const writevAsync = (fd, bufList) => new Promise(resolve => fs.writev(fd, bufList, resolve));
const invokeCallback = async (callback, ...args) => {
if (!callback)
return;
if (callback.constructor.name === 'AsyncFunction') {
await callback(...args);
}
else {
callback(...args);
}
}