fix: Subscription source bugs fix (#1626) (#1633)

Fixes #1620.
Cherry pick of #1626 into develop.

- Add timeouts for websocket operations for connections to rippled.
Without these timeouts if connection hangs for some reason, clio
wouldn't know the connection is hanging.
- Fix potential data race in choosing new subscription source which will
forward messages to users.
- Optimise switching between subscription sources.
This commit is contained in:
Sergey Kuznetsov
2024-09-06 14:35:18 +01:00
committed by GitHub
parent 1d33b8e88a
commit 7088ebad97
8 changed files with 147 additions and 83 deletions

View File

@@ -25,6 +25,7 @@
#include "util/Mutex.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp"
@@ -37,6 +38,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
@@ -71,6 +73,8 @@ private:
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::chrono::steady_clock::duration wsTimeout_;
util::Retry retry_;
OnConnectHook onConnect_;
@@ -83,9 +87,11 @@ private:
util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;
std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
std::future<void> runFuture_;
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30};
static constexpr std::chrono::seconds WS_TIMEOUT{30};
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds RETRY_DELAY{1};
@@ -103,7 +109,7 @@ public:
* @param onDisconnect The onDisconnect hook. Called when the connection is lost
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding
* @param connectionTimeout The connection timeout. Defaults to 30 seconds
* @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second
*/
SubscriptionSource(
@@ -115,7 +121,7 @@ public:
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
);