diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index 11b502cf..d1f64aa7 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -92,6 +92,8 @@ void comm_server::connection_watchdog( continue; } + util::sleep(10); + // Accept any new incoming connection if available. check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds); diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 2102270e..be643f8a 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -71,15 +71,10 @@ int comm_session::attempt_read(const uint64_t max_msg_size) } else if (read_len > 0) { - available_bytes -= read_len; - if (is_binary) - available_bytes -= SIZE_HEADER_LEN; - - char msg_buf[read_len]; - if (read(read_fd, msg_buf, read_len) == -1) - return -1; - - return on_message(std::string_view(msg_buf, read_len)); + int res = on_message(std::string_view(read_buffer.data(), read_len)); + read_buffer.clear(); // Clear the buffer after read operation. + read_buffer_filled_size = 0; + return res; } } @@ -167,6 +162,8 @@ uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes) // If we have previously encountered a size header and we are waiting until all message // bytes are received, we must have the expected message size > 0. + size_t data_bytes = available_bytes; + // If we are not tracking a previous size header, then we must check for a size header. if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN) { @@ -175,22 +172,40 @@ uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes) if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1) return -1; // Indicates that we should disconnect the client. + data_bytes -= SIZE_HEADER_LEN; + // We are using last 4 bytes (big endian) in the header for the message size. uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7]; - // We must read the entire message if all message bytes are available. - if (available_bytes >= (SIZE_HEADER_LEN + upcoming_msg_size)) - return upcoming_msg_size; - // Remember the expected msg size until sufficient bytes are available. expected_msg_size = upcoming_msg_size; + read_buffer.resize(expected_msg_size); } - else if (expected_msg_size > 0 && available_bytes >= expected_msg_size) + + if (expected_msg_size > 0 && data_bytes > 0) { + // Claculate bytes remaining to form complete message. + const size_t remaining_len = expected_msg_size - read_buffer_filled_size; + // We know expected message size, and enough bytes are available to read complete expected message. - const uint32_t read_len = expected_msg_size; - expected_msg_size = 0; // reset the expected msg size. - return read_len; + if (data_bytes >= remaining_len) + { + // Complete the buffer by reading remaining bytes. + if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1) + return -1; // Indicates that we should disconnect the client. + read_buffer_filled_size += remaining_len; + + const size_t read_len = expected_msg_size; + expected_msg_size = 0; // reset the expected msg size. + return read_len; + } + else + { + // Collect any available bytes to the buffer. + if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1) + return -1; // Indicates that we should disconnect the client. + read_buffer_filled_size += data_bytes; + } } // Skip reading diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index a7e42377..6c594743 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -42,6 +42,8 @@ class comm_session const SESSION_TYPE session_type; std::vector thresholds; // track down various communication thresholds uint32_t expected_msg_size = 0; // Next expected message size based on size header. + std::vector read_buffer; // Local buffer to keep collecting data until a complete message can be constructed. + uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far. uint32_t get_binary_msg_read_len(const size_t available_bytes); int on_message(std::string_view message);