From d6751c3e52b3bf9c0b4b9bb8538d06886b30c0c8 Mon Sep 17 00:00:00 2001 From: ravinsp <33562092+ravinsp@users.noreply.github.com> Date: Mon, 16 Dec 2019 14:49:33 +0530 Subject: [PATCH] Added better error handling for socket errors. --- src/cons/state_handler.cpp | 12 ++-- src/main.cpp | 2 +- src/sock/socket_session.cpp | 39 +++++++--- src/sock/socket_session.hpp | 4 +- src/sock/socket_session_lambda.cpp | 110 ++++++++++++++++++++--------- 5 files changed, 119 insertions(+), 48 deletions(-) diff --git a/src/cons/state_handler.cpp b/src/cons/state_handler.cpp index b25a42ea..dca12bde 100644 --- a/src/cons/state_handler.cpp +++ b/src/cons/state_handler.cpp @@ -11,8 +11,12 @@ namespace cons { -constexpr uint16_t MAX_AWAITING_REQUESTS = 1; -constexpr uint16_t MAX_RESPONSE_WAIT_CYCLES = 10; +// Max number of requests that can be awaiting response at any given time. +constexpr uint16_t MAX_AWAITING_REQUESTS = 4; +// Syncing loop sleep delay. +constexpr uint16_t SYNC_LOOP_WAIT = 50; +// No of loop cycles to wait for a response before resubmitting request. +constexpr uint16_t MAX_AWAITING_CYCLES = 1000 / SYNC_LOOP_WAIT; // List of state responses flatbuffer messages to be processed. std::list candidate_state_responses; @@ -100,7 +104,7 @@ int run_state_sync_iterator() { while (true) { - util::sleep(120); + util::sleep(SYNC_LOOP_WAIT); // TODO: Also bypass peer session handler responses if not syncing. if (!ctx.is_state_syncing) @@ -155,7 +159,7 @@ int run_state_sync_iterator() // Check for long-awaited responses and re-request them. for (auto &[hash, request] : submitted_requests) { - if (request.waiting_cycles < MAX_RESPONSE_WAIT_CYCLES) + if (request.waiting_cycles < MAX_AWAITING_CYCLES) { // Increment counter. request.waiting_cycles++; diff --git a/src/main.cpp b/src/main.cpp index b7a8458e..a965eab6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -115,7 +115,7 @@ void std_terminate() noexcept } catch (std::exception &ex) { - std::cerr << "std error: " << ex.what() << "\n"; + LOG_ERR << "std error: " << ex.what() << "\n"; } catch (...) { diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index d64a18ea..68bde4f2 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -226,17 +226,29 @@ void socket_session::on_read(const error_code ec, const std::size_t) template void socket_session::send(const T msg) { - // Always add to queue - queue.push_back(std::move(msg)); + try + { + std::lock_guard lock(send_mutex); + + // Always add to queue + queue.push_back(std::move(msg)); + //using sync write until async_write is properly handled for multi-threaded writes. + ws.write(net::buffer(queue.front().buffer())); + queue.erase(queue.begin()); + } + catch (...) + { + this->handle_exception("sync_write"); + } // Are we already writing? - if (queue.size() > 1) - return; + // if (queue.size() > 1) + // return; - std::string_view sv = queue.front().buffer(); + // std::string_view sv = queue.front().buffer(); - // We are not currently writing, so send this immediately - ws_async_write(sv); + // // We are not currently writing, so send this immediately + // ws_async_write(sv); } /* @@ -278,7 +290,7 @@ template void socket_session::on_close(const error_code ec, const int8_t type) { sess_handler.on_close(this); - + if (type == 1) return; @@ -300,6 +312,17 @@ void socket_session::fail(const error_code ec, char const *what) return; } +template +void socket_session::handle_exception(std::string_view event_name) +{ + std::exception_ptr p = std::current_exception(); + LOG_ERR << "Socket Exception on " << event_name << ": " << (p ? p.__cxa_exception_type()->name() : "null") << std::endl; + + // Close the socket on any event error except close event. + if (event_name != "close") + this->ws_async_close(); +} + template socket_session::~socket_session() { diff --git a/src/sock/socket_session.hpp b/src/sock/socket_session.hpp index 88bd54db..8e513cd0 100644 --- a/src/sock/socket_session.hpp +++ b/src/sock/socket_session.hpp @@ -90,7 +90,8 @@ class socket_session : public std::enable_shared_from_this> websocket::stream> ws; // websocket stream used send an recieve messages std::vector queue; // used to store messages temporarily until it is sent to the relevant party socket_session_handler &sess_handler; // handler passed to gain access to websocket events - std::vector thresholds; // track down various communication thresholds + std::vector thresholds; // track down various communication thresholds + std::mutex send_mutex; // mutex for calling send() void fail(const error_code ec, char const *what); @@ -120,6 +121,7 @@ class socket_session : public std::enable_shared_from_this> void ws_async_close(); + void handle_exception(std::string_view event_name); public: diff --git a/src/sock/socket_session_lambda.cpp b/src/sock/socket_session_lambda.cpp index a36510e2..0abbfdc9 100644 --- a/src/sock/socket_session_lambda.cpp +++ b/src/sock/socket_session_lambda.cpp @@ -18,65 +18,107 @@ namespace sock template void socket_session::ws_next_layer_async_handshake(const ssl::stream_base::handshake_type handshake_type) { - // Perform the SSL handshake - ws.next_layer().async_handshake( - handshake_type, - [sp = this->shared_from_this()](error_code ec) { - sp->on_ssl_handshake(ec); - }); + try + { + // Perform the SSL handshake + ws.next_layer().async_handshake( + handshake_type, + [sp = this->shared_from_this()](error_code ec) { + sp->on_ssl_handshake(ec); + }); + } + catch (...) + { + this->handle_exception("ssl_handshake"); + } } template void socket_session::ws_async_accept() { - ws.async_accept( - [sp = this->shared_from_this()]( - error_code ec) { - sp->on_accept(ec); - }); + try + { + ws.async_accept( + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_accept(ec); + }); + } + catch (...) + { + this->handle_exception("accept"); + } } template void socket_session::ws_async_handshake() { - ws.async_handshake(this->address, "/", - [sp = this->shared_from_this()]( - error_code ec) { - sp->on_accept(ec); - }); + try + { + ws.async_handshake(this->address, "/", + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_accept(ec); + }); + } + catch (...) + { + this->handle_exception("handshake"); + } } template void socket_session::ws_async_read() { - ws.async_read( - buffer, - [sp = this->shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_read(ec, bytes); - }); + try + { + ws.async_read( + buffer, + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_read(ec, bytes); + }); + } + catch (...) + { + this->handle_exception("read"); + } } template void socket_session::ws_async_write(std::string_view message) { - ws.async_write( - // Project the outbound_message buffer from the queue front into the asio buffer. - net::buffer(message.data(), message.length()), - [sp = this->shared_from_this()]( - error_code ec, std::size_t bytes) { - sp->on_write(ec, bytes); - }); + try + { + ws.async_write( + // Project the outbound_message buffer from the queue front into the asio buffer. + net::buffer(message.data(), message.length()), + [sp = this->shared_from_this()]( + error_code ec, std::size_t bytes) { + sp->on_write(ec, bytes); + }); + } + catch (...) + { + this->handle_exception("write"); + } } template void socket_session::ws_async_close() { - ws.async_close(websocket::close_code::normal, - [sp = this->shared_from_this()]( - error_code ec) { - sp->on_close(ec, 0); - }); + try + { + ws.async_close(websocket::close_code::normal, + [sp = this->shared_from_this()]( + error_code ec) { + sp->on_close(ec, 0); + }); + } + catch (...) + { + this->handle_exception("close"); + } } // Template instantiations.