Implemented websocket read bufferring. (#92)

This commit is contained in:
Ravin Perera
2020-04-16 19:31:23 +05:30
committed by GitHub
parent 7a7dd8698d
commit 721daf19f3
3 changed files with 36 additions and 17 deletions

View File

@@ -92,6 +92,8 @@ void comm_server::connection_watchdog(
continue;
}
util::sleep(10);
// Accept any new incoming connection if available.
check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds);

View File

@@ -71,15 +71,10 @@ int comm_session::attempt_read(const uint64_t max_msg_size)
}
else if (read_len > 0)
{
available_bytes -= read_len;
if (is_binary)
available_bytes -= SIZE_HEADER_LEN;
char msg_buf[read_len];
if (read(read_fd, msg_buf, read_len) == -1)
return -1;
return on_message(std::string_view(msg_buf, read_len));
int res = on_message(std::string_view(read_buffer.data(), read_len));
read_buffer.clear(); // Clear the buffer after read operation.
read_buffer_filled_size = 0;
return res;
}
}
@@ -167,6 +162,8 @@ uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes)
// If we have previously encountered a size header and we are waiting until all message
// bytes are received, we must have the expected message size > 0.
size_t data_bytes = available_bytes;
// If we are not tracking a previous size header, then we must check for a size header.
if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN)
{
@@ -175,22 +172,40 @@ uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes)
if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1)
return -1; // Indicates that we should disconnect the client.
data_bytes -= SIZE_HEADER_LEN;
// We are using last 4 bytes (big endian) in the header for the message size.
uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7];
// We must read the entire message if all message bytes are available.
if (available_bytes >= (SIZE_HEADER_LEN + upcoming_msg_size))
return upcoming_msg_size;
// Remember the expected msg size until sufficient bytes are available.
expected_msg_size = upcoming_msg_size;
read_buffer.resize(expected_msg_size);
}
else if (expected_msg_size > 0 && available_bytes >= expected_msg_size)
if (expected_msg_size > 0 && data_bytes > 0)
{
// Claculate bytes remaining to form complete message.
const size_t remaining_len = expected_msg_size - read_buffer_filled_size;
// We know expected message size, and enough bytes are available to read complete expected message.
const uint32_t read_len = expected_msg_size;
expected_msg_size = 0; // reset the expected msg size.
return read_len;
if (data_bytes >= remaining_len)
{
// Complete the buffer by reading remaining bytes.
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1)
return -1; // Indicates that we should disconnect the client.
read_buffer_filled_size += remaining_len;
const size_t read_len = expected_msg_size;
expected_msg_size = 0; // reset the expected msg size.
return read_len;
}
else
{
// Collect any available bytes to the buffer.
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1)
return -1; // Indicates that we should disconnect the client.
read_buffer_filled_size += data_bytes;
}
}
// Skip reading

View File

@@ -42,6 +42,8 @@ class comm_session
const SESSION_TYPE session_type;
std::vector<session_threshold> thresholds; // track down various communication thresholds
uint32_t expected_msg_size = 0; // Next expected message size based on size header.
std::vector<char> read_buffer; // Local buffer to keep collecting data until a complete message can be constructed.
uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far.
uint32_t get_binary_msg_read_len(const size_t available_bytes);
int on_message(std::string_view message);