Compare commits

...

10 Commits

Author SHA1 Message Date
Sergey Kuznetsov
d0f88438ca fix: Check config is valid 2025-01-14 16:13:11 +00:00
Alex Kremer
2cf849dd12 feat: ETLng scheduling (#1820)
For #1596
2025-01-14 15:50:59 +00:00
Peter Chen
c47b96bc68 fix: comment from verify config PR (#1823)
PR here: https://github.com/XRPLF/clio/pull/1814#
2025-01-13 12:58:20 -05:00
Sergey Kuznetsov
9659d98140 fix: Reading log_channels levels from config (#1821) 2025-01-13 16:29:45 +00:00
Peter Chen
f1698c55ff feat: add config verify flag (#1814)
fixes #1806
2025-01-13 09:57:11 -05:00
Alex Kremer
91c00e781a fix: Silence expected use after move warnings (#1819)
Fixes #1818
2025-01-10 15:47:48 +00:00
github-actions[bot]
c0d52723c9 style: clang-tidy auto fixes (#1817)
Fixes #1816. Please review and commit clang-tidy fixes.

Co-authored-by: kuznetsss <15742918+kuznetsss@users.noreply.github.com>
2025-01-10 09:06:05 +00:00
Alex Kremer
590c07ad84 fix: AsyncFramework RAII (#1815)
Fixes #1812
2025-01-09 15:26:25 +00:00
Alex Kremer
48c8d85d0c feat: ETLng loader basics (#1808)
For #1597
2025-01-09 14:47:08 +00:00
Alex Kremer
36a9f40a60 fix: Optimize ledger_range query (#1797) 2025-01-07 14:52:56 +00:00
50 changed files with 2060 additions and 81 deletions

View File

@@ -47,6 +47,7 @@ CliArgs::parse(int argc, char const* argv[])
("conf,c", po::value<std::string>()->default_value(kDEFAULT_CONFIG_PATH), "configuration file")
("ng-web-server,w", "Use ng-web-server")
("migrate", po::value<std::string>(), "start migration helper")
("verify", "Checks the validity of config values")
;
// clang-format on
po::positional_options_description positional;
@@ -75,6 +76,9 @@ CliArgs::parse(int argc, char const* argv[])
return Action{Action::Migrate{.configPath = std::move(configPath), .subCmd = MigrateSubCmd::migration(opt)}};
}
if (parsed.count("verify") != 0u)
return Action{Action::VerifyConfig{.configPath = std::move(configPath)}};
return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0}
};
}

View File

@@ -59,6 +59,11 @@ public:
MigrateSubCmd subCmd;
};
/** @brief Verify Config action. */
struct VerifyConfig {
std::string configPath;
};
/**
* @brief Construct an action from a Run.
*
@@ -66,7 +71,7 @@ public:
*/
template <typename ActionType>
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit> or
std::is_same_v<ActionType, Migrate>
std::is_same_v<ActionType, Migrate> or std::is_same_v<ActionType, VerifyConfig>
explicit Action(ActionType&& action) : action_(std::forward<ActionType>(action))
{
}
@@ -86,7 +91,7 @@ public:
}
private:
std::variant<Run, Exit, Migrate> action_;
std::variant<Run, Exit, Migrate, VerifyConfig> action_;
};
/**

57
src/app/VerifyConfig.hpp Normal file
View File

@@ -0,0 +1,57 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigFileJson.hpp"
#include <cstdlib>
#include <iostream>
#include <string_view>
namespace app {
/**
* @brief Verifies user's config values are correct
*
* @param configPath The path to config
* @return true if config values are all correct, false otherwise
*/
inline bool
verifyConfig(std::string_view configPath)
{
using namespace util::config;
auto const json = ConfigFileJson::makeConfigFileJson(configPath);
if (!json.has_value()) {
std::cerr << "Error parsing json from config: " << configPath << "\n" << json.error().error << std::endl;
return false;
}
auto const errors = gClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value()) {
std::cerr << "Issues found in provided config '" << configPath << "':\n";
std::cerr << err.error << std::endl;
}
return false;
}
return true;
}
} // namespace app

View File

@@ -74,7 +74,7 @@ public:
'class': 'SimpleStrategy',
'replication_factor': '{}'
}}
AND durable_writes = true
AND durable_writes = True
)",
settingsProvider_.get().getKeyspace(),
settingsProvider_.get().getReplicationFactor()
@@ -472,7 +472,7 @@ public:
R"(
UPDATE {}
SET sequence = ?
WHERE is_latest = false
WHERE is_latest = False
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));
@@ -776,7 +776,7 @@ public:
R"(
SELECT sequence
FROM {}
WHERE is_latest = true
WHERE is_latest = True
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));
@@ -787,6 +787,7 @@ public:
R"(
SELECT sequence
FROM {}
WHERE is_latest in (True, False)
)",
qualifiedTableName(settingsProvider_.get(), "ledger_range")
));

View File

@@ -134,7 +134,7 @@ ETLService::monitor()
}
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
amendmentBlockHandler_.onAmendmentBlock();
amendmentBlockHandler_.notifyAmendmentBlocked();
return;
}

View File

@@ -47,7 +47,7 @@ AmendmentBlockHandler::AmendmentBlockHandler(
}
void
AmendmentBlockHandler::onAmendmentBlock()
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
repeat_.start(interval_, action_);

View File

@@ -53,7 +53,7 @@ public:
);
void
onAmendmentBlock();
notifyAmendmentBlocked();
};
} // namespace etl::impl

View File

@@ -203,7 +203,7 @@ private:
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
amendmentBlockHandler_.get().onAmendmentBlock();
amendmentBlockHandler_.get().notifyAmendmentBlocked();
return {ripple::LedgerHeader{}, false};
}

View File

@@ -0,0 +1,37 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
namespace etlng {
/**
* @brief The interface of a handler for amendment blocking
*/
struct AmendmentBlockHandlerInterface {
virtual ~AmendmentBlockHandlerInterface() = default;
/**
* @brief The function to call once an amendment block has been discovered
*/
virtual void
notifyAmendmentBlocked() = 0;
};
} // namespace etlng

View File

@@ -1,5 +1,8 @@
add_library(clio_etlng)
target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp)
target_sources(
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
impl/Loading.cpp
)
target_link_libraries(clio_etlng PUBLIC clio_data)

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
/** @file */
#pragma once
#include "etl/ETLState.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "rpc/Errors.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <chrono>
#include <cstdint>
#include <expected>
#include <optional>
#include <string>
#include <vector>
namespace etlng {
/**
* @brief An interface for LoadBalancer
*/
class LoadBalancerInterface {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
virtual ~LoadBalancerInterface() = default;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param loader InitialLoadObserverInterface implementation
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(
uint32_t sequence,
etlng::InitialLoadObserverInterface& loader,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) = 0;
/**
* @brief Fetch data for a specific ledger.
*
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
* is found in the database, or the server is shutting down.
*
* @param ledgerSequence Sequence of the ledger to fetch
* @param getObjects Whether to get the account state diff between this ledger and the prior one
* @param getObjectNeighbors Whether to request object neighbors
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return The extracted data, if extraction was successful. If the ledger was found
* in the database or the server is shutting down, the optional will be empty
*/
virtual OptionalGetLedgerResponseType
fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;
/**
* @brief Represent the state of this load balancer as a JSON object
*
* @return JSON representation of the state of this load balancer.
*/
virtual boost::json::value
toJson() const = 0;
/**
* @brief Forward a JSON RPC request to a randomly selected rippled node.
*
* @param request JSON-RPC request to forward
* @param clientIp The IP address of the peer, if known
* @param isAdmin Whether the request is from an admin
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
virtual std::expected<boost::json::object, rpc::ClioError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
) = 0;
/**
* @brief Return state of ETL nodes.
* @return ETL state, nullopt if etl nodes not available
*/
virtual std::optional<etl::ETLState>
getETLState() noexcept = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/Models.hpp"
#include <xrpl/protocol/LedgerHeader.h>
#include <optional>
namespace etlng {
/**
* @brief An interface for a ETL Loader
*/
struct LoaderInterface {
virtual ~LoaderInterface() = default;
/**
* @brief Load ledger data
* @param data The data to load
*/
virtual void
load(model::LedgerData const& data) = 0;
/**
* @brief Load the initial ledger
* @param data The data to load
* @return Optional ledger header
*/
virtual std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) = 0;
};
} // namespace etlng

View File

@@ -29,6 +29,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
@@ -79,6 +80,23 @@ struct Transaction {
ripple::uint256 id;
std::string key; // key is the above id as a string of 32 characters
ripple::TxType type;
/**
* @brief Compares Transaction objects to each other without considering sttx and meta fields
* @param other The Transaction to compare to
* @return true if transaction is equivalent; false otherwise
*/
bool
operator==(Transaction const& other) const
{
return raw == other.raw //
and metaRaw == other.metaRaw //
and sttx.getTransactionID() == other.sttx.getTransactionID() //
and meta.getTxID() == other.meta.getTxID() //
and id == other.id //
and key == other.key //
and type == other.type;
}
};
/**
@@ -103,6 +121,9 @@ struct Object {
std::string predecessor;
ModType type;
bool
operator==(Object const&) const = default;
};
/**
@@ -111,6 +132,9 @@ struct Object {
struct BookSuccessor {
std::string firstBook;
std::string bookBase;
bool
operator==(BookSuccessor const&) const = default;
};
/**
@@ -125,6 +149,45 @@ struct LedgerData {
ripple::LedgerHeader header;
std::string rawHeader;
uint32_t seq;
/**
* @brief Compares LedgerData objects to each other without considering the header field
* @param other The LedgerData to compare to
* @return true if data is equivalent; false otherwise
*/
bool
operator==(LedgerData const& other) const
{
auto const serialized = [](auto const& hdr) {
ripple::Serializer ser;
ripple::addRaw(hdr, ser);
return ser.getString();
};
return transactions == other.transactions //
and objects == other.objects //
and successors == other.successors //
and edgeKeys == other.edgeKeys //
and serialized(header) == serialized(other.header) //
and rawHeader == other.rawHeader //
and seq == other.seq;
}
};
/**
* @brief Represents a task for the extractors
*/
struct Task {
/**
* @brief Represents the priority of the task
*/
enum class Priority : uint8_t {
Lower = 0u,
Higher = 1u,
};
Priority priority;
uint32_t seq;
};
} // namespace etlng::model

