diff --git a/CMakeLists.txt b/CMakeLists.txt index 535948ad..32d28b70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable(appbill add_executable(hpcore src/util.cpp + src/util/buffer_store.cpp src/crypto.cpp src/conf.cpp src/hplog.cpp diff --git a/examples/c_contract/echo_contract.c b/examples/c_contract/echo_contract.c index 5ed41203..d38fe22b 100644 --- a/examples/c_contract/echo_contract.c +++ b/examples/c_contract/echo_contract.c @@ -36,6 +36,7 @@ void echo_contract(const struct hp_contract_context *ctx) if (fd > 0) { char tsbuf[20]; + memset(tsbuf, 0, 20); sprintf(tsbuf, "%lu\n", ctx->timestamp); struct iovec vec[2] = {{"ts:", 4}, {(void *)tsbuf, 20}}; writev(fd, vec, 2); diff --git a/examples/c_contract/hotpocket_contract.h b/examples/c_contract/hotpocket_contract.h index 3a887905..500c5691 100644 --- a/examples/c_contract/hotpocket_contract.h +++ b/examples/c_contract/hotpocket_contract.h @@ -7,67 +7,77 @@ #include #include #include +#include #include #include "json.h" -#define _HP_KEY_SIZE 64 -#define _HP_HASH_SIZE 64 -#define _HP_MSG_HEADER_LEN 4 -#define _HP_USER_BUF_SIZE 4096 // Buffer used to read user message data. -#define _HP_SEQPKT_BUF_SIZE 131072 // 128KB to support SEQ_PACKET sockets. -#define _HP_POLL_TIMEOUT 20 +#define __HP_KEY_SIZE 64 +#define __HP_HASH_SIZE 64 +#define __HP_MSG_HEADER_LEN 4 +#define __HP_SEQPKT_BUF_SIZE 131072 // 128KB to support SEQ_PACKET sockets. +#define __HP_POLL_TIMEOUT 20 -#define _HP_MIN(a, b) ((a < b) ? a : b) +#define __HP_MMAP_BLOCK_SIZE 4096 +#define __HP_MMAP_BLOCK_ALIGN(x) (((x) + ((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1)) & ~((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1)) -#define _HP_ASSIGN_STRING(dest, elem) \ - if (elem->value->type == json_type_string) \ - { \ - struct json_string_s *value = (struct json_string_s *)elem->value->payload; \ - memcpy(dest, value->string, sizeof(dest)); \ +#define __HP_ASSIGN_STRING(dest, elem) \ + if (elem->value->type == json_type_string) \ + { \ + const struct json_string_s *value = (struct json_string_s *)elem->value->payload; \ + memcpy(dest, value->string, sizeof(dest)); \ } -#define _HP_ASSIGN_UINT64(dest, elem) \ - if (elem->value->type == json_type_number) \ - { \ - struct json_number_s *value = (struct json_number_s *)elem->value->payload; \ - dest = strtoull(value->number, NULL, 0); \ +#define __HP_ASSIGN_UINT64(dest, elem) \ + if (elem->value->type == json_type_number) \ + { \ + const struct json_number_s *value = (struct json_number_s *)elem->value->payload; \ + dest = strtoull(value->number, NULL, 0); \ } -#define _HP_ASSIGN_INT(dest, elem) \ - if (elem->value->type == json_type_number) \ - { \ - struct json_number_s *value = (struct json_number_s *)elem->value->payload; \ - dest = atoi(value->number); \ +#define __HP_ASSIGN_INT(dest, elem) \ + if (elem->value->type == json_type_number) \ + { \ + const struct json_number_s *value = (struct json_number_s *)elem->value->payload; \ + dest = atoi(value->number); \ } -#define _HP_ASSIGN_BOOL(dest, elem) \ +#define __HP_ASSIGN_BOOL(dest, elem) \ if (elem->value->type == json_type_true) \ dest = true; \ else if (elem->value->type == json_type_false) \ dest = false; -#define _HP_FROM_BE(buf, pos) \ +#define __HP_FROM_BE(buf, pos) \ ((uint8_t)buf[pos + 0] << 24 | (uint8_t)buf[pos + 1] << 16 | (uint8_t)buf[pos + 2] << 8 | (uint8_t)buf[pos + 3]) -#define _HP_TO_BE(num, buf, pos) \ - buf[pos] = num >> 24; \ - buf[1 + pos] = num >> 16; \ - buf[2 + pos] = num >> 8; \ +#define __HP_TO_BE(num, buf, pos) \ + buf[pos] = num >> 24; \ + buf[1 + pos] = num >> 16; \ + buf[2 + pos] = num >> 8; \ buf[3 + pos] = num; +struct hp_user_input +{ + off_t offset; + uint32_t size; +}; + struct hp_user { - char pubkey[_HP_KEY_SIZE + 1]; - int fd; + char pubkey[__HP_KEY_SIZE + 1]; + int outfd; + struct hp_user_input *inputs; + uint32_t inputs_count; }; struct hp_peer { - char pubkey[_HP_KEY_SIZE + 1]; + char pubkey[__HP_KEY_SIZE + 1]; }; struct hp_users_collection { + int infd; struct hp_user *list; size_t count; }; @@ -84,9 +94,9 @@ struct hp_contract_context bool readonly; uint64_t timestamp; - char pubkey[_HP_KEY_SIZE + 1]; + char pubkey[__HP_KEY_SIZE + 1]; - char lcl_hash[_HP_HASH_SIZE + 1]; + char lcl_hash[__HP_HASH_SIZE + 1]; uint64_t lcl_seq_no; struct hp_users_collection users; @@ -99,29 +109,11 @@ struct __hp_global_context bool should_stop; }; -struct __hp_user_state -{ - bool completed; // Whether we have finished processing all incoming messages for this user. - - uint32_t total_messages; // Total messages for the user. - uint32_t processed_messages; // No. of processed messages so far for the user. - bool total_messages_known; // Whether the total messages count has been set properly. - - uint8_t header_buf[_HP_MSG_HEADER_LEN]; // Header length buf (total msg count or msg size header). - uint8_t header_filled_len; // Current no. of header bytes collected so far. - - uint32_t msg_actual_len; // Actual(final) size of current message. - uint32_t msg_filled_len; // Current no. of message bytes collected so far. - uint8_t *msg_buf; // Buf holding the collected bytes for the current message. - - struct hp_user *user; // The user reference tracked by this state struct. -}; - typedef void (*hp_contract_func)(const struct hp_contract_context *ctx); typedef void (*hp_user_message_func)(const struct hp_contract_context *ctx, - const struct hp_user *user, const void *buf, const uint32_t len); + const struct hp_user *user, const void *buf, const uint32_t len); typedef void (*hp_peer_message_func)(const struct hp_contract_context *ctx, - const char *peerPubKey, const void *buf, const uint32_t len); + const char *peerPubKey, const void *buf, const uint32_t len); struct __hp_peer_message_thread_arg { @@ -140,11 +132,7 @@ int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, con int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count); void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object); - -void __hp_parse_user_chunk(const struct hp_contract_context *ctx, struct __hp_user_state *us, - const uint8_t *buf, const uint32_t len, hp_user_message_func on_user_message); -bool __hp_parse_length_header(struct __hp_user_state *us, const uint8_t *chunk, const uint32_t chunk_len, - uint32_t *chunk_pos, uint32_t *target); +void __hp_free_contract_context(struct hp_contract_context *ctx); static void *__hp_peer_message_thread_func(void *arg); @@ -180,7 +168,7 @@ int hp_init(hp_contract_func contract_func) // Start control channel listener. if (pthread_create(&__hp_control_thread, NULL, &__hp_control_message_thread_func, NULL) == -1) { - perror("Error creating control thread. "); + perror("Error creating control thread"); goto error; } @@ -199,15 +187,17 @@ int hp_init(hp_contract_func contract_func) __hp_control_thread = 0; // Cleanup. + close(ctx.users.infd); for (int i = 0; i < ctx.users.count; i++) - close(ctx.users.list[i].fd); + close(ctx.users.list[i].outfd); close(ctx.peers.fd); + __hp_free_contract_context(&ctx); + // Send termination control message. write(gctx.control_fd, "Terminated", 10); close(gctx.control_fd); - return 0; } } @@ -223,97 +213,39 @@ error: int hp_user_message_loop(const struct hp_contract_context *ctx, hp_user_message_func on_user_message) { int result = 0; + const int fd = ctx->users.infd; - // We poll user fds, control fd and npl fd (npl fd not available in read only mode) - const size_t total_users = ctx->users.count; - size_t remaining_users = total_users; - - // User states list to keep track of message collection status for each user. - struct __hp_user_state user_states[total_users]; - memset(user_states, 0, sizeof(struct __hp_user_state) * total_users); - - // Temp buffer for all read operations. - uint8_t *buf = malloc(_HP_USER_BUF_SIZE); - - // Create fd set to be polled. - struct pollfd pollfds[total_users]; - for (int i = 0; i < total_users; i++) + struct stat st; + if (fstat(fd, &st) == -1) { - pollfds[i].fd = ctx->users.list[i].fd; - pollfds[i].events = POLLIN; - pollfds[i].revents = 0; - - user_states[i].user = &ctx->users.list[i]; + perror("Error in user input fd stat"); + return -1; } - while (remaining_users > 0) + if (st.st_size == 0) + return 0; + + const size_t mmap_size = __HP_MMAP_BLOCK_ALIGN(st.st_size); + void *fdptr = mmap(NULL, mmap_size, PROT_READ, MAP_PRIVATE, fd, 0); + if (fdptr == MAP_FAILED) { - // Reset poll fd set because we are reusing it. - for (int i = 0; i < total_users; i++) - pollfds[0].revents = 0; + perror("Error in user input fd mmap"); + return -1; + } - if (poll(pollfds, total_users, _HP_POLL_TIMEOUT) == -1) + close(fd); // We can close the fd after mmap. + + for (int i = 0; i < ctx->users.count; i++) + { + const struct hp_user *user = &ctx->users.list[i]; + for (int j = 0; j < user->inputs_count; j++) { - perror("User poll error. "); - goto error; - } - - for (int i = 0; i < total_users; i++) - { - short result = pollfds[i].revents; - if (result == 0) - continue; - - if (result & (POLLHUP | POLLERR | POLLNVAL)) - { - fprintf(stderr, "User poll returned error.\n"); - goto error; - } - else if (result & POLLIN) - { - const size_t read_res = read(pollfds[i].fd, buf, _HP_USER_BUF_SIZE); - if (read_res == -1) - { - perror("Error reading user socket. "); - goto error; - } - - if (!user_states[i].completed) - { - // User sockets are stream sockets. So we have to do the message stitching ourselves based on - // total msg count and msg size headers sent over the stream. - __hp_parse_user_chunk(ctx, &user_states[i], buf, read_res, on_user_message); - - if (user_states[i].completed) - { - remaining_users--; - - // All users completed. - if (remaining_users == 0) - break; - } - } - } + const struct hp_user_input *input = &user->inputs[j]; + on_user_message(ctx, user, (fdptr + input->offset), input->size); } } - // If we reach here that means result is successful. - result = 0; - goto end; - -error: - // On error set result to -1. - result = -1; - -end: - for (int i = 0; i < total_users; i++) - { - if (user_states[i].msg_buf) - free(user_states[i].msg_buf); - } - - free(buf); - return result; + munmap(fdptr, mmap_size); } int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_message_func on_peer_message) @@ -330,7 +262,7 @@ int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_mess arg->on_peer_message = on_peer_message; if (pthread_create(&__hp_peer_thread, NULL, &__hp_peer_message_thread_func, arg) == -1) { - perror("Error creating peer thread. "); + perror("Error creating peer thread"); return -1; } @@ -356,20 +288,20 @@ int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const i msg_len += bufs[i].iov_len; } - uint8_t header_buf[_HP_MSG_HEADER_LEN]; - _HP_TO_BE(msg_len, header_buf, 0); + uint8_t header_buf[__HP_MSG_HEADER_LEN]; + __HP_TO_BE(msg_len, header_buf, 0); all_bufs[0].iov_base = header_buf; - all_bufs[0].iov_len = _HP_MSG_HEADER_LEN; + all_bufs[0].iov_len = __HP_MSG_HEADER_LEN; - return writev(user->fd, all_bufs, total_buf_count); + return writev(user->outfd, all_bufs, total_buf_count); } int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len) { - if (len > _HP_SEQPKT_BUF_SIZE) + if (len > __HP_SEQPKT_BUF_SIZE) { - fprintf(stderr, "Peer message exceeds max length %d.", _HP_SEQPKT_BUF_SIZE); + fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE); return -1; } @@ -382,138 +314,39 @@ int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bu for (int i = 0; i < buf_count; i++) len += bufs[i].iov_len; - if (len > _HP_SEQPKT_BUF_SIZE) + if (len > __HP_SEQPKT_BUF_SIZE) { - fprintf(stderr, "Peer message exceeds max length %d.", _HP_SEQPKT_BUF_SIZE); + fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE); return -1; } return writev(ctx->peers.fd, bufs, buf_count); } -void __hp_parse_user_chunk(const struct hp_contract_context *ctx, struct __hp_user_state *us, - const uint8_t *chunk, const uint32_t chunk_len, hp_user_message_func on_user_message) -{ - uint32_t pos = 0; - - if (!us->total_messages_known) - us->total_messages_known = __hp_parse_length_header(us, chunk, chunk_len, &pos, &us->total_messages); - - if (!us->total_messages_known) - return; - - if (us->total_messages == 0) - { - us->completed = true; - return; - } - - while (pos < chunk_len) - { - if (us->msg_actual_len == 0) - { - if (__hp_parse_length_header(us, chunk, chunk_len, &pos, &us->msg_actual_len) && us->msg_actual_len == 0) - { - // If we parse msg length=0, then abandon further processing for this user. - fprintf(stderr, "Message size 0 received for user.\n"); - us->completed = true; - return; - } - } - - // Going inside following 'if' means we know the current message length, and there are more data bytes to be read. - if (us->msg_actual_len > 0 && pos < chunk_len) - { - if (!us->msg_buf) - us->msg_buf = malloc(us->msg_actual_len); - - const uint32_t remaining_len = chunk_len - pos; - const uint32_t msg_bytes_to_copy = _HP_MIN(remaining_len, (us->msg_actual_len - us->msg_filled_len)); - - memcpy(us->msg_buf + us->msg_filled_len, (chunk + pos), msg_bytes_to_copy); - us->msg_filled_len += msg_bytes_to_copy; - pos += msg_bytes_to_copy; - - // See whether we just completed forming a full message. - if (us->msg_filled_len == us->msg_actual_len) - { - // Execute on_message func with msg_buf. - on_user_message(ctx, us->user, us->msg_buf, us->msg_actual_len); - - // Reset message construction. - free(us->msg_buf); - us->msg_buf = NULL; - us->msg_actual_len = 0; - us->msg_filled_len = 0; - us->processed_messages++; - - if (us->processed_messages == us->total_messages) - { - us->completed = true; - return; - } - } - } - } -} - -bool __hp_parse_length_header(struct __hp_user_state *us, const uint8_t *chunk, const uint32_t chunk_len, - uint32_t *chunk_pos, uint32_t *target) -{ - uint32_t pos = *chunk_pos; - const uint32_t remaining_len = chunk_len - pos; - - // See if we can detect complete length header without the help of the header buffer. - if (remaining_len >= _HP_MSG_HEADER_LEN && us->header_filled_len == 0) - { - *target = _HP_FROM_BE(chunk, pos); - *chunk_pos = pos + _HP_MSG_HEADER_LEN; - return true; - } - else - { - const uint32_t header_bytes_to_copy = _HP_MIN(remaining_len, (_HP_MSG_HEADER_LEN - us->header_filled_len)); - - memcpy(us->header_buf + us->header_filled_len, (chunk + pos), header_bytes_to_copy); - us->header_filled_len += header_bytes_to_copy; - *chunk_pos = pos + header_bytes_to_copy; - - // See whether we can now read length value after new bytes where added to the header. - if (us->header_filled_len == _HP_MSG_HEADER_LEN) - { - *target = _HP_FROM_BE(us->header_buf, 0); - us->header_filled_len = 0; - return true; - } - } - - return false; // Couldn't detect the length header with available bytes. -} - void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object) { - struct json_object_element_s *elem = object->start; + const struct json_object_element_s *elem = object->start; do { - struct json_string_s *k = elem->name; + const struct json_string_s *k = elem->name; if (strcmp(k->string, "pubkey") == 0) { - _HP_ASSIGN_STRING(ctx->pubkey, elem); + __HP_ASSIGN_STRING(ctx->pubkey, elem); } else if (strcmp(k->string, "ts") == 0) { - _HP_ASSIGN_UINT64(ctx->timestamp, elem); + __HP_ASSIGN_UINT64(ctx->timestamp, elem); } else if (strcmp(k->string, "readonly") == 0) { - _HP_ASSIGN_BOOL(ctx->readonly, elem); + __HP_ASSIGN_BOOL(ctx->readonly, elem); } else if (strcmp(k->string, "lcl") == 0) { if (elem->value->type == json_type_string) { - struct json_string_s *value = (struct json_string_s *)elem->value->payload; + const struct json_string_s *value = (struct json_string_s *)elem->value->payload; const char *delim = "-"; char *tok_ptr; char *tok_str = strdup(value->string); @@ -521,14 +354,19 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c const char *hash_str = strtok_r(NULL, delim, &tok_ptr); ctx->lcl_seq_no = strtoull(seq_str, NULL, 0); - memcpy(ctx->lcl_hash, hash_str, _HP_HASH_SIZE); + memcpy(ctx->lcl_hash, hash_str, __HP_HASH_SIZE); + free(tok_str); } } - else if (strcmp(k->string, "usrfd") == 0) + else if (strcmp(k->string, "userinfd") == 0) + { + __HP_ASSIGN_INT(ctx->users.infd, elem); + } + else if (strcmp(k->string, "users") == 0) { if (elem->value->type == json_type_object) { - struct json_object_s *user_object = (struct json_object_s *)elem->value->payload; + const struct json_object_s *user_object = (struct json_object_s *)elem->value->payload; const size_t user_count = user_object->length; ctx->users.count = user_count; @@ -539,9 +377,35 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c struct json_object_element_s *user_elem = user_object->start; for (int i = 0; i < user_count; i++) { - memcpy(ctx->users.list[i].pubkey, user_elem->name->string, _HP_KEY_SIZE); - _HP_ASSIGN_INT(ctx->users.list[i].fd, user_elem); + struct hp_user *user = &ctx->users.list[i]; + memcpy(user->pubkey, user_elem->name->string, __HP_KEY_SIZE); + if (user_elem->value->type == json_type_array) + { + const struct json_array_s *arr = (struct json_array_s *)user_elem->value->payload; + struct json_array_element_s *arr_elem = arr->start; + + // First element is the output fd. + __HP_ASSIGN_INT(user->outfd, arr_elem); + arr_elem = arr_elem->next; + + // Subsequent elements are tupels of [offset, size] of input messages for this user. + user->inputs_count = arr->length - 1; + user->inputs = user->inputs_count ? malloc(user->inputs_count * sizeof(struct hp_user_input)) : NULL; + for (int i = 0; i < user->inputs_count; i++) + { + if (arr_elem->value->type == json_type_array) + { + const struct json_array_s *input_info = (struct json_array_s *)arr_elem->value->payload; + if (input_info->length == 2) + { + __HP_ASSIGN_UINT64(user->inputs[i].offset, input_info->start); + __HP_ASSIGN_UINT64(user->inputs[i].size, input_info->start->next); + } + } + arr_elem = arr_elem->next; + } + } user_elem = user_elem->next; } } @@ -549,13 +413,13 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c } else if (strcmp(k->string, "nplfd") == 0) { - _HP_ASSIGN_INT(ctx->peers.fd, elem); + __HP_ASSIGN_INT(ctx->peers.fd, elem); } else if (strcmp(k->string, "unl") == 0) { if (elem->value->type == json_type_array) { - struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload; + const struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload; const size_t peer_count = peer_array->length; ctx->peers.count = peer_count; @@ -566,7 +430,7 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c struct json_array_element_s *peer_elem = peer_array->start; for (int i = 0; i < peer_count; i++) { - _HP_ASSIGN_STRING(ctx->peers.list[i].pubkey, peer_elem); + __HP_ASSIGN_STRING(ctx->peers.list[i].pubkey, peer_elem); peer_elem = peer_elem->next; } } @@ -574,7 +438,7 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c } else if (strcmp(k->string, "hpfd") == 0) { - _HP_ASSIGN_INT(gctx->control_fd, elem); + __HP_ASSIGN_INT(gctx->control_fd, elem); } elem = elem->next; @@ -587,11 +451,11 @@ static void *__hp_peer_message_thread_func(void *arg) // Pubkey buf to hold the sender pubkey of the message that follows it. bool has_pubkey = false; - char pubkey_buf[_HP_KEY_SIZE + 1]; + char pubkey_buf[__HP_KEY_SIZE + 1]; memset(pubkey_buf, 0, sizeof(pubkey_buf)); // Buffer to hold current message. - uint8_t *msg_buf = malloc(_HP_SEQPKT_BUF_SIZE); + uint8_t *msg_buf = malloc(__HP_SEQPKT_BUF_SIZE); struct pollfd pfd = {args->ctx->peers.fd, POLLIN, 0}; @@ -600,9 +464,9 @@ static void *__hp_peer_message_thread_func(void *arg) // Reset poll fd because we are reusing it. pfd.revents = 0; - if (poll(&pfd, 1, _HP_POLL_TIMEOUT) == -1) + if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1) { - perror("Peer channel poll error. "); + perror("Peer channel poll error"); goto error; } @@ -620,19 +484,19 @@ static void *__hp_peer_message_thread_func(void *arg) // The read data alternates between the sender pubkey and the message. if (!has_pubkey) { - if (read(pfd.fd, pubkey_buf, _HP_KEY_SIZE) == -1) + if (read(pfd.fd, pubkey_buf, __HP_KEY_SIZE) == -1) { - perror("Error reading pubkey from peer channel. "); + perror("Error reading pubkey from peer channel"); goto error; } has_pubkey = true; } else { - const size_t read_res = read(pfd.fd, msg_buf, _HP_USER_BUF_SIZE); + const size_t read_res = read(pfd.fd, msg_buf, __HP_SEQPKT_BUF_SIZE); if (read_res == -1) { - perror("Error reading message from peer channel. "); + perror("Error reading message from peer channel"); goto error; } @@ -660,7 +524,7 @@ static void *__hp_control_message_thread_func(void *arg) int result = 0; // Temp buffer for all read operations. - uint8_t *buf = malloc(_HP_SEQPKT_BUF_SIZE); + uint8_t *buf = malloc(__HP_SEQPKT_BUF_SIZE); struct pollfd pfd = {gctx.control_fd, POLLIN, 0}; @@ -669,9 +533,9 @@ static void *__hp_control_message_thread_func(void *arg) // Reset poll fd because we are reusing it. pfd.revents = 0; - if (poll(&pfd, 1, _HP_POLL_TIMEOUT) == -1) + if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1) { - perror("Control channel poll error. "); + perror("Control channel poll error"); goto error; } @@ -686,10 +550,10 @@ static void *__hp_control_message_thread_func(void *arg) } else if (result & POLLIN) { - const size_t read_res = read(pfd.fd, buf, _HP_USER_BUF_SIZE); + const size_t read_res = read(pfd.fd, buf, __HP_SEQPKT_BUF_SIZE); if (read_res == -1) { - perror("Error reading control channel. "); + perror("Error reading control channel"); goto error; } @@ -713,4 +577,20 @@ void __hp_on_control_message(const void *buf, const uint32_t len) // TODO: Handle control messages from hot pocket. } +void __hp_free_contract_context(struct hp_contract_context *ctx) +{ + if (ctx->users.list) + { + for (int i = 0; i < ctx->users.count; i++) + { + if (ctx->users.list[i].inputs) + free(ctx->users.list[i].inputs); + } + free(ctx->users.list); + } + + if (ctx->peers.list) + free(ctx->peers.list); +} + #endif \ No newline at end of file diff --git a/examples/nodejs_client/multi-client.js b/examples/nodejs_client/multi-client.js new file mode 100644 index 00000000..eaad14fe --- /dev/null +++ b/examples/nodejs_client/multi-client.js @@ -0,0 +1,73 @@ +const { HotPocketClient, HotPocketKeyGenerator, HotPocketEvents } = require('./hp-client-lib'); + +async function main() { + + const clientCount = 3; + const clients = []; + for (let i = 1; i <= clientCount; i++) { + clients.push(new RoboClient('wss://localhost:', 8081, i.toString())); + } + + await Promise.all(clients.map(c => c.connect())); + console.log("Clients connected."); + + await Promise.all(clients.map(c => c.sendInputs(["A", "B", "C"]))); + console.log("Clients submitted."); + + // await Promise.all(clients.map(c => c.disconnect())); + // console.log("Clients closed."); +} + +function RoboClient(server, port, clientId) { + + this.connect = async () => { + this.keys = await HotPocketKeyGenerator.generate(); + this.hpclient = new HotPocketClient(server + port, this.keys); + + + if (!await this.hpclient.connect()) { + this.log('Connection failed.'); + } + this.log('HotPocket Connected.'); + + // This will get fired if HP server disconnects unexpectedly. + this.hpclient.on(HotPocketEvents.disconnect, () => { + this.log('Server disconnected'); + }) + + // This will get fired when contract sends an output. + this.hpclient.on(HotPocketEvents.contractOutput, (output) => { + this.log("Contract output>> " + Buffer.from(output, "hex")); + }) + + // This will get fired when contract sends a read response. + this.hpclient.on(HotPocketEvents.contractReadResponse, (response) => { + this.log("Contract read response>> " + Buffer.from(response, "hex")); + }) + } + + this.disconnect = async () => { + await this.hpclient.close(); + } + + this.sendInputs = async (inputs) => { + + let idx = 1; + let tasks = []; + inputs.forEach(inp => { + const nonce = clientId.toString() + '-' + idx.toString(); + tasks.push(this.hpclient.sendContractInput((clientId + inp), nonce).then(submissionStatus => { + if (submissionStatus && submissionStatus != "ok") + this.log("Input submission failed. reason: " + submissionStatus); + })); + idx++; + }) + await Promise.all(tasks); + } + + this.log = (text) => { + console.log(clientId + ": " + text) + } +} + +main(); \ No newline at end of file diff --git a/examples/nodejs_client/text-client.js b/examples/nodejs_client/text-client.js index e6787b33..9b0aba7a 100644 --- a/examples/nodejs_client/text-client.js +++ b/examples/nodejs_client/text-client.js @@ -1,4 +1,3 @@ -const fs = require('fs'); const readline = require('readline'); const { exit } = require('process'); const { HotPocketClient, HotPocketKeyGenerator, HotPocketEvents } = require('./hp-client-lib'); diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index fb1b2721..a66c6f39 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -14,7 +14,8 @@ class HotPocketContract { return; // Parse HotPocket args. - const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); + const argsJson = fs.readFileSync(0, 'utf8'); + const hpargs = JSON.parse(argsJson); this.#controlChannel = new ControlChannel(hpargs.hpfd); this.#executeContract(hpargs, contractFunc); @@ -24,7 +25,7 @@ class HotPocketContract { // Keeps track of all the tasks (promises) that must be awaited before the termination. const pendingTasks = []; - const users = new UsersCollection(hpargs.usrfd, this.events); + const users = new UsersCollection(hpargs.userinfd, hpargs.users); const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events); const executionContext = new ContractExecutionContext(hpargs, users, peers); @@ -67,21 +68,24 @@ class UsersCollection { #users = {}; #totalUsers = 0; - constructor(usrfds, events) { - const userKeys = Object.keys(usrfds); + constructor(userInputsFd, usersObj) { + const users = Object.entries(usersObj); - userKeys.forEach((pubKey) => { - const channel = new UserChannel(usrfds[pubKey]); + users.forEach(([pubKey, arr]) => { + + const outfd = arr[0]; // First array element is the output fd. + arr.splice(0, 1); // Remove first element (output fd). The rest are pairs of msg offset/length tuples. + + const channel = new UserChannel(userInputsFd, outfd, arr); const user = new User(pubKey, channel); + this.#users[pubKey] = { user: user, channel: channel } }); - this.#totalUsers = userKeys.length; - - events.on("session_end", () => Object.values(this.#users).forEach(u => u.channel.close())); + this.#totalUsers = users.length; } // Returns the User for the specified pubkey. Returns null if not found. @@ -151,69 +155,26 @@ class User { class UserChannel { #readStream = null; - #fd = -1; + #infd = -1; + #outfd = -1; + #inputs = null; - constructor(fd) { - this.#fd = fd; + constructor(infd, outfd, inputs) { + this.#infd = infd; + this.#outfd = outfd; + this.#inputs = inputs; } consume(onMessage, onComplete) { - this.#readStream = fs.createReadStream(null, { fd: this.#fd }); - let dataParts = []; - let remainingMsgCount = -1; - let currentMsgLen = -1; - let pos = 0; - - // Read bytes from the given buffer. - const readBytes = (buf, pos, count) => { - if (pos + count > buf.byteLength) - return null; - return buf.slice(pos, pos + count); + // Each input is 2 element array of [offset, length]. + for (const [offset, size] of this.#inputs) { + const buf = Buffer.alloc(size); + fs.readSync(this.#infd, buf, 0, size, offset); + onMessage(buf); } - this.#readStream.on("data", (buf) => { - pos = 0; - if (remainingMsgCount == -1) { - const msgCountBuf = readBytes(buf, 0, 4) - remainingMsgCount = msgCountBuf.readUInt32BE(); - pos += 4; - } - - while (pos < buf.byteLength) { - if (currentMsgLen == -1) { - const msgLenBuf = readBytes(buf, pos, 4); - pos += 4; - currentMsgLen = msgLenBuf.readUInt32BE(); - } - let possible_read_len; - if (((buf.byteLength - pos) - currentMsgLen) >= 0) { - // Can finish reading a full message. - possible_read_len = currentMsgLen; - currentMsgLen = -1; - } else { - // Only partial message is recieved. - possible_read_len = buf.byteLength - pos - currentMsgLen -= possible_read_len; - } - const msgBuf = readBytes(buf, pos, possible_read_len); - pos += possible_read_len; - dataParts.push(msgBuf) - - if (currentMsgLen == -1) { - onMessage(Buffer.concat(dataParts)); - dataParts = []; - remainingMsgCount-- - } - } - - if (remainingMsgCount == 0) { - remainingMsgCount = -1; - onComplete(); - } - }); - - this.#readStream.on("error", (err) => { }); + onComplete(); } send(msg) { @@ -221,11 +182,7 @@ class UserChannel { let headerBuf = Buffer.alloc(4); // Writing message length in big endian format. headerBuf.writeUInt32BE(messageBuf.byteLength) - return writevAsync(this.#fd, [headerBuf, messageBuf]); - } - - close() { - this.#readStream && this.#readStream.close(); + return writevAsync(this.#outfd, [headerBuf, messageBuf]); } } diff --git a/src/consensus.cpp b/src/consensus.cpp index 6c462515..57369130 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -162,7 +162,8 @@ namespace consensus // Prepare the consensus candidate user inputs that we have acumulated so far. (We receive them periodically via NUPs) // The candidate inputs will be included in the new round proposal. - verify_and_populate_candidate_user_inputs(lcl_seq_no); + if (verify_and_populate_candidate_user_inputs(lcl_seq_no) == -1) + return -1; const p2p::proposal new_round_prop = create_new_round_proposal(lcl, state); broadcast_proposal(new_round_prop); @@ -385,7 +386,7 @@ namespace consensus * Verifies the user signatures and populate non-expired user inputs from collected * non-unl proposals (if any) into consensus candidate data. */ - void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) + int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) { // Move over NUPs collected from the network into a local list. std::list collected_nups; @@ -433,17 +434,21 @@ namespace consensus } else { - std::string hash, input; + util::buffer_view input; + std::string hash; uint64_t max_lcl_seqno; reject_reason = usr::validate_user_input_submission(pubkey, umsg, lcl_seq_no, total_input_len, recent_user_input_hashes, hash, input, max_lcl_seqno); + if (input.is_null()) + return -1; + if (reject_reason == NULL) { // No reject reason means we should go ahead and subject the input to consensus. ctx.candidate_user_inputs.try_emplace( hash, - candidate_user_input(pubkey, std::move(input), max_lcl_seqno)); + candidate_user_input(pubkey, input, max_lcl_seqno)); } else if (reject_reason == msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED) { @@ -486,6 +491,8 @@ namespace consensus } } } + + return 0; } p2p::proposal create_new_round_proposal(std::string_view lcl, hpfs::h32 state) @@ -732,7 +739,7 @@ namespace consensus { { std::scoped_lock lock(ctx.contract_ctx_mutex); - ctx.contract_ctx.emplace(); + ctx.contract_ctx.emplace(usr::input_store); } sc::contract_execution_args &args = ctx.contract_ctx->args; @@ -742,7 +749,8 @@ namespace consensus args.lcl = new_lcl; // Populate user bufs. - feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop); + if (feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop) == -1) + return -1; if (sc::execute_contract(ctx.contract_ctx.value()) == -1) { @@ -814,7 +822,7 @@ namespace consensus * @param bufmap The contract bufmap which needs to be populated with inputs. * @param cons_prop The proposal that achieved consensus. */ - void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) + int feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) { // Populate the buf map with all currently connected users regardless of whether they have inputs or not. // This is in case the contract wanted to emit some data to a user without needing any input. @@ -830,26 +838,22 @@ namespace consensus const bool hashfound = (itr != ctx.candidate_user_inputs.end()); if (!hashfound) { - LOG_ERROR << "input required but wasn't in our candidate inputs map, this will potentially cause desync."; - // TODO: consider fatal + LOG_ERROR << "Input required but wasn't in our candidate inputs map, this will potentially cause desync."; + return -1; } else { // Populate the input content into the bufmap. - candidate_user_input &cand_input = itr->second; - - std::string inputtofeed; - inputtofeed.swap(cand_input.input); - - sc::contract_iobufs &bufs = bufmap[cand_input.userpubkey]; - bufs.inputs.push_back(std::move(inputtofeed)); + sc::contract_iobufs &contract_user = bufmap[cand_input.userpubkey]; + contract_user.inputs.push_back(cand_input.input); // Remove the input from the candidate set because we no longer need it. - //LOG_DEBUG << "candidate input deleted."; ctx.candidate_user_inputs.erase(itr); } } + + return 0; } /** diff --git a/src/consensus.hpp b/src/consensus.hpp index ae039165..c3eb3ff9 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -3,6 +3,7 @@ #include "pchheader.hpp" #include "util.hpp" +#include "util/buffer_store.hpp" #include "sc.hpp" #include "p2p/p2p.hpp" #include "usr/user_input.hpp" @@ -18,10 +19,10 @@ namespace consensus { const std::string userpubkey; const uint64_t maxledgerseqno = 0; - std::string input; + const util::buffer_view input; - candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno) - : userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno) + candidate_user_input(const std::string userpubkey, const util::buffer_view input, const uint64_t maxledgerseqno) + : userpubkey(std::move(userpubkey)), input(input), maxledgerseqno(maxledgerseqno) { } }; @@ -105,7 +106,7 @@ namespace consensus bool push_npl_message(p2p::npl_message &npl_message); - void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); + int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); p2p::proposal create_new_round_proposal(std::string_view lcl, hpfs::h32 state); @@ -127,7 +128,7 @@ namespace consensus void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl); - void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); + int feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); diff --git a/src/pchheader.hpp b/src/pchheader.hpp index bb46fd16..9d2ba4f5 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include diff --git a/src/sc.cpp b/src/sc.cpp index 409a68f1..0348cec5 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -62,17 +62,13 @@ namespace sc if (start_hpfs_session(ctx) == -1) return -1; - // Setup user io sockets and feed all inputs to them. - create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); - + create_iosockets_for_fdmap(ctx.userfds, ctx.args.userbufs); // User output socket. + create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); // Control socket. if (!ctx.args.readonly) - { - // Create sequential packet sockets for npl messages. - create_iosockets(ctx.nplfds, SOCK_SEQPACKET); - } + create_iosockets(ctx.nplfds, SOCK_SEQPACKET); // NPL socket. - // Create sequential packet sockets for hp messages. - create_iosockets(ctx.hpscfds, SOCK_SEQPACKET); + // Clone the user inputs fd to be passed on to the contract. + const int user_inputs_fd = dup(ctx.args.user_input_store.fd); int ret = 0; @@ -86,6 +82,7 @@ namespace sc // Close all fds unused by HP process. close_unused_fds(ctx, true); + close(user_inputs_fd); // Start the contract monitor thread. ctx.contract_monitor_thread = std::thread(contract_monitor_loop, std::ref(ctx)); @@ -104,8 +101,11 @@ namespace sc // Close all fds unused by SC process. close_unused_fds(ctx, false); + // Reset the seek position for the contract's copy of user inputs fd. + lseek(user_inputs_fd, 0, SEEK_SET); + // Write the contract input message from HotPocket to the stdin (0) of the contract process. - write_contract_args(ctx); + write_contract_args(ctx, user_inputs_fd); const bool using_appbill = !ctx.args.readonly && !conf::cfg.appbill.empty(); int len = conf::cfg.runtime_binexec_args.size() + 1; @@ -244,11 +244,12 @@ namespace sc * "lcl": "", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb) * "hpfd": fd, * "nplfd":fd, - * "usrfd":{ "":fd, ... }, + * "userinfd":fd, // User inputs fd. + * "users":{ "":[outfd, [msg1_off, msg1_len], ...], ... }, * "unl":[ "", ... ] * } */ - int write_contract_args(const execution_context &ctx) + int write_contract_args(const execution_context &ctx, const int user_inputs_fd) { // Populate the json string with contract args. // We don't use a JSON parser here because it's lightweight to contrstuct the @@ -267,9 +268,11 @@ namespace sc } os << ",\"hpfd\":" << ctx.hpscfds[SOCKETFDTYPE::SCREADWRITE]; - os << ",\"usrfd\":{"; - fdmap_json_to_stream(ctx.userfds, os); + os << ",\"userinfd\":" << user_inputs_fd + << ",\"users\":{"; + + user_json_to_stream(ctx.userfds, ctx.args.userbufs, os); os << "},\"unl\":["; @@ -326,49 +329,41 @@ namespace sc { util::mask_signal(); - // Write any user inputs to the contract. - if (write_contract_fdmap_inputs(ctx.userfds, ctx.args.userbufs) == -1) + while (!ctx.is_shutting_down) { - LOG_ERROR << "Failed to write user inputs to contract."; - } - else - { - while (!ctx.is_shutting_down) + // Atempt to read messages from contract (regardless of contract terminated or not). + const int hpsc_read_res = read_contract_hp_outputs(ctx); + const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); + const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); + + if (ctx.termination_signaled || ctx.contract_pid == 0) { - // Atempt to read messages from contract (regardless of contract terminated or not). - const int hpsc_read_res = read_contract_hp_outputs(ctx); - const int npl_read_res = ctx.args.readonly ? 0 : read_contract_npl_outputs(ctx); - const int user_read_res = read_contract_fdmap_outputs(ctx.userfds, ctx.args.userbufs); - - if (ctx.termination_signaled || ctx.contract_pid == 0) - { - // If no bytes were read after contract finished execution, exit the loop. - // Otherwise keep running the loop becaue there might be further messages to read. - if ((hpsc_read_res + npl_read_res + user_read_res) == 0) - break; - } - else - { - // We assume contract is still running. Attempt to write any queued messages to the contract. - - const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx); - if (npl_write_res == -1) - break; - - const int hpsc_write_res = write_contract_hp_inputs(ctx); - if (hpsc_write_res == -1) - break; - - // If no operation was performed during this iteration, wait for a small delay until the next iteration. - // This means there were no queued messages from either side. - if ((hpsc_read_res + npl_read_res + user_read_res + hpsc_write_res + hpsc_write_res) == 0) - util::sleep(20); - } - - // Check if contract process has exited on its own during the loop. - if (ctx.contract_pid > 0) - check_contract_exited(ctx, false); + // If no bytes were read after contract finished execution, exit the loop. + // Otherwise keep running the loop becaue there might be further messages to read. + if ((hpsc_read_res + npl_read_res + user_read_res) == 0) + break; } + else + { + // We assume contract is still running. Attempt to write any queued messages to the contract. + + const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx); + if (npl_write_res == -1) + break; + + const int hpsc_write_res = write_contract_hp_inputs(ctx); + if (hpsc_write_res == -1) + break; + + // If no operation was performed during this iteration, wait for a small delay until the next iteration. + // This means there were no queued messages from either side. + if ((hpsc_read_res + npl_read_res + user_read_res + hpsc_write_res + hpsc_write_res) == 0) + util::sleep(20); + } + + // Check if contract process has exited on its own during the loop. + if (ctx.contract_pid > 0) + check_contract_exited(ctx, false); } // Close all fds. @@ -378,6 +373,11 @@ namespace sc cleanup_vectorfds(fds); ctx.userfds.clear(); + // Purge any inputs we passed to the contract. + for (const auto &[pubkey, bufs] : ctx.args.userbufs) + for (const util::buffer_view &input : bufs.inputs) + ctx.args.user_input_store.purge(input); + // If we reach this point but the contract is still running, then we need to kill the contract by force. // This can be the case if HP is shutting down, or there was an error in initial feeding of inputs. if (ctx.contract_pid > 0) @@ -527,29 +527,32 @@ namespace sc } } - /** - * Common helper function to write json output of fdmap to given ostream. - * @param fdmap Any pubkey->fdlist map. (eg. ctx.userfds) - * @param os An output stream. - */ - void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os) + void user_json_to_stream(const contract_fdmap_t &user_fdmap, const contract_bufmap_t &user_bufmap, std::ostringstream &os) { - for (auto itr = fdmap.begin(); itr != fdmap.end(); itr++) + for (auto itr = user_fdmap.begin(); itr != user_fdmap.end(); itr++) { - if (itr != fdmap.begin()) + if (itr != user_fdmap.begin()) os << ","; // Trailing comma separator for previous element. // Get the hex pubkey. - std::string_view pubkey = itr->first; // Pubkey in binary format. + const std::string &pubkey = itr->first; // Pubkey in binary format. std::string pubkeyhex; util::bin2hex( pubkeyhex, - reinterpret_cast(pubkey.data()) + 1, + reinterpret_cast(pubkey.data()) + 1, // Skip key type prefix. pubkey.length() - 1); - // Write hex pubkey and fds. - os << "\"" << pubkeyhex << "\":" + const std::vector &user_inputs = user_bufmap.find(pubkey)->second.inputs; + + // Write hex pubkey as key and output fd as first element of array. + os << "\"" << pubkeyhex << "\":[" << itr->second[SOCKETFDTYPE::SCREADWRITE]; + + // Write input offsets into the same array. + for (auto inp_itr = user_inputs.begin(); inp_itr != user_inputs.end(); inp_itr++) + os << ",[" << inp_itr->offset << "," << inp_itr->size << "]"; + + os << "]"; } } @@ -573,18 +576,6 @@ namespace sc return 0; } - int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) - { - // Loop through input buffers for each pubkey. - for (auto &[pubkey, buflist] : bufmap) - { - if (write_iosocket_stream(fdmap[pubkey], buflist.inputs) == -1) - return -1; - } - - return 0; - } - /** * Common function to read all outputs produced by the contract process and store them in * output buffers for later processing. @@ -689,59 +680,6 @@ namespace sc return 0; } - /** - * Common function to write the given input buffer into the write fd from the HP side socket. - * @param fds Vector of fd list. - * @param inputs Buffer to write into the HP write fd. - */ - int write_iosocket_stream(std::vector &fds, std::list &inputs) - { - // Write the inputs (if any) into the contract. - - const int writefd = fds[SOCKETFDTYPE::HPREADWRITE]; - if (writefd == -1) - return 0; - - bool write_error = false; - - // Prepare the input memory segments to write with wrtiev. - // Extra one element for the header. - iovec memsegs[inputs.size() * 2 + 1]; - uint8_t header[inputs.size() * 4 + 4]; - header[0] = inputs.size() >> 24; - header[1] = inputs.size() >> 16; - header[2] = inputs.size() >> 8; - header[3] = inputs.size(); - // Message count header. - memsegs[0].iov_base = header; - memsegs[0].iov_len = 4; - size_t i = 1; - for (std::string &input : inputs) - { - // 4 bytes for message len header. - const uint32_t len = input.length(); - header[i * 4] = len >> 24; - header[i * 4 + 1] = len >> 16; - header[i * 4 + 2] = len >> 8; - header[i * 4 + 3] = len; - memsegs[i * 2 - 1].iov_base = &header[i * 4]; - memsegs[i * 2 - 1].iov_len = 4; - memsegs[i * 2].iov_base = input.data(); - memsegs[i * 2].iov_len = input.length(); - i++; - } - - if (writev(writefd, memsegs, (inputs.size() * 2 + 1)) == -1) - write_error = true; - - inputs.clear(); - - if (write_error) - LOG_ERROR << errno << ": Error writing to stream socket."; - - return write_error ? -1 : 0; - } - /** * Common function to write the given input into the write fd from the HP side socket. * @param fds Vector of fd list. diff --git a/src/sc.hpp b/src/sc.hpp index 41e31324..18ce21d8 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -5,6 +5,7 @@ #include "usr/usr.hpp" #include "hpfs/h32.hpp" #include "util.hpp" +#include "util/buffer_store.hpp" #include "p2p/p2p.hpp" /** @@ -39,7 +40,7 @@ namespace sc struct contract_iobufs { // List of inputs to be fed into the contract. - std::list inputs; + std::vector inputs; // List of outputs from the contract. std::list outputs; @@ -68,6 +69,8 @@ namespace sc // The value is a pair holding consensus-verified inputs and contract-generated outputs. contract_bufmap_t userbufs; + util::buffer_store &user_input_store; + // NPL messages to be passed into contract. moodycamel::ReaderWriterQueue npl_messages; @@ -82,6 +85,10 @@ namespace sc // State hash after execution will be copied to this (not applicable to read only mode). hpfs::h32 post_execution_state_hash = hpfs::h32_empty; + + contract_execution_args(util::buffer_store &user_input_store) : user_input_store(user_input_store) + { + } }; /** @@ -115,6 +122,10 @@ namespace sc // Indicates that the deinit procedure has begun. bool is_shutting_down = false; + + execution_context(util::buffer_store &user_input_store) : args(user_input_store) + { + } }; int init(); @@ -131,7 +142,7 @@ namespace sc int stop_hpfs_session(execution_context &ctx); - int write_contract_args(const execution_context &ctx); + int write_contract_args(const execution_context &ctx, const int user_inputs_fd); void contract_monitor_loop(execution_context &ctx); @@ -147,20 +158,16 @@ namespace sc // Common helper functions - void fdmap_json_to_stream(const contract_fdmap_t &fdmap, std::ostringstream &os); + void user_json_to_stream(const contract_fdmap_t &user_fdmap, const contract_bufmap_t &user_bufmap, std::ostringstream &os); int create_iosockets_for_fdmap(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); - int write_contract_fdmap_inputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); - int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap); int create_iosockets(std::vector &fds, const int socket_type); int write_iosocket_seq_packet(std::vector &fds, std::string_view input); - int write_iosocket_stream(std::vector &fds, std::list &inputs); - int read_iosocket(const bool is_stream_socket, std::vector &fds, std::string &output); void close_unused_fds(execution_context &ctx, const bool is_hp); diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index 02206732..dbbe5c0a 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -1,6 +1,7 @@ #include "../pchheader.hpp" #include "../hplog.hpp" #include "../util.hpp" +#include "../util/buffer_store.hpp" #include "../conf.hpp" #include "../msg/usrmsg_parser.hpp" #include "usr.hpp" @@ -17,6 +18,8 @@ namespace read_req bool is_shutting_down = false; bool init_success = false; + + util::buffer_store read_req_store; std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue. std::vector read_req_threads; moodycamel::ConcurrentQueue read_req_queue(MAX_QUEUE_SIZE); @@ -27,6 +30,9 @@ namespace read_req int init() { + if (read_req_store.init() == -1) + return -1; + thread_pool_executor = std::thread(manage_thread_pool); init_success = true; return 0; @@ -51,6 +57,8 @@ namespace read_req // Joining all read request processing threads. for (std::thread &thread : read_req_threads) thread.join(); + + read_req_store.deinit(); } } @@ -123,7 +131,7 @@ namespace read_req { { // Contract context is added to the list for force kill if a SIGINT is received. - sc::execution_context contract_ctx; + sc::execution_context contract_ctx(read_req_store); std::scoped_lock execution_contract_lock(execution_contexts_mutex); context_itr = execution_contexts.emplace(execution_contexts.begin(), std::move(contract_ctx)); } @@ -191,13 +199,14 @@ namespace read_req */ int populate_read_req_queue(const std::string &pubkey, const std::string &content) { - sc::execution_context contract_ctx; - user_read_req read_request; - read_request.content = std::move(content); + read_request.content = read_req_store.write_buf(content.data(), content.size()); read_request.pubkey = pubkey; - return read_req_queue.try_enqueue(read_request); + if (read_request.content.is_null()) + return -1; + else + return read_req_queue.try_enqueue(read_request); } /** @@ -213,7 +222,7 @@ namespace read_req contract_ctx.args.state_dir.append("/rr_").append(std::to_string(thread_id)); contract_ctx.args.readonly = true; sc::contract_iobufs user_bufs; - user_bufs.inputs.push_back(std::move(read_request.content)); + user_bufs.inputs.push_back(read_request.content); contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufs)); } diff --git a/src/usr/read_req.hpp b/src/usr/read_req.hpp index 669f0044..619eb0d8 100644 --- a/src/usr/read_req.hpp +++ b/src/usr/read_req.hpp @@ -1,14 +1,15 @@ -#ifndef _HP_CONS_READ_REQ_ -#define _HP_CONS_READ_REQ_ +#ifndef _HP_USR_READ_REQ_ +#define _HP_USR_READ_REQ_ #include "../sc.hpp" +#include "../util/buffer_store.hpp" namespace read_req { struct user_read_req { std::string pubkey; - std::string content; + util::buffer_view content; }; int init(); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 02eab829..6fe969c5 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -7,6 +7,7 @@ #include "../crypto.hpp" #include "../hplog.hpp" #include "../ledger.hpp" +#include "../util/buffer_store.hpp" #include "usr.hpp" #include "user_session_handler.hpp" #include "user_comm_session.hpp" @@ -20,6 +21,7 @@ namespace usr // Holds global connected-users and related objects. connected_context ctx; + util::buffer_store input_store; uint64_t metric_thresholds[5]; bool init_success = false; @@ -30,11 +32,14 @@ namespace usr int init() { metric_thresholds[0] = conf::cfg.pubmaxcpm; - metric_thresholds[1] = 0; // This metric doesn't apply to user context. - metric_thresholds[2] = 0; // This metric doesn't apply to user context. + metric_thresholds[1] = 0; // This metric doesn't apply to user context. + metric_thresholds[2] = 0; // This metric doesn't apply to user context. metric_thresholds[3] = conf::cfg.pubmaxbadmpm; metric_thresholds[4] = conf::cfg.pubidletimeout; + if (input_store.init() == -1) + return -1; + // Start listening for incoming user connections. if (start_listening() == -1) return -1; @@ -49,7 +54,10 @@ namespace usr void deinit() { if (init_success) + { ctx.server->stop(); + input_store.deinit(); + } } /** @@ -213,8 +221,8 @@ namespace usr const util::PROTOCOL protocol = (protocol_code == "json" ? util::PROTOCOL::JSON : util::PROTOCOL::BSON); - session.mark_as_verified(); // Mark connection as a verified connection. - session.issued_challenge.clear(); // Remove the stored challenge + session.mark_as_verified(); // Mark connection as a verified connection. + session.issued_challenge.clear(); // Remove the stored challenge session.uniqueid = pubkey; // Add the user to the global authed user list @@ -250,7 +258,7 @@ namespace usr const char *validate_user_input_submission(const std::string_view user_pubkey, const usr::user_input &umsg, const uint64_t lcl_seq_no, size_t &total_input_len, util::rollover_hashset &recent_user_input_hashes, - std::string &hash, std::string &input, uint64_t &max_lcl_seqno) + std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno) { const std::string sig_hash = crypto::get_hash(umsg.sig); @@ -270,7 +278,9 @@ namespace usr std::string nonce; msg::usrmsg::usrmsg_parser parser(umsg.protocol); - parser.extract_input_container(input, nonce, max_lcl_seqno, umsg.input_container); + + std::string input_data; + parser.extract_input_container(input_data, nonce, max_lcl_seqno, umsg.input_container); // Ignore the input if our ledger has passed the input TTL. if (max_lcl_seqno <= lcl_seq_no) @@ -280,7 +290,7 @@ namespace usr } // Keep checking the subtotal of inputs extracted so far with the appbill account balance. - total_input_len += input.length(); + total_input_len += input_data.length(); if (!verify_appbill_check(user_pubkey, total_input_len)) { LOG_DEBUG << "User message app bill balance exceeded."; @@ -292,6 +302,10 @@ namespace usr // Append the hash of the message signature to get the final hash. hash.append(sig_hash); + // Copy the input data into the input store. + std::string_view s(); + input = input_store.write_buf(input_data.data(), input_data.size()); + return NULL; // Success. No reject reason. } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index b9e13b93..3d1b22c1 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -3,6 +3,7 @@ #include "../pchheader.hpp" #include "../util.hpp" +#include "../util/buffer_store.hpp" #include "../msg/usrmsg_parser.hpp" #include "user_comm_session.hpp" #include "user_comm_server.hpp" @@ -59,6 +60,7 @@ namespace usr std::optional server; }; extern connected_context ctx; + extern util::buffer_store input_store; int init(); @@ -80,7 +82,7 @@ namespace usr const char *validate_user_input_submission(const std::string_view user_pubkey, const usr::user_input &umsg, const uint64_t lcl_seq_no, size_t &total_input_len, util::rollover_hashset &recent_user_input_hashes, - std::string &hash, std::string &input, uint64_t &max_lcl_seqno); + std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno); bool verify_appbill_check(std::string_view pubkey, const size_t input_len); diff --git a/src/util/buffer_store.cpp b/src/util/buffer_store.cpp new file mode 100644 index 00000000..b23703fc --- /dev/null +++ b/src/util/buffer_store.cpp @@ -0,0 +1,62 @@ +#include "../pchheader.hpp" +#include "buffer_store.hpp" + +// memfd block size to have clean hole punch so that allocated blocks are released properly. +#define BLOCK_SIZE 4096 +#define BLOCK_ALIGN(x) (((x) + ((typeof(x))(BLOCK_SIZE)-1)) & ~((typeof(x))(BLOCK_SIZE)-1)) + +namespace util +{ + int buffer_store::init() + { + int fd = memfd_create("buffer_store", MFD_CLOEXEC); + if (fd == -1) + { + LOG_ERROR << errno << ": Error creating buffer store memfd."; + return -1; + } + + this->fd = fd; + return 0; + } + + const buffer_view buffer_store::write_buf(const void *buf, const uint32_t size) + { + buffer_view view = {0, 0}; + int res = pwrite(fd, buf, size, next_write_pos); + if (res < size) + { + LOG_ERROR << errno << ": Error writing to buffer store fd " << fd; + return view; + } + else + { + view.offset = next_write_pos; + view.size = size; + + // Get nearest block offset that occurs after the just-written buffer. + next_write_pos += size; + next_write_pos = BLOCK_ALIGN(next_write_pos); + + return view; + } + } + + int buffer_store::purge(const buffer_view &buf) + { + const size_t purge_size = BLOCK_ALIGN(buf.size); + if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, buf.offset, purge_size) == -1) + { + LOG_ERROR << errno << ": Error when purging buffer store fd " << fd; + return -1; + } + return 0; + } + + void buffer_store::deinit() + { + if (fd > 0) + close(fd); + } + +} // namespace util \ No newline at end of file diff --git a/src/util/buffer_store.hpp b/src/util/buffer_store.hpp new file mode 100644 index 00000000..a9ddd7cd --- /dev/null +++ b/src/util/buffer_store.hpp @@ -0,0 +1,35 @@ +#ifndef _HP_UTIL_BUFFER_STORE_ +#define _HP_UTIL_BUFFER_STORE_ + +#include "../pchheader.hpp" + +namespace util +{ + + struct buffer_view + { + off_t offset; + uint32_t size; + + bool is_null() + { + return !offset && !size; + } + }; + + class buffer_store + { + private: + off_t next_write_pos = 0; + + public: + int fd; + int init(); + const buffer_view write_buf(const void *buf, const uint32_t size); + int purge(const buffer_view &buf); + void deinit(); + }; + +} // namespace util + +#endif \ No newline at end of file diff --git a/test/local-cluster/Dockerfile b/test/local-cluster/Dockerfile index 91771870..313da7d4 100644 --- a/test/local-cluster/Dockerfile +++ b/test/local-cluster/Dockerfile @@ -3,7 +3,7 @@ FROM node:12.18.3-buster-slim RUN apt-get update -RUN apt-get install -y libgomp1 libssl-dev gdb +RUN apt-get install -y libgomp1 libssl-dev gdb valgrind # Install shared libraries. # Copy shared libraries and register it.