mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Refactored node contract lib control flow. (#176)
This commit is contained in:
@@ -2,36 +2,43 @@ const { HotPocketContract } = require("./hp-contract-lib");
|
||||
const fs = require('fs');
|
||||
|
||||
// HP smart contract is defined as a function which takes HP ExecutionContext as an argument.
|
||||
// HP considers execution as complete, when this function completes and all the callbacks are complete.
|
||||
// HP considers execution as complete, when this function completes and all the peer message callbacks are complete.
|
||||
const echoContract = async (ctx) => {
|
||||
|
||||
// We just save execution timestamp as an example state file change.
|
||||
if (!ctx.readonly)
|
||||
fs.appendFileSync("exects.txt", "ts:" + ctx.timestamp + "\n");
|
||||
|
||||
// This will return after all user messages are processed.
|
||||
await ctx.users.onMessage(async (user, buf) => {
|
||||
// Collection of user input handler promises to wait for.
|
||||
const inputHandlers = [];
|
||||
|
||||
for (const user of ctx.users.list()) {
|
||||
|
||||
// This user's pubkey can be accessed from 'user.pubKey'
|
||||
// A reply message can be sent to the user by 'user.send(msg)'
|
||||
|
||||
for (const input of user.inputs) {
|
||||
|
||||
const msg = buf.toString();
|
||||
if (msg == "ts") {
|
||||
await user.send(fs.readFileSync("exects.txt"));
|
||||
}
|
||||
else {
|
||||
await user.send("Echoing: " + msg);
|
||||
}
|
||||
});
|
||||
inputHandlers.push(new Promise(async (resolve) => {
|
||||
|
||||
// Get list of all users who are connected.
|
||||
// ctx.users.get();
|
||||
const buf = await ctx.users.read(input);
|
||||
const msg = buf.toString();
|
||||
|
||||
if (msg == "ts")
|
||||
await user.send(fs.readFileSync("exects.txt"));
|
||||
else
|
||||
await user.send("Echoing: " + msg);
|
||||
|
||||
resolve();
|
||||
}));
|
||||
}
|
||||
}
|
||||
await Promise.all(inputHandlers);
|
||||
|
||||
// Get the user identified by public key.
|
||||
// ctx.users.find("<PubkeyHex>");
|
||||
|
||||
// Get list of all peers in the cluster.
|
||||
// ctx.peers.get();
|
||||
// ctx.peers.list();
|
||||
|
||||
// Get the peer identified by public key.
|
||||
// ctx.peers.find("<PubkeyHex>");
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
const { EventEmitter } = require('events');
|
||||
const fs = require('fs');
|
||||
|
||||
const MAX_SEQ_PACKET_SIZE = 128 * 1024;
|
||||
@@ -10,7 +9,6 @@ Object.freeze(CONTROL_MESSAGE);
|
||||
|
||||
class HotPocketContract {
|
||||
|
||||
events = new EventEmitter();
|
||||
#controlChannel = null;
|
||||
|
||||
init(contractFunc) {
|
||||
@@ -29,16 +27,16 @@ class HotPocketContract {
|
||||
#executeContract = (hpargs, contractFunc) => {
|
||||
// Keeps track of all the tasks (promises) that must be awaited before the termination.
|
||||
const pendingTasks = [];
|
||||
const nplChannel = new NplChannel(hpargs.nplfd);
|
||||
|
||||
const users = new UsersCollection(hpargs.userinfd, hpargs.users);
|
||||
const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events);
|
||||
const peers = new PeersCollection(hpargs.readonly, hpargs.unl, nplChannel, pendingTasks);
|
||||
const executionContext = new ContractExecutionContext(hpargs, users, peers, this.#controlChannel);
|
||||
|
||||
this.events.emit("session_start");
|
||||
invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => {
|
||||
// Wait for any pending tasks added during execution.
|
||||
Promise.all(pendingTasks).catch(errHandler).finally(() => {
|
||||
this.events.emit("session_end");
|
||||
nplChannel.close();
|
||||
this.#terminate();
|
||||
});
|
||||
});
|
||||
@@ -73,85 +71,51 @@ class ContractExecutionContext {
|
||||
class UsersCollection {
|
||||
|
||||
#users = {};
|
||||
#totalUsers = 0;
|
||||
#infd = null;
|
||||
|
||||
constructor(userInputsFd, usersObj) {
|
||||
const users = Object.entries(usersObj);
|
||||
this.#infd = userInputsFd;
|
||||
|
||||
users.forEach(([pubKey, arr]) => {
|
||||
Object.entries(usersObj).forEach(([pubKey, arr]) => {
|
||||
|
||||
const outfd = arr[0]; // First array element is the output fd.
|
||||
arr.splice(0, 1); // Remove first element (output fd). The rest are pairs of msg offset/length tuples.
|
||||
|
||||
const channel = new UserChannel(userInputsFd, outfd, arr);
|
||||
const user = new User(pubKey, channel);
|
||||
|
||||
this.#users[pubKey] = {
|
||||
user: user,
|
||||
channel: channel
|
||||
}
|
||||
const channel = new UserChannel(outfd);
|
||||
this.#users[pubKey] = new User(pubKey, channel, arr);
|
||||
});
|
||||
|
||||
this.#totalUsers = users.length;
|
||||
}
|
||||
|
||||
// Returns the User for the specified pubkey. Returns null if not found.
|
||||
find(pubKey) {
|
||||
const u = this.#users[pubKey];
|
||||
return u && u.user;
|
||||
return this.#users[pubKey]
|
||||
}
|
||||
|
||||
// Returns all the currently connected users.
|
||||
get() {
|
||||
return Object.values(this.#users).map(u => u.user);
|
||||
list() {
|
||||
return Object.values(this.#users);
|
||||
}
|
||||
|
||||
count() {
|
||||
return Object.keys(this.#users).length;
|
||||
}
|
||||
|
||||
async onMessage(callback) {
|
||||
|
||||
if (this.#totalUsers == 0) {
|
||||
await Promise.resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
// We create a promise which would get resolved when all users' message handling have completed.
|
||||
const allUsersCompletedTask = new Promise(resolve => {
|
||||
|
||||
let pendingUserCount = this.#totalUsers;
|
||||
const userMessageTasks = [];
|
||||
|
||||
const onUserMessage = (user, msg) => {
|
||||
userMessageTasks.push(invokeCallback(callback, user, msg));
|
||||
};
|
||||
|
||||
const onUserComplete = () => {
|
||||
pendingUserCount--;
|
||||
if (pendingUserCount == 0) {
|
||||
// All user message events has been emitted.
|
||||
// Now start waiting for queued up user message callback completion.
|
||||
Promise.all(userMessageTasks).catch(errHandler).finally(resolve);
|
||||
}
|
||||
}
|
||||
|
||||
// Register callback to consume all users messages.
|
||||
Object.values(this.#users).forEach(u => {
|
||||
u.channel.consume((msg) => onUserMessage(u.user, msg), onUserComplete);
|
||||
})
|
||||
});
|
||||
|
||||
await allUsersCompletedTask;
|
||||
async read(input) {
|
||||
const [offset, size] = input;
|
||||
const buf = Buffer.alloc(size);
|
||||
await readAsync(this.#infd, buf, offset, size);
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
|
||||
class User {
|
||||
pubKey = null;
|
||||
inputs = null;
|
||||
#channel = null;
|
||||
|
||||
constructor(pubKey, channel) {
|
||||
constructor(pubKey, channel, inputs) {
|
||||
this.pubKey = pubKey;
|
||||
this.inputs = inputs;
|
||||
this.#channel = channel;
|
||||
}
|
||||
|
||||
@@ -161,27 +125,10 @@ class User {
|
||||
}
|
||||
|
||||
class UserChannel {
|
||||
#readStream = null;
|
||||
#infd = -1;
|
||||
#outfd = -1;
|
||||
#inputs = null;
|
||||
|
||||
constructor(infd, outfd, inputs) {
|
||||
this.#infd = infd;
|
||||
constructor(outfd) {
|
||||
this.#outfd = outfd;
|
||||
this.#inputs = inputs;
|
||||
}
|
||||
|
||||
consume(onMessage, onComplete) {
|
||||
|
||||
// Each input is 2 element array of [offset, length].
|
||||
for (const [offset, size] of this.#inputs) {
|
||||
const buf = Buffer.alloc(size);
|
||||
fs.readSync(this.#infd, buf, 0, size, offset);
|
||||
onMessage(buf);
|
||||
}
|
||||
|
||||
onComplete();
|
||||
}
|
||||
|
||||
send(msg) {
|
||||
@@ -199,7 +146,7 @@ class PeersCollection {
|
||||
#readonly = false;
|
||||
#pendingTasks = null;
|
||||
|
||||
constructor(readonly, unl, nplfd, pendingTasks, events) {
|
||||
constructor(readonly, unl, channel, pendingTasks) {
|
||||
this.#readonly = readonly;
|
||||
this.#pendingTasks = pendingTasks;
|
||||
|
||||
@@ -208,8 +155,7 @@ class PeersCollection {
|
||||
this.#peers[pubKey] = new Peer(pubKey);
|
||||
});
|
||||
|
||||
this.#channel = new NplChannel(nplfd);
|
||||
events.on("session_end", () => this.#channel.close());
|
||||
this.#channel = channel;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +165,7 @@ class PeersCollection {
|
||||
}
|
||||
|
||||
// Returns all the peers.
|
||||
get() {
|
||||
list() {
|
||||
return Object.values(this.#peers);
|
||||
}
|
||||
|
||||
@@ -328,6 +274,7 @@ class ControlChannel {
|
||||
|
||||
const writeAsync = (fd, buf) => new Promise(resolve => fs.write(fd, buf, resolve));
|
||||
const writevAsync = (fd, bufList) => new Promise(resolve => fs.writev(fd, bufList, resolve));
|
||||
const readAsync = (fd, buf, offset, size) => new Promise(resolve => fs.read(fd, buf, 0, size, offset, resolve));
|
||||
|
||||
const invokeCallback = async (callback, ...args) => {
|
||||
if (!callback)
|
||||
|
||||
@@ -885,6 +885,7 @@ namespace consensus
|
||||
else
|
||||
{
|
||||
// Populate the input content into the bufmap.
|
||||
// It's VERY important that we preserve the proposal input hash order when feeding to the contract as well.
|
||||
candidate_user_input &cand_input = itr->second;
|
||||
sc::contract_iobufs &contract_user = bufmap[cand_input.userpubkey];
|
||||
contract_user.inputs.push_back(cand_input.input);
|
||||
|
||||
Reference in New Issue
Block a user