feat: ETLng publisher and service refactoring (#2138)

This commit is contained in:
Alex Kremer
2025-05-23 15:01:50 +01:00
committed by GitHub
parent 8aab33c18c
commit 2a147b9487
40 changed files with 2619 additions and 585 deletions

View File

@@ -36,6 +36,7 @@
#include "rpc/RPCEngine.hpp"
#include "rpc/WorkQueue.hpp"
#include "rpc/common/impl/HandlerProvider.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/build/Build.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"
@@ -103,6 +104,10 @@ ClioApplication::run(bool const useNgWebServer)
// This is not the only io context in the application.
boost::asio::io_context ioc{threads};
// Similarly we need a context to run ETLng on
// In the future we can remove the raw ioc and use ctx instead
util::async::CoroExecutionContext ctx{threads};
// Rate limiter, to prevent abuse
auto whitelistHandler = web::dosguard::WhitelistHandler{config_};
auto const dosguardWeights = web::dosguard::Weights::make(config_);
@@ -146,7 +151,7 @@ ClioApplication::run(bool const useNgWebServer)
}();
// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
auto etl = etl::ETLService::makeETLService(config_, ioc, backend, subscriptions, balancer, ledgers);
auto etl = etl::ETLService::makeETLService(config_, ioc, ctx, backend, subscriptions, balancer, ledgers);
auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
auto counters = rpc::Counters::makeCounters(workQueue);

View File

@@ -26,6 +26,7 @@
#include "etl/impl/CursorFromAccountProvider.hpp"
#include "etl/impl/CursorFromDiffProvider.hpp"
#include "etl/impl/CursorFromFixDiffNumProvider.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "util/Assert.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/log/Logger.hpp"
@@ -33,6 +34,7 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
namespace etl {
@@ -46,7 +48,7 @@ namespace etl {
* @tparam ExecutionContextType The type of the execution context to use
*/
template <typename ExecutionContextType = util::async::CoroExecutionContext>
class CacheLoader {
class CacheLoader : public etlng::CacheLoaderInterface {
using CacheLoaderType = impl::CacheLoaderImpl<data::LedgerCacheInterface>;
util::Logger log_{"ETL"};
@@ -67,10 +69,13 @@ public:
*/
CacheLoader(
util::config::ClioConfigDefinition const& config,
std::shared_ptr<BackendInterface> const& backend,
std::shared_ptr<BackendInterface> backend,
data::LedgerCacheInterface& cache
)
: backend_{backend}, cache_{cache}, settings_{makeCacheLoaderSettings(config)}, ctx_{settings_.numThreads}
: backend_{std::move(backend)}
, cache_{cache}
, settings_{makeCacheLoaderSettings(config)}
, ctx_{settings_.numThreads}
{
}
@@ -83,7 +88,7 @@ public:
* @param seq The sequence number to load cache for
*/
void
load(uint32_t const seq)
load(uint32_t const seq) override
{
ASSERT(not cache_.get().isFull(), "Cache must not be full. seq = {}", seq);
@@ -129,7 +134,7 @@ public:
* @brief Requests the loader to stop asap
*/
void
stop() noexcept
stop() noexcept override
{
if (loader_ != nullptr)
loader_->stop();
@@ -139,7 +144,7 @@ public:
* @brief Waits for the loader to finish background work
*/
void
wait() noexcept
wait() noexcept override
{
if (loader_ != nullptr)
loader_->wait();

View File

@@ -20,17 +20,38 @@
#include "etl/ETLService.hpp"
#include "data/BackendInterface.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/CorruptionDetector.hpp"
#include "etl/ETLState.hpp"
#include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/ExtractionDataPipe.hpp"
#include "etl/impl/Extractor.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etl/impl/LedgerPublisher.hpp"
#include "etl/impl/Transformer.hpp"
#include "etlng/ETLService.hpp"
#include "etlng/ETLServiceInterface.hpp"
#include "etlng/LoadBalancer.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/TaskManagerProvider.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/Constants.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/json/object.hpp>
#include <grpcpp/grpcpp.h>
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
@@ -45,6 +66,75 @@
namespace etl {
// Factory for the ETL service.
//
// Picks the implementation from the `__ng_etl` config flag:
//  - true:  wires up the individual ETLng components (fetcher, extractor, publisher, cache
//           loader/updater, amendment block handler, loader, task manager provider) and injects
//           them into etlng::ETLService;
//  - false: constructs the original etl::ETLService, which only needs `ioc`.
//
// In both cases the balancer must be of the concrete type matching the chosen implementation
// (checked via ASSERT). Before returning, the network ID is forwarded to `subscriptions` if an
// ETL state is available, and `run()` is called on the created service.
std::shared_ptr<etlng::ETLServiceInterface>
ETLService::makeETLService(
util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
)
{
std::shared_ptr<etlng::ETLServiceInterface> ret;
if (config.get<bool>("__ng_etl")) {
ASSERT(
std::dynamic_pointer_cast<etlng::LoadBalancer>(balancer), "LoadBalancer type must be etlng::LoadBalancer"
);
// The system state is shared: the publisher, the registry and the amendment block handler
// receive a reference to it, and the service itself receives the owning shared_ptr.
auto state = std::make_shared<etl::SystemState>();
auto fetcher = std::make_shared<etl::impl::LedgerFetcher>(backend, balancer);
auto extractor = std::make_shared<etlng::impl::Extractor>(fetcher);
auto publisher = std::make_shared<etlng::impl::LedgerPublisher>(ioc, backend, subscriptions, *state);
auto cacheLoader = std::make_shared<etl::CacheLoader<>>(config, backend, backend->cache());
auto cacheUpdater = std::make_shared<etlng::impl::CacheUpdater>(backend->cache());
auto amendmentBlockHandler = std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx, *state);
// The loader is built from the backend, a registry of extensions and the amendment block handler
auto loader = std::make_shared<etlng::impl::Loader>(
backend,
etlng::impl::makeRegistry(
*state,
etlng::impl::CacheExt{cacheUpdater},
etlng::impl::CoreExt{backend},
etlng::impl::SuccessorExt{backend, backend->cache()},
etlng::impl::NFTExt{backend}
),
amendmentBlockHandler
);
auto taskManagerProvider = std::make_shared<etlng::impl::TaskManagerProvider>(*ledgers, extractor, loader);
// The same loader instance is passed twice: once as the loader and once as the initial load observer
ret = std::make_shared<etlng::ETLService>(
ctx,
config,
backend,
balancer,
ledgers,
publisher,
cacheLoader,
cacheUpdater,
extractor,
loader, // loader itself
loader, // initial load observer
taskManagerProvider,
state
);
} else {
ASSERT(std::dynamic_pointer_cast<etl::LoadBalancer>(balancer), "LoadBalancer type must be etl::LoadBalancer");
ret = std::make_shared<etl::ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
}
// Inject the network ID into subscriptions: the transaction feed requires it to add the CTID to responses
if (auto const state = ret->getETLState(); state)
subscriptions->setNetworkID(state->networkID);
// The service is started before it is handed back to the caller
ret->run();
return ret;
}
// Database must be populated when this starts
std::optional<uint32_t>
ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)

View File

@@ -22,7 +22,6 @@
#include "data/BackendInterface.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/ETLState.hpp"
#include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
@@ -32,12 +31,12 @@
#include "etl/impl/LedgerLoader.hpp"
#include "etl/impl/LedgerPublisher.hpp"
#include "etl/impl/Transformer.hpp"
#include "etlng/ETLService.hpp"
#include "etlng/ETLServiceInterface.hpp"
#include "etlng/LoadBalancer.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/TaskManagerProvider.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp>
@@ -150,6 +149,7 @@ public:
*
* @param config The configuration to use
* @param ioc io context to run on
* @param ctx Execution context for asynchronous operations
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param balancer Load balancer to use
@@ -160,34 +160,12 @@ public:
makeETLService(
util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
)
{
std::shared_ptr<etlng::ETLServiceInterface> ret;
if (config.get<bool>("__ng_etl")) {
ASSERT(
std::dynamic_pointer_cast<etlng::LoadBalancer>(balancer),
"LoadBalancer type must be etlng::LoadBalancer"
);
ret = std::make_shared<etlng::ETLService>(config, backend, subscriptions, balancer, ledgers);
} else {
ASSERT(
std::dynamic_pointer_cast<etl::LoadBalancer>(balancer), "LoadBalancer type must be etl::LoadBalancer"
);
ret = std::make_shared<etl::ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
}
// inject networkID into subscriptions, as transaction feed require it to inject CTID in response
if (auto const state = ret->getETLState(); state)
subscriptions->setNetworkID(state->networkID);
ret->run();
return ret;
}
);
/**
* @brief Stops components and joins worker thread.

View File

@@ -24,6 +24,7 @@
#include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/log/Logger.hpp"
@@ -31,6 +32,7 @@
#include "util/prometheus/Prometheus.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <xrpl/basics/chrono.h>
#include <xrpl/protocol/Fees.h>
@@ -66,7 +68,7 @@ namespace etl::impl {
* includes reading all of the transactions from the database) is done from the application wide asio io_service, and a
* strand is used to ensure ledgers are published in order.
*/
class LedgerPublisher {
class LedgerPublisher : public etlng::LedgerPublisherInterface {
util::Logger log_{"ETL"};
boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
@@ -121,7 +123,7 @@ public:
uint32_t ledgerSequence,
std::optional<uint32_t> maxAttempts,
std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
)
) override
{
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
@@ -235,7 +237,7 @@ public:
* @brief Get time passed since last publish, in seconds
*/
std::uint32_t
lastPublishAgeSeconds() const
lastPublishAgeSeconds() const override
{
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
.count();
@@ -245,7 +247,7 @@ public:
* @brief Get last publish time as a time point
*/
std::chrono::time_point<std::chrono::system_clock>
getLastPublish() const
getLastPublish() const override
{
return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
}};
@@ -255,7 +257,7 @@ public:
* @brief Get time passed since last ledger close, in seconds
*/
std::uint32_t
lastCloseAgeSeconds() const
lastCloseAgeSeconds() const override
{
std::shared_lock const lck(closeTimeMtx_);
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())

View File

@@ -2,7 +2,8 @@ add_library(clio_etlng)
target_sources(
clio_etlng
PRIVATE LoadBalancer.cpp
PRIVATE ETLService.cpp
LoadBalancer.cpp
Source.cpp
impl/AmendmentBlockHandler.cpp
impl/AsyncGrpcCall.cpp

View File

@@ -0,0 +1,53 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <cstdint>
namespace etlng {
/**
 * @brief Abstract contract for a component that populates the ledger cache
 *
 * Implementations provide a way to start a load for a given ledger sequence, to request that
 * the load stops, and to block until outstanding loading work has finished.
 */
struct CacheLoaderInterface {
    virtual ~CacheLoaderInterface() = default;

    /**
     * @brief Populate the cache with the data of the given ledger
     *
     * @param seq Sequence number of the ledger whose data should be loaded
     */
    virtual void
    load(std::uint32_t seq) = 0;

    /**
     * @brief Ask the loader to stop the cache loading process
     *
     * @note Must not throw
     */
    virtual void
    stop() noexcept = 0;

    /**
     * @brief Block until all cache loading tasks have completed
     *
     * @note Must not throw
     */
    virtual void
    wait() noexcept = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,66 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/Types.hpp"
#include "etlng/Models.hpp"
#include <cstdint>
#include <vector>
namespace etlng {
/**
 * @brief Abstract contract for a component that applies ledger data to the cache
 *
 * Offers three ways of feeding data in (full ledger data, backend ledger objects, or model
 * objects) plus a way to flag the cache as complete.
 */
struct CacheUpdaterInterface {
    virtual ~CacheUpdaterInterface() = default;

    /**
     * @brief Apply the contents of a ledger to the cache
     *
     * @param data Ledger data to apply
     */
    virtual void
    update(model::LedgerData const& data) = 0;

    /**
     * @brief Apply a set of ledger objects to the cache for a given sequence
     *
     * @param seq Sequence number of the ledger the objects belong to
     * @param objs Ledger objects to apply
     */
    virtual void
    update(std::uint32_t seq, std::vector<data::LedgerObject> const& objs) = 0;

    /**
     * @brief Apply a set of model objects to the cache for a given sequence
     *
     * @param seq Sequence number of the ledger the objects belong to
     * @param objs Model objects to apply
     */
    virtual void
    update(std::uint32_t seq, std::vector<model::Object> const& objs) = 0;

    /**
     * @brief Flag the cache as fully loaded
     */
    virtual void
    setFull() = 0;
};
} // namespace etlng

275
src/etlng/ETLService.cpp Normal file
View File

@@ -0,0 +1,275 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/ETLService.hpp"
#include "data/BackendInterface.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/ETLState.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/Extraction.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Monitor.hpp"
#include "etlng/impl/Registry.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "etlng/impl/TaskManager.hpp"
#include "etlng/impl/ext/Cache.hpp"
#include "etlng/impl/ext/Core.hpp"
#include "etlng/impl/ext/NFT.hpp"
#include "etlng/impl/ext/Successor.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/json/object.hpp>
#include <boost/signals2/connection.hpp>
#include <fmt/core.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
namespace etlng {
// Constructs the ETLng service from fully-built, injected collaborators.
//
// Every argument is taken by value and moved into the matching member (the config is kept as a
// reference wrapper). Nothing is started here: the body only logs; the main loop is scheduled
// by run().
ETLService::ETLService(
util::async::AnyExecutionContext ctx,
std::reference_wrapper<util::config::ClioConfigDefinition const> config,
std::shared_ptr<data::BackendInterface> backend,
std::shared_ptr<LoadBalancerInterface> balancer,
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<LedgerPublisherInterface> publisher,
std::shared_ptr<CacheLoaderInterface> cacheLoader,
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<etl::SystemState> state
)
: ctx_(std::move(ctx))
, config_(config)
, backend_(std::move(backend))
, balancer_(std::move(balancer))
, ledgers_(std::move(ledgers))
, publisher_(std::move(publisher))
, cacheLoader_(std::move(cacheLoader))
, cacheUpdater_(std::move(cacheUpdater))
, extractor_(std::move(extractor))
, loader_(std::move(loader))
, initialLoadObserver_(std::move(initialLoadObserver))
, taskManagerProvider_(std::move(taskManagerProvider))
, state_(std::move(state))
{
LOG(log_.info()) << "Creating ETLng...";
}
// Shuts the service down on destruction: stop() runs first, then a debug line is logged.
ETLService::~ETLService()
{
    // Qualified call: spells out that this class's own stop() is what runs during destruction
    ETLService::stop();
    LOG(log_.debug()) << "Destroying ETLng";
}
void
ETLService::run()
{
LOG(log_.info()) << "Running ETLng...";
// TODO: write-enabled node should start in readonly and do the 10 second dance to become a writer
mainLoop_.emplace(ctx_.execute([this] {
state_->isWriting =
not state_->isReadOnly; // TODO: this is now needed because we don't have a mechanism for readonly or
// ETL writer node. remove later in favor of real mechanism
auto const rng = loadInitialLedgerIfNeeded();
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
if (not mostRecentValidated) {
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return;
}
ASSERT(rng.has_value(), "Ledger range can't be null");
auto const nextSequence = rng->maxSequence + 1;
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
startMonitor(nextSequence);
// TODO: we only want to run the full ETL task man if we are POSSIBLY a write node
// but definitely not in strict readonly
if (not state_->isReadOnly)
startLoading(nextSequence);
}));
}
// Requests a stop of the components started by the main loop.
// The task manager (if one was created) is stopped first, then the monitor (if one was created).
void
ETLService::stop()
{
    LOG(log_.info()) << "Stop called";

    if (taskMan_ != nullptr)
        taskMan_->stop();

    if (monitor_ != nullptr)
        monitor_->stop();
}
// Builds a JSON summary of the service:
//  - "etl_sources": JSON reported by the load balancer
//  - "is_writer" / "read_only": state flags as integers
//  - "last_publish_age_seconds": only present when the last publish time point is non-zero
boost::json::object
ETLService::getInfo() const
{
    boost::json::object info;

    info["etl_sources"] = balancer_->toJson();
    info["is_writer"] = static_cast<int>(state_->isWriting);
    info["read_only"] = static_cast<int>(state_->isReadOnly);

    // A zero time point means no publish time has been recorded, so there is no age to report
    auto const lastPublish = publisher_->getLastPublish();
    bool const hasPublished = lastPublish.time_since_epoch().count() != 0;
    if (hasPublished)
        info["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds());

    return info;
}
// Reports the amendment-blocked flag held in the shared system state.
bool
ETLService::isAmendmentBlocked() const
{
    bool const blocked = state_->isAmendmentBlocked;
    return blocked;
}
// Reports the corruption-detected flag held in the shared system state.
bool
ETLService::isCorruptionDetected() const
{
    bool const corrupted = state_->isCorruptionDetected;
    return corrupted;
}
// Returns the ETL state as reported by the load balancer; the result may be empty.
std::optional<etl::ETLState>
ETLService::getETLState() const
{
    auto etlState = balancer_->getETLState();
    return etlState;
}
// Returns the age of the last ledger close, in seconds, as reported by the publisher.
std::uint32_t
ETLService::lastCloseAgeSeconds() const
{
    std::uint32_t const ageSeconds = publisher_->lastCloseAgeSeconds();
    return ageSeconds;
}
std::optional<data::LedgerRange>
ETLService::loadInitialLedgerIfNeeded()
{
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (not rng.has_value()) {
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) {
auto const seq = *mostRecentValidated;
LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... ";
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) {
// TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
data.edgeKeys = balancer_->loadInitialLedger(seq, *initialLoadObserver_);
// TODO: this should be interruptible for graceful shutdown
return loader_->loadInitialLedger(data);
});
});
if (not ledger.has_value()) {
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
return std::nullopt;
}
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
return backend_->hardFetchLedgerRangeNoThrow();
}
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return std::nullopt;
}
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
cacheLoader_->load(rng->maxSequence);
return rng;
}
void
ETLService::startMonitor(uint32_t seq)
{
monitor_ = std::make_unique<impl::Monitor>(ctx_, backend_, ledgers_, seq);
monitorSubscription_ = monitor_->subscribe([this](uint32_t seq) {
log_.info() << "MONITOR got new seq from db: " << seq;
// FIXME: is this the best way?
if (not state_->isWriting) {
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
});
cacheUpdater_->update(seq, diff);
}
publisher_->publish(seq, {});
});
monitor_->run();
}
// Creates the task manager through the provider, starting at `seq`, and runs it with the value
// of the "extractor_threads" config entry.
// Dereferences monitor_, so the monitor must already exist when this is called.
void
ETLService::startLoading(uint32_t seq)
{
    taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq);

    auto const extractorThreads = config_.get().get<std::size_t>("extractor_threads");
    taskMan_->run(extractorThreads);
}
} // namespace etlng

View File

@@ -20,22 +20,27 @@
#pragma once
#include "data/BackendInterface.hpp"
#include "data/LedgerCache.hpp"
#include "data/Types.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/ETLState.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etl/impl/LedgerPublisher.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/ETLServiceInterface.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/Extraction.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Monitor.hpp"
#include "etlng/impl/Registry.hpp"
@@ -45,14 +50,14 @@
#include "etlng/impl/ext/Core.hpp"
#include "etlng/impl/ext/NFT.hpp"
#include "etlng/impl/ext/Successor.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/json/object.hpp>
#include <boost/signals2/connection.hpp>
#include <fmt/core.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/base_uint.h>
@@ -64,15 +69,12 @@
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <ranges>
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
namespace etlng {
@@ -92,191 +94,94 @@ namespace etlng {
class ETLService : public ETLServiceInterface {
util::Logger log_{"ETL"};
util::async::AnyExecutionContext ctx_;
std::reference_wrapper<util::config::ClioConfigDefinition const> config_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::shared_ptr<etlng::LoadBalancerInterface> balancer_;
std::shared_ptr<LoadBalancerInterface> balancer_;
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers_;
std::shared_ptr<etl::CacheLoader<>> cacheLoader_;
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
std::shared_ptr<LedgerPublisherInterface> publisher_;
std::shared_ptr<CacheLoaderInterface> cacheLoader_;
std::shared_ptr<CacheUpdaterInterface> cacheUpdater_;
std::shared_ptr<ExtractorInterface> extractor_;
std::shared_ptr<LoaderInterface> loader_;
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver_;
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider_;
std::shared_ptr<etl::SystemState> state_;
etl::SystemState state_;
util::async::CoroExecutionContext ctx_{8};
std::unique_ptr<MonitorInterface> monitor_;
std::unique_ptr<TaskManagerInterface> taskMan_;
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
std::shared_ptr<impl::Loader> loader_;
boost::signals2::scoped_connection monitorSubscription_;
std::optional<util::async::CoroExecutionContext::Operation<void>> mainLoop_;
std::optional<util::async::AnyOperation<void>> mainLoop_;
public:
/**
* @brief Create an instance of ETLService.
*
* @param config The configuration to use
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param balancer Load balancer to use
* @param ledgers The network validated ledgers datastructure
* @param ctx The execution context for asynchronous operations
* @param config The Clio configuration definition
* @param backend Interface to the backend database
* @param balancer Load balancer for distributing work
* @param ledgers Interface for accessing network validated ledgers
* @param publisher Interface for publishing ledger data
* @param cacheLoader Interface for loading cache data
* @param cacheUpdater Interface for updating cache data
* @param extractor The extractor to use
* @param loader Interface for loading data
* @param initialLoadObserver The observer for initial data loading
* @param taskManagerProvider The provider of the task manager instance
* @param state System state tracking object
*/
ETLService(
util::config::ClioConfigDefinition const& config,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers
)
: backend_(std::move(backend))
, subscriptions_(std::move(subscriptions))
, balancer_(std::move(balancer))
, ledgers_(std::move(ledgers))
, cacheLoader_(std::make_shared<etl::CacheLoader<>>(config, backend_, backend_->cache()))
, fetcher_(std::make_shared<etl::impl::LedgerFetcher>(backend_, balancer_))
, extractor_(std::make_shared<impl::Extractor>(fetcher_))
, amendmentBlockHandler_(std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx_, state_))
, loader_(std::make_shared<impl::Loader>(
backend_,
fetcher_,
impl::makeRegistry(
impl::CacheExt{backend_->cache()},
impl::CoreExt{backend_},
impl::SuccessorExt{backend_, backend_->cache()},
impl::NFTExt{backend_}
),
amendmentBlockHandler_
))
{
LOG(log_.info()) << "Creating ETLng...";
}
util::async::AnyExecutionContext ctx,
std::reference_wrapper<util::config::ClioConfigDefinition const> config,
std::shared_ptr<data::BackendInterface> backend,
std::shared_ptr<LoadBalancerInterface> balancer,
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<LedgerPublisherInterface> publisher,
std::shared_ptr<CacheLoaderInterface> cacheLoader,
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<etl::SystemState> state
);
~ETLService() override
{
LOG(log_.debug()) << "Stopping ETLng";
}
~ETLService() override;
void
run() override
{
LOG(log_.info()) << "run() in ETLng...";
mainLoop_.emplace(ctx_.execute([this] {
auto const rng = loadInitialLedgerIfNeeded();
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
if (not mostRecentValidated) {
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return;
}
ASSERT(rng.has_value(), "Ledger range can't be null");
auto const nextSequence = rng->maxSequence + 1;
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
auto scheduler = impl::makeScheduler(impl::ForwardScheduler{*ledgers_, nextSequence}
// impl::BackfillScheduler{nextSequence - 1, nextSequence - 1000},
// TODO lift limit and start with rng.minSeq
);
auto man = impl::TaskManager(ctx_, *scheduler, *extractor_, *loader_);
// TODO: figure out this: std::make_shared<impl::Monitor>(backend_, ledgers_, nextSequence)
man.run({}); // TODO: needs to be interruptible and fill out settings
}));
}
run() override;
void
stop() override
{
LOG(log_.info()) << "Stop called";
// TODO: stop the service correctly
}
stop() override;
boost::json::object
getInfo() const override
{
// TODO
return {{"ok", true}};
}
getInfo() const override;
bool
isAmendmentBlocked() const override
{
// TODO
return false;
}
isAmendmentBlocked() const override;
bool
isCorruptionDetected() const override
{
// TODO
return false;
}
isCorruptionDetected() const override;
std::optional<etl::ETLState>
getETLState() const override
{
// TODO
return std::nullopt;
}
getETLState() const override;
std::uint32_t
lastCloseAgeSeconds() const override
{
// TODO
return 0;
}
lastCloseAgeSeconds() const override;
private:
// TODO: this better be std::expected
std::optional<data::LedgerRange>
loadInitialLedgerIfNeeded()
{
if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); not rng.has_value()) {
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
loadInitialLedgerIfNeeded();
try {
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) {
auto const seq = *mostRecentValidated;
LOG(log_.info()) << "Ledger " << seq << " has been validated. Downloading... ";
void
startMonitor(uint32_t seq);
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) {
// TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
data.edgeKeys = balancer_->loadInitialLedger(seq, *loader_);
// TODO: this should be interruptible for graceful shutdown
return loader_->loadInitialLedger(data);
});
});
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
if (ledger.has_value())
return backend_->hardFetchLedgerRangeNoThrow();
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
} else {
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
}
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked();
}
} else {
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
cacheLoader_->load(rng->maxSequence);
return rng;
}
return std::nullopt;
}
void
startLoading(uint32_t seq);
};
} // namespace etlng

