Added npl channel to contract process.

This commit is contained in:
Ravin Perera
2019-10-21 16:41:00 +05:30
parent 7ed1466ad9
commit b4237f1285
6 changed files with 201 additions and 122 deletions

View File

@@ -54,7 +54,7 @@ function main() {
ws.on('message', (m) => {
console.log("-----Received raw message-----")
console.log(m)
console.log(m.toString())
console.log("------------------------------")
try {

View File

@@ -184,37 +184,38 @@ int load_config()
cfg.binargs = d["binargs"].GetString();
cfg.listenip = d["listenip"].GetString();
//Storing peers in unordered map keyed by the address:port and also saving address and port seperately to retrieve easily
// in handling peer connections.
// Storing peers in unordered map keyed by the concatenated address:port and also saving address and port
// seperately to retrieve easily when handling peer connections.
std::vector<std::string> splitted_peers;
cfg.peers.clear();
for (auto &v : d["peers"].GetArray())
{
//Split the address:port text into two
boost::split(splitted_peers, v.GetString(), boost::is_any_of(":"));
const char* ipport_concat = v.GetString();
// Split the address:port text into two
boost::split(splitted_peers, ipport_concat, boost::is_any_of(":"));
if (splitted_peers.size() == 2)
{
//Push the peer address and the port to peers array
cfg.peers.emplace(std::make_pair(v.GetString(), std::make_pair(splitted_peers.front(), splitted_peers.back())));
// Push the peer address and the port to peers array
cfg.peers.emplace(std::make_pair(ipport_concat, std::make_pair(splitted_peers.front(), splitted_peers.back())));
splitted_peers.clear();
}
}
cfg.unl.clear();
for (auto &v : d["unl"].GetArray())
for (auto &nodepk : d["unl"].GetArray())
{
//Convert the public key hex of the unl to binary and store it
std::string bin_unl;
bin_unl.resize(crypto::PFXD_PUBKEY_BYTES);
// Convert the public key hex of each node to binary and store it.
std::string bin_pubkey;
bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES);
if (util::hex2bin(
reinterpret_cast<unsigned char *>(bin_unl.data()),
bin_unl.length(),
v.GetString()) != 0)
reinterpret_cast<unsigned char *>(bin_pubkey.data()),
bin_pubkey.length(),
nodepk.GetString()) != 0)
{
std::cerr << "Error decoding unl list.\n";
return -1;
}
cfg.unl.emplace(bin_unl);
cfg.unl.emplace(bin_pubkey);
}
cfg.peerport = d["peerport"].GetInt();
@@ -255,28 +256,28 @@ int save_config()
d.AddMember("listenip", rapidjson::StringRef(cfg.listenip.data()), allocator);
rapidjson::Value peers(rapidjson::kArrayType);
for (auto &peer : cfg.peers)
for (auto &[ipport_concat, ipport_pair] : cfg.peers)
{
rapidjson::Value v;
v.SetString(rapidjson::StringRef(peer.first.data()), allocator);
v.SetString(rapidjson::StringRef(ipport_concat.data()), allocator);
peers.PushBack(v, allocator);
}
d.AddMember("peers", peers, allocator);
rapidjson::Value unl(rapidjson::kArrayType);
for (auto &node : cfg.unl)
for (auto &nodepk : cfg.unl)
{
rapidjson::Value v;
std::string npl_node;
std::string hex_pubkey;
if (util::bin2hex(
npl_node,
reinterpret_cast<const unsigned char *>(node.data()),
node.length()) != 0)
hex_pubkey,
reinterpret_cast<const unsigned char *>(nodepk.data()),
nodepk.length()) != 0)
{
std::cerr << "Error encoding npl list.\n";
return -1;
}
v.SetString(rapidjson::StringRef(npl_node.data()), allocator);
v.SetString(rapidjson::StringRef(hex_pubkey.data()), allocator);
unl.PushBack(v, allocator);
}
d.AddMember("unl", unl, allocator);

