Introduced priority queues for proposal processing. (#301)

* Introduced high priority queue for comm session message processing.
* Added high priority send for proposals.
This commit is contained in:
Ravin Perera
2021-05-03 21:44:36 +05:30
committed by GitHub
parent a8ae9c5876
commit fded7b7917
13 changed files with 156 additions and 58 deletions

View File

@@ -12,6 +12,7 @@ namespace comm
{
constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 5 * 1024 * 1024;
constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 99999;
constexpr uint16_t MAX_INBOUND_HIGH_PRIO_BTACH = 2; // Maximum no. of incomning high priority messages to process at a time.
template <typename T>
class comm_server
@@ -21,6 +22,7 @@ namespace comm
const uint64_t max_msg_size;
const uint64_t max_in_connections;
const uint64_t max_in_connections_per_host;
bool use_priority_queues = false; // Whether to activate inbound message high vs low priority-based processing.
bool is_shutting_down = false;
std::list<T> sessions;
std::list<T> new_sessions; // Sessions that haven't been initialized properly which are yet to be merge to "sessions" list.
@@ -168,11 +170,33 @@ namespace comm
messages_processed = true;
{
// Process one message from each session in round-robin fashion.
std::scoped_lock<std::mutex> lock(sessions_mutex);
if (use_priority_queues)
{
// Process high priority messages from each session in round-robin fashion.
for (T &session : sessions)
{
// Process predefined no. of high priority messages from each session.
uint16_t high_prio_msgs_processed = 0; // High priority messages so far processed from this session.
int result = 0;
while (high_prio_msgs_processed < MAX_INBOUND_HIGH_PRIO_BTACH && // Check if we have reached the batch limit.
(result = session.process_next_inbound_message(1)) != 0)
{
high_prio_msgs_processed++;
messages_processed = true;
if (result == -1)
session.mark_for_closure();
}
}
}
// Process one low priority message from each session in round-robin fashion.
for (T &session : sessions)
{
const int result = session.process_next_inbound_message();
const int result = session.process_next_inbound_message(2);
if (result != 0)
messages_processed = true;
@@ -217,13 +241,14 @@ namespace comm
public:
comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host)
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host, const bool use_priority_queues)
: name(name),
listen_port(port),
metric_thresholds(metric_thresholds),
max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE),
max_in_connections(max_in_connections > 0 ? max_in_connections : DEFAULT_MAX_CONNECTIONS),
max_in_connections_per_host(max_in_connections_per_host > 0 ? max_in_connections_per_host : DEFAULT_MAX_CONNECTIONS)
max_in_connections_per_host(max_in_connections_per_host > 0 ? max_in_connections_per_host : DEFAULT_MAX_CONNECTIONS),
use_priority_queues(use_priority_queues)
{
}

View File

@@ -18,7 +18,8 @@ namespace comm
host_address(host_address),
hpws_client(std::move(hpws_client)),
is_inbound(is_inbound),
in_msg_queue(MAX_IN_MSG_QUEUE_SIZE)
in_msg_queue1(MAX_IN_MSG_QUEUE_SIZE),
in_msg_queue2(MAX_IN_MSG_QUEUE_SIZE)
{
// Create new session_thresholds and insert it to thresholds vector.
// Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector
@@ -75,9 +76,23 @@ namespace comm
// Enqueue the message for processing.
std::string_view data = std::get<std::string_view>(read_result);
std::vector<char> msg(data.size());
memcpy(msg.data(), data.data(), data.size());
in_msg_queue.try_enqueue(std::move(msg));
// Detect message priority before adding to the message queue.
const int priority = get_message_priority(data);
if (priority == 0) // priority 0 means a bad message.
{
increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
}
else if (priority == 1 || priority == 2)
{
std::vector<char> msg(data.size());
memcpy(msg.data(), data.data(), data.size());
if (priority == 1)
in_msg_queue1.try_enqueue(std::move(msg));
else if (priority == 2)
in_msg_queue2.try_enqueue(std::move(msg));
}
// Signal the hpws client that we are ready for next message.
std::optional<hpws::error> error = hpws_client->ack(data);
@@ -100,40 +115,54 @@ namespace comm
/**
* Processes the next queued message (if any).
* @return 0 if no messages in queue. 1 if message was processed. -1 means session must be closed.
* @param priority Which priority queue to process.
* @return 0 if no messages in queue. 1 if a message were processed. -1 means session must be closed.
*/
int comm_session::process_next_inbound_message()
int comm_session::process_next_inbound_message(const uint16_t priority)
{
if (state != SESSION_STATE::ACTIVE)
return 0;
int res = 0;
moodycamel::ReaderWriterQueue<std::vector<char>> &queue = (priority == 1 ? in_msg_queue1 : in_msg_queue2);
// Process queue top.
std::vector<char> msg;
if (in_msg_queue.try_dequeue(msg))
if (queue.try_dequeue(msg))
{
std::string_view sv(msg.data(), msg.size());
const int sess_handler_result = handle_message(sv);
// If session handler returns -1 then that means the session must be closed.
// Otherwise it's considered message processing is successful.
return sess_handler_result == -1 ? -1 : 1;
if (handle_message(sv) == -1)
return -1;
else
res = 1;
}
return 0;
}
int comm_session::send(const std::vector<uint8_t> &message)
{
std::string_view sv(reinterpret_cast<const char *>(message.data()), message.size());
send(sv);
return 0;
return res;
}
/**
* Adds the given message to the outbound message queue.
* @param message Message to be added to the outbound queue.
* @return 0 on successful addition and -1 if the session is already closed or there's no space in the queue.
* @param priority If 1 adds to high priority queue. Else adds to low priority queue.
* @return 0 on successful addition and -1 if the session is already closed.
*/
int comm_session::send(std::string_view message)
int comm_session::send(const std::vector<uint8_t> &message, const uint16_t priority)
{
std::string_view sv(reinterpret_cast<const char *>(message.data()), message.size());
return send(sv);
}
/**
* Adds the given message to the outbound message queue.
* @param message Message to be added to the outbound queue.
* @param priority If 1 adds to high priority queue. Else adds to low priority queue.
* @return 0 on successful addition and -1 if the session is already closed.
*/
int comm_session::send(std::string_view message, const uint16_t priority)
{
if (state == SESSION_STATE::CLOSED)
return -1;
@@ -141,8 +170,12 @@ namespace comm
// Updating last activity timestamp since this session is sending a message.
last_activity_timestamp = util::get_epoch_milliseconds();
// Passing the ownership of message to the queue.
out_msg_queue.enqueue(std::string(message));
// Passing the ownership of message to the queue based on specified priority.
if (priority == 1)
out_msg_queue1.enqueue(std::string(message));
else
out_msg_queue2.enqueue(std::string(message));
return 0;
}
@@ -176,18 +209,27 @@ namespace comm
// Keep checking until the session is terminated.
while (state != SESSION_STATE::CLOSED)
{
bool messages_sent = false;
std::string msg_to_send;
// If the queue is not empty, the first element will be processed,
// else wait 10ms until queue gets populated.
if (out_msg_queue.try_dequeue(msg_to_send))
// Send all messages in high priority queue.
while (out_msg_queue1.try_dequeue(msg_to_send))
{
process_outbound_message(msg_to_send);
msg_to_send.clear();
messages_sent = true;
}
else
// Send top message in low priority queue.
if (out_msg_queue2.try_dequeue(msg_to_send))
{
util::sleep(10);
process_outbound_message(msg_to_send);
messages_sent = true;
}
// Wait for small delay if there were no outbound messages.
if (!messages_sent)
util::sleep(10);
}
}
@@ -275,7 +317,7 @@ namespace comm
// Reset counter timestamp.
t.timestamp = time_now;
}
// Check whether we have exceeded the threshold within the monitering interval.
const uint64_t elapsed_time = time_now - t.timestamp;
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
@@ -327,6 +369,11 @@ namespace comm
return 0;
}
int comm_session::get_message_priority(std::string_view msg)
{
return 2; // Default is low priority.
}
int comm_session::handle_message(std::string_view msg)
{
return 0;

View File

@@ -33,15 +33,18 @@ namespace comm
std::optional<hpws::client> hpws_client;
std::vector<session_threshold> thresholds; // track down various communication thresholds
std::thread reader_thread; // The thread responsible for reading messages from the read fd.
std::thread writer_thread; // The thread responsible for writing messages to the write fd.
moodycamel::ReaderWriterQueue<std::vector<char>> in_msg_queue; // Holds incoming messages waiting to be processed.
moodycamel::ConcurrentQueue<std::string> out_msg_queue; // Holds outgoing messages waiting to be processed.
std::thread reader_thread; // The thread responsible for reading messages from the read fd.
std::thread writer_thread; // The thread responsible for writing messages to the write fd.
moodycamel::ReaderWriterQueue<std::vector<char>> in_msg_queue1; // Holds high priority incoming messages waiting to be processed.
moodycamel::ReaderWriterQueue<std::vector<char>> in_msg_queue2; // Holds low priority incoming messages waiting to be processed.
moodycamel::ConcurrentQueue<std::string> out_msg_queue1; // Holds high priority outgoing messages waiting to be processed.
moodycamel::ConcurrentQueue<std::string> out_msg_queue2; // Holds low priority outgoing messages waiting to be processed.
void reader_loop();
protected:
virtual int handle_connect();
virtual int get_message_priority(std::string_view msg);
virtual int handle_message(std::string_view msg);
virtual void handle_close();
virtual void handle_on_verified();
@@ -59,9 +62,9 @@ namespace comm
comm_session(
std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[5]);
int init();
int process_next_inbound_message();
int send(const std::vector<uint8_t> &message);
int send(std::string_view message);
int process_next_inbound_message(const uint16_t priority);
int send(const std::vector<uint8_t> &message, const uint16_t priority = 2);
int send(std::string_view message, const uint16_t priority = 2);
int process_outbound_message(std::string_view message);
void process_outbound_msg_queue();
void check_last_activity_rules();

View File

@@ -395,10 +395,10 @@ namespace consensus
while (itr != ctx.candidate_proposals.end())
{
const p2p::proposal &cp = itr->second;
const int8_t stage_diff = ctx.stage - cp.stage;
// Only consider this round's proposals which are from previous stage.
const bool keep_candidate = (ctx.round_start_time == cp.time) && (stage_diff == 1);
// Only consider this round's proposals which are from current or previous stage.
const bool stage_valid = ctx.stage >= cp.stage && (ctx.stage - cp.stage) <= 1;
const bool keep_candidate = (ctx.round_start_time == cp.time) && stage_valid;
LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased")
<< " [s" << std::to_string(cp.stage)
<< "] u/i:" << cp.users.size()
@@ -617,7 +617,7 @@ namespace consensus
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_msg_from_nonunl_proposal(fbuf, nup);
p2p::broadcast_message(fbuf, true);
p2p::broadcast_message(fbuf, true, false, false, 1); // Use high priority send.
LOG_DEBUG << "NUP sent."
<< " users:" << nup.user_inputs.size();
@@ -635,7 +635,7 @@ namespace consensus
flatbuffers::FlatBufferBuilder fbuf;
p2pmsg::create_msg_from_proposal(fbuf, p);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_consensus_public);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_consensus_public, 1); // Use high priority send.
LOG_DEBUG << "Proposed <s" << std::to_string(p.stage) << "> u/i:" << p.users.size()
<< "/" << p.input_ordered_hashes.size()

