From d6acee4e09f42a2656396acc3084d1efa890d787 Mon Sep 17 00:00:00 2001 From: Asanka Indrajith Date: Sat, 26 Oct 2019 11:46:32 -0400 Subject: [PATCH] Basic consensus implementation (#45) Consensus for user connections, user inputs, contract outputs and time. --- .gitignore | 3 +- CMakeLists.txt | 25 +- Dockerfile | 5 +- cluster-create.sh | 118 +++++++ cluster-start.sh | 27 ++ examples/echocontract/contract.js | 10 +- examples/hpclient/client.js | 2 +- src/conf.cpp | 10 +- src/cons/cons.cpp | 500 ++++++++++++++++++++++++++++ src/cons/cons.hpp | 72 ++++ src/crypto.cpp | 19 -- src/crypto.hpp | 2 - src/hplog.cpp | 4 +- src/main.cpp | 56 +--- src/p2p/message_content.fbs | 28 +- src/p2p/message_content_generated.h | 326 +++++++++--------- src/p2p/p2p.cpp | 77 +---- src/p2p/p2p.hpp | 36 +- src/p2p/peer_message_handler.cpp | 346 +++++++++++++++++++ src/p2p/peer_message_handler.hpp | 56 ++++ src/p2p/peer_session_handler.cpp | 189 +++-------- src/p2p/peer_session_handler.hpp | 4 +- src/proc.hpp | 4 +- src/usr/user_session_handler.cpp | 2 +- src/usr/usr.cpp | 12 +- src/usr/usr.hpp | 2 + src/util.cpp | 11 + src/util.hpp | 5 + 28 files changed, 1464 insertions(+), 487 deletions(-) create mode 100755 cluster-create.sh create mode 100755 cluster-start.sh create mode 100644 src/cons/cons.cpp create mode 100644 src/cons/cons.hpp create mode 100644 src/p2p/peer_message_handler.cpp create mode 100644 src/p2p/peer_message_handler.hpp diff --git a/.gitignore b/.gitignore index 45937a12..ffeeb717 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ release/** **/Makefile **/CMakeCache.txt **/cmake_install.cmake -**/CMakeFiles/** \ No newline at end of file +**/CMakeFiles/** +hpcluster \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 291dc5d0..5ae9ee3a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,40 +1,43 @@ cmake_minimum_required(VERSION 3.2) project(HPCore) -add_definitions("-std=c++17" "-DBOOST_LOG_DYN_LINK") -find_package(Boost REQUIRED COMPONENTS system log) +add_definitions("-std=c++17") set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build) +set(CMAKE_BUILD_TYPE "MinSizeRel" FORCE) add_executable(hpcore src/sock/socket_client.cpp src/sock/socket_server.cpp src/sock/socket_session.cpp + src/p2p/peer_message_handler.cpp src/p2p/peer_session_handler.cpp src/p2p/p2p.cpp + src/usr/user_session_handler.cpp + src/usr/usr.cpp + src/cons/cons.cpp src/util.cpp src/crypto.cpp src/conf.cpp src/hplog.cpp src/proc.cpp - src/usr/user_session_handler.cpp - src/usr/usr.cpp src/main.cpp ) -add_custom_target(release - COMMAND cmake -DCMAKE_BUILD_TYPE=RELEASE . - COMMAND make hpcore +add_custom_target(docker COMMAND strip ./build/hpcore COMMAND docker build -t hpcore:latest . ) -set_target_properties(release PROPERTIES EXCLUDE_FROM_ALL TRUE) +set_target_properties(docker PROPERTIES EXCLUDE_FROM_ALL TRUE) +add_dependencies(docker hpcore) target_link_libraries(hpcore libsodium.a - ${Boost_SYSTEM_LIBRARY} - ${Boost_LOG_LIBRARY} - ${Boost_LOG_SETUP_LIBRARY} + libboost_system.a + libboost_thread.a + libboost_log.a + libboost_log_setup.a + libboost_filesystem.a stdc++fs pthread crypto diff --git a/Dockerfile b/Dockerfile index afe10b35..9d59a341 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,8 @@ -FROM ubuntu:bionic +# We are going with NodeJs debian docker image because sample contracts need NodeJs to run. +# Otherwise, hpcore itself can run on any docker image like ubuntu or debian without NodeJs. +FROM node:10.17.0-buster-slim +# hpcore binary is copied to /hp directory withtin the docker image. WORKDIR /hp COPY ./build/hpcore . ENTRYPOINT ["/hp/hpcore"] \ No newline at end of file diff --git a/cluster-create.sh b/cluster-create.sh new file mode 100755 index 00000000..30964b3c --- /dev/null +++ b/cluster-create.sh @@ -0,0 +1,118 @@ +#!/bin/bash + +# Generate contract sub-directories within this script directory for the given no. of cluster nodes. +# Usage (to generate 8-node cluster): ./cluster.sh 8 + +# Validate the node count arg. +if [ -n "$1" ] && [ "$1" -eq "$1" ] 2>/dev/null; then + echo "Generating a Hot Pocket cluster of ${1} node(s)..." +else + echo "Error: Please provide number of nodes." + exit 1 +fi + +# Delete and recreate 'hpcluster' directory. +rm -r hpcluster > /dev/null 2>&1 +mkdir hpcluster +clusterloc="./hpcluster" + +pushd $clusterloc > /dev/null 2>&1 + +# Create contract directories for all nodes in the cluster. +ncount=$1 +for (( i=0; i<$ncount; i++ )) +do + + let n=$i+1 + let peerport=22860+$n + let pubport=8080+$n + + # Create contract dir named "node" + ../build/hpcore new "node${n}" > /dev/null 2>&1 + + pushd ./node$n/cfg > /dev/null 2>&1 + + # Use NodeJs to manipulate HP json configuration. + + mv hp.cfg tmp.json # nodejs needs file extension to be .json + + # Collect each node pubkey and peer ports for later processing. + + pubkeys[i]=$(node -p "require('./tmp.json').pubkeyhex") + + # During hosting we use docker virtual dns instead of IP address. + # So each node is reachable via 'node' name. + peers[i]="node${n}:${peerport}" + + # Update contract config. + node -p "JSON.stringify({...require('./tmp.json'), \ + binary: '/usr/local/bin/node', \ + binargs: './bin/contract.js', \ + peerport: ${peerport}, \ + pubport: ${pubport}, \ + roundtime: 10000, + loglevel: 'debug' + }, null, 2)" > hp.cfg + rm tmp.json + + # Generate ssl certs + openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem \ + -subj "/C=AU/ST=ST/L=L/O=O/OU=OU/CN=localhost/emailAddress=hpnode${n}@example" > /dev/null 2>&1 + popd > /dev/null 2>&1 + + # Copy the contract executable. + mkdir ./node$n/bin + cp ../examples/echocontract/contract.js ./node$n/bin/contract.js +done + +# Function to generate JSON array string while skiping a given index. +function joinarr { + arrname=$1[@] + arr=("${!arrname}") + skip=$2 + + j=0 + str="[" + for (( i=0; i<$ncount; i++ )) + do + let prevlast=$ncount-2 + if [ "$i" != "$skip" ] + then + str="$str'${arr[i]}'" + + if [ $j -lt $prevlast ] + then + str="$str," + fi + let j=j+1 + fi + done + str="$str]" + + echo $str +} + +# Loop through all nodes hp.cfg and inject peer and unl lists (skip self node). +for (( j=0; j<$ncount; j++ )) +do + let n=$j+1 + mypeers=$(joinarr peers $j) + myunl=$(joinarr pubkeys $j) + + pushd ./node$n/cfg > /dev/null 2>&1 + mv hp.cfg tmp.json # nodejs needs file extension to be .json + node -p "JSON.stringify({...require('./tmp.json'),peers:${mypeers},unl:${myunl}}, null, 2)" > hp.cfg + rm tmp.json + popd > /dev/null 2>&1 +done + +popd > /dev/null 2>&1 + +# Create docker virtual network named 'hpnet' +# All nodes will communicate with each other via this network. +docker network create --driver bridge hpnet > /dev/null 2>&1 + +echo "Cluster generated at ${clusterloc}" +echo "Use \"./cluster-start.sh \" to run each node." + +exit 0 \ No newline at end of file diff --git a/cluster-start.sh b/cluster-start.sh new file mode 100755 index 00000000..f253960b --- /dev/null +++ b/cluster-start.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Runs the specified node contract with hpcore docker image. +# This script assumes you already have the hpcore docker image and 'hpnet' virtual docker network. +# Usage (to run the node no. 1): ./start.sh 1 + +# Validate the node count arg. +if [ -n "$1" ] && [ "$1" -eq "$1" ] 2>/dev/null; then + echo "Starting docker container..." +else + echo "Error: Please provide node ID to run." + exit 1 +fi + +clusterloc=$(pwd)/hpcluster +n=$1 + +let pubport=8080+$n +# let peerport=22860+$n #Uncomment if peer port needs to be exposed to host. + +# Mount the node contract directory into hpcore docker container and run. +# We specify --network=hpnet so all nodes will communicate via 'hpnet' docker virtual network. +# We specify --name for each node so it will be the virtual dns name for each node. +docker run --rm -t -i --network=hpnet --name=node${n} \ + -p ${pubport}:${pubport} \ + --mount type=bind,source=${clusterloc}/node${n},target=/contract \ + hpcore:latest run /contract \ No newline at end of file diff --git a/examples/echocontract/contract.js b/examples/echocontract/contract.js index 65eb7dec..7ed22bdb 100644 --- a/examples/echocontract/contract.js +++ b/examples/echocontract/contract.js @@ -4,8 +4,8 @@ process.on('uncaughtException', (err) => { const fs = require('fs') let input = fs.readFileSync(0, 'utf8'); -console.log("===Sample contract started==="); -console.log("Contract args received from hp: " + input); +//console.log("===Sample contract started==="); +//console.log("Contract args received from hp: " + input); let hpargs = JSON.parse(input); @@ -22,9 +22,9 @@ Object.keys(hpargs.usrfd).forEach(function (key, index) { let hpinput = fs.readFileSync(hpargs.hpfd[0], 'utf8'); if (hpinput.length > 0) { - console.log("Input received from hp:"); - console.log(hpinput); + //console.log("Input received from hp:"); + //console.log(hpinput); fs.writeSync(hpargs.hpfd[1], "Echoing: " + hpinput); } -console.log("===Sample contract ended==="); \ No newline at end of file +//console.log("===Sample contract ended==="); \ No newline at end of file diff --git a/examples/hpclient/client.js b/examples/hpclient/client.js index 5c4967cc..27101996 100644 --- a/examples/hpclient/client.js +++ b/examples/hpclient/client.js @@ -93,7 +93,7 @@ function main() { var input_pump = () => { rl.question('', (answer) => { - ws.send(answer + "\n") + ws.send(answer) input_pump() }) } diff --git a/src/conf.cpp b/src/conf.cpp index 8393e924..6af1f9c2 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -38,6 +38,14 @@ int init() if (validate_contract_dir_paths() != 0 || load_config() != 0 || validate_config() != 0) return -1; + // Append self peer to peer list. + std::string portstr = std::to_string(cfg.peerport); + std::string peerid = "0.0.0.0:" + portstr; + cfg.peers.emplace(std::move(peerid), std::make_pair("0.0.0.0", portstr)); + + // Append self pubkey to unl list. + cfg.unl.emplace(cfg.pubkey); + return 0; } @@ -457,7 +465,7 @@ int validate_contract_dir_paths() { std::cout << path << " does not exist. Please provide self-signed certificates. Can generate using command\n" << "openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem\n" - << "and add it to " + ctx.configDir; + << "and add it to " + ctx.configDir << std::endl; } else { diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp new file mode 100644 index 00000000..fb60d567 --- /dev/null +++ b/src/cons/cons.cpp @@ -0,0 +1,500 @@ +#include +#include +#include +#include +#include +#include "../conf.hpp" +#include "../usr/usr.hpp" +#include "../p2p/p2p.hpp" +#include "../p2p/peer_message_handler.hpp" +#include "../p2p/peer_session_handler.hpp" +#include "../hplog.hpp" +#include "../crypto.hpp" +#include "../proc.hpp" +#include "cons.hpp" + +namespace cons +{ + +consensus_context ctx; + +/** + * Increment voting table counter. + * + * @param counter The counter map in which a vote should be incremented. + * @param candidate The candidate whose vote should be increased by 1. + */ +template +void increment(std::map &counter, const T &candidate) +{ + if (counter.count(candidate)) + counter[candidate]++; + else + counter.try_emplace(candidate, 1); +} + +void consensus() +{ + // A consensus round consists of 4 stages (0,1,2,3). + + // For a given stage, this function may get visited multiple times due to time-wait conditions. + + // Get the latest current time. + ctx.time_now = util::get_epoch_milliseconds(); + + // Throughout consensus, we move over the incoming proposals collected via the network so far into + // the candidate proposal set (move and append). This is to have a private working set for the consensus and void + // threading conflicts with network incoming proposals list. + { + std::lock_guard lock(p2p::collected_msgs.proposals_mutex); + ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals); + } + + if (ctx.stage == 0) + { + // Stage 0 means begining of a consensus round. + + // Remove any useless candidate proposals so we'll have a cleaner proposal set to look at + // when we transition to stage 1. + auto itr = ctx.candidate_proposals.begin(); + while (itr != ctx.candidate_proposals.end()) + { + // Remove any proposal from previous round's stage 3. + // Remove any proposal from self (pubkey match). + // todo: check the state of these to ensure we're running consensus ledger + if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey) + ctx.candidate_proposals.erase(itr++); + else + ++itr; + } + + // In stage 0 we create a novel proposal and broadcast it. + const p2p::proposal stg_prop = create_stage0_proposal(); + if (broadcast_proposal(stg_prop) != 0) + { + // No peers to broadcast stage0 proposal (not even self). So we wait and try stage 0 again. + timewait_stage(true); + return; + } + } + else // Stage 1, 2, 3 + { + // Initialize vote counters + vote_counter votes; + + // check if we're ahead/behind of consensus + bool is_desync, reset_to_stage0; + int8_t majority_stage; + check_majority_stage(is_desync, reset_to_stage0, majority_stage, votes); + if (is_desync) + { + timewait_stage(reset_to_stage0); + return; + } + + // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. + const p2p::proposal stg_prop = create_stage123_proposal(votes); + broadcast_proposal(stg_prop); + + // Remove all candidate proposals that are behind our current stage. + auto itr = ctx.candidate_proposals.begin(); + while (itr != ctx.candidate_proposals.end()) + { + if (itr->stage < ctx.stage) + ctx.candidate_proposals.erase(itr++); + else + ++itr; + } + + if (ctx.stage == 3) + { + apply_ledger(stg_prop); + + // We have finished a consensus round (all 4 stages). + LOG_DBG << "****Stage 3 consensus reached****"; + } + } + + // We have finished a consensus stage. + + // Transition to next stage. + ctx.stage = (ctx.stage + 1) % 4; + + // after a stage 0 novel proposal we will just busy wait for proposals + if (ctx.stage == 0) + std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 100)); + else + std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 4)); +} + +p2p::proposal create_stage0_proposal() +{ + // The proposal we are going to emit in stage 0. + p2p::proposal stg_prop; + stg_prop.time = ctx.time_now; + stg_prop.timestamp = ctx.time_now; + ctx.novel_proposal_time = ctx.time_now; + stg_prop.stage = 0; + stg_prop.lcl = ctx.lcl; + + // Populate the stg_prop with users list (user pubkey list) and their inputs. + { + std::lock_guard lock(usr::users_mutex); + for (auto &[sid, user] : usr::users) + { + // add all the user connections we host + stg_prop.users.emplace(user.pubkey); + + // and all their pending messages + if (!user.inbuffer.empty()) + { + std::string input; + input.swap(user.inbuffer); + stg_prop.raw_inputs.try_emplace(user.pubkey, std::move(input)); + } + } + } + + // Populate the stg_prop with any contract outputs from previous round's stage 3. + for (auto &[pubkey, bufpair] : ctx.local_userbuf) + { + if (!bufpair.second.empty()) // bufpair.second is the output buffer. + { + std::string rawoutput; + rawoutput.swap(bufpair.second); + + stg_prop.raw_outputs.try_emplace(pubkey, std::move(rawoutput)); + } + } + ctx.local_userbuf.clear(); + + // todo: set propsal states + // todo: generate stg_prop hash and check with ctx.novel_proposal, we are sending same stg_prop again. + + return stg_prop; +} + +p2p::proposal create_stage123_proposal(vote_counter &votes) +{ + // The proposal to be emited at the end of this stage. + p2p::proposal stg_prop; + stg_prop.timestamp = ctx.time_now; + stg_prop.stage = ctx.stage; + + // we always vote for our current lcl regardless of what other peers are saying + // if there's a fork condition we will either request history and state from + // our peers or we will halt depending on level of consensus on the sides of the fork + stg_prop.lcl = ctx.lcl; + + //todo:check lcl votes and wait for proposals + + // Vote for rest of the proposal fields + for (const p2p::proposal &cp : ctx.candidate_proposals) + { + // Vote for times. + // Everyone votes on an arbitrary time, as long as its within the round time and not in the future + if (ctx.time_now > cp.time && (ctx.time_now - cp.time) < conf::cfg.roundtime) + increment(votes.time, cp.time); + + // Vote for user connections + for (const std::string &user : cp.users) + increment(votes.users, user); + + // Vote for user inputs + + // Proposals from stage 0 will have raw inputs in them. + if (!cp.raw_inputs.empty()) + { + for (auto &[pubkey, input] : cp.raw_inputs) + { + // Hash the pubkey+input. + std::string str_to_hash; + str_to_hash.reserve(pubkey.size() + input.size()); + str_to_hash.append(pubkey); + str_to_hash.append(input); + std::string hash = crypto::sha_512_hash(str_to_hash, "INP", 3); + + // Vote for the hash. + increment(votes.inputs, hash); + + // Remember the actual input along with the hash for future use for apply-ledger. + ctx.possible_inputs.try_emplace( + std::move(hash), + std::make_pair(pubkey, input)); + } + } + // Proposals from stage 1, 2, 3 will have hashed inputs in them. + else if (!cp.hash_inputs.empty()) + { + for (const std::string &inputhash : cp.hash_inputs) + increment(votes.inputs, inputhash); + } + + // Vote for user outputs + + // Proposals from stage 0 will have raw user outputs in them. + if (!cp.raw_outputs.empty()) + { + for (auto &[pubkey, output] : cp.raw_outputs) + { + // Hash the pubkey+input. + std::string str_to_hash; + str_to_hash.reserve(pubkey.size() + output.size()); + str_to_hash.append(pubkey); + str_to_hash.append(output); + std::string hash = crypto::sha_512_hash(str_to_hash, "OUT", 3); + + // Vote for the hash. + increment(votes.outputs, hash); + + // Remember the actual output along with the hash for future use for apply-ledger. + ctx.possible_outputs.try_emplace( + std::move(hash), + std::make_pair(pubkey, output)); + } + } + // Proposals from stage 1, 2, 3 will have hashed user outputs in them. + else if (!cp.hash_outputs.empty()) + { + for (auto outputhash : cp.hash_outputs) + { + increment(votes.outputs, outputhash); + } + } + + // todo: repeat above for state + } + + float_t vote_threshold = get_stage_threshold(ctx.stage); + + // todo: check if inputs being proposed by another node are actually spoofed inputs + // from a user locally connected to this node. + + // if we're at proposal stage 1 we'll accept any input and connection that has 1 or more vote. + + // Add user connections which have votes over stage threshold to proposal. + for (auto &[userpubkey, numvotes] : votes.users) + if (numvotes >= vote_threshold || (numvotes > 0 && ctx.stage == 1)) + stg_prop.users.emplace(userpubkey); + + // Add inputs which have votes over stage threshold to proposal. + for (auto &[hash, numvotes] : votes.inputs) + if (numvotes >= vote_threshold || (numvotes > 0 && ctx.stage == 1)) + stg_prop.hash_inputs.emplace(hash); + + // Add outputs which have votes over stage threshold to proposal. + for (auto &[hash, numvotes] : votes.outputs) + if (numvotes >= vote_threshold) + stg_prop.hash_outputs.emplace(hash); + + // todo:add states which have votes over stage threshold to proposal. + + // time is voted on a simple sorted and majority basis, since there will always be disagreement. + int32_t highest_votes = 0; + for (auto [time, numvotes] : votes.time) + { + if (numvotes > highest_votes) + { + highest_votes = numvotes; + stg_prop.time = time; + } + } + + return stg_prop; +} + +/** + * Broadcasts the given proposal to all connected peers. + * @return 0 on success. -1 if not peers to broadcast. + */ +int broadcast_proposal(const p2p::proposal &p) +{ + p2p::peer_outbound_message msg(std::make_shared(1024)); + p2p::create_msg_from_proposal(msg.builder(), p); + + { + //Broadcast while locking the peer_connections. + std::lock_guard lock(p2p::peer_connections_mutex); + + if (p2p::peer_connections.size() == 0) + { + LOG_DBG << "No peers to broadcast"; + return -1; + } + + for (auto &[k, session] : p2p::peer_connections) + session->send(msg); + } + + LOG_DBG << "Proposed [stage" << std::to_string(p.stage) + << "] users:" << p.users.size() + << " rinp:" << p.raw_inputs.size() + << " hinp:" << p.hash_inputs.size() + << " rout:" << p.raw_outputs.size() + << " hout:" << p.hash_outputs.size(); + + return 0; +} + +/** + * Check whether our current stage is ahead or behind of the majority stage. + */ +void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes) +{ + // Stage votes. + for (const p2p::proposal &cp : ctx.candidate_proposals) + { + // Vote stages if only proposal lcl is match with node's last consensus lcl + if (cp.lcl == ctx.lcl) + increment(votes.stage, cp.stage); + + // todo:vote for lcl checking condtion + } + + majority_stage = -1; + is_desync = false; + + int32_t highest_votes = 0; + for (const auto [stage, votes] : votes.stage) + { + if (votes > highest_votes) + { + highest_votes = votes; + majority_stage = stage; + } + } + + if (majority_stage < ctx.stage - 1) + { + should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4); + is_desync = true; + + LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage) + << " is ahead of majority stage:" << std::to_string(majority_stage); + } + else if (majority_stage > ctx.stage - 1) + { + should_reset = true; + is_desync = true; + + LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage) + << " is behind majority stage:" << std::to_string(majority_stage); + } +} + +/** + * Returns the consensus percentage threshold for the specified stage. + * @param stage The consensus stage [1, 2, 3] + */ +float_t get_stage_threshold(int8_t stage) +{ + switch (stage) + { + case 1: + return cons::STAGE1_THRESHOLD * conf::cfg.unl.size(); + case 2: + return cons::STAGE2_THRESHOLD * conf::cfg.unl.size(); + case 3: + return cons::STAGE3_THRESHOLD * conf::cfg.unl.size(); + } + return -1; +} + +void timewait_stage(bool reset) +{ + if (reset) + ctx.stage = 0; + + std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime / 100)); +} + +/** + * Finalize the ledger after consensus. + * @param cons_prop The proposal that reached consensus. + */ +void apply_ledger(const p2p::proposal &cons_prop) +{ + // todo:write lcl. + + // Send any output from the previous consensus round to users. + for (const std::string &hash : cons_prop.hash_outputs) + { + auto itr = ctx.possible_outputs.find(hash); + bool hashfound = (itr != ctx.possible_outputs.end()); + if (!hashfound) + { + // There's no possiblity for this to happen. + LOG_ERR << "Output required but wasn't in our possible output dict, this will potentially cause desync."; + // todo: consider fatal + } + else + { + // Send outputs to users. + auto &[pubkey, output] = itr->second; + std::string outputtosend; + outputtosend.swap(output); + + { + std::lock_guard lock(usr::users_mutex); + + // Find the user by session id. + const std::string sessionid = usr::sessionids[pubkey]; + auto itr = usr::users.find(sessionid); + if (itr != usr::users.end()) + { + const usr::connected_user &user = itr->second; + usr::user_outbound_message outmsg(std::move(outputtosend)); + user.session->send(std::move(outmsg)); + } + } + } + } + + // now we can safely clear our outputs. + ctx.possible_outputs.empty(); + + //todo:check state against the winning / canonical state + //and act accordingly (rollback, ask state from peer, etc.) + + //create input to feed to binary contract run + + //todo:remove entries from pending inputs that made their way into a closed ledger + for (const std::string &hash : cons_prop.hash_inputs) + { + auto itr = ctx.possible_inputs.find(hash); + bool hashfound = (itr != ctx.possible_inputs.end()); + if (!hashfound) + { + // There's no possiblity for this to happen. + LOG_ERR << "input required but wasn't in our possible input dict, this will potentially cause desync"; + // todo: consider fatal + } + else + { + // Prepare ctx.local_userbuf with user inputs to feed to the contract. + + const std::string &pubkey = itr->second.first; + std::string rawinput = itr->second.second; + + std::string inputtofeed; + inputtofeed.swap(rawinput); + + std::pair bufpair; + bufpair.first = std::move(inputtofeed); + ctx.local_userbuf.try_emplace(pubkey, std::move(bufpair)); + } + } + + run_contract_binary(cons_prop.time); +} + +void run_contract_binary(int64_t time_now) +{ + std::pair hpscbufpair; + std::unordered_map> nplbufs; + + proc::ContractExecArgs eargs(time_now, ctx.local_userbuf, nplbufs, hpscbufpair); + proc::exec_contract(eargs); +} + +} // namespace cons \ No newline at end of file diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp new file mode 100644 index 00000000..9893a6a2 --- /dev/null +++ b/src/cons/cons.hpp @@ -0,0 +1,72 @@ +#ifndef _HP_CONS_H_ +#define _HP_CONS_H_ + +#include +#include +#include +#include +#include "../p2p/p2p.hpp" + +namespace cons +{ + +//stage 1 vote threshold +static const float STAGE1_THRESHOLD = 0.5; +//stage 2 vote threshold +static const float STAGE2_THRESHOLD = 0.65; +//stage 3 vote threshold +static const float STAGE3_THRESHOLD = 0.8; + +/** + * This is used to store consensus information + */ +struct consensus_context +{ + std::list candidate_proposals; + + int8_t stage; + int64_t novel_proposal_time; + int64_t time_now; + std::string lcl; + std::string novel_proposal; + std::map> possible_inputs; + std::map> possible_outputs; + + std::unordered_map> local_userbuf; + + int32_t next_sleep; +}; + +struct vote_counter +{ + std::map stage; + std::map time; + std::map lcl; + std::map users; + std::map inputs; + std::map outputs; +}; + +extern consensus_context ctx; + +void consensus(); + +void apply_ledger(const p2p::proposal &proposal); + +float_t get_stage_threshold(int8_t stage); + +void timewait_stage(bool reset); + +p2p::proposal create_stage0_proposal(); + +p2p::proposal create_stage123_proposal(vote_counter &votes); + +int broadcast_proposal(const p2p::proposal &p); + +void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes); + +void run_contract_binary(std::int64_t time); + +} // namespace cons + +#endif diff --git a/src/crypto.cpp b/src/crypto.cpp index c1d0f989..11a260b8 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -132,25 +132,6 @@ int verify_hex(std::string_view msg, std::string_view sighex, std::string_view p decoded_pubkey + 1); // +1 to skip prefix byte. } -/** - * Generate SHA 512 hash for message prepend with prefix before hashing. - * - * @param msg message string. - * @param prefix prefix char array. - * @param char_length length of prefix char array. - * @return SHA 512 hash. - */ -std::string sha_512_hash(const std::string &msg, const char *prefix, size_t char_length) -{ - std::string payload; - payload.reserve(char_length + msg.size()); - payload.append(prefix); - payload.append(msg); - unsigned char hashchars[crypto_hash_sha512_BYTES]; - crypto_hash_sha512(hashchars, (unsigned char *)payload.data(), payload.length()); - return std::string((char *)hashchars, crypto_hash_sha512_BYTES); -} - /** * Generate SHA 512 hash for message prepend with prefix before hashing. * diff --git a/src/crypto.hpp b/src/crypto.hpp index 4289f266..a05375d7 100644 --- a/src/crypto.hpp +++ b/src/crypto.hpp @@ -29,8 +29,6 @@ int verify(std::string_view msg, std::string_view sig, std::string_view pubkey); int verify_hex(std::string_view msg, std::string_view sighex, std::string_view pubkeyhex); -std::string sha_512_hash(const std::string &msg, const char *prefix, size_t char_length); - std::string sha_512_hash(std::string_view msg, const char *prefix, size_t char_length); } // namespace crypto diff --git a/src/hplog.cpp b/src/hplog.cpp index dc824a77..8a8cc288 100644 --- a/src/hplog.cpp +++ b/src/hplog.cpp @@ -79,12 +79,12 @@ void init() // This will make every new launch of Hot Pocket to start a new log file number. // It will scan existing log files matching the pattern and find the next number. - keywords::scan_method = sinks::file::scan_matching, + keywords::scan_method = sinks::file::scan_matching #ifndef NDEBUG // We enable auto_flush to immediately get the logs onto the file. Otherwise it takes time // for buffered logs to reach the file. This impacts performance. So enabled only in debug build. - keywords::auto_flush = true + , keywords::auto_flush = true #endif ); } diff --git a/src/main.cpp b/src/main.cpp index cac57889..564b2ec2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -13,9 +13,9 @@ #include "conf.hpp" #include "crypto.hpp" #include "usr/usr.hpp" -#include "p2p/p2p.hpp" #include "proc.hpp" #include "hplog.hpp" +#include "cons/cons.hpp" /** * Parses CLI args and extracts hot pocket command and parameters given. @@ -124,65 +124,21 @@ int main(int argc, char **argv) hplog::init(); - if (usr::init() != 0) + if (p2p::init() != 0) return -1; - if (p2p::init() != 0) + if (usr::init() != 0) return -1; // After initializing primary subsystems, register the SIGINT handler. signal(SIGINT, signal_handler); - // This will start hosting the contract and start consensus rounds. - // TODO + cons::ctx.stage = 0; + cons::ctx.lcl = "static_lcl"; while (true) { - sleep(2); - - // Test code to execute contract and collect outputs. - std::unordered_map> userbufs; - for (auto &[sid, user] : usr::users) - { - std::pair bufpair; - std::string inputtosend; - inputtosend.swap(user.inbuffer); - bufpair.first = std::move(inputtosend); - - userbufs.emplace(user.pubkey, std::move(bufpair)); - } - - std::pair hpscbufpair; - hpscbufpair.first = "{msg:'Message from HP'}"; - - std::unordered_map> nplbufs; - for (int i = 0; i < 3; i++) - { - std::pair bufpair; - nplbufs.emplace("aaa", std::move(bufpair)); - } - - proc::ContractExecArgs eargs(123123345, userbufs, nplbufs, hpscbufpair); - proc::exec_contract(eargs); - - for (auto &[pubkey, bufpair] : userbufs) - { - if (!bufpair.second.empty()) - { - // Find the user session id by the pubkey. - const std::string sessionid = usr::sessionids[pubkey]; - - // Find the user by session id. - auto itr = usr::users.find(sessionid); - const usr::connected_user &user = itr->second; - user.session->send(std::move(bufpair.second)); - } - } - - if (!hpscbufpair.second.empty()) - LOG_DBG << "Message from SC: " << hpscbufpair.second; - - userbufs.clear(); + cons::consensus(); } // Free resources. diff --git a/src/p2p/message_content.fbs b/src/p2p/message_content.fbs index 4ebdfc9b..438450c6 100644 --- a/src/p2p/message_content.fbs +++ b/src/p2p/message_content.fbs @@ -1,36 +1,36 @@ //IDL file for p2p message content schema. namespace p2p; -table StringKeyValuePair { //flatbuff equivalent for dictionary/hashmap for +table BytesKeyValuePair { //flatbuff equivalent for dictionary/hashmap for key:[ubyte]; - value:string; + value:[ubyte]; } table ByteArray { //To represent list of byte arrays array:[ubyte]; } -union Message { Proposal, Npl } //message content type +union Message { Proposal_Message, Npl_Message } //message content type table Content { message:Message; } -table Proposal { //Proposal type message schema +table Proposal_Message { //Proposal type message schema pubkey:[ubyte]; timestamp:uint64; stage: int8; time:uint64; lcl:[ubyte]; - connections: [ByteArray]; - raw_inputs: [StringKeyValuePair]; //stage 0 inputs - hash_inputs:[string]; //stage > 0 inputs (hash of satge 0 inputs) - raw_outputs: [StringKeyValuePair]; //stage 0 outputs - hash_outputs:[string]; //stage > 0 outputs (hash of satge 0 outputs) + users: [ByteArray]; + raw_inputs: [BytesKeyValuePair]; //stage 0 inputs + hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) + raw_outputs: [BytesKeyValuePair]; //stage 0 outputs + hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) state: State; } -table Npl { //NPL type message schema +table Npl_Message { //NPL type message schema pubkey:[ubyte]; timestamp:uint64; data:[ubyte]; @@ -38,16 +38,16 @@ table Npl { //NPL type message schema } table StateDifference { //Represent state difference by tracking created,updated and deleted state files. - created: [StringKeyValuePair]; //list of { fn => hash } - updated: [StringKeyValuePair]; - deleted: [StringKeyValuePair]; + created: [BytesKeyValuePair]; //list of { fn => hash } + updated: [BytesKeyValuePair]; + deleted: [BytesKeyValuePair]; } table State { previous: [ubyte]; // hash of the previous state current: [ubyte]; // hash of the current state difference: StateDifference; - patch: [StringKeyValuePair]; // fn -> bsdiff patch going from previous state to new state + patch: [BytesKeyValuePair]; // fn -> bsdiff patch going from previous state to new state } root_type Content; //root type for message content \ No newline at end of file diff --git a/src/p2p/message_content_generated.h b/src/p2p/message_content_generated.h index 92112163..d52af826 100644 --- a/src/p2p/message_content_generated.h +++ b/src/p2p/message_content_generated.h @@ -8,15 +8,15 @@ namespace p2p { -struct StringKeyValuePair; +struct BytesKeyValuePair; struct ByteArray; struct Content; -struct Proposal; +struct Proposal_Message; -struct Npl; +struct Npl_Message; struct StateDifference; @@ -24,17 +24,17 @@ struct State; enum Message { Message_NONE = 0, - Message_Proposal = 1, - Message_Npl = 2, + Message_Proposal_Message = 1, + Message_Npl_Message = 2, Message_MIN = Message_NONE, - Message_MAX = Message_Npl + Message_MAX = Message_Npl_Message }; inline const Message (&EnumValuesMessage())[3] { static const Message values[] = { Message_NONE, - Message_Proposal, - Message_Npl + Message_Proposal_Message, + Message_Npl_Message }; return values; } @@ -42,15 +42,15 @@ inline const Message (&EnumValuesMessage())[3] { inline const char * const *EnumNamesMessage() { static const char * const names[] = { "NONE", - "Proposal", - "Npl", + "Proposal_Message", + "Npl_Message", nullptr }; return names; } inline const char *EnumNameMessage(Message e) { - if (e < Message_NONE || e > Message_Npl) return ""; + if (e < Message_NONE || e > Message_Npl_Message) return ""; const size_t index = static_cast(e); return EnumNamesMessage()[index]; } @@ -59,18 +59,18 @@ template struct MessageTraits { static const Message enum_value = Message_NONE; }; -template<> struct MessageTraits { - static const Message enum_value = Message_Proposal; +template<> struct MessageTraits { + static const Message enum_value = Message_Proposal_Message; }; -template<> struct MessageTraits { - static const Message enum_value = Message_Npl; +template<> struct MessageTraits { + static const Message enum_value = Message_Npl_Message; }; bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); -struct StringKeyValuePair FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { +struct BytesKeyValuePair FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_KEY = 4, VT_VALUE = 6 @@ -81,60 +81,60 @@ struct StringKeyValuePair FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_key() { return GetPointer *>(VT_KEY); } - const flatbuffers::String *value() const { - return GetPointer(VT_VALUE); + const flatbuffers::Vector *value() const { + return GetPointer *>(VT_VALUE); } - flatbuffers::String *mutable_value() { - return GetPointer(VT_VALUE); + flatbuffers::Vector *mutable_value() { + return GetPointer *>(VT_VALUE); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyOffset(verifier, VT_KEY) && verifier.VerifyVector(key()) && VerifyOffset(verifier, VT_VALUE) && - verifier.VerifyString(value()) && + verifier.VerifyVector(value()) && verifier.EndTable(); } }; -struct StringKeyValuePairBuilder { +struct BytesKeyValuePairBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; void add_key(flatbuffers::Offset> key) { - fbb_.AddOffset(StringKeyValuePair::VT_KEY, key); + fbb_.AddOffset(BytesKeyValuePair::VT_KEY, key); } - void add_value(flatbuffers::Offset value) { - fbb_.AddOffset(StringKeyValuePair::VT_VALUE, value); + void add_value(flatbuffers::Offset> value) { + fbb_.AddOffset(BytesKeyValuePair::VT_VALUE, value); } - explicit StringKeyValuePairBuilder(flatbuffers::FlatBufferBuilder &_fbb) + explicit BytesKeyValuePairBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } - StringKeyValuePairBuilder &operator=(const StringKeyValuePairBuilder &); - flatbuffers::Offset Finish() { + BytesKeyValuePairBuilder &operator=(const BytesKeyValuePairBuilder &); + flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); + auto o = flatbuffers::Offset(end); return o; } }; -inline flatbuffers::Offset CreateStringKeyValuePair( +inline flatbuffers::Offset CreateBytesKeyValuePair( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset> key = 0, - flatbuffers::Offset value = 0) { - StringKeyValuePairBuilder builder_(_fbb); + flatbuffers::Offset> value = 0) { + BytesKeyValuePairBuilder builder_(_fbb); builder_.add_value(value); builder_.add_key(key); return builder_.Finish(); } -inline flatbuffers::Offset CreateStringKeyValuePairDirect( +inline flatbuffers::Offset CreateBytesKeyValuePairDirect( flatbuffers::FlatBufferBuilder &_fbb, const std::vector *key = nullptr, - const char *value = nullptr) { + const std::vector *value = nullptr) { auto key__ = key ? _fbb.CreateVector(*key) : 0; - auto value__ = value ? _fbb.CreateString(value) : 0; - return p2p::CreateStringKeyValuePair( + auto value__ = value ? _fbb.CreateVector(*value) : 0; + return p2p::CreateBytesKeyValuePair( _fbb, key__, value__); @@ -208,11 +208,11 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { return GetPointer(VT_MESSAGE); } template const T *message_as() const; - const Proposal *message_as_Proposal() const { - return message_type() == Message_Proposal ? static_cast(message()) : nullptr; + const Proposal_Message *message_as_Proposal_Message() const { + return message_type() == Message_Proposal_Message ? static_cast(message()) : nullptr; } - const Npl *message_as_Npl() const { - return message_type() == Message_Npl ? static_cast(message()) : nullptr; + const Npl_Message *message_as_Npl_Message() const { + return message_type() == Message_Npl_Message ? static_cast(message()) : nullptr; } void *mutable_message() { return GetPointer(VT_MESSAGE); @@ -226,12 +226,12 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } }; -template<> inline const Proposal *Content::message_as() const { - return message_as_Proposal(); +template<> inline const Proposal_Message *Content::message_as() const { + return message_as_Proposal_Message(); } -template<> inline const Npl *Content::message_as() const { - return message_as_Npl(); +template<> inline const Npl_Message *Content::message_as() const { + return message_as_Npl_Message(); } struct ContentBuilder { @@ -265,14 +265,14 @@ inline flatbuffers::Offset CreateContent( return builder_.Finish(); } -struct Proposal FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { +struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_PUBKEY = 4, VT_TIMESTAMP = 6, VT_STAGE = 8, VT_TIME = 10, VT_LCL = 12, - VT_CONNECTIONS = 14, + VT_USERS = 14, VT_RAW_INPUTS = 16, VT_HASH_INPUTS = 18, VT_RAW_OUTPUTS = 20, @@ -309,35 +309,35 @@ struct Proposal FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_lcl() { return GetPointer *>(VT_LCL); } - const flatbuffers::Vector> *connections() const { - return GetPointer> *>(VT_CONNECTIONS); + const flatbuffers::Vector> *users() const { + return GetPointer> *>(VT_USERS); } - flatbuffers::Vector> *mutable_connections() { - return GetPointer> *>(VT_CONNECTIONS); + flatbuffers::Vector> *mutable_users() { + return GetPointer> *>(VT_USERS); } - const flatbuffers::Vector> *raw_inputs() const { - return GetPointer> *>(VT_RAW_INPUTS); + const flatbuffers::Vector> *raw_inputs() const { + return GetPointer> *>(VT_RAW_INPUTS); } - flatbuffers::Vector> *mutable_raw_inputs() { - return GetPointer> *>(VT_RAW_INPUTS); + flatbuffers::Vector> *mutable_raw_inputs() { + return GetPointer> *>(VT_RAW_INPUTS); } - const flatbuffers::Vector> *hash_inputs() const { - return GetPointer> *>(VT_HASH_INPUTS); + const flatbuffers::Vector> *hash_inputs() const { + return GetPointer> *>(VT_HASH_INPUTS); } - flatbuffers::Vector> *mutable_hash_inputs() { - return GetPointer> *>(VT_HASH_INPUTS); + flatbuffers::Vector> *mutable_hash_inputs() { + return GetPointer> *>(VT_HASH_INPUTS); } - const flatbuffers::Vector> *raw_outputs() const { - return GetPointer> *>(VT_RAW_OUTPUTS); + const flatbuffers::Vector> *raw_outputs() const { + return GetPointer> *>(VT_RAW_OUTPUTS); } - flatbuffers::Vector> *mutable_raw_outputs() { - return GetPointer> *>(VT_RAW_OUTPUTS); + flatbuffers::Vector> *mutable_raw_outputs() { + return GetPointer> *>(VT_RAW_OUTPUTS); } - const flatbuffers::Vector> *hash_outputs() const { - return GetPointer> *>(VT_HASH_OUTPUTS); + const flatbuffers::Vector> *hash_outputs() const { + return GetPointer> *>(VT_HASH_OUTPUTS); } - flatbuffers::Vector> *mutable_hash_outputs() { - return GetPointer> *>(VT_HASH_OUTPUTS); + flatbuffers::Vector> *mutable_hash_outputs() { + return GetPointer> *>(VT_HASH_OUTPUTS); } const State *state() const { return GetPointer(VT_STATE); @@ -354,89 +354,89 @@ struct Proposal FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyField(verifier, VT_TIME) && VerifyOffset(verifier, VT_LCL) && verifier.VerifyVector(lcl()) && - VerifyOffset(verifier, VT_CONNECTIONS) && - verifier.VerifyVector(connections()) && - verifier.VerifyVectorOfTables(connections()) && + VerifyOffset(verifier, VT_USERS) && + verifier.VerifyVector(users()) && + verifier.VerifyVectorOfTables(users()) && VerifyOffset(verifier, VT_RAW_INPUTS) && verifier.VerifyVector(raw_inputs()) && verifier.VerifyVectorOfTables(raw_inputs()) && VerifyOffset(verifier, VT_HASH_INPUTS) && verifier.VerifyVector(hash_inputs()) && - verifier.VerifyVectorOfStrings(hash_inputs()) && + verifier.VerifyVectorOfTables(hash_inputs()) && VerifyOffset(verifier, VT_RAW_OUTPUTS) && verifier.VerifyVector(raw_outputs()) && verifier.VerifyVectorOfTables(raw_outputs()) && VerifyOffset(verifier, VT_HASH_OUTPUTS) && verifier.VerifyVector(hash_outputs()) && - verifier.VerifyVectorOfStrings(hash_outputs()) && + verifier.VerifyVectorOfTables(hash_outputs()) && VerifyOffset(verifier, VT_STATE) && verifier.VerifyTable(state()) && verifier.EndTable(); } }; -struct ProposalBuilder { +struct Proposal_MessageBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(Proposal::VT_PUBKEY, pubkey); + fbb_.AddOffset(Proposal_Message::VT_PUBKEY, pubkey); } void add_timestamp(uint64_t timestamp) { - fbb_.AddElement(Proposal::VT_TIMESTAMP, timestamp, 0); + fbb_.AddElement(Proposal_Message::VT_TIMESTAMP, timestamp, 0); } void add_stage(int8_t stage) { - fbb_.AddElement(Proposal::VT_STAGE, stage, 0); + fbb_.AddElement(Proposal_Message::VT_STAGE, stage, 0); } void add_time(uint64_t time) { - fbb_.AddElement(Proposal::VT_TIME, time, 0); + fbb_.AddElement(Proposal_Message::VT_TIME, time, 0); } void add_lcl(flatbuffers::Offset> lcl) { - fbb_.AddOffset(Proposal::VT_LCL, lcl); + fbb_.AddOffset(Proposal_Message::VT_LCL, lcl); } - void add_connections(flatbuffers::Offset>> connections) { - fbb_.AddOffset(Proposal::VT_CONNECTIONS, connections); + void add_users(flatbuffers::Offset>> users) { + fbb_.AddOffset(Proposal_Message::VT_USERS, users); } - void add_raw_inputs(flatbuffers::Offset>> raw_inputs) { - fbb_.AddOffset(Proposal::VT_RAW_INPUTS, raw_inputs); + void add_raw_inputs(flatbuffers::Offset>> raw_inputs) { + fbb_.AddOffset(Proposal_Message::VT_RAW_INPUTS, raw_inputs); } - void add_hash_inputs(flatbuffers::Offset>> hash_inputs) { - fbb_.AddOffset(Proposal::VT_HASH_INPUTS, hash_inputs); + void add_hash_inputs(flatbuffers::Offset>> hash_inputs) { + fbb_.AddOffset(Proposal_Message::VT_HASH_INPUTS, hash_inputs); } - void add_raw_outputs(flatbuffers::Offset>> raw_outputs) { - fbb_.AddOffset(Proposal::VT_RAW_OUTPUTS, raw_outputs); + void add_raw_outputs(flatbuffers::Offset>> raw_outputs) { + fbb_.AddOffset(Proposal_Message::VT_RAW_OUTPUTS, raw_outputs); } - void add_hash_outputs(flatbuffers::Offset>> hash_outputs) { - fbb_.AddOffset(Proposal::VT_HASH_OUTPUTS, hash_outputs); + void add_hash_outputs(flatbuffers::Offset>> hash_outputs) { + fbb_.AddOffset(Proposal_Message::VT_HASH_OUTPUTS, hash_outputs); } void add_state(flatbuffers::Offset state) { - fbb_.AddOffset(Proposal::VT_STATE, state); + fbb_.AddOffset(Proposal_Message::VT_STATE, state); } - explicit ProposalBuilder(flatbuffers::FlatBufferBuilder &_fbb) + explicit Proposal_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } - ProposalBuilder &operator=(const ProposalBuilder &); - flatbuffers::Offset Finish() { + Proposal_MessageBuilder &operator=(const Proposal_MessageBuilder &); + flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); + auto o = flatbuffers::Offset(end); return o; } }; -inline flatbuffers::Offset CreateProposal( +inline flatbuffers::Offset CreateProposal_Message( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset> pubkey = 0, uint64_t timestamp = 0, int8_t stage = 0, uint64_t time = 0, flatbuffers::Offset> lcl = 0, - flatbuffers::Offset>> connections = 0, - flatbuffers::Offset>> raw_inputs = 0, - flatbuffers::Offset>> hash_inputs = 0, - flatbuffers::Offset>> raw_outputs = 0, - flatbuffers::Offset>> hash_outputs = 0, + flatbuffers::Offset>> users = 0, + flatbuffers::Offset>> raw_inputs = 0, + flatbuffers::Offset>> hash_inputs = 0, + flatbuffers::Offset>> raw_outputs = 0, + flatbuffers::Offset>> hash_outputs = 0, flatbuffers::Offset state = 0) { - ProposalBuilder builder_(_fbb); + Proposal_MessageBuilder builder_(_fbb); builder_.add_time(time); builder_.add_timestamp(timestamp); builder_.add_state(state); @@ -444,41 +444,41 @@ inline flatbuffers::Offset CreateProposal( builder_.add_raw_outputs(raw_outputs); builder_.add_hash_inputs(hash_inputs); builder_.add_raw_inputs(raw_inputs); - builder_.add_connections(connections); + builder_.add_users(users); builder_.add_lcl(lcl); builder_.add_pubkey(pubkey); builder_.add_stage(stage); return builder_.Finish(); } -inline flatbuffers::Offset CreateProposalDirect( +inline flatbuffers::Offset CreateProposal_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, const std::vector *pubkey = nullptr, uint64_t timestamp = 0, int8_t stage = 0, uint64_t time = 0, const std::vector *lcl = nullptr, - const std::vector> *connections = nullptr, - const std::vector> *raw_inputs = nullptr, - const std::vector> *hash_inputs = nullptr, - const std::vector> *raw_outputs = nullptr, - const std::vector> *hash_outputs = nullptr, + const std::vector> *users = nullptr, + const std::vector> *raw_inputs = nullptr, + const std::vector> *hash_inputs = nullptr, + const std::vector> *raw_outputs = nullptr, + const std::vector> *hash_outputs = nullptr, flatbuffers::Offset state = 0) { auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; - auto connections__ = connections ? _fbb.CreateVector>(*connections) : 0; - auto raw_inputs__ = raw_inputs ? _fbb.CreateVector>(*raw_inputs) : 0; - auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; - auto raw_outputs__ = raw_outputs ? _fbb.CreateVector>(*raw_outputs) : 0; - auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; - return p2p::CreateProposal( + auto users__ = users ? _fbb.CreateVector>(*users) : 0; + auto raw_inputs__ = raw_inputs ? _fbb.CreateVector>(*raw_inputs) : 0; + auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; + auto raw_outputs__ = raw_outputs ? _fbb.CreateVector>(*raw_outputs) : 0; + auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; + return p2p::CreateProposal_Message( _fbb, pubkey__, timestamp, stage, time, lcl__, - connections__, + users__, raw_inputs__, hash_inputs__, raw_outputs__, @@ -486,7 +486,7 @@ inline flatbuffers::Offset CreateProposalDirect( state); } -struct Npl FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { +struct Npl_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_PUBKEY = 4, VT_TIMESTAMP = 6, @@ -530,40 +530,40 @@ struct Npl FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } }; -struct NplBuilder { +struct Npl_MessageBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; void add_pubkey(flatbuffers::Offset> pubkey) { - fbb_.AddOffset(Npl::VT_PUBKEY, pubkey); + fbb_.AddOffset(Npl_Message::VT_PUBKEY, pubkey); } void add_timestamp(uint64_t timestamp) { - fbb_.AddElement(Npl::VT_TIMESTAMP, timestamp, 0); + fbb_.AddElement(Npl_Message::VT_TIMESTAMP, timestamp, 0); } void add_data(flatbuffers::Offset> data) { - fbb_.AddOffset(Npl::VT_DATA, data); + fbb_.AddOffset(Npl_Message::VT_DATA, data); } void add_lcl(flatbuffers::Offset> lcl) { - fbb_.AddOffset(Npl::VT_LCL, lcl); + fbb_.AddOffset(Npl_Message::VT_LCL, lcl); } - explicit NplBuilder(flatbuffers::FlatBufferBuilder &_fbb) + explicit Npl_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } - NplBuilder &operator=(const NplBuilder &); - flatbuffers::Offset Finish() { + Npl_MessageBuilder &operator=(const Npl_MessageBuilder &); + flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); + auto o = flatbuffers::Offset(end); return o; } }; -inline flatbuffers::Offset CreateNpl( +inline flatbuffers::Offset CreateNpl_Message( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset> pubkey = 0, uint64_t timestamp = 0, flatbuffers::Offset> data = 0, flatbuffers::Offset> lcl = 0) { - NplBuilder builder_(_fbb); + Npl_MessageBuilder builder_(_fbb); builder_.add_timestamp(timestamp); builder_.add_lcl(lcl); builder_.add_data(data); @@ -571,7 +571,7 @@ inline flatbuffers::Offset CreateNpl( return builder_.Finish(); } -inline flatbuffers::Offset CreateNplDirect( +inline flatbuffers::Offset CreateNpl_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, const std::vector *pubkey = nullptr, uint64_t timestamp = 0, @@ -580,7 +580,7 @@ inline flatbuffers::Offset CreateNplDirect( auto pubkey__ = pubkey ? _fbb.CreateVector(*pubkey) : 0; auto data__ = data ? _fbb.CreateVector(*data) : 0; auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; - return p2p::CreateNpl( + return p2p::CreateNpl_Message( _fbb, pubkey__, timestamp, @@ -594,23 +594,23 @@ struct StateDifference FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_UPDATED = 6, VT_DELETED = 8 }; - const flatbuffers::Vector> *created() const { - return GetPointer> *>(VT_CREATED); + const flatbuffers::Vector> *created() const { + return GetPointer> *>(VT_CREATED); } - flatbuffers::Vector> *mutable_created() { - return GetPointer> *>(VT_CREATED); + flatbuffers::Vector> *mutable_created() { + return GetPointer> *>(VT_CREATED); } - const flatbuffers::Vector> *updated() const { - return GetPointer> *>(VT_UPDATED); + const flatbuffers::Vector> *updated() const { + return GetPointer> *>(VT_UPDATED); } - flatbuffers::Vector> *mutable_updated() { - return GetPointer> *>(VT_UPDATED); + flatbuffers::Vector> *mutable_updated() { + return GetPointer> *>(VT_UPDATED); } - const flatbuffers::Vector> *deleted() const { - return GetPointer> *>(VT_DELETED); + const flatbuffers::Vector> *deleted() const { + return GetPointer> *>(VT_DELETED); } - flatbuffers::Vector> *mutable_deleted() { - return GetPointer> *>(VT_DELETED); + flatbuffers::Vector> *mutable_deleted() { + return GetPointer> *>(VT_DELETED); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && @@ -630,13 +630,13 @@ struct StateDifference FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { struct StateDifferenceBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_created(flatbuffers::Offset>> created) { + void add_created(flatbuffers::Offset>> created) { fbb_.AddOffset(StateDifference::VT_CREATED, created); } - void add_updated(flatbuffers::Offset>> updated) { + void add_updated(flatbuffers::Offset>> updated) { fbb_.AddOffset(StateDifference::VT_UPDATED, updated); } - void add_deleted(flatbuffers::Offset>> deleted) { + void add_deleted(flatbuffers::Offset>> deleted) { fbb_.AddOffset(StateDifference::VT_DELETED, deleted); } explicit StateDifferenceBuilder(flatbuffers::FlatBufferBuilder &_fbb) @@ -653,9 +653,9 @@ struct StateDifferenceBuilder { inline flatbuffers::Offset CreateStateDifference( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset>> created = 0, - flatbuffers::Offset>> updated = 0, - flatbuffers::Offset>> deleted = 0) { + flatbuffers::Offset>> created = 0, + flatbuffers::Offset>> updated = 0, + flatbuffers::Offset>> deleted = 0) { StateDifferenceBuilder builder_(_fbb); builder_.add_deleted(deleted); builder_.add_updated(updated); @@ -665,12 +665,12 @@ inline flatbuffers::Offset CreateStateDifference( inline flatbuffers::Offset CreateStateDifferenceDirect( flatbuffers::FlatBufferBuilder &_fbb, - const std::vector> *created = nullptr, - const std::vector> *updated = nullptr, - const std::vector> *deleted = nullptr) { - auto created__ = created ? _fbb.CreateVector>(*created) : 0; - auto updated__ = updated ? _fbb.CreateVector>(*updated) : 0; - auto deleted__ = deleted ? _fbb.CreateVector>(*deleted) : 0; + const std::vector> *created = nullptr, + const std::vector> *updated = nullptr, + const std::vector> *deleted = nullptr) { + auto created__ = created ? _fbb.CreateVector>(*created) : 0; + auto updated__ = updated ? _fbb.CreateVector>(*updated) : 0; + auto deleted__ = deleted ? _fbb.CreateVector>(*deleted) : 0; return p2p::CreateStateDifference( _fbb, created__, @@ -703,11 +703,11 @@ struct State FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { StateDifference *mutable_difference() { return GetPointer(VT_DIFFERENCE); } - const flatbuffers::Vector> *patch() const { - return GetPointer> *>(VT_PATCH); + const flatbuffers::Vector> *patch() const { + return GetPointer> *>(VT_PATCH); } - flatbuffers::Vector> *mutable_patch() { - return GetPointer> *>(VT_PATCH); + flatbuffers::Vector> *mutable_patch() { + return GetPointer> *>(VT_PATCH); } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && @@ -736,7 +736,7 @@ struct StateBuilder { void add_difference(flatbuffers::Offset difference) { fbb_.AddOffset(State::VT_DIFFERENCE, difference); } - void add_patch(flatbuffers::Offset>> patch) { + void add_patch(flatbuffers::Offset>> patch) { fbb_.AddOffset(State::VT_PATCH, patch); } explicit StateBuilder(flatbuffers::FlatBufferBuilder &_fbb) @@ -756,7 +756,7 @@ inline flatbuffers::Offset CreateState( flatbuffers::Offset> previous = 0, flatbuffers::Offset> current = 0, flatbuffers::Offset difference = 0, - flatbuffers::Offset>> patch = 0) { + flatbuffers::Offset>> patch = 0) { StateBuilder builder_(_fbb); builder_.add_patch(patch); builder_.add_difference(difference); @@ -770,10 +770,10 @@ inline flatbuffers::Offset CreateStateDirect( const std::vector *previous = nullptr, const std::vector *current = nullptr, flatbuffers::Offset difference = 0, - const std::vector> *patch = nullptr) { + const std::vector> *patch = nullptr) { auto previous__ = previous ? _fbb.CreateVector(*previous) : 0; auto current__ = current ? _fbb.CreateVector(*current) : 0; - auto patch__ = patch ? _fbb.CreateVector>(*patch) : 0; + auto patch__ = patch ? _fbb.CreateVector>(*patch) : 0; return p2p::CreateState( _fbb, previous__, @@ -787,12 +787,12 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess case Message_NONE: { return true; } - case Message_Proposal: { - auto ptr = reinterpret_cast(obj); + case Message_Proposal_Message: { + auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } - case Message_Npl: { - auto ptr = reinterpret_cast(obj); + case Message_Npl_Message: { + auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } default: return false; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 49572f56..9d766ec6 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -11,10 +11,17 @@ namespace ssl = boost::asio::ssl; // from namespace p2p { + +/** + * Holds all the messages until they are processed by consensus. + */ +message_collection collected_msgs; + /** * Peer connections exposing to the application */ std::unordered_map *> peer_connections; +std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. /** * Peer session handler instance. This instance's methods will be fired for any peer socket activity. @@ -47,7 +54,7 @@ std::thread peer_thread; */ sock::session_options sess_opts; -std::map recent_peer_msghash; +std::map recent_peer_msghash; int init() { @@ -103,72 +110,4 @@ void peer_connection_watchdog() } } -/** - * Validate the incoming p2p message. Check for message version, timestamp and signature. - * - * @param message binary message content. - * @param signature binary message signature. - * @param pubkey binary public key of message originating node. - * @param timestamp message timestamp. - * @param version message timestamp. - * @return whether message is validated or not. - */ -bool validate_peer_message(std::string_view message, std::string_view signature, std::string_view pubkey, time_t timestamp, uint16_t version) -{ - //Validation are prioritzed base on expensiveness of validation. - //i.e - signature validation is done at the end. - - std::time_t time_now = std::time(nullptr); - - //check protocol version of message whether it is greater than minimum supported protocol version. - if (version < util::MIN_PEERMSG_VERSION) - { - LOG_DBG << "Recieved message is from unsupported version"; - return false; - } - - // validate if the message is not from a node of current node's unl list. - if (!conf::cfg.unl.count(pubkey.data())) - { - LOG_DBG << "pubkey verification failed"; - return false; - } - - //check message timestamp. < timestamp now - 4* round time. - /*todo:this might change to check only current stage related. (Base on how consensus algorithm implementation take shape) - check message stage is for valid stage(node's current consensus stage - 1) - */ - if (timestamp < (time_now - conf::cfg.roundtime * 4)) - { - LOG_DBG << "Recieved message from peer is old"; - return false; - } - - //verify message signature. - //this should be the last validation since this is bit expensive - auto signature_verified = crypto::verify(message, signature, pubkey); - - if (signature_verified != 0) - { - LOG_DBG << "Signature verification failed"; - return false; - } - - // After signature is verified, get message hash and see wheteher - // message is already recieved -> abandon if duplicate. - auto messageHash = crypto::sha_512_hash(message, "PEERMSG", 7); - - if (recent_peer_msghash.count(messageHash) == 0) - { - recent_peer_msghash.try_emplace(std::move(messageHash), timestamp); - } - else - { - LOG_DBG << "Duplicate message"; - return false; - } - - return true; -} - } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 634ccc50..1b1f51ed 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -1,21 +1,51 @@ #ifndef _HP_P2P_H_ #define _HP_P2P_H_ +#include #include +#include +#include #include "../sock/socket_session.hpp" #include "peer_session_handler.hpp" namespace p2p { + +struct proposal +{ + std::string pubkey; + int64_t timestamp; + int64_t time; + int8_t stage; + std::string lcl; + std::unordered_set users; + std::unordered_map raw_inputs; + std::unordered_set hash_inputs; + std::unordered_map raw_outputs; + std::unordered_set hash_outputs; +}; + +struct message_collection +{ + std::list proposals; + std::mutex proposals_mutex; // Mutex for proposals access race conditions. +}; + +/** + * Holds all the messages until they are processed by consensus. + */ +extern message_collection collected_msgs; + /** * This is used to store active peer connections mapped by the unique key of socket session */ -extern std::unordered_map< std::string, sock::socket_session *> peer_connections; +extern std::unordered_map *> peer_connections; +extern std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. /** * This is used to store hash of recent peer messages: messagehash -> timestamp of message */ -extern std::map recent_peer_msghash; +extern std::map recent_peer_msghash; int init(); @@ -24,8 +54,6 @@ void start_peer_connections(); void peer_connection_watchdog(); -bool validate_peer_message(std::string_view message, std::string_view signature, std::string_view pubkey, time_t timestamp, uint16_t version); - } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_message_handler.cpp b/src/p2p/peer_message_handler.cpp new file mode 100644 index 00000000..eaf1bc31 --- /dev/null +++ b/src/p2p/peer_message_handler.cpp @@ -0,0 +1,346 @@ +#include +#include +#include "../conf.hpp" +#include "../crypto.hpp" +#include "../util.hpp" +#include "../hplog.hpp" +#include "peer_message_handler.hpp" +#include "message_content_generated.h" +#include "message_container_generated.h" + +namespace p2p +{ + +/** + * This section contains Flatbuffer message reading/writing helpers. + * These helpers are mainly used by peer_session_handler. + * + * All Flatbuffer peer messages are 'Container' messages. 'Container' message is a bucket + * which some common headers (version, singature etc..) and the message 'Content' (Proposal, NPL etc..). + * + * Therefore, when constructing peer messages, we have to first construct 'Content' message and then + * place the 'Content' inside a 'Conatiner. 'Content' and 'Container' messages are constructed using + * Flatbuffer builders. + * + * Reading is also 2 steps because of this. We have first interprit the 'Container' message from the + * received data and then interprit the 'Content' portion of it separately to read the actual content. + */ + +//---Message validation and reading helpers---/ + +/** + * Verifies Conatiner message structure and outputs faltbuffer Container pointer to access the given buffer. + * + * @param container_ref A pointer reference to assign the pointer to the Container object. + * @param container_bud The buffer containing the data that should validated and interpreted + * via the container pointer. + * @return 0 on successful verification. -1 for failure. + */ +int validate_and_extract_container(const Container **container_ref, std::string_view container_buf) +{ + //Accessing message buffer + const uint8_t *container_buf_ptr = reinterpret_cast(container_buf.data()); + size_t container_buf_size = container_buf.length(); + + //Defining Flatbuffer verifier (default max depth = 64, max_tables = 1000000,) + flatbuffers::Verifier container_verifier(container_buf_ptr, container_buf_size); + + //Verify container message using flatbuffer verifier + if (!VerifyContainerBuffer(container_verifier)) + { + LOG_DBG << "Flatbuffer verify: Bad container."; + return -1; + } + + //Get message container + const Container *container = GetContainer(container_buf_ptr); + + //check protocol version of message whether it is greater than minimum supported protocol version. + const uint16_t version = container->version(); + if (version < util::MIN_PEERMSG_VERSION) + { + LOG_DBG << "Recieved message is from unsupported protocol version (" << version << ")"; + return -1; + } + + //Assign container and content out params. + *container_ref = container; + return 0; +} + +/** + * Verifies the Content message structure and outputs faltbuffer Content pointer to access the given buffer. + * + * @param content_ref A pointer reference to assign the pointer to the Content object. + * @param content_ptr Pointer to the the buffer containing the data that should validated and interpreted + * via the container pointer. + * @param content_size Data buffer size. + * @return 0 on successful verification. -1 for failure. + */ +int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size) +{ + //Defining Flatbuffer verifier for message content verification. + //Since content is also serialised by using Flatbuffer we can verify it using Flatbuffer. + flatbuffers::Verifier content_verifier(content_ptr, content_size); + + //verify content message using flatbuffer verifier. + if (!VerifyContainerBuffer(content_verifier)) + { + LOG_DBG << "Flatbuffer verify: Bad content."; + return -1; + } + + *content_ref = GetContent(content_ptr); + return 0; +} + +/** + * Validate the incoming p2p message content on several criteria. + * + * @param message Message content data buffer. + * @param signature Binary message signature. + * @param pubkey Binary public key of message originating node. + * @param timestamp Message timestamp. + * @param version Message protocol version. + * @return 0 on successful validation. -1 for failure. + */ +int validate_content_message(std::string_view message, std::string_view signature, std::string_view pubkey, int64_t timestamp) +{ + //Validation are prioritzed base on expensiveness of validation. + //i.e - signature validation is done at the end. + + int64_t time_now = util::get_epoch_milliseconds(); + + // validate if the message is not from a node of current node's unl list. + if (!conf::cfg.unl.count(pubkey.data())) + { + LOG_DBG << "pubkey verification failed"; + return -1; + } + + //check message timestamp. < timestamp now - 4* round time. + /*todo:this might change to check only current stage related. (Base on how consensus algorithm implementation take shape) + check message stage is for valid stage(node's current consensus stage - 1) + */ + if (timestamp < (time_now - conf::cfg.roundtime * 4)) + { + LOG_DBG << "Recieved message from peer is old"; + return -1; + } + + //verify message signature. + //this should be the last validation since this is bit expensive + auto signature_verified = crypto::verify(message, signature, pubkey); + + if (signature_verified != 0) + { + LOG_DBG << "Signature verification failed"; + return -1; + } + + // After signature is verified, get message hash and see wheteher + // message is already recieved -> abandon if duplicate. + // auto messageHash = crypto::sha_512_hash(message, "PEERMSG", 7); + + // if (recent_peer_msghash.count(messageHash) == 0) + // { + // recent_peer_msghash.try_emplace(std::move(messageHash), timestamp); + // } + // else + // { + // LOG_DBG << "Duplicate message"; + // return -1; + // } + + return 0; +} + +/** + * Creates a proposal stuct from the given proposal message. + * @param The Flatbuffer poporal received from the peer. + * @return A proposal struct representing the message. + */ +const proposal create_proposal_from_msg(const Proposal_Message &msg) +{ + proposal p; + + if (msg.pubkey()) + p.pubkey = flatbuff_bytes_to_sv(msg.pubkey()); + + p.time = msg.time(); + p.timestamp = msg.timestamp(); + p.stage = msg.stage(); + + if (msg.lcl()) + p.lcl = flatbuff_bytes_to_sv(msg.lcl()); + + if (msg.users()) + p.users = flatbuf_bytearrayvector_to_stringlist(msg.users()); + + if (msg.raw_inputs()) + p.raw_inputs = flatbuf_pairvector_to_stringmap(msg.raw_inputs()); + + if (msg.hash_inputs()) + p.hash_inputs = flatbuf_bytearrayvector_to_stringlist(msg.hash_inputs()); + + if (msg.raw_outputs()) + p.raw_outputs = flatbuf_pairvector_to_stringmap(msg.raw_outputs()); + + if (msg.hash_outputs()) + p.hash_outputs = flatbuf_bytearrayvector_to_stringlist(msg.hash_outputs()); + + return p; +} + +//---Message creation helpers---// + +/** + * Ctreat proposal peer message from the given proposal struct. + * @param container_builder Flatbuffer builder for the container message. + * @param p The proposal struct to be placed in the container message. + */ +void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const proposal &p) +{ + // todo:get a average propsal message size and allocate content builder based on that. + flatbuffers::FlatBufferBuilder builder(1024); + + // Create dummy propsal message + flatbuffers::Offset proposal = + CreateProposal_Message( + builder, + sv_to_flatbuff_bytes(builder, conf::cfg.pubkey), + p.timestamp, + p.stage, + p.time, + sv_to_flatbuff_bytes(builder, p.lcl), + stringlist_to_flatbuf_bytearrayvector(builder, p.users), + stringmap_to_flatbuf_bytepairvector(builder, p.raw_inputs), + stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), + stringmap_to_flatbuf_bytepairvector(builder, p.raw_outputs), + stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)); + + flatbuffers::Offset message = CreateContent(builder, Message_Proposal_Message, proposal.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to sign it and place it inside a container message. + create_containermsg_from_content(container_builder, builder); +} + +/** + * Creates a Flatbuffer container message from the given Content message. + * @param container_builder The Flatbuffer builder to which the final container message should be written to. + * @param content_builder The Flatbuffer builder containing the content message that should be placed + * inside the container message. + */ +void create_containermsg_from_content( + flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder) +{ + uint8_t *content_buf = content_builder.GetBufferPointer(); + flatbuffers::uoffset_t content_size = content_builder.GetSize(); + + // Create container message content from serialised content from previous step. + flatbuffers::Offset> content = container_builder.CreateVector(content_buf, content_size); + + // Sign message content with this node's private key. + std::string_view content_to_sign(reinterpret_cast(content_buf), content_size); + std::string sig = crypto::sign(content_to_sign, conf::cfg.seckey); + + flatbuffers::Offset container_message = CreateContainer( + container_builder, + util::PEERMSG_VERSION, + sv_to_flatbuff_bytes(container_builder, sig), //signature field + content); + + // Finish building message container to get serialised message. + container_builder.Finish(container_message); +} + +//---Conversion helpers from flatbuffers data types to std data types---// + +/** + * Returns string_view from flat buffer data pointer and length. + */ +std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length) +{ + const char *signature_content_str = reinterpret_cast(data); + return std::string_view(signature_content_str, length); +} + +/** + * Returns return string_view from Flat Buffer vector of bytes. + */ +std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer) +{ + return flatbuff_bytes_to_sv(buffer->Data(), buffer->size()); +} + +/** + * Returns set from Flatbuffer vector of ByteArrays. + */ +const std::unordered_set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec) +{ + std::unordered_set set; + set.reserve(fbvec->size()); + for (auto el : *fbvec) + set.emplace(std::string(flatbuff_bytes_to_sv(el->array()))); + return set; +} + +/** + * Returns a map from Flatbuffer vector of key value pairs. + */ +const std::unordered_map +flatbuf_pairvector_to_stringmap(const flatbuffers::Vector> *fbvec) +{ + std::unordered_map map; + map.reserve(fbvec->size()); + for (auto el : *fbvec) + map.emplace(flatbuff_bytes_to_sv(el->key()), flatbuff_bytes_to_sv(el->value())); + return map; +} + +//---Conversion helpers from std data types to flatbuffers data types---// +//---These are used in constructing Flatbuffer messages using builders---// + +/** + * Returns Flatbuffer bytes vector from string_view. + */ +const flatbuffers::Offset> +sv_to_flatbuff_bytes(flatbuffers::FlatBufferBuilder &builder, std::string_view sv) +{ + return builder.CreateVector(reinterpret_cast(sv.data()), sv.size()); +} + +/** + * Returns Flatbuffer vector of ByteArrays from given set of strings. + */ +const flatbuffers::Offset>> +stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_set &set) +{ + std::vector> fbvec; + fbvec.reserve(set.size()); + for (std::string_view str : set) + fbvec.push_back(CreateByteArray(builder, sv_to_flatbuff_bytes(builder, str))); + return builder.CreateVector(fbvec); +} + +/** + * Returns Flatbuffer vector of key value pairs from given map. + */ +const flatbuffers::Offset>> +stringmap_to_flatbuf_bytepairvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map) +{ + std::vector> fbvec; + fbvec.reserve(map.size()); + for (auto const &[key, value] : map) + { + fbvec.push_back(CreateBytesKeyValuePair( + builder, + sv_to_flatbuff_bytes(builder, key), + sv_to_flatbuff_bytes(builder, value))); + } + return builder.CreateVector(fbvec); +} + +} // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_message_handler.hpp b/src/p2p/peer_message_handler.hpp new file mode 100644 index 00000000..848795a4 --- /dev/null +++ b/src/p2p/peer_message_handler.hpp @@ -0,0 +1,56 @@ +#ifndef _HP_PEER_MESSAGE_HANDLER_H_ +#define _HP_PEER_MESSAGE_HANDLER_H_ + +#include +#include +#include "message_content_generated.h" +#include "message_container_generated.h" +#include "p2p.hpp" + +namespace p2p +{ +/** + * This section contains Flatbuffer message reading/writing helpers. + */ + +//---Message validation and reading helpers---/ + +int validate_and_extract_container(const Container **container_ref, std::string_view container_buf); + +int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size); + +int validate_content_message(std::string_view message, std::string_view signature, std::string_view pubkey, int64_t timestamp); + +const proposal create_proposal_from_msg(const Proposal_Message &msg); + +//---Message creation helpers---// + +void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const proposal &p); + +void create_containermsg_from_content( + flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder); + +//---Conversion helpers from flatbuffers data types to std data types---// + +std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length); + +std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer); + +const std::unordered_set flatbuf_bytearrayvector_to_stringlist(const flatbuffers::Vector> *fbvec); + +const std::unordered_map flatbuf_pairvector_to_stringmap(const flatbuffers::Vector> *fbvec); + +//---Conversion helpers from std data types to flatbuffers data types---// + +const flatbuffers::Offset> +sv_to_flatbuff_bytes(flatbuffers::FlatBufferBuilder &builder, std::string_view sv); + +const flatbuffers::Offset>> +stringlist_to_flatbuf_bytearrayvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_set &set); + +const flatbuffers::Offset>> +stringmap_to_flatbuf_bytepairvector(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map &map); + +} // namespace p2p + +#endif \ No newline at end of file diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index da785b5c..0b09d8d9 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -6,6 +6,7 @@ #include "../hplog.hpp" #include "p2p.hpp" #include "peer_session_handler.hpp" +#include "peer_message_handler.hpp" #include "message_content_generated.h" #include "message_container_generated.h" @@ -32,70 +33,6 @@ std::string_view peer_outbound_message::buffer() (*fbbuilder_ptr).GetSize()); } -//private method used to create a proposal message with dummy data. -//Will be similiar to consensus proposal creation in each stage. -const std::string create_message(flatbuffers::FlatBufferBuilder &container_builder) -{ - //todo:get a average propsal message size and allocate builder based on that. - /* - * todo: create custom vector allocator for protobuff in order to avoid copying buffer to string. - * includes overidding socket_session send method to support this as well. - */ - flatbuffers::FlatBufferBuilder builder(1024); - std::time_t timestamp = std::time(nullptr); - uint8_t stage = 0; - - std::string pubkey = conf::cfg.pubkey; - flatbuffers::Offset> pubkey_b = builder.CreateVector((uint8_t *)pubkey.data(), pubkey.size()); - - //create dummy propsal message - flatbuffers::Offset proposal = CreateProposal(builder, pubkey_b, timestamp, stage, timestamp); - flatbuffers::Offset message = CreateContent(builder, Message_Proposal, proposal.Union()); - builder.Finish(message); //finished building message content to get serialised content. - - //Get serialized/packed message content pointer and size. - uint8_t *buf = builder.GetBufferPointer(); - flatbuffers::uoffset_t size = builder.GetSize(); - - //Get a binary string_view for the serialised message content. - const char *content_str = reinterpret_cast(buf); - std::string_view message_content(content_str, size); - - //create container message content from serialised content from previous step. - flatbuffers::Offset> content = container_builder.CreateVector(buf, size); - - //Sign message content with node's private key. - std::string sig = crypto::sign(message_content, conf::cfg.seckey); - char *sig_buf = sig.data(); - flatbuffers::Offset> signature = container_builder.CreateVector((uint8_t *)sig_buf, sig.size()); //include signature to message - - flatbuffers::Offset container_message = CreateContainer(container_builder, util::MIN_PEERMSG_VERSION, signature, content); - container_builder.Finish(container_message); //finished building message container to get serialised message. - - flatbuffers::uoffset_t buf_size = container_builder.GetSize(); - uint8_t *message_buf = container_builder.GetBufferPointer(); - - //todo: should return buffer_pointer to socket. - return std::string((char *)message_buf, buf_size); -} - -/** - * Private method to return string_view from flat buffer data pointer and length. - */ -std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length) -{ - const char *signature_content_str = reinterpret_cast(data); - return std::string_view(signature_content_str, length); -} - -/** - * Private method to return string_view from Flat Buffer vector of bytes. - */ -std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector *buffer) -{ - return flatbuff_bytes_to_sv(buffer->Data(), buffer->size()); -} - /** * This gets hit every time a peer connects to HP via the peer port (configured in contract config). */ @@ -105,15 +42,20 @@ void peer_session_handler::on_connect(sock::socket_sessioninit_uniqueid(); - peer_connections.insert(std::make_pair(session->uniqueid, session)); + { + std::lock_guard lock(p2p::peer_connections_mutex); + peer_connections.insert(std::make_pair(session->uniqueid, session)); + } LOG_DBG << "Adding peer to list: " << session->uniqueid; } else { // todo: set container builder defualt builder size to combination of serialized content length + signature length(which is fixed) - peer_outbound_message msg(std::make_shared(1024)); - std::string message = create_message(msg.builder()); - session->send(msg); + // peer_outbound_message msg(std::make_shared(1024)); + + // proposal p; + // create_msg_from_proposal(msg.builder(), p); + // session->send(msg); } } @@ -121,93 +63,66 @@ void peer_session_handler::on_connect(sock::socket_session *session, std::string_view message) { - //Accessing message buffer - const uint8_t *container_pointer = reinterpret_cast(message.data()); - size_t container_length = message.length(); + const Container *container; + if (validate_and_extract_container(&container, message) != 0) + return; - //Defining Flatbuffer verifier (default max depth = 64, max_tables = 1000000,) - flatbuffers::Verifier container_verifier(container_pointer, container_length); + //Get serialised message content. + const flatbuffers::Vector *container_content = container->content(); - //Verify container message using flatbuffer verifier - if (VerifyContainerBuffer(container_verifier)) + //Accessing message content and size. + const uint8_t *content_ptr = container_content->Data(); + flatbuffers::uoffset_t content_size = container_content->size(); + + const Content *content; + if (validate_and_extract_content(&content, content_ptr, content_size) != 0) + return; + + p2p::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc + + if (content_message_type == Message_Proposal_Message) //message is a proposal message { - //Get message container - const p2p::Container *container = GetContainer(container_pointer); - const uint16_t version = container->version(); + const Proposal_Message *proposalmsg = content->message_as_Proposal_Message(); + + //validate message for malleability, timeliness, signature and prune recieving messages. + bool val_result = validate_content_message( + flatbuff_bytes_to_sv(content_ptr, content_size), + flatbuff_bytes_to_sv(container->signature()), + flatbuff_bytes_to_sv(proposalmsg->pubkey()), + proposalmsg->timestamp()); - //Get serialised message content. - const flatbuffers::Vector *container_content = container->content(); - - //Accessing message content and size. - const uint8_t *content_pointer = container_content->Data(); - flatbuffers::uoffset_t content_size = container_content->size(); - - //Defining Flatbuffer verifier for content message verification. - //Since content is also serialised by using Filterbuf we can verify it using Filterbuffer. - flatbuffers::Verifier content_verifier(content_pointer, content_size); - - //verify content message conent using flatbuffer verifier. - if (VerifyContainerBuffer(content_verifier)) + if (val_result == 0) { - //Get message content. - const Content *content = GetContent(content_pointer); - p2p::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc - - if (content_message_type == Message_Proposal) //message is a proposal message - { - const Proposal *proposal = content->message_as_Proposal(); - uint64_t timestamp = proposal->timestamp(); - - //Get public key of message originating node. - std::string_view message_pubkey = flatbuff_bytes_to_sv(proposal->pubkey()); - - //Get signature from container message. - std::string_view message_signature = flatbuff_bytes_to_sv(container->signature()); - - std::string_view message_content = flatbuff_bytes_to_sv(content_pointer, content_size); - - //validate message for malleability, timeliness, signature and prune recieving messages. - bool validated = p2p::validate_peer_message(message_content, message_signature, message_pubkey, timestamp, version); - if (validated) - { - //if validated send message to consensus. - //if validated broadcast message. - } - else - { - LOG_DBG << "Message validation failed"; - } - } - else if (content_message_type == Message_Npl) //message is a proposal message - { - const Npl *npl = content->message_as_Npl(); - // execute npl logic here. - //broadcast message. - } - else - { - //warn received invalid message from peer. - LOG_DBG << "Received invalid message type from peer"; - //remove/penalize node who sent the message. - } + std::lock_guard lock(collected_msgs.proposals_mutex); + collected_msgs.proposals.push_back(create_proposal_from_msg(*proposalmsg)); } else { - //warn bad message content. - LOG_DBG << "Bad message content"; + LOG_DBG << "Message content field validation failed"; } } + else if (content_message_type == Message_Npl_Message) //message is a NPL message + { + const Npl_Message *npl = content->message_as_Npl_Message(); + // execute npl logic here. + //broadcast message. + } else { - //warn bad messages from peer. - LOG_DBG << "Bad message from peer"; + //warn received invalid message from peer. + LOG_DBG << "Received invalid message type from peer"; + //TODO: remove/penalize node who sent the message. } } //peer session on message callback method void peer_session_handler::on_close(sock::socket_session *session) { - LOG_DBG << "on_closing peer :" << session->uniqueid; + { + std::lock_guard lock(p2p::peer_connections_mutex); + peer_connections.erase(session->uniqueid); + } + LOG_DBG << "Peer disonnected: " << session->uniqueid; } } // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp index c0f306e4..36e76c9f 100644 --- a/src/p2p/peer_session_handler.hpp +++ b/src/p2p/peer_session_handler.hpp @@ -1,5 +1,5 @@ -#ifndef _HP_P2P_SESSION_H_ -#define _HP_P2P_SESSION_H_ +#ifndef _HP_PEER_SESSION_HANDLER_H_ +#define _HP_PEER_SESSION_HANDLER_H_ #include #include diff --git a/src/proc.hpp b/src/proc.hpp index d97f9d6d..d1a6644d 100644 --- a/src/proc.hpp +++ b/src/proc.hpp @@ -40,10 +40,10 @@ struct ContractExecArgs std::pair &hpscbufs; // Current HotPocket timestamp. - uint64_t timestamp; + int64_t timestamp; ContractExecArgs( - uint64_t _timestamp, + int64_t _timestamp, contract_bufmap &_userbufs, contract_bufmap &_nplbufs, std::pair &_hpscbufs) : diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 72fc3b2d..908b4d56 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -27,7 +27,7 @@ user_outbound_message::user_outbound_message(std::string &&_msg) // Returns the buffer that should be written to the socket. std::string_view user_outbound_message::buffer() { - return std::string_view(msg.data(), msg.size()); + return msg; } /** diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index e6691bc4..aaef8f81 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -24,6 +24,7 @@ ssl::context ctx{ssl::context::tlsv13}; * Map key: User socket session id () */ std::unordered_map users; +std::mutex users_mutex; // Mutex for users access race conditions. /** * Holds set of connected user session ids and public keys for lookups. @@ -219,7 +220,10 @@ int add_user(sock::socket_session *session, const std::st return -1; } - users.emplace(sessionid, usr::connected_user(session, pubkey)); + { + std::lock_guard lock(users_mutex); + users.emplace(sessionid, usr::connected_user(session, pubkey)); + } // Populate sessionid map so we can lookup by user pubkey. sessionids[pubkey] = sessionid; @@ -246,7 +250,11 @@ int remove_user(const std::string &sessionid) usr::connected_user &user = itr->second; - sessionids.erase(user.pubkey); + { + std::lock_guard lock(users_mutex); + sessionids.erase(user.pubkey); + } + users.erase(itr); return 0; } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index a6f092d2..e63e7402 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "../util.hpp" #include "../sock/socket_session.hpp" #include "user_session_handler.hpp" @@ -45,6 +46,7 @@ struct connected_user * Map key: User socket session id () */ extern std::unordered_map users; +extern std::mutex users_mutex; // Mutex for users access race conditions. /** * Keep track of verification-pending challenges issued to newly connected users. diff --git a/src/util.cpp b/src/util.cpp index eba145dd..a43b7c74 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include namespace util @@ -52,6 +53,16 @@ int hex2bin(unsigned char *decodedbuf, size_t decodedbuf_len, std::string_view h return 0; } +/** + * Returns current time in UNIX epoch milliseconds. + */ +int64_t get_epoch_milliseconds() +{ + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + /** * Compare two version strings in the format of "1.12.3". * v1 < v2 -> returns -1 diff --git a/src/util.hpp b/src/util.hpp index d6d90164..0bf87b51 100644 --- a/src/util.hpp +++ b/src/util.hpp @@ -14,6 +14,9 @@ namespace util // Hot Pocket version. Displayed on 'hotpocket version' and written to new contract configs. static const char *HP_VERSION = "0.1"; +// Current version of the peer message protocol. +static const int PEERMSG_VERSION = 1; + // Minimum compatible contract config version (this will be used to validate contract configs) static const char *MIN_CONTRACT_VERSION = "0.1"; @@ -37,6 +40,8 @@ int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_le int hex2bin(unsigned char *decoded, size_t decoded_len, std::string_view hex_str); +int64_t get_epoch_milliseconds(); + int version_compare(const std::string &x, const std::string &y); std::string_view getsv(const rapidjson::Value &v);