Implemented sending contract output back to the user.

This commit is contained in:
Ravin Perera
2019-10-16 17:26:33 +05:30
parent 8b003aeaa2
commit 8a22748c8d
7 changed files with 121 additions and 44 deletions

View File

@@ -10,13 +10,15 @@ console.log("Input received from hp: " + input);
let hpargs = JSON.parse(input);
Object.keys(hpargs.usrfd).forEach(function(key,index) {
Object.keys(hpargs.usrfd).forEach(function (key, index) {
let userfds = hpargs.usrfd[key];
let userinput = Buffer.from(pipe.getfdbytes(userfds[0])).toString();
console.log("Input received from user " + key + ":");
console.log(userinput);
let userinput = Buffer.from(pipe.getfdbytes(userfds[0])).toString().trim();
fs.writeSync(userfds[1], "Echoing: " + userinput);
if (userinput.length > 0) {
console.log("Input received from user " + key + ":");
console.log(userinput);
fs.writeSync(userfds[1], "Echoing: " + userinput);
}
});
console.log("===Sample contract ended===");

View File

@@ -103,21 +103,41 @@ int main(int argc, char **argv)
// This will start hosting the contract and start consensus rounds.
// TODO
// Temp code to avoid exiting.
std::string s;
std::cin >> s;
// Test code to execute contract and collect outputs.
std::unordered_map<std::string, std::pair<std::string, std::string>> userbufs;
for (auto &[sid, user] : usr::users)
while (true)
{
std::pair<std::string, std::string> bufpair;
bufpair.first = std::move(user.inbuffer);
userbufs[user.pubkey] = bufpair;
}
sleep(1);
// Test code to execute contract and collect outputs.
std::unordered_map<std::string, std::pair<std::string, std::string>> userbufs;
for (auto &[sid, user] : usr::users)
{
std::pair<std::string, std::string> bufpair;
proc::ContractExecArgs eargs(123123345, userbufs);
proc::exec_contract(eargs);
std::string inputtosend;
inputtosend.swap(user.inbuffer);
bufpair.first = std::move(inputtosend);
userbufs[user.pubkey] = bufpair;
}
proc::ContractExecArgs eargs(123123345, userbufs);
proc::exec_contract(eargs);
for (auto &[pubkey, bufpair] : userbufs)
{
if (!bufpair.second.empty())
{
// Find the user session id by the pubkey.
const std::string sessionid = usr::sessionids[pubkey];
// Find the user by session id.
auto itr = usr::users.find(sessionid);
const usr::connected_user &user = itr->second;
user.session->send(std::move(bufpair.second));
}
}
userbufs.clear();
}
// Free resources.
usr::deinit();

View File

@@ -34,6 +34,11 @@ enum FDTYPE
*/
std::unordered_map<std::string, std::vector<int>> userfds;
/**
* Holds the contract process id (if currently executing).
*/
__pid_t contract_pid;
/**
* Executes the contract process and passes the specified arguments.
*
@@ -49,21 +54,22 @@ int exec_contract(const ContractExecArgs &args)
return -1;
}
__pid_t pid = fork();
__id_t pid = fork();
if (pid > 0)
{
// HotPocket process.
contract_pid = pid;
// Close all user fds unused by HP process.
close_unused_userfds(true);
// Wait for child process (contract process) to complete execution.
int scstatus;
wait(&scstatus);
if (!WIFEXITED(scstatus))
int presult = await_contract_execution();
contract_pid = 0;
if (presult != 0)
{
std::cerr << "Contract process exited with non-normal status code: "
<< WEXITSTATUS(scstatus) << std::endl;
std::cerr << "Contract process exited with non-normal status code: " << presult << std::endl;
return -1;
}
@@ -73,6 +79,8 @@ int exec_contract(const ContractExecArgs &args)
std::cerr << "Failed to read user outputs from contract.\n";
return -1;
};
userfds.clear();
}
else if (pid == 0)
{
@@ -100,6 +108,23 @@ int exec_contract(const ContractExecArgs &args)
return 0;
}
/**
* Blocks the calling thread until the contract process compelted exeution (if running).
*
* @returns 0 if contract process exited normally, exit code of contract process if abnormally exited.
*/
int await_contract_execution()
{
if (contract_pid > 0)
{
int scstatus;
waitpid(contract_pid, &scstatus, 0);
if (!WIFEXITED(scstatus))
return WEXITSTATUS(scstatus);
}
return 0;
}
/**
* Writes the contract input message into the stdin of the contract process.
* Input format:
@@ -207,17 +232,28 @@ int write_verified_user_inputs(const ContractExecArgs &args)
fds.push_back(outpipe[1]); //SCWRITE
userfds[pubkey] = fds;
// Write the user input into the contract and close the writefd.
// We use vmsplice to map (zero-copy) the user input into the fd.
iovec memsegs[1];
memsegs[0].iov_base = bufpair.first.data(); // bufpair.first is the input buffer.
memsegs[0].iov_len = bufpair.first.length();
// Write the user input (if any) into the contract and close the writefd.
int writefd = fds[FDTYPE::HPWRITE];
if (vmsplice(writefd, memsegs, 1, 0) == -1)
if (!bufpair.first.empty()) // bufpair.first is the input buffer.
{
std::cerr << "Error writing contract input (" << bufpair.first.length()
<< " bytes) from user" << std::endl;
// We use vmsplice to map (zero-copy) the user input into the fd.
iovec memsegs[1];
memsegs[0].iov_base = bufpair.first.data();
memsegs[0].iov_len = bufpair.first.length();
if (vmsplice(writefd, memsegs, 1, 0) == -1)
{
std::cerr << "Error writing contract input (" << bufpair.first.length()
<< " bytes) from user" << std::endl;
}
// It's important that we DO NOT clear the input buffer string until the contract
// process has actually read from the fd. Because the OS is just mapping our
// input buffer memory portion into the fd, if we clear it now, the contract process
// will get invaid bytes when reading the fd. Therefore we clear the input buffer
// inside read_contract_user_outputs().
}
// Close the writefd since we no longer need it for this round.
@@ -246,6 +282,10 @@ int read_contract_user_outputs(const ContractExecArgs &args)
for (auto &[pubkey, bufpair] : args.userbufs)
{
// Clear the input buffer because we are sure the contract has finished reading from
// that mapped memory portion.
bufpair.first.clear(); //bufpair.first is the input buffer.
// Get fds for the user by pubkey.
std::vector<int> &fds = userfds[pubkey];
int readfd = fds[FDTYPE::HPREAD];
@@ -257,7 +297,7 @@ int read_contract_user_outputs(const ContractExecArgs &args)
bufpair.second.resize(bytes_available); // bufpair.second is the output buffer.
// Populate the user output buffer with new data from the pipe.
// We use vmsplice to map (zero-copy) the output from the fd.
// We use vmsplice to map (zero-copy) the output from the fd into output bbuffer.
iovec memsegs[1];
memsegs[0].iov_base = bufpair.second.data();
memsegs[0].iov_len = bytes_available;
@@ -269,7 +309,8 @@ int read_contract_user_outputs(const ContractExecArgs &args)
}
else
{
std::cout << "Contract produced " << bytes_available << " bytes for user" << std::endl;
std::cout << "Contract produced " << bytes_available
<< " bytes for user" << std::endl;
}
}
@@ -309,7 +350,7 @@ void close_unused_userfds(bool is_hp)
}
/**
* Closes any open user fds based after an error.
* Closes any open user fds after an error.
*/
void cleanup_userfds()
{

View File

@@ -35,6 +35,8 @@ struct ContractExecArgs
int exec_contract(const ContractExecArgs &args);
int await_contract_execution();
//------Internal-use functions for this namespace.
int write_to_stdin(const ContractExecArgs &args);

View File

@@ -6,6 +6,7 @@
#include <sodium.h>
#include "../util.hpp"
#include "../sock/socket_session.hpp"
#include "../proc.hpp"
#include "usr.hpp"
#include "user_session_handler.hpp"
@@ -80,7 +81,7 @@ void user_session_handler::on_message(sock::socket_session *session, std::string
session->flags_.reset(util::SESSION_FLAG::USER_CHALLENGE_ISSUED); // Clear challenge-issued flag
session->flags_.set(util::SESSION_FLAG::USER_AUTHED); // Set the user-authed flag
usr::add_user(session->uniqueid_, userpubkey); // Add the user to the global authed user list
usr::add_user(session, userpubkey); // Add the user to the global authed user list
usr::pending_challenges.erase(session->uniqueid_); // Remove the stored challenge
std::cout << "User connection " << session->uniqueid_ << " authenticated. Public key "
@@ -138,6 +139,8 @@ void user_session_handler::on_close(sock::socket_session *session)
// Session belongs to an authed user.
else if (session->flags_[util::SESSION_FLAG::USER_AUTHED])
{
// Wait for SC process completion before we remove existing user.
proc::await_contract_execution();
usr::remove_user(session->uniqueid_);
}

View File

@@ -16,13 +16,14 @@ namespace usr
{
/**
* Global user list. (Exposed to other sub systems)
* Connected (authenticated) user list. (Exposed to other sub systems)
* Map key: User socket session id (<ip:port>)
*/
std::unordered_map<std::string, usr::connected_user> users;
/**
* Holds set of connected user session ids for lookups. (Exposed to other sub systems)
* Holds set of connected user session ids and public keys for lookups.
* This is used for pubkey duplicate checks as well.
* Map key: User binary pubkey
*/
std::unordered_map<std::string, std::string> sessionids;
@@ -196,19 +197,20 @@ int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string
* Adds the user denoted by specified session id and public key to the global authed user list.
* This should get called after the challenge handshake is verified.
*
* @param sessionid User socket session id.
* @param session User socket session.
* @param pubkey User's binary public key.
* @return 0 on successful additions. -1 on failure.
*/
int add_user(const std::string &sessionid, const std::string &pubkey)
int add_user(sock::socket_session *session, const std::string &pubkey)
{
const std::string &sessionid = session->uniqueid_;
if (users.count(sessionid) == 1)
{
std::cerr << sessionid << " already exist. Cannot add user.\n";
return -1;
}
users.emplace(sessionid, usr::connected_user(pubkey));
users.emplace(sessionid, usr::connected_user(session, pubkey));
// Populate sessionid map so we can lookup by user pubkey.
sessionids[pubkey] = sessionid;

View File

@@ -5,6 +5,7 @@
#include <string_view>
#include <unordered_map>
#include "../util.hpp"
#include "../sock/socket_session.hpp"
/**
* Maintains the global user list with pending input outputs and manages user connections.
@@ -24,17 +25,23 @@ struct connected_user
// Holds the unprocessed user input collected from websocket.
std::string inbuffer;
// Holds the websocket session of this user.
// We don't need to own the session object since the lifetime of user and session are coupled.
sock::socket_session *session;
/**
* @param _pubkey The public key of the user in binary format.
*/
connected_user(std::string_view _pubkey)
connected_user(sock::socket_session *_session, std::string_view _pubkey)
{
session = _session;
pubkey = _pubkey;
}
};
/**
* Global authenticated (challenge-verified) user list.
* Connected (authenticated) user list. (Exposed to other sub systems)
* Map key: User socket session id (<ip:port>)
*/
extern std::unordered_map<std::string, usr::connected_user> users;
@@ -57,7 +64,7 @@ void create_user_challenge(std::string &msg, std::string &challengeb64);
int verify_user_challenge_response(std::string &extracted_pubkeyb64, std::string_view response, std::string_view original_challenge);
int add_user(const std::string &sessionid, const std::string &pubkey);
int add_user(sock::socket_session *session, const std::string &pubkey);
int remove_user(const std::string &sessionid);