Migrated user inputs from domain sockets to memfd. (#159)

This commit is contained in:
Ravin Perera
2020-11-23 21:52:05 +05:30
committed by GitHub
parent bf710c6bde
commit 9c1f62dbde
18 changed files with 512 additions and 527 deletions

View File

@@ -25,6 +25,7 @@ add_executable(appbill
add_executable(hpcore
src/util.cpp
src/util/buffer_store.cpp
src/crypto.cpp
src/conf.cpp
src/hplog.cpp

View File

@@ -36,6 +36,7 @@ void echo_contract(const struct hp_contract_context *ctx)
if (fd > 0)
{
char tsbuf[20];
memset(tsbuf, 0, 20);
sprintf(tsbuf, "%lu\n", ctx->timestamp);
struct iovec vec[2] = {{"ts:", 4}, {(void *)tsbuf, 20}};
writev(fd, vec, 2);

View File

@@ -7,67 +7,77 @@
#include <stdbool.h>
#include <poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <pthread.h>
#include "json.h"
#define _HP_KEY_SIZE 64
#define _HP_HASH_SIZE 64
#define _HP_MSG_HEADER_LEN 4
#define _HP_USER_BUF_SIZE 4096 // Buffer used to read user message data.
#define _HP_SEQPKT_BUF_SIZE 131072 // 128KB to support SEQ_PACKET sockets.
#define _HP_POLL_TIMEOUT 20
#define __HP_KEY_SIZE 64
#define __HP_HASH_SIZE 64
#define __HP_MSG_HEADER_LEN 4
#define __HP_SEQPKT_BUF_SIZE 131072 // 128KB to support SEQ_PACKET sockets.
#define __HP_POLL_TIMEOUT 20
#define _HP_MIN(a, b) ((a < b) ? a : b)
#define __HP_MMAP_BLOCK_SIZE 4096
#define __HP_MMAP_BLOCK_ALIGN(x) (((x) + ((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1)) & ~((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1))
#define _HP_ASSIGN_STRING(dest, elem) \
if (elem->value->type == json_type_string) \
{ \
struct json_string_s *value = (struct json_string_s *)elem->value->payload; \
memcpy(dest, value->string, sizeof(dest)); \
#define __HP_ASSIGN_STRING(dest, elem) \
if (elem->value->type == json_type_string) \
{ \
const struct json_string_s *value = (struct json_string_s *)elem->value->payload; \
memcpy(dest, value->string, sizeof(dest)); \
}
#define _HP_ASSIGN_UINT64(dest, elem) \
if (elem->value->type == json_type_number) \
{ \
struct json_number_s *value = (struct json_number_s *)elem->value->payload; \
dest = strtoull(value->number, NULL, 0); \
#define __HP_ASSIGN_UINT64(dest, elem) \
if (elem->value->type == json_type_number) \
{ \
const struct json_number_s *value = (struct json_number_s *)elem->value->payload; \
dest = strtoull(value->number, NULL, 0); \
}
#define _HP_ASSIGN_INT(dest, elem) \
if (elem->value->type == json_type_number) \
{ \
struct json_number_s *value = (struct json_number_s *)elem->value->payload; \
dest = atoi(value->number); \
#define __HP_ASSIGN_INT(dest, elem) \
if (elem->value->type == json_type_number) \
{ \
const struct json_number_s *value = (struct json_number_s *)elem->value->payload; \
dest = atoi(value->number); \
}
#define _HP_ASSIGN_BOOL(dest, elem) \
#define __HP_ASSIGN_BOOL(dest, elem) \
if (elem->value->type == json_type_true) \
dest = true; \
else if (elem->value->type == json_type_false) \
dest = false;
#define _HP_FROM_BE(buf, pos) \
#define __HP_FROM_BE(buf, pos) \
((uint8_t)buf[pos + 0] << 24 | (uint8_t)buf[pos + 1] << 16 | (uint8_t)buf[pos + 2] << 8 | (uint8_t)buf[pos + 3])
#define _HP_TO_BE(num, buf, pos) \
buf[pos] = num >> 24; \
buf[1 + pos] = num >> 16; \
buf[2 + pos] = num >> 8; \
#define __HP_TO_BE(num, buf, pos) \
buf[pos] = num >> 24; \
buf[1 + pos] = num >> 16; \
buf[2 + pos] = num >> 8; \
buf[3 + pos] = num;
struct hp_user_input
{
off_t offset;
uint32_t size;
};
struct hp_user
{
char pubkey[_HP_KEY_SIZE + 1];
int fd;
char pubkey[__HP_KEY_SIZE + 1];
int outfd;
struct hp_user_input *inputs;
uint32_t inputs_count;
};
struct hp_peer
{
char pubkey[_HP_KEY_SIZE + 1];
char pubkey[__HP_KEY_SIZE + 1];
};
struct hp_users_collection
{
int infd;
struct hp_user *list;
size_t count;
};
@@ -84,9 +94,9 @@ struct hp_contract_context
bool readonly;
uint64_t timestamp;
char pubkey[_HP_KEY_SIZE + 1];
char pubkey[__HP_KEY_SIZE + 1];
char lcl_hash[_HP_HASH_SIZE + 1];
char lcl_hash[__HP_HASH_SIZE + 1];
uint64_t lcl_seq_no;
struct hp_users_collection users;
@@ -99,29 +109,11 @@ struct __hp_global_context
bool should_stop;
};
struct __hp_user_state
{
bool completed; // Whether we have finished processing all incoming messages for this user.
uint32_t total_messages; // Total messages for the user.
uint32_t processed_messages; // No. of processed messages so far for the user.
bool total_messages_known; // Whether the total messages count has been set properly.
uint8_t header_buf[_HP_MSG_HEADER_LEN]; // Header length buf (total msg count or msg size header).
uint8_t header_filled_len; // Current no. of header bytes collected so far.
uint32_t msg_actual_len; // Actual(final) size of current message.
uint32_t msg_filled_len; // Current no. of message bytes collected so far.
uint8_t *msg_buf; // Buf holding the collected bytes for the current message.
struct hp_user *user; // The user reference tracked by this state struct.
};
typedef void (*hp_contract_func)(const struct hp_contract_context *ctx);
typedef void (*hp_user_message_func)(const struct hp_contract_context *ctx,
const struct hp_user *user, const void *buf, const uint32_t len);
const struct hp_user *user, const void *buf, const uint32_t len);
typedef void (*hp_peer_message_func)(const struct hp_contract_context *ctx,
const char *peerPubKey, const void *buf, const uint32_t len);
const char *peerPubKey, const void *buf, const uint32_t len);
struct __hp_peer_message_thread_arg
{
@@ -140,11 +132,7 @@ int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, con
int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count);
void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object);
void __hp_parse_user_chunk(const struct hp_contract_context *ctx, struct __hp_user_state *us,
const uint8_t *buf, const uint32_t len, hp_user_message_func on_user_message);
bool __hp_parse_length_header(struct __hp_user_state *us, const uint8_t *chunk, const uint32_t chunk_len,
uint32_t *chunk_pos, uint32_t *target);
void __hp_free_contract_context(struct hp_contract_context *ctx);
static void *__hp_peer_message_thread_func(void *arg);
@@ -180,7 +168,7 @@ int hp_init(hp_contract_func contract_func)
// Start control channel listener.
if (pthread_create(&__hp_control_thread, NULL, &__hp_control_message_thread_func, NULL) == -1)
{
perror("Error creating control thread. ");
perror("Error creating control thread");
goto error;
}
@@ -199,15 +187,17 @@ int hp_init(hp_contract_func contract_func)
__hp_control_thread = 0;
// Cleanup.
close(ctx.users.infd);
for (int i = 0; i < ctx.users.count; i++)
close(ctx.users.list[i].fd);
close(ctx.users.list[i].outfd);
close(ctx.peers.fd);
__hp_free_contract_context(&ctx);
// Send termination control message.
write(gctx.control_fd, "Terminated", 10);
close(gctx.control_fd);
return 0;
}
}
@@ -223,97 +213,39 @@ error:
int hp_user_message_loop(const struct hp_contract_context *ctx, hp_user_message_func on_user_message)
{
int result = 0;
const int fd = ctx->users.infd;
// We poll user fds, control fd and npl fd (npl fd not available in read only mode)
const size_t total_users = ctx->users.count;
size_t remaining_users = total_users;
// User states list to keep track of message collection status for each user.
struct __hp_user_state user_states[total_users];
memset(user_states, 0, sizeof(struct __hp_user_state) * total_users);
// Temp buffer for all read operations.
uint8_t *buf = malloc(_HP_USER_BUF_SIZE);
// Create fd set to be polled.
struct pollfd pollfds[total_users];
for (int i = 0; i < total_users; i++)
struct stat st;
if (fstat(fd, &st) == -1)
{
pollfds[i].fd = ctx->users.list[i].fd;
pollfds[i].events = POLLIN;
pollfds[i].revents = 0;
user_states[i].user = &ctx->users.list[i];
perror("Error in user input fd stat");
return -1;
}
while (remaining_users > 0)
if (st.st_size == 0)
return 0;
const size_t mmap_size = __HP_MMAP_BLOCK_ALIGN(st.st_size);
void *fdptr = mmap(NULL, mmap_size, PROT_READ, MAP_PRIVATE, fd, 0);
if (fdptr == MAP_FAILED)
{
// Reset poll fd set because we are reusing it.
for (int i = 0; i < total_users; i++)
pollfds[0].revents = 0;
perror("Error in user input fd mmap");
return -1;
}
if (poll(pollfds, total_users, _HP_POLL_TIMEOUT) == -1)
close(fd); // We can close the fd after mmap.
for (int i = 0; i < ctx->users.count; i++)
{
const struct hp_user *user = &ctx->users.list[i];
for (int j = 0; j < user->inputs_count; j++)
{
perror("User poll error. ");
goto error;
}
for (int i = 0; i < total_users; i++)
{
short result = pollfds[i].revents;
if (result == 0)
continue;
if (result & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "User poll returned error.\n");
goto error;
}
else if (result & POLLIN)
{
const size_t read_res = read(pollfds[i].fd, buf, _HP_USER_BUF_SIZE);
if (read_res == -1)
{
perror("Error reading user socket. ");
goto error;
}
if (!user_states[i].completed)
{
// User sockets are stream sockets. So we have to do the message stitching ourselves based on
// total msg count and msg size headers sent over the stream.
__hp_parse_user_chunk(ctx, &user_states[i], buf, read_res, on_user_message);
if (user_states[i].completed)
{
remaining_users--;
// All users completed.
if (remaining_users == 0)
break;
}
}
}
const struct hp_user_input *input = &user->inputs[j];
on_user_message(ctx, user, (fdptr + input->offset), input->size);
}
}
// If we reach here that means result is successful.
result = 0;
goto end;
error:
// On error set result to -1.
result = -1;
end:
for (int i = 0; i < total_users; i++)
{
if (user_states[i].msg_buf)
free(user_states[i].msg_buf);
}
free(buf);
return result;
munmap(fdptr, mmap_size);
}
int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_message_func on_peer_message)
@@ -330,7 +262,7 @@ int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_mess
arg->on_peer_message = on_peer_message;
if (pthread_create(&__hp_peer_thread, NULL, &__hp_peer_message_thread_func, arg) == -1)
{
perror("Error creating peer thread. ");
perror("Error creating peer thread");
return -1;
}
@@ -356,20 +288,20 @@ int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const i
msg_len += bufs[i].iov_len;
}
uint8_t header_buf[_HP_MSG_HEADER_LEN];
_HP_TO_BE(msg_len, header_buf, 0);
uint8_t header_buf[__HP_MSG_HEADER_LEN];
__HP_TO_BE(msg_len, header_buf, 0);
all_bufs[0].iov_base = header_buf;
all_bufs[0].iov_len = _HP_MSG_HEADER_LEN;
all_bufs[0].iov_len = __HP_MSG_HEADER_LEN;
return writev(user->fd, all_bufs, total_buf_count);
return writev(user->outfd, all_bufs, total_buf_count);
}
int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len)
{
if (len > _HP_SEQPKT_BUF_SIZE)
if (len > __HP_SEQPKT_BUF_SIZE)
{
fprintf(stderr, "Peer message exceeds max length %d.", _HP_SEQPKT_BUF_SIZE);
fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE);
return -1;
}
@@ -382,138 +314,39 @@ int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bu
for (int i = 0; i < buf_count; i++)
len += bufs[i].iov_len;
if (len > _HP_SEQPKT_BUF_SIZE)
if (len > __HP_SEQPKT_BUF_SIZE)
{
fprintf(stderr, "Peer message exceeds max length %d.", _HP_SEQPKT_BUF_SIZE);
fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE);
return -1;
}
return writev(ctx->peers.fd, bufs, buf_count);
}
void __hp_parse_user_chunk(const struct hp_contract_context *ctx, struct __hp_user_state *us,
const uint8_t *chunk, const uint32_t chunk_len, hp_user_message_func on_user_message)
{
uint32_t pos = 0;
if (!us->total_messages_known)
us->total_messages_known = __hp_parse_length_header(us, chunk, chunk_len, &pos, &us->total_messages);
if (!us->total_messages_known)
return;
if (us->total_messages == 0)
{
us->completed = true;
return;
}
while (pos < chunk_len)
{
if (us->msg_actual_len == 0)
{
if (__hp_parse_length_header(us, chunk, chunk_len, &pos, &us->msg_actual_len) && us->msg_actual_len == 0)
{
// If we parse msg length=0, then abandon further processing for this user.
fprintf(stderr, "Message size 0 received for user.\n");
us->completed = true;
return;
}
}
// Going inside following 'if' means we know the current message length, and there are more data bytes to be read.
if (us->msg_actual_len > 0 && pos < chunk_len)
{
if (!us->msg_buf)
us->msg_buf = malloc(us->msg_actual_len);
const uint32_t remaining_len = chunk_len - pos;
const uint32_t msg_bytes_to_copy = _HP_MIN(remaining_len, (us->msg_actual_len - us->msg_filled_len));
memcpy(us->msg_buf + us->msg_filled_len, (chunk + pos), msg_bytes_to_copy);
us->msg_filled_len += msg_bytes_to_copy;
pos += msg_bytes_to_copy;
// See whether we just completed forming a full message.
if (us->msg_filled_len == us->msg_actual_len)
{
// Execute on_message func with msg_buf.
on_user_message(ctx, us->user, us->msg_buf, us->msg_actual_len);
// Reset message construction.
free(us->msg_buf);
us->msg_buf = NULL;
us->msg_actual_len = 0;
us->msg_filled_len = 0;
us->processed_messages++;
if (us->processed_messages == us->total_messages)
{
us->completed = true;
return;
}
}
}
}
}
bool __hp_parse_length_header(struct __hp_user_state *us, const uint8_t *chunk, const uint32_t chunk_len,
uint32_t *chunk_pos, uint32_t *target)
{
uint32_t pos = *chunk_pos;
const uint32_t remaining_len = chunk_len - pos;
// See if we can detect complete length header without the help of the header buffer.
if (remaining_len >= _HP_MSG_HEADER_LEN && us->header_filled_len == 0)
{
*target = _HP_FROM_BE(chunk, pos);
*chunk_pos = pos + _HP_MSG_HEADER_LEN;
return true;
}
else
{
const uint32_t header_bytes_to_copy = _HP_MIN(remaining_len, (_HP_MSG_HEADER_LEN - us->header_filled_len));
memcpy(us->header_buf + us->header_filled_len, (chunk + pos), header_bytes_to_copy);
us->header_filled_len += header_bytes_to_copy;
*chunk_pos = pos + header_bytes_to_copy;
// See whether we can now read length value after new bytes where added to the header.
if (us->header_filled_len == _HP_MSG_HEADER_LEN)
{
*target = _HP_FROM_BE(us->header_buf, 0);
us->header_filled_len = 0;
return true;
}
}
return false; // Couldn't detect the length header with available bytes.
}
void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object)
{
struct json_object_element_s *elem = object->start;
const struct json_object_element_s *elem = object->start;
do
{
struct json_string_s *k = elem->name;
const struct json_string_s *k = elem->name;
if (strcmp(k->string, "pubkey") == 0)
{
_HP_ASSIGN_STRING(ctx->pubkey, elem);
__HP_ASSIGN_STRING(ctx->pubkey, elem);
}
else if (strcmp(k->string, "ts") == 0)
{
_HP_ASSIGN_UINT64(ctx->timestamp, elem);
__HP_ASSIGN_UINT64(ctx->timestamp, elem);
}
else if (strcmp(k->string, "readonly") == 0)
{
_HP_ASSIGN_BOOL(ctx->readonly, elem);
__HP_ASSIGN_BOOL(ctx->readonly, elem);
}
else if (strcmp(k->string, "lcl") == 0)
{
if (elem->value->type == json_type_string)
{
struct json_string_s *value = (struct json_string_s *)elem->value->payload;
const struct json_string_s *value = (struct json_string_s *)elem->value->payload;
const char *delim = "-";
char *tok_ptr;
char *tok_str = strdup(value->string);
@@ -521,14 +354,19 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c
const char *hash_str = strtok_r(NULL, delim, &tok_ptr);
ctx->lcl_seq_no = strtoull(seq_str, NULL, 0);
memcpy(ctx->lcl_hash, hash_str, _HP_HASH_SIZE);
memcpy(ctx->lcl_hash, hash_str, __HP_HASH_SIZE);
free(tok_str);
}
}
else if (strcmp(k->string, "usrfd") == 0)
else if (strcmp(k->string, "userinfd") == 0)
{
__HP_ASSIGN_INT(ctx->users.infd, elem);
}
else if (strcmp(k->string, "users") == 0)
{
if (elem->value->type == json_type_object)
{
struct json_object_s *user_object = (struct json_object_s *)elem->value->payload;
const struct json_object_s *user_object = (struct json_object_s *)elem->value->payload;
const size_t user_count = user_object->length;
ctx->users.count = user_count;
@@ -539,9 +377,35 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c
struct json_object_element_s *user_elem = user_object->start;
for (int i = 0; i < user_count; i++)
{
memcpy(ctx->users.list[i].pubkey, user_elem->name->string, _HP_KEY_SIZE);
_HP_ASSIGN_INT(ctx->users.list[i].fd, user_elem);
struct hp_user *user = &ctx->users.list[i];
memcpy(user->pubkey, user_elem->name->string, __HP_KEY_SIZE);
if (user_elem->value->type == json_type_array)
{
const struct json_array_s *arr = (struct json_array_s *)user_elem->value->payload;
struct json_array_element_s *arr_elem = arr->start;
// First element is the output fd.
__HP_ASSIGN_INT(user->outfd, arr_elem);
arr_elem = arr_elem->next;
// Subsequent elements are tupels of [offset, size] of input messages for this user.
user->inputs_count = arr->length - 1;
user->inputs = user->inputs_count ? malloc(user->inputs_count * sizeof(struct hp_user_input)) : NULL;
for (int i = 0; i < user->inputs_count; i++)
{
if (arr_elem->value->type == json_type_array)
{
const struct json_array_s *input_info = (struct json_array_s *)arr_elem->value->payload;
if (input_info->length == 2)
{
__HP_ASSIGN_UINT64(user->inputs[i].offset, input_info->start);
__HP_ASSIGN_UINT64(user->inputs[i].size, input_info->start->next);
}
}
arr_elem = arr_elem->next;
}
}
user_elem = user_elem->next;
}
}
@@ -549,13 +413,13 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c
}
else if (strcmp(k->string, "nplfd") == 0)
{
_HP_ASSIGN_INT(ctx->peers.fd, elem);
__HP_ASSIGN_INT(ctx->peers.fd, elem);
}
else if (strcmp(k->string, "unl") == 0)
{
if (elem->value->type == json_type_array)
{
struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload;
const struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload;
const size_t peer_count = peer_array->length;
ctx->peers.count = peer_count;
@@ -566,7 +430,7 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c
struct json_array_element_s *peer_elem = peer_array->start;
for (int i = 0; i < peer_count; i++)
{
_HP_ASSIGN_STRING(ctx->peers.list[i].pubkey, peer_elem);
__HP_ASSIGN_STRING(ctx->peers.list[i].pubkey, peer_elem);
peer_elem = peer_elem->next;
}
}
@@ -574,7 +438,7 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c
}
else if (strcmp(k->string, "hpfd") == 0)
{
_HP_ASSIGN_INT(gctx->control_fd, elem);
__HP_ASSIGN_INT(gctx->control_fd, elem);
}
elem = elem->next;
@@ -587,11 +451,11 @@ static void *__hp_peer_message_thread_func(void *arg)
// Pubkey buf to hold the sender pubkey of the message that follows it.
bool has_pubkey = false;
char pubkey_buf[_HP_KEY_SIZE + 1];
char pubkey_buf[__HP_KEY_SIZE + 1];
memset(pubkey_buf, 0, sizeof(pubkey_buf));
// Buffer to hold current message.
uint8_t *msg_buf = malloc(_HP_SEQPKT_BUF_SIZE);
uint8_t *msg_buf = malloc(__HP_SEQPKT_BUF_SIZE);
struct pollfd pfd = {args->ctx->peers.fd, POLLIN, 0};
@@ -600,9 +464,9 @@ static void *__hp_peer_message_thread_func(void *arg)
// Reset poll fd because we are reusing it.
pfd.revents = 0;
if (poll(&pfd, 1, _HP_POLL_TIMEOUT) == -1)
if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1)
{
perror("Peer channel poll error. ");
perror("Peer channel poll error");
goto error;
}
@@ -620,19 +484,19 @@ static void *__hp_peer_message_thread_func(void *arg)
// The read data alternates between the sender pubkey and the message.
if (!has_pubkey)
{
if (read(pfd.fd, pubkey_buf, _HP_KEY_SIZE) == -1)
if (read(pfd.fd, pubkey_buf, __HP_KEY_SIZE) == -1)
{
perror("Error reading pubkey from peer channel. ");
perror("Error reading pubkey from peer channel");
goto error;
}
has_pubkey = true;
}
else
{
const size_t read_res = read(pfd.fd, msg_buf, _HP_USER_BUF_SIZE);
const size_t read_res = read(pfd.fd, msg_buf, __HP_SEQPKT_BUF_SIZE);
if (read_res == -1)
{
perror("Error reading message from peer channel. ");
perror("Error reading message from peer channel");
goto error;
}
@@ -660,7 +524,7 @@ static void *__hp_control_message_thread_func(void *arg)
int result = 0;
// Temp buffer for all read operations.
uint8_t *buf = malloc(_HP_SEQPKT_BUF_SIZE);
uint8_t *buf = malloc(__HP_SEQPKT_BUF_SIZE);
struct pollfd pfd = {gctx.control_fd, POLLIN, 0};
@@ -669,9 +533,9 @@ static void *__hp_control_message_thread_func(void *arg)
// Reset poll fd because we are reusing it.
pfd.revents = 0;
if (poll(&pfd, 1, _HP_POLL_TIMEOUT) == -1)
if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1)
{
perror("Control channel poll error. ");
perror("Control channel poll error");
goto error;
}
@@ -686,10 +550,10 @@ static void *__hp_control_message_thread_func(void *arg)
}
else if (result & POLLIN)
{
const size_t read_res = read(pfd.fd, buf, _HP_USER_BUF_SIZE);
const size_t read_res = read(pfd.fd, buf, __HP_SEQPKT_BUF_SIZE);
if (read_res == -1)
{
perror("Error reading control channel. ");
perror("Error reading control channel");
goto error;
}
@@ -713,4 +577,20 @@ void __hp_on_control_message(const void *buf, const uint32_t len)
// TODO: Handle control messages from hot pocket.
}
void __hp_free_contract_context(struct hp_contract_context *ctx)
{
if (ctx->users.list)
{
for (int i = 0; i < ctx->users.count; i++)
{
if (ctx->users.list[i].inputs)
free(ctx->users.list[i].inputs);
}
free(ctx->users.list);
}
if (ctx->peers.list)
free(ctx->peers.list);
}
#endif

