From 51173e37f2b15f93e65862f8dfed86d2ddef7d76 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Fri, 6 Nov 2020 10:55:40 +0530 Subject: [PATCH] Support message separation for multiple inputs from same user. (#142) --- examples/nodejs_client/text-client.js | 9 +- examples/nodejs_contract/echo_contract.js | 5 +- examples/nodejs_contract/hp-contract-lib.js | 61 ++++++--- src/consensus.cpp | 41 ++++--- src/consensus.hpp | 6 +- src/crypto.cpp | 26 +++- src/crypto.hpp | 32 ++--- src/sc.cpp | 129 ++++++++++++++------ src/sc.hpp | 22 ++-- src/usr/read_req.cpp | 23 ++-- src/util.hpp | 1 - 11 files changed, 241 insertions(+), 114 deletions(-) diff --git a/examples/nodejs_client/text-client.js b/examples/nodejs_client/text-client.js index 1fc36ef2..179b1d1a 100644 --- a/examples/nodejs_client/text-client.js +++ b/examples/nodejs_client/text-client.js @@ -66,14 +66,15 @@ async function main() { console.log("Ready to accept inputs."); const input_pump = () => { - rl.question('', async (inp) => { + rl.question('', (inp) => { if (inp.startsWith("read ")) hpc.sendContractReadRequest(inp.substr(5)) else { - const submissionStatus = await hpc.sendContractInput(inp); - if (submissionStatus && submissionStatus != "ok") - console.log("Input submission failed. reason: " + submissionStatus); + hpc.sendContractInput(inp).then(submissionStatus => { + if (submissionStatus && submissionStatus != "ok") + console.log("Input submission failed. reason: " + submissionStatus); + }); } input_pump(); diff --git a/examples/nodejs_contract/echo_contract.js b/examples/nodejs_contract/echo_contract.js index 674698b5..20a61ad0 100644 --- a/examples/nodejs_contract/echo_contract.js +++ b/examples/nodejs_contract/echo_contract.js @@ -18,10 +18,13 @@ hpc.events.on("user_message", (pubKey, message) => { } else { user.sendOutput("Echoing: " + userInput); - user.closeChannel(); } }); +hpc.events.on("user_finished", (pubKey) => { + hpc.users[pubKey].closeChannel(); +}); + const npl = hpc.npl; // Npl channel always connected if contract is not in readonly mode. diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 45415716..9b8b2a0b 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -31,26 +31,46 @@ function HotPocketChannel(fd, userPubKey, events) { let socket = null; if (fd > 0) { socket = fs.createReadStream(null, { fd: fd }); - const dataParts = []; + let dataParts = []; + let msgCount = -1; let msgLen = -1; - let bytesRead = 0; + let pos = 0; socket.on("data", (buf) => { - if (msgLen == -1) { - // First two bytes indicate the message len. - const msgLenBuf = readBytes(buf, 0, 4); - if (msgLenBuf) { - msgLen = msgLenBuf.readUInt32BE(); - const msgBuf = readBytes(buf, 4, buf.byteLength - 4); - dataParts.push(msgBuf) - bytesRead = msgBuf.byteLength; - } - } else { - dataParts.push(buf); - bytesRead += buf.length; + pos = 0; + if (msgCount == -1) { + const msgCountBuf = readBytes(buf, 0, 4) + msgCount = msgCountBuf.readUInt32BE(); + pos += 4; } - if (bytesRead == msgLen) { - msgLen == -1; - events.emit("user_message", userPubKey, Buffer.concat(dataParts)); + while (pos < buf.byteLength) { + if (msgLen == -1) { + const msgLenBuf = readBytes(buf, pos, 4); + pos += 4; + msgLen = msgLenBuf.readUInt32BE(); + } + let possible_read_len; + if (((buf.byteLength - pos) - msgLen) >= 0) { + // Can finish reading a full message. + possible_read_len = msgLen; + msgLen = -1; + } else { + // Only partial message is recieved. + possible_read_len = buf.byteLength - pos + msgLen -= possible_read_len; + } + const msgBuf = readBytes(buf, pos, possible_read_len); + pos += possible_read_len; + dataParts.push(msgBuf) + + if (msgLen == -1) { + events.emit("user_message", userPubKey, Buffer.concat(dataParts)); + dataParts = []; + msgCount-- + } + if (msgCount == 0) { + msgCount = -1 + events.emit("user_finished", userPubKey); + } } }); @@ -67,7 +87,12 @@ function HotPocketChannel(fd, userPubKey, events) { } this.sendOutput = function (output) { - fs.writeSync(fd, output); + const outputStringBuf = Buffer.from(output); + let headerBuf = Buffer.alloc(4); + // Writing message length in big endian format. + headerBuf.writeUInt32BE(outputStringBuf.byteLength) + fs.writeSync(fd, headerBuf); + fs.writeSync(fd, outputStringBuf); } this.closeChannel = function () { diff --git a/src/consensus.cpp b/src/consensus.cpp index 857b24c1..133411a8 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -795,7 +795,6 @@ namespace consensus else { // Send matching outputs to locally connected users. - candidate_user_output &cand_output = cu_itr->second; // Find the user session by user pubkey. @@ -805,16 +804,16 @@ namespace consensus const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. if (user_itr != usr::ctx.users.end()) // match found { - std::string outputtosend; - outputtosend.swap(cand_output.output); - const usr::connected_user &user = user_itr->second; msg::usrmsg::usrmsg_parser parser(user.protocol); - - std::vector msg; - parser.create_contract_output_container(msg, outputtosend, lcl_seq_no, lcl); - - user.session.send(msg); + // Sending all the outputs to the user. + for (sc::contract_output &output : cand_output.outputs) + { + std::vector msg; + parser.create_contract_output_container(msg, output.message, lcl_seq_no, lcl); + user.session.send(msg); + output.message.clear(); + } } } @@ -834,7 +833,9 @@ namespace consensus // Populate the buf map with all currently connected users regardless of whether they have inputs or not. // This is in case the contract wanted to emit some data to a user without needing any input. for (const std::string &pubkey : cons_prop.users) - bufmap.try_emplace(pubkey, sc::contract_iobuf_pair()); + { + bufmap.try_emplace(pubkey, sc::contract_iobufs()); + } for (const std::string &hash : cons_prop.hash_inputs) { @@ -855,8 +856,8 @@ namespace consensus std::string inputtofeed; inputtofeed.swap(cand_input.input); - sc::contract_iobuf_pair &bufpair = bufmap[cand_input.userpubkey]; - bufpair.inputs.push_back(std::move(inputtofeed)); + sc::contract_iobufs &bufs = bufmap[cand_input.userpubkey]; + bufs.inputs.push_back(std::move(inputtofeed)); // Remove the input from the candidate set because we no longer need it. //LOG_DEBUG << "candidate input deleted."; @@ -872,17 +873,21 @@ namespace consensus */ void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap) { - for (auto &[pubkey, bufpair] : bufmap) + for (auto &[pubkey, bufs] : bufmap) { - if (!bufpair.output.empty()) + if (!bufs.outputs.empty()) { - std::string output; - output.swap(bufpair.output); + std::vector vect; + // Adding public key. + vect.push_back(pubkey); + // Only using message to generate hash for output messages. Length is not needed. + for (sc::contract_output &output : bufs.outputs) + vect.push_back(output.message); - const std::string hash = crypto::get_hash(pubkey, output); + const std::string hash = crypto::get_hash(vect); ctx.candidate_user_outputs.try_emplace( std::move(hash), - candidate_user_output(pubkey, std::move(output))); + candidate_user_output(pubkey, std::move(bufs.outputs))); } } } diff --git a/src/consensus.hpp b/src/consensus.hpp index 16fcb87d..593af93b 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -32,10 +32,10 @@ namespace consensus struct candidate_user_output { const std::string userpubkey; - std::string output; + std::list outputs; - candidate_user_output(const std::string userpubkey, const std::string output) - : userpubkey(std::move(userpubkey)), output(std::move(output)) + candidate_user_output(const std::string userpubkey, const std::list outputs) + : userpubkey(std::move(userpubkey)), outputs(std::move(outputs)) { } }; diff --git a/src/crypto.cpp b/src/crypto.cpp index 5c6f0ec5..94600e77 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -35,8 +35,8 @@ namespace crypto seckey[0] = KEYPFX_ed25519; crypto_sign_ed25519_keypair( - reinterpret_cast(pubkey.data() + 1), // +1 to skip the prefix byte. - reinterpret_cast(seckey.data() + 1)); // +1 to skip the prefix byte. + reinterpret_cast(pubkey.data() + 1), // +1 to skip the prefix byte. + reinterpret_cast(seckey.data() + 1)); // +1 to skip the prefix byte. } /** @@ -196,4 +196,26 @@ namespace crypto return hash; } + /** + * Generates blake3 hash for the given string view vector using stream hashing. + */ + std::string get_hash(const std::vector &sw_vect) + { + std::string hash; + hash.resize(BLAKE3_OUT_LEN); + + // Init stream hashing. + blake3_hasher hasher; + blake3_hasher_init(&hasher); + + // Hash is generated only using message in contract output struct. + for (std::string_view sw : sw_vect) + blake3_hasher_update(&hasher, reinterpret_cast(sw.data()), sw.length()); + + // Get the final hash. + blake3_hasher_finalize(&hasher, reinterpret_cast(hash.data()), hash.length()); + + return hash; + } + } // namespace crypto \ No newline at end of file diff --git a/src/crypto.hpp b/src/crypto.hpp index fa8bf9ad..31b68e0d 100644 --- a/src/crypto.hpp +++ b/src/crypto.hpp @@ -10,30 +10,32 @@ namespace crypto { -// Prefix byte to append to ed25519 keys. -static unsigned char KEYPFX_ed25519 = 0xED; -// Prefixed public key bytes. -static size_t PFXD_PUBKEY_BYTES = crypto_sign_ed25519_PUBLICKEYBYTES + 1; -// Prefixed secret key bytes. -static size_t PFXD_SECKEY_BYTES = crypto_sign_ed25519_SECRETKEYBYTES + 1; + // Prefix byte to append to ed25519 keys. + static unsigned char KEYPFX_ed25519 = 0xED; + // Prefixed public key bytes. + static size_t PFXD_PUBKEY_BYTES = crypto_sign_ed25519_PUBLICKEYBYTES + 1; + // Prefixed secret key bytes. + static size_t PFXD_SECKEY_BYTES = crypto_sign_ed25519_SECRETKEYBYTES + 1; -int init(); + int init(); -void generate_signing_keys(std::string &pubkey, std::string &seckey); + void generate_signing_keys(std::string &pubkey, std::string &seckey); -std::string sign(std::string_view msg, std::string_view seckey); + std::string sign(std::string_view msg, std::string_view seckey); -std::string sign_hex(std::string_view msg, std::string_view seckeyhex); + std::string sign_hex(std::string_view msg, std::string_view seckeyhex); -int verify(std::string_view msg, std::string_view sig, std::string_view pubkey); + int verify(std::string_view msg, std::string_view sig, std::string_view pubkey); -int verify_hex(std::string_view msg, std::string_view sighex, std::string_view pubkeyhex); + int verify_hex(std::string_view msg, std::string_view sighex, std::string_view pubkeyhex); -std::string get_hash(std::string_view data); + std::string get_hash(std::string_view data); -std::string get_hash(const unsigned char * data, size_t data_length); + std::string get_hash(const unsigned char *data, size_t data_length); -std::string get_hash(std::string_view s1, std::string_view s2); + std::string get_hash(std::string_view s1, std::string_view s2); + + std::string get_hash(const std::vector &sw_vect); } // namespace crypto diff --git a/src/sc.cpp b/src/sc.cpp index d303ab51..487c709e 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -393,11 +393,11 @@ namespace sc */ int write_contract_hp_inputs(execution_context &ctx) { - if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1) - { - LOG_ERROR << "Error writing HP inputs to SC"; - return -1; - } + // if (write_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.inputs, false) == -1) + // { + // LOG_ERROR << "Error writing HP inputs to SC"; + // return -1; + // } return 0; } @@ -450,12 +450,16 @@ namespace sc int read_contract_hp_outputs(execution_context &ctx) { std::string output; - const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, ctx.args.hpscbufs.output); + const int hpsc_res = read_iosocket_seq_packet(ctx.hpscfds, output); if (hpsc_res == -1) { LOG_ERROR << "Error reading HP output from the contract."; return -1; } + else if (hpsc_res > 0) + { + // ctx.args.hpscbufs.outputs.push_back(output); + } return (hpsc_res == 0) ? 0 : 1; } @@ -576,19 +580,71 @@ namespace sc int read_contract_fdmap_outputs(contract_fdmap_t &fdmap, contract_bufmap_t &bufmap) { bool bytes_read = false; - for (auto &[pubkey, bufpair] : bufmap) + for (auto &[pubkey, bufs] : bufmap) { // Get fds for the pubkey. + std::string output; std::vector &fds = fdmap[pubkey]; - const int res = read_iosocket_stream(fds, bufpair.output); - if (res == -1) + // This returns the total bytes read from the socket. + const int total_bytes_read = read_iosocket_stream(fds, output); + + if (total_bytes_read > 0) + { + // Current reading position of the received buffer chunk. + int pos = 0; + // Go through the buffer to the end. + while (pos < total_bytes_read) + { + // Check whether the output list is empty or the last message stored is finished reading. + // If so, an empty container is added to store the new message. + if (bufs.outputs.empty() || (bufs.outputs.back().message.length() == bufs.outputs.back().message_len)) + { + // Add new empty container. + bufs.outputs.push_back(contract_output()); + } + + // Get the laterst element from the list. + contract_output ¤t_output = bufs.outputs.back(); + + // This is a new container. Message len of container is defaults to 0. + if (current_output.message_len == 0) + { + // Extract the message length from four byte header in the buffer. + // Length received is in Big Endian format. + // Re-construct it into natural order. (No matter the format computer saves it in). + current_output.message_len = (uint8_t)output[pos] << 24 | (uint8_t)output[pos + 1] << 16 | (uint8_t)output[pos + 2] << 8 | (uint8_t)output[pos + 3]; + // Advance the current position. + pos += 4; + } + // Store the possible message length which could be read from the remaining buffer length. + int possible_read_len; + + // Checking whether the remaing buffer length is long enough to finish reading the current message. + if (((total_bytes_read - pos) - (current_output.message_len - current_output.message.length())) >= 0) + { + // Can finish reading a full message. Possible length is equal to the remaining message length. + possible_read_len = current_output.message_len - current_output.message.length(); + } + else + { + // Only partial message is recieved. Store the received bytes until other chunk is received. + possible_read_len = total_bytes_read - pos; + } + // Extract the message chunk from the buffer. + std::string msgBuf = output.substr(pos, possible_read_len); + pos += possible_read_len; + // Append the extracted message chunk to the current message. + current_output.message += msgBuf; + } + + bytes_read = true; + } + + if (total_bytes_read == -1) { return -1; } - - if (res > 0) - bytes_read = true; } return bytes_read ? 1 : 0; @@ -649,25 +705,32 @@ namespace sc if (!inputs.empty()) { // Prepare the input memory segments to write with wrtiev. - iovec memsegs[2]; - std::string msg_buf; + // Extra one element for the header. + iovec memsegs[inputs.size() * 2 + 1]; + uint8_t header[inputs.size() * 4 + 4]; + header[0] = inputs.size() >> 24; + header[1] = inputs.size() >> 16; + header[2] = inputs.size() >> 8; + header[3] = inputs.size(); + // Message count header. + memsegs[0].iov_base = header; + memsegs[0].iov_len = 4; + size_t i = 1; for (std::string &input : inputs) { - // Concat messages into one message segment. - msg_buf += input; + // 4 bytes for message len header. + header[i * 4] = input.length() >> 24; + header[i * 4 + 1] = input.length() >> 16; + header[i * 4 + 2] = input.length() >> 8; + header[i * 4 + 3] = input.length(); + memsegs[i * 2 - 1].iov_base = &header[i * 4]; + memsegs[i * 2 - 1].iov_len = 4; + memsegs[i * 2].iov_base = input.data(); + memsegs[i * 2].iov_len = input.length(); + i++; } - // Storing message len in big endian. - uint8_t header[4]; - header[0] = msg_buf.length() >> 24; - header[1] = msg_buf.length() >> 16; - header[2] = msg_buf.length() >> 8; - header[3] = msg_buf.length(); - memsegs[0].iov_base = header; - memsegs[0].iov_len = sizeof(header); - memsegs[1].iov_base = msg_buf.data(); - memsegs[1].iov_len = msg_buf.length(); - if (writev(writefd, memsegs, 2) == -1) + if (writev(writefd, memsegs, (inputs.size() * 2 + 1)) == -1) write_error = true; inputs.clear(); @@ -755,10 +818,9 @@ namespace sc } return res; } - } - close(readfd); + close(readfd); fds[SOCKETFDTYPE::HPREADWRITE] = -1; LOG_ERROR << errno << ": Error reading sequence packet socket."; @@ -790,9 +852,8 @@ namespace sc return 0; } - const size_t current_size = output.size(); - output.resize(current_size + available_bytes); - const int res = read(readfd, output.data() + current_size, available_bytes); + output.resize(available_bytes); + const int res = read(readfd, output.data(), available_bytes); if (res >= 0) { @@ -876,8 +937,8 @@ namespace sc void clear_args(contract_execution_args &args) { args.userbufs.clear(); - args.hpscbufs.inputs.clear(); - args.hpscbufs.output.clear(); + // args.hpscbufs.inputs.clear(); + // args.hpscbufs.outputs.clear(); // Empty npl message queue. while (args.npl_messages.pop()) { diff --git a/src/sc.hpp b/src/sc.hpp index f20f1225..e5b86b43 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -25,17 +25,25 @@ namespace sc }; /** + * Stores contract output message length along with the message. Length is used to construct the message from the stream buffer. + */ + struct contract_output + { + uint32_t message_len = 0; + std::string message; + + }; + /** * Represents list of inputs to the contract and the accumulated contract output for those inputs. */ - struct contract_iobuf_pair + + struct contract_iobufs { // List of inputs to be fed into the contract. std::list inputs; - // Output emitted by contract after execution. - // (Because we are reading output at the end, there's no way to - // get a "list" of outputs. So it's always a one contiguous output.) - std::string output; + // List of outputs from the contract. + std::list outputs; }; // Common typedef for a map of pubkey->fdlist. @@ -44,7 +52,7 @@ namespace sc // Common typedef for a map of pubkey->I/O list pair (input list and output list). // This is used to keep track of input/output buffers for a given public key (eg. user, npl) - typedef std::unordered_map contract_bufmap_t; + typedef std::unordered_map contract_bufmap_t; /** * Holds information that should be passed into the contract process. @@ -66,7 +74,7 @@ namespace sc // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffers for HP->SC messages, Output buffers for SC->HP messages. - contract_iobuf_pair hpscbufs; + // contract_iobuf_pair hpscbufs; // Current HotPocket consensus time. int64_t time = 0; diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index 5b3821ad..ba8caabe 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -139,7 +139,7 @@ namespace read_req std::scoped_lock lock(usr::ctx.users_mutex); const auto user_buf_itr = context_itr->args.userbufs.begin(); - if (!user_buf_itr->second.output.empty()) + if (!user_buf_itr->second.outputs.empty()) { // Find the user session by user pubkey. const auto sess_itr = usr::ctx.sessionids.find(user_buf_itr->first); @@ -148,15 +148,16 @@ namespace read_req const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. if (user_itr != usr::ctx.users.end()) // match found { - std::string outputtosend; - outputtosend.swap(user_buf_itr->second.output); - const usr::connected_user &user = user_itr->second; msg::usrmsg::usrmsg_parser parser(user.protocol); - - std::vector msg; - parser.create_contract_read_response_container(msg, outputtosend); - user.session.send(msg); + for (sc::contract_output &output : user_buf_itr->second.outputs) + { + std::vector msg; + parser.create_contract_read_response_container(msg, output.message); + user.session.send(msg); + output.message.clear(); + } + user_buf_itr->second.outputs.clear(); } } } @@ -215,9 +216,9 @@ namespace read_req contract_ctx.args.state_dir = conf::ctx.state_dir; contract_ctx.args.state_dir.append("/rr_").append(std::to_string(thread_id)); contract_ctx.args.readonly = true; - sc::contract_iobuf_pair user_bufpair; - user_bufpair.inputs.push_back(std::move(read_request.content)); - contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufpair)); + sc::contract_iobufs user_bufs; + user_bufs.inputs.push_back(std::move(read_request.content)); + contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufs)); } /** diff --git a/src/util.hpp b/src/util.hpp index a838b37e..73fadda7 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -2,7 +2,6 @@ #define _HP_UTIL_ #include "pchheader.hpp" -#include "crypto.hpp" /** * Contains helper functions and data structures used by multiple other subsystems.