mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 19:56:00 +00:00
feat: New ETL by default (#2752)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2023, the clio developers.
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
@@ -20,371 +20,385 @@
|
||||
#include "etl/ETLService.hpp"
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "data/LedgerCacheInterface.hpp"
|
||||
#include "data/Types.hpp"
|
||||
#include "etl/CacheLoader.hpp"
|
||||
#include "etl/CorruptionDetector.hpp"
|
||||
#include "etl/LoadBalancer.hpp"
|
||||
#include "etl/CacheLoaderInterface.hpp"
|
||||
#include "etl/CacheUpdaterInterface.hpp"
|
||||
#include "etl/ETLState.hpp"
|
||||
#include "etl/ExtractorInterface.hpp"
|
||||
#include "etl/InitialLoadObserverInterface.hpp"
|
||||
#include "etl/LedgerPublisherInterface.hpp"
|
||||
#include "etl/LoadBalancerInterface.hpp"
|
||||
#include "etl/LoaderInterface.hpp"
|
||||
#include "etl/MonitorInterface.hpp"
|
||||
#include "etl/MonitorProviderInterface.hpp"
|
||||
#include "etl/NetworkValidatedLedgersInterface.hpp"
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "etl/TaskManagerProviderInterface.hpp"
|
||||
#include "etl/impl/AmendmentBlockHandler.hpp"
|
||||
#include "etl/impl/ExtractionDataPipe.hpp"
|
||||
#include "etl/impl/Extractor.hpp"
|
||||
#include "etl/impl/CacheUpdater.hpp"
|
||||
#include "etl/impl/Extraction.hpp"
|
||||
#include "etl/impl/LedgerFetcher.hpp"
|
||||
#include "etl/impl/LedgerLoader.hpp"
|
||||
#include "etl/impl/LedgerPublisher.hpp"
|
||||
#include "etl/impl/Transformer.hpp"
|
||||
#include "etlng/ETLService.hpp"
|
||||
#include "etlng/ETLServiceInterface.hpp"
|
||||
#include "etlng/LoadBalancer.hpp"
|
||||
#include "etlng/LoadBalancerInterface.hpp"
|
||||
#include "etlng/impl/LedgerPublisher.hpp"
|
||||
#include "etlng/impl/MonitorProvider.hpp"
|
||||
#include "etlng/impl/TaskManagerProvider.hpp"
|
||||
#include "etlng/impl/ext/Cache.hpp"
|
||||
#include "etlng/impl/ext/Core.hpp"
|
||||
#include "etlng/impl/ext/MPT.hpp"
|
||||
#include "etlng/impl/ext/NFT.hpp"
|
||||
#include "etlng/impl/ext/Successor.hpp"
|
||||
#include "feed/SubscriptionManagerInterface.hpp"
|
||||
#include "etl/impl/Loading.hpp"
|
||||
#include "etl/impl/MonitorProvider.hpp"
|
||||
#include "etl/impl/Registry.hpp"
|
||||
#include "etl/impl/Scheduling.hpp"
|
||||
#include "etl/impl/TaskManager.hpp"
|
||||
#include "etl/impl/TaskManagerProvider.hpp"
|
||||
#include "etl/impl/ext/Cache.hpp"
|
||||
#include "etl/impl/ext/Core.hpp"
|
||||
#include "etl/impl/ext/MPT.hpp"
|
||||
#include "etl/impl/ext/NFT.hpp"
|
||||
#include "etl/impl/ext/Successor.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/Constants.hpp"
|
||||
#include "util/Profiler.hpp"
|
||||
#include "util/async/AnyExecutionContext.hpp"
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <xrpl/beast/core/CurrentThreadName.h>
|
||||
#include <boost/json/object.hpp>
|
||||
#include <boost/signals2/connection.hpp>
|
||||
#include <xrpl/protocol/LedgerHeader.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace etl {
|
||||
|
||||
std::shared_ptr<etlng::ETLServiceInterface>
|
||||
std::shared_ptr<ETLServiceInterface>
|
||||
ETLService::makeETLService(
|
||||
util::config::ClioConfigDefinition const& config,
|
||||
boost::asio::io_context& ioc,
|
||||
util::async::AnyExecutionContext ctx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
|
||||
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
|
||||
std::shared_ptr<LoadBalancerInterface> balancer,
|
||||
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
|
||||
)
|
||||
{
|
||||
std::shared_ptr<etlng::ETLServiceInterface> ret;
|
||||
std::shared_ptr<ETLServiceInterface> ret;
|
||||
|
||||
if (config.get<bool>("__ng_etl")) {
|
||||
ASSERT(
|
||||
std::dynamic_pointer_cast<etlng::LoadBalancer>(balancer), "LoadBalancer type must be etlng::LoadBalancer"
|
||||
);
|
||||
auto state = std::make_shared<SystemState>();
|
||||
state->isStrictReadonly = config.get<bool>("read_only");
|
||||
|
||||
auto state = std::make_shared<etl::SystemState>();
|
||||
state->isStrictReadonly = config.get<bool>("read_only");
|
||||
auto fetcher = std::make_shared<impl::LedgerFetcher>(backend, balancer);
|
||||
auto extractor = std::make_shared<impl::Extractor>(fetcher);
|
||||
auto publisher = std::make_shared<impl::LedgerPublisher>(ioc, backend, subscriptions, *state);
|
||||
auto cacheLoader = std::make_shared<CacheLoader<>>(config, backend, backend->cache());
|
||||
auto cacheUpdater = std::make_shared<impl::CacheUpdater>(backend->cache());
|
||||
auto amendmentBlockHandler = std::make_shared<impl::AmendmentBlockHandler>(ctx, *state);
|
||||
auto monitorProvider = std::make_shared<impl::MonitorProvider>();
|
||||
|
||||
auto fetcher = std::make_shared<etl::impl::LedgerFetcher>(backend, balancer);
|
||||
auto extractor = std::make_shared<etlng::impl::Extractor>(fetcher);
|
||||
auto publisher = std::make_shared<etlng::impl::LedgerPublisher>(ioc, backend, subscriptions, *state);
|
||||
auto cacheLoader = std::make_shared<etl::CacheLoader<>>(config, backend, backend->cache());
|
||||
auto cacheUpdater = std::make_shared<etlng::impl::CacheUpdater>(backend->cache());
|
||||
auto amendmentBlockHandler = std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx, *state);
|
||||
auto monitorProvider = std::make_shared<etlng::impl::MonitorProvider>();
|
||||
backend->setCorruptionDetector(CorruptionDetector{*state, backend->cache()});
|
||||
|
||||
backend->setCorruptionDetector(CorruptionDetector{*state, backend->cache()});
|
||||
auto loader = std::make_shared<impl::Loader>(
|
||||
backend,
|
||||
impl::makeRegistry(
|
||||
*state,
|
||||
impl::CacheExt{cacheUpdater},
|
||||
impl::CoreExt{backend},
|
||||
impl::SuccessorExt{backend, backend->cache()},
|
||||
impl::NFTExt{backend},
|
||||
impl::MPTExt{backend}
|
||||
),
|
||||
amendmentBlockHandler,
|
||||
state
|
||||
);
|
||||
|
||||
auto loader = std::make_shared<etlng::impl::Loader>(
|
||||
backend,
|
||||
etlng::impl::makeRegistry(
|
||||
*state,
|
||||
etlng::impl::CacheExt{cacheUpdater},
|
||||
etlng::impl::CoreExt{backend},
|
||||
etlng::impl::SuccessorExt{backend, backend->cache()},
|
||||
etlng::impl::NFTExt{backend},
|
||||
etlng::impl::MPTExt{backend}
|
||||
),
|
||||
amendmentBlockHandler,
|
||||
state
|
||||
);
|
||||
auto taskManagerProvider = std::make_shared<impl::TaskManagerProvider>(*ledgers, extractor, loader);
|
||||
|
||||
auto taskManagerProvider = std::make_shared<etlng::impl::TaskManagerProvider>(*ledgers, extractor, loader);
|
||||
|
||||
ret = std::make_shared<etlng::ETLService>(
|
||||
ctx,
|
||||
config,
|
||||
backend,
|
||||
balancer,
|
||||
ledgers,
|
||||
publisher,
|
||||
cacheLoader,
|
||||
cacheUpdater,
|
||||
extractor,
|
||||
loader, // loader itself
|
||||
loader, // initial load observer
|
||||
taskManagerProvider,
|
||||
monitorProvider,
|
||||
state
|
||||
);
|
||||
} else {
|
||||
ASSERT(std::dynamic_pointer_cast<etl::LoadBalancer>(balancer), "LoadBalancer type must be etl::LoadBalancer");
|
||||
ret = std::make_shared<etl::ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
|
||||
}
|
||||
ret = std::make_shared<ETLService>(
|
||||
ctx,
|
||||
config,
|
||||
backend,
|
||||
balancer,
|
||||
ledgers,
|
||||
publisher,
|
||||
cacheLoader,
|
||||
cacheUpdater,
|
||||
extractor,
|
||||
loader, // loader itself
|
||||
loader, // initial load observer
|
||||
taskManagerProvider,
|
||||
monitorProvider,
|
||||
state
|
||||
);
|
||||
|
||||
// inject networkID into subscriptions, as transaction feed require it to inject CTID in response
|
||||
if (auto const state = ret->getETLState(); state)
|
||||
subscriptions->setNetworkID(state->networkID);
|
||||
if (auto const etlState = ret->getETLState(); etlState)
|
||||
subscriptions->setNetworkID(etlState->networkID);
|
||||
|
||||
ret->run();
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Database must be populated when this starts
|
||||
std::optional<uint32_t>
|
||||
ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
|
||||
ETLService::ETLService(
|
||||
util::async::AnyExecutionContext ctx,
|
||||
std::reference_wrapper<util::config::ClioConfigDefinition const> config,
|
||||
std::shared_ptr<data::BackendInterface> backend,
|
||||
std::shared_ptr<LoadBalancerInterface> balancer,
|
||||
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers,
|
||||
std::shared_ptr<LedgerPublisherInterface> publisher,
|
||||
std::shared_ptr<CacheLoaderInterface> cacheLoader,
|
||||
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
|
||||
std::shared_ptr<ExtractorInterface> extractor,
|
||||
std::shared_ptr<LoaderInterface> loader,
|
||||
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
|
||||
std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider,
|
||||
std::shared_ptr<MonitorProviderInterface> monitorProvider,
|
||||
std::shared_ptr<SystemState> state
|
||||
)
|
||||
: ctx_(std::move(ctx))
|
||||
, config_(config)
|
||||
, backend_(std::move(backend))
|
||||
, balancer_(std::move(balancer))
|
||||
, ledgers_(std::move(ledgers))
|
||||
, publisher_(std::move(publisher))
|
||||
, cacheLoader_(std::move(cacheLoader))
|
||||
, cacheUpdater_(std::move(cacheUpdater))
|
||||
, extractor_(std::move(extractor))
|
||||
, loader_(std::move(loader))
|
||||
, initialLoadObserver_(std::move(initialLoadObserver))
|
||||
, taskManagerProvider_(std::move(taskManagerProvider))
|
||||
, monitorProvider_(std::move(monitorProvider))
|
||||
, state_(std::move(state))
|
||||
, startSequence_(config.get().maybeValue<uint32_t>("start_sequence"))
|
||||
, finishSequence_(config.get().maybeValue<uint32_t>("finish_sequence"))
|
||||
{
|
||||
if (finishSequence_ && startSequence > *finishSequence_)
|
||||
return {};
|
||||
ASSERT(not state_->isWriting, "ETL should never start in writer mode");
|
||||
|
||||
LOG(log_.debug()) << "Wait for cache containing seq " << startSequence - 1
|
||||
<< " current cache last seq =" << backend_->cache().latestLedgerSequence();
|
||||
backend_->cache().waitUntilCacheContainsSeq(startSequence - 1);
|
||||
if (startSequence_.has_value())
|
||||
LOG(log_.info()) << "Start sequence: " << *startSequence_;
|
||||
|
||||
LOG(log_.debug()) << "Starting etl pipeline";
|
||||
state_.isWriting = true;
|
||||
if (finishSequence_.has_value())
|
||||
LOG(log_.info()) << "Finish sequence: " << *finishSequence_;
|
||||
|
||||
auto const rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
ASSERT(rng.has_value(), "Parent ledger range can't be null");
|
||||
ASSERT(
|
||||
rng->maxSequence >= startSequence - 1,
|
||||
"Got not parent ledger. rnd->maxSequence = {}, startSequence = {}",
|
||||
rng->maxSequence,
|
||||
startSequence
|
||||
);
|
||||
|
||||
auto const begin = std::chrono::system_clock::now();
|
||||
auto extractors = std::vector<std::unique_ptr<ExtractorType>>{};
|
||||
auto pipe = DataPipeType{numExtractors, startSequence};
|
||||
|
||||
for (auto i = 0u; i < numExtractors; ++i) {
|
||||
extractors.push_back(
|
||||
std::make_unique<ExtractorType>(
|
||||
pipe, networkValidatedLedgers_, ledgerFetcher_, startSequence + i, finishSequence_, state_
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
auto transformer =
|
||||
TransformerType{pipe, backend_, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, startSequence, state_};
|
||||
transformer.waitTillFinished(); // suspend current thread until exit condition is met
|
||||
pipe.cleanup(); // TODO: this should probably happen automatically using destructor
|
||||
|
||||
// wait for all of the extractors to stop
|
||||
for (auto& t : extractors)
|
||||
t->waitTillFinished();
|
||||
|
||||
auto const end = std::chrono::system_clock::now();
|
||||
auto const lastPublishedSeq = ledgerPublisher_.getLastPublishedSequence();
|
||||
static constexpr auto kNANOSECONDS_PER_SECOND = 1'000'000'000.0;
|
||||
LOG(log_.debug()) << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in "
|
||||
<< ((end - begin).count()) / kNANOSECONDS_PER_SECOND;
|
||||
|
||||
state_.isWriting = false;
|
||||
|
||||
LOG(log_.debug()) << "Stopping etl pipeline";
|
||||
return lastPublishedSeq;
|
||||
LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE");
|
||||
}
|
||||
|
||||
// Main loop of ETL.
|
||||
// The software begins monitoring the ledgers that are validated by the network.
|
||||
// The member networkValidatedLedgers_ keeps track of the sequences of ledgers validated by the network.
|
||||
// Whenever a ledger is validated by the network, the software looks for that ledger in the database. Once the ledger is
|
||||
// found in the database, the software publishes that ledger to the ledgers stream. If a network validated ledger is not
|
||||
// found in the database after a certain amount of time, then the software attempts to take over responsibility of the
|
||||
// ETL process, where it writes new ledgers to the database. The software will relinquish control of the ETL process if
|
||||
// it detects that another process has taken over ETL.
|
||||
void
|
||||
ETLService::monitor()
|
||||
ETLService::~ETLService()
|
||||
{
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
if (!rng) {
|
||||
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
|
||||
std::optional<ripple::LedgerHeader> ledger;
|
||||
|
||||
try {
|
||||
if (startSequence_) {
|
||||
LOG(log_.info()) << "ledger sequence specified in config. "
|
||||
<< "Will begin ETL process starting with ledger " << *startSequence_;
|
||||
ledger = ledgerLoader_.loadInitialLedger(*startSequence_);
|
||||
} else {
|
||||
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
|
||||
std::optional<uint32_t> mostRecentValidated = networkValidatedLedgers_->getMostRecent();
|
||||
|
||||
if (mostRecentValidated) {
|
||||
LOG(log_.info()) << "Ledger " << *mostRecentValidated << " has been validated. Downloading...";
|
||||
ledger = ledgerLoader_.loadInitialLedger(*mostRecentValidated);
|
||||
} else {
|
||||
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
|
||||
"Exiting monitor loop";
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (std::runtime_error const& e) {
|
||||
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
|
||||
amendmentBlockHandler_.notifyAmendmentBlocked();
|
||||
return;
|
||||
}
|
||||
|
||||
if (ledger) {
|
||||
rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
} else {
|
||||
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (startSequence_)
|
||||
LOG(log_.warn()) << "start sequence specified but db is already populated";
|
||||
|
||||
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
|
||||
cacheLoader_.load(rng->maxSequence);
|
||||
}
|
||||
|
||||
ASSERT(rng.has_value(), "Ledger range can't be null");
|
||||
uint32_t nextSequence = rng->maxSequence + 1;
|
||||
|
||||
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
|
||||
|
||||
while (not isStopping()) {
|
||||
nextSequence = publishNextSequence(nextSequence);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t
|
||||
ETLService::publishNextSequence(uint32_t nextSequence)
|
||||
{
|
||||
if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); rng && rng->maxSequence >= nextSequence) {
|
||||
ledgerPublisher_.publish(nextSequence, {});
|
||||
++nextSequence;
|
||||
} else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence, util::kMILLISECONDS_PER_SECOND)) {
|
||||
LOG(log_.info()) << "Ledger with sequence = " << nextSequence << " has been validated by the network. "
|
||||
<< "Attempting to find in database and publish";
|
||||
|
||||
// Attempt to take over responsibility of ETL writer after 10 failed
|
||||
// attempts to publish the ledger. publishLedger() fails if the
|
||||
// ledger that has been validated by the network is not found in the
|
||||
// database after the specified number of attempts. publishLedger()
|
||||
// waits one second between each attempt to read the ledger from the
|
||||
// database
|
||||
constexpr size_t kTIMEOUT_SECONDS = 10;
|
||||
bool const success = ledgerPublisher_.publish(nextSequence, kTIMEOUT_SECONDS);
|
||||
|
||||
if (!success) {
|
||||
LOG(log_.warn()) << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL";
|
||||
|
||||
// returns the most recent sequence published. empty optional if no sequence was published
|
||||
std::optional<uint32_t> lastPublished = runETLPipeline(nextSequence, extractorThreads_);
|
||||
LOG(log_.info()) << "Aborting ETL. Falling back to publishing";
|
||||
|
||||
// if no ledger was published, don't increment nextSequence
|
||||
if (lastPublished)
|
||||
nextSequence = *lastPublished + 1;
|
||||
} else {
|
||||
++nextSequence;
|
||||
}
|
||||
}
|
||||
return nextSequence;
|
||||
}
|
||||
|
||||
void
|
||||
ETLService::monitorReadOnly()
|
||||
{
|
||||
LOG(log_.debug()) << "Starting reporting in strict read only mode";
|
||||
|
||||
auto const latestSequenceOpt = [this]() -> std::optional<uint32_t> {
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
|
||||
if (!rng) {
|
||||
if (auto net = networkValidatedLedgers_->getMostRecent()) {
|
||||
return net;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return rng->maxSequence;
|
||||
}();
|
||||
|
||||
if (!latestSequenceOpt.has_value()) {
|
||||
return;
|
||||
}
|
||||
|
||||
uint32_t latestSequence = *latestSequenceOpt;
|
||||
|
||||
cacheLoader_.load(latestSequence);
|
||||
latestSequence++;
|
||||
|
||||
while (not isStopping()) {
|
||||
if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); rng && rng->maxSequence >= latestSequence) {
|
||||
ledgerPublisher_.publish(latestSequence, {});
|
||||
latestSequence = latestSequence + 1;
|
||||
} else {
|
||||
// if we can't, wait until it's validated by the network, or 1 second passes, whichever occurs
|
||||
// first. Even if we don't hear from rippled, if ledgers are being written to the db, we publish
|
||||
// them.
|
||||
networkValidatedLedgers_->waitUntilValidatedByNetwork(latestSequence, util::kMILLISECONDS_PER_SECOND);
|
||||
}
|
||||
}
|
||||
stop();
|
||||
LOG(log_.debug()) << "Destroying ETL";
|
||||
}
|
||||
|
||||
void
|
||||
ETLService::run()
|
||||
{
|
||||
LOG(log_.info()) << "Starting reporting etl";
|
||||
state_.isStopping = false;
|
||||
LOG(log_.info()) << "Running ETL...";
|
||||
|
||||
doWork();
|
||||
mainLoop_.emplace(ctx_.execute([this] {
|
||||
auto const rng = loadInitialLedgerIfNeeded();
|
||||
|
||||
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
|
||||
std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
|
||||
|
||||
if (not mostRecentValidated) {
|
||||
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
|
||||
"Exiting monitor loop";
|
||||
return;
|
||||
}
|
||||
|
||||
if (not rng.has_value()) {
|
||||
LOG(log_.warn()) << "Initial ledger download got cancelled - stopping ETL service";
|
||||
return;
|
||||
}
|
||||
|
||||
auto const nextSequence = rng->maxSequence + 1;
|
||||
|
||||
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
|
||||
startMonitor(nextSequence);
|
||||
|
||||
// If we are a writer as the result of loading the initial ledger - start loading
|
||||
if (state_->isWriting)
|
||||
startLoading(nextSequence);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
ETLService::doWork()
|
||||
ETLService::stop()
|
||||
{
|
||||
worker_ = std::thread([this]() {
|
||||
beast::setCurrentThreadName("ETLService worker");
|
||||
LOG(log_.info()) << "Stop called";
|
||||
|
||||
if (state_.isStrictReadonly) {
|
||||
monitorReadOnly();
|
||||
} else {
|
||||
monitor();
|
||||
if (mainLoop_)
|
||||
mainLoop_->wait();
|
||||
if (taskMan_)
|
||||
taskMan_->stop();
|
||||
if (monitor_)
|
||||
monitor_->stop();
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
ETLService::getInfo() const
|
||||
{
|
||||
boost::json::object result;
|
||||
|
||||
result["etl_sources"] = balancer_->toJson();
|
||||
result["is_writer"] = static_cast<int>(state_->isWriting);
|
||||
result["read_only"] = static_cast<int>(state_->isStrictReadonly);
|
||||
auto last = publisher_->getLastPublish();
|
||||
if (last.time_since_epoch().count() != 0)
|
||||
result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds());
|
||||
return result;
|
||||
}
|
||||
|
||||
bool
|
||||
ETLService::isAmendmentBlocked() const
|
||||
{
|
||||
return state_->isAmendmentBlocked;
|
||||
}
|
||||
|
||||
bool
|
||||
ETLService::isCorruptionDetected() const
|
||||
{
|
||||
return state_->isCorruptionDetected;
|
||||
}
|
||||
|
||||
std::optional<ETLState>
|
||||
ETLService::getETLState() const
|
||||
{
|
||||
return balancer_->getETLState();
|
||||
}
|
||||
|
||||
std::uint32_t
|
||||
ETLService::lastCloseAgeSeconds() const
|
||||
{
|
||||
return publisher_->lastCloseAgeSeconds();
|
||||
}
|
||||
|
||||
std::optional<data::LedgerRange>
|
||||
ETLService::loadInitialLedgerIfNeeded()
|
||||
{
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
if (not rng.has_value()) {
|
||||
ASSERT(
|
||||
not state_->isStrictReadonly,
|
||||
"Database is empty but this node is in strict readonly mode. Can't write initial ledger."
|
||||
);
|
||||
|
||||
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
|
||||
state_->isWriting = true; // immediately become writer as the db is empty
|
||||
|
||||
auto const getMostRecent = [this]() {
|
||||
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
|
||||
return ledgers_->getMostRecent();
|
||||
};
|
||||
|
||||
if (auto const maybeSeq = startSequence_.or_else(getMostRecent); maybeSeq.has_value()) {
|
||||
auto const seq = *maybeSeq;
|
||||
LOG(log_.info()) << "Starting from sequence " << seq
|
||||
<< ". Initial ledger download and extraction can take a while...";
|
||||
|
||||
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
|
||||
return extractor_->extractLedgerOnly(seq).and_then(
|
||||
[this, seq](auto&& data) -> std::optional<ripple::LedgerHeader> {
|
||||
// TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
|
||||
auto res = balancer_->loadInitialLedger(seq, *initialLoadObserver_);
|
||||
if (not res.has_value() and res.error() == InitialLedgerLoadError::Cancelled) {
|
||||
LOG(log_.debug()) << "Initial ledger load got cancelled";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
ASSERT(res.has_value(), "Initial ledger retry logic failed");
|
||||
data.edgeKeys = std::move(res).value();
|
||||
|
||||
return loader_->loadInitialLedger(data);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
if (not ledger.has_value()) {
|
||||
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
|
||||
LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
|
||||
|
||||
return backend_->hardFetchLedgerRangeNoThrow();
|
||||
}
|
||||
});
|
||||
|
||||
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
|
||||
"Exiting monitor loop";
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
|
||||
cacheLoader_->load(rng->maxSequence);
|
||||
|
||||
return rng;
|
||||
}
|
||||
|
||||
ETLService::ETLService(
|
||||
util::config::ClioConfigDefinition const& config,
|
||||
boost::asio::io_context& ioc,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
|
||||
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
|
||||
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
|
||||
)
|
||||
: backend_(backend)
|
||||
, loadBalancer_(balancer)
|
||||
, networkValidatedLedgers_(std::move(ledgers))
|
||||
, cacheLoader_(config, backend, backend->cache())
|
||||
, ledgerFetcher_(backend, balancer)
|
||||
, ledgerLoader_(backend, balancer, ledgerFetcher_, state_)
|
||||
, ledgerPublisher_(ioc, backend, backend->cache(), subscriptions, state_)
|
||||
, amendmentBlockHandler_(ioc, state_)
|
||||
void
|
||||
ETLService::startMonitor(uint32_t seq)
|
||||
{
|
||||
startSequence_ = config.maybeValue<uint32_t>("start_sequence");
|
||||
finishSequence_ = config.maybeValue<uint32_t>("finish_sequence");
|
||||
state_.isStrictReadonly = config.get<bool>("read_only");
|
||||
extractorThreads_ = config.get<uint32_t>("extractor_threads");
|
||||
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);
|
||||
|
||||
// This should probably be done in the backend factory but we don't have state available until here
|
||||
backend_->setCorruptionDetector(CorruptionDetector{state_, backend->cache()});
|
||||
monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
|
||||
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;
|
||||
|
||||
if (state_->writeConflict) {
|
||||
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
|
||||
giveUpWriter();
|
||||
}
|
||||
|
||||
if (not state_->isWriting) {
|
||||
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
|
||||
return backend_->fetchLedgerDiff(seq, yield);
|
||||
});
|
||||
|
||||
cacheUpdater_->update(seq, diff);
|
||||
backend_->updateRange(seq);
|
||||
}
|
||||
|
||||
publisher_->publish(seq, {});
|
||||
});
|
||||
|
||||
monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
|
||||
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
|
||||
if (not state_->isStrictReadonly and not state_->isWriting)
|
||||
attemptTakeoverWriter();
|
||||
});
|
||||
|
||||
monitor_->run();
|
||||
}
|
||||
|
||||
void
|
||||
ETLService::startLoading(uint32_t seq)
|
||||
{
|
||||
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
|
||||
taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq, finishSequence_);
|
||||
|
||||
// FIXME: this legacy name "extractor_threads" is no longer accurate (we have coroutines now)
|
||||
taskMan_->run(config_.get().get<std::size_t>("extractor_threads"));
|
||||
}
|
||||
|
||||
void
|
||||
ETLService::attemptTakeoverWriter()
|
||||
{
|
||||
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
ASSERT(rng.has_value(), "Ledger range can't be null");
|
||||
|
||||
state_->isWriting = true; // switch to writer
|
||||
LOG(log_.info()) << "Taking over the ETL writer seat";
|
||||
startLoading(rng->maxSequence + 1);
|
||||
}
|
||||
|
||||
void
|
||||
ETLService::giveUpWriter()
|
||||
{
|
||||
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
|
||||
state_->isWriting = false;
|
||||
state_->writeConflict = false;
|
||||
taskMan_ = nullptr;
|
||||
}
|
||||
|
||||
} // namespace etl
|
||||
|
||||
Reference in New Issue
Block a user