View File

@@ -0,0 +1,42 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/Models.hpp"
#include <optional>
namespace etlng {
/**
* @brief The interface of a scheduler for the extraction proccess
*/
struct SchedulerInterface {
virtual ~SchedulerInterface() = default;
/**
* @brief Attempt to obtain the next task
* @return A task if one exists; std::nullopt otherwise
*/
[[nodiscard]] virtual std::optional<model::Task>
next() = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,56 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etl/SystemState.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <chrono>
#include <functional>
#include <utility>
namespace etlng::impl {
AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION = []() {
static util::Logger const log{"ETL"}; // NOLINT(readability-identifier-naming)
LOG(log.fatal()) << "Can't process new ledgers: The current ETL source is not compatible with the version of "
<< "the libxrpl Clio is currently using. Please upgrade Clio to a newer version.";
};
AmendmentBlockHandler::AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval,
ActionType action
)
: state_{std::ref(state)}, interval_{interval}, ctx_{std::move(ctx)}, action_{std::move(action)}
{
}
void
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
if (not operation_.has_value())
operation_.emplace(ctx_.executeRepeatedly(interval_, action_));
}
} // namespace etlng::impl

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/SystemState.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <functional>
#include <optional>
namespace etlng::impl {
class AmendmentBlockHandler : public AmendmentBlockHandlerInterface {
public:
using ActionType = std::function<void()>;
private:
std::reference_wrapper<etl::SystemState> state_;
std::chrono::steady_clock::duration interval_;
util::async::AnyExecutionContext ctx_;
std::optional<util::async::AnyOperation<void>> operation_;
ActionType action_;
public:
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION
);
~AmendmentBlockHandler() override
{
if (operation_.has_value())
operation_.value().abort();
}
void
notifyAmendmentBlocked() override;
};
} // namespace etlng::impl

View File

@@ -87,14 +87,14 @@ AsyncGrpcCall::process(
if (abort) {
LOG(log_.error()) << "AsyncGrpcCall aborted";
return CallStatus::ERRORED;
return CallStatus::Errored;
}
if (!status_.ok()) {
LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code()
<< " message = " << status_.error_message();
return CallStatus::ERRORED;
return CallStatus::Errored;
}
if (!next_->is_unlimited()) {
@@ -141,7 +141,7 @@ AsyncGrpcCall::process(
predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left
// off at so that we can link the two lists correctly
return more ? CallStatus::MORE : CallStatus::DONE;
return more ? CallStatus::More : CallStatus::Done;
}
void

View File

@@ -38,7 +38,7 @@ namespace etlng::impl {
class AsyncGrpcCall {
public:
enum class CallStatus { MORE, DONE, ERRORED };
enum class CallStatus { More, Done, Errored };
using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest;
using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse;
using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub;

View File

@@ -139,7 +139,7 @@ GrpcSource::loadInitialLedger(
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, queue, observer, abort);
if (result != AsyncGrpcCall::CallStatus::MORE) {
if (result != AsyncGrpcCall::CallStatus::More) {
++numFinished;
LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished;
@@ -147,7 +147,7 @@ GrpcSource::loadInitialLedger(
edgeKeys.push_back(std::move(lastKey));
}
if (result == AsyncGrpcCall::CallStatus::ERRORED)
if (result == AsyncGrpcCall::CallStatus::Errored)
abort = true;
}

111
src/etlng/impl/Loading.cpp Normal file
View File

@@ -0,0 +1,111 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/Loading.hpp"
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
namespace etlng::impl {
Loader::Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
)
: backend_(std::move(backend))
, fetcher_(std::move(fetcher))
, registry_(std::move(registry))
, amendmentBlockHandler_(std::move(amendmentBlockHandler))
{
}
void
Loader::load(model::LedgerData const& data)
{
try {
// perform cache updates and all writes from extensions
registry_->dispatch(data);
auto [success, duration] =
::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(data.seq); });
LOG(log_.info()) << "Finished writes to DB for " << data.seq << ": " << (success ? "YES" : "NO") << "; took "
<< duration;
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked();
}
};
void
Loader::onInitialLoadGotMoreObjects(
uint32_t seq,
std::vector<model::Object> const& data,
std::optional<std::string> lastKey
)
{
LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size();
registry_->dispatchInitialObjects(
seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions?
);
}
std::optional<ripple::LedgerHeader>
Loader::loadInitialLedger(model::LedgerData const& data)
{
// check that database is actually empty
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty");
return std::nullopt;
}
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header);
auto seconds = ::util::timed<std::chrono::seconds>([this, &data]() { registry_->dispatchInitialData(data); });
LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds.";
backend_->finishWrites(data.seq);
LOG(log_.debug()) << "Loaded initial ledger";
return {data.header};
}
} // namespace etlng::impl

View File

@@ -0,0 +1,85 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "util/log/Logger.hpp"
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>
namespace etlng::impl {
class Loader : public LoaderInterface, public InitialLoadObserverInterface {
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
std::shared_ptr<RegistryInterface> registry_;
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
util::Logger log_{"ETL"};
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
);
void
load(model::LedgerData const& data) override;
void
onInitialLoadGotMoreObjects(
uint32_t seq,
std::vector<model::Object> const& data,
std::optional<std::string> lastKey
) override;
std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) override;
};
} // namespace etlng::impl

View File