View File

@@ -13,6 +13,9 @@
namespace conf
{
// Typedef to represent ip address and port pair.
typedef std::pair<std::string,std::string> ip_port_pair;
// Holds contextual information about the currently loaded contract.
struct contract_ctx
{
@@ -36,21 +39,21 @@ struct contract_config
// Config elements which are loaded from the config file.
std::string pubkeyhex; // Contract hex public key
std::string seckeyhex; // Contract hex secret key
std::string keytype; // Key generation algorithm used by libsodium
std::string binary; // Full path to the contract binary
std::string binargs; // CLI arguments to pass to the contract binary
std::string listenip; // The IPs to listen on for incoming connections
std::unordered_map<std::string, std::pair<std::string,std::string>> peers; // List of peers in a map keyed by "<ip address>:<port>" format
std::unordered_set<std::string> unl; // Unique node list (list of base64 public keys)
std::uint16_t peerport; // Listening port for peer connections
int roundtime; // Consensus round time in ms
std::uint16_t pubport; // Listening port for public user connections
int pubmaxsize; // User message max size in bytes
int pubmaxcpm; // User message rate
std::string loglevel; // Log severity level (debug, info, warn, error)
std::unordered_set<std::string> loggers; // List of enabled loggers (console, file)
std::string pubkeyhex; // Contract hex public key
std::string seckeyhex; // Contract hex secret key
std::string keytype; // Key generation algorithm used by libsodium
std::string binary; // Full path to the contract binary
std::string binargs; // CLI arguments to pass to the contract binary
std::string listenip; // The IPs to listen on for incoming connections
std::unordered_map<std::string, ip_port_pair> peers; // Map of peers keyed by "<ip address>:<port>" concatenated format
std::unordered_set<std::string> unl; // Unique node list (list of binary public keys)
std::uint16_t peerport; // Listening port for peer connections
int roundtime; // Consensus round time in ms
std::uint16_t pubport; // Listening port for public user connections
int pubmaxsize; // User message max size in bytes
int pubmaxcpm; // User message rate
std::string loglevel; // Log severity level (debug, info, warn, error)
std::unordered_set<std::string> loggers; // List of enabled loggers (console, file)
};
// Global contract context struct exposed to the application.

View File

@@ -127,9 +127,9 @@ int main(int argc, char **argv)
if (usr::init() != 0)
return -1;
if (p2p::init() != 0)
if (p2p::init() != 0)
return -1;
// After initializing primary subsystems, register the SIGINT handler.
signal(SIGINT, signal_handler);
@@ -145,17 +145,24 @@ int main(int argc, char **argv)
for (auto &[sid, user] : usr::users)
{
std::pair<std::string, std::string> bufpair;
std::string inputtosend;
inputtosend.swap(user.inbuffer);
bufpair.first = std::move(inputtosend);
userbufs[user.pubkey] = bufpair;
userbufs.emplace(user.pubkey, std::move(bufpair));
}
std::pair<std::string, std::string> hpscbufpair;
hpscbufpair.first = "{msg:'Message from HP'}";
proc::ContractExecArgs eargs(123123345, userbufs, hpscbufpair);
std::unordered_map<std::string, std::pair<std::string, std::string>> nplbufs;
for (int i = 0; i < 3; i++)
{
std::pair<std::string, std::string> bufpair;
nplbufs.emplace("aaa", std::move(bufpair));
}
proc::ContractExecArgs eargs(123123345, userbufs, nplbufs, hpscbufpair);
proc::exec_contract(eargs);
for (auto &[pubkey, bufpair] : userbufs)
@@ -184,7 +191,7 @@ int main(int argc, char **argv)
}
}
LOG_INFO << "exited normally";
std::cout << "exited normally\n";
return 0;
}

View File

