Refactored C contract lib interface. (#175)

This commit is contained in:
Ravin Perera
2020-11-30 10:03:06 +05:30
committed by GitHub
parent 332e5a4750
commit efbd775fa1
4 changed files with 286 additions and 386 deletions

View File

@@ -3,52 +3,79 @@
#include <sys/stat.h>
#include "hotpocket_contract.h"
// gcc echo_contract.c -o echo_contract -pthread
// gcc echo_contract.c -o echo_contract
void echo_contract(const struct hp_contract_context *ctx);
void on_user_message(const struct hp_contract_context *ctx, const struct hp_user *user, const void *buf, const uint32_t len);
void on_peer_message(const struct hp_contract_context *ctx, const char *peerPubKey, const void *buf, const uint32_t len);
void store_timestamp(const uint64_t timestamp);
void process_user_message(const struct hp_user *user, const void *buf, const uint32_t len);
int main(int argc, char **argv)
{
if (hp_init(echo_contract) == -1)
if (hp_init_contract() == -1)
return 1;
return 0;
}
/**
* HP smart contract is defined as a function which takes HP contract context as an argument.
* HP considers execution as complete, when this function returns.
*/
void echo_contract(const struct hp_contract_context *ctx)
{
// Non-blocking call. This will start the peer message (NPL) listener.
// hp_peer_message_listener(ctx, on_peer_message);
// Peer message sending example.
// hp_peer_write(ctx, "Hello", 5);
const struct hp_contract_context *ctx = hp_get_context();
// We store the execution timestamp as an example state file change.
if (!ctx->readonly)
store_timestamp(ctx->timestamp);
// Read and process all user inputs from the mmap.
const void *input_mmap = hp_init_user_input_mmap();
// Iterate through all users.
for (int u = 0; u < ctx->users.count; u++)
{
// We just save execution timestamp as an example state file change.
int fd = open("exects.txt", O_RDWR | O_CREAT | O_APPEND);
if (fd > 0)
const struct hp_user *user = &ctx->users.list[u];
// Iterate through all inputs from this user.
for (int i = 0; i < user->inputs.count; i++)
{
char tsbuf[20];
memset(tsbuf, 0, 20);
sprintf(tsbuf, "%lu\n", ctx->timestamp);
struct iovec vec[2] = {{"ts:", 4}, {(void *)tsbuf, 20}};
writev(fd, vec, 2);
close(fd);
const struct hp_user_input input = user->inputs.list[i];
// Instead of mmap, we can also read the inputs from 'ctx->users.in_fd' using file I/O.
// However, using mmap is recommended because user inputs already reside in memory.
const void *buf = input_mmap + input.offset;
process_user_message(user, buf, input.size);
}
}
// Blocking call. This will block until all user messages are looped.
hp_user_message_loop(ctx, on_user_message);
// Peer message send example:
// hp_write_peer_msg("Hello!", 6);
// Peer message receive example:
// // Allocate buffers for received message.
// char sender[HP_KEY_SIZE];
// char *msg = malloc(HP_PEER_MSG_MAX_SIZE);
// // Wait for 200ms for incoming message. We will receive our own message as well.
// const int len = hp_read_peer_msg(msg, sender, 200);
// if (len > 0)
// printf("Received %.*s from %.*s", len, msg, HP_KEY_SIZE, sender);
// free(msg);
// Update UNL example:
// hp_update_unl("<64 char hex to add>", 1, "<64 char hex to remove>", 1);
hp_deinit_user_input_mmap();
hp_deinit_contract();
return 0;
}
void on_user_message(const struct hp_contract_context *ctx, const struct hp_user *user, const void *buf, const uint32_t len)
void store_timestamp(const uint64_t timestamp)
{
int fd = open("exects.txt", O_RDWR | O_CREAT | O_APPEND);
if (fd > 0)
{
char tsbuf[20];
memset(tsbuf, 0, 20);
sprintf(tsbuf, "%lu\n", timestamp);
struct iovec vec[2] = {{"ts:", 4}, {(void *)tsbuf, 20}};
writev(fd, vec, 2);
close(fd);
}
}
void process_user_message(const struct hp_user *user, const void *buf, const uint32_t len)
{
if (strncmp(buf, "ts", 2) == 0)
{
@@ -61,7 +88,7 @@ void on_user_message(const struct hp_contract_context *ctx, const struct hp_user
char tsbuf[st.st_size];
if (read(fd, tsbuf, st.st_size) > 0)
{
hp_user_write(user, tsbuf, st.st_size);
hp_write_user_msg(user, tsbuf, st.st_size);
}
}
close(fd);
@@ -70,11 +97,6 @@ void on_user_message(const struct hp_contract_context *ctx, const struct hp_user
else
{
struct iovec vec[2] = {{"Echoing: ", 9}, {(void *)buf, len}};
hp_user_writev(user, vec, 2);
hp_writev_user_msg(user, vec, 2);
}
}
// Peer message handler func.
// void on_peer_message(const struct hp_contract_context *ctx, const char *peerPubKey, const void *buf, const uint32_t len)
// {
// }

View File

@@ -1,5 +1,5 @@
#ifndef HOTPOCKET_CONTRACT_LIB
#define HOTPOCKET_CONTRACT_LIB
#ifndef __HOTPOCKET_CONTRACT_LIB_C__
#define __HOTPOCKET_CONTRACT_LIB_C__
#include <stdio.h>
#include <unistd.h>
@@ -8,17 +8,16 @@
#include <poll.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <pthread.h>
#include <sys/stat.h>
#include "json.h"
#define __HP_KEY_SIZE 64
#define __HP_HASH_SIZE 64
#define __HP_MSG_HEADER_LEN 4
#define __HP_SEQPKT_BUF_SIZE 131072 // 128KB to support SEQ_PACKET sockets.
#define __HP_POLL_TIMEOUT 20
#define __HP_MMAP_BLOCK_SIZE 4096
#define __HP_MMAP_BLOCK_ALIGN(x) (((x) + ((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1)) & ~((typeof(x))(__HP_MMAP_BLOCK_SIZE)-1))
#define __HP_STREAM_MSG_HEADER_SIZE 4
#define __HP_SEQPKT_MAX_SIZE 131072 // 128KB to support SEQ_PACKET sockets.
#define HP_PEER_MSG_MAX_SIZE __HP_SEQPKT_MAX_SIZE
#define HP_KEY_SIZE 64
#define HP_HASH_SIZE 64
#define __HP_ASSIGN_STRING(dest, elem) \
if (elem->value->type == json_type_string) \
@@ -62,24 +61,29 @@ struct hp_user_input
uint32_t size;
};
struct hp_user_inputs_collection
{
struct hp_user_input *list;
size_t count;
};
struct hp_user
{
char pubkey[__HP_KEY_SIZE + 1];
char pubkey[HP_KEY_SIZE + 1];
int outfd;
struct hp_user_input *inputs;
uint32_t inputs_count;
struct hp_user_inputs_collection inputs;
};
struct hp_peer
{
char pubkey[__HP_KEY_SIZE + 1];
char pubkey[HP_KEY_SIZE + 1];
};
struct hp_users_collection
{
int infd;
struct hp_user *list;
size_t count;
int in_fd;
};
struct hp_peers_collection
@@ -92,69 +96,51 @@ struct hp_peers_collection
struct hp_contract_context
{
bool readonly;
uint64_t timestamp;
char pubkey[__HP_KEY_SIZE + 1];
char lcl_hash[__HP_HASH_SIZE + 1];
uint64_t lcl_seq_no;
char pubkey[HP_KEY_SIZE + 1];
char lcl[HP_HASH_SIZE + 22]; // uint64(20 chars) + "-" + hash + nullchar
struct hp_users_collection users;
struct hp_peers_collection peers;
};
struct __hp_global_context
struct __hp_contract
{
struct hp_contract_context *cctx;
int control_fd;
bool should_stop;
void *user_inmap;
size_t user_inmap_size;
};
typedef void (*hp_contract_func)(const struct hp_contract_context *ctx);
typedef void (*hp_user_message_func)(const struct hp_contract_context *ctx,
const struct hp_user *user, const void *buf, const uint32_t len);
typedef void (*hp_peer_message_func)(const struct hp_contract_context *ctx,
const char *peerPubKey, const void *buf, const uint32_t len);
struct __hp_peer_message_thread_arg
{
const struct hp_contract_context *ctx;
hp_peer_message_func on_peer_message;
};
int hp_init();
int hp_user_message_loop(const struct hp_contract_context *ctx, hp_user_message_func on_user_message);
int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_message_func on_peer_message);
int hp_user_write(const struct hp_user *user, const uint8_t *buf, const uint32_t len);
int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const int buf_count);
int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len);
int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count);
int hp_init_contract();
int hp_deinit_contract();
const struct hp_contract_context *hp_get_context();
const void *hp_init_user_input_mmap();
void hp_deinit_user_input_mmap();
int hp_write_user_msg(const struct hp_user *user, const void *buf, const uint32_t len);
int hp_writev_user_msg(const struct hp_user *user, const struct iovec *bufs, const int buf_count);
int hp_write_peer_msg(const void *buf, const uint32_t len);
int hp_writev_peer_msg(const struct iovec *bufs, const int buf_count);
int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout);
int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count);
void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object);
void __hp_free_contract_context(struct hp_contract_context *ctx);
void __hp_parse_args_json(const struct json_object_s *object);
int __hp_write_control_msg(const void *buf, const uint32_t len);
void __hp_free(void *ptr);
static void *__hp_peer_message_thread_func(void *arg);
static struct __hp_contract __hpc = {};
static void *__hp_control_message_thread_func(void *arg);
int __hp_control_write(const uint8_t *buf, const uint32_t len);
void __hp_on_control_message(const void *buf, const uint32_t len);
static struct __hp_global_context gctx = {};
static pthread_t __hp_control_thread = 0;
static pthread_t __hp_peer_thread = 0;
int hp_init(hp_contract_func contract_func)
int hp_init_contract()
{
if (!contract_func)
return -1;
if (__hpc.cctx)
return -1; // Already initialized.
char buf[4096];
const size_t len = read(STDIN_FILENO, buf, sizeof(buf));
if (len == -1)
{
perror("Error when reading stdin.");
return -1;
}
struct json_value_s *root = json_parse(buf, len);
@@ -164,121 +150,101 @@ int hp_init(hp_contract_func contract_func)
if (object->length > 0)
{
// Create and populate hotpocket context.
struct hp_contract_context ctx = {};
__hp_parse_args_json(&ctx, object);
free(root);
__hpc.cctx = (struct hp_contract_context *)malloc(sizeof(struct hp_contract_context));
__hp_parse_args_json(object);
__hp_free(root);
// Start control channel listener.
if (pthread_create(&__hp_control_thread, NULL, &__hp_control_message_thread_func, NULL) == -1)
{
perror("Error creating control thread");
goto error;
}
// Execute user defined contract function.
if (contract_func)
contract_func(&ctx);
// Instructs to all threads to gracefully stop.
gctx.should_stop = true;
if (__hp_peer_thread)
pthread_join(__hp_peer_thread, NULL);
__hp_peer_thread = 0;
pthread_join(__hp_control_thread, NULL);
__hp_control_thread = 0;
// Cleanup.
close(ctx.users.infd);
for (int i = 0; i < ctx.users.count; i++)
close(ctx.users.list[i].outfd);
close(ctx.peers.fd);
__hp_free_contract_context(&ctx);
// Send termination control message.
__hp_control_write("{\"type\":\"contract_end\"}", 23);
close(gctx.control_fd);
return 0;
}
}
error:
if (root)
free(root);
__hp_free(root);
return -1;
}
int hp_user_message_loop(const struct hp_contract_context *ctx, hp_user_message_func on_user_message)
int hp_deinit_contract()
{
int result = 0;
const int fd = ctx->users.infd;
struct hp_contract_context *cctx = __hpc.cctx;
if (!cctx)
return -1; // Not initialized.
// Cleanup user input mmap (if mapped).
hp_deinit_user_input_mmap();
// Cleanup user and peer fds.
close(cctx->users.in_fd);
for (int i = 0; i < cctx->users.count; i++)
close(cctx->users.list[i].outfd);
close(cctx->peers.fd);
// Cleanup user list allocation.
if (cctx->users.list)
{
for (int i = 0; i < cctx->users.count; i++)
__hp_free(cctx->users.list[i].inputs.list);
__hp_free(cctx->users.list);
}
// Cleanup peer list allocation.
__hp_free(cctx->peers.list);
// Cleanup contract context.
__hp_free(cctx);
// Send termination control message.
__hp_write_control_msg("{\"type\":\"contract_end\"}", 23);
close(__hpc.control_fd);
}
const struct hp_contract_context *hp_get_context()
{
return __hpc.cctx;
}
const void *hp_init_user_input_mmap()
{
if (__hpc.user_inmap)
return __hpc.user_inmap;
struct hp_contract_context *cctx = __hpc.cctx;
struct stat st;
if (fstat(fd, &st) == -1)
if (fstat(cctx->users.in_fd, &st) == -1)
{
perror("Error in user input fd stat");
return -1;
return NULL;
}
if (st.st_size == 0)
return 0;
return NULL;
const size_t mmap_size = __HP_MMAP_BLOCK_ALIGN(st.st_size);
void *fdptr = mmap(NULL, mmap_size, PROT_READ, MAP_PRIVATE, fd, 0);
if (fdptr == MAP_FAILED)
void *mmap_ptr = mmap(NULL, mmap_size, PROT_READ, MAP_PRIVATE, cctx->users.in_fd, 0);
if (mmap_ptr == MAP_FAILED)
{
perror("Error in user input fd mmap");
return -1;
return NULL;
}
close(fd); // We can close the fd after mmap.
for (int i = 0; i < ctx->users.count; i++)
{
const struct hp_user *user = &ctx->users.list[i];
for (int j = 0; j < user->inputs_count; j++)
{
const struct hp_user_input *input = &user->inputs[j];
on_user_message(ctx, user, (fdptr + input->offset), input->size);
}
}
munmap(fdptr, mmap_size);
__hpc.user_inmap = mmap_ptr;
__hpc.user_inmap_size = mmap_size;
return __hpc.user_inmap;
}
int hp_peer_message_listener(const struct hp_contract_context *ctx, hp_peer_message_func on_peer_message)
void hp_deinit_user_input_mmap()
{
if (__hp_peer_thread)
{
fprintf(stderr, "Peer listener already started.\n");
return -1;
}
// We need to malloc the arg so it doesn't go out of scope. (It will be freed by the thread func when it exits)
struct __hp_peer_message_thread_arg *arg = malloc(sizeof(struct __hp_peer_message_thread_arg));
arg->ctx = ctx;
arg->on_peer_message = on_peer_message;
if (pthread_create(&__hp_peer_thread, NULL, &__hp_peer_message_thread_func, arg) == -1)
{
perror("Error creating peer thread");
return -1;
}
return 0;
if (__hpc.user_inmap)
munmap(__hpc.user_inmap, __hpc.user_inmap_size);
__hpc.user_inmap = NULL;
__hpc.user_inmap_size = 0;
}
int hp_user_write(const struct hp_user *user, const uint8_t *buf, const uint32_t len)
int hp_write_user_msg(const struct hp_user *user, const void *buf, const uint32_t len)
{
const struct iovec vec = {(void *)buf, len};
return hp_user_writev(user, &vec, 1);
return hp_writev_user_msg(user, &vec, 1);
}
int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const int buf_count)
int hp_writev_user_msg(const struct hp_user *user, const struct iovec *bufs, const int buf_count)
{
const int total_buf_count = buf_count + 1;
struct iovec all_bufs[total_buf_count]; // We need to prepend the length header buf to indicate user message length.
@@ -291,41 +257,112 @@ int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const i
msg_len += bufs[i].iov_len;
}
uint8_t header_buf[__HP_MSG_HEADER_LEN];
uint8_t header_buf[__HP_STREAM_MSG_HEADER_SIZE];
__HP_TO_BE(msg_len, header_buf, 0);
all_bufs[0].iov_base = header_buf;
all_bufs[0].iov_len = __HP_MSG_HEADER_LEN;
all_bufs[0].iov_len = __HP_STREAM_MSG_HEADER_SIZE;
return writev(user->outfd, all_bufs, total_buf_count);
}
int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len)
int hp_write_peer_msg(const void *buf, const uint32_t len)
{
if (len > __HP_SEQPKT_BUF_SIZE)
if (len > HP_PEER_MSG_MAX_SIZE)
{
fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE);
fprintf(stderr, "Peer message exceeds max length %d.", HP_PEER_MSG_MAX_SIZE);
return -1;
}
return write(ctx->peers.fd, buf, len);
return write(__hpc.cctx->peers.fd, buf, len);
}
int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count)
int hp_writev_peer_msg(const struct iovec *bufs, const int buf_count)
{
uint32_t len = 0;
for (int i = 0; i < buf_count; i++)
len += bufs[i].iov_len;
if (len > __HP_SEQPKT_BUF_SIZE)
if (len > HP_PEER_MSG_MAX_SIZE)
{
fprintf(stderr, "Peer message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE);
fprintf(stderr, "Peer message exceeds max length %d.", HP_PEER_MSG_MAX_SIZE);
return -1;
}
return writev(ctx->peers.fd, bufs, buf_count);
return writev(__hpc.cctx->peers.fd, bufs, buf_count);
}
/**
* Reads a peer message (NPL) while waiting for 'timeout' milliseconds.
* @param msg_buf The buffer to place the incoming message. Must be of at least 'HP_PEER_MSG_MAX_SIZE' length.
* @param pubkey_buf The buffer to place the sender pubkey (hex). Must be of at least 'HP_KEY_SIZE' length.
* @param timeout Maximum milliseoncds to wait until a message arrives. If 0, returns immediately.
* If -1, waits forever until message arrives.
* @return Message length on success. 0 if no message arrived within timeout. -1 on error.
*/
int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout)
{
struct pollfd pfd = {__hpc.cctx->peers.fd, POLLIN, 0};
// Peer messages consist of alternating SEQ packets of pubkey and data.
// So we need to wait for both pubkey and data packets to form a complete peer message.
// Wait for the pubkey.
if (poll(&pfd, 1, timeout) == -1)
{
perror("Peer channel pubkey poll error");
return -1;
}
else if (pfd.revents & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "Peer channel pubkey poll returned error: %d\n", pfd.revents);
return -1;
}
else if (pfd.revents & POLLIN)
{
// Read pubkey.
if (read(pfd.fd, pubkey_buf, HP_KEY_SIZE) == -1)
{
perror("Error reading pubkey from peer channel");
return -1;
}
// Wait for data. (data should be available immediately because we have received the pubkey)
pfd.revents = 0;
if (poll(&pfd, 1, 100) == -1)
{
perror("Peer channel data poll error");
return -1;
}
else if (pfd.revents & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "Peer channel data poll returned error: %d\n", pfd.revents);
return -1;
}
else if (pfd.revents & POLLIN)
{
// Read data.
const int readres = read(pfd.fd, msg_buf, HP_PEER_MSG_MAX_SIZE);
if (readres == -1)
{
perror("Error reading pubkey from peer channel");
return -1;
}
return readres;
}
}
return 0;
}
/**
* Updates the UNL of this node with specified 'add' and 'remove' changesets.
* @param add Array of hex pubkeys of 'HP_KEY_SIZE' to add.
* @param add_count Number of elements in 'add' array.
* @param remove Array of hex pubkeys of 'HP_KEY_SIZE' to remove.
* @param remove_count Number of elements in 'add' remove.
* @return 0 on success. -1 on error.
*/
int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count)
{
// We assume 'add' and 'remove' are pointing to a char buffer containing 'count' no. of char[64] buffers.
@@ -362,47 +399,37 @@ int hp_update_unl(const char *add, const size_t add_count, const char *remove, c
strncpy(json_buf + pos, "]}", 2);
return __hp_control_write(json_buf, json_size);
return __hp_write_control_msg(json_buf, json_size);
}
void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object)
void __hp_parse_args_json(const struct json_object_s *object)
{
const struct json_object_element_s *elem = object->start;
struct hp_contract_context *cctx = __hpc.cctx;
do
{
const struct json_string_s *k = elem->name;
if (strcmp(k->string, "pubkey") == 0)
{
__HP_ASSIGN_STRING(ctx->pubkey, elem);
__HP_ASSIGN_STRING(cctx->pubkey, elem);
}
else if (strcmp(k->string, "ts") == 0)
{
__HP_ASSIGN_UINT64(ctx->timestamp, elem);
__HP_ASSIGN_UINT64(cctx->timestamp, elem);
}
else if (strcmp(k->string, "readonly") == 0)
{
__HP_ASSIGN_BOOL(ctx->readonly, elem);
__HP_ASSIGN_BOOL(cctx->readonly, elem);
}
else if (strcmp(k->string, "lcl") == 0)
{
if (elem->value->type == json_type_string)
{
const struct json_string_s *value = (struct json_string_s *)elem->value->payload;
const char *delim = "-";
char *tok_ptr;
char *tok_str = strdup(value->string);
const char *seq_str = strtok_r(tok_str, delim, &tok_ptr);
const char *hash_str = strtok_r(NULL, delim, &tok_ptr);
ctx->lcl_seq_no = strtoull(seq_str, NULL, 0);
memcpy(ctx->lcl_hash, hash_str, __HP_HASH_SIZE);
free(tok_str);
}
__HP_ASSIGN_STRING(cctx->lcl, elem);
}
else if (strcmp(k->string, "userinfd") == 0)
{
__HP_ASSIGN_INT(ctx->users.infd, elem);
__HP_ASSIGN_INT(cctx->users.in_fd, elem);
}
else if (strcmp(k->string, "users") == 0)
{
@@ -411,16 +438,16 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj
const struct json_object_s *user_object = (struct json_object_s *)elem->value->payload;
const size_t user_count = user_object->length;
ctx->users.count = user_count;
ctx->users.list = user_count ? (struct hp_user *)malloc(sizeof(struct hp_user) * user_count) : NULL;
cctx->users.count = user_count;
cctx->users.list = user_count ? (struct hp_user *)malloc(sizeof(struct hp_user) * user_count) : NULL;
if (user_count > 0)
{
struct json_object_element_s *user_elem = user_object->start;
for (int i = 0; i < user_count; i++)
{
struct hp_user *user = &ctx->users.list[i];
memcpy(user->pubkey, user_elem->name->string, __HP_KEY_SIZE);
struct hp_user *user = &cctx->users.list[i];
memcpy(user->pubkey, user_elem->name->string, HP_KEY_SIZE);
if (user_elem->value->type == json_type_array)
{
@@ -432,17 +459,17 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj
arr_elem = arr_elem->next;
// Subsequent elements are tupels of [offset, size] of input messages for this user.
user->inputs_count = arr->length - 1;
user->inputs = user->inputs_count ? malloc(user->inputs_count * sizeof(struct hp_user_input)) : NULL;
for (int i = 0; i < user->inputs_count; i++)
user->inputs.count = arr->length - 1;
user->inputs.list = user->inputs.count ? (struct hp_user_input *)malloc(user->inputs.count * sizeof(struct hp_user_input)) : NULL;
for (int i = 0; i < user->inputs.count; i++)
{
if (arr_elem->value->type == json_type_array)
{
const struct json_array_s *input_info = (struct json_array_s *)arr_elem->value->payload;
if (input_info->length == 2)
{
__HP_ASSIGN_UINT64(user->inputs[i].offset, input_info->start);
__HP_ASSIGN_UINT64(user->inputs[i].size, input_info->start->next);
__HP_ASSIGN_UINT64(user->inputs.list[i].offset, input_info->start);
__HP_ASSIGN_UINT64(user->inputs.list[i].size, input_info->start->next);
}
}
arr_elem = arr_elem->next;
@@ -455,7 +482,7 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj
}
else if (strcmp(k->string, "nplfd") == 0)
{
__HP_ASSIGN_INT(ctx->peers.fd, elem);
__HP_ASSIGN_INT(cctx->peers.fd, elem);
}
else if (strcmp(k->string, "unl") == 0)
{
@@ -464,15 +491,15 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj
const struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload;
const size_t peer_count = peer_array->length;
ctx->peers.count = peer_count;
ctx->peers.list = peer_count ? (struct hp_peer *)malloc(sizeof(struct hp_peer) * peer_count) : NULL;
cctx->peers.count = peer_count;
cctx->peers.list = peer_count ? (struct hp_peer *)malloc(sizeof(struct hp_peer) * peer_count) : NULL;
if (peer_count > 0)
{
struct json_array_element_s *peer_elem = peer_array->start;
for (int i = 0; i < peer_count; i++)
{
__HP_ASSIGN_STRING(ctx->peers.list[i].pubkey, peer_elem);
__HP_ASSIGN_STRING(cctx->peers.list[i].pubkey, peer_elem);
peer_elem = peer_elem->next;
}
}
@@ -480,170 +507,28 @@ void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_obj
}
else if (strcmp(k->string, "controlfd") == 0)
{
__HP_ASSIGN_INT(gctx.control_fd, elem);
__HP_ASSIGN_INT(__hpc.control_fd, elem);
}
elem = elem->next;
} while (elem);
}
static void *__hp_peer_message_thread_func(void *arg)
int __hp_write_control_msg(const void *buf, const uint32_t len)
{
const struct __hp_peer_message_thread_arg *args = (const struct __hp_peer_message_thread_arg *)arg;
// Pubkey buf to hold the sender pubkey of the message that follows it.
bool has_pubkey = false;
char pubkey_buf[__HP_KEY_SIZE + 1];
memset(pubkey_buf, 0, sizeof(pubkey_buf));
// Buffer to hold current message.
uint8_t *msg_buf = malloc(__HP_SEQPKT_BUF_SIZE);
struct pollfd pfd = {args->ctx->peers.fd, POLLIN, 0};
while (!gctx.should_stop)
if (len > __HP_SEQPKT_MAX_SIZE)
{
// Reset poll fd because we are reusing it.
pfd.revents = 0;
if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1)
{
perror("Peer channel poll error");
goto error;
}
short result = pfd.revents;
if (result == 0)
continue;
if (result & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "Peer channel poll returned error.\n");
goto error;
}
else if (result & POLLIN)
{
// The read data alternates between the sender pubkey and the message.
if (!has_pubkey)
{
if (read(pfd.fd, pubkey_buf, __HP_KEY_SIZE) == -1)
{
perror("Error reading pubkey from peer channel");
goto error;
}
has_pubkey = true;
}
else
{
const size_t read_res = read(pfd.fd, msg_buf, __HP_SEQPKT_BUF_SIZE);
if (read_res == -1)
{
perror("Error reading message from peer channel");
goto error;
}
// Invoke the user defined peer message handler func.
args->on_peer_message(args->ctx, pubkey_buf, msg_buf, read_res);
has_pubkey = false;
}
}
}
// If we reach here that means result is successful.
goto end;
error:
// Perform any error handling.
end:
free(arg);
free(msg_buf);
return NULL;
}
static void *__hp_control_message_thread_func(void *arg)
{
int result = 0;
// Temp buffer for all read operations.
uint8_t *buf = malloc(__HP_SEQPKT_BUF_SIZE);
struct pollfd pfd = {gctx.control_fd, POLLIN, 0};
while (!gctx.should_stop)
{
// Reset poll fd because we are reusing it.
pfd.revents = 0;
if (poll(&pfd, 1, __HP_POLL_TIMEOUT) == -1)
{
perror("Control channel poll error");
goto error;
}
short result = pfd.revents;
if (result == 0)
continue;
if (result & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "Control channel poll returned error.\n");
goto error;
}
else if (result & POLLIN)
{
const size_t read_res = read(pfd.fd, buf, __HP_SEQPKT_BUF_SIZE);
if (read_res == -1)
{
perror("Error reading control channel");
goto error;
}
__hp_on_control_message(buf, read_res);
}
}
// If we reach here that means result is successful.
goto end;
error:
// Perform any error handling.
end:
free(buf);
return NULL;
}
void __hp_on_control_message(const void *buf, const uint32_t len)
{
// TODO: Handle control messages from hot pocket.
}
void __hp_free_contract_context(struct hp_contract_context *ctx)
{
if (ctx->users.list)
{
for (int i = 0; i < ctx->users.count; i++)
{
if (ctx->users.list[i].inputs)
free(ctx->users.list[i].inputs);
}
free(ctx->users.list);
}
if (ctx->peers.list)
free(ctx->peers.list);
}
int __hp_control_write(const uint8_t *buf, const uint32_t len)
{
if (len > __HP_SEQPKT_BUF_SIZE)
{
fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE);
fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_MAX_SIZE);
return -1;
}
return write(gctx.control_fd, buf, len);
return write(__hpc.control_fd, buf, len);
}
void __hp_free(void *ptr)
{
free(ptr);
ptr = NULL;
}
#endif

View File

@@ -55,19 +55,12 @@ class ContractExecutionContext {
#controlChannel = null;
constructor(hpargs, users, peers, controlChannel) {
this.#controlChannel = controlChannel;
this.readonly = hpargs.readonly;
this.timestamp = hpargs.ts;
this.users = users;
this.peers = peers;
this.#controlChannel = controlChannel;
if (!hpargs.readonly) {
const lclParts = hpargs.lcl.split("-");
this.lcl = {
seqNo: parseInt(lclParts[0]),
hash: lclParts[1]
};
}
this.peers = peers; // Not available in readonly mode.
this.lcl = hpargs.lcl; // Not available in readonly mode.
}
async updateUnl(addArray, removeArray) {

View File

@@ -18,7 +18,7 @@ hpcore=$(realpath ../..)
if [ "$CONTRACT" = "cecho" ]; then # C echo contract
echo "Using C echo contract."
pushd $hpcore/examples/c_contract/ > /dev/null 2>&1
gcc echo_contract.c -o echo_contract -pthread
gcc echo_contract.c -o echo_contract
popd > /dev/null 2>&1
copyfiles="$hpcore/examples/c_contract/echo_contract"
binary="/contract/bin/echo_contract"