Rewrite cache loader to async framework (#1193)

Fixes #1188
Alex Kremer
2024-02-20 19:24:49 +00:00
committed by GitHub
parent 27fe35a2d1
commit 190b5c6a37
21 changed files with 947 additions and 472 deletions

View File

@@ -101,6 +101,7 @@ target_sources (clio PRIVATE
src/etl/ETLService.cpp
src/etl/ETLState.cpp
src/etl/LoadBalancer.cpp
src/etl/CacheLoaderSettings.cpp
src/etl/Source.cpp
src/etl/impl/ForwardingSource.cpp
src/etl/impl/GrpcSource.cpp
@@ -227,7 +228,9 @@ if (tests)
unittests/util/requests/WsConnectionTests.cpp
# ETL
unittests/etl/AmendmentBlockHandlerTests.cpp
unittests/etl/CacheLoaderSettingsTests.cpp
unittests/etl/CacheLoaderTests.cpp
unittests/etl/CursorProviderTests.cpp
unittests/etl/ETLStateTests.cpp
unittests/etl/ExtractionDataPipeTests.cpp
unittests/etl/ExtractorTests.cpp
@@ -310,7 +313,7 @@ if (tests)
include (CMake/deps/gtest.cmake)
# See https://github.com/google/googletest/issues/3475
gtest_discover_tests (clio_tests DISCOVERY_TIMEOUT 10)
gtest_discover_tests (clio_tests DISCOVERY_TIMEOUT 90)
# Fix for dwarf5 bug on ci
target_compile_options (clio PUBLIC -gdwarf-4)

View File

@@ -52,15 +52,6 @@
"max_requests": 20, // Max connections per IP per sweep interval
"sweep_interval": 1 // Time in seconds before resetting max_fetches and max_requests
},
"cache": {
// Comma-separated list of peer nodes that Clio can use to download cache from at startup
"peers": [
{
"ip": "127.0.0.1",
"port": 51234
}
]
},
"server": {
"ip": "0.0.0.0",
"port": 51233,

src/etl/CacheLoader.hpp (new file, 127 lines)
View File

@@ -0,0 +1,127 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/CacheLoaderSettings.hpp"
#include "etl/impl/CacheLoader.hpp"
#include "util/Assert.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <cstdint>
#include <functional>
#include <memory>
namespace etl {
/**
* @brief Cache loading interface
*
* This class is responsible for loading the cache for a given sequence number.
*
* @tparam CacheType The type of the cache to load
* @tparam CursorProviderType The type of the cursor provider to use
* @tparam ExecutionContextType The type of the execution context to use
*/
template <
typename CacheType,
typename CursorProviderType = impl::CursorProvider,
typename ExecutionContextType = util::async::CoroExecutionContext>
class CacheLoader {
using CacheLoaderType = impl::CacheLoaderImpl<CacheType>;
util::Logger log_{"ETL"};
std::shared_ptr<BackendInterface> backend_;
std::reference_wrapper<CacheType> cache_;
CacheLoaderSettings settings_;
ExecutionContextType ctx_;
std::unique_ptr<CacheLoaderType> loader_;
public:
/**
* @brief Construct a new Cache Loader object
*
* @param config The configuration to use
* @param backend The backend to use
* @param cache The cache to load into
*/
CacheLoader(util::Config const& config, std::shared_ptr<BackendInterface> const& backend, CacheType& cache)
: backend_{backend}, cache_{cache}, settings_{make_CacheLoaderSettings(config)}, ctx_{settings_.numThreads}
{
}
/**
* @brief Load the cache for the given sequence number
*
* This function blocks if the cache load style is set to sync, and
* disables the cache entirely if the load style is set to none/no.
*
* @param seq The sequence number to load cache for
*/
void
load(uint32_t const seq)
{
ASSERT(not cache_.get().isFull(), "Cache must not be full. seq = {}", seq);
if (settings_.isDisabled()) {
cache_.get().setDisabled();
LOG(log_.warn()) << "Cache is disabled. Not loading";
return;
}
auto const provider = CursorProviderType{backend_, settings_.numCacheDiffs};
loader_ = std::make_unique<CacheLoaderType>(
ctx_,
backend_,
cache_,
seq,
settings_.numCacheMarkers,
settings_.cachePageFetchSize,
provider.getCursors(seq)
);
if (settings_.isSync()) {
loader_->wait();
ASSERT(cache_.get().isFull(), "Cache must be full after sync load. seq = {}", seq);
}
}
/**
* @brief Requests the loader to stop as soon as possible
*/
void
stop() noexcept
{
loader_->stop();
}
/**
* @brief Waits for the loader to finish background work
*/
void
wait() noexcept
{
loader_->wait();
}
};
} // namespace etl
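For orientation, a minimal usage sketch of the new public interface follows; config, backend, cache, and seq are assumed to be set up elsewhere, mirroring how ETLService wires up the loader later in this commit.
// Minimal usage sketch (hypothetical surrounding setup omitted)
etl::CacheLoader<data::LedgerCache> loader{config, backend, cache};
loader.load(seq);  // returns immediately for "async"; blocks until the cache is full for "sync"
// ... later, e.g. on shutdown:
loader.stop();     // ask background workers to stop as soon as possible
loader.wait();     // wait for outstanding background work to finish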

View File

@@ -0,0 +1,75 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/CacheLoaderSettings.hpp"
#include "util/config/Config.hpp"
#include <boost/algorithm/string/predicate.hpp>
#include <ripple/basics/Blob.h>
#include <ripple/basics/base_uint.h>
#include <ripple/basics/strHex.h>
#include <cstddef>
#include <string>
namespace etl {
[[nodiscard]] bool
CacheLoaderSettings::isSync() const
{
return loadStyle == LoadStyle::SYNC;
}
[[nodiscard]] bool
CacheLoaderSettings::isAsync() const
{
return loadStyle == LoadStyle::ASYNC;
}
[[nodiscard]] bool
CacheLoaderSettings::isDisabled() const
{
return loadStyle == LoadStyle::NONE;
}
[[nodiscard]] CacheLoaderSettings
make_CacheLoaderSettings(util::Config const& config)
{
CacheLoaderSettings settings;
settings.numThreads = config.valueOr("io_threads", settings.numThreads);
if (config.contains("cache")) {
auto const cache = config.section("cache");
settings.numCacheDiffs = cache.valueOr<size_t>("num_diffs", settings.numCacheDiffs);
settings.numCacheMarkers = cache.valueOr<size_t>("num_markers", settings.numCacheMarkers);
settings.cachePageFetchSize = cache.valueOr<size_t>("page_fetch_size", settings.cachePageFetchSize);
if (auto entry = cache.maybeValue<std::string>("load"); entry) {
if (boost::iequals(*entry, "sync"))
settings.loadStyle = CacheLoaderSettings::LoadStyle::SYNC;
if (boost::iequals(*entry, "async"))
settings.loadStyle = CacheLoaderSettings::LoadStyle::ASYNC;
if (boost::iequals(*entry, "none") or boost::iequals(*entry, "no"))
settings.loadStyle = CacheLoaderSettings::LoadStyle::NONE;
}
}
return settings;
}
} // namespace etl

View File

@@ -0,0 +1,60 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/config/Config.hpp"
#include <cstddef>
namespace etl {
/**
* @brief Settings for the cache loader
*/
struct CacheLoaderSettings {
enum class LoadStyle { ASYNC, SYNC, NONE };
size_t numCacheDiffs = 32; /**< number of diffs to use to generate cursors */
size_t numCacheMarkers = 48; /**< number of markers to use at one time to traverse the ledger */
size_t cachePageFetchSize = 512; /**< number of ledger objects to fetch concurrently per marker */
size_t numThreads = 2; /**< number of threads to use for loading cache */
LoadStyle loadStyle = LoadStyle::ASYNC; /**< how to load the cache */
auto
operator<=>(CacheLoaderSettings const&) const = default;
/** @returns True if the load style is SYNC; false otherwise */
[[nodiscard]] bool
isSync() const;
/** @returns True if the load style is ASYNC; false otherwise */
[[nodiscard]] bool
isAsync() const;
/** @returns True if the cache is disabled; false otherwise */
[[nodiscard]] bool
isDisabled() const;
};
[[nodiscard]] CacheLoaderSettings
make_CacheLoaderSettings(util::Config const& config);
} // namespace etl
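The factory maps plain config keys onto the struct above. A small self-contained sketch (illustrative values, mirroring make_CacheLoaderSettings and the unit tests added in this commit):
#include "etl/CacheLoaderSettings.hpp"
#include "util/config/Config.hpp"
#include <boost/json/parse.hpp>
#include <cassert>
int main()
{
    auto const cfg = util::Config{boost::json::parse(
        R"({"io_threads": 4, "cache": {"num_diffs": 64, "num_markers": 96, "page_fetch_size": 256, "load": "sync"}})"
    )};
    auto const settings = etl::make_CacheLoaderSettings(cfg);
    assert(settings.numThreads == 4);
    assert(settings.numCacheDiffs == 64);
    assert(settings.numCacheMarkers == 96);
    assert(settings.cachePageFetchSize == 256);
    assert(settings.isSync());  // "load" is matched case-insensitively
}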

View File

@@ -268,7 +268,7 @@ ETLService::ETLService(
: backend_(backend)
, loadBalancer_(balancer)
, networkValidatedLedgers_(std::move(ledgers))
, cacheLoader_(config, ioc, backend, backend->cache())
, cacheLoader_(config, backend, backend->cache())
, ledgerFetcher_(backend, balancer)
, ledgerLoader_(backend, balancer, ledgerFetcher_, state_)
, ledgerPublisher_(ioc, backend, backend->cache(), subscriptions, state_)

View File

@@ -21,12 +21,12 @@
#include "data/BackendInterface.hpp"
#include "data/LedgerCache.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/ETLState.hpp"
#include "etl/LoadBalancer.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlock.hpp"
#include "etl/impl/CacheLoader.hpp"
#include "etl/impl/ExtractionDataPipe.hpp"
#include "etl/impl/Extractor.hpp"
#include "etl/impl/LedgerFetcher.hpp"
@@ -82,7 +82,7 @@ class ETLService {
using NetworkValidatedLedgersType = NetworkValidatedLedgers;
using DataPipeType = etl::impl::ExtractionDataPipe<org::xrpl::rpc::v1::GetLedgerResponse>;
using CacheType = data::LedgerCache;
using CacheLoaderType = etl::impl::CacheLoader<CacheType>;
using CacheLoaderType = etl::CacheLoader<CacheType>;
using LedgerFetcherType = etl::impl::LedgerFetcher<LoadBalancerType>;
using ExtractorType = etl::impl::Extractor<DataPipeType, NetworkValidatedLedgersType, LedgerFetcherType>;
using LedgerLoaderType = etl::impl::LedgerLoader<LoadBalancerType, LedgerFetcherType>;

View File

@@ -20,423 +20,135 @@
#pragma once
#include "data/BackendInterface.hpp"
#include "data/Types.hpp"
#include "util/Assert.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/impl/CursorProvider.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/log/Logger.hpp"
#include <boost/algorithm/hex.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/core/buffers_to_string.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/stream.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_to.hpp>
#include <grpcpp/grpcpp.h>
#include <ripple/basics/Blob.h>
#include <ripple/basics/base_uint.h>
#include <ripple/basics/strHex.h>
#include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iterator>
#include <memory>
#include <optional>
#include <random>
#include <sstream>
#include <ranges>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace etl::impl {
/**
* @brief Cache loading interface
*/
template <typename CacheType>
class CacheLoader {
static constexpr size_t DEFAULT_NUM_CACHE_DIFFS = 32;
static constexpr size_t DEFAULT_NUM_CACHE_MARKERS = 48;
static constexpr size_t DEFAULT_CACHE_PAGE_FETCH_SIZE = 512;
enum class LoadStyle { ASYNC, SYNC, NOT_AT_ALL };
class CacheLoaderImpl {
util::Logger log_{"ETL"};
std::reference_wrapper<boost::asio::io_context> ioContext_;
util::async::AnyExecutionContext ctx_;
std::shared_ptr<BackendInterface> backend_;
std::reference_wrapper<CacheType> cache_;
LoadStyle cacheLoadStyle_ = LoadStyle::ASYNC;
// number of diffs to use to generate cursors to traverse the ledger in parallel during initial cache download
size_t numCacheDiffs_ = DEFAULT_NUM_CACHE_DIFFS;
etl::ThreadSafeQueue<CursorPair> queue_;
std::atomic_int16_t remaining_;
// number of markers to use at one time to traverse the ledger in parallel during initial cache download
size_t numCacheMarkers_ = DEFAULT_NUM_CACHE_MARKERS;
// number of ledger objects to fetch concurrently per marker during cache download
size_t cachePageFetchSize_ = DEFAULT_CACHE_PAGE_FETCH_SIZE;
struct ClioPeer {
std::string ip;
int port{};
};
std::vector<ClioPeer> clioPeers_;
std::thread thread_;
std::atomic_bool stopping_ = false;
std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();
std::vector<util::async::AnyOperation<void>> tasks_;
public:
CacheLoader(
util::Config const& config,
boost::asio::io_context& ioc,
template <typename CtxType>
CacheLoaderImpl(
CtxType& ctx,
std::shared_ptr<BackendInterface> const& backend,
CacheType& ledgerCache
CacheType& cache,
uint32_t const seq,
std::size_t const numCacheMarkers,
std::size_t const cachePageFetchSize,
std::vector<CursorPair> const& cursors
)
: ioContext_{std::ref(ioc)}, backend_{backend}, cache_{ledgerCache}
: ctx_{ctx}, backend_{backend}, cache_{std::ref(cache)}, queue_{cursors.size()}, remaining_{cursors.size()}
{
if (config.contains("cache")) {
auto const cache = config.section("cache");
if (auto entry = cache.maybeValue<std::string>("load"); entry) {
if (boost::iequals(*entry, "sync"))
cacheLoadStyle_ = LoadStyle::SYNC;
if (boost::iequals(*entry, "async"))
cacheLoadStyle_ = LoadStyle::ASYNC;
if (boost::iequals(*entry, "none") or boost::iequals(*entry, "no"))
cacheLoadStyle_ = LoadStyle::NOT_AT_ALL;
}
numCacheDiffs_ = cache.valueOr<size_t>("num_diffs", numCacheDiffs_);
numCacheMarkers_ = cache.valueOr<size_t>("num_markers", numCacheMarkers_);
cachePageFetchSize_ = cache.valueOr<size_t>("page_fetch_size", cachePageFetchSize_);
if (auto peers = cache.maybeArray("peers"); peers) {
for (auto const& peer : *peers) {
auto ip = peer.value<std::string>("ip");
auto port = peer.value<uint32_t>("port");
// todo: use emplace_back when clang is ready
clioPeers_.push_back({ip, port});
}
unsigned const seed = std::chrono::system_clock::now().time_since_epoch().count();
std::shuffle(std::begin(clioPeers_), std::end(clioPeers_), std::default_random_engine(seed));
}
}
std::ranges::for_each(cursors, [this](auto const& cursor) { queue_.push(cursor); });
load(seq, numCacheMarkers, cachePageFetchSize);
}
~CacheLoader()
~CacheLoaderImpl()
{
stop();
if (thread_.joinable())
thread_.join();
}
/**
* @brief Populates the cache by walking through the given ledger.
*
* Should only be called once. The default behavior is to return immediately and populate the cache in the
* background. This can be overridden via config parameter, to populate synchronously, or not at all.
*/
void
load(uint32_t seq)
{
if (cacheLoadStyle_ == LoadStyle::NOT_AT_ALL) {
cache_.get().setDisabled();
LOG(log_.warn()) << "Cache is disabled. Not loading";
return;
}
ASSERT(!cache_.get().isFull(), "Cache must not be full. seq = {}", seq);
if (!clioPeers_.empty()) {
boost::asio::spawn(ioContext_.get(), [this, seq](boost::asio::yield_context yield) {
for (auto const& peer : clioPeers_) {
// returns true on success
if (loadCacheFromClioPeer(seq, peer.ip, std::to_string(peer.port), yield))
return;
}
// if we couldn't successfully load from any peers, load from db
loadCacheFromDb(seq);
});
return;
}
loadCacheFromDb(seq);
// If loading synchronously, poll cache until full
static constexpr size_t SLEEP_TIME_SECONDS = 10;
while (cacheLoadStyle_ == LoadStyle::SYNC && not cache_.get().isFull()) {
LOG(log_.debug()) << "Cache not full. Cache size = " << cache_.get().size() << ". Sleeping ...";
std::this_thread::sleep_for(std::chrono::seconds(SLEEP_TIME_SECONDS));
if (cache_.get().isFull())
LOG(log_.info()) << "Cache is full. Cache size = " << cache_.get().size();
}
wait();
}
void
stop()
stop() noexcept
{
stopping_ = true;
for (auto& t : tasks_)
t.requestStop();
}
void
wait() noexcept
{
for (auto& t : tasks_)
t.wait();
}
private:
bool
loadCacheFromClioPeer(
uint32_t ledgerIndex,
std::string const& ip,
std::string const& port,
boost::asio::yield_context yield
)
void
load(uint32_t const seq, size_t numCacheMarkers, size_t cachePageFetchSize)
{
LOG(log_.info()) << "Loading cache from peer. ip = " << ip << " . port = " << port;
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace websocket = beast::websocket; // from
namespace net = boost::asio; // from
using tcp = boost::asio::ip::tcp; // from
try {
beast::error_code ec;
// These objects perform our I/O
tcp::resolver resolver{ioContext_.get()};
namespace vs = std::views;
LOG(log_.trace()) << "Creating websocket";
auto ws = std::make_unique<websocket::stream<beast::tcp_stream>>(ioContext_.get());
LOG(log_.info()) << "Loading cache. Num cursors = " << queue_.size();
tasks_.reserve(numCacheMarkers);
// Look up the domain name
auto const results = resolver.async_resolve(ip, port, yield[ec]);
if (ec)
return {};
LOG(log_.trace()) << "Connecting websocket";
// Make the connection on the IP address we get from a lookup
ws->next_layer().async_connect(results, yield[ec]);
if (ec)
return false;
LOG(log_.trace()) << "Performing websocket handshake";
// Perform the websocket handshake
ws->async_handshake(ip, "/", yield[ec]);
if (ec)
return false;
std::optional<boost::json::value> marker;
LOG(log_.trace()) << "Sending request";
static constexpr int LIMIT = 2048;
auto getRequest = [&](auto marker) {
boost::json::object request = {
{"command", "ledger_data"},
{"ledger_index", ledgerIndex},
{"binary", true},
{"out_of_order", true},
{"limit", LIMIT}
};
if (marker)
request["marker"] = *marker;
return request;
};
bool started = false;
size_t numAttempts = 0;
do {
// Send the message
ws->async_write(net::buffer(boost::json::serialize(getRequest(marker))), yield[ec]);
if (ec) {
LOG(log_.error()) << "error writing = " << ec.message();
return false;
}
beast::flat_buffer buffer;
ws->async_read(buffer, yield[ec]);
if (ec) {
LOG(log_.error()) << "error reading = " << ec.message();
return false;
}
auto raw = beast::buffers_to_string(buffer.data());
auto parsed = boost::json::parse(raw);
if (!parsed.is_object()) {
LOG(log_.error()) << "Error parsing response: " << raw;
return false;
}
LOG(log_.trace()) << "Successfully parsed response " << parsed;
if (auto const& response = parsed.as_object(); response.contains("error")) {
LOG(log_.error()) << "Response contains error: " << response;
auto const& err = response.at("error");
if (err.is_string() && err.as_string() == "lgrNotFound") {
static constexpr size_t MAX_ATTEMPTS = 5;
++numAttempts;
if (numAttempts >= MAX_ATTEMPTS) {
LOG(log_.error())
<< " ledger not found at peer after 5 attempts. peer = " << ip
<< " ledger = " << ledgerIndex << ". Check your config and the health of the peer";
return false;
}
LOG(log_.warn()) << "Ledger not found. ledger = " << ledgerIndex
<< ". Sleeping and trying again";
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
return false;
}
started = true;
auto const& response = parsed.as_object()["result"].as_object();
if (!response.contains("cache_full") || !response.at("cache_full").as_bool()) {
LOG(log_.error()) << "cache not full for clio node. ip = " << ip;
return false;
}
if (response.contains("marker")) {
marker = response.at("marker");
} else {
marker = {};
}
auto const& state = response.at("state").as_array();
std::vector<data::LedgerObject> objects;
objects.reserve(state.size());
for (auto const& ledgerObject : state) {
auto const& obj = ledgerObject.as_object();
data::LedgerObject stateObject = {};
if (!stateObject.key.parseHex(boost::json::value_to<std::string>(obj.at("index")))) {
LOG(log_.error()) << "failed to parse object id";
return false;
}
boost::algorithm::unhex(
boost::json::value_to<std::string>(obj.at("data")), std::back_inserter(stateObject.blob)
);
objects.push_back(std::move(stateObject));
}
cache_.get().update(objects, ledgerIndex, true);
if (marker)
LOG(log_.debug()) << "At marker " << *marker;
} while (marker || !started);
LOG(log_.info()) << "Finished downloading ledger from clio node. ip = " << ip;
cache_.get().setFull();
return true;
} catch (std::exception const& e) {
LOG(log_.error()) << "Encountered exception : " << e.what() << " - ip = " << ip;
return false;
}
for ([[maybe_unused]] auto taskId : vs::iota(0u, numCacheMarkers))
tasks_.push_back(spawnWorker(seq, cachePageFetchSize));
}
void
loadCacheFromDb(uint32_t seq)
[[nodiscard]] auto
spawnWorker(uint32_t const seq, size_t cachePageFetchSize)
{
std::vector<data::LedgerObject> diff;
std::vector<std::optional<ripple::uint256>> cursors;
return ctx_.execute([this, seq, cachePageFetchSize](auto token) {
while (not token.isStopRequested()) {
auto cursor = queue_.tryPop();
if (not cursor.has_value()) {
return; // queue is empty
}
auto append = [](auto&& a, auto&& b) { a.insert(std::end(a), std::begin(b), std::end(b)); };
auto [start, end] = cursor.value();
LOG(log_.debug()) << "Starting a cursor: " << ripple::strHex(start);
for (size_t i = 0; i < numCacheDiffs_; ++i) {
append(diff, data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerDiff(seq - i, yield);
}));
}
while (not token.isStopRequested()) {
auto res = data::retryOnTimeout([this, seq, cachePageFetchSize, &start, token]() {
return backend_->fetchLedgerPage(start, seq, cachePageFetchSize, false, token);
});
std::sort(diff.begin(), diff.end(), [](auto a, auto b) {
return a.key < b.key || (a.key == b.key && a.blob.size() < b.blob.size());
});
cache_.get().update(res.objects, seq, true);
diff.erase(std::unique(diff.begin(), diff.end(), [](auto a, auto b) { return a.key == b.key; }), diff.end());
if (not res.cursor or res.cursor > end) {
if (--remaining_ <= 0) {
auto endTime = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime_);
cursors.emplace_back();
for (auto const& obj : diff) {
if (!obj.blob.empty())
cursors.emplace_back(obj.key);
}
cursors.emplace_back();
std::stringstream cursorStr;
for (auto const& c : cursors) {
if (c)
cursorStr << ripple::strHex(*c) << ", ";
}
LOG(log_.info()) << "Loading cache. num cursors = " << cursors.size() - 1;
LOG(log_.trace()) << "cursors = " << cursorStr.str();
thread_ = std::thread{[this, seq, cursors = std::move(cursors)]() {
auto startTime = std::chrono::system_clock::now();
auto markers = std::make_shared<std::atomic_int>(0);
auto numRemaining = std::make_shared<std::atomic_int>(cursors.size() - 1);
for (size_t i = 0; i < cursors.size() - 1; ++i) {
auto const start = cursors.at(i);
auto const end = cursors.at(i + 1);
markers->wait(numCacheMarkers_);
++(*markers);
boost::asio::spawn(
ioContext_.get(),
[this, seq, start, end, numRemaining, startTime, markers](boost::asio::yield_context yield) {
auto cursor = start;
std::string cursorStr =
cursor.has_value() ? ripple::strHex(cursor.value()) : ripple::strHex(data::firstKey);
LOG(log_.debug()) << "Starting a cursor: " << cursorStr << " markers = " << *markers;
while (not stopping_) {
auto res = data::retryOnTimeout([this, seq, &cursor, yield]() {
return backend_->fetchLedgerPage(cursor, seq, cachePageFetchSize_, false, yield);
});
cache_.get().update(res.objects, seq, true);
if (!res.cursor || (end && *(res.cursor) > *end))
break;
LOG(log_.trace()) << "Loading cache. cache size = " << cache_.get().size()
<< " - cursor = " << ripple::strHex(res.cursor.value())
<< " start = " << cursorStr << " markers = " << *markers;
cursor = std::move(res.cursor);
}
--(*markers);
markers->notify_one();
if (--(*numRemaining) == 0) {
auto endTime = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(endTime - startTime);
LOG(log_.info()) << "Finished loading cache. cache size = " << cache_.get().size()
LOG(log_.info()) << "Finished loading cache. Cache size = " << cache_.get().size()
<< ". Took " << duration.count() << " seconds";
cache_.get().setFull();
} else {
LOG(log_.info()) << "Finished a cursor. num remaining = " << *numRemaining
<< " start = " << cursorStr << " markers = " << *markers;
LOG(log_.debug()) << "Finished a cursor. Remaining = " << remaining_;
}
break; // pick up the next cursor if available
}
);
start = std::move(res.cursor).value();
}
}
}};
});
}
};
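Because the hunk above interleaves the removed io_context/std::thread implementation with the new one, here is a compact, self-contained sketch of the worker shape the new CacheLoaderImpl uses, with plain std::jthread standing in for clio's util::async execution context (all names below are illustrative):
#include <atomic>
#include <mutex>
#include <queue>
#include <stop_token>
#include <thread>
#include <vector>
struct CursorPair {
    int start;
    int end;
};  // stand-in for the real [start, end) key range
int main()
{
    std::queue<CursorPair> queue;  // in clio: etl::ThreadSafeQueue<CursorPair> filled from CursorProvider
    std::mutex mtx;
    for (int i = 0; i < 8; ++i)
        queue.push({i, i + 1});
    std::atomic_int remaining{8};
    std::vector<std::jthread> workers;  // in clio: util::async::AnyOperation<void> tasks
    for (int w = 0; w < 4; ++w) {  // one worker per marker (numCacheMarkers)
        workers.emplace_back([&](std::stop_token st) {
            while (not st.stop_requested()) {
                [[maybe_unused]] CursorPair cursor{};
                {
                    std::lock_guard lk{mtx};
                    if (queue.empty())
                        return;  // queue drained: this worker is done
                    cursor = queue.front();
                    queue.pop();
                }
                // ... page through [cursor.start, cursor.end) and update the cache ...
                if (--remaining <= 0) {
                    // last cursor finished; this is where the cache would be marked full
                }
            }
        });
    }
}  // jthreads request stop and join on destruction, mirroring stop()/wait()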

View File

@@ -0,0 +1,113 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/Types.hpp"
#include "util/log/Logger.hpp"
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/io_context.hpp>
#include <ripple/basics/base_uint.h>
#include <ripple/basics/strHex.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <ranges>
#include <string>
#include <vector>
namespace etl::impl {
struct CursorPair {
ripple::uint256 start;
ripple::uint256 end;
};
class CursorProvider {
util::Logger log_{"ETL"};
std::shared_ptr<BackendInterface> backend_;
size_t numDiffs_;
public:
CursorProvider(std::shared_ptr<BackendInterface> const& backend, size_t numDiffs)
: backend_{backend}, numDiffs_{numDiffs}
{
}
[[nodiscard]] std::vector<CursorPair>
getCursors(uint32_t const seq) const
{
namespace rg = std::ranges;
namespace vs = std::views;
auto diffs = std::vector<data::LedgerObject>{};
auto const append = [](auto&& a, auto&& b) { a.insert(std::end(a), std::begin(b), std::end(b)); };
auto const fetchDiff = [this, seq](uint32_t offset) {
return data::synchronousAndRetryOnTimeout([this, seq, offset](auto yield) {
return backend_->fetchLedgerDiff(seq - offset, yield);
});
};
rg::for_each(vs::iota(0u, numDiffs_), [&](auto i) { append(diffs, fetchDiff(i)); });
rg::sort(diffs, [](auto const& a, auto const& b) {
return a.key < b.key or (a.key == b.key and std::size(a.blob) < std::size(b.blob));
});
diffs.erase(
std::unique(
std::begin(diffs), std::end(diffs), [](auto const& a, auto const& b) { return a.key == b.key; }
),
std::end(diffs)
);
std::vector<ripple::uint256> cursors{data::firstKey};
rg::copy(
diffs //
| vs::filter([](auto const& obj) { return not obj.blob.empty(); }) //
| vs::transform([](auto const& obj) { return obj.key; }),
std::back_inserter(cursors)
);
cursors.push_back(data::lastKey); // last pair should cover the remaining range
std::vector<CursorPair> pairs;
pairs.reserve(cursors.size());
// FIXME: this should be `cursors | vs::pairwise` (C++23)
std::transform(
std::begin(cursors),
std::prev(std::end(cursors)),
std::next(std::begin(cursors)),
std::back_inserter(pairs),
[](auto&& a, auto&& b) -> CursorPair {
return {a, b};
}
);
return pairs;
}
};
} // namespace etl::impl
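As an aside, a tiny standalone illustration of the adjacent-pair transform above (the step the FIXME would replace with std::views::pairwise in C++23); ints stand in for ripple::uint256 keys:
#include <algorithm>
#include <iterator>
#include <utility>
#include <vector>
int main()
{
    std::vector<int> cursors{0 /*firstKey*/, 3, 7, 9 /*lastKey*/};
    std::vector<std::pair<int, int>> pairs;
    std::transform(
        std::begin(cursors),
        std::prev(std::end(cursors)),
        std::next(std::begin(cursors)),
        std::back_inserter(pairs),
        [](int a, int b) { return std::pair{a, b}; }
    );
    // pairs == {{0, 3}, {3, 7}, {7, 9}}: N keys between first and last yield N + 1 ranges,
    // matching the cursors.size() == diffs.size() + 1 expectation in the tests below.
}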

View File

@@ -47,7 +47,7 @@ assertImpl(
<< fmt::format(format, std::forward<Args>(args)...) << "\n"
<< "Stacktrace:\n"
<< boost::stacktrace::stacktrace() << "\n";
std::abort();
std::exit(EXIT_FAILURE); // std::abort does not flush gcovr output and causes uncovered lines
}
}

View File

@@ -19,8 +19,11 @@
#pragma once
#include "util/Assert.hpp"
#include "util/async/Concepts.hpp"
#include <boost/asio/spawn.hpp>
#include <memory>
#include <type_traits>
@@ -69,6 +72,15 @@ public:
return isStopRequested();
}
/**
* @returns The underlying boost::asio::yield_context
* @note ASSERTs if the stop token is not convertible to boost::asio::yield_context
*/
[[nodiscard]] operator boost::asio::yield_context() const
{
return pimpl_->yieldContext();
}
private:
struct Concept {
virtual ~Concept() = default;
@@ -78,6 +90,9 @@ private:
[[nodiscard]] virtual std::unique_ptr<Concept>
clone() const = 0;
[[nodiscard]] virtual boost::asio::yield_context
yieldContext() const = 0;
};
template <SomeStopToken TokenType>
@@ -99,6 +114,17 @@ private:
{
return std::make_unique<Model>(*this);
}
[[nodiscard]] boost::asio::yield_context
yieldContext() const override
{
if constexpr (std::is_convertible_v<TokenType, boost::asio::yield_context>) {
return token;
}
ASSERT(false, "Token type does not support conversion to boost::asio::yield_context");
__builtin_unreachable(); // TODO: replace with std::unreachable when C++23 is available
}
};
private:

View File

@@ -62,6 +62,9 @@ public:
}
public:
Token(Token const&) = default;
Token(Token&&) = default;
[[nodiscard]] bool
isStopRequested() const noexcept
{
@@ -74,6 +77,11 @@ public:
{
return isStopRequested();
}
[[nodiscard]] operator boost::asio::yield_context() const noexcept
{
return yield_;
}
};
[[nodiscard]] Token

View File

@@ -19,13 +19,13 @@
#pragma once
#include "util/Assert.hpp"
#include "util/Expected.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/impl/Any.hpp"
#include <memory>
#include <stdexcept>
#include <type_traits>
namespace util::async::impl {
@@ -61,8 +61,8 @@ public:
}
/**
* @brief Request the operation to be stopped as soon as possible.
* @throw std::logic_error if the erased operation is non-stoppable
* @brief Request the operation to be stopped as soon as possible
* @note ASSERTs if the operation is not stoppable
*/
void
requestStop()
@@ -71,8 +71,8 @@ public:
}
/**
* @brief Cancel the operation if it is scheduled and not yet started.
* @throw std::logic_error if the erased operation is non-cancellable
* @brief Cancel the operation if it is scheduled and not yet started
* @note ASSERTs if the operation is not cancellable
*/
void
cancel()
@@ -123,7 +123,7 @@ private:
if constexpr (SomeStoppableOperation<OpType>) {
operation.requestStop();
} else {
throw std::logic_error("Stop requested on non-stoppable operation");
ASSERT(false, "Stop requested on non-stoppable operation");
}
}
@@ -133,7 +133,7 @@ private:
if constexpr (SomeCancellableOperation<OpType>) {
operation.cancel();
} else {
throw std::logic_error("Cancellation requested on non-cancellable operation");
ASSERT(false, "Cancellation requested on non-cancellable operation");
}
}
};

View File

@@ -0,0 +1,109 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/CacheLoaderSettings.hpp"
#include "util/config/Config.hpp"
#include <boost/json.hpp>
#include <boost/json/parse.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
namespace json = boost::json;
using namespace etl;
using namespace testing;
struct CacheLoaderSettingsTest : Test {};
TEST_F(CacheLoaderSettingsTest, DefaultSettingsParsedCorrectly)
{
auto const cfg = util::Config{json::parse(R"({})")};
auto const settings = make_CacheLoaderSettings(cfg);
auto const defaults = CacheLoaderSettings{};
EXPECT_EQ(settings, defaults);
}
TEST_F(CacheLoaderSettingsTest, NumThreadsCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"io_threads": 42})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.numThreads, 42);
}
TEST_F(CacheLoaderSettingsTest, NumDiffsCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"num_diffs": 42}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.numCacheDiffs, 42);
}
TEST_F(CacheLoaderSettingsTest, NumMarkersCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"num_markers": 42}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.numCacheMarkers, 42);
}
TEST_F(CacheLoaderSettingsTest, PageFetchSizeCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"page_fetch_size": 42}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.cachePageFetchSize, 42);
}
TEST_F(CacheLoaderSettingsTest, SyncLoadStyleCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "sYNC"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::SYNC);
EXPECT_TRUE(settings.isSync());
}
TEST_F(CacheLoaderSettingsTest, AsyncLoadStyleCorrectlyPropagatedThroughConfig)
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "aSynC"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::ASYNC);
EXPECT_TRUE(settings.isAsync());
}
TEST_F(CacheLoaderSettingsTest, NoLoadStyleCorrectlyPropagatedThroughConfig)
{
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "nONe"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::NONE);
EXPECT_TRUE(settings.isDisabled());
}
{
auto const cfg = util::Config{json::parse(R"({"cache": {"load": "nO"}})")};
auto const settings = make_CacheLoaderSettings(cfg);
EXPECT_EQ(settings.loadStyle, CacheLoaderSettings::LoadStyle::NONE);
EXPECT_TRUE(settings.isDisabled());
}
}

View File

@@ -18,142 +18,208 @@
//==============================================================================
#include "data/Types.hpp"
#include "etl/CacheLoader.hpp"
#include "etl/CacheLoaderSettings.hpp"
#include "etl/impl/CacheLoader.hpp"
#include "etl/impl/FakeDiffProvider.hpp"
#include "util/Fixtures.hpp"
#include "util/MockCache.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/Config.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/json.hpp>
#include <boost/json/parse.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <ripple/basics/Blob.h>
#include <ripple/basics/base_uint.h>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>
namespace json = boost::json;
using namespace etl::impl;
using namespace etl;
using namespace util;
using namespace data;
using namespace testing;
constexpr static auto SEQ = 30;
constexpr static auto INDEX1 = "E6DBAFC99223B42257915A63DFC6B0C032D4070F9A574B255AD97466726FC321";
namespace {
struct CacheLoaderTest : public MockBackendTest {
constexpr auto SEQ = 30;
struct CacheLoaderTest : MockBackendTest {
void
SetUp() override
{
MockBackendTest::SetUp();
work.emplace(ctx);
for (auto i = 0; i < 2; ++i)
optThreads.emplace_back([&] { ctx.run(); });
}
void
TearDown() override
{
work.reset();
for (auto& optThread : optThreads) {
if (optThread.joinable())
optThread.join();
}
ctx.stop();
MockBackendTest::TearDown();
}
protected:
DiffProvider diffProvider;
MockCache cache;
Config cfg{json::parse("{}")};
std::optional<boost::asio::io_service::work> work;
boost::asio::io_context ctx;
std::vector<std::thread> optThreads;
};
namespace {
std::vector<LedgerObject>
getLatestDiff()
{
return std::vector<LedgerObject>{
{.key = ripple::uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD130B"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E18"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F04"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E01"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B38E"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE441"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CEDF"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"BC0DAE09C0BFBC4A49AA94B849266588BFD6E1F554B184B5788AC55D6E07EB95"}, .blob = Blob{'s'}},
{.key = ripple::uint256{"DCC8759A35CB946511763AA5553A82AA25F20B901C98C9BB74D423BCFAFF5F9D"}, .blob = Blob{'s'}},
};
}
using Settings = etl::CacheLoaderSettings;
struct ParametrizedCacheLoaderTest : CacheLoaderTest, WithParamInterface<Settings> {};
}; // namespace
TEST_F(CacheLoaderTest, FromCache)
//
// Tests of implementation
//
INSTANTIATE_TEST_CASE_P(
CacheLoaderTest,
ParametrizedCacheLoaderTest,
Values(
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 2},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 4},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 8},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 512, .numThreads = 16},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 128, .cachePageFetchSize = 24, .numThreads = 2},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 64, .cachePageFetchSize = 48, .numThreads = 4},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 48, .cachePageFetchSize = 64, .numThreads = 8},
Settings{.numCacheDiffs = 32, .numCacheMarkers = 24, .cachePageFetchSize = 128, .numThreads = 16},
Settings{.numCacheDiffs = 128, .numCacheMarkers = 128, .cachePageFetchSize = 24, .numThreads = 2},
Settings{.numCacheDiffs = 1024, .numCacheMarkers = 64, .cachePageFetchSize = 48, .numThreads = 4},
Settings{.numCacheDiffs = 512, .numCacheMarkers = 48, .cachePageFetchSize = 64, .numThreads = 8},
Settings{.numCacheDiffs = 64, .numCacheMarkers = 24, .cachePageFetchSize = 128, .numThreads = 16}
),
[](auto const& info) {
auto const settings = info.param;
return fmt::format(
"diffs_{}__markers_{}__fetchSize_{}__threads_{}",
settings.numCacheDiffs,
settings.numCacheMarkers,
settings.cachePageFetchSize,
settings.numThreads
);
}
);
TEST_P(ParametrizedCacheLoaderTest, LoadCacheWithDifferentSettings)
{
CacheLoader loader{cfg, ctx, backend, cache};
auto const diffs = getLatestDiff();
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(32);
auto const& settings = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 14;
std::mutex keysMutex;
std::map<std::thread::id, uint32_t> threadKeysMap;
ON_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).WillByDefault(Invoke([&]() -> std::optional<ripple::uint256> {
// mock the result from doFetchSuccessorKey, be aware this function will be called from multiple threads
// for each thread, the last 2 items must be end flag and nullopt, otherwise it will loop forever
std::lock_guard<std::mutex> const guard(keysMutex);
threadKeysMap[std::this_thread::get_id()]++;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).Times(keysSize * loops).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
if (threadKeysMap[std::this_thread::get_id()] == keysSize - 1) {
return lastKey;
}
if (threadKeysMap[std::this_thread::get_id()] == keysSize) {
threadKeysMap[std::this_thread::get_id()] = 0;
return std::nullopt;
}
return ripple::uint256{INDEX1};
}));
EXPECT_CALL(*backend, doFetchSuccessorKey).Times(keysSize * loops);
ON_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.WillByDefault(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
EXPECT_CALL(*backend, doFetchLedgerObjects).Times(loops);
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
EXPECT_CALL(cache, updateImp).Times(loops);
EXPECT_CALL(cache, isFull).Times(1);
EXPECT_CALL(cache, setFull).Times(1);
std::mutex m;
std::condition_variable cv;
bool cacheReady = false;
ON_CALL(cache, setFull).WillByDefault(Invoke([&]() {
{
std::lock_guard const lk(m);
cacheReady = true;
}
cv.notify_one();
}));
// cache is successfully loaded
async::CoroExecutionContext ctx{settings.numThreads};
etl::impl::CursorProvider provider{backend, settings.numCacheDiffs};
etl::impl::CacheLoaderImpl<MockCache> loader{
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
};
loader.wait();
}
TEST_P(ParametrizedCacheLoaderTest, AutomaticallyCancelledAndAwaitedInDestructor)
{
auto const& settings = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 1024;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey(_, SEQ, _)).Times(AtMost(keysSize * loops)).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.WillRepeatedly(Return(std::vector<Blob>(keysSize - 1, Blob{'s'})));
EXPECT_CALL(cache, updateImp).Times(AtMost(loops));
EXPECT_CALL(cache, setFull).Times(AtMost(1));
async::CoroExecutionContext ctx{settings.numThreads};
etl::impl::CursorProvider provider{backend, settings.numCacheDiffs};
etl::impl::CacheLoaderImpl<MockCache> loader{
ctx, backend, cache, SEQ, settings.numCacheMarkers, settings.cachePageFetchSize, provider.getCursors(SEQ)
};
// no loader.wait(): loader is immediately stopped and awaited in destructor
}
//
// Tests of public CacheLoader interface
//
TEST_F(CacheLoaderTest, SyncCacheLoaderWaitsTillFullyLoaded)
{
auto const cfg = util::Config(json::parse(R"({"cache": {"load": "sync"}})"));
CacheLoader loader{cfg, backend, cache};
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 14;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(32).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey).Times(keysSize * loops).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.Times(loops)
.WillRepeatedly(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
EXPECT_CALL(cache, updateImp).Times(loops);
EXPECT_CALL(cache, isFull).WillOnce(Return(false)).WillRepeatedly(Return(true));
EXPECT_CALL(cache, setFull).Times(1);
loader.load(SEQ);
{
std::unique_lock lk(m);
cv.wait_for(lk, std::chrono::milliseconds(300), [&] { return cacheReady; });
}
}
TEST_F(CacheLoaderTest, AsyncCacheLoaderCanBeStopped)
{
auto const cfg = util::Config(json::parse(R"({"cache": {"load": "async"}})"));
CacheLoader loader{cfg, backend, cache};
auto const diffs = diffProvider.getLatestDiff();
auto const loops = diffs.size() + 1;
auto const keysSize = 14;
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(AtMost(32)).WillRepeatedly(Return(diffs));
EXPECT_CALL(*backend, doFetchSuccessorKey).Times(AtMost(keysSize * loops)).WillRepeatedly([this]() {
return diffProvider.nextKey(keysSize);
});
EXPECT_CALL(*backend, doFetchLedgerObjects(_, SEQ, _))
.Times(AtMost(loops))
.WillRepeatedly(Return(std::vector<Blob>{keysSize - 1, Blob{'s'}}));
EXPECT_CALL(cache, updateImp).Times(AtMost(loops));
EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false));
EXPECT_CALL(cache, setFull).Times(AtMost(1));
loader.load(SEQ);
loader.stop();
loader.wait();
}
TEST_F(CacheLoaderTest, DisabledCacheLoaderDoesNotLoadCache)
{
auto cfg = util::Config(json::parse(R"({"cache": {"load": "none"}})"));
CacheLoader loader{cfg, backend, cache};
EXPECT_CALL(cache, updateImp).Times(0);
EXPECT_CALL(cache, isFull).WillRepeatedly(Return(false));
EXPECT_CALL(cache, setDisabled).Times(1);
loader.load(SEQ);
}

View File

@@ -0,0 +1,87 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/Types.hpp"
#include "etl/impl/CursorProvider.hpp"
#include "etl/impl/FakeDiffProvider.hpp"
#include "util/Fixtures.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <ripple/basics/Blob.h>
#include <ripple/basics/base_uint.h>
#include <cstddef>
using namespace etl;
using namespace util;
using namespace data;
using namespace testing;
namespace {
constexpr auto SEQ = 30;
struct CursorProviderTest : MockBackendTestNaggy {
DiffProvider diffProvider;
};
struct ParametrizedCursorProviderTest : CursorProviderTest, WithParamInterface<std::size_t> {};
INSTANTIATE_TEST_CASE_P(
CursorProviderTest,
ParametrizedCursorProviderTest,
Values(32, 64, 128, 512, 1024, 3, 2, 1),
[](auto const& info) {
auto const diffs = info.param;
return fmt::format("diffs_{}", diffs);
}
);
}; // namespace
TEST_P(ParametrizedCursorProviderTest, GetCursorsWithDifferentProviderSettings)
{
auto const numDiffs = GetParam();
auto const diffs = diffProvider.getLatestDiff();
auto const provider = etl::impl::CursorProvider{backend, numDiffs};
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(numDiffs);
auto const cursors = provider.getCursors(SEQ);
ASSERT_EQ(cursors.size(), diffs.size() + 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}
TEST_F(CursorProviderTest, EmptyCursorIsHandledCorrectly)
{
auto const diffs = diffProvider.getLatestDiff();
auto const provider = etl::impl::CursorProvider{backend, 0};
ON_CALL(*backend, fetchLedgerDiff(_, _)).WillByDefault(Return(diffs));
EXPECT_CALL(*backend, fetchLedgerDiff(_, _)).Times(0);
auto const cursors = provider.getCursors(SEQ);
ASSERT_EQ(cursors.size(), 1);
EXPECT_EQ(cursors.front().start, firstKey);
EXPECT_EQ(cursors.back().end, lastKey);
}

View File

@@ -0,0 +1,77 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/Types.hpp"
#include <ripple/basics/Blob.h>
#include <ripple/basics/base_uint.h>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>
struct DiffProvider {
std::vector<data::LedgerObject>
getLatestDiff() // NOLINT(readability-convert-member-functions-to-static)
{
using namespace ripple;
return {
{.key = uint256{"05E1EAC2574BE082B00B16F907CE32E6058DEB8F9E81CF34A00E80A5D71FA4FE"}, .blob = Blob{'s'}},
{.key = uint256{"110872C7196EE6EF7032952F1852B11BB461A96FF2D7E06A8003B4BB30FD130B"}, .blob = Blob{'s'}},
{.key = uint256{"3B3A84E850C724E914293271785A31D0BFC8B9DD1B6332E527B149AD72E80E18"}, .blob = Blob{'s'}},
{.key = uint256{"4EC98C5C3F34C44409BC058998CBD64F6AED3FF6C0CAAEC15F7F42DF14EE9F04"}, .blob = Blob{'s'}},
{.key = uint256{"58CEC9F17733EA7BA68C88E6179B8F207D001EE04D4E0366F958CC04FF6AB834"}, .blob = Blob{'s'}},
{.key = uint256{"64FB1712146BA604C274CC335C5DE7ADFE52D1F8C3E904A9F9765FE8158A3E01"}, .blob = Blob{'s'}},
{.key = uint256{"700BE23B1D9EE3E6BF52543D05843D5345B85D9EDB3D33BBD6B4C3A13C54B38E"}, .blob = Blob{'s'}},
{.key = uint256{"82C297FCBCD634C4424F263D17480AA2F13975DF5846A5BB57246022CEEBE441"}, .blob = Blob{'s'}},
{.key = uint256{"A2AA4C212DC2CA2C49BF58805F7C63363BC981018A01AC9609A7CBAB2A02CEDF"}, .blob = Blob{'s'}},
{.key = uint256{"BC0DAE09C0BFBC4A49AA94B849266588BFD6E1F554B184B5788AC55D6E07EB95"}, .blob = Blob{'s'}},
{.key = uint256{"DCC8759A35CB946511763AA5553A82AA25F20B901C98C9BB74D423BCFAFF5F9D"}, .blob = Blob{'s'}},
};
}
std::optional<ripple::uint256>
nextKey(std::size_t keysSize)
{
// mock the result from doFetchSuccessorKey; be aware this function will be called from multiple threads
std::lock_guard<std::mutex> const guard(keysMutex);
threadKeysMap[std::this_thread::get_id()]++;
if (threadKeysMap[std::this_thread::get_id()] == keysSize - 1) {
return data::lastKey;
}
if (threadKeysMap[std::this_thread::get_id()] == keysSize) {
threadKeysMap[std::this_thread::get_id()] = 0;
return std::nullopt;
}
return ripple::uint256{"E6DBAFC99223B42257915A63DFC6B0C032D4070F9A574B255AD97466726FC321"};
}
private:
std::mutex keysMutex;
std::map<std::thread::id, uint32_t> threadKeysMap;
};

View File

@@ -26,7 +26,7 @@ TEST(AssertTests, assertTrue)
EXPECT_NO_THROW(ASSERT(true, "Should not fail"));
}
TEST(AssertTests, assertFalse)
TEST(AssertDeathTest, assertFalse)
{
EXPECT_DEATH({ ASSERT(false, "failure"); }, ".*");
}

View File

@@ -42,6 +42,7 @@ struct AnyOperationTests : Test {
AnyOperation<int> intOp{impl::ErasedOperation(static_cast<OperationType&>(mockOp))};
AnyOperation<void> scheduledVoidOp{impl::ErasedOperation(static_cast<ScheduledOperationType&>(mockScheduledOp))};
};
using AnyOperationDeathTest = AnyOperationTests;
TEST_F(AnyOperationTests, VoidDataYieldsNoError)
{
@@ -99,3 +100,13 @@ TEST_F(AnyOperationTests, GetIncorrectDataReturnsError)
EXPECT_TRUE(res.error().message.ends_with("Bad any cast"));
EXPECT_TRUE(std::string{res.error()}.ends_with("Bad any cast"));
}
TEST_F(AnyOperationDeathTest, CallRequestStopOnNonStoppableOperation)
{
EXPECT_DEATH(voidOp.requestStop(), ".*");
}
TEST_F(AnyOperationDeathTest, CallCancelForNonCancellableOperation)
{
EXPECT_DEATH(voidOp.cancel(), ".*");
}

View File

@@ -19,6 +19,8 @@
#include "util/async/AnyStopToken.hpp"
#include <boost/asio/spawn.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
using namespace util::async;
@@ -36,6 +38,7 @@ struct FakeStopToken {
} // namespace
struct AnyStopTokenTests : public TestWithParam<bool> {};
using AnyStopTokenDeathTest = AnyStopTokenTests;
INSTANTIATE_TEST_CASE_P(AnyStopTokenGroup, AnyStopTokenTests, ValuesIn({true, false}), [](auto const& info) {
return info.param ? "true" : "false";
@@ -57,3 +60,10 @@ TEST_P(AnyStopTokenTests, IsStopRequestedCallPropagated)
EXPECT_EQ(stopToken.isStopRequested(), flag);
EXPECT_EQ(stopToken, flag);
}
TEST_F(AnyStopTokenDeathTest, ConversionToYieldContextAssertsIfUnsupported)
{
EXPECT_DEATH(
[[maybe_unused]] auto unused = static_cast<boost::asio::yield_context>(AnyStopToken{FakeStopToken{}}), ".*"
);
}

View File

@@ -32,7 +32,7 @@
using namespace util::prometheus;
TEST(MetricBuilderTest, build)
TEST(MetricBuilderDeathTest, build)
{
std::string const name = "name";
std::string const labelsString = "{label1=\"value1\"}";