Implemented multi-threaded inbound network message processing. (#115)

Used per-session thread to offload messages from network and a single thread for processing collected messages.
This commit is contained in:
Ravin Perera
2020-09-10 15:40:08 +05:30
committed by GitHub
parent 10cfb6e75f
commit d2f45daf4c
11 changed files with 297 additions and 210 deletions

View File

@@ -22,6 +22,9 @@ namespace comm
watchdog_thread = std::thread(
&comm_server::connection_watchdog, this, accept_fd, session_type, is_binary,
std::ref(metric_thresholds), req_known_remotes, max_msg_size);
inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this, session_type);
return start_websocketd_process(port, domain_socket_name, is_binary,
use_size_header, max_msg_size);
}
@@ -71,30 +74,20 @@ namespace comm
{
util::mask_signal();
// Map with read fd to connected session mappings.
std::unordered_map<int, comm_session> sessions;
// Map with read fd to connected comm client mappings.
std::unordered_map<int, comm_client> outbound_clients;
// Counter to track when to initiate outbound client connections.
int16_t loop_counter = -1;
// Indicates whether at least some bytes were read from any of the clients during the previous iteration.
// If no bytes were read, we would force thread sleep to wait for bytes to arrive.
bool bytes_read = false;
while (true)
while (!should_stop_listening)
{
if (should_stop_listening)
break;
util::sleep(100);
// Accept any new incoming connection if available.
check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds);
check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds, max_msg_size);
// Restore any missing outbound connections.
if (!req_known_remotes.empty())
{
// Restore any missing outbound connections every 500 iterations (including the first iteration).
if (loop_counter == -1 || loop_counter == 500)
if (loop_counter == 20)
{
loop_counter = 0;
maintain_known_connections(sessions, outbound_clients, req_known_remotes, session_type, is_binary, max_msg_size, metric_thresholds);
@@ -102,71 +95,27 @@ namespace comm
loop_counter++;
}
// Prepare poll fd list.
const size_t fd_count = sessions.size() + 1; //+1 for the inclusion of accept_fd
pollfd pollfds[fd_count];
memset(pollfds, 0, sizeof(pollfd) * fd_count);
if (poll_fds(pollfds, accept_fd, sessions) == -1)
// Cleanup any sessions that needs closure.
std::set<int> closed_session_fds;
for (auto &[fd, session] : sessions)
{
util::sleep(10);
continue;
if (session.state == SESSION_STATE::MUST_CLOSE)
session.close(true);
if (session.state == SESSION_STATE::CLOSED)
closed_session_fds.emplace(fd);
}
if (!bytes_read)
util::sleep(10);
bytes_read = false;
// Loop through all session fds and read any data.
const size_t sessions_count = sessions.size();
if (sessions_count == 0)
continue;
for (size_t i = 1; i <= sessions_count; i++)
for (const int fd : closed_session_fds)
{
const short result = pollfds[i].revents;
const int fd = pollfds[i].fd;
// Delete from sessions.
sessions.erase(fd);
const auto iter = sessions.find(fd);
if (iter != sessions.end())
// Delete from outbound clients.
const auto client_itr = outbound_clients.find(fd);
if (client_itr != outbound_clients.end())
{
comm_session &session = iter->second;
bool should_disconnect = (session.state == SESSION_STATE::CLOSED);
if (!should_disconnect)
{
if (result & POLLIN)
{
const int read_result = session.attempt_read(max_msg_size);
// read_result -1 means error and we should disconnect the client.
// read_result 0 means no bytes were read.
// read_result 1 means some bytes were read.
// read_result 2 means full message were read and processed successfully.
if (read_result > 0)
bytes_read = true;
else if (read_result == -1)
should_disconnect = true;
}
if (result & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL))
should_disconnect = true;
}
if (should_disconnect)
{
// If this is an outbound session, cleanup the corresponding comm client as well.
if (!session.is_inbound)
{
const auto client_itr = outbound_clients.find(fd);
client_itr->second.stop();
outbound_clients.erase(client_itr);
}
session.close();
sessions.erase(fd);
}
client_itr->second.stop();
outbound_clients.erase(client_itr);
}
}
}
@@ -182,31 +131,10 @@ namespace comm
LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " listener stopped.";
}
int comm_server::poll_fds(pollfd *pollfds, const int accept_fd, const std::unordered_map<int, comm_session> &sessions)
{
const short poll_events = POLLIN | POLLRDHUP;
pollfds[0].fd = accept_fd;
auto iter = sessions.begin();
for (size_t i = 1; i <= sessions.size(); i++)
{
pollfds[i].fd = iter->first;
pollfds[i].events = poll_events;
iter++;
}
if (poll(pollfds, sessions.size() + 1, 10) == -1) //10ms timeout
{
LOG_ERR << errno << ": Poll failed.";
return -1;
}
return 0;
}
void comm_server::check_for_new_connection(
std::unordered_map<int, comm_session> &sessions, const int accept_fd,
const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4])
const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4],
const uint64_t max_msg_size)
{
// Accept new client connection (if available)
int client_fd = accept(accept_fd, NULL, NULL);
@@ -227,9 +155,13 @@ namespace comm
}
else
{
comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds);
comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds, max_msg_size);
if (session.on_connect() == 0)
sessions.try_emplace(client_fd, std::move(session));
{
std::scoped_lock<std::mutex> lock(sessions_mutex);
const auto [itr, success] = sessions.try_emplace(client_fd, std::move(session));
itr->second.start_messaging_threads();
}
}
}
else
@@ -273,11 +205,16 @@ namespace comm
}
else
{
comm::comm_session session(host, client.read_fd, client.write_fd, comm::SESSION_TYPE::PEER, is_binary, false, metric_thresholds);
comm::comm_session session(host, client.read_fd, client.write_fd, comm::SESSION_TYPE::PEER, is_binary, false, metric_thresholds, max_msg_size);
session.known_ipport = ipport;
if (session.on_connect() == 0)
{
sessions.try_emplace(client.read_fd, std::move(session));
{
std::scoped_lock<std::mutex> lock(sessions_mutex);
const auto [itr, success] = sessions.try_emplace(client.read_fd, std::move(session));
itr->second.start_messaging_threads();
}
outbound_clients.emplace(client.read_fd, std::move(client));
known_remotes.emplace(ipport);
}
@@ -285,6 +222,37 @@ namespace comm
}
}
void comm_server::inbound_message_processor_loop(const SESSION_TYPE session_type)
{
util::mask_signal();
while (!should_stop_listening)
{
bool messages_processed = false;
{
// Process one message from each session in round-robin fashion.
std::scoped_lock<std::mutex> lock(sessions_mutex);
for (auto &[fd, session] : sessions)
{
const int result = session.process_next_inbound_message();
if (result != 0)
messages_processed = true;
if (result == -1)
session.mark_for_closure();
}
}
// If no messages were processed in this cycle, wait for some time.
if (!messages_processed)
util::sleep(10);
}
LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " message processor stopped.";
}
int comm_server::start_websocketd_process(
const uint16_t port, const char *domain_socket_name,
const bool is_binary, const bool use_size_header, const uint64_t max_msg_size)
@@ -440,6 +408,7 @@ namespace comm
{
should_stop_listening = true;
watchdog_thread.join();
inbound_message_processor_thread.join();
if (websocketd_pid > 0)
util::kill_process(websocketd_pid, false); // Kill websocketd.