View File

@@ -200,8 +200,9 @@ namespace p2p
* @param send_to_self Whether to also send the message to self (this node).
* @param is_msg_forwarding Whether this broadcast is for message forwarding.
* @param unl_only Whether this broadcast is only for the unl nodes.
* @param priority If 1, use high pririty send. Else, use low priority send.
*/
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only)
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const uint16_t priority)
{
broadcast_message(msg::fbuf::builder_to_string_view(fbuf), send_to_self, is_msg_forwarding, unl_only);
}
@@ -212,8 +213,9 @@ namespace p2p
* @param is_msg_forwarding Whether this broadcast is for message forwarding.
* @param unl_only Whether this broadcast is only for the unl nodes.
* @param skipping_session Session to be skipped in message forwarding(optional).
* @param priority If 1, use high pririty send. Else, use low priority send.
*/
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const peer_comm_session *skipping_session)
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const peer_comm_session *skipping_session, const uint16_t priority)
{
if (send_to_self)
self::send(message);
@@ -230,7 +232,7 @@ namespace p2p
(unl_only && !session->is_unl))
continue;
session->send(message);
session->send(message, priority);
}
}

View File

@@ -240,9 +240,9 @@ namespace p2p
int resolve_peer_challenge(peer_comm_session &session, const peer_challenge_response &challenge_resp);
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false);
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const uint16_t priority = 2);
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const peer_comm_session *skipping_session = NULL);
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const peer_comm_session *skipping_session = NULL, const uint16_t priority = 2);
void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf);

