diff --git a/examples/hpcontract/contract.js b/examples/hpcontract/contract.js index 62ddf585..43d8e093 100644 --- a/examples/hpcontract/contract.js +++ b/examples/hpcontract/contract.js @@ -6,7 +6,7 @@ const pipe = require('posix-pipe-fork-exec') let input = Buffer.from(pipe.getfdbytes(0)).toString() console.log("===Sample contract started==="); -console.log("Input received from hp: " + input); +console.log("Contract args received from hp: " + input); let hpargs = JSON.parse(input); @@ -21,4 +21,11 @@ Object.keys(hpargs.usrfd).forEach(function (key, index) { } }); +let hpinput = Buffer.from(pipe.getfdbytes(hpargs.hpfd[0])).toString().trim(); +if (hpinput.length > 0) { + console.log("Input received from hp:"); + console.log(hpinput); + fs.writeSync(hpargs.hpfd[1], "Echoing: " + hpinput); +} + console.log("===Sample contract ended==="); \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index c29b1ccd..bf1fa40f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -108,7 +108,8 @@ int main(int argc, char **argv) while (true) { - sleep(1); + sleep(2); + // Test code to execute contract and collect outputs. std::unordered_map> userbufs; for (auto &[sid, user] : usr::users) @@ -121,8 +122,10 @@ int main(int argc, char **argv) bufpair.first = std::move(inputtosend); userbufs[user.pubkey] = bufpair; } + std::pair hpscbufpair; + hpscbufpair.first = "{msg:'Message from HP'}"; - proc::ContractExecArgs eargs(123123345, userbufs); + proc::ContractExecArgs eargs(123123345, userbufs, hpscbufpair); proc::exec_contract(eargs); for (auto &[pubkey, bufpair] : userbufs) @@ -139,6 +142,9 @@ int main(int argc, char **argv) } } + if (!hpscbufpair.second.empty()) + std::cout << "Message from SC: " << hpscbufpair.second << std::endl; + userbufs.clear(); } diff --git a/src/proc.cpp b/src/proc.cpp index 31765ffa..a8ae5ba2 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -14,9 +14,7 @@ namespace proc { -/** - * Enum used to differenciate pipe fds maintained for SC I/O pipes. - */ +// Enum used to differenciate pipe fds maintained for SC I/O pipes. enum FDTYPE { // Used by Smart Contract to read input sent by Hot Pocket @@ -29,14 +27,13 @@ enum FDTYPE SCWRITE = 3 }; -/** - * Map of user pipe fds (map key: user public key) - */ +// Map of user pipe fds (map key: user public key) std::unordered_map> userfds; -/** - * Holds the contract process id (if currently executing). - */ +// Pipe fds for HP <--> messages. +std::vector hpscfds; + +// Holds the contract process id (if currently executing). __pid_t contract_pid; /** @@ -46,8 +43,15 @@ __pid_t contract_pid; */ int exec_contract(const ContractExecArgs &args) { + // Write any hp input messages to hp->sc pipe. + if (write_contract_hp_inputs(args) != 0) + { + std::cerr << "Failed to write HP input to contract.\n"; + return -1; + } + // Write any verified (consensus-reached) user inputs to user pipes. - if (write_verified_user_inputs(args) != 0) + if (write_contract_user_inputs(args) != 0) { cleanup_userfds(); std::cerr << "Failed to write user inputs to contract.\n"; @@ -60,11 +64,11 @@ int exec_contract(const ContractExecArgs &args) // HotPocket process. contract_pid = pid; - // Close all user fds unused by HP process. - close_unused_userfds(true); + // Close all fds unused by HP process. + close_unused_fds(true); // Wait for child process (contract process) to complete execution. - + int presult = await_contract_execution(); contract_pid = 0; if (presult != 0) @@ -73,13 +77,14 @@ int exec_contract(const ContractExecArgs &args) return -1; } - // After contract execution, collect contract user outputs. - if (read_contract_user_outputs(args) != 0) - { - std::cerr << "Failed to read user outputs from contract.\n"; + // After contract execution, collect contract outputs. + + if (read_contract_hp_outputs(args) != 0) return -1; - }; - + + if (read_contract_user_outputs(args) != 0) + return -1; + userfds.clear(); } else if (pid == 0) @@ -87,14 +92,14 @@ int exec_contract(const ContractExecArgs &args) // Contract process. // Set up the process environment and overlay the contract binary program with execv(). - // Close all user fds unused by SC process. - close_unused_userfds(false); + // Close all fds unused by SC process. + close_unused_fds(false); // Set the contract process working directory. chdir(conf::ctx.contractDir.data()); // Write the contract input message from HotPocket to the stdin (0) of the contract process. - write_to_stdin(args); + write_contract_args(args); char *execv_args[] = {conf::cfg.binary.data(), conf::cfg.binargs.data(), NULL}; execv(execv_args[0], execv_args); @@ -126,20 +131,21 @@ int await_contract_execution() } /** - * Writes the contract input message into the stdin of the contract process. - * Input format: + * Writes the contract args (JSON) into the stdin of the contract process. + * Args format: * { * "version":"", * "pubkey": "", * "ts": , + * "hpfd": [fd0, fd1], * "usrfd":{ "":[fd0, fd1], ... }, * "nplfd":{ "":[fd0, fd1], ... }, * "unl":[ "pkhex", ... ] * } */ -int write_to_stdin(const ContractExecArgs &args) +int write_contract_args(const ContractExecArgs &args) { - // Populate the json strring with contract args. + // Populate the json string with contract args. // We don't use a JSOn parser here because it's lightweight to contrstuct the // json string manually. @@ -147,7 +153,8 @@ int write_to_stdin(const ContractExecArgs &args) os << "{\"version\":\"" << util::HP_VERSION << "\",\"pubkey\":\"" << conf::cfg.pubkeyhex << "\",\"ts\":" << args.timestamp - << ",\"usrfd\":{"; + << ",\"hpfd\":[" << hpscfds[FDTYPE::SCREAD] << "," << hpscfds[FDTYPE::SCWRITE] + << "],\"usrfd\":{"; for (auto itr = userfds.begin(); itr != userfds.end(); itr++) { @@ -204,66 +211,62 @@ int write_to_stdin(const ContractExecArgs &args) } /** - * Creates the pipes and writes verified (consesus-reached) user - * input to the SC via the pipe. + * Writes any hp input messages to the contract. */ -int write_verified_user_inputs(const ContractExecArgs &args) +int write_contract_hp_inputs(const ContractExecArgs &args) { + if (create_and_write_iopipes(hpscfds, args.hpscbufs.first) != 0) // hpscbufs.first is the input buffer. + { + std::cerr << "Error writing HP input to SC (" << args.hpscbufs.first.length() + << " bytes)" << std::endl; + return -1; + } + return 0; +} + +/** + * Creates the pipes and writes verified (consesus-reached) user + * inputs to the SC via the pipe. + */ +int write_contract_user_inputs(const ContractExecArgs &args) +{ + // Loop through input buffer for each user. for (auto &[pubkey, bufpair] : args.userbufs) { - int inpipe[2]; - if (pipe(inpipe) != 0) - return -1; + userfds[pubkey] = std::move(std::vector()); + std::vector &fds = userfds[pubkey]; - int outpipe[2]; - if (pipe(outpipe) != 0) + if (create_and_write_iopipes(fds, bufpair.first) != 0) // bufpair.first is the input buffer. { - // Close the earlier created pipe. - close(inpipe[0]); - close(inpipe[1]); + std::cerr << "Error writing contract input (" << bufpair.first.length() + << " bytes) from user" << std::endl; return -1; } - - // If both pipes got created, assign them to the fd map. - std::vector fds; - fds.push_back(inpipe[0]); //SCREAD - fds.push_back(inpipe[1]); //HPWRITE - fds.push_back(outpipe[0]); //HPREAD - fds.push_back(outpipe[1]); //SCWRITE - userfds[pubkey] = fds; - - // Write the user input (if any) into the contract and close the writefd. - - int writefd = fds[FDTYPE::HPWRITE]; - - if (!bufpair.first.empty()) // bufpair.first is the input buffer. - { - // We use vmsplice to map (zero-copy) the user input into the fd. - iovec memsegs[1]; - memsegs[0].iov_base = bufpair.first.data(); - memsegs[0].iov_len = bufpair.first.length(); - - if (vmsplice(writefd, memsegs, 1, 0) == -1) - { - std::cerr << "Error writing contract input (" << bufpair.first.length() - << " bytes) from user" << std::endl; - } - - // It's important that we DO NOT clear the input buffer string until the contract - // process has actually read from the fd. Because the OS is just mapping our - // input buffer memory portion into the fd, if we clear it now, the contract process - // will get invaid bytes when reading the fd. Therefore we clear the input buffer - // inside read_contract_user_outputs(). - } - - // Close the writefd since we no longer need it for this round. - close(writefd); - fds[FDTYPE::HPWRITE] = 0; } return 0; } +/** + * Read all HP output messages produced by the contract process and store them in + * the buffer for later processing. + * + * @return 0 on success. -1 on failure. + */ +int read_contract_hp_outputs(const ContractExecArgs &args) +{ + // Clear the input buffer because we are sure the contract has finished reading from + // that mapped memory portion. + args.hpscbufs.first.clear(); //bufpair.first is the input buffer. + + if (read_iopipe(hpscfds, args.hpscbufs.second) != 0) // hpscbufs.second is the output buffer. + { + std::cerr << "Error reading HP output"; + return -1; + } + return 0; +} + /** * Read all per-user outputs produced by the contract process and store them in * the user buffer for later processing. @@ -272,14 +275,6 @@ int write_verified_user_inputs(const ContractExecArgs &args) */ int read_contract_user_outputs(const ContractExecArgs &args) { - // Read any outputs that have been written by the contract process - // from all the user outpipes and store in the outbuffer of each user. - // User outbuffer will be read by the consensus process later when it wishes so. - - // Future optmization: Read and populate user buffers parallely. - // Currently this is sequential for simplicity which will not scale well - // when there are large number of users connected to the same HP node. - for (auto &[pubkey, bufpair] : args.userbufs) { // Clear the input buffer because we are sure the contract has finished reading from @@ -288,67 +283,17 @@ int read_contract_user_outputs(const ContractExecArgs &args) // Get fds for the user by pubkey. std::vector &fds = userfds[pubkey]; - int readfd = fds[FDTYPE::HPREAD]; - int bytes_available = 0; - ioctl(readfd, FIONREAD, &bytes_available); - if (bytes_available > 0) + if (read_iopipe(fds, bufpair.second) != 0) // bufpair.second is the output buffer. { - bufpair.second.resize(bytes_available); // bufpair.second is the output buffer. - - // Populate the user output buffer with new data from the pipe. - // We use vmsplice to map (zero-copy) the output from the fd into output bbuffer. - iovec memsegs[1]; - memsegs[0].iov_base = bufpair.second.data(); - memsegs[0].iov_len = bytes_available; - - if (vmsplice(readfd, memsegs, 1, 0) == -1) - { - std::cerr << "Error reading contract output for user " - << pubkey << std::endl; - } - else - { - std::cout << "Contract produced " << bytes_available - << " bytes for user" << std::endl; - } + std::cerr << "Error reading contract output for user " + << pubkey << std::endl; } - - // Close readfd fd on HP process side because we are done with contract process I/O. - close(readfd); - fds[FDTYPE::HPREAD] = 0; } return 0; } -/** - * Closes unused user fds based on which process this gets called from. - */ -void close_unused_userfds(bool is_hp) -{ - for (auto &[pubkey, fds] : userfds) - { - if (is_hp) - { - // Close unused fds in Hot Pocket process. - close(fds[FDTYPE::SCREAD]); - fds[FDTYPE::SCREAD] = 0; - close(fds[FDTYPE::SCWRITE]); - fds[FDTYPE::SCWRITE] = 0; - } - else - { - // Close unused fds in smart contract process. - close(fds[FDTYPE::HPREAD]); - fds[FDTYPE::HPREAD] = 0; - - // HPWRITE fd has aleady been closed by HP process after writing user - // inputs (before the fork). - } - } -} - /** * Closes any open user fds after an error. */ @@ -365,4 +310,133 @@ void cleanup_userfds() } } +/** + * Common function to create a pair of pipes (Hp->SC, SC->HP) and write the + * given input buffer into the write fd from the HP side. + * + * @param fds Vector to populate fd list. + * @param inputbuffer Buffer to write into the HP write fd. + */ +int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer) +{ + int inpipe[2]; + if (pipe(inpipe) != 0) + return -1; + + int outpipe[2]; + if (pipe(outpipe) != 0) + { + // Close the earlier created pipe. + close(inpipe[0]); + close(inpipe[1]); + return -1; + } + + // If both pipes got created, assign them to the fd vector. + fds.clear(); + fds.push_back(inpipe[0]); //SCREAD + fds.push_back(inpipe[1]); //HPWRITE + fds.push_back(outpipe[0]); //HPREAD + fds.push_back(outpipe[1]); //SCWRITE + + // Write the input (if any) into the contract and close the writefd. + + int writefd = fds[FDTYPE::HPWRITE]; + bool vmsplice_error = false; + + if (!inputbuffer.empty()) + { + // We use vmsplice to map (zero-copy) the input into the fd. + iovec memsegs[1]; + memsegs[0].iov_base = inputbuffer.data(); + memsegs[0].iov_len = inputbuffer.length(); + + if (vmsplice(writefd, memsegs, 1, 0) == -1) + vmsplice_error = true; + + // It's important that we DO NOT clear the input buffer string until the contract + // process has actually read from the fd. Because the OS is just mapping our + // input buffer memory portion into the fd, if we clear it now, the contract process + // will get invaid bytes when reading the fd. + } + + // Close the writefd since we no longer need it. + close(writefd); + fds[FDTYPE::HPWRITE] = 0; + + return vmsplice_error ? -1 : 0; +} + +/** + * Common function to read SC output from the pipe and populate a given buffer. + * @param fds Vector representing the pipes fd list. + * @param The buffer to place the read output. + */ +int read_iopipe(std::vector &fds, std::string &outputbuffer) +{ + // Read any outputs that have been written by the contract process + // from the HP outpipe and store in the outbuffer. + // outbuffer will be read by the consensus process later when it wishes so. + + int readfd = fds[FDTYPE::HPREAD]; + int bytes_available = 0; + ioctl(readfd, FIONREAD, &bytes_available); + bool vmsplice_error = false; + + if (bytes_available > 0) + { + outputbuffer.resize(bytes_available); // args.hpscbufs.second is the output buffer. + + // Populate the user output buffer with new data from the pipe. + // We use vmsplice to map (zero-copy) the output from the fd into output bbuffer. + iovec memsegs[1]; + memsegs[0].iov_base = outputbuffer.data(); + memsegs[0].iov_len = bytes_available; + + if (vmsplice(readfd, memsegs, 1, 0) == -1) + vmsplice_error = true; + } + + // Close readfd fd on HP process side because we are done with contract process I/O. + close(readfd); + fds[FDTYPE::HPREAD] = 0; + + return vmsplice_error ? -1 : 0; +} + +void close_unused_fds(bool is_hp) +{ + close_unused_vectorfds(is_hp, hpscfds); + + // Loop through user fds. + for (auto &[pubkey, fds] : userfds) + close_unused_vectorfds(is_hp, fds); +} + +/** + * Common function for closing unused fds based on which process this gets called from. + * @param is_hp Specify 'true' when calling from HP process. 'false' from SC process. + * @param fds Vector of fds to close. + */ +void close_unused_vectorfds(bool is_hp, std::vector &fds) +{ + if (is_hp) + { + // Close unused fds in Hot Pocket process. + close(fds[FDTYPE::SCREAD]); + fds[FDTYPE::SCREAD] = 0; + close(fds[FDTYPE::SCWRITE]); + fds[FDTYPE::SCWRITE] = 0; + } + else + { + // Close unused fds in smart contract process. + close(fds[FDTYPE::HPREAD]); + fds[FDTYPE::HPREAD] = 0; + + // HPWRITE fd has aleady been closed by HP process after writing + // inputs (before the fork). + } +} + } // namespace proc \ No newline at end of file diff --git a/src/proc.hpp b/src/proc.hpp index 1af87d5b..cb986c0f 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -20,14 +20,20 @@ struct ContractExecArgs // Map of user I/O buffers (map key: user binary public key). // The value is a pair holding consensus-verified input and contract-generated output. std::unordered_map> &userbufs; + + // Pair of HP<->SC JSON message buffers (mainly used for control messages). + // Input buffer for HP->SC messages, Output buffer for SC->HP messages. + std::pair &hpscbufs; // Current HotPocket timestamp. uint64_t timestamp; ContractExecArgs( uint64_t _timestamp, - std::unordered_map> &_userbufs) : - userbufs(_userbufs) + std::unordered_map> &_userbufs, + std::pair &_hpscbufs) : + userbufs(_userbufs), + hpscbufs(_hpscbufs) { timestamp = _timestamp; } @@ -39,16 +45,26 @@ int await_contract_execution(); //------Internal-use functions for this namespace. -int write_to_stdin(const ContractExecArgs &args); +int write_contract_args(const ContractExecArgs &args); -int write_verified_user_inputs(const ContractExecArgs &args); +int write_contract_hp_inputs(const ContractExecArgs &args); + +int write_contract_user_inputs(const ContractExecArgs &args); + +int read_contract_hp_outputs(const ContractExecArgs &args); int read_contract_user_outputs(const ContractExecArgs &args); -void close_unused_userfds(bool is_hp); - void cleanup_userfds(); +int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer); + +int read_iopipe(std::vector &fds, std::string &outputbuffer); + +void close_unused_fds(bool is_hp); + +void close_unused_vectorfds(bool is_hp, std::vector &fds); + } // namespace proc #endif \ No newline at end of file