From b4237f12858d7824f844822f43181958c5d4bf4e Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Mon, 21 Oct 2019 16:41:00 +0530 Subject: [PATCH] Added npl channel to contract process. --- examples/hpclient/client.js | 2 +- src/conf.cpp | 45 ++++----- src/conf.hpp | 33 ++++--- src/main.cpp | 21 ++-- src/proc.cpp | 188 ++++++++++++++++++++++-------------- src/proc.hpp | 34 +++++-- 6 files changed, 201 insertions(+), 122 deletions(-) diff --git a/examples/hpclient/client.js b/examples/hpclient/client.js index 2d2d160d..e90f50fe 100644 --- a/examples/hpclient/client.js +++ b/examples/hpclient/client.js @@ -54,7 +54,7 @@ function main() { ws.on('message', (m) => { console.log("-----Received raw message-----") - console.log(m) + console.log(m.toString()) console.log("------------------------------") try { diff --git a/src/conf.cpp b/src/conf.cpp index 1a12e800..578d9d1c 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -184,37 +184,38 @@ int load_config() cfg.binargs = d["binargs"].GetString(); cfg.listenip = d["listenip"].GetString(); - //Storing peers in unordered map keyed by the address:port and also saving address and port seperately to retrieve easily - // in handling peer connections. + // Storing peers in unordered map keyed by the concatenated address:port and also saving address and port + // seperately to retrieve easily when handling peer connections. std::vector splitted_peers; cfg.peers.clear(); for (auto &v : d["peers"].GetArray()) { - //Split the address:port text into two - boost::split(splitted_peers, v.GetString(), boost::is_any_of(":")); + const char* ipport_concat = v.GetString(); + // Split the address:port text into two + boost::split(splitted_peers, ipport_concat, boost::is_any_of(":")); if (splitted_peers.size() == 2) { - //Push the peer address and the port to peers array - cfg.peers.emplace(std::make_pair(v.GetString(), std::make_pair(splitted_peers.front(), splitted_peers.back()))); + // Push the peer address and the port to peers array + cfg.peers.emplace(std::make_pair(ipport_concat, std::make_pair(splitted_peers.front(), splitted_peers.back()))); splitted_peers.clear(); } } cfg.unl.clear(); - for (auto &v : d["unl"].GetArray()) + for (auto &nodepk : d["unl"].GetArray()) { - //Convert the public key hex of the unl to binary and store it - std::string bin_unl; - bin_unl.resize(crypto::PFXD_PUBKEY_BYTES); + // Convert the public key hex of each node to binary and store it. + std::string bin_pubkey; + bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES); if (util::hex2bin( - reinterpret_cast(bin_unl.data()), - bin_unl.length(), - v.GetString()) != 0) + reinterpret_cast(bin_pubkey.data()), + bin_pubkey.length(), + nodepk.GetString()) != 0) { std::cerr << "Error decoding unl list.\n"; return -1; } - cfg.unl.emplace(bin_unl); + cfg.unl.emplace(bin_pubkey); } cfg.peerport = d["peerport"].GetInt(); @@ -255,28 +256,28 @@ int save_config() d.AddMember("listenip", rapidjson::StringRef(cfg.listenip.data()), allocator); rapidjson::Value peers(rapidjson::kArrayType); - for (auto &peer : cfg.peers) + for (auto &[ipport_concat, ipport_pair] : cfg.peers) { rapidjson::Value v; - v.SetString(rapidjson::StringRef(peer.first.data()), allocator); + v.SetString(rapidjson::StringRef(ipport_concat.data()), allocator); peers.PushBack(v, allocator); } d.AddMember("peers", peers, allocator); rapidjson::Value unl(rapidjson::kArrayType); - for (auto &node : cfg.unl) + for (auto &nodepk : cfg.unl) { rapidjson::Value v; - std::string npl_node; + std::string hex_pubkey; if (util::bin2hex( - npl_node, - reinterpret_cast(node.data()), - node.length()) != 0) + hex_pubkey, + reinterpret_cast(nodepk.data()), + nodepk.length()) != 0) { std::cerr << "Error encoding npl list.\n"; return -1; } - v.SetString(rapidjson::StringRef(npl_node.data()), allocator); + v.SetString(rapidjson::StringRef(hex_pubkey.data()), allocator); unl.PushBack(v, allocator); } d.AddMember("unl", unl, allocator); diff --git a/src/conf.hpp b/src/conf.hpp index d0bb6fd3..37aba685 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -13,6 +13,9 @@ namespace conf { +// Typedef to represent ip address and port pair. +typedef std::pair ip_port_pair; + // Holds contextual information about the currently loaded contract. struct contract_ctx { @@ -36,21 +39,21 @@ struct contract_config // Config elements which are loaded from the config file. - std::string pubkeyhex; // Contract hex public key - std::string seckeyhex; // Contract hex secret key - std::string keytype; // Key generation algorithm used by libsodium - std::string binary; // Full path to the contract binary - std::string binargs; // CLI arguments to pass to the contract binary - std::string listenip; // The IPs to listen on for incoming connections - std::unordered_map> peers; // List of peers in a map keyed by ":" format - std::unordered_set unl; // Unique node list (list of base64 public keys) - std::uint16_t peerport; // Listening port for peer connections - int roundtime; // Consensus round time in ms - std::uint16_t pubport; // Listening port for public user connections - int pubmaxsize; // User message max size in bytes - int pubmaxcpm; // User message rate - std::string loglevel; // Log severity level (debug, info, warn, error) - std::unordered_set loggers; // List of enabled loggers (console, file) + std::string pubkeyhex; // Contract hex public key + std::string seckeyhex; // Contract hex secret key + std::string keytype; // Key generation algorithm used by libsodium + std::string binary; // Full path to the contract binary + std::string binargs; // CLI arguments to pass to the contract binary + std::string listenip; // The IPs to listen on for incoming connections + std::unordered_map peers; // Map of peers keyed by ":" concatenated format + std::unordered_set unl; // Unique node list (list of binary public keys) + std::uint16_t peerport; // Listening port for peer connections + int roundtime; // Consensus round time in ms + std::uint16_t pubport; // Listening port for public user connections + int pubmaxsize; // User message max size in bytes + int pubmaxcpm; // User message rate + std::string loglevel; // Log severity level (debug, info, warn, error) + std::unordered_set loggers; // List of enabled loggers (console, file) }; // Global contract context struct exposed to the application. diff --git a/src/main.cpp b/src/main.cpp index b4f6cf42..cac57889 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -127,9 +127,9 @@ int main(int argc, char **argv) if (usr::init() != 0) return -1; - if (p2p::init() != 0) + if (p2p::init() != 0) return -1; - + // After initializing primary subsystems, register the SIGINT handler. signal(SIGINT, signal_handler); @@ -145,17 +145,24 @@ int main(int argc, char **argv) for (auto &[sid, user] : usr::users) { std::pair bufpair; - std::string inputtosend; inputtosend.swap(user.inbuffer); - bufpair.first = std::move(inputtosend); - userbufs[user.pubkey] = bufpair; + + userbufs.emplace(user.pubkey, std::move(bufpair)); } + std::pair hpscbufpair; hpscbufpair.first = "{msg:'Message from HP'}"; - proc::ContractExecArgs eargs(123123345, userbufs, hpscbufpair); + std::unordered_map> nplbufs; + for (int i = 0; i < 3; i++) + { + std::pair bufpair; + nplbufs.emplace("aaa", std::move(bufpair)); + } + + proc::ContractExecArgs eargs(123123345, userbufs, nplbufs, hpscbufpair); proc::exec_contract(eargs); for (auto &[pubkey, bufpair] : userbufs) @@ -184,7 +191,7 @@ int main(int argc, char **argv) } } - LOG_INFO << "exited normally"; + std::cout << "exited normally\n"; return 0; } diff --git a/src/proc.cpp b/src/proc.cpp index db2f39ab..f135beff 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -30,7 +30,10 @@ enum FDTYPE }; // Map of user pipe fds (map key: user public key) -std::unordered_map> userfds; +contract_fdmap userfds; + +// Map of NPL pipe fds (map key: user public key) +contract_fdmap nplfds; // Pipe fds for HP <--> messages. std::vector hpscfds; @@ -52,15 +55,23 @@ int exec_contract(const ContractExecArgs &args) return -1; } - // Write any verified (consensus-reached) user inputs to user pipes. - if (write_contract_user_inputs(args) != 0) + // Write any npl inputs to npl pipes. + if (write_contract_fdmap_inputs(nplfds, args.nplbufs) != 0) { - cleanup_userfds(); + cleanup_fdmap(nplfds); + LOG_ERR << "Failed to write NPL inputs to contract."; + return -1; + } + + // Write any verified (consensus-reached) user inputs to user pipes. + if (write_contract_fdmap_inputs(userfds, args.userbufs) != 0) + { + cleanup_fdmap(userfds); LOG_ERR << "Failed to write user inputs to contract."; return -1; } - __id_t pid = fork(); + __pid_t pid = fork(); if (pid > 0) { // HotPocket process. @@ -85,11 +96,24 @@ int exec_contract(const ContractExecArgs &args) // After contract execution, collect contract outputs. if (read_contract_hp_outputs(args) != 0) + { + LOG_ERR << "Error reading HP output from the contract."; return -1; + } - if (read_contract_user_outputs(args) != 0) + if (read_contract_fdmap_outputs(nplfds, args.nplbufs) != 0) + { + LOG_ERR << "Error reading NPL output from the contract."; return -1; + } + if (read_contract_fdmap_outputs(userfds, args.userbufs) != 0) + { + LOG_ERR << "Error reading User output from the contract."; + return -1; + } + + nplfds.clear(); userfds.clear(); } else if (pid == 0) @@ -151,7 +175,7 @@ int await_contract_execution() int write_contract_args(const ContractExecArgs &args) { // Populate the json string with contract args. - // We don't use a JSOn parser here because it's lightweight to contrstuct the + // We don't use a JSON parser here because it's lightweight to contrstuct the // json string manually. std::ostringstream os; @@ -161,33 +185,27 @@ int write_contract_args(const ContractExecArgs &args) << ",\"hpfd\":[" << hpscfds[FDTYPE::SCREAD] << "," << hpscfds[FDTYPE::SCWRITE] << "],\"usrfd\":{"; - for (auto itr = userfds.begin(); itr != userfds.end(); itr++) + fdmap_json_to_stream(userfds, os); + + os << "},\"nplfd\":{"; + + fdmap_json_to_stream(nplfds, os); + + os << "},\"unl\":["; + + for (auto nodepk = conf::cfg.unl.begin(); nodepk != conf::cfg.unl.end(); nodepk++) { - if (itr != userfds.begin()) + if (nodepk != conf::cfg.unl.begin()) os << ","; // Trailing comma separator for previous element. - // Get the hex pubkey of the user. - std::string_view userpubkey = itr->first; // User pubkey in binary format. - std::string userpubkeyhex; + // Convert binary nodepk into hex. + std::string pubkeyhex; util::bin2hex( - userpubkeyhex, - reinterpret_cast(userpubkey.data()), - userpubkey.length()); + pubkeyhex, + reinterpret_cast((*nodepk).data()), + (*nodepk).length()); - // Write user hex pubkey and fds. - os << "\"" << userpubkeyhex << "\":[" - << itr->second[FDTYPE::SCREAD] << "," - << itr->second[FDTYPE::SCWRITE] << "]"; - } - - os << "},\"nplfd\":{},\"unl\":["; - - for (auto node = conf::cfg.unl.begin(); node != conf::cfg.unl.end(); node++) - { - if (node != conf::cfg.unl.begin()) - os << ","; // Trailing comma separator for previous element. - - os << "\"" << *node << "\""; + os << "\"" << pubkeyhex << "\""; } os << "]}"; @@ -227,35 +245,12 @@ int write_contract_hp_inputs(const ContractExecArgs &args) if (create_and_write_iopipes(hpscfds, args.hpscbufs.first) != 0) // hpscbufs.first is the input buffer. { LOG_ERR << "Error writing HP input to SC (" << args.hpscbufs.first.length() - << " bytes)"; + << " bytes)"; return -1; } return 0; } -/** - * Creates the pipes and writes verified (consesus-reached) user - * inputs to the SC via the pipe. - */ -int write_contract_user_inputs(const ContractExecArgs &args) -{ - // Loop through input buffer for each user. - for (auto &[pubkey, bufpair] : args.userbufs) - { - userfds[pubkey] = std::move(std::vector()); - std::vector &fds = userfds[pubkey]; - - if (create_and_write_iopipes(fds, bufpair.first) != 0) // bufpair.first is the input buffer. - { - LOG_ERR << "Error writing contract input (" << bufpair.first.length() - << " bytes) from user"; - return -1; - } - } - - return 0; -} - /** * Read all HP output messages produced by the contract process and store them in * the buffer for later processing. @@ -269,46 +264,95 @@ int read_contract_hp_outputs(const ContractExecArgs &args) args.hpscbufs.first.clear(); //bufpair.first is the input buffer. if (read_iopipe(hpscfds, args.hpscbufs.second) != 0) // hpscbufs.second is the output buffer. - { - LOG_ERR << "Error reading HP output"; return -1; - } + return 0; } /** - * Read all per-user outputs produced by the contract process and store them in - * the user buffer for later processing. + * Common helper function to write json output of fdmap to given ostream. + * @param fdmap Any pubkey->fdlist map. (eg. userfds, nplfds) + * @param os An output stream. + */ +void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os) +{ + for (auto itr = fdmap.begin(); itr != fdmap.end(); itr++) + { + if (itr != fdmap.begin()) + os << ","; // Trailing comma separator for previous element. + + // Get the hex pubkey. + std::string_view pubkey = itr->first; // Pubkey in binary format. + std::string pubkeyhex; + util::bin2hex( + pubkeyhex, + reinterpret_cast(pubkey.data()), + pubkey.length()); + + // Write hex pubkey and fds. + os << "\"" << pubkeyhex << "\":[" + << itr->second[FDTYPE::SCREAD] << "," + << itr->second[FDTYPE::SCWRITE] << "]"; + } +} + +/** + * Common function to create the pipes and write buffer inputs to the fdmap. + * We take mutable parameters since the internal entries in the maps will be + * modified (eg. fd close, buffer clear). * + * @param fdmap A map which has public key and a vector as fd list for that public key. + * @param bufmap A map which has a public key and input/output buffer pair for that public key. * @return 0 on success. -1 on failure. */ -int read_contract_user_outputs(const ContractExecArgs &args) +int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap) { - for (auto &[pubkey, bufpair] : args.userbufs) + // Loop through input buffer for each pubkey. + for (auto &[pubkey, bufpair] : bufmap) + { + std::vector fds = std::vector(); + if (create_and_write_iopipes(fds, bufpair.first) != 0) // bufpair.first is the input buffer. + return -1; + + fdmap.emplace(pubkey, std::move(fds)); + } + + return 0; +} + +/** + * Common function to read all outputs produced by the contract process and store them in + * output buffers for later processing. + * + * @param fdmap A map which has public key and a vector as fd list for that public key. + * @param bufmap A map which has a public key and input/output buffer pair for that public key. + * @return 0 on success. -1 on failure. + */ +int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap) +{ + for (auto &[pubkey, bufpair] : bufmap) { // Clear the input buffer because we are sure the contract has finished reading from // that mapped memory portion. bufpair.first.clear(); //bufpair.first is the input buffer. - // Get fds for the user by pubkey. - std::vector &fds = userfds[pubkey]; + // Get fds for the pubkey. + std::vector &fds = fdmap[pubkey]; if (read_iopipe(fds, bufpair.second) != 0) // bufpair.second is the output buffer. - { - LOG_ERR << "Error reading contract output for user " - << pubkey; - } + return -1; } return 0; } /** - * Closes any open user fds after an error. + * Common function to close any open fds in the map after an error. + * @param fdmap Any pubkey->fdlist map. (eg. userfds, nplfds) */ -void cleanup_userfds() +void cleanup_fdmap(contract_fdmap &fdmap) { - for (auto &[pubkey, fds] : userfds) + for (auto &[pubkey, fds] : fdmap) { for (int i = 0; i < 4; i++) { @@ -377,7 +421,7 @@ int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer) } /** - * Common function to read SC output from the pipe and populate a given buffer. + * Common function to read and close SC output from the pipe and populate a given buffer. * @param fds Vector representing the pipes fd list. * @param The buffer to place the read output. */ @@ -420,6 +464,10 @@ void close_unused_fds(bool is_hp) // Loop through user fds. for (auto &[pubkey, fds] : userfds) close_unused_vectorfds(is_hp, fds); + + // Loop through npl fds. + for (auto &[pubkey, fds] : nplfds) + close_unused_vectorfds(is_hp, fds); } /** diff --git a/src/proc.hpp b/src/proc.hpp index cb986c0f..d97f9d6d 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -2,7 +2,9 @@ #define _HP_PROC_H_ #include -#include +#include +#include +#include #include "usr/usr.hpp" #include "util.hpp" @@ -12,6 +14,14 @@ namespace proc { +// Common typedef for a map of pubkey->fdlist. +// This is used to keep track of fdlist quadruplet with a public key (eg. user, npl). +typedef std::unordered_map> contract_fdmap; + +// Common typedef for a map of pubkey->buf-pair (input buffer and output buffer). +// This is used to keep track of input/output buffer pair with a public key (eg. user, npl) +typedef std::unordered_map> contract_bufmap; + /** * Holds information that should be passed into the contract process. */ @@ -19,7 +29,11 @@ struct ContractExecArgs { // Map of user I/O buffers (map key: user binary public key). // The value is a pair holding consensus-verified input and contract-generated output. - std::unordered_map> &userbufs; + contract_bufmap &userbufs; + + // Map of NPL I/O buffers (map key: Peer binary public key). + // The value is a pair holding NPL input and contract-generated output. + contract_bufmap &nplbufs; // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffer for HP->SC messages, Output buffer for SC->HP messages. @@ -30,9 +44,11 @@ struct ContractExecArgs ContractExecArgs( uint64_t _timestamp, - std::unordered_map> &_userbufs, + contract_bufmap &_userbufs, + contract_bufmap &_nplbufs, std::pair &_hpscbufs) : userbufs(_userbufs), + nplbufs(_nplbufs), hpscbufs(_hpscbufs) { timestamp = _timestamp; @@ -49,13 +65,17 @@ int write_contract_args(const ContractExecArgs &args); int write_contract_hp_inputs(const ContractExecArgs &args); -int write_contract_user_inputs(const ContractExecArgs &args); - int read_contract_hp_outputs(const ContractExecArgs &args); -int read_contract_user_outputs(const ContractExecArgs &args); +// Common helper functions -void cleanup_userfds(); +void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os); + +int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap); + +int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap); + +void cleanup_fdmap(contract_fdmap &fdmap); int create_and_write_iopipes(std::vector &fds, std::string &inputbuffer);