diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 898c3817..903089d6 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -36,6 +36,7 @@ #include "rpc/RPCEngine.hpp" #include "rpc/WorkQueue.hpp" #include "rpc/common/impl/HandlerProvider.hpp" +#include "util/async/context/BasicExecutionContext.hpp" #include "util/build/Build.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/log/Logger.hpp" @@ -103,6 +104,10 @@ ClioApplication::run(bool const useNgWebServer) // This is not the only io context in the application. boost::asio::io_context ioc{threads}; + // Similarly we need a context to run ETLng on + // In the future we can remove the raw ioc and use ctx instead + util::async::CoroExecutionContext ctx{threads}; + // Rate limiter, to prevent abuse auto whitelistHandler = web::dosguard::WhitelistHandler{config_}; auto const dosguardWeights = web::dosguard::Weights::make(config_); @@ -146,7 +151,7 @@ ClioApplication::run(bool const useNgWebServer) }(); // ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes - auto etl = etl::ETLService::makeETLService(config_, ioc, backend, subscriptions, balancer, ledgers); + auto etl = etl::ETLService::makeETLService(config_, ioc, ctx, backend, subscriptions, balancer, ledgers); auto workQueue = rpc::WorkQueue::makeWorkQueue(config_); auto counters = rpc::Counters::makeCounters(workQueue); diff --git a/src/etl/CacheLoader.hpp b/src/etl/CacheLoader.hpp index 3e6d4a07..feaf592a 100644 --- a/src/etl/CacheLoader.hpp +++ b/src/etl/CacheLoader.hpp @@ -26,6 +26,7 @@ #include "etl/impl/CursorFromAccountProvider.hpp" #include "etl/impl/CursorFromDiffProvider.hpp" #include "etl/impl/CursorFromFixDiffNumProvider.hpp" +#include "etlng/CacheLoaderInterface.hpp" #include "util/Assert.hpp" #include "util/async/context/BasicExecutionContext.hpp" #include "util/log/Logger.hpp" @@ -33,6 +34,7 @@ #include #include #include +#include namespace etl { @@ -46,7 +48,7 @@ namespace etl { * @tparam ExecutionContextType The type of the execution context to use */ template -class CacheLoader { +class CacheLoader : public etlng::CacheLoaderInterface { using CacheLoaderType = impl::CacheLoaderImpl; util::Logger log_{"ETL"}; @@ -67,10 +69,13 @@ public: */ CacheLoader( util::config::ClioConfigDefinition const& config, - std::shared_ptr const& backend, + std::shared_ptr backend, data::LedgerCacheInterface& cache ) - : backend_{backend}, cache_{cache}, settings_{makeCacheLoaderSettings(config)}, ctx_{settings_.numThreads} + : backend_{std::move(backend)} + , cache_{cache} + , settings_{makeCacheLoaderSettings(config)} + , ctx_{settings_.numThreads} { } @@ -83,7 +88,7 @@ public: * @param seq The sequence number to load cache for */ void - load(uint32_t const seq) + load(uint32_t const seq) override { ASSERT(not cache_.get().isFull(), "Cache must not be full. seq = {}", seq); @@ -129,7 +134,7 @@ public: * @brief Requests the loader to stop asap */ void - stop() noexcept + stop() noexcept override { if (loader_ != nullptr) loader_->stop(); @@ -139,7 +144,7 @@ public: * @brief Waits for the loader to finish background work */ void - wait() noexcept + wait() noexcept override { if (loader_ != nullptr) loader_->wait(); diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 6211e5ed..ac1bec78 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -20,17 +20,38 @@ #include "etl/ETLService.hpp" #include "data/BackendInterface.hpp" +#include "etl/CacheLoader.hpp" #include "etl/CorruptionDetector.hpp" +#include "etl/ETLState.hpp" +#include "etl/LoadBalancer.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etl/SystemState.hpp" +#include "etl/impl/AmendmentBlockHandler.hpp" +#include "etl/impl/ExtractionDataPipe.hpp" +#include "etl/impl/Extractor.hpp" +#include "etl/impl/LedgerFetcher.hpp" +#include "etl/impl/LedgerLoader.hpp" +#include "etl/impl/LedgerPublisher.hpp" +#include "etl/impl/Transformer.hpp" +#include "etlng/ETLService.hpp" +#include "etlng/ETLServiceInterface.hpp" +#include "etlng/LoadBalancer.hpp" #include "etlng/LoadBalancerInterface.hpp" +#include "etlng/impl/LedgerPublisher.hpp" +#include "etlng/impl/TaskManagerProvider.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/Constants.hpp" +#include "util/async/AnyExecutionContext.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/log/Logger.hpp" #include +#include +#include +#include #include +#include #include #include @@ -45,6 +66,75 @@ namespace etl { +std::shared_ptr +ETLService::makeETLService( + util::config::ClioConfigDefinition const& config, + boost::asio::io_context& ioc, + util::async::AnyExecutionContext ctx, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + std::shared_ptr ledgers +) +{ + std::shared_ptr ret; + + if (config.get("__ng_etl")) { + ASSERT( + std::dynamic_pointer_cast(balancer), "LoadBalancer type must be etlng::LoadBalancer" + ); + + auto state = std::make_shared(); + + auto fetcher = std::make_shared(backend, balancer); + auto extractor = std::make_shared(fetcher); + auto publisher = std::make_shared(ioc, backend, subscriptions, *state); + auto cacheLoader = std::make_shared>(config, backend, backend->cache()); + auto cacheUpdater = std::make_shared(backend->cache()); + auto amendmentBlockHandler = std::make_shared(ctx, *state); + + auto loader = std::make_shared( + backend, + etlng::impl::makeRegistry( + *state, + etlng::impl::CacheExt{cacheUpdater}, + etlng::impl::CoreExt{backend}, + etlng::impl::SuccessorExt{backend, backend->cache()}, + etlng::impl::NFTExt{backend} + ), + amendmentBlockHandler + ); + + auto taskManagerProvider = std::make_shared(*ledgers, extractor, loader); + + ret = std::make_shared( + ctx, + config, + backend, + balancer, + ledgers, + publisher, + cacheLoader, + cacheUpdater, + extractor, + loader, // loader itself + loader, // initial load observer + taskManagerProvider, + state + ); + } else { + ASSERT(std::dynamic_pointer_cast(balancer), "LoadBalancer type must be etl::LoadBalancer"); + ret = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); + } + + // inject networkID into subscriptions, as transaction feed require it to inject CTID in response + if (auto const state = ret->getETLState(); state) + subscriptions->setNetworkID(state->networkID); + + ret->run(); + return ret; +} + // Database must be populated when this starts std::optional ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors) diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index 964258db..af9a3442 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -22,7 +22,6 @@ #include "data/BackendInterface.hpp" #include "etl/CacheLoader.hpp" #include "etl/ETLState.hpp" -#include "etl/LoadBalancer.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/SystemState.hpp" #include "etl/impl/AmendmentBlockHandler.hpp" @@ -32,12 +31,12 @@ #include "etl/impl/LedgerLoader.hpp" #include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/Transformer.hpp" -#include "etlng/ETLService.hpp" #include "etlng/ETLServiceInterface.hpp" -#include "etlng/LoadBalancer.hpp" #include "etlng/LoadBalancerInterface.hpp" +#include "etlng/impl/LedgerPublisher.hpp" +#include "etlng/impl/TaskManagerProvider.hpp" #include "feed/SubscriptionManagerInterface.hpp" -#include "util/Assert.hpp" +#include "util/async/AnyExecutionContext.hpp" #include "util/log/Logger.hpp" #include @@ -150,6 +149,7 @@ public: * * @param config The configuration to use * @param ioc io context to run on + * @param ctx Execution context for asynchronous operations * @param backend BackendInterface implementation * @param subscriptions Subscription manager * @param balancer Load balancer to use @@ -160,34 +160,12 @@ public: makeETLService( util::config::ClioConfigDefinition const& config, boost::asio::io_context& ioc, + util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr ledgers - ) - { - std::shared_ptr ret; - - if (config.get("__ng_etl")) { - ASSERT( - std::dynamic_pointer_cast(balancer), - "LoadBalancer type must be etlng::LoadBalancer" - ); - ret = std::make_shared(config, backend, subscriptions, balancer, ledgers); - } else { - ASSERT( - std::dynamic_pointer_cast(balancer), "LoadBalancer type must be etl::LoadBalancer" - ); - ret = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); - } - - // inject networkID into subscriptions, as transaction feed require it to inject CTID in response - if (auto const state = ret->getETLState(); state) - subscriptions->setNetworkID(state->networkID); - - ret->run(); - return ret; - } + ); /** * @brief Stops components and joins worker thread. diff --git a/src/etl/impl/LedgerPublisher.hpp b/src/etl/impl/LedgerPublisher.hpp index 4c21b4c2..e0960c5c 100644 --- a/src/etl/impl/LedgerPublisher.hpp +++ b/src/etl/impl/LedgerPublisher.hpp @@ -24,6 +24,7 @@ #include "data/LedgerCacheInterface.hpp" #include "data/Types.hpp" #include "etl/SystemState.hpp" +#include "etlng/LedgerPublisherInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/log/Logger.hpp" @@ -31,6 +32,7 @@ #include "util/prometheus/Prometheus.hpp" #include +#include #include #include #include @@ -66,7 +68,7 @@ namespace etl::impl { * includes reading all of the transactions from the database) is done from the application wide asio io_service, and a * strand is used to ensure ledgers are published in order. */ -class LedgerPublisher { +class LedgerPublisher : public etlng::LedgerPublisherInterface { util::Logger log_{"ETL"}; boost::asio::strand publishStrand_; @@ -121,7 +123,7 @@ public: uint32_t ledgerSequence, std::optional maxAttempts, std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1} - ) + ) override { LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence; size_t numAttempts = 0; @@ -235,7 +237,7 @@ public: * @brief Get time passed since last publish, in seconds */ std::uint32_t - lastPublishAgeSeconds() const + lastPublishAgeSeconds() const override { return std::chrono::duration_cast(std::chrono::system_clock::now() - getLastPublish()) .count(); @@ -245,7 +247,7 @@ public: * @brief Get last publish time as a time point */ std::chrono::time_point - getLastPublish() const + getLastPublish() const override { return std::chrono::time_point{std::chrono::seconds{lastPublishSeconds_.get().value() }}; @@ -255,7 +257,7 @@ public: * @brief Get time passed since last ledger close, in seconds */ std::uint32_t - lastCloseAgeSeconds() const + lastCloseAgeSeconds() const override { std::shared_lock const lck(closeTimeMtx_); auto now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) diff --git a/src/etlng/CMakeLists.txt b/src/etlng/CMakeLists.txt index 49e443a3..d0640b54 100644 --- a/src/etlng/CMakeLists.txt +++ b/src/etlng/CMakeLists.txt @@ -2,7 +2,8 @@ add_library(clio_etlng) target_sources( clio_etlng - PRIVATE LoadBalancer.cpp + PRIVATE ETLService.cpp + LoadBalancer.cpp Source.cpp impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp diff --git a/src/etlng/CacheLoaderInterface.hpp b/src/etlng/CacheLoaderInterface.hpp new file mode 100644 index 00000000..e90adeff --- /dev/null +++ b/src/etlng/CacheLoaderInterface.hpp @@ -0,0 +1,53 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include + +namespace etlng { + +/** + * @brief An interface for the Cache Loader + */ +struct CacheLoaderInterface { + virtual ~CacheLoaderInterface() = default; + + /** + * @brief Load the cache with the most recent ledger data + * + * @param seq The sequence number of the ledger to load + */ + virtual void + load(uint32_t const seq) = 0; + + /** + * @brief Stop the cache loading process + */ + virtual void + stop() noexcept = 0; + + /** + * @brief Wait for all cache loading tasks to complete + */ + virtual void + wait() noexcept = 0; +}; + +} // namespace etlng diff --git a/src/etlng/CacheUpdaterInterface.hpp b/src/etlng/CacheUpdaterInterface.hpp new file mode 100644 index 00000000..dc31cea3 --- /dev/null +++ b/src/etlng/CacheUpdaterInterface.hpp @@ -0,0 +1,66 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/Types.hpp" +#include "etlng/Models.hpp" + +#include +#include + +namespace etlng { + +/** + * @brief An interface for the Cache Updater + */ +struct CacheUpdaterInterface { + virtual ~CacheUpdaterInterface() = default; + + /** + * @brief Update the cache with ledger data + * @param data The ledger data to update with + */ + virtual void + update(model::LedgerData const& data) = 0; + + /** + * @brief Update the cache with ledger objects at a specific sequence + * @param seq The ledger sequence number + * @param objs The ledger objects to update with + */ + virtual void + update(uint32_t seq, std::vector const& objs) = 0; + + /** + * @brief Update the cache with model objects at a specific sequence + * @param seq The ledger sequence number + * @param objs The model objects to update with + */ + virtual void + update(uint32_t seq, std::vector const& objs) = 0; + + /** + * @brief Mark the cache as fully loaded + */ + virtual void + setFull() = 0; +}; + +} // namespace etlng diff --git a/src/etlng/ETLService.cpp b/src/etlng/ETLService.cpp new file mode 100644 index 00000000..0116f0a4 --- /dev/null +++ b/src/etlng/ETLService.cpp @@ -0,0 +1,275 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etlng/ETLService.hpp" + +#include "data/BackendInterface.hpp" +#include "data/LedgerCacheInterface.hpp" +#include "data/Types.hpp" +#include "etl/ETLState.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etl/SystemState.hpp" +#include "etl/impl/AmendmentBlockHandler.hpp" +#include "etl/impl/LedgerFetcher.hpp" +#include "etlng/CacheLoaderInterface.hpp" +#include "etlng/CacheUpdaterInterface.hpp" +#include "etlng/ExtractorInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LedgerPublisherInterface.hpp" +#include "etlng/LoadBalancerInterface.hpp" +#include "etlng/LoaderInterface.hpp" +#include "etlng/MonitorInterface.hpp" +#include "etlng/TaskManagerProviderInterface.hpp" +#include "etlng/impl/AmendmentBlockHandler.hpp" +#include "etlng/impl/CacheUpdater.hpp" +#include "etlng/impl/Extraction.hpp" +#include "etlng/impl/LedgerPublisher.hpp" +#include "etlng/impl/Loading.hpp" +#include "etlng/impl/Monitor.hpp" +#include "etlng/impl/Registry.hpp" +#include "etlng/impl/Scheduling.hpp" +#include "etlng/impl/TaskManager.hpp" +#include "etlng/impl/ext/Cache.hpp" +#include "etlng/impl/ext/Core.hpp" +#include "etlng/impl/ext/NFT.hpp" +#include "etlng/impl/ext/Successor.hpp" +#include "util/Assert.hpp" +#include "util/Profiler.hpp" +#include "util/async/AnyExecutionContext.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng { + +ETLService::ETLService( + util::async::AnyExecutionContext ctx, + std::reference_wrapper config, + std::shared_ptr backend, + std::shared_ptr balancer, + std::shared_ptr ledgers, + std::shared_ptr publisher, + std::shared_ptr cacheLoader, + std::shared_ptr cacheUpdater, + std::shared_ptr extractor, + std::shared_ptr loader, + std::shared_ptr initialLoadObserver, + std::shared_ptr taskManagerProvider, + std::shared_ptr state +) + : ctx_(std::move(ctx)) + , config_(config) + , backend_(std::move(backend)) + , balancer_(std::move(balancer)) + , ledgers_(std::move(ledgers)) + , publisher_(std::move(publisher)) + , cacheLoader_(std::move(cacheLoader)) + , cacheUpdater_(std::move(cacheUpdater)) + , extractor_(std::move(extractor)) + , loader_(std::move(loader)) + , initialLoadObserver_(std::move(initialLoadObserver)) + , taskManagerProvider_(std::move(taskManagerProvider)) + , state_(std::move(state)) +{ + LOG(log_.info()) << "Creating ETLng..."; +} + +ETLService::~ETLService() +{ + stop(); + LOG(log_.debug()) << "Destroying ETLng"; +} + +void +ETLService::run() +{ + LOG(log_.info()) << "Running ETLng..."; + + // TODO: write-enabled node should start in readonly and do the 10 second dance to become a writer + mainLoop_.emplace(ctx_.execute([this] { + state_->isWriting = + not state_->isReadOnly; // TODO: this is now needed because we don't have a mechanism for readonly or + // ETL writer node. remove later in favor of real mechanism + + auto const rng = loadInitialLedgerIfNeeded(); + + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; + std::optional const mostRecentValidated = ledgers_->getMostRecent(); + + if (not mostRecentValidated) { + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " + "Exiting monitor loop"; + return; + } + + ASSERT(rng.has_value(), "Ledger range can't be null"); + auto const nextSequence = rng->maxSequence + 1; + + LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; + startMonitor(nextSequence); + + // TODO: we only want to run the full ETL task man if we are POSSIBLY a write node + // but definitely not in strict readonly + if (not state_->isReadOnly) + startLoading(nextSequence); + })); +} + +void +ETLService::stop() +{ + LOG(log_.info()) << "Stop called"; + + if (taskMan_) + taskMan_->stop(); + if (monitor_) + monitor_->stop(); +} + +boost::json::object +ETLService::getInfo() const +{ + boost::json::object result; + + result["etl_sources"] = balancer_->toJson(); + result["is_writer"] = static_cast(state_->isWriting); + result["read_only"] = static_cast(state_->isReadOnly); + auto last = publisher_->getLastPublish(); + if (last.time_since_epoch().count() != 0) + result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds()); + return result; +} + +bool +ETLService::isAmendmentBlocked() const +{ + return state_->isAmendmentBlocked; +} + +bool +ETLService::isCorruptionDetected() const +{ + return state_->isCorruptionDetected; +} + +std::optional +ETLService::getETLState() const +{ + return balancer_->getETLState(); +} + +std::uint32_t +ETLService::lastCloseAgeSeconds() const +{ + return publisher_->lastCloseAgeSeconds(); +} + +std::optional +ETLService::loadInitialLedgerIfNeeded() +{ + auto rng = backend_->hardFetchLedgerRangeNoThrow(); + if (not rng.has_value()) { + LOG(log_.info()) << "Database is empty. Will download a ledger from the network."; + + LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; + if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) { + auto const seq = *mostRecentValidated; + LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... "; + + auto [ledger, timeDiff] = ::util::timed>([this, seq]() { + return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) { + // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar + data.edgeKeys = balancer_->loadInitialLedger(seq, *initialLoadObserver_); + + // TODO: this should be interruptible for graceful shutdown + return loader_->loadInitialLedger(data); + }); + }); + + if (not ledger.has_value()) { + LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop"; + return std::nullopt; + } + + LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff; + LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size(); + + return backend_->hardFetchLedgerRangeNoThrow(); + } + + LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " + "Exiting monitor loop"; + return std::nullopt; + } + + LOG(log_.info()) << "Database already populated. Picking up from the tip of history"; + cacheLoader_->load(rng->maxSequence); + + return rng; +} + +void +ETLService::startMonitor(uint32_t seq) +{ + monitor_ = std::make_unique(ctx_, backend_, ledgers_, seq); + monitorSubscription_ = monitor_->subscribe([this](uint32_t seq) { + log_.info() << "MONITOR got new seq from db: " << seq; + + // FIXME: is this the best way? + if (not state_->isWriting) { + auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) { + return backend_->fetchLedgerDiff(seq, yield); + }); + cacheUpdater_->update(seq, diff); + } + + publisher_->publish(seq, {}); + }); + monitor_->run(); +} + +void +ETLService::startLoading(uint32_t seq) +{ + taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq); + taskMan_->run(config_.get().get("extractor_threads")); +} + +} // namespace etlng diff --git a/src/etlng/ETLService.hpp b/src/etlng/ETLService.hpp index 5a54f14e..168db6f9 100644 --- a/src/etlng/ETLService.hpp +++ b/src/etlng/ETLService.hpp @@ -20,22 +20,27 @@ #pragma once #include "data/BackendInterface.hpp" -#include "data/LedgerCache.hpp" #include "data/Types.hpp" -#include "etl/CacheLoader.hpp" #include "etl/ETLState.hpp" -#include "etl/LedgerFetcherInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/SystemState.hpp" #include "etl/impl/AmendmentBlockHandler.hpp" #include "etl/impl/LedgerFetcher.hpp" -#include "etl/impl/LedgerPublisher.hpp" -#include "etlng/AmendmentBlockHandlerInterface.hpp" +#include "etlng/CacheLoaderInterface.hpp" +#include "etlng/CacheUpdaterInterface.hpp" #include "etlng/ETLServiceInterface.hpp" #include "etlng/ExtractorInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LedgerPublisherInterface.hpp" #include "etlng/LoadBalancerInterface.hpp" +#include "etlng/LoaderInterface.hpp" +#include "etlng/MonitorInterface.hpp" +#include "etlng/TaskManagerInterface.hpp" +#include "etlng/TaskManagerProviderInterface.hpp" #include "etlng/impl/AmendmentBlockHandler.hpp" +#include "etlng/impl/CacheUpdater.hpp" #include "etlng/impl/Extraction.hpp" +#include "etlng/impl/LedgerPublisher.hpp" #include "etlng/impl/Loading.hpp" #include "etlng/impl/Monitor.hpp" #include "etlng/impl/Registry.hpp" @@ -45,14 +50,14 @@ #include "etlng/impl/ext/Core.hpp" #include "etlng/impl/ext/NFT.hpp" #include "etlng/impl/ext/Successor.hpp" -#include "feed/SubscriptionManagerInterface.hpp" -#include "util/Assert.hpp" -#include "util/Profiler.hpp" -#include "util/async/context/BasicExecutionContext.hpp" +#include "util/async/AnyExecutionContext.hpp" +#include "util/async/AnyOperation.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/log/Logger.hpp" +#include #include +#include #include #include #include @@ -64,15 +69,12 @@ #include #include -#include +#include #include +#include #include #include -#include -#include #include -#include -#include namespace etlng { @@ -92,191 +94,94 @@ namespace etlng { class ETLService : public ETLServiceInterface { util::Logger log_{"ETL"}; + util::async::AnyExecutionContext ctx_; + std::reference_wrapper config_; std::shared_ptr backend_; - std::shared_ptr subscriptions_; - std::shared_ptr balancer_; + std::shared_ptr balancer_; std::shared_ptr ledgers_; - std::shared_ptr> cacheLoader_; - - std::shared_ptr fetcher_; + std::shared_ptr publisher_; + std::shared_ptr cacheLoader_; + std::shared_ptr cacheUpdater_; std::shared_ptr extractor_; + std::shared_ptr loader_; + std::shared_ptr initialLoadObserver_; + std::shared_ptr taskManagerProvider_; + std::shared_ptr state_; - etl::SystemState state_; - util::async::CoroExecutionContext ctx_{8}; + std::unique_ptr monitor_; + std::unique_ptr taskMan_; - std::shared_ptr amendmentBlockHandler_; - std::shared_ptr loader_; + boost::signals2::scoped_connection monitorSubscription_; - std::optional> mainLoop_; + std::optional> mainLoop_; public: /** * @brief Create an instance of ETLService. * - * @param config The configuration to use - * @param backend BackendInterface implementation - * @param subscriptions Subscription manager - * @param balancer Load balancer to use - * @param ledgers The network validated ledgers datastructure + * @param ctx The execution context for asynchronous operations + * @param config The Clio configuration definition + * @param backend Interface to the backend database + * @param balancer Load balancer for distributing work + * @param ledgers Interface for accessing network validated ledgers + * @param publisher Interface for publishing ledger data + * @param cacheLoader Interface for loading cache data + * @param cacheUpdater Interface for updating cache data + * @param extractor The extractor to use + * @param loader Interface for loading data + * @param initialLoadObserver The observer for initial data loading + * @param taskManagerProvider The provider of the task manager instance + * @param state System state tracking object */ ETLService( - util::config::ClioConfigDefinition const& config, - std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr balancer, - std::shared_ptr ledgers - ) - : backend_(std::move(backend)) - , subscriptions_(std::move(subscriptions)) - , balancer_(std::move(balancer)) - , ledgers_(std::move(ledgers)) - , cacheLoader_(std::make_shared>(config, backend_, backend_->cache())) - , fetcher_(std::make_shared(backend_, balancer_)) - , extractor_(std::make_shared(fetcher_)) - , amendmentBlockHandler_(std::make_shared(ctx_, state_)) - , loader_(std::make_shared( - backend_, - fetcher_, - impl::makeRegistry( - impl::CacheExt{backend_->cache()}, - impl::CoreExt{backend_}, - impl::SuccessorExt{backend_, backend_->cache()}, - impl::NFTExt{backend_} - ), - amendmentBlockHandler_ - )) - { - LOG(log_.info()) << "Creating ETLng..."; - } + util::async::AnyExecutionContext ctx, + std::reference_wrapper config, + std::shared_ptr backend, + std::shared_ptr balancer, + std::shared_ptr ledgers, + std::shared_ptr publisher, + std::shared_ptr cacheLoader, + std::shared_ptr cacheUpdater, + std::shared_ptr extractor, + std::shared_ptr loader, + std::shared_ptr initialLoadObserver, + std::shared_ptr taskManagerProvider, + std::shared_ptr state + ); - ~ETLService() override - { - LOG(log_.debug()) << "Stopping ETLng"; - } + ~ETLService() override; void - run() override - { - LOG(log_.info()) << "run() in ETLng..."; - - mainLoop_.emplace(ctx_.execute([this] { - auto const rng = loadInitialLedgerIfNeeded(); - - LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; - std::optional const mostRecentValidated = ledgers_->getMostRecent(); - - if (not mostRecentValidated) { - LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " - "Exiting monitor loop"; - return; - } - - ASSERT(rng.has_value(), "Ledger range can't be null"); - auto const nextSequence = rng->maxSequence + 1; - - LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; - - auto scheduler = impl::makeScheduler(impl::ForwardScheduler{*ledgers_, nextSequence} - // impl::BackfillScheduler{nextSequence - 1, nextSequence - 1000}, - // TODO lift limit and start with rng.minSeq - ); - - auto man = impl::TaskManager(ctx_, *scheduler, *extractor_, *loader_); - - // TODO: figure out this: std::make_shared(backend_, ledgers_, nextSequence) - man.run({}); // TODO: needs to be interruptible and fill out settings - })); - } + run() override; void - stop() override - { - LOG(log_.info()) << "Stop called"; - // TODO: stop the service correctly - } + stop() override; boost::json::object - getInfo() const override - { - // TODO - return {{"ok", true}}; - } + getInfo() const override; bool - isAmendmentBlocked() const override - { - // TODO - return false; - } + isAmendmentBlocked() const override; bool - isCorruptionDetected() const override - { - // TODO - return false; - } + isCorruptionDetected() const override; std::optional - getETLState() const override - { - // TODO - return std::nullopt; - } + getETLState() const override; std::uint32_t - lastCloseAgeSeconds() const override - { - // TODO - return 0; - } + lastCloseAgeSeconds() const override; private: // TODO: this better be std::expected std::optional - loadInitialLedgerIfNeeded() - { - if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); not rng.has_value()) { - LOG(log_.info()) << "Database is empty. Will download a ledger from the network."; + loadInitialLedgerIfNeeded(); - try { - LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; - if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) { - auto const seq = *mostRecentValidated; - LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... "; + void + startMonitor(uint32_t seq); - auto [ledger, timeDiff] = ::util::timed>([this, seq]() { - return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) { - // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar - data.edgeKeys = balancer_->loadInitialLedger(seq, *loader_); - - // TODO: this should be interruptible for graceful shutdown - return loader_->loadInitialLedger(data); - }); - }); - - LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff; - LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size(); - - if (ledger.has_value()) - return backend_->hardFetchLedgerRangeNoThrow(); - - LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop"; - } else { - LOG(log_.info()) << "The wait for the next validated ledger has been aborted. " - "Exiting monitor loop"; - } - } catch (std::runtime_error const& e) { - LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what(); - amendmentBlockHandler_->notifyAmendmentBlocked(); - } - } else { - LOG(log_.info()) << "Database already populated. Picking up from the tip of history"; - cacheLoader_->load(rng->maxSequence); - - return rng; - } - - return std::nullopt; - } + void + startLoading(uint32_t seq); }; + } // namespace etlng diff --git a/src/etlng/LedgerPublisherInterface.hpp b/src/etlng/LedgerPublisherInterface.hpp index fa609625..29ec3fce 100644 --- a/src/etlng/LedgerPublisherInterface.hpp +++ b/src/etlng/LedgerPublisherInterface.hpp @@ -37,13 +37,38 @@ struct LedgerPublisherInterface { * @param seq The sequence number of the ledger * @param maxAttempts The maximum number of attempts to publish the ledger; no limit if nullopt * @param attemptsDelay The delay between attempts + * @return Whether the ledger was found in the database and published */ - virtual void + virtual bool publish( uint32_t seq, std::optional maxAttempts, std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1} ) = 0; + + /** + * @brief Get last publish time as a time point + * + * @return A std::chrono::time_point representing the time of the last publish + */ + virtual std::chrono::time_point + getLastPublish() const = 0; + + /** + * @brief Get time passed since last ledger close, in seconds + * + * @return The number of seconds since the last ledger close as std::uint32_t + */ + virtual std::uint32_t + lastCloseAgeSeconds() const = 0; + + /** + * @brief Get time passed since last publish, in seconds + * + * @return The number of seconds since the last publish as std::uint32_t + */ + virtual std::uint32_t + lastPublishAgeSeconds() const = 0; }; } // namespace etlng diff --git a/src/etlng/MonitorInterface.hpp b/src/etlng/MonitorInterface.hpp index 8e4b745e..dfae9344 100644 --- a/src/etlng/MonitorInterface.hpp +++ b/src/etlng/MonitorInterface.hpp @@ -40,6 +40,13 @@ public: virtual ~MonitorInterface() = default; + /** + * @brief Allows the loading process to notify of a freshly committed ledger + * @param seq The ledger sequence loaded + */ + virtual void + notifyLedgerLoaded(uint32_t seq) = 0; + /** * @brief Allows clients to get notified when a new ledger becomes available in Clio's database * diff --git a/src/etlng/TaskManagerInterface.hpp b/src/etlng/TaskManagerInterface.hpp new file mode 100644 index 00000000..95651ffa --- /dev/null +++ b/src/etlng/TaskManagerInterface.hpp @@ -0,0 +1,46 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include + +namespace etlng { + +/** + * @brief An interface for the Task Manager + */ +struct TaskManagerInterface { + virtual ~TaskManagerInterface() = default; + + /** + * @brief Start the task manager with specified settings + * @param numExtractors The number of extraction tasks + */ + virtual void + run(size_t numExtractors) = 0; + + /** + * @brief Stop the task manager + */ + virtual void + stop() = 0; +}; + +} // namespace etlng diff --git a/src/etlng/TaskManagerProviderInterface.hpp b/src/etlng/TaskManagerProviderInterface.hpp new file mode 100644 index 00000000..2894170b --- /dev/null +++ b/src/etlng/TaskManagerProviderInterface.hpp @@ -0,0 +1,51 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etlng/MonitorInterface.hpp" +#include "etlng/TaskManagerInterface.hpp" +#include "util/async/AnyExecutionContext.hpp" + +#include +#include +#include +#include + +namespace etlng { + +/** + * @brief An interface for providing the Task Manager + */ +struct TaskManagerProviderInterface { + virtual ~TaskManagerProviderInterface() = default; + + /** + * @brief Make a task manager + * + * @param ctx The async context to associate the task manager instance with + * @param monitor The monitor to notify when ledger is loaded + * @param seq The sequence to start at + * @return A unique pointer to a TaskManager implementation + */ + virtual std::unique_ptr + make(util::async::AnyExecutionContext ctx, std::reference_wrapper monitor, uint32_t seq) = 0; +}; + +} // namespace etlng diff --git a/src/etlng/impl/AmendmentBlockHandler.cpp b/src/etlng/impl/AmendmentBlockHandler.cpp index 502e3181..efb9fbc4 100644 --- a/src/etlng/impl/AmendmentBlockHandler.cpp +++ b/src/etlng/impl/AmendmentBlockHandler.cpp @@ -36,7 +36,7 @@ AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMEN }; AmendmentBlockHandler::AmendmentBlockHandler( - util::async::AnyExecutionContext&& ctx, + util::async::AnyExecutionContext ctx, etl::SystemState& state, std::chrono::steady_clock::duration interval, ActionType action diff --git a/src/etlng/impl/AmendmentBlockHandler.hpp b/src/etlng/impl/AmendmentBlockHandler.hpp index 0bece8e7..6275d4ea 100644 --- a/src/etlng/impl/AmendmentBlockHandler.hpp +++ b/src/etlng/impl/AmendmentBlockHandler.hpp @@ -50,7 +50,7 @@ public: static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION; AmendmentBlockHandler( - util::async::AnyExecutionContext&& ctx, + util::async::AnyExecutionContext ctx, etl::SystemState& state, std::chrono::steady_clock::duration interval = std::chrono::seconds{1}, ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION diff --git a/src/etlng/impl/CacheUpdater.hpp b/src/etlng/impl/CacheUpdater.hpp new file mode 100644 index 00000000..7e2646ef --- /dev/null +++ b/src/etlng/impl/CacheUpdater.hpp @@ -0,0 +1,69 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/LedgerCacheInterface.hpp" +#include "data/Types.hpp" +#include "etlng/CacheUpdaterInterface.hpp" +#include "etlng/Models.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include + +namespace etlng::impl { + +class CacheUpdater : public CacheUpdaterInterface { + std::reference_wrapper cache_; + + util::Logger log_{"ETL"}; + +public: + CacheUpdater(data::LedgerCacheInterface& cache) : cache_{cache} + { + } + + void + update(model::LedgerData const& data) override + { + cache_.get().update(data.objects, data.seq); + } + + void + update(uint32_t seq, std::vector const& objs) override + { + cache_.get().update(objs, seq); + } + + void + update(uint32_t seq, std::vector const& objs) override + { + cache_.get().update(objs, seq); + } + + void + setFull() override + { + cache_.get().setFull(); + } +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/Extraction.hpp b/src/etlng/impl/Extraction.hpp index ea09642a..9a2df8ba 100644 --- a/src/etlng/impl/Extraction.hpp +++ b/src/etlng/impl/Extraction.hpp @@ -90,6 +90,13 @@ public: { } + Extractor(Extractor const&) = delete; + Extractor(Extractor&&) = delete; + Extractor& + operator=(Extractor const&) = delete; + Extractor& + operator=(Extractor&&) = delete; + [[nodiscard]] std::optional extractLedgerWithDiff(uint32_t seq) override; diff --git a/src/etlng/impl/LedgerPublisher.hpp b/src/etlng/impl/LedgerPublisher.hpp new file mode 100644 index 00000000..2c0d9ed7 --- /dev/null +++ b/src/etlng/impl/LedgerPublisher.hpp @@ -0,0 +1,286 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "data/DBHelpers.hpp" +#include "data/Types.hpp" +#include "etl/SystemState.hpp" +#include "etlng/LedgerPublisherInterface.hpp" +#include "etlng/impl/Loading.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/Assert.hpp" +#include "util/Mutex.hpp" +#include "util/log/Logger.hpp" +#include "util/prometheus/Counter.hpp" +#include "util/prometheus/Prometheus.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace etlng::impl { + +/** + * @brief Publishes ledgers in a synchronized fashion. + * + * If ETL is started far behind the network, ledgers will be written and published very rapidly. Monitoring processes + * will publish ledgers as they are written. However, to publish a ledger, the monitoring process needs to read all of + * the transactions for that ledger from the database. Reading the transactions from the database requires network + * calls, which can be slow. It is imperative however that the monitoring processes keep up with the writer, else the + * monitoring processes will not be able to detect if the writer failed. Therefore, publishing each ledger (which + * includes reading all of the transactions from the database) is done from the application wide asio io_service, and a + * strand is used to ensure ledgers are published in order. + */ +class LedgerPublisher : public etlng::LedgerPublisherInterface { + util::Logger log_{"ETL"}; + + boost::asio::strand publishStrand_; + + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::reference_wrapper state_; // shared state for ETL + + util::Mutex, std::shared_mutex> lastCloseTime_; + + std::reference_wrapper lastPublishSeconds_ = PrometheusService::counterInt( + "etl_last_publish_seconds", + {}, + "Seconds since epoch of the last published ledger" + ); + + util::Mutex, std::shared_mutex> lastPublishedSequence_; + +public: + /** + * @brief Create an instance of the publisher + */ + LedgerPublisher( + boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLServiceNg + std::shared_ptr backend, + std::shared_ptr subscriptions, + etl::SystemState const& state + ) + : publishStrand_{boost::asio::make_strand(ioc)} + , backend_{std::move(backend)} + , subscriptions_{std::move(subscriptions)} + , state_{std::cref(state)} + { + } + + /** + * @brief Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers + * stream. + * + * @param ledgerSequence the sequence of the ledger to publish + * @param maxAttempts the number of times to attempt to read the ledger from the database + * @param attemptsDelay the delay between attempts to read the ledger from the database + * @return Whether the ledger was found in the database and published + */ + bool + publish( + uint32_t ledgerSequence, + std::optional maxAttempts, + std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1} + ) override + { + LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence; + size_t numAttempts = 0; + while (not state_.get().isStopping) { + auto range = backend_->hardFetchLedgerRangeNoThrow(); + + if (!range || range->maxSequence < ledgerSequence) { + ++numAttempts; + LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence; + + // We try maxAttempts times to publish the ledger, waiting one second in between each attempt. + if (maxAttempts && numAttempts >= maxAttempts) { + LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts."; + return false; + } + std::this_thread::sleep_for(attemptsDelay); + continue; + } + + auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLedgerBySequence(ledgerSequence, yield); + }); + + ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence); + publish(*lgr); + + return true; + } + return false; + } + + /** + * @brief Publish the passed ledger asynchronously. + * + * All ledgers are published thru publishStrand_ which ensures that all publishes are performed in a serial fashion. + * + * @param lgrInfo the ledger to publish + */ + void + publish(ripple::LedgerHeader const& lgrInfo) + { + boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { + LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq); + + // TODO: This should probably not be part of publisher in the future + if (not state_.get().isWriting) + backend_->updateRange(lgrInfo.seq); // This can't be unit tested atm. + + setLastClose(lgrInfo.closeTime); + auto age = lastCloseAgeSeconds(); + + // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish + static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600; + if (age < kMAX_LEDGER_AGE_SECONDS) { + std::optional fees = data::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchFees(lgrInfo.seq, yield); + }); + ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq); + + auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield); + }); + + auto const ledgerRange = backend_->fetchLedgerRange(); + ASSERT(ledgerRange.has_value(), "Ledger range must exist"); + + auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence); + subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size()); + + // order with transaction index + std::ranges::sort(transactions, [](auto const& t1, auto const& t2) { + ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()}; + ripple::STObject const object1(iter1, ripple::sfMetadata); + ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()}; + ripple::STObject const object2(iter2, ripple::sfMetadata); + return object1.getFieldU32(ripple::sfTransactionIndex) < + object2.getFieldU32(ripple::sfTransactionIndex); + }); + + for (auto const& txAndMeta : transactions) + subscriptions_->pubTransaction(txAndMeta, lgrInfo); + + subscriptions_->pubBookChanges(lgrInfo, transactions); + + setLastPublishTime(); + LOG(log_.info()) << "Published ledger " << lgrInfo.seq; + } else { + LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq; + } + }); + + // we track latest publish-requested seq, not necessarily already published + setLastPublishedSequence(lgrInfo.seq); + } + + /** + * @brief Get time passed since last publish, in seconds + */ + std::uint32_t + lastPublishAgeSeconds() const override + { + return std::chrono::duration_cast(std::chrono::system_clock::now() - getLastPublish()) + .count(); + } + + /** + * @brief Get last publish time as a time point + */ + std::chrono::time_point + getLastPublish() const override + { + return std::chrono::time_point{std::chrono::seconds{lastPublishSeconds_.get().value() + }}; + } + + /** + * @brief Get time passed since last ledger close, in seconds + */ + std::uint32_t + lastCloseAgeSeconds() const override + { + auto closeTime = lastCloseTime_.lock()->time_since_epoch().count(); + auto now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) + .count(); + if (now < (kRIPPLE_EPOCH_START + closeTime)) + return 0; + return now - (kRIPPLE_EPOCH_START + closeTime); + } + + /** + * @brief Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been + * published to network + */ + std::optional + getLastPublishedSequence() const + { + return *lastPublishedSequence_.lock(); + } + +private: + void + setLastClose(std::chrono::time_point lastCloseTime) + { + auto closeTime = lastCloseTime_.lock(); + *closeTime = lastCloseTime; + } + + void + setLastPublishTime() + { + using namespace std::chrono; + auto const nowSeconds = duration_cast(system_clock::now().time_since_epoch()).count(); + lastPublishSeconds_.get().set(nowSeconds); + } + + void + setLastPublishedSequence(std::optional lastPublishedSequence) + { + auto lastPublishSeq = lastPublishedSequence_.lock(); + *lastPublishSeq = lastPublishedSequence; + } +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/Loading.cpp b/src/etlng/impl/Loading.cpp index e8608dbe..701fe359 100644 --- a/src/etlng/impl/Loading.cpp +++ b/src/etlng/impl/Loading.cpp @@ -20,7 +20,6 @@ #include "etlng/impl/Loading.hpp" #include "data/BackendInterface.hpp" -#include "etl/LedgerFetcherInterface.hpp" #include "etl/impl/LedgerLoader.hpp" #include "etlng/AmendmentBlockHandlerInterface.hpp" #include "etlng/Models.hpp" @@ -46,12 +45,10 @@ namespace etlng::impl { Loader::Loader( std::shared_ptr backend, - std::shared_ptr fetcher, std::shared_ptr registry, std::shared_ptr amendmentBlockHandler ) : backend_(std::move(backend)) - , fetcher_(std::move(fetcher)) , registry_(std::move(registry)) , amendmentBlockHandler_(std::move(amendmentBlockHandler)) { @@ -81,31 +78,44 @@ Loader::onInitialLoadGotMoreObjects( std::optional lastKey ) { - LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size(); - registry_->dispatchInitialObjects( - seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions? - ); + try { + LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size(); + registry_->dispatchInitialObjects( + seq, + data, + std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions? + ); + } catch (std::runtime_error const& e) { + LOG(log_.fatal()) << "Failed to load initial objects for " << seq << ": " << e.what(); + amendmentBlockHandler_->notifyAmendmentBlocked(); + } } std::optional Loader::loadInitialLedger(model::LedgerData const& data) { - // check that database is actually empty - auto rng = backend_->hardFetchLedgerRangeNoThrow(); - if (rng) { - ASSERT(false, "Database is not empty"); + try { + // check that database is actually empty + auto rng = backend_->hardFetchLedgerRangeNoThrow(); + if (rng) { + ASSERT(false, "Database is not empty"); + return std::nullopt; + } + + LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header); + + auto seconds = ::util::timed([this, &data]() { registry_->dispatchInitialData(data); }); + LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds."; + + backend_->finishWrites(data.seq); + LOG(log_.debug()) << "Loaded initial ledger"; + + return {data.header}; + } catch (std::runtime_error const& e) { + LOG(log_.fatal()) << "Failed to load initial ledger " << data.seq << ": " << e.what(); + amendmentBlockHandler_->notifyAmendmentBlocked(); return std::nullopt; } - - LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header); - - auto seconds = ::util::timed([this, &data]() { registry_->dispatchInitialData(data); }); - LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds."; - - backend_->finishWrites(data.seq); - LOG(log_.debug()) << "Loaded initial ledger"; - - return {data.header}; } } // namespace etlng::impl diff --git a/src/etlng/impl/Loading.hpp b/src/etlng/impl/Loading.hpp index caa677bc..39a1e150 100644 --- a/src/etlng/impl/Loading.hpp +++ b/src/etlng/impl/Loading.hpp @@ -49,7 +49,6 @@ namespace etlng::impl { class Loader : public LoaderInterface, public InitialLoadObserverInterface { std::shared_ptr backend_; - std::shared_ptr fetcher_; std::shared_ptr registry_; std::shared_ptr amendmentBlockHandler_; @@ -62,11 +61,17 @@ public: Loader( std::shared_ptr backend, - std::shared_ptr fetcher, std::shared_ptr registry, std::shared_ptr amendmentBlockHandler ); + Loader(Loader const&) = delete; + Loader(Loader&&) = delete; + Loader& + operator=(Loader const&) = delete; + Loader& + operator=(Loader&&) = delete; + void load(model::LedgerData const& data) override; diff --git a/src/etlng/impl/Monitor.cpp b/src/etlng/impl/Monitor.cpp index 8b8bedad..e55eb345 100644 --- a/src/etlng/impl/Monitor.cpp +++ b/src/etlng/impl/Monitor.cpp @@ -55,6 +55,15 @@ Monitor::~Monitor() stop(); } +// TODO: think about using signals perhaps? maybe combining with onNextSequence? +// also, how do we not double invoke or does it not matter +void +Monitor::notifyLedgerLoaded(uint32_t seq) +{ + LOG(log_.debug()) << "Loader notified about newly committed ledger " << seq; + repeatedTask_->invoke(); // force-invoke immediately +}; + void Monitor::run(std::chrono::steady_clock::duration repeatInterval) { diff --git a/src/etlng/impl/Monitor.hpp b/src/etlng/impl/Monitor.hpp index 9b5112cc..b8971bc8 100644 --- a/src/etlng/impl/Monitor.hpp +++ b/src/etlng/impl/Monitor.hpp @@ -60,6 +60,9 @@ public: ); ~Monitor() override; + void + notifyLedgerLoaded(uint32_t seq) override; + void run(std::chrono::steady_clock::duration repeatInterval) override; diff --git a/src/etlng/impl/Registry.hpp b/src/etlng/impl/Registry.hpp index 921b7081..c1a43ae8 100644 --- a/src/etlng/impl/Registry.hpp +++ b/src/etlng/impl/Registry.hpp @@ -19,12 +19,15 @@ #pragma once +#include "etl/SystemState.hpp" #include "etlng/Models.hpp" #include "etlng/RegistryInterface.hpp" #include +#include #include +#include #include #include #include @@ -88,6 +91,7 @@ concept SomeExtension = NoTwoOfKind and ContainsValidHook; template class Registry : public RegistryInterface { + std::reference_wrapper state_; std::tuple store_; static_assert( @@ -101,9 +105,9 @@ class Registry : public RegistryInterface { ); public: - explicit constexpr Registry(SomeExtension auto&&... exts) + explicit constexpr Registry(etl::SystemState const& state, SomeExtension auto&&... exts) requires(std::is_same_v, std::decay_t> and ...) - : store_(std::forward(exts)...) + : state_{state}, store_(std::forward(exts)...) { } @@ -121,9 +125,8 @@ public: // send entire batch of data at once { auto const expand = [&](auto& p) { - if constexpr (requires { p.onLedgerData(data); }) { - p.onLedgerData(data); - } + if constexpr (requires { p.onLedgerData(data); }) + executeIfAllowed(p, [&data](auto& p) { p.onLedgerData(data); }); }; std::apply([&expand](auto&&... xs) { (expand(xs), ...); }, store_); @@ -134,7 +137,7 @@ public: auto const expand = [&](P& p, model::Transaction const& t) { if constexpr (requires { p.onTransaction(data.seq, t); }) { if (std::decay_t

