Updated nodejs contract lib for streamed input reading. (#110)

This commit is contained in:
Ravin Perera
2020-08-21 15:27:25 +05:30
committed by GitHub
parent baf5d8b14a
commit 465573ad29
7 changed files with 93 additions and 50 deletions

View File

@@ -12,19 +12,16 @@ if (!hpc.readonly)
Object.keys(hpc.users).forEach(function (key) {
const user = hpc.users[key];
const inputBuf = user.readInput();
if (inputBuf) {
const userInput = inputBuf.toString("utf8");
user.readInput().then(inputBuf => {
if (inputBuf) {
const userInput = inputBuf.toString("utf8");
// Append user input to a state file if not in read only mode.
if (!hpc.readonly)
fs.appendFileSync("userinputs.txt", userInput + "\n");
if (userInput == "ts")
user.sendOutput(fs.readFileSync("exects.txt"));
else
user.sendOutput("Echoing: " + userInput);
}
if (userInput == "ts")
user.sendOutput(fs.readFileSync("exects.txt"));
else
user.sendOutput("Echoing: " + userInput);
}
})
});
//console.log("===Echo contract ended===");

View File

@@ -1,29 +1,30 @@
const { HotPocketContract } = require("./hp-contract-lib");
const fs = require('fs');
const bson = require('bson');
const hpc = new HotPocketContract();
//console.log("===File contract started===");
const hpargs = JSON.parse(fs.readFileSync(0, 'utf8'));
//console.log("Contract args received from hp: " + hpargs);
Object.keys(hpc.users).forEach(function (key) {
const user = hpc.users[key];
Object.keys(hpargs.usrfd).forEach(function (key) {
const userfds = hpargs.usrfd[key];
user.readInput().then(input => {
if (!input)
return;
if (userfds[0] != -1) {
const input = fs.readFileSync(userfds[0]);
const msg = bson.deserialize(input);
if (msg.type == "upload") {
if (fs.existsSync(msg.fileName)) {
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "uploadResult",
status: "already_exists",
fileName: msg.fileName
}));
}
else if (msg.content.length > 10 * 1024 * 1024) { // 10MB
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "uploadResult",
status: "too_large",
fileName: msg.fileName
@@ -33,8 +34,8 @@ Object.keys(hpargs.usrfd).forEach(function (key) {
// Save the file.
fs.writeFileSync(msg.fileName, msg.content.buffer);
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "uploadResult",
status: "ok",
fileName: msg.fileName
@@ -44,14 +45,14 @@ Object.keys(hpargs.usrfd).forEach(function (key) {
else if (msg.type == "delete") {
if (fs.existsSync(msg.fileName)) {
fs.unlinkSync(msg.fileName);
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "deleteResult",
status: "ok",
fileName: msg.fileName
}));
}
else {
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "deleteResult",
status: "not_found",
fileName: msg.fileName
@@ -61,7 +62,7 @@ Object.keys(hpargs.usrfd).forEach(function (key) {
else if (msg.type == "download") {
if (fs.existsSync(msg.fileName)) {
const fileContent = fs.readFileSync(msg.fileName);
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "downloadResult",
status: "ok",
fileName: msg.fileName,
@@ -69,14 +70,14 @@ Object.keys(hpargs.usrfd).forEach(function (key) {
}));
}
else {
fs.writeSync(userfds[1], bson.serialize({
user.sendOutput(bson.serialize({
type: "downloadResult",
status: "not_found",
fileName: msg.fileName
}));
}
}
}
});
});
//console.log("===File contract ended===");

View File

@@ -22,9 +22,42 @@ function HotPocketContract() {
});
}
// Helper function to asynchronously read a stream to the end and fill a buffer.
const drainStream = function (stream) {
return new Promise((resolve) => {
const dataParts = [];
const resolveBuffer = function () {
if (dataParts.length > 0)
return resolve(Buffer.concat(dataParts));
else
return resolve(null);
}
stream.on("data", d => {
dataParts.push(d);
});
stream.on('end', resolveBuffer);
stream.on("close", resolveBuffer);
stream.on("error", () => {
resolve(null);
});
});
}
function HotPocketChannel(infd, outfd) {
this.readInput = function () {
return infd == -1 ? null : fs.readFileSync(infd);
return new Promise((resolve) => {
if (infd == -1) {
resolve(null);
}
else {
const s = fs.createReadStream(null, { fd: infd });
drainStream(s).then(buf => resolve(buf));
}
});
}
this.sendOutput = function (output) {
@@ -33,9 +66,8 @@ function HotPocketChannel(infd, outfd) {
}
function HotPocketNplChannel(infd, outfd) {
this.readInput = function () {
if (infd == -1)
return null;
const parseNplInputs = function (buf) {
// Input may consist of multiple messages.
// Each message has the format:
@@ -43,7 +75,6 @@ function HotPocketNplChannel(infd, outfd) {
const inputs = []; // Peer inputs will be populated to this.
const buf = fs.readFileSync(infd);
let pos = 0;
while (pos < buf.byteLength) {
@@ -74,15 +105,27 @@ function HotPocketNplChannel(infd, outfd) {
return inputs;
}
this.sendOutput = function (output) {
fs.writeFileSync(outfd, output);
}
const readBytes = function (buf, pos, count) {
if (pos + count > buf.byteLength)
return null;
return buf.slice(pos, pos + count);
}
this.readInput = function () {
return new Promise((resolve) => {
if (infd == -1) {
resolve(null);
}
else {
const s = fs.createReadStream(null, { fd: infd });
drainStream(s).then(buf => resolve(parseNplInputs(buf)));
}
});
}
this.sendOutput = function (output) {
fs.writeFileSync(outfd, output);
}
}
module.exports = {