View File

@@ -0,0 +1,73 @@
const { HotPocketClient, HotPocketKeyGenerator, HotPocketEvents } = require('./hp-client-lib');
async function main() {
const clientCount = 3;
const clients = [];
for (let i = 1; i <= clientCount; i++) {
clients.push(new RoboClient('wss://localhost:', 8081, i.toString()));
}
await Promise.all(clients.map(c => c.connect()));
console.log("Clients connected.");
await Promise.all(clients.map(c => c.sendInputs(["A", "B", "C"])));
console.log("Clients submitted.");
// await Promise.all(clients.map(c => c.disconnect()));
// console.log("Clients closed.");
}
function RoboClient(server, port, clientId) {
this.connect = async () => {
this.keys = await HotPocketKeyGenerator.generate();
this.hpclient = new HotPocketClient(server + port, this.keys);
if (!await this.hpclient.connect()) {
this.log('Connection failed.');
}
this.log('HotPocket Connected.');
// This will get fired if HP server disconnects unexpectedly.
this.hpclient.on(HotPocketEvents.disconnect, () => {
this.log('Server disconnected');
})
// This will get fired when contract sends an output.
this.hpclient.on(HotPocketEvents.contractOutput, (output) => {
this.log("Contract output>> " + Buffer.from(output, "hex"));
})
// This will get fired when contract sends a read response.
this.hpclient.on(HotPocketEvents.contractReadResponse, (response) => {
this.log("Contract read response>> " + Buffer.from(response, "hex"));
})
}
this.disconnect = async () => {
await this.hpclient.close();
}
this.sendInputs = async (inputs) => {
let idx = 1;
let tasks = [];
inputs.forEach(inp => {
const nonce = clientId.toString() + '-' + idx.toString();
tasks.push(this.hpclient.sendContractInput((clientId + inp), nonce).then(submissionStatus => {
if (submissionStatus && submissionStatus != "ok")
this.log("Input submission failed. reason: " + submissionStatus);
}));
idx++;
})
await Promise.all(tasks);
}
this.log = (text) => {
console.log(clientId + ": " + text)
}
}
main();