::spec::wants(t.type)) - p.onTransaction(data.seq, t); + executeIfAllowed(p, [&data, &t](auto& p) { p.onTransaction(data.seq, t); }); } }; @@ -146,9 +149,8 @@ public: // send per object path { auto const expand = [&](P&& p, model::Object const& o) { - if constexpr (requires { p.onObject(data.seq, o); }) { - p.onObject(data.seq, o); - } + if constexpr (requires { p.onObject(data.seq, o); }) + executeIfAllowed(p, [&data, &o](auto& p) { p.onObject(data.seq, o); }); }; for (auto const& obj : data.objects) { @@ -163,9 +165,8 @@ public: // send entire vector path { auto const expand = [&](auto&& p) { - if constexpr (requires { p.onInitialObjects(seq, data, lastKey); }) { - p.onInitialObjects(seq, data, lastKey); - } + if constexpr (requires { p.onInitialObjects(seq, data, lastKey); }) + executeIfAllowed(p, [seq, &data, &lastKey](auto& p) { p.onInitialObjects(seq, data, lastKey); }); }; std::apply([&expand](auto&&... xs) { (expand(xs), ...); }, store_); @@ -174,9 +175,8 @@ public: // send per object path { auto const expand = [&](P&& p, model::Object const& o) { - if constexpr (requires { p.onInitialObject(seq, o); }) { - p.onInitialObject(seq, o); - } + if constexpr (requires { p.onInitialObject(seq, o); }) + executeIfAllowed(p, [seq, &o](auto& p) { p.onInitialObject(seq, o); }); }; for (auto const& obj : data) { @@ -191,9 +191,8 @@ public: // send entire batch path { auto const expand = [&](auto&& p) { - if constexpr (requires { p.onInitialData(data); }) { - p.onInitialData(data); - } + if constexpr (requires { p.onInitialData(data); }) + executeIfAllowed(p, [&data](auto& p) { p.onInitialData(data); }); }; std::apply([&expand](auto&&... xs) { (expand(xs), ...); }, store_); @@ -204,7 +203,7 @@ public: auto const expand = [&](P&& p, model::Transaction const& tx) { if constexpr (requires { p.onInitialTransaction(data.seq, tx); }) { if (std::decay_t

