//------------------------------------------------------------------------------
/*
    This file is part of clio: https://github.com/XRPLF/clio
    Copyright (c) 2025, the clio developers.

    Permission to use, copy, modify, and distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include "etl/ETLService.hpp"

#include "data/BackendInterface.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/CacheLoaderInterface.hpp"
#include "etl/CacheUpdaterInterface.hpp"
#include "etl/CorruptionDetector.hpp"
#include "etl/ETLServiceInterface.hpp"
#include "etl/ETLState.hpp"
#include "etl/ExtractorInterface.hpp"
#include "etl/InitialLoadObserverInterface.hpp"
#include "etl/LedgerPublisherInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "etl/LoaderInterface.hpp"
#include "etl/MonitorInterface.hpp"
#include "etl/MonitorProviderInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/TaskManagerProviderInterface.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/CacheUpdater.hpp"
#include "etl/impl/Extraction.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etl/impl/LedgerPublisher.hpp"
#include "etl/impl/Loading.hpp"
#include "etl/impl/MonitorProvider.hpp"
#include "etl/impl/Registry.hpp"
#include "etl/impl/Scheduling.hpp"
#include "etl/impl/TaskManager.hpp"
#include "etl/impl/TaskManagerProvider.hpp"
#include "etl/impl/ext/Cache.hpp"
#include "etl/impl/ext/Core.hpp"
#include "etl/impl/ext/MPT.hpp"
#include "etl/impl/ext/NFT.hpp"
#include "etl/impl/ext/Successor.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"

// NOTE(review): the eleven directives below carry no header name in this copy of the file, and
// several declarations further down are missing their template arguments (e.g. `std::shared_ptr`
// with no element type, `std::make_shared>(...)`, `::util::timed>(...)`, `static_cast(...)`).
// It looks like everything between `<` and `>` was stripped during extraction - restore it from
// the upstream file before building. The code is reproduced here exactly as found.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace etl {

/**
 * @brief Factory that wires up a complete ETL service and starts it.
 *
 * Creates the fetcher, extractor, publisher, cache loader, cache updater, amendment block
 * handler, monitor provider, loader and task manager provider, installs a CorruptionDetector on
 * the backend, and hands all of them to a newly constructed service object. If the service
 * reports an ETL state, its networkID is forwarded to `subscriptions`. run() is called on the
 * service before it is returned.
 *
 * @param config Configuration; passed to the cache loader and to the service
 * @param state Shared system state; dereferenced for the publisher, amendment block handler,
 * corruption detector and registry, and shared with the loader and the service
 * @param ctx Execution context; passed to the publisher, amendment block handler and service
 * @param backend Backend; also the source of the ledger cache via backend->cache()
 * @param subscriptions Passed to the publisher; receives the network ID when one is known
 * @param balancer Passed to the fetcher and to the service
 * @param ledgers Passed (dereferenced) to the task manager provider and shared with the service
 * @return The constructed service, on which run() has already been called
 */
std::shared_ptr ETLService::makeETLService(
    util::config::ClioConfigDefinition const& config,
    std::shared_ptr state,
    util::async::AnyExecutionContext ctx,
    std::shared_ptr backend,
    std::shared_ptr subscriptions,
    std::shared_ptr balancer,
    std::shared_ptr ledgers
)
{
    std::shared_ptr ret;

    auto fetcher = std::make_shared(backend, balancer);
    auto extractor = std::make_shared(fetcher);
    auto publisher = std::make_shared(ctx, backend, subscriptions, *state);
    auto cacheLoader = std::make_shared>(config, backend, backend->cache());
    auto cacheUpdater = std::make_shared(backend->cache());
    auto amendmentBlockHandler = std::make_shared(ctx, *state);
    auto monitorProvider = std::make_shared();

    // The detector is given the same system state and ledger cache that the ETL components use
    backend->setCorruptionDetector(CorruptionDetector{*state, backend->cache()});

    // The loader receives a registry built from the cache, core, successor, NFT and MPT extensions
    auto loader = std::make_shared(
        backend,
        impl::makeRegistry(
            *state,
            impl::CacheExt{cacheUpdater},
            impl::CoreExt{backend},
            impl::SuccessorExt{backend, backend->cache()},
            impl::NFTExt{backend},
            impl::MPTExt{backend}
        ),
        amendmentBlockHandler,
        state
    );

    auto taskManagerProvider = std::make_shared(*ledgers, extractor, loader);

    // The same loader object is passed twice on purpose: it fills both the loader and the
    // initial load observer slots of the service constructor
    ret = std::make_shared(
        ctx,
        config,
        backend,
        balancer,
        ledgers,
        publisher,
        cacheLoader,
        cacheUpdater,
        extractor,
        loader,  // loader itself
        loader,  // initial load observer
        taskManagerProvider,
        monitorProvider,
        state
    );

    // Inject networkID into subscriptions, as the transaction feed requires it to inject CTID in
    // the response
    if (auto const etlState = ret->getETLState(); etlState)
        subscriptions->setNetworkID(etlState->networkID);

    ret->run();
    return ret;
}

