From fded7b7917b05fe940ab5e4c6418e3b8e6df5c31 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Mon, 3 May 2021 21:44:36 +0530 Subject: [PATCH] Introduced priority queues for proposal processing. (#301) * Introduced high priority queue for comm session message processing. * Added high priority send for proposals. --- src/comm/comm_server.hpp | 33 ++++++++-- src/comm/comm_session.cpp | 101 ++++++++++++++++++++++--------- src/comm/comm_session.hpp | 17 +++--- src/consensus.cpp | 10 +-- src/p2p/p2p.cpp | 8 ++- src/p2p/p2p.hpp | 4 +- src/p2p/peer_comm_server.cpp | 2 +- src/p2p/peer_comm_session.cpp | 5 ++ src/p2p/peer_comm_session.hpp | 1 + src/p2p/peer_session_handler.cpp | 28 ++++++--- src/p2p/peer_session_handler.hpp | 1 + src/sc/sc.cpp | 2 +- src/usr/usr.cpp | 2 +- 13 files changed, 156 insertions(+), 58 deletions(-) diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index 90110462..4d178960 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -12,6 +12,7 @@ namespace comm { constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 5 * 1024 * 1024; constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 99999; + constexpr uint16_t MAX_INBOUND_HIGH_PRIO_BTACH = 2; // Maximum no. of incomning high priority messages to process at a time. template class comm_server @@ -21,6 +22,7 @@ namespace comm const uint64_t max_msg_size; const uint64_t max_in_connections; const uint64_t max_in_connections_per_host; + bool use_priority_queues = false; // Whether to activate inbound message high vs low priority-based processing. bool is_shutting_down = false; std::list sessions; std::list new_sessions; // Sessions that haven't been initialized properly which are yet to be merge to "sessions" list. @@ -168,11 +170,33 @@ namespace comm messages_processed = true; { - // Process one message from each session in round-robin fashion. std::scoped_lock lock(sessions_mutex); + + if (use_priority_queues) + { + // Process high priority messages from each session in round-robin fashion. + for (T &session : sessions) + { + // Process predefined no. of high priority messages from each session. + + uint16_t high_prio_msgs_processed = 0; // High priority messages so far processed from this session. + int result = 0; + while (high_prio_msgs_processed < MAX_INBOUND_HIGH_PRIO_BTACH && // Check if we have reached the batch limit. + (result = session.process_next_inbound_message(1)) != 0) + { + high_prio_msgs_processed++; + messages_processed = true; + + if (result == -1) + session.mark_for_closure(); + } + } + } + + // Process one low priority message from each session in round-robin fashion. for (T &session : sessions) { - const int result = session.process_next_inbound_message(); + const int result = session.process_next_inbound_message(2); if (result != 0) messages_processed = true; @@ -217,13 +241,14 @@ namespace comm public: comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, - const uint64_t max_in_connections, const uint64_t max_in_connections_per_host) + const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, const bool use_priority_queues) : name(name), listen_port(port), metric_thresholds(metric_thresholds), max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE), max_in_connections(max_in_connections > 0 ? max_in_connections : DEFAULT_MAX_CONNECTIONS), - max_in_connections_per_host(max_in_connections_per_host > 0 ? max_in_connections_per_host : DEFAULT_MAX_CONNECTIONS) + max_in_connections_per_host(max_in_connections_per_host > 0 ? max_in_connections_per_host : DEFAULT_MAX_CONNECTIONS), + use_priority_queues(use_priority_queues) { } diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index d3b21890..07c96f56 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -18,7 +18,8 @@ namespace comm host_address(host_address), hpws_client(std::move(hpws_client)), is_inbound(is_inbound), - in_msg_queue(MAX_IN_MSG_QUEUE_SIZE) + in_msg_queue1(MAX_IN_MSG_QUEUE_SIZE), + in_msg_queue2(MAX_IN_MSG_QUEUE_SIZE) { // Create new session_thresholds and insert it to thresholds vector. // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector @@ -75,9 +76,23 @@ namespace comm // Enqueue the message for processing. std::string_view data = std::get(read_result); - std::vector msg(data.size()); - memcpy(msg.data(), data.data(), data.size()); - in_msg_queue.try_enqueue(std::move(msg)); + + // Detect message priority before adding to the message queue. + const int priority = get_message_priority(data); + if (priority == 0) // priority 0 means a bad message. + { + increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + } + else if (priority == 1 || priority == 2) + { + std::vector msg(data.size()); + memcpy(msg.data(), data.data(), data.size()); + + if (priority == 1) + in_msg_queue1.try_enqueue(std::move(msg)); + else if (priority == 2) + in_msg_queue2.try_enqueue(std::move(msg)); + } // Signal the hpws client that we are ready for next message. std::optional error = hpws_client->ack(data); @@ -100,40 +115,54 @@ namespace comm /** * Processes the next queued message (if any). - * @return 0 if no messages in queue. 1 if message was processed. -1 means session must be closed. + * @param priority Which priority queue to process. + * @return 0 if no messages in queue. 1 if a message were processed. -1 means session must be closed. */ - int comm_session::process_next_inbound_message() + int comm_session::process_next_inbound_message(const uint16_t priority) { if (state != SESSION_STATE::ACTIVE) return 0; + int res = 0; + + moodycamel::ReaderWriterQueue> &queue = (priority == 1 ? in_msg_queue1 : in_msg_queue2); + + // Process queue top. std::vector msg; - if (in_msg_queue.try_dequeue(msg)) + if (queue.try_dequeue(msg)) { std::string_view sv(msg.data(), msg.size()); - const int sess_handler_result = handle_message(sv); // If session handler returns -1 then that means the session must be closed. // Otherwise it's considered message processing is successful. - return sess_handler_result == -1 ? -1 : 1; + if (handle_message(sv) == -1) + return -1; + else + res = 1; } - return 0; - } - - int comm_session::send(const std::vector &message) - { - std::string_view sv(reinterpret_cast(message.data()), message.size()); - send(sv); - return 0; + return res; } /** * Adds the given message to the outbound message queue. * @param message Message to be added to the outbound queue. - * @return 0 on successful addition and -1 if the session is already closed or there's no space in the queue. + * @param priority If 1 adds to high priority queue. Else adds to low priority queue. + * @return 0 on successful addition and -1 if the session is already closed. */ - int comm_session::send(std::string_view message) + int comm_session::send(const std::vector &message, const uint16_t priority) + { + std::string_view sv(reinterpret_cast(message.data()), message.size()); + return send(sv); + } + + /** + * Adds the given message to the outbound message queue. + * @param message Message to be added to the outbound queue. + * @param priority If 1 adds to high priority queue. Else adds to low priority queue. + * @return 0 on successful addition and -1 if the session is already closed. + */ + int comm_session::send(std::string_view message, const uint16_t priority) { if (state == SESSION_STATE::CLOSED) return -1; @@ -141,8 +170,12 @@ namespace comm // Updating last activity timestamp since this session is sending a message. last_activity_timestamp = util::get_epoch_milliseconds(); - // Passing the ownership of message to the queue. - out_msg_queue.enqueue(std::string(message)); + // Passing the ownership of message to the queue based on specified priority. + if (priority == 1) + out_msg_queue1.enqueue(std::string(message)); + else + out_msg_queue2.enqueue(std::string(message)); + return 0; } @@ -176,18 +209,27 @@ namespace comm // Keep checking until the session is terminated. while (state != SESSION_STATE::CLOSED) { + bool messages_sent = false; std::string msg_to_send; - // If the queue is not empty, the first element will be processed, - // else wait 10ms until queue gets populated. - if (out_msg_queue.try_dequeue(msg_to_send)) + // Send all messages in high priority queue. + while (out_msg_queue1.try_dequeue(msg_to_send)) { process_outbound_message(msg_to_send); + msg_to_send.clear(); + messages_sent = true; } - else + + // Send top message in low priority queue. + if (out_msg_queue2.try_dequeue(msg_to_send)) { - util::sleep(10); + process_outbound_message(msg_to_send); + messages_sent = true; } + + // Wait for small delay if there were no outbound messages. + if (!messages_sent) + util::sleep(10); } } @@ -275,7 +317,7 @@ namespace comm // Reset counter timestamp. t.timestamp = time_now; } - + // Check whether we have exceeded the threshold within the monitering interval. const uint64_t elapsed_time = time_now - t.timestamp; if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) @@ -327,6 +369,11 @@ namespace comm return 0; } + int comm_session::get_message_priority(std::string_view msg) + { + return 2; // Default is low priority. + } + int comm_session::handle_message(std::string_view msg) { return 0; diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 684d5a6a..38d9db40 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -33,15 +33,18 @@ namespace comm std::optional hpws_client; std::vector thresholds; // track down various communication thresholds - std::thread reader_thread; // The thread responsible for reading messages from the read fd. - std::thread writer_thread; // The thread responsible for writing messages to the write fd. - moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. - moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. + std::thread reader_thread; // The thread responsible for reading messages from the read fd. + std::thread writer_thread; // The thread responsible for writing messages to the write fd. + moodycamel::ReaderWriterQueue> in_msg_queue1; // Holds high priority incoming messages waiting to be processed. + moodycamel::ReaderWriterQueue> in_msg_queue2; // Holds low priority incoming messages waiting to be processed. + moodycamel::ConcurrentQueue out_msg_queue1; // Holds high priority outgoing messages waiting to be processed. + moodycamel::ConcurrentQueue out_msg_queue2; // Holds low priority outgoing messages waiting to be processed. void reader_loop(); protected: virtual int handle_connect(); + virtual int get_message_priority(std::string_view msg); virtual int handle_message(std::string_view msg); virtual void handle_close(); virtual void handle_on_verified(); @@ -59,9 +62,9 @@ namespace comm comm_session( std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[5]); int init(); - int process_next_inbound_message(); - int send(const std::vector &message); - int send(std::string_view message); + int process_next_inbound_message(const uint16_t priority); + int send(const std::vector &message, const uint16_t priority = 2); + int send(std::string_view message, const uint16_t priority = 2); int process_outbound_message(std::string_view message); void process_outbound_msg_queue(); void check_last_activity_rules(); diff --git a/src/consensus.cpp b/src/consensus.cpp index 20bbc01e..dee7a8ab 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -395,10 +395,10 @@ namespace consensus while (itr != ctx.candidate_proposals.end()) { const p2p::proposal &cp = itr->second; - const int8_t stage_diff = ctx.stage - cp.stage; - // Only consider this round's proposals which are from previous stage. - const bool keep_candidate = (ctx.round_start_time == cp.time) && (stage_diff == 1); + // Only consider this round's proposals which are from current or previous stage. + const bool stage_valid = ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1; + const bool keep_candidate = (ctx.round_start_time == cp.time) && stage_valid; LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased") << " [s" << std::to_string(cp.stage) << "] u/i:" << cp.users.size() @@ -617,7 +617,7 @@ namespace consensus flatbuffers::FlatBufferBuilder fbuf; p2pmsg::create_msg_from_nonunl_proposal(fbuf, nup); - p2p::broadcast_message(fbuf, true); + p2p::broadcast_message(fbuf, true, false, false, 1); // Use high priority send. LOG_DEBUG << "NUP sent." << " users:" << nup.user_inputs.size(); @@ -635,7 +635,7 @@ namespace consensus flatbuffers::FlatBufferBuilder fbuf; p2pmsg::create_msg_from_proposal(fbuf, p); - p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_consensus_public); + p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_consensus_public, 1); // Use high priority send. LOG_DEBUG << "Proposed u/i:" << p.users.size() << "/" << p.input_ordered_hashes.size() diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index e2b10c3b..3ddf7061 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -200,8 +200,9 @@ namespace p2p * @param send_to_self Whether to also send the message to self (this node). * @param is_msg_forwarding Whether this broadcast is for message forwarding. * @param unl_only Whether this broadcast is only for the unl nodes. + * @param priority If 1, use high pririty send. Else, use low priority send. */ - void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only) + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const uint16_t priority) { broadcast_message(msg::fbuf::builder_to_string_view(fbuf), send_to_self, is_msg_forwarding, unl_only); } @@ -212,8 +213,9 @@ namespace p2p * @param is_msg_forwarding Whether this broadcast is for message forwarding. * @param unl_only Whether this broadcast is only for the unl nodes. * @param skipping_session Session to be skipped in message forwarding(optional). + * @param priority If 1, use high pririty send. Else, use low priority send. */ - void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const peer_comm_session *skipping_session) + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const peer_comm_session *skipping_session, const uint16_t priority) { if (send_to_self) self::send(message); @@ -230,7 +232,7 @@ namespace p2p (unl_only && !session->is_unl)) continue; - session->send(message); + session->send(message, priority); } } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 75929807..da9f39cb 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -240,9 +240,9 @@ namespace p2p int resolve_peer_challenge(peer_comm_session &session, const peer_challenge_response &challenge_resp); - void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false); + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const uint16_t priority = 2); - void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const peer_comm_session *skipping_session = NULL); + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const peer_comm_session *skipping_session = NULL, const uint16_t priority = 2); void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index be48ce83..8dceb019 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -16,7 +16,7 @@ namespace p2p peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size, const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, const std::vector &req_known_remotes) - : comm::comm_server("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host), + : comm::comm_server("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host, true), req_known_remotes(req_known_remotes) // Copy over known peers into internal collection. { } diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp index 359b97a3..93054596 100644 --- a/src/p2p/peer_comm_session.cpp +++ b/src/p2p/peer_comm_session.cpp @@ -9,6 +9,11 @@ namespace p2p return p2p::handle_peer_connect(*this); } + int peer_comm_session::get_message_priority(std::string_view msg) + { + return p2p::get_message_priority(msg); + } + int peer_comm_session::handle_message(std::string_view msg) { return p2p::handle_peer_message(*this, msg); diff --git a/src/p2p/peer_comm_session.hpp b/src/p2p/peer_comm_session.hpp index 0793efe9..3ae0eb7b 100644 --- a/src/p2p/peer_comm_session.hpp +++ b/src/p2p/peer_comm_session.hpp @@ -16,6 +16,7 @@ namespace p2p private: int handle_connect(); + int get_message_priority(std::string_view msg); int handle_message(std::string_view msg); void handle_close(); void handle_on_verified(); diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 56839c5e..eeb43d11 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -45,6 +45,27 @@ namespace p2p return 0; } + /** + * Returns the priority that should be assigned to the message. + * @return 0 if bad message. 1 or 2 if correct priority was assigned. + */ + int get_message_priority(std::string_view message) + { + if (!p2pmsg::verify_peer_message(message)) + { + LOG_DEBUG << "Flatbuffer verify: Bad peer message."; + return 0; + } + + const auto p2p_msg = p2pmsg::GetP2PMsg(message.data()); + const msg::fbuf::p2pmsg::P2PMsgContent type = p2p_msg->content_type(); + + if (type == p2pmsg::P2PMsgContent_ProposalMsg || type == p2pmsg::P2PMsgContent_NonUnlProposalMsg) + return 1; // High priority + else + return 2; // Low priority + } + /** * Peer session on message callback method. Validate and handle each type of peer messages. * @return 0 on normal execution. -1 when session needs to be closed as a result of message handling. @@ -54,13 +75,6 @@ namespace p2p // Adding message size to peer message characters(bytes) per minute counter. session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size()); - if (!p2pmsg::verify_peer_message(message)) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Flatbuffer verify: Bad peer message."; - return 0; - } - const peer_message_info mi = p2pmsg::get_peer_message_info(message); if (!mi.p2p_msg) // Message buffer will be null if peer message was too old. return 0; diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp index 6a1c4b5a..3757275c 100644 --- a/src/p2p/peer_session_handler.hpp +++ b/src/p2p/peer_session_handler.hpp @@ -7,6 +7,7 @@ namespace p2p { int handle_peer_connect(p2p::peer_comm_session &session); + int get_message_priority(std::string_view message); int handle_peer_message(p2p::peer_comm_session &session, std::string_view message); int handle_self_message(std::string_view message); int handle_peer_close(const p2p::peer_comm_session &session); diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index 334bc361..743238d9 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -660,7 +660,7 @@ namespace sc { flatbuffers::FlatBufferBuilder fbuf; msg::fbuf::p2pmsg::create_msg_from_npl_output(fbuf, output, ledger::ctx.get_lcl_id()); - p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_npl_public); + p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_npl_public, 1); // Use high priority send. } } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 0d3635bd..7f912a7f 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -79,7 +79,7 @@ namespace usr int start_listening() { ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg, - conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host); + conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host, false); if (ctx.server->start() == -1) return -1;