Introduced contract control channel with 'terminate' message (#147)

This commit is contained in:
priyadharsun
2020-11-10 14:09:24 +05:30
committed by GitHub
parent 49e30961bd
commit 8f00c5e7d4
5 changed files with 112 additions and 144 deletions

View File

@@ -64,9 +64,6 @@ function HotPocketContract() {
this.terminate = () => {
this.control.sendOutput("Terminated")
// We are still using process.kill(0) temporarily to stop contract hanging.
// This will be removed after the control message is implemented.
process.kill(0);
}
if (!Object.keys(hpargs.usrfd).length) {

View File

@@ -355,7 +355,7 @@ namespace consensus
}
/**
* Equeue npl messages to the npl messages queue.
* Enqueue npl messages to the npl messages queue.
* @param npl_msg Constructed npl message.
* @return Returns true if enqueue is success otherwise false.
*/
@@ -364,6 +364,16 @@ namespace consensus
return ctx.contract_ctx.args.npl_messages.try_enqueue(npl_msg);
}
/**
* Enqueue conrol messages to the control messages queue.
* @param control_msg Constructed control message.
* @return Returns true if enqueue is success otherwise false.
*/
bool push_control_message(const std::string &control_msg)
{
return ctx.contract_ctx.args.control_messages.try_enqueue(control_msg);
}
/**
* Verifies the user signatures and populate non-expired user inputs from collected
* non-unl proposals (if any) into consensus candidate data.

View File

@@ -135,6 +135,8 @@ namespace consensus
int get_initial_state_hash(hpfs::h32 &hash);
bool push_control_message(const std::string &control_msg);
} // namespace consensus
#endif

View File

@@ -67,11 +67,13 @@ namespace sc
if (!ctx.args.readonly)
{
// create sequential packet sockets for npl and hp messages.
// Create sequential packet sockets for npl messages.
create_iosockets(ctx.nplfds, SOCK_SEQPACKET);
create_iosockets(ctx.hpscfds, SOCK_SEQPACKET);
}
// Create sequential packet sockets for hp messages.
create_iosockets(ctx.hpscfds, SOCK_SEQPACKET);
int ret = 0;
LOG_DEBUG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : "");
@@ -93,12 +95,14 @@ namespace sc
{
util::kill_process(pid, true);
ctx.contract_pid = 0;
ctx.args.contract_terminated = true;
goto failure;
}
// Wait for child process (contract process) to complete execution.
const int presult = await_process_execution(ctx.contract_pid);
ctx.contract_pid = 0;
ctx.args.contract_terminated = true;
LOG_DEBUG << "Contract process ended." << (ctx.args.readonly ? " (rdonly)" : "");
// There could be 2 reasons for the contract to end; the contract voluntary finished execution or
@@ -167,13 +171,8 @@ namespace sc
if (stop_hpfs_session(ctx) == -1)
ret = -1;
// Cleaning the user fdmap after executing the contract.
cleanup_fdmap(ctx.userfds);
if (!ctx.args.readonly)
{
cleanup_vectorfds(ctx.hpscfds);
cleanup_vectorfds(ctx.nplfds);
}
return ret;
}
@@ -270,10 +269,10 @@ namespace sc
if (!ctx.args.readonly)
{
os << ",\"lcl\":\"" << ctx.args.lcl
<< "\",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE]
<< ",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE];
<< "\",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE];
}
os << ",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE];
os << ",\"usrfd\":{";
fdmap_json_to_stream(ctx.userfds, os);
@@ -327,13 +326,6 @@ namespace sc
int feed_inputs(execution_context &ctx)
{
// Write any input messages to hp->sc socket.
if (!ctx.args.readonly && write_contract_hp_inputs(ctx) == -1)
{
LOG_ERROR << "Error when writing contract hp inputs.";
return -1;
}
// Write any user inputs to user sockets.
if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1)
{
@@ -358,30 +350,26 @@ namespace sc
if (ctx.should_stop)
break;
const int hpsc_res = ctx.args.readonly ? 0 : read_contract_hp_outputs(ctx);
if (hpsc_res == -1)
return -1;
const int hpsc_res = read_contract_hp_outputs(ctx);
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
if (npl_read_res == -1)
return -1;
const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs, ctx.args.contract_terminated);
const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx);
if (npl_write_res == -1)
return -1;
const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs);
if (user_res == -1)
if (!ctx.args.contract_terminated)
{
LOG_ERROR << "Error reading user outputs from the contract.";
return -1;
if (!ctx.args.readonly)
write_npl_messages(ctx);
write_contract_hp_inputs(ctx);
}
// If no bytes were read after contract finished execution, exit the read loop.
if (hpsc_res == 0 && npl_read_res == 0 && user_res == 0 && ctx.contract_pid == 0)
if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0 && ctx.args.contract_terminated)
{
break;
util::sleep(20);
}
else if (hpsc_res <= 0 && npl_read_res <= 0 && user_res <= 0)
{
util::sleep(20);
}
}
LOG_DEBUG << "Contract outputs collected.";
@@ -393,11 +381,16 @@ namespace sc
*/
int write_contract_hp_inputs(execution_context &ctx)
{
// if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1)
// {
// LOG_ERROR << "Error writing HP inputs to SC";
// return -1;
// }
std::string control_msg;
if (ctx.args.control_messages.try_dequeue(control_msg))
{
if (write_iosocket_seq_packet(ctx.hpscfds, control_msg) == -1)
{
LOG_ERROR << "Error writing HP inputs to SC";
return -1;
}
}
return 0;
}
@@ -423,7 +416,7 @@ namespace sc
p2p::npl_message npl_msg;
if (ctx.args.npl_messages.try_dequeue(npl_msg))
{
if (npl_msg.lcl == ledger::ctx.get_lcl())
if (npl_msg.lcl == ctx.args.lcl)
{
// Writing the public key to the contract's fd.
if (write(writefd, npl_msg.pubkey.data(), npl_msg.pubkey.size()) == -1)
@@ -450,7 +443,7 @@ namespace sc
int read_contract_hp_outputs(execution_context &ctx)
{
std::string output;
const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, output);
const int hpsc_res = read_iosocket(false, ctx.hpscfds, output, ctx.args.contract_terminated);
if (hpsc_res == -1)
{
LOG_ERROR << "Error reading HP output from the contract.";
@@ -459,6 +452,8 @@ namespace sc
else if (hpsc_res > 0)
{
// ctx.args.hpscbufs.outputs.push_back(output);
handle_control_msgs(ctx.args, output);
}
return (hpsc_res == 0) ? 0 : 1;
@@ -472,7 +467,7 @@ namespace sc
int read_contract_npl_outputs(execution_context &ctx)
{
std::string output;
const int npl_res = read_iosocket_seq_packet(ctx.nplfds, output);
const int npl_res = read_iosocket(false, ctx.nplfds, output, ctx.args.contract_terminated);
if (npl_res == -1)
{
@@ -575,9 +570,10 @@ namespace sc
*
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
* @param contract_terminated Indicates whether the contract termination signal recieved.
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
*/
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated)
{
bool bytes_read = false;
for (auto &[pubkey, bufs] : bufmap)
@@ -587,7 +583,7 @@ namespace sc
std::vector<int> &fds = fdmap[pubkey];
// This returns the total bytes read from the socket.
const int total_bytes_read = read_iosocket_stream(fds, output);
const int total_bytes_read = read_iosocket(true, fds, output, contract_terminated);
if (total_bytes_read > 0)
{
@@ -640,11 +636,6 @@ namespace sc
bytes_read = true;
}
if (total_bytes_read == -1)
{
return -1;
}
}
return bytes_read ? 1 : 0;
@@ -656,9 +647,6 @@ namespace sc
*/
void cleanup_fdmap(contract_fdmap_t &fdmap)
{
for (auto &[pubkey, fds] : fdmap)
cleanup_vectorfds(fds);
fdmap.clear();
}
@@ -690,7 +678,6 @@ namespace sc
* Common function to write the given input buffer into the write fd from the HP side socket.
* @param fds Vector of fd list.
* @param inputs Buffer to write into the HP write fd.
* @param close_if_empty Close the socket after writing if this is true.
*/
int write_iosocket_stream(std::vector<int> &fds, std::list<std::string> &inputs)
{
@@ -740,52 +727,43 @@ namespace sc
}
/**
* Common function to write the given input buffer into the write fd from the HP side socket.
* Common function to write the given input into the write fd from the HP side socket.
* @param fds Vector of fd list.
* @param inputs Buffer to write into the HP write fd.
* @param close_if_empty Close the socket after writing if this is true.
* @param input Input to write into the HP write fd.
*/
int write_iosocket_seq_packet(std::vector<int> &fds, std::list<std::string> &inputs, const bool close_if_empty)
int write_iosocket_seq_packet(std::vector<int> &fds, std::string_view input)
{
// Write the inputs (if any) into the contract.
const int writefd = fds[SOCKETFDTYPE::HPREADWRITE];
if (writefd == -1)
return 0;
bool write_error = false;
if (!inputs.empty())
if (write(writefd, input.data(), input.length()) == -1)
{
for (std::string &input : inputs)
{
if (write(writefd, input.data(), input.length()) == -1)
write_error = true;
}
}
else if (close_if_empty)
{
close(writefd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
}
if (write_error)
LOG_ERROR << errno << ": Error writing to sequece packet socket.";
//cleanup_vectorfds(fds);
return -1;
}
return write_error ? -1 : 0;
return 0;
}
/**
* Common function to read buffered output from the sequence packet socket and populate the output.
* Common function to read buffered output from the socket and populate the output.
* @param is_stream_socket Indicates whether socket is steam socket or not
* @param fds Vector representing the socket fd list.
* @param output The buffer to place the read output.
* @param contract_terminated Indicates whether the contract termination signal recieved.
* @return -1 on error. Otherwise no. of bytes read.
*/
int read_iosocket_seq_packet(std::vector<int> &fds, std::string &output)
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output, const bool contract_terminated)
{
// Read any available data that have been written by the contract process
// from the output socket and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
const int readfd = fds[SOCKETFDTYPE::HPREADWRITE];
int res = 0;
if (readfd == -1)
return 0;
@@ -795,84 +773,49 @@ namespace sc
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
{
if (available_bytes == 0)
return 0;
output.resize(MIN(MAX_SEQ_PACKET_SIZE, available_bytes));
const int res = read(readfd, output.data(), MAX_SEQ_PACKET_SIZE);
output.resize(res);
if (res >= 0)
{
if (res == 0) // EOF
res = 0;
}
else
{
const size_t bytes_to_read = is_stream_socket ? available_bytes : MIN(MAX_SEQ_PACKET_SIZE, available_bytes);
output.resize(bytes_to_read);
const int read_res = read(readfd, output.data(), bytes_to_read);
if (read_res >= 0)
{
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
res = read_res;
if (is_stream_socket)
output.resize(read_res);
}
else
{
res = -1;
}
return res;
}
}
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
LOG_ERROR << errno << ": Error reading sequence packet socket.";
return -1;
}
/**
* Common function to read buffered output from the stream socket and populate the output list.
* @param fds Vector representing the sockets fd list.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
*/
int read_iosocket_stream(std::vector<int> &fds, std::string &output)
{
// Read any available data that have been written by the contract process
// from the output socket and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
const int readfd = fds[SOCKETFDTYPE::HPREADWRITE];
if (readfd == -1)
return 0;
bool read_error = false;
size_t available_bytes = 0;
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
else
{
if (available_bytes == 0)
{
return 0;
}
output.resize(available_bytes);
const int res = read(readfd, output.data(), available_bytes);
if (res >= 0)
{
if (res == 0) // EOF
{
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
}
return res;
}
res = -1;
}
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
LOG_ERROR << errno << ": Error reading stream socket.";
if (res == -1 || (res == 0 && contract_terminated))
{
cleanup_vectorfds(fds);
}
return -1;
return res;
}
void close_unused_fds(execution_context &ctx, const bool is_hp)
{
if (!ctx.args.readonly)
{
close_unused_socket_vectorfds(is_hp, ctx.hpscfds);
close_unused_socket_vectorfds(is_hp, ctx.nplfds);
}
close_unused_socket_vectorfds(is_hp, ctx.hpscfds);
// Loop through user fds.
for (auto &[pubkey, fds] : ctx.userfds)
close_unused_socket_vectorfds(is_hp, fds);
@@ -938,6 +881,7 @@ namespace sc
args.time = 0;
args.lcl.clear();
args.post_execution_state_hash = hpfs::h32_empty;
args.contract_terminated = false;
}
/**
@@ -954,4 +898,13 @@ namespace sc
ctx.contract_io_thread.join();
}
void handle_control_msgs(contract_execution_args &args, std::string &output)
{
if (output == "Terminated")
{
args.contract_terminated = true;
}
output.clear();
}
} // namespace sc

View File

@@ -71,6 +71,9 @@ namespace sc
// NPL messages to be passed into contract.
moodycamel::ReaderWriterQueue<p2p::npl_message> npl_messages;
// Contol messages to be passed into contract.
moodycamel::ReaderWriterQueue<std::string> control_messages;
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
// Input buffers for HP->SC messages, Output buffers for SC->HP messages.
@@ -84,6 +87,9 @@ namespace sc
// State hash after execution will be copied to this (not applicable to read only mode).
hpfs::h32 post_execution_state_hash = hpfs::h32_empty;
// Indicates that the contract has sent termination control message or it has exited.
bool contract_terminated = false;
};
/**
@@ -154,19 +160,17 @@ namespace sc
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap, const bool contract_terminated);
void cleanup_fdmap(contract_fdmap_t &fdmap);
int create_iosockets(std::vector<int> &fds, const int socket_type);
int write_iosocket_seq_packet(std::vector<int> &fds, std::list<std::string> &inputs, const bool close_if_empty);
int write_iosocket_seq_packet(std::vector<int> &fds, std::string_view input);
int write_iosocket_stream(std::vector<int> &fds, std::list<std::string> &inputs);
int read_iosocket_seq_packet(std::vector<int> &fds, std::string &output);
int read_iosocket_stream(std::vector<int> &fds, std::string &output);
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output, const bool contract_terminated);
void close_unused_fds(execution_context &ctx, const bool is_hp);
@@ -178,6 +182,8 @@ namespace sc
void stop(execution_context &ctx);
void handle_control_msgs(contract_execution_args &args, std::string &output);
} // namespace sc
#endif