View File

@@ -37,13 +37,38 @@ struct LedgerPublisherInterface {
* @param seq The sequence number of the ledger
* @param maxAttempts The maximum number of attempts to publish the ledger; no limit if nullopt
* @param attemptsDelay The delay between attempts
* @return Whether the ledger was found in the database and published
*/
virtual void
virtual bool
publish(
uint32_t seq,
std::optional<uint32_t> maxAttempts,
std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
) = 0;
/**
* @brief Get last publish time as a time point
*
* @return A std::chrono::time_point representing the time of the last publish
*/
virtual std::chrono::time_point<std::chrono::system_clock>
getLastPublish() const = 0;
/**
* @brief Get time passed since last ledger close, in seconds
*
* @return The number of seconds since the last ledger close as std::uint32_t
*/
virtual std::uint32_t
lastCloseAgeSeconds() const = 0;
/**
* @brief Get time passed since last publish, in seconds
*
* @return The number of seconds since the last publish as std::uint32_t
*/
virtual std::uint32_t
lastPublishAgeSeconds() const = 0;
};
} // namespace etlng

View File

@@ -40,6 +40,13 @@ public:
virtual ~MonitorInterface() = default;
/**
* @brief Allows the loading process to notify of a freshly committed ledger
* @param seq The ledger sequence loaded
*/
virtual void
notifyLedgerLoaded(uint32_t seq) = 0;
/**
* @brief Allows clients to get notified when a new ledger becomes available in Clio's database
*

View File

@@ -0,0 +1,46 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <cstddef>
namespace etlng {
/**
 * @brief An interface for the Task Manager
 *
 * Implementations are driven through exactly two operations: `run` to start work and `stop` to end it.
 */
struct TaskManagerInterface {
    virtual ~TaskManagerInterface() = default;

    /**
     * @brief Start the task manager with specified settings
     *
     * @param numExtractors The number of extraction tasks
     */
    virtual void
    run(std::size_t numExtractors) = 0;  // std:: qualified: <cstddef> only guarantees std::size_t

    /**
     * @brief Stop the task manager
     */
    virtual void
    stop() = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,51 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/MonitorInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
namespace etlng {
/**
 * @brief An interface for providing the Task Manager
 */
struct TaskManagerProviderInterface {
    virtual ~TaskManagerProviderInterface() = default;

    /**
     * @brief Make a task manager
     *
     * The returned pointer owns the task manager, so discarding it destroys the instance immediately;
     * the result is therefore marked as not discardable.
     *
     * @param ctx The async context to associate the task manager instance with
     * @param monitor The monitor to notify when ledger is loaded
     * @param seq The sequence to start at
     * @return A unique pointer to a TaskManager implementation
     */
    [[nodiscard]] virtual std::unique_ptr<TaskManagerInterface>
    make(util::async::AnyExecutionContext ctx, std::reference_wrapper<MonitorInterface> monitor, uint32_t seq) = 0;
};
} // namespace etlng

View File

@@ -36,7 +36,7 @@ AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMEN
};
AmendmentBlockHandler::AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
util::async::AnyExecutionContext ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval,
ActionType action

View File

@@ -50,7 +50,7 @@ public:
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
util::async::AnyExecutionContext ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/Models.hpp"
#include "util/log/Logger.hpp"
#include <cstdint>
#include <functional>
#include <vector>
namespace etlng::impl {
/**
 * @brief Forwards ledger object updates to a ledger cache.
 *
 * Every member simply delegates to the referenced cache. The cache is held by reference, not owned,
 * so it must outlive this updater.
 */
class CacheUpdater : public CacheUpdaterInterface {
    std::reference_wrapper<data::LedgerCacheInterface> cache_;

    util::Logger log_{"ETL"};

public:
    /**
     * @brief Create an updater bound to the given cache
     *
     * Marked explicit so that a cache reference is never silently converted into an updater.
     *
     * @param cache The cache to forward updates to
     */
    explicit CacheUpdater(data::LedgerCacheInterface& cache) : cache_{cache}
    {
    }

    /**
     * @brief Update the cache with the objects of an extracted ledger
     *
     * @param data The ledger data; its `objects` and `seq` are passed to the cache
     */
    void
    update(model::LedgerData const& data) override
    {
        cache_.get().update(data.objects, data.seq);
    }

    /**
     * @brief Update the cache with backend-style ledger objects
     *
     * @param seq The ledger sequence the objects belong to
     * @param objs The objects to store
     */
    void
    update(uint32_t seq, std::vector<data::LedgerObject> const& objs) override
    {
        cache_.get().update(objs, seq);
    }

    /**
     * @brief Update the cache with ETLng model objects
     *
     * @param seq The ledger sequence the objects belong to
     * @param objs The objects to store
     */
    void
    update(uint32_t seq, std::vector<model::Object> const& objs) override
    {
        cache_.get().update(objs, seq);
    }

    /**
     * @brief Forward the "cache is full" notification to the cache
     */
    void
    setFull() override
    {
        cache_.get().setFull();
    }
};
} // namespace etlng::impl

View File

@@ -90,6 +90,13 @@ public:
{
}
Extractor(Extractor const&) = delete;
Extractor(Extractor&&) = delete;
Extractor&
operator=(Extractor const&) = delete;
Extractor&
operator=(Extractor&&) = delete;
[[nodiscard]] std::optional<model::LedgerData>
extractLedgerWithDiff(uint32_t seq) override;

View File

