Support message separation for multiple inputs from same user. (#142)

This commit is contained in:
Savinda Senevirathne
2020-11-06 10:55:40 +05:30
committed by GitHub
parent 202a6a2715
commit 51173e37f2
11 changed files with 241 additions and 114 deletions

View File

@@ -66,14 +66,15 @@ async function main() {
console.log("Ready to accept inputs.");
const input_pump = () => {
rl.question('', async (inp) => {
rl.question('', (inp) => {
if (inp.startsWith("read "))
hpc.sendContractReadRequest(inp.substr(5))
else {
const submissionStatus = await hpc.sendContractInput(inp);
if (submissionStatus && submissionStatus != "ok")
console.log("Input submission failed. reason: " + submissionStatus);
hpc.sendContractInput(inp).then(submissionStatus => {
if (submissionStatus && submissionStatus != "ok")
console.log("Input submission failed. reason: " + submissionStatus);
});
}
input_pump();

View File

@@ -18,10 +18,13 @@ hpc.events.on("user_message", (pubKey, message) => {
}
else {
user.sendOutput("Echoing: " + userInput);
user.closeChannel();
}
});
hpc.events.on("user_finished", (pubKey) => {
hpc.users[pubKey].closeChannel();
});
const npl = hpc.npl;
// Npl channel always connected if contract is not in readonly mode.

View File

@@ -31,26 +31,46 @@ function HotPocketChannel(fd, userPubKey, events) {
let socket = null;
if (fd > 0) {
socket = fs.createReadStream(null, { fd: fd });
const dataParts = [];
let dataParts = [];
let msgCount = -1;
let msgLen = -1;
let bytesRead = 0;
let pos = 0;
socket.on("data", (buf) => {
if (msgLen == -1) {
// First two bytes indicate the message len.
const msgLenBuf = readBytes(buf, 0, 4);
if (msgLenBuf) {
msgLen = msgLenBuf.readUInt32BE();
const msgBuf = readBytes(buf, 4, buf.byteLength - 4);
dataParts.push(msgBuf)
bytesRead = msgBuf.byteLength;
}
} else {
dataParts.push(buf);
bytesRead += buf.length;
pos = 0;
if (msgCount == -1) {
const msgCountBuf = readBytes(buf, 0, 4)
msgCount = msgCountBuf.readUInt32BE();
pos += 4;
}
if (bytesRead == msgLen) {
msgLen == -1;
events.emit("user_message", userPubKey, Buffer.concat(dataParts));
while (pos < buf.byteLength) {
if (msgLen == -1) {
const msgLenBuf = readBytes(buf, pos, 4);
pos += 4;
msgLen = msgLenBuf.readUInt32BE();
}
let possible_read_len;
if (((buf.byteLength - pos) - msgLen) >= 0) {
// Can finish reading a full message.
possible_read_len = msgLen;
msgLen = -1;
} else {
// Only partial message is recieved.
possible_read_len = buf.byteLength - pos
msgLen -= possible_read_len;
}
const msgBuf = readBytes(buf, pos, possible_read_len);
pos += possible_read_len;
dataParts.push(msgBuf)
if (msgLen == -1) {
events.emit("user_message", userPubKey, Buffer.concat(dataParts));
dataParts = [];
msgCount--
}
if (msgCount == 0) {
msgCount = -1
events.emit("user_finished", userPubKey);
}
}
});
@@ -67,7 +87,12 @@ function HotPocketChannel(fd, userPubKey, events) {
}
this.sendOutput = function (output) {
fs.writeSync(fd, output);
const outputStringBuf = Buffer.from(output);
let headerBuf = Buffer.alloc(4);
// Writing message length in big endian format.
headerBuf.writeUInt32BE(outputStringBuf.byteLength)
fs.writeSync(fd, headerBuf);
fs.writeSync(fd, outputStringBuf);
}
this.closeChannel = function () {

View File

@@ -795,7 +795,6 @@ namespace consensus
else
{
// Send matching outputs to locally connected users.
candidate_user_output &cand_output = cu_itr->second;
// Find the user session by user pubkey.
@@ -805,16 +804,16 @@ namespace consensus
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
if (user_itr != usr::ctx.users.end()) // match found
{
std::string outputtosend;
outputtosend.swap(cand_output.output);
const usr::connected_user &user = user_itr->second;
msg::usrmsg::usrmsg_parser parser(user.protocol);
std::vector<uint8_t> msg;
parser.create_contract_output_container(msg, outputtosend, lcl_seq_no, lcl);
user.session.send(msg);
// Sending all the outputs to the user.
for (sc::contract_output &output : cand_output.outputs)
{
std::vector<uint8_t> msg;
parser.create_contract_output_container(msg, output.message, lcl_seq_no, lcl);
user.session.send(msg);
output.message.clear();
}
}
}
@@ -834,7 +833,9 @@ namespace consensus
// Populate the buf map with all currently connected users regardless of whether they have inputs or not.
// This is in case the contract wanted to emit some data to a user without needing any input.
for (const std::string &pubkey : cons_prop.users)
bufmap.try_emplace(pubkey, sc::contract_iobuf_pair());
{
bufmap.try_emplace(pubkey, sc::contract_iobufs());
}
for (const std::string &hash : cons_prop.hash_inputs)
{
@@ -855,8 +856,8 @@ namespace consensus
std::string inputtofeed;
inputtofeed.swap(cand_input.input);
sc::contract_iobuf_pair &bufpair = bufmap[cand_input.userpubkey];
bufpair.inputs.push_back(std::move(inputtofeed));
sc::contract_iobufs &bufs = bufmap[cand_input.userpubkey];
bufs.inputs.push_back(std::move(inputtofeed));
// Remove the input from the candidate set because we no longer need it.
//LOG_DEBUG << "candidate input deleted.";
@@ -872,17 +873,21 @@ namespace consensus
*/
void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap)
{
for (auto &[pubkey, bufpair] : bufmap)
for (auto &[pubkey, bufs] : bufmap)
{
if (!bufpair.output.empty())
if (!bufs.outputs.empty())
{
std::string output;
output.swap(bufpair.output);
std::vector<std::string_view> vect;
// Adding public key.
vect.push_back(pubkey);
// Only using message to generate hash for output messages. Length is not needed.
for (sc::contract_output &output : bufs.outputs)
vect.push_back(output.message);
const std::string hash = crypto::get_hash(pubkey, output);
const std::string hash = crypto::get_hash(vect);
ctx.candidate_user_outputs.try_emplace(
std::move(hash),
candidate_user_output(pubkey, std::move(output)));
candidate_user_output(pubkey, std::move(bufs.outputs)));
}
}
}

View File

@@ -32,10 +32,10 @@ namespace consensus
struct candidate_user_output
{
const std::string userpubkey;
std::string output;
std::list<sc::contract_output> outputs;
candidate_user_output(const std::string userpubkey, const std::string output)
: userpubkey(std::move(userpubkey)), output(std::move(output))
candidate_user_output(const std::string userpubkey, const std::list<sc::contract_output> outputs)
: userpubkey(std::move(userpubkey)), outputs(std::move(outputs))
{
}
};

View File

@@ -35,8 +35,8 @@ namespace crypto
seckey[0] = KEYPFX_ed25519;
crypto_sign_ed25519_keypair(
reinterpret_cast<unsigned char *>(pubkey.data() + 1), // +1 to skip the prefix byte.
reinterpret_cast<unsigned char *>(seckey.data() + 1)); // +1 to skip the prefix byte.
reinterpret_cast<unsigned char *>(pubkey.data() + 1), // +1 to skip the prefix byte.
reinterpret_cast<unsigned char *>(seckey.data() + 1)); // +1 to skip the prefix byte.
}
/**
@@ -196,4 +196,26 @@ namespace crypto
return hash;
}
/**
* Generates blake3 hash for the given string view vector using stream hashing.
*/
std::string get_hash(const std::vector<std::string_view> &sw_vect)
{
std::string hash;
hash.resize(BLAKE3_OUT_LEN);
// Init stream hashing.
blake3_hasher hasher;
blake3_hasher_init(&hasher);
// Hash is generated only using message in contract output struct.
for (std::string_view sw : sw_vect)
blake3_hasher_update(&hasher, reinterpret_cast<const unsigned char *>(sw.data()), sw.length());
// Get the final hash.
blake3_hasher_finalize(&hasher, reinterpret_cast<unsigned char *>(hash.data()), hash.length());
return hash;
}
} // namespace crypto

View File

@@ -10,30 +10,32 @@
namespace crypto
{
// Prefix byte to append to ed25519 keys.
static unsigned char KEYPFX_ed25519 = 0xED;
// Prefixed public key bytes.
static size_t PFXD_PUBKEY_BYTES = crypto_sign_ed25519_PUBLICKEYBYTES + 1;
// Prefixed secret key bytes.
static size_t PFXD_SECKEY_BYTES = crypto_sign_ed25519_SECRETKEYBYTES + 1;
// Prefix byte to append to ed25519 keys.
static unsigned char KEYPFX_ed25519 = 0xED;
// Prefixed public key bytes.
static size_t PFXD_PUBKEY_BYTES = crypto_sign_ed25519_PUBLICKEYBYTES + 1;
// Prefixed secret key bytes.
static size_t PFXD_SECKEY_BYTES = crypto_sign_ed25519_SECRETKEYBYTES + 1;
int init();
int init();
void generate_signing_keys(std::string &pubkey, std::string &seckey);
void generate_signing_keys(std::string &pubkey, std::string &seckey);
std::string sign(std::string_view msg, std::string_view seckey);
std::string sign(std::string_view msg, std::string_view seckey);
std::string sign_hex(std::string_view msg, std::string_view seckeyhex);
std::string sign_hex(std::string_view msg, std::string_view seckeyhex);
int verify(std::string_view msg, std::string_view sig, std::string_view pubkey);
int verify(std::string_view msg, std::string_view sig, std::string_view pubkey);
int verify_hex(std::string_view msg, std::string_view sighex, std::string_view pubkeyhex);
int verify_hex(std::string_view msg, std::string_view sighex, std::string_view pubkeyhex);
std::string get_hash(std::string_view data);
std::string get_hash(std::string_view data);
std::string get_hash(const unsigned char * data, size_t data_length);
std::string get_hash(const unsigned char *data, size_t data_length);
std::string get_hash(std::string_view s1, std::string_view s2);
std::string get_hash(std::string_view s1, std::string_view s2);
std::string get_hash(const std::vector<std::string_view> &sw_vect);
} // namespace crypto

View File

@@ -393,11 +393,11 @@ namespace sc
*/
int write_contract_hp_inputs(execution_context &ctx)
{
if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1)
{
LOG_ERROR << "Error writing HP inputs to SC";
return -1;
}
// if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1)
// {
// LOG_ERROR << "Error writing HP inputs to SC";
// return -1;
// }
return 0;
}
@@ -450,12 +450,16 @@ namespace sc
int read_contract_hp_outputs(execution_context &ctx)
{
std::string output;
const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.output);
const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, output);
if (hpsc_res == -1)
{
LOG_ERROR << "Error reading HP output from the contract.";
return -1;
}
else if (hpsc_res > 0)
{
// ctx.args.hpscbufs.outputs.push_back(output);
}
return (hpsc_res == 0) ? 0 : 1;
}
@@ -576,19 +580,71 @@ namespace sc
int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap)
{
bool bytes_read = false;
for (auto &[pubkey, bufpair] : bufmap)
for (auto &[pubkey, bufs] : bufmap)
{
// Get fds for the pubkey.
std::string output;
std::vector<int> &fds = fdmap[pubkey];
const int res = read_iosocket_stream(fds, bufpair.output);
if (res == -1)
// This returns the total bytes read from the socket.
const int total_bytes_read = read_iosocket_stream(fds, output);
if (total_bytes_read > 0)
{
// Current reading position of the received buffer chunk.
int pos = 0;
// Go through the buffer to the end.
while (pos < total_bytes_read)
{
// Check whether the output list is empty or the last message stored is finished reading.
// If so, an empty container is added to store the new message.
if (bufs.outputs.empty() || (bufs.outputs.back().message.length() == bufs.outputs.back().message_len))
{
// Add new empty container.
bufs.outputs.push_back(contract_output());
}
// Get the laterst element from the list.
contract_output &current_output = bufs.outputs.back();
// This is a new container. Message len of container is defaults to 0.
if (current_output.message_len == 0)
{
// Extract the message length from four byte header in the buffer.
// Length received is in Big Endian format.
// Re-construct it into natural order. (No matter the format computer saves it in).
current_output.message_len = (uint8_t)output[pos] << 24 | (uint8_t)output[pos + 1] << 16 | (uint8_t)output[pos + 2] << 8 | (uint8_t)output[pos + 3];
// Advance the current position.
pos += 4;
}
// Store the possible message length which could be read from the remaining buffer length.
int possible_read_len;
// Checking whether the remaing buffer length is long enough to finish reading the current message.
if (((total_bytes_read - pos) - (current_output.message_len - current_output.message.length())) >= 0)
{
// Can finish reading a full message. Possible length is equal to the remaining message length.
possible_read_len = current_output.message_len - current_output.message.length();
}
else
{
// Only partial message is recieved. Store the received bytes until other chunk is received.
possible_read_len = total_bytes_read - pos;
}
// Extract the message chunk from the buffer.
std::string msgBuf = output.substr(pos, possible_read_len);
pos += possible_read_len;
// Append the extracted message chunk to the current message.
current_output.message += msgBuf;
}
bytes_read = true;
}
if (total_bytes_read == -1)
{
return -1;
}
if (res > 0)
bytes_read = true;
}
return bytes_read ? 1 : 0;
@@ -649,25 +705,32 @@ namespace sc
if (!inputs.empty())
{
// Prepare the input memory segments to write with wrtiev.
iovec memsegs[2];
std::string msg_buf;
// Extra one element for the header.
iovec memsegs[inputs.size() * 2 + 1];
uint8_t header[inputs.size() * 4 + 4];
header[0] = inputs.size() >> 24;
header[1] = inputs.size() >> 16;
header[2] = inputs.size() >> 8;
header[3] = inputs.size();
// Message count header.
memsegs[0].iov_base = header;
memsegs[0].iov_len = 4;
size_t i = 1;
for (std::string &input : inputs)
{
// Concat messages into one message segment.
msg_buf += input;
// 4 bytes for message len header.
header[i * 4] = input.length() >> 24;
header[i * 4 + 1] = input.length() >> 16;
header[i * 4 + 2] = input.length() >> 8;
header[i * 4 + 3] = input.length();
memsegs[i * 2 - 1].iov_base = &header[i * 4];
memsegs[i * 2 - 1].iov_len = 4;
memsegs[i * 2].iov_base = input.data();
memsegs[i * 2].iov_len = input.length();
i++;
}
// Storing message len in big endian.
uint8_t header[4];
header[0] = msg_buf.length() >> 24;
header[1] = msg_buf.length() >> 16;
header[2] = msg_buf.length() >> 8;
header[3] = msg_buf.length();
memsegs[0].iov_base = header;
memsegs[0].iov_len = sizeof(header);
memsegs[1].iov_base = msg_buf.data();
memsegs[1].iov_len = msg_buf.length();
if (writev(writefd, memsegs, 2) == -1)
if (writev(writefd, memsegs, (inputs.size() * 2 + 1)) == -1)
write_error = true;
inputs.clear();
@@ -755,10 +818,9 @@ namespace sc
}
return res;
}
}
close(readfd);
close(readfd);
fds[SOCKETFDTYPE::HPREADWRITE] = -1;
LOG_ERROR << errno << ": Error reading sequence packet socket.";
@@ -790,9 +852,8 @@ namespace sc
return 0;
}
const size_t current_size = output.size();
output.resize(current_size + available_bytes);
const int res = read(readfd, output.data() + current_size, available_bytes);
output.resize(available_bytes);
const int res = read(readfd, output.data(), available_bytes);
if (res >= 0)
{
@@ -876,8 +937,8 @@ namespace sc
void clear_args(contract_execution_args &args)
{
args.userbufs.clear();
args.hpscbufs.inputs.clear();
args.hpscbufs.output.clear();
// args.hpscbufs.inputs.clear();
// args.hpscbufs.outputs.clear();
// Empty npl message queue.
while (args.npl_messages.pop())
{

View File

@@ -25,17 +25,25 @@ namespace sc
};
/**
* Stores contract output message length along with the message. Length is used to construct the message from the stream buffer.
*/
struct contract_output
{
uint32_t message_len = 0;
std::string message;
};
/**
* Represents list of inputs to the contract and the accumulated contract output for those inputs.
*/
struct contract_iobuf_pair
struct contract_iobufs
{
// List of inputs to be fed into the contract.
std::list<std::string> inputs;
// Output emitted by contract after execution.
// (Because we are reading output at the end, there's no way to
// get a "list" of outputs. So it's always a one contiguous output.)
std::string output;
// List of outputs from the contract.
std::list<contract_output> outputs;
};
// Common typedef for a map of pubkey->fdlist.
@@ -44,7 +52,7 @@ namespace sc
// Common typedef for a map of pubkey->I/O list pair (input list and output list).
// This is used to keep track of input/output buffers for a given public key (eg. user, npl)
typedef std::unordered_map<std::string, contract_iobuf_pair> contract_bufmap_t;
typedef std::unordered_map<std::string, contract_iobufs> contract_bufmap_t;
/**
* Holds information that should be passed into the contract process.
@@ -66,7 +74,7 @@ namespace sc
// Pair of HP<->SC JSON message buffers (mainly used for control messages).
// Input buffers for HP->SC messages, Output buffers for SC->HP messages.
contract_iobuf_pair hpscbufs;
// contract_iobuf_pair hpscbufs;
// Current HotPocket consensus time.
int64_t time = 0;

View File

@@ -139,7 +139,7 @@ namespace read_req
std::scoped_lock<std::mutex> lock(usr::ctx.users_mutex);
const auto user_buf_itr = context_itr->args.userbufs.begin();
if (!user_buf_itr->second.output.empty())
if (!user_buf_itr->second.outputs.empty())
{
// Find the user session by user pubkey.
const auto sess_itr = usr::ctx.sessionids.find(user_buf_itr->first);
@@ -148,15 +148,16 @@ namespace read_req
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
if (user_itr != usr::ctx.users.end()) // match found
{
std::string outputtosend;
outputtosend.swap(user_buf_itr->second.output);
const usr::connected_user &user = user_itr->second;
msg::usrmsg::usrmsg_parser parser(user.protocol);
std::vector<uint8_t> msg;
parser.create_contract_read_response_container(msg, outputtosend);
user.session.send(msg);
for (sc::contract_output &output : user_buf_itr->second.outputs)
{
std::vector<uint8_t> msg;
parser.create_contract_read_response_container(msg, output.message);
user.session.send(msg);
output.message.clear();
}
user_buf_itr->second.outputs.clear();
}
}
}
@@ -215,9 +216,9 @@ namespace read_req
contract_ctx.args.state_dir = conf::ctx.state_dir;
contract_ctx.args.state_dir.append("/rr_").append(std::to_string(thread_id));
contract_ctx.args.readonly = true;
sc::contract_iobuf_pair user_bufpair;
user_bufpair.inputs.push_back(std::move(read_request.content));
contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufpair));
sc::contract_iobufs user_bufs;
user_bufs.inputs.push_back(std::move(read_request.content));
contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufs));
}
/**

View File

@@ -2,7 +2,6 @@
#define _HP_UTIL_
#include "pchheader.hpp"
#include "crypto.hpp"
/**
* Contains helper functions and data structures used by multiple other subsystems.