@@ -0,0 +1,151 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/SchedulerInterface.hpp"
#include <sys/types.h>
#include <atomic>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>
namespace etlng::impl {
template <typename T>
concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
class ForwardScheduler : public SchedulerInterface {
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
uint32_t startSeq_;
std::optional<uint32_t> maxSeq_;
std::atomic_uint32_t seq_;
public:
ForwardScheduler(ForwardScheduler const& other)
: ledgers_(other.ledgers_), startSeq_(other.startSeq_), maxSeq_(other.maxSeq_), seq_(other.seq_.load())
{
}
ForwardScheduler(
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
uint32_t startSeq,
std::optional<uint32_t> maxSeq = std::nullopt
)
: ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
{
}
[[nodiscard]] std::optional<model::Task>
next() override
{
static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
uint32_t currentSeq = seq_;
if (ledgers_.get().getMostRecent() >= currentSeq) {
while (currentSeq < maxSeq_.value_or(kMAX)) {
if (seq_.compare_exchange_weak(currentSeq, currentSeq + 1u, std::memory_order_acq_rel)) {
return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
}
}
}
return std::nullopt;
}
};
class BackfillScheduler : public SchedulerInterface {
uint32_t startSeq_;
uint32_t minSeq_ = 0u;
std::atomic_uint32_t seq_;
public:
BackfillScheduler(BackfillScheduler const& other)
: startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
{
}
BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
: startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
{
}
[[nodiscard]] std::optional<model::Task>
next() override
{
uint32_t currentSeq = seq_;
while (currentSeq > minSeq_) {
if (seq_.compare_exchange_weak(currentSeq, currentSeq - 1u, std::memory_order_acq_rel)) {
return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
}
}
return std::nullopt;
}
};
template <SomeScheduler... Schedulers>
class SchedulerChain : public SchedulerInterface {
std::tuple<Schedulers...> schedulers_;
public:
template <SomeScheduler... Ts>
requires(std::is_same_v<Ts, Schedulers> and ...)
SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
{
}
[[nodiscard]] std::optional<model::Task>
next() override
{
std::optional<model::Task> task;
auto const expand = [&](auto& s) {
if (task.has_value())
return false;
task = s.next();
return task.has_value();
};
std::apply([&expand](auto&&... xs) { (... || expand(xs)); }, schedulers_);
return task;
}
};
static auto
makeScheduler(SomeScheduler auto&&... schedulers)
{
return std::make_unique<SchedulerChain<std::decay_t<decltype(schedulers)>...>>(
std::forward<decltype(schedulers)>(schedulers)...
);
}
} // namespace etlng::impl

View File