@@ -0,0 +1,286 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/impl/Loading.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp"
#include "util/Mutex.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <xrpl/basics/chrono.h>
#include <xrpl/protocol/Fees.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/STObject.h>
#include <xrpl/protocol/Serializer.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace etlng::impl {
/**
* @brief Publishes ledgers in a synchronized fashion.
*
* If ETL is started far behind the network, ledgers will be written and published very rapidly. Monitoring processes
* will publish ledgers as they are written. However, to publish a ledger, the monitoring process needs to read all of
* the transactions for that ledger from the database. Reading the transactions from the database requires network
* calls, which can be slow. It is imperative however that the monitoring processes keep up with the writer, else the
* monitoring processes will not be able to detect if the writer failed. Therefore, publishing each ledger (which
* includes reading all of the transactions from the database) is done from the application wide asio io_service, and a
* strand is used to ensure ledgers are published in order.
*/
class LedgerPublisher : public etlng::LedgerPublisherInterface {
util::Logger log_{"ETL"};
boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::reference_wrapper<etl::SystemState const> state_; // shared state for ETL
util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_;
std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
"etl_last_publish_seconds",
{},
"Seconds since epoch of the last published ledger"
);
util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_;
public:
/**
* @brief Create an instance of the publisher
*/
LedgerPublisher(
boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLServiceNg
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
etl::SystemState const& state
)
: publishStrand_{boost::asio::make_strand(ioc)}
, backend_{std::move(backend)}
, subscriptions_{std::move(subscriptions)}
, state_{std::cref(state)}
{
}
/**
* @brief Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers
* stream.
*
* @param ledgerSequence the sequence of the ledger to publish
* @param maxAttempts the number of times to attempt to read the ledger from the database
* @param attemptsDelay the delay between attempts to read the ledger from the database
* @return Whether the ledger was found in the database and published
*/
bool
publish(
uint32_t ledgerSequence,
std::optional<uint32_t> maxAttempts,
std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
) override
{
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (not state_.get().isStopping) {
auto range = backend_->hardFetchLedgerRangeNoThrow();
if (!range || range->maxSequence < ledgerSequence) {
++numAttempts;
LOG(log_.debug()) << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence;
// We try maxAttempts times to publish the ledger, waiting one second in between each attempt.
if (maxAttempts && numAttempts >= maxAttempts) {
LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts.";
return false;
}
std::this_thread::sleep_for(attemptsDelay);
continue;
}
auto lgr = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerBySequence(ledgerSequence, yield);
});
ASSERT(lgr.has_value(), "Ledger must exist in database. Ledger sequence = {}", ledgerSequence);
publish(*lgr);
return true;
}
return false;
}
/**
* @brief Publish the passed ledger asynchronously.
*
* All ledgers are published thru publishStrand_ which ensures that all publishes are performed in a serial fashion.
*
* @param lgrInfo the ledger to publish
*/
void
publish(ripple::LedgerHeader const& lgrInfo)
{
boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
// TODO: This should probably not be part of publisher in the future
if (not state_.get().isWriting)
backend_->updateRange(lgrInfo.seq); // This can't be unit tested atm.
setLastClose(lgrInfo.closeTime);
auto age = lastCloseAgeSeconds();
// if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
if (age < kMAX_LEDGER_AGE_SECONDS) {
std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchFees(lgrInfo.seq, yield);
});
ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
});
auto const ledgerRange = backend_->fetchLedgerRange();
ASSERT(ledgerRange.has_value(), "Ledger range must exist");
auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence);
subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
// order with transaction index
std::ranges::sort(transactions, [](auto const& t1, auto const& t2) {
ripple::SerialIter iter1{t1.metadata.data(), t1.metadata.size()};
ripple::STObject const object1(iter1, ripple::sfMetadata);
ripple::SerialIter iter2{t2.metadata.data(), t2.metadata.size()};
ripple::STObject const object2(iter2, ripple::sfMetadata);
return object1.getFieldU32(ripple::sfTransactionIndex) <
object2.getFieldU32(ripple::sfTransactionIndex);
});
for (auto const& txAndMeta : transactions)
subscriptions_->pubTransaction(txAndMeta, lgrInfo);
subscriptions_->pubBookChanges(lgrInfo, transactions);
setLastPublishTime();
LOG(log_.info()) << "Published ledger " << lgrInfo.seq;
} else {
LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq;
}
});
// we track latest publish-requested seq, not necessarily already published
setLastPublishedSequence(lgrInfo.seq);
}
/**
* @brief Get time passed since last publish, in seconds
*/
std::uint32_t
lastPublishAgeSeconds() const override
{
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - getLastPublish())
.count();
}
/**
* @brief Get last publish time as a time point
*/
std::chrono::time_point<std::chrono::system_clock>
getLastPublish() const override
{
return std::chrono::time_point<std::chrono::system_clock>{std::chrono::seconds{lastPublishSeconds_.get().value()
}};
}
/**
* @brief Get time passed since last ledger close, in seconds
*/
std::uint32_t
lastCloseAgeSeconds() const override
{
auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
if (now < (kRIPPLE_EPOCH_START + closeTime))
return 0;
return now - (kRIPPLE_EPOCH_START + closeTime);
}
/**
* @brief Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been
* published to network
*/
std::optional<uint32_t>
getLastPublishedSequence() const
{
return *lastPublishedSequence_.lock();
}
private:
void
setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
{
auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
*closeTime = lastCloseTime;
}
void
setLastPublishTime()
{
using namespace std::chrono;
auto const nowSeconds = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
lastPublishSeconds_.get().set(nowSeconds);
}
void
setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
{
auto lastPublishSeq = lastPublishedSequence_.lock();
*lastPublishSeq = lastPublishedSequence;
}
};
} // namespace etlng::impl

View File

@@ -20,7 +20,6 @@
#include "etlng/impl/Loading.hpp"
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/Models.hpp"
@@ -46,12 +45,10 @@ namespace etlng::impl {
Loader::Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
)
: backend_(std::move(backend))
, fetcher_(std::move(fetcher))
, registry_(std::move(registry))
, amendmentBlockHandler_(std::move(amendmentBlockHandler))
{
@@ -81,31 +78,44 @@ Loader::onInitialLoadGotMoreObjects(
std::optional<std::string> lastKey
)
{
LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size();
registry_->dispatchInitialObjects(
seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions?
);
try {
LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size();
registry_->dispatchInitialObjects(
seq,
data,
std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions?
);
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial objects for " << seq << ": " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked();
}
}
std::optional<ripple::LedgerHeader>
Loader::loadInitialLedger(model::LedgerData const& data)
{
// check that database is actually empty
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty");
try {
// check that database is actually empty
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty");
return std::nullopt;
}
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header);
auto seconds = ::util::timed<std::chrono::seconds>([this, &data]() { registry_->dispatchInitialData(data); });
LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds.";
backend_->finishWrites(data.seq);
LOG(log_.debug()) << "Loaded initial ledger";
return {data.header};
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger " << data.seq << ": " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked();
return std::nullopt;
}
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header);
auto seconds = ::util::timed<std::chrono::seconds>([this, &data]() { registry_->dispatchInitialData(data); });
LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds.";
backend_->finishWrites(data.seq);
LOG(log_.debug()) << "Loaded initial ledger";
return {data.header};
}
} // namespace etlng::impl

View File

@@ -49,7 +49,6 @@ namespace etlng::impl {
class Loader : public LoaderInterface, public InitialLoadObserverInterface {
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
std::shared_ptr<RegistryInterface> registry_;
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
@@ -62,11 +61,17 @@ public:
Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
);
Loader(Loader const&) = delete;
Loader(Loader&&) = delete;
Loader&
operator=(Loader const&) = delete;
Loader&
operator=(Loader&&) = delete;
void
load(model::LedgerData const& data) override;

View File

@@ -55,6 +55,15 @@ Monitor::~Monitor()
stop();
}
// TODO: think about using signals perhaps? maybe combining with onNextSequence?
// also, how do we not double invoke or does it not matter
/**
 * Called by the loading side once a ledger has been committed. Logs the sequence and triggers the
 * repeated task right away instead of waiting for its next scheduled run.
 *
 * NOTE(review): dereferences repeatedTask_ unconditionally; presumably it is only populated by run() -
 * confirm this is never called before run().
 */
void
Monitor::notifyLedgerLoaded(uint32_t seq)
{
    LOG(log_.debug()) << "Loader notified about newly committed ledger " << seq;
    repeatedTask_->invoke();  // force-invoke immediately
}
void
Monitor::run(std::chrono::steady_clock::duration repeatInterval)
{

View File

@@ -60,6 +60,9 @@ public:
);
~Monitor() override;
void
notifyLedgerLoaded(uint32_t seq) override;
void
run(std::chrono::steady_clock::duration repeatInterval) override;

View File

