diff --git a/examples/echocontract/contract.js b/examples/echocontract/contract.js index 44796a22..87b83456 100644 --- a/examples/echocontract/contract.js +++ b/examples/echocontract/contract.js @@ -23,6 +23,13 @@ Object.keys(hpargs.usrfd).forEach(function (key, index) { } }); +let nplinput = fs.readFileSync(hpargs.nplfd[0], 'utf8'); +if (nplinput.length > 0) { + console.log("Input received from hp:"); + console.log(nplinput); + fs.writeSync(hpargs.nplfd[1], "Echoing: " + nplinput); +} + let hpinput = fs.readFileSync(hpargs.hpfd[0], 'utf8'); if (hpinput.length > 0) { //console.log("Input received from hp:"); diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 57be03a1..32f71de5 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -5,6 +5,7 @@ #include "../usr/user_input.hpp" #include "../p2p/p2p.hpp" #include "../fbschema/p2pmsg_helpers.hpp" +#include "../fbschema/common_helpers.hpp" #include "../jsonschema/usrmsg_helpers.hpp" #include "../p2p/peer_session_handler.hpp" #include "../hplog.hpp" @@ -72,6 +73,23 @@ void consensus() } LOG_DBG << "timenow: " << std::to_string(ctx.time_now); + // Throughout consensus, we move over the incoming npl messages collected via the network so far into + // the candidate npl message set (move and append). This is to have a private working set for the consensus + // and avoid threading conflicts with network incoming npl messages. + { + std::lock_guard lock(p2p::ctx.collected_msgs.npl_messages_mutex); + for (const auto &npl : p2p::ctx.collected_msgs.npl_messages) + { + const fbschema::p2pmsg::Container *container = fbschema::p2pmsg::GetContainer(npl.data()); + // Only the npl messages with a valid lcl will be passed down to the contract. lcl should match the previous round's lcl + if (fbschema::flatbuff_bytes_to_sv(container->lcl()) != ctx.lcl) + continue; + + ctx.candidate_npl_messages.push_back(std::move(npl)); + } + p2p::ctx.collected_msgs.npl_messages.clear(); + } + if (ctx.stage == 0) { // Stage 0 means begining of a consensus round. @@ -192,7 +210,7 @@ void broadcast_nonunl_proposal() p2p::peer_outbound_message msg(std::make_shared(1024)); p2pmsg::create_msg_from_nonunl_proposal(msg.builder(), nup); - p2p::broadcast_message(msg); + p2p::broadcast_message(msg, true); LOG_DBG << "NUP sent." << " users:" << nup.user_messages.size(); @@ -370,7 +388,7 @@ void broadcast_proposal(const p2p::proposal &p) p2p::peer_outbound_message msg(std::make_shared(1024)); p2pmsg::create_msg_from_proposal(msg.builder(), p); - p2p::broadcast_message(msg); + p2p::broadcast_message(msg, true); LOG_DBG << "Proposed [stage" << std::to_string(p.stage) << "] users:" << p.users.size() @@ -541,11 +559,16 @@ void apply_ledger(const p2p::proposal &cons_prop) proc::contract_fblockmap_t updated_blocks; proc::contract_bufmap_t useriobufmap; - feed_inputs_to_contract_bufmap(useriobufmap, cons_prop); - run_contract_binary(cons_prop.time, useriobufmap, updated_blocks); + proc::contract_iobuf_pair nplbufpair; + nplbufpair.inputs.splice(nplbufpair.inputs.end(), ctx.candidate_npl_messages); - extract_outputs_from_contract_bufmap(useriobufmap); + feed_user_inputs_to_contract_bufmap(useriobufmap, cons_prop); + + run_contract_binary(cons_prop.time, useriobufmap, nplbufpair, updated_blocks); + + extract_user_outputs_from_contract_bufmap(useriobufmap); + broadcast_npl_output(nplbufpair.output); update_state_blockmap(updated_blocks); } @@ -601,7 +624,7 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop) * @param bufmap The contract bufmap which needs to be populated with inputs. * @param cons_prop The proposal that achieved consensus. */ -void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) +void feed_user_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) { // Populate the buf map with all currently connected users regardless of whether they have inputs or not. // This is in case the contract wanted to emit some data to a user without needing any input. @@ -641,7 +664,7 @@ void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p:: * for the next consensus round. * @param bufmap The contract bufmap containing the outputs produced by the contract. */ -void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap) +void extract_user_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap) { for (auto &[pubkey, bufpair] : bufmap) { @@ -658,19 +681,30 @@ void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap) } } +void broadcast_npl_output(std::string &output) +{ + if (!output.empty()) + { + p2p::npl_message npl; + npl.data.swap(output); + + p2p::peer_outbound_message msg(std::make_shared(1024)); + p2pmsg::create_msg_from_npl_output(msg.builder(), npl, ctx.lcl); + p2p::broadcast_message(msg, false); + } +} + /** * Executes the smart contract with the specified time and provided I/O buf maps. * @param time_now The time that must be passed on to the contract. * @param useriobufmap The contract bufmap which holds user I/O buffers. */ -void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_fblockmap_t &state_updates) +void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_iobuf_pair &nplbufpair, proc::contract_fblockmap_t &state_updates) { - // todo:implement exchange of npl and hpsc bufs - proc::contract_bufmap_t nplbufmap; + // todo:implement exchange of hpsc bufs proc::contract_iobuf_pair hpscbufpair; - proc::exec_contract( - proc::contract_exec_args(time_now, useriobufmap, nplbufmap, hpscbufpair, state_updates)); + proc::contract_exec_args(time_now, useriobufmap, nplbufpair, hpscbufpair, state_updates)); } /** diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 6a50b70a..d9c72f6f 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -47,6 +47,9 @@ struct consensus_context // The set of proposals that are being collected as consensus stages are progressing. std::list candidate_proposals; + // The set of npl messages that are being collected as consensus stages are progressing. + std::list candidate_npl_messages; + // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. std::unordered_set candidate_users; @@ -111,11 +114,13 @@ void apply_ledger(const p2p::proposal &proposal); void dispatch_user_outputs(const p2p::proposal &cons_prop); -void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); +void feed_user_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); -void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap); +void extract_user_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap); -void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_fblockmap_t &state_updates); +void broadcast_npl_output(std::string &output); + +void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap, proc::contract_iobuf_pair &nplbufpair, proc::contract_fblockmap_t &state_updates); template void increment(std::map &counter, const T &candidate); diff --git a/src/fbschema/p2pmsg_container.fbs b/src/fbschema/p2pmsg_container.fbs index a76958b3..bc0e8c0f 100644 --- a/src/fbschema/p2pmsg_container.fbs +++ b/src/fbschema/p2pmsg_container.fbs @@ -9,6 +9,7 @@ table Container { //root type for message version:uint16; timestamp:uint64; pubkey:[ubyte]; + lcl:[ubyte]; signature:[ubyte]; // signature of the message content content:[ubyte]; // message content: byte array of proposal,npl,etc } diff --git a/src/fbschema/p2pmsg_container_generated.h b/src/fbschema/p2pmsg_container_generated.h index b9c3a3df..4d0b995a 100644 --- a/src/fbschema/p2pmsg_container_generated.h +++ b/src/fbschema/p2pmsg_container_generated.h @@ -1,8 +1,8 @@ // automatically generated by the FlatBuffers compiler, do not modify -#ifndef FLATBUFFERS_GENERATED_P2PMSGCONTAINER_FBSCHEMA_P2PMSG_ -#define FLATBUFFERS_GENERATED_P2PMSGCONTAINER_FBSCHEMA_P2PMSG_ +#ifndef FLATBUFFERS_GENERATED_P2PMSGCONTAINER_FBSCHEMA_P2PMSG_H_ +#define FLATBUFFERS_GENERATED_P2PMSGCONTAINER_FBSCHEMA_P2PMSG_H_ #include "flatbuffers/flatbuffers.h" @@ -19,8 +19,9 @@ struct Container FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_VERSION = 4, VT_TIMESTAMP = 6, VT_PUBKEY = 8, - VT_SIGNATURE = 10, - VT_CONTENT = 12 + VT_LCL = 10, + VT_SIGNATURE = 12, + VT_CONTENT = 14 }; uint16_t version() const { return GetField(VT_VERSION, 0); @@ -40,6 +41,12 @@ struct Container FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_pubkey() { return GetPointer *>(VT_PUBKEY); } + const flatbuffers::Vector *lcl() const { + return GetPointer *>(VT_LCL); + } + flatbuffers::Vector *mutable_lcl() { + return GetPointer *>(VT_LCL); + } const flatbuffers::Vector *signature() const { return GetPointer *>(VT_SIGNATURE); } @@ -58,6 +65,8 @@ struct Container FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyField(verifier, VT_TIMESTAMP) && VerifyOffset(verifier, VT_PUBKEY) && verifier.VerifyVector(pubkey()) && + VerifyOffset(verifier, VT_LCL) && + verifier.VerifyVector(lcl()) && VerifyOffset(verifier, VT_SIGNATURE) && verifier.VerifyVector(signature()) && VerifyOffset(verifier, VT_CONTENT) && @@ -78,6 +87,9 @@ struct ContainerBuilder { void add_pubkey(flatbuffers::Offset> pubkey) { fbb_.AddOffset(Container::VT_PUBKEY, pubkey); } + void add_lcl(flatbuffers::Offset> lcl) { + fbb_.AddOffset(Container::VT_LCL, lcl); + } void add_signature(flatbuffers::Offset> signature) { fbb_.AddOffset(Container::VT_SIGNATURE, signature); } @@ -101,12 +113,14 @@ inline flatbuffers::Offset CreateContainer( uint16_t version = 0, uint64_t timestamp = 0, flatbuffers::Offset> pubkey = 0, + flatbuffers::Offset> lcl = 0, flatbuffers::Offset> signature = 0, flatbuffers::Offset> content = 0) { ContainerBuilder builder_(_fbb); builder_.add_timestamp(timestamp); builder_.add_content(content); builder_.add_signature(signature); + builder_.add_lcl(lcl); builder_.add_pubkey(pubkey); builder_.add_version(version); return builder_.Finish(); @@ -117,9 +131,11 @@ inline flatbuffers::Offset CreateContainerDirect( uint16_t version = 0, uint64_t timestamp = 0, const std::vector *pubkey = nullptr, + const std::vector *lcl = nullptr, const std::vector *signature = nullptr, const std::vector *content = nullptr) { auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; + auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto signature__ = signature ? _fbb.CreateVector(*signature) : 0; auto content__ = content ? _fbb.CreateVector(*content) : 0; return fbschema::p2pmsg::CreateContainer( @@ -127,6 +143,7 @@ inline flatbuffers::Offset CreateContainerDirect( version, timestamp, pubkey__, + lcl__, signature__, content__); } @@ -168,4 +185,4 @@ inline void FinishSizePrefixedContainerBuffer( } // namespace p2pmsg } // namespace fbschema -#endif // FLATBUFFERS_GENERATED_P2PMSGCONTAINER_FBSCHEMA_P2PMSG_ +#endif // FLATBUFFERS_GENERATED_P2PMSGCONTAINER_FBSCHEMA_P2PMSG_H_ diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index 08280046..a46158b1 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -25,7 +25,6 @@ table NonUnl_Proposal_Message { table Proposal_Message { //Proposal type message schema stage:uint8; time:uint64; - lcl:[ubyte]; users:[ByteArray]; hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) @@ -33,10 +32,7 @@ table Proposal_Message { //Proposal type message schema } table Npl_Message { //NPL type message schema - pubkey:[ubyte]; - timestamp:uint64; data:[ubyte]; - lcl:[ubyte]; } table StateDifference { //Represent state difference by tracking created,updated and deleted state files. diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index c3c5cbe1..cdc26094 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -360,11 +360,10 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_STAGE = 4, VT_TIME = 6, - VT_LCL = 8, - VT_USERS = 10, - VT_HASH_INPUTS = 12, - VT_HASH_OUTPUTS = 14, - VT_STATE = 16 + VT_USERS = 8, + VT_HASH_INPUTS = 10, + VT_HASH_OUTPUTS = 12, + VT_STATE = 14 }; uint8_t stage() const { return GetField(VT_STAGE, 0); @@ -378,12 +377,6 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { bool mutate_time(uint64_t _time) { return SetField(VT_TIME, _time, 0); } - const flatbuffers::Vector *lcl() const { - return GetPointer *>(VT_LCL); - } - flatbuffers::Vector *mutable_lcl() { - return GetPointer *>(VT_LCL); - } const flatbuffers::Vector> *users() const { return GetPointer> *>(VT_USERS); } @@ -412,8 +405,6 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { return VerifyTableStart(verifier) && VerifyField(verifier, VT_STAGE) && VerifyField(verifier, VT_TIME) && - VerifyOffset(verifier, VT_LCL) && - verifier.VerifyVector(lcl()) && VerifyOffset(verifier, VT_USERS) && verifier.VerifyVector(users()) && verifier.VerifyVectorOfTables(users()) && @@ -438,9 +429,6 @@ struct Proposal_MessageBuilder { void add_time(uint64_t time) { fbb_.AddElement(Proposal_Message::VT_TIME, time, 0); } - void add_lcl(flatbuffers::Offset> lcl) { - fbb_.AddOffset(Proposal_Message::VT_LCL, lcl); - } void add_users(flatbuffers::Offset>> users) { fbb_.AddOffset(Proposal_Message::VT_USERS, users); } @@ -469,7 +457,6 @@ inline flatbuffers::Offset CreateProposal_Message( flatbuffers::FlatBufferBuilder &_fbb, uint8_t stage = 0, uint64_t time = 0, - flatbuffers::Offset> lcl = 0, flatbuffers::Offset>> users = 0, flatbuffers::Offset>> hash_inputs = 0, flatbuffers::Offset>> hash_outputs = 0, @@ -480,7 +467,6 @@ inline flatbuffers::Offset CreateProposal_Message( builder_.add_hash_outputs(hash_outputs); builder_.add_hash_inputs(hash_inputs); builder_.add_users(users); - builder_.add_lcl(lcl); builder_.add_stage(stage); return builder_.Finish(); } @@ -489,12 +475,10 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, uint8_t stage = 0, uint64_t time = 0, - const std::vector *lcl = nullptr, const std::vector> *users = nullptr, const std::vector> *hash_inputs = nullptr, const std::vector> *hash_outputs = nullptr, flatbuffers::Offset state = 0) { - auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto users__ = users ? _fbb.CreateVector>(*users) : 0; auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; @@ -502,7 +486,6 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( _fbb, stage, time, - lcl__, users__, hash_inputs__, hash_outputs__, @@ -511,44 +494,18 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( struct Npl_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PUBKEY = 4, - VT_TIMESTAMP = 6, - VT_DATA = 8, - VT_LCL = 10 + VT_DATA = 4 }; - const flatbuffers::Vector *pubkey() const { - return GetPointer *>(VT_PUBKEY); - } - flatbuffers::Vector *mutable_pubkey() { - return GetPointer *>(VT_PUBKEY); - } - uint64_t timestamp() const { - return GetField(VT_TIMESTAMP, 0); - } - bool mutate_timestamp(uint64_t _timestamp) { - return SetField(VT_TIMESTAMP, _timestamp, 0); - } const flatbuffers::Vector *data() const { return GetPointer *>(VT_DATA); } flatbuffers::Vector *mutable_data() { return GetPointer *>(VT_DATA); } - const flatbuffers::Vector *lcl() const { - return GetPointer *>(VT_LCL); - } - flatbuffers::Vector *mutable_lcl() { - return GetPointer *>(VT_LCL); - } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PUBKEY) && - verifier.VerifyVector(pubkey()) && - VerifyField(verifier, VT_TIMESTAMP) && VerifyOffset(verifier, VT_DATA) && verifier.VerifyVector(data()) && - VerifyOffset(verifier, VT_LCL) && - verifier.VerifyVector(lcl()) && verifier.EndTable(); } }; @@ -556,18 +513,9 @@ struct Npl_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { struct Npl_MessageBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(Npl_Message::VT_PUBKEY, pubkey); - } - void add_timestamp(uint64_t timestamp) { - fbb_.AddElement(Npl_Message::VT_TIMESTAMP, timestamp, 0); - } void add_data(flatbuffers::Offset> data) { fbb_.AddOffset(Npl_Message::VT_DATA, data); } - void add_lcl(flatbuffers::Offset> lcl) { - fbb_.AddOffset(Npl_Message::VT_LCL, lcl); - } explicit Npl_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -582,33 +530,19 @@ struct Npl_MessageBuilder { inline flatbuffers::Offset CreateNpl_Message( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> pubkey = 0, - uint64_t timestamp = 0, - flatbuffers::Offset> data = 0, - flatbuffers::Offset> lcl = 0) { + flatbuffers::Offset> data = 0) { Npl_MessageBuilder builder_(_fbb); - builder_.add_timestamp(timestamp); - builder_.add_lcl(lcl); builder_.add_data(data); - builder_.add_pubkey(pubkey); return builder_.Finish(); } inline flatbuffers::Offset CreateNpl_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *pubkey = nullptr, - uint64_t timestamp = 0, - const std::vector *data = nullptr, - const std::vector *lcl = nullptr) { - auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; + const std::vector *data = nullptr) { auto data__ = data ? _fbb.CreateVector(*data) : 0; - auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; return fbschema::p2pmsg::CreateNpl_Message( _fbb, - pubkey__, - timestamp, - data__, - lcl__); + data__); } struct StateDifference FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index 78f32986..9d077ede 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -147,7 +147,7 @@ int validate_and_extract_content(const Content **content_ref, const uint8_t *con const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, const uint64_t timestamp) { p2p::nonunl_proposal nup; - + if (msg.usermessages()) nup.user_messages = flatbuf_usermsgsmap_to_usermsgsmap(msg.usermessages()); @@ -159,7 +159,7 @@ const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal * @param The Flatbuffer poporal received from the peer. * @return A proposal struct representing the message. */ -const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp) +const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp, const flatbuffers::Vector *lcl) { p2p::proposal p; @@ -167,9 +167,7 @@ const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const p.timestamp = timestamp; p.time = msg.time(); p.stage = msg.stage(); - - if (msg.lcl()) - p.lcl = flatbuff_bytes_to_sv(msg.lcl()); + p.lcl = flatbuff_bytes_to_sv(lcl); if (msg.users()) p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); @@ -199,7 +197,7 @@ void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_b // Now that we have built the content message, // we need to sign it and place it inside a container message. - create_containermsg_from_content(container_builder, builder, false); + create_containermsg_from_content(container_builder, builder, nullptr, false); } /** @@ -217,7 +215,6 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, builder, p.stage, p.time, - sv_to_flatbuff_bytes(builder, p.lcl), stringlist_to_flatbuf_bytearrayvector(builder, p.users), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); @@ -227,7 +224,30 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, // Now that we have built the content message, // we need to sign it and place it inside a container message. - create_containermsg_from_content(container_builder, builder, true); + create_containermsg_from_content(container_builder, builder, p.lcl, true); +} + +/** + * Ctreat npl message from the given npl output srtuct. + * @param container_builder Flatbuffer builder for the container message. + * @param n The npl struct to be placed in the container message. + * @param lcl Lcl value to be passed in the container message. + */ +void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &n, std::string_view lcl) +{ + flatbuffers::FlatBufferBuilder builder(1024); + + const flatbuffers::Offset npl = + CreateNpl_Message( + builder, + sv_to_flatbuff_bytes(builder, n.data)); + + const flatbuffers::Offset message = CreateContent(builder, Message_Npl_Message, npl.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to sign it and place it inside a container message. + create_containermsg_from_content(container_builder, builder, lcl, true); } /** @@ -238,7 +258,7 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, * @param sign Whether to sign the message content. */ void create_containermsg_from_content( - flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, const bool sign) + flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign) { const uint8_t *content_buf = content_builder.GetBufferPointer(); const flatbuffers::uoffset_t content_size = content_builder.GetSize(); @@ -249,6 +269,8 @@ void create_containermsg_from_content( flatbuffers::Offset> pubkey_offset = 0; flatbuffers::Offset> sig_offset = 0; + flatbuffers::Offset> lcl_offset = 0; + if (sign) { // Sign message content with this node's private key. @@ -258,11 +280,15 @@ void create_containermsg_from_content( pubkey_offset = sv_to_flatbuff_bytes(container_builder, conf::cfg.pubkey); } + if (!lcl.empty()) + lcl_offset = sv_to_flatbuff_bytes(container_builder, lcl); + const flatbuffers::Offset container_message = CreateContainer( container_builder, util::PEERMSG_VERSION, util::get_epoch_milliseconds(), pubkey_offset, + lcl_offset, sig_offset, content); @@ -285,8 +311,7 @@ flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vectorcontent()), - flatbuff_bytes_to_sv(msg->signature()) - )); + flatbuff_bytes_to_sv(msg->signature()))); } map.emplace(flatbuff_bytes_to_sv(group->pubkey()), std::move(msglist)); diff --git a/src/fbschema/p2pmsg_helpers.hpp b/src/fbschema/p2pmsg_helpers.hpp index 1a0d941f..50ad2072 100644 --- a/src/fbschema/p2pmsg_helpers.hpp +++ b/src/fbschema/p2pmsg_helpers.hpp @@ -23,7 +23,7 @@ int validate_and_extract_content(const Content **content_ref, const uint8_t *con const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, const uint64_t timestamp); -const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp); +const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp, const flatbuffers::Vector *lcl); //---Message creation helpers---// @@ -31,8 +31,10 @@ void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_b void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::proposal &p); +void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &npl, std::string_view lcl); + void create_containermsg_from_content( - flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, const bool sign); + flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, std::string_view lcl, const bool sign); //---Conversion helpers from flatbuffers data types to std data types---// diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 3e77c862..96bcfc81 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -78,7 +78,7 @@ void peer_connection_watchdog() /** * Broadcasts the given message to all currently connected outbound peers. */ -void broadcast_message(const peer_outbound_message msg) +void broadcast_message(const peer_outbound_message msg, bool send_to_self) { if (ctx.peer_connections.size() == 0) { @@ -90,7 +90,11 @@ void broadcast_message(const peer_outbound_message msg) //Broadcast while locking the peer_connections. std::lock_guard lock(ctx.peer_connections_mutex); for (const auto &[k, session] : ctx.peer_connections) + { + if (!send_to_self && session->is_self) + continue; session->send(msg); + } } } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index f4471043..5d9b9df3 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -26,13 +26,22 @@ struct nonunl_proposal std::unordered_map> user_messages; }; +struct npl_message +{ + std::string data; +}; + struct message_collection { std::list proposals; - std::mutex proposals_mutex; // Mutex for proposals access race conditions. + std::mutex proposals_mutex; // Mutex for proposals access race conditions. std::list nonunl_proposals; - std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. + std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. + + // NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract. + std::list npl_messages; + std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions. }; struct connected_context @@ -74,7 +83,7 @@ void start_peer_connections(); void peer_connection_watchdog(); -void broadcast_message(const peer_outbound_message msg); +void broadcast_message(const peer_outbound_message msg, bool send_to_self); } // namespace p2p diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 0a3c4ef9..90d678aa 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -6,6 +6,7 @@ #include "../fbschema/p2pmsg_container_generated.h" #include "../fbschema/p2pmsg_content_generated.h" #include "../fbschema/p2pmsg_helpers.hpp" +#include "../fbschema/common_helpers.hpp" #include "../sock/socket_message.hpp" #include "../sock/socket_session.hpp" #include "p2p.hpp" @@ -82,7 +83,7 @@ void peer_session_handler::on_message(sock::socket_session lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. ctx.collected_msgs.proposals.push_back( - p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp())); + p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl())); } else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message { @@ -93,9 +94,20 @@ void peer_session_handler::on_message(sock::socket_sessionmessage_as_Npl_Message(); - // execute npl logic here. - //broadcast message. + if (p2pmsg::validate_container_trust(container) != 0) + { + LOG_DBG << "NPL message rejected due to trust failure."; + return; + } + + std::lock_guard lock(ctx.collected_msgs.npl_messages_mutex); // Insert npl message with lock. + + // Npl messages are added to the npl message array as it is without deserealizing the content. The same content will be passed down + // to the contract as input in a binary format + const uint8_t *container_buf_ptr = reinterpret_cast(message.data()); + const size_t container_buf_size = message.length(); + const std::string npl_message(reinterpret_cast(container_buf_ptr), container_buf_size); + ctx.collected_msgs.npl_messages.push_back(std::move(npl_message)); } else { diff --git a/src/proc/proc.cpp b/src/proc/proc.cpp index 01cc9ed6..a1033f77 100644 --- a/src/proc/proc.cpp +++ b/src/proc/proc.cpp @@ -1,6 +1,9 @@ #include "../pchheader.hpp" #include "../conf.hpp" #include "../hplog.hpp" +#include "../fbschema/common_helpers.hpp" +#include "../fbschema/p2pmsg_container_generated.h" +#include "../fbschema/p2pmsg_content_generated.h" #include "proc.hpp" #include "ptrace_capture.hpp" @@ -23,8 +26,8 @@ enum FDTYPE // Map of user pipe fds (map key: user public key) contract_fdmap_t userfds; -// Map of NPL pipe fds (map key: user public key) -contract_fdmap_t nplfds; +// Pipe fds for NPL <--> messages. +std::vector nplfds; // Pipe fds for HP <--> messages. std::vector hpscfds; @@ -39,8 +42,8 @@ pid_t contract_pid; int exec_contract(const contract_exec_args &args) { // Setup io pipes and feed all inputs to them. - create_iopipes_for_fdmap(nplfds, args.nplbufs); create_iopipes_for_fdmap(userfds, args.userbufs); + create_iopipes(nplfds); create_iopipes(hpscfds); if (feed_inputs(args) != 0) @@ -107,7 +110,7 @@ int exec_contract(const contract_exec_args &args) * "ts": , * "hpfd": [fd0, fd1], * "usrfd":{ "":[fd0, fd1], ... }, - * "nplfd":{ "":[fd0, fd1], ... }, + * "nplfd":[fd0, fd1], * "unl":[ "pkhex", ... ] * } */ @@ -126,11 +129,8 @@ int write_contract_args(const contract_exec_args &args) fdmap_json_to_stream(userfds, os); - os << "},\"nplfd\":{"; - - fdmap_json_to_stream(nplfds, os); - - os << "},\"unl\":["; + os << "},\"nplfd\":[" << nplfds[FDTYPE::SCREAD] << "," << nplfds[FDTYPE::SCWRITE] + << "],\"unl\":["; for (auto nodepk = conf::cfg.unl.begin(); nodepk != conf::cfg.unl.end(); nodepk++) { @@ -178,18 +178,9 @@ int write_contract_args(const contract_exec_args &args) int feed_inputs(const contract_exec_args &args) { - // Write any hp input messages to hp->sc pipe. - if (write_contract_hp_inputs(args) != 0) + // Write any hp or npl input messages to hp->sc and npl->sc pipe. + if (write_contract_hp_npl_inputs(args) != 0) { - LOG_ERR << "Failed to write HP input to contract."; - return -1; - } - - // Write any npl inputs to npl pipes. - if (write_contract_fdmap_inputs(nplfds, args.nplbufs) != 0) - { - cleanup_fdmap(nplfds); - LOG_ERR << "Failed to write NPL inputs to contract."; return -1; } @@ -206,15 +197,8 @@ int feed_inputs(const contract_exec_args &args) int fetch_outputs(const contract_exec_args &args) { - if (read_contract_hp_outputs(args) != 0) + if (read_contract_hp_npl_outputs(args) != 0) { - LOG_ERR << "Error reading HP output from the contract."; - return -1; - } - - if (read_contract_fdmap_outputs(nplfds, args.nplbufs) != 0) - { - LOG_ERR << "Error reading NPL output from the contract."; return -1; } @@ -232,13 +216,20 @@ int fetch_outputs(const contract_exec_args &args) /** * Writes any hp input messages to the contract. */ -int write_contract_hp_inputs(const contract_exec_args &args) +int write_contract_hp_npl_inputs(const contract_exec_args &args) { if (write_iopipe(hpscfds, args.hpscbufs.inputs) != 0) { LOG_ERR << "Error writing HP inputs to SC"; return -1; } + + if (write_npl_iopipe(nplfds, args.nplbuff.inputs) != 0) + { + LOG_ERR << "Error writing NPL inputs to SC"; + return -1; + } + return 0; } @@ -248,14 +239,23 @@ int write_contract_hp_inputs(const contract_exec_args &args) * * @return 0 on success. -1 on failure. */ -int read_contract_hp_outputs(const contract_exec_args &args) +int read_contract_hp_npl_outputs(const contract_exec_args &args) { // Clear the input buffers because we are sure the contract has finished reading from // that mapped memory portion. args.hpscbufs.inputs.clear(); if (read_iopipe(hpscfds, args.hpscbufs.output) != 0) // hpscbufs.second is the output buffer. + { + LOG_ERR << "Error reading HP output from the contract."; return -1; + } + + if (read_iopipe(nplfds, args.nplbuff.output) != 0) // hpscbufs.second is the output buffer. + { + LOG_ERR << "Error reading NPL output from the contract."; + return -1; + } return 0; } @@ -404,7 +404,7 @@ int create_iopipes(std::vector &fds) /** * Common function to write the given input buffer into the write fd from the HP side. * @param fds Vector of fd list. - * @param inputbuffer Buffer to write into the HP write fd. + * @param inputs Buffer to write into the HP write fd. */ int write_iopipe(std::vector &fds, std::list &inputs) { @@ -442,6 +442,84 @@ int write_iopipe(std::vector &fds, std::list &inputs) return vmsplice_error ? -1 : 0; } +/** + * Write the given input buffer into the write fd from the HP side. + * @param fds Vector of fd list. + * @param inputs Buffer to write into the HP write fd. + */ +int write_npl_iopipe(std::vector &fds, std::list &inputs) +{ + /** + * npl inputs are feed into the contract in a binary protocol. It follows the following pattern + * |**NPL version (1 byte)**|**Reserved (1 byte)**|**Length of the message (2 bytes)**|**Public key (4 bytes)**|**Npl message data**| + * Length of the message is calculated without including public key length + */ + const int writefd = fds[FDTYPE::HPWRITE]; + bool vmsplice_error = false; + if (!inputs.empty()) + { + int8_t total_memsegs = inputs.size() * 3; + iovec memsegs[total_memsegs]; + size_t i = 0; + for (auto &input : inputs) + { + int8_t pre_header_index = i * 3; + int8_t pubkey_index = pre_header_index + 1; + int8_t msg_index = pre_header_index + 2; + + // First binary representation of version, reserve and message length is constructed and feed it into + // memory segment. Then the public key and at last the message data + + // At the moment no data is inserted as reserve + uint8_t reserve = 0; + + //Get message container + const fbschema::p2pmsg::Container *container = fbschema::p2pmsg::GetContainer(input.data()); + const flatbuffers::Vector *container_content = container->content(); + + uint16_t msg_length = container_content->size(); + + /** + * Pre header is constructed using bit shifting. This will generate a bit pattern as explain in the example below + * version = 00000001 + * reserve = 00000000 + * msg_length = 0000000010001101 + * pre_header = 00000001000000000000000010001101 + */ + uint32_t pre_header = util::MIN_NPL_INPUT_VERSION; + pre_header = pre_header << 8; + pre_header += reserve; + + pre_header = pre_header << 16; + pre_header += msg_length; + memsegs[pre_header_index].iov_base = &pre_header; + memsegs[pre_header_index].iov_len = 4; + + std::string_view msg_pubkey = fbschema::flatbuff_bytes_to_sv(container->pubkey()); + memsegs[pubkey_index].iov_base = reinterpret_cast(const_cast(msg_pubkey.data())); + memsegs[pubkey_index].iov_len = msg_pubkey.size(); + + memsegs[msg_index].iov_base = reinterpret_cast(const_cast(container_content->Data())); + memsegs[msg_index].iov_len = container_content->size(); + + i++; + } + + if (vmsplice(writefd, memsegs, total_memsegs, 0) == -1) + vmsplice_error = true; + } + // It's important that we DO NOT clear the input buffer string until the contract + // process has actually read from the fd. Because the OS is just mapping our + // input buffer memory portion into the fd, if we clear it now, the contract process + // will get invaid bytes when reading the fd. + + // Close the writefd since we no longer need it. + close(writefd); + fds[FDTYPE::HPWRITE] = 0; + + return vmsplice_error ? -1 : 0; +} + /** * Common function to read and close SC output from the pipe and populate the output list. * @param fds Vector representing the pipes fd list. @@ -483,13 +561,11 @@ void close_unused_fds(const bool is_hp) { close_unused_vectorfds(is_hp, hpscfds); + close_unused_vectorfds(is_hp, nplfds); + // Loop through user fds. for (auto &[pubkey, fds] : userfds) close_unused_vectorfds(is_hp, fds); - - // Loop through npl fds. - for (auto &[pubkey, fds] : nplfds) - close_unused_vectorfds(is_hp, fds); } /** diff --git a/src/proc/proc.hpp b/src/proc/proc.hpp index df787b98..9927c40b 100644 --- a/src/proc/proc.hpp +++ b/src/proc/proc.hpp @@ -21,7 +21,7 @@ struct contract_iobuf_pair // Output emitted by contract after execution. (Because we are reading output at the end, there's no way to // get a "list" of outputs. So it's always a one contingous output.) - std::string output; + std::string output; }; // Common typedef for a map of pubkey->fdlist. @@ -46,9 +46,9 @@ struct contract_exec_args // The value is a pair holding consensus-verified inputs and contract-generated outputs. contract_bufmap_t &userbufs; - // Map of NPL I/O buffers (map key: Peer binary public key). - // The value is a pair holding NPL inputs and contract-generated outputs. - contract_bufmap_t &nplbufs; + // Pair of NPL<->SC byte array message buffers. + // Input buffers for NPL->SC messages, Output buffers for SC->NPL messages. + contract_iobuf_pair &nplbuff; // Pair of HP<->SC JSON message buffers (mainly used for control messages). // Input buffers for HP->SC messages, Output buffers for SC->HP messages. @@ -64,11 +64,11 @@ struct contract_exec_args contract_exec_args( int64_t timestamp, contract_bufmap_t &userbufs, - contract_bufmap_t &nplbufs, + contract_iobuf_pair &nplbuff, contract_iobuf_pair &hpscbufs, contract_fblockmap_t &state_updates) : userbufs(userbufs), - nplbufs(nplbufs), + nplbuff(nplbuff), hpscbufs(hpscbufs), state_updates(state_updates), timestamp(timestamp) @@ -86,9 +86,9 @@ int feed_inputs(const contract_exec_args &args); int fetch_outputs(const contract_exec_args &args); -int write_contract_hp_inputs(const contract_exec_args &args); +int write_contract_hp_npl_inputs(const contract_exec_args &args); -int read_contract_hp_outputs(const contract_exec_args &args); +int read_contract_hp_npl_outputs(const contract_exec_args &args); // Common helper functions @@ -106,6 +106,8 @@ int create_iopipes(std::vector &fds); int write_iopipe(std::vector &fds, std::list &inputs); +int write_npl_iopipe(std::vector &fds, std::list &inputs); + int read_iopipe(std::vector &fds, std::string &output); void close_unused_fds(const bool is_hp); diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index 759144d6..49a83deb 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -17,8 +17,8 @@ socket_session::socket_session(websocket::stream void socket_session::set_max_socket_read_len(const uint64_t size) @@ -118,6 +118,10 @@ void socket_session::run(const std::string &&address, const std::string &&por this->uniqueid.reserve(port.size() + address.size() + 1); this->uniqueid.append(address).append(":").append(port); + // This indicates the connection is a self connection (node connects to the same node through server port) + if(address == "0.0.0.0") + this->is_self = true; + // Set the timeout. beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index 0122fc52..88bd54db 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -140,6 +140,9 @@ public: // The unique identifier of the remote party (format :). std::string uniqueid; + // Boolean value to store whether the session is self connection (connect to the same node) + bool is_self; + // The set of sock::SESSION_FLAG enum flags that will be set by user-code of this calss. // We mainly use this to store contexual information about this session based on the use case. // Setting and reading flags to this is completely managed by user-code. diff --git a/src/util.hpp b/src/util.hpp index 555687e7..3ebae8d0 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -23,6 +23,10 @@ constexpr uint8_t PEERMSG_VERSION = 1; // (Keeping this as int for effcient msg payload and comparison) constexpr uint8_t MIN_PEERMSG_VERSION = 1; +// Minimum compatible npl contract input version (this will be used to generate the npl input to feed the contract) +// (Keeping this as int for effcient msg payload and comparison) +constexpr uint8_t MIN_NPL_INPUT_VERSION = 1; + /** * FIFO hash set with a max size. */