diff --git a/README.md b/README.md index 2b306054..05d34cc0 100644 --- a/README.md +++ b/README.md @@ -74,11 +74,11 @@ Code is divided into subsystems via namespaces. **crypto::** Handles cryptographic activities. Wraps libsodium and offers convenience functions. -**proc::** Handles contract process execution. +**proc::** Handles contract process execution and managing user/SC I/O and npl I/O. Makes use of **usr** and **p2p**. -**usr::** Handles user connections and processing of user I/O with the smart contract. Makes use of **crypto** and **sock**. +**usr::** Handles user connections. Makes use of **crypto** and **sock**. -**p2p::** Handles peer-to-peer connections and message exchange between nodes. Also handles smart contract node-party-line (npl) I/O. Makes use of **crypto** and **sock**. +**p2p::** Handles peer-to-peer connections and message exchange between nodes. Makes use of **crypto** and **sock**. **cons::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **proc** diff --git a/src/proc.cpp b/src/proc.cpp index 69508d33..02ba67d8 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -2,12 +2,14 @@ #include #include #include +#include #include #include #include #include #include #include "proc.hpp" +#include "usr/usr.hpp" #include "conf.hpp" namespace proc @@ -31,18 +33,30 @@ int exec_contract(const ContractExecArgs &args) return -1; } + if (create_userpipes() != 0) + { + std::cerr << "User pipe creation failed.\n"; + return -1; + } + __pid_t pid = fork(); if (pid > 0) { // HotPocket process. contract_pid = pid; + + // Close all user fds unused by HP process. + close_unused_userfds(true); } else if (pid == 0) { // Contract process. // Set up the process environment and overlay the contract binary program with execv(). + // Close all user fds unused by SC process. + close_unused_userfds(false); + // Set the contract process working directory. chdir(conf::ctx.contractDir.data()); @@ -61,6 +75,40 @@ int exec_contract(const ContractExecArgs &args) return 0; } +/** + * Create pipes for all authed users in order to perform I/O with SC. + */ +int create_userpipes() +{ + for (auto &[k, user] : usr::users) + { + int inpipe[2]; + if (pipe(inpipe) != 0) + { + //Abandon and cleanup. + cleanup_userfds(user); + return -1; + } + + int outpipe[2]; + if (pipe(outpipe) != 0) + { + // Close the earlier created pipe. + close(inpipe[0]); + close(inpipe[1]); + + inpipe[0] = 0; + inpipe[1] = 0; + + //Abandon and cleanup. + cleanup_userfds(user); + return -1; + } + } + + return 0; +} + /** * Writes the contract input message into the stdin of the contract process. * Input format: @@ -86,24 +134,24 @@ int write_to_stdin(const ContractExecArgs &args) d.AddMember("ts", args.timestamp, allocator); rapidjson::Value users(rapidjson::kObjectType); - for (auto &[sid, user] : args.users) + for (auto &[sid, user] : usr::users) { rapidjson::Value fdlist(rapidjson::kArrayType); - fdlist.PushBack(user.inpipe[0], allocator); - fdlist.PushBack(user.outpipe[1], allocator); + fdlist.PushBack(user.fds[usr::USERFDTYPE::SCREAD], allocator); + fdlist.PushBack(user.fds[usr::USERFDTYPE::SCWRITE], allocator); users.AddMember(rapidjson::StringRef(user.pubkeyb64.data()), fdlist, allocator); } d.AddMember("usrfd", users, allocator); - rapidjson::Value peers(rapidjson::kObjectType); - for (auto &[sid, peer] : args.peers) - { - rapidjson::Value fdlist(rapidjson::kArrayType); - fdlist.PushBack(peer.inpipe[0], allocator); - fdlist.PushBack(peer.outpipe[1], allocator); - peers.AddMember(rapidjson::StringRef(peer.pubkeyb64.data()), fdlist, allocator); - } - d.AddMember("nplfd", peers, allocator); + // rapidjson::Value peers(rapidjson::kObjectType); + // for (auto &[sid, peer] : p2p::peers) + // { + // rapidjson::Value fdlist(rapidjson::kArrayType); + // fdlist.PushBack(peer.inpipe[0], allocator); + // fdlist.PushBack(peer.outpipe[1], allocator); + // peers.AddMember(rapidjson::StringRef(peer.pubkeyb64.data()), fdlist, allocator); + // } + // d.AddMember("nplfd", peers, allocator); rapidjson::Value unl(rapidjson::kArrayType); for (std::string &node : conf::cfg.unl) @@ -137,6 +185,47 @@ int write_to_stdin(const ContractExecArgs &args) return 0; } +/** + * Read all per-user outputs produced by the contract process and store them in + * the user buffer for later processing. + * + * @return 0 on success. -1 on failure. + */ +int read_contract_user_outputs() +{ + // Read any outputs that have been written by the contract process + // from all the user outpipes and store in the outbuffer of each user. + // User outbuffer will be read by the consensus process later when it wishes so. + + // Future optmization: Read and populate user buffers parallely. + // Currently this is sequential for simplicity which will not scale well + // when there are large number of users connected to the same HP node. + + for (auto &[sid, user] : usr::users) + { + int fdout = user.fds[usr::USERFDTYPE::HPREAD]; + int bytes_available = 0; + ioctl(fdout, FIONREAD, &bytes_available); + + if (bytes_available > 0) + { + char data[bytes_available]; + read(fdout, data, bytes_available); + + // Populate the user output buffer with new data + user.outbuffer = std::string(data, bytes_available); + + // Close remaining fds on HP process side because we are done with contract process I/O. + close(user.fds[usr::USERFDTYPE::HPREAD]); + close(user.fds[usr::USERFDTYPE::HPWRITE]); + + std::cout << "Read " + std::to_string(bytes_available) << " bytes into user output buffer. user:" + user.pubkeyb64 << std::endl; + } + } + + return 0; +} + /** * Checks whether the contract process is running at this moment. */ @@ -153,4 +242,49 @@ bool is_contract_running() return false; } +/** + * Closes unused user fds based on which process this gets called from. + */ +void close_unused_userfds(bool is_hp) +{ + for (auto &[sid, user] : usr::users) + { + if (is_hp) + { + // Close unused fds in Hot Pocket process. + close(user.fds[usr::USERFDTYPE::SCREAD]); + close(user.fds[usr::USERFDTYPE::SCWRITE]); + } + else + { + // Close unused fds in smart contract process. + close(user.fds[usr::USERFDTYPE::HPREAD]); + close(user.fds[usr::USERFDTYPE::HPWRITE]); + } + } +} + +/** + * Cleanup any open fds of all users (called after partial pipe failure). + * + * @param upto The user upto which point should be checked for open fds. + */ +void cleanup_userfds(const usr::contract_user &upto) +{ + for (auto &[sid, user] : usr::users) + { + if (&user == &upto) + break; + + for (int i = 0; i < 4; i++) + { + if (user.fds[i] > 0) + { + close(user.fds[i]); + user.fds[i] = 0; + } + } + } +} + } // namespace proc \ No newline at end of file diff --git a/src/proc.hpp b/src/proc.hpp index 2319aec4..cc5cd133 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -3,6 +3,7 @@ #include #include +#include "usr/usr.hpp" #include "util.hpp" /** @@ -16,12 +17,9 @@ namespace proc */ struct ContractExecArgs { - std::map &users; // Map of authenticated contract users indexed by user pubkey. - std::map &peers; // Map of connected peers indexed by node pubkey. - uint64_t timestamp; // Current HotPocket timestamp. + uint64_t timestamp; // Current HotPocket timestamp. - ContractExecArgs(std::map &_users, std::map &_peers, uint64_t _timestamp) - : users(_users), peers(_peers) + ContractExecArgs(uint64_t _timestamp) { timestamp = _timestamp; } @@ -33,8 +31,16 @@ bool is_contract_running(); //------Internal-use functions for this namespace. +int create_userpipes(); + int write_to_stdin(const ContractExecArgs &args); +void close_unused_userfds(bool is_hp); + +void cleanup_userfds(const usr::contract_user &user); + +int read_contract_user_outputs(); + } // namespace proc #endif \ No newline at end of file diff --git a/src/sock/socket_client.cpp b/src/sock/socket_client.cpp index 82c5d778..fbba9521 100644 --- a/src/sock/socket_client.cpp +++ b/src/sock/socket_client.cpp @@ -78,7 +78,7 @@ void socket_client::on_handshake(error ec) //Creates a new socket session object std::make_shared( ws_, sess_handler_) - ->client_run(port_, host_, ec); + ->client_run(std::move(host_), std::move(port_), ec); } /** diff --git a/src/sock/socket_client.hpp b/src/sock/socket_client.hpp index 194fff2f..f552629e 100644 --- a/src/sock/socket_client.hpp +++ b/src/sock/socket_client.hpp @@ -25,7 +25,7 @@ class socket_client : public std::enable_shared_from_this tcp::resolver resolver_; // resolver used to resolve host and the port websocket::stream ws_; // web socket stream used to send and receive messages std::string host_; // address of the server in which the client connects - std::string_view port_; // port of the server in which client connects + std::string port_; // port of the server in which client connects socket_session_handler &sess_handler_; // handler passed to gain access to websocket events void on_resolve(error ec, tcp::resolver::results_type results); diff --git a/src/sock/socket_server.cpp b/src/sock/socket_server.cpp index a6400b48..c117fff1 100644 --- a/src/sock/socket_server.cpp +++ b/src/sock/socket_server.cpp @@ -89,8 +89,8 @@ void socket_server::on_accept(error_code ec) } else { - std::string_view port = std::to_string(socket_.remote_endpoint().port()); - std::string_view address = socket_.remote_endpoint().address().to_string(); + std::string port = std::to_string(socket_.remote_endpoint().port()); + std::string address = socket_.remote_endpoint().address().to_string(); //Creating websocket stream required to pass to initiate a new session websocket::stream ws(std::move(socket_)); @@ -98,7 +98,7 @@ void socket_server::on_accept(error_code ec) // Launch a new session for this connection std::make_shared( ws, sess_handler_) - ->server_run(port, address); + ->server_run(std::move(address), std::move(port)); } // Accept another connection diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index 0e4bec39..f7bddbd9 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -17,7 +17,7 @@ socket_session::socket_session(websocket::stream &websocket, } //port and address will be used to identify from which client the message recieved in the handler -void socket_session::server_run(std::string_view port, std::string_view address) +void socket_session::server_run(const std::string &&address, const std::string &&port) { port_ = port; address_ = address; @@ -31,7 +31,7 @@ void socket_session::server_run(std::string_view port, std::string_view address) } //port and address will be used to identify from which server the message recieved in the handler -void socket_session::client_run(std::string_view port, std::string_view address, error ec) +void socket_session::client_run(const std::string &&address, const std::string &&port, error ec) { port_ = port; address_ = address; diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index 55fe48d5..682d4c06 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -50,10 +50,10 @@ public: // from the parent we store as it is, since we are not going to pass it anywhere or used in a method // The port of the remote party. - std::string_view port_; + std::string port_; // The IP address of the remote party. - std::string_view address_; + std::string address_; // The unique identifier of the remote party (format :). std::string uniqueid_; @@ -63,8 +63,8 @@ public: // Setting and reading flags to this is completely managed by user-code. std::bitset<8> flags_; - void server_run(std::string_view port, std::string_view address); - void client_run(std::string_view port, std::string_view address, error ec); + void server_run(const std::string &&address, const std::string &&port); + void client_run(const std::string &&address, const std::string &&port, error ec); void send(std::string &&ss); diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 90558e1c..1778408d 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -87,10 +87,12 @@ void user_session_handler::on_message(sock::socket_session *session, std::string if (itr != usr::users.end()) { // This is an authed user. - // Write the message to the user input pipe. SC will read from this pipe when it executes. - const util::contract_user &user = itr->second; - write(user.inpipe[1], message.data(), message.length()); - std::cout << "User " << user.pubkeyb64 << " wrote " << message.length() << " bytes to contract input.\n"; + usr::contract_user &user = itr->second; + + //Hand over the bytes into user inbuffer. + user.inbuffer = std::move(message); + + std::cout << "Collected " << user.inbuffer.length() << " bytes from user " << user.pubkeyb64 << std::endl; return; } } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 5eb3a403..91939829 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -1,8 +1,6 @@ #include #include #include -#include -#include #include #include #include @@ -23,7 +21,7 @@ namespace usr * Global user list. (Exposed to other sub systems) * Map key: User socket session id () */ -std::map> users; +std::map> users; /** * Keep track of verification-pending challenges issued to newly connected users. @@ -48,10 +46,10 @@ std::thread listener_thread; // Challenge response fields. // These fields are used on challenge response validation. -static const char* CHALLENGE_RESP_TYPE = "type"; -static const char* CHALLENGE_RESP_CHALLENGE = "challenge"; -static const char* CHALLENGE_RESP_SIG = "sig"; -static const char* CHALLENGE_RESP_PUBKEY = "pubkey"; +static const char *CHALLENGE_RESP_TYPE = "type"; +static const char *CHALLENGE_RESP_CHALLENGE = "challenge"; +static const char *CHALLENGE_RESP_SIG = "sig"; +static const char *CHALLENGE_RESP_PUBKEY = "pubkey"; // Message type for the user challenge. static const char *CHALLENGE_MSGTYPE = "public_challenge"; @@ -168,9 +166,9 @@ int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string // Verify the challenge signature. We do this last due to signature verification cost. if (crypto::verify_b64( - original_challenge, - util::getsv(d[CHALLENGE_RESP_SIG]), - util::getsv(d[CHALLENGE_RESP_PUBKEY])) != 0) + original_challenge, + util::getsv(d[CHALLENGE_RESP_SIG]), + util::getsv(d[CHALLENGE_RESP_PUBKEY])) != 0) { std::cerr << "User challenge response signature verification failed.\n"; return -1; @@ -195,30 +193,7 @@ int add_user(std::string_view sessionid, std::string_view pubkeyb64) return -1; } - //Establish the I/O pipes for [User <--> SC] channel. - - //inpipe: User will write input to this and contract will read user-input from this. - int inpipe[2]; - if (pipe(inpipe) != 0) - { - std::cerr << "User in pipe creation failed. sessionid:" << sessionid << std::endl; - return -1; - } - - //outpipe: Contract will write output for the user to this and user will read from this. - int outpipe[2]; - if (pipe(outpipe) != 0) - { - std::cerr << "User out pipe creation failed. sessionid:" << sessionid << std::endl; - - //We need to close 'inpipe' in case outpipe failed. - close(inpipe[0]); - close(inpipe[1]); - - return -1; - } - - users.emplace(sessionid, util::contract_user(pubkeyb64, inpipe, outpipe)); + users.emplace(sessionid, usr::contract_user(pubkeyb64)); return 0; } @@ -238,52 +213,19 @@ int remove_user(std::string_view sessionid) return -1; } - const util::contract_user &user = itr->second; + usr::contract_user &user = itr->second; - //Close the User <--> SC I/O pipes. - close(user.inpipe[0]); - close(user.inpipe[1]); - close(user.outpipe[0]); - close(user.outpipe[1]); - - users.erase(itr); - return 0; -} - -/** - * Read all per-user outputs produced by the contract process and store them in - * the user buffer for later processing. - * - * @return 0 on success. -1 on failure. - */ -int read_contract_user_outputs() -{ - //Read any outputs that has been written by the contract process - //from all the user outpipes and store in the outbuffer of each user. - //User outbuffer will be read by the consensus process later when it wishes so. - - //Future optmization: Read and populate user buffers parallely. - //Currently this is sequential for simplicity which will not scale well - //when there are large number of users connected to the same HP node. - - for (auto &[sid, user] : users) + // Close any open fds for this user. + for (int i = 0; i < 4; i++) { - int fdout = user.outpipe[0]; - int bytes_available = 0; - ioctl(fdout, FIONREAD, &bytes_available); - - if (bytes_available > 0) + if (user.fds[i] > 0) { - char data[bytes_available]; - read(fdout, data, bytes_available); - - //Populate the user output buffer with new data - user.outbuffer = std::string(data, bytes_available); - - std::cout << "Read " + std::to_string(bytes_available) << " bytes into user output buffer. user:" + user.pubkeyb64 << std::endl; + close(user.fds[i]); + user.fds[i] = 0; } } + users.erase(itr); return 0; } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 2bf2967e..e4c3da7d 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -12,10 +12,54 @@ namespace usr { +/** + * Holds information about an authenticated (challenge-verified) user + * connected to the HotPocket node. + */ +struct contract_user +{ + // Base64 user public key + std::string pubkeyb64; + + // Holds the user input to be processed by consensus rounds + std::string inbuffer; + + // Holds the contract output to be processed by consensus rounds + std::string outbuffer; + + // HP --> SC pipe + SC --> HP pipe + // We keep 2 pipes in single array for easy access. + // fd[0] used by Smart Contract to read user-input sent by Hot Pocket. + // fd[1] used by Hot Pocket to write user-input to the smart contract. + // fd[2] used by Hot Pocket to read output from the smart contract. + // fd[3] used by Smart Contract to write output back to Hot Pocket. + int fds[4]; + + contract_user(std::string_view _pubkeyb64) + { + pubkeyb64 = _pubkeyb64; + } +}; + +/** + * Enum used to differenciate pipe fds maintained for user/SC communication. + */ +enum USERFDTYPE +{ + // Used by Smart Contract to read user-input sent by Hot Pocket + SCREAD = 0, + // Used by Hot Pocket to write user-input to the smart contract. + HPWRITE = 1, + // Used by Hot Pocket to read output from the smart contract. + HPREAD = 2, + // Used by Smart Contract to write output back to Hot Pocket. + SCWRITE = 3 +}; + /** * Global authenticated (challenge-verified) user list. */ -extern std::map> users; +extern std::map> users; /** * Keep track of verification-pending challenges issued to newly connected users. @@ -32,8 +76,6 @@ int add_user(std::string_view sessionid, std::string_view pubkeyb64); int remove_user(std::string_view sessionid); -int read_contract_user_outputs(); - void start_listening(); } // namespace usr diff --git a/src/util.hpp b/src/util.hpp index c6955b41..940b824b 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -32,35 +32,14 @@ enum SESSION_FLAG USER_AUTHED = 1 }; -/** - * Holds information about an authenticated (challenge-verified) user - * connected to the HotPocket node. - */ -struct contract_user -{ - std::string pubkeyb64; // Base64 user public key - int inpipe[2]; // Pipe to receive user input - int outpipe[2]; // Pipe to receive output produced by the contract - std::string outbuffer; // Holds the contract output to be processed by consensus rounds - - contract_user(std::string_view _pubkeyb64, int _inpipe[2], int _outpipe[2]) - { - pubkeyb64 = _pubkeyb64; - inpipe[0] = _inpipe[0]; - inpipe[1] = _inpipe[1]; - outpipe[0] = _outpipe[0]; - outpipe[1] = _outpipe[1]; - } -}; - /** * Holds information about a HotPocket peer connected to this node. */ struct peer_node { std::string pubkeyb64; // Base64 peer public key - int inpipe[2]; // NPL pipe from HP to SC - int outpipe[2]; // NPL pipe from SC to HP + int inpipe[2]; // NPL pipe from HP to SC + int outpipe[2]; // NPL pipe from SC to HP peer_node(std::string_view _pubkeyb64, int _inpipe[2], int _outpipe[2]) {