Replaced contract I/O pipes with domain sockets. (#140)

This commit is contained in:
Savinda Senevirathne
2020-11-02 16:50:21 +05:30
committed by GitHub
parent 37cc27c5ce
commit 7a4515865d
4 changed files with 254 additions and 210 deletions

View File

@@ -9,7 +9,7 @@
namespace sc
{
const int MAX_NPL_BUF_SIZE = 128 * 1024;
const int MAX_SEQ_PACKET_SIZE = 128 * 1024;
bool init_success = false;
// We maintain two hpfs global processes for merging and rw sessions.
@@ -62,13 +62,14 @@ namespace sc
if (start_hpfs_session(ctx) == -1)
return -1;
// Setup io pipes and feed all inputs to them.
create_iopipes_for_fdmap(ctx.userfds, ctx.args.userbufs);
// Setup io sockets and feed all inputs to them.
create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs);
if (!ctx.args.readonly)
{
create_iosockets(ctx.nplfds);
create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty());
// create sequential packet sockets for npl and hp messages.
create_iosockets(ctx.nplfds, SOCK_SEQPACKET);
create_iosockets(ctx.hpscfds, SOCK_SEQPACKET);
}
int ret = 0;
@@ -269,8 +270,8 @@ namespace sc
if (!ctx.args.readonly)
{
os << ",\"lcl\":\"" << ctx.args.lcl
<< "\",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE]
<< "],\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE];
<< "\",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE]
<< ",\"nplfd\":" << ctx.nplfds[SOCKETFDTYPE::SCREADWRITE];
}
os << ",\"usrfd\":{";
@@ -326,11 +327,14 @@ namespace sc
int feed_inputs(execution_context &ctx)
{
// Write any input messages to hp->sc pipe.
// Write any input messages to hp->sc socket.
if (!ctx.args.readonly && write_contract_hp_inputs(ctx) == -1)
{
LOG_ERROR << "Error when writing contract hp inputs.";
return -1;
}
// Write any verified (consensus-reached) user inputs to user pipes.
// Write any user inputs to user sockets.
if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1)
{
LOG_ERROR << "Failed to write user inputs to contract.";
@@ -389,7 +393,7 @@ namespace sc
*/
int write_contract_hp_inputs(execution_context &ctx)
{
if (write_iopipe(ctx.hpscfds, ctx.args.hpscbufs.inputs) == -1)
if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1)
{
LOG_ERROR << "Error writing HP inputs to SC";
return -1;
@@ -445,7 +449,8 @@ namespace sc
*/
int read_contract_hp_outputs(execution_context &ctx)
{
const int hpsc_res = read_iopipe(ctx.hpscfds, ctx.args.hpscbufs.output);
std::string output;
const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.output);
if (hpsc_res == -1)
{
LOG_ERROR << "Error reading HP output from the contract.";
@@ -463,7 +468,7 @@ namespace sc
int read_contract_npl_outputs(execution_context &ctx)
{
std::string output;
const int npl_res = read_iosocket(ctx.nplfds, output);
const int npl_res = read_iosocket_seq_packet(ctx.nplfds, output);
if (npl_res == -1)
{
@@ -514,24 +519,23 @@ namespace sc
pubkey.length() - 1);
// Write hex pubkey and fds.
os << "\"" << pubkeyhex << "\":["
<< itr->second[FDTYPE::SCREAD] << ","
<< itr->second[FDTYPE::SCWRITE] << "]";
os << "\"" << pubkeyhex << "\":"
<< itr->second[SOCKETFDTYPE::SCREADWRITE];
}
}
/**
* Creates io pipes for all pubkeys specified in bufmap.
* Creates io sockets for all pubkeys specified in bufmap.
* @param fdmap A map which has public key and a vector<int> as fd list for that public key.
* @param bufmap A map which has a public key and input/output buffer lists for that public key.
* @return 0 on success. -1 on failure.
*/
int create_iopipes_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
{
for (auto &[pubkey, buflist] : bufmap)
{
std::vector<int> fds = std::vector<int>();
if (create_iopipes(fds, !buflist.inputs.empty()) == -1)
if (create_iosockets(fds, SOCK_STREAM) == -1)
return -1;
fdmap.emplace(pubkey, std::move(fds));
@@ -541,7 +545,7 @@ namespace sc
}
/**
* Common function to create the pipes and write buffer inputs to the fdmap.
* Common function to create the sockets and write buffer inputs to the fdmap.
* We take mutable parameters since the internal entries in the maps will be
* modified (eg. fd close, buffer clear).
*
@@ -554,7 +558,7 @@ namespace sc
// Loop through input buffers for each pubkey.
for (auto &[pubkey, buflist] : bufmap)
{
if (write_iopipe(fdmap[pubkey], buflist.inputs) == -1)
if (write_iosocket_stream(fdmap[pubkey], buflist.inputs, true) == -1)
return -1;
}
@@ -577,9 +581,11 @@ namespace sc
// Get fds for the pubkey.
std::vector<int> &fds = fdmap[pubkey];
const int res = read_iopipe(fds, bufpair.output);
const int res = read_iosocket_stream(fds, bufpair.output);
if (res == -1)
{
return -1;
}
if (res > 0)
bytes_read = true;
@@ -600,50 +606,19 @@ namespace sc
fdmap.clear();
}
/**
* Common function to create a pair of pipes (Hp->SC, SC->HP).
* @param fds Vector to populate fd list.
* @param create_inpipe Whether to create the input pipe from HP to SC.
*/
int create_iopipes(std::vector<int> &fds, const bool create_inpipe)
{
int inpipe[2] = {-1, -1};
if (create_inpipe && pipe(inpipe) == -1)
return -1;
int outpipe[2] = {-1, -1};
if (pipe(outpipe) == -1)
{
if (create_inpipe)
{
// Close the earlier created pipe.
close(inpipe[0]);
close(inpipe[1]);
}
return -1;
}
// If both pipes got created, assign them to the fd vector.
fds.clear();
fds.push_back(inpipe[0]); //SCREAD
fds.push_back(inpipe[1]); //HPWRITE
fds.push_back(outpipe[0]); //HPREAD
fds.push_back(outpipe[1]); //SCWRITE
return 0;
}
/**
* Common function to create a socket (Hp->SC, SC->HP).
* @param fds Vector to populate fd list.
* @param socket_type Type of the socket. (SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET)
* @return Returns -1 if socket creation fails otherwise 0.
*/
int create_iosockets(std::vector<int> &fds)
int create_iosockets(std::vector<int> &fds, const int socket_type)
{
int socket[2] = {-1, -1};
// Create a sequence packet socket.
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, socket) == -1)
// Create the socket of given type.
if (socketpair(AF_UNIX, socket_type, 0, socket) == -1)
{
LOG_ERROR << errno << ": Error when creating domain socket.";
return -1;
}
@@ -656,15 +631,16 @@ namespace sc
}
/**
* Common function to write the given input buffer into the write fd from the HP side.
* Common function to write the given input buffer into the write fd from the HP side socket.
* @param fds Vector of fd list.
* @param inputs Buffer to write into the HP write fd.
* @param close_if_empty Close the socket after writing if this is true.
*/
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs)
int write_iosocket_stream(std::vector<int> &fds, std::list<std::string> &inputs, const bool close_if_empty)
{
// Write the inputs (if any) into the contract and close the writefd.
// Write the inputs (if any) into the contract.
const int writefd = fds[FDTYPE::HPWRITE];
const int writefd = fds[SOCKETFDTYPE::HPREADWRITE];
if (writefd == -1)
return 0;
@@ -673,78 +649,82 @@ namespace sc
if (!inputs.empty())
{
// Prepare the input memory segments to write with wrtiev.
size_t i = 0;
iovec memsegs[inputs.size()];
iovec memsegs[2];
std::string msg_buf;
for (std::string &input : inputs)
{
memsegs[i].iov_base = input.data();
memsegs[i].iov_len = input.length();
i++;
// Concat messages into one message segment.
msg_buf += input;
}
// Storing message len in big endian.
uint8_t header[4];
header[0] = msg_buf.length() >> 24;
header[1] = msg_buf.length() >> 16;
header[2] = msg_buf.length() >> 8;
header[3] = msg_buf.length();
memsegs[0].iov_base = header;
memsegs[0].iov_len = sizeof(header);
memsegs[1].iov_base = msg_buf.data();
memsegs[1].iov_len = msg_buf.length();
if (writev(writefd, memsegs, inputs.size()) == -1)
if (writev(writefd, memsegs, 2) == -1)
write_error = true;
inputs.clear();
}
else if (close_if_empty)
{
close(writefd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
}
// Close the writefd since we no longer need it.
close(writefd);
fds[FDTYPE::HPWRITE] = -1;
if (write_error)
LOG_ERROR << errno << ": Error writing to stream socket.";
return write_error ? -1 : 0;
}
/**
* Common function to read buffered output from the pipe and populate the output list.
* @param fds Vector representing the pipes fd list.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
* Common function to write the given input buffer into the write fd from the HP side socket.
* @param fds Vector of fd list.
* @param inputs Buffer to write into the HP write fd.
* @param close_if_empty Close the socket after writing if this is true.
*/
int read_iopipe(std::vector<int> &fds, std::string &output)
int write_iosocket_seq_packet(std::vector<int> &fds, std::list<std::string> &inputs, const bool close_if_empty)
{
// Read any available data that have been written by the contract process
// from the output pipe and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
const int readfd = fds[FDTYPE::HPREAD];
if (readfd == -1)
// Write the inputs (if any) into the contract.
const int writefd = fds[SOCKETFDTYPE::HPREADWRITE];
if (writefd == -1)
return 0;
bool read_error = false;
size_t available_bytes = 0;
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
bool write_error = false;
if (!inputs.empty())
{
if (available_bytes == 0)
return 0;
const size_t current_size = output.size();
output.resize(current_size + available_bytes);
const int res = read(readfd, output.data() + current_size, available_bytes);
if (res >= 0)
for (std::string &input : inputs)
{
if (res == 0) // EOF
{
close(readfd);
fds[FDTYPE::HPREAD] = -1;
}
return res;
if (write(writefd, input.data(), input.length()) == -1)
write_error = true;
}
}
else if (close_if_empty)
{
close(writefd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
}
if (write_error)
LOG_ERROR << errno << ": Error writing to sequece packet socket.";
close(readfd);
fds[FDTYPE::HPREAD] = -1;
return -1;
return write_error ? -1 : 0;
}
/**
* Common function to read buffered output from the socket and populate the output.
* Common function to read buffered output from the sequence packet socket and populate the output.
* @param fds Vector representing the socket fd list.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
*/
int read_iosocket(std::vector<int> &fds, std::string &output)
int read_iosocket_seq_packet(std::vector<int> &fds, std::string &output)
{
// Read any available data that have been written by the contract process
// from the output socket and store in the output buffer.
@@ -762,13 +742,73 @@ namespace sc
if (available_bytes == 0)
return 0;
output.resize(MAX_NPL_BUF_SIZE);
const int res = read(readfd, output.data(), MAX_NPL_BUF_SIZE);
output.resize(MIN(MAX_SEQ_PACKET_SIZE, available_bytes));
const int res = read(readfd, output.data(), MAX_SEQ_PACKET_SIZE);
output.resize(res);
return res;
if (res >= 0)
{
if (res == 0) // EOF
{
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
}
return res;
}
}
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
LOG_ERROR << errno << ": Error reading sequence packet socket.";
return -1;
}
/**
* Common function to read buffered output from the stream socket and populate the output list.
* @param fds Vector representing the sockets fd list.
* @param output The buffer to place the read output.
* @return -1 on error. Otherwise no. of bytes read.
*/
int read_iosocket_stream(std::vector<int> &fds, std::string &output)
{
// Read any available data that have been written by the contract process
// from the output socket and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
const int readfd = fds[SOCKETFDTYPE::HPREADWRITE];
if (readfd == -1)
return 0;
bool read_error = false;
size_t available_bytes = 0;
if (ioctl(readfd, FIONREAD, &available_bytes) != -1)
{
if (available_bytes == 0)
{
return 0;
}
const size_t current_size = output.size();
output.resize(current_size + available_bytes);
const int res = read(readfd, output.data() + current_size, available_bytes);
if (res >= 0)
{
if (res == 0) // EOF
{
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
}
return res;
}
}
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
LOG_ERROR << errno << ": Error reading stream socket.";
return -1;
}
@@ -776,44 +816,13 @@ namespace sc
{
if (!ctx.args.readonly)
{
close_unused_vectorfds(is_hp, ctx.hpscfds);
close_unused_socket_vectorfds(is_hp, ctx.hpscfds);
close_unused_socket_vectorfds(is_hp, ctx.nplfds);
}
// Loop through user fds.
for (auto &[pubkey, fds] : ctx.userfds)
close_unused_vectorfds(is_hp, fds);
}
/**
* Common function for closing unused fds based on which process this gets called from.
* This also marks active fds with O_CLOEXEC for close-on-exec behaviour.
* @param is_hp Specify 'true' when calling from HP process. 'false' from SC process.
* @param fds Vector of fds to close.
*/
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds)
{
for (int fd_type = 0; fd_type <= 3; fd_type++)
{
const int fd = fds[fd_type];
if (fd != -1)
{
if ((is_hp && (fd_type == FDTYPE::SCREAD || fd_type == FDTYPE::SCWRITE)) ||
(!is_hp && (fd_type == FDTYPE::HPREAD || fd_type == FDTYPE::HPWRITE)))
{
close(fd);
fds[fd_type] = -1;
}
else if (is_hp && (fd_type == FDTYPE::HPREAD || fd_type == FDTYPE::HPWRITE))
{
// The fd must be kept open in HP process. But we must
// mark it to close on exec in a potential forked process.
int flags = fcntl(fd, F_GETFD, NULL);
flags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, flags);
}
}
}
close_unused_socket_vectorfds(is_hp, fds);
}
/**