diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index ddd16d0c..7090515f 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -10,6 +10,7 @@ namespace comm { constexpr uint32_t INTERVALMS = 60000; constexpr uint16_t UNVERIFIED_INACTIVE_TIMEOUT = 5; // Time threshold for unverified inactive connections in seconds. + constexpr uint16_t MAX_IN_MSG_QUEUE_SIZE = 64; // Maximum in message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... comm_session::comm_session( std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[5]) @@ -17,7 +18,7 @@ namespace comm host_address(host_address), hpws_client(std::move(hpws_client)), is_inbound(is_inbound), - in_msg_queue(32) + in_msg_queue(MAX_IN_MSG_QUEUE_SIZE) { // Create new session_thresholds and insert it to thresholds vector. // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector @@ -76,7 +77,7 @@ namespace comm std::string_view data = std::get(read_result); std::vector msg(data.size()); memcpy(msg.data(), data.data(), data.size()); - in_msg_queue.enqueue(std::move(msg)); + in_msg_queue.try_enqueue(std::move(msg)); // Signal the hpws client that we are ready for next message. std::optional error = hpws_client->ack(data); @@ -130,7 +131,7 @@ namespace comm /** * Adds the given message to the outbound message queue. * @param message Message to be added to the outbound queue. - * @return 0 on successful addition and -1 if the session is already closed. + * @return 0 on successful addition and -1 if the session is already closed or there's no space in the queue. */ int comm_session::send(std::string_view message) { diff --git a/src/conf.cpp b/src/conf.cpp index 755553f0..6e3fd983 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -299,8 +299,8 @@ namespace conf cfg.peermaxdupmpm = d["peermaxdupmpm"].as(); cfg.peermaxbadmpm = d["peermaxbadmpm"].as(); cfg.peermaxbadsigpm = d["peermaxbadsigpm"].as(); - cfg.peermaxcons = d["peermaxcons"].as(); - cfg.peermaxknowncons = d["peermaxknowncons"].as(); + cfg.peermaxcons = d["peermaxcons"].as(); + cfg.peermaxknowncons = d["peermaxknowncons"].as(); // If peermaxknowcons is greater than peermaxcons then show error and stop execution. if (cfg.peermaxknowncons > cfg.peermaxcons) diff --git a/src/ledger.hpp b/src/ledger.hpp index 3b278293..85406d15 100644 --- a/src/ledger.hpp +++ b/src/ledger.hpp @@ -7,6 +7,8 @@ namespace ledger { constexpr const char *GENESIS_LEDGER = "0-genesis"; + constexpr uint16_t HISTORY_REQ_LIST_CAP = 64; // Maximum history request count. + constexpr uint16_t HISTORY_RES_LIST_CAP = 64; // Maximum history response count. struct sync_context { @@ -68,7 +70,7 @@ namespace ledger void deinit(); void lcl_syncer_loop(); - + void set_sync_target(std::string_view target_lcl); const std::pair get_ledger_cache_top(); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 1b2b3d80..308cc84a 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -359,14 +359,22 @@ namespace p2p // Otherwise if new peer is recently updated (timestamp >) replace with the current one. if (itr == ctx.server->req_known_remotes.end()) { - ctx.server->req_known_remotes.push_back(peer); - LOG_DEBUG << "Adding " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list"; + // If maximum number of peer list reached skip the rest of peers. + if (ctx.server->req_known_remotes.size() < p2p::PEER_LIST_CAP) + { + ctx.server->req_known_remotes.push_back(peer); + LOG_DEBUG << "Adding " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list."; + } + else + { + LOG_DEBUG << "Rejecting " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + ". Maximum peer count reached."; + } } else if (itr->timestamp < peer.timestamp) { itr->available_capacity = peer.available_capacity; itr->timestamp = peer.timestamp; - LOG_DEBUG << "Replacing " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list"; + LOG_DEBUG << "Replacing " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " in the known peer list."; } } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 77e4dadd..185be6c3 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -12,6 +12,12 @@ namespace p2p { + constexpr uint16_t PROPOSAL_LIST_CAP = 64; // Maximum proposal count. + constexpr uint16_t NONUNL_PROPOSAL_LIST_CAP = 64; // Maximum nonunl proposal count. + constexpr uint16_t STATE_REQ_LIST_CAP = 64; // Maximum state request count. + constexpr uint16_t STATE_RES_LIST_CAP = 64; // Maximum state response count. + constexpr uint16_t PEER_LIST_CAP = 64; // Maximum peer count. + struct proposal { std::string pubkey; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 5580b08a..5b9d51e4 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -156,11 +156,13 @@ namespace p2p return 0; } - handle_proposal_message(container, content); + if (handle_proposal_message(container, content) != 0) + LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. " << session.display_name(); } else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message { - handle_nonunl_proposal_message(container, content); + if (handle_nonunl_proposal_message(container, content) != 0) + LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. " << session.display_name(); } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message { @@ -175,34 +177,72 @@ namespace p2p } else if (content_message_type == p2pmsg::Message_State_Request_Message) { - // Insert request with lock. + // Check the cap and insert request with lock. std::scoped_lock lock(ctx.collected_msgs.state_requests_mutex); - std::string state_request_msg(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg))); + + // If max number of state requests reached skip the rest. + if (ctx.collected_msgs.state_requests.size() < p2p::STATE_REQ_LIST_CAP) + { + std::string state_request_msg(reinterpret_cast(content_ptr), content_size); + ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg))); + } + else + { + LOG_DEBUG << "State request rejected. Maximum state request count reached. " << session.display_name(); + } } else if (content_message_type == p2pmsg::Message_State_Response_Message) { if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing. { - // Insert state_response with lock. + // Check the cap and insert state_response with lock. std::scoped_lock lock(ctx.collected_msgs.state_responses_mutex); - std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); + + // If max number of state responses reached skip the rest. + if (ctx.collected_msgs.state_responses.size() < p2p::STATE_RES_LIST_CAP) + { + std::string response(reinterpret_cast(content_ptr), content_size); + ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); + } + else + { + LOG_DEBUG << "State response rejected. Maximum state response count reached. " << session.display_name(); + } } } else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message { - const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); + // Check the cap and insert request with lock. std::scoped_lock lock(ledger::sync_ctx.list_mutex); - ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr))); + + // If max number of history requests reached skip the rest. + if (ledger::sync_ctx.collected_history_requests.size() < ledger::HISTORY_REQ_LIST_CAP) + { + const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); + ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr))); + } + else + { + LOG_DEBUG << "History request rejected. Maximum history request count reached. " << session.display_name(); + } } else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message { if (ledger::sync_ctx.is_syncing) // Only accept history responses if ledger is syncing. { - const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message()); + // Check the cap and insert response with lock. std::scoped_lock lock(ledger::sync_ctx.list_mutex); - ledger::sync_ctx.collected_history_responses.push_back(std::move(hr)); + + // If max number of history respoinses reached skip the rest. + if (ledger::sync_ctx.collected_history_responses.size() < ledger::HISTORY_RES_LIST_CAP) + { + const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message()); + ledger::sync_ctx.collected_history_responses.push_back(std::move(hr)); + } + else + { + LOG_DEBUG << "History response rejected. Maximum history response count reached. " << session.display_name(); + } } } else @@ -236,29 +276,61 @@ namespace p2p const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message - handle_proposal_message(container, content); + { + if (handle_proposal_message(container, content) != 0) + LOG_DEBUG << "Proposal rejected. Maximum proposal count reached. self"; + } else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message - handle_nonunl_proposal_message(container, content); + { + if (handle_nonunl_proposal_message(container, content) != 0) + LOG_DEBUG << "Nonunl proposal rejected. Maximum nonunl proposal count reached. self"; + } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message handle_npl_message(container, content); return 0; } - void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) + /** + * Handle proposal message. + * @param container Message container. + * @param content Message content. + * @return returns 0 if proposal is pushed to the list, otherwise -1. + */ + int handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) { - std::scoped_lock lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. + // Check the cap and insert proposal with lock. + std::scoped_lock lock(ctx.collected_msgs.proposals_mutex); + + // If max number of proposals reached skip the rest. + if (ctx.collected_msgs.proposals.size() == p2p::PROPOSAL_LIST_CAP) + return -1; ctx.collected_msgs.proposals.push_back( p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl())); + + return 0; } - void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) + /** + * Handle nonunl proposal message. + * @param container Message container. + * @param content Message content. + * @return returns 0 if nonunl proposal is pushed to the list, otherwise -1. + */ + int handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) { - std::scoped_lock lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. + // Check the cap and insert proposal with lock. + std::scoped_lock lock(ctx.collected_msgs.nonunl_proposals_mutex); + + // If max number of nonunl proposals reached skip the rest. + if (ctx.collected_msgs.nonunl_proposals.size() == p2p::NONUNL_PROPOSAL_LIST_CAP) + return -1; ctx.collected_msgs.nonunl_proposals.push_back( p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); + + return 0; } void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content) diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp index f86a6c81..69828f19 100644 --- a/src/p2p/peer_session_handler.hpp +++ b/src/p2p/peer_session_handler.hpp @@ -14,9 +14,9 @@ namespace p2p int handle_peer_message(p2p::peer_comm_session &session, std::string_view message); int handle_self_message(std::string_view message); int handle_peer_close(const p2p::peer_comm_session &session); + int handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); + int handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); void handle_peer_on_verified(p2p::peer_comm_session &session); - void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); - void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content); } // namespace p2p diff --git a/src/sc.hpp b/src/sc.hpp index c2334cfa..e0bf452f 100644 --- a/src/sc.hpp +++ b/src/sc.hpp @@ -13,6 +13,8 @@ */ namespace sc { + constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 64; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... + constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 64; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... // Enum used to differenciate socket fds maintained for SC socket. enum SOCKETFDTYPE @@ -86,7 +88,10 @@ namespace sc // State hash after execution will be copied to this (not applicable to read only mode). hpfs::h32 post_execution_state_hash = hpfs::h32_empty; - contract_execution_args(util::buffer_store &user_input_store) : user_input_store(user_input_store) + contract_execution_args(util::buffer_store &user_input_store) + : user_input_store(user_input_store), + npl_messages(MAX_NPL_MSG_QUEUE_SIZE), + control_messages(MAX_CONTROL_MSG_QUEUE_SIZE) { } }; diff --git a/src/usr/read_req.cpp b/src/usr/read_req.cpp index b98823e6..ff654a54 100644 --- a/src/usr/read_req.cpp +++ b/src/usr/read_req.cpp @@ -12,9 +12,9 @@ */ namespace read_req { - constexpr uint16_t LOOP_WAIT = 100; // Milliseconds. - constexpr uint16_t MAX_QUEUE_SIZE = 100; // Maximum read request queue size. - constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads. + constexpr uint16_t LOOP_WAIT = 100; // Milliseconds. + constexpr uint16_t MAX_QUEUE_SIZE = 64; // Maximum read request queue size, The size passed is rounded up to the next multiple of the block size (32). + constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads. bool is_shutting_down = false; bool init_success = false; @@ -22,7 +22,7 @@ namespace read_req util::buffer_store read_req_store; std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue. std::vector read_req_threads; - moodycamel::ConcurrentQueue read_req_queue(MAX_QUEUE_SIZE); + moodycamel::ConcurrentQueue read_req_queue(MAX_QUEUE_SIZE, 0, MAX_THREAD_CAP); std::mutex execution_contexts_mutex; std::list execution_contexts; std::mutex completed_threads_mutex; diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 15204cce..e135fe9a 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -201,6 +201,13 @@ namespace usr */ int add_user(usr::user_comm_session &session, const std::string &pubkey_hex, std::string_view protocol_code) { + // If max number of user connections reached skip the rest. + if (ctx.users.size() == MAX_USER_COUNT) + { + LOG_DEBUG << "Rejecting " + session.display_name() << ". Maximum user count reached."; + return -1; + } + // Decode hex pubkey and get binary pubkey. We are only going to keep // the binary pubkey due to reduced memory footprint. std::string pubkey; diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 8b31f014..47b535c0 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -16,6 +16,8 @@ */ namespace usr { + constexpr uint16_t MAX_USER_COUNT = 64; // Maximum number of user. + /** * Holds information about an authenticated (challenge-verified) user * connected to the HotPocket node. @@ -28,9 +30,6 @@ namespace usr // Holds the unprocessed user inputs collected from websocket. std::list submitted_inputs; - // Holds the unprocessed read requests collected from websocket. - std::list read_requests; - // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. usr::user_comm_session &session;