@@ -19,12 +19,15 @@
#pragma once
#include "etl/SystemState.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include <xrpl/protocol/TxFormats.h>
#include <concepts>
#include <cstdint>
#include <functional>
#include <string>
#include <tuple>
#include <type_traits>
@@ -88,6 +91,7 @@ concept SomeExtension = NoTwoOfKind<T> and ContainsValidHook<T>;
template <SomeExtension... Ps>
class Registry : public RegistryInterface {
std::reference_wrapper<etl::SystemState const> state_;
std::tuple<Ps...> store_;
static_assert(
@@ -101,9 +105,9 @@ class Registry : public RegistryInterface {
);
public:
explicit constexpr Registry(SomeExtension auto&&... exts)
explicit constexpr Registry(etl::SystemState const& state, SomeExtension auto&&... exts)
requires(std::is_same_v<std::decay_t<decltype(exts)>, std::decay_t<Ps>> and ...)
: store_(std::forward<Ps>(exts)...)
: state_{state}, store_(std::forward<Ps>(exts)...)
{
}
@@ -121,9 +125,8 @@ public:
// send entire batch of data at once
{
auto const expand = [&](auto& p) {
if constexpr (requires { p.onLedgerData(data); }) {
p.onLedgerData(data);
}
if constexpr (requires { p.onLedgerData(data); })
executeIfAllowed(p, [&data](auto& p) { p.onLedgerData(data); });
};
std::apply([&expand](auto&&... xs) { (expand(xs), ...); }, store_);
@@ -134,7 +137,7 @@ public:
auto const expand = [&]<typename P>(P& p, model::Transaction const& t) {
if constexpr (requires { p.onTransaction(data.seq, t); }) {
if (std::decay_t<P>::spec::wants(t.type))
p.onTransaction(data.seq, t);
executeIfAllowed(p, [&data, &t](auto& p) { p.onTransaction(data.seq, t); });
}
};
@@ -146,9 +149,8 @@ public:
// send per object path
{
auto const expand = [&]<typename P>(P&& p, model::Object const& o) {
if constexpr (requires { p.onObject(data.seq, o); }) {
p.onObject(data.seq, o);
}
if constexpr (requires { p.onObject(data.seq, o); })
executeIfAllowed(p, [&data, &o](auto& p) { p.onObject(data.seq, o); });
};
for (auto const& obj : data.objects) {
@@ -163,9 +165,8 @@ public:
// send entire vector path
{
auto const expand = [&](auto&& p) {
if constexpr (requires { p.onInitialObjects(seq, data, lastKey); }) {
p.onInitialObjects(seq, data, lastKey);
}
if constexpr (requires { p.onInitialObjects(seq, data, lastKey); })
executeIfAllowed(p, [seq, &data, &lastKey](auto& p) { p.onInitialObjects(seq, data, lastKey); });
};
std::apply([&expand](auto&&... xs) { (expand(xs), ...); }, store_);
@@ -174,9 +175,8 @@ public:
// send per object path
{
auto const expand = [&]<typename P>(P&& p, model::Object const& o) {
if constexpr (requires { p.onInitialObject(seq, o); }) {
p.onInitialObject(seq, o);
}
if constexpr (requires { p.onInitialObject(seq, o); })
executeIfAllowed(p, [seq, &o](auto& p) { p.onInitialObject(seq, o); });
};
for (auto const& obj : data) {
@@ -191,9 +191,8 @@ public:
// send entire batch path
{
auto const expand = [&](auto&& p) {
if constexpr (requires { p.onInitialData(data); }) {
p.onInitialData(data);
}
if constexpr (requires { p.onInitialData(data); })
executeIfAllowed(p, [&data](auto& p) { p.onInitialData(data); });
};
std::apply([&expand](auto&&... xs) { (expand(xs), ...); }, store_);
@@ -204,7 +203,7 @@ public:
auto const expand = [&]<typename P>(P&& p, model::Transaction const& tx) {
if constexpr (requires { p.onInitialTransaction(data.seq, tx); }) {
if (std::decay_t<P>::spec::wants(tx.type))
p.onInitialTransaction(data.seq, tx);
executeIfAllowed(p, [&data, &tx](auto& p) { p.onInitialTransaction(data.seq, tx); });
}
};
@@ -213,12 +212,25 @@ public:
}
}
}
private:
void
executeIfAllowed(auto& p, auto&& fn)
{
if constexpr (requires { p.allowInReadonly(); }) {
if (state_.get().isWriting or p.allowInReadonly())
fn(p);
} else {
if (state_.get().isWriting)
fn(p);
}
}
};
static auto
makeRegistry(auto&&... exts)
makeRegistry(etl::SystemState const& state, auto&&... exts)
{
return std::make_unique<Registry<std::decay_t<decltype(exts)>...>>(std::forward<decltype(exts)>(exts)...);
return std::make_unique<Registry<std::decay_t<decltype(exts)>...>>(state, std::forward<decltype(exts)>(exts)...);
}
} // namespace etlng::impl

View File

@@ -22,15 +22,21 @@
#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "etlng/impl/Monitor.hpp"
#include "etlng/impl/TaskQueue.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <ranges>
#include <thread>
#include <utility>
@@ -39,12 +45,19 @@
namespace etlng::impl {
TaskManager::TaskManager(
util::async::AnyExecutionContext&& ctx,
std::reference_wrapper<SchedulerInterface> scheduler,
util::async::AnyExecutionContext ctx,
std::shared_ptr<SchedulerInterface> scheduler,
std::reference_wrapper<ExtractorInterface> extractor,
std::reference_wrapper<LoaderInterface> loader
std::reference_wrapper<LoaderInterface> loader,
std::reference_wrapper<MonitorInterface> monitor,
uint32_t startSeq
)
: ctx_(std::move(ctx)), schedulers_(scheduler), extractor_(extractor), loader_(loader)
: ctx_(std::move(ctx))
, schedulers_(std::move(scheduler))
, extractor_(extractor)
, loader_(loader)
, monitor_(monitor)
, queue_({.startSeq = startSeq, .increment = 1u, .limit = kQUEUE_SIZE_LIMIT})
{
}
@@ -54,37 +67,32 @@ TaskManager::~TaskManager()
}
void
TaskManager::run(Settings settings)
TaskManager::run(std::size_t numExtractors)
{
static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz;
LOG(log_.debug()) << "Starting task manager with " << numExtractors << " extractors...\n";
auto schedulingStrand = ctx_.makeStrand();
PriorityQueue queue(ctx_.makeStrand(), kQUEUE_SIZE_LIMIT);
stop();
extractors_.clear();
loaders_.clear();
LOG(log_.debug()) << "Starting task manager...\n";
extractors_.reserve(numExtractors);
for ([[maybe_unused]] auto _ : std::views::iota(0uz, numExtractors))
extractors_.push_back(spawnExtractor(queue_));
extractors_.reserve(settings.numExtractors);
for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numExtractors))
extractors_.push_back(spawnExtractor(schedulingStrand, queue));
loaders_.reserve(settings.numLoaders);
for ([[maybe_unused]] auto _ : std::views::iota(0uz, settings.numLoaders))
loaders_.push_back(spawnLoader(queue));
wait();
LOG(log_.debug()) << "All finished in task manager..\n";
// Only one forward loader for now. Backfill to be added here later
loaders_.push_back(spawnLoader(queue_));
}
util::async::AnyOperation<void>
TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue)
TaskManager::spawnExtractor(TaskQueue& queue)
{
// TODO: these values may be extracted to config later and/or need to be fine-tuned on a realistic system
static constexpr auto kDELAY_BETWEEN_ATTEMPTS = std::chrono::milliseconds{100u};
static constexpr auto kDELAY_BETWEEN_ENQUEUE_ATTEMPTS = std::chrono::milliseconds{1u};
return strand.execute([this, &queue](auto stopRequested) {
return ctx_.execute([this, &queue](auto stopRequested) {
while (not stopRequested) {
if (auto task = schedulers_.get().next(); task.has_value()) {
if (auto task = schedulers_->next(); task.has_value()) {
if (auto maybeBatch = extractor_.get().extractLedgerWithDiff(task->seq); maybeBatch.has_value()) {
LOG(log_.debug()) << "Adding data after extracting diff";
while (not queue.enqueue(*maybeBatch)) {
@@ -107,13 +115,26 @@ TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue
}
/**
 * Spawns the loading task on the execution context.
 *
 * Until a stop is requested, the task takes the next available ledger from the queue, passes it to the
 * loader, logs timing and throughput figures, and notifies the monitor about the loaded sequence.
 */
util::async::AnyOperation<void>
TaskManager::spawnLoader(TaskQueue& queue)
{
    static constexpr auto kNANO_TO_SECOND = 1.0e9;

    return ctx_.execute([this, &queue](auto stopRequested) {
        while (not stopRequested) {
            // TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not
            if (auto data = queue.dequeue(); data.has_value()) {
                // Capture the dequeued ledger by reference: `timed` hands back the measured duration
                // directly, so the callable has finished by then and `data` is still alive. Capturing by
                // value would copy every transaction and object of the ledger just to time the call.
                auto nanos = util::timed<std::chrono::nanoseconds>([this, &data] { loader_.get().load(*data); });

                auto const seconds = nanos / kNANO_TO_SECOND;
                auto const txnCount = data->transactions.size();
                auto const objCount = data->objects.size();

                LOG(log_.info()) << "Wrote ledger " << data->seq << " with header: " << util::toString(data->header)
                                 << ". txns[" << txnCount << "]; objs[" << objCount << "]; in " << seconds
                                 << " seconds;"
                                 << " tps[" << txnCount / seconds << "], ops[" << objCount / seconds << "]";

                monitor_.get().notifyLedgerLoaded(data->seq);
            }
        }
    });
}

View File

@@ -21,74 +21,70 @@
#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "util/StrandedPriorityQueue.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/impl/Monitor.hpp"
#include "etlng/impl/TaskQueue.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include <xrpl/protocol/TxFormats.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
namespace etlng::impl {
/**
 * @brief Owns the extractor and loader tasks and the queue that connects them
 *
 * Extractor tasks push extracted ledgers into a queue, loader tasks take them out again and hand them to the
 * loader.
 */
class TaskManager {
class TaskManager : public TaskManagerInterface {
static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz;
// execution context on which the loader tasks are spawned
util::async::AnyExecutionContext ctx_;
std::reference_wrapper<SchedulerInterface> schedulers_;
std::shared_ptr<SchedulerInterface> schedulers_;
// the referenced collaborators are not owned by this class
std::reference_wrapper<ExtractorInterface> extractor_;
std::reference_wrapper<LoaderInterface> loader_;
// notified with the sequence of every ledger a loader task has finished loading
std::reference_wrapper<MonitorInterface> monitor_;
// hand-over point between extractor and loader tasks
impl::TaskQueue queue_;
std::atomic_uint32_t nextForwardSequence_;
// handles of the spawned tasks
std::vector<util::async::AnyOperation<void>> extractors_;
std::vector<util::async::AnyOperation<void>> loaders_;
util::Logger log_{"ETL"};
struct ReverseOrderComparator {
[[nodiscard]] bool
operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept
{
return lhs.seq > rhs.seq;
}
};
public:
struct Settings {
size_t numExtractors; /**< number of extraction tasks */
size_t numLoaders; /**< number of loading tasks */
};
// reverse order loading is needed (i.e. start with oldest seq in forward fill buffer)
using PriorityQueue = util::StrandedPriorityQueue<model::LedgerData, ReverseOrderComparator>;
TaskManager(
util::async::AnyExecutionContext&& ctx,
std::reference_wrapper<SchedulerInterface> scheduler,
util::async::AnyExecutionContext ctx,
std::shared_ptr<SchedulerInterface> scheduler,
std::reference_wrapper<ExtractorInterface> extractor,
std::reference_wrapper<LoaderInterface> loader
std::reference_wrapper<LoaderInterface> loader,
std::reference_wrapper<MonitorInterface> monitor,
uint32_t startSeq
);
~TaskManager();
~TaskManager() override;
void
run(Settings settings);
run(std::size_t numExtractors) override;
void
stop();
stop() override;
private:
void
wait();
// each spawn* helper returns the handle of the task it started
[[nodiscard]] util::async::AnyOperation<void>
spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue);
spawnExtractor(TaskQueue& queue);
[[nodiscard]] util::async::AnyOperation<void>
spawnLoader(PriorityQueue& queue);
spawnLoader(TaskQueue& queue);
};
} // namespace etlng::impl

View File

@@ -0,0 +1,74 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "etlng/impl/TaskManager.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
namespace etlng::impl {
/**
* @brief Implementation of the TaskManagerProvider interface
*/
class TaskManagerProvider : public TaskManagerProviderInterface {
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
std::shared_ptr<ExtractorInterface> extractor_;
std::shared_ptr<LoaderInterface> loader_;
public:
/**
* @brief Constructor
*
* @param ledgers Reference to ledgers
* @param extractor The extractor
* @param loader The loader
*/
TaskManagerProvider(
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader
)
: ledgers_(ledgers), extractor_(std::move(extractor)), loader_(std::move(loader))
{
}
std::unique_ptr<TaskManagerInterface>
make(util::async::AnyExecutionContext ctx, std::reference_wrapper<MonitorInterface> monitor, uint32_t seq) override
{
auto scheduler = impl::makeScheduler(impl::ForwardScheduler{ledgers_, seq});
// TODO: add impl::BackfillScheduler{seq - 1, seq - 1000},
return std::make_unique<TaskManager>(std::move(ctx), std::move(scheduler), *extractor_, *loader_, monitor, seq);
}
};
} // namespace etlng::impl

View File

@@ -19,36 +19,58 @@
#pragma once
#include "util/async/AnyStrand.hpp"
#include "etlng/Models.hpp"
#include "util/Mutex.hpp"
#include <cstddef>
#include <functional>
#include <cstdint>
#include <optional>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>
namespace util {
namespace etlng::impl {
/**
 * @brief Comparator that makes a std::priority_queue yield the ledger with the lowest sequence first
 */
struct ReverseOrderComparator {
    [[nodiscard]] bool
    operator()(model::LedgerData const& lhs, model::LedgerData const& rhs) const noexcept
    {
        // "lhs sorts before rhs" exactly when rhs carries the smaller sequence
        return rhs.seq < lhs.seq;
    }
};
/**
* @brief A wrapper for std::priority_queue that serialises operations using a strand
* @brief A wrapper for std::priority_queue that serialises operations using a mutex
* @note This may be a candidate for future improvements if performance proves to be poor (e.g. use a lock free queue)
*/
template <typename T, typename Compare = std::less<T>>
class StrandedPriorityQueue {
util::async::AnyStrand strand_;
class TaskQueue {
std::size_t limit_;
std::priority_queue<T, std::vector<T>, Compare> queue_;
std::uint32_t increment_;
struct Data {
std::uint32_t expectedSequence;
std::priority_queue<model::LedgerData, std::vector<model::LedgerData>, ReverseOrderComparator> forwardLoadQueue;
Data(std::uint32_t seq) : expectedSequence(seq)
{
}
};
util::Mutex<Data> data_;
public:
/**
 * @brief Construction parameters of the queue
 */
struct Settings {
    std::uint32_t startSeq{0u};                       /**< sequence to start from (for dequeue) */
    std::uint32_t increment{1u};                      /**< added to the sequence after each successful dequeue */
    std::optional<std::size_t> limit{std::nullopt};   /**< optional size limit; no limit by default */
};
/**
* @brief Construct a new priority queue on a strand
* @param strand The strand to use
* @brief Construct a new priority queue
* @param limit The limit of items allowed simultaneously in the queue
* @param settings Start sequence, dequeue increment and optional limit; a missing limit is stored as 0
*/
StrandedPriorityQueue(util::async::AnyStrand&& strand, std::optional<std::size_t> limit = std::nullopt)
: strand_(std::move(strand)), limit_(limit.value_or(0uz))
explicit TaskQueue(Settings settings)
: limit_(settings.limit.value_or(0uz)), increment_(settings.increment), data_(settings.startSeq)
{
}
@@ -56,25 +78,20 @@ public:
* @brief Enqueue a new item onto the queue if space is available
* @note This function blocks until the item is attempted to be added to the queue
*
* @tparam I Type of the item to add
* @param item The item to add
* @return true if item added to the queue; false otherwise
*/
template <typename I>
[[nodiscard]] bool
enqueue(I&& item)
requires std::is_same_v<std::decay_t<I>, T>
enqueue(model::LedgerData item)
{
return strand_
.execute([&item, this] {
if (limit_ == 0uz or queue_.size() < limit_) {
queue_.push(std::forward<I>(item));
return true;
}
return false;
})
.get()
.value_or(false); // if some exception happens - failed to add
// the lock is held for the whole size check and push
auto lock = data_.lock();
// a limit of 0 disables the size check, i.e. the queue is unbounded
if (limit_ == 0uz or lock->forwardLoadQueue.size() < limit_) {
lock->forwardLoadQueue.push(std::move(item));
return true;
}
// queue is at its limit: the item is dropped and the caller decides whether to retry
return false;
}
/**
@@ -82,22 +99,19 @@ public:
* @note This function blocks until the item is taken off the queue
* @return An item if available; nullopt otherwise
*/
[[nodiscard]] std::optional<T>
[[nodiscard]] std::optional<model::LedgerData>
dequeue()
{
return strand_
.execute([this] -> std::optional<T> {
std::optional<T> out;
auto lock = data_.lock();
std::optional<model::LedgerData> out;
if (not queue_.empty()) {
out.emplace(queue_.top());
queue_.pop();
}
// An item is only handed out when the top of the queue carries exactly the expected sequence;
// anything else (including a gap in front of the queued items) yields nullopt.
if (not lock->forwardLoadQueue.empty() && lock->forwardLoadQueue.top().seq == lock->expectedSequence) {
out.emplace(lock->forwardLoadQueue.top());
lock->forwardLoadQueue.pop();
// advance to the next sequence that may be dequeued
lock->expectedSequence += increment_;
}
return out;
})
.get()
.value_or(std::nullopt);
return out;
}
/**
@@ -109,8 +123,8 @@ public:
[[nodiscard]] bool
empty()
{
return strand_.execute([this] { return queue_.empty(); }).get().value();
// reports whether anything is queued at all; it does not look at the expected sequence
return data_.lock()->forwardLoadQueue.empty();
}
};
} // namespace util
} // namespace etlng::impl

View File

@@ -19,24 +19,26 @@
#include "etlng/impl/ext/Cache.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/Models.hpp"
#include "util/log/Logger.hpp"
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
namespace etlng::impl {
// Stores the collaborator through which this extension applies its cache updates.
CacheExt::CacheExt(data::LedgerCacheInterface& cache) : cache_(cache)
CacheExt::CacheExt(std::shared_ptr<CacheUpdaterInterface> cacheUpdater) : cacheUpdater_(std::move(cacheUpdater))
{
}
// Forwards the given ledger data to the cache update call and traces how many objects it carried.
void
CacheExt::onLedgerData(model::LedgerData const& data) const
{
cache_.get().update(data.objects, data.seq);
cacheUpdater_->update(data);
LOG(log_.trace()) << "got data. objects cnt = " << data.objects.size();
}
@@ -44,8 +46,8 @@ void
CacheExt::onInitialData(model::LedgerData const& data) const
{
LOG(log_.trace()) << "got initial data. objects cnt = " << data.objects.size();
cache_.get().update(data.objects, data.seq);
cache_.get().setFull();
cacheUpdater_->update(data);
cacheUpdater_->setFull();
}
void
@@ -53,7 +55,7 @@ CacheExt::onInitialObjects(uint32_t seq, std::vector<model::Object> const& objs,
const
{
LOG(log_.trace()) << "got initial objects cnt = " << objs.size();
cache_.get().update(objs, seq);
cacheUpdater_->update(seq, objs);
}
} // namespace etlng::impl

View File

@@ -19,24 +19,25 @@
#pragma once
#include "data/LedgerCacheInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "util/log/Logger.hpp"
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>
namespace etlng::impl {
class CacheExt {
std::reference_wrapper<data::LedgerCacheInterface> cache_;
std::shared_ptr<CacheUpdaterInterface> cacheUpdater_;
util::Logger log_{"ETL"};
public:
CacheExt(data::LedgerCacheInterface& cache);
CacheExt(std::shared_ptr<CacheUpdaterInterface> cacheUpdater);
void
onLedgerData(model::LedgerData const& data) const;
@@ -46,6 +47,13 @@ public:
void
onInitialObjects(uint32_t seq, std::vector<model::Object> const& objs, [[maybe_unused]] std::string lastKey) const;
/**
 * @brief Tells whether this extension may run in read-only mode
 *
 * We want cache updates through ETL if we are a potential writer but currently are not writing to DB.
 *
 * @return Always true
 */
static bool
allowInReadonly()
{
    constexpr bool kALLOWED_IN_READONLY = true;
    return kALLOWED_IN_READONLY;
}
};
} // namespace etlng::impl

View File

@@ -19,6 +19,8 @@
#pragma once
#include "etlng/LedgerPublisherInterface.hpp"
#include <gmock/gmock.h>
#include <xrpl/protocol/LedgerHeader.h>
@@ -26,11 +28,11 @@
#include <cstdint>
#include <optional>
// gMock stand-in for the ledger publisher used in tests.
// NOTE(review): several methods below are mocked without `override`; whether they are part of the interface
// cannot be seen from here - confirm.
struct MockLedgerPublisher {
MOCK_METHOD(bool, publish, (uint32_t, std::optional<uint32_t>), ());
struct MockLedgerPublisher : public etlng::LedgerPublisherInterface {
MOCK_METHOD(bool, publish, (uint32_t, std::optional<uint32_t>, std::chrono::steady_clock::duration), (override));
MOCK_METHOD(void, publish, (ripple::LedgerHeader const&), ());
MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const));
MOCK_METHOD(std::chrono::time_point<std::chrono::system_clock>, getLastPublish, (), (const));
MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const));
MOCK_METHOD(std::chrono::time_point<std::chrono::system_clock>, getLastPublish, (), (const, override));
MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const, override));
MOCK_METHOD(std::optional<uint32_t>, getLastPublishedSequence, (), (const));
};

View File

@@ -40,9 +40,11 @@ target_sources(
etl/TransformerTests.cpp
# ETLng
etlng/AmendmentBlockHandlerTests.cpp
etlng/ETLServiceTests.cpp
etlng/ExtractionTests.cpp
etlng/ForwardingSourceTests.cpp
etlng/GrpcSourceTests.cpp
etlng/LedgerPublisherTests.cpp
etlng/RegistryTests.cpp
etlng/SchedulingTests.cpp
etlng/TaskManagerTests.cpp
@@ -145,7 +147,6 @@ target_sources(
util/ConceptsTests.cpp
util/CoroutineGroupTests.cpp
util/LedgerUtilsTests.cpp
util/StrandedPriorityQueueTests.cpp
util/StringHashTests.cpp
# Prometheus support
util/prometheus/BoolTests.cpp

View File

@@ -0,0 +1,341 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025 the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etl/ETLState.hpp"
#include "etl/SystemState.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/ETLService.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "util/BinaryTestObject.hpp"
#include "util/MockAssert.hpp"
#include "util/MockBackendTestFixture.hpp"
#include "util/MockLedgerPublisher.hpp"
#include "util/MockLoadBalancer.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/TestObject.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SyncExecutionContext.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
#include "util/config/Types.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/json/array.hpp>
#include <boost/json/kind.hpp>
#include <boost/json/object.hpp>
#include <boost/json/serialize.hpp>
#include <boost/signals2/connection.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <vector>
using namespace util::config;
namespace {
// Ledger sequence shared by the tests in this file
constinit int const kSEQ = 100;
// Hash used for every ledger header created by the tests in this file
constinit char const* const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
struct MockMonitor : public etlng::MonitorInterface {
MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override));
MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override));
MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override));
MOCK_METHOD(void, stop, (), (override));
};
struct MockExtractor : etlng::ExtractorInterface {
MOCK_METHOD(std::optional<etlng::model::LedgerData>, extractLedgerWithDiff, (uint32_t), (override));
MOCK_METHOD(std::optional<etlng::model::LedgerData>, extractLedgerOnly, (uint32_t), (override));
};
struct MockLoader : etlng::LoaderInterface {
MOCK_METHOD(void, load, (etlng::model::LedgerData const&), (override));
MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (etlng::model::LedgerData const&), (override));
};
struct MockCacheLoader : etlng::CacheLoaderInterface {
MOCK_METHOD(void, load, (uint32_t), (override));
MOCK_METHOD(void, stop, (), (noexcept, override));
MOCK_METHOD(void, wait, (), (noexcept, override));
};
struct MockCacheUpdater : etlng::CacheUpdaterInterface {
MOCK_METHOD(void, update, (etlng::model::LedgerData const&), (override));
MOCK_METHOD(void, update, (uint32_t, std::vector<data::LedgerObject> const&), (override));
MOCK_METHOD(void, update, (uint32_t, std::vector<etlng::model::Object> const&), (override));
MOCK_METHOD(void, setFull, (), (override));
};
struct MockInitialLoadObserver : etlng::InitialLoadObserverInterface {
MOCK_METHOD(
void,
onInitialLoadGotMoreObjects,
(uint32_t, std::vector<etlng::model::Object> const&, std::optional<std::string>),
(override)
);
};
struct MockTaskManager : etlng::TaskManagerInterface {
MOCK_METHOD(void, run, (std::size_t), (override));
MOCK_METHOD(void, stop, (), (override));
};
struct MockTaskManagerProvider : etlng::TaskManagerProviderInterface {
MOCK_METHOD(
std::unique_ptr<etlng::TaskManagerInterface>,
make,
(util::async::AnyExecutionContext, std::reference_wrapper<etlng::MonitorInterface>, uint32_t),
(override)
);
};
auto
createTestData(uint32_t seq)
{
auto const header = createLedgerHeader(kLEDGER_HASH, seq);
return etlng::model::LedgerData{
.transactions = {},
.objects = {util::createObject(), util::createObject(), util::createObject()},
.successors = {},
.edgeKeys = {},
.header = header,
.rawHeader = {},
.seq = seq
};
}
} // namespace
struct ETLServiceTests : util::prometheus::WithPrometheus, MockBackendTest {
using SameThreadTestContext = util::async::BasicExecutionContext<
util::async::impl::SameThreadContext,
util::async::impl::BasicStopSource,
util::async::impl::SyncDispatchStrategy,
util::async::impl::SystemContextProvider,
util::async::impl::NoErrorHandler>; // This will allow ASSERTs turned exceptions to propagate
protected:
SameThreadTestContext ctx_;
util::config::ClioConfigDefinition config_{
{"extractor_threads", ConfigValue{ConfigType::Integer}.defaultValue(4)},
{"io_threads", ConfigValue{ConfigType::Integer}.defaultValue(2)},
{"cache.num_diffs", ConfigValue{ConfigType::Integer}.defaultValue(32)},
{"cache.num_markers", ConfigValue{ConfigType::Integer}.defaultValue(48)},
{"cache.num_cursors_from_diff", ConfigValue{ConfigType::Integer}.defaultValue(0)},
{"cache.num_cursors_from_account", ConfigValue{ConfigType::Integer}.defaultValue(0)},
{"cache.page_fetch_size", ConfigValue{ConfigType::Integer}.defaultValue(512)},
{"cache.load", ConfigValue{ConfigType::String}.defaultValue("async")}
};
StrictMockSubscriptionManagerSharedPtr subscriptions_;
std::shared_ptr<testing::NiceMock<MockLoadBalancer>> balancer_ =
std::make_shared<testing::NiceMock<MockLoadBalancer>>();
std::shared_ptr<testing::NiceMock<MockNetworkValidatedLedgers>> ledgers_ =
std::make_shared<testing::NiceMock<MockNetworkValidatedLedgers>>();
std::shared_ptr<testing::NiceMock<MockLedgerPublisher>> publisher_ =
std::make_shared<testing::NiceMock<MockLedgerPublisher>>();
std::shared_ptr<testing::NiceMock<MockCacheLoader>> cacheLoader_ =
std::make_shared<testing::NiceMock<MockCacheLoader>>();
std::shared_ptr<testing::NiceMock<MockCacheUpdater>> cacheUpdater_ =
std::make_shared<testing::NiceMock<MockCacheUpdater>>();
std::shared_ptr<testing::NiceMock<MockExtractor>> extractor_ = std::make_shared<testing::NiceMock<MockExtractor>>();
std::shared_ptr<testing::NiceMock<MockLoader>> loader_ = std::make_shared<testing::NiceMock<MockLoader>>();
std::shared_ptr<testing::NiceMock<MockInitialLoadObserver>> initialLoadObserver_ =
std::make_shared<testing::NiceMock<MockInitialLoadObserver>>();
std::shared_ptr<testing::NiceMock<MockTaskManagerProvider>> taskManagerProvider_ =
std::make_shared<testing::NiceMock<MockTaskManagerProvider>>();
std::shared_ptr<etl::SystemState> systemState_ = std::make_shared<etl::SystemState>();
etlng::ETLService service_{
ctx_,
config_,
backend_,
balancer_,
ledgers_,
publisher_,
cacheLoader_,
cacheUpdater_,
extractor_,
loader_,
initialLoadObserver_,
taskManagerProvider_,
systemState_
};
};
// getInfo() must leave out "last_publish_age_seconds" when the publisher reports a default-constructed
// (i.e. never set) last publish time.
TEST_F(ETLServiceTests, GetInfoWithoutLastPublish)
{
    using testing::Return;

    auto const sources = boost::json::parse(R"json([{"test": "value"}])json");
    auto const neverPublished = std::chrono::system_clock::time_point{};

    EXPECT_CALL(*balancer_, toJson()).WillOnce(Return(sources));
    EXPECT_CALL(*publisher_, getLastPublish()).WillOnce(Return(neverPublished));
    EXPECT_CALL(*publisher_, lastPublishAgeSeconds()).WillRepeatedly(Return(0));

    auto const expectedInfo = boost::json::parse(R"json({
"etl_sources": [{"test": "value"}],
"is_writer": 0,
"read_only": 0
})json");

    auto const info = service_.getInfo();
    EXPECT_TRUE(info == expectedInfo);
    EXPECT_FALSE(info.contains("last_publish_age_seconds"));
}
// getInfo() must report the publisher's age as the string "last_publish_age_seconds" once a publish time exists.
TEST_F(ETLServiceTests, GetInfoWithLastPublish)
{
    using testing::Return;

    auto const sources = boost::json::parse(R"json([{"test": "value"}])json");

    EXPECT_CALL(*balancer_, toJson()).WillOnce(Return(sources));
    EXPECT_CALL(*publisher_, getLastPublish()).WillOnce(Return(std::chrono::system_clock::now()));
    EXPECT_CALL(*publisher_, lastPublishAgeSeconds()).WillOnce(Return(42));

    auto const expectedInfo = boost::json::parse(R"json({
"etl_sources": [{"test": "value"}],
"is_writer": 0,
"read_only": 0,
"last_publish_age_seconds": "42"
})json");

