diff --git a/.gitignore b/.gitignore index f8bcd489..a590c895 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *clio*.log build/ +.vscode .python-version diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c832eb5..c1d3dccb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,6 +54,7 @@ target_sources(clio PRIVATE src/backend/SimpleCache.cpp ## ETL src/etl/ETLSource.cpp + src/etl/ProbingETLSource.cpp src/etl/NFTHelpers.cpp src/etl/ReportingETL.cpp ## Subscriptions diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index f19c975d..17d8b174 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -95,66 +96,16 @@ make_TimeoutOption() } } -// Create ETL source without grpc endpoint -// Fetch ledger and load initial ledger will fail for this source -// Primarly used in read-only mode, to monitor when ledgers are validated -template -ETLSourceImpl::ETLSourceImpl( - boost::json::object const& config, - boost::asio::io_context& ioContext, - std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr networkValidatedLedgers, - ETLLoadBalancer& balancer) - : resolver_(boost::asio::make_strand(ioContext)) - , networkValidatedLedgers_(networkValidatedLedgers) - , backend_(backend) - , subscriptions_(subscriptions) - , balancer_(balancer) - , forwardCache_(config, ioContext, *this) - , ioc_(ioContext) - , timer_(ioContext) -{ - if (config.contains("ip")) - { - auto ipJs = config.at("ip").as_string(); - ip_ = {ipJs.c_str(), ipJs.size()}; - } - if (config.contains("ws_port")) - { - auto portjs = config.at("ws_port").as_string(); - wsPort_ = {portjs.c_str(), portjs.size()}; - } - if (config.contains("grpc_port")) - { - auto portjs = config.at("grpc_port").as_string(); - grpcPort_ = {portjs.c_str(), portjs.size()}; - try - { - boost::asio::ip::tcp::endpoint endpoint{ - boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)}; - std::stringstream ss; - ss << endpoint; - grpc::ChannelArguments chArgs; - chArgs.SetMaxReceiveMessageSize(-1); - stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( - grpc::CreateCustomChannel( - ss.str(), grpc::InsecureChannelCredentials(), chArgs)); - BOOST_LOG_TRIVIAL(debug) << "Made stub for remote = " << toString(); - } - catch (std::exception const& e) - { - BOOST_LOG_TRIVIAL(debug) - << "Exception while creating stub = " << e.what() - << " . Remote = " << toString(); - } - } -} - template void ETLSourceImpl::reconnect(boost::beast::error_code ec) { + if (paused_) + return; + + if (connected_) + hooks_.onDisconnected(ec); + connected_ = false; // These are somewhat normal errors. operation_aborted occurs on shutdown, // when the timer is cancelled. connection_refused will occur repeatedly @@ -422,6 +373,10 @@ ETLSourceImpl::onHandshake(boost::beast::error_code ec) { BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - " << toString(); + if (auto action = hooks_.onConnected(ec); + action == ETLSourceHooks::Action::STOP) + return; + if (ec) { // start over @@ -960,34 +915,18 @@ static std::unique_ptr make_ETLSource( boost::json::object const& config, boost::asio::io_context& ioContext, - std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr networkValidatedLedgers, ETLLoadBalancer& balancer) { - std::unique_ptr src = nullptr; - if (sslCtx) - { - src = std::make_unique( - config, - ioContext, - sslCtx, - backend, - subscriptions, - networkValidatedLedgers, - balancer); - } - else - { - src = std::make_unique( - config, - ioContext, - backend, - subscriptions, - networkValidatedLedgers, - balancer); - } + auto src = std::make_unique( + config, + ioContext, + backend, + subscriptions, + networkValidatedLedgers, + balancer); src->run(); @@ -997,7 +936,6 @@ make_ETLSource( ETLLoadBalancer::ETLLoadBalancer( boost::json::object const& config, boost::asio::io_context& ioContext, - std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl) @@ -1016,13 +954,7 @@ ETLLoadBalancer::ETLLoadBalancer( for (auto& entry : config.at("etl_sources").as_array()) { std::unique_ptr source = make_ETLSource( - entry.as_object(), - ioContext, - sslCtx, - backend, - subscriptions, - nwvl, - *this); + entry.as_object(), ioContext, backend, subscriptions, nwvl, *this); sources_.push_back(std::move(source)); BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index 1e9d017e..be318e1f 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -16,6 +16,7 @@ class ETLLoadBalancer; class ETLSource; +class ProbingETLSource; class SubscriptionManager; /// This class manages a connection to a single ETL source. This is almost @@ -96,6 +97,12 @@ public: virtual void run() = 0; + virtual void + pause() = 0; + + virtual void + resume() = 0; + virtual std::string toString() const = 0; @@ -126,6 +133,7 @@ public: private: friend ForwardCache; + friend ProbingETLSource; virtual std::optional requestFromRippled( @@ -134,6 +142,14 @@ private: boost::asio::yield_context& yield) const = 0; }; +struct ETLSourceHooks +{ + enum class Action { STOP, PROCEED }; + + std::function onConnected; + std::function onDisconnected; +}; + template class ETLSourceImpl : public ETLSource { @@ -199,6 +215,10 @@ protected: std::atomic_bool closing_{false}; + std::atomic_bool paused_{false}; + + ETLSourceHooks hooks_; + void run() override { @@ -215,7 +235,7 @@ protected: public: ~ETLSourceImpl() { - close(false); + derived().close(false); } bool @@ -247,7 +267,54 @@ public: std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr networkValidatedLedgers, - ETLLoadBalancer& balancer); + ETLLoadBalancer& balancer, + ETLSourceHooks hooks) + : resolver_(boost::asio::make_strand(ioContext)) + , networkValidatedLedgers_(networkValidatedLedgers) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) + , forwardCache_(config, ioContext, *this) + , ioc_(ioContext) + , timer_(ioContext) + , hooks_(hooks) + { + if (config.contains("ip")) + { + auto ipJs = config.at("ip").as_string(); + ip_ = {ipJs.c_str(), ipJs.size()}; + } + if (config.contains("ws_port")) + { + auto portjs = config.at("ws_port").as_string(); + wsPort_ = {portjs.c_str(), portjs.size()}; + } + if (config.contains("grpc_port")) + { + auto portjs = config.at("grpc_port").as_string(); + grpcPort_ = {portjs.c_str(), portjs.size()}; + try + { + boost::asio::ip::tcp::endpoint endpoint{ + boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)}; + std::stringstream ss; + ss << endpoint; + grpc::ChannelArguments chArgs; + chArgs.SetMaxReceiveMessageSize(-1); + stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( + grpc::CreateCustomChannel( + ss.str(), grpc::InsecureChannelCredentials(), chArgs)); + BOOST_LOG_TRIVIAL(debug) + << "Made stub for remote = " << toString(); + } + catch (std::exception const& e) + { + BOOST_LOG_TRIVIAL(debug) + << "Exception while creating stub = " << e.what() + << " . Remote = " << toString(); + } + } + } /// @param sequence ledger sequence to check for /// @return true if this source has the desired ledger @@ -371,6 +438,22 @@ public: void reconnect(boost::beast::error_code ec); + /// Pause the source effectively stopping it from trying to reconnect + void + pause() override + { + paused_ = true; + derived().close(false); + } + + /// Resume the source allowing it to reconnect again + void + resume() override + { + paused_ = false; + derived().close(true); + } + /// Callback void onResolve( @@ -420,8 +503,16 @@ public: std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl, - ETLLoadBalancer& balancer) - : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) + ETLLoadBalancer& balancer, + ETLSourceHooks hooks) + : ETLSourceImpl( + config, + ioc, + backend, + subscriptions, + nwvl, + balancer, + std::move(hooks)) , ws_(std::make_unique< boost::beast::websocket::stream>( boost::asio::make_strand(ioc))) @@ -462,8 +553,16 @@ public: std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl, - ETLLoadBalancer& balancer) - : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) + ETLLoadBalancer& balancer, + ETLSourceHooks hooks) + : ETLSourceImpl( + config, + ioc, + backend, + subscriptions, + nwvl, + balancer, + std::move(hooks)) , sslCtx_(sslCtx) , ws_(std::make_unique>>( @@ -513,7 +612,6 @@ public: ETLLoadBalancer( boost::json::object const& config, boost::asio::io_context& ioContext, - std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl); @@ -522,13 +620,12 @@ public: make_ETLLoadBalancer( boost::json::object const& config, boost::asio::io_context& ioc, - std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr validatedLedgers) { return std::make_shared( - config, ioc, sslCtx, backend, subscriptions, validatedLedgers); + config, ioc, backend, subscriptions, validatedLedgers); } ~ETLLoadBalancer() diff --git a/src/etl/ProbingETLSource.cpp b/src/etl/ProbingETLSource.cpp new file mode 100644 index 00000000..718e4792 --- /dev/null +++ b/src/etl/ProbingETLSource.cpp @@ -0,0 +1,190 @@ +#include + +ProbingETLSource::ProbingETLSource( + boost::json::object const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr nwvl, + ETLLoadBalancer& balancer, + boost::asio::ssl::context sslCtx) + : ioc_{ioc} + , sslCtx_{std::move(sslCtx)} + , sslSrc_{make_shared( + config, + ioc, + std::ref(sslCtx_), + backend, + subscriptions, + nwvl, + balancer, + make_SSLHooks())} + , plainSrc_{make_shared( + config, + ioc, + backend, + subscriptions, + nwvl, + balancer, + make_PlainHooks())} +{ +} + +void +ProbingETLSource::run() +{ + sslSrc_->run(); + plainSrc_->run(); +} + +void +ProbingETLSource::pause() +{ + sslSrc_->pause(); + plainSrc_->pause(); +} + +void +ProbingETLSource::resume() +{ + sslSrc_->resume(); + plainSrc_->resume(); +} + +bool +ProbingETLSource::isConnected() const +{ + return currentSrc_ && currentSrc_->isConnected(); +} + +bool +ProbingETLSource::hasLedger(uint32_t sequence) const +{ + if (!currentSrc_) + return false; + return currentSrc_->hasLedger(sequence); +} + +boost::json::object +ProbingETLSource::toJson() const +{ + if (!currentSrc_) + return {}; + return currentSrc_->toJson(); +} + +std::string +ProbingETLSource::toString() const +{ + if (!currentSrc_) + return "{ probing }"; + return currentSrc_->toString(); +} + +bool +ProbingETLSource::loadInitialLedger( + std::uint32_t ledgerSequence, + std::uint32_t numMarkers, + bool cacheOnly) +{ + if (!currentSrc_) + return false; + return currentSrc_->loadInitialLedger( + ledgerSequence, numMarkers, cacheOnly); +} + +std::pair +ProbingETLSource::fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors) +{ + if (!currentSrc_) + return {}; + return currentSrc_->fetchLedger( + ledgerSequence, getObjects, getObjectNeighbors); +} + +std::optional +ProbingETLSource::forwardToRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const +{ + if (!currentSrc_) + return {}; + return currentSrc_->forwardToRippled(request, clientIp, yield); +} + +std::optional +ProbingETLSource::requestFromRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const +{ + if (!currentSrc_) + return {}; + return currentSrc_->requestFromRippled(request, clientIp, yield); +} + +ETLSourceHooks +ProbingETLSource::make_SSLHooks() noexcept +{ + return {// onConnected + [this](auto ec) { + std::lock_guard lck(mtx_); + if (currentSrc_) + return ETLSourceHooks::Action::STOP; + + if (!ec) + { + plainSrc_->pause(); + currentSrc_ = sslSrc_; + BOOST_LOG_TRIVIAL(info) + << "Selected WSS as the main source: " + << currentSrc_->toString(); + } + return ETLSourceHooks::Action::PROCEED; + }, + // onDisconnected + [this](auto ec) { + std::lock_guard lck(mtx_); + if (currentSrc_) + { + currentSrc_ = nullptr; + plainSrc_->resume(); + } + return ETLSourceHooks::Action::STOP; + }}; +} + +ETLSourceHooks +ProbingETLSource::make_PlainHooks() noexcept +{ + return {// onConnected + [this](auto ec) { + std::lock_guard lck(mtx_); + if (currentSrc_) + return ETLSourceHooks::Action::STOP; + + if (!ec) + { + sslSrc_->pause(); + currentSrc_ = plainSrc_; + BOOST_LOG_TRIVIAL(info) + << "Selected Plain WS as the main source: " + << currentSrc_->toString(); + } + return ETLSourceHooks::Action::PROCEED; + }, + // onDisconnected + [this](auto ec) { + std::lock_guard lck(mtx_); + if (currentSrc_) + { + currentSrc_ = nullptr; + sslSrc_->resume(); + } + return ETLSourceHooks::Action::STOP; + }}; +} diff --git a/src/etl/ProbingETLSource.h b/src/etl/ProbingETLSource.h new file mode 100644 index 00000000..77ec293a --- /dev/null +++ b/src/etl/ProbingETLSource.h @@ -0,0 +1,91 @@ +#ifndef RIPPLE_APP_REPORTING_PROBINGETLSOURCE_H_INCLUDED +#define RIPPLE_APP_REPORTING_PROBINGETLSOURCE_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include + +/// This ETLSource implementation attempts to connect over both secure websocket +/// and plain websocket. First to connect pauses the other and the probing is +/// considered done at this point. If however the connected source loses +/// connection the probing is kickstarted again. +class ProbingETLSource : public ETLSource +{ + std::mutex mtx_; + boost::asio::io_context& ioc_; + boost::asio::ssl::context sslCtx_; + std::shared_ptr sslSrc_; + std::shared_ptr plainSrc_; + std::shared_ptr currentSrc_; + +public: + ProbingETLSource( + boost::json::object const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr nwvl, + ETLLoadBalancer& balancer, + boost::asio::ssl::context sslCtx = boost::asio::ssl::context{ + boost::asio::ssl::context::tlsv12}); + + ~ProbingETLSource() = default; + + void + run() override; + + void + pause() override; + + void + resume() override; + + bool + isConnected() const override; + + bool + hasLedger(uint32_t sequence) const override; + + boost::json::object + toJson() const override; + + std::string + toString() const override; + + bool + loadInitialLedger( + std::uint32_t ledgerSequence, + std::uint32_t numMarkers, + bool cacheOnly = false) override; + + std::pair + fetchLedger( + uint32_t ledgerSequence, + bool getObjects = true, + bool getObjectNeighbors = false) override; + + std::optional + forwardToRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const override; + +private: + std::optional + requestFromRippled( + boost::json::object const& request, + std::string const& clientIp, + boost::asio::yield_context& yield) const override; + + ETLSourceHooks + make_SSLHooks() noexcept; + + ETLSourceHooks + make_PlainHooks() noexcept; +}; + +#endif diff --git a/src/main/main.cpp b/src/main/main.cpp index fb7c9d3a..0e7e8073 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -272,7 +272,7 @@ main(int argc, char* argv[]) // The balancer itself publishes to streams (transactions_proposed and // accounts_proposed) auto balancer = ETLLoadBalancer::make_ETLLoadBalancer( - *config, ioc, ctxRef, backend, subscriptions, ledgers); + *config, ioc, backend, subscriptions, ledgers); // ETL is responsible for writing and publishing to streams. In read-only // mode, ETL only publishes