/**
 * @brief Stores the injected collaborators and reads the optional sequence bounds from config.
 *
 * `start_sequence` and `finish_sequence` are read with maybeValue() and logged when present.
 * Asserts that `state` is not already flagged as writing, then logs whether the node runs in
 * strict read-only mode. Nothing is started here; see run().
 *
 * @param ctx Execution context; also used to create the strand for write commands
 * @param config Configuration reference; kept for later reads (see startLoading())
 * @param backend Backend used for ledger range queries, diffs and the ledger cache
 * @param balancer Source of ETL state, `etl_sources` info and the initial ledger load
 * @param ledgers Source of the most recent validated ledger sequence
 * @param publisher Publishes sequences and reports publish/close ages
 * @param cacheLoader Loads the cache when the database is already populated
 * @param cacheUpdater Applies ledger diffs to the cache
 * @param extractor Extracts the initial ledger
 * @param loader Loads the initial ledger
 * @param initialLoadObserver Passed to the balancer during the initial ledger load
 * @param taskManagerProvider Creates the task manager in startLoading()
 * @param monitorProvider Creates the monitor in startMonitor()
 * @param state Shared system state flags and the write command signal
 */
ETLService::ETLService(
    util::async::AnyExecutionContext ctx,
    std::reference_wrapper config,
    std::shared_ptr backend,
    std::shared_ptr balancer,
    std::shared_ptr ledgers,
    std::shared_ptr publisher,
    std::shared_ptr cacheLoader,
    std::shared_ptr cacheUpdater,
    std::shared_ptr extractor,
    std::shared_ptr loader,
    std::shared_ptr initialLoadObserver,
    std::shared_ptr taskManagerProvider,
    std::shared_ptr monitorProvider,
    std::shared_ptr state
)
    : ctx_(std::move(ctx))
    , config_(config)
    , backend_(std::move(backend))
    , balancer_(std::move(balancer))
    , ledgers_(std::move(ledgers))
    , publisher_(std::move(publisher))
    , cacheLoader_(std::move(cacheLoader))
    , cacheUpdater_(std::move(cacheUpdater))
    , extractor_(std::move(extractor))
    , loader_(std::move(loader))
    , initialLoadObserver_(std::move(initialLoadObserver))
    , taskManagerProvider_(std::move(taskManagerProvider))
    , monitorProvider_(std::move(monitorProvider))
    , state_(std::move(state))
    , startSequence_(config.get().maybeValue("start_sequence"))
    , finishSequence_(config.get().maybeValue("finish_sequence"))
    // NOTE(review): this reads the member ctx_ (the parameter was moved from above), so ctx_ must
    // be declared before writeCommandStrand_ in the class - confirm in the header
    , writeCommandStrand_(ctx_.makeStrand())
{
    ASSERT(not state_->isWriting, "ETL should never start in writer mode");

    if (startSequence_.has_value())
        LOG(log_.info()) << "Start sequence: " << *startSequence_;

    if (finishSequence_.has_value())
        LOG(log_.info()) << "Finish sequence: " << *finishSequence_;

    LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE");
}

/**
 * @brief Calls stop() and then logs the destruction.
 */
ETLService::~ETLService()
{
    stop();
    LOG(log_.debug()) << "Destroying ETL";
}

