mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
vmsplice and other optmizations for user pipes I/O (#28)
This commit is contained in:
295
src/proc.cpp
295
src/proc.cpp
@@ -1,24 +1,38 @@
|
||||
#include <cstdio>
|
||||
#include <iostream>
|
||||
#include <stdlib.h>
|
||||
#include <vector>
|
||||
#include <unistd.h>
|
||||
#include <sstream>
|
||||
#include <fcntl.h>
|
||||
#include <sys/uio.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/stringbuffer.h>
|
||||
#include <rapidjson/writer.h>
|
||||
#include "proc.hpp"
|
||||
#include "usr/usr.hpp"
|
||||
#include "conf.hpp"
|
||||
|
||||
namespace proc
|
||||
{
|
||||
|
||||
/**
|
||||
* Keeps the currently executing contract process id (if any)
|
||||
* Enum used to differenciate pipe fds maintained for SC I/O pipes.
|
||||
*/
|
||||
__pid_t contract_pid = 0;
|
||||
enum FDTYPE
|
||||
{
|
||||
// Used by Smart Contract to read input sent by Hot Pocket
|
||||
SCREAD = 0,
|
||||
// Used by Hot Pocket to write input to the smart contract.
|
||||
HPWRITE = 1,
|
||||
// Used by Hot Pocket to read output from the smart contract.
|
||||
HPREAD = 2,
|
||||
// Used by Smart Contract to write output back to Hot Pocket.
|
||||
SCWRITE = 3
|
||||
};
|
||||
|
||||
/**
|
||||
* Map of user pipe fds (map key: user public key)
|
||||
*/
|
||||
std::unordered_map<std::string, std::vector<int>> userfds;
|
||||
|
||||
/**
|
||||
* Executes the contract process and passes the specified arguments.
|
||||
@@ -27,15 +41,11 @@ __pid_t contract_pid = 0;
|
||||
*/
|
||||
int exec_contract(const ContractExecArgs &args)
|
||||
{
|
||||
if (is_contract_running())
|
||||
// Write any verified (consensus-reached) user inputs to user pipes.
|
||||
if (write_verified_user_inputs(args) != 0)
|
||||
{
|
||||
std::cerr << "Contract process still running.\n";
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_userpipes() != 0)
|
||||
{
|
||||
std::cerr << "User pipe creation failed.\n";
|
||||
cleanup_userfds();
|
||||
std::cerr << "Failed to write user inputs to contract.\n";
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -44,10 +54,25 @@ int exec_contract(const ContractExecArgs &args)
|
||||
{
|
||||
// HotPocket process.
|
||||
|
||||
contract_pid = pid;
|
||||
|
||||
// Close all user fds unused by HP process.
|
||||
close_unused_userfds(true);
|
||||
|
||||
// Wait for child process (contract process) to complete execution.
|
||||
int scstatus;
|
||||
wait(&scstatus);
|
||||
if (!WIFEXITED(scstatus))
|
||||
{
|
||||
std::cerr << "Contract process exited with non-normal status code: "
|
||||
<< WEXITSTATUS(scstatus) << std::endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// After contract execution, collect contract user outputs.
|
||||
if (read_contract_user_outputs(args) != 0)
|
||||
{
|
||||
std::cerr << "Failed to read user outputs from contract.\n";
|
||||
return -1;
|
||||
};
|
||||
}
|
||||
else if (pid == 0)
|
||||
{
|
||||
@@ -75,40 +100,6 @@ int exec_contract(const ContractExecArgs &args)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create pipes for all authed users in order to perform I/O with SC.
|
||||
*/
|
||||
int create_userpipes()
|
||||
{
|
||||
for (auto &[k, user] : usr::users)
|
||||
{
|
||||
int inpipe[2];
|
||||
if (pipe(inpipe) != 0)
|
||||
{
|
||||
//Abandon and cleanup.
|
||||
cleanup_userfds(user);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int outpipe[2];
|
||||
if (pipe(outpipe) != 0)
|
||||
{
|
||||
// Close the earlier created pipe.
|
||||
close(inpipe[0]);
|
||||
close(inpipe[1]);
|
||||
|
||||
inpipe[0] = 0;
|
||||
inpipe[1] = 0;
|
||||
|
||||
//Abandon and cleanup.
|
||||
cleanup_userfds(user);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the contract input message into the stdin of the contract process.
|
||||
* Input format:
|
||||
@@ -116,54 +107,48 @@ int create_userpipes()
|
||||
* "version":"<hp version>",
|
||||
* "pubkey": "<this node's base64 public key>",
|
||||
* "ts": <this node's timestamp (unix milliseconds)>,
|
||||
* "usrfd":{ "pkb64":[fd0, fd1], ... },
|
||||
* "nplfd":{ "pkb64":[fd0, fd1], ... },
|
||||
* "usrfd":{ "<pkb64>":[fd0, fd1], ... },
|
||||
* "nplfd":{ "<pkb64>":[fd0, fd1], ... },
|
||||
* "unl":[ "pkb64", ... ]
|
||||
* }
|
||||
*/
|
||||
int write_to_stdin(const ContractExecArgs &args)
|
||||
{
|
||||
//Populate the json document with contract args.
|
||||
// Populate the json strring with contract args.
|
||||
// We don't use a JSOn parser here because it's lightweight to contrstuct the
|
||||
// json string manually.
|
||||
|
||||
rapidjson::Document d;
|
||||
d.SetObject();
|
||||
rapidjson::Document::AllocatorType &allocator = d.GetAllocator();
|
||||
std::ostringstream os;
|
||||
os << "{\"version\":\"" << util::HP_VERSION
|
||||
<< "\",\"pubkey\":\"" << conf::cfg.pubkeyb64
|
||||
<< "\",\"ts\":" << args.timestamp
|
||||
<< ",\"usrfd\":{";
|
||||
|
||||
d.AddMember("version", rapidjson::StringRef(util::HP_VERSION), allocator);
|
||||
d.AddMember("pubkey", rapidjson::StringRef(conf::cfg.pubkeyb64.data()), allocator);
|
||||
d.AddMember("ts", args.timestamp, allocator);
|
||||
|
||||
rapidjson::Value users(rapidjson::kObjectType);
|
||||
for (auto &[sid, user] : usr::users)
|
||||
for (auto itr = userfds.begin(); itr != userfds.end(); itr++)
|
||||
{
|
||||
rapidjson::Value fdlist(rapidjson::kArrayType);
|
||||
fdlist.PushBack(user.fds[usr::USERFDTYPE::SCREAD], allocator);
|
||||
fdlist.PushBack(user.fds[usr::USERFDTYPE::SCWRITE], allocator);
|
||||
users.AddMember(rapidjson::StringRef(user.pubkeyb64.data()), fdlist, allocator);
|
||||
if (itr != userfds.begin())
|
||||
os << ","; // Trailing comma separator for previous element.
|
||||
|
||||
// Write user pubkey and fds.
|
||||
os << "\"" << itr->first << "\":["
|
||||
<< itr->second[FDTYPE::SCREAD] << ","
|
||||
<< itr->second[FDTYPE::SCWRITE] << "]";
|
||||
}
|
||||
d.AddMember("usrfd", users, allocator);
|
||||
|
||||
// rapidjson::Value peers(rapidjson::kObjectType);
|
||||
// for (auto &[sid, peer] : p2p::peers)
|
||||
// {
|
||||
// rapidjson::Value fdlist(rapidjson::kArrayType);
|
||||
// fdlist.PushBack(peer.inpipe[0], allocator);
|
||||
// fdlist.PushBack(peer.outpipe[1], allocator);
|
||||
// peers.AddMember(rapidjson::StringRef(peer.pubkeyb64.data()), fdlist, allocator);
|
||||
// }
|
||||
// d.AddMember("nplfd", peers, allocator);
|
||||
os << "},\"nplfd\":{},\"unl\":[";
|
||||
|
||||
rapidjson::Value unl(rapidjson::kArrayType);
|
||||
for (std::string &node : conf::cfg.unl)
|
||||
unl.PushBack(rapidjson::StringRef(node.data()), allocator);
|
||||
d.AddMember("unl", unl, allocator);
|
||||
for (auto node = conf::cfg.unl.begin(); node != conf::cfg.unl.end(); node++)
|
||||
{
|
||||
if (node != conf::cfg.unl.begin())
|
||||
os << ","; // Trailing comma separator for previous element.
|
||||
|
||||
rapidjson::StringBuffer buffer;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
|
||||
d.Accept(writer);
|
||||
os << "\"" << *node << "\"";
|
||||
}
|
||||
|
||||
os << "]}";
|
||||
|
||||
// Get the json string that should be written to contract input pipe.
|
||||
const char *json = buffer.GetString();
|
||||
std::string json = os.str();
|
||||
|
||||
// Establish contract input pipe.
|
||||
int stdinpipe[2];
|
||||
@@ -179,19 +164,69 @@ int write_to_stdin(const ContractExecArgs &args)
|
||||
close(stdinpipe[0]);
|
||||
|
||||
// Write the json message and close write fd.
|
||||
write(stdinpipe[1], json, buffer.GetSize());
|
||||
write(stdinpipe[1], json.data(), json.size());
|
||||
close(stdinpipe[1]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the pipes and writes verified (consesus-reached) user
|
||||
* input to the SC via the pipe.
|
||||
*/
|
||||
int write_verified_user_inputs(const ContractExecArgs &args)
|
||||
{
|
||||
for (auto &[pubkey, bufpair] : args.userbufs)
|
||||
{
|
||||
int inpipe[2];
|
||||
if (pipe(inpipe) != 0)
|
||||
return -1;
|
||||
|
||||
int outpipe[2];
|
||||
if (pipe(outpipe) != 0)
|
||||
{
|
||||
// Close the earlier created pipe.
|
||||
close(inpipe[0]);
|
||||
close(inpipe[1]);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// If both pipes got created, assign them to the fd map.
|
||||
std::vector<int> fds;
|
||||
fds.push_back(inpipe[0]); //SCREAD
|
||||
fds.push_back(inpipe[1]); //HPWRITE
|
||||
fds.push_back(outpipe[0]); //HPREAD
|
||||
fds.push_back(outpipe[1]); //SCWRITE
|
||||
userfds[pubkey] = fds;
|
||||
|
||||
// Write the user input into the contract and close the writefd.
|
||||
// We use vmsplice to map (zero-copy) the user input into the fd.
|
||||
iovec memsegs[1];
|
||||
memsegs[0].iov_base = bufpair.first.data(); // bufpair.first is the input buffer.
|
||||
memsegs[0].iov_len = bufpair.first.length();
|
||||
int writefd = fds[FDTYPE::HPWRITE];
|
||||
|
||||
if (vmsplice(writefd, memsegs, 1, 0) == -1)
|
||||
{
|
||||
std::cerr << "Error writing contract input (" << bufpair.first.length()
|
||||
<< " bytes) from user " << pubkey << std::endl;
|
||||
}
|
||||
|
||||
// Close the writefd since we no longer need it for this round.
|
||||
close(writefd);
|
||||
fds[FDTYPE::HPWRITE] = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all per-user outputs produced by the contract process and store them in
|
||||
* the user buffer for later processing.
|
||||
*
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int read_contract_user_outputs()
|
||||
int read_contract_user_outputs(const ContractExecArgs &args)
|
||||
{
|
||||
// Read any outputs that have been written by the contract process
|
||||
// from all the user outpipes and store in the outbuffer of each user.
|
||||
@@ -201,88 +236,82 @@ int read_contract_user_outputs()
|
||||
// Currently this is sequential for simplicity which will not scale well
|
||||
// when there are large number of users connected to the same HP node.
|
||||
|
||||
for (auto &[sid, user] : usr::users)
|
||||
for (auto &[pubkey, bufpair] : args.userbufs)
|
||||
{
|
||||
int fdout = user.fds[usr::USERFDTYPE::HPREAD];
|
||||
// Get fds for the user by pubkey.
|
||||
std::vector<int> &fds = userfds[pubkey];
|
||||
int readfd = fds[FDTYPE::HPREAD];
|
||||
int bytes_available = 0;
|
||||
ioctl(fdout, FIONREAD, &bytes_available);
|
||||
ioctl(readfd, FIONREAD, &bytes_available);
|
||||
|
||||
if (bytes_available > 0)
|
||||
{
|
||||
char data[bytes_available];
|
||||
read(fdout, data, bytes_available);
|
||||
bufpair.second.reserve(bytes_available); // bufpair.second is the output buffer.
|
||||
|
||||
// Populate the user output buffer with new data
|
||||
user.outbuffer = std::string(data, bytes_available);
|
||||
// Populate the user output buffer with new data from the pipe.
|
||||
// We use vmsplice to map (zero-copy) the output from the fd.
|
||||
iovec memsegs[1];
|
||||
memsegs[0].iov_base = bufpair.second.data();
|
||||
memsegs[0].iov_len = bytes_available;
|
||||
|
||||
// Close remaining fds on HP process side because we are done with contract process I/O.
|
||||
close(user.fds[usr::USERFDTYPE::HPREAD]);
|
||||
close(user.fds[usr::USERFDTYPE::HPWRITE]);
|
||||
|
||||
std::cout << "Read " + std::to_string(bytes_available) << " bytes into user output buffer. user:" + user.pubkeyb64 << std::endl;
|
||||
if (vmsplice(readfd, memsegs, 1, 0) == -1)
|
||||
{
|
||||
std::cerr << "Error reading contract output for user "
|
||||
<< pubkey << std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cout << "Contract produced " << bytes_available << " bytes for user " << pubkey << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
// Close readfd fd on HP process side because we are done with contract process I/O.
|
||||
close(readfd);
|
||||
fds[FDTYPE::HPREAD] = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the contract process is running at this moment.
|
||||
*/
|
||||
bool is_contract_running()
|
||||
{
|
||||
if (contract_pid > 0)
|
||||
{
|
||||
int status = 0;
|
||||
if (!waitpid(contract_pid, &status, WNOHANG))
|
||||
return true;
|
||||
contract_pid = 0;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes unused user fds based on which process this gets called from.
|
||||
*/
|
||||
void close_unused_userfds(bool is_hp)
|
||||
{
|
||||
for (auto &[sid, user] : usr::users)
|
||||
for (auto &[pubkey, fds] : userfds)
|
||||
{
|
||||
if (is_hp)
|
||||
{
|
||||
// Close unused fds in Hot Pocket process.
|
||||
close(user.fds[usr::USERFDTYPE::SCREAD]);
|
||||
close(user.fds[usr::USERFDTYPE::SCWRITE]);
|
||||
close(fds[FDTYPE::SCREAD]);
|
||||
fds[FDTYPE::SCREAD] = 0;
|
||||
close(fds[FDTYPE::SCWRITE]);
|
||||
fds[FDTYPE::SCWRITE] = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Close unused fds in smart contract process.
|
||||
close(user.fds[usr::USERFDTYPE::HPREAD]);
|
||||
close(user.fds[usr::USERFDTYPE::HPWRITE]);
|
||||
close(fds[FDTYPE::HPREAD]);
|
||||
fds[FDTYPE::HPREAD] = 0;
|
||||
|
||||
// HPWRITE fd has aleady been closed by HP process after writing user
|
||||
// inputs (before the fork).
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup any open fds of all users (called after partial pipe failure).
|
||||
*
|
||||
* @param upto The user upto which point should be checked for open fds.
|
||||
* Closes any open user fds based after an error.
|
||||
*/
|
||||
void cleanup_userfds(const usr::contract_user &upto)
|
||||
void cleanup_userfds()
|
||||
{
|
||||
for (auto &[sid, user] : usr::users)
|
||||
for (auto &[pubkey, fds] : userfds)
|
||||
{
|
||||
if (&user == &upto)
|
||||
break;
|
||||
|
||||
for (int i = 0; i < 4; i++)
|
||||
{
|
||||
if (user.fds[i] > 0)
|
||||
{
|
||||
close(user.fds[i]);
|
||||
user.fds[i] = 0;
|
||||
}
|
||||
if (fds[i] > 0)
|
||||
close(fds[i]);
|
||||
fds[i] = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user