mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-06-09 11:46:52 +00:00
Fixed incorrect memory access issues. (#109)
This commit is contained in:
@@ -79,22 +79,15 @@ namespace comm
|
||||
// Counter to track when to initiate outbound client connections.
|
||||
int16_t loop_counter = -1;
|
||||
|
||||
// Indicates whether at least some bytes were read from any of the clients during the previous iteration.
|
||||
// If no bytes were read, we would force thread sleep to wait for bytes to arrive.
|
||||
bool bytes_read = false;
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (should_stop_listening)
|
||||
break;
|
||||
|
||||
// Prepare poll fd list.
|
||||
const size_t fd_count = sessions.size() + 1; //+1 for the inclusion of accept_fd
|
||||
pollfd pollfds[fd_count];
|
||||
if (poll_fds(pollfds, accept_fd, sessions) == -1)
|
||||
{
|
||||
util::sleep(10);
|
||||
continue;
|
||||
}
|
||||
|
||||
util::sleep(10);
|
||||
|
||||
// Accept any new incoming connection if available.
|
||||
check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds);
|
||||
|
||||
@@ -109,9 +102,27 @@ namespace comm
|
||||
loop_counter++;
|
||||
}
|
||||
|
||||
const size_t sessions_count = sessions.size();
|
||||
// Prepare poll fd list.
|
||||
const size_t fd_count = sessions.size() + 1; //+1 for the inclusion of accept_fd
|
||||
|
||||
pollfd pollfds[fd_count];
|
||||
memset(pollfds, 0, sizeof(pollfd) * fd_count);
|
||||
|
||||
if (poll_fds(pollfds, accept_fd, sessions) == -1)
|
||||
{
|
||||
util::sleep(10);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!bytes_read)
|
||||
util::sleep(10);
|
||||
bytes_read = false;
|
||||
|
||||
// Loop through all session fds and read any data.
|
||||
const size_t sessions_count = sessions.size();
|
||||
if (sessions_count == 0)
|
||||
continue;
|
||||
|
||||
// Loop through all fds and read any data.
|
||||
for (size_t i = 1; i <= sessions_count; i++)
|
||||
{
|
||||
const short result = pollfds[i].revents;
|
||||
@@ -126,7 +137,18 @@ namespace comm
|
||||
if (!should_disconnect)
|
||||
{
|
||||
if (result & POLLIN)
|
||||
should_disconnect = (session.attempt_read(max_msg_size) == -1);
|
||||
{
|
||||
const int read_result = session.attempt_read(max_msg_size);
|
||||
|
||||
// read_result -1 means error and we should disconnect the client.
|
||||
// read_result 0 means no bytes were read.
|
||||
// read_result 1 means some bytes were read.
|
||||
// read_result 2 means full message were read and processed successfully.
|
||||
if (read_result > 0)
|
||||
bytes_read = true;
|
||||
else if (read_result == -1)
|
||||
should_disconnect = true;
|
||||
}
|
||||
|
||||
if (result & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL))
|
||||
should_disconnect = true;
|
||||
|
||||
@@ -12,6 +12,7 @@ namespace comm
|
||||
|
||||
constexpr uint32_t INTERVALMS = 60000;
|
||||
constexpr uint8_t SIZE_HEADER_LEN = 8;
|
||||
constexpr uint32_t READ_BUFFER_IDLE_SIZE = 64 * 1024;
|
||||
|
||||
// Global instances of user and peer session handlers.
|
||||
usr::user_session_handler user_sess_handler;
|
||||
@@ -48,8 +49,10 @@ namespace comm
|
||||
|
||||
/**
|
||||
* Attempts to read message data from the given socket fd and passes the message on to the session.
|
||||
* @param should_disconnect Whether the client fd must be disconnected.
|
||||
* @param max_msg_size The allowed max byte length of a message to be read.
|
||||
* @return -1 on error and client must be disconnected. 0 if no message data bytes were read. 1 if some
|
||||
* bytes were read but a full message is not yet formed. 2 if a fully formed message has been
|
||||
* read into the read buffer.
|
||||
*/
|
||||
int comm_session::attempt_read(const uint64_t max_msg_size)
|
||||
{
|
||||
@@ -59,33 +62,39 @@ namespace comm
|
||||
available_bytes > (max_msg_size + (is_binary ? SIZE_HEADER_LEN : 0))))
|
||||
return -1;
|
||||
|
||||
int res = 0;
|
||||
|
||||
// Try to read a complete message using available bytes.
|
||||
// If complete message is not available silently return.
|
||||
if (available_bytes > 0)
|
||||
{
|
||||
const uint32_t read_len = is_binary ? get_binary_msg_read_len(available_bytes) : available_bytes;
|
||||
|
||||
if (read_len == -1)
|
||||
if (is_binary)
|
||||
{
|
||||
return -1;
|
||||
res = get_binary_msg_read_len(available_bytes);
|
||||
}
|
||||
else if (read_len > 0)
|
||||
else
|
||||
{
|
||||
if (!is_binary)
|
||||
read_buffer.resize(available_bytes);
|
||||
res = read(read_fd, read_buffer.data(), available_bytes) < available_bytes ? -1 : 2;
|
||||
}
|
||||
|
||||
if (res == 2) // Full message has been read into read buffer.
|
||||
{
|
||||
if (on_message(std::string_view(read_buffer.data(), read_buffer.size())) == -1)
|
||||
res = -1;
|
||||
|
||||
// Reset the read buffer.
|
||||
if (read_buffer.size() > READ_BUFFER_IDLE_SIZE)
|
||||
{
|
||||
read_buffer.resize(read_len);
|
||||
if (read(read_fd, read_buffer.data(), read_len) < read_len)
|
||||
return -1;
|
||||
read_buffer.resize(READ_BUFFER_IDLE_SIZE);
|
||||
read_buffer.shrink_to_fit(); // This is to avaoid large idle memory allocations.
|
||||
}
|
||||
|
||||
int res = on_message(std::string_view(read_buffer.data(), read_len));
|
||||
read_buffer.clear(); // Clear the buffer after read operation.
|
||||
read_buffer.clear();
|
||||
read_buffer_filled_size = 0;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
return res;
|
||||
}
|
||||
|
||||
int comm_session::on_message(std::string_view message)
|
||||
@@ -168,9 +177,11 @@ namespace comm
|
||||
/**
|
||||
* Retrieves the length of the binary message pending to be read. Only relevant for Binary mode.
|
||||
* @param available_bytes Count of bytes that is available to read from the client socket.
|
||||
* @return Length of the message if the complete message available to be read. 0 if reading must be skipped. -1 if client must be disconnected.
|
||||
* @return -1 on error and client must be disconnected. 0 if no message data bytes were read. 1 if some
|
||||
* bytes were read but a full message is not yet formed. 2 if a fully formed message has been
|
||||
* read into the read buffer.
|
||||
*/
|
||||
uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes)
|
||||
int comm_session::get_binary_msg_read_len(const size_t available_bytes)
|
||||
{
|
||||
// If we have previously encountered a size header and we are waiting until all message
|
||||
// bytes are received, we must have the expected message size > 0.
|
||||
@@ -210,7 +221,8 @@ namespace comm
|
||||
|
||||
const size_t read_len = expected_msg_size;
|
||||
expected_msg_size = 0; // reset the expected msg size.
|
||||
return read_len;
|
||||
|
||||
return 2; // Full message has been read.
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -218,11 +230,12 @@ namespace comm
|
||||
if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1)
|
||||
return -1; // Indicates that we should disconnect the client.
|
||||
read_buffer_filled_size += data_bytes;
|
||||
|
||||
return 1; // Some bytes were read, but full message is not yet formed.
|
||||
}
|
||||
}
|
||||
|
||||
// Skip reading
|
||||
return 0;
|
||||
return 0; // No message data bytes was read.
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -39,7 +39,7 @@ class comm_session
|
||||
std::vector<char> read_buffer; // Local buffer to keep collecting data until a complete message can be constructed.
|
||||
uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far.
|
||||
|
||||
uint32_t get_binary_msg_read_len(const size_t available_bytes);
|
||||
int get_binary_msg_read_len(const size_t available_bytes);
|
||||
int on_message(std::string_view message);
|
||||
|
||||
public:
|
||||
|
||||
@@ -33,10 +33,10 @@ enum SESSION_THRESHOLDS
|
||||
*/
|
||||
struct session_threshold
|
||||
{
|
||||
uint64_t threshold_limit;
|
||||
uint32_t intervalms;
|
||||
uint64_t counter_value;
|
||||
uint64_t timestamp;
|
||||
uint64_t threshold_limit = 0;
|
||||
uint32_t intervalms = 0;
|
||||
uint64_t counter_value = 0;
|
||||
uint64_t timestamp = 0;
|
||||
|
||||
session_threshold(const uint64_t threshold_limit, const uint32_t intervalms)
|
||||
: threshold_limit(threshold_limit), intervalms(intervalms)
|
||||
|
||||
Reference in New Issue
Block a user