Refactored NPL message processing. Passed lcl to contract args. (#105)

This commit is contained in:
Ravin Perera
2020-08-05 21:30:48 +05:30
committed by GitHub
parent 1328b06346
commit d4a786e3b9
10 changed files with 190 additions and 142 deletions

View File

@@ -4,8 +4,18 @@ function HotPocketContract() {
const hpargs = JSON.parse(fs.readFileSync(0, 'utf8'));
this.readonly = hpargs.readonly;
this.timestamp = hpargs.ts;
this.users = {};
if (!this.readonly) {
const lclParts = hpargs.lcl.split("-");
this.lcl = {
seqNo: parseInt(lclParts[0]),
hash: lclParts[1]
};
this.npl = new HotPocketNplChannel(hpargs.nplfd[0], hpargs.nplfd[1]);
}
this.users = {};
Object.keys(hpargs.usrfd).forEach((userPubKey) => {
const userfds = hpargs.usrfd[userPubKey];
this.users[userPubKey] = new HotPocketChannel(userfds[0], userfds[1]);
@@ -22,6 +32,59 @@ function HotPocketChannel(infd, outfd) {
}
}
function HotPocketNplChannel(infd, outfd) {
this.readInput = function () {
if (infd == -1)
return null;
// Input may consist of multiple messages.
// Each message has the format:
// | NPL version (1 byte) | reserve (1 byte) | msg length (2 bytes BE) | peer pubkey (32 bytes) | msg |
const inputs = []; // Peer inputs will be populated to this.
const buf = fs.readFileSync(infd);
let pos = 0;
while (pos < buf.byteLength) {
pos += 2; // Skip version and reserve.
// Read message len.
const msgLenBuf = readBytes(buf, pos, 2);
if (!msgLenBuf) break;
const msgLen = msgLenBuf.readUInt16BE();
pos += 2;
const pubKeyBuf = readBytes(buf, pos, 32);
if (!pubKeyBuf) break;
pos += 32;
const msgBuf = readBytes(buf, pos, msgLen)
if (!msgBuf) break;
inputs.push({
pubkey: pubKeyBuf.toString("hex"),
input: msgBuf
});
pos += msgLen;
}
return inputs;
}
this.sendOutput = function (output) {
fs.writeFileSync(outfd, output);
}
const readBytes = function (buf, pos, count) {
if (pos + count > buf.byteLength)
return null;
return buf.slice(pos, pos + count);
}
}
module.exports = {
HotPocketContract
}

View File

@@ -4,7 +4,6 @@
#include "../usr/user_input.hpp"
#include "../p2p/p2p.hpp"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "../msg/usrmsg_parser.hpp"
#include "../msg/usrmsg_common.hpp"
#include "../p2p/peer_session_handler.hpp"
@@ -119,21 +118,24 @@ namespace cons
ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal));
}
}
// Throughout consensus, we move over the incoming npl messages collected via the network so far into
// the candidate npl message set (move and append). This is to have a private working set for the consensus
// and avoid threading conflicts with network incoming npl messages.
{
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.npl_messages_mutex);
for (const auto &npl : p2p::ctx.collected_msgs.npl_messages)
{
const msg::fbuf::p2pmsg::Container *container = msg::fbuf::p2pmsg::GetContainer(npl.data());
// Only the npl messages with a valid lcl will be passed down to the contract. lcl should match the previous round's lcl
if (msg::fbuf::flatbuff_bytes_to_sv(container->lcl()) != ctx.lcl)
continue;
ctx.candidate_npl_messages.splice(ctx.candidate_npl_messages.end(), p2p::ctx.collected_msgs.npl_messages);
}
ctx.candidate_npl_messages.push_back(std::move(npl));
}
p2p::ctx.collected_msgs.npl_messages.clear();
// Only the npl messages with a valid lcl will be passed down to the contract.
// lcl should match the previous round's lcl.
auto itr = ctx.candidate_npl_messages.begin();
while (itr != ctx.candidate_npl_messages.end())
{
if (itr->lcl == ctx.lcl)
++itr;
else
ctx.candidate_npl_messages.erase(itr++);
}
LOG_DBG << "Started stage " << std::to_string(ctx.stage);
@@ -772,9 +774,12 @@ namespace cons
{
sc::contract_execution_args &args = ctx.contract_ctx.args;
args.time = cons_prop.time;
args.lcl = ctx.lcl;
// Populate npl bufs and user bufs.
args.nplbufs.inputs.splice(args.nplbufs.inputs.end(), ctx.candidate_npl_messages);
// Feed NPL messages.
args.npl_messages.splice(args.npl_messages.end(), ctx.candidate_npl_messages);
// Populate user bufs.
feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop);
// TODO: Do something usefull with HP<-->SC channel.
@@ -786,7 +791,7 @@ namespace cons
ctx.state = args.post_execution_state_hash;
extract_user_outputs_from_contract_bufmap(args.userbufs);
broadcast_npl_output(args.nplbufs.output);
broadcast_npl_output(args.npl_output);
sc::clear_args(args);
}
@@ -909,12 +914,9 @@ namespace cons
{
if (!output.empty())
{
p2p::npl_message npl;
npl.data.swap(output);
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_npl_output(fbuf, npl, ctx.lcl);
p2p::broadcast_message(fbuf, false);
p2pmsg::create_msg_from_npl_output(fbuf, output, ctx.lcl);
p2p::broadcast_message(fbuf, true);
}
}

