diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index 1d6f8ec59..c4be14615 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -21,8 +21,8 @@ ETLSourceImpl::ETLSourceImpl( std::shared_ptr networkValidatedLedgers, ETLLoadBalancer& balancer) : ioc_(ioContext) - , resolver_(boost::asio::make_strand(ioc_)) - , timer_(ioc_) + , resolver_(boost::asio::make_strand(ioContext)) + , timer_(ioContext) , networkValidatedLedgers_(networkValidatedLedgers) , backend_(backend) , subscriptions_(subscriptions) @@ -107,13 +107,12 @@ ETLSourceImpl::reconnect(boost::beast::error_code ec) timer_.async_wait([this](auto ec) { bool startAgain = (ec != boost::asio::error::operation_aborted); BOOST_LOG_TRIVIAL(trace) << __func__ << " async_wait : ec = " << ec; - close(startAgain); + derived().close(startAgain); }); } -template void -ETLSourceImpl::close(bool startAgain) +PlainETLSource::close(bool startAgain) { timer_.cancel(); ioc_.post([this, startAgain]() { @@ -147,6 +146,54 @@ ETLSourceImpl::close(bool startAgain) }); } +void +SslETLSource::close(bool startAgain) +{ + timer_.cancel(); + ioc_.post([this, startAgain]() { + if (closing_) + return; + + if (derived().ws().is_open()) + { + // onStop() also calls close(). If the async_close is called twice, + // an assertion fails. Using closing_ makes sure async_close is only + // called once + closing_ = true; + derived().ws().async_close( + boost::beast::websocket::close_code::normal, + [this, startAgain](auto ec) { + if (ec) + { + BOOST_LOG_TRIVIAL(error) + << __func__ << " async_close : " + << "error code = " << ec << " - " << toString(); + } + closing_ = false; + if (startAgain) + { + ws_ = std::make_unique>>( + boost::asio::make_strand(ioc_), *sslCtx_); + + run(); + } + + }); + } + else if (startAgain) + { + ws_ = std::make_unique>>( + boost::asio::make_strand(ioc_), *sslCtx_); + + run(); + } + }); +} + template void ETLSourceImpl::onResolve( diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index 99b4bc892..b652fe84f 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -69,8 +69,6 @@ class ETLSourceImpl : public ETLSource std::string grpcPort_; - boost::asio::io_context& ioc_; - std::unique_ptr stub_; boost::asio::ip::tcp::resolver resolver_; @@ -87,8 +85,6 @@ class ETLSourceImpl : public ETLSource mutable std::mutex mtx_; - std::atomic_bool closing_{false}; - std::atomic_bool connected_{false}; // true if this ETL source is forwarding transactions received on the @@ -101,13 +97,28 @@ class ETLSourceImpl : public ETLSource std::chrono::system_clock::time_point lastMsgTime_; mutable std::mutex lastMsgTimeMtx_; - // used for retrying connections - boost::asio::steady_timer timer_; - std::shared_ptr backend_; std::shared_ptr subscriptions_; ETLLoadBalancer& balancer_; +protected: + Derived& + derived() + { + return static_cast(*this); + } + + std::string ip_; + + size_t numFailures_ = 0; + + boost::asio::io_context& ioc_; + + // used for retrying connections + boost::asio::steady_timer timer_; + + std::atomic_bool closing_{false}; + void run() override { @@ -121,17 +132,6 @@ class ETLSourceImpl : public ETLSource }); } -protected: - Derived& - derived() - { - return static_cast(*this); - } - - std::string ip_; - - size_t numFailures_ = 0; - public: ~ETLSourceImpl() @@ -315,11 +315,6 @@ public: bool handleMessage(); - /// Close the websocket - /// @param startAgain whether to reconnect - void - close(bool startAgain); - /// Get grpc stub to forward requests to rippled node /// @return stub to send requests to ETL source std::unique_ptr @@ -355,6 +350,11 @@ public: boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) override; + /// Close the websocket + /// @param startAgain whether to reconnect + void + close(bool startAgain); + boost::beast::websocket::stream& ws() { @@ -364,6 +364,8 @@ public: class SslETLSource : public ETLSourceImpl { + std::optional> sslCtx_; + std::unique_ptr>> ws_; @@ -377,10 +379,11 @@ public: std::shared_ptr nwvl, ETLLoadBalancer& balancer) : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) + , sslCtx_(sslCtx) , ws_(std::make_unique< boost::beast::websocket::stream>>( - boost::asio::make_strand(ioc), *sslCtx)) + boost::asio::make_strand(ioc_), *sslCtx_)) { } @@ -394,6 +397,11 @@ public: boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint); + /// Close the websocket + /// @param startAgain whether to reconnect + void + close(bool startAgain); + boost::beast::websocket::stream>& ws() {