@@ -30,7 +30,10 @@ enum FDTYPE
};
// Map of user pipe fds (map key: user public key)
std::unordered_map<std::string, std::vector<int>> userfds;
contract_fdmap userfds;
// Map of NPL pipe fds (map key: user public key)
contract_fdmap nplfds;
// Pipe fds for HP <--> messages.
std::vector<int> hpscfds;
@@ -52,15 +55,23 @@ int exec_contract(const ContractExecArgs &args)
return -1;
}
// Write any verified (consensus-reached) user inputs to user pipes.
if (write_contract_user_inputs(args) != 0)
// Write any npl inputs to npl pipes.
if (write_contract_fdmap_inputs(nplfds, args.nplbufs) != 0)
{
cleanup_userfds();
cleanup_fdmap(nplfds);
LOG_ERR << "Failed to write NPL inputs to contract.";
return -1;
}
// Write any verified (consensus-reached) user inputs to user pipes.
if (write_contract_fdmap_inputs(userfds, args.userbufs) != 0)
{
cleanup_fdmap(userfds);
LOG_ERR << "Failed to write user inputs to contract.";
return -1;
}
__id_t pid = fork();
__pid_t pid = fork();
if (pid > 0)
{
// HotPocket process.
@@ -85,11 +96,24 @@ int exec_contract(const ContractExecArgs &args)
// After contract execution, collect contract outputs.
if (read_contract_hp_outputs(args) != 0)
{
LOG_ERR << "Error reading HP output from the contract.";
return -1;
}
if (read_contract_user_outputs(args) != 0)
if (read_contract_fdmap_outputs(nplfds, args.nplbufs) != 0)
{
LOG_ERR << "Error reading NPL output from the contract.";
return -1;
}
if (read_contract_fdmap_outputs(userfds, args.userbufs) != 0)
{
LOG_ERR << "Error reading User output from the contract.";
return -1;
}
nplfds.clear();
userfds.clear();
}
else if (pid == 0)
@@ -151,7 +175,7 @@ int await_contract_execution()
int write_contract_args(const ContractExecArgs &args)
{
// Populate the json string with contract args.
// We don't use a JSOn parser here because it's lightweight to contrstuct the
// We don't use a JSON parser here because it's lightweight to contrstuct the
// json string manually.
std::ostringstream os;
@@ -161,33 +185,27 @@ int write_contract_args(const ContractExecArgs &args)
<< ",\"hpfd\":[" << hpscfds[FDTYPE::SCREAD] << "," << hpscfds[FDTYPE::SCWRITE]
<< "],\"usrfd\":{";
for (auto itr = userfds.begin(); itr != userfds.end(); itr++)
fdmap_json_to_stream(userfds, os);
os << "},\"nplfd\":{";
fdmap_json_to_stream(nplfds, os);
os << "},\"unl\":[";
for (auto nodepk = conf::cfg.unl.begin(); nodepk != conf::cfg.unl.end(); nodepk++)
{
if (itr != userfds.begin())
if (nodepk != conf::cfg.unl.begin())
os << ","; // Trailing comma separator for previous element.
// Get the hex pubkey of the user.
std::string_view userpubkey = itr->first; // User pubkey in binary format.
std::string userpubkeyhex;
// Convert binary nodepk into hex.
std::string pubkeyhex;
util::bin2hex(
userpubkeyhex,
reinterpret_cast<const unsigned char *>(userpubkey.data()),
userpubkey.length());
pubkeyhex,
reinterpret_cast<const unsigned char *>((*nodepk).data()),
(*nodepk).length());
// Write user hex pubkey and fds.
os << "\"" << userpubkeyhex << "\":["
<< itr->second[FDTYPE::SCREAD] << ","
<< itr->second[FDTYPE::SCWRITE] << "]";
}
os << "},\"nplfd\":{},\"unl\":[";
for (auto node = conf::cfg.unl.begin(); node != conf::cfg.unl.end(); node++)
{
if (node != conf::cfg.unl.begin())
os << ","; // Trailing comma separator for previous element.
os << "\"" << *node << "\"";
os << "\"" << pubkeyhex << "\"";
}
os << "]}";
@@ -227,35 +245,12 @@ int write_contract_hp_inputs(const ContractExecArgs &args)
if (create_and_write_iopipes(hpscfds, args.hpscbufs.first) != 0) // hpscbufs.first is the input buffer.
{
LOG_ERR << "Error writing HP input to SC (" << args.hpscbufs.first.length()
<< " bytes)";
<< " bytes)";
return -1;
}
return 0;
}
/**
* Creates the pipes and writes verified (consesus-reached) user
* inputs to the SC via the pipe.
*/
int write_contract_user_inputs(const ContractExecArgs &args)
{
// Loop through input buffer for each user.
for (auto &[pubkey, bufpair] : args.userbufs)
{
userfds[pubkey] = std::move(std::vector<int>());
std::vector<int> &fds = userfds[pubkey];
if (create_and_write_iopipes(fds, bufpair.first) != 0) // bufpair.first is the input buffer.
{
LOG_ERR << "Error writing contract input (" << bufpair.first.length()
<< " bytes) from user";
return -1;
}
}
return 0;
}
/**
* Read all HP output messages produced by the contract process and store them in
* the buffer for later processing.
@@ -269,46 +264,95 @@ int read_contract_hp_outputs(const ContractExecArgs &args)
args.hpscbufs.first.clear(); //bufpair.first is the input buffer.
if (read_iopipe(hpscfds, args.hpscbufs.second) != 0) // hpscbufs.second is the output buffer.
{
LOG_ERR << "Error reading HP output";
return -1;
}
return 0;
}
/**
* Read all per-user outputs produced by the contract process and store them in
* the user buffer for later processing.
* Common helper function to write json output of fdmap to given ostream.
* @param fdmap Any pubkey->fdlist map. (eg. userfds, nplfds)
* @param os An output stream.
*/
void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os)
{
for (auto itr = fdmap.begin(); itr != fdmap.end(); itr++)
{
if (itr != fdmap.begin())
os << ","; // Trailing comma separator for previous element.
// Get the hex pubkey.
std::string_view pubkey = itr->first; // Pubkey in binary format.
std::string pubkeyhex;
util::bin2hex(
pubkeyhex,
reinterpret_cast<const unsigned char *>(pubkey.data()),
pubkey.length());
// Write hex pubkey and fds.
os << "\"" << pubkeyhex << "\":["
<< itr->second[FDTYPE::SCREAD] << ","
<< itr->second[FDTYPE::SCWRITE] << "]";
}
}
/**
* Common function to create the pipes and write buffer inputs to the fdmap.
* We take mutable parameters since the internal entries in the maps will be
* modified (eg. fd close, buffer clear).
*
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
* @return 0 on success. -1 on failure.
*/
int read_contract_user_outputs(const ContractExecArgs &args)
int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap)
{
for (auto &[pubkey, bufpair] : args.userbufs)
// Loop through input buffer for each pubkey.
for (auto &[pubkey, bufpair] : bufmap)
{
std::vector<int> fds = std::vector<int>();
if (create_and_write_iopipes(fds, bufpair.first) != 0) // bufpair.first is the input buffer.
return -1;
fdmap.emplace(pubkey, std::move(fds));
}
return 0;
}
/**
* Common function to read all outputs produced by the contract process and store them in
* output buffers for later processing.
*
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
* @return 0 on success. -1 on failure.
*/
int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap)
{
for (auto &[pubkey, bufpair] : bufmap)
{
// Clear the input buffer because we are sure the contract has finished reading from
// that mapped memory portion.
bufpair.first.clear(); //bufpair.first is the input buffer.
// Get fds for the user by pubkey.
std::vector<int> &fds = userfds[pubkey];
// Get fds for the pubkey.
std::vector<int> &fds = fdmap[pubkey];
if (read_iopipe(fds, bufpair.second) != 0) // bufpair.second is the output buffer.
{
LOG_ERR << "Error reading contract output for user "
<< pubkey;
}
return -1;
}
return 0;
}
/**
* Closes any open user fds after an error.
* Common function to close any open fds in the map after an error.
* @param fdmap Any pubkey->fdlist map. (eg. userfds, nplfds)
*/
void cleanup_userfds()
void cleanup_fdmap(contract_fdmap &fdmap)
{
for (auto &[pubkey, fds] : userfds)
for (auto &[pubkey, fds] : fdmap)
{
for (int i = 0; i < 4; i++)
{
@@ -377,7 +421,7 @@ int create_and_write_iopipes(std::vector<int> &fds, std::string &inputbuffer)
}
/**
* Common function to read SC output from the pipe and populate a given buffer.
* Common function to read and close SC output from the pipe and populate a given buffer.
* @param fds Vector representing the pipes fd list.
* @param The buffer to place the read output.
*/
@@ -420,6 +464,10 @@ void close_unused_fds(bool is_hp)
// Loop through user fds.
for (auto &[pubkey, fds] : userfds)
close_unused_vectorfds(is_hp, fds);
// Loop through npl fds.
for (auto &[pubkey, fds] : nplfds)
close_unused_vectorfds(is_hp, fds);
}
/**

View File

@@ -2,7 +2,9 @@
#define _HP_PROC_H_
#include <cstdio>
#include <map>
#include <iostream>
#include <unordered_map>
#include <vector>
#include "usr/usr.hpp"
#include "util.hpp"
@@ -12,6 +14,14 @@
namespace proc
{
// Common typedef for a map of pubkey->fdlist.
// This is used to keep track of fdlist quadruplet with a public key (eg. user, npl).
typedef std::unordered_map<std::string, std::vector<int>> contract_fdmap;
// Common typedef for a map of pubkey->buf-pair (input buffer and output buffer).
// This is used to keep track of input/output buffer pair with a public key (eg. user, npl)
typedef std::unordered_map<std::string, std::pair<std::string, std::string>> contract_bufmap;
/**
* Holds information that should be passed into the contract process.
*/
@@ -19,7 +29,11 @@ struct ContractExecArgs
{
// Map of user I/O buffers (map key: user binary public key).
// The value is a pair holding consensus-verified input and contract-generated output.
std::unordered_map<std::string, std::pair<std::string, std::string>> &userbufs;
contract_bufmap &userbufs;
// Map of NPL I/O buffers (map key: Peer binary public key).
// The value is a pair holding NPL input and contract-generated output.
contract_bufmap &nplbufs;
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
// Input buffer for HP->SC messages, Output buffer for SC->HP messages.
@@ -30,9 +44,11 @@ struct ContractExecArgs
ContractExecArgs(
uint64_t _timestamp,
std::unordered_map<std::string, std::pair<std::string, std::string>> &_userbufs,
contract_bufmap &_userbufs,
contract_bufmap &_nplbufs,
std::pair<std::string, std::string> &_hpscbufs) :
userbufs(_userbufs),
nplbufs(_nplbufs),
hpscbufs(_hpscbufs)
{
timestamp = _timestamp;
@@ -49,13 +65,17 @@ int write_contract_args(const ContractExecArgs &args);
int write_contract_hp_inputs(const ContractExecArgs &args);
int write_contract_user_inputs(const ContractExecArgs &args);
int read_contract_hp_outputs(const ContractExecArgs &args);
int read_contract_user_outputs(const ContractExecArgs &args);
// Common helper functions
void cleanup_userfds();
void fdmap_json_to_stream(const contract_fdmap &fdmap, std::ostringstream &os);
int write_contract_fdmap_inputs(contract_fdmap &fdmap, contract_bufmap &bufmap);
int read_contract_fdmap_outputs(contract_fdmap &fdmap, contract_bufmap &bufmap);
void cleanup_fdmap(contract_fdmap &fdmap);
int create_and_write_iopipes(std::vector<int> &fds, std::string &inputbuffer);