diff --git a/README.md b/README.md index d796015d..bf7777c9 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ A C++ version of hotpocket designed for production envrionments, original protot * P2P Protocol - https://google.github.io/flatbuffers * Fuse filesystem - https://github.com/libfuse/libfuse * Boost - https://www.boost.org +* Concurrent queue - https://github.com/cameron314/readerwriterqueue ## Steps to setup Hot Pocket (For Ubuntu/Debian) @@ -80,6 +81,12 @@ Example: When you make a change to `p2pmsg_content_.fbc` defnition file, you nee 4. `meson .. && ninja` 6. `sudo ninja install` +#### Install reader-writer queue +1. Download [readerwritequeue 1.0.3](https://github.com/cameron314/readerwriterqueue/archive/v1.0.3.zip) and extract. +2. `mkdir build; cd build` +3. `cmake ..` +4. `sudo make install` + #### Run ldconfig `sudo ldconfig` diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index 52c5be21..eafa9698 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -22,6 +22,9 @@ namespace comm watchdog_thread = std::thread( &comm_server::connection_watchdog, this, accept_fd, session_type, is_binary, std::ref(metric_thresholds), req_known_remotes, max_msg_size); + + inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this, session_type); + return start_websocketd_process(port, domain_socket_name, is_binary, use_size_header, max_msg_size); } @@ -71,30 +74,20 @@ namespace comm { util::mask_signal(); - // Map with read fd to connected session mappings. - std::unordered_map sessions; - // Map with read fd to connected comm client mappings. - std::unordered_map outbound_clients; - // Counter to track when to initiate outbound client connections. int16_t loop_counter = -1; - // Indicates whether at least some bytes were read from any of the clients during the previous iteration. - // If no bytes were read, we would force thread sleep to wait for bytes to arrive. - bool bytes_read = false; - - while (true) + while (!should_stop_listening) { - if (should_stop_listening) - break; + util::sleep(100); // Accept any new incoming connection if available. - check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds); + check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds, max_msg_size); + // Restore any missing outbound connections. if (!req_known_remotes.empty()) { - // Restore any missing outbound connections every 500 iterations (including the first iteration). - if (loop_counter == -1 || loop_counter == 500) + if (loop_counter == 20) { loop_counter = 0; maintain_known_connections(sessions, outbound_clients, req_known_remotes, session_type, is_binary, max_msg_size, metric_thresholds); @@ -102,71 +95,27 @@ namespace comm loop_counter++; } - // Prepare poll fd list. - const size_t fd_count = sessions.size() + 1; //+1 for the inclusion of accept_fd - - pollfd pollfds[fd_count]; - memset(pollfds, 0, sizeof(pollfd) * fd_count); - - if (poll_fds(pollfds, accept_fd, sessions) == -1) + // Cleanup any sessions that needs closure. + std::set closed_session_fds; + for (auto &[fd, session] : sessions) { - util::sleep(10); - continue; + if (session.state == SESSION_STATE::MUST_CLOSE) + session.close(true); + + if (session.state == SESSION_STATE::CLOSED) + closed_session_fds.emplace(fd); } - - if (!bytes_read) - util::sleep(10); - bytes_read = false; - - // Loop through all session fds and read any data. - const size_t sessions_count = sessions.size(); - if (sessions_count == 0) - continue; - - for (size_t i = 1; i <= sessions_count; i++) + for (const int fd : closed_session_fds) { - const short result = pollfds[i].revents; - const int fd = pollfds[i].fd; + // Delete from sessions. + sessions.erase(fd); - const auto iter = sessions.find(fd); - if (iter != sessions.end()) + // Delete from outbound clients. + const auto client_itr = outbound_clients.find(fd); + if (client_itr != outbound_clients.end()) { - comm_session &session = iter->second; - bool should_disconnect = (session.state == SESSION_STATE::CLOSED); - - if (!should_disconnect) - { - if (result & POLLIN) - { - const int read_result = session.attempt_read(max_msg_size); - - // read_result -1 means error and we should disconnect the client. - // read_result 0 means no bytes were read. - // read_result 1 means some bytes were read. - // read_result 2 means full message were read and processed successfully. - if (read_result > 0) - bytes_read = true; - else if (read_result == -1) - should_disconnect = true; - } - - if (result & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) - should_disconnect = true; - } - - if (should_disconnect) - { - // If this is an outbound session, cleanup the corresponding comm client as well. - if (!session.is_inbound) - { - const auto client_itr = outbound_clients.find(fd); - client_itr->second.stop(); - outbound_clients.erase(client_itr); - } - - session.close(); - sessions.erase(fd); - } + client_itr->second.stop(); + outbound_clients.erase(client_itr); } } } @@ -182,31 +131,10 @@ namespace comm LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " listener stopped."; } - int comm_server::poll_fds(pollfd *pollfds, const int accept_fd, const std::unordered_map &sessions) - { - const short poll_events = POLLIN | POLLRDHUP; - pollfds[0].fd = accept_fd; - - auto iter = sessions.begin(); - for (size_t i = 1; i <= sessions.size(); i++) - { - pollfds[i].fd = iter->first; - pollfds[i].events = poll_events; - iter++; - } - - if (poll(pollfds, sessions.size() + 1, 10) == -1) //10ms timeout - { - LOG_ERR << errno << ": Poll failed."; - return -1; - } - - return 0; - } - void comm_server::check_for_new_connection( std::unordered_map &sessions, const int accept_fd, - const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4]) + const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4], + const uint64_t max_msg_size) { // Accept new client connection (if available) int client_fd = accept(accept_fd, NULL, NULL); @@ -227,9 +155,13 @@ namespace comm } else { - comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds); + comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds, max_msg_size); if (session.on_connect() == 0) - sessions.try_emplace(client_fd, std::move(session)); + { + std::scoped_lock lock(sessions_mutex); + const auto [itr, success] = sessions.try_emplace(client_fd, std::move(session)); + itr->second.start_messaging_threads(); + } } } else @@ -273,11 +205,16 @@ namespace comm } else { - comm::comm_session session(host, client.read_fd, client.write_fd, comm::SESSION_TYPE::PEER, is_binary, false, metric_thresholds); + comm::comm_session session(host, client.read_fd, client.write_fd, comm::SESSION_TYPE::PEER, is_binary, false, metric_thresholds, max_msg_size); session.known_ipport = ipport; if (session.on_connect() == 0) { - sessions.try_emplace(client.read_fd, std::move(session)); + { + std::scoped_lock lock(sessions_mutex); + const auto [itr, success] = sessions.try_emplace(client.read_fd, std::move(session)); + itr->second.start_messaging_threads(); + } + outbound_clients.emplace(client.read_fd, std::move(client)); known_remotes.emplace(ipport); } @@ -285,6 +222,37 @@ namespace comm } } + void comm_server::inbound_message_processor_loop(const SESSION_TYPE session_type) + { + util::mask_signal(); + + while (!should_stop_listening) + { + bool messages_processed = false; + + { + // Process one message from each session in round-robin fashion. + std::scoped_lock lock(sessions_mutex); + for (auto &[fd, session] : sessions) + { + const int result = session.process_next_inbound_message(); + + if (result != 0) + messages_processed = true; + + if (result == -1) + session.mark_for_closure(); + } + } + + // If no messages were processed in this cycle, wait for some time. + if (!messages_processed) + util::sleep(10); + } + + LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " message processor stopped."; + } + int comm_server::start_websocketd_process( const uint16_t port, const char *domain_socket_name, const bool is_binary, const bool use_size_header, const uint64_t max_msg_size) @@ -440,6 +408,7 @@ namespace comm { should_stop_listening = true; watchdog_thread.join(); + inbound_message_processor_thread.join(); if (websocketd_pid > 0) util::kill_process(websocketd_pid, false); // Kill websocketd. diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index b9008a03..59a81dca 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -11,16 +11,26 @@ namespace comm class comm_server { pid_t websocketd_pid = 0; - int firewall_out = -1; // at some point we may want to listen for firewall_in but at the moment unimplemented - std::thread watchdog_thread; + int firewall_out = -1; // at some point we may want to listen for firewall_in but at the moment unimplemented + std::thread watchdog_thread; // Connection watcher thread. + std::thread inbound_message_processor_thread; // Incoming message processor thread. bool should_stop_listening = false; + // Map with read fd to connected session mappings. + std::unordered_map sessions; + std::mutex sessions_mutex; + + // Map with read fd to connected comm client mappings. + std::unordered_map outbound_clients; + int open_domain_socket(const char *domain_socket_name); void connection_watchdog( const int accept_fd, const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4], const std::set &eq_known_remotes, const uint64_t max_msg_size); + void inbound_message_processor_loop(const SESSION_TYPE session_type); + int start_websocketd_process( const uint16_t port, const char *domain_socket_name, const bool is_binary, const bool use_size_header, const uint64_t max_msg_size); @@ -29,7 +39,8 @@ namespace comm void check_for_new_connection( std::unordered_map &sessions, const int accept_fd, - const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4]); + const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4], + const uint64_t max_msg_size); void maintain_known_connections( std::unordered_map &sessions, std::unordered_map &outbound_clients, diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 33c79396..da5e030c 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -9,10 +9,9 @@ namespace comm { - constexpr uint32_t INTERVALMS = 60000; constexpr uint8_t SIZE_HEADER_LEN = 8; - constexpr uint32_t READ_BUFFER_IDLE_SIZE = 64 * 1024; + constexpr short READER_POLL_EVENTS = POLLIN | POLLRDHUP; // Global instances of user and peer session handlers. usr::user_session_handler user_sess_handler; @@ -20,14 +19,16 @@ namespace comm comm_session::comm_session( std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, - const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4]) + const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size) : read_fd(read_fd), write_fd(write_fd), session_type(session_type), uniqueid(std::to_string(read_fd).append(":").append(ip)), is_binary(is_binary), - is_inbound(is_inbound) + is_inbound(is_inbound), + max_msg_size(max_msg_size), + in_msg_queue(32) { // Create new session_thresholds and insert it to thresholds vector. // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector @@ -37,6 +38,53 @@ namespace comm thresholds.push_back(session_threshold(metric_thresholds[i], INTERVALMS)); } + void comm_session::start_messaging_threads() + { + reader_thread = std::thread(&comm_session::reader_loop, this); + } + + void comm_session::reader_loop() + { + util::mask_signal(); + + while (!should_stop_messaging_threads) + { + pollfd pollfds[1] = {{read_fd, READER_POLL_EVENTS}}; + + if (poll(pollfds, 1, 20) == -1) + { + LOG_ERR << errno << ": Session reader poll failed."; + break; + } + + const short result = pollfds[0].revents; + bool should_disconnect = false; + + if (result & POLLIN) + { + // read_result -1 means error and we should disconnect the client. + // read_result 0 means no bytes were read. + // read_result 1 means some bytes were read. + // read_result 2 means full message were read and processed successfully. + const int read_result = attempt_read(); + + if (read_result == -1) + should_disconnect = true; + } + + if (!should_disconnect && (result & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL))) + should_disconnect = true; + + if (should_disconnect) + { + // Here we mark the session as needing to close. + // The session will be properly "closed" and cleaned up by the global comm_server thread. + mark_for_closure(); + break; + } + } + } + int comm_session::on_connect() { state = SESSION_STATE::ACTIVE; @@ -49,12 +97,11 @@ namespace comm /** * Attempts to read message data from the given socket fd and passes the message on to the session. - * @param max_msg_size The allowed max byte length of a message to be read. * @return -1 on error and client must be disconnected. 0 if no message data bytes were read. 1 if some * bytes were read but a full message is not yet formed. 2 if a fully formed message has been * read into the read buffer. */ - int comm_session::attempt_read(const uint64_t max_msg_size) + int comm_session::attempt_read() { size_t available_bytes = 0; if (ioctl(read_fd, FIONREAD, &available_bytes) == -1 || @@ -67,9 +114,11 @@ namespace comm // Try to read a complete message using available bytes. if (available_bytes > 0) { + increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, available_bytes); + if (is_binary) { - res = get_binary_msg_read_len(available_bytes); + res = attempt_binary_msg_construction(available_bytes); } else { @@ -79,32 +128,40 @@ namespace comm if (res == 2) // Full message has been read into read buffer. { - if (on_message(std::string_view(read_buffer.data(), read_buffer.size())) == -1) - res = -1; - - // Reset the read buffer. - if (read_buffer.size() > READ_BUFFER_IDLE_SIZE) - { - read_buffer.resize(READ_BUFFER_IDLE_SIZE); - read_buffer.shrink_to_fit(); // This is to avaoid large idle memory allocations. - } - - read_buffer.clear(); + std::vector msg; + msg.swap(read_buffer); read_buffer_filled_size = 0; + + in_msg_queue.enqueue(std::move(msg)); } } return res; } - int comm_session::on_message(std::string_view message) + /** + * Processes the next queued message (if any). + * @return 0 if no messages in queue. 1 if message was processed. -1 means session must be closed. + */ + int comm_session::process_next_inbound_message() { - increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.length()); + if (state != SESSION_STATE::ACTIVE) + return 0; - if (session_type == SESSION_TYPE::USER) - return user_sess_handler.on_message(*this, message); - else - return peer_sess_handler.on_message(*this, message); + std::vector msg; + if (in_msg_queue.try_dequeue(msg)) + { + std::string_view sv(msg.data(), msg.size()); + const int sess_handler_result = (session_type == SESSION_TYPE::USER) + ? user_sess_handler.on_message(*this, sv) + : peer_sess_handler.on_message(*this, sv); + + // If session handler returns -1 then that means the session must be closed. + // Otherwise it's considered message processing is successful. + return sess_handler_result == -1 ? -1 : 1; + } + + return 0; } int comm_session::send(const std::vector &message) const @@ -148,12 +205,28 @@ namespace comm if (writev(write_fd, memsegs, 2) == -1) { - LOG_ERR << errno << ": Session " << uniqueid << " send writev failed."; + LOG_ERR << errno << ": Session " << uniqueid.substr(0, 10) << " send writev failed."; return -1; } return 0; } + /** + * Mark the session as needing to close. The session will be properly "closed" + * and cleaned up by the global comm_server thread. + */ + void comm_session::mark_for_closure() + { + if (state == SESSION_STATE::CLOSED) + return; + + state = SESSION_STATE::MUST_CLOSE; + } + + /** + * Close the connection and wrap up any session processing threads. + * This will be only called by the global comm_server thread. + */ void comm_session::close(const bool invoke_handler) { if (state == SESSION_STATE::CLOSED) @@ -167,21 +240,25 @@ namespace comm peer_sess_handler.on_close(*this); } - ::close(read_fd); + should_stop_messaging_threads = true; // Set the messaging thread stop flag before closing the fds. state = SESSION_STATE::CLOSED; + ::close(read_fd); + if (read_fd != write_fd) + ::close(write_fd); + reader_thread.join(); LOG_DBG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " - << uniqueid << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); + << uniqueid.substr(0, 10) << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); } /** - * Retrieves the length of the binary message pending to be read. Only relevant for Binary mode. + * Attempts to construct the full binary message pending to be read. Only relevant for Binary mode. * @param available_bytes Count of bytes that is available to read from the client socket. * @return -1 on error and client must be disconnected. 0 if no message data bytes were read. 1 if some * bytes were read but a full message is not yet formed. 2 if a fully formed message has been * read into the read buffer. */ - int comm_session::get_binary_msg_read_len(const size_t available_bytes) + int comm_session::attempt_binary_msg_construction(const size_t available_bytes) { // If we have previously encountered a size header and we are waiting until all message // bytes are received, we must have the expected message size > 0. diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 16e0ff2c..89d1ad37 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -7,64 +7,76 @@ namespace comm { - -enum CHALLENGE_STATUS -{ - CHALLENGE_ISSUED, - CHALLENGE_VERIFIED -}; -enum SESSION_STATE -{ - ACTIVE, - CLOSED -}; + enum CHALLENGE_STATUS + { + NOT_ISSUED, + CHALLENGE_ISSUED, + CHALLENGE_VERIFIED + }; -enum SESSION_TYPE -{ - USER = 0, - PEER = 1 -}; + enum SESSION_STATE + { + NOT_INITIALIZED, // Session is not yet initialized properly. + ACTIVE, // Session is active and functioning. + MUST_CLOSE, // Session socket is in unusable state and must be closed. + CLOSED // Session is fully closed. + }; -/** + enum SESSION_TYPE + { + USER = 0, + PEER = 1 + }; + + /** * Represents an active WebSocket connection */ -class comm_session -{ - const int read_fd = 0; - const int write_fd = 0; // Only valid for outgoing client connections. - const SESSION_TYPE session_type; - std::vector thresholds; // track down various communication thresholds - uint32_t expected_msg_size = 0; // Next expected message size based on size header. - std::vector read_buffer; // Local buffer to keep collecting data until a complete message can be constructed. - uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far. + class comm_session + { + const int read_fd = 0; + const int write_fd = 0; + const SESSION_TYPE session_type; + const uint64_t max_msg_size = 0; + std::vector thresholds; // track down various communication thresholds + + uint32_t expected_msg_size = 0; // Next expected message size based on size header. + std::vector read_buffer; // Local buffer to keep collecting data until a complete message can be constructed. + uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far. + + bool should_stop_messaging_threads = false; // Indicates whether messaging threads has been instructed to stop. + std::thread reader_thread; // The thread responsible for reading messages from the read fd. + moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. - int get_binary_msg_read_len(const size_t available_bytes); - int on_message(std::string_view message); + void reader_loop(); + int attempt_read(); + int attempt_binary_msg_construction(const size_t available_bytes); -public: - const std::string address; // IP address of the remote party. - const bool is_binary; - const bool is_inbound; - bool is_self = false; - std::string uniqueid; - std::string issued_challenge; - conf::ip_port_pair known_ipport; - SESSION_STATE state; - CHALLENGE_STATUS challenge_status; + public: + const std::string address; // IP address of the remote party. + const bool is_binary; + const bool is_inbound; + bool is_self = false; + std::string uniqueid; + std::string issued_challenge; + conf::ip_port_pair known_ipport; + SESSION_STATE state = SESSION_STATE::NOT_INITIALIZED; + CHALLENGE_STATUS challenge_status = CHALLENGE_STATUS::NOT_ISSUED; - comm_session( - std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, - const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4]); - int on_connect(); - int attempt_read(const uint64_t max_msg_size); - int send(const std::vector &message) const; - int send(std::string_view message) const; - void close(const bool invoke_handler = true); + comm_session( + std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, + const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size); + int on_connect(); + void start_messaging_threads(); + int process_next_inbound_message(); + int send(const std::vector &message) const; + int send(std::string_view message) const; + void mark_for_closure(); + void close(const bool invoke_handler = true); - void set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms); - void increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount); -}; + void set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms); + void increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount); + }; } // namespace comm diff --git a/src/main.cpp b/src/main.cpp index 49b566fe..b9870a49 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -137,9 +137,17 @@ void std_terminate() noexcept int main(int argc, char **argv) { - //seed rand + // seed rand srand(util::get_epoch_milliseconds()); + // Disable SIGPIPE to avoid crashing on broken pipe IO. + { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + } + // Register exception handler for std exceptions. std::set_terminate(&std_terminate); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index a923767e..0c0d8a98 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -108,10 +108,10 @@ namespace p2p session.challenge_status = comm::CHALLENGE_VERIFIED; return 0; } - else // New connection is not self but with same pub key. + else // New connection is not self but peer pub key already exists in our sessions. { comm::comm_session &ex_session = *iter->second; - // We don't allow duplicate connections to the same peer to same direction. + // We don't allow duplicate sessions to the same peer to same direction. if (ex_session.is_inbound != session.is_inbound) { // Decide whether we need to replace existing session with new session. @@ -124,11 +124,11 @@ namespace p2p session.uniqueid.swap(pubkeyhex); session.challenge_status = comm::CHALLENGE_VERIFIED; - ex_session.close(false); + ex_session.mark_for_closure(); p2p::ctx.peer_connections.erase(iter); // remove existing session. p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session. - LOG_DBG << "Replacing existing connection [" << session.uniqueid << "]"; + LOG_DBG << "Replacing existing connection [" << session.uniqueid.substr(0, 10) << "]"; return 0; } else if (ex_session.known_ipport.first.empty() || !session.known_ipport.first.empty()) @@ -139,7 +139,7 @@ namespace p2p } // Reaching this point means we don't need the new session. - LOG_DBG << "Rejecting new peer connection because existing connection takes priority [" << pubkeyhex << "]"; + LOG_DBG << "Rejecting new peer connection because existing connection takes priority [" << pubkeyhex.substr(0, 10) << "]"; return -1; } } diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 91e45224..ce4aa538 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -33,7 +33,7 @@ namespace p2p // Limit max number of inbound connections. if (conf::cfg.peermaxcons > 0 && ctx.peer_connections.size() >= conf::cfg.peermaxcons) { - LOG_DBG << "Max peer connections reached. Dropped connection " << session.uniqueid; + LOG_DBG << "Max peer connections reached. Dropped connection " << session.uniqueid.substr(0, 10); return -1; } } @@ -70,7 +70,7 @@ namespace p2p if (!recent_peermsg_hashes.try_emplace(crypto::get_hash(message))) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_DUPMSGS_PER_MINUTE, 1); - LOG_DBG << "Duplicate peer message. " << session.uniqueid; + LOG_DBG << "Duplicate peer message. " << session.uniqueid.substr(0, 10); return 0; } @@ -99,7 +99,7 @@ namespace p2p if (session.challenge_status != comm::CHALLENGE_VERIFIED) { - LOG_DBG << "Cannot accept messages. Peer challenge unresolved. " << session.uniqueid; + LOG_DBG << "Cannot accept messages. Peer challenge unresolved. " << session.uniqueid.substr(0, 10); return 0; } @@ -109,7 +109,7 @@ namespace p2p if (p2pmsg::validate_container_trust(container) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DBG << "Proposal rejected due to trust failure. " << session.uniqueid; + LOG_DBG << "Proposal rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -129,7 +129,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { - LOG_DBG << "NPL message rejected due to trust failure. " << session.uniqueid; + LOG_DBG << "NPL message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -146,7 +146,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { - LOG_DBG << "State request message rejected due to trust failure. " << session.uniqueid; + LOG_DBG << "State request message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -159,7 +159,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { - LOG_DBG << "State response message rejected due to trust failure. " << session.uniqueid; + LOG_DBG << "State response message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -175,7 +175,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { - LOG_DBG << "History request message rejected due to trust failure. " << session.uniqueid; + LOG_DBG << "History request message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -196,7 +196,7 @@ namespace p2p { if (p2pmsg::validate_container_trust(container) != 0) { - LOG_DBG << "History response message rejected due to trust failure. " << session.uniqueid; + LOG_DBG << "History response message rejected due to trust failure. " << session.uniqueid.substr(0, 10); return 0; } @@ -206,7 +206,7 @@ namespace p2p else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DBG << "Received invalid peer message type. " << session.uniqueid; + LOG_DBG << "Received invalid peer message type. " << session.uniqueid.substr(0, 10); } return 0; } @@ -214,8 +214,11 @@ namespace p2p //peer session on message callback method void peer_session_handler::on_close(const comm::comm_session &session) const { + // Erase the corresponding uniqueid peer connection if it's this session. std::lock_guard lock(ctx.peer_connections_mutex); - ctx.peer_connections.erase(session.uniqueid); + const auto itr = ctx.peer_connections.find(session.uniqueid); + if (itr != ctx.peer_connections.end() && itr->second == &session) + ctx.peer_connections.erase(itr); } } // namespace p2p \ No newline at end of file diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 3df578e8..c41be987 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -38,7 +38,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 9007af42..e09dec90 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -17,11 +17,11 @@ int user_session_handler::on_connect(comm::comm_session &session) const { if (conf::cfg.pubmaxcons > 0 && ctx.users.size() >= conf::cfg.pubmaxcons) { - LOG_DBG << "Max user connections reached. Dropped connection " << session.uniqueid; + LOG_DBG << "Max user connections reached. Dropped connection " << session.uniqueid.substr(0, 10); return -1; } - LOG_DBG << "User client connected " << session.uniqueid; + LOG_DBG << "User client connected " << session.uniqueid.substr(0, 10); // As soon as a user connects, we issue them a challenge message. We remember the // challenge we issued and later verify the user's response with it. @@ -61,13 +61,13 @@ int user_session_handler::on_message(comm::comm_session &session, std::string_vi if (handle_user_message(user, message) != 0) { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DBG << "Bad message from user " << session.uniqueid; + LOG_DBG << "Bad message from user " << session.uniqueid.substr(0, 10); } } else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DBG << "User session id not found: " << session.uniqueid; + LOG_DBG << "User session id not found: " << session.uniqueid.substr(0, 10); } return 0; @@ -75,7 +75,7 @@ int user_session_handler::on_message(comm::comm_session &session, std::string_vi // If for any reason we reach this point, we should drop the connection because none of the // valid cases match. - LOG_DBG << "Dropping the user connection " << session.uniqueid; + LOG_DBG << "Dropping the user connection " << session.uniqueid.substr(0, 10); corebill::report_violation(session.address); return -1; } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 4f70bc92..47bd1f14 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -73,7 +73,7 @@ namespace usr // The received message must be the challenge response. We need to verify it. if (session.issued_challenge.empty()) { - LOG_DBG << "No challenge found for the session " << session.uniqueid; + LOG_DBG << "No challenge found for the session " << session.uniqueid.substr(0, 10); return -1; } @@ -106,18 +106,18 @@ namespace usr add_user(session, userpubkey, user_protocol); // Add the user to the global authed user list session.issued_challenge.clear(); // Remove the stored challenge - LOG_DBG << "User connection " << session.uniqueid << " authenticated. Public key " + LOG_DBG << "User connection " << session.uniqueid.substr(0, 10) << " authenticated. Public key " << userpubkeyhex; return 0; } else { - LOG_DBG << "Duplicate user public key " << session.uniqueid; + LOG_DBG << "Duplicate user public key " << session.uniqueid.substr(0, 10); } } else { - LOG_DBG << "Challenge verification failed " << session.uniqueid; + LOG_DBG << "Challenge verification failed " << session.uniqueid.substr(0, 10); } return -1;