@@ -19,12 +19,12 @@
#include "app/CliArgs.hpp"
#include "app/ClioApplication.hpp"
#include "app/VerifyConfig.hpp"
#include "migration/MigrationApplication.hpp"
#include "rpc/common/impl/HandlerProvider.hpp"
#include "util/TerminationHandler.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigFileJson.hpp"
#include <cstdlib>
#include <exception>
@@ -40,34 +40,25 @@ try {
auto const action = app::CliArgs::parse(argc, argv);
return action.apply(
[](app::CliArgs::Action::Exit const& exit) { return exit.exitCode; },
[](app::CliArgs::Action::VerifyConfig const& verify) {
if (app::verifyConfig(verify.configPath)) {
std::cout << "Config " << verify.configPath << " is correct" << "\n";
return EXIT_SUCCESS;
}
return EXIT_FAILURE;
},
[](app::CliArgs::Action::Run const& run) {
auto const json = ConfigFileJson::makeConfigFileJson(run.configPath);
if (!json.has_value()) {
std::cerr << json.error().error << std::endl;
if (app::verifyConfig(verify.configPath))
return EXIT_FAILURE;
}
auto const errors = gClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value())
std::cerr << err.error << std::endl;
return EXIT_FAILURE;
}
util::LogService::init(gClioConfig);
app::ClioApplication clio{gClioConfig};
return clio.run(run.useNgWebServer);
},
[](app::CliArgs::Action::Migrate const& migrate) {
auto const json = ConfigFileJson::makeConfigFileJson(migrate.configPath);
if (!json.has_value()) {
std::cerr << json.error().error << std::endl;
if (app::verifyConfig(verify.configPath))
return EXIT_FAILURE;
}
auto const errors = gClioConfig.parse(json.value());
if (errors.has_value()) {
for (auto const& err : errors.value())
std::cerr << err.error << std::endl;
return EXIT_FAILURE;
}
util::LogService::init(gClioConfig);
app::MigratorApplication migrator{gClioConfig, migrate.subCmd};
return migrator.run();

78
src/util/MoveTracker.hpp Normal file
View File

@@ -0,0 +1,78 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <utility>
namespace util {
/**
* @brief A base-class that can be used to check whether the current instance was moved from
*/
class MoveTracker {
bool wasMoved_ = false;
protected:
/**
* @brief The function to be used by clients in order to check whether the instance was moved from
* @return true if moved from; false otherwise
*/
[[nodiscard]] bool
wasMoved() const noexcept
{
return wasMoved_;
}
MoveTracker() = default; // should only be used via inheritance
public:
virtual ~MoveTracker() = default;
/**
* @brief Move constructor sets the moved-from state on `other` and resets the state on `this`
* @param other The moved-from object
*/
MoveTracker(MoveTracker&& other)
{
*this = std::move(other);
}
/**
* @brief Move operator sets the moved-from state on `other` and resets the state on `this`
* @param other The moved-from object
* @return Reference to self
*/
MoveTracker&
operator=(MoveTracker&& other)
{
if (this != &other) {
other.wasMoved_ = true;
wasMoved_ = false;
}
return *this;
}
MoveTracker(MoveTracker const&) = default;
MoveTracker&
operator=(MoveTracker const&) = default;
};
} // namespace util

View File

@@ -26,6 +26,7 @@ Repeat::stop()
{
if (control_->stopping)
return;
control_->stopping = true;
control_->timer.cancel();
control_->semaphore.acquire();

View File

@@ -30,6 +30,7 @@
#include <concepts>
#include <memory>
#include <semaphore>
#include <utility>
namespace util {
@@ -48,7 +49,7 @@ class Repeat {
}
};
std::unique_ptr<Control> control_;
std::shared_ptr<Control> control_;
public:
/**
@@ -89,23 +90,23 @@ public:
{
ASSERT(control_->stopping, "Should be stopped before starting");
control_->stopping = false;
startImpl(interval, std::forward<Action>(action));
startImpl(control_, interval, std::forward<Action>(action));
}
private:
template <std::invocable Action>
void
startImpl(std::chrono::steady_clock::duration interval, Action&& action)
static void
startImpl(std::shared_ptr<Control> control, std::chrono::steady_clock::duration interval, Action&& action)
{
control_->timer.expires_after(interval);
control_->timer.async_wait([this, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
if (ec or control_->stopping) {
control_->semaphore.release();
control->timer.expires_after(interval);
control->timer.async_wait([control, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
if (ec or control->stopping) {
control->semaphore.release();
return;
}
action();
startImpl(interval, std::forward<Action>(action));
startImpl(std::move(control), interval, std::forward<Action>(action));
});
}
};

View File

@@ -24,6 +24,7 @@
#include <boost/asio/spawn.hpp>
#include <chrono>
#include <memory>
#include <type_traits>
#include <utility>

View File

@@ -19,9 +19,9 @@
#pragma once
#include "util/MoveTracker.hpp"
#include "util/Repeat.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/Outcome.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Timer.hpp"
@@ -36,7 +36,6 @@
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
namespace util::async {
namespace impl {
@@ -71,7 +70,7 @@ public:
};
template <typename CtxType, typename OpType>
struct BasicScheduledOperation {
struct BasicScheduledOperation : util::MoveTracker {
class State {
std::mutex m_;
std::condition_variable ready_;
@@ -105,6 +104,19 @@ struct BasicScheduledOperation {
{
}
~BasicScheduledOperation() override
{
if (not wasMoved())
abort();
}
BasicScheduledOperation(BasicScheduledOperation const&) = default;
BasicScheduledOperation&
operator=(BasicScheduledOperation const&) = default;
BasicScheduledOperation(BasicScheduledOperation&&) = default;
BasicScheduledOperation&
operator=(BasicScheduledOperation&&) = default;
[[nodiscard]] auto
get()
{
@@ -149,7 +161,8 @@ struct BasicScheduledOperation {
* @tparam StopSourceType The type of the stop source
*/
template <typename RetType, typename StopSourceType>
class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>> {
class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>>,
public util::MoveTracker {
using OutcomeType = StoppableOutcome<RetType, StopSourceType>;
StopSourceType stopSource_;
@@ -165,6 +178,19 @@ public:
{
}
~StoppableOperation() override
{
if (not wasMoved())
requestStop();
}
StoppableOperation(StoppableOperation const&) = delete;
StoppableOperation&
operator=(StoppableOperation const&) = delete;
StoppableOperation(StoppableOperation&&) = default;
StoppableOperation&
operator=(StoppableOperation&&) = default;
/** @brief Requests the operation to stop */
void
requestStop() noexcept
@@ -199,7 +225,7 @@ using ScheduledOperation = impl::BasicScheduledOperation<CtxType, OpType>;
* @tparam CtxType The type of the execution context
*/
template <typename CtxType>
class RepeatingOperation {
class RepeatingOperation : public util::MoveTracker {
util::Repeat repeat_;
public:
@@ -217,6 +243,12 @@ public:
repeat_.start(interval, std::forward<decltype(fn)>(fn));
}
~RepeatingOperation() override
{
if (not wasMoved())
abort();
}
RepeatingOperation(RepeatingOperation const&) = delete;
RepeatingOperation&
operator=(RepeatingOperation const&) = delete;

View File

@@ -19,8 +19,10 @@
#pragma once
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <atomic>
#include <memory>

View File

@@ -167,12 +167,12 @@ LogService::init(config::ClioConfigDefinition const& config)
auto const overrides = config.getArray("log_channels");
for (auto it = overrides.begin<util::config::ObjectView>(); it != overrides.end<util::config::ObjectView>(); ++it) {
auto const& cfg = *it;
auto name = cfg.get<std::string>("channel");
auto const& channelConfig = *it;
auto const name = channelConfig.get<std::string>("channel");
if (std::count(std::begin(Logger::kCHANNELS), std::end(Logger::kCHANNELS), name) == 0)
throw std::runtime_error("Can't override settings for log channel " + name + ": invalid channel");
minSeverity[name] = getSeverityLevel(config.get<std::string>("log_level"));
minSeverity[name] = getSeverityLevel(channelConfig.get<std::string>("log_level"));
}
auto logFilter = [minSeverity = std::move(minSeverity),

View File

@@ -416,6 +416,7 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{
ConfigValue{ConfigType::Integer}.defaultValue(rpc::kAPI_VERSION_MIN).withConstraint(gValidateApiVersion)},
{"api_version.max",
ConfigValue{ConfigType::Integer}.defaultValue(rpc::kAPI_VERSION_MAX).withConstraint(gValidateApiVersion)},
{"migration.full_scan_threads", ConfigValue{ConfigType::Integer}.defaultValue(2).withConstraint(gValidateUint32)},
{"migration.full_scan_jobs", ConfigValue{ConfigType::Integer}.defaultValue(4).withConstraint(gValidateUint32)},
{"migration.cursors_per_job", ConfigValue{ConfigType::Integer}.defaultValue(100).withConstraint(gValidateUint32)}},

View File

@@ -211,7 +211,7 @@ createObject()
.dataRaw = hexStringToBinaryString(kOBJ_BLOB),
.successor = hexStringToBinaryString(kOBJ_SUCC),
.predecessor = hexStringToBinaryString(kOBJ_PRED),
.type = {},
.type = etlng::model::Object::ModType::Created,
};
}

View File

@@ -91,7 +91,7 @@ protected:
checkEqual(std::string expected)
{
auto value = buffer_.getStrAndReset();
ASSERT_EQ(value, expected + '\n');
ASSERT_EQ(value, expected + '\n') << "Got: " << value;
}
void

View File

@@ -19,8 +19,10 @@
#pragma once
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include <gmock/gmock.h>
struct MockAmendmentBlockHandler {
MOCK_METHOD(void, onAmendmentBlock, (), ());
struct MockAmendmentBlockHandler : etlng::AmendmentBlockHandlerInterface {
MOCK_METHOD(void, notifyAmendmentBlocked, (), (override));
};

View File

@@ -20,11 +20,15 @@
#pragma once
#include "util/LoggerFixtures.hpp"
#include "util/MockAmendmentBlockHandler.hpp"
#include "util/MockETLService.hpp"
#include "util/MockLedgerFetcher.hpp"
#include "util/MockLoadBalancer.hpp"
#include <gmock/gmock.h>
#include <memory>
/**
* @brief Fixture with a mock etl service
*/
@@ -63,3 +67,28 @@ struct MockLoadBalancerTest : virtual public NoLoggerFixture {
protected:
std::shared_ptr<MockLoadBalancer> mockLoadBalancerPtr_ = std::make_shared<MockLoadBalancer>();
};
/**
* @brief Fixture with a mock NG etl balancer
*/
struct MockNgLoadBalancerTest : virtual public NoLoggerFixture {
protected:
std::shared_ptr<MockNgLoadBalancer> mockLoadBalancerPtr_ = std::make_shared<MockNgLoadBalancer>();
};
/**
* @brief Fixture with a mock ledger fetcher
*/
struct MockLedgerFetcherTest : virtual public NoLoggerFixture {
protected:
std::shared_ptr<MockNgLedgerFetcher> mockLedgerFetcherPtr_ = std::make_shared<MockNgLedgerFetcher>();
};
/**
* @brief Fixture with a mock ledger fetcher
*/
struct MockAmendmentBlockHandlerTest : virtual public NoLoggerFixture {
protected:
std::shared_ptr<MockAmendmentBlockHandler> mockAmendmentBlockHandlerPtr_ =
std::make_shared<MockAmendmentBlockHandler>();
};

View File

@@ -19,6 +19,7 @@
#pragma once
#include "etl/LedgerFetcherInterface.hpp"
#include "util/FakeFetchResponse.hpp"
#include <gmock/gmock.h>
@@ -30,3 +31,8 @@ struct MockLedgerFetcher {
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchData, (uint32_t), ());
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchDataAndDiff, (uint32_t), ());
};
struct MockNgLedgerFetcher : etl::LedgerFetcherInterface {
MOCK_METHOD(OptionalGetLedgerResponseType, fetchData, (uint32_t), (override));
MOCK_METHOD(OptionalGetLedgerResponseType, fetchDataAndDiff, (uint32_t), (override));
};

View File

@@ -19,6 +19,9 @@
#pragma once
#include "etl/ETLState.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/FakeFetchResponse.hpp"
@@ -28,10 +31,12 @@
#include <boost/json/value.hpp>
#include <gmock/gmock.h>
#include <chrono>
#include <cstdint>
#include <expected>
#include <optional>
#include <string>
#include <vector>
struct MockLoadBalancer {
using RawLedgerObjectType = FakeLedgerObject;
@@ -48,3 +53,36 @@ struct MockLoadBalancer {
(const)
);
};
struct MockNgLoadBalancer : etlng::LoadBalancerInterface {
using RawLedgerObjectType = FakeLedgerObject;
MOCK_METHOD(
std::vector<std::string>,
loadInitialLedger,
(uint32_t, etlng::InitialLoadObserverInterface&, std::chrono::steady_clock::duration),
(override)
);
MOCK_METHOD(
std::vector<std::string>,
loadInitialLedger,
(uint32_t, std::chrono::steady_clock::duration),
(override)
);
MOCK_METHOD(
OptionalGetLedgerResponseType,
fetchLedger,
(uint32_t, bool, bool, std::chrono::steady_clock::duration),
(override)
);
MOCK_METHOD(boost::json::value, toJson, (), (const, override));
MOCK_METHOD(std::optional<etl::ETLState>, getETLState, (), (noexcept, override));
using ForwardToRippledReturnType = std::expected<boost::json::object, rpc::ClioError>;
MOCK_METHOD(
ForwardToRippledReturnType,
forwardToRippled,
(boost::json::object const&, std::optional<std::string> const&, bool, boost::asio::yield_context),
(override)
);
};

View File

@@ -52,7 +52,7 @@ protected:
R"(
CREATE KEYSPACE IF NOT EXISTS {}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
AND durable_writes = true
AND durable_writes = True
)",
keyspace
);
@@ -211,7 +211,7 @@ TEST_F(BackendCassandraBaseTest, KeyspaceManipulation)
R"(
CREATE KEYSPACE {}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
AND durable_writes = true
AND durable_writes = True
)",
keyspace
);

View File