    auto const info = service_.getInfo();
    EXPECT_TRUE(info == expectedInfo);
}
// A freshly constructed service must not report an amendment block.
TEST_F(ETLServiceTests, IsAmendmentBlocked)
{
    auto const blocked = service_.isAmendmentBlocked();
    EXPECT_FALSE(blocked);
}
// A freshly constructed service must not report detected corruption.
TEST_F(ETLServiceTests, IsCorruptionDetected)
{
    auto const corrupted = service_.isCorruptionDetected();
    EXPECT_FALSE(corrupted);
}
// getETLState() must yield a value when the balancer provides a state.
TEST_F(ETLServiceTests, GetETLState)
{
    using testing::Return;

    EXPECT_CALL(*balancer_, getETLState()).WillOnce(Return(etl::ETLState{}));

    auto const state = service_.getETLState();
    EXPECT_TRUE(state.has_value());
}
// lastCloseAgeSeconds() must report the value obtained from the publisher.
TEST_F(ETLServiceTests, LastCloseAgeSeconds)
{
    constexpr std::uint32_t kCLOSE_AGE = 10u;
    EXPECT_CALL(*publisher_, lastCloseAgeSeconds()).WillOnce(testing::Return(kCLOSE_AGE));

    // The previous check, `EXPECT_GE(result, 0)`, holds for every unsigned value and therefore could not
    // fail. Compare against the mocked value instead.
    // NOTE(review): assumes the service returns the publisher's value unchanged, which is what the single
    // expected mock call suggests - confirm against ETLService.
    EXPECT_EQ(service_.lastCloseAgeSeconds(), kCLOSE_AGE);
}
// With no ledger range in the database, run() must load the initial ledger first and then start a task
// manager at the sequence following it.
TEST_F(ETLServiceTests, RunWithEmptyDatabase)
{
    using testing::_;
    using testing::Return;

    auto taskManager = std::make_unique<testing::NiceMock<MockTaskManager>>();
    auto const initialLedger = createTestData(kSEQ);

    // the database is empty on the first range lookup and populated on the second
    testing::Sequence rangeLookups;

    EXPECT_CALL(*backend_, hardFetchLedgerRange(_)).InSequence(rangeLookups).WillOnce(Return(std::nullopt));
    EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(Return(kSEQ));
    EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(Return(initialLedger));
    EXPECT_CALL(*balancer_, loadInitialLedger(kSEQ, _, _)).WillOnce(Return(std::vector<std::string>{}));
    EXPECT_CALL(*loader_, loadInitialLedger(_)).WillOnce(Return(ripple::LedgerHeader{}));
    EXPECT_CALL(*backend_, hardFetchLedgerRange(_))
        .InSequence(rangeLookups)
        .WillOnce(Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));

    // must be set up before ownership of the mock is released below
    EXPECT_CALL(*taskManager, run(_));
    EXPECT_CALL(*taskManagerProvider_, make(_, _, kSEQ + 1))
        .WillOnce(Return(std::unique_ptr<etlng::TaskManagerInterface>(taskManager.release())));

    service_.run();
}
// With a ledger range already in the database, run() must load the cache for the latest sequence and start
// a task manager at the sequence following it.
TEST_F(ETLServiceTests, RunWithPopulatedDatabase)
{
    using testing::_;
    using testing::Return;

    auto taskManager = std::make_unique<testing::NiceMock<MockTaskManager>>();

    EXPECT_CALL(*backend_, hardFetchLedgerRange(_))
        .WillRepeatedly(Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
    EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(Return(kSEQ));
    EXPECT_CALL(*cacheLoader_, load(kSEQ));

    // must be set up before ownership of the mock is released below
    EXPECT_CALL(*taskManager, run(_));
    EXPECT_CALL(*taskManagerProvider_, make(_, _, kSEQ + 1))
        .WillOnce(Return(std::unique_ptr<etlng::TaskManagerInterface>(taskManager.release())));

    service_.run();
}
// When no validated ledger ever becomes available, run() must return without extracting, loading or creating
// a task manager.
TEST_F(ETLServiceTests, WaitForValidatedLedgerIsAborted)
{
    using testing::_;
    using testing::Return;

    EXPECT_CALL(*backend_, hardFetchLedgerRange(_)).WillOnce(Return(std::nullopt));
    EXPECT_CALL(*ledgers_, getMostRecent()).Times(2).WillRepeatedly(Return(std::nullopt));

    // No other calls should happen because we exit early
    EXPECT_CALL(*extractor_, extractLedgerOnly(_)).Times(0);
    EXPECT_CALL(*balancer_, loadInitialLedger(_, _, _)).Times(0);
    EXPECT_CALL(*loader_, loadInitialLedger(_)).Times(0);
    EXPECT_CALL(*taskManagerProvider_, make(_, _, _)).Times(0);

    service_.run();
}
struct ETLServiceAssertTests : common::util::WithMockAssert, ETLServiceTests {};
// If the extractor yields nothing for the initial ledger, run() must hit an assertion before any loading or
// task manager creation takes place.
TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger)
{
    using testing::_;
    using testing::Return;

    EXPECT_CALL(*backend_, hardFetchLedgerRange(_)).WillOnce(Return(std::nullopt));
    EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(Return(kSEQ));
    EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(Return(std::nullopt));

    // These calls should not happen because loading the initial ledger fails
    EXPECT_CALL(*balancer_, loadInitialLedger(_, _, _)).Times(0);
    EXPECT_CALL(*loader_, loadInitialLedger(_)).Times(0);
    EXPECT_CALL(*taskManagerProvider_, make(_, _, _)).Times(0);

    EXPECT_CLIO_ASSERT_FAIL({ service_.run(); });
}
// The most recent ledger is unavailable on the first lookup and available on the second; run() is expected to
// hit an assertion without extracting, loading or creating a task manager.
TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadInitialLedger)
{
    using testing::_;
    using testing::Return;

    testing::Sequence mostRecentLookups;

    EXPECT_CALL(*backend_, hardFetchLedgerRange(_)).WillOnce(Return(std::nullopt));
    EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(mostRecentLookups).WillOnce(Return(std::nullopt));
    EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(mostRecentLookups).WillOnce(Return(kSEQ));

    // No other calls should happen because we exit early
    EXPECT_CALL(*extractor_, extractLedgerOnly(_)).Times(0);
    EXPECT_CALL(*balancer_, loadInitialLedger(_, _, _)).Times(0);
    EXPECT_CALL(*loader_, loadInitialLedger(_)).Times(0);
    EXPECT_CALL(*taskManagerProvider_, make(_, _, _)).Times(0);

    EXPECT_CLIO_ASSERT_FAIL({ service_.run(); });
}

View File

