From ba0cae019d3bad70e2dfedacb5e4f48a1d6d56fe Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Sat, 7 Nov 2020 15:01:01 +0530 Subject: [PATCH] Refactored consensus into 3 rounds. (#144) * Refactored consensus into 3 stages and removed stage 0. * Consensus threshold calculation improvements. * Refactored candidate user input processing. * Renamed proposal sent timestamp field. * Introduced comm_session display name. --- src/comm/comm_session.cpp | 35 +- src/comm/comm_session.hpp | 1 + src/consensus.cpp | 589 +++++++++++++-------------- src/consensus.hpp | 39 +- src/ledger.cpp | 1 + src/msg/fbuf/p2pmsg_content.fbs | 4 +- src/msg/fbuf/p2pmsg_helpers.cpp | 8 +- src/msg/fbuf/p2pmsg_helpers.hpp | 4 +- src/p2p/p2p.cpp | 2 +- src/p2p/p2p.hpp | 8 +- src/p2p/peer_session_handler.cpp | 18 +- src/pchheader.hpp | 1 + src/usr/read_req.cpp | 24 +- src/usr/user_session_handler.cpp | 10 +- src/usr/usr.cpp | 318 +++++++++------ src/usr/usr.hpp | 18 +- test/local-cluster/cluster-create.sh | 4 +- 17 files changed, 568 insertions(+), 516 deletions(-) diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index d335016b..e315d243 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -228,7 +228,34 @@ namespace comm reader_thread.join(); LOG_DEBUG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " - << uniqueid.substr(0, 10) << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); + << display_name() << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); + } + + /** + * Returns printable name for the session based on uniqueid (used for logging). + */ + const std::string comm_session::display_name() + { + if (challenge_status == CHALLENGE_STATUS::CHALLENGE_VERIFIED) + { + if (session_type == SESSION_TYPE::PEER) + { + // Peer sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix). + return uniqueid.substr(2, 10); + } + else + { + // User sessions use binary pubkey as unique id. So we need to convert to hex. + std::string hex; + util::bin2hex(hex, + reinterpret_cast(uniqueid.data()), + uniqueid.length()); + return hex.substr(2, 10); // Skipping first 2 bytes key type prefix. + } + } + + // Unverified sessions just use the ip/host address as the unique id. + return uniqueid; } /** @@ -268,13 +295,13 @@ namespace comm const uint64_t elapsed_time = time_now - t.timestamp; if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) { - this->close(); + close(); t.timestamp = 0; t.counter_value = 0; - LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; - corebill::report_violation(this->address); + LOG_INFO << "Session " << uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; + corebill::report_violation(address); } else if (elapsed_time > t.intervalms) { diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index d7b4c751..2deb4ae4 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -70,6 +70,7 @@ namespace comm void process_outbound_msg_queue(); void mark_for_closure(); void close(const bool invoke_handler = true); + const std::string display_name(); void set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms); void increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount); diff --git a/src/consensus.cpp b/src/consensus.cpp index 133411a8..5bfaef19 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -35,8 +35,9 @@ namespace consensus int init() { - // We allocate 1/4 of roundtime for each stage (there are 4 stages: 0,1,2,3) - ctx.stage_time = conf::cfg.roundtime / 4; + // We allocate 2/7 of roundtime for stage 1 and 2. The rest (4/7) is allocated to stage 3. + // This is because stage 3 needs some time to execute the contract in addition to broadcasting the proposal. + ctx.stage_time = (conf::cfg.roundtime * 2) / 7; ctx.stage_reset_wait_threshold = conf::cfg.roundtime / 10; ctx.contract_ctx.args.state_dir = conf::ctx.state_rw_dir; @@ -96,32 +97,136 @@ namespace consensus int consensus() { - // A consensus round consists of 4 stages (0,1,2,3). + // A consensus round consists of 3 stages (1,2,3). + // Stage 3 is the last stage AND it also provides entry point for next round stage 1. // For a given stage, this function may get visited multiple times due to time-wait conditions. uint64_t stage_start = 0; if (!wait_and_proceed_stage(stage_start)) return 0; // This means the stage has been reset. - // Get the latest current time. + LOG_DEBUG << "Started stage " << std::to_string(ctx.stage); + + // We consider stage start time as the current discreet time throughout the stage. ctx.time_now = stage_start; + + // Throughout consensus, we continously update and prune the candidate proposals for newly + // arived ones and expired ones. + revise_candidate_proposals(); + + // Get current lcl and state. + std::string lcl = ledger::ctx.get_lcl(); + uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); + hpfs::h32 state = state_common::ctx.get_state(); + vote_counter votes; + + if (ctx.stage == 1) + { + if (is_in_sync(lcl, votes)) + { + // If we are in sync, vote and broadcast the winning votes to next stage. + const p2p::proposal p = create_stage_proposal(STAGE1_THRESHOLD, votes, lcl, state); + broadcast_proposal(p); + } + } + else if (ctx.stage == 2) + { + if (is_in_sync(lcl, votes)) + { + // If we are in sync, vote and broadcast the winning votes to next stage. + const p2p::proposal p = create_stage_proposal(STAGE2_THRESHOLD, votes, lcl, state); + broadcast_proposal(p); + } + + // In stage 2, broadcast non-unl proposal (NUP) containing inputs from locally connected users. + // This will be captured and verified at the end of stage 3. + broadcast_nonunl_proposal(); + } + else if (ctx.stage == 3) + { + if (is_in_sync(lcl, votes)) + { + // If we are in sync, vote and get the final winning votes. + // This is the consensus proposal which makes it into the ledger and contract execution + const p2p::proposal p = create_stage_proposal(STAGE3_THRESHOLD, votes, lcl, state); + + // Update the ledger and execute the contract using the consensus proposal. + if (update_ledger_and_execute_contract(p, lcl, state) == -1) + LOG_ERROR << "Error occured in Stage 3 consensus execution."; + } + + // Prepare for next round by sending NEW-ROUND PROPOSAL. + // At the end of stage 3, we broadcast the "new round" proposal which is subjected + // to voting in next round stage 1. + + // Prepare the consensus candidate user inputs that we have acumulated so far. (We receive them periodically via NUPs) + // The candidate inputs will be included in the new round proposal. + verify_and_populate_candidate_user_inputs(lcl_seq_no); + + const p2p::proposal new_round_prop = create_new_round_proposal(lcl, state); + broadcast_proposal(new_round_prop); + } + + // We have finished a consensus stage. Transition to next stage. (if at stage 3 go to next round stage 1) + ctx.stage = (ctx.stage < 3) ? (ctx.stage + 1) : 1; + return 0; + } + + bool is_in_sync(std::string_view lcl, vote_counter &votes) + { + // Check if we're ahead/behind of consensus lcl. + bool is_lcl_desync = false; + std::string majority_lcl; + if (check_lcl_votes(is_lcl_desync, majority_lcl, votes, lcl)) + { + // We proceed further only if lcl check was success (meaning lcl check could be reliably performed). + + // State lcl sync if we are out-of-sync with majority lcl. + if (is_lcl_desync) + { + conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); + ledger::set_sync_target(majority_lcl); + } + + // Check our state with majority state. + bool is_state_desync = false; + hpfs::h32 majority_state = hpfs::h32_empty; + check_state_votes(is_state_desync, majority_state, votes); + + // Start state sync if we are out-of-sync with majority state. + if (is_state_desync) + { + conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); + state_sync::set_target(majority_state); + } + + // Proceed further only if both lcl and state are in sync with majority. + if (!is_lcl_desync && !is_state_desync) + { + conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); + return true; + } + } + + return false; + } + + /** + * Moves proposals collected from the network into candidate proposals and + * cleans up any outdated proposals from the candidate set. + */ + void revise_candidate_proposals() + { + // Move over the network proposal collection into a local list. This is to have a private working + // set for candidate parsing and avoid threading conflicts with network incoming proposals. std::list collected_proposals; - - // Get current lcl and sequence no. - const std::string lcl = ledger::ctx.get_lcl(); - const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); - const hpfs::h32 state = state_common::ctx.get_state(); - - // Throughout consensus, we move over the incoming proposals collected via the network so far into - // the candidate proposal set (move and append). This is to have a private working set for the consensus - // and avoid threading conflicts with network incoming proposals. { std::scoped_lock lock(p2p::ctx.collected_msgs.proposals_mutex); collected_proposals.splice(collected_proposals.end(), p2p::ctx.collected_msgs.proposals); } - //Copy collected propsals to candidate set of proposals. - //Add propsals of new nodes and replace proposals from old nodes to reflect current status of nodes. + // Move collected propsals to candidate set of proposals. + // Add propsals of new nodes and replace proposals from old nodes to reflect current status of nodes. for (const auto &proposal : collected_proposals) { auto prop_itr = ctx.candidate_proposals.find(proposal.pubkey); @@ -136,85 +241,12 @@ namespace consensus } } - LOG_DEBUG << "Started stage " << std::to_string(ctx.stage); - - if (ctx.stage == 0) // Stage 0 means begining of a consensus round. - { - // Broadcast non-unl proposals (NUP) containing inputs from locally connected users. - broadcast_nonunl_proposal(); - - // Verify and transfer user inputs from incoming NUPs onto consensus candidate data. - verify_and_populate_candidate_user_inputs(lcl_seq_no); - - // In stage 0 we create a novel proposal and broadcast it. - const p2p::proposal stg_prop = create_stage0_proposal(lcl, state); - broadcast_proposal(stg_prop); - } - else // Stage 1, 2, 3 - { - purify_candidate_proposals(); - - // Initialize vote counters. - vote_counter votes; - - // Check if we're ahead/behind of consensus lcl. - bool is_lcl_desync = false; - std::string majority_lcl; - if (check_lcl_votes(is_lcl_desync, majority_lcl, votes, lcl)) - { - // We proceed further only if lcl check was success (meaning lcl check could be reliably performed). - - // State lcl sync if we are out-of-sync with majority lcl. - if (is_lcl_desync) - { - conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); - ledger::set_sync_target(majority_lcl); - } - - // Check our state with majority state. - bool is_state_desync = false; - hpfs::h32 majority_state = hpfs::h32_empty; - check_state_votes(is_state_desync, majority_state, votes); - - // State state sync if we are out-of-sync with majority state. - if (is_state_desync) - { - conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); - state_sync::set_target(majority_state); - } - - // Proceed further only if both lcl and state are in sync with majority. - if (!is_lcl_desync && !is_state_desync) - { - conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); - - // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. - const p2p::proposal stg_prop = create_stage123_proposal(votes, lcl, state); - - broadcast_proposal(stg_prop); - - // The node has finished a consensus round (all 4 stages) - if (ctx.stage == 3 && apply_ledger(stg_prop) == -1) - LOG_ERROR << "Error occured in Stage 3 consensus execution."; - } - } - } - - // Node has finished a consensus stage. Transition to next stage. - ctx.stage = (ctx.stage + 1) % 4; - return 0; - } - - /** - * Cleanup any outdated proposals from the candidate set. - */ - void purify_candidate_proposals() - { + // Prune any outdated proposals. auto itr = ctx.candidate_proposals.begin(); while (itr != ctx.candidate_proposals.end()) { const p2p::proposal &cp = itr->second; - const uint64_t time_diff = (ctx.time_now > cp.timestamp) ? (ctx.time_now - cp.timestamp) : 0; + const uint64_t time_diff = (ctx.time_now > cp.sent_timestamp) ? (ctx.time_now - cp.sent_timestamp) : 0; const int8_t stage_diff = ctx.stage - cp.stage; // only consider recent proposals and proposals from previous stage and current stage. @@ -249,22 +281,23 @@ namespace consensus const uint64_t now = util::get_epoch_milliseconds(); // Rrounds are discreet windows of roundtime. - // This gets the start time of current round window. Stage 0 must start in the next window. + // This gets the start time of current round window. Stage 1 must start in the next round window. const uint64_t current_round_start = (((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime); - if (ctx.stage == 0) + if (ctx.stage == 1) { - // Stage 0 must start in the next round window. + // Stage 1 must start in the next round window. + // (This makes sure stage 3 gets whichever the remaining time in the round after stage 1 and 2) stage_start = current_round_start + conf::cfg.roundtime; const int64_t to_wait = stage_start - now; - LOG_DEBUG << "Waiting " << std::to_string(to_wait) << "ms for next round stage 0"; + LOG_DEBUG << "Waiting " << std::to_string(to_wait) << "ms for next round stage 1"; util::sleep(to_wait); return true; } else { - stage_start = current_round_start + (ctx.stage * ctx.stage_time); + stage_start = current_round_start + ((ctx.stage - 1) * ctx.stage_time); // Compute stage time wait. // Node wait between stages to collect enough proposals from previous stages from other nodes. @@ -274,8 +307,8 @@ namespace consensus // it will join in next round. Otherwise it will continue particapating in this round. if (to_wait < ctx.stage_reset_wait_threshold) //todo: self claculating/adjusting network delay { - LOG_DEBUG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 0"; - ctx.stage = 0; + LOG_DEBUG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 1"; + ctx.stage = 1; return false; } else @@ -289,18 +322,19 @@ namespace consensus /** * Broadcasts any inputs from locally connected users via an NUP. - * @return 0 for successful broadcast. -1 for failure. */ void broadcast_nonunl_proposal() { - if (usr::ctx.users.empty()) - return; - - // Construct NUP. p2p::nonunl_proposal nup; { - std::scoped_lock(usr::ctx.users_mutex); + // Populate users and inputs to the NUP within user lock. + std::scoped_lock lock(usr::ctx.users_mutex); + + if (usr::ctx.users.empty()) + return; + + // Construct NUP. for (auto &[sid, user] : usr::ctx.users) { std::list user_inputs; @@ -336,171 +370,111 @@ namespace consensus */ void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) { - // Lock the user sessions and the list so any network activity is blocked. - std::scoped_lock lock(usr::ctx.users_mutex, p2p::ctx.collected_msgs.nonunl_proposals_mutex); - for (const p2p::nonunl_proposal &p : p2p::ctx.collected_msgs.nonunl_proposals) + // Move over NUPs collected from the network into a local list. + std::list collected_nups; { - for (const auto &[pubkey, umsgs] : p.user_inputs) + std::scoped_lock lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex); + collected_nups.splice(collected_nups.end(), p2p::ctx.collected_msgs.nonunl_proposals); + } + + // Prepare merged list of users with each user's inputs grouped under the user. + // Key: user pubkey, Value: List of inputs from the user. + std::unordered_map> input_groups; + for (p2p::nonunl_proposal &p : collected_nups) + { + for (auto &[pubkey, umsgs] : p.user_inputs) { - // Locate this user's socket session in case we need to send any status messages regarding user inputs. - comm::comm_session *session = usr::get_session_by_pubkey(pubkey); + // Move any user inputs from each NUP over to the grouped inputs under the user pubkey. + std::list &input_list = input_groups[pubkey]; + input_list.splice(input_list.end(), umsgs); + } + } + collected_nups.clear(); - // Populate user list with this user's pubkey. - ctx.candidate_users.emplace(pubkey); + // Maintains users and any input-acceptance responses we should send to them. + // Key: user pubkey. Value: List of [user-protocol, msg-sig, reject-reason] tuples. + std::unordered_map>> responses; - // Keep track of total input length to verify against remaining balance. - // We only process inputs in the submitted order that can be satisfied with the remaining account balance. - size_t total_input_len = 0; - bool appbill_balance_exceeded = false; + for (const auto &[pubkey, umsgs] : input_groups) + { + // Populate user list with this user's pubkey. + ctx.candidate_users.emplace(pubkey); - for (const usr::user_input &umsg : umsgs) + // Keep track of total input length to verify against remaining balance. + // We only process inputs in the submitted order that can be satisfied with the remaining account balance. + size_t total_input_len = 0; + bool appbill_balance_exceeded = false; + util::rollover_hashset recent_user_input_hashes(200); + + for (const usr::user_input &umsg : umsgs) + { + const char *reject_reason = NULL; + + if (appbill_balance_exceeded) { - msg::usrmsg::usrmsg_parser parser(umsg.protocol); + reject_reason = msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED; + } + else + { + std::string hash, input; + uint64_t max_lcl_seqno; + reject_reason = usr::validate_user_input_submission(pubkey, umsg, lcl_seq_no, total_input_len, recent_user_input_hashes, + hash, input, max_lcl_seqno); - const char *reject_reason = NULL; - const std::string sig_hash = crypto::get_hash(umsg.sig); - - // Check for duplicate messages using hash of the signature. - if (ctx.recent_userinput_hashes.try_emplace(sig_hash)) + if (reject_reason == NULL) { - // Verify the signature of the input_container. - if (crypto::verify(umsg.input_container, umsg.sig, pubkey) == 0) - { - std::string nonce; - std::string input; - uint64_t max_lcl_seqno; - parser.extract_input_container(input, nonce, max_lcl_seqno, umsg.input_container); - - // Ignore the input if our ledger has passed the input TTL. - if (max_lcl_seqno > lcl_seq_no) - { - if (!appbill_balance_exceeded) - { - // Hash is prefixed with the nonce to support user-defined sort order. - std::string hash = std::move(nonce); - // Append the hash of the message signature to get the final hash. - hash.append(sig_hash); - - // Keep checking the subtotal of inputs extracted so far with the appbill account balance. - total_input_len += input.length(); - if (verify_appbill_check(pubkey, total_input_len)) - { - ctx.candidate_user_inputs.try_emplace( - hash, - candidate_user_input(pubkey, std::move(input), max_lcl_seqno)); - } - else - { - // Abandon processing further inputs from this user when we find out - // an input cannot be processed with the account balance. - appbill_balance_exceeded = true; - reject_reason = msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED; - } - } - else - { - reject_reason = msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED; - } - } - else - { - LOG_DEBUG << "User message bad max ledger seq expired."; - reject_reason = msg::usrmsg::REASON_MAX_LEDGER_EXPIRED; - } - } - else - { - LOG_DEBUG << "User message bad signature."; - reject_reason = msg::usrmsg::REASON_BAD_SIG; - } + // No reject reason means we should go ahead and subject the input to consensus. + ctx.candidate_user_inputs.try_emplace( + hash, + candidate_user_input(pubkey, std::move(input), max_lcl_seqno)); } - else + else if (reject_reason == msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED) { - LOG_DEBUG << "Duplicate user message."; - reject_reason = msg::usrmsg::REASON_DUPLICATE_MSG; + // Abandon processing further inputs from this user when we find out + // an input cannot be processed with the account balance. + appbill_balance_exceeded = true; } + } + responses[pubkey].push_back(std::tuple(umsg.protocol, umsg.sig, reject_reason)); + } + } + + input_groups.clear(); + + { + // Lock the user sessions. + std::scoped_lock lock(usr::ctx.users_mutex); + + for (auto &[pubkey, user_responses] : responses) + { + // Locate this user's socket session. + const auto user_itr = usr::ctx.users.find(pubkey); + if (user_itr != usr::ctx.users.end()) + { // Send the request status result if this user is connected to us. - if (session != NULL) + for (auto &resp : user_responses) { + // resp: 0=protocl, 1=msg sig, 2=reject reason. + msg::usrmsg::usrmsg_parser parser(std::get<0>(resp)); + const std::string &msg_sig = std::get<1>(resp); + const char *reject_reason = std::get<2>(resp); + usr::send_input_status(parser, - *session, + user_itr->second.session, reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, reject_reason == NULL ? "" : reject_reason, - umsg.sig); + msg_sig); } } } } - p2p::ctx.collected_msgs.nonunl_proposals.clear(); } - /** - * Executes the appbill and verifies whether the user has enough account balance to process the provided input. - * @param pubkey User binary pubkey. - * @param input_len Total bytes length of user input. - * @return Whether the user is allowed to process the input or not. - */ - bool verify_appbill_check(std::string_view pubkey, const size_t input_len) + p2p::proposal create_new_round_proposal(std::string_view lcl, hpfs::h32 state) { - // If appbill not enabled always green light the input. - if (conf::cfg.appbill.empty()) - return true; - - // execute appbill in --check mode to verify this user can submit a packet/connection to the network - // todo: this can be made more efficient, appbill --check can process 7 at a time - - // Fill appbill args - const int len = conf::cfg.runtime_appbill_args.size() + 4; - char *execv_args[len]; - for (int i = 0; i < conf::cfg.runtime_appbill_args.size(); i++) - execv_args[i] = conf::cfg.runtime_appbill_args[i].data(); - char option[] = "--check"; - execv_args[len - 4] = option; - // add the hex encoded public key as the last parameter - std::string hexpubkey; - util::bin2hex(hexpubkey, reinterpret_cast(pubkey.data()), pubkey.size()); - std::string inputsize = std::to_string(input_len); - execv_args[len - 3] = hexpubkey.data(); - execv_args[len - 2] = inputsize.data(); - execv_args[len - 1] = NULL; - - int pid = fork(); - if (pid == 0) - { - // appbill process. - util::fork_detach(); - - // before execution chdir into a valid the latest state data directory that contains an appbill.table - chdir(conf::ctx.state_rw_dir.c_str()); - int ret = execv(execv_args[0], execv_args); - std::cerr << errno << ": Appbill process execv failed.\n"; - return false; - } - else - { - // app bill in check mode takes a very short period of time to execute, typically 1ms - // so we will blocking wait for it here - int status = 0; - waitpid(pid, &status, 0); //todo: check error conditions here - status = WEXITSTATUS(status); - if (status != 128 && status != 0) - { - // this user's key passed appbill - return true; - } - else - { - // user's key did not pass, do not add to user input candidates - LOG_DEBUG << "Appbill validation failed " << hexpubkey << " return code was " << status; - return false; - } - } - } - - p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state) - { - // The proposal we are going to emit in stage 0. + // The proposal we are going to emit at the end of stage 3 after ledger update. + // This is the proposal that stage 1 votes on. p2p::proposal stg_prop; stg_prop.time = ctx.time_now; stg_prop.stage = 0; @@ -508,11 +482,7 @@ namespace consensus stg_prop.state = state; // Populate the proposal with set of candidate user pubkeys. - for (const std::string &pubkey : ctx.candidate_users) - stg_prop.users.emplace(pubkey); - - // We don't need candidate_users anymore, so clear it. It will be repopulated during next consensus round. - ctx.candidate_users.clear(); + stg_prop.users.swap(ctx.candidate_users); // Populate the proposal with hashes of user inputs. for (const auto &[hash, cand_input] : ctx.candidate_user_inputs) @@ -527,7 +497,7 @@ namespace consensus return stg_prop; } - p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl, hpfs::h32 state) + p2p::proposal create_stage_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, hpfs::h32 state) { // The proposal to be emited at the end of this stage. p2p::proposal stg_prop; @@ -536,14 +506,14 @@ namespace consensus // we always vote for our current lcl and state regardless of what other peers are saying // if there's a fork condition we will either request history and state from - // our peers or we will halt depending on level of consensus on the sides of the fork + // our peers or we will halt depending on level of consensus on the sides of the fork. stg_prop.lcl = lcl; // Vote for rest of the proposal fields by looking at candidate proposals. for (const auto &[pubkey, cp] : ctx.candidate_proposals) { // Vote for times. - // Everyone votes on an arbitrary time, as long as its within the round time and not in the future. + // Everyone votes on an arbitrary time, as long as it's not in the future and within the round time. if (ctx.time_now > cp.time && (ctx.time_now - cp.time) < conf::cfg.roundtime) increment(votes.time, cp.time); @@ -562,7 +532,7 @@ namespace consensus increment(votes.outputs, hash); } - const float_t vote_threshold = get_stage_threshold(ctx.stage); + const uint32_t required_votes = ceil(vote_threshold * conf::cfg.unl.size()); // todo: check if inputs being proposed by another node are actually spoofed inputs // from a user locally connected to this node. @@ -571,25 +541,25 @@ namespace consensus // Add user pubkeys which have votes over stage threshold to proposal. for (const auto &[pubkey, numvotes] : votes.users) - if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0)) + if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0)) stg_prop.users.emplace(pubkey); // Add inputs which have votes over stage threshold to proposal. for (const auto &[hash, numvotes] : votes.inputs) - if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0)) + if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0)) stg_prop.hash_inputs.emplace(hash); // Add outputs which have votes over stage threshold to proposal. for (const auto &[hash, numvotes] : votes.outputs) - if (numvotes >= vote_threshold) + if (numvotes >= required_votes) stg_prop.hash_outputs.emplace(hash); // time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement. - int32_t highest_time_vote = 0; + uint32_t highest_time_vote = 0; for (auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr) { const uint64_t time = itr->first; - const int32_t numvotes = itr->second; + const uint32_t numvotes = itr->second; if (numvotes > highest_time_vote) { @@ -602,7 +572,8 @@ namespace consensus } /** - * Broadcasts the given proposal to all connected peers. + * Broadcasts the given proposal to all connected peers if in PROPOSER mode. Otherwise + * only send to self in OBSERVER mode. * @return 0 on success. -1 if no peers to broadcast. */ void broadcast_proposal(const p2p::proposal &p) @@ -634,7 +605,7 @@ namespace consensus */ bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl) { - int32_t total_lcl_votes = 0; + uint32_t total_lcl_votes = 0; for (const auto &[pubkey, cp] : ctx.candidate_proposals) { @@ -642,13 +613,15 @@ namespace consensus total_lcl_votes++; } - if (total_lcl_votes < (MAJORITY_THRESHOLD * conf::cfg.unl.size())) + // Check whether we have received enough votes in total. + const uint32_t min_required = ceil(MAJORITY_THRESHOLD * conf::cfg.unl.size()); + if (total_lcl_votes < min_required) { - LOG_DEBUG << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << ceil(MAJORITY_THRESHOLD * conf::cfg.unl.size()); + LOG_DEBUG << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required; return false; } - int32_t winning_votes = 0; + uint32_t winning_votes = 0; for (const auto [lcl, votes] : votes.lcl) { if (votes > winning_votes) @@ -658,25 +631,30 @@ namespace consensus } } - // Check wheher there are good enough winning votes. - if (winning_votes < MAJORITY_THRESHOLD * ctx.candidate_proposals.size()) - { - // potential fork condition. - LOG_DEBUG << "No consensus on lcl. Possible fork condition. won:" << winning_votes << " total:" << ctx.candidate_proposals.size(); - return false; - } - - // Iif winning lcl is not matched with our lcl, that means we are not on the consensus ledger. + // If winning lcl is not matched with our lcl, that means we are not on the consensus ledger. + // If that's the case we should request history straight away. if (lcl != majority_lcl) { - LOG_DEBUG << "We are not on the consensus ledger, we must request history from a peer."; + LOG_DEBUG << "We are not on the consensus ledger, we must request history from a peer."; is_desync = true; return true; } - - // Reaching here means we have reliable amount of lcl votes and our lcl match with majority lcl. - is_desync = false; - return true; + else + { + // Check wheher there are enough winning votes for the lcl to be reliable. + const uint32_t min_wins_required = ceil(MAJORITY_THRESHOLD * ctx.candidate_proposals.size()); + if (winning_votes < min_wins_required) + { + LOG_DEBUG << "No consensus on lcl. Possible fork condition. won:" << winning_votes << " needed:" << min_wins_required; + return false; + } + else + { + // Reaching here means we have reliable amount of winning lcl votes and our lcl matches with majority lcl. + is_desync = false; + return true; + } + } } /** @@ -690,7 +668,7 @@ namespace consensus increment(votes.state, cp.state); } - int32_t winning_votes = 0; + uint32_t winning_votes = 0; for (const auto [state, votes] : votes.state) { if (votes > winning_votes) @@ -704,33 +682,15 @@ namespace consensus } /** - * Returns the consensus percentage threshold for the specified stage. - * @param stage The consensus stage [1, 2, 3] - */ - float_t get_stage_threshold(const uint8_t stage) - { - switch (stage) - { - case 1: - return STAGE1_THRESHOLD * conf::cfg.unl.size(); - case 2: - return STAGE2_THRESHOLD * conf::cfg.unl.size(); - case 3: - return STAGE3_THRESHOLD * conf::cfg.unl.size(); - } - return -1; - } - - /** - * Finalize the ledger after consensus. + * Update the ledger and execute the contract after consensus. * @param cons_prop The proposal that reached consensus. */ - int apply_ledger(const p2p::proposal &cons_prop) + int update_ledger_and_execute_contract(const p2p::proposal &cons_prop, std::string &new_lcl, hpfs::h32 &new_state) { if (ledger::save_ledger(cons_prop) == -1) return -1; - std::string new_lcl = ledger::ctx.get_lcl(); + new_lcl = ledger::ctx.get_lcl(); const uint64_t new_lcl_seq_no = ledger::ctx.get_seq_no(); LOG_INFO << "****Ledger created**** (lcl:" << new_lcl.substr(0, 15) << " state:" << cons_prop.state << ")"; @@ -767,6 +727,8 @@ namespace consensus } state_common::ctx.set_state(args.post_execution_state_hash); + new_state = args.post_execution_state_hash; + extract_user_outputs_from_contract_bufmap(args.userbufs); sc::clear_args(args); @@ -797,23 +759,20 @@ namespace consensus // Send matching outputs to locally connected users. candidate_user_output &cand_output = cu_itr->second; - // Find the user session by user pubkey. - const auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey); - if (sess_itr != usr::ctx.sessionids.end()) // match found + // Find user to send by pubkey. + const auto user_itr = usr::ctx.users.find(cand_output.userpubkey); + if (user_itr != usr::ctx.users.end()) // match found { - const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. - if (user_itr != usr::ctx.users.end()) // match found + const usr::connected_user &user = user_itr->second; + msg::usrmsg::usrmsg_parser parser(user.protocol); + + // Sending all the outputs to the user. + for (sc::contract_output &output : cand_output.outputs) { - const usr::connected_user &user = user_itr->second; - msg::usrmsg::usrmsg_parser parser(user.protocol); - // Sending all the outputs to the user. - for (sc::contract_output &output : cand_output.outputs) - { - std::vector msg; - parser.create_contract_output_container(msg, output.message, lcl_seq_no, lcl); - user.session.send(msg); - output.message.clear(); - } + std::vector msg; + parser.create_contract_output_container(msg, output.message, lcl_seq_no, lcl); + user.session.send(msg); + output.message.clear(); } } @@ -898,7 +857,7 @@ namespace consensus * @param candidate The candidate whose vote should be increased by 1. */ template - void increment(std::map &counter, const T &candidate) + void increment(std::map &counter, const T &candidate) { if (counter.count(candidate)) counter[candidate]++; diff --git a/src/consensus.hpp b/src/consensus.hpp index 593af93b..027276c5 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -51,7 +51,7 @@ namespace consensus std::unordered_map candidate_proposals; // Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round. - std::unordered_set candidate_users; + std::set candidate_users; // Map of candidate user inputs with input hash as map key. Inputs will stay here until they // achieve consensus or expire (due to maxledgerseqno). Input hash is globally unique among inputs @@ -63,9 +63,7 @@ namespace consensus // all users. We will use this map to distribute outputs back to connected users once consensus is achieved. std::unordered_map candidate_user_outputs; - util::rollover_hashset recent_userinput_hashes; - - uint8_t stage = 0; + uint8_t stage = 1; uint64_t time_now = 0; uint16_t stage_time = 0; // Time allocated to a consensus stage. uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. @@ -74,21 +72,16 @@ namespace consensus bool is_shutting_down = false; std::thread consensus_thread; - - consensus_context() - : recent_userinput_hashes(200) - { - } }; struct vote_counter { - std::map time; - std::map lcl; - std::map users; - std::map inputs; - std::map outputs; - std::map state; + std::map time; + std::map lcl; + std::map users; + std::map inputs; + std::map outputs; + std::map state; }; int init(); @@ -101,7 +94,9 @@ namespace consensus int consensus(); - void purify_candidate_proposals(); + bool is_in_sync(std::string_view lcl, vote_counter &votes); + + void revise_candidate_proposals(); bool wait_and_proceed_stage(uint64_t &stage_start); @@ -111,11 +106,9 @@ namespace consensus void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); - bool verify_appbill_check(std::string_view pubkey, const size_t input_len); + p2p::proposal create_new_round_proposal(std::string_view lcl, hpfs::h32 state); - p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state); - - p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl, hpfs::h32 state); + p2p::proposal create_stage_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, hpfs::h32 state); void broadcast_proposal(const p2p::proposal &p); @@ -123,15 +116,13 @@ namespace consensus void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); - float_t get_stage_threshold(const uint8_t stage); - void timewait_stage(const bool reset, const uint64_t time); uint64_t get_ledger_time_resolution(const uint64_t time); uint64_t get_stage_time_resolution(const uint64_t time); - int apply_ledger(const p2p::proposal &proposal); + int update_ledger_and_execute_contract(const p2p::proposal &proposal, std::string &new_lcl, hpfs::h32 &new_state); void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl); @@ -140,7 +131,7 @@ namespace consensus void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap); template - void increment(std::map &counter, const T &candidate); + void increment(std::map &counter, const T &candidate); int get_initial_state_hash(hpfs::h32 &hash); diff --git a/src/ledger.cpp b/src/ledger.cpp index b013b84d..c1bb2754 100644 --- a/src/ledger.cpp +++ b/src/ledger.cpp @@ -567,6 +567,7 @@ namespace ledger const auto [seq_no, lcl] = get_ledger_cache_top(); ctx.set_lcl(seq_no, lcl); + new_lcl = lcl; LOG_INFO << "lcl sync: Fork detected. Removed last ledger. New lcl:" << lcl.substr(0, 15); return 0; } diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index 5a8bd5e8..90a37fa7 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -49,8 +49,8 @@ table Proposal_Message { //Proposal type message schema stage:uint8; time:uint64; users:[ByteArray]; - hash_inputs:[ByteArray]; //stage > 0 inputs (hash of stage 0 inputs) - hash_outputs:[ByteArray]; //stage > 0 outputs (hash of stage 0 outputs) + hash_inputs:[ByteArray]; + hash_outputs:[ByteArray]; state: [ubyte]; } diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 49e89f97..a2312297 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -222,7 +222,7 @@ namespace msg::fbuf::p2pmsg p2p::proposal p; p.pubkey = flatbuff_bytes_to_sv(pubkey); - p.timestamp = timestamp; + p.sent_timestamp = timestamp; p.time = msg.time(); p.stage = msg.stage(); p.lcl = flatbuff_bytes_to_sv(lcl); @@ -638,10 +638,10 @@ namespace msg::fbuf::p2pmsg //---Conversion helpers from flatbuffers data types to std data types---// - const std::unordered_map> + const std::unordered_map> flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector> *fbvec) { - std::unordered_map> map; + std::unordered_map> map; map.reserve(fbvec->size()); for (const UserInputGroup *group : *fbvec) { @@ -664,7 +664,7 @@ namespace msg::fbuf::p2pmsg //---These are used in constructing Flatbuffer messages using builders---// const flatbuffers::Offset>> - user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) + user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) { std::vector> fbvec; fbvec.reserve(map.size()); diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index e3656103..aa439863 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -70,13 +70,13 @@ namespace msg::fbuf::p2pmsg //---Conversion helpers from flatbuffers data types to std data types---// - const std::unordered_map> + const std::unordered_map> flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector> *fbvec); //---Conversion helpers from std data types to flatbuffers data types---// const flatbuffers::Offset>> - user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); + user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); const std::map flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index d7f9287f..bdc4a52c 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -131,7 +131,7 @@ namespace p2p session.is_weakly_connected = ex_session.is_weakly_connected; p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session. - LOG_DEBUG << "Replacing existing connection [" << session.uniqueid.substr(0, 10) << "]"; + LOG_DEBUG << "Replacing existing connection [" << session.display_name() << "]"; return 0; } else if (ex_session.known_ipport.first.empty() || !session.known_ipport.first.empty()) diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index f4e49c73..03fe7987 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -16,7 +16,11 @@ namespace p2p struct proposal { std::string pubkey; - uint64_t timestamp = 0; + + // The timestamp of the sender when this proposal was sent. + uint64_t sent_timestamp = 0; + + // The time value that is voted on. uint64_t time = 0; uint8_t stage = 0; std::string lcl; @@ -28,7 +32,7 @@ namespace p2p struct nonunl_proposal { - std::unordered_map> user_inputs; + std::unordered_map> user_inputs; }; struct history_request diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 6abbd2e8..20ea5139 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -32,7 +32,7 @@ namespace p2p // Limit max number of inbound connections. if (conf::cfg.peermaxcons > 0 && ctx.peer_connections.size() >= conf::cfg.peermaxcons) { - LOG_DEBUG << "Max peer connections reached. Dropped connection " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Max peer connections reached. Dropped connection " << session.display_name(); return -1; } } @@ -69,7 +69,7 @@ namespace p2p if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Duplicate peer message. " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Duplicate peer message. " << session.display_name(); return 0; } @@ -112,7 +112,7 @@ namespace p2p if (session.challenge_status != comm::CHALLENGE_VERIFIED) { - LOG_DEBUG << "Cannot accept messages. Peer challenge unresolved. " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Cannot accept messages. Peer challenge unresolved. " << session.display_name(); return 0; } @@ -122,7 +122,7 @@ namespace p2p if (p2pmsg::validate_container_trust(container) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Proposal rejected due to trust failure. " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Proposal rejected due to trust failure. " << session.display_name(); return 0; } @@ -143,7 +143,7 @@ namespace p2p if (p2pmsg::validate_container_trust(container) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "NPL message rejected due to trust failure. " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "NPL message rejected due to trust failure. " << session.display_name(); return 0; } @@ -155,7 +155,7 @@ namespace p2p if (!consensus::push_npl_message(msg)) { - LOG_DEBUG << "NPL message enqueue failure. " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "NPL message enqueue failure. " << session.display_name(); } } else if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message. @@ -164,11 +164,11 @@ namespace p2p session.is_weakly_connected = announcement_msg->is_weakly_connected(); if (session.is_weakly_connected) { - LOG_DEBUG << "Weakly connected announcement received from " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Weakly connected announcement received from " << session.display_name(); } else { - LOG_DEBUG << "Strongly connected announcement received from " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Strongly connected announcement received from " << session.display_name(); } } else if (content_message_type == p2pmsg::Message_State_Request_Message) @@ -206,7 +206,7 @@ namespace p2p else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Received invalid peer message type. " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Received invalid peer message type. " << session.display_name(); } return 0; } diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 9e799575..bb46fd16 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index ba8caabe..02206732 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -142,23 +142,19 @@ namespace read_req if (!user_buf_itr->second.outputs.empty()) { // Find the user session by user pubkey. - const auto sess_itr = usr::ctx.sessionids.find(user_buf_itr->first); - if (sess_itr != usr::ctx.sessionids.end()) // match found + const auto user_itr = usr::ctx.users.find(user_buf_itr->first); + if (user_itr != usr::ctx.users.end()) // match found { - const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id. - if (user_itr != usr::ctx.users.end()) // match found + const usr::connected_user &user = user_itr->second; + msg::usrmsg::usrmsg_parser parser(user.protocol); + for (sc::contract_output &output : user_buf_itr->second.outputs) { - const usr::connected_user &user = user_itr->second; - msg::usrmsg::usrmsg_parser parser(user.protocol); - for (sc::contract_output &output : user_buf_itr->second.outputs) - { - std::vector msg; - parser.create_contract_read_response_container(msg, output.message); - user.session.send(msg); - output.message.clear(); - } - user_buf_itr->second.outputs.clear(); + std::vector msg; + parser.create_contract_read_response_container(msg, output.message); + user.session.send(msg); + output.message.clear(); } + user_buf_itr->second.outputs.clear(); } } LOG_DEBUG << "Read request contract execution ended."; diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index acecbbef..9867d5ca 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -17,11 +17,11 @@ int user_session_handler::on_connect(comm::comm_session &session) const { if (conf::cfg.pubmaxcons > 0 && ctx.users.size() >= conf::cfg.pubmaxcons) { - LOG_DEBUG << "Max user connections reached. Dropped connection " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Max user connections reached. Dropped connection " << session.display_name(); return -1; } - LOG_DEBUG << "User client connected " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "User client connected " << session.display_name(); // As soon as a user connects, we issue them a challenge message. We remember the // challenge we issued and later verify the user's response with it. @@ -61,13 +61,13 @@ int user_session_handler::on_message(comm::comm_session &session, std::string_vi if (handle_user_message(user, message) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Bad message from user " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Bad message from user " << session.display_name(); } } else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "User session id not found: " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "User session id not found: " << session.display_name(); } return 0; @@ -75,7 +75,7 @@ int user_session_handler::on_message(comm::comm_session &session, std::string_vi // If for any reason we reach this point, we should drop the connection because none of the // valid cases match. - LOG_DEBUG << "Dropping the user connection " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "Dropping the user connection " << session.display_name(); corebill::report_violation(session.address); return -1; } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 4cccfde8..6bc35796 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -24,9 +24,9 @@ namespace usr bool init_success = false; /** - * Initializes the usr subsystem. Must be called once during application startup. - * @return 0 for successful initialization. -1 for failure. - */ + * Initializes the usr subsystem. Must be called once during application startup. + * @return 0 for successful initialization. -1 for failure. + */ int init() { metric_thresholds[0] = conf::cfg.pubmaxcpm; @@ -43,8 +43,8 @@ namespace usr } /** - * Cleanup any running processes. - */ + * Cleanup any running processes. + */ void deinit() { if (init_success) @@ -52,8 +52,8 @@ namespace usr } /** - * Starts listening for incoming user websocket connections. - */ + * Starts listening for incoming user websocket connections. + */ int start_listening() { if (ctx.listener.start( @@ -65,72 +65,43 @@ namespace usr } /** - * Verifies the given message for a previously issued user challenge. - * @param message Challenge response. - * @param session The socket session that received the response. - * @return 0 for successful verification. -1 for failure. - */ + * Verifies the given message for a previously issued user challenge. + * @param message Challenge response. + * @param session The socket session that received the response. + * @return 0 for successful verification. -1 for failure. + */ int verify_challenge(std::string_view message, comm::comm_session &session) { // The received message must be the challenge response. We need to verify it. if (session.issued_challenge.empty()) { - LOG_DEBUG << "No challenge found for the session " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "No user challenge found for the session " << session.display_name(); return -1; } - std::string userpubkeyhex; + std::string user_pubkey_hex; std::string protocol_code; std::string_view original_challenge = session.issued_challenge; - if (msg::usrmsg::json::verify_user_handshake_response(userpubkeyhex, protocol_code, message, original_challenge) == 0) + if (msg::usrmsg::json::verify_user_handshake_response(user_pubkey_hex, protocol_code, message, original_challenge) == 0) { - // Challenge signature verification successful. - - // Decode hex pubkey and get binary pubkey. We are only going to keep - // the binary pubkey due to reduced memory footprint. - std::string userpubkey; - userpubkey.resize(userpubkeyhex.length() / 2); - util::hex2bin( - reinterpret_cast(userpubkey.data()), - userpubkey.length(), - userpubkeyhex); - - // Now check whether this user public key is a duplicate. - if (ctx.sessionids.count(userpubkey) == 0) - { - // All good. Unique public key. - // Promote the connection from pending-challenges to authenticated users. - - const util::PROTOCOL user_protocol = (protocol_code == "json" ? util::PROTOCOL::JSON : util::PROTOCOL::BSON); - - session.challenge_status = comm::CHALLENGE_VERIFIED; // Set as challenge verified - add_user(session, userpubkey, user_protocol); // Add the user to the global authed user list - session.issued_challenge.clear(); // Remove the stored challenge - - LOG_DEBUG << "User connection " << session.uniqueid.substr(0, 10) << " authenticated. Public key " - << userpubkeyhex; - return 0; - } - else - { - LOG_DEBUG << "Duplicate user public key " << session.uniqueid.substr(0, 10); - } + // Challenge signature verification successful. Add the user to our global user list. + add_user(session, user_pubkey_hex, protocol_code); + return 0; } else { - LOG_DEBUG << "Challenge verification failed " << session.uniqueid.substr(0, 10); + LOG_DEBUG << "User challenge verification failed " << session.display_name(); + return -1; } - - return -1; } /** - * Processes a message sent by a connected user. This will be invoked by web socket on_message handler. - * @param user The authenticated user who sent the message. - * @param message The message sent by user. - * @return 0 on successful processing. -1 for failure. - */ + * Processes a message sent by a connected user. This will be invoked by web socket on_message handler. + * @param user The authenticated user who sent the message. + * @param message The message sent by user. + * @return 0 on successful processing. -1 for failure. + */ int handle_user_message(connected_user &user, std::string_view message) { msg::usrmsg::usrmsg_parser parser(user.protocol); @@ -200,8 +171,8 @@ namespace usr } /** - * Send the specified contract input status result via the provided session. - */ + * Send the specified contract input status result via the provided session. + */ void send_input_status(const msg::usrmsg::usrmsg_parser &parser, comm::comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig) { @@ -211,78 +182,179 @@ namespace usr } /** - * Adds the user denoted by specified session id and public key to the global authed user list. - * This should get called after the challenge handshake is verified. - * - * @param session User socket session. - * @param pubkey User's binary public key. - * @param protocol Messaging protocol used by user. - * @return 0 on successful additions. -1 on failure. - */ - int add_user(comm::comm_session &session, const std::string &pubkey, const util::PROTOCOL protocol) - { - const std::string &sessionid = session.uniqueid; - if (ctx.users.count(sessionid) == 1) - { - LOG_INFO << sessionid << " already exist. Cannot add user."; - return -1; - } - - { - std::scoped_lock lock(ctx.users_mutex); - ctx.users.emplace(sessionid, usr::connected_user(session, pubkey, protocol)); - } - - // Populate sessionid map so we can lookup by user pubkey. - ctx.sessionids.try_emplace(pubkey, sessionid); - - return 0; - } - - /** - * Removes the specified public key from the global user list. - * This must get called when a user disconnects from HP. - * - * @param sessionid User socket session id. - * @return 0 on successful removals. -1 on failure. - */ - int remove_user(const std::string &sessionid) - { - const auto itr = ctx.users.find(sessionid); - - if (itr == ctx.users.end()) - { - LOG_INFO << sessionid << " does not exist. Cannot remove user."; - return -1; - } - - usr::connected_user &user = itr->second; - - { - std::scoped_lock lock(ctx.users_mutex); - ctx.sessionids.erase(user.pubkey); - } - - ctx.users.erase(itr); - return 0; - } - - /** - * Finds and returns the socket session for the proided user pubkey. - * @param pubkey User binary pubkey. - * @return Pointer to the socket session. NULL of not found. + * Adds the user denoted by specified session id and public key to the global authed user list. + * This should get called after the challenge handshake is verified. + * + * @param session User socket session. + * @param user_pubkey_hex User's hex public key. + * @param protocol_code Messaging protocol used by user. + * @return 0 on successful additions. -1 on failure. */ - comm::comm_session *get_session_by_pubkey(const std::string &pubkey) + int add_user(comm::comm_session &session, const std::string &pubkey_hex, std::string_view protocol_code) { - const auto sessionid_itr = ctx.sessionids.find(pubkey); - if (sessionid_itr != ctx.sessionids.end()) + // Decode hex pubkey and get binary pubkey. We are only going to keep + // the binary pubkey due to reduced memory footprint. + std::string pubkey; + pubkey.resize(pubkey_hex.length() / 2); + util::hex2bin( + reinterpret_cast(pubkey.data()), + pubkey.length(), + pubkey_hex); + + // Acquire user list lock. + std::scoped_lock lock(ctx.users_mutex); + + // Now check whether this user public key is a duplicate. + if (ctx.users.count(pubkey) == 0) { - const auto user_itr = ctx.users.find(sessionid_itr->second); - if (user_itr != ctx.users.end()) - return &user_itr->second.session; + // All good. Unique public key. + // Promote the connection from pending-challenges to authenticated users. + + const util::PROTOCOL protocol = (protocol_code == "json" ? util::PROTOCOL::JSON : util::PROTOCOL::BSON); + + session.challenge_status = comm::CHALLENGE_VERIFIED; // Set as challenge verified + session.issued_challenge.clear(); // Remove the stored challenge + session.uniqueid = pubkey; + + // Add the user to the global authed user list + ctx.users.emplace(pubkey, usr::connected_user(session, pubkey, protocol)); + LOG_DEBUG << "User connection authenticated. Public key " << pubkey_hex; + } + else + { + LOG_DEBUG << "Duplicate user public key " << session.display_name(); } - return NULL; + return 0; + } + + /** + * Removes the specified public key from the global user list. + * This must get called when an authenticated user disconnects from HP. + * + * @param pubkey User pubkey. + * @return 0 on successful removals. -1 on failure. + */ + int remove_user(const std::string &pubkey) + { + std::scoped_lock lock(ctx.users_mutex); + const auto itr = ctx.users.erase(pubkey); + return 0; + } + + /** + * Validates the provided user input message against all the required criteria. + * @return The rejection reason if input rejected. NULL if the input can be accepted. + */ + const char *validate_user_input_submission(const std::string_view user_pubkey, const usr::user_input &umsg, + const uint64_t lcl_seq_no, size_t &total_input_len, + util::rollover_hashset &recent_user_input_hashes, + std::string &hash, std::string &input, uint64_t &max_lcl_seqno) + { + const std::string sig_hash = crypto::get_hash(umsg.sig); + + // Check for duplicate messages using hash of the signature. + if (!recent_user_input_hashes.try_emplace(sig_hash)) + { + LOG_DEBUG << "Duplicate user message."; + return msg::usrmsg::REASON_DUPLICATE_MSG; + } + + // Verify the signature of the input_container. + if (crypto::verify(umsg.input_container, umsg.sig, user_pubkey) == -1) + { + LOG_DEBUG << "User message bad signature."; + return msg::usrmsg::REASON_BAD_SIG; + } + + std::string nonce; + msg::usrmsg::usrmsg_parser parser(umsg.protocol); + parser.extract_input_container(input, nonce, max_lcl_seqno, umsg.input_container); + + // Ignore the input if our ledger has passed the input TTL. + if (max_lcl_seqno <= lcl_seq_no) + { + LOG_DEBUG << "User message bad max ledger seq expired."; + return msg::usrmsg::REASON_MAX_LEDGER_EXPIRED; + } + + // Keep checking the subtotal of inputs extracted so far with the appbill account balance. + total_input_len += input.length(); + if (!verify_appbill_check(user_pubkey, total_input_len)) + { + LOG_DEBUG << "User message app bill balance exceeded."; + return msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED; + } + + // Hash is prefixed with the nonce to support user-defined sort order. + hash = std::move(nonce); + // Append the hash of the message signature to get the final hash. + hash.append(sig_hash); + + return NULL; // Success. No reject reason. + } + + /** + * Executes the appbill and verifies whether the user has enough account balance to process the provided input. + * @param pubkey User binary pubkey. + * @param input_len Total bytes length of user input. + * @return Whether the user is allowed to process the input or not. + */ + bool verify_appbill_check(std::string_view pubkey, const size_t input_len) + { + // If appbill not enabled always green light the input. + if (conf::cfg.appbill.empty()) + return true; + + // execute appbill in --check mode to verify this user can submit a packet/connection to the network + // todo: this can be made more efficient, appbill --check can process 7 at a time + + // Fill appbill args + const int len = conf::cfg.runtime_appbill_args.size() + 4; + char *execv_args[len]; + for (int i = 0; i < conf::cfg.runtime_appbill_args.size(); i++) + execv_args[i] = conf::cfg.runtime_appbill_args[i].data(); + char option[] = "--check"; + execv_args[len - 4] = option; + // add the hex encoded public key as the last parameter + std::string hexpubkey; + util::bin2hex(hexpubkey, reinterpret_cast(pubkey.data()), pubkey.size()); + std::string inputsize = std::to_string(input_len); + execv_args[len - 3] = hexpubkey.data(); + execv_args[len - 2] = inputsize.data(); + execv_args[len - 1] = NULL; + + int pid = fork(); + if (pid == 0) + { + // appbill process. + util::fork_detach(); + + // before execution chdir into a valid the latest state data directory that contains an appbill.table + chdir(conf::ctx.state_rw_dir.c_str()); + int ret = execv(execv_args[0], execv_args); + std::cerr << errno << ": Appbill process execv failed.\n"; + return false; + } + else + { + // app bill in check mode takes a very short period of time to execute, typically 1ms + // so we will blocking wait for it here + int status = 0; + waitpid(pid, &status, 0); //todo: check error conditions here + status = WEXITSTATUS(status); + if (status != 128 && status != 0) + { + // this user's key passed appbill + return true; + } + else + { + // user's key did not pass, do not add to user input candidates + LOG_DEBUG << "Appbill validation failed " << hexpubkey << " return code was " << status; + return false; + } + } } } // namespace usr \ No newline at end of file diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index b4c50ff3..08dcbc85 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -52,15 +52,10 @@ namespace usr struct connected_context { // Connected (authenticated) user list. - // Map key: User socket session id. + // Map key: User pubkey. Value: User info object. std::unordered_map users; std::mutex users_mutex; // Mutex for users access race conditions. - // Holds set of connected user session ids and public keys for lookups. - // This is used for pubkey duplicate checks as well. - // Map key: User binary pubkey - std::unordered_map sessionids; - comm::comm_server listener; }; extern connected_context ctx; @@ -78,11 +73,16 @@ namespace usr void send_input_status(const msg::usrmsg::usrmsg_parser &parser, comm::comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig); - int add_user(comm::comm_session &session, const std::string &pubkey, const util::PROTOCOL protocol); + int add_user(comm::comm_session &session, const std::string &user_pubkey_hex, std::string_view protocol_code); - int remove_user(const std::string &sessionid); + int remove_user(const std::string &pubkey); - comm::comm_session *get_session_by_pubkey(const std::string &pubkey); + const char *validate_user_input_submission(const std::string_view user_pubkey, const usr::user_input &umsg, + const uint64_t lcl_seq_no, size_t &total_input_len, + util::rollover_hashset &recent_user_input_hashes, + std::string &hash, std::string &input, uint64_t &max_lcl_seqno); + + bool verify_appbill_check(std::string_view pubkey, const size_t input_len); } // namespace usr diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 5479bac2..5f2e64e1 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -52,8 +52,8 @@ do appbillargs: '', \ peerport: ${peerport}, \ pubport: ${pubport}, \ - roundtime: 2000, \ - loglevel: 'dbg', \ + roundtime: 1000, \ + loglevel: 'inf', \ loggers:['console', 'file'] \ }, null, 2)" > hp.cfg rm tmp.json