From 8a22748c8daea96c47b0c8bc54e9465ede635094 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Wed, 16 Oct 2019 17:26:33 +0530 Subject: [PATCH] Implemented sending contract output back to the user. --- examples/hpcontract/contract.js | 12 ++--- src/main.cpp | 46 ++++++++++++++------ src/proc.cpp | 75 ++++++++++++++++++++++++-------- src/proc.hpp | 2 + src/usr/user_session_handler.cpp | 5 ++- src/usr/usr.cpp | 12 ++--- src/usr/usr.hpp | 13 ++++-- 7 files changed, 121 insertions(+), 44 deletions(-) diff --git a/examples/hpcontract/contract.js b/examples/hpcontract/contract.js index d02b33cc..62ddf585 100644 --- a/examples/hpcontract/contract.js +++ b/examples/hpcontract/contract.js @@ -10,13 +10,15 @@ console.log("Input received from hp: " + input); let hpargs = JSON.parse(input); -Object.keys(hpargs.usrfd).forEach(function(key,index) { +Object.keys(hpargs.usrfd).forEach(function (key, index) { let userfds = hpargs.usrfd[key]; - let userinput = Buffer.from(pipe.getfdbytes(userfds[0])).toString(); - console.log("Input received from user " + key + ":"); - console.log(userinput); + let userinput = Buffer.from(pipe.getfdbytes(userfds[0])).toString().trim(); - fs.writeSync(userfds[1], "Echoing: " + userinput); + if (userinput.length > 0) { + console.log("Input received from user " + key + ":"); + console.log(userinput); + fs.writeSync(userfds[1], "Echoing: " + userinput); + } }); console.log("===Sample contract ended==="); \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 947623c4..efd4f55e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -103,21 +103,41 @@ int main(int argc, char **argv) // This will start hosting the contract and start consensus rounds. // TODO - // Temp code to avoid exiting. - std::string s; - std::cin >> s; - - // Test code to execute contract and collect outputs. - std::unordered_map> userbufs; - for (auto &[sid, user] : usr::users) + while (true) { - std::pair bufpair; - bufpair.first = std::move(user.inbuffer); - userbufs[user.pubkey] = bufpair; - } + sleep(1); + // Test code to execute contract and collect outputs. + std::unordered_map> userbufs; + for (auto &[sid, user] : usr::users) + { + std::pair bufpair; - proc::ContractExecArgs eargs(123123345, userbufs); - proc::exec_contract(eargs); + std::string inputtosend; + inputtosend.swap(user.inbuffer); + + bufpair.first = std::move(inputtosend); + userbufs[user.pubkey] = bufpair; + } + + proc::ContractExecArgs eargs(123123345, userbufs); + proc::exec_contract(eargs); + + for (auto &[pubkey, bufpair] : userbufs) + { + if (!bufpair.second.empty()) + { + // Find the user session id by the pubkey. + const std::string sessionid = usr::sessionids[pubkey]; + + // Find the user by session id. + auto itr = usr::users.find(sessionid); + const usr::connected_user &user = itr->second; + user.session->send(std::move(bufpair.second)); + } + } + + userbufs.clear(); + } // Free resources. usr::deinit(); diff --git a/src/proc.cpp b/src/proc.cpp index e5a42615..4c20d97a 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -34,6 +34,11 @@ enum FDTYPE */ std::unordered_map> userfds; +/** + * Holds the contract process id (if currently executing). + */ +__pid_t contract_pid; + /** * Executes the contract process and passes the specified arguments. * @@ -49,21 +54,22 @@ int exec_contract(const ContractExecArgs &args) return -1; } - __pid_t pid = fork(); + __id_t pid = fork(); if (pid > 0) { // HotPocket process. + contract_pid = pid; // Close all user fds unused by HP process. close_unused_userfds(true); // Wait for child process (contract process) to complete execution. - int scstatus; - wait(&scstatus); - if (!WIFEXITED(scstatus)) + + int presult = await_contract_execution(); + contract_pid = 0; + if (presult != 0) { - std::cerr << "Contract process exited with non-normal status code: " - << WEXITSTATUS(scstatus) << std::endl; + std::cerr << "Contract process exited with non-normal status code: " << presult << std::endl; return -1; } @@ -73,6 +79,8 @@ int exec_contract(const ContractExecArgs &args) std::cerr << "Failed to read user outputs from contract.\n"; return -1; }; + + userfds.clear(); } else if (pid == 0) { @@ -100,6 +108,23 @@ int exec_contract(const ContractExecArgs &args) return 0; } +/** + * Blocks the calling thread until the contract process compelted exeution (if running). + * + * @returns 0 if contract process exited normally, exit code of contract process if abnormally exited. + */ +int await_contract_execution() +{ + if (contract_pid > 0) + { + int scstatus; + waitpid(contract_pid, &scstatus, 0); + if (!WIFEXITED(scstatus)) + return WEXITSTATUS(scstatus); + } + return 0; +} + /** * Writes the contract input message into the stdin of the contract process. * Input format: @@ -207,17 +232,28 @@ int write_verified_user_inputs(const ContractExecArgs &args) fds.push_back(outpipe[1]); //SCWRITE userfds[pubkey] = fds; - // Write the user input into the contract and close the writefd. - // We use vmsplice to map (zero-copy) the user input into the fd. - iovec memsegs[1]; - memsegs[0].iov_base = bufpair.first.data(); // bufpair.first is the input buffer. - memsegs[0].iov_len = bufpair.first.length(); + // Write the user input (if any) into the contract and close the writefd. + int writefd = fds[FDTYPE::HPWRITE]; - if (vmsplice(writefd, memsegs, 1, 0) == -1) + if (!bufpair.first.empty()) // bufpair.first is the input buffer. { - std::cerr << "Error writing contract input (" << bufpair.first.length() - << " bytes) from user" << std::endl; + // We use vmsplice to map (zero-copy) the user input into the fd. + iovec memsegs[1]; + memsegs[0].iov_base = bufpair.first.data(); + memsegs[0].iov_len = bufpair.first.length(); + + if (vmsplice(writefd, memsegs, 1, 0) == -1) + { + std::cerr << "Error writing contract input (" << bufpair.first.length() + << " bytes) from user" << std::endl; + } + + // It's important that we DO NOT clear the input buffer string until the contract + // process has actually read from the fd. Because the OS is just mapping our + // input buffer memory portion into the fd, if we clear it now, the contract process + // will get invaid bytes when reading the fd. Therefore we clear the input buffer + // inside read_contract_user_outputs(). } // Close the writefd since we no longer need it for this round. @@ -246,6 +282,10 @@ int read_contract_user_outputs(const ContractExecArgs &args) for (auto &[pubkey, bufpair] : args.userbufs) { + // Clear the input buffer because we are sure the contract has finished reading from + // that mapped memory portion. + bufpair.first.clear(); //bufpair.first is the input buffer. + // Get fds for the user by pubkey. std::vector &fds = userfds[pubkey]; int readfd = fds[FDTYPE::HPREAD]; @@ -257,7 +297,7 @@ int read_contract_user_outputs(const ContractExecArgs &args) bufpair.second.resize(bytes_available); // bufpair.second is the output buffer. // Populate the user output buffer with new data from the pipe. - // We use vmsplice to map (zero-copy) the output from the fd. + // We use vmsplice to map (zero-copy) the output from the fd into output bbuffer. iovec memsegs[1]; memsegs[0].iov_base = bufpair.second.data(); memsegs[0].iov_len = bytes_available; @@ -269,7 +309,8 @@ int read_contract_user_outputs(const ContractExecArgs &args) } else { - std::cout << "Contract produced " << bytes_available << " bytes for user" << std::endl; + std::cout << "Contract produced " << bytes_available + << " bytes for user" << std::endl; } } @@ -309,7 +350,7 @@ void close_unused_userfds(bool is_hp) } /** - * Closes any open user fds based after an error. + * Closes any open user fds after an error. */ void cleanup_userfds() { diff --git a/src/proc.hpp b/src/proc.hpp index 32a56c4a..1af87d5b 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -35,6 +35,8 @@ struct ContractExecArgs int exec_contract(const ContractExecArgs &args); +int await_contract_execution(); + //------Internal-use functions for this namespace. int write_to_stdin(const ContractExecArgs &args); diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index e9eacb33..a7b8a8e3 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -6,6 +6,7 @@ #include #include "../util.hpp" #include "../sock/socket_session.hpp" +#include "../proc.hpp" #include "usr.hpp" #include "user_session_handler.hpp" @@ -80,7 +81,7 @@ void user_session_handler::on_message(sock::socket_session *session, std::string session->flags_.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag session->flags_.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag - usr::add_user(session->uniqueid_, userpubkey); // Add the user to the global authed user list + usr::add_user(session, userpubkey); // Add the user to the global authed user list usr::pending_challenges.erase(session->uniqueid_); // Remove the stored challenge std::cout << "User connection " << session->uniqueid_ << " authenticated. Public key " @@ -138,6 +139,8 @@ void user_session_handler::on_close(sock::socket_session *session) // Session belongs to an authed user. else if (session->flags_[util::SESSION_FLAG::USER_AUTHED]) { + // Wait for SC process completion before we remove existing user. + proc::await_contract_execution(); usr::remove_user(session->uniqueid_); } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index f51bfcb4..b88094ba 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -16,13 +16,14 @@ namespace usr { /** - * Global user list. (Exposed to other sub systems) + * Connected (authenticated) user list. (Exposed to other sub systems) * Map key: User socket session id () */ std::unordered_map users; /** - * Holds set of connected user session ids for lookups. (Exposed to other sub systems) + * Holds set of connected user session ids and public keys for lookups. + * This is used for pubkey duplicate checks as well. * Map key: User binary pubkey */ std::unordered_map sessionids; @@ -196,19 +197,20 @@ int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string * Adds the user denoted by specified session id and public key to the global authed user list. * This should get called after the challenge handshake is verified. * - * @param sessionid User socket session id. + * @param session User socket session. * @param pubkey User's binary public key. * @return 0 on successful additions. -1 on failure. */ -int add_user(const std::string &sessionid, const std::string &pubkey) +int add_user(sock::socket_session *session, const std::string &pubkey) { + const std::string &sessionid = session->uniqueid_; if (users.count(sessionid) == 1) { std::cerr << sessionid << " already exist. Cannot add user.\n"; return -1; } - users.emplace(sessionid, usr::connected_user(pubkey)); + users.emplace(sessionid, usr::connected_user(session, pubkey)); // Populate sessionid map so we can lookup by user pubkey. sessionids[pubkey] = sessionid; diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 2f911d7a..fada49d0 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -5,6 +5,7 @@ #include #include #include "../util.hpp" +#include "../sock/socket_session.hpp" /** * Maintains the global user list with pending input outputs and manages user connections. @@ -24,17 +25,23 @@ struct connected_user // Holds the unprocessed user input collected from websocket. std::string inbuffer; + // Holds the websocket session of this user. + // We don't need to own the session object since the lifetime of user and session are coupled. + sock::socket_session *session; + /** * @param _pubkey The public key of the user in binary format. */ - connected_user(std::string_view _pubkey) + connected_user(sock::socket_session *_session, std::string_view _pubkey) { + session = _session; pubkey = _pubkey; } }; /** - * Global authenticated (challenge-verified) user list. + * Connected (authenticated) user list. (Exposed to other sub systems) + * Map key: User socket session id () */ extern std::unordered_map users; @@ -57,7 +64,7 @@ void create_user_challenge(std::string &msg, std::string &challengeb64); int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string_view response, std::string_view original_challenge); -int add_user(const std::string &sessionid, const std::string &pubkey); +int add_user(sock::socket_session *session, const std::string &pubkey); int remove_user(const std::string &sessionid);