@@ -5,6 +5,7 @@ target_sources(
PRIVATE # Common
ConfigTests.cpp
app/CliArgsTests.cpp
app/VerifyConfigTests.cpp
app/WebHandlersTests.cpp
data/AmendmentCenterTests.cpp
data/BackendCountersTests.cpp
@@ -33,9 +34,12 @@ target_sources(
etl/SubscriptionSourceTests.cpp
etl/TransformerTests.cpp
# ETLng
etlng/AmendmentBlockHandlerTests.cpp
etlng/ExtractionTests.cpp
etlng/GrpcSourceTests.cpp
etlng/RegistryTests.cpp
etlng/SchedulingTests.cpp
etlng/LoadingTests.cpp
# Feed
util/BytesConverterTests.cpp
feed/BookChangesFeedTests.cpp
@@ -112,8 +116,6 @@ target_sources(
rpc/RPCHelpersTests.cpp
rpc/WorkQueueTests.cpp
test_data/SslCert.cpp
util/AccountUtilsTests.cpp
util/AssertTests.cpp
# Async framework
util/async/AnyExecutionContextTests.cpp
util/async/AnyOperationTests.cpp
@@ -138,6 +140,10 @@ target_sources(
util/requests/RequestBuilderTests.cpp
util/requests/SslContextTests.cpp
util/requests/WsConnectionTests.cpp
# Common utils
util/AccountUtilsTests.cpp
util/AssertTests.cpp
util/MoveTrackerTests.cpp
util/RandomTests.cpp
util/RetryTests.cpp
util/RepeatTests.cpp

View File

@@ -19,8 +19,20 @@
#include "util/LoggerFixtures.hpp"
#include "util/log/Logger.hpp"
#include "util/newconfig/Array.hpp"
#include "util/newconfig/ConfigConstraints.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ConfigFileJson.hpp"
#include "util/newconfig/ConfigValue.hpp"
#include "util/newconfig/Types.hpp"
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
#include <fmt/core.h>
#include <gtest/gtest.h>
#include <string>
#include <string_view>
using namespace util;
// Used as a fixture for tests with enabled logging
@@ -56,6 +68,106 @@ TEST_F(LoggerTest, Filtering)
checkEqual("Trace:TRC Trace line logged for 'Trace' component");
}
using util::config::Array;
using util::config::ConfigFileJson;
using util::config::ConfigType;
using util::config::ConfigValue;
struct LoggerInitTest : LoggerTest {
protected:
util::config::ClioConfigDefinition config_{
{"log_channels.[].channel", Array{ConfigValue{ConfigType::String}.optional()}},
{"log_channels.[].log_level", Array{ConfigValue{ConfigType::String}.optional()}},
{"log_level", ConfigValue{ConfigType::String}.defaultValue("info")},
{"log_format",
ConfigValue{ConfigType::String}.defaultValue(
R"(%TimeStamp% (%SourceLocation%) [%ThreadID%] %Channel%:%Severity% %Message%)"
)},
{"log_to_console", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"log_directory", ConfigValue{ConfigType::String}.optional()},
{"log_rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048)},
{"log_directory_max_size", ConfigValue{ConfigType::Integer}.defaultValue(50 * 1024)},
{"log_rotation_hour_interval", ConfigValue{ConfigType::Integer}.defaultValue(12)},
{"log_tag_style", ConfigValue{ConfigType::String}.defaultValue("none")},
};
};
TEST_F(LoggerInitTest, DefaultLogLevel)
{
auto const parsingErrors = config_.parse(ConfigFileJson{boost::json::object{{"log_level", "warn"}}});
ASSERT_FALSE(parsingErrors.has_value());
std::string const logString = "some log";
LogService::init(config_);
for (auto const& channel : Logger::kCHANNELS) {
Logger const log{channel};
log.trace() << logString;
checkEmpty();
log.debug() << logString;
checkEmpty();
log.info() << logString;
checkEmpty();
log.warn() << logString;
checkEqual(fmt::format("{}:WRN {}", channel, logString));
log.error() << logString;
checkEqual(fmt::format("{}:ERR {}", channel, logString));
}
}
TEST_F(LoggerInitTest, ChannelLogLevel)
{
std::string const configStr = R"json(
{
"log_level": "error",
"log_channels": [
{
"channel": "Backend",
"log_level": "warning"
}
]
}
)json";
auto const parsingErrors = config_.parse(ConfigFileJson{boost::json::parse(configStr).as_object()});
ASSERT_FALSE(parsingErrors.has_value());
std::string const logString = "some log";
LogService::init(config_);
for (auto const& channel : Logger::kCHANNELS) {
Logger const log{channel};
log.trace() << logString;
checkEmpty();
log.debug() << logString;
checkEmpty();
log.info() << logString;
checkEmpty();
log.warn() << logString;
if (std::string_view{channel} == "Backend") {
checkEqual(fmt::format("{}:WRN {}", channel, logString));
} else {
checkEmpty();
}
log.error() << "some log";
checkEqual(fmt::format("{}:ERR {}", channel, logString));
}
}
#ifndef COVERAGE_ENABLED
TEST_F(LoggerTest, LOGMacro)
{

View File

@@ -32,6 +32,7 @@ struct CliArgsTests : testing::Test {
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::Run)>> onRunMock;
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::Exit)>> onExitMock;
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::Migrate)>> onMigrateMock;
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::VerifyConfig)>> onVerifyMock;
};
TEST_F(CliArgsTests, Parse_NoArgs)
@@ -46,7 +47,13 @@ TEST_F(CliArgsTests, Parse_NoArgs)
return returnCode;
});
EXPECT_EQ(
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()), returnCode
action.apply(
onRunMock.AsStdFunction(),
onExitMock.AsStdFunction(),
onMigrateMock.AsStdFunction(),
onVerifyMock.AsStdFunction()
),
returnCode
);
}
@@ -62,7 +69,12 @@ TEST_F(CliArgsTests, Parse_NgWebServer)
return returnCode;
});
EXPECT_EQ(
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()),
action.apply(
onRunMock.AsStdFunction(),
onExitMock.AsStdFunction(),
onMigrateMock.AsStdFunction(),
onVerifyMock.AsStdFunction()
),
returnCode
);
}
@@ -79,7 +91,12 @@ TEST_F(CliArgsTests, Parse_VersionHelp)
EXPECT_CALL(onExitMock, Call).WillOnce([](CliArgs::Action::Exit const& exit) { return exit.exitCode; });
EXPECT_EQ(
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()),
action.apply(
onRunMock.AsStdFunction(),
onExitMock.AsStdFunction(),
onMigrateMock.AsStdFunction(),
onVerifyMock.AsStdFunction()
),
EXIT_SUCCESS
);
}
@@ -97,6 +114,34 @@ TEST_F(CliArgsTests, Parse_Config)
return returnCode;
});
EXPECT_EQ(
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()), returnCode
action.apply(
onRunMock.AsStdFunction(),
onExitMock.AsStdFunction(),
onMigrateMock.AsStdFunction(),
onVerifyMock.AsStdFunction()
),
returnCode
);
}
TEST_F(CliArgsTests, Parse_VerifyConfig)
{
std::string_view configPath = "some_config_path";
std::array argv{"clio_server", configPath.data(), "--verify"}; // NOLINT(bugprone-suspicious-stringview-data-usage)
auto const action = CliArgs::parse(argv.size(), argv.data());
int const returnCode = 123;
EXPECT_CALL(onVerifyMock, Call).WillOnce([&configPath](CliArgs::Action::VerifyConfig const& verify) {
EXPECT_EQ(verify.configPath, configPath);
return returnCode;
});
EXPECT_EQ(
action.apply(
onRunMock.AsStdFunction(),
onExitMock.AsStdFunction(),
onMigrateMock.AsStdFunction(),
onVerifyMock.AsStdFunction()
),
returnCode
);
}

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "app/VerifyConfig.hpp"
#include "util/TmpFile.hpp"
#include "util/newconfig/FakeConfigData.hpp"
#include <gtest/gtest.h>
using namespace app;
using namespace util::config;
TEST(VerifyConfigTest, InvalidConfig)
{
auto const tmpConfigFile = TmpFile(kJSON_DATA);
// false because json data(kJSON_DATA) is not compatible with current configDefintion
EXPECT_FALSE(verifyConfig(tmpConfigFile.path));
}
TEST(VerifyConfigTest, ValidConfig)
{
// used to Verify Config test
static constexpr auto kVALID_JSON_DATA = R"JSON({
"server": {
"ip": "0.0.0.0",
"port": 51233
}
})JSON";
auto const tmpConfigFile = TmpFile(kVALID_JSON_DATA);
// current example config should always be compatible with configDefinition
EXPECT_TRUE(verifyConfig(tmpConfigFile.path));
}
TEST(VerifyConfigTest, ConfigFileNotExist)
{
EXPECT_FALSE(verifyConfig("doesn't exist Config File"));
}
TEST(VerifyConfigTest, InvalidJsonFile)
{
// invalid json because extra "," after 51233
static constexpr auto kINVALID_JSON = R"({
"server": {
"ip": "0.0.0.0",
"port": 51233,
}
})";
auto const tmpConfigFile = TmpFile(kINVALID_JSON);
EXPECT_FALSE(verifyConfig(tmpConfigFile.path));
}

