From 743c9b92de9065f92c654e358957b0fd930d9fa9 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Wed, 11 Jun 2025 17:53:14 +0100 Subject: [PATCH] feat: Read-write switching in ETLng (#2199) Fixes #1597 --- cmake/Settings.cmake | 8 +- src/etl/ETLService.cpp | 11 +- src/etl/ETLService.hpp | 2 +- src/etl/SystemState.hpp | 2 +- src/etl/impl/LedgerLoader.hpp | 4 +- src/etlng/ETLService.cpp | 75 +++++-- src/etlng/ETLService.hpp | 16 +- src/etlng/LoadBalancerInterface.hpp | 12 +- src/etlng/LoaderInterface.hpp | 7 +- src/etlng/MonitorInterface.hpp | 23 +- src/etlng/MonitorProviderInterface.hpp | 64 ++++++ src/etlng/TaskManagerProviderInterface.hpp | 2 +- src/etlng/impl/LedgerPublisher.hpp | 5 +- src/etlng/impl/Loading.cpp | 54 ++++- src/etlng/impl/Loading.hpp | 11 +- src/etlng/impl/Monitor.cpp | 84 ++++++- src/etlng/impl/Monitor.hpp | 28 ++- src/etlng/impl/MonitorProvider.hpp | 53 +++++ src/etlng/impl/TaskManager.cpp | 26 ++- src/util/Constants.hpp | 1 + src/util/async/context/impl/Cancellation.hpp | 1 + tests/unit/etl/ExtractorTests.cpp | 2 +- tests/unit/etl/TransformerTests.cpp | 2 +- tests/unit/etlng/ETLServiceTests.cpp | 221 +++++++++++++++++-- tests/unit/etlng/LedgerPublisherTests.cpp | 105 +++++---- tests/unit/etlng/LoadingTests.cpp | 6 +- tests/unit/etlng/MonitorTests.cpp | 59 ++++- tests/unit/etlng/RegistryTests.cpp | 20 +- tests/unit/etlng/TaskManagerTests.cpp | 93 +++++++- 29 files changed, 816 insertions(+), 181 deletions(-) create mode 100644 src/etlng/MonitorProviderInterface.hpp create mode 100644 src/etlng/impl/MonitorProvider.hpp diff --git a/cmake/Settings.cmake b/cmake/Settings.cmake index 85fe7df9..02ef1b1c 100644 --- a/cmake/Settings.cmake +++ b/cmake/Settings.cmake @@ -1,20 +1,20 @@ set(COMPILER_FLAGS + -pedantic -Wall -Wcast-align -Wdouble-promotion - -Wextra -Werror + -Wextra -Wformat=2 -Wimplicit-fallthrough -Wmisleading-indentation - -Wno-narrowing - -Wno-deprecated-declarations -Wno-dangling-else + -Wno-deprecated-declarations + -Wno-narrowing -Wno-unused-but-set-variable -Wnon-virtual-dtor -Wnull-dereference -Wold-style-cast - -pedantic -Wpedantic -Wunused # FIXME: The following bunch are needed for gcc12 atm. diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 63ea4591..496f019f 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -38,6 +38,7 @@ #include "etlng/LoadBalancer.hpp" #include "etlng/LoadBalancerInterface.hpp" #include "etlng/impl/LedgerPublisher.hpp" +#include "etlng/impl/MonitorProvider.hpp" #include "etlng/impl/TaskManagerProvider.hpp" #include "etlng/impl/ext/Cache.hpp" #include "etlng/impl/ext/Core.hpp" @@ -86,6 +87,7 @@ ETLService::makeETLService( ); auto state = std::make_shared(); + state->isStrictReadonly = config.get("read_only"); auto fetcher = std::make_shared(backend, balancer); auto extractor = std::make_shared(fetcher); @@ -93,6 +95,7 @@ ETLService::makeETLService( auto cacheLoader = std::make_shared>(config, backend, backend->cache()); auto cacheUpdater = std::make_shared(backend->cache()); auto amendmentBlockHandler = std::make_shared(ctx, *state); + auto monitorProvider = std::make_shared(); auto loader = std::make_shared( backend, @@ -104,7 +107,8 @@ ETLService::makeETLService( etlng::impl::NFTExt{backend}, etlng::impl::MPTExt{backend} ), - amendmentBlockHandler + amendmentBlockHandler, + state ); auto taskManagerProvider = std::make_shared(*ledgers, extractor, loader); @@ -122,6 +126,7 @@ ETLService::makeETLService( loader, // loader itself loader, // initial load observer taskManagerProvider, + monitorProvider, state ); } else { @@ -346,7 +351,7 @@ ETLService::doWork() worker_ = std::thread([this]() { beast::setCurrentThreadName("ETLService worker"); - if (state_.isReadOnly) { + if (state_.isStrictReadonly) { monitorReadOnly(); } else { monitor(); @@ -373,7 +378,7 @@ ETLService::ETLService( { startSequence_ = config.maybeValue("start_sequence"); finishSequence_ = config.maybeValue("finish_sequence"); - state_.isReadOnly = config.get("read_only"); + state_.isStrictReadonly = config.get("read_only"); extractorThreads_ = config.get("extractor_threads"); // This should probably be done in the backend factory but we don't have state available until here diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index af9a3442..09f5478e 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -239,7 +239,7 @@ public: result["etl_sources"] = loadBalancer_->toJson(); result["is_writer"] = static_cast(state_.isWriting); - result["read_only"] = static_cast(state_.isReadOnly); + result["read_only"] = static_cast(state_.isStrictReadonly); auto last = ledgerPublisher_.getLastPublish(); if (last.time_since_epoch().count() != 0) result["last_publish_age_seconds"] = std::to_string(ledgerPublisher_.lastPublishAgeSeconds()); diff --git a/src/etl/SystemState.hpp b/src/etl/SystemState.hpp index fdfc6c6e..7f841665 100644 --- a/src/etl/SystemState.hpp +++ b/src/etl/SystemState.hpp @@ -37,7 +37,7 @@ struct SystemState { * In strict read-only mode, the process will never attempt to become the ETL writer, and will only publish ledgers * as they are written to the database. */ - util::prometheus::Bool isReadOnly = PrometheusService::boolMetric( + util::prometheus::Bool isStrictReadonly = PrometheusService::boolMetric( "read_only", util::prometheus::Labels{}, "Whether the process is in strict read-only mode" diff --git a/src/etl/impl/LedgerLoader.hpp b/src/etl/impl/LedgerLoader.hpp index 8eeb7553..2ca7bdf1 100644 --- a/src/etl/impl/LedgerLoader.hpp +++ b/src/etl/impl/LedgerLoader.hpp @@ -242,8 +242,8 @@ public: } prev = cur->key; - static constexpr std::size_t kLOG_INTERVAL = 100000; - if (numWrites % kLOG_INTERVAL == 0 && numWrites != 0) + static constexpr std::size_t kLOG_STRIDE = 100000; + if (numWrites % kLOG_STRIDE == 0 && numWrites != 0) LOG(log_.info()) << "Wrote " << numWrites << " book successors"; } diff --git a/src/etlng/ETLService.cpp b/src/etlng/ETLService.cpp index 0d0eb7e5..991b867a 100644 --- a/src/etlng/ETLService.cpp +++ b/src/etlng/ETLService.cpp @@ -35,13 +35,13 @@ #include "etlng/LoadBalancerInterface.hpp" #include "etlng/LoaderInterface.hpp" #include "etlng/MonitorInterface.hpp" +#include "etlng/MonitorProviderInterface.hpp" #include "etlng/TaskManagerProviderInterface.hpp" #include "etlng/impl/AmendmentBlockHandler.hpp" #include "etlng/impl/CacheUpdater.hpp" #include "etlng/impl/Extraction.hpp" #include "etlng/impl/LedgerPublisher.hpp" #include "etlng/impl/Loading.hpp" -#include "etlng/impl/Monitor.hpp" #include "etlng/impl/Registry.hpp" #include "etlng/impl/Scheduling.hpp" #include "etlng/impl/TaskManager.hpp" @@ -52,6 +52,7 @@ #include "util/Assert.hpp" #include "util/Profiler.hpp" #include "util/async/AnyExecutionContext.hpp" +#include "util/async/AnyOperation.hpp" #include "util/log/Logger.hpp" #include @@ -82,6 +83,7 @@ ETLService::ETLService( std::shared_ptr loader, std::shared_ptr initialLoadObserver, std::shared_ptr taskManagerProvider, + std::shared_ptr monitorProvider, std::shared_ptr state ) : ctx_(std::move(ctx)) @@ -96,9 +98,11 @@ ETLService::ETLService( , loader_(std::move(loader)) , initialLoadObserver_(std::move(initialLoadObserver)) , taskManagerProvider_(std::move(taskManagerProvider)) + , monitorProvider_(std::move(monitorProvider)) , state_(std::move(state)) { - LOG(log_.info()) << "Creating ETLng..."; + ASSERT(not state_->isWriting, "ETL should never start in writer mode"); + LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE"); } ETLService::~ETLService() @@ -112,12 +116,7 @@ ETLService::run() { LOG(log_.info()) << "Running ETLng..."; - // TODO: write-enabled node should start in readonly and do the 10 second dance to become a writer mainLoop_.emplace(ctx_.execute([this] { - state_->isWriting = - not state_->isReadOnly; // TODO: this is now needed because we don't have a mechanism for readonly or - // ETL writer node. remove later in favor of real mechanism - auto const rng = loadInitialLedgerIfNeeded(); LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; @@ -135,9 +134,8 @@ ETLService::run() LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; startMonitor(nextSequence); - // TODO: we only want to run the full ETL task man if we are POSSIBLY a write node - // but definitely not in strict readonly - if (not state_->isReadOnly) + // If we are a writer as the result of loading the initial ledger - start loading + if (state_->isWriting) startLoading(nextSequence); })); } @@ -147,6 +145,8 @@ ETLService::stop() { LOG(log_.info()) << "Stop called"; + if (mainLoop_) + mainLoop_->wait(); if (taskMan_) taskMan_->stop(); if (monitor_) @@ -160,7 +160,7 @@ ETLService::getInfo() const result["etl_sources"] = balancer_->toJson(); result["is_writer"] = static_cast(state_->isWriting); - result["read_only"] = static_cast(state_->isReadOnly); + result["read_only"] = static_cast(state_->isStrictReadonly); auto last = publisher_->getLastPublish(); if (last.time_since_epoch().count() != 0) result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds()); @@ -196,12 +196,19 @@ ETLService::loadInitialLedgerIfNeeded() { auto rng = backend_->hardFetchLedgerRangeNoThrow(); if (not rng.has_value()) { + ASSERT( + not state_->isStrictReadonly, + "Database is empty but this node is in strict readonly mode. Can't write initial ledger." + ); + LOG(log_.info()) << "Database is empty. Will download a ledger from the network."; + state_->isWriting = true; // immediately become writer as the db is empty LOG(log_.info()) << "Waiting for next ledger to be validated by network..."; if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) { auto const seq = *mostRecentValidated; - LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... "; + LOG(log_.info()) << "Ledger " << seq << " has been validated. " + << "Downloading and extracting (takes a while)..."; auto [ledger, timeDiff] = ::util::timed>([this, seq]() { return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) { @@ -238,28 +245,64 @@ ETLService::loadInitialLedgerIfNeeded() void ETLService::startMonitor(uint32_t seq) { - monitor_ = std::make_unique(ctx_, backend_, ledgers_, seq); - monitorSubscription_ = monitor_->subscribe([this](uint32_t seq) { - log_.info() << "MONITOR got new seq from db: " << seq; + monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq); + + monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) { + LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq; + + if (state_->writeConflict) { + LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately"; + giveUpWriter(); + } - // FIXME: is this the best way? if (not state_->isWriting) { auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) { return backend_->fetchLedgerDiff(seq, yield); }); + cacheUpdater_->update(seq, diff); + backend_->updateRange(seq); } publisher_->publish(seq, {}); }); + + monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() { + LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor"; + if (not state_->isStrictReadonly and not state_->isWriting) + attemptTakeoverWriter(); + }); + monitor_->run(); } void ETLService::startLoading(uint32_t seq) { + ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes"); taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq); taskMan_->run(config_.get().get("extractor_threads")); } +void +ETLService::attemptTakeoverWriter() +{ + ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes"); + auto rng = backend_->hardFetchLedgerRangeNoThrow(); + ASSERT(rng.has_value(), "Ledger range can't be null"); + + state_->isWriting = true; // switch to writer + LOG(log_.info()) << "Taking over the ETL writer seat"; + startLoading(rng->maxSequence + 1); +} + +void +ETLService::giveUpWriter() +{ + ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes"); + state_->isWriting = false; + state_->writeConflict = false; + taskMan_ = nullptr; +} + } // namespace etlng diff --git a/src/etlng/ETLService.hpp b/src/etlng/ETLService.hpp index 168db6f9..5ec75e0e 100644 --- a/src/etlng/ETLService.hpp +++ b/src/etlng/ETLService.hpp @@ -35,6 +35,7 @@ #include "etlng/LoadBalancerInterface.hpp" #include "etlng/LoaderInterface.hpp" #include "etlng/MonitorInterface.hpp" +#include "etlng/MonitorProviderInterface.hpp" #include "etlng/TaskManagerInterface.hpp" #include "etlng/TaskManagerProviderInterface.hpp" #include "etlng/impl/AmendmentBlockHandler.hpp" @@ -42,7 +43,6 @@ #include "etlng/impl/Extraction.hpp" #include "etlng/impl/LedgerPublisher.hpp" #include "etlng/impl/Loading.hpp" -#include "etlng/impl/Monitor.hpp" #include "etlng/impl/Registry.hpp" #include "etlng/impl/Scheduling.hpp" #include "etlng/impl/TaskManager.hpp" @@ -69,6 +69,7 @@ #include #include +#include #include #include #include @@ -106,12 +107,14 @@ class ETLService : public ETLServiceInterface { std::shared_ptr loader_; std::shared_ptr initialLoadObserver_; std::shared_ptr taskManagerProvider_; + std::shared_ptr monitorProvider_; std::shared_ptr state_; std::unique_ptr monitor_; std::unique_ptr taskMan_; - boost::signals2::scoped_connection monitorSubscription_; + boost::signals2::scoped_connection monitorNewSeqSubscription_; + boost::signals2::scoped_connection monitorDbStalledSubscription_; std::optional> mainLoop_; @@ -131,6 +134,7 @@ public: * @param loader Interface for loading data * @param initialLoadObserver The observer for initial data loading * @param taskManagerProvider The provider of the task manager instance + * @param monitorProvider The provider of the monitor instance * @param state System state tracking object */ ETLService( @@ -146,6 +150,7 @@ public: std::shared_ptr loader, std::shared_ptr initialLoadObserver, std::shared_ptr taskManagerProvider, + std::shared_ptr monitorProvider, std::shared_ptr state ); @@ -173,7 +178,6 @@ public: lastCloseAgeSeconds() const override; private: - // TODO: this better be std::expected std::optional loadInitialLedgerIfNeeded(); @@ -182,6 +186,12 @@ private: void startLoading(uint32_t seq); + + void + attemptTakeoverWriter(); + + void + giveUpWriter(); }; } // namespace etlng diff --git a/src/etlng/LoadBalancerInterface.hpp b/src/etlng/LoadBalancerInterface.hpp index 200bbb3f..fbba73a4 100644 --- a/src/etlng/LoadBalancerInterface.hpp +++ b/src/etlng/LoadBalancerInterface.hpp @@ -59,7 +59,7 @@ public: * @param retryAfter Time to wait between retries (2 seconds by default) * @return A std::vector The ledger data */ - virtual std::vector + [[nodiscard]] virtual std::vector loadInitialLedger( uint32_t sequence, etlng::InitialLoadObserverInterface& loader, @@ -74,7 +74,7 @@ public: * @param retryAfter Time to wait between retries (2 seconds by default) * @return A std::vector The ledger data */ - virtual std::vector + [[nodiscard]] virtual std::vector loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) = 0; /** @@ -90,7 +90,7 @@ public: * @return The extracted data, if extraction was successful. If the ledger was found * in the database or the server is shutting down, the optional will be empty */ - virtual OptionalGetLedgerResponseType + [[nodiscard]] virtual OptionalGetLedgerResponseType fetchLedger( uint32_t ledgerSequence, bool getObjects, @@ -103,7 +103,7 @@ public: * * @return JSON representation of the state of this load balancer. */ - virtual boost::json::value + [[nodiscard]] virtual boost::json::value toJson() const = 0; /** @@ -115,7 +115,7 @@ public: * @param yield The coroutine context * @return Response received from rippled node as JSON object on success or error on failure */ - virtual std::expected + [[nodiscard]] virtual std::expected forwardToRippled( boost::json::object const& request, std::optional const& clientIp, @@ -127,7 +127,7 @@ public: * @brief Return state of ETL nodes. * @return ETL state, nullopt if etl nodes not available */ - virtual std::optional + [[nodiscard]] virtual std::optional getETLState() noexcept = 0; /** diff --git a/src/etlng/LoaderInterface.hpp b/src/etlng/LoaderInterface.hpp index 72cad319..1d2f4e53 100644 --- a/src/etlng/LoaderInterface.hpp +++ b/src/etlng/LoaderInterface.hpp @@ -23,10 +23,14 @@ #include +#include #include +#include namespace etlng { +using Error = std::string; + /** * @brief An interface for a ETL Loader */ @@ -36,8 +40,9 @@ struct LoaderInterface { /** * @brief Load ledger data * @param data The data to load + * @return Nothing or error as std::expected */ - virtual void + [[nodiscard]] virtual std::expected load(model::LedgerData const& data) = 0; /** diff --git a/src/etlng/MonitorInterface.hpp b/src/etlng/MonitorInterface.hpp index dfae9344..13645534 100644 --- a/src/etlng/MonitorInterface.hpp +++ b/src/etlng/MonitorInterface.hpp @@ -36,7 +36,8 @@ namespace etlng { class MonitorInterface { public: static constexpr auto kDEFAULT_REPEAT_INTERVAL = std::chrono::seconds{1}; - using SignalType = boost::signals2::signal; + using NewSequenceSignalType = boost::signals2::signal; + using DbStalledSignalType = boost::signals2::signal; virtual ~MonitorInterface() = default; @@ -45,7 +46,14 @@ public: * @param seq The ledger sequence loaded */ virtual void - notifyLedgerLoaded(uint32_t seq) = 0; + notifySequenceLoaded(uint32_t seq) = 0; + + /** + * @brief Notifies the monitor of a write conflict + * @param seq The sequence number of the ledger that encountered a write conflict + */ + virtual void + notifyWriteConflict(uint32_t seq) = 0; /** * @brief Allows clients to get notified when a new ledger becomes available in Clio's database @@ -54,7 +62,16 @@ public: * @return A connection object that automatically disconnects the subscription once destroyed */ [[nodiscard]] virtual boost::signals2::scoped_connection - subscribe(SignalType::slot_type const& subscriber) = 0; + subscribeToNewSequence(NewSequenceSignalType::slot_type const& subscriber) = 0; + + /** + * @brief Allows clients to get notified when no database update is detected for a configured period. + * + * @param subscriber The slot to connect + * @return A connection object that automatically disconnects the subscription once destroyed + */ + [[nodiscard]] virtual boost::signals2::scoped_connection + subscribeToDbStalled(DbStalledSignalType::slot_type const& subscriber) = 0; /** * @brief Run the monitor service diff --git a/src/etlng/MonitorProviderInterface.hpp b/src/etlng/MonitorProviderInterface.hpp new file mode 100644 index 00000000..f92b1a43 --- /dev/null +++ b/src/etlng/MonitorProviderInterface.hpp @@ -0,0 +1,64 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/MonitorInterface.hpp" +#include "util/async/AnyExecutionContext.hpp" + +#include +#include +#include + +namespace etlng { + +/** + * @brief An interface for providing Monitor instances + */ +struct MonitorProviderInterface { + /** + * @brief The time Monitor should wait before reporting absence of updates to the database + */ + static constexpr auto kDEFAULT_DB_STALLED_REPORT_DELAY = std::chrono::seconds{10}; + + virtual ~MonitorProviderInterface() = default; + + /** + * @brief Create a new Monitor instance + * + * @param ctx The execution context for asynchronous operations + * @param backend Interface to the backend database + * @param validatedLedgers Interface for accessing network validated ledgers + * @param startSequence The sequence number to start monitoring from + * @param dbStalledReportDelay The timeout duration after which to signal no database updates + * @return A unique pointer to a Monitor implementation + */ + [[nodiscard]] virtual std::unique_ptr + make( + util::async::AnyExecutionContext ctx, + std::shared_ptr backend, + std::shared_ptr validatedLedgers, + uint32_t startSequence, + std::chrono::steady_clock::duration dbStalledReportDelay = kDEFAULT_DB_STALLED_REPORT_DELAY + ) = 0; +}; + +} // namespace etlng diff --git a/src/etlng/TaskManagerProviderInterface.hpp b/src/etlng/TaskManagerProviderInterface.hpp index 2894170b..532c0ad7 100644 --- a/src/etlng/TaskManagerProviderInterface.hpp +++ b/src/etlng/TaskManagerProviderInterface.hpp @@ -44,7 +44,7 @@ struct TaskManagerProviderInterface { * @param seq The sequence to start at * @return A unique pointer to a TaskManager implementation */ - virtual std::unique_ptr + [[nodiscard]] virtual std::unique_ptr make(util::async::AnyExecutionContext ctx, std::reference_wrapper monitor, uint32_t seq) = 0; }; diff --git a/src/etlng/impl/LedgerPublisher.hpp b/src/etlng/impl/LedgerPublisher.hpp index 2c0d9ed7..ddac66c8 100644 --- a/src/etlng/impl/LedgerPublisher.hpp +++ b/src/etlng/impl/LedgerPublisher.hpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -164,10 +165,6 @@ public: boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq); - // TODO: This should probably not be part of publisher in the future - if (not state_.get().isWriting) - backend_->updateRange(lgrInfo.seq); // This can't be unit tested atm. - setLastClose(lgrInfo.closeTime); auto age = lastCloseAgeSeconds(); diff --git a/src/etlng/impl/Loading.cpp b/src/etlng/impl/Loading.cpp index 701fe359..0603ef8c 100644 --- a/src/etlng/impl/Loading.cpp +++ b/src/etlng/impl/Loading.cpp @@ -20,11 +20,14 @@ #include "etlng/impl/Loading.hpp" #include "data/BackendInterface.hpp" +#include "etl/SystemState.hpp" #include "etl/impl/LedgerLoader.hpp" #include "etlng/AmendmentBlockHandlerInterface.hpp" +#include "etlng/LoaderInterface.hpp" #include "etlng/Models.hpp" #include "etlng/RegistryInterface.hpp" #include "util/Assert.hpp" +#include "util/Constants.hpp" #include "util/LedgerUtils.hpp" #include "util/Profiler.hpp" #include "util/log/Logger.hpp" @@ -46,29 +49,45 @@ namespace etlng::impl { Loader::Loader( std::shared_ptr backend, std::shared_ptr registry, - std::shared_ptr amendmentBlockHandler + std::shared_ptr amendmentBlockHandler, + std::shared_ptr state ) : backend_(std::move(backend)) , registry_(std::move(registry)) , amendmentBlockHandler_(std::move(amendmentBlockHandler)) + , state_(std::move(state)) { } -void +std::expected Loader::load(model::LedgerData const& data) { try { - // perform cache updates and all writes from extensions + // Perform cache updates and all writes from extensions + // TODO: maybe this readonly logic should be removed? registry_->dispatch(data); - auto [success, duration] = - ::util::timed>([&]() { return backend_->finishWrites(data.seq); }); - LOG(log_.info()) << "Finished writes to DB for " << data.seq << ": " << (success ? "YES" : "NO") << "; took " - << duration; + // Only a writer should attempt to commit to DB + // This is also where conflicts with other writer nodes will be detected + if (state_->isWriting) { + auto [success, duration] = + ::util::timed([&]() { return backend_->finishWrites(data.seq); }); + LOG(log_.info()) << "Finished writes to DB for " << data.seq << ": " << (success ? "YES" : "NO") + << "; took " << duration << "ms"; + + if (not success) { + state_->writeConflict = true; + LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict"; + return std::unexpected("write conflict"); + } + } } catch (std::runtime_error const& e) { LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what(); amendmentBlockHandler_->notifyAmendmentBlocked(); + return std::unexpected("amendment blocked"); } + + return {}; }; void @@ -78,13 +97,32 @@ Loader::onInitialLoadGotMoreObjects( std::optional lastKey ) { + static constexpr std::size_t kLOG_STRIDE = 1000u; + static auto kINITIAL_LOAD_START_TIME = std::chrono::steady_clock::now(); + try { - LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size(); + LOG(log_.trace()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size(); registry_->dispatchInitialObjects( seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions? ); + + initialLoadWrittenObjects_ += data.size(); + ++initialLoadWrites_; + if (initialLoadWrites_ % kLOG_STRIDE == 0u && initialLoadWrites_ != 0u) { + auto elapsedSinceStart = std::chrono::duration_cast( + std::chrono::steady_clock::now() - kINITIAL_LOAD_START_TIME + ); + auto elapsedSeconds = elapsedSinceStart.count() / static_cast(util::kMILLISECONDS_PER_SECOND); + auto objectsPerSecond = + elapsedSeconds > 0.0 ? static_cast(initialLoadWrittenObjects_) / elapsedSeconds : 0.0; + + LOG(log_.info()) << "Wrote " << initialLoadWrittenObjects_ + << " initial ledger objects so far with average rate of " << objectsPerSecond + << " objects per second"; + } + } catch (std::runtime_error const& e) { LOG(log_.fatal()) << "Failed to load initial objects for " << seq << ": " << e.what(); amendmentBlockHandler_->notifyAmendmentBlocked(); diff --git a/src/etlng/impl/Loading.hpp b/src/etlng/impl/Loading.hpp index 39a1e150..9da5edf9 100644 --- a/src/etlng/impl/Loading.hpp +++ b/src/etlng/impl/Loading.hpp @@ -20,7 +20,7 @@ #pragma once #include "data/BackendInterface.hpp" -#include "etl/LedgerFetcherInterface.hpp" +#include "etl/SystemState.hpp" #include "etl/impl/LedgerLoader.hpp" #include "etlng/AmendmentBlockHandlerInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp" @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -51,7 +52,10 @@ class Loader : public LoaderInterface, public InitialLoadObserverInterface { std::shared_ptr backend_; std::shared_ptr registry_; std::shared_ptr amendmentBlockHandler_; + std::shared_ptr state_; + std::size_t initialLoadWrittenObjects_{0u}; + std::size_t initialLoadWrites_{0u}; util::Logger log_{"ETL"}; public: @@ -62,7 +66,8 @@ public: Loader( std::shared_ptr backend, std::shared_ptr registry, - std::shared_ptr amendmentBlockHandler + std::shared_ptr amendmentBlockHandler, + std::shared_ptr state ); Loader(Loader const&) = delete; @@ -72,7 +77,7 @@ public: Loader& operator=(Loader&&) = delete; - void + std::expected load(model::LedgerData const& data) override; void diff --git a/src/etlng/impl/Monitor.cpp b/src/etlng/impl/Monitor.cpp index e55eb345..241eaa8d 100644 --- a/src/etlng/impl/Monitor.cpp +++ b/src/etlng/impl/Monitor.cpp @@ -23,11 +23,11 @@ #include "etl/NetworkValidatedLedgersInterface.hpp" #include "util/Assert.hpp" #include "util/async/AnyExecutionContext.hpp" -#include "util/async/AnyOperation.hpp" #include "util/log/Logger.hpp" #include +#include #include #include #include @@ -41,12 +41,18 @@ Monitor::Monitor( util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr validatedLedgers, - uint32_t startSequence + uint32_t startSequence, + std::chrono::steady_clock::duration dbStalledReportDelay ) : strand_(ctx.makeStrand()) , backend_(std::move(backend)) , validatedLedgers_(std::move(validatedLedgers)) , nextSequence_(startSequence) + , updateData_({ + .dbStalledReportDelay = dbStalledReportDelay, + .lastDbCheckTime = std::chrono::steady_clock::now(), + .lastSeenMaxSeqInDb = startSequence > 0 ? startSequence - 1 : 0, + }) { } @@ -55,20 +61,37 @@ Monitor::~Monitor() stop(); } -// TODO: think about using signals perhaps? maybe combining with onNextSequence? -// also, how do we not double invoke or does it not matter void -Monitor::notifyLedgerLoaded(uint32_t seq) +Monitor::notifySequenceLoaded(uint32_t seq) { - LOG(log_.debug()) << "Loader notified about newly committed ledger " << seq; - repeatedTask_->invoke(); // force-invoke immediately + LOG(log_.debug()) << "Loader notified Monitor about newly committed ledger " << seq; + { + auto lck = updateData_.lock(); + lck->lastSeenMaxSeqInDb = std::max(seq, lck->lastSeenMaxSeqInDb); + lck->lastDbCheckTime = std::chrono::steady_clock::now(); + } + repeatedTask_->invoke(); // force-invoke doWork immediately }; +void +Monitor::notifyWriteConflict(uint32_t seq) +{ + LOG(log_.warn()) << "Loader notified Monitor about write conflict at " << seq; + nextSequence_ = seq + 1; // we already loaded the cache for seq just before we detected conflict + LOG(log_.warn()) << "Resume monitoring from " << nextSequence_; +} + void Monitor::run(std::chrono::steady_clock::duration repeatInterval) { ASSERT(not repeatedTask_.has_value(), "Monitor attempted to run more than once"); - LOG(log_.debug()) << "Starting monitor"; + { + auto lck = updateData_.lock(); + LOG(log_.debug()) << "Starting monitor with repeat interval: " + << std::chrono::duration_cast(repeatInterval).count() + << "s and dbStalledReportDelay: " + << std::chrono::duration_cast(lck->dbStalledReportDelay).count() << "s"; + } repeatedTask_ = strand_.executeRepeatedly(repeatInterval, std::bind_front(&Monitor::doWork, this)); subscription_ = validatedLedgers_->subscribe(std::bind_front(&Monitor::onNextSequence, this)); @@ -80,28 +103,65 @@ Monitor::stop() if (repeatedTask_.has_value()) repeatedTask_->abort(); + subscription_ = std::nullopt; repeatedTask_ = std::nullopt; } boost::signals2::scoped_connection -Monitor::subscribe(SignalType::slot_type const& subscriber) +Monitor::subscribeToNewSequence(NewSequenceSignalType::slot_type const& subscriber) { return notificationChannel_.connect(subscriber); } +boost::signals2::scoped_connection +Monitor::subscribeToDbStalled(DbStalledSignalType::slot_type const& subscriber) +{ + return dbStalledChannel_.connect(subscriber); +} + void Monitor::onNextSequence(uint32_t seq) { - LOG(log_.debug()) << "rippled published sequence " << seq; + ASSERT(repeatedTask_.has_value(), "Ledger subscription without repeated task is a logic error"); + LOG(log_.debug()) << "Notified about new sequence on the network: " << seq; repeatedTask_->invoke(); // force-invoke immediately } void Monitor::doWork() { - if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); rng) { - while (rng->maxSequence >= nextSequence_) + auto rng = backend_->hardFetchLedgerRangeNoThrow(); + bool dbProgressedThisCycle = false; + auto lck = updateData_.lock(); + + if (rng.has_value()) { + if (rng->maxSequence > lck->lastSeenMaxSeqInDb) { + LOG(log_.trace()) << "DB progressed. Old max seq = " << lck->lastSeenMaxSeqInDb + << ", new max seq = " << rng->maxSequence; + lck->lastSeenMaxSeqInDb = rng->maxSequence; + dbProgressedThisCycle = true; + } + + while (lck->lastSeenMaxSeqInDb >= nextSequence_) { + LOG(log_.trace()) << "Publishing from Monitor::doWork. nextSequence_ = " << nextSequence_ + << ", lastSeenMaxSeqInDb_ = " << lck->lastSeenMaxSeqInDb; notificationChannel_(nextSequence_++); + dbProgressedThisCycle = true; + } + } else { + LOG(log_.trace()) << "DB range is not available or empty. lastSeenMaxSeqInDb_ = " << lck->lastSeenMaxSeqInDb + << ", nextSequence_ = " << nextSequence_; + } + + if (dbProgressedThisCycle) { + lck->lastDbCheckTime = std::chrono::steady_clock::now(); + } else if (std::chrono::steady_clock::now() - lck->lastDbCheckTime > lck->dbStalledReportDelay) { + LOG(log_.info()) << "No DB update detected for " + << std::chrono::duration_cast(lck->dbStalledReportDelay).count() + << " seconds. Firing dbStalledChannel. Last seen max seq in DB: " << lck->lastSeenMaxSeqInDb + << ". Expecting next: " << nextSequence_; + dbStalledChannel_(); + lck->lastDbCheckTime = std::chrono::steady_clock::now(); } } diff --git a/src/etlng/impl/Monitor.hpp b/src/etlng/impl/Monitor.hpp index b8971bc8..c59e6063 100644 --- a/src/etlng/impl/Monitor.hpp +++ b/src/etlng/impl/Monitor.hpp @@ -22,6 +22,7 @@ #include "data/BackendInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp" #include "etlng/MonitorInterface.hpp" +#include "util/Mutex.hpp" #include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyOperation.hpp" #include "util/async/AnyStrand.hpp" @@ -30,6 +31,7 @@ #include #include +#include #include #include #include @@ -43,11 +45,20 @@ class Monitor : public MonitorInterface { std::shared_ptr backend_; std::shared_ptr validatedLedgers_; - uint32_t nextSequence_; + std::atomic_uint32_t nextSequence_; std::optional> repeatedTask_; std::optional subscription_; // network validated ledgers subscription - SignalType notificationChannel_; + NewSequenceSignalType notificationChannel_; + DbStalledSignalType dbStalledChannel_; + + struct UpdateData { + std::chrono::steady_clock::duration dbStalledReportDelay; + std::chrono::steady_clock::time_point lastDbCheckTime; + uint32_t lastSeenMaxSeqInDb = 0u; + }; + + util::Mutex updateData_; util::Logger log_{"ETL"}; @@ -56,12 +67,16 @@ public: util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr validatedLedgers, - uint32_t startSequence + uint32_t startSequence, + std::chrono::steady_clock::duration dbStalledReportDelay ); ~Monitor() override; void - notifyLedgerLoaded(uint32_t seq) override; + notifySequenceLoaded(uint32_t seq) override; + + void + notifyWriteConflict(uint32_t seq) override; void run(std::chrono::steady_clock::duration repeatInterval) override; @@ -70,7 +85,10 @@ public: stop() override; boost::signals2::scoped_connection - subscribe(SignalType::slot_type const& subscriber) override; + subscribeToNewSequence(NewSequenceSignalType::slot_type const& subscriber) override; + + boost::signals2::scoped_connection + subscribeToDbStalled(DbStalledSignalType::slot_type const& subscriber) override; private: void diff --git a/src/etlng/impl/MonitorProvider.hpp b/src/etlng/impl/MonitorProvider.hpp new file mode 100644 index 00000000..f449fe78 --- /dev/null +++ b/src/etlng/impl/MonitorProvider.hpp @@ -0,0 +1,53 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/BackendInterface.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "etlng/MonitorInterface.hpp" +#include "etlng/MonitorProviderInterface.hpp" +#include "etlng/impl/Monitor.hpp" +#include "util/async/AnyExecutionContext.hpp" + +#include +#include +#include +#include + +namespace etlng::impl { + +class MonitorProvider : public MonitorProviderInterface { +public: + std::unique_ptr + make( + util::async::AnyExecutionContext ctx, + std::shared_ptr backend, + std::shared_ptr validatedLedgers, + uint32_t startSequence, + std::chrono::steady_clock::duration dbStalledReportDelay + ) override + { + return std::make_unique( + std::move(ctx), std::move(backend), std::move(validatedLedgers), startSequence, dbStalledReportDelay + ); + } +}; + +} // namespace etlng::impl diff --git a/src/etlng/impl/TaskManager.cpp b/src/etlng/impl/TaskManager.cpp index 8f61dc44..86bb7da8 100644 --- a/src/etlng/impl/TaskManager.cpp +++ b/src/etlng/impl/TaskManager.cpp @@ -26,6 +26,7 @@ #include "etlng/SchedulerInterface.hpp" #include "etlng/impl/Monitor.hpp" #include "etlng/impl/TaskQueue.hpp" +#include "util/Constants.hpp" #include "util/LedgerUtils.hpp" #include "util/Profiler.hpp" #include "util/async/AnyExecutionContext.hpp" @@ -102,29 +103,36 @@ TaskManager::spawnExtractor(TaskQueue& queue) if (stopRequested) break; } - } else { - // TODO: how do we signal to the loaders that it's time to shutdown? some special task? - break; // TODO: handle server shutdown or other node took over ETL } } else { // TODO (https://github.com/XRPLF/clio/issues/1852) std::this_thread::sleep_for(kDELAY_BETWEEN_ATTEMPTS); } } + + LOG(log_.info()) << "Extractor (one of) coroutine stopped"; }); } util::async::AnyOperation TaskManager::spawnLoader(TaskQueue& queue) { - static constexpr auto kNANO_TO_SECOND = 1.0e9; - return ctx_.execute([this, &queue](auto stopRequested) { while (not stopRequested) { // TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not if (auto data = queue.dequeue(); data.has_value()) { - auto nanos = util::timed([this, data = *data] { loader_.get().load(data); }); - auto const seconds = nanos / kNANO_TO_SECOND; + // perhaps this should return an error if conflict happened, then we can stop loading immediately + auto [expectedSuccess, nanos] = + util::timed([&] { return loader_.get().load(*data); }); + + if (not expectedSuccess.has_value()) { + LOG(log_.warn()) << "Immediately stopping loader with error: " << expectedSuccess.error() + << "; latest ledger cache loaded for " << data->seq; + monitor_.get().notifyWriteConflict(data->seq); + break; + } + + auto const seconds = nanos / util::kNANO_PER_SECOND; auto const txnCount = data->transactions.size(); auto const objCount = data->objects.size(); @@ -133,9 +141,11 @@ TaskManager::spawnLoader(TaskQueue& queue) << " seconds;" << " tps[" << txnCount / seconds << "], ops[" << objCount / seconds << "]"; - monitor_.get().notifyLedgerLoaded(data->seq); + monitor_.get().notifySequenceLoaded(data->seq); } } + + LOG(log_.info()) << "Loader coroutine stopped"; }); } diff --git a/src/util/Constants.hpp b/src/util/Constants.hpp index e148ea58..e16e5b33 100644 --- a/src/util/Constants.hpp +++ b/src/util/Constants.hpp @@ -23,4 +23,5 @@ namespace util { static constexpr std::size_t kMILLISECONDS_PER_SECOND = 1000; +static constexpr double kNANO_PER_SECOND = 1.0e9; } // namespace util diff --git a/src/util/async/context/impl/Cancellation.hpp b/src/util/async/context/impl/Cancellation.hpp index 9dfd57d7..e148aa99 100644 --- a/src/util/async/context/impl/Cancellation.hpp +++ b/src/util/async/context/impl/Cancellation.hpp @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include diff --git a/tests/unit/etl/ExtractorTests.cpp b/tests/unit/etl/ExtractorTests.cpp index 43de87f5..bb87a7e7 100644 --- a/tests/unit/etl/ExtractorTests.cpp +++ b/tests/unit/etl/ExtractorTests.cpp @@ -43,7 +43,7 @@ struct ETLExtractorTest : util::prometheus::WithPrometheus, NoLoggerFixture { { state_.isStopping = false; state_.writeConflict = false; - state_.isReadOnly = false; + state_.isStrictReadonly = false; state_.isWriting = false; } diff --git a/tests/unit/etl/TransformerTests.cpp b/tests/unit/etl/TransformerTests.cpp index 0b0aba11..00bebbe8 100644 --- a/tests/unit/etl/TransformerTests.cpp +++ b/tests/unit/etl/TransformerTests.cpp @@ -64,7 +64,7 @@ struct ETLTransformerTest : util::prometheus::WithPrometheus, MockBackendTest { { state_.isStopping = false; state_.writeConflict = false; - state_.isReadOnly = false; + state_.isStrictReadonly = false; state_.isWriting = false; } diff --git a/tests/unit/etlng/ETLServiceTests.cpp b/tests/unit/etlng/ETLServiceTests.cpp index dfcffade..0215dafa 100644 --- a/tests/unit/etlng/ETLServiceTests.cpp +++ b/tests/unit/etlng/ETLServiceTests.cpp @@ -17,8 +17,10 @@ */ //============================================================================== +#include "data/BackendInterface.hpp" #include "data/Types.hpp" #include "etl/ETLState.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/SystemState.hpp" #include "etlng/CacheLoaderInterface.hpp" #include "etlng/CacheUpdaterInterface.hpp" @@ -28,6 +30,7 @@ #include "etlng/LoaderInterface.hpp" #include "etlng/Models.hpp" #include "etlng/MonitorInterface.hpp" +#include "etlng/MonitorProviderInterface.hpp" #include "etlng/TaskManagerInterface.hpp" #include "etlng/TaskManagerProviderInterface.hpp" #include "util/BinaryTestObject.hpp" @@ -62,6 +65,7 @@ #include #include #include +#include #include using namespace util::config; @@ -71,8 +75,20 @@ constinit auto const kSEQ = 100; constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652"; struct MockMonitor : public etlng::MonitorInterface { - MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override)); - MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override)); + MOCK_METHOD(void, notifySequenceLoaded, (uint32_t), (override)); + MOCK_METHOD(void, notifyWriteConflict, (uint32_t), (override)); + MOCK_METHOD( + boost::signals2::scoped_connection, + subscribeToNewSequence, + (NewSequenceSignalType::slot_type const&), + (override) + ); + MOCK_METHOD( + boost::signals2::scoped_connection, + subscribeToDbStalled, + (DbStalledSignalType::slot_type const&), + (override) + ); MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override)); MOCK_METHOD(void, stop, (), (override)); }; @@ -83,7 +99,8 @@ struct MockExtractor : etlng::ExtractorInterface { }; struct MockLoader : etlng::LoaderInterface { - MOCK_METHOD(void, load, (etlng::model::LedgerData const&), (override)); + using ExpectedType = std::expected; + MOCK_METHOD(ExpectedType, load, (etlng::model::LedgerData const&), (override)); MOCK_METHOD(std::optional, loadInitialLedger, (etlng::model::LedgerData const&), (override)); }; @@ -123,6 +140,19 @@ struct MockTaskManagerProvider : etlng::TaskManagerProviderInterface { ); }; +struct MockMonitorProvider : etlng::MonitorProviderInterface { + MOCK_METHOD( + std::unique_ptr, + make, + (util::async::AnyExecutionContext, + std::shared_ptr, + std::shared_ptr, + uint32_t, + std::chrono::steady_clock::duration), + (override) + ); +}; + auto createTestData(uint32_t seq) { @@ -134,7 +164,7 @@ createTestData(uint32_t seq) .edgeKeys = {}, .header = header, .rawHeader = {}, - .seq = seq + .seq = seq, }; } } // namespace @@ -150,6 +180,7 @@ struct ETLServiceTests : util::prometheus::WithPrometheus, MockBackendTest { protected: SameThreadTestContext ctx_; util::config::ClioConfigDefinition config_{ + {"read_only", ConfigValue{ConfigType::Boolean}.defaultValue(false)}, {"extractor_threads", ConfigValue{ConfigType::Integer}.defaultValue(4)}, {"io_threads", ConfigValue{ConfigType::Integer}.defaultValue(2)}, {"cache.num_diffs", ConfigValue{ConfigType::Integer}.defaultValue(32)}, @@ -159,7 +190,7 @@ protected: {"cache.page_fetch_size", ConfigValue{ConfigType::Integer}.defaultValue(512)}, {"cache.load", ConfigValue{ConfigType::String}.defaultValue("async")} }; - StrictMockSubscriptionManagerSharedPtr subscriptions_; + MockSubscriptionManagerSharedPtr subscriptions_; std::shared_ptr> balancer_ = std::make_shared>(); std::shared_ptr> ledgers_ = @@ -176,6 +207,8 @@ protected: std::make_shared>(); std::shared_ptr> taskManagerProvider_ = std::make_shared>(); + std::shared_ptr> monitorProvider_ = + std::make_shared>(); std::shared_ptr systemState_ = std::make_shared(); etlng::ETLService service_{ @@ -191,6 +224,7 @@ protected: loader_, initialLoadObserver_, taskManagerProvider_, + monitorProvider_, systemState_ }; }; @@ -258,65 +292,206 @@ TEST_F(ETLServiceTests, LastCloseAgeSeconds) TEST_F(ETLServiceTests, RunWithEmptyDatabase) { auto mockTaskManager = std::make_unique>(); + auto& mockTaskManagerRef = *mockTaskManager; auto ledgerData = createTestData(kSEQ); testing::Sequence const s; - EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).InSequence(s).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*backend_, hardFetchLedgerRange).InSequence(s).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(testing::Return(ledgerData)); EXPECT_CALL(*balancer_, loadInitialLedger(kSEQ, testing::_, testing::_)) .WillOnce(testing::Return(std::vector{})); - EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).WillOnce(testing::Return(ripple::LedgerHeader{})); - EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)) + EXPECT_CALL(*loader_, loadInitialLedger).WillOnce(testing::Return(ripple::LedgerHeader{})); + EXPECT_CALL(*backend_, hardFetchLedgerRange) .InSequence(s) .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); - EXPECT_CALL(*mockTaskManager, run(testing::_)); + EXPECT_CALL(mockTaskManagerRef, run); EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1)) .WillOnce(testing::Return(std::unique_ptr(mockTaskManager.release()))); + EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_)) + .WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique>(); }); service_.run(); } TEST_F(ETLServiceTests, RunWithPopulatedDatabase) { - auto mockTaskManager = std::make_unique>(); - - EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)) + EXPECT_CALL(*backend_, hardFetchLedgerRange) .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_)) + .WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique>(); }); EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); EXPECT_CALL(*cacheLoader_, load(kSEQ)); - EXPECT_CALL(*mockTaskManager, run(testing::_)); - EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1)) - .WillOnce(testing::Return(std::unique_ptr(mockTaskManager.release()))); service_.run(); } TEST_F(ETLServiceTests, WaitForValidatedLedgerIsAborted) { - EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*ledgers_, getMostRecent()).Times(2).WillRepeatedly(testing::Return(std::nullopt)); // No other calls should happen because we exit early - EXPECT_CALL(*extractor_, extractLedgerOnly(testing::_)).Times(0); + EXPECT_CALL(*extractor_, extractLedgerOnly).Times(0); EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0); - EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0); + EXPECT_CALL(*loader_, loadInitialLedger).Times(0); EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0); service_.run(); } +TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription) +{ + auto mockMonitor = std::make_unique>(); + auto& mockMonitorRef = *mockMonitor; + std::function capturedCallback; + + EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { + return std::move(mockMonitor); + }); + + EXPECT_CALL(mockMonitorRef, subscribeToNewSequence).WillOnce([&capturedCallback](auto&& callback) { + capturedCallback = callback; + return boost::signals2::scoped_connection{}; + }); + EXPECT_CALL(mockMonitorRef, subscribeToDbStalled); + EXPECT_CALL(mockMonitorRef, run); + + EXPECT_CALL(*backend_, hardFetchLedgerRange) + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + + service_.run(); + systemState_->writeConflict = true; + + EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); + ASSERT_TRUE(capturedCallback); + capturedCallback(kSEQ + 1); + + EXPECT_FALSE(systemState_->writeConflict); + EXPECT_FALSE(systemState_->isWriting); +} + +TEST_F(ETLServiceTests, NormalFlowInMonitorSubscription) +{ + auto mockMonitor = std::make_unique>(); + auto& mockMonitorRef = *mockMonitor; + std::function capturedCallback; + + EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { + return std::move(mockMonitor); + }); + + EXPECT_CALL(mockMonitorRef, subscribeToNewSequence).WillOnce([&capturedCallback](auto callback) { + capturedCallback = callback; + return boost::signals2::scoped_connection{}; + }); + EXPECT_CALL(mockMonitorRef, subscribeToDbStalled); + EXPECT_CALL(mockMonitorRef, run); + + EXPECT_CALL(*backend_, hardFetchLedgerRange) + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + + service_.run(); + systemState_->isWriting = false; + std::vector dummyDiff = {}; + + EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ + 1, testing::_)).WillOnce(testing::Return(dummyDiff)); + EXPECT_CALL(*cacheUpdater_, update(kSEQ + 1, testing::A const&>())); + EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); + + ASSERT_TRUE(capturedCallback); + capturedCallback(kSEQ + 1); +} + +TEST_F(ETLServiceTests, AttemptTakeoverWriter) +{ + auto mockMonitor = std::make_unique>(); + auto& mockMonitorRef = *mockMonitor; + std::function capturedDbStalledCallback; + + EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { + return std::move(mockMonitor); + }); + + EXPECT_CALL(mockMonitorRef, subscribeToNewSequence); + EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) { + capturedDbStalledCallback = callback; + return boost::signals2::scoped_connection{}; + }); + EXPECT_CALL(mockMonitorRef, run); + + EXPECT_CALL(*backend_, hardFetchLedgerRange) + .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + + service_.run(); + systemState_->isStrictReadonly = false; // writer node + systemState_->isWriting = false; // but starts in readonly as usual + + auto mockTaskManager = std::make_unique>(); + auto& mockTaskManagerRef = *mockTaskManager; + EXPECT_CALL(mockTaskManagerRef, run); + + EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1)) + .WillOnce(testing::Return(std::move(mockTaskManager))); + + ASSERT_TRUE(capturedDbStalledCallback); + capturedDbStalledCallback(); + + EXPECT_TRUE(systemState_->isWriting); // should attempt to become writer +} + +TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict) +{ + auto mockMonitor = std::make_unique>(); + auto& mockMonitorRef = *mockMonitor; + + std::function capturedCallback; + + EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { + return std::move(mockMonitor); + }); + EXPECT_CALL(mockMonitorRef, subscribeToNewSequence).WillOnce([&capturedCallback](auto callback) { + capturedCallback = callback; + return boost::signals2::scoped_connection{}; + }); + EXPECT_CALL(mockMonitorRef, subscribeToDbStalled); + EXPECT_CALL(mockMonitorRef, run); + + EXPECT_CALL(*backend_, hardFetchLedgerRange) + .WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + + service_.run(); + systemState_->isWriting = true; + systemState_->writeConflict = true; // got a write conflict along the way + + EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); + + ASSERT_TRUE(capturedCallback); + capturedCallback(kSEQ + 1); + + EXPECT_FALSE(systemState_->isWriting); // gives up writing + EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag +} + struct ETLServiceAssertTests : common::util::WithMockAssert, ETLServiceTests {}; TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger) { - EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(testing::Return(std::nullopt)); // These calls should not happen because loading the initial ledger fails EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0); - EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0); + EXPECT_CALL(*loader_, loadInitialLedger).Times(0); EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0); EXPECT_CLIO_ASSERT_FAIL({ service_.run(); }); @@ -325,14 +500,14 @@ TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger) TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadInitialLedger) { testing::Sequence const s; - EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt)); + EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(s).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(s).WillOnce(testing::Return(kSEQ)); // No other calls should happen because we exit early - EXPECT_CALL(*extractor_, extractLedgerOnly(testing::_)).Times(0); + EXPECT_CALL(*extractor_, extractLedgerOnly).Times(0); EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0); - EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0); + EXPECT_CALL(*loader_, loadInitialLedger).Times(0); EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0); EXPECT_CLIO_ASSERT_FAIL({ service_.run(); }); diff --git a/tests/unit/etlng/LedgerPublisherTests.cpp b/tests/unit/etlng/LedgerPublisherTests.cpp index fdd32e36..220c3d11 100644 --- a/tests/unit/etlng/LedgerPublisherTests.cpp +++ b/tests/unit/etlng/LedgerPublisherTests.cpp @@ -69,52 +69,64 @@ struct ETLLedgerPublisherNgTest : util::prometheus::WithPrometheus, MockBackendT StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr; }; -TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheDisabled) +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderSkipDueToAge) { - etl::SystemState dummyState; - dummyState.isWriting = false; + // Use kAGE (800) which is > MAX_LEDGER_AGE_SECONDS (600) to test skipping auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); - publisher.publish(dummyLedgerHeader); - EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ, _)).Times(0); + auto dummyState = etl::SystemState{}; + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); - // setLastPublishedSequence not in strand, should verify before run + backend_->setRange(kSEQ - 1, kSEQ); + publisher.publish(dummyLedgerHeader); + + // Verify last published sequence is set immediately EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + // Since age > MAX_LEDGER_AGE_SECONDS, these should not be called + EXPECT_CALL(*backend_, doFetchLedgerObject).Times(0); + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).Times(0); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0); + ctx_.run(); - EXPECT_TRUE(backend_->fetchLedgerRange()); - EXPECT_EQ(backend_->fetchLedgerRange().value().minSequence, kSEQ); - EXPECT_EQ(backend_->fetchLedgerRange().value().maxSequence, kSEQ); } -TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheEnabled) +TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderWithinAgeLimit) { - etl::SystemState dummyState; - dummyState.isWriting = false; - auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + // Use age 0 which is < MAX_LEDGER_AGE_SECONDS to ensure publishing happens + auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); + auto dummyState = etl::SystemState{}; + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + + backend_->setRange(kSEQ - 1, kSEQ); publisher.publish(dummyLedgerHeader); - // setLastPublishedSequence not in strand, should verify before run + // Verify last published sequence is set immediately EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) + .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); + EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) + .WillOnce(Return(std::vector{})); + + EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 0)); + EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); + ctx_.run(); - EXPECT_TRUE(backend_->fetchLedgerRange()); - EXPECT_EQ(backend_->fetchLedgerRange().value().minSequence, kSEQ); - EXPECT_EQ(backend_->fetchLedgerRange().value().maxSequence, kSEQ); + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingTrue) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isWriting = true; auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); publisher.publish(dummyLedgerHeader); - // setLastPublishedSequence not in strand, should verify before run EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); @@ -124,16 +136,15 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingTrue) TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isWriting = true; auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); publisher.publish(dummyLedgerHeader); - // mock fetch fee EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -145,10 +156,8 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange) .peekData(); t1.ledgerSequence = kSEQ; - // mock fetch transactions EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector{t1})); - // setLastPublishedSequence not in strand, should verify before run EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); @@ -158,26 +167,24 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange) EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); ctx_.run(); - // last publish time should be set EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderCloseTimeGreaterThanNow) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isWriting = true; - ripple::LedgerHeader dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); + auto dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); auto const nowPlus10 = system_clock::now() + seconds(10); auto const closeTime = duration_cast(nowPlus10.time_since_epoch()).count() - kRIPPLE_EPOCH_START; dummyLedgerHeader.closeTime = ripple::NetClock::time_point{seconds{closeTime}}; backend_->setRange(kSEQ - 1, kSEQ); - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); publisher.publish(dummyLedgerHeader); - // mock fetch fee EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -189,37 +196,33 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderCloseTimeGreaterThanNow) .peekData(); t1.ledgerSequence = kSEQ; - // mock fetch transactions EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) .WillOnce(Return(std::vector{t1})); - // setLastPublishedSequence not in strand, should verify before run EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); - // mock 1 transaction EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); ctx_.run(); - // last publish time should be set EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsTrue) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isStopping = true; - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); EXPECT_FALSE(publisher.publish(kSEQ, {})); } TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqMaxAttempt) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isStopping = false; - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); static constexpr auto kMAX_ATTEMPT = 2; @@ -231,9 +234,9 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqMaxAttempt) TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsFalse) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isStopping = false; - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ}; EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range)); @@ -247,16 +250,15 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsFalse) TEST_F(ETLLedgerPublisherNgTest, PublishMultipleTxInOrder) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isWriting = true; auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); publisher.publish(dummyLedgerHeader); - // mock fetch fee EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -278,34 +280,31 @@ TEST_F(ETLLedgerPublisherNgTest, PublishMultipleTxInOrder) t2.ledgerSequence = kSEQ; t2.date = 2; - // mock fetch transactions EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) .WillOnce(Return(std::vector{t1, t2})); - // setLastPublishedSequence not in strand, should verify before run EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); - // should call pubTransaction t2 first (greater tx index) + Sequence const s; EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s); ctx_.run(); - // last publish time should be set EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } TEST_F(ETLLedgerPublisherNgTest, PublishVeryOldLedgerShouldSkip) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isWriting = true; // Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600) auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800); - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); publisher.publish(dummyLedgerHeader); @@ -322,12 +321,12 @@ TEST_F(ETLLedgerPublisherNgTest, PublishVeryOldLedgerShouldSkip) TEST_F(ETLLedgerPublisherNgTest, PublishMultipleLedgersInQuickSuccession) { - etl::SystemState dummyState; + auto dummyState = etl::SystemState{}; dummyState.isWriting = true; auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0); - impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ + 1); // Publish two ledgers in quick succession diff --git a/tests/unit/etlng/LoadingTests.cpp b/tests/unit/etlng/LoadingTests.cpp index 512c8968..086e523f 100644 --- a/tests/unit/etlng/LoadingTests.cpp +++ b/tests/unit/etlng/LoadingTests.cpp @@ -18,6 +18,7 @@ //============================================================================== #include "data/Types.hpp" +#include "etl/SystemState.hpp" #include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/Models.hpp" #include "etlng/RegistryInterface.hpp" @@ -67,7 +68,8 @@ struct MockLoadObserver : etlng::InitialLoadObserverInterface { struct LoadingTests : util::prometheus::WithPrometheus, MockBackendTest, MockAmendmentBlockHandlerTest { protected: std::shared_ptr mockRegistryPtr_ = std::make_shared(); - Loader loader_{backend_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_}; + std::shared_ptr state_ = std::make_shared(); + Loader loader_{backend_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_, state_}; }; struct LoadingAssertTest : common::util::WithMockAssert, LoadingTests {}; @@ -104,6 +106,7 @@ TEST_F(LoadingTests, LoadInitialLedger) TEST_F(LoadingTests, LoadSuccess) { + state_->isWriting = true; // writer is active auto const data = createTestData(); EXPECT_CALL(*backend_, doFinishWrites()); @@ -114,6 +117,7 @@ TEST_F(LoadingTests, LoadSuccess) TEST_F(LoadingTests, LoadFailure) { + state_->isWriting = true; // writer is active auto const data = createTestData(); EXPECT_CALL(*backend_, doFinishWrites()).Times(0); diff --git a/tests/unit/etlng/MonitorTests.cpp b/tests/unit/etlng/MonitorTests.cpp index e719a171..54c689ce 100644 --- a/tests/unit/etlng/MonitorTests.cpp +++ b/tests/unit/etlng/MonitorTests.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include using namespace etlng::impl; @@ -40,6 +41,7 @@ using namespace data; namespace { constexpr auto kSTART_SEQ = 123u; +constexpr auto kNO_NEW_LEDGER_REPORT_DELAY = std::chrono::milliseconds(1u); } // namespace struct MonitorTests : util::prometheus::WithPrometheus, MockBackendTest { @@ -47,8 +49,10 @@ protected: util::async::CoroExecutionContext ctx_; StrictMockNetworkValidatedLedgersPtr ledgers_; testing::StrictMock> actionMock_; + testing::StrictMock> dbStalledMock_; - etlng::impl::Monitor monitor_ = etlng::impl::Monitor(ctx_, backend_, ledgers_, kSTART_SEQ); + etlng::impl::Monitor monitor_ = + etlng::impl::Monitor(ctx_, backend_, ledgers_, kSTART_SEQ, kNO_NEW_LEDGER_REPORT_DELAY); }; TEST_F(MonitorTests, ConsumesAndNotifiesForAllOutstandingSequencesAtOnce) @@ -65,7 +69,7 @@ TEST_F(MonitorTests, ConsumesAndNotifiesForAllOutstandingSequencesAtOnce) unblock.release(); }); - auto subscription = monitor_.subscribe(actionMock_.AsStdFunction()); + auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction()); monitor_.run(std::chrono::milliseconds{10}); unblock.acquire(); } @@ -88,7 +92,7 @@ TEST_F(MonitorTests, NotifiesForEachSequence) unblock.release(); }); - auto subscription = monitor_.subscribe(actionMock_.AsStdFunction()); + auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction()); monitor_.run(std::chrono::milliseconds{1}); unblock.acquire(); } @@ -106,7 +110,7 @@ TEST_F(MonitorTests, NotifiesWhenForcedByNewSequenceAvailableFromNetwork) EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(range)); EXPECT_CALL(actionMock_, Call).WillOnce([&] { unblock.release(); }); - auto subscription = monitor_.subscribe(actionMock_.AsStdFunction()); + auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction()); monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec pusher(kSTART_SEQ); // pretend network validated a new ledger unblock.acquire(); @@ -121,8 +125,49 @@ TEST_F(MonitorTests, NotifiesWhenForcedByLedgerLoaded) EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(range)); EXPECT_CALL(actionMock_, Call).WillOnce([&] { unblock.release(); }); - auto subscription = monitor_.subscribe(actionMock_.AsStdFunction()); - monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec - monitor_.notifyLedgerLoaded(kSTART_SEQ); // notify about newly committed ledger + auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction()); + monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec + monitor_.notifySequenceLoaded(kSTART_SEQ); // notify about newly committed ledger + unblock.acquire(); +} + +TEST_F(MonitorTests, ResumesMonitoringFromNextSequenceAfterWriteConflict) +{ + constexpr uint32_t kCONFLICT_SEQ = 456u; + constexpr uint32_t kEXPECTED_NEXT_SEQ = kCONFLICT_SEQ + 1; + + LedgerRange const rangeBeforeConflict(kSTART_SEQ, kSTART_SEQ); + LedgerRange const rangeAfterConflict(kEXPECTED_NEXT_SEQ, kEXPECTED_NEXT_SEQ); + std::binary_semaphore unblock(0); + + EXPECT_CALL(*ledgers_, subscribe(testing::_)); + + { + testing::InSequence seq; // second call will produce conflict + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(rangeBeforeConflict)); + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillRepeatedly(testing::Return(rangeAfterConflict)); + } + + EXPECT_CALL(actionMock_, Call(kEXPECTED_NEXT_SEQ)).WillOnce([&](uint32_t seq) { + EXPECT_EQ(seq, kEXPECTED_NEXT_SEQ); + unblock.release(); + }); + + auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction()); + monitor_.run(std::chrono::nanoseconds{100}); + monitor_.notifyWriteConflict(kCONFLICT_SEQ); + unblock.acquire(); +} + +TEST_F(MonitorTests, DbStalledChannelTriggeredWhenTimeoutExceeded) +{ + std::binary_semaphore unblock(0); + + EXPECT_CALL(*ledgers_, subscribe(testing::_)); + EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillRepeatedly(testing::Return(std::nullopt)); + EXPECT_CALL(dbStalledMock_, Call()).WillOnce([&]() { unblock.release(); }); + + auto subscription = monitor_.subscribeToDbStalled(dbStalledMock_.AsStdFunction()); + monitor_.run(std::chrono::nanoseconds{100}); unblock.acquire(); } diff --git a/tests/unit/etlng/RegistryTests.cpp b/tests/unit/etlng/RegistryTests.cpp index 5e40a6c2..b75b044d 100644 --- a/tests/unit/etlng/RegistryTests.cpp +++ b/tests/unit/etlng/RegistryTests.cpp @@ -672,16 +672,28 @@ TEST_F(RegistryTest, MixedReadonlyAndRegularExtensions) TEST_F(RegistryTest, MonitorInterfaceExecution) { struct MockMonitor : etlng::MonitorInterface { - MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override)); - MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override)); + MOCK_METHOD(void, notifySequenceLoaded, (uint32_t), (override)); + MOCK_METHOD(void, notifyWriteConflict, (uint32_t), (override)); + MOCK_METHOD( + boost::signals2::scoped_connection, + subscribeToNewSequence, + (NewSequenceSignalType::slot_type const&), + (override) + ); + MOCK_METHOD( + boost::signals2::scoped_connection, + subscribeToDbStalled, + (DbStalledSignalType::slot_type const&), + (override) + ); MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override)); MOCK_METHOD(void, stop, (), (override)); }; auto monitor = MockMonitor{}; - EXPECT_CALL(monitor, notifyLedgerLoaded(kSEQ)).Times(1); + EXPECT_CALL(monitor, notifySequenceLoaded(kSEQ)).Times(1); - monitor.notifyLedgerLoaded(kSEQ); + monitor.notifySequenceLoaded(kSEQ); } TEST_F(RegistryTest, ReadonlyModeWithAllowInReadonlyTest) diff --git a/tests/unit/etlng/TaskManagerTests.cpp b/tests/unit/etlng/TaskManagerTests.cpp index d94655a0..59480207 100644 --- a/tests/unit/etlng/TaskManagerTests.cpp +++ b/tests/unit/etlng/TaskManagerTests.cpp @@ -62,13 +62,26 @@ struct MockExtractor : etlng::ExtractorInterface { }; struct MockLoader : etlng::LoaderInterface { - MOCK_METHOD(void, load, (LedgerData const&), (override)); + using ExpectedType = std::expected; + MOCK_METHOD(ExpectedType, load, (LedgerData const&), (override)); MOCK_METHOD(std::optional, loadInitialLedger, (LedgerData const&), (override)); }; struct MockMonitor : etlng::MonitorInterface { - MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override)); - MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override)); + MOCK_METHOD(void, notifySequenceLoaded, (uint32_t), (override)); + MOCK_METHOD(void, notifyWriteConflict, (uint32_t), (override)); + MOCK_METHOD( + boost::signals2::scoped_connection, + subscribeToNewSequence, + (NewSequenceSignalType::slot_type const&), + (override) + ); + MOCK_METHOD( + boost::signals2::scoped_connection, + subscribeToDbStalled, + (DbStalledSignalType::slot_type const&), + (override) + ); MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override)); MOCK_METHOD(void, stop, (), (override)); }; @@ -127,14 +140,17 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted) return createTestData(seq); }); - EXPECT_CALL(*mockLoaderPtr_, load(testing::_)).Times(kTOTAL).WillRepeatedly([&](LedgerData data) { - loaded.push_back(data.seq); - if (loaded.size() == kTOTAL) { - done.release(); - } - }); + EXPECT_CALL(*mockLoaderPtr_, load(testing::_)) + .Times(kTOTAL) + .WillRepeatedly([&](LedgerData data) -> std::expected { + loaded.push_back(data.seq); + if (loaded.size() == kTOTAL) { + done.release(); + } + return {}; + }); - EXPECT_CALL(*mockMonitorPtr_, notifyLedgerLoaded(testing::_)).Times(kTOTAL); + EXPECT_CALL(*mockMonitorPtr_, notifySequenceLoaded(testing::_)).Times(kTOTAL); taskManager_.run(kEXTRACTORS); done.acquire(); @@ -145,3 +161,60 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted) EXPECT_EQ(loaded[i], kSEQ + i); } } + +TEST_F(TaskManagerTests, WriteConflictHandling) +{ + static constexpr auto kTOTAL = 64uz; + static constexpr auto kCONFLICT_AFTER = 32uz; // Conflict after 32 ledgers + static constexpr auto kEXTRACTORS = 4uz; + + std::atomic_uint32_t seq = kSEQ; + std::vector loaded; + std::binary_semaphore done{0}; + bool conflictOccurred = false; + + EXPECT_CALL(*mockSchedulerPtr_, next()).WillRepeatedly([&]() { + return Task{.priority = Task::Priority::Higher, .seq = seq++}; + }); + + EXPECT_CALL(*mockExtractorPtr_, extractLedgerWithDiff(testing::_)) + .WillRepeatedly([](uint32_t seq) -> std::optional { + if (seq > kSEQ + kTOTAL - 1) + return std::nullopt; + + return createTestData(seq); + }); + + // First kCONFLICT_AFTER calls succeed, then we get a write conflict + EXPECT_CALL(*mockLoaderPtr_, load(testing::_)) + .WillRepeatedly([&](LedgerData data) -> std::expected { + loaded.push_back(data.seq); + + if (loaded.size() == kCONFLICT_AFTER) { + conflictOccurred = true; + done.release(); + return std::unexpected("write conflict"); + } + + // Only release semaphore if we reach kTOTAL without conflict + if (loaded.size() == kTOTAL) { + done.release(); + } + + return {}; + }); + + EXPECT_CALL(*mockMonitorPtr_, notifySequenceLoaded(testing::_)).Times(kCONFLICT_AFTER - 1); + EXPECT_CALL(*mockMonitorPtr_, notifyWriteConflict(kSEQ + kCONFLICT_AFTER - 1)); + + taskManager_.run(kEXTRACTORS); + done.acquire(); + taskManager_.stop(); + + EXPECT_EQ(loaded.size(), kCONFLICT_AFTER); + EXPECT_TRUE(conflictOccurred); + + for (std::size_t i = 0; i < loaded.size(); ++i) { + EXPECT_EQ(loaded[i], kSEQ + i); + } +}