Poll-based contract output reading. (#172)

This commit is contained in:
Ravin Perera
2020-11-27 15:57:36 +05:30
committed by GitHub
parent 80392cc995
commit 79b55258de
4 changed files with 161 additions and 170 deletions

View File

@@ -478,7 +478,7 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj
}
}
}
else if (strcmp(k->string, "hpfd") == 0)
else if (strcmp(k->string, "controlfd") == 0)
{
__HP_ASSIGN_INT(gctx.control_fd, elem);
}

View File

@@ -22,7 +22,7 @@ class HotPocketContract {
const argsJson = fs.readFileSync(0, 'utf8');
const hpargs = JSON.parse(argsJson);
this.#controlChannel = new ControlChannel(hpargs.hpfd);
this.#controlChannel = new ControlChannel(hpargs.controlfd);
this.#executeContract(hpargs, contractFunc);
}

View File

@@ -12,7 +12,7 @@
namespace sc
{
const uint32_t MAX_SEQ_PACKET_SIZE = 128 * 1024;
const uint32_t READ_BUFFER_SIZE = 128 * 1024; // This has to be minimum 128KB to support sequence packets.
bool init_success = false;
// We maintain two hpfs global processes for merging and rw sessions.
@@ -65,17 +65,19 @@ namespace sc
if (start_hpfs_session(ctx) == -1)
return -1;
create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); // User output socket.
create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); // Control socket.
if (!ctx.args.readonly)
create_iosockets(ctx.nplfds, SOCK_SEQPACKET); // NPL socket.
// Clone the user inputs fd to be passed on to the contract.
const int user_inputs_fd = dup(ctx.args.user_input_store.fd);
int ret = 0;
// Create the IO sockets for users, control channel and npl.
// (Note: User socket will only be used for contract output only. For feeding user inputs we are using a memfd.)
if (create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs) == -1 ||
create_iosockets(ctx.controlfds, SOCK_SEQPACKET) == -1 ||
(!ctx.args.readonly && create_iosockets(ctx.nplfds, SOCK_SEQPACKET) == -1))
{
cleanup_fds(ctx);
stop_hpfs_session(ctx);
return -1;
}
LOG_DEBUG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : "");
int ret = 0;
const pid_t pid = fork();
if (pid > 0)
@@ -85,7 +87,6 @@ namespace sc
// Close all fds unused by HP process.
close_unused_fds(ctx, true);
close(user_inputs_fd);
// Start the contract monitor thread.
ctx.contract_monitor_thread = std::thread(contract_monitor_loop, std::ref(ctx));
@@ -104,10 +105,11 @@ namespace sc
// Close all fds unused by SC process.
close_unused_fds(ctx, false);
// Reset the seek position for the contract's copy of user inputs fd.
lseek(user_inputs_fd, 0, SEEK_SET);
// Clone the user inputs fd to be passed on to the contract.
const int user_inputs_fd = dup(ctx.args.user_input_store.fd);
lseek(user_inputs_fd, 0, SEEK_SET); // Reset seek position.
// Write the contract input message from HotPocket to the stdin (0) of the contract process.
// Write the contract execution args from HotPocket to the stdin (0) of the contract process.
write_contract_args(ctx, user_inputs_fd);
const bool using_appbill = !ctx.args.readonly && !conf::cfg.appbill.empty();
@@ -137,14 +139,10 @@ namespace sc
else
{
LOG_ERROR << errno << ": fork() failed when starting contract process." << (ctx.args.readonly ? " (rdonly)" : "");
goto failure;
ret = -1;
}
goto success;
failure:
ret = -1;
success:
cleanup_fds(ctx);
if (stop_hpfs_session(ctx) == -1)
ret = -1;
@@ -245,7 +243,7 @@ namespace sc
* "ts": <this node's timestamp (unix milliseconds)>,
* "readonly": <true|false>,
* "lcl": "<this node's last closed ledger seq no. and hash in hex>", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb)
* "hpfd": fd,
* "controlfd": fd,
* "nplfd":fd,
* "userinfd":fd, // User inputs fd.
* "users":{ "<pkhex>":[outfd, [msg1_off, msg1_len], ...], ... },
@@ -267,10 +265,10 @@ namespace sc
if (!ctx.args.readonly)
{
os << ",\"lcl\":\"" << ctx.args.lcl
<< "\",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE];
<< "\",\"nplfd\":" << ctx.nplfds.scfd;
}
os << ",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE];
os << ",\"controlfd\":" << ctx.controlfds.scfd;
os << ",\"userinfd\":" << user_inputs_fd
<< ",\"users\":{";
@@ -315,18 +313,43 @@ namespace sc
{
util::mask_signal();
// Prepare output poll fd list.
// User out fds + control fd + NPL fd (NPL fd not available in readonly mode)
const size_t out_fd_count = ctx.userfds.size() + (ctx.args.readonly ? 1 : 2);
const size_t control_fd_idx = ctx.userfds.size();
const size_t npl_fd_idx = control_fd_idx + 1;
struct pollfd out_fds[out_fd_count];
auto user_itr = ctx.userfds.begin();
for (int i = 0; i < out_fd_count; i++)
{
const int fd = (user_itr != ctx.userfds.end()) ? (user_itr++)->second.hpfd
: (i == control_fd_idx ? ctx.controlfds.hpfd : ctx.nplfds.hpfd);
out_fds[i] = {fd, POLLIN, 0};
}
while (!ctx.is_shutting_down)
{
// Reset the revents because we are reusing same pollfd list.
for (int i = 0; i < out_fd_count; i++)
out_fds[i].revents = 0;
if (poll(out_fds, out_fd_count, 20) == -1)
{
LOG_ERROR << errno << ": Poll error in contract outputs.";
break;
}
// Atempt to read messages from contract (regardless of contract terminated or not).
const int hpsc_read_res = read_control_outputs(ctx);
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs);
const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]);
const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, out_fds[npl_fd_idx]);
const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, out_fds, ctx.args.userbufs);
if (ctx.termination_signaled || ctx.contract_pid == 0)
{
// If no bytes were read after contract finished execution, exit the loop.
// Otherwise keep running the loop becaue there might be further messages to read.
if ((hpsc_read_res + npl_read_res + user_read_res) == 0)
if ((control_read_res + npl_read_res + user_read_res) == 0)
break;
}
else
@@ -340,11 +363,6 @@ namespace sc
const int control_write_res = write_control_inputs(ctx);
if (control_write_res == -1)
break;
// If no operation was performed during this iteration, wait for a small delay until the next iteration.
// This means there were no queued messages from either side.
if ((hpsc_read_res + npl_read_res + user_read_res + control_write_res + control_write_res) == 0)
util::sleep(20);
}
// Check if contract process has exited on its own during the loop.
@@ -353,11 +371,7 @@ namespace sc
}
// Close all fds.
cleanup_vectorfds(ctx.hpscfds);
cleanup_vectorfds(ctx.nplfds);
for (auto &[pubkey, fds] : ctx.userfds)
cleanup_vectorfds(fds);
ctx.userfds.clear();
cleanup_fds(ctx);
// Purge any inputs we passed to the contract.
for (const auto &[pubkey, bufs] : ctx.args.userbufs)
@@ -390,7 +404,7 @@ namespace sc
if (ctx.args.control_messages.try_dequeue(control_msg))
{
if (write_iosocket_seq_packet(ctx.hpscfds, control_msg) == -1)
if (write_iosocket_seq_packet(ctx.controlfds, control_msg) == -1)
{
LOG_ERROR << "Error writing HP inputs to SC";
return -1;
@@ -411,7 +425,7 @@ namespace sc
* npl inputs are feed into the contract as sequence packets. It first sends the pubkey and then
* the data.
*/
const int writefd = ctx.nplfds[SOCKETFDTYPE::HPREADWRITE];
const int writefd = ctx.nplfds.hpfd;
if (writefd == -1)
return 0;
@@ -460,10 +474,10 @@ namespace sc
*
* @return 0 if no bytes were read. 1 if bytes were read..
*/
int read_control_outputs(execution_context &ctx)
int read_control_outputs(execution_context &ctx, const pollfd pfd)
{
std::string output;
const int res = read_iosocket(false, ctx.hpscfds, output);
const int res = read_iosocket(false, pfd, output);
if (res == -1)
{
LOG_ERROR << "Error reading control message from the contract.";
@@ -481,10 +495,10 @@ namespace sc
* @param ctx contract execution context.
* @return 0 if no bytes were read. 1 if bytes were read.
*/
int read_contract_npl_outputs(execution_context &ctx)
int read_npl_outputs(execution_context &ctx, const pollfd pfd)
{
std::string output;
const int res = read_iosocket(false, ctx.nplfds, output);
const int res = read_iosocket(false, pfd, output);
if (res == -1)
{
@@ -532,7 +546,7 @@ namespace sc
// Write hex pubkey as key and output fd as first element of array.
os << "\"" << pubkeyhex << "\":["
<< itr->second[SOCKETFDTYPE::SCREADWRITE];
<< itr->second.scfd;
// Write input offsets into the same array.
for (auto inp_itr = user_inputs.begin(); inp_itr != user_inputs.end(); inp_itr++)
@@ -544,7 +558,7 @@ namespace sc
/**
* Creates io sockets for all pubkeys specified in bufmap.
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
* @param fdmap A map which has public key and fd pair for that public key.
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
* @return 0 on success. -1 on failure.
*/
@@ -552,7 +566,7 @@ namespace sc
{
for (auto &[pubkey, buflist] : bufmap)
{
std::vector<int> fds = std::vector<int>();
fd_pair fds = {};
if (create_iosockets(fds, SOCK_STREAM) == -1)
return -1;
@@ -566,21 +580,23 @@ namespace sc
* Common function to read all outputs produced by the contract process and store them in
* output buffers for later processing.
*
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
* @param fdmap A map which has public key and fd pair for that public key.
* @param pfds Poll fd set for users (must be in same order as user fdmap).
* @param bufmap A map which has a public key and input/output buffer pair for that public key.
* @return 0 if no bytes were read. 1 if bytes were read.
*/
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, const pollfd *pfds, contract_bufmap_t &bufmap)
{
bool bytes_read = false;
int i = 0;
for (auto &[pubkey, bufs] : bufmap)
{
// Get fds for the pubkey.
std::string output;
std::vector<int> &fds = fdmap[pubkey];
fd_pair &fds = fdmap[pubkey];
// This returns the total bytes read from the socket.
const int total_bytes_read = read_iosocket(true, fds, output);
const int total_bytes_read = read_iosocket(true, pfds[i++], output);
if (total_bytes_read == -1)
{
@@ -644,11 +660,11 @@ namespace sc
/**
* Common function to create a socket (Hp->SC, SC->HP).
* @param fds Vector to populate fd list.
* @param fds fd pair to populate.
* @param socket_type Type of the socket. (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET)
* @return Returns -1 if socket creation fails otherwise 0.
*/
int create_iosockets(std::vector<int> &fds, const int socket_type)
int create_iosockets(fd_pair &fds, const int socket_type)
{
int socket[2] = {-1, -1};
// Create the socket of given type.
@@ -658,23 +674,22 @@ namespace sc
return -1;
}
// If socket got created, assign them to the fd vector.
fds.clear();
fds.push_back(socket[0]); //SCREADWRITE
fds.push_back(socket[1]); //HPREADWRITE
// If socket got created, assign them to the fd pair.
fds.scfd = socket[0];
fds.hpfd = socket[1];
return 0;
}
/**
* Common function to write the given input into the write fd from the HP side socket.
* @param fds Vector of fd list.
* @param fds fd pair.
* @param input Input to write into the HP write fd.
*/
int write_iosocket_seq_packet(std::vector<int> &fds, std::string_view input)
int write_iosocket_seq_packet(fd_pair &fds, std::string_view input)
{
// Write the inputs (if any) into the contract.
const int writefd = fds[SOCKETFDTYPE::HPREADWRITE];
const int writefd = fds.hpfd;
if (writefd == -1)
return 0;
@@ -689,119 +704,98 @@ namespace sc
/**
* Common function to read buffered output from the socket and populate the output.
* @param is_stream_socket Indicates whether socket is steam socket or not
* @param fds Vector representing the socket fd list.
* @param is_stream_socket Indicates whether socket is steam socket or not.
* @param pfd The pollfd struct containing poll status.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
*/
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output)
int read_iosocket(const bool is_stream_socket, const pollfd pfd, std::string &output)
{
// Read any available data that have been written by the contract process
// from the output socket and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
const int readfd = fds[SOCKETFDTYPE::HPREADWRITE];
int res = 0;
if (readfd == -1)
return 0;
// Available bytes returns the total number of bytes to read of multiple messages.
size_t available_bytes = 0;
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
if (pfd.revents & POLLIN)
{
if (available_bytes == 0)
{
res = 0;
}
else
{
const size_t bytes_to_read = is_stream_socket ? available_bytes : MIN(MAX_SEQ_PACKET_SIZE, available_bytes);
output.resize(bytes_to_read);
const int read_res = read(readfd, output.data(), bytes_to_read);
output.resize(read_res);
output.resize(READ_BUFFER_SIZE);
const int res = read(pfd.fd, output.data(), READ_BUFFER_SIZE);
output.resize(res); // Resize back to the actual bytes read.
if (read_res >= 0)
{
res = read_res;
if (is_stream_socket)
output.resize(read_res);
}
else
{
res = -1;
LOG_ERROR << errno << ": Error reading from contract socket.";
}
}
}
else
{
res = -1;
}
if (res == -1)
LOG_ERROR << errno << ": Error reading from contract socket. stream:" << is_stream_socket;
return res;
return res;
}
return 0;
}
void close_unused_fds(execution_context &ctx, const bool is_hp)
{
if (!ctx.args.readonly)
{
close_unused_socket_vectorfds(is_hp, ctx.nplfds);
close_unused_socket_fds(is_hp, ctx.nplfds);
}
close_unused_socket_vectorfds(is_hp, ctx.hpscfds);
close_unused_socket_fds(is_hp, ctx.controlfds);
// Loop through user fds.
for (auto &[pubkey, fds] : ctx.userfds)
close_unused_socket_vectorfds(is_hp, fds);
close_unused_socket_fds(is_hp, fds);
}
/**
* Common function for closing unused fds based on which process this gets called from.
* This also marks active fds with O_CLOEXEC for close-on-exec behaviour.
* @param is_hp Specify 'true' when calling from HP process. 'false' from SC process.
* @param fds Vector of fds to close.
* @param fds fd pair to close.
*/
void close_unused_socket_vectorfds(const bool is_hp, std::vector<int> &fds)
void close_unused_socket_fds(const bool is_hp, fd_pair &fds)
{
for (int fd_type = 0; fd_type <= 1; fd_type++)
if (is_hp)
{
const int fd = fds[fd_type];
if (fd != -1)
if (fds.scfd != -1)
{
if ((is_hp && fd_type == SOCKETFDTYPE::SCREADWRITE) ||
(!is_hp && fd_type == SOCKETFDTYPE::HPREADWRITE))
{
close(fd);
fds[fd_type] = -1;
}
else if (is_hp && (fd_type == SOCKETFDTYPE::HPREADWRITE))
{
// The fd must be kept open in HP process. But we must
// mark it to close on exec in a potential forked process.
int flags = fcntl(fd, F_GETFD, NULL);
flags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, flags);
}
close(fds.scfd);
fds.scfd = -1;
}
// The hp fd must be kept open in HP process. But we must
// mark it to close on exec in a potential forked process.
if (fds.hpfd != -1)
{
int flags = fcntl(fds.hpfd, F_GETFD, NULL);
flags |= FD_CLOEXEC;
fcntl(fds.hpfd, F_SETFD, flags);
}
}
else
{
if (fds.hpfd != -1)
{
close(fds.hpfd);
fds.hpfd = -1;
}
}
}
/**
* Closes all fds in a vector fd set.
*/
void cleanup_vectorfds(std::vector<int> &fds)
void cleanup_fds(execution_context &ctx)
{
for (int i = 0; i < fds.size(); i++)
{
if (fds[i] != -1)
{
close(fds[i]);
fds[i] = -1;
}
}
cleanup_fd_pair(ctx.controlfds);
cleanup_fd_pair(ctx.nplfds);
for (auto &[pubkey, fds] : ctx.userfds)
cleanup_fd_pair(fds);
ctx.userfds.clear();
}
fds.clear();
/**
* Closes fds in a fd pair.
*/
void cleanup_fd_pair(fd_pair &fds)
{
if (fds.hpfd != -1)
close(fds.hpfd);
if (fds.scfd != -1)
close(fds.scfd);
fds.hpfd = -1;
fds.scfd = -1;
}
/**

View File

@@ -16,15 +16,10 @@ namespace sc
constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 64; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 64; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
// Enum used to differenciate socket fds maintained for SC socket.
enum SOCKETFDTYPE
struct fd_pair
{
// Used by Smart Contract to read input sent by Hot Pocket.
// Used by Smart Contract to write output back to Hot Pocket.
SCREADWRITE = 0,
// Used by Hot Pocket to write input to the smart contract.
// Used by Hot Pocket to read output from the smart contract.
HPREADWRITE = 1
int hpfd = -1;
int scfd = -1;
};
/**
@@ -35,10 +30,10 @@ namespace sc
uint32_t message_len = 0;
std::string message;
};
/**
* Represents list of inputs to the contract and the accumulated contract output for those inputs.
*/
/**
* Represents list of inputs to the contract and the accumulated contract output for those inputs.
*/
struct contract_iobufs
{
// List of inputs to be fed into the contract.
@@ -48,17 +43,17 @@ namespace sc
std::list<contract_output> outputs;
};
// Common typedef for a map of pubkey->fdlist.
// This is used to keep track of fdlist quadruplet with a public key (eg. user, npl).
typedef std::unordered_map<std::string, std::vector<int>> contract_fdmap_t;
// Common typedef for a map of pubkey->fdpair.
// This is used to keep track of fdpair with a public key (eg. user).
typedef std::map<std::string, fd_pair> contract_fdmap_t;
// Common typedef for a map of pubkey->I/O list pair (input list and output list).
// This is used to keep track of input/output buffers for a given public key (eg. user, npl)
typedef std::unordered_map<std::string, contract_iobufs> contract_bufmap_t;
// This is used to keep track of input/output buffers for a given public key (eg. user)
typedef std::map<std::string, contract_iobufs> contract_bufmap_t;
/**
* Holds information that should be passed into the contract process.
*/
* Holds information that should be passed into the contract process.
*/
struct contract_execution_args
{
// Whether the contract should execute in read only mode (to serve read requests).
@@ -97,8 +92,8 @@ namespace sc
};
/**
* Holds context information relating to contract execution environment.
*/
* Holds context information relating to contract execution environment.
*/
struct execution_context
{
// The arguments that was used to initiate this execution.
@@ -107,11 +102,11 @@ namespace sc
// Map of user socket fds (map key: user public key)
contract_fdmap_t userfds;
// Socket fds for NPL <--> messages.
std::vector<int> nplfds;
// Socket fds for NPL messages.
fd_pair nplfds;
// Socket fds for HP <--> messages.
std::vector<int> hpscfds;
// Socket fds for control messages.
fd_pair controlfds;
// Holds the contract process id (if currently executing).
pid_t contract_pid = 0;
@@ -155,9 +150,9 @@ namespace sc
int write_npl_messages(execution_context &ctx);
int read_control_outputs(execution_context &ctx);
int read_control_outputs(execution_context &ctx, const pollfd pfd);
int read_contract_npl_outputs(execution_context &ctx);
int read_npl_outputs(execution_context &ctx, const pollfd pfd);
void broadcast_npl_output(std::string_view output);
@@ -167,19 +162,21 @@ namespace sc
int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, const pollfd *pfds, contract_bufmap_t &bufmap);
int create_iosockets(std::vector<int> &fds, const int socket_type);
int create_iosockets(fd_pair &fds, const int socket_type);
int write_iosocket_seq_packet(std::vector<int> &fds, std::string_view input);
int write_iosocket_seq_packet(fd_pair &fds, std::string_view input);
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output);
int read_iosocket(const bool is_stream_socket, const pollfd pfd, std::string &output);
void close_unused_fds(execution_context &ctx, const bool is_hp);
void close_unused_socket_vectorfds(const bool is_hp, std::vector<int> &fds);
void close_unused_socket_fds(const bool is_hp, fd_pair &fds);
void cleanup_vectorfds(std::vector<int> &fds);
void cleanup_fds(execution_context &ctx);
void cleanup_fd_pair(fd_pair &fds);
void stop(execution_context &ctx);