/**
 * @brief Submits the main ETL routine to the execution context and keeps its handle in mainLoop_.
 *
 * The submitted routine:
 *  1. calls loadInitialLedgerIfNeeded();
 *  2. asks ledgers_ for the most recent validated sequence and returns if there is none;
 *  3. returns if step 1 produced no ledger range;
 *  4. starts the monitor at `maxSequence + 1` and sets state_->etlStarted;
 *  5. starts loading from the same sequence if the node is flagged as writer.
 *
 * The routine captures `this`; stop() waits on mainLoop_ and is called from the destructor.
 */
void ETLService::run()
{
    LOG(log_.info()) << "Running ETL...";

    mainLoop_.emplace(ctx_.execute([this] {
        auto const rng = loadInitialLedgerIfNeeded();

        // Presumably getMostRecent() blocks until a validated ledger is known or the wait is
        // aborted (that is what the log messages here describe) - confirm against the interface
        LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
        std::optional const mostRecentValidated = ledgers_->getMostRecent();

        if (not mostRecentValidated) {
            LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
                                "Exiting monitor loop";
            return;
        }

        // Checked only after the wait above: no range means the initial load did not complete
        if (not rng.has_value()) {
            LOG(log_.warn()) << "Initial ledger download got cancelled - stopping ETL service";
            return;
        }

        auto const nextSequence = rng->maxSequence + 1;

        LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;

        startMonitor(nextSequence);

        // Set once startMonitor() has returned
        state_->etlStarted = true;

        // If we are a writer as the result of loading the initial ledger - start loading
        if (state_->isWriting)
            startLoading(nextSequence);
    }));
}

/**
 * @brief Shuts the service down.
 *
 * Order of operations: disconnect the write command subscription, wait until
 * runningWriteCommandHandlers_ reads zero, wait for the main routine (if it was started), then
 * stop the task manager and the monitor if they exist.
 */
void ETLService::stop()
{
    LOG(log_.info()) << "Stop called";

    // Disconnect first, then drain: the counter is decremented at the end of each handler that
    // startMonitor() submitted to the strand
    systemStateWriteCommandSubscription_.disconnect();
    auto count = runningWriteCommandHandlers_.load();
    while (count != 0) {
        runningWriteCommandHandlers_.wait(count);  // Blocks until value changes
        count = runningWriteCommandHandlers_.load();
    }

    if (mainLoop_)
        mainLoop_->wait();
    if (taskMan_)
        taskMan_->stop();
    if (monitor_)
        monitor_->stop();
}

/**
 * @brief Builds a JSON summary of the ETL service.
 *
 * Keys written:
 *  - `etl_sources`: balancer_->toJson()
 *  - `is_writer`: state_->isWriting after a static_cast
 *  - `read_only`: state_->isStrictReadonly after a static_cast
 *  - `last_publish_age_seconds`: publisher_->lastPublishAgeSeconds() as a string; only present
 *    when the last publish time is not the epoch
 *
 * @return The populated JSON object
 */
boost::json::object ETLService::getInfo() const
{
    boost::json::object result;

    result["etl_sources"] = balancer_->toJson();
    result["is_writer"] = static_cast(state_->isWriting);
    result["read_only"] = static_cast(state_->isStrictReadonly);
    auto last = publisher_->getLastPublish();
    // A zero time_since_epoch is treated as "nothing published yet"; the key is omitted then
    if (last.time_since_epoch().count() != 0)
        result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds());
    return result;
}

/**
 * @brief Reports the amendment-blocked flag of the shared system state.
 *
 * @return The value of state_->isAmendmentBlocked
 */
bool ETLService::isAmendmentBlocked() const
{
    return state_->isAmendmentBlocked;
}

/**
 * @brief Reports the corruption-detected flag of the shared system state.
 *
 * @return The value of state_->isCorruptionDetected
 */
bool ETLService::isCorruptionDetected() const
{
    return state_->isCorruptionDetected;
}

/**
 * @brief Forwards to the load balancer.
 *
 * @return Whatever balancer_->getETLState() returns
 */
std::optional ETLService::getETLState() const
{
    return balancer_->getETLState();
}

/**
 * @brief Forwards to the publisher.
 *
 * @return Whatever publisher_->lastCloseAgeSeconds() returns
 */
std::uint32_t ETLService::lastCloseAgeSeconds() const
{
    return publisher_->lastCloseAgeSeconds();
}

