From d4a786e3b972511584ea98d7232bbc6eadb6ba0a Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Wed, 5 Aug 2020 21:30:48 +0530 Subject: [PATCH] Refactored NPL message processing. Passed lcl to contract args. (#105) --- examples/nodejs_contract/hp-contract-lib.js | 65 +++++++- src/cons/cons.cpp | 38 +++-- src/cons/cons.hpp | 2 +- src/msg/fbuf/p2pmsg_helpers.cpp | 6 +- src/msg/fbuf/p2pmsg_helpers.hpp | 2 +- src/p2p/p2p.hpp | 7 +- src/p2p/peer_session_handler.cpp | 12 +- src/sc.cpp | 175 +++++++++----------- src/sc.hpp | 18 +- test/vm-cluster/test.sh | 7 - 10 files changed, 190 insertions(+), 142 deletions(-) delete mode 100755 test/vm-cluster/test.sh diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index a0dc3d6a..7297f220 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -4,8 +4,18 @@ function HotPocketContract() { const hpargs = JSON.parse(fs.readFileSync(0, 'utf8')); this.readonly = hpargs.readonly; this.timestamp = hpargs.ts; - this.users = {}; + if (!this.readonly) { + const lclParts = hpargs.lcl.split("-"); + this.lcl = { + seqNo: parseInt(lclParts[0]), + hash: lclParts[1] + }; + + this.npl = new HotPocketNplChannel(hpargs.nplfd[0], hpargs.nplfd[1]); + } + + this.users = {}; Object.keys(hpargs.usrfd).forEach((userPubKey) => { const userfds = hpargs.usrfd[userPubKey]; this.users[userPubKey] = new HotPocketChannel(userfds[0], userfds[1]); @@ -22,6 +32,59 @@ function HotPocketChannel(infd, outfd) { } } +function HotPocketNplChannel(infd, outfd) { + this.readInput = function () { + if (infd == -1) + return null; + + // Input may consist of multiple messages. + // Each message has the format: + // | NPL version (1 byte) | reserve (1 byte) | msg length (2 bytes BE) | peer pubkey (32 bytes) | msg | + + const inputs = []; // Peer inputs will be populated to this. + + const buf = fs.readFileSync(infd); + let pos = 0; + while (pos < buf.byteLength) { + + pos += 2; // Skip version and reserve. + + // Read message len. + const msgLenBuf = readBytes(buf, pos, 2); + if (!msgLenBuf) break; + const msgLen = msgLenBuf.readUInt16BE(); + + pos += 2; + const pubKeyBuf = readBytes(buf, pos, 32); + if (!pubKeyBuf) break; + + pos += 32; + + const msgBuf = readBytes(buf, pos, msgLen) + if (!msgBuf) break; + + inputs.push({ + pubkey: pubKeyBuf.toString("hex"), + input: msgBuf + }); + + pos += msgLen; + } + + return inputs; + } + + this.sendOutput = function (output) { + fs.writeFileSync(outfd, output); + } + + const readBytes = function (buf, pos, count) { + if (pos + count > buf.byteLength) + return null; + return buf.slice(pos, pos + count); + } +} + module.exports = { HotPocketContract } \ No newline at end of file diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 01d389f8..993fa1df 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -4,7 +4,6 @@ #include "../usr/user_input.hpp" #include "../p2p/p2p.hpp" #include "../msg/fbuf/p2pmsg_helpers.hpp" -#include "../msg/fbuf/common_helpers.hpp" #include "../msg/usrmsg_parser.hpp" #include "../msg/usrmsg_common.hpp" #include "../p2p/peer_session_handler.hpp" @@ -119,21 +118,24 @@ namespace cons ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); } } + // Throughout consensus, we move over the incoming npl messages collected via the network so far into // the candidate npl message set (move and append). This is to have a private working set for the consensus // and avoid threading conflicts with network incoming npl messages. { std::lock_guard lock(p2p::ctx.collected_msgs.npl_messages_mutex); - for (const auto &npl : p2p::ctx.collected_msgs.npl_messages) - { - const msg::fbuf::p2pmsg::Container *container = msg::fbuf::p2pmsg::GetContainer(npl.data()); - // Only the npl messages with a valid lcl will be passed down to the contract. lcl should match the previous round's lcl - if (msg::fbuf::flatbuff_bytes_to_sv(container->lcl()) != ctx.lcl) - continue; + ctx.candidate_npl_messages.splice(ctx.candidate_npl_messages.end(), p2p::ctx.collected_msgs.npl_messages); + } - ctx.candidate_npl_messages.push_back(std::move(npl)); - } - p2p::ctx.collected_msgs.npl_messages.clear(); + // Only the npl messages with a valid lcl will be passed down to the contract. + // lcl should match the previous round's lcl. + auto itr = ctx.candidate_npl_messages.begin(); + while (itr != ctx.candidate_npl_messages.end()) + { + if (itr->lcl == ctx.lcl) + ++itr; + else + ctx.candidate_npl_messages.erase(itr++); } LOG_DBG << "Started stage " << std::to_string(ctx.stage); @@ -772,9 +774,12 @@ namespace cons { sc::contract_execution_args &args = ctx.contract_ctx.args; args.time = cons_prop.time; + args.lcl = ctx.lcl; - // Populate npl bufs and user bufs. - args.nplbufs.inputs.splice(args.nplbufs.inputs.end(), ctx.candidate_npl_messages); + // Feed NPL messages. + args.npl_messages.splice(args.npl_messages.end(), ctx.candidate_npl_messages); + + // Populate user bufs. feed_user_inputs_to_contract_bufmap(args.userbufs, cons_prop); // TODO: Do something usefull with HP<-->SC channel. @@ -786,7 +791,7 @@ namespace cons ctx.state = args.post_execution_state_hash; extract_user_outputs_from_contract_bufmap(args.userbufs); - broadcast_npl_output(args.nplbufs.output); + broadcast_npl_output(args.npl_output); sc::clear_args(args); } @@ -909,12 +914,9 @@ namespace cons { if (!output.empty()) { - p2p::npl_message npl; - npl.data.swap(output); - flatbuffers::FlatBufferBuilder fbuf(1024); - p2pmsg::create_msg_from_npl_output(fbuf, npl, ctx.lcl); - p2p::broadcast_message(fbuf, false); + p2pmsg::create_msg_from_npl_output(fbuf, output, ctx.lcl); + p2p::broadcast_message(fbuf, true); } } diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 252784f2..4294e68a 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -53,7 +53,7 @@ struct consensus_context std::unordered_map candidate_proposals; // The set of npl messages that are being collected as consensus stages are progressing. - std::list candidate_npl_messages; + std::list candidate_npl_messages; // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. std::unordered_set candidate_users; diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index fc8e8630..f231638d 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -372,17 +372,17 @@ namespace msg::fbuf::p2pmsg /** * Ctreat npl message from the given npl output srtuct. * @param container_builder Flatbuffer builder for the container message. - * @param n The npl struct to be placed in the container message. + * @param msg The message to be sent as NPL message. * @param lcl Lcl value to be passed in the container message. */ - void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &n, std::string_view lcl) + void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view &msg, std::string_view lcl) { flatbuffers::FlatBufferBuilder builder(1024); const flatbuffers::Offset npl = CreateNpl_Message( builder, - sv_to_flatbuff_bytes(builder, n.data)); + sv_to_flatbuff_bytes(builder, msg)); const flatbuffers::Offset message = CreateContent(builder, Message_Npl_Message, npl.Union()); builder.Finish(message); // Finished building message content to get serialised content. diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index c4fa76dc..9fc4e4ba 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -51,7 +51,7 @@ namespace msg::fbuf::p2pmsg void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_response &hr); - void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &npl, std::string_view lcl); + void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const std::string_view &msg, std::string_view lcl); void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::state_request &hr, std::string_view lcl); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 44950d48..bebbd8c0 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -64,8 +64,11 @@ namespace p2p LEDGER_RESPONSE_ERROR error; }; + // Represents an NPL message sent by a peer. struct npl_message { + std::string pubkey; // Peer binary pubkey. + std::string lcl; // LCL of the peer. std::string data; }; @@ -103,8 +106,8 @@ namespace p2p std::list nonunl_proposals; std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. - // NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract. - std::list npl_messages; + // List of NPL messages collected from peers. + std::list npl_messages; std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions. // List of pairs indicating the session pubkey hex and the state requests. diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 667dba6e..91e45224 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -135,12 +135,12 @@ namespace p2p std::lock_guard lock(ctx.collected_msgs.npl_messages_mutex); // Insert npl message with lock. - // Npl messages are added to the npl message array as it is without deserealizing the content. The same content will be passed down - // to the contract as input in a binary format - const uint8_t *container_buf_ptr = reinterpret_cast(message.data()); - const size_t container_buf_size = message.length(); - const std::string npl_message(reinterpret_cast(container_buf_ptr), container_buf_size); - ctx.collected_msgs.npl_messages.push_back(std::move(npl_message)); + const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message(); + npl_message msg; + msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data()); + msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey()); + msg.lcl = msg::fbuf::flatbuff_bytes_to_sv(container->lcl()); + ctx.collected_msgs.npl_messages.push_back(std::move(msg)); } else if (content_message_type == p2pmsg::Message_State_Request_Message) { diff --git a/src/sc.cpp b/src/sc.cpp index 88c15ca6..1963033b 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -1,9 +1,6 @@ #include "pchheader.hpp" #include "conf.hpp" #include "hplog.hpp" -#include "msg/fbuf/common_helpers.hpp" -#include "msg/fbuf/p2pmsg_container_generated.h" -#include "msg/fbuf/p2pmsg_content_generated.h" #include "sc.hpp" #include "hpfs/hpfs.hpp" @@ -24,7 +21,7 @@ namespace sc if (!ctx.args.readonly) { - create_iopipes(ctx.nplfds, !ctx.args.nplbufs.inputs.empty()); + create_iopipes(ctx.nplfds, !ctx.args.npl_messages.empty()); create_iopipes(ctx.hpscfds, !ctx.args.hpscbufs.inputs.empty()); } @@ -173,11 +170,12 @@ namespace sc * "version":"", * "pubkey": "", * "ts": , + * "readonly": , + * "lcl": "", (eg: 169-a1d82eb4c9ed005ec2c4f4f82b6f0c2fd7543d66b1a0f6b8e58ae670b3e2bcfb) * "hpfd": [fd0, fd1], - * "usrfd":{ "":[fd0, fd1], ... }, * "nplfd":[fd0, fd1], - * "unl":[ "pkhex", ... ], - * "readonly": + * "usrfd":{ "":[fd0, fd1], ... }, + * "unl":[ "pkhex", ... ] * } */ int write_contract_args(const execution_context &ctx) @@ -194,7 +192,8 @@ namespace sc if (!ctx.args.readonly) { - os << ",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE] + os << ",\"lcl\":\"" << ctx.args.lcl + << "\",\"hpfd\":[" << ctx.hpscfds[FDTYPE::SCREAD] << "," << ctx.hpscfds[FDTYPE::SCWRITE] << "],\"nplfd\":[" << ctx.nplfds[FDTYPE::SCREAD] << "," << ctx.nplfds[FDTYPE::SCWRITE] << "]"; } @@ -250,8 +249,12 @@ namespace sc int feed_inputs(execution_context &ctx) { - // Write any hp or npl input messages to hp->sc and npl->sc pipe. - if (!ctx.args.readonly && write_contract_hp_npl_inputs(ctx) != 0) + // Write any input messages to hp->sc pipe. + if (!ctx.args.readonly && write_contract_hp_inputs(ctx) != 0) + return -1; + + // Write any NPL messages to contract. + if (!ctx.args.readonly && write_npl_messages(ctx) != 0) return -1; // Write any verified (consensus-reached) user inputs to user pipes. @@ -298,7 +301,7 @@ namespace sc /** * Writes any hp input messages to the contract. */ - int write_contract_hp_npl_inputs(execution_context &ctx) + int write_contract_hp_inputs(execution_context &ctx) { if (write_iopipe(ctx.hpscfds, ctx.args.hpscbufs.inputs) != 0) { @@ -306,13 +309,69 @@ namespace sc return -1; } - if (write_npl_iopipe(ctx.nplfds, ctx.args.nplbufs.inputs) != 0) + return 0; + } + + /** + * Write npl messages to the contract. + */ + int write_npl_messages(execution_context &ctx) + { + /** + * npl inputs are feed into the contract in a binary protocol. It follows the following pattern + * |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (32 bytes)**|**Npl message data**| + * Length of the message is calculated without including public key length + */ + const int writefd = ctx.nplfds[FDTYPE::HPWRITE]; + if (writefd == -1) + return 0; + + bool write_error = false; + if (!ctx.args.npl_messages.empty()) { - LOG_ERR << "Error writing NPL inputs to SC"; - return -1; + const size_t total_memsegs = ctx.args.npl_messages.size() * 3; + iovec memsegs[total_memsegs]; + size_t i = 0; + for (const auto &npl_msg : ctx.args.npl_messages) + { + const uint8_t pre_header_index = i * 3; + const uint8_t pubkey_index = pre_header_index + 1; + const uint8_t msg_index = pre_header_index + 2; + + const uint16_t msg_len = npl_msg.data.size(); + + // Header is |version(1byte)|reserve(1byte)|msg length(2bytes big endian)| + uint8_t header[4]; + header[0] = util::MIN_NPL_INPUT_VERSION; + + // Store msg length in big endian. + header[2] = msg_len << 8; + header[3] = msg_len; + + memsegs[pre_header_index].iov_base = header; + memsegs[pre_header_index].iov_len = sizeof(header); + + // Pubkey without the key type prefix. + memsegs[pubkey_index].iov_base = reinterpret_cast(const_cast(npl_msg.pubkey.data() + 1)); + memsegs[pubkey_index].iov_len = npl_msg.pubkey.size() - 1; + + memsegs[msg_index].iov_base = reinterpret_cast(const_cast(npl_msg.data.data())); + memsegs[msg_index].iov_len = msg_len; + + i++; + } + + if (writev(writefd, memsegs, total_memsegs) == -1) + write_error = true; + + ctx.args.npl_messages.clear(); } - return 0; + // Close the writefd since we no longer need it. + close(writefd); + ctx.nplfds[FDTYPE::HPWRITE] = -1; + + return write_error ? -1 : 0; } /** @@ -330,7 +389,7 @@ namespace sc return -1; } - const int npl_res = read_iopipe(ctx.nplfds, ctx.args.nplbufs.output); + const int npl_res = read_iopipe(ctx.nplfds, ctx.args.npl_output); if (npl_res == -1) { LOG_ERR << "Error reading NPL output from the contract."; @@ -521,85 +580,6 @@ namespace sc return write_error ? -1 : 0; } - /** - * Write the given input buffer into the write fd from the HP side. - * @param fds Vector of fd list. - * @param inputs Buffer to write into the HP write fd. - */ - int write_npl_iopipe(std::vector &fds, std::list &inputs) - { - /** - * npl inputs are feed into the contract in a binary protocol. It follows the following pattern - * |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (4 bytes)**|**Npl message data**| - * Length of the message is calculated without including public key length - */ - const int writefd = fds[FDTYPE::HPWRITE]; - if (writefd == -1) - return 0; - - bool write_error = false; - if (!inputs.empty()) - { - int8_t total_memsegs = inputs.size() * 3; - iovec memsegs[total_memsegs]; - size_t i = 0; - for (auto &input : inputs) - { - int8_t pre_header_index = i * 3; - int8_t pubkey_index = pre_header_index + 1; - int8_t msg_index = pre_header_index + 2; - - // First binary representation of version, reserve and message length is constructed and feed it into - // memory segment. Then the public key and at last the message data - - // At the moment no data is inserted as reserve - uint8_t reserve = 0; - - //Get message container - const msg::fbuf::p2pmsg::Container *container = msg::fbuf::p2pmsg::GetContainer(input.data()); - const flatbuffers::Vector *container_content = container->content(); - - uint16_t msg_length = container_content->size(); - - /** - * Pre header is constructed using bit shifting. This will generate a bit pattern as explain in the example below - * version = 00000001 - * reserve = 00000000 - * msg_length = 0000000010001101 - * pre_header = 00000001000000000000000010001101 - */ - uint32_t pre_header = util::MIN_NPL_INPUT_VERSION; - pre_header = pre_header << 8; - pre_header += reserve; - - pre_header = pre_header << 16; - pre_header += msg_length; - memsegs[pre_header_index].iov_base = &pre_header; - memsegs[pre_header_index].iov_len = 4; - - std::string_view msg_pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey()); - memsegs[pubkey_index].iov_base = reinterpret_cast(const_cast(msg_pubkey.data())); - memsegs[pubkey_index].iov_len = msg_pubkey.size(); - - memsegs[msg_index].iov_base = reinterpret_cast(const_cast(container_content->Data())); - memsegs[msg_index].iov_len = container_content->size(); - - i++; - } - - if (writev(writefd, memsegs, total_memsegs) == -1) - write_error = true; - - inputs.clear(); - } - - // Close the writefd since we no longer need it. - close(writefd); - fds[FDTYPE::HPWRITE] = -1; - - return write_error ? -1 : 0; - } - /** * Common function to read buffered output from the pipe and populate the output list. * @param fds Vector representing the pipes fd list. @@ -701,9 +681,10 @@ namespace sc args.userbufs.clear(); args.hpscbufs.inputs.clear(); args.hpscbufs.output.clear(); - args.nplbufs.inputs.clear(); - args.nplbufs.output.clear(); + args.npl_messages.clear(); + args.npl_output.clear(); args.time = 0; + args.lcl.clear(); args.post_execution_state_hash = hpfs::h32_empty; } diff --git a/src/sc.hpp b/src/sc.hpp index 81436632..9f538a1c 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -5,6 +5,7 @@ #include "usr/usr.hpp" #include "hpfs/h32.hpp" #include "util.hpp" +#include "p2p/p2p.hpp" /** * Contains helper functions regarding POSIX process execution and IPC between HP and SC. @@ -62,9 +63,11 @@ namespace sc // The value is a pair holding consensus-verified inputs and contract-generated outputs. contract_bufmap_t userbufs; - // Pair of NPL<->SC byte array message buffers. - // Input buffers for NPL->SC messages, Output buffers for SC->NPL messages. - contract_iobuf_pair nplbufs; + // NPL messages to be passed into contract. + std::list npl_messages; + + // Output NPL buffer. + std::string npl_output; // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffers for HP->SC messages, Output buffers for SC->HP messages. @@ -73,6 +76,9 @@ namespace sc // Current HotPocket consensus time. int64_t time = 0; + // Current HotPocket lcl (seq no. and ledger hash hex) + std::string lcl; + // State hash after execution will be copied to this (not applicable to read only mode). hpfs::h32 post_execution_state_hash = hpfs::h32_empty; }; @@ -123,7 +129,9 @@ namespace sc int fetch_outputs(execution_context &ctx); - int write_contract_hp_npl_inputs(execution_context &ctx); + int write_contract_hp_inputs(execution_context &ctx); + + int write_npl_messages(execution_context &ctx); int read_contract_hp_npl_outputs(execution_context &ctx); @@ -143,8 +151,6 @@ namespace sc int write_iopipe(std::vector &fds, std::list &inputs); - int write_npl_iopipe(std::vector &fds, std::list &inputs); - int read_iopipe(std::vector &fds, std::string &output); void close_unused_fds(execution_context &ctx, const bool is_hp); diff --git a/test/vm-cluster/test.sh b/test/vm-cluster/test.sh deleted file mode 100755 index 62dc02fd..00000000 --- a/test/vm-cluster/test.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -string="azure.com" -#name=${string%%.*} -name=${string##*azure} - -echo [$name] \ No newline at end of file