NPL message refactor. (#132)

* Implemented feeding and broadcasting npl messages from the contract execution in real-time.
* Replaced npl pipe with domain sockets.
* Refactored npl read and write in nodejs echo contract
This commit is contained in:
Chalith Desaman
2020-10-14 15:18:00 +05:30
committed by GitHub
parent cb4d0c4f59
commit 5f40aebf08
9 changed files with 298 additions and 187 deletions

View File

@@ -1,11 +1,16 @@
#include "pchheader.hpp"
#include "conf.hpp"
#include "consensus.hpp"
#include "hplog.hpp"
#include "ledger.hpp"
#include "sc.hpp"
#include "hpfs/hpfs.hpp"
#include "msg/fbuf/p2pmsg_helpers.hpp"
namespace sc
{
const int MAX_NPL_BUF_SIZE = 128 * 1024;
/**
* Executes the contract process and passes the specified context arguments.
* @return 0 on successful process creation. -1 on failure or contract process is already running.
@@ -21,7 +26,7 @@ namespace sc
if (!ctx.args.readonly)
{
create_iopipes(ctx.nplfds, !ctx.args.npl_messages.empty());
create_iosockets(ctx.nplfds);
create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty());
}
@@ -39,7 +44,7 @@ namespace sc
close_unused_fds(ctx, true);
// Start the contract output collection thread.
ctx.output_fetcher_thread = std::thread(fetch_outputs, std::ref(ctx));
ctx.contract_io_thread = std::thread(handle_contract_io, std::ref(ctx));
// Write the inputs into the contract process.
if (feed_inputs(ctx) == -1)
@@ -57,11 +62,11 @@ namespace sc
// There could be 2 reasons for the contract to end; the contract voluntary finished execution or
// it was killed due to Hot Pocket shutting down.
// Wait for the output collection thread to gracefully stop if this is voluntary contract termination.
// Wait for the i/o thread to gracefully stop if this is voluntary contract termination.
// 'ctx.should_stop' indicates Hot Pocket is shutting down. If that's the case ouput collection thread
// is joined by the deinit logic.
if (!ctx.should_stop && ctx.output_fetcher_thread.joinable())
ctx.output_fetcher_thread.join();
if (!ctx.should_stop && ctx.contract_io_thread.joinable())
ctx.contract_io_thread.join();
if (presult != 0)
{
@@ -208,7 +213,7 @@ namespace sc
{
os << ",\"lcl\":\"" << ctx.args.lcl
<< "\",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE]
<< "],\"nplfd\":[" << ctx.nplfds[FDTYPE::SCREAD] << "," << ctx.nplfds[FDTYPE::SCWRITE] << "]";
<< "],\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE];
}
os << ",\"usrfd\":{";
@@ -267,10 +272,6 @@ namespace sc
if (!ctx.args.readonly && write_contract_hp_inputs(ctx) == -1)
return -1;
// Write any NPL messages to contract.
if (!ctx.args.readonly && write_npl_messages(ctx) == -1)
return -1;
// Write any verified (consensus-reached) user inputs to user pipes.
if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1)
{
@@ -281,7 +282,12 @@ namespace sc
return 0;
}
int fetch_outputs(execution_context &ctx)
/**
* Collect contract outputs and feed npl messages while contract is running.
* @param ctx Contract execution context.
* @return Returns -1 if the operation fails otherwise 0.
*/
int handle_contract_io(execution_context &ctx)
{
util::mask_signal();
@@ -290,8 +296,16 @@ namespace sc
if (ctx.should_stop)
break;
const int hpsc_npl_res = ctx.args.readonly ? 0 : read_contract_hp_npl_outputs(ctx);
if (hpsc_npl_res == -1)
const int hpsc_res = ctx.args.readonly ? 0 : read_contract_hp_outputs(ctx);
if (hpsc_res == -1)
return -1;
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
if (npl_read_res == -1)
return -1;
const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx);
if (npl_write_res == -1)
return -1;
const int user_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs);
@@ -302,7 +316,7 @@ namespace sc
}
// If no bytes were read after contract finished execution, exit the read loop.
if (hpsc_npl_res == 0 && user_res == 0 && ctx.contract_pid == 0)
if (hpsc_res == 0 && npl_read_res == 0 && user_res == 0 && ctx.contract_pid == 0)
break;
util::sleep(20);
@@ -328,64 +342,41 @@ namespace sc
/**
* Write npl messages to the contract.
* @param ctx Contract execution context.
* @return Returns -1 when fails otherwise 0.
*/
int write_npl_messages(execution_context &ctx)
{
/**
* npl inputs are feed into the contract in a binary protocol. It follows the following pattern
* |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (32 bytes)**|**Npl message data**|
* Length of the message is calculated without including public key length
* npl inputs are feed into the contract as sequence packets. It first sends the pubkey and then
* the data.
*/
const int writefd = ctx.nplfds[FDTYPE::HPWRITE];
const int writefd = ctx.nplfds[SOCKETFDTYPE::HPREADWRITE];
if (writefd == -1)
return 0;
bool write_error = false;
if (!ctx.args.npl_messages.empty())
// Dequeue the next npl message from the queue.
// Check the lcl against the latest lcl.
p2p::npl_message npl_msg;
if (ctx.args.npl_messages.try_dequeue(npl_msg))
{
const size_t total_memsegs = ctx.args.npl_messages.size() * 3;
iovec memsegs[total_memsegs];
size_t i = 0;
for (const auto &npl_msg : ctx.args.npl_messages)
if (npl_msg.lcl == ledger::ctx.get_lcl())
{
const uint8_t pre_header_index = i * 3;
const uint8_t pubkey_index = pre_header_index + 1;
const uint8_t msg_index = pre_header_index + 2;
const uint16_t msg_len = npl_msg.data.size();
// Header is |version(1byte)|reserve(1byte)|msg length(2bytes big endian)|
uint8_t header[4];
header[0] = util::MIN_NPL_INPUT_VERSION;
// Store msg length in big endian.
header[2] = msg_len << 8;
header[3] = msg_len;
memsegs[pre_header_index].iov_base = header;
memsegs[pre_header_index].iov_len = sizeof(header);
// Pubkey without the key type prefix.
memsegs[pubkey_index].iov_base = reinterpret_cast<void *>(const_cast<char *>(npl_msg.pubkey.data() + 1));
memsegs[pubkey_index].iov_len = npl_msg.pubkey.size() - 1;
memsegs[msg_index].iov_base = reinterpret_cast<void *>(const_cast<char *>(npl_msg.data.data()));
memsegs[msg_index].iov_len = msg_len;
i++;
// Writing the public key to the contract's fd.
if (write(writefd, npl_msg.pubkey.data(), npl_msg.pubkey.size()) == -1)
return -1;
// Writing the message to the contract's fd.
if (write(writefd, npl_msg.data.data(), npl_msg.data.size()) == -1)
return -1;
}
else
{
LOG_DEBUG << "NPL message dropped due to lcl mismatch.";
}
if (writev(writefd, memsegs, total_memsegs) == -1)
write_error = true;
ctx.args.npl_messages.clear();
}
// Close the writefd since we no longer need it.
close(writefd);
ctx.nplfds[FDTYPE::HPWRITE] = -1;
return write_error ? -1 : 0;
return 0;
}
/**
@@ -394,7 +385,7 @@ namespace sc
*
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
*/
int read_contract_hp_npl_outputs(execution_context &ctx)
int read_contract_hp_outputs(execution_context &ctx)
{
const int hpsc_res = read_iopipe(ctx.hpscfds, ctx.args.hpscbufs.output);
if (hpsc_res == -1)
@@ -403,19 +394,50 @@ namespace sc
return -1;
}
const int npl_res = read_iopipe(ctx.nplfds, ctx.args.npl_output);
return (hpsc_res == 0) ? 0 : 1;
}
/**
* Read all NPL output messages produced by the contract process and broadcast them.
* @param ctx contract execution context.
* @return 0 if no bytes were read. 1 if bytes were read. -1 on failure.
*/
int read_contract_npl_outputs(execution_context &ctx)
{
std::string output;
const int npl_res = read_iosocket(ctx.nplfds, output);
if (npl_res == -1)
{
LOG_ERROR << "Error reading NPL output from the contract.";
return -1;
}
else if (npl_res > 0)
{
// Broadcast npl messages once contract npl output is collected.
broadcast_npl_output(output);
}
return (hpsc_res == 0 && npl_res == 0) ? 0 : 1;
return (npl_res == 0) ? 0 : 1;
}
/**
* Broadcast npl messages to peers.
* @param output Npl message to be broadcasted.
*/
void broadcast_npl_output(std::string_view output)
{
if (!output.empty())
{
flatbuffers::FlatBufferBuilder fbuf(1024);
msg::fbuf::p2pmsg::create_msg_from_npl_output(fbuf, output, ledger::ctx.get_lcl());
p2p::broadcast_message(fbuf, true);
}
}
/**
* Common helper function to write json output of fdmap to given ostream.
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds, ctx.nplfds)
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds)
* @param os An output stream.
*/
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os)
@@ -553,6 +575,28 @@ namespace sc
return 0;
}
/**
* Common function to create a socket (Hp->SC, SC->HP).
* @param fds Vector to populate fd list.
* @return Returns -1 if socket creation fails otherwise 0.
*/
int create_iosockets(std::vector<int> &fds)
{
int socket[2] = {-1, -1};
// Create a sequence packet socket.
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socket) == -1)
{
return -1;
}
// If socket got created, assign them to the fd vector.
fds.clear();
fds.push_back(socket[0]); //SCREADWRITE
fds.push_back(socket[1]); //HPREADWRITE
return 0;
}
/**
* Common function to write the given input buffer into the write fd from the HP side.
* @param fds Vector of fd list.
@@ -636,12 +680,46 @@ namespace sc
return -1;
}
/**
* Common function to read buffered output from the socket and populate the output.
* @param fds Vector representing the socket fd list.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
*/
int read_iosocket(std::vector<int> &fds, std::string &output)
{
// Read any available data that have been written by the contract process
// from the output socket and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
const int readfd = fds[SOCKETFDTYPE::HPREADWRITE];
if (readfd == -1)
return 0;
// Available bytes returns the total number of bytes to read of multiple messages.
size_t available_bytes = 0;
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
{
if (available_bytes == 0)
return 0;
output.resize(MAX_NPL_BUF_SIZE);
const int res = read(readfd, output.data(), MAX_NPL_BUF_SIZE);
output.resize(res);
return res;
}
return -1;
}
void close_unused_fds(execution_context &ctx, const bool is_hp)
{
if (!ctx.args.readonly)
{
close_unused_vectorfds(is_hp, ctx.hpscfds);
close_unused_vectorfds(is_hp, ctx.nplfds);
close_unused_socket_vectorfds(is_hp, ctx.nplfds);
}
// Loop through user fds.
@@ -680,6 +758,37 @@ namespace sc
}
}
/**
* Common function for closing unused fds based on which process this gets called from.
* This also marks active fds with O_CLOEXEC for close-on-exec behaviour.
* @param is_hp Specify 'true' when calling from HP process. 'false' from SC process.
* @param fds Vector of fds to close.
*/
void close_unused_socket_vectorfds(const bool is_hp, std::vector<int> &fds)
{
for (int fd_type = 0; fd_type <= 1; fd_type++)
{
const int fd = fds[fd_type];
if (fd != -1)
{
if ((is_hp && fd_type == SOCKETFDTYPE::SCREADWRITE) ||
(!is_hp && fd_type == SOCKETFDTYPE::HPREADWRITE))
{
close(fd);
fds[fd_type] = -1;
}
else if (is_hp && (fd_type == SOCKETFDTYPE::HPREADWRITE))
{
// The fd must be kept open in HP process. But we must
// mark it to close on exec in a potential forked process.
int flags = fcntl(fd, F_GETFD, NULL);
flags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, flags);
}
}
}
}
/**
* Closes all fds in a vector fd set.
*/
@@ -702,8 +811,10 @@ namespace sc
args.userbufs.clear();
args.hpscbufs.inputs.clear();
args.hpscbufs.output.clear();
args.npl_messages.clear();
args.npl_output.clear();
// Empty npl message queue.
while (args.npl_messages.pop())
{
}
args.time = 0;
args.lcl.clear();
args.post_execution_state_hash = hpfs::h32_empty;
@@ -719,8 +830,8 @@ namespace sc
if (ctx.contract_pid > 0)
util::kill_process(ctx.contract_pid, true);
if (ctx.output_fetcher_thread.joinable())
ctx.output_fetcher_thread.join();
if (ctx.contract_io_thread.joinable())
ctx.contract_io_thread.join();
}
} // namespace sc