/**
 * @brief Makes sure the database holds at least one ledger and returns the stored ledger range.
 *
 * Populated database: triggers cacheLoader_->load() for the current max sequence when the cache
 * is not full, and returns the range that was fetched.
 *
 * Empty database: asserts the node is not strict read-only, flags the node as writer, picks a
 * sequence (the configured start sequence or, failing that, the most recent one reported by
 * ledgers_), extracts that ledger, fetches its edge keys through the balancer, loads it, and
 * returns the range re-read from the backend.
 *
 * @return The ledger range; std::nullopt when no sequence could be obtained or the initial
 * ledger could not be extracted or loaded (including cancellation)
 */
std::optional ETLService::loadInitialLedgerIfNeeded()
{
    auto rng = backend_->hardFetchLedgerRangeNoThrow();
    if (not rng.has_value()) {
        ASSERT(
            not state_->isStrictReadonly,
            "Database is empty but this node is in strict readonly mode. Can't write initial "
            "ledger."
        );

        LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
        // NOTE(review): nothing in this function resets the flag on the std::nullopt paths below,
        // so isWriting stays true after a failed or cancelled initial load - confirm intended
        state_->isWriting = true;  // immediately become writer as the db is empty

        auto const getMostRecent = [this]() {
            LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
            return ledgers_->getMostRecent();
        };

        // getMostRecent is only consulted when no start sequence was configured
        if (auto const maybeSeq = startSequence_.or_else(getMostRecent); maybeSeq.has_value()) {
            auto const seq = *maybeSeq;
            LOG(log_.info()) << "Starting from sequence " << seq
                             << ". Initial ledger download and extraction can take a while...";

            // `ledger` is the result of the callable, `timeDiff` the measured duration
            auto [ledger, timeDiff] = ::util::timed>([this, seq]() {
                return extractor_->extractLedgerOnly(seq).and_then(
                    [this, seq](auto&& data) -> std::optional {
                        // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or
                        // similar
                        auto res = balancer_->loadInitialLedger(seq, *initialLoadObserver_);
                        if (not res.has_value() and res.error() == InitialLedgerLoadError::Cancelled) {
                            LOG(log_.debug()) << "Initial ledger load got cancelled";
                            return std::nullopt;
                        }
                        // Any error other than Cancelled is treated as a programming error here
                        ASSERT(res.has_value(), "Initial ledger retry logic failed");

                        data.edgeKeys = std::move(res).value();
                        return loader_->loadInitialLedger(data);
                    }
                );
            });

            if (not ledger.has_value()) {
                LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
                return std::nullopt;
            }

            LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
            LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();

            // Re-read the range now that the initial ledger has been loaded
            return backend_->hardFetchLedgerRangeNoThrow();
        }

        LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
                            "Exiting monitor loop";
        return std::nullopt;
    }

    LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
    if (not backend_->cache().isFull()) {
        cacheLoader_->load(rng->maxSequence);
    }

    return rng;
}

/**
 * @brief Brings the ledger cache and the backend's range up to `seq`, then publishes `seq`.
 *
 * - Cache: when its latest sequence is below `seq`, the ledger diff for `seq` is fetched through
 *   data::synchronousAndRetryOnTimeout and applied via cacheUpdater_.
 * - Backend range: when a range exists and its max sequence is below `seq`, updateRange(seq) is
 *   called.
 * - publisher_->publish(seq, {}) is called unconditionally.
 *
 * @param seq The ledger sequence to catch up to
 */
void ETLService::updateCache(uint32_t seq)
{
    auto const cacheNeedsUpdate = backend_->cache().latestLedgerSequence() < seq;
    auto const backendRange = backend_->fetchLedgerRange();
    // A missing range never triggers a range update
    auto const backendNeedsUpdate = backendRange.has_value() and backendRange->maxSequence < seq;

    if (cacheNeedsUpdate) {
        auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
            return backend_->fetchLedgerDiff(seq, yield);
        });
        cacheUpdater_->update(seq, diff);
    }

    if (backendNeedsUpdate)
        backend_->updateRange(seq);

    publisher_->publish(seq, {});
}

