#include "../pchheader.hpp" #include "../msg/json/usrmsg_json.hpp" #include "../msg/usrmsg_parser.hpp" #include "../msg/usrmsg_common.hpp" #include "../util/util.hpp" #include "../util/sequence_hash.hpp" #include "../conf.hpp" #include "../crypto.hpp" #include "../hplog.hpp" #include "../ledger/ledger.hpp" #include "../util/buffer_store.hpp" #include "../hpfs/hpfs_mount.hpp" #include "../status.hpp" #include "usr.hpp" #include "user_comm_session.hpp" #include "user_comm_server.hpp" #include "user_input.hpp" #include "read_req.hpp" #include "input_nonce_map.hpp" namespace usr { // Holds global connected-users and related objects. connected_context ctx; util::buffer_store input_store; input_nonce_map nonce_map; uint64_t metric_thresholds[5]; bool init_success = false; /** * Initializes the usr subsystem. Must be called once during application startup. * @return 0 for successful initialization. -1 for failure. */ int init() { metric_thresholds[0] = conf::cfg.user.max_bytes_per_min; metric_thresholds[1] = 0; // This metric doesn't apply to user context. metric_thresholds[2] = 0; // This metric doesn't apply to user context. metric_thresholds[3] = conf::cfg.user.max_bad_msgs_per_min; metric_thresholds[4] = conf::cfg.user.idle_timeout; if (input_store.init() == -1) return -1; // Start listening for incoming user connections only if user connection listening is enabled. if (conf::cfg.user.listen) { if (start_listening() == -1) return -1; } else { LOG_INFO << "User connection listner isn't started since user connections are disabled."; } init_success = true; return 0; } /** * Cleanup any running processes. */ void deinit() { if (init_success) { // Stop com server only if user connections config is enabled (Otherwise server hasn't been started). if (conf::cfg.user.listen) ctx.server->stop(); input_store.deinit(); } } /** * Starts listening for incoming user websocket connections. */ int start_listening() { ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg, conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host, false); if (ctx.server->start() == -1) return -1; LOG_INFO << "Started listening for user connections on " << std::to_string(conf::cfg.user.port); return 0; } /** * Verifies the given message for a previously issued user challenge. * @param message Challenge response. * @param session The socket session that received the response. * @return 0 for successful verification. -1 for failure. */ int verify_challenge(std::string_view message, usr::user_comm_session &session) { // The received message must be the challenge response. We need to verify it. if (session.issued_challenge.empty()) { LOG_DEBUG << "No user challenge found for the session " << session.display_name(); return -1; } std::string user_pubkey_hex; std::string protocol_code; std::string server_challenge; if (msg::usrmsg::json::verify_user_challenge(user_pubkey_hex, protocol_code, server_challenge, message, session.issued_challenge) == 0) { // If user has specified server challange, we need to send a challenge response. if (!server_challenge.empty()) { std::vector msg; msg::usrmsg::json::create_server_challenge_response(msg, server_challenge); session.send(msg); } // Challenge signature verification successful. Add the user to our global user list. add_user(session, user_pubkey_hex, protocol_code); return 0; } else { LOG_DEBUG << "User challenge verification failed " << session.display_name(); return -1; } } /** * Processes a message sent by a authenticated user. This will be invoked by web socket on_message handler. * @param user The authenticated user who sent the message. * @param message The message sent by user. * @return 0 on successful processing. -1 for failure. */ int handle_authed_user_message(connected_user &user, std::string_view message) { msg::usrmsg::usrmsg_parser parser(user.protocol); if (parser.parse(message) == 0) { std::string msg_type; parser.extract_type(msg_type); if (msg_type == msg::usrmsg::MSGTYPE_CONTRACT_READ_REQUEST) { // Ignore the request if contract execution is disabled or read requests disallowed. if (!conf::cfg.contract.execute || conf::cfg.user.concurrent_read_requests == 0) return 0; std::string content; if (parser.extract_read_request(content) != -1) { if (read_req::populate_read_req_queue(user.pubkey, std::move(content)) == -1) LOG_WARNING << "Failed to enqueue read request."; return 0; } else { send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, ""); return -1; } } else if (msg_type == msg::usrmsg::MSGTYPE_CONTRACT_INPUT) { // Message is a contract input message. std::string input_container; std::string sig; if (parser.extract_signed_input_container(input_container, sig) != -1) { std::scoped_lock lock(ctx.users_mutex); std::string input_data; uint64_t nonce; uint64_t max_ledger_seq_no; if (parser.extract_input_container(input_data, nonce, max_ledger_seq_no, input_container) != -1) { const util::sequence_hash lcl_id = ledger::ctx.get_lcl_id(); // Ignore the input if the max ledger seq number specified is beyond the max offeset. if (conf::cfg.contract.max_input_ledger_offset != 0 && max_ledger_seq_no > lcl_id.seq_no + conf::cfg.contract.max_input_ledger_offset) { send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_MAX_LEDGER_OFFSET_EXCEEDED, crypto::get_hash(sig)); return -1; } // Ignore the input if our ledger has passed the input TTL. if (max_ledger_seq_no <= lcl_id.seq_no) { send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_MAX_LEDGER_EXPIRED, crypto::get_hash(sig)); return -1; } // Check whether the newly received input is going to cause overflow of round input limit. if (conf::cfg.contract.round_limits.user_input_bytes > 0 && (user.collected_input_size + input_data.size()) > conf::cfg.contract.round_limits.user_input_bytes) { send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW, crypto::get_hash(sig)); return -1; } const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_ledger_seq_no, true); if (nonce_status == 0) { //Add to the submitted input list. user.submitted_inputs.push_back(submitted_user_input{ std::move(input_container), std::move(sig), user.protocol}); // Increment the collected input size counter. This will be reset whenever collected inputs are moved // to concensus candidate input set. user.collected_input_size += input_data.size(); return 0; } else { const char *reason = nonce_status == 1 ? msg::usrmsg::REASON_NONCE_EXPIRED : msg::usrmsg::REASON_ALREADY_SUBMITTED; send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, reason, crypto::get_hash(sig)); return -1; } } else { send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, crypto::get_hash(sig)); return -1; } } else { send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, crypto::get_hash(sig)); return -1; } } else if (msg_type == msg::usrmsg::MSGTYPE_STAT) { std::vector resp; parser.create_status_response(resp); user.session.send(resp); return 0; } else if (msg_type == msg::usrmsg::MSGTYPE_LCL) { std::vector resp; parser.create_lcl_response(resp); user.session.send(resp); return 0; } else if (msg_type == msg::usrmsg::MSGTYPE_SUBSCRIPTION) { NOTIFICATION_CHANNEL channel; bool enabled; if (parser.extract_subscription_request(channel, enabled) == -1) return -1; user.subscriptions[channel] = enabled; return 0; } else if (msg_type == msg::usrmsg::MSGTYPE_LEDGER_QUERY) { ledger::query::query_request req; std::string id; if (parser.extract_ledger_query(req, id) == -1) return -1; const ledger::query::query_result result = ledger::query::execute(user.pubkey, req); std::vector resp; parser.create_ledger_query_response(resp, id, result); user.session.send(resp); return 0; } else { LOG_DEBUG << "Invalid user message type: " << msg_type; send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_INVALID_MSG_TYPE, ""); return -1; } } else { // Bad message. send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, ""); return -1; } } /** * Sends multiple user input responses grouped by user. * @param responses The collection of status responses to be sent out. * @param ledger_seq_no The ledger seq no to indicate in accepted responses. Ignored for 'rejected' responses or for * individual responses having their own ledger information. * @param ledger_hash The ledger hash to indicate in accepted responses. Ignored for 'rejected' responses or for * individual responses having their own ledger information. */ void send_input_status_responses(const std::unordered_map> &responses, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) { if (responses.empty()) return; // Lock the user sessions. std::scoped_lock lock(usr::ctx.users_mutex); for (auto &[pubkey, user_responses] : responses) { // Locate this user's socket session. const auto user_itr = usr::ctx.users.find(pubkey); if (user_itr != usr::ctx.users.end()) { msg::usrmsg::usrmsg_parser parser(user_itr->second.protocol); // Send the request status result if this user is connected to us. for (const input_status_response &resp : user_responses) { // We are not sending any status response for 'already submitted' inputs. This is because the user // would have gotten the proper status response during first submission. if (resp.reject_reason != msg::usrmsg::REASON_ALREADY_SUBMITTED) { send_input_status(parser, user_itr->second.session, resp.reject_reason == NULL ? msg::usrmsg::STATUS_ACCEPTED : msg::usrmsg::STATUS_REJECTED, resp.reject_reason == NULL ? "" : resp.reject_reason, resp.input_hash, // Give priority ledger seq no/hash contained in individual responses. resp.ledger_seq_no == 0 ? ledger_seq_no : resp.ledger_seq_no, resp.ledger_hash == util::h32_empty ? ledger_hash : resp.ledger_hash); } } } } } /** * Send the specified contract input status result via the provided session. */ void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view status, std::string_view reason, std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) { std::vector msg; parser.create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash); session.send(msg); } /** * Adds the user denoted by specified session id and public key to the global authed user list. * This should get called after the challenge handshake is verified. * * @param session User socket session. * @param user_pubkey_hex User's hex public key. * @param protocol_code Messaging protocol used by user. * @return 0 on successful additions. -1 on failure. */ int add_user(usr::user_comm_session &session, const std::string &pubkey_hex, std::string_view protocol_code) { // If max number of user connections reached skip the rest. if (ctx.users.size() == MAX_USER_COUNT) { LOG_DEBUG << "Rejecting " + session.display_name() << ". Maximum user count reached."; return -1; } // Decode hex pubkey and get binary pubkey. const std::string pubkey = util::to_bin(pubkey_hex); // Acquire user list lock. std::scoped_lock lock(ctx.users_mutex); // Now check whether this user public key is a duplicate. if (ctx.users.count(pubkey) == 0) { // All good. Unique public key. // Promote the connection from pending-challenges to authenticated users. const util::PROTOCOL protocol = (protocol_code == "json" ? util::PROTOCOL::JSON : util::PROTOCOL::BSON); session.mark_as_verified(); // Mark connection as a verified connection. session.issued_challenge.clear(); // Remove the stored challenge session.uniqueid = pubkey_hex; session.pubkey = pubkey; // Add the user to the global authed user list ctx.users.emplace(pubkey, usr::connected_user(session, pubkey, protocol)); LOG_DEBUG << "User connection authenticated. Public key " << pubkey_hex; } else { LOG_DEBUG << "Duplicate user public key " << session.display_name(); } return 0; } /** * Removes the specified public key from the global user list. * This must get called when an authenticated user disconnects from HP. * * @param pubkey User pubkey. * @return 0 on successful removals. -1 on failure. */ int remove_user(const std::string &pubkey) { std::scoped_lock lock(ctx.users_mutex); const auto itr = ctx.users.erase(pubkey); return 0; } const char *extract_submitted_input(const std::string &user_pubkey, const usr::submitted_user_input &submitted, usr::extracted_user_input &extracted) { // Verify the signature of the submitted input_container. if (crypto::verify(submitted.input_container, submitted.sig, user_pubkey) == -1) { LOG_DEBUG << "User input bad signature."; return msg::usrmsg::REASON_BAD_SIG; } // Extract information from input container. msg::usrmsg::usrmsg_parser parser(submitted.protocol); if (parser.extract_input_container(extracted.input, extracted.nonce, extracted.max_ledger_seq_no, submitted.input_container) == -1) { LOG_DEBUG << "User input bad input container format."; return msg::usrmsg::REASON_BAD_MSG_FORMAT; } extracted.sig = std::move(submitted.sig); return NULL; } /** * Validates the provided user input message against all the required criteria. * @return The rejection reason if input rejected. NULL if the input can be accepted. */ const char *validate_user_input_submission(const std::string &user_pubkey, const usr::extracted_user_input &extracted_input, const uint64_t lcl_seq_no, size_t &total_input_size, std::string &ordered_hash, util::buffer_view &input) { // Ordered hash is used as the globally unqiue 'key' to represent this input for this consensus round. // It is prefixed with the nonce to support user-defined sort order and the input hash is appended // to make it unique among inputs from all users. // Ordered hash = nonce (8 bytes) + input hash (32 bytes) // In the ledger, we will store the nonce and input hash separately. ordered_hash = util::uint64_to_string_bytes(extracted_input.nonce) + crypto::get_hash(extracted_input.sig); // Ignore the input if the max ledger seq number specified is beyond the max offeset. if (conf::cfg.contract.max_input_ledger_offset != 0 && extracted_input.max_ledger_seq_no > lcl_seq_no + conf::cfg.contract.max_input_ledger_offset) { LOG_DEBUG << "User input bad max ledger seq beyond the max offset."; return msg::usrmsg::REASON_MAX_LEDGER_OFFSET_EXCEEDED; } // Ignore the input if our ledger has passed the input TTL. if (extracted_input.max_ledger_seq_no <= lcl_seq_no) { LOG_DEBUG << "User input bad max ledger seq expired."; return msg::usrmsg::REASON_MAX_LEDGER_EXPIRED; } // Check subtotal of inputs extracted so far with the input size limit. const size_t new_total_input_size = total_input_size + extracted_input.input.size(); if (conf::cfg.contract.round_limits.user_input_bytes > 0 && new_total_input_size > conf::cfg.contract.round_limits.user_input_bytes) { LOG_DEBUG << "User input input exceeds round limit."; return msg::usrmsg::REASON_ROUND_INPUTS_OVERFLOW; } const int nonce_status = nonce_map.check(user_pubkey, extracted_input.nonce, extracted_input.sig, extracted_input.max_ledger_seq_no); if (nonce_status > 0) { LOG_DEBUG << (nonce_status == 1 ? "User input nonce expired." : "User input with same nonce/sig already submitted."); return (nonce_status == 1 ? msg::usrmsg::REASON_NONCE_EXPIRED : msg::usrmsg::REASON_ALREADY_SUBMITTED); } // Reaching here means the input is successfully validated and we can submit it to consensus. // Copy the input data into the input store. Contract will read the input from this location. input = input_store.write_buf(extracted_input.input.data(), extracted_input.input.size()); // Increment the total valid input size so far. total_input_size = new_total_input_size; return NULL; // Success. No reject reason. } /** * Sends any change event notifications to relevant users who are currently connected to the node. */ void dispatch_change_events() { status::change_event ev; while (status::event_queue.try_dequeue(ev)) { // Array to hold constructed message cache from each protocol. std::vector protocol_msgs[2]; if (ev.index() == 0) // UNL change event. Broadcast for subscribed users. { std::scoped_lock lock(ctx.users_mutex); for (auto &[sid, user] : ctx.users) { if (user.subscriptions[NOTIFICATION_CHANNEL::UNL_CHANGE]) { std::vector &msg = protocol_msgs[user.protocol]; if (msg.empty()) // Construct the message with relevant protocol if not done so already. { msg::usrmsg::usrmsg_parser parser(user.protocol); const status::unl_change_event &unl_ev = std::get(ev); parser.create_unl_notification(msg, unl_ev.unl); } user.session.send(msg); } } } else if (ev.index() == 1 || ev.index() == 2) // Ledger events. Broadcast for subscribed users. { std::scoped_lock lock(ctx.users_mutex); for (auto &[sid, user] : ctx.users) { if (user.subscriptions[NOTIFICATION_CHANNEL::LEDGER_EVENT]) { std::vector &msg = protocol_msgs[user.protocol]; if (msg.empty()) // Construct the message with relevant protocol if not done so already. { msg::usrmsg::usrmsg_parser parser(user.protocol); if (ev.index() == 1) // Ledger created event. { const status::ledger_created_event &ledger_ev = std::get(ev); parser.create_ledger_created_notification(msg, ledger_ev.ledger); } else if (ev.index() == 2) // Vote status chnge event. { const status::vote_status_change_event &vote_ev = std::get(ev); parser.create_vote_status_notification(msg, vote_ev.vote_status); } } user.session.send(msg); } } } else if (ev.index() == 3) // Health events. Broadcast for subscribed users. { std::scoped_lock lock(ctx.users_mutex); for (auto &[sid, user] : ctx.users) { if (user.subscriptions[NOTIFICATION_CHANNEL::HEALTH_STAT]) { std::vector &msg = protocol_msgs[user.protocol]; if (msg.empty()) // Construct the message with relevant protocol if not done so already. { msg::usrmsg::usrmsg_parser parser(user.protocol); const status::health_event &health_ev = std::get(ev); parser.create_health_notification(msg, health_ev); } user.session.send(msg); } } } } } } // namespace usr