fix: Subscription source bugs fix (#1626)

For #1620.

- Add timeouts for websocket operations for connections to rippled.
Without these timeouts if connection hangs for some reason, clio
wouldn't know the connection is hanging.
- Fix potential data race in choosing new subscription source which will
forward messages to users.
- Optimise switching between subscription sources.
This commit is contained in:
Sergey Kuznetsov
2024-09-05 14:58:06 +01:00
committed by GitHub
parent 2e2740d4c5
commit 9fe9e7c9d2
8 changed files with 147 additions and 84 deletions

View File

@@ -24,6 +24,8 @@
#include "rpc/JS.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "util/requests/Types.hpp"
#include <boost/algorithm/string/classification.hpp>
@@ -66,7 +68,7 @@ SubscriptionSource::SubscriptionSource(
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const wsTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
@@ -74,14 +76,20 @@ SubscriptionSource::SubscriptionSource(
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
, strand_(boost::asio::make_strand(ioContext))
, wsTimeout_(wsTimeout)
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed))
, lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
"subscription_source_last_message_time",
util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
"Seconds since epoch of the last message received from rippled subscription streams"
))
{
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
.addHeader({"X-User", "clio-client"})
.setConnectionTimeout(connectionTimeout);
.setConnectionTimeout(wsTimeout_);
}
SubscriptionSource::~SubscriptionSource()
@@ -167,21 +175,22 @@ SubscriptionSource::subscribe()
}
wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
LOG(log_.info()) << "Connected";
auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
return;
}
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset();
while (!stop_) {
auto const message = wsConnection_->read(yield);
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
@@ -256,8 +265,6 @@ SubscriptionSource::handleMessage(std::string const& message)
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
} else {
LOG(log_.error()) << "Unknown message: " << object;
}
}
}
@@ -278,10 +285,10 @@ void
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
{
isConnected_ = false;
isForwarding_ = false;
bool const wasForwarding = isForwarding_.exchange(false);
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
onDisconnect_(wasForwarding);
}
if (wsConnection_ != nullptr) {
@@ -312,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
void
SubscriptionSource::setLastMessageTime()
{
lastMessageTime_.lock().get() = std::chrono::steady_clock::now();
lastMessageTimeSecondsSinceEpoch_.get().set(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
);
auto lock = lastMessageTime_.lock();
lock.get() = std::chrono::steady_clock::now();
}
void