@@ -0,0 +1,357 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/MockBackendTestFixture.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/TestObject.hpp"
#include "util/config/ConfigDefinition.hpp"
#include <fmt/core.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
#include <optional>
#include <vector>
using namespace testing;
using namespace etlng;
using namespace data;
using namespace std::chrono;
namespace {
// Accounts and ledger hash used to build the test ledger and its payment transaction
constexpr char const* kACCOUNT = "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn";
constexpr char const* kACCOUNT2 = "rLEsXccBGNR3UPuPu2hUXPjziKC3qKSBun";
constexpr char const* kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";

// Ledger sequence and age passed to createLedgerHeader
constexpr int kSEQ = 30;
constexpr int kAGE = 800;

// Values of the payment transaction and its metadata
constexpr int kAMOUNT = 100;
constexpr int kFEE = 3;
constexpr int kFINAL_BALANCE = 110;
constexpr int kFINAL_BALANCE2 = 30;
// Matches a ledger header whose sequence, hash and close time all equal those of `expectedHeader`.
MATCHER_P(ledgerHeaderMatcher, expectedHeader, "Headers match")
{
    if (not(arg.seq == expectedHeader.seq))
        return false;
    if (not(arg.hash == expectedHeader.hash))
        return false;
    return arg.closeTime == expectedHeader.closeTime;
}
} // namespace
struct ETLLedgerPublisherNgTest : util::prometheus::WithPrometheus, MockBackendTestStrict, SyncAsioContextTest {
util::config::ClioConfigDefinition cfg{{}};
StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr;
};
// When not writing, publishing a header must record the sequence, must not fetch the ledger diff, and must
// leave the backend range set to exactly the published sequence.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheDisabled)
{
    etl::SystemState state;
    state.isWriting = false;
    auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);

    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    publisher.publish(header);
    EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ, _)).Times(0);

    // setLastPublishedSequence not in strand, should verify before run
    auto const publishedSeq = publisher.getLastPublishedSequence();
    EXPECT_TRUE(publishedSeq);
    EXPECT_EQ(publishedSeq.value(), kSEQ);

    ctx_.run();

    auto const range = backend_->fetchLedgerRange();
    EXPECT_TRUE(range);
    EXPECT_EQ(range.value().minSequence, kSEQ);
    EXPECT_EQ(range.value().maxSequence, kSEQ);
}
// When not writing, publishing a header must record the sequence and leave the backend range set to exactly
// the published sequence.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheEnabled)
{
    etl::SystemState state;
    state.isWriting = false;
    auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);

    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    publisher.publish(header);

    // setLastPublishedSequence not in strand, should verify before run
    auto const publishedSeq = publisher.getLastPublishedSequence();
    EXPECT_TRUE(publishedSeq);
    EXPECT_EQ(publishedSeq.value(), kSEQ);

    ctx_.run();

    auto const range = backend_->fetchLedgerRange();
    EXPECT_TRUE(range);
    EXPECT_EQ(range.value().minSequence, kSEQ);
    EXPECT_EQ(range.value().maxSequence, kSEQ);
}
// When writing, publishing a header must record the sequence but must not set a range on the backend.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingTrue)
{
    etl::SystemState state;
    state.isWriting = true;
    auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);

    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    publisher.publish(header);

    // setLastPublishedSequence not in strand, should verify before run
    auto const publishedSeq = publisher.getLastPublishedSequence();
    EXPECT_TRUE(publishedSeq);
    EXPECT_EQ(publishedSeq.value(), kSEQ);

    ctx_.run();

    EXPECT_FALSE(backend_->fetchLedgerRange());
}
// Publishing a header with age 0 that lies inside the backend range must fetch fees and transactions and
// publish the ledger, book changes and the single transaction to subscribers.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange)
{
    etl::SystemState state;
    state.isWriting = true;

    auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);  // age is 0
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    backend_->setRange(kSEQ - 1, kSEQ);

    publisher.publish(header);

    // mock fetch fee
    EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
        .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));

    // the one payment transaction contained in the ledger
    TransactionAndMetadata payment;
    payment.transaction =
        createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData();
    payment.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2)
                           .getSerializer()
                           .peekData();
    payment.ledgerSequence = kSEQ;

    // mock fetch transactions
    EXPECT_CALL(*backend_, fetchAllTransactionsInLedger)
        .WillOnce(Return(std::vector<TransactionAndMetadata>{payment}));

    // setLastPublishedSequence not in strand, should verify before run
    auto const publishedSeq = publisher.getLastPublishedSequence();
    EXPECT_TRUE(publishedSeq);
    EXPECT_EQ(publishedSeq.value(), kSEQ);

    auto const expectedRange = fmt::format("{}-{}", kSEQ - 1, kSEQ);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, expectedRange, 1));
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
    // mock 1 transaction
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);

    ctx_.run();

    // last publish time should be set
    EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
// A ledger whose close time lies 10 seconds in the future must still be published in full
// (ledger, book changes and its single transaction).
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
{
    etl::SystemState state;
    state.isWriting = true;

    // move the close time 10 seconds past "now", expressed relative to the ripple epoch
    ripple::LedgerHeader header = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
    auto const futureTimePoint = system_clock::now() + seconds(10);
    auto const futureCloseTime =
        duration_cast<seconds>(futureTimePoint.time_since_epoch()).count() - kRIPPLE_EPOCH_START;
    header.closeTime = ripple::NetClock::time_point{seconds{futureCloseTime}};

    backend_->setRange(kSEQ - 1, kSEQ);
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    publisher.publish(header);

    // the fee settings object is fetched for the published sequence
    EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
        .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));

    // a single payment transaction that the backend hands back for this ledger
    TransactionAndMetadata payment;
    payment.transaction =
        createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData();
    payment.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2)
                           .getSerializer()
                           .peekData();
    payment.ledgerSequence = kSEQ;

    EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
        .WillOnce(Return(std::vector<TransactionAndMetadata>{payment}));

    // setLastPublishedSequence is not executed in the strand, so it is verified before run()
    auto const lastPublished = publisher.getLastPublishedSequence();
    EXPECT_TRUE(lastPublished);
    EXPECT_EQ(lastPublished.value(), kSEQ);

    EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1));
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
    // exactly one transaction is expected to be published
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);

    ctx_.run();

    // the last publish time should have been set by now
    EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
// publish(seq, ...) must report failure when the system state is already stopping.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsTrue)
{
    etl::SystemState state;
    state.isStopping = true;

    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);

    EXPECT_FALSE(publisher.publish(kSEQ, {}));
}
// publish(seq, maxAttempts, ...) must give up and return false when the range reported by
// the database never reaches the requested sequence within the allowed number of attempts.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqMaxAttempt)
{
    static constexpr auto kMAX_ATTEMPT = 2;

    etl::SystemState state;
    state.isStopping = false;
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);

    // the database range stays one ledger behind kSEQ on every attempt
    LedgerRange const staleRange{.minSequence = kSEQ - 1, .maxSequence = kSEQ - 1};
    EXPECT_CALL(*backend_, hardFetchLedgerRange).Times(kMAX_ATTEMPT).WillRepeatedly(Return(staleRange));

    EXPECT_FALSE(publisher.publish(kSEQ, kMAX_ATTEMPT, std::chrono::milliseconds{1}));
}
// publish(seq, ...) must succeed when the system is not stopping and the database range
// already contains the requested sequence.
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsFalse)
{
    etl::SystemState state;
    state.isStopping = false;
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);

    LedgerRange const availableRange{.minSequence = kSEQ, .maxSequence = kSEQ};
    EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(availableRange));

    auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
    EXPECT_CALL(*backend_, fetchLedgerBySequence(kSEQ, _)).WillOnce(Return(header));

    EXPECT_TRUE(publisher.publish(kSEQ, {}));
    ctx_.run();
}
// Two transactions are returned by the backend out of order; the publisher must hand them to
// the subscription manager in the order enforced by the gmock Sequence below (second first).
TEST_F(ETLLedgerPublisherNgTest, PublishMultipleTxInOrder)
{
    etl::SystemState state;
    state.isWriting = true;

    auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);  // age is 0
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    backend_->setRange(kSEQ - 1, kSEQ);
    publisher.publish(header);

    // the fee settings object is fetched for the published sequence
    EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
        .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));

    // first transaction carries the larger index (2) ...
    TransactionAndMetadata first;
    first.transaction =
        createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData();
    first.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2, 2)
                         .getSerializer()
                         .peekData();
    first.ledgerSequence = kSEQ;
    first.date = 1;

    // ... second transaction carries the smaller index (1)
    TransactionAndMetadata second;
    second.transaction =
        createPaymentTransactionObject(kACCOUNT, kACCOUNT2, kAMOUNT, kFEE, kSEQ).getSerializer().peekData();
    second.metadata = createPaymentTransactionMetaObject(kACCOUNT, kACCOUNT2, kFINAL_BALANCE, kFINAL_BALANCE2, 1)
                          .getSerializer()
                          .peekData();
    second.ledgerSequence = kSEQ;
    second.date = 2;

    // the backend returns them as {index 2, index 1}
    EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
        .WillOnce(Return(std::vector<TransactionAndMetadata>{first, second}));

    // setLastPublishedSequence is not executed in the strand, so it is verified before run()
    auto const lastPublished = publisher.getLastPublishedSequence();
    EXPECT_TRUE(lastPublished);
    EXPECT_EQ(lastPublished.value(), kSEQ);

    EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2));
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);

    // the transaction with the smaller index must be published first
    Sequence const publishOrder;
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(second, _)).InSequence(publishOrder);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(first, _)).InSequence(publishOrder);

    ctx_.run();

    // the last publish time should have been set by now
    EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
// A ledger that is too old must not reach any subscriber, although its sequence is still
// recorded as the last published one.
TEST_F(ETLLedgerPublisherNgTest, PublishVeryOldLedgerShouldSkip)
{
    etl::SystemState state;
    state.isWriting = true;

    // age (800) is greater than MAX_LEDGER_AGE_SECONDS (600)
    auto const staleHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800);
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    backend_->setRange(kSEQ - 1, kSEQ);
    publisher.publish(staleHeader);

    // nothing may be published to subscribers
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0);

    auto const lastPublished = publisher.getLastPublishedSequence();
    EXPECT_TRUE(lastPublished);
    EXPECT_EQ(lastPublished.value(), kSEQ);

    ctx_.run();
}
// Two consecutive ledgers published back to back must be delivered to subscribers strictly
// in order: ledger + book changes for kSEQ, then ledger + book changes for kSEQ + 1.
TEST_F(ETLLedgerPublisherNgTest, PublishMultipleLedgersInQuickSuccession)
{
    etl::SystemState state;
    state.isWriting = true;

    auto const firstHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
    auto const secondHeader = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0);
    impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, state);
    backend_->setRange(kSEQ - 1, kSEQ + 1);

    // both ledgers are queued before the context gets a chance to run
    publisher.publish(firstHeader);
    publisher.publish(secondHeader);

    // fee settings are fetched once per ledger
    EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
        .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
    EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ + 1, _))
        .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));

    // neither ledger contains any transactions
    EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
        .WillOnce(Return(std::vector<TransactionAndMetadata>{}));
    EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ + 1, _))
        .WillOnce(Return(std::vector<TransactionAndMetadata>{}));

    Sequence const publishOrder;
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(firstHeader), _, _, _))
        .InSequence(publishOrder);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(firstHeader), _))
        .InSequence(publishOrder);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(secondHeader), _, _, _))
        .InSequence(publishOrder);
    EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(secondHeader), _))
        .InSequence(publishOrder);

    // the most recent publish() call determines the last published sequence
    auto const lastPublished = publisher.getLastPublishedSequence();
    EXPECT_TRUE(lastPublished);
    EXPECT_EQ(lastPublished.value(), kSEQ + 1);

    ctx_.run();
}

View File

@@ -64,13 +64,10 @@ struct MockLoadObserver : etlng::InitialLoadObserverInterface {
);
};
struct LoadingTests : util::prometheus::WithPrometheus,
MockBackendTest,
MockLedgerFetcherTest,
MockAmendmentBlockHandlerTest {
struct LoadingTests : util::prometheus::WithPrometheus, MockBackendTest, MockAmendmentBlockHandlerTest {
protected:
std::shared_ptr<MockRegistry> mockRegistryPtr_ = std::make_shared<MockRegistry>();
Loader loader_{backend_, mockLedgerFetcherPtr_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_};
Loader loader_{backend_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_};
};
struct LoadingAssertTest : common::util::WithMockAssert, LoadingTests {};
@@ -148,6 +145,33 @@ TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsWithoutKey)
loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey);
}
// When the registry throws while dispatching initial objects, the loader is expected to
// notify the amendment block handler.
TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsFailure)
{
    auto const testData = createTestData();
    auto const noLastKey = std::optional<std::string>{};

    // an absent last key is expected to reach the registry as an empty string
    EXPECT_CALL(*mockRegistryPtr_, dispatchInitialObjects(kSEQ, testData.objects, std::string{}))
        .WillOnce([](auto, auto, auto) { throw std::runtime_error("some error"); });
    EXPECT_CALL(*mockAmendmentBlockHandlerPtr_, notifyAmendmentBlocked());

    loader_.onInitialLoadGotMoreObjects(kSEQ, testData.objects, noLastKey);
}
// When the registry throws while dispatching the initial ledger data, loadInitialLedger must
// return an empty result, never finish writes, and notify the amendment block handler.
TEST_F(LoadingTests, LoadInitialLedgerFailure)
{
    auto const testData = createTestData();

    // the database reports no existing range
    EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt));
    EXPECT_CALL(*backend_, doFinishWrites()).Times(0);
    EXPECT_CALL(*mockRegistryPtr_, dispatchInitialData(testData)).WillOnce([](auto const&) {
        throw std::runtime_error("some error");
    });
    EXPECT_CALL(*mockAmendmentBlockHandlerPtr_, notifyAmendmentBlocked());

    auto const result = loader_.loadInitialLedger(testData);
    EXPECT_FALSE(result.has_value());
}
TEST_F(LoadingAssertTest, LoadInitialLedgerHasDataInDB)
{
auto const data = createTestData();

View File

@@ -66,7 +66,7 @@ TEST_F(MonitorTests, ConsumesAndNotifiesForAllOutstandingSequencesAtOnce)
});
auto subscription = monitor_.subscribe(actionMock_.AsStdFunction());
monitor_.run(std::chrono::milliseconds{1});
monitor_.run(std::chrono::milliseconds{10});
unblock.acquire();
}
@@ -111,3 +111,18 @@ TEST_F(MonitorTests, NotifiesWhenForcedByNewSequenceAvailableFromNetwork)
pusher(kSTART_SEQ); // pretend network validated a new ledger
unblock.acquire();
}
// The monitor runs with a 10 second interval, yet the subscriber must be invoked as soon as
// notifyLedgerLoaded() is called; the semaphore blocks the test until that happens.
TEST_F(MonitorTests, NotifiesWhenForcedByLedgerLoaded)
{
    LedgerRange const dbRange(kSTART_SEQ, kSTART_SEQ);
    std::binary_semaphore actionInvoked(0);

    EXPECT_CALL(*ledgers_, subscribe(testing::_));
    EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(dbRange));
    EXPECT_CALL(actionMock_, Call).WillOnce([&actionInvoked] { actionInvoked.release(); });

    auto subscription = monitor_.subscribe(actionMock_.AsStdFunction());
    monitor_.run(std::chrono::seconds{10});   // expected to be force-invoked sooner than in 10 sec
    monitor_.notifyLedgerLoaded(kSTART_SEQ);  // notify about newly committed ledger

    actionInvoked.acquire();
}

View File