View File

@@ -36,13 +36,13 @@ struct AmendmentBlockHandlerTest : util::prometheus::WithPrometheus, SyncAsioCon
etl::SystemState state;
};
TEST_F(AmendmentBlockHandlerTest, CallToOnAmendmentBlockSetsStateAndRepeatedlyCallsAction)
TEST_F(AmendmentBlockHandlerTest, CallTonotifyAmendmentBlockedSetsStateAndRepeatedlyCallsAction)
{
AmendmentBlockHandler handler{ctx_, state, std::chrono::nanoseconds{1}, actionMock.AsStdFunction()};
EXPECT_FALSE(state.isAmendmentBlocked);
EXPECT_CALL(actionMock, Call()).Times(testing::AtLeast(10));
handler.onAmendmentBlock();
handler.notifyAmendmentBlocked();
EXPECT_TRUE(state.isAmendmentBlocked);
runContextFor(std::chrono::milliseconds{1});

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/SystemState.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockPrometheus.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <cstddef>
#include <semaphore>
using namespace etlng::impl;
struct AmendmentBlockHandlerNgTests : util::prometheus::WithPrometheus {
protected:
testing::StrictMock<testing::MockFunction<void()>> actionMock_;
etl::SystemState state_;
util::async::CoroExecutionContext ctx_;
};
TEST_F(AmendmentBlockHandlerNgTests, CallTonotifyAmendmentBlockedSetsStateAndRepeatedlyCallsAction)
{
constexpr static auto kMAX_ITERATIONS = 10uz;
etlng::impl::AmendmentBlockHandler handler{ctx_, state_, std::chrono::nanoseconds{1}, actionMock_.AsStdFunction()};
auto counter = 0uz;
std::binary_semaphore stop{0};
EXPECT_FALSE(state_.isAmendmentBlocked);
EXPECT_CALL(actionMock_, Call()).Times(testing::AtLeast(10)).WillRepeatedly([&]() {
if (++counter; counter > kMAX_ITERATIONS)
stop.release();
});
handler.notifyAmendmentBlocked();
stop.acquire(); // wait for the counter to reach over kMAX_ITERATIONS
EXPECT_TRUE(state_.isAmendmentBlocked);
}
struct DefaultAmendmentBlockActionNgTest : LoggerFixture {};
TEST_F(DefaultAmendmentBlockActionNgTest, Call)
{
AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION();
auto const loggerString = getLoggerString();
EXPECT_TRUE(loggerString.starts_with("ETL:FTL Can't process new ledgers")) << "LoggerString " << loggerString;
}

View File

@@ -24,10 +24,12 @@
#include "etlng/impl/Extraction.hpp"
#include "util/BinaryTestObject.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/TestObject.hpp"
#include <gmock/gmock.h>
#include <google/protobuf/repeated_ptr_field.h>
#include <gtest/gtest.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
@@ -38,14 +40,146 @@
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>
namespace {
constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
constinit auto const kLEDGER_HASH2 = "1B8590C01B0006EDFA9ED60296DD052DC5E90F99659B25014D08E1BC983515BC";
constinit auto const kSEQ = 30;
} // namespace
struct ExtractionTests : NoLoggerFixture {};
struct ExtractionModelNgTests : NoLoggerFixture {};
TEST_F(ExtractionTests, ModType)
TEST_F(ExtractionModelNgTests, LedgerDataCopyableAndEquatable)
{
auto const first = etlng::model::LedgerData{
.transactions =
{util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
util::createTransaction(ripple::TxType::ttNFTOKEN_CREATE_OFFER)},
.objects = {util::createObject(), util::createObject(), util::createObject()},
.successors = std::vector<etlng::model::BookSuccessor>{{.firstBook = "first", .bookBase = "base"}},
.edgeKeys = std::vector<std::string>{"key1", "key2"},
.header = createLedgerHeader(kLEDGER_HASH, kSEQ, 1),
.rawHeader = {1, 2, 3},
.seq = kSEQ
};
auto const second = first;
EXPECT_EQ(first, second);
{
auto third = second;
third.transactions.clear();
EXPECT_NE(first, third);
}
{
auto third = second;
third.objects = {util::createObject()};
EXPECT_NE(first, third);
}
{
auto third = second;
third.successors = std::vector<etlng::model::BookSuccessor>{{.firstBook = "second", .bookBase = "base"}};
EXPECT_NE(first, third);
}
{
auto third = second;
third.edgeKeys = std::vector<std::string>{"key1"};
EXPECT_NE(first, third);
}
{
auto third = second;
third.header = createLedgerHeader(kLEDGER_HASH2, kSEQ, 2);
EXPECT_NE(first, third);
}
{
auto third = second;
third.rawHeader = {2, 3, 4};
EXPECT_NE(first, third);
}
{
auto third = second;
third.seq = kSEQ - 1;
EXPECT_NE(first, third);
}
}
TEST_F(ExtractionModelNgTests, TransactionIsEquatable)
{
auto const tx = std::vector{util::createTransaction(ripple::TxType::ttNFTOKEN_BURN)};
auto other = tx;
EXPECT_EQ(tx, other);
other.push_back(util::createTransaction(ripple::TxType::ttNFTOKEN_ACCEPT_OFFER));
EXPECT_NE(tx, other);
}
TEST_F(ExtractionModelNgTests, ObjectCopyableAndEquatable)
{
auto const obj = util::createObject();
auto const other = obj;
EXPECT_EQ(obj, other);
{
auto third = other;
third.key = ripple::uint256{42};
EXPECT_NE(obj, third);
}
{
auto third = other;
third.keyRaw = "key";
EXPECT_NE(obj, third);
}
{
auto third = other;
third.data = {2, 3};
EXPECT_NE(obj, third);
}
{
auto third = other;
third.dataRaw = "something";
EXPECT_NE(obj, third);
}
{
auto third = other;
third.successor = "succ";
EXPECT_NE(obj, third);
}
{
auto third = other;
third.predecessor = "pred";
EXPECT_NE(obj, third);
}
{
auto third = other;
third.type = etlng::model::Object::ModType::Deleted;
EXPECT_NE(obj, third);
}
}
TEST_F(ExtractionModelNgTests, BookSuccessorCopyableAndEquatable)
{
auto const succ = etlng::model::BookSuccessor{.firstBook = "first", .bookBase = "base"};
auto const other = succ;
EXPECT_EQ(succ, other);
{
auto third = other;
third.bookBase = "all your base are belong to us";
EXPECT_NE(succ, third);
}
{
auto third = other;
third.firstBook = "not the first book";
EXPECT_NE(succ, third);
}
}
struct ExtractionNgTests : NoLoggerFixture {};
TEST_F(ExtractionNgTests, ModType)
{
using namespace etlng::impl;
using ModType = etlng::model::Object::ModType;
@@ -56,7 +190,7 @@ TEST_F(ExtractionTests, ModType)
EXPECT_EQ(extractModType(PBObjType::UNSPECIFIED), ModType::Unspecified);
}
TEST_F(ExtractionTests, OneTransaction)
TEST_F(ExtractionNgTests, OneTransaction)
{
using namespace etlng::impl;
@@ -74,7 +208,7 @@ TEST_F(ExtractionTests, OneTransaction)
EXPECT_EQ(res.sttx.getTxnType(), expected.sttx.getTxnType());
}
TEST_F(ExtractionTests, MultipleTransactions)
TEST_F(ExtractionNgTests, MultipleTransactions)
{
using namespace etlng::impl;
@@ -102,7 +236,7 @@ TEST_F(ExtractionTests, MultipleTransactions)
}
}
TEST_F(ExtractionTests, OneObject)
TEST_F(ExtractionNgTests, OneObject)
{
using namespace etlng::impl;
@@ -110,6 +244,9 @@ TEST_F(ExtractionTests, OneObject)
auto original = org::xrpl::rpc::v1::RawLedgerObject();
original.set_data(expected.dataRaw);
original.set_key(expected.keyRaw);
original.set_mod_type(
org::xrpl::rpc::v1::RawLedgerObject::ModificationType::RawLedgerObject_ModificationType_CREATED
);
auto res = extractObj(original);
EXPECT_EQ(ripple::strHex(res.key), ripple::strHex(expected.keyRaw));
@@ -119,7 +256,7 @@ TEST_F(ExtractionTests, OneObject)
EXPECT_EQ(res.type, expected.type);
}
TEST_F(ExtractionTests, OneObjectWithSuccessorAndPredecessor)
TEST_F(ExtractionNgTests, OneObjectWithSuccessorAndPredecessor)
{
using namespace etlng::impl;
@@ -129,6 +266,9 @@ TEST_F(ExtractionTests, OneObjectWithSuccessorAndPredecessor)
original.set_key(expected.keyRaw);
original.set_predecessor(expected.predecessor);
original.set_successor(expected.successor);
original.set_mod_type(
org::xrpl::rpc::v1::RawLedgerObject::ModificationType::RawLedgerObject_ModificationType_CREATED
);
auto res = extractObj(original);
EXPECT_EQ(ripple::strHex(res.key), ripple::strHex(expected.keyRaw));
@@ -138,7 +278,7 @@ TEST_F(ExtractionTests, OneObjectWithSuccessorAndPredecessor)
EXPECT_EQ(res.type, expected.type);
}
TEST_F(ExtractionTests, MultipleObjects)
TEST_F(ExtractionNgTests, MultipleObjects)
{
using namespace etlng::impl;
@@ -146,6 +286,9 @@ TEST_F(ExtractionTests, MultipleObjects)
auto original = org::xrpl::rpc::v1::RawLedgerObject();
original.set_data(expected.dataRaw);
original.set_key(expected.keyRaw);
original.set_mod_type(
org::xrpl::rpc::v1::RawLedgerObject::ModificationType::RawLedgerObject_ModificationType_CREATED
);
auto list = org::xrpl::rpc::v1::RawLedgerObjects();
for (auto i = 0; i < 10; ++i) {
@@ -165,7 +308,7 @@ TEST_F(ExtractionTests, MultipleObjects)
}
}
TEST_F(ExtractionTests, OneSuccessor)
TEST_F(ExtractionNgTests, OneSuccessor)
{
using namespace etlng::impl;
@@ -179,7 +322,7 @@ TEST_F(ExtractionTests, OneSuccessor)
EXPECT_EQ(ripple::strHex(res.bookBase), ripple::strHex(expected.bookBase));
}
TEST_F(ExtractionTests, MultipleSuccessors)
TEST_F(ExtractionNgTests, MultipleSuccessors)
{
using namespace etlng::impl;
@@ -205,7 +348,7 @@ TEST_F(ExtractionTests, MultipleSuccessors)
}
}
TEST_F(ExtractionTests, SuccessorsWithNoNeighborsIncluded)
TEST_F(ExtractionNgTests, SuccessorsWithNoNeighborsIncluded)
{
using namespace etlng::impl;
@@ -238,7 +381,7 @@ struct MockFetcher : etl::LedgerFetcherInterface {
MOCK_METHOD(std::optional<GetLedgerResponseType>, fetchDataAndDiff, (uint32_t), (override));
};
struct ExtractorTests : ExtractionTests {
struct ExtractorTests : ExtractionNgTests {
std::shared_ptr<MockFetcher> fetcher = std::make_shared<MockFetcher>();
etlng::impl::Extractor extractor{fetcher};
};

View File

@@ -0,0 +1,160 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "etlng/impl/Loading.hpp"
#include "rpc/RPCHelpers.hpp"
#include "util/BinaryTestObject.hpp"
#include "util/MockBackendTestFixture.hpp"
#include "util/MockETLServiceTestFixture.hpp"
#include "util/MockPrometheus.hpp"
#include "util/TestObject.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>
using namespace etlng::model;
using namespace etlng::impl;
namespace {
constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
constinit auto const kSEQ = 30;
struct MockRegistry : etlng::RegistryInterface {
MOCK_METHOD(void, dispatchInitialObjects, (uint32_t, std::vector<Object> const&, std::string), (override));
MOCK_METHOD(void, dispatchInitialData, (LedgerData const&), (override));
MOCK_METHOD(void, dispatch, (LedgerData const&), (override));
};
struct MockLoadObserver : etlng::InitialLoadObserverInterface {
MOCK_METHOD(
void,
onInitialLoadGotMoreObjects,
(uint32_t, std::vector<Object> const&, std::optional<std::string>),
(override)
);
};
struct LoadingTests : util::prometheus::WithPrometheus,
MockBackendTest,
MockLedgerFetcherTest,
MockAmendmentBlockHandlerTest {
protected:
std::shared_ptr<MockRegistry> mockRegistryPtr_ = std::make_shared<MockRegistry>();
Loader loader_{backend_, mockLedgerFetcherPtr_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_};
};
struct LoadingDeathTest : LoadingTests {};
auto
createTestData()
{
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
return LedgerData{
.transactions = {},
.objects = {util::createObject(), util::createObject(), util::createObject()},
.successors = {},
.edgeKeys = {},
.header = header,
.rawHeader = {},
.seq = kSEQ
};
}
} // namespace
TEST_F(LoadingTests, LoadInitialLedger)
{
auto const data = createTestData();
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*backend_, doFinishWrites());
EXPECT_CALL(*mockRegistryPtr_, dispatchInitialData(data));
auto const res = loader_.loadInitialLedger(data);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(rpc::ledgerHeaderToBlob(res.value(), true), rpc::ledgerHeaderToBlob(data.header, true));
}
TEST_F(LoadingTests, LoadSuccess)
{
auto const data = createTestData();
EXPECT_CALL(*backend_, doFinishWrites());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
loader_.load(data);
}
TEST_F(LoadingTests, LoadFailure)
{
auto const data = createTestData();
EXPECT_CALL(*backend_, doFinishWrites()).Times(0);
EXPECT_CALL(*mockRegistryPtr_, dispatch(data)).WillOnce([](auto const&) {
throw std::runtime_error("some error");
});
EXPECT_CALL(*mockAmendmentBlockHandlerPtr_, notifyAmendmentBlocked());
loader_.load(data);
}
TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsWithKey)
{
auto const data = createTestData();
auto const lastKey = std::make_optional<std::string>("something");
EXPECT_CALL(*mockRegistryPtr_, dispatchInitialObjects(kSEQ, data.objects, lastKey->data()));
loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey);
}
TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsWithoutKey)
{
auto const data = createTestData();
auto const lastKey = std::optional<std::string>{};
EXPECT_CALL(*mockRegistryPtr_, dispatchInitialObjects(kSEQ, data.objects, std::string{}));
loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey);
}
TEST_F(LoadingDeathTest, LoadInitialLedgerHasDataInDB)
{
auto const data = createTestData();
auto const range = LedgerRange{.minSequence = kSEQ - 1, .maxSequence = kSEQ};
// backend_ leaks due to death test. would be nice to figure out a better solution but for now
// we simply don't set expectations and allow the mock to leak
testing::Mock::AllowLeak(&*backend_);
ON_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillByDefault(testing::Return(range));
EXPECT_DEATH({ [[maybe_unused]] auto unused = loader_.loadInitialLedger(data); }, ".*");
}

