mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Contract execution refactor. (#153)
This commit is contained in:
@@ -21,13 +21,19 @@ const echoContract = (ctx) => {
|
||||
else {
|
||||
await user.send("Echoing: " + msg);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Broadcast message to all connected users.
|
||||
// ctx.users.get().forEach(u => u.send("Hello"));
|
||||
// Get list of all users who are connected.
|
||||
// ctx.users.get();
|
||||
|
||||
// Send message to specific user (identified by public key).
|
||||
// await ctx.users.find(<PubkeyHex>).send("Hello");
|
||||
// Get the user identified by public key.
|
||||
// ctx.users.find("<PubkeyHex>");
|
||||
|
||||
// Get list of all peers in the cluster.
|
||||
// ctx.peers.get();
|
||||
|
||||
// Get the peer identified by public key.
|
||||
// ctx.peers.find("<PubkeyHex>");
|
||||
|
||||
// Peer messages example.
|
||||
// if (!ctx.readonly) {
|
||||
|
||||
@@ -29,9 +29,9 @@ class HotPocketContract {
|
||||
const executionContext = new ContractExecutionContext(hpargs, users, peers);
|
||||
|
||||
this.events.emit("session_start");
|
||||
invokeCallback(contractFunc, executionContext).then(() => {
|
||||
invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => {
|
||||
// Wait for any pending tasks added during execution.
|
||||
Promise.all(pendingTasks).then(() => {
|
||||
Promise.all(pendingTasks).catch(errHandler).finally(() => {
|
||||
this.events.emit("session_end");
|
||||
this.#terminate();
|
||||
});
|
||||
@@ -121,7 +121,7 @@ class UsersCollection {
|
||||
if (pendingUserCount == 0) {
|
||||
// All user message events has been emitted.
|
||||
// Now start waiting for queued up user message callback completion.
|
||||
Promise.all(userMessageTasks).then(allUsersCompletionResolver)
|
||||
Promise.all(userMessageTasks).catch(errHandler).finally(allUsersCompletionResolver)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -368,13 +368,15 @@ const invokeCallback = async (callback, ...args) => {
|
||||
return;
|
||||
|
||||
if (callback.constructor.name === 'AsyncFunction') {
|
||||
await callback(...args);
|
||||
await callback(...args).catch(errHandler);
|
||||
}
|
||||
else {
|
||||
callback(...args);
|
||||
}
|
||||
}
|
||||
|
||||
const errHandler = (err) => console.log(err);
|
||||
|
||||
module.exports = {
|
||||
HotPocketContract
|
||||
}
|
||||
@@ -40,9 +40,6 @@ namespace consensus
|
||||
ctx.stage_time = (conf::cfg.roundtime * 2) / 7;
|
||||
ctx.stage_reset_wait_threshold = conf::cfg.roundtime / 10;
|
||||
|
||||
ctx.contract_ctx.args.state_dir = conf::ctx.state_rw_dir;
|
||||
ctx.contract_ctx.args.readonly = false;
|
||||
|
||||
// Starting consensus processing thread.
|
||||
ctx.consensus_thread = std::thread(run_consensus);
|
||||
|
||||
@@ -61,7 +58,11 @@ namespace consensus
|
||||
ctx.is_shutting_down = true;
|
||||
|
||||
// Stop the contract if running.
|
||||
sc::stop(ctx.contract_ctx);
|
||||
{
|
||||
std::scoped_lock lock(ctx.contract_ctx_mutex);
|
||||
if (ctx.contract_ctx)
|
||||
sc::stop(ctx.contract_ctx.value());
|
||||
}
|
||||
|
||||
// Joining consensus processing thread.
|
||||
if (ctx.consensus_thread.joinable())
|
||||
@@ -361,7 +362,10 @@ namespace consensus
|
||||
*/
|
||||
bool push_npl_message(p2p::npl_message &npl_msg)
|
||||
{
|
||||
return ctx.contract_ctx.args.npl_messages.try_enqueue(npl_msg);
|
||||
std::scoped_lock lock(ctx.contract_ctx_mutex);
|
||||
if (ctx.contract_ctx)
|
||||
return ctx.contract_ctx->args.npl_messages.try_enqueue(npl_msg);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -371,7 +375,10 @@ namespace consensus
|
||||
*/
|
||||
bool push_control_message(const std::string &control_msg)
|
||||
{
|
||||
return ctx.contract_ctx.args.control_messages.try_enqueue(control_msg);
|
||||
std::scoped_lock lock(ctx.contract_ctx_mutex);
|
||||
if (ctx.contract_ctx)
|
||||
return ctx.contract_ctx->args.control_messages.try_enqueue(control_msg);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -721,16 +728,23 @@ namespace consensus
|
||||
dispatch_user_outputs(cons_prop, new_lcl_seq_no, new_lcl);
|
||||
|
||||
// Execute the contract
|
||||
if (!ctx.is_shutting_down)
|
||||
{
|
||||
sc::contract_execution_args &args = ctx.contract_ctx.args;
|
||||
{
|
||||
std::scoped_lock lock(ctx.contract_ctx_mutex);
|
||||
ctx.contract_ctx.emplace();
|
||||
}
|
||||
|
||||
sc::contract_execution_args &args = ctx.contract_ctx->args;
|
||||
args.state_dir = conf::ctx.state_rw_dir;
|
||||
args.readonly = false;
|
||||
args.time = cons_prop.time;
|
||||
args.lcl = new_lcl;
|
||||
|
||||
// Populate user bufs.
|
||||
feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop);
|
||||
// TODO: Do something usefull with HP<-->SC channel.
|
||||
|
||||
if (sc::execute_contract(ctx.contract_ctx) == -1)
|
||||
if (sc::execute_contract(ctx.contract_ctx.value()) == -1)
|
||||
{
|
||||
LOG_ERROR << "Contract execution failed.";
|
||||
return -1;
|
||||
@@ -741,7 +755,10 @@ namespace consensus
|
||||
|
||||
extract_user_outputs_from_contract_bufmap(args.userbufs);
|
||||
|
||||
sc::clear_args(args);
|
||||
{
|
||||
std::scoped_lock lock(ctx.contract_ctx_mutex);
|
||||
ctx.contract_ctx.reset();
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
@@ -68,7 +68,8 @@ namespace consensus
|
||||
uint16_t stage_time = 0; // Time allocated to a consensus stage.
|
||||
uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
|
||||
|
||||
sc::execution_context contract_ctx;
|
||||
std::optional<sc::execution_context> contract_ctx;
|
||||
std::mutex contract_ctx_mutex;
|
||||
bool is_shutting_down = false;
|
||||
|
||||
std::thread consensus_thread;
|
||||
|
||||
286
src/sc.cpp
286
src/sc.cpp
@@ -62,7 +62,7 @@ namespace sc
|
||||
if (start_hpfs_session(ctx) == -1)
|
||||
return -1;
|
||||
|
||||
// Setup io sockets and feed all inputs to them.
|
||||
// Setup user io sockets and feed all inputs to them.
|
||||
create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs);
|
||||
|
||||
if (!ctx.args.readonly)
|
||||
@@ -87,38 +87,12 @@ namespace sc
|
||||
// Close all fds unused by HP process.
|
||||
close_unused_fds(ctx, true);
|
||||
|
||||
// Start the contract output collection thread.
|
||||
ctx.contract_io_thread = std::thread(handle_contract_io, std::ref(ctx));
|
||||
// Start the contract monitor thread.
|
||||
ctx.contract_monitor_thread = std::thread(contract_monitor_loop, std::ref(ctx));
|
||||
|
||||
// Write the inputs into the contract process.
|
||||
if (feed_inputs(ctx) == -1)
|
||||
{
|
||||
util::kill_process(pid, true);
|
||||
ctx.contract_pid = 0;
|
||||
ctx.args.contract_terminated = true;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
// Wait for child process (contract process) to complete execution.
|
||||
const int presult = await_process_execution(ctx.contract_pid);
|
||||
ctx.contract_pid = 0;
|
||||
ctx.args.contract_terminated = true;
|
||||
LOG_DEBUG << "Contract process ended." << (ctx.args.readonly ? " (rdonly)" : "");
|
||||
|
||||
// There could be 2 reasons for the contract to end; the contract voluntary finished execution or
|
||||
// it was killed due to Hot Pocket shutting down.
|
||||
|
||||
// Wait for the i/o thread to gracefully stop if this is voluntary contract termination.
|
||||
// 'ctx.should_stop' indicates Hot Pocket is shutting down. If that's the case ouput collection thread
|
||||
// is joined by the deinit logic.
|
||||
if (!ctx.should_stop && ctx.contract_io_thread.joinable())
|
||||
ctx.contract_io_thread.join();
|
||||
|
||||
if (presult != 0)
|
||||
{
|
||||
LOG_ERROR << "Contract process exited with non-normal status code: " << presult;
|
||||
goto failure;
|
||||
}
|
||||
// Wait for the contract monitor thread to gracefully stop along with the contract process.
|
||||
if (ctx.contract_monitor_thread.joinable())
|
||||
ctx.contract_monitor_thread.join();
|
||||
}
|
||||
else if (pid == 0)
|
||||
{
|
||||
@@ -153,7 +127,7 @@ namespace sc
|
||||
|
||||
chdir(ctx.args.state_dir.c_str());
|
||||
|
||||
int ret = execv(execv_args[0], execv_args);
|
||||
execv(execv_args[0], execv_args);
|
||||
std::cerr << errno << ": Contract process execv failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
|
||||
exit(1);
|
||||
}
|
||||
@@ -171,25 +145,45 @@ namespace sc
|
||||
if (stop_hpfs_session(ctx) == -1)
|
||||
ret = -1;
|
||||
|
||||
// Cleaning the user fdmap after executing the contract.
|
||||
cleanup_fdmap(ctx.userfds);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the calling thread until the specified process completed exeution (if running).
|
||||
* @return 0 if process exited normally or exit code of process if abnormally exited.
|
||||
* Checks whether the contract process has exited.
|
||||
* @param ctx Contract execution context.
|
||||
* @param block Whether to block and wait until the contract has exited.
|
||||
* @return 0 if child has not exited. 1 if exited successfully. -1 if exited abnormally
|
||||
*/
|
||||
int await_process_execution(pid_t pid)
|
||||
int check_contract_exited(execution_context &ctx, const bool block)
|
||||
{
|
||||
if (pid > 0)
|
||||
int scstatus = 0;
|
||||
const int wait_res = waitpid(ctx.contract_pid, &scstatus, block ? 0 : WNOHANG);
|
||||
|
||||
if (wait_res == 0) // Child still running.
|
||||
{
|
||||
int scstatus = 0;
|
||||
waitpid(pid, &scstatus, 0);
|
||||
if (!WIFEXITED(scstatus))
|
||||
return WEXITSTATUS(scstatus);
|
||||
return 0;
|
||||
}
|
||||
if (wait_res == -1)
|
||||
{
|
||||
LOG_ERROR << errno << ": Contract process waitpid error. pid:" << ctx.contract_pid;
|
||||
ctx.contract_pid = 0;
|
||||
return -1;
|
||||
}
|
||||
else // Child has exited
|
||||
{
|
||||
ctx.contract_pid = 0;
|
||||
|
||||
if (WIFEXITED(scstatus))
|
||||
{
|
||||
LOG_DEBUG << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended normally.";
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_ERROR << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended with code " << WEXITSTATUS(scstatus);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -324,56 +318,81 @@ namespace sc
|
||||
return 0;
|
||||
}
|
||||
|
||||
int feed_inputs(execution_context &ctx)
|
||||
{
|
||||
// Write any user inputs to user sockets.
|
||||
if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1)
|
||||
{
|
||||
LOG_ERROR << "Failed to write user inputs to contract.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect contract outputs and feed npl messages while contract is running.
|
||||
* Feeds and collect contract messages.
|
||||
* @param ctx Contract execution context.
|
||||
* @return Returns -1 if the operation fails otherwise 0.
|
||||
*/
|
||||
int handle_contract_io(execution_context &ctx)
|
||||
void contract_monitor_loop(execution_context &ctx)
|
||||
{
|
||||
util::mask_signal();
|
||||
|
||||
while (true)
|
||||
// Write any user inputs to the contract.
|
||||
if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1)
|
||||
{
|
||||
if (ctx.should_stop)
|
||||
break;
|
||||
|
||||
const int hpsc_res = read_contract_hp_outputs(ctx);
|
||||
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
|
||||
const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs, ctx.args.contract_terminated);
|
||||
|
||||
if (!ctx.args.contract_terminated)
|
||||
LOG_ERROR << "Failed to write user inputs to contract.";
|
||||
}
|
||||
else
|
||||
{
|
||||
while (!ctx.is_shutting_down)
|
||||
{
|
||||
if (!ctx.args.readonly)
|
||||
write_npl_messages(ctx);
|
||||
write_contract_hp_inputs(ctx);
|
||||
}
|
||||
// Atempt to read messages from contract (regardless of contract terminated or not).
|
||||
const int hpsc_read_res = read_contract_hp_outputs(ctx);
|
||||
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
|
||||
const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs);
|
||||
|
||||
// If no bytes were read after contract finished execution, exit the read loop.
|
||||
if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0 && ctx.args.contract_terminated)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0)
|
||||
{
|
||||
util::sleep(20);
|
||||
if (ctx.termination_signaled || ctx.contract_pid == 0)
|
||||
{
|
||||
// If no bytes were read after contract finished execution, exit the loop.
|
||||
// Otherwise keep running the loop becaue there might be further messages to read.
|
||||
if ((hpsc_read_res + npl_read_res + user_read_res) == 0)
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// We assume contract is still running. Attempt to write any queued messages to the contract.
|
||||
|
||||
const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx);
|
||||
if (npl_write_res == -1)
|
||||
break;
|
||||
|
||||
const int hpsc_write_res = write_contract_hp_inputs(ctx);
|
||||
if (hpsc_write_res == -1)
|
||||
break;
|
||||
|
||||
// If no operation was performed during this iteration, wait for a small delay until the next iteration.
|
||||
// This means there were no queued messages from either side.
|
||||
if ((hpsc_read_res + npl_read_res + user_read_res + hpsc_write_res + hpsc_write_res) == 0)
|
||||
util::sleep(20);
|
||||
}
|
||||
|
||||
// Check if contract process has exited on its own during the loop.
|
||||
if (ctx.contract_pid > 0)
|
||||
check_contract_exited(ctx, false);
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG << "Contract outputs collected.";
|
||||
return 0;
|
||||
// Close all fds.
|
||||
cleanup_vectorfds(ctx.hpscfds);
|
||||
cleanup_vectorfds(ctx.nplfds);
|
||||
for (auto &[pubkey, fds] : ctx.userfds)
|
||||
cleanup_vectorfds(fds);
|
||||
ctx.userfds.clear();
|
||||
|
||||
// If we reach this point but the contract is still running, then we need to kill the contract by force.
|
||||
// This can be the case if HP is shutting down, or there was an error in initial feeding of inputs.
|
||||
if (ctx.contract_pid > 0)
|
||||
{
|
||||
// Check if the contract has exited voluntarily.
|
||||
if (check_contract_exited(ctx, false) == 0)
|
||||
{
|
||||
// Issue kill signal if the contract hasn't indicated the termination control message.
|
||||
if (!ctx.termination_signaled)
|
||||
kill(ctx.contract_pid, SIGTERM);
|
||||
check_contract_exited(ctx, true); // Blocking wait until exit.
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG << "Contract monitor stopped";
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -398,7 +417,7 @@ namespace sc
|
||||
/**
|
||||
* Write npl messages to the contract.
|
||||
* @param ctx Contract execution context.
|
||||
* @return Returns -1 when fails otherwise 0.
|
||||
* @return Returns -1 when fails. 0 if no messages were written. 1 if some messages were written.
|
||||
*/
|
||||
int write_npl_messages(execution_context &ctx)
|
||||
{
|
||||
@@ -420,10 +439,19 @@ namespace sc
|
||||
{
|
||||
// Writing the public key to the contract's fd (Skip first byte for key type prefix).
|
||||
if (write(writefd, npl_msg.pubkey.data() + 1, npl_msg.pubkey.size() - 1) == -1)
|
||||
{
|
||||
LOG_ERROR << errno << ": Error writing npl message pubkey.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Writing the message to the contract's fd.
|
||||
if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1)
|
||||
{
|
||||
LOG_ERROR << errno << ": Error writing npl message data.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -438,41 +466,37 @@ namespace sc
|
||||
* Read all HP output messages produced by the contract process and store them in
|
||||
* the buffer for later processing.
|
||||
*
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read..
|
||||
*/
|
||||
int read_contract_hp_outputs(execution_context &ctx)
|
||||
{
|
||||
std::string output;
|
||||
const int hpsc_res = read_iosocket(false, ctx.hpscfds, output, ctx.args.contract_terminated);
|
||||
const int hpsc_res = read_iosocket(false, ctx.hpscfds, output);
|
||||
if (hpsc_res == -1)
|
||||
{
|
||||
LOG_ERROR << "Error reading HP output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
else if (hpsc_res > 0)
|
||||
{
|
||||
// ctx.args.hpscbufs.outputs.push_back(output);
|
||||
|
||||
handle_control_msgs(ctx.args, output);
|
||||
handle_control_msgs(ctx, output);
|
||||
}
|
||||
|
||||
return (hpsc_res == 0) ? 0 : 1;
|
||||
return (hpsc_res > 0) ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all NPL output messages produced by the contract process and broadcast them.
|
||||
* @param ctx contract execution context.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read.
|
||||
*/
|
||||
int read_contract_npl_outputs(execution_context &ctx)
|
||||
{
|
||||
std::string output;
|
||||
const int npl_res = read_iosocket(false, ctx.nplfds, output, ctx.args.contract_terminated);
|
||||
const int npl_res = read_iosocket(false, ctx.nplfds, output);
|
||||
|
||||
if (npl_res == -1)
|
||||
{
|
||||
LOG_ERROR << "Error reading NPL output from the contract.";
|
||||
return -1;
|
||||
}
|
||||
else if (npl_res > 0)
|
||||
{
|
||||
@@ -480,7 +504,7 @@ namespace sc
|
||||
broadcast_npl_output(output);
|
||||
}
|
||||
|
||||
return (npl_res == 0) ? 0 : 1;
|
||||
return (npl_res > 0) ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -543,15 +567,6 @@ namespace sc
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create the sockets and write buffer inputs to the fdmap.
|
||||
* We take mutable parameters since the internal entries in the maps will be
|
||||
* modified (eg. fd close, buffer clear).
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
|
||||
* @return 0 on success. -1 on failure.
|
||||
*/
|
||||
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
// Loop through input buffers for each pubkey.
|
||||
@@ -570,10 +585,9 @@ namespace sc
|
||||
*
|
||||
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
|
||||
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
|
||||
* @param contract_terminated Indicates whether the contract termination signal recieved.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
|
||||
* @return 0 if no bytes were read. 1 if bytes were read.
|
||||
*/
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated)
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
|
||||
{
|
||||
bool bytes_read = false;
|
||||
for (auto &[pubkey, bufs] : bufmap)
|
||||
@@ -583,9 +597,13 @@ namespace sc
|
||||
std::vector<int> &fds = fdmap[pubkey];
|
||||
|
||||
// This returns the total bytes read from the socket.
|
||||
const int total_bytes_read = read_iosocket(true, fds, output, contract_terminated);
|
||||
const int total_bytes_read = read_iosocket(true, fds, output);
|
||||
|
||||
if (total_bytes_read > 0)
|
||||
if (total_bytes_read == -1)
|
||||
{
|
||||
LOG_ERROR << "Error reading user outputs from contract.";
|
||||
}
|
||||
else if (total_bytes_read > 0)
|
||||
{
|
||||
// Current reading position of the received buffer chunk.
|
||||
int pos = 0;
|
||||
@@ -641,15 +659,6 @@ namespace sc
|
||||
return bytes_read ? 1 : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to close any open fds in the map after an error.
|
||||
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds)
|
||||
*/
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap)
|
||||
{
|
||||
fdmap.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Common function to create a socket (Hp->SC, SC->HP).
|
||||
* @param fds Vector to populate fd list.
|
||||
@@ -741,7 +750,6 @@ namespace sc
|
||||
if (write(writefd, input.data(), input.length()) == -1)
|
||||
{
|
||||
LOG_ERROR << errno << ": Error writing to sequece packet socket.";
|
||||
//cleanup_vectorfds(fds);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -753,10 +761,9 @@ namespace sc
|
||||
* @param is_stream_socket Indicates whether socket is steam socket or not
|
||||
* @param fds Vector representing the socket fd list.
|
||||
* @param output The buffer to place the read output.
|
||||
* @param contract_terminated Indicates whether the contract termination signal recieved.
|
||||
* @return -1 on error. Otherwise no. of bytes read.
|
||||
*/
|
||||
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output, const bool contract_terminated)
|
||||
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output)
|
||||
{
|
||||
// Read any available data that have been written by the contract process
|
||||
// from the output socket and store in the output buffer.
|
||||
@@ -791,6 +798,7 @@ namespace sc
|
||||
else
|
||||
{
|
||||
res = -1;
|
||||
LOG_ERROR << errno << ": Error reading from contract socket.";
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -799,11 +807,6 @@ namespace sc
|
||||
res = -1;
|
||||
}
|
||||
|
||||
if (res == -1 || (res == 0 && contract_terminated))
|
||||
{
|
||||
cleanup_vectorfds(fds);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
@@ -869,42 +872,21 @@ namespace sc
|
||||
fds.clear();
|
||||
}
|
||||
|
||||
void clear_args(contract_execution_args &args)
|
||||
{
|
||||
args.userbufs.clear();
|
||||
// args.hpscbufs.inputs.clear();
|
||||
// args.hpscbufs.outputs.clear();
|
||||
// Empty npl message queue.
|
||||
while (args.npl_messages.pop())
|
||||
{
|
||||
}
|
||||
args.time = 0;
|
||||
args.lcl.clear();
|
||||
args.post_execution_state_hash = hpfs::h32_empty;
|
||||
args.contract_terminated = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup any running processes for the specified execution context.
|
||||
* Force cleanup any running processes for the specified execution context.
|
||||
*/
|
||||
void stop(execution_context &ctx)
|
||||
{
|
||||
ctx.should_stop = true;
|
||||
|
||||
if (ctx.contract_pid > 0)
|
||||
util::kill_process(ctx.contract_pid, true);
|
||||
|
||||
if (ctx.contract_io_thread.joinable())
|
||||
ctx.contract_io_thread.join();
|
||||
ctx.is_shutting_down = true;
|
||||
}
|
||||
|
||||
void handle_control_msgs(contract_execution_args &args, std::string &output)
|
||||
void handle_control_msgs(execution_context &ctx, std::string &msg)
|
||||
{
|
||||
if (output == "Terminated")
|
||||
if (msg == "Terminated")
|
||||
{
|
||||
args.contract_terminated = true;
|
||||
ctx.termination_signaled = true;
|
||||
}
|
||||
output.clear();
|
||||
msg.clear();
|
||||
}
|
||||
|
||||
} // namespace sc
|
||||
|
||||
31
src/sc.hpp
31
src/sc.hpp
@@ -31,7 +31,6 @@ namespace sc
|
||||
{
|
||||
uint32_t message_len = 0;
|
||||
std::string message;
|
||||
|
||||
};
|
||||
/**
|
||||
* Represents list of inputs to the contract and the accumulated contract output for those inputs.
|
||||
@@ -74,10 +73,6 @@ namespace sc
|
||||
|
||||
// Contol messages to be passed into contract.
|
||||
moodycamel::ReaderWriterQueue<std::string> control_messages;
|
||||
|
||||
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
|
||||
// Input buffers for HP->SC messages, Output buffers for SC->HP messages.
|
||||
// contract_iobuf_pair hpscbufs;
|
||||
|
||||
// Current HotPocket consensus time.
|
||||
int64_t time = 0;
|
||||
@@ -87,9 +82,6 @@ namespace sc
|
||||
|
||||
// State hash after execution will be copied to this (not applicable to read only mode).
|
||||
hpfs::h32 post_execution_state_hash = hpfs::h32_empty;
|
||||
|
||||
// Indicates that the contract has sent termination control message or it has exited.
|
||||
bool contract_terminated = false;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -116,10 +108,13 @@ namespace sc
|
||||
pid_t hpfs_pid = 0;
|
||||
|
||||
// Thread to collect contract inputs and outputs and feed npl messages while contract is running.
|
||||
std::thread contract_io_thread;
|
||||
std::thread contract_monitor_thread;
|
||||
|
||||
// Indicates that the contract has sent termination control message.
|
||||
bool termination_signaled = false;
|
||||
|
||||
// Indicates that the deinit procedure has begun.
|
||||
bool should_stop = false;
|
||||
bool is_shutting_down = false;
|
||||
};
|
||||
|
||||
int init();
|
||||
@@ -130,7 +125,7 @@ namespace sc
|
||||
|
||||
//------Internal-use functions for this namespace.
|
||||
|
||||
int await_process_execution(pid_t pid);
|
||||
int check_contract_exited(execution_context &ctx, const bool block);
|
||||
|
||||
int start_hpfs_session(execution_context &ctx);
|
||||
|
||||
@@ -138,9 +133,7 @@ namespace sc
|
||||
|
||||
int write_contract_args(const execution_context &ctx);
|
||||
|
||||
int feed_inputs(execution_context &ctx);
|
||||
|
||||
int handle_contract_io(execution_context &ctx);
|
||||
void contract_monitor_loop(execution_context &ctx);
|
||||
|
||||
int write_contract_hp_inputs(execution_context &ctx);
|
||||
|
||||
@@ -160,9 +153,7 @@ namespace sc
|
||||
|
||||
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated);
|
||||
|
||||
void cleanup_fdmap(contract_fdmap_t &fdmap);
|
||||
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
|
||||
|
||||
int create_iosockets(std::vector<int> &fds, const int socket_type);
|
||||
|
||||
@@ -170,7 +161,7 @@ namespace sc
|
||||
|
||||
int write_iosocket_stream(std::vector<int> &fds, std::list<std::string> &inputs);
|
||||
|
||||
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output, const bool contract_terminated);
|
||||
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output);
|
||||
|
||||
void close_unused_fds(execution_context &ctx, const bool is_hp);
|
||||
|
||||
@@ -178,11 +169,9 @@ namespace sc
|
||||
|
||||
void cleanup_vectorfds(std::vector<int> &fds);
|
||||
|
||||
void clear_args(contract_execution_args &args);
|
||||
|
||||
void stop(execution_context &ctx);
|
||||
|
||||
void handle_control_msgs(contract_execution_args &args, std::string &output);
|
||||
void handle_control_msgs(execution_context &ctx, std::string &msg);
|
||||
|
||||
} // namespace sc
|
||||
|
||||
|
||||
Reference in New Issue
Block a user