@@ -17,16 +17,21 @@
*/
//==============================================================================
#include "etl/SystemState.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/impl/Registry.hpp"
#include "util/BinaryTestObject.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockPrometheus.hpp"
#include "util/TestObject.hpp"
#include <boost/signals2/connection.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <xrpl/protocol/TxFormats.h>
#include <chrono>
#include <cstdint>
#include <string>
#include <utility>
@@ -176,7 +181,88 @@ struct MockExtNftOffer {
MOCK_METHOD(void, onInitialTransaction, (uint32_t, etlng::model::Transaction const&), (const));
};
struct RegistryTest : NoLoggerFixture {};
// Mock extensions with allowInReadonly
// Mock extension exposing the onLedgerData hook that opts in to running while not writing.
struct MockExtLedgerDataReadonly {
    MOCK_METHOD(void, onLedgerData, (etlng::model::LedgerData const&), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Mock extension exposing the onInitialData hook that opts in to running while not writing.
struct MockExtInitialDataReadonly {
    MOCK_METHOD(void, onInitialData, (etlng::model::LedgerData const&), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Mock extension exposing the onObject hook that opts in to running while not writing.
struct MockExtOnObjectReadonly {
    MOCK_METHOD(void, onObject, (uint32_t, etlng::model::Object const&), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Mock extension exposing the onTransaction hook with a spec of ttNFTOKEN_BURN that opts in
// to running while not writing.
struct MockExtTransactionNftBurnReadonly {
    using spec = etlng::model::Spec<ripple::TxType::ttNFTOKEN_BURN>;

    MOCK_METHOD(void, onTransaction, (uint32_t, etlng::model::Transaction const&), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Mock extension exposing the onInitialObject hook that opts in to running while not writing.
struct MockExtInitialObjectReadonly {
    MOCK_METHOD(void, onInitialObject, (uint32_t, etlng::model::Object const&), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Mock extension exposing the onInitialObjects hook that opts in to running while not writing.
struct MockExtInitialObjectsReadonly {
    MOCK_METHOD(void, onInitialObjects, (uint32_t, std::vector<etlng::model::Object> const&, std::string), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Mock extension exposing the onInitialTransaction hook with a spec of ttNFTOKEN_BURN that
// opts in to running while not writing.
struct MockExtNftBurnReadonly {
    using spec = etlng::model::Spec<ripple::TxType::ttNFTOKEN_BURN>;

    MOCK_METHOD(void, onInitialTransaction, (uint32_t, etlng::model::Transaction const&), (const));

    // Always opts in to readonly mode
    static bool
    allowInReadonly()
    {
        return true;
    }
};
// Fixture for Registry tests. Owns the SystemState handed to every Registry under test;
// the state starts with isWriting == true and individual tests flip it to false as needed.
struct RegistryTest : NoLoggerFixture, util::prometheus::WithPrometheus {
    RegistryTest()
    {
        state_.isWriting = true;
    }

protected:
    etl::SystemState state_{};
};
} // namespace
@@ -195,7 +281,7 @@ TEST_F(RegistryTest, FilteringOfTxWorksCorrectlyForInitialTransaction)
EXPECT_CALL(extOffer, onInitialTransaction(testing::_, testing::_)); // 1 create offer
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
auto reg = Registry<MockExtNftBurn&, MockExtNftOffer&>(extBurn, extOffer);
auto reg = Registry<MockExtNftBurn&, MockExtNftOffer&>(state_, extBurn, extOffer);
reg.dispatchInitialData(etlng::model::LedgerData{
.transactions = transactions,
.objects = {},
@@ -222,7 +308,7 @@ TEST_F(RegistryTest, FilteringOfTxWorksCorrectlyForTransaction)
EXPECT_CALL(extOffer, onTransaction(testing::_, testing::_)); // 1 create offer
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
auto reg = Registry<MockExtTransactionNftBurn&, MockExtTransactionNftOffer&>(extBurn, extOffer);
auto reg = Registry<MockExtTransactionNftBurn&, MockExtTransactionNftOffer&>(state_, extBurn, extOffer);
reg.dispatch(etlng::model::LedgerData{
.transactions = std::move(transactions),
.objects = {},
@@ -242,7 +328,7 @@ TEST_F(RegistryTest, InitialObjectsEmpty)
EXPECT_CALL(extObj, onInitialObject(testing::_, testing::_)).Times(0); // 0 empty objects sent
EXPECT_CALL(extObjs, onInitialObjects(testing::_, testing::_, testing::_)); // 1 vector passed as is
auto reg = Registry<MockExtInitialObject&, MockExtInitialObjects&>(extObj, extObjs);
auto reg = Registry<MockExtInitialObject&, MockExtInitialObjects&>(state_, extObj, extObjs);
reg.dispatchInitialObjects(kSEQ, {}, {});
}
@@ -254,7 +340,7 @@ TEST_F(RegistryTest, InitialObjectsDispatched)
EXPECT_CALL(extObj, onInitialObject(testing::_, testing::_)).Times(3); // 3 objects sent
EXPECT_CALL(extObjs, onInitialObjects(testing::_, testing::_, testing::_)); // 1 vector passed as is
auto reg = Registry<MockExtInitialObject&, MockExtInitialObjects&>(extObj, extObjs);
auto reg = Registry<MockExtInitialObject&, MockExtInitialObjects&>(state_, extObj, extObjs);
reg.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {});
}
@@ -265,7 +351,7 @@ TEST_F(RegistryTest, ObjectsDispatched)
EXPECT_CALL(extObj, onObject(testing::_, testing::_)).Times(3); // 3 objects sent
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
auto reg = Registry<MockExtOnObject&>(extObj);
auto reg = Registry<MockExtOnObject&>(state_, extObj);
reg.dispatch(etlng::model::LedgerData{
.transactions = {},
.objects = {util::createObject(), util::createObject(), util::createObject()},
@@ -290,7 +376,7 @@ TEST_F(RegistryTest, OnLedgerDataForBatch)
EXPECT_CALL(ext, onLedgerData(testing::_)); // 1 batch (dispatch call)
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
auto reg = Registry<MockExtLedgerData&>(ext);
auto reg = Registry<MockExtLedgerData&>(state_, ext);
reg.dispatch(etlng::model::LedgerData{
.transactions = std::move(transactions),
.objects = {},
@@ -311,7 +397,7 @@ TEST_F(RegistryTest, InitialObjectsCorrectOrderOfHookCalls)
EXPECT_CALL(extObjs, onInitialObjects);
EXPECT_CALL(extObj, onInitialObject).Times(3);
auto reg = Registry<MockExtInitialObject&, MockExtInitialObjects&>(extObj, extObjs);
auto reg = Registry<MockExtInitialObject&, MockExtInitialObjects&>(state_, extObj, extObjs);
reg.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {});
}
@@ -331,7 +417,7 @@ TEST_F(RegistryTest, InitialDataCorrectOrderOfHookCalls)
EXPECT_CALL(extInitialTransaction, onInitialTransaction).Times(2);
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
auto reg = Registry<MockExtNftBurn&, MockExtInitialData&>(extInitialTransaction, extInitialData);
auto reg = Registry<MockExtNftBurn&, MockExtInitialData&>(state_, extInitialTransaction, extInitialData);
reg.dispatchInitialData(etlng::model::LedgerData{
.transactions = std::move(transactions),
.objects = {},
@@ -368,7 +454,7 @@ TEST_F(RegistryTest, LedgerDataCorrectOrderOfHookCalls)
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
auto reg = Registry<MockExtOnObject&, MockExtTransactionNftBurn&, MockExtLedgerData&>(
extOnObject, extOnTransaction, extLedgerData
state_, extOnObject, extOnTransaction, extLedgerData
);
reg.dispatch(etlng::model::LedgerData{
.transactions = std::move(transactions),
@@ -380,3 +466,362 @@ TEST_F(RegistryTest, LedgerDataCorrectOrderOfHookCalls)
.seq = kSEQ
});
}
// With isWriting == false, an extension that allows readonly still receives onLedgerData once.
TEST_F(RegistryTest, ReadonlyModeLedgerDataAllowed)
{
    state_.isWriting = false;

    auto burnTxs = std::vector{
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
    };

    auto readonlyExt = MockExtLedgerDataReadonly{};
    EXPECT_CALL(readonlyExt, onLedgerData(testing::_));

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtLedgerDataReadonly&>(state_, readonlyExt);
    registry.dispatch(etlng::model::LedgerData{
        .transactions = std::move(burnTxs),
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false, an extension that allows readonly still receives onTransaction
// for each of the two matching transactions.
TEST_F(RegistryTest, ReadonlyModeTransactionAllowed)
{
    state_.isWriting = false;

    auto burnTxs = std::vector{
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
    };

    auto readonlyExt = MockExtTransactionNftBurnReadonly{};
    EXPECT_CALL(readonlyExt, onTransaction(testing::_, testing::_)).Times(2);

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtTransactionNftBurnReadonly&>(state_, readonlyExt);
    registry.dispatch(etlng::model::LedgerData{
        .transactions = std::move(burnTxs),
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false, an extension that allows readonly still receives onObject for
// each of the three dispatched objects.
TEST_F(RegistryTest, ReadonlyModeObjectAllowed)
{
    state_.isWriting = false;

    auto ledgerObjects = std::vector{
        util::createObject(),
        util::createObject(),
        util::createObject(),
    };

    auto readonlyExt = MockExtOnObjectReadonly{};
    EXPECT_CALL(readonlyExt, onObject(testing::_, testing::_)).Times(3);

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtOnObjectReadonly&>(state_, readonlyExt);
    registry.dispatch(etlng::model::LedgerData{
        .transactions = {},
        .objects = std::move(ledgerObjects),
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false, an extension that allows readonly still receives onInitialData once.
TEST_F(RegistryTest, ReadonlyModeInitialDataAllowed)
{
    state_.isWriting = false;

    auto burnTxs = std::vector{
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
    };

    auto readonlyExt = MockExtInitialDataReadonly{};
    EXPECT_CALL(readonlyExt, onInitialData(testing::_));

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtInitialDataReadonly&>(state_, readonlyExt);
    registry.dispatchInitialData(etlng::model::LedgerData{
        .transactions = std::move(burnTxs),
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false, an extension that allows readonly still receives
// onInitialTransaction for each of the two matching transactions.
TEST_F(RegistryTest, ReadonlyModeInitialTransactionAllowed)
{
    state_.isWriting = false;

    auto burnTxs = std::vector{
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
    };

    auto readonlyExt = MockExtNftBurnReadonly{};
    EXPECT_CALL(readonlyExt, onInitialTransaction(testing::_, testing::_)).Times(2);

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtNftBurnReadonly&>(state_, readonlyExt);
    registry.dispatchInitialData(etlng::model::LedgerData{
        .transactions = std::move(burnTxs),
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false, an extension that allows readonly still receives onInitialObject
// for each of the three initial objects.
TEST_F(RegistryTest, ReadonlyModeInitialObjectAllowed)
{
    state_.isWriting = false;

    auto readonlyExt = MockExtInitialObjectReadonly{};
    EXPECT_CALL(readonlyExt, onInitialObject(testing::_, testing::_)).Times(3);

    auto registry = Registry<MockExtInitialObjectReadonly&>(state_, readonlyExt);
    registry.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {});
}
// With isWriting == false, an extension that allows readonly still receives a single
// onInitialObjects call for the dispatched batch.
TEST_F(RegistryTest, ReadonlyModeInitialObjectsAllowed)
{
    state_.isWriting = false;

    auto readonlyExt = MockExtInitialObjectsReadonly{};
    EXPECT_CALL(readonlyExt, onInitialObjects(testing::_, testing::_, testing::_));

    auto registry = Registry<MockExtInitialObjectsReadonly&>(state_, readonlyExt);
    registry.dispatchInitialObjects(kSEQ, {util::createObject(), util::createObject(), util::createObject()}, {});
}
// With isWriting == false, an extension without allowInReadonly must not be invoked at all.
TEST_F(RegistryTest, ReadonlyModeRegularExtensionsNotCalled)
{
    state_.isWriting = false;

    auto regularExt = MockExtLedgerData{};  // No allowInReadonly method
    auto ledgerObjects = std::vector{
        util::createObject(),
        util::createObject(),
        util::createObject(),
    };

    EXPECT_CALL(regularExt, onLedgerData(testing::_)).Times(0);  // Should NOT be called in readonly mode

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtLedgerData&>(state_, regularExt);
    registry.dispatch(etlng::model::LedgerData{
        .transactions = {},
        .objects = std::move(ledgerObjects),
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false and both kinds of extension registered, only the one that allows
// readonly receives onLedgerData.
TEST_F(RegistryTest, MixedReadonlyAndRegularExtensions)
{
    state_.isWriting = false;

    auto readonlyExt = MockExtLedgerDataReadonly{};
    auto regularExt = MockExtLedgerData{};
    auto ledgerObjects = std::vector{
        util::createObject(),
        util::createObject(),
        util::createObject(),
    };

    EXPECT_CALL(readonlyExt, onLedgerData(testing::_));
    EXPECT_CALL(regularExt, onLedgerData(testing::_)).Times(0);  // Should NOT be called in readonly mode

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<MockExtLedgerDataReadonly&, MockExtLedgerData&>(state_, readonlyExt, regularExt);
    registry.dispatch(etlng::model::LedgerData{
        .transactions = {},
        .objects = std::move(ledgerObjects),
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// Declares a local mock that overrides every etlng::MonitorInterface method and verifies
// that a direct notifyLedgerLoaded(kSEQ) call is recorded by the mock.
TEST_F(RegistryTest, MonitorInterfaceExecution)
{
    struct MockMonitor : etlng::MonitorInterface {
        MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override));
        MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override));
        MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override));
        MOCK_METHOD(void, stop, (), (override));
    };

    auto monitorMock = MockMonitor{};
    EXPECT_CALL(monitorMock, notifyLedgerLoaded(kSEQ)).Times(1);

    monitorMock.notifyLedgerLoaded(kSEQ);
}
// With isWriting == false, a locally defined extension that allows readonly receives
// onLedgerData exactly once, even when the dispatched ledger data is empty.
TEST_F(RegistryTest, ReadonlyModeWithAllowInReadonlyTest)
{
    struct ExtWithAllowInReadonly {
        MOCK_METHOD(void, onLedgerData, (etlng::model::LedgerData const&), (const));

        static bool
        allowInReadonly()
        {
            return true;
        }
    };

    state_.isWriting = false;

    auto readonlyExt = ExtWithAllowInReadonly{};
    EXPECT_CALL(readonlyExt, onLedgerData(testing::_)).Times(1);

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<ExtWithAllowInReadonly&>(state_, readonlyExt);
    registry.dispatch(etlng::model::LedgerData{
        .transactions = {},
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });
}
// With isWriting == false, an extension that allows readonly and implements the batch hooks
// (onLedgerData, onInitialData, onInitialObjects) gets each of them called exactly once by
// the corresponding dispatch entry point.
TEST_F(RegistryTest, ReadonlyModeExecutePluralHooksIfAllowedPaths)
{
    struct ExtWithBothHooksAndAllowReadonly {
        MOCK_METHOD(void, onLedgerData, (etlng::model::LedgerData const&), (const));
        MOCK_METHOD(void, onInitialData, (etlng::model::LedgerData const&), (const));
        MOCK_METHOD(void, onInitialObjects, (uint32_t, std::vector<etlng::model::Object> const&, std::string), (const));

        static bool
        allowInReadonly()
        {
            return true;
        }
    };

    auto readonlyExt = ExtWithBothHooksAndAllowReadonly{};
    state_.isWriting = false;

    auto burnTxs = std::vector{
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
    };
    auto ledgerObjects = std::vector{
        util::createObject(),
    };

    EXPECT_CALL(readonlyExt, onLedgerData(testing::_)).Times(1);
    EXPECT_CALL(readonlyExt, onInitialData(testing::_)).Times(1);
    EXPECT_CALL(readonlyExt, onInitialObjects(testing::_, testing::_, testing::_)).Times(1);

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<ExtWithBothHooksAndAllowReadonly&>(state_, readonlyExt);

    // regular dispatch; transactions and objects are copied because they are reused below
    registry.dispatch(etlng::model::LedgerData{
        .transactions = burnTxs,
        .objects = ledgerObjects,
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });

    // initial data dispatch
    registry.dispatchInitialData(etlng::model::LedgerData{
        .transactions = std::move(burnTxs),
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });

    // initial objects dispatch
    registry.dispatchInitialObjects(kSEQ, ledgerObjects, {});
}
// With isWriting == false, an extension that allows readonly and implements the per-item
// hooks (onObject, onInitialObject, onTransaction, onInitialTransaction) gets each of them
// called exactly once for the single transaction / single object dispatched.
TEST_F(RegistryTest, ReadonlyModeExecuteByOneHooksIfAllowedPaths)
{
    struct ExtWithBothHooksAndAllowReadonly {
        using spec = etlng::model::Spec<ripple::TxType::ttNFTOKEN_BURN>;

        MOCK_METHOD(void, onObject, (uint32_t, etlng::model::Object const&), (const));
        MOCK_METHOD(void, onInitialObject, (uint32_t, etlng::model::Object const&), (const));
        MOCK_METHOD(void, onTransaction, (uint32_t, etlng::model::Transaction const&), (const));
        MOCK_METHOD(void, onInitialTransaction, (uint32_t, etlng::model::Transaction const&), (const));

        static bool
        allowInReadonly()
        {
            return true;
        }
    };

    auto readonlyExt = ExtWithBothHooksAndAllowReadonly{};
    state_.isWriting = false;

    auto burnTxs = std::vector{
        util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
    };
    auto ledgerObjects = std::vector{
        util::createObject(),
    };

    EXPECT_CALL(readonlyExt, onTransaction(testing::_, testing::_)).Times(1);
    EXPECT_CALL(readonlyExt, onObject(testing::_, testing::_)).Times(1);
    EXPECT_CALL(readonlyExt, onInitialTransaction(testing::_, testing::_)).Times(1);
    EXPECT_CALL(readonlyExt, onInitialObject(testing::_, testing::_)).Times(1);

    auto const ledgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ);
    auto registry = Registry<ExtWithBothHooksAndAllowReadonly&>(state_, readonlyExt);

    // regular dispatch; transactions and objects are copied because they are reused below
    registry.dispatch(etlng::model::LedgerData{
        .transactions = burnTxs,
        .objects = ledgerObjects,
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });

    // initial data dispatch
    registry.dispatchInitialData(etlng::model::LedgerData{
        .transactions = std::move(burnTxs),
        .objects = {},
        .successors = {},
        .edgeKeys = {},
        .header = ledgerHeader,
        .rawHeader = {},
        .seq = kSEQ
    });

    // initial objects dispatch
    registry.dispatchInitialObjects(kSEQ, ledgerObjects, {});
}

View File

@@ -20,6 +20,7 @@
#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/TaskManager.hpp"
@@ -29,11 +30,13 @@
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include <boost/signals2/connection.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -63,18 +66,27 @@ struct MockLoader : etlng::LoaderInterface {
MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (LedgerData const&), (override));
};
// gmock implementation of etlng::MonitorInterface; every interface method is overridden by a mock.
struct MockMonitor : etlng::MonitorInterface {
    MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override));
    MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override));
    MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override));
    MOCK_METHOD(void, stop, (), (override));
};
struct TaskManagerTests : NoLoggerFixture {
using MockSchedulerType = testing::NiceMock<MockScheduler>;
using MockExtractorType = testing::NiceMock<MockExtractor>;
using MockLoaderType = testing::NiceMock<MockLoader>;
using MockMonitorType = testing::NiceMock<MockMonitor>;
protected:
util::async::CoroExecutionContext ctx_{2};
std::shared_ptr<MockSchedulerType> mockSchedulerPtr_ = std::make_shared<MockSchedulerType>();
std::shared_ptr<MockExtractorType> mockExtractorPtr_ = std::make_shared<MockExtractorType>();
std::shared_ptr<MockLoaderType> mockLoaderPtr_ = std::make_shared<MockLoaderType>();
std::shared_ptr<MockMonitorType> mockMonitorPtr_ = std::make_shared<MockMonitorType>();
TaskManager taskManager_{ctx_, *mockSchedulerPtr_, *mockExtractorPtr_, *mockLoaderPtr_};
TaskManager taskManager_{ctx_, mockSchedulerPtr_, *mockExtractorPtr_, *mockLoaderPtr_, *mockMonitorPtr_, kSEQ};
};
auto
@@ -97,8 +109,7 @@ createTestData(uint32_t seq)
TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
{
static constexpr auto kTOTAL = 64uz;
static constexpr auto kEXTRACTORS = 5uz;
static constexpr auto kLOADERS = 1uz;
static constexpr auto kEXTRACTORS = 4uz;
std::atomic_uint32_t seq = kSEQ;
std::vector<uint32_t> loaded;
@@ -118,17 +129,16 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
EXPECT_CALL(*mockLoaderPtr_, load(testing::_)).Times(kTOTAL).WillRepeatedly([&](LedgerData data) {
loaded.push_back(data.seq);
if (loaded.size() == kTOTAL) {
done.release();
}
});
auto loop = ctx_.execute([&] { taskManager_.run({.numExtractors = kEXTRACTORS, .numLoaders = kLOADERS}); });
done.acquire();
EXPECT_CALL(*mockMonitorPtr_, notifyLedgerLoaded(testing::_)).Times(kTOTAL);
taskManager_.run(kEXTRACTORS);
done.acquire();
taskManager_.stop();
loop.wait();
EXPECT_EQ(loaded.size(), kTOTAL);
for (std::size_t i = 0; i < loaded.size(); ++i) {

View File

@@ -18,8 +18,10 @@
//==============================================================================
#include "etlng/Models.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/ext/Cache.hpp"
#include "util/BinaryTestObject.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockLedgerCache.hpp"
#include "util/MockPrometheus.hpp"
#include "util/TestObject.hpp"
@@ -27,6 +29,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <memory>
#include <utility>
#include <vector>
@@ -56,10 +59,11 @@ createTestData()
} // namespace
struct CacheExtTests : util::prometheus::WithPrometheus {
struct CacheExtTests : NoLoggerFixture, util::prometheus::WithPrometheus {
protected:
MockLedgerCache cache_;
etlng::impl::CacheExt ext_{cache_};
std::shared_ptr<etlng::impl::CacheUpdater> updater_ = std::make_shared<etlng::impl::CacheUpdater>(cache_);
etlng::impl::CacheExt ext_{updater_};
};
TEST_F(CacheExtTests, OnLedgerDataUpdatesCache)
@@ -89,3 +93,8 @@ TEST_F(CacheExtTests, OnInitialObjectsUpdateCache)
ext_.onInitialObjects(kSEQ, objects, kUNUSED_LAST_KEY);
}
// The cache extension must report that it is allowed to run in readonly mode.
TEST_F(CacheExtTests, AllowInReadonlyReturnsTrue)
{
    EXPECT_TRUE(ext_.allowInReadonly());
}

View File

@@ -1,195 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/StrandedPriorityQueue.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>
#include <unordered_set>
#include <vector>
using namespace util;
namespace {
// Minimal payload stored in the queues under test; `seq` is the only member and identifies each item.
struct TestData {
uint32_t seq;
// Defaulted three-way comparison, so TestData objects compare by `seq`.
// NOTE(review): presumably this is what StrandedPriorityQueue's default comparator relies on;
// confirm against util/StrandedPriorityQueue.hpp.
auto
operator<=>(TestData const&) const = default;
};
} // namespace
// With the default comparator, items enqueued in ascending sequence order are expected
// to be dequeued in descending sequence order.
TEST(StrandedPriorityQueueTests, DefaultPriority)
{
    static constexpr auto kITEM_COUNT = 100u;

    util::async::CoroExecutionContext executionCtx;
    StrandedPriorityQueue<TestData> prioQueue{executionCtx.makeStrand()};

    // Every enqueue must be accepted
    auto seq = 0u;
    while (seq < kITEM_COUNT) {
        EXPECT_TRUE(prioQueue.enqueue(TestData{.seq = seq}));
        ++seq;
    }
    EXPECT_FALSE(prioQueue.empty());

    // Drain until dequeue yields nothing, checking the order on the way
    auto expectedSeq = kITEM_COUNT - 1;
    for (auto item = prioQueue.dequeue(); item; item = prioQueue.dequeue()) {
        EXPECT_EQ(item->seq, expectedSeq);
        --expectedSeq;
    }

    EXPECT_TRUE(prioQueue.empty());
}
// With a user-supplied comparator that inverts the comparison, items are expected
// to be dequeued in ascending sequence order.
TEST(StrandedPriorityQueueTests, CustomPriority)
{
    static constexpr auto kITEM_COUNT = 100u;

    // Comparator: true when the left item has the greater sequence
    struct GreaterSeq {
        [[nodiscard]] bool
        operator()(TestData const& left, TestData const& right) const noexcept
        {
            return right.seq < left.seq;
        }
    };

    util::async::CoroExecutionContext executionCtx;
    StrandedPriorityQueue<TestData, GreaterSeq> prioQueue{executionCtx.makeStrand()};

    // Every enqueue must be accepted
    auto seq = 0u;
    while (seq < kITEM_COUNT) {
        EXPECT_TRUE(prioQueue.enqueue(TestData{.seq = seq}));
        ++seq;
    }
    EXPECT_FALSE(prioQueue.empty());

    // Drain until dequeue yields nothing, checking the order on the way
    auto expectedSeq = 0u;
    for (auto item = prioQueue.dequeue(); item; item = prioQueue.dequeue()) {
        EXPECT_EQ(item->seq, expectedSeq);
        ++expectedSeq;
    }

    EXPECT_TRUE(prioQueue.empty());
}
// Several tasks enqueue concurrently into a queue constructed without a size limit.
// Once all of them finish, every item must have been accepted and the items are expected
// to come out in descending sequence order.
TEST(StrandedPriorityQueueTests, MultipleThreadsUnlimitedQueue)
{
    static constexpr auto kTOTAL_THREADS = 5u;
    static constexpr auto kTOTAL_ITEMS_PER_THREAD = 100u;
    static constexpr auto kTOTAL_ITEMS = kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS;

    async::CoroExecutionContext pool{6};
    async::AnyExecutionContext anyCtx{pool};
    StrandedPriorityQueue<TestData> prioQueue{anyCtx.makeStrand()};
    EXPECT_TRUE(prioQueue.empty());

    std::atomic_size_t acceptedCount = 0uz;
    std::vector<async::AnyOperation<void>> producers;
    producers.reserve(kTOTAL_THREADS);

    // Each producer owns a disjoint range of sequences: [idx * perThread, (idx + 1) * perThread)
    for (auto producerIdx = 0u; producerIdx < kTOTAL_THREADS; ++producerIdx) {
        producers.push_back(anyCtx.execute([&prioQueue, producerIdx, &acceptedCount] {
            auto const firstSeq = producerIdx * kTOTAL_ITEMS_PER_THREAD;
            for (auto offset = 0u; offset < kTOTAL_ITEMS_PER_THREAD; ++offset) {
                if (prioQueue.enqueue(TestData{.seq = firstSeq + offset}))
                    ++acceptedCount;
            }
        }));
    }

    // Block until every producer is done before draining
    for (auto& producer : producers) {
        producer.wait();
    }

    auto expectedSeq = kTOTAL_ITEMS - 1;
    for (auto item = prioQueue.dequeue(); item; item = prioQueue.dequeue()) {
        EXPECT_EQ(item->seq, expectedSeq);
        --expectedSeq;
    }

    EXPECT_TRUE(prioQueue.empty());
    EXPECT_EQ(acceptedCount, kTOTAL_ITEMS);
}
// Exercises a queue constructed with a size limit: five producer tasks push 100 items each,
// retrying whenever enqueue is rejected, while a single consumer task drains the queue.
// The test passes when every expected sequence has been dequeued, every item was eventually
// accepted and at least one enqueue attempt had to be retried.
TEST(StrandedPriorityQueueTests, MultipleThreadsLimitedQueue)
{
static constexpr auto kQUEUE_SIZE_LIMIT = 32uz;
static constexpr auto kTOTAL_THREADS = 5u;
static constexpr auto kTOTAL_ITEMS_PER_THREAD = 100u;
async::CoroExecutionContext realCtx{8};
async::AnyExecutionContext ctx{realCtx};
StrandedPriorityQueue<TestData> queue{ctx.makeStrand(), kQUEUE_SIZE_LIMIT};
EXPECT_TRUE(queue.empty());
// Counters are shared between producer tasks, hence atomic
std::atomic_size_t totalEnqueued = 0uz;
std::atomic_size_t totalSleepCycles = 0uz;
std::vector<async::AnyOperation<void>> tasks;
tasks.reserve(kTOTAL_THREADS);
// Filled by this thread only, before the consumer task below is started
std::unordered_set<uint32_t> expectedSequences;
for (auto batchIdx = 0u; batchIdx < kTOTAL_THREADS; ++batchIdx) {
for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) {
expectedSequences.insert((batchIdx * kTOTAL_ITEMS_PER_THREAD) + i);
}
// enqueue batches tasks running on multiple threads
tasks.push_back(ctx.execute([&queue, batchIdx, &totalEnqueued, &totalSleepCycles] {
for (auto i = 0u; i < kTOTAL_ITEMS_PER_THREAD; ++i) {
auto data = TestData{.seq = (batchIdx * kTOTAL_ITEMS_PER_THREAD) + i};
// Retry until the item is accepted, counting each rejected attempt
// (presumably enqueue fails when the size limit is reached - confirm in StrandedPriorityQueue.hpp)
while (not queue.enqueue(data)) {
std::this_thread::sleep_for(std::chrono::nanoseconds{1});
++totalSleepCycles;
}
++totalEnqueued;
}
}));
}
EXPECT_FALSE(expectedSequences.empty());
// Consumer: keeps polling the queue until every expected sequence has been seen and removed.
// From here on expectedSequences is only touched by this task until loader.wait() returns.
auto loader = ctx.execute([&queue, &expectedSequences] {
while (not expectedSequences.empty()) {
while (auto maybeValue = queue.dequeue()) {
EXPECT_TRUE(expectedSequences.contains(maybeValue->seq));
expectedSequences.erase(maybeValue->seq);
}
}
});
for (auto& task : tasks)
task.wait();
loader.wait();
EXPECT_TRUE(queue.empty());
EXPECT_TRUE(expectedSequences.empty());
EXPECT_EQ(totalEnqueued, kTOTAL_ITEMS_PER_THREAD * kTOTAL_THREADS);
// NOTE(review): requires at least one rejected enqueue, i.e. the producers must outpace the
// consumer at least once; this looks timing dependent - confirm it is stable in CI.
EXPECT_GE(totalSleepCycles, 1uz);
}
// Dequeuing from a queue that never received an item must yield no value.
TEST(StrandedPriorityQueueTests, ReturnsNulloptIfQueueEmpty)
{
    async::CoroExecutionContext executionCtx;
    StrandedPriorityQueue<TestData> emptyQueue{executionCtx.makeStrand()};
    EXPECT_TRUE(emptyQueue.empty());

    auto const dequeued = emptyQueue.dequeue();
    EXPECT_FALSE(dequeued.has_value());
}