::spec::wants(tx.type)) - p.onInitialTransaction(data.seq, tx); + executeIfAllowed(p, [&data, &tx](auto& p) { p.onInitialTransaction(data.seq, tx); }); } }; @@ -213,12 +212,25 @@ public: } } } + +private: + void + executeIfAllowed(auto& p, auto&& fn) + { + if constexpr (requires { p.allowInReadonly(); }) { + if (state_.get().isWriting or p.allowInReadonly()) + fn(p); + } else { + if (state_.get().isWriting) + fn(p); + } + } }; static auto -makeRegistry(auto&&... exts) +makeRegistry(etl::SystemState const& state, auto&&... exts) { - return std::make_unique...>>(std::forward(exts)...); + return std::make_unique...>>(state, std::forward(exts)...); } } // namespace etlng::impl diff --git a/src/etlng/impl/TaskManager.cpp b/src/etlng/impl/TaskManager.cpp index a5fc20ce..8f61dc44 100644 --- a/src/etlng/impl/TaskManager.cpp +++ b/src/etlng/impl/TaskManager.cpp @@ -22,15 +22,21 @@ #include "etlng/ExtractorInterface.hpp" #include "etlng/LoaderInterface.hpp" #include "etlng/Models.hpp" +#include "etlng/MonitorInterface.hpp" #include "etlng/SchedulerInterface.hpp" +#include "etlng/impl/Monitor.hpp" +#include "etlng/impl/TaskQueue.hpp" +#include "util/LedgerUtils.hpp" +#include "util/Profiler.hpp" #include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyOperation.hpp" -#include "util/async/AnyStrand.hpp" #include "util/log/Logger.hpp" #include #include +#include #include +#include #include #include #include @@ -39,12 +45,19 @@ namespace etlng::impl { TaskManager::TaskManager( - util::async::AnyExecutionContext&& ctx, - std::reference_wrapper scheduler, + util::async::AnyExecutionContext ctx, + std::shared_ptr scheduler, std::reference_wrapper extractor, - std::reference_wrapper loader + std::reference_wrapper loader, + std::reference_wrapper monitor, + uint32_t startSeq ) - : ctx_(std::move(ctx)), schedulers_(scheduler), extractor_(extractor), loader_(loader) + : ctx_(std::move(ctx)) + , schedulers_(std::move(scheduler)) + , extractor_(extractor) + , loader_(loader) + , monitor_(monitor) + , queue_({.startSeq = startSeq, .increment = 1u, .limit = kQUEUE_SIZE_LIMIT}) { } @@ -54,37 +67,32 @@ TaskManager::~TaskManager() } void -TaskManager::run(Settings settings) +TaskManager::run(std::size_t numExtractors) { - static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz; + LOG(log_.debug()) << "Starting task manager with " << numExtractors << " extractors...\n"; - auto schedulingStrand = ctx_.makeStrand(); - PriorityQueue queue(ctx_.makeStrand(), kQUEUE_SIZE_LIMIT); + stop(); + extractors_.clear(); + loaders_.clear(); - LOG(log_.debug()) << "Starting task manager...\n"; + extractors_.reserve(numExtractors); + for ([[maybe_unused]] auto _ : std::views::iota(0uz, numExtractors)) + extractors_.push_back(spawnExtractor(queue_)); - extractors_.reserve(settings.numExtractors); - for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numExtractors)) - extractors_.push_back(spawnExtractor(schedulingStrand, queue)); - - loaders_.reserve(settings.numLoaders); - for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numLoaders)) - loaders_.push_back(spawnLoader(queue)); - - wait(); - LOG(log_.debug()) << "All finished in task manager..\n"; + // Only one forward loader for now. Backfill to be added here later + loaders_.push_back(spawnLoader(queue_)); } util::async::AnyOperation -TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue) +TaskManager::spawnExtractor(TaskQueue& queue) { // TODO: these values may be extracted to config later and/or need to be fine-tuned on a realistic system static constexpr auto kDELAY_BETWEEN_ATTEMPTS = std::chrono::milliseconds{100u}; static constexpr auto kDELAY_BETWEEN_ENQUEUE_ATTEMPTS = std::chrono::milliseconds{1u}; - return strand.execute([this, &queue](auto stopRequested) { + return ctx_.execute([this, &queue](auto stopRequested) { while (not stopRequested) { - if (auto task = schedulers_.get().next(); task.has_value()) { + if (auto task = schedulers_->next(); task.has_value()) { if (auto maybeBatch = extractor_.get().extractLedgerWithDiff(task->seq); maybeBatch.has_value()) { LOG(log_.debug()) << "Adding data after extracting diff"; while (not queue.enqueue(*maybeBatch)) { @@ -107,13 +115,26 @@ TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue } util::async::AnyOperation -TaskManager::spawnLoader(PriorityQueue& queue) +TaskManager::spawnLoader(TaskQueue& queue) { + static constexpr auto kNANO_TO_SECOND = 1.0e9; + return ctx_.execute([this, &queue](auto stopRequested) { while (not stopRequested) { // TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not - if (auto data = queue.dequeue(); data.has_value()) - loader_.get().load(*data); + if (auto data = queue.dequeue(); data.has_value()) { + auto nanos = util::timed([this, data = *data] { loader_.get().load(data); }); + auto const seconds = nanos / kNANO_TO_SECOND; + auto const txnCount = data->transactions.size(); + auto const objCount = data->objects.size(); + + LOG(log_.info()) << "Wrote ledger " << data->seq << " with header: " << util::toString(data->header) + << ". txns[" << txnCount << "]; objs[" << objCount << "]; in " << seconds + << " seconds;" + << " tps[" << txnCount / seconds << "], ops[" << objCount / seconds << "]"; + + monitor_.get().notifyLedgerLoaded(data->seq); + } } }); } diff --git a/src/etlng/impl/TaskManager.hpp b/src/etlng/impl/TaskManager.hpp index 0047513b..48ab3f87 100644 --- a/src/etlng/impl/TaskManager.hpp +++ b/src/etlng/impl/TaskManager.hpp @@ -21,74 +21,70 @@ #include "etlng/ExtractorInterface.hpp" #include "etlng/LoaderInterface.hpp" -#include "etlng/Models.hpp" +#include "etlng/MonitorInterface.hpp" #include "etlng/SchedulerInterface.hpp" -#include "util/StrandedPriorityQueue.hpp" +#include "etlng/TaskManagerInterface.hpp" +#include "etlng/impl/Monitor.hpp" +#include "etlng/impl/TaskQueue.hpp" #include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyOperation.hpp" -#include "util/async/AnyStrand.hpp" #include "util/log/Logger.hpp" #include +#include #include +#include #include +#include #include namespace etlng::impl { -class TaskManager { +class TaskManager : public TaskManagerInterface { + static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz; + util::async::AnyExecutionContext ctx_; - std::reference_wrapper schedulers_; + std::shared_ptr schedulers_; std::reference_wrapper extractor_; std::reference_wrapper loader_; + std::reference_wrapper monitor_; + + impl::TaskQueue queue_; + std::atomic_uint32_t nextForwardSequence_; std::vector> extractors_; std::vector> loaders_; util::Logger log_{"ETL"}; - struct ReverseOrderComparator { - [[nodiscard]] bool - operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept - { - return lhs.seq > rhs.seq; - } - }; - public: - struct Settings { - size_t numExtractors; /**< number of extraction tasks */ - size_t numLoaders; /**< number of loading tasks */ - }; - - // reverse order loading is needed (i.e. start with oldest seq in forward fill buffer) - using PriorityQueue = util::StrandedPriorityQueue; - TaskManager( - util::async::AnyExecutionContext&& ctx, - std::reference_wrapper scheduler, + util::async::AnyExecutionContext ctx, + std::shared_ptr scheduler, std::reference_wrapper extractor, - std::reference_wrapper loader + std::reference_wrapper loader, + std::reference_wrapper monitor, + uint32_t startSeq ); - ~TaskManager(); + ~TaskManager() override; void - run(Settings settings); + run(std::size_t numExtractors) override; void - stop(); + stop() override; private: void wait(); [[nodiscard]] util::async::AnyOperation - spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue); + spawnExtractor(TaskQueue& queue); [[nodiscard]] util::async::AnyOperation - spawnLoader(PriorityQueue& queue); + spawnLoader(TaskQueue& queue); }; } // namespace etlng::impl diff --git a/src/etlng/impl/TaskManagerProvider.hpp b/src/etlng/impl/TaskManagerProvider.hpp new file mode 100644 index 00000000..f242d7ce --- /dev/null +++ b/src/etlng/impl/TaskManagerProvider.hpp @@ -0,0 +1,74 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/ExtractorInterface.hpp" +#include "etlng/LoaderInterface.hpp" +#include "etlng/MonitorInterface.hpp" +#include "etlng/TaskManagerInterface.hpp" +#include "etlng/TaskManagerProviderInterface.hpp" +#include "etlng/impl/Scheduling.hpp" +#include "etlng/impl/TaskManager.hpp" +#include "util/async/AnyExecutionContext.hpp" + +#include +#include +#include +#include + +namespace etlng::impl { + +/** + * @brief Implementation of the TaskManagerProvider interface + */ +class TaskManagerProvider : public TaskManagerProviderInterface { + std::reference_wrapper ledgers_; + std::shared_ptr extractor_; + std::shared_ptr loader_; + +public: + /** + * @brief Constructor + * + * @param ledgers Reference to ledgers + * @param extractor The extractor + * @param loader The loader + */ + TaskManagerProvider( + std::reference_wrapper ledgers, + std::shared_ptr extractor, + std::shared_ptr loader + ) + : ledgers_(ledgers), extractor_(std::move(extractor)), loader_(std::move(loader)) + { + } + + std::unique_ptr + make(util::async::AnyExecutionContext ctx, std::reference_wrapper monitor, uint32_t seq) override + { + auto scheduler = impl::makeScheduler(impl::ForwardScheduler{ledgers_, seq}); + // TODO: add impl::BackfillScheduler{seq - 1, seq - 1000}, + + return std::make_unique(std::move(ctx), std::move(scheduler), *extractor_, *loader_, monitor, seq); + } +}; + +} // namespace etlng::impl diff --git a/src/util/StrandedPriorityQueue.hpp b/src/etlng/impl/TaskQueue.hpp similarity index 54% rename from src/util/StrandedPriorityQueue.hpp rename to src/etlng/impl/TaskQueue.hpp index cd605e58..40c7ac75 100644 --- a/src/util/StrandedPriorityQueue.hpp +++ b/src/etlng/impl/TaskQueue.hpp @@ -19,36 +19,58 @@ #pragma once -#include "util/async/AnyStrand.hpp" +#include "etlng/Models.hpp" +#include "util/Mutex.hpp" #include -#include +#include #include #include -#include #include #include -namespace util { +namespace etlng::impl { + +struct ReverseOrderComparator { + [[nodiscard]] bool + operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept + { + return lhs.seq > rhs.seq; + } +}; /** - * @brief A wrapper for std::priority_queue that serialises operations using a strand + * @brief A wrapper for std::priority_queue that serialises operations using a mutex * @note This may be a candidate for future improvements if performance proves to be poor (e.g. use a lock free queue) */ -template > -class StrandedPriorityQueue { - util::async::AnyStrand strand_; +class TaskQueue { std::size_t limit_; - std::priority_queue, Compare> queue_; + std::uint32_t increment_; + + struct Data { + std::uint32_t expectedSequence; + std::priority_queue, ReverseOrderComparator> forwardLoadQueue; + + Data(std::uint32_t seq) : expectedSequence(seq) + { + } + }; + + util::Mutex data_; public: + struct Settings { + std::uint32_t startSeq = 0u; // sequence to start from (for dequeue) + std::uint32_t increment = 1u; // increment sequence by this value once dequeue was successful + std::optional limit = std::nullopt; + }; + /** - * @brief Construct a new priority queue on a strand - * @param strand The strand to use + * @brief Construct a new priority queue * @param limit The limit of items allowed simultaneously in the queue */ - StrandedPriorityQueue(util::async::AnyStrand&& strand, std::optional limit = std::nullopt) - : strand_(std::move(strand)), limit_(limit.value_or(0uz)) + explicit TaskQueue(Settings settings) + : limit_(settings.limit.value_or(0uz)), increment_(settings.increment), data_(settings.startSeq) { } @@ -56,25 +78,20 @@ public: * @brief Enqueue a new item onto the queue if space is available * @note This function blocks until the item is attempted to be added to the queue * - * @tparam I Type of the item to add * @param item The item to add * @return true if item added to the queue; false otherwise */ - template [[nodiscard]] bool - enqueue(I&& item) - requires std::is_same_v, T> + enqueue(model::LedgerData item) { - return strand_ - .execute([&item, this] { - if (limit_ == 0uz or queue_.size() < limit_) { - queue_.push(std::forward(item)); - return true; - } - return false; - }) - .get() - .value_or(false); // if some exception happens - failed to add + auto lock = data_.lock(); + + if (limit_ == 0uz or lock->forwardLoadQueue.size() < limit_) { + lock->forwardLoadQueue.push(std::move(item)); + return true; + } + + return false; } /** @@ -82,22 +99,19 @@ public: * @note This function blocks until the item is taken off the queue * @return An item if available; nullopt otherwise */ - [[nodiscard]] std::optional + [[nodiscard]] std::optional dequeue() { - return strand_ - .execute([this] -> std::optional { - std::optional out; + auto lock = data_.lock(); + std::optional out; - if (not queue_.empty()) { - out.emplace(queue_.top()); - queue_.pop(); - } + if (not lock->forwardLoadQueue.empty() && lock->forwardLoadQueue.top().seq == lock->expectedSequence) { + out.emplace(lock->forwardLoadQueue.top()); + lock->forwardLoadQueue.pop(); + lock->expectedSequence += increment_; + } - return out; - }) - .get() - .value_or(std::nullopt); + return out; } /** @@ -109,8 +123,8 @@ public: [[nodiscard]] bool empty() { - return strand_.execute([this] { return queue_.empty(); }).get().value(); + return data_.lock()->forwardLoadQueue.empty(); } }; -} // namespace util +} // namespace etlng::impl diff --git a/src/etlng/impl/ext/Cache.cpp b/src/etlng/impl/ext/Cache.cpp index d7c2e5f5..221f639a 100644 --- a/src/etlng/impl/ext/Cache.cpp +++ b/src/etlng/impl/ext/Cache.cpp @@ -19,24 +19,26 @@ #include "etlng/impl/ext/Cache.hpp" -#include "data/LedgerCacheInterface.hpp" +#include "etlng/CacheUpdaterInterface.hpp" #include "etlng/Models.hpp" #include "util/log/Logger.hpp" #include +#include #include +#include #include namespace etlng::impl { -CacheExt::CacheExt(data::LedgerCacheInterface& cache) : cache_(cache) +CacheExt::CacheExt(std::shared_ptr cacheUpdater) : cacheUpdater_(std::move(cacheUpdater)) { } void CacheExt::onLedgerData(model::LedgerData const& data) const { - cache_.get().update(data.objects, data.seq); + cacheUpdater_->update(data); LOG(log_.trace()) << "got data. objects cnt = " << data.objects.size(); } @@ -44,8 +46,8 @@ void CacheExt::onInitialData(model::LedgerData const& data) const { LOG(log_.trace()) << "got initial data. objects cnt = " << data.objects.size(); - cache_.get().update(data.objects, data.seq); - cache_.get().setFull(); + cacheUpdater_->update(data); + cacheUpdater_->setFull(); } void @@ -53,7 +55,7 @@ CacheExt::onInitialObjects(uint32_t seq, std::vector const& objs, const { LOG(log_.trace()) << "got initial objects cnt = " << objs.size(); - cache_.get().update(objs, seq); + cacheUpdater_->update(seq, objs); } } // namespace etlng::impl diff --git a/src/etlng/impl/ext/Cache.hpp b/src/etlng/impl/ext/Cache.hpp index fc9fbbeb..51bf5cd5 100644 --- a/src/etlng/impl/ext/Cache.hpp +++ b/src/etlng/impl/ext/Cache.hpp @@ -19,24 +19,25 @@ #pragma once -#include "data/LedgerCacheInterface.hpp" +#include "etlng/CacheUpdaterInterface.hpp" #include "etlng/Models.hpp" +#include "etlng/impl/CacheUpdater.hpp" #include "util/log/Logger.hpp" #include -#include +#include #include #include namespace etlng::impl { class CacheExt { - std::reference_wrapper cache_; + std::shared_ptr cacheUpdater_; util::Logger log_{"ETL"}; public: - CacheExt(data::LedgerCacheInterface& cache); + CacheExt(std::shared_ptr cacheUpdater); void onLedgerData(model::LedgerData const& data) const; @@ -46,6 +47,13 @@ public: void onInitialObjects(uint32_t seq, std::vector const& objs, [[maybe_unused]] std::string lastKey) const; + + // We want cache updates through ETL if we are a potential writer but currently are not writing to DB + static bool + allowInReadonly() + { + return true; + } }; } // namespace etlng::impl diff --git a/tests/common/util/MockLedgerPublisher.hpp b/tests/common/util/MockLedgerPublisher.hpp index 574fc967..26a24e7f 100644 --- a/tests/common/util/MockLedgerPublisher.hpp +++ b/tests/common/util/MockLedgerPublisher.hpp @@ -19,6 +19,8 @@ #pragma once +#include "etlng/LedgerPublisherInterface.hpp" + #include #include @@ -26,11 +28,11 @@ #include #include -struct MockLedgerPublisher { - MOCK_METHOD(bool, publish, (uint32_t, std::optional), ()); +struct MockLedgerPublisher : public etlng::LedgerPublisherInterface { + MOCK_METHOD(bool, publish, (uint32_t, std::optional, std::chrono::steady_clock::duration), (override)); MOCK_METHOD(void, publish, (ripple::LedgerHeader const&), ()); MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const)); - MOCK_METHOD(std::chrono::time_point, getLastPublish, (), (const)); - MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const)); + MOCK_METHOD(std::chrono::time_point, getLastPublish, (), (const, override)); + MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const, override)); MOCK_METHOD(std::optional, getLastPublishedSequence, (), (const)); }; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index b24429a9..33b2ffc0 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -40,9 +40,11 @@ target_sources( etl/TransformerTests.cpp # ETLng etlng/AmendmentBlockHandlerTests.cpp + etlng/ETLServiceTests.cpp etlng/ExtractionTests.cpp etlng/ForwardingSourceTests.cpp etlng/GrpcSourceTests.cpp + etlng/LedgerPublisherTests.cpp etlng/RegistryTests.cpp etlng/SchedulingTests.cpp etlng/TaskManagerTests.cpp @@ -145,7 +147,6 @@ target_sources( util/ConceptsTests.cpp util/CoroutineGroupTests.cpp util/LedgerUtilsTests.cpp - util/StrandedPriorityQueueTests.cpp util/StringHashTests.cpp # Prometheus support util/prometheus/BoolTests.cpp diff --git a/tests/unit/etlng/ETLServiceTests.cpp b/tests/unit/etlng/ETLServiceTests.cpp new file mode 100644 index 00000000..bcdf65f4 --- /dev/null +++ b/tests/unit/etlng/ETLServiceTests.cpp @@ -0,0 +1,341 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025 the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "data/Types.hpp" +#include "etl/ETLState.hpp" +#include "etl/SystemState.hpp" +#include "etlng/CacheLoaderInterface.hpp" +#include "etlng/CacheUpdaterInterface.hpp" +#include "etlng/ETLService.hpp" +#include "etlng/ExtractorInterface.hpp" +#include "etlng/InitialLoadObserverInterface.hpp" +#include "etlng/LoaderInterface.hpp" +#include "etlng/Models.hpp" +#include "etlng/MonitorInterface.hpp" +#include "etlng/TaskManagerInterface.hpp" +#include "etlng/TaskManagerProviderInterface.hpp" +#include "util/BinaryTestObject.hpp" +#include "util/MockAssert.hpp" +#include "util/MockBackendTestFixture.hpp" +#include "util/MockLedgerPublisher.hpp" +#include "util/MockLoadBalancer.hpp" +#include "util/MockNetworkValidatedLedgers.hpp" +#include "util/MockPrometheus.hpp" +#include "util/MockSubscriptionManager.hpp" +#include "util/TestObject.hpp" +#include "util/async/AnyExecutionContext.hpp" +#include "util/async/context/BasicExecutionContext.hpp" +#include "util/async/context/SyncExecutionContext.hpp" +#include "util/async/impl/ErasedOperation.hpp" +#include "util/config/ConfigDefinition.hpp" +#include "util/config/ConfigValue.hpp" +#include "util/config/Types.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace util::config; + +namespace { +constinit auto const kSEQ = 100; +constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652"; + +struct MockMonitor : public etlng::MonitorInterface { + MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override)); + MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override)); + MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override)); + MOCK_METHOD(void, stop, (), (override)); +}; + +struct MockExtractor : etlng::ExtractorInterface { + MOCK_METHOD(std::optional, extractLedgerWithDiff, (uint32_t), (override)); + MOCK_METHOD(std::optional, extractLedgerOnly, (uint32_t), (override)); +}; + +struct MockLoader : etlng::LoaderInterface { + MOCK_METHOD(void, load, (etlng::model::LedgerData const&), (override)); + MOCK_METHOD(std::optional, loadInitialLedger, (etlng::model::LedgerData const&), (override)); +}; + +struct MockCacheLoader : etlng::CacheLoaderInterface { + MOCK_METHOD(void, load, (uint32_t), (override)); + MOCK_METHOD(void, stop, (), (noexcept, override)); + MOCK_METHOD(void, wait, (), (noexcept, override)); +}; + +struct MockCacheUpdater : etlng::CacheUpdaterInterface { + MOCK_METHOD(void, update, (etlng::model::LedgerData const&), (override)); + MOCK_METHOD(void, update, (uint32_t, std::vector const&), (override)); + MOCK_METHOD(void, update, (uint32_t, std::vector const&), (override)); + MOCK_METHOD(void, setFull, (), (override)); +}; + +struct MockInitialLoadObserver : etlng::InitialLoadObserverInterface { + MOCK_METHOD( + void, + onInitialLoadGotMoreObjects, + (uint32_t, std::vector const&, std::optional), + (override) + ); +}; + +struct MockTaskManager : etlng::TaskManagerInterface { + MOCK_METHOD(void, run, (std::size_t), (override)); + MOCK_METHOD(void, stop, (), (override)); +}; + +struct MockTaskManagerProvider : etlng::TaskManagerProviderInterface { + MOCK_METHOD( + std::unique_ptr, + make, + (util::async::AnyExecutionContext, std::reference_wrapper, uint32_t), + (override) + ); +}; + +auto +createTestData(uint32_t seq) +{ + auto const header = createLedgerHeader(kLEDGER_HASH, seq); + return etlng::model::LedgerData{ + .transactions = {}, + .objects = {util::createObject(), util::createObject(), util::createObject()}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = seq + }; +} +} // namespace + +struct ETLServiceTests : util::prometheus::WithPrometheus, MockBackendTest { + using SameThreadTestContext = util::async::BasicExecutionContext< + util::async::impl::SameThreadContext, + util::async::impl::BasicStopSource, + util::async::impl::SyncDispatchStrategy, + util::async::impl::SystemContextProvider, + util::async::impl::NoErrorHandler>; // This will allow ASSERTs turned exceptions to propagate + +protected: + SameThreadTestContext ctx_; + util::config::ClioConfigDefinition config_{ + {"extractor_threads", ConfigValue{ConfigType::Integer}.defaultValue(4)}, + {"io_threads", ConfigValue{ConfigType::Integer}.defaultValue(2)}, + {"cache.num_diffs", ConfigValue{ConfigType::Integer}.defaultValue(32)}, + {"cache.num_markers", ConfigValue{ConfigType::Integer}.defaultValue(48)}, + {"cache.num_cursors_from_diff", ConfigValue{ConfigType::Integer}.defaultValue(0)}, + {"cache.num_cursors_from_account", ConfigValue{ConfigType::Integer}.defaultValue(0)}, + {"cache.page_fetch_size", ConfigValue{ConfigType::Integer}.defaultValue(512)}, + {"cache.load", ConfigValue{ConfigType::String}.defaultValue("async")} + }; + StrictMockSubscriptionManagerSharedPtr subscriptions_; + std::shared_ptr> balancer_ = + std::make_shared>(); + std::shared_ptr> ledgers_ = + std::make_shared>(); + std::shared_ptr> publisher_ = + std::make_shared>(); + std::shared_ptr> cacheLoader_ = + std::make_shared>(); + std::shared_ptr> cacheUpdater_ = + std::make_shared>(); + std::shared_ptr> extractor_ = std::make_shared>(); + std::shared_ptr> loader_ = std::make_shared>(); + std::shared_ptr> initialLoadObserver_ = + std::make_shared>(); + std::shared_ptr> taskManagerProvider_ = + std::make_shared>(); + std::shared_ptr systemState_ = std::make_shared(); + + etlng::ETLService service_{ + ctx_, + config_, + backend_, + balancer_, + ledgers_, + publisher_, + cacheLoader_, + cacheUpdater_, + extractor_, + loader_, + initialLoadObserver_, + taskManagerProvider_, + systemState_ + }; +}; + +TEST_F(ETLServiceTests, GetInfoWithoutLastPublish) +{ + EXPECT_CALL(*balancer_, toJson()).WillOnce(testing::Return(boost::json::parse(R"json([{"test": "value"}])json"))); + EXPECT_CALL(*publisher_, getLastPublish()).WillOnce(testing::Return(std::chrono::system_clock::time_point{})); + EXPECT_CALL(*publisher_, lastPublishAgeSeconds()).WillRepeatedly(testing::Return(0)); + + auto result = service_.getInfo(); + auto expectedResult = boost::json::parse(R"json({ + "etl_sources": [{"test": "value"}], + "is_writer": 0, + "read_only": 0 + })json"); + + EXPECT_TRUE(result == expectedResult); + EXPECT_FALSE(result.contains("last_publish_age_seconds")); +} + +TEST_F(ETLServiceTests, GetInfoWithLastPublish) +{ + EXPECT_CALL(*balancer_, toJson()).WillOnce(testing::Return(boost::json::parse(R"json([{"test": "value"}])json"))); + EXPECT_CALL(*publisher_, getLastPublish()).WillOnce(testing::Return(std::chrono::system_clock::now())); + EXPECT_CALL(*publisher_, lastPublishAgeSeconds()).WillOnce(testing::Return(42)); + + auto result = service_.getInfo(); + auto expectedResult = boost::json::parse(R"json({ + "etl_sources": [{"test": "value"}], + "is_writer": 0, + "read_only": 0, + "last_publish_age_seconds": "42" + })json"); + + EXPECT_TRUE(result == expectedResult); +} + +TEST_F(ETLServiceTests, IsAmendmentBlocked) +{ + EXPECT_FALSE(service_.isAmendmentBlocked()); +} + +TEST_F(ETLServiceTests, IsCorruptionDetected) +{ + EXPECT_FALSE(service_.isCorruptionDetected()); +} + +TEST_F(ETLServiceTests, GetETLState) +{ + EXPECT_CALL(*balancer_, getETLState()).WillOnce(testing::Return(etl::ETLState{})); + + auto result = service_.getETLState(); + EXPECT_TRUE(result.has_value()); +} + +TEST_F(ETLServiceTests, LastCloseAgeSeconds) +{ + EXPECT_CALL(*publisher_, lastCloseAgeSeconds()).WillOnce(testing::Return(10)); + + auto result = service_.lastCloseAgeSeconds(); + EXPECT_GE(result, 0); +} + +TEST_F(ETLServiceTests, RunWithEmptyDatabase) +{ + auto mockTaskManager = std::make_unique>(); + auto ledgerData = createTestData(kSEQ); + + testing::Sequence s; + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).InSequence(s).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); + EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(testing::Return(ledgerData)); + EXPECT_CALL(*balancer_, loadInitialLedger(kSEQ, testing::_, testing::_)) + .WillOnce(testing::Return(std::vector{})); + EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).WillOnce(testing::Return(ripple::LedgerHeader{})); + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)) + .InSequence(s) + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*mockTaskManager, run(testing::_)); + EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1)) + .WillOnce(testing::Return(std::unique_ptr(mockTaskManager.release()))); + + service_.run(); +} + +TEST_F(ETLServiceTests, RunWithPopulatedDatabase) +{ + auto mockTaskManager = std::make_unique>(); + + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)) + .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + EXPECT_CALL(*mockTaskManager, run(testing::_)); + EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1)) + .WillOnce(testing::Return(std::unique_ptr(mockTaskManager.release()))); + + service_.run(); +} + +TEST_F(ETLServiceTests, WaitForValidatedLedgerIsAborted) +{ + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*ledgers_, getMostRecent()).Times(2).WillRepeatedly(testing::Return(std::nullopt)); + + // No other calls should happen because we exit early + EXPECT_CALL(*extractor_, extractLedgerOnly(testing::_)).Times(0); + EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0); + EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0); + EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0); + + service_.run(); +} + +struct ETLServiceAssertTests : common::util::WithMockAssert, ETLServiceTests {}; + +TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger) +{ + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); + EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(testing::Return(std::nullopt)); + + // These calls should not happen because loading the initial ledger fails + EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0); + EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0); + EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0); + + EXPECT_CLIO_ASSERT_FAIL({ service_.run(); }); +} + +TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadInitialLedger) +{ + testing::Sequence s; + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(s).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(s).WillOnce(testing::Return(kSEQ)); + + // No other calls should happen because we exit early + EXPECT_CALL(*extractor_, extractLedgerOnly(testing::_)).Times(0); + EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0); + EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0); + EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0); + + EXPECT_CLIO_ASSERT_FAIL({ service_.run(); }); +} diff --git a/tests/unit/etlng/LedgerPublisherTests.cpp b/tests/unit/etlng/LedgerPublisherTests.cpp new file mode 100644 index 00000000..fdd32e36 --- /dev/null +++ b/tests/unit/etlng/LedgerPublisherTests.cpp @@ -0,0 +1,357 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "data/DBHelpers.hpp" +#include "data/Types.hpp" +#include "etl/SystemState.hpp" +#include "etlng/impl/LedgerPublisher.hpp" +#include "util/AsioContextTestFixture.hpp" +#include "util/MockBackendTestFixture.hpp" +#include "util/MockPrometheus.hpp" +#include "util/MockSubscriptionManager.hpp" +#include "util/TestObject.hpp" +#include "util/config/ConfigDefinition.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace testing; +using namespace etlng; +using namespace data; +using namespace std::chrono; + +namespace { + +constexpr auto kACCOUNT = "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn"; +constexpr auto kACCOUNT2 = "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun"; +constexpr auto kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652"; +constexpr auto kSEQ = 30; +constexpr auto kAGE = 800; +constexpr auto kAMOUNT = 100; +constexpr auto kFEE = 3; +constexpr auto kFINAL_BALANCE = 110; +constexpr auto kFINAL_BALANCE2 = 30; + +MATCHER_P(ledgerHeaderMatcher, expectedHeader, "Headers match") +{ + return arg.seq == expectedHeader.seq && arg.hash == expectedHeader.hash && + arg.closeTime == expectedHeader.closeTime; +} + +} // namespace + +struct ETLLedgerPublisherNgTest : util::prometheus::WithPrometheus, MockBackendTestStrict, SyncAsioContextTest { + util::config::ClioConfigDefinition cfg{{}}; + StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr; +}; + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheDisabled) +{ + etl::SystemState dummyState; + dummyState.isWriting = false; + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + publisher.publish(dummyLedgerHeader); + EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ, _)).Times(0); + + // setLastPublishedSequence not in strand, should verify before run + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx_.run(); + EXPECT_TRUE(backend_->fetchLedgerRange()); + EXPECT_EQ(backend_->fetchLedgerRange().value().minSequence, kSEQ); + EXPECT_EQ(backend_->fetchLedgerRange().value().maxSequence, kSEQ); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheEnabled) +{ + etl::SystemState dummyState; + dummyState.isWriting = false; + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + publisher.publish(dummyLedgerHeader); + + // setLastPublishedSequence not in strand, should verify before run + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx_.run(); + EXPECT_TRUE(backend_->fetchLedgerRange()); + EXPECT_EQ(backend_->fetchLedgerRange().value().minSequence, kSEQ); + EXPECT_EQ(backend_->fetchLedgerRange().value().maxSequence, kSEQ); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingTrue) +{ + etl::SystemState dummyState; + dummyState.isWriting = true; + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + publisher.publish(dummyLedgerHeader); + + // setLastPublishedSequence not in strand, should verify before run + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx_.run(); + EXPECT_FALSE(backend_->fetchLedgerRange()); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange) +{ + etl::SystemState dummyState; + dummyState.isWriting = true; + + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + backend_->setRange(kSEQ - 1, kSEQ); + + publisher.publish(dummyLedgerHeader); + + // mock fetch fee + EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) + .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); + + TransactionAndMetadata t1; + t1.transaction = + createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData(); + t1.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2) + .getSerializer() + .peekData(); + t1.ledgerSequence = kSEQ; + + // mock fetch transactions + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector{t1})); + + // setLastPublishedSequence not in strand, should verify before run + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); + // mock 1 transaction + EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); + + ctx_.run(); + // last publish time should be set + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderCloseTimeGreaterThanNow) +{ + etl::SystemState dummyState; + dummyState.isWriting = true; + + ripple::LedgerHeader dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); + auto const nowPlus10 = system_clock::now() + seconds(10); + auto const closeTime = duration_cast(nowPlus10.time_since_epoch()).count() - kRIPPLE_EPOCH_START; + dummyLedgerHeader.closeTime = ripple::NetClock::time_point{seconds{closeTime}}; + + backend_->setRange(kSEQ - 1, kSEQ); + + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + publisher.publish(dummyLedgerHeader); + + // mock fetch fee + EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) + .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); + + TransactionAndMetadata t1; + t1.transaction = + createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData(); + t1.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2) + .getSerializer() + .peekData(); + t1.ledgerSequence = kSEQ; + + // mock fetch transactions + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) + .WillOnce(Return(std::vector{t1})); + + // setLastPublishedSequence not in strand, should verify before run + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); + // mock 1 transaction + EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); + + ctx_.run(); + // last publish time should be set + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsTrue) +{ + etl::SystemState dummyState; + dummyState.isStopping = true; + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + EXPECT_FALSE(publisher.publish(kSEQ, {})); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqMaxAttempt) +{ + etl::SystemState dummyState; + dummyState.isStopping = false; + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + + static constexpr auto kMAX_ATTEMPT = 2; + + LedgerRange const range{.minSequence = kSEQ - 1, .maxSequence = kSEQ - 1}; + EXPECT_CALL(*backend_, hardFetchLedgerRange).Times(kMAX_ATTEMPT).WillRepeatedly(Return(range)); + + EXPECT_FALSE(publisher.publish(kSEQ, kMAX_ATTEMPT, std::chrono::milliseconds{1})); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsFalse) +{ + etl::SystemState dummyState; + dummyState.isStopping = false; + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + + LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ}; + EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range)); + + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); + EXPECT_CALL(*backend_, fetchLedgerBySequence(kSEQ, _)).WillOnce(Return(dummyLedgerHeader)); + + EXPECT_TRUE(publisher.publish(kSEQ, {})); + ctx_.run(); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishMultipleTxInOrder) +{ + etl::SystemState dummyState; + dummyState.isWriting = true; + + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + backend_->setRange(kSEQ - 1, kSEQ); + + publisher.publish(dummyLedgerHeader); + + // mock fetch fee + EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) + .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); + + // t1 index > t2 index + TransactionAndMetadata t1; + t1.transaction = + createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData(); + t1.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2, 2) + .getSerializer() + .peekData(); + t1.ledgerSequence = kSEQ; + t1.date = 1; + TransactionAndMetadata t2; + t2.transaction = + createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData(); + t2.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2, 1) + .getSerializer() + .peekData(); + t2.ledgerSequence = kSEQ; + t2.date = 2; + + // mock fetch transactions + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) + .WillOnce(Return(std::vector{t1, t2})); + + // setLastPublishedSequence not in strand, should verify before run + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2)); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); + // should call pubTransaction t2 first (greater tx index) + Sequence const s; + EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s); + + ctx_.run(); + // last publish time should be set + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishVeryOldLedgerShouldSkip) +{ + etl::SystemState dummyState; + dummyState.isWriting = true; + + // Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600) + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800); + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + backend_->setRange(kSEQ - 1, kSEQ); + + publisher.publish(dummyLedgerHeader); + + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0); + + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx_.run(); +} + +TEST_F(ETLLedgerPublisherNgTest, PublishMultipleLedgersInQuickSuccession) +{ + etl::SystemState dummyState; + dummyState.isWriting = true; + + auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); + auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0); + impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + backend_->setRange(kSEQ - 1, kSEQ + 1); + + // Publish two ledgers in quick succession + publisher.publish(dummyLedgerHeader1); + publisher.publish(dummyLedgerHeader2); + + EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) + .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); + EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ + 1, _)) + .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); + + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) + .WillOnce(Return(std::vector{})); + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ + 1, _)) + .WillOnce(Return(std::vector{})); + + Sequence const s; + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(dummyLedgerHeader1), _, _, _)).InSequence(s); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(dummyLedgerHeader1), _)).InSequence(s); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(dummyLedgerHeader2), _, _, _)).InSequence(s); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(dummyLedgerHeader2), _)).InSequence(s); + + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ + 1); + + ctx_.run(); +} diff --git a/tests/unit/etlng/LoadingTests.cpp b/tests/unit/etlng/LoadingTests.cpp index 1680d39d..512c8968 100644 --- a/tests/unit/etlng/LoadingTests.cpp +++ b/tests/unit/etlng/LoadingTests.cpp @@ -64,13 +64,10 @@ struct MockLoadObserver : etlng::InitialLoadObserverInterface { ); }; -struct LoadingTests : util::prometheus::WithPrometheus, - MockBackendTest, - MockLedgerFetcherTest, - MockAmendmentBlockHandlerTest { +struct LoadingTests : util::prometheus::WithPrometheus, MockBackendTest, MockAmendmentBlockHandlerTest { protected: std::shared_ptr mockRegistryPtr_ = std::make_shared(); - Loader loader_{backend_, mockLedgerFetcherPtr_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_}; + Loader loader_{backend_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_}; }; struct LoadingAssertTest : common::util::WithMockAssert, LoadingTests {}; @@ -148,6 +145,33 @@ TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsWithoutKey) loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey); } +TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsFailure) +{ + auto const data = createTestData(); + auto const lastKey = std::optional{}; + + EXPECT_CALL(*mockRegistryPtr_, dispatchInitialObjects(kSEQ, data.objects, std::string{})) + .WillOnce([](auto, auto, auto) { throw std::runtime_error("some error"); }); + EXPECT_CALL(*mockAmendmentBlockHandlerPtr_, notifyAmendmentBlocked()); + + loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey); +} + +TEST_F(LoadingTests, LoadInitialLedgerFailure) +{ + auto const data = createTestData(); + + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*backend_, doFinishWrites()).Times(0); + EXPECT_CALL(*mockRegistryPtr_, dispatchInitialData(data)).WillOnce([](auto const&) { + throw std::runtime_error("some error"); + }); + EXPECT_CALL(*mockAmendmentBlockHandlerPtr_, notifyAmendmentBlocked()); + + auto const res = loader_.loadInitialLedger(data); + EXPECT_FALSE(res.has_value()); +} + TEST_F(LoadingAssertTest, LoadInitialLedgerHasDataInDB) { auto const data = createTestData(); diff --git a/tests/unit/etlng/MonitorTests.cpp b/tests/unit/etlng/MonitorTests.cpp index 5c07216e..e719a171 100644 --- a/tests/unit/etlng/MonitorTests.cpp +++ b/tests/unit/etlng/MonitorTests.cpp @@ -66,7 +66,7 @@ TEST_F(MonitorTests, ConsumesAndNotifiesForAllOutstandingSequencesAtOnce) }); auto subscription = monitor_.subscribe(actionMock_.AsStdFunction()); - monitor_.run(std::chrono::milliseconds{1}); + monitor_.run(std::chrono::milliseconds{10}); unblock.acquire(); } @@ -111,3 +111,18 @@ TEST_F(MonitorTests, NotifiesWhenForcedByNewSequenceAvailableFromNetwork) pusher(kSTART_SEQ); // pretend network validated a new ledger unblock.acquire(); } + +TEST_F(MonitorTests, NotifiesWhenForcedByLedgerLoaded) +{ + LedgerRange const range(kSTART_SEQ, kSTART_SEQ); + std::binary_semaphore unblock(0); + + EXPECT_CALL(*ledgers_, subscribe(testing::_)); + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(range)); + EXPECT_CALL(actionMock_, Call).WillOnce([&] { unblock.release(); }); + + auto subscription = monitor_.subscribe(actionMock_.AsStdFunction()); + monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec + monitor_.notifyLedgerLoaded(kSTART_SEQ); // notify about newly committed ledger + unblock.acquire(); +} diff --git a/tests/unit/etlng/RegistryTests.cpp b/tests/unit/etlng/RegistryTests.cpp index 3e01ca7a..5e40a6c2 100644 --- a/tests/unit/etlng/RegistryTests.cpp +++ b/tests/unit/etlng/RegistryTests.cpp @@ -17,16 +17,21 @@ */ //============================================================================== +#include "etl/SystemState.hpp" #include "etlng/Models.hpp" +#include "etlng/MonitorInterface.hpp" #include "etlng/impl/Registry.hpp" #include "util/BinaryTestObject.hpp" #include "util/LoggerFixtures.hpp" +#include "util/MockPrometheus.hpp" #include "util/TestObject.hpp" +#include #include #include #include +#include #include #include #include @@ -176,7 +181,88 @@ struct MockExtNftOffer { MOCK_METHOD(void, onInitialTransaction, (uint32_t, etlng::model::Transaction const&), (const)); }; -struct RegistryTest : NoLoggerFixture {}; +// Mock extensions with allowInReadonly +struct MockExtLedgerDataReadonly { + MOCK_METHOD(void, onLedgerData, (etlng::model::LedgerData const&), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct MockExtInitialDataReadonly { + MOCK_METHOD(void, onInitialData, (etlng::model::LedgerData const&), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct MockExtOnObjectReadonly { + MOCK_METHOD(void, onObject, (uint32_t, etlng::model::Object const&), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct MockExtTransactionNftBurnReadonly { + using spec = etlng::model::Spec; + MOCK_METHOD(void, onTransaction, (uint32_t, etlng::model::Transaction const&), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct MockExtInitialObjectReadonly { + MOCK_METHOD(void, onInitialObject, (uint32_t, etlng::model::Object const&), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct MockExtInitialObjectsReadonly { + MOCK_METHOD(void, onInitialObjects, (uint32_t, std::vector const&, std::string), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct MockExtNftBurnReadonly { + using spec = etlng::model::Spec; + MOCK_METHOD(void, onInitialTransaction, (uint32_t, etlng::model::Transaction const&), (const)); + + static bool + allowInReadonly() + { + return true; + } +}; + +struct RegistryTest : NoLoggerFixture, util::prometheus::WithPrometheus { + RegistryTest() + { + state_.isWriting = true; + } + +protected: + etl::SystemState state_{}; +}; } // namespace @@ -195,7 +281,7 @@ TEST_F(RegistryTest, FilteringOfTxWorksCorrectlyForInitialTransaction) EXPECT_CALL(extOffer, onInitialTransaction(testing::_, testing::_)); // 1 create offer auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); - auto reg = Registry(extBurn, extOffer); + auto reg = Registry(state_, extBurn, extOffer); reg.dispatchInitialData(etlng::model::LedgerData{ .transactions = transactions, .objects = {}, @@ -222,7 +308,7 @@ TEST_F(RegistryTest, FilteringOfTxWorksCorrectlyForTransaction) EXPECT_CALL(extOffer, onTransaction(testing::_, testing::_)); // 1 create offer auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); - auto reg = Registry(extBurn, extOffer); + auto reg = Registry(state_, extBurn, extOffer); reg.dispatch(etlng::model::LedgerData{ .transactions = std::move(transactions), .objects = {}, @@ -242,7 +328,7 @@ TEST_F(RegistryTest, InitialObjectsEmpty) EXPECT_CALL(extObj, onInitialObject(testing::_, testing::_)).Times(0); // 0 empty objects sent EXPECT_CALL(extObjs, onInitialObjects(testing::_, testing::_, testing::_)); // 1 vector passed as is - auto reg = Registry(extObj, extObjs); + auto reg = Registry(state_, extObj, extObjs); reg.dispatchInitialObjects(kSEQ, {}, {}); } @@ -254,7 +340,7 @@ TEST_F(RegistryTest, InitialObjectsDispatched) EXPECT_CALL(extObj, onInitialObject(testing::_, testing::_)).Times(3); // 3 objects sent EXPECT_CALL(extObjs, onInitialObjects(testing::_, testing::_, testing::_)); // 1 vector passed as is - auto reg = Registry(extObj, extObjs); + auto reg = Registry(state_, extObj, extObjs); reg.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {}); } @@ -265,7 +351,7 @@ TEST_F(RegistryTest, ObjectsDispatched) EXPECT_CALL(extObj, onObject(testing::_, testing::_)).Times(3); // 3 objects sent auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); - auto reg = Registry(extObj); + auto reg = Registry(state_, extObj); reg.dispatch(etlng::model::LedgerData{ .transactions = {}, .objects = {util::createObject(), util::createObject(), util::createObject()}, @@ -290,7 +376,7 @@ TEST_F(RegistryTest, OnLedgerDataForBatch) EXPECT_CALL(ext, onLedgerData(testing::_)); // 1 batch (dispatch call) auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); - auto reg = Registry(ext); + auto reg = Registry(state_, ext); reg.dispatch(etlng::model::LedgerData{ .transactions = std::move(transactions), .objects = {}, @@ -311,7 +397,7 @@ TEST_F(RegistryTest, InitialObjectsCorrectOrderOfHookCalls) EXPECT_CALL(extObjs, onInitialObjects); EXPECT_CALL(extObj, onInitialObject).Times(3); - auto reg = Registry(extObj, extObjs); + auto reg = Registry(state_, extObj, extObjs); reg.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {}); } @@ -331,7 +417,7 @@ TEST_F(RegistryTest, InitialDataCorrectOrderOfHookCalls) EXPECT_CALL(extInitialTransaction, onInitialTransaction).Times(2); auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); - auto reg = Registry(extInitialTransaction, extInitialData); + auto reg = Registry(state_, extInitialTransaction, extInitialData); reg.dispatchInitialData(etlng::model::LedgerData{ .transactions = std::move(transactions), .objects = {}, @@ -368,7 +454,7 @@ TEST_F(RegistryTest, LedgerDataCorrectOrderOfHookCalls) auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); auto reg = Registry( - extOnObject, extOnTransaction, extLedgerData + state_, extOnObject, extOnTransaction, extLedgerData ); reg.dispatch(etlng::model::LedgerData{ .transactions = std::move(transactions), @@ -380,3 +466,362 @@ TEST_F(RegistryTest, LedgerDataCorrectOrderOfHookCalls) .seq = kSEQ }); } + +TEST_F(RegistryTest, ReadonlyModeLedgerDataAllowed) +{ + auto transactions = std::vector{ + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + }; + + auto ext = MockExtLedgerDataReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(ext, onLedgerData(testing::_)); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, ext); + reg.dispatch(etlng::model::LedgerData{ + .transactions = std::move(transactions), + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, ReadonlyModeTransactionAllowed) +{ + auto transactions = std::vector{ + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + }; + + auto extTx = MockExtTransactionNftBurnReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(extTx, onTransaction(testing::_, testing::_)).Times(2); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, extTx); + reg.dispatch(etlng::model::LedgerData{ + .transactions = std::move(transactions), + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, ReadonlyModeObjectAllowed) +{ + auto objects = std::vector{ + util::createObject(), + util::createObject(), + util::createObject(), + }; + + auto extObj = MockExtOnObjectReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(extObj, onObject(testing::_, testing::_)).Times(3); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, extObj); + reg.dispatch(etlng::model::LedgerData{ + .transactions = {}, + .objects = std::move(objects), + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, ReadonlyModeInitialDataAllowed) +{ + auto transactions = std::vector{ + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + }; + + auto extInitialData = MockExtInitialDataReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(extInitialData, onInitialData(testing::_)); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, extInitialData); + reg.dispatchInitialData(etlng::model::LedgerData{ + .transactions = std::move(transactions), + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, ReadonlyModeInitialTransactionAllowed) +{ + auto transactions = std::vector{ + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + }; + + auto extTx = MockExtNftBurnReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(extTx, onInitialTransaction(testing::_, testing::_)).Times(2); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, extTx); + reg.dispatchInitialData(etlng::model::LedgerData{ + .transactions = std::move(transactions), + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, ReadonlyModeInitialObjectAllowed) +{ + auto extObj = MockExtInitialObjectReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(extObj, onInitialObject(testing::_, testing::_)).Times(3); + + auto reg = Registry(state_, extObj); + reg.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {}); +} + +TEST_F(RegistryTest, ReadonlyModeInitialObjectsAllowed) +{ + auto extObjs = MockExtInitialObjectsReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(extObjs, onInitialObjects(testing::_, testing::_, testing::_)); + + auto reg = Registry(state_, extObjs); + reg.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {}); +} + +TEST_F(RegistryTest, ReadonlyModeRegularExtensionsNotCalled) +{ + auto extLedgerData = MockExtLedgerData{}; // No allowInReadonly method + auto objects = std::vector{ + util::createObject(), + util::createObject(), + util::createObject(), + }; + + state_.isWriting = false; + + EXPECT_CALL(extLedgerData, onLedgerData(testing::_)).Times(0); // Should NOT be called in readonly mode + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, extLedgerData); + reg.dispatch(etlng::model::LedgerData{ + .transactions = {}, + .objects = std::move(objects), + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, MixedReadonlyAndRegularExtensions) +{ + auto extReadonly = MockExtLedgerDataReadonly{}; + auto extRegular = MockExtLedgerData{}; + auto objects = std::vector{ + util::createObject(), + util::createObject(), + util::createObject(), + }; + + state_.isWriting = false; + + EXPECT_CALL(extReadonly, onLedgerData(testing::_)); + EXPECT_CALL(extRegular, onLedgerData(testing::_)).Times(0); // Should NOT be called in readonly mode + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, extReadonly, extRegular); + reg.dispatch(etlng::model::LedgerData{ + .transactions = {}, + .objects = std::move(objects), + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, MonitorInterfaceExecution) +{ + struct MockMonitor : etlng::MonitorInterface { + MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override)); + MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override)); + MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override)); + MOCK_METHOD(void, stop, (), (override)); + }; + + auto monitor = MockMonitor{}; + EXPECT_CALL(monitor, notifyLedgerLoaded(kSEQ)).Times(1); + + monitor.notifyLedgerLoaded(kSEQ); +} + +TEST_F(RegistryTest, ReadonlyModeWithAllowInReadonlyTest) +{ + struct ExtWithAllowInReadonly { + MOCK_METHOD(void, onLedgerData, (etlng::model::LedgerData const&), (const)); + + static bool + allowInReadonly() + { + return true; + } + }; + + auto ext = ExtWithAllowInReadonly{}; + state_.isWriting = false; + + EXPECT_CALL(ext, onLedgerData(testing::_)).Times(1); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, ext); + reg.dispatch(etlng::model::LedgerData{ + .transactions = {}, + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); +} + +TEST_F(RegistryTest, ReadonlyModeExecutePluralHooksIfAllowedPaths) +{ + struct ExtWithBothHooksAndAllowReadonly { + MOCK_METHOD(void, onLedgerData, (etlng::model::LedgerData const&), (const)); + MOCK_METHOD(void, onInitialData, (etlng::model::LedgerData const&), (const)); + MOCK_METHOD(void, onInitialObjects, (uint32_t, std::vector const&, std::string), (const)); + + static bool + allowInReadonly() + { + return true; + } + }; + + auto ext = ExtWithBothHooksAndAllowReadonly{}; + state_.isWriting = false; + + auto transactions = std::vector{ + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + }; + auto objects = std::vector{ + util::createObject(), + }; + + EXPECT_CALL(ext, onLedgerData(testing::_)).Times(1); + EXPECT_CALL(ext, onInitialData(testing::_)).Times(1); + EXPECT_CALL(ext, onInitialObjects(testing::_, testing::_, testing::_)).Times(1); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, ext); + + reg.dispatch(etlng::model::LedgerData{ + .transactions = transactions, + .objects = objects, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); + + reg.dispatchInitialData(etlng::model::LedgerData{ + .transactions = std::move(transactions), + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); + + reg.dispatchInitialObjects(kSEQ, objects, {}); +} + +TEST_F(RegistryTest, ReadonlyModeExecuteByOneHooksIfAllowedPaths) +{ + struct ExtWithBothHooksAndAllowReadonly { + using spec = etlng::model::Spec; + + MOCK_METHOD(void, onObject, (uint32_t, etlng::model::Object const&), (const)); + MOCK_METHOD(void, onInitialObject, (uint32_t, etlng::model::Object const&), (const)); + MOCK_METHOD(void, onTransaction, (uint32_t, etlng::model::Transaction const&), (const)); + MOCK_METHOD(void, onInitialTransaction, (uint32_t, etlng::model::Transaction const&), (const)); + + static bool + allowInReadonly() + { + return true; + } + }; + + auto ext = ExtWithBothHooksAndAllowReadonly{}; + state_.isWriting = false; + + auto transactions = std::vector{ + util::createTransaction(ripple::TxType::ttNFTOKEN_BURN), + }; + auto objects = std::vector{ + util::createObject(), + }; + + EXPECT_CALL(ext, onTransaction(testing::_, testing::_)).Times(1); + EXPECT_CALL(ext, onObject(testing::_, testing::_)).Times(1); + EXPECT_CALL(ext, onInitialTransaction(testing::_, testing::_)).Times(1); + EXPECT_CALL(ext, onInitialObject(testing::_, testing::_)).Times(1); + + auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ); + auto reg = Registry(state_, ext); + + reg.dispatch(etlng::model::LedgerData{ + .transactions = transactions, + .objects = objects, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); + + reg.dispatchInitialData(etlng::model::LedgerData{ + .transactions = std::move(transactions), + .objects = {}, + .successors = {}, + .edgeKeys = {}, + .header = header, + .rawHeader = {}, + .seq = kSEQ + }); + + reg.dispatchInitialObjects(kSEQ, objects, {}); +} diff --git a/tests/unit/etlng/TaskManagerTests.cpp b/tests/unit/etlng/TaskManagerTests.cpp index cbeaa752..d94655a0 100644 --- a/tests/unit/etlng/TaskManagerTests.cpp +++ b/tests/unit/etlng/TaskManagerTests.cpp @@ -20,6 +20,7 @@ #include "etlng/ExtractorInterface.hpp" #include "etlng/LoaderInterface.hpp" #include "etlng/Models.hpp" +#include "etlng/MonitorInterface.hpp" #include "etlng/SchedulerInterface.hpp" #include "etlng/impl/Loading.hpp" #include "etlng/impl/TaskManager.hpp" @@ -29,11 +30,13 @@ #include "util/async/AnyExecutionContext.hpp" #include "util/async/context/BasicExecutionContext.hpp" +#include #include #include #include #include +#include #include #include #include @@ -63,18 +66,27 @@ struct MockLoader : etlng::LoaderInterface { MOCK_METHOD(std::optional, loadInitialLedger, (LedgerData const&), (override)); }; +struct MockMonitor : etlng::MonitorInterface { + MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override)); + MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override)); + MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override)); + MOCK_METHOD(void, stop, (), (override)); +}; + struct TaskManagerTests : NoLoggerFixture { using MockSchedulerType = testing::NiceMock; using MockExtractorType = testing::NiceMock; using MockLoaderType = testing::NiceMock; + using MockMonitorType = testing::NiceMock; protected: util::async::CoroExecutionContext ctx_{2}; std::shared_ptr mockSchedulerPtr_ = std::make_shared(); std::shared_ptr mockExtractorPtr_ = std::make_shared(); std::shared_ptr mockLoaderPtr_ = std::make_shared(); + std::shared_ptr mockMonitorPtr_ = std::make_shared(); - TaskManager taskManager_{ctx_, *mockSchedulerPtr_, *mockExtractorPtr_, *mockLoaderPtr_}; + TaskManager taskManager_{ctx_, mockSchedulerPtr_, *mockExtractorPtr_, *mockLoaderPtr_, *mockMonitorPtr_, kSEQ}; }; auto @@ -97,8 +109,7 @@ createTestData(uint32_t seq) TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted) { static constexpr auto kTOTAL = 64uz; - static constexpr auto kEXTRACTORS = 5uz; - static constexpr auto kLOADERS = 1uz; + static constexpr auto kEXTRACTORS = 4uz; std::atomic_uint32_t seq = kSEQ; std::vector loaded; @@ -118,17 +129,16 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted) EXPECT_CALL(*mockLoaderPtr_, load(testing::_)).Times(kTOTAL).WillRepeatedly([&](LedgerData data) { loaded.push_back(data.seq); - if (loaded.size() == kTOTAL) { done.release(); } }); - auto loop = ctx_.execute([&] { taskManager_.run({.numExtractors = kEXTRACTORS, .numLoaders = kLOADERS}); }); - done.acquire(); + EXPECT_CALL(*mockMonitorPtr_, notifyLedgerLoaded(testing::_)).Times(kTOTAL); + taskManager_.run(kEXTRACTORS); + done.acquire(); taskManager_.stop(); - loop.wait(); EXPECT_EQ(loaded.size(), kTOTAL); for (std::size_t i = 0; i < loaded.size(); ++i) { diff --git a/tests/unit/etlng/ext/CacheTests.cpp b/tests/unit/etlng/ext/CacheTests.cpp index c1697b6d..5fa83399 100644 --- a/tests/unit/etlng/ext/CacheTests.cpp +++ b/tests/unit/etlng/ext/CacheTests.cpp @@ -18,8 +18,10 @@ //============================================================================== #include "etlng/Models.hpp" +#include "etlng/impl/CacheUpdater.hpp" #include "etlng/impl/ext/Cache.hpp" #include "util/BinaryTestObject.hpp" +#include "util/LoggerFixtures.hpp" #include "util/MockLedgerCache.hpp" #include "util/MockPrometheus.hpp" #include "util/TestObject.hpp" @@ -27,6 +29,7 @@ #include #include +#include #include #include @@ -56,10 +59,11 @@ createTestData() } // namespace -struct CacheExtTests : util::prometheus::WithPrometheus { +struct CacheExtTests : NoLoggerFixture, util::prometheus::WithPrometheus { protected: MockLedgerCache cache_; - etlng::impl::CacheExt ext_{cache_}; + std::shared_ptr updater_ = std::make_shared(cache_); + etlng::impl::CacheExt ext_{updater_}; }; TEST_F(CacheExtTests, OnLedgerDataUpdatesCache) @@ -89,3 +93,8 @@ TEST_F(CacheExtTests, OnInitialObjectsUpdateCache) ext_.onInitialObjects(kSEQ, objects, kUNUSED_LAST_KEY); } + +TEST_F(CacheExtTests, AllowInReadonlyReturnsTrue) +{ + EXPECT_TRUE(ext_.allowInReadonly()); +} diff --git a/tests/unit/util/StrandedPriorityQueueTests.cpp b/tests/unit/util/StrandedPriorityQueueTests.cpp deleted file mode 100644 index 00703321..00000000 --- a/tests/unit/util/StrandedPriorityQueueTests.cpp +++ /dev/null @@ -1,195 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2025, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include "util/StrandedPriorityQueue.hpp" -#include "util/async/AnyExecutionContext.hpp" -#include "util/async/AnyOperation.hpp" -#include "util/async/context/BasicExecutionContext.hpp" - -#include - -#include -#include -#include -#include -#include -#include - -using namespace util; - -namespace { - -struct TestData { - uint32_t seq; - - auto - operator<=>(TestData const&) const = default; -}; - -} // namespace - -TEST(StrandedPriorityQueueTests, DefaultPriority) -{ - util::async::CoroExecutionContext ctx; - StrandedPriorityQueue queue{ctx.makeStrand()}; - - for (auto i = 0u; i < 100u; ++i) { - EXPECT_TRUE(queue.enqueue(TestData{.seq = i})); - } - - EXPECT_FALSE(queue.empty()); - - auto next = 99u; - while (auto maybeValue = queue.dequeue()) { - EXPECT_EQ(maybeValue->seq, next--); - } - - EXPECT_TRUE(queue.empty()); -} - -TEST(StrandedPriorityQueueTests, CustomPriority) -{ - struct Comp { - [[nodiscard]] bool - operator()(TestData const& lhs, TestData const& rhs) const noexcept - { - return lhs.seq > rhs.seq; - } - }; - - util::async::CoroExecutionContext ctx; - StrandedPriorityQueue queue{ctx.makeStrand()}; - - for (auto i = 0u; i < 100u; ++i) { - EXPECT_TRUE(queue.enqueue(TestData{.seq = i})); - } - - EXPECT_FALSE(queue.empty()); - - auto next = 0u; - while (auto maybeValue = queue.dequeue()) { - EXPECT_EQ(maybeValue->seq, next++); - } - - EXPECT_TRUE(queue.empty()); -} - -TEST(StrandedPriorityQueueTests, MultipleThreadsUnlimitedQueue) -{ - async::CoroExecutionContext realCtx{6}; - async::AnyExecutionContext ctx{realCtx}; - StrandedPriorityQueue queue{ctx.makeStrand()}; - - EXPECT_TRUE(queue.empty()); - static constexpr auto kTOTAL_THREADS = 5u; - static constexpr auto kTOTAL_ITEMS_PER_THREAD = 100u; - - std::atomic_size_t totalEnqueued = 0uz; - std::vector> tasks; - tasks.reserve(kTOTAL_THREADS); - - for (auto batchIdx = 0u; batchIdx < kTOTAL_THREADS; ++batchIdx) { - // enqueue batches tasks running on multiple threads - tasks.push_back(ctx.execute([&queue, batchIdx, &totalEnqueued] { - for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) { - if (queue.enqueue(TestData{.seq = (batchIdx * kTOTAL_ITEMS_PER_THREAD) + i})) - ++totalEnqueued; - } - })); - } - - for (auto& task : tasks) - task.wait(); - - auto next = (kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS) - 1; - while (auto maybeValue = queue.dequeue()) { - EXPECT_EQ(maybeValue->seq, next--); - } - - EXPECT_TRUE(queue.empty()); - EXPECT_EQ(totalEnqueued, kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS); -} - -TEST(StrandedPriorityQueueTests, MultipleThreadsLimitedQueue) -{ - static constexpr auto kQUEUE_SIZE_LIMIT = 32uz; - static constexpr auto kTOTAL_THREADS = 5u; - static constexpr auto kTOTAL_ITEMS_PER_THREAD = 100u; - - async::CoroExecutionContext realCtx{8}; - async::AnyExecutionContext ctx{realCtx}; - StrandedPriorityQueue queue{ctx.makeStrand(), kQUEUE_SIZE_LIMIT}; - - EXPECT_TRUE(queue.empty()); - - std::atomic_size_t totalEnqueued = 0uz; - std::atomic_size_t totalSleepCycles = 0uz; - std::vector> tasks; - tasks.reserve(kTOTAL_THREADS); - - std::unordered_set expectedSequences; - - for (auto batchIdx = 0u; batchIdx < kTOTAL_THREADS; ++batchIdx) { - for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) { - expectedSequences.insert((batchIdx * kTOTAL_ITEMS_PER_THREAD) + i); - } - - // enqueue batches tasks running on multiple threads - tasks.push_back(ctx.execute([&queue, batchIdx, &totalEnqueued, &totalSleepCycles] { - for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) { - auto data = TestData{.seq = (batchIdx * kTOTAL_ITEMS_PER_THREAD) + i}; - while (not queue.enqueue(data)) { - std::this_thread::sleep_for(std::chrono::nanoseconds{1}); - ++totalSleepCycles; - } - ++totalEnqueued; - } - })); - } - - EXPECT_FALSE(expectedSequences.empty()); - - auto loader = ctx.execute([&queue, &expectedSequences] { - while (not expectedSequences.empty()) { - while (auto maybeValue = queue.dequeue()) { - EXPECT_TRUE(expectedSequences.contains(maybeValue->seq)); - expectedSequences.erase(maybeValue->seq); - } - } - }); - - for (auto& task : tasks) - task.wait(); - loader.wait(); - - EXPECT_TRUE(queue.empty()); - EXPECT_TRUE(expectedSequences.empty()); - EXPECT_EQ(totalEnqueued, kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS); - EXPECT_GE(totalSleepCycles, 1uz); -} - -TEST(StrandedPriorityQueueTests, ReturnsNulloptIfQueueEmpty) -{ - async::CoroExecutionContext realCtx; - StrandedPriorityQueue queue{realCtx.makeStrand()}; - - EXPECT_TRUE(queue.empty()); - auto maybeValue = queue.dequeue(); - EXPECT_FALSE(maybeValue.has_value()); -}