From 83189556de7c5f7c25a4669c5547f5751335fb9a Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Sun, 3 Nov 2019 22:45:23 +0530 Subject: [PATCH] Refactored user I/O with signed inputs and NUPs. (#53) --- README.md | 17 +- examples/hpclient/client.js | 29 +- examples/hpclient/malicious-client.js | 5 +- src/cons/cons.cpp | 441 ++++++++++++------------ src/cons/cons.hpp | 80 ++++- src/crypto.cpp | 33 +- src/crypto.hpp | 2 + src/fbschema/common_helpers.cpp | 7 +- src/fbschema/common_helpers.hpp | 4 +- src/fbschema/p2pmsg_content.fbs | 20 +- src/fbschema/p2pmsg_content_generated.h | 323 +++++++++-------- src/fbschema/p2pmsg_helpers.cpp | 133 +++---- src/fbschema/p2pmsg_helpers.hpp | 18 +- src/jsonschema/usrmsg_helpers.cpp | 187 ++++++++-- src/jsonschema/usrmsg_helpers.hpp | 26 ++ src/p2p/p2p.cpp | 20 +- src/p2p/p2p.hpp | 21 +- src/p2p/peer_session_handler.cpp | 7 + src/pchheader.hpp | 1 + src/proc.cpp | 8 +- src/proc.hpp | 12 +- src/usr/user_input.hpp | 32 ++ src/usr/user_session_handler.cpp | 21 +- src/usr/usr.cpp | 55 ++- src/usr/usr.hpp | 13 +- src/util.cpp | 8 + src/util.hpp | 31 +- 27 files changed, 958 insertions(+), 596 deletions(-) create mode 100644 src/usr/user_input.hpp diff --git a/README.md b/README.md index c6ca288a..12fd407b 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ A C++ version of hotpocket designed for production envrionments, original protot * Websockets - Boost|Beast https://github.com/boostorg/beast * RapidJSON - http://rapidjson.org * P2P Protocol - https://google.github.io/flatbuffers/ +* TLS - https://www.openssl.org/ ## Steps to setup Hot Pocket @@ -26,9 +27,8 @@ Instructions are based on [this](https://libsodium.gitbook.io/doc/installation). 1. Download and extract Libsodium 1.0.18 from [here](https://download.libsodium.org/libsodium/releases/libsodium-1.0.18-stable.tar.gz). 2. Navigate to the extracted libsodium directory in a terminal. -3. Run `./configure` -4. Run `make && make check` -5. Run `sudo make install` +3. Run `./configure && make && make check` +4. Run `sudo make install` #### Install Boost Following Instructions are based on Boost [getting started](https://www.boost.org/doc/libs/1_71_0/more/getting_started/unix-variants.html#prepare-to-use-a-boost-library-binary) @@ -58,21 +58,20 @@ make 4. Run `sudo snap install flatbuffers --edge` ##### Compiling FlatBuffers message definitions -When you make a change to `message.fbc` defnition file, you need to run this: +Example: When you make a change to `p2pmsg_content_.fbc` defnition file, you need to run this: -`flatc -o src/p2p/ --gen-mutable --cpp src/p2p/message.fbs` +`flatc -o src/fbschema/ --gen-mutable --cpp src/fbschema/p2pmsg_content.fbs` #### Install OpenSSL 1. Download and extract OpenSSL-1.1.1d source from [here](https://www.openssl.org/source/openssl-1.1.1d.tar.gz). 2. Navigate to the extracted directory. -3. Run `./config` -4. Run `make` -5. Run `make install` +3. Run `./config && make` +4. Run `sudo make install` #### Run ldconfig `sudo ldconfig` -This will update your library cache and avoid potential issues when running your compiled C++ program which links to newly installed libraries. +This will update your linker library cache and avoid potential issues when running your compiled C++ program which links to newly installed libraries. #### Build and run Hot Pocket 1. Navigate to hotpocket repo root. diff --git a/examples/hpclient/client.js b/examples/hpclient/client.js index 27101996..3af59f7d 100644 --- a/examples/hpclient/client.js +++ b/examples/hpclient/client.js @@ -41,7 +41,7 @@ function main() { /* anatomy of a public challenge { - hotpocket: 0.1, + version: '0.1', type: 'public_challenge', challenge: '' } @@ -76,13 +76,14 @@ function main() { // sign the challenge and send back the response var sigbytes = sodium.crypto_sign_detached(m.challenge, keys.privateKey); var response = { + version: '0.1', type: 'challenge_response', challenge: m.challenge, sig: Buffer.from(sigbytes).toString('hex'), pubkey: pkhex } - console.log('Sending challenge response...'); + console.log('Sending challenge response.'); ws.send(JSON.stringify(response)) // start listening for stdin @@ -91,9 +92,29 @@ function main() { output: process.stdout }); + // Capture user input from the console. var input_pump = () => { - rl.question('', (answer) => { - ws.send(answer) + rl.question('\nProvide an input: ', (inp) => { + + let inp_container = { + nonce: (new Date()).getTime().toString(), + input: Buffer.from(inp).toString('hex'), + maxledgerseqno: 9999999 + } + let inp_container_bytes = JSON.stringify(inp_container); + let sig_bytes = sodium.crypto_sign_detached(inp_container_bytes, keys.privateKey); + + let signed_inp_container = { + version: "0.1", + type: "contract_input", + content: inp_container_bytes.toString('hex'), + sig: Buffer.from(sig_bytes).toString('hex') + } + + let msgtosend = JSON.stringify(signed_inp_container); + console.log("Sending message: " + msgtosend); + ws.send(msgtosend) + input_pump() }) } diff --git a/examples/hpclient/malicious-client.js b/examples/hpclient/malicious-client.js index 389acc21..ab84cf7a 100644 --- a/examples/hpclient/malicious-client.js +++ b/examples/hpclient/malicious-client.js @@ -48,7 +48,7 @@ function main() { /* anatomy of a public challenge { - hotpocket: 0.1, + version: '0.1', type: 'public_challenge', challenge: '' } @@ -83,13 +83,14 @@ function main() { // sign the challenge and send back the response var sigbytes = sodium.crypto_sign_detached(m.challenge, keys.privateKey); var response = { + version: '0.1', type: 'challenge_response', challenge: m.challenge, sig: Buffer.from(sigbytes).toString('hex'), pubkey: pkhex } - console.log('Sending challenge response...'); + console.log('Sending challenge response.'); ws.send(JSON.stringify(response)); setInterval(() => { diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index e0a68742..f46e7bc9 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -2,8 +2,10 @@ #include "../pchheader.hpp" #include "../conf.hpp" #include "../usr/usr.hpp" +#include "../usr/user_input.hpp" #include "../p2p/p2p.hpp" #include "../fbschema/p2pmsg_helpers.hpp" +#include "../jsonschema/usrmsg_helpers.hpp" #include "../p2p/peer_session_handler.hpp" #include "../hplog.hpp" #include "../crypto.hpp" @@ -12,33 +14,19 @@ #include "cons.hpp" namespace p2pmsg = fbschema::p2pmsg; +namespace jusrmsg = jsonschema::usrmsg; namespace cons { consensus_context ctx; -/** - * Increment voting table counter. - * - * @param counter The counter map in which a vote should be incremented. - * @param candidate The candidate whose vote should be increased by 1. - */ -template -void increment(std::map &counter, const T &candidate) -{ - if (counter.count(candidate)) - counter[candidate]++; - else - counter.try_emplace(candidate, 1); -} - int init() { //set start stage ctx.stage = 0; - //load lcl detals from lcl history. + //load lcl details from lcl history. const ledger_history ldr_hist = load_ledger(); ctx.led_seq_no = ldr_hist.led_seq_no; ctx.lcl = ldr_hist.lcl; @@ -56,8 +44,8 @@ void consensus() ctx.time_now = util::get_epoch_milliseconds(); // Throughout consensus, we move over the incoming proposals collected via the network so far into - // the candidate proposal set (move and append). This is to have a private working set for the consensus and avoid - // threading conflicts with network incoming proposals. + // the candidate proposal set (move and append). This is to have a private working set for the consensus + // and avoid threading conflicts with network incoming proposals. { std::lock_guard lock(p2p::collected_msgs.proposals_mutex); ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals); @@ -69,9 +57,7 @@ void consensus() bool self = p.pubkey == conf::cfg.pubkey; std::cout << "[stage" << std::to_string(p.stage) << "] users:" << p.users.size() - << " rinp:" << p.raw_inputs.size() << " hinp:" << p.hash_inputs.size() - << " rout:" << p.raw_outputs.size() << " hout:" << p.hash_outputs.size() << " lcl:" << p.lcl << " self:" << self @@ -99,17 +85,16 @@ void consensus() } } - // Transfer connected user data onto consensus candidate data. - populate_candidate_users_and_inputs(); + // Broadcast non-unl proposals (NUP) containing inputs from locally connected users. + broadcast_nonunl_proposal(); + util::sleep(conf::cfg.roundtime / 10); + + // Verify and transfer user inputs from incoming NUPs onto consensus candidate data. + verify_and_populate_candidate_user_inputs(); // In stage 0 we create a novel proposal and broadcast it. const p2p::proposal stg_prop = create_stage0_proposal(); - if (broadcast_proposal(stg_prop) != 0) - { - // No peers to broadcast stage0 proposal (not even self). So we wait and try stage 0 again. - timewait_stage(true); - return; - } + broadcast_proposal(stg_prop); } else // Stage 1, 2, 3 { @@ -173,28 +158,82 @@ void consensus() // after a stage 0 novel proposal we will just busy wait for proposals if (ctx.stage == 0) - std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 100)); + util::sleep(conf::cfg.roundtime / 100); else - std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 4)); + util::sleep(conf::cfg.roundtime / 4); } /** - * Populate connected users and their inputs (if any) into consensus candidate data. + * Broadcasts any inputs from locally connected users via an NUP. + * @return 0 for successful broadcast. -1 for failure. */ -void populate_candidate_users_and_inputs() +void broadcast_nonunl_proposal() { - // Lock the connected user list until we do this operation. - std::lock_guard lock(usr::ctx.users_mutex); - for (auto &[sid, con_user] : usr::ctx.users) + // Construct NUP. + p2p::nonunl_proposal nup; + + std::lock_guard lock(p2p::collected_msgs.nonunl_proposals_mutex); + for (auto &[sid, user] : usr::ctx.users) { - // Populate the user into candidate user inputs map. - // We do this regardless of whether the user has any inputs or not. + std::list usermsgs; + usermsgs.splice(usermsgs.end(), user.submitted_inputs); - std::list &inplist = ctx.candidate_users[con_user.pubkey]; - - // Transfer the connected user's inputs (if any) to the candidate user's inputs list. - inplist.splice(inplist.end(), con_user.inputs); + // We should create an entry for each user pubkey, even if the user has no inputs. This is + // because this data map will be used to track connected users as well in addition to inputs. + nup.user_messages.try_emplace(user.pubkey, std::move(usermsgs)); } + + p2p::peer_outbound_message msg(std::make_shared(1024)); + p2pmsg::create_msg_from_nonunl_proposal(msg.builder(), nup); + p2p::broadcast_message(msg); + + LOG_DBG << "NUP sent." << " users:" << nup.user_messages.size(); +} + +/** + * Verifies the user signatures and populate non-expired user inputs from collected + * non-unl proposals (if any) into consensus candidate data. + */ +void verify_and_populate_candidate_user_inputs() +{ + // Lock the list so any network activity is blocked. + std::lock_guard lock(p2p::collected_msgs.nonunl_proposals_mutex); + for (const p2p::nonunl_proposal &p : p2p::collected_msgs.nonunl_proposals) + { + for (const auto &[pubkey, umsgs] : p.user_messages) + { + // Populate user list. + ctx.candidate_users.emplace(pubkey); + + for (const usr::user_submitted_message &umsg : umsgs) + { + // Verify the signature of the message content. + if (crypto::verify(umsg.content, umsg.sig, pubkey) == 0) + { + // TODO: Also verify XRP payment token. + + std::string nonce; + std::string input; + uint64_t maxledgerseqno; + jusrmsg::extract_input_container(nonce, input, maxledgerseqno, umsg.content); + + // Ignore the input if our ledger has passed the input TTL. + if (maxledgerseqno > ctx.led_seq_no) + { + // Hash is prefixed with the nonce to support user-defined sort order. + std::string hash = std::move(nonce); + // Append the hash of the message signature to get the final hash. + hash.append(crypto::get_hash(umsg.sig)); + + ctx.candidate_user_inputs.try_emplace( + hash, + candidate_user_input(pubkey, std::move(input), maxledgerseqno)); + } + } + } + } + } + p2p::collected_msgs.nonunl_proposals.clear(); } p2p::proposal create_stage0_proposal() @@ -206,39 +245,23 @@ p2p::proposal create_stage0_proposal() stg_prop.stage = 0; stg_prop.lcl = ctx.lcl; - // Populate the poposal with users list (user pubkey list) and their inputs. - - for (auto [pubkey, inputs] : ctx.candidate_users) - { - // Add all the user connections we host. + // Populate the proposal with set of candidate user pubkeys. + for (const std::string &pubkey : ctx.candidate_users) stg_prop.users.emplace(pubkey); - // Add all their pending inputs. - if (!inputs.empty()) - { - std::vector inpvec; - for (util::hash_buffer &hashbuf : inputs) - inpvec.push_back(hashbuf); // Copy all hashbufs from candidate inputs into the proposal. + // We don't need candidate_users anymore, so clear it. It will be repopulated during next censensus round. + ctx.candidate_users.clear(); - stg_prop.raw_inputs.emplace(pubkey, std::move(inpvec)); - } - } + // Populate the proposal with hashes of user inputs. + for (const auto &[hash, cand_input] : ctx.candidate_user_inputs) + stg_prop.hash_inputs.emplace(hash); - // Populate the stg_prop with any contract outputs from previous round's stage 3. - for (auto &[pubkey, bufpair] : ctx.useriobufmap) - { - if (!bufpair.output.empty()) - { - std::string rawoutput; - rawoutput.swap(bufpair.output); - - stg_prop.raw_outputs.try_emplace(pubkey, util::hash_buffer(rawoutput, pubkey)); - } - } - ctx.useriobufmap.clear(); + // Populate the proposal with hashes of user outputs. + for (const auto &[hash, cand_output] : ctx.candidate_user_outputs) + stg_prop.hash_outputs.emplace(hash); // todo: set propsal states - // todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same stg_prop again. + // todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same proposal again. return stg_prop; } @@ -254,69 +277,27 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) // our peers or we will halt depending on level of consensus on the sides of the fork stg_prop.lcl = ctx.lcl; - //todo:check lcl votes and wait for proposals - // Vote for rest of the proposal fields by looking at candidate proposals. for (const p2p::proposal &cp : ctx.candidate_proposals) { // Vote for times. - // Everyone votes on an arbitrary time, as long as its within the round time and not in the future + // Everyone votes on an arbitrary time, as long as its within the round time and not in the future. if (ctx.time_now > cp.time && (ctx.time_now - cp.time) < conf::cfg.roundtime) increment(votes.time, cp.time); - // Vote for user connections - for (const std::string &user : cp.users) - increment(votes.users, user); + // Vote for user pubkeys. + for (const std::string &pubkey : cp.users) + increment(votes.users, pubkey); - // Vote for user inputs + // Vote for user inputs (hashes). Only vote for the inputs that are in our candidate_inputs set. + for (const std::string &hash : cp.hash_inputs) + if (ctx.candidate_user_inputs.count(hash) > 0) + increment(votes.inputs, hash); - // Proposals from stage 0 will have raw inputs (and their hashes) in them. - if (!cp.raw_inputs.empty()) - { - for (auto &[pubkey, inputs] : cp.raw_inputs) - { - // Vote for the input hash. - for (util::hash_buffer input : inputs) - { - increment(votes.inputs, input.hash); - - std::string inputbuffer; - inputbuffer.swap(input.buffer); - // Remember the actual input along with the hash for future use for apply-ledger. - ctx.possible_inputs.try_emplace(input.hash, std::make_pair(pubkey, inputbuffer)); - } - } - } - // Proposals from stage 1, 2, 3 will have only input hashes in them. - else if (!cp.hash_inputs.empty()) - { - for (const std::string &inputhash : cp.hash_inputs) - increment(votes.inputs, inputhash); - } - - // Vote for contract outputs - - // Proposals from stage 0 will have raw user outputs in them. - if (!cp.raw_outputs.empty()) - { - for (auto [pubkey, output] : cp.raw_outputs) - { - // Vote for the hash. - increment(votes.outputs, output.hash); - - std::string outputbuf; - outputbuf.swap(output.buffer); - - // Remember the actual output along with the hash for future use for apply-ledger and sending back to user. - ctx.possible_outputs.try_emplace(output.hash, std::make_pair(pubkey, outputbuf)); - } - } - // Proposals from stage 1, 2, 3 will have hashed user outputs in them. - else if (!cp.hash_outputs.empty()) - { - for (auto outputhash : cp.hash_outputs) - increment(votes.outputs, outputhash); - } + // Vote for contract outputs (hashes). Only vote for the outputs that are in our candidate_outputs set. + for (const std::string &hash : cp.hash_outputs) + if (ctx.candidate_user_outputs.count(hash) > 0) + increment(votes.outputs, hash); // todo: repeat above for state } @@ -328,14 +309,14 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) // if we're at proposal stage 1 we'll accept any input and connection that has 1 or more vote. - // Add user connections which have votes over stage threshold to proposal. - for (auto &[userpubkey, numvotes] : votes.users) - if (numvotes >= vote_threshold || (numvotes > 0 && ctx.stage == 1)) - stg_prop.users.emplace(userpubkey); + // Add user pubkeys which have votes over stage threshold to proposal. + for (auto &[pubkey, numvotes] : votes.users) + if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0)) + stg_prop.users.emplace(pubkey); // Add inputs which have votes over stage threshold to proposal. for (auto &[hash, numvotes] : votes.inputs) - if (numvotes >= vote_threshold || (numvotes > 0 && ctx.stage == 1)) + if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0)) stg_prop.hash_inputs.emplace(hash); // Add outputs which have votes over stage threshold to proposal. @@ -363,33 +344,16 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) * Broadcasts the given proposal to all connected peers. * @return 0 on success. -1 if no peers to broadcast. */ -int broadcast_proposal(const p2p::proposal &p) +void broadcast_proposal(const p2p::proposal &p) { p2p::peer_outbound_message msg(std::make_shared(1024)); p2pmsg::create_msg_from_proposal(msg.builder(), p); - - { - //Broadcast while locking the peer_connections. - std::lock_guard lock(p2p::peer_connections_mutex); - - if (p2p::peer_connections.size() == 0) - { - LOG_DBG << "No peers to broadcast"; - return -1; - } - - for (auto &[k, session] : p2p::peer_connections) - session->send(msg); - } + p2p::broadcast_message(msg); LOG_DBG << "Proposed [stage" << std::to_string(p.stage) << "] users:" << p.users.size() - << " rinp:" << p.raw_inputs.size() << " hinp:" << p.hash_inputs.size() - << " rout:" << p.raw_outputs.size() << " hout:" << p.hash_outputs.size(); - - return 0; } /** @@ -520,7 +484,7 @@ void timewait_stage(bool reset) if (reset) ctx.stage = 0; - std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 100)); + util::sleep(conf::cfg.roundtime / 100); } /** @@ -529,112 +493,167 @@ void timewait_stage(bool reset) */ void apply_ledger(const p2p::proposal &cons_prop) { - // todo:write lcl. ctx.led_seq_no++; ctx.lcl = cons::save_ledger(cons_prop, ctx.led_seq_no); - // Send any output from the previous consensus round to users. + // After the current ledger seq no is updated, we remove any newly expired inputs from candidate set. + { + auto itr = ctx.candidate_user_inputs.begin(); + while (itr != ctx.candidate_user_inputs.end()) + { + if (itr->second.maxledgerseqno <= ctx.led_seq_no) + ctx.candidate_user_inputs.erase(itr++); + else + ++itr; + } + } + + // Send any output from the previous consensus round to locally connected users. + dispatch_user_outputs(cons_prop); + + // todo:check state against the winning / canonical state + // and act accordingly (rollback, ask state from peer, etc.) + + proc::contract_bufmap_t useriobufmap; + feed_inputs_to_contract_bufmap(useriobufmap, cons_prop); + run_contract_binary(cons_prop.time, useriobufmap); + extract_outputs_from_contract_bufmap(useriobufmap); +} + +/** + * Dispatch any consensus-reached outputs to matching users if they are connected to us locally. + * @param cons_prop The proposal that achieved consensus. + */ +void dispatch_user_outputs(const p2p::proposal &cons_prop) +{ + std::lock_guard lock(usr::ctx.users_mutex); + for (const std::string &hash : cons_prop.hash_outputs) { - auto itr = ctx.possible_outputs.find(hash); - bool hashfound = (itr != ctx.possible_outputs.end()); + auto cu_itr = ctx.candidate_user_outputs.find(hash); + bool hashfound = (cu_itr != ctx.candidate_user_outputs.end()); if (!hashfound) { - // There's no possiblity for this to happen. - LOG_ERR << "Output required but wasn't in our possible output dict, this will potentially cause desync."; + LOG_ERR << "Output required but wasn't in our candidate outputs map, this will potentially cause desync."; // todo: consider fatal } else { - // Send outputs to users. - auto &[pubkey, output] = itr->second; - std::string outputtosend; - outputtosend.swap(output); + // Send matching outputs to locally connected users. + candidate_user_output &cand_output = cu_itr->second; + + // Find the user session by user pubkey. + auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey); + if (sess_itr != usr::ctx.sessionids.end()) // match found { - std::lock_guard lock(usr::ctx.users_mutex); - - // Find the user by session id. - const std::string sessionid = usr::ctx.sessionids[pubkey]; - auto itr = usr::ctx.users.find(sessionid); - if (itr != usr::ctx.users.end()) + auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. + if (user_itr != usr::ctx.users.end()) // match found { - const usr::connected_user &user = itr->second; + std::string outputtosend; + outputtosend.swap(cand_output.output); usr::user_outbound_message outmsg(std::move(outputtosend)); + + const usr::connected_user &user = user_itr->second; user.session->send(std::move(outmsg)); } } } } + + // now we can safely clear our candidate outputs. + ctx.candidate_user_outputs.clear(); +} - // now we can safely clear our outputs. - ctx.possible_outputs.empty(); - - //todo:check state against the winning / canonical state - //and act accordingly (rollback, ask state from peer, etc.) - - //create input to feed to binary contract run +/** + * Transfers consensus-reached inputs into the provided contract buf map so it can be fed into the contract process. + * @param bufmap The contract bufmap which needs to be populated with inputs. + * @param cons_prop The proposal that achieved consensus. + */ +void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop) +{ + // Populate the buf map with all currently connected users regardless of whether they have inputs or not. + // This is in case the contract wanted to emit some data to a user without needing any input. + for (const std::string &pubkey : cons_prop.users) + bufmap.try_emplace(pubkey, proc::contract_iobuf_pair()); for (const std::string &hash : cons_prop.hash_inputs) { - auto itr = ctx.possible_inputs.find(hash); - bool hashfound = (itr != ctx.possible_inputs.end()); + // For each consensus input hash, we need to find the actual input content to feed the contract. + auto itr = ctx.candidate_user_inputs.find(hash); + bool hashfound = (itr != ctx.candidate_user_inputs.end()); if (!hashfound) { - // There's no possiblity for this to happen. - LOG_ERR << "input required but wasn't in our possible input dict, this will potentially cause desync"; - // todo: consider fatal + LOG_ERR << "input required but wasn't in our candidate inputs map, this will potentially cause desync."; + // TODO: consider fatal } else { - // Prepare ctx.useriobufmap with user inputs to feed to the contract. + // Populate the input content into the bufmap. - const std::string &pubkey = itr->second.first; - std::string rawinput = itr->second.second; + candidate_user_input &cand_input = itr->second; std::string inputtofeed; - inputtofeed.swap(rawinput); + inputtofeed.swap(cand_input.input); - proc::contract_iobuf_pair &bufpair = ctx.useriobufmap[pubkey]; + proc::contract_iobuf_pair &bufpair = bufmap[cand_input.userpubkey]; bufpair.inputs.push_back(std::move(inputtofeed)); + + // Remove the input from the candidate set because we no longer need it. + ctx.candidate_user_inputs.erase(itr); } } - ctx.possible_inputs.clear(); - - run_contract_binary(cons_prop.time); - - // Remove entries from candidate inputs that made their way into a closed ledger - auto cu_itr = ctx.candidate_users.begin(); - while (cu_itr != ctx.candidate_users.end()) - { - // Delete any ledger inputs for this user. - std::list &inputs = cu_itr->second; - auto inp_itr = inputs.begin(); - while (inp_itr != inputs.end()) - { - // Delete the input from the list, if it was part of consensus proposal. - if (cons_prop.hash_inputs.count(inp_itr->hash)) - inputs.erase(inp_itr++); - else - ++inp_itr; - } - - // Delete the user from the list if there are no more unprocessed inputs. - if (cu_itr->second.empty()) - ctx.candidate_users.erase(cu_itr++); - else - ++cu_itr; - } } -void run_contract_binary(int64_t time_now) +/** + * Reads any outputs the contract has produced on the provided buf map and transfers them to candidate outputs + * for the next consensus round. + * @param bufmap The contract bufmap containing the outputs produced by the contract. + */ +void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap) { - // todo:implement proper data structures to exchange npl and hpsc bufs - proc::contract_bufmap_t nplbufs; + for (auto &[pubkey, bufpair] : bufmap) + { + if (!bufpair.output.empty()) + { + std::string output; + output.swap(bufpair.output); + + std::string hash = crypto::get_hash(pubkey, output); + ctx.candidate_user_outputs.try_emplace( + std::move(hash), + candidate_user_output(pubkey, std::move(output))); + } + } +} + +/** + * Executes the smart contract with the specified time and provided I/O buf maps. + * @param time_now The time that must be passed on to the contract. + * @param useriobufmap The contract bufmap which holds user I/O buffers. + */ +void run_contract_binary(int64_t time_now, proc::contract_bufmap_t &useriobufmap) +{ + // todo:implement exchange of npl and hpsc bufs + proc::contract_bufmap_t nplbufmap; proc::contract_iobuf_pair hpscbufpair; - proc::ContractExecArgs eargs(time_now, ctx.useriobufmap, nplbufs, hpscbufpair); + proc::contract_exec_args eargs(time_now, useriobufmap, nplbufmap, hpscbufpair); proc::exec_contract(eargs); } +/** + * Increment voting table counter. + * @param counter The counter map in which a vote should be incremented. + * @param candidate The candidate whose vote should be increased by 1. + */ +template +void increment(std::map &counter, const T &candidate) +{ + if (counter.count(candidate)) + counter[candidate]++; + else + counter.try_emplace(candidate, 1); +} + } // namespace cons \ No newline at end of file diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index e39385e1..f87b1b6e 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -4,6 +4,7 @@ #include "../pchheader.hpp" #include "../proc.hpp" #include "../p2p/p2p.hpp" +#include "../usr/user_input.hpp" namespace cons { @@ -15,13 +16,58 @@ static const float STAGE2_THRESHOLD = 0.65; //stage 3 vote threshold static const float STAGE3_THRESHOLD = 0.8; +/** + * Represents a contract input that takes part in consensus. + */ +struct candidate_user_input +{ + std::string userpubkey; + std::string input; + uint64_t maxledgerseqno; + + candidate_user_input(std::string userpubkey, std::string input, uint64_t maxledgerseqno) + { + this->userpubkey = std::move(userpubkey); + this->input = std::move(input); + this->maxledgerseqno = maxledgerseqno; + } +}; + +/** + * Represents a contract output that takes part in consensus. + */ +struct candidate_user_output +{ + std::string userpubkey; + std::string output; + + candidate_user_output(std::string userpubkey, std::string output) + { + this->userpubkey = std::move(userpubkey); + this->output = std::move(output); + } +}; + /** * This is used to store consensus information */ struct consensus_context { + // The set of proposals that are being collected as consensus stages are progressing. std::list candidate_proposals; - std::unordered_map> candidate_users; + + // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. + std::unordered_set candidate_users; + + // Map of candidate user inputs with input hash as map key. Inputs will stay here until they + // achieve consensus or expire (due to maxledgerseqno). Input hash is globally unique among inputs + // from all users. We will use this map to feed inputs into the contract once consensus is achieved. + std::unordered_map candidate_user_inputs; + + // Map of outputs generated by the contract with output hash is the map key. Outputs will stay + // here until the end of the current consensus round. Output hash is globally unique among outputs for + // all users. We will use this map to distribute outputs back to connected users once consensus is achieved. + std::unordered_map candidate_user_outputs; uint8_t stage; uint64_t novel_proposal_time; @@ -30,11 +76,6 @@ struct consensus_context uint64_t led_seq_no; std::string novel_proposal; - std::map> possible_inputs; - std::map> possible_outputs; - - std::unordered_map useriobufmap; - int32_t next_sleep; }; @@ -54,25 +95,36 @@ int init(); void consensus(); -void apply_ledger(const p2p::proposal &proposal); +void broadcast_nonunl_proposal(); -float_t get_stage_threshold(uint8_t stage); - -void timewait_stage(bool reset); - -void populate_candidate_users_and_inputs(); +void verify_and_populate_candidate_user_inputs(); p2p::proposal create_stage0_proposal(); p2p::proposal create_stage123_proposal(vote_counter &votes); -int broadcast_proposal(const p2p::proposal &p); +void broadcast_proposal(const p2p::proposal &p); void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes); void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); -void run_contract_binary(int64_t time); +float_t get_stage_threshold(uint8_t stage); + +void timewait_stage(bool reset); + +void apply_ledger(const p2p::proposal &proposal); + +void dispatch_user_outputs(const p2p::proposal &cons_prop); + +void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop); + +void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap); + +void run_contract_binary(int64_t time_now, proc::contract_bufmap_t &useriobufmap); + +template +void increment(std::map &counter, const T &candidate); } // namespace cons diff --git a/src/crypto.cpp b/src/crypto.cpp index 17ee1b21..729f2ff3 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -138,16 +138,41 @@ int verify_hex(std::string_view msg, std::string_view sighex, std::string_view p */ std::string get_hash(std::string_view data) { - unsigned char hashchars[crypto_generichash_BYTES]; + std::string hash; + hash.resize(crypto_generichash_blake2b_BYTES); crypto_generichash_blake2b( - hashchars, - sizeof hashchars, + reinterpret_cast(hash.data()), + hash.length(), reinterpret_cast(data.data()), data.length(), NULL, 0); - return std::string(reinterpret_cast(hashchars), crypto_generichash_blake2b_BYTES); + return hash; +} + +/** + * Generates blake2b hash for the given set of strings using stream hashing. + */ +std::string get_hash(std::string_view s1, std::string_view s2) +{ + std::string hash; + hash.resize(crypto_generichash_blake2b_BYTES); + + // Init stream hashing. + crypto_generichash_blake2b_state state; + crypto_generichash_blake2b_init(&state, NULL, 0, hash.length()); + + crypto_generichash_blake2b_update(&state, reinterpret_cast(s1.data()), s1.length()); + crypto_generichash_blake2b_update(&state, reinterpret_cast(s2.data()), s2.length()); + + // Get the final hash. + crypto_generichash_blake2b_final( + &state, + reinterpret_cast(hash.data()), + hash.length()); + + return hash; } } // namespace crypto \ No newline at end of file diff --git a/src/crypto.hpp b/src/crypto.hpp index d1c18464..4f5fe6f0 100644 --- a/src/crypto.hpp +++ b/src/crypto.hpp @@ -31,6 +31,8 @@ int verify_hex(std::string_view msg, std::string_view sighex, std::string_view p std::string get_hash(std::string_view data); +std::string get_hash(std::string_view s1, std::string_view s2); + } // namespace crypto #endif \ No newline at end of file diff --git a/src/fbschema/common_helpers.cpp b/src/fbschema/common_helpers.cpp index 88813f83..bfa06d98 100644 --- a/src/fbschema/common_helpers.cpp +++ b/src/fbschema/common_helpers.cpp @@ -25,10 +25,9 @@ std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer /** * Returns set from Flatbuffer vector of ByteArrays. */ -const std::unordered_set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec) +const std::set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec) { - std::unordered_set set; - set.reserve(fbvec->size()); + std::set set; for (auto el : *fbvec) set.emplace(std::string(flatbuff_bytes_to_sv(el->array()))); return set; @@ -63,7 +62,7 @@ sv_to_flatbuff_bytes(flatbuffers::FlatBufferBuilder &builder, std::string_view s * Returns Flatbuffer vector of ByteArrays from given set of strings. */ const flatbuffers::Offset>> -stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_set &set) +stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::set &set) { std::vector> fbvec; fbvec.reserve(set.size()); diff --git a/src/fbschema/common_helpers.hpp b/src/fbschema/common_helpers.hpp index d9d2d5e4..08782704 100644 --- a/src/fbschema/common_helpers.hpp +++ b/src/fbschema/common_helpers.hpp @@ -17,7 +17,7 @@ std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_ std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer); -const std::unordered_set +const std::set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); const std::unordered_map @@ -29,7 +29,7 @@ const flatbuffers::Offset> sv_to_flatbuff_bytes(flatbuffers::FlatBufferBuilder &builder, std::string_view sv); const flatbuffers::Offset>> -stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_set &set); +stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::set &set); const flatbuffers::Offset>> stringmap_to_flatbuf_bytepairvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map); diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index c44ba664..08280046 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -2,30 +2,32 @@ include "common_schema.fbs"; namespace fbschema.p2pmsg; -table RawInputList { //Pubkey bytes with an array of key value pairs. - pubkey:[ubyte]; - inputs:[BytesKeyValuePair]; +table UserSubmittedMessage { + content:[ubyte]; + signature:[ubyte]; } -table RawOutput { //Pubkey bytes with a output key value pair. +table UserSubmittedMessageGroup { pubkey:[ubyte]; - output:BytesKeyValuePair; + messages:[UserSubmittedMessage]; } -union Message { Proposal_Message, Npl_Message } //message content type +union Message { NonUnl_Proposal_Message, Proposal_Message, Npl_Message } //message content type table Content { message:Message; } +table NonUnl_Proposal_Message { + usermessages:[UserSubmittedMessageGroup]; +} + table Proposal_Message { //Proposal type message schema stage:uint8; time:uint64; lcl:[ubyte]; - users: [ByteArray]; - raw_inputs: [RawInputList]; //stage 0 inputs (hash and raw value) + users:[ByteArray]; hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) - raw_outputs: [RawOutput]; //stage 0 outputs (hash and raw value) hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) state: State; } diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index 6fddd44d..c3c5cbe1 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -1,8 +1,8 @@ // automatically generated by the FlatBuffers compiler, do not modify -#ifndef FLATBUFFERS_GENERATED_P2PMSGCONTENT_FBSCHEMA_P2PMSG_ -#define FLATBUFFERS_GENERATED_P2PMSGCONTENT_FBSCHEMA_P2PMSG_ +#ifndef FLATBUFFERS_GENERATED_P2PMSGCONTENT_FBSCHEMA_P2PMSG_H_ +#define FLATBUFFERS_GENERATED_P2PMSGCONTENT_FBSCHEMA_P2PMSG_H_ #include "flatbuffers/flatbuffers.h" @@ -11,12 +11,14 @@ namespace fbschema { namespace p2pmsg { -struct RawInputList; +struct UserSubmittedMessage; -struct RawOutput; +struct UserSubmittedMessageGroup; struct Content; +struct NonUnl_Proposal_Message; + struct Proposal_Message; struct Npl_Message; @@ -27,15 +29,17 @@ struct State; enum Message { Message_NONE = 0, - Message_Proposal_Message = 1, - Message_Npl_Message = 2, + Message_NonUnl_Proposal_Message = 1, + Message_Proposal_Message = 2, + Message_Npl_Message = 3, Message_MIN = Message_NONE, Message_MAX = Message_Npl_Message }; -inline const Message (&EnumValuesMessage())[3] { +inline const Message (&EnumValuesMessage())[4] { static const Message values[] = { Message_NONE, + Message_NonUnl_Proposal_Message, Message_Proposal_Message, Message_Npl_Message }; @@ -45,6 +49,7 @@ inline const Message (&EnumValuesMessage())[3] { inline const char * const *EnumNamesMessage() { static const char * const names[] = { "NONE", + "NonUnl_Proposal_Message", "Proposal_Message", "Npl_Message", nullptr @@ -62,6 +67,10 @@ template struct MessageTraits { static const Message enum_value = Message_NONE; }; +template<> struct MessageTraits { + static const Message enum_value = Message_NonUnl_Proposal_Message; +}; + template<> struct MessageTraits { static const Message enum_value = Message_Proposal_Message; }; @@ -73,10 +82,80 @@ template<> struct MessageTraits { bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); -struct RawInputList FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { +struct UserSubmittedMessage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_CONTENT = 4, + VT_SIGNATURE = 6 + }; + const flatbuffers::Vector *content() const { + return GetPointer *>(VT_CONTENT); + } + flatbuffers::Vector *mutable_content() { + return GetPointer *>(VT_CONTENT); + } + const flatbuffers::Vector *signature() const { + return GetPointer *>(VT_SIGNATURE); + } + flatbuffers::Vector *mutable_signature() { + return GetPointer *>(VT_SIGNATURE); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_CONTENT) && + verifier.VerifyVector(content()) && + VerifyOffset(verifier, VT_SIGNATURE) && + verifier.VerifyVector(signature()) && + verifier.EndTable(); + } +}; + +struct UserSubmittedMessageBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_content(flatbuffers::Offset> content) { + fbb_.AddOffset(UserSubmittedMessage::VT_CONTENT, content); + } + void add_signature(flatbuffers::Offset> signature) { + fbb_.AddOffset(UserSubmittedMessage::VT_SIGNATURE, signature); + } + explicit UserSubmittedMessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + UserSubmittedMessageBuilder &operator=(const UserSubmittedMessageBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateUserSubmittedMessage( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> content = 0, + flatbuffers::Offset> signature = 0) { + UserSubmittedMessageBuilder builder_(_fbb); + builder_.add_signature(signature); + builder_.add_content(content); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateUserSubmittedMessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *content = nullptr, + const std::vector *signature = nullptr) { + auto content__ = content ? _fbb.CreateVector(*content) : 0; + auto signature__ = signature ? _fbb.CreateVector(*signature) : 0; + return fbschema::p2pmsg::CreateUserSubmittedMessage( + _fbb, + content__, + signature__); +} + +struct UserSubmittedMessageGroup FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_PUBKEY = 4, - VT_INPUTS = 6 + VT_MESSAGES = 6 }; const flatbuffers::Vector *pubkey() const { return GetPointer *>(VT_PUBKEY); @@ -84,133 +163,64 @@ struct RawInputList FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_pubkey() { return GetPointer *>(VT_PUBKEY); } - const flatbuffers::Vector> *inputs() const { - return GetPointer> *>(VT_INPUTS); + const flatbuffers::Vector> *messages() const { + return GetPointer> *>(VT_MESSAGES); } - flatbuffers::Vector> *mutable_inputs() { - return GetPointer> *>(VT_INPUTS); + flatbuffers::Vector> *mutable_messages() { + return GetPointer> *>(VT_MESSAGES); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyOffset(verifier, VT_PUBKEY) && verifier.VerifyVector(pubkey()) && - VerifyOffset(verifier, VT_INPUTS) && - verifier.VerifyVector(inputs()) && - verifier.VerifyVectorOfTables(inputs()) && + VerifyOffset(verifier, VT_MESSAGES) && + verifier.VerifyVector(messages()) && + verifier.VerifyVectorOfTables(messages()) && verifier.EndTable(); } }; -struct RawInputListBuilder { +struct UserSubmittedMessageGroupBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(RawInputList::VT_PUBKEY, pubkey); + fbb_.AddOffset(UserSubmittedMessageGroup::VT_PUBKEY, pubkey); } - void add_inputs(flatbuffers::Offset>> inputs) { - fbb_.AddOffset(RawInputList::VT_INPUTS, inputs); + void add_messages(flatbuffers::Offset>> messages) { + fbb_.AddOffset(UserSubmittedMessageGroup::VT_MESSAGES, messages); } - explicit RawInputListBuilder(flatbuffers::FlatBufferBuilder &_fbb) + explicit UserSubmittedMessageGroupBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } - RawInputListBuilder &operator=(const RawInputListBuilder &); - flatbuffers::Offset Finish() { + UserSubmittedMessageGroupBuilder &operator=(const UserSubmittedMessageGroupBuilder &); + flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); + auto o = flatbuffers::Offset(end); return o; } }; -inline flatbuffers::Offset CreateRawInputList( +inline flatbuffers::Offset CreateUserSubmittedMessageGroup( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset> pubkey = 0, - flatbuffers::Offset>> inputs = 0) { - RawInputListBuilder builder_(_fbb); - builder_.add_inputs(inputs); + flatbuffers::Offset>> messages = 0) { + UserSubmittedMessageGroupBuilder builder_(_fbb); + builder_.add_messages(messages); builder_.add_pubkey(pubkey); return builder_.Finish(); } -inline flatbuffers::Offset CreateRawInputListDirect( +inline flatbuffers::Offset CreateUserSubmittedMessageGroupDirect( flatbuffers::FlatBufferBuilder &_fbb, const std::vector *pubkey = nullptr, - const std::vector> *inputs = nullptr) { + const std::vector> *messages = nullptr) { auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; - auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; - return fbschema::p2pmsg::CreateRawInputList( + auto messages__ = messages ? _fbb.CreateVector>(*messages) : 0; + return fbschema::p2pmsg::CreateUserSubmittedMessageGroup( _fbb, pubkey__, - inputs__); -} - -struct RawOutput FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PUBKEY = 4, - VT_OUTPUT = 6 - }; - const flatbuffers::Vector *pubkey() const { - return GetPointer *>(VT_PUBKEY); - } - flatbuffers::Vector *mutable_pubkey() { - return GetPointer *>(VT_PUBKEY); - } - const fbschema::BytesKeyValuePair *output() const { - return GetPointer(VT_OUTPUT); - } - fbschema::BytesKeyValuePair *mutable_output() { - return GetPointer(VT_OUTPUT); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PUBKEY) && - verifier.VerifyVector(pubkey()) && - VerifyOffset(verifier, VT_OUTPUT) && - verifier.VerifyTable(output()) && - verifier.EndTable(); - } -}; - -struct RawOutputBuilder { - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(RawOutput::VT_PUBKEY, pubkey); - } - void add_output(flatbuffers::Offset output) { - fbb_.AddOffset(RawOutput::VT_OUTPUT, output); - } - explicit RawOutputBuilder(flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - RawOutputBuilder &operator=(const RawOutputBuilder &); - flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); - return o; - } -}; - -inline flatbuffers::Offset CreateRawOutput( - flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset> pubkey = 0, - flatbuffers::Offset output = 0) { - RawOutputBuilder builder_(_fbb); - builder_.add_output(output); - builder_.add_pubkey(pubkey); - return builder_.Finish(); -} - -inline flatbuffers::Offset CreateRawOutputDirect( - flatbuffers::FlatBufferBuilder &_fbb, - const std::vector *pubkey = nullptr, - flatbuffers::Offset output = 0) { - auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; - return fbschema::p2pmsg::CreateRawOutput( - _fbb, - pubkey__, - output); + messages__); } struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { @@ -228,6 +238,9 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { return GetPointer(VT_MESSAGE); } template const T *message_as() const; + const NonUnl_Proposal_Message *message_as_NonUnl_Proposal_Message() const { + return message_type() == Message_NonUnl_Proposal_Message ? static_cast(message()) : nullptr; + } const Proposal_Message *message_as_Proposal_Message() const { return message_type() == Message_Proposal_Message ? static_cast(message()) : nullptr; } @@ -246,6 +259,10 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } }; +template<> inline const NonUnl_Proposal_Message *Content::message_as() const { + return message_as_NonUnl_Proposal_Message(); +} + template<> inline const Proposal_Message *Content::message_as() const { return message_as_Proposal_Message(); } @@ -285,17 +302,69 @@ inline flatbuffers::Offset CreateContent( return builder_.Finish(); } +struct NonUnl_Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_USERMESSAGES = 4 + }; + const flatbuffers::Vector> *usermessages() const { + return GetPointer> *>(VT_USERMESSAGES); + } + flatbuffers::Vector> *mutable_usermessages() { + return GetPointer> *>(VT_USERMESSAGES); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_USERMESSAGES) && + verifier.VerifyVector(usermessages()) && + verifier.VerifyVectorOfTables(usermessages()) && + verifier.EndTable(); + } +}; + +struct NonUnl_Proposal_MessageBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_usermessages(flatbuffers::Offset>> usermessages) { + fbb_.AddOffset(NonUnl_Proposal_Message::VT_USERMESSAGES, usermessages); + } + explicit NonUnl_Proposal_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + NonUnl_Proposal_MessageBuilder &operator=(const NonUnl_Proposal_MessageBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateNonUnl_Proposal_Message( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset>> usermessages = 0) { + NonUnl_Proposal_MessageBuilder builder_(_fbb); + builder_.add_usermessages(usermessages); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateNonUnl_Proposal_MessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector> *usermessages = nullptr) { + auto usermessages__ = usermessages ? _fbb.CreateVector>(*usermessages) : 0; + return fbschema::p2pmsg::CreateNonUnl_Proposal_Message( + _fbb, + usermessages__); +} + struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_STAGE = 4, VT_TIME = 6, VT_LCL = 8, VT_USERS = 10, - VT_RAW_INPUTS = 12, - VT_HASH_INPUTS = 14, - VT_RAW_OUTPUTS = 16, - VT_HASH_OUTPUTS = 18, - VT_STATE = 20 + VT_HASH_INPUTS = 12, + VT_HASH_OUTPUTS = 14, + VT_STATE = 16 }; uint8_t stage() const { return GetField(VT_STAGE, 0); @@ -321,24 +390,12 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector> *mutable_users() { return GetPointer> *>(VT_USERS); } - const flatbuffers::Vector> *raw_inputs() const { - return GetPointer> *>(VT_RAW_INPUTS); - } - flatbuffers::Vector> *mutable_raw_inputs() { - return GetPointer> *>(VT_RAW_INPUTS); - } const flatbuffers::Vector> *hash_inputs() const { return GetPointer> *>(VT_HASH_INPUTS); } flatbuffers::Vector> *mutable_hash_inputs() { return GetPointer> *>(VT_HASH_INPUTS); } - const flatbuffers::Vector> *raw_outputs() const { - return GetPointer> *>(VT_RAW_OUTPUTS); - } - flatbuffers::Vector> *mutable_raw_outputs() { - return GetPointer> *>(VT_RAW_OUTPUTS); - } const flatbuffers::Vector> *hash_outputs() const { return GetPointer> *>(VT_HASH_OUTPUTS); } @@ -360,15 +417,9 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyOffset(verifier, VT_USERS) && verifier.VerifyVector(users()) && verifier.VerifyVectorOfTables(users()) && - VerifyOffset(verifier, VT_RAW_INPUTS) && - verifier.VerifyVector(raw_inputs()) && - verifier.VerifyVectorOfTables(raw_inputs()) && VerifyOffset(verifier, VT_HASH_INPUTS) && verifier.VerifyVector(hash_inputs()) && verifier.VerifyVectorOfTables(hash_inputs()) && - VerifyOffset(verifier, VT_RAW_OUTPUTS) && - verifier.VerifyVector(raw_outputs()) && - verifier.VerifyVectorOfTables(raw_outputs()) && VerifyOffset(verifier, VT_HASH_OUTPUTS) && verifier.VerifyVector(hash_outputs()) && verifier.VerifyVectorOfTables(hash_outputs()) && @@ -393,15 +444,9 @@ struct Proposal_MessageBuilder { void add_users(flatbuffers::Offset>> users) { fbb_.AddOffset(Proposal_Message::VT_USERS, users); } - void add_raw_inputs(flatbuffers::Offset>> raw_inputs) { - fbb_.AddOffset(Proposal_Message::VT_RAW_INPUTS, raw_inputs); - } void add_hash_inputs(flatbuffers::Offset>> hash_inputs) { fbb_.AddOffset(Proposal_Message::VT_HASH_INPUTS, hash_inputs); } - void add_raw_outputs(flatbuffers::Offset>> raw_outputs) { - fbb_.AddOffset(Proposal_Message::VT_RAW_OUTPUTS, raw_outputs); - } void add_hash_outputs(flatbuffers::Offset>> hash_outputs) { fbb_.AddOffset(Proposal_Message::VT_HASH_OUTPUTS, hash_outputs); } @@ -426,18 +471,14 @@ inline flatbuffers::Offset CreateProposal_Message( uint64_t time = 0, flatbuffers::Offset> lcl = 0, flatbuffers::Offset>> users = 0, - flatbuffers::Offset>> raw_inputs = 0, flatbuffers::Offset>> hash_inputs = 0, - flatbuffers::Offset>> raw_outputs = 0, flatbuffers::Offset>> hash_outputs = 0, flatbuffers::Offset state = 0) { Proposal_MessageBuilder builder_(_fbb); builder_.add_time(time); builder_.add_state(state); builder_.add_hash_outputs(hash_outputs); - builder_.add_raw_outputs(raw_outputs); builder_.add_hash_inputs(hash_inputs); - builder_.add_raw_inputs(raw_inputs); builder_.add_users(users); builder_.add_lcl(lcl); builder_.add_stage(stage); @@ -450,16 +491,12 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( uint64_t time = 0, const std::vector *lcl = nullptr, const std::vector> *users = nullptr, - const std::vector> *raw_inputs = nullptr, const std::vector> *hash_inputs = nullptr, - const std::vector> *raw_outputs = nullptr, const std::vector> *hash_outputs = nullptr, flatbuffers::Offset state = 0) { auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto users__ = users ? _fbb.CreateVector>(*users) : 0; - auto raw_inputs__ = raw_inputs ? _fbb.CreateVector>(*raw_inputs) : 0; auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; - auto raw_outputs__ = raw_outputs ? _fbb.CreateVector>(*raw_outputs) : 0; auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; return fbschema::p2pmsg::CreateProposal_Message( _fbb, @@ -467,9 +504,7 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( time, lcl__, users__, - raw_inputs__, hash_inputs__, - raw_outputs__, hash_outputs__, state); } @@ -775,6 +810,10 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess case Message_NONE: { return true; } + case Message_NonUnl_Proposal_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } case Message_Proposal_Message: { auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); @@ -836,4 +875,4 @@ inline void FinishSizePrefixedContentBuffer( } // namespace p2pmsg } // namespace fbschema -#endif // FLATBUFFERS_GENERATED_P2PMSGCONTENT_FBSCHEMA_P2PMSG_ +#endif // FLATBUFFERS_GENERATED_P2PMSGCONTENT_FBSCHEMA_P2PMSG_H_ diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index 483726a4..63b08a89 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -139,6 +139,21 @@ int validate_and_extract_content(const Content **content_ref, const uint8_t *con return 0; } +/** + * Creates a non-unl proposal stuct from the given non-unl proposal message. + * @param The Flatbuffer non-unl poporal received from the peer. + * @return A non-unl proposal struct representing the message. + */ +const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, uint64_t timestamp) +{ + p2p::nonunl_proposal nup; + + if (msg.usermessages()) + nup.user_messages = flatbuf_usermsgsmap_to_usermsgsmap(msg.usermessages()); + + return nup; +} + /** * Creates a proposal stuct from the given proposal message. * @param The Flatbuffer poporal received from the peer. @@ -159,15 +174,9 @@ const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const if (msg.users()) p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); - if (msg.raw_inputs()) - p.raw_inputs = flatbuf_rawinputs_to_hashbuffermap(msg.raw_inputs()); - if (msg.hash_inputs()) p.hash_inputs = flatbuf_bytearrayvector_to_stringlist(msg.hash_inputs()); - if (msg.raw_outputs()) - p.raw_outputs = flatbuf_rawoutputs_to_hashbuffermap(msg.raw_outputs()); - if (msg.hash_outputs()) p.hash_outputs = flatbuf_bytearrayvector_to_stringlist(msg.hash_outputs()); @@ -176,6 +185,23 @@ const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const //---Message creation helpers---// +void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::nonunl_proposal &nup) +{ + flatbuffers::FlatBufferBuilder builder(1024); + + flatbuffers::Offset nupmsg = + CreateNonUnl_Proposal_Message( + builder, + usermsgsmap_to_flatbuf_usermsgsmap(builder, nup.user_messages)); + + flatbuffers::Offset message = CreateContent(builder, Message_NonUnl_Proposal_Message, nupmsg.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to sign it and place it inside a container message. + create_containermsg_from_content(container_builder, builder, false); +} + /** * Ctreat proposal peer message from the given proposal struct. * @param container_builder Flatbuffer builder for the container message. @@ -186,7 +212,6 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, // todo:get a average propsal message size and allocate content builder based on that. flatbuffers::FlatBufferBuilder builder(1024); - // Create dummy propsal message flatbuffers::Offset proposal = CreateProposal_Message( builder, @@ -194,9 +219,7 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, p.time, sv_to_flatbuff_bytes(builder, p.lcl), stringlist_to_flatbuf_bytearrayvector(builder, p.users), - hashbuffermap_to_flatbuf_rawinputs(builder, p.raw_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), - hashbuffermap_to_flatbuf_rawoutputs(builder, p.raw_outputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); flatbuffers::Offset message = CreateContent(builder, Message_Proposal_Message, proposal.Union()); @@ -249,48 +272,24 @@ void create_containermsg_from_content( //---Conversion helpers from flatbuffers data types to std data types---// -/** - * Returns a hash buffer map from Flatbuffer proposal raw inputs. - */ -const std::unordered_map> -flatbuf_rawinputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec) +const std::unordered_map> +flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vector> *fbvec) { - std::unordered_map> map; + std::unordered_map> map; map.reserve(fbvec->size()); - for (const RawInputList *user : *fbvec) + for (const UserSubmittedMessageGroup *group : *fbvec) { - std::vector bufvec; - bufvec.reserve(user->inputs()->size()); + std::list msglist; - for (auto input : *user->inputs()) + for (auto msg : *group->messages()) { - // Create hash_buffer object and manually assign the hash from the input. - util::hash_buffer buf(flatbuff_bytes_to_sv(input->value())); //input->value() is the raw input. - buf.hash = flatbuff_bytes_to_sv(input->key()); //input->key() is the hash of the input. - - bufvec.push_back(buf); + msglist.push_back(usr::user_submitted_message( + flatbuff_bytes_to_sv(msg->content()), + flatbuff_bytes_to_sv(msg->signature()) + )); } - map.emplace(flatbuff_bytes_to_sv(user->pubkey()), std::move(bufvec)); - } - return map; -} - -/** - * Returns a hash buffer map from Flatbuffer proposal raw outputs. - */ -const std::unordered_map -flatbuf_rawoutputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec) -{ - std::unordered_map map; - map.reserve(fbvec->size()); - for (const RawOutput *user : *fbvec) - { - // Create hash_buffer object and manually assign the hash from the output. - util::hash_buffer buf(flatbuff_bytes_to_sv(user->output()->value())); //output->value() is the raw output. - buf.hash = flatbuff_bytes_to_sv(user->output()->key()); //output->key() is the hash of the output. - - map.emplace(flatbuff_bytes_to_sv(user->pubkey()), std::move(buf)); + map.emplace(flatbuff_bytes_to_sv(group->pubkey()), std::move(msglist)); } return map; } @@ -298,50 +297,26 @@ flatbuf_rawoutputs_to_hashbuffermap(const flatbuffers::Vector>> -hashbuffermap_to_flatbuf_rawinputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) +const flatbuffers::Offset>> +usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) { - std::vector> fbvec; + std::vector> fbvec; fbvec.reserve(map.size()); - for (auto const &[pubkey, bufvec] : map) + for (auto const &[pubkey, msglist] : map) { - std::vector> fbinputsvec; - for (const util::hash_buffer &buf : bufvec) + std::vector> fbmsgsvec; + for (const usr::user_submitted_message &msg : msglist) { - fbinputsvec.push_back(CreateBytesKeyValuePair( + fbmsgsvec.push_back(CreateUserSubmittedMessage( builder, - sv_to_flatbuff_bytes(builder, buf.hash), - sv_to_flatbuff_bytes(builder, buf.buffer))); + sv_to_flatbuff_bytes(builder, msg.content), + sv_to_flatbuff_bytes(builder, msg.sig))); } - fbvec.push_back(CreateRawInputList( + fbvec.push_back(CreateUserSubmittedMessageGroup( builder, sv_to_flatbuff_bytes(builder, pubkey), - builder.CreateVector(fbinputsvec))); - } - return builder.CreateVector(fbvec); -} - -/** - * Returns Flatbuffer vector of RawOutputs from a given map of hash buffers. - */ -const flatbuffers::Offset>> -hashbuffermap_to_flatbuf_rawoutputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map) -{ - std::vector> fbvec; - fbvec.reserve(map.size()); - for (auto const &[pubkey, buf] : map) - { - fbvec.push_back(CreateRawOutput( - builder, - sv_to_flatbuff_bytes(builder, pubkey), - CreateBytesKeyValuePair( - builder, - sv_to_flatbuff_bytes(builder, buf.hash), - sv_to_flatbuff_bytes(builder, buf.buffer)))); + builder.CreateVector(fbmsgsvec))); } return builder.CreateVector(fbvec); } diff --git a/src/fbschema/p2pmsg_helpers.hpp b/src/fbschema/p2pmsg_helpers.hpp index 1775855c..0ceca08e 100644 --- a/src/fbschema/p2pmsg_helpers.hpp +++ b/src/fbschema/p2pmsg_helpers.hpp @@ -21,10 +21,14 @@ int validate_container_trust(const Container *container); int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size); +const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, uint64_t timestamp); + const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, uint64_t timestamp); //---Message creation helpers---// +void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::nonunl_proposal &nup); + void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::proposal &p); void create_containermsg_from_content( @@ -32,19 +36,13 @@ void create_containermsg_from_content( //---Conversion helpers from flatbuffers data types to std data types---// -const std::unordered_map> -flatbuf_rawinputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec); - -const std::unordered_map -flatbuf_rawoutputs_to_hashbuffermap(const flatbuffers::Vector> *fbvec); +const std::unordered_map> +flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vector> *fbvec); //---Conversion helpers from std data types to flatbuffers data types---// -const flatbuffers::Offset>> -hashbuffermap_to_flatbuf_rawinputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); - -const flatbuffers::Offset>> -hashbuffermap_to_flatbuf_rawoutputs(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map); +const flatbuffers::Offset>> +usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); } // namespace fbschema::p2pmsg diff --git a/src/jsonschema/usrmsg_helpers.cpp b/src/jsonschema/usrmsg_helpers.cpp index e583844c..73efd71c 100644 --- a/src/jsonschema/usrmsg_helpers.cpp +++ b/src/jsonschema/usrmsg_helpers.cpp @@ -7,21 +7,9 @@ namespace jsonschema::usrmsg { -static const char *SCHEMA_VERSION = "0.1"; - -// These fields are used on json messages response validation. -static const char *FLD_TYPE = "type"; -static const char *FLD_CHALLENGE = "challenge"; -static const char *FLD_SIG = "sig"; -static const char *FLD_PUBKEY = "pubkey"; -static const char *FLD_INPUT = "input"; -static const char *FLD_CONTENT = "content"; -static const char *FLD_NONCE = "nonce"; - -// Message types -static const char *MSGTYPE_CHALLENGE = "public_challenge"; -static const char *MSGTYPE_CHALLENGE_RESP = "challenge_response"; -static const char *MSGTYPE_CONTRACT_INPUT = "contract_input"; +// Separators +static const char *SEP_COMMA = "\",\""; +static const char *SEP_COLON = "\":\""; // Length of user random challenge bytes. static const size_t CHALLENGE_LEN = 16; @@ -58,15 +46,17 @@ void create_user_challenge(std::string &msg, std::string &challengehex) // Only Hot Pocket version number is variable length. Therefore message size is roughly 95 bytes // so allocating 128bits for heap padding. msg.reserve(128); - msg.append("{\"version\":\"") + msg.append("{\"") + .append(FLD_VERSION) + .append(SEP_COLON) .append(SCHEMA_VERSION) - .append("\",\"") + .append(SEP_COMMA) .append(FLD_TYPE) - .append("\":\"") + .append(SEP_COLON) .append(MSGTYPE_CHALLENGE) - .append("\",\"") + .append(SEP_COMMA) .append(FLD_CHALLENGE) - .append("\":\"") + .append(SEP_COLON) .append(challengehex) .append("\"}"); } @@ -79,6 +69,7 @@ void create_user_challenge(std::string &msg, std::string &challengehex) * @param response The response bytes to verify. This will be parsed as json. * Accepted response format: * { + * "version": "" * "type": "challenge_response", * "challenge": "", * "sig": "", @@ -89,43 +80,35 @@ void create_user_challenge(std::string &msg, std::string &challengehex) */ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge) { - // We load response raw bytes into json document. rapidjson::Document d; - - // Because we project the response message directly from the binary socket buffer in a zero-copy manner, the response - // string is not null terminated. 'kParseStopWhenDoneFlag' avoids rapidjson error in this case. - d.Parse(response.data()); - if (d.HasParseError()) - { - LOG_INFO << "Challenge response json parsing failed."; + if (parse_user_message(d, response) != 0) return -1; - } // Validate msg type. - if (!d.HasMember(FLD_TYPE) || d[FLD_TYPE] != MSGTYPE_CHALLENGE_RESP) + if (d[FLD_TYPE] != MSGTYPE_CHALLENGE_RESP) { - LOG_INFO << "User challenge response type invalid. 'challenge_response' expected."; + LOG_DBG << "User challenge response type invalid. 'challenge_response' expected."; return -1; } // Compare the response challenge string with the original issued challenge. if (!d.HasMember(FLD_CHALLENGE) || d[FLD_CHALLENGE] != original_challenge.data()) { - LOG_INFO << "User challenge response challenge invalid."; + LOG_DBG << "User challenge response challenge invalid."; return -1; } // Check for the 'sig' field existence. if (!d.HasMember(FLD_SIG) || !d[FLD_SIG].IsString()) { - LOG_INFO << "User challenge response signature invalid."; + LOG_DBG << "User challenge response signature invalid."; return -1; } // Check for the 'pubkey' field existence. if (!d.HasMember(FLD_PUBKEY) || !d[FLD_PUBKEY].IsString()) { - LOG_INFO << "User challenge response public key invalid."; + LOG_DBG << "User challenge response public key invalid."; return -1; } @@ -136,7 +119,7 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string util::getsv(d[FLD_SIG]), pubkeysv) != 0) { - LOG_INFO << "User challenge response signature verification failed."; + LOG_DBG << "User challenge response signature verification failed."; return -1; } @@ -145,4 +128,138 @@ int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string return 0; } +/** + * Extracts a signed input container message sent by user. + * + * @param extracted_content The content extracted from the message. + * @param extracted_sig The binary signature extracted from the message. + * @param d The json document holding the input container. + * Accepted signed input container format: + * { + * "version": "" + * "type": "contract_input", + * "content": "", + * "sig": "" + * } + * @return 0 on successful extraction. -1 for failure. + */ +int extract_signed_input_container( + std::string &extracted_content, std::string &extracted_sig, const rapidjson::Document &d) +{ + if (!d.HasMember(FLD_CONTENT) || !d.HasMember(FLD_SIG)) + { + LOG_DBG << "User signed input required fields missing."; + return -1; + } + + if (!d[FLD_CONTENT].IsString() || !d[FLD_SIG].IsString()) + { + LOG_DBG << "User signed input invaid field values."; + return -1; + } + + // Verify the signature of the content. + + const std::string content(d[FLD_CONTENT].GetString(), d[FLD_CONTENT].GetStringLength()); + + const std::string_view sighex(d[FLD_SIG].GetString(), d[FLD_SIG].GetStringLength()); + std::string sig; + sig.resize(crypto_sign_ed25519_BYTES); + util::hex2bin(reinterpret_cast(sig.data()), sig.length(), sighex); + + extracted_content = std::move(content); + extracted_sig = std::move(sig); + return 0; +} + +/** + * Extract the individual components of a given input container json. + * @param nonce The extracted nonce. + * @param input The extracted input. + * @param max_ledger_seqno The extracted max ledger sequence no. + * @param contentjson The json string containing the input container message. + * { + * "nonce": "", + * "input": "", + * "maxledgerseqno": 4562712334 + * } + * @return 0 on succesful extraction. -1 on failure. + */ +int extract_input_container(std::string &nonce, std::string &input, uint64_t &max_ledger_seqno, std::string_view contentjson) +{ + rapidjson::Document d; + d.Parse(contentjson.data()); + if (d.HasParseError()) + { + LOG_DBG << "User input container json parsing failed."; + return -1; + } + + if (!d.HasMember(FLD_NONCE) || !d.HasMember(FLD_INPUT) || !d.HasMember(FLD_MAX_LED_SEQ)) + { + LOG_DBG << "User input container required fields missing."; + return -1; + } + + if (!d[FLD_NONCE].IsString() || !d[FLD_INPUT].IsString() || !d[FLD_MAX_LED_SEQ].IsUint64()) + { + LOG_DBG << "User input container invaid field values."; + return -1; + } + + rapidjson::Value &inputval = d[FLD_INPUT]; + std::string_view inputhex(inputval.GetString(), inputval.GetStringLength()); + + // Convert hex input to binary. + input.resize(inputhex.length() / 2); + if (util::hex2bin( + reinterpret_cast(input.data()), + input.length(), + inputhex) != 0) + { + LOG_DBG << "Contract input format invalid."; + return -1; + } + + nonce = d[FLD_NONCE].GetString(); + max_ledger_seqno = d[FLD_MAX_LED_SEQ].GetUint64(); + + return 0; +} + +/** + * Parses a json message sent by a user. + * @param d RapidJson document to which the parsed json should be loaded. + * @param message The message to parse. + * @return 0 on successful parsing. -1 for failure. + */ +int parse_user_message(rapidjson::Document &d, std::string_view message) +{ + // We load response raw bytes into json document. + // Because we project the response message directly from the binary socket buffer in a zero-copy manner, the response + // string is not null terminated. 'kParseStopWhenDoneFlag' avoids rapidjson error in this case. + d.Parse(message.data()); + if (d.HasParseError()) + { + LOG_DBG << "User json message parsing failed."; + return -1; + } + + // Check existence of msg type field. + if (!d.HasMember(FLD_VERSION) || !d[FLD_VERSION].IsString()) + { + LOG_DBG << "User json message 'version' missing or invalid."; + return -1; + } + + // Check existence of msg type field. + if (!d.HasMember(FLD_TYPE) || !d[FLD_TYPE].IsString()) + { + LOG_DBG << "User json message 'type' missing or invalid."; + return -1; + } + + return 0; +} + } // namespace jsonschema::usrmsg \ No newline at end of file diff --git a/src/jsonschema/usrmsg_helpers.hpp b/src/jsonschema/usrmsg_helpers.hpp index 1dca112e..5a7758b4 100644 --- a/src/jsonschema/usrmsg_helpers.hpp +++ b/src/jsonschema/usrmsg_helpers.hpp @@ -5,9 +5,35 @@ namespace jsonschema::usrmsg { + +static const char *SCHEMA_VERSION = "0.1"; + +// These fields are used on json messages response validation. +static const char *FLD_VERSION = "version"; +static const char *FLD_TYPE = "type"; +static const char *FLD_CHALLENGE = "challenge"; +static const char *FLD_SIG = "sig"; +static const char *FLD_PUBKEY = "pubkey"; +static const char *FLD_INPUT = "input"; +static const char *FLD_MAX_LED_SEQ = "maxledgerseqno"; +static const char *FLD_CONTENT = "content"; +static const char *FLD_NONCE = "nonce"; + +// Message types +static const char *MSGTYPE_CHALLENGE = "public_challenge"; +static const char *MSGTYPE_CHALLENGE_RESP = "challenge_response"; +static const char *MSGTYPE_CONTRACT_INPUT = "contract_input"; + void create_user_challenge(std::string &msg, std::string &challengehex); int verify_user_challenge_response(std::string &extracted_pubkeyhex, std::string_view response, std::string_view original_challenge); + +int extract_signed_input_container(std::string &extracted_content, std::string &extracted_sig, const rapidjson::Document &d); + +int extract_input_container(std::string &nonce, std::string &input, uint64_t &max_ledger_seqno, std::string_view contentjson); + +int parse_user_message(rapidjson::Document &d, std::string_view message); + } // namespace jsonschema::usrmsg #endif \ No newline at end of file diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index e2f3061f..6c739f29 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -116,7 +116,7 @@ void peer_connection_watchdog() } } - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + util::sleep(200); } } @@ -150,4 +150,22 @@ bool is_message_duplicate(std::string_view message) return true; } +/** + * Broadcasts the given message to all currently connected outbound peers. + */ +void broadcast_message(peer_outbound_message msg) +{ + if (p2p::peer_connections.size() == 0) + { + LOG_DBG << "No peers to broadcast (not even self). Waiting until at least one peer connects."; + while (p2p::peer_connections.size() == 0) + util::sleep(100); + } + + //Broadcast while locking the peer_connections. + std::lock_guard lock(p2p::peer_connections_mutex); + for (auto &[k, session] : p2p::peer_connections) + session->send(msg); +} + } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 02a3c08f..a90e604a 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -3,6 +3,7 @@ #include "../pchheader.hpp" #include "../sock/socket_session.hpp" +#include "../usr/user_input.hpp" #include "peer_session_handler.hpp" namespace p2p @@ -15,17 +16,23 @@ struct proposal uint64_t time; uint8_t stage; std::string lcl; - std::unordered_set users; - std::unordered_map> raw_inputs; - std::unordered_set hash_inputs; - std::unordered_map raw_outputs; - std::unordered_set hash_outputs; + std::set users; + std::set hash_inputs; + std::set hash_outputs; +}; + +struct nonunl_proposal +{ + std::unordered_map> user_messages; }; struct message_collection { std::list proposals; - std::mutex proposals_mutex; // Mutex for proposals access race conditions. + std::mutex proposals_mutex; // Mutex for proposals access race conditions. + + std::list nonunl_proposals; + std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. }; /** @@ -48,6 +55,8 @@ void peer_connection_watchdog(); bool is_message_duplicate(std::string_view message); +void broadcast_message(peer_outbound_message msg); + } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 690d5173..ec59b604 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -70,6 +70,13 @@ void peer_session_handler::on_message(sock::socket_sessionmessage_as_Proposal_Message(), container->pubkey(), container->timestamp())); } + else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message + { + std::lock_guard lock(collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. + + collected_msgs.nonunl_proposals.push_back( + p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); + } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message { const p2pmsg::Npl_Message *npl = content->message_as_Npl_Message(); diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 5b0929db..b5c692b4 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include diff --git a/src/proc.cpp b/src/proc.cpp index f48bbaf0..2fc27a5a 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -36,7 +36,7 @@ __pid_t contract_pid; * * @return 0 on successful process creation. -1 on failure or contract process is already running. */ -int exec_contract(const ContractExecArgs &args) +int exec_contract(const contract_exec_args &args) { // Write any hp input messages to hp->sc pipe. if (write_contract_hp_inputs(args) != 0) @@ -162,7 +162,7 @@ int await_contract_execution() * "unl":[ "pkhex", ... ] * } */ -int write_contract_args(const ContractExecArgs &args) +int write_contract_args(const contract_exec_args &args) { // Populate the json string with contract args. // We don't use a JSON parser here because it's lightweight to contrstuct the @@ -230,7 +230,7 @@ int write_contract_args(const ContractExecArgs &args) /** * Writes any hp input messages to the contract. */ -int write_contract_hp_inputs(const ContractExecArgs &args) +int write_contract_hp_inputs(const contract_exec_args &args) { if (create_and_write_iopipes(hpscfds, args.hpscbufs.inputs) != 0) { @@ -246,7 +246,7 @@ int write_contract_hp_inputs(const ContractExecArgs &args) * * @return 0 on success. -1 on failure. */ -int read_contract_hp_outputs(const ContractExecArgs &args) +int read_contract_hp_outputs(const contract_exec_args &args) { // Clear the input buffers because we are sure the contract has finished reading from // that mapped memory portion. diff --git a/src/proc.hpp b/src/proc.hpp index 50110f02..38a62fb0 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -35,7 +35,7 @@ typedef std::unordered_map contract_bufmap_t; /** * Holds information that should be passed into the contract process. */ -struct ContractExecArgs +struct contract_exec_args { // Map of user I/O buffers (map key: user binary public key). // The value is a pair holding consensus-verified inputs and contract-generated outputs. @@ -52,7 +52,7 @@ struct ContractExecArgs // Current HotPocket timestamp. int64_t timestamp; - ContractExecArgs( + contract_exec_args( int64_t _timestamp, contract_bufmap_t &_userbufs, contract_bufmap_t &_nplbufs, @@ -65,17 +65,17 @@ struct ContractExecArgs } }; -int exec_contract(const ContractExecArgs &args); +int exec_contract(const contract_exec_args &args); int await_contract_execution(); //------Internal-use functions for this namespace. -int write_contract_args(const ContractExecArgs &args); +int write_contract_args(const contract_exec_args &args); -int write_contract_hp_inputs(const ContractExecArgs &args); +int write_contract_hp_inputs(const contract_exec_args &args); -int read_contract_hp_outputs(const ContractExecArgs &args); +int read_contract_hp_outputs(const contract_exec_args &args); // Common helper functions diff --git a/src/usr/user_input.hpp b/src/usr/user_input.hpp new file mode 100644 index 00000000..5765e5a9 --- /dev/null +++ b/src/usr/user_input.hpp @@ -0,0 +1,32 @@ +#ifndef _HP_USR_USER_INPUT_ +#define _HP_USR_USER_INPUT_ + +#include "../pchheader.hpp" + +namespace usr +{ + +/** + * Represents a signed contract input message a network user has submitted. + */ +struct user_submitted_message +{ + std::string content; + std::string sig; + + user_submitted_message(std::string content, std::string sig) + { + this->content = std::move(content); + this->sig = std::move(sig); + } + + user_submitted_message(std::string_view content, std::string_view sig) + { + this->content = content; + this->sig = sig; + } +}; + +} // namespace usr + +#endif \ No newline at end of file diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index edd8ed5c..e82f05c2 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -44,7 +44,7 @@ void user_session_handler::on_message( // Meaning we have previously issued a challenge to the client, if (session->flags[util::SESSION_FLAG::USER_CHALLENGE_ISSUED]) { - if (verify_challenge(message, session)) + if (verify_challenge(message, session) == 0) return; } // Check whether this session belongs to an authenticated (challenge-verified) user. @@ -58,12 +58,20 @@ void user_session_handler::on_message( { // This is an authed user. connected_user &user = itr->second; - handle_user_message(user, message); - return; + if (handle_user_message(user, message) == 0) + return; + + LOG_DBG << "Bad message from user " << session->uniqueid; + // TODO: Increase session bad message count. + } + else + { + LOG_DBG << "User session id not found: " << session->uniqueid; } } - // If for any reason we reach this point, we should drop the connection. + // If for any reason we reach this point, we should drop the connection because none of the + // valid cases match. session->close(); LOG_INFO << "Dropped the user connection " << session->address << ":" << session->port; } @@ -77,14 +85,11 @@ void user_session_handler::on_close(sock::socket_session // Session is awaiting challenge response. if (session->flags[util::SESSION_FLAG::USER_CHALLENGE_ISSUED]) - { ctx.pending_challenges.erase(session->uniqueid); - } + // Session belongs to an authed user. else if (session->flags[util::SESSION_FLAG::USER_AUTHED]) - { remove_user(session->uniqueid); - } LOG_INFO << "User disconnected " << session->uniqueid; } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index df2f381a..d99dd27e 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -1,6 +1,4 @@ #include "../pchheader.hpp" -#include "usr.hpp" -#include "user_session_handler.hpp" #include "../jsonschema/usrmsg_helpers.hpp" #include "../sock/socket_server.hpp" #include "../sock/socket_session_handler.hpp" @@ -8,6 +6,9 @@ #include "../conf.hpp" #include "../crypto.hpp" #include "../hplog.hpp" +#include "usr.hpp" +#include "user_session_handler.hpp" +#include "user_input.hpp" namespace jusrmsg = jsonschema::usrmsg; @@ -43,14 +44,20 @@ std::string issue_challenge(const std::string sessionid) return msgstr; } -bool verify_challenge(std::string_view message, sock::socket_session *session) +/** + * Verifies the given message for a previously issued user challenge. + * @param message Challenge response. + * @param session The socket session that received the response. + * @return 0 for successful verification. -1 for failure. + */ +int verify_challenge(std::string_view message, sock::socket_session *session) { // The received message must be the challenge response. We need to verify it. auto itr = ctx.pending_challenges.find(session->uniqueid); if (itr == ctx.pending_challenges.end()) { LOG_DBG << "No challenge found for the session " << session->uniqueid; - return false; + return -1; } std::string userpubkeyhex; @@ -81,7 +88,7 @@ bool verify_challenge(std::string_view message, sock::socket_sessionuniqueid << " authenticated. Public key " << userpubkeyhex; - return true; + return 0; } else { @@ -93,18 +100,44 @@ bool verify_challenge(std::string_view message, sock::socket_sessionuniqueid; } - return false; + return -1; } -void handle_user_message(connected_user &user, std::string_view message) +/** + * Processes a message sent by a connected user. This will be invoked by web socket on_message handler. + * @param user The authenticated user who sent the message. + * @param message The message sent by user. + * @return 0 on successful processing. -1 for failure. + */ +int handle_user_message(connected_user &user, std::string_view message) { + rapidjson::Document d; + if (jusrmsg::parse_user_message(d, message) == 0) { - std::lock_guard lock(ctx.users_mutex); - //Add to the hashed input buffer list. - user.inputs.push_back(util::hash_buffer(message, user.pubkey)); + // Message is a contract input message. + if (d[jusrmsg::FLD_TYPE] == jusrmsg::MSGTYPE_CONTRACT_INPUT) + { + std::string contentjson; + std::string sig; + if (jusrmsg::extract_signed_input_container(contentjson, sig, d) == 0) + { + std::lock_guard lock(ctx.users_mutex); + + //Add to the submitted input list. + user.submitted_inputs.push_back(user_submitted_message( + std::move(contentjson), + std::move(sig))); + return 0; + } + } + else + { + LOG_DBG << "Invalid user message type: " << d[jusrmsg::FLD_TYPE].GetString(); + } } - LOG_DBG << "Collected " << message.length() << " bytes from user"; + // Bad message. + return -1; } /** diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 489785af..41855727 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -5,6 +5,7 @@ #include "../util.hpp" #include "../sock/socket_session.hpp" #include "user_session_handler.hpp" +#include "user_input.hpp" /** * Maintains the global user list with pending input outputs and manages user connections. @@ -21,16 +22,16 @@ struct connected_user // User binary public key const std::string pubkey; - // Holds the unprocessed user inputs (and the hashes) collected from websocket. - std::list inputs; + // Holds the unprocessed user inputs collected from websocket. + std::list submitted_inputs; // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. sock::socket_session *session; /** - * @session - * @param _pubkey The public key of the user in binary format. + * @param session The web socket session the user is connected to. + * @param pubkey The public key of the user in binary format. */ connected_user(sock::socket_session *session, std::string_view pubkey) : pubkey(pubkey) @@ -85,9 +86,9 @@ int init(); std::string issue_challenge(const std::string sessionid); -bool verify_challenge(std::string_view message, sock::socket_session *session); +int verify_challenge(std::string_view message, sock::socket_session *session); -void handle_user_message(connected_user &user, std::string_view message); +int handle_user_message(connected_user &user, std::string_view message); int add_user(sock::socket_session *session, const std::string &pubkey); diff --git a/src/util.cpp b/src/util.cpp index 0acc0b00..b95ca1d9 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -59,6 +59,14 @@ int64_t get_epoch_milliseconds() .count(); } +/** + * Sleeps the current thread for specified no. of milliseconds. + */ +void sleep(uint64_t milliseconds) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); +} + /** * Compare two version strings in the format of "1.12.3". * v1 < v2 -> returns -1 diff --git a/src/util.hpp b/src/util.hpp index d5e5fe6e..2bf32d33 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -49,39 +49,12 @@ int hex2bin(unsigned char *decoded, size_t decoded_len, std::string_view hex_str int64_t get_epoch_milliseconds(); +void sleep(uint64_t milliseconds); + int version_compare(const std::string &x, const std::string &y); std::string_view getsv(const rapidjson::Value &v); -/** - * Represents a data buffer which calculates the hash of the buffer. - */ -struct hash_buffer -{ - std::string hash; - std::string buffer; - - hash_buffer(std::string_view data) - { - buffer = data; - } - - hash_buffer(std::string_view data, std::string_view hashprefix) - { - buffer = data; - - std::string timestr = std::to_string(get_epoch_milliseconds()); - - std::string stringtohash; - stringtohash.reserve(hashprefix.length() + buffer.length() + timestr.length()); - stringtohash.append(hashprefix); - stringtohash.append(buffer); - stringtohash.append(timestr); - - hash = crypto::get_hash(stringtohash); - } -}; - } // namespace util #endif \ No newline at end of file