mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Separated pipe creation and writing.
This commit is contained in:
@@ -646,6 +646,9 @@ void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &userio
|
||||
// todo:implement exchange of npl and hpsc bufs
|
||||
proc::contract_bufmap_t nplbufmap;
|
||||
proc::contract_iobuf_pair hpscbufpair;
|
||||
hpscbufpair.inputs.push_back("A");
|
||||
hpscbufpair.inputs.push_back("B");
|
||||
hpscbufpair.inputs.push_back("C");
|
||||
|
||||
proc::exec_contract(
|
||||
proc::contract_exec_args(time_now, useriobufmap, nplbufmap, hpscbufpair));
|
||||
|
||||
165
src/proc.cpp
165
src/proc.cpp
@@ -33,33 +33,17 @@ __pid_t contract_pid;
|
||||
|
||||
/**
|
||||
* Executes the contract process and passes the specified arguments.
|
||||
*
|
||||
* @return 0 on successful process creation. -1 on failure or contract process is already running.
|
||||
*/
|
||||
int exec_contract(const contract_exec_args &args)
|
||||
{
|
||||
// Write any hp input messages to hp->sc pipe.
|
||||
if (write_contract_hp_inputs(args) != 0)
|
||||
{
|
||||
LOG_ERR << "Failed to write HP input to contract.";
|
||||
return -1;
|
||||
}
|
||||
// Setup io pipes and feed all inputs to them.
|
||||
create_iopipes_for_fdmap(nplfds, args.nplbufs);
|
||||
create_iopipes_for_fdmap(userfds, args.userbufs);
|
||||
create_iopipes(hpscfds);
|
||||
|
||||
// Write any npl inputs to npl pipes.
|
||||
if (write_contract_fdmap_inputs(nplfds, args.nplbufs) != 0)
|
||||
{
|
||||
cleanup_fdmap(nplfds);
|
||||
LOG_ERR << "Failed to write NPL inputs to contract.";
|
||||
if (feed_inputs(args) != 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Write any verified (consensus-reached) user inputs to user pipes.
|
||||
if (write_contract_fdmap_inputs(userfds, args.userbufs) != 0)
|
||||
{
|
||||
cleanup_fdmap(userfds);
|
||||
LOG_ERR << "Failed to write user inputs to contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
const __pid_t pid = fork();
|
||||
if (pid > 0)
|
||||
@@ -71,8 +55,6 @@ int exec_contract(const contract_exec_args &args)
|
||||
close_unused_fds(true);
|
||||
|
||||
// Wait for child process (contract process) to complete execution.
|
||||
|
||||
LOG_INFO << "Contract process started.";
|
||||
const int presult = await_contract_execution();
|
||||
LOG_INFO << "Contract process ended.";
|
||||
|
||||
@@ -84,27 +66,8 @@ int exec_contract(const contract_exec_args &args)
|
||||
}
|
||||
|
||||
// After contract execution, collect contract outputs.
|
||||
|
||||
if (read_contract_hp_outputs(args) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading HP output from the contract.";
|
||||
if (fetch_outputs(args) != 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (read_contract_fdmap_outputs(nplfds, args.nplbufs) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading NPL output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (read_contract_fdmap_outputs(userfds, args.userbufs) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading User output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
nplfds.clear();
|
||||
userfds.clear();
|
||||
}
|
||||
else if (pid == 0)
|
||||
{
|
||||
@@ -120,6 +83,8 @@ int exec_contract(const contract_exec_args &args)
|
||||
// Write the contract input message from HotPocket to the stdin (0) of the contract process.
|
||||
write_contract_args(args);
|
||||
|
||||
LOG_INFO << "Starting contract process...";
|
||||
|
||||
char *execv_args[] = {conf::cfg.binary.data(), conf::cfg.binargs.data(), NULL};
|
||||
execv(execv_args[0], execv_args);
|
||||
}
|
||||
@@ -227,12 +192,65 @@ int write_contract_args(const contract_exec_args &args)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int feed_inputs(const contract_exec_args &args)
|
||||
{
|
||||
// Write any hp input messages to hp->sc pipe.
|
||||
if (write_contract_hp_inputs(args) != 0)
|
||||
{
|
||||
LOG_ERR << "Failed to write HP input to contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Write any npl inputs to npl pipes.
|
||||
if (write_contract_fdmap_inputs(nplfds, args.nplbufs) != 0)
|
||||
{
|
||||
cleanup_fdmap(nplfds);
|
||||
LOG_ERR << "Failed to write NPL inputs to contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Write any verified (consensus-reached) user inputs to user pipes.
|
||||
if (write_contract_fdmap_inputs(userfds, args.userbufs) != 0)
|
||||
{
|
||||
cleanup_fdmap(userfds);
|
||||
LOG_ERR << "Failed to write user inputs to contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int fetch_outputs(const contract_exec_args &args)
|
||||
{
|
||||
if (read_contract_hp_outputs(args) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading HP output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (read_contract_fdmap_outputs(nplfds, args.nplbufs) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading NPL output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (read_contract_fdmap_outputs(userfds, args.userbufs) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading User output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
nplfds.clear();
|
||||
userfds.clear();
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes any hp input messages to the contract.
|
||||
*/
|
||||
int write_contract_hp_inputs(const contract_exec_args &args)
|
||||
{
|
||||
if (create_and_write_iopipes(hpscfds, args.hpscbufs.inputs) != 0)
|
||||
if (write_iopipe(hpscfds, args.hpscbufs.inputs) != 0)
|
||||
{
|
||||
LOG_ERR << "Error writing HP inputs to SC";
|
||||
return -1;
|
||||
@@ -285,6 +303,26 @@ void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates io pipes for all pubkeys specified in bufmap.
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
for (auto &[pubkey, buflist] : bufmap)
|
||||
{
|
||||
std::vector<int> fds = std::vector<int>();
|
||||
if (create_iopipes(fds) != 0)
|
||||
return -1;
|
||||
|
||||
fdmap.emplace(pubkey, std::move(fds));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create the pipes and write buffer inputs to the fdmap.
|
||||
* We take mutable parameters since the internal entries in the maps will be
|
||||
@@ -299,11 +337,8 @@ int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufm
|
||||
// Loop through input buffers for each pubkey.
|
||||
for (auto &[pubkey, buflist] : bufmap)
|
||||
{
|
||||
std::vector<int> fds = std::vector<int>();
|
||||
if (create_and_write_iopipes(fds, buflist.inputs) != 0)
|
||||
if (write_iopipe(fdmap[pubkey], buflist.inputs) != 0)
|
||||
return -1;
|
||||
|
||||
fdmap.emplace(pubkey, std::move(fds));
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -353,13 +388,11 @@ void cleanup_fdmap(contract_fdmap_t &fdmap)
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create a pair of pipes (Hp->SC, SC->HP) and write the
|
||||
* given input buffer into the write fd from the HP side.
|
||||
*
|
||||
* Common function to create a pair of pipes (Hp->SC, SC->HP).
|
||||
* @param fds Vector to populate fd list.
|
||||
* @param inputbuffer Buffer to write into the HP write fd.
|
||||
*/
|
||||
int create_and_write_iopipes(std::vector<int> &fds, std::list<std::string> &inputs)
|
||||
int create_iopipes(std::vector<int> &fds)
|
||||
{
|
||||
int inpipe[2];
|
||||
if (pipe(inpipe) != 0)
|
||||
@@ -381,19 +414,35 @@ int create_and_write_iopipes(std::vector<int> &fds, std::list<std::string> &inpu
|
||||
fds.push_back(outpipe[0]); //HPREAD
|
||||
fds.push_back(outpipe[1]); //SCWRITE
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to write the given input buffer into the write fd from the HP side.
|
||||
* @param fds Vector of fd list.
|
||||
* @param inputbuffer Buffer to write into the HP write fd.
|
||||
*/
|
||||
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs)
|
||||
{
|
||||
// Write the inputs (if any) into the contract and close the writefd.
|
||||
|
||||
const int writefd = fds[FDTYPE::HPWRITE];
|
||||
bool vmsplice_error = false;
|
||||
|
||||
for (std::string &input : inputs)
|
||||
if (!inputs.empty())
|
||||
{
|
||||
// We use vmsplice to map (zero-copy) the input into the fd.
|
||||
iovec memsegs[1];
|
||||
memsegs[0].iov_base = input.data();
|
||||
memsegs[0].iov_len = input.length();
|
||||
// Prepare the input memory segments to map with vmsplice.
|
||||
size_t i = 0;
|
||||
iovec memsegs[inputs.size()];
|
||||
for (std::string &input : inputs)
|
||||
{
|
||||
memsegs[i].iov_base = input.data();
|
||||
memsegs[i].iov_len = input.length();
|
||||
i++;
|
||||
}
|
||||
|
||||
if (vmsplice(writefd, memsegs, 1, 0) == -1)
|
||||
// We use vmsplice to map (zero-copy) the inputs into the fd.
|
||||
if (vmsplice(writefd, memsegs, inputs.size(), 0) == -1)
|
||||
vmsplice_error = true;
|
||||
|
||||
// It's important that we DO NOT clear the input buffer string until the contract
|
||||
|
||||
10
src/proc.hpp
10
src/proc.hpp
@@ -73,6 +73,10 @@ int await_contract_execution();
|
||||
|
||||
int write_contract_args(const contract_exec_args &args);
|
||||
|
||||
int feed_inputs(const contract_exec_args &args);
|
||||
|
||||
int fetch_outputs(const contract_exec_args &args);
|
||||
|
||||
int write_contract_hp_inputs(const contract_exec_args &args);
|
||||
|
||||
int read_contract_hp_outputs(const contract_exec_args &args);
|
||||
@@ -81,13 +85,17 @@ int read_contract_hp_outputs(const contract_exec_args &args);
|
||||
|
||||
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os);
|
||||
|
||||
int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap);
|
||||
|
||||
int create_and_write_iopipes(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
int create_iopipes(std::vector<int> &fds);
|
||||
|
||||
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
|
||||
int read_iopipe(std::vector<int> &fds, std::string &output);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user