View File

@@ -0,0 +1,187 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/Models.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
using namespace etlng;
using namespace etlng::model;
namespace {
class FakeScheduler : SchedulerInterface {
std::function<std::optional<Task>()> generator_;
public:
FakeScheduler(std::function<std::optional<Task>()> generator) : generator_{std::move(generator)}
{
}
[[nodiscard]] std::optional<Task>
next() override
{
return generator_();
}
};
} // namespace
struct ForwardSchedulerTests : NoLoggerFixture {
protected:
MockNetworkValidatedLedgersPtr networkValidatedLedgers_;
};
TEST_F(ForwardSchedulerTests, ExhaustsSchedulerIfMostRecentLedgerIsNewerThanRequestedSequence)
{
auto scheduler = impl::ForwardScheduler(*networkValidatedLedgers_, 0u, 10u);
EXPECT_CALL(*networkValidatedLedgers_, getMostRecent()).Times(11).WillRepeatedly(testing::Return(11u));
for (auto i = 0u; i < 10u; ++i) {
auto maybeTask = scheduler.next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
auto const empty = scheduler.next();
EXPECT_FALSE(empty.has_value());
}
TEST_F(ForwardSchedulerTests, ReturnsNulloptIfMostRecentLedgerIsOlderThanRequestedSequence)
{
auto scheduler = impl::ForwardScheduler(*networkValidatedLedgers_, 0u, 10u);
EXPECT_CALL(*networkValidatedLedgers_, getMostRecent()).Times(10).WillRepeatedly(testing::Return(4u));
for (auto i = 0u; i < 5u; ++i) {
auto const maybeTask = scheduler.next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
for (auto i = 0u; i < 5u; ++i)
EXPECT_FALSE(scheduler.next().has_value());
}
TEST(BackfillSchedulerTests, ExhaustsSchedulerUntilMinSeqReached)
{
auto scheduler = impl::BackfillScheduler(10u, 5u);
for (auto i = 10u; i > 5u; --i) {
auto maybeTask = scheduler.next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
auto const empty = scheduler.next();
EXPECT_FALSE(empty.has_value());
}
TEST(BackfillSchedulerTests, ExhaustsSchedulerUntilDefaultMinValueReached)
{
auto scheduler = impl::BackfillScheduler(10u);
for (auto i = 10u; i > 0u; --i) {
auto const maybeTask = scheduler.next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
auto const empty = scheduler.next();
EXPECT_FALSE(empty.has_value());
}
TEST(SchedulerChainTests, ExhaustsOneGenerator)
{
auto generate = [stop = 10u, seq = 0u]() mutable {
std::optional<Task> task = std::nullopt;
if (seq < stop)
task = Task{.priority = model::Task::Priority::Lower, .seq = seq++};
return task;
};
testing::MockFunction<std::optional<Task>()> upToTenGen;
EXPECT_CALL(upToTenGen, Call()).Times(11).WillRepeatedly(testing::ByRef(generate));
auto scheduler = impl::makeScheduler(FakeScheduler(upToTenGen.AsStdFunction()));
for (auto i = 0u; i < 10u; ++i) {
auto const maybeTask = scheduler->next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
auto const empty = scheduler->next();
EXPECT_FALSE(empty.has_value());
}
TEST(SchedulerChainTests, ExhaustsFirstSchedulerBeforeUsingSecond)
{
auto generateFirst = [stop = 10u, seq = 0u]() mutable {
std::optional<Task> task = std::nullopt;
if (seq < stop)
task = Task{.priority = model::Task::Priority::Lower, .seq = seq++};
return task;
};
testing::MockFunction<std::optional<Task>()> upToTenGen;
EXPECT_CALL(upToTenGen, Call()).Times(21).WillRepeatedly(testing::ByRef(generateFirst));
auto generateSecond = [seq = 10u]() mutable {
std::optional<Task> task = std::nullopt;
if (seq > 0u)
task = Task{.priority = model::Task::Priority::Lower, .seq = seq--};
return task;
};
testing::MockFunction<std::optional<Task>()> downToZeroGen;
EXPECT_CALL(downToZeroGen, Call()).Times(11).WillRepeatedly(testing::ByRef(generateSecond));
auto scheduler =
impl::makeScheduler(FakeScheduler(upToTenGen.AsStdFunction()), FakeScheduler(downToZeroGen.AsStdFunction()));
for (auto i = 0u; i < 10u; ++i) {
auto const maybeTask = scheduler->next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
for (auto i = 10u; i > 0u; --i) {
auto const maybeTask = scheduler->next();
EXPECT_TRUE(maybeTask.has_value());
EXPECT_EQ(maybeTask->seq, i);
}
auto const empty = scheduler->next();
EXPECT_FALSE(empty.has_value());
}

View File

@@ -20,6 +20,7 @@
#include "util/BytesConverter.hpp"
#include <gtest/gtest.h>
#include <cstdint>
#include <limits>

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/MoveTracker.hpp"
#include <gtest/gtest.h>
#include <utility>
namespace {
struct MoveMe : util::MoveTracker {
using MoveTracker::wasMoved; // expose as public for tests
};
} // namespace
TEST(MoveTrackerTests, SimpleChecks)
{
auto moveMe = MoveMe();
EXPECT_FALSE(moveMe.wasMoved());
auto other = std::move(moveMe);
EXPECT_TRUE(moveMe.wasMoved()); // NOLINT(bugprone-use-after-move)
EXPECT_FALSE(other.wasMoved());
}
TEST(MoveTrackerTests, SupportReuse)
{
auto original = MoveMe();
auto other = std::move(original);
original = std::move(other);
EXPECT_FALSE(original.wasMoved());
EXPECT_TRUE(other.wasMoved()); // NOLINT(bugprone-use-after-move)
}
TEST(MoveTrackerTests, SelfMove)
{
auto original = MoveMe();
[&](MoveMe& from) { original = std::move(from); }(original); // avoids the compiler catching self-move
EXPECT_FALSE(original.wasMoved());
}
TEST(MoveTrackerTests, SelfMoveAfterWasMoved)
{
auto original = MoveMe();
[[maybe_unused]] auto fake = std::move(original);
// NOLINTNEXTLINE(bugprone-use-after-move)
[&](MoveMe& from) { original = std::move(from); }(original); // avoids the compiler catching self-move
EXPECT_TRUE(original.wasMoved()); // NOLINT(bugprone-use-after-move)
}

View File

@@ -34,8 +34,6 @@
using namespace util::async;
using ::testing::Types;
using ExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext, SyncExecutionContext>;
template <typename T>
struct ExecutionContextTests : public ::testing::Test {
using ExecutionContextType = T;
@@ -48,7 +46,15 @@ struct ExecutionContextTests : public ::testing::Test {
}
};
// Suite for tests to be ran against all context types but SyncExecutionContext
template <typename T>
using AsyncExecutionContextTests = ExecutionContextTests<T>;
using ExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext, SyncExecutionContext>;
using AsyncExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext>;
TYPED_TEST_CASE(ExecutionContextTests, ExecutionContextTypes);
TYPED_TEST_CASE(AsyncExecutionContextTests, AsyncExecutionContextTypes);
TYPED_TEST(ExecutionContextTests, move)
{
@@ -149,6 +155,26 @@ TYPED_TEST(ExecutionContextTests, timerCancel)
EXPECT_EQ(value, 42);
}
TYPED_TEST(ExecutionContextTests, timerAutoCancels)
{
auto value = 0;
std::binary_semaphore sem{0};
{
auto res = this->ctx.scheduleAfter(
std::chrono::milliseconds(1),
[&value, &sem]([[maybe_unused]] auto stopRequested, auto cancelled) {
if (cancelled)
value = 42;
sem.release();
}
);
} // res goes out of scope and cancels the timer
sem.acquire();
EXPECT_EQ(value, 42);
}
TYPED_TEST(ExecutionContextTests, timerStdException)
{
auto res =
@@ -247,6 +273,46 @@ TYPED_TEST(ExecutionContextTests, strandWithTimeout)
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(AsyncExecutionContextTests, executeAutoAborts)
{
auto value = 0;
std::binary_semaphore sem{0};
{
auto res = this->ctx.execute([&](auto stopRequested) {
while (not stopRequested)
;
value = 42;
sem.release();
});
} // res goes out of scope and aborts operation
sem.acquire();
EXPECT_EQ(value, 42);
}
TYPED_TEST(AsyncExecutionContextTests, repeatingOperationAutoAborts)
{
auto const repeatDelay = std::chrono::milliseconds{1};
auto const timeout = std::chrono::milliseconds{15};
auto callCount = 0uz;
auto timeSpentMs = 0u;
{
auto res = this->ctx.executeRepeatedly(repeatDelay, [&] { ++callCount; });
timeSpentMs = util::timed([timeout] { std::this_thread::sleep_for(timeout); }); // calculate actual time spent
} // res goes out of scope and automatically aborts the repeating operation
// double the delay so that if abort did not happen we will fail below expectations
std::this_thread::sleep_for(timeout);
auto const expectedPureCalls = timeout.count() / repeatDelay.count();
auto const expectedActualCount = timeSpentMs / repeatDelay.count();
EXPECT_GE(callCount, expectedPureCalls / 2u); // expect at least half of the scheduled calls
EXPECT_LE(callCount, expectedActualCount); // never should be called more times than possible before timeout
}
using NoErrorHandlerSyncExecutionContext = BasicExecutionContext<
impl::SameThreadContext,
impl::BasicStopSource,