View File

@@ -16,7 +16,7 @@ namespace p2p
peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[5], const uint64_t max_msg_size,
const uint64_t max_in_connections, const uint64_t max_in_connections_per_host,
const std::vector<peer_properties> &req_known_remotes)
: comm::comm_server<peer_comm_session>("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host),
: comm::comm_server<peer_comm_session>("Peer", port, metric_thresholds, max_msg_size, max_in_connections, max_in_connections_per_host, true),
req_known_remotes(req_known_remotes) // Copy over known peers into internal collection.
{
}

View File

@@ -9,6 +9,11 @@ namespace p2p
return p2p::handle_peer_connect(*this);
}
int peer_comm_session::get_message_priority(std::string_view msg)
{
return p2p::get_message_priority(msg);
}
int peer_comm_session::handle_message(std::string_view msg)
{
return p2p::handle_peer_message(*this, msg);

View File

@@ -16,6 +16,7 @@ namespace p2p
private:
int handle_connect();
int get_message_priority(std::string_view msg);
int handle_message(std::string_view msg);
void handle_close();
void handle_on_verified();

View File

@@ -45,6 +45,27 @@ namespace p2p
return 0;
}
/**
* Returns the priority that should be assigned to the message.
* @return 0 if bad message. 1 or 2 if correct priority was assigned.
*/
int get_message_priority(std::string_view message)
{
if (!p2pmsg::verify_peer_message(message))
{
LOG_DEBUG << "Flatbuffer verify: Bad peer message.";
return 0;
}
const auto p2p_msg = p2pmsg::GetP2PMsg(message.data());
const msg::fbuf::p2pmsg::P2PMsgContent type = p2p_msg->content_type();
if (type == p2pmsg::P2PMsgContent_ProposalMsg || type == p2pmsg::P2PMsgContent_NonUnlProposalMsg)
return 1; // High priority
else
return 2; // Low priority
}
/**
* Peer session on message callback method. Validate and handle each type of peer messages.
* @return 0 on normal execution. -1 when session needs to be closed as a result of message handling.
@@ -54,13 +75,6 @@ namespace p2p
// Adding message size to peer message characters(bytes) per minute counter.
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.size());
if (!p2pmsg::verify_peer_message(message))
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);
LOG_DEBUG << "Flatbuffer verify: Bad peer message.";
return 0;
}
const peer_message_info mi = p2pmsg::get_peer_message_info(message);
if (!mi.p2p_msg) // Message buffer will be null if peer message was too old.
return 0;

View File

@@ -7,6 +7,7 @@
namespace p2p
{
int handle_peer_connect(p2p::peer_comm_session &session);
int get_message_priority(std::string_view message);
int handle_peer_message(p2p::peer_comm_session &session, std::string_view message);
int handle_self_message(std::string_view message);
int handle_peer_close(const p2p::peer_comm_session &session);

View File

@@ -660,7 +660,7 @@ namespace sc
{
flatbuffers::FlatBufferBuilder fbuf;
msg::fbuf::p2pmsg::create_msg_from_npl_output(fbuf, output, ledger::ctx.get_lcl_id());
p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_npl_public);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.contract.is_npl_public, 1); // Use high priority send.
}
}

View File

@@ -79,7 +79,7 @@ namespace usr
int start_listening()
{
ctx.server.emplace("User", conf::cfg.user.port, metric_thresholds, conf::cfg.user.max_bytes_per_msg,
conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host);
conf::cfg.user.max_connections, conf::cfg.user.max_in_connections_per_host, false);
if (ctx.server->start() == -1)
return -1;