mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-04 11:55:51 +00:00
@@ -322,6 +322,7 @@ LoadBalancer::getETLState() noexcept
|
||||
void
|
||||
LoadBalancer::chooseForwardingSource()
|
||||
{
|
||||
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
|
||||
hasForwardingSource_ = false;
|
||||
for (auto& source : sources_) {
|
||||
if (not hasForwardingSource_ and source->isConnected()) {
|
||||
|
||||
@@ -47,7 +47,7 @@
|
||||
namespace etl::impl {
|
||||
|
||||
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
|
||||
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
|
||||
: log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
|
||||
{
|
||||
try {
|
||||
boost::asio::io_context ctx;
|
||||
|
||||
@@ -69,7 +69,7 @@ SubscriptionSource::SubscriptionSource(
|
||||
std::chrono::steady_clock::duration const connectionTimeout,
|
||||
std::chrono::steady_clock::duration const retryDelay
|
||||
)
|
||||
: log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
|
||||
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
|
||||
, wsConnectionBuilder_(ip, wsPort)
|
||||
, validatedLedgers_(std::move(validatedLedgers))
|
||||
, subscriptions_(std::move(subscriptions))
|
||||
@@ -133,6 +133,7 @@ void
|
||||
SubscriptionSource::setForwarding(bool isForwarding)
|
||||
{
|
||||
isForwarding_ = isForwarding;
|
||||
LOG(log_.info()) << "Forwarding set to " << isForwarding_;
|
||||
}
|
||||
|
||||
std::chrono::steady_clock::time_point
|
||||
@@ -168,6 +169,7 @@ SubscriptionSource::subscribe()
|
||||
wsConnection_ = std::move(connection).value();
|
||||
isConnected_ = true;
|
||||
onConnect_();
|
||||
LOG(log_.info()) << "Connected";
|
||||
|
||||
auto const& subscribeCommand = getSubscribeCommandJson();
|
||||
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
|
||||
@@ -224,10 +226,11 @@ SubscriptionSource::handleMessage(std::string const& message)
|
||||
auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
|
||||
setValidatedRange(std::move(validatedLedgers));
|
||||
}
|
||||
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
|
||||
LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
|
||||
|
||||
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
|
||||
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
|
||||
LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
|
||||
<< object;
|
||||
if (object.contains(JS(ledger_index))) {
|
||||
ledgerIndex = object.at(JS(ledger_index)).as_int64();
|
||||
}
|
||||
@@ -245,11 +248,16 @@ SubscriptionSource::handleMessage(std::string const& message)
|
||||
// 2 - Validated transaction
|
||||
// Only forward proposed transaction, validated transactions are sent by Clio itself
|
||||
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
|
||||
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
|
||||
subscriptions_->forwardProposedTransaction(object);
|
||||
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
|
||||
LOG(log_.debug()) << "Forwarding validation: " << object;
|
||||
subscriptions_->forwardValidation(object);
|
||||
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
|
||||
LOG(log_.debug()) << "Forwarding manifest: " << object;
|
||||
subscriptions_->forwardManifest(object);
|
||||
} else {
|
||||
LOG(log_.error()) << "Unknown message: " << object;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -261,7 +269,7 @@ SubscriptionSource::handleMessage(std::string const& message)
|
||||
|
||||
return std::nullopt;
|
||||
} catch (std::exception const& e) {
|
||||
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
|
||||
LOG(log_.error()) << "Exception in handleMessage: " << e.what();
|
||||
return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
|
||||
}
|
||||
}
|
||||
@@ -273,13 +281,11 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
|
||||
isForwarding_ = false;
|
||||
if (not stop_) {
|
||||
onDisconnect_();
|
||||
LOG(log_.info()) << "Disconnected";
|
||||
}
|
||||
|
||||
if (wsConnection_ != nullptr) {
|
||||
auto const err = wsConnection_->close(yield);
|
||||
if (err) {
|
||||
LOG(log_.error()) << "Error closing websocket connection: " << err->message();
|
||||
}
|
||||
wsConnection_->close(yield);
|
||||
wsConnection_.reset();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user