View File

@@ -53,7 +53,7 @@ struct consensus_context
std::unordered_map<std::string, const p2p::proposal> candidate_proposals;
// The set of npl messages that are being collected as consensus stages are progressing.
std::list<std::string> candidate_npl_messages;
std::list<p2p::npl_message> candidate_npl_messages;
// Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round.
std::unordered_set<std::string> candidate_users;

View File

@@ -372,17 +372,17 @@ namespace msg::fbuf::p2pmsg
/**
* Ctreat npl message from the given npl output srtuct.
* @param container_builder Flatbuffer builder for the container message.
* @param n The npl struct to be placed in the container message.
* @param msg The message to be sent as NPL message.
* @param lcl Lcl value to be passed in the container message.
*/
void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &n, std::string_view lcl)
void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view &msg, std::string_view lcl)
{
flatbuffers::FlatBufferBuilder builder(1024);
const flatbuffers::Offset<Npl_Message> npl =
CreateNpl_Message(
builder,
sv_to_flatbuff_bytes(builder, n.data));
sv_to_flatbuff_bytes(builder, msg));
const flatbuffers::Offset<Content> message = CreateContent(builder, Message_Npl_Message, npl.Union());
builder.Finish(message); // Finished building message content to get serialised content.

View File

@@ -51,7 +51,7 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_response &hr);
void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &npl, std::string_view lcl);
void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view &msg, std::string_view lcl);
void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::state_request &hr, std::string_view lcl);

View File

@@ -64,8 +64,11 @@ namespace p2p
LEDGER_RESPONSE_ERROR error;
};
// Represents an NPL message sent by a peer.
struct npl_message
{
std::string pubkey; // Peer binary pubkey.
std::string lcl; // LCL of the peer.
std::string data;
};
@@ -103,8 +106,8 @@ namespace p2p
std::list<nonunl_proposal> nonunl_proposals;
std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions.
// NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract.
std::list<std::string> npl_messages;
// List of NPL messages collected from peers.
std::list<npl_message> npl_messages;
std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions.
// List of pairs indicating the session pubkey hex and the state requests.

View File

