mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
Fixes #1620. Cherry-pick of #1626 into develop. - Add timeouts for websocket operations on connections to rippled. Without these timeouts, if a connection hangs for some reason, clio wouldn't know that the connection is hanging. - Fix a potential data race when choosing the new subscription source that will forward messages to users. - Optimise switching between subscription sources.
This commit is contained in:
@@ -111,10 +111,13 @@ LoadBalancer::LoadBalancer(
|
||||
validatedLedgers,
|
||||
forwardingTimeout,
|
||||
[this]() {
|
||||
if (not hasForwardingSource_)
|
||||
if (not hasForwardingSource_.lock().get())
|
||||
chooseForwardingSource();
|
||||
},
|
||||
[this](bool wasForwarding) {
|
||||
if (wasForwarding)
|
||||
chooseForwardingSource();
|
||||
},
|
||||
[this]() { chooseForwardingSource(); },
|
||||
[this]() {
|
||||
if (forwardingCache_.has_value())
|
||||
forwardingCache_->invalidate();
|
||||
@@ -323,11 +326,12 @@ void
|
||||
LoadBalancer::chooseForwardingSource()
|
||||
{
|
||||
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
|
||||
hasForwardingSource_ = false;
|
||||
auto hasForwardingSourceLock = hasForwardingSource_.lock();
|
||||
hasForwardingSourceLock.get() = false;
|
||||
for (auto& source : sources_) {
|
||||
if (not hasForwardingSource_ and source->isConnected()) {
|
||||
if (not hasForwardingSourceLock.get() and source->isConnected()) {
|
||||
source->setForwarding(true);
|
||||
hasForwardingSource_ = true;
|
||||
hasForwardingSourceLock.get() = true;
|
||||
} else {
|
||||
source->setForwarding(false);
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
#include "etl/Source.hpp"
|
||||
#include "etl/impl/ForwardingCache.hpp"
|
||||
#include "feed/SubscriptionManagerInterface.hpp"
|
||||
#include "rpc/Errors.hpp"
|
||||
#include "util/Mutex.hpp"
|
||||
#include "util/config/Config.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
@@ -39,7 +39,6 @@
|
||||
#include <org/xrpl/rpc/v1/ledger.pb.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <expected>
|
||||
@@ -76,7 +75,10 @@ private:
|
||||
std::optional<ETLState> etlState_;
|
||||
std::uint32_t downloadRanges_ =
|
||||
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
|
||||
std::atomic_bool hasForwardingSource_{false};
|
||||
|
||||
// Using mutex instead of atomic_bool because choosing a new source to
|
||||
// forward messages should be done with mutual exclusion, otherwise there will be a race condition
|
||||
util::Mutex<bool> hasForwardingSource_{false};
|
||||
|
||||
public:
|
||||
/**
|
||||
|
||||
@@ -53,7 +53,7 @@ namespace etl {
|
||||
class SourceBase {
|
||||
public:
|
||||
using OnConnectHook = std::function<void()>;
|
||||
using OnDisconnectHook = std::function<void()>;
|
||||
using OnDisconnectHook = std::function<void(bool)>;
|
||||
using OnLedgerClosedHook = std::function<void()>;
|
||||
|
||||
virtual ~SourceBase() = default;
|
||||
|
||||
@@ -24,6 +24,8 @@
|
||||
#include "rpc/JS.hpp"
|
||||
#include "util/Retry.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Label.hpp"
|
||||
#include "util/prometheus/Prometheus.hpp"
|
||||
#include "util/requests/Types.hpp"
|
||||
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
@@ -66,7 +68,7 @@ SubscriptionSource::SubscriptionSource(
|
||||
OnConnectHook onConnect,
|
||||
OnDisconnectHook onDisconnect,
|
||||
OnLedgerClosedHook onLedgerClosed,
|
||||
std::chrono::steady_clock::duration const connectionTimeout,
|
||||
std::chrono::steady_clock::duration const wsTimeout,
|
||||
std::chrono::steady_clock::duration const retryDelay
|
||||
)
|
||||
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
|
||||
@@ -74,14 +76,20 @@ SubscriptionSource::SubscriptionSource(
|
||||
, validatedLedgers_(std::move(validatedLedgers))
|
||||
, subscriptions_(std::move(subscriptions))
|
||||
, strand_(boost::asio::make_strand(ioContext))
|
||||
, wsTimeout_(wsTimeout)
|
||||
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
|
||||
, onConnect_(std::move(onConnect))
|
||||
, onDisconnect_(std::move(onDisconnect))
|
||||
, onLedgerClosed_(std::move(onLedgerClosed))
|
||||
, lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
|
||||
"subscription_source_last_message_time",
|
||||
util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
|
||||
"Seconds since epoch of the last message received from rippled subscription streams"
|
||||
))
|
||||
{
|
||||
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
|
||||
.addHeader({"X-User", "clio-client"})
|
||||
.setConnectionTimeout(connectionTimeout);
|
||||
.setConnectionTimeout(wsTimeout_);
|
||||
}
|
||||
|
||||
SubscriptionSource::~SubscriptionSource()
|
||||
@@ -167,21 +175,22 @@ SubscriptionSource::subscribe()
|
||||
}
|
||||
|
||||
wsConnection_ = std::move(connection).value();
|
||||
isConnected_ = true;
|
||||
onConnect_();
|
||||
LOG(log_.info()) << "Connected";
|
||||
|
||||
auto const& subscribeCommand = getSubscribeCommandJson();
|
||||
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
|
||||
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
|
||||
if (writeErrorOpt) {
|
||||
handleError(writeErrorOpt.value(), yield);
|
||||
return;
|
||||
}
|
||||
|
||||
isConnected_ = true;
|
||||
LOG(log_.info()) << "Connected";
|
||||
onConnect_();
|
||||
|
||||
retry_.reset();
|
||||
|
||||
while (!stop_) {
|
||||
auto const message = wsConnection_->read(yield);
|
||||
auto const message = wsConnection_->read(yield, wsTimeout_);
|
||||
if (not message) {
|
||||
handleError(message.error(), yield);
|
||||
return;
|
||||
@@ -256,8 +265,6 @@ SubscriptionSource::handleMessage(std::string const& message)
|
||||
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
|
||||
LOG(log_.debug()) << "Forwarding manifest: " << object;
|
||||
subscriptions_->forwardManifest(object);
|
||||
} else {
|
||||
LOG(log_.error()) << "Unknown message: " << object;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -278,10 +285,10 @@ void
|
||||
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
|
||||
{
|
||||
isConnected_ = false;
|
||||
isForwarding_ = false;
|
||||
bool const wasForwarding = isForwarding_.exchange(false);
|
||||
if (not stop_) {
|
||||
onDisconnect_();
|
||||
LOG(log_.info()) << "Disconnected";
|
||||
onDisconnect_(wasForwarding);
|
||||
}
|
||||
|
||||
if (wsConnection_ != nullptr) {
|
||||
@@ -312,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
|
||||
void
|
||||
SubscriptionSource::setLastMessageTime()
|
||||
{
|
||||
lastMessageTime_.lock().get() = std::chrono::steady_clock::now();
|
||||
lastMessageTimeSecondsSinceEpoch_.get().set(
|
||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
|
||||
);
|
||||
auto lock = lastMessageTime_.lock();
|
||||
lock.get() = std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "util/Mutex.hpp"
|
||||
#include "util/Retry.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Gauge.hpp"
|
||||
#include "util/requests/Types.hpp"
|
||||
#include "util/requests/WsConnection.hpp"
|
||||
|
||||
@@ -37,6 +38,7 @@
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
@@ -71,6 +73,8 @@ private:
|
||||
|
||||
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
|
||||
|
||||
std::chrono::steady_clock::duration wsTimeout_;
|
||||
|
||||
util::Retry retry_;
|
||||
|
||||
OnConnectHook onConnect_;
|
||||
@@ -83,9 +87,11 @@ private:
|
||||
|
||||
util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;
|
||||
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
|
||||
|
||||
std::future<void> runFuture_;
|
||||
|
||||
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30};
|
||||
static constexpr std::chrono::seconds WS_TIMEOUT{30};
|
||||
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
|
||||
static constexpr std::chrono::seconds RETRY_DELAY{1};
|
||||
|
||||
@@ -103,7 +109,7 @@ public:
|
||||
* @param onDisconnect The onDisconnect hook. Called when the connection is lost
|
||||
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
|
||||
* forwarding
|
||||
* @param connectionTimeout The connection timeout. Defaults to 30 seconds
|
||||
* @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
|
||||
* @param retryDelay The retry delay. Defaults to 1 second
|
||||
*/
|
||||
SubscriptionSource(
|
||||
@@ -115,7 +121,7 @@ public:
|
||||
OnConnectHook onConnect,
|
||||
OnDisconnectHook onDisconnect,
|
||||
OnLedgerClosedHook onLedgerClosed,
|
||||
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
|
||||
std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
|
||||
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
|
||||
);
|
||||
|
||||
|
||||
@@ -39,8 +39,10 @@
|
||||
#include <boost/beast/websocket/stream_base.hpp>
|
||||
#include <boost/system/errc.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <expected>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
@@ -123,15 +125,20 @@ private:
|
||||
static void
|
||||
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
|
||||
{
|
||||
auto isCompleted = std::make_shared<bool>(false);
|
||||
boost::asio::cancellation_signal cancellationSignal;
|
||||
auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);
|
||||
|
||||
boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
|
||||
timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
|
||||
if (!errorCode)
|
||||
|
||||
// The timer's handler below can be invoked with no error code even if the operation completed before the timeout, so we
|
||||
// need an additional flag here
|
||||
timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
|
||||
if (!errorCode and not *isCompleted)
|
||||
cancellationSignal.emit(boost::asio::cancellation_type::terminal);
|
||||
});
|
||||
operation(cyield);
|
||||
*isCompleted = true;
|
||||
}
|
||||
|
||||
static boost::system::error_code
|
||||
|
||||
Reference in New Issue
Block a user