From efbd775fa1d42cb9cdddc3d58e72c6ef8a4f1a6f Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Mon, 30 Nov 2020 10:03:06 +0530 Subject: [PATCH] Refactored C contract lib interface. (#175) --- examples/c_contract/echo_contract.c | 98 ++-- examples/c_contract/hotpocket_contract.h | 559 ++++++++------------ examples/nodejs_contract/hp-contract-lib.js | 13 +- test/local-cluster/cluster-create.sh | 2 +- 4 files changed, 286 insertions(+), 386 deletions(-) diff --git a/examples/c_contract/echo_contract.c b/examples/c_contract/echo_contract.c index d38fe22b..f76925ca 100644 --- a/examples/c_contract/echo_contract.c +++ b/examples/c_contract/echo_contract.c @@ -3,52 +3,79 @@ #include #include "hotpocket_contract.h" -// gcc echo_contract.c -o echo_contract -pthread +// gcc echo_contract.c -o echo_contract -void echo_contract(const struct hp_contract_context *ctx); -void on_user_message(const struct hp_contract_context *ctx, const struct hp_user *user, const void *buf, const uint32_t len); -void on_peer_message(const struct hp_contract_context *ctx, const char *peerPubKey, const void *buf, const uint32_t len); +void store_timestamp(const uint64_t timestamp); +void process_user_message(const struct hp_user *user, const void *buf, const uint32_t len); int main(int argc, char **argv) { - if (hp_init(echo_contract) == -1) + if (hp_init_contract() == -1) return 1; - return 0; -} - -/** - * HP smart contract is defined as a function which takes HP contract context as an argument. - * HP considers execution as complete, when this function returns. - */ -void echo_contract(const struct hp_contract_context *ctx) -{ - // Non-blocking call. This will start the peer message (NPL) listener. - // hp_peer_message_listener(ctx, on_peer_message); - - // Peer message sending example. - // hp_peer_write(ctx, "Hello", 5); + const struct hp_contract_context *ctx = hp_get_context(); + // We store the execution timestamp as an example state file change. if (!ctx->readonly) + store_timestamp(ctx->timestamp); + + // Read and process all user inputs from the mmap. + const void *input_mmap = hp_init_user_input_mmap(); + + // Iterate through all users. + for (int u = 0; u < ctx->users.count; u++) { - // We just save execution timestamp as an example state file change. - int fd = open("exects.txt", O_RDWR | O_CREAT | O_APPEND); - if (fd > 0) + const struct hp_user *user = &ctx->users.list[u]; + + // Iterate through all inputs from this user. + for (int i = 0; i < user->inputs.count; i++) { - char tsbuf[20]; - memset(tsbuf, 0, 20); - sprintf(tsbuf, "%lu\n", ctx->timestamp); - struct iovec vec[2] = {{"ts:", 4}, {(void *)tsbuf, 20}}; - writev(fd, vec, 2); - close(fd); + const struct hp_user_input input = user->inputs.list[i]; + + // Instead of mmap, we can also read the inputs from 'ctx->users.in_fd' using file I/O. + // However, using mmap is recommended because user inputs already reside in memory. + const void *buf = input_mmap + input.offset; + + process_user_message(user, buf, input.size); } } - // Blocking call. This will block until all user messages are looped. - hp_user_message_loop(ctx, on_user_message); + // Peer message send example: + // hp_write_peer_msg("Hello!", 6); + + // Peer message receive example: + // // Allocate buffers for received message. + // char sender[HP_KEY_SIZE]; + // char *msg = malloc(HP_PEER_MSG_MAX_SIZE); + // // Wait for 200ms for incoming message. We will receive our own message as well. + // const int len = hp_read_peer_msg(msg, sender, 200); + // if (len > 0) + // printf("Received %.*s from %.*s", len, msg, HP_KEY_SIZE, sender); + // free(msg); + + // Update UNL example: + // hp_update_unl("<64 char hex to add>", 1, "<64 char hex to remove>", 1); + + hp_deinit_user_input_mmap(); + hp_deinit_contract(); + return 0; } -void on_user_message(const struct hp_contract_context *ctx, const struct hp_user *user, const void *buf, const uint32_t len) +void store_timestamp(const uint64_t timestamp) +{ + int fd = open("exects.txt", O_RDWR | O_CREAT | O_APPEND); + if (fd > 0) + { + char tsbuf[20]; + memset(tsbuf, 0, 20); + sprintf(tsbuf, "%lu\n", timestamp); + struct iovec vec[2] = {{"ts:", 4}, {(void *)tsbuf, 20}}; + writev(fd, vec, 2); + close(fd); + } +} + +void process_user_message(const struct hp_user *user, const void *buf, const uint32_t len) { if (strncmp(buf, "ts", 2) == 0) { @@ -61,7 +88,7 @@ void on_user_message(const struct hp_contract_context *ctx, const struct hp_user char tsbuf[st.st_size]; if (read(fd, tsbuf, st.st_size) > 0) { - hp_user_write(user, tsbuf, st.st_size); + hp_write_user_msg(user, tsbuf, st.st_size); } } close(fd); @@ -70,11 +97,6 @@ void on_user_message(const struct hp_contract_context *ctx, const struct hp_user else { struct iovec vec[2] = {{"Echoing: ", 9}, {(void *)buf, len}}; - hp_user_writev(user, vec, 2); + hp_writev_user_msg(user, vec, 2); } } - -// Peer message handler func. -// void on_peer_message(const struct hp_contract_context *ctx, const char *peerPubKey, const void *buf, const uint32_t len) -// { -// } diff --git a/examples/c_contract/hotpocket_contract.h b/examples/c_contract/hotpocket_contract.h index e412bdb5..bd802790 100644 --- a/examples/c_contract/hotpocket_contract.h +++ b/examples/c_contract/hotpocket_contract.h @@ -1,5 +1,5 @@ -#ifndef HOTPOCKET_CONTRACT_LIB -#define HOTPOCKET_CONTRACT_LIB +#ifndef __HOTPOCKET_CONTRACT_LIB_C__ +#define __HOTPOCKET_CONTRACT_LIB_C__ #include #include @@ -8,17 +8,16 @@ #include #include #include -#include +#include #include "json.h" -#define __HP_KEY_SIZE 64 -#define __HP_HASH_SIZE 64 -#define __HP_MSG_HEADER_LEN 4 -#define __HP_SEQPKT_BUF_SIZE 131072 // 128KB to support SEQ_PACKET sockets. -#define __HP_POLL_TIMEOUT 20 - #define __HP_MMAP_BLOCK_SIZE 4096 #define __HP_MMAP_BLOCK_ALIGN(x) (((x) + ((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1)) & ~((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1)) +#define __HP_STREAM_MSG_HEADER_SIZE 4 +#define __HP_SEQPKT_MAX_SIZE 131072 // 128KB to support SEQ_PACKET sockets. +#define HP_PEER_MSG_MAX_SIZE __HP_SEQPKT_MAX_SIZE +#define HP_KEY_SIZE 64 +#define HP_HASH_SIZE 64 #define __HP_ASSIGN_STRING(dest, elem) \ if (elem->value->type == json_type_string) \ @@ -62,24 +61,29 @@ struct hp_user_input uint32_t size; }; +struct hp_user_inputs_collection +{ + struct hp_user_input *list; + size_t count; +}; + struct hp_user { - char pubkey[__HP_KEY_SIZE + 1]; + char pubkey[HP_KEY_SIZE + 1]; int outfd; - struct hp_user_input *inputs; - uint32_t inputs_count; + struct hp_user_inputs_collection inputs; }; struct hp_peer { - char pubkey[__HP_KEY_SIZE + 1]; + char pubkey[HP_KEY_SIZE + 1]; }; struct hp_users_collection { - int infd; struct hp_user *list; size_t count; + int in_fd; }; struct hp_peers_collection @@ -92,69 +96,51 @@ struct hp_peers_collection struct hp_contract_context { bool readonly; - uint64_t timestamp; - char pubkey[__HP_KEY_SIZE + 1]; - - char lcl_hash[__HP_HASH_SIZE + 1]; - uint64_t lcl_seq_no; - + char pubkey[HP_KEY_SIZE + 1]; + char lcl[HP_HASH_SIZE + 22]; // uint64(20 chars) + "-" + hash + nullchar struct hp_users_collection users; struct hp_peers_collection peers; }; -struct __hp_global_context +struct __hp_contract { + struct hp_contract_context *cctx; int control_fd; - bool should_stop; + void *user_inmap; + size_t user_inmap_size; }; -typedef void (*hp_contract_func)(const struct hp_contract_context *ctx); -typedef void (*hp_user_message_func)(const struct hp_contract_context *ctx, - const struct hp_user *user, const void *buf, const uint32_t len); -typedef void (*hp_peer_message_func)(const struct hp_contract_context *ctx, - const char *peerPubKey, const void *buf, const uint32_t len); - -struct __hp_peer_message_thread_arg -{ - const struct hp_contract_context *ctx; - hp_peer_message_func on_peer_message; -}; - -int hp_init(); -int hp_user_message_loop(const struct hp_contract_context *ctx, hp_user_message_func on_user_message); -int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_message_func on_peer_message); - -int hp_user_write(const struct hp_user *user, const uint8_t *buf, const uint32_t len); -int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const int buf_count); - -int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len); -int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count); - +int hp_init_contract(); +int hp_deinit_contract(); +const struct hp_contract_context *hp_get_context(); +const void *hp_init_user_input_mmap(); +void hp_deinit_user_input_mmap(); +int hp_write_user_msg(const struct hp_user *user, const void *buf, const uint32_t len); +int hp_writev_user_msg(const struct hp_user *user, const struct iovec *bufs, const int buf_count); +int hp_write_peer_msg(const void *buf, const uint32_t len); +int hp_writev_peer_msg(const struct iovec *bufs, const int buf_count); +int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout); int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count); -void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object); -void __hp_free_contract_context(struct hp_contract_context *ctx); +void __hp_parse_args_json(const struct json_object_s *object); +int __hp_write_control_msg(const void *buf, const uint32_t len); +void __hp_free(void *ptr); -static void *__hp_peer_message_thread_func(void *arg); +static struct __hp_contract __hpc = {}; -static void *__hp_control_message_thread_func(void *arg); -int __hp_control_write(const uint8_t *buf, const uint32_t len); -void __hp_on_control_message(const void *buf, const uint32_t len); - -static struct __hp_global_context gctx = {}; -static pthread_t __hp_control_thread = 0; -static pthread_t __hp_peer_thread = 0; - -int hp_init(hp_contract_func contract_func) +int hp_init_contract() { - if (!contract_func) - return -1; + if (__hpc.cctx) + return -1; // Already initialized. char buf[4096]; const size_t len = read(STDIN_FILENO, buf, sizeof(buf)); if (len == -1) + { + perror("Error when reading stdin."); return -1; + } struct json_value_s *root = json_parse(buf, len); @@ -164,121 +150,101 @@ int hp_init(hp_contract_func contract_func) if (object->length > 0) { // Create and populate hotpocket context. - struct hp_contract_context ctx = {}; - __hp_parse_args_json(&ctx, object); - free(root); + __hpc.cctx = (struct hp_contract_context *)malloc(sizeof(struct hp_contract_context)); + __hp_parse_args_json(object); + __hp_free(root); - // Start control channel listener. - if (pthread_create(&__hp_control_thread, NULL, &__hp_control_message_thread_func, NULL) == -1) - { - perror("Error creating control thread"); - goto error; - } - - // Execute user defined contract function. - if (contract_func) - contract_func(&ctx); - - // Instructs to all threads to gracefully stop. - gctx.should_stop = true; - - if (__hp_peer_thread) - pthread_join(__hp_peer_thread, NULL); - __hp_peer_thread = 0; - - pthread_join(__hp_control_thread, NULL); - __hp_control_thread = 0; - - // Cleanup. - close(ctx.users.infd); - for (int i = 0; i < ctx.users.count; i++) - close(ctx.users.list[i].outfd); - - close(ctx.peers.fd); - - __hp_free_contract_context(&ctx); - - // Send termination control message. - __hp_control_write("{\"type\":\"contract_end\"}", 23); - close(gctx.control_fd); return 0; } } -error: - - if (root) - free(root); - + __hp_free(root); return -1; } -int hp_user_message_loop(const struct hp_contract_context *ctx, hp_user_message_func on_user_message) +int hp_deinit_contract() { - int result = 0; - const int fd = ctx->users.infd; + struct hp_contract_context *cctx = __hpc.cctx; + if (!cctx) + return -1; // Not initialized. + + // Cleanup user input mmap (if mapped). + hp_deinit_user_input_mmap(); + + // Cleanup user and peer fds. + close(cctx->users.in_fd); + for (int i = 0; i < cctx->users.count; i++) + close(cctx->users.list[i].outfd); + close(cctx->peers.fd); + + // Cleanup user list allocation. + if (cctx->users.list) + { + for (int i = 0; i < cctx->users.count; i++) + __hp_free(cctx->users.list[i].inputs.list); + + __hp_free(cctx->users.list); + } + // Cleanup peer list allocation. + __hp_free(cctx->peers.list); + // Cleanup contract context. + __hp_free(cctx); + + // Send termination control message. + __hp_write_control_msg("{\"type\":\"contract_end\"}", 23); + close(__hpc.control_fd); +} + +const struct hp_contract_context *hp_get_context() +{ + return __hpc.cctx; +} + +const void *hp_init_user_input_mmap() +{ + if (__hpc.user_inmap) + return __hpc.user_inmap; + + struct hp_contract_context *cctx = __hpc.cctx; struct stat st; - if (fstat(fd, &st) == -1) + if (fstat(cctx->users.in_fd, &st) == -1) { perror("Error in user input fd stat"); - return -1; + return NULL; } if (st.st_size == 0) - return 0; + return NULL; const size_t mmap_size = __HP_MMAP_BLOCK_ALIGN(st.st_size); - void *fdptr = mmap(NULL, mmap_size, PROT_READ, MAP_PRIVATE, fd, 0); - if (fdptr == MAP_FAILED) + void *mmap_ptr = mmap(NULL, mmap_size, PROT_READ, MAP_PRIVATE, cctx->users.in_fd, 0); + if (mmap_ptr == MAP_FAILED) { perror("Error in user input fd mmap"); - return -1; + return NULL; } - close(fd); // We can close the fd after mmap. - - for (int i = 0; i < ctx->users.count; i++) - { - const struct hp_user *user = &ctx->users.list[i]; - for (int j = 0; j < user->inputs_count; j++) - { - const struct hp_user_input *input = &user->inputs[j]; - on_user_message(ctx, user, (fdptr + input->offset), input->size); - } - } - - munmap(fdptr, mmap_size); + __hpc.user_inmap = mmap_ptr; + __hpc.user_inmap_size = mmap_size; + return __hpc.user_inmap; } -int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_message_func on_peer_message) +void hp_deinit_user_input_mmap() { - if (__hp_peer_thread) - { - fprintf(stderr, "Peer listener already started.\n"); - return -1; - } - - // We need to malloc the arg so it doesn't go out of scope. (It will be freed by the thread func when it exits) - struct __hp_peer_message_thread_arg *arg = malloc(sizeof(struct __hp_peer_message_thread_arg)); - arg->ctx = ctx; - arg->on_peer_message = on_peer_message; - if (pthread_create(&__hp_peer_thread, NULL, &__hp_peer_message_thread_func, arg) == -1) - { - perror("Error creating peer thread"); - return -1; - } - - return 0; + if (__hpc.user_inmap) + munmap(__hpc.user_inmap, __hpc.user_inmap_size); + __hpc.user_inmap = NULL; + __hpc.user_inmap_size = 0; } -int hp_user_write(const struct hp_user *user, const uint8_t *buf, const uint32_t len) +int hp_write_user_msg(const struct hp_user *user, const void *buf, const uint32_t len) { const struct iovec vec = {(void *)buf, len}; - return hp_user_writev(user, &vec, 1); + return hp_writev_user_msg(user, &vec, 1); } -int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const int buf_count) +int hp_writev_user_msg(const struct hp_user *user, const struct iovec *bufs, const int buf_count) { const int total_buf_count = buf_count + 1; struct iovec all_bufs[total_buf_count]; // We need to prepend the length header buf to indicate user message length. @@ -291,41 +257,112 @@ int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const i msg_len += bufs[i].iov_len; } - uint8_t header_buf[__HP_MSG_HEADER_LEN]; + uint8_t header_buf[__HP_STREAM_MSG_HEADER_SIZE]; __HP_TO_BE(msg_len, header_buf, 0); all_bufs[0].iov_base = header_buf; - all_bufs[0].iov_len = __HP_MSG_HEADER_LEN; + all_bufs[0].iov_len = __HP_STREAM_MSG_HEADER_SIZE; return writev(user->outfd, all_bufs, total_buf_count); } -int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len) +int hp_write_peer_msg(const void *buf, const uint32_t len) { - if (len > __HP_SEQPKT_BUF_SIZE) + if (len > HP_PEER_MSG_MAX_SIZE) { - fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE); + fprintf(stderr, "Peer message exceeds max length %d.", HP_PEER_MSG_MAX_SIZE); return -1; } - return write(ctx->peers.fd, buf, len); + return write(__hpc.cctx->peers.fd, buf, len); } -int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count) +int hp_writev_peer_msg(const struct iovec *bufs, const int buf_count) { uint32_t len = 0; for (int i = 0; i < buf_count; i++) len += bufs[i].iov_len; - if (len > __HP_SEQPKT_BUF_SIZE) + if (len > HP_PEER_MSG_MAX_SIZE) { - fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE); + fprintf(stderr, "Peer message exceeds max length %d.", HP_PEER_MSG_MAX_SIZE); return -1; } - return writev(ctx->peers.fd, bufs, buf_count); + return writev(__hpc.cctx->peers.fd, bufs, buf_count); } +/** + * Reads a peer message (NPL) while waiting for 'timeout' milliseconds. + * @param msg_buf The buffer to place the incoming message. Must be of at least 'HP_PEER_MSG_MAX_SIZE' length. + * @param pubkey_buf The buffer to place the sender pubkey (hex). Must be of at least 'HP_KEY_SIZE' length. + * @param timeout Maximum milliseoncds to wait until a message arrives. If 0, returns immediately. + * If -1, waits forever until message arrives. + * @return Message length on success. 0 if no message arrived within timeout. -1 on error. + */ +int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout) +{ + struct pollfd pfd = {__hpc.cctx->peers.fd, POLLIN, 0}; + + // Peer messages consist of alternating SEQ packets of pubkey and data. + // So we need to wait for both pubkey and data packets to form a complete peer message. + + // Wait for the pubkey. + if (poll(&pfd, 1, timeout) == -1) + { + perror("Peer channel pubkey poll error"); + return -1; + } + else if (pfd.revents & (POLLHUP | POLLERR | POLLNVAL)) + { + fprintf(stderr, "Peer channel pubkey poll returned error: %d\n", pfd.revents); + return -1; + } + else if (pfd.revents & POLLIN) + { + // Read pubkey. + if (read(pfd.fd, pubkey_buf, HP_KEY_SIZE) == -1) + { + perror("Error reading pubkey from peer channel"); + return -1; + } + + // Wait for data. (data should be available immediately because we have received the pubkey) + pfd.revents = 0; + if (poll(&pfd, 1, 100) == -1) + { + perror("Peer channel data poll error"); + return -1; + } + else if (pfd.revents & (POLLHUP | POLLERR | POLLNVAL)) + { + fprintf(stderr, "Peer channel data poll returned error: %d\n", pfd.revents); + return -1; + } + else if (pfd.revents & POLLIN) + { + // Read data. + const int readres = read(pfd.fd, msg_buf, HP_PEER_MSG_MAX_SIZE); + if (readres == -1) + { + perror("Error reading pubkey from peer channel"); + return -1; + } + return readres; + } + } + + return 0; +} + +/** + * Updates the UNL of this node with specified 'add' and 'remove' changesets. + * @param add Array of hex pubkeys of 'HP_KEY_SIZE' to add. + * @param add_count Number of elements in 'add' array. + * @param remove Array of hex pubkeys of 'HP_KEY_SIZE' to remove. + * @param remove_count Number of elements in 'add' remove. + * @return 0 on success. -1 on error. + */ int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count) { // We assume 'add' and 'remove' are pointing to a char buffer containing 'count' no. of char[64] buffers. @@ -362,47 +399,37 @@ int hp_update_unl(const char *add, const size_t add_count, const char *remove, c strncpy(json_buf + pos, "]}", 2); - return __hp_control_write(json_buf, json_size); + return __hp_write_control_msg(json_buf, json_size); } -void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object) +void __hp_parse_args_json(const struct json_object_s *object) { const struct json_object_element_s *elem = object->start; + struct hp_contract_context *cctx = __hpc.cctx; + do { const struct json_string_s *k = elem->name; if (strcmp(k->string, "pubkey") == 0) { - __HP_ASSIGN_STRING(ctx->pubkey, elem); + __HP_ASSIGN_STRING(cctx->pubkey, elem); } else if (strcmp(k->string, "ts") == 0) { - __HP_ASSIGN_UINT64(ctx->timestamp, elem); + __HP_ASSIGN_UINT64(cctx->timestamp, elem); } else if (strcmp(k->string, "readonly") == 0) { - __HP_ASSIGN_BOOL(ctx->readonly, elem); + __HP_ASSIGN_BOOL(cctx->readonly, elem); } else if (strcmp(k->string, "lcl") == 0) { - if (elem->value->type == json_type_string) - { - const struct json_string_s *value = (struct json_string_s *)elem->value->payload; - const char *delim = "-"; - char *tok_ptr; - char *tok_str = strdup(value->string); - const char *seq_str = strtok_r(tok_str, delim, &tok_ptr); - const char *hash_str = strtok_r(NULL, delim, &tok_ptr); - - ctx->lcl_seq_no = strtoull(seq_str, NULL, 0); - memcpy(ctx->lcl_hash, hash_str, __HP_HASH_SIZE); - free(tok_str); - } + __HP_ASSIGN_STRING(cctx->lcl, elem); } else if (strcmp(k->string, "userinfd") == 0) { - __HP_ASSIGN_INT(ctx->users.infd, elem); + __HP_ASSIGN_INT(cctx->users.in_fd, elem); } else if (strcmp(k->string, "users") == 0) { @@ -411,16 +438,16 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj const struct json_object_s *user_object = (struct json_object_s *)elem->value->payload; const size_t user_count = user_object->length; - ctx->users.count = user_count; - ctx->users.list = user_count ? (struct hp_user *)malloc(sizeof(struct hp_user) * user_count) : NULL; + cctx->users.count = user_count; + cctx->users.list = user_count ? (struct hp_user *)malloc(sizeof(struct hp_user) * user_count) : NULL; if (user_count > 0) { struct json_object_element_s *user_elem = user_object->start; for (int i = 0; i < user_count; i++) { - struct hp_user *user = &ctx->users.list[i]; - memcpy(user->pubkey, user_elem->name->string, __HP_KEY_SIZE); + struct hp_user *user = &cctx->users.list[i]; + memcpy(user->pubkey, user_elem->name->string, HP_KEY_SIZE); if (user_elem->value->type == json_type_array) { @@ -432,17 +459,17 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj arr_elem = arr_elem->next; // Subsequent elements are tupels of [offset, size] of input messages for this user. - user->inputs_count = arr->length - 1; - user->inputs = user->inputs_count ? malloc(user->inputs_count * sizeof(struct hp_user_input)) : NULL; - for (int i = 0; i < user->inputs_count; i++) + user->inputs.count = arr->length - 1; + user->inputs.list = user->inputs.count ? (struct hp_user_input *)malloc(user->inputs.count * sizeof(struct hp_user_input)) : NULL; + for (int i = 0; i < user->inputs.count; i++) { if (arr_elem->value->type == json_type_array) { const struct json_array_s *input_info = (struct json_array_s *)arr_elem->value->payload; if (input_info->length == 2) { - __HP_ASSIGN_UINT64(user->inputs[i].offset, input_info->start); - __HP_ASSIGN_UINT64(user->inputs[i].size, input_info->start->next); + __HP_ASSIGN_UINT64(user->inputs.list[i].offset, input_info->start); + __HP_ASSIGN_UINT64(user->inputs.list[i].size, input_info->start->next); } } arr_elem = arr_elem->next; @@ -455,7 +482,7 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj } else if (strcmp(k->string, "nplfd") == 0) { - __HP_ASSIGN_INT(ctx->peers.fd, elem); + __HP_ASSIGN_INT(cctx->peers.fd, elem); } else if (strcmp(k->string, "unl") == 0) { @@ -464,15 +491,15 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj const struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload; const size_t peer_count = peer_array->length; - ctx->peers.count = peer_count; - ctx->peers.list = peer_count ? (struct hp_peer *)malloc(sizeof(struct hp_peer) * peer_count) : NULL; + cctx->peers.count = peer_count; + cctx->peers.list = peer_count ? (struct hp_peer *)malloc(sizeof(struct hp_peer) * peer_count) : NULL; if (peer_count > 0) { struct json_array_element_s *peer_elem = peer_array->start; for (int i = 0; i < peer_count; i++) { - __HP_ASSIGN_STRING(ctx->peers.list[i].pubkey, peer_elem); + __HP_ASSIGN_STRING(cctx->peers.list[i].pubkey, peer_elem); peer_elem = peer_elem->next; } } @@ -480,170 +507,28 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj } else if (strcmp(k->string, "controlfd") == 0) { - __HP_ASSIGN_INT(gctx.control_fd, elem); + __HP_ASSIGN_INT(__hpc.control_fd, elem); } elem = elem->next; } while (elem); } -static void *__hp_peer_message_thread_func(void *arg) +int __hp_write_control_msg(const void *buf, const uint32_t len) { - const struct __hp_peer_message_thread_arg *args = (const struct __hp_peer_message_thread_arg *)arg; - - // Pubkey buf to hold the sender pubkey of the message that follows it. - bool has_pubkey = false; - char pubkey_buf[__HP_KEY_SIZE + 1]; - memset(pubkey_buf, 0, sizeof(pubkey_buf)); - - // Buffer to hold current message. - uint8_t *msg_buf = malloc(__HP_SEQPKT_BUF_SIZE); - - struct pollfd pfd = {args->ctx->peers.fd, POLLIN, 0}; - - while (!gctx.should_stop) + if (len > __HP_SEQPKT_MAX_SIZE) { - // Reset poll fd because we are reusing it. - pfd.revents = 0; - - if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1) - { - perror("Peer channel poll error"); - goto error; - } - - short result = pfd.revents; - if (result == 0) - continue; - - if (result & (POLLHUP | POLLERR | POLLNVAL)) - { - fprintf(stderr, "Peer channel poll returned error.\n"); - goto error; - } - else if (result & POLLIN) - { - // The read data alternates between the sender pubkey and the message. - if (!has_pubkey) - { - if (read(pfd.fd, pubkey_buf, __HP_KEY_SIZE) == -1) - { - perror("Error reading pubkey from peer channel"); - goto error; - } - has_pubkey = true; - } - else - { - const size_t read_res = read(pfd.fd, msg_buf, __HP_SEQPKT_BUF_SIZE); - if (read_res == -1) - { - perror("Error reading message from peer channel"); - goto error; - } - - // Invoke the user defined peer message handler func. - args->on_peer_message(args->ctx, pubkey_buf, msg_buf, read_res); - has_pubkey = false; - } - } - } - - // If we reach here that means result is successful. - goto end; - -error: - // Perform any error handling. - -end: - free(arg); - free(msg_buf); - return NULL; -} - -static void *__hp_control_message_thread_func(void *arg) -{ - int result = 0; - - // Temp buffer for all read operations. - uint8_t *buf = malloc(__HP_SEQPKT_BUF_SIZE); - - struct pollfd pfd = {gctx.control_fd, POLLIN, 0}; - - while (!gctx.should_stop) - { - // Reset poll fd because we are reusing it. - pfd.revents = 0; - - if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1) - { - perror("Control channel poll error"); - goto error; - } - - short result = pfd.revents; - if (result == 0) - continue; - - if (result & (POLLHUP | POLLERR | POLLNVAL)) - { - fprintf(stderr, "Control channel poll returned error.\n"); - goto error; - } - else if (result & POLLIN) - { - const size_t read_res = read(pfd.fd, buf, __HP_SEQPKT_BUF_SIZE); - if (read_res == -1) - { - perror("Error reading control channel"); - goto error; - } - - __hp_on_control_message(buf, read_res); - } - } - - // If we reach here that means result is successful. - goto end; - -error: - // Perform any error handling. - -end: - free(buf); - return NULL; -} - -void __hp_on_control_message(const void *buf, const uint32_t len) -{ - // TODO: Handle control messages from hot pocket. -} - -void __hp_free_contract_context(struct hp_contract_context *ctx) -{ - if (ctx->users.list) - { - for (int i = 0; i < ctx->users.count; i++) - { - if (ctx->users.list[i].inputs) - free(ctx->users.list[i].inputs); - } - free(ctx->users.list); - } - - if (ctx->peers.list) - free(ctx->peers.list); -} - -int __hp_control_write(const uint8_t *buf, const uint32_t len) -{ - if (len > __HP_SEQPKT_BUF_SIZE) - { - fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE); + fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_MAX_SIZE); return -1; } - return write(gctx.control_fd, buf, len); + return write(__hpc.control_fd, buf, len); +} + +void __hp_free(void *ptr) +{ + free(ptr); + ptr = NULL; } #endif \ No newline at end of file diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 4c600874..732e0a9e 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -55,19 +55,12 @@ class ContractExecutionContext { #controlChannel = null; constructor(hpargs, users, peers, controlChannel) { + this.#controlChannel = controlChannel; this.readonly = hpargs.readonly; this.timestamp = hpargs.ts; this.users = users; - this.peers = peers; - this.#controlChannel = controlChannel; - - if (!hpargs.readonly) { - const lclParts = hpargs.lcl.split("-"); - this.lcl = { - seqNo: parseInt(lclParts[0]), - hash: lclParts[1] - }; - } + this.peers = peers; // Not available in readonly mode. + this.lcl = hpargs.lcl; // Not available in readonly mode. } async updateUnl(addArray, removeArray) { diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 8f3cf1d6..3da3bc25 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -18,7 +18,7 @@ hpcore=$(realpath ../..) if [ "$CONTRACT" = "cecho" ]; then # C echo contract echo "Using C echo contract." pushd $hpcore/examples/c_contract/ > /dev/null 2>&1 - gcc echo_contract.c -o echo_contract -pthread + gcc echo_contract.c -o echo_contract popd > /dev/null 2>&1 copyfiles="$hpcore/examples/c_contract/echo_contract" binary="/contract/bin/echo_contract"