From 5004dc4e15d01aceee2799031b245dceb9600e83 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Thu, 29 Aug 2024 15:59:02 +0100 Subject: [PATCH] fix: Fix logging in SubscriptionSource (#1617) For #1616. Later should be ported to develop as well. --- src/etl/LoadBalancer.cpp | 1 + src/etl/impl/GrpcSource.cpp | 2 +- src/etl/impl/SubscriptionSource.cpp | 22 ++++++++++++++-------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index f42a3d1d4..4af0ef821 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -314,6 +314,7 @@ LoadBalancer::getETLState() noexcept void LoadBalancer::chooseForwardingSource() { + LOG(log_.info()) << "Choosing a new source to forward subscriptions"; hasForwardingSource_ = false; for (auto& source : sources_) { if (not hasForwardingSource_ and source->isConnected()) { diff --git a/src/etl/impl/GrpcSource.cpp b/src/etl/impl/GrpcSource.cpp index db32a239b..c7d09cbe5 100644 --- a/src/etl/impl/GrpcSource.cpp +++ b/src/etl/impl/GrpcSource.cpp @@ -46,7 +46,7 @@ namespace etl::impl { GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr backend) - : log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend)) + : log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend)) { try { boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)}; diff --git a/src/etl/impl/SubscriptionSource.cpp b/src/etl/impl/SubscriptionSource.cpp index 3d5791500..d0e76f875 100644 --- a/src/etl/impl/SubscriptionSource.cpp +++ b/src/etl/impl/SubscriptionSource.cpp @@ -69,7 +69,7 @@ SubscriptionSource::SubscriptionSource( std::chrono::steady_clock::duration const connectionTimeout, std::chrono::steady_clock::duration const retryDelay ) - : log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort)) + : log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort)) , wsConnectionBuilder_(ip, wsPort) , validatedLedgers_(std::move(validatedLedgers)) , subscriptions_(std::move(subscriptions)) @@ -133,6 +133,7 @@ void SubscriptionSource::setForwarding(bool isForwarding) { isForwarding_ = isForwarding; + LOG(log_.info()) << "Forwarding set to " << isForwarding_; } std::chrono::steady_clock::time_point @@ -168,6 +169,7 @@ SubscriptionSource::subscribe() wsConnection_ = std::move(connection).value(); isConnected_ = true; onConnect_(); + LOG(log_.info()) << "Connected"; auto const& subscribeCommand = getSubscribeCommandJson(); auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield); @@ -224,10 +226,11 @@ SubscriptionSource::handleMessage(std::string const& message) auto validatedLedgers = boost::json::value_to(result.at(JS(validated_ledgers))); setValidatedRange(std::move(validatedLedgers)); } - LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object; + LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object; } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) { - LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object; + LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: " + << object; if (object.contains(JS(ledger_index))) { ledgerIndex = object.at(JS(ledger_index)).as_int64(); } @@ -245,11 +248,16 @@ SubscriptionSource::handleMessage(std::string const& message) // 2 - Validated transaction // Only forward proposed transaction, validated transactions are sent by Clio itself if (object.contains(JS(transaction)) and !object.contains(JS(meta))) { + LOG(log_.debug()) << "Forwarding proposed transaction: " << object; subscriptions_->forwardProposedTransaction(object); } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) { + LOG(log_.debug()) << "Forwarding validation: " << object; subscriptions_->forwardValidation(object); } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) { + LOG(log_.debug()) << "Forwarding manifest: " << object; subscriptions_->forwardManifest(object); + } else { + LOG(log_.error()) << "Unknown message: " << object; } } } @@ -261,7 +269,7 @@ SubscriptionSource::handleMessage(std::string const& message) return std::nullopt; } catch (std::exception const& e) { - LOG(log_.error()) << "Exception in handleMessage : " << e.what(); + LOG(log_.error()) << "Exception in handleMessage: " << e.what(); return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())}; } } @@ -273,13 +281,11 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost isForwarding_ = false; if (not stop_) { onDisconnect_(); + LOG(log_.info()) << "Disconnected"; } if (wsConnection_ != nullptr) { - auto const err = wsConnection_->close(yield); - if (err) { - LOG(log_.error()) << "Error closing websocket connection: " << err->message(); - } + wsConnection_->close(yield); wsConnection_.reset(); }