/**
 * @brief Creates the monitor, connects all signal handlers and runs the monitor.
 *
 * Handlers installed:
 *  - write command signal: each command is submitted to writeCommandStrand_ and dispatched to
 *    attemptTakeoverWriter() or giveUpWriter(); runningWriteCommandHandlers_ counts commands that
 *    were received but have not finished yet (stop() waits for it to reach zero);
 *  - new sequence from the monitor: calls updateCache();
 *  - database stalled: emits StartWriting (unless strict read-only or already writing) and sets
 *    state_->isWriterDecidingFallback.
 *
 * @param seq The sequence passed to the monitor provider
 */
void ETLService::startMonitor(uint32_t seq)
{
    monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);

    systemStateWriteCommandSubscription_ =
        state_->writeCommandSignal.connect([this](SystemState::WriteCommand command) {
            // Counted before the work is queued so that stop() also waits for commands that are
            // still sitting in the strand
            ++runningWriteCommandHandlers_;
            writeCommandStrand_.submit([this, command]() {
                switch (command) {
                    case etl::SystemState::WriteCommand::StartWriting:
                        attemptTakeoverWriter();
                        break;
                    case etl::SystemState::WriteCommand::StopWriting:
                        giveUpWriter();
                        break;
                }
                // NOTE(review): the decrement is skipped if a handler above exits by exception,
                // which would leave stop() waiting - confirm the handlers cannot throw
                --runningWriteCommandHandlers_;
                runningWriteCommandHandlers_.notify_one();
            });
        });

    monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
        LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;
        updateCache(seq);
    });

    monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
        LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
        // Database stall detected - no writer has been active for 10 seconds
        // This triggers the fallback mechanism and attempts to become the writer
        if (not state_->isStrictReadonly and not state_->isWriting)
            state_->writeCommandSignal(SystemState::WriteCommand::StartWriting);
        // NOTE(review): the `if` above has no braces, so it guards only the signal emission; the
        // flag below is set on every DbStalled signal, including on strict read-only nodes and
        // nodes that are already writing - confirm this is intended
        state_->isWriterDecidingFallback = true;
    });

    monitor_->run();
}

/**
 * @brief Creates a task manager starting at `seq` and runs it.
 *
 * Asserts the node is not strict read-only. The task manager is created from ctx_, the current
 * monitor, `seq` and finishSequence_, and is run with the `extractor_threads` config value.
 *
 * @param seq The first sequence handed to the task manager
 */
void ETLService::startLoading(uint32_t seq)
{
    ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
    // Dereferences monitor_, so startMonitor() must have run before this is called
    taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq, finishSequence_);

    // FIXME: this legacy name "extractor_threads" is no longer accurate (we have coroutines now)
    taskMan_->run(config_.get().get("extractor_threads"));
}

/**
 * @brief Tries to become the ETL writer.
 *
 * Asserts the node is not strict read-only and that the backend reports a ledger range. If the
 * ledger cache's latest sequence differs from the range's max sequence, nothing is changed and
 * the function returns. Otherwise state_->isWriting is set and loading starts at
 * `maxSequence + 1`.
 */
void ETLService::attemptTakeoverWriter()
{
    ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
    auto rng = backend_->hardFetchLedgerRangeNoThrow();
    ASSERT(rng.has_value(), "Ledger range can't be null");

    if (backend_->cache().latestLedgerSequence() != rng->maxSequence) {
        LOG(log_.info()) << "Wanted to take over the ETL writer seat but LedgerCache is outdated";
        // Give ETL time to update LedgerCache. This method will be called again because
        // ClusterCommunication will likely continue sending the StartWriting signal every second
        return;
    }

    state_->isWriting = true;  // switch to writer
    LOG(log_.info()) << "Taking over the ETL writer seat";
    startLoading(rng->maxSequence + 1);
}

/**
 * @brief Stops acting as the ETL writer.
 *
 * Asserts the node is not strict read-only, clears state_->isWriting and releases the task
 * manager by resetting taskMan_.
 */
void ETLService::giveUpWriter()
{
    ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
    state_->isWriting = false;
    LOG(log_.info()) << "Giving up writer seat";
    // NOTE(review): unlike stop(), this does not call taskMan_->stop() before releasing it;
    // presumably the task manager shuts itself down on destruction - confirm
    taskMan_ = nullptr;
}

}  // namespace etl