Naming changes and reintroducing key prefix to contract libraries. (#202)

This commit is contained in:
Ravin Perera
2020-12-23 19:23:46 +05:30
committed by GitHub
parent e835e18d18
commit 8dc20bdab0
8 changed files with 107 additions and 99 deletions

View File

@@ -40,21 +40,21 @@ int main(int argc, char **argv)
}
}
// Peer message send example:
// hp_write_peer_msg("Hello!", 6);
// NPL message send example:
// hp_write_npl_msg("Hello!", 6);
// Peer message receive example:
// NPL message receive example:
// // Allocate buffers for received message.
// char sender[HP_KEY_SIZE];
// char *msg = malloc(HP_PEER_MSG_MAX_SIZE);
// // Wait for 200ms for incoming message. We will receive our own message as well.
// const int len = hp_read_peer_msg(msg, sender, 200);
// char *msg = malloc(HP_NPL_MSG_MAX_SIZE);
// // Wait for 200ms for incoming message. We will receive our own message as well if we are part of unl.
// const int len = hp_read_npl_msg(msg, sender, 200);
// if (len > 0)
// printf("Received %.*s from %.*s", len, msg, HP_KEY_SIZE, sender);
// free(msg);
// Update UNL example:
// hp_update_unl("<64 char hex to add>", 1, "<64 char hex to remove>", 1);
// hp_update_unl("<66 char hex to add>", 1, "<66 char hex to remove>", 1);
hp_deinit_user_input_mmap();
hp_deinit_contract();

View File