View File

@@ -1,4 +1,3 @@
const fs = require('fs');
const readline = require('readline');
const { exit } = require('process');
const { HotPocketClient, HotPocketKeyGenerator, HotPocketEvents } = require('./hp-client-lib');

View File

@@ -14,7 +14,8 @@ class HotPocketContract {
return;
// Parse HotPocket args.
const hpargs = JSON.parse(fs.readFileSync(0, 'utf8'));
const argsJson = fs.readFileSync(0, 'utf8');
const hpargs = JSON.parse(argsJson);
this.#controlChannel = new ControlChannel(hpargs.hpfd);
this.#executeContract(hpargs, contractFunc);
@@ -24,7 +25,7 @@ class HotPocketContract {
// Keeps track of all the tasks (promises) that must be awaited before the termination.
const pendingTasks = [];
const users = new UsersCollection(hpargs.usrfd, this.events);
const users = new UsersCollection(hpargs.userinfd, hpargs.users);
const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events);
const executionContext = new ContractExecutionContext(hpargs, users, peers);
@@ -67,21 +68,24 @@ class UsersCollection {
#users = {};
#totalUsers = 0;
constructor(usrfds, events) {
const userKeys = Object.keys(usrfds);
constructor(userInputsFd, usersObj) {
const users = Object.entries(usersObj);
userKeys.forEach((pubKey) => {
const channel = new UserChannel(usrfds[pubKey]);
users.forEach(([pubKey, arr]) => {
const outfd = arr[0]; // First array element is the output fd.
arr.splice(0, 1); // Remove first element (output fd). The rest are pairs of msg offset/length tuples.
const channel = new UserChannel(userInputsFd, outfd, arr);
const user = new User(pubKey, channel);
this.#users[pubKey] = {
user: user,
channel: channel
}
});
this.#totalUsers = userKeys.length;
events.on("session_end", () => Object.values(this.#users).forEach(u => u.channel.close()));
this.#totalUsers = users.length;
}
// Returns the User for the specified pubkey. Returns null if not found.
@@ -151,69 +155,26 @@ class User {
class UserChannel {
#readStream = null;
#fd = -1;
#infd = -1;
#outfd = -1;
#inputs = null;
constructor(fd) {
this.#fd = fd;
constructor(infd, outfd, inputs) {
this.#infd = infd;
this.#outfd = outfd;
this.#inputs = inputs;
}
consume(onMessage, onComplete) {
this.#readStream = fs.createReadStream(null, { fd: this.#fd });
let dataParts = [];
let remainingMsgCount = -1;
let currentMsgLen = -1;
let pos = 0;
// Read bytes from the given buffer.
const readBytes = (buf, pos, count) => {
if (pos + count > buf.byteLength)
return null;
return buf.slice(pos, pos + count);
// Each input is 2 element array of [offset, length].
for (const [offset, size] of this.#inputs) {
const buf = Buffer.alloc(size);
fs.readSync(this.#infd, buf, 0, size, offset);
onMessage(buf);
}
this.#readStream.on("data", (buf) => {
pos = 0;
if (remainingMsgCount == -1) {
const msgCountBuf = readBytes(buf, 0, 4)
remainingMsgCount = msgCountBuf.readUInt32BE();
pos += 4;
}
while (pos < buf.byteLength) {
if (currentMsgLen == -1) {
const msgLenBuf = readBytes(buf, pos, 4);
pos += 4;
currentMsgLen = msgLenBuf.readUInt32BE();
}
let possible_read_len;
if (((buf.byteLength - pos) - currentMsgLen) >= 0) {
// Can finish reading a full message.
possible_read_len = currentMsgLen;
currentMsgLen = -1;
} else {
// Only partial message is recieved.
possible_read_len = buf.byteLength - pos
currentMsgLen -= possible_read_len;
}
const msgBuf = readBytes(buf, pos, possible_read_len);
pos += possible_read_len;
dataParts.push(msgBuf)
if (currentMsgLen == -1) {
onMessage(Buffer.concat(dataParts));
dataParts = [];
remainingMsgCount--
}
}
if (remainingMsgCount == 0) {
remainingMsgCount = -1;
onComplete();
}
});
this.#readStream.on("error", (err) => { });
onComplete();
}
send(msg) {
@@ -221,11 +182,7 @@ class UserChannel {
let headerBuf = Buffer.alloc(4);
// Writing message length in big endian format.
headerBuf.writeUInt32BE(messageBuf.byteLength)
return writevAsync(this.#fd, [headerBuf, messageBuf]);
}
close() {
this.#readStream && this.#readStream.close();
return writevAsync(this.#outfd, [headerBuf, messageBuf]);
}
}

View File

@@ -162,7 +162,8 @@ namespace consensus
// Prepare the consensus candidate user inputs that we have acumulated so far. (We receive them periodically via NUPs)
// The candidate inputs will be included in the new round proposal.
verify_and_populate_candidate_user_inputs(lcl_seq_no);
if (verify_and_populate_candidate_user_inputs(lcl_seq_no) == -1)
return -1;
const p2p::proposal new_round_prop = create_new_round_proposal(lcl, state);
broadcast_proposal(new_round_prop);
@@ -385,7 +386,7 @@ namespace consensus
* Verifies the user signatures and populate non-expired user inputs from collected
* non-unl proposals (if any) into consensus candidate data.
*/
void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no)
int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no)
{
// Move over NUPs collected from the network into a local list.
std::list<p2p::nonunl_proposal> collected_nups;
@@ -433,17 +434,21 @@ namespace consensus
}
else
{
std::string hash, input;
util::buffer_view input;
std::string hash;
uint64_t max_lcl_seqno;
reject_reason = usr::validate_user_input_submission(pubkey, umsg, lcl_seq_no, total_input_len, recent_user_input_hashes,
hash, input, max_lcl_seqno);
if (input.is_null())
return -1;
if (reject_reason == NULL)
{
// No reject reason means we should go ahead and subject the input to consensus.
ctx.candidate_user_inputs.try_emplace(
hash,
candidate_user_input(pubkey, std::move(input), max_lcl_seqno));
candidate_user_input(pubkey, input, max_lcl_seqno));
}
else if (reject_reason == msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED)
{
@@ -486,6 +491,8 @@ namespace consensus
}
}
}
return 0;
}
p2p::proposal create_new_round_proposal(std::string_view lcl, hpfs::h32 state)
@@ -732,7 +739,7 @@ namespace consensus
{
{
std::scoped_lock lock(ctx.contract_ctx_mutex);
ctx.contract_ctx.emplace();
ctx.contract_ctx.emplace(usr::input_store);
}
sc::contract_execution_args &args = ctx.contract_ctx->args;
@@ -742,7 +749,8 @@ namespace consensus
args.lcl = new_lcl;
// Populate user bufs.
feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop);
if (feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop) == -1)
return -1;
if (sc::execute_contract(ctx.contract_ctx.value()) == -1)
{
@@ -814,7 +822,7 @@ namespace consensus
* @param bufmap The contract bufmap which needs to be populated with inputs.
* @param cons_prop The proposal that achieved consensus.
*/
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop)
int feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop)
{
// Populate the buf map with all currently connected users regardless of whether they have inputs or not.
// This is in case the contract wanted to emit some data to a user without needing any input.
@@ -830,26 +838,22 @@ namespace consensus
const bool hashfound = (itr != ctx.candidate_user_inputs.end());
if (!hashfound)
{
LOG_ERROR << "input required but wasn't in our candidate inputs map, this will potentially cause desync.";
// TODO: consider fatal
LOG_ERROR << "Input required but wasn't in our candidate inputs map, this will potentially cause desync.";
return -1;
}
else
{
// Populate the input content into the bufmap.
candidate_user_input &cand_input = itr->second;
std::string inputtofeed;
inputtofeed.swap(cand_input.input);
sc::contract_iobufs &bufs = bufmap[cand_input.userpubkey];
bufs.inputs.push_back(std::move(inputtofeed));
sc::contract_iobufs &contract_user = bufmap[cand_input.userpubkey];
contract_user.inputs.push_back(cand_input.input);
// Remove the input from the candidate set because we no longer need it.
//LOG_DEBUG << "candidate input deleted.";
ctx.candidate_user_inputs.erase(itr);
}
}
return 0;
}
/**

View File

@@ -3,6 +3,7 @@
#include "pchheader.hpp"
#include "util.hpp"
#include "util/buffer_store.hpp"
#include "sc.hpp"
#include "p2p/p2p.hpp"
#include "usr/user_input.hpp"
@@ -18,10 +19,10 @@ namespace consensus
{
const std::string userpubkey;
const uint64_t maxledgerseqno = 0;
std::string input;
const util::buffer_view input;
candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno)
: userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno)
candidate_user_input(const std::string userpubkey, const util::buffer_view input, const uint64_t maxledgerseqno)
: userpubkey(std::move(userpubkey)), input(input), maxledgerseqno(maxledgerseqno)
{
}
};
@@ -105,7 +106,7 @@ namespace consensus
bool push_npl_message(p2p::npl_message &npl_message);
void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no);
int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no);
p2p::proposal create_new_round_proposal(std::string_view lcl, hpfs::h32 state);
@@ -127,7 +128,7 @@ namespace consensus
void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl);
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop);
int feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop);
void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap);

View File

@@ -31,6 +31,7 @@
#include <string>
#include <string_view>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/prctl.h>
#include <sys/socket.h>
#include <sys/stat.h>

View File

@@ -62,17 +62,13 @@ namespace sc
if (start_hpfs_session(ctx) == -1)
return -1;
// Setup user io sockets and feed all inputs to them.
create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs);
create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); // User output socket.
create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); // Control socket.
if (!ctx.args.readonly)
{
// Create sequential packet sockets for npl messages.
create_iosockets(ctx.nplfds, SOCK_SEQPACKET);
}
create_iosockets(ctx.nplfds, SOCK_SEQPACKET); // NPL socket.
// Create sequential packet sockets for hp messages.
create_iosockets(ctx.hpscfds, SOCK_SEQPACKET);
// Clone the user inputs fd to be passed on to the contract.
const int user_inputs_fd = dup(ctx.args.user_input_store.fd);
int ret = 0;
@@ -86,6 +82,7 @@ namespace sc
// Close all fds unused by HP process.
close_unused_fds(ctx, true);
close(user_inputs_fd);
// Start the contract monitor thread.
ctx.contract_monitor_thread = std::thread(contract_monitor_loop, std::ref(ctx));
@@ -104,8 +101,11 @@ namespace sc
// Close all fds unused by SC process.
close_unused_fds(ctx, false);
// Reset the seek position for the contract's copy of user inputs fd.
lseek(user_inputs_fd, 0, SEEK_SET);
// Write the contract input message from HotPocket to the stdin (0) of the contract process.
write_contract_args(ctx);
write_contract_args(ctx, user_inputs_fd);
const bool using_appbill = !ctx.args.readonly && !conf::cfg.appbill.empty();
int len = conf::cfg.runtime_binexec_args.size() + 1;
@@ -244,11 +244,12 @@ namespace sc
* "lcl": "<this node's last closed ledger seq no. and hash in hex>", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb)
* "hpfd": fd,
* "nplfd":fd,
* "usrfd":{ "<pkhex>":fd, ... },
* "userinfd":fd, // User inputs fd.
* "users":{ "<pkhex>":[outfd, [msg1_off, msg1_len], ...], ... },
* "unl":[ "<pkhex>", ... ]
* }
*/
int write_contract_args(const execution_context &ctx)
int write_contract_args(const execution_context &ctx, const int user_inputs_fd)
{
// Populate the json string with contract args.
// We don't use a JSON parser here because it's lightweight to contrstuct the
@@ -267,9 +268,11 @@ namespace sc
}
os << ",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE];
os << ",\"usrfd\":{";
fdmap_json_to_stream(ctx.userfds, os);
os << ",\"userinfd\":" << user_inputs_fd
<< ",\"users\":{";
user_json_to_stream(ctx.userfds, ctx.args.userbufs, os);
os << "},\"unl\":[";
@@ -326,49 +329,41 @@ namespace sc
{
util::mask_signal();
// Write any user inputs to the contract.
if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1)
while (!ctx.is_shutting_down)
{
LOG_ERROR << "Failed to write user inputs to contract.";
}
else
{
while (!ctx.is_shutting_down)
// Atempt to read messages from contract (regardless of contract terminated or not).
const int hpsc_read_res = read_contract_hp_outputs(ctx);
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs);
if (ctx.termination_signaled || ctx.contract_pid == 0)
{
// Atempt to read messages from contract (regardless of contract terminated or not).
const int hpsc_read_res = read_contract_hp_outputs(ctx);
const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx);
const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs);
if (ctx.termination_signaled || ctx.contract_pid == 0)
{
// If no bytes were read after contract finished execution, exit the loop.
// Otherwise keep running the loop becaue there might be further messages to read.
if ((hpsc_read_res + npl_read_res + user_read_res) == 0)
break;
}
else
{
// We assume contract is still running. Attempt to write any queued messages to the contract.
const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx);
if (npl_write_res == -1)
break;
const int hpsc_write_res = write_contract_hp_inputs(ctx);
if (hpsc_write_res == -1)
break;
// If no operation was performed during this iteration, wait for a small delay until the next iteration.
// This means there were no queued messages from either side.
if ((hpsc_read_res + npl_read_res + user_read_res + hpsc_write_res + hpsc_write_res) == 0)
util::sleep(20);
}
// Check if contract process has exited on its own during the loop.
if (ctx.contract_pid > 0)
check_contract_exited(ctx, false);
// If no bytes were read after contract finished execution, exit the loop.
// Otherwise keep running the loop becaue there might be further messages to read.
if ((hpsc_read_res + npl_read_res + user_read_res) == 0)
break;
}
else
{
// We assume contract is still running. Attempt to write any queued messages to the contract.
const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx);
if (npl_write_res == -1)
break;
const int hpsc_write_res = write_contract_hp_inputs(ctx);
if (hpsc_write_res == -1)
break;
// If no operation was performed during this iteration, wait for a small delay until the next iteration.
// This means there were no queued messages from either side.
if ((hpsc_read_res + npl_read_res + user_read_res + hpsc_write_res + hpsc_write_res) == 0)
util::sleep(20);
}
// Check if contract process has exited on its own during the loop.
if (ctx.contract_pid > 0)
check_contract_exited(ctx, false);
}
// Close all fds.
@@ -378,6 +373,11 @@ namespace sc
cleanup_vectorfds(fds);
ctx.userfds.clear();
// Purge any inputs we passed to the contract.
for (const auto &[pubkey, bufs] : ctx.args.userbufs)
for (const util::buffer_view &input : bufs.inputs)
ctx.args.user_input_store.purge(input);
// If we reach this point but the contract is still running, then we need to kill the contract by force.
// This can be the case if HP is shutting down, or there was an error in initial feeding of inputs.
if (ctx.contract_pid > 0)
@@ -527,29 +527,32 @@ namespace sc
}
}
/**
* Common helper function to write json output of fdmap to given ostream.
* @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds)
* @param os An output stream.
*/
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os)
void user_json_to_stream(const contract_fdmap_t &user_fdmap, const contract_bufmap_t &user_bufmap, std::ostringstream &os)
{
for (auto itr = fdmap.begin(); itr != fdmap.end(); itr++)
for (auto itr = user_fdmap.begin(); itr != user_fdmap.end(); itr++)
{
if (itr != fdmap.begin())
if (itr != user_fdmap.begin())
os << ","; // Trailing comma separator for previous element.
// Get the hex pubkey.
std::string_view pubkey = itr->first; // Pubkey in binary format.
const std::string &pubkey = itr->first; // Pubkey in binary format.
std::string pubkeyhex;
util::bin2hex(
pubkeyhex,
reinterpret_cast<const unsigned char *>(pubkey.data()) + 1,
reinterpret_cast<const unsigned char *>(pubkey.data()) + 1, // Skip key type prefix.
pubkey.length() - 1);
// Write hex pubkey and fds.
os << "\"" << pubkeyhex << "\":"
const std::vector<util::buffer_view> &user_inputs = user_bufmap.find(pubkey)->second.inputs;
// Write hex pubkey as key and output fd as first element of array.
os << "\"" << pubkeyhex << "\":["
<< itr->second[SOCKETFDTYPE::SCREADWRITE];
// Write input offsets into the same array.
for (auto inp_itr = user_inputs.begin(); inp_itr != user_inputs.end(); inp_itr++)
os << ",[" << inp_itr->offset << "," << inp_itr->size << "]";
os << "]";
}
}
@@ -573,18 +576,6 @@ namespace sc
return 0;
}
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
{
// Loop through input buffers for each pubkey.
for (auto &[pubkey, buflist] : bufmap)
{
if (write_iosocket_stream(fdmap[pubkey], buflist.inputs) == -1)
return -1;
}
return 0;
}
/**
* Common function to read all outputs produced by the contract process and store them in
* output buffers for later processing.
@@ -689,59 +680,6 @@ namespace sc
return 0;
}
/**
* Common function to write the given input buffer into the write fd from the HP side socket.
* @param fds Vector of fd list.
* @param inputs Buffer to write into the HP write fd.
*/
int write_iosocket_stream(std::vector<int> &fds, std::list<std::string> &inputs)
{
// Write the inputs (if any) into the contract.
const int writefd = fds[SOCKETFDTYPE::HPREADWRITE];
if (writefd == -1)
return 0;
bool write_error = false;
// Prepare the input memory segments to write with wrtiev.
// Extra one element for the header.
iovec memsegs[inputs.size() * 2 + 1];
uint8_t header[inputs.size() * 4 + 4];
header[0] = inputs.size() >> 24;
header[1] = inputs.size() >> 16;
header[2] = inputs.size() >> 8;
header[3] = inputs.size();
// Message count header.
memsegs[0].iov_base = header;
memsegs[0].iov_len = 4;
size_t i = 1;
for (std::string &input : inputs)
{
// 4 bytes for message len header.
const uint32_t len = input.length();
header[i * 4] = len >> 24;
header[i * 4 + 1] = len >> 16;
header[i * 4 + 2] = len >> 8;
header[i * 4 + 3] = len;
memsegs[i * 2 - 1].iov_base = &header[i * 4];
memsegs[i * 2 - 1].iov_len = 4;
memsegs[i * 2].iov_base = input.data();
memsegs[i * 2].iov_len = input.length();
i++;
}
if (writev(writefd, memsegs, (inputs.size() * 2 + 1)) == -1)
write_error = true;
inputs.clear();
if (write_error)
LOG_ERROR << errno << ": Error writing to stream socket.";
return write_error ? -1 : 0;
}
/**
* Common function to write the given input into the write fd from the HP side socket.
* @param fds Vector of fd list.

View File

@@ -5,6 +5,7 @@
#include "usr/usr.hpp"
#include "hpfs/h32.hpp"
#include "util.hpp"
#include "util/buffer_store.hpp"
#include "p2p/p2p.hpp"
/**
@@ -39,7 +40,7 @@ namespace sc
struct contract_iobufs
{
// List of inputs to be fed into the contract.
std::list<std::string> inputs;
std::vector<util::buffer_view> inputs;
// List of outputs from the contract.
std::list<contract_output> outputs;
@@ -68,6 +69,8 @@ namespace sc
// The value is a pair holding consensus-verified inputs and contract-generated outputs.
contract_bufmap_t userbufs;
util::buffer_store &user_input_store;
// NPL messages to be passed into contract.
moodycamel::ReaderWriterQueue<p2p::npl_message> npl_messages;
@@ -82,6 +85,10 @@ namespace sc
// State hash after execution will be copied to this (not applicable to read only mode).
hpfs::h32 post_execution_state_hash = hpfs::h32_empty;
contract_execution_args(util::buffer_store &user_input_store) : user_input_store(user_input_store)
{
}
};
/**
@@ -115,6 +122,10 @@ namespace sc
// Indicates that the deinit procedure has begun.
bool is_shutting_down = false;
execution_context(util::buffer_store &user_input_store) : args(user_input_store)
{
}
};
int init();
@@ -131,7 +142,7 @@ namespace sc
int stop_hpfs_session(execution_context &ctx);
int write_contract_args(const execution_context &ctx);
int write_contract_args(const execution_context &ctx, const int user_inputs_fd);
void contract_monitor_loop(execution_context &ctx);
@@ -147,20 +158,16 @@ namespace sc
// Common helper functions
void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os);
void user_json_to_stream(const contract_fdmap_t &user_fdmap, const contract_bufmap_t &user_bufmap, std::ostringstream &os);
int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap);
int create_iosockets(std::vector<int> &fds, const int socket_type);
int write_iosocket_seq_packet(std::vector<int> &fds, std::string_view input);
int write_iosocket_stream(std::vector<int> &fds, std::list<std::string> &inputs);
int read_iosocket(const bool is_stream_socket, std::vector<int> &fds, std::string &output);
void close_unused_fds(execution_context &ctx, const bool is_hp);

View File

@@ -1,6 +1,7 @@
#include "../pchheader.hpp"
#include "../hplog.hpp"
#include "../util.hpp"
#include "../util/buffer_store.hpp"
#include "../conf.hpp"
#include "../msg/usrmsg_parser.hpp"
#include "usr.hpp"
@@ -17,6 +18,8 @@ namespace read_req
bool is_shutting_down = false;
bool init_success = false;
util::buffer_store read_req_store;
std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue.
std::vector<std::thread> read_req_threads;
moodycamel::ConcurrentQueue<user_read_req> read_req_queue(MAX_QUEUE_SIZE);
@@ -27,6 +30,9 @@ namespace read_req
int init()
{
if (read_req_store.init() == -1)
return -1;
thread_pool_executor = std::thread(manage_thread_pool);
init_success = true;
return 0;
@@ -51,6 +57,8 @@ namespace read_req
// Joining all read request processing threads.
for (std::thread &thread : read_req_threads)
thread.join();
read_req_store.deinit();
}
}
@@ -123,7 +131,7 @@ namespace read_req
{
{
// Contract context is added to the list for force kill if a SIGINT is received.
sc::execution_context contract_ctx;
sc::execution_context contract_ctx(read_req_store);
std::scoped_lock<std::mutex> execution_contract_lock(execution_contexts_mutex);
context_itr = execution_contexts.emplace(execution_contexts.begin(), std::move(contract_ctx));
}
@@ -191,13 +199,14 @@ namespace read_req
*/
int populate_read_req_queue(const std::string &pubkey, const std::string &content)
{
sc::execution_context contract_ctx;
user_read_req read_request;
read_request.content = std::move(content);
read_request.content = read_req_store.write_buf(content.data(), content.size());
read_request.pubkey = pubkey;
return read_req_queue.try_enqueue(read_request);
if (read_request.content.is_null())
return -1;
else
return read_req_queue.try_enqueue(read_request);
}
/**
@@ -213,7 +222,7 @@ namespace read_req
contract_ctx.args.state_dir.append("/rr_").append(std::to_string(thread_id));
contract_ctx.args.readonly = true;
sc::contract_iobufs user_bufs;
user_bufs.inputs.push_back(std::move(read_request.content));
user_bufs.inputs.push_back(read_request.content);
contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufs));
}

View File

@@ -1,14 +1,15 @@
#ifndef _HP_CONS_READ_REQ_
#define _HP_CONS_READ_REQ_
#ifndef _HP_USR_READ_REQ_
#define _HP_USR_READ_REQ_
#include "../sc.hpp"
#include "../util/buffer_store.hpp"
namespace read_req
{
struct user_read_req
{
std::string pubkey;
std::string content;
util::buffer_view content;
};
int init();

View File

@@ -7,6 +7,7 @@
#include "../crypto.hpp"
#include "../hplog.hpp"
#include "../ledger.hpp"
#include "../util/buffer_store.hpp"
#include "usr.hpp"
#include "user_session_handler.hpp"
#include "user_comm_session.hpp"
@@ -20,6 +21,7 @@ namespace usr
// Holds global connected-users and related objects.
connected_context ctx;
util::buffer_store input_store;
uint64_t metric_thresholds[5];
bool init_success = false;
@@ -30,11 +32,14 @@ namespace usr
int init()
{
metric_thresholds[0] = conf::cfg.pubmaxcpm;
metric_thresholds[1] = 0; // This metric doesn't apply to user context.
metric_thresholds[2] = 0; // This metric doesn't apply to user context.
metric_thresholds[1] = 0; // This metric doesn't apply to user context.
metric_thresholds[2] = 0; // This metric doesn't apply to user context.
metric_thresholds[3] = conf::cfg.pubmaxbadmpm;
metric_thresholds[4] = conf::cfg.pubidletimeout;
if (input_store.init() == -1)
return -1;
// Start listening for incoming user connections.
if (start_listening() == -1)
return -1;
@@ -49,7 +54,10 @@ namespace usr
void deinit()
{
if (init_success)
{
ctx.server->stop();
input_store.deinit();
}
}
/**
@@ -213,8 +221,8 @@ namespace usr
const util::PROTOCOL protocol = (protocol_code == "json" ? util::PROTOCOL::JSON : util::PROTOCOL::BSON);
session.mark_as_verified(); // Mark connection as a verified connection.
session.issued_challenge.clear(); // Remove the stored challenge
session.mark_as_verified(); // Mark connection as a verified connection.
session.issued_challenge.clear(); // Remove the stored challenge
session.uniqueid = pubkey;
// Add the user to the global authed user list
@@ -250,7 +258,7 @@ namespace usr
const char *validate_user_input_submission(const std::string_view user_pubkey, const usr::user_input &umsg,
const uint64_t lcl_seq_no, size_t &total_input_len,
util::rollover_hashset &recent_user_input_hashes,
std::string &hash, std::string &input, uint64_t &max_lcl_seqno)
std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno)
{
const std::string sig_hash = crypto::get_hash(umsg.sig);
@@ -270,7 +278,9 @@ namespace usr
std::string nonce;
msg::usrmsg::usrmsg_parser parser(umsg.protocol);
parser.extract_input_container(input, nonce, max_lcl_seqno, umsg.input_container);
std::string input_data;
parser.extract_input_container(input_data, nonce, max_lcl_seqno, umsg.input_container);
// Ignore the input if our ledger has passed the input TTL.
if (max_lcl_seqno <= lcl_seq_no)
@@ -280,7 +290,7 @@ namespace usr
}
// Keep checking the subtotal of inputs extracted so far with the appbill account balance.
total_input_len += input.length();
total_input_len += input_data.length();
if (!verify_appbill_check(user_pubkey, total_input_len))
{
LOG_DEBUG << "User message app bill balance exceeded.";
@@ -292,6 +302,10 @@ namespace usr
// Append the hash of the message signature to get the final hash.
hash.append(sig_hash);
// Copy the input data into the input store.
std::string_view s();
input = input_store.write_buf(input_data.data(), input_data.size());
return NULL; // Success. No reject reason.
}

View File

@@ -3,6 +3,7 @@
#include "../pchheader.hpp"
#include "../util.hpp"
#include "../util/buffer_store.hpp"
#include "../msg/usrmsg_parser.hpp"
#include "user_comm_session.hpp"
#include "user_comm_server.hpp"
@@ -59,6 +60,7 @@ namespace usr
std::optional<usr::user_comm_server> server;
};
extern connected_context ctx;
extern util::buffer_store input_store;
int init();
@@ -80,7 +82,7 @@ namespace usr
const char *validate_user_input_submission(const std::string_view user_pubkey, const usr::user_input &umsg,
const uint64_t lcl_seq_no, size_t &total_input_len,
util::rollover_hashset &recent_user_input_hashes,
std::string &hash, std::string &input, uint64_t &max_lcl_seqno);
std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno);
bool verify_appbill_check(std::string_view pubkey, const size_t input_len);

62
src/util/buffer_store.cpp Normal file
View File

@@ -0,0 +1,62 @@
#include "../pchheader.hpp"
#include "buffer_store.hpp"
// memfd block size to have clean hole punch so that allocated blocks are released properly.
#define BLOCK_SIZE 4096
#define BLOCK_ALIGN(x) (((x) + ((typeof(x))(BLOCK_SIZE)-1)) & ~((typeof(x))(BLOCK_SIZE)-1))
namespace util
{
int buffer_store::init()
{
int fd = memfd_create("buffer_store", MFD_CLOEXEC);
if (fd == -1)
{
LOG_ERROR << errno << ": Error creating buffer store memfd.";
return -1;
}
this->fd = fd;
return 0;
}
const buffer_view buffer_store::write_buf(const void *buf, const uint32_t size)
{
buffer_view view = {0, 0};
int res = pwrite(fd, buf, size, next_write_pos);
if (res < size)
{
LOG_ERROR << errno << ": Error writing to buffer store fd " << fd;
return view;
}
else
{
view.offset = next_write_pos;
view.size = size;
// Get nearest block offset that occurs after the just-written buffer.
next_write_pos += size;
next_write_pos = BLOCK_ALIGN(next_write_pos);
return view;
}
}
int buffer_store::purge(const buffer_view &buf)
{
const size_t purge_size = BLOCK_ALIGN(buf.size);
if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, buf.offset, purge_size) == -1)
{
LOG_ERROR << errno << ": Error when purging buffer store fd " << fd;
return -1;
}
return 0;
}
void buffer_store::deinit()
{
if (fd > 0)
close(fd);
}
} // namespace util

35
src/util/buffer_store.hpp Normal file
View File

@@ -0,0 +1,35 @@
#ifndef _HP_UTIL_BUFFER_STORE_
#define _HP_UTIL_BUFFER_STORE_
#include "../pchheader.hpp"
namespace util
{
struct buffer_view
{
off_t offset;
uint32_t size;
bool is_null()
{
return !offset && !size;
}
};
class buffer_store
{
private:
off_t next_write_pos = 0;
public:
int fd;
int init();
const buffer_view write_buf(const void *buf, const uint32_t size);
int purge(const buffer_view &buf);
void deinit();
};
} // namespace util
#endif

View File

@@ -3,7 +3,7 @@
FROM node:12.18.3-buster-slim
RUN apt-get update
RUN apt-get install -y libgomp1 libssl-dev gdb
RUN apt-get install -y libgomp1 libssl-dev gdb valgrind
# Install shared libraries.
# Copy shared libraries and register it.