@@ -135,12 +135,12 @@ namespace p2p
std::lock_guard<std::mutex> lock(ctx.collected_msgs.npl_messages_mutex); // Insert npl message with lock.
// Npl messages are added to the npl message array as it is without deserealizing the content. The same content will be passed down
// to the contract as input in a binary format
const uint8_t *container_buf_ptr = reinterpret_cast<const uint8_t *>(message.data());
const size_t container_buf_size = message.length();
const std::string npl_message(reinterpret_cast<const char *>(container_buf_ptr), container_buf_size);
ctx.collected_msgs.npl_messages.push_back(std::move(npl_message));
const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message();
npl_message msg;
msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data());
msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey());
msg.lcl = msg::fbuf::flatbuff_bytes_to_sv(container->lcl());
ctx.collected_msgs.npl_messages.push_back(std::move(msg));
}
else if (content_message_type == p2pmsg::Message_State_Request_Message)
{

View File

@@ -1,9 +1,6 @@
#include "pchheader.hpp"
#include "conf.hpp"
#include "hplog.hpp"
#include "msg/fbuf/common_helpers.hpp"
#include "msg/fbuf/p2pmsg_container_generated.h"
#include "msg/fbuf/p2pmsg_content_generated.h"
#include "sc.hpp"
#include "hpfs/hpfs.hpp"
@@ -24,7 +21,7 @@ namespace sc
if (!ctx.args.readonly)
{
create_iopipes(ctx.nplfds, !ctx.args.nplbufs.inputs.empty());
create_iopipes(ctx.nplfds, !ctx.args.npl_messages.empty());
create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty());
}
@@ -173,11 +170,12 @@ namespace sc
* "version":"<hp version>",
* "pubkey": "<this node's hex public key>",
* "ts": <this node's timestamp (unix milliseconds)>,
* "readonly": <true|false>,
* "lcl": "<this node's last closed ledger seq no. and hash in hex>", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb)
* "hpfd": [fd0, fd1],
* "usrfd":{ "<pkhex>":[fd0, fd1], ... },
* "nplfd":[fd0, fd1],
* "unl":[ "pkhex", ... ],
* "readonly": <true|false>
* "usrfd":{ "<pkhex>":[fd0, fd1], ... },
* "unl":[ "pkhex", ... ]
* }
*/
int write_contract_args(const execution_context &ctx)
@@ -194,7 +192,8 @@ namespace sc
if (!ctx.args.readonly)
{
os << ",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE]
os << ",\"lcl\":\"" << ctx.args.lcl
<< "\",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE]
<< "],\"nplfd\":[" << ctx.nplfds[FDTYPE::SCREAD] << "," << ctx.nplfds[FDTYPE::SCWRITE] << "]";
}
@@ -250,8 +249,12 @@ namespace sc
int feed_inputs(execution_context &ctx)
{
// Write any hp or npl input messages to hp->sc and npl->sc pipe.
if (!ctx.args.readonly && write_contract_hp_npl_inputs(ctx) != 0)
// Write any input messages to hp->sc pipe.
if (!ctx.args.readonly && write_contract_hp_inputs(ctx) != 0)
return -1;
// Write any NPL messages to contract.
if (!ctx.args.readonly && write_npl_messages(ctx) != 0)
return -1;
// Write any verified (consensus-reached) user inputs to user pipes.
@@ -298,7 +301,7 @@ namespace sc
/**
* Writes any hp input messages to the contract.
*/
int write_contract_hp_npl_inputs(execution_context &ctx)
int write_contract_hp_inputs(execution_context &ctx)
{
if (write_iopipe(ctx.hpscfds, ctx.args.hpscbufs.inputs) != 0)
{
@@ -306,13 +309,69 @@ namespace sc
return -1;
}
if (write_npl_iopipe(ctx.nplfds, ctx.args.nplbufs.inputs) != 0)
return 0;
}
/**
* Write npl messages to the contract.
*/
int write_npl_messages(execution_context &ctx)
{
/**
* npl inputs are feed into the contract in a binary protocol. It follows the following pattern
* |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (32 bytes)**|**Npl message data**|
* Length of the message is calculated without including public key length
*/
const int writefd = ctx.nplfds[FDTYPE::HPWRITE];
if (writefd == -1)
return 0;
bool write_error = false;
if (!ctx.args.npl_messages.empty())
{
LOG_ERR << "Error writing NPL inputs to SC";
return -1;
const size_t total_memsegs = ctx.args.npl_messages.size() * 3;
iovec memsegs[total_memsegs];
size_t i = 0;
for (const auto &npl_msg : ctx.args.npl_messages)
{
const uint8_t pre_header_index = i * 3;
const uint8_t pubkey_index = pre_header_index + 1;
const uint8_t msg_index = pre_header_index + 2;
const uint16_t msg_len = npl_msg.data.size();
// Header is |version(1byte)|reserve(1byte)|msg length(2bytes big endian)|
uint8_t header[4];
header[0] = util::MIN_NPL_INPUT_VERSION;
// Store msg length in big endian.
header[2] = msg_len << 8;
header[3] = msg_len;
memsegs[pre_header_index].iov_base = header;
memsegs[pre_header_index].iov_len = sizeof(header);
// Pubkey without the key type prefix.
memsegs[pubkey_index].iov_base = reinterpret_cast<void *>(const_cast<char *>(npl_msg.pubkey.data() + 1));
memsegs[pubkey_index].iov_len = npl_msg.pubkey.size() - 1;
memsegs[msg_index].iov_base = reinterpret_cast<void *>(const_cast<char *>(npl_msg.data.data()));
memsegs[msg_index].iov_len = msg_len;
i++;
}
if (writev(writefd, memsegs, total_memsegs) == -1)
write_error = true;
ctx.args.npl_messages.clear();
}
return 0;
// Close the writefd since we no longer need it.
close(writefd);
ctx.nplfds[FDTYPE::HPWRITE] = -1;
return write_error ? -1 : 0;
}
/**
@@ -330,7 +389,7 @@ namespace sc
return -1;
}
const int npl_res = read_iopipe(ctx.nplfds, ctx.args.nplbufs.output);
const int npl_res = read_iopipe(ctx.nplfds, ctx.args.npl_output);
if (npl_res == -1)
{
LOG_ERR << "Error reading NPL output from the contract.";
@@ -521,85 +580,6 @@ namespace sc
return write_error ? -1 : 0;
}
/**
* Write the given input buffer into the write fd from the HP side.
* @param fds Vector of fd list.
* @param inputs Buffer to write into the HP write fd.
*/
int write_npl_iopipe(std::vector<int> &fds, std::list<std::string> &inputs)
{
/**
* npl inputs are feed into the contract in a binary protocol. It follows the following pattern
* |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (4 bytes)**|**Npl message data**|
* Length of the message is calculated without including public key length
*/
const int writefd = fds[FDTYPE::HPWRITE];
if (writefd == -1)
return 0;
bool write_error = false;
if (!inputs.empty())
{
int8_t total_memsegs = inputs.size() * 3;
iovec memsegs[total_memsegs];
size_t i = 0;
for (auto &input : inputs)
{
int8_t pre_header_index = i * 3;
int8_t pubkey_index = pre_header_index + 1;
int8_t msg_index = pre_header_index + 2;
// First binary representation of version, reserve and message length is constructed and feed it into
// memory segment. Then the public key and at last the message data
// At the moment no data is inserted as reserve
uint8_t reserve = 0;
//Get message container
const msg::fbuf::p2pmsg::Container *container = msg::fbuf::p2pmsg::GetContainer(input.data());
const flatbuffers::Vector<uint8_t> *container_content = container->content();
uint16_t msg_length = container_content->size();
/**
* Pre header is constructed using bit shifting. This will generate a bit pattern as explain in the example below
* version = 00000001
* reserve = 00000000
* msg_length = 0000000010001101
* pre_header = 00000001000000000000000010001101
*/
uint32_t pre_header = util::MIN_NPL_INPUT_VERSION;
pre_header = pre_header << 8;
pre_header += reserve;
pre_header = pre_header << 16;
pre_header += msg_length;
memsegs[pre_header_index].iov_base = &pre_header;
memsegs[pre_header_index].iov_len = 4;
std::string_view msg_pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey());
memsegs[pubkey_index].iov_base = reinterpret_cast<void *>(const_cast<char *>(msg_pubkey.data()));
memsegs[pubkey_index].iov_len = msg_pubkey.size();
memsegs[msg_index].iov_base = reinterpret_cast<void *>(const_cast<uint8_t *>(container_content->Data()));
memsegs[msg_index].iov_len = container_content->size();
i++;
}
if (writev(writefd, memsegs, total_memsegs) == -1)
write_error = true;
inputs.clear();
}
// Close the writefd since we no longer need it.
close(writefd);
fds[FDTYPE::HPWRITE] = -1;
return write_error ? -1 : 0;
}
/**
* Common function to read buffered output from the pipe and populate the output list.
* @param fds Vector representing the pipes fd list.
@@ -701,9 +681,10 @@ namespace sc
args.userbufs.clear();
args.hpscbufs.inputs.clear();
args.hpscbufs.output.clear();
args.nplbufs.inputs.clear();
args.nplbufs.output.clear();
args.npl_messages.clear();
args.npl_output.clear();
args.time = 0;
args.lcl.clear();
args.post_execution_state_hash = hpfs::h32_empty;
}

View File

@@ -5,6 +5,7 @@
#include "usr/usr.hpp"
#include "hpfs/h32.hpp"
#include "util.hpp"
#include "p2p/p2p.hpp"
/**
* Contains helper functions regarding POSIX process execution and IPC between HP and SC.
@@ -62,9 +63,11 @@ namespace sc
// The value is a pair holding consensus-verified inputs and contract-generated outputs.
contract_bufmap_t userbufs;
// Pair of NPL<->SC byte array message buffers.
// Input buffers for NPL->SC messages, Output buffers for SC->NPL messages.
contract_iobuf_pair nplbufs;
// NPL messages to be passed into contract.
std::list<p2p::npl_message> npl_messages;
// Output NPL buffer.
std::string npl_output;
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
// Input buffers for HP->SC messages, Output buffers for SC->HP messages.
@@ -73,6 +76,9 @@ namespace sc
// Current HotPocket consensus time.
int64_t time = 0;
// Current HotPocket lcl (seq no. and ledger hash hex)
std::string lcl;
// State hash after execution will be copied to this (not applicable to read only mode).
hpfs::h32 post_execution_state_hash = hpfs::h32_empty;
};
@@ -123,7 +129,9 @@ namespace sc
int fetch_outputs(execution_context &ctx);
int write_contract_hp_npl_inputs(execution_context &ctx);
int write_contract_hp_inputs(execution_context &ctx);
int write_npl_messages(execution_context &ctx);
int read_contract_hp_npl_outputs(execution_context &ctx);
@@ -143,8 +151,6 @@ namespace sc
int write_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
int write_npl_iopipe(std::vector<int> &fds, std::list<std::string> &inputs);
int read_iopipe(std::vector<int> &fds, std::string &output);
void close_unused_fds(execution_context &ctx, const bool is_hp);

View File

@@ -1,7 +0,0 @@
#!/bin/bash
string="azure.com"
#name=${string%%.*}
name=${string##*azure}
echo [$name]