feat: Support start and finish sequence in ETLng (#2226)

This PR adds support for start and finish sequences specified through the
config (`start_sequence` and `finish_sequence`), for parity.
This commit is contained in:
Alex Kremer
2025-06-13 17:39:21 +01:00
committed by GitHub
parent 7fcabd1ce7
commit 3d3db68508
6 changed files with 60 additions and 22 deletions

View File

@@ -97,6 +97,8 @@ ETLService::makeETLService(
auto amendmentBlockHandler = std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx, *state);
auto monitorProvider = std::make_shared<etlng::impl::MonitorProvider>();
backend->setCorruptionDetector(CorruptionDetector{*state, backend->cache()});
auto loader = std::make_shared<etlng::impl::Loader>(
backend,
etlng::impl::makeRegistry(

View File

@@ -52,7 +52,6 @@
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/log/Logger.hpp"
#include <boost/json/object.hpp>
@@ -100,8 +99,17 @@ ETLService::ETLService(
, taskManagerProvider_(std::move(taskManagerProvider))
, monitorProvider_(std::move(monitorProvider))
, state_(std::move(state))
, startSequence_(config.get().maybeValue<uint32_t>("start_sequence"))
, finishSequence_(config.get().maybeValue<uint32_t>("finish_sequence"))
{
ASSERT(not state_->isWriting, "ETL should never start in writer mode");
if (startSequence_.has_value())
LOG(log_.info()) << "Start sequence: " << *startSequence_;
if (finishSequence_.has_value())
LOG(log_.info()) << "Finish sequence: " << *finishSequence_;
LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE");
}
@@ -204,11 +212,15 @@ ETLService::loadInitialLedgerIfNeeded()
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
state_->isWriting = true; // immediately become writer as the db is empty
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
if (auto const mostRecentValidated = ledgers_->getMostRecent(); mostRecentValidated.has_value()) {
auto const seq = *mostRecentValidated;
LOG(log_.info()) << "Ledger " << seq << " has been validated. "
<< "Downloading and extracting (takes a while)...";
auto const getMostRecent = [this]() {
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
return ledgers_->getMostRecent();
};
if (auto const maybeSeq = startSequence_.or_else(getMostRecent); maybeSeq.has_value()) {
auto const seq = *maybeSeq;
LOG(log_.info()) << "Starting from sequence " << seq
<< ". Initial ledger download and extraction can take a while...";
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) {
@@ -280,7 +292,7 @@ void
ETLService::startLoading(uint32_t seq)
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq);
taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq, finishSequence_);
taskMan_->run(config_.get().get<std::size_t>("extractor_threads"));
}

View File

@@ -69,7 +69,6 @@
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
@@ -110,6 +109,9 @@ class ETLService : public ETLServiceInterface {
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider_;
std::shared_ptr<etl::SystemState> state_;
std::optional<uint32_t> startSequence_;
std::optional<uint32_t> finishSequence_;
std::unique_ptr<MonitorInterface> monitor_;
std::unique_ptr<TaskManagerInterface> taskMan_;

View File

@@ -27,6 +27,7 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
namespace etlng {
@@ -41,11 +42,17 @@ struct TaskManagerProviderInterface {
*
* @param ctx The async context to associate the task manager instance with
* @param monitor The monitor to notify when ledger is loaded
* @param seq The sequence to start at
* @param startSeq The sequence to start at
* @param finishSeq The sequence to stop at if specified
* @return A unique pointer to a TaskManager implementation
*/
[[nodiscard]] virtual std::unique_ptr<TaskManagerInterface>
make(util::async::AnyExecutionContext ctx, std::reference_wrapper<MonitorInterface> monitor, uint32_t seq) = 0;
make(
util::async::AnyExecutionContext ctx,
std::reference_wrapper<MonitorInterface> monitor,
uint32_t startSeq,
std::optional<uint32_t> finishSeq = std::nullopt
) = 0;
};
} // namespace etlng

View File

@@ -32,6 +32,7 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
namespace etlng::impl {
@@ -62,12 +63,19 @@ public:
}
std::unique_ptr<TaskManagerInterface>
make(util::async::AnyExecutionContext ctx, std::reference_wrapper<MonitorInterface> monitor, uint32_t seq) override
make(
util::async::AnyExecutionContext ctx,
std::reference_wrapper<MonitorInterface> monitor,
uint32_t startSeq,
std::optional<uint32_t> finishSeq
) override
{
auto scheduler = impl::makeScheduler(impl::ForwardScheduler{ledgers_, seq});
// TODO: add impl::BackfillScheduler{seq - 1, seq - 1000},
auto scheduler = impl::makeScheduler(impl::ForwardScheduler{ledgers_, startSeq, finishSeq});
// TODO: add impl::BackfillScheduler{startSeq - 1, startSeq - ...},
return std::make_unique<TaskManager>(std::move(ctx), std::move(scheduler), *extractor_, *loader_, monitor, seq);
return std::make_unique<TaskManager>(
std::move(ctx), std::move(scheduler), *extractor_, *loader_, monitor, startSeq
);
}
};

View File

@@ -46,6 +46,7 @@
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SyncExecutionContext.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include "util/config/ConfigConstraints.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
#include "util/config/Types.hpp"
@@ -135,7 +136,10 @@ struct MockTaskManagerProvider : etlng::TaskManagerProviderInterface {
MOCK_METHOD(
std::unique_ptr<etlng::TaskManagerInterface>,
make,
(util::async::AnyExecutionContext, std::reference_wrapper<etlng::MonitorInterface>, uint32_t),
(util::async::AnyExecutionContext,
std::reference_wrapper<etlng::MonitorInterface>,
uint32_t,
std::optional<uint32_t>),
(override)
);
};
@@ -181,6 +185,8 @@ protected:
SameThreadTestContext ctx_;
util::config::ClioConfigDefinition config_{
{"read_only", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"start_sequence", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
{"finish_sequence", ConfigValue{ConfigType::Integer}.optional().withConstraint(gValidateUint32)},
{"extractor_threads", ConfigValue{ConfigType::Integer}.defaultValue(4)},
{"io_threads", ConfigValue{ConfigType::Integer}.defaultValue(2)},
{"cache.num_diffs", ConfigValue{ConfigType::Integer}.defaultValue(32)},
@@ -306,7 +312,7 @@ TEST_F(ETLServiceTests, RunWithEmptyDatabase)
.InSequence(s)
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(mockTaskManagerRef, run);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1))
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_))
.WillOnce(testing::Return(std::unique_ptr<etlng::TaskManagerInterface>(mockTaskManager.release())));
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_))
.WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique<testing::NiceMock<MockMonitor>>(); });
@@ -318,8 +324,9 @@ TEST_F(ETLServiceTests, RunWithPopulatedDatabase)
{
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_))
.WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique<testing::NiceMock<MockMonitor>>(); });
EXPECT_CALL(*monitorProvider_, make).WillOnce([](auto, auto, auto, auto, auto) {
return std::make_unique<testing::NiceMock<MockMonitor>>();
});
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
@@ -335,7 +342,7 @@ TEST_F(ETLServiceTests, WaitForValidatedLedgerIsAborted)
EXPECT_CALL(*extractor_, extractLedgerOnly).Times(0);
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*taskManagerProvider_, make).Times(0);
service_.run();
}
@@ -437,7 +444,7 @@ TEST_F(ETLServiceTests, AttemptTakeoverWriter)
auto& mockTaskManagerRef = *mockTaskManager;
EXPECT_CALL(mockTaskManagerRef, run);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1))
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_))
.WillOnce(testing::Return(std::move(mockTaskManager)));
ASSERT_TRUE(capturedDbStalledCallback);
@@ -492,7 +499,7 @@ TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger)
// These calls should not happen because loading the initial ledger fails
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*taskManagerProvider_, make).Times(0);
EXPECT_CLIO_ASSERT_FAIL({ service_.run(); });
}
@@ -508,7 +515,7 @@ TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadIni
EXPECT_CALL(*extractor_, extractLedgerOnly).Times(0);
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*taskManagerProvider_, make).Times(0);
EXPECT_CLIO_ASSERT_FAIL({ service_.run(); });
}