mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
@@ -174,4 +174,20 @@ struct LedgerData {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Represents a task for the extractors
|
||||||
|
*/
|
||||||
|
struct Task {
|
||||||
|
/**
|
||||||
|
* @brief Represents the priority of the task
|
||||||
|
*/
|
||||||
|
enum class Priority : uint8_t {
|
||||||
|
Lower = 0u,
|
||||||
|
Higher = 1u,
|
||||||
|
};
|
||||||
|
|
||||||
|
Priority priority;
|
||||||
|
uint32_t seq;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace etlng::model
|
} // namespace etlng::model
|
||||||
|
|||||||
42
src/etlng/SchedulerInterface.hpp
Normal file
42
src/etlng/SchedulerInterface.hpp
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
//------------------------------------------------------------------------------
|
||||||
|
/*
|
||||||
|
This file is part of clio: https://github.com/XRPLF/clio
|
||||||
|
Copyright (c) 2025, the clio developers.
|
||||||
|
|
||||||
|
Permission to use, copy, modify, and distribute this software for any
|
||||||
|
purpose with or without fee is hereby granted, provided that the above
|
||||||
|
copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
*/
|
||||||
|
//==============================================================================
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "etlng/Models.hpp"
|
||||||
|
|
||||||
|
#include <optional>
|
||||||
|
|
||||||
|
namespace etlng {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief The interface of a scheduler for the extraction proccess
|
||||||
|
*/
|
||||||
|
struct SchedulerInterface {
|
||||||
|
virtual ~SchedulerInterface() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Attempt to obtain the next task
|
||||||
|
* @return A task if one exists; std::nullopt otherwise
|
||||||
|
*/
|
||||||
|
[[nodiscard]] virtual std::optional<model::Task>
|
||||||
|
next() = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace etlng
|
||||||
151
src/etlng/impl/Scheduling.hpp
Normal file
151
src/etlng/impl/Scheduling.hpp
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
//------------------------------------------------------------------------------
|
||||||
|
/*
|
||||||
|
This file is part of clio: https://github.com/XRPLF/clio
|
||||||
|
Copyright (c) 2025, the clio developers.
|
||||||
|
|
||||||
|
Permission to use, copy, modify, and distribute this software for any
|
||||||
|
purpose with or without fee is hereby granted, provided that the above
|
||||||
|
copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
*/
|
||||||
|
//==============================================================================
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "etl/NetworkValidatedLedgersInterface.hpp"
|
||||||
|
#include "etlng/Models.hpp"
|
||||||
|
#include "etlng/SchedulerInterface.hpp"
|
||||||
|
|
||||||
|
#include <sys/types.h>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <functional>
|
||||||
|
#include <limits>
|
||||||
|
#include <memory>
|
||||||
|
#include <optional>
|
||||||
|
#include <tuple>
|
||||||
|
#include <type_traits>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
namespace etlng::impl {
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
|
||||||
|
|
||||||
|
class ForwardScheduler : public SchedulerInterface {
|
||||||
|
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
|
||||||
|
|
||||||
|
uint32_t startSeq_;
|
||||||
|
std::optional<uint32_t> maxSeq_;
|
||||||
|
std::atomic_uint32_t seq_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
ForwardScheduler(ForwardScheduler const& other)
|
||||||
|
: ledgers_(other.ledgers_), startSeq_(other.startSeq_), maxSeq_(other.maxSeq_), seq_(other.seq_.load())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ForwardScheduler(
|
||||||
|
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
|
||||||
|
uint32_t startSeq,
|
||||||
|
std::optional<uint32_t> maxSeq = std::nullopt
|
||||||
|
)
|
||||||
|
: ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] std::optional<model::Task>
|
||||||
|
next() override
|
||||||
|
{
|
||||||
|
static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
|
||||||
|
uint32_t currentSeq = seq_;
|
||||||
|
|
||||||
|
if (ledgers_.get().getMostRecent() >= currentSeq) {
|
||||||
|
while (currentSeq < maxSeq_.value_or(kMAX)) {
|
||||||
|
if (seq_.compare_exchange_weak(currentSeq, currentSeq + 1u, std::memory_order_acq_rel)) {
|
||||||
|
return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class BackfillScheduler : public SchedulerInterface {
|
||||||
|
uint32_t startSeq_;
|
||||||
|
uint32_t minSeq_ = 0u;
|
||||||
|
|
||||||
|
std::atomic_uint32_t seq_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
BackfillScheduler(BackfillScheduler const& other)
|
||||||
|
: startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
|
||||||
|
: startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] std::optional<model::Task>
|
||||||
|
next() override
|
||||||
|
{
|
||||||
|
uint32_t currentSeq = seq_;
|
||||||
|
while (currentSeq > minSeq_) {
|
||||||
|
if (seq_.compare_exchange_weak(currentSeq, currentSeq - 1u, std::memory_order_acq_rel)) {
|
||||||
|
return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <SomeScheduler... Schedulers>
|
||||||
|
class SchedulerChain : public SchedulerInterface {
|
||||||
|
std::tuple<Schedulers...> schedulers_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
template <SomeScheduler... Ts>
|
||||||
|
requires(std::is_same_v<Ts, Schedulers> and ...)
|
||||||
|
SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] std::optional<model::Task>
|
||||||
|
next() override
|
||||||
|
{
|
||||||
|
std::optional<model::Task> task;
|
||||||
|
auto const expand = [&](auto& s) {
|
||||||
|
if (task.has_value())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
task = s.next();
|
||||||
|
return task.has_value();
|
||||||
|
};
|
||||||
|
|
||||||
|
std::apply([&expand](auto&&... xs) { (... || expand(xs)); }, schedulers_);
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
static auto
|
||||||
|
makeScheduler(SomeScheduler auto&&... schedulers)
|
||||||
|
{
|
||||||
|
return std::make_unique<SchedulerChain<std::decay_t<decltype(schedulers)>...>>(
|
||||||
|
std::forward<decltype(schedulers)>(schedulers)...
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace etlng::impl
|
||||||
@@ -38,6 +38,7 @@ target_sources(
|
|||||||
etlng/ExtractionTests.cpp
|
etlng/ExtractionTests.cpp
|
||||||
etlng/GrpcSourceTests.cpp
|
etlng/GrpcSourceTests.cpp
|
||||||
etlng/RegistryTests.cpp
|
etlng/RegistryTests.cpp
|
||||||
|
etlng/SchedulingTests.cpp
|
||||||
etlng/LoadingTests.cpp
|
etlng/LoadingTests.cpp
|
||||||
# Feed
|
# Feed
|
||||||
util/BytesConverterTests.cpp
|
util/BytesConverterTests.cpp
|
||||||
|
|||||||
187
tests/unit/etlng/SchedulingTests.cpp
Normal file
187
tests/unit/etlng/SchedulingTests.cpp
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
//------------------------------------------------------------------------------
|
||||||
|
/*
|
||||||
|
This file is part of clio: https://github.com/XRPLF/clio
|
||||||
|
Copyright (c) 2025, the clio developers.
|
||||||
|
|
||||||
|
Permission to use, copy, modify, and distribute this software for any
|
||||||
|
purpose with or without fee is hereby granted, provided that the above
|
||||||
|
copyright notice and this permission notice appear in all copies.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
*/
|
||||||
|
//==============================================================================
|
||||||
|
|
||||||
|
#include "etlng/Models.hpp"
|
||||||
|
#include "etlng/SchedulerInterface.hpp"
|
||||||
|
#include "etlng/impl/Loading.hpp"
|
||||||
|
#include "etlng/impl/Scheduling.hpp"
|
||||||
|
#include "util/LoggerFixtures.hpp"
|
||||||
|
#include "util/MockNetworkValidatedLedgers.hpp"
|
||||||
|
|
||||||
|
#include <gmock/gmock.h>
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
#include <optional>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
using namespace etlng;
|
||||||
|
using namespace etlng::model;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
class FakeScheduler : SchedulerInterface {
|
||||||
|
std::function<std::optional<Task>()> generator_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
FakeScheduler(std::function<std::optional<Task>()> generator) : generator_{std::move(generator)}
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] std::optional<Task>
|
||||||
|
next() override
|
||||||
|
{
|
||||||
|
return generator_();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
struct ForwardSchedulerTests : NoLoggerFixture {
|
||||||
|
protected:
|
||||||
|
MockNetworkValidatedLedgersPtr networkValidatedLedgers_;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(ForwardSchedulerTests, ExhaustsSchedulerIfMostRecentLedgerIsNewerThanRequestedSequence)
|
||||||
|
{
|
||||||
|
auto scheduler = impl::ForwardScheduler(*networkValidatedLedgers_, 0u, 10u);
|
||||||
|
EXPECT_CALL(*networkValidatedLedgers_, getMostRecent()).Times(11).WillRepeatedly(testing::Return(11u));
|
||||||
|
|
||||||
|
for (auto i = 0u; i < 10u; ++i) {
|
||||||
|
auto maybeTask = scheduler.next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto const empty = scheduler.next();
|
||||||
|
EXPECT_FALSE(empty.has_value());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ForwardSchedulerTests, ReturnsNulloptIfMostRecentLedgerIsOlderThanRequestedSequence)
|
||||||
|
{
|
||||||
|
auto scheduler = impl::ForwardScheduler(*networkValidatedLedgers_, 0u, 10u);
|
||||||
|
EXPECT_CALL(*networkValidatedLedgers_, getMostRecent()).Times(10).WillRepeatedly(testing::Return(4u));
|
||||||
|
|
||||||
|
for (auto i = 0u; i < 5u; ++i) {
|
||||||
|
auto const maybeTask = scheduler.next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto i = 0u; i < 5u; ++i)
|
||||||
|
EXPECT_FALSE(scheduler.next().has_value());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(BackfillSchedulerTests, ExhaustsSchedulerUntilMinSeqReached)
|
||||||
|
{
|
||||||
|
auto scheduler = impl::BackfillScheduler(10u, 5u);
|
||||||
|
|
||||||
|
for (auto i = 10u; i > 5u; --i) {
|
||||||
|
auto maybeTask = scheduler.next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto const empty = scheduler.next();
|
||||||
|
EXPECT_FALSE(empty.has_value());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(BackfillSchedulerTests, ExhaustsSchedulerUntilDefaultMinValueReached)
|
||||||
|
{
|
||||||
|
auto scheduler = impl::BackfillScheduler(10u);
|
||||||
|
|
||||||
|
for (auto i = 10u; i > 0u; --i) {
|
||||||
|
auto const maybeTask = scheduler.next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto const empty = scheduler.next();
|
||||||
|
EXPECT_FALSE(empty.has_value());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(SchedulerChainTests, ExhaustsOneGenerator)
|
||||||
|
{
|
||||||
|
auto generate = [stop = 10u, seq = 0u]() mutable {
|
||||||
|
std::optional<Task> task = std::nullopt;
|
||||||
|
if (seq < stop)
|
||||||
|
task = Task{.priority = model::Task::Priority::Lower, .seq = seq++};
|
||||||
|
|
||||||
|
return task;
|
||||||
|
};
|
||||||
|
testing::MockFunction<std::optional<Task>()> upToTenGen;
|
||||||
|
EXPECT_CALL(upToTenGen, Call()).Times(11).WillRepeatedly(testing::ByRef(generate));
|
||||||
|
|
||||||
|
auto scheduler = impl::makeScheduler(FakeScheduler(upToTenGen.AsStdFunction()));
|
||||||
|
|
||||||
|
for (auto i = 0u; i < 10u; ++i) {
|
||||||
|
auto const maybeTask = scheduler->next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto const empty = scheduler->next();
|
||||||
|
EXPECT_FALSE(empty.has_value());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(SchedulerChainTests, ExhaustsFirstSchedulerBeforeUsingSecond)
|
||||||
|
{
|
||||||
|
auto generateFirst = [stop = 10u, seq = 0u]() mutable {
|
||||||
|
std::optional<Task> task = std::nullopt;
|
||||||
|
if (seq < stop)
|
||||||
|
task = Task{.priority = model::Task::Priority::Lower, .seq = seq++};
|
||||||
|
|
||||||
|
return task;
|
||||||
|
};
|
||||||
|
testing::MockFunction<std::optional<Task>()> upToTenGen;
|
||||||
|
EXPECT_CALL(upToTenGen, Call()).Times(21).WillRepeatedly(testing::ByRef(generateFirst));
|
||||||
|
|
||||||
|
auto generateSecond = [seq = 10u]() mutable {
|
||||||
|
std::optional<Task> task = std::nullopt;
|
||||||
|
if (seq > 0u)
|
||||||
|
task = Task{.priority = model::Task::Priority::Lower, .seq = seq--};
|
||||||
|
|
||||||
|
return task;
|
||||||
|
};
|
||||||
|
testing::MockFunction<std::optional<Task>()> downToZeroGen;
|
||||||
|
EXPECT_CALL(downToZeroGen, Call()).Times(11).WillRepeatedly(testing::ByRef(generateSecond));
|
||||||
|
|
||||||
|
auto scheduler =
|
||||||
|
impl::makeScheduler(FakeScheduler(upToTenGen.AsStdFunction()), FakeScheduler(downToZeroGen.AsStdFunction()));
|
||||||
|
|
||||||
|
for (auto i = 0u; i < 10u; ++i) {
|
||||||
|
auto const maybeTask = scheduler->next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
for (auto i = 10u; i > 0u; --i) {
|
||||||
|
auto const maybeTask = scheduler->next();
|
||||||
|
|
||||||
|
EXPECT_TRUE(maybeTask.has_value());
|
||||||
|
EXPECT_EQ(maybeTask->seq, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto const empty = scheduler->next();
|
||||||
|
EXPECT_FALSE(empty.has_value());
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user