mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Added large I/O message support. (#95)
This commit is contained in:
@@ -75,8 +75,8 @@ add_dependencies(hpcore
|
||||
)
|
||||
|
||||
add_custom_command(TARGET hpcore POST_BUILD
|
||||
# COMMAND strip ./build/hpcore
|
||||
# COMMAND strip ./build/appbill
|
||||
COMMAND strip ./build/hpcore
|
||||
COMMAND strip ./build/appbill
|
||||
COMMAND cp ./test/bin/websocketd ./test/bin/websocat ./test/bin/hpfs ./build/
|
||||
)
|
||||
|
||||
|
||||
@@ -13,9 +13,9 @@ fs.appendFileSync("exects.txt", "ts:" + hpargs.ts + "\n");
|
||||
|
||||
Object.keys(hpargs.usrfd).forEach(function (key, index) {
|
||||
let userfds = hpargs.usrfd[key];
|
||||
let userinput = fs.readFileSync(userfds[0], 'utf8');
|
||||
|
||||
if (userinput.length > 0) {
|
||||
if (userfds[0] != -1) {
|
||||
let userinput = fs.readFileSync(userfds[0], 'utf8');
|
||||
// Append user input to a state file.
|
||||
fs.appendFileSync("userinputs.txt", userinput + "\n");
|
||||
fs.writeSync(userfds[1], "Echoing: " + userinput);
|
||||
|
||||
@@ -13,9 +13,11 @@ fs.appendFileSync("exects.txt", "ts:" + hpargs.ts + "\n");
|
||||
|
||||
Object.keys(hpargs.usrfd).forEach(function (key, index) {
|
||||
let userfds = hpargs.usrfd[key];
|
||||
let fileContent = fs.readFileSync(userfds[0]);
|
||||
|
||||
if (fileContent.length > 0) {
|
||||
if (userfds[0] != -1) {
|
||||
|
||||
let fileContent = fs.readFileSync(userfds[0]);
|
||||
|
||||
// Save the content into a new file.
|
||||
var fileName = new Date().getTime().toString();
|
||||
fs.writeFileSync(fileName, fileContent);
|
||||
|
||||
@@ -51,12 +51,9 @@ function main() {
|
||||
|
||||
function create_input_container(inp) {
|
||||
|
||||
let hexInp = inp.toString('hex');
|
||||
console.log("hex " + hexInp.length);
|
||||
|
||||
let inp_container = {
|
||||
nonce: (new Date()).getTime().toString(),
|
||||
input: hexInp,
|
||||
input: inp.toString('hex'),
|
||||
max_ledger_seqno: 9999999
|
||||
}
|
||||
let inp_container_bytes = JSON.stringify(inp_container);
|
||||
|
||||
@@ -22,7 +22,8 @@ namespace comm
|
||||
watchdog_thread = std::thread(
|
||||
&comm_server::connection_watchdog, this, accept_fd, session_type, is_binary,
|
||||
std::ref(metric_thresholds), req_known_remotes, max_msg_size);
|
||||
return start_websocketd_process(port, domain_socket_name, is_binary, use_size_header);
|
||||
return start_websocketd_process(port, domain_socket_name, is_binary,
|
||||
use_size_header, max_msg_size);
|
||||
}
|
||||
|
||||
return -1;
|
||||
@@ -255,7 +256,9 @@ namespace comm
|
||||
}
|
||||
}
|
||||
|
||||
int comm_server::start_websocketd_process(const uint16_t port, const char *domain_socket_name, const bool is_binary, const bool use_size_header)
|
||||
int comm_server::start_websocketd_process(
|
||||
const uint16_t port, const char *domain_socket_name,
|
||||
const bool is_binary, const bool use_size_header, const uint64_t max_msg_size)
|
||||
{
|
||||
// setup pipe for firewall
|
||||
int firewall_pipe[2]; // parent to child pipe
|
||||
@@ -299,10 +302,10 @@ namespace comm
|
||||
dup2(firewall_pipe[0], 0);
|
||||
}
|
||||
|
||||
// Override stdout in the child's file table with /dev/null
|
||||
// int null_fd = open("/dev/null", O_WRONLY);
|
||||
// if (null_fd)
|
||||
// dup2(null_fd, 1);
|
||||
std::string max_frame = std::string("--maxframe=")
|
||||
.append(use_size_header
|
||||
? "4294967296" // 4GB
|
||||
: std::to_string(max_msg_size));
|
||||
|
||||
// Fill process args.
|
||||
char *execv_args[] = {
|
||||
@@ -316,6 +319,7 @@ namespace comm
|
||||
conf::ctx.tls_key_file.data(),
|
||||
(char *)(is_binary ? "--binary=true" : "--binary=false"),
|
||||
(char *)(use_size_header ? "--sizeheader=true" : "--sizeheader=false"),
|
||||
max_frame.data(),
|
||||
(char *)"--loglevel=error",
|
||||
(char *)"nc", // netcat (OpenBSD) is used for domain socket redirection.
|
||||
(char *)"-U", // Use UNIX domain socket
|
||||
|
||||
@@ -8,42 +8,44 @@
|
||||
namespace comm
|
||||
{
|
||||
|
||||
class comm_server
|
||||
{
|
||||
pid_t websocketd_pid = 0;
|
||||
int firewall_out = -1; // at some point we may want to listen for firewall_in but at the moment unimplemented
|
||||
std::thread watchdog_thread;
|
||||
bool should_stop_listening = false;
|
||||
class comm_server
|
||||
{
|
||||
pid_t websocketd_pid = 0;
|
||||
int firewall_out = -1; // at some point we may want to listen for firewall_in but at the moment unimplemented
|
||||
std::thread watchdog_thread;
|
||||
bool should_stop_listening = false;
|
||||
|
||||
int open_domain_socket(const char *domain_socket_name);
|
||||
int open_domain_socket(const char *domain_socket_name);
|
||||
|
||||
void connection_watchdog(
|
||||
const int accept_fd, const SESSION_TYPE session_type, const bool is_binary,
|
||||
const uint64_t (&metric_thresholds)[4], const std::set<conf::ip_port_pair> &eq_known_remotes, const uint64_t max_msg_size);
|
||||
void connection_watchdog(
|
||||
const int accept_fd, const SESSION_TYPE session_type, const bool is_binary,
|
||||
const uint64_t (&metric_thresholds)[4], const std::set<conf::ip_port_pair> &eq_known_remotes, const uint64_t max_msg_size);
|
||||
|
||||
int start_websocketd_process(const uint16_t port, const char *domain_socket_name, const bool is_binary, const bool use_size_header);
|
||||
int start_websocketd_process(
|
||||
const uint16_t port, const char *domain_socket_name,
|
||||
const bool is_binary, const bool use_size_header, const uint64_t max_msg_size);
|
||||
|
||||
int poll_fds(pollfd *pollfds, const int accept_fd, const std::unordered_map<int, comm_session> &sessions);
|
||||
int poll_fds(pollfd *pollfds, const int accept_fd, const std::unordered_map<int, comm_session> &sessions);
|
||||
|
||||
void check_for_new_connection(
|
||||
std::unordered_map<int, comm_session> &sessions, const int accept_fd,
|
||||
const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4]);
|
||||
void check_for_new_connection(
|
||||
std::unordered_map<int, comm_session> &sessions, const int accept_fd,
|
||||
const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4]);
|
||||
|
||||
void maintain_known_connections(
|
||||
std::unordered_map<int, comm_session> &sessions, std::unordered_map<int, comm_client> &outbound_clients,
|
||||
const std::set<conf::ip_port_pair> &req_known_remotes, const SESSION_TYPE session_type, const bool is_binary,
|
||||
const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]);
|
||||
void maintain_known_connections(
|
||||
std::unordered_map<int, comm_session> &sessions, std::unordered_map<int, comm_client> &outbound_clients,
|
||||
const std::set<conf::ip_port_pair> &req_known_remotes, const SESSION_TYPE session_type, const bool is_binary,
|
||||
const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]);
|
||||
|
||||
std::string get_cgi_ip(const int fd);
|
||||
std::string get_cgi_ip(const int fd);
|
||||
|
||||
public:
|
||||
// Start accepting incoming connections
|
||||
int start(
|
||||
const uint16_t port, const char *domain_socket_name, const SESSION_TYPE session_type, const bool is_binary, const bool use_size_header,
|
||||
const uint64_t (&metric_thresholds)[4], const std::set<conf::ip_port_pair> &req_known_remotes, const uint64_t max_msg_size);
|
||||
void stop();
|
||||
void firewall_ban(std::string_view ip, const bool unban);
|
||||
};
|
||||
public:
|
||||
// Start accepting incoming connections
|
||||
int start(
|
||||
const uint16_t port, const char *domain_socket_name, const SESSION_TYPE session_type, const bool is_binary, const bool use_size_header,
|
||||
const uint64_t (&metric_thresholds)[4], const std::set<conf::ip_port_pair> &req_known_remotes, const uint64_t max_msg_size);
|
||||
void stop();
|
||||
void firewall_ban(std::string_view ip, const bool unban);
|
||||
};
|
||||
|
||||
} // namespace comm
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
276
src/proc.cpp
276
src/proc.cpp
@@ -9,6 +9,7 @@
|
||||
|
||||
namespace proc
|
||||
{
|
||||
constexpr size_t OUTPUT_READ_BUF_SIZE = 64 * 1024; //64KB
|
||||
|
||||
// Enum used to differenciate pipe fds maintained for SC I/O pipes.
|
||||
enum FDTYPE
|
||||
@@ -38,7 +39,10 @@ namespace proc
|
||||
// Holds the hpfs rw process id (if currently executing).
|
||||
pid_t hpfs_pid;
|
||||
|
||||
const char *FINDMNT_COMMAND = "findmnt --noheadings ";
|
||||
// Thread to collect contract outputs while contract is running.
|
||||
std::thread output_fetcher_thread;
|
||||
|
||||
bool should_deinit = false;
|
||||
|
||||
/**
|
||||
* Executes the contract process and passes the specified arguments.
|
||||
@@ -46,18 +50,16 @@ namespace proc
|
||||
*/
|
||||
int exec_contract(const contract_exec_args &args, hpfs::h32 &state_hash)
|
||||
{
|
||||
// Setup io pipes and feed all inputs to them.
|
||||
create_iopipes_for_fdmap(userfds, args.userbufs);
|
||||
create_iopipes(nplfds);
|
||||
create_iopipes(hpscfds);
|
||||
|
||||
if (feed_inputs(args) != 0)
|
||||
return -1;
|
||||
|
||||
// Start the hpfs rw session before starting the contract process.
|
||||
if (start_hpfs_rw_session() != 0)
|
||||
return -1;
|
||||
|
||||
// Setup io pipes and feed all inputs to them.
|
||||
create_iopipes_for_fdmap(userfds, args.userbufs);
|
||||
create_iopipes(nplfds, !args.nplbuff.inputs.empty());
|
||||
create_iopipes(hpscfds, !args.hpscbufs.inputs.empty());
|
||||
|
||||
int ret = 0;
|
||||
const pid_t pid = fork();
|
||||
if (pid > 0)
|
||||
{
|
||||
@@ -67,23 +69,29 @@ namespace proc
|
||||
// Close all fds unused by HP process.
|
||||
close_unused_fds(true);
|
||||
|
||||
// Start the contract output collection thread.
|
||||
output_fetcher_thread = std::thread(fetch_outputs, std::ref(args));
|
||||
|
||||
// Write the inputs into the contract process.
|
||||
if (feed_inputs(args) != 0)
|
||||
goto failure;
|
||||
|
||||
// Wait for child process (contract process) to complete execution.
|
||||
const int presult = await_process_execution(contract_pid);
|
||||
contract_pid = 0;
|
||||
LOG_DBG << "Contract process ended.";
|
||||
|
||||
contract_pid = 0;
|
||||
// Wait for the output collection thread to gracefully stop.
|
||||
output_fetcher_thread.join();
|
||||
|
||||
if (presult != 0)
|
||||
{
|
||||
LOG_ERR << "Contract process exited with non-normal status code: " << presult;
|
||||
return -1;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
if (stop_hpfs_rw_session(state_hash) != 0)
|
||||
return -1;
|
||||
|
||||
// After contract execution, collect contract outputs.
|
||||
if (fetch_outputs(args) != 0)
|
||||
return -1;
|
||||
goto failure;
|
||||
}
|
||||
else if (pid == 0)
|
||||
{
|
||||
@@ -123,10 +131,19 @@ namespace proc
|
||||
else
|
||||
{
|
||||
LOG_ERR << "fork() failed when starting contract process.";
|
||||
return -1;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
return 0;
|
||||
goto success;
|
||||
failure:
|
||||
ret = -1;
|
||||
|
||||
success:
|
||||
cleanup_fdmap(userfds);
|
||||
cleanup_vectorfds(hpscfds);
|
||||
cleanup_vectorfds(nplfds);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -261,7 +278,6 @@ namespace proc
|
||||
// Write any verified (consensus-reached) user inputs to user pipes.
|
||||
if (write_contract_fdmap_inputs(userfds, args.userbufs) != 0)
|
||||
{
|
||||
cleanup_fdmap(userfds);
|
||||
LOG_ERR << "Failed to write user inputs to contract.";
|
||||
return -1;
|
||||
}
|
||||
@@ -271,19 +287,30 @@ namespace proc
|
||||
|
||||
int fetch_outputs(const contract_exec_args &args)
|
||||
{
|
||||
if (read_contract_hp_npl_outputs(args) != 0)
|
||||
while (true)
|
||||
{
|
||||
return -1;
|
||||
if (should_deinit)
|
||||
break;
|
||||
|
||||
const int hpsc_npl_res = read_contract_hp_npl_outputs(args);
|
||||
if (hpsc_npl_res == -1)
|
||||
return -1;
|
||||
|
||||
const int user_res = read_contract_fdmap_outputs(userfds, args.userbufs);
|
||||
if (user_res == -1)
|
||||
{
|
||||
LOG_ERR << "Error reading user outputs from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
// If no bytes were read after contract finished execution, exit the read loop.
|
||||
if (hpsc_npl_res == 0 && user_res == 0 && contract_pid == 0)
|
||||
break;
|
||||
|
||||
util::sleep(20);
|
||||
}
|
||||
|
||||
if (read_contract_fdmap_outputs(userfds, args.userbufs) != 0)
|
||||
{
|
||||
LOG_ERR << "Error reading User output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
nplfds.clear();
|
||||
userfds.clear();
|
||||
LOG_DBG << "Contract outputs collected.\n";
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -311,27 +338,25 @@ namespace proc
|
||||
* Read all HP output messages produced by the contract process and store them in
|
||||
* the buffer for later processing.
|
||||
*
|
||||
* @return 0 on success. -1 on failure.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
*/
|
||||
int read_contract_hp_npl_outputs(const contract_exec_args &args)
|
||||
{
|
||||
// Clear the input buffers because we are sure the contract has finished reading from
|
||||
// that mapped memory portion.
|
||||
args.hpscbufs.inputs.clear();
|
||||
|
||||
if (read_iopipe(hpscfds, args.hpscbufs.output) != 0) // hpscbufs.second is the output buffer.
|
||||
const int hpsc_res = read_iopipe(hpscfds, args.hpscbufs.output);
|
||||
if (hpsc_res == -1)
|
||||
{
|
||||
LOG_ERR << "Error reading HP output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (read_iopipe(nplfds, args.nplbuff.output) != 0) // hpscbufs.second is the output buffer.
|
||||
const int npl_res = read_iopipe(nplfds, args.nplbuff.output);
|
||||
if (npl_res == -1)
|
||||
{
|
||||
LOG_ERR << "Error reading NPL output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return (hpsc_res == 0 && npl_res == 0) ? 0 : 1;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -372,7 +397,7 @@ namespace proc
|
||||
for (auto &[pubkey, buflist] : bufmap)
|
||||
{
|
||||
std::vector<int> fds = std::vector<int>();
|
||||
if (create_iopipes(fds) != 0)
|
||||
if (create_iopipes(fds, !buflist.inputs.empty()) != 0)
|
||||
return -1;
|
||||
|
||||
fdmap.emplace(pubkey, std::move(fds));
|
||||
@@ -408,24 +433,25 @@ namespace proc
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
*/
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
bool bytes_read = false;
|
||||
for (auto &[pubkey, bufpair] : bufmap)
|
||||
{
|
||||
// Clear the input buffer because we are sure the contract has finished reading from
|
||||
// the inputs' mapped memory portion.
|
||||
bufpair.inputs.clear();
|
||||
|
||||
// Get fds for the pubkey.
|
||||
std::vector<int> &fds = fdmap[pubkey];
|
||||
|
||||
if (read_iopipe(fds, bufpair.output) != 0) // bufpair.second is the output buffer.
|
||||
const int res = read_iopipe(fds, bufpair.output);
|
||||
if (res == -1)
|
||||
return -1;
|
||||
|
||||
if (res > 0)
|
||||
bytes_read = true;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return bytes_read ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -435,33 +461,32 @@ namespace proc
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap)
|
||||
{
|
||||
for (auto &[pubkey, fds] : fdmap)
|
||||
{
|
||||
for (int i = 0; i < 4; i++)
|
||||
{
|
||||
if (fds[i] > 0)
|
||||
close(fds[i]);
|
||||
fds[i] = 0;
|
||||
}
|
||||
}
|
||||
cleanup_vectorfds(fds);
|
||||
|
||||
fdmap.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create a pair of pipes (Hp->SC, SC->HP).
|
||||
* @param fds Vector to populate fd list.
|
||||
* @param inputbuffer Buffer to write into the HP write fd.
|
||||
* @param create_inpipe Whether to create the input pipe from HP to SC.
|
||||
*/
|
||||
int create_iopipes(std::vector<int> &fds)
|
||||
int create_iopipes(std::vector<int> &fds, const bool create_inpipe)
|
||||
{
|
||||
int inpipe[2];
|
||||
if (pipe(inpipe) != 0)
|
||||
int inpipe[2] = {-1, -1};
|
||||
if (create_inpipe && pipe(inpipe) != 0)
|
||||
return -1;
|
||||
|
||||
int outpipe[2];
|
||||
int outpipe[2] = {-1, -1};
|
||||
if (pipe(outpipe) != 0)
|
||||
{
|
||||
// Close the earlier created pipe.
|
||||
close(inpipe[0]);
|
||||
close(inpipe[1]);
|
||||
if (create_inpipe)
|
||||
{
|
||||
// Close the earlier created pipe.
|
||||
close(inpipe[0]);
|
||||
close(inpipe[1]);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -485,11 +510,14 @@ namespace proc
|
||||
// Write the inputs (if any) into the contract and close the writefd.
|
||||
|
||||
const int writefd = fds[FDTYPE::HPWRITE];
|
||||
bool vmsplice_error = false;
|
||||
if (writefd == -1)
|
||||
return 0;
|
||||
|
||||
bool write_error = false;
|
||||
|
||||
if (!inputs.empty())
|
||||
{
|
||||
// Prepare the input memory segments to map with vmsplice.
|
||||
// Prepare the input memory segments to write with wrtiev.
|
||||
size_t i = 0;
|
||||
iovec memsegs[inputs.size()];
|
||||
for (std::string &input : inputs)
|
||||
@@ -499,21 +527,17 @@ namespace proc
|
||||
i++;
|
||||
}
|
||||
|
||||
// We use vmsplice to map (zero-copy) the inputs into the fd.
|
||||
if (vmsplice(writefd, memsegs, inputs.size(), 0) == -1)
|
||||
vmsplice_error = true;
|
||||
if (writev(writefd, memsegs, inputs.size()) == -1)
|
||||
write_error = true;
|
||||
|
||||
// It's important that we DO NOT clear the input buffer string until the contract
|
||||
// process has actually read from the fd. Because the OS is just mapping our
|
||||
// input buffer memory portion into the fd, if we clear it now, the contract process
|
||||
// will get invaid bytes when reading the fd.
|
||||
inputs.clear();
|
||||
}
|
||||
|
||||
// Close the writefd since we no longer need it.
|
||||
close(writefd);
|
||||
fds[FDTYPE::HPWRITE] = 0;
|
||||
fds[FDTYPE::HPWRITE] = -1;
|
||||
|
||||
return vmsplice_error ? -1 : 0;
|
||||
return write_error ? -1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -529,7 +553,10 @@ namespace proc
|
||||
* Length of the message is calculated without including public key length
|
||||
*/
|
||||
const int writefd = fds[FDTYPE::HPWRITE];
|
||||
bool vmsplice_error = false;
|
||||
if (writefd == -1)
|
||||
return 0;
|
||||
|
||||
bool write_error = false;
|
||||
if (!inputs.empty())
|
||||
{
|
||||
int8_t total_memsegs = inputs.size() * 3;
|
||||
@@ -579,56 +606,60 @@ namespace proc
|
||||
i++;
|
||||
}
|
||||
|
||||
if (vmsplice(writefd, memsegs, total_memsegs, 0) == -1)
|
||||
vmsplice_error = true;
|
||||
if (writev(writefd, memsegs, total_memsegs) == -1)
|
||||
write_error = true;
|
||||
|
||||
inputs.clear();
|
||||
}
|
||||
// It's important that we DO NOT clear the input buffer string until the contract
|
||||
// process has actually read from the fd. Because the OS is just mapping our
|
||||
// input buffer memory portion into the fd, if we clear it now, the contract process
|
||||
// will get invaid bytes when reading the fd.
|
||||
|
||||
// Close the writefd since we no longer need it.
|
||||
close(writefd);
|
||||
fds[FDTYPE::HPWRITE] = 0;
|
||||
fds[FDTYPE::HPWRITE] = -1;
|
||||
|
||||
return vmsplice_error ? -1 : 0;
|
||||
return write_error ? -1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to read and close SC output from the pipe and populate the output list.
|
||||
* Common function to read buffered output from the pipe and populate the output list.
|
||||
* @param fds Vector representing the pipes fd list.
|
||||
* @param output The buffer to place the read output.
|
||||
* @return -1 on error. Otherwise no. of bytes read.
|
||||
*/
|
||||
int read_iopipe(std::vector<int> &fds, std::string &output)
|
||||
{
|
||||
// Read any data that have been written by the contract process
|
||||
// Read any available data that have been written by the contract process
|
||||
// from the output pipe and store in the output buffer.
|
||||
// Outputs will be read by the consensus process later when it wishes so.
|
||||
|
||||
const int readfd = fds[FDTYPE::HPREAD];
|
||||
int bytes_available = 0;
|
||||
ioctl(readfd, FIONREAD, &bytes_available);
|
||||
bool vmsplice_error = false;
|
||||
if (readfd == -1)
|
||||
return 0;
|
||||
|
||||
if (bytes_available > 0)
|
||||
bool read_error = false;
|
||||
size_t available_bytes = 0;
|
||||
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
|
||||
{
|
||||
output.resize(bytes_available);
|
||||
if (available_bytes == 0)
|
||||
return 0;
|
||||
|
||||
// Populate the user output buffer with new data from the pipe.
|
||||
// We use vmsplice to map (zero-copy) the output from the fd into output bbuffer.
|
||||
iovec memsegs[1];
|
||||
memsegs[0].iov_base = output.data();
|
||||
memsegs[0].iov_len = bytes_available;
|
||||
const size_t current_size = output.size();
|
||||
output.resize(current_size + available_bytes);
|
||||
const int res = read(readfd, output.data() + current_size, available_bytes);
|
||||
|
||||
if (vmsplice(readfd, memsegs, 1, 0) == -1)
|
||||
vmsplice_error = true;
|
||||
if (res >= 0)
|
||||
{
|
||||
if (res == 0) // EOF
|
||||
{
|
||||
close(readfd);
|
||||
fds[FDTYPE::HPREAD] = -1;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// Close readfd fd on HP process side because we are done with contract process I/O.
|
||||
close(readfd);
|
||||
fds[FDTYPE::HPREAD] = 0;
|
||||
|
||||
return vmsplice_error ? -1 : 0;
|
||||
fds[FDTYPE::HPREAD] = -1;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void close_unused_fds(const bool is_hp)
|
||||
@@ -649,35 +680,54 @@ namespace proc
|
||||
*/
|
||||
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds)
|
||||
{
|
||||
if (is_hp)
|
||||
{
|
||||
// Close unused fds in Hot Pocket process.
|
||||
close(fds[FDTYPE::SCREAD]);
|
||||
fds[FDTYPE::SCREAD] = 0;
|
||||
close(fds[FDTYPE::SCWRITE]);
|
||||
fds[FDTYPE::SCWRITE] = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Close unused fds in smart contract process.
|
||||
close(fds[FDTYPE::HPREAD]);
|
||||
fds[FDTYPE::HPREAD] = 0;
|
||||
const int fdtypes_to_close[2] = {
|
||||
is_hp ? FDTYPE::SCREAD : FDTYPE::HPREAD,
|
||||
is_hp ? FDTYPE::SCWRITE : FDTYPE::HPWRITE,
|
||||
};
|
||||
|
||||
// HPWRITE fd has aleady been closed by HP process after writing
|
||||
// inputs (before the fork).
|
||||
for (const int fdtype : fdtypes_to_close)
|
||||
{
|
||||
const int fd = fds[fdtype];
|
||||
if (fd != -1)
|
||||
{
|
||||
close(fd);
|
||||
fds[fdtype] = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all fds in a vector fd set.
|
||||
*/
|
||||
void cleanup_vectorfds(std::vector<int> &fds)
|
||||
{
|
||||
for (int i = 0; i < fds.size(); i++)
|
||||
{
|
||||
if (fds[i] != -1)
|
||||
{
|
||||
close(fds[i]);
|
||||
fds[i] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
fds.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup any running processes.
|
||||
*/
|
||||
void deinit()
|
||||
{
|
||||
should_deinit = true;
|
||||
|
||||
if (contract_pid > 0)
|
||||
util::kill_process(contract_pid, true);
|
||||
|
||||
if (hpfs_pid > 0)
|
||||
util::kill_process(hpfs_pid, true);
|
||||
|
||||
if (output_fetcher_thread.joinable())
|
||||
output_fetcher_thread.join();
|
||||
}
|
||||
|
||||
} // namespace proc
|
||||
|
||||
137
src/proc.hpp
137
src/proc.hpp
@@ -12,106 +12,107 @@
|
||||
namespace proc
|
||||
{
|
||||
|
||||
/**
|
||||
/**
|
||||
* Represents list of inputs to the contract and the accumulated contract output for those inputs.
|
||||
*/
|
||||
struct contract_iobuf_pair
|
||||
{
|
||||
// List of inputs to be fed into the contract.
|
||||
std::list<std::string> inputs;
|
||||
struct contract_iobuf_pair
|
||||
{
|
||||
// List of inputs to be fed into the contract.
|
||||
std::list<std::string> inputs;
|
||||
|
||||
// Output emitted by contract after execution.
|
||||
// (Because we are reading output at the end, there's no way to
|
||||
// get a "list" of outputs. So it's always a one contiguous output.)
|
||||
std::string output;
|
||||
};
|
||||
// Output emitted by contract after execution.
|
||||
// (Because we are reading output at the end, there's no way to
|
||||
// get a "list" of outputs. So it's always a one contiguous output.)
|
||||
std::string output;
|
||||
};
|
||||
|
||||
// Common typedef for a map of pubkey->fdlist.
|
||||
// This is used to keep track of fdlist quadruplet with a public key (eg. user, npl).
|
||||
typedef std::unordered_map<std::string, std::vector<int>> contract_fdmap_t;
|
||||
// Common typedef for a map of pubkey->fdlist.
|
||||
// This is used to keep track of fdlist quadruplet with a public key (eg. user, npl).
|
||||
typedef std::unordered_map<std::string, std::vector<int>> contract_fdmap_t;
|
||||
|
||||
// Common typedef for a map of pubkey->I/O list pair (input list and output list).
|
||||
// This is used to keep track of input/output buffers for a given public key (eg. user, npl)
|
||||
typedef std::unordered_map<std::string, contract_iobuf_pair> contract_bufmap_t;
|
||||
// Common typedef for a map of pubkey->I/O list pair (input list and output list).
|
||||
// This is used to keep track of input/output buffers for a given public key (eg. user, npl)
|
||||
typedef std::unordered_map<std::string, contract_iobuf_pair> contract_bufmap_t;
|
||||
|
||||
/**
|
||||
/**
|
||||
* Holds information that should be passed into the contract process.
|
||||
*/
|
||||
struct contract_exec_args
|
||||
{
|
||||
// Map of user I/O buffers (map key: user binary public key).
|
||||
// The value is a pair holding consensus-verified inputs and contract-generated outputs.
|
||||
contract_bufmap_t &userbufs;
|
||||
|
||||
// Pair of NPL<->SC byte array message buffers.
|
||||
// Input buffers for NPL->SC messages, Output buffers for SC->NPL messages.
|
||||
contract_iobuf_pair &nplbuff;
|
||||
|
||||
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
|
||||
// Input buffers for HP->SC messages, Output buffers for SC->HP messages.
|
||||
contract_iobuf_pair &hpscbufs;
|
||||
|
||||
// Current HotPocket timestamp.
|
||||
const int64_t timestamp;
|
||||
|
||||
contract_exec_args(
|
||||
int64_t timestamp,
|
||||
contract_bufmap_t &userbufs,
|
||||
contract_iobuf_pair &nplbuff,
|
||||
contract_iobuf_pair &hpscbufs) :
|
||||
userbufs(userbufs),
|
||||
nplbuff(nplbuff),
|
||||
hpscbufs(hpscbufs),
|
||||
timestamp(timestamp)
|
||||
struct contract_exec_args
|
||||
{
|
||||
}
|
||||
};
|
||||
// Map of user I/O buffers (map key: user binary public key).
|
||||
// The value is a pair holding consensus-verified inputs and contract-generated outputs.
|
||||
contract_bufmap_t &userbufs;
|
||||
|
||||
int exec_contract(const contract_exec_args &args, hpfs::h32 &state_hash);
|
||||
// Pair of NPL<->SC byte array message buffers.
|
||||
// Input buffers for NPL->SC messages, Output buffers for SC->NPL messages.
|
||||
contract_iobuf_pair &nplbuff;
|
||||
|
||||
void deinit();
|
||||
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
|
||||
// Input buffers for HP->SC messages, Output buffers for SC->HP messages.
|
||||
contract_iobuf_pair &hpscbufs;
|
||||
|
||||
//------Internal-use functions for this namespace.
|
||||
// Current HotPocket timestamp.
|
||||
const int64_t timestamp;
|
||||
|
||||
int await_process_execution(pid_t pid);
|
||||
contract_exec_args(
|
||||
int64_t timestamp,
|
||||
contract_bufmap_t &userbufs,
|
||||
contract_iobuf_pair &nplbuff,
|
||||
contract_iobuf_pair &hpscbufs) : userbufs(userbufs),
|
||||
nplbuff(nplbuff),
|
||||
hpscbufs(hpscbufs),
|
||||
timestamp(timestamp)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
int start_hpfs_rw_session();
|
||||
int exec_contract(const contract_exec_args &args, hpfs::h32 &state_hash);
|
||||
|
||||
int stop_hpfs_rw_session(hpfs::h32 &state_hash);
|
||||
void deinit();
|
||||
|
||||
int write_contract_args(const contract_exec_args &args);
|
||||
//------Internal-use functions for this namespace.
|
||||
|
||||
int feed_inputs(const contract_exec_args &args);
|
||||
int await_process_execution(pid_t pid);
|
||||
|
||||
int fetch_outputs(const contract_exec_args &args);
|
||||
int start_hpfs_rw_session();
|
||||
|
||||
int write_contract_hp_npl_inputs(const contract_exec_args &args);
|
||||
int stop_hpfs_rw_session(hpfs::h32 &state_hash);
|
||||
|
||||
int read_contract_hp_npl_outputs(const contract_exec_args &args);
|
||||
int write_contract_args(const contract_exec_args &args);
|
||||
|
||||
// Common helper functions
|
||||
int feed_inputs(const contract_exec_args &args);
|
||||
|
||||
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os);
|
||||
int fetch_outputs(const contract_exec_args &args);
|
||||
|
||||
int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
int write_contract_hp_npl_inputs(const contract_exec_args &args);
|
||||
|
||||
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
int read_contract_hp_npl_outputs(const contract_exec_args &args);
|
||||
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
// Common helper functions
|
||||
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap);
|
||||
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os);
|
||||
|
||||
int create_iopipes(std::vector<int> &fds);
|
||||
int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int write_npl_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int read_iopipe(std::vector<int> &fds, std::string &output);
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap);
|
||||
|
||||
void close_unused_fds(const bool is_hp);
|
||||
int create_iopipes(std::vector<int> &fds, const bool create_inpipe);
|
||||
|
||||
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds);
|
||||
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
|
||||
int write_npl_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
|
||||
int read_iopipe(std::vector<int> &fds, std::string &output);
|
||||
|
||||
void close_unused_fds(const bool is_hp);
|
||||
|
||||
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds);
|
||||
|
||||
void cleanup_vectorfds(std::vector<int> &fds);
|
||||
|
||||
} // namespace proc
|
||||
|
||||
|
||||
@@ -115,16 +115,16 @@ done
|
||||
for (( i=1; i<=$ncount; i++ ))
|
||||
do
|
||||
|
||||
mkdir -p ./node$i/statehist/0/data/ > /dev/null 2>&1
|
||||
mkdir -p ./node$i/state/seed > /dev/null 2>&1
|
||||
|
||||
# Load credit balance for user for testing purposes.
|
||||
pushd ./node$i/statehist/0/data/ > /dev/null 2>&1
|
||||
# Load credit balance for user for appbill testing purposes.
|
||||
pushd ./node$i/state/seed/ > /dev/null 2>&1
|
||||
>appbill.table
|
||||
../../../../../bin/appbill --credit "705bf26354ee4c63c0e5d5d883c07cefc3196d049bd3825f827eb3bc23ead035" 10000
|
||||
../../../../bin/appbill --credit "705bf26354ee4c63c0e5d5d883c07cefc3196d049bd3825f827eb3bc23ead035" 10000
|
||||
popd > /dev/null 2>&1
|
||||
|
||||
# Copy any more initial state files for testing.
|
||||
#cp ~/my_big_file ~/hpcore/hpcluster/node$i/statehist/0/data/
|
||||
#cp ~/my_big_file ~/hpcore/hpcluster/node$i/state/seed/
|
||||
|
||||
done
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ fi
|
||||
if [ $mode = "check" ]; then
|
||||
let nodeid=$2-1
|
||||
vmip=${vmips[$nodeid]}
|
||||
sshpass -f vmpass.txt ssh geveo@$vmip 'echo hpcore pid:$(pidof hpcore) hpfs pid:$(pidof hpfs) websocketd pid:$(pidof websocketd)'
|
||||
sshpass -f vmpass.txt ssh geveo@$vmip 'echo hpcore pid:$(pidof hpcore) hpfs pid:$(pidof hpfs) websocketd pid:$(pidof websocketd) websocat pid:$(pidof websocat)'
|
||||
exit 0
|
||||
fi
|
||||
|
||||
@@ -59,6 +59,7 @@ if [ $mode = "kill" ]; then
|
||||
sshpass -f vmpass.txt ssh geveo@$vmip 'sudo kill $(pidof hpcore) > /dev/null 2>&1'
|
||||
sshpass -f vmpass.txt ssh geveo@$vmip 'sudo kill $(pidof hpfs) > /dev/null 2>&1'
|
||||
sshpass -f vmpass.txt ssh geveo@$vmip 'sudo kill $(pidof websocketd) > /dev/null 2>&1'
|
||||
sshpass -f vmpass.txt ssh geveo@$vmip 'sudo kill $(pidof websocat) > /dev/null 2>&1'
|
||||
exit 0
|
||||
fi
|
||||
|
||||
|
||||
@@ -10,9 +10,9 @@
|
||||
|
||||
name=$1
|
||||
loc=$2
|
||||
vmsize=Standard_B1ls
|
||||
vmsize=Standard_B1s
|
||||
vmpass=$(cat vmpass.txt)
|
||||
resgroup=My-ResGroup
|
||||
resgroup=HotPocket-ResGroup
|
||||
|
||||
az vm create --name $name --resource-group $resgroup --size $vmsize --admin-username geveo --admin-password $vmpass --image UbuntuLTS --location $loc --generate-ssh-keys
|
||||
az vm open-port --resource-group $resgroup --name $name --port 22860 --priority 900 && \
|
||||
|
||||
Reference in New Issue
Block a user