From db99d949020c7dcb46a4221a25b077bf56bfd8f0 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Tue, 15 Oct 2019 23:26:22 +0530 Subject: [PATCH] vmsplice and other optmizations for user pipes I/O (#28) --- examples/hpcontract/.gitignore | 1 + examples/hpcontract/contract.js | 22 ++ examples/hpcontract/package-lock.json | 46 ++++ examples/hpcontract/package.json | 6 + src/main.cpp | 17 ++ src/proc.cpp | 295 ++++++++++++++------------ src/proc.hpp | 24 ++- src/usr/user_session_handler.cpp | 41 ++-- src/usr/usr.cpp | 57 +++-- src/usr/usr.hpp | 52 ++--- 10 files changed, 351 insertions(+), 210 deletions(-) create mode 100644 examples/hpcontract/.gitignore create mode 100644 examples/hpcontract/contract.js create mode 100644 examples/hpcontract/package-lock.json create mode 100644 examples/hpcontract/package.json diff --git a/examples/hpcontract/.gitignore b/examples/hpcontract/.gitignore new file mode 100644 index 00000000..394522f4 --- /dev/null +++ b/examples/hpcontract/.gitignore @@ -0,0 +1 @@ +node_modules/** \ No newline at end of file diff --git a/examples/hpcontract/contract.js b/examples/hpcontract/contract.js new file mode 100644 index 00000000..d02b33cc --- /dev/null +++ b/examples/hpcontract/contract.js @@ -0,0 +1,22 @@ +process.on('uncaughtException', (err) => { + console.error('There was an uncaught error', err) +}) +const fs = require('fs') +const pipe = require('posix-pipe-fork-exec') + +let input = Buffer.from(pipe.getfdbytes(0)).toString() +console.log("===Sample contract started==="); +console.log("Input received from hp: " + input); + +let hpargs = JSON.parse(input); + +Object.keys(hpargs.usrfd).forEach(function(key,index) { + let userfds = hpargs.usrfd[key]; + let userinput = Buffer.from(pipe.getfdbytes(userfds[0])).toString(); + console.log("Input received from user " + key + ":"); + console.log(userinput); + + fs.writeSync(userfds[1], "Echoing: " + userinput); +}); + +console.log("===Sample contract ended==="); \ No newline at end of file diff --git a/examples/hpcontract/package-lock.json b/examples/hpcontract/package-lock.json new file mode 100644 index 00000000..78fc2004 --- /dev/null +++ b/examples/hpcontract/package-lock.json @@ -0,0 +1,46 @@ +{ + "requires": true, + "lockfileVersion": 1, + "dependencies": { + "fs-extra": { + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-8.1.0.tgz", + "integrity": "sha512-yhlQgA6mnOJUKOsRUFsgJdQCvkKhcz8tlZG5HBQfReYZy46OwLcY+Zia0mtdHsOo9y/hP+CxMN0TU9QxoOtG4g==", + "requires": { + "graceful-fs": "^4.2.0", + "jsonfile": "^4.0.0", + "universalify": "^0.1.0" + } + }, + "graceful-fs": { + "version": "4.2.2", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.2.tgz", + "integrity": "sha512-IItsdsea19BoLC7ELy13q1iJFNmd7ofZH5+X/pJr90/nRoPEX0DJo1dHDbgtYWOhJhcCgMDTOw84RZ72q6lB+Q==" + }, + "jsonfile": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz", + "integrity": "sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=", + "requires": { + "graceful-fs": "^4.1.6" + } + }, + "nan": { + "version": "2.14.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.14.0.tgz", + "integrity": "sha512-INOFj37C7k3AfaNTtX8RhsTw7qRy7eLET14cROi9+5HAVbbHuIWUHEauBv5qT4Av2tWasiTY1Jw6puUNqRJXQg==" + }, + "posix-pipe-fork-exec": { + "version": "git+https://github.com/codetsunami/posix-pipe-fork-exec.git#362060203b2eb705fa7acb969bf2ecf9e80ab102", + "from": "git+https://github.com/codetsunami/posix-pipe-fork-exec.git", + "requires": { + "nan": "^2.14.0" + } + }, + "universalify": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz", + "integrity": "sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg==" + } + } +} diff --git a/examples/hpcontract/package.json b/examples/hpcontract/package.json new file mode 100644 index 00000000..5d82607c --- /dev/null +++ b/examples/hpcontract/package.json @@ -0,0 +1,6 @@ +{ + "dependencies": { + "fs-extra": "^8.1.0", + "posix-pipe-fork-exec": "git+https://github.com/codetsunami/posix-pipe-fork-exec.git" + } +} diff --git a/src/main.cpp b/src/main.cpp index 46c7ac48..1586c044 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -5,10 +5,12 @@ #include #include #include +#include #include "util.hpp" #include "conf.hpp" #include "crypto.hpp" #include "usr/usr.hpp" +#include "proc.hpp" /** * Parses CLI args and extracts hot pocket command and parameters given. @@ -104,6 +106,21 @@ int main(int argc, char **argv) // Temp code to avoid exiting. std::string s; std::cin >> s; + + // Test code to execute contract and collect outputs. + std::unordered_map> userbufs; + for (auto &[sid, user] : usr::users) + { + std::pair bufpair; + bufpair.first = std::move(user.inbuffer); + userbufs[user.pubkeyb64] = bufpair; + } + + proc::ContractExecArgs eargs(123123345, userbufs); + proc::exec_contract(eargs); + + // Free resources. + usr::deinit(); } } } diff --git a/src/proc.cpp b/src/proc.cpp index 02ba67d8..22d9365e 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -1,24 +1,38 @@ #include #include #include +#include #include +#include +#include +#include #include -#include #include -#include -#include -#include #include "proc.hpp" -#include "usr/usr.hpp" #include "conf.hpp" namespace proc { /** - * Keeps the currently executing contract process id (if any) + * Enum used to differenciate pipe fds maintained for SC I/O pipes. */ -__pid_t contract_pid = 0; +enum FDTYPE +{ + // Used by Smart Contract to read input sent by Hot Pocket + SCREAD = 0, + // Used by Hot Pocket to write input to the smart contract. + HPWRITE = 1, + // Used by Hot Pocket to read output from the smart contract. + HPREAD = 2, + // Used by Smart Contract to write output back to Hot Pocket. + SCWRITE = 3 +}; + +/** + * Map of user pipe fds (map key: user public key) + */ +std::unordered_map> userfds; /** * Executes the contract process and passes the specified arguments. @@ -27,15 +41,11 @@ __pid_t contract_pid = 0; */ int exec_contract(const ContractExecArgs &args) { - if (is_contract_running()) + // Write any verified (consensus-reached) user inputs to user pipes. + if (write_verified_user_inputs(args) != 0) { - std::cerr << "Contract process still running.\n"; - return -1; - } - - if (create_userpipes() != 0) - { - std::cerr << "User pipe creation failed.\n"; + cleanup_userfds(); + std::cerr << "Failed to write user inputs to contract.\n"; return -1; } @@ -44,10 +54,25 @@ int exec_contract(const ContractExecArgs &args) { // HotPocket process. - contract_pid = pid; - // Close all user fds unused by HP process. close_unused_userfds(true); + + // Wait for child process (contract process) to complete execution. + int scstatus; + wait(&scstatus); + if (!WIFEXITED(scstatus)) + { + std::cerr << "Contract process exited with non-normal status code: " + << WEXITSTATUS(scstatus) << std::endl; + return -1; + } + + // After contract execution, collect contract user outputs. + if (read_contract_user_outputs(args) != 0) + { + std::cerr << "Failed to read user outputs from contract.\n"; + return -1; + }; } else if (pid == 0) { @@ -75,40 +100,6 @@ int exec_contract(const ContractExecArgs &args) return 0; } -/** - * Create pipes for all authed users in order to perform I/O with SC. - */ -int create_userpipes() -{ - for (auto &[k, user] : usr::users) - { - int inpipe[2]; - if (pipe(inpipe) != 0) - { - //Abandon and cleanup. - cleanup_userfds(user); - return -1; - } - - int outpipe[2]; - if (pipe(outpipe) != 0) - { - // Close the earlier created pipe. - close(inpipe[0]); - close(inpipe[1]); - - inpipe[0] = 0; - inpipe[1] = 0; - - //Abandon and cleanup. - cleanup_userfds(user); - return -1; - } - } - - return 0; -} - /** * Writes the contract input message into the stdin of the contract process. * Input format: @@ -116,54 +107,48 @@ int create_userpipes() * "version":"", * "pubkey": "", * "ts": , - * "usrfd":{ "pkb64":[fd0, fd1], ... }, - * "nplfd":{ "pkb64":[fd0, fd1], ... }, + * "usrfd":{ "":[fd0, fd1], ... }, + * "nplfd":{ "":[fd0, fd1], ... }, * "unl":[ "pkb64", ... ] * } */ int write_to_stdin(const ContractExecArgs &args) { - //Populate the json document with contract args. + // Populate the json strring with contract args. + // We don't use a JSOn parser here because it's lightweight to contrstuct the + // json string manually. - rapidjson::Document d; - d.SetObject(); - rapidjson::Document::AllocatorType &allocator = d.GetAllocator(); + std::ostringstream os; + os << "{\"version\":\"" << util::HP_VERSION + << "\",\"pubkey\":\"" << conf::cfg.pubkeyb64 + << "\",\"ts\":" << args.timestamp + << ",\"usrfd\":{"; - d.AddMember("version", rapidjson::StringRef(util::HP_VERSION), allocator); - d.AddMember("pubkey", rapidjson::StringRef(conf::cfg.pubkeyb64.data()), allocator); - d.AddMember("ts", args.timestamp, allocator); - - rapidjson::Value users(rapidjson::kObjectType); - for (auto &[sid, user] : usr::users) + for (auto itr = userfds.begin(); itr != userfds.end(); itr++) { - rapidjson::Value fdlist(rapidjson::kArrayType); - fdlist.PushBack(user.fds[usr::USERFDTYPE::SCREAD], allocator); - fdlist.PushBack(user.fds[usr::USERFDTYPE::SCWRITE], allocator); - users.AddMember(rapidjson::StringRef(user.pubkeyb64.data()), fdlist, allocator); + if (itr != userfds.begin()) + os << ","; // Trailing comma separator for previous element. + + // Write user pubkey and fds. + os << "\"" << itr->first << "\":[" + << itr->second[FDTYPE::SCREAD] << "," + << itr->second[FDTYPE::SCWRITE] << "]"; } - d.AddMember("usrfd", users, allocator); - // rapidjson::Value peers(rapidjson::kObjectType); - // for (auto &[sid, peer] : p2p::peers) - // { - // rapidjson::Value fdlist(rapidjson::kArrayType); - // fdlist.PushBack(peer.inpipe[0], allocator); - // fdlist.PushBack(peer.outpipe[1], allocator); - // peers.AddMember(rapidjson::StringRef(peer.pubkeyb64.data()), fdlist, allocator); - // } - // d.AddMember("nplfd", peers, allocator); + os << "},\"nplfd\":{},\"unl\":["; - rapidjson::Value unl(rapidjson::kArrayType); - for (std::string &node : conf::cfg.unl) - unl.PushBack(rapidjson::StringRef(node.data()), allocator); - d.AddMember("unl", unl, allocator); + for (auto node = conf::cfg.unl.begin(); node != conf::cfg.unl.end(); node++) + { + if (node != conf::cfg.unl.begin()) + os << ","; // Trailing comma separator for previous element. - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - d.Accept(writer); + os << "\"" << *node << "\""; + } + + os << "]}"; // Get the json string that should be written to contract input pipe. - const char *json = buffer.GetString(); + std::string json = os.str(); // Establish contract input pipe. int stdinpipe[2]; @@ -179,19 +164,69 @@ int write_to_stdin(const ContractExecArgs &args) close(stdinpipe[0]); // Write the json message and close write fd. - write(stdinpipe[1], json, buffer.GetSize()); + write(stdinpipe[1], json.data(), json.size()); close(stdinpipe[1]); return 0; } +/** + * Creates the pipes and writes verified (consesus-reached) user + * input to the SC via the pipe. + */ +int write_verified_user_inputs(const ContractExecArgs &args) +{ + for (auto &[pubkey, bufpair] : args.userbufs) + { + int inpipe[2]; + if (pipe(inpipe) != 0) + return -1; + + int outpipe[2]; + if (pipe(outpipe) != 0) + { + // Close the earlier created pipe. + close(inpipe[0]); + close(inpipe[1]); + return -1; + } + + // If both pipes got created, assign them to the fd map. + std::vector fds; + fds.push_back(inpipe[0]); //SCREAD + fds.push_back(inpipe[1]); //HPWRITE + fds.push_back(outpipe[0]); //HPREAD + fds.push_back(outpipe[1]); //SCWRITE + userfds[pubkey] = fds; + + // Write the user input into the contract and close the writefd. + // We use vmsplice to map (zero-copy) the user input into the fd. + iovec memsegs[1]; + memsegs[0].iov_base = bufpair.first.data(); // bufpair.first is the input buffer. + memsegs[0].iov_len = bufpair.first.length(); + int writefd = fds[FDTYPE::HPWRITE]; + + if (vmsplice(writefd, memsegs, 1, 0) == -1) + { + std::cerr << "Error writing contract input (" << bufpair.first.length() + << " bytes) from user " << pubkey << std::endl; + } + + // Close the writefd since we no longer need it for this round. + close(writefd); + fds[FDTYPE::HPWRITE] = 0; + } + + return 0; +} + /** * Read all per-user outputs produced by the contract process and store them in * the user buffer for later processing. * * @return 0 on success. -1 on failure. */ -int read_contract_user_outputs() +int read_contract_user_outputs(const ContractExecArgs &args) { // Read any outputs that have been written by the contract process // from all the user outpipes and store in the outbuffer of each user. @@ -201,88 +236,82 @@ int read_contract_user_outputs() // Currently this is sequential for simplicity which will not scale well // when there are large number of users connected to the same HP node. - for (auto &[sid, user] : usr::users) + for (auto &[pubkey, bufpair] : args.userbufs) { - int fdout = user.fds[usr::USERFDTYPE::HPREAD]; + // Get fds for the user by pubkey. + std::vector &fds = userfds[pubkey]; + int readfd = fds[FDTYPE::HPREAD]; int bytes_available = 0; - ioctl(fdout, FIONREAD, &bytes_available); + ioctl(readfd, FIONREAD, &bytes_available); if (bytes_available > 0) { - char data[bytes_available]; - read(fdout, data, bytes_available); + bufpair.second.reserve(bytes_available); // bufpair.second is the output buffer. - // Populate the user output buffer with new data - user.outbuffer = std::string(data, bytes_available); + // Populate the user output buffer with new data from the pipe. + // We use vmsplice to map (zero-copy) the output from the fd. + iovec memsegs[1]; + memsegs[0].iov_base = bufpair.second.data(); + memsegs[0].iov_len = bytes_available; - // Close remaining fds on HP process side because we are done with contract process I/O. - close(user.fds[usr::USERFDTYPE::HPREAD]); - close(user.fds[usr::USERFDTYPE::HPWRITE]); - - std::cout << "Read " + std::to_string(bytes_available) << " bytes into user output buffer. user:" + user.pubkeyb64 << std::endl; + if (vmsplice(readfd, memsegs, 1, 0) == -1) + { + std::cerr << "Error reading contract output for user " + << pubkey << std::endl; + } + else + { + std::cout << "Contract produced " << bytes_available << " bytes for user " << pubkey << std::endl; + } } + + // Close readfd fd on HP process side because we are done with contract process I/O. + close(readfd); + fds[FDTYPE::HPREAD] = 0; } return 0; } -/** - * Checks whether the contract process is running at this moment. - */ -bool is_contract_running() -{ - if (contract_pid > 0) - { - int status = 0; - if (!waitpid(contract_pid, &status, WNOHANG)) - return true; - contract_pid = 0; - } - - return false; -} - /** * Closes unused user fds based on which process this gets called from. */ void close_unused_userfds(bool is_hp) { - for (auto &[sid, user] : usr::users) + for (auto &[pubkey, fds] : userfds) { if (is_hp) { // Close unused fds in Hot Pocket process. - close(user.fds[usr::USERFDTYPE::SCREAD]); - close(user.fds[usr::USERFDTYPE::SCWRITE]); + close(fds[FDTYPE::SCREAD]); + fds[FDTYPE::SCREAD] = 0; + close(fds[FDTYPE::SCWRITE]); + fds[FDTYPE::SCWRITE] = 0; } else { // Close unused fds in smart contract process. - close(user.fds[usr::USERFDTYPE::HPREAD]); - close(user.fds[usr::USERFDTYPE::HPWRITE]); + close(fds[FDTYPE::HPREAD]); + fds[FDTYPE::HPREAD] = 0; + + // HPWRITE fd has aleady been closed by HP process after writing user + // inputs (before the fork). } } } /** - * Cleanup any open fds of all users (called after partial pipe failure). - * - * @param upto The user upto which point should be checked for open fds. + * Closes any open user fds based after an error. */ -void cleanup_userfds(const usr::contract_user &upto) +void cleanup_userfds() { - for (auto &[sid, user] : usr::users) + for (auto &[pubkey, fds] : userfds) { - if (&user == &upto) - break; - for (int i = 0; i < 4; i++) { - if (user.fds[i] > 0) - { - close(user.fds[i]); - user.fds[i] = 0; - } + if (fds[i] > 0) + close(fds[i]); + fds[i] = 0; } } } diff --git a/src/proc.hpp b/src/proc.hpp index cc5cd133..8a302124 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -17,9 +17,17 @@ namespace proc */ struct ContractExecArgs { - uint64_t timestamp; // Current HotPocket timestamp. + // Map of user I/O buffers (map key: user public key). + // The value is a pair holding consensus-verified input and contract-generated output. + std::unordered_map> &userbufs; + + // Current HotPocket timestamp. + uint64_t timestamp; - ContractExecArgs(uint64_t _timestamp) + ContractExecArgs( + uint64_t _timestamp, + std::unordered_map> &_userbufs) : + userbufs(_userbufs) { timestamp = _timestamp; } @@ -27,19 +35,17 @@ struct ContractExecArgs int exec_contract(const ContractExecArgs &args); -bool is_contract_running(); - //------Internal-use functions for this namespace. -int create_userpipes(); - int write_to_stdin(const ContractExecArgs &args); +int write_verified_user_inputs(const ContractExecArgs &args); + +int read_contract_user_outputs(const ContractExecArgs &args); + void close_unused_userfds(bool is_hp); -void cleanup_userfds(const usr::contract_user &user); - -int read_contract_user_outputs(); +void cleanup_userfds(); } // namespace proc diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 1778408d..79ec8899 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -24,7 +24,7 @@ void user_session_handler::on_connect(sock::socket_session *session) { std::cout << "User client connected " << session->address_ << ":" << session->port_ << std::endl; - // As soon as a user conntects, we issue them a challenge message. We remember the + // As soon as a user connects, we issue them a challenge message. We remember the // challenge we issued and later verifies the user's response with it. std::string msg; @@ -56,20 +56,31 @@ void user_session_handler::on_message(sock::socket_session *session, std::string auto itr = usr::pending_challenges.find(session->uniqueid_); if (itr != usr::pending_challenges.end()) { - std::string userpubkey; + std::string userpubkeyb64; std::string_view original_challenge = itr->second; - if (usr::verify_user_challenge_response(userpubkey, message, original_challenge) == 0) + if (usr::verify_user_challenge_response(userpubkeyb64, message, original_challenge) == 0) { - // Challenge verification successful. - // Promote the connection from pending-challenges to authenticated users. + // Challenge singature verification successful. - session->flags_.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag - session->flags_.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag - usr::pending_challenges.erase(session->uniqueid_); // Remove the stored challenge - usr::add_user(session->uniqueid_, userpubkey); // Add the user to the global authed user list + // Now check whether this user public key is duplicate. + if (usr::sessionids.count(userpubkeyb64) == 0) + { + // All good. Unique public key. + // Promote the connection from pending-challenges to authenticated users. - std::cout << "User connection " << session->uniqueid_ << " authenticated.\n"; - return; + session->flags_.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag + session->flags_.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag + usr::add_user(session->uniqueid_, userpubkeyb64); // Add the user to the global authed user list + usr::pending_challenges.erase(session->uniqueid_); // Remove the stored challenge + + std::cout << "User connection " << session->uniqueid_ << " authenticated. Public key " + << userpubkeyb64 << std::endl; + return; + } + else + { + std::cout << "Duplicate user public key " << session->uniqueid_ << std::endl; + } } else { @@ -87,11 +98,11 @@ void user_session_handler::on_message(sock::socket_session *session, std::string if (itr != usr::users.end()) { // This is an authed user. - usr::contract_user &user = itr->second; - + usr::connected_user &user = itr->second; + //Hand over the bytes into user inbuffer. - user.inbuffer = std::move(message); - + user.inbuffer.append(message); + std::cout << "Collected " << user.inbuffer.length() << " bytes from user " << user.pubkeyb64 << std::endl; return; } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 91939829..22224d84 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -2,8 +2,6 @@ #include #include #include -#include -#include #include #include #include "../sock/socket_server.hpp" @@ -21,13 +19,19 @@ namespace usr * Global user list. (Exposed to other sub systems) * Map key: User socket session id () */ -std::map> users; +std::unordered_map users; + +/** + * Holds set of connected user session ids for lookups. (Exposed to other sub systems) + * Map key: User pubkey + */ +std::unordered_map sessionids; /** * Keep track of verification-pending challenges issued to newly connected users. * Map key: User socket session id () */ -std::map> pending_challenges; +std::unordered_map pending_challenges; /** * User session handler instance. This instance's methods will be fired for any user socket activity. @@ -70,6 +74,14 @@ int init() return 0; } +/** + * Free any resources used by usr subsystem (eg. socket listeners). + */ +void deinit() +{ + stop_listening(); +} + /** * Constructs user challenge message json and the challenge string required for * initial user challenge handshake. This gets called when a user gets establishes @@ -165,15 +177,18 @@ int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string } // Verify the challenge signature. We do this last due to signature verification cost. + std::string_view pubkeysv = util::getsv(d[CHALLENGE_RESP_PUBKEY]); if (crypto::verify_b64( original_challenge, util::getsv(d[CHALLENGE_RESP_SIG]), - util::getsv(d[CHALLENGE_RESP_PUBKEY])) != 0) + pubkeysv) != 0) { std::cerr << "User challenge response signature verification failed.\n"; return -1; } + extracted_pubkeyb64 = pubkeysv; + return 0; } @@ -185,7 +200,7 @@ int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string * @param pubkeyb64 User's base64 public key. * @return 0 on successful additions. -1 on failure. */ -int add_user(std::string_view sessionid, std::string_view pubkeyb64) +int add_user(const std::string &sessionid, const std::string &pubkeyb64) { if (users.count(sessionid) == 1) { @@ -193,7 +208,11 @@ int add_user(std::string_view sessionid, std::string_view pubkeyb64) return -1; } - users.emplace(sessionid, usr::contract_user(pubkeyb64)); + users.emplace(sessionid, usr::connected_user(pubkeyb64)); + + // Populate sessionid map so we can lookup by user pubkey. + sessionids.emplace(pubkeyb64, sessionid); + return 0; } @@ -201,9 +220,10 @@ int add_user(std::string_view sessionid, std::string_view pubkeyb64) * Removes the specified public key from the global user list. * This must get called when a user disconnects from HP. * + * @param sessionid User socket session id. * @return 0 on successful removals. -1 on failure. */ -int remove_user(std::string_view sessionid) +int remove_user(const std::string &sessionid) { auto itr = users.find(sessionid); @@ -213,18 +233,9 @@ int remove_user(std::string_view sessionid) return -1; } - usr::contract_user &user = itr->second; - - // Close any open fds for this user. - for (int i = 0; i < 4; i++) - { - if (user.fds[i] > 0) - { - close(user.fds[i]); - user.fds[i] = 0; - } - } + usr::connected_user &user = itr->second; + sessionids.erase(user.pubkeyb64); users.erase(itr); return 0; } @@ -246,4 +257,12 @@ void start_listening() std::cout << "Started listening for incoming user connections...\n"; } +/** + * Stops listening for incoming connections. + */ +void stop_listening() +{ + //TODO +} + } // namespace usr \ No newline at end of file diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index e4c3da7d..0192acad 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -3,7 +3,7 @@ #include #include -#include +#include #include "../util.hpp" /** @@ -16,68 +16,52 @@ namespace usr * Holds information about an authenticated (challenge-verified) user * connected to the HotPocket node. */ -struct contract_user +struct connected_user { // Base64 user public key std::string pubkeyb64; - // Holds the user input to be processed by consensus rounds + // Holds the unprocessed user input collected from websocket. std::string inbuffer; - // Holds the contract output to be processed by consensus rounds - std::string outbuffer; - - // HP --> SC pipe + SC --> HP pipe - // We keep 2 pipes in single array for easy access. - // fd[0] used by Smart Contract to read user-input sent by Hot Pocket. - // fd[1] used by Hot Pocket to write user-input to the smart contract. - // fd[2] used by Hot Pocket to read output from the smart contract. - // fd[3] used by Smart Contract to write output back to Hot Pocket. - int fds[4]; - - contract_user(std::string_view _pubkeyb64) + connected_user(std::string_view _pubkeyb64) { pubkeyb64 = _pubkeyb64; } }; -/** - * Enum used to differenciate pipe fds maintained for user/SC communication. - */ -enum USERFDTYPE -{ - // Used by Smart Contract to read user-input sent by Hot Pocket - SCREAD = 0, - // Used by Hot Pocket to write user-input to the smart contract. - HPWRITE = 1, - // Used by Hot Pocket to read output from the smart contract. - HPREAD = 2, - // Used by Smart Contract to write output back to Hot Pocket. - SCWRITE = 3 -}; - /** * Global authenticated (challenge-verified) user list. */ -extern std::map> users; +extern std::unordered_map users; + +/** + * Keep track of verification-pending challenges issued to newly connected users. + * Map key: User socket session id () + */ +extern std::unordered_map sessionids; /** * Keep track of verification-pending challenges issued to newly connected users. */ -extern std::map> pending_challenges; +extern std::unordered_map pending_challenges; int init(); +void deinit(); + void create_user_challenge(std::string &msg, std::string &challengeb64); int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string_view response, std::string_view original_challenge); -int add_user(std::string_view sessionid, std::string_view pubkeyb64); +int add_user(const std::string &sessionid, const std::string &pubkeyb64); -int remove_user(std::string_view sessionid); +int remove_user(const std::string &sessionid); void start_listening(); +void stop_listening(); + } // namespace usr #endif \ No newline at end of file