diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index e68a2797a..6c40900ac 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -27,6 +27,8 @@ #include "etl/ETLService.hpp" #include "etl/LoadBalancer.hpp" #include "etl/NetworkValidatedLedgers.hpp" +#include "etlng/LoadBalancer.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManager.hpp" #include "migration/MigrationInspectorFactory.hpp" #include "rpc/Counters.hpp" @@ -130,7 +132,12 @@ ClioApplication::run(bool const useNgWebServer) // ETL uses the balancer to extract data. // The server uses the balancer to forward RPCs to a rippled node. // The balancer itself publishes to streams (transactions_proposed and accounts_proposed) - auto balancer = etl::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers); + auto balancer = [&] -> std::shared_ptr { + if (config_.get("__ng_etl")) + return etlng::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers); + + return etl::LoadBalancer::makeLoadBalancer(config_, ioc, backend, subscriptions, ledgers); + }(); // ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes auto etl = etl::ETLService::makeETLService(config_, ioc, backend, subscriptions, balancer, ledgers); @@ -142,12 +149,12 @@ ClioApplication::run(bool const useNgWebServer) config_, backend, subscriptions, balancer, etl, amendmentCenter, counters ); - using RPCEngineType = rpc::RPCEngine; + using RPCEngineType = rpc::RPCEngine; auto const rpcEngine = RPCEngineType::makeRPCEngine(config_, backend, balancer, dosGuard, workQueue, counters, handlerProvider); if (useNgWebServer or config_.get("server.__ng_web_server")) { - web::ng::RPCServerHandler handler{config_, backend, rpcEngine, etl}; + web::ng::RPCServerHandler handler{config_, backend, rpcEngine, etl}; auto expectedAdminVerifier = web::makeAdminVerificationStrategy(config_); if (not expectedAdminVerifier.has_value()) { @@ -188,8 +195,7 @@ ClioApplication::run(bool const useNgWebServer) } // Init the web server - auto handler = - std::make_shared>(config_, backend, rpcEngine, etl); + auto handler = std::make_shared>(config_, backend, rpcEngine, etl); auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler); diff --git a/src/app/Stopper.hpp b/src/app/Stopper.hpp index b4faa1377..daba1164e 100644 --- a/src/app/Stopper.hpp +++ b/src/app/Stopper.hpp @@ -20,8 +20,8 @@ #pragma once #include "data/BackendInterface.hpp" -#include "etl/ETLService.hpp" -#include "etl/LoadBalancer.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/CoroutineGroup.hpp" #include "util/log/Logger.hpp" @@ -74,15 +74,12 @@ public: * @param ioc The io_context to stop. * @return The callback to be called on application stop. */ - template < - web::ng::SomeServer ServerType, - etl::SomeLoadBalancer LoadBalancerType, - etl::SomeETLService ETLServiceType> + template static std::function makeOnStopCallback( ServerType& server, - LoadBalancerType& balancer, - ETLServiceType& etl, + etlng::LoadBalancerInterface& balancer, + etlng::ETLServiceInterface& etl, feed::SubscriptionManagerInterface& subscriptions, data::BackendInterface& backend, boost::asio::io_context& ioc diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 6eca76339..a7207c99b 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -22,6 +22,7 @@ #include "data/BackendInterface.hpp" #include "etl/CorruptionDetector.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/Constants.hpp" @@ -43,6 +44,7 @@ #include namespace etl { + // Database must be populated when this starts std::optional ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors) @@ -265,7 +267,7 @@ ETLService::ETLService( boost::asio::io_context& ioc, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer, + std::shared_ptr balancer, std::shared_ptr ledgers ) : backend_(backend) diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index d1d865475..ea8b7852d 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -32,7 +32,12 @@ #include "etl/impl/LedgerLoader.hpp" #include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/Transformer.hpp" +#include "etlng/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancer.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" +#include "util/Assert.hpp" #include "util/log/Logger.hpp" #include @@ -41,7 +46,6 @@ #include #include -#include #include #include #include @@ -81,14 +85,13 @@ concept SomeETLService = std::derived_from; * the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring * to writing and from writing to monitoring, based on the activity of other processes running on different machines. */ -class ETLService : public ETLServiceTag { +class ETLService : public etlng::ETLServiceInterface, ETLServiceTag { // TODO: make these template parameters in ETLService - using LoadBalancerType = LoadBalancer; using DataPipeType = etl::impl::ExtractionDataPipe; using CacheLoaderType = etl::CacheLoader<>; - using LedgerFetcherType = etl::impl::LedgerFetcher; + using LedgerFetcherType = etl::impl::LedgerFetcher; using ExtractorType = etl::impl::Extractor; - using LedgerLoaderType = etl::impl::LedgerLoader; + using LedgerLoaderType = etl::impl::LedgerLoader; using LedgerPublisherType = etl::impl::LedgerPublisher; using AmendmentBlockHandlerType = etl::impl::AmendmentBlockHandler; using TransformerType = @@ -97,7 +100,7 @@ class ETLService : public ETLServiceTag { util::Logger log_{"ETL"}; std::shared_ptr backend_; - std::shared_ptr loadBalancer_; + std::shared_ptr loadBalancer_; std::shared_ptr networkValidatedLedgers_; std::uint32_t extractorThreads_ = 1; @@ -132,7 +135,7 @@ public: boost::asio::io_context& ioc, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer, + std::shared_ptr balancer, std::shared_ptr ledgers ); @@ -154,20 +157,33 @@ public: * @param ledgers The network validated ledgers datastructure * @return A shared pointer to a new instance of ETLService */ - static std::shared_ptr + static std::shared_ptr makeETLService( util::config::ClioConfigDefinition const& config, boost::asio::io_context& ioc, std::shared_ptr backend, std::shared_ptr subscriptions, - std::shared_ptr balancer, + std::shared_ptr balancer, std::shared_ptr ledgers ) { - auto etl = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); - etl->run(); + std::shared_ptr ret; - return etl; + if (config.get("__ng_etl")) { + ASSERT( + std::dynamic_pointer_cast(balancer), + "LoadBalancer type must be etlng::LoadBalancer" + ); + ret = std::make_shared(config, backend, subscriptions, balancer, ledgers); + } else { + ASSERT( + std::dynamic_pointer_cast(balancer), "LoadBalancer type must be etl::LoadBalancer" + ); + ret = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); + } + + ret->run(); + return ret; } /** @@ -184,7 +200,7 @@ public: * @note This method blocks until the ETL service has stopped. */ void - stop() + stop() override { LOG(log_.info()) << "Stop called"; @@ -203,7 +219,7 @@ public: * @return Time passed since last ledger close */ std::uint32_t - lastCloseAgeSeconds() const + lastCloseAgeSeconds() const override { return ledgerPublisher_.lastCloseAgeSeconds(); } @@ -214,7 +230,7 @@ public: * @return true if currently amendment blocked; false otherwise */ bool - isAmendmentBlocked() const + isAmendmentBlocked() const override { return state_.isAmendmentBlocked; } @@ -225,7 +241,7 @@ public: * @return true if corruption of DB was detected and cache was stopped. */ bool - isCorruptionDetected() const + isCorruptionDetected() const override { return state_.isCorruptionDetected; } @@ -236,7 +252,7 @@ public: * @return The state of ETL as a JSON object */ boost::json::object - getInfo() const + getInfo() const override { boost::json::object result; @@ -254,11 +270,17 @@ public: * @return The etl nodes' state, nullopt if etl nodes are not connected */ std::optional - getETLState() const noexcept + getETLState() const noexcept override { return loadBalancer_->getETLState(); } + /** + * @brief Start all components to run ETL service. + */ + void + run() override; + private: /** * @brief Run the ETL pipeline. @@ -325,12 +347,6 @@ private: return numMarkers_; } - /** - * @brief Start all components to run ETL service. - */ - void - run(); - /** * @brief Spawn the worker thread and start monitoring. */ diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 7dc0d1925..a26c5bb58 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -23,6 +23,7 @@ #include "etl/ETLState.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/Source.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" #include "util/Assert.hpp" @@ -59,7 +60,7 @@ using namespace util::config; namespace etl { -std::shared_ptr +std::shared_ptr LoadBalancer::makeLoadBalancer( ClioConfigDefinition const& config, boost::asio::io_context& ioc, @@ -175,12 +176,12 @@ LoadBalancer::~LoadBalancer() } std::vector -LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly, std::chrono::steady_clock::duration retryAfter) +LoadBalancer::loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter) { std::vector response; execute( - [this, &response, &sequence, cacheOnly](auto& source) { - auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, cacheOnly); + [this, &response, &sequence](auto& source) { + auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_); if (!res) { LOG(log_.error()) << "Failed to download initial ledger." diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index cfe01d019..169636685 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -23,8 +23,11 @@ #include "etl/ETLState.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/Source.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" +#include "util/Assert.hpp" #include "util/Mutex.hpp" #include "util/ResponseExpirationCache.hpp" #include "util/log/Logger.hpp" @@ -41,13 +44,13 @@ #include #include -#include #include #include #include #include #include #include +#include #include namespace etl { @@ -69,7 +72,7 @@ concept SomeLoadBalancer = std::derived_from; * which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also * allows requests for ledger data to be load balanced across all possible ETL sources. */ -class LoadBalancer : public LoadBalancerTag { +class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag { public: using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; @@ -133,7 +136,7 @@ public: * @param sourceFactory A factory function to create a source * @return A shared pointer to a new instance of LoadBalancer */ - static std::shared_ptr + static std::shared_ptr makeLoadBalancer( util::config::ClioConfigDefinition const& config, boost::asio::io_context& ioc, @@ -150,16 +153,32 @@ public: * @note This function will retry indefinitely until the ledger is downloaded. * * @param sequence Sequence of ledger to download - * @param cacheOnly Whether to only write to cache and not to the DB; defaults to false + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data + */ + std::vector + loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) + override; + + /** + * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. + * + * @param sequence Sequence of ledger to download + * @param observer The observer to notify of progress * @param retryAfter Time to wait between retries (2 seconds by default) * @return A std::vector The ledger data */ std::vector loadInitialLedger( - uint32_t sequence, - bool cacheOnly = false, - std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} - ); + [[maybe_unused]] uint32_t sequence, + [[maybe_unused]] etlng::InitialLoadObserverInterface& observer, + [[maybe_unused]] std::chrono::steady_clock::duration retryAfter + ) override + { + ASSERT(false, "Not available for old ETL"); + std::unreachable(); + } /** * @brief Fetch data for a specific ledger. @@ -180,7 +199,7 @@ public: bool getObjects, bool getObjectNeighbors, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} - ); + ) override; /** * @brief Represent the state of this load balancer as a JSON object @@ -188,7 +207,7 @@ public: * @return JSON representation of the state of this load balancer. */ boost::json::value - toJson() const; + toJson() const override; /** * @brief Forward a JSON RPC request to a randomly selected rippled node. @@ -205,14 +224,14 @@ public: std::optional const& clientIp, bool isAdmin, boost::asio::yield_context yield - ); + ) override; /** * @brief Return state of ETL nodes. * @return ETL state, nullopt if etl nodes not available */ std::optional - getETLState() noexcept; + getETLState() noexcept override; /** * @brief Stop the load balancer. This will stop all subscription sources. @@ -221,7 +240,7 @@ public: * @param yield The coroutine context */ void - stop(boost::asio::yield_context yield); + stop(boost::asio::yield_context yield) override; private: /** diff --git a/src/etl/NetworkValidatedLedgersInterface.hpp b/src/etl/NetworkValidatedLedgersInterface.hpp index d2ab9d12b..067407c8c 100644 --- a/src/etl/NetworkValidatedLedgersInterface.hpp +++ b/src/etl/NetworkValidatedLedgersInterface.hpp @@ -26,6 +26,7 @@ #include #include + namespace etl { /** diff --git a/src/etl/Source.hpp b/src/etl/Source.hpp index 91d9bf817..6ff7e7285 100644 --- a/src/etl/Source.hpp +++ b/src/etl/Source.hpp @@ -23,8 +23,6 @@ #include "etl/NetworkValidatedLedgersInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" -#include "util/log/Logger.hpp" -#include "util/newconfig/ConfigDefinition.hpp" #include "util/newconfig/ObjectView.hpp" #include @@ -130,11 +128,10 @@ public: * * @param sequence Sequence of the ledger to download * @param numMarkers Number of markers to generate for async calls - * @param cacheOnly Only insert into cache, not the DB; defaults to false * @return A std::pair of the data and a bool indicating whether the download was successful */ virtual std::pair, bool> - loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) = 0; + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers) = 0; /** * @brief Forward a request to rippled. diff --git a/src/etl/impl/GrpcSource.cpp b/src/etl/impl/GrpcSource.cpp index 29be5b224..539cdf481 100644 --- a/src/etl/impl/GrpcSource.cpp +++ b/src/etl/impl/GrpcSource.cpp @@ -98,7 +98,7 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb } std::pair, bool> -GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers, bool const cacheOnly) +GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers) { if (!stub_) return {{}, false}; @@ -130,7 +130,7 @@ GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); - auto result = ptr->process(stub_, cq, *backend_, abort, cacheOnly); + auto result = ptr->process(stub_, cq, *backend_, abort); if (result != etl::impl::AsyncCallData::CallStatus::MORE) { ++numFinished; LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished; diff --git a/src/etl/impl/GrpcSource.hpp b/src/etl/impl/GrpcSource.hpp index 5d41f193d..248f4191d 100644 --- a/src/etl/impl/GrpcSource.hpp +++ b/src/etl/impl/GrpcSource.hpp @@ -60,11 +60,10 @@ public: * * @param sequence Sequence of the ledger to download * @param numMarkers Number of markers to generate for async calls - * @param cacheOnly Only insert into cache, not the DB; defaults to false * @return A std::pair of the data and a bool indicating whether the download was successful */ std::pair, bool> - loadInitialLedger(uint32_t sequence, uint32_t numMarkers, bool cacheOnly = false); + loadInitialLedger(uint32_t sequence, uint32_t numMarkers); }; } // namespace etl::impl diff --git a/src/etl/impl/LedgerFetcher.hpp b/src/etl/impl/LedgerFetcher.hpp index 9d8a0df88..acb792ac2 100644 --- a/src/etl/impl/LedgerFetcher.hpp +++ b/src/etl/impl/LedgerFetcher.hpp @@ -20,6 +20,8 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etl/LedgerFetcherInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "util/log/Logger.hpp" #include @@ -34,22 +36,18 @@ namespace etl::impl { /** * @brief GRPC Ledger data fetcher */ -template -class LedgerFetcher { -public: - using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType; - +class LedgerFetcher : public LedgerFetcherInterface { private: util::Logger log_{"ETL"}; std::shared_ptr backend_; - std::shared_ptr loadBalancer_; + std::shared_ptr loadBalancer_; public: /** * @brief Create an instance of the fetcher */ - LedgerFetcher(std::shared_ptr backend, std::shared_ptr balancer) + LedgerFetcher(std::shared_ptr backend, std::shared_ptr balancer) : backend_(std::move(backend)), loadBalancer_(std::move(balancer)) { } @@ -64,7 +62,7 @@ public: * @return Ledger header and transaction+metadata blobs; Empty optional if the server is shutting down */ [[nodiscard]] OptionalGetLedgerResponseType - fetchData(uint32_t sequence) + fetchData(uint32_t sequence) override { LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; @@ -84,7 +82,7 @@ public: * @return Ledger data diff between sequance and parent; Empty optional if the server is shutting down */ [[nodiscard]] OptionalGetLedgerResponseType - fetchDataAndDiff(uint32_t sequence) + fetchDataAndDiff(uint32_t sequence) override { LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence; diff --git a/src/etl/impl/LedgerLoader.hpp b/src/etl/impl/LedgerLoader.hpp index 716fa8718..6be0f30e3 100644 --- a/src/etl/impl/LedgerLoader.hpp +++ b/src/etl/impl/LedgerLoader.hpp @@ -26,6 +26,7 @@ #include "etl/NFTHelpers.hpp" #include "etl/SystemState.hpp" #include "etl/impl/LedgerFetcher.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "util/Assert.hpp" #include "util/LedgerUtils.hpp" #include "util/Profiler.hpp" @@ -65,18 +66,18 @@ namespace etl::impl { /** * @brief Loads ledger data into the DB */ -template +template class LedgerLoader { public: - using GetLedgerResponseType = typename LoadBalancerType::GetLedgerResponseType; - using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType; - using RawLedgerObjectType = typename LoadBalancerType::RawLedgerObjectType; + using GetLedgerResponseType = etlng::LoadBalancerInterface::GetLedgerResponseType; + using OptionalGetLedgerResponseType = etlng::LoadBalancerInterface::OptionalGetLedgerResponseType; + using RawLedgerObjectType = etlng::LoadBalancerInterface::RawLedgerObjectType; private: util::Logger log_{"ETL"}; std::shared_ptr backend_; - std::shared_ptr loadBalancer_; + std::shared_ptr loadBalancer_; std::reference_wrapper fetcher_; std::reference_wrapper state_; // shared state for ETL @@ -86,7 +87,7 @@ public: */ LedgerLoader( std::shared_ptr backend, - std::shared_ptr balancer, + std::shared_ptr balancer, LedgerFetcherType& fetcher, SystemState const& state ) diff --git a/src/etl/impl/SourceImpl.hpp b/src/etl/impl/SourceImpl.hpp index 3f2f25a33..a2b4175e0 100644 --- a/src/etl/impl/SourceImpl.hpp +++ b/src/etl/impl/SourceImpl.hpp @@ -202,9 +202,9 @@ public: * @return A std::pair of the data and a bool indicating whether the download was successful */ std::pair, bool> - loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) final + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers) final { - return grpcSource_.loadInitialLedger(sequence, numMarkers, cacheOnly); + return grpcSource_.loadInitialLedger(sequence, numMarkers); } /** diff --git a/src/etlng/CMakeLists.txt b/src/etlng/CMakeLists.txt index d0f2854e9..49e443a34 100644 --- a/src/etlng/CMakeLists.txt +++ b/src/etlng/CMakeLists.txt @@ -2,10 +2,13 @@ add_library(clio_etlng) target_sources( clio_etlng - PRIVATE impl/AmendmentBlockHandler.cpp + PRIVATE LoadBalancer.cpp + Source.cpp + impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp + impl/ForwardingSource.cpp impl/Loading.cpp impl/Monitor.cpp impl/TaskManager.cpp diff --git a/src/etlng/ETLService.hpp b/src/etlng/ETLService.hpp new file mode 100644 index 000000000..960d6e456 --- /dev/null +++ b/src/etlng/ETLService.hpp @@ -0,0 +1,283 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "data/LedgerCache.hpp" +#include "data/Types.hpp" +#include "etl/CacheLoader.hpp" +#include "etl/ETLState.hpp" +#include "etl/LedgerFetcherInterface.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etl/SystemState.hpp" +#include "etl/impl/AmendmentBlockHandler.hpp" +#include "etl/impl/LedgerFetcher.hpp" +#include "etl/impl/LedgerPublisher.hpp" +#include "etlng/AmendmentBlockHandlerInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/ExtractorInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/impl/AmendmentBlockHandler.hpp" +#include "etlng/impl/Extraction.hpp" +#include "etlng/impl/Loading.hpp" +#include "etlng/impl/Monitor.hpp" +#include "etlng/impl/Registry.hpp" +#include "etlng/impl/Scheduling.hpp" +#include "etlng/impl/TaskManager.hpp" +#include "etlng/impl/ext/Cache.hpp" +#include "etlng/impl/ext/Core.hpp" +#include "etlng/impl/ext/NFT.hpp" +#include "etlng/impl/ext/Successor.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/Assert.hpp" +#include "util/Profiler.hpp" +#include "util/async/context/BasicExecutionContext.hpp" +#include "util/config/Config.hpp" +#include "util/log/Logger.hpp" +#include "util/newconfig/ConfigDefinition.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the + * databases. + * + * Usually, multiple different processes share access to the same network accessible databases, in which case only one + * such process is performing ETL and writing to the database. The other processes simply monitor the database for new + * ledgers, and publish those ledgers to the various subscription streams. If a monitoring process determines that the + * ETL writer has failed (no new ledgers written for some time), the process will attempt to become the ETL writer. + * + * If there are multiple monitoring processes that try to become the ETL writer at the same time, one will win out, and + * the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring + * to writing and from writing to monitoring, based on the activity of other processes running on different machines. + */ +class ETLService : public ETLServiceInterface { + util::Logger log_{"ETL"}; + + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; + std::shared_ptr ledgers_; + std::shared_ptr> cacheLoader_; + + std::shared_ptr fetcher_; + std::shared_ptr extractor_; + + etl::SystemState state_; + util::async::CoroExecutionContext ctx_{8}; + + std::shared_ptr amendmentBlockHandler_; + std::shared_ptr loader_; + + std::optional> mainLoop_; + +public: + /** + * @brief Create an instance of ETLService. + * + * @param config The configuration to use + * @param backend BackendInterface implementation + * @param subscriptions Subscription manager + * @param balancer Load balancer to use + * @param ledgers The network validated ledgers datastructure + */ + ETLService( + util::config::ClioConfigDefinition const& config, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + std::shared_ptr ledgers + ) + : backend_(std::move(backend)) + , subscriptions_(std::move(subscriptions)) + , balancer_(std::move(balancer)) + , ledgers_(std::move(ledgers)) + , cacheLoader_(std::make_shared>(config, backend_, backend_->cache())) + , fetcher_(std::make_shared(backend_, balancer_)) + , extractor_(std::make_shared(fetcher_)) + , amendmentBlockHandler_(std::make_shared(ctx_, state_)) + , loader_(std::make_shared( + backend_, + fetcher_, + impl::makeRegistry( + impl::CacheExt{backend_->cache()}, + impl::CoreExt{backend_}, + impl::SuccessorExt{backend_, backend_->cache()}, + impl::NFTExt{backend_} + ), + amendmentBlockHandler_ + )) + { + LOG(log_.info()) << "Creating ETLng..."; + } + + ~ETLService() override + { + LOG(log_.debug()) << "Stopping ETLng"; + } + + void + run() override + { + LOG(log_.info()) << "run() in ETLng..."; + + mainLoop_.emplace(ctx_.execute([this] { + auto const rng = loadInitialLedgerIfNeeded(); + + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; + std::optional mostRecentValidated = ledgers_->getMostRecent(); + + if (not mostRecentValidated) { + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " + "Exiting monitor loop"; + return; + } + + ASSERT(rng.has_value(), "Ledger range can't be null"); + auto const nextSequence = rng->maxSequence + 1; + + LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; + + auto scheduler = impl::makeScheduler(impl::ForwardScheduler{*ledgers_, nextSequence} + // impl::BackfillScheduler{nextSequence - 1, nextSequence - 1000}, + // TODO lift limit and start with rng.minSeq + ); + + auto man = impl::TaskManager(ctx_, *scheduler, *extractor_, *loader_); + + // TODO: figure out this: std::make_shared(backend_, ledgers_, nextSequence) + man.run({}); // TODO: needs to be interruptable and fill out settings + })); + } + + void + stop() override + { + LOG(log_.info()) << "Stop called"; + // TODO: stop the service correctly + } + + boost::json::object + getInfo() const override + { + // TODO + return {{"ok", true}}; + } + + bool + isAmendmentBlocked() const override + { + // TODO + return false; + } + + bool + isCorruptionDetected() const override + { + // TODO + return false; + } + + std::optional + getETLState() const override + { + // TODO + return std::nullopt; + } + + std::uint32_t + lastCloseAgeSeconds() const override + { + // TODO + return 0; + } + +private: + // TODO: this better be std::expected + std::optional + loadInitialLedgerIfNeeded() + { + if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); not rng.has_value()) { + LOG(log_.info()) << "Database is empty. Will download a ledger from the network."; + + try { + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; + if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) { + auto const seq = *mostRecentValidated; + LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... "; + + auto [ledger, timeDiff] = ::util::timed>([this, seq]() { + return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) { + // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar + data.edgeKeys = balancer_->loadInitialLedger(seq, *loader_); + + // TODO: this should be interruptable for graceful shutdown + return loader_->loadInitialLedger(data); + }); + }); + + LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff; + LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size(); + + if (ledger.has_value()) + return backend_->hardFetchLedgerRangeNoThrow(); + + LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop"; + } else { + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " + "Exiting monitor loop"; + } + } catch (std::runtime_error const& e) { + LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what(); + amendmentBlockHandler_->notifyAmendmentBlocked(); + } + } else { + LOG(log_.info()) << "Database already populated. Picking up from the tip of history"; + cacheLoader_->load(rng->maxSequence); + + return rng; + } + + return std::nullopt; + } +}; +} // namespace etlng diff --git a/src/etlng/ETLServiceInterface.hpp b/src/etlng/ETLServiceInterface.hpp new file mode 100644 index 000000000..7f0b2fc2b --- /dev/null +++ b/src/etlng/ETLServiceInterface.hpp @@ -0,0 +1,92 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/ETLState.hpp" + +#include + +#include +#include + +namespace etlng { + +/** + * @brief This is a base class for any ETL service implementations. + * @note A ETL service is responsible for continuously extracting data from a p2p node, and writing that data to the + * databases. + */ +struct ETLServiceInterface { + virtual ~ETLServiceInterface() = default; + + /** + * @brief Start all components to run ETL service. + */ + virtual void + run() = 0; + + /** + * @brief Stop the ETL service. + * @note This method blocks until the ETL service has stopped. + */ + virtual void + stop() = 0; + + /** + * @brief Get state of ETL as a JSON object + * + * @return The state of ETL as a JSON object + */ + [[nodiscard]] virtual boost::json::object + getInfo() const = 0; + + /** + * @brief Check for the amendment blocked state. + * + * @return true if currently amendment blocked; false otherwise + */ + [[nodiscard]] virtual bool + isAmendmentBlocked() const = 0; + + /** + * @brief Check whether Clio detected DB corruptions. + * + * @return true if corruption of DB was detected and cache was stopped. + */ + [[nodiscard]] virtual bool + isCorruptionDetected() const = 0; + + /** + * @brief Get the etl nodes' state + * @return The etl nodes' state, nullopt if etl nodes are not connected + */ + [[nodiscard]] virtual std::optional + getETLState() const = 0; + + /** + * @brief Get time passed since last ledger close, in seconds. + * + * @return Time passed since last ledger close + */ + [[nodiscard]] virtual std::uint32_t + lastCloseAgeSeconds() const = 0; +}; + +} // namespace etlng diff --git a/src/etlng/LoadBalancer.cpp b/src/etlng/LoadBalancer.cpp new file mode 100644 index 000000000..fa34af78e --- /dev/null +++ b/src/etlng/LoadBalancer.cpp @@ -0,0 +1,371 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/LoadBalancer.hpp" + +#include "data/BackendInterface.hpp" +#include "etl/ETLState.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/Assert.hpp" +#include "util/CoroutineGroup.hpp" +#include "util/Random.hpp" +#include "util/ResponseExpirationCache.hpp" +#include "util/log/Logger.hpp" +#include "util/newconfig/ArrayView.hpp" +#include "util/newconfig/ConfigDefinition.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace util::config; + +namespace etlng { + +std::shared_ptr +LoadBalancer::makeLoadBalancer( + ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory +) +{ + return std::make_shared( + config, ioc, std::move(backend), std::move(subscriptions), std::move(validatedLedgers), std::move(sourceFactory) + ); +} + +LoadBalancer::LoadBalancer( + ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory +) +{ + auto const forwardingCacheTimeout = config.get("forwarding.cache_timeout"); + if (forwardingCacheTimeout > 0.f) { + forwardingCache_ = util::ResponseExpirationCache{ + util::config::ClioConfigDefinition::toMilliseconds(forwardingCacheTimeout), + {"server_info", "server_state", "server_definitions", "fee", "ledger_closed"} + }; + } + + auto const numMarkers = config.getValueView("num_markers"); + if (numMarkers.hasValue()) { + auto const value = numMarkers.asIntType(); + downloadRanges_ = value; + } else if (backend->fetchLedgerRange()) { + downloadRanges_ = 4; + } + + auto const allowNoEtl = config.get("allow_no_etl"); + + auto const checkOnETLFailure = [this, allowNoEtl](std::string const& log) { + LOG(log_.warn()) << log; + + if (!allowNoEtl) { + LOG(log_.error()) << "Set allow_no_etl as true in config to allow clio run without valid ETL sources."; + throw std::logic_error("ETL configuration error."); + } + }; + + auto const forwardingTimeout = + ClioConfigDefinition::toMilliseconds(config.get("forwarding.request_timeout")); + auto const etlArray = config.getArray("etl_sources"); + for (auto it = etlArray.begin(); it != etlArray.end(); ++it) { + auto source = sourceFactory( + *it, + ioc, + subscriptions, + validatedLedgers, + forwardingTimeout, + [this]() { + if (not hasForwardingSource_.lock().get()) + chooseForwardingSource(); + }, + [this](bool wasForwarding) { + if (wasForwarding) + chooseForwardingSource(); + }, + [this]() { + if (forwardingCache_.has_value()) + forwardingCache_->invalidate(); + } + ); + + // checking etl node validity + auto const stateOpt = etl::ETLState::fetchETLStateFromSource(*source); + + if (!stateOpt) { + LOG(log_.warn()) << "Failed to fetch ETL state from source = " << source->toString() + << " Please check the configuration and network"; + } else if (etlState_ && etlState_->networkID && stateOpt->networkID && + etlState_->networkID != stateOpt->networkID) { + checkOnETLFailure(fmt::format( + "ETL sources must be on the same network. Source network id = {} does not match others network id = {}", + *(stateOpt->networkID), + *(etlState_->networkID) + )); + } else { + etlState_ = stateOpt; + } + + sources_.push_back(std::move(source)); + LOG(log_.info()) << "Added etl source - " << sources_.back()->toString(); + } + + if (!etlState_) + checkOnETLFailure("Failed to fetch ETL state from any source. Please check the configuration and network"); + + if (sources_.empty()) + checkOnETLFailure("No ETL sources configured. Please check the configuration"); + + // This is made separate from source creation to prevent UB in case one of the sources will call + // chooseForwardingSource while we are still filling the sources_ vector + for (auto const& source : sources_) { + source->run(); + } +} + +LoadBalancer::~LoadBalancer() +{ + sources_.clear(); +} + +std::vector +LoadBalancer::loadInitialLedger( + uint32_t sequence, + etlng::InitialLoadObserverInterface& loadObserver, + std::chrono::steady_clock::duration retryAfter +) +{ + std::vector response; + execute( + [this, &response, &sequence, &loadObserver](auto& source) { + auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, loadObserver); + + if (!res) { + LOG(log_.error()) << "Failed to download initial ledger." + << " Sequence = " << sequence << " source = " << source->toString(); + } else { + response = std::move(data); + } + + return res; + }, + sequence, + retryAfter + ); + return response; +} + +LoadBalancer::OptionalGetLedgerResponseType +LoadBalancer::fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors, + std::chrono::steady_clock::duration retryAfter +) +{ + GetLedgerResponseType response; + execute( + [&response, ledgerSequence, getObjects, getObjectNeighbors, log = log_](auto& source) { + auto [status, data] = source->fetchLedger(ledgerSequence, getObjects, getObjectNeighbors); + response = std::move(data); + if (status.ok() && response.validated()) { + LOG(log.info()) << "Successfully fetched ledger = " << ledgerSequence + << " from source = " << source->toString(); + return true; + } + + LOG(log.warn()) << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString() + << ", error_code: " << status.error_code() << ", error_msg: " << status.error_message() + << ", source = " << source->toString(); + return false; + }, + ledgerSequence, + retryAfter + ); + return response; +} + +std::expected +LoadBalancer::forwardToRippled( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield +) +{ + if (not request.contains("command")) + return std::unexpected{rpc::ClioError::RpcCommandIsMissing}; + + auto const cmd = boost::json::value_to(request.at("command")); + if (forwardingCache_) { + if (auto cachedResponse = forwardingCache_->get(cmd); cachedResponse) { + return std::move(cachedResponse).value(); + } + } + + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + + auto numAttempts = 0u; + + auto xUserValue = isAdmin ? kADMIN_FORWARDING_X_USER_VALUE : kUSER_FORWARDING_X_USER_VALUE; + + std::optional response; + rpc::ClioError error = rpc::ClioError::EtlConnectionError; + while (numAttempts < sources_.size()) { + auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield); + if (res) { + response = std::move(res).value(); + break; + } + error = std::max(error, res.error()); // Choose the best result between all sources + + sourceIdx = (sourceIdx + 1) % sources_.size(); + ++numAttempts; + } + + if (response) { + if (forwardingCache_ and not response->contains("error")) + forwardingCache_->put(cmd, *response); + return std::move(response).value(); + } + + return std::unexpected{error}; +} + +boost::json::value +LoadBalancer::toJson() const +{ + boost::json::array ret; + for (auto& src : sources_) + ret.push_back(src->toJson()); + + return ret; +} + +template +void +LoadBalancer::execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter) +{ + ASSERT(not sources_.empty(), "ETL sources must be configured to execute functions."); + size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + + size_t numAttempts = 0; + + while (true) { + auto& source = sources_[sourceIdx]; + + LOG(log_.debug()) << "Attempting to execute func. ledger sequence = " << ledgerSequence + << " - source = " << source->toString(); + // Originally, it was (source->hasLedger(ledgerSequence) || true) + /* Sometimes rippled has ledger but doesn't actually know. However, + but this does NOT happen in the normal case and is safe to remove + This || true is only needed when loading full history standalone */ + if (source->hasLedger(ledgerSequence)) { + bool const res = f(source); + if (res) { + LOG(log_.debug()) << "Successfully executed func at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; + break; + } + + LOG(log_.warn()) << "Failed to execute func at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; + } else { + LOG(log_.warn()) << "Ledger not present at source = " << source->toString() + << " - ledger sequence = " << ledgerSequence; + } + sourceIdx = (sourceIdx + 1) % sources_.size(); + numAttempts++; + if (numAttempts % sources_.size() == 0) { + LOG(log_.info()) << "Ledger sequence " << ledgerSequence + << " is not yet available from any configured sources. Sleeping and trying again"; + std::this_thread::sleep_for(retryAfter); + } + } +} + +std::optional +LoadBalancer::getETLState() noexcept +{ + if (!etlState_) { + // retry ETLState fetch + etlState_ = etl::ETLState::fetchETLStateFromSource(*this); + } + return etlState_; +} + +void +LoadBalancer::stop(boost::asio::yield_context yield) +{ + util::CoroutineGroup group{yield}; + std::ranges::for_each(sources_, [&group, yield](auto& source) { + group.spawn(yield, [&source](boost::asio::yield_context innerYield) { source->stop(innerYield); }); + }); + group.asyncWait(yield); +} + +void +LoadBalancer::chooseForwardingSource() +{ + LOG(log_.info()) << "Choosing a new source to forward subscriptions"; + auto hasForwardingSourceLock = hasForwardingSource_.lock(); + hasForwardingSourceLock.get() = false; + for (auto& source : sources_) { + if (not hasForwardingSourceLock.get() and source->isConnected()) { + source->setForwarding(true); + hasForwardingSourceLock.get() = true; + } else { + source->setForwarding(false); + } + } +} + +} // namespace etlng diff --git a/src/etlng/LoadBalancer.hpp b/src/etlng/LoadBalancer.hpp new file mode 100644 index 000000000..ac01100da --- /dev/null +++ b/src/etlng/LoadBalancer.hpp @@ -0,0 +1,271 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2022, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/ETLState.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/Assert.hpp" +#include "util/Mutex.hpp" +#include "util/ResponseExpirationCache.hpp" +#include "util/log/Logger.hpp" +#include "util/newconfig/ConfigDefinition.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief A tag class to help identify LoadBalancer in templated code. + */ +struct LoadBalancerTag { + virtual ~LoadBalancerTag() = default; +}; + +template +concept SomeLoadBalancer = std::derived_from; + +/** + * @brief This class is used to manage connections to transaction processing processes. + * + * This class spawns a listener for each etl source, which listens to messages on the ledgers stream (to keep track of + * which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also + * allows requests for ledger data to be load balanced across all possible ETL sources. + */ +class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag { +public: + using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; + using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; + using OptionalGetLedgerResponseType = std::optional; + +private: + static constexpr std::uint32_t kDEFAULT_DOWNLOAD_RANGES = 16; + + util::Logger log_{"ETL"}; + // Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache + std::optional forwardingCache_; + std::optional forwardingXUserValue_; + + std::vector sources_; + std::optional etlState_; + std::uint32_t downloadRanges_ = + kDEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ + + // Using mutext instead of atomic_bool because choosing a new source to + // forward messages should be done with a mutual exclusion otherwise there will be a race condition + util::Mutex hasForwardingSource_{false}; + +public: + /** + * @brief Value for the X-User header when forwarding admin requests + */ + static constexpr std::string_view kADMIN_FORWARDING_X_USER_VALUE = "clio_admin"; + + /** + * @brief Value for the X-User header when forwarding user requests + */ + static constexpr std::string_view kUSER_FORWARDING_X_USER_VALUE = "clio_user"; + + /** + * @brief Create an instance of the load balancer. + * + * @param config The configuration to use + * @param ioc The io_context to run on + * @param backend BackendInterface implementation + * @param subscriptions Subscription manager + * @param validatedLedgers The network validated ledgers datastructure + * @param sourceFactory A factory function to create a source + */ + LoadBalancer( + util::config::ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory = makeSource + ); + + /** + * @brief A factory function for the load balancer. + * + * @param config The configuration to use + * @param ioc The io_context to run on + * @param backend BackendInterface implementation + * @param subscriptions Subscription manager + * @param validatedLedgers The network validated ledgers datastructure + * @param sourceFactory A factory function to create a source + * @return A shared pointer to a new instance of LoadBalancer + */ + static std::shared_ptr + makeLoadBalancer( + util::config::ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory = makeSource + ); + + ~LoadBalancer() override; + + /** + * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. + * + * @param sequence Sequence of ledger to download + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data + */ + std::vector + loadInitialLedger( + [[maybe_unused]] uint32_t sequence, + [[maybe_unused]] std::chrono::steady_clock::duration retryAfter + ) override + { + ASSERT(false, "Not available for new ETL"); + std::unreachable(); + }; + + /** + * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. + * + * @param sequence Sequence of ledger to download + * @param observer The observer to notify of progress + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data + */ + std::vector + loadInitialLedger( + uint32_t sequence, + etlng::InitialLoadObserverInterface& observer, + std::chrono::steady_clock::duration retryAfter + ) override; + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param ledgerSequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one + * @param getObjectNeighbors Whether to request object neighbors + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return The extracted data, if extraction was successful. If the ledger was found + * in the database or the server is shutting down, the optional will be empty + */ + OptionalGetLedgerResponseType + fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors, + std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} + ) override; + + /** + * @brief Represent the state of this load balancer as a JSON object + * + * @return JSON representation of the state of this load balancer. + */ + boost::json::value + toJson() const override; + + /** + * @brief Forward a JSON RPC request to a randomly selected rippled node. + * + * @param request JSON-RPC request to forward + * @param clientIp The IP address of the peer, if known + * @param isAdmin Whether the request is from an admin + * @param yield The coroutine context + * @return Response received from rippled node as JSON object on success or error on failure + */ + std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& clientIp, + bool isAdmin, + boost::asio::yield_context yield + ) override; + + /** + * @brief Return state of ETL nodes. + * @return ETL state, nullopt if etl nodes not available + */ + std::optional + getETLState() noexcept override; + + /** + * @brief Stop the load balancer. This will stop all subscription sources. + * @note This function will asynchronously wait for all sources to stop. + * + * @param yield The coroutine context + */ + void + stop(boost::asio::yield_context yield) override; + +private: + /** + * @brief Execute a function on a randomly selected source. + * + * @note f is a function that takes an Source as an argument and returns a bool. + * Attempt to execute f for one randomly chosen Source that has the specified ledger. If f returns false, another + * randomly chosen Source is used. The process repeats until f returns true. + * + * @param f Function to execute. This function takes the ETL source as an argument, and returns a bool + * @param ledgerSequence f is executed for each Source that has this ledger + * @param retryAfter Time to wait between retries (2 seconds by default) + * server is shutting down + */ + template + void + execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}); + + /** + * @brief Choose a new source to forward requests + */ + void + chooseForwardingSource(); +}; + +} // namespace etlng diff --git a/src/etlng/LoadBalancerInterface.hpp b/src/etlng/LoadBalancerInterface.hpp index 3466a6e79..20623d456 100644 --- a/src/etlng/LoadBalancerInterface.hpp +++ b/src/etlng/LoadBalancerInterface.hpp @@ -129,6 +129,15 @@ public: */ virtual std::optional getETLState() noexcept = 0; + + /** + * @brief Stop the load balancer. This will stop all subscription sources. + * @note This function will asynchronously wait for all sources to stop. + * + * @param yield The coroutine context + */ + virtual void + stop(boost::asio::yield_context yield) = 0; }; } // namespace etlng diff --git a/src/etlng/LoaderInterface.hpp b/src/etlng/LoaderInterface.hpp index 929d71679..72cad3192 100644 --- a/src/etlng/LoaderInterface.hpp +++ b/src/etlng/LoaderInterface.hpp @@ -45,7 +45,7 @@ struct LoaderInterface { * @param data The data to load * @return Optional ledger header */ - virtual std::optional + [[nodiscard]] virtual std::optional loadInitialLedger(model::LedgerData const& data) = 0; }; diff --git a/src/etlng/Source.cpp b/src/etlng/Source.cpp new file mode 100644 index 000000000..3f9395446 --- /dev/null +++ b/src/etlng/Source.cpp @@ -0,0 +1,73 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/Source.hpp" + +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etl/impl/ForwardingSource.hpp" +#include "etl/impl/SubscriptionSource.hpp" +#include "etlng/impl/GrpcSource.hpp" +#include "etlng/impl/SourceImpl.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include + +#include +#include +#include +#include + +namespace etlng { + +SourcePtr +makeSource( + util::config::ObjectView const& config, + boost::asio::io_context& ioc, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + std::chrono::steady_clock::duration forwardingTimeout, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +) +{ + auto const ip = config.get("ip"); + auto const wsPort = config.get("ws_port"); + auto const grpcPort = config.get("grpc_port"); + + etl::impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout}; + impl::GrpcSource grpcSource{ip, grpcPort}; + auto subscriptionSource = std::make_unique( + ioc, + ip, + wsPort, + std::move(validatedLedgers), + std::move(subscriptions), + std::move(onConnect), + std::move(onDisconnect), + std::move(onLedgerClosed) + ); + + return std::make_unique>( + ip, wsPort, grpcPort, std::move(grpcSource), std::move(subscriptionSource), std::move(forwardingSource) + ); +} + +} // namespace etlng diff --git a/src/etlng/Source.hpp b/src/etlng/Source.hpp new file mode 100644 index 000000000..52c07ef9b --- /dev/null +++ b/src/etlng/Source.hpp @@ -0,0 +1,194 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief Provides an implementation of a ETL source + */ +class SourceBase { +public: + using OnConnectHook = std::function; + using OnDisconnectHook = std::function; + using OnLedgerClosedHook = std::function; + + virtual ~SourceBase() = default; + + /** + * @brief Run subscriptions loop of the source + */ + virtual void + run() = 0; + + /** + * @brief Stop Source. + * @note This method will asynchronously wait for source to be stopped. + * + * @param yield The coroutine context. + */ + virtual void + stop(boost::asio::yield_context yield) = 0; + + /** + * @brief Check if source is connected + * + * @return true if source is connected; false otherwise + */ + [[nodiscard]] virtual bool + isConnected() const = 0; + + /** + * @brief Set the forwarding state of the source. + * + * @param isForwarding Whether to forward or not + */ + virtual void + setForwarding(bool isForwarding) = 0; + + /** + * @brief Represent the source as a JSON object + * + * @return JSON representation of the source + */ + [[nodiscard]] virtual boost::json::object + toJson() const = 0; + + /** @return String representation of the source (for debug) */ + [[nodiscard]] virtual std::string + toString() const = 0; + + /** + * @brief Check if ledger is known by this source. + * + * @param sequence The ledger sequence to check + * @return true if ledger is in the range of this source; false otherwise + */ + [[nodiscard]] virtual bool + hasLedger(uint32_t sequence) const = 0; + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + [[nodiscard]] virtual std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) = 0; + + /** + * @brief Download a ledger in full. + * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param loader InitialLoadObserverInterface implementation + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + virtual std::pair, bool> + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) = 0; + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param xUserValue Value of the X-User header + * @param yield The coroutine context + * @return Response on success or error on failure + */ + [[nodiscard]] virtual std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield + ) const = 0; +}; + +using SourcePtr = std::unique_ptr; + +using SourceFactory = std::function subscriptions, + std::shared_ptr validatedLedgers, + std::chrono::steady_clock::duration forwardingTimeout, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +)>; + +/** + * @brief Create a source + * + * @param config The configuration to use + * @param ioc The io_context to run on + * @param subscriptions Subscription manager + * @param validatedLedgers The network validated ledgers data structure + * @param forwardingTimeout The timeout for forwarding to rippled + * @param onConnect The hook to call on connect + * @param onDisconnect The hook to call on disconnect + * @param onLedgerClosed The hook to call on ledger closed. This is called when a ledger is closed and the source is set + * as forwarding. + * @return The created source + */ +[[nodiscard]] SourcePtr +makeSource( + util::config::ObjectView const& config, + boost::asio::io_context& ioc, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + std::chrono::steady_clock::duration forwardingTimeout, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +); + +} // namespace etlng diff --git a/src/etlng/impl/ForwardingSource.cpp b/src/etlng/impl/ForwardingSource.cpp new file mode 100644 index 000000000..b4fb024bf --- /dev/null +++ b/src/etlng/impl/ForwardingSource.cpp @@ -0,0 +1,116 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/impl/ForwardingSource.hpp" + +#include "rpc/Errors.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +ForwardingSource::ForwardingSource( + std::string ip, + std::string wsPort, + std::chrono::steady_clock::duration forwardingTimeout, + std::chrono::steady_clock::duration connTimeout +) + : log_(fmt::format("ForwardingSource[{}:{}]", ip, wsPort)) + , connectionBuilder_(std::move(ip), std::move(wsPort)) + , forwardingTimeout_{forwardingTimeout} +{ + connectionBuilder_.setConnectionTimeout(connTimeout) + .addHeader( + {boost::beast::http::field::user_agent, fmt::format("{} websocket-client-coro", BOOST_BEAST_VERSION_STRING)} + ); +} + +std::expected +ForwardingSource::forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield +) const +{ + auto connectionBuilder = connectionBuilder_; + if (forwardToRippledClientIp) { + connectionBuilder.addHeader( + {boost::beast::http::field::forwarded, fmt::format("for={}", *forwardToRippledClientIp)} + ); + } + + connectionBuilder.addHeader({"X-User", std::string{xUserValue}}); + + auto expectedConnection = connectionBuilder.connect(yield); + if (not expectedConnection) { + LOG(log_.debug()) << "Couldn't connect to rippled to forward request."; + return std::unexpected{rpc::ClioError::EtlConnectionError}; + } + auto& connection = expectedConnection.value(); + + auto writeError = connection->write(boost::json::serialize(request), yield, forwardingTimeout_); + if (writeError) { + LOG(log_.debug()) << "Error sending request to rippled to forward request."; + return std::unexpected{rpc::ClioError::EtlRequestError}; + } + + auto response = connection->read(yield, forwardingTimeout_); + if (not response) { + if (auto errorCode = response.error().errorCode(); + errorCode.has_value() and errorCode->value() == boost::system::errc::timed_out) { + LOG(log_.debug()) << "Request to rippled timed out"; + return std::unexpected{rpc::ClioError::EtlRequestTimeout}; + } + LOG(log_.debug()) << "Error sending request to rippled to forward request."; + return std::unexpected{rpc::ClioError::EtlRequestError}; + } + + boost::json::value parsedResponse; + try { + parsedResponse = boost::json::parse(*response); + if (not parsedResponse.is_object()) + throw std::runtime_error("response is not an object"); + } catch (std::exception const& e) { + LOG(log_.debug()) << "Error parsing response from rippled: " << e.what() << ". Response: " << *response; + return std::unexpected{rpc::ClioError::EtlInvalidResponse}; + } + + auto responseObject = parsedResponse.as_object(); + responseObject["forwarded"] = true; + + return responseObject; +} + +} // namespace etlng::impl diff --git a/src/etlng/impl/ForwardingSource.hpp b/src/etlng/impl/ForwardingSource.hpp new file mode 100644 index 000000000..8624e4e50 --- /dev/null +++ b/src/etlng/impl/ForwardingSource.hpp @@ -0,0 +1,70 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "rpc/Errors.hpp" +#include "util/log/Logger.hpp" +#include "util/requests/WsConnection.hpp" + +#include +#include + +#include +#include +#include +#include +#include + +namespace etlng::impl { + +class ForwardingSource { + util::Logger log_; + util::requests::WsConnectionBuilder connectionBuilder_; + std::chrono::steady_clock::duration forwardingTimeout_; + + static constexpr std::chrono::seconds kCONNECTION_TIMEOUT{3}; + +public: + ForwardingSource( + std::string ip, + std::string wsPort, + std::chrono::steady_clock::duration forwardingTimeout, + std::chrono::steady_clock::duration connTimeout = ForwardingSource::kCONNECTION_TIMEOUT + ); + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param xUserValue Optional value for X-User header + * @param yield The coroutine context + * @return Response on success or error on failure + */ + std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield + ) const; +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/Loading.hpp b/src/etlng/impl/Loading.hpp index 435dcb4e4..caa677bce 100644 --- a/src/etlng/impl/Loading.hpp +++ b/src/etlng/impl/Loading.hpp @@ -39,7 +39,6 @@ #include #include -#include #include #include #include diff --git a/src/etlng/impl/Registry.hpp b/src/etlng/impl/Registry.hpp index d5d262388..921b70811 100644 --- a/src/etlng/impl/Registry.hpp +++ b/src/etlng/impl/Registry.hpp @@ -24,7 +24,6 @@ #include -#include #include #include #include @@ -81,7 +80,7 @@ concept ContainsValidHook = HasLedgerDataHook or HasInitialDataHook or template concept NoTwoOfKind = not(HasLedgerDataHook and HasTransactionHook) and - not(HasInitialDataHook and HasInitialTransactionHook) and not(HasInitialDataHook and HasObjectHook) and + not(HasInitialDataHook and HasInitialTransactionHook) and not(HasInitialObjectsHook and HasInitialObjectHook); template @@ -216,4 +215,10 @@ public: } }; +static auto +makeRegistry(auto&&... exts) +{ + return std::make_unique...>>(std::forward(exts)...); +} + } // namespace etlng::impl diff --git a/src/etlng/impl/SourceImpl.hpp b/src/etlng/impl/SourceImpl.hpp new file mode 100644 index 000000000..1c99d973b --- /dev/null +++ b/src/etlng/impl/SourceImpl.hpp @@ -0,0 +1,232 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/impl/ForwardingSource.hpp" +#include "etl/impl/SubscriptionSource.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Source.hpp" +#include "etlng/impl/GrpcSource.hpp" +#include "rpc/Errors.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +/** + * @brief Provides an implementation of a ETL source + * + * @tparam GrpcSourceType The type of the gRPC source + * @tparam SubscriptionSourceTypePtr The type of the subscription source + * @tparam ForwardingSourceType The type of the forwarding source + */ +template < + typename GrpcSourceType = GrpcSource, + typename SubscriptionSourceTypePtr = std::unique_ptr, + typename ForwardingSourceType = etl::impl::ForwardingSource> +class SourceImpl : public SourceBase { + std::string ip_; + std::string wsPort_; + std::string grpcPort_; + + GrpcSourceType grpcSource_; + SubscriptionSourceTypePtr subscriptionSource_; + ForwardingSourceType forwardingSource_; + +public: + /** + * @brief Construct a new SourceImpl object + * + * @param ip The IP of the source + * @param wsPort The web socket port of the source + * @param grpcPort The gRPC port of the source + * @param grpcSource The gRPC source + * @param subscriptionSource The subscription source + * @param forwardingSource The forwarding source + */ + template + requires std::is_same_v and + std::is_same_v + SourceImpl( + std::string ip, + std::string wsPort, + std::string grpcPort, + SomeGrpcSourceType&& grpcSource, + SubscriptionSourceTypePtr subscriptionSource, + SomeForwardingSourceType&& forwardingSource + ) + : ip_(std::move(ip)) + , wsPort_(std::move(wsPort)) + , grpcPort_(std::move(grpcPort)) + , grpcSource_(std::forward(grpcSource)) + , subscriptionSource_(std::move(subscriptionSource)) + , forwardingSource_(std::forward(forwardingSource)) + { + } + + /** + * @brief Run subscriptions loop of the source + */ + void + run() final + { + subscriptionSource_->run(); + } + + void + stop(boost::asio::yield_context yield) final + { + subscriptionSource_->stop(yield); + } + + /** + * @brief Check if source is connected + * + * @return true if source is connected; false otherwise + */ + bool + isConnected() const final + { + return subscriptionSource_->isConnected(); + } + + /** + * @brief Set the forwarding state of the source. + * + * @param isForwarding Whether to forward or not + */ + void + setForwarding(bool isForwarding) final + { + subscriptionSource_->setForwarding(isForwarding); + } + + /** + * @brief Represent the source as a JSON object + * + * @return JSON representation of the source + */ + boost::json::object + toJson() const final + { + boost::json::object res; + + res["validated_range"] = subscriptionSource_->validatedRange(); + res["is_connected"] = std::to_string(static_cast(subscriptionSource_->isConnected())); + res["ip"] = ip_; + res["ws_port"] = wsPort_; + res["grpc_port"] = grpcPort_; + + auto last = subscriptionSource_->lastMessageTime(); + if (last.time_since_epoch().count() != 0) { + res["last_msg_age_seconds"] = std::to_string( + std::chrono::duration_cast(std::chrono::steady_clock::now() - last).count() + ); + } + + return res; + } + + /** @return String representation of the source (for debug) */ + std::string + toString() const final + { + return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ + + ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}"; + } + + /** + * @brief Check if ledger is known by this source. + * + * @param sequence The ledger sequence to check + * @return true if ledger is in the range of this source; false otherwise + */ + bool + hasLedger(uint32_t sequence) const final + { + return subscriptionSource_->hasLedger(sequence); + } + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) final + { + return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors); + } + + /** + * @brief Download a ledger in full. + * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param loader InitialLoadObserverInterface implementation + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + std::pair, bool> + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) final + { + return grpcSource_.loadInitialLedger(sequence, numMarkers, loader); + } + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param xUserValue Optional value of the X-User header + * @param yield The coroutine context + * @return Response or ClioError + */ + std::expected + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + std::string_view xUserValue, + boost::asio::yield_context yield + ) const final + { + return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield); + } +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/ext/NFT.hpp b/src/etlng/impl/ext/NFT.hpp index 65e45233c..6d6a146cf 100644 --- a/src/etlng/impl/ext/NFT.hpp +++ b/src/etlng/impl/ext/NFT.hpp @@ -20,15 +20,11 @@ #pragma once #include "data/BackendInterface.hpp" -#include "data/DBHelpers.hpp" -#include "etl/NFTHelpers.hpp" #include "etlng/Models.hpp" #include "util/log/Logger.hpp" #include #include -#include -#include namespace etlng::impl { diff --git a/src/rpc/RPCEngine.hpp b/src/rpc/RPCEngine.hpp index 13216c20d..fe46db104 100644 --- a/src/rpc/RPCEngine.hpp +++ b/src/rpc/RPCEngine.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/RPCHelpers.hpp" #include "rpc/WorkQueue.hpp" @@ -55,7 +56,7 @@ namespace rpc { /** * @brief The RPC engine that ties all RPC-related functionality together. */ -template +template class RPCEngine { util::Logger perfLog_{"Performance"}; util::Logger log_{"RPC"}; @@ -67,7 +68,7 @@ class RPCEngine { std::shared_ptr handlerProvider_; - impl::ForwardingProxy forwardingProxy_; + impl::ForwardingProxy forwardingProxy_; std::optional responseCache_; @@ -86,7 +87,7 @@ public: RPCEngine( util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, - std::shared_ptr const& balancer, + std::shared_ptr const& balancer, web::dosguard::DOSGuardInterface const& dosGuard, WorkQueue& workQueue, CountersType& counters, @@ -128,7 +129,7 @@ public: makeRPCEngine( util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, - std::shared_ptr const& balancer, + std::shared_ptr const& balancer, web::dosguard::DOSGuardInterface const& dosGuard, WorkQueue& workQueue, CountersType& counters, diff --git a/src/rpc/common/impl/ForwardingProxy.hpp b/src/rpc/common/impl/ForwardingProxy.hpp index c60773df1..2c07b79ab 100644 --- a/src/rpc/common/impl/ForwardingProxy.hpp +++ b/src/rpc/common/impl/ForwardingProxy.hpp @@ -19,6 +19,7 @@ #pragma once +#include "etlng/LoadBalancerInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/RPCHelpers.hpp" #include "rpc/common/Types.hpp" @@ -31,20 +32,21 @@ #include #include #include +#include namespace rpc::impl { -template +template class ForwardingProxy { util::Logger log_{"RPC"}; - std::shared_ptr balancer_; + std::shared_ptr balancer_; std::reference_wrapper counters_; std::shared_ptr handlerProvider_; public: ForwardingProxy( - std::shared_ptr const& balancer, + std::shared_ptr const& balancer, CountersType& counters, std::shared_ptr const& handlerProvider ) diff --git a/src/rpc/common/impl/HandlerProvider.cpp b/src/rpc/common/impl/HandlerProvider.cpp index cf4485cc2..6707cca07 100644 --- a/src/rpc/common/impl/HandlerProvider.cpp +++ b/src/rpc/common/impl/HandlerProvider.cpp @@ -21,7 +21,8 @@ #include "data/AmendmentCenterInterface.hpp" #include "data/BackendInterface.hpp" -#include "etl/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Counters.hpp" #include "rpc/common/AnyHandler.hpp" @@ -72,8 +73,8 @@ ProductionHandlerProvider::ProductionHandlerProvider( util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& subscriptionManager, - std::shared_ptr const& balancer, - std::shared_ptr const& etl, + std::shared_ptr const& balancer, + std::shared_ptr const& etl, std::shared_ptr const& amendmentCenter, Counters const& counters ) diff --git a/src/rpc/common/impl/HandlerProvider.hpp b/src/rpc/common/impl/HandlerProvider.hpp index 89ea5661f..2c07428cf 100644 --- a/src/rpc/common/impl/HandlerProvider.hpp +++ b/src/rpc/common/impl/HandlerProvider.hpp @@ -21,6 +21,8 @@ #include "data/AmendmentCenterInterface.hpp" #include "data/BackendInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/common/AnyHandler.hpp" #include "rpc/common/HandlerProvider.hpp" @@ -32,10 +34,6 @@ #include #include -namespace etl { -class ETLService; -class LoadBalancer; -} // namespace etl namespace rpc { class Counters; } // namespace rpc @@ -55,8 +53,8 @@ public: util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& subscriptionManager, - std::shared_ptr const& balancer, - std::shared_ptr const& etl, + std::shared_ptr const& balancer, + std::shared_ptr const& etl, std::shared_ptr const& amendmentCenter, Counters const& counters ); diff --git a/src/rpc/handlers/ServerInfo.hpp b/src/rpc/handlers/ServerInfo.hpp index 008450721..085066f7c 100644 --- a/src/rpc/handlers/ServerInfo.hpp +++ b/src/rpc/handlers/ServerInfo.hpp @@ -21,6 +21,8 @@ #include "data/BackendInterface.hpp" #include "data/DBHelpers.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/JS.hpp" @@ -49,10 +51,6 @@ #include #include -namespace etl { -class ETLService; -class LoadBalancer; -} // namespace etl namespace rpc { class Counters; } // namespace rpc @@ -62,18 +60,16 @@ namespace rpc { /** * @brief Contains common functionality for handling the `server_info` command * - * @tparam LoadBalancerType The type of the load balancer - * @tparam ETLServiceType The type of the ETL service * @tparam CountersType The type of the counters */ -template +template class BaseServerInfoHandler { static constexpr auto kBACKEND_COUNTERS_KEY = "backend_counters"; std::shared_ptr backend_; std::shared_ptr subscriptions_; - std::shared_ptr balancer_; - std::shared_ptr etl_; + std::shared_ptr balancer_; + std::shared_ptr etl_; std::reference_wrapper counters_; public: @@ -158,8 +154,8 @@ public: BaseServerInfoHandler( std::shared_ptr const& backend, std::shared_ptr const& subscriptions, - std::shared_ptr const& balancer, - std::shared_ptr const& etl, + std::shared_ptr const& balancer, + std::shared_ptr const& etl, CountersType const& counters ) : backend_(backend) @@ -352,6 +348,6 @@ private: * * For more details see: https://xrpl.org/server_info-clio.html */ -using ServerInfoHandler = BaseServerInfoHandler; +using ServerInfoHandler = BaseServerInfoHandler; } // namespace rpc diff --git a/src/rpc/handlers/Tx.hpp b/src/rpc/handlers/Tx.hpp index b05a64c1e..acc5de4a3 100644 --- a/src/rpc/handlers/Tx.hpp +++ b/src/rpc/handlers/Tx.hpp @@ -22,6 +22,7 @@ #include "data/BackendInterface.hpp" #include "data/Types.hpp" #include "etl/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/JS.hpp" #include "rpc/RPCHelpers.hpp" @@ -52,14 +53,13 @@ namespace rpc { /** - * @brief Contains common functionality for handling the `tx` command + * @brief The tx method retrieves information on a single transaction, by its identifying hash. * - * @tparam ETLServiceType The type of the ETL service to use + * For more details see: https://xrpl.org/tx.html */ -template -class BaseTxHandler { +class TxHandler { std::shared_ptr sharedPtrBackend_; - std::shared_ptr etl_; + std::shared_ptr etl_; public: /** @@ -95,14 +95,14 @@ public: using Result = HandlerReturnType; /** - * @brief Construct a new BaseTxHandler object + * @brief Construct a new TxHandler object * * @param sharedPtrBackend The backend to use * @param etl The ETL service to use */ - BaseTxHandler( + TxHandler( std::shared_ptr const& sharedPtrBackend, - std::shared_ptr const& etl + std::shared_ptr const& etl ) : sharedPtrBackend_(sharedPtrBackend), etl_(etl) { @@ -183,7 +183,7 @@ public: dbResponse = sharedPtrBackend_->fetchTransaction(ripple::uint256{input.transaction->c_str()}, ctx.yield); } - auto output = BaseTxHandler::Output{.apiVersion = ctx.apiVersion}; + auto output = TxHandler::Output{.apiVersion = ctx.apiVersion}; if (!dbResponse) { if (rangeSupplied && input.transaction) // ranges not for ctid @@ -320,7 +320,7 @@ private: friend Input tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) { - auto input = BaseTxHandler::Input{}; + auto input = TxHandler::Input{}; auto const& jsonObject = jv.as_object(); if (jsonObject.contains(JS(transaction))) @@ -344,10 +344,4 @@ private: } }; -/** - * @brief The tx method retrieves information on a single transaction, by its identifying hash. - * - * For more details see: https://xrpl.org/tx.html - */ -using TxHandler = BaseTxHandler; } // namespace rpc diff --git a/src/util/newconfig/ConfigDefinition.hpp b/src/util/newconfig/ConfigDefinition.hpp index 030b8d160..b00cbd1cb 100644 --- a/src/util/newconfig/ConfigDefinition.hpp +++ b/src/util/newconfig/ConfigDefinition.hpp @@ -293,7 +293,7 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{ {"database.cassandra.certfile", ConfigValue{ConfigType::String}.optional()}, {"allow_no_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, - + {"__ng_etl", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"etl_sources.[].ip", Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidateIp)}}, {"etl_sources.[].ws_port", Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidatePort)}}, {"etl_sources.[].grpc_port", Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidatePort)}}, diff --git a/src/web/RPCServerHandler.hpp b/src/web/RPCServerHandler.hpp index 632d9551e..a0abacb7c 100644 --- a/src/web/RPCServerHandler.hpp +++ b/src/web/RPCServerHandler.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/Factories.hpp" #include "rpc/JS.hpp" @@ -58,11 +59,11 @@ namespace web { * * Note: see @ref web::SomeServerHandler concept */ -template +template class RPCServerHandler { std::shared_ptr const backend_; std::shared_ptr const rpcEngine_; - std::shared_ptr const etl_; + std::shared_ptr const etl_; util::TagDecoratorFactory const tagFactory_; rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed @@ -82,7 +83,7 @@ public: util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& rpcEngine, - std::shared_ptr const& etl + std::shared_ptr const& etl ) : backend_(backend) , rpcEngine_(rpcEngine) diff --git a/src/web/ng/RPCServerHandler.hpp b/src/web/ng/RPCServerHandler.hpp index f8dbcb183..fec8a22f7 100644 --- a/src/web/ng/RPCServerHandler.hpp +++ b/src/web/ng/RPCServerHandler.hpp @@ -20,6 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" +#include "etlng/ETLServiceInterface.hpp" #include "rpc/Errors.hpp" #include "rpc/Factories.hpp" #include "rpc/JS.hpp" @@ -64,11 +65,11 @@ namespace web::ng { * * Note: see @ref web::SomeServerHandler concept */ -template +template class RPCServerHandler { std::shared_ptr const backend_; std::shared_ptr const rpcEngine_; - std::shared_ptr const etl_; + std::shared_ptr const etl_; util::TagDecoratorFactory const tagFactory_; rpc::impl::ProductionAPIVersionParser apiVersionParser_; // can be injected if needed @@ -88,7 +89,7 @@ public: util::config::ClioConfigDefinition const& config, std::shared_ptr const& backend, std::shared_ptr const& rpcEngine, - std::shared_ptr const& etl + std::shared_ptr const& etl ) : backend_(backend) , rpcEngine_(rpcEngine) diff --git a/tests/common/util/MockETLService.hpp b/tests/common/util/MockETLService.hpp index 618e5ab57..f398ad9cd 100644 --- a/tests/common/util/MockETLService.hpp +++ b/tests/common/util/MockETLService.hpp @@ -20,21 +20,21 @@ #pragma once #include "etl/ETLState.hpp" +#include "etlng/ETLServiceInterface.hpp" #include #include #include -#include #include #include -struct MockETLService { - MOCK_METHOD(boost::json::object, getInfo, (), (const)); - MOCK_METHOD(std::chrono::time_point, getLastPublish, (), (const)); - MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const)); - MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const)); - MOCK_METHOD(bool, isAmendmentBlocked, (), (const)); - MOCK_METHOD(bool, isCorruptionDetected, (), (const)); - MOCK_METHOD(std::optional, getETLState, (), (const)); +struct MockETLService : etlng::ETLServiceInterface { + MOCK_METHOD(void, run, (), (override)); + MOCK_METHOD(void, stop, (), (override)); + MOCK_METHOD(boost::json::object, getInfo, (), (const, override)); + MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const, override)); + MOCK_METHOD(bool, isAmendmentBlocked, (), (const, override)); + MOCK_METHOD(bool, isCorruptionDetected, (), (const, override)); + MOCK_METHOD(std::optional, getETLState, (), (const, override)); }; diff --git a/tests/common/util/MockLoadBalancer.hpp b/tests/common/util/MockLoadBalancer.hpp index 12bcfea78..f0b9dd313 100644 --- a/tests/common/util/MockLoadBalancer.hpp +++ b/tests/common/util/MockLoadBalancer.hpp @@ -38,22 +38,6 @@ #include #include -struct MockLoadBalancer { - using RawLedgerObjectType = FakeLedgerObject; - - MOCK_METHOD(void, loadInitialLedger, (std::uint32_t, bool), ()); - MOCK_METHOD(std::optional, fetchLedger, (uint32_t, bool, bool), ()); - MOCK_METHOD(boost::json::value, toJson, (), (const)); - - using ForwardToRippledReturnType = std::expected; - MOCK_METHOD( - ForwardToRippledReturnType, - forwardToRippled, - (boost::json::object const&, std::optional const&, bool, boost::asio::yield_context), - (const) - ); -}; - struct MockNgLoadBalancer : etlng::LoadBalancerInterface { using RawLedgerObjectType = FakeLedgerObject; @@ -85,4 +69,7 @@ struct MockNgLoadBalancer : etlng::LoadBalancerInterface { (boost::json::object const&, std::optional const&, bool, boost::asio::yield_context), (override) ); + MOCK_METHOD(void, stop, (boost::asio::yield_context), ()); }; + +using MockLoadBalancer = MockNgLoadBalancer; diff --git a/tests/common/util/MockSource.hpp b/tests/common/util/MockSource.hpp index 58984d485..a982f3aef 100644 --- a/tests/common/util/MockSource.hpp +++ b/tests/common/util/MockSource.hpp @@ -60,7 +60,7 @@ struct MockSource : etl::SourceBase { (uint32_t, bool, bool), (override) ); - MOCK_METHOD((std::pair, bool>), loadInitialLedger, (uint32_t, uint32_t, bool), (override)); + MOCK_METHOD((std::pair, bool>), loadInitialLedger, (uint32_t, uint32_t), (override)); using ForwardToRippledReturnType = std::expected; MOCK_METHOD( @@ -132,9 +132,9 @@ public: } std::pair, bool> - loadInitialLedger(uint32_t sequence, uint32_t maxLedger, bool getObjects) override + loadInitialLedger(uint32_t sequence, uint32_t maxLedger) override { - return mock_->loadInitialLedger(sequence, maxLedger, getObjects); + return mock_->loadInitialLedger(sequence, maxLedger); } std::expected diff --git a/tests/common/util/MockSourceNg.hpp b/tests/common/util/MockSourceNg.hpp new file mode 100644 index 000000000..7137e47ec --- /dev/null +++ b/tests/common/util/MockSourceNg.hpp @@ -0,0 +1,245 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== +#pragma once + +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "util/newconfig/ObjectView.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct MockSourceNg : etlng::SourceBase { + MOCK_METHOD(void, run, (), (override)); + MOCK_METHOD(void, stop, (boost::asio::yield_context), (override)); + MOCK_METHOD(bool, isConnected, (), (const, override)); + MOCK_METHOD(void, setForwarding, (bool), (override)); + MOCK_METHOD(boost::json::object, toJson, (), (const, override)); + MOCK_METHOD(std::string, toString, (), (const, override)); + MOCK_METHOD(bool, hasLedger, (uint32_t), (const, override)); + MOCK_METHOD( + (std::pair), + fetchLedger, + (uint32_t, bool, bool), + (override) + ); + MOCK_METHOD( + (std::pair, bool>), + loadInitialLedger, + (uint32_t, uint32_t, etlng::InitialLoadObserverInterface&), + (override) + ); + + using ForwardToRippledReturnType = std::expected; + MOCK_METHOD( + ForwardToRippledReturnType, + forwardToRippled, + (boost::json::object const&, std::optional const&, std::string_view, boost::asio::yield_context), + (const, override) + ); +}; + +template