@@ -15,8 +15,8 @@
#define __HP_MMAP_BLOCK_ALIGN(x) (((x) + ((off_t)(__HP_MMAP_BLOCK_SIZE)-1)) & ~((off_t)(__HP_MMAP_BLOCK_SIZE)-1))
#define __HP_STREAM_MSG_HEADER_SIZE 4
#define __HP_SEQPKT_MAX_SIZE 131072 // 128KB to support SEQ_PACKET sockets.
#define HP_PEER_MSG_MAX_SIZE __HP_SEQPKT_MAX_SIZE
#define HP_KEY_SIZE 64
#define HP_NPL_MSG_MAX_SIZE __HP_SEQPKT_MAX_SIZE
#define HP_KEY_SIZE 66 // Hex pubkey size. (64 char key + 2 chars for key type prfix)
#define HP_HASH_SIZE 64
#define __HP_ASSIGN_STRING(dest, elem) \
@@ -77,16 +77,18 @@ struct hp_user_inputs_collection
size_t count;
};
// Represents a user that is connected to HP cluster.
struct hp_user
{
char pubkey[HP_KEY_SIZE + 1];
char pubkey[HP_KEY_SIZE + 1]; // +1 for null char.S
int outfd;
struct hp_user_inputs_collection inputs;
};
struct hp_peer
// Represents a node that's part of unl.
struct hp_unl_node
{
char pubkey[HP_KEY_SIZE + 1];
char pubkey[HP_KEY_SIZE + 1]; // +1 for null char.S
};
struct hp_users_collection
@@ -96,21 +98,21 @@ struct hp_users_collection
int in_fd;
};
struct hp_peers_collection
struct hp_unl_collection
{
struct hp_peer *list;
struct hp_unl_node *list;
size_t count;
int fd;
int npl_fd;
};
struct hp_contract_context
{
bool readonly;
uint64_t timestamp;
char pubkey[HP_KEY_SIZE + 1];
char lcl[HP_HASH_SIZE + 22]; // uint64(20 chars) + "-" + hash + nullchar
char pubkey[HP_KEY_SIZE + 1]; // +1 for null char.S
char lcl[HP_HASH_SIZE + 22]; // uint64(20 chars) + "-" + hash + nullchar
struct hp_users_collection users;
struct hp_peers_collection peers;
struct hp_unl_collection unl;
};
struct __hp_contract
@@ -128,9 +130,9 @@ const void *hp_init_user_input_mmap();
void hp_deinit_user_input_mmap();
int hp_write_user_msg(const struct hp_user *user, const void *buf, const uint32_t len);
int hp_writev_user_msg(const struct hp_user *user, const struct iovec *bufs, const int buf_count);
int hp_write_peer_msg(const void *buf, const uint32_t len);
int hp_writev_peer_msg(const struct iovec *bufs, const int buf_count);
int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout);
int hp_write_npl_msg(const void *buf, const uint32_t len);
int hp_writev_npl_msg(const struct iovec *bufs, const int buf_count);
int hp_read_npl_msg(void *msg_buf, char *pubkey_buf, const int timeout);
int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count);
void __hp_parse_args_json(const struct json_object_s *object);
@@ -189,11 +191,11 @@ int hp_deinit_contract()
// Cleanup user input mmap (if mapped).
hp_deinit_user_input_mmap();
// Cleanup user and peer fds.
// Cleanup user and npl fd.
close(cctx->users.in_fd);
for (int i = 0; i < cctx->users.count; i++)
close(cctx->users.list[i].outfd);
close(cctx->peers.fd);
close(cctx->unl.npl_fd);
// Cleanup user list allocation.
if (cctx->users.list)
@@ -203,8 +205,8 @@ int hp_deinit_contract()
__hp_free(cctx->users.list);
}
// Cleanup peer list allocation.
__hp_free(cctx->peers.list);
// Cleanup unl list allocation.
__hp_free(cctx->unl.list);
// Cleanup contract context.
__hp_free(cctx);
@@ -283,56 +285,56 @@ int hp_writev_user_msg(const struct hp_user *user, const struct iovec *bufs, con
return writev(user->outfd, all_bufs, total_buf_count);
}
int hp_write_peer_msg(const void *buf, const uint32_t len)
int hp_write_npl_msg(const void *buf, const uint32_t len)
{
if (len > HP_PEER_MSG_MAX_SIZE)
if (len > HP_NPL_MSG_MAX_SIZE)
{
fprintf(stderr, "Peer message exceeds max length %d.", HP_PEER_MSG_MAX_SIZE);
fprintf(stderr, "NPL message exceeds max length %d.", HP_NPL_MSG_MAX_SIZE);
return -1;
}
return write(__hpc.cctx->peers.fd, buf, len);
return write(__hpc.cctx->unl.npl_fd, buf, len);
}
int hp_writev_peer_msg(const struct iovec *bufs, const int buf_count)
int hp_writev_npl_msg(const struct iovec *bufs, const int buf_count)
{
uint32_t len = 0;
for (int i = 0; i < buf_count; i++)
len += bufs[i].iov_len;
if (len > HP_PEER_MSG_MAX_SIZE)
if (len > HP_NPL_MSG_MAX_SIZE)
{
fprintf(stderr, "Peer message exceeds max length %d.", HP_PEER_MSG_MAX_SIZE);
fprintf(stderr, "NPL message exceeds max length %d.", HP_NPL_MSG_MAX_SIZE);
return -1;
}
return writev(__hpc.cctx->peers.fd, bufs, buf_count);
return writev(__hpc.cctx->unl.npl_fd, bufs, buf_count);
}
/**
* Reads a peer message (NPL) while waiting for 'timeout' milliseconds.
* @param msg_buf The buffer to place the incoming message. Must be of at least 'HP_PEER_MSG_MAX_SIZE' length.
* Reads a NPL message while waiting for 'timeout' milliseconds.
* @param msg_buf The buffer to place the incoming message. Must be of at least 'HP_NPL_MSG_MAX_SIZE' length.
* @param pubkey_buf The buffer to place the sender pubkey (hex). Must be of at least 'HP_KEY_SIZE' length.
* @param timeout Maximum milliseoncds to wait until a message arrives. If 0, returns immediately.
* If -1, waits forever until message arrives.
* @return Message length on success. 0 if no message arrived within timeout. -1 on error.
*/
int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout)
int hp_read_npl_msg(void *msg_buf, char *pubkey_buf, const int timeout)
{
struct pollfd pfd = {__hpc.cctx->peers.fd, POLLIN, 0};
struct pollfd pfd = {__hpc.cctx->unl.npl_fd, POLLIN, 0};
// Peer messages consist of alternating SEQ packets of pubkey and data.
// So we need to wait for both pubkey and data packets to form a complete peer message.
// NPL messages consist of alternating SEQ packets of pubkey and data.
// So we need to wait for both pubkey and data packets to form a complete NPL message.
// Wait for the pubkey.
if (poll(&pfd, 1, timeout) == -1)
{
perror("Peer channel pubkey poll error");
perror("NPL channel pubkey poll error");
return -1;
}
else if (pfd.revents & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "Peer channel pubkey poll returned error: %d\n", pfd.revents);
fprintf(stderr, "NPL channel pubkey poll returned error: %d\n", pfd.revents);
return -1;
}
else if (pfd.revents & POLLIN)
@@ -340,7 +342,7 @@ int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout)
// Read pubkey.
if (read(pfd.fd, pubkey_buf, HP_KEY_SIZE) == -1)
{
perror("Error reading pubkey from peer channel");
perror("Error reading pubkey from NPL channel");
return -1;
}
@@ -348,21 +350,21 @@ int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout)
pfd.revents = 0;
if (poll(&pfd, 1, 100) == -1)
{
perror("Peer channel data poll error");
perror("NPL channel data poll error");
return -1;
}
else if (pfd.revents & (POLLHUP | POLLERR | POLLNVAL))
{
fprintf(stderr, "Peer channel data poll returned error: %d\n", pfd.revents);
fprintf(stderr, "NPL channel data poll returned error: %d\n", pfd.revents);
return -1;
}
else if (pfd.revents & POLLIN)
{
// Read data.
const int readres = read(pfd.fd, msg_buf, HP_PEER_MSG_MAX_SIZE);
const int readres = read(pfd.fd, msg_buf, HP_NPL_MSG_MAX_SIZE);
if (readres == -1)
{
perror("Error reading pubkey from peer channel");
perror("Error reading pubkey from NPL channel");
return -1;
}
return readres;
@@ -382,12 +384,14 @@ int hp_read_peer_msg(void *msg_buf, char *pubkey_buf, const int timeout)
*/
int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count)
{
// We assume 'add' and 'remove' are pointing to a char buffer containing 'count' no. of char[64] buffers.
// We assume 'add' and 'remove' are pointing to char buffers containing 'count' no. of char[HP_KEY_SIZE] buffers.
// Calculate total json message length and prepare the json buf.
// Format: {"type":"unl_changeset","add":["pubkey1",...],"remove":["pubkey2",...]}
const size_t json_size = 45 + (67 * add_count - (add_count ? 1 : 0)) + (67 * remove_count - (remove_count ? 1 : 0));
// {"type":"unl_changeset","add":[],"remove":[]} => length 45
// "pubkey", (HP_KEY_SIZE+quotes+comma) => length 69
const size_t json_size = 45 + (69 * add_count - (add_count ? 1 : 0)) + (69 * remove_count - (remove_count ? 1 : 0));
char json_buf[json_size];
strncpy(json_buf, "{\"type\":\"unl_changeset\",\"add\":[", 31);
@@ -397,8 +401,8 @@ int hp_update_unl(const char *add, const size_t add_count, const char *remove, c
if (i > 0)
json_buf[pos++] = ',';
json_buf[pos++] = '"';
strncpy(json_buf + pos, add + (i * 64), 64);
pos += 64;
strncpy(json_buf + pos, add + (i * HP_KEY_SIZE), HP_KEY_SIZE);
pos += HP_KEY_SIZE;
json_buf[pos++] = '"';
}
@@ -409,8 +413,8 @@ int hp_update_unl(const char *add, const size_t add_count, const char *remove, c
if (i > 0)
json_buf[pos++] = ',';
json_buf[pos++] = '"';
strncpy(json_buf + pos, remove + (i * 64), 64);
pos += 64;
strncpy(json_buf + pos, remove + (i * HP_KEY_SIZE), HP_KEY_SIZE);
pos += HP_KEY_SIZE;
json_buf[pos++] = '"';
}
@@ -499,25 +503,25 @@ void __hp_parse_args_json(const struct json_object_s *object)
}
else if (strcmp(k->string, "nplfd") == 0)
{
__HP_ASSIGN_INT(cctx->peers.fd, elem);
__HP_ASSIGN_INT(cctx->unl.npl_fd, elem);
}
else if (strcmp(k->string, "unl") == 0)
{
if (elem->value->type == json_type_array)
{
const struct json_array_s *peer_array = (struct json_array_s *)elem->value->payload;
const size_t peer_count = peer_array->length;
const struct json_array_s *unl_array = (struct json_array_s *)elem->value->payload;
const size_t unl_count = unl_array->length;
cctx->peers.count = peer_count;
cctx->peers.list = peer_count ? (struct hp_peer *)malloc(sizeof(struct hp_peer) * peer_count) : NULL;
cctx->unl.count = unl_count;
cctx->unl.list = unl_count ? (struct hp_unl_node *)malloc(sizeof(struct hp_unl_node) * unl_count) : NULL;
if (peer_count > 0)
if (unl_count > 0)
{
struct json_array_element_s *peer_elem = peer_array->start;
for (int i = 0; i < peer_count; i++)
struct json_array_element_s *unl_elem = unl_array->start;
for (int i = 0; i < unl_count; i++)
{
__HP_ASSIGN_STRING(cctx->peers.list[i].pubkey, peer_elem);
peer_elem = peer_elem->next;
__HP_ASSIGN_STRING(cctx->unl.list[i].pubkey, unl_elem);
unl_elem = unl_elem->next;
}
}
}

View File

@@ -2,7 +2,7 @@ const HotPocket = require("./hp-contract-lib");
const fs = require('fs');
// HP smart contract is defined as a function which takes HP ExecutionContext as an argument.
// HP considers execution as complete, when this function completes and all the peer message callbacks are complete.
// HP considers execution as complete, when this function completes and all the NPL message callbacks are complete.
const echoContract = async (ctx) => {
// We just save execution timestamp as an example state file change.
@@ -35,19 +35,22 @@ const echoContract = async (ctx) => {
// Get the user identified by public key.
// ctx.users.find("<PubkeyHex>");
// Get list of all peers in the cluster.
// ctx.peers.list();
// Get list of all unl nodes in the cluster.
// ctx.unl.list();
// Get the peer identified by public key.
// ctx.peers.find("<PubkeyHex>");
// Get the unl node identified by public key.
// ctx.unl.find("<PubkeyHex>");
// Peer messages example.
// NPL messages example.
// if (!ctx.readonly) {
// ctx.peers.onMessage((peer, msg) => {
// console.log(msg + " from " + peer.pubKey);
// ctx.unl.onMessage((node, msg) => {
// console.log(msg + " from " + node.pubKey);
// })
// await ctx.peers.send("Hello");
// await ctx.unl.send("Hello");
// }
// Update UNL example:
// ctx.updateUnl(["<add pubkey hex>"], ["<remove pubkey hex>"]);
}
const hpc = new HotPocket.Contract();

View File

@@ -48,8 +48,8 @@ class HotPocketContract {
const nplChannel = new NplChannel(hpargs.nplfd);
const users = new UsersCollection(hpargs.userinfd, hpargs.users, this.#clientProtocol);
const peers = new PeersCollection(hpargs.readonly, hpargs.unl, nplChannel, pendingTasks);
const executionContext = new ContractExecutionContext(hpargs, users, peers, this.#controlChannel);
const unl = new UnlCollection(hpargs.readonly, hpargs.unl, nplChannel, pendingTasks);
const executionContext = new ContractExecutionContext(hpargs, users, unl, this.#controlChannel);
invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => {
// Wait for any pending tasks added during execution.
@@ -70,12 +70,12 @@ class ContractExecutionContext {
#controlChannel = null;
constructor(hpargs, users, peers, controlChannel) {
constructor(hpargs, users, unl, controlChannel) {
this.#controlChannel = controlChannel;
this.readonly = hpargs.readonly;
this.timestamp = hpargs.ts;
this.users = users;
this.peers = peers; // Not available in readonly mode.
this.unl = unl; // Not available in readonly mode.
this.lcl = hpargs.lcl; // Not available in readonly mode.
}
@@ -181,8 +181,8 @@ class UserChannel {
}
}
class PeersCollection {
#peers = {};
class UnlCollection {
nodes = {};
#channel = null;
#readonly = false;
#pendingTasks = null;
@@ -193,48 +193,49 @@ class PeersCollection {
if (!readonly) {
unl.forEach(pubKey => {
this.#peers[pubKey] = new Peer(pubKey);
this.nodes[pubKey] = new UnlNode(pubKey);
});
this.#channel = channel;
}
}
// Returns the Peer for the specified pubkey. Returns null if not found.
// Returns the unl node for the specified pubkey. Returns null if not found.
find(pubKey) {
return this.#peers[pubKey];
return this.nodes[pubKey];
}
// Returns all the peers.
// Returns all the unl nodes.
list() {
return Object.values(this.#peers);
return Object.values(this.nodes);
}
count() {
return Object.keys(this.#peers).length;
return Object.keys(this.nodes).length;
}
// Registers for peer messages.
// Registers for NPL messages.
onMessage(callback) {
if (this.#readonly)
throw "Peer messages not available in readonly mode.";
throw "NPL messages not available in readonly mode.";
this.#channel.consume((pubKey, msg) => {
this.#pendingTasks.push(invokeCallback(callback, this.#peers[pubKey], msg));
this.#pendingTasks.push(invokeCallback(callback, this.nodes[pubKey], msg));
});
}
// Broadcasts a message to all peers (including self).
// Broadcasts a message to all unl nodes (including self if self is part of unl).
async send(msg) {
if (this.#readonly)
throw "Peer messages not available in readonly mode.";
throw "NPL messages not available in readonly mode.";
await this.#channel.send(msg);
}
}
class Peer {
// Represents a node that's part of unl.
class UnlNode {
pubKey = null;
constructor(pubKey) {
@@ -242,6 +243,7 @@ class Peer {
}
}
// Represents the node-party-line that can be used to communicate with unl nodes.
class NplChannel {
#readStream = null;
@@ -276,7 +278,7 @@ class NplChannel {
send(msg) {
const buf = Buffer.from(msg);
if (buf.length > MAX_SEQ_PACKET_SIZE)
throw ("Peer message exceeds max size " + MAX_SEQ_PACKET_SIZE);
throw ("NPL message exceeds max size " + MAX_SEQ_PACKET_SIZE);
return writeAsync(this.#fd, buf);
}

View File

@@ -196,9 +196,8 @@ int main(int argc, char **argv)
hplog::init();
LOG_INFO << "Hot Pocket " << util::HP_VERSION;
LOG_INFO << "Role: "
<< (conf::cfg.node.role == conf::ROLE::OBSERVER ? "Observer" : "Validator");
LOG_INFO << "Public key: " << conf::cfg.node.pub_key_hex.substr(2); // Public key without 'ed' prefix.
LOG_INFO << "Role: " << (conf::cfg.node.role == conf::ROLE::OBSERVER ? "Observer" : "Validator");
LOG_INFO << "Public key: " << conf::cfg.node.pub_key_hex;
LOG_INFO << "Contract: " << conf::cfg.contract.id << " (" << conf::cfg.contract.version << ")";
if (ledger::init() == -1 ||

View File

@@ -80,7 +80,7 @@ namespace msg::controlmsg::json
for (const auto &v : d[field_name].array_range())
{
std::string hex_pubkey = "ed" + v.as<std::string>();
std::string_view hex_pubkey = v.as<std::string_view>();
std::string bin_pubkey;
bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES);
if (util::hex2bin(

View File

@@ -258,7 +258,7 @@ namespace sc
std::ostringstream os;
os << "{\"version\":\"" << util::HP_VERSION
<< "\",\"pubkey\":\"" << conf::cfg.node.pub_key_hex.substr(2)
<< "\",\"pubkey\":\"" << conf::cfg.node.pub_key_hex
<< "\",\"ts\":" << ctx.args.time
<< ",\"readonly\":" << (ctx.args.readonly ? "true" : "false");
@@ -440,8 +440,8 @@ namespace sc
std::string pubkeyhex;
util::bin2hex(
pubkeyhex,
reinterpret_cast<const unsigned char *>(npl_msg.pubkey.data()) + 1, // Skip first byte for key type prefix.
npl_msg.pubkey.length() - 1);
reinterpret_cast<const unsigned char *>(npl_msg.pubkey.data()),
npl_msg.pubkey.length());
// Writing the public key to the contract's fd (Skip first byte for key type prefix).
if (write(writefd, pubkeyhex.data(), pubkeyhex.size()) == -1)

View File

@@ -166,8 +166,8 @@ namespace unl
std::string pubkeyhex;
util::bin2hex(
pubkeyhex,
reinterpret_cast<const unsigned char *>(pk->data()) + 1,
pk->length() - 1);
reinterpret_cast<const unsigned char *>(pk->data()),
pk->length());
os << "\"" << pubkeyhex << "\"";
}