diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 3f4a340f..c1b3897b 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -12,19 +12,16 @@ if (!hpc.readonly) Object.keys(hpc.users).forEach(function (key) { const user = hpc.users[key]; - const inputBuf = user.readInput(); - if (inputBuf) { - const userInput = inputBuf.toString("utf8"); + user.readInput().then(inputBuf => { + if (inputBuf) { + const userInput = inputBuf.toString("utf8"); - // Append user input to a state file if not in read only mode. - if (!hpc.readonly) - fs.appendFileSync("userinputs.txt", userInput + "\n"); - - if (userInput == "ts") - user.sendOutput(fs.readFileSync("exects.txt")); - else - user.sendOutput("Echoing: " + userInput); - } + if (userInput == "ts") + user.sendOutput(fs.readFileSync("exects.txt")); + else + user.sendOutput("Echoing: " + userInput); + } + }) }); //console.log("===Echo contract ended==="); diff --git a/examples/nodejs_contract/file_contract.js b/examples/nodejs_contract/file_contract.js index a6acaa2e..a7f3bad2 100644 --- a/examples/nodejs_contract/file_contract.js +++ b/examples/nodejs_contract/file_contract.js @@ -1,29 +1,30 @@ +const { HotPocketContract } = require("./hp-contract-lib"); const fs = require('fs'); const bson = require('bson'); +const hpc = new HotPocketContract(); + //console.log("===File contract started==="); -const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); -//console.log("Contract args received from hp: " + hpargs); +Object.keys(hpc.users).forEach(function (key) { + const user = hpc.users[key]; -Object.keys(hpargs.usrfd).forEach(function (key) { - const userfds = hpargs.usrfd[key]; + user.readInput().then(input => { + if (!input) + return; - if (userfds[0] != -1) { - - const input = fs.readFileSync(userfds[0]); const msg = bson.deserialize(input); if (msg.type == "upload") { if (fs.existsSync(msg.fileName)) { - fs.writeSync(userfds[1], bson.serialize({ + user.sendOutput(bson.serialize({ type: "uploadResult", status: "already_exists", fileName: msg.fileName })); } else if (msg.content.length > 10 * 1024 * 1024) { // 10MB - fs.writeSync(userfds[1], bson.serialize({ + user.sendOutput(bson.serialize({ type: "uploadResult", status: "too_large", fileName: msg.fileName @@ -33,8 +34,8 @@ Object.keys(hpargs.usrfd).forEach(function (key) { // Save the file. fs.writeFileSync(msg.fileName, msg.content.buffer); - - fs.writeSync(userfds[1], bson.serialize({ + + user.sendOutput(bson.serialize({ type: "uploadResult", status: "ok", fileName: msg.fileName @@ -44,14 +45,14 @@ Object.keys(hpargs.usrfd).forEach(function (key) { else if (msg.type == "delete") { if (fs.existsSync(msg.fileName)) { fs.unlinkSync(msg.fileName); - fs.writeSync(userfds[1], bson.serialize({ + user.sendOutput(bson.serialize({ type: "deleteResult", status: "ok", fileName: msg.fileName })); } else { - fs.writeSync(userfds[1], bson.serialize({ + user.sendOutput(bson.serialize({ type: "deleteResult", status: "not_found", fileName: msg.fileName @@ -61,7 +62,7 @@ Object.keys(hpargs.usrfd).forEach(function (key) { else if (msg.type == "download") { if (fs.existsSync(msg.fileName)) { const fileContent = fs.readFileSync(msg.fileName); - fs.writeSync(userfds[1], bson.serialize({ + user.sendOutput(bson.serialize({ type: "downloadResult", status: "ok", fileName: msg.fileName, @@ -69,14 +70,14 @@ Object.keys(hpargs.usrfd).forEach(function (key) { })); } else { - fs.writeSync(userfds[1], bson.serialize({ + user.sendOutput(bson.serialize({ type: "downloadResult", status: "not_found", fileName: msg.fileName })); } } - } + }); }); //console.log("===File contract ended==="); \ No newline at end of file diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 7297f220..46ad6df2 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -22,9 +22,42 @@ function HotPocketContract() { }); } +// Helper function to asynchronously read a stream to the end and fill a buffer. +const drainStream = function (stream) { + + return new Promise((resolve) => { + + const dataParts = []; + + const resolveBuffer = function () { + if (dataParts.length > 0) + return resolve(Buffer.concat(dataParts)); + else + return resolve(null); + } + + stream.on("data", d => { + dataParts.push(d); + }); + stream.on('end', resolveBuffer); + stream.on("close", resolveBuffer); + stream.on("error", () => { + resolve(null); + }); + }); +} + function HotPocketChannel(infd, outfd) { this.readInput = function () { - return infd == -1 ? null : fs.readFileSync(infd); + return new Promise((resolve) => { + if (infd == -1) { + resolve(null); + } + else { + const s = fs.createReadStream(null, { fd: infd }); + drainStream(s).then(buf => resolve(buf)); + } + }); } this.sendOutput = function (output) { @@ -33,9 +66,8 @@ function HotPocketChannel(infd, outfd) { } function HotPocketNplChannel(infd, outfd) { - this.readInput = function () { - if (infd == -1) - return null; + + const parseNplInputs = function (buf) { // Input may consist of multiple messages. // Each message has the format: @@ -43,7 +75,6 @@ function HotPocketNplChannel(infd, outfd) { const inputs = []; // Peer inputs will be populated to this. - const buf = fs.readFileSync(infd); let pos = 0; while (pos < buf.byteLength) { @@ -74,15 +105,27 @@ function HotPocketNplChannel(infd, outfd) { return inputs; } - this.sendOutput = function (output) { - fs.writeFileSync(outfd, output); - } - const readBytes = function (buf, pos, count) { if (pos + count > buf.byteLength) return null; return buf.slice(pos, pos + count); } + + this.readInput = function () { + return new Promise((resolve) => { + if (infd == -1) { + resolve(null); + } + else { + const s = fs.createReadStream(null, { fd: infd }); + drainStream(s).then(buf => resolve(parseNplInputs(buf))); + } + }); + } + + this.sendOutput = function (output) { + fs.writeFileSync(outfd, output); + } } module.exports = { diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 5bf716ca..eddbcfaf 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -321,22 +321,23 @@ namespace cons */ void broadcast_nonunl_proposal() { - std::lock_guard lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex); - if (usr::ctx.users.empty()) return; // Construct NUP. p2p::nonunl_proposal nup; - for (auto &[sid, user] : usr::ctx.users) { - std::list user_inputs; - user_inputs.splice(user_inputs.end(), user.submitted_inputs); + std::lock_guard(usr::ctx.users_mutex); + for (auto &[sid, user] : usr::ctx.users) + { + std::list user_inputs; + user_inputs.splice(user_inputs.end(), user.submitted_inputs); - // We should create an entry for each user pubkey, even if the user has no inputs. This is - // because this data map will be used to track connected users as well in addition to inputs. - nup.user_inputs.try_emplace(user.pubkey, std::move(user_inputs)); + // We should create an entry for each user pubkey, even if the user has no inputs. This is + // because this data map will be used to track connected users as well in addition to inputs. + nup.user_inputs.try_emplace(user.pubkey, std::move(user_inputs)); + } } flatbuffers::FlatBufferBuilder fbuf(1024); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 82ad2d8e..a923767e 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -75,7 +75,8 @@ namespace p2p return -1; } - // Converting the binary pub key into hexa decimal string this will be used as the key in storing peer sessions + // Converting the binary pub key into hexadecimal string. + // This will be used as the lookup key in storing peer sessions. std::string pubkeyhex; util::bin2hex(pubkeyhex, reinterpret_cast(challenge_resp.pubkey.data()), challenge_resp.pubkey.length()); @@ -145,7 +146,7 @@ namespace p2p /** * Broadcasts the given message to all currently connected outbound peers. - * @param msg Peer outbound message to be broadcasted. + * @param fbuf Peer outbound message to be broadcasted. * @param send_to_self Whether to also send the message to self (this node). */ void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self) @@ -173,7 +174,7 @@ namespace p2p /** * Sends the given message to self (this node). - * @param msg Peer outbound message to be sent to self. + * @param fbuf Peer outbound message to be sent to self. */ void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf) { @@ -194,7 +195,7 @@ namespace p2p /** * Sends the given message to a random peer (except self). - * @param msg Peer outbound message to be sent to peer. + * @param fbuf Peer outbound message to be sent to peer. */ void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf) { diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 63b96a97..3c7d06f5 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -52,7 +52,7 @@ namespace usr struct connected_context { // Connected (authenticated) user list. - // Map key: User socket session id () + // Map key: User socket session id. std::unordered_map users; std::mutex users_mutex; // Mutex for users access race conditions. diff --git a/test/bin/hpfs b/test/bin/hpfs index 90885c21..ca9d6496 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