From 033b5fa7bced242446d5a541d767aa58de9a33f3 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Sun, 14 Feb 2021 20:19:53 +0530 Subject: [PATCH] User inputs round limit. (#240) --- src/conf.cpp | 9 +- src/conf.hpp | 8 ++ src/consensus.cpp | 138 +++++++++++----------------- src/consensus.hpp | 1 + src/msg/fbuf/ledger_helpers.cpp | 6 +- src/msg/fbuf/p2pmsg_helpers.cpp | 20 ++--- src/msg/fbuf/p2pmsg_helpers.hpp | 4 +- src/msg/usrmsg_common.hpp | 2 + src/p2p/p2p.hpp | 2 +- src/usr/user_input.hpp | 30 +++---- src/usr/user_session_handler.cpp | 2 +- src/usr/usr.cpp | 148 +++++++++++++++++++++++-------- src/usr/usr.hpp | 24 +++-- 13 files changed, 234 insertions(+), 160 deletions(-) diff --git a/src/conf.cpp b/src/conf.cpp index ce55a8ee..335becbc 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -849,8 +849,12 @@ namespace conf jsoncons::ojson appbill; appbill.insert_or_assign("mode", contract.appbill.mode); appbill.insert_or_assign("bin_args", contract.appbill.bin_args); - jdoc.insert_or_assign("appbill", appbill); + + jsoncons::ojson round_limits; + round_limits.insert_or_assign("user_input_bytes", contract.round_limits.user_input_bytes); + round_limits.insert_or_assign("user_output_bytes", contract.round_limits.user_output_bytes); + jdoc.insert_or_assign("round_limits", round_limits); } /** @@ -938,6 +942,9 @@ namespace conf return -1; } contract.appbill.bin_args = jdoc["appbill"]["bin_args"].as(); + + contract.round_limits.user_input_bytes = jdoc["round_limits"]["user_input_bytes"].as(); + contract.round_limits.user_output_bytes = jdoc["round_limits"]["user_output_bytes"].as(); } catch (const std::exception &e) { diff --git a/src/conf.hpp b/src/conf.hpp index 8c4cbebf..49ae0d7e 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -84,6 +84,13 @@ namespace conf // Config element which are initialized in memory (This is not directly loaded from the config file) std::vector runtime_args; // Appbill execution args used during runtime. }; + + struct round_limits_config + { + size_t user_input_bytes = 0; // Max contract input bytes per user per round. + size_t user_output_bytes = 0; // Max contract output bytes per user per round. + }; + struct contract_config { std::string id; // Contract guid. @@ -97,6 +104,7 @@ namespace conf bool is_consensus_public = false; // If true, consensus are broadcasted to non-unl nodes as well. bool is_npl_public = false; // If true, npl messages are broadcasted to non-unl nodes as well. appbill_config appbill; + round_limits_config round_limits; // Config element which are initialized in memory (This is not directly loaded from the config file) std::vector runtime_binexec_args; // Contract binary execution args used during runtime. diff --git a/src/consensus.cpp b/src/consensus.cpp index b1c2d707..86dcfce6 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -364,8 +364,9 @@ namespace consensus // Construct NUP. for (auto &[sid, user] : usr::ctx.users) { - std::list user_inputs; + std::list user_inputs; user_inputs.splice(user_inputs.end(), user.submitted_inputs); + user.collected_input_size = 0; // Reset the collected inputs size counter. // We should create an entry for each user pubkey, even if the user has no inputs. This is // because this data map will be used to track connected users as well in addition to inputs. @@ -435,109 +436,81 @@ namespace consensus */ int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no) { - // Move over NUPs collected from the network into a local list. - std::list collected_nups; - { - std::scoped_lock lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex); - collected_nups.splice(collected_nups.end(), p2p::ctx.collected_msgs.nonunl_proposals); - } + // Maintains users and any input-acceptance responses we should send to them. + // Key: user pubkey. Value: List of responses for that user. + std::unordered_map> responses; - // Prepare merged list of users with each user's inputs grouped under the user. + // Maintains merged list of users with each user's inputs grouped under the user. // Key: user pubkey, Value: List of inputs from the user. - std::unordered_map> input_groups; - for (p2p::nonunl_proposal &p : collected_nups) + std::unordered_map> input_groups; + + // Move over NUPs collected from the network input groups (grouped by user). { - for (auto &[pubkey, umsgs] : p.user_inputs) + std::list collected_nups; { - // Move any user inputs from each NUP over to the grouped inputs under the user pubkey. - std::list &input_list = input_groups[pubkey]; - input_list.splice(input_list.end(), umsgs); + std::scoped_lock lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex); + collected_nups.splice(collected_nups.end(), p2p::ctx.collected_msgs.nonunl_proposals); + } + + for (p2p::nonunl_proposal &p : collected_nups) + { + for (auto &[pubkey, sbmitted_inputs] : p.user_inputs) + { + // Move any user inputs from each NUP over to the grouped inputs under the user pubkey. + std::list &input_list = input_groups[pubkey]; + input_list.splice(input_list.end(), sbmitted_inputs); + } } } - collected_nups.clear(); - // Maintains users and any input-acceptance responses we should send to them. - // Key: user pubkey. Value: List of [user-protocol, msg-sig, reject-reason] tuples. - std::unordered_map>> responses; - - for (const auto &[pubkey, umsgs] : input_groups) + for (auto &[pubkey, submitted_inputs] : input_groups) { // Populate user list with this user's pubkey. ctx.candidate_users.emplace(pubkey); + std::list extracted_inputs; + + for (const usr::submitted_user_input &submitted_input : submitted_inputs) + { + usr::extracted_user_input extracted = {}; + const char *reject_reason = usr::extract_submitted_input(pubkey, submitted_input, extracted); + + if (reject_reason == NULL) + extracted_inputs.push_back(std::move(extracted)); + else + responses[pubkey].push_back(usr::input_status_response{submitted_input.protocol, submitted_input.sig, reject_reason}); + } + + // This will sort the inputs in nonce order so the validation will follow the same order on all nodes. + extracted_inputs.sort(); + // Keep track of total input length to verify against remaining balance. // We only process inputs in the submitted order that can be satisfied with the remaining account balance. - size_t total_input_len = 0; - bool appbill_balance_exceeded = false; + size_t total_input_size = 0; - for (const usr::user_input &umsg : umsgs) + for (const usr::extracted_user_input &extracted_input : extracted_inputs) { - const char *reject_reason = NULL; + util::buffer_view stored_input; // Contains pointer to the input data stored in memfd accessed by the contract. + std::string hash; - if (appbill_balance_exceeded) - { - reject_reason = msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED; - } - else - { - util::buffer_view input; - std::string hash; - uint64_t max_lcl_seqno; - reject_reason = usr::validate_user_input_submission(pubkey, umsg, lcl_seq_no, total_input_len, hash, input, max_lcl_seqno); + // Validate the input against all submission criteria. + const char *reject_reason = usr::validate_user_input_submission(pubkey, extracted_input, lcl_seq_no, total_input_size, hash, stored_input); - if (reject_reason == NULL && !input.is_null()) - { - // No reject reason means we should go ahead and subject the input to consensus. - ctx.candidate_user_inputs.try_emplace( - hash, - candidate_user_input(pubkey, input, max_lcl_seqno)); - } - else if (reject_reason == msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED) - { - // Abandon processing further inputs from this user when we find out - // an input cannot be processed with the account balance. - appbill_balance_exceeded = true; - } + if (reject_reason == NULL && !stored_input.is_null()) + { + // No reject reason means we should go ahead and subject the input to consensus. + ctx.candidate_user_inputs.try_emplace( + hash, + candidate_user_input(pubkey, stored_input, extracted_input.max_lcl_seqno)); } - responses[pubkey].push_back(std::tuple(umsg.protocol, umsg.sig, reject_reason)); + responses[pubkey].push_back(usr::input_status_response{extracted_input.protocol, extracted_input.sig, reject_reason}); } } input_groups.clear(); - { - // Lock the user sessions. - std::scoped_lock lock(usr::ctx.users_mutex); - - for (auto &[pubkey, user_responses] : responses) - { - // Locate this user's socket session. - const auto user_itr = usr::ctx.users.find(pubkey); - if (user_itr != usr::ctx.users.end()) - { - // Send the request status result if this user is connected to us. - for (auto &resp : user_responses) - { - // resp: 0=protocl, 1=msg sig, 2=reject reason. - const char *reject_reason = std::get<2>(resp); - - // We are not sending any status response for 'already submitted' inputs. This is because the user - // would have gotten the proper status response during first submission. - if (reject_reason != msg::usrmsg::REASON_ALREADY_SUBMITTED) - { - msg::usrmsg::usrmsg_parser parser(std::get<0>(resp)); - const std::string &msg_sig = std::get<1>(resp); - usr::send_input_status(parser, - user_itr->second.session, - reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, - reject_reason == NULL ? "" : reject_reason, - msg_sig); - } - } - } - } - } + usr::send_input_status_responses(responses); return 0; } @@ -824,10 +797,7 @@ namespace consensus // Taking the raw input string from the buffer_view. std::string input; if (usr::input_store.read_buf(cand_input.input, input) != -1) - { - usr::raw_user_input raw_input(cand_input.userpubkey, std::move(input)); - raw_inputs.emplace(hash, std::move(raw_input)); - } + raw_inputs.emplace(hash, usr::raw_user_input{cand_input.userpubkey, std::move(input)}); } } } diff --git a/src/consensus.hpp b/src/consensus.hpp index d1534cd6..397d2d87 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -95,6 +95,7 @@ namespace consensus std::map state_hash; std::map patch_hash; }; + extern std::atomic is_patch_update_pending; // Keep track whether the patch file is changed by the SC and is not yet applied to runtime. int init(); diff --git a/src/msg/fbuf/ledger_helpers.cpp b/src/msg/fbuf/ledger_helpers.cpp index 6d08401c..eb9aae16 100644 --- a/src/msg/fbuf/ledger_helpers.cpp +++ b/src/msg/fbuf/ledger_helpers.cpp @@ -81,8 +81,10 @@ namespace msg::fbuf::ledger map.reserve(fbvec->size()); for (auto el : *fbvec) { - usr::raw_user_input raw_user_input(flatbuff_bytes_to_sv(el->pubkey()), flatbuff_bytes_to_sv(el->input())); - map.emplace(flatbuff_bytes_to_sv(el->hash()), raw_user_input); + map.emplace(flatbuff_bytes_to_sv(el->hash()), + usr::raw_user_input{ + std::string(flatbuff_bytes_to_sv(el->pubkey())), + std::string(flatbuff_bytes_to_sv(el->input()))}); } return map; } diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index c4d85839..8be523af 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -730,21 +730,21 @@ namespace msg::fbuf::p2pmsg //---Conversion helpers from flatbuffers data types to std data types---// - const std::unordered_map> + const std::unordered_map> flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector> *fbvec) { - std::unordered_map> map; + std::unordered_map> map; map.reserve(fbvec->size()); for (const UserInputGroup *group : *fbvec) { - std::list user_inputs_list; + std::list user_inputs_list; for (const auto msg : *group->messages()) { - user_inputs_list.push_back(usr::user_input( - flatbuff_bytes_to_sv(msg->input_container()), - flatbuff_bytes_to_sv(msg->signature()), - static_cast(msg->protocol()))); + user_inputs_list.push_back(usr::submitted_user_input{ + std::string(flatbuff_bytes_to_sv(msg->input_container())), + std::string(flatbuff_bytes_to_sv(msg->signature())), + static_cast(msg->protocol())}); } map.emplace(flatbuff_bytes_to_sv(group->pubkey()), std::move(user_inputs_list)); @@ -756,14 +756,14 @@ namespace msg::fbuf::p2pmsg //---These are used in constructing Flatbuffer messages using builders---// const flatbuffers::Offset>> - user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) + user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map) { std::vector> fbvec; fbvec.reserve(map.size()); for (const auto &[pubkey, msglist] : map) { std::vector> fbmsgsvec; - for (const usr::user_input &msg : msglist) + for (const usr::submitted_user_input &msg : msglist) { fbmsgsvec.push_back(CreateUserInput( builder, @@ -787,7 +787,7 @@ namespace msg::fbuf::p2pmsg for (const HistoryLedgerBlockPair *pair : *fbvec) { - std::list msglist; + std::list msglist; p2p::history_ledger_block ledger; diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index 309b478d..0e35801d 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -80,13 +80,13 @@ namespace msg::fbuf::p2pmsg //---Conversion helpers from flatbuffers data types to std data types---// - const std::unordered_map> + const std::unordered_map> flatbuf_user_input_group_to_user_input_map(const flatbuffers::Vector> *fbvec); //---Conversion helpers from std data types to flatbuffers data types---// const flatbuffers::Offset>> - user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); + user_input_map_to_flatbuf_user_input_group(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); const std::map flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec); diff --git a/src/msg/usrmsg_common.hpp b/src/msg/usrmsg_common.hpp index 21ce2dbc..19ebf0ed 100644 --- a/src/msg/usrmsg_common.hpp +++ b/src/msg/usrmsg_common.hpp @@ -58,6 +58,8 @@ namespace msg::usrmsg constexpr const char *REASON_MAX_LEDGER_EXPIRED = "max_ledger_expired"; constexpr const char *REASON_NONCE_EXPIRED = "nonce_expired"; constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted"; + constexpr const char *REASON_NONCE_OVERFLOW = "nonce_overflow"; + constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow"; } // namespace msg::usrmsg diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 1c143fc0..4803d2a5 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -38,7 +38,7 @@ namespace p2p struct nonunl_proposal { - std::unordered_map> user_inputs; + std::unordered_map> user_inputs; }; struct history_request diff --git a/src/usr/user_input.hpp b/src/usr/user_input.hpp index 61de2a4f..403f2b48 100644 --- a/src/usr/user_input.hpp +++ b/src/usr/user_input.hpp @@ -8,22 +8,27 @@ namespace usr { /** - * Represents a signed contract input message a network user has submitted. - */ - struct user_input + * Represents a signed contract input message a network user has submitted. + */ + struct submitted_user_input { const std::string input_container; const std::string sig; - const util::PROTOCOL protocol; // The encoding protocol used for the input container. + const util::PROTOCOL protocol; // The message protocol used by the user. + }; - user_input(const std::string input_container, const std::string sig, const util::PROTOCOL protocol) - : input_container(std::move(input_container)), sig(std::move(sig)), protocol(protocol) - { - } + struct extracted_user_input + { + std::string input; + std::string nonce; + uint64_t max_lcl_seqno; + std::string sig; + util::PROTOCOL protocol; // The message protocol used by the user. - user_input(std::string_view input_container, std::string_view sig, const util::PROTOCOL protocol) - : input_container(input_container), sig(sig), protocol(protocol) + // Comparison operator used for sorting user's inputs in nonce order. + bool operator<(const extracted_user_input &other) { + return nonce < other.nonce; } }; @@ -31,11 +36,6 @@ namespace usr { const std::string pubkey; const std::string input; - - raw_user_input(std::string_view pubkey, std::string_view input) - : pubkey(pubkey), input(input) - { - } }; } // namespace usr diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 40653a15..dc1888b2 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -66,7 +66,7 @@ namespace usr { // This is an authed user. connected_user &user = itr->second; - if (handle_user_message(user, message) != 0) + if (handle_authed_user_message(user, message) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); LOG_DEBUG << "Bad message from user " << session.display_name(); diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 1bc4d2ff..76bdbff6 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -28,6 +28,8 @@ namespace usr uint64_t metric_thresholds[5]; bool init_success = false; + constexpr size_t MAX_INPUT_NONCE_SIZE = 128; + /** * Initializes the usr subsystem. Must be called once during application startup. * @return 0 for successful initialization. -1 for failure. @@ -128,12 +130,12 @@ namespace usr } /** - * Processes a message sent by a connected user. This will be invoked by web socket on_message handler. + * Processes a message sent by a authenticated user. This will be invoked by web socket on_message handler. * @param user The authenticated user who sent the message. * @param message The message sent by user. * @return 0 on successful processing. -1 for failure. */ - int handle_user_message(connected_user &user, std::string_view message) + int handle_authed_user_message(connected_user &user, std::string_view message) { msg::usrmsg::usrmsg_parser parser(user.protocol); @@ -175,14 +177,33 @@ namespace usr uint64_t max_lcl_seqno; if (parser.extract_input_container(input_data, nonce, max_lcl_seqno, input_container) != -1) { + // Check for max nonce size. + if (nonce.size() > MAX_INPUT_NONCE_SIZE) + { + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_NONCE_OVERFLOW, sig); + return -1; + } + + // Check whether the newly received input is going to cause overflow of round input limit. + if (conf::cfg.contract.round_limits.user_input_bytes > 0 && + (user.collected_input_size + input_data.size()) > conf::cfg.contract.round_limits.user_input_bytes) + { + send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW, sig); + return -1; + } + const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_lcl_seqno, true); if (nonce_status == 0) { //Add to the submitted input list. - user.submitted_inputs.push_back(user_input( + user.submitted_inputs.push_back(submitted_user_input{ std::move(input_container), std::move(sig), - user.protocol)); + user.protocol}); + + // Increment the collected input size counter. This will be reset whenever collected inputs are moved + // to concensus candidate input set. + user.collected_input_size += input_data.size(); return 0; } else @@ -226,6 +247,39 @@ namespace usr } } + /** + * Sends multiple user input responses grouped by user. + */ + void send_input_status_responses(const std::unordered_map> &responses) + { + // Lock the user sessions. + std::scoped_lock lock(usr::ctx.users_mutex); + + for (auto &[pubkey, user_responses] : responses) + { + // Locate this user's socket session. + const auto user_itr = usr::ctx.users.find(pubkey); + if (user_itr != usr::ctx.users.end()) + { + // Send the request status result if this user is connected to us. + for (const input_status_response &resp : user_responses) + { + // We are not sending any status response for 'already submitted' inputs. This is because the user + // would have gotten the proper status response during first submission. + if (resp.reject_reason != msg::usrmsg::REASON_ALREADY_SUBMITTED) + { + msg::usrmsg::usrmsg_parser parser(resp.protocol); + send_input_status(parser, + user_itr->second.session, + resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, + resp.reject_reason == NULL ? "" : resp.reject_reason, + resp.sig); + } + } + } + } + } + /** * Send the specified contract input status result via the provided session. */ @@ -300,61 +354,77 @@ namespace usr return 0; } + const char *extract_submitted_input(const std::string &user_pubkey, const usr::submitted_user_input &submitted, usr::extracted_user_input &extracted) + { + // Verify the signature of the submitted input_container. + if (crypto::verify(submitted.input_container, submitted.sig, user_pubkey) == -1) + { + LOG_DEBUG << "User input bad signature."; + return msg::usrmsg::REASON_BAD_SIG; + } + + // Extract information from input container. + msg::usrmsg::usrmsg_parser parser(submitted.protocol); + if (parser.extract_input_container(extracted.input, extracted.nonce, extracted.max_lcl_seqno, submitted.input_container) == -1) + { + LOG_DEBUG << "User input bad input container format."; + return msg::usrmsg::REASON_BAD_MSG_FORMAT; + } + + extracted.sig = std::move(submitted.sig); + extracted.protocol = submitted.protocol; + + return NULL; + } + /** * Validates the provided user input message against all the required criteria. * @return The rejection reason if input rejected. NULL if the input can be accepted. */ - const char *validate_user_input_submission(const std::string &user_pubkey, const usr::user_input &umsg, - const uint64_t lcl_seq_no, size_t &total_input_len, - std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno) + const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input, + const uint64_t lcl_seq_no, size_t &total_input_size, std::string &hash, util::buffer_view &input) { - // Verify the signature of the input_container. - if (crypto::verify(umsg.input_container, umsg.sig, user_pubkey) == -1) - { - LOG_DEBUG << "User message bad signature."; - return msg::usrmsg::REASON_BAD_SIG; - } - - std::string nonce; - msg::usrmsg::usrmsg_parser parser(umsg.protocol); - - std::string input_data; - if (parser.extract_input_container(input_data, nonce, max_lcl_seqno, umsg.input_container) == -1) - { - LOG_DEBUG << "User message bad input format."; - return msg::usrmsg::REASON_BAD_MSG_FORMAT; - } - // Ignore the input if our ledger has passed the input TTL. - if (max_lcl_seqno <= lcl_seq_no) + if (extracted_input.max_lcl_seqno <= lcl_seq_no) { - LOG_DEBUG << "User message bad max ledger seq expired."; + LOG_DEBUG << "User input bad max ledger seq expired."; return msg::usrmsg::REASON_MAX_LEDGER_EXPIRED; } - const int nonce_status = nonce_map.check(user_pubkey, nonce, umsg.sig, max_lcl_seqno); + // Check subtotal of inputs extracted so far with the input size limit. + const size_t new_total_input_size = total_input_size + extracted_input.input.size(); + if (conf::cfg.contract.round_limits.user_input_bytes > 0 && + new_total_input_size > conf::cfg.contract.round_limits.user_input_bytes) + { + LOG_DEBUG << "User input input exceeds round limit."; + return msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW; + } + + const int nonce_status = nonce_map.check(user_pubkey, extracted_input.nonce, extracted_input.sig, extracted_input.max_lcl_seqno); if (nonce_status > 0) { - LOG_DEBUG << (nonce_status == 1 ? "User message nonce expired." : "User message with same nonce/sig already submitted."); + LOG_DEBUG << (nonce_status == 1 ? "User input nonce expired." : "User input with same nonce/sig already submitted."); return (nonce_status == 1 ? msg::usrmsg::REASON_NONCE_EXPIRED : msg::usrmsg::REASON_ALREADY_SUBMITTED); } - // Keep checking the subtotal of inputs extracted so far with the appbill account balance. - total_input_len += input_data.length(); - if (!verify_appbill_check(user_pubkey, total_input_len)) + if (!verify_appbill_check(user_pubkey, new_total_input_size)) { - LOG_DEBUG << "User message app bill balance exceeded."; + LOG_DEBUG << "User input app bill balance exceeded."; return msg::usrmsg::REASON_APPBILL_BALANCE_EXCEEDED; } - // Hash is prefixed with the nonce to support user-defined sort order. - hash = std::move(nonce); - // Append the hash of the message signature to get the final hash. - hash.append(crypto::get_hash(umsg.sig)); + // Reaching here means the input is successfully validated and we can submit it to consensus. - // Copy the input data into the input store. - std::string_view s(); - input = input_store.write_buf(input_data.data(), input_data.size()); + // Hash is used as the globally unqiue 'key' to represent this input for this consensus round. + // It is prefixed with the nonce to support user-defined sort order and signature hash is appended + // to make it unique among inputs from all users. + hash = extracted_input.nonce + crypto::get_hash(extracted_input.sig); + + // Copy the input data into the input store. Contract will read the input from this location. + input = input_store.write_buf(extracted_input.input.data(), extracted_input.input.size()); + + // Increment the total valid input size so far. + total_input_size = new_total_input_size; return NULL; // Success. No reject reason. } diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index f465cf27..4276581f 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -28,7 +28,10 @@ namespace usr const std::string pubkey; // Holds the unprocessed user inputs collected from websocket. - std::list submitted_inputs; + std::list submitted_inputs; + + // Total input bytes collected which are pending to be subjected to consensus. + size_t collected_input_size = 0; // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. @@ -59,6 +62,14 @@ namespace usr std::optional server; }; + + struct input_status_response + { + const util::PROTOCOL protocol; + const std::string sig; + const char *reject_reason; + }; + extern connected_context ctx; extern util::buffer_store input_store; @@ -70,7 +81,9 @@ namespace usr int verify_challenge(std::string_view message, usr::user_comm_session &session); - int handle_user_message(connected_user &user, std::string_view message); + int handle_authed_user_message(connected_user &user, std::string_view message); + + void send_input_status_responses(const std::unordered_map> &responses); void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig); @@ -79,9 +92,10 @@ namespace usr int remove_user(const std::string &pubkey); - const char *validate_user_input_submission(const std::string &user_pubkey, const usr::user_input &umsg, - const uint64_t lcl_seq_no, size_t &total_input_len, - std::string &hash, util::buffer_view &input, uint64_t &max_lcl_seqno); + const char *extract_submitted_input(const std::string &user_pubkey, const usr::submitted_user_input &submitted, usr::extracted_user_input &extracted); + + const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input, + const uint64_t lcl_seq_no, size_t &total_input_size, std::string &hash, util::buffer_view &input); bool verify_appbill_